From 960c60accc5f152df0dd39a64f5799c182836f9b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 28 May 2025 00:43:29 +0700 Subject: [PATCH 01/23] add build workflow for backend application and Docker image --- .gitea/workflows/build.yml | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 .gitea/workflows/build.yml diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml new file mode 100644 index 0000000..e891ac9 --- /dev/null +++ b/.gitea/workflows/build.yml @@ -0,0 +1,35 @@ +name: Build Backend Application and Docker Image + +on: + push: + branches: + - main + workflow_dispatch: + +jobs: + build-docker: + needs: build-backend + runs-on: ubuntu-latest + permissions: + packages: write + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: git.siwatsystem.com + username: ${{ github.actor }} + password: ${{ secrets.RUNNER_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v4 + with: + context: . + file: ./Dockerfile + push: true + tags: git.siwatsystem.com/adsist-cms/worker:latest \ No newline at end of file From 3511d6ad7a1162660023f74d4bff235e17c71e5d Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 28 May 2025 00:59:34 +0700 Subject: [PATCH 02/23] remove dependency on build-backend job in Docker build workflow --- .gitea/workflows/build.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml index e891ac9..dad25b3 100644 --- a/.gitea/workflows/build.yml +++ b/.gitea/workflows/build.yml @@ -8,7 +8,6 @@ on: jobs: build-docker: - needs: build-backend runs-on: ubuntu-latest permissions: packages: write From d4754fcd27ca4197a2bd8f7c4ef202fc9feccfa0 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 28 May 2025 19:18:58 +0700 Subject: [PATCH 03/23] enhance logging for model loading and pipeline processing; update log levels and add detailed error messages --- .gitignore | 5 +- app.py | 237 ++++++++++++++++++++++++++++++++---------- siwatsystem/pympta.py | 165 ++++++++++++++++++++++++----- 3 files changed, 325 insertions(+), 82 deletions(-) diff --git a/.gitignore b/.gitignore index fab3efb..ff8c99d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,7 @@ app.log __pycache__/ .mptacache -mptas \ No newline at end of file +mptas +detector_worker.log +.gitignore +no_frame_debug.log diff --git a/app.py b/app.py index cbf6186..53aa8bf 100644 --- a/app.py +++ b/app.py @@ -41,41 +41,61 @@ max_retries = config.get("max_retries", 3) # Configure logging logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s", + level=logging.INFO, # Set to INFO level for less verbose output + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ - logging.FileHandler("app.log"), - logging.StreamHandler() + logging.FileHandler("detector_worker.log"), # Write logs to a file + logging.StreamHandler() # Also output to console ] ) +# Create a logger specifically for this application +logger = logging.getLogger("detector_worker") +logger.setLevel(logging.DEBUG) # Set app-specific logger to DEBUG level + +# Ensure all other libraries (including root) use at least INFO level +logging.getLogger().setLevel(logging.INFO) + +logger.info("Starting detector worker application") +logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: 
{max_streams}, Max retries: {max_retries}") + # Ensure the models directory exists os.makedirs("models", exist_ok=True) +logger.info("Ensured models directory exists") # Constants for heartbeat and timeouts HEARTBEAT_INTERVAL = 2 # seconds WORKER_TIMEOUT_MS = 10000 +logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds") # Locks for thread-safe operations streams_lock = threading.Lock() models_lock = threading.Lock() +logger.debug("Initialized thread locks") # Add helper to download mpta ZIP file from a remote URL def download_mpta(url: str, dest_path: str) -> str: try: + logger.info(f"Starting download of model from {url} to {dest_path}") os.makedirs(os.path.dirname(dest_path), exist_ok=True) response = requests.get(url, stream=True) if response.status_code == 200: + file_size = int(response.headers.get('content-length', 0)) + logger.info(f"Model file size: {file_size/1024/1024:.2f} MB") + downloaded = 0 with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - logging.info(f"Downloaded mpta file from {url} to {dest_path}") + downloaded += len(chunk) + if file_size > 0 and downloaded % (file_size // 10) < 8192: # Log approximately every 10% + logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%") + logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}") return dest_path else: - logging.error(f"Failed to download mpta file (status code {response.status_code})") + logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}") return None except Exception as e: - logging.error(f"Exception downloading mpta file from {url}: {e}") + logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None #################################################### @@ -83,12 +103,17 @@ def download_mpta(url: str, dest_path: str) -> str: #################################################### @app.websocket("/") async def detect(websocket: WebSocket): - logging.info("WebSocket connection accepted") + logger.info("WebSocket connection accepted") persistent_data_dict = {} async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data): try: + logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") + start_time = time.time() detection_result = run_pipeline(frame, model_tree) + process_time = (time.time() - start_time) * 1000 + logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms") + detection_data = { "type": "imageDetection", "cameraIdentifier": camera_id, @@ -99,87 +124,157 @@ async def detect(websocket: WebSocket): "modelName": stream["modelName"] } } - logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") + + if detection_result: + detection_count = len(detection_result.get("detections", [])) + logger.info(f"Camera {camera_id}: Detected {detection_count} objects with model {stream['modelName']}") + await websocket.send_json(detection_data) return persistent_data except Exception as e: - logging.error(f"Error in handle_detection for camera {camera_id}: {e}") + logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) return persistent_data def frame_reader(camera_id, cap, buffer, stop_event): retries = 0 + logger.info(f"Starting frame reader thread for camera {camera_id}") + frame_count = 0 + last_log_time = time.time() + try: + # Log initial camera status and properties + if cap.isOpened(): + width = 
int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}") + else: + logger.error(f"Camera {camera_id} failed to open initially") + while not stop_event.is_set(): try: + if not cap.isOpened(): + logger.error(f"Camera {camera_id} is not open before trying to read") + # Attempt to reopen + cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) + time.sleep(reconnect_interval) + continue + + logger.debug(f"Attempting to read frame from camera {camera_id}") ret, frame = cap.read() + if not ret: - logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}") + logger.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}") cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: - logging.error(f"Max retries reached for camera: {camera_id}") + logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader") break # Re-open + logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): - logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}") + logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}") continue + logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}") continue + + # Successfully read a frame + frame_count += 1 + current_time = time.time() + # Log frame stats every 5 seconds + if current_time - last_log_time > 5: + logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds") + frame_count = 0 + last_log_time = current_time + + logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}") retries = 0 + # Overwrite old frame if buffer is full if not buffer.empty(): try: buffer.get_nowait() + logger.debug(f"Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass + buffer.put(frame) + logger.debug(f"Added new frame to buffer for camera {camera_id}") + + # Short sleep to avoid CPU overuse + time.sleep(0.01) + except cv2.error as e: - logging.error(f"OpenCV error for camera {camera_id}: {e}") + logger.error(f"OpenCV error for camera {camera_id}: {e}", exc_info=True) cap.release() time.sleep(reconnect_interval) retries += 1 if retries > max_retries and max_retries != -1: - logging.error(f"Max retries reached after OpenCV error for camera {camera_id}") + logger.error(f"Max retries reached after OpenCV error for camera {camera_id}") break + logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}") cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"]) if not cap.isOpened(): - logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") + logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") continue + logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}") except Exception as e: - logging.error(f"Unexpected error for camera {camera_id}: {e}") + logger.error(f"Unexpected error for camera {camera_id}: {str(e)}", exc_info=True) cap.release() break except Exception as e: - logging.error(f"Error in frame_reader thread for camera {camera_id}: {e}") + logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", 
exc_info=True) + finally: + logger.info(f"Frame reader thread for camera {camera_id} is exiting") + if cap and cap.isOpened(): + cap.release() async def process_streams(): - logging.info("Started processing streams") + logger.info("Started processing streams") try: while True: start_time = time.time() with streams_lock: current_streams = list(streams.items()) + if current_streams: + logger.debug(f"Processing {len(current_streams)} active streams") + else: + logger.debug("No active streams to process") + for camera_id, stream in current_streams: buffer = stream["buffer"] - if not buffer.empty(): - frame = buffer.get() - with models_lock: - model_tree = models.get(camera_id, {}).get(stream["modelId"]) - key = (camera_id, stream["modelId"]) - persistent_data = persistent_data_dict.get(key, {}) - updated_persistent_data = await handle_detection( - camera_id, stream, frame, websocket, model_tree, persistent_data - ) - persistent_data_dict[key] = updated_persistent_data + if buffer.empty(): + logger.debug(f"Frame buffer is empty for camera {camera_id}") + continue + + logger.debug(f"Got frame from buffer for camera {camera_id}") + frame = buffer.get() + + with models_lock: + model_tree = models.get(camera_id, {}).get(stream["modelId"]) + if not model_tree: + logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}") + continue + logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}") + + key = (camera_id, stream["modelId"]) + persistent_data = persistent_data_dict.get(key, {}) + logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}") + updated_persistent_data = await handle_detection( + camera_id, stream, frame, websocket, model_tree, persistent_data + ) + persistent_data_dict[key] = updated_persistent_data + elapsed_time = (time.time() - start_time) * 1000 # ms sleep_time = max(poll_interval - elapsed_time, 0) - logging.debug(f"Elapsed time: {elapsed_time}ms, sleeping for: {sleep_time}ms") + logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms") await asyncio.sleep(sleep_time / 1000.0) except asyncio.CancelledError: - logging.info("Stream processing task cancelled") + logger.info("Stream processing task cancelled") except Exception as e: - logging.error(f"Error in process_streams: {e}") + logger.error(f"Error in process_streams: {str(e)}", exc_info=True) async def send_heartbeat(): while True: @@ -212,17 +307,17 @@ async def detect(websocket: WebSocket): "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) - logging.debug("Sent stateReport as heartbeat") + logger.debug("Sent stateReport as heartbeat") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: - logging.error(f"Error sending stateReport heartbeat: {e}") + logger.error(f"Error sending stateReport heartbeat: {e}") break async def on_message(): while True: try: msg = await websocket.receive_text() - logging.debug(f"Received message: {msg}") + logger.debug(f"Received message: {msg}") data = json.loads(msg) msg_type = data.get("type") @@ -236,35 +331,67 @@ async def detect(websocket: WebSocket): if model_url: with models_lock: - if camera_id not in models: - models[camera_id] = {} - if modelId not in models[camera_id]: - logging.info(f"Loading model from {model_url}") + if (camera_id not in models) or (modelId not in models[camera_id]): + logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") extraction_dir = 
os.path.join("models", camera_id, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): + logger.info(f"Downloading remote model from {model_url}") local_mpta = os.path.join(extraction_dir, os.path.basename(parsed.path)) + logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: - logging.error("Failed to download the remote mpta file.") + logger.error(f"Failed to download the remote mpta file from {model_url}") + error_response = { + "type": "error", + "cameraIdentifier": camera_id, + "error": f"Failed to download model from {model_url}" + } + await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: + logger.info(f"Loading local model from {model_url}") + # Check if file exists before attempting to load + if not os.path.exists(model_url): + logger.error(f"Local model file not found: {model_url}") + logger.debug(f"Current working directory: {os.getcwd()}") + error_response = { + "type": "error", + "cameraIdentifier": camera_id, + "error": f"Model file not found: {model_url}" + } + await websocket.send_json(error_response) + continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: - logging.error("Failed to load model from mpta file.") + logger.error(f"Failed to load model {modelId} from mpta file for camera {camera_id}") + error_response = { + "type": "error", + "cameraIdentifier": camera_id, + "error": f"Failed to load model {modelId}" + } + await websocket.send_json(error_response) continue + if camera_id not in models: + models[camera_id] = {} models[camera_id][modelId] = model_tree - logging.info(f"Loaded model {modelId} for camera {camera_id}") - + logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") + success_response = { + "type": "modelLoaded", + "cameraIdentifier": camera_id, + "modelId": modelId + } + await websocket.send_json(success_response) + if camera_id and rtsp_url: with streams_lock: if camera_id not in streams and len(streams) < max_streams: cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): - logging.error(f"Failed to open RTSP stream for camera {camera_id}") + logger.error(f"Failed to open RTSP stream for camera {camera_id}") continue buffer = queue.Queue(maxsize=1) stop_event = threading.Event() @@ -280,12 +407,12 @@ async def detect(websocket: WebSocket): "modelId": modelId, "modelName": modelName } - logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + logger.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") elif camera_id and camera_id in streams: # If already subscribed, unsubscribe stream = streams.pop(camera_id) stream["cap"].release() - logging.info(f"Unsubscribed from camera {camera_id}") + logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models and modelId in models[camera_id]: del models[camera_id][modelId] @@ -294,14 +421,14 @@ async def detect(websocket: WebSocket): elif msg_type == "unsubscribe": payload = data.get("payload", {}) camera_id = payload.get("cameraIdentifier") - logging.debug(f"Unsubscribing from camera {camera_id}") + logger.debug(f"Unsubscribing from camera {camera_id}") with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) stream["stop_event"].set() 
stream["thread"].join() stream["cap"].release() - logging.info(f"Unsubscribed from camera {camera_id}") + logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models: del models[camera_id] @@ -335,14 +462,14 @@ async def detect(websocket: WebSocket): } await websocket.send_text(json.dumps(state_report)) else: - logging.error(f"Unknown message type: {msg_type}") + logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: - logging.error("Received invalid JSON message") + logger.error("Received invalid JSON message") except (WebSocketDisconnect, ConnectionClosedError) as e: - logging.warning(f"WebSocket disconnected: {e}") + logger.warning(f"WebSocket disconnected: {e}") break except Exception as e: - logging.error(f"Error handling message: {e}") + logger.error(f"Error handling message: {e}") break try: @@ -352,7 +479,7 @@ async def detect(websocket: WebSocket): message_task = asyncio.create_task(on_message()) await asyncio.gather(heartbeat_task, message_task) except Exception as e: - logging.error(f"Error in detect websocket: {e}") + logger.error(f"Error in detect websocket: {e}") finally: stream_task.cancel() await stream_task @@ -366,8 +493,8 @@ async def detect(websocket: WebSocket): stream["buffer"].get_nowait() except queue.Empty: pass - logging.info(f"Released camera {camera_id} and cleaned up resources") + logger.info(f"Released camera {camera_id} and cleaned up resources") streams.clear() with models_lock: models.clear() - logging.info("WebSocket connection closed") + logger.info("WebSocket connection closed") diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 05a566d..2ebc6a6 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -6,19 +6,27 @@ import cv2 import requests import zipfile import shutil +import traceback from ultralytics import YOLO from urllib.parse import urlparse +# Create a logger specifically for this module +logger = logging.getLogger("detector_worker.pympta") + def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: # Recursively load a model node from configuration. model_path = os.path.join(mpta_dir, node_config["modelFile"]) if not os.path.exists(model_path): - logging.error(f"Model file {model_path} not found.") + logger.error(f"Model file {model_path} not found. Current directory: {os.getcwd()}") + logger.error(f"Directory content: {os.listdir(os.path.dirname(model_path))}") raise FileNotFoundError(f"Model file {model_path} not found.") - logging.info(f"Loading model for node {node_config['modelId']} from {model_path}") + logger.info(f"Loading model for node {node_config['modelId']} from {model_path}") model = YOLO(model_path) if torch.cuda.is_available(): + logger.info(f"CUDA available. Moving model {node_config['modelId']} to GPU") model.to("cuda") + else: + logger.info(f"CUDA not available. 
Using CPU for model {node_config['modelId']}") node = { "modelId": node_config["modelId"], "modelFile": node_config["modelFile"], @@ -28,11 +36,14 @@ def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: "model": model, "branches": [] } + logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}") for child in node_config.get("branches", []): + logger.debug(f"Loading branch for parent node {node_config['modelId']}") node["branches"].append(load_pipeline_node(child, mpta_dir)) return node def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: + logger.info(f"Attempting to load pipeline from {zip_source} to {target_dir}") os.makedirs(target_dir, exist_ok=True) zip_path = os.path.join(target_dir, "pipeline.mpta") @@ -40,51 +51,121 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: parsed = urlparse(zip_source) if parsed.scheme in ("", "file"): local_path = parsed.path if parsed.scheme == "file" else zip_source + logger.debug(f"Checking if local file exists: {local_path}") if os.path.exists(local_path): try: shutil.copy(local_path, zip_path) - logging.info(f"Copied local .mpta file from {local_path} to {zip_path}") + logger.info(f"Copied local .mpta file from {local_path} to {zip_path}") except Exception as e: - logging.error(f"Failed to copy local .mpta file from {local_path}: {e}") + logger.error(f"Failed to copy local .mpta file from {local_path}: {str(e)}", exc_info=True) return None else: - logging.error(f"Local file {local_path} does not exist.") + logger.error(f"Local file {local_path} does not exist. Current directory: {os.getcwd()}") + # List all subdirectories of models directory to help debugging + if os.path.exists("models"): + logger.error(f"Content of models directory: {os.listdir('models')}") + for root, dirs, files in os.walk("models"): + logger.error(f"Directory {root} contains subdirs: {dirs} and files: {files}") + else: + logger.error("The models directory doesn't exist") return None else: - logging.error("HTTP download functionality has been moved. Use a local file path here.") + logger.error(f"HTTP download functionality has been moved. Use a local file path here. 
Received: {zip_source}") return None try: + if not os.path.exists(zip_path): + logger.error(f"Zip file not found at expected location: {zip_path}") + return None + + logger.debug(f"Extracting .mpta file from {zip_path} to {target_dir}") + # Extract contents and track the directories created + extracted_dirs = [] with zipfile.ZipFile(zip_path, "r") as zip_ref: + file_list = zip_ref.namelist() + logger.debug(f"Files in .mpta archive: {file_list}") + + # Extract and track the top-level directories + for file_path in file_list: + parts = file_path.split('/') + if len(parts) > 1: + top_dir = parts[0] + if top_dir and top_dir not in extracted_dirs: + extracted_dirs.append(top_dir) + + # Now extract the files zip_ref.extractall(target_dir) - logging.info(f"Extracted .mpta file to {target_dir}") + + logger.info(f"Successfully extracted .mpta file to {target_dir}") + logger.debug(f"Extracted directories: {extracted_dirs}") + + # Check what was actually created after extraction + actual_dirs = [d for d in os.listdir(target_dir) if os.path.isdir(os.path.join(target_dir, d))] + logger.debug(f"Actual directories created: {actual_dirs}") + except zipfile.BadZipFile as e: + logger.error(f"Bad zip file {zip_path}: {str(e)}", exc_info=True) + return None except Exception as e: - logging.error(f"Failed to extract .mpta file: {e}") + logger.error(f"Failed to extract .mpta file {zip_path}: {str(e)}", exc_info=True) return None finally: if os.path.exists(zip_path): os.remove(zip_path) + logger.debug(f"Removed temporary zip file: {zip_path}") + + # Use the first extracted directory if it exists, otherwise use the expected name pipeline_name = os.path.basename(zip_source) pipeline_name = os.path.splitext(pipeline_name)[0] - mpta_dir = os.path.join(target_dir, pipeline_name) + + # Find the directory with pipeline.json + mpta_dir = None + # First try the expected directory name + expected_dir = os.path.join(target_dir, pipeline_name) + if os.path.exists(expected_dir) and os.path.exists(os.path.join(expected_dir, "pipeline.json")): + mpta_dir = expected_dir + logger.debug(f"Found pipeline.json in the expected directory: {mpta_dir}") + else: + # Look through all subdirectories for pipeline.json + for subdir in actual_dirs: + potential_dir = os.path.join(target_dir, subdir) + if os.path.exists(os.path.join(potential_dir, "pipeline.json")): + mpta_dir = potential_dir + logger.info(f"Found pipeline.json in directory: {mpta_dir} (different from expected: {expected_dir})") + break + + if not mpta_dir: + logger.error(f"Could not find pipeline.json in any extracted directory. Directory content: {os.listdir(target_dir)}") + return None + pipeline_json_path = os.path.join(mpta_dir, "pipeline.json") if not os.path.exists(pipeline_json_path): - logging.error("pipeline.json not found in the .mpta file") + logger.error(f"pipeline.json not found in the .mpta file. 
Files in directory: {os.listdir(mpta_dir)}") return None try: with open(pipeline_json_path, "r") as f: pipeline_config = json.load(f) + logger.info(f"Successfully loaded pipeline configuration from {pipeline_json_path}") + logger.debug(f"Pipeline config: {json.dumps(pipeline_config, indent=2)}") return load_pipeline_node(pipeline_config["pipeline"], mpta_dir) + except json.JSONDecodeError as e: + logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True) + return None + except KeyError as e: + logger.error(f"Missing key in pipeline.json: {str(e)}", exc_info=True) + return None except Exception as e: - logging.error(f"Error loading pipeline.json: {e}") + logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True) return None -def run_pipeline(frame, node: dict, return_bbox: bool = False): +def run_pipeline(frame, node: dict, return_bbox: bool = False, is_last_stage: bool = True): """ Processes the frame with the given pipeline node. When return_bbox is True, the function returns a tuple (detection, bbox) where bbox is (x1,y1,x2,y2) for drawing. Otherwise, returns only the detection. + + The is_last_stage parameter controls whether this node is considered the last + in the pipeline chain. Only the last stage will return detection results. """ try: # Check model type and use appropriate method @@ -92,7 +173,7 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False): if model_task == "classify": # Classification models need to use predict() instead of track() - logging.debug(f"Running classification model: {node.get('modelId')}") + logger.debug(f"Running classification model: {node.get('modelId')}") results = node["model"].predict(frame, stream=False) detection = None best_box = None @@ -109,18 +190,32 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False): "confidence": conf, "id": None # Classification doesn't have tracking IDs } + logger.debug(f"Classification detection: {detection}") + else: + logger.debug(f"Empty classification results for model {node.get('modelId')}") # Classification doesn't produce bounding boxes bbox = None else: # Detection/segmentation models use tracking - logging.debug(f"Running detection/tracking model: {node.get('modelId')}") + logger.debug(f"Running detection/tracking model: {node.get('modelId')}") results = node["model"].track(frame, stream=False, persist=True) detection = None best_box = None max_conf = -1 + # Log raw detection count + detection_count = 0 + for r in results: + if hasattr(r.boxes, 'cpu') and len(r.boxes.cpu()) > 0: + detection_count += len(r.boxes.cpu()) + + if detection_count == 0: + logger.debug(f"Empty detection results (no objects found) for model {node.get('modelId')}") + else: + logger.debug(f"Detection model {node.get('modelId')} found {detection_count} objects") + for r in results: for box in r.boxes: box_cpu = box.cpu() @@ -133,6 +228,11 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False): "id": box.id.item() } best_box = box_cpu + + if detection: + logger.debug(f"Best detection: {detection}") + else: + logger.debug(f"No valid detection with tracking ID for model {node.get('modelId')}") bbox = None # Calculate bbox if best_box exists @@ -144,31 +244,44 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False): x2, y2 = min(w, x2), min(h, y2) if x2 > x1 and y2 > y1: bbox = (x1, y1, x2, y2) + logger.debug(f"Detection bounding box: {bbox}") if node.get("crop", False): frame = frame[y1:y2, x1:x2] + logger.debug(f"Cropped frame to {frame.shape}") + # Check if we should process 
branches if detection is not None: for branch in node["branches"]: if detection["class"] in branch.get("triggerClasses", []): min_conf = branch.get("minConfidence") if min_conf is not None and detection["confidence"] < min_conf: - logging.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.") + logger.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.") + break + + # If we have branches, this is not the last stage + branch_result = run_pipeline(frame, branch, return_bbox, is_last_stage=True) + + # This node is no longer the last stage, so its results shouldn't be returned + is_last_stage = False + + if branch_result is not None: if return_bbox: - return detection, bbox - return detection - res = run_pipeline(frame, branch, return_bbox) - if res is not None: - if return_bbox: - return res - return res - if return_bbox: - return detection, bbox - return detection + return branch_result + return branch_result + break + + # Return this node's detection only if it's considered the last stage + if is_last_stage: + if return_bbox: + return detection, bbox + return detection + + # No detection or not the last stage if return_bbox: return None, None return None except Exception as e: - logging.error(f"Error running pipeline on node {node.get('modelId')}: {e}") + logger.error(f"Error running pipeline on node {node.get('modelId')}: {e}") if return_bbox: return None, None return None From f6014abb7aa2b99b1255a65e4ac4d55590b3b2ea Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 28 May 2025 19:31:22 +0700 Subject: [PATCH 04/23] refactor run_pipeline function for improved clarity and efficiency; add trigger class index handling and streamline detection logic --- siwatsystem/pympta.py | 215 +++++++++++++++++++----------------------- 1 file changed, 96 insertions(+), 119 deletions(-) diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 2ebc6a6..5e32596 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -27,10 +27,21 @@ def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: model.to("cuda") else: logger.info(f"CUDA not available. Using CPU for model {node_config['modelId']}") + + # Prepare trigger class indices for optimization + trigger_classes = node_config.get("triggerClasses", []) + trigger_class_indices = None + if trigger_classes and hasattr(model, "names"): + # Convert class names to indices for the model + trigger_class_indices = [i for i, name in model.names.items() + if name in trigger_classes] + logger.debug(f"Converted trigger classes to indices: {trigger_class_indices}") + node = { "modelId": node_config["modelId"], "modelFile": node_config["modelFile"], - "triggerClasses": node_config.get("triggerClasses", []), + "triggerClasses": trigger_classes, + "triggerClassIndices": trigger_class_indices, "crop": node_config.get("crop", False), "minConfidence": node_config.get("minConfidence", None), "model": model, @@ -158,130 +169,96 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True) return None -def run_pipeline(frame, node: dict, return_bbox: bool = False, is_last_stage: bool = True): +def run_pipeline(frame, node: dict, return_bbox: bool=False): """ - Processes the frame with the given pipeline node. When return_bbox is True, - the function returns a tuple (detection, bbox) where bbox is (x1,y1,x2,y2) - for drawing. Otherwise, returns only the detection. 
- - The is_last_stage parameter controls whether this node is considered the last - in the pipeline chain. Only the last stage will return detection results. + - For detection nodes (task != 'classify'): + • runs `track(..., classes=triggerClassIndices)` + • picks top box ≥ minConfidence + • optionally crops & resizes → recurse into child + • else returns (det_dict, bbox) + - For classify nodes: + • runs `predict()` + • returns top (class,confidence) and no bbox """ try: - # Check model type and use appropriate method - model_task = getattr(node["model"], "task", None) - - if model_task == "classify": - # Classification models need to use predict() instead of track() - logger.debug(f"Running classification model: {node.get('modelId')}") + task = getattr(node["model"], "task", None) + + # ─── Classification stage ─────────────────────────────────── + if task == "classify": + # run the classifier and grab its top-1 directly via the Probs API results = node["model"].predict(frame, stream=False) - detection = None - best_box = None - - # Process classification results - for r in results: - probs = r.probs - if probs is not None and len(probs) > 0: - # Get the most confident class - class_id = int(probs.top1) - conf = float(probs.top1conf) - detection = { - "class": node["model"].names[class_id], - "confidence": conf, - "id": None # Classification doesn't have tracking IDs - } - logger.debug(f"Classification detection: {detection}") - else: - logger.debug(f"Empty classification results for model {node.get('modelId')}") - - # Classification doesn't produce bounding boxes - bbox = None - - else: - # Detection/segmentation models use tracking - logger.debug(f"Running detection/tracking model: {node.get('modelId')}") - results = node["model"].track(frame, stream=False, persist=True) - detection = None - best_box = None - max_conf = -1 + # nothing returned? 
+ if not results: + return (None, None) if return_bbox else None - # Log raw detection count - detection_count = 0 - for r in results: - if hasattr(r.boxes, 'cpu') and len(r.boxes.cpu()) > 0: - detection_count += len(r.boxes.cpu()) - - if detection_count == 0: - logger.debug(f"Empty detection results (no objects found) for model {node.get('modelId')}") - else: - logger.debug(f"Detection model {node.get('modelId')} found {detection_count} objects") + # take the first result's probs object + r = results[0] + probs = r.probs + if probs is None: + return (None, None) if return_bbox else None - for r in results: - for box in r.boxes: - box_cpu = box.cpu() - conf = float(box_cpu.conf[0]) - if conf > max_conf and hasattr(box, "id") and box.id is not None: - max_conf = conf - detection = { - "class": node["model"].names[int(box_cpu.cls[0])], - "confidence": conf, - "id": box.id.item() - } - best_box = box_cpu - - if detection: - logger.debug(f"Best detection: {detection}") - else: - logger.debug(f"No valid detection with tracking ID for model {node.get('modelId')}") + # get the top-1 class index and its confidence + top1_idx = int(probs.top1) + top1_conf = float(probs.top1conf) - bbox = None - # Calculate bbox if best_box exists - if detection and best_box is not None: - coords = best_box.xyxy[0] - x1, y1, x2, y2 = map(int, coords) - h, w = frame.shape[:2] - x1, y1 = max(0, x1), max(0, y1) - x2, y2 = min(w, x2), min(h, y2) - if x2 > x1 and y2 > y1: - bbox = (x1, y1, x2, y2) - logger.debug(f"Detection bounding box: {bbox}") - if node.get("crop", False): - frame = frame[y1:y2, x1:x2] - logger.debug(f"Cropped frame to {frame.shape}") + det = { + "class": node["model"].names[top1_idx], + "confidence": top1_conf, + "id": None + } + return (det, None) if return_bbox else det + + + # ─── Detection stage ──────────────────────────────────────── + # only look for your triggerClasses + tk = node["triggerClassIndices"] + res = node["model"].track( + frame, + stream=False, + persist=True, + **({"classes": tk} if tk else {}) + )[0] + + dets, boxes = [], [] + for box in res.boxes: + conf = float(box.cpu().conf[0]) + cid = int(box.cpu().cls[0]) + name = node["model"].names[cid] + if conf < node["minConfidence"]: + continue + xy = box.cpu().xyxy[0] + x1,y1,x2,y2 = map(int, xy) + dets.append({"class": name, "confidence": conf, + "id": box.id.item() if hasattr(box, "id") else None}) + boxes.append((x1, y1, x2, y2)) + + if not dets: + return (None, None) if return_bbox else None + + # take highest‐confidence + best_idx = max(range(len(dets)), key=lambda i: dets[i]["confidence"]) + best_det = dets[best_idx] + best_box = boxes[best_idx] + + # ─── Branch (classification) ─────────────────────────────── + for br in node["branches"]: + if (best_det["class"] in br["triggerClasses"] + and best_det["confidence"] >= br["minConfidence"]): + # crop if requested + sub = frame + if br["crop"]: + x1,y1,x2,y2 = best_box + sub = frame[y1:y2, x1:x2] + sub = cv2.resize(sub, (224, 224)) + + det2, _ = run_pipeline(sub, br, return_bbox=True) + if det2: + # return classification result + original bbox + return (det2, best_box) if return_bbox else det2 + + # ─── No branch matched → return this detection ───────────── + return (best_det, best_box) if return_bbox else best_det - # Check if we should process branches - if detection is not None: - for branch in node["branches"]: - if detection["class"] in branch.get("triggerClasses", []): - min_conf = branch.get("minConfidence") - if min_conf is not None and detection["confidence"] < 
min_conf: - logger.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.") - break - - # If we have branches, this is not the last stage - branch_result = run_pipeline(frame, branch, return_bbox, is_last_stage=True) - - # This node is no longer the last stage, so its results shouldn't be returned - is_last_stage = False - - if branch_result is not None: - if return_bbox: - return branch_result - return branch_result - break - - # Return this node's detection only if it's considered the last stage - if is_last_stage: - if return_bbox: - return detection, bbox - return detection - - # No detection or not the last stage - if return_bbox: - return None, None - return None except Exception as e: - logger.error(f"Error running pipeline on node {node.get('modelId')}: {e}") - if return_bbox: - return None, None - return None + logging.error(f"Error in node {node.get('modelId')}: {e}") + return (None, None) if return_bbox else None From a6cf9c20c67632501f2b0e18e1b5a3aa2f30512b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 28 May 2025 20:19:42 +0700 Subject: [PATCH 05/23] enhance detection logic to prioritize highest confidence results; improve logging for detection data and heartbeat state reports --- app.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/app.py b/app.py index 53aa8bf..fcd2c10 100644 --- a/app.py +++ b/app.py @@ -114,22 +114,67 @@ async def detect(websocket: WebSocket): process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms") + # Log the raw detection result for debugging + logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") + + # Direct class result (no detections/classifications structure) + if detection_result and isinstance(detection_result, dict) and "class" in detection_result and "confidence" in detection_result: + highest_confidence_detection = { + "class": detection_result.get("class", "none"), + "confidence": detection_result.get("confidence", 1.0), + "box": [0, 0, 0, 0] # Empty bounding box for classifications + } + # Handle case when no detections found or result is empty + elif not detection_result or not detection_result.get("detections"): + # Check if we have classification results + if detection_result and detection_result.get("classifications"): + # Get the highest confidence classification + classifications = detection_result.get("classifications", []) + highest_confidence_class = max(classifications, key=lambda x: x.get("confidence", 0)) if classifications else None + + if highest_confidence_class: + highest_confidence_detection = { + "class": highest_confidence_class.get("class", "none"), + "confidence": highest_confidence_class.get("confidence", 1.0), + "box": [0, 0, 0, 0] # Empty bounding box for classifications + } + else: + highest_confidence_detection = { + "class": "none", + "confidence": 1.0, + "box": [0, 0, 0, 0] + } + else: + highest_confidence_detection = { + "class": "none", + "confidence": 1.0, + "box": [0, 0, 0, 0] + } + else: + # Find detection with highest confidence + detections = detection_result.get("detections", []) + highest_confidence_detection = max(detections, key=lambda x: x.get("confidence", 0)) if detections else { + "class": "none", + "confidence": 1.0, + "box": [0, 0, 0, 0] + } + detection_data = { "type": "imageDetection", "cameraIdentifier": camera_id, 
"timestamp": time.time(), "data": { - "detection": detection_result if detection_result else None, + "detection": highest_confidence_detection, # Send only the highest confidence detection "modelId": stream["modelId"], "modelName": stream["modelName"] } } - if detection_result: - detection_count = len(detection_result.get("detections", [])) - logger.info(f"Camera {camera_id}: Detected {detection_count} objects with model {stream['modelName']}") - + if highest_confidence_detection["class"] != "none": + logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}") + await websocket.send_json(detection_data) + logger.debug(f"Sent detection data to client for camera {camera_id}:\n{json.dumps(detection_data, indent=2)}") return persistent_data except Exception as e: logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) @@ -307,7 +352,7 @@ async def detect(websocket: WebSocket): "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) - logger.debug("Sent stateReport as heartbeat") + logger.debug(f"Sent stateReport as heartbeat: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%, {len(camera_connections)} active cameras") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: logger.error(f"Error sending stateReport heartbeat: {e}") From b5ae2801c1edc62e42a2bcbed0792abfe9a6209c Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sat, 28 Jun 2025 17:44:25 +0700 Subject: [PATCH 06/23] add snapshot fetching functionality; implement snapshot reader for HTTP/HTTPS URLs with retry logic --- app.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 148 insertions(+), 24 deletions(-) diff --git a/app.py b/app.py index fcd2c10..fc492e5 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ import time import queue import torch import cv2 +import numpy as np import base64 import logging import threading @@ -98,6 +99,28 @@ def download_mpta(url: str, dest_path: str) -> str: logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) return None +# Add helper to fetch snapshot image from HTTP/HTTPS URL +def fetch_snapshot(url: str): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + # Convert response content to numpy array + nparr = np.frombuffer(response.content, np.uint8) + # Decode image + frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + if frame is not None: + logger.debug(f"Successfully fetched snapshot from {url}, shape: {frame.shape}") + return frame + else: + logger.error(f"Failed to decode image from snapshot URL: {url}") + return None + else: + logger.error(f"Failed to fetch snapshot (status code {response.status_code}): {url}") + return None + except Exception as e: + logger.error(f"Exception fetching snapshot from {url}: {str(e)}") + return None + #################################################### # Detection and frame processing functions #################################################### @@ -276,6 +299,72 @@ async def detect(websocket: WebSocket): if cap and cap.isOpened(): cap.release() + def snapshot_reader(camera_id, snapshot_url, snapshot_interval, buffer, stop_event): + """Frame reader that fetches snapshots from HTTP/HTTPS URL at specified intervals""" + retries = 0 + logger.info(f"Starting snapshot reader thread for camera {camera_id} from {snapshot_url}") + frame_count = 0 + last_log_time = time.time() 
+ + try: + interval_seconds = snapshot_interval / 1000.0 # Convert milliseconds to seconds + logger.info(f"Snapshot interval for camera {camera_id}: {interval_seconds}s") + + while not stop_event.is_set(): + try: + start_time = time.time() + frame = fetch_snapshot(snapshot_url) + + if frame is None: + logger.warning(f"Failed to fetch snapshot for camera: {camera_id}, retry {retries+1}/{max_retries}") + retries += 1 + if retries > max_retries and max_retries != -1: + logger.error(f"Max retries reached for snapshot camera: {camera_id}, stopping reader") + break + time.sleep(min(interval_seconds, reconnect_interval)) + continue + + # Successfully fetched a frame + frame_count += 1 + current_time = time.time() + # Log frame stats every 5 seconds + if current_time - last_log_time > 5: + logger.info(f"Camera {camera_id}: Fetched {frame_count} snapshots in the last {current_time - last_log_time:.1f} seconds") + frame_count = 0 + last_log_time = current_time + + logger.debug(f"Successfully fetched snapshot from camera {camera_id}, shape: {frame.shape}") + retries = 0 + + # Overwrite old frame if buffer is full + if not buffer.empty(): + try: + buffer.get_nowait() + logger.debug(f"Removed old snapshot from buffer for camera {camera_id}") + except queue.Empty: + pass + + buffer.put(frame) + logger.debug(f"Added new snapshot to buffer for camera {camera_id}") + + # Wait for the specified interval + elapsed = time.time() - start_time + sleep_time = max(interval_seconds - elapsed, 0) + if sleep_time > 0: + time.sleep(sleep_time) + + except Exception as e: + logger.error(f"Unexpected error fetching snapshot for camera {camera_id}: {str(e)}", exc_info=True) + retries += 1 + if retries > max_retries and max_retries != -1: + logger.error(f"Max retries reached after error for snapshot camera {camera_id}") + break + time.sleep(min(interval_seconds, reconnect_interval)) + except Exception as e: + logger.error(f"Error in snapshot_reader thread for camera {camera_id}: {str(e)}", exc_info=True) + finally: + logger.info(f"Snapshot reader thread for camera {camera_id} is exiting") + async def process_streams(): logger.info("Started processing streams") try: @@ -370,6 +459,8 @@ async def detect(websocket: WebSocket): payload = data.get("payload", {}) camera_id = payload.get("cameraIdentifier") rtsp_url = payload.get("rtspUrl") + snapshot_url = payload.get("snapshotUrl") + snapshot_interval = payload.get("snapshotInterval") # in milliseconds model_url = payload.get("modelUrl") # may be remote or local modelId = payload.get("modelId") modelName = payload.get("modelName") @@ -430,34 +521,60 @@ async def detect(websocket: WebSocket): "modelId": modelId } await websocket.send_json(success_response) - - if camera_id and rtsp_url: + if camera_id and (rtsp_url or snapshot_url): with streams_lock: if camera_id not in streams and len(streams) < max_streams: - cap = cv2.VideoCapture(rtsp_url) - if not cap.isOpened(): - logger.error(f"Failed to open RTSP stream for camera {camera_id}") - continue buffer = queue.Queue(maxsize=1) stop_event = threading.Event() - thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) - thread.daemon = True - thread.start() - streams[camera_id] = { - "cap": cap, - "buffer": buffer, - "thread": thread, - "rtsp_url": rtsp_url, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName - } - logger.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + + # Choose between snapshot and RTSP based on 
availability + if snapshot_url and snapshot_interval: + logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") + thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) + thread.daemon = True + thread.start() + streams[camera_id] = { + "buffer": buffer, + "thread": thread, + "snapshot_url": snapshot_url, + "snapshot_interval": snapshot_interval, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "mode": "snapshot" + } + logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval {snapshot_interval}ms") + elif rtsp_url: + logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") + cap = cv2.VideoCapture(rtsp_url) + if not cap.isOpened(): + logger.error(f"Failed to open RTSP stream for camera {camera_id}") + continue + thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) + thread.daemon = True + thread.start() + streams[camera_id] = { + "cap": cap, + "buffer": buffer, + "thread": thread, + "rtsp_url": rtsp_url, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "mode": "rtsp" + } + logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + else: + logger.error(f"No valid URL provided for camera {camera_id}") + continue elif camera_id and camera_id in streams: - # If already subscribed, unsubscribe + # If already subscribed, unsubscribe first stream = streams.pop(camera_id) - stream["cap"].release() - logger.info(f"Unsubscribed from camera {camera_id}") + stream["stop_event"].set() + stream["thread"].join() + if "cap" in stream: + stream["cap"].release() + logger.info(f"Unsubscribed from camera {camera_id} for resubscription") with models_lock: if camera_id in models and modelId in models[camera_id]: del models[camera_id][modelId] @@ -472,7 +589,12 @@ async def detect(websocket: WebSocket): stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() - stream["cap"].release() + # Only release cap if it exists (RTSP mode) + if "cap" in stream: + stream["cap"].release() + logger.info(f"Released RTSP capture for camera {camera_id}") + else: + logger.info(f"Released snapshot reader for camera {camera_id}") logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models: @@ -532,7 +654,9 @@ async def detect(websocket: WebSocket): for camera_id, stream in streams.items(): stream["stop_event"].set() stream["thread"].join() - stream["cap"].release() + # Only release cap if it exists (RTSP mode) + if "cap" in stream: + stream["cap"].release() while not stream["buffer"].empty(): try: stream["buffer"].get_nowait() From 22370e2040ca895ae39ce6904a9e581cc86e4925 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 6 Jul 2025 21:27:17 +0700 Subject: [PATCH 07/23] add REST API endpoint for image retrieval; implement error handling and response formatting --- app.py | 42 +++++- app_single.py | 366 -------------------------------------------------- 2 files changed, 41 insertions(+), 367 deletions(-) delete mode 100644 app_single.py diff --git a/app.py b/app.py index fc492e5..5386496 100644 --- a/app.py +++ b/app.py @@ -14,8 +14,9 @@ import asyncio import psutil import zipfile from urllib.parse import urlparse -from fastapi import FastAPI, WebSocket +from fastapi import FastAPI, WebSocket, HTTPException from fastapi.websockets 
import WebSocketDisconnect +from fastapi.responses import Response from websockets.exceptions import ConnectionClosedError from ultralytics import YOLO @@ -121,6 +122,45 @@ def fetch_snapshot(url: str): logger.error(f"Exception fetching snapshot from {url}: {str(e)}") return None +#################################################### +# REST API endpoint for image retrieval +#################################################### +@app.get("/camera/{camera_id}/image") +async def get_camera_image(camera_id: str): + """ + Get the current frame from a camera as JPEG image + """ + try: + with streams_lock: + if camera_id not in streams: + raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") + + stream = streams[camera_id] + buffer = stream["buffer"] + + if buffer.empty(): + raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") + + # Get the latest frame (non-blocking) + try: + frame = buffer.queue[-1] # Get the most recent frame without removing it + except IndexError: + raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") + + # Encode frame as JPEG + success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) + if not success: + raise HTTPException(status_code=500, detail="Failed to encode image as JPEG") + + # Return image as binary response + return Response(content=buffer_img.tobytes(), media_type="image/jpeg") + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving image for camera {camera_id}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + #################################################### # Detection and frame processing functions #################################################### diff --git a/app_single.py b/app_single.py deleted file mode 100644 index f0c8266..0000000 --- a/app_single.py +++ /dev/null @@ -1,366 +0,0 @@ -from typing import List -from fastapi import FastAPI, WebSocket -from fastapi.websockets import WebSocketDisconnect -from websockets.exceptions import ConnectionClosedError -from ultralytics import YOLO -import torch -import cv2 -import base64 -import numpy as np -import json -import logging -import threading -import queue -import os -import requests -from urllib.parse import urlparse -import asyncio -import psutil - -app = FastAPI() - -models = {} - -with open("config.json", "r") as f: - config = json.load(f) - -poll_interval = config.get("poll_interval_ms", 100) -reconnect_interval = config.get("reconnect_interval_sec", 5) -TARGET_FPS = config.get("target_fps", 10) -poll_interval = 1000 / TARGET_FPS -logging.info(f"Poll interval: {poll_interval}ms") -max_streams = config.get("max_streams", 5) -max_retries = config.get("max_retries", 3) - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[ - logging.FileHandler("app.log"), - logging.StreamHandler() - ] -) - -# Ensure the models directory exists -os.makedirs("models", exist_ok=True) - -# Add constants for heartbeat -HEARTBEAT_INTERVAL = 2 # seconds -WORKER_TIMEOUT_MS = 10000 - -# Add a lock for thread-safe operations on shared resources -streams_lock = threading.Lock() -models_lock = threading.Lock() - -@app.websocket("/") -async def detect(websocket: WebSocket): - import asyncio - import time - - logging.info("WebSocket connection accepted") - - streams = {} - - # This function is user-modifiable - # Save data 
you want to persist across frames in the persistent_data dictionary - async def handle_detection(camera_id, stream, frame, websocket, model: YOLO, persistent_data): - try: - highest_conf_box = None - max_conf = -1 - - for r in model.track(frame, stream=False, persist=True): - for box in r.boxes: - box_cpu = box.cpu() - conf = float(box_cpu.conf[0]) - if conf > max_conf and hasattr(box, "id") and box.id is not None: - max_conf = conf - highest_conf_box = { - "class": model.names[int(box_cpu.cls[0])], - "confidence": conf, - "id": box.id.item(), - } - - # Broadcast to all subscribers of this URL - detection_data = { - "type": "imageDetection", - "cameraIdentifier": camera_id, - "timestamp": time.time(), - "data": { - "detections": highest_conf_box if highest_conf_box else None, - "modelId": stream['modelId'], - "modelName": stream['modelName'] - } - } - logging.debug(f"Sending detection data for camera {camera_id}: {detection_data}") - await websocket.send_json(detection_data) - return persistent_data - except Exception as e: - logging.error(f"Error in handle_detection for camera {camera_id}: {e}") - return persistent_data - - def frame_reader(camera_id, cap, buffer, stop_event): - import time - retries = 0 - try: - while not stop_event.is_set(): - try: - ret, frame = cap.read() - if not ret: - logging.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}") - cap.release() - time.sleep(reconnect_interval) - retries += 1 - if retries > max_retries and max_retries != -1: - logging.error(f"Max retries reached for camera: {camera_id}") - break - # Re-open the VideoCapture - cap = cv2.VideoCapture(streams[camera_id]['rtsp_url']) - if not cap.isOpened(): - logging.error(f"Failed to reopen RTSP stream for camera: {camera_id}") - continue - continue - retries = 0 # Reset on success - if not buffer.empty(): - try: - buffer.get_nowait() # Discard the old frame - except queue.Empty: - pass - buffer.put(frame) - except cv2.error as e: - logging.error(f"OpenCV error for camera {camera_id}: {e}") - cap.release() - time.sleep(reconnect_interval) - retries += 1 - if retries > max_retries and max_retries != -1: - logging.error(f"Max retries reached after OpenCV error for camera: {camera_id}") - break - # Re-open the VideoCapture - cap = cv2.VideoCapture(streams[camera_id]['rtsp_url']) - if not cap.isOpened(): - logging.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error") - continue - except Exception as e: - logging.error(f"Unexpected error for camera {camera_id}: {e}") - cap.release() - break - except Exception as e: - logging.error(f"Error in frame_reader thread for camera {camera_id}: {e}") - - async def process_streams(): - global models - logging.info("Started processing streams") - persistent_data_dict = {} - try: - while True: - start_time = time.time() - # Round-robin processing - with streams_lock: - current_streams = list(streams.items()) - for camera_id, stream in current_streams: - buffer = stream['buffer'] - if not buffer.empty(): - frame = buffer.get() - with models_lock: - model = models.get(camera_id, {}).get(stream['modelId']) - key = (camera_id, stream['modelId']) - persistent_data = persistent_data_dict.get(key, {}) - updated_persistent_data = await handle_detection(camera_id, stream, frame, websocket, model, persistent_data) - persistent_data_dict[key] = updated_persistent_data - elapsed_time = (time.time() - start_time) * 1000 # in ms - sleep_time = max(poll_interval - elapsed_time, 0) - logging.debug(f"Elapsed time: 
{elapsed_time}ms, sleeping for: {sleep_time}ms") - await asyncio.sleep(sleep_time / 1000.0) - except asyncio.CancelledError: - logging.info("Stream processing task cancelled") - except Exception as e: - logging.error(f"Error in process_streams: {e}") - - async def send_heartbeat(): - while True: - try: - cpu_usage = psutil.cpu_percent() - memory_usage = psutil.virtual_memory().percent - if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # Convert to MB - gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # Convert to MB - else: - gpu_usage = None - gpu_memory_usage = None - - camera_connections = [ - { - "cameraIdentifier": camera_id, - "modelId": stream['modelId'], - "modelName": stream['modelName'], - "online": True - } - for camera_id, stream in streams.items() - ] - - state_report = { - "type": "stateReport", - "cpuUsage": cpu_usage, - "memoryUsage": memory_usage, - "gpuUsage": gpu_usage, - "gpuMemoryUsage": gpu_memory_usage, - "cameraConnections": camera_connections - } - await websocket.send_text(json.dumps(state_report)) - logging.debug("Sent stateReport as heartbeat") - await asyncio.sleep(HEARTBEAT_INTERVAL) - except Exception as e: - logging.error(f"Error sending stateReport heartbeat: {e}") - break - - async def on_message(): - global models - while True: - try: - msg = await websocket.receive_text() - logging.debug(f"Received message: {msg}") - print(f"Received message: {msg}") - data = json.loads(msg) - msg_type = data.get("type") - - if msg_type == "subscribe": - payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") - rtsp_url = payload.get("rtspUrl") - model_url = payload.get("modelUrl") - modelId = payload.get("modelId") - modelName = payload.get("modelName") - - if model_url: - with models_lock: - if camera_id not in models: - models[camera_id] = {} - if modelId not in models[camera_id]: - print(f"Downloading model from {model_url}") - parsed_url = urlparse(model_url) - filename = os.path.basename(parsed_url.path) - model_filename = os.path.join("models", filename) - # Download the model - response = requests.get(model_url, stream=True) - if response.status_code == 200: - with open(model_filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - logging.info(f"Downloaded model from {model_url} to {model_filename}") - model = YOLO(model_filename) - if torch.cuda.is_available(): - model.to('cuda') - models[camera_id][modelId] = model - logging.info(f"Loaded model {modelId} for camera {camera_id}") - else: - logging.error(f"Failed to download model from {model_url}") - continue - if camera_id and rtsp_url: - with streams_lock: - if camera_id not in streams and len(streams) < max_streams: - cap = cv2.VideoCapture(rtsp_url) - if not cap.isOpened(): - logging.error(f"Failed to open RTSP stream for camera {camera_id}") - continue - buffer = queue.Queue(maxsize=1) - stop_event = threading.Event() - thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) - thread.daemon = True - thread.start() - streams[camera_id] = { - 'cap': cap, - 'buffer': buffer, - 'thread': thread, - 'rtsp_url': rtsp_url, - 'stop_event': stop_event, - 'modelId': modelId, - 'modelName': modelName - } - logging.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName} and URL {rtsp_url}") - elif camera_id and camera_id in streams: - stream = streams.pop(camera_id) - stream['cap'].release() - logging.info(f"Unsubscribed from camera 
{camera_id}") - if camera_id in models and modelId in models[camera_id]: - del models[camera_id][modelId] - if not models[camera_id]: - del models[camera_id] - elif msg_type == "unsubscribe": - payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") - logging.debug(f"Unsubscribing from camera {camera_id}") - with streams_lock: - if camera_id and camera_id in streams: - stream = streams.pop(camera_id) - stream['stop_event'].set() - stream['thread'].join() - stream['cap'].release() - logging.info(f"Unsubscribed from camera {camera_id}") - if camera_id in models and modelId in models[camera_id]: - del models[camera_id][modelId] - if not models[camera_id]: - del models[camera_id] - elif msg_type == "requestState": - # Handle state request - cpu_usage = psutil.cpu_percent() - memory_usage = psutil.virtual_memory().percent - if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # Convert to MB - gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # Convert to MB - else: - gpu_usage = None - gpu_memory_usage = None - - camera_connections = [ - { - "cameraIdentifier": camera_id, - "modelId": stream['modelId'], - "modelName": stream['modelName'], - "online": True - } - for camera_id, stream in streams.items() - ] - - state_report = { - "type": "stateReport", - "cpuUsage": cpu_usage, - "memoryUsage": memory_usage, - "gpuUsage": gpu_usage, - "gpuMemoryUsage": gpu_memory_usage, - "cameraConnections": camera_connections - } - await websocket.send_text(json.dumps(state_report)) - else: - logging.error(f"Unknown message type: {msg_type}") - except json.JSONDecodeError: - logging.error("Received invalid JSON message") - except (WebSocketDisconnect, ConnectionClosedError) as e: - logging.warning(f"WebSocket disconnected: {e}") - break - except Exception as e: - logging.error(f"Error handling message: {e}") - break - - try: - await websocket.accept() - task = asyncio.create_task(process_streams()) - heartbeat_task = asyncio.create_task(send_heartbeat()) - message_task = asyncio.create_task(on_message()) - - await asyncio.gather(heartbeat_task, message_task) - except Exception as e: - logging.error(f"Error in detect websocket: {e}") - finally: - task.cancel() - await task - with streams_lock: - for camera_id, stream in streams.items(): - stream['stop_event'].set() - stream['thread'].join() - stream['cap'].release() - stream['buffer'].queue.clear() - logging.info(f"Released camera {camera_id} and cleaned up resources") - streams.clear() - with models_lock: - models.clear() - logging.info("WebSocket connection closed") \ No newline at end of file From 5cf1bf08ccc016a988e4059bf1d97749b3912928 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 13 Jul 2025 19:39:17 +0700 Subject: [PATCH 08/23] add WebSocket communication protocol documentation for detector worker; outline connection, message types, and dynamic configuration --- worker.md | 362 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 362 insertions(+) create mode 100644 worker.md diff --git a/worker.md b/worker.md new file mode 100644 index 0000000..6101f2b --- /dev/null +++ b/worker.md @@ -0,0 +1,362 @@ +# Worker Communication Protocol + +This document outlines the WebSocket-based communication protocol between the CMS backend and a detector worker. As a worker developer, your primary responsibility is to implement a WebSocket server that adheres to this protocol. + +## 1. Connection + +The worker must run a WebSocket server, preferably on port `8000`. 
The backend system, which is managed by a container orchestration service, will automatically discover and establish a WebSocket connection to your worker. + +Upon a successful connection from the backend, you should begin sending `stateReport` messages as heartbeats. + +## 2. Communication Overview + +Communication is bidirectional and asynchronous. All messages are JSON objects with a `type` field that indicates the message's purpose, and an optional `payload` field containing the data. + +- **Worker -> Backend:** You will send messages to the backend to report status, forward detection events, or request changes to session data. +- **Backend -> Worker:** The backend will send commands to you to manage camera subscriptions. + +## 3. Dynamic Configuration via MPTA File + +To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task. + +**Your worker is responsible for:** +1. Fetching this file from the provided URL. +2. Extracting its contents. +3. Interpreting the contents to configure its internal pipeline. + +**The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain: + +* **AI/ML Models:** Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX. +* **Configuration Files:** A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds. +* **Scripts:** Custom Python scripts for pre-processing or post-processing. +* **API Integration Details:** A JSON file with endpoint information and credentials for interacting with third-party detection services. + +Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription. + +## 4. Messages from Worker to Backend + +These are the messages your worker is expected to send to the backend. + +### 4.1. State Report (Heartbeat) + +This message is crucial for the backend to monitor your worker's health and status, including GPU usage. + +- **Type:** `stateReport` +- **When to Send:** Periodically (e.g., every 2 seconds) after a connection is established. + +**Payload:** + +```json +{ + "type": "stateReport", + "cpuUsage": 75.5, + "memoryUsage": 40.2, + "gpuUsage": 60.0, + "gpuMemoryUsage": 25.1, + "cameraConnections": [ + { + "cameraIdentifier": "cam-001", + "modelId": 101, + "modelName": "General Object Detection", + "online": true + } + ] +} +``` + +### 4.2. Image Detection + +Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes. + +- **Type:** `imageDetection` + +**Payload Example:** + +```json +{ + "type": "imageDetection", + "cameraIdentifier": "cam-001", + "timestamp": "2025-07-14T12:34:56.789Z", + "data": { + "detection": { + "carModel": "Civic", + "carBrand": "Honda", + "carYear": 2023, + "bodyType": "Sedan", + "licensePlateText": "ABCD1234", + "licensePlateConfidence": 0.95 + }, + "modelId": 101, + "modelName": "US-LPR-and-Vehicle-ID" + } +} +``` + +### 4.3. Patch Session + +> **Note:** Patch messages are only used when the worker can't keep up and needs to retroactively send detections. Normally, detections should be sent in real-time using `imageDetection` messages. 
Use `patchSession` only to update session data after the fact. + +Allows the worker to request a modification to an active session's data. The `data` payload must be a partial object of the `DisplayPersistentData` structure. + +- **Type:** `patchSession` + +**Payload Example:** + +```json +{ + "type": "patchSession", + "sessionId": 12345, + "data": { + "currentCar": { + "carModel": "Civic", + "carBrand": "Honda", + "licensePlateText": "ABCD1234" + } + } +} +``` + +The backend will respond with a `patchSessionResult` command. + +#### `DisplayPersistentData` Structure + +The `data` object in the `patchSession` message is merged with the existing `DisplayPersistentData` on the backend. Here is its structure: + +```typescript +interface DisplayPersistentData { + progressionStage: "welcome" | "car_fueling" | "car_waitpayment" | "car_postpayment" | null; + qrCode: string | null; + adsPlayback: { + playlistSlotOrder: number; // The 'order' of the current slot + adsId: number | null; + adsUrl: string | null; + } | null; + currentCar: { + carModel?: string; + carBrand?: string; + carYear?: number; + bodyType?: string; + licensePlateText?: string; + licensePlateType?: string; + } | null; + fuelPump: { /* FuelPumpData structure */ } | null; + weatherData: { /* WeatherResponse structure */ } | null; + sessionId: number | null; +} +``` + +#### Patching Behavior + +- The patch is a **deep merge**. +- **`undefined`** values are ignored. +- **`null`** values will set the corresponding field to `null`. +- Nested objects are merged recursively. + +## 5. Commands from Backend to Worker + +These are the commands your worker will receive from the backend. + +### 5.1. Subscribe to Camera + +Instructs the worker to process a camera's RTSP stream using the configuration from the specified `.mpta` file. + +- **Type:** `subscribe` + +**Payload:** + +```json +{ + "type": "subscribe", + "payload": { + "rtspUrl": "rtsp://user:pass@host:port/stream", + "cameraIdentifier": "cam-002", + "snapshotUrl": "http://go2rtc/snapshot/1", + "snapshotInterval": 5000, + "modelUrl": "http://storage/models/us-lpr.mpta", + "modelName": "US-LPR-and-Vehicle-ID", + "modelId": 102 + } +} +``` + +### 5.2. Unsubscribe from Camera + +Instructs the worker to stop processing a camera's stream. + +- **Type:** `unsubscribe` + +**Payload:** + +```json +{ + "type": "unsubscribe", + "payload": { + "cameraIdentifier": "cam-002" + } +} +``` + +### 5.3. Request State + +Direct request for the worker's current state. Respond with a `stateReport` message. + +- **Type:** `requestState` + +**Payload:** + +```json +{ + "type": "requestState" +} +``` + +### 5.4. Patch Session Result + +Backend's response to a `patchSession` message. + +- **Type:** `patchSessionResult` + +**Payload:** + +```json +{ + "type": "patchSessionResult", + "payload": { + "sessionId": 12345, + "success": true, + "message": "Session updated successfully." + } +} +``` + +## 6. Example Communication Log + +This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up. + +> **Note:** Unsubscribe is triggered when a user removes a camera or when the node is too heavily loaded and needs rebalancing. + +1. **Connection Established** & **Heartbeat** + * **Worker -> Backend** + ```json + { + "type": "stateReport", + "cpuUsage": 70.2, + "memoryUsage": 38.1, + "gpuUsage": 55.0, + "gpuMemoryUsage": 20.0, + "cameraConnections": [] + } + ``` +2. 
**Backend Subscribes Camera** + * **Backend -> Worker** + ```json + { + "type": "subscribe", + "payload": { + "rtspUrl": "rtsp://192.168.1.100/stream1", + "cameraIdentifier": "entry-cam-01", + "modelUrl": "http://storage/models/vehicle-id.mpta", + "modelName": "Vehicle Identification", + "modelId": 201 + } + } + ``` +3. **Worker Acknowledges in Heartbeat** + * **Worker -> Backend** + ```json + { + "type": "stateReport", + "cpuUsage": 72.5, + "memoryUsage": 39.0, + "gpuUsage": 57.0, + "gpuMemoryUsage": 21.0, + "cameraConnections": [ + { + "cameraIdentifier": "entry-cam-01", + "modelId": 201, + "modelName": "Vehicle Identification", + "online": true + } + ] + } + ``` +4. **Worker Detects a Car** + * **Worker -> Backend** + ```json + { + "type": "imageDetection", + "cameraIdentifier": "entry-cam-01", + "timestamp": "2025-07-15T10:00:00.000Z", + "data": { + "detection": { + "carBrand": "Honda", + "carModel": "CR-V", + "bodyType": "SUV", + "licensePlateText": "GEMINI-AI", + "licensePlateConfidence": 0.98 + }, + "modelId": 201, + "modelName": "Vehicle Identification" + } + } + ``` + * **Worker -> Backend** + ```json + { + "type": "imageDetection", + "cameraIdentifier": "entry-cam-01", + "timestamp": "2025-07-15T10:00:01.000Z", + "data": { + "detection": { + "carBrand": "Toyota", + "carModel": "Corolla", + "bodyType": "Sedan", + "licensePlateText": "CMS-1234", + "licensePlateConfidence": 0.97 + }, + "modelId": 201, + "modelName": "Vehicle Identification" + } + } + ``` + * **Worker -> Backend** + ```json + { + "type": "imageDetection", + "cameraIdentifier": "entry-cam-01", + "timestamp": "2025-07-15T10:00:02.000Z", + "data": { + "detection": { + "carBrand": "Ford", + "carModel": "Focus", + "bodyType": "Hatchback", + "licensePlateText": "CMS-5678", + "licensePlateConfidence": 0.96 + }, + "modelId": 201, + "modelName": "Vehicle Identification" + } + } + ``` +5. **Backend Unsubscribes Camera** + * **Backend -> Worker** + ```json + { + "type": "unsubscribe", + "payload": { + "cameraIdentifier": "entry-cam-01" + } + } + ``` +6. **Worker Acknowledges Unsubscription** + * **Worker -> Backend** + ```json + { + "type": "stateReport", + "cpuUsage": 68.0, + "memoryUsage": 37.0, + "gpuUsage": 50.0, + "gpuMemoryUsage": 18.0, + "cameraConnections": [] + } + ``` From 162f29ec215e4da7e5c6308c4cbc75f1f7dba540 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 13 Jul 2025 19:43:46 +0700 Subject: [PATCH 09/23] remove license plate confidence from detection messages for simplified reporting --- worker.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/worker.md b/worker.md index 6101f2b..8d99e51 100644 --- a/worker.md +++ b/worker.md @@ -83,8 +83,7 @@ Sent when the worker detects a relevant object. 
The `detection` object should be "carBrand": "Honda", "carYear": 2023, "bodyType": "Sedan", - "licensePlateText": "ABCD1234", - "licensePlateConfidence": 0.95 + "licensePlateText": "ABCD1234" }, "modelId": 101, "modelName": "US-LPR-and-Vehicle-ID" @@ -292,8 +291,7 @@ This section shows a typical sequence of messages between the backend and the wo "carBrand": "Honda", "carModel": "CR-V", "bodyType": "SUV", - "licensePlateText": "GEMINI-AI", - "licensePlateConfidence": 0.98 + "licensePlateText": "ABCD1234" }, "modelId": 201, "modelName": "Vehicle Identification" @@ -312,7 +310,6 @@ This section shows a typical sequence of messages between the backend and the wo "carModel": "Corolla", "bodyType": "Sedan", "licensePlateText": "CMS-1234", - "licensePlateConfidence": 0.97 }, "modelId": 201, "modelName": "Vehicle Identification" @@ -331,7 +328,6 @@ This section shows a typical sequence of messages between the backend and the wo "carModel": "Focus", "bodyType": "Hatchback", "licensePlateText": "CMS-5678", - "licensePlateConfidence": 0.96 }, "modelId": 201, "modelName": "Vehicle Identification" From 1ff6108d087451b7687c8737ce4970b35e5b5996 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 13 Jul 2025 23:58:01 +0700 Subject: [PATCH 10/23] update worker communication protocol to use subscription identifiers; add crop coordinates for camera streams and clarify handling of multiple subscriptions --- worker.md | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/worker.md b/worker.md index 8d99e51..c7ad37e 100644 --- a/worker.md +++ b/worker.md @@ -55,7 +55,7 @@ This message is crucial for the backend to monitor your worker's health and stat "gpuMemoryUsage": 25.1, "cameraConnections": [ { - "cameraIdentifier": "cam-001", + "subscriptionIdentifier": "display-001;cam-001", "modelId": 101, "modelName": "General Object Detection", "online": true @@ -75,7 +75,7 @@ Sent when the worker detects a relevant object. The `detection` object should be ```json { "type": "imageDetection", - "cameraIdentifier": "cam-001", + "subscriptionIdentifier": "display-001;cam-001", "timestamp": "2025-07-14T12:34:56.789Z", "data": { "detection": { @@ -83,7 +83,8 @@ Sent when the worker detects a relevant object. The `detection` object should be "carBrand": "Honda", "carYear": 2023, "bodyType": "Sedan", - "licensePlateText": "ABCD1234" + "licensePlateText": "ABCD1234", + "licensePlateConfidence": 0.95 }, "modelId": 101, "modelName": "US-LPR-and-Vehicle-ID" @@ -167,17 +168,34 @@ Instructs the worker to process a camera's RTSP stream using the configuration f { "type": "subscribe", "payload": { + "subscriptionIdentifier": "display-001;cam-002", "rtspUrl": "rtsp://user:pass@host:port/stream", - "cameraIdentifier": "cam-002", "snapshotUrl": "http://go2rtc/snapshot/1", "snapshotInterval": 5000, "modelUrl": "http://storage/models/us-lpr.mpta", "modelName": "US-LPR-and-Vehicle-ID", - "modelId": 102 + "modelId": 102, + "cropX": 100, + "cropY": 200 } } ``` +> **Note:** +> - `cropX` and `cropY` (optional, integer) specify the initial crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame. +> + +> **Important:** +> If multiple displays are bound to the same camera, your worker must ensure that only **one stream** is opened per camera. 
When you receive multiple subscriptions for the same camera (with different `subscriptionIdentifier` values), you should: +> +> - Open the RTSP stream **once** for that camera if using RTSP. +> - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera. +> - Capture each frame/image only once per cycle. +> - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed. +> +> This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera. + + ### 5.2. Unsubscribe from Camera Instructs the worker to stop processing a camera's stream. @@ -190,7 +208,7 @@ Instructs the worker to stop processing a camera's stream. { "type": "unsubscribe", "payload": { - "cameraIdentifier": "cam-002" + "subscriptionIdentifier": "display-001;cam-002" } } ``` @@ -252,8 +270,8 @@ This section shows a typical sequence of messages between the backend and the wo { "type": "subscribe", "payload": { + "subscriptionIdentifier": "display-001;entry-cam-01", "rtspUrl": "rtsp://192.168.1.100/stream1", - "cameraIdentifier": "entry-cam-01", "modelUrl": "http://storage/models/vehicle-id.mpta", "modelName": "Vehicle Identification", "modelId": 201 @@ -271,7 +289,7 @@ This section shows a typical sequence of messages between the backend and the wo "gpuMemoryUsage": 21.0, "cameraConnections": [ { - "cameraIdentifier": "entry-cam-01", + "subscriptionIdentifier": "display-001;entry-cam-01", "modelId": 201, "modelName": "Vehicle Identification", "online": true @@ -284,14 +302,15 @@ This section shows a typical sequence of messages between the backend and the wo ```json { "type": "imageDetection", - "cameraIdentifier": "entry-cam-01", + "subscriptionIdentifier": "display-001;entry-cam-01", "timestamp": "2025-07-15T10:00:00.000Z", "data": { "detection": { "carBrand": "Honda", "carModel": "CR-V", "bodyType": "SUV", - "licensePlateText": "ABCD1234" + "licensePlateText": "GEMINI-AI", + "licensePlateConfidence": 0.98 }, "modelId": 201, "modelName": "Vehicle Identification" @@ -302,7 +321,7 @@ This section shows a typical sequence of messages between the backend and the wo ```json { "type": "imageDetection", - "cameraIdentifier": "entry-cam-01", + "subscriptionIdentifier": "display-001;entry-cam-01", "timestamp": "2025-07-15T10:00:01.000Z", "data": { "detection": { @@ -310,6 +329,7 @@ This section shows a typical sequence of messages between the backend and the wo "carModel": "Corolla", "bodyType": "Sedan", "licensePlateText": "CMS-1234", + "licensePlateConfidence": 0.97 }, "modelId": 201, "modelName": "Vehicle Identification" @@ -320,7 +340,7 @@ This section shows a typical sequence of messages between the backend and the wo ```json { "type": "imageDetection", - "cameraIdentifier": "entry-cam-01", + "subscriptionIdentifier": "display-001;entry-cam-01", "timestamp": "2025-07-15T10:00:02.000Z", "data": { "detection": { @@ -328,6 +348,7 @@ This section shows a typical sequence of messages between the backend and the wo "carModel": "Focus", "bodyType": "Hatchback", "licensePlateText": "CMS-5678", + "licensePlateConfidence": 0.96 }, "modelId": 201, "modelName": "Vehicle Identification" @@ -340,7 +361,7 @@ This section shows a typical sequence of messages between the backend and the wo { "type": "unsubscribe", "payload": { - "cameraIdentifier": "entry-cam-01" + "subscriptionIdentifier": 
"display-001;entry-cam-01" } } ``` From 8e14897a696e1fc33a181a28a7f00464e95c7bca Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Sun, 13 Jul 2025 23:59:51 +0700 Subject: [PATCH 11/23] add crop coordinates to state report messages for camera connections --- worker.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/worker.md b/worker.md index c7ad37e..ee3eb67 100644 --- a/worker.md +++ b/worker.md @@ -58,12 +58,17 @@ This message is crucial for the backend to monitor your worker's health and stat "subscriptionIdentifier": "display-001;cam-001", "modelId": 101, "modelName": "General Object Detection", - "online": true + "online": true, + "cropX": 100, + "cropY": 200 } ] } ``` +> **Note:** +> - `cropX` and `cropY` (optional, integer) should be included in each camera connection to indicate the initial crop coordinates for that subscription. + ### 4.2. Image Detection Sent when the worker detects a relevant object. The `detection` object should be flat and contain key-value pairs corresponding to the detected attributes. From 39d49ba61721854e1437b4aa600feeaee945e2dd Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 01:01:01 +0700 Subject: [PATCH 12/23] update crop coordinate fields in worker communication protocol to support rectangular cropping --- worker.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/worker.md b/worker.md index ee3eb67..f8bc0ed 100644 --- a/worker.md +++ b/worker.md @@ -59,15 +59,17 @@ This message is crucial for the backend to monitor your worker's health and stat "modelId": 101, "modelName": "General Object Detection", "online": true, - "cropX": 100, - "cropY": 200 + "cropX1": 100, + "cropY1": 200, + "cropX2": 300, + "cropY2": 400 } ] } ``` > **Note:** -> - `cropX` and `cropY` (optional, integer) should be included in each camera connection to indicate the initial crop coordinates for that subscription. +> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription. ### 4.2. Image Detection @@ -180,14 +182,16 @@ Instructs the worker to process a camera's RTSP stream using the configuration f "modelUrl": "http://storage/models/us-lpr.mpta", "modelName": "US-LPR-and-Vehicle-ID", "modelId": 102, - "cropX": 100, - "cropY": 200 + "cropX1": 100, + "cropY1": 200, + "cropX2": 300, + "cropY2": 400 } } ``` > **Note:** -> - `cropX` and `cropY` (optional, integer) specify the initial crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame. +> - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) specify the crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame. 
> > **Important:** From 3c67fa933c55fda3b6afe4081fb1ed0663d029f1 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 01:46:22 +0700 Subject: [PATCH 13/23] add crop coordinates handling in camera stream management; update logging and refactor subscription identifiers --- app.py | 105 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 60 insertions(+), 45 deletions(-) diff --git a/app.py b/app.py index 5386496..4e9be15 100644 --- a/app.py +++ b/app.py @@ -122,6 +122,15 @@ def fetch_snapshot(url: str): logger.error(f"Exception fetching snapshot from {url}: {str(e)}") return None +# Helper to get crop coordinates from stream +def get_crop_coords(stream): + return { + "cropX1": stream.get("cropX1"), + "cropY1": stream.get("cropY1"), + "cropX2": stream.get("cropX2"), + "cropY2": stream.get("cropY2") + } + #################################################### # REST API endpoint for image retrieval #################################################### @@ -133,20 +142,24 @@ async def get_camera_image(camera_id: str): try: with streams_lock: if camera_id not in streams: + logger.warning(f"Camera ID '{camera_id}' not found in streams. Current streams: {list(streams.keys())}") raise HTTPException(status_code=404, detail=f"Camera {camera_id} not found or not active") stream = streams[camera_id] buffer = stream["buffer"] + logger.debug(f"Camera '{camera_id}' buffer size: {buffer.qsize()}, buffer empty: {buffer.empty()}") + logger.debug(f"Buffer queue contents: {getattr(buffer, 'queue', None)}") if buffer.empty(): + logger.warning(f"No frame available for camera '{camera_id}'. Buffer is empty.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") # Get the latest frame (non-blocking) try: frame = buffer.queue[-1] # Get the most recent frame without removing it except IndexError: + logger.warning(f"Buffer queue is empty for camera '{camera_id}' when trying to access last frame.") raise HTTPException(status_code=404, detail=f"No frame available for camera {camera_id}") - # Encode frame as JPEG success, buffer_img = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) if not success: @@ -224,8 +237,8 @@ async def detect(websocket: WebSocket): detection_data = { "type": "imageDetection", - "cameraIdentifier": camera_id, - "timestamp": time.time(), + "subscriptionIdentifier": stream["subscriptionIdentifier"], + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime()), "data": { "detection": highest_confidence_detection, # Send only the highest confidence detection "modelId": stream["modelId"], @@ -304,12 +317,11 @@ async def detect(websocket: WebSocket): if not buffer.empty(): try: buffer.get_nowait() - logger.debug(f"Removed old frame from buffer for camera {camera_id}") + logger.debug(f"[frame_reader] Removed old frame from buffer for camera {camera_id}") except queue.Empty: pass - buffer.put(frame) - logger.debug(f"Added new frame to buffer for camera {camera_id}") + logger.debug(f"[frame_reader] Added new frame to buffer for camera {camera_id}. 
Buffer size: {buffer.qsize()}") # Short sleep to avoid CPU overuse time.sleep(0.01) @@ -380,12 +392,11 @@ async def detect(websocket: WebSocket): if not buffer.empty(): try: buffer.get_nowait() - logger.debug(f"Removed old snapshot from buffer for camera {camera_id}") + logger.debug(f"[snapshot_reader] Removed old snapshot from buffer for camera {camera_id}") except queue.Empty: pass - buffer.put(frame) - logger.debug(f"Added new snapshot to buffer for camera {camera_id}") + logger.debug(f"[snapshot_reader] Added new snapshot to buffer for camera {camera_id}. Buffer size: {buffer.qsize()}") # Wait for the specified interval elapsed = time.time() - start_time @@ -456,18 +467,19 @@ async def detect(websocket: WebSocket): cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) # MB - gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) # MB + gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None + gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None gpu_memory_usage = None camera_connections = [ { - "cameraIdentifier": camera_id, + "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], - "online": True + "online": True, + **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] @@ -497,13 +509,19 @@ async def detect(websocket: WebSocket): if msg_type == "subscribe": payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") + subscriptionIdentifier = payload.get("subscriptionIdentifier") rtsp_url = payload.get("rtspUrl") snapshot_url = payload.get("snapshotUrl") - snapshot_interval = payload.get("snapshotInterval") # in milliseconds - model_url = payload.get("modelUrl") # may be remote or local + snapshot_interval = payload.get("snapshotInterval") + model_url = payload.get("modelUrl") modelId = payload.get("modelId") modelName = payload.get("modelName") + cropX1 = payload.get("cropX1") + cropY1 = payload.get("cropY1") + cropX2 = payload.get("cropX2") + cropY2 = payload.get("cropY2") + + camera_id = subscriptionIdentifier # Use subscriptionIdentifier as camera_id for mapping if model_url: with models_lock: @@ -566,24 +584,30 @@ async def detect(websocket: WebSocket): if camera_id not in streams and len(streams) < max_streams: buffer = queue.Queue(maxsize=1) stop_event = threading.Event() - - # Choose between snapshot and RTSP based on availability + stream_info = { + "buffer": buffer, + "thread": None, + "stop_event": stop_event, + "modelId": modelId, + "modelName": modelName, + "subscriptionIdentifier": subscriptionIdentifier, + "cropX1": cropX1, + "cropY1": cropY1, + "cropX2": cropX2, + "cropY2": cropY2 + } if snapshot_url and snapshot_interval: logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) thread.daemon = True thread.start() - streams[camera_id] = { - "buffer": buffer, - "thread": thread, + stream_info.update({ "snapshot_url": snapshot_url, "snapshot_interval": snapshot_interval, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName, "mode": "snapshot" - } - logger.info(f"Subscribed to camera {camera_id} (snapshot mode) with modelId {modelId}, modelName {modelName}, URL {snapshot_url}, interval 
{snapshot_interval}ms") + }) + stream_info["thread"] = thread + streams[camera_id] = stream_info elif rtsp_url: logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") cap = cv2.VideoCapture(rtsp_url) @@ -593,17 +617,13 @@ async def detect(websocket: WebSocket): thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) thread.daemon = True thread.start() - streams[camera_id] = { + stream_info.update({ "cap": cap, - "buffer": buffer, - "thread": thread, "rtsp_url": rtsp_url, - "stop_event": stop_event, - "modelId": modelId, - "modelName": modelName, "mode": "rtsp" - } - logger.info(f"Subscribed to camera {camera_id} (RTSP mode) with modelId {modelId}, modelName {modelName}, URL {rtsp_url}") + }) + stream_info["thread"] = thread + streams[camera_id] = stream_info else: logger.error(f"No valid URL provided for camera {camera_id}") continue @@ -622,20 +642,15 @@ async def detect(websocket: WebSocket): del models[camera_id] elif msg_type == "unsubscribe": payload = data.get("payload", {}) - camera_id = payload.get("cameraIdentifier") - logger.debug(f"Unsubscribing from camera {camera_id}") + subscriptionIdentifier = payload.get("subscriptionIdentifier") + camera_id = subscriptionIdentifier with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) stream["stop_event"].set() stream["thread"].join() - # Only release cap if it exists (RTSP mode) if "cap" in stream: stream["cap"].release() - logger.info(f"Released RTSP capture for camera {camera_id}") - else: - logger.info(f"Released snapshot reader for camera {camera_id}") - logger.info(f"Unsubscribed from camera {camera_id}") with models_lock: if camera_id in models: del models[camera_id] @@ -643,7 +658,7 @@ async def detect(websocket: WebSocket): cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent if torch.cuda.is_available(): - gpu_usage = torch.cuda.memory_allocated() / (1024 ** 2) + gpu_usage = torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else None gpu_memory_usage = torch.cuda.memory_reserved() / (1024 ** 2) else: gpu_usage = None @@ -651,10 +666,11 @@ async def detect(websocket: WebSocket): camera_connections = [ { - "cameraIdentifier": camera_id, + "subscriptionIdentifier": stream["subscriptionIdentifier"], "modelId": stream["modelId"], "modelName": stream["modelName"], - "online": True + "online": True, + **{k: v for k, v in get_crop_coords(stream).items() if v is not None} } for camera_id, stream in streams.items() ] @@ -678,7 +694,6 @@ async def detect(websocket: WebSocket): except Exception as e: logger.error(f"Error handling message: {e}") break - try: await websocket.accept() stream_task = asyncio.create_task(process_streams()) From 8f32de15107a9c8048178f0397ac763db492dab1 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 10:49:59 +0700 Subject: [PATCH 14/23] add session ID handling to worker communication protocol; allow backend to associate session IDs with subscriptions --- worker.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/worker.md b/worker.md index f8bc0ed..88ddaf0 100644 --- a/worker.md +++ b/worker.md @@ -255,6 +255,27 @@ Backend's response to a `patchSession` message. } ``` +### 5.5. Set Session ID + +Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. 
+ +- **Type:** `setSessionId` + +**Payload:** + +```json +{ + "type": "setSessionId", + "payload": { + "subscriptionIdentifier": "display-001;cam-002", + "sessionId": 12345 + } +} +``` + +> **Note:** +> - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. + ## 6. Example Communication Log This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up. From 3edcd286fd7f8307f59e2930601ae09684497a43 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 10:57:06 +0700 Subject: [PATCH 15/23] update session ID handling in worker communication protocol; allow null session ID to indicate no active session --- worker.md | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/worker.md b/worker.md index 88ddaf0..f874ff2 100644 --- a/worker.md +++ b/worker.md @@ -257,7 +257,7 @@ Backend's response to a `patchSession` message. ### 5.5. Set Session ID -Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. +Allows the backend to instruct the worker to associate a session ID with a subscription. This is useful for linking detection events to a specific session. The session ID can be `null` to indicate no active session. - **Type:** `setSessionId` @@ -273,8 +273,20 @@ Allows the backend to instruct the worker to associate a session ID with a subsc } ``` +Or to clear the session: + +```json +{ + "type": "setSessionId", + "payload": { + "subscriptionIdentifier": "display-001;cam-002", + "sessionId": null + } +} +``` + > **Note:** -> - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. +> - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If `sessionId` is `null`, the worker should treat the subscription as having no active session. ## 6. Example Communication Log From 700d3b3efef8cc7248a073af4ae100993073707b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 11:02:05 +0700 Subject: [PATCH 16/23] add subscription identifier format and session ID association details to worker communication protocol --- worker.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/worker.md b/worker.md index f874ff2..67acec5 100644 --- a/worker.md +++ b/worker.md @@ -288,6 +288,24 @@ Or to clear the session: > **Note:** > - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If `sessionId` is `null`, the worker should treat the subscription as having no active session. +## Subscription Identifier Format + +The `subscriptionIdentifier` used in all messages is constructed as: + +``` +displayIdentifier;cameraIdentifier +``` + +This uniquely identifies a camera subscription for a specific display. + +### Session ID Association + +When the backend sends a `setSessionId` command, it will only provide the `displayIdentifier` (not the full `subscriptionIdentifier`). + +**Worker Responsibility:** +- The worker must match the `displayIdentifier` to all active subscriptions for that display (i.e., all `subscriptionIdentifier` values that start with `displayIdentifier;`). 
+- The worker should set or clear the session ID for all matching subscriptions. + ## 6. Example Communication Log This section shows a typical sequence of messages between the backend and the worker. Patch messages are not included, as they are only used when the worker cannot keep up. From 112ca9325d7f7fa80fc3ac20e218468abc91209b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 11:05:17 +0700 Subject: [PATCH 17/23] refactor session ID handling in worker communication protocol; replace subscriptionIdentifier with displayIdentifier --- worker.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.md b/worker.md index 67acec5..8bf3349 100644 --- a/worker.md +++ b/worker.md @@ -267,7 +267,7 @@ Allows the backend to instruct the worker to associate a session ID with a subsc { "type": "setSessionId", "payload": { - "subscriptionIdentifier": "display-001;cam-002", + "displayIdentifier": "display-001", "sessionId": 12345 } } @@ -279,7 +279,7 @@ Or to clear the session: { "type": "setSessionId", "payload": { - "subscriptionIdentifier": "display-001;cam-002", + "displayIdentifier": "display-001", "sessionId": null } } From c7bb46e1e3b6d7992ae45578928227fe3bd8cf4b Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Mon, 14 Jul 2025 11:19:11 +0700 Subject: [PATCH 18/23] refactor documentation for worker communication protocol; improve formatting and clarify crop coordinates and session ID handling --- worker.md | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/worker.md b/worker.md index 8bf3349..00a13cf 100644 --- a/worker.md +++ b/worker.md @@ -20,16 +20,17 @@ Communication is bidirectional and asynchronous. All messages are JSON objects w To enable modularity and dynamic configuration, the backend will send you a URL to a `.mpta` file when it issues a `subscribe` command. This file is a renamed `.zip` archive that contains everything your worker needs to perform its task. **Your worker is responsible for:** -1. Fetching this file from the provided URL. -2. Extracting its contents. -3. Interpreting the contents to configure its internal pipeline. + +1. Fetching this file from the provided URL. +2. Extracting its contents. +3. Interpreting the contents to configure its internal pipeline. **The contents of the `.mpta` file are entirely up to the user who configures the model in the CMS.** This allows for maximum flexibility. For example, the archive could contain: -* **AI/ML Models:** Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX. -* **Configuration Files:** A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds. -* **Scripts:** Custom Python scripts for pre-processing or post-processing. -* **API Integration Details:** A JSON file with endpoint information and credentials for interacting with third-party detection services. +- AI/ML Models: Pre-trained models for libraries like TensorFlow, PyTorch, or ONNX. +- Configuration Files: A `config.json` or `pipeline.yaml` that defines a sequence of operations, specifies model paths, or sets detection thresholds. +- Scripts: Custom Python scripts for pre-processing or post-processing. +- API Integration Details: A JSON file with endpoint information and credentials for interacting with third-party detection services. Essentially, the `.mpta` file is a self-contained package that tells your worker *how* to process the video stream for a given subscription. 
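A sketch of the fetch-and-extract step, assuming only `requests` and the Python standard library; interpreting whatever the archive contains (models, configs, scripts) is left to the worker's pipeline loader:

```python
# Sketch: fetch an .mpta archive (a renamed .zip) and unpack it, assuming
# requests plus the standard library. Interpreting the extracted contents
# is up to the worker's pipeline loader.
import os
import zipfile
from urllib.parse import urlparse

import requests


def fetch_mpta(url: str, extraction_dir: str) -> str:
    """Download an .mpta archive and extract it into extraction_dir."""
    os.makedirs(extraction_dir, exist_ok=True)
    filename = os.path.basename(urlparse(url).path) or "model.mpta"
    local_path = os.path.join(extraction_dir, filename)
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        with open(local_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)
    with zipfile.ZipFile(local_path) as zf:  # .mpta is a renamed .zip
        zf.extractall(extraction_dir)
    return extraction_dir
```
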
@@ -69,6 +70,7 @@ This message is crucial for the backend to monitor your worker's health and stat ``` > **Note:** +> > - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) should be included in each camera connection to indicate the crop coordinates for that subscription. ### 4.2. Image Detection @@ -191,9 +193,9 @@ Instructs the worker to process a camera's RTSP stream using the configuration f ``` > **Note:** +> > - `cropX1`, `cropY1`, `cropX2`, `cropY2` (optional, integer) specify the crop coordinates for the camera stream. These values are configured per display and passed in the subscription payload. If not provided, the worker should process the full frame. > - > **Important:** > If multiple displays are bound to the same camera, your worker must ensure that only **one stream** is opened per camera. When you receive multiple subscriptions for the same camera (with different `subscriptionIdentifier` values), you should: > @@ -201,10 +203,8 @@ Instructs the worker to process a camera's RTSP stream using the configuration f > - Capture each snapshot only once per cycle, and reuse it for all display subscriptions sharing that camera. > - Capture each frame/image only once per cycle. > - Reuse the same captured image and snapshot for all display subscriptions that share the camera, processing and routing detection results separately for each display as needed. -> > This avoids unnecessary load and bandwidth usage, and ensures consistent detection results and snapshots across all displays sharing the same camera. - ### 5.2. Unsubscribe from Camera Instructs the worker to stop processing a camera's stream. @@ -286,6 +286,7 @@ Or to clear the session: ``` > **Note:** +> > - The worker should store the session ID for the given subscription and use it in subsequent detection or patch messages as appropriate. If `sessionId` is `null`, the worker should treat the subscription as having no active session. ## Subscription Identifier Format @@ -303,6 +304,7 @@ This uniquely identifies a camera subscription for a specific display. When the backend sends a `setSessionId` command, it will only provide the `displayIdentifier` (not the full `subscriptionIdentifier`). **Worker Responsibility:** + - The worker must match the `displayIdentifier` to all active subscriptions for that display (i.e., all `subscriptionIdentifier` values that start with `displayIdentifier;`). - The worker should set or clear the session ID for all matching subscriptions. 
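One way to satisfy both rules is to keep a single entry per display and resolve it per subscription at detection time, mirroring the `session_ids` mapping the worker code in this series adopts; the helper names here are illustrative:

```python
# Sketch: store one session ID per display and resolve it per subscription.
# Mirrors the session_ids mapping used by the worker later in this series;
# helper names are illustrative.
from typing import Dict, Optional

session_ids: Dict[str, int] = {}  # displayIdentifier -> sessionId


def handle_set_session_id(payload: dict) -> None:
    display_id = payload["displayIdentifier"]
    session_id = payload.get("sessionId")
    if session_id is None:
        session_ids.pop(display_id, None)   # null clears the active session
    else:
        session_ids[display_id] = session_id


def session_for(subscription_identifier: str) -> Optional[int]:
    # "displayIdentifier;cameraIdentifier": look up by the display part,
    # which covers every subscription belonging to that display.
    display_id = subscription_identifier.split(";", 1)[0]
    return session_ids.get(display_id)
```
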
From 428f7a96717134c60249ebf91add394e8aa340b8 Mon Sep 17 00:00:00 2001 From: pixchy-commits <6341430@schoolptk.ac.th> Date: Mon, 14 Jul 2025 23:40:19 +0700 Subject: [PATCH 19/23] feat: enhance session management in worker communication protocol; implement session ID handling and crop frame processing --- app.py | 291 ++++++++++++++++++++++++++++++++++------------- test_protocol.py | 125 ++++++++++++++++++++ 2 files changed, 334 insertions(+), 82 deletions(-) create mode 100644 test_protocol.py diff --git a/app.py b/app.py index 4e9be15..60beb27 100644 --- a/app.py +++ b/app.py @@ -29,6 +29,12 @@ app = FastAPI() # "models" now holds a nested dict: { camera_id: { modelId: model_tree } } models: Dict[str, Dict[str, Any]] = {} streams: Dict[str, Dict[str, Any]] = {} +# Store session IDs per display +session_ids: Dict[str, int] = {} +# Track shared camera streams by camera URL +camera_streams: Dict[str, Dict[str, Any]] = {} +# Map subscriptions to their camera URL +subscription_to_camera: Dict[str, str] = {} with open("config.json", "r") as f: config = json.load(f) @@ -184,9 +190,16 @@ async def detect(websocket: WebSocket): async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data): try: + # Apply crop if specified + cropped_frame = frame + if all(coord is not None for coord in [stream.get("cropX1"), stream.get("cropY1"), stream.get("cropX2"), stream.get("cropY2")]): + cropX1, cropY1, cropX2, cropY2 = stream["cropX1"], stream["cropY1"], stream["cropX2"], stream["cropY2"] + cropped_frame = frame[cropY1:cropY2, cropX1:cropX2] + logger.debug(f"Applied crop coordinates ({cropX1}, {cropY1}, {cropX2}, {cropY2}) to frame for camera {camera_id}") + logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") start_time = time.time() - detection_result = run_pipeline(frame, model_tree) + detection_result = run_pipeline(cropped_frame, model_tree) process_time = (time.time() - start_time) * 1000 logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms") @@ -235,22 +248,48 @@ async def detect(websocket: WebSocket): "box": [0, 0, 0, 0] } + # Convert detection format to match protocol - flatten detection attributes + detection_dict = {} + + # Handle different detection result formats + if isinstance(highest_confidence_detection, dict): + # Copy all fields from the detection result + for key, value in highest_confidence_detection.items(): + if key not in ["box", "id"]: # Skip internal fields + detection_dict[key] = value + + # Extract display identifier for session ID lookup + subscription_parts = stream["subscriptionIdentifier"].split(';') + display_identifier = subscription_parts[0] if subscription_parts else None + session_id = session_ids.get(display_identifier) if display_identifier else None + detection_data = { "type": "imageDetection", "subscriptionIdentifier": stream["subscriptionIdentifier"], "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime()), "data": { - "detection": highest_confidence_detection, # Send only the highest confidence detection + "detection": detection_dict, "modelId": stream["modelId"], "modelName": stream["modelName"] } } + # Add session ID if available + if session_id is not None: + detection_data["sessionId"] = session_id + if highest_confidence_detection["class"] != "none": logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}") + + # Log session ID if 
available + subscription_parts = stream["subscriptionIdentifier"].split(';') + display_identifier = subscription_parts[0] if subscription_parts else None + session_id = session_ids.get(display_identifier) if display_identifier else None + if session_id: + logger.debug(f"Detection associated with session ID: {session_id}") await websocket.send_json(detection_data) - logger.debug(f"Sent detection data to client for camera {camera_id}:\n{json.dumps(detection_data, indent=2)}") + logger.debug(f"Sent detection data to client for camera {camera_id}") return persistent_data except Exception as e: logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) @@ -521,50 +560,58 @@ async def detect(websocket: WebSocket): cropX2 = payload.get("cropX2") cropY2 = payload.get("cropY2") - camera_id = subscriptionIdentifier # Use subscriptionIdentifier as camera_id for mapping + # Extract camera_id from subscriptionIdentifier (format: displayIdentifier;cameraIdentifier) + parts = subscriptionIdentifier.split(';') + if len(parts) != 2: + logger.error(f"Invalid subscriptionIdentifier format: {subscriptionIdentifier}") + continue + + display_identifier, camera_identifier = parts + camera_id = subscriptionIdentifier # Use full subscriptionIdentifier as camera_id for mapping if model_url: with models_lock: if (camera_id not in models) or (modelId not in models[camera_id]): logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}") - extraction_dir = os.path.join("models", camera_id, str(modelId)) + extraction_dir = os.path.join("models", camera_identifier, str(modelId)) os.makedirs(extraction_dir, exist_ok=True) # If model_url is remote, download it first. parsed = urlparse(model_url) if parsed.scheme in ("http", "https"): - logger.info(f"Downloading remote model from {model_url}") - local_mpta = os.path.join(extraction_dir, os.path.basename(parsed.path)) + logger.info(f"Downloading remote .mpta file from {model_url}") + filename = os.path.basename(parsed.path) or f"model_{modelId}.mpta" + local_mpta = os.path.join(extraction_dir, filename) logger.debug(f"Download destination: {local_mpta}") local_path = download_mpta(model_url, local_mpta) if not local_path: - logger.error(f"Failed to download the remote mpta file from {model_url}") + logger.error(f"Failed to download the remote .mpta file from {model_url}") error_response = { "type": "error", - "cameraIdentifier": camera_id, + "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to download model from {model_url}" } await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(local_path, extraction_dir) else: - logger.info(f"Loading local model from {model_url}") + logger.info(f"Loading local .mpta file from {model_url}") # Check if file exists before attempting to load if not os.path.exists(model_url): - logger.error(f"Local model file not found: {model_url}") + logger.error(f"Local .mpta file not found: {model_url}") logger.debug(f"Current working directory: {os.getcwd()}") error_response = { "type": "error", - "cameraIdentifier": camera_id, + "subscriptionIdentifier": subscriptionIdentifier, "error": f"Model file not found: {model_url}" } await websocket.send_json(error_response) continue model_tree = load_pipeline_from_zip(model_url, extraction_dir) if model_tree is None: - logger.error(f"Failed to load model {modelId} from mpta file for camera {camera_id}") + logger.error(f"Failed to load model {modelId} from .mpta file for camera {camera_id}") 
error_response = { "type": "error", - "cameraIdentifier": camera_id, + "subscriptionIdentifier": subscriptionIdentifier, "error": f"Failed to load model {modelId}" } await websocket.send_json(error_response) @@ -573,20 +620,80 @@ async def detect(websocket: WebSocket): models[camera_id] = {} models[camera_id][modelId] = model_tree logger.info(f"Successfully loaded model {modelId} for camera {camera_id}") - success_response = { - "type": "modelLoaded", - "cameraIdentifier": camera_id, - "modelId": modelId - } - await websocket.send_json(success_response) + logger.debug(f"Model extraction directory: {extraction_dir}") if camera_id and (rtsp_url or snapshot_url): with streams_lock: + # Determine camera URL for shared stream management + camera_url = snapshot_url if snapshot_url else rtsp_url + if camera_id not in streams and len(streams) < max_streams: - buffer = queue.Queue(maxsize=1) - stop_event = threading.Event() + # Check if we already have a stream for this camera URL + shared_stream = camera_streams.get(camera_url) + + if shared_stream: + # Reuse existing stream + logger.info(f"Reusing existing stream for camera URL: {camera_url}") + buffer = shared_stream["buffer"] + stop_event = shared_stream["stop_event"] + thread = shared_stream["thread"] + mode = shared_stream["mode"] + + # Increment reference count + shared_stream["ref_count"] = shared_stream.get("ref_count", 0) + 1 + else: + # Create new stream + buffer = queue.Queue(maxsize=1) + stop_event = threading.Event() + + if snapshot_url and snapshot_interval: + logger.info(f"Creating new snapshot stream for camera {camera_id}: {snapshot_url}") + thread = threading.Thread(target=snapshot_reader, args=(camera_identifier, snapshot_url, snapshot_interval, buffer, stop_event)) + thread.daemon = True + thread.start() + mode = "snapshot" + + # Store shared stream info + shared_stream = { + "buffer": buffer, + "thread": thread, + "stop_event": stop_event, + "mode": mode, + "url": snapshot_url, + "snapshot_interval": snapshot_interval, + "ref_count": 1 + } + camera_streams[camera_url] = shared_stream + + elif rtsp_url: + logger.info(f"Creating new RTSP stream for camera {camera_id}: {rtsp_url}") + cap = cv2.VideoCapture(rtsp_url) + if not cap.isOpened(): + logger.error(f"Failed to open RTSP stream for camera {camera_id}") + continue + thread = threading.Thread(target=frame_reader, args=(camera_identifier, cap, buffer, stop_event)) + thread.daemon = True + thread.start() + mode = "rtsp" + + # Store shared stream info + shared_stream = { + "buffer": buffer, + "thread": thread, + "stop_event": stop_event, + "mode": mode, + "url": rtsp_url, + "cap": cap, + "ref_count": 1 + } + camera_streams[camera_url] = shared_stream + else: + logger.error(f"No valid URL provided for camera {camera_id}") + continue + + # Create stream info for this subscription stream_info = { "buffer": buffer, - "thread": None, + "thread": thread, "stop_event": stop_event, "modelId": modelId, "modelName": modelName, @@ -594,52 +701,25 @@ async def detect(websocket: WebSocket): "cropX1": cropX1, "cropY1": cropY1, "cropX2": cropX2, - "cropY2": cropY2 + "cropY2": cropY2, + "mode": mode, + "camera_url": camera_url } - if snapshot_url and snapshot_interval: - logger.info(f"Using snapshot mode for camera {camera_id}: {snapshot_url}") - thread = threading.Thread(target=snapshot_reader, args=(camera_id, snapshot_url, snapshot_interval, buffer, stop_event)) - thread.daemon = True - thread.start() - stream_info.update({ - "snapshot_url": snapshot_url, - "snapshot_interval": 
snapshot_interval, - "mode": "snapshot" - }) - stream_info["thread"] = thread - streams[camera_id] = stream_info - elif rtsp_url: - logger.info(f"Using RTSP mode for camera {camera_id}: {rtsp_url}") - cap = cv2.VideoCapture(rtsp_url) - if not cap.isOpened(): - logger.error(f"Failed to open RTSP stream for camera {camera_id}") - continue - thread = threading.Thread(target=frame_reader, args=(camera_id, cap, buffer, stop_event)) - thread.daemon = True - thread.start() - stream_info.update({ - "cap": cap, - "rtsp_url": rtsp_url, - "mode": "rtsp" - }) - stream_info["thread"] = thread - streams[camera_id] = stream_info - else: - logger.error(f"No valid URL provided for camera {camera_id}") - continue + + if mode == "snapshot": + stream_info["snapshot_url"] = snapshot_url + stream_info["snapshot_interval"] = snapshot_interval + elif mode == "rtsp": + stream_info["rtsp_url"] = rtsp_url + stream_info["cap"] = shared_stream["cap"] + + streams[camera_id] = stream_info + subscription_to_camera[camera_id] = camera_url + elif camera_id and camera_id in streams: # If already subscribed, unsubscribe first - stream = streams.pop(camera_id) - stream["stop_event"].set() - stream["thread"].join() - if "cap" in stream: - stream["cap"].release() - logger.info(f"Unsubscribed from camera {camera_id} for resubscription") - with models_lock: - if camera_id in models and modelId in models[camera_id]: - del models[camera_id][modelId] - if not models[camera_id]: - del models[camera_id] + logger.info(f"Resubscribing to camera {camera_id}") + # Note: Keep models in memory for reuse across subscriptions elif msg_type == "unsubscribe": payload = data.get("payload", {}) subscriptionIdentifier = payload.get("subscriptionIdentifier") @@ -647,13 +727,25 @@ async def detect(websocket: WebSocket): with streams_lock: if camera_id and camera_id in streams: stream = streams.pop(camera_id) - stream["stop_event"].set() - stream["thread"].join() - if "cap" in stream: - stream["cap"].release() - with models_lock: - if camera_id in models: - del models[camera_id] + camera_url = subscription_to_camera.pop(camera_id, None) + + if camera_url and camera_url in camera_streams: + shared_stream = camera_streams[camera_url] + shared_stream["ref_count"] -= 1 + + # If no more references, stop the shared stream + if shared_stream["ref_count"] <= 0: + logger.info(f"Stopping shared stream for camera URL: {camera_url}") + shared_stream["stop_event"].set() + shared_stream["thread"].join() + if "cap" in shared_stream: + shared_stream["cap"].release() + del camera_streams[camera_url] + else: + logger.info(f"Shared stream for {camera_url} still has {shared_stream['ref_count']} references") + + logger.info(f"Unsubscribed from camera {camera_id}") + # Note: Keep models in memory for potential reuse elif msg_type == "requestState": cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent @@ -684,6 +776,37 @@ async def detect(websocket: WebSocket): "cameraConnections": camera_connections } await websocket.send_text(json.dumps(state_report)) + + elif msg_type == "setSessionId": + payload = data.get("payload", {}) + display_identifier = payload.get("displayIdentifier") + session_id = payload.get("sessionId") + + if display_identifier: + # Store session ID for this display + if session_id is None: + session_ids.pop(display_identifier, None) + logger.info(f"Cleared session ID for display {display_identifier}") + else: + session_ids[display_identifier] = session_id + logger.info(f"Set session ID {session_id} for display 
{display_identifier}") + + elif msg_type == "patchSession": + session_id = data.get("sessionId") + patch_data = data.get("data", {}) + + # For now, just acknowledge the patch - actual implementation depends on backend requirements + response = { + "type": "patchSessionResult", + "payload": { + "sessionId": session_id, + "success": True, + "message": "Session patch acknowledged" + } + } + await websocket.send_json(response) + logger.info(f"Acknowledged patch for session {session_id}") + else: logger.error(f"Unknown message type: {msg_type}") except json.JSONDecodeError: @@ -706,19 +829,23 @@ async def detect(websocket: WebSocket): stream_task.cancel() await stream_task with streams_lock: - for camera_id, stream in streams.items(): - stream["stop_event"].set() - stream["thread"].join() - # Only release cap if it exists (RTSP mode) - if "cap" in stream: - stream["cap"].release() - while not stream["buffer"].empty(): + # Clean up shared camera streams + for camera_url, shared_stream in camera_streams.items(): + shared_stream["stop_event"].set() + shared_stream["thread"].join() + if "cap" in shared_stream: + shared_stream["cap"].release() + while not shared_stream["buffer"].empty(): try: - stream["buffer"].get_nowait() + shared_stream["buffer"].get_nowait() except queue.Empty: pass - logger.info(f"Released camera {camera_id} and cleaned up resources") + logger.info(f"Released shared camera stream for {camera_url}") + streams.clear() + camera_streams.clear() + subscription_to_camera.clear() with models_lock: models.clear() + session_ids.clear() logger.info("WebSocket connection closed") diff --git a/test_protocol.py b/test_protocol.py new file mode 100644 index 0000000..74af7d8 --- /dev/null +++ b/test_protocol.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +Test script to verify the worker implementation follows the protocol +""" +import json +import asyncio +import websockets +import time + +async def test_protocol(): + """Test the worker protocol implementation""" + uri = "ws://localhost:8000" + + try: + async with websockets.connect(uri) as websocket: + print("✓ Connected to worker") + + # Test 1: Check if we receive heartbeat (stateReport) + print("\n1. Testing heartbeat...") + try: + message = await asyncio.wait_for(websocket.recv(), timeout=5) + data = json.loads(message) + if data.get("type") == "stateReport": + print("✓ Received stateReport heartbeat") + print(f" - CPU Usage: {data.get('cpuUsage', 'N/A')}%") + print(f" - Memory Usage: {data.get('memoryUsage', 'N/A')}%") + print(f" - Camera Connections: {len(data.get('cameraConnections', []))}") + else: + print(f"✗ Expected stateReport, got {data.get('type')}") + except asyncio.TimeoutError: + print("✗ No heartbeat received within 5 seconds") + + # Test 2: Request state + print("\n2. Testing requestState...") + await websocket.send(json.dumps({"type": "requestState"})) + try: + message = await asyncio.wait_for(websocket.recv(), timeout=5) + data = json.loads(message) + if data.get("type") == "stateReport": + print("✓ Received stateReport response") + else: + print(f"✗ Expected stateReport, got {data.get('type')}") + except asyncio.TimeoutError: + print("✗ No response to requestState within 5 seconds") + + # Test 3: Set session ID + print("\n3. Testing setSessionId...") + session_message = { + "type": "setSessionId", + "payload": { + "displayIdentifier": "display-001", + "sessionId": 12345 + } + } + await websocket.send(json.dumps(session_message)) + print("✓ Sent setSessionId message") + + # Test 4: Test patchSession + print("\n4. 
Testing patchSession...") + patch_message = { + "type": "patchSession", + "sessionId": 12345, + "data": { + "currentCar": { + "carModel": "Civic", + "carBrand": "Honda" + } + } + } + await websocket.send(json.dumps(patch_message)) + + # Wait for patchSessionResult + try: + message = await asyncio.wait_for(websocket.recv(), timeout=5) + data = json.loads(message) + if data.get("type") == "patchSessionResult": + print("✓ Received patchSessionResult") + print(f" - Success: {data.get('payload', {}).get('success')}") + print(f" - Message: {data.get('payload', {}).get('message')}") + else: + print(f"✗ Expected patchSessionResult, got {data.get('type')}") + except asyncio.TimeoutError: + print("✗ No patchSessionResult received within 5 seconds") + + # Test 5: Test subscribe message format (without actual camera) + print("\n5. Testing subscribe message format...") + subscribe_message = { + "type": "subscribe", + "payload": { + "subscriptionIdentifier": "display-001;cam-001", + "snapshotUrl": "http://example.com/snapshot.jpg", + "snapshotInterval": 5000, + "modelUrl": "http://example.com/model.mpta", + "modelName": "Test Model", + "modelId": 101, + "cropX1": 100, + "cropY1": 200, + "cropX2": 300, + "cropY2": 400 + } + } + await websocket.send(json.dumps(subscribe_message)) + print("✓ Sent subscribe message (will fail without actual camera/model)") + + # Listen for a few more messages to catch any errors + print("\n6. Listening for additional messages...") + for i in range(3): + try: + message = await asyncio.wait_for(websocket.recv(), timeout=2) + data = json.loads(message) + msg_type = data.get("type") + print(f" - Received {msg_type}") + if msg_type == "error": + print(f" Error: {data.get('error')}") + except asyncio.TimeoutError: + break + + print("\n✓ Protocol test completed successfully!") + + except Exception as e: + print(f"✗ Connection failed: {e}") + print("Make sure the worker is running on localhost:8000") + +if __name__ == "__main__": + asyncio.run(test_protocol()) \ No newline at end of file From a1f797f564eb2eb60c659052544589622cf09aea Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 15 Jul 2025 00:18:28 +0700 Subject: [PATCH 20/23] feat: add HTTP API for image retrieval from camera; implement endpoint for accessing latest image frames --- worker.md | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/worker.md b/worker.md index 00a13cf..c50bae5 100644 --- a/worker.md +++ b/worker.md @@ -439,3 +439,45 @@ This section shows a typical sequence of messages between the backend and the wo "cameraConnections": [] } ``` +## 7. HTTP API: Image Retrieval + +In addition to the WebSocket protocol, the worker exposes an HTTP endpoint for retrieving the latest image frame from a camera. + +### Endpoint + +``` +GET /camera/{camera_id}/image +``` + +- **`camera_id`**: The full `subscriptionIdentifier` (e.g., `display-001;cam-001`). + +### Response + +- **Success (200):** Returns the latest JPEG image from the camera stream. + - `Content-Type: image/jpeg` + - Binary JPEG data. + +- **Error (404):** If the camera is not found or no frame is available. + - JSON error response. + +- **Error (500):** Internal server error. + +### Example Request + +``` +GET /camera/display-001;cam-001/image +``` + +### Example Response + +- **Headers:** + ``` + Content-Type: image/jpeg + ``` +- **Body:** Binary JPEG image. + +### Notes + +- The endpoint returns the most recent frame available for the specified camera subscription. 
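+- For quick testing, the endpoint can be exercised with a short client such as the sketch below; the `localhost:8000` address is an assumption (it matches the port used elsewhere in this document) and should be adjusted to your deployment:
+
+  ```python
+  import requests
+
+  # Fetch the most recent frame for a subscription and save it to disk.
+  camera_id = "display-001;cam-001"  # the full subscriptionIdentifier
+  resp = requests.get(f"http://localhost:8000/camera/{camera_id}/image", timeout=5)
+  if resp.status_code == 200:
+      with open("latest_frame.jpg", "wb") as f:
+          f.write(resp.content)
+  else:
+      print(f"No frame yet: {resp.status_code} {resp.text}")
+  ```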
+- If multiple displays share the same camera, each subscription has its own buffer; the endpoint uses the buffer for the given `camera_id`. +- This API is useful for debugging, monitoring, or integrating with external systems that require direct image access. From 769371a1a34584c1a1708d4b8abf03f8179aa3d5 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 15 Jul 2025 00:30:09 +0700 Subject: [PATCH 21/23] feat: integrate Redis support in pipeline execution; add actions for saving images and publishing messages --- pympta.md | 200 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 +- siwatsystem/pympta.py | 52 ++++++++++- 3 files changed, 250 insertions(+), 5 deletions(-) create mode 100644 pympta.md diff --git a/pympta.md b/pympta.md new file mode 100644 index 0000000..2136480 --- /dev/null +++ b/pympta.md @@ -0,0 +1,200 @@ +# pympta: Modular Pipeline Task Executor + +`pympta` is a Python module designed to load and execute modular, multi-stage AI pipelines defined in a special package format (`.mpta`). It is primarily used within the detector worker to run complex computer vision tasks where the output of one model can trigger a subsequent model on a specific region of interest. + +## Core Concepts + +### 1. MPTA Package (`.mpta`) + +An `.mpta` file is a standard `.zip` archive with a different extension. It bundles all the necessary components for a pipeline to run. + +A typical `.mpta` file has the following structure: + +``` +my_pipeline.mpta/ +├── pipeline.json +├── model1.pt +├── model2.pt +└── ... +``` + +- **`pipeline.json`**: (Required) The manifest file that defines the structure of the pipeline, the models to use, and the logic connecting them. +- **Model Files (`.pt`, etc.)**: The actual pre-trained model files (e.g., PyTorch, ONNX). The pipeline currently uses `ultralytics.YOLO` models. + +### 2. Pipeline Structure + +A pipeline is a tree-like structure of "nodes," defined in `pipeline.json`. + +- **Root Node**: The entry point of the pipeline. It processes the initial, full-frame image. +- **Branch Nodes**: Child nodes that are triggered by specific detection results from their parent. For example, a root node might detect a "vehicle," which then triggers a branch node to detect a "license plate" within the vehicle's bounding box. + +This modular structure allows for creating complex and efficient inference logic, avoiding the need to run every model on every frame. + +## `pipeline.json` Specification + +This file defines the entire pipeline logic. The root object contains a `pipeline` key for the pipeline definition and an optional `redis` key for Redis configuration. + +### Top-Level Object Structure + +| Key | Type | Required | Description | +| ---------- | ------ | -------- | ------------------------------------------------------- | +| `pipeline` | Object | Yes | The root node object of the pipeline. | +| `redis` | Object | No | Configuration for connecting to a Redis server. | + +### Redis Configuration (`redis`) + +| Key | Type | Required | Description | +| ---------- | ------ | -------- | ------------------------------------------------------- | +| `host` | String | Yes | The hostname or IP address of the Redis server. | +| `port` | Number | Yes | The port number of the Redis server. | +| `password` | String | No | The password for Redis authentication. | +| `db` | Number | No | The Redis database number to use. Defaults to `0`. 
| + +### Node Object Structure + +| Key | Type | Required | Description | +| ------------------- | ------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------- | +| `modelId` | String | Yes | A unique identifier for this model node (e.g., "vehicle-detector"). | +| `modelFile` | String | Yes | The path to the model file within the `.mpta` archive (e.g., "yolov8n.pt"). | +| `minConfidence` | Float | Yes | The minimum confidence score (0.0 to 1.0) required for a detection to be considered valid and potentially trigger a branch. | +| `triggerClasses` | Array | Yes | A list of class names that, when detected by the parent, can trigger this node. For the root node, this lists all classes of interest. | +| `crop` | Boolean | No | If `true`, the image is cropped to the parent's detection bounding box before being passed to this node's model. Defaults to `false`. | +| `branches` | Array | No | A list of child node objects that can be triggered by this node's detections. | +| `actions` | Array | No | A list of actions to execute upon a successful detection in this node. | + +### Action Object Structure + +Actions allow the pipeline to interact with Redis. + +#### `redis_save_image` + +Saves the current image frame (or cropped sub-image) to a Redis key. + +| Key | Type | Required | Description | +| ----- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | +| `type`| String | Yes | Must be `"redis_save_image"`. | +| `key` | String | Yes | The Redis key to save the image to. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | + +#### `redis_publish` + +Publishes a message to a Redis channel. + +| Key | Type | Required | Description | +| --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | +| `type` | String | Yes | Must be `"redis_publish"`. | +| `channel` | String | Yes | The Redis channel to publish the message to. | +| `message` | String | Yes | The message to publish. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | + +### Example `pipeline.json` with Redis + +```json +{ + "redis": { + "host": "localhost", + "port": 6379, + "password": "your-password" + }, + "pipeline": { + "modelId": "vehicle-detector", + "modelFile": "vehicle_model.pt", + "minConfidence": 0.5, + "triggerClasses": ["car", "truck"], + "actions": [ + { + "type": "redis_save_image", + "key": "detection:image:{id}" + }, + { + "type": "redis_publish", + "channel": "detections", + "message": "Detected a {class} with ID {id}" + } + ], + "branches": [ + { + "modelId": "lpr-us", + "modelFile": "lpr_model.pt", + "minConfidence": 0.7, + "triggerClasses": ["car"], + "crop": true, + "branches": [] + } + ] + } +} +``` + +## API Reference + +The `pympta` module exposes two main functions. + +### `load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict` + +Loads, extracts, and parses an `.mpta` file to build a pipeline tree in memory. It also establishes a Redis connection if configured in `pipeline.json`. + +- **Parameters:** + - `zip_source` (str): The file path to the local `.mpta` zip archive. + - `target_dir` (str): A directory path where the archive's contents will be extracted. +- **Returns:** + - A dictionary representing the root node of the pipeline, ready to be used with `run_pipeline`. 
Returns `None` if loading fails. + +### `run_pipeline(frame, node: dict, return_bbox: bool = False)` + +Executes the inference pipeline on a single image frame. + +- **Parameters:** + - `frame`: The input image frame (e.g., a NumPy array from OpenCV). + - `node` (dict): The pipeline node to execute (typically the root node returned by `load_pipeline_from_zip`). + - `return_bbox` (bool): If `True`, the function returns a tuple `(detection, bounding_box)`. Otherwise, it returns only the `detection`. +- **Returns:** + - The final detection result from the last executed node in the chain. A detection is a dictionary like `{'class': 'car', 'confidence': 0.95, 'id': 1}`. If no detection meets the criteria, it returns `None` (or `(None, None)` if `return_bbox` is `True`). + +## Usage Example + +This snippet, inspired by `pipeline_webcam.py`, shows how to use `pympta` to load a pipeline and process an image from a webcam. + +```python +import cv2 +from siwatsystem.pympta import load_pipeline_from_zip, run_pipeline + +# 1. Define paths +MPTA_FILE = "path/to/your/pipeline.mpta" +CACHE_DIR = ".mptacache" + +# 2. Load the pipeline from the .mpta file +# This reads pipeline.json and loads the YOLO models into memory. +model_tree = load_pipeline_from_zip(MPTA_FILE, CACHE_DIR) + +if not model_tree: + print("Failed to load pipeline.") + exit() + +# 3. Open a video source +cap = cv2.VideoCapture(0) + +while True: + ret, frame = cap.read() + if not ret: + break + + # 4. Run the pipeline on the current frame + # The function will handle the entire logic tree (e.g., find a car, then find its license plate). + detection_result, bounding_box = run_pipeline(frame, model_tree, return_bbox=True) + + # 5. Display the results + if detection_result: + print(f"Detected: {detection_result['class']} with confidence {detection_result['confidence']:.2f}") + if bounding_box: + x1, y1, x2, y2 = bounding_box + cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) + cv2.putText(frame, detection_result['class'], (x1, y1 - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2) + + cv2.imshow("Pipeline Output", frame) + + if cv2.waitKey(1) & 0xFF == ord('q'): + break + +cap.release() +cv2.destroyAllWindows() +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 84f45cc..49ca601 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ torchvision ultralytics opencv-python websockets -fastapi[standard] \ No newline at end of file +fastapi[standard] +redis \ No newline at end of file diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py index 5e32596..4514182 100644 --- a/siwatsystem/pympta.py +++ b/siwatsystem/pympta.py @@ -7,13 +7,14 @@ import requests import zipfile import shutil import traceback +import redis from ultralytics import YOLO from urllib.parse import urlparse # Create a logger specifically for this module logger = logging.getLogger("detector_worker.pympta") -def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: +def load_pipeline_node(node_config: dict, mpta_dir: str, redis_client) -> dict: # Recursively load a model node from configuration. 
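+    # The redis_client created once in load_pipeline_from_zip is threaded through
+    # to this node and, below, to every child branch, so any stage of the pipeline
+    # can run its configured Redis actions.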
model_path = os.path.join(mpta_dir, node_config["modelFile"]) if not os.path.exists(model_path): @@ -44,13 +45,15 @@ def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict: "triggerClassIndices": trigger_class_indices, "crop": node_config.get("crop", False), "minConfidence": node_config.get("minConfidence", None), + "actions": node_config.get("actions", []), "model": model, - "branches": [] + "branches": [], + "redis_client": redis_client } logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}") for child in node_config.get("branches", []): logger.debug(f"Loading branch for parent node {node_config['modelId']}") - node["branches"].append(load_pipeline_node(child, mpta_dir)) + node["branches"].append(load_pipeline_node(child, mpta_dir, redis_client)) return node def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: @@ -158,7 +161,26 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: pipeline_config = json.load(f) logger.info(f"Successfully loaded pipeline configuration from {pipeline_json_path}") logger.debug(f"Pipeline config: {json.dumps(pipeline_config, indent=2)}") - return load_pipeline_node(pipeline_config["pipeline"], mpta_dir) + + # Establish Redis connection if configured + redis_client = None + if "redis" in pipeline_config: + redis_config = pipeline_config["redis"] + try: + redis_client = redis.Redis( + host=redis_config["host"], + port=redis_config["port"], + password=redis_config.get("password"), + db=redis_config.get("db", 0), + decode_responses=True + ) + redis_client.ping() + logger.info(f"Successfully connected to Redis at {redis_config['host']}:{redis_config['port']}") + except redis.exceptions.ConnectionError as e: + logger.error(f"Failed to connect to Redis: {e}") + redis_client = None + + return load_pipeline_node(pipeline_config["pipeline"], mpta_dir, redis_client) except json.JSONDecodeError as e: logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True) return None @@ -169,6 +191,25 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict: logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True) return None +def execute_actions(node, frame, detection_result): + if not node["redis_client"] or not node["actions"]: + return + + for action in node["actions"]: + try: + if action["type"] == "redis_save_image": + key = action["key"].format(**detection_result) + _, buffer = cv2.imencode('.jpg', frame) + node["redis_client"].set(key, buffer.tobytes()) + logger.info(f"Saved image to Redis with key: {key}") + elif action["type"] == "redis_publish": + channel = action["channel"] + message = action["message"].format(**detection_result) + node["redis_client"].publish(channel, message) + logger.info(f"Published message to Redis channel '{channel}': {message}") + except Exception as e: + logger.error(f"Error executing action {action['type']}: {e}") + def run_pipeline(frame, node: dict, return_bbox: bool=False): """ - For detection nodes (task != 'classify'): @@ -206,6 +247,7 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False): "confidence": top1_conf, "id": None } + execute_actions(node, frame, det) return (det, None) if return_bbox else det @@ -254,9 +296,11 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False): det2, _ = run_pipeline(sub, br, return_bbox=True) if det2: # return classification result + original bbox + execute_actions(br, sub, det2) return (det2, best_box) if return_bbox else det2 # ─── No branch matched → 
return this detection ───────────── + execute_actions(node, frame, best_det) return (best_det, best_box) if return_bbox else best_det except Exception as e: From f50585f26d11bdb25e0e72f1bc002ae6366c604e Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 15 Jul 2025 00:35:22 +0700 Subject: [PATCH 22/23] feat: enhance Redis action handling; add dynamic context for actions and support for expiration time --- pympta.md | 48 +++++++++++++++++++++++-------------------- siwatsystem/pympta.py | 24 ++++++++++++++++++---- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pympta.md b/pympta.md index 2136480..ac61f4a 100644 --- a/pympta.md +++ b/pympta.md @@ -64,16 +64,26 @@ This file defines the entire pipeline logic. The root object contains a `pipelin ### Action Object Structure -Actions allow the pipeline to interact with Redis. +Actions allow the pipeline to interact with Redis. They are executed sequentially for a given detection. + +#### Action Context & Dynamic Keys + +All actions have access to a dynamic context for formatting keys and messages. The context is created for each detection event and includes: + +- All key-value pairs from the detection result (e.g., `class`, `confidence`, `id`). +- `{timestamp_ms}`: The current Unix timestamp in milliseconds. +- `{uuid}`: A unique identifier (UUID4) for the detection event. +- `{image_key}`: If a `redis_save_image` action has already been executed for this event, this placeholder will be replaced with the key where the image was stored. #### `redis_save_image` Saves the current image frame (or cropped sub-image) to a Redis key. -| Key | Type | Required | Description | -| ----- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | -| `type`| String | Yes | Must be `"redis_save_image"`. | -| `key` | String | Yes | The Redis key to save the image to. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | +| Key | Type | Required | Description | +| ---------------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | +| `type` | String | Yes | Must be `"redis_save_image"`. | +| `key` | String | Yes | The Redis key to save the image to. Can contain any of the dynamic placeholders. | +| `expire_seconds` | Number | No | If provided, sets an expiration time (in seconds) for the Redis key. | #### `redis_publish` @@ -83,43 +93,37 @@ Publishes a message to a Redis channel. | --------- | ------ | -------- | ------------------------------------------------------------------------------------------------------- | | `type` | String | Yes | Must be `"redis_publish"`. | | `channel` | String | Yes | The Redis channel to publish the message to. | -| `message` | String | Yes | The message to publish. Can contain placeholders like `{class}` or `{id}` to be formatted with detection results. | +| `message` | String | Yes | The message to publish. Can contain any of the dynamic placeholders, including `{image_key}`. | ### Example `pipeline.json` with Redis +This example demonstrates a pipeline that detects vehicles, saves a uniquely named image of each detection that expires in one hour, and then publishes a notification with the image key. 
+Note that `message` strings are rendered with Python's `str.format`, so literal braces in the JSON payload below are escaped as `{{` and `}}`; unescaped braces would be treated as placeholders and raise a formatting error.
+
 ```json
 {
   "redis": {
-    "host": "localhost",
+    "host": "redis.local",
     "port": 6379,
-    "password": "your-password"
+    "password": "your-super-secret-password"
   },
   "pipeline": {
     "modelId": "vehicle-detector",
     "modelFile": "vehicle_model.pt",
-    "minConfidence": 0.5,
+    "minConfidence": 0.6,
     "triggerClasses": ["car", "truck"],
     "actions": [
       {
         "type": "redis_save_image",
-        "key": "detection:image:{id}"
+        "key": "detections:{class}:{timestamp_ms}:{uuid}",
+        "expire_seconds": 3600
       },
       {
         "type": "redis_publish",
-        "channel": "detections",
-        "message": "Detected a {class} with ID {id}"
+        "channel": "vehicle_events",
+        "message": "{{\"event\":\"new_detection\",\"class\":\"{class}\",\"confidence\":{confidence},\"image_key\":\"{image_key}\"}}"
       }
     ],
-    "branches": [
-      {
-        "modelId": "lpr-us",
-        "modelFile": "lpr_model.pt",
-        "minConfidence": 0.7,
-        "triggerClasses": ["car"],
-        "crop": true,
-        "branches": []
-      }
-    ]
+    "branches": []
   }
 }
 ```
diff --git a/siwatsystem/pympta.py b/siwatsystem/pympta.py
index 4514182..f151b55 100644
--- a/siwatsystem/pympta.py
+++ b/siwatsystem/pympta.py
@@ -8,6 +8,8 @@ import zipfile
 import shutil
 import traceback
 import redis
+import time
+import uuid
 from ultralytics import YOLO
 from urllib.parse import urlparse
 
@@ -195,16 +197,30 @@ def execute_actions(node, frame, detection_result):
     if not node["redis_client"] or not node["actions"]:
         return
 
+    # Create a dynamic context for this detection event
+    action_context = {
+        **detection_result,
+        "timestamp_ms": int(time.time() * 1000),
+        "uuid": str(uuid.uuid4()),
+    }
+
     for action in node["actions"]:
         try:
             if action["type"] == "redis_save_image":
-                key = action["key"].format(**detection_result)
+                key = action["key"].format(**action_context)
                 _, buffer = cv2.imencode('.jpg', frame)
-                node["redis_client"].set(key, buffer.tobytes())
-                logger.info(f"Saved image to Redis with key: {key}")
+                expire_seconds = action.get("expire_seconds")
+                if expire_seconds:
+                    node["redis_client"].setex(key, expire_seconds, buffer.tobytes())
+                    logger.info(f"Saved image to Redis with key: {key} (expires in {expire_seconds}s)")
+                else:
+                    node["redis_client"].set(key, buffer.tobytes())
+                    logger.info(f"Saved image to Redis with key: {key}")
+                # Add the generated key to the context for subsequent actions
+                action_context["image_key"] = key
             elif action["type"] == "redis_publish":
                 channel = action["channel"]
-                message = action["message"].format(**detection_result)
+                message = action["message"].format(**action_context)
                 node["redis_client"].publish(channel, message)
                 logger.info(f"Published message to Redis channel '{channel}': {message}")
         except Exception as e:

From e6716bbe73946dad54f37b7794b86b6d39719abb Mon Sep 17 00:00:00 2001
From: Siwat Sirichai
Date: Wed, 16 Jul 2025 03:24:40 +0700
Subject: [PATCH 23/23] feat: add comprehensive documentation for Python
 Detector Worker; include project overview, architecture, core components,
 and configuration details

---
 CLAUDE.md | 188 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 188 insertions(+)
 create mode 100644 CLAUDE.md

diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..3177259
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,188 @@
+# Python Detector Worker - CLAUDE.md
+
+## Project Overview
+This is a FastAPI-based computer vision detection worker that processes video streams from RTSP/HTTP sources and runs YOLO-based machine learning pipelines for object detection and classification. The system is designed to work within a larger CMS (Content Management System) architecture.
+ +## Architecture & Technology Stack +- **Framework**: FastAPI with WebSocket support +- **ML/CV**: PyTorch, Ultralytics YOLO, OpenCV +- **Containerization**: Docker (Python 3.13-bookworm base) +- **Data Storage**: Redis integration for action handling +- **Communication**: WebSocket-based real-time protocol + +## Core Components + +### Main Application (`app.py`) +- **FastAPI WebSocket server** for real-time communication +- **Multi-camera stream management** with shared stream optimization +- **HTTP REST endpoint** for image retrieval (`/camera/{camera_id}/image`) +- **Threading-based frame readers** for RTSP streams and HTTP snapshots +- **Model loading and inference** using MPTA (Machine Learning Pipeline Archive) format +- **Session management** with display identifier mapping +- **Resource monitoring** (CPU, memory, GPU usage via psutil) + +### Pipeline System (`siwatsystem/pympta.py`) +- **MPTA file handling** - ZIP archives containing model configurations +- **Hierarchical pipeline execution** with detection → classification branching +- **Redis action system** for image saving and message publishing +- **Dynamic model loading** with GPU optimization +- **Configurable trigger classes and confidence thresholds** + +### Testing & Debugging +- **Protocol test script** (`test_protocol.py`) for WebSocket communication validation +- **Pipeline webcam utility** (`pipeline_webcam.py`) for local testing with visual output +- **RTSP streaming debug tool** (`debug/rtsp_webcam.py`) using GStreamer + +## Code Conventions & Patterns + +### Logging +- **Structured logging** using Python's logging module +- **File + console output** to `detector_worker.log` +- **Debug level separation** for detailed troubleshooting +- **Context-aware messages** with camera IDs and model information + +### Error Handling +- **Graceful failure handling** with retry mechanisms (configurable max_retries) +- **Thread-safe operations** using locks for streams and models +- **WebSocket disconnect handling** with proper cleanup +- **Model loading validation** with detailed error reporting + +### Configuration +- **JSON configuration** (`config.json`) for runtime parameters: + - `poll_interval_ms`: Frame processing interval + - `max_streams`: Concurrent stream limit + - `target_fps`: Target frame rate + - `reconnect_interval_sec`: Stream reconnection delay + - `max_retries`: Maximum retry attempts (-1 for unlimited) + +### Threading Model +- **Frame reader threads** for each camera stream (RTSP/HTTP) +- **Shared stream optimization** - multiple subscriptions can reuse the same camera stream +- **Async WebSocket handling** with concurrent task management +- **Thread-safe data structures** with proper locking mechanisms + +## WebSocket Protocol + +### Message Types +- **subscribe**: Start camera stream with model pipeline +- **unsubscribe**: Stop camera stream processing +- **requestState**: Request current worker status +- **setSessionId**: Associate display with session identifier +- **patchSession**: Update session data +- **stateReport**: Periodic heartbeat with system metrics +- **imageDetection**: Detection results with timestamp and model info + +### Subscription Format +```json +{ + "type": "subscribe", + "payload": { + "subscriptionIdentifier": "display-001;cam-001", + "rtspUrl": "rtsp://...", // OR snapshotUrl + "snapshotUrl": "http://...", + "snapshotInterval": 5000, + "modelUrl": "http://...model.mpta", + "modelId": 101, + "modelName": "Vehicle Detection", + "cropX1": 100, "cropY1": 200, + "cropX2": 300, 
"cropY2": 400 + } +} +``` + +## Model Pipeline (MPTA) Format + +### Structure +- **ZIP archive** containing models and configuration +- **pipeline.json** - Main configuration file +- **Model files** - YOLO .pt files for detection/classification +- **Redis configuration** - Optional for action execution + +### Pipeline Flow +1. **Detection stage** - YOLO object detection with bounding boxes +2. **Trigger evaluation** - Check if detected class matches trigger conditions +3. **Classification stage** - Crop detected region and run classification model +4. **Action execution** - Redis operations (image saving, message publishing) + +### Branch Configuration +```json +{ + "modelId": "detector-v1", + "modelFile": "detector.pt", + "triggerClasses": ["car", "truck"], + "minConfidence": 0.5, + "branches": [{ + "modelId": "classifier-v1", + "modelFile": "classifier.pt", + "crop": true, + "triggerClasses": ["car"], + "minConfidence": 0.3, + "actions": [...] + }] +} +``` + +## Stream Management + +### Shared Streams +- Multiple subscriptions can share the same camera URL +- Reference counting prevents premature stream termination +- Automatic cleanup when last subscription ends + +### Frame Processing +- **Queue-based buffering** with single frame capacity (latest frame only) +- **Configurable polling interval** based on target FPS +- **Automatic reconnection** with exponential backoff + +## Development & Testing + +### Local Development +```bash +# Install dependencies +pip install -r requirements.txt + +# Run the worker +python app.py + +# Test protocol compliance +python test_protocol.py + +# Test pipeline with webcam +python pipeline_webcam.py --mpta-file path/to/model.mpta --video 0 +``` + +### Docker Deployment +```bash +# Build container +docker build -t detector-worker . + +# Run with volume mounts for models +docker run -p 8000:8000 -v ./models:/app/models detector-worker +``` + +### Testing Commands +- **Protocol testing**: `python test_protocol.py` +- **Pipeline validation**: `python pipeline_webcam.py --mpta-file --video 0` +- **RTSP debugging**: `python debug/rtsp_webcam.py` + +## Dependencies +- **fastapi[standard]**: Web framework with WebSocket support +- **uvicorn**: ASGI server +- **torch, torchvision**: PyTorch for ML inference +- **ultralytics**: YOLO implementation +- **opencv-python**: Computer vision operations +- **websockets**: WebSocket client/server +- **redis**: Redis client for action execution + +## Security Considerations +- Model files are loaded from trusted sources only +- Redis connections use authentication when configured +- WebSocket connections handle disconnects gracefully +- Resource usage is monitored to prevent DoS + +## Performance Optimizations +- GPU acceleration when CUDA is available +- Shared camera streams reduce resource usage +- Frame queue optimization (single latest frame) +- Model caching across subscriptions +- Trigger class filtering for faster inference \ No newline at end of file