audio: local STT and mic capture
stt.py wraps faster-whisper for fully on-device transcription. audio.py captures via sounddevice with two paths: silence-segmented for listen mode and held-key for ptt. resolves the input device from config (auto/index/name). Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
732bad4c8d
commit
da7c39c4f2
155
src/claudedo/audio.py
Normal file
155
src/claudedo/audio.py
Normal file
@ -0,0 +1,155 @@
|
||||
"""mic capture via sounddevice — the WSL-hard part.
|
||||
|
||||
device selection resolves config's stt.device ("auto" | index | name substring) to
|
||||
a concrete sounddevice input device. two capture paths:
|
||||
- record_until_silence(): listen mode — stream until trailing silence segments the
|
||||
utterance (no streaming STT; chunk-on-silence is enough for commands).
|
||||
- record_while(predicate): ptt mode — capture while predicate() is true (key held).
|
||||
|
||||
the WSLg/PulseAudio path is verified separately by `claudedo test-audio`; if capture
|
||||
fails here the fix-chain is the apt deps + ~/.asoundrc + Windows mic permission.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import queue
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AudioError(Exception):
    """signals that no usable input device exists or that audio capture failed."""
|
||||
|
||||
|
||||
def list_devices() -> list[dict]:
    """snapshot sounddevice's device table as a plain list (for test-audio / debugging)."""
    import sounddevice as sd

    device_table = sd.query_devices()
    return list(device_table)
|
||||
|
||||
|
||||
def resolve_device(spec: str | int) -> int | None:
    """resolve a device spec to a sounddevice input index, or None for default.

    spec:
      - "", "auto" or "default" -> None (the caller passes None on as "default input")
      - an int, or a string of decimal digits -> that index, returned as-is; it is
        not checked against the device table here
      - anything else -> index of the first device that has input channels and
        whose name contains spec, compared case-insensitively

    surrounding whitespace is ignored for the keyword and index forms.

    raises AudioError when a name spec matches no input device.
    """
    # a config loader may hand over a bare integer index rather than a string
    if isinstance(spec, int):
        return spec

    keyword = spec.strip()
    if keyword in ("", "auto", "default"):
        return None
    # isdecimal(), not isdigit(): isdigit() is also true for characters such as
    # "²", which int() rejects with ValueError.
    if keyword.isdecimal():
        return int(keyword)

    # imported only for the name lookup, so the trivial specs above resolve
    # without loading sounddevice at all.
    import sounddevice as sd

    spec_low = spec.lower()
    for idx, dev in enumerate(sd.query_devices()):
        if dev.get("max_input_channels", 0) > 0 and spec_low in dev["name"].lower():
            return idx
    raise AudioError(f"no input device matching {spec!r}")
|
||||
|
||||
|
||||
def _rms(block: np.ndarray) -> float:
    """root-mean-square level of a sample block; an empty block counts as 0.0.

    the squares are computed in float64 regardless of the input dtype.
    """
    if not block.size:
        return 0.0
    squared = np.square(block, dtype=np.float64)
    return float(np.sqrt(squared.mean()))
|
||||
|
||||
|
||||
def record_until_silence(samplerate: int, channels: int, device: int | None,
                         silence_threshold: float, silence_duration: float,
                         min_utterance: float, max_utterance: float,
                         stop: Callable[[], bool] | None = None) -> np.ndarray | None:
    """capture one utterance, ending after trailing silence. returns mono float32.

    blocks until speech is detected and then trailing silence segments it, or until
    stop() returns true (clean shutdown). stop() is polled between blocks, and at
    least every 0.2 s while no audio arrives.

    samplerate / channels / device: passed straight to sounddevice.InputStream.
    silence_threshold: RMS level at or above which a block counts as speech.
    silence_duration: seconds of consecutive below-threshold audio, after speech
        has started, that end the utterance.
    min_utterance: utterances shorter than this many seconds are dropped.
    max_utterance: cap, in seconds of captured audio counted from the first speech
        block, after which capture ends even without trailing silence. time spent
        waiting for speech does not count against it.

    returns None if stopped before any speech or if the captured utterance is
    shorter than min_utterance; otherwise the utterance (including its trailing
    silence), multi-channel input averaged down to one channel.
    """
    import sounddevice as sd

    block_dur = 0.05
    blocksize = int(samplerate * block_dur)
    q: "queue.Queue[np.ndarray]" = queue.Queue()

    def _cb(indata, _frames, _time, status):
        if status:
            log.debug("audio status: %s", status)
        # hand a copy of the block from the stream callback to the loop below
        q.put(indata.copy())

    collected: list[np.ndarray] = []
    speaking = False
    silence_run = 0.0  # seconds of consecutive silence since the last speech block
    captured = 0.0     # seconds of audio kept since speech onset

    with sd.InputStream(samplerate=samplerate, channels=channels, device=device,
                        dtype="float32", blocksize=blocksize, callback=_cb):
        while True:
            if stop is not None and stop():
                break
            try:
                block = q.get(timeout=0.2)
            except queue.Empty:
                continue

            mono = block.reshape(-1) if channels == 1 else block.mean(axis=1)
            loud = _rms(mono) >= silence_threshold
            if not (loud or speaking):
                # leading silence before any speech: discard and keep waiting
                continue

            # measure from the block itself rather than assuming block_dur, so the
            # timers stay exact even if a block is not exactly blocksize frames
            block_seconds = mono.size / samplerate
            collected.append(mono)
            captured += block_seconds

            if loud:
                speaking = True
                silence_run = 0.0
            else:
                silence_run += block_seconds
                if silence_run >= silence_duration:
                    break

            # the cap counts captured audio, not wall-clock time since the stream
            # opened; otherwise a long wait before speaking would cut the
            # utterance off right after its first block.
            if captured > max_utterance:
                log.debug("utterance hit max_utterance cap")
                break

    if not collected:
        return None
    audio = np.concatenate(collected).astype(np.float32)
    if audio.size / samplerate < min_utterance:
        return None
    return audio
