How to Build a Real-Time Object Detection Pipeline with Python
From YOLO to production — a step-by-step tutorial for live video inference
Real-time object detection is one of the most practically useful capabilities in modern computer vision — and Python has become the default language for building it. Whether you're monitoring a manufacturing line for defects, tracking foot traffic in a retail space, or flagging safety incidents on a job site, the same core pipeline underlies almost every use case: grab a video frame, run a model, parse the results, and act on what you find — all fast enough to keep up with a live feed. This tutorial walks you through every step of building that pipeline from scratch, ending with a production-ready architecture and a look at when managed infrastructure like Trio makes more sense than rolling your own.
What You'll Build
By the end of this tutorial you'll have a Python script that:
- Connects to any RTSP camera stream or local webcam
- Runs YOLOv8 inference on every frame at 25–30 FPS (GPU) or 5–8 FPS (CPU)
- Draws labeled bounding boxes with confidence scores
- Filters detections by class and confidence threshold
- Publishes structured JSON results to stdout (ready for downstream consumers)
- Handles reconnection and frame-drop gracefully
The final codebase is around 120 lines of Python and depends only on ultralytics, opencv-python, and numpy.
Object Detection
Object detection is the computer vision task of simultaneously identifying what objects are present in an image and where they are, represented as bounding boxes with class labels and confidence scores. Unlike image classification (which assigns a single label to a whole image), detection models output a variable-length list of localized predictions — making them suitable for scenes with multiple objects of different types.
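The contrast is easiest to see in the shape of the output. A toy illustration (all values invented):

```python
# Classification: exactly one label for the whole image
classification_output = {"label": "street scene", "confidence": 0.91}

# Detection: a variable-length list of localized predictions,
# each with a class label, a confidence score, and a bounding box
detection_output = [
    {"class": "person", "confidence": 0.88, "bbox": [34, 50, 120, 310]},
    {"class": "car", "confidence": 0.76, "bbox": [200, 140, 420, 260]},
    {"class": "car", "confidence": 0.52, "bbox": [430, 150, 600, 250]},
]

print(len(detection_output), "objects found")  # 3 objects found
```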
Prerequisites
Before diving in, make sure you have the following:
- Python 3.10 or newer — YOLOv8's ultralytics package requires 3.8+, but 3.10 is recommended for match statement support in the post-processing section
- pip or conda for package management
- A CUDA-capable GPU (NVIDIA GTX 1060 or better) or an Apple Silicon Mac for MPS acceleration — CPU-only inference works but won't sustain real-time frame rates on 1080p feeds
- A video source: either a USB webcam, an RTSP camera URL, or a local .mp4 file for testing
- Basic familiarity with Python classes and context managers
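One quick way to verify the interpreter floor before installing anything is a throwaway check like this (the helper name meets_version_floor is ours, not part of any package):

```python
import sys

def meets_version_floor(floor: tuple[int, int] = (3, 10)) -> bool:
    """Return True if the running interpreter is at least `floor`."""
    return sys.version_info[:2] >= floor

if __name__ == "__main__":
    version = ".".join(map(str, sys.version_info[:3]))
    if meets_version_floor():
        print(f"Python {version}: OK for this tutorial")
    else:
        print(f"Python {version}: upgrade to 3.10+ before continuing")
```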
Step 1: Environment Setup
Create a clean virtual environment and install the required packages.
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install ultralytics opencv-python numpy
# Optional: verify CUDA is available
python -c "import torch; print(torch.cuda.is_available())"
The ultralytics package bundles PyTorch and will automatically download the correct CUDA-enabled wheels if a compatible GPU is detected. You do not need to install PyTorch separately.
Create your project directory structure:
detection-pipeline/
main.py # entry point
detector.py # YOLOv8 wrapper class
stream.py # video capture abstraction
postprocess.py # NMS and filtering utilities
requirements.txt
Step 2: Loading YOLOv8
YOLOv8 ships in five sizes: nano (n), small (s), medium (m), large (l), and extra-large (x). For real-time inference on live video, yolov8n or yolov8s is almost always the right starting point — they hit 30+ FPS on a mid-range GPU while still delivering solid accuracy on the 80 COCO classes.
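The right size is ultimately an empirical question for your hardware and target resolution. A small timing helper (our own sketch, not part of ultralytics) works with any detector callable, so you can compare sizes fairly on your own machine:

```python
import time
from typing import Any, Callable

def benchmark_fps(infer: Callable[[Any], Any], frame: Any, n_frames: int = 50) -> float:
    """Average FPS of `infer` over n_frames repeated calls on one frame."""
    infer(frame)  # warm-up call so one-time initialization doesn't skew timing
    start = time.perf_counter()
    for _ in range(n_frames):
        infer(frame)
    elapsed = time.perf_counter() - start
    return n_frames / elapsed
```

With a model loaded, something like `benchmark_fps(lambda f: model(f, verbose=False), frame)` run once per model size gives you per-machine numbers instead of generic published benchmarks.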
from __future__ import annotations
import json
from dataclasses import dataclass, asdict
from typing import Iterator
import numpy as np
from ultralytics import YOLO
from ultralytics.engine.results import Results
@dataclass
class Detection:
class_id: int
class_name: str
confidence: float
x1: float
y1: float
x2: float
y2: float
@property
def bbox_center(self) -> tuple[float, float]:
return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)
def to_json(self) -> str:
return json.dumps(asdict(self))
class ObjectDetector:
"""
Thin wrapper around YOLOv8 that yields Detection instances
from raw numpy frames.
"""
def __init__(
self,
model_size: str = "yolov8n.pt",
confidence: float = 0.40,
iou_threshold: float = 0.45,
device: str = "auto",
allowed_classes: list[int] | None = None,
) -> None:
self.model = YOLO(model_size)
self.confidence = confidence
self.iou_threshold = iou_threshold
self.allowed_classes = allowed_classes # None = all COCO classes
if device == "auto":
import torch
if torch.cuda.is_available():
self.device = "cuda"
elif torch.backends.mps.is_available():
self.device = "mps"
else:
self.device = "cpu"
else:
self.device = device
# Warm up the model with a blank frame to avoid first-frame latency
self._warmup()
def _warmup(self) -> None:
dummy = np.zeros((640, 640, 3), dtype=np.uint8)
self.model(
dummy,
verbose=False,
device=self.device,
conf=self.confidence,
)
def detect(self, frame: np.ndarray) -> list[Detection]:
results: list[Results] = self.model(
frame,
verbose=False,
device=self.device,
conf=self.confidence,
iou=self.iou_threshold,
stream=False,
)
detections: list[Detection] = []
for result in results:
for box in result.boxes:
class_id = int(box.cls.item())
if self.allowed_classes and class_id not in self.allowed_classes:
continue
detections.append(
Detection(
class_id=class_id,
class_name=result.names[class_id],
confidence=float(box.conf.item()),
x1=float(box.xyxy[0][0]),
y1=float(box.xyxy[0][1]),
x2=float(box.xyxy[0][2]),
y2=float(box.xyxy[0][3]),
)
)
        return detections
3.2 ms
Avg. YOLOv8-nano inference time per 640x640 frame on an NVIDIA RTX 3090
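Before wiring the detector to a live stream, it's worth sanity-checking the Detection helpers in isolation. The dataclass below is copied from detector.py so this snippet runs standalone, without ultralytics installed:

```python
from __future__ import annotations
import json
from dataclasses import dataclass, asdict

@dataclass
class Detection:  # same fields as the tutorial's detector.Detection
    class_id: int
    class_name: str
    confidence: float
    x1: float
    y1: float
    x2: float
    y2: float

    @property
    def bbox_center(self) -> tuple[float, float]:
        return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

det = Detection(0, "person", 0.93, 100.0, 50.0, 200.0, 250.0)
print(det.bbox_center)                          # (150.0, 150.0)
print(json.loads(det.to_json())["class_name"])  # person
```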
Step 3: Connecting to a Video Stream
OpenCV's VideoCapture supports USB webcams (integer index), local video files (file path), and RTSP/HTTP streams (URL string) through a single unified interface. The VideoStream class below wraps it with automatic reconnection and a configurable target resolution.
from __future__ import annotations
from typing import Iterator
import time
import logging
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class VideoStream:
"""
Resilient video capture wrapper supporting webcams,
local files, and RTSP/HTTP streams.
"""
def __init__(
self,
source: int | str,
width: int = 1280,
height: int = 720,
fps: int = 30,
reconnect_delay: float = 2.0,
max_reconnect_attempts: int = 10,
) -> None:
self.source = source
self.width = width
self.height = height
self.fps = fps
self.reconnect_delay = reconnect_delay
self.max_reconnect_attempts = max_reconnect_attempts
self._cap: cv2.VideoCapture | None = None
self._connect()
def _connect(self) -> None:
if self._cap is not None:
self._cap.release()
self._cap = cv2.VideoCapture(self.source)
self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
self._cap.set(cv2.CAP_PROP_FPS, self.fps)
# Minimize buffer to reduce latency on live streams
self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if not self._cap.isOpened():
raise ConnectionError(f"Cannot open video source: {self.source}")
logger.info("Connected to video source: %s", self.source)
    def frames(self) -> Iterator[tuple[int, np.ndarray]]:
"""Yield (frame_index, numpy_frame) with auto-reconnect on drop."""
frame_index = 0
reconnect_count = 0
while True:
ret, frame = self._cap.read()
if not ret:
reconnect_count += 1
if reconnect_count > self.max_reconnect_attempts:
logger.error("Max reconnect attempts reached. Stopping.")
break
logger.warning(
"Frame read failed. Reconnecting in %.1fs (attempt %d/%d)",
self.reconnect_delay,
reconnect_count,
self.max_reconnect_attempts,
)
                time.sleep(self.reconnect_delay)
                try:
                    self._connect()
                except ConnectionError:
                    logger.warning("Reconnect failed; will retry.")
                continue
reconnect_count = 0
yield frame_index, frame
frame_index += 1
def release(self) -> None:
if self._cap:
self._cap.release()
def __enter__(self) -> "VideoStream":
return self
def __exit__(self, *_) -> None:
        self.release()
Step 4: Running Inference
With the detector and stream classes in place, the inference loop is straightforward. The key is to keep the loop body as thin as possible — every millisecond spent outside the model call is a millisecond of latency added to your pipeline.
from __future__ import annotations
import argparse
import json
import sys
import time
import logging
import cv2
from detector import ObjectDetector, Detection
from stream import VideoStream
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("pipeline")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Real-time object detection pipeline")
p.add_argument("--source", default=0, help="Camera index, file path, or RTSP URL")
p.add_argument("--model", default="yolov8n.pt")
p.add_argument("--confidence", type=float, default=0.40)
p.add_argument("--iou", type=float, default=0.45)
p.add_argument("--classes", nargs="+", type=int, help="COCO class IDs to detect")
p.add_argument("--display", action="store_true", help="Show annotated window")
p.add_argument("--json-out", action="store_true", help="Stream JSON to stdout")
return p.parse_args()
def draw_detections(
frame,
detections: list[Detection],
colors: dict[int, tuple[int, int, int]] | None = None,
) -> None:
"""Draw bounding boxes and labels in-place on frame."""
for det in detections:
color = (colors or {}).get(det.class_id, (138, 67, 225))
x1, y1, x2, y2 = int(det.x1), int(det.y1), int(det.x2), int(det.y2)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
(lw, lh), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 1)
cv2.rectangle(frame, (x1, y1 - lh - 6), (x1 + lw, y1), color, -1)
cv2.putText(
frame, label, (x1, y1 - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 1,
)
def run(args: argparse.Namespace) -> None:
source = int(args.source) if str(args.source).isdigit() else args.source
detector = ObjectDetector(
model_size=args.model,
confidence=args.confidence,
iou_threshold=args.iou,
allowed_classes=args.classes,
)
fps_samples: list[float] = []
with VideoStream(source) as stream:
for frame_idx, frame in stream.frames():
t0 = time.perf_counter()
detections = detector.detect(frame)
elapsed = time.perf_counter() - t0
fps_samples.append(1.0 / max(elapsed, 1e-6))
if len(fps_samples) > 30:
fps_samples.pop(0)
avg_fps = sum(fps_samples) / len(fps_samples)
if args.json_out:
payload = {
"frame": frame_idx,
"fps": round(avg_fps, 1),
"detections": [
{
"class": d.class_name,
"confidence": round(d.confidence, 4),
"bbox": [d.x1, d.y1, d.x2, d.y2],
}
for d in detections
],
}
print(json.dumps(payload), flush=True)
if args.display:
draw_detections(frame, detections)
cv2.putText(
frame, f"FPS: {avg_fps:.1f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2,
)
cv2.imshow("Detection", frame)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
    run(parse_args())
Run it against a webcam with the display window enabled:
python main.py --source 0 --model yolov8s.pt --confidence 0.45 --display
Or stream detection JSON from an RTSP camera:
python main.py \
--source rtsp://admin:password@192.168.1.100:554/stream \
--model yolov8n.pt \
--confidence 0.50 \
--classes 0 2 7 \
  --json-out | tee detections.jsonl
Step 5: Post-Processing and Visualization
Raw model output is useful for debugging but rarely sufficient for production. You typically need to apply additional filters, zone logic, or aggregation before the results are actionable.
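As background for postprocess.py, the overlap measure behind the iou_threshold passed to the detector in Step 2 is intersection over union, which non-max suppression uses to decide when two boxes describe the same object. A minimal reference implementation on [x1, y1, x2, y2] boxes:

```python
def iou(a: list[float], b: list[float]) -> float:
    """Intersection-over-union of two [x1, y1, x2, y2] boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

print(iou([0, 0, 10, 10], [5, 5, 15, 15]))  # 25 / 175 = 0.14285714285714285
```

Two boxes with IoU above the threshold are treated as duplicates and the lower-confidence one is suppressed, which is why raising `iou` keeps more overlapping detections.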
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
import numpy as np
from detector import Detection
@dataclass
class Zone:
"""Rectangular region of interest in pixel coordinates."""
name: str
x1: float
y1: float
x2: float
y2: float
def contains_center(self, det: Detection) -> bool:
cx, cy = det.bbox_center
return self.x1 <= cx <= self.x2 and self.y1 <= cy <= self.y2
class DetectionFilter:
"""
Stateful post-processor that applies confidence filtering,
zone masking, and a temporal smoothing buffer.
"""
def __init__(
self,
zones: list[Zone] | None = None,
min_confidence: float = 0.50,
temporal_window: int = 5,
) -> None:
self.zones = zones or []
self.min_confidence = min_confidence
# Ring buffer of last N frames' detections per class
self._history: dict[str, deque[int]] = defaultdict(
lambda: deque(maxlen=temporal_window)
)
def process(
self, detections: list[Detection]
) -> dict[str, list[Detection]]:
"""
Returns detections grouped by zone name.
"global" key contains detections not in any named zone.
"""
filtered = [
d for d in detections if d.confidence >= self.min_confidence
]
# Update history for smoothing
counts: dict[str, int] = defaultdict(int)
for d in filtered:
counts[d.class_name] += 1
        # Append this frame's count for every class seen now or previously,
        # so classes that disappear decay toward zero in the buffer
        for cls in set(self._history) | set(counts):
            self._history[cls].append(counts.get(cls, 0))
grouped: dict[str, list[Detection]] = defaultdict(list)
for det in filtered:
matched = False
for zone in self.zones:
if zone.contains_center(det):
grouped[zone.name].append(det)
matched = True
break
if not matched:
grouped["global"].append(det)
return dict(grouped)
def smoothed_count(self, class_name: str) -> float:
"""Return the temporal average count for a given class."""
buf = self._history[class_name]
        return sum(buf) / len(buf) if buf else 0.0
Step 6: Optimizing for Production
Once your pipeline is functionally correct on a single machine, several optimizations become important before you can run it at scale.
Model quantization is the fastest win. YOLOv8 supports INT8 export out of the box:
from ultralytics import YOLO
model = YOLO("yolov8s.pt")
# Export to TensorRT (NVIDIA GPUs) — typically 2-3x faster than PyTorch
model.export(format="engine", int8=True, device=0)
# Export to ONNX for cross-platform deployment
model.export(format="onnx", opset=17, simplify=True)
# Load the optimized engine for inference
optimized = YOLO("yolov8s.engine")
Frame skipping is essential for high-FPS sources when your model can't keep up. Rather than queuing frames and adding latency, drop intermediate frames:
# In your capture loop, skip every N-1 frames
SKIP_FRAMES = 2 # run inference on 1 in every 3 frames
for frame_idx, frame in stream.frames():
if frame_idx % (SKIP_FRAMES + 1) != 0:
continue # discard this frame, grab the next one
    detections = detector.detect(frame)
Async inference decouples capture from inference using a producer-consumer queue, eliminating GPU idle time between frames:
import queue
import threading
from detector import ObjectDetector
from stream import VideoStream
def capture_worker(source, frame_queue: queue.Queue) -> None:
with VideoStream(source) as stream:
for frame_idx, frame in stream.frames():
try:
frame_queue.put_nowait((frame_idx, frame))
except queue.Full:
pass # drop frame rather than build latency
def inference_worker(
frame_queue: queue.Queue,
result_queue: queue.Queue,
detector: ObjectDetector,
) -> None:
while True:
frame_idx, frame = frame_queue.get()
detections = detector.detect(frame)
result_queue.put((frame_idx, detections))
def run_async(source, model_size: str = "yolov8n.pt") -> None:
frame_q: queue.Queue = queue.Queue(maxsize=4)
result_q: queue.Queue = queue.Queue(maxsize=16)
detector = ObjectDetector(model_size=model_size)
capture_t = threading.Thread(target=capture_worker, args=(source, frame_q), daemon=True)
infer_t = threading.Thread(target=inference_worker, args=(frame_q, result_q, detector), daemon=True)
capture_t.start()
infer_t.start()
while True:
frame_idx, detections = result_q.get()
for d in detections:
            print(f"[{frame_idx}] {d.class_name} @ {d.confidence:.2f}")
Using Trio as an Alternative
Building and maintaining a real-time object detection pipeline requires sustained engineering effort that scales with your deployment footprint. Every camera you add means more GPU capacity to provision, more reconnection logic to harden, more inference workers to scale, and more model versions to track. If detection is core to your product, that investment pays off. If it's infrastructure supporting a higher-level application, it's overhead you can eliminate.
Trio's multimodal stream API handles the entire pipeline described above — capture, inference, post-processing, and structured output — as a managed service. You connect a camera stream and describe what you want to detect; Trio runs the inference and delivers structured results over a WebSocket or HTTP streaming endpoint.
import trio_sdk as trio
client = trio.Client(api_key="YOUR_API_KEY")
# Connect a live RTSP feed
stream = client.streams.connect(
source="rtsp://192.168.1.100:554/cam/realmonitor",
modalities=["video"],
)
# Describe what to detect in natural language
for event in stream.detect(
prompt="Detect all people, vehicles, and packages. "
"Alert if a person enters the restricted zone "
"(top-left quadrant).",
structured_output=True,
):
print(event.detections) # typed Detection objects
print(event.alerts) # triggered alert conditions
    print(event.frame_metadata)  # timestamp, fps, resolution
Trio abstracts away model selection, GPU provisioning, frame-level batching, and reconnection — letting you focus on what to detect and what to do with the results rather than how the inference runs. For teams that need to process dozens or hundreds of simultaneous camera feeds, this separation is essential. See our posts on how to analyze a live video stream with AI and the build vs. buy decision for video analytics pipelines for a deeper look at when managed infrastructure makes sense.
If you're encountering the upstream challenge that makes object detection difficult to operationalize — getting raw video frames into an AI-ready format in the first place — The Video-to-LLM Gap covers why that problem is harder than it looks and how Trio solves it.
Keep Reading
- How to Analyze a Live Video Stream with AI — Connect any RTSP or WebRTC feed to an AI model and start extracting insights in under 10 minutes
- Build vs. Buy: Video Analytics Pipeline — When rolling your own detection pipeline makes sense, and when it becomes a liability
- The Video-to-LLM Gap — Why raw video frames are incompatible with language models, and the infrastructure layer that bridges them