"""The native (``ma_engine``) production audio backend.
``MiniaudioBackend`` mixes, spatializes, and runs DSP entirely in native C via
the ``simvx.core._native.miniaudio_engine`` CFFI wrapper. Python only pushes
parameter updates. Falls back to the legacy / null backends via
``make_backend`` when the compiled extension is unavailable.
"""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING, Any
import numpy as np
from .._native import miniaudio_engine as _me
from ..audio_errors import (
AudioCapabilityError,
AudioError,
InvalidStreamError,
raise_or_warn,
)
from ..audio_protocol import Capability
from ._effects import _NativeEffectChain
from ._shared import (
_DEFAULT_CHANNELS,
_DEFAULT_SAMPLE_RATE,
_DESKTOP_CAPABILITIES,
_NATIVE_EFFECT_CAPABILITIES,
_alloc_channel_id,
_db_to_linear,
_register_atexit_shutdown,
_SoundEntry,
_warn_if_no_vorbis,
)
if TYPE_CHECKING:
from ..audio import AudioClip
from ..audio_bus import AudioBusLayout
[docs]
class MiniaudioBackend:
"""Native-mixed audio backend on top of ``ma_engine``.
All mixing, spatialization, and DSP runs in native C: Python only
pushes parameter updates. Target latency: ``buffersize_msec=20`` (vs
the legacy backend's 100 ms).
The native extension must be built once after install by running
``simvx build-audio``.
If the extension is missing, the constructor raises
``MiniaudioEngineUnavailable``. Use ``make_backend()`` for an
automatic fallback to ``_LegacyMiniaudioBackend`` with a one-time
warning.
"""
def __init__(
    self,
    sample_rate: int = _DEFAULT_SAMPLE_RATE,
    nchannels: int = _DEFAULT_CHANNELS,
):
    """Open the native engine and create groups for the default bus layout.

    ``sample_rate`` and ``nchannels`` are handed to the native engine and
    are reused later when building audio buffers and stream ring sources.
    """
    self._sample_rate, self._nchannels = sample_rate, nchannels
    # Guards lookups and mutation of the channel table below.
    self._lock = threading.Lock()

    # Python-side bookkeeping, all keyed by channel id or bus name:
    #   _sounds         live channels
    #   _groups         native sound group per bus
    #   _effect_chains  native effect chain per bus, created by
    #                   sync_bus_layout the first time a bus has effects
    #   _bus_snapshot   last (volume_db, mute, send_to) seen per bus, so
    #                   sync_bus_layout only pushes to native on change
    self._sounds: dict[int, _SoundEntry] = {}
    self._groups: dict[str, _me.SoundGroup] = {}
    self._effect_chains: dict[str, _NativeEffectChain] = {}
    self._bus_snapshot: dict[str, tuple[float, bool, str]] = {}

    _warn_if_no_vorbis()
    engine_options = {
        "sample_rate": sample_rate,
        "channels": nchannels,
        "device": True,
    }
    self._engine = _me.Engine(**engine_options)

    # Create a native group for every bus of the default layout up front.
    self._ensure_default_groups()

    # Register an exit hook so the engine is still shut down when the
    # process exits without an explicit shutdown() call (test runners,
    # demos that call sys.exit directly).
    _register_atexit_shutdown(self)
# -- internal helpers -----------------------------------------------------
def _ensure_default_groups(self) -> None:
    """Run ``sync_bus_layout`` against the default bus layout."""
    # Imported here rather than at module level, as in the original.
    from ..audio_bus import AudioBusLayout

    self.sync_bus_layout(AudioBusLayout.get_default())
