"""Desktop audio backends for SimVX.
Three backends ship in this module:
- ``MiniaudioBackend``: the production path. Mixes audio natively via
``ma_engine`` (CFFI wrapper at ``simvx.core._native.miniaudio_engine``).
The native extension is built at ``uv pip install`` time by the PEP 517
build hook in ``packages/core/build_audio_ext_hook.py``. If that build
is skipped or fails, the runtime falls back to legacy.
- ``_LegacyMiniaudioBackend``: the pure-Python fallback. Mixes in a
numpy generator callback driven by ``miniaudio.PlaybackDevice``. Slower
(target latency 100 ms vs 20 ms for the native path) and runs on the
Python audio thread under the GIL, so it's vulnerable to underruns
from heavy main-thread frames.
- ``NullAudioBackend``: silent no-op backend used when no audio device
is available (sandboxed CI, headless containers).
``make_backend()`` resolution order, with **loud** warnings at every
fallback (never silent degradation):
1. Native ``MiniaudioBackend``: picked when the extension is importable.
2. Legacy ``_LegacyMiniaudioBackend``: picked when native is unavailable
AND a real audio device opens. Emits a one-time WARNING explaining the
latency hit and the rebuild command.
3. Null ``NullAudioBackend``: picked when neither native nor legacy can
start (no audio device). Emits a one-time WARNING.
Set ``SIMVX_ALLOW_LEGACY_AUDIO=0`` to disable the legacy/null fallbacks
and raise :class:`AudioBackendUnavailable` instead.
The runtime never invokes a C compiler. Use ``simvx build-audio`` (or
``uv pip install --reinstall -e packages/core``) to (re)build the
extension manually.
"""
from __future__ import annotations
import atexit
import logging
import os
import threading
import weakref
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import numpy as np
from ._native import miniaudio_engine as _me
from .audio_errors import (
AudioBackendUnavailable,
AudioCapabilityError,
AudioError,
InvalidStreamError,
raise_or_warn,
warn_once,
)
from .audio_protocol import CAPABILITIES_CORE, Capability
if TYPE_CHECKING:
from .audio import AudioStream
from .audio_bus import AudioBusLayout
log = logging.getLogger(__name__)
__all__ = [
"MiniaudioBackend",
"_LegacyMiniaudioBackend",
"NullAudioBackend",
"make_backend",
]
_DEFAULT_SAMPLE_RATE = 48000
_DEFAULT_CHANNELS = 2
# Per-channel high-water mark for the legacy backend's streaming buffer.
# Producer code that overfeeds (main thread runs faster than the audio
# period) used to grow the buffer without bound; now we drop oldest bytes
# on overflow and warn-once per channel. Override via
# ``SIMVX_AUDIO_STREAM_BUFFER_BYTES`` (positive int, bytes).
_DEFAULT_STREAM_BUFFER_MAX_BYTES = 1 << 20 # 1 MiB
def _stream_buffer_cap() -> int:
    """Return the byte cap for a legacy streaming channel's buffer.

    ``SIMVX_AUDIO_STREAM_BUFFER_BYTES`` is consulted on every call (so a
    test can change the environment mid-run). The configured value wins
    only when it parses as a strictly positive integer; an unset, empty,
    unparseable, zero or negative value yields the module default.
    """
    configured = os.environ.get("SIMVX_AUDIO_STREAM_BUFFER_BYTES")
    if not configured:
        return _DEFAULT_STREAM_BUFFER_MAX_BYTES
    try:
        parsed = int(configured)
    except ValueError:
        return _DEFAULT_STREAM_BUFFER_MAX_BYTES
    return parsed if parsed > 0 else _DEFAULT_STREAM_BUFFER_MAX_BYTES
# Streaming-capable compressed formats: only the native (miniaudio C
# extension) backend can decode these on-the-fly. The legacy pure-Python
# mixer would need its own streaming decoder, which doesn't exist.
_COMPRESSED_STREAMING_FORMATS: frozenset[Capability] = frozenset(
{Capability.STREAMING_OGG, Capability.STREAMING_MP3, Capability.STREAMING_FLAC}
)
# Capabilities advertised by the *native* desktop backend.
_DESKTOP_CAPABILITIES: frozenset[Capability] = (
CAPABILITIES_CORE | _COMPRESSED_STREAMING_FORMATS | frozenset({Capability.SPATIAL_DOPPLER})
)
# Capabilities advertised by the *legacy* (pure-Python) desktop backend.
# Drops the compressed-streaming formats since the mixer doesn't carry a
# decoder: ``open_stream`` raises :class:`AudioCapabilityError` for them
# rather than feeding container bytes through ``np.frombuffer(dtype=int16)``
# and producing noise.
_LEGACY_CAPABILITIES: frozenset[Capability] = (
CAPABILITIES_CORE | frozenset({Capability.SPATIAL_DOPPLER})
)
# Effect capabilities the *native* backend additionally supports: built-in
# ma_*_node filters/delay plus custom DSP nodes (freeverb, compressor,
# soft-clip). Legacy backend doesn't advertise these: the Python mixer
# can't run them at acceptable performance.
_NATIVE_EFFECT_CAPABILITIES: frozenset[Capability] = frozenset(
{
Capability.EFFECT_GAIN,
Capability.EFFECT_FILTER_BIQUAD,
Capability.EFFECT_DELAY,
Capability.EFFECT_PARAMETRIC_EQ,
Capability.EFFECT_REVERB,
Capability.EFFECT_SOFTCLIP,
Capability.EFFECT_COMPRESSOR,
}
)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _db_to_linear(db: float) -> float:
    """Map a decibel value to a linear gain factor.

    Anything at or below -80 dB is reported as exact silence (``0.0``);
    every other value is ``10 ** (db / 20)``.
    """
    return 0.0 if db <= -80.0 else 10.0 ** (db / 20.0)
def _decode_stream_to_ndarray(
    stream: AudioStream, sample_rate: int, nchannels: int
) -> np.ndarray | None:
    """Decode an AudioStream into float32 interleaved samples.

    Used by the legacy backend (which mixes in Python) and by the native
    backend when the source is an ndarray-backed ``AudioStream`` (e.g.
    ``AudioStream.from_pcm`` or anything baked by ``AudioSynth``).
    File-backed streams on the native path go through
    ``ma_sound_init_from_file`` instead.

    Args:
        stream: Source stream. Its ``backend_data`` is returned as-is when
            it already holds an ndarray; otherwise ``stream.path`` is decoded
            and the result is cached back onto ``stream.backend_data``.
        sample_rate: Sample rate requested from the decoder.
        nchannels: Channel count requested from the decoder.

    Returns:
        The float32 sample array (int16 samples scaled by ``1 / 32768``),
        or ``None`` when the stream has no path or the file fails to decode
        (a warning is logged in the latter case).
    """
    cached = stream.backend_data
    if isinstance(cached, np.ndarray):
        return cached

    path = stream.path
    if not path:
        return None

    # Imported only once a file decode is actually required: cache hits and
    # path-less streams must not depend on the ``miniaudio`` package being
    # importable.
    import miniaudio

    try:
        decoded = miniaudio.decode_file(
            path,
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=nchannels,
            sample_rate=sample_rate,
        )
    except (miniaudio.DecodeError, FileNotFoundError, OSError) as exc:
        log.warning("audio_backend: failed to decode %r: %s", path, exc)
        return None

    raw = np.frombuffer(decoded.samples, dtype=np.int16).astype(np.float32) / 32768.0
    # Cache so later plays of the same stream skip the decode.
    stream.backend_data = raw
    return raw
# ===========================================================================
# Legacy (pure-Python) backend
# ===========================================================================
@dataclass
class _Channel:
    """Mutable state of one voice in the legacy mixer."""

    # Sample data: float32, interleaved stereo.
    samples: np.ndarray
    # Frame index the next mix pass reads from.
    cursor: int = 0
    # Length of ``samples`` in frames.
    total_frames: int = 0
    # Linear gain applied before the bus gain.
    volume: float = 1.0
    # Stereo pan; callers clamp it to [-1, 1].
    pan: float = 0.0
    # Playback-rate multiplier.
    pitch: float = 1.0
    loop: bool = False
    paused: bool = False
    # Once set, the mixer drops the channel on its next pass.
    stopped: bool = False
    # Name of the bus whose gain scales this voice.
    bus: str = "Master"
    # Streaming voices ignore ``samples`` and drain ``stream_buffer``.
    streaming: bool = False
    stream_buffer: bytearray = field(default_factory=bytearray)
# Monotonic channel id counter (shared across legacy + native: both keep
# `dict[int, ...]` lookups so the global counter avoids collisions).
_next_id = 0
_id_lock = threading.Lock()


