From 086ba0e4d38b495d3317cdb6d1728d3abc9b8009 Mon Sep 17 00:00:00 2001 From: ziesorx Date: Sat, 13 Sep 2025 01:00:49 +0700 Subject: [PATCH] Fix: got data from backend --- app.py | 9 ++++ .../communication/websocket_handler.py | 34 ++++++++++-- detector_worker/models/model_manager.py | 23 +++++++- detector_worker/streams/stream_manager.py | 53 ++++++++++++++++--- 4 files changed, 107 insertions(+), 12 deletions(-) diff --git a/app.py b/app.py index f6ca225..ed062be 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ This replaces the monolithic app.py with a clean, maintainable structure using dependency injection and singleton managers. """ import logging +import os from contextlib import asynccontextmanager from fastapi import FastAPI, WebSocket, HTTPException from fastapi.responses import Response @@ -91,6 +92,14 @@ async def lifespan(app: FastAPI): camera_manager.clear_all() pipeline_manager.clear_all() + # Clear model cache files on disk + import shutil + from detector_worker.core.config import MODELS_DIR + if os.path.exists(MODELS_DIR): + logger.info(f"Clearing model cache directory: {MODELS_DIR}") + shutil.rmtree(MODELS_DIR) + logger.info("Model cache directory cleared") + # Clear dependency container singletons container.get_container().clear_singletons() diff --git a/detector_worker/communication/websocket_handler.py b/detector_worker/communication/websocket_handler.py index 64671d1..4044055 100644 --- a/detector_worker/communication/websocket_handler.py +++ b/detector_worker/communication/websocket_handler.py @@ -7,6 +7,7 @@ and coordination between stream processing and detection pipelines. import asyncio import json import logging +import os import time import traceback import uuid @@ -429,11 +430,38 @@ class WebSocketHandler: print(f"📦 Subscription {i+1}: {sub_id} | Model {model_id}") - # Track unique models for download + # Track unique models for download - check if model already exists locally if model_id and model_url: if model_id not in unique_models: - unique_models[model_id] = model_url - print(f"🎯 New model found: ID {model_id}") + # Check if model directory already exists on disk + from ..core.config import MODELS_DIR + model_dir = os.path.join(MODELS_DIR, str(model_id)) + + print(f"🔍 Checking model directory: {model_dir}") + logger.info(f"Checking if model {model_id} exists at: {model_dir}") + + if os.path.exists(model_dir) and os.path.isdir(model_dir): + # Check if directory has content (not empty) + dir_contents = os.listdir(model_dir) + actual_contents = [f for f in dir_contents if not f.startswith('.')] + + print(f"📋 Directory contents: {dir_contents}") + print(f"📋 Filtered contents: {actual_contents}") + logger.info(f"Model {model_id} directory contents: {actual_contents}") + + if actual_contents: + print(f"📁 Model {model_id} already exists locally, skipping download") + logger.info(f"Skipping download for model {model_id} - already exists") + else: + print(f"📁 Model {model_id} directory exists but empty, will download") + unique_models[model_id] = model_url + print(f"🎯 New model found: ID {model_id}") + logger.info(f"Model {model_id} directory empty, adding to download queue") + else: + print(f"📁 Model {model_id} directory does not exist, will download") + unique_models[model_id] = model_url + print(f"🎯 New model found: ID {model_id}") + logger.info(f"Model {model_id} directory not found, adding to download queue") else: print(f"🔄 Model {model_id} already tracked") diff --git a/detector_worker/models/model_manager.py b/detector_worker/models/model_manager.py index cd63e12..1692b8b 100644 --- a/detector_worker/models/model_manager.py +++ b/detector_worker/models/model_manager.py @@ -251,6 +251,27 @@ class ModelManager: if parsed.scheme in ['http', 'https']: # Create model_id subfolder structure model_dir = os.path.join(self.models_dir, str(model_id)) + + # Simple check: if model_id directory already exists, skip download entirely + if os.path.exists(model_dir) and os.path.isdir(model_dir): + dir_contents = os.listdir(model_dir) + # Filter out hidden files like .DS_Store + actual_contents = [f for f in dir_contents if not f.startswith('.')] + if actual_contents: + logger.info(f"Model {model_id} directory already exists, skipping download") + + # Look for existing MPTA file + mpta_files = [f for f in actual_contents if f.endswith('.mpta')] + if mpta_files: + existing_mpta = os.path.join(model_dir, mpta_files[0]) + logger.info(f"Using existing MPTA file: {existing_mpta}") + return existing_mpta + + # No MPTA file but directory exists - this shouldn't happen in normal flow + # But let's handle it by proceeding with download + logger.warning(f"Model {model_id} directory exists but no MPTA file found") + + # Create the directory if it doesn't exist os.makedirs(model_dir, exist_ok=True) # Generate cache filename @@ -260,7 +281,7 @@ class ModelManager: cache_path = os.path.join(model_dir, filename) - # Check if already cached + # Check if exact MPTA file already cached if os.path.exists(cache_path): logger.info(f"Using cached model file: {cache_path}") return cache_path diff --git a/detector_worker/streams/stream_manager.py b/detector_worker/streams/stream_manager.py index 8908800..40548c8 100644 --- a/detector_worker/streams/stream_manager.py +++ b/detector_worker/streams/stream_manager.py @@ -134,8 +134,13 @@ class StreamManager: camera_id: str, rtsp_url: Optional[str] = None, snapshot_url: Optional[str] = None, - snapshot_interval: Optional[int] = None) -> StreamInfo: - """Create StreamInfo object based on stream type.""" + snapshot_interval: Optional[int] = None, + subscription_metadata: Optional[Dict[str, Any]] = None) -> StreamInfo: + """Create StreamInfo object based on stream type with subscription metadata.""" + + # Extract subscription metadata if provided + metadata = subscription_metadata or {} + if snapshot_url and snapshot_interval: return StreamInfo( camera_id=camera_id, @@ -143,7 +148,15 @@ class StreamManager: stream_type="snapshot", snapshot_interval=snapshot_interval, buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE), - stop_event=threading.Event() + stop_event=threading.Event(), + # Add subscription metadata + subscriptionIdentifier=metadata.get("subscriptionIdentifier"), + modelId=metadata.get("modelId"), + modelName=metadata.get("modelName"), + cropX1=metadata.get("cropX1"), + cropY1=metadata.get("cropY1"), + cropX2=metadata.get("cropX2"), + cropY2=metadata.get("cropY2") ) elif rtsp_url: return StreamInfo( @@ -151,7 +164,15 @@ class StreamManager: stream_url=rtsp_url, stream_type="rtsp", buffer=queue.Queue(maxsize=SHARED_STREAM_BUFFER_SIZE), - stop_event=threading.Event() + stop_event=threading.Event(), + # Add subscription metadata + subscriptionIdentifier=metadata.get("subscriptionIdentifier"), + modelId=metadata.get("modelId"), + modelName=metadata.get("modelName"), + cropX1=metadata.get("cropX1"), + cropY1=metadata.get("cropY1"), + cropX2=metadata.get("cropX2"), + cropY2=metadata.get("cropY2") ) else: raise ValueError("Must provide either RTSP URL or snapshot URL with interval") @@ -162,7 +183,8 @@ class StreamManager: subscriber_id: str, rtsp_url: Optional[str] = None, snapshot_url: Optional[str] = None, - snapshot_interval: Optional[int] = None) -> bool: + snapshot_interval: Optional[int] = None, + subscription_metadata: Optional[Dict[str, Any]] = None) -> bool: """ Create a stream subscription. @@ -194,7 +216,7 @@ class StreamManager: # Create or get existing stream if camera_id not in self._streams: stream_info = self._create_stream_info( - camera_id, rtsp_url, snapshot_url, snapshot_interval + camera_id, rtsp_url, snapshot_url, snapshot_interval, subscription_metadata ) self._streams[camera_id] = stream_info @@ -642,6 +664,19 @@ class StreamManager: # Create a subscriber_id (for WebSocket compatibility, use the subscription_id) subscriber_id = f"websocket_{int(time.time() * 1000)}" + # Extract subscription metadata from payload + subscription_metadata = { + "subscriptionIdentifier": payload.get("subscriptionIdentifier"), + "modelId": payload.get("modelId"), + "modelName": payload.get("modelName"), + "cropX1": payload.get("cropX1"), + "cropY1": payload.get("cropY1"), + "cropX2": payload.get("cropX2"), + "cropY2": payload.get("cropY2") + } + + logger.info(f"Extracted subscription metadata for camera {camera_id}: {subscription_metadata}") + # Create subscription based on available URL type if rtsp_url and rtsp_url.strip(): logger.info(f"Creating RTSP stream for camera {camera_id}: {rtsp_url}") @@ -649,7 +684,8 @@ class StreamManager: subscription_id=subscription_id, camera_id=camera_id, subscriber_id=subscriber_id, - rtsp_url=rtsp_url.strip() + rtsp_url=rtsp_url.strip(), + subscription_metadata=subscription_metadata ) elif snapshot_url and snapshot_url.strip(): logger.info(f"Creating snapshot stream for camera {camera_id}: {snapshot_url}") @@ -658,7 +694,8 @@ class StreamManager: camera_id=camera_id, subscriber_id=subscriber_id, snapshot_url=snapshot_url.strip(), - snapshot_interval=snapshot_interval + snapshot_interval=snapshot_interval, + subscription_metadata=subscription_metadata ) else: logger.error(f"No valid stream URL provided for camera {camera_id}. "