enhance logging for model loading and pipeline processing; update log levels and add detailed error messages
All checks were successful
Build Backend Application and Docker Image / build-docker (push) Successful in 9m22s

This commit is contained in:
Siwat Sirichai 2025-05-28 19:18:58 +07:00
parent 3511d6ad7a
commit d4754fcd27
3 changed files with 325 additions and 82 deletions

.gitignore

@@ -7,3 +7,6 @@ __pycache__/
.mptacache
mptas
detector_worker.log
.gitignore
no_frame_debug.log

app.py

@@ -41,41 +41,61 @@ max_retries = config.get("max_retries", 3)
# Configure logging
logging.basicConfig(
level=logging.INFO,  # Set to INFO level for less verbose output
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler("detector_worker.log"),  # Write logs to a file
logging.StreamHandler()  # Also output to console
]
)
# Create a logger specifically for this application
logger = logging.getLogger("detector_worker")
logger.setLevel(logging.DEBUG) # Set app-specific logger to DEBUG level
# Ensure all other libraries (including root) use at least INFO level
logging.getLogger().setLevel(logging.INFO)
logger.info("Starting detector worker application")
logger.info(f"Configuration: Target FPS: {TARGET_FPS}, Max streams: {max_streams}, Max retries: {max_retries}")
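A minimal standalone sketch (an editor's illustration, not part of this commit) of how the two levels above interact: loggers under the "detector_worker" hierarchy are effective at DEBUG and their records pass through the root handlers, while other libraries inherit INFO from the root and stay quiet.

import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
app_logger = logging.getLogger("detector_worker")
app_logger.setLevel(logging.DEBUG)

app_logger.debug("emitted: detector_worker loggers are effective at DEBUG")
logging.getLogger("urllib3").debug("suppressed: third-party loggers inherit INFO from the root")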
# Ensure the models directory exists
os.makedirs("models", exist_ok=True)
logger.info("Ensured models directory exists")
# Constants for heartbeat and timeouts
HEARTBEAT_INTERVAL = 2  # seconds
WORKER_TIMEOUT_MS = 10000
logger.debug(f"Heartbeat interval set to {HEARTBEAT_INTERVAL} seconds")
# Locks for thread-safe operations
streams_lock = threading.Lock()
models_lock = threading.Lock()
logger.debug("Initialized thread locks")
# Add helper to download mpta ZIP file from a remote URL
def download_mpta(url: str, dest_path: str) -> str:
try:
logger.info(f"Starting download of model from {url} to {dest_path}")
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
response = requests.get(url, stream=True)
if response.status_code == 200:
file_size = int(response.headers.get('content-length', 0))
logger.info(f"Model file size: {file_size/1024/1024:.2f} MB")
downloaded = 0
with open(dest_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if file_size > 0 and downloaded % (file_size // 10) < 8192: # Log approximately every 10%
logger.debug(f"Download progress: {downloaded/file_size*100:.1f}%")
logger.info(f"Successfully downloaded mpta file from {url} to {dest_path}")
return dest_path
else:
logger.error(f"Failed to download mpta file (status code {response.status_code}): {response.text}")
return None
except Exception as e:
logger.error(f"Exception downloading mpta file from {url}: {str(e)}", exc_info=True)
return None
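A hypothetical caller sketch for download_mpta as defined above (the URL and paths are illustrative, not taken from the commit); the helper returns the destination path on success and None on any failure.

local_file = download_mpta(
    "http://example.com/pipelines/demo.mpta",               # assumed remote model URL
    os.path.join("models", "camera-1", "42", "demo.mpta"),  # assumed destination path
)
if local_file is None:
    logger.error("Download failed; skipping model load for this subscription")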
####################################################
@@ -83,12 +103,17 @@ def download_mpta(url: str, dest_path: str) -> str:
####################################################
@app.websocket("/")
async def detect(websocket: WebSocket):
logger.info("WebSocket connection accepted")
persistent_data_dict = {}
async def handle_detection(camera_id, stream, frame, websocket, model_tree, persistent_data):
try:
logger.debug(f"Processing frame for camera {camera_id} with model {stream['modelId']}")
start_time = time.time()
detection_result = run_pipeline(frame, model_tree)
process_time = (time.time() - start_time) * 1000
logger.debug(f"Detection for camera {camera_id} completed in {process_time:.2f}ms")
detection_data = {
"type": "imageDetection",
"cameraIdentifier": camera_id,
@@ -99,87 +124,157 @@ async def detect(websocket: WebSocket):
"modelName": stream["modelName"]
}
}
if detection_result:
detection_count = len(detection_result.get("detections", []))
logger.info(f"Camera {camera_id}: Detected {detection_count} objects with model {stream['modelName']}")
await websocket.send_json(detection_data)
return persistent_data
except Exception as e:
logger.error(f"Error in handle_detection for camera {camera_id}: {str(e)}", exc_info=True)
return persistent_data
def frame_reader(camera_id, cap, buffer, stop_event):
retries = 0
logger.info(f"Starting frame reader thread for camera {camera_id}")
frame_count = 0
last_log_time = time.time()
try:
# Log initial camera status and properties
if cap.isOpened():
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera {camera_id} opened successfully with resolution {width}x{height}, FPS: {fps}")
else:
logger.error(f"Camera {camera_id} failed to open initially")
while not stop_event.is_set():
try:
if not cap.isOpened():
logger.error(f"Camera {camera_id} is not open before trying to read")
# Attempt to reopen
cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
time.sleep(reconnect_interval)
continue
logger.debug(f"Attempting to read frame from camera {camera_id}")
ret, frame = cap.read()
if not ret:
logger.warning(f"Connection lost for camera: {camera_id}, retry {retries+1}/{max_retries}")
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries and max_retries != -1:
logger.error(f"Max retries reached for camera: {camera_id}, stopping frame reader")
break
# Re-open
logger.info(f"Attempting to reopen RTSP stream for camera: {camera_id}")
cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
if not cap.isOpened():
logger.error(f"Failed to reopen RTSP stream for camera: {camera_id}")
continue
logger.info(f"Successfully reopened RTSP stream for camera: {camera_id}")
continue
# Successfully read a frame
frame_count += 1
current_time = time.time()
# Log frame stats every 5 seconds
if current_time - last_log_time > 5:
logger.info(f"Camera {camera_id}: Read {frame_count} frames in the last {current_time - last_log_time:.1f} seconds")
frame_count = 0
last_log_time = current_time
logger.debug(f"Successfully read frame from camera {camera_id}, shape: {frame.shape}")
retries = 0
# Overwrite old frame if buffer is full
if not buffer.empty():
try:
buffer.get_nowait()
logger.debug(f"Removed old frame from buffer for camera {camera_id}")
except queue.Empty:
pass
buffer.put(frame)
logger.debug(f"Added new frame to buffer for camera {camera_id}")
# Short sleep to avoid CPU overuse
time.sleep(0.01)
except cv2.error as e:
logger.error(f"OpenCV error for camera {camera_id}: {e}", exc_info=True)
cap.release()
time.sleep(reconnect_interval)
retries += 1
if retries > max_retries and max_retries != -1:
logger.error(f"Max retries reached after OpenCV error for camera {camera_id}")
break
logger.info(f"Attempting to reopen RTSP stream after OpenCV error for camera: {camera_id}")
cap = cv2.VideoCapture(streams[camera_id]["rtsp_url"])
if not cap.isOpened():
logger.error(f"Failed to reopen RTSP stream for camera {camera_id} after OpenCV error")
continue
logger.info(f"Successfully reopened RTSP stream after OpenCV error for camera: {camera_id}")
except Exception as e:
logger.error(f"Unexpected error for camera {camera_id}: {str(e)}", exc_info=True)
cap.release()
break
except Exception as e:
logger.error(f"Error in frame_reader thread for camera {camera_id}: {str(e)}", exc_info=True)
finally:
logger.info(f"Frame reader thread for camera {camera_id} is exiting")
if cap and cap.isOpened():
cap.release()
async def process_streams():
logger.info("Started processing streams")
try:
while True:
start_time = time.time()
with streams_lock:
current_streams = list(streams.items())
if current_streams:
logger.debug(f"Processing {len(current_streams)} active streams")
else:
logger.debug("No active streams to process")
for camera_id, stream in current_streams:
buffer = stream["buffer"]
if buffer.empty():
logger.debug(f"Frame buffer is empty for camera {camera_id}")
continue
logger.debug(f"Got frame from buffer for camera {camera_id}")
frame = buffer.get()
with models_lock:
model_tree = models.get(camera_id, {}).get(stream["modelId"])
if not model_tree:
logger.warning(f"Model not found for camera {camera_id}, modelId {stream['modelId']}")
continue
logger.debug(f"Found model tree for camera {camera_id}, modelId {stream['modelId']}")
key = (camera_id, stream["modelId"])
persistent_data = persistent_data_dict.get(key, {})
logger.debug(f"Starting detection for camera {camera_id} with modelId {stream['modelId']}")
updated_persistent_data = await handle_detection(
camera_id, stream, frame, websocket, model_tree, persistent_data
)
persistent_data_dict[key] = updated_persistent_data
elapsed_time = (time.time() - start_time) * 1000  # ms
sleep_time = max(poll_interval - elapsed_time, 0)
logger.debug(f"Frame processing cycle: {elapsed_time:.2f}ms, sleeping for: {sleep_time:.2f}ms")
await asyncio.sleep(sleep_time / 1000.0)
except asyncio.CancelledError:
logger.info("Stream processing task cancelled")
except Exception as e:
logger.error(f"Error in process_streams: {str(e)}", exc_info=True)
async def send_heartbeat():
while True:
@@ -212,17 +307,17 @@ async def detect(websocket: WebSocket):
"cameraConnections": camera_connections
}
await websocket.send_text(json.dumps(state_report))
logger.debug("Sent stateReport as heartbeat")
await asyncio.sleep(HEARTBEAT_INTERVAL)
except Exception as e:
logger.error(f"Error sending stateReport heartbeat: {e}")
break
async def on_message():
while True:
try:
msg = await websocket.receive_text()
logger.debug(f"Received message: {msg}")
data = json.loads(msg)
msg_type = data.get("type")
@@ -236,35 +331,67 @@ async def detect(websocket: WebSocket):
if model_url:
with models_lock:
if (camera_id not in models) or (modelId not in models[camera_id]):
logger.info(f"Loading model from {model_url} for camera {camera_id}, modelId {modelId}")
extraction_dir = os.path.join("models", camera_id, str(modelId))
os.makedirs(extraction_dir, exist_ok=True)
# If model_url is remote, download it first.
parsed = urlparse(model_url)
if parsed.scheme in ("http", "https"):
logger.info(f"Downloading remote model from {model_url}")
local_mpta = os.path.join(extraction_dir, os.path.basename(parsed.path))
logger.debug(f"Download destination: {local_mpta}")
local_path = download_mpta(model_url, local_mpta)
if not local_path:
logger.error(f"Failed to download the remote mpta file from {model_url}")
error_response = {
"type": "error",
"cameraIdentifier": camera_id,
"error": f"Failed to download model from {model_url}"
}
await websocket.send_json(error_response)
continue
model_tree = load_pipeline_from_zip(local_path, extraction_dir)
else:
logger.info(f"Loading local model from {model_url}")
# Check if file exists before attempting to load
if not os.path.exists(model_url):
logger.error(f"Local model file not found: {model_url}")
logger.debug(f"Current working directory: {os.getcwd()}")
error_response = {
"type": "error",
"cameraIdentifier": camera_id,
"error": f"Model file not found: {model_url}"
}
await websocket.send_json(error_response)
continue
model_tree = load_pipeline_from_zip(model_url, extraction_dir)
if model_tree is None:
logger.error(f"Failed to load model {modelId} from mpta file for camera {camera_id}")
error_response = {
"type": "error",
"cameraIdentifier": camera_id,
"error": f"Failed to load model {modelId}"
}
await websocket.send_json(error_response)
continue
if camera_id not in models:
models[camera_id] = {}
models[camera_id][modelId] = model_tree
logger.info(f"Successfully loaded model {modelId} for camera {camera_id}")
success_response = {
"type": "modelLoaded",
"cameraIdentifier": camera_id,
"modelId": modelId
}
await websocket.send_json(success_response)
if camera_id and rtsp_url:
with streams_lock:
if camera_id not in streams and len(streams) < max_streams:
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logger.error(f"Failed to open RTSP stream for camera {camera_id}")
continue
buffer = queue.Queue(maxsize=1)
stop_event = threading.Event()
@@ -280,12 +407,12 @@ async def detect(websocket: WebSocket):
"modelId": modelId,
"modelName": modelName
}
logger.info(f"Subscribed to camera {camera_id} with modelId {modelId}, modelName {modelName}, URL {rtsp_url}")
elif camera_id and camera_id in streams:
# If already subscribed, unsubscribe
stream = streams.pop(camera_id)
stream["cap"].release()
logger.info(f"Unsubscribed from camera {camera_id}")
with models_lock:
if camera_id in models and modelId in models[camera_id]:
del models[camera_id][modelId]
@@ -294,14 +421,14 @@ async def detect(websocket: WebSocket):
elif msg_type == "unsubscribe":
payload = data.get("payload", {})
camera_id = payload.get("cameraIdentifier")
logger.debug(f"Unsubscribing from camera {camera_id}")
with streams_lock:
if camera_id and camera_id in streams:
stream = streams.pop(camera_id)
stream["stop_event"].set()
stream["thread"].join()
stream["cap"].release()
logger.info(f"Unsubscribed from camera {camera_id}")
with models_lock:
if camera_id in models:
del models[camera_id]
@@ -335,14 +462,14 @@ async def detect(websocket: WebSocket):
}
await websocket.send_text(json.dumps(state_report))
else:
logger.error(f"Unknown message type: {msg_type}")
except json.JSONDecodeError:
logger.error("Received invalid JSON message")
except (WebSocketDisconnect, ConnectionClosedError) as e:
logger.warning(f"WebSocket disconnected: {e}")
break
except Exception as e:
logger.error(f"Error handling message: {e}")
break
try:
@@ -352,7 +479,7 @@ async def detect(websocket: WebSocket):
message_task = asyncio.create_task(on_message())
await asyncio.gather(heartbeat_task, message_task)
except Exception as e:
logger.error(f"Error in detect websocket: {e}")
finally:
stream_task.cancel()
await stream_task
@@ -366,8 +493,8 @@ async def detect(websocket: WebSocket):
stream["buffer"].get_nowait()
except queue.Empty:
pass
logger.info(f"Released camera {camera_id} and cleaned up resources")
streams.clear()
with models_lock:
models.clear()
logger.info("WebSocket connection closed")

pympta.py

@@ -6,19 +6,27 @@ import cv2
import requests
import zipfile
import shutil
import traceback
from ultralytics import YOLO
from urllib.parse import urlparse
# Create a logger specifically for this module
logger = logging.getLogger("detector_worker.pympta")
def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict:
# Recursively load a model node from configuration.
model_path = os.path.join(mpta_dir, node_config["modelFile"])
if not os.path.exists(model_path):
logger.error(f"Model file {model_path} not found. Current directory: {os.getcwd()}")
logger.error(f"Directory content: {os.listdir(os.path.dirname(model_path))}")
raise FileNotFoundError(f"Model file {model_path} not found.")
logger.info(f"Loading model for node {node_config['modelId']} from {model_path}")
model = YOLO(model_path)
if torch.cuda.is_available():
logger.info(f"CUDA available. Moving model {node_config['modelId']} to GPU")
model.to("cuda")
else:
logger.info(f"CUDA not available. Using CPU for model {node_config['modelId']}")
node = {
"modelId": node_config["modelId"],
"modelFile": node_config["modelFile"],
@@ -28,11 +36,14 @@ def load_pipeline_node(node_config: dict, mpta_dir: str) -> dict:
"model": model,
"branches": []
}
logger.debug(f"Configured node {node_config['modelId']} with trigger classes: {node['triggerClasses']}")
for child in node_config.get("branches", []):
logger.debug(f"Loading branch for parent node {node_config['modelId']}")
node["branches"].append(load_pipeline_node(child, mpta_dir))
return node
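For orientation, an editor's sketch of the pipeline.json shape that load_pipeline_node and run_pipeline appear to expect; only keys actually read by this module are shown, and the values are made-up examples.

example_pipeline_config = {
    "pipeline": {
        "modelId": "detector",            # hypothetical node id
        "modelFile": "detector.pt",       # resolved relative to the extracted .mpta directory
        "triggerClasses": [],             # classes that would activate this node as a branch (unused at the root)
        "crop": False,                    # crop the frame to the detection before running branches
        "branches": [
            {
                "modelId": "classifier",  # hypothetical child stage
                "modelFile": "classifier.pt",
                "triggerClasses": ["car"],    # parent detection class that activates this branch
                "minConfidence": 0.5,         # branch is skipped below this confidence
                "branches": []
            }
        ]
    }
}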
def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
logger.info(f"Attempting to load pipeline from {zip_source} to {target_dir}")
os.makedirs(target_dir, exist_ok=True)
zip_path = os.path.join(target_dir, "pipeline.mpta")
@@ -40,51 +51,121 @@ def load_pipeline_from_zip(zip_source: str, target_dir: str) -> dict:
parsed = urlparse(zip_source)
if parsed.scheme in ("", "file"):
local_path = parsed.path if parsed.scheme == "file" else zip_source
logger.debug(f"Checking if local file exists: {local_path}")
if os.path.exists(local_path):
try:
shutil.copy(local_path, zip_path)
logger.info(f"Copied local .mpta file from {local_path} to {zip_path}")
except Exception as e:
logger.error(f"Failed to copy local .mpta file from {local_path}: {str(e)}", exc_info=True)
return None
else:
logger.error(f"Local file {local_path} does not exist. Current directory: {os.getcwd()}")
# List all subdirectories of models directory to help debugging
if os.path.exists("models"):
logger.error(f"Content of models directory: {os.listdir('models')}")
for root, dirs, files in os.walk("models"):
logger.error(f"Directory {root} contains subdirs: {dirs} and files: {files}")
else:
logger.error("The models directory doesn't exist")
return None
else:
logger.error(f"HTTP download functionality has been moved. Use a local file path here. Received: {zip_source}")
return None
try:
if not os.path.exists(zip_path):
logger.error(f"Zip file not found at expected location: {zip_path}")
return None
logger.debug(f"Extracting .mpta file from {zip_path} to {target_dir}")
# Extract contents and track the directories created
extracted_dirs = []
with zipfile.ZipFile(zip_path, "r") as zip_ref:
file_list = zip_ref.namelist()
logger.debug(f"Files in .mpta archive: {file_list}")
# Extract and track the top-level directories
for file_path in file_list:
parts = file_path.split('/')
if len(parts) > 1:
top_dir = parts[0]
if top_dir and top_dir not in extracted_dirs:
extracted_dirs.append(top_dir)
# Now extract the files
zip_ref.extractall(target_dir)
logger.info(f"Successfully extracted .mpta file to {target_dir}")
logger.debug(f"Extracted directories: {extracted_dirs}")
# Check what was actually created after extraction
actual_dirs = [d for d in os.listdir(target_dir) if os.path.isdir(os.path.join(target_dir, d))]
logger.debug(f"Actual directories created: {actual_dirs}")
except zipfile.BadZipFile as e:
logger.error(f"Bad zip file {zip_path}: {str(e)}", exc_info=True)
return None
except Exception as e:
logger.error(f"Failed to extract .mpta file {zip_path}: {str(e)}", exc_info=True)
return None
finally:
if os.path.exists(zip_path):
os.remove(zip_path)
logger.debug(f"Removed temporary zip file: {zip_path}")
# Use the first extracted directory if it exists, otherwise use the expected name
pipeline_name = os.path.basename(zip_source)
pipeline_name = os.path.splitext(pipeline_name)[0]
# Find the directory with pipeline.json
mpta_dir = None
# First try the expected directory name
expected_dir = os.path.join(target_dir, pipeline_name)
if os.path.exists(expected_dir) and os.path.exists(os.path.join(expected_dir, "pipeline.json")):
mpta_dir = expected_dir
logger.debug(f"Found pipeline.json in the expected directory: {mpta_dir}")
else:
# Look through all subdirectories for pipeline.json
for subdir in actual_dirs:
potential_dir = os.path.join(target_dir, subdir)
if os.path.exists(os.path.join(potential_dir, "pipeline.json")):
mpta_dir = potential_dir
logger.info(f"Found pipeline.json in directory: {mpta_dir} (different from expected: {expected_dir})")
break
if not mpta_dir:
logger.error(f"Could not find pipeline.json in any extracted directory. Directory content: {os.listdir(target_dir)}")
return None
pipeline_json_path = os.path.join(mpta_dir, "pipeline.json")
if not os.path.exists(pipeline_json_path):
logger.error(f"pipeline.json not found in the .mpta file. Files in directory: {os.listdir(mpta_dir)}")
return None
try:
with open(pipeline_json_path, "r") as f:
pipeline_config = json.load(f)
logger.info(f"Successfully loaded pipeline configuration from {pipeline_json_path}")
logger.debug(f"Pipeline config: {json.dumps(pipeline_config, indent=2)}")
return load_pipeline_node(pipeline_config["pipeline"], mpta_dir)
except json.JSONDecodeError as e:
logger.error(f"Error parsing pipeline.json: {str(e)}", exc_info=True)
return None
except KeyError as e:
logger.error(f"Missing key in pipeline.json: {str(e)}", exc_info=True)
return None
except Exception as e:
logger.error(f"Error loading pipeline.json: {str(e)}", exc_info=True)
return None
def run_pipeline(frame, node: dict, return_bbox: bool = False, is_last_stage: bool = True):
"""
Processes the frame with the given pipeline node. When return_bbox is True,
the function returns a tuple (detection, bbox) where bbox is (x1,y1,x2,y2)
for drawing. Otherwise, returns only the detection.
The is_last_stage parameter controls whether this node is considered the last
in the pipeline chain. Only the last stage will return detection results.
"""
try:
# Check model type and use appropriate method
@@ -92,7 +173,7 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False):
if model_task == "classify":
# Classification models need to use predict() instead of track()
logger.debug(f"Running classification model: {node.get('modelId')}")
results = node["model"].predict(frame, stream=False)
detection = None
best_box = None
@@ -109,18 +190,32 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False):
"confidence": conf,
"id": None  # Classification doesn't have tracking IDs
}
logger.debug(f"Classification detection: {detection}")
else:
logger.debug(f"Empty classification results for model {node.get('modelId')}")
# Classification doesn't produce bounding boxes
bbox = None
else:
# Detection/segmentation models use tracking
logger.debug(f"Running detection/tracking model: {node.get('modelId')}")
results = node["model"].track(frame, stream=False, persist=True)
detection = None
best_box = None
max_conf = -1
# Log raw detection count
detection_count = 0
for r in results:
if hasattr(r.boxes, 'cpu') and len(r.boxes.cpu()) > 0:
detection_count += len(r.boxes.cpu())
if detection_count == 0:
logger.debug(f"Empty detection results (no objects found) for model {node.get('modelId')}")
else:
logger.debug(f"Detection model {node.get('modelId')} found {detection_count} objects")
for r in results:
for box in r.boxes:
box_cpu = box.cpu()
@@ -134,6 +229,11 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False):
}
best_box = box_cpu
if detection:
logger.debug(f"Best detection: {detection}")
else:
logger.debug(f"No valid detection with tracking ID for model {node.get('modelId')}")
bbox = None
# Calculate bbox if best_box exists
if detection and best_box is not None:
@@ -144,31 +244,44 @@ def run_pipeline(frame, node: dict, return_bbox: bool = False):
x2, y2 = min(w, x2), min(h, y2)
if x2 > x1 and y2 > y1:
bbox = (x1, y1, x2, y2)
logger.debug(f"Detection bounding box: {bbox}")
if node.get("crop", False):
frame = frame[y1:y2, x1:x2]
logger.debug(f"Cropped frame to {frame.shape}")
# Check if we should process branches
if detection is not None:
for branch in node["branches"]:
if detection["class"] in branch.get("triggerClasses", []):
min_conf = branch.get("minConfidence")
if min_conf is not None and detection["confidence"] < min_conf:
logger.debug(f"Confidence {detection['confidence']} below threshold {min_conf} for branch {branch['modelId']}.")
break
# If we have branches, this is not the last stage
branch_result = run_pipeline(frame, branch, return_bbox, is_last_stage=True)
# This node is no longer the last stage, so its results shouldn't be returned
is_last_stage = False
if branch_result is not None:
if return_bbox:
return branch_result
return branch_result
break
# Return this node's detection only if it's considered the last stage
if is_last_stage:
if return_bbox:
return detection, bbox
return detection
# No detection or not the last stage
if return_bbox:
return None, None
return None
except Exception as e:
logger.error(f"Error running pipeline on node {node.get('modelId')}: {e}")
if return_bbox:
return None, None
return None
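A hedged usage sketch of the revised run_pipeline signature (the file paths and sample image are assumptions for illustration): with return_bbox=True the deepest triggered stage returns (detection, bbox), and a parent node whose branch fires no longer returns its own result.

import cv2

frame = cv2.imread("sample.jpg")  # any BGR image
model_tree = load_pipeline_from_zip("models/camera-1/42/demo.mpta", "models/camera-1/42")
if model_tree is not None and frame is not None:
    detection, bbox = run_pipeline(frame, model_tree, return_bbox=True)
    if detection:
        logger.info(f"Final stage result: {detection['class']} ({detection['confidence']:.2f}), bbox={bbox}")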