125 lines
No EOL
5.2 KiB
Python
125 lines
No EOL
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to verify the worker implementation follows the protocol
|
|
"""
|
|
import json
|
|
import asyncio
|
|
import websockets
|
|
import time
|
|
|
|
async def test_protocol():
|
|
"""Test the worker protocol implementation"""
|
|
uri = "ws://localhost:8000"
|
|
|
|
try:
|
|
async with websockets.connect(uri) as websocket:
|
|
print("✓ Connected to worker")
|
|
|
|
# Test 1: Check if we receive heartbeat (stateReport)
|
|
print("\n1. Testing heartbeat...")
|
|
try:
|
|
message = await asyncio.wait_for(websocket.recv(), timeout=5)
|
|
data = json.loads(message)
|
|
if data.get("type") == "stateReport":
|
|
print("✓ Received stateReport heartbeat")
|
|
print(f" - CPU Usage: {data.get('cpuUsage', 'N/A')}%")
|
|
print(f" - Memory Usage: {data.get('memoryUsage', 'N/A')}%")
|
|
print(f" - Camera Connections: {len(data.get('cameraConnections', []))}")
|
|
else:
|
|
print(f"✗ Expected stateReport, got {data.get('type')}")
|
|
except asyncio.TimeoutError:
|
|
print("✗ No heartbeat received within 5 seconds")
|
|
|
|
# Test 2: Request state
|
|
print("\n2. Testing requestState...")
|
|
await websocket.send(json.dumps({"type": "requestState"}))
|
|
try:
|
|
message = await asyncio.wait_for(websocket.recv(), timeout=5)
|
|
data = json.loads(message)
|
|
if data.get("type") == "stateReport":
|
|
print("✓ Received stateReport response")
|
|
else:
|
|
print(f"✗ Expected stateReport, got {data.get('type')}")
|
|
except asyncio.TimeoutError:
|
|
print("✗ No response to requestState within 5 seconds")
|
|
|
|
# Test 3: Set session ID
|
|
print("\n3. Testing setSessionId...")
|
|
session_message = {
|
|
"type": "setSessionId",
|
|
"payload": {
|
|
"displayIdentifier": "display-001",
|
|
"sessionId": 12345
|
|
}
|
|
}
|
|
await websocket.send(json.dumps(session_message))
|
|
print("✓ Sent setSessionId message")
|
|
|
|
# Test 4: Test patchSession
|
|
print("\n4. Testing patchSession...")
|
|
patch_message = {
|
|
"type": "patchSession",
|
|
"sessionId": 12345,
|
|
"data": {
|
|
"currentCar": {
|
|
"carModel": "Civic",
|
|
"carBrand": "Honda"
|
|
}
|
|
}
|
|
}
|
|
await websocket.send(json.dumps(patch_message))
|
|
|
|
# Wait for patchSessionResult
|
|
try:
|
|
message = await asyncio.wait_for(websocket.recv(), timeout=5)
|
|
data = json.loads(message)
|
|
if data.get("type") == "patchSessionResult":
|
|
print("✓ Received patchSessionResult")
|
|
print(f" - Success: {data.get('payload', {}).get('success')}")
|
|
print(f" - Message: {data.get('payload', {}).get('message')}")
|
|
else:
|
|
print(f"✗ Expected patchSessionResult, got {data.get('type')}")
|
|
except asyncio.TimeoutError:
|
|
print("✗ No patchSessionResult received within 5 seconds")
|
|
|
|
# Test 5: Test subscribe message format (without actual camera)
|
|
print("\n5. Testing subscribe message format...")
|
|
subscribe_message = {
|
|
"type": "subscribe",
|
|
"payload": {
|
|
"subscriptionIdentifier": "display-001;cam-001",
|
|
"snapshotUrl": "http://example.com/snapshot.jpg",
|
|
"snapshotInterval": 5000,
|
|
"modelUrl": "http://example.com/model.mpta",
|
|
"modelName": "Test Model",
|
|
"modelId": 101,
|
|
"cropX1": 100,
|
|
"cropY1": 200,
|
|
"cropX2": 300,
|
|
"cropY2": 400
|
|
}
|
|
}
|
|
await websocket.send(json.dumps(subscribe_message))
|
|
print("✓ Sent subscribe message (will fail without actual camera/model)")
|
|
|
|
# Listen for a few more messages to catch any errors
|
|
print("\n6. Listening for additional messages...")
|
|
for i in range(3):
|
|
try:
|
|
message = await asyncio.wait_for(websocket.recv(), timeout=2)
|
|
data = json.loads(message)
|
|
msg_type = data.get("type")
|
|
print(f" - Received {msg_type}")
|
|
if msg_type == "error":
|
|
print(f" Error: {data.get('error')}")
|
|
except asyncio.TimeoutError:
|
|
break
|
|
|
|
print("\n✓ Protocol test completed successfully!")
|
|
|
|
except Exception as e:
|
|
print(f"✗ Connection failed: {e}")
|
|
print("Make sure the worker is running on localhost:8000")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(test_protocol()) |