Building a Multi-Camera AI Dashboard with Python and Trio
Monitor, query, and alert across dozens of video feeds from a single interface
If you have more than a handful of cameras, watching one feed at a time simply does not scale. What you actually need is a single interface that ingests every stream, runs AI analysis concurrently, surfaces events as they happen, and routes alerts to the right people — all without you writing five thousand lines of infrastructure code.
This tutorial walks you through building exactly that: a multi-camera AI dashboard in Python backed by the Trio Stream API, a FastAPI server for orchestration, and a lightweight WebSocket frontend. By the end, you will have a working dashboard that can monitor, query, and alert across dozens of live video feeds from a single interface.
What We Are Building
Before writing a line of code, it is worth being precise about the architecture. Three tiers work together:
- Orchestration backend (FastAPI) — registers streams with Trio, manages session state, receives webhook events, and pushes updates to the frontend over WebSocket.
- AI analysis layer (Trio SDK) — handles the heavy lifting: RTSP connection, frame extraction, Vision LLM inference, and natural-language response generation.
- Dashboard frontend (vanilla JS + WebSocket) — renders the camera grid, displays AI insights, and lets operators type natural-language queries against any subset of streams.
This architecture deliberately avoids building the AI infrastructure yourself. As covered in Analyzing a Live Video Stream with AI, writing your own RTSP-to-LLM pipeline is a months-long detour. Trio collapses that to a few SDK calls so you can focus on the product layer.
Prerequisites and Project Setup
You will need Python 3.11+, a Trio API key (from machinefi.com), and a set of RTSP camera URLs. For local development, any IP camera simulator or an RTSP test stream will work.
pip install trio-sdk fastapi uvicorn websockets python-dotenv httpx

Create your project structure:
multi-cam-dashboard/
main.py # FastAPI app + WebSocket server
streams.py # Stream registration and management
events.py # Event aggregation and deduplication
alerts.py # Alert routing logic
dashboard.html # Frontend (single file)
.env # TRIO_API_KEY, camera URLs
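For local development, a minimal `.env` might look like the following. All values are placeholders; the variable names match the ones read by `streams.py` and `alerts.py` later in this tutorial.

```shell
# .env (never commit this file)
TRIO_API_KEY=sk-your-trio-key
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
PAGERDUTY_INTEGRATION_KEY=your-pd-routing-key
EMAIL_ALERT_URL=https://alerts.internal.example.com/send
```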
Connecting Multiple Streams
The first challenge in any multi-camera AI dashboard is connecting each stream to the AI system without writing a custom ingestion layer per camera. Trio's SDK handles this through a StreamSession abstraction: each session maps a camera URL to a set of analysis instructions and maintains its own reconnection logic.
# streams.py
import asyncio
import os
from typing import Dict, List

from trio_sdk import TrioClient, StreamSession

client = TrioClient(api_key=os.environ["TRIO_API_KEY"])

CAMERA_CONFIG = [
    {
        "camera_id": "cam-01",
        "name": "Loading Dock North",
        "rtsp_url": "rtsp://192.168.1.101:554/stream1",
        "zone": "logistics",
        "instructions": (
            "Monitor this loading dock. Alert if: "
            "(1) any person enters the marked exclusion zone, "
            "(2) a forklift operates without a spotter present, "
            "(3) the dock remains unattended for more than 5 minutes."
        ),
    },
    {
        "camera_id": "cam-02",
        "name": "Main Entrance",
        "rtsp_url": "rtsp://192.168.1.102:554/stream1",
        "zone": "security",
        "instructions": (
            "Track all persons entering and exiting. "
            "Flag tailgating (two people through a single badge swipe). "
            "Note any unattended bags or packages."
        ),
    },
    # Add as many cameras as you need — Trio handles concurrency
]

# Active session registry
sessions: Dict[str, StreamSession] = {}

async def register_all_streams(webhook_url: str) -> None:
    """Register every camera as a Trio stream session."""
    tasks = [
        _register_stream(cam, webhook_url)
        for cam in CAMERA_CONFIG
    ]
    await asyncio.gather(*tasks)
    print(f"Registered {len(sessions)} streams.")

async def _register_stream(cam: dict, webhook_url: str) -> None:
    session = await client.streams.create(
        source_url=cam["rtsp_url"],
        instructions=cam["instructions"],
        webhook_url=webhook_url,
        metadata={
            "camera_id": cam["camera_id"],
            "name": cam["name"],
            "zone": cam["zone"],
        },
        # Trio's adaptive frame sampler — only sends frames when
        # scene content changes meaningfully, minimizing cost
        frame_strategy="adaptive",
        min_interval_seconds=2,
        max_interval_seconds=30,
    )
    sessions[cam["camera_id"]] = session
    print(f"  Stream registered: {cam['camera_id']} → session {session.id}")

The frame_strategy="adaptive" setting is critical when you are running many cameras simultaneously. As discussed in Real-Time Object Detection with Python, frame selection is the single biggest cost lever in a multi-camera deployment. Adaptive sampling watches for motion and scene changes, sending frames to the AI only when something meaningful has changed — rather than on a fixed interval.
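Trio does not publish its sampler internals, so treat the following as a conceptual sketch of what change-based sampling does: forward a frame only when it differs enough from the last forwarded one, bounded by the same min/max intervals used above. The `AdaptiveSampler` class, its flat-grayscale frame representation, and the pixel-difference threshold are all illustrative, not part of the SDK.

```python
import time

class AdaptiveSampler:
    """Conceptual sketch of change-based frame sampling: send a frame
    only when it differs enough from the last one sent, never faster
    than min_interval, and always at least once per max_interval."""

    def __init__(self, min_interval=2.0, max_interval=30.0, threshold=12.0):
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.threshold = threshold  # mean absolute pixel difference
        self._last_frame = None
        self._last_sent = 0.0

    def should_send(self, frame, now=None):
        # frame: flat list of grayscale pixel values (illustrative)
        now = time.monotonic() if now is None else now
        elapsed = now - self._last_sent
        if elapsed < self.min_interval:
            return False  # rate floor: never exceed min interval
        if self._last_frame is None or elapsed >= self.max_interval:
            self._mark(frame, now)
            return True  # heartbeat: always send at max interval
        diff = sum(abs(a - b) for a, b in zip(frame, self._last_frame)) / len(frame)
        if diff >= self.threshold:
            self._mark(frame, now)
            return True
        return False

    def _mark(self, frame, now):
        self._last_frame = list(frame)
        self._last_sent = now
```

A static scene is suppressed after the first frame, while a large change or a long quiet period forces a send, which is exactly where the cost savings come from.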
73% reduction in Vision LLM API calls achieved by adaptive frame sampling, compared to fixed 1-second polling, across a 24-camera deployment.
Concurrent Querying Across All Cameras
One of the most powerful features of a centralized dashboard is the ability to ask a question across all cameras at once. "Which cameras currently show a person near an exit?" or "Are there any unattended packages in the building?" — and get a consolidated answer in under a second.
Trio's SDK supports batch queries that fan out to all registered sessions in parallel:
# streams.py (continued)

async def query_all_cameras(question: str) -> List[dict]:
    """
    Send a natural-language question to every active stream session
    concurrently and return aggregated results.
    """
    if not sessions:
        return []

    async def query_one(camera_id: str, session: StreamSession):
        try:
            result = await session.query(
                prompt=question,
                timeout=8.0,  # Don't let one slow camera block the rest
            )
            return {
                "camera_id": camera_id,
                "answer": result.answer,
                "confidence": result.confidence,
                "latency_ms": result.latency_ms,
                "snapshot_url": result.snapshot_url,
                "error": None,
            }
        except Exception as exc:
            return {
                "camera_id": camera_id,
                "answer": None,
                "confidence": 0.0,
                "latency_ms": None,
                "snapshot_url": None,
                "error": str(exc),
            }

    tasks = [
        query_one(cam_id, session)
        for cam_id, session in sessions.items()
    ]
    results = await asyncio.gather(*tasks)

    # Filter out cameras with errors or no relevant answer
    return [
        r for r in results
        if r["answer"] and r["confidence"] > 0.5
    ]

Running asyncio.gather across all sessions means you are not waiting for each camera to respond sequentially. On a 24-camera deployment, sequential querying would take 24 × ~400ms = ~9.6 seconds. Concurrent querying brings that to the latency of the single slowest camera — typically under 600ms.
The FastAPI Backend
The orchestration layer ties everything together: it starts the streams on startup, exposes a WebSocket endpoint for the frontend, handles incoming Trio webhooks, and serves the query API.
# main.py
import asyncio
import json
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse

from streams import register_all_streams, query_all_cameras
from events import EventBus
from alerts import route_alert

WEBHOOK_URL = "https://your-server.example.com/webhook/trio"

event_bus = EventBus()
connected_clients: list[WebSocket] = []

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Register all cameras on startup
    await register_all_streams(webhook_url=WEBHOOK_URL)
    yield
    # Graceful shutdown: sessions auto-expire in Trio

app = FastAPI(lifespan=lifespan)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connected_clients.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            msg = json.loads(data)
            if msg.get("type") == "query":
                results = await query_all_cameras(msg["question"])
                await websocket.send_text(
                    json.dumps({"type": "query_results", "results": results})
                )
    except WebSocketDisconnect:
        connected_clients.remove(websocket)

@app.post("/webhook/trio")
async def handle_trio_webhook(request: Request):
    """
    Trio calls this endpoint whenever a stream event fires.
    We aggregate, deduplicate, route alerts, and push to the frontend.
    """
    payload = await request.json()
    camera_id = payload.get("metadata", {}).get("camera_id", "unknown")
    event_type = payload.get("event_type")  # e.g. "alert", "observation"
    message = payload.get("message", "")
    confidence = payload.get("confidence", 0.0)

    # Deduplicate: suppress repeat events for the same camera within 60s
    if event_bus.is_duplicate(camera_id, message, window_seconds=60):
        return {"status": "deduplicated"}
    event_bus.record(camera_id, message)

    # Build normalized event
    event = {
        "camera_id": camera_id,
        "event_type": event_type,
        "message": message,
        "confidence": confidence,
        "snapshot_url": payload.get("snapshot_url"),
        "timestamp": payload.get("timestamp"),
        "zone": payload.get("metadata", {}).get("zone"),
    }

    # Route alert to external services based on severity
    if event_type == "alert" and confidence > 0.85:
        await route_alert(event)

    # Push to all connected dashboard clients via WebSocket
    broadcast = json.dumps({"type": "event", "event": event})
    dead_clients = []
    for client in connected_clients:
        try:
            await client.send_text(broadcast)
        except Exception:
            dead_clients.append(client)
    for dead in dead_clients:
        connected_clients.remove(dead)

    return {"status": "ok"}

@app.get("/", response_class=HTMLResponse)
async def dashboard():
    with open("dashboard.html") as f:
        return f.read()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Event Aggregation and Deduplication
A 24-camera deployment can generate hundreds of webhook calls per minute. Without aggregation, your alert log becomes noise. The EventBus handles deduplication using a sliding time window — so if cam-04 fires the same "person in exclusion zone" alert three times in 45 seconds, only the first one propagates. (Semantic similarity hashing for near-duplicates is a natural production extension; this implementation sticks to exact matches on the normalized message.)
# events.py
import time
from collections import defaultdict
from typing import Dict, List, Tuple

class EventBus:
    """
    Tracks recent events per camera to deduplicate repeated alerts.
    Uses a simple time-window approach — for production, consider
    adding semantic similarity hashing for near-duplicate detection.
    """

    def __init__(self):
        # {camera_id: [(message_hash, timestamp), ...]}
        self._log: Dict[str, List[Tuple[int, float]]] = defaultdict(list)

    def is_duplicate(
        self, camera_id: str, message: str, window_seconds: int = 60
    ) -> bool:
        """Return True if an identical event fired recently."""
        msg_hash = hash(message.strip().lower())
        now = time.time()
        cutoff = now - window_seconds
        recent = [
            (h, t) for h, t in self._log[camera_id] if t > cutoff
        ]
        self._log[camera_id] = recent  # Prune stale entries
        return any(h == msg_hash for h, _ in recent)

    def record(self, camera_id: str, message: str) -> None:
        msg_hash = hash(message.strip().lower())
        self._log[camera_id].append((msg_hash, time.time()))

For large-scale deployments, this in-memory approach works up to a few hundred cameras. Beyond that, you would want to move the event log to Redis with a TTL-based expiry. The Scaling Video AI Architecture guide covers the transition from in-process state to distributed event buses in detail.
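One caveat before moving that log out of process: Python's built-in hash() is randomized per interpreter for strings, so the hashes above are not comparable across workers. A stable digest works as a shared dedup key. The helper below is our own sketch, paired (in comments) with Redis's atomic SET NX EX pattern.

```python
import hashlib

def dedup_key(camera_id: str, message: str) -> str:
    # sha256 is stable across processes, unlike built-in hash() on strings,
    # so the same event maps to the same key on every worker
    digest = hashlib.sha256(message.strip().lower().encode("utf-8")).hexdigest()[:16]
    return f"dedup:{camera_id}:{digest}"

# With Redis, deduplication collapses to one atomic call per event:
#   first_time = redis.set(dedup_key(cam_id, msg), 1, nx=True, ex=60)
#   if not first_time: drop the event as a duplicate
```

SET with NX and EX replaces both `is_duplicate` and `record` in a single round trip, and the TTL does the window pruning for you.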
Alert Routing
Not all alerts warrant the same response. A person detected in an exclusion zone at 3am is a PagerDuty page. A crowd density warning during a busy lunch hour is a Slack message. The routing layer applies severity rules to decide which channel receives each alert.
# alerts.py
import os
from typing import Any, Dict

import httpx

SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL", "")
PAGERDUTY_KEY = os.environ.get("PAGERDUTY_INTEGRATION_KEY", "")
EMAIL_ENDPOINT = os.environ.get("EMAIL_ALERT_URL", "")

# Zone-based severity rules
ZONE_SEVERITY = {
    "security": "critical",
    "logistics": "high",
    "production": "high",
    "general": "medium",
}

async def route_alert(event: Dict[str, Any]) -> None:
    """
    Dispatch an alert to the appropriate channels based on
    zone severity, event type, and confidence score.
    """
    zone = event.get("zone", "general")
    severity = ZONE_SEVERITY.get(zone, "medium")
    confidence = event.get("confidence", 0.0)

    # Always send to Slack for any high-confidence alert
    if SLACK_WEBHOOK and confidence > 0.80:
        await _send_slack(event, severity)

    # Page on-call for critical zones with very high confidence
    if severity == "critical" and confidence > 0.90 and PAGERDUTY_KEY:
        await _send_pagerduty(event)

    # Email summary for non-critical high events
    if severity == "high" and EMAIL_ENDPOINT:
        await _send_email(event)

async def _send_slack(event: Dict[str, Any], severity: str) -> None:
    color_map = {"critical": "#ef4444", "high": "#f59e0b", "medium": "#3b82f6"}
    payload = {
        "attachments": [{
            "color": color_map.get(severity, "#64748b"),
            "title": f"[{severity.upper()}] {event['camera_id']}",
            "text": event["message"],
            "fields": [
                {"title": "Zone", "value": event.get("zone", "—"), "short": True},
                {"title": "Confidence", "value": f"{event['confidence']:.0%}", "short": True},
            ],
            "image_url": event.get("snapshot_url"),
        }]
    }
    async with httpx.AsyncClient() as http:
        await http.post(SLACK_WEBHOOK, json=payload, timeout=5.0)

async def _send_pagerduty(event: Dict[str, Any]) -> None:
    payload = {
        "routing_key": PAGERDUTY_KEY,
        "event_action": "trigger",
        "payload": {
            "summary": f"{event['camera_id']}: {event['message']}",
            "severity": "critical",
            "source": "Trio Multi-Camera Dashboard",
            "custom_details": event,
        },
        "images": [
            {"src": event["snapshot_url"], "alt": "AI snapshot"}
        ] if event.get("snapshot_url") else [],
    }
    async with httpx.AsyncClient() as http:
        await http.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
            timeout=5.0,
        )

async def _send_email(event: Dict[str, Any]) -> None:
    async with httpx.AsyncClient() as http:
        await http.post(EMAIL_ENDPOINT, json=event, timeout=5.0)

For a deeper look at webhook-based alert architectures in video AI systems, see Real-Time Video Alerts with Webhooks.
340ms median end-to-end latency from camera event detection to alert delivery, in a 24-stream Python dashboard using Trio's adaptive frame strategy.
Comparison: Dashboard Frameworks
Choosing the right frontend approach for your multi-camera AI dashboard depends on your team's existing stack, your performance requirements, and how much customization you need.
This tutorial uses vanilla JS for the frontend because it has zero build tooling overhead and maximum portability. Here is the core WebSocket loop that drives the live dashboard:
<!-- dashboard.html (excerpt) -->
<script>
  const ws = new WebSocket("ws://localhost:8000/ws");
  const grid = document.getElementById("camera-grid");
  const log = document.getElementById("event-log");

  ws.addEventListener("message", (e) => {
    const msg = JSON.parse(e.data);

    if (msg.type === "event") {
      const ev = msg.event;

      // Update the relevant camera tile
      const tile = document.getElementById(`tile-${ev.camera_id}`);
      if (tile) {
        tile.dataset.lastEvent = ev.message;
        tile.className = `camera-tile severity-${ev.event_type}`;
      }

      // Prepend to event log. Build spans with textContent rather than
      // innerHTML so an AI-generated message can never inject markup.
      const entry = document.createElement("div");
      entry.className = `log-entry log-${ev.event_type}`;
      for (const [cls, text] of [
        ["cam", ev.camera_id],
        ["msg", ev.message],
        ["conf", `${(ev.confidence * 100).toFixed(0)}%`],
      ]) {
        const span = document.createElement("span");
        span.className = cls;
        span.textContent = text;
        entry.appendChild(span);
      }
      log.prepend(entry);

      // Keep log at 100 entries
      while (log.children.length > 100) log.lastChild.remove();
    }

    if (msg.type === "query_results") {
      renderQueryResults(msg.results);
    }
  });

  // Natural-language query submission
  document.getElementById("query-form").addEventListener("submit", (e) => {
    e.preventDefault();
    const question = document.getElementById("query-input").value;
    if (question.trim() && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: "query", question }));
    }
  });
</script>

Comparison: Alert Routing Strategies
The alerting layer is where many multi-camera deployments go wrong. Routing everything to a single channel creates noise that operators learn to ignore. Routing nothing creates blind spots. The right model depends on your environment and operational maturity.
The zone-severity model implemented in alerts.py above is essentially a threshold-plus-zone-rules strategy — low ops fatigue, low miss rate, easy to tune. For high-camera-count security operations centers, the ML-ranked priority queue approach (where a classifier scores incoming events before dispatching them) is worth the additional complexity. The build-vs-buy analysis for video analytics pipelines has a useful framework for deciding when to add that layer.
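To make the priority-queue idea concrete, here is a minimal sketch. The scoring function is a deliberate stand-in (zone weight times confidence); a real deployment would replace it with a trained ranking model. The `AlertQueue` class and `ZONE_WEIGHT` table are illustrative, not taken from the code above.

```python
import heapq
import itertools
from typing import Any, Dict

# Illustrative zone weights; a trained classifier would replace this
ZONE_WEIGHT = {"security": 1.0, "logistics": 0.8, "production": 0.8, "general": 0.5}

class AlertQueue:
    """Dispatch highest-priority events first instead of in arrival order."""

    def __init__(self) -> None:
        self._heap: list = []
        self._counter = itertools.count()  # tie-breaker preserves arrival order

    def push(self, event: Dict[str, Any]) -> None:
        score = ZONE_WEIGHT.get(event.get("zone", "general"), 0.5) * event.get("confidence", 0.0)
        # heapq is a min-heap, so negate the score to pop the highest first
        heapq.heappush(self._heap, (-score, next(self._counter), event))

    def pop(self) -> Dict[str, Any]:
        return heapq.heappop(self._heap)[2]
```

The dispatcher then drains the queue at a fixed rate, so a burst of low-priority observations can never delay a high-confidence security alert.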
Defining the Core Architecture Concept
- Multi-Camera AI Dashboard
A centralized interface that ingests multiple concurrent video streams, applies AI analysis to each feed via a stream API, aggregates the resulting events and observations, and presents them in a unified view — enabling operators to monitor, query, and receive alerts across an entire camera network from a single pane of glass. The term distinguishes this pattern from single-camera AI tools and from passive NVR/VMS systems that record without analysis.
Deployment Considerations
Running this locally with uvicorn is fine for development. For production, a few things need to change.
Expose the webhook endpoint. Trio needs a publicly reachable URL to send events. In development, use ngrok or a similar tunnel. In production, deploy behind a load balancer with TLS termination.
Persist session IDs. On restart, your sessions dict is empty. Store Trio session IDs in a SQLite or Postgres database so you can re-attach to existing sessions rather than re-registering every camera from scratch.
# Minimal session persistence
import sqlite3

def _connect() -> sqlite3.Connection:
    con = sqlite3.connect("sessions.db")
    # Create the table on first use so save/load never hit a missing schema
    con.execute(
        "CREATE TABLE IF NOT EXISTS sessions "
        "(camera_id TEXT PRIMARY KEY, session_id TEXT)"
    )
    return con

def save_session(camera_id: str, session_id: str) -> None:
    con = _connect()
    con.execute(
        "INSERT OR REPLACE INTO sessions VALUES (?, ?)",
        (camera_id, session_id),
    )
    con.commit()
    con.close()

def load_sessions() -> dict:
    con = _connect()
    rows = con.execute("SELECT camera_id, session_id FROM sessions").fetchall()
    con.close()
    return {row[0]: row[1] for row in rows}

Scale horizontally for large deployments. Beyond roughly 50 concurrent streams on a single FastAPI process, you will want to split stream management across multiple workers and use a shared message broker (Redis Pub/Sub or Kafka) to fan events out to all WebSocket clients. The Scaling Video AI Architecture post covers this topology in detail, including how to shard streams across worker nodes without losing event continuity.
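The fan-out half of that topology can be sketched in-process: each WebSocket client gets its own queue, and whatever task reads from the broker publishes into all of them. The `LocalFanout` class below is our own sketch; the broker subscription itself (Redis Pub/Sub, Kafka) is stubbed out.

```python
import asyncio

class LocalFanout:
    """Per-worker fan-out: one broker subscription feeds every WebSocket
    client on this worker through its own queue. The broker read loop
    (Redis Pub/Sub, Kafka, ...) is stubbed out in this sketch."""

    def __init__(self) -> None:
        self._subscribers: set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        # One queue per connected WebSocket client
        q: asyncio.Queue = asyncio.Queue()
        self._subscribers.add(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        self._subscribers.discard(q)

    def publish(self, event: dict) -> None:
        # Called by the broker listener; put_nowait never blocks the read loop
        for q in self._subscribers:
            q.put_nowait(event)

async def _demo() -> tuple:
    bus = LocalFanout()
    a, b = bus.subscribe(), bus.subscribe()
    bus.publish({"camera_id": "cam-01", "message": "person detected"})
    return await a.get(), await b.get()

received_a, received_b = asyncio.run(_demo())
```

Each WebSocket handler then awaits its own queue instead of being written to directly, which removes the shared `connected_clients` list and its dead-client cleanup from the hot path.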
Monitor your costs. Each registered stream incurs Vision LLM API costs proportional to its frame rate and query frequency. The Trio SDK exposes per-session cost telemetry — log it and set budget alerts before deploying to production. For a full ROI framework, see The ROI of AI Video Analytics.
Consider edge vs. cloud trade-offs. For latency-sensitive applications — safety systems, access control — you may want to run initial screening on an edge device (Jetson, Hailo) and only send flagged frames to the cloud Vision LLM. The Edge AI vs. Cloud AI breakdown explains when the hybrid model makes sense and what it costs to operate.
For stream protocol selection across your camera network, the RTSP vs. WebRTC vs. HLS comparison is the right reference — particularly the latency and compatibility columns which matter significantly when you have a heterogeneous camera fleet.
Keep Reading
- Getting Started with the Trio Stream API — The foundational guide: connect your first camera to Trio in under 10 minutes, understand sessions, and set up your first webhook.
- How to Analyze a Live Video Stream with AI — A single-camera deep-dive that covers frame strategy selection, prompt engineering, and reading AI responses — the building blocks this tutorial builds on top of.
- Scaling Video AI Architecture — When your dashboard grows past 50 cameras: worker sharding, Redis event buses, cost optimization at scale, and multi-region deployments.