# Source code for simvx.core.audio_backend

"""Desktop audio backends for SimVX.

Three backends ship in this module:

- ``MiniaudioBackend``: the production path. Mixes audio natively via
  ``ma_engine`` (CFFI wrapper at ``simvx.core._native.miniaudio_engine``).
  The native extension is built at ``uv pip install`` time by the PEP 517
  build hook in ``packages/core/build_audio_ext_hook.py``. If that build
  is skipped or fails, the runtime falls back to legacy.
- ``_LegacyMiniaudioBackend``: the pure-Python fallback. Mixes in a
  numpy generator callback driven by ``miniaudio.PlaybackDevice``. Slower
  (target latency 100 ms vs 20 ms for the native path) and runs on the
  Python audio thread under the GIL, so it's vulnerable to underruns
  from heavy main-thread frames.
- ``NullAudioBackend``: silent no-op backend used when no audio device
  is available (sandboxed CI, headless containers).

``make_backend()`` resolution order, with **loud** warnings at every
fallback (never silent degradation):

1. Native ``MiniaudioBackend``: picked when the extension is importable.
2. Legacy ``_LegacyMiniaudioBackend``: picked when native is unavailable
   AND a real audio device opens. Emits a one-time WARNING explaining the
   latency hit and the rebuild command.
3. Null ``NullAudioBackend``: picked when neither native nor legacy can
   start (no audio device). Emits a one-time WARNING.

Set ``SIMVX_ALLOW_LEGACY_AUDIO=0`` to disable the legacy/null fallbacks
and raise :class:`AudioBackendUnavailable` instead.

The runtime never invokes a C compiler. Use ``simvx build-audio`` (or
``uv pip install --reinstall -e packages/core``) to (re)build the
extension manually.
"""

from __future__ import annotations

import atexit
import logging
import os
import threading
import weakref
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

import numpy as np

from ._native import miniaudio_engine as _me
from .audio_errors import (
    AudioBackendUnavailable,
    AudioCapabilityError,
    AudioError,
    InvalidStreamError,
    raise_or_warn,
    warn_once,
)
from .audio_protocol import CAPABILITIES_CORE, Capability

if TYPE_CHECKING:
    from .audio import AudioStream
    from .audio_bus import AudioBusLayout

log = logging.getLogger(__name__)

__all__ = [
    "MiniaudioBackend",
    "_LegacyMiniaudioBackend",
    "NullAudioBackend",
    "make_backend",
]


# Defaults for the ``sample_rate`` / ``nchannels`` constructor arguments of
# the legacy backend (48 kHz, two channels).
_DEFAULT_SAMPLE_RATE = 48000
_DEFAULT_CHANNELS = 2

# Per-channel high-water mark for the legacy backend's streaming buffer.
# Producer code that overfeeds (main thread runs faster than the audio
# period) used to grow the buffer without bound; now we drop oldest bytes
# on overflow and warn-once per channel. Override via
# ``SIMVX_AUDIO_STREAM_BUFFER_BYTES`` (positive int, bytes); the override
# is resolved by ``_stream_buffer_cap``.
_DEFAULT_STREAM_BUFFER_MAX_BYTES = 1 << 20  # 1 MiB


def _stream_buffer_cap() -> int:
    """Return the byte cap for a legacy streaming channel's buffer.

    ``SIMVX_AUDIO_STREAM_BUFFER_BYTES`` is consulted on each call (so the
    environment can be changed while the process runs). The built-in
    default is returned when the variable is unset, empty, not an
    integer, or not strictly positive.
    """
    configured = os.environ.get("SIMVX_AUDIO_STREAM_BUFFER_BYTES")
    if not configured:
        return _DEFAULT_STREAM_BUFFER_MAX_BYTES
    try:
        parsed = int(configured)
    except ValueError:
        return _DEFAULT_STREAM_BUFFER_MAX_BYTES
    if parsed > 0:
        return parsed
    return _DEFAULT_STREAM_BUFFER_MAX_BYTES

# Streaming-capable compressed formats: only the native (miniaudio C
# extension) backend can decode these on-the-fly. The legacy pure-Python
# mixer would need its own streaming decoder, which doesn't exist.
_COMPRESSED_STREAMING_FORMATS: frozenset[Capability] = frozenset(
    {Capability.STREAMING_OGG, Capability.STREAMING_MP3, Capability.STREAMING_FLAC}
)

# Capabilities advertised by the *native* desktop backend: the core set
# plus compressed streaming and Doppler.
_DESKTOP_CAPABILITIES: frozenset[Capability] = (
    CAPABILITIES_CORE | _COMPRESSED_STREAMING_FORMATS | frozenset({Capability.SPATIAL_DOPPLER})
)

# Capabilities advertised by the *legacy* (pure-Python) desktop backend.
# Drops the compressed-streaming formats since the mixer doesn't carry a
# decoder: ``open_stream`` raises :class:`AudioCapabilityError` for them
# rather than feeding container bytes through ``np.frombuffer(dtype=int16)``
# and producing noise.
_LEGACY_CAPABILITIES: frozenset[Capability] = (
    CAPABILITIES_CORE | frozenset({Capability.SPATIAL_DOPPLER})
)

# Effect capabilities the *native* backend additionally supports: built-in
# ma_*_node filters/delay plus custom DSP nodes (freeverb, compressor,
# soft-clip). Legacy backend doesn't advertise these: the Python mixer
# can't run them at acceptable performance. Kept as a separate set from
# ``_DESKTOP_CAPABILITIES``; the two are unioned when reporting what the
# native backend advertises in effect-construction errors.
_NATIVE_EFFECT_CAPABILITIES: frozenset[Capability] = frozenset(
    {
        Capability.EFFECT_GAIN,
        Capability.EFFECT_FILTER_BIQUAD,
        Capability.EFFECT_DELAY,
        Capability.EFFECT_PARAMETRIC_EQ,
        Capability.EFFECT_REVERB,
        Capability.EFFECT_SOFTCLIP,
        Capability.EFFECT_COMPRESSOR,
    }
)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _db_to_linear(db: float) -> float:
    """Map a decibel value to a linear amplitude factor.

    Anything at or below -80 dB is clamped to exactly ``0.0`` (silence);
    otherwise the result is ``10 ** (db / 20)``.
    """
    return 0.0 if db <= -80.0 else 10.0 ** (db / 20.0)


def _decode_stream_to_ndarray(
    stream: AudioStream, sample_rate: int, nchannels: int
) -> np.ndarray | None:
    """Decode an AudioStream into float32 interleaved samples.

    Used by the legacy backend (which mixes in Python) and by the native
    backend when the source is an ndarray-backed ``AudioStream`` (e.g.
    ``AudioStream.from_pcm`` or anything baked by ``AudioSynth``).
    File-backed streams on the native path go through
    ``ma_sound_init_from_file`` instead.

    Args:
        stream: Source stream. ``stream.backend_data`` is returned as-is when
            it is already an ndarray; otherwise ``stream.path`` is decoded.
        sample_rate: Target sample rate passed to the file decoder.
        nchannels: Target channel count passed to the file decoder.

    Returns:
        The sample array, or ``None`` when the stream has neither cached
        samples nor a path, or when decoding the file fails (a warning is
        logged in that case).

    Side effects:
        A successful file decode is cached on ``stream.backend_data`` so
        later calls return the cached array.
    """
    cached = stream.backend_data
    if isinstance(cached, np.ndarray):
        # Already-decoded / synthesized samples: no decoder needed.
        return cached

    path = stream.path
    if not path:
        return None

    # Imported only once a file actually has to be decoded, so
    # ndarray-backed streams do not depend on the ``miniaudio`` package.
    import miniaudio

    try:
        decoded = miniaudio.decode_file(
            path,
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=nchannels,
            sample_rate=sample_rate,
        )
    except (miniaudio.DecodeError, FileNotFoundError, OSError) as exc:
        log.warning("audio_backend: failed to decode %r: %s", path, exc)
        return None

    # int16 PCM -> float32 in [-1.0, 1.0).
    raw = np.frombuffer(decoded.samples, dtype=np.int16).astype(np.float32) / 32768.0
    stream.backend_data = raw
    return raw


# ===========================================================================
# Legacy (pure-Python) backend
# ===========================================================================


