"""
Shared audio playback control: :class:`_AudioPlaybackMixin`.
Private leaf module behind the :mod:`simvx.core.audio` facade. Provides the
play/stop/pause/fade/crossfade/queue-free state machine shared by every
player node. The player nodes live in the sibling ``_audio_players`` module;
the stream/data layer lives in ``_audio_stream``.
"""
from __future__ import annotations
import logging
from typing import Any
from . import audio_errors
from ._audio_stream import AudioSource, AudioStream
from .audio_errors import (
AudioError,
AudioMutationDuringPlaybackError,
raise_or_warn,
warn_once,
)
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# ============================================================================
# _AudioPlaybackMixin: Shared play/stop/pause/is_playing logic
# ============================================================================
class _AudioPlaybackMixin:
    """Mixin providing common audio playback state and backend interaction.

    Host classes call :meth:`_init_playback` from ``__init__`` to create the
    state this mixin reads (``stream``, ``_playing``, ``_paused``,
    ``_backend_channel`` and the fade fields). They must also provide
    ``tree``, ``name``, ``play()`` and the ``volume_db``, ``pitch_scale``,
    ``bus``, ``autoplay`` and ``loop`` Settings.
    """

    def _init_playback(self, stream: AudioSource | AudioStream | None):
        """Create the playback state; wrap ``stream`` in an ``AudioStream`` if needed."""
        self.stream: AudioStream | None = None
        if stream is not None:
            self.stream = stream if isinstance(stream, AudioStream) else AudioStream(stream)
        self._playing: bool = False
        self._paused: bool = False
        self._backend_channel: Any = None
        # Suppresses on_change-driven backend pushes while ``set_pan_and_gain``
        # writes both Properties so the dual-write lands as one backend call.
        self._suppress_audio_push: bool = False
        # Linear-dB fade modulation applied on top of ``volume_db``. Driven by
        # ``fade_out`` / ``fade_in`` / ``crossfade_to``; advanced by
        # ``_tick_fade(delta)``.
        self._fade_db_offset: float = 0.0
        self._fade_target_offset: float = 0.0
        self._fade_step_per_sec: float = 0.0
        self._fade_stop_on_complete: bool = False
        # One-shot reap flag: set when an *internal* stop fires for a player
        # with ``queue_free_on_end=True`` (fade_out's terminal stop). The next
        # ``_check_queue_free_on_end`` call honours the flag and removes the
        # node from its parent. Decoupled from the "poll for channel-end"
        # path so the two state machines compose: fade-out into stop still
        # reaps cleanly.
        self._queue_free_on_end_pending: bool = False

    def _get_backend(self):
        """Return the tree's ``audio_backend``, or None when there is no tree."""
        tree = self.tree
        return tree.audio_backend if tree is not None else None

    # ------------------------------------------------------------------
    # Live property push: shared on_change handler
    # ------------------------------------------------------------------
    def _push_audio_state(self, backend, channel) -> None:
        """Push current volume/pan/pitch through ``backend`` for ``channel``.

        Subclasses override this to supply spatial pan/pitch. The base
        non-positional player sends pan=0.
        """
        backend.update_audio_2d(channel, self._effective_volume_db(), 0.0)

    def _push_live_state(self) -> None:
        """Push the current audio state if a backend channel is bound.

        No-op while pushes are suppressed, before the backend is reachable,
        or when no channel is bound. Uses ``getattr`` defaults so it
        tolerates partially initialised instances.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def _on_volume_db_changed(self) -> None:
        """Push live volume change to the active backend channel.

        No-op while not playing or before the backend wires up; tolerates
        partial state because Property on_change can fire during ``__init__``.
        """
        self._push_live_state()

    # ------------------------------------------------------------------
    # Property mutation policy: live / next_play
    # ------------------------------------------------------------------
    def _is_playing(self) -> bool:
        """Return True iff this player owns an active backend channel.

        Used by ``next_play`` Property handlers to gate the strict/warn
        path. Independent of ``is_playing()`` (the public accessor) which
        also checks the paused flag: a paused channel is still bound to
        the backend and so should be considered "playing" for the
        mutation-policy check.
        """
        return getattr(self, "_backend_channel", None) is not None

    def _handle_next_play_mutation(self, prop_name: str) -> None:
        """Strict-raise or warn-once when a ``next_play`` Property is written mid-playback.

        ``next_play`` Properties (``bus``, ``loop``, ``autoplay``,
        ``stream_mode``, ``buffer_size``, ``queue_free_on_end``) require
        a fresh ``play()`` call to take effect. Mutating them while a
        channel is active either raises :class:`AudioMutationDuringPlaybackError`
        (when ``audio_errors.STRICT`` is set) or logs a one-time warning and
        defers the change to the next ``play()`` (non-strict).
        """
        if not self._is_playing():
            return
        player_repr = f"{type(self).__name__}(name={self.name!r})"
        if audio_errors.STRICT:
            raise AudioMutationDuringPlaybackError(prop_name, player=player_repr)
        warn_once(
            f"audio.player.{prop_name}.mid_play",
            "Property %r on %s changed mid-playback; deferred to next play(). "
            "Set SIMVX_AUDIO_STRICT=1 to raise instead.",
            prop_name,
            player_repr,
        )

    def _on_pitch_scale_changed(self) -> None:
        """Push live pitch change to the active backend channel (live policy).

        Mirrors :meth:`_on_volume_db_changed`, but goes through the
        backend's optional ``set_pitch`` method; backends without it are
        silently skipped.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        set_pitch = getattr(backend, "set_pitch", None)
        if set_pitch is not None:
            set_pitch(channel, float(self.pitch_scale))

    def _on_bus_changed(self) -> None:
        self._handle_next_play_mutation("bus")

    def _on_loop_changed(self) -> None:
        self._handle_next_play_mutation("loop")

    def _on_autoplay_changed(self) -> None:
        self._handle_next_play_mutation("autoplay")

    def _on_stream_mode_changed(self) -> None:
        self._handle_next_play_mutation("stream_mode")

    def _on_buffer_size_changed(self) -> None:
        self._handle_next_play_mutation("buffer_size")

    def _on_queue_free_on_end_changed(self) -> None:
        self._handle_next_play_mutation("queue_free_on_end")

    def _on_spatial_changed(self) -> None:
        """Live push for spatial Properties (max_distance / attenuation / doppler_scale).

        These all feed into ``_push_audio_state`` (which spatial subclasses
        override to compute volume/pan/pitch from the latest values). One
        canonical handler avoids duplicating the same body on every
        Property.
        """
        self._push_live_state()

    def stop(self):
        """Stop playback, reset the fade state and release the backend channel.

        The channel handle is released *before* the backend is called, so a
        failing ``stop_audio`` cannot leave this player bound to a stale
        channel. Any exception from the backend still propagates.
        """
        self._playing = False
        self._paused = False
        # Clear fade so the next play() starts at full volume.
        self._fade_db_offset = 0.0
        self._fade_target_offset = 0.0
        self._fade_step_per_sec = 0.0
        self._fade_stop_on_complete = False
        channel = self._backend_channel
        if channel is None:
            return
        self._backend_channel = None
        backend = self._get_backend()
        if backend is not None:
            backend.stop_audio(channel)

    def pause(self):
        """Pause playback. Call play() to resume."""
        if not self._playing:
            return
        self._paused = True
        self._playing = False
        backend = self._get_backend()
        if backend is not None and self._backend_channel is not None:
            backend.pause_audio(self._backend_channel)

    def is_playing(self) -> bool:
        """Check if audio is currently playing."""
        return self._playing and not self._paused

    def is_paused(self) -> bool:
        """Check if audio is currently paused (call ``play()`` to resume)."""
        return self._paused

    def _resume_if_paused(self) -> bool:
        """If paused with a bound channel, resume playback and return True; otherwise return False."""
        if not self._paused or self._backend_channel is None:
            return False
        self._playing = True
        self._paused = False
        backend = self._get_backend()
        if backend is not None:
            backend.resume_audio(self._backend_channel)
        return True

    # ------------------------------------------------------------------
    # Fade / crossfade
    # ------------------------------------------------------------------
    def _effective_volume_db(self) -> float:
        """Current playback volume in dB, including any active fade offset.

        ``volume_db`` is the user-set target; ``_fade_db_offset`` is a
        time-varying modulation (negative attenuates, 0 is no fade). The
        result is floored at -80 dB.
        """
        return max(-80.0, self.volume_db + self._fade_db_offset)

    def _finish_terminal_fade(self) -> None:
        """Run the internal stop that ends a ``stop_on_complete`` fade.

        Marks the stop as *internal* first, so a non-looping player with
        ``queue_free_on_end`` set is reaped by the next
        ``_check_queue_free_on_end`` call, then stops playback.
        """
        if getattr(self, "queue_free_on_end", False) and not getattr(self, "loop", False):
            self._queue_free_on_end_pending = True
        self.stop()

    def _start_fade(self, target_offset: float, seconds: float, *, stop_on_complete: bool) -> None:
        """Start a linear-dB fade toward ``target_offset`` over ``seconds``.

        The fade completes immediately when ``seconds <= 0`` or when the
        offset is already at ``target_offset``. A zero-distance fade would
        otherwise have a step of 0 and never reach its completion handler.
        Immediate completion either stops the player (``stop_on_complete``)
        or pushes the snapped volume to the backend straight away.
        """
        if seconds <= 0.0 or target_offset == self._fade_db_offset:
            self._fade_db_offset = target_offset
            self._fade_target_offset = target_offset
            self._fade_step_per_sec = 0.0
            self._fade_stop_on_complete = stop_on_complete
            if stop_on_complete:
                self._finish_terminal_fade()
            else:
                # _tick_fade stays idle while the step is 0, so nothing else
                # would deliver the snapped offset to an active channel.
                # NOTE(review): presumably play() starts the channel at the
                # effective (faded) volume - confirm against the subclasses.
                self._push_live_state()
            return
        self._fade_target_offset = target_offset
        self._fade_step_per_sec = (target_offset - self._fade_db_offset) / seconds
        self._fade_stop_on_complete = stop_on_complete

    def _tick_fade(self, delta: float) -> bool:
        """Advance the active fade by ``delta`` seconds.

        Returns True if the audio state needs a push, False when no fade
        is running. On reaching the target the offset snaps to it and, for
        a ``stop_on_complete`` fade, the player is stopped.
        """
        step = self._fade_step_per_sec
        if step == 0.0:
            return False
        target = self._fade_target_offset
        new_offset = self._fade_db_offset + step * delta
        # Snap when crossing the target (direction depends on the step sign).
        reached = new_offset >= target if step > 0.0 else new_offset <= target
        if not reached:
            self._fade_db_offset = new_offset
            return True
        self._fade_db_offset = target
        self._fade_step_per_sec = 0.0
        if self._fade_stop_on_complete:
            self._fade_stop_on_complete = False
            self._finish_terminal_fade()
        return True

    def fade_out(self, seconds: float) -> None:
        """Linearly ramp volume to silence over ``seconds`` then stop.

        No-op for non-playing players. The terminal ``stop()`` clears the
        fade state, so the next ``play()`` starts at full volume. Used for
        PyDew Valley's sleep-transition polish.
        """
        if not self._playing or self._backend_channel is None:
            return
        # Target offset = -80 dB relative to current volume_db so the channel
        # reaches the silence floor.
        target = -80.0 - self.volume_db
        self._start_fade(target, seconds, stop_on_complete=True)

    def fade_in(self, seconds: float) -> None:
        """Start playback at silence and linearly ramp up to ``volume_db`` over ``seconds``.

        Starts the player if it isn't already playing. The fade offset is
        seeded at ``-80 dB relative to volume_db`` (silence floor) and
        ramps to ``0 dB offset`` (the configured ``volume_db``). Mirrors
        :meth:`fade_out`.

        Per-player fades replace the removed ``FadeEffect`` bus-level
        effect: fades are a property of the playing source, not the bus.
        """
        # Seed at silence relative to the target volume; ``_effective_volume_db``
        # caps the floor at -80 dB so any deeper offset is harmless.
        self._fade_db_offset = -80.0 - self.volume_db
        if not self._playing:
            self.play()
        self._start_fade(0.0, seconds, stop_on_complete=False)

    def _detach_from_parent(self) -> None:
        """Remove this node from its parent, if it has one."""
        parent = getattr(self, "parent", None)
        if parent is not None:
            parent.remove_child(self)

    def _check_queue_free_on_end(self) -> bool:
        """Reap this node if ``queue_free_on_end`` is set and playback has ended.

        Returns True when the node was detached from its parent. Two reap
        paths are checked:

        1. A *pending* reap scheduled by an internal terminal stop, e.g.
           ``fade_out`` ramped to silence and called ``stop()``. Stop
           cleared ``_playing`` + ``_backend_channel`` so the channel-poll
           path below can't fire; the pending flag is how those two state
           machines compose.
        2. The natural-end poll path: the channel is still bound but the
           backend's optional ``is_channel_active`` reports it finished.

        Loops never trigger queue-free (they only end via explicit ``stop``).
        """
        if not getattr(self, "queue_free_on_end", False):
            return False
        if getattr(self, "loop", False):
            return False
        # Path 1: pending reap from an internal terminal stop (fade_out etc.).
        if self._queue_free_on_end_pending:
            self._queue_free_on_end_pending = False
            self._detach_from_parent()
            return True
        # Path 2: natural-end poll: channel still bound but backend says done.
        if not self._playing or self._backend_channel is None:
            return False
        backend = self._get_backend()
        if backend is None:
            return False
        is_active = getattr(backend, "is_channel_active", None)
        if is_active is None or is_active(self._backend_channel):
            return False
        # Channel finished naturally: flush state and detach from parent.
        self._playing = False
        self._backend_channel = None
        self._detach_from_parent()
        return True

    def pitch_modulate(self, scale: float) -> None:
        """Live pitch shift on an already-playing stream.

        Sets ``pitch_scale`` and pushes the new value to the active backend
        channel through the backend's optional ``set_pitch`` method (HexGL
        engine hum doppler regression).

        ``scale`` is clamped to ``[0.5, 2.0]`` before it is stored or pushed.
        """
        clamped = max(0.5, min(2.0, float(scale)))
        self.pitch_scale = clamped
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        set_pitch = getattr(backend, "set_pitch", None)
        if set_pitch is not None:
            set_pitch(channel, clamped)

    def crossfade_to(self, other: _AudioPlaybackMixin, seconds: float) -> None:
        """Fade self out while ``other`` fades in over the same duration.

        ``other`` is seeded at the silence offset (and started if it is not
        already playing), then ramps to offset 0, landing exactly at its
        configured ``volume_db``. Self fades to silence and stops at the end
        of the crossfade.
        """
        needs_start = not other._playing
        # Seed the silence offset before play() so a freshly started player
        # does not begin at full volume.
        other._fade_db_offset = -80.0 - other.volume_db
        if needs_start:
            other.play()
        other._start_fade(0.0, seconds, stop_on_complete=False)
        self.fade_out(seconds)

    def _play_common(self, from_position: float = 0.0) -> bool:
        """Common play() preamble. Returns True if playback should proceed.

        Returns False when there is no stream, or when a paused channel was
        simply resumed (only attempted for ``from_position == 0.0``).
        """
        if not self.stream:
            return False
        # A pending reap from a previous fade_out terminal stop is invalidated
        # by an explicit re-play: the user is reusing this player, not
        # cleaning it up.
        self._queue_free_on_end_pending = False
        if from_position == 0.0 and self._resume_if_paused():
            return False
        # Stop any active channel from a previous play() before starting a new
        # one, so rapid re-trigger on the same player doesn't pile up
        # overlapping sounds. Use multiple AudioStreamPlayers (pool pattern) if
        # you genuinely want overlapping plays of the same stream.
        if self._backend_channel is not None:
            backend = self._get_backend()
            if backend is not None:
                try:
                    backend.stop_audio(self._backend_channel)
                except AudioError as exc:
                    raise_or_warn(
                        exc,
                        key="audio.player.stop_previous_failed",
                        message="Failed to stop previous channel before re-trigger",
                    )
            self._backend_channel = None
        self._playing = True
        self._paused = False
        return True

    def _autoplay_check(self):
        """Start playback if autoplay is enabled. Call from ready()."""
        if self.autoplay and self.stream:
            self.play()

    def _exit_tree(self):
        """Run the next class's ``_exit_tree`` hook, then stop playback."""
        super()._exit_tree()
        # NOTE(review): stop() finds the backend through ``self.tree``; confirm
        # the tree reference is still valid here, otherwise the channel handle
        # is dropped without a ``stop_audio`` call.
        self.stop()