id: fastapi-advanced
FastAPI Advanced#
Four powerful patterns that take FastAPI from a simple REST server to a production-grade application platform: Background Tasks, WebSockets, Dependency Injection, and Lifespan Events.
Background Tasks#
Background tasks run after the HTTP response is sent. Use them for things the user doesn’t need to wait for: sending emails, writing to a database, calling external APIs, generating reports.
from fastapi import FastAPI, BackgroundTasks
import time, logging
app = FastAPI()
logger = logging.getLogger(__name__)
def send_email(to: str, subject: str, body: str):
"""This runs after the response is sent"""
time.sleep(2) # simulate slow email sending
logger.info(f"Email sent to {to}: {subject}")
def write_audit_log(user_id: int, action: str):
"""Log to a slow external system"""
time.sleep(0.5)
logger.info(f"Audit: user {user_id} did {action}")
@app.post("/register")
async def register_user(email: str, background_tasks: BackgroundTasks):
# User registration logic here
user_id = 42
# Schedule background tasks — they run after this function returns
background_tasks.add_task(send_email, email, "Welcome!", "Thanks for signing up.")
background_tasks.add_task(write_audit_log, user_id, "register")
# This response is sent immediately — user doesn't wait 2.5 seconds
return {"message": "Registered! Check your email.", "user_id": user_id}?> When to use Background Tasks vs Celery/Redis Queue? ?> - Background Tasks: Fast jobs (<30 seconds), no retry needed, simple ?> - Celery + Redis: Long jobs, need retries, need progress tracking, distributed workers
WebSockets#
WebSockets provide bi-directional, real-time communication — unlike HTTP where only the client can initiate. Perfect for chat, live dashboards, collaborative editing, game servers.
Simple Echo WebSocket#
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # complete the handshake
try:
while True:
data = await websocket.receive_text() # wait for message
await websocket.send_text(f"Echo: {data}") # send back
except WebSocketDisconnect:
print("Client disconnected")Real-Time Chat Room#
from typing import List
class ConnectionManager:
def __init__(self):
self.active: List[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, message: str):
for ws in self.active:
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/chat/{username}")
async def chat(websocket: WebSocket, username: str):
await manager.connect(websocket)
await manager.broadcast(f"🟢 {username} joined the chat")
try:
while True:
msg = await websocket.receive_text()
await manager.broadcast(f"{username}: {msg}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"🔴 {username} left the chat")Test with a simple HTML client:
<!-- test-chat.html -->
<script>
const ws = new WebSocket("ws://localhost:8000/chat/Arjun");
ws.onmessage = (e) => console.log(e.data);
ws.send("Hello everyone!");
</script>Dependency Injection#
Dependency Injection (DI) is FastAPI’s way to share logic, resources, and validation across routes without repeating code. The Depends() function is the key.
Database Connection (most common use case)#
from fastapi import Depends
import sqlite3
def get_db():
"""Open a DB connection, yield it, then close it"""
conn = sqlite3.connect("app.db")
try:
yield conn # ← routes use this
finally:
conn.close() # ← always runs after the route
@app.get("/users")
def list_users(db: sqlite3.Connection = Depends(get_db)):
cursor = db.execute("SELECT id, email FROM users")
return [{"id": r[0], "email": r[1]} for r in cursor.fetchall()]
@app.post("/users")
def create_user(email: str, db: sqlite3.Connection = Depends(get_db)):
db.execute("INSERT INTO users (email) VALUES (?)", [email])
db.commit()
return {"message": "Created"}Layered Dependencies#
Dependencies can depend on other dependencies:
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
def get_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
return credentials.credentials
def get_current_user(token: str = Depends(get_token)) -> dict:
"""Builds on get_token"""
user = decode_jwt(token) # from google-oauth topic
return user
def get_admin_user(user: dict = Depends(get_current_user)) -> dict:
"""Builds on get_current_user"""
if user.get("role") != "admin":
raise HTTPException(403, "Admin access required")
return user
# Each route only asks for what it needs
@app.get("/dashboard")
def dashboard(user: dict = Depends(get_current_user)):
return {"welcome": user["email"]}
@app.delete("/users/{id}")
def delete_user(id: int, admin: dict = Depends(get_admin_user)):
return {"deleted": id}Caching Dependencies (singleton pattern)#
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
db_url: str
api_key: str
class Config:
env_file = ".env"
@lru_cache() # only runs once, result cached for app lifetime
def get_settings() -> Settings:
return Settings()
@app.get("/config")
def show_config(settings: Settings = Depends(get_settings)):
return {"db_url": settings.db_url} # api_key hiddenLifespan Events (Startup & Shutdown)#
Use lifespan to set up resources when the app starts and clean them up when it shuts down: database connection pools, loading ML models, connecting to Redis.
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg # async PostgreSQL driver
# Global state shared across requests
db_pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# ─── STARTUP ───
global db_pool
print("🚀 Starting up — connecting to database...")
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb",
min_size=5,
max_size=20,
)
print("✅ Database pool ready")
yield # ← app runs here
# ─── SHUTDOWN ───
print("🛑 Shutting down — closing database pool...")
await db_pool.close()
print("✅ Cleaned up")
app = FastAPI(lifespan=lifespan)
@app.get("/users")
async def get_users():
async with db_pool.acquire() as conn:
rows = await conn.fetch("SELECT id, email FROM users")
return [dict(r) for r in rows]Loading an ML Model at Startup#
from contextlib import asynccontextmanager
import joblib
ml_model = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global ml_model
print("Loading model... (this might take a moment)")
ml_model = joblib.load("model.pkl") # blocking but only once
print("Model loaded!")
yield
ml_model = None # cleanup
app = FastAPI(lifespan=lifespan)
@app.post("/predict")
def predict(features: list[float]):
prediction = ml_model.predict([features])
return {"prediction": prediction[0]}Putting It All Together#
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, BackgroundTasks, WebSocket
import asyncpg, logging
logger = logging.getLogger(__name__)
db_pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global db_pool
db_pool = await asyncpg.create_pool("postgresql://...")
yield
await db_pool.close()
app = FastAPI(lifespan=lifespan)
async def get_db():
async with db_pool.acquire() as conn:
yield conn
def notify_admin(event: str):
logger.info(f"Admin notification: {event}")
@app.post("/orders")
async def create_order(
item: str,
background_tasks: BackgroundTasks,
db = Depends(get_db),
):
await db.execute("INSERT INTO orders (item) VALUES ($1)", item)
background_tasks.add_task(notify_admin, f"New order: {item}")
return {"status": "created"}Summary#
| Pattern | Problem It Solves |
|---|---|
| Background Tasks | Don’t make users wait for slow side effects |
| WebSockets | Real-time bidirectional communication |
| Dependency Injection | Share DB connections, auth, config without repetition |
| Lifespan Events | Reliable startup/shutdown for resources |