digitalaudioswitch/main.py

310 lines
10 KiB
Python

from machine import Pin, SPI, SoftI2C
import framebuf
import json
import machine
import network
import ubinascii
import uasyncio
import utime
from umqtt.simple import MQTTClient
import cd4052
import ssd1306
import mcp4
from button import Button
from rotary_irq_esp import RotaryIRQ
from statetree import StateTree
VOLUME_MAX = const(128)
MQTT_KEEPALIVE = const(60)
MQTT_UPDATE_INTERVAL = const(60)
MQTT_RECONNECT_INTERVAL = const(60)
channels = ["LINE 1", "LINE 2", "PHONO", "DAC"]
state = StateTree(
{
"network": "OFF",
"volume": {
"left": 0,
"right": 0,
"muted": "OFF",
},
"channel": channels[0],
}
)
last_update = 0
rotary = RotaryIRQ(
33,
32,
0,
max_val=128,
range_mode=RotaryIRQ.RANGE_BOUNDED,
pull_up=True,
incr=4,
)
rotary_value = rotary.value()
rotary_button = Button(Pin(36, Pin.IN))
try:
i2c = SoftI2C(sda=Pin(21), scl=Pin(22))
oled_width = const(128)
oled_height = const(32)
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
except Exception as e:
print("WARNING: OLED unavailable:", e)
oled = None
switch = cd4052.CD4052(18, 19, 23)
switch.select(0)
sta_if = network.WLAN(network.STA_IF)
spi = SPI(1)
cs = Pin(15, mode=Pin.OUT, value=1)
pot = mcp4.MCP4(spi, cs)
with open("settings.json", "r") as f:
settings = json.load(f)
mqtt = None
mqtt_client_id = ubinascii.hexlify(machine.unique_id())
mqtt_broker = settings["mqtt"]["broker"]
mqtt_prefix = settings["mqtt"]["prefix"]
last_mqtt_attempt = 0
def mqtt_init():
print("Starting MQTT client")
mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE)
mqtt.set_callback(on_message)
mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True)
mqtt.connect()
mqtt.subscribe(f"{mqtt_prefix}/set")
mqtt_device = {
"identifiers": mqtt_client_id,
"manufacturer": "correl",
"model": "digital-audio-switch",
"name": "Digital Audio Switch",
}
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-left/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Left)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"left": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.volume.left }}",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-left",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-right/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Right)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"right": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.volume.right }}",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-right",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/number/digital-audio-switch/volume-master/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Volume (Master)",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"volume": {"right": {{value}}, "left": {{value}}}}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": """
{%set values = value_json.volume.left,
value_json.volume.right %}
{{ values|max }}
""",
"availability_topic": f"{mqtt_prefix}/status",
"min": 0,
"max": VOLUME_MAX,
"mode": "slider",
"step": 1,
"unique_id": "digital-audio-switch-volume-master",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/switch/digital-audio-switch/mute/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Mute",
"command_topic": f"{mqtt_prefix}/set",
"payload_on": '{"volume": {"muted": "ON"}}',
"payload_off": '{"volume": {"muted": "OFF"}}',
"state_on": "ON",
"state_off": "OFF",
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.volume.muted }}",
"availability_topic": f"{mqtt_prefix}/status",
"unique_id": "digital-audio-switch-volume-mute",
"device": mqtt_device,
}
).encode(),
retain=True,
)
mqtt.publish(
f"homeassistant/select/digital-audio-switch/channel/config".encode(),
json.dumps(
{
"name": "Digital Audio Switch Channel",
"command_topic": f"{mqtt_prefix}/set",
"command_template": '{"channel": "{{value}}"}',
"state_topic": f"{mqtt_prefix}/state",
"value_template": "{{ value_json.channel }}",
"availability_topic": f"{mqtt_prefix}/status",
"options": channels,
"unique_id": "digital-audio-switch-channel",
"device": mqtt_device,
}
).encode(),
retain=True,
)
return mqtt
def on_message(topic, msg):
print(f"MQTT <- [{topic}] {msg}")
try:
msg = json.loads(msg)
except:
return
if volume := msg.get("volume"):
if isinstance(volume.get("left"), int):
pot.write(0, volume["left"])
if isinstance(volume.get("right"), int):
pot.write(1, volume["right"])
if isinstance(volume.get("muted"), str):
switch.mute(volume["muted"] == "ON")
if isinstance(msg.get("channel"), str):
try:
switch.select(channels.index(msg["channel"]))
except ValueError:
print("WARNING: Attempted to select invalid channel", msg["channel"])
def loop():
global mqtt, state, last_update, last_mqtt_attempt
global rotary, rotary_button, rotary_value
rotary_button.update()
if rotary_button.was_clicked():
switch.toggle_mute()
if rotary_button.was_double_clicked():
if switch.channel() >= 3:
switch.select(0)
else:
switch.select(switch.channel() + 1)
state["volume"]["left"] = pot.read(0)
state["volume"]["right"] = pot.read(1)
state["volume"]["muted"] = "ON" if switch.muted() else "OFF"
state["channel"] = channels[switch.channel()]
if state.changed:
# Volume changed externally
rotary.set(value=max(state["volume"]["left"], state["volume"]["right"]))
rotary_value = rotary.value()
new_value = rotary.value()
if rotary_value != new_value:
print("Rotary:", new_value)
state["volume"]["left"] = new_value
state["volume"]["right"] = new_value
pot.write(0, new_value)
pot.write(1, new_value)
rotary_value = new_value
if not sta_if.active():
print("Connecting to WiFi")
sta_if.active(True)
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
if sta_if.active() and not sta_if.isconnected():
state["network"] = "ACT"
if sta_if.isconnected():
ip, _, _, _ = sta_if.ifconfig()
if ip == "0.0.0.0":
# Something went wrong, try to reconnect
print("IP invalid, retrying WiFi connection")
state["network"] = "ACT"
sta_if.active(True)
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
elif state["network"] != "OK":
print(f"WIFI Connected to {sta_if.config('ssid')}")
print(f"IP Address: {ip}")
state["network"] = "OK"
if not mqtt and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL:
last_mqtt_attempt = utime.time()
try:
mqtt = mqtt_init()
except OSError as e:
print(f"Failed to connect to MQTT ({mqtt_broker}): {e}")
if mqtt:
if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL:
topic = f"{mqtt_prefix}/state".encode()
payload = json.dumps(state.dictionary).encode()
print(f"MQTT -> [{topic}] {payload}")
mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True)
mqtt.publish(topic, payload, retain=True)
last_update = utime.time()
mqtt.check_msg()
if oled and state.changed:
oled.fill(0)
oled.framebuf.rect(10, 0, 92, 8, 1)
oled.framebuf.rect(
12, 2, round(state["volume"]["left"] / VOLUME_MAX * 88), 4, 1, True
)
oled.framebuf.rect(10, 10, 92, 8, 1)
oled.framebuf.rect(
12, 12, round(state["volume"]["right"] / VOLUME_MAX * 88), 4, 1, True
)
oled.text("L", 0, 0)
oled.text("R", 0, 10)
oled.text(f"{state['volume']['left']:3d}", 104, 0)
oled.text(f"{state['volume']['right']:3d}", 104, 10)
if state["volume"]["muted"] == "ON":
oled.framebuf.rect(40, 4, 4 * 8 + 2, 10, 0, True)
oled.framebuf.rect(39, 3, 4 * 8 + 4, 12, 1)
oled.framebuf.rect(38, 2, 4 * 8 + 6, 14, 0)
oled.text("MUTE", 41, 5)
oled.text(f"WiFi: {state['network']}", 0, 20)
oled.text(f'{state["channel"]:>6}', 80, 20)
oled.show()
state.clean()
while True:
loop()
utime.sleep_ms(10)