markqvist___LXST/LXST/Codecs/Raw.py
2025-03-11 16:41:15 +01:00

58 lines
No EOL
2.3 KiB
Python

import RNS
import numpy as np
from .Codec import Codec
class Raw(Codec):
BITDEPTH_16 = 0x00
BITDEPTH_32 = 0x01
BITDEPTH_64 = 0x02
BITDEPTH_128 = 0x03
BITDEPTHS = ["float16", "float32", "float64", "float128"]
def __init__(self, channels=None, bitdepth=16):
if channels:
channels = min(max(channels, 1), 32)
self.bitdepth = bitdepth
self.channels = channels
if self.bitdepth >= 128:
self.dtype = self.BITDEPTHS[self.BITDEPTH_128]
self.header_bitdpeth = self.BITDEPTH_128
elif self.bitdepth >= 64:
self.dtype = self.BITDEPTHS[self.BITDEPTH_64]
self.header_bitdpeth = self.BITDEPTH_64
elif self.bitdepth >= 32:
self.dtype = self.BITDEPTHS[self.BITDEPTH_32]
self.header_bitdpeth = self.BITDEPTH_32
else:
self.dtype = self.BITDEPTHS[self.BITDEPTH_16]
self.header_bitdpeth = self.BITDEPTH_16
def encode(self, frame):
if self.channels == None:
self.channels = frame.shape[1]
RNS.log(f"{self} encoder set to {self.channels} channels", RNS.LOG_DEBUG)
if frame.shape[1] > self.channels:
frame = frame[:, range(0, self.channels)]
elif frame.shape[1] < self.channels:
new_frame = np.zeros(shape=(frame.shape[0], self.channels))
for n in range(0, frame.shape[1]): new_frame[:, n] = frame[:, n]
for n in range(frame.shape[1], new_frame.shape[1]): new_frame[:, n] = frame[:, frame.shape[1]-1]
frame = new_frame
frame_header = (self.header_bitdpeth << 6) | self.channels-1
frame_bytes = frame_header.to_bytes()+frame.astype(self.dtype).tobytes()
return frame_bytes
def decode(self, frame_bytes):
frame_header = frame_bytes[0]
frame_channels = (frame_header & 0b00111111)+1
frame_bitdepth = frame_header >> 6
frame_dtype = self.BITDEPTHS[frame_bitdepth]
frame_samples = np.frombuffer(frame_bytes[1:], dtype=frame_dtype)
frame_samples = frame_samples.reshape(len(frame_samples)//frame_channels, frame_channels)
if not self.channels: self.channels = frame_channels
return frame_samples