commit 8e7dec24baac310a8836a63f166bac81d04eef68 Author: Correl Roush Date: Mon Oct 2 00:38:04 2023 -0400 Initial commit diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..aaeea10 --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ +.PHONY: all deps test-deps test deploy run reset + +DEVICE ?= auto +DEPS = \ + umqtt.simple \ + urequests +TEST_DEPS = unittest + +mpremote = mpremote connect $(DEVICE) + +all: deps deploy reset + +deps: $(DEPS) + +test-deps: $(TEST_DEPS) + +$(DEPS) $(TEST_DEPS): + $(mpremote) mip install $@ + +test: deploy + $(mpremote) cp -r tests ":" + $(mpremote) exec 'import unittest; unittest.main("tests")' + +deploy: + $(mpremote) cp *.py ":" + @if test -f settings.json; then \ + $(mpremote) cp settings.json ":"; \ + fi + +run: + $(mpremote) run main.py + +reset: + $(mpremote) reset diff --git a/button.py b/button.py new file mode 100644 index 0000000..a5ac692 --- /dev/null +++ b/button.py @@ -0,0 +1,88 @@ +import utime +from machine import Pin + + +class Button: + DEBOUNCE_MS = 50 + DOUBLECLICK_MS = 400 + HOLD_MS = 1000 + + def __init__(self, pin: Pin, inverted=False) -> None: + self._pin = pin + self._inverted = inverted + self._pressed = False + self._clicked = False + self._doubleclicked = False + self._held = False + + self._debounce = 0 + self._hold = 0 + self._doubleclick = 0 + + if self._pin(): + self._debounce = utime.ticks_ms() + + def update(self) -> None: + now = utime.ticks_ms() + + on = bool(self._pin()) + if self._inverted: + on = not on + if on: + if self._debounce and now - self._debounce >= self.DEBOUNCE_MS: + self._debounce = 0 + self._pressed = True + self._hold = now + elif not self._pressed and not self._debounce: + self._debounce = now + elif self._hold and now - self._hold >= self.HOLD_MS: + self._hold = 0 + self._held = True + else: + if self._pressed: + self._pressed = False + if self._doubleclick: + if now - self._doubleclick <= self.DOUBLECLICK_MS: + self._doubleclicked = True + self._doubleclick = 0 + self._hold = 0 + self._held = False + else: + self._doubleclick = now + if self._doubleclick and now - self._doubleclick > self.DOUBLECLICK_MS: + if not self._held: + self._clicked = True + self._doubleclick = 0 + self._hold = 0 + self._held = False + + def pressed(self) -> bool: + return self._pressed + + def was_clicked(self) -> bool: + if self._clicked: + self._clicked = False + return True + return False + + def was_double_clicked(self) -> bool: + if self._doubleclicked: + self._doubleclicked = False + return True + return False + + def held(self) -> bool: + return self._held + + +if __name__ == "__main__": + button = Button(Pin(36, Pin.IN)) + while True: + button.update() + if button.was_clicked(): + print("CLICKED") + if button.was_double_clicked(): + print("DOUBLE-CLICKED") + if button.held(): + print("HELD") + utime.sleep_ms(10) diff --git a/main.py b/main.py new file mode 100644 index 0000000..5569e8b --- /dev/null +++ b/main.py @@ -0,0 +1,165 @@ +import json +import network +import ubinascii +import utime +from machine import Pin + +import requests +from umqtt.simple import MQTTClient + +from button import Button +from rotary_irq_esp import RotaryIRQ +from statetree import StateTree + +D1 = 5 +D2 = 4 +D3 = 0 +D4 = 2 +D5 = 14 +D6 = 12 +D7 = 13 +D8 = 15 +D0 = 16 + +MQTT_KEEPALIVE = const(60) +MQTT_UPDATE_INTERVAL = const(60) +MQTT_RECONNECT_INTERVAL = const(60) + +state = StateTree( + { + "network": "OFF", + } +) + +rotary = RotaryIRQ( + D4, + D5, + 0, + max_val=128, + range_mode=RotaryIRQ.RANGE_UNBOUNDED, + pull_up=True, + incr=5, +) + +rotary_button = Button(Pin(D1, Pin.IN, Pin.PULL_UP), inverted=True) +rotary_value = rotary.value() + + +with open("settings.json", "r") as f: + settings = json.load(f) + +sta_if = network.WLAN(network.STA_IF) + +mqtt = None +mqtt_client_id = ubinascii.hexlify(machine.unique_id()) +mqtt_broker = settings["mqtt"]["broker"] +mqtt_prefix = settings["mqtt"]["prefix"] +last_update = 0 +last_mqtt_attempt = 0 + + +def mqtt_init(): + print("Starting MQTT client") + mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE) + mqtt.set_callback(on_message) + mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True) + mqtt.connect() + mqtt.subscribe(f"{mqtt_prefix}/set") + mqtt_device = { + "identifiers": mqtt_client_id, + "manufacturer": "correl", + "model": "desk-controls", + "name": "desk-controls", + } + return mqtt + + +def on_message(topic, msg): + print(f"MQTT <- [{topic}] {msg}") + + +class HomeAssistant: + def __init__(self, url: str, api_token: str) -> None: + self._url = url + self._api_token = api_token + + def toggle_light(self, entity_id: str) -> None: + response = requests.post( + f"{self._url}/api/services/light/toggle", + headers={"Authorization": f"Bearer {self._api_token}"}, + json={"entity_id": entity_id}, + ) + + def adjust_light(self, entity_id: str, value: int) -> None: + response = requests.post( + f"{self._url}/api/services/light/turn_on", + headers={"Authorization": f"Bearer {self._api_token}"}, + json={"entity_id": entity_id, "brightness_step": value}, + ) + + +hass = HomeAssistant( + url=settings["home-assistant"]["url"], + api_token=settings["home-assistant"]["api_token"], +) + + +def loop(): + global sta_if, state, last_update + global hass + global mqtt, last_mqtt_attempt + global rotary, rotary_button, rotary_value + + rotary_button.update() + if rotary_button.was_clicked(): + print("CLICKED!") + if state["network"] == "OK": + hass.toggle_light("light.key_lights") + new_value = rotary.value() + if new_value != rotary_value: + change = new_value - rotary_value + print(f"ROTARY CHANGED: {new_value} ({change})") + if state["network"] == "OK": + hass.adjust_light("light.key_lights", change) + + rotary_value = new_value + if not sta_if.active(): + print("Connecting to WiFi") + sta_if.active(True) + sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"]) + + if sta_if.active() and not sta_if.isconnected(): + state["network"] = "ACT" + if sta_if.isconnected(): + ip, _, _, _ = sta_if.ifconfig() + if ip == "0.0.0.0": + # Something went wrong, try to reconnect + print("IP invalid, retrying WiFi connection") + state["network"] = "ACT" + sta_if.active(True) + sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"]) + elif state["network"] != "OK": + print(f"WIFI Connected to {sta_if.config('ssid')}") + print(f"IP Address: {ip}") + state["network"] = "OK" + if not mqtt and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL: + last_mqtt_attempt = utime.time() + try: + mqtt = mqtt_init() + except OSError as e: + print(f"Failed to connect to MQTT ({mqtt_broker}): {e}") + if mqtt: + if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL: + topic = f"{mqtt_prefix}/state".encode() + payload = json.dumps(state.dictionary).encode() + print(f"MQTT -> [{topic}] {payload}") + mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True) + mqtt.publish(topic, payload, retain=True) + last_update = utime.time() + mqtt.check_msg() + state.clean() + + +while True: + loop() + utime.sleep_ms(10) diff --git a/rotary.py b/rotary.py new file mode 100644 index 0000000..b599e74 --- /dev/null +++ b/rotary.py @@ -0,0 +1,173 @@ +# MIT License (MIT) +# Copyright (c) 2022 Mike Teachman +# https://opensource.org/licenses/MIT + +# Platform-independent MicroPython code for the rotary encoder module + +# Documentation: +# https://github.com/MikeTeachman/micropython-rotary + +import micropython + +_DIR_CW = const(0x10) # Clockwise step +_DIR_CCW = const(0x20) # Counter-clockwise step + +# Rotary Encoder States +_R_START = const(0x0) +_R_CW_1 = const(0x1) +_R_CW_2 = const(0x2) +_R_CW_3 = const(0x3) +_R_CCW_1 = const(0x4) +_R_CCW_2 = const(0x5) +_R_CCW_3 = const(0x6) +_R_ILLEGAL = const(0x7) + +_transition_table = [ + + # |------------- NEXT STATE -------------| |CURRENT STATE| + # CLK/DT CLK/DT CLK/DT CLK/DT + # 00 01 10 11 + [_R_START, _R_CCW_1, _R_CW_1, _R_START], # _R_START + [_R_CW_2, _R_START, _R_CW_1, _R_START], # _R_CW_1 + [_R_CW_2, _R_CW_3, _R_CW_1, _R_START], # _R_CW_2 + [_R_CW_2, _R_CW_3, _R_START, _R_START | _DIR_CW], # _R_CW_3 + [_R_CCW_2, _R_CCW_1, _R_START, _R_START], # _R_CCW_1 + [_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START], # _R_CCW_2 + [_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW], # _R_CCW_3 + [_R_START, _R_START, _R_START, _R_START]] # _R_ILLEGAL + +_transition_table_half_step = [ + [_R_CW_3, _R_CW_2, _R_CW_1, _R_START], + [_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1, _R_START], + [_R_CW_3 | _DIR_CW, _R_CW_2, _R_START, _R_START], + [_R_CW_3, _R_CCW_2, _R_CCW_1, _R_START], + [_R_CW_3, _R_CW_2, _R_CCW_1, _R_START | _DIR_CW], + [_R_CW_3, _R_CCW_2, _R_CW_3, _R_START | _DIR_CCW], + [_R_START, _R_START, _R_START, _R_START], + [_R_START, _R_START, _R_START, _R_START]] + +_STATE_MASK = const(0x07) +_DIR_MASK = const(0x30) + + +def _wrap(value, incr, lower_bound, upper_bound): + range = upper_bound - lower_bound + 1 + value = value + incr + + if value < lower_bound: + value += range * ((lower_bound - value) // range + 1) + + return lower_bound + (value - lower_bound) % range + + +def _bound(value, incr, lower_bound, upper_bound): + return min(upper_bound, max(lower_bound, value + incr)) + + +def _trigger(rotary_instance): + for listener in rotary_instance._listener: + listener() + + +class Rotary(object): + + RANGE_UNBOUNDED = const(1) + RANGE_WRAP = const(2) + RANGE_BOUNDED = const(3) + + def __init__(self, min_val, max_val, incr, reverse, range_mode, half_step, invert): + self._min_val = min_val + self._max_val = max_val + self._incr = incr + self._reverse = -1 if reverse else 1 + self._range_mode = range_mode + self._value = min_val + self._state = _R_START + self._half_step = half_step + self._invert = invert + self._listener = [] + + def set(self, value=None, min_val=None, incr=None, + max_val=None, reverse=None, range_mode=None): + # disable DT and CLK pin interrupts + self._hal_disable_irq() + + if value is not None: + self._value = value + if min_val is not None: + self._min_val = min_val + if max_val is not None: + self._max_val = max_val + if incr is not None: + self._incr = incr + if reverse is not None: + self._reverse = -1 if reverse else 1 + if range_mode is not None: + self._range_mode = range_mode + self._state = _R_START + + # enable DT and CLK pin interrupts + self._hal_enable_irq() + + def value(self): + return self._value + + def reset(self): + self._value = 0 + + def close(self): + self._hal_close() + + def add_listener(self, l): + self._listener.append(l) + + def remove_listener(self, l): + if l not in self._listener: + raise ValueError('{} is not an installed listener'.format(l)) + self._listener.remove(l) + + def _process_rotary_pins(self, pin): + old_value = self._value + clk_dt_pins = (self._hal_get_clk_value() << + 1) | self._hal_get_dt_value() + + if self._invert: + clk_dt_pins = ~clk_dt_pins & 0x03 + + # Determine next state + if self._half_step: + self._state = _transition_table_half_step[self._state & + _STATE_MASK][clk_dt_pins] + else: + self._state = _transition_table[self._state & + _STATE_MASK][clk_dt_pins] + direction = self._state & _DIR_MASK + + incr = 0 + if direction == _DIR_CW: + incr = self._incr + elif direction == _DIR_CCW: + incr = -self._incr + + incr *= self._reverse + + if self._range_mode == self.RANGE_WRAP: + self._value = _wrap( + self._value, + incr, + self._min_val, + self._max_val) + elif self._range_mode == self.RANGE_BOUNDED: + self._value = _bound( + self._value, + incr, + self._min_val, + self._max_val) + else: + self._value = self._value + incr + + try: + if old_value != self._value and len(self._listener) != 0: + _trigger(self) + except: + pass diff --git a/rotary_irq_esp.py b/rotary_irq_esp.py new file mode 100644 index 0000000..3cf0615 --- /dev/null +++ b/rotary_irq_esp.py @@ -0,0 +1,76 @@ +# MIT License (MIT) +# Copyright (c) 2020 Mike Teachman +# https://opensource.org/licenses/MIT + +# Platform-specific MicroPython code for the rotary encoder module +# ESP8266/ESP32 implementation + +# Documentation: +# https://github.com/MikeTeachman/micropython-rotary + +from machine import Pin +from rotary import Rotary +from sys import platform + +_esp8266_deny_pins = [16] + + +class RotaryIRQ(Rotary): + + def __init__(self, pin_num_clk, pin_num_dt, min_val=0, max_val=10, incr=1, + reverse=False, range_mode=Rotary.RANGE_UNBOUNDED, pull_up=False, half_step=False, invert=False): + + if platform == 'esp8266': + if pin_num_clk in _esp8266_deny_pins: + raise ValueError( + '%s: Pin %d not allowed. Not Available for Interrupt: %s' % + (platform, pin_num_clk, _esp8266_deny_pins)) + if pin_num_dt in _esp8266_deny_pins: + raise ValueError( + '%s: Pin %d not allowed. Not Available for Interrupt: %s' % + (platform, pin_num_dt, _esp8266_deny_pins)) + + super().__init__(min_val, max_val, incr, reverse, range_mode, half_step, invert) + + if pull_up == True: + self._pin_clk = Pin(pin_num_clk, Pin.IN, Pin.PULL_UP) + self._pin_dt = Pin(pin_num_dt, Pin.IN, Pin.PULL_UP) + else: + self._pin_clk = Pin(pin_num_clk, Pin.IN) + self._pin_dt = Pin(pin_num_dt, Pin.IN) + + self._enable_clk_irq(self._process_rotary_pins) + self._enable_dt_irq(self._process_rotary_pins) + + def _enable_clk_irq(self, callback=None): + self._pin_clk.irq( + trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, + handler=callback) + + def _enable_dt_irq(self, callback=None): + self._pin_dt.irq( + trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, + handler=callback) + + def _disable_clk_irq(self): + self._pin_clk.irq(handler=None) + + def _disable_dt_irq(self): + self._pin_dt.irq(handler=None) + + def _hal_get_clk_value(self): + return self._pin_clk.value() + + def _hal_get_dt_value(self): + return self._pin_dt.value() + + def _hal_enable_irq(self): + self._enable_clk_irq(self._process_rotary_pins) + self._enable_dt_irq(self._process_rotary_pins) + + def _hal_disable_irq(self): + self._disable_clk_irq() + self._disable_dt_irq() + + def _hal_close(self): + self._hal_disable_irq() diff --git a/statetree.py b/statetree.py new file mode 100644 index 0000000..8499ef2 --- /dev/null +++ b/statetree.py @@ -0,0 +1,77 @@ +class StateTree: + """A dictionary-like object that tracks when values have been changed.""" + + def __init__(self, dictionary: dict = None, parent: "StateTree" = None) -> None: + """Create a new state tree. + + If a dictionary is supplied, its values will be initialized with it and + the tree will be marked as clean. + + If a parent is supplied, the parent will be marked as dirty when this + tree is modified. + + """ + self._dictionary = dictionary if dictionary else dict() + self._parent = parent + self._changed = False + + def dirty(self): + """Mark the tree as dirty. + + This is done automatically whenever an item is updated. + + """ + self._changed = True + if self._parent: + self._parent.dirty() + + def clean(self): + """Mark the tree as clean. + + Use this method to reset the changed status of the tree once after + you've reacted to it being updated. + + """ + self._changed = False + + @property + def changed(self): + """Returns whether the tree has been modified since the last time it was + marked as clean.""" + return self._changed + + @property + def dictionary(self): + """Returns the underlying dictionary.""" + return self._dictionary + + def __getitem__(self, *args, **kwargs): + """Get the value stored in a key in the tree. + + If the value is a dictionary, returns a StateTree object instead that + will notify the parent if a change is made. + + """ + o = self._dictionary.__getitem__(*args, **kwargs) + if isinstance(o, dict): + return StateTree(o, parent=self) + else: + return o + + def __setitem__(self, key, value): + """Update the value of a key in the tree. + + Marks the tree as changed if the key is new or the new value differs + from the current value. + + """ + if key not in self._dictionary or value != self._dictionary[key]: + self.dirty() + self._dictionary[key] = value + + def __repr__(self): + return "".format( + "^" if self._parent else "", + "*" if self._changed else "", + repr(self._dictionary), + )