def _alloc_channel_id() -> int:
    """Hand out the next channel id; ids start at 1 and never repeat.

    The increment happens under ``_id_lock`` so concurrent callers each
    receive a distinct value.
    """
    global _next_id
    with _id_lock:
        allocated = _next_id + 1
        _next_id = allocated
    return allocated
class _LegacyMiniaudioBackend:
    """Pure-Python audio mixer running on miniaudio's audio thread.

    Buffers at 100 ms latency to survive GIL stalls. Kept as a fallback
    for environments without the native extension built.

    Channel state lives in ``_channels`` and is guarded by ``_lock``: the
    public methods mutate it from the caller's thread while the generator
    built by :meth:`_audio_callback` reads it once per audio period. The
    mixing code indexes sample data as interleaved stereo.
    """

    def __init__(
        self, sample_rate: int = _DEFAULT_SAMPLE_RATE, nchannels: int = _DEFAULT_CHANNELS
    ):
        """Open a 16-bit playback device and start feeding it from the mixer.

        Args:
            sample_rate: Device sample rate in Hz.
            nchannels: Device channel count.
        """
        import miniaudio

        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        self._channels: dict[int, _Channel] = {}
        # Tracks whether shutdown has been called: flipped by ``shutdown``,
        # asserted by tests. miniaudio's audio thread is a native C thread
        # (not visible to Python's threading module), but PlaybackDevice.close
        # joins it synchronously, so after ``shutdown`` the leak is gone.
        self._shutdown_done: bool = False
        self._device = miniaudio.PlaybackDevice(
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=nchannels,
            sample_rate=sample_rate,
            buffersize_msec=100,
        )
        gen = self._audio_callback()
        next(gen)  # prime the generator up to its first ``yield``
        self._device.start(gen)
        # Belt-and-braces: register an atexit hook so a `sys.exit(0)` from
        # anywhere (test runners, demos that bypass App.quit()) still
        # shuts down the audio thread. Captured by weakref so the hook
        # doesn't pin the backend alive. Without this the legacy fallback
        # reproduces the original `feedback_app_quit_pattern.md` hang.
        _register_atexit_shutdown(self)

    # -- mixer (audio thread) -------------------------------------------------

    def _audio_callback(self):
        """Generator driving the playback device.

        Receives the requested frame count via ``send`` and yields that many
        frames of mixed, clipped int16 PCM as bytes. A missing or
        non-positive request yields empty bytes.
        """
        out_bytes = b""
        while True:
            num_frames = yield out_bytes
            if num_frames is None or num_frames <= 0:
                out_bytes = b""
                continue
            mix = np.zeros(num_frames * self._nchannels, dtype=np.float32)
            bus_gains = self._snapshot_bus_gains()
            with self._lock:
                dead: list[int] = []
                for ch_id, ch in self._channels.items():
                    if ch.stopped:
                        dead.append(ch_id)
                        continue
                    if ch.paused:
                        continue
                    if ch.streaming:
                        self._mix_streaming(ch, mix, num_frames, bus_gains)
                        continue
                    self._mix_channel(ch, mix, num_frames, bus_gains)
                    # The mix pass flags the channel once it plays out.
                    if ch.stopped:
                        dead.append(ch_id)
                # Deleted after iteration so the dict is never resized mid-loop.
                for ch_id in dead:
                    del self._channels[ch_id]
            np.clip(mix, -1.0, 1.0, out=mix)
            out_bytes = (mix * 32767).astype(np.int16).tobytes()

    @staticmethod
    def _stereo_gains(ch: _Channel, bus_gains: dict[str, float]) -> tuple[float, float]:
        """Return ``(left_gain, right_gain)`` for ``ch``.

        Exact lookup: bus names are case-sensitive and the layout is the
        authoritative source. A miss means a player referenced a bus that
        doesn't exist; surface that as :class:`UnknownBusError` so the
        caller fixes the typo or registers the bus.
        """
        if ch.bus not in bus_gains:
            from .audio_errors import UnknownBusError

            raise UnknownBusError(ch.bus, available=list(bus_gains.keys()))
        vol = ch.volume * bus_gains[ch.bus]
        return vol * min(1.0, 1.0 - ch.pan), vol * min(1.0, 1.0 + ch.pan)

    def _mix_channel(
        self, ch: _Channel, mix: np.ndarray, num_frames: int, bus_gains: dict[str, float]
    ) -> None:
        """Add up to ``num_frames`` frames of a sample-backed voice into ``mix``.

        Advances ``ch.cursor`` and sets ``ch.stopped`` once a non-looping
        voice has played out. Pitch within 0.01 of 1.0 takes a straight-copy
        path; any other pitch resamples with linear interpolation.

        Raises:
            UnknownBusError: ``ch.bus`` is not a key of ``bus_gains``.
        """
        samples = ch.samples
        n_total = ch.total_frames
        cursor = ch.cursor
        pitch = max(0.1, ch.pitch)
        left_gain, right_gain = self._stereo_gains(ch, bus_gains)

        if n_total <= 0:
            # Nothing to play. Without this guard a looping empty sample
            # would wrap forever (copy path) or take ``% 0`` (resample path).
            ch.stopped = True
            return

        if abs(pitch - 1.0) < 0.01:
            # Straight copy. Looping is handled iteratively: a short loop can
            # wrap many times inside one audio period, and recursing once per
            # wrap would overflow the interpreter stack on the audio thread.
            filled = 0
            while filled < num_frames:
                remaining = n_total - cursor
                if remaining <= 0:
                    if not ch.loop:
                        break
                    cursor = 0
                    remaining = n_total
                take = min(num_frames - filled, remaining)
                src_start = cursor * 2
                chunk = samples[src_start : src_start + take * 2].copy()
                chunk[0::2] *= left_gain
                chunk[1::2] *= right_gain
                mix[filled * 2 : (filled + take) * 2] += chunk
                cursor += take
                filled += take
            ch.cursor = cursor
            if not ch.loop and cursor >= n_total:
                ch.stopped = True
            return

        # Resample: fractional read positions, linearly interpolated.
        positions = float(cursor) + np.arange(num_frames, dtype=np.float64) * pitch
        if ch.loop:
            positions %= n_total
            int_pos = positions.astype(np.int64)
            next_pos = (int_pos + 1) % n_total
            n = num_frames
        else:
            # Count of output frames whose read position is still in range.
            n = int(np.searchsorted(positions, n_total, side="left"))
            if n <= 0:
                ch.stopped = True
                ch.cursor = n_total
                return
            positions = positions[:n]
            int_pos = positions.astype(np.int64)
            next_pos = np.minimum(int_pos + 1, n_total - 1)
            if n < num_frames:
                ch.stopped = True
        frac = (positions - int_pos).astype(np.float32)
        left = samples[0::2]
        right = samples[1::2]
        left_out = left[int_pos] + frac * (left[next_pos] - left[int_pos])
        right_out = right[int_pos] + frac * (right[next_pos] - right[int_pos])
        mix[: n * 2 : 2] += left_out * left_gain
        mix[1 : n * 2 : 2] += right_out * right_gain
        if ch.loop:
            ch.cursor = int((float(cursor) + num_frames * pitch) % n_total)
        else:
            ch.cursor = int(float(cursor) + n * pitch)

    def _mix_streaming(
        self, ch: _Channel, mix: np.ndarray, num_frames: int, bus_gains: dict[str, float]
    ) -> None:
        """Drain up to ``num_frames`` frames of int16 PCM from the stream buffer.

        Only whole frames are consumed; a trailing partial frame stays in the
        buffer for the next period. When fewer frames are buffered than
        requested, the rest of ``mix`` is left untouched.

        Raises:
            UnknownBusError: ``ch.bus`` is not a key of ``bus_gains`` (only
                checked when there is data to mix).
        """
        buf = ch.stream_buffer
        frame_bytes = self._nchannels * 2  # int16 samples
        available = min(len(buf), num_frames * frame_bytes)
        available -= available % frame_bytes
        if available <= 0:
            return
        left_gain, right_gain = self._stereo_gains(ch, bus_gains)
        # The slice copies, so the bytearray can be trimmed below while
        # NumPy still references the copied bytes.
        raw = np.frombuffer(buf[:available], dtype=np.int16).astype(np.float32) / 32768.0
        raw[0::2] *= left_gain
        raw[1::2] *= right_gain
        mix[: len(raw)] += raw
        del buf[:available]

    # -- playback control -----------------------------------------------------

    def play_audio(
        self,
        stream: AudioStream,
        *,
        mode: str = "non_positional",
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "Master",
        max_distance: float = 100.0,
        from_position: float = 0.0,
        pan: float = 0.0,
        gain_db: float = 0.0,
    ) -> int | None:
        """Start playing ``stream`` and return its channel id.

        Returns ``None`` when the stream cannot be decoded to samples.
        """
        # ``mode``, ``position`` and ``max_distance`` are part of the unified
        # Protocol shape; the legacy mixer is non-spatial so spatial Node
        # callers pre-compute ``pan`` + ``volume_db`` from the listener and
        # we just apply those. ``gain_db`` is reserved for future per-play
        # extra-gain trims and currently composes additively with ``volume_db``.
        del mode, position, max_distance
        samples = _decode_stream_to_ndarray(stream, self._sample_rate, self._nchannels)
        if samples is None:
            return None
        ch = _Channel(
            samples=samples,
            total_frames=len(samples) // self._nchannels,
            volume=_db_to_linear(volume_db + gain_db),
            # Seed pan on the channel struct *before* it's inserted into
            # ``_channels`` so the audio thread reads the correct value on
            # its very first mix pass. Fixes bug-audio-legacy-spatial-first-frame.
            pan=max(-1.0, min(1.0, float(pan))),
            pitch=pitch,
            loop=loop,
            bus=bus,
        )
        if from_position > 0.0:
            # _Channel.cursor is the frame index the next read draws from;
            # mixer code multiplies by the channel count to index ``samples``.
            ch.cursor = int(float(from_position) * self._sample_rate)
        ch_id = _alloc_channel_id()
        with self._lock:
            self._channels[ch_id] = ch
        return ch_id

    def stop_audio(self, channel_id: int) -> None:
        """Flag the channel as stopped; the mixer removes it on its next pass."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.stopped = True

    def pause_audio(self, channel_id: int) -> None:
        """Pause the channel, keeping its cursor. Unknown ids are ignored."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.paused = True

    def resume_audio(self, channel_id: int) -> None:
        """Resume a paused channel. Unknown ids are ignored."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.paused = False

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """Set the channel's volume (dB) and pan (clamped to [-1, 1])."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.volume = _db_to_linear(volume_db)
                ch.pan = max(-1.0, min(1.0, pan))

    def update_audio_3d(
        self, channel_id: int, volume_db: float, pan: float, pitch: float
    ) -> None:
        """Set volume (dB), pan (clamped to [-1, 1]) and pitch (floored at 0.1)."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.volume = _db_to_linear(volume_db)
                ch.pan = max(-1.0, min(1.0, pan))
                ch.pitch = max(0.1, pitch)

    def set_pitch(self, channel_id: int, pitch: float) -> None:
        """Set the channel's playback-rate multiplier (floored at 0.1)."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                ch.pitch = max(0.1, pitch)

    def get_playback_position(self, channel_id: int) -> float:
        """Return the channel's cursor in seconds, or ``0.0`` for unknown ids."""
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is not None:
                return ch.cursor / self._sample_rate
        return 0.0

    def is_channel_active(self, channel_id: int) -> bool:
        """Return True while the channel exists and has not been stopped."""
        with self._lock:
            ch = self._channels.get(channel_id)
            return ch is not None and not ch.stopped

    # -- streaming ------------------------------------------------------------

    def open_stream(
        self,
        *,
        volume_db: float = 0.0,
        bus: str = "Master",
        buffer_seconds: float = 0.5,
        stream: AudioStream | None = None,
    ) -> int:
        """Open a chunk-fed PCM channel and return its id.

        Raises:
            AudioCapabilityError: ``stream`` is given and its container is
                ``ogg``, ``mp3``, ``flac`` or ``unknown``.
        """
        # Legacy path: bytearray-backed; buffer_seconds is informational only
        # (the buffer is whatever feed_audio_chunk has accumulated). Pitch
        # is not a kwarg: call ``set_pitch(channel, pitch)`` after open
        # to modulate playback rate.
        del buffer_seconds  # silence linter; the kwarg is here for Protocol parity
        # The pure-Python mixer has no compressed-format decoder. Streaming a
        # WAV (or feeding raw PCM) works because the mixer reads bytes
        # straight as int16 from the producer; compressed formats would
        # need a streaming decoder that doesn't exist on this path. Fail
        # loud rather than producing the noise the bug-fix audit caught.
        if stream is not None:
            container = stream.container
            if container in ("ogg", "mp3", "flac", "unknown"):
                raise AudioCapabilityError(
                    f"open_stream({container})",
                    backend="_LegacyMiniaudioBackend",
                    advertised=self.list_capabilities(),
                    remediation=(
                        "Legacy backend only supports WAV streaming. Install the "
                        "native extension (uv run --with setuptools simvx build-audio) "
                        "or convert the file to WAV."
                    ),
                )
        ch = _Channel(
            samples=np.empty(0, dtype=np.float32),
            streaming=True,
            volume=_db_to_linear(volume_db),
            bus=bus,
        )
        ch_id = _alloc_channel_id()
        with self._lock:
            self._channels[ch_id] = ch
        return ch_id

    def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
        """Append PCM bytes to a streaming channel, dropping the oldest on overflow.

        Calls for unknown or non-streaming channels are ignored.
        """
        cap = _stream_buffer_cap()
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch is None or not ch.streaming:
                return
            buf = ch.stream_buffer
            buf.extend(chunk)
            # Bound the buffer: drop oldest bytes on overflow so playback
            # stays smooth instead of OOM-ing. Trim along a frame boundary
            # so the next mix doesn't read half a sample.
            overflow = len(buf) - cap
            if overflow <= 0:
                return
            bytes_per_frame = self._nchannels * 2
            # Round up to the next frame boundary; safe even if cap is
            # not frame-aligned because we then drop slightly more.
            drop = min(overflow + ((-overflow) % bytes_per_frame), len(buf))
            del buf[:drop]
            warn_once(
                f"audio.legacy.stream_overrun.ch{channel_id}",
                "Legacy streaming buffer overrun on channel %d; "
                "dropping oldest %d bytes (raise "
                "SIMVX_AUDIO_STREAM_BUFFER_BYTES to allocate more "
                "headroom)",
                channel_id,
                drop,
            )

    # -- lifecycle / protocol -------------------------------------------------

    def shutdown(self) -> None:
        """Close the playback device and drop all channels.

        Safe to call more than once: the atexit hook registered in
        ``__init__`` may run after an explicit shutdown, and a completed
        shutdown is not repeated.
        """
        if getattr(self, "_shutdown_done", False):
            return
        try:
            if self._device is not None:
                self._device.close()
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.legacy.shutdown.device_close_failed",
                message="Legacy audio backend: failed to close playback device",
            )
        with self._lock:
            self._channels.clear()
        self._shutdown_done = True

    def list_capabilities(self) -> frozenset[Capability]:
        """Return the capability set advertised by the legacy backend."""
        return _LEGACY_CAPABILITIES

    def sync_bus_layout(self, layout: AudioBusLayout) -> None:
        """No-op: bus state is polled per audio period via ``_snapshot_bus_gains``."""
        return

    # Listener pose endpoints: legacy mixer has no spatializer, so accept
    # the call and store nothing. AudioListener3D pushes these every frame;
    # we return success rather than raise because the backend is correctly
    # honouring the contract (no spatial capability advertised).
    def set_listener_position(self, x: float, y: float, z: float) -> None:
        return

    def set_listener_velocity(self, x: float, y: float, z: float) -> None:
        return

    def set_listener_direction(self, x: float, y: float, z: float) -> None:
        return

    def set_listener_world_up(self, x: float, y: float, z: float) -> None:
        return

    def _snapshot_bus_gains(self) -> dict[str, float]:
        """Return ``{bus_name: linear_gain}`` for every bus in the active layout.

        Lookup in the mixer is exact / case-sensitive. A channel that
        references a bus not present in the layout raises
        :class:`UnknownBusError` from :meth:`_mix_channel` /
        :meth:`_mix_streaming` rather than silently falling through to
        Master gain.
        """
        from .audio_bus import AudioBusLayout

        layout = AudioBusLayout.get_default()
        return {bus.name: bus.effective_linear_volume for bus in layout.buses}
