from typing import Any, Dict
import os
import json
import time
import queue
import torch
import cv2
import numpy as np
import base64
import logging
import threading
import requests
import asyncio
import psutil
import zipfile
import ssl
import urllib3
import subprocess
import tempfile
import redis
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.websockets import WebSocketDisconnect
from fastapi.responses import Response
from websockets.exceptions import ConnectionClosedError
from ultralytics import YOLO

# Import shared pipeline functions
from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline, cleanup_camera_stability

app = FastAPI()

# Global dictionaries to keep track of models and streams
# "models" now holds a nested dict: { camera_id: { modelId: model_tree } }
models: Dict[str, Dict[str, Any]] = {}
streams: Dict[str, Dict[str, Any]] = {}
# Store session IDs per display
session_ids: Dict[str, int] = {}
# Track shared camera streams by camera URL
camera_streams: Dict[str, Dict[str, Any]] = {}
# Map subscriptions to their camera URL
subscription_to_camera: Dict[str, str] = {}
# Store latest frames for REST API access (separate from processing buffer)
latest_frames: Dict[str, Any] = {}
# Store cached detection dict after successful pipeline completion
cached_detections: Dict[str, Dict[str, Any]] = {}
# Enhanced caching system for LPR integration
session_detections: Dict[str, Dict[str, Any]] = {}  # session_id -> detection data
session_to_camera: Dict[str, str] = {}  # session_id -> camera_id
detection_timestamps: Dict[str, float] = {}  # session_id -> timestamp (for cleanup)
# Track frame skipping for pipeline buffer after detection
frame_skip_flags: Dict[str, bool] = {}
# Track camera connection states for immediate error handling
camera_states: Dict[str, Dict[str, Any]] = {}
# Track session ID states and pipeline modes per camera
session_pipeline_states: Dict[str, Dict[str, Any]] = {}
# Store full pipeline results for caching
cached_full_pipeline_results: Dict[str, Dict[str, Any]] = {}

with open("config.json", "r") as f:
    config = json.load(f)

poll_interval = config.get("poll_interval_ms", 100)
reconnect_interval = config.get("reconnect_interval_sec", 5)
TARGET_FPS = config.get("target_fps", 10)
poll_interval = 1000 / TARGET_FPS
logging.info(f"Poll interval: {poll_interval}ms")
max_streams = config.get("max_streams", 5)
max_retries = config.get("max_retries", 3)

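# Illustrative config.json for this worker (an assumed shape; only the keys read
# above are shown, using their defaults):
#
#   {
#       "poll_interval_ms": 100,
#       "reconnect_interval_sec": 5,
#       "target_fps": 10,
#       "max_streams": 5,
#       "max_retries": 3
#   }
#
# Note that poll_interval is recomputed from target_fps above, so the
# poll_interval_ms value is effectively overridden.
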
# Configure logging
logging.basicConfig(
    level=logging.INFO,  # Set to INFO level for less verbose output
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("detector_worker.log"),  # Write logs to a file
        logging.StreamHandler()  # Also output to console
    ]
)

# Create a logger specifically for this application
logger = logging.getLogger("detector_worker")
logger.setLevel(logging.DEBUG)  # Set app-specific logger to DEBUG level

# Create WebSocket communication logger
ws_logger = logging.getLogger("websocket_comm")
ws_logger.setLevel(logging.INFO)
ws_handler = logging.FileHandler("websocket_comm.log", encoding='utf-8')
ws_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
ws_handler.setFormatter(ws_formatter)
ws_logger.addHandler(ws_handler)
ws_logger.propagate = False  # Don't propagate to root logger

# Ensure all other libraries (including root) use at least INFO level
logging.getLogger().setLevel(logging.INFO)

logger.info("Starting detector worker application")
logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}")
ws_logger.info("WebSocket communication logging started - TX/RX format")
logger.info("WebSocket communication will be logged to websocket_comm.log")

# Ensure the models directory exists
os.makedirs("models", exist_ok=True)
logger.info("Ensured models directory exists")

# Constants for heartbeat and timeouts
HEARTBEAT_INTERVAL = 2  # seconds

# Global Redis connection for LPR integration
redis_client_global = None
lpr_listener_thread = None
cleanup_timer_thread = None
lpr_integration_started = False

# Redis connection helper functions
def get_redis_config_from_model(camera_id: str) -> Dict[str, Any]:
    """Extract Redis configuration from loaded model pipeline"""
    try:
        for model_id, model_tree in models.get(camera_id, {}).items():
            if hasattr(model_tree, 'get') and 'redis_client' in model_tree:
                # Extract config from existing Redis client
                client = model_tree['redis_client']
                if client:
                    return {
                        'host': client.connection_pool.connection_kwargs['host'],
                        'port': client.connection_pool.connection_kwargs['port'],
                        'password': client.connection_pool.connection_kwargs.get('password'),
                        'db': client.connection_pool.connection_kwargs.get('db', 0)
                    }
    except Exception as e:
        logger.debug(f"Could not extract Redis config from model: {e}")

    # Fallback - try to read from pipeline.json directly
    try:
        pipeline_dirs = []
        models_dir = "models"
        if os.path.exists(models_dir):
            for root, dirs, files in os.walk(models_dir):
                if "pipeline.json" in files:
                    with open(os.path.join(root, "pipeline.json"), 'r') as f:
                        config = json.load(f)
                        if 'redis' in config:
                            return config['redis']
    except Exception as e:
        logger.debug(f"Could not read Redis config from pipeline.json: {e}")

    return None

def create_redis_connection() -> redis.Redis:
    """Create Redis connection using config from pipeline"""
    global redis_client_global

    if redis_client_global is not None:
        try:
            redis_client_global.ping()
            return redis_client_global
        except:
            redis_client_global = None

    # Find any camera with a loaded model to get Redis config
    redis_config = None
    for camera_id in models.keys():
        redis_config = get_redis_config_from_model(camera_id)
        if redis_config:
            break

    if not redis_config:
        logger.error("No Redis configuration found in any loaded models")
        return None

    try:
        redis_client_global = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config.get('password'),
            db=redis_config.get('db', 0),
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        redis_client_global.ping()
        logger.info(f"✅ Connected to Redis for LPR at {redis_config['host']}:{redis_config['port']}")
        return redis_client_global
    except Exception as e:
        logger.error(f"❌ Failed to connect to Redis for LPR: {e}")
        redis_client_global = None
        return None

# LPR Integration Functions
def process_license_result(lpr_data: Dict[str, Any]):
    """Process incoming LPR result and update backend"""
    try:
        # Enhanced debugging for LPR data reception
        logger.info("=" * 60)
        logger.info("🚗 LPR SERVICE DATA RECEIVED")
        logger.info("=" * 60)
        logger.info(f"📥 Raw LPR data: {json.dumps(lpr_data, indent=2)}")

        session_id = str(lpr_data.get('session_id', ''))
        license_text = lpr_data.get('license_character', '')

        logger.info(f"🔍 Extracted session_id: '{session_id}'")
        logger.info(f"🔍 Extracted license_character: '{license_text}'")
        logger.info(f"📊 Current cached sessions count: {len(session_detections)}")
        logger.info(f"📊 Available session IDs: {list(session_detections.keys())}")

        # Find cached detection by session_id
        if session_id not in session_detections:
            logger.warning("❌ LPR SESSION ID NOT FOUND!")
            logger.warning(f" Looking for session_id: '{session_id}'")
            logger.warning(f" Available sessions: {list(session_detections.keys())}")
            logger.warning(f" Session count: {len(session_detections)}")

            # Additional debugging - show session timestamps
            if session_detections:
                logger.warning("📅 Available session details:")
                for sid, timestamp in detection_timestamps.items():
                    age = time.time() - timestamp
                    camera = session_to_camera.get(sid, 'unknown')
                    logger.warning(f" Session {sid}: camera={camera}, age={age:.1f}s")
            else:
                logger.warning(" No cached sessions available - worker may not have processed any detections yet")

            logger.warning("💡 Possible causes:")
            logger.warning(" 1. Session expired (TTL: 10 minutes)")
            logger.warning(" 2. Session ID mismatch between detection and LPR service")
            logger.warning(" 3. Detection was not cached (no sessionId from backend)")
            logger.warning(" 4. Worker restarted after detection but before LPR result")
            return

        # Get the original detection data
        detection_data = session_detections[session_id].copy()
        camera_id = session_to_camera.get(session_id, 'unknown')

        logger.info("✅ LPR SESSION FOUND!")
        logger.info(f" 📹 Camera ID: {camera_id}")
        logger.info(f" ⏰ Session age: {time.time() - detection_timestamps.get(session_id, 0):.1f} seconds")

        # Show original detection structure before update
        original_license = detection_data.get('data', {}).get('detection', {}).get('licensePlateText')
        logger.info(f" 🔍 Original licensePlateText: {original_license}")
        logger.info(f" 🆕 New licensePlateText: '{license_text}'")

        # Update licensePlateText in detection
        if 'data' in detection_data and 'detection' in detection_data['data']:
            detection_data['data']['detection']['licensePlateText'] = license_text

            logger.info("🎯 LICENSE PLATE UPDATE SUCCESS!")
            logger.info(f" ✅ Updated detection for session {session_id}")
            logger.info(f" ✅ Set licensePlateText = '{license_text}'")

            # Show full detection structure after update
            detection_dict = detection_data['data']['detection']
            logger.info("📋 Updated detection dictionary:")
            logger.info(f" carModel: {detection_dict.get('carModel')}")
            logger.info(f" carBrand: {detection_dict.get('carBrand')}")
            logger.info(f" bodyType: {detection_dict.get('bodyType')}")
            logger.info(f" licensePlateText: {detection_dict.get('licensePlateText')} ← UPDATED")
            logger.info(f" licensePlateConfidence: {detection_dict.get('licensePlateConfidence')}")
        else:
            logger.error("❌ INVALID DETECTION DATA STRUCTURE!")
            logger.error(f" Session {session_id} has malformed detection data")
            logger.error(f" Detection data keys: {list(detection_data.keys())}")
            if 'data' in detection_data:
                logger.error(f" Data keys: {list(detection_data['data'].keys())}")
            return

        # Update timestamp to indicate this is an LPR update
        detection_data['timestamp'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        # Update all caches with new data
        session_detections[session_id] = detection_data.copy()
        cached_detections[camera_id] = detection_data.copy()

        # CRITICAL: Also update the pipeline state cached detection dict (used by lightweight mode)
        if camera_id in session_pipeline_states:
            pipeline_state = session_pipeline_states[camera_id]
            current_cached_dict = pipeline_state.get("cached_detection_dict", {})

            # Update the pipeline cached detection dict with new license plate
            updated_dict = current_cached_dict.copy() if current_cached_dict else {}
            updated_dict['licensePlateText'] = license_text

            pipeline_state["cached_detection_dict"] = updated_dict
            logger.info(f"✅ LPR: Updated pipeline state cached_detection_dict for camera {camera_id}")
            logger.debug(f"🔍 Pipeline cached dict now: {updated_dict}")
        else:
            logger.warning(f"⚠️ Camera {camera_id} not found in session_pipeline_states - pipeline cache not updated")

        logger.info("📡 SENDING UPDATED DETECTION TO BACKEND")
        logger.info(f" 📹 Camera ID: {camera_id}")
        logger.info(f" 📨 Updated licensePlateText: '{license_text}'")
        logger.info(" 🔄 Updated both cache systems:")
        logger.info(f" 1️⃣ cached_detections[{camera_id}] ✅")
        logger.info(f" 2️⃣ session_pipeline_states[{camera_id}].cached_detection_dict ✅")

        # Log the full message being sent
        logger.info("📋 Updated detection data in cache:")
        logger.info(json.dumps(detection_data, indent=2))

        logger.info("✅ ALL CACHES UPDATED!")
        logger.info(f" 🎯 Lightweight mode will now use updated licensePlateText")
        logger.info(f" 📤 Backend will receive: licensePlateText = '{license_text}'")
        logger.info(" 🔄 Both cache systems synchronized with LPR data")

        logger.info("=" * 60)
        logger.info("🏁 LPR PROCESSING COMPLETE")
        logger.info(f" Session: {session_id}")
        logger.info(f" License: '{license_text}'")
        logger.info(f" Status: ✅ SUCCESS - DETECTION CACHE UPDATED")
        logger.info("=" * 60)

    except Exception as e:
        logger.error("=" * 60)
        logger.error("❌ LPR PROCESSING FAILED")
        logger.error("=" * 60)
        logger.error(f"Error: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        logger.error("=" * 60)

# LPR integration now uses cached detection mechanism instead of direct WebSocket sending

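# The listener below subscribes to the Redis channel "license_results" and hands
# each message to process_license_result(), which only reads the "session_id"
# and "license_character" fields. An illustrative payload (values are made up):
#
#   {"session_id": "12345", "license_character": "ABC-1234"}
#
# which could be published manually for testing, e.g.:
#   redis-cli PUBLISH license_results '{"session_id": "12345", "license_character": "ABC-1234"}'
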
def license_results_listener():
    """Background thread to listen for LPR results from Redis"""
    logger.info("🎧 Starting LPR listener thread...")

    while True:
        try:
            redis_client = create_redis_connection()
            if not redis_client:
                logger.error("❌ No Redis connection available for LPR listener")
                time.sleep(10)
                continue

            pubsub = redis_client.pubsub()
            pubsub.subscribe("license_results")
            logger.info("✅ LPR listener subscribed to 'license_results' channel")

            for message in pubsub.listen():
                try:
                    if message['type'] == 'message':
                        logger.info("🔔 REDIS MESSAGE RECEIVED!")
                        logger.info(f" 📡 Channel: {message['channel']}")
                        logger.info(f" 📥 Raw data: {message['data']}")
                        logger.info(f" 📏 Data size: {len(str(message['data']))} bytes")

                        try:
                            lpr_data = json.loads(message['data'])
                            logger.info("✅ JSON parsing successful")
                            logger.info("🏁 Starting LPR processing...")
                            process_license_result(lpr_data)
                            logger.info("✅ LPR processing completed")
                        except json.JSONDecodeError as e:
                            logger.error("❌ JSON PARSING FAILED!")
                            logger.error(f" Error: {e}")
                            logger.error(f" Raw data: {message['data']}")
                            logger.error(f" Data type: {type(message['data'])}")
                        except Exception as e:
                            logger.error("❌ LPR PROCESSING ERROR!")
                            logger.error(f" Error: {e}")
                            import traceback
                            logger.error(f" Traceback: {traceback.format_exc()}")
                    elif message['type'] == 'subscribe':
                        logger.info(f"📡 LPR listener subscribed to channel: {message['channel']}")
                        logger.info("🎧 Ready to receive license plate results...")
                    elif message['type'] == 'unsubscribe':
                        logger.warning(f"📡 LPR listener unsubscribed from channel: {message['channel']}")
                    else:
                        logger.debug(f"📡 Redis message type: {message['type']}")

                except Exception as e:
                    logger.error(f"❌ Error in LPR message processing loop: {e}")
                    break

        except redis.exceptions.ConnectionError as e:
            logger.error(f"❌ Redis connection lost in LPR listener: {e}")
            time.sleep(5)  # Wait before reconnecting
        except Exception as e:
            logger.error(f"❌ Unexpected error in LPR listener: {e}")
            time.sleep(10)

    logger.warning("🛑 LPR listener thread stopped")

def cleanup_expired_sessions():
    """Remove sessions older than TTL (10 minutes)"""
    try:
        current_time = time.time()
        ttl_seconds = 600  # 10 minutes

        expired_sessions = [
            session_id for session_id, timestamp in detection_timestamps.items()
            if current_time - timestamp > ttl_seconds
        ]

        if expired_sessions:
            logger.info(f"🧹 Cleaning up {len(expired_sessions)} expired sessions")

            for session_id in expired_sessions:
                session_detections.pop(session_id, None)
                camera_id = session_to_camera.pop(session_id, None)
                detection_timestamps.pop(session_id, None)
                logger.debug(f"Cleaned up expired session: {session_id} (camera: {camera_id})")

        else:
            logger.debug(f"🧹 No expired sessions to clean up ({len(detection_timestamps)} active)")

    except Exception as e:
        logger.error(f"❌ Error in session cleanup: {e}")

def cleanup_timer():
    """Background thread for periodic session cleanup"""
    logger.info("⏰ Starting session cleanup timer thread...")

    while True:
        try:
            time.sleep(120)  # Run cleanup every 2 minutes
            cleanup_expired_sessions()
        except Exception as e:
            logger.error(f"❌ Error in cleanup timer: {e}")
            time.sleep(120)

def start_lpr_integration():
    """Start LPR integration threads"""
    global lpr_listener_thread, cleanup_timer_thread

    # Start LPR listener thread
    lpr_listener_thread = threading.Thread(target=license_results_listener, daemon=True, name="LPR-Listener")
    lpr_listener_thread.start()
    logger.info("✅ LPR listener thread started")

    # Start cleanup timer thread
    cleanup_timer_thread = threading.Thread(target=cleanup_timer, daemon=True, name="Session-Cleanup")
    cleanup_timer_thread.start()
    logger.info("✅ Session cleanup timer thread started")

WORKER_TIMEOUT_MS = 10000
logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds")

# Locks for thread-safe operations
streams_lock = threading.Lock()
models_lock = threading.Lock()
logger.debug("Initialized thread locks")

# Add helper to download mpta ZIP file from a remote URL
def download_mpta(url: str, dest_path: str) -> str:
    try:
        logger.info(f"Starting download of model from {url} to {dest_path}")
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            file_size = int(response.headers.get('content-length', 0))
            logger.info(f"Model file size: {file_size/1024/1024:.2f} MB")
            downloaded = 0
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if file_size > 0 and downloaded % (file_size // 10) < 8192:  # Log approximately every 10%
                        logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%")
            logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}")
            return dest_path
        else:
            logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}")
            return None
    except Exception as e:
        logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True)
        return None

# Add helper to fetch snapshot image from HTTP/HTTPS URL
def fetch_snapshot(url: str):
    try:
        from requests.auth import HTTPBasicAuth, HTTPDigestAuth
        import requests.adapters
        import urllib3

        # Parse URL to extract credentials
        parsed = urlparse(url)

        # Prepare headers - some cameras require User-Agent and specific headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; DetectorWorker/1.0)',
            'Accept': 'image/jpeg,image/*,*/*',
            'Connection': 'close',
            'Cache-Control': 'no-cache'
        }

        # Create a session with custom adapter for better connection handling
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=1,
            pool_maxsize=1,
            max_retries=urllib3.util.retry.Retry(
                total=2,
                backoff_factor=0.1,
                status_forcelist=[500, 502, 503, 504]
            )
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Reconstruct URL without credentials
        clean_url = f"{parsed.scheme}://{parsed.hostname}"
        if parsed.port:
            clean_url += f":{parsed.port}"
        clean_url += parsed.path
        if parsed.query:
            clean_url += f"?{parsed.query}"

        auth = None
        response = None

        if parsed.username and parsed.password:
            # Try HTTP Digest authentication first (common for IP cameras)
            try:
                auth = HTTPDigestAuth(parsed.username, parsed.password)
                response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True)
                if response.status_code == 200:
                    logger.debug(f"Successfully authenticated using HTTP Digest for {clean_url}")
                elif response.status_code == 401:
                    # If Digest fails, try Basic auth
                    logger.debug(f"HTTP Digest failed, trying Basic auth for {clean_url}")
                    auth = HTTPBasicAuth(parsed.username, parsed.password)
                    response = session.get(clean_url, auth=auth, headers=headers, timeout=(5, 15), stream=True)
                    if response.status_code == 200:
                        logger.debug(f"Successfully authenticated using HTTP Basic for {clean_url}")
            except Exception as auth_error:
                logger.debug(f"Authentication setup error: {auth_error}")
                # Fallback to original URL with embedded credentials
                response = session.get(url, headers=headers, timeout=(5, 15), stream=True)
        else:
            # No credentials in URL, make request as-is
            response = session.get(url, headers=headers, timeout=(5, 15), stream=True)

        if response and response.status_code == 200:
            # Read content with size limit to prevent memory issues
            content = b''
            max_size = 10 * 1024 * 1024  # 10MB limit
            for chunk in response.iter_content(chunk_size=8192):
                content += chunk
                if len(content) > max_size:
                    logger.error(f"Snapshot too large (>{max_size} bytes) from {clean_url}")
                    return None

            # Convert response content to numpy array
            nparr = np.frombuffer(content, np.uint8)
            # Decode image
            frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if frame is not None:
                logger.debug(f"Successfully fetched snapshot from {clean_url}, shape: {frame.shape}, size: {len(content)} bytes")
                return frame
            else:
                logger.error(f"Failed to decode image from snapshot URL: {clean_url} (content size: {len(content)} bytes)")
                return None
        elif response:
            logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {clean_url}")
            # Log response headers and first part of content for debugging
            logger.debug(f"Response headers: {dict(response.headers)}")
            if len(response.content) < 1000:
                logger.debug(f"Response content: {response.content[:500]}")
            return None
        else:
            logger.error(f"No response received from snapshot URL: {clean_url}")
            return None
    except requests.exceptions.Timeout as e:
        logger.error(f"Timeout fetching snapshot from {url}: {str(e)}")
        return None
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Connection error fetching snapshot from {url}: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Exception fetching snapshot from {url}: {str(e)}", exc_info=True)
        return None

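# Sketch of typical fetch_snapshot() usage (the URL and process() below are
# placeholders): it returns a decoded BGR numpy array on success and None on any
# failure, so callers should always check for None before using the frame, e.g.:
#
#   frame = fetch_snapshot("http://user:pass@camera.local/snapshot.jpg")
#   if frame is not None:
#       process(frame)
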
# Helper to get crop coordinates from stream
def get_crop_coords(stream):
    return {
        "cropX1": stream.get("cropX1"),
        "cropY1": stream.get("cropY1"),
        "cropX2": stream.get("cropX2"),
        "cropY2": stream.get("cropY2")
    }

# Camera state management functions
def set_camera_connected(camera_id, connected=True, error_msg=None):
    """Set camera connection state and track error information"""
    current_time = time.time()

    if camera_id not in camera_states:
        camera_states[camera_id] = {
            "connected": True,
            "last_error": None,
            "last_error_time": None,
            "consecutive_failures": 0,
            "disconnection_notified": False
        }

    state = camera_states[camera_id]
    was_connected = state["connected"]

    if connected:
        state["connected"] = True
        state["consecutive_failures"] = 0
        state["disconnection_notified"] = False
        if not was_connected:
            logger.info(f"📶 CAMERA RECONNECTED: {camera_id}")
    else:
        state["connected"] = False
        state["last_error"] = error_msg
        state["last_error_time"] = current_time
        state["consecutive_failures"] += 1

        # Distinguish between temporary and permanent disconnection
        is_permanent = state["consecutive_failures"] >= 3

        if was_connected and is_permanent:
            logger.error(f"📵 CAMERA DISCONNECTED: {camera_id} - {error_msg} (consecutive failures: {state['consecutive_failures']})")
            logger.info(f"🚨 CAMERA ERROR DETECTED - Will send detection: null to reset backend session for {camera_id}")

def is_camera_connected(camera_id):
    """Check if camera is currently connected"""
    return camera_states.get(camera_id, {}).get("connected", True)

def should_notify_disconnection(camera_id):
    """Check if we should notify backend about disconnection"""
    state = camera_states.get(camera_id, {})
    is_disconnected = not state.get("connected", True)
    not_yet_notified = not state.get("disconnection_notified", False)
    has_enough_failures = state.get("consecutive_failures", 0) >= 3

    return is_disconnected and not_yet_notified and has_enough_failures

def mark_disconnection_notified(camera_id):
    """Mark that we've notified backend about this disconnection"""
    if camera_id in camera_states:
        camera_states[camera_id]["disconnection_notified"] = True
        logger.debug(f"Marked disconnection notification sent for camera {camera_id}")

def get_or_init_session_pipeline_state(camera_id):
    """Get or initialize session pipeline state for a camera"""
    if camera_id not in session_pipeline_states:
        session_pipeline_states[camera_id] = {
            "mode": "validation_detecting",  # "validation_detecting", "send_detections", "waiting_for_session_id", "full_pipeline", "lightweight", "car_gone_waiting"
            "session_id_received": False,
            "full_pipeline_completed": False,
            "absence_counter": 0,
            "validation_counter": 0,  # Counter for validation phase
            "validation_threshold": 4,  # Default validation threshold
            "max_absence_frames": 3,
            "yolo_inference_enabled": True,  # Controls whether to run YOLO inference
            "cached_detection_dict": None,  # Cached detection dict for lightweight mode
            "stable_track_id": None,  # The stable track ID we're monitoring
            "validated_detection": None,  # Stored detection result from validation phase for full_pipeline reuse
            "progression_stage": None  # Tracks current progression stage (welcome, car_wait_staff, car_fueling, car_waitpayment)
        }
    return session_pipeline_states[camera_id]

def update_session_pipeline_mode(camera_id, new_mode, session_id=None):
    """Update session pipeline mode and related state"""
    state = get_or_init_session_pipeline_state(camera_id)
    old_mode = state["mode"]
    state["mode"] = new_mode

    # Reset counters based on mode transition
    if new_mode == "validation_detecting":
        # Transitioning to validation mode - reset both counters for fresh start
        old_validation_counter = state.get("validation_counter", 0)
        old_absence_counter = state.get("absence_counter", 0)
        state["validation_counter"] = 0
        state["absence_counter"] = 0
        if old_validation_counter > 0 or old_absence_counter > 0:
            logger.info(f"🧹 Camera {camera_id}: VALIDATION MODE RESET - validation_counter: {old_validation_counter}→0, absence_counter: {old_absence_counter}→0")

    if session_id:
        state["session_id_received"] = True
        state["absence_counter"] = 0  # Reset absence counter when session starts

    logger.info(f"📊 Camera {camera_id}: Pipeline mode changed from '{old_mode}' to '{new_mode}'")
    return state

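# The per-camera pipeline mode behaves as a small state machine (summary
# inferred from the handler logic below, not an authoritative spec):
#
#   validation_detecting -> send_detections      once a track is validated as stable
#   send_detections      -> full_pipeline        once the backend supplies a sessionId
#   full_pipeline        -> lightweight          after classification/Redis/PostgreSQL branches finish
#   lightweight          -> validation_detecting after 3 consecutive absent frames
#                                                (plus sessionId: null when in car_waitpayment)
#   car_gone_waiting     -> validation_detecting when the backend opens a new session
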
####################################################
# REST API endpoint for image retrieval
####################################################
@app.get("/lpr/debug")
async def get_lpr_debug_info():
    """Debug endpoint to inspect LPR integration state"""
    try:
        return {
            "status": "success",
            "lpr_integration_started": lpr_integration_started,
            "redis_connected": redis_client_global is not None and redis_client_global.ping() if redis_client_global else False,
            "active_sessions": len(session_detections),
            "session_details": {
                session_id: {
                    "camera_id": session_to_camera.get(session_id, "unknown"),
                    "timestamp": detection_timestamps.get(session_id, 0),
                    "age_seconds": time.time() - detection_timestamps.get(session_id, time.time()),
                    "has_license": session_detections[session_id].get('data', {}).get('detection', {}).get('licensePlateText') is not None
                }
                for session_id in session_detections.keys()
            },
            "thread_status": {
                "lpr_listener_alive": lpr_listener_thread.is_alive() if lpr_listener_thread else False,
                "cleanup_timer_alive": cleanup_timer_thread.is_alive() if cleanup_timer_thread else False
            },
            "cached_detections_by_camera": list(cached_detections.keys())
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "lpr_integration_started": lpr_integration_started
        }

@app.get("/camera/{camera_id}/image")
|
||
async def get_camera_image(camera_id: str):
|
||
"""
|
||
Get the current frame from a camera as JPEG image
|
||
"""
|
||
try:
|
||
# URL decode the camera_id to handle encoded characters like %3B for semicolon
|
||
from urllib.parse import unquote
|
||
original_camera_id = camera_id
|
||
camera_id = unquote(camera_id)
|
||
logger.debug(f"REST API request: original='{original_camera_id}', decoded='{camera_id}'")
|
||
|
||
with streams_lock:
|
||
if camera_id not in streams:
|
||
logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}")
|
||
raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active")
|
||
|
||
# Check if we have a cached frame for this camera
|
||
if camera_id not in latest_frames:
|
||
logger.warning(f"No cached frame available for camera '{camera_id}'.")
|
||
raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}")
|
||
|
||
frame = latest_frames[camera_id]
|
||
logger.debug(f"Retrieved cached frame for camera '{camera_id}', frame shape: {frame.shape}")
|
||
# Encode frame as JPEG
|
||
success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
||
if not success:
|
||
raise HTTPException(status_code=500, detail="Failed to encode image as JPEG")
|
||
|
||
# Return image as binary response
|
||
return Response(content=buffer_img.tobytes(), media_type="image/jpeg")
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
||
|
||
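# Illustrative request against the endpoint above (host, port, and camera id are
# placeholders; a ';' inside the camera id must be URL-encoded as %3B, as the
# handler notes when it calls unquote()):
#
#   curl -o frame.jpg "http://localhost:8000/camera/display-001%3Bcam-001/image"
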
####################################################
# Detection and frame processing functions
####################################################
@app.websocket("/")
async def detect(websocket: WebSocket):
    logger.info("WebSocket connection accepted")
    persistent_data_dict = {}

    async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data):
        try:
            # Check camera connection state first - handle disconnection immediately
            if should_notify_disconnection(camera_id):
                logger.error(f"🚨 CAMERA DISCONNECTION DETECTED: {camera_id} - sending immediate detection: null")

                # Clear cached detections and occupancy state
                cached_detections.pop(camera_id, None)
                frame_skip_flags.pop(camera_id, None)
                cached_full_pipeline_results.pop(camera_id, None)  # Clear cached pipeline results
                session_pipeline_states.pop(camera_id, None)  # Reset session pipeline state

                # Reset pipeline state immediately
                from siwatsystem.pympta import reset_tracking_state
                model_id = stream.get("modelId", "unknown")
                reset_tracking_state(camera_id, model_id, "camera disconnected")

                # Send immediate detection: null to backend
                detection_data = {
                    "type": "imageDetection",
                    "subscriptionIdentifier": stream["subscriptionIdentifier"],
                    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                    "data": {
                        "detection": None,  # null detection for disconnection
                        "modelId": stream["modelId"],
                        "modelName": stream["modelName"]
                    }
                }

                try:
                    ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
                    await websocket.send_json(detection_data)
                except RuntimeError as e:
                    if "websocket.close" in str(e):
                        logger.warning(f"WebSocket connection closed - cannot send disconnection signal for camera {camera_id}")
                        return persistent_data
                    else:
                        raise
                mark_disconnection_notified(camera_id)
                logger.info(f"📡 SENT DISCONNECTION SIGNAL - detection: null for camera {camera_id}, backend should clear session")

                return persistent_data

            # Apply crop if specified
            cropped_frame = frame
            if all(coord is not None for coord in [stream.get("cropX1"), stream.get("cropY1"), stream.get("cropX2"), stream.get("cropY2")]):
                cropX1, cropY1, cropX2, cropY2 = stream["cropX1"], stream["cropY1"], stream["cropX2"], stream["cropY2"]
                cropped_frame = frame[cropY1:cropY2, cropX1:cropX2]
                logger.debug(f"Applied crop coordinates ({cropX1}, {cropY1}, {cropX2}, {cropY2}) to frame for camera {camera_id}")

            logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
            start_time = time.time()

            # Extract display identifier for pipeline context
            subscription_parts = stream["subscriptionIdentifier"].split(';')
            display_identifier = subscription_parts[0] if subscription_parts else None

            # Get backend session ID if available
            backend_session_id = session_ids.get(display_identifier)

            # Get or initialize session pipeline state
            pipeline_state = get_or_init_session_pipeline_state(camera_id)
            current_mode = pipeline_state["mode"]

            logger.debug(f"🔍 SESSIONID LOOKUP: display='{display_identifier}', session_id={repr(backend_session_id)}, mode='{current_mode}'")
            logger.debug(f"🔍 Available session_ids: {session_ids}")
            logger.debug(f"🔍 VALIDATED_DETECTION TRACE: {pipeline_state.get('validated_detection')}")

            # ═══ SESSION ID-BASED PROCESSING MODE ═══
            if not backend_session_id:
                # No session ID - handle different modes appropriately
                if current_mode == "lightweight":
                    # Check if we're in car_waitpayment stage - if so, don't reset immediately
                    current_progression = pipeline_state.get("progression_stage")
                    if current_progression == "car_waitpayment":
                        # Stay in lightweight mode - let absence counter + sessionId null logic handle reset
                        logger.debug(f"🔍 Camera {camera_id}: No session ID but in car_waitpayment - staying in lightweight mode for dual reset condition")
                    else:
                        # Not in car_waitpayment - reset immediately (situation 1)
                        update_session_pipeline_mode(camera_id, "validation_detecting")
                        current_mode = "validation_detecting"
                        logger.debug(f"🔍 Camera {camera_id}: No session ID - reset to validation_detecting (not in car_waitpayment)")
                elif current_mode not in ["validation_detecting", "send_detections", "waiting_for_session_id"]:
                    # Other modes - reset to validation_detecting
                    update_session_pipeline_mode(camera_id, "validation_detecting")
                    current_mode = "validation_detecting"
                    logger.debug(f"🔍 Camera {camera_id}: No session ID - reset to validation_detecting from {current_mode}")
                else:
                    logger.debug(f"🔍 Camera {camera_id}: No session ID - staying in {current_mode} mode")
            else:
                # Session ID available - switch to full pipeline mode
                if current_mode in ["send_detections", "waiting_for_session_id"]:
                    # Session ID just arrived - switch to full pipeline mode
                    update_session_pipeline_mode(camera_id, "full_pipeline", backend_session_id)
                    current_mode = "full_pipeline"
                    logger.info(f"🔥 Camera {camera_id}: Session ID received ({backend_session_id}) - switching to FULL PIPELINE mode")

            # Create context for pipeline execution
            pipeline_context = {
                "camera_id": camera_id,
                "display_id": display_identifier,
                "backend_session_id": backend_session_id,
                "current_mode": current_mode  # Pass current mode to pipeline
            }

            start_time = time.time()
            detection_result = None

            if current_mode == "validation_detecting":
                # ═══ TRACK VALIDATION MODE ═══
                # Run tracking-based validation with track ID stability
                logger.debug(f"🔍 Camera {camera_id}: In validation_detecting mode - running track-based validation")

                # Get tracking configuration from model_tree
                tracking_config = model_tree.get("tracking", {})
                tracking_enabled = tracking_config.get("enabled", True)
                stability_threshold = tracking_config.get("stabilityThreshold", 4)

                # Default to "none" - only proceed after track validation
                detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}

                if tracking_enabled:
                    # Run full tracking detection to get track IDs
                    from siwatsystem.pympta import run_detection_with_tracking
                    all_detections, regions_dict, track_validation_result = run_detection_with_tracking(cropped_frame, model_tree, pipeline_context)

                    if track_validation_result.get("validation_complete", False):
                        # Track validation completed - we have stable track IDs
                        stable_tracks = track_validation_result.get("stable_tracks", [])
                        logger.info(f"🎯 Camera {camera_id}: TRACK VALIDATION COMPLETED - stable tracks: {stable_tracks}")

                        # Switch to send_detections mode
                        update_session_pipeline_mode(camera_id, "send_detections")

                        # Send the best detection with stable track
                        if all_detections:
                            # Find detection with stable track ID
                            stable_detection = None
                            for detection in all_detections:
                                if detection.get("id") in stable_tracks:
                                    stable_detection = detection
                                    break

                            if stable_detection:
                                detection_result = {
                                    "class": stable_detection.get("class", "car"),
                                    "confidence": stable_detection.get("confidence", 0.0),
                                    "bbox": stable_detection.get("bbox", [0, 0, 0, 0]),
                                    "track_id": stable_detection.get("id")
                                }

                                # Store validated detection for full_pipeline mode to reuse
                                pipeline_state["validated_detection"] = detection_result.copy()
                                logger.debug(f"🔍 Camera {camera_id}: VALIDATION DEBUG - storing detection_result = {detection_result}")
                                logger.debug(f"🔍 Camera {camera_id}: VALIDATION DEBUG - pipeline_state after storing = {pipeline_state.get('validated_detection')}")
                                logger.info(f"🚗 Camera {camera_id}: SENDING STABLE DETECTION - track ID {detection_result['track_id']}")
                                logger.info(f"💾 Camera {camera_id}: STORED VALIDATED DETECTION for full_pipeline reuse")
                            else:
                                logger.warning(f"⚠️ Camera {camera_id}: Stable tracks found but no matching detection")
                    else:
                        # Track validation still in progress
                        stable_tracks = track_validation_result.get("stable_tracks", [])
                        current_tracks = track_validation_result.get("current_tracks", [])

                        if current_tracks:
                            track_id = current_tracks[0] if current_tracks else "None"
                            stable_status = "STABLE" if stable_tracks else "validating"
                            logger.info(f"🔍 Camera {camera_id}: TRACK VALIDATION - car track_id {track_id} ({stable_status}, need {stability_threshold} consecutive frames)")
                        else:
                            logger.debug(f"👻 Camera {camera_id}: No car detected")

                        logger.debug(f"📤 Camera {camera_id}: Sending 'none' (track validation in progress)")
                else:
                    # Tracking disabled - fall back to basic detection validation
                    logger.debug(f"🔍 Camera {camera_id}: Tracking disabled - using basic detection validation")
                    from siwatsystem.pympta import run_lightweight_detection
                    basic_detection = run_lightweight_detection(cropped_frame, model_tree)

                    if basic_detection and basic_detection.get("car_detected"):
                        best_detection = basic_detection.get("best_detection")

                        # Increment validation counter for basic detection
                        pipeline_state["validation_counter"] += 1
                        current_count = pipeline_state["validation_counter"]
                        threshold = pipeline_state["validation_threshold"]

                        if current_count >= threshold:
                            update_session_pipeline_mode(camera_id, "send_detections")
                            detection_result = {
                                "class": best_detection.get("class", "car"),
                                "confidence": best_detection.get("confidence", 0.0),
                                "bbox": best_detection.get("bbox", [0, 0, 0, 0])
                            }

                            # Store validated detection for full_pipeline mode to reuse
                            pipeline_state["validated_detection"] = detection_result.copy()
                            logger.debug(f"🔍 Camera {camera_id}: BASIC VALIDATION DEBUG - storing detection_result = {detection_result}")
                            logger.info(f"💾 Camera {camera_id}: STORED BASIC VALIDATED DETECTION for full_pipeline reuse")
                            logger.info(f"🎯 Camera {camera_id}: BASIC VALIDATION COMPLETED after {current_count} frames")
                        else:
                            logger.info(f"📊 Camera {camera_id}: Basic validation progress {current_count}/{threshold}")
                    else:
                        # Reset validation counter
                        if pipeline_state["validation_counter"] > 0:
                            pipeline_state["validation_counter"] = 0
                            logger.info(f"🔄 Camera {camera_id}: Reset validation counter (no detection)")

            elif current_mode == "send_detections":
                # ═══ SEND DETECTIONS MODE ═══
                # Validation completed - now send detection_dict for car detections, detection: null for no car
                logger.debug(f"📤 Camera {camera_id}: In send_detections mode - sending detection_dict for cars")
                from siwatsystem.pympta import run_lightweight_detection
                basic_detection = run_lightweight_detection(cropped_frame, model_tree)

                if basic_detection and basic_detection.get("car_detected"):
                    # Car detected - send detection_dict
                    best_detection = basic_detection.get("best_detection")
                    detection_result = {
                        "class": best_detection.get("class", "car"),
                        "confidence": best_detection.get("confidence", 0.0),
                        "bbox": best_detection.get("bbox", [0, 0, 0, 0])
                    }
                    logger.info(f"🚗 Camera {camera_id}: SENDING DETECTION_DICT - {detection_result['class']} (conf={detection_result['confidence']:.3f}) - backend should generate session ID")
                else:
                    # No car detected - send "none"
                    detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                    logger.debug(f"👻 Camera {camera_id}: No car detected - sending 'none'")

            elif current_mode == "waiting_for_session_id":
                # ═══ WAITING FOR SESSION ID MODE ═══
                # Stop processing snapshots, wait for session ID
                logger.debug(f"⏳ Camera {camera_id}: In waiting_for_session_id mode - not processing snapshots")
                return persistent_data  # Don't process or send anything

            elif current_mode == "full_pipeline":
                # ═══ FULL PIPELINE MODE ═══
                logger.info(f"🔥 Camera {camera_id}: Running FULL PIPELINE (classification branches + Redis + PostgreSQL)")

                # Use validated detection from validation phase instead of detecting again
                validated_detection = pipeline_state.get("validated_detection")
                logger.debug(f"🔍 Camera {camera_id}: FULL_PIPELINE DEBUG - validated_detection = {validated_detection}")
                logger.debug(f"🔍 Camera {camera_id}: FULL_PIPELINE DEBUG - pipeline_state keys = {list(pipeline_state.keys())}")
                if validated_detection:
                    logger.info(f"🔄 Camera {camera_id}: Using validated detection for full pipeline: track_id={validated_detection.get('track_id')}")
                    detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context, validated_detection=validated_detection)
                    # Clear the validated detection after using it
                    pipeline_state["validated_detection"] = None
                else:
                    logger.warning(f"⚠️ Camera {camera_id}: No validated detection found for full pipeline - this shouldn't happen")
                    detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)

                if detection_result and isinstance(detection_result, dict):
                    # Cache the full pipeline result
                    cached_full_pipeline_results[camera_id] = {
                        "result": detection_result.copy(),
                        "timestamp": time.time()
                    }

                    # Note: Will cache detection_dict after branch processing completes

                    # Store the stable track ID for lightweight monitoring
                    track_id = detection_result.get("track_id") or detection_result.get("id")
                    if track_id is not None:
                        pipeline_state["stable_track_id"] = track_id
                        logger.info(f"💾 Camera {camera_id}: Cached stable track_id={track_id}")
                    else:
                        logger.warning(f"⚠️ Camera {camera_id}: No track_id found in detection_result: {detection_result.keys()}")

                    # Ensure we have a cached detection dict for lightweight mode
                    if not pipeline_state.get("cached_detection_dict"):
                        # Create fallback cached detection dict if branch processing didn't populate it
                        fallback_detection = {
                            "carModel": None,
                            "carBrand": None,
                            "carYear": None,
                            "bodyType": None,
                            "licensePlateText": None,
                            "licensePlateConfidence": None
                        }
                        pipeline_state["cached_detection_dict"] = fallback_detection
                        logger.warning(f"⚠️ Camera {camera_id}: Created fallback cached detection dict (branch processing may have failed)")

                    # Switch to lightweight mode
                    update_session_pipeline_mode(camera_id, "lightweight")
                    logger.info(f"✅ Camera {camera_id}: Full pipeline completed - switching to LIGHTWEIGHT mode")

            elif current_mode == "lightweight":
                # ═══ SIMPLIFIED LIGHTWEIGHT MODE ═══
                # Send cached detection dict + check for 3 consecutive empty frames to reset

                stable_track_id = pipeline_state.get("stable_track_id")
                cached_detection_dict = pipeline_state.get("cached_detection_dict")

                logger.debug(f"🪶 Camera {camera_id}: LIGHTWEIGHT MODE - stable_track_id={stable_track_id}")

                if not pipeline_state.get("yolo_inference_enabled", True):
                    # YOLO inference disabled during car_fueling - continue sending cached detection dict
                    logger.debug(f"🛑 Camera {camera_id}: YOLO inference disabled during car_fueling - continue sending cached detection dict")
                    if cached_detection_dict:
                        detection_result = cached_detection_dict  # Continue sending cached data
                        logger.info(f"⛽ Camera {camera_id}: YOLO disabled during car_fueling but sending cached detection dict")
                    else:
                        logger.warning(f"⚠️ Camera {camera_id}: YOLO disabled but no cached detection dict available")
                        detection_result = None
                else:
                    # Run lightweight YOLO inference to check car presence for reset logic (no tracking validation needed)
                    from siwatsystem.pympta import run_lightweight_detection
                    basic_detection = run_lightweight_detection(cropped_frame, model_tree)

                    # Enhanced car detection: requires both confidence pass AND bbox >= 20% of frame
                    car_detected_confidence = basic_detection and basic_detection.get("car_detected", False)
                    car_detected_with_bbox_validation = False

                    if car_detected_confidence:
                        # Car passed confidence - now check bbox area
                        best_detection = basic_detection.get("best_detection")
                        if best_detection and best_detection.get("bbox"):
                            bbox = best_detection["bbox"]
                            x1, y1, x2, y2 = bbox
                            bbox_area = (x2 - x1) * (y2 - y1)
                            frame_height, frame_width = cropped_frame.shape[:2]
                            frame_area = frame_height * frame_width
                            bbox_area_ratio = bbox_area / frame_area if frame_area > 0 else 0

                            min_area_ratio = 0.2  # 20% of frame
                            car_detected_with_bbox_validation = bbox_area_ratio >= min_area_ratio

                            if not car_detected_with_bbox_validation:
                                logger.info(f"🚫 Camera {camera_id}: LIGHTWEIGHT - car detected but bbox {bbox_area_ratio:.1%} < {min_area_ratio:.0%} (too distant) - counting as absent")
                            else:
                                logger.debug(f"✅ Camera {camera_id}: LIGHTWEIGHT - car detected with valid bbox {bbox_area_ratio:.1%} >= {min_area_ratio:.0%}")
                        else:
                            logger.debug(f"⚠️ Camera {camera_id}: LIGHTWEIGHT - car detected but no bbox info available")

                    logger.debug(f"🔍 Camera {camera_id}: LIGHTWEIGHT - enhanced car presence check: confidence={car_detected_confidence}, bbox_valid={car_detected_with_bbox_validation}")

                    if car_detected_with_bbox_validation:
                        # Car detected - reset absence counter, continue sending cached detection dict
                        pipeline_state["absence_counter"] = 0  # Reset absence since cars are present

                        if cached_detection_dict:
                            detection_result = cached_detection_dict  # Always send cached data
                            logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT - car detected, sending cached detection dict")
                        else:
                            logger.warning(f"⚠️ Camera {camera_id}: LIGHTWEIGHT - car detected but no cached detection dict available")
                            detection_result = None
                    else:
                        # No car detected - increment absence counter
                        pipeline_state["absence_counter"] += 1
                        absence_count = pipeline_state["absence_counter"]
                        max_absence = 3  # Need 3 consecutive empty frames

                        logger.info(f"👻 Camera {camera_id}: LIGHTWEIGHT - no car detected (absence {absence_count}/{max_absence})")

                        # Check if we should reset: Need BOTH 3 consecutive absence frames AND sessionId: null
                        current_progression = pipeline_state.get("progression_stage")
                        should_check_session_null = current_progression == "car_waitpayment"

                        if absence_count >= max_absence:
                            if should_check_session_null:
                                # In car_waitpayment stage - require BOTH conditions
                                if backend_session_id is None:
                                    # Both conditions met: 3 absence frames + sessionId: null
                                    logger.info(f"🔄 Camera {camera_id}: DUAL RESET CONDITIONS MET - {max_absence} consecutive absence frames + sessionId: null")

                                    # Clear all state and prepare for next car
                                    cached_full_pipeline_results.pop(camera_id, None)
                                    pipeline_state["cached_detection_dict"] = None
                                    pipeline_state["stable_track_id"] = None
                                    pipeline_state["validated_detection"] = None
                                    pipeline_state["progression_stage"] = None
                                    old_absence_counter = pipeline_state["absence_counter"]
                                    old_validation_counter = pipeline_state.get("validation_counter", 0)
                                    pipeline_state["absence_counter"] = 0
                                    pipeline_state["validation_counter"] = 0
                                    pipeline_state["yolo_inference_enabled"] = True

                                    logger.info(f"🧹 Camera {camera_id}: DUAL RESET - absence_counter: {old_absence_counter}→0, validation_counter: {old_validation_counter}→0, progression_stage: {current_progression}→None")

                                    # Clear stability tracking data for this camera
                                    from siwatsystem.pympta import reset_camera_stability_tracking
                                    reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown"))

                                    # Switch back to validation phase
                                    update_session_pipeline_mode(camera_id, "validation_detecting")
                                    logger.info(f"✅ Camera {camera_id}: DUAL RESET TO VALIDATION COMPLETE - ready for new car")

                                    # Now in validation mode - send what YOLO detection finds (will be null since no car)
                                    detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                                else:
                                    # Only absence frames met, but sessionId is not null - continue sending cached detection
                                    logger.info(f"⏳ Camera {camera_id}: {max_absence} absence frames reached but sessionId={backend_session_id} (not null) - continuing with cached detection")
                                    if cached_detection_dict:
                                        detection_result = cached_detection_dict
                                    else:
                                        logger.warning(f"⚠️ Camera {camera_id}: No cached detection dict available")
                                        detection_result = None
                            else:
                                # Not in car_waitpayment - use original simple reset condition (situation 1)
                                logger.info(f"🔄 Camera {camera_id}: SIMPLE RESET CONDITION MET - {max_absence} consecutive empty frames (not in car_waitpayment)")

                                # Clear all state and prepare for next car
                                cached_full_pipeline_results.pop(camera_id, None)
                                pipeline_state["cached_detection_dict"] = None
                                pipeline_state["stable_track_id"] = None
                                pipeline_state["validated_detection"] = None
                                pipeline_state["progression_stage"] = None
                                old_absence_counter = pipeline_state["absence_counter"]
                                old_validation_counter = pipeline_state.get("validation_counter", 0)
                                pipeline_state["absence_counter"] = 0
                                pipeline_state["validation_counter"] = 0
                                pipeline_state["yolo_inference_enabled"] = True

                                logger.info(f"🧹 Camera {camera_id}: SIMPLE RESET - absence_counter: {old_absence_counter}→0, validation_counter: {old_validation_counter}→0")

                                # Clear stability tracking data for this camera
                                from siwatsystem.pympta import reset_camera_stability_tracking
                                reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown"))

                                # Switch back to validation phase
                                update_session_pipeline_mode(camera_id, "validation_detecting")
                                logger.info(f"✅ Camera {camera_id}: SIMPLE RESET TO VALIDATION COMPLETE - ready for new car")

                                # Now in validation mode - send what YOLO detection finds (will be null since no car)
                                detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                        else:
                            # Still within absence threshold - continue sending cached detection dict
                            if cached_detection_dict:
                                detection_result = cached_detection_dict  # Send cached data
                                logger.info(f"⏳ Camera {camera_id}: LIGHTWEIGHT - no car but absence<{max_absence}, still sending cached detection dict")
                            else:
                                logger.warning(f"⚠️ Camera {camera_id}: LIGHTWEIGHT - no cached detection dict available")
                                detection_result = None

            elif current_mode == "car_gone_waiting":
                # ═══ CAR GONE WAITING STATE ═══
                # Car is gone (both conditions met), YOLO inference disabled, waiting for new session

                logger.debug(f"🛑 Camera {camera_id}: CAR GONE WAITING - YOLO inference stopped")

                # Check if backend has started a new session (indicates new car scenario)
                if backend_session_id is not None:
                    # Backend started new session - re-enable YOLO and reset to validation
                    pipeline_state["yolo_inference_enabled"] = True
                    pipeline_state["absence_counter"] = 0
                    pipeline_state["stable_track_id"] = None
                    pipeline_state["cached_detection_dict"] = None
                    pipeline_state["validated_detection"] = None

                    # Clear stability tracking data for this camera
                    from siwatsystem.pympta import reset_camera_stability_tracking
                    reset_camera_stability_tracking(camera_id, model_tree.get("modelId", "unknown"))

                    update_session_pipeline_mode(camera_id, "validation_detecting")
                    logger.info(f"🔄 Camera {camera_id}: New session detected (id={backend_session_id}) - re-enabling YOLO inference")
                    logger.info(f"✅ Camera {camera_id}: Reset to validation mode - cleared all tracking, ready for new car detection")

                    # Don't run detection this frame - let next frame start fresh
                    detection_result = {"class": "none", "confidence": 1.0, "bbox": [0, 0, 0, 0]}
                else:
                    # Still waiting - no sessionId, no detection to send
                    logger.debug(f"🛑 Camera {camera_id}: Car gone waiting - no YOLO inference, no data sent")
                    detection_result = None

            process_time = (time.time() - start_time) * 1000
            logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms (mode: {current_mode})")

            # Skip processing if no detection result (blocked by session gating)
            if detection_result is None:
                logger.debug(f"No detection result to process for camera {camera_id}")
                return persistent_data

            # Log the raw detection result for debugging
            logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")

            # Extract session_id from pipeline result (always use backend sessionId)
            session_id = backend_session_id
            logger.debug(f"Using backend session_id: {session_id}")

            # Process detection result based on current mode
            if current_mode == "validation_detecting":
                # ═══ VALIDATION DETECTING MODE ═══
                # Always send detection: null during validation phase
                detection_dict = None
                logger.debug(f"🔍 SENDING 'NONE' - validation_detecting mode for camera {camera_id}")

            elif current_mode == "send_detections":
                # ═══ SEND DETECTIONS MODE ═══
                if detection_result.get("class") == "none":
                    # No car detected - send detection: null
                    detection_dict = None
                    logger.debug(f"📤 SENDING 'NONE' - send_detections mode (no car) for camera {camera_id}")
                else:
                    # Car detected in send_detections mode - ALWAYS send empty dict to trigger backend sessionId
                    # Purpose: Tell backend "car is here, please create sessionId"
                    detection_dict = {}
                    logger.info(f"📤 SENDING EMPTY DETECTION_DICT - send_detections mode, requesting backend to create sessionId (conf={detection_result.get('confidence', 0):.3f}) for camera {camera_id}")

                    if backend_session_id:
                        logger.debug(f"🔄 Camera {camera_id}: Note - sessionId {backend_session_id} exists but still in send_detections mode (transition pending)")

            elif current_mode == "lightweight":
                # ═══ SIMPLIFIED LIGHTWEIGHT MODE DETECTION PROCESSING ═══
                if detection_result.get("class") == "none":
                    # No car detected - this happens when resetting to validation
                    detection_dict = None  # Send detection: null
                    logger.info(f"🚫 LIGHTWEIGHT - no car detected, sending detection=null")
                elif isinstance(detection_result, dict) and ("carBrand" in detection_result or "carModel" in detection_result):
                    # Check if we're waiting for dual reset condition
                    current_progression = pipeline_state.get("progression_stage")
                    if current_progression == "car_waitpayment" and backend_session_id is None:
                        # In car_waitpayment + sessionId: null - STOP sending cached detection to prevent new session creation
                        detection_dict = None
                        logger.info(f"🛑 LIGHTWEIGHT - in car_waitpayment with sessionId: null, NOT sending cached detection (waiting for dual reset)")
                    else:
                        # Normal lightweight mode - send cached detection dict
                        detection_dict = detection_result
                        logger.info(f"💾 LIGHTWEIGHT - sending cached detection dict")
                else:
                    logger.warning(f"⚠️ LIGHTWEIGHT - unexpected detection_result type: {type(detection_result)}")
                    detection_dict = None

            elif detection_result.get("class") == "none":
                # Other modes - send null to clear session
                detection_dict = None
                logger.info(f"📤 SENDING 'NONE' (detection: null) - Car absent, expecting backend to clear session for camera {camera_id}")
            elif detection_result and "carBrand" in detection_result:
                # Handle cached detection dict format (fallback for compatibility)
                detection_dict = detection_result
                logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT MODE - using detection_result as detection_dict:")
                logger.info(f"💾 Camera {camera_id}: - detection_dict: {detection_dict}")
            else:
                # Valid detection - convert to backend format (will be populated by branch processing)
                detection_dict = {
                    "carModel": None,
                    "carBrand": None,
                    "carYear": None,
|
||
"bodyType": None,
|
||
"licensePlateText": None,
|
||
"licensePlateConfidence": None
|
||
}
|
||
|
||
# Extract and process branch results from parallel classification (only for valid detections, skip cached mode)
|
||
if detection_result.get("class") != "none" and "branch_results" in detection_result and not detection_result.get("cached_mode", False):
|
||
def process_branch_results(branch_results, depth=0):
|
||
"""Recursively process branch results including nested branches."""
|
||
if not isinstance(branch_results, dict):
|
||
return
|
||
|
||
indent = " " * depth
|
||
for branch_id, branch_data in branch_results.items():
|
||
if isinstance(branch_data, dict):
|
||
logger.debug(f"{indent}Processing branch {branch_id}: {branch_data}")
|
||
|
||
# Map common classification fields to backend-expected names
|
||
if "brand" in branch_data:
|
||
detection_dict["carBrand"] = branch_data["brand"]
|
||
logger.debug(f"{indent}Mapped carBrand: {branch_data['brand']}")
|
||
if "body_type" in branch_data:
|
||
detection_dict["bodyType"] = branch_data["body_type"]
|
||
logger.debug(f"{indent}Mapped bodyType: {branch_data['body_type']}")
|
||
if "class" in branch_data:
|
||
class_name = branch_data["class"]
|
||
|
||
# Map based on branch/model type
|
||
if "brand" in branch_id.lower():
|
||
detection_dict["carBrand"] = class_name
|
||
logger.debug(f"{indent}Mapped carBrand from class: {class_name}")
|
||
elif "bodytype" in branch_id.lower() or "body" in branch_id.lower():
|
||
detection_dict["bodyType"] = class_name
|
||
logger.debug(f"{indent}Mapped bodyType from class: {class_name}")
|
||
|
||
# Process nested branch results recursively
|
||
if "branch_results" in branch_data:
|
||
logger.debug(f"{indent}Processing nested branches in {branch_id}")
|
||
process_branch_results(branch_data["branch_results"], depth + 1)
|
||
|
||
branch_results = detection_result.get("branch_results", {})
|
||
if branch_results:
|
||
logger.debug(f"Processing branch results: {branch_results}")
|
||
process_branch_results(branch_results)
|
||
logger.info(f"Detection payload after branch processing: {detection_dict}")
|
||
|
||
# Cache the detection_dict for lightweight mode (after branch processing completes)
|
||
if current_mode == "full_pipeline":
|
||
pipeline_state = get_or_init_session_pipeline_state(camera_id)
|
||
pipeline_state["cached_detection_dict"] = detection_dict.copy()
|
||
logger.info(f"💾 Camera {camera_id}: CACHED DETECTION DICT after branch processing: {detection_dict}")
|
||
|
||
else:
|
||
logger.debug("No branch results found in detection result")
|
||
|
||
detection_data = {
|
||
"type": "imageDetection",
|
||
"subscriptionIdentifier": stream["subscriptionIdentifier"],
|
||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||
# "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + f".{int(time.time() * 1000) % 1000:03d}Z",
|
||
"data": {
|
||
"detection": detection_dict,
|
||
"modelId": stream["modelId"],
|
||
"modelName": stream["modelName"]
|
||
}
|
||
}
|
||
|
||
# SessionId should NEVER be sent from worker to backend - it's uni-directional (backend -> worker only)
|
||
# Backend manages sessionIds independently based on detection content
|
||
logger.debug(f"TX message prepared (no sessionId) - detection_dict type: {type(detection_dict)}")
|
||
|
||
# Log detection details for different modes
|
||
if current_mode == "lightweight":
|
||
if detection_result and detection_result.get("class") == "none":
|
||
logger.info(f"🚫 Camera {camera_id}: LIGHTWEIGHT - No car detected (resetting to validation)")
|
||
elif isinstance(detection_result, dict) and ("carBrand" in detection_result or "carModel" in detection_result):
|
||
logger.info(f"💾 Camera {camera_id}: LIGHTWEIGHT - Sending cached detection data")
|
||
else:
|
||
logger.info(f"🪶 Camera {camera_id}: LIGHTWEIGHT - Processing detection")
|
||
elif detection_result and "class" in detection_result and detection_result.get("class") != "none":
|
||
confidence = detection_result.get("confidence", 0.0)
|
||
logger.info(f"🚗 Camera {camera_id}: Detected {detection_result['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
|
||
|
||
# Send detection data to backend (session gating handled above in processing logic)
|
||
logger.debug(f"📤 SENDING TO BACKEND for camera {camera_id}: {json.dumps(detection_data, indent=2)}")
|
||
try:
|
||
ws_logger.info(f"TX -> {json.dumps(detection_data, separators=(',', ':'))}")
|
||
await websocket.send_json(detection_data)
|
||
logger.debug(f"Sent detection data to client for camera {camera_id}")
|
||
|
||
# Cache the detection data for potential resubscriptions (only if not null detection)
|
||
if detection_dict is not None and detection_result.get("class") != "none":
|
||
cached_detections[camera_id] = detection_data.copy()
|
||
logger.debug(f"Cached detection for camera {camera_id}: {detection_dict}")
|
||
|
||
# Enhanced caching: Store by session_id for LPR integration
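                # Note: detection_data built above never carries a sessionId (the worker does not send
                # sessionIds to the backend), so the lookup below is expected to return None here.
                # Session-keyed caching for LPR actually happens retroactively in the "setSessionId" handler.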
|
||
session_id = detection_data.get('sessionId')
|
||
if session_id:
|
||
session_id_str = str(session_id)
|
||
session_detections[session_id_str] = detection_data.copy()
|
||
session_to_camera[session_id_str] = camera_id
|
||
detection_timestamps[session_id_str] = time.time()
|
||
logger.debug(f"🔑 Cached detection for LPR by session_id {session_id_str}: {camera_id}")
|
||
else:
|
||
# Don't cache null/none detections - let them reset properly
|
||
cached_detections.pop(camera_id, None)
|
||
logger.debug(f"Not caching null/none detection for camera {camera_id}")
|
||
|
||
except RuntimeError as e:
|
||
if "websocket.close" in str(e):
|
||
logger.warning(f"WebSocket connection closed - cannot send detection data for camera {camera_id}")
|
||
return persistent_data
|
||
else:
|
||
raise
|
||
|
||
# Log status after sending (no sessionId sent to backend)
|
||
if detection_dict is None:
|
||
logger.info(f"📡 SENT 'none' detection - backend should clear session for camera {camera_id}")
|
||
elif detection_dict == {}:
|
||
logger.info(f"📡 SENT empty detection - backend should create sessionId for camera {camera_id}")
|
||
else:
|
||
logger.info(f"📡 SENT detection data - backend manages sessionId independently for camera {camera_id}")
|
||
return persistent_data
|
||
except Exception as e:
|
||
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
|
||
return persistent_data
|
||
|
||
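# frame_reader: runs in a dedicated thread per RTSP stream. It continuously reads frames from the
# OpenCV capture, keeps only the newest frame in the single-slot buffer, and reconnects whenever
# the stream drops (up to max_retries, or indefinitely when max_retries == -1).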
def frame_reader(camera_id, cap, buffer, stop_event):
    retries = 0
    logger.info(f"Starting frame reader thread for camera {camera_id}")
    frame_count = 0
    last_log_time = time.time()

    try:
        # Log initial camera status and properties
        if cap.isOpened():
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}")
            set_camera_connected(camera_id, True)
        else:
            logger.error(f"Camera {camera_id} failed to open initially")
            set_camera_connected(camera_id, False, "Failed to open camera initially")

        while not stop_event.is_set():
            try:
                if not cap.isOpened():
                    logger.error(f"Camera {camera_id} is not open before trying to read")
                    # Attempt to reopen
                    cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                    time.sleep(reconnect_interval)
                    continue

                logger.debug(f"Attempting to read frame from camera {camera_id}")
                ret, frame = cap.read()

                if not ret:
                    error_msg = f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}"
                    logger.warning(error_msg)
                    set_camera_connected(camera_id, False, error_msg)
                    cap.release()
                    time.sleep(reconnect_interval)
                    retries += 1
                    if retries > max_retries and max_retries != -1:
                        logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader")
                        set_camera_connected(camera_id, False, "Max retries reached")
                        break
                    # Re-open
                    logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}")
                    cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                    if not cap.isOpened():
                        logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
                        set_camera_connected(camera_id, False, "Failed to reopen RTSP stream")
                        continue
                    logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}")
                    set_camera_connected(camera_id, True)
                    continue

                # Successfully read a frame
                frame_count += 1
                current_time = time.time()
                # Log frame stats every 5 seconds
                if current_time - last_log_time > 5:
                    logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds")
                    frame_count = 0
                    last_log_time = current_time

                logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}")
                retries = 0
                set_camera_connected(camera_id, True)  # Mark as connected on successful frame read

                # Overwrite old frame if buffer is full
                if not buffer.empty():
                    try:
                        buffer.get_nowait()
                        logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}")
                    except queue.Empty:
                        pass
                buffer.put(frame)
                logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}")

                # Short sleep to avoid CPU overuse
                time.sleep(0.01)

            except cv2.error as e:
                error_msg = f"OpenCV error for camera {camera_id}: {e}"
                logger.error(error_msg, exc_info=True)
                set_camera_connected(camera_id, False, error_msg)
                cap.release()
                time.sleep(reconnect_interval)
                retries += 1
                if retries > max_retries and max_retries != -1:
                    logger.error(f"Max retries reached after OpenCV error for camera {camera_id}")
                    set_camera_connected(camera_id, False, "Max retries reached after OpenCV error")
                    break
                logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}")
                cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
                if not cap.isOpened():
                    logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
                    set_camera_connected(camera_id, False, "Failed to reopen after OpenCV error")
                    continue
                logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}")
                set_camera_connected(camera_id, True)
            except Exception as e:
                error_msg = f"Unexpected error for camera {camera_id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                set_camera_connected(camera_id, False, error_msg)
                cap.release()
                break
    except Exception as e:
        logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True)
    finally:
        logger.info(f"Frame reader thread for camera {camera_id} is exiting")
        if cap and cap.isOpened():
            cap.release()

def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event):
|
||
"""Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals"""
|
||
retries = 0
|
||
consecutive_failures = 0 # Track consecutive failures for backoff
|
||
logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}")
|
||
frame_count = 0
|
||
last_log_time = time.time()
|
||
|
||
# Initialize camera state
|
||
set_camera_connected(camera_id, True)
|
||
|
||
try:
|
||
interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds
|
||
logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s")
|
||
|
||
while not stop_event.is_set():
|
||
try:
|
||
start_time = time.time()
|
||
frame = fetch_snapshot(snapshot_url)
|
||
|
||
if frame is None:
|
||
consecutive_failures += 1
|
||
error_msg = f"Failed to fetch snapshot for camera: {camera_id}, consecutive failures: {consecutive_failures}"
|
||
logger.warning(error_msg)
|
||
set_camera_connected(camera_id, False, error_msg)
|
||
retries += 1
|
||
|
||
# Check network connectivity with a simple ping-like test
|
||
if consecutive_failures % 5 == 1: # Every 5th failure, test connectivity
|
||
try:
|
||
test_response = requests.get(snapshot_url, timeout=(2, 5), stream=False)
|
||
logger.info(f"Camera {camera_id}: Connectivity test result: {test_response.status_code}")
|
||
except Exception as test_error:
|
||
logger.warning(f"Camera {camera_id}: Connectivity test failed: {test_error}")
|
||
|
||
if retries > max_retries and max_retries != -1:
|
||
logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader")
|
||
set_camera_connected(camera_id, False, "Max retries reached for snapshot camera")
|
||
break
|
||
|
||
# Exponential backoff based on consecutive failures
|
||
backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s
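                        # i.e. the delay roughly doubles per consecutive failure (1s, 2s, 4s, ... capped at 2**6),
                        # is never more than twice the snapshot interval, never less than 1s, and never above 30s.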
|
||
logger.debug(f"Camera {camera_id}: Backing off for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})")
|
||
if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep
|
||
break # Exit if stop_event is set during backoff
|
||
continue
|
||
|
||
# Successfully fetched a frame - reset consecutive failures
|
||
consecutive_failures = 0 # Reset backoff on success
|
||
frame_count += 1
|
||
current_time = time.time()
|
||
# Log frame stats every 5 seconds
|
||
if current_time - last_log_time > 5:
|
||
logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds")
|
||
frame_count = 0
|
||
last_log_time = current_time
|
||
|
||
logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}")
|
||
retries = 0
|
||
set_camera_connected(camera_id, True) # Mark as connected on successful snapshot
|
||
|
||
# Overwrite old frame if buffer is full
|
||
if not buffer.empty():
|
||
try:
|
||
buffer.get_nowait()
|
||
logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}")
|
||
except queue.Empty:
|
||
pass
|
||
buffer.put(frame)
|
||
logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}")
|
||
|
||
# Wait for the specified interval
|
||
elapsed = time.time() - start_time
|
||
sleep_time = max(interval_seconds - elapsed, 0)
|
||
if sleep_time > 0:
|
||
time.sleep(sleep_time)
|
||
|
||
except Exception as e:
|
||
consecutive_failures += 1
|
||
error_msg = f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}"
|
||
logger.error(error_msg, exc_info=True)
|
||
set_camera_connected(camera_id, False, error_msg)
|
||
retries += 1
|
||
if retries > max_retries and max_retries != -1:
|
||
logger.error(f"Max retries reached after error for snapshot camera {camera_id}")
|
||
set_camera_connected(camera_id, False, "Max retries reached after error")
|
||
break
|
||
|
||
# Exponential backoff for exceptions too
|
||
backoff_delay = min(30, max(1, min(2 ** min(consecutive_failures - 1, 6), interval_seconds * 2))) # Start with 1s, max 30s
|
||
logger.debug(f"Camera {camera_id}: Exception backoff for {backoff_delay:.1f}s (consecutive failures: {consecutive_failures})")
|
||
if stop_event.wait(backoff_delay): # Use wait with timeout instead of sleep
|
||
break # Exit if stop_event is set during backoff
|
||
except Exception as e:
|
||
logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True)
|
||
finally:
|
||
logger.info(f"Snapshot reader thread for camera {camera_id} is exiting")
|
||
|
||
async def reconcile_subscriptions(desired_subscriptions, websocket):
    """
    Declarative reconciliation: compare the desired subscription list with the current streams,
    then add, remove, or resubscribe as needed.
    """
    logger.info(f"Reconciling subscriptions: {len(desired_subscriptions)} desired")

    with streams_lock:
        # Get current subscriptions
        current_subscription_ids = set(streams.keys())
        desired_subscription_ids = set(sub["subscriptionIdentifier"] for sub in desired_subscriptions)

        # Find what to add and remove
        to_add = desired_subscription_ids - current_subscription_ids
        to_remove = current_subscription_ids - desired_subscription_ids
        to_check_for_changes = current_subscription_ids & desired_subscription_ids

        logger.info(f"Reconciliation: {len(to_add)} to add, {len(to_remove)} to remove, {len(to_check_for_changes)} to check for changes")

        # Remove subscriptions that are no longer wanted
        for subscription_id in to_remove:
            await unsubscribe_internal(subscription_id)

        # Check existing subscriptions for parameter changes
        for subscription_id in to_check_for_changes:
            desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
            current_stream = streams[subscription_id]

            # Check if parameters changed
            if has_subscription_changed(desired_sub, current_stream):
                logger.info(f"Parameters changed for {subscription_id}, resubscribing")
                logger.debug(f"Parameter comparison for {subscription_id}:")
                logger.debug(f" rtspUrl: '{desired_sub.get('rtspUrl')}' vs '{current_stream.get('rtsp_url')}'")
                logger.debug(f" snapshotUrl: '{desired_sub.get('snapshotUrl')}' vs '{current_stream.get('snapshot_url')}'")
                logger.debug(f" modelUrl: '{extract_model_file_identifier(desired_sub.get('modelUrl'))}' vs '{extract_model_file_identifier(current_stream.get('modelUrl'))}'")
                logger.debug(f" modelId: {desired_sub.get('modelId')} vs {current_stream.get('modelId')}")

                # Preserve detection state for resubscription
                cached_detection = cached_detections.get(subscription_id)
                logger.debug(f"Preserving detection state for resubscription: {cached_detection is not None}")

                await unsubscribe_internal(subscription_id, preserve_detection=True)
                await subscribe_internal(desired_sub, websocket, cached_detection=cached_detection)

        # Add new subscriptions
        for subscription_id in to_add:
            desired_sub = next(sub for sub in desired_subscriptions if sub["subscriptionIdentifier"] == subscription_id)
            await subscribe_internal(desired_sub, websocket)

def extract_model_file_identifier(model_url):
    """Extract the core model file identifier from S3 URLs, ignoring timestamp parameters"""
    if not model_url:
        return None

    # For S3 URLs, extract just the path portion before query parameters
    try:
        from urllib.parse import urlparse
        parsed = urlparse(model_url)
        # Return the path which contains the actual model file identifier
        # e.g. "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
        return parsed.path
    except Exception as e:
        logger.warning(f"Failed to parse model URL {model_url}: {e}")
        return model_url
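# Illustrative example (hypothetical host and query string; path taken from the comment above):
#   extract_model_file_identifier(
#       "https://s3.example.com/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta?X-Amz-Expires=3600")
#   -> "/adsist-cms-staging/models/bangchak_poc-1756312318569.mpta"
# so re-signed URLs for the same .mpta file compare as unchanged.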

def has_subscription_changed(desired_sub, current_stream):
    """Check if subscription parameters have changed"""
    # Smart model URL comparison - ignore timestamp changes in signed URLs
    desired_model_id = extract_model_file_identifier(desired_sub.get("modelUrl"))
    current_model_id = extract_model_file_identifier(current_stream.get("modelUrl"))

    return (
        desired_sub.get("rtspUrl") != current_stream.get("rtsp_url") or
        desired_sub.get("snapshotUrl") != current_stream.get("snapshot_url") or
        desired_sub.get("snapshotInterval") != current_stream.get("snapshot_interval") or
        desired_sub.get("cropX1") != current_stream.get("cropX1") or
        desired_sub.get("cropY1") != current_stream.get("cropY1") or
        desired_sub.get("cropX2") != current_stream.get("cropX2") or
        desired_sub.get("cropY2") != current_stream.get("cropY2") or
        desired_sub.get("modelId") != current_stream.get("modelId") or
        desired_sub.get("modelName") != current_stream.get("modelName") or
        desired_model_id != current_model_id
    )

async def subscribe_internal(subscription, websocket, cached_detection=None):
|
||
"""Internal subscription logic extracted from original subscribe handler"""
|
||
subscriptionIdentifier = subscription.get("subscriptionIdentifier")
|
||
rtsp_url = subscription.get("rtspUrl")
|
||
snapshot_url = subscription.get("snapshotUrl")
|
||
snapshot_interval = subscription.get("snapshotInterval")
|
||
model_url = subscription.get("modelUrl")
|
||
modelId = subscription.get("modelId")
|
||
modelName = subscription.get("modelName")
|
||
cropX1 = subscription.get("cropX1")
|
||
cropY1 = subscription.get("cropY1")
|
||
cropX2 = subscription.get("cropX2")
|
||
cropY2 = subscription.get("cropY2")
|
||
|
||
# Extract camera_id from subscriptionIdentifier
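    # Expected format is "<displayIdentifier>;<cameraIdentifier>", e.g. "display-001;front-cam"
    # (illustrative values); the full identifier is also used as the internal camera_id key.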
|
||
parts = subscriptionIdentifier.split(';')
|
||
if len(parts) != 2:
|
||
logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}")
|
||
return
|
||
|
||
display_identifier, camera_identifier = parts
|
||
camera_id = subscriptionIdentifier
|
||
|
||
# Load model if needed
|
||
if model_url:
|
||
with models_lock:
|
||
if (camera_id not in models) or (modelId not in models[camera_id]):
|
||
logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}")
|
||
extraction_dir = os.path.join("models", camera_identifier, str(modelId))
|
||
os.makedirs(extraction_dir, exist_ok=True)
|
||
|
||
# Handle model loading (same as original)
|
||
parsed = urlparse(model_url)
|
||
if parsed.scheme in ("http", "https"):
|
||
filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta"
|
||
local_mpta = os.path.join(extraction_dir, filename)
|
||
local_path = download_mpta(model_url, local_mpta)
|
||
if not local_path:
|
||
logger.error(f"Failed to download model from {model_url}")
|
||
return
|
||
model_tree = load_pipeline_from_zip(local_path, extraction_dir)
|
||
else:
|
||
if not os.path.exists(model_url):
|
||
logger.error(f"Model file not found: {model_url}")
|
||
return
|
||
model_tree = load_pipeline_from_zip(model_url, extraction_dir)
|
||
|
||
if model_tree is None:
|
||
logger.error(f"Failed to load model {modelId}")
|
||
return
|
||
|
||
if camera_id not in models:
|
||
models[camera_id] = {}
|
||
models[camera_id][modelId] = model_tree
|
||
|
||
# Start LPR integration threads after first model is loaded (only once)
|
||
global lpr_integration_started
|
||
if not lpr_integration_started and hasattr(model_tree, 'get') and model_tree.get('redis_client'):
|
||
try:
|
||
start_lpr_integration()
|
||
lpr_integration_started = True
|
||
logger.info("🚀 LPR integration started after first model load")
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to start LPR integration: {e}")
|
||
|
||
# Create stream (same logic as original)
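    # Note: if the worker is already at max_streams, or neither an RTSP nor a snapshot URL is
    # provided, the condition below is false and the subscription is silently skipped.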
|
||
if camera_id and (rtsp_url or snapshot_url) and len(streams) < max_streams:
|
||
camera_url = snapshot_url if snapshot_url else rtsp_url
|
||
|
||
# Check if we already have a stream for this camera URL
|
||
shared_stream = camera_streams.get(camera_url)
|
||
|
||
if shared_stream:
|
||
# Reuse existing stream
|
||
buffer = shared_stream["buffer"]
|
||
stop_event = shared_stream["stop_event"]
|
||
thread = shared_stream["thread"]
|
||
mode = shared_stream["mode"]
|
||
shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1
|
||
else:
|
||
# Create new stream
|
||
buffer = queue.Queue(maxsize=1)
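            # maxsize=1 gives latest-frame-wins semantics: the reader threads drop the old frame
            # before putting a new one, so detection always runs on the freshest available frame.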
|
||
stop_event = threading.Event()
|
||
|
||
if snapshot_url and snapshot_interval:
|
||
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
|
||
thread.daemon = True
|
||
thread.start()
|
||
mode = "snapshot"
|
||
shared_stream = {
|
||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
||
"mode": mode, "url": snapshot_url, "snapshot_interval": snapshot_interval, "ref_count": 1
|
||
}
|
||
camera_streams[camera_url] = shared_stream
|
||
elif rtsp_url:
|
||
cap = cv2.VideoCapture(rtsp_url)
|
||
if not cap.isOpened():
|
||
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
|
||
return
|
||
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
|
||
thread.daemon = True
|
||
thread.start()
|
||
mode = "rtsp"
|
||
shared_stream = {
|
||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
||
"mode": mode, "url": rtsp_url, "cap": cap, "ref_count": 1
|
||
}
|
||
camera_streams[camera_url] = shared_stream
|
||
else:
|
||
logger.error(f"No valid URL provided for camera {camera_id}")
|
||
return
|
||
|
||
# Create stream info
|
||
stream_info = {
|
||
"buffer": buffer, "thread": thread, "stop_event": stop_event,
|
||
"modelId": modelId, "modelName": modelName, "subscriptionIdentifier": subscriptionIdentifier,
|
||
"cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, "cropY2": cropY2,
|
||
"mode": mode, "camera_url": camera_url, "modelUrl": model_url,
|
||
# Always store both URLs for comparison consistency
|
||
"rtsp_url": rtsp_url,
|
||
"snapshot_url": snapshot_url,
|
||
"snapshot_interval": snapshot_interval
|
||
}
|
||
|
||
if mode == "rtsp":
|
||
stream_info["cap"] = shared_stream["cap"]
|
||
|
||
streams[camera_id] = stream_info
|
||
subscription_to_camera[camera_id] = camera_url
|
||
logger.info(f"Subscribed to camera {camera_id}")
|
||
|
||
# Send initial detection to backend - use cached if available, otherwise "none"
|
||
if cached_detection:
|
||
# Restore cached detection with updated timestamp (RESUBSCRIPTION STATUS UPDATE)
|
||
initial_detection_data = cached_detection.copy()
|
||
initial_detection_data["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||
logger.info(f"📡 RESUBSCRIPTION: Restoring cached detection for camera {camera_id}")
|
||
logger.debug(f"📡 RESUBSCRIPTION: Cached detection has sessionId: {initial_detection_data.get('sessionId', 'None')}")
|
||
else:
|
||
# Send "none" detection for new subscriptions
|
||
initial_detection_data = {
|
||
"type": "imageDetection",
|
||
"subscriptionIdentifier": subscriptionIdentifier,
|
||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||
"data": {
|
||
"detection": None,
|
||
"modelId": modelId,
|
||
"modelName": modelName
|
||
}
|
||
}
|
||
logger.info(f"📡 NEW SUBSCRIPTION: Sending initial 'none' detection for camera {camera_id}")
|
||
|
||
ws_logger.info(f"TX -> {json.dumps(initial_detection_data, separators=(',', ':'))}")
|
||
await websocket.send_json(initial_detection_data)
|
||
logger.debug(f"Initial detection data sent (resubscription={cached_detection is not None}): {initial_detection_data}")
|
||
|
||
# This cached detection was just a one-time status update for resubscription
|
||
# Normal frame processing will continue independently
|
||
|
||
async def unsubscribe_internal(subscription_id, preserve_detection=False):
|
||
"""Internal unsubscription logic"""
|
||
if subscription_id in streams:
|
||
stream = streams.pop(subscription_id)
|
||
camera_url = subscription_to_camera.pop(subscription_id, None)
|
||
|
||
if camera_url and camera_url in camera_streams:
|
||
shared_stream = camera_streams[camera_url]
|
||
shared_stream["ref_count"] -= 1
|
||
|
||
if shared_stream["ref_count"] <= 0:
|
||
shared_stream["stop_event"].set()
|
||
shared_stream["thread"].join()
|
||
if "cap" in shared_stream:
|
||
shared_stream["cap"].release()
|
||
del camera_streams[camera_url]
|
||
|
||
latest_frames.pop(subscription_id, None)
|
||
if not preserve_detection:
|
||
cached_detections.pop(subscription_id, None) # Clear cached detection only if not preserving
|
||
frame_skip_flags.pop(subscription_id, None) # Clear frame skip flag
|
||
camera_states.pop(subscription_id, None) # Clear camera state
|
||
cached_full_pipeline_results.pop(subscription_id, None) # Clear cached pipeline results
|
||
session_pipeline_states.pop(subscription_id, None) # Clear session pipeline state
|
||
cleanup_camera_stability(subscription_id)
|
||
logger.info(f"Unsubscribed from camera {subscription_id} (preserve_detection={preserve_detection})")
|
||
|
||
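    # process_streams: main polling loop. Roughly every poll_interval ms it takes the newest frame
    # from each active stream's buffer, caches it for the REST API, looks up that camera's model tree,
    # and runs handle_detection with the camera's persistent data.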
async def process_streams():
|
||
logger.info("Started processing streams")
|
||
try:
|
||
while True:
|
||
start_time = time.time()
|
||
with streams_lock:
|
||
current_streams = list(streams.items())
|
||
if current_streams:
|
||
logger.debug(f"Processing {len(current_streams)} active streams")
|
||
else:
|
||
logger.debug("No active streams to process")
|
||
|
||
for camera_id, stream in current_streams:
|
||
buffer = stream["buffer"]
|
||
if buffer.empty():
|
||
logger.debug(f"Frame buffer is empty for camera {camera_id}")
|
||
continue
|
||
|
||
logger.debug(f"Got frame from buffer for camera {camera_id}")
|
||
frame = buffer.get()
|
||
|
||
# Cache the frame for REST API access
|
||
latest_frames[camera_id] = frame.copy()
|
||
logger.debug(f"Cached frame for REST API access for camera {camera_id}")
|
||
|
||
with models_lock:
|
||
model_tree = models.get(camera_id, {}).get(stream["modelId"])
|
||
if not model_tree:
|
||
logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}")
|
||
continue
|
||
logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}")
|
||
|
||
key = (camera_id, stream["modelId"])
|
||
persistent_data = persistent_data_dict.get(key, {})
|
||
logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}")
|
||
updated_persistent_data = await handle_detection(
|
||
camera_id, stream, frame, websocket, model_tree, persistent_data
|
||
)
|
||
persistent_data_dict[key] = updated_persistent_data
|
||
|
||
elapsed_time = (time.time() - start_time) * 1000 # ms
|
||
sleep_time = max(poll_interval - elapsed_time, 0)
|
||
logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms")
|
||
await asyncio.sleep(sleep_time / 1000.0)
|
||
except asyncio.CancelledError:
|
||
logger.info("Stream processing task cancelled")
|
||
except Exception as e:
|
||
logger.error(f"Error in process_streams: {str(e)}", exc_info=True)
|
||
|
||
async def send_heartbeat():
|
||
while True:
|
||
try:
|
||
cpu_usage = psutil.cpu_percent()
|
||
memory_usage = psutil.virtual_memory().percent
|
||
if torch.cuda.is_available():
|
||
gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None
|
||
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)
|
||
else:
|
||
gpu_usage = None
|
||
gpu_memory_usage = None
|
||
|
||
camera_connections = [
|
||
{
|
||
"subscriptionIdentifier": stream["subscriptionIdentifier"],
|
||
"modelId": stream["modelId"],
|
||
"modelName": stream["modelName"],
|
||
"online": True,
|
||
# Include all subscription parameters for proper change detection
|
||
"rtspUrl": stream.get("rtsp_url"),
|
||
"snapshotUrl": stream.get("snapshot_url"),
|
||
"snapshotInterval": stream.get("snapshot_interval"),
|
||
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
||
}
|
||
for camera_id, stream in streams.items()
|
||
]
|
||
|
||
state_report = {
|
||
"type": "stateReport",
|
||
"cpuUsage": cpu_usage,
|
||
"memoryUsage": memory_usage,
|
||
"gpuUsage": gpu_usage,
|
||
"gpuMemoryUsage": gpu_memory_usage,
|
||
"cameraConnections": camera_connections
|
||
}
|
||
await websocket.send_text(json.dumps(state_report))
|
||
logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras")
|
||
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
||
except Exception as e:
|
||
logger.error(f"Error sending stateReport heartbeat: {e}")
|
||
break
|
||
|
||
async def on_message():
|
||
while True:
|
||
try:
|
||
msg = await websocket.receive_text()
|
||
ws_logger.info(f"RX <- {msg}")
|
||
logger.debug(f"Received message: {msg}")
|
||
data = json.loads(msg)
|
||
msg_type = data.get("type")
|
||
|
||
if msg_type == "setSubscriptionList":
|
||
# Declarative approach: Backend sends list of subscriptions this worker should have
|
||
desired_subscriptions = data.get("subscriptions", [])
|
||
logger.info(f"Received subscription list with {len(desired_subscriptions)} subscriptions")
|
||
|
||
await reconcile_subscriptions(desired_subscriptions, websocket)
|
||
|
||
elif msg_type == "subscribe":
|
||
# Legacy support - convert single subscription to list
|
||
payload = data.get("payload", {})
|
||
await reconcile_subscriptions([payload], websocket)
|
||
|
||
elif msg_type == "unsubscribe":
|
||
# Legacy support - remove subscription
|
||
payload = data.get("payload", {})
|
||
subscriptionIdentifier = payload.get("subscriptionIdentifier")
|
||
# Remove from current subscriptions and reconcile
|
||
current_subs = []
|
||
with streams_lock:
|
||
for camera_id, stream in streams.items():
|
||
if stream["subscriptionIdentifier"] != subscriptionIdentifier:
|
||
# Convert stream back to subscription format
|
||
current_subs.append({
|
||
"subscriptionIdentifier": stream["subscriptionIdentifier"],
|
||
"rtspUrl": stream.get("rtsp_url"),
|
||
"snapshotUrl": stream.get("snapshot_url"),
|
||
"snapshotInterval": stream.get("snapshot_interval"),
|
||
"modelId": stream["modelId"],
|
||
"modelName": stream["modelName"],
|
||
"modelUrl": stream.get("modelUrl", ""),
|
||
"cropX1": stream.get("cropX1"),
|
||
"cropY1": stream.get("cropY1"),
|
||
"cropX2": stream.get("cropX2"),
|
||
"cropY2": stream.get("cropY2")
|
||
})
|
||
await reconcile_subscriptions(current_subs, websocket)
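                # The "old_subscribe_logic_removed" branch below is retained legacy code: the backend
                # presumably never sends that message type, so the branch is skipped at runtime, and the
                # names it references (model_url, camera_id, etc.) are not defined in this handler.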
|
||
|
||
elif msg_type == "old_subscribe_logic_removed":
|
||
if model_url:
|
||
with models_lock:
|
||
if (camera_id not in models) or (modelId not in models[camera_id]):
|
||
logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}")
|
||
extraction_dir = os.path.join("models", camera_identifier, str(modelId))
|
||
os.makedirs(extraction_dir, exist_ok=True)
|
||
# If model_url is remote, download it first.
|
||
parsed = urlparse(model_url)
|
||
if parsed.scheme in ("http", "https"):
|
||
logger.info(f"Downloading remote .mpta file from {model_url}")
|
||
filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta"
|
||
local_mpta = os.path.join(extraction_dir, filename)
|
||
logger.debug(f"Download destination: {local_mpta}")
|
||
local_path = download_mpta(model_url, local_mpta)
|
||
if not local_path:
|
||
logger.error(f"Failed to download the remote .mpta file from {model_url}")
|
||
error_response = {
|
||
"type": "error",
|
||
"subscriptionIdentifier": subscriptionIdentifier,
|
||
"error": f"Failed to download model from {model_url}"
|
||
}
|
||
ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}")
|
||
await websocket.send_json(error_response)
|
||
continue
|
||
model_tree = load_pipeline_from_zip(local_path, extraction_dir)
|
||
else:
|
||
logger.info(f"Loading local .mpta file from {model_url}")
|
||
# Check if file exists before attempting to load
|
||
if not os.path.exists(model_url):
|
||
logger.error(f"Local .mpta file not found: {model_url}")
|
||
logger.debug(f"Current working directory: {os.getcwd()}")
|
||
error_response = {
|
||
"type": "error",
|
||
"subscriptionIdentifier": subscriptionIdentifier,
|
||
"error": f"Model file not found: {model_url}"
|
||
}
|
||
ws_logger.info(f"TX -> {json.dumps(error_response, separators=(',', ':'))}")
|
||
await websocket.send_json(error_response)
|
||
continue
|
||
model_tree = load_pipeline_from_zip(model_url, extraction_dir)
|
||
if model_tree is None:
|
||
logger.error(f"Failed to load model {modelId} from .mpta file for camera {camera_id}")
|
||
error_response = {
|
||
"type": "error",
|
||
"subscriptionIdentifier": subscriptionIdentifier,
|
||
"error": f"Failed to load model {modelId}"
|
||
}
|
||
await websocket.send_json(error_response)
|
||
continue
|
||
if camera_id not in models:
|
||
models[camera_id] = {}
|
||
models[camera_id][modelId] = model_tree
|
||
logger.info(f"Successfully loaded model {modelId} for camera {camera_id}")
|
||
logger.debug(f"Model extraction directory: {extraction_dir}")
|
||
|
||
# Start LPR integration threads after first model is loaded (only once)
|
||
if not lpr_integration_started and hasattr(model_tree, 'get') and model_tree.get('redis_client'):
|
||
try:
|
||
start_lpr_integration()
|
||
lpr_integration_started = True
|
||
logger.info("🚀 LPR integration started after first model load")
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to start LPR integration: {e}")
|
||
if camera_id and (rtsp_url or snapshot_url):
|
||
with streams_lock:
|
||
# Determine camera URL for shared stream management
|
||
camera_url = snapshot_url if snapshot_url else rtsp_url
|
||
|
||
if camera_id not in streams and len(streams) < max_streams:
|
||
# Check if we already have a stream for this camera URL
|
||
shared_stream = camera_streams.get(camera_url)
|
||
|
||
if shared_stream:
|
||
# Reuse existing stream
|
||
logger.info(f"Reusing existing stream for camera URL: {camera_url}")
|
||
buffer = shared_stream["buffer"]
|
||
stop_event = shared_stream["stop_event"]
|
||
thread = shared_stream["thread"]
|
||
mode = shared_stream["mode"]
|
||
|
||
# Increment reference count
|
||
shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1
|
||
else:
|
||
# Create new stream
|
||
buffer = queue.Queue(maxsize=1)
|
||
stop_event = threading.Event()
|
||
|
||
if snapshot_url and snapshot_interval:
|
||
logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}")
|
||
thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event))
|
||
thread.daemon = True
|
||
thread.start()
|
||
mode = "snapshot"
|
||
|
||
# Store shared stream info
|
||
shared_stream = {
|
||
"buffer": buffer,
|
||
"thread": thread,
|
||
"stop_event": stop_event,
|
||
"mode": mode,
|
||
"url": snapshot_url,
|
||
"snapshot_interval": snapshot_interval,
|
||
"ref_count": 1
|
||
}
|
||
camera_streams[camera_url] = shared_stream
|
||
|
||
elif rtsp_url:
|
||
logger.info(f"Creating new RTSP stream for camera {camera_id}: {rtsp_url}")
|
||
cap = cv2.VideoCapture(rtsp_url)
|
||
if not cap.isOpened():
|
||
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
|
||
continue
|
||
thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event))
|
||
thread.daemon = True
|
||
thread.start()
|
||
mode = "rtsp"
|
||
|
||
# Store shared stream info
|
||
shared_stream = {
|
||
"buffer": buffer,
|
||
"thread": thread,
|
||
"stop_event": stop_event,
|
||
"mode": mode,
|
||
"url": rtsp_url,
|
||
"cap": cap,
|
||
"ref_count": 1
|
||
}
|
||
camera_streams[camera_url] = shared_stream
|
||
else:
|
||
logger.error(f"No valid URL provided for camera {camera_id}")
|
||
continue
|
||
|
||
# Create stream info for this subscription
|
||
stream_info = {
|
||
"buffer": buffer,
|
||
"thread": thread,
|
||
"stop_event": stop_event,
|
||
"modelId": modelId,
|
||
"modelName": modelName,
|
||
"subscriptionIdentifier": subscriptionIdentifier,
|
||
"cropX1": cropX1,
|
||
"cropY1": cropY1,
|
||
"cropX2": cropX2,
|
||
"cropY2": cropY2,
|
||
"mode": mode,
|
||
"camera_url": camera_url
|
||
}
|
||
|
||
if mode == "snapshot":
|
||
stream_info["snapshot_url"] = snapshot_url
|
||
stream_info["snapshot_interval"] = snapshot_interval
|
||
elif mode == "rtsp":
|
||
stream_info["rtsp_url"] = rtsp_url
|
||
stream_info["cap"] = shared_stream["cap"]
|
||
|
||
streams[camera_id] = stream_info
|
||
subscription_to_camera[camera_id] = camera_url
|
||
|
||
elif camera_id and camera_id in streams:
|
||
# If already subscribed, unsubscribe first
|
||
logger.info(f"Resubscribing to camera {camera_id}")
|
||
# Note: Keep models in memory for reuse across subscriptions
|
||
elif msg_type == "unsubscribe":
|
||
payload = data.get("payload", {})
|
||
subscriptionIdentifier = payload.get("subscriptionIdentifier")
|
||
camera_id = subscriptionIdentifier
|
||
with streams_lock:
|
||
if camera_id and camera_id in streams:
|
||
stream = streams.pop(camera_id)
|
||
camera_url = subscription_to_camera.pop(camera_id, None)
|
||
|
||
if camera_url and camera_url in camera_streams:
|
||
shared_stream = camera_streams[camera_url]
|
||
shared_stream["ref_count"] -= 1
|
||
|
||
# If no more references, stop the shared stream
|
||
if shared_stream["ref_count"] <= 0:
|
||
logger.info(f"Stopping shared stream for camera URL: {camera_url}")
|
||
shared_stream["stop_event"].set()
|
||
shared_stream["thread"].join()
|
||
if "cap" in shared_stream:
|
||
shared_stream["cap"].release()
|
||
del camera_streams[camera_url]
|
||
else:
|
||
logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references")
|
||
|
||
# Clean up cached frame and stability tracking
|
||
latest_frames.pop(camera_id, None)
|
||
cached_detections.pop(camera_id, None) # Clear cached detection
|
||
frame_skip_flags.pop(camera_id, None) # Clear frame skip flag
|
||
camera_states.pop(camera_id, None) # Clear camera state
|
||
cleanup_camera_stability(camera_id)
|
||
logger.info(f"Unsubscribed from camera {camera_id}")
|
||
# Note: Keep models in memory for potential reuse
|
||
elif msg_type == "requestState":
|
||
cpu_usage = psutil.cpu_percent()
|
||
memory_usage = psutil.virtual_memory().percent
|
||
if torch.cuda.is_available():
|
||
gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None
|
||
gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2)
|
||
else:
|
||
gpu_usage = None
|
||
gpu_memory_usage = None
|
||
|
||
camera_connections = [
|
||
{
|
||
"subscriptionIdentifier": stream["subscriptionIdentifier"],
|
||
"modelId": stream["modelId"],
|
||
"modelName": stream["modelName"],
|
||
"online": True,
|
||
# Include all subscription parameters for proper change detection
|
||
"rtspUrl": stream.get("rtsp_url"),
|
||
"snapshotUrl": stream.get("snapshot_url"),
|
||
"snapshotInterval": stream.get("snapshot_interval"),
|
||
**{k: v for k, v in get_crop_coords(stream).items() if v is not None}
|
||
}
|
||
for camera_id, stream in streams.items()
|
||
]
|
||
|
||
state_report = {
|
||
"type": "stateReport",
|
||
"cpuUsage": cpu_usage,
|
||
"memoryUsage": memory_usage,
|
||
"gpuUsage": gpu_usage,
|
||
"gpuMemoryUsage": gpu_memory_usage,
|
||
"cameraConnections": camera_connections
|
||
}
|
||
await websocket.send_text(json.dumps(state_report))
|
||
|
||
elif msg_type == "setSessionId":
|
||
payload = data.get("payload", {})
|
||
display_identifier = payload.get("displayIdentifier")
|
||
session_id = payload.get("sessionId")
|
||
|
||
# Debug sessionId value types and contents
|
||
session_id_type = type(session_id).__name__
|
||
if session_id is None:
|
||
logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId=None (type: {session_id_type})")
|
||
logger.info(f"🔄 BACKEND WANTS TO CLEAR SESSION for display {display_identifier}")
|
||
elif session_id == "null":
|
||
logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='null' (type: {session_id_type})")
|
||
logger.info(f"🔄 BACKEND SENT STRING 'null' for display {display_identifier}")
|
||
elif session_id == "":
|
||
logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='' (empty string, type: {session_id_type})")
|
||
logger.info(f"🔄 BACKEND SENT EMPTY STRING for display {display_identifier}")
|
||
else:
|
||
logger.info(f"🆔 BACKEND SESSIONID RECEIVED: displayId={display_identifier}, sessionId='{session_id}' (type: {session_id_type}, length: {len(str(session_id))})")
|
||
logger.info(f"🔄 BACKEND CREATED/UPDATED SESSION: {session_id} for display {display_identifier}")
|
||
|
||
logger.debug(f"Full setSessionId payload: {payload}")
|
||
logger.debug(f"WebSocket message raw data: {json.dumps(data, indent=2)}")
|
||
logger.debug(f"Current active cameras: {list(streams.keys())}")
|
||
|
||
if display_identifier:
|
||
# Store session ID for this display
|
||
if session_id is None or session_id == "null" or session_id == "":
|
||
old_session_id = session_ids.get(display_identifier)
|
||
session_ids.pop(display_identifier, None)
|
||
|
||
if session_id is None:
|
||
logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received None")
|
||
elif session_id == "null":
|
||
logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received string 'null'")
|
||
elif session_id == "":
|
||
logger.info(f"🚫 BACKEND ENDED SESSION: Cleared session ID for display {display_identifier} (was: {old_session_id}) - received empty string")
|
||
|
||
logger.debug(f"Session IDs after clearing: {session_ids}")
|
||
|
||
# Reset tracking state for all cameras associated with this display
|
||
with streams_lock:
|
||
affected_cameras = []
|
||
for camera_id, stream in streams.items():
|
||
if stream["subscriptionIdentifier"].startswith(display_identifier + ";"):
|
||
affected_cameras.append(camera_id)
|
||
# Import here to avoid circular import
|
||
from siwatsystem.pympta import reset_tracking_state
|
||
model_id = stream.get("modelId", "unknown")
|
||
reset_tracking_state(camera_id, model_id, "backend session ended")
|
||
|
||
|
||
logger.info(f"Reset tracking for camera {camera_id} (display: {display_identifier})")
|
||
logger.debug(f"Reset tracking for {len(affected_cameras)} cameras: {affected_cameras}")
|
||
else:
|
||
old_session_id = session_ids.get(display_identifier)
|
||
session_ids[display_identifier] = session_id
|
||
logger.info(f"✅ BACKEND SESSION STARTED: Set session ID {session_id} for display {display_identifier} (previous: {old_session_id})")
|
||
logger.debug(f"Session IDs after update: {session_ids}")
|
||
logger.debug(f"🎯 CMS Backend created sessionId {session_id} after receiving detection data")
|
||
|
||
# 🔑 LPR Integration: Retroactively cache the last detection by this new session_id
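                            # Flow: the worker sends an empty detection dict, the backend replies with
                            # setSessionId, and here the last cached detection for each of the display's
                            # cameras is re-keyed by that sessionId so the LPR listener can look it up later.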
|
||
session_id_str = str(session_id)
|
||
logger.info(f"🔑 LPR: Attempting to retroactively cache detection for session_id {session_id_str}")
|
||
|
||
# Find cameras associated with this display
|
||
display_cameras = []
|
||
with streams_lock:
|
||
for camera_id, stream in streams.items():
|
||
if stream["subscriptionIdentifier"].startswith(display_identifier + ";"):
|
||
display_cameras.append(camera_id)
|
||
|
||
logger.debug(f"🔍 Found {len(display_cameras)} cameras for display {display_identifier}: {display_cameras}")
|
||
|
||
# Cache the most recent detection for each camera by the new session_id
|
||
cached_count = 0
|
||
for camera_id in display_cameras:
|
||
if camera_id in cached_detections:
|
||
detection_data = cached_detections[camera_id].copy()
|
||
|
||
# Add sessionId to the detection data
|
||
detection_data['sessionId'] = session_id
|
||
|
||
# Cache by session_id for LPR lookup
|
||
session_detections[session_id_str] = detection_data
|
||
session_to_camera[session_id_str] = camera_id
|
||
detection_timestamps[session_id_str] = time.time()
|
||
cached_count += 1
|
||
|
||
logger.info(f"✅ LPR: Cached detection for session_id {session_id_str} -> camera {camera_id}")
|
||
logger.debug(f"🔍 Detection data: {detection_data.get('data', {}).get('detection', {})}")
|
||
else:
|
||
logger.debug(f"⚠️ No cached detection available for camera {camera_id}")
|
||
|
||
if cached_count > 0:
|
||
logger.info(f"🎉 LPR: Successfully cached {cached_count} detection(s) for session_id {session_id_str}")
|
||
logger.info(f"📊 Total LPR sessions now cached: {len(session_detections)}")
|
||
else:
|
||
logger.warning(f"⚠️ LPR: No detections could be cached for session_id {session_id_str}")
|
||
logger.warning(f" Display cameras: {display_cameras}")
|
||
logger.warning(f" Available cached detections: {list(cached_detections.keys())}")
|
||
|
||
# Clear waiting state for cameras associated with this display
|
||
with streams_lock:
|
||
affected_cameras = []
|
||
for camera_id, stream in streams.items():
|
||
if stream["subscriptionIdentifier"].startswith(display_identifier + ";"):
|
||
affected_cameras.append(camera_id)
|
||
from siwatsystem.pympta import get_camera_stability_data
|
||
model_id = stream.get("modelId", "unknown")
|
||
stability_data = get_camera_stability_data(camera_id, model_id)
|
||
session_state = stability_data["session_state"]
|
||
if session_state.get("waiting_for_backend_session", False):
|
||
session_state["waiting_for_backend_session"] = False
|
||
session_state["wait_start_time"] = 0.0
|
||
logger.info(f"🚀 PIPELINE UNBLOCKED: Backend sessionId {session_id} received - camera {camera_id} can proceed with database operations")
|
||
logger.debug(f"📋 Camera {camera_id}: SessionId {session_id} now available for future database operations")
|
||
logger.debug(f"Updated session state for {len(affected_cameras)} cameras: {affected_cameras}")
|
||
else:
|
||
logger.warning(f"🚨 Invalid setSessionId message: missing displayIdentifier in payload")
|
||
|
||
elif msg_type == "patchSession":
|
||
session_id = data.get("sessionId")
|
||
patch_data = data.get("data", {})
|
||
|
||
# For now, just acknowledge the patch - actual implementation depends on backend requirements
|
||
response = {
|
||
"type": "patchSessionResult",
|
||
"payload": {
|
||
"sessionId": session_id,
|
||
"success": True,
|
||
"message": "Session patch acknowledged"
|
||
}
|
||
}
|
||
ws_logger.info(f"TX -> {json.dumps(response, separators=(',', ':'))}")
|
||
await websocket.send_json(response)
|
||
logger.info(f"Acknowledged patch for session {session_id}")
|
||
|
||
elif msg_type == "setProgressionStage":
|
||
payload = data.get("payload", {})
|
||
display_identifier = payload.get("displayIdentifier")
|
||
progression_stage = payload.get("progressionStage")
|
||
|
||
logger.info(f"🏁 PROGRESSION STAGE RECEIVED: displayId={display_identifier}, stage={progression_stage}")
|
||
|
||
if display_identifier:
|
||
# Find all cameras associated with this display
|
||
with streams_lock:
|
||
affected_cameras = []
|
||
for camera_id, stream in streams.items():
|
||
if stream["subscriptionIdentifier"].startswith(display_identifier + ";"):
|
||
affected_cameras.append(camera_id)
|
||
|
||
logger.debug(f"🎯 Found {len(affected_cameras)} cameras for display {display_identifier}: {affected_cameras}")
|
||
|
||
# Handle different progression stages
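                        # Summary: car_fueling pauses YOLO inference (cached detections keep flowing),
                        # car_waitpayment re-enables it, "welcome" is ignored while waiting for payment,
                        # and car_wait_staff is simply recorded on the pipeline state.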
|
||
for camera_id in affected_cameras:
|
||
pipeline_state = get_or_init_session_pipeline_state(camera_id)
|
||
current_mode = pipeline_state.get("mode", "validation_detecting")
|
||
|
||
if progression_stage == "car_fueling":
|
||
# Situation 2: Stop YOLO inference, continue sending cached detection dict
|
||
if current_mode == "lightweight":
|
||
pipeline_state["yolo_inference_enabled"] = False
|
||
pipeline_state["progression_stage"] = "car_fueling"
|
||
logger.info(f"⏸️ Camera {camera_id}: YOLO inference DISABLED for car_fueling stage (still sending cached detection dict)")
|
||
else:
|
||
logger.debug(f"📊 Camera {camera_id}: car_fueling received but not in lightweight mode (mode: {current_mode})")
|
||
|
||
elif progression_stage == "car_waitpayment":
|
||
# Resume YOLO inference for absence counter
|
||
pipeline_state["yolo_inference_enabled"] = True
|
||
pipeline_state["progression_stage"] = "car_waitpayment"
|
||
logger.info(f"▶️ Camera {camera_id}: YOLO inference RE-ENABLED for car_waitpayment stage")
|
||
|
||
elif progression_stage == "welcome":
|
||
# Ignore welcome messages during car_waitpayment as per requirement
|
||
current_progression = pipeline_state.get("progression_stage")
|
||
if current_progression == "car_waitpayment":
|
||
logger.info(f"🚫 Camera {camera_id}: IGNORING welcome stage (currently in car_waitpayment)")
|
||
else:
|
||
pipeline_state["progression_stage"] = "welcome"
|
||
logger.info(f"🎉 Camera {camera_id}: Progression stage set to welcome")
|
||
|
||
elif progression_stage in ["car_wait_staff"]:
|
||
pipeline_state["progression_stage"] = progression_stage
|
||
logger.info(f"📋 Camera {camera_id}: Progression stage set to {progression_stage}")
|
||
else:
|
||
logger.warning(f"🚨 Invalid setProgressionStage message: missing displayIdentifier in payload")
|
||
|
||
else:
|
||
logger.error(f"Unknown message type: {msg_type}")
|
||
except json.JSONDecodeError:
|
||
logger.error("Received invalid JSON message")
|
||
except (WebSocketDisconnect, ConnectionClosedError) as e:
|
||
logger.warning(f"WebSocket disconnected: {e}")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"Error handling message: {e}")
|
||
break
|
||
    try:
        await websocket.accept()
        stream_task = asyncio.create_task(process_streams())
        heartbeat_task = asyncio.create_task(send_heartbeat())
        message_task = asyncio.create_task(on_message())
        await asyncio.gather(heartbeat_task, message_task)
    except Exception as e:
        logger.error(f"Error in detect websocket: {e}")
    finally:
        stream_task.cancel()
        await stream_task
        with streams_lock:
            # Clean up shared camera streams
            for camera_url, shared_stream in camera_streams.items():
                shared_stream["stop_event"].set()
                shared_stream["thread"].join()
                if "cap" in shared_stream:
                    shared_stream["cap"].release()
                while not shared_stream["buffer"].empty():
                    try:
                        shared_stream["buffer"].get_nowait()
                    except queue.Empty:
                        pass
                logger.info(f"Released shared camera stream for {camera_url}")

            streams.clear()
            camera_streams.clear()
            subscription_to_camera.clear()
        with models_lock:
            models.clear()
        latest_frames.clear()
        cached_detections.clear()
        frame_skip_flags.clear()
        camera_states.clear()
        cached_full_pipeline_results.clear()
        session_pipeline_states.clear()
        session_ids.clear()
        # Clean up LPR integration caches
        session_detections.clear()
        session_to_camera.clear()
        detection_timestamps.clear()
        logger.info("WebSocket connection closed")