digitalaudioswitch/main.py

184 lines
6.2 KiB
Python

from machine import Pin, SPI, SoftI2C
import framebuf
import json
import machine
import network
import ubinascii
import utime
from umqtt.simple import MQTTClient
import ssd1306
import mcp4
from statetree import StateTree
VOLUME_MAX = 128
MQTT_KEEPALIVE = 60
MQTT_UPDATE_INTERVAL = 60
state = StateTree(
{
"network": "OFF",
"volume": {
"left": 0,
"right": 0,
},
}
)
last_update = 0
i2c = SoftI2C(sda=Pin(2), scl=Pin(16))
oled_width = 128
oled_height = 32
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
buf = bytearray((oled_height // 8) * oled_width)
fbuf = framebuf.FrameBuffer1(buf, oled_width, oled_height)
sta_if = network.WLAN(network.STA_IF)
spi = SPI(1)
cs = Pin(15, mode=Pin.OUT, value=1)
pot = mcp4.MCP4(spi, cs)
with open("settings.json", "r") as f:
settings = json.load(f)
mqtt = None
mqtt_client_id = ubinascii.hexlify(machine.unique_id())
mqtt_broker = settings["mqtt"]["broker"]
mqtt_prefix = settings["mqtt"]["prefix"]
def on_message(topic, msg):
print(f"MQTT <- [{topic}] {msg}")
try:
msg = json.loads(msg)
except:
return
if volume := msg.get("volume"):
if isinstance(volume.get("left"), int):
pot.write(0, volume["left"])
if isinstance(volume.get("right"), int):
pot.write(1, volume["right"])
def loop():
global mqtt, state, last_update
state["volume"]["left"] = pot.read(0)
state["volume"]["right"] = pot.read(1)
if not sta_if.active():
print("Connecting to WiFi")
sta_if.active(True)
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
if sta_if.active() and not sta_if.isconnected():
state["network"] = "ACT"
if sta_if.isconnected():
if state["network"] != "OK":
ip, _, _, _ = sta_if.ifconfig()
print(f"WIFI Connected to {sta_if.config('ssid')}")
print(f"IP Address: {ip}")
state["network"] = "OK"
if not mqtt:
print("Starting MQTT client")
mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE)
mqtt.set_callback(on_message)
mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True)
mqtt.connect()
mqtt.subscribe(f"{mqtt_prefix}/set")
mqtt_device = {
"identifiers": mqtt_client_id,
"manufacturer": "correl",
"model": "digital-audio-switch",
"name": "Digital Audio Switch",
}
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-left/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Left)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"left": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.volume.left }}",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-left",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-right/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Right)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"right": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.volume.right }}",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-right",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-master/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Master)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"right": {{value}}, "left": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": """
{%set values = value_json.volume.left,
value_json.volume.right %}
{{ values|max }}
""",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-master",
"device": mqtt_device,
}
).encode(),
retain=True,
)
if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL:
topic = f"{mqtt_prefix}/state".encode()
payload = json.dumps(state.dictionary).encode()
print(f"MQTT -> [{topic}] {payload}")
mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True)
mqtt.publish(topic, payload, retain=True)
last_update = utime.time()
mqtt.check_msg()
if state.changed:
oled.fill(0)
oled.text(f"LFT: {state['volume']['left']}", 0, 0)
oled.text(f"RGT: {state['volume']['right']}", 0, 10)
oled.text(f"NET: {state['network']}", 65, 0)
oled.show()
state.clean()
while True:
loop()
utime.sleep(1)