# ===========================================================================
# Native (ma_engine) backend
# ===========================================================================
@dataclass
class _SoundEntry:
    """Bookkeeping record for one active voice on the native backend."""

    # The native sound object (``_me.Sound``).
    sound: Any
    # Name of the bus the sound was created on.
    bus: str
    # AudioBuffer pinned against GC when the sound is buffer-backed.
    keeper: Any = None
    # Remembers a deliberate pause so a later ``resume_audio`` can restart
    # from the cursor: ma_sound_start auto-resumes from the internal cursor
    # after ma_sound_stop, so only the flag needs tracking.
    paused: bool = False
    # True for channels opened via ``open_stream`` (chunk-fed PCM). Native
    # streams build their Sound with ``pitch_enabled=False`` so the resampler
    # can't read past what the producer has written: that makes ``set_pitch``
    # against a streaming channel a guaranteed no-op. Used to raise
    # ``AudioCapabilityError`` instead of silently dropping the change.
    streaming: bool = False
class _NativeEffectChain:
    """Per-bus chain of native ma_*_node effects between a SoundGroup and the destination.

    Chain shape::

        group_node → effect[0] → effect[1] → … → effect[N-1] → dest_node

    Empty effect list collapses to a direct ``group_node → dest_node`` link.
    The chain is rebuilt in place whenever the bus's effect list (signature)
    changes; each rebuild detaches the source group from its current next
    node and re-attaches through the new nodes.

    Effects with `effect.enabled = False` are skipped during rebuild
    (the chain looks like they aren't there at all).
    """

    def __init__(self, engine, group, destination):
        self._engine = engine
        self._group = group  # _me.SoundGroup
        self._destination = destination  # ma_node* (engine endpoint or parent group)
        self._nodes: list[Any] = []  # built effect nodes, in chain order
        self._signature: list[tuple] = []  # captured from each effect for diff
        # GainEffect is a non-node effect that scales the output bus
        # volume on the final attach. Track the cumulative scalar so multiple
        # gain stages compose. (FadeEffect was removed in the audio refactor,
        # per-player fades live on AudioStreamPlayer.fade_in/fade_out.)
        self._output_volume: float = 1.0

    @staticmethod
    def _effect_signature(effect) -> tuple:
        """Hashable signature for an effect; rebuild only on real changes.

        Float comparisons are exact: any parameter change produces a new
        signature and therefore a rebuild. (An earlier ``round(x, 3)``
        quantisation silently dropped continuous filter sweeps below 0.001.)
        Unrecognised effect types are keyed by object identity.
        """
        from .audio_effect import (
            BandPassFilter,
            CompressorEffect,
            DelayEffect,
            GainEffect,
            HighPassFilter,
            LowPassFilter,
            NotchFilter,
            ParametricEQ,
            ReverbEffect,
            SoftClipEffect,
        )

        if isinstance(effect, GainEffect):
            return ("gain", effect.volume_db, effect.enabled)
        if isinstance(effect, LowPassFilter):
            return ("lowpass", effect.cutoff_hz, effect.q, effect.enabled)
        if isinstance(effect, HighPassFilter):
            return ("highpass", effect.cutoff_hz, effect.q, effect.enabled)
        if isinstance(effect, BandPassFilter):
            return ("bandpass", effect.cutoff_hz, effect.q, effect.enabled)
        if isinstance(effect, NotchFilter):
            return ("notch", effect.cutoff_hz, effect.q, effect.enabled)
        if isinstance(effect, DelayEffect):
            return ("delay", effect.time_seconds, effect.feedback,
                    effect.wet, effect.dry, effect.enabled)
        if isinstance(effect, ParametricEQ):
            return ("paramteq", tuple(
                (b.type, b.freq, b.q, b.gain_db)
                for b in effect.bands
            ), effect.enabled)
        if isinstance(effect, ReverbEffect):
            return ("reverb", effect.room_size, effect.damping,
                    effect.wet, effect.dry,
                    effect.width, bool(effect.freeze), effect.enabled)
        if isinstance(effect, SoftClipEffect):
            return ("softclip", effect.drive, effect.output_gain, effect.enabled)
        if isinstance(effect, CompressorEffect):
            return ("compressor", effect.threshold_db, effect.ratio,
                    effect.attack_ms, effect.release_ms,
                    effect.knee_db, effect.makeup_db, effect.enabled)
        return ("unknown", id(effect))

    def reconcile(self, effects: list) -> None:
        """Update the native chain to match `effects`.

        Returns immediately when the signature is unchanged. Otherwise the
        current chain is torn down and rebuilt node by node.

        Raises:
            AudioCapabilityError: A native effect node failed to build. The
                direct group → destination link is restored and the stored
                signature cleared before the error propagates, so the bus is
                never left disconnected.
        """
        from .audio_effect import GainEffect

        new_sig = [self._effect_signature(e) for e in effects]
        if new_sig == self._signature:
            return

        self._teardown()
        self._signature = new_sig
        self._output_volume = 1.0

        prev_node = _me.sound_group_as_node(self._group)
        try:
            for effect in effects:
                if not effect.enabled:
                    continue
                built = self._build_effect_node(effect)
                if built is None:
                    # Non-node effect: gain folds into the output volume,
                    # anything else is ignored. Same isinstance test as
                    # ``_effect_signature`` so the two always agree.
                    if isinstance(effect, GainEffect):
                        self._output_volume *= 10.0 ** (effect.volume_db / 20.0)
                    continue
                # Track the node before attaching so a failed attach still
                # gets it shut down by ``_teardown``.
                self._nodes.append(built)
                _me.attach_node(prev_node, 0, built.handle, 0)
                # Composite nodes (``_EQBandChain``) expose their tail as
                # ``output_handle``. Continuing from ``handle`` (the chain
                # input) would re-route the first band's output and bypass
                # every later band.
                tail = getattr(built, "output_handle", None)
                prev_node = tail if tail is not None else built.handle
            # Final attach: chain tail → destination.
            _me.attach_node(prev_node, 0, self._destination, 0)
        except Exception:
            # Fall back to the dry group → destination link and forget the
            # signature so it keeps describing what is actually built.
            self._teardown()
            self._signature = []
            raise

        # NOTE(review): the volume is only pushed when it differs from unity.
        # If the tail is the group node, a gain applied by an earlier chain
        # may persist after the GainEffect is removed; confirm whether the
        # native attach resets the output-bus volume.
        if abs(self._output_volume - 1.0) > 1e-6:
            try:
                _me.set_node_output_volume(prev_node, 0, self._output_volume)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.chain.output_volume_failed",
                    message="Native effect chain: failed to set node output volume",
                )

    def _build_effect_node(self, effect):
        """Construct one native EffectNode for `effect`, or None if not supported.

        Raises:
            AudioCapabilityError: Constructing the native node raised.
        """
        from .audio_effect import (
            BandPassFilter,
            CompressorEffect,
            DelayEffect,
            HighPassFilter,
            LowPassFilter,
            NotchFilter,
            ParametricEQ,
            ReverbEffect,
            SoftClipEffect,
        )

        try:
            if isinstance(effect, LowPassFilter):
                return _me.LPFEffectNode(self._engine, cutoff_hz=effect.cutoff_hz, q=effect.q)
            if isinstance(effect, HighPassFilter):
                return _me.HPFEffectNode(self._engine, cutoff_hz=effect.cutoff_hz, q=effect.q)
            if isinstance(effect, BandPassFilter):
                return _me.BPFEffectNode(self._engine, cutoff_hz=effect.cutoff_hz, q=effect.q)
            if isinstance(effect, NotchFilter):
                return _me.NotchEffectNode(self._engine, cutoff_hz=effect.cutoff_hz, q=effect.q)
            if isinstance(effect, DelayEffect):
                return _me.DelayEffectNode(
                    self._engine,
                    delay_seconds=effect.time_seconds,
                    decay=effect.feedback,
                    wet=effect.wet,
                    dry=effect.dry,
                )
            if isinstance(effect, ParametricEQ):
                # Use a tiny chain inside one "effect": return a composite-handle wrapper.
                return _EQBandChain(self._engine, effect.bands)
            # Custom DSP nodes
            if isinstance(effect, ReverbEffect):
                return _me.FreeverbEffectNode(
                    self._engine,
                    room_size=effect.room_size,
                    damping=effect.damping,
                    wet=effect.wet,
                    dry=effect.dry,
                    width=effect.width,
                    freeze=effect.freeze,
                )
            if isinstance(effect, SoftClipEffect):
                return _me.SoftClipEffectNode(
                    self._engine,
                    drive=effect.drive,
                    output_gain=effect.output_gain,
                )
            if isinstance(effect, CompressorEffect):
                return _me.CompressorEffectNode(
                    self._engine,
                    threshold_db=effect.threshold_db,
                    ratio=effect.ratio,
                    attack_ms=effect.attack_ms,
                    release_ms=effect.release_ms,
                    knee_db=effect.knee_db,
                    makeup_db=effect.makeup_db,
                )
        except Exception as exc:
            raise AudioCapabilityError(
                capability=f"effect.{type(effect).__name__}",
                backend="MiniaudioBackend",
                advertised=_DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES,
            ) from exc
        return None

    def _teardown(self) -> None:
        """Tear down the current chain back to the original source → destination link.

        Every step is best-effort: a failure is routed through
        ``raise_or_warn`` and the remaining steps still run unless that
        helper raises.
        """
        try:
            group_node = _me.sound_group_as_node(self._group)
            _me.detach_node(group_node, 0)
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.teardown.group_detach_failed",
                message="Native effect chain teardown: group detach failed",
            )
        for node in self._nodes:
            try:
                _me.detach_node(node.handle, 0)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.teardown.node_detach_failed",
                    message="Native effect chain teardown: node detach failed",
                )
            try:
                node.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.teardown.node_shutdown_failed",
                    message="Native effect chain teardown: node shutdown failed",
                )
        self._nodes.clear()
        # Restore direct group → destination edge.
        try:
            _me.attach_node(
                _me.sound_group_as_node(self._group), 0, self._destination, 0
            )
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.teardown.restore_default_link_failed",
                message="Native effect chain teardown: failed to restore default link",
            )

    def shutdown(self) -> None:
        """Tear the chain down and forget the stored signature."""
        self._teardown()
        self._signature = []
