309 lines
10 KiB
Python
309 lines
10 KiB
Python
from machine import Pin, SPI, SoftI2C
|
|
import framebuf
|
|
import json
|
|
import machine
|
|
import network
|
|
import ubinascii
|
|
import uasyncio
|
|
import utime
|
|
|
|
from umqtt.simple import MQTTClient
|
|
|
|
import cd4052
|
|
import ssd1306
|
|
import mcp4
|
|
from button import Button
|
|
from rotary_irq_esp import RotaryIRQ
|
|
from statetree import StateTree
|
|
|
|
VOLUME_MAX = const(128)
|
|
|
|
MQTT_KEEPALIVE = const(60)
|
|
MQTT_UPDATE_INTERVAL = const(60)
|
|
MQTT_RECONNECT_INTERVAL = const(60)
|
|
|
|
channels = ["LINE 1", "LINE 2", "PHONO", "DAC"]
|
|
state = StateTree(
|
|
{
|
|
"network": "OFF",
|
|
"volume": {
|
|
"left": 0,
|
|
"right": 0,
|
|
"muted": "OFF",
|
|
},
|
|
"channel": channels[0],
|
|
}
|
|
)
|
|
last_update = 0
|
|
|
|
rotary = RotaryIRQ(
|
|
33,
|
|
32,
|
|
0,
|
|
max_val=128,
|
|
range_mode=RotaryIRQ.RANGE_BOUNDED,
|
|
pull_up=True,
|
|
incr=4,
|
|
)
|
|
rotary_value = rotary.value()
|
|
rotary_button = Button(Pin(36, Pin.IN))
|
|
|
|
try:
|
|
i2c = SoftI2C(sda=Pin(21), scl=Pin(22))
|
|
oled_width = const(128)
|
|
oled_height = const(32)
|
|
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
|
|
except Exception as e:
|
|
print("WARNING: OLED unavailable:", e)
|
|
oled = None
|
|
|
|
switch = cd4052.CD4052(18, 19, 23)
|
|
switch.select(0)
|
|
|
|
sta_if = network.WLAN(network.STA_IF)
|
|
|
|
spi = SPI(1)
|
|
cs = Pin(15, mode=Pin.OUT, value=1)
|
|
pot = mcp4.MCP4(spi, cs)
|
|
|
|
with open("settings.json", "r") as f:
|
|
settings = json.load(f)
|
|
|
|
mqtt = None
|
|
mqtt_client_id = ubinascii.hexlify(machine.unique_id())
|
|
mqtt_broker = settings["mqtt"]["broker"]
|
|
mqtt_prefix = settings["mqtt"]["prefix"]
|
|
last_mqtt_attempt = 0
|
|
|
|
|
|
def mqtt_init():
|
|
print("Starting MQTT client")
|
|
mqtt = MQTTClient(mqtt_client_id, mqtt_broker, keepalive=MQTT_KEEPALIVE)
|
|
mqtt.set_callback(on_message)
|
|
mqtt.set_last_will(f"{mqtt_prefix}/status", b"offline", retain=True)
|
|
mqtt.connect()
|
|
mqtt.subscribe(f"{mqtt_prefix}/set")
|
|
mqtt_device = {
|
|
"identifiers": mqtt_client_id,
|
|
"manufacturer": "correl",
|
|
"model": "digital-audio-switch",
|
|
"name": "Digital Audio Switch",
|
|
}
|
|
|
|
mqtt.publish(
|
|
f"homeassistant/number/digital-audio-switch/volume-left/config".encode(),
|
|
json.dumps(
|
|
{
|
|
"name": "Digital Audio Switch Volume (Left)",
|
|
"command_topic": f"{mqtt_prefix}/set",
|
|
"command_template": '{"volume": {"left": {{value}}}}',
|
|
"state_topic": f"{mqtt_prefix}/state",
|
|
"value_template": "{{ value_json.volume.left }}",
|
|
"availability_topic": f"{mqtt_prefix}/status",
|
|
"min": 0,
|
|
"max": VOLUME_MAX,
|
|
"mode": "slider",
|
|
"step": 1,
|
|
"unique_id": "digital-audio-switch-volume-left",
|
|
"device": mqtt_device,
|
|
}
|
|
).encode(),
|
|
retain=True,
|
|
)
|
|
mqtt.publish(
|
|
f"homeassistant/number/digital-audio-switch/volume-right/config".encode(),
|
|
json.dumps(
|
|
{
|
|
"name": "Digital Audio Switch Volume (Right)",
|
|
"command_topic": f"{mqtt_prefix}/set",
|
|
"command_template": '{"volume": {"right": {{value}}}}',
|
|
"state_topic": f"{mqtt_prefix}/state",
|
|
"value_template": "{{ value_json.volume.right }}",
|
|
"availability_topic": f"{mqtt_prefix}/status",
|
|
"min": 0,
|
|
"max": VOLUME_MAX,
|
|
"mode": "slider",
|
|
"step": 1,
|
|
"unique_id": "digital-audio-switch-volume-right",
|
|
"device": mqtt_device,
|
|
}
|
|
).encode(),
|
|
retain=True,
|
|
)
|
|
mqtt.publish(
|
|
f"homeassistant/number/digital-audio-switch/volume-master/config".encode(),
|
|
json.dumps(
|
|
{
|
|
"name": "Digital Audio Switch Volume (Master)",
|
|
"command_topic": f"{mqtt_prefix}/set",
|
|
"command_template": '{"volume": {"right": {{value}}, "left": {{value}}}}',
|
|
"state_topic": f"{mqtt_prefix}/state",
|
|
"value_template": """
|
|
{%set values = value_json.volume.left,
|
|
value_json.volume.right %}
|
|
{{ values|max }}
|
|
""",
|
|
"availability_topic": f"{mqtt_prefix}/status",
|
|
"min": 0,
|
|
"max": VOLUME_MAX,
|
|
"mode": "slider",
|
|
"step": 1,
|
|
"unique_id": "digital-audio-switch-volume-master",
|
|
"device": mqtt_device,
|
|
}
|
|
).encode(),
|
|
retain=True,
|
|
)
|
|
mqtt.publish(
|
|
f"homeassistant/switch/digital-audio-switch/mute/config".encode(),
|
|
json.dumps(
|
|
{
|
|
"name": "Digital Audio Switch Mute",
|
|
"command_topic": f"{mqtt_prefix}/set",
|
|
"payload_on": '{"volume": {"muted": "ON"}}',
|
|
"payload_off": '{"volume": {"muted": "OFF"}}',
|
|
"state_on": "ON",
|
|
"state_off": "OFF",
|
|
"state_topic": f"{mqtt_prefix}/state",
|
|
"value_template": "{{ value_json.volume.muted }}",
|
|
"availability_topic": f"{mqtt_prefix}/status",
|
|
"unique_id": "digital-audio-switch-volume-mute",
|
|
"device": mqtt_device,
|
|
}
|
|
).encode(),
|
|
retain=True,
|
|
)
|
|
mqtt.publish(
|
|
f"homeassistant/select/digital-audio-switch/channel/config".encode(),
|
|
json.dumps(
|
|
{
|
|
"name": "Digital Audio Switch Channel",
|
|
"command_topic": f"{mqtt_prefix}/set",
|
|
"command_template": '{"channel": "{{value}}"}',
|
|
"state_topic": f"{mqtt_prefix}/state",
|
|
"value_template": "{{ value_json.channel }}",
|
|
"availability_topic": f"{mqtt_prefix}/status",
|
|
"options": channels,
|
|
"unique_id": "digital-audio-switch-channel",
|
|
"device": mqtt_device,
|
|
}
|
|
).encode(),
|
|
retain=True,
|
|
)
|
|
return mqtt
|
|
|
|
|
|
def on_message(topic, msg):
|
|
print(f"MQTT <- [{topic}] {msg}")
|
|
try:
|
|
msg = json.loads(msg)
|
|
except:
|
|
return
|
|
if volume := msg.get("volume"):
|
|
if isinstance(volume.get("left"), int):
|
|
pot.write(0, volume["left"])
|
|
if isinstance(volume.get("right"), int):
|
|
pot.write(1, volume["right"])
|
|
if isinstance(volume.get("muted"), str):
|
|
switch.mute(volume["muted"] == "ON")
|
|
if isinstance(msg.get("channel"), str):
|
|
try:
|
|
switch.select(channels.index(msg["channel"]))
|
|
except ValueError:
|
|
print("WARNING: Attempted to select invalid channel", msg["channel"])
|
|
|
|
|
|
def loop():
|
|
global mqtt, state, last_update, last_mqtt_attempt
|
|
global rotary, rotary_button, rotary_value
|
|
|
|
rotary_button.update()
|
|
if rotary_button.was_clicked():
|
|
switch.toggle_mute()
|
|
if rotary_button.was_double_clicked():
|
|
if switch.channel() >= 3:
|
|
switch.select(0)
|
|
else:
|
|
switch.select(switch.channel() + 1)
|
|
|
|
state["volume"]["left"] = pot.read(0)
|
|
state["volume"]["right"] = pot.read(1)
|
|
state["volume"]["muted"] = "ON" if switch.muted() else "OFF"
|
|
state["channel"] = channels[switch.channel()]
|
|
|
|
if state.changed:
|
|
# Volume changed externally
|
|
rotary.set(value=max(state["volume"]["left"], state["volume"]["right"]))
|
|
rotary_value = rotary.value()
|
|
|
|
new_value = rotary.value()
|
|
if rotary_value != new_value:
|
|
print("Rotary:", new_value)
|
|
state["volume"]["left"] = new_value
|
|
state["volume"]["right"] = new_value
|
|
pot.write(0, new_value)
|
|
pot.write(1, new_value)
|
|
rotary_value = new_value
|
|
|
|
if not sta_if.active():
|
|
print("Connecting to WiFi")
|
|
sta_if.active(True)
|
|
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
|
|
|
|
if sta_if.active() and not sta_if.isconnected():
|
|
state["network"] = "ACT"
|
|
if sta_if.isconnected():
|
|
ip, _, _, _ = sta_if.ifconfig()
|
|
if ip == "0.0.0.0":
|
|
# Something went wrong, try to reconnect
|
|
print("IP invalid, retrying WiFi connection")
|
|
state["network"] = "ACT"
|
|
sta_if.active(True)
|
|
sta_if.connect(settings["wifi"]["ssid"], settings["wifi"]["password"])
|
|
elif state["network"] != "OK":
|
|
print(f"WIFI Connected to {sta_if.config('ssid')}")
|
|
print(f"IP Address: {ip}")
|
|
state["network"] = "OK"
|
|
if not mqtt and utime.time() - last_mqtt_attempt >= MQTT_RECONNECT_INTERVAL:
|
|
last_mqtt_attempt = utime.time()
|
|
try:
|
|
mqtt = mqtt_init()
|
|
except OSError as e:
|
|
print(f"Failed to connect to MQTT ({mqtt_broker}): {e}")
|
|
if mqtt:
|
|
if state.changed or utime.time() - last_update >= MQTT_UPDATE_INTERVAL:
|
|
topic = f"{mqtt_prefix}/state".encode()
|
|
payload = json.dumps(state.dictionary).encode()
|
|
print(f"MQTT -> [{topic}] {payload}")
|
|
mqtt.publish(f"{mqtt_prefix}/status", b"online", retain=True)
|
|
mqtt.publish(topic, payload, retain=True)
|
|
last_update = utime.time()
|
|
mqtt.check_msg()
|
|
if oled and state.changed:
|
|
oled.fill(0)
|
|
oled.framebuf.rect(10, 0, 92, 8, 1)
|
|
oled.framebuf.rect(
|
|
12, 2, round(state["volume"]["left"] / VOLUME_MAX * 88), 4, 1, True
|
|
)
|
|
oled.framebuf.rect(10, 10, 92, 8, 1)
|
|
oled.framebuf.rect(
|
|
12, 12, round(state["volume"]["right"] / VOLUME_MAX * 88), 4, 1, True
|
|
)
|
|
oled.text("L", 0, 0)
|
|
oled.text("R", 0, 10)
|
|
oled.text(f"{state['volume']['left']:3d}", 104, 0)
|
|
oled.text(f"{state['volume']['right']:3d}", 104, 10)
|
|
if state["volume"]["muted"] == "ON":
|
|
oled.framebuf.rect(40, 4, 4 * 8 + 2, 10, 0, True)
|
|
oled.framebuf.rect(39, 3, 4 * 8 + 4, 12, 1)
|
|
oled.framebuf.rect(38, 2, 4 * 8 + 6, 14, 0)
|
|
oled.text("MUTE", 41, 5)
|
|
oled.text(f"WiFi: {state['network']}", 0, 20)
|
|
oled.text(f'{state["channel"]:>6}', 80, 20)
|
|
oled.show()
|
|
state.clean()
|
|
|
|
|
|
while True:
|
|
loop()
|
|
utime.sleep_ms(10)
|