"""
Shared MPTA Manager for Disk Space Optimization

This module implements shared MPTA file management to prevent duplicate
downloads and extractions when multiple cameras use the same model. MPTA files
are stored in modelId-based directories and shared across all cameras using
that model.

Key Features:
- Thread-safe MPTA downloading and extraction
- ModelId-based directory structure: models/{modelId}/
- Reference counting for proper cleanup
- Eliminates duplicate MPTA downloads
- Maintains compatibility with existing pipeline system
"""

import logging
import os
import shutil
import threading
from typing import Dict, Optional, Set, Tuple
from urllib.parse import urlparse

import requests

from .pympta import load_pipeline_from_zip

# Create a logger for this module
logger = logging.getLogger("detector_worker.mpta_manager")

# (connect, read) timeouts in seconds for MPTA downloads. Without a timeout a
# stalled server would block the caller forever while it holds the per-model
# download lock.
_DOWNLOAD_TIMEOUT = (10, 60)


class MPTAManager:
    """
    Singleton class for managing shared MPTA files across multiple cameras.

    This manager ensures that each unique modelId is downloaded and extracted
    only once, dramatically reducing disk usage and download time when
    multiple cameras use the same model.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Check, lock, check again so only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(MPTAManager, cls).__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every MPTAManager() call; only set up state once.
        if self._initialized:
            return
        self._initialized = True

        # Storage for MPTA management, guarded by _manager_lock
        self._model_paths: Dict[int, str] = {}  # modelId -> shared_extraction_path
        self._mpta_file_paths: Dict[int, str] = {}  # modelId -> local_mpta_file_path
        self._reference_counts: Dict[int, int] = {}  # modelId -> reference count
        self._download_locks: Dict[int, threading.Lock] = {}  # modelId -> download lock
        self._cameras_using_model: Dict[int, Set[str]] = {}  # modelId -> set of camera_ids
        self._manager_lock = threading.RLock()  # Reentrant lock for nested calls

        logger.info("🏭 Shared MPTA Manager initialized - ready for disk-optimized MPTA management")

    def get_or_download_mpta(self, model_id: int, model_url: str, camera_id: str) -> Optional[Tuple[str, str]]:
        """
        Get or download an MPTA file.

        On success the reference count for ``model_id`` is incremented. On
        failure the partially populated ``models/{model_id}`` directory is
        removed and the camera is dropped from usage tracking, so a later call
        starts from a clean state.

        Args:
            model_id: Unique identifier for the model
            model_url: URL to download the MPTA file from
            camera_id: Identifier for the requesting camera

        Returns:
            Tuple of (extraction_path, mpta_file_path), or None if failed
        """
        with self._manager_lock:
            # Track camera usage
            self._cameras_using_model.setdefault(model_id, set()).add(camera_id)

            # Check if model directory already exists on disk (from previous sessions)
            if model_id not in self._model_paths:
                potential_path = f"models/{model_id}"
                if os.path.isdir(potential_path):
                    mpta_files = [f for f in os.listdir(potential_path) if f.endswith('.mpta')]
                    if mpta_files:
                        # Use the first .mpta file found
                        self._model_paths[model_id] = potential_path
                        self._mpta_file_paths[model_id] = os.path.join(potential_path, mpta_files[0])
                        self._reference_counts[model_id] = 0  # Will be incremented below
                        logger.info(f"📂 Found existing MPTA modelId {model_id} from previous session")

            # Check if already available
            if model_id in self._model_paths:
                shared_path = self._model_paths[model_id]
                mpta_file_path = self._mpta_file_paths.get(model_id)
                if os.path.exists(shared_path) and mpta_file_path and os.path.exists(mpta_file_path):
                    ref_count = self._reference_counts.get(model_id, 0) + 1
                    self._reference_counts[model_id] = ref_count
                    logger.info(f"📂 MPTA modelId {model_id} reused for camera {camera_id} (ref_count: {ref_count}) - SAVED DOWNLOAD!")
                    return (shared_path, mpta_file_path)

                # Path was deleted externally, clean up our records
                logger.warning(f"⚠️ MPTA path for modelId {model_id} was deleted externally, will re-download")
                del self._model_paths[model_id]
                self._mpta_file_paths.pop(model_id, None)
                self._reference_counts.pop(model_id, 0)

            # Need to download. Fetch (or create) the per-model lock while the
            # manager lock is still held, so a concurrent cleanup_all() that
            # clears the dict cannot cause a KeyError here.
            download_lock = self._download_locks.setdefault(model_id, threading.Lock())

        # Download with model-specific lock (_manager_lock is released so
        # requests for other models are not blocked by this download)
        with download_lock:
            # Double-check after acquiring download lock: another thread may
            # have finished downloading this model while we waited.
            with self._manager_lock:
                if model_id in self._model_paths and os.path.exists(self._model_paths[model_id]):
                    mpta_file_path = self._mpta_file_paths.get(model_id)
                    if mpta_file_path and os.path.exists(mpta_file_path):
                        ref_count = self._reference_counts.get(model_id, 0) + 1
                        self._reference_counts[model_id] = ref_count
                        logger.info(f"📂 MPTA modelId {model_id} became available during wait (ref_count: {ref_count})")
                        return (self._model_paths[model_id], mpta_file_path)

            # Actually download and extract
            shared_path = f"models/{model_id}"
            logger.info(f"🔄 Downloading NEW MPTA for modelId {model_id} from {model_url}")

            try:
                # Ensure directory exists
                os.makedirs(shared_path, exist_ok=True)

                # Download MPTA file
                mpta_filename = self._extract_filename_from_url(model_url) or f"model_{model_id}.mpta"
                local_mpta_path = os.path.join(shared_path, mpta_filename)

                if not self._download_file(model_url, local_mpta_path):
                    logger.error(f"❌ Failed to download MPTA for modelId {model_id}")
                    self._abandon_download(model_id, camera_id, shared_path)
                    return None

                # Extract MPTA
                pipeline_tree = load_pipeline_from_zip(local_mpta_path, shared_path)
                if pipeline_tree is None:
                    logger.error(f"❌ Failed to extract MPTA for modelId {model_id}")
                    # Without this cleanup the leftover .mpta file would be
                    # mistaken for a valid model from a previous session.
                    self._abandon_download(model_id, camera_id, shared_path)
                    return None

                # Success - register in manager
                with self._manager_lock:
                    self._model_paths[model_id] = shared_path
                    self._mpta_file_paths[model_id] = local_mpta_path
                    self._reference_counts[model_id] = 1

                    logger.info(f"✅ MPTA modelId {model_id} downloaded and registered (ref_count: 1)")
                    self._log_manager_status()

                return (shared_path, local_mpta_path)

            except Exception as e:
                logger.error(f"❌ Error downloading/extracting MPTA for modelId {model_id}: {e}")
                self._abandon_download(model_id, camera_id, shared_path)
                return None

    def _abandon_download(self, model_id: int, camera_id: str, shared_path: str) -> None:
        """Remove a partially populated model directory and untrack the camera."""
        if os.path.exists(shared_path):
            shutil.rmtree(shared_path, ignore_errors=True)
        with self._manager_lock:
            cameras = self._cameras_using_model.get(model_id)
            if cameras is not None:
                cameras.discard(camera_id)

    def release_mpta(self, model_id: int, camera_id: str) -> None:
        """
        Release a reference to an MPTA.

        The reference count never drops below zero; surplus releases are
        logged and ignored.

        Args:
            model_id: Unique identifier for the model to release
            camera_id: Identifier for the camera releasing the reference
        """
        with self._manager_lock:
            if model_id not in self._reference_counts:
                logger.warning(f"⚠️ Attempted to release unknown MPTA modelId {model_id} for camera {camera_id}")
                return

            # Remove camera from usage tracking
            if model_id in self._cameras_using_model:
                self._cameras_using_model[model_id].discard(camera_id)

            if self._reference_counts[model_id] <= 0:
                logger.warning(f"⚠️ MPTA modelId {model_id} has no outstanding references; ignoring release by {camera_id}")
                return

            self._reference_counts[model_id] -= 1
            logger.info(f"📉 MPTA modelId {model_id} reference count decreased to {self._reference_counts[model_id]} (released by {camera_id})")

            # NOTE(review): automatic cleanup at zero references is disabled
            # (the _cleanup_mpta call here was commented out in the original).
            # Presumably this keeps the directory for reuse by later requests
            # or sessions - confirm before re-enabling.

    def _cleanup_mpta(self, model_id: int) -> None:
        """
        Internal method to clean up an MPTA directory and free disk space.

        Removes the model directory from disk and drops every tracking entry
        for ``model_id``. Errors are logged, not raised.
        """
        with self._manager_lock:  # reentrant, so safe when the caller already holds it
            if model_id not in self._model_paths:
                return

            shared_path = self._model_paths[model_id]
            try:
                if os.path.exists(shared_path):
                    shutil.rmtree(shared_path)
                    logger.info(f"🗑️ Cleaned up MPTA directory: {shared_path}")

                # Remove from tracking
                del self._model_paths[model_id]
                self._mpta_file_paths.pop(model_id, None)
                del self._reference_counts[model_id]
                self._cameras_using_model.pop(model_id, None)

                # Clean up download lock (optional, could keep for future use)
                self._download_locks.pop(model_id, None)

                logger.info(f"✅ MPTA modelId {model_id} fully cleaned up and disk space freed")
                self._log_manager_status()

            except Exception as e:
                logger.error(f"❌ Error cleaning up MPTA modelId {model_id}: {e}")

    def get_shared_path(self, model_id: int) -> Optional[str]:
        """
        Get the shared extraction path for a modelId without downloading.

        Args:
            model_id: Model identifier to look up

        Returns:
            Shared path if available, None otherwise
        """
        with self._manager_lock:
            return self._model_paths.get(model_id)

    def get_manager_status(self) -> Dict:
        """
        Get current status of the MPTA manager.

        Returns:
            Dictionary with manager statistics
        """
        with self._manager_lock:
            return {
                "total_mpta_models": len(self._model_paths),
                "models": {
                    str(model_id): {
                        "shared_path": path,
                        "reference_count": self._reference_counts.get(model_id, 0),
                        "cameras_using": list(self._cameras_using_model.get(model_id, set())),
                    }
                    for model_id, path in self._model_paths.items()
                },
                "total_references": sum(self._reference_counts.values()),
                # Counts per-model download locks that currently exist, not
                # the number of downloads actually in progress.
                "active_downloads": len(self._download_locks),
            }

    def _log_manager_status(self) -> None:
        """Log current manager status for debugging."""
        status = self.get_manager_status()
        logger.info(f"📊 MPTA Manager Status: {status['total_mpta_models']} unique models, {status['total_references']} total references")
        for model_id, info in status['models'].items():
            cameras_str = ','.join(info['cameras_using'][:3])  # Show first 3 cameras
            if len(info['cameras_using']) > 3:
                cameras_str += f"+{len(info['cameras_using'])-3} more"
            logger.debug(f" 📋 ModelId {model_id}: refs={info['reference_count']}, cameras=[{cameras_str}]")

    def cleanup_all(self) -> None:
        """
        Clean up all MPTA directories.

        Used during shutdown.
        """
        with self._manager_lock:
            model_ids = list(self._model_paths.keys())
            logger.info(f"🧹 Cleaning up {len(model_ids)} MPTA directories")

            for model_id in model_ids:
                self._cleanup_mpta(model_id)

            # Clear all tracking data
            self._download_locks.clear()
            logger.info("✅ MPTA manager cleanup complete")

    def _download_file(self, url: str, local_path: str) -> bool:
        """
        Download a file from URL to local path with progress logging.

        Any partially written file is removed on failure.

        Args:
            url: URL to download from
            local_path: Local path to save to

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info(f"⬇️ Starting download from {url}")
            # The context manager closes the streamed response on every path.
            with requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()

                total_size = int(response.headers.get('content-length', 0))
                if total_size > 0:
                    logger.info(f"📦 File size: {total_size / 1024 / 1024:.2f} MB")

                downloaded = 0
                last_logged_progress = 0
                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)

                        if total_size > 0:
                            progress = int((downloaded / total_size) * 100)
                            # Log at 10% intervals (10%, 20%, 30%, etc.)
                            if last_logged_progress + 10 <= progress <= 100:
                                logger.debug(f"Download progress: {progress}%")
                                # Snap to the decade so thresholds stay on 10% marks
                                last_logged_progress = progress - progress % 10

            logger.info(f"✅ Successfully downloaded to {local_path}")
            return True

        except Exception as e:
            logger.error(f"❌ Download failed: {e}")
            # Clean up partial file; a failure here must not mask the result
            try:
                if os.path.exists(local_path):
                    os.remove(local_path)
            except OSError as cleanup_error:
                logger.warning(f"⚠️ Could not remove partial download {local_path}: {cleanup_error}")
            return False

    def _extract_filename_from_url(self, url: str) -> Optional[str]:
        """
        Extract filename from URL.

        Returns None when the URL path has no final component, or when that
        component is "." or "..", which must not be joined into the model
        directory as a file name.
        """
        try:
            filename = os.path.basename(urlparse(url).path)
        except Exception:
            return None
        if not filename or filename in (".", ".."):
            return None
        return filename


# Global singleton instance
_mpta_manager = MPTAManager()


def get_or_download_mpta(model_id: int, model_url: str, camera_id: str) -> Optional[Tuple[str, str]]:
    """
    Convenience function to get or download a shared MPTA.

    Args:
        model_id: Unique identifier for the model
        model_url: URL to download the MPTA file from
        camera_id: Identifier for the requesting camera

    Returns:
        Tuple of (extraction_path, mpta_file_path), or None if failed
    """
    return _mpta_manager.get_or_download_mpta(model_id, model_url, camera_id)


def release_mpta(model_id: int, camera_id: str) -> None:
    """
    Convenience function to release a shared MPTA reference.

    Args:
        model_id: Unique identifier for the model to release
        camera_id: Identifier for the camera releasing the reference
    """
    _mpta_manager.release_mpta(model_id, camera_id)


def get_mpta_manager_status() -> Dict:
    """
    Convenience function to get MPTA manager status.

    Returns:
        Dictionary with manager statistics
    """
    return _mpta_manager.get_manager_status()


def cleanup_mpta_manager() -> None:
    """
    Convenience function to cleanup the entire MPTA manager.
    """
    _mpta_manager.cleanup_all()