class _EQBandChain:
    """Composite EffectNode for ParametricEQ: chains peak/loshelf/hishelf in series.

    Exposes a single ``handle`` (the chain input) and an ``output_handle``
    (the chain tail) so the bus chain logic can attach to it like one
    effect. Subnodes are shut down together.
    """

    def __init__(self, engine, bands):
        """Build one native node per band and link them in order.

        If any node fails to build or attach, the nodes created so far are
        shut down before the error propagates, so a failed construction
        does not leave native nodes behind.
        """
        self._engine = engine
        self._subnodes: list[Any] = []
        self._tail = None
        try:
            if not bands:
                # Empty EQ collapses to a single noop pass: use a peak node with 0 dB gain.
                self._subnodes.append(
                    _me.PeakEffectNode(engine, freq=1000.0, q=1.0, gain_db=0.0)
                )
            else:
                for band in bands:
                    node = self._make_band_node(engine, band)
                    previous = self._subnodes[-1] if self._subnodes else None
                    # Track the node before attaching so cleanup covers it.
                    self._subnodes.append(node)
                    if previous is not None:
                        _me.attach_node(previous.handle, 0, node.handle, 0)
        except Exception:
            self.shutdown()
            raise
        self._tail = self._subnodes[-1].handle

    @staticmethod
    def _make_band_node(engine, band):
        """Create the native node for one band; unknown types become peaking."""
        kind = (band.type or "peaking").lower()
        if kind == "lowshelf":
            return _me.LowShelfEffectNode(engine, freq=band.freq, q=band.q, gain_db=band.gain_db)
        if kind == "highshelf":
            return _me.HighShelfEffectNode(engine, freq=band.freq, q=band.q, gain_db=band.gain_db)
        return _me.PeakEffectNode(engine, freq=band.freq, q=band.q, gain_db=band.gain_db)

    @property
    def handle(self):
        """External attach point (chain input), or None once shut down."""
        return self._subnodes[0].handle if self._subnodes else None

    @property
    def output_handle(self):
        """Handle of the last subnode (chain tail), or None once shut down."""
        return self._tail

    def shutdown(self) -> None:
        """Detach and shut down every subnode, last to first."""
        for node in reversed(self._subnodes):
            try:
                _me.detach_node(node.handle, 0)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.eqchain.subnode_detach_failed",
                    message="EQ band chain shutdown: subnode detach failed",
                )
            try:
                node.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.eqchain.subnode_shutdown_failed",
                    message="EQ band chain shutdown: subnode shutdown failed",
                )
        self._subnodes.clear()
        # The tail handle belonged to a node that no longer exists.
        self._tail = None
