mirror of
https://github.com/correl/turntable.git
synced 2024-11-23 11:09:56 +00:00
Add visualization
This commit is contained in:
parent
713af582ec
commit
25aff8c108
4 changed files with 80 additions and 21 deletions
|
@ -1,9 +1,10 @@
|
|||
import logging
|
||||
|
||||
from turntable import application
|
||||
from turntable import application, turntable
|
||||
|
||||
|
||||
def main() -> None:
|
||||
with application.run() as events:
|
||||
while event := events.get():
|
||||
if not isinstance(event, turntable.Audio):
|
||||
logging.info("Event: %s", event)
|
||||
|
|
|
@ -1,52 +1,103 @@
|
|||
import logging
|
||||
import queue
|
||||
from statistics import fmean
|
||||
from typing import Iterable, List, Optional, Tuple, Union
|
||||
|
||||
import numpy as np # type: ignore
|
||||
import pyglet # type: ignore
|
||||
import pyglet.clock # type: ignore
|
||||
import scipy.signal # type: ignore
|
||||
|
||||
from turntable import application, turntable
|
||||
|
||||
|
||||
class Plot:
|
||||
def __init__(
|
||||
self,
|
||||
x: int,
|
||||
y: int,
|
||||
width: int,
|
||||
height: int,
|
||||
bars: int = 20,
|
||||
bar_width: int = 40,
|
||||
color: Tuple[int, int, int] = (255, 255, 255),
|
||||
batch: Optional[pyglet.graphics.Batch] = None,
|
||||
) -> None:
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.bars = bars
|
||||
self.bar_width = bar_width
|
||||
self.color = color
|
||||
self.batch = batch or pyglet.graphics.Batch()
|
||||
self.lines: List[pyglet.shapes.Line] = []
|
||||
|
||||
def update(self, data):
|
||||
heights = scipy.signal.resample(data, self.bars) * self.height / 2 ** 16
|
||||
self.lines = [
|
||||
pyglet.shapes.Line(
|
||||
self.x + x / self.bars * self.width,
|
||||
0,
|
||||
self.x + x / self.bars * self.width,
|
||||
y,
|
||||
width=self.bar_width,
|
||||
color=self.color,
|
||||
batch=self.batch,
|
||||
)
|
||||
for x, y in enumerate(heights)
|
||||
]
|
||||
|
||||
def draw(self) -> None:
|
||||
self.batch.draw()
|
||||
|
||||
|
||||
def main():
|
||||
window = pyglet.window.Window(fullscreen=True)
|
||||
with application.run() as events:
|
||||
label = pyglet.text.Label(
|
||||
"<Idle>",
|
||||
font_name='Noto Sans',
|
||||
font_name="Noto Sans",
|
||||
font_size=36,
|
||||
x=window.width // 2,
|
||||
y=window.height // 2,
|
||||
anchor_x = 'center',
|
||||
anchor_y = 'center')
|
||||
anchor_x="center",
|
||||
anchor_y="center",
|
||||
)
|
||||
batch = pyglet.graphics.Batch()
|
||||
plot = Plot(
|
||||
x=0,
|
||||
y=0,
|
||||
width=window.width,
|
||||
height=window.height,
|
||||
bars=40,
|
||||
bar_width=window.width // 45,
|
||||
color=(139, 0, 139),
|
||||
batch=batch,
|
||||
)
|
||||
|
||||
@window.event
|
||||
def on_draw():
|
||||
window.clear()
|
||||
batch.draw()
|
||||
label.draw()
|
||||
|
||||
def check_events(dt):
|
||||
try:
|
||||
event = events.get(False)
|
||||
logging.info("Event: %s", event)
|
||||
logging.info("Label: %s", label)
|
||||
if isinstance(event, turntable.StartedPlaying):
|
||||
label.text = "<Record starting...>"
|
||||
elif isinstance(event, turntable.StoppedPlaying):
|
||||
label.text = "<Idle>"
|
||||
elif isinstance(event, turntable.NewMetadata):
|
||||
label.text = event.title
|
||||
elif isinstance(event, turntable.Audio):
|
||||
data = np.fromstring(event.pcm.raw, dtype=np.int16)
|
||||
fft = abs(np.fft.fft(data).real)
|
||||
fft = fft[: len(fft) // 2]
|
||||
plot.update(fft)
|
||||
except queue.Empty:
|
||||
...
|
||||
except:
|
||||
logging.exception("Oops")
|
||||
|
||||
pyglet.clock.schedule_interval_soft(check_events, 0.5)
|
||||
pyglet.clock.schedule(check_events)
|
||||
pyglet.app.run()
|
||||
# icecast.set_title("<Idle>")
|
||||
# while event := events.get():
|
||||
# logging.info("Event: %s", event)
|
||||
# if isinstance(event, StartedPlaying):
|
||||
# icecast.set_title("<Record starting...>")
|
||||
# elif isinstance(event, StoppedPlaying):
|
||||
# icecast.set_title("<Idle>")
|
||||
# elif isinstance(event, NewMetadata):
|
||||
# icecast.set_title(event.title)
|
||||
|
|
|
@ -43,6 +43,9 @@ class PCM:
|
|||
for i in range(0, len(self._data), self.framesize):
|
||||
yield PCM(self.framerate, self.channels, self.raw[i : i + self.framesize])
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data) // self.framesize
|
||||
|
||||
def append(self, other: "PCM") -> None:
|
||||
if other.framerate != self.framerate or other.channels != self.channels:
|
||||
raise ValueError("Cannot append incompatible PCM audio")
|
||||
|
|
|
@ -55,6 +55,9 @@ class StoppedPlaying(Event):
|
|||
class NewMetadata(Event):
|
||||
title: str
|
||||
|
||||
@dataclass
|
||||
class Audio(Event):
|
||||
pcm: PCM
|
||||
|
||||
class PCMRecognizer(BaseRecognizer):
|
||||
@staticmethod
|
||||
|
@ -103,6 +106,7 @@ class Turntable(Process):
|
|||
logger.info("Initializing Turntable")
|
||||
while fragment := self.pcm_in.get():
|
||||
self.buffer.append(fragment)
|
||||
self.events_out.put(Audio(fragment))
|
||||
maximum = audioop.max(fragment.raw, 2)
|
||||
self.update_audiolevel(maximum)
|
||||
|
||||
|
|
Loading…
Reference in a new issue