id: fastapi-advanced

FastAPI Advanced#

Four powerful patterns that take FastAPI from a simple REST server to a production-grade application platform: Background Tasks, WebSockets, Dependency Injection, and Lifespan Events.


Background Tasks#

Background tasks run after the HTTP response is sent. Use them for things the user doesn’t need to wait for: sending emails, writing to a database, calling external APIs, generating reports.

from fastapi import FastAPI, BackgroundTasks
import time, logging

app = FastAPI()
logger = logging.getLogger(__name__)

def send_email(to: str, subject: str, body: str):
    """This runs after the response is sent"""
    time.sleep(2)  # simulate slow email sending
    logger.info(f"Email sent to {to}: {subject}")

def write_audit_log(user_id: int, action: str):
    """Log to a slow external system"""
    time.sleep(0.5)
    logger.info(f"Audit: user {user_id} did {action}")

@app.post("/register")
async def register_user(email: str, background_tasks: BackgroundTasks):
    # User registration logic here
    user_id = 42

    # Schedule background tasks — they run after this function returns
    background_tasks.add_task(send_email, email, "Welcome!", "Thanks for signing up.")
    background_tasks.add_task(write_audit_log, user_id, "register")

    # This response is sent immediately — user doesn't wait 2.5 seconds
    return {"message": "Registered! Check your email.", "user_id": user_id}

?> When to use Background Tasks vs Celery/Redis Queue? ?> - Background Tasks: Fast jobs (<30 seconds), no retry needed, simple ?> - Celery + Redis: Long jobs, need retries, need progress tracking, distributed workers


WebSockets#

WebSockets provide bi-directional, real-time communication — unlike HTTP where only the client can initiate. Perfect for chat, live dashboards, collaborative editing, game servers.

Simple Echo WebSocket#

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()        # complete the handshake
    try:
        while True:
            data = await websocket.receive_text()      # wait for message
            await websocket.send_text(f"Echo: {data}") # send back
    except WebSocketDisconnect:
        print("Client disconnected")

Real-Time Chat Room#

from typing import List

class ConnectionManager:
    def __init__(self):
        self.active: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)

    async def broadcast(self, message: str):
        for ws in self.active:
            await ws.send_text(message)

manager = ConnectionManager()

@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):
    await manager.connect(websocket)
    await manager.broadcast(f"🟢 {username} joined the chat")
    try:
        while True:
            msg = await websocket.receive_text()
            await manager.broadcast(f"{username}: {msg}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"🔴 {username} left the chat")

Test with a simple HTML client:

<!-- test-chat.html -->
<script>
const ws = new WebSocket("ws://localhost:8000/chat/Arjun");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello everyone!");
</script>

Dependency Injection#

Dependency Injection (DI) is FastAPI’s way to share logic, resources, and validation across routes without repeating code. The Depends() function is the key.

Database Connection (most common use case)#

from fastapi import Depends
import sqlite3

def get_db():
    """Open a DB connection, yield it, then close it"""
    conn = sqlite3.connect("app.db")
    try:
        yield conn           # ← routes use this
    finally:
        conn.close()         # ← always runs after the route

@app.get("/users")
def list_users(db: sqlite3.Connection = Depends(get_db)):
    cursor = db.execute("SELECT id, email FROM users")
    return [{"id": r[0], "email": r[1]} for r in cursor.fetchall()]

@app.post("/users")
def create_user(email: str, db: sqlite3.Connection = Depends(get_db)):
    db.execute("INSERT INTO users (email) VALUES (?)", [email])
    db.commit()
    return {"message": "Created"}

Layered Dependencies#

Dependencies can depend on other dependencies:

from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def get_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    return credentials.credentials

def get_current_user(token: str = Depends(get_token)) -> dict:
    """Builds on get_token"""
    user = decode_jwt(token)   # from google-oauth topic
    return user

def get_admin_user(user: dict = Depends(get_current_user)) -> dict:
    """Builds on get_current_user"""
    if user.get("role") != "admin":
        raise HTTPException(403, "Admin access required")
    return user

# Each route only asks for what it needs
@app.get("/dashboard")
def dashboard(user: dict = Depends(get_current_user)):
    return {"welcome": user["email"]}

@app.delete("/users/{id}")
def delete_user(id: int, admin: dict = Depends(get_admin_user)):
    return {"deleted": id}

Caching Dependencies (singleton pattern)#

from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    db_url: str
    api_key: str
    class Config:
        env_file = ".env"

@lru_cache()          # only runs once, result cached for app lifetime
def get_settings() -> Settings:
    return Settings()

@app.get("/config")
def show_config(settings: Settings = Depends(get_settings)):
    return {"db_url": settings.db_url}  # api_key hidden

Lifespan Events (Startup & Shutdown)#

Use lifespan to set up resources when the app starts and clean them up when it shuts down: database connection pools, loading ML models, connecting to Redis.

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg   # async PostgreSQL driver

# Global state shared across requests
db_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ─── STARTUP ───
    global db_pool
    print("🚀 Starting up — connecting to database...")
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    )
    print("✅ Database pool ready")

    yield    # ← app runs here

    # ─── SHUTDOWN ───
    print("🛑 Shutting down — closing database pool...")
    await db_pool.close()
    print("✅ Cleaned up")

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def get_users():
    async with db_pool.acquire() as conn:
        rows = await conn.fetch("SELECT id, email FROM users")
        return [dict(r) for r in rows]

Loading an ML Model at Startup#

from contextlib import asynccontextmanager
import joblib

ml_model = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global ml_model
    print("Loading model... (this might take a moment)")
    ml_model = joblib.load("model.pkl")   # blocking but only once
    print("Model loaded!")
    yield
    ml_model = None   # cleanup

app = FastAPI(lifespan=lifespan)

@app.post("/predict")
def predict(features: list[float]):
    prediction = ml_model.predict([features])
    return {"prediction": prediction[0]}

Putting It All Together#

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, BackgroundTasks, WebSocket
import asyncpg, logging

logger = logging.getLogger(__name__)
db_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_pool
    db_pool = await asyncpg.create_pool("postgresql://...")
    yield
    await db_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with db_pool.acquire() as conn:
        yield conn

def notify_admin(event: str):
    logger.info(f"Admin notification: {event}")

@app.post("/orders")
async def create_order(
    item: str,
    background_tasks: BackgroundTasks,
    db = Depends(get_db),
):
    await db.execute("INSERT INTO orders (item) VALUES ($1)", item)
    background_tasks.add_task(notify_admin, f"New order: {item}")
    return {"status": "created"}

Summary#

PatternProblem It Solves
Background TasksDon’t make users wait for slow side effects
WebSocketsReal-time bidirectional communication
Dependency InjectionShare DB connections, auth, config without repetition
Lifespan EventsReliable startup/shutdown for resources