class MiniaudioBackend:
"""Native-mixed audio backend on top of ``ma_engine``.
All mixing, spatialization, and DSP runs in native C: Python only
pushes parameter updates. Target latency: ``buffersize_msec=20`` (vs
the legacy backend's 100 ms).
The native extension must be built once after install:
simvx build-audio
If the extension is missing, the constructor raises
``MiniaudioEngineUnavailable``. Use ``make_backend()`` for an
automatic fallback to ``_LegacyMiniaudioBackend`` with a one-time
warning.
"""
def __init__(
    self,
    sample_rate: int = _DEFAULT_SAMPLE_RATE,
    nchannels: int = _DEFAULT_CHANNELS,
):
    """Create the native engine and sync the default bus layout once.

    Args:
        sample_rate: Engine sample rate in Hz.
        nchannels: Engine channel count.
    """
    self._sample_rate = sample_rate
    self._nchannels = nchannels
    self._lock = threading.Lock()

    # Python-side bookkeeping; all of it starts empty.
    self._sounds: dict[int, _SoundEntry] = {}
    self._groups: dict[str, _me.SoundGroup] = {}
    # One native effect chain per bus group, built lazily on the first
    # sync_bus_layout that sees a non-empty effects list.
    self._effect_chains: dict[str, _NativeEffectChain] = {}
    # Last pushed (volume_db, mute, send_to) per bus, so sync_bus_layout
    # stays cheap when called every frame: native is only touched on change.
    self._bus_snapshot: dict[str, tuple[float, bool, str]] = {}

    self._engine = _me.Engine(
        sample_rate=sample_rate, channels=nchannels, device=True
    )
    # Push the default layout into the freshly created engine.
    self._ensure_default_groups()

    # Belt-and-braces: an atexit hook so a `sys.exit(0)` from anywhere
    # (test runners, demos that bypass App.quit()) still shuts down the
    # engine and joins miniaudio's audio thread. Captured by weakref so
    # the hook doesn't pin the backend alive.
    _register_atexit_shutdown(self)
# -- internal helpers -----------------------------------------------------
def _ensure_default_groups(self) -> None:
    """Sync this backend against the process-wide default bus layout."""
    from .audio_bus import AudioBusLayout

    self.sync_bus_layout(AudioBusLayout.get_default())
def _ensure_group(self, bus_name: str) -> _me.SoundGroup:
    """Look up the SoundGroup for ``bus_name``, creating it on first use.

    The name must match a bus of the active :class:`AudioBusLayout`
    exactly (case-sensitive). The layout lookup raises
    :class:`UnknownBusError` for an unknown name, so a typo surfaces at
    the call site instead of playing on a phantom root-attached group.

    Raises:
        AudioError: The native group could not be created.
    """
    cached = self._groups.get(bus_name)
    if cached is None:
        # Validate against the active layout before allocating native state.
        from .audio_bus import AudioBusLayout

        AudioBusLayout.get_default().get_bus(bus_name)  # raises UnknownBusError on miss
        try:
            cached = _me.SoundGroup(self._engine)
        except Exception as exc:
            raise AudioError(
                f"Failed to create native group for bus {bus_name!r}"
            ) from exc
        self._groups[bus_name] = cached
    return cached
def _make_sound(
    self,
    stream: AudioStream,
    *,
    bus: str,
    spatial: bool,
    pitch_enabled: bool,
) -> tuple[Any, Any] | tuple[None, None]:
    """Create the native ``_me.Sound`` for ``stream`` on ``bus``.

    Sources are tried in order: an ndarray found in ``stream.backend_data``,
    then a file at ``stream.path``.

    Returns:
        ``(sound, keeper)``. ``keeper`` is the ``AudioBuffer`` that backs an
        ndarray source (it must stay referenced while the sound lives) and
        ``None`` for a file source. ``(None, None)`` is returned when the
        stream offers neither an ndarray nor a path.

    Raises:
        InvalidStreamError: the native layer rejected the ndarray or file.
    """
    group = self._ensure_group(bus)
    sound_opts = {"group": group, "spatial": spatial, "pitch_enabled": pitch_enabled}

    samples = getattr(stream, "backend_data", None)
    if isinstance(samples, np.ndarray):
        try:
            pinned = _me.AudioBuffer(
                samples,
                sample_rate=self._sample_rate,
                channels=self._nchannels,
            )
            return _me.Sound.from_buffer(self._engine, pinned, **sound_opts), pinned
        except Exception as exc:
            raise InvalidStreamError(
                f"Failed to play ndarray source: {exc}"
            ) from exc

    source_path = getattr(stream, "path", None)
    if not source_path:
        return None, None
    try:
        return _me.Sound.from_file(self._engine, source_path, **sound_opts), None
    except Exception as exc:
        raise InvalidStreamError(
            f"Failed to play file {source_path!r}: {exc}"
        ) from exc
