204 lines
5.8 KiB
Python
204 lines
5.8 KiB
Python
import json
|
|
import network
|
|
import ubinascii
|
|
import utime
|
|
from machine import Pin
|
|
|
|
import requests
|
|
from umqtt.simple import MQTTClient
|
|
|
|
from button import Button
|
|
from rotary_irq_esp import RotaryIRQ
|
|
from statetree import StateTree
|
|
|
|
D1 = 5
|
|
D2 = 4
|
|
D3 = 0
|
|
D4 = 2
|
|
D5 = 14
|
|
D6 = 12
|
|
D7 = 13
|
|
D8 = 15
|
|
D0 = 16
|
|
|
|
MQTT_KEEPALIVE = const(60)
|
|
MQTT_UPDATE_INTERVAL = const(60)
|
|
MQTT_RECONNECT_INTERVAL = const(60)
|
|
|
|
state = StateTree(
|
|
{
|
|
"network": "OFF",
|
|
}
|
|
)
|
|
|
|
rotary = RotaryIRQ(
|
|
D2,
|
|
D1,
|
|
0,
|
|
max_val=128,
|
|
range_mode=RotaryIRQ.RANGE_UNBOUNDED,
|
|
pull_up=True,
|
|
incr=5,
|
|
)
|
|
|
|
rotary_button = Button(Pin(D3, Pin.IN, Pin.PULL_UP), inverted=True)
|
|
rotary_value = rotary.value()
|
|
|
|
buttons = [
|
|
Button(Pin(D5, Pin.IN, Pin.PULL_UP), inverted=True),
|
|
Button(Pin(D6, Pin.IN, Pin.PULL_UP), inverted=True),
|
|
Button(Pin(D7, Pin.IN, Pin.PULL_UP), inverted=True),
|
|
]
|
|
|
|
with open("settings.json", "r") as f:
|
|
settings = json.load(f)
|
|
|
|
sta_if = network.WLAN(network.STA_IF)
|
|
|
|
mqtt = None
|
|
mqtt_client_id = ubinascii.hexlify(machine.unique_id())
|
|
mqtt_broker = settings["mqtt"]["broker"]
|
|
mqtt_prefix = settings["mqtt"]["prefix"]
|
|
last_update = 0
|
|
last_mqtt_attempt = 0
|
|
|
|
|
|
def mqtt_init():
|
|
print(f"Connecting MQTT client (host: {mqtt_broker}, prefix: {mqtt_prefix})")
|
|
username = settings["mqtt"].get("username")
|
|
password = settings["mqtt"].get("password")
|
|
if username and password:
|
|
print(f"Authenticating with MQTT as user '{username}")
|
|
mqtt = MQTTClient(
|
|
mqtt_client_id,
|
|
mqtt_broker,
|
|
keepalive=MQTT_KEEPALIVE,
|
|
user=username,
|
|
password=password,
|
|
)
|
|
else:
|
|
mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE)
|
|
mqtt.set_callback(on_message)
|
|
mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True)
|
|
mqtt.connect()
|
|
mqtt.subscribe(f"{mqtt_prefix}/set")
|
|
mqtt_device = {
|
|
"identifiers": mqtt_client_id,
|
|
"manufacturer": "correl",
|
|
"model": "desk-controls",
|
|
"name": "desk-controls",
|
|
}
|
|
return mqtt
|
|
|
|
|
|
def on_message(topic, msg):
|
|
print(f"MQTT <- [{topic}] {msg}")
|
|
|
|
|
|
class HomeAssistant:
|
|
def __init__(self, url: str, api_token: str) -> None:
|
|
self._url = url
|
|
self._api_token = api_token
|
|
|
|
def toggle_light(self, entity_id: str) -> None:
|
|
response = requests.post(
|
|
f"{self._url}/api/services/light/toggle",
|
|
headers={"Authorization": f"Bearer {self._api_token}"},
|
|
json={"entity_id": entity_id},
|
|
)
|
|
|
|
def adjust_light(self, entity_id: str, value: int) -> None:
|
|
response = requests.post(
|
|
f"{self._url}/api/services/light/turn_on",
|
|
headers={"Authorization": f"Bearer {self._api_token}"},
|
|
json={"entity_id": entity_id, "brightness_step": value},
|
|
)
|
|
|
|
def activate_scene(self, entity_id: str) -> None:
|
|
response = requests.post(
|
|
f"{self._url}/api/services/scene/turn_on",
|
|
headers={"Authorization": f"Bearer {self._api_token}"},
|
|
json={"entity_id": entity_id},
|
|
)
|
|
|
|
|
|
hass = HomeAssistant(
|
|
url=settings["home-assistant"]["url"],
|
|
api_token=settings["home-assistant"]["api_token"],
|
|
)
|
|
|
|
|
|
def loop():
|
|
global sta_if, state, last_update
|
|
global hass
|
|
global mqtt, last_mqtt_attempt
|
|
global rotary, rotary_button, rotary_value
|
|
global buttons
|
|
|
|
rotary_button.update()
|
|
if rotary_button.was_clicked():
|
|
print("CLICKED!")
|
|
if state["network"] == "OK":
|
|
hass.toggle_light("light.key_lights")
|
|
new_value = rotary.value()
|
|
if new_value != rotary_value:
|
|
change = new_value - rotary_value
|
|
print(f"ROTARY CHANGED: {new_value} ({change})")
|
|
if state["network"] == "OK":
|
|
hass.adjust_light("light.key_lights", change)
|
|
|
|
rotary_value = new_value
|
|
|
|
for i, button in enumerate(buttons):
|
|
button.update()
|
|
if button.was_clicked():
|
|
print(f"Pressed button {i}")
|
|
try:
|
|
scene = settings["scenes"][i]
|
|
print(f"Activating scene {scene}")
|
|
hass.activate_scene(scene)
|
|
except KeyError:
|
|
print("No scenes defined")
|
|
except IndexError:
|
|
print(f"No scene defined for button {i}")
|
|
|
|
if not sta_if.active():
|
|
print("Connecting to WiFi")
|
|
sta_if.active(True)
|
|
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
|
|
|
|
if sta_if.active() and not sta_if.isconnected():
|
|
state["network"] = "ACT"
|
|
if sta_if.isconnected():
|
|
ip, _, _, _ = sta_if.ifconfig()
|
|
if ip == "0.0.0.0":
|
|
# Something went wrong, try to reconnect
|
|
print("IP invalid, retrying WiFi connection")
|
|
state["network"] = "ACT"
|
|
sta_if.active(True)
|
|
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
|
|
elif state["network"] != "OK":
|
|
print(f"WIFI Connected to {sta_if.config('ssid')}")
|
|
print(f"IP Address: {ip}")
|
|
state["network"] = "OK"
|
|
if not mqtt and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL:
|
|
last_mqtt_attempt = utime.time()
|
|
try:
|
|
mqtt = mqtt_init()
|
|
except OSError as e:
|
|
print(f"Failed to connect to MQTT ({mqtt_broker}): {e}")
|
|
if mqtt:
|
|
if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL:
|
|
topic = f"{mqtt_prefix}/state".encode()
|
|
payload = json.dumps(state.dictionary).encode()
|
|
print(f"MQTT -> [{topic}] {payload}")
|
|
mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True)
|
|
mqtt.publish(topic, payload, retain=True)
|
|
last_update = utime.time()
|
|
mqtt.check_msg()
|
|
state.clean()
|
|
|
|
|
|
while True:
|
|
loop()
|
|
utime.sleep_ms(10)
|