def _ensure_group(self, bus_name: str) -> _me.SoundGroup:
    """Look up the native group for ``bus_name``, creating it on first use.

    The name is checked against the default :class:`AudioBusLayout`
    before any native state is allocated; ``get_bus`` is expected to
    raise :class:`UnknownBusError` for a name the layout does not know,
    so a typo fails at the call site instead of creating a stray group.
    A failure inside the native constructor is re-raised as
    :class:`AudioError`.
    """
    existing = self._groups.get(bus_name)
    if existing is None:
        from ..audio_bus import AudioBusLayout

        # Validation only: the returned bus object is not needed here.
        AudioBusLayout.get_default().get_bus(bus_name)
        try:
            existing = _me.SoundGroup(self._engine)
        except Exception as exc:
            raise AudioError(
                f"Failed to create native group for bus {bus_name!r}"
            ) from exc
        self._groups[bus_name] = existing
    return existing
def _make_sound(
    self,
    stream: AudioClip,
    *,
    bus: str,
    spatial: bool,
    pitch_enabled: bool,
) -> tuple[Any, Any] | tuple[None, None]:
    """Create the native sound for ``stream`` and return ``(sound, keeper)``.

    An ndarray in ``stream.backend_data`` takes priority and is wrapped in
    an ``AudioBuffer``; that buffer is returned as ``keeper`` so the caller
    can hold a reference to it. Otherwise ``stream.path`` is opened as a
    file and ``keeper`` is ``None``. A stream with neither yields
    ``(None, None)``. Any native failure is re-raised as
    :class:`InvalidStreamError`.
    """
    # Options shared by both construction routes.
    sound_options = {
        "group": self._ensure_group(bus),
        "spatial": spatial,
        "pitch_enabled": pitch_enabled,
    }

    samples = getattr(stream, "backend_data", None)
    if isinstance(samples, np.ndarray):
        try:
            pinned = _me.AudioBuffer(
                samples,
                sample_rate=self._sample_rate,
                channels=self._nchannels,
            )
            return _me.Sound.from_buffer(self._engine, pinned, **sound_options), pinned
        except Exception as exc:
            raise InvalidStreamError(
                f"Failed to play ndarray source: {exc}"
            ) from exc

    path = getattr(stream, "path", None)
    if not path:
        return None, None
    try:
        return _me.Sound.from_file(self._engine, path, **sound_options), None
    except Exception as exc:
        raise InvalidStreamError(
            f"Failed to play file {path!r}: {exc}"
        ) from exc
# -- public interface -----------------------------------------------------
[docs]
def play_audio(
    self,
    stream: AudioClip,
    *,
    mode: str = "non_positional",
    position: Any = None,
    volume_db: float = 0.0,
    pitch: float = 1.0,
    loop: bool = False,
    bus: str = "Master",
    max_distance: float = 100.0,
    from_position: float = 0.0,
    pan: float = 0.0,
    gain_db: float = 0.0,
) -> int | None:
    """Start playing ``stream`` on ``bus`` and return its channel id.

    ``volume_db`` and ``gain_db`` are summed and converted to a linear
    gain. ``pitch`` is floored at 0.1 and ``pan`` is clamped to
    ``[-1.0, 1.0]``. A positive ``from_position`` (seconds) is converted
    to frames with the backend sample rate and seeked to before start.
    ``mode``, ``position`` and ``max_distance`` are accepted for Protocol
    parity and discarded.

    Returns ``None`` when the stream has neither ndarray data nor a file
    path. Raises :class:`AudioError` when seeking or starting fails, and
    propagates whatever ``_make_sound`` raises. In every failure after
    the native sound exists, the sound is shut down before the error
    leaves this method.
    """
    # 2D / 3D players compute ``pan`` + ``volume_db`` Python-side and
    # forward them here. The native ``spatial`` flag stays False and the
    # native path does not branch on ``mode``.
    del mode, position, max_distance
    sound, keeper = self._make_sound(
        stream, bus=bus, spatial=False, pitch_enabled=True
    )
    if sound is None:
        return None

    # Configure everything *before* ``sound.start()`` so the audio thread
    # mixes with the right gain / pitch / pan from its very first buffer.
    # A failure here (bad argument type, native error) must not leave the
    # freshly created native sound behind, so release it and re-raise the
    # original exception unchanged.
    try:
        sound.set_volume(_db_to_linear(volume_db + gain_db))
        sound.set_pitch(max(0.1, float(pitch)))
        sound.set_looping(bool(loop))
        sound.set_pan(max(-1.0, min(1.0, float(pan))))
    except Exception:
        sound.shutdown()
        raise

    if from_position > 0.0:
        try:
            sound.seek_to_frame(int(float(from_position) * self._sample_rate))
        except Exception as exc:
            sound.shutdown()
            raise AudioError(
                f"seek_to_frame failed for from_position={from_position}: {exc}"
            ) from exc
    try:
        sound.start()
    except Exception as exc:
        sound.shutdown()
        raise AudioError(f"sound.start failed: {exc}") from exc

    cid = _alloc_channel_id()
    with self._lock:
        self._sounds[cid] = _SoundEntry(sound=sound, bus=bus, keeper=keeper)
    return cid