# -- public interface -----------------------------------------------------
[docs]
def play_audio(
    self,
    stream: AudioStream,
    *,
    mode: str = "non_positional",
    position: Any = None,
    volume_db: float = 0.0,
    pitch: float = 1.0,
    loop: bool = False,
    bus: str = "Master",
    max_distance: float = 100.0,
    from_position: float = 0.0,
    pan: float = 0.0,
    gain_db: float = 0.0,
) -> int | None:
    """Start ``stream`` on ``bus`` and return its channel id.

    ``volume_db`` and ``gain_db`` are summed before conversion to linear
    gain. ``pitch`` is floored at 0.1 and ``pan`` is clamped to [-1, 1].
    ``from_position`` (seconds) is converted to frames with the backend
    sample rate. ``mode``, ``position`` and ``max_distance`` are accepted
    for Protocol parity and ignored.

    Returns:
        The new channel id, or ``None`` when ``_make_sound`` produced no
        sound for the stream.

    Raises:
        AudioError: seeking or starting the native sound failed (the sound
            is shut down first).
    """
    # 2D / 3D players compute ``pan`` + ``volume_db`` Python-side and
    # forward them here so they are applied before the first buffer
    # (matches web backend behaviour). HRTF is intentionally not used for
    # the 2D path, so the native ``spatial`` flag stays False; the native
    # ma_engine path does not currently branch on ``mode``.
    del mode, position, max_distance

    sound, keeper = self._make_sound(
        stream, bus=bus, spatial=False, pitch_enabled=True
    )
    if sound is None:
        return None

    linear_gain = _db_to_linear(volume_db + gain_db)
    sound.set_volume(linear_gain)
    sound.set_pitch(max(0.1, float(pitch)))
    sound.set_looping(bool(loop))
    # Pan is applied *before* ``sound.start()`` so the very first mixed
    # buffer already carries it; otherwise spatial players would play
    # centred for one buffer (~20 ms on the native path) until the
    # per-frame ``update_audio_2d/3d`` call lands.
    sound.set_pan(max(-1.0, min(1.0, float(pan))))

    if from_position > 0.0:
        try:
            start_frame = int(float(from_position) * self._sample_rate)
            sound.seek_to_frame(start_frame)
        except Exception as exc:
            sound.shutdown()
            raise AudioError(
                f"seek_to_frame failed for from_position={from_position}: {exc}"
            ) from exc

    try:
        sound.start()
    except Exception as exc:
        sound.shutdown()
        raise AudioError(f"sound.start failed: {exc}") from exc

    channel = _alloc_channel_id()
    with self._lock:
        self._sounds[channel] = _SoundEntry(sound=sound, bus=bus, keeper=keeper)
    return channel
[docs]
def stop_audio(self, channel_id: int) -> None:
    """Stop ``channel_id`` and release its native sound.

    Unknown channel ids are ignored. A failing ``sound.stop()`` is routed
    through ``raise_or_warn``; the native sound is shut down regardless,
    because the entry has already been removed from ``_sounds`` and nothing
    else could release it afterwards.
    """
    with self._lock:
        entry = self._sounds.pop(channel_id, None)
    if entry is None:
        return
    try:
        entry.sound.stop()
    except Exception as exc:
        raise_or_warn(
            exc,
            key="audio.native.stop_audio.sound_stop_failed",
            message="MiniaudioBackend.stop_audio: sound.stop failed",
        )
    finally:
        # Runs even if raise_or_warn escalates, so the handle is not leaked.
        # NOTE(review): ``entry.keeper`` is not shut down here - presumably
        # it is released when garbage-collected; confirm.
        entry.sound.shutdown()
[docs]
def pause_audio(self, channel_id: int) -> None:
    """Pause ``channel_id`` by stopping its sound and flagging the entry.

    The entry stays in ``_sounds`` so it can be resumed later. Unknown
    channel ids are ignored; a failing ``sound.stop()`` goes through
    ``raise_or_warn`` and leaves ``paused`` unset.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None:
        try:
            target.sound.stop()
            target.paused = True
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.pause_audio.sound_stop_failed",
                message="MiniaudioBackend.pause_audio: sound.stop failed",
            )
[docs]
def resume_audio(self, channel_id: int) -> None:
    """Restart a channel previously paused with :meth:`pause_audio`.

    Does nothing for unknown channels or channels that are not flagged as
    paused. A failing ``sound.start()`` goes through ``raise_or_warn`` and
    leaves the entry flagged as paused.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None and target.paused:
        try:
            target.sound.start()
            target.paused = False
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.resume_audio.sound_start_failed",
                message="MiniaudioBackend.resume_audio: sound.start failed",
            )
[docs]
def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
    """Push a new volume (dB) and pan to a live channel.

    ``pan`` is clamped to [-1, 1]. Unknown channel ids are ignored.
    """
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is not None:
        entry.sound.set_volume(_db_to_linear(volume_db))
        entry.sound.set_pan(max(-1.0, min(1.0, float(pan))))
[docs]
def update_audio_3d(
    self, channel_id: int, volume_db: float, pan: float, pitch: float
) -> None:
    """Push a new volume (dB), pan and pitch to a live channel.

    ``pan`` is clamped to [-1, 1] and ``pitch`` is floored at 0.1.
    Unknown channel ids are ignored.
    """
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is not None:
        entry.sound.set_volume(_db_to_linear(volume_db))
        entry.sound.set_pan(max(-1.0, min(1.0, float(pan))))
        entry.sound.set_pitch(max(0.1, float(pitch)))
[docs]
def set_pitch(self, channel_id: int, pitch: float) -> None:
    """Set the playback pitch of a non-streaming channel (floored at 0.1).

    Unknown channel ids are ignored.

    Raises:
        AudioCapabilityError: the channel is a streaming sound.
    """
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is None:
        return
    if not entry.streaming:
        entry.sound.set_pitch(max(0.1, float(pitch)))
        return
    # Native streaming sounds are built with ``pitch_enabled=False`` (the
    # resampler would read past what the producer has written), so report
    # a typed capability error instead of silently dropping the change.
    raise AudioCapabilityError(
        "pitch.streaming",
        backend="MiniaudioBackend",
        advertised=self.list_capabilities(),
        remediation=(
            "Pitch modulation is not supported on streaming sounds. "
            "Use AudioStreamPlayer with stream_mode='memory' for "
            "pitch control."
        ),
    )
[docs]
def get_playback_position(self, channel_id: int) -> float:
    """Return the channel's cursor in seconds, or ``0.0`` if it is unknown.

    The sound's frame cursor is divided by the backend sample rate.
    """
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is not None:
        frames = entry.sound.cursor_frames()
        return frames / float(self._sample_rate)
    return 0.0
[docs]
def is_channel_active(self, channel_id: int) -> bool:
    """Report whether ``channel_id`` is still in use.

    A paused channel counts as active. Otherwise the channel is active
    while its sound reports playing and has not reached its end. Unknown
    channel ids are inactive.
    """
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is None:
        return False
    if not entry.paused:
        snd = entry.sound
        return snd.is_playing() and not snd.at_end()
    return True
[docs]
def open_stream(
    self,
    *,
    volume_db: float = 0.0,
    bus: str = "Master",
    buffer_seconds: float = 0.5,
    stream: AudioStream | None = None,
) -> int:
    """Open a streaming channel.

    ``buffer_seconds`` (default 0.5 s) trades latency for underrun
    tolerance. At 48 kHz stereo s16 = ~96 KB. Drop to 0.1 s for
    low-latency interactive synthesis; bump to 1-2 s for music
    streams under heavy CPU load. Producer pushes int16 stereo
    bytes via ``feed_audio_chunk``; underrun produces silence
    (no glitch). Only applies to the chunk-fed PCM path: when
    ``stream`` is a compressed container, miniaudio's
    ``ma_sound_init_from_file(..., MA_SOUND_FLAG_STREAM)`` owns the
    buffering internally.

    Container routing (when ``stream`` is provided):

    * ``"wav"`` / ``"pcm"`` (synthetic) / ``None``: open an
      ``ma_pcm_rb``-backed sound. Caller feeds raw int16 stereo bytes.
    * ``"ogg"`` / ``"mp3"`` / ``"flac"``: open the file directly via
      ``Sound.from_file(stream=True)``. Subsequent
      :meth:`feed_audio_chunk` calls are no-ops for this channel.
    * ``"unknown"``: raise :class:`InvalidStreamError`.

    Pitch is intentionally not a parameter: streaming sounds are
    built with ``pitch_enabled=False`` so the resampler can't read
    past what the producer has written. ``set_pitch`` against the
    returned channel raises :class:`AudioCapabilityError`.

    Returns:
        The channel id of the newly started streaming sound.

    Raises:
        InvalidStreamError: compressed container without a path, or an
            ``"unknown"`` container.
        AudioError: the native ring, sound, or ``sound.start()`` failed.
            Native objects created so far are shut down before raising.
    """
    group = self._ensure_group(bus)
    container = stream.container if stream is not None else "wav"

    if container in ("ogg", "mp3", "flac"):
        path = stream.path if stream is not None else ""
        if not path:
            raise InvalidStreamError(
                f"Native streaming open: compressed container {container!r} "
                "requires a file path on the AudioStream"
            )
        try:
            sound = _me.Sound.from_file(
                self._engine, path, group=group, stream=True, pitch_enabled=False
            )
        except Exception as exc:
            raise AudioError(
                f"open_stream failed for {container} file {path!r}: {exc}"
            ) from exc
        sound.set_volume(_db_to_linear(volume_db))
        try:
            sound.start()
        except Exception as exc:
            sound.shutdown()
            raise AudioError(f"open_stream sound.start failed: {exc}") from exc
        cid = _alloc_channel_id()
        with self._lock:
            # ``keeper`` is None: there's no Python-side rb to pin. The
            # ma_sound itself owns the decoder + buffering. ``streaming``
            # stays True so ``set_pitch`` raises the same
            # ``AudioCapabilityError`` as the chunk-fed path.
            self._sounds[cid] = _SoundEntry(
                sound=sound, bus=bus, keeper=None, streaming=True
            )
        return cid

    if container == "unknown":
        path = stream.path if stream is not None else "<no-path>"
        raise InvalidStreamError(
            f"Native streaming open: could not detect audio container for {path!r}. "
            "Expected WAV/OGG/MP3/FLAC header."
        )

    # container in {"wav", "pcm"} (or stream is None: legacy callers
    # without a stream object get the chunk-fed PCM ring path).
    try:
        rb = _me.StreamSource(
            self._engine,
            sample_rate=self._sample_rate,
            channels=self._nchannels,
            buffer_seconds=buffer_seconds,
            format="s16",
        )
    except Exception as exc:
        raise AudioError(f"open_stream failed: {exc}") from exc
    try:
        sound = _me.Sound.from_stream(self._engine, rb, group=group)
    except Exception as exc:
        # The ring already exists at this point; release it so a failed
        # sound creation does not leak it (mirrors the start-failure path).
        rb.shutdown()
        raise AudioError(f"open_stream failed: {exc}") from exc
    sound.set_volume(_db_to_linear(volume_db))
    try:
        sound.start()
    except Exception as exc:
        sound.shutdown()
        rb.shutdown()
        raise AudioError(f"open_stream sound.start failed: {exc}") from exc
    cid = _alloc_channel_id()
    with self._lock:
        # The ring is stored as ``keeper`` so feed_audio_chunk can write to it.
        self._sounds[cid] = _SoundEntry(
            sound=sound, bus=bus, keeper=rb, streaming=True
        )
    return cid
