#!/usr/bin/env python3
"""
Script to view frontal images saved in Redis
"""

import json
import os
import sys
from datetime import datetime
from typing import List, Optional, Union

import cv2
import numpy as np
import redis

# Redis connection config (defaults from pipeline.json).
# Every value can be overridden through the environment (REDIS_HOST,
# REDIS_PORT, REDIS_PASSWORD, REDIS_DB) so credentials do not have to live in
# the source tree.
# NOTE(review): the fallback password below is still committed in plain text;
# it is kept only for backward compatibility and should be rotated and removed.
REDIS_CONFIG = {
    "host": os.environ.get("REDIS_HOST", "10.100.1.3"),
    "port": int(os.environ.get("REDIS_PORT", "6379")),
    "password": os.environ.get("REDIS_PASSWORD", "FBQgi0i5RevAAMO5Hh66"),
    "db": int(os.environ.get("REDIS_DB", "0")),
}

# Glob pattern of the keys that hold encoded images.
IMAGE_KEY_PATTERN = "inference:*"

# Pub/sub channel on which detection messages are published.
DETECTION_CHANNEL = "car_detections"

RedisKey = Union[bytes, str]


def _key_to_str(key: RedisKey) -> str:
    """Return *key* as text for display; undecodable bytes are replaced."""
    if isinstance(key, bytes):
        return key.decode(errors="replace")
    return str(key)


def connect_redis() -> Optional[redis.Redis]:
    """Connect to the Redis server described by ``REDIS_CONFIG``.

    Returns:
        A client whose connection was verified with ``PING``, or ``None`` when
        the server cannot be reached (the reason is printed).
    """
    try:
        client = redis.Redis(
            host=REDIS_CONFIG["host"],
            port=REDIS_CONFIG["port"],
            password=REDIS_CONFIG["password"],
            db=REDIS_CONFIG["db"],
            decode_responses=False,  # Keep bytes for images
        )
        client.ping()
    except redis.exceptions.RedisError as e:
        # RedisError also covers timeouts, which ConnectionError alone does not.
        print(f"❌ Failed to connect to Redis: {e}")
        return None

    print(f"✅ Connected to Redis at {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}")
    return client


def list_image_keys(client: redis.Redis) -> List[RedisKey]:
    """Print and return all keys matching ``IMAGE_KEY_PATTERN``.

    Args:
        client: Connected Redis client.

    Returns:
        The matching keys in sorted order, or an empty list on a Redis error.
    """
    try:
        # SCAN iterates incrementally instead of blocking the server the way
        # KEYS does. SCAN may yield a key more than once, hence the set();
        # sorting keeps the printed numbering stable between calls.
        keys = sorted(set(client.scan_iter(match=IMAGE_KEY_PATTERN)))
    except redis.exceptions.RedisError as e:
        print(f"❌ Error listing keys: {e}")
        return []

    print(f"\n📋 Found {len(keys)} image keys:")
    for number, key in enumerate(keys, start=1):
        print(f"{number}. {_key_to_str(key)}")
    return keys


def view_image(client: redis.Redis, key: RedisKey) -> None:
    """Fetch the image stored under *key*, show it, and offer to save it.

    Args:
        client: Connected Redis client.
        key: Redis key (bytes or str) whose value is an encoded image.

    All failures are reported on stdout; nothing is raised to the caller.
    """
    key_str = _key_to_str(key)
    try:
        # Get image data from Redis
        image_data = client.get(key)
        if not image_data:
            print(f"❌ No data found for key: {key_str}")
            return

        print(f"📸 Image size: {len(image_data)} bytes")

        # Convert the raw bytes to a numpy array and decode it
        img = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            print("❌ Failed to decode image data")
            return

        print(f"🖼️ Image dimensions: {img.shape[1]}x{img.shape[0]} pixels")

        # Display image; always tear the window down, even on Ctrl+C
        try:
            cv2.imshow(f"Redis Image: {key_str}", img)
            print("👁️ Image displayed. Press any key to close...")
            cv2.waitKey(0)
        finally:
            cv2.destroyAllWindows()

        # Ask if user wants to save the image
        save = input("💾 Save image to file? (y/n): ").lower().strip()
        if save == "y":
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"redis_image_{timestamp}.jpg"
            # imwrite reports failure through its return value, not an exception
            if cv2.imwrite(filename, img):
                print(f"💾 Image saved as: {filename}")
            else:
                print(f"❌ Failed to save image as: {filename}")
    except Exception as e:
        # Interactive boundary: report the problem and return to the menu.
        print(f"❌ Error viewing image: {e}")


def _extract_image_key(payload: str):
    """Return the ``image_key`` field of a JSON object payload, else ``None``."""
    try:
        detection_data = json.loads(payload)
    except json.JSONDecodeError:
        # Non-JSON messages are expected on the channel; they are only printed.
        return None
    if not isinstance(detection_data, dict):
        return None
    return detection_data.get("image_key")


def monitor_new_images(client: redis.Redis) -> None:
    """Listen on ``DETECTION_CHANNEL`` and offer to view each announced image.

    Runs until interrupted with Ctrl+C or until a Redis error occurs. The
    pub/sub connection is always released on exit.

    Args:
        client: Connected Redis client.
    """
    print("👀 Monitoring for new images... (Press Ctrl+C to stop)")

    pubsub = None
    try:
        # Subscribe to Redis pub/sub for car detections
        pubsub = client.pubsub()
        pubsub.subscribe(DETECTION_CHANNEL)

        for message in pubsub.listen():
            if message["type"] != "message":
                continue  # skip subscribe confirmations

            raw = message["data"]
            data = raw.decode(errors="replace") if isinstance(raw, bytes) else str(raw)
            print(f"🚨 New detection: {data}")

            # Try to extract image key from message
            image_key = _extract_image_key(data)
            if not image_key:
                continue

            print(f"🖼️ New image available: {image_key}")
            view_choice = input("View this image now? (y/n): ").lower().strip()
            if view_choice == "y":
                view_image(client, image_key)
    except KeyboardInterrupt:
        print("\n👋 Stopping monitor...")
    except Exception as e:
        print(f"❌ Monitor error: {e}")
    finally:
        if pubsub is not None:
            pubsub.close()


def main() -> None:
    """Run the interactive menu loop."""
    print("🔍 Redis Image Viewer")
    print("=" * 50)

    # Connect to Redis
    client = connect_redis()
    if client is None:
        return

    while True:
        print("\n📋 Options:")
        print("1. List all image keys")
        print("2. View specific image")
        print("3. Monitor for new images")
        print("4. Exit")

        try:
            choice = input("\nEnter choice (1-4): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C at the menu is treated as "Exit".
            print("\n👋 Goodbye!")
            break

        if choice == "1":
            list_image_keys(client)
        elif choice == "2":
            keys = list_image_keys(client)
            if keys:
                try:
                    idx = int(input(f"\nEnter image number (1-{len(keys)}): ")) - 1
                except ValueError:
                    print("❌ Please enter a valid number")
                else:
                    if 0 <= idx < len(keys):
                        view_image(client, keys[idx])
                    else:
                        print("❌ Invalid selection")
        elif choice == "3":
            monitor_new_images(client)
        elif choice == "4":
            print("👋 Goodbye!")
            break
        else:
            print("❌ Invalid choice")


if __name__ == "__main__":
    main()