Successful

This commit is contained in:
Pongsatorn 2025-08-29 02:13:22 +07:00
parent 39394caa8e
commit 5875b76d74
3 changed files with 846 additions and 0 deletions

92
test_lpr.py Normal file
View file

@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
Test script for LPR (License Plate Recognition) integration.
This script simulates LPR service sending license plate results to Redis.
"""
import json
import time
import redis
import sys
import os
def main():
"""Test LPR integration by sending mock license plate results to Redis"""
# Redis configuration (should match pipeline.json)
redis_config = {
'host': '10.100.1.3',
'port': 6379,
'password': 'FBQgi0i5RevAAMO5Hh66',
'db': 0
}
try:
# Connect to Redis
print("🔌 Connecting to Redis...")
redis_client = redis.Redis(
host=redis_config['host'],
port=redis_config['port'],
password=redis_config['password'],
db=redis_config['db'],
decode_responses=True
)
# Test connection
redis_client.ping()
print(f"✅ Connected to Redis at {redis_config['host']}:{redis_config['port']}")
# Mock LPR results to send
test_cases = [
{
"session_id": "123",
"license_character": "ABC-1234"
},
{
"session_id": "124",
"license_character": "XYZ-5678"
},
{
"session_id": "125",
"license_character": "DEF-9999"
}
]
print("\n🧪 Sending mock LPR results...")
for i, test_case in enumerate(test_cases, 1):
print(f"\n📤 Test {i}: Sending LPR result for session {test_case['session_id']}")
print(f" License: {test_case['license_character']}")
# Publish to license_results channel
result = redis_client.publish("license_results", json.dumps(test_case))
print(f" 📡 Published to 'license_results' channel (subscribers: {result})")
if result == 0:
print(" ⚠️ Warning: No subscribers listening to 'license_results' channel")
print(" 💡 Make sure the detector worker is running and has loaded a model")
# Wait between test cases
if i < len(test_cases):
print(" ⏳ Waiting 3 seconds before next test...")
time.sleep(3)
print(f"\n✅ Completed {len(test_cases)} test cases")
print("🔍 Check detector worker logs for LPR processing messages:")
print(" - Look for '🚗 LPR Result received' messages")
print(" - Look for '✅ Updated detection' messages")
print(" - Look for '📤 Sent LPR update to backend' messages")
except redis.exceptions.ConnectionError as e:
print(f"❌ Failed to connect to Redis: {e}")
print("💡 Make sure Redis is running and accessible")
sys.exit(1)
except Exception as e:
print(f"❌ Error during testing: {e}")
sys.exit(1)
if __name__ == "__main__":
print("🧪 LPR Integration Test Script")
print("=" * 50)
main()