Source code for simvx.core._audio_players

"""
Audio player nodes: :class:`AudioStreamPlayer` / ``2D`` / ``3D``.

Private leaf module behind the :mod:`simvx.core.audio` facade. The player
node classes compose the shared :class:`_AudioPlaybackMixin` with the node
base classes and add the non-positional / 2D / 3D playback specifics. The
shared playback state machine lives in ``_audio_playback``; the stream/data
layer in ``_audio_stream``.
"""

from __future__ import annotations

import logging
import os
from typing import Any

from ._audio_playback import _AudioPlaybackMixin
from ._audio_stream import AudioSource, AudioStream, _seek_wav_data_chunk
from .audio_errors import AudioError
from .descriptors import Property
from .math.types import Vec2, Vec3, clamp
from .node import Node
from .nodes_2d.node2d import Node2D
from .nodes_3d.node3d import Node3D
from .properties import Colour
from .signals import Signal

log = logging.getLogger(__name__)

# ============================================================================
# AudioStreamPlayer: Background music / UI sounds
# ============================================================================

class AudioStreamPlayer(_AudioPlaybackMixin, Node):
    """Non-positional audio player for background music and UI sounds.

    This player does not use 3D positioning: volume is constant regardless of
    camera position. Use AudioStreamPlayer2D or AudioStreamPlayer3D for
    spatial audio.

    Settings:
        volume_db: Volume in decibels (-80 to 24). 0 = full volume.
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name. One of ``"Master"``, ``"Music"``, ``"SFX"``,
            ``"Voice"``, ``"UI"`` (case-sensitive, Godot convention).
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        stream_mode: "memory" loads entire file; "streaming" reads in chunks.
        buffer_size: Chunk size in bytes for streaming mode (default 64KB).
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels [live: pushed mid-playback]",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(
        1.0,
        range=(0.5, 2.0),
        hint="Playback speed [live: pushed mid-playback via backend.set_pitch]",
        group="Playback",
        on_change="_on_pitch_scale_changed",
    )
    bus = Property(
        "Master",
        enum=["Master", "Music", "SFX", "Voice", "UI"],
        hint="Audio bus [next_play: mid-playback writes raise in strict mode]",
        group="Playback",
        on_change="_on_bus_changed",
    )
    autoplay = Property(
        False,
        hint="Auto-play on ready [next_play: changing after on_ready has no effect]",
        group="Playback",
        on_change="_on_autoplay_changed",
    )
    loop = Property(
        False,
        hint="Loop playback [next_play: mid-playback writes raise in strict mode]",
        group="Playback",
        on_change="_on_loop_changed",
    )
    queue_free_on_end = Property(
        False,
        hint="Remove this node from the tree once playback finishes [next_play]",
        group="Playback",
        on_change="_on_queue_free_on_end_changed",
    )
    stream_mode = Property(
        "memory",
        enum=["memory", "streaming"],
        hint="Load mode [next_play: mid-playback writes raise in strict mode]",
        group="Playback",
        on_change="_on_stream_mode_changed",
    )
    buffer_size = Property(
        65536,
        range=(4096, 524288),
        hint="Streaming buffer size [next_play]",
        group="Playback",
        on_change="_on_buffer_size_changed",
    )

    # Emitted when `stream_mode == "streaming"` and `play()` cannot open or parse
    # the source file. Argument is the resolved path string. Games can hook this
    # to fall back to a placeholder track, log to a UI, etc.
    stream_open_failed = Signal(str)

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)
        self._stream_file: Any = None  # Open file handle for streaming mode
        self._stream_offset: int = 0  # Current read offset in file
        self._stream_data_offset: int = 0  # Byte offset past file header (e.g. WAV header)

    def on_ready(self):
        """Run the shared autoplay check once the node is ready."""
        self._autoplay_check()

    def on_process(self, delta: float):
        """Feed audio chunks to backend in streaming mode; tick fades; auto-free."""
        # Only push state to the backend on frames where the fade tick
        # reports a truthy result.
        if self._playing and not self._paused and self._tick_fade(delta):
            backend = self._get_backend()
            if backend and self._backend_channel is not None:
                self._push_audio_state(backend, self._backend_channel)

        if self.stream_mode == "streaming" and self._playing and not self._paused:
            self._process_streaming()

        # Runs every frame regardless of the play/pause state above.
        self._check_queue_free_on_end()

    def _process_streaming(self):
        """Read next chunk from WAV file and feed to audio backend.

        Only runs when the player owns a Python-side file handle: i.e. when
        the container is WAV (or raw PCM). Compressed containers
        (OGG/MP3/FLAC) are decoded by the backend's native streaming decoder;
        ``_stream_file`` is ``None`` and this method is a no-op for them.
        """
        if not self._stream_file or not self._tree:
            return
        backend = self._get_backend()
        if not backend:
            return

        chunk = self._stream_file.read(self.buffer_size)
        if not chunk and self.loop:
            # End of data while looping: rewind to the first payload byte
            # (past the header) and read again.
            self._stream_file.seek(self._stream_data_offset)
            chunk = self._stream_file.read(self.buffer_size)
        if not chunk:
            # Nothing left to feed (or the payload is empty): end playback.
            self.stop()
            return
        backend.feed_audio_chunk(self._backend_channel, chunk)

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds (0.0 = beginning).
                Not forwarded to the backend in streaming mode.

        Raises:
            AudioCapabilityError: ``stream_mode`` is ``"streaming"`` but the
                active backend is not an ``AudioStreamingBackend``.
        """
        if not self._play_common(from_position):
            return

        streaming = self.stream_mode == "streaming"

        # Open file for streaming mode. Must happen before allocating a backend
        # channel: a failure here used to leave a silent ghost channel attached
        # to the bus with no source feeding it.
        if streaming and not self._open_stream_file():
            self._playing = False
            return

        backend = self._get_backend()
        if not backend:
            return

        if not streaming:
            self._backend_channel = backend.play_audio(
                self.stream,
                mode="non_positional",
                volume_db=self._effective_volume_db(),
                pitch=self.pitch_scale,
                loop=self.loop,
                bus=self.bus,
                from_position=from_position,
            )
            return

        # `from_position` doesn't apply to ``open_stream``: the file
        # cursor is owned by ``_process_streaming`` and a seek here
        # would just be overwritten by the next chunk read. Skip it
        # for streaming mode and let the file handle seek if needed.
        #
        # Passing ``stream`` lets the backend route compressed
        # containers (ogg/mp3/flac) through its native decoder
        # rather than expecting raw PCM via ``feed_audio_chunk``.
        #
        # Narrow to AudioStreamingBackend: NullAudioBackend (and any
        # future playback-only backend) doesn't implement streaming.
        # Surface that as a typed AudioCapabilityError so the user
        # gets a clear remediation rather than an AttributeError.
        from .audio_protocol import AudioStreamingBackend

        if not isinstance(backend, AudioStreamingBackend):
            from .audio_errors import AudioCapabilityError

            self._abort_stream_open()
            raise AudioCapabilityError(
                "streaming",
                backend=type(backend).__name__,
                advertised=backend.list_capabilities(),
                remediation=(
                    "AudioStreamPlayer(stream_mode='streaming') requires an "
                    "AudioStreamingBackend (open_stream / feed_audio_chunk). "
                    "Use stream_mode='memory' or install the native extension."
                ),
            )

        try:
            self._backend_channel = backend.open_stream(
                volume_db=self._effective_volume_db(),
                bus=self.bus,
                stream=self.stream,
            )
        except AudioError as exc:
            log.warning(
                "AudioStreamPlayer: backend rejected open_stream for %r: %s",
                self.stream.path if self.stream else "",
                exc,
            )
            self._abort_stream_open()

    def _close_stream_file(self) -> None:
        """Close and forget the Python-side streaming file handle, if any."""
        if self._stream_file:
            self._stream_file.close()
        self._stream_file = None

    def _abort_stream_open(self) -> None:
        """Undo a streaming ``play()`` whose backend side could not start.

        Closes the file handle, emits ``stream_open_failed`` with the stream
        path (empty string when there is no stream), then clears ``_playing``.
        """
        self._close_stream_file()
        self.stream_open_failed(self.stream.path if self.stream else "")
        self._playing = False

    def _open_stream_file(self) -> bool:
        """Prepare ``self.stream`` for streaming playback.

        For WAV containers, opens the file and positions it past the header so
        ``_process_streaming`` can feed raw int16 PCM bytes to the backend.

        For compressed containers (OGG/MP3/FLAC), there is no Python-side file
        handle: the backend's native decoder opens the file itself when
        :meth:`AudioBackend.open_stream` is invoked with the stream. This
        method just validates the file is readable and the container is
        recognised, then returns True without setting ``self._stream_file``.

        Returns True if the backend can proceed to open the stream. On failure
        logs, emits ``stream_open_failed``, leaves ``self._stream_file`` None,
        and returns False: caller must skip backend channel allocation to avoid
        leaking a silent channel.

        If the WAV header probe itself raises, the freshly opened handle is
        closed before the exception propagates.
        """
        # Drop any handle left over from a previous play().
        self._close_stream_file()

        path = self.stream.path if self.stream else ""
        if not path:
            log.warning("AudioStreamPlayer: streaming mode requires a file path")
            self.stream_open_failed(path)
            return False

        container = self.stream.container if self.stream else "unknown"

        # Synthetic streams (from_pcm / tone) carry decoded ndarray data;
        # they were never meant to flow through the chunk-fed streaming
        # path. The stream player should use stream_mode="memory" for them.
        if container == "pcm":
            log.warning(
                "AudioStreamPlayer: stream_mode='streaming' is not supported for "
                "synthetic AudioStream (from_pcm/tone). Use stream_mode='memory'. path=%s",
                path,
            )
            self.stream_open_failed(path)
            return False

        if container == "wav":
            try:
                f = open(path, "rb")  # noqa: SIM115
            except OSError as exc:
                log.warning("Failed to open audio stream file %s: %s", path, exc)
                self.stream_open_failed(path)
                return False
            try:
                data_offset = _seek_wav_data_chunk(f)
            except BaseException:
                # The handle is not yet owned by ``self``; without this close
                # a raising header probe would leak the descriptor.
                f.close()
                raise
            if data_offset is None:
                log.warning("Invalid or unsupported WAV file (no data chunk): %s", path)
                f.close()
                self.stream_open_failed(path)
                return False
            self._stream_data_offset = data_offset
            self._stream_file = f
            return True

        if container in ("ogg", "mp3", "flac"):
            # Verify the file is at least readable so we fail loud here
            # rather than inside the backend's native decoder call.
            if not os.path.isfile(path):
                log.warning(
                    "AudioStreamPlayer: streaming file does not exist or is not a regular file: %s",
                    path,
                )
                self.stream_open_failed(path)
                return False
            # No Python-side file handle: the backend will open the file
            # via its native decoder. ``_process_streaming`` no-ops.
            return True

        # container == "unknown": the header probe failed to recognise the
        # format. Fail loud here rather than letting the backend's decoder
        # produce noise from misinterpreted bytes.
        log.warning(
            "AudioStreamPlayer: unrecognised audio container for streaming source %s "
            "(expected WAV/OGG/MP3/FLAC header)",
            path,
        )
        self.stream_open_failed(path)
        return False

    def stop(self):
        """Stop playback and reset position to beginning."""
        # Release the streaming handle before the shared stop logic runs.
        self._close_stream_file()
        super().stop()

    def get_playback_position(self) -> float:
        """Get current playback position in seconds.

        Returns 0.0 when there is no backend or no active backend channel.
        """
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            return backend.get_playback_position(self._backend_channel)
        return 0.0
