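# desk-controls/main.py
#
# MicroPython entry point for the desk controls: polls a rotary encoder and
# push buttons, calls Home Assistant REST services, and publishes the device
# state over MQTT.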

import json

import machine
import network
import ubinascii
import utime
from machine import Pin
from micropython import const

import requests
from umqtt.simple import MQTTClient

from button import Button
from rotary_irq_esp import RotaryIRQ
from statetree import StateTree
# Board pin labels mapped to the GPIO numbers handed to machine.Pin.
# NOTE(review): the D0..D8 names look like NodeMCU / D1-mini silk-screen
# labels -- confirm against the actual board.
D0 = 16
D1 = 5
D2 = 4
D3 = 0
D4 = 2
D5 = 14
D6 = 12
D7 = 13
D8 = 15
# MQTT timing settings (seconds; the intervals are compared against
# utime.time() differences in loop()).
# Keepalive value passed to MQTTClient in mqtt_init().
MQTT_KEEPALIVE = const(60)
# Longest time loop() waits between state publishes when nothing changed.
MQTT_UPDATE_INTERVAL = const(60)
# Shortest time loop() waits between MQTT connection attempts.
MQTT_RECONNECT_INTERVAL = const(60)
# Device state that loop() serialises to JSON and publishes over MQTT.
# "network" starts as "OFF"; loop() sets it to "ACT" while WiFi is connecting
# and to "OK" once an IP address has been obtained.
state = StateTree({"network": "OFF"})
# Rotary encoder wired to D2/D1, reporting an unbounded value in steps of 5.
# NOTE(review): the third positional argument (0) is presumably the
# encoder's minimum/initial value -- confirm against rotary_irq_esp.
rotary = RotaryIRQ(
    D2,
    D1,
    0,
    max_val=128,
    range_mode=RotaryIRQ.RANGE_UNBOUNDED,
    pull_up=True,
    incr=5,
)
# Last encoder reading handled by loop(); used there to compute the delta.
rotary_value = rotary.value()

# Push switch of the encoder, read through the internal pull-up.
# NOTE(review): inverted=True presumably means "pressed" reads low -- confirm
# against the Button class.
rotary_button = Button(Pin(D3, Pin.IN, Pin.PULL_UP), inverted=True)

# Scene buttons; the list index selects the entry in settings["scenes"].
buttons = [
    Button(Pin(pin_number, Pin.IN, Pin.PULL_UP), inverted=True)
    for pin_number in (D5, D6, D7)
]
# Runtime configuration; the keys read in this file are "wifi", "mqtt",
# "home-assistant" and (optionally) "scenes".
with open("settings.json", "r") as f:
    settings = json.load(f)

# WiFi station interface; loop() brings it up and monitors it.
sta_if = network.WLAN(network.STA_IF)

# Timestamps (utime.time()) of the last state publish and the last MQTT
# connection attempt; 0 means "never".
last_update = last_mqtt_attempt = 0

# MQTT client, created lazily by loop() once WiFi is connected.
mqtt = None
# Hex-encoded hardware id, used as the MQTT client id.
mqtt_client_id = ubinascii.hexlify(machine.unique_id())
mqtt_prefix = settings["mqtt"]["prefix"]
mqtt_broker = settings["mqtt"]["broker"]
def mqtt_init():
    """Create, configure and connect the MQTT client.

    Credentials are read from ``settings["mqtt"]`` and are only used when
    both ``username`` and ``password`` are set; otherwise the client connects
    anonymously.  The client gets ``on_message`` as its receive callback, a
    retained ``offline`` last will on ``<prefix>/status`` and a subscription
    to ``<prefix>/set``.

    Returns:
        The connected ``MQTTClient`` instance.

    Raises:
        Exceptions from ``connect()``/``subscribe()`` are not caught here;
        ``loop()`` handles ``OSError`` as a failed connection attempt.
    """
    print(f"Connecting MQTT client (host: {mqtt_broker}, prefix: {mqtt_prefix})")
    username = settings["mqtt"].get("username")
    password = settings["mqtt"].get("password")
    if username and password:
        print(f"Authenticating with MQTT as user '{username}'")
    else:
        # Incomplete credentials are treated as no credentials at all.
        username = password = None

    # user/password of None is the library default, i.e. an anonymous login.
    client = MQTTClient(
        mqtt_client_id,
        mqtt_broker,
        keepalive=MQTT_KEEPALIVE,
        user=username,
        password=password,
    )
    client.set_callback(on_message)
    # The last will has to be registered before connect() to take effect.
    client.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True)
    client.connect()
    client.subscribe(f"{mqtt_prefix}/set")
    return client
def on_message(topic, msg):
    """Log a message received from the MQTT broker.

    Registered as the client's receive callback in ``mqtt_init()``; the
    message is only printed, not acted upon.

    Args:
        topic: Topic the message arrived on.
        msg: Message payload.
    """
    log_line = f"MQTT <- [{topic}] {msg}"
    print(log_line)
class HomeAssistant:
    """Minimal client for the Home Assistant REST service API.

    Every method issues one authenticated POST to
    ``<url>/api/services/<domain>/<service>``.  Errors raised by
    ``requests.post`` are not caught here and propagate to the caller.
    """

    def __init__(self, url: str, api_token: str) -> None:
        """Store the connection details.

        Args:
            url: Base URL of the Home Assistant instance (no trailing
                ``/api``; the service path is appended to it).
            api_token: Token sent as ``Authorization: Bearer <token>``.
        """
        self._url = url
        self._api_token = api_token

    def _call_service(self, domain: str, service: str, data: dict) -> None:
        """POST ``data`` as JSON to the given service and release the socket."""
        response = requests.post(
            f"{self._url}/api/services/{domain}/{service}",
            headers={"Authorization": f"Bearer {self._api_token}"},
            json=data,
        )
        # The response body is not needed; close it right away so the
        # underlying socket is released instead of lingering until GC
        # (MicroPython's requests keeps it open until close() is called).
        response.close()

    def toggle_light(self, entity_id: str) -> None:
        """Toggle the light ``entity_id`` via ``light/toggle``."""
        self._call_service("light", "toggle", {"entity_id": entity_id})

    def adjust_light(self, entity_id: str, value: int) -> None:
        """Change the brightness of ``entity_id`` by ``value``.

        ``value`` is sent as ``brightness_step`` to ``light/turn_on``.
        """
        self._call_service(
            "light",
            "turn_on",
            {"entity_id": entity_id, "brightness_step": value},
        )

    def activate_scene(self, entity_id: str) -> None:
        """Activate the scene ``entity_id`` via ``scene/turn_on``."""
        self._call_service("scene", "turn_on", {"entity_id": entity_id})
# Home Assistant client used by loop(); the URL and token come from the
# "home-assistant" section of settings.json.
hass = HomeAssistant(
    settings["home-assistant"]["url"],
    settings["home-assistant"]["api_token"],
)
def _hass_request(action, *args):
    """Call a ``hass`` method, logging (not raising) network errors.

    A failed HTTP request must not abort the main loop, so ``OSError`` is
    reported and swallowed here.
    """
    try:
        action(*args)
    except OSError as e:
        print(f"Home Assistant request failed: {e}")


def _wifi_connect():
    """Activate the station interface and start connecting to the AP."""
    sta_if.active(True)
    sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])


def _poll_inputs():
    """Handle encoder and button events, forwarding them to Home Assistant.

    Home Assistant is only contacted while ``state["network"]`` is ``"OK"``;
    events that occur while offline are logged and dropped.
    """
    global rotary_value

    online = state["network"] == "OK"

    rotary_button.update()
    if rotary_button.was_clicked():
        print("CLICKED!")
        if online:
            _hass_request(hass.toggle_light, "light.key_lights")

    new_value = rotary.value()
    if new_value != rotary_value:
        change = new_value - rotary_value
        print(f"ROTARY CHANGED: {new_value} ({change})")
        if online:
            _hass_request(hass.adjust_light, "light.key_lights", change)
        rotary_value = new_value

    for i, button in enumerate(buttons):
        button.update()
        if not button.was_clicked():
            continue
        print(f"Pressed button {i}")
        # Only the settings lookup is guarded, so errors raised by the
        # request itself are never misreported as a missing scene.
        try:
            scene = settings["scenes"][i]
        except KeyError:
            print("No scenes defined")
            continue
        except IndexError:
            print(f"No scene defined for button {i}")
            continue
        if online:
            print(f"Activating scene {scene}")
            _hass_request(hass.activate_scene, scene)


def _poll_wifi():
    """Bring WiFi up if needed and keep ``state["network"]`` current.

    Returns:
        True when the station interface reports being connected.
    """
    if not sta_if.active():
        print("Connecting to WiFi")
        _wifi_connect()
    if sta_if.active() and not sta_if.isconnected():
        state["network"] = "ACT"
    if not sta_if.isconnected():
        return False

    ip = sta_if.ifconfig()[0]
    if ip == "0.0.0.0":
        # Something went wrong, try to reconnect
        print("IP invalid, retrying WiFi connection")
        state["network"] = "ACT"
        _wifi_connect()
    elif state["network"] != "OK":
        print(f"WIFI Connected to {sta_if.config('ssid')}")
        print(f"IP Address: {ip}")
        state["network"] = "OK"
    return True


def _poll_mqtt(connected):
    """Connect to MQTT when due, publish state and poll for messages.

    Args:
        connected: Whether WiFi is currently connected; no connection
            attempt is made otherwise.

    If publishing or polling fails with ``OSError`` the client is dropped so
    that a new connection is attempted once ``MQTT_RECONNECT_INTERVAL`` has
    passed since the previous attempt.
    """
    global mqtt, last_mqtt_attempt, last_update

    if (
        connected
        and not mqtt
        and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL
    ):
        last_mqtt_attempt = utime.time()
        try:
            mqtt = mqtt_init()
        except OSError as e:
            print(f"Failed to connect to MQTT ({mqtt_broker}): {e}")

    if not mqtt:
        return

    try:
        if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL:
            topic = f"{mqtt_prefix}/state".encode()
            payload = json.dumps(state.dictionary).encode()
            print(f"MQTT -> [{topic}] {payload}")
            mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True)
            mqtt.publish(topic, payload, retain=True)
            last_update = utime.time()
        mqtt.check_msg()
    except OSError as e:
        print(f"Lost MQTT connection ({mqtt_broker}): {e}")
        mqtt = None
        # Force a full publish right after the next successful reconnect.
        last_update = 0


def loop():
    """Run one iteration of the main loop.

    Order matters: inputs are handled first, then WiFi is serviced, then
    MQTT, and finally ``state.clean()`` is called (presumably resetting the
    ``state.changed`` flag -- confirm against StateTree).
    """
    _poll_inputs()
    _poll_mqtt(_poll_wifi())
    state.clean()
# Run the main loop forever, pausing 10 ms between iterations.
while True:
    loop()
    utime.sleep_ms(10)