@dataclass
class _Channel:
    """A single active voice in the legacy mixer.

    Instances are stored in ``_LegacyMiniaudioBackend._channels`` and are
    read and mutated both by the public control methods and by the mixer
    callback, always under the backend's lock.
    """

    samples: np.ndarray  # float32, interleaved stereo
    cursor: int = 0  # frame index the next mix pass reads from
    total_frames: int = 0  # number of frames in ``samples``
    volume: float = 1.0  # linear gain (converted from dB by the backend)
    pan: float = 0.0  # clamped to [-1.0, 1.0] by the backend's setters
    pitch: float = 1.0  # playback-rate multiplier; the mixer floors it at 0.1
    loop: bool = False  # wrap to the start instead of stopping at the end
    paused: bool = False  # skipped by the mixer while set
    stopped: bool = False  # flagged voices are removed on the next mix pass
    bus: str = "Master"  # bus name looked up (exact match) in the bus gains
    streaming: bool = False  # True for chunk-fed channels from ``open_stream``
    # Pending int16 PCM bytes for streaming channels; appended by
    # ``feed_audio_chunk`` and consumed by the mixer.
    stream_buffer: bytearray = field(default_factory=bytearray)


# Process-wide, monotonically increasing channel id. Both the legacy and
# the native backend key their voices in a ``dict[int, ...]``, so drawing
# ids from one shared counter keeps them collision-free.
_next_id = 0
_id_lock = threading.Lock()


def _alloc_channel_id() -> int:
    """Reserve and return the next channel id (first id handed out is 1).

    The increment happens under ``_id_lock`` so concurrent callers never
    receive the same id.
    """
    global _next_id
    with _id_lock:
        allocated = _next_id + 1
        _next_id = allocated
    return allocated