[docs]
def stop_audio(self, channel_id: int) -> None:
    """Stop channel ``channel_id`` and release its native sound.

    Unknown channel ids are ignored. A failure in ``sound.stop()`` is
    routed through ``raise_or_warn``; the native sound is shut down
    either way.
    """
    with self._lock:
        entry = self._sounds.pop(channel_id, None)
    if entry is None:
        return
    try:
        entry.sound.stop()
    except Exception as exc:
        raise_or_warn(
            exc,
            key="audio.native.stop_audio.sound_stop_failed",
            message="MiniaudioBackend.stop_audio: sound.stop failed",
        )
    finally:
        # The entry has already been removed from the channel table, so
        # this is the last chance to release the native sound: run it even
        # when ``raise_or_warn`` re-raises.
        entry.sound.shutdown()
[docs]
def pause_audio(self, channel_id: int) -> None:
    """Pause channel ``channel_id``; unknown ids are ignored.

    Pausing is done by stopping the native sound and flagging the entry
    as paused. The flag is only set when the stop call succeeded; a
    failure goes through ``raise_or_warn``.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None:
        try:
            target.sound.stop()
            target.paused = True
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.pause_audio.sound_stop_failed",
                message="MiniaudioBackend.pause_audio: sound.stop failed",
            )
[docs]
def resume_audio(self, channel_id: int) -> None:
    """Resume a paused channel.

    Does nothing for an unknown channel id or a channel that is not
    flagged as paused. The paused flag is cleared only after the native
    start call succeeded; a failure goes through ``raise_or_warn``.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None and target.paused:
        try:
            target.sound.start()
            target.paused = False
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.resume_audio.sound_start_failed",
                message="MiniaudioBackend.resume_audio: sound.start failed",
            )
[docs]
def update_audio_2d(self, channel_id: int, volume_db: float, pan: float) -> None:
    """Push a new volume and pan to a live channel.

    ``volume_db`` is converted to a linear gain and ``pan`` is clamped to
    ``[-1.0, 1.0]``. Unknown channel ids are ignored.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None:
        native = target.sound
        native.set_volume(_db_to_linear(volume_db))
        native.set_pan(max(-1.0, min(1.0, float(pan))))
[docs]
def update_audio_3d(
    self, channel_id: int, volume_db: float, pan: float, pitch: float
) -> None:
    """Push a new volume, pan and pitch to a live channel.

    ``volume_db`` is converted to a linear gain, ``pan`` is clamped to
    ``[-1.0, 1.0]`` and ``pitch`` is floored at 0.1. Unknown channel ids
    are ignored.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is not None:
        native = target.sound
        native.set_volume(_db_to_linear(volume_db))
        native.set_pan(max(-1.0, min(1.0, float(pan))))
        native.set_pitch(max(0.1, float(pitch)))
