""" Worker state management for system metrics and subscription tracking. """ import logging import psutil import threading from typing import Dict, Set, Optional, List from dataclasses import dataclass, field from .models import CameraConnection, SubscriptionObject logger = logging.getLogger(__name__) # Try to import torch and pynvml for GPU monitoring try: import torch TORCH_AVAILABLE = True except ImportError: TORCH_AVAILABLE = False logger.warning("PyTorch not available, GPU metrics will not be collected") try: import pynvml PYNVML_AVAILABLE = True pynvml.nvmlInit() logger.info("NVIDIA ML Python (pynvml) initialized successfully") except ImportError: PYNVML_AVAILABLE = False logger.debug("pynvml not available, falling back to PyTorch GPU monitoring") except Exception as e: PYNVML_AVAILABLE = False logger.warning(f"Failed to initialize pynvml: {e}") @dataclass class WorkerState: """Central state management for the detector worker.""" # Active subscriptions indexed by subscription identifier subscriptions: Dict[str, SubscriptionObject] = field(default_factory=dict) # Session ID mapping: display_identifier -> session_id session_ids: Dict[str, int] = field(default_factory=dict) # Progression stage mapping: display_identifier -> stage progression_stages: Dict[str, str] = field(default_factory=dict) # Active camera connections for state reporting camera_connections: List[CameraConnection] = field(default_factory=list) # Thread lock for state synchronization _lock: threading.RLock = field(default_factory=threading.RLock) def set_subscriptions(self, new_subscriptions: List[SubscriptionObject]) -> None: """ Update active subscriptions with declarative list from backend. Args: new_subscriptions: Complete list of desired subscriptions """ with self._lock: # Convert to dict for easy lookup new_sub_dict = {sub.subscriptionIdentifier: sub for sub in new_subscriptions} # Log changes for debugging current_ids = set(self.subscriptions.keys()) new_ids = set(new_sub_dict.keys()) added = new_ids - current_ids removed = current_ids - new_ids updated = current_ids & new_ids if added: logger.info(f"[State Update] Adding subscriptions: {added}") if removed: logger.info(f"[State Update] Removing subscriptions: {removed}") if updated: logger.info(f"[State Update] Updating subscriptions: {updated}") # Replace entire subscription dict self.subscriptions = new_sub_dict # Update camera connections for state reporting self._update_camera_connections() def get_subscription(self, subscription_identifier: str) -> Optional[SubscriptionObject]: """Get subscription by identifier.""" with self._lock: return self.subscriptions.get(subscription_identifier) def get_all_subscriptions(self) -> List[SubscriptionObject]: """Get all active subscriptions.""" with self._lock: return list(self.subscriptions.values()) def set_session_id(self, display_identifier: str, session_id: Optional[int]) -> None: """ Set or clear session ID for a display. Args: display_identifier: Display identifier session_id: Session ID to set, or None to clear """ with self._lock: if session_id is None: self.session_ids.pop(display_identifier, None) logger.info(f"[State Update] Cleared session ID for display {display_identifier}") else: self.session_ids[display_identifier] = session_id logger.info(f"[State Update] Set session ID {session_id} for display {display_identifier}") def get_session_id(self, display_identifier: str) -> Optional[int]: """Get session ID for display identifier.""" with self._lock: return self.session_ids.get(display_identifier) def get_session_id_for_subscription(self, subscription_identifier: str) -> Optional[int]: """Get session ID for subscription by extracting display identifier.""" from .messages import extract_display_identifier display_id = extract_display_identifier(subscription_identifier) if display_id: return self.get_session_id(display_id) return None def set_progression_stage(self, display_identifier: str, stage: Optional[str]) -> None: """ Set or clear progression stage for a display. Args: display_identifier: Display identifier stage: Progression stage to set, or None to clear """ with self._lock: if stage is None: self.progression_stages.pop(display_identifier, None) logger.info(f"[State Update] Cleared progression stage for display {display_identifier}") else: self.progression_stages[display_identifier] = stage logger.info(f"[State Update] Set progression stage '{stage}' for display {display_identifier}") def get_progression_stage(self, display_identifier: str) -> Optional[str]: """Get progression stage for display identifier.""" with self._lock: return self.progression_stages.get(display_identifier) def _update_camera_connections(self) -> None: """Update camera connections list for state reporting.""" connections = [] for sub in self.subscriptions.values(): connection = CameraConnection( subscriptionIdentifier=sub.subscriptionIdentifier, modelId=sub.modelId, modelName=sub.modelName, online=True, # TODO: Add actual online status tracking cropX1=sub.cropX1, cropY1=sub.cropY1, cropX2=sub.cropX2, cropY2=sub.cropY2 ) connections.append(connection) self.camera_connections = connections def get_camera_connections(self) -> List[CameraConnection]: """Get current camera connections for state reporting.""" with self._lock: return self.camera_connections.copy() class SystemMetrics: """System metrics collection for state reporting.""" @staticmethod def get_cpu_usage() -> float: """Get current CPU usage percentage.""" try: return psutil.cpu_percent(interval=0.1) except Exception as e: logger.error(f"Failed to get CPU usage: {e}") return 0.0 @staticmethod def get_memory_usage() -> float: """Get current memory usage percentage.""" try: return psutil.virtual_memory().percent except Exception as e: logger.error(f"Failed to get memory usage: {e}") return 0.0 @staticmethod def get_gpu_usage() -> Optional[float]: """Get current GPU usage percentage.""" try: # Prefer pynvml for accurate GPU utilization if PYNVML_AVAILABLE: handle = pynvml.nvmlDeviceGetHandleByIndex(0) # First GPU utilization = pynvml.nvmlDeviceGetUtilizationRates(handle) return float(utilization.gpu) # Fallback to PyTorch memory-based estimation elif TORCH_AVAILABLE and torch.cuda.is_available(): if hasattr(torch.cuda, 'utilization'): return torch.cuda.utilization() else: # Estimate based on memory usage allocated = torch.cuda.memory_allocated() reserved = torch.cuda.memory_reserved() if reserved > 0: return (allocated / reserved) * 100 return None except Exception as e: logger.error(f"Failed to get GPU usage: {e}") return None @staticmethod def get_gpu_memory_usage() -> Optional[float]: """Get current GPU memory usage in MB.""" if not TORCH_AVAILABLE: return None try: if torch.cuda.is_available(): return torch.cuda.memory_reserved() / (1024 ** 2) # Convert to MB return None except Exception as e: logger.error(f"Failed to get GPU memory usage: {e}") return None # Global worker state instance worker_state = WorkerState()