style: drop inline comments, trim docstring periods

remove inline comments (CLAUDE.md: docstrings only), strip trailing periods
from single-line docstrings, and fix a PulseArmy->PulseAudio typo. no behavior
change.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-25 18:42:17 -04:00
parent bf516143b5
commit 7f4a6f6699
8 changed files with 36 additions and 39 deletions

View File

@ -1,3 +1,3 @@
"""claudedo — voice-control daemon for claude code (local STT -> tmux send-keys).""" """claudedo — voice-control daemon for claude code (local STT -> tmux send-keys)"""
__version__ = "0.1.0" __version__ = "0.1.0"

View File

@ -1,4 +1,4 @@
"""claudedo CLI: start | stop | status | test-audio | install.""" """claudedo CLI: start | stop | status | test-audio | install"""
from __future__ import annotations from __future__ import annotations
@ -95,7 +95,7 @@ def cmd_test_audio(args: argparse.Namespace) -> int:
config.samplerate, config.channels, device, config.samplerate, config.channels, device,
held=_timed_hold(2.0), max_utterance=3.0, min_utterance=0.0, held=_timed_hold(2.0), max_utterance=3.0, min_utterance=0.0,
) )
except Exception as exc: # noqa: BLE001 — surface any capture failure to the user except Exception as exc:
print(f"\naudio capture FAILED: {exc}", file=sys.stderr) print(f"\naudio capture FAILED: {exc}", file=sys.stderr)
print("fix-chain: install.sh apt deps + ~/.asoundrc pulse shim + Windows mic permission", print("fix-chain: install.sh apt deps + ~/.asoundrc pulse shim + Windows mic permission",
file=sys.stderr) file=sys.stderr)

View File