[docs]
def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
    """Write raw int16 stereo bytes to the channel's ring buffer.

    Misaligned chunks (length not a multiple of bytes-per-frame)
    have the trailing partial frame trimmed. When the ring is full
    the excess is silently dropped: callers can poll
    ``entry.keeper.available_write_frames`` if they need to know.

    The call is a no-op for an empty chunk, an unknown channel, or a
    channel whose keeper is not a ``StreamSource``. A failing write is
    routed through ``raise_or_warn``.
    """
    if not chunk:
        return
    with self._lock:
        entry = self._sounds.get(channel_id)
    if entry is None or entry.keeper is None:
        return
    ring = entry.keeper
    if isinstance(ring, _me.StreamSource):
        try:
            ring.write(chunk)
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.feed_audio_chunk.ring_write_failed",
                message="MiniaudioBackend.feed_audio_chunk: ring write failed",
            )
# ------------------------------------------------------------------
# 3D listener: drives the native ma_engine spatializer.
# ------------------------------------------------------------------
[docs]
def set_listener_position(self, x: float, y: float, z: float) -> None:
    """Pass the listener position to the native engine as three floats."""
    coords = (float(x), float(y), float(z))
    self._engine.set_listener_position(*coords)
[docs]
def set_listener_velocity(self, x: float, y: float, z: float) -> None:
    """Pass the listener velocity (m/s, used for Doppler) to the native engine."""
    coords = (float(x), float(y), float(z))
    self._engine.set_listener_velocity(*coords)
[docs]
def set_listener_direction(self, x: float, y: float, z: float) -> None:
    """Pass the listener forward vector to the native engine."""
    coords = (float(x), float(y), float(z))
    self._engine.set_listener_direction(*coords)
[docs]
def set_listener_world_up(self, x: float, y: float, z: float) -> None:
    """Pass the listener world-up vector to the native engine."""
    coords = (float(x), float(y), float(z))
    self._engine.set_listener_world_up(*coords)
[docs]
def shutdown(self) -> None:
    """Release every native object owned by this backend.

    The sound, effect-chain and group registries are emptied under the
    lock; the objects are then shut down outside it in the order sounds,
    effect chains, groups, engine. Each failure is routed through
    ``raise_or_warn`` with a stage-specific key.
    """

    def _release(action, key: str, message: str) -> None:
        # One guarded teardown step; failures go to raise_or_warn.
        try:
            action()
        except Exception as exc:
            raise_or_warn(exc, key=key, message=message)

    with self._lock:
        live_sounds = list(self._sounds.values())
        self._sounds.clear()
        live_chains = list(self._effect_chains.values())
        self._effect_chains.clear()
        live_groups = list(self._groups.values())
        self._groups.clear()

    for entry in live_sounds:
        _release(
            lambda: entry.sound.shutdown(),
            "audio.native.shutdown.sound_cleanup_failed",
            "MiniaudioBackend.shutdown: sound cleanup failed",
        )
    for chain in live_chains:
        _release(
            lambda: chain.shutdown(),
            "audio.native.shutdown.effect_chain_cleanup_failed",
            "MiniaudioBackend.shutdown: effect chain cleanup failed",
        )
    for group in live_groups:
        _release(
            lambda: group.shutdown(),
            "audio.native.shutdown.group_cleanup_failed",
            "MiniaudioBackend.shutdown: group cleanup failed",
        )
    _release(
        lambda: self._engine.shutdown(),
        "audio.native.shutdown.engine_cleanup_failed",
        "MiniaudioBackend.shutdown: engine cleanup failed",
    )
[docs]
def list_capabilities(self) -> frozenset[Capability]:
    """Return the desktop capabilities combined with the native effect capabilities."""
    advertised = _DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES
    return advertised
[docs]
def sync_bus_layout(self, layout: AudioBusLayout) -> None:
    """Reconcile native ``ma_sound_group`` volumes + effect chains against `layout`.

    Cheap and idempotent: pushes only on change. Called by the
    audio server every frame; safe to call manually.

    For each bus, the ``(volume_db, mute, send_to)`` tuple is compared with
    the last successfully applied one. On a difference the bus's
    ``effective_linear_volume`` is pushed to its group. The snapshot is
    recorded only after the push succeeds, so a failed push is retried on
    the next call instead of being remembered as applied.

    Raises:
        Whatever ``_ensure_group`` raises for a bus it cannot resolve;
        native failures are routed through ``raise_or_warn``.
    """
    for bus in layout.buses:
        # _ensure_group returns a group or raises, so no None check is
        # needed and one lookup serves both the volume and effect steps.
        group = self._ensure_group(bus.name)

        # Bus volume / mute / routing.
        # NOTE(review): the change key does not include the effective
        # volume itself; if that value also depends on other buses, a
        # change there would not trigger a push here - confirm.
        current = (bus.volume_db, bus.mute, bus.send_to)
        if self._bus_snapshot.get(bus.name) != current:
            effective = bus.effective_linear_volume
            try:
                group.set_volume(effective)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.sync_bus_layout.set_volume_failed",
                    message=f"MiniaudioBackend.sync_bus_layout: set_volume failed for bus {bus.name!r}",
                )
            else:
                self._bus_snapshot[bus.name] = current

        # Effect chain reconciliation. Chain rebuilds only on signature
        # change; cheap if the effects list is unchanged.
        chain = self._effect_chains.get(bus.name)
        if chain is None:
            # Only allocate a chain object when there's at least one
            # effect to install (avoids per-bus overhead when buses
            # have no effects).
            if not bus.effects:
                continue
            chain = _NativeEffectChain(self._engine, group, _me.engine_endpoint(self._engine))
            self._effect_chains[bus.name] = chain
        try:
            chain.reconcile(bus.effects)
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.sync_bus_layout.effect_chain_rebuild_failed",
                message=f"MiniaudioBackend.sync_bus_layout: effect chain rebuild failed for bus {bus.name!r}",
            )
