appdaemon-goodies/hass_mqtt_advanced_light_sync.py

285 lines
No EOL
13 KiB
Python

import appdaemon.plugins.hass.hassapi as hass
import appdaemon.plugins.mqtt.mqttapi as mqtt
import appdaemon.adbase as base
import json
class hass_mqtt_advanced_light_sync(hass.Hass, mqtt.Mqtt):
def initialize(self):
self.entity_id = self.args["entity_id"]
self.entity = self.get_entity(self.entity_id)
self.mqtt_topic_base = self.args["mqtt_topic_base"] # e.g., "/commonroom/dl1"
# MQTT Topics
self.mqtt_topic_state = self.mqtt_topic_base
self.mqtt_topic_state_set = f"{self.mqtt_topic_base}/set"
self.mqtt_topic_state_request = f"{self.mqtt_topic_base}/request"
self.mqtt_topic_brightness = f"{self.mqtt_topic_base}/brightness"
self.mqtt_topic_brightness_set = f"{self.mqtt_topic_base}/brightness/set"
self.mqtt_topic_color_r = f"{self.mqtt_topic_base}/color/r"
self.mqtt_topic_color_r_set = f"{self.mqtt_topic_base}/color/r/set"
self.mqtt_topic_color_g = f"{self.mqtt_topic_base}/color/g"
self.mqtt_topic_color_g_set = f"{self.mqtt_topic_base}/color/g/set"
self.mqtt_topic_color_b = f"{self.mqtt_topic_base}/color/b"
self.mqtt_topic_color_b_set = f"{self.mqtt_topic_base}/color/b/set"
self.mqtt_topic_temp = f"{self.mqtt_topic_base}/temp"
self.mqtt_topic_temp_set = f"{self.mqtt_topic_base}/temp/set"
self.mqtt_topic_mode = f"{self.mqtt_topic_base}/mode"
self.mqtt_topic_mode_set = f"{self.mqtt_topic_base}/mode/set"
# Light capabilities from config
self.has_brightness = self.args.get("has_brightness", True)
self.has_rgb = self.args.get("has_rgb", False)
self.has_ct = self.args.get("has_ct", False)
self.ct_min = self.args.get("ct_min", 2700) # Kelvin
self.ct_max = self.args.get("ct_max", 6500) # Kelvin
# Subscribe to MQTT set topics
self.mqtt_subscribe(self.mqtt_topic_state_set, namespace="mqtt")
self.mqtt_subscribe(self.mqtt_topic_state_request, namespace="mqtt")
if self.has_brightness:
self.mqtt_subscribe(self.mqtt_topic_brightness_set, namespace="mqtt")
if self.has_rgb:
self.mqtt_subscribe(self.mqtt_topic_color_r_set, namespace="mqtt")
self.mqtt_subscribe(self.mqtt_topic_color_g_set, namespace="mqtt")
self.mqtt_subscribe(self.mqtt_topic_color_b_set, namespace="mqtt")
if self.has_ct:
self.mqtt_subscribe(self.mqtt_topic_temp_set, namespace="mqtt")
if self.has_rgb and self.has_ct:
self.mqtt_subscribe(self.mqtt_topic_mode_set, namespace="mqtt")
# Listen to MQTT events
self.listen_event(self.handle_state_set, "MQTT_MESSAGE", topic=self.mqtt_topic_state_set, namespace="mqtt")
self.listen_event(self.handle_state_request, "MQTT_MESSAGE", topic=self.mqtt_topic_state_request, namespace="mqtt")
if self.has_brightness:
self.listen_event(self.handle_brightness_set, "MQTT_MESSAGE", topic=self.mqtt_topic_brightness_set, namespace="mqtt")
if self.has_rgb:
self.listen_event(self.handle_color_r_set, "MQTT_MESSAGE", topic=self.mqtt_topic_color_r_set, namespace="mqtt")
self.listen_event(self.handle_color_g_set, "MQTT_MESSAGE", topic=self.mqtt_topic_color_g_set, namespace="mqtt")
self.listen_event(self.handle_color_b_set, "MQTT_MESSAGE", topic=self.mqtt_topic_color_b_set, namespace="mqtt")
if self.has_ct:
self.listen_event(self.handle_temp_set, "MQTT_MESSAGE", topic=self.mqtt_topic_temp_set, namespace="mqtt")
if self.has_rgb and self.has_ct:
self.listen_event(self.handle_mode_set, "MQTT_MESSAGE", topic=self.mqtt_topic_mode_set, namespace="mqtt")
# Listen to Home Assistant state changes
self.entity.listen_state(self.handle_hass_state_change)
# Initial state publish
self.publish_all_states_to_mqtt()
def publish_all_states_to_mqtt(self):
"""Publish all current Home Assistant states to MQTT"""
try:
# Main state (on/off)
state = self.get_state(self.entity_id)
self.mqtt_publish(self.mqtt_topic_state, str(state), namespace="mqtt")
if state == "on":
# Brightness
if self.has_brightness:
brightness = self.get_state(self.entity_id, attribute="brightness")
if brightness is not None:
self.mqtt_publish(self.mqtt_topic_brightness, str(brightness), namespace="mqtt")
# RGB Color
if self.has_rgb:
rgb_color = self.get_state(self.entity_id, attribute="rgb_color")
if rgb_color is not None and len(rgb_color) >= 3:
self.mqtt_publish(self.mqtt_topic_color_r, str(rgb_color[0]), namespace="mqtt")
self.mqtt_publish(self.mqtt_topic_color_g, str(rgb_color[1]), namespace="mqtt")
self.mqtt_publish(self.mqtt_topic_color_b, str(rgb_color[2]), namespace="mqtt")
# Color Temperature
if self.has_ct:
color_temp = self.get_state(self.entity_id, attribute="color_temp")
if color_temp is not None:
# Convert from Kelvin to 0-4095 range
ct_normalized = self.kelvin_to_4095(color_temp)
self.mqtt_publish(self.mqtt_topic_temp, str(ct_normalized), namespace="mqtt")
# Mode determination
if self.has_rgb and self.has_ct:
# Determine mode based on current light state
color_mode = self.get_state(self.entity_id, attribute="color_mode")
if color_mode == "rgb":
self.mqtt_publish(self.mqtt_topic_mode, "rgb", namespace="mqtt")
elif color_mode in ["color_temp", "white"]:
self.mqtt_publish(self.mqtt_topic_mode, "ct", namespace="mqtt")
else:
# Default to RGB mode
self.mqtt_publish(self.mqtt_topic_mode, "rgb", namespace="mqtt")
except Exception as e:
self.log(f"Error publishing states: {e}")
def kelvin_to_4095(self, kelvin):
"""Convert Kelvin temperature to 0-4095 range"""
if kelvin <= self.ct_min:
return 0
elif kelvin >= self.ct_max:
return 4095
else:
# Linear interpolation
ratio = (kelvin - self.ct_min) / (self.ct_max - self.ct_min)
return int(ratio * 4095)
def range_4095_to_kelvin(self, value_4095):
"""Convert 0-4095 range to Kelvin temperature"""
if value_4095 <= 0:
return self.ct_min
elif value_4095 >= 4095:
return self.ct_max
else:
# Linear interpolation
ratio = value_4095 / 4095
return int(self.ct_min + ratio * (self.ct_max - self.ct_min))
# MQTT → Home Assistant handlers
def handle_state_set(self, event_name, data, cb_args):
"""Handle on/off state changes from MQTT"""
state = data["payload"].lower()
if state == "on":
self.call_service("homeassistant/turn_on", entity_id=self.entity_id)
elif state == "off":
self.call_service("homeassistant/turn_off", entity_id=self.entity_id)
def handle_state_request(self, event_name, data, cb_args):
"""Handle state request from MQTT"""
self.publish_all_states_to_mqtt()
def handle_brightness_set(self, event_name, data, cb_args):
"""Handle brightness changes from MQTT"""
if not self.has_brightness:
return
try:
brightness = int(data["payload"])
brightness = max(0, min(255, brightness)) # Clamp to 0-255
self.call_service("light/turn_on", entity_id=self.entity_id, brightness=brightness)
except ValueError:
self.log(f"Invalid brightness value: {data['payload']}")
def handle_color_r_set(self, event_name, data, cb_args):
"""Handle red color changes from MQTT"""
self.handle_color_component_set("r", data)
def handle_color_g_set(self, event_name, data, cb_args):
"""Handle green color changes from MQTT"""
self.handle_color_component_set("g", data)
def handle_color_b_set(self, event_name, data, cb_args):
"""Handle blue color changes from MQTT"""
self.handle_color_component_set("b", data)
def handle_color_component_set(self, component, data):
"""Handle RGB color component changes from MQTT"""
if not self.has_rgb:
return
try:
value = int(data["payload"])
value = max(0, min(255, value)) # Clamp to 0-255
# Get current RGB values
current_rgb = self.get_state(self.entity_id, attribute="rgb_color")
if current_rgb is None:
current_rgb = [255, 255, 255] # Default to white
# Update the specific component
new_rgb = list(current_rgb)
if component == "r":
new_rgb[0] = value
elif component == "g":
new_rgb[1] = value
elif component == "b":
new_rgb[2] = value
# Set the new RGB color
self.call_service("light/turn_on", entity_id=self.entity_id, rgb_color=new_rgb)
except ValueError:
self.log(f"Invalid color {component} value: {data['payload']}")
def handle_temp_set(self, event_name, data, cb_args):
"""Handle color temperature changes from MQTT"""
if not self.has_ct:
return
try:
temp_4095 = int(data["payload"])
temp_4095 = max(0, min(4095, temp_4095)) # Clamp to 0-4095
# Convert to Kelvin
kelvin = self.range_4095_to_kelvin(temp_4095)
# Set color temperature
self.call_service("light/turn_on", entity_id=self.entity_id, color_temp=kelvin)
except ValueError:
self.log(f"Invalid color temperature value: {data['payload']}")
def handle_mode_set(self, event_name, data, cb_args):
"""Handle mode changes from MQTT"""
if not (self.has_rgb and self.has_ct):
return
mode = data["payload"].lower()
if mode == "rgb":
# Switch to RGB mode - set a default RGB color
self.call_service("light/turn_on", entity_id=self.entity_id, rgb_color=[255, 255, 255])
elif mode == "ct":
# Switch to CT mode - set a default color temperature
default_kelvin = (self.ct_min + self.ct_max) // 2
self.call_service("light/turn_on", entity_id=self.entity_id, color_temp=default_kelvin)
# Home Assistant → MQTT handler
def handle_hass_state_change(self, entity, attribute, old, new, cb_args):
"""Handle Home Assistant state changes and publish to MQTT"""
self.log(f"State change detected: {attribute} {old} -> {new}")
# Publish the changed state
if attribute == "state":
self.mqtt_publish(self.mqtt_topic_state, str(new), namespace="mqtt")
# If turning on, publish all attributes
if new == "on":
self.run_in(self.delayed_publish_all, 0.5) # Small delay to ensure attributes are updated
elif attribute == "brightness" and self.has_brightness:
if new is not None:
self.mqtt_publish(self.mqtt_topic_brightness, str(new), namespace="mqtt")
elif attribute == "rgb_color" and self.has_rgb:
if new is not None and len(new) >= 3:
self.mqtt_publish(self.mqtt_topic_color_r, str(new[0]), namespace="mqtt")
self.mqtt_publish(self.mqtt_topic_color_g, str(new[1]), namespace="mqtt")
self.mqtt_publish(self.mqtt_topic_color_b, str(new[2]), namespace="mqtt")
# Update mode to RGB
if self.has_ct:
self.mqtt_publish(self.mqtt_topic_mode, "rgb", namespace="mqtt")
elif attribute == "color_temp" and self.has_ct:
if new is not None:
# Convert to 0-4095 range
ct_normalized = self.kelvin_to_4095(new)
self.mqtt_publish(self.mqtt_topic_temp, str(ct_normalized), namespace="mqtt")
# Update mode to CT
if self.has_rgb:
self.mqtt_publish(self.mqtt_topic_mode, "ct", namespace="mqtt")
def delayed_publish_all(self, cb_args):
"""Delayed publish all states - used when light turns on"""
self.publish_all_states_to_mqtt()