# ============================================================================
# AudioStreamPlayer2D: 2D positional audio
# ============================================================================
class AudioStreamPlayer2D(_AudioPlaybackMixin, Node2D):
    """2D positional audio player with stereo panning.

    Audio volume and pan are calculated based on distance from the 2D listener
    (typically Camera2D position). Left/right panning simulates direction.

    Settings:
        volume_db: Base volume in decibels (-80 to 24).
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name.
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        max_distance: Distance at which audio is inaudible (pixels).
        attenuation: Distance attenuation exponent (1.0 = linear, 2.0 = inverse square).
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels [live]",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(
        1.0,
        range=(0.5, 2.0),
        hint="Playback speed [live: pushed mid-playback via backend.set_pitch]",
        group="Playback",
        on_change="_on_pitch_scale_changed",
    )
    bus = Property(
        "SFX",
        enum=["Master", "Music", "SFX", "Voice", "UI"],
        hint="Audio bus [next_play]",
        group="Playback",
        on_change="_on_bus_changed",
    )
    autoplay = Property(
        False,
        hint="Auto-play on ready [next_play]",
        group="Playback",
        on_change="_on_autoplay_changed",
    )
    loop = Property(
        False,
        hint="Loop playback [next_play]",
        group="Playback",
        on_change="_on_loop_changed",
    )
    queue_free_on_end = Property(
        False,
        hint="Remove this node from the tree once playback finishes [next_play]",
        group="Playback",
        on_change="_on_queue_free_on_end_changed",
    )
    max_distance = Property(
        2000.0,
        range=(1.0, 10000.0),
        hint="Max hearing distance (pixels) [live]",
        group="Spatial",
        on_change="_on_spatial_changed",
    )
    attenuation = Property(
        1.0,
        range=(0.1, 4.0),
        hint="Distance attenuation exponent [live]",
        group="Spatial",
        on_change="_on_spatial_changed",
    )
    pan_override = Property(
        None,
        hint="Override positional pan; None = use world position [live]",
        group="Playback",
        on_change="_on_pan_override_changed",
    )

    gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))

    def get_gizmo_lines(self) -> list[tuple[Vec2, Vec2]]:
        """Return circle showing the audio range."""
        from .physics_nodes import _circle_lines_2d

        p = self.world_position
        return _circle_lines_2d(p.x, p.y, float(self.max_distance), 32)

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)

    def on_ready(self):
        """Run the shared autoplay check once the node is ready."""
        self._autoplay_check()

    # ------------------------------------------------------------------
    # Spatial helpers
    # ------------------------------------------------------------------

    def _listener_distance(self) -> float:
        """Return the distance from this node to the tree's 2D audio listener.

        Falls back to the distance from the world origin when the node has no
        tree or the tree reports no listener.
        """
        tree = self.tree
        listener = tree.audio_listener_2d() if tree is not None else None
        if listener is not None:
            return (self.world_position - listener.position).length()
        return self.world_position.length()

    def _attenuated_volume_db(self, distance: float) -> float:
        """Return distance-attenuated volume_db for a given listener distance.

        Includes any active fade offset on top of the base ``volume_db``.
        The result never drops below -80.0, the value used for sources beyond
        ``max_distance``.
        """
        if distance > self.max_distance:
            return -80.0
        # volume_db = base - 80 * (dist / max_dist) ^ attenuation
        dist_ratio = distance / self.max_distance
        attenuated = self._effective_volume_db() - 80.0 * (dist_ratio**self.attenuation)
        # With a negative base (or mid fade-out) the formula undershoots the
        # -80 dB floor, which made a source just inside max_distance quieter
        # than one just outside it. Clamp so the curve stays monotonic.
        return max(-80.0, attenuated)

    def _compute_positional_pan(self) -> float:
        """Compute the positional stereo pan (-1=left, 0=center, 1=right)."""
        tree = self.tree
        listener = tree.audio_listener_2d() if tree is not None else None
        listener_x = float(listener.position.x) if listener is not None else 0.0
        dx = self.world_position.x - listener_x
        return clamp(dx / self.max_distance, -1.0, 1.0)

    def _push_audio_state(self, backend, channel) -> None:
        """Send distance-attenuated volume + (override or positional) pan."""
        volume = self._attenuated_volume_db(self._listener_distance())
        pan = self.pan_override if self.pan_override is not None else self._compute_positional_pan()
        backend.update_audio_2d(channel, volume, pan)

    def _on_pan_override_changed(self) -> None:
        """Push the new pan to the active backend channel."""
        # set_pan_and_gain() raises this flag to batch its own single push.
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
        """Atomically set ``pan_override`` + ``volume_db`` with one backend call.

        Use when both must land on the same audio frame (e.g. fade-out before
        retrigger). Setting them as separate Property assignments would fire
        two backend updates.
        """
        self._suppress_audio_push = True
        try:
            self.pan_override = pan
            self.volume_db = gain_db
        finally:
            self._suppress_audio_push = False
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is not None and channel is not None:
            self._push_audio_state(backend, channel)

    def on_process(self, delta: float):
        """Update 2D spatialization each frame; tick fades; auto-free."""
        # Pending reap (from fade_out → stop) must run even after _playing
        # flips to False, otherwise queue_free_on_end never fires for the
        # fade-out-into-cleanup pattern. Check first so the early-return
        # path below doesn't skip it.
        if self._check_queue_free_on_end():
            return
        if not self._playing or self._paused:
            return

        self._tick_fade(delta)
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            self._push_audio_state(backend, self._backend_channel)
        self._check_queue_free_on_end()

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds, forwarded to the backend.
        """
        if not self._play_common(from_position):
            return
        backend = self._get_backend()
        if not backend:
            return

        # Pre-compute attenuated volume + pan from the current listener
        # state so the backend can apply them *before* the first audio
        # buffer is rendered. Without this, the channel plays unattenuated
        # + centred for one buffer (~20-100 ms depending on backend)
        # before the per-frame ``update_audio_2d`` lands: audible for
        # short SFX (footsteps, gunshots) or sources spawned beyond
        # max_distance. Fixes bug-audio-legacy-spatial-first-frame.
        initial_volume = self._attenuated_volume_db(self._listener_distance())
        initial_pan = (
            self.pan_override if self.pan_override is not None else self._compute_positional_pan()
        )
        self._backend_channel = backend.play_audio(
            self.stream,
            mode="2d",
            position=self.world_position,
            volume_db=initial_volume,
            pitch=self.pitch_scale,
            loop=self.loop,
            bus=self.bus,
            max_distance=self.max_distance,
            from_position=from_position,
            pan=float(initial_pan),
        )
