feat/tracking and save in redis finished

This commit is contained in:
Pongsatorn 2025-08-21 20:59:29 +07:00
parent 3a4a27ca68
commit 5873945115
8 changed files with 393 additions and 245 deletions

162
view_redis_images.py Normal file
View file

@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
Script to view frontal images saved in Redis
"""
import redis
import cv2
import numpy as np
import sys
from datetime import datetime
# Redis connection config (from pipeline.json)
REDIS_CONFIG = {
"host": "10.100.1.3",
"port": 6379,
"password": "FBQgi0i5RevAAMO5Hh66",
"db": 0
}
def connect_redis():
"""Connect to Redis server."""
try:
client = redis.Redis(
host=REDIS_CONFIG["host"],
port=REDIS_CONFIG["port"],
password=REDIS_CONFIG["password"],
db=REDIS_CONFIG["db"],
decode_responses=False # Keep bytes for images
)
client.ping()
print(f"✅ Connected to Redis at {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}")
return client
except redis.exceptions.ConnectionError as e:
print(f"❌ Failed to connect to Redis: {e}")
return None
def list_image_keys(client):
"""List all image keys in Redis."""
try:
# Look for keys matching the inference pattern
keys = client.keys("inference:*")
print(f"\n📋 Found {len(keys)} image keys:")
for i, key in enumerate(keys):
key_str = key.decode() if isinstance(key, bytes) else key
print(f"{i+1}. {key_str}")
return keys
except Exception as e:
print(f"❌ Error listing keys: {e}")
return []
def view_image(client, key):
"""View a specific image from Redis."""
try:
# Get image data from Redis
image_data = client.get(key)
if image_data is None:
print(f"❌ No data found for key: {key}")
return
print(f"📸 Image size: {len(image_data)} bytes")
# Convert bytes to numpy array
nparr = np.frombuffer(image_data, np.uint8)
# Decode image
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
print("❌ Failed to decode image data")
return
print(f"🖼️ Image dimensions: {img.shape[1]}x{img.shape[0]} pixels")
# Display image
key_str = key.decode() if isinstance(key, bytes) else key
cv2.imshow(f'Redis Image: {key_str}', img)
print("👁️ Image displayed. Press any key to close...")
cv2.waitKey(0)
cv2.destroyAllWindows()
# Ask if user wants to save the image
save = input("💾 Save image to file? (y/n): ").lower().strip()
if save == 'y':
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"redis_image_{timestamp}.jpg"
cv2.imwrite(filename, img)
print(f"💾 Image saved as: {filename}")
except Exception as e:
print(f"❌ Error viewing image: {e}")
def monitor_new_images(client):
"""Monitor for new images being added to Redis."""
print("👀 Monitoring for new images... (Press Ctrl+C to stop)")
try:
# Subscribe to Redis pub/sub for car detections
pubsub = client.pubsub()
pubsub.subscribe('car_detections')
for message in pubsub.listen():
if message['type'] == 'message':
data = message['data'].decode()
print(f"🚨 New detection: {data}")
# Try to extract image key from message
import json
try:
detection_data = json.loads(data)
image_key = detection_data.get('image_key')
if image_key:
print(f"🖼️ New image available: {image_key}")
view_choice = input("View this image now? (y/n): ").lower().strip()
if view_choice == 'y':
view_image(client, image_key)
except json.JSONDecodeError:
pass
except KeyboardInterrupt:
print("\n👋 Stopping monitor...")
except Exception as e:
print(f"❌ Monitor error: {e}")
def main():
"""Main function."""
print("🔍 Redis Image Viewer")
print("=" * 50)
# Connect to Redis
client = connect_redis()
if not client:
return
while True:
print("\n📋 Options:")
print("1. List all image keys")
print("2. View specific image")
print("3. Monitor for new images")
print("4. Exit")
choice = input("\nEnter choice (1-4): ").strip()
if choice == '1':
keys = list_image_keys(client)
elif choice == '2':
keys = list_image_keys(client)
if keys:
try:
idx = int(input(f"\nEnter image number (1-{len(keys)}): ")) - 1
if 0 <= idx < len(keys):
view_image(client, keys[idx])
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
elif choice == '3':
monitor_new_images(client)
elif choice == '4':
print("👋 Goodbye!")
break
else:
print("❌ Invalid choice")
if __name__ == "__main__":
main()