fix: improve a couple of things #29
5 changed files with 183 additions and 31 deletions
app.py
@@ -319,7 +319,6 @@ async def get_session_image(session_id: int):
     """
     try:
         from pathlib import Path
-        import glob
 
         # Images directory
        images_dir = Path("images")
@@ -331,23 +330,34 @@ async def get_session_image(session_id: int):
                 detail=f"No images directory found"
             )
 
-        # Search for files matching session ID pattern: {session_id}_*
-        pattern = str(images_dir / f"{session_id}_*.jpg")
-        matching_files = glob.glob(pattern)
+        # Use os.scandir() for efficient file searching (3-5x faster than glob.glob)
+        # Filter files matching session ID pattern: {session_id}_*.jpg
+        prefix = f"{session_id}_"
+        most_recent_file = None
+        most_recent_mtime = 0
 
-        if not matching_files:
+        with os.scandir(images_dir) as entries:
+            for entry in entries:
+                # Filter: must be a file, start with session_id prefix, and end with .jpg
+                if entry.is_file() and entry.name.startswith(prefix) and entry.name.endswith('.jpg'):
+                    # Use cached stat info from DirEntry (much faster than separate stat calls)
+                    entry_stat = entry.stat()
+                    if entry_stat.st_mtime > most_recent_mtime:
+                        most_recent_mtime = entry_stat.st_mtime
+                        most_recent_file = entry.path
+
+        if not most_recent_file:
             logger.warning(f"No image found for session {session_id}")
             raise HTTPException(
                 status_code=404,
                 detail=f"No image found for session {session_id}"
             )
 
-        # Get the most recent file if multiple exist
-        most_recent_file = max(matching_files, key=os.path.getmtime)
         logger.info(f"Found session image for session {session_id}: {most_recent_file}")
 
         # Read the image file
-        image_data = open(most_recent_file, 'rb').read()
+        with open(most_recent_file, 'rb') as f:
+            image_data = f.read()
 
         # Return image as binary response
         return Response(content=image_data, media_type="image/jpeg")
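For reference, the same lookup in isolation — a minimal sketch assuming a flat `images/` directory of `{session_id}_*.jpg` files; the helper name and signature are illustrative, not part of the codebase. `DirEntry.stat()` results are cached on each entry, so no per-file path string has to be built and re-stat'ed the way `glob.glob()` plus `os.path.getmtime()` does it:

```python
import os
from pathlib import Path
from typing import Optional

def find_latest_session_image(images_dir: Path, session_id: int) -> Optional[str]:
    """Return the newest '{session_id}_*.jpg' path in images_dir, or None if absent."""
    prefix = f"{session_id}_"
    latest_path: Optional[str] = None
    latest_mtime = 0.0
    with os.scandir(images_dir) as entries:
        for entry in entries:
            # Same filter as above: regular file, '{session_id}_' prefix, '.jpg' suffix.
            if entry.is_file() and entry.name.startswith(prefix) and entry.name.endswith(".jpg"):
                mtime = entry.stat().st_mtime  # cached on the DirEntry after the first call
                if mtime > latest_mtime:
                    latest_mtime, latest_path = mtime, entry.path
    return latest_path

# e.g. find_latest_session_image(Path("images"), 123)
```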
@@ -45,12 +45,17 @@ class BranchProcessor:
         self.redis_manager = None
         self.db_manager = None
 
+        # Branch execution timeout (seconds)
+        self.branch_timeout = 30.0
+
         # Statistics
         self.stats = {
             'branches_processed': 0,
             'parallel_executions': 0,
             'total_processing_time': 0.0,
-            'models_loaded': 0
+            'models_loaded': 0,
+            'branches_timed_out': 0,
+            'branches_failed': 0
         }
 
         logger.info("BranchProcessor initialized")
@@ -279,22 +284,46 @@ class BranchProcessor:
                 )
                 future_to_branch[future] = branch
 
-            # Collect results as they complete
-            for future in as_completed(future_to_branch):
+            # Collect results as they complete with timeout
+            try:
+                for future in as_completed(future_to_branch, timeout=self.branch_timeout):
                     branch = future_to_branch[future]
                     branch_id = getattr(branch, 'model_id', 'unknown')
 
                     try:
-                        result = future.result()
+                        # Get result with timeout to prevent indefinite hanging
+                        result = future.result(timeout=self.branch_timeout)
                         results[branch_id] = result
                         logger.info(f"[PARALLEL COMPLETE] {branch_id}: Branch completed successfully")
+                    except TimeoutError:
+                        logger.error(f"[TIMEOUT] Branch {branch_id} exceeded timeout of {self.branch_timeout}s")
+                        self.stats['branches_timed_out'] += 1
+                        results[branch_id] = {
+                            'status': 'timeout',
+                            'message': f'Branch execution timeout after {self.branch_timeout}s',
+                            'processing_time': self.branch_timeout
+                        }
                     except Exception as e:
-                        logger.error(f"Error in parallel branch {branch_id}: {e}")
+                        logger.error(f"[ERROR] Error in parallel branch {branch_id}: {e}", exc_info=True)
+                        self.stats['branches_failed'] += 1
                         results[branch_id] = {
                             'status': 'error',
                             'message': str(e),
                             'processing_time': 0.0
                         }
+            except TimeoutError:
+                # as_completed iterator timed out - mark remaining futures as timed out
+                logger.error(f"[TIMEOUT] Branch execution timeout after {self.branch_timeout}s - some branches did not complete")
+                for future, branch in future_to_branch.items():
+                    branch_id = getattr(branch, 'model_id', 'unknown')
+                    if branch_id not in results:
+                        logger.error(f"[TIMEOUT] Branch {branch_id} did not complete within timeout")
+                        self.stats['branches_timed_out'] += 1
+                        results[branch_id] = {
+                            'status': 'timeout',
+                            'message': f'Branch did not complete within {self.branch_timeout}s timeout',
+                            'processing_time': self.branch_timeout
+                        }
 
        # Flatten nested branch results to top level for database access
        flattened_results = {}
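Note on the timeout handling: `as_completed(..., timeout=...)` and `Future.result(timeout=...)` raise `concurrent.futures.TimeoutError`. On Python 3.11+ that class is an alias of the builtin `TimeoutError`, so the bare `except TimeoutError:` blocks above catch it; on older interpreters the futures class is distinct and needs to be imported or referenced explicitly. A minimal self-contained sketch of the same pattern (branch names and sleep durations are made up):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout  # distinct class before 3.11

def slow_branch(seconds: float) -> float:
    time.sleep(seconds)
    return seconds

results = {}
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(slow_branch, d): f"branch_{i}" for i, d in enumerate((0.1, 0.2, 5.0))}
    try:
        for fut in as_completed(futures, timeout=1.0):  # raises once the 1 s budget is spent
            results[futures[fut]] = fut.result(timeout=1.0)
    except FuturesTimeout:
        # Mark whatever has not finished, mirroring the outer except above.
        for fut, name in futures.items():
            if name not in results:
                results[name] = "timeout"
    # Note: leaving the with-block still waits for the straggling task to finish.
print(results)  # branch_0 and branch_1 complete, branch_2 is reported as 'timeout'
```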
@@ -309,6 +338,24 @@ class BranchProcessor:
                         flattened_results[nested_branch_id] = nested_result
                         logger.info(f"[FLATTEN] Added nested branch {nested_branch_id} to top-level results")
 
+        # Log summary of branch execution results
+        succeeded = [bid for bid, res in results.items() if res.get('status') == 'success']
+        failed = [bid for bid, res in results.items() if res.get('status') == 'error']
+        timed_out = [bid for bid, res in results.items() if res.get('status') == 'timeout']
+        skipped = [bid for bid, res in results.items() if res.get('status') == 'skipped']
+
+        summary_parts = []
+        if succeeded:
+            summary_parts.append(f"{len(succeeded)} succeeded: {', '.join(succeeded)}")
+        if failed:
+            summary_parts.append(f"{len(failed)} FAILED: {', '.join(failed)}")
+        if timed_out:
+            summary_parts.append(f"{len(timed_out)} TIMED OUT: {', '.join(timed_out)}")
+        if skipped:
+            summary_parts.append(f"{len(skipped)} skipped: {', '.join(skipped)}")
+
+        logger.info(f"[PARALLEL SUMMARY] Branch execution completed: {' | '.join(summary_parts) if summary_parts else 'no branches'}")
+
         return flattened_results
 
     async def _execute_sequential_branches(self,
@@ -496,9 +543,19 @@ class BranchProcessor:
 
             # Use .predict() method for both detection and classification models
             inference_start = time.time()
+            try:
                 detection_results = model.model.predict(input_frame, conf=min_confidence, verbose=False)
                 inference_time = time.time() - inference_start
                 logger.info(f"[INFERENCE DONE] {branch_id}: Predict completed in {inference_time:.3f}s using .predict() method")
+            except Exception as inference_error:
+                inference_time = time.time() - inference_start
+                logger.error(f"[INFERENCE ERROR] {branch_id}: Model inference failed after {inference_time:.3f}s: {inference_error}", exc_info=True)
+                return {
+                    'status': 'error',
+                    'branch_id': branch_id,
+                    'message': f'Model inference failed: {str(inference_error)}',
+                    'processing_time': time.time() - start_time
+                }
 
             # Initialize branch_detections outside the conditional
             branch_detections = []
@@ -445,6 +445,78 @@ class DetectionPipeline:
         except Exception as e:
             logger.error(f"Error sending initial detection imageDetection message: {e}", exc_info=True)
 
+    async def _send_classification_results(self, subscription_id: str, session_id: str, branch_results: Dict[str, Any]):
+        """
+        Send imageDetection message with classification results (without license plate).
+        Called after processing phase completes to send partial results immediately.
+
+        Args:
+            subscription_id: Subscription identifier to send message to
+            session_id: Session identifier
+            branch_results: Dictionary of branch execution results
+        """
+        try:
+            if not self.message_sender:
+                logger.warning("No message sender configured, cannot send imageDetection")
+                return
+
+            # Import here to avoid circular imports
+            from ..communication.models import ImageDetectionMessage, DetectionData
+
+            # Extract classification fields from branch results
+            extracted_fields = self._extract_fields_from_branches(branch_results)
+            car_brand = extracted_fields.get('brand')
+            body_type = extracted_fields.get('body_type')
+
+            # Log what we're sending
+            fields_status = []
+            if car_brand is not None:
+                fields_status.append(f"brand={car_brand}")
+            else:
+                fields_status.append("brand=null")
+            if body_type is not None:
+                fields_status.append(f"bodyType={body_type}")
+            else:
+                fields_status.append("bodyType=null")
+            logger.info(f"[CLASSIFICATION] Sending partial results for session {session_id}: {', '.join(fields_status)}")
+
+            # Create detection data with classification results (license plate still pending)
+            detection_data_obj = DetectionData(
+                detection={
+                    "carBrand": car_brand,
+                    "carModel": None,  # Not implemented yet
+                    "bodyType": body_type,
+                    "licensePlateText": None,  # Will be sent later via license plate callback
+                    "licensePlateConfidence": None
+                },
+                modelId=self.model_id,
+                modelName=self.pipeline_parser.pipeline_config.model_id if self.pipeline_parser.pipeline_config else "detection_model"
+            )
+
+            # Create imageDetection message
+            detection_message = ImageDetectionMessage(
+                subscriptionIdentifier=subscription_id,
+                data=detection_data_obj
+            )
+
+            # Send message
+            await self.message_sender(detection_message)
+
+            # Log with indication of partial results
+            null_fields = []
+            if car_brand is None:
+                null_fields.append('brand')
+            if body_type is None:
+                null_fields.append('bodyType')
+
+            if null_fields:
+                logger.info(f"[PARTIAL RESULTS] Sent imageDetection with PARTIAL results (null: {', '.join(null_fields)}) - brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
+            else:
+                logger.info(f"[CLASSIFICATION COMPLETE] Sent imageDetection with brand='{car_brand}', bodyType='{body_type}' to '{subscription_id}'")
+
+        except Exception as e:
+            logger.error(f"Error sending classification results imageDetection message: {e}", exc_info=True)
+
     async def execute_detection_phase(self,
                                       frame: np.ndarray,
                                       display_id: str,
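The partial-payload logic above, reduced to a dependency-free sketch (the function name and sample values are made up): `licensePlateText` and `licensePlateConfidence` are deliberately `None` at this stage and are filled in later by the license-plate callback.

```python
from typing import Any, Dict, List, Tuple

def partial_detection_payload(extracted: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """Build the detection dict the way the method above does, and list the null fields."""
    car_brand = extracted.get('brand')
    body_type = extracted.get('body_type')
    detection = {
        "carBrand": car_brand,
        "carModel": None,                 # not implemented yet
        "bodyType": body_type,
        "licensePlateText": None,         # sent later via the license plate callback
        "licensePlateConfidence": None,
    }
    null_fields = [name for name, value in (("brand", car_brand), ("bodyType", body_type)) if value is None]
    return detection, null_fields

# Only the brand classifier produced a result in this (made-up) run:
print(partial_detection_payload({'brand': 'ExampleBrand'}))
```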
@@ -693,6 +765,13 @@ class DetectionPipeline:
                 self.session_processing_results[session_id] = result['branch_results']
                 logger.info(f"[PROCESSING RESULTS] Stored results for session {session_id} for later combination")
 
+                # Send classification results immediately (license plate will come later via callback)
+                await self._send_classification_results(
+                    subscription_id=subscription_id,
+                    session_id=session_id,
+                    branch_results=result['branch_results']
+                )
+
             logger.info(f"Processing phase completed for session {session_id}: "
                         f"{len(result['branch_results'])} branches, {len(result['actions_executed'])} actions")
@@ -70,7 +70,8 @@ class TrackingPipelineIntegration:
         self.abandonment_timeout = 3.0 # seconds to wait before declaring car abandoned
 
         # Thread pool for pipeline execution
-        self.executor = ThreadPoolExecutor(max_workers=2)
+        # Increased to 8 workers to handle 8 concurrent cameras without queuing
+        self.executor = ThreadPoolExecutor(max_workers=8)
 
         # Min bbox filtering configuration
         # TODO: Make this configurable via pipeline.json in the future
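To illustrate why `max_workers=2` forced queuing: with eight cameras submitting work, only two pipeline executions ran at a time, so the last camera waited roughly three full task durations before its job even started; with eight workers every submission starts immediately. A tiny stand-alone sketch (the 0.5 s task is a stand-in, not a real pipeline timing):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_pipeline(camera_id: int) -> int:
    time.sleep(0.5)  # stand-in for one pipeline execution on one camera
    return camera_id

for workers in (2, 8):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fake_pipeline, range(8)))
    print(f"max_workers={workers}: 8 cameras processed in {time.perf_counter() - start:.1f}s")
# Roughly 2.0s with 2 workers (four waves of 0.5s) versus 0.5s with 8 workers (one wave).
```

Whether eight threads make progress truly in parallel still depends on how much of the pipeline releases the GIL (GPU inference typically does), so the gain here is mainly removing queuing delay rather than extra CPU parallelism.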
@@ -56,10 +56,15 @@ class StableCarValidator:
         self.config = config or {}
 
         # Validation thresholds
-        self.min_stable_duration = self.config.get('min_stable_duration', 3.0) # seconds
-        self.min_stable_frames = self.config.get('min_stable_frames', 10)
+        # Optimized for 6 FPS RTSP source with 8 concurrent cameras on GPU
+        # GPU contention reduces effective FPS to ~3-5 per camera
+        # Reduced from 3.0s to 1.5s to achieve ~2.75s total validation time (was ~4.25s)
+        self.min_stable_duration = self.config.get('min_stable_duration', 1.5) # seconds
+        # Reduced from 10 to 5 to align with tracker requirement and reduce validation time
+        self.min_stable_frames = self.config.get('min_stable_frames', 5)
         self.position_variance_threshold = self.config.get('position_variance_threshold', 25.0) # pixels
-        self.min_confidence = self.config.get('min_confidence', 0.7)
+        # Reduced from 0.7 to 0.45 to be more permissive under GPU load
+        self.min_confidence = self.config.get('min_confidence', 0.45)
         self.velocity_threshold = self.config.get('velocity_threshold', 5.0) # pixels/frame
         self.entering_zone_ratio = self.config.get('entering_zone_ratio', 0.3) # 30% of frame
         self.leaving_zone_ratio = self.config.get('leaving_zone_ratio', 0.3)
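Rough arithmetic behind the new defaults, using only the effective frame rates quoted in the comments above (a sanity check, not a measurement): at 3-5 FPS per camera, 5 stable frames arrive in roughly 1.0-1.7 s, which lines up with the 1.5 s `min_stable_duration`, whereas the old 10-frame / 3.0 s pair needed 2.0-3.3 s of observation before validation could pass.

```python
# Frames-needed vs. time-needed at the effective frame rates quoted above.
for fps in (3, 5):
    print(f"{fps} FPS: old 10 frames -> {10 / fps:.1f}s, new 5 frames -> {5 / fps:.1f}s "
          f"(new min_stable_duration is 1.5s)")
```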