Add hue light control

This commit is contained in:
Correl Roush 2021-02-10 17:35:41 -05:00
parent 785d81a351
commit a7541084fa
3 changed files with 138 additions and 1 deletions

View file

@ -25,6 +25,12 @@
}, },
"database_type": "postgres" "database_type": "postgres"
}, },
"hue": {
"enabled": false,
"host": "localhost",
"username": "turntable",
"light": "My Light"
},
"icecast": { "icecast": {
"enabled": false, "enabled": false,
"host": "localhost", "host": "localhost",

View file

@ -11,6 +11,7 @@ from dejavu import Dejavu # type: ignore
from turntable.audio import Listener, Player from turntable.audio import Listener, Player
from turntable.events import Event from turntable.events import Event
from turntable.hue import Hue
from turntable.icecast import Icecast from turntable.icecast import Icecast
from turntable.models import PCM from turntable.models import PCM
from turntable.turntable import Turntable from turntable.turntable import Turntable
@ -39,7 +40,8 @@ class Application:
audio_config = self.config.get("audio", dict()) audio_config = self.config.get("audio", dict())
pcm_in: "Queue[PCM]" = Queue() pcm_in: "Queue[PCM]" = Queue()
pcms: "List[Queue[PCM]]" = [pcm_in] hue_pcm: "Queue[PCM]" = Queue()
pcms: "List[Queue[PCM]]" = [pcm_in, hue_pcm]
if pcm: if pcm:
pcms.append(pcm) pcms.append(pcm)
if output_device := audio_config.get("output_device"): if output_device := audio_config.get("output_device"):
@ -77,6 +79,20 @@ class Application:
event_queues.append(icecast_events) event_queues.append(icecast_events)
self.processes.append(icecast) self.processes.append(icecast)
hue_config = self.config.get("hue", dict())
hue_enabled = hue_config.get("enabled", False)
if hue_enabled:
hue_events: "Queue[Event]" = Queue()
hue = Hue(
pcm_in=hue_pcm,
events=hue_events,
host=hue_config.get("host", "localhost"),
username=hue_config.get("username", "turntable"),
light=hue_config.get("light", "Light"),
)
event_queues.append(hue_events)
self.processes.append(hue)
dejavu = Dejavu(self.config.get("dejavu", dict())) dejavu = Dejavu(self.config.get("dejavu", dict()))
turntable_config = self.config.get("turntable", dict()) turntable_config = self.config.get("turntable", dict())

115
turntable/hue.py Normal file
View file

@ -0,0 +1,115 @@
import audioop
import logging
from multiprocessing import Process, Queue
import os
import queue
import time
from typing import Any, Optional
import requests
from turntable.events import *
from turntable.models import PCM
logger = logging.getLogger(__name__)
class HueError(Exception):
...
def hue_response(response: requests.Response) -> Any:
try:
response.raise_for_status()
result = response.json()
try:
raise HueError(response.json()[0]["error"]["description"])
except (IndexError, KeyError, TypeError):
return result
except requests.HTTPError as e:
raise HueError(f"http error: {e}") from e
except ValueError:
raise HueError("invalid response")
def hue_error(response: Any) -> Optional[str]:
try:
return response.json()[0]["error"]["description"]
except ValueError:
return "invalid response"
except (IndexError, KeyError, TypeError):
return None
class Hue(Process):
def __init__(
self,
pcm_in: "Queue[PCM]",
events: "Queue[Event]",
host: str,
username: str,
light: str,
):
super().__init__()
self.pcm_in = pcm_in
self.events = events
self.host = host
self.username = username
self.light = light
self.light_id = None
try:
lights = hue_response(
requests.get(f"http://{self.host}/api/{self.username}/lights")
)
except HueError as error:
logger.warn(f"Error fetching lights: {error}")
return
try:
self.light_id = next(
filter(
lambda i: i[1]["name"].lower() == self.light.lower(), lights.items()
)
)[0]
except StopIteration:
logger.warn(f"Could not find a light named '{light}")
return
logger.info("Hue ready")
def run(self) -> None:
if not self.light_id:
logger.warn("No light identified, not starting Hue")
return
logger.debug("Starting Hue")
max_peak = 3000
audio = None
while True:
try:
while event := self.events.get(False):
...
except queue.Empty:
...
try:
while sample := self.pcm_in.get(False):
audio = sample
except queue.Empty:
...
if not audio:
continue
rms = audioop.rms(audio.raw, audio.channels)
peak = audioop.max(audio.raw, audio.channels)
max_peak = max(peak, max_peak)
brightness = int(peak / max_peak * 255)
logger.debug(f"Brightness: {brightness}")
requests.put(
"http://192.168.1.199/api/bx1YKf6IQmU-W1MLHrsZ79Wz4bRWiBShb4ewBpfm/lights/7/state",
json={"bri": brightness, "transitiontime": 1},
)
# requests.put(
# "http://192.168.1.199/api/bx1YKf6IQmU-W1MLHrsZ79Wz4bRWiBShb4ewBpfm/groups/2/action",
# json={"bri": brightness, "transitiontime": 1},
# )
time.sleep(0.1)