[docs]
def set_pitch(self, channel_id: int, pitch: float) -> None:
    """Set the pitch of a non-streaming channel (floored at 0.1).

    Unknown channel ids are ignored. Streaming channels raise
    :class:`AudioCapabilityError`: they are opened with
    ``pitch_enabled=False``, and the request is reported as an error
    instead of being dropped silently.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is None:
        return
    if not target.streaming:
        target.sound.set_pitch(max(0.1, float(pitch)))
        return
    raise AudioCapabilityError(
        "pitch.streaming",
        backend="MiniaudioBackend",
        advertised=self.list_capabilities(),
        remediation=(
            "Pitch modulation is not supported on streaming sounds. "
            "Use AudioPlayer with stream_mode='memory' for "
            "pitch control."
        ),
    )
[docs]
def get_playback_position(self, channel_id: int) -> float:
    """Return the channel's playback cursor in seconds.

    The native cursor (in frames) is divided by the backend sample rate.
    Unknown channel ids report ``0.0``.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is None:
        return 0.0
    frames = target.sound.cursor_frames()
    return frames / float(self._sample_rate)
[docs]
def is_channel_active(self, channel_id: int) -> bool:
    """Report whether the channel is still in use.

    Unknown ids are inactive. A paused channel counts as active.
    Otherwise the channel is active while the native sound is playing
    and has not reached its end.
    """
    with self._lock:
        target = self._sounds.get(channel_id)
    if target is None:
        return False
    if target.paused:
        return True
    native = target.sound
    return native.is_playing() and not native.at_end()
[docs]
def open_stream(
    self,
    *,
    volume_db: float = 0.0,
    bus: str = "Master",
    buffer_seconds: float = 0.5,
    stream: AudioClip | None = None,
) -> int:
    """Open a streaming channel and return its channel id.

    ``buffer_seconds`` (default 0.5 s) trades latency for underrun
    tolerance. At 48 kHz stereo s16 = ~96 KB. Drop to 0.1 s for
    low-latency interactive synthesis; bump to 1-2 s for music
    streams under heavy CPU load. Producer pushes int16 stereo
    bytes via ``feed_audio_chunk``; underrun produces silence
    (no glitch). Only applies to the chunk-fed PCM path: when
    ``stream`` is a compressed container, miniaudio's
    ``ma_sound_init_from_file(..., MA_SOUND_FLAG_STREAM)`` owns the
    buffering internally.

    Container routing (when ``stream`` is provided):

    * ``"ogg"`` / ``"mp3"`` / ``"flac"``: open the file directly via
      ``Sound.from_file(stream=True)``. Subsequent
      :meth:`feed_audio_chunk` calls are no-ops for this channel.
    * ``"unknown"``: raise :class:`InvalidStreamError`.
    * anything else, including ``"wav"`` / ``"pcm"`` and ``stream=None``:
      open a ring-buffer-backed sound. Caller feeds raw int16 stereo
      bytes.

    Pitch is intentionally not a parameter: streaming sounds are
    built with ``pitch_enabled=False`` so the resampler can't read
    past what the producer has written. ``set_pitch`` against the
    returned channel raises :class:`AudioCapabilityError`.

    Raises :class:`InvalidStreamError` for a compressed container
    without a file path or an undetected container, and
    :class:`AudioError` when a native object cannot be created or
    started. Native objects created before such a failure are shut down.
    """
    group = self._ensure_group(bus)
    container = stream.container if stream is not None else "wav"

    if container in ("ogg", "mp3", "flac"):
        path = stream.path if stream is not None else ""
        if not path:
            raise InvalidStreamError(
                f"Native streaming open: compressed container {container!r} "
                "requires a file path on the AudioClip"
            )
        try:
            sound = _me.Sound.from_file(
                self._engine, path, group=group, stream=True, pitch_enabled=False
            )
        except Exception as exc:
            raise AudioError(
                f"open_stream failed for {container} file {path!r}: {exc}"
            ) from exc
        sound.set_volume(_db_to_linear(volume_db))
        try:
            sound.start()
        except Exception as exc:
            sound.shutdown()
            raise AudioError(f"open_stream sound.start failed: {exc}") from exc
        cid = _alloc_channel_id()
        with self._lock:
            # ``keeper`` is None: there's no Python-side rb to pin. The
            # ma_sound itself owns the decoder + buffering. ``streaming``
            # stays True so ``set_pitch`` raises the same
            # ``AudioCapabilityError`` as the chunk-fed path.
            self._sounds[cid] = _SoundEntry(
                sound=sound, bus=bus, keeper=None, streaming=True
            )
        return cid

    if container == "unknown":
        path = stream.path if stream is not None else "<no-path>"
        raise InvalidStreamError(
            f"Native streaming open: could not detect audio container for {path!r}. "
            "Expected WAV/OGG/MP3/FLAC header."
        )

    # Chunk-fed PCM ring path (also used by legacy callers that pass no
    # stream object).
    try:
        rb = _me.StreamSource(
            self._engine,
            sample_rate=self._sample_rate,
            channels=self._nchannels,
            buffer_seconds=buffer_seconds,
            format="s16",
        )
    except Exception as exc:
        raise AudioError(f"open_stream failed: {exc}") from exc
    # The ring exists from here on: if the sound cannot be built on top
    # of it, release the ring before reporting the error, mirroring the
    # start-failure cleanup below.
    try:
        sound = _me.Sound.from_stream(self._engine, rb, group=group)
    except Exception as exc:
        rb.shutdown()
        raise AudioError(f"open_stream failed: {exc}") from exc
    sound.set_volume(_db_to_linear(volume_db))
    try:
        sound.start()
    except Exception as exc:
        sound.shutdown()
        rb.shutdown()
        raise AudioError(f"open_stream sound.start failed: {exc}") from exc
    cid = _alloc_channel_id()
    with self._lock:
        self._sounds[cid] = _SoundEntry(
            sound=sound, bus=bus, keeper=rb, streaming=True
        )
    return cid