@ -6,7 +6,7 @@ a concrete sounddevice input device. two capture paths:
utterance (no streaming STT; chunk-on-silence is enough for commands). utterance (no streaming STT; chunk-on-silence is enough for commands).
- record_while(predicate): ptt mode capture while predicate() is true (key held). - record_while(predicate): ptt mode capture while predicate() is true (key held).
the WSLg/PulseArmy path is verified separately by `claudedo test-audio`; if capture the WSLg/PulseAudio path is verified separately by `claudedo test-audio`; if capture
fails here the fix-chain is the apt deps + ~/.asoundrc + Windows mic permission. fails here the fix-chain is the apt deps + ~/.asoundrc + Windows mic permission.
""" """
@ -23,11 +23,11 @@ log = logging.getLogger(__name__)
class AudioError(Exception): class AudioError(Exception):
"""raised when no usable input device is found or capture fails.""" """raised when no usable input device is found or capture fails"""
def list_devices() -> list[dict]: def list_devices() -> list[dict]:
"""return sounddevice's device table (for test-audio / debugging).""" """return sounddevice's device table (for test-audio / debugging)"""
import sounddevice as sd import sounddevice as sd
return list(sd.query_devices()) return list(sd.query_devices())
@ -121,7 +121,7 @@ def record_until_silence(samplerate: int, channels: int, device: int | None,
def record_while(samplerate: int, channels: int, device: int | None, def record_while(samplerate: int, channels: int, device: int | None,
held: Callable[[], bool], max_utterance: float, held: Callable[[], bool], max_utterance: float,
min_utterance: float) -> np.ndarray | None: min_utterance: float) -> np.ndarray | None:
"""capture while held() is true (push-to-talk). returns mono float32 or None.""" """capture while held() is true (push-to-talk). returns mono float32 or None"""
import sounddevice as sd import sounddevice as sd
block_dur = 0.05 block_dur = 0.05

View File

@ -1,4 +1,4 @@
"""load and validate config.toml into a typed Config object with clear errors.""" """load and validate config.toml into a typed Config object with clear errors"""
from __future__ import annotations from __future__ import annotations
@ -10,7 +10,7 @@ from pathlib import Path
try: try:
import tomllib as _toml import tomllib as _toml
_TOML_BINARY = True _TOML_BINARY = True
except ModuleNotFoundError: # python < 3.11 except ModuleNotFoundError:
import tomli as _toml import tomli as _toml
_TOML_BINARY = True _TOML_BINARY = True
@ -27,12 +27,12 @@ DEFAULT_CONFIG_PATHS = (
class ConfigError(Exception): class ConfigError(Exception):
"""raised on a missing or invalid configuration value.""" """raised on a missing or invalid configuration value"""
@dataclass @dataclass
class Config: class Config:
"""validated claudedo configuration.""" """validated claudedo configuration"""
wake_phrases: list[str] wake_phrases: list[str]
mode: str mode: str
@ -53,7 +53,7 @@ class Config:
def find_config_path(explicit: str | os.PathLike | None = None) -> Path: def find_config_path(explicit: str | os.PathLike | None = None) -> Path:
"""resolve the config file path, raising ConfigError if none is found.""" """resolve the config file path, raising ConfigError if none is found"""
candidates: list[Path] = [] candidates: list[Path] = []
if explicit: if explicit:
candidates.append(Path(explicit)) candidates.append(Path(explicit))
@ -79,7 +79,7 @@ def _require(table: dict, section: str, key: str, types: tuple, default=None):
def load_config(explicit: str | os.PathLike | None = None) -> Config: def load_config(explicit: str | os.PathLike | None = None) -> Config:
"""load config.toml from the first existing default path (or an explicit one).""" """load config.toml from the first existing default path (or an explicit one)"""
path = find_config_path(explicit) path = find_config_path(explicit)
try: try:
with open(path, "rb") as fh: with open(path, "rb") as fh:

View File

@ -32,7 +32,7 @@ def _ensure_state_dir() -> None:
def write_state(pid: int, mode: str, target_session: str | None) -> None: def write_state(pid: int, mode: str, target_session: str | None) -> None:
"""write the running daemon's status for `claudedo status` to read.""" """write the running daemon's status for `claudedo status` to read"""
_ensure_state_dir() _ensure_state_dir()
STATEFILE.write_text(json.dumps({ STATEFILE.write_text(json.dumps({
"pid": pid, "pid": pid,
@ -43,7 +43,7 @@ def write_state(pid: int, mode: str, target_session: str | None) -> None:
def read_state() -> dict | None: def read_state() -> dict | None:
"""read the daemon status file, or None if absent/unreadable.""" """read the daemon status file, or None if absent/unreadable"""
try: try:
return json.loads(STATEFILE.read_text(encoding="utf-8")) return json.loads(STATEFILE.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError, OSError): except (FileNotFoundError, json.JSONDecodeError, OSError):
@ -51,7 +51,7 @@ def read_state() -> dict | None:
def read_pid() -> int | None: def read_pid() -> int | None:
"""return the pid of a running daemon, or None (also clears stale pidfiles).""" """return the pid of a running daemon, or None (also clears stale pidfiles)"""
try: try:
pid = int(PIDFILE.read_text(encoding="utf-8").strip()) pid = int(PIDFILE.read_text(encoding="utf-8").strip())
except (FileNotFoundError, ValueError, OSError): except (FileNotFoundError, ValueError, OSError):
@ -67,7 +67,7 @@ def read_pid() -> int | None:
def stop_running() -> bool: def stop_running() -> bool:
"""signal a running daemon to stop. returns whether one was found.""" """signal a running daemon to stop. returns whether one was found"""
pid = read_pid() pid = read_pid()
if pid is None: if pid is None:
return False return False
@ -105,7 +105,7 @@ class _PTTKey:
class Daemon: class Daemon:
"""owns the capture/transcribe/inject loop and runtime mode switching.""" """owns the capture/transcribe/inject loop and runtime mode switching"""
def __init__(self, config: Config) -> None: def __init__(self, config: Config) -> None:
self.config = config self.config = config
@ -186,7 +186,7 @@ class Daemon:
write_state(os.getpid(), self.mode, target.read_active()) write_state(os.getpid(), self.mode, target.read_active())
def run(self) -> None: def run(self) -> None:
"""run the daemon loop until a stop signal arrives.""" """run the daemon loop until a stop signal arrives"""
_ensure_state_dir() _ensure_state_dir()
PIDFILE.write_text(str(os.getpid()), encoding="utf-8") PIDFILE.write_text(str(os.getpid()), encoding="utf-8")
self._install_signals() self._install_signals()
@ -213,7 +213,7 @@ class Daemon:
def run_daemon(config: Config) -> None: def run_daemon(config: Config) -> None:
"""entry point used by the CLI ``start`` command.""" """entry point used by the CLI ``start`` command"""
if read_pid() is not None: if read_pid() is not None:
raise RuntimeError("claudedo is already running (see `claudedo status`)") raise RuntimeError("claudedo is already running (see `claudedo status`)")
Daemon(config).run() Daemon(config).run()

View File

@ -42,7 +42,7 @@ class Action:
def normalize(text: str) -> str: def normalize(text: str) -> str:
"""lowercase, strip punctuation, collapse whitespace, map number words to digits.""" """lowercase, strip punctuation, collapse whitespace, map number words to digits"""
text = text.lower().strip() text = text.lower().strip()
text = _PUNCT.sub(" ", text) text = _PUNCT.sub(" ", text)
text = _WS.sub(" ", text).strip() text = _WS.sub(" ", text).strip()
@ -57,7 +57,7 @@ def _ratio(a: str, b: str) -> float:
def _wake_variants(phrase: str) -> set[str]: def _wake_variants(phrase: str) -> set[str]:
"""spaced and despaced forms of a wake phrase for lenient matching.""" """spaced and despaced forms of a wake phrase for lenient matching"""
norm = normalize(phrase) norm = normalize(phrase)
return {norm, norm.replace(" ", "")} return {norm, norm.replace(" ", "")}
@ -104,7 +104,7 @@ def _fuzzy_in(token: str, options: tuple[str, ...], threshold: float) -> bool:
def match_command(remainder: str, threshold: float) -> Action | None: def match_command(remainder: str, threshold: float) -> Action | None:
"""map a normalized command remainder to an Action, or None if unrecognized.""" """map a normalized command remainder to an Action, or None if unrecognized"""
remainder = remainder.strip() remainder = remainder.strip()
if not remainder: if not remainder:
return None return None
@ -152,7 +152,7 @@ def match_command(remainder: str, threshold: float) -> Action | None:
def parse(transcript: str, wake_phrases: list[str], threshold: float, def parse(transcript: str, wake_phrases: list[str], threshold: float,
require_wake: bool) -> Action | None: require_wake: bool) -> Action | None:
"""full parse: wake gate then command match. None means discard.""" """full parse: wake gate then command match. None means discard"""
remainder = strip_wake(transcript, wake_phrases, threshold, require_wake) remainder = strip_wake(transcript, wake_phrases, threshold, require_wake)
if remainder is None: if remainder is None:
return None return None

View File

@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
class Transcriber: class Transcriber:
"""a loaded faster-whisper model that transcribes float32 mono audio chunks.""" """a loaded faster-whisper model that transcribes float32 mono audio chunks"""
def __init__(self, model: str = "small", language: str = "en", device: str = "auto", def __init__(self, model: str = "small", language: str = "en", device: str = "auto",
compute_type: str = "auto") -> None: compute_type: str = "auto") -> None:

View File

@ -1,4 +1,4 @@
"""resolve the active claude code tmux session from ~/.claude-active.""" """resolve the active claude code tmux session from ~/.claude-active"""
from __future__ import annotations from __future__ import annotations
@ -26,7 +26,7 @@ def session_name(name: str) -> str:
def read_active() -> str | None: def read_active() -> str | None:
"""return the target session name from ~/.claude-active, or None if unset.""" """return the target session name from ~/.claude-active, or None if unset"""
try: try:
name = ACTIVE_FILE.read_text(encoding="utf-8").strip() name = ACTIVE_FILE.read_text(encoding="utf-8").strip()
except FileNotFoundError: except FileNotFoundError:
@ -38,7 +38,7 @@ def read_active() -> str | None:
def write_active(name: str) -> None: def write_active(name: str) -> None:
"""overwrite ~/.claude-active with a session name (used by ``switch``).""" """overwrite ~/.claude-active with a session name (used by ``switch``)"""
ACTIVE_FILE.write_text(name + "\n", encoding="utf-8") ACTIVE_FILE.write_text(name + "\n", encoding="utf-8")
@ -51,7 +51,7 @@ def set_target(name: str) -> str:
def session_exists(name: str) -> bool: def session_exists(name: str) -> bool:
"""true if a tmux session with this name currently exists.""" """true if a tmux session with this name currently exists"""
if not name: if not name:
return False return False
result = subprocess.run( result = subprocess.run(
@ -67,6 +67,13 @@ def resolve_target() -> str | None:
never guesses a target: on a missing/empty ~/.claude-active or a stale session never guesses a target: on a missing/empty ~/.claude-active or a stale session
name, this logs a clear warning and returns None so the caller injects nothing. name, this logs a clear warning and returns None so the caller injects nothing.
TODO: most-recently-active targeting (preferred over attached). today the target
is the project most recently ATTACHED to (the cc kit writes ~/.claude-active on
attach); upgrade to the session claude most recently asked a question in, via
tmux session_activity timestamps (list-sessions -F '#{session_name}
#{session_activity}', pick the highest-activity claude-* session) or by scraping
panes (capture-pane) for a waiting-prompt UI.
""" """
name = read_active() name = read_active()
if not name: if not name:
@ -76,13 +83,3 @@ def resolve_target() -> str | None:
log.warning("target session %r no longer exists — skipping injection", name) log.warning("target session %r no longer exists — skipping injection", name)
return None return None
return name return name
# TODO: most-recently-active targeting (preferred over attached). today the target
# is "the project most recently ATTACHED to" (the cc kit writes ~/.claude-active on
# attach). upgrade to "the session claude most recently asked a question / produced
# output in" via tmux session_activity timestamps:
# tmux list-sessions -F '#{session_name} #{session_activity}'
# pick the highest-activity claude-* session; or scrape panes
# (tmux capture-pane -p -t <s>) for a waiting-prompt UI and target the session whose
# pane currently shows one.