# Source code for simvx.core._audio_playback

"""
Shared audio playback control: :class:`_AudioPlaybackMixin`.

Private leaf module behind the :mod:`simvx.core.audio` facade. Provides the
play/stop/pause/fade/crossfade/queue-free state machine shared by every
player node. The player nodes live in the sibling ``_audio_players`` module;
the stream/data layer lives in ``_audio_stream``.
"""

from __future__ import annotations

import logging
from typing import Any

from . import audio_errors
from ._audio_stream import AudioSource, AudioStream
from .audio_errors import (
    AudioError,
    AudioMutationDuringPlaybackError,
    raise_or_warn,
    warn_once,
)

log = logging.getLogger(__name__)

# ============================================================================
# _AudioPlaybackMixin: Shared play/stop/pause/is_playing logic
# ============================================================================

class _AudioPlaybackMixin:
    """Mixin providing common audio playback state and backend interaction.

    Subclasses must set self.stream, self._playing, self._paused,
    self._backend_channel in __init__, and define volume_db, pitch_scale,
    bus, autoplay, loop Settings.
    """

    def _init_playback(self, stream: AudioSource | AudioStream | None):
        self.stream: AudioStream | None = None
        if stream is not None:
            self.stream = stream if isinstance(stream, AudioStream) else AudioStream(stream)
        self._playing: bool = False
        self._paused: bool = False
        self._backend_channel: Any = None
        # Suppresses on_change-driven backend pushes while ``set_pan_and_gain``
        # writes both Properties so the dual-write lands as one backend call.
        self._suppress_audio_push: bool = False
        # Linear-dB fade modulation applied on top of ``volume_db``. Driven by
        # ``fade_out`` / ``crossfade_to``; advances each ``process(delta)`` tick.
        self._fade_db_offset: float = 0.0
        self._fade_target_offset: float = 0.0
        self._fade_step_per_sec: float = 0.0
        self._fade_stop_on_complete: bool = False
        # One-shot reap flag: set when an *internal* stop fires for a player
        # with ``queue_free_on_end=True`` (typically fade_out's terminal stop
        # or a natural-end watcher path). The next ``on_process`` tick honours
        # the flag and removes the node from its parent. Decoupled from the
        # "poll for channel-end" path in ``_check_queue_free_on_end`` so the
        # two state machines compose: fade-out into stop still reaps cleanly.
        self._queue_free_on_end_pending: bool = False

    def _get_backend(self):
        tree = self.tree
        return tree.audio_backend if tree is not None else None

    # ------------------------------------------------------------------
    # Live property push: shared on_change handler
    # ------------------------------------------------------------------

    def _push_audio_state(self, backend, channel) -> None:
        """Push current volume/pan/pitch through ``backend`` for ``channel``.

        Subclasses override this to supply spatial pan/pitch. The base
        non-positional player sends pan=0.
        """
        backend.update_audio_2d(channel, self._effective_volume_db(), 0.0)

    def _on_volume_db_changed(self) -> None:
        """Push live volume change to the active backend channel.

        No-op while not playing or before the backend wires up; tolerates
        partial state because Property on_change can fire during ``__init__``.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    # ------------------------------------------------------------------
    # Property mutation policy: live / next_play
    # ------------------------------------------------------------------

    def _is_playing(self) -> bool:
        """Return True iff this player owns an active backend channel.

        Used by ``next_play`` Property handlers to gate the strict/warn
        path. Independent of ``is_playing()`` (the public accessor) which
        also checks the paused flag: a paused channel is still bound to
        the backend and so should be considered "playing" for the
        mutation-policy check.
        """
        return getattr(self, "_backend_channel", None) is not None

    def _handle_next_play_mutation(self, prop_name: str) -> None:
        """Strict-raise or warn-once when a ``next_play`` Property is written mid-playback.

        ``next_play`` Properties (``bus``, ``loop``, ``autoplay``,
        ``stream_mode``, ``buffer_size``, ``queue_free_on_end``) require
        a fresh ``play()`` call to take effect. Mutating them while a
        channel is active either raises :class:`AudioMutationDuringPlaybackError`
        (strict mode, dev default) or logs a one-time warning and defers
        the change to the next ``play()`` (non-strict).
        """
        if not self._is_playing():
            return
        player_repr = f"{type(self).__name__}(name={self.name!r})"
        if audio_errors.STRICT:
            raise AudioMutationDuringPlaybackError(prop_name, player=player_repr)
        warn_once(
            f"audio.player.{prop_name}.mid_play",
            "Property %r on %s changed mid-playback; deferred to next play(). "
            "Set SIMVX_AUDIO_STRICT=1 to raise instead.",
            prop_name,
            player_repr,
        )

    def _on_pitch_scale_changed(self) -> None:
        """Push live pitch change to the active backend channel (live policy).

        Mirrors :meth:`_on_volume_db_changed`. ``set_pitch`` is fed the
        already-clamped Property value (clamp range ``[0.5, 2.0]`` lives
        on the Property declaration), so the backend never sees an
        out-of-bounds pitch.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        set_pitch = getattr(backend, "set_pitch", None)
        if set_pitch is not None:
            set_pitch(channel, float(self.pitch_scale))

    def _on_bus_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``bus`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("bus")

    def _on_loop_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``loop`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("loop")

    def _on_autoplay_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``autoplay`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("autoplay")

    def _on_stream_mode_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``stream_mode`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("stream_mode")

    def _on_buffer_size_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``buffer_size`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("buffer_size")

    def _on_queue_free_on_end_changed(self) -> None:
        """Apply the ``next_play`` mutation policy for the ``queue_free_on_end`` Property.

        Delegates to :meth:`_handle_next_play_mutation`, which only raises or
        warns while a backend channel is bound. Presumably wired as the
        Property's ``on_change`` callback; the wiring is not in this module.
        """
        self._handle_next_play_mutation("queue_free_on_end")

    def _on_spatial_changed(self) -> None:
        """Live push for spatial Properties (max_distance / attenuation / doppler_scale).

        These all feed into ``_push_audio_state`` (which spatial subclasses
        override to compute volume/pan/pitch from the latest values). One
        canonical handler avoids duplicating the same body on every
        Property.
        """
        if getattr(self, "_suppress_audio_push", False):
            return
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        self._push_audio_state(backend, channel)

    def stop(self):
        """Stop playback."""
        self._playing = False
        self._paused = False
        # Clear fade so the next play() starts at full volume.
        self._fade_db_offset = 0.0
        self._fade_target_offset = 0.0
        self._fade_step_per_sec = 0.0
        self._fade_stop_on_complete = False
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.stop_audio(self._backend_channel)
            self._backend_channel = None

    def pause(self):
        """Pause playback. Call play() to resume."""
        if not self._playing:
            return
        self._paused = True
        self._playing = False
        backend = self._get_backend()
        if backend and self._backend_channel is not None:
            backend.pause_audio(self._backend_channel)

    def is_playing(self) -> bool:
        """Check if audio is currently playing.

        True only when ``_playing`` is set and ``_paused`` is not. A paused
        player reports ``False`` here even though ``_is_playing()`` (which
        only looks at the bound backend channel) may still report ``True``.
        """
        return self._playing and not self._paused

    def is_paused(self) -> bool:
        """Check if audio is currently paused (call ``play()`` to resume).

        Returns the ``_paused`` flag, which ``pause()`` sets and ``stop()``,
        ``_resume_if_paused()`` and ``_play_common()`` clear.
        """
        return self._paused

    def _resume_if_paused(self) -> bool:
        """If paused, resume playback and return True; otherwise return False."""
        if not self._paused or self._backend_channel is None:
            return False
        self._playing = True
        self._paused = False
        backend = self._get_backend()
        if backend:
            backend.resume_audio(self._backend_channel)
        return True

    # ------------------------------------------------------------------
    # Fade / crossfade
    # ------------------------------------------------------------------

    def _effective_volume_db(self) -> float:
        """Current playback volume in dB, including any active fade offset.

        ``volume_db`` is the user-set target; ``_fade_db_offset`` is a
        time-varying modulation (negative attenuates, 0 is no fade).
        """
        return max(-80.0, self.volume_db + self._fade_db_offset)

    def _start_fade(self, target_offset: float, seconds: float, *, stop_on_complete: bool) -> None:
        """Start a linear-dB fade toward ``target_offset`` over ``seconds``.

        ``seconds <= 0`` snaps to the target on the next state push.
        """
        if seconds <= 0.0:
            self._fade_db_offset = target_offset
            self._fade_target_offset = target_offset
            self._fade_step_per_sec = 0.0
            self._fade_stop_on_complete = stop_on_complete
            if stop_on_complete:
                # Internal terminal stop: let queue_free_on_end reap on the
                # next process tick (mirrors the _tick_fade completion path).
                if getattr(self, "queue_free_on_end", False) and not getattr(self, "loop", False):
                    self._queue_free_on_end_pending = True
                self.stop()
            return
        self._fade_target_offset = target_offset
        self._fade_step_per_sec = (target_offset - self._fade_db_offset) / seconds
        self._fade_stop_on_complete = stop_on_complete

    def _tick_fade(self, delta: float) -> bool:
        """Advance the active fade. Returns True if the audio state needs a push."""
        if self._fade_step_per_sec == 0.0 and self._fade_db_offset == self._fade_target_offset:
            return False
        if self._fade_step_per_sec == 0.0:
            return False
        new_offset = self._fade_db_offset + self._fade_step_per_sec * delta
        # Snap when crossing the target.
        if (self._fade_step_per_sec > 0.0 and new_offset >= self._fade_target_offset) or (
            self._fade_step_per_sec < 0.0 and new_offset <= self._fade_target_offset
        ):
            new_offset = self._fade_target_offset
            self._fade_step_per_sec = 0.0
            self._fade_db_offset = new_offset
            if self._fade_stop_on_complete:
                self._fade_stop_on_complete = False
                # Mark this stop as an *internal* terminal stop so the
                # queue_free_on_end machinery reaps the node on the next
                # process tick (see _check_queue_free_on_end_pending).
                if getattr(self, "queue_free_on_end", False) and not getattr(self, "loop", False):
                    self._queue_free_on_end_pending = True
                self.stop()
            return True
        self._fade_db_offset = new_offset
        return True

    def fade_out(self, seconds: float) -> None:
        """Linearly ramp volume to silence over ``seconds`` then stop.

        Idempotent for non-playing players (no-op). Subsequent ``play()`` resets
        the fade state. Used for PyDew Valley's sleep-transition polish.
        """
        if not self._playing or self._backend_channel is None:
            return
        # Target offset = -80 dB relative to current volume_db so the channel
        # reaches the silence floor.
        target = -80.0 - self.volume_db
        self._start_fade(target, seconds, stop_on_complete=True)

    def fade_in(self, seconds: float) -> None:
        """Start playback at silence and linearly ramp up to ``volume_db`` over ``seconds``.

        Starts the player if it isn't already playing. The fade offset is
        seeded at ``-80 dB relative to volume_db`` (silence floor) and
        ramps to ``0 dB offset`` (the configured ``volume_db``). Mirrors
        :meth:`fade_out`.

        Per-player fades replace the removed ``FadeEffect`` bus-level
        effect: fades are a property of the playing source, not the bus.
        """
        # Seed at silence relative to the target volume; ``_effective_volume_db``
        # caps the floor at -80 dB so any deeper offset is harmless.
        self._fade_db_offset = -80.0 - self.volume_db
        if not self._playing:
            self.play()
        self._start_fade(0.0, seconds, stop_on_complete=False)

    def _check_queue_free_on_end(self) -> bool:
        """If ``queue_free_on_end`` is set and the backend channel has ended,
        remove this node from its parent and return True.

        Two reap paths are checked here:

        1. A *pending* reap was scheduled by an internal terminal stop,
           e.g. ``fade_out`` ramped to silence and called ``stop()``. Stop
           cleared ``_playing`` + ``_backend_channel`` so the channel-poll
           path below can't fire; the pending flag is how those two state
           machines compose.
        2. The natural-end poll path: the channel is still active but the
           backend reports it as finished. Detach the node from its parent.

        Loops never trigger queue-free (they only end via explicit ``stop``).
        Explicit-stop paths are handled by ``_exit_tree`` so they don't
        double-free.
        """
        if not getattr(self, "queue_free_on_end", False):
            return False
        if getattr(self, "loop", False):
            return False

        # Path 1: pending reap from an internal terminal stop (fade_out etc.).
        if self._queue_free_on_end_pending:
            self._queue_free_on_end_pending = False
            parent = getattr(self, "parent", None)
            if parent is not None:
                parent.remove_child(self)
            return True

        # Path 2: natural-end poll: channel still bound but backend says done.
        if not self._playing or self._backend_channel is None:
            return False
        backend = self._get_backend()
        if backend is None:
            return False
        is_active = getattr(backend, "is_channel_active", None)
        if is_active is None:
            return False
        if is_active(self._backend_channel):
            return False
        # Channel finished naturally: flush state and detach from parent.
        self._playing = False
        self._backend_channel = None
        parent = getattr(self, "parent", None)
        if parent is not None:
            parent.remove_child(self)
        return True

    def pitch_modulate(self, scale: float) -> None:
        """Live pitch shift on an already-playing stream.

        Sets ``pitch_scale`` and pushes the new resampler ratio to the active
        backend channel. Without this, ``pitch_scale = x`` does not propagate
        mid-stream (HexGL engine hum doppler regression). The base 2D player
        path has no per-frame pitch update, so we route through the
        backend's ``set_pitch`` method directly.

        Clamped to the ``pitch_scale`` Property's [0.5, 2.0] range.
        """
        clamped = max(0.5, min(2.0, float(scale)))
        self.pitch_scale = clamped
        backend = self._get_backend()
        channel = getattr(self, "_backend_channel", None)
        if backend is None or channel is None:
            return
        set_pitch = getattr(backend, "set_pitch", None)
        if set_pitch is not None:
            set_pitch(channel, clamped)

    def crossfade_to(self, other: _AudioPlaybackMixin, seconds: float) -> None:
        """Fade self out while ``other`` fades in over the same duration.

        ``other`` starts (if not already playing) at -80 dB offset and ramps to 0,
        landing exactly at its configured ``volume_db``. Self fades to silence
        and stops at the end of the crossfade.
        """
        # Start other silently, then fade in.
        if not other._playing:
            other._fade_db_offset = -80.0 - other.volume_db
            other.play()
        else:
            other._fade_db_offset = -80.0 - other.volume_db
        other._start_fade(0.0, seconds, stop_on_complete=False)
        self.fade_out(seconds)

    def _play_common(self, from_position: float = 0.0) -> bool:
        """Common play() preamble. Returns True if playback should proceed."""
        if not self.stream:
            return False
        # A pending reap from a previous fade_out terminal stop is invalidated
        # by an explicit re-play: the user is reusing this player, not
        # cleaning it up.
        self._queue_free_on_end_pending = False
        if from_position == 0.0 and self._resume_if_paused():
            return False
        # Stop any active channel from a previous play() before starting a new
        # one, so rapid re-trigger on the same player doesn't pile up
        # overlapping sounds. Use multiple AudioStreamPlayers (pool pattern) if
        # you genuinely want overlapping plays of the same stream.
        if self._backend_channel is not None:
            backend = self._get_backend()
            if backend is not None:
                try:
                    backend.stop_audio(self._backend_channel)
                except AudioError as exc:
                    raise_or_warn(
                        exc,
                        key="audio.player.stop_previous_failed",
                        message="Failed to stop previous channel before re-trigger",
                    )
            self._backend_channel = None
        self._playing = True
        self._paused = False
        return True

    def _autoplay_check(self):
        """Start playback if autoplay is enabled. Call from ready()."""
        if self.autoplay and self.stream:
            self.play()

    def _exit_tree(self):
        """Run the next class's ``_exit_tree`` hook, then stop playback.

        NOTE(review): ``stop()`` reaches the backend through ``self.tree``. If
        the parent hook has already cleared ``tree`` by this point, the
        backend channel would be neither stopped nor released. Confirm the
        teardown order against the node base class.
        """
        super()._exit_tree()
        self.stop()