Part 5: Inference at 60 FPS
This blog series chronicles the engineering behind designing and installing a thermal camera system into my car. Read the first post here.
Threading, Tracking, and Thermal Quirks
With the Orin Nano powered and a model trained, next up is the server-side inference, processing, and streaming pipeline.
Part 1: Startup Sequence
The system is designed to start automatically when the car is turned on and to handle the failure modes that come with embedded systems - camera disconnections, WiFi dropouts, and so on. I built a state machine that continuously checks these conditions and reacts accordingly (a simplified sketch follows the list below). My code orchestrates:
Connecting to my car’s WiFi (with fallback options)
Detecting and initializing the FLIR thermal camera
Loading and warming up the ML model
Starting the streaming service
Continuous recording
Broadcasting its presence so my phone can find it
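Conceptually, the state machine is a simple loop that checks each condition and advances (or falls back) accordingly. Here's a simplified sketch - the state names and the init_camera / camera_alive / load_and_warm_up_model helpers are illustrative, not the actual code:
import time

def run_state_machine():
    state = 'CONNECT_WIFI'
    while True:
        if state == 'CONNECT_WIFI':
            state = 'INIT_CAMERA' if connect_wifi() else 'CONNECT_WIFI'
        elif state == 'INIT_CAMERA':
            state = 'LOAD_MODEL' if init_camera() else 'INIT_CAMERA'
        elif state == 'LOAD_MODEL':
            load_and_warm_up_model()
            state = 'RUNNING'
        elif state == 'RUNNING':
            # Watchdog: fall back if the camera or WiFi drops out
            if not camera_alive():
                state = 'INIT_CAMERA'
            elif not check_wifi_connected():
                state = 'CONNECT_WIFI'
        time.sleep(1)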
The system is designed to connect to my home’s WiFi network as primary, and my car’s WiFi as secondary. These are hard-coded SSIDs and passwords. That way I can SSH into the box and work on it while I’m at home, and stream via WiFi to my phone when I’m on the road. The system also disables WiFi power management, sets the Orin to max performance mode, and increases UDP buffers:
# Larger socket send buffers for the UDP video stream
subprocess.run(['sudo', 'sysctl', '-w', 'net.core.wmem_max=4194304'])
subprocess.run(['sudo', 'sysctl', '-w', 'net.ipv4.udp_wmem_min=262144'])
# Max performance mode and locked clocks on the Orin
subprocess.run(['sudo', 'nvpmodel', '-m', '0'])
subprocess.run(['sudo', 'jetson_clocks'])
def connect_wifi():
    # Try the primary (home) network first
    subprocess.run(['sudo', 'nmcli', 'con', 'up', 'id', PRIMARY_WIFI_SSID])
    if check_wifi_connected():
        return True
    # Fall back to the secondary (car) network
    subprocess.run(['sudo', 'nmcli', 'dev', 'wifi', 'connect',
                    SECONDARY_WIFI_SSID, 'password', SECONDARY_WIFI_PASSWORD])
    return check_wifi_connected()
Neural network frameworks like PyTorch and TensorRT also do a lot of lazy initialization. They don’t actually compile kernels or allocate GPU memory until the first real inference. This means the first detection could take 500ms instead of 10ms. So the code runs a “dummy” inference during startup to force all the initialization to happen:
model = YOLO(ENGINE_PATH, task='detect')
print("Warming up model...")
# Random 512x640x3 frame matching the camera resolution and model input shape
dummy_frame = np.random.randint(0, 255, (512, 640, 3), dtype=np.uint8)
with torch.no_grad():
    for _ in range(3):
        _ = model(dummy_frame)
The warm-up pass forces TensorRT to compile all its optimization kernels and allocate GPU memory pools.
Next - with 640x512 8-bit images streaming in at 60 FPS, that's 19.66 MB/s of raw pixel data. Naive memory allocation (allocating and freeing memory for every frame) would degrade performance through allocation overhead and possible memory fragmentation. So instead, I pre-allocate everything up front with a frame buffer pool:
class FrameBufferPool:
    def __init__(self, width, height, channels, pool_size=20):
        self.shape = (height, width, channels)
        self.buffers = [np.empty(self.shape, dtype=np.uint8) for _ in range(pool_size)]
        self.available = queue.Queue()
        for buf in self.buffers:
            self.available.put(buf)

    def get(self, timeout=0.001):
        try:
            return self.available.get(timeout=timeout)
        except queue.Empty:
            return None

    def release(self, buffer):
        if buffer is not None:
            self.available.put(buffer)
The queue gives us thread-safety for free since the inference pipeline, recorder, and streamer all run in separate threads. No locks, no allocation overhead - just pull a buffer when you need one and return it when you're done. To record and stream simultaneously while running inference, each pipeline also gets its own pool, and ownership of a buffer is transferred explicitly. This matters because each pipeline has different consumption patterns.
def add_frame(self, frame):
    # Get a buffer from the recording pool
    recording_buffer = self.recording_buffer_pool.get(timeout=0)
    if recording_buffer is None:
        # No buffer available, drop frame
        return
    # Copy frame data to recording buffer
    np.copyto(recording_buffer, frame)
    try:
        self.recording_queue.put_nowait({
            'frame': recording_buffer,
            'is_pooled': True  # Only need this flag now
        })
    except queue.Full:
        # Queue full, release buffer back to pool
        self.recording_buffer_pool.release(recording_buffer)
When fragmenting JPEG data for UDP transmission, I also eliminate unnecessary copies using memoryviews:
mv = memoryview(jpeg_arr).cast('B')  # flat view over the JPEG bytes
for di in range(M):
    s = di * CHUNK_PAYLOAD
    e = min(s + CHUNK_PAYLOAD, frame_len)
    header = struct.pack(PKT_HDR_FMT, seq, T, di, frame_len)
    self._send_fragment(header, mv[s:e], client)  # mv[s:e] is a view, not a copy!
To get the most out of my Orin, I also pin specific threads.
Main thread: Camera capture and orchestration
Inference workers (1-2 threads): YOLO model inference and tracking
Recording thread: Continuous recording to disk
Encoder thread: JPEG encoding and UDP streaming
Various daemon threads: WiFi monitoring, UDP handshakes, etc.
With six cores on the Orin Nano, these threads are pinned to specific cores:
Main thread -> core 4
Inference workers -> cores 0-1
Recording thread -> cores 2-3
Encoder thread -> core 5
This is intended to isolate latency-sensitive work (inference) from I/O-heavy work (recording and streaming). Each thread pins itself on startup by calling os.sched_setaffinity().
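A minimal sketch of what that pinning looks like - the role-to-core map mirrors the list above, and the helper name is mine:
import os

# Core assignments mirroring the mapping above (the Orin Nano exposes cores 0-5)
CORE_MAP = {
    'main': {4},
    'inference': {0, 1},
    'recording': {2, 3},
    'encoder': {5},
}

def pin_current_thread(role):
    # pid 0 means "the calling thread"; restrict it to the given CPU set
    os.sched_setaffinity(0, CORE_MAP[role])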
Next, the code broadcasts its existence so my phone can find it on my car's WiFi network. This deals with pesky things like dynamic IPs without hard-coding an address. My phone app listens on port 9999, discovers the Orin Nano, and connects to the stream with no configuration required. It also means multiple phones can connect, provided they're running the app I built.
def broadcast_loop(self):
    msg = json.dumps({"service": "webcam_udp", "port": 8888}).encode()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    while True:
        sock.sendto(msg, ('<broadcast>', 9999))
        time.sleep(1)
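The actual client is the phone app, but conceptually the listening side is just as small. A sketch of discovery (the function name and timeout are mine):
import json
import socket

def discover_server(timeout=5.0):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 9999))                 # listen for the broadcast
    sock.settimeout(timeout)
    data, (ip, _) = sock.recvfrom(1024)   # sender's IP is the Orin's address
    info = json.loads(data.decode())
    return ip, info['port']               # e.g. the stream port 8888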
Part 2: Inference and Tracking
Conceptually there are 3 things we need to do here:
Get raw detections (simple: feed image through inference)
Determine whether a detection is an actual animal, or just a hot exhaust pipe or a random McDonald’s bag someone tossed out
Determine whether said animal is on a collision course or otherwise merits an alert
Confidence Accumulation:
Instead of treating each frame independently, I implemented a confidence accumulation system that builds trust in detections over time. The initial version was a naive “do we have the same class detected over >N frames?”; the final version weighs the confidence of each detection and decays accumulated confidence when subsequent frames disagree. In practice, there are four base parameters that I tweaked in testing:
# Tracking config with confidence accumulation
CONFIRM_THRESHOLD = 2.5 # Cumulative confidence needed to confirm track
MAINTAIN_THRESHOLD = 0.35 # Minimum confidence to maintain confirmed track
CONFIDENCE_DECAY = 0.9 # Decay factor per frame (0.9 = 10% decay)
MAX_CONFIDENCE = 5.5 # Max accumulated confidence
Each detection starts as a potential track. So if frame N says “deer with 0.6 confidence”, that track now has 0.6 accumulated confidence. If frame N+1 has the same deer at 0.7 confidence, we’re now at 1.3. If the next frame doesn’t have the deer, the decay kicks in and we’re down to 1.17. Once a track hits 2.5 accumulated confidence (3-5 consistent detections), it’s treated as a confirmed detection. If accumulated confidence drops below 0.35, the track is killed. This lets the system handle partial occlusions and the like gracefully.
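The per-frame update boils down to a few lines. This is a simplified sketch of the rule described above, using the four config values from earlier (the track fields are illustrative):
def update_track(track, detection_conf=None):
    if detection_conf is not None:
        # Matched a detection this frame: accumulate, capped at the max
        track.confidence = min(track.confidence + detection_conf, MAX_CONFIDENCE)
    else:
        # No matching detection this frame: decay toward zero
        track.confidence *= CONFIDENCE_DECAY
    # Promote new tracks or kill confirmed ones based on the thresholds
    if not track.confirmed and track.confidence >= CONFIRM_THRESHOLD:
        track.confirmed = True
    elif track.confirmed and track.confidence < MAINTAIN_THRESHOLD:
        track.dead = True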
“But how do you know it’s the same deer?”
I implemented a hybrid Intersection over Union (“IoU”) + centroid approach:
def calculate_match_score(self, detection_bbox, track):
    """Calculate combined IOU + Centroid score"""
    # IOU component
    iou = self.calculate_iou(detection_bbox, track.bbox)
    # Centroid component
    det_cx = (detection_bbox[0] + detection_bbox[2]) / 2
    det_cy = (detection_bbox[1] + detection_bbox[3]) / 2
    det_centroid = np.array([det_cx, det_cy])
    distance = np.linalg.norm(det_centroid - track.centroid)
    centroid_score = 1.0 - min(distance / self.image_diagonal, 1.0)
    # Combined score
    return (IOU_WEIGHT * iou) + (CENTROID_WEIGHT * centroid_score)
IoU handles cases where overlapping bounding boxes are probably the same object. The centroid piece helps when consecutive frames have little or no box overlap. In testing, I found 60% IoU and 40% centroid weighting yielded pretty solid tracking. The confidence accumulation approach mirrors human perception - we don't instantly trust every shape we see, but persistent, consistent observations build confidence.
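The calculate_iou helper isn't shown above; it's the standard intersection-over-union computation, sketched here for completeness:
def calculate_iou(self, box_a, box_b):
    # Boxes are (x1, y1, x2, y2)
    ix1, iy1 = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
    ix2, iy2 = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    if inter == 0:
        return 0.0
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    return inter / (area_a + area_b - inter)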
Heuristic Constraints:
ANIMAL_SIZE_CONSTRAINTS = {
    "deer": {"min_height": 12, "max_height": 140},
    "hog": {"min_height": 10, "max_height": 120},
    "small_animal": {"min_height": 8, "max_height": 90}
}
These heuristic constraints reject detections whose bounding boxes are implausibly sized for the claimed class - a cheap way to throw out spurious hits.
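Applying them is a one-liner per detection. A sketch, assuming the heights are bounding-box heights in pixels and the same (x1, y1, x2, y2) box format as the tracker:
def passes_size_constraints(class_name, bbox):
    limits = ANIMAL_SIZE_CONSTRAINTS.get(class_name)
    if limits is None:
        return True
    height = bbox[3] - bbox[1]
    return limits["min_height"] <= height <= limits["max_height"]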
Part 3: Streaming
Frames are annotated with confirmed tracks and then sent over to the streaming pipeline. What mattered to me most was latency. At the protocol level, there are two choices - TCP and UDP.
TCP is reliable, packet ordering is built in, and it does congestion control for you. But those features come at a cost: latency. When TCP sends a packet, it waits for an acknowledgement that the packet arrived and retransmits anything lost - and retransmissions add latency. For transferring a file this makes sense, but for my application all I care about is the latest frame pulled from the camera.
UDP, on the other hand, is fire and forget. A lost packet might mean a slightly corrupted frame or a visual artifact, but the next frame shows up 33ms later anyway, so c’est la vie.
Within UDP there are a couple of options - RTP, WebRTC, a bunch of other protocols. Each of these felt like trying to use a cruise ship to cross a small lake when all I really wanted was a speedboat. I’d previously built my own UDP-based streamer and was pretty comfortable rolling my own with raw UDP packets. Each packet carries a 12-byte header with a sequence number, the total chunk count and this chunk’s index so the receiver knows when it has all the pieces of a frame, and the frame length for buffer allocation:
# Big-endian: sequence (uint32), total chunks (uint16), chunk index (uint16), frame length (uint32) = 12 bytes
PKT_HDR_FMT = '>IHHI'
Fragmentation logic:
frame_len = len(mv)
M = (frame_len + CHUNK_PAYLOAD - 1) // CHUNK_PAYLOAD  # data chunks (ceiling division)
num_groups = (M + FEC_K - 1) // FEC_K                 # FEC groups of FEC_K chunks each
P = num_groups if FEC_ENABLE else 0                   # one parity packet per group
T = M + P                                             # total packets for this frame
FEC = Forward Error Correction. For each group of four packets I also send an additional packet such that if any single chunk is lost the receiver is able to reconstruct it from the other 3 + parity. This obviously made my receiver-side code more complicated…
if FEC_ENABLE and P > 0:
    for g in range(num_groups):
        parity = np.zeros(CHUNK_PAYLOAD, dtype=np.uint8)
        # Send the group's data chunks, XOR-ing each into the parity buffer
        for r in range(FEC_K):
            di = g * FEC_K + r
            if di >= M:
                break
            s = di * CHUNK_PAYLOAD
            e = min(s + CHUNK_PAYLOAD, frame_len)
            header = struct.pack(PKT_HDR_FMT, seq, T, di, frame_len)
            self._send_fragment(header, mv[s:e], client)
            chunk_view = np.frombuffer(mv, dtype=np.uint8, count=e - s, offset=s)
            parity[:e - s] ^= chunk_view
        # Send the group's parity packet
        parity_idx = M + g
        header = struct.pack(PKT_HDR_FMT, seq, T, parity_idx, frame_len)
        self._send_fragment(header, memoryview(parity), client)
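On the receive side, recovering a single lost chunk from a group is just another XOR across whatever did arrive. A simplified sketch, assuming the receiver has grouped chunks by index and padded them to CHUNK_PAYLOAD (the last chunk's true length falls out of frame_len):
import numpy as np

def recover_missing_chunk(group_chunks, parity, missing_idx):
    # group_chunks: dict of chunk_index -> payload bytes (padded to CHUNK_PAYLOAD)
    recovered = np.frombuffer(parity, dtype=np.uint8).copy()
    for idx, chunk in group_chunks.items():
        if idx != missing_idx:
            recovered ^= np.frombuffer(chunk, dtype=np.uint8)
    return recovered.tobytes()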
I also implemented a bit of packet pacing. In early versions of this code, I was seeing stutter on the receiving end that turned out to be caused by bursty sends:
self.PACKET_GAP_SEC = float(os.getenv("PACKET_GAP_SEC", PACKET_GAP_SEC_DEFAULT))
self._gap_ns = int(self.PACKET_GAP_SEC * 1e9)

def _pacer(self):
    if self._gap_ns <= 0:
        return
    target = self._next_pkt_time_ns
    now = time.perf_counter_ns()
    if now < target:
        rem = target - now
        # Sleep if >200µs remaining, then spin for the last ~100µs
        if rem > 200_000:
            time.sleep((rem - 100_000) / 1e9)
        while time.perf_counter_ns() < target:
            pass
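The pacer only waits; the deadline for the next packet still has to be advanced after every send. A guess at how _send_fragment might tie this together (the sendmsg call is one way to avoid concatenating header and payload; the real implementation may differ):
def _send_fragment(self, header, payload, client):
    self._pacer()  # wait until the inter-packet gap has elapsed
    # sendmsg hands the kernel both buffers without building header + payload in userspace
    self.udp_sock.sendmsg([header, payload], [], 0, client)
    # Schedule the earliest time the next packet may go out
    self._next_pkt_time_ns = time.perf_counter_ns() + self._gap_ns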
And finally, a bit more socket configuration:
# Video queue priority: DSCP AF41 (34) shifted into the upper six bits of the TOS byte
self.udp_sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 34 << 2)
# Big send buffer for bursty frames
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4 * 1024 * 1024)
# Kernel-level pacing cap (bytes per second), where supported
if hasattr(socket, 'SO_MAX_PACING_RATE'):
    self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_MAX_PACING_RATE, 10000000)
Part 4: Black Box Recorder
Home stretch! For debugging (and just because), I set this up to save the last N minutes of footage in rolling 10-second segments. The idea is that this helps with crash resilience - I’d lose 10 seconds at most (pun NOT intended). I couldn’t get hardware encoding to work, so I used software encoding:
pipeline = [
    'gst-launch-1.0',
    '-e',
    'fdsrc',
    '!', 'rawvideoparse',
    'width=640', 'height=512', 'format=bgr', 'framerate=30/1',
    '!', 'videorate',
    '!', 'video/x-raw,framerate=30/1',
    '!', 'videoconvert',
    '!', 'video/x-raw,format=I420',
    '!', 'x264enc',
    'bitrate=2000',
    'speed-preset=ultrafast',
    'tune=zerolatency',
    'key-int-max=30',  # Keyframe every 30 frames (one per second at 30 fps)
    '!', 'h264parse',
    '!', 'mp4mux',
    'fragment-duration=1000',  # 1 second fragments
    '!', 'filesink',
    f'location={self.current_segment_path}.tmp'
]
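Since fdsrc reads from file descriptor 0 by default, the recorder just launches this pipeline with a pipe on stdin and writes raw BGR frames into it. Roughly (the write_frame helper is illustrative):
import subprocess

gst_proc = subprocess.Popen(pipeline, stdin=subprocess.PIPE)

def write_frame(frame):
    # frame: 512x640x3 BGR uint8 array, matching rawvideoparse's caps above
    gst_proc.stdin.write(frame.tobytes())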
The recorder runs in its own thread pinned to cores 2-3 and maintains its own buffer pool to avoid unnecessary memory allocation overhead. I also implemented some cleanup logic and made the whole thing circular (so that it overwrites the oldest segment when it hits the recording time limit I set):
def _manage_segments(self):
    segments = self._get_existing_segments()
    # Remove oldest if max exceeded
    while len(segments) > self.max_segments - 1:
        oldest_time, oldest_path = segments.pop(0)
        try:
            os.remove(oldest_path)
        except OSError:
            pass