# Mirror of https://github.com/markqvist/LXST.git
# Synced 2026-04-27 14:20:39 +00:00
# 176 lines, 7.1 KiB, Python
import RNS
|
|
import LXST
|
|
import time
|
|
import math
|
|
import threading
|
|
import numpy as np
|
|
from collections import deque
|
|
from inspect import currentframe
|
|
from .Codecs import Codec, Raw
|
|
from .Codecs.Codec import resample
|
|
from .Sinks import LocalSink
|
|
from .Sources import LocalSource, Backend
|
|
|
|
class Mixer(LocalSource, LocalSink):
    """Mix audio frames from several sources into a single output stream.

    Frames passed to :meth:`handle_frame` are queued per source. A background
    thread started by :meth:`start` takes at most one queued frame from every
    source on each pass, sums them with the current gain applied, clips the
    sum to ``[-1.0, 1.0]`` and hands the result to ``sink`` (encoded by
    ``codec`` first, when a codec is set).
    """

    # Default length of each per-source frame queue. can_receive() also
    # compares against this value.
    MAX_FRAMES = 8

    # Largest value representable by an int16. Not referenced in this class.
    TYPE_MAP_FACTOR = np.iinfo("int16").max

    def __init__(self, target_frame_ms=40, samplerate=None, codec=None, sink=None, gain=0.0):
        """Create a stopped mixer.

        Args:
            target_frame_ms: Requested frame duration in milliseconds. The
                codec setter may adjust it to fit the codec's constraints.
            samplerate: Optional sample rate. When left unset it is taken
                from the first source that delivers a frame. Setting a codec
                overrides it.
            codec: Optional codec used to encode mixed frames for the sink.
            sink: Optional sink that receives the mixed frames.
            gain: Mixing gain, passed through ``_mixing_gain``; 0.0 is unity.
        """
        self.incoming_frames = {}
        self.target_frame_ms = target_frame_ms
        self.frame_time = self.target_frame_ms/1000
        self.should_run = False
        self.mixer_thread = None
        self.mixer_lock = threading.Lock()
        self.insert_lock = threading.Lock()
        self.muted = False
        self.gain = gain
        self.bitdepth = 32
        self.channels = None
        self.samplerate = None
        self._sink = None
        self._source = None
        self._codec = None

        # Order matters: the codec setter assigns self.samplerate itself, so
        # a codec takes precedence over an explicitly passed samplerate.
        if samplerate:
            self.samplerate = samplerate
        if sink:
            self.sink = sink
        if codec:
            self.codec = codec

    def start(self):
        """Start the background mixing thread unless it is already requested to run."""
        if self.should_run:
            return

        RNS.log(f"{self} starting", RNS.LOG_DEBUG)
        self.should_run = True
        self.mixer_thread = threading.Thread(target=self._mixer_job, daemon=True)
        self.mixer_thread.start()

    def stop(self):
        """Ask the mixing thread to exit; does not wait for it to finish."""
        self.should_run = False

    def set_gain(self, gain=None):
        """Set the mixing gain; ``None`` resets it to 0.0 (unity)."""
        self.gain = 0.0 if gain is None else float(gain)

    def mute(self, mute=True):
        """Mute the mixer, or unmute it when ``mute`` is false.

        Any value that is not equal to ``True`` or ``False`` unmutes.
        """
        if mute in (True, False):
            self.muted = mute
        else:
            self.muted = False

    def unmute(self, unmute=True):
        """Unmute the mixer, or mute it when ``unmute`` is false.

        Any value that is not equal to ``True`` or ``False`` unmutes.
        """
        if unmute in (True, False):
            # Inverse of mute(): unmute(True) must clear the muted flag.
            self.muted = not unmute
        else:
            self.muted = False

    def set_source_max_frames(self, source, max_frames):
        """Set the queue length for ``source``, keeping any queued frames that fit."""
        with self.insert_lock:
            # An unknown source starts with an empty queue; for a known one,
            # deque(iterable, maxlen) keeps only the newest max_frames entries.
            queued = self.incoming_frames.get(source, ())
            self.incoming_frames[source] = deque(queued, maxlen=max_frames)

    def can_receive(self, from_source):
        """Return True while ``from_source`` has fewer than MAX_FRAMES frames queued.

        Sources that have not delivered anything yet can always send.
        """
        # NOTE(review): the threshold is MAX_FRAMES even when
        # set_source_max_frames() gave this source a different queue length.
        if from_source not in self.incoming_frames:
            return True
        return len(self.incoming_frames[from_source]) < self.MAX_FRAMES

    def handle_frame(self, frame, source, decoded=False):
        """Queue one frame from ``source`` for mixing.

        Args:
            frame: The frame to queue. Decoded with ``source.codec`` unless
                ``decoded`` is true.
            source: The object delivering the frame. If the mixer has no
                channel count or sample rate yet, they are copied from it.
            decoded: Set to True when ``frame`` already holds decoded samples.
        """
        with self.insert_lock:
            if source not in self.incoming_frames:
                self.incoming_frames[source] = deque(maxlen=self.MAX_FRAMES)

            if not self.channels:
                self.channels = source.channels

            if not self.samplerate:
                # First source fixes the mixer rate; derive the frame size
                # from it and recompute the exact frame duration.
                self.samplerate = source.samplerate
                self.samples_per_frame = math.ceil((self.target_frame_ms/1000)*self.samplerate)
                self.frame_time = self.samples_per_frame/self.samplerate
                RNS.log(f"{self} samplerate set to {RNS.prettyfrequency(self.samplerate)}", RNS.LOG_DEBUG)
                RNS.log(f"{self} frame time is {RNS.prettyshorttime(self.frame_time)}")

            frame_samples = frame if decoded else source.codec.decode(frame)

            # TODO: Add resampling for all source types
            # if CODEC_OUTPUT_RATE != self.samplerate:
            #     frame_samples = resample(frame_samples, source.bitdepth, source.channels, CODEC_OUTPUT_RATE, self.samplerate)

            # The deque is bounded, so a full queue drops its oldest frame.
            self.incoming_frames[source].append(frame_samples)

    @property
    def _mixing_gain(self):
        """Multiplier applied to every frame: 0.0 when muted, 1.0 at zero gain."""
        if self.muted:
            return 0.0
        if self.gain == 0.0:
            return 1.0
        # NOTE(review): gain/10 is the power-ratio dB convention; scaling
        # sample amplitudes usually uses gain/20. Confirm which is intended.
        return 10**(self.gain/10)

    def _mixer_job(self):
        """Thread body: mix and forward frames until ``should_run`` is cleared."""
        # Holding mixer_lock for the whole run keeps two mixing threads from
        # running at the same time.
        with self.mixer_lock:
            while self.should_run:
                if not (self.sink and self.sink.can_receive()):
                    # No sink, or the sink is busy: back off for a fraction
                    # of a frame before checking again.
                    time.sleep(self.frame_time*0.1)
                    continue

                source_count = 0
                mixed_frame = None
                # Iterate over a snapshot so sources added while mixing do
                # not change the dict during iteration.
                for source in self.incoming_frames.copy():
                    if len(self.incoming_frames[source]) > 0:
                        next_frame = self.incoming_frames[source].popleft()
                        if source_count == 0:
                            mixed_frame = next_frame*self._mixing_gain
                        else:
                            mixed_frame = mixed_frame + next_frame*self._mixing_gain
                        source_count += 1

                if source_count == 0:
                    # Nothing queued on any source.
                    time.sleep(self.frame_time*0.1)
                    continue

                mixed_frame = np.clip(mixed_frame, -1.0, 1.0)
                if RNS.loglevel >= RNS.LOG_DEBUG:
                    # Samples sitting on the clip limits indicate clipping.
                    if mixed_frame.max() >= 1.0 or mixed_frame.min() <= -1.0:
                        RNS.log(f"Signal clipped on {self}", RNS.LOG_WARNING)

                if self.codec:
                    self.sink.handle_frame(self.codec.encode(mixed_frame), self)
                else:
                    self.sink.handle_frame(mixed_frame, self)

    @property
    def codec(self):
        """Codec used to encode mixed frames, or None to pass samples through."""
        return self._codec

    @codec.setter
    def codec(self, codec):
        """Install ``codec`` and adapt sample rate and frame time to it.

        Setting None clears the codec and changes nothing else.

        Raises:
            CodecError: If ``codec`` is not a ``Codec`` instance.
        """
        if codec is None:
            self._codec = None
            return

        if not isinstance(codec, Codec):
            # CodecError is not imported at module level in this file, so
            # bring it into scope here instead of failing with NameError.
            # NOTE(review): assumes CodecError lives next to resample() in
            # .Codecs.Codec - confirm.
            from .Codecs.Codec import CodecError
            raise CodecError(f"Invalid codec specified for {self}")

        self._codec = codec

        # The codec decides the sample rate; fall back to the backend default.
        if self.codec.preferred_samplerate:
            self.samplerate = self.codec.preferred_samplerate
        else:
            self.samplerate = Backend.SAMPLERATE

        # NOTE(review): self.frame_time is not recomputed after the
        # adjustments to target_frame_ms below.

        # Round the frame time up to a whole number of codec quanta.
        if self.codec.frame_quanta_ms:
            if self.target_frame_ms%self.codec.frame_quanta_ms != 0:
                self.target_frame_ms = math.ceil(self.target_frame_ms/self.codec.frame_quanta_ms)*self.codec.frame_quanta_ms
                RNS.log(f"{self} target frame time quantized to {self.target_frame_ms}ms due to codec frame quanta", RNS.LOG_DEBUG)

        # Respect the codec's maximum frame duration.
        if self.codec.frame_max_ms:
            if self.target_frame_ms > self.codec.frame_max_ms:
                self.target_frame_ms = self.codec.frame_max_ms
                RNS.log(f"{self} target frame time clamped to {self.target_frame_ms}ms due to codec frame limit", RNS.LOG_DEBUG)

        # If only specific durations are valid, pick the nearest one.
        if self.codec.valid_frame_ms:
            if self.target_frame_ms not in self.codec.valid_frame_ms:
                self.target_frame_ms = min(self.codec.valid_frame_ms, key=lambda t:abs(t-self.target_frame_ms))
                RNS.log(f"{self} target frame time clamped to closest valid value of {self.target_frame_ms}ms ", RNS.LOG_DEBUG)

    @property
    def source(self):
        """The object stored as this mixer's source, or None."""
        return self._source

    @source.setter
    def source(self, source):
        self._source = source

    @property
    def sink(self):
        """The sink that receives mixed frames, or None."""
        return self._sink

    @sink.setter
    def sink(self, sink):
        self._sink = sink