"""AppDaemon app that mirrors one Home Assistant light onto MQTT topics and back."""

import json

import appdaemon.adbase as base
import appdaemon.plugins.hass.hassapi as hass
import appdaemon.plugins.mqtt.mqttapi as mqtt

# AppDaemon namespace the MQTT plugin is expected to be configured under.
_MQTT_NAMESPACE = "mqtt"

# Upper bound of the integer scale used for colour temperature on MQTT.
_CT_SCALE_MAX = 4095

# Position of each colour component inside an ``rgb_color`` triple.
_RGB_INDEX = {"r": 0, "g": 1, "b": 2}


class hass_mqtt_advanced_light_sync(hass.Hass, mqtt.Mqtt):
    """Two-way bridge between a Home Assistant light and a family of MQTT topics.

    App arguments (read from ``self.args``):
        entity_id: light entity to mirror (required).
        mqtt_topic_base: topic prefix, e.g. ``"/commonroom/dl1"`` (required).
        has_brightness: expose brightness topics (default ``True``).
        has_rgb: expose ``color/r``, ``color/g``, ``color/b`` topics
            (default ``False``).
        has_ct: expose the ``temp`` topic (default ``False``).
        ct_min / ct_max: Kelvin range mapped onto 0-4095 (defaults 2700 / 6500).

    State is published on ``<base>``, ``<base>/brightness``, ``<base>/color/*``,
    ``<base>/temp`` and ``<base>/mode``. Commands are accepted on the matching
    ``.../set`` topics, and ``<base>/request`` triggers a full re-publish.
    """

    def initialize(self):
        """Read configuration, subscribe to command topics and publish once."""
        self.entity_id = self.args["entity_id"]
        self.entity = self.get_entity(self.entity_id)
        self.mqtt_topic_base = self.args["mqtt_topic_base"]  # e.g., "/commonroom/dl1"

        # MQTT topics
        topic_base = self.mqtt_topic_base
        self.mqtt_topic_state = topic_base
        self.mqtt_topic_state_set = f"{topic_base}/set"
        self.mqtt_topic_state_request = f"{topic_base}/request"
        self.mqtt_topic_brightness = f"{topic_base}/brightness"
        self.mqtt_topic_brightness_set = f"{topic_base}/brightness/set"
        self.mqtt_topic_color_r = f"{topic_base}/color/r"
        self.mqtt_topic_color_r_set = f"{topic_base}/color/r/set"
        self.mqtt_topic_color_g = f"{topic_base}/color/g"
        self.mqtt_topic_color_g_set = f"{topic_base}/color/g/set"
        self.mqtt_topic_color_b = f"{topic_base}/color/b"
        self.mqtt_topic_color_b_set = f"{topic_base}/color/b/set"
        self.mqtt_topic_temp = f"{topic_base}/temp"
        self.mqtt_topic_temp_set = f"{topic_base}/temp/set"
        self.mqtt_topic_mode = f"{topic_base}/mode"
        self.mqtt_topic_mode_set = f"{topic_base}/mode/set"

        # Light capabilities from config
        self.has_brightness = self.args.get("has_brightness", True)
        self.has_rgb = self.args.get("has_rgb", False)
        self.has_ct = self.args.get("has_ct", False)
        self.ct_min = self.args.get("ct_min", 2700)  # Kelvin
        self.ct_max = self.args.get("ct_max", 6500)  # Kelvin

        # (enabled, command topic, handler) for every MQTT command we accept.
        command_routes = [
            (True, self.mqtt_topic_state_set, self.handle_state_set),
            (True, self.mqtt_topic_state_request, self.handle_state_request),
            (self.has_brightness, self.mqtt_topic_brightness_set,
             self.handle_brightness_set),
            (self.has_rgb, self.mqtt_topic_color_r_set, self.handle_color_r_set),
            (self.has_rgb, self.mqtt_topic_color_g_set, self.handle_color_g_set),
            (self.has_rgb, self.mqtt_topic_color_b_set, self.handle_color_b_set),
            (self.has_ct, self.mqtt_topic_temp_set, self.handle_temp_set),
            (self.has_rgb and self.has_ct, self.mqtt_topic_mode_set,
             self.handle_mode_set),
        ]
        active_routes = [
            (topic, handler) for enabled, topic, handler in command_routes if enabled
        ]

        # Subscribe first, then attach the listeners.
        for topic, _handler in active_routes:
            self.mqtt_subscribe(topic, namespace=_MQTT_NAMESPACE)
        for topic, handler in active_routes:
            self.listen_event(
                handler, "MQTT_MESSAGE", topic=topic, namespace=_MQTT_NAMESPACE
            )

        # attribute="all" delivers the complete state object, so changes to
        # brightness / colour are seen too.  Without it the callback only fires
        # when the main on/off state changes.
        self.entity.listen_state(self.handle_hass_state_change, attribute="all")

        # Initial state publish
        self.publish_all_states_to_mqtt()

    # ------------------------------------------------------------------
    # Publishing helpers
    # ------------------------------------------------------------------
    def _publish_topic(self, topic, value):
        """Publish ``str(value)`` on ``topic`` in the MQTT namespace."""
        self.mqtt_publish(topic, str(value), namespace=_MQTT_NAMESPACE)

    @staticmethod
    def _split_snapshot(snapshot):
        """Return ``(state, attributes)`` from a full state object.

        Anything that is not a dict (for example ``None`` for a missing old
        state) yields ``(None, {})``.
        """
        if not isinstance(snapshot, dict):
            return None, {}
        return snapshot.get("state"), snapshot.get("attributes") or {}

    @staticmethod
    def _kelvin_from_attributes(attrs):
        """Return the colour temperature in Kelvin from light attributes.

        Prefers ``color_temp_kelvin``.  Falls back to converting ``color_temp``,
        which Home Assistant expresses in mireds (Kelvin = 1,000,000 / mireds).
        Returns ``None`` when neither is available.
        """
        kelvin = attrs.get("color_temp_kelvin")
        if kelvin is not None:
            return kelvin
        mireds = attrs.get("color_temp")
        if mireds:
            return round(1_000_000 / mireds)
        return None

    @staticmethod
    def _mode_for_color_mode(color_mode):
        """Map a Home Assistant ``color_mode`` onto the MQTT mode string.

        ``"color_temp"`` and ``"white"`` map to ``"ct"``; everything else,
        including an unknown or missing mode, maps to ``"rgb"``.
        """
        return "ct" if color_mode in ("color_temp", "white") else "rgb"

    def _publish_light_attributes(self, attrs, previous=None):
        """Publish brightness, colour, temperature and mode from ``attrs``.

        Args:
            attrs: attribute dict of the light's current state.
            previous: attribute dict of the prior state.  When given, only
                values that differ from it are published.  When ``None``,
                every available value is published.
        """
        publish_everything = previous is None
        prior = previous or {}

        # Brightness
        brightness = attrs.get("brightness")
        if (
            self.has_brightness
            and brightness is not None
            and (publish_everything or brightness != prior.get("brightness"))
        ):
            self._publish_topic(self.mqtt_topic_brightness, brightness)

        # RGB colour
        rgb = attrs.get("rgb_color")
        if self.has_rgb and rgb is not None and len(rgb) >= 3:
            prior_rgb = prior.get("rgb_color") or []
            if publish_everything or list(rgb[:3]) != list(prior_rgb[:3]):
                self._publish_topic(self.mqtt_topic_color_r, rgb[0])
                self._publish_topic(self.mqtt_topic_color_g, rgb[1])
                self._publish_topic(self.mqtt_topic_color_b, rgb[2])

        # Colour temperature, converted from Kelvin to the 0-4095 scale
        if self.has_ct:
            kelvin = self._kelvin_from_attributes(attrs)
            if kelvin is not None and (
                publish_everything or kelvin != self._kelvin_from_attributes(prior)
            ):
                self._publish_topic(self.mqtt_topic_temp, self.kelvin_to_4095(kelvin))

        # Mode follows color_mode rather than "which attribute changed":
        # Home Assistant appears to keep rgb_color populated while the light is
        # in colour-temperature mode, so attribute changes alone are ambiguous.
        if self.has_rgb and self.has_ct:
            color_mode = attrs.get("color_mode")
            if publish_everything or (
                color_mode is not None and color_mode != prior.get("color_mode")
            ):
                self._publish_topic(
                    self.mqtt_topic_mode, self._mode_for_color_mode(color_mode)
                )

    def publish_all_states_to_mqtt(self):
        """Publish all current Home Assistant states to MQTT.

        The on/off state is always published.  Brightness, colour, temperature
        and mode are only published while the light is ``"on"``.  Errors are
        logged rather than raised so a failed publish cannot kill a callback.
        """
        try:
            # One call returns state and attributes from the same snapshot.
            snapshot = self.get_state(self.entity_id, attribute="all")
            if not isinstance(snapshot, dict):
                self.log(
                    f"No state available for {self.entity_id}", level="WARNING"
                )
                return

            state, attrs = self._split_snapshot(snapshot)
            self._publish_topic(self.mqtt_topic_state, state)
            if state == "on":
                self._publish_light_attributes(attrs)
        except Exception as e:  # top-level boundary: log and carry on
            self.log(f"Error publishing states: {e}", level="ERROR")

    # ------------------------------------------------------------------
    # Scale conversion
    # ------------------------------------------------------------------
    def kelvin_to_4095(self, kelvin):
        """Convert Kelvin temperature to 0-4095 range.

        Values at or below ``ct_min`` map to 0 and values at or above
        ``ct_max`` map to 4095.  In between the mapping is linear, rounded to
        the nearest integer so a round trip does not drift downwards.
        """
        if kelvin <= self.ct_min:
            return 0
        if kelvin >= self.ct_max:
            return _CT_SCALE_MAX
        ratio = (kelvin - self.ct_min) / (self.ct_max - self.ct_min)
        return round(ratio * _CT_SCALE_MAX)

    def range_4095_to_kelvin(self, value_4095):
        """Convert 0-4095 range to Kelvin temperature.

        Values at or below 0 map to ``ct_min`` and values at or above 4095 map
        to ``ct_max``.  In between the mapping is linear, rounded to the
        nearest integer.
        """
        if value_4095 <= 0:
            return self.ct_min
        if value_4095 >= _CT_SCALE_MAX:
            return self.ct_max
        ratio = value_4095 / _CT_SCALE_MAX
        return round(self.ct_min + ratio * (self.ct_max - self.ct_min))

    # ------------------------------------------------------------------
    # MQTT -> Home Assistant handlers
    # ------------------------------------------------------------------
    def _read_int_payload(self, data, upper, label):
        """Parse the MQTT payload as an int clamped to ``0..upper``.

        Returns ``None`` and logs ``"Invalid <label> value: ..."`` when the
        payload is missing or is not an integer.
        """
        raw = data.get("payload") if isinstance(data, dict) else None
        try:
            value = int(raw)
        except (TypeError, ValueError):
            self.log(f"Invalid {label} value: {raw}")
            return None
        return max(0, min(upper, value))

    def handle_state_set(self, event_name, data, cb_args):
        """Handle on/off state changes from MQTT.

        Payloads other than ``on`` / ``off`` (case-insensitive, surrounding
        whitespace ignored) are ignored.
        """
        raw = data.get("payload") if isinstance(data, dict) else None
        command = str(raw).strip().lower() if raw is not None else ""
        if command == "on":
            self.call_service("homeassistant/turn_on", entity_id=self.entity_id)
        elif command == "off":
            self.call_service("homeassistant/turn_off", entity_id=self.entity_id)

    def handle_state_request(self, event_name, data, cb_args):
        """Handle state request from MQTT by re-publishing everything."""
        self.publish_all_states_to_mqtt()

    def handle_brightness_set(self, event_name, data, cb_args):
        """Handle brightness changes from MQTT (payload clamped to 0-255)."""
        if not self.has_brightness:
            return
        brightness = self._read_int_payload(data, 255, "brightness")
        if brightness is None:
            return
        self.call_service(
            "light/turn_on", entity_id=self.entity_id, brightness=brightness
        )

    def handle_color_r_set(self, event_name, data, cb_args):
        """Handle red color changes from MQTT."""
        self.handle_color_component_set("r", data)

    def handle_color_g_set(self, event_name, data, cb_args):
        """Handle green color changes from MQTT."""
        self.handle_color_component_set("g", data)

    def handle_color_b_set(self, event_name, data, cb_args):
        """Handle blue color changes from MQTT."""
        self.handle_color_component_set("b", data)

    def handle_color_component_set(self, component, data):
        """Handle RGB color component changes from MQTT.

        Replaces one component (``"r"``, ``"g"`` or ``"b"``) of the light's
        current ``rgb_color`` and sends the resulting triple to Home Assistant.
        If the light reports no usable colour, white is used as the starting
        point.

        NOTE(review): the starting colour is read from Home Assistant on every
        message, so several component messages sent back-to-back may each
        start from the same stale colour.  Confirm whether clients send
        components individually or need a combined topic.
        """
        if not self.has_rgb:
            return
        slot = _RGB_INDEX.get(component)
        if slot is None:
            self.log(f"Unknown color component: {component}")
            return
        value = self._read_int_payload(data, 255, f"color {component}")
        if value is None:
            return

        current_rgb = self.get_state(self.entity_id, attribute="rgb_color")
        if current_rgb is None or len(current_rgb) < 3:
            current_rgb = [255, 255, 255]  # Default to white

        new_rgb = list(current_rgb[:3])
        new_rgb[slot] = value
        self.call_service("light/turn_on", entity_id=self.entity_id, rgb_color=new_rgb)

    def handle_temp_set(self, event_name, data, cb_args):
        """Handle color temperature changes from MQTT (payload is 0-4095)."""
        if not self.has_ct:
            return
        temp_4095 = self._read_int_payload(data, _CT_SCALE_MAX, "color temperature")
        if temp_4095 is None:
            return
        kelvin = self.range_4095_to_kelvin(temp_4095)
        # color_temp_kelvin takes Kelvin; the plain color_temp field is mireds.
        self.call_service(
            "light/turn_on", entity_id=self.entity_id, color_temp_kelvin=kelvin
        )

    def handle_mode_set(self, event_name, data, cb_args):
        """Handle mode changes from MQTT.

        ``rgb`` switches the light to white RGB.  ``ct`` switches it to the
        midpoint of the configured Kelvin range.  Other payloads are ignored.
        """
        if not (self.has_rgb and self.has_ct):
            return
        raw = data.get("payload") if isinstance(data, dict) else None
        mode = str(raw).strip().lower() if raw is not None else ""
        if mode == "rgb":
            # Switch to RGB mode - set a default RGB color
            self.call_service(
                "light/turn_on", entity_id=self.entity_id, rgb_color=[255, 255, 255]
            )
        elif mode == "ct":
            # Switch to CT mode - set a default color temperature
            default_kelvin = (self.ct_min + self.ct_max) // 2
            self.call_service(
                "light/turn_on",
                entity_id=self.entity_id,
                color_temp_kelvin=default_kelvin,
            )

    # ------------------------------------------------------------------
    # Home Assistant -> MQTT handler
    # ------------------------------------------------------------------
    def handle_hass_state_change(self, entity, attribute, old, new, cb_args):
        """Handle Home Assistant state changes and publish to MQTT.

        The callback is registered with ``attribute="all"``, so ``old`` and
        ``new`` are full state objects and only the values that differ are
        published.  Calls carrying a single attribute name are still accepted
        and are treated as a change of just that attribute.
        """
        if attribute == "all":
            old_state, old_attrs = self._split_snapshot(old)
            new_state, new_attrs = self._split_snapshot(new)
        elif attribute == "state":
            old_state, new_state = old, new
            old_attrs, new_attrs = {}, {}
        else:
            old_state = new_state = None
            old_attrs, new_attrs = {attribute: old}, {attribute: new}

        if new_state != old_state:
            self.log(f"State change detected: state {old_state} -> {new_state}")
            self._publish_topic(self.mqtt_topic_state, new_state)
            if attribute == "state" and new_state == "on":
                # Only the bare state is known here, so fetch the attributes
                # shortly afterwards, once Home Assistant has filled them in.
                self.run_in(self.delayed_publish_all, 0.5)

        self._publish_light_attributes(new_attrs, previous=old_attrs)

    def delayed_publish_all(self, cb_args):
        """Delayed publish all states - used when light turns on."""
        self.publish_all_states_to_mqtt()