How to Set Up Real-Time Video Alerts with Webhooks and Trio
From camera event to Slack notification in under 500ms — a step-by-step integration guide
Most teams wire up a camera, run inference, and then wonder: "Now what?" The AI fires a detection event — a person in a restricted zone, a forklift in a pedestrian corridor, a piece of equipment left unattended — and that insight evaporates into a log file nobody reads.
Webhooks change that. Instead of polling an endpoint or tailing logs, your application receives a signed HTTP push the moment an event fires. No open connections. No wasted inference cycles. Just a precise, low-latency nudge that travels from a camera frame to a Slack notification in under 500 milliseconds.
This tutorial walks through the complete integration: subscribing to Trio events, setting confidence thresholds and cooldown windows, routing alerts to Slack, PagerDuty, and email, adding retry logic, and monitoring the pipeline in production. If you haven't set up your first stream yet, start with the Trio Stream API getting started guide; this tutorial picks up from a working stream connection.
What Is a Webhook in the Context of Video AI?
Video AI Webhook: A webhook is an HTTP callback that a server (in this case, Trio's inference infrastructure) sends to your application endpoint the moment a specified condition is detected in a video stream. Unlike polling, where your code repeatedly asks "did anything happen?", a webhook inverts the flow: Trio calls you when something happens. This makes webhooks the correct primitive for low-latency, event-driven video alert pipelines.
In a traditional polling architecture you might call stream.ask() every five seconds and check the result. At 720 calls per hour per camera, that burns API quota fast, adds artificial latency that averages half your polling interval (2.5 seconds here), and still misses events that start and end between polls. With a webhook subscription, Trio watches the stream continuously and POSTs to your endpoint within milliseconds of a condition being met.
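The polling arithmetic above can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: `polling_cost` is a hypothetical helper, and it assumes events arrive at a uniformly random moment between two polls.

```python
def polling_cost(interval_s: float) -> dict:
    """Return API calls per hour and mean detection delay for a fixed polling interval."""
    calls_per_hour = 3600 / interval_s
    # An event lands at a uniformly random point between two polls (assumption),
    # so on average it waits half an interval before the next poll sees it.
    mean_delay_s = interval_s / 2
    return {"calls_per_hour": calls_per_hour, "mean_delay_s": mean_delay_s}


# Compare a few polling intervals (in seconds)
for interval in (1, 5, 30):
    print(interval, polling_cost(interval))
```

Shortening the interval cuts the delay but multiplies the call count, which is the tradeoff a webhook subscription avoids.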
73% reduction in mean time to alert compared to 5-second polling, achieved by switching to webhook-based event subscriptions in Trio production deployments.
Polling vs. Webhooks vs. Server-Sent Events
Before writing a line of code, it is worth understanding where webhooks fit relative to the two other common real-time notification patterns: polling and Server-Sent Events (SSE).
SSE is excellent when you are rendering a live dashboard in a browser — the connection stays open and events stream in. Webhooks win when you need to fan out to multiple downstream services (Slack, PagerDuty, a database write) without keeping a persistent connection alive. For video alert pipelines, webhooks are almost always the right choice. The latency vs. throughput tradeoffs in real-time AI post explores this in more depth if you want to reason through the architecture before committing.
Step 1: Create the Event Subscription
With a connected stream in hand, subscribing to events is a single SDK call. Here is the minimal working version:
import os

import trio_sdk
from dotenv import load_dotenv

load_dotenv()
client = trio_sdk.Client(api_key=os.environ["TRIO_API_KEY"])

# Connect your camera stream
stream = client.streams.connect(
    url="rtsp://camera.local/live",
    label="loading-dock-east",
)

# Subscribe to a specific event condition
subscription = stream.subscribe(
    question="Alert if any person enters the restricted zone marked with yellow floor tape.",
    webhook_url="https://your-app.com/api/trio-events",
)
print(f"Subscription ID: {subscription.id}")
print(f"Status: {subscription.status}")

That is enough to start receiving webhooks. Trio validates the stream URL, registers the subscription, and begins watching. But a bare subscription with default settings will generate a lot of noise. The next two parameters are the ones that matter most in production.
Step 2: Set Confidence Thresholds and Cooldown Windows
Two parameters control the signal-to-noise ratio of your alert pipeline: confidence_threshold and cooldown_seconds.
import os

import trio_sdk
from dotenv import load_dotenv

load_dotenv()
client = trio_sdk.Client(api_key=os.environ["TRIO_API_KEY"])

stream = client.streams.connect(
    url="rtsp://camera.local/live",
    label="loading-dock-east",
)

subscription = stream.subscribe(
    question="Alert if any person enters the restricted zone marked with yellow floor tape.",
    webhook_url="https://your-app.com/api/trio-events",
    confidence_threshold=0.85,  # Only fire if model confidence >= 85%
    cooldown_seconds=60,  # Suppress duplicate alerts for 60 seconds
    metadata={"zone": "loading-dock-east", "severity": "high"},
)
print(f"Subscription active: {subscription.id}")

confidence_threshold prevents low-confidence detections (shadows, reflections, ambiguous frames) from triggering alerts. The right value depends on your use case. Security and safety applications typically use 0.80–0.90. Retail shelf monitoring can go as low as 0.65 because false negatives (missed detections) are more costly than false positives.
cooldown_seconds is a deduplication window. If a person stands in the restricted zone for three minutes, you want one alert — not one every two seconds for the duration. Set this to match your expected operator response time. Sixty seconds is a sensible default; critical safety applications often use shorter windows (15–30 seconds) to ensure re-alerting if the condition persists and the first alert was missed.
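To build intuition for how the two parameters interact, here is a small local simulation. It is a simplified model and not Trio's actual implementation: `AlertGate` and `should_fire` are hypothetical names, and the gate assumes that a detection which passes the threshold restarts the cooldown timer.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AlertGate:
    """Toy model of a confidence threshold combined with a cooldown window."""

    confidence_threshold: float = 0.85
    cooldown_seconds: float = 60.0
    _last_fired: Optional[float] = field(default=None, init=False, repr=False)

    def should_fire(self, confidence: float, ts: float) -> bool:
        # Drop detections below the confidence threshold
        if confidence < self.confidence_threshold:
            return False
        # Drop detections that fall inside the cooldown window of the last alert
        if self._last_fired is not None and ts - self._last_fired < self.cooldown_seconds:
            return False
        self._last_fired = ts
        return True


# A person stands in the zone for three minutes, detected every 2 seconds
gate = AlertGate(confidence_threshold=0.85, cooldown_seconds=60)
fired_at = [t for t in range(0, 180, 2) if gate.should_fire(0.9, t)]
print(fired_at)
```

Under this model the 90 raw detections collapse to alerts at 0, 60, and 120 seconds, so a persistent condition re-alerts once per cooldown window instead of once per frame.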
The metadata field passes arbitrary key-value pairs through to every webhook payload, which you will use in Step 4 to route alerts to the right destination.
Step 3: Receive and Verify Webhooks
Trio signs every webhook payload with an HMAC-SHA256 signature using your webhook secret. Verifying the signature before acting on the payload is not optional — it is the only thing stopping a malicious actor from spoofing events into your alert pipeline.
Here is a complete FastAPI webhook receiver with signature verification:
import asyncio
import hashlib
import hmac
import json
import os

from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request

load_dotenv()
app = FastAPI()
WEBHOOK_SECRET = os.environ["TRIO_WEBHOOK_SECRET"]


def verify_signature(payload_bytes: bytes, signature_header: str) -> bool:
    """Verify the X-Trio-Signature HMAC-SHA256 header."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload_bytes,
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison; comparing bytes avoids a TypeError on non-ASCII header values
    return hmac.compare_digest(expected.encode(), signature_header.encode())


@app.post("/api/trio-events")
async def handle_trio_event(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    signature = request.headers.get("X-Trio-Signature", "")
    if not verify_signature(payload_bytes, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    event = json.loads(payload_bytes)
    # event fields: request_id, stream_id, subscription_id, question, answer,
    # confidence, frame_ts, metadata
    print(f"[EVENT] {event['answer']} (confidence: {event['confidence']:.2f})")
    # Queue the fan-out to run after the response is sent, so Trio gets its 2xx quickly
    background_tasks.add_task(route_alert, event)
    return {"status": "ok"}


async def route_alert(event: dict):
    """Fan out alert to all configured notification channels."""
    severity = event.get("metadata", {}).get("severity", "medium")
    # Step 7 also tags events as "critical"; those should page as well
    if severity in ("high", "critical"):
        await send_pagerduty(event)
    await send_slack(event)
    # send_email is a blocking SMTP call, so run it in a worker thread
    await asyncio.to_thread(send_email, event)

Respond with HTTP 200 as quickly as possible. Do all downstream work (Slack POST, database write, PagerDuty trigger) after returning, which is why the handler hands route_alert to FastAPI's BackgroundTasks instead of awaiting it. Trio will retry delivery if it does not receive a 2xx response within five seconds.
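To exercise the receiver locally without waiting for a real detection, you can sign a sample payload yourself. The sketch below uses only the standard library. It assumes, as the receiver above does, that the header carries a bare hex digest of the raw request body; `sign_payload`, the sample field values, and the `test-secret` key are made up for illustration.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, payload_bytes: bytes) -> str:
    """Compute the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()


def verify_signature(secret: str, payload_bytes: bytes, signature_header: str) -> bool:
    """Recompute the digest and compare it in constant time."""
    expected = sign_payload(secret, payload_bytes)
    return hmac.compare_digest(expected.encode(), signature_header.encode())


# Hypothetical sample event using the field names listed in the receiver
sample_event = {
    "request_id": "req-test-001",
    "stream_id": "stream-test",
    "subscription_id": "sub-test",
    "question": "Alert if any person enters the restricted zone.",
    "answer": "A person is standing inside the restricted zone.",
    "confidence": 0.91,
    "frame_ts": "2024-01-01T12:00:00+00:00",
    "metadata": {"zone": "loading-dock-east", "severity": "high"},
}
body = json.dumps(sample_event).encode()
signature = sign_payload("test-secret", body)

print(verify_signature("test-secret", body, signature))         # genuine payload
print(verify_signature("test-secret", body + b" ", signature))  # tampered payload
```

Sending `body` with the computed value in the X-Trio-Signature header should produce a 200 from the handler when TRIO_WEBHOOK_SECRET is set to the same test secret. Note that the signature covers the exact bytes sent, so re-serializing the JSON before verifying would break the check.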
Step 4: Route Alerts to Slack, PagerDuty, and Email
With a verified payload in hand, fan-out to multiple notification channels is straightforward. Here are production-ready implementations for the three most common destinations.
import os
import smtplib
from datetime import datetime, timezone
from email.mime.text import MIMEText

import httpx


async def send_slack(event: dict):
    """Post an alert to a Slack channel via Incoming Webhook."""
    zone = event.get("metadata", {}).get("zone", "unknown")
    # fromisoformat() rejects a trailing "Z" before Python 3.11, so normalize it
    frame_time = datetime.fromisoformat(event["frame_ts"].replace("Z", "+00:00"))
    # Convert to UTC when an offset is present; naive timestamps are assumed to be UTC
    if frame_time.tzinfo is not None:
        frame_time = frame_time.astimezone(timezone.utc)
    ts = frame_time.strftime("%H:%M:%S UTC")
    payload = {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f":rotating_light: *Video Alert: {zone}*\n"
                        f"{event['answer']}\n"
                        f"Confidence: {event['confidence']:.0%} | {ts}"
                    ),
                },
            }
        ]
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            os.environ["SLACK_WEBHOOK_URL"],
            json=payload,
            timeout=5.0,
        )
        # Surface Slack errors instead of dropping the alert silently
        response.raise_for_status()


async def send_pagerduty(event: dict):
    """Trigger a PagerDuty incident for high-severity events."""
    payload = {
        "routing_key": os.environ["PAGERDUTY_ROUTING_KEY"],
        "event_action": "trigger",
        "payload": {
            "summary": event["answer"],
            "source": event["stream_id"],
            "severity": "critical",
            "custom_details": {
                "confidence": event["confidence"],
                "subscription_id": event["subscription_id"],
                "frame_ts": event["frame_ts"],
                **event.get("metadata", {}),
            },
        },
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=payload,
            timeout=5.0,
        )
        response.raise_for_status()


def send_email(event: dict):
    """Send an alert email via SMTP.

    This call blocks, so from async code run it with asyncio.to_thread(send_email, event).
    """
    zone = event.get("metadata", {}).get("zone", "unknown")
    body = (
        f"Video Alert: {event['answer']}\n\n"
        f"Zone: {zone}\n"
        f"Confidence: {event['confidence']:.0%}\n"
        f"Stream: {event['stream_id']}\n"
        f"Timestamp: {event['frame_ts']}\n"
    )
    msg = MIMEText(body)
    msg["Subject"] = f"[Trio Alert] {zone}: {event['answer'][:60]}"
    msg["From"] = os.environ["ALERT_FROM_EMAIL"]
    msg["To"] = os.environ["ALERT_TO_EMAIL"]
    with smtplib.SMTP(os.environ["SMTP_HOST"], int(os.environ["SMTP_PORT"])) as server:
        server.starttls()
        server.login(os.environ["SMTP_USER"], os.environ["SMTP_PASSWORD"])
        server.send_message(msg)

For more complex multi-camera deployments, where you need to correlate events across feeds before firing an alert, see the multi-camera AI dashboard guide for a session-based approach that queries multiple streams together.
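Returning to the fan-out itself: the severity check in route_alert can also be expressed as data, which makes it easier to add channels or severities later. The table below is a hypothetical sketch. The channel names are plain labels rather than SDK identifiers, and treating "critical" like "high" is an assumption based on the severities used elsewhere in this guide.

```python
from typing import Dict, List

# Hypothetical routing table: severity -> ordered list of channel labels
ROUTES: Dict[str, List[str]] = {
    "critical": ["pagerduty", "slack", "email"],
    "high": ["pagerduty", "slack", "email"],
    "medium": ["slack", "email"],
}
DEFAULT_ROUTE: List[str] = ["slack", "email"]


def channels_for(event: dict) -> List[str]:
    """Pick notification channels from the severity carried in event metadata."""
    severity = event.get("metadata", {}).get("severity", "medium")
    # Unknown severities fall back to the non-paging channels
    return ROUTES.get(severity, DEFAULT_ROUTE)


print(channels_for({"metadata": {"severity": "high"}}))
print(channels_for({"metadata": {}}))
```

A dispatcher can then map each label to the matching send function, so routing changes become edits to a table and not to control flow.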
Step 5: Implement Retry Logic
Webhooks fail. Your endpoint will restart, your DNS will flap, your TLS certificate will expire at 3 AM. A production alert pipeline needs retry logic on both sides: Trio retries delivery on its end, and your receiver needs to handle duplicate delivery gracefully.
Trio's retry behavior: Trio attempts delivery up to five times using exponential back-off — 1s, 2s, 4s, 8s, 16s. If all five attempts fail, the event is written to your dead-letter queue, accessible via the dashboard and the /subscriptions/{id}/failed-events API endpoint.
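The back-off sequence above is a plain doubling schedule, and it helps to know how long an outage it covers. The sketch below reproduces the listed delays. `backoff_schedule` is a hypothetical helper, and whether each delay precedes an attempt or follows a failed one is not stated here, so treat the total as approximate.

```python
from typing import List


def backoff_schedule(attempts: int = 5, base_s: float = 1.0, factor: float = 2.0) -> List[float]:
    """Return the delay before each attempt for an exponential back-off."""
    return [base_s * factor**i for i in range(attempts)]


delays = backoff_schedule()
print(delays)       # the per-attempt delays in seconds
print(sum(delays))  # total time spent waiting across all attempts
```

With these numbers the waits add up to 31 seconds, plus up to five seconds of response timeout per attempt. A receiver outage much longer than about a minute would therefore push events into the dead-letter queue.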
Idempotent receivers: Because Trio may retry delivery, your handler must be idempotent. Use event["request_id"] as a deduplication key:
import json
import os

import redis
from fastapi import BackgroundTasks, HTTPException, Request

# Reuses app, verify_signature, and route_alert from Step 3
r = redis.Redis.from_url(os.environ["REDIS_URL"])


@app.post("/api/trio-events")
async def handle_trio_event(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    signature = request.headers.get("X-Trio-Signature", "")
    if not verify_signature(payload_bytes, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    event = json.loads(payload_bytes)
    request_id = event["request_id"]
    # Idempotency check: SET with nx=True succeeds only for the first delivery of this ID
    first_delivery = r.set(
        f"trio:processed:{request_id}",
        "1",
        ex=3600,  # TTL 1 hour
        nx=True,  # Only set if key does not exist
    )
    if not first_delivery:
        # Duplicate delivery: acknowledge but skip processing
        return {"status": "duplicate"}
    background_tasks.add_task(route_alert, event)
    return {"status": "ok"}

This pattern ("set if not exists" with a short TTL) ensures each request_id is processed at most once within the TTL, even when Trio retries delivery multiple times. Because the key is written before the fan-out runs, a crash mid-processing drops that event; if that matters, delete the key on failure so a retry can go through. Redis is the standard choice here; any distributed key-value store with an atomic set-if-absent operation works.
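For unit tests or a single-process deployment, the same set-if-absent semantics can be sketched without Redis. `SeenOnce` below is a hypothetical in-memory stand-in, not a replacement for a shared store: it does not coordinate across multiple workers, and it never evicts expired keys.

```python
import time
from typing import Dict, Optional


class SeenOnce:
    """In-memory stand-in for a set-if-absent key with a TTL."""

    def __init__(self, ttl_s: float = 3600.0) -> None:
        self.ttl_s = ttl_s
        self._expires: Dict[str, float] = {}

    def first_time(self, key: str, now: Optional[float] = None) -> bool:
        """Return True only for the first call with this key inside the TTL."""
        now = time.monotonic() if now is None else now
        expiry = self._expires.get(key)
        if expiry is not None and expiry > now:
            return False  # Key is still live: this is a duplicate
        self._expires[key] = now + self.ttl_s
        return True


seen = SeenOnce(ttl_s=3600)
print(seen.first_time("req-123", now=0))     # first delivery
print(seen.first_time("req-123", now=4))     # retry a few seconds later
print(seen.first_time("req-123", now=3600))  # same ID after the TTL has elapsed
```

Passing `now` explicitly keeps the class deterministic in tests; in production code you would omit it and let the monotonic clock decide.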
Step 6: Monitor the Alert Pipeline
An alert pipeline that silently fails is worse than no alert pipeline. Here is what to instrument.
28% of production webhook pipelines have experienced silent delivery failures lasting more than one hour, due to absent endpoint health monitoring.
import os

import trio_sdk
from dotenv import load_dotenv

load_dotenv()
client = trio_sdk.Client(api_key=os.environ["TRIO_API_KEY"])

# Inspect subscription health
for sub in client.subscriptions.list():
    stats = sub.stats()
    print(f"Subscription: {sub.id}")
    print(f"  Events fired (24h): {stats.events_24h}")
    print(f"  Delivery success rate: {stats.delivery_success_rate:.1%}")
    print(f"  Failed events in DLQ: {stats.dlq_count}")
    print(f"  Avg delivery latency: {stats.avg_latency_ms}ms")
    print()

# Drain the dead-letter queue
for sub in client.subscriptions.list():
    failed = sub.failed_events(limit=50)
    for event in failed:
        print(f"[DLQ] {event['request_id']} - {event['answer']}")
        # Re-process or page on-call

Beyond the SDK, add these four checks to your monitoring stack:
- Endpoint uptime: Your webhook receiver must return 2xx within 5 seconds. Use an uptime monitor (Checkly, Better Uptime, or a simple cron hitting a /health route) to catch receiver outages before Trio exhausts its retries.
- DLQ depth alert: Alert if dlq_count > 0 for any subscription. A non-empty dead-letter queue means events that triggered a real detection were never delivered to your notification channels.
- Delivery latency: Track avg_latency_ms from the stats endpoint. Because that figure is an average, also record a P95 on your side if you need tail latency. Spikes indicate either network issues between Trio's infrastructure and your endpoint or a slow receiver blocking the response.
- Event rate anomaly: An alert subscription that fires zero events for 24 hours in an active zone is suspicious. Either your camera went offline, your subscription expired, or the confidence threshold is set too high.
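The checks above can be folded into one function that turns a stats snapshot into a list of findings. This sketch works on a plain dictionary and makes no SDK calls. The field names mirror the stats attributes used earlier, while `health_findings` and its default thresholds are hypothetical and should be tuned to your deployment.

```python
from typing import List, Mapping


def health_findings(
    stats: Mapping[str, float],
    max_latency_ms: float = 1000.0,
    min_success_rate: float = 0.99,
) -> List[str]:
    """Return a list of problem labels for one subscription's stats snapshot."""
    findings: List[str] = []
    if stats["dlq_count"] > 0:
        findings.append("dlq_not_empty")  # Detections that never reached a channel
    if stats["delivery_success_rate"] < min_success_rate:
        findings.append("low_delivery_success")
    if stats["avg_latency_ms"] > max_latency_ms:
        findings.append("high_latency")
    if stats["events_24h"] == 0:
        findings.append("no_events_24h")  # Camera offline, expired subscription, or threshold too high
    return findings


snapshot = {
    "events_24h": 0,
    "delivery_success_rate": 0.97,
    "dlq_count": 2,
    "avg_latency_ms": 240,
}
print(health_findings(snapshot))
```

Running this on a schedule and paging when the list is non-empty covers three of the four checks; endpoint uptime still needs an external probe, since a dead receiver cannot report on itself.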
For warehouse and logistics deployments, the AI warehouse video monitoring guide shows how these monitoring patterns integrate with a broader operational dashboard. If you're building for security or surveillance use cases, the AI security surveillance guide covers compliance-aware alert architectures.
Step 7: Manage Multiple Subscriptions at Scale
A production deployment might have dozens of cameras and dozens of event conditions per camera. Treat subscriptions as configuration — store them in your database, not hardcoded in scripts.
import os
from dataclasses import dataclass
from typing import List

import trio_sdk


@dataclass
class CameraAlertConfig:
    stream_url: str
    label: str
    conditions: List[dict]  # [{question, confidence, cooldown, metadata}]


def provision_subscriptions(configs: List[CameraAlertConfig], webhook_url: str):
    """Provision all subscriptions from a configuration list."""
    client = trio_sdk.Client(api_key=os.environ["TRIO_API_KEY"])
    for config in configs:
        stream = client.streams.connect(
            url=config.stream_url,
            label=config.label,
        )
        for cond in config.conditions:
            sub = stream.subscribe(
                question=cond["question"],
                webhook_url=webhook_url,
                confidence_threshold=cond.get("confidence", 0.80),
                cooldown_seconds=cond.get("cooldown", 60),
                metadata=cond.get("metadata", {}),
            )
            print(f"Provisioned {sub.id} on {config.label}: {cond['question'][:50]}")


# Example: provision alerts for a construction site
provision_subscriptions(
    configs=[
        CameraAlertConfig(
            stream_url="rtsp://site-cam-01.local/live",
            label="site-entrance",
            conditions=[
                {
                    "question": "Alert if any worker is not wearing a hard hat.",
                    "confidence": 0.82,
                    "cooldown": 30,
                    "metadata": {"zone": "entrance", "severity": "high"},
                },
                {
                    "question": "Alert if a vehicle enters the pedestrian zone.",
                    "confidence": 0.88,
                    "cooldown": 15,
                    "metadata": {"zone": "pedestrian", "severity": "critical"},
                },
            ],
        ),
    ],
    webhook_url="https://your-app.com/api/trio-events",
)

For construction and outdoor deployments with changing lighting conditions, the construction site safety AI guide covers additional confidence calibration strategies. The scaling video AI architecture post addresses how subscription management evolves as you go from tens to hundreds of cameras.
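To treat subscriptions as configuration, as suggested at the start of this step, it helps to validate the stored records before anything is provisioned. The loader below is a minimal sketch that reads the same fields from JSON. `load_camera_configs` and the validation rules are assumptions for illustration, and the sample record reuses the construction-site values from the example above.

```python
import json
from typing import Any, Dict, List

SAMPLE = """
[
  {
    "stream_url": "rtsp://site-cam-01.local/live",
    "label": "site-entrance",
    "conditions": [
      {
        "question": "Alert if any worker is not wearing a hard hat.",
        "confidence": 0.82,
        "cooldown": 30,
        "metadata": {"zone": "entrance", "severity": "high"}
      }
    ]
  }
]
"""


def load_camera_configs(raw: str) -> List[Dict[str, Any]]:
    """Parse camera alert configs from JSON and reject malformed entries early."""
    cameras = json.loads(raw)
    for cam in cameras:
        for key in ("stream_url", "label", "conditions"):
            if key not in cam:
                raise ValueError(f"camera config missing '{key}'")
        for cond in cam["conditions"]:
            if not cond.get("question"):
                raise ValueError(f"{cam['label']}: condition has no question")
            confidence = cond.get("confidence", 0.80)
            if not 0.0 <= confidence <= 1.0:
                raise ValueError(f"{cam['label']}: confidence {confidence} outside [0, 1]")
            if cond.get("cooldown", 60) < 0:
                raise ValueError(f"{cam['label']}: cooldown must not be negative")
    return cameras


configs = load_camera_configs(SAMPLE)
print(len(configs), configs[0]["label"])
```

Each validated dictionary can then be unpacked into a CameraAlertConfig and passed to provision_subscriptions, so a typo in a stored threshold fails at load time and not halfway through provisioning.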
For a deeper look at how the underlying stream analysis works before events are fired, the live video stream with AI guide explains the full pipeline from frame extraction to inference.
Keep Reading
- Getting Started with the Trio Stream API — Connect your first camera, run your first inference call, and understand the core SDK primitives before adding webhook subscriptions.
- How to Analyze a Live Video Stream with AI — A deep dive into the full pipeline architecture from frame extraction to inference to structured output — the foundation that webhook events are built on top of.
- Multi-Camera AI Dashboard with Python — Extend beyond single-camera webhooks to a correlated, multi-stream alert dashboard that aggregates events across an entire facility.