#!/usr/bin/env python3
"""
Test script for LPR (License Plate Recognition) integration.

This script simulates the LPR service sending license plate results to Redis.

Connection settings default to the values below (which should match
pipeline.json) and can be overridden with the REDIS_HOST, REDIS_PORT,
REDIS_PASSWORD and REDIS_DB environment variables.
"""

import json
import os
import sys
import time

import redis

# Pub/sub channel the mock results are published on.
LICENSE_RESULTS_CHANNEL = "license_results"

# Pause between consecutive publishes, in seconds.
DELAY_BETWEEN_TESTS_SECONDS = 3

# Give up on an unreachable Redis host after this many seconds instead of
# waiting for the operating system's TCP timeout.
CONNECT_TIMEOUT_SECONDS = 5

# Mock LPR results to send.
TEST_CASES = [
    {"session_id": "123", "license_character": "ABC-1234"},
    {"session_id": "124", "license_character": "XYZ-5678"},
    {"session_id": "125", "license_character": "DEF-9999"},
]


def _load_redis_config():
    """Return the Redis connection settings as a dict.

    Each value is taken from its environment variable when set, otherwise
    from the built-in default.

    Raises:
        ValueError: if REDIS_PORT or REDIS_DB is set to a non-integer value.
    """
    return {
        'host': os.environ.get('REDIS_HOST', '10.100.1.3'),
        'port': int(os.environ.get('REDIS_PORT', '6379')),
        # NOTE(security): a real credential is embedded here as the fallback
        # default to keep existing behavior. Prefer REDIS_PASSWORD, and
        # rotate/remove this value from source control.
        'password': os.environ.get('REDIS_PASSWORD', 'FBQgi0i5RevAAMO5Hh66'),
        'db': int(os.environ.get('REDIS_DB', '0')),
    }


def main():
    """Test LPR integration by sending mock license plate results to Redis.

    Connects to Redis, publishes every entry of TEST_CASES as a JSON string
    on the 'license_results' channel, and prints the subscriber count that
    Redis reports for each publish. Exits the process with status 1 when the
    connection fails or any other error occurs.
    """
    try:
        redis_config = _load_redis_config()

        # Connect to Redis
        print("🔌 Connecting to Redis...")
        redis_client = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config['password'],
            db=redis_config['db'],
            decode_responses=True,
            socket_connect_timeout=CONNECT_TIMEOUT_SECONDS,
        )

        # Test connection (the client connects lazily, so force a round trip)
        redis_client.ping()
        print(f"✅ Connected to Redis at {redis_config['host']}:{redis_config['port']}")

        print("\n🧪 Sending mock LPR results...")

        for i, test_case in enumerate(TEST_CASES, 1):
            print(f"\n📤 Test {i}: Sending LPR result for session {test_case['session_id']}")
            print(f" License: {test_case['license_character']}")

            # Publish to license_results channel; publish() returns the
            # number of subscribers that received the message.
            result = redis_client.publish(LICENSE_RESULTS_CHANNEL, json.dumps(test_case))
            print(f" 📡 Published to '{LICENSE_RESULTS_CHANNEL}' channel (subscribers: {result})")

            if result == 0:
                print(f" ⚠️ Warning: No subscribers listening to '{LICENSE_RESULTS_CHANNEL}' channel")
                print(" 💡 Make sure the detector worker is running and has loaded a model")

            # Wait between test cases (but not after the last one)
            if i < len(TEST_CASES):
                print(f" ⏳ Waiting {DELAY_BETWEEN_TESTS_SECONDS} seconds before next test...")
                time.sleep(DELAY_BETWEEN_TESTS_SECONDS)

        print(f"\n✅ Completed {len(TEST_CASES)} test cases")
        print("🔍 Check detector worker logs for LPR processing messages:")
        print(" - Look for '🚗 LPR Result received' messages")
        print(" - Look for '✅ Updated detection' messages")
        print(" - Look for '📤 Sent LPR update to backend' messages")

    except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
        # A connect timeout is reported as TimeoutError rather than
        # ConnectionError, so both are treated as "cannot reach Redis".
        print(f"❌ Failed to connect to Redis: {e}")
        print("💡 Make sure Redis is running and accessible")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary of a CLI script: report and exit non-zero.
        print(f"❌ Error during testing: {e}")
        sys.exit(1)


if __name__ == "__main__":
    print("🧪 LPR Integration Test Script")
    print("=" * 50)
    main()