class _LegacyMiniaudioBackend:
    """Pure-Python audio mixer running on miniaudio's audio thread.

    Buffers at 100 ms latency to survive GIL stalls. Kept as a fallback
    for environments without the native extension built.
    """

    def __init__(
        self, sample_rate: int = _DEFAULT_SAMPLE_RATE, nchannels: int = _DEFAULT_CHANNELS
    ):
        """Open a playback device and start feeding it from the Python mixer.

        Args:
            sample_rate: Output sample rate requested from the device.
            nchannels: Output channel count requested from the device.
        """
        import miniaudio

        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        self._channels: dict[int, _Channel] = {}

        # Signed 16-bit output with a 100 ms device buffer.
        device_options = {
            "output_format": miniaudio.SampleFormat.SIGNED16,
            "nchannels": nchannels,
            "sample_rate": sample_rate,
            "buffersize_msec": 100,
        }
        self._device = miniaudio.PlaybackDevice(**device_options)

        # The mixer is a generator: advance it to its first ``yield`` before
        # handing it to the device, which then drives it with frame counts.
        mixer = self._audio_callback()
        next(mixer)
        self._device.start(mixer)

        # Flipped by ``shutdown`` and asserted by tests. miniaudio's audio
        # thread is a native C thread (invisible to Python's threading
        # module), but PlaybackDevice.close joins it synchronously, so after
        # ``shutdown`` the leak is gone.
        self._shutdown_done: bool = False

        # Belt-and-braces: an atexit hook makes sure a ``sys.exit(0)`` from
        # anywhere (test runners, demos that bypass App.quit()) still shuts
        # the audio thread down. The hook holds the backend by weakref so it
        # doesn't pin it alive. Without this the legacy fallback reproduces
        # the original `feedback_app_quit_pattern.md` hang.
        _register_atexit_shutdown(self)

    def _audio_callback(self):
        """Generator that renders one period of int16 PCM per ``send``.

        Each value sent in is the number of frames requested; the value
        yielded back is that period's mixed audio as bytes. A ``None`` or
        non-positive request yields empty bytes. Voices flagged as stopped
        (before or during the pass) are removed from ``_channels``.
        """
        rendered = b""
        while True:
            requested = yield rendered
            if requested is None or requested <= 0:
                rendered = b""
                continue

            mix = np.zeros(requested * self._nchannels, dtype=np.float32)
            bus_gains = self._snapshot_bus_gains()

            with self._lock:
                finished: list[int] = []
                for ch_id, ch in self._channels.items():
                    if ch.stopped:
                        finished.append(ch_id)
                    elif ch.paused:
                        # Paused voices stay registered but contribute nothing.
                        pass
                    elif ch.streaming:
                        self._mix_streaming(ch, mix, requested, bus_gains)
                    else:
                        self._mix_channel(ch, mix, requested, bus_gains)
                        if ch.stopped:
                            finished.append(ch_id)
                # Deleting after the loop avoids mutating the dict mid-iteration.
                for ch_id in finished:
                    del self._channels[ch_id]

            # Hard-limit the float mix, then convert to signed 16-bit.
            np.clip(mix, -1.0, 1.0, out=mix)
            rendered = (mix * 32767).astype(np.int16).tobytes()

    def _mix_channel(
        self, ch: _Channel, mix: np.ndarray, num_frames: int, bus_gains: dict[str, float]
    ) -> None:
        samples = ch.samples
        n_total = ch.total_frames
        cursor = ch.cursor
        pitch = max(0.1, ch.pitch)

        # Exact lookup: bus names are case-sensitive and the layout is
        # the authoritative source. A miss means a player referenced a
        # bus that doesn't exist; surface that as :class:`UnknownBusError`
        # so the caller fixes the typo or registers the bus.
        if ch.bus not in bus_gains:
            from .audio_errors import UnknownBusError
            raise UnknownBusError(ch.bus, available=list(bus_gains.keys()))
        vol = ch.volume * bus_gains[ch.bus]
        left_gain = vol * min(1.0, 1.0 - ch.pan)
        right_gain = vol * min(1.0, 1.0 + ch.pan)

        if abs(pitch - 1.0) < 0.01:
            remaining = n_total - cursor
            if remaining <= 0:
                if ch.loop:
                    ch.cursor = 0
                    cursor = 0
                    remaining = n_total
                else:
                    ch.stopped = True
                    return

            frames_to_copy = min(num_frames, remaining)
            src_start = cursor * 2
            src_end = src_start + frames_to_copy * 2
            chunk = samples[src_start:src_end].copy()
            chunk[0::2] *= left_gain
            chunk[1::2] *= right_gain
            mix[: frames_to_copy * 2] += chunk
            ch.cursor = cursor + frames_to_copy

            if frames_to_copy < num_frames and ch.loop:
                ch.cursor = 0
                remainder_mix = mix[frames_to_copy * 2 :]
                self._mix_channel(ch, remainder_mix, num_frames - frames_to_copy, bus_gains)
            elif ch.cursor >= n_total and not ch.loop:
                ch.stopped = True
        else:
            positions = float(cursor) + np.arange(num_frames, dtype=np.float64) * pitch

            if ch.loop:
                positions %= n_total
                int_pos = positions.astype(np.int64)
                next_pos = (int_pos + 1) % n_total
                n = num_frames
            else:
                n = int(np.searchsorted(positions, n_total, side="left"))
                if n <= 0:
                    ch.stopped = True
                    ch.cursor = n_total
                    return
                positions = positions[:n]
                int_pos = positions.astype(np.int64)
                next_pos = np.minimum(int_pos + 1, n_total - 1)
                if n < num_frames:
                    ch.stopped = True

            frac = (positions - int_pos).astype(np.float32)
            left = samples[0::2]
            right = samples[1::2]
            left_out = left[int_pos] + frac * (left[next_pos] - left[int_pos])
            right_out = right[int_pos] + frac * (right[next_pos] - right[int_pos])
            mix[: n * 2 : 2] += left_out * left_gain
            mix[1 : n * 2 : 2] += right_out * right_gain

            if ch.loop:
                ch.cursor = int((float(cursor) + num_frames * pitch) % n_total)
            else:
                ch.cursor = int(float(cursor) + n * pitch)

    def _mix_streaming(
        self, ch: _Channel, mix: np.ndarray, num_frames: int, bus_gains: dict[str, float]
    ) -> None:
        buf = ch.stream_buffer
        bytes_needed = num_frames * self._nchannels * 2
        available = min(len(buf), bytes_needed)
        if available < 4:
            return

        available -= available % (self._nchannels * 2)
        raw = np.frombuffer(buf[:available], dtype=np.int16).astype(np.float32) / 32768.0

        if ch.bus not in bus_gains:
            from .audio_errors import UnknownBusError
            raise UnknownBusError(ch.bus, available=list(bus_gains.keys()))
        vol = ch.volume * bus_gains[ch.bus]
        left_gain = vol * min(1.0, 1.0 - ch.pan)
        right_gain = vol * min(1.0, 1.0 + ch.pan)
        raw[0::2] *= left_gain
        raw[1::2] *= right_gain
        mix[: len(raw)] += raw
        del buf[:available]

    def play_audio(
        self,
        stream: AudioStream,
        *,
        mode: str = "non_positional",
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "Master",
        max_distance: float = 100.0,
        from_position: float = 0.0,
        pan: float = 0.0,
        gain_db: float = 0.0,
    ) -> int | None:
        """Start a decoded voice and return its channel id.

        ``mode``, ``position`` and ``max_distance`` exist only for parity
        with the unified Protocol: this mixer is non-spatial, so spatial
        callers pre-compute ``pan`` and ``volume_db`` from the listener.
        ``gain_db`` is reserved for future per-play trims and is currently
        just added to ``volume_db``.

        Returns:
            The new channel id, or ``None`` when the stream could not be
            decoded.
        """
        del mode, position, max_distance

        decoded = _decode_stream_to_ndarray(stream, self._sample_rate, self._nchannels)
        if decoded is None:
            return None

        frame_count = len(decoded) // self._nchannels
        linear_gain = _db_to_linear(volume_db + gain_db)
        # Pan is seeded on the voice *before* it becomes visible in
        # ``_channels`` so the audio thread reads the right value on its
        # very first mix pass. Fixes bug-audio-legacy-spatial-first-frame.
        clamped_pan = max(-1.0, min(1.0, float(pan)))
        voice = _Channel(
            samples=decoded,
            total_frames=frame_count,
            volume=linear_gain,
            pan=clamped_pan,
            pitch=pitch,
            loop=loop,
            bus=bus,
        )
        if from_position > 0.0:
            # ``cursor`` is a frame index; the mixer scales it to a sample
            # index when reading ``samples``.
            voice.cursor = int(float(from_position) * self._sample_rate)

        new_id = _alloc_channel_id()
        with self._lock:
            self._channels[new_id] = voice
        return new_id

    def stop_audio(self, channel_id: int) -> None:
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch:
                ch.stopped = True

    def pause_audio(self, channel_id: int) -> None:
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch:
                ch.paused = True

    def resume_audio(self, channel_id: int) -> None:
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch:
                ch.paused = False

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """Update a voice's gain (given in dB) and pan (clamped to [-1, 1]).

        Unknown channel ids are ignored.
        """
        with self._lock:
            voice = self._channels.get(channel_id)
            if not voice:
                return
            voice.volume = _db_to_linear(volume_db)
            voice.pan = max(-1.0, min(1.0, pan))

    def update_audio_3d(
        self, channel_id: int, volume_db: float, pan: float, pitch: float
    ) -> None:
        """Update a voice's gain (dB), pan (clamped to [-1, 1]) and pitch.

        Pitch is floored at 0.1. Unknown channel ids are ignored.
        """
        with self._lock:
            voice = self._channels.get(channel_id)
            if not voice:
                return
            voice.volume = _db_to_linear(volume_db)
            voice.pan = max(-1.0, min(1.0, pan))
            voice.pitch = max(0.1, pitch)

    def set_pitch(self, channel_id: int, pitch: float) -> None:
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch:
                ch.pitch = max(0.1, pitch)

    def get_playback_position(self, channel_id: int) -> float:
        with self._lock:
            ch = self._channels.get(channel_id)
            if ch:
                return ch.cursor / self._sample_rate
        return 0.0

    def is_channel_active(self, channel_id: int) -> bool:
        with self._lock:
            ch = self._channels.get(channel_id)
            return ch is not None and not ch.stopped

    def open_stream(
        self,
        *,
        volume_db: float = 0.0,
        bus: str = "Master",
        buffer_seconds: float = 0.5,
        stream: AudioStream | None = None,
    ) -> int:
        """Open a chunk-fed PCM channel and return its id.

        ``buffer_seconds`` is accepted for Protocol parity only: the
        bytearray-backed buffer simply holds whatever ``feed_audio_chunk``
        has accumulated. Pitch is not a kwarg here; call
        ``set_pitch(channel, pitch)`` after opening to change the rate.

        Raises:
            AudioCapabilityError: ``stream`` is given and its container is
                one of ogg / mp3 / flac / unknown.
        """
        del buffer_seconds  # silence linter; the kwarg is here for Protocol parity

        # The pure-Python mixer reads producer bytes directly as int16, which
        # works for WAV / raw PCM. Compressed containers would need a
        # streaming decoder this path doesn't have, so fail loud instead of
        # playing container bytes as noise.
        container = None if stream is None else stream.container
        if container in ("ogg", "mp3", "flac", "unknown"):
            raise AudioCapabilityError(
                f"open_stream({container})",
                backend="_LegacyMiniaudioBackend",
                advertised=self.list_capabilities(),
                remediation=(
                    "Legacy backend only supports WAV streaming. Install the "
                    "native extension (uv run --with setuptools simvx build-audio) "
                    "or convert the file to WAV."
                ),
            )

        voice = _Channel(
            samples=np.empty(0, dtype=np.float32),
            streaming=True,
            volume=_db_to_linear(volume_db),
            bus=bus,
        )
        new_id = _alloc_channel_id()
        with self._lock:
            self._channels[new_id] = voice
        return new_id

    def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
        """Append PCM bytes to a streaming channel, bounding its buffer.

        Ignored for unknown or non-streaming channels. When the buffer
        exceeds the configured cap, the oldest bytes are discarded (rounded
        up to a whole frame) and a one-time warning is emitted per channel.
        """
        limit = _stream_buffer_cap()
        with self._lock:
            voice = self._channels.get(channel_id)
            if not voice or not voice.streaming:
                return

            pending = voice.stream_buffer
            pending.extend(chunk)

            # Bound the buffer so an overfeeding producer cannot grow it
            # without limit.
            excess = len(pending) - limit
            if excess <= 0:
                return

            frame_bytes = self._nchannels * 2
            # Round the drop up to a frame boundary so the next mix doesn't
            # start mid-sample; never drop more than the buffer holds.
            trimmed = min(excess + ((-excess) % frame_bytes), len(pending))
            del pending[:trimmed]
            warn_once(
                f"audio.legacy.stream_overrun.ch{channel_id}",
                "Legacy streaming buffer overrun on channel %d; "
                "dropping oldest %d bytes (raise "
                "SIMVX_AUDIO_STREAM_BUFFER_BYTES to allocate more "
                "headroom)",
                channel_id,
                trimmed,
            )

    def shutdown(self) -> None:
        try:
            if self._device is not None:
                self._device.close()
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.legacy.shutdown.device_close_failed",
                message="Legacy audio backend: failed to close playback device",
            )
        with self._lock:
            self._channels.clear()
        self._shutdown_done = True

    def list_capabilities(self) -> frozenset[Capability]:
        """Return the capability set advertised by the legacy backend."""
        advertised = _LEGACY_CAPABILITIES
        return advertised

    def sync_bus_layout(self, layout: AudioBusLayout) -> None:
        # Bus state already polled per audio period via _snapshot_bus_gains.
        return

    # Listener pose endpoints: legacy mixer has no spatializer, so accept
    # the call and store nothing. AudioListener3D pushes these every frame;
    # we return success rather than raise because the backend is correctly
    # honouring the contract (no spatial capability advertised).
    def set_listener_position(self, x: float, y: float, z: float) -> None: return
    def set_listener_velocity(self, x: float, y: float, z: float) -> None: return
    def set_listener_direction(self, x: float, y: float, z: float) -> None: return
    def set_listener_world_up(self, x: float, y: float, z: float) -> None: return

    def _snapshot_bus_gains(self) -> dict[str, float]:
        """Build ``{bus_name: linear_gain}`` from the default bus layout.

        The mixer looks channels up in this mapping by exact,
        case-sensitive name. A channel naming a bus that is absent from
        the layout makes ``_mix_channel`` / ``_mix_streaming`` raise
        :class:`UnknownBusError` rather than silently using Master gain.
        """
        from .audio_bus import AudioBusLayout

        gains: dict[str, float] = {}
        for bus in AudioBusLayout.get_default().buses:
            gains[bus.name] = bus.effective_linear_volume
        return gains


# ===========================================================================
# Native (ma_engine) backend
# ===========================================================================


@dataclass
class _SoundEntry:
    """One active voice on the native backend."""

    sound: Any  # _me.Sound
    bus: str  # name of the bus this voice is routed to
    keeper: Any = None  # AudioBuffer (pin against GC) when buffer-backed
    # Whether we deliberately paused this voice. No position is stored:
    # ma_sound_start auto-resumes from the internal cursor after
    # ma_sound_stop, so a later `resume_audio` only needs this flag.
    paused: bool = False
    # True for channels opened via ``open_stream`` (chunk-fed PCM). Native
    # streams build their Sound with ``pitch_enabled=False`` so the resampler
    # can't read past what the producer has written: that makes ``set_pitch``
    # against a streaming channel a guaranteed no-op. Used to raise
    # ``AudioCapabilityError`` instead of silently dropping the change.
    streaming: bool = False


