diff --git a/.github/workflows/pya-ci.yaml b/.github/workflows/pya-ci.yaml index 766bac7..6032ea8 100644 --- a/.github/workflows/pya-ci.yaml +++ b/.github/workflows/pya-ci.yaml @@ -40,12 +40,15 @@ jobs: run: | conda init bash conda config --add channels defaults + conda config --add channels conda-forge conda activate test-env - conda install ffmpeg coverage python-coveralls --file=requirements_remote.txt --file=requirements_test.txt + # Explicitly install ffmpeg from conda-forge + conda install -c conda-forge ffmpeg + conda install coverage python-coveralls --file=requirements_remote.txt --file=requirements_test.txt # pyaudio is not yet available on conda pip install -r requirements.txt - name: Run tests shell: bash -l {0} run: | conda activate test-env - pytest --cov pya/ + pytest -v --cov pya/ diff --git a/Changelog.md b/Changelog.md index dd49c6a..c0f0370 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,8 @@ # Changelog +## 0.5.3 (Apr 2025) +* Improve aserver playback with ringbuffer and event scheduling + ## 0.5.2 (Nov 2023) * #82, `pyaudio` is now optional: If you plan to use `PyAudioBackend`, install `pya` with `pip install pya[pyaudio]`. * Fix audio device bug diff --git a/examples/pya-examples.ipynb b/examples/pya-examples.ipynb index 08df7a9..eee97c2 100644 --- a/examples/pya-examples.ipynb +++ b/examples/pya-examples.ipynb @@ -2124,7 +2124,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.7" + "version": "3.12.8" }, "toc": { "base_numbering": 1, diff --git a/pya/aserver.py b/pya/aserver.py index 223d944..7d8c783 100644 --- a/pya/aserver.py +++ b/pya/aserver.py @@ -1,4 +1,5 @@ from .helper.backend import determine_backend +from .helper.ringbuffer import RingBuffer import copy import logging import time @@ -12,6 +13,126 @@ _LOGGER.addHandler(logging.NullHandler()) +class AudioEvent: + """Audio Event with timing. This is used in Aserver for scheduling playback.""" + + def __init__(self, signal: np.ndarray, onset: float, out_channel: int = 0): + self._signal = signal + self._onset = onset + self._out_channel = out_channel + self._pos = 0 + self.completed = False + + @property + def pos(self) -> int: + return self._pos + + @property + def onset(self) -> float: + return self._onset + + @property + def out_channel(self) -> int: + return self._out_channel + + @property + def signal(self) -> np.ndarray: + return self._signal + + @property + def remaining_samples(self) -> int: + return self._signal.shape[0] - self.pos + + def get_next_chunk(self, max_samples: int) -> np.ndarray: + """Retrieves the next portion of audio samples from the signal, advancing the playback position. + The function handles both partial and complete chunks, automatically marking the event as completed + when all samples have been played. + + Parameters + ---------- + max_samples : int + Maximum number of samples to return + + Returns + ------- + np.ndarray + Array of audio samples, empty if event is completed + """ + if self.completed: + return np.array([]) + + samples = min(max_samples, self.remaining_samples) + chunk = self._signal[self._pos: self._pos + samples] + self._pos += samples + + if self._pos >= self._signal.shape[0]: + self.completed = True + + return chunk + + +class EventScheduler: + """Manages scheduling of audio events for playback.""" + + def __init__(self, sr: int): + self._sr = sr + self.pending_events = [] + self.active_events = [] + + def schedule(self, event: AudioEvent): + idx = 0 + while (idx < len(self.pending_events) and self.pending_events[idx].onset < event.onset): + idx += 1 + + self.pending_events.insert(idx, event) + + def process( + self, current_time: float, output_buffer: np.ndarray, block_size: int + ) -> np.ndarray: + end_time = current_time + block_size / self._sr + + idx = 0 + while idx < len(self.pending_events): + if self.pending_events[idx].onset <= end_time: + self.active_events.append(self.pending_events.pop(idx)) + else: + idx += 1 + + idx = 0 + while idx < len(self.active_events): + event = self.active_events[idx] + + if event.onset > current_time: + offset_samples = int((event.onset - current_time) * self._sr) + else: + offset_samples = 0 + + audio_chunk = event.get_next_chunk(block_size - offset_samples) + + if len(audio_chunk) > 0: + channels = min( + audio_chunk.shape[1] if len(audio_chunk.shape) > 1 else 1, + output_buffer.shape[1] - event.out_channel, + ) + + if len(audio_chunk.shape) == 1: + audio_chunk = audio_chunk.reshape(-1, 1) + + end_offset = offset_samples + audio_chunk.shape[0] + end_channel = event.out_channel + channels + + output_buffer[ + offset_samples:end_offset, event.out_channel: end_channel + ] += audio_chunk[:, :channels] + + if event.completed: + self.active_events.pop(idx) + else: + idx += 1 + + return output_buffer + + class Aserver: """Pya audio server Based on pyaudio, works as a FIFO style audio stream pipeline, @@ -52,9 +173,15 @@ def shutdown_default_server(): else: warn("Aserver:shutdown_default_server: no default_server to shutdown") - def __init__(self, sr: int = 44100, bs: Optional[int] = None, - device: Optional[int] = None, channels: Optional[int] = None, - backend=None, **kwargs): + def __init__( + self, + sr: int = 44100, + bs: Optional[int] = None, + device: Optional[int] = None, + channels: Optional[int] = None, + backend=None, + **kwargs, + ): """Aserver manages an pyaudio stream, using its aserver callback to feed dispatched signals to output at the right time. @@ -65,7 +192,7 @@ def __init__(self, sr: int = 44100, bs: Optional[int] = None, bs : int Override block size or buffer size set by chosen backend device : int - The device index based on pya.device_info(), default is None which will set + The device index based on pya.device_info(), default is None which will set the default device from PyAudio channels : int number of channel, default is the max output channels of the device @@ -85,25 +212,30 @@ def __init__(self, sr: int = 44100, bs: Optional[int] = None, self.input_devices = [] self.output_devices = [] for i in range(self.backend.get_device_count()): - if int(self.backend.get_device_info_by_index(i)['maxInputChannels']) > 0: + if int(self.backend.get_device_info_by_index(i)["maxInputChannels"]) > 0: self.input_devices.append(self.backend.get_device_info_by_index(i)) - if int(self.backend.get_device_info_by_index(i)['maxOutputChannels']) > 0: + if int(self.backend.get_device_info_by_index(i)["maxOutputChannels"]) > 0: self.output_devices.append(self.backend.get_device_info_by_index(i)) - self._device = self.backend.get_default_output_device_info()['index'] if device is None else device + self._device = ( + self.backend.get_default_output_device_info()["index"] + if device is None + else device + ) self._channels = channels or self.max_out_chn + # Give extra periods for the ring buffer + self.ring_buffer = RingBuffer(self.bs * 4, self.channels) + + self.event_scheduler = EventScheduler(self.sr) + + self.mix_buffer = np.zeros((self.bs, self.channels), dtype=self.backend.dtype) + + self.stream = None self.gain = 1.0 - self.srv_onsets = [] - self.srv_curpos = [] # start of next frame to deliver - self.srv_asigs = [] - self.srv_outs = [] # output channel offset for that asig self.boot_time = 0 # time.time() when stream starts - self.block_cnt = 0 # nr. of callback invocations - self.block_duration = self.bs / self.sr # nominal time increment per callback self.block_time = 0 # estimated time stamp for current block self._stop = True - self.empty_buffer = np.zeros((self.bs, self.channels), dtype=self.backend.dtype) self._is_active = False @property @@ -125,11 +257,11 @@ def device_dict(self): @property def max_out_chn(self) -> int: - return int(self.device_dict['maxOutputChannels']) + return int(self.device_dict["maxOutputChannels"]) @property def max_in_chn(self) -> int: - return int(self.device_dict['maxInputChannels']) + return int(self.device_dict["maxInputChannels"]) @property def is_active(self) -> bool: @@ -141,9 +273,15 @@ def device(self): @device.setter def device(self, val): - self._device = val if val is not None else self.backend.get_default_output_device_info()['index'] + self._device = ( + val + if val is not None + else self.backend.get_default_output_device_info()["index"] + ) if self.max_out_chn < self.channels: - warn(f"Aserver: warning: {self.channels}>{self.max_out_chn} channels requested - truncated.") + warn( + f"Aserver: warning: {self.channels}>{self.max_out_chn} channels requested - truncated." + ) self.channels = self.max_out_chn def __repr__(self): @@ -155,15 +293,23 @@ def get_devices(self, verbose: bool = False): """Return (and optionally print) available input and output device""" if verbose: print("Input Devices: ") - [print(f"Index: {i['index']}, Name: {i['name']}, Channels: {i['maxInputChannels']}") - for i in self.input_devices] + [ + print( + f"Index: {i['index']}, Name: {i['name']}, Channels: {i['maxInputChannels']}" + ) + for i in self.input_devices + ] print("Output Devices: ") - [print(f"Index: {i['index']}, Name: {i['name']}, Channels: {i['maxOutputChannels']}") - for i in self.output_devices] + [ + print( + f"Index: {i['index']}, Name: {i['name']}, Channels: {i['maxOutputChannels']}" + ) + for i in self.output_devices + ] return self.input_devices, self.output_devices def set_device(self, idx: int, reboot: bool = True): - """Set audio device, an alternative way is to direct set the device property, i.e. Aserver.device = 1, + """Set audio device, an alternative way is to direct set the device property, i.e. Aserver.device = 1, but that will not reboot the server. Parameters @@ -191,12 +337,17 @@ def boot(self): return -1 self.boot_time = time.time() self.block_time = self.boot_time - self.block_cnt = 0 - self.stream = self.backend.open(channels=self.channels, rate=self.sr, - input_flag=False, output_flag=True, - frames_per_buffer=self.bs, - output_device_index=self.device, - stream_callback=self._play_callback) + self.ring_buffer.clear() + # self.block_cnt = 0 + self.stream = self.backend.open( + channels=self.channels, + rate=self.sr, + input_flag=False, + output_flag=True, + frames_per_buffer=self.bs, + output_device_index=self.device, + stream_callback=self._play_callback, + ) self._is_active = self.stream.is_active() _LOGGER.info("Server Booted") return self @@ -225,93 +376,54 @@ def play(self, asig, onset: Union[int, float] = 0, out: int = 0, **kwargs): out: int Output channel """ + if not self.is_active: + raise RuntimeError("Aserver not active") self._stop = False - sigid = id(asig) # for copy check if asig.sr != self.sr: asig = asig.resample(self.sr) + if onset < 1e6: rt_onset = time.time() + onset else: rt_onset = onset - idx = np.searchsorted(self.srv_onsets, rt_onset) - self.srv_onsets.insert(idx, rt_onset) + if asig.sig.dtype != self.backend.dtype: - warn("Not the same type. ") - if id(asig) == sigid: - asig = copy.copy(asig) - asig.sig = asig.sig.astype(self.backend.dtype) - # copy only relevant channels... - nchn = min(asig.channels, self.channels - out) # max number of copyable channels - # in: [:nchn] out: [out:out+nchn] - if id(asig) == sigid: - asig = copy.copy(asig) - if len(asig.sig.shape) == 1: - asig.sig = asig.sig.reshape(asig.samples, 1) - asig.sig = asig.sig[:, :nchn].reshape(asig.samples, nchn) - # asig.channels = nchn - # so now in callback safely copy to out:out+asig.sig.shape[1] - self.srv_asigs.insert(idx, asig) - self.srv_curpos.insert(idx, 0) - self.srv_outs.insert(idx, out) - if 'block' in kwargs and kwargs['block']: + asig_data = asig.sig.astype(self.backend.dtype) + else: + asig_data = asig.sig + + event = AudioEvent(asig_data, rt_onset, out) + self.event_scheduler.schedule(event) + + if "block" in kwargs and kwargs["block"]: if onset > 0: # here really omset and not rt_onset! _LOGGER.warning("blocking inactive with play(onset>0)") else: time.sleep(asig.get_duration()) + return self def _play_callback(self, in_data, frame_count, time_info, flag): """callback function, called from pastream thread when data needed.""" - tnow = self.block_time - self.block_time += self.block_duration - # self.block_cnt += 1 # TODO this will get very large eventually - # just curious - not needed but for time stability check - self.timejitter = time.time() - self.block_time - if self.timejitter > 3 * self.block_duration: - msg = f"Aserver late by {self.timejitter} seconds: block_time reset!" - _LOGGER.debug(msg) - self.block_time = time.time() - # to shortcut computing - if not self.srv_asigs or self.srv_onsets[0] > tnow: - return self.backend.process_buffer(self.empty_buffer) - elif self._stop: - self.srv_asigs.clear() - self.srv_onsets.clear() - self.srv_curpos.clear() - self.srv_outs.clear() - return self.backend.process_buffer(self.empty_buffer) - data = np.zeros((self.bs, self.channels), dtype=self.backend.dtype) - # iterate through all registered asigs, adding samples to play - dellist = [] # memorize completed items for deletion - t_next_block = tnow + self.bs / self.sr - for i, t in enumerate(self.srv_onsets): - if t > t_next_block: # doesn't begin before next block - break # since list is always onset-sorted - a = self.srv_asigs[i] - c = self.srv_curpos[i] - if t > tnow: # first block: apply precise zero padding - io0 = int((t - tnow) * self.sr) - else: - io0 = 0 - tmpsig = a.sig[c:c + self.bs - io0] - n, nch = tmpsig.shape - out = self.srv_outs[i] - # .reshape(n, nch) not needed as moved to play - data[io0:io0 + n, out:out + nch] += tmpsig - self.srv_curpos[i] += n - if self.srv_curpos[i] >= a.samples: - dellist.append(i) # store for deletion - # clean up lists - for i in dellist[::-1]: # traverse backwards! - del self.srv_asigs[i] - del self.srv_onsets[i] - del self.srv_curpos[i] - del self.srv_outs[i] - return self.backend.process_buffer(data * (self.backend.range * self.gain)) + current_time = self.block_time + self.block_time += self.bs / self.sr + + # This is fast. + self.mix_buffer.fill(0) + + if not self._stop: + self.event_scheduler.process(current_time, self.mix_buffer, self.bs) + + output_data = self.mix_buffer * (self.backend.range * self.gain) + + return self.backend.process_buffer(output_data) def stop(self): self._stop = True + self.event_scheduler.pending_events = [] + self.event_scheduler.active_events = [] + self.ring_buffer.clear() def __enter__(self): return self.boot() @@ -324,7 +436,7 @@ def __exit__(self, exc_type, exc_value, traceback): def __del__(self): """Backup cleanup, only if context manager wasn't used""" - if hasattr(self, 'stream') and self.stream is not None: + if hasattr(self, "stream") and self.stream is not None: try: self.quit() self.backend.terminate() diff --git a/pya/helper/ringbuffer.py b/pya/helper/ringbuffer.py new file mode 100644 index 0000000..32026b3 --- /dev/null +++ b/pya/helper/ringbuffer.py @@ -0,0 +1,105 @@ +import numpy as np + + +class RingBuffer: + def __init__(self, frames: int, channels: int, dtype: np.dtype = np.float32): + if not (frames & (frames - 1) == 0): + raise ValueError("frames must be a power of 2") + self._frames = frames + self._channels = channels + self._dtype = dtype + # bitmask for the circular buffer, faster than modulo + self._bitmask = frames - 1 + self._buffer = np.zeros((frames, channels), dtype=dtype) + self.write_index = 0 + self.read_index = 0 + + @property + def frames(self) -> int: + return self._frames + + @property + def channels(self) -> int: + return self._channels + + @property + def dtype(self) -> np.dtype: + return self._dtype + + @property + def buffer(self) -> np.ndarray: + return self._buffer + + def write(self, data: np.ndarray) -> int: + """Write data to the ring buffer, returns the number of frames written.""" + if len(data.shape) == 1: + data = data.reshape(1, -1) + + if data.shape[0] > self.frames: + raise ValueError( + "data frames size must not be greater than the ring buffer frames size" + ) + + if data.shape[1] > self.channels: + raise ValueError( + "data channels size must not be greater than the ring buffer channels size" + ) + + frames_to_write = data.shape[0] + channels_to_write = data.shape[1] + # Handle potential wrap-around + first_chunk = min(frames_to_write, self.frames - self.write_index) + self.buffer[ + self.write_index: self.write_index + first_chunk, :channels_to_write + ] = data[:first_chunk, :channels_to_write] + + if first_chunk < frames_to_write: + second_chunk = frames_to_write - first_chunk + self.buffer[:second_chunk, :channels_to_write] = data[ + first_chunk:frames_to_write, :channels_to_write + ] + + self.write_index = (self.write_index + frames_to_write) & self._bitmask + return frames_to_write + + def read(self, frames: int) -> np.ndarray: + """Read data from the ring buffer, returns the number of frames read.""" + if frames > self.frames: + raise ValueError( + "frames must not be greater than the ring buffer frames size" + ) + + frames_available = self.available_read() + frames_to_read = min(frames, frames_available) + + result = np.zeros((frames_to_read, self.channels), dtype=self.dtype) + + if frames_to_read == 0: + return result + + first_chunk = min(frames_to_read, self.capacity - self.read_pos) + result[:first_chunk] = self.buffer[self.read_pos: self.read_pos + first_chunk] + + # If we need to wrap around + if first_chunk < frames_to_read: + second_chunk = frames_to_read - first_chunk + result[first_chunk:] = self.buffer[:second_chunk] + + if advance: + self.read_pos = (self.read_pos + frames_to_read) % self.capacity + + return result + + def available_read(self) -> int: + if self.write_index >= self.read_index: + return self.write_index - self.read_index + else: + return self.frames - self.read_index + self.write_index + + def available_write(self) -> int: + return self.frames - self.available_read() - 1 + + def clear(self): + self.write_index = 0 + self.read_index = 0 + self._buffer.fill(0) diff --git a/pya/version.py b/pya/version.py index 7225152..43a1e95 100644 --- a/pya/version.py +++ b/pya/version.py @@ -1 +1 @@ -__version__ = "0.5.2" +__version__ = "0.5.3" diff --git a/tests/test_aserver.py b/tests/test_aserver.py index 1c6a9f6..722ebff 100644 --- a/tests/test_aserver.py +++ b/tests/test_aserver.py @@ -119,4 +119,3 @@ def test_incompatible_backend(self): s.boot() asine.play(server=s) s.quit() -