|
||||
|
||||
|
||||
def record_while(samplerate: int, channels: int, device: int | None,
                 held: Callable[[], bool], max_utterance: float,
                 min_utterance: float) -> np.ndarray | None:
    """push-to-talk capture: record for as long as held() stays true.

    capture also ends once more than max_utterance seconds have elapsed on the
    monotonic clock since just before the stream was opened. returns the audio as
    mono float32 (multi-channel input averaged), or None when nothing was captured
    or the result is shorter than min_utterance seconds.
    """
    import sounddevice as sd

    frame_seconds = 0.05
    frames_per_block = int(samplerate * frame_seconds)
    pending: "queue.Queue[np.ndarray]" = queue.Queue()

    def _on_audio(indata, _frames, _time, status):
        if status:
            log.debug("audio status: %s", status)
        pending.put(indata.copy())

    chunks: list[np.ndarray] = []
    t0 = time.monotonic()
    stream = sd.InputStream(samplerate=samplerate, channels=channels, device=device,
                            dtype="float32", blocksize=frames_per_block,
                            callback=_on_audio)
    with stream:
        while held():
            try:
                raw = pending.get(timeout=0.1)
            except queue.Empty:
                # nothing delivered yet; go back and re-check the key
                continue
            if channels == 1:
                chunks.append(raw.reshape(-1))
            else:
                chunks.append(raw.mean(axis=1))
            elapsed = time.monotonic() - t0
            if elapsed > max_utterance:
                break

    if len(chunks) == 0:
        return None
    samples = np.concatenate(chunks).astype(np.float32)
    too_short = samples.size / samplerate < min_utterance
    return None if too_short else samples
|
||||
52
src/claudedo/stt.py
Normal file
52
src/claudedo/stt.py
Normal file
@ -0,0 +1,52 @@
|
||||
"""faster-whisper wrapper: load a model once, transcribe audio chunks locally.
|
||||
|
||||
privacy invariant: transcription runs entirely on-device. audio handed here is a
|
||||
short in-memory chunk; nothing is written to disk or sent anywhere.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Transcriber:
    """a loaded faster-whisper model that transcribes float32 mono audio chunks."""

    def __init__(self, model: str = "small", language: str = "en", device: str = "auto",
                 compute_type: str = "auto") -> None:
        """load the model once; language is passed to every transcribe() call."""
        self.language = language
        self._model = self._load(model, device, compute_type)

    @staticmethod
    def _load(model: str, device: str, compute_type: str):
        """build the WhisperModel after resolving the "auto" placeholders.

        device "auto" becomes "cpu"; compute_type "auto" becomes "int8" on cpu and
        "float16" on any other device.
        """
        from faster_whisper import WhisperModel

        if device == "auto":
            device = "cpu"
        if compute_type == "auto":
            compute_type = "int8" if device == "cpu" else "float16"
        log.info("loading faster-whisper model=%s device=%s compute=%s", model, device, compute_type)
        return WhisperModel(model, device=device, compute_type=compute_type)

    def transcribe(self, audio: np.ndarray, samplerate: int = 16000) -> str:
        """transcribe a mono float32 numpy array to a stripped text string.

        the audio must be 16 kHz mono float32 in [-1, 1]; resample upstream if not.
        no resampling happens here: samplerate is only checked, and a value other
        than 16000 is logged as a warning. other dtypes are cast to float32 and
        arrays with more than one dimension are flattened. empty audio returns ""
        without calling the model.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        if audio.ndim > 1:
            audio = audio.reshape(-1)
        if audio.size == 0:
            # nothing to decode; skip the model call entirely
            return ""
        if samplerate != 16000:
            log.warning("transcribe() got %s Hz audio but expects 16000 Hz; "
                        "resample upstream or the transcript will be unreliable", samplerate)
        segments, _info = self._model.transcribe(
            audio,
            language=self.language,
            beam_size=1,
            vad_filter=True,
            condition_on_previous_text=False,
        )
        # segment text usually carries its own leading space, so strip each piece
        # before joining; a plain " ".join would leave doubled spaces.
        pieces = (seg.text.strip() for seg in segments)
        return " ".join(piece for piece in pieces if piece)
|
||||
Loading…
Reference in New Issue
Block a user