Source code for simvx.core.audio_synth

"""Procedural audio synthesis: oscillators, envelopes, AudioSynth.

Pure-numpy DSP for generating audio at runtime without external assets.
The common pattern is "bake then play": build a synth, render a short
clip into an `AudioStream`, and play it through any `AudioStreamPlayer`
on either backend (desktop / web).

Quick start::

    from simvx.core import AudioSynth, Oscillator, ADSR, AudioStreamPlayer

    synth = AudioSynth()
    synth.add(
        Oscillator.sine(440.0),
        envelope=ADSR(attack=0.01, decay=0.1, sustain=0.6, release=0.2),
    )
    pluck = synth.bake(duration=0.4)  # → AudioStream

    player = AudioStreamPlayer(stream=pluck)
    self.add_child(player)
    player.play()

Multiple sources mix together in one bake::

    synth = AudioSynth()
    synth.add(Oscillator.sine(220.0), gain=0.5)              # root
    synth.add(Oscillator.sine(330.0), gain=0.3)              # fifth
    synth.add(Oscillator.noise.white(), gain=0.1, pan=0.3)   # snare hiss
    chord = synth.bake(duration=1.0)

`AudioSynth.bake()` returns an `AudioStream` whose `backend_data` is a
float32 interleaved-stereo ndarray. Both `MiniaudioBackend` (native and
legacy) and `WebAudioBackend` accept that directly: no file I/O, no
extra serialization.

For live procedural synthesis with parameter control, see
`AudioSynth.attach_to()` (streams via `backend.open_stream` and feeds
chunks per frame).
"""

from __future__ import annotations

import abc
import logging
from typing import TYPE_CHECKING, ClassVar

import numpy as np

from .audio_errors import AudioCapabilityError, AudioError, raise_or_warn
from .audio_protocol import AudioStreamingBackend
from .node import Node

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from .audio import AudioStream, AudioStreamPlayer

__all__ = [
    "AudioSource",
    "Envelope",
    "Filter",
    "LowPass",
    "HighPass",
    "Oscillator",
    "WhiteNoise",
    "PinkNoise",
    "ADSR",
    "Linear",
    "Exponential",
    "AudioSynth",
]


# ===========================================================================
# Sources
# ===========================================================================


class AudioSource(abc.ABC):
    """Base class for objects that generate mono float32 samples on request.

    Concrete sources override `render(cursor_samples, n_samples, sample_rate)`.
    `cursor_samples` is the global sample offset of the first requested
    sample, which lets periodic waveforms stay phase-continuous when a long
    signal is rendered as several consecutive chunks.

    A subclass that does not override `render` cannot be instantiated
    (``TypeError``).
    """

    @abc.abstractmethod
    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        """Return `n_samples` mono float32 samples starting at `cursor_samples`."""
        message = "AudioSource.render is abstract; subclass must produce mono float32 samples."
        raise NotImplementedError(message)