[docs]
def feed_audio_chunk(self, channel_id: int, chunk: bytes) -> None:
    """Write raw int16 stereo bytes to the channel's ring buffer.

    The call is a no-op for an empty chunk, an unknown channel id, or a
    channel whose keeper is not a ``StreamSource`` (for example a
    file-backed stream). As documented for the ring source, a trailing
    partial frame is trimmed and data that does not fit in a full ring
    is dropped silently; callers that need to know can poll
    ``entry.keeper.available_write_frames``. A failing write goes
    through ``raise_or_warn``.
    """
    if not chunk:
        return
    with self._lock:
        target = self._sounds.get(channel_id)
    ring = None if target is None else target.keeper
    if ring is None or not isinstance(ring, _me.StreamSource):
        return
    try:
        ring.write(chunk)
    except Exception as exc:
        raise_or_warn(
            exc,
            key="audio.native.feed_audio_chunk.ring_write_failed",
            message="MiniaudioBackend.feed_audio_chunk: ring write failed",
        )
# ------------------------------------------------------------------
# 3D listener: drives the native ma_engine spatializer.
# ------------------------------------------------------------------
[docs]
def set_listener_position(self, x: float, y: float, z: float) -> None:
    """Pass the listener position to the native engine as three floats."""
    forward = self._engine.set_listener_position
    forward(float(x), float(y), float(z))
[docs]
def set_listener_velocity(self, x: float, y: float, z: float) -> None:
    """Pass the listener velocity (m/s, used for Doppler) to the native engine."""
    forward = self._engine.set_listener_velocity
    forward(float(x), float(y), float(z))
[docs]
def set_listener_direction(self, x: float, y: float, z: float) -> None:
    """Pass the listener's forward vector to the native engine."""
    forward = self._engine.set_listener_direction
    forward(float(x), float(y), float(z))
[docs]
def set_listener_world_up(self, x: float, y: float, z: float) -> None:
    """Pass the listener's world-up vector to the native engine."""
    forward = self._engine.set_listener_world_up
    forward(float(x), float(y), float(z))