# ============================================================================
# AudioStreamPlayer3D: 3D spatial audio
# ============================================================================
class AudioStreamPlayer3D(_AudioPlaybackMixin, Node3D):
    """3D spatial audio player with distance attenuation and directional panning.

    Audio volume and stereo panning are calculated based on distance and
    direction from the 3D listener (typically Camera3D position/orientation).

    Settings:
        volume_db: Base volume in decibels (-80 to 24).
        pitch_scale: Playback speed multiplier (0.5 to 2.0).
        bus: Audio bus name.
        autoplay: Start playing when added to scene tree.
        loop: Loop playback when finished.
        max_distance: Distance at which audio is inaudible (world units).
        attenuation: Distance attenuation exponent (1.0 = linear, 2.0 = inverse square).
        doppler_scale: Doppler effect strength (0.0 = off, 1.0 = realistic).
    """

    volume_db = Property(
        0.0,
        range=(-80.0, 24.0),
        hint="Volume in decibels [live]",
        group="Playback",
        on_change="_on_volume_db_changed",
    )
    pitch_scale = Property(
        1.0,
        range=(0.5, 2.0),
        hint="Playback speed [live: pushed mid-playback via backend.set_pitch]",
        group="Playback",
        on_change="_on_pitch_scale_changed",
    )
    bus = Property(
        "SFX",
        enum=["Master", "Music", "SFX", "Voice", "UI"],
        hint="Audio bus [next_play]",
        group="Playback",
        on_change="_on_bus_changed",
    )
    autoplay = Property(
        False,
        hint="Auto-play on ready [next_play]",
        group="Playback",
        on_change="_on_autoplay_changed",
    )
    loop = Property(
        False,
        hint="Loop playback [next_play]",
        group="Playback",
        on_change="_on_loop_changed",
    )
    queue_free_on_end = Property(
        False,
        hint="Remove this node from the tree once playback finishes [next_play]",
        group="Playback",
        on_change="_on_queue_free_on_end_changed",
    )
    max_distance = Property(
        100.0,
        range=(1.0, 1000.0),
        hint="Max hearing distance [live]",
        group="Spatial",
        on_change="_on_spatial_changed",
    )
    attenuation = Property(
        1.0,
        range=(0.1, 4.0),
        hint="Distance attenuation exponent [live]",
        group="Spatial",
        on_change="_on_spatial_changed",
    )
    doppler_scale = Property(
        0.0,
        range=(0.0, 4.0),
        hint="Doppler effect strength [live]",
        group="Spatial",
        on_change="_on_spatial_changed",
    )
    pan_override = Property(
        None,
        hint="Override directional pan; None = use world position [live]",
        group="Playback",
        on_change="_on_pan_override_changed",
    )

    gizmo_colour = Colour((0.6, 0.4, 1.0, 0.5))

    def get_gizmo_lines(self) -> list[tuple[Vec3, Vec3]]:
        """Return 3 circles showing the audio range sphere."""
        from .physics_nodes import _circle_lines_3d

        p = self.world_position
        r = float(self.max_distance)
        lines: list[tuple[Vec3, Vec3]] = []
        # One circle per pair of world axes.
        lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 1, 0), r))
        lines.extend(_circle_lines_3d(p, Vec3(1, 0, 0), Vec3(0, 0, 1), r))
        lines.extend(_circle_lines_3d(p, Vec3(0, 1, 0), Vec3(0, 0, 1), r))
        return lines

    def __init__(self, stream: AudioSource | AudioStream | None = None, **kwargs):
        super().__init__(**kwargs)
        self._init_playback(stream)
        self._prev_position: Vec3 = Vec3()  # For Doppler

    def on_ready(self):
        """Seed the Doppler reference position, then run the autoplay check."""
        # Store a copy (as on_process does) so the snapshot cannot alias the
        # node's live position.
        self._prev_position = Vec3(self.world_position)
        self._autoplay_check()

    # ------------------------------------------------------------------
    # Spatial helpers
    # ------------------------------------------------------------------

    def _compute_3d_state(self, delta: float) -> tuple[float, float, float]:
        """Return (volume_db, pan, pitch) for the current frame.

        ``delta`` drives Doppler. Pass 0.0 from on_change handlers (no Doppler
        kick on a Property nudge: Doppler reapplies next frame).
        ``_prev_position`` is left untouched here; ``on_process()`` is the sole
        owner of that state to keep Doppler velocity stable.

        Doppler is computed from the source's velocity relative to the
        listener, so a moving listener shifts pitch as well as a moving
        source. NOTE(review): assumes ``listener.velocity`` is a Vec3 in world
        units per second, matching the source velocity derived here; confirm
        against the listener implementation.
        """
        tree = self.tree
        listener = tree.audio_listener_3d() if tree is not None else None
        if listener is not None:
            listener_pos = listener.position
            listener_forward = listener.forward
            listener_up = listener.up
            listener_velocity = listener.velocity
        else:
            # No camera and no listener: degrade to origin/default orientation.
            # Already warned by _autocreate_listener_3d.
            listener_pos = Vec3()
            listener_forward = Vec3(0, 0, -1)
            listener_up = Vec3(0, 1, 0)
            listener_velocity = Vec3(0, 0, 0)

        to_source = self.world_position - listener_pos
        distance = to_source.length()

        # Distance attenuation
        if distance > self.max_distance:
            volume = -80.0
        else:
            d = max(distance, 0.1)  # floor the distance used for the ratio
            dist_ratio = d / self.max_distance
            attenuated = self._effective_volume_db() - 80.0 * (dist_ratio**self.attenuation)
            # Clamp to the -80 dB floor: with a negative base (or mid
            # fade-out) the raw formula undershoots it, making a source just
            # inside max_distance quieter than one just outside.
            volume = max(-80.0, attenuated)

        # Pan: positional unless caller has overridden it
        if self.pan_override is not None:
            pan = float(self.pan_override)
        elif distance > 0.01:
            to_source_norm = to_source / distance
            right = listener_forward.cross(listener_up)
            pan = clamp(to_source_norm.dot(right), -1.0, 1.0)
        else:
            # Source sits on the listener: direction is undefined, so centre.
            pan = 0.0

        # Doppler: pitch shift from radial velocity
        pitch = self.pitch_scale
        if self.doppler_scale > 0.0 and delta > 0.0:
            source_velocity = (self.world_position - self._prev_position) / delta
            # Previously listener_velocity was read but never applied, so a
            # listener flying past a stationary source produced no shift.
            relative_velocity = source_velocity - listener_velocity
            closing_speed = relative_velocity.dot(-to_source) / (distance if distance > 0.1 else 0.1)
            speed_of_sound = 343.0
            doppler_factor = 1.0 + (closing_speed / speed_of_sound) * self.doppler_scale
            pitch = self.pitch_scale * clamp(doppler_factor, 0.5, 2.0)

        return volume, pan, pitch

    def _push_audio_state(self, backend, channel) -> None:
        """Send current 3D volume/pan/pitch (without advancing Doppler velocity)."""
        volume, pan, pitch = self._compute_3d_state(0.0)
        backend.update_audio_3d(channel, volume, pan, pitch)

    def _on_pan_override_changed(self) -> None:
        """Push the new pan to the active backend channel."""
        # set_pan_and_gain() raises this flag to batch its own single push.
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def set_pan_and_gain(self, pan: float, gain_db: float) -> None:
        """Atomically set ``pan_override`` + ``volume_db`` with one backend call."""
        self._suppress_audio_push = True
        try:
            self.pan_override = pan
            self.volume_db = gain_db
        finally:
            self._suppress_audio_push = False
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is not None and channel is not None:
            self._push_audio_state(backend, channel)

    def on_process(self, delta: float):
        """Update 3D spatialization each frame; tick fades; auto-free."""
        # Pending reap (from fade_out → stop) must run even after _playing
        # flips to False. See AudioStreamPlayer2D.on_process for rationale.
        if self._check_queue_free_on_end():
            return
        if not self._playing or self._paused:
            return

        self._tick_fade(delta)
        volume, pan, pitch = self._compute_3d_state(delta)
        # Snapshot after computing so next frame's Doppler measures from here.
        self._prev_position = Vec3(self.world_position)
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.update_audio_3d(self._backend_channel, volume, pan, pitch)
        self._check_queue_free_on_end()

    def play(self, from_position: float = 0.0):
        """Start or resume playback.

        Args:
            from_position: Start position in seconds, forwarded to the backend.
        """
        if not self._play_common(from_position):
            return
        # Copy, consistent with on_process, so the Doppler reference cannot
        # alias the node's live position.
        self._prev_position = Vec3(self.world_position)
        backend = self._get_backend()
        if not backend:
            return

        # Pre-compute attenuated volume + pan + pitch from the current
        # 3D listener state so the backend can apply them *before* the
        # first audio buffer. Mirrors AudioStreamPlayer2D.play. Doppler
        # is intentionally skipped on the first frame (delta=0.0) so a
        # stationary source isn't kicked by an undefined prev_position
        # delta.
        initial_volume, initial_pan, initial_pitch = self._compute_3d_state(0.0)
        self._backend_channel = backend.play_audio(
            self.stream,
            mode="3d",
            position=self.world_position,
            volume_db=initial_volume,
            pitch=initial_pitch,
            loop=self.loop,
            bus=self.bus,
            max_distance=self.max_distance,
            from_position=from_position,
            pan=float(initial_pan),
        )