def _sample_times(cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
    """Return the time in seconds (float64) of each requested sample."""
    sample_index = np.arange(n_samples, dtype=np.float64) + cursor_samples
    return sample_index / sample_rate


def _cycle_phase(freq: float, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
    """Return each sample's position inside its cycle, wrapped with ``% 1.0``."""
    return (freq * _sample_times(cursor_samples, n_samples, sample_rate)) % 1.0


class _SineOscillator(AudioSource):
    """Sine tone at `freq` Hz; `phase` is a constant phase offset in radians."""

    def __init__(self, freq: float, phase: float = 0.0):
        self.freq = float(freq)
        self.phase = float(phase)

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        seconds = _sample_times(cursor_samples, n_samples, sample_rate)
        angle = 2 * np.pi * self.freq * seconds + self.phase
        return np.sin(angle).astype(np.float32)


class _SquareOscillator(AudioSource):
    """Square wave at `freq` Hz.

    `duty` is the fraction of each cycle spent at +1 (0.5 = symmetric); it is
    clamped to [0.01, 0.99] at construction time.
    """

    def __init__(self, freq: float, duty: float = 0.5):
        self.freq = float(freq)
        self.duty = float(np.clip(duty, 0.01, 0.99))

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        position = _cycle_phase(self.freq, cursor_samples, n_samples, sample_rate)
        # High (+1) for the first `duty` fraction of the cycle, low (-1) after.
        levels = np.where(position < self.duty, 1.0, -1.0)
        return levels.astype(np.float32)


class _SawOscillator(AudioSource):
    """Sawtooth wave at `freq` Hz that ramps from -1 up to +1 every cycle."""

    def __init__(self, freq: float):
        self.freq = float(freq)

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        position = _cycle_phase(self.freq, cursor_samples, n_samples, sample_rate)
        ramp = 2.0 * position - 1.0
        return ramp.astype(np.float32)


class _TriangleOscillator(AudioSource):
    """Triangle wave at `freq` Hz."""

    def __init__(self, freq: float):
        self.freq = float(freq)

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        position = _cycle_phase(self.freq, cursor_samples, n_samples, sample_rate)
        # |position - 0.5| is 0.5 at the cycle start, 0 at mid-cycle and 0.5
        # again at the end, so the wave starts at +1, falls to -1 at the
        # midpoint and climbs back to +1.
        folded = 4.0 * np.abs(position - 0.5) - 1.0
        return folded.astype(np.float32)
class WhiteNoise(AudioSource):
    """White noise with samples drawn uniformly between -1 and 1.

    Each instance owns its own NumPy random generator created from `seed`, so
    two instances built with the same seed produce the same sample sequence.
    The output depends on how many samples have already been drawn, not on
    `cursor_samples`.
    """

    def __init__(self, seed: int | None = None):
        self._rng = np.random.default_rng(seed)

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        """Draw the next `n_samples` values from the generator as float32."""
        draws = self._rng.uniform(low=-1.0, high=1.0, size=n_samples)
        return draws.astype(np.float32)
class PinkNoise(AudioSource):
    """Approximate pink noise using the Voss-McCartney algorithm.

    A bank of `_NUM_ROWS` random values is kept; on every sample the row
    selected by the lowest set bit of a running counter is refreshed, so
    higher rows change progressively less often. The output is the bank sum
    plus one fresh white sample, normalised into [-1, 1].

    Cheap to compute and noticeably warmer than white noise; useful for
    ambient layers and percussion.
    """

    _NUM_ROWS = 16

    def __init__(self, seed: int | None = None):
        self._rng = np.random.default_rng(seed)
        self._rows = self._rng.uniform(-1.0, 1.0, self._NUM_ROWS)
        self._counter = 0

    def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
        """Generate the next `n_samples` values (stateful; the cursor is ignored)."""
        rng = self._rng
        bank = self._rows
        row_count = self._NUM_ROWS
        normaliser = row_count + 1
        buffer = np.empty(n_samples, dtype=np.float32)
        for slot in range(n_samples):
            self._counter += 1
            tick = self._counter
            # tick ^ (tick - 1) sets the lowest set bit and everything below
            # it, so its bit length minus one is the index of that bit.
            row = (tick ^ (tick - 1)).bit_length() - 1
            if row < row_count:
                bank[row] = rng.uniform(-1.0, 1.0)
            white = rng.uniform(-1.0, 1.0)
            buffer[slot] = (bank.sum() + white) / normaliser
        return buffer
class _NoiseFactory:
    """Groups the noise constructors exposed as `Oscillator.noise.<kind>()`."""

    @staticmethod
    def white(seed: int | None = None) -> AudioSource:
        """Build a `WhiteNoise` source from `seed`."""
        return WhiteNoise(seed=seed)

    @staticmethod
    def pink(seed: int | None = None) -> AudioSource:
        """Build a `PinkNoise` source from `seed`."""
        return PinkNoise(seed=seed)
class Oscillator:
    """Namespace of constructors for oscillator and noise sources.

    Mirrors Web Audio's ``OscillatorNode`` waveform set and adds a ``noise``
    sub-namespace for stochastic sources. Every member is a static method (or
    the shared ``noise`` factory), so ``Oscillator`` itself is never
    instantiated; call the constructors on the class::

        Oscillator.sine(440.0)
        Oscillator.square(220.0, duty=0.25)
        Oscillator.noise.white(seed=42)
    """

    # ``noise`` is declared as a class attribute holding one shared factory
    # instance, so type checkers see an ordinary attribute. An earlier design
    # exposed a module-level ``Oscillator = _OscillatorFactory()`` instance and
    # attached ``noise`` to it afterwards with a ``# type: ignore``.
    noise: ClassVar[_NoiseFactory] = _NoiseFactory()

    @staticmethod
    def sine(freq: float, *, phase: float = 0.0) -> AudioSource:
        """Sine tone at `freq` Hz with phase offset `phase` (radians)."""
        return _SineOscillator(freq=freq, phase=phase)

    @staticmethod
    def square(freq: float, *, duty: float = 0.5) -> AudioSource:
        """Square wave at `freq` Hz with pulse width `duty`."""
        return _SquareOscillator(freq=freq, duty=duty)

    @staticmethod
    def saw(freq: float) -> AudioSource:
        """Sawtooth wave at `freq` Hz."""
        return _SawOscillator(freq=freq)

    @staticmethod
    def triangle(freq: float) -> AudioSource:
        """Triangle wave at `freq` Hz."""
        return _TriangleOscillator(freq=freq)
# ===========================================================================
# Envelopes
# ===========================================================================
class Envelope(abc.ABC):
    """Per-sample gain curve laid out over the full length of a baked clip.

    An envelope has no notion of gate-on / gate-off: it is given the total
    duration and arranges its whole shape inside it. For real-time
    note-on / note-off behaviour, render shorter clips and crossfade them in
    user code, or stream via `AudioSynth.attach_to`.

    A subclass that does not override `render_total` cannot be instantiated
    (``TypeError``).
    """

    @abc.abstractmethod
    def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
        """Return the multiplier curve for a clip of `total_duration` seconds."""
        message = "Envelope.render_total is abstract; subclass must return a per-sample multiplier curve."
        raise NotImplementedError(message)
class ADSR(Envelope):
    """Attack / decay / sustain / release envelope over a fixed duration.

    `attack`, `decay` and `release` are times in seconds (negative values are
    stored as 0); `sustain` is a level clamped to [0, 1].

    The segments are laid out in order: attack ramps 0 -> 1, decay ramps
    1 -> sustain, the sustain level is held for whatever time is not claimed
    by the other three, and the release ramps sustain -> 0 over the samples
    that remain. When the clip is too short, segments are shortened from the
    end: a segment cut off by the end of the clip still ramps to its target
    over the shortened length, and later segments may be dropped entirely.
    """

    def __init__(
        self,
        *,
        attack: float = 0.01,
        decay: float = 0.05,
        sustain: float = 1.0,
        release: float = 0.05,
    ):
        self.attack = max(0.0, float(attack))
        self.decay = max(0.0, float(decay))
        self.sustain = float(np.clip(sustain, 0.0, 1.0))
        self.release = max(0.0, float(release))

    def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
        """Return a float32 curve of ``int(total_duration * sample_rate)`` samples."""
        n_total = int(total_duration * sample_rate)
        if n_total <= 0:
            return np.zeros(0, dtype=np.float32)

        # Requested segment sizes, before fitting them into the clip.
        want_attack = int(self.attack * sample_rate)
        want_decay = int(self.decay * sample_rate)
        want_release = int(self.release * sample_rate)
        want_sustain = max(0, n_total - want_attack - want_decay - want_release)

        # Fit the segments front to back; each one gets at most the room left.
        len_attack = max(0, min(want_attack, n_total))
        len_decay = max(0, min(want_decay, n_total - len_attack))
        len_sustain = min(want_sustain, n_total - len_attack - len_decay)
        # The release takes whatever is left rather than its requested size.
        len_release = n_total - len_attack - len_decay - len_sustain

        level = self.sustain
        segments = [
            np.linspace(0.0, 1.0, len_attack, dtype=np.float32),
            np.linspace(1.0, level, len_decay, dtype=np.float32),
            np.full(len_sustain, level, dtype=np.float32),
            np.linspace(level, 0.0, len_release, dtype=np.float32),
        ]
        return np.concatenate(segments)
class Linear(Envelope):
    """Linear ramp from `start` to `end` spread across the whole duration."""

    def __init__(self, *, start: float = 1.0, end: float = 0.0):
        self.start = float(start)
        self.end = float(end)

    def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
        """Return a float32 ramp of ``int(total_duration * sample_rate)`` samples."""
        count = int(total_duration * sample_rate)
        if count <= 0:
            return np.zeros(0, dtype=np.float32)
        return np.linspace(self.start, self.end, num=count, dtype=np.float32)
class Exponential(Envelope):
    """Exponential curve: ``start * (end/start) ** ((t/duration) ** power)``.

    `start` and `end` must both be strictly positive because exponential
    interpolation is undefined through zero; use `Linear` for fades to
    silence. `power` reshapes the normalised time axis before interpolation
    (1.0 leaves it unchanged).
    """

    def __init__(self, *, start: float = 1.0, end: float = 0.01, power: float = 1.0):
        if start <= 0 or end <= 0:
            raise ValueError("Exponential envelope requires strictly positive start and end.")
        self.start = float(start)
        self.end = float(end)
        self.power = float(power)

    def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
        """Return a float32 curve of ``int(total_duration * sample_rate)`` samples."""
        count = int(total_duration * sample_rate)
        if count <= 0:
            return np.zeros(0, dtype=np.float32)
        # Normalised time 0..1, warped by `power`, computed in float64.
        progress = np.linspace(0.0, 1.0, count, dtype=np.float64) ** self.power
        span = self.end / self.start
        curve = self.start * (span ** progress)
        return curve.astype(np.float32)
# ===========================================================================
# Per-source filters
# ===========================================================================
class Filter(abc.ABC):
    """DSP filter attached to a single source, applied after render + envelope.

    Lighter than bus-level effects (``simvx.core.audio_effect.LowPassFilter``
    etc.): these run in numpy inside `AudioSynth.bake()` and
    `render_chunk()`, so the filter shape is baked into the resulting
    `AudioStream` and travels with it through any backend.

    Reach for bus effects when the filter should apply to **everything**
    routed through a bus (mood-driven low-pass for underwater scenes,
    sidechain compression). Reach for `Filter` when one source in an
    `AudioSynth` needs a specific filter shape baked in (e.g. q1k3's filtered
    noise bursts for shotgun blasts).

    A subclass that does not override `apply` cannot be instantiated
    (``TypeError``).
    """

    @abc.abstractmethod
    def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
        """Return the filtered version of `samples`."""
        message = "Filter.apply is abstract; subclass must return the filtered sample buffer."
        raise NotImplementedError(message)
class LowPass(Filter):
    """First-order one-pole low-pass.

    ``a = exp(-2*pi*cutoff_hz / sample_rate)``
    ``y[k] = a * y[k-1] + (1-a) * x[k]``

    First-order means -6 dB/octave above the cutoff. For steeper (2nd-order
    biquad) cuts use the bus-level ``simvx.core.audio_effect.LowPassFilter``.
    """

    def __init__(self, cutoff_hz: float):
        self.cutoff_hz = float(cutoff_hz)

    def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
        """Return a filtered copy of `samples` (an empty input is returned as-is)."""
        if samples.size == 0:
            return samples
        pole = float(np.exp(-2.0 * np.pi * self.cutoff_hz / sample_rate))
        feed = 1.0 - pole
        # The recurrence is inherently sequential and numpy has no built-in
        # IIR, so this is a plain Python loop. The cost is acceptable because
        # AudioSynth targets short SFX (typically < 1 s).
        filtered = np.empty_like(samples)
        memory = 0.0
        for idx in range(samples.size):
            memory = pole * memory + feed * samples[idx]
            filtered[idx] = memory
        return filtered
class HighPass(Filter):
    """First-order one-pole high-pass.

    ``a = exp(-2*pi*cutoff_hz / sample_rate)``
    ``y[k] = a * (y[k-1] + x[k] - x[k-1])``

    -6 dB/octave below the cutoff. For steeper biquad cuts use the bus-level
    ``simvx.core.audio_effect.HighPassFilter``.
    """

    def __init__(self, cutoff_hz: float):
        self.cutoff_hz = float(cutoff_hz)

    def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
        """Return a filtered copy of `samples` (an empty input is returned as-is)."""
        if samples.size == 0:
            return samples
        coeff = float(np.exp(-2.0 * np.pi * self.cutoff_hz / sample_rate))
        filtered = np.empty_like(samples)
        # Both the previous input and the previous output start at zero.
        last_in = 0.0
        last_out = 0.0
        for idx in range(samples.size):
            current = float(samples[idx])
            result = coeff * (last_out + current - last_in)
            filtered[idx] = result
            last_in, last_out = current, result
        return filtered
# ===========================================================================
# AudioSynth
# ===========================================================================


class _Voice:
    """A single AudioSynth slot.

    Bundles the audio source with its optional envelope and filter and the
    two mix parameters (gain and pan). Attributes are plain and mutable;
    ``__slots__`` prevents any others from being added.
    """

    __slots__ = ("source", "envelope", "filter", "gain", "pan")

    def __init__(
        self,
        source: AudioSource,
        envelope: Envelope | None = None,
        filter: Filter | None = None,
        gain: float = 1.0,
        pan: float = 0.0,
    ):
        # Signal chain first, then the mix parameters; values are stored
        # exactly as given, with no conversion or clamping.
        self.source, self.envelope, self.filter = source, envelope, filter
        self.gain, self.pan = gain, pan
class AudioSynth:
    """Composes audio sources into a bakeable / streamable synth.

    Construct, `add()` one or more voices (a source plus optional envelope /
    filter and gain / pan), then `bake(duration)` to render an `AudioStream`
    ready to play, or use `render_chunk()` / `attach_to()` for chunked output.
    """

    def __init__(self):
        self._voices: list[_Voice] = []

    def add(
        self,
        source: AudioSource,
        *,
        envelope: Envelope | None = None,
        filter: Filter | None = None,
        gain: float = 1.0,
        pan: float = 0.0,
    ) -> int:
        """Add a voice. Returns the voice id (index) for later mutation.

        `filter` is an optional per-source DSP filter (e.g. `LowPass`,
        `HighPass`) applied after the source render and envelope but before
        gain / pan. For bus-wide effects use `AudioBus.add_effect`.
        """
        self._voices.append(_Voice(source, envelope, filter, gain, pan))
        return len(self._voices) - 1

    @property
    def voices(self) -> list[_Voice]:
        """The live voice list (not a copy). Mutate elements in place."""
        return self._voices

    def clear(self) -> None:
        """Remove all voices. The synth becomes silent."""
        self._voices.clear()

    @property
    def voice_count(self) -> int:
        """Number of voices currently in the synth."""
        return len(self._voices)

    def set_param(self, voice_id: int, name: str, value: object) -> None:
        """Mutate a parameter on a voice. Useful for live tweaks.

        If the voice's source has an attribute called `name` (common targets:
        ``freq``, ``duty``, ``phase``) it is set with `setattr`. Otherwise
        ``gain`` and ``pan`` are applied to the voice itself (converted with
        ``float``).

        Raises:
            IndexError: `voice_id` is not a valid index (negative ids are
                rejected too).
            AttributeError: `name` is neither a source attribute nor
                ``gain`` / ``pan``.
        """
        count = len(self._voices)
        if not (0 <= voice_id < count):
            # An empty synth has no valid range to report; "0..-1" would be
            # misleading.
            valid = f"0..{count - 1}" if count else "synth has no voices"
            raise IndexError(f"voice_id {voice_id} out of range ({valid})")
        voice = self._voices[voice_id]
        if hasattr(voice.source, name):
            setattr(voice.source, name, value)
        elif name == "gain":
            voice.gain = float(value)  # type: ignore[arg-type]
        elif name == "pan":
            voice.pan = float(value)  # type: ignore[arg-type]
        else:
            raise AttributeError(
                f"Voice {voice_id} source ({type(voice.source).__name__}) has no attribute {name!r}"
            )

    @staticmethod
    def _require_valid_channels(channels: int) -> None:
        """Raise ``ValueError`` unless `channels` is 1 (mono) or 2 (stereo)."""
        if channels not in (1, 2):
            raise ValueError(f"channels must be 1 or 2, got {channels}")

    @staticmethod
    def _mix_voice(out: np.ndarray, mono: np.ndarray, voice: _Voice, sample_rate: int, channels: int) -> None:
        """Run `mono` through the voice's filter, gain and pan, then add it to `out` in place."""
        if voice.filter is not None:
            mono = voice.filter.apply(mono, sample_rate)
        mono = mono * voice.gain
        if channels == 2:
            # Pan law: the centre keeps both channels at unity; moving towards
            # one side only attenuates the opposite channel
            # (pan=-1 hard left, pan=+1 hard right).
            pan = float(np.clip(voice.pan, -1.0, 1.0))
            left_gain = min(1.0, 1.0 - pan)
            right_gain = min(1.0, 1.0 + pan)
            out[0::2] += mono * left_gain
            out[1::2] += mono * right_gain
        else:
            out[:] += mono

    def render_chunk(
        self,
        cursor_samples: int,
        n_samples: int,
        *,
        sample_rate: int = 48000,
        channels: int = 2,
        soft_clip: bool = True,
    ) -> np.ndarray:
        """Render `n_samples` of mixed output starting at the given cursor.

        Sources receive `cursor_samples`, so periodic waves stay continuous
        across consecutive calls; this is the canonical "chunk this synth for
        streaming" API. Envelopes are *not* applied here: use `bake()` for
        one-shot clips where the envelope shape spans the whole clip.

        With `soft_clip=True` the mix is clamped to [-1, +1].

        Returns a float32 interleaved buffer (`n_samples * channels`).

        Raises:
            ValueError: `channels` is not 1 or 2.
        """
        self._require_valid_channels(channels)
        out = np.zeros(n_samples * channels, dtype=np.float32)
        for voice in self._voices:
            mono = voice.source.render(cursor_samples, n_samples, sample_rate)
            self._mix_voice(out, mono, voice, sample_rate, channels)
        if soft_clip:
            np.clip(out, -1.0, 1.0, out=out)
        return out

    def attach_to(
        self,
        player,
        *,
        chunk_seconds: float = 0.1,
        sample_rate: int = 48000,
        channels: int = 2,
    ):
        """Drive `player` with live synth output as long as the driver lives.

        Adds a small `_AudioSynthDriver` node as a child of `player` which
        opens a streaming channel on the active audio backend and feeds
        chunks of `chunk_seconds` worth of synth output every process tick.

        `set_param` mutations on the synth take effect at the start of the
        next chunk (so a 100 ms chunk has up to 100 ms parameter latency).
        Lower `chunk_seconds` for more responsive control at the cost of more
        per-frame work.

        Returns the driver node so the caller can `remove_child` it to stop
        streaming.

        **Backend support:** works on all three backends. The native
        ma_engine path uses an `ma_pcm_rb` ring buffer (default 0.5 s); the
        legacy path appends to a Python `bytearray`; the web path posts to an
        AudioWorkletNode. Underrun is silent padding on all three.
        """
        driver = _AudioSynthDriver(
            self,
            player,
            chunk_seconds=chunk_seconds,
            sample_rate=sample_rate,
            channels=channels,
        )
        player.add_child(driver)
        return driver

    def bake(
        self,
        duration: float,
        *,
        sample_rate: int = 48000,
        channels: int = 2,
        soft_clip: bool = True,
    ) -> AudioStream:
        """Render the synth into an `AudioStream` of length `duration` seconds.

        All voices mix into a single buffer; the result is stored as a
        float32 interleaved ndarray (`backend_data`) on the returned
        AudioStream. Both desktop and web backends accept this format
        directly.

        `soft_clip=True` (default) clamps the final mix to [-1, +1] so
        over-mixed voices don't wrap. Set to False for clean overflow handling
        upstream (rare).

        Raises:
            ValueError: `channels` is not 1 or 2, or `duration` is not
                positive.
        """
        self._require_valid_channels(channels)
        if duration <= 0:
            raise ValueError(f"duration must be positive, got {duration}")

        n_total = int(duration * sample_rate)
        if n_total == 0:
            n_total = 1  # avoid empty buffer

        out = np.zeros(n_total * channels, dtype=np.float32)
        for voice in self._voices:
            mono = voice.source.render(0, n_total, sample_rate)
            if voice.envelope is not None:
                env = voice.envelope.render_total(duration, sample_rate)
                # Defensive: envelope length should match n_total but
                # trim/pad (with zeros) if not.
                if env.shape[0] >= n_total:
                    env = env[:n_total]
                else:
                    env = np.pad(env, (0, n_total - env.shape[0]))
                mono = mono * env
            self._mix_voice(out, mono, voice, sample_rate, channels)

        if soft_clip:
            np.clip(out, -1.0, 1.0, out=out)

        from .audio import AudioStream

        # Use ``from_pcm`` so the stream carries the synth's rate/channel
        # count: the native decoder uses these to set up the AudioBuffer
        # correctly; without them a 44.1 kHz bake would play at the
        # backend's 48 kHz default and pitch-shift +9%.
        return AudioStream.from_pcm(
            out,
            sample_rate=sample_rate,
            channels=channels,
            name="synth_bake",
        )
# ===========================================================================
# _AudioSynthDriver: Node that pumps synth chunks per frame
# ===========================================================================


class _AudioSynthDriver(Node):
    """Streaming driver node created by `AudioSynth.attach_to(player)`.

    On entering the scene tree it opens a streaming channel on the tree's
    audio backend; every ``on_process`` tick it feeds one chunk of
    `chunk_seconds * sample_rate` samples (int16 interleaved). The driver
    owns the global sample cursor, so voices stay phase-continuous from one
    chunk to the next. The channel is stopped when the node leaves the tree.

    The synth is only referenced, not owned: callers keep mutating it via
    `synth.set_param(...)` and the change shows up in the next chunk.
    """

    def __init__(
        self,
        synth: AudioSynth,
        player: AudioStreamPlayer,
        *,
        chunk_seconds: float = 0.1,
        sample_rate: int = 48000,
        channels: int = 2,
    ):
        super().__init__()
        self.name = "AudioSynthDriver"
        self._synth = synth
        self._player = player
        # Stream format.
        self._chunk_seconds = float(chunk_seconds)
        self._sample_rate = int(sample_rate)
        self._channels = int(channels)
        # Streaming state; channel / backend stay None until on_enter_tree
        # succeeds.
        self._cursor_samples = 0
        self._channel: int | None = None
        self._backend = None
        self._warned_unsupported = False

    @property
    def synth(self) -> AudioSynth:
        """The synth this driver renders from."""
        return self._synth

    def on_enter_tree(self):
        """Open the streaming channel on the tree's audio backend.

        Logs a warning and stays silent when the tree has no backend; raises
        `AudioCapabilityError` when the backend cannot stream.
        """
        backend = self.tree.audio_backend
        if backend is None:
            log.warning(
                "AudioSynth.attach_to: no audio backend on tree; synth will be silent"
            )
            return

        # Two facets are needed: AudioStreamingBackend for open/feed and
        # stop_audio for teardown. Every streaming-capable backend
        # (Miniaudio native, legacy, Web) implements both, so checking the
        # narrower streaming facet is enough.
        if not isinstance(backend, AudioStreamingBackend):
            # NullAudioBackend (and any future playback-only backend) cannot
            # stream; fail loudly rather than silently dropping the synth
            # output. The message points at the install path that makes
            # streaming available.
            remediation = (
                "AudioSynth.attach_to requires an AudioStreamingBackend "
                "(open_stream / feed_audio_chunk). The active backend doesn't "
                "implement streaming: install the native extension "
                "(uv run --with setuptools simvx build-audio) or use a "
                "non-streaming player (AudioStream.tone, AudioSynth.bake)."
            )
            raise AudioCapabilityError(
                "streaming",
                backend=type(backend).__name__,
                advertised=backend.list_capabilities(),
                remediation=remediation,
            )

        self._backend = backend
        bus = getattr(self._player, "bus", "Master") or "Master"
        # A failing open_stream is a real error with no fallback; AudioError
        # propagates so the caller knows the synth won't produce sound.
        self._channel = self._backend.open_stream(bus=bus)

    def on_exit_tree(self):
        """Stop the streaming channel (best effort) and forget the backend."""
        channel, backend = self._channel, self._backend
        if channel is not None and backend is not None:
            try:
                backend.stop_audio(channel)
            except AudioError as exc:
                # Best-effort cleanup during scene teardown: don't crash exit.
                raise_or_warn(
                    exc,
                    key="audio.synth.exit_stop_failed",
                    message="AudioSynth driver: stop_audio failed during on_exit_tree",
                )
        self._channel = None
        self._backend = None

    def on_process(self, dt: float):
        """Feed one chunk (synth output, or silence when idle) to the backend."""
        # NOTE(review): `dt` is not used; exactly one fixed-size chunk is fed
        # per tick regardless of frame time. Confirm the backend tolerates a
        # feed rate that differs from real time.
        if self._channel is None or self._backend is None:
            return

        frames = max(1, int(self._chunk_seconds * self._sample_rate))
        if self._synth.voice_count == 0:
            # Synth idle: push int16 silence (2 bytes per sample) so the
            # stream doesn't underrun.
            payload = b"\x00" * (frames * self._channels * 2)
            failure_key = "audio.synth.feed_silence_failed"
            failure_message = "AudioSynth driver: feed_audio_chunk failed (silence pad)"
        else:
            rendered = self._synth.render_chunk(
                self._cursor_samples,
                frames,
                sample_rate=self._sample_rate,
                channels=self._channels,
                soft_clip=True,
            )
            # Float32 [-1, +1] -> int16 interleaved bytes.
            payload = (rendered * 32767.0).astype(np.int16, copy=False).tobytes()
            failure_key = "audio.synth.feed_chunk_failed"
            failure_message = "AudioSynth driver: feed_audio_chunk failed"

        # A failed feed is handed to raise_or_warn rather than raised here.
        try:
            self._backend.feed_audio_chunk(self._channel, payload)
        except AudioError as exc:
            raise_or_warn(exc, key=failure_key, message=failure_message)
        self._cursor_samples += frames