python-detector-worker/core/communication/models.py
2025-09-24 20:29:31 +07:00

150 lines
No EOL
4.7 KiB
Python

"""
Message data structures for WebSocket communication.
Based on worker.md protocol specification.
"""
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Union, Literal

from pydantic import BaseModel, Field
class SubscriptionObject(BaseModel):
    """Configuration of one camera subscription.

    ``subscriptionIdentifier``, ``modelUrl``, ``modelId`` and ``modelName`` are
    required. Both frame sources, the snapshot interval and the four crop
    coordinates are optional and default to ``None``.
    """

    # Identity of the subscription.
    subscriptionIdentifier: str = Field(..., description="Format: displayId;cameraId")

    # Frame sources; both are optional at the schema level.
    rtspUrl: Optional[str] = Field(default=None, description="RTSP stream URL")
    snapshotUrl: Optional[str] = Field(default=None, description="HTTP snapshot URL")
    snapshotInterval: Optional[int] = Field(
        default=None,
        description="Snapshot interval in milliseconds",
    )

    # Model to run on this subscription.
    modelUrl: str = Field(..., description="Pre-signed URL to .mpta file")
    modelId: int = Field(..., description="Unique model identifier")
    modelName: str = Field(..., description="Human-readable model name")

    # Optional crop rectangle, given as two corner points.
    cropX1: Optional[int] = Field(default=None, description="Crop region X1 coordinate")
    cropY1: Optional[int] = Field(default=None, description="Crop region Y1 coordinate")
    cropX2: Optional[int] = Field(default=None, description="Crop region X2 coordinate")
    cropY2: Optional[int] = Field(default=None, description="Crop region Y2 coordinate")
class CameraConnection(BaseModel):
    """Status entry for one camera, used when reporting worker state.

    The identifier, model fields and ``online`` flag are required; the crop
    coordinates are optional and default to ``None``.
    """

    # Which subscription and model this entry refers to.
    subscriptionIdentifier: str
    modelId: int
    modelName: str

    # Whether the camera connection is currently up.
    online: bool

    # Optional crop rectangle corners.
    cropX1: Optional[int] = None
    cropY1: Optional[int] = None
    cropX2: Optional[int] = None
    cropY2: Optional[int] = None
class DetectionData(BaseModel):
    """Payload carrying the result of one detection.

    The ``detection`` field distinguishes three cases:

    * ``{}`` -- empty detection (triggers session creation); this is the default.
    * ``{"carBrand": "Honda", ...}`` -- full detection (updates session).
    * ``None`` -- null detection (car abandonment).
    """

    # NOTE(review): the NoneType encoder maps None to None, which looks like a
    # no-op; confirm whether it is still required before removing it.
    model_config = {
        "json_encoders": {
            type(None): lambda v: None,
        },
        "arbitrary_types_allowed": True,
    }

    # A fresh empty dict is created per instance when no value is supplied.
    detection: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Detection results: {} for empty, {...} for data, None/null for abandonment",
    )

    # Model that produced the detection.
    modelId: int
    modelName: str
# Incoming Messages from Backend to Worker
class SetSubscriptionListMessage(BaseModel):
    """Incoming message carrying the complete subscription list.

    Used for declarative state management: the message holds the full list of
    subscriptions.
    """

    # Discriminator; fixed to this message's type name.
    type: Literal["setSubscriptionList"] = "setSubscriptionList"

    # Full list of camera subscriptions (required, may be empty).
    subscriptions: List[SubscriptionObject]
class SetSessionIdPayload(BaseModel):
    """Body of a ``setSessionId`` message: a display and its session ID."""

    # Display the session ID applies to.
    displayIdentifier: str

    # Optional; defaults to None.
    # NOTE(review): None presumably clears the association -- confirm against worker.md.
    sessionId: Optional[int] = None
class SetSessionIdMessage(BaseModel):
    """Incoming message that associates a session ID with a display."""

    # Discriminator; fixed to this message's type name.
    type: Literal["setSessionId"] = "setSessionId"

    # Display identifier plus the (optional) session ID.
    payload: SetSessionIdPayload
class SetProgressionStagePayload(BaseModel):
    """Body of a ``setProgressionStage`` message: a display and its stage."""

    # Display the progression stage applies to.
    displayIdentifier: str

    # Optional stage name; defaults to None.
    progressionStage: Optional[str] = None
class SetProgressionStageMessage(BaseModel):
    """Incoming message that sets the progression stage of a display."""

    # Discriminator; fixed to this message's type name.
    type: Literal["setProgressionStage"] = "setProgressionStage"

    # Display identifier plus the (optional) stage.
    payload: SetProgressionStagePayload
class RequestStateMessage(BaseModel):
    """Incoming message asking the worker for its current state."""

    # Discriminator; this message has no other fields.
    type: Literal["requestState"] = "requestState"
class PatchSessionResultPayload(BaseModel):
    """Body of a ``patchSessionResult`` message; all fields are required."""

    # Session the result refers to.
    sessionId: int

    # Outcome flag and accompanying text.
    success: bool
    message: str
class PatchSessionResultMessage(BaseModel):
    """Incoming message answering a patch-session request."""

    # Discriminator; fixed to this message's type name.
    type: Literal["patchSessionResult"] = "patchSessionResult"

    # Session ID, success flag and message text.
    payload: PatchSessionResultPayload
# Outgoing Messages from Worker to Backend
class StateReportMessage(BaseModel):
    """Outgoing periodic heartbeat carrying system metrics.

    CPU and memory usage are required; the GPU metrics are optional and
    default to ``None``.
    """

    # Discriminator; fixed to this message's type name.
    type: Literal["stateReport"] = "stateReport"

    # Host metrics. NOTE(review): units/range are not defined here -- see worker.md.
    cpuUsage: float
    memoryUsage: float

    # GPU metrics; may be omitted.
    gpuUsage: Optional[float] = None
    gpuMemoryUsage: Optional[float] = None

    # One status entry per camera.
    cameraConnections: List[CameraConnection]
class ImageDetectionMessage(BaseModel):
    """Outgoing detection event message.

    ``timestamp`` defaults to the current UTC time rendered as
    ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` (microseconds, literal ``Z`` suffix).
    Callers may still pass an explicit ``timestamp`` string.
    """

    # Discriminator; fixed to this message's type name.
    type: Literal["imageDetection"] = "imageDetection"

    # Subscription the detection belongs to.
    subscriptionIdentifier: str

    # datetime.utcnow() is deprecated since Python 3.12, so read an aware UTC
    # clock instead. The output string is unchanged because the "Z" suffix is
    # a literal in the format, not derived from tzinfo.
    timestamp: str = Field(
        default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    )

    # Detection payload (empty, full, or null detection).
    data: DetectionData
class PatchSessionMessage(BaseModel):
    """Outgoing request to modify the data of a session."""

    # Discriminator; fixed to this message's type name.
    type: Literal["patchSession"] = "patchSession"

    # Session to modify.
    sessionId: int

    # Required free-form mapping holding the fields to patch.
    data: Dict[str, Any] = Field(
        ...,
        description="Partial DisplayPersistentData structure",
    )
# Alias covering every message model the worker receives from the backend.
IncomingMessage = Union[
    SetSubscriptionListMessage,
    SetSessionIdMessage,
    SetProgressionStageMessage,
    RequestStateMessage,
    PatchSessionResultMessage,
]
# Alias covering every message model the worker sends to the backend.
OutgoingMessage = Union[
    StateReportMessage,
    ImageDetectionMessage,
    PatchSessionMessage,
]