class _NativeEffectChain:
    """Per-bus chain of native ma_*_node effects between a SoundGroup and the destination.

    Chain shape::

        group_node → effect[0] → effect[1] → … → effect[N-1] → dest_node

    Empty effect list collapses to a direct ``group_node → dest_node`` link.
    The chain is rebuilt in place whenever the bus's effect list (signature)
    changes; cheap because each rebuild detaches the source group from its
    current next and re-attaches through the new nodes.

    Effects with `effect.enabled = False` are skipped during rebuild
    (the chain looks like they aren't there at all).
    """

    def __init__(self, engine, group, destination):
        self._engine = engine
        self._group = group  # _me.SoundGroup
        self._destination = destination  # ma_node* (engine endpoint or parent group)
        self._nodes: list[Any] = []  # _me.EffectNode list, in order
        self._signature: list[tuple] = []  # captured from each effect for diff
        # GainEffect is a non-node effect that scales the output bus
        # volume on the final attach. Track the cumulative scalar so multiple
        # gain stages compose. (FadeEffect was removed in the audio refactor,
        # per-player fades live on AudioStreamPlayer.fade_in/fade_out.)
        self._output_volume: float = 1.0

    @staticmethod
    def _effect_signature(effect) -> tuple:
        """Return a hashable tuple capturing an effect's type and parameters.

        ``reconcile`` compares these tuples to decide whether the chain
        must be rebuilt. Floats are compared exactly: a rebuild on every
        nudge is acceptable, whereas the earlier ``round(x, 3)``
        quantisation silently dropped continuous filter sweeps below
        0.001. Unrecognised effect types are identified by ``id`` only.
        """
        from .audio_effect import (
            BandPassFilter,
            CompressorEffect,
            DelayEffect,
            GainEffect,
            HighPassFilter,
            LowPassFilter,
            NotchFilter,
            ParametricEQ,
            ReverbEffect,
            SoftClipEffect,
        )

        def _biquad(tag):
            # All four biquad filters share the same parameter set.
            return lambda fx: (tag, fx.cutoff_hz, fx.q, fx.enabled)

        # Checked top to bottom; the first matching type wins.
        extractors = (
            (GainEffect, lambda fx: ("gain", fx.volume_db, fx.enabled)),
            (LowPassFilter, _biquad("lowpass")),
            (HighPassFilter, _biquad("highpass")),
            (BandPassFilter, _biquad("bandpass")),
            (NotchFilter, _biquad("notch")),
            (
                DelayEffect,
                lambda fx: ("delay", fx.time_seconds, fx.feedback, fx.wet, fx.dry, fx.enabled),
            ),
            (
                ParametricEQ,
                lambda fx: (
                    "paramteq",
                    tuple((b.type, b.freq, b.q, b.gain_db) for b in fx.bands),
                    fx.enabled,
                ),
            ),
            (
                ReverbEffect,
                lambda fx: (
                    "reverb",
                    fx.room_size,
                    fx.damping,
                    fx.wet,
                    fx.dry,
                    fx.width,
                    bool(fx.freeze),
                    fx.enabled,
                ),
            ),
            (
                SoftClipEffect,
                lambda fx: ("softclip", fx.drive, fx.output_gain, fx.enabled),
            ),
            (
                CompressorEffect,
                lambda fx: (
                    "compressor",
                    fx.threshold_db,
                    fx.ratio,
                    fx.attack_ms,
                    fx.release_ms,
                    fx.knee_db,
                    fx.makeup_db,
                    fx.enabled,
                ),
            ),
        )
        for effect_type, extract in extractors:
            if isinstance(effect, effect_type):
                return extract(effect)
        return ("unknown", id(effect))

    def reconcile(self, effects: list) -> None:
        """Update the native chain to match `effects`.

        Cheap if the signature didn't change. Otherwise tears down and
        rebuilds the chain via ma_node_attach_output_bus.

        Disabled effects are skipped. ``GainEffect`` instances build no
        node; their gains are multiplied together and applied as the
        output volume of the chain tail. Other effects for which no node
        can be built are skipped.

        Args:
            effects: The bus's effect list, in processing order.

        Raises:
            AudioCapabilityError: propagated from ``_build_effect_node``
                when constructing a native node fails.
        """
        new_sig = [self._effect_signature(e) for e in effects]
        if new_sig == self._signature:
            return

        from .audio_effect import GainEffect

        self._teardown()
        self._signature = new_sig
        self._output_volume = 1.0

        group_node = _me.sound_group_as_node(self._group)
        prev_node = group_node

        for effect in effects:
            if not effect.enabled:
                continue
            built = self._build_effect_node(effect)
            if built is None:
                # Non-node effect: Gain is folded into the output volume.
                # ``isinstance`` matches how ``_effect_signature`` classifies
                # gain effects (subclasses included).
                if isinstance(effect, GainEffect):
                    self._output_volume *= 10.0 ** (effect.volume_db / 20.0)
                continue
            _me.attach_node(prev_node, 0, built.handle, 0)
            self._nodes.append(built)
            # Composite nodes (``_EQBandChain``) take input on ``handle`` but
            # emit from ``output_handle``. Continuing from the input would
            # re-route the first band's output and bypass the later bands.
            tail = getattr(built, "output_handle", None)
            prev_node = tail if tail is not None else built.handle

        # Final attach: chain tail → destination, with cumulative output_volume.
        _me.attach_node(prev_node, 0, self._destination, 0)
        # NOTE(review): the volume is only written when it differs from 1.0,
        # so a non-unity value previously set on the group node is never
        # reset here — confirm whether the native side resets it on re-attach.
        if abs(self._output_volume - 1.0) > 1e-6:
            try:
                _me.set_node_output_volume(prev_node, 0, self._output_volume)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.chain.output_volume_failed",
                    message="Native effect chain: failed to set node output volume",
                )

    def _build_effect_node(self, effect):
        """Create the native node for ``effect``; ``None`` if it has no node.

        Raises:
            AudioCapabilityError: any exception raised while matching or
                constructing the node is re-raised as this type, chained
                to the original.
        """
        from .audio_effect import (
            BandPassFilter,
            CompressorEffect,
            DelayEffect,
            HighPassFilter,
            LowPassFilter,
            NotchFilter,
            ParametricEQ,
            ReverbEffect,
            SoftClipEffect,
        )

        # Factories are lambdas so nothing on ``_me`` is touched until the
        # matching effect type is actually found. Checked top to bottom.
        factories = (
            (
                LowPassFilter,
                lambda fx: _me.LPFEffectNode(self._engine, cutoff_hz=fx.cutoff_hz, q=fx.q),
            ),
            (
                HighPassFilter,
                lambda fx: _me.HPFEffectNode(self._engine, cutoff_hz=fx.cutoff_hz, q=fx.q),
            ),
            (
                BandPassFilter,
                lambda fx: _me.BPFEffectNode(self._engine, cutoff_hz=fx.cutoff_hz, q=fx.q),
            ),
            (
                NotchFilter,
                lambda fx: _me.NotchEffectNode(self._engine, cutoff_hz=fx.cutoff_hz, q=fx.q),
            ),
            (
                DelayEffect,
                lambda fx: _me.DelayEffectNode(
                    self._engine,
                    delay_seconds=fx.time_seconds,
                    decay=fx.feedback,
                    wet=fx.wet,
                    dry=fx.dry,
                ),
            ),
            # ParametricEQ becomes a small chain wrapped as one composite node.
            (ParametricEQ, lambda fx: _EQBandChain(self._engine, fx.bands)),
            # Custom DSP nodes.
            (
                ReverbEffect,
                lambda fx: _me.FreeverbEffectNode(
                    self._engine,
                    room_size=fx.room_size,
                    damping=fx.damping,
                    wet=fx.wet,
                    dry=fx.dry,
                    width=fx.width,
                    freeze=fx.freeze,
                ),
            ),
            (
                SoftClipEffect,
                lambda fx: _me.SoftClipEffectNode(
                    self._engine,
                    drive=fx.drive,
                    output_gain=fx.output_gain,
                ),
            ),
            (
                CompressorEffect,
                lambda fx: _me.CompressorEffectNode(
                    self._engine,
                    threshold_db=fx.threshold_db,
                    ratio=fx.ratio,
                    attack_ms=fx.attack_ms,
                    release_ms=fx.release_ms,
                    knee_db=fx.knee_db,
                    makeup_db=fx.makeup_db,
                ),
            ),
        )

        try:
            for effect_type, make_node in factories:
                if isinstance(effect, effect_type):
                    return make_node(effect)
        except Exception as exc:
            raise AudioCapabilityError(
                capability=f"effect.{type(effect).__name__}",
                backend="MiniaudioBackend",
                advertised=_DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES,
            ) from exc
        return None

    def _teardown(self) -> None:
        """Dismantle the effect nodes and relink group → destination directly.

        Each native call is attempted independently; a failure is handed to
        ``raise_or_warn`` under its own key, so (unless that raises) the
        remaining steps still run.
        """

        def _attempt(action, key: str, message: str) -> None:
            # Run one native call, routing any failure through raise_or_warn.
            try:
                action()
            except Exception as exc:
                raise_or_warn(exc, key=key, message=message)

        # 1. Unhook the group from whatever it currently feeds.
        _attempt(
            lambda: _me.detach_node(_me.sound_group_as_node(self._group), 0),
            "audio.native.teardown.group_detach_failed",
            "Native effect chain teardown: group detach failed",
        )

        # 2. Detach and shut down every effect node, in chain order.
        for node in self._nodes:
            _attempt(
                lambda: _me.detach_node(node.handle, 0),
                "audio.native.teardown.node_detach_failed",
                "Native effect chain teardown: node detach failed",
            )
            _attempt(
                lambda: node.shutdown(),
                "audio.native.teardown.node_shutdown_failed",
                "Native effect chain teardown: node shutdown failed",
            )
        self._nodes.clear()

        # 3. Restore the direct group → destination edge.
        _attempt(
            lambda: _me.attach_node(
                _me.sound_group_as_node(self._group), 0, self._destination, 0
            ),
            "audio.native.teardown.restore_default_link_failed",
            "Native effect chain teardown: failed to restore default link",
        )

    def shutdown(self) -> None:
        """Tear the chain down and forget the signature it was built from."""
        self._teardown()
        # An empty signature makes the next ``reconcile`` with a non-empty
        # effect list rebuild from scratch.
        self._signature = []


