Feat: connect with cms

This commit is contained in:
ziesorx 2025-08-12 23:18:54 +07:00
parent 9a1496f224
commit 0f8b575c90
2 changed files with 162 additions and 72 deletions

222
app.py
View file

@ -13,7 +13,13 @@ import requests
import asyncio import asyncio
import psutil import psutil
import zipfile import zipfile
import ssl
import urllib3
import subprocess
import tempfile
from urllib.parse import urlparse from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
from fastapi import FastAPI, WebSocket, HTTPException from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.websockets import WebSocketDisconnect from fastapi.websockets import WebSocketDisconnect
from fastapi.responses import Response from fastapi.responses import Response
@ -88,7 +94,101 @@ def download_mpta(url: str, dest_path: str) -> str:
try: try:
logger.info(f"Starting download of model from {url} to {dest_path}") logger.info(f"Starting download of model from {url} to {dest_path}")
os.makedirs(os.path.dirname(dest_path), exist_ok=True) os.makedirs(os.path.dirname(dest_path), exist_ok=True)
response = requests.get(url, stream=True)
# Configure session with headers and SSL settings for compatibility
session = requests.Session()
# Add headers to mimic browser request
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
session.headers.update(headers)
# Disable SSL verification warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Try multiple approaches for SSL compatibility
ssl_success = False
response = None
# Approach 1: Standard request with verify=False and updated TLS
try:
# Create a custom SSL context with modern settings
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
ssl_context.set_ciphers('DEFAULT:@SECLEVEL=1')
# Create adapter with custom SSL context
class SSLContextAdapter(HTTPAdapter):
def __init__(self, ssl_context=None):
self.ssl_context = ssl_context
super().__init__()
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super().init_poolmanager(*args, **kwargs)
session.mount('https://', SSLContextAdapter(ssl_context))
response = session.get(url, stream=True, verify=False, timeout=30)
ssl_success = True
except Exception as e1:
logger.debug(f"First SSL approach failed: {e1}")
# Approach 2: Fallback to basic request without custom SSL
try:
response = session.get(url, stream=True, verify=False, timeout=30)
ssl_success = True
except Exception as e2:
logger.debug(f"Second SSL approach failed: {e2}")
# Approach 3: Last resort - use system curl if available
try:
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_path = tmp_file.name
curl_cmd = [
'curl', '-L', '-k', '--silent', '--show-error',
'-H', f'User-Agent: {headers["User-Agent"]}',
'-o', tmp_path, url
]
result = subprocess.run(curl_cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Create a fake response object
class FakeResponse:
def __init__(self, file_path):
self.status_code = 200
self.headers = {'content-length': str(os.path.getsize(file_path))}
self._file_path = file_path
def iter_content(self, chunk_size=8192):
with open(self._file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
os.unlink(self._file_path) # Clean up temp file
response = FakeResponse(tmp_path)
ssl_success = True
logger.info("Successfully downloaded using system curl as fallback")
else:
logger.error(f"curl fallback failed: {result.stderr}")
if os.path.exists(tmp_path):
os.unlink(tmp_path)
except Exception as e3:
logger.debug(f"curl fallback failed: {e3}")
if 'tmp_path' in locals() and os.path.exists(tmp_path):
os.unlink(tmp_path)
if not ssl_success or not response:
raise Exception("All SSL approaches failed - unable to establish secure connection")
if response.status_code == 200: if response.status_code == 200:
file_size = int(response.headers.get('content-length', 0)) file_size = int(response.headers.get('content-length', 0))
logger.info(f"Model file size: {file_size/1024/1024:.2f} MB") logger.info(f"Model file size: {file_size/1024/1024:.2f} MB")
@ -107,6 +207,13 @@ def download_mpta(url: str, dest_path: str) -> str:
except Exception as e: except Exception as e:
logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True) logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True)
return None return None
finally:
# Clean up session resources
try:
if 'session' in locals():
session.close()
except:
pass
# Add helper to fetch snapshot image from HTTP/HTTPS URL # Add helper to fetch snapshot image from HTTP/HTTPS URL
def fetch_snapshot(url: str): def fetch_snapshot(url: str):
@ -240,16 +347,14 @@ async def detect(websocket: WebSocket):
logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}") logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
start_time = time.time() start_time = time.time()
# Extract display identifier for session ID lookup # Extract display identifier for pipeline context
subscription_parts = stream["subscriptionIdentifier"].split(';') subscription_parts = stream["subscriptionIdentifier"].split(';')
display_identifier = subscription_parts[0] if subscription_parts else None display_identifier = subscription_parts[0] if subscription_parts else None
session_id = session_ids.get(display_identifier) if display_identifier else None
# Create context for pipeline execution # Create context for pipeline execution (session_id will be generated by pipeline)
pipeline_context = { pipeline_context = {
"camera_id": camera_id, "camera_id": camera_id,
"display_id": display_identifier, "display_id": display_identifier
"session_id": session_id
} }
detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context) detection_result = run_pipeline(cropped_frame, model_tree, context=pipeline_context)
@ -259,46 +364,24 @@ async def detect(websocket: WebSocket):
# Log the raw detection result for debugging # Log the raw detection result for debugging
logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}") logger.debug(f"Raw detection result for camera {camera_id}:\n{json.dumps(detection_result, indent=2, default=str)}")
# Direct class result (no detections/classifications structure) # Extract session_id from pipeline result (generated during database record creation)
if detection_result and isinstance(detection_result, dict) and "class" in detection_result and "confidence" in detection_result: session_id = None
highest_confidence_detection = { if detection_result and isinstance(detection_result, dict):
"class": detection_result.get("class", "none"), # Check if pipeline generated a session_id (happens when Car+Frontal detected together)
"confidence": detection_result.get("confidence", 1.0), if "session_id" in detection_result:
"box": [0, 0, 0, 0] # Empty bounding box for classifications session_id = detection_result["session_id"]
} logger.debug(f"Extracted session_id from pipeline result: {session_id}")
# Handle case when no detections found or result is empty
elif not detection_result or not detection_result.get("detections"):
# Check if we have classification results
if detection_result and detection_result.get("classifications"):
# Get the highest confidence classification
classifications = detection_result.get("classifications", [])
highest_confidence_class = max(classifications, key=lambda x: x.get("confidence", 0)) if classifications else None
if highest_confidence_class: # Process detection result - run_pipeline returns the primary detection directly
highest_confidence_detection = { if detection_result and isinstance(detection_result, dict) and "class" in detection_result:
"class": highest_confidence_class.get("class", "none"), highest_confidence_detection = detection_result
"confidence": highest_confidence_class.get("confidence", 1.0),
"box": [0, 0, 0, 0] # Empty bounding box for classifications
}
else:
highest_confidence_detection = {
"class": "none",
"confidence": 1.0,
"box": [0, 0, 0, 0]
}
else:
highest_confidence_detection = {
"class": "none",
"confidence": 1.0,
"box": [0, 0, 0, 0]
}
else: else:
# Find detection with highest confidence # No detection found
detections = detection_result.get("detections", []) highest_confidence_detection = {
highest_confidence_detection = max(detections, key=lambda x: x.get("confidence", 0)) if detections else {
"class": "none", "class": "none",
"confidence": 1.0, "confidence": 1.0,
"box": [0, 0, 0, 0] "bbox": [0, 0, 0, 0],
"branch_results": {}
} }
# Convert detection format to match backend expectations exactly as in worker.md section 4.2 # Convert detection format to match backend expectations exactly as in worker.md section 4.2
@ -311,31 +394,33 @@ async def detect(websocket: WebSocket):
"licensePlateConfidence": None "licensePlateConfidence": None
} }
# Handle different detection result formats # Extract and process branch results from parallel classification
if isinstance(highest_confidence_detection, dict): branch_results = highest_confidence_detection.get("branch_results", {})
# Extract and flatten branch results from parallel classification if branch_results:
branch_results = highest_confidence_detection.get("branch_results", {}) logger.debug(f"Processing branch results: {branch_results}")
if branch_results:
logger.debug(f"Processing branch results: {branch_results}")
# Transform branch results into backend-expected detection attributes # Transform branch results into backend-expected detection attributes
for branch_id, branch_data in branch_results.items(): for branch_id, branch_data in branch_results.items():
if isinstance(branch_data, dict): if isinstance(branch_data, dict):
# Map common classification fields to backend-expected names logger.debug(f"Processing branch {branch_id}: {branch_data}")
if "brand" in branch_data:
detection_dict["carBrand"] = branch_data["brand"]
if "body_type" in branch_data:
detection_dict["bodyType"] = branch_data["body_type"]
if "class" in branch_data:
class_name = branch_data["class"]
# Map based on branch/model type # Map common classification fields to backend-expected names
if "brand" in branch_id.lower(): if "brand" in branch_data:
detection_dict["carBrand"] = class_name detection_dict["carBrand"] = branch_data["brand"]
elif "bodytype" in branch_id.lower() or "body" in branch_id.lower(): if "body_type" in branch_data:
detection_dict["bodyType"] = class_name detection_dict["bodyType"] = branch_data["body_type"]
if "class" in branch_data:
class_name = branch_data["class"]
logger.info(f"Detection payload: {detection_dict}") # Map based on branch/model type
if "brand" in branch_id.lower():
detection_dict["carBrand"] = class_name
elif "bodytype" in branch_id.lower() or "body" in branch_id.lower():
detection_dict["bodyType"] = class_name
logger.info(f"Detection payload after branch processing: {detection_dict}")
else:
logger.debug("No branch results found in detection result")
detection_data = { detection_data = {
"type": "imageDetection", "type": "imageDetection",
@ -348,12 +433,14 @@ async def detect(websocket: WebSocket):
} }
} }
# Add session ID if available # Add session ID if available (generated by pipeline when Car+Frontal detected)
if session_id is not None: if session_id is not None:
detection_data["sessionId"] = session_id detection_data["sessionId"] = session_id
logger.debug(f"Added session_id to WebSocket response: {session_id}")
if highest_confidence_detection["class"] != "none": if highest_confidence_detection.get("class") != "none":
logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {highest_confidence_detection['confidence']:.2f} using model {stream['modelName']}") confidence = highest_confidence_detection.get("confidence", 0.0)
logger.info(f"Camera {camera_id}: Detected {highest_confidence_detection['class']} with confidence {confidence:.2f} using model {stream['modelName']}")
# Log session ID if available # Log session ID if available
if session_id: if session_id:
@ -361,6 +448,7 @@ async def detect(websocket: WebSocket):
await websocket.send_json(detection_data) await websocket.send_json(detection_data)
logger.debug(f"Sent detection data to client for camera {camera_id}") logger.debug(f"Sent detection data to client for camera {camera_id}")
logger.debug(f"Sent this detection data: {detection_data}")
return persistent_data return persistent_data
except Exception as e: except Exception as e:
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True) logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)

View file

@ -786,9 +786,11 @@ def run_pipeline(frame, node: dict, return_bbox: bool=False, context=None):
primary_detection = max(all_detections, key=lambda x: x["confidence"]) primary_detection = max(all_detections, key=lambda x: x["confidence"])
primary_bbox = primary_detection["bbox"] primary_bbox = primary_detection["bbox"]
# Add branch results to primary detection for compatibility # Add branch results and session_id to primary detection for compatibility
if "branch_results" in detection_result: if "branch_results" in detection_result:
primary_detection["branch_results"] = detection_result["branch_results"] primary_detection["branch_results"] = detection_result["branch_results"]
if "session_id" in detection_result:
primary_detection["session_id"] = detection_result["session_id"]
return (primary_detection, primary_bbox) if return_bbox else primary_detection return (primary_detection, primary_bbox) if return_bbox else primary_detection