# ===========================================================================
# Null (silent) backend
# ===========================================================================
[docs]
class NullAudioBackend:
    """Silent backend implementing :class:`AudioPlaybackBackend` + :class:`AudioBusBackend`.

    Selected automatically by :func:`make_backend` when neither the native
    extension nor the legacy ``miniaudio`` path can start (e.g. no audio
    device on the host, ALSA/Pulse not running, the ``miniaudio`` Python
    package not installed, or a sandboxed environment with no compiler).

    The engine, scene tree, and :class:`AudioStreamPlayer` nodes all
    operate normally: calls return valid channel IDs and
    :meth:`is_channel_active` behaves consistently, but nothing is
    rendered to a device.

    NullBackend does NOT implement :class:`AudioStreamingBackend`. Code
    paths that need streaming (procedural synth via
    :class:`AudioSynth.attach_to`, AudioWorklet feeds, etc.) must check
    ``isinstance(backend, AudioStreamingBackend)`` and raise / warn-once
    when it's absent. Advertised capabilities are narrowed to
    ``{Capability.PLAY_BASIC}`` so effect modules don't try to
    instantiate native nodes on the null path.
    """

    def __init__(
        self,
        sample_rate: int = _DEFAULT_SAMPLE_RATE,
        nchannels: int = _DEFAULT_CHANNELS,
    ):
        """Store the requested format and start with no channels."""
        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        # Channel ids handed out by play_audio and not yet stopped.
        self._channels: set[int] = set()
        # Subset of ``_channels`` currently flagged as paused.
        self._paused: set[int] = set()
        # Symmetric atexit registration: no audio thread to leak here, but
        # keeps shutdown semantics consistent across all three backends so
        # tests can iterate ``_atexit_backends`` and assert every active
        # backend gets joined at interpreter exit.
        _register_atexit_shutdown(self)

    def _alloc(self) -> int:
        """Allocate a fresh channel id and record it as live."""
        cid = _alloc_channel_id()
        with self._lock:
            self._channels.add(cid)
        return cid

    def play_audio(
        self,
        stream: AudioStream,
        *,
        mode: str = "non_positional",
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "Master",
        max_distance: float = 100.0,
        from_position: float = 0.0,
        pan: float = 0.0,
        gain_db: float = 0.0,
    ) -> int | None:
        """Return a new live channel id; every argument is ignored."""
        # NullBackend is silent regardless of any kwarg; accept the full
        # signature so AudioPlaybackBackend Protocol parity holds.
        return self._alloc()

    def stop_audio(self, channel_id: int) -> None:
        """Forget ``channel_id`` (a no-op if it is unknown)."""
        with self._lock:
            self._channels.discard(channel_id)
            self._paused.discard(channel_id)

    def pause_audio(self, channel_id: int) -> None:
        """Flag a live channel as paused; unknown ids are ignored."""
        with self._lock:
            if channel_id in self._channels:
                self._paused.add(channel_id)

    def resume_audio(self, channel_id: int) -> None:
        """Clear the paused flag for ``channel_id``."""
        with self._lock:
            self._paused.discard(channel_id)

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """No-op: nothing is rendered."""
        return

    def update_audio_3d(
        self, channel_id: int, volume_db: float, pan: float, pitch: float
    ) -> None:
        """No-op: nothing is rendered."""
        return

    def set_pitch(self, channel_id: int, pitch: float) -> None:
        """No-op: nothing is rendered."""
        return

    # Listener pose endpoints: Null backend has no spatializer.

    def set_listener_position(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return

    def set_listener_velocity(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return

    def set_listener_direction(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return

    def set_listener_world_up(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return

    def get_playback_position(self, channel_id: int) -> float:
        """Always ``0.0``: no playback cursor is tracked."""
        return 0.0

    def is_channel_active(self, channel_id: int) -> bool:
        """Return True while ``channel_id`` has been played and not stopped."""
        with self._lock:
            return channel_id in self._channels

    def shutdown(self) -> None:
        """Drop all channel bookkeeping."""
        with self._lock:
            self._channels.clear()
            self._paused.clear()

    def list_capabilities(self) -> frozenset[Capability]:
        """Advertise only ``Capability.PLAY_BASIC``."""
        # The null backend supports only the unconditional "play.basic"
        # advertisement: every call succeeds (silently) and returns a
        # valid channel id. Spatial / streaming / effect capabilities are
        # deliberately omitted so callers that gate on them raise
        # AudioCapabilityError or skip work rather than producing the
        # illusion of a working pipeline.
        return frozenset({Capability.PLAY_BASIC})

    def sync_bus_layout(self, layout: AudioBusLayout) -> None:
        """No-op: there are no native groups to reconcile."""
        return
# ===========================================================================
# Factory
# ===========================================================================
# Set once make_backend() has logged the "using legacy mixer" warning so
# that warning is emitted at most once per process.
_fallback_warned = False
# Weak-set of backends registered through _register_atexit_shutdown()
# (the native and null backends in this module both register themselves).
# Each entry's shutdown() is called via atexit so miniaudio's audio thread
# is joined cleanly even when callers bypass `App.quit()` via sys.exit
# (the long-standing pattern noted in feedback_app_quit_pattern.md).
# Weak references mean registration does not keep a backend alive.
_atexit_backends: weakref.WeakSet[Any] = weakref.WeakSet()
# True once the single shared atexit hook has been installed.
_atexit_registered = False
def _register_atexit_shutdown(backend) -> None:
    """Track ``backend`` so its ``shutdown()`` runs at interpreter exit.

    The backend is added to the weak set ``_atexit_backends``. The first
    call also installs one shared ``atexit`` hook that shuts down every
    backend still present in that set; later calls only add to the set.
    """
    global _atexit_registered
    _atexit_backends.add(backend)
    if not _atexit_registered:
        _atexit_registered = True

        def _shutdown_all_atexit():
            # Iterate a snapshot: the weak set may change while we walk it.
            for live in tuple(_atexit_backends):
                try:
                    live.shutdown()
                except Exception as exc:
                    raise_or_warn(
                        exc,
                        key="audio.native.atexit.backend_shutdown_failed",
                        message="atexit: backend shutdown failed",
                    )

        atexit.register(_shutdown_all_atexit)
# Remediation text appended by make_backend() to its "native extension
# unavailable" warning and to AudioBackendUnavailable messages.
_NATIVE_REBUILD_HINT = (
"Run `uv pip install --reinstall -e packages/core` after installing a "
"C compiler to enable low-latency audio, or `uv run --with setuptools "
"simvx build-audio` to rebuild without reinstalling."
)
def _legacy_allowed() -> bool:
    """Return False only when ``SIMVX_ALLOW_LEGACY_AUDIO`` is exactly ``"0"``.

    The variable is read on every call (tests monkeypatch it); when unset
    it defaults to ``"1"``.
    """
    setting = os.getenv("SIMVX_ALLOW_LEGACY_AUDIO", "1")
    return setting != "0"
[docs]
def make_backend(
    sample_rate: int = _DEFAULT_SAMPLE_RATE,
    nchannels: int = _DEFAULT_CHANNELS,
) -> MiniaudioBackend | _LegacyMiniaudioBackend | NullAudioBackend:
    """Construct the best audio backend that can start, warning on every fallback.

    Candidates are tried in this order (no C compiler is ever invoked here:
    building the extension is the install-time hook's job):

    1. **Native** ``MiniaudioBackend`` (~20 ms latency), attempted when
       ``_me.is_available()`` reports the compiled extension.
    2. **Legacy** ``_LegacyMiniaudioBackend`` (~100 ms latency), attempted
       when native is missing or failed to initialise and
       ``SIMVX_ALLOW_LEGACY_AUDIO`` is not ``"0"``. A WARNING with rebuild
       instructions is logged the first time this step is reached.
    3. **Null** ``NullAudioBackend`` (silent), returned when the legacy
       backend also fails to initialise. A WARNING is logged.

    Raises:
        AudioBackendUnavailable: ``SIMVX_ALLOW_LEGACY_AUDIO=0`` is set and
            the native extension is missing or failed to initialise.
    """
    global _fallback_warned
    backend_kwargs = {"sample_rate": sample_rate, "nchannels": nchannels}

    # --- 1. native ----------------------------------------------------------
    if not _me.is_available():
        if not _legacy_allowed():
            raise AudioBackendUnavailable(
                "Native audio extension is not built and SIMVX_ALLOW_LEGACY_AUDIO=0 "
                f"is set. {_NATIVE_REBUILD_HINT}"
            )
    else:
        try:
            return MiniaudioBackend(**backend_kwargs)
        except Exception as native_exc:
            if not _legacy_allowed():
                raise AudioBackendUnavailable(
                    f"Native audio backend failed to initialise: {native_exc}. "
                    f"SIMVX_ALLOW_LEGACY_AUDIO=0 is set so the legacy fallback is "
                    f"disabled. {_NATIVE_REBUILD_HINT}"
                ) from native_exc
            log.warning(
                "MiniaudioBackend init failed (%s); trying legacy mixer.", native_exc
            )

    # --- 2. legacy ----------------------------------------------------------
    first_fallback = not _fallback_warned
    if first_fallback:
        _fallback_warned = True
        log.warning(
            "Native audio extension not available; using legacy mixer "
            "(100 ms latency). %s Or set SIMVX_ALLOW_LEGACY_AUDIO=0 to fail "
            "loudly instead.",
            _NATIVE_REBUILD_HINT,
        )
    try:
        return _LegacyMiniaudioBackend(**backend_kwargs)
    except Exception as legacy_exc:
        log.warning(
            "_LegacyMiniaudioBackend init failed (%s); using silent NullAudioBackend. "
            "Audio will be inaudible but the engine will run normally.",
            legacy_exc,
        )

    # --- 3. null ------------------------------------------------------------
    return NullAudioBackend(**backend_kwargs)