162 lines
No EOL
5.2 KiB
Python
162 lines
No EOL
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script to view frontal images saved in Redis
|
|
"""
|
|
import os
import sys
from datetime import datetime

import cv2
import numpy as np
import redis
|
|
|
|
# Redis connection defaults (from pipeline.json).
# NOTE(review): the fallback password below is a plaintext credential committed
# to source. Prefer supplying REDIS_PASSWORD through the environment and
# rotating this value; it is kept only so existing invocations keep working.
_REDIS_DEFAULTS = {
    "host": "10.100.1.3",
    "port": 6379,
    "password": "FBQgi0i5RevAAMO5Hh66",
    "db": 0,
}


def load_redis_config(env=None):
    """Build the Redis connection settings.

    Each setting is read from ``env`` (``os.environ`` when ``env`` is None)
    using the variables ``REDIS_HOST``, ``REDIS_PORT``, ``REDIS_PASSWORD`` and
    ``REDIS_DB``; any variable that is absent falls back to the built-in
    default.

    Args:
        env: Optional mapping of variable names to string values.

    Returns:
        A dict with the keys ``host``, ``port``, ``password`` and ``db``;
        ``port`` and ``db`` are ints.

    Raises:
        ValueError: If ``REDIS_PORT`` or ``REDIS_DB`` is set to a value that
            is not an integer.
    """
    source = os.environ if env is None else env
    return {
        "host": source.get("REDIS_HOST", _REDIS_DEFAULTS["host"]),
        "port": int(source.get("REDIS_PORT", _REDIS_DEFAULTS["port"])),
        "password": source.get("REDIS_PASSWORD", _REDIS_DEFAULTS["password"]),
        "db": int(source.get("REDIS_DB", _REDIS_DEFAULTS["db"])),
    }


REDIS_CONFIG = load_redis_config()
|
|
|
|
def connect_redis():
    """Connect to the Redis server described by ``REDIS_CONFIG``.

    The connection is verified with a PING before it is handed back.

    Returns:
        The connected ``redis.Redis`` client, or ``None`` when the server
        cannot be reached or rejects the connection (the reason is printed).
    """
    try:
        client = redis.Redis(
            host=REDIS_CONFIG["host"],
            port=REDIS_CONFIG["port"],
            password=REDIS_CONFIG["password"],
            db=REDIS_CONFIG["db"],
            decode_responses=False,  # Keep bytes for images
        )
        client.ping()
    except redis.exceptions.RedisError as e:
        # RedisError is the base class of ConnectionError, so this also covers
        # error replies and timeouts raised while authenticating, selecting
        # the database or pinging; callers only check for None.
        print(f"❌ Failed to connect to Redis: {e}")
        return None

    print(f"✅ Connected to Redis at {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}")
    return client
|
|
|
|
def list_image_keys(client):
    """Print and return every key matching the ``inference:*`` pattern.

    Args:
        client: Redis client exposing ``scan_iter``.

    Returns:
        The matching keys, exactly as the client returned them, in first-seen
        order with duplicates removed. An empty list is returned when the
        lookup fails (the error is printed).
    """
    try:
        # SCAN walks the keyspace incrementally, whereas KEYS blocks the
        # server until the whole keyspace has been examined. SCAN may report
        # a key more than once, so de-duplicate while keeping order.
        keys = list(dict.fromkeys(client.scan_iter(match="inference:*")))
    except Exception as e:
        print(f"❌ Error listing keys: {e}")
        return []

    print(f"\n📋 Found {len(keys)} image keys:")
    for number, key in enumerate(keys, start=1):
        # errors="replace" keeps one non-UTF-8 key name from discarding the
        # entire listing.
        key_str = key.decode(errors="replace") if isinstance(key, bytes) else key
        print(f"{number}. {key_str}")
    return keys
|
|
|
|
def view_image(client, key):
    """Fetch one image from Redis, display it, and optionally save it.

    The value stored under ``key`` is decoded with OpenCV, shown in a window
    until a key is pressed, and then the user is asked whether to write it to
    ``redis_image_<timestamp>.jpg`` in the current directory.

    Args:
        client: Redis client exposing ``get``.
        key: Key to read, as ``bytes`` or ``str``.

    Returns:
        None. Every failure is reported on stdout rather than raised.
    """
    # Human-readable key for messages and the window title.
    key_str = key.decode(errors="replace") if isinstance(key, bytes) else key
    try:
        image_data = client.get(key)
        if image_data is None:
            print(f"❌ No data found for key: {key_str}")
            return

        print(f"📸 Image size: {len(image_data)} bytes")
        if len(image_data) == 0:
            # An empty buffer cannot hold an image; report it the same way as
            # any other undecodable payload instead of handing it to OpenCV.
            print("❌ Failed to decode image data")
            return

        # Wrap the raw bytes as a uint8 array so OpenCV can decode them.
        nparr = np.frombuffer(image_data, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            print("❌ Failed to decode image data")
            return

        print(f"🖼️ Image dimensions: {img.shape[1]}x{img.shape[0]} pixels")

        try:
            cv2.imshow(f'Redis Image: {key_str}', img)
            print("👁️ Image displayed. Press any key to close...")
            cv2.waitKey(0)
        finally:
            # Close the window even if displaying or waiting was interrupted.
            cv2.destroyAllWindows()

        save = input("💾 Save image to file? (y/n): ").lower().strip()
        if save == 'y':
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"redis_image_{timestamp}.jpg"
            # imwrite signals failure through its return value, not by raising.
            if cv2.imwrite(filename, img):
                print(f"💾 Image saved as: {filename}")
            else:
                print(f"❌ Failed to save image to: {filename}")

    except Exception as e:
        print(f"❌ Error viewing image: {e}")
|
|
|
|
def _extract_image_key(payload):
    """Return the ``image_key`` field of a JSON object payload, or None."""
    import json

    try:
        detection = json.loads(payload)
    except json.JSONDecodeError:
        # Payloads that are not JSON simply carry no image key.
        return None
    if not isinstance(detection, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields.
        return None
    return detection.get('image_key')


def monitor_new_images(client):
    """Watch the ``car_detections`` channel and offer to show new images.

    Every published message is printed. When a message is a JSON object with
    a non-empty ``image_key`` field, the user is asked whether to display that
    image via ``view_image``. The loop runs until Ctrl+C is pressed, an error
    occurs, or the subscription stops yielding messages.

    Args:
        client: Redis client exposing ``pubsub``.

    Returns:
        None. Errors are reported on stdout rather than raised.
    """
    print("👀 Monitoring for new images... (Press Ctrl+C to stop)")
    pubsub = None
    try:
        # Subscribe to Redis pub/sub for car detections
        pubsub = client.pubsub()
        pubsub.subscribe('car_detections')

        for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations.
            if message['type'] != 'message':
                continue

            raw = message['data']
            data = raw.decode(errors="replace") if isinstance(raw, bytes) else raw
            print(f"🚨 New detection: {data}")

            image_key = _extract_image_key(data)
            if not image_key:
                continue

            print(f"🖼️ New image available: {image_key}")
            view_choice = input("View this image now? (y/n): ").lower().strip()
            if view_choice == 'y':
                view_image(client, image_key)

    except KeyboardInterrupt:
        print("\n👋 Stopping monitor...")
    except Exception as e:
        print(f"❌ Monitor error: {e}")
    finally:
        # Release the subscription's connection however the loop ended.
        if pubsub is not None:
            pubsub.close()
|
|
|
|
def main():
    """Run the interactive menu of the Redis image viewer.

    Connects to Redis, then repeatedly offers four choices: list the image
    keys, view one image chosen by number, monitor for new images, or exit.
    Returns immediately when the connection cannot be established. Closing
    stdin or pressing Ctrl+C at a prompt exits the menu.
    """
    print("🔍 Redis Image Viewer")
    print("=" * 50)

    # Connect to Redis
    client = connect_redis()
    if client is None:
        return

    try:
        while True:
            print("\n📋 Options:")
            print("1. List all image keys")
            print("2. View specific image")
            print("3. Monitor for new images")
            print("4. Exit")

            choice = input("\nEnter choice (1-4): ").strip()

            if choice == '1':
                list_image_keys(client)
            elif choice == '2':
                keys = list_image_keys(client)
                if not keys:
                    continue
                try:
                    idx = int(input(f"\nEnter image number (1-{len(keys)}): ")) - 1
                except ValueError:
                    print("❌ Please enter a valid number")
                    continue
                # The menu is 1-based; idx is the 0-based position in keys.
                if 0 <= idx < len(keys):
                    view_image(client, keys[idx])
                else:
                    print("❌ Invalid selection")
            elif choice == '3':
                monitor_new_images(client)
            elif choice == '4':
                print("👋 Goodbye!")
                break
            else:
                print("❌ Invalid choice")
    except (EOFError, KeyboardInterrupt):
        # stdin was closed or Ctrl+C was pressed at a prompt: leave cleanly
        # instead of ending with a traceback.
        print("\n👋 Goodbye!")
|
|
|
|
# Run the interactive viewer only when executed as a script, not on import.
if __name__ == "__main__":
    main()