"""Procedural audio synthesis: oscillators, envelopes, AudioSynth.
Pure-numpy DSP for generating audio at runtime without external assets.
The common pattern is "bake then play": build a synth, render a short
clip into an `AudioStream`, and play it through any `AudioStreamPlayer`
on either backend (desktop / web).
Quick start::
from simvx.core import AudioSynth, Oscillator, ADSR, AudioStreamPlayer
synth = AudioSynth()
synth.add(
Oscillator.sine(440.0),
envelope=ADSR(attack=0.01, decay=0.1, sustain=0.6, release=0.2),
)
pluck = synth.bake(duration=0.4) # → AudioStream
player = AudioStreamPlayer(stream=pluck)
self.add_child(player)
player.play()
Multiple sources mix together in one bake::
synth = AudioSynth()
synth.add(Oscillator.sine(220.0), gain=0.5) # root
synth.add(Oscillator.sine(330.0), gain=0.3) # fifth
synth.add(Oscillator.noise.white(), gain=0.1, pan=0.3) # snare hiss
chord = synth.bake(duration=1.0)
`AudioSynth.bake()` returns an `AudioStream` whose `backend_data` is a
float32 interleaved-stereo ndarray. Both `MiniaudioBackend` (native and
legacy) and `WebAudioBackend` accept that directly: no file I/O, no
extra serialization.
For live procedural synthesis with parameter control, see
`AudioSynth.attach_to()` (streams via `backend.open_stream` and feeds
chunks per frame).
"""
from __future__ import annotations
import abc
import logging
from typing import TYPE_CHECKING, ClassVar
import numpy as np
from .audio_errors import AudioCapabilityError, AudioError, raise_or_warn
from .audio_protocol import AudioStreamingBackend
from .node import Node
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from .audio import AudioStream, AudioStreamPlayer
__all__ = [
"AudioSource",
"Envelope",
"Filter",
"LowPass",
"HighPass",
"Oscillator",
"WhiteNoise",
"PinkNoise",
"ADSR",
"Linear",
"Exponential",
"AudioSynth",
]
# ===========================================================================
# Sources
# ===========================================================================
[docs]
class AudioSource(abc.ABC):
"""Anything that produces mono float32 audio samples on demand.
Subclasses implement `render(cursor_samples, n_samples, sample_rate)`
where `cursor_samples` is the global sample offset (so periodic
waves stay phase-continuous across multiple chunked renders).
Instantiating an incomplete subclass raises ``TypeError``.
"""
[docs]
@abc.abstractmethod
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
raise NotImplementedError(
"AudioSource.render is abstract; subclass must produce mono float32 samples."
)
class _SineOscillator(AudioSource):
"""Pure tone at `freq` Hz with optional initial phase offset (radians)."""
def __init__(self, freq: float, phase: float = 0.0):
self.freq = float(freq)
self.phase = float(phase)
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
t = (np.arange(n_samples, dtype=np.float64) + cursor_samples) / sample_rate
return np.sin(2 * np.pi * self.freq * t + self.phase).astype(np.float32)
class _SquareOscillator(AudioSource):
"""Square wave at `freq` Hz with pulse-width `duty` (0..1, 0.5 = symmetric)."""
def __init__(self, freq: float, duty: float = 0.5):
self.freq = float(freq)
self.duty = float(np.clip(duty, 0.01, 0.99))
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
t = (np.arange(n_samples, dtype=np.float64) + cursor_samples) / sample_rate
phase = (self.freq * t) % 1.0
return np.where(phase < self.duty, 1.0, -1.0).astype(np.float32)
class _SawOscillator(AudioSource):
"""Sawtooth wave at `freq` Hz, ramping from -1 to +1 each cycle."""
def __init__(self, freq: float):
self.freq = float(freq)
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
t = (np.arange(n_samples, dtype=np.float64) + cursor_samples) / sample_rate
phase = (self.freq * t) % 1.0
return (2.0 * phase - 1.0).astype(np.float32)
class _TriangleOscillator(AudioSource):
"""Triangle wave at `freq` Hz."""
def __init__(self, freq: float):
self.freq = float(freq)
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
t = (np.arange(n_samples, dtype=np.float64) + cursor_samples) / sample_rate
phase = (self.freq * t) % 1.0
# Up 0→1 in [0, 0.5], down 1→0 in [0.5, 1] → scale to [-1, +1]
return (4.0 * np.abs(phase - 0.5) - 1.0).astype(np.float32)
[docs]
class WhiteNoise(AudioSource):
"""Uniform-distribution white noise. Deterministic via the bundled RNG seed."""
def __init__(self, seed: int | None = None):
self._rng = np.random.default_rng(seed)
[docs]
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
return self._rng.uniform(-1.0, 1.0, n_samples).astype(np.float32)
[docs]
class PinkNoise(AudioSource):
"""Approximate pink noise via the Voss-McCartney algorithm.
Cheap to compute, sounds substantially warmer than white noise.
Useful for ambient layers and percussion.
"""
_NUM_ROWS = 16
def __init__(self, seed: int | None = None):
self._rng = np.random.default_rng(seed)
self._rows = self._rng.uniform(-1.0, 1.0, self._NUM_ROWS)
self._counter = 0
[docs]
def render(self, cursor_samples: int, n_samples: int, sample_rate: int) -> np.ndarray:
out = np.empty(n_samples, dtype=np.float32)
rows = self._rows
for i in range(n_samples):
self._counter += 1
# Find the lowest set bit; that row updates this sample.
lowest = (self._counter & -self._counter).bit_length() - 1
if lowest < self._NUM_ROWS:
rows[lowest] = self._rng.uniform(-1.0, 1.0)
# Sum across rows + a fresh "white" sample, scaled to [-1, 1].
out[i] = (rows.sum() + self._rng.uniform(-1.0, 1.0)) / (self._NUM_ROWS + 1)
return out
class _NoiseFactory:
"""Namespace for noise generators. Available as `Oscillator.noise.<kind>()`."""
@staticmethod
def white(seed: int | None = None) -> AudioSource:
return WhiteNoise(seed)
@staticmethod
def pink(seed: int | None = None) -> AudioSource:
return PinkNoise(seed)
[docs]
class Oscillator:
"""Namespace for oscillator + noise constructors.
Mirrors Web Audio's ``OscillatorNode`` waveform set plus a ``noise``
sub-namespace for stochastic sources. All members are static: never
instantiate ``Oscillator`` itself; call its classmethods directly::
Oscillator.sine(440.0)
Oscillator.square(220.0, duty=0.25)
Oscillator.noise.white(seed=42)
"""
# ``noise`` is a class attribute (single shared factory instance): not
# grafted onto the public symbol after construction, so the type checker
# sees it as a regular attribute. The previous design used a module-level
# ``Oscillator = _OscillatorFactory()`` instance with a post-hoc
# ``Oscillator.noise = _NoiseFactory() # type: ignore`` graft.
noise: ClassVar[_NoiseFactory] = _NoiseFactory()
[docs]
@staticmethod
def sine(freq: float, *, phase: float = 0.0) -> AudioSource:
return _SineOscillator(freq, phase)
[docs]
@staticmethod
def square(freq: float, *, duty: float = 0.5) -> AudioSource:
return _SquareOscillator(freq, duty)
[docs]
@staticmethod
def saw(freq: float) -> AudioSource:
return _SawOscillator(freq)
[docs]
@staticmethod
def triangle(freq: float) -> AudioSource:
return _TriangleOscillator(freq)
# ===========================================================================
# Envelopes
# ===========================================================================
[docs]
class Envelope(abc.ABC):
"""Multiplier curve applied over the total duration of a baked clip.
Envelopes don't know about gate-on/off: they receive the full
duration and lay out their shape inside it. For real-time
note-on/note-off behaviour, render shorter clips and crossfade in
user code (or stream via `AudioSynth.attach_to`).
Instantiating an incomplete subclass raises ``TypeError``.
"""
[docs]
@abc.abstractmethod
def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
raise NotImplementedError(
"Envelope.render_total is abstract; subclass must return a per-sample multiplier curve."
)
[docs]
class ADSR(Envelope):
"""Attack / Decay / Sustain / Release.
`attack`, `decay`, `release` are in seconds; `sustain` is a level
[0, 1] held for whatever time remains after the attack and decay
portions consume their share of `total_duration`. If the total
duration is too short to fit the full ADSR, the release portion
truncates from the start of the release phase.
"""
def __init__(
self,
*,
attack: float = 0.01,
decay: float = 0.05,
sustain: float = 1.0,
release: float = 0.05,
):
self.attack = max(0.0, float(attack))
self.decay = max(0.0, float(decay))
self.sustain = float(np.clip(sustain, 0.0, 1.0))
self.release = max(0.0, float(release))
[docs]
def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
n_total = max(0, int(total_duration * sample_rate))
if n_total == 0:
return np.zeros(0, dtype=np.float32)
n_a = int(self.attack * sample_rate)
n_d = int(self.decay * sample_rate)
n_r = int(self.release * sample_rate)
n_s = max(0, n_total - n_a - n_d - n_r)
env = np.zeros(n_total, dtype=np.float32)
cursor = 0
if n_a > 0:
end = min(cursor + n_a, n_total)
env[cursor:end] = np.linspace(0.0, 1.0, end - cursor, dtype=np.float32)
cursor = end
if cursor < n_total and n_d > 0:
end = min(cursor + n_d, n_total)
env[cursor:end] = np.linspace(1.0, self.sustain, end - cursor, dtype=np.float32)
cursor = end
if cursor < n_total and n_s > 0:
end = min(cursor + n_s, n_total)
env[cursor:end] = self.sustain
cursor = end
if cursor < n_total:
# Release: sustain → 0 over whatever room is left.
remaining = n_total - cursor
env[cursor:] = np.linspace(self.sustain, 0.0, remaining, dtype=np.float32)
return env
[docs]
class Linear(Envelope):
"""Straight-line ramp from `start` to `end` over the whole duration."""
def __init__(self, *, start: float = 1.0, end: float = 0.0):
self.start = float(start)
self.end = float(end)
[docs]
def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
n = max(0, int(total_duration * sample_rate))
if n == 0:
return np.zeros(0, dtype=np.float32)
return np.linspace(self.start, self.end, n, dtype=np.float32)
[docs]
class Exponential(Envelope):
"""Exponential curve: ``start * (end/start)^(t/duration)``.
`start` and `end` must be strictly positive (exponential
interpolation is undefined through zero). Use `Linear` for fades to
silence.
"""
def __init__(self, *, start: float = 1.0, end: float = 0.01, power: float = 1.0):
if start <= 0 or end <= 0:
raise ValueError("Exponential envelope requires strictly positive start and end.")
self.start = float(start)
self.end = float(end)
self.power = float(power)
[docs]
def render_total(self, total_duration: float, sample_rate: int) -> np.ndarray:
n = max(0, int(total_duration * sample_rate))
if n == 0:
return np.zeros(0, dtype=np.float32)
t = np.linspace(0.0, 1.0, n, dtype=np.float64) ** self.power
ratio = self.end / self.start
return (self.start * (ratio ** t)).astype(np.float32)
# ===========================================================================
# Per-source filters
# ===========================================================================
[docs]
class Filter(abc.ABC):
"""Per-source DSP filter applied after the source render + envelope.
Lighter than bus-level effects (``simvx.core.audio_effect.LowPassFilter``
etc.): these run in numpy inside `AudioSynth.bake()` and
`render_chunk()`, so the filter shape is baked into the resulting
`AudioStream` and travels with it through any backend.
Use bus effects when you want the filter to apply to **everything**
routed through a bus (mood-driven low-pass underwater scenes,
sidechain compression). Use `Filter` when you want a single source
in an `AudioSynth` to have a specific filter shape baked in (e.g.
q1k3's filtered noise bursts for shotgun blasts).
Instantiating an incomplete subclass raises ``TypeError``.
"""
[docs]
@abc.abstractmethod
def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
raise NotImplementedError(
"Filter.apply is abstract; subclass must return the filtered sample buffer."
)
[docs]
class LowPass(Filter):
"""First-order one-pole low-pass.
``a = exp(-2*pi*cutoff_hz / sample_rate)``
``y[k] = a * y[k-1] + (1-a) * x[k]``
First-order: -6 dB/octave above cutoff. Use the bus-level
``simvx.core.audio_effect.LowPassFilter`` for steeper (2nd-order
biquad) cuts.
"""
def __init__(self, cutoff_hz: float):
self.cutoff_hz = float(cutoff_hz)
[docs]
def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
if samples.size == 0:
return samples
a = float(np.exp(-2.0 * np.pi * self.cutoff_hz / sample_rate))
# Recursive one-pole; numpy doesn't have a built-in IIR so we use
# a tight Python loop with a numpy-typed accumulator. Acceptable
# cost because AudioSynth is for short SFX (typically < 1 s).
out = np.empty_like(samples)
state = 0.0
b = 1.0 - a
for i in range(samples.size):
state = a * state + b * samples[i]
out[i] = state
return out
[docs]
class HighPass(Filter):
"""First-order one-pole high-pass.
``a = exp(-2*pi*cutoff_hz / sample_rate)``
``y[k] = a * (y[k-1] + x[k] - x[k-1])``
-6 dB/octave below cutoff. Use the bus-level
``simvx.core.audio_effect.HighPassFilter`` for steeper biquad cuts.
"""
def __init__(self, cutoff_hz: float):
self.cutoff_hz = float(cutoff_hz)
[docs]
def apply(self, samples: np.ndarray, sample_rate: int) -> np.ndarray:
if samples.size == 0:
return samples
a = float(np.exp(-2.0 * np.pi * self.cutoff_hz / sample_rate))
out = np.empty_like(samples)
prev_y = 0.0
prev_x = 0.0
for i in range(samples.size):
x = float(samples[i])
y = a * (prev_y + x - prev_x)
out[i] = y
prev_x = x
prev_y = y
return out
# ===========================================================================
# AudioSynth
# ===========================================================================
class _Voice:
"""One slot in an AudioSynth: a source + optional envelope/filter + mix params."""
__slots__ = ("source", "envelope", "filter", "gain", "pan")
def __init__(
self,
source: AudioSource,
envelope: Envelope | None = None,
filter: Filter | None = None,
gain: float = 1.0,
pan: float = 0.0,
):
self.source = source
self.envelope = envelope
self.filter = filter
self.gain = gain
self.pan = pan
[docs]
class AudioSynth:
"""Composes audio sources into a bakeable / streamable synth.
Construct, `add()` one or more `(source, envelope, gain, pan)` voices,
then `bake(duration)` to render an `AudioStream` ready to play.
"""
def __init__(self):
self._voices: list[_Voice] = []
[docs]
def add(
self,
source: AudioSource,
*,
envelope: Envelope | None = None,
filter: Filter | None = None,
gain: float = 1.0,
pan: float = 0.0,
) -> int:
"""Add a voice. Returns the voice id (index) for later mutation.
`filter` is an optional per-source DSP filter (e.g. `LowPass`,
`HighPass`) applied after the source render and envelope but
before gain / pan. For bus-wide effects use `AudioBus.add_effect`.
"""
self._voices.append(_Voice(source, envelope, filter, gain, pan))
return len(self._voices) - 1
[docs]
@property
def voices(self) -> list[_Voice]:
"""Read-only-ish view of the voice list. Mutate elements in place."""
return self._voices
[docs]
def clear(self) -> None:
"""Remove all voices. The synth becomes silent."""
self._voices.clear()
[docs]
@property
def voice_count(self) -> int:
return len(self._voices)
[docs]
def set_param(self, voice_id: int, name: str, value: object) -> None:
"""Mutate a parameter on a voice's source. Useful for live tweaks.
Looks up `name` on the source via `setattr`. Common targets:
``freq``, ``duty``, ``phase``. Voices with no such attribute
raise `AttributeError`.
"""
if not (0 <= voice_id < len(self._voices)):
raise IndexError(f"voice_id {voice_id} out of range (0..{len(self._voices) - 1})")
voice = self._voices[voice_id]
if hasattr(voice.source, name):
setattr(voice.source, name, value)
elif name == "gain":
voice.gain = float(value) # type: ignore[arg-type]
elif name == "pan":
voice.pan = float(value) # type: ignore[arg-type]
else:
raise AttributeError(
f"Voice {voice_id} source ({type(voice.source).__name__}) has no attribute {name!r}"
)
[docs]
def render_chunk(
self,
cursor_samples: int,
n_samples: int,
*,
sample_rate: int = 48000,
channels: int = 2,
soft_clip: bool = True,
) -> np.ndarray:
"""Render `n_samples` of mixed output starting at the given cursor.
Voices keep phase across consecutive calls (sources receive
`cursor_samples` so periodic waves remain continuous), so this
is the canonical "chunk this synth for streaming" API. Envelopes
are *not* applied here: use `bake()` for one-shot baked clips
where the envelope shape spans the whole clip.
Returns a float32 interleaved buffer (`n_samples * channels`).
"""
if channels not in (1, 2):
raise ValueError(f"channels must be 1 or 2, got {channels}")
out = np.zeros(n_samples * channels, dtype=np.float32)
for voice in self._voices:
mono = voice.source.render(cursor_samples, n_samples, sample_rate)
if voice.filter is not None:
mono = voice.filter.apply(mono, sample_rate)
mono = mono * voice.gain
if channels == 2:
pan = float(np.clip(voice.pan, -1.0, 1.0))
left_gain = min(1.0, 1.0 - pan)
right_gain = min(1.0, 1.0 + pan)
out[0::2] += mono * left_gain
out[1::2] += mono * right_gain
else:
out[:] += mono
if soft_clip:
np.clip(out, -1.0, 1.0, out=out)
return out
[docs]
def attach_to(
self,
player,
*,
chunk_seconds: float = 0.1,
sample_rate: int = 48000,
channels: int = 2,
):
"""Drive `player` with live synth output as long as the driver lives.
Adds a small `_AudioSynthDriver` node as a child of `player` which
opens a streaming channel on the active audio backend and feeds
chunks of `chunk_seconds` worth of synth output every process tick.
`set_param` mutations on the synth take effect at the start of the
next chunk (so a 100 ms chunk has up to 100 ms parameter latency).
Lower `chunk_seconds` for more responsive control at the cost of
more per-frame work.
Returns the driver node so the caller can `remove_child` it to
stop streaming.
**Backend support:** works on all three backends. The native
ma_engine path uses an `ma_pcm_rb` ring buffer (default 0.5 s);
the legacy path appends to a Python `bytearray`; the web path
posts to an AudioWorkletNode. Underrun is silent padding on all
three.
"""
driver = _AudioSynthDriver(
self,
player,
chunk_seconds=chunk_seconds,
sample_rate=sample_rate,
channels=channels,
)
player.add_child(driver)
return driver
[docs]
def bake(
self,
duration: float,
*,
sample_rate: int = 48000,
channels: int = 2,
soft_clip: bool = True,
) -> AudioStream:
"""Render the synth into an `AudioStream` of length `duration` seconds.
All voices mix into a single buffer; the result is stored as a
float32 interleaved ndarray (`backend_data`) on the returned
AudioStream. Both desktop and web backends accept this format
directly.
`soft_clip=True` (default) clips the final mix to [-1, +1] so
over-mixed voices don't wrap. Set to False for clean overflow
handling upstream (rare).
"""
if channels not in (1, 2):
raise ValueError(f"channels must be 1 or 2, got {channels}")
if duration <= 0:
raise ValueError(f"duration must be positive, got {duration}")
n_total = int(duration * sample_rate)
if n_total == 0:
n_total = 1 # avoid empty buffer
out = np.zeros(n_total * channels, dtype=np.float32)
for voice in self._voices:
mono = voice.source.render(0, n_total, sample_rate)
if voice.envelope is not None:
env = voice.envelope.render_total(duration, sample_rate)
# Defensive: envelope length should match n_total but trim/pad if not.
if env.shape[0] >= n_total:
env = env[:n_total]
else:
env = np.pad(env, (0, n_total - env.shape[0]))
mono = mono * env
if voice.filter is not None:
mono = voice.filter.apply(mono, sample_rate)
mono = mono * voice.gain
if channels == 2:
# Equal-amplitude pan: pan=-1 hard left, pan=+1 hard right.
pan = float(np.clip(voice.pan, -1.0, 1.0))
left_gain = min(1.0, 1.0 - pan)
right_gain = min(1.0, 1.0 + pan)
out[0::2] += mono * left_gain
out[1::2] += mono * right_gain
else:
out[:] += mono
if soft_clip:
np.clip(out, -1.0, 1.0, out=out)
from .audio import AudioStream
# Use ``from_pcm`` so the stream carries the synth's rate/channel
# count: the native decoder uses these to set up the AudioBuffer
# correctly; without them a 44.1 kHz bake would play at the
# backend's 48 kHz default and pitch-shift +9%.
return AudioStream.from_pcm(
out,
sample_rate=sample_rate,
channels=channels,
name="synth_bake",
)
# ===========================================================================
# _AudioSynthDriver: Node that pumps synth chunks per frame
# ===========================================================================
class _AudioSynthDriver(Node):
"""Per-frame streaming driver for `AudioSynth.attach_to(player)`.
Opens a streaming channel on the active audio backend when entering the
scene tree, then feeds chunks of synth output every ``on_process``
tick. Each chunk is `chunk_seconds * sample_rate` samples wide; voices
keep phase across chunks (the driver maintains the global cursor).
Cleans up the stream channel on exit. The synth itself isn't owned:
callers can mutate parameters via `synth.set_param(...)` and changes
appear in the next chunk.
"""
def __init__(
self,
synth: AudioSynth,
player: AudioStreamPlayer,
*,
chunk_seconds: float = 0.1,
sample_rate: int = 48000,
channels: int = 2,
):
super().__init__()
self.name = "AudioSynthDriver"
self._synth = synth
self._player = player
self._chunk_seconds = float(chunk_seconds)
self._sample_rate = int(sample_rate)
self._channels = int(channels)
self._cursor_samples = 0
self._channel: int | None = None
self._backend = None
self._warned_unsupported = False
@property
def synth(self) -> AudioSynth:
return self._synth
def on_enter_tree(self):
# Need both facets: AudioStreamingBackend for open/feed and
# AudioPlaybackBackend for stop_audio on teardown. Every streaming-
# capable backend (Miniaudio native, legacy, Web) implements both,
# so check the narrower facet first and store the union backend.
tree_backend = self.tree.audio_backend
if tree_backend is None:
log.warning(
"AudioSynth.attach_to: no audio backend on tree; synth will be silent"
)
return
if not isinstance(tree_backend, AudioStreamingBackend):
# NullAudioBackend (and any future playback-only backend) doesn't
# implement streaming; raise loud rather than silently dropping
# the synth output. The error message points the user at the
# install path that makes streaming available.
raise AudioCapabilityError(
"streaming",
backend=type(tree_backend).__name__,
advertised=tree_backend.list_capabilities(),
remediation=(
"AudioSynth.attach_to requires an AudioStreamingBackend "
"(open_stream / feed_audio_chunk). The active backend doesn't "
"implement streaming: install the native extension "
"(uv run --with setuptools simvx build-audio) or use a "
"non-streaming player (AudioStream.tone, AudioSynth.bake)."
),
)
self._backend = tree_backend
bus = getattr(self._player, "bus", "Master") or "Master"
# open_stream failures are real: backend rejected the request, no fallback.
# Let AudioError propagate so the caller knows the synth won't produce sound.
self._channel = self._backend.open_stream(bus=bus)
def on_exit_tree(self):
if self._channel is not None and self._backend is not None:
try:
self._backend.stop_audio(self._channel)
except AudioError as exc:
# Best-effort cleanup during scene teardown: don't crash exit.
raise_or_warn(
exc,
key="audio.synth.exit_stop_failed",
message="AudioSynth driver: stop_audio failed during on_exit_tree",
)
self._channel = None
self._backend = None
def on_process(self, dt: float):
if self._channel is None or self._backend is None:
return
if self._synth.voice_count == 0:
# Synth idle: push silence so the stream doesn't underrun.
n = max(1, int(self._chunk_seconds * self._sample_rate))
silence = b"\x00" * (n * self._channels * 2)
# Silence-pad feed_audio_chunk; if this fails the stream will
# underrun audibly. Surface in strict mode, warn-once otherwise.
try:
self._backend.feed_audio_chunk(self._channel, silence)
except AudioError as exc:
raise_or_warn(
exc,
key="audio.synth.feed_silence_failed",
message="AudioSynth driver: feed_audio_chunk failed (silence pad)",
)
self._cursor_samples += n
return
n = max(1, int(self._chunk_seconds * self._sample_rate))
chunk = self._synth.render_chunk(
self._cursor_samples,
n,
sample_rate=self._sample_rate,
channels=self._channels,
soft_clip=True,
)
# Float32 [-1, +1] → int16 interleaved bytes
int16 = (chunk * 32767.0).astype(np.int16, copy=False).tobytes()
try:
self._backend.feed_audio_chunk(self._channel, int16)
except AudioError as exc:
raise_or_warn(
exc,
key="audio.synth.feed_chunk_failed",
message="AudioSynth driver: feed_audio_chunk failed",
)
self._cursor_samples += n