class _EQBandChain:
    """Composite EffectNode for ParametricEQ: chains peak/loshelf/hishelf in series.

    Exposes a single ``handle`` (the chain input) and an ``output_handle``
    (the chain tail) so the bus chain logic can attach to it like one
    effect. Subnodes are shut down together; after :meth:`shutdown` both
    ``handle`` and ``output_handle`` are ``None``.
    """

    def __init__(self, engine, bands):
        """Build one native filter node per band and wire them in series.

        Args:
            engine: Native engine passed to every subnode constructor.
            bands: Iterable of band objects exposing ``type``, ``freq``,
                ``q`` and ``gain_db``. An empty/``None`` value yields a
                single 0 dB peak node so the chain still has a valid
                input and output handle.

        Raises:
            Exception: Whatever the native node constructors or
                ``attach_node`` raise. Subnodes created before the failure
                are released first, because the caller never receives this
                object and therefore cannot call :meth:`shutdown` itself.
        """
        self._engine = engine
        self._subnodes: list[Any] = []
        self._tail = None
        try:
            if not bands:
                # Empty EQ collapses to a single noop pass: a peak node with 0 dB gain.
                self._subnodes.append(
                    _me.PeakEffectNode(engine, freq=1000.0, q=1.0, gain_db=0.0)
                )
            else:
                for band in bands:
                    node = self._build_band_node(engine, band)
                    previous = self._subnodes[-1] if self._subnodes else None
                    # Track the node before wiring it so a failing attach is
                    # still covered by the cleanup below.
                    self._subnodes.append(node)
                    if previous is not None:
                        _me.attach_node(previous.handle, 0, node.handle, 0)
        except Exception:
            self.shutdown()
            raise
        self._tail = self._subnodes[-1].handle

    @staticmethod
    def _build_band_node(engine, band):
        """Create the native filter node matching ``band.type``.

        ``"lowshelf"`` and ``"highshelf"`` (case-insensitive) map to shelf
        nodes; a missing type, ``"peaking"``, and any unrecognised value all
        fall back to a peak node.
        """
        kind = (band.type or "peaking").lower()
        if kind == "lowshelf":
            node_cls = _me.LowShelfEffectNode
        elif kind == "highshelf":
            node_cls = _me.HighShelfEffectNode
        else:  # peaking or unknown
            node_cls = _me.PeakEffectNode
        return node_cls(engine, freq=band.freq, q=band.q, gain_db=band.gain_db)

    @property
    def handle(self):
        """External attach point: the first subnode's handle, or ``None`` when empty."""
        return self._subnodes[0].handle if self._subnodes else None

    @property
    def output_handle(self):
        """Handle of the last subnode in the series, or ``None`` when empty."""
        return self._tail

    def shutdown(self) -> None:
        """Detach and release every subnode, walking from the tail back to the head.

        Detach and shutdown are guarded separately per subnode; failures go
        through ``raise_or_warn``.
        """
        for node in reversed(self._subnodes):
            try:
                _me.detach_node(node.handle, 0)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.eqchain.subnode_detach_failed",
                    message="EQ band chain shutdown: subnode detach failed",
                )
            try:
                node.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.eqchain.subnode_shutdown_failed",
                    message="EQ band chain shutdown: subnode shutdown failed",
                )
        self._subnodes.clear()
        # Drop the cached tail too, so output_handle cannot hand out the
        # handle of a node that was just released.
        self._tail = None


