import json import network import ubinascii import utime from machine import Pin import requests from umqtt.simple import MQTTClient from button import Button from rotary_irq_esp import RotaryIRQ from statetree import StateTree D1 = 5 D2 = 4 D3 = 0 D4 = 2 D5 = 14 D6 = 12 D7 = 13 D8 = 15 D0 = 16 MQTT_KEEPALIVE = const(60) MQTT_UPDATE_INTERVAL = const(60) MQTT_RECONNECT_INTERVAL = const(60) state = StateTree( { "network": "OFF", } ) rotary = RotaryIRQ( D4, D5, 0, max_val=128, range_mode=RotaryIRQ.RANGE_UNBOUNDED, pull_up=True, incr=5, ) rotary_button = Button(Pin(D1, Pin.IN, Pin.PULL_UP), inverted=True) rotary_value = rotary.value() with open("settings.json", "r") as f: settings = json.load(f) sta_if = network.WLAN(network.STA_IF) mqtt = None mqtt_client_id = ubinascii.hexlify(machine.unique_id()) mqtt_broker = settings["mqtt"]["broker"] mqtt_prefix = settings["mqtt"]["prefix"] last_update = 0 last_mqtt_attempt = 0 def mqtt_init(): print("Starting MQTT client") mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE) mqtt.set_callback(on_message) mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True) mqtt.connect() mqtt.subscribe(f"{mqtt_prefix}/set") mqtt_device = { "identifiers": mqtt_client_id, "manufacturer": "correl", "model": "desk-controls", "name": "desk-controls", } return mqtt def on_message(topic, msg): print(f"MQTT <- [{topic}] {msg}") class HomeAssistant: def __init__(self, url: str, api_token: str) -> None: self._url = url self._api_token = api_token def toggle_light(self, entity_id: str) -> None: response = requests.post( f"{self._url}/api/services/light/toggle", headers={"Authorization": f"Bearer {self._api_token}"}, json={"entity_id": entity_id}, ) def adjust_light(self, entity_id: str, value: int) -> None: response = requests.post( f"{self._url}/api/services/light/turn_on", headers={"Authorization": f"Bearer {self._api_token}"}, json={"entity_id": entity_id, "brightness_step": value}, ) hass = HomeAssistant( url=settings["home-assistant"]["url"], api_token=settings["home-assistant"]["api_token"], ) def loop(): global sta_if, state, last_update global hass global mqtt, last_mqtt_attempt global rotary, rotary_button, rotary_value rotary_button.update() if rotary_button.was_clicked(): print("CLICKED!") if state["network"] == "OK": hass.toggle_light("light.key_lights") new_value = rotary.value() if new_value != rotary_value: change = new_value - rotary_value print(f"ROTARY CHANGED: {new_value} ({change})") if state["network"] == "OK": hass.adjust_light("light.key_lights", change) rotary_value = new_value if not sta_if.active(): print("Connecting to WiFi") sta_if.active(True) sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"]) if sta_if.active() and not sta_if.isconnected(): state["network"] = "ACT" if sta_if.isconnected(): ip, _, _, _ = sta_if.ifconfig() if ip == "0.0.0.0": # Something went wrong, try to reconnect print("IP invalid, retrying WiFi connection") state["network"] = "ACT" sta_if.active(True) sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"]) elif state["network"] != "OK": print(f"WIFI Connected to {sta_if.config('ssid')}") print(f"IP Address: {ip}") state["network"] = "OK" if not mqtt and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL: last_mqtt_attempt = utime.time() try: mqtt = mqtt_init() except OSError as e: print(f"Failed to connect to MQTT ({mqtt_broker}): {e}") if mqtt: if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL: topic = f"{mqtt_prefix}/state".encode() payload = json.dumps(state.dictionary).encode() print(f"MQTT -> [{topic}] {payload}") mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True) mqtt.publish(topic, payload, retain=True) last_update = utime.time() mqtt.check_msg() state.clean() while True: loop() utime.sleep_ms(10)