[docs]
def shutdown(self) -> None:
    """Release every native object owned by the backend.

    The channel, effect-chain and group tables are emptied under the
    lock; the objects they held are then shut down outside the lock in
    the order sounds, effect chains, groups, engine. Each failure is
    routed through ``raise_or_warn`` with a stage-specific key.
    """
    with self._lock:
        entries = list(self._sounds.values())
        self._sounds.clear()
        chains = list(self._effect_chains.values())
        self._effect_chains.clear()
        groups = list(self._groups.values())
        self._groups.clear()

    def _teardown(items, release, key, message):
        # Run ``release`` on each item, reporting failures per item.
        for item in items:
            try:
                release(item)
            except Exception as exc:
                raise_or_warn(exc, key=key, message=message)

    _teardown(
        entries,
        lambda entry: entry.sound.shutdown(),
        "audio.native.shutdown.sound_cleanup_failed",
        "MiniaudioBackend.shutdown: sound cleanup failed",
    )
    _teardown(
        chains,
        lambda chain: chain.shutdown(),
        "audio.native.shutdown.effect_chain_cleanup_failed",
        "MiniaudioBackend.shutdown: effect chain cleanup failed",
    )
    _teardown(
        groups,
        lambda group: group.shutdown(),
        "audio.native.shutdown.group_cleanup_failed",
        "MiniaudioBackend.shutdown: group cleanup failed",
    )
    # The engine goes last, after everything attached to it.
    _teardown(
        (self,),
        lambda backend: backend._engine.shutdown(),
        "audio.native.shutdown.engine_cleanup_failed",
        "MiniaudioBackend.shutdown: engine cleanup failed",
    )
[docs]
def list_capabilities(self) -> frozenset[Capability]:
    """Return the desktop capability set combined with the native effect set."""
    advertised = _DESKTOP_CAPABILITIES | _NATIVE_EFFECT_CAPABILITIES
    return advertised
[docs]
def sync_bus_layout(self, layout: AudioBusLayout) -> None:
    """Reconcile native group volumes and effect chains against ``layout``.

    For every bus in ``layout.buses``:

    * the bus's ``effective_linear_volume`` is pushed to its native group
      whenever ``(volume_db, mute, send_to)`` differs from the last state
      that was pushed *successfully*; a failed push is reported through
      ``raise_or_warn`` and retried on the next call;
    * the bus's effect chain is reconciled against ``bus.effects``. A
      chain object is only allocated once a bus has at least one effect.

    Cheap and idempotent when nothing changed. Called by the audio server
    every frame; safe to call manually. Propagates whatever
    ``_ensure_group`` raises for a bus it cannot resolve.
    """
    for bus in layout.buses:
        # ``_ensure_group`` either returns a group or raises, so a single
        # lookup serves both the volume and the effect-chain step.
        group = self._ensure_group(bus.name)

        # Bus volume / mute / routing.
        # NOTE(review): the change key only covers this bus's own fields.
        # If ``effective_linear_volume`` also folds in parent buses, a
        # parent change would not trigger a push here - confirm.
        current = (bus.volume_db, bus.mute, bus.send_to)
        if self._bus_snapshot.get(bus.name) != current:
            effective = bus.effective_linear_volume
            try:
                group.set_volume(effective)
            except Exception as exc:
                raise_or_warn(
                    exc,
                    key="audio.native.sync_bus_layout.set_volume_failed",
                    message=f"MiniaudioBackend.sync_bus_layout: set_volume failed for bus {bus.name!r}",
                )
            else:
                # Record the state only once native has actually accepted
                # it; otherwise a single failed push would leave the group
                # at a stale volume until the bus changes again.
                self._bus_snapshot[bus.name] = current

        # Effect chain reconciliation. Chain rebuilds only on signature
        # change; cheap if the effects list is unchanged.
        chain = self._effect_chains.get(bus.name)
        if chain is None:
            # Only allocate a chain object when there's at least one
            # effect to install (avoids per-bus overhead when buses
            # have no effects).
            if not bus.effects:
                continue
            chain = _NativeEffectChain(self._engine, group, _me.engine_endpoint(self._engine))
            self._effect_chains[bus.name] = chain
        try:
            chain.reconcile(bus.effects)
        except Exception as exc:
            raise_or_warn(
                exc,
                key="audio.native.sync_bus_layout.effect_chain_rebuild_failed",
                message=f"MiniaudioBackend.sync_bus_layout: effect chain rebuild failed for bus {bus.name!r}",
            )