class MiniaudioBackend:
    """Native-mixed audio backend on top of ``ma_engine``.

    All mixing, spatialization, and DSP runs in native C: Python only
    pushes parameter updates. Target latency: ``buffersize_msec=20`` (vs
    the legacy backend's 100 ms).

    The native extension must be built once after install::

        simvx build-audio

    If the extension is missing, the constructor raises
    ``MiniaudioEngineUnavailable``. Use ``make_backend()`` for an automatic
    fallback to ``_LegacyMiniaudioBackend`` with a one-time warning.
    """

    def __init__(
        self,
        sample_rate: int = _DEFAULT_SAMPLE_RATE,
        nchannels: int = _DEFAULT_CHANNELS,
    ):
        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        self._sounds: dict[int, _SoundEntry] = {}
        self._groups: dict[str, _me.SoundGroup] = {}
        # Per-bus native effect chains (one per group). Built lazily on
        # first sync_bus_layout that sees a non-empty effects list.
        self._effect_chains: dict[str, _NativeEffectChain] = {}
        # Snapshot of (volume_db, mute, send_to) per bus so sync_bus_layout
        # is cheap to re-call every frame: only push to native on change.
        self._bus_snapshot: dict[str, tuple[float, bool, str]] = {}
        self._engine = _me.Engine(
            sample_rate=sample_rate, channels=nchannels, device=True
        )
        # Seed default groups so unfamiliar bus names play at master gain.
        self._ensure_default_groups()
        # Belt-and-braces: register an atexit hook so a `sys.exit(0)` from
        # anywhere (test runners, demos that bypass App.quit()) still
        # shuts down the engine and joins miniaudio's audio thread.
        # Captured by weakref so the hook doesn't pin the backend alive.
        _register_atexit_shutdown(self)

    # -- internal helpers -----------------------------------------------------

    def _ensure_default_groups(self) -> None:
        """Sync against the default bus layout so its groups exist up front."""
        from .audio_bus import AudioBusLayout

        layout = AudioBusLayout.get_default()
        self.sync_bus_layout(layout)

    def _ensure_group(self, bus_name: str) -> _me.SoundGroup:
        """Return the SoundGroup for ``bus_name``, lazily creating if absent.

        Bus names must match the active :class:`AudioBusLayout` exactly:
        lookup is case-sensitive. Unknown buses raise
        :class:`UnknownBusError` so typos surface at the call site rather
        than playing on a phantom root-attached group.

        This method never returns ``None``: it returns a group or raises.

        Raises:
            UnknownBusError: ``bus_name`` is not in the default layout.
            AudioError: The native group could not be created.
        """
        group = self._groups.get(bus_name)
        if group is not None:
            return group
        # Validate against the active layout before allocating native state.
        from .audio_bus import AudioBusLayout

        layout = AudioBusLayout.get_default()
        layout.get_bus(bus_name)  # raises UnknownBusError on miss
        try:
            group = _me.SoundGroup(self._engine)
        except Exception as exc:
            raise AudioError(
                f"Failed to create native group for bus {bus_name!r}"
            ) from exc
        self._groups[bus_name] = group
        return group

    def _make_sound(
        self,
        stream: AudioStream,
        *,
        bus: str,
        spatial: bool,
        pitch_enabled: bool,
    ) -> tuple[Any, Any] | tuple[None, None]:
        """Build a ``_me.Sound`` for `stream`. Returns (sound, keeper).

        `keeper` pins an ``AudioBuffer`` when the source is an ndarray;
        ``None`` when loaded from a file path. Returns ``(None, None)``
        when the stream has neither ndarray ``backend_data`` nor a ``path``.

        Raises:
            InvalidStreamError: The native sound could not be created from
                the ndarray or the file.
        """
        group = self._ensure_group(bus)
        backend_data = getattr(stream, "backend_data", None)
        if isinstance(backend_data, np.ndarray):
            try:
                buf = _me.AudioBuffer(
                    backend_data,
                    sample_rate=self._sample_rate,
                    channels=self._nchannels,
                )
                sound = _me.Sound.from_buffer(
                    self._engine,
                    buf,
                    group=group,
                    spatial=spatial,
                    pitch_enabled=pitch_enabled,
                )
                return sound, buf
            except Exception as exc:
                raise InvalidStreamError(
                    f"Failed to play ndarray source: {exc}"
                ) from exc
        path = getattr(stream, "path", None)
        if path:
            try:
                sound = _me.Sound.from_file(
                    self._engine,
                    path,
                    group=group,
                    spatial=spatial,
                    pitch_enabled=pitch_enabled,
                )
                return sound, None
            except Exception as exc:
                raise InvalidStreamError(
                    f"Failed to play file {path!r}: {exc}"
                ) from exc
        return None, None

    # -- public interface -----------------------------------------------------

    def play_audio(
        self,
        stream: AudioStream,
        *,
        mode: str = "non_positional",
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "Master",
        max_distance: float = 100.0,
        from_position: float = 0.0,
        pan: float = 0.0,
        gain_db: float = 0.0,
    ) -> int | None:
        """Start playing `stream` and return its channel id.

        Returns ``None`` when the stream has no playable source. ``mode``,
        ``position`` and ``max_distance`` are accepted but unused here.

        Raises:
            InvalidStreamError: The native sound could not be created.
            AudioError: Seeking to ``from_position`` or starting failed.
        """
        # 2D / 3D players compute ``pan`` + ``volume_db`` Python-side and
        # forward them here so the native spatializer applies them before
        # the first buffer (matches web backend behaviour exactly). HRTF
        # is intentionally not used for the 2D path: wrong shape for 2D
        # scenes, so the native ``spatial`` flag stays False; ``mode`` is
        # accepted for Protocol parity but the native ma_engine path does
        # not currently branch on it.
        del mode, position, max_distance
        sound, keeper = self._make_sound(
            stream, bus=bus, spatial=False, pitch_enabled=True
        )
        if sound is None:
            return None
        try:
            sound.set_volume(_db_to_linear(volume_db + gain_db))
            sound.set_pitch(max(0.1, float(pitch)))
            sound.set_looping(bool(loop))
            # Apply pan *before* ``sound.start()`` so the audio thread starts
            # mixing with the correct pan on its very first buffer. Without
            # this, spatial players play centred for one buffer (~20 ms on the
            # native path) before the per-frame ``update_audio_2d/3d`` lands.
            sound.set_pan(max(-1.0, min(1.0, float(pan))))
        except Exception:
            # The sound is not registered yet, so nobody else can release it.
            # Mirror the seek/start failure paths, then re-raise unchanged.
            sound.shutdown()
            raise
        if from_position > 0.0:
            try:
                sound.seek_to_frame(int(float(from_position) * self._sample_rate))
            except Exception as exc:
                sound.shutdown()
                raise AudioError(
                    f"seek_to_frame failed for from_position={from_position}: {exc}"
                ) from exc
        try:
            sound.start()
        except Exception as exc:
            sound.shutdown()
            raise AudioError(f"sound.start failed: {exc}") from exc
        cid = _alloc_channel_id()
        with self._lock:
            self._sounds[cid] = _SoundEntry(sound=sound, bus=bus, keeper=keeper)
        return cid

    def stop_audio(self, channel_id: int) -> None:
        """Stop and release the channel; unknown ids are ignored."""
        with self._lock:
            entry = self._sounds.pop(channel_id, None)
        if entry is None:
            return
        # NOTE(review): entry.keeper is not shut down here, although
        # open_stream's start-failure path calls rb.shutdown() explicitly —
        # confirm Sound.shutdown (or GC) releases a StreamSource keeper.
        try:
            entry.sound.stop()
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.stop_audio.sound_stop_failed",
                message="MiniaudioBackend.stop_audio: sound.stop failed",
            )
        finally:
            # The entry is already gone from ``_sounds``; release the native
            # sound even when raise_or_warn re-raises, or it can never be
            # reached again.
            entry.sound.shutdown()

    def pause_audio(self, channel_id: int) -> None:
        """Stop the channel's sound and mark it paused; unknown ids are ignored."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return
        try:
            entry.sound.stop()
            entry.paused = True
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.pause_audio.sound_stop_failed",
                message="MiniaudioBackend.pause_audio: sound.stop failed",
            )

    def resume_audio(self, channel_id: int) -> None:
        """Restart a paused channel; a no-op for unknown or non-paused channels."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None or not entry.paused:
            return
        try:
            entry.sound.start()
            entry.paused = False
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.resume_audio.sound_start_failed",
                message="MiniaudioBackend.resume_audio: sound.start failed",
            )

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """Push volume and pan (clamped to [-1, 1]) to the channel's sound."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return
        entry.sound.set_volume(_db_to_linear(volume_db))
        entry.sound.set_pan(max(-1.0, min(1.0, float(pan))))

    def update_audio_3d(
        self, channel_id: int, volume_db: float, pan: float, pitch: float
    ) -> None:
        """Push volume, pan (clamped to [-1, 1]) and pitch (floored at 0.1)."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return
        entry.sound.set_volume(_db_to_linear(volume_db))
        entry.sound.set_pan(max(-1.0, min(1.0, float(pan))))
        entry.sound.set_pitch(max(0.1, float(pitch)))

    def set_pitch(self, channel_id: int, pitch: float) -> None:
        """Set the channel's pitch (floored at 0.1); unknown ids are ignored.

        Raises:
            AudioCapabilityError: The channel is a streaming channel.
        """
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return
        if entry.streaming:
            # Native streaming sounds are built with ``pitch_enabled=False``
            # (the resampler would read past what the producer has written);
            # surface that as a typed capability error rather than silently
            # dropping the change.
            raise AudioCapabilityError(
                "pitch.streaming",
                backend="MiniaudioBackend",
                advertised=self.list_capabilities(),
                remediation=(
                    "Pitch modulation is not supported on streaming sounds. "
                    "Use AudioStreamPlayer with stream_mode='memory' for "
                    "pitch control."
                ),
            )
        entry.sound.set_pitch(max(0.1, float(pitch)))

    def get_playback_position(self, channel_id: int) -> float:
        """Return the cursor position in seconds, or 0.0 for an unknown channel."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return 0.0
        return entry.sound.cursor_frames() / float(self._sample_rate)

    def is_channel_active(self, channel_id: int) -> bool:
        """Return True while the channel is paused, or playing and not at its end."""
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None:
            return False
        if entry.paused:
            return True
        return entry.sound.is_playing() and not entry.sound.at_end()

    def open_stream(
        self,
        *,
        volume_db: float = 0.0,
        bus: str = "Master",
        buffer_seconds: float = 0.5,
        stream: AudioStream | None = None,
    ) -> int:
        """Open a streaming channel.

        ``buffer_seconds`` (default 0.5 s) trades latency for underrun
        tolerance. At 48 kHz stereo s16 = ~96 KB. Drop to 0.1 s for
        low-latency interactive synthesis; bump to 1-2 s for music streams
        under heavy CPU load. Producer pushes int16 stereo bytes via
        ``feed_audio_chunk``; underrun produces silence (no glitch). Only
        applies to the chunk-fed PCM path: when ``stream`` is a compressed
        container, miniaudio's
        ``ma_sound_init_from_file(..., MA_SOUND_FLAG_STREAM)`` owns the
        buffering internally.

        Container routing (when ``stream`` is provided):

        * ``"wav"`` / ``"pcm"`` (synthetic) / ``None``: open an
          ``ma_pcm_rb``-backed sound. Caller feeds raw int16 stereo bytes.
        * ``"ogg"`` / ``"mp3"`` / ``"flac"``: open the file directly via
          ``Sound.from_file(stream=True)``. Subsequent
          :meth:`feed_audio_chunk` calls are no-ops for this channel.
        * ``"unknown"``: raise :class:`InvalidStreamError`.

        Pitch is intentionally not a parameter: streaming sounds are built
        with ``pitch_enabled=False`` so the resampler can't read past what
        the producer has written. ``set_pitch`` against the returned
        channel raises :class:`AudioCapabilityError`.

        Raises:
            InvalidStreamError: Compressed container without a path, or an
                undetected container.
            AudioError: Native sound/ring creation or ``sound.start`` failed.
        """
        group = self._ensure_group(bus)
        container = stream.container if stream is not None else "wav"

        if container in ("ogg", "mp3", "flac"):
            path = stream.path if stream is not None else ""
            if not path:
                raise InvalidStreamError(
                    f"Native streaming open: compressed container {container!r} "
                    "requires a file path on the AudioStream"
                )
            try:
                sound = _me.Sound.from_file(
                    self._engine, path, group=group, stream=True, pitch_enabled=False
                )
            except Exception as exc:
                raise AudioError(
                    f"open_stream failed for {container} file {path!r}: {exc}"
                ) from exc
            sound.set_volume(_db_to_linear(volume_db))
            try:
                sound.start()
            except Exception as exc:
                sound.shutdown()
                raise AudioError(f"open_stream sound.start failed: {exc}") from exc
            cid = _alloc_channel_id()
            with self._lock:
                # ``keeper`` is None: there's no Python-side rb to pin. The
                # ma_sound itself owns the decoder + buffering. ``streaming``
                # stays True so ``set_pitch`` raises the same
                # ``AudioCapabilityError`` as the chunk-fed path.
                self._sounds[cid] = _SoundEntry(
                    sound=sound, bus=bus, keeper=None, streaming=True
                )
            return cid

        if container == "unknown":
            path = stream.path if stream is not None else "<no-path>"
            raise InvalidStreamError(
                f"Native streaming open: could not detect audio container for {path!r}. "
                "Expected WAV/OGG/MP3/FLAC header."
            )

        # container in {"wav", "pcm"} (or stream is None: legacy callers
        # without a stream object get the chunk-fed PCM ring path).
        rb = None
        try:
            rb = _me.StreamSource(
                self._engine,
                sample_rate=self._sample_rate,
                channels=self._nchannels,
                buffer_seconds=buffer_seconds,
                format="s16",
            )
            sound = _me.Sound.from_stream(self._engine, rb, group=group)
        except Exception as exc:
            if rb is not None:
                # The ring was allocated but the sound on top of it failed:
                # release the ring, matching the start-failure path below.
                rb.shutdown()
            raise AudioError(f"open_stream failed: {exc}") from exc
        sound.set_volume(_db_to_linear(volume_db))
        try:
            sound.start()
        except Exception as exc:
            sound.shutdown()
            rb.shutdown()
            raise AudioError(f"open_stream sound.start failed: {exc}") from exc
        cid = _alloc_channel_id()
        with self._lock:
            self._sounds[cid] = _SoundEntry(
                sound=sound, bus=bus, keeper=rb, streaming=True
            )
        return cid

    def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
        """Push raw int16 stereo bytes into the channel's ring buffer.

        Misaligned chunks (length not a multiple of bytes-per-frame) have
        the trailing partial frame trimmed. When the ring is full the
        excess is silently dropped: callers can poll
        ``entry.keeper.available_write_frames`` if they need to know.

        Empty chunks, unknown channels, and channels whose keeper is not a
        ``StreamSource`` are ignored.
        """
        if not chunk:
            return
        with self._lock:
            entry = self._sounds.get(channel_id)
        if entry is None or entry.keeper is None:
            return
        if not isinstance(entry.keeper, _me.StreamSource):
            return
        try:
            entry.keeper.write(chunk)
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.feed_audio_chunk.ring_write_failed",
                message="MiniaudioBackend.feed_audio_chunk: ring write failed",
            )

    # ------------------------------------------------------------------
    # 3D listener: drives the native ma_engine spatializer.
    # ------------------------------------------------------------------

    def set_listener_position(self, x: float, y: float, z: float) -> None:
        """Forward listener position to the native ma_engine spatializer."""
        self._engine.set_listener_position(float(x), float(y), float(z))

    def set_listener_velocity(self, x: float, y: float, z: float) -> None:
        """Forward listener velocity (m/s) to the native engine for Doppler."""
        self._engine.set_listener_velocity(float(x), float(y), float(z))

    def set_listener_direction(self, x: float, y: float, z: float) -> None:
        """Forward listener forward vector to the native engine."""
        self._engine.set_listener_direction(float(x), float(y), float(z))

    def set_listener_world_up(self, x: float, y: float, z: float) -> None:
        """Forward listener world-up vector to the native engine."""
        self._engine.set_listener_world_up(float(x), float(y), float(z))

    def shutdown(self) -> None:
        """Release sounds, effect chains, groups, then the engine, in that order.

        The registries are emptied under the lock; the native shutdown calls
        run outside it. Each failure is routed through ``raise_or_warn``.
        """
        with self._lock:
            entries = list(self._sounds.values())
            self._sounds.clear()
            chains = list(self._effect_chains.values())
            self._effect_chains.clear()
            groups = list(self._groups.values())
            self._groups.clear()
        for entry in entries:
            try:
                entry.sound.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.shutdown.sound_cleanup_failed",
                    message="MiniaudioBackend.shutdown: sound cleanup failed",
                )
        for chain in chains:
            try:
                chain.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.shutdown.effect_chain_cleanup_failed",
                    message="MiniaudioBackend.shutdown: effect chain cleanup failed",
                )
        for group in groups:
            try:
                group.shutdown()
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.shutdown.group_cleanup_failed",
                    message="MiniaudioBackend.shutdown: group cleanup failed",
                )
        try:
            self._engine.shutdown()
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.shutdown.engine_cleanup_failed",
                message="MiniaudioBackend.shutdown: engine cleanup failed",
            )

    def list_capabilities(self) -> frozenset[Capability]:
        """Return the desktop capabilities plus the native effect capabilities."""
        return _DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES

    def sync_bus_layout(self, layout: AudioBusLayout) -> None:
        """Reconcile native ``ma_sound_group`` volumes + effect chains against `layout`.

        Cheap and idempotent: pushes only on change. Called by the audio
        server every frame; safe to call manually.
        """
        for bus in layout.buses:
            # Bus volume / mute / routing. The snapshot is recorded before
            # the push, so a failed push is not retried until the values
            # change again.
            current = (bus.volume_db, bus.mute, bus.send_to)
            changed = self._bus_snapshot.get(bus.name) != current
            if changed:
                self._bus_snapshot[bus.name] = current

            # One lookup per bus: _ensure_group returns a group or raises,
            # so no ``None`` handling is needed.
            group = self._ensure_group(bus.name)

            if changed:
                effective = bus.effective_linear_volume
                try:
                    group.set_volume(effective)
                except Exception as exc:
                    raise_or_warn(
                        exc,
                        key="audio.native.sync_bus_layout.set_volume_failed",
                        message=f"MiniaudioBackend.sync_bus_layout: set_volume failed for bus {bus.name!r}",
                    )

            # Effect chain reconciliation. Chain rebuilds only on signature
            # change; cheap if the effects list is unchanged.
            chain = self._effect_chains.get(bus.name)
            if chain is None:
                # Only allocate a chain object when there's at least one
                # effect to install (avoids per-bus overhead when buses
                # have no effects).
                if not bus.effects:
                    continue
                chain = _NativeEffectChain(
                    self._engine, group, _me.engine_endpoint(self._engine)
                )
                self._effect_chains[bus.name] = chain
            try:
                chain.reconcile(bus.effects)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.sync_bus_layout.effect_chain_rebuild_failed",
                    message=f"MiniaudioBackend.sync_bus_layout: effect chain rebuild failed for bus {bus.name!r}",
                )
# ===========================================================================
# Null (silent) backend
# ===========================================================================
class NullAudioBackend:
    """Silent backend implementing :class:`AudioPlaybackBackend` + :class:`AudioBusBackend`.

    Chosen by :func:`make_backend` when neither the native extension nor
    the legacy ``miniaudio`` path can start (e.g. no audio device on the
    host, ALSA/Pulse not running, the ``miniaudio`` Python package not
    installed, or a sandboxed environment with no compiler). Callers keep
    working: playback calls hand back valid channel ids and
    :meth:`is_channel_active` tracks them, but nothing reaches a device.

    This class does NOT implement :class:`AudioStreamingBackend`. Code that
    needs streaming (procedural synth via :class:`AudioSynth.attach_to`,
    AudioWorklet feeds, etc.) must check
    ``isinstance(backend, AudioStreamingBackend)`` and raise / warn-once
    when it is absent.

    Advertised capabilities are narrowed to ``{Capability.PLAY_BASIC}`` so
    effect modules don't try to instantiate native nodes on the null path.
    """

    def __init__(
        self,
        sample_rate: int = _DEFAULT_SAMPLE_RATE,
        nchannels: int = _DEFAULT_CHANNELS,
    ):
        self._sample_rate = sample_rate
        self._nchannels = nchannels
        self._lock = threading.Lock()
        # Ids handed out by play_audio and not yet stopped.
        self._channels: set[int] = set()
        # Subset of ids currently marked paused.
        self._paused: set[int] = set()
        # Symmetric atexit registration: no audio thread to leak here, but
        # keeps shutdown semantics consistent across all three backends so
        # tests can iterate ``_atexit_backends`` and assert every active
        # backend gets joined at interpreter exit.
        _register_atexit_shutdown(self)

    def _alloc(self) -> int:
        """Allocate a fresh channel id and record it as live."""
        new_id = _alloc_channel_id()
        with self._lock:
            self._channels.add(new_id)
        return new_id

    def play_audio(
        self,
        stream: AudioStream,
        *,
        mode: str = "non_positional",
        position: Any = None,
        volume_db: float = 0.0,
        pitch: float = 1.0,
        loop: bool = False,
        bus: str = "Master",
        max_distance: float = 100.0,
        from_position: float = 0.0,
        pan: float = 0.0,
        gain_db: float = 0.0,
    ) -> int | None:
        """Register a silent channel and return its id; every argument is ignored.

        The full keyword signature is accepted only so the
        AudioPlaybackBackend Protocol parity holds.
        """
        return self._alloc()

    def stop_audio(self, channel_id: int) -> None:
        """Forget the channel, including any paused mark."""
        with self._lock:
            for registry in (self._channels, self._paused):
                registry.discard(channel_id)

    def pause_audio(self, channel_id: int) -> None:
        """Mark a live channel as paused; unknown ids are ignored."""
        with self._lock:
            if channel_id not in self._channels:
                return
            self._paused.add(channel_id)

    def resume_audio(self, channel_id: int) -> None:
        """Clear the channel's paused mark, if it has one."""
        with self._lock:
            self._paused.discard(channel_id)

    def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
        """No-op: nothing is rendered."""
        return None

    def update_audio_3d(
        self, channel_id: int, volume_db: float, pan: float, pitch: float
    ) -> None:
        """No-op: nothing is rendered."""
        return None

    def set_pitch(self, channel_id: int, pitch: float) -> None:
        """No-op: nothing is rendered."""
        return None

    # Listener pose endpoints: Null backend has no spatializer.

    def set_listener_position(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return None

    def set_listener_velocity(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return None

    def set_listener_direction(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return None

    def set_listener_world_up(self, x: float, y: float, z: float) -> None:
        """No-op."""
        return None

    def get_playback_position(self, channel_id: int) -> float:
        """Always 0.0: no playback cursor exists."""
        return 0.0

    def is_channel_active(self, channel_id: int) -> bool:
        """Return True for any channel that was allocated and not yet stopped."""
        with self._lock:
            is_live = channel_id in self._channels
        return is_live

    def shutdown(self) -> None:
        """Drop all channel bookkeeping."""
        with self._lock:
            for registry in (self._channels, self._paused):
                registry.clear()

    def list_capabilities(self) -> frozenset[Capability]:
        """Advertise only ``Capability.PLAY_BASIC``.

        Spatial / streaming / effect capabilities are deliberately left out
        so callers that gate on them raise AudioCapabilityError or skip
        work instead of assuming a working pipeline.
        """
        return frozenset((Capability.PLAY_BASIC,))

    def sync_bus_layout(self, layout: AudioBusLayout) -> None:
        """No-op: there are no native groups to reconcile."""
        return None
# ===========================================================================
# Factory
# ===========================================================================

# Set once the "falling back to legacy" warning has been logged.
_fallback_warned = False

# Weak-set of native backends still alive at interpreter shutdown. Each
# entry's shutdown() is called via atexit so miniaudio's audio thread
# is joined cleanly even when callers bypass `App.quit()` via sys.exit
# (the long-standing pattern noted in feedback_app_quit_pattern.md).
_atexit_backends: weakref.WeakSet[Any] = weakref.WeakSet()
_atexit_registered = False


def _register_atexit_shutdown(backend) -> None:
    """Track `backend` weakly and install the shared atexit hook on first use.

    Every call adds `backend` to ``_atexit_backends``. The hook itself is
    registered with :mod:`atexit` only once per process; at exit it calls
    ``shutdown()`` on each backend still present in the weak set and routes
    failures through ``raise_or_warn``.
    """
    global _atexit_registered
    _atexit_backends.add(backend)
    if not _atexit_registered:
        _atexit_registered = True

        def _shutdown_all_atexit():
            # Iterate over a snapshot of the weak set rather than the set itself.
            for live_backend in tuple(_atexit_backends):
                try:
                    live_backend.shutdown()
                except Exception as exc:
                    raise_or_warn(
                        exc,
                        key="audio.native.atexit.backend_shutdown_failed",
                        message="atexit: backend shutdown failed",
                    )

        atexit.register(_shutdown_all_atexit)


_NATIVE_REBUILD_HINT = (
    "Run `uv pip install --reinstall -e packages/core` after installing a "
    "C compiler to enable low-latency audio, or `uv run --with setuptools "
    "simvx build-audio` to rebuild without reinstalling."
)


def _legacy_allowed() -> bool:
    """Return False only when ``SIMVX_ALLOW_LEGACY_AUDIO`` is exactly ``"0"``.

    The variable is read from the environment on every call (tests
    monkeypatch it); when unset it defaults to ``"1"``.
    """
    setting = os.environ.get("SIMVX_ALLOW_LEGACY_AUDIO", "1")
    return setting != "0"
def make_backend(
    sample_rate: int = _DEFAULT_SAMPLE_RATE,
    nchannels: int = _DEFAULT_CHANNELS,
) -> MiniaudioBackend | _LegacyMiniaudioBackend | NullAudioBackend:
    """Return the best audio backend that can be constructed, warning on each fallback.

    Candidates are tried in this order (nothing here invokes a C compiler):

    1. **Native** ``MiniaudioBackend``: attempted when
       ``_me.is_available()`` is true.
    2. **Legacy** ``_LegacyMiniaudioBackend``: attempted when native is
       unavailable or its constructor raised, provided
       ``SIMVX_ALLOW_LEGACY_AUDIO`` is not ``"0"``. A WARNING with rebuild
       instructions is logged the first time this step is reached.
    3. **Null** ``NullAudioBackend`` (silent): returned when the legacy
       constructor raises; a WARNING is logged.

    Raises:
        AudioBackendUnavailable: ``SIMVX_ALLOW_LEGACY_AUDIO=0`` is set and
            the native extension is missing or failed to initialise.
    """
    global _fallback_warned
    backend_kwargs = {"sample_rate": sample_rate, "nchannels": nchannels}

    # --- 1. native ----------------------------------------------------------
    if _me.is_available():
        try:
            return MiniaudioBackend(**backend_kwargs)
        except Exception as native_exc:
            if _legacy_allowed():
                log.warning(
                    "MiniaudioBackend init failed (%s); trying legacy mixer.", native_exc
                )
            else:
                raise AudioBackendUnavailable(
                    f"Native audio backend failed to initialise: {native_exc}. "
                    f"SIMVX_ALLOW_LEGACY_AUDIO=0 is set so the legacy fallback is "
                    f"disabled. {_NATIVE_REBUILD_HINT}"
                ) from native_exc
    elif not _legacy_allowed():
        raise AudioBackendUnavailable(
            "Native audio extension is not built and SIMVX_ALLOW_LEGACY_AUDIO=0 "
            f"is set. {_NATIVE_REBUILD_HINT}"
        )

    # --- 2. legacy ----------------------------------------------------------
    if not _fallback_warned:
        _fallback_warned = True
        log.warning(
            "Native audio extension not available; using legacy mixer "
            "(100 ms latency). %s Or set SIMVX_ALLOW_LEGACY_AUDIO=0 to fail "
            "loudly instead.",
            _NATIVE_REBUILD_HINT,
        )
    try:
        return _LegacyMiniaudioBackend(**backend_kwargs)
    except Exception as legacy_exc:
        log.warning(
            "_LegacyMiniaudioBackend init failed (%s); using silent NullAudioBackend. "
            "Audio will be inaudible but the engine will run normally.",
            legacy_exc,
        )

    # --- 3. null ------------------------------------------------------------
    return NullAudioBackend(**backend_kwargs)