# Files
# lspower/soundcard_input.py
#
# 2170 lines
# 88 KiB
# Python

"""Live soundcard input monitor for DSP-output voltage measurements.
This first milestone intentionally does not do FFT, impedance loading, or
power estimation. It only verifies the audio-interface input path and reports
per-channel RMS/peak levels.
"""
from __future__ import annotations
import argparse
import json
import math
import queue
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from lspower.audio import (
QueuedInputStream,
SoundDeviceManager,
format_channel_list,
parse_channel,
parse_channel_list,
)
from lspower.impedance import ImpedanceCurve
from lspower.power import PowerEstimator, PowerFrame, VoltageSpectrumEstimator, VoltageSpectrumFrame, WayConfig
from lspower.winisd import WinISDLimitCurve, check_excursion_scaling, headroom_status, run_self_test
# Placeholder for the numpy module. load_dependencies() rebinds this global
# once numpy has been imported; functions that need numpy check it for None.
np = None
# ---------------------------------------------------------------------------
# User defaults
# ---------------------------------------------------------------------------
# None asks sounddevice/PortAudio to use the device default sample rate.
DEFAULT_SAMPLERATE: float | None = None
# Audio callback block size in frames. This is only for input-level monitoring.
DEFAULT_BLOCKSIZE = 2048
# Defaults for the FFT-based power estimation: FFT length in samples, the
# fractional overlap between successive FFT frames (hop = size * (1 - overlap)),
# and the smoothing factor handed to PowerEstimator.
DEFAULT_POWER_FFT_SIZE = 8192
DEFAULT_POWER_OVERLAP = 0.75
DEFAULT_POWER_SMOOTHING_ALPHA = 0.95
# NOTE(review): presumably the default for the power console refresh period
# (args.power_update_interval) - confirm against the argument parser.
DEFAULT_POWER_UPDATE_INTERVAL_S = 0.25
# Amplifier voltage gain assumed for a way when none is given explicitly.
DEFAULT_AMP_GAIN_DB = 32.0
# Power thresholds in watts. The critical value is the full-scale value of the
# GUI power meter; NOTE(review): the warning value is presumably where the
# meter switches to its warning style - confirm in the GUI update code.
DEFAULT_POWER_WARNING_W = 400.0
DEFAULT_POWER_CRITICAL_W = 550.0
# WinISD export files: the max-power limit curve used for the headroom check,
# and the 10 W / 100 W excursion curves passed to check_excursion_scaling().
WINISD_MAX_POWER_FILE = Path("winisd/max_power.csv")
WINISD_EXCURSION_10W_FILE = Path("winisd/excursion_10w.csv")
WINISD_EXCURSION_100W_FILE = Path("winisd/excursion_100w.csv")
# Module-level switch that is ANDed with the command-line setting to decide
# whether the GUI's WinISD checkbox starts enabled.
ENABLE_WINISD_HEADROOM = True
# Passed as eps_w to WinISDLimitCurve.from_file().
HEADROOM_EPS_W = 1e-12
# NOTE(review): presumably the default for args.headroom_smoothing_alpha, the
# weight of the previous value in smooth_headroom_db() - confirm in the parser.
HEADROOM_SMOOTHING_ALPHA = 0.9
# Convert sounddevice's normalized floating-point samples to volts.
#
# The calibration factor is path-specific. It includes the analog input gain,
# driver/API path, Windows recording level, and any mixer/routing gain before
# sounddevice returns samples. It should not be interpreted as the MOTU's
# hardware full-scale voltage unless all of those stages are known to be unity.
#
# Current measured reference:
# Device path: UltraLite mk5 Line In 3-4 through Windows MME, device 3, ch 1
# Reference signal: 1 kHz sine, 2.32 Vrms at the input
# Captured level: 0.021384357 sample units RMS
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V = 2.32
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE = 0.021384357
# Volts per sample unit = known RMS voltage / captured RMS in sample units.
DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT = (
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V
/ DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE
)
# RMS voltage of a sine whose peak is exactly 1.0 sample unit (peak / sqrt(2)).
DEFAULT_INPUT_FULL_SCALE_SINE_RMS_V = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT / math.sqrt(2.0)
# How often the live console line is refreshed.
DEFAULT_UPDATE_INTERVAL_S = 0.1
# Calibration capture length. Use a steady REW sine generator tone, e.g. 1 kHz,
# and enter the calibrated true-RMS voltage measured at the soundcard input.
DEFAULT_CALIBRATION_DURATION_S = 3.0
# Calibration is rejected when any channel's measured RMS (in sample units) is
# below this value, because the resulting scale would be meaningless.
MIN_CALIBRATION_RMS_FS = 1e-4
# Warn visually when any selected input approaches digital full scale.
CLIPPING_WARNING_THRESHOLD_FS = 0.98
# NOTE(review): presumably the per-channel trace colors of the GUI waveform
# plot - the consumer is not visible in this part of the file.
OSCILLOSCOPE_TRACE_COLORS = ("#1f77b4", "#d62728", "#2ca02c", "#9467bd")
# Default directory in which CalibrationStore reads and writes its JSON files.
CALIBRATION_DIR = Path("calibrations")
@dataclass(frozen=True)
class InputConfig:
    """Immutable description of one input-capture session."""

    # sounddevice index of the input device to open.
    device_index: int
    # Zero-based indices of the device channels that are evaluated.
    channels_zero_based: list[int]
    # Requested sample rate in Hz; None selects the device default.
    samplerate: float | None
    # Audio callback block size in frames.
    blocksize: int
    # Factor that converts normalized sample units to volts.
    input_volts_per_sample_unit: float
    # Minimum time between two refreshes of the live console line, in seconds.
    update_interval_s: float

    @property
    def stream_channel_count(self) -> int:
        """Return how many device channels must be opened.

        The stream has to span every channel up to and including the highest
        selected one, so the count is that zero-based index plus one.
        """
        highest_selected = max(self.channels_zero_based)
        return highest_selected + 1
class CalibrationStore:
    """Read and write input calibration records stored as JSON files.

    Each record lives in its own ``calibration_<YYYYmmdd_HHMMSS>.json`` file,
    so sorting the file names lexicographically orders them chronologically.
    """

    def __init__(self, directory: Path = CALIBRATION_DIR):
        """Remember the directory that holds the calibration files."""
        self.directory = directory

    def latest(self) -> dict | None:
        """Return the newest usable calibration record, or None.

        Files are tried from newest to oldest. A file is skipped when it
        cannot be read or decoded, is not valid JSON, is not a JSON object, or
        lacks the ``volts_per_sample_unit`` key. The returned dict carries an
        extra ``_path`` entry naming the file it was loaded from.
        """
        if not self.directory.exists():
            return None
        files = sorted(self.directory.glob("calibration_*.json"))
        for path in reversed(files):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for files that are not UTF-8.
                continue
            # json.loads may legally return a list, number, string, ...;
            # only a JSON object can be a calibration record.
            if isinstance(data, dict) and "volts_per_sample_unit" in data:
                data["_path"] = str(path)
                return data
        return None

    def latest_scale(self, fallback: float = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT) -> float:
        """Return the newest stored volts-per-sample-unit scale.

        ``fallback`` is returned when no record exists or when the stored
        value is not a positive, finite number.
        """
        data = self.latest()
        if data is None:
            return fallback
        try:
            scale = float(data["volts_per_sample_unit"])
        except (TypeError, ValueError, OverflowError):
            # OverflowError: an integer too large to be represented as float.
            return fallback
        if math.isfinite(scale) and scale > 0:
            return scale
        return fallback

    def save(
        self,
        *,
        volts_per_sample_unit: float,
        known_rms_v: float,
        measured_rms_sample_by_channel: dict[str, float],
        peak_sample_by_channel: dict[str, float],
        device_index: int,
        channels_zero_based: list[int],
        samplerate: float | None,
        source: str,
    ) -> Path:
        """Write one calibration record and return the path of the new file.

        The directory is created when missing. The file name is derived from
        the current local time with one-second resolution. Channels are stored
        one-based, matching what the user sees in the console output.
        """
        self.directory.mkdir(parents=True, exist_ok=True)
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        path = self.directory / f"calibration_{timestamp}.json"
        data = {
            "created_at": now.isoformat(timespec="seconds"),
            "source": source,
            "volts_per_sample_unit": volts_per_sample_unit,
            "equivalent_full_scale_sine_rms_v": volts_per_sample_unit / math.sqrt(2.0),
            "known_rms_v": known_rms_v,
            "measured_rms_sample_by_channel": measured_rms_sample_by_channel,
            "peak_sample_by_channel": peak_sample_by_channel,
            "device_index": device_index,
            "channels_one_based": [channel + 1 for channel in channels_zero_based],
            "samplerate": samplerate,
        }
        path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        return path
def load_dependencies():
    """Import the third-party packages and return the sounddevice module.

    sounddevice is imported first, then numpy; numpy is additionally published
    through the module-level ``np`` global.

    Raises:
        SystemExit: With an installation hint when either package is missing.
    """
    global np

    try:
        import sounddevice as sounddevice_module  # type: ignore
    except ImportError as err:
        hint = (
            "The 'sounddevice' package is required.\n"
            "Install it with: python -m pip install sounddevice"
        )
        raise SystemExit(hint) from err

    try:
        import numpy  # type: ignore
    except ImportError as err:
        hint = (
            "The 'numpy' package is required.\n"
            "Install it with: python -m pip install numpy"
        )
        raise SystemExit(hint) from err

    np = numpy
    return sounddevice_module
def resolve_config(args: argparse.Namespace, sd) -> InputConfig:
    """Build the InputConfig for a capture session.

    The input devices are always listed first. The device and the channels are
    taken from the command line when given; otherwise the user is prompted.

    Raises:
        SystemExit: If the channel selection given on the command line cannot
            be parsed or does not fit the chosen device.
    """
    manager = SoundDeviceManager(sd)
    listed_devices = manager.print_input_devices()

    if args.device is not None:
        device_index = args.device
    else:
        device_index = manager.prompt_input_device_index(listed_devices)

    device_info = manager.device_info(device_index)
    available_inputs = int(device_info["max_input_channels"])

    if args.channels is not None:
        try:
            selected_channels = parse_channel_list(args.channels)
            manager.validate_channels(selected_channels, available_inputs)
        except ValueError as err:
            raise SystemExit(f"Invalid channel selection: {err}") from err
    else:
        selected_channels = manager.prompt_channels(available_inputs)

    return InputConfig(
        device_index=device_index,
        channels_zero_based=selected_channels,
        samplerate=args.samplerate,
        blocksize=args.blocksize,
        input_volts_per_sample_unit=args.input_volts_per_sample_unit,
        update_interval_s=args.update_interval,
    )
def compute_levels(
    block,
    selected_channels_zero_based: list[int],
    input_volts_per_sample_unit: float,
) -> tuple:
    """Compute per-channel levels for one audio block.

    Args:
        block: Two-dimensional array indexed as ``block[frame, channel]``.
        selected_channels_zero_based: Column indices to evaluate.
        input_volts_per_sample_unit: Factor from sample units to volts.

    Returns:
        ``(rms_fs, rms_v, peak_fs)``: RMS in sample units, RMS in volts and
        absolute peak in sample units, one entry per selected channel.

    Raises:
        RuntimeError: If numpy has not been loaded yet.
    """
    if np is None:
        raise RuntimeError("numpy has not been loaded")

    samples = block[:, selected_channels_zero_based].astype(np.float64, copy=False)
    scaled = samples * input_volts_per_sample_unit

    # Root of the mean square along the frame axis, once per unit system.
    rms_fs = np.sqrt(np.square(samples).mean(axis=0))
    rms_v = np.sqrt(np.square(scaled).mean(axis=0))
    peak_fs = np.abs(samples).max(axis=0)
    return rms_fs, rms_v, peak_fs
def compute_volts_per_sample_unit(known_rms_v: float, measured_rms_sample) -> tuple:
    """Derive the volts-per-sample-unit scale from a reference measurement.

    Args:
        known_rms_v: True-RMS voltage of the reference tone at the input.
        measured_rms_sample: Captured RMS level(s) in sample units, scalar or
            one value per channel.

    Returns:
        ``(per_channel, average)``: the scale for every channel as a numpy
        array and the mean over all channels as a float.

    Raises:
        RuntimeError: If numpy has not been loaded yet.
        ValueError: If the known voltage is not a positive finite number, or
            if any measured level is non-finite or below
            MIN_CALIBRATION_RMS_FS.
    """
    if np is None:
        raise RuntimeError("numpy has not been loaded")
    if not math.isfinite(known_rms_v) or known_rms_v <= 0:
        raise ValueError("known RMS voltage must be a positive, finite number")
    measured = np.asarray(measured_rms_sample, dtype=np.float64)
    # NaN compares False against any threshold and inf would produce a zero
    # scale, so non-finite levels have to be rejected explicitly.
    if not np.all(np.isfinite(measured)):
        raise ValueError("measured RMS is not finite; check the captured input signal")
    if np.any(measured < MIN_CALIBRATION_RMS_FS):
        raise ValueError(
            "measured RMS is too close to zero; start REW's generator and check the selected input channel"
        )
    per_channel = known_rms_v / measured
    return per_channel, float(np.mean(per_channel))
def format_levels(
    channels_zero_based: list[int],
    rms_fs,
    rms_v,
    peak_fs,
) -> str:
    """Render the per-channel levels as one console line.

    Channels are shown one-based and separated by `` | ``. A `` CLIP?`` marker
    is appended to a channel whose peak reaches
    CLIPPING_WARNING_THRESHOLD_FS.
    """

    def describe(position: int, channel: int) -> str:
        # The level sequences are indexed by position in the selection,
        # not by the device channel number.
        peak = peak_fs[position]
        clip_marker = " CLIP?" if peak >= CLIPPING_WARNING_THRESHOLD_FS else ""
        return (
            f"ch {channel + 1}: "
            f"rms={rms_fs[position]:.6f} sample / {rms_v[position]:.6f} V, "
            f"peak={peak:.6f} FS{clip_marker}"
        )

    return " | ".join(
        describe(position, channel)
        for position, channel in enumerate(channels_zero_based)
    )
def smooth_headroom_db(previous: float | None, raw: float, alpha: float) -> float:
    """Exponentially smooth a headroom value in dB.

    ``alpha`` is the weight of the previous value. The raw value is returned
    unchanged when there is no previous value, when either value is not
    finite, or when ``alpha`` is not positive.
    """
    if previous is None:
        return raw
    if not (math.isfinite(previous) and math.isfinite(raw)):
        return raw
    if alpha <= 0.0:
        return raw
    return alpha * previous + (1.0 - alpha) * raw
def format_headroom(headroom: dict | None) -> str:
    """Format a headroom result for the console.

    A missing or empty result is reported as disabled. Otherwise the worst
    headroom in dB, the critical frequency in Hz and the status are shown.
    """
    if not headroom:
        return "WinISD headroom disabled"

    worst_db = headroom["worst_headroom_db"]
    critical_hz = headroom["critical_frequency_hz"]
    status = headroom["status"]
    return f"WinISD headroom={worst_db:+6.2f} dB @ {critical_hz:7.1f} Hz | {status}"
class PowerConsoleRunner:
    """Live console runner for estimated amplifier output power.

    Each configured "way" maps one input channel to an impedance curve and an
    amplifier gain. Captured samples are buffered per way, processed in
    overlapping FFT frames by a PowerEstimator, and the newest result of every
    way is printed on one continuously refreshed console line.
    """

    def __init__(self, sd, args: argparse.Namespace):
        """Resolve the ways and the device, load impedances, prepare capture.

        Args:
            sd: The imported sounddevice module.
            args: Parsed command-line arguments.

        Raises:
            SystemExit: If no way is configured or two ways share a channel.
        """
        self.sd = sd
        self.args = args
        self.device_manager = SoundDeviceManager(sd)
        self.ways = self._build_way_configs()
        self.channels = sorted({way.channel_zero_based for way in self.ways})
        self.device_index = self._resolve_device_index()
        self._validate_audio_config()
        self.impedances = {
            way.name: ImpedanceCurve.from_file(way.impedance_path)
            for way in self.ways
        }
        self.capture = QueuedInputStream(
            sd=sd,
            device_index=self.device_index,
            channels_zero_based=self.channels,
            samplerate=args.samplerate,
            blocksize=args.blocksize,
        )
        # Filled by _create_estimators() once the stream's actual sample rate
        # is known.
        self.estimators: dict[str, PowerEstimator] = {}
        self.buffers: dict[str, object] = {}
        self.headroom_curves: dict[str, WinISDLimitCurve] = {}
        self.headroom_smoothed_db: dict[str, float] = {}
        self.hop_size = 0

    @staticmethod
    def parse_way(text: str) -> WayConfig:
        """Parse a ``NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB]`` way description.

        The text after the last colon of the file part is taken as the gain
        only when it parses as a number; otherwise the colon is treated as
        part of the path (e.g. a Windows drive letter) and
        DEFAULT_AMP_GAIN_DB is used.

        Raises:
            argparse.ArgumentTypeError: If the text is malformed, the name or
                the impedance file is empty, the channel is invalid, or the
                gain is not a finite number.
        """
        parts = text.split(":", 2)
        if len(parts) != 3:
            raise argparse.ArgumentTypeError(
                "way must be NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:woofer.csv:34"
            )
        name = parts[0].strip()
        if not name:
            raise argparse.ArgumentTypeError("way name must not be empty")
        try:
            channel = parse_channel(parts[1].strip())
        except ValueError as exc:
            raise argparse.ArgumentTypeError(str(exc)) from exc
        path_text = parts[2].strip()
        gain_db = DEFAULT_AMP_GAIN_DB
        if ":" in path_text:
            maybe_path, maybe_gain = path_text.rsplit(":", 1)
            try:
                parsed_gain = float(maybe_gain)
            except ValueError:
                # Not a number: the colon belongs to the path itself.
                pass
            else:
                # float() also accepts "nan" and "inf", which would silently
                # turn every power value into NaN or infinity.
                if not math.isfinite(parsed_gain):
                    raise argparse.ArgumentTypeError("way gain must be a finite number of dB")
                gain_db = parsed_gain
                path_text = maybe_path.strip()
        if not path_text:
            # Path("") would silently become the current directory.
            raise argparse.ArgumentTypeError("way impedance file must not be empty")
        return WayConfig(
            name=name,
            channel_zero_based=channel,
            impedance_path=Path(path_text),
            amp_gain_db=gain_db,
        )

    def _build_way_configs(self) -> list[WayConfig]:
        """Collect the ways from ``--way`` and the LF/HF shortcut options.

        Raises:
            SystemExit: If no way is configured or an input channel is
                assigned to more than one way.
        """
        ways: list[WayConfig] = []
        if self.args.way:
            ways.extend(self.args.way)
        if self.args.lf_impedance:
            ways.append(
                WayConfig(
                    name="LF",
                    channel_zero_based=self.args.lf_channel,
                    impedance_path=Path(self.args.lf_impedance),
                    amp_gain_db=self.args.lf_amp_gain_db,
                )
            )
        if self.args.hf_impedance:
            ways.append(
                WayConfig(
                    name="HF",
                    channel_zero_based=self.args.hf_channel,
                    impedance_path=Path(self.args.hf_impedance),
                    amp_gain_db=self.args.hf_amp_gain_db,
                )
            )
        if not ways:
            raise SystemExit(
                "Configure at least one way with --way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB] "
                "or --lf-impedance/--hf-impedance."
            )
        # Every input channel may feed one way only.
        seen: dict[int, str] = {}
        for way in ways:
            if way.channel_zero_based in seen:
                raise SystemExit(
                    f"Input channel {way.channel_zero_based + 1} is used by both "
                    f"{seen[way.channel_zero_based]} and {way.name}."
                )
            seen[way.channel_zero_based] = way.name
        return ways

    def _resolve_device_index(self) -> int:
        """List the input devices and return the chosen device index.

        The command-line value wins; otherwise the user is prompted.
        """
        entries = self.device_manager.print_input_devices()
        if self.args.device is not None:
            return self.args.device
        return self.device_manager.prompt_input_device_index(entries)

    def _validate_audio_config(self) -> None:
        """Check the selected channels against the device's input channel count."""
        device = self.device_manager.device_info(self.device_index)
        max_input_channels = int(device["max_input_channels"])
        self.device_manager.validate_channels(self.channels, max_input_channels)

    def _create_estimators(self) -> None:
        """Create one estimator and one empty sample buffer per way.

        Must run after the stream has been started because the estimators
        need the stream's actual sample rate.

        Raises:
            SystemExit: If the overlap leaves a hop size of zero or less.
        """
        actual_samplerate = self.capture.actual_samplerate
        # Number of new samples between two successive FFT frames.
        self.hop_size = int(round(self.args.power_fft_size * (1.0 - self.args.power_overlap)))
        if self.hop_size <= 0:
            raise SystemExit("--power-overlap is too high; resulting hop size is zero")
        self.estimators = {
            way.name: PowerEstimator(
                way=way,
                impedance=self.impedances[way.name],
                samplerate=actual_samplerate,
                fft_size=self.args.power_fft_size,
                input_volts_per_sample_unit=self.args.input_volts_per_sample_unit,
                smoothing_alpha=self.args.power_smoothing_alpha,
            )
            for way in self.ways
        }
        self.buffers = {
            way.name: np.zeros(0, dtype=np.float64)
            for way in self.ways
        }
        self._create_headroom_curves()

    def _create_headroom_curves(self) -> None:
        """Load the WinISD limit curve for every way named "LF".

        Headroom stays disabled when it is switched off on the command line,
        when no LF way exists, or when the curve cannot be loaded (the reason
        is then reported on stderr).
        """
        self.headroom_curves = {}
        self.headroom_smoothed_db = {}
        if self.args.disable_winisd_headroom:
            return
        path = Path(self.args.winisd_max_power)
        lf_ways = [way for way in self.ways if way.name.upper() == "LF"]
        if not lf_ways:
            return
        try:
            curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
        except Exception as exc:
            # Best effort: a missing or broken limit file must not stop the
            # power estimation itself.
            print(f"WinISD headroom disabled: {exc}", file=sys.stderr)
            return
        for way in lf_ways:
            self.headroom_curves[way.name] = curve

    def format_power_line(self, name: str, frame: PowerFrame) -> str:
        """Format one way's power frame, with headroom when available.

        Note that this also advances the smoothed headroom state of the way.
        """
        headroom = self._compute_headroom(name, frame)
        line = (
            f"{name}: "
            f"Vdsp={frame.rms_input_v:9.4f} Vrms, "
            f"Vamp={frame.rms_amp_v:9.2f} Vrms, "
            f"P={frame.total_p_w:10.2f} W, "
            f"Q={frame.total_q_var:10.2f} var, "
            f"Sapp={frame.total_s_va:10.2f} VA"
        )
        if headroom:
            line += ", " + format_headroom(headroom)
        return line

    def _compute_headroom(self, name: str, frame: PowerFrame) -> dict | None:
        """Return the smoothed headroom result for a way, or None.

        None is returned when the way has no limit curve. Otherwise the worst
        headroom reported by the curve is smoothed over successive calls and
        the status and over-limit flag are re-derived from the smoothed value.
        """
        curve = self.headroom_curves.get(name)
        if curve is None:
            return None
        headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz)
        raw = float(headroom["worst_headroom_db"])
        smoothed = smooth_headroom_db(
            self.headroom_smoothed_db.get(name),
            raw,
            self.args.headroom_smoothing_alpha,
        )
        self.headroom_smoothed_db[name] = smoothed
        headroom["worst_headroom_db"] = smoothed
        headroom["status"] = headroom_status(smoothed)
        headroom["over_limit_bool"] = bool(math.isfinite(smoothed) and smoothed <= 0.0)
        return headroom

    def run(self) -> int:
        """Capture audio and print power estimates until Ctrl+C is pressed.

        Returns:
            0 after the user has interrupted the loop.

        Raises:
            SystemExit: If the input stream cannot be opened or the estimator
                configuration is invalid.
        """
        try:
            self.capture.start()
        except Exception as exc:
            raise SystemExit(f"Could not open input stream: {exc}") from exc
        try:
            self._create_estimators()
            actual_samplerate = self.capture.actual_samplerate
            print()
            print("Starting DSP-output power estimator. Press Ctrl+C to stop.")
            print(f"Device index: {self.device_index}")
            print(f"Actual samplerate: {actual_samplerate:g} Hz")
            print(
                f"FFT size: {self.args.power_fft_size}, hop size: {self.hop_size}, "
                f"df: {actual_samplerate / self.args.power_fft_size:g} Hz"
            )
            print(f"Input voltage scale: {self.args.input_volts_per_sample_unit:g} V / sample unit")
            print("Assumption: ideal linear amplifier voltage gain per way.")
            if self.headroom_curves:
                print(f"WinISD headroom: {Path(self.args.winisd_max_power)}")
                # The excursion scaling check is optional and only runs when
                # both excursion exports are present.
                if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists():
                    try:
                        print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message)
                    except Exception as exc:
                        print(f"Excursion check skipped: {exc}")
            else:
                print("WinISD headroom: disabled")
            for way in self.ways:
                curve = self.impedances[way.name]
                print(
                    f" {way.name}: input ch {way.channel_zero_based + 1}, "
                    f"amp gain {way.amp_gain_db:g} dB ({way.amp_gain_linear:g}x), "
                    f"impedance {curve.source}, range {curve.frequency_hz[0]:g}-{curve.frequency_hz[-1]:g} Hz"
                )
            print()
            latest_frames: dict[str, PowerFrame] = {}
            last_print = 0.0
            while True:
                try:
                    block, status = self.capture.queue.get(timeout=0.5)
                except queue.Empty:
                    continue
                if status:
                    print(f"\nStream status: {status}", file=sys.stderr)
                for way in self.ways:
                    self.buffers[way.name] = np.concatenate(
                        (self.buffers[way.name], block[:, way.channel_zero_based].astype(np.float64))
                    )
                    # Process every complete FFT frame, advancing by the hop
                    # size so that successive frames overlap.
                    while len(self.buffers[way.name]) >= self.args.power_fft_size:
                        latest_frames[way.name] = self.estimators[way.name].estimate(
                            self.buffers[way.name][: self.args.power_fft_size]
                        )
                        self.buffers[way.name] = self.buffers[way.name][self.hop_size :]
                now = time.monotonic()
                if latest_frames and now - last_print >= self.args.power_update_interval:
                    lines = [
                        self.format_power_line(name, latest_frames[name])
                        for name in sorted(latest_frames)
                    ]
                    # "\r" rewrites the same console line on every refresh.
                    print("\r" + " | ".join(lines), end="", flush=True)
                    last_print = now
        except KeyboardInterrupt:
            print("\nStopped.")
            return 0
        finally:
            self.capture.stop()
def run_input_calibration(
    config: InputConfig,
    sd,
    known_rms_v: float,
    duration_s: float,
    wait_for_ready: bool = True,
) -> tuple[float, object, object]:
    """Measure a reference tone and derive the volts-per-sample-unit scale.

    Args:
        config: Device, channel and stream settings of the capture.
        sd: The imported sounddevice module.
        known_rms_v: True-RMS voltage of the reference tone at the input.
        duration_s: Measurement length in seconds.
        wait_for_ready: When true, wait for Enter before measuring.

    Returns:
        ``(average_scale, measured_rms_fs, peak_fs)`` where the two arrays
        hold one value per selected channel, in sample units.

    Raises:
        RuntimeError: If numpy has not been loaded yet.
        SystemExit: If the stream cannot be opened, no samples arrive, or the
            measured level is unusable.
    """
    if np is None:
        raise RuntimeError("numpy has not been loaded")
    capture = QueuedInputStream(
        sd=sd,
        device_index=config.device_index,
        channels_zero_based=config.channels_zero_based,
        samplerate=config.samplerate,
        blocksize=config.blocksize,
        max_queue_blocks=32,
    )
    try:
        capture.start()
    except Exception as exc:
        raise SystemExit(f"Could not open input stream: {exc}") from exc

    sums = np.zeros(len(config.channels_zero_based), dtype=np.float64)
    frame_count = 0
    peak_fs = np.zeros(len(config.channels_zero_based), dtype=np.float64)
    # From here on the stream is open: everything up to the end of the
    # measurement is guarded so that the stream is stopped even when the
    # prompt is aborted with Ctrl+C or end-of-file.
    try:
        print()
        print("Input calibration")
        print("Use REW's generator with a steady sine tone, e.g. 1 kHz.")
        print("Keep the tone below clipping, then enter its calibrated true-RMS voltage.")
        print(f"Device index: {config.device_index}")
        print(f"Selected channels: {format_channel_list(config.channels_zero_based)}")
        print(f"Known RMS voltage: {known_rms_v:g} V")
        if wait_for_ready:
            input("Start the REW sine generator now, then press Enter to measure...")
            # Blocks queued while waiting were captured before the user
            # confirmed the tone; discard them so they cannot bias the RMS.
            # NOTE(review): assumes capture.queue is a queue.Queue (it already
            # raises queue.Empty from get()) - confirm in lspower.audio.
            while True:
                try:
                    capture.queue.get_nowait()
                except queue.Empty:
                    break
        print(f"Measuring for {duration_s:g} s...")
        deadline = time.monotonic() + duration_s
        while time.monotonic() < deadline:
            try:
                block, status = capture.queue.get(timeout=0.5)
            except queue.Empty:
                continue
            if status:
                print(f"Stream status: {status}")
            selected = block[:, config.channels_zero_based].astype(np.float64, copy=False)
            # Accumulate the sum of squares so that the RMS covers the whole
            # measurement, not just a single block.
            sums += np.sum(selected * selected, axis=0)
            frame_count += selected.shape[0]
            peak_fs = np.maximum(peak_fs, np.max(np.abs(selected), axis=0))
    finally:
        capture.stop()

    if frame_count == 0:
        raise SystemExit("No input samples were captured during calibration.")
    measured_rms_fs = np.sqrt(sums / frame_count)
    try:
        per_channel, average = compute_volts_per_sample_unit(known_rms_v, measured_rms_fs)
    except ValueError as exc:
        measured_text = ", ".join(
            f"ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS"
            for idx, channel in enumerate(config.channels_zero_based)
        )
        raise SystemExit(f"Calibration failed: {exc}\nMeasured: {measured_text}") from exc
    print()
    for idx, channel in enumerate(config.channels_zero_based):
        warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else ""
        print(
            f"ch {channel + 1}: measured {measured_rms_fs[idx]:.9f} sample units RMS, "
            f"peak {peak_fs[idx]:.6f} FS{warning}, "
            f"scale {per_channel[idx]:.9g} V / sample unit"
        )
    print(f"Average voltage scale: {average:.9g} V / sample unit")
    print(f"Equivalent full-scale sine RMS: {average / math.sqrt(2.0):.9g} Vrms")
    print(f"Use: --input-volts-per-sample-unit {average:.9g}")
    return average, measured_rms_fs, peak_fs
def run_capture(config: InputConfig, sd) -> None:
    """Monitor the selected inputs and print their levels until interrupted.

    The console line is refreshed at most once per ``update_interval_s``;
    blocks that arrive in between are dropped without being evaluated.

    Raises:
        SystemExit: If the input stream cannot be opened.
    """
    stream = QueuedInputStream(
        sd=sd,
        device_index=config.device_index,
        channels_zero_based=config.channels_zero_based,
        samplerate=config.samplerate,
        blocksize=config.blocksize,
        max_queue_blocks=8,
    )
    try:
        stream.start()
    except Exception as err:
        raise SystemExit(f"Could not open input stream: {err}") from err

    channels = config.channels_zero_based
    scale = config.input_volts_per_sample_unit
    print()
    print("Starting input monitor. Press Ctrl+C to stop.")
    print(f"Device index: {config.device_index}")
    print(f"Selected channels: {format_channel_list(channels)}")
    print(f"Voltage scale: {scale:g} V / sample unit")
    print(f"Equivalent full-scale sine RMS: {scale / math.sqrt(2.0):g} Vrms")

    previous_print = 0.0
    try:
        print(f"Actual stream samplerate: {stream.actual_samplerate:g} Hz")
        print()
        while True:
            try:
                block, status = stream.queue.get(timeout=0.5)
            except queue.Empty:
                continue

            now = time.monotonic()
            if now - previous_print < config.update_interval_s:
                # Too early for the next refresh; this block is skipped.
                continue

            levels = compute_levels(
                block,
                config.channels_zero_based,
                config.input_volts_per_sample_unit,
            )
            status_suffix = f" | status: {status}" if status else ""
            level_text = format_levels(config.channels_zero_based, *levels)
            # "\r" rewrites the same console line on every refresh.
            print(f"\r{level_text}{status_suffix}", end="", flush=True)
            previous_print = now
    finally:
        stream.stop()
class LevelMonitorGUI:
def __init__(self, sd, args: argparse.Namespace):
    """Create the Tk window, the runtime state and the widget variables.

    Args:
        sd: The imported sounddevice module (handed to SoundDeviceManager).
        args: Parsed command-line arguments; they seed the initial values of
            several input fields.

    Raises:
        SystemExit: If the device manager reports no input-capable device.
    """
    # tkinter is imported inside the constructor rather than at module level.
    import tkinter as tk
    from tkinter import messagebox, ttk
    self.sd = sd
    self.args = args
    # Keep the tkinter modules on the instance so that the other methods can
    # reach them without importing again.
    self.tk = tk
    self.messagebox = messagebox
    self.ttk = ttk
    self.root = tk.Tk()
    self.root.title("Soundcard Input Voltage Monitor")
    self.root.geometry("1180x820")
    self.root.minsize(980, 680)
    self.device_manager = SoundDeviceManager(sd)
    # Runtime state; everything starts empty/None while no capture is running.
    self.capture: QueuedInputStream | None = None
    self.config: InputConfig | None = None
    self.calibration_state: dict | None = None
    # Input spectrum state, keyed by an int (presumably the channel index).
    self.input_spectrum_estimators: dict[int, VoltageSpectrumEstimator] = {}
    self.input_spectrum_buffers: dict[int, object] = {}
    self.input_spectrum_frames: dict[int, VoltageSpectrumFrame] = {}
    # Power estimation state, keyed by way name ("LF"/"HF" rows are
    # registered in power_rows by _build_power_config_row()).
    self.power_estimators: dict[str, PowerEstimator] = {}
    self.power_buffers: dict[str, object] = {}
    self.power_rows: dict[str, dict] = {}
    self.power_history: dict[str, list[tuple[float, float, float]]] = {}
    self.power_max_hold_w: dict[str, float] = {}
    self.headroom_curves: dict[str, WinISDLimitCurve] = {}
    self.headroom_smoothed_db: dict[str, float] = {}
    self.power_history_start_s: float | None = None
    self.power_hop_size = 0
    self.level_rows: list[dict] = []
    self.input_metric_rows: dict[int, dict] = {}
    self.device_entries = self.device_manager.input_devices()
    if not self.device_entries:
        raise SystemExit("No input-capable sounddevice devices were found.")
    # Tk variables behind the "Input setup" fields.
    self.device_var = tk.StringVar()
    self.channels_var = tk.StringVar(value=args.channels or "1,2")
    # An empty sample rate field stands for "no sample rate requested".
    self.samplerate_var = tk.StringVar(
        value="" if args.samplerate is None else f"{args.samplerate:g}"
    )
    self.blocksize_var = tk.StringVar(value=str(args.blocksize))
    self.calibration_var = tk.StringVar(value=f"{args.input_volts_per_sample_unit:g}")
    # Falls back to the built-in reference voltage when no (non-zero)
    # calibration voltage was given on the command line.
    self.known_rms_v_var = tk.StringVar(
        value=(
            f"{args.calibrate_known_rms_v:g}"
            if args.calibrate_known_rms_v
            else f"{DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V:g}"
        )
    )
    self.calibration_duration_var = tk.StringVar(value=f"{args.calibration_duration:g}")
    # Plot settings; the waveform Y limit is labelled "blank = auto" in the UI.
    self.waveform_y_limit_var = tk.StringVar(value="")
    self.power_history_window_var = tk.StringVar(value="30")
    # Power estimation settings start from the module defaults.
    self.power_fft_size_var = tk.StringVar(value=str(DEFAULT_POWER_FFT_SIZE))
    self.power_overlap_var = tk.StringVar(value=f"{DEFAULT_POWER_OVERLAP:g}")
    self.power_smoothing_var = tk.StringVar(value=f"{DEFAULT_POWER_SMOOTHING_ALPHA:g}")
    # WinISD headroom starts enabled only when neither the command line nor
    # the module-level switch disables it.
    self.winisd_enabled_var = tk.BooleanVar(value=not args.disable_winisd_headroom and ENABLE_WINISD_HEADROOM)
    self.winisd_max_power_var = tk.StringVar(value=str(args.winisd_max_power or WINISD_MAX_POWER_FILE))
    self.headroom_smoothing_var = tk.StringVar(value=f"{args.headroom_smoothing_alpha:g}")
    self.winisd_status_var = tk.StringVar(value="WinISD headroom uses per-bin P/Pmax for LF.")
    # Per-way settings: LF starts enabled on channel 1 and picks up
    # "impedance.txt" when that file exists in the working directory; HF
    # starts disabled on channel 2 without an impedance file.
    self.lf_enabled_var = tk.BooleanVar(value=True)
    self.lf_channel_var = tk.StringVar(value="1")
    self.lf_impedance_var = tk.StringVar(value="impedance.txt" if Path("impedance.txt").exists() else "")
    self.lf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}")
    self.hf_enabled_var = tk.BooleanVar(value=False)
    self.hf_channel_var = tk.StringVar(value="2")
    self.hf_impedance_var = tk.StringVar(value="")
    self.hf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}")
    self.status_var = tk.StringVar(value="Stopped")
    # Three progress bar styles (green, amber, red) for the power meters.
    self.power_style = ttk.Style()
    self.power_style.configure("PowerOk.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#22a447")
    self.power_style.configure("PowerWarn.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#e6a400")
    self.power_style.configure("PowerCritical.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#d62728")
    self._build_widgets()
    self._select_initial_device(args.device)
    # Route the window-close button through close().
    self.root.protocol("WM_DELETE_WINDOW", self.close)
    # Schedule the first audio queue poll 50 ms after construction.
    self.root.after(50, self.poll_audio_queue)
def _build_widgets(self) -> None:
    """Create and lay out every widget of the main window.

    From top to bottom: the input setup form, the input-side metrics table,
    the power estimation settings with one row per way, and the plot area
    with waveform, PSD and power history canvases.
    """
    ttk = self.ttk
    outer = ttk.Frame(self.root, padding=12)
    outer.pack(fill="both", expand=True)
    # --- Input setup: device, channels, stream settings and calibration ---
    config_frame = ttk.LabelFrame(outer, text="Input setup", padding=10)
    config_frame.pack(fill="x")
    config_frame.columnconfigure(1, weight=1)
    config_frame.columnconfigure(5, weight=1)
    ttk.Label(config_frame, text="Device").grid(row=0, column=0, sticky="w")
    self.device_combo = ttk.Combobox(
        config_frame,
        textvariable=self.device_var,
        values=[entry.label for entry in self.device_entries],
        state="readonly",
    )
    self.device_combo.grid(row=0, column=1, columnspan=5, sticky="ew", padx=(8, 0))
    ttk.Label(config_frame, text="Channels").grid(row=1, column=0, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.channels_var, width=12).grid(
        row=1, column=1, sticky="w", padx=(8, 16), pady=(8, 0)
    )
    ttk.Label(config_frame, text="Sample rate").grid(row=1, column=2, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.samplerate_var, width=12).grid(
        row=1, column=3, sticky="w", padx=(8, 16), pady=(8, 0)
    )
    ttk.Label(config_frame, text="Blocksize").grid(row=1, column=4, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.blocksize_var, width=10).grid(
        row=1, column=5, sticky="w", padx=(8, 0), pady=(8, 0)
    )
    ttk.Label(config_frame, text="Voltage scale").grid(row=2, column=0, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.calibration_var, width=12).grid(
        row=2, column=1, sticky="w", padx=(8, 4), pady=(8, 0)
    )
    ttk.Label(config_frame, text="V per sample unit").grid(
        row=2, column=2, sticky="w", padx=(0, 16), pady=(8, 0)
    )
    # Stop starts disabled; no capture is running right after construction.
    self.start_button = ttk.Button(config_frame, text="Start", command=self.start)
    self.start_button.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(8, 0))
    self.stop_button = ttk.Button(config_frame, text="Stop", command=self.stop, state="disabled")
    self.stop_button.grid(row=2, column=4, sticky="ew", padx=(0, 16), pady=(8, 0))
    ttk.Label(config_frame, textvariable=self.status_var).grid(
        row=2, column=5, sticky="w", pady=(8, 0)
    )
    ttk.Label(config_frame, text="Cal signal").grid(row=3, column=0, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.known_rms_v_var, width=12).grid(
        row=3, column=1, sticky="w", padx=(8, 4), pady=(8, 0)
    )
    ttk.Label(config_frame, text="Vrms at input").grid(
        row=3, column=2, sticky="w", padx=(0, 16), pady=(8, 0)
    )
    ttk.Label(config_frame, text="Seconds").grid(row=3, column=3, sticky="w", pady=(8, 0))
    ttk.Entry(config_frame, textvariable=self.calibration_duration_var, width=10).grid(
        row=3, column=4, sticky="w", padx=(8, 16), pady=(8, 0)
    )
    # The Calibrate button starts disabled as well.
    self.calibrate_button = ttk.Button(
        config_frame,
        text="Calibrate",
        command=self.start_calibration,
        state="disabled",
    )
    self.calibrate_button.grid(row=3, column=5, sticky="ew", pady=(8, 0))
    # --- Input-side metrics table (header only; rows are added elsewhere) ---
    self.level_frame = ttk.LabelFrame(outer, text="Input-side metrics", padding=10)
    self.level_frame.pack(fill="x", pady=(12, 0))
    self.level_frame.columnconfigure(6, weight=1)
    input_note = ttk.Label(
        self.level_frame,
        text="Measured at DSP output / amplifier input, using the current voltage scale.",
    )
    input_note.grid(row=0, column=0, columnspan=7, sticky="w", pady=(0, 8))
    for col, heading in enumerate(("Ch", "RMS voltage", "Peak voltage", "Crest factor", "Crest", "Digital peak")):
        ttk.Label(self.level_frame, text=heading).grid(
            row=1,
            column=col,
            sticky="w" if col == 0 else "e",
            padx=(0, 18 if col < 5 else 0),
        )
    # Number of children that make up the static header (note + headings).
    # NOTE(review): presumably used to tell header widgets from per-channel
    # row widgets when the rows are rebuilt - confirm at the consumer.
    self.input_metric_static_widget_count = len(self.level_frame.winfo_children())
    # --- Power estimation: shared settings, WinISD options and way rows ---
    self.power_frame = ttk.LabelFrame(outer, text="Estimated amplifier output power", padding=10)
    self.power_frame.pack(fill="x", pady=(12, 0))
    self.power_frame.columnconfigure(3, weight=1)
    power_note = ttk.Label(
        self.power_frame,
        text="Output side is estimated from DSP voltage, ideal amplifier gain, and the impedance curve.",
    )
    power_note.grid(row=0, column=0, columnspan=10, sticky="w", pady=(0, 8))
    ttk.Label(self.power_frame, text="FFT").grid(row=1, column=0, sticky="w")
    ttk.Entry(self.power_frame, textvariable=self.power_fft_size_var, width=8).grid(
        row=1, column=1, sticky="w", padx=(8, 16)
    )
    ttk.Label(self.power_frame, text="Overlap").grid(row=1, column=2, sticky="e")
    ttk.Entry(self.power_frame, textvariable=self.power_overlap_var, width=8).grid(
        row=1, column=3, sticky="w", padx=(8, 16)
    )
    ttk.Label(self.power_frame, text="Smooth").grid(row=1, column=4, sticky="e")
    ttk.Entry(self.power_frame, textvariable=self.power_smoothing_var, width=8).grid(
        row=1, column=5, sticky="w", padx=(8, 16)
    )
    ttk.Checkbutton(self.power_frame, text="WinISD", variable=self.winisd_enabled_var).grid(
        row=2, column=0, sticky="w", pady=(8, 0)
    )
    ttk.Entry(self.power_frame, textvariable=self.winisd_max_power_var, width=44).grid(
        row=2, column=1, columnspan=3, sticky="ew", padx=(8, 12), pady=(8, 0)
    )
    ttk.Label(self.power_frame, text="Headroom smooth").grid(row=2, column=4, sticky="e", pady=(8, 0))
    ttk.Entry(self.power_frame, textvariable=self.headroom_smoothing_var, width=8).grid(
        row=2, column=5, sticky="w", padx=(8, 16), pady=(8, 0)
    )
    ttk.Label(self.power_frame, textvariable=self.winisd_status_var).grid(
        row=2, column=6, columnspan=9, sticky="w", pady=(8, 0)
    )
    # Column headings of the way table. The positions must match the columns
    # used by _build_power_config_row(): settings in 0-4, readouts in 5-12,
    # meter in 13, status light in 14 (no heading) and max hold in 15.
    headings = (
        "Way",
        "On",
        "Ch",
        "Impedance file",
        "Gain dB",
        "Vdsp",
        "Vamp",
        "P",
        "Q",
        "Sapp",
        "Headroom",
        "Crit Hz",
        "Limit",
        "P meter",
        "",
        "Max P",
    )
    for col, heading in enumerate(headings):
        ttk.Label(self.power_frame, text=heading).grid(
            row=3,
            column=col,
            sticky="w" if col <= 4 else "e",
            pady=(8, 0),
            padx=(0, 12 if col < 9 else 0),
        )
    self._build_power_config_row("LF", 4, self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var)
    self._build_power_config_row("HF", 5, self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var)
    # --- Plot area: 2 x 2 grid, the power history spans the bottom row ---
    plots_frame = ttk.Frame(outer)
    plots_frame.pack(fill="both", expand=True, pady=(12, 0))
    plots_frame.columnconfigure(0, weight=1)
    plots_frame.columnconfigure(1, weight=1)
    plots_frame.rowconfigure(0, weight=1)
    plots_frame.rowconfigure(1, weight=1)
    # Waveform plot (top left).
    self.scope_frame = ttk.LabelFrame(plots_frame, text="Input waveform, calibrated voltage", padding=10)
    self.scope_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 6))
    self.scope_frame.rowconfigure(0, weight=1)
    self.scope_frame.columnconfigure(0, weight=1)
    self.scope_canvas = self.tk.Canvas(
        self.scope_frame,
        background="#111111",
        highlightthickness=0,
        height=260,
    )
    self.scope_canvas.grid(row=0, column=0, sticky="nsew")
    self.scope_status_var = self.tk.StringVar(value="Waveform shows the newest input block in calibrated volts.")
    scope_controls = ttk.Frame(self.scope_frame)
    scope_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0))
    scope_controls.columnconfigure(3, weight=1)
    ttk.Label(scope_controls, text="Y limit").grid(row=0, column=0, sticky="w")
    ttk.Entry(scope_controls, textvariable=self.waveform_y_limit_var, width=10).grid(
        row=0, column=1, sticky="w", padx=(8, 4)
    )
    ttk.Label(scope_controls, text="V, blank = auto").grid(row=0, column=2, sticky="w")
    ttk.Label(scope_controls, textvariable=self.scope_status_var).grid(
        row=0, column=3, sticky="w", padx=(16, 0)
    )
    # Redraw the grid whenever the Y limit is edited or the canvas resizes.
    self.waveform_y_limit_var.trace_add("write", lambda *_args: self.draw_scope_grid())
    self.scope_canvas.bind("<Configure>", lambda _event: self.draw_scope_grid())
    self.draw_scope_grid()
    # PSD plot (top right).
    self.spectrum_frame = ttk.LabelFrame(plots_frame, text="Input PSD, Welch scaled", padding=10)
    self.spectrum_frame.grid(row=0, column=1, sticky="nsew", padx=(6, 0))
    self.spectrum_frame.rowconfigure(0, weight=1)
    self.spectrum_frame.columnconfigure(0, weight=1)
    self.spectrum_canvas = self.tk.Canvas(
        self.spectrum_frame,
        background="#111111",
        highlightthickness=0,
        height=260,
    )
    self.spectrum_canvas.grid(row=0, column=0, sticky="nsew")
    self.spectrum_status_var = self.tk.StringVar(value="PSD uses Hann/Welch scaling in V^2/Hz.")
    ttk.Label(self.spectrum_frame, textvariable=self.spectrum_status_var).grid(
        row=1, column=0, sticky="w", pady=(8, 0)
    )
    self.spectrum_canvas.bind("<Configure>", lambda _event: self.draw_spectrum_grid())
    self.draw_spectrum_grid()
    # Power history plot (bottom, full width).
    self.power_history_frame = ttk.LabelFrame(plots_frame, text="Output power history", padding=10)
    self.power_history_frame.grid(row=1, column=0, columnspan=2, sticky="nsew", pady=(12, 0))
    self.power_history_frame.rowconfigure(0, weight=1)
    self.power_history_frame.columnconfigure(0, weight=1)
    self.power_history_canvas = self.tk.Canvas(
        self.power_history_frame,
        background="#111111",
        highlightthickness=0,
        height=220,
    )
    self.power_history_canvas.grid(row=0, column=0, sticky="nsew")
    power_history_controls = ttk.Frame(self.power_history_frame)
    power_history_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0))
    power_history_controls.columnconfigure(3, weight=1)
    ttk.Label(power_history_controls, text="Window").grid(row=0, column=0, sticky="w")
    ttk.Entry(power_history_controls, textvariable=self.power_history_window_var, width=10).grid(
        row=0, column=1, sticky="w", padx=(8, 4)
    )
    ttk.Label(power_history_controls, text="s").grid(row=0, column=2, sticky="w")
    self.power_history_status_var = self.tk.StringVar(value="P and Sapp over the selected moving window.")
    ttk.Label(power_history_controls, textvariable=self.power_history_status_var).grid(
        row=0, column=3, sticky="w", padx=(16, 0)
    )
    # Editing the window length calls draw_power_history(), whereas a canvas
    # resize and the initial draw call draw_power_history_grid().
    self.power_history_window_var.trace_add("write", lambda *_args: self.draw_power_history())
    self.power_history_canvas.bind("<Configure>", lambda _event: self.draw_power_history_grid())
    self.draw_power_history_grid()
def _build_power_config_row(
    self,
    name: str,
    row: int,
    enabled_var,
    channel_var,
    impedance_var,
    gain_var,
) -> None:
    """Lay out one configuration/readout row inside ``self.power_frame``.

    Columns 0-4 hold the name label, enable checkbox, channel entry,
    impedance entry and gain entry. Columns 5-12 hold right-aligned metric
    labels, column 13 the meter, column 14 the status light and column 15
    the max-hold label. The created variables and widgets are stored in
    ``self.power_rows[name]``.
    """
    ttk = self.ttk
    parent = self.power_frame

    # Editable configuration widgets.
    ttk.Label(parent, text=name).grid(row=row, column=0, sticky="w", pady=4)
    ttk.Checkbutton(parent, variable=enabled_var).grid(row=row, column=1, sticky="w", pady=4)
    ttk.Entry(parent, textvariable=channel_var, width=5).grid(row=row, column=2, sticky="w", pady=4)
    ttk.Entry(parent, textvariable=impedance_var, width=44).grid(
        row=row, column=3, sticky="ew", padx=(8, 12), pady=4
    )
    ttk.Entry(parent, textvariable=gain_var, width=8).grid(row=row, column=4, sticky="w", pady=4)

    # One StringVar per displayed metric; "max_p" is gridded separately below.
    metric_keys = ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit")
    widgets = {key: self.tk.StringVar(value="-") for key in (*metric_keys, "max_p")}
    for column, key in enumerate(metric_keys, start=5):
        right_gap = 12 if column < 9 else 0
        readout = ttk.Label(parent, textvariable=widgets[key], width=12, anchor="e")
        readout.grid(row=row, column=column, sticky="e", padx=(0, right_gap), pady=4)

    meter = ttk.Progressbar(
        parent,
        orient="horizontal",
        mode="determinate",
        maximum=DEFAULT_POWER_CRITICAL_W,
        style="PowerOk.Horizontal.TProgressbar",
        length=150,
    )
    meter.grid(row=row, column=13, sticky="ew", padx=(8, 8), pady=4)

    # Small coloured dot that starts out green.
    light = self.tk.Canvas(parent, width=18, height=18, highlightthickness=0)
    light.grid(row=row, column=14, sticky="w", padx=(0, 6), pady=4)
    light_id = light.create_oval(3, 3, 15, 15, fill="#22a447", outline="#555555")

    ttk.Label(parent, textvariable=widgets["max_p"], width=12, anchor="e").grid(
        row=row, column=15, sticky="e", pady=4
    )

    widgets["meter"] = meter
    widgets["light"] = light
    widgets["light_id"] = light_id
    self.power_rows[name] = widgets
def _select_initial_device(self, requested_index: int | None) -> None:
selected = 0
if requested_index is not None:
for idx, entry in enumerate(self.device_entries):
if entry.index == requested_index:
selected = idx
break
else:
for idx, entry in enumerate(self.device_entries):
if "UltraLite" in entry.label:
selected = idx
break
self.device_combo.current(selected)
def _parse_gui_config(self) -> InputConfig:
    """Read the input-setup widgets and return a validated ``InputConfig``.

    An empty sample-rate field yields ``samplerate=None``. The update
    interval is taken from ``self.args.update_interval``.

    Raises:
        ValueError: if no device is selected, a numeric field cannot be
            parsed, or sample rate / blocksize / calibration is not a
            positive finite number. ``parse_channel_list`` and
            ``validate_channels`` may raise their own errors.
    """
    combo_index = self.device_combo.current()
    if combo_index < 0:
        raise ValueError("select an input device")
    entry = self.device_entries[combo_index]
    max_input_channels = int(entry.info["max_input_channels"])

    channels = parse_channel_list(self.channels_var.get())
    self.device_manager.validate_channels(channels, max_input_channels)

    samplerate_text = self.samplerate_var.get().strip()
    samplerate = float(samplerate_text) if samplerate_text else None
    blocksize = int(self.blocksize_var.get())
    calibration = float(self.calibration_var.get())

    # float() accepts "nan" and "inf", and neither is caught by a "<= 0"
    # comparison, so non-finite values are rejected explicitly.
    if samplerate is not None:
        if not math.isfinite(samplerate):
            raise ValueError("sample rate must be a finite number")
        if samplerate <= 0:
            raise ValueError("sample rate must be positive")
    if blocksize <= 0:
        raise ValueError("blocksize must be positive")
    if not math.isfinite(calibration):
        raise ValueError("volts per sample unit must be a finite number")
    if calibration <= 0:
        raise ValueError("volts per sample unit must be positive")

    return InputConfig(
        device_index=entry.index,
        channels_zero_based=channels,
        samplerate=samplerate,
        blocksize=blocksize,
        input_volts_per_sample_unit=calibration,
        update_interval_s=self.args.update_interval,
    )
def _parse_power_settings(self, config: InputConfig, actual_samplerate: float) -> None:
if np is None:
raise RuntimeError("numpy has not been loaded")
fft_size = int(self.power_fft_size_var.get())
overlap = float(self.power_overlap_var.get())
smoothing = float(self.power_smoothing_var.get())
headroom_smoothing = float(self.headroom_smoothing_var.get())
if fft_size <= 0 or fft_size % 2:
raise ValueError("power FFT size must be a positive even number")
if not 0.0 <= overlap < 1.0:
raise ValueError("power overlap must be >= 0 and < 1")
if not 0.0 <= smoothing < 1.0:
raise ValueError("power smoothing must be >= 0 and < 1")
if not 0.0 <= headroom_smoothing < 1.0:
raise ValueError("headroom smoothing must be >= 0 and < 1")
hop_size = int(round(fft_size * (1.0 - overlap)))
if hop_size <= 0:
raise ValueError("power overlap is too high")
ways: list[WayConfig] = []
specs = (
("LF", self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var),
("HF", self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var),
)
for name, enabled_var, channel_var, impedance_var, gain_var in specs:
if not enabled_var.get():
continue
impedance_path = impedance_var.get().strip()
if not impedance_path:
continue
channel = parse_channel_list(channel_var.get())[0]
if channel >= config.stream_channel_count:
raise ValueError(
f"{name} channel {channel + 1} is not captured; include it in Channels"
)
ways.append(
WayConfig(
name=name,
channel_zero_based=channel,
impedance_path=Path(impedance_path),
amp_gain_db=float(gain_var.get()),
)
)
names = {way.name for way in ways}
for name, row in self.power_rows.items():
if name not in names:
for key in ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p"):
row[key].set("-")
row["meter"]["value"] = 0.0
row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W
row["meter"].configure(style="PowerOk.Horizontal.TProgressbar")
row["light"].itemconfigure(row["light_id"], fill="#22a447")
self.power_estimators = {}
self.power_buffers = {}
self.power_history = {}
self.power_max_hold_w = {}
self.headroom_curves = {}
self.headroom_smoothed_db = {}
self.power_history_start_s = None
self.power_hop_size = hop_size
self.input_spectrum_estimators = {}
self.input_spectrum_buffers = {}
self.input_spectrum_frames = {}
for channel in config.channels_zero_based:
self.input_spectrum_estimators[channel] = VoltageSpectrumEstimator(
samplerate=actual_samplerate,
fft_size=fft_size,
input_volts_per_sample_unit=config.input_volts_per_sample_unit,
smoothing_alpha=smoothing,
)
self.input_spectrum_buffers[channel] = np.zeros(0, dtype=np.float64)
for way in ways:
impedance = ImpedanceCurve.from_file(way.impedance_path)
self.power_estimators[way.name] = PowerEstimator(
way=way,
impedance=impedance,
samplerate=actual_samplerate,
fft_size=fft_size,
input_volts_per_sample_unit=config.input_volts_per_sample_unit,
smoothing_alpha=smoothing,
)
self.power_buffers[way.name] = np.zeros(0, dtype=np.float64)
self.power_history[way.name] = []
self.power_max_hold_w[way.name] = 0.0
self._load_gui_winisd_curves()
def _load_gui_winisd_curves(self) -> None:
self.headroom_curves = {}
self.headroom_smoothed_db = {}
if not self.winisd_enabled_var.get():
self.winisd_status_var.set("WinISD headroom disabled.")
return
lf_estimator = self.power_estimators.get("LF")
if lf_estimator is None:
self.winisd_status_var.set("WinISD headroom disabled: LF way is not active.")
return
path = Path(self.winisd_max_power_var.get().strip() or WINISD_MAX_POWER_FILE)
try:
curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
except Exception as exc:
self.winisd_status_var.set(f"WinISD headroom disabled: {exc}")
return
self.headroom_curves["LF"] = curve
status = (
f"WinISD headroom active: {path}, "
f"{curve.frequency_hz[0]:.1f}-{curve.frequency_hz[-1]:.1f} Hz."
)
excursion_10w = WINISD_EXCURSION_10W_FILE
excursion_100w = WINISD_EXCURSION_100W_FILE
if excursion_10w.exists() and excursion_100w.exists():
try:
check = check_excursion_scaling(excursion_10w, excursion_100w)
status += " " + check.message
except Exception as exc:
status += f" Excursion check skipped: {exc}"
self.winisd_status_var.set(status)
def _process_input_spectrum_block(self, block) -> None:
if np is None or not self.input_spectrum_estimators:
return
for channel, estimator in self.input_spectrum_estimators.items():
samples = block[:, channel].astype(np.float64, copy=False)
self.input_spectrum_buffers[channel] = np.concatenate(
(self.input_spectrum_buffers[channel], samples)
)
latest = None
while len(self.input_spectrum_buffers[channel]) >= estimator.fft_size:
latest = estimator.estimate(
self.input_spectrum_buffers[channel][: estimator.fft_size]
)
self.input_spectrum_buffers[channel] = self.input_spectrum_buffers[channel][
self.power_hop_size :
]
if latest is None:
continue
self.input_spectrum_frames[channel] = latest
row = self.input_metric_rows.get(channel)
if row is not None:
row["rms"].set(f"{latest.rms_v:.3f} V")
row["peak"].set(f"{latest.peak_v:.3f} V")
row["crest"].set(f"{latest.crest_factor:.2f}x")
row["crest_db"].set(f"{latest.crest_factor_db:.2f} dB")
digital_peak = latest.peak_v / self.config.input_volts_per_sample_unit if self.config else 0.0
row["digital_peak"].set(f"{digital_peak:.5f} FS")
self.draw_spectrum()
def _process_power_block(self, block) -> None:
if np is None or not self.power_estimators:
return
for name, estimator in self.power_estimators.items():
way = estimator.way
samples = block[:, way.channel_zero_based].astype(np.float64, copy=False)
self.power_buffers[name] = np.concatenate((self.power_buffers[name], samples))
latest = None
while len(self.power_buffers[name]) >= estimator.fft_size:
latest = estimator.estimate(self.power_buffers[name][: estimator.fft_size])
self.power_buffers[name] = self.power_buffers[name][self.power_hop_size :]
if latest is None:
continue
row = self.power_rows.get(name)
if row is None:
continue
row["vdsp"].set(f"{latest.rms_input_v:.3f} V")
row["vamp"].set(f"{latest.rms_amp_v:.1f} V")
row["p"].set(f"{latest.total_p_w:.1f} W")
row["q"].set(f"{latest.total_q_var:.1f} var")
row["s"].set(f"{latest.total_s_va:.1f} VA")
headroom = self._compute_gui_headroom(name, latest)
if headroom is None:
row["headroom"].set("-")
row["crit_hz"].set("-")
row["limit"].set("-")
else:
row["headroom"].set(f"{headroom['worst_headroom_db']:+.1f} dB")
row["crit_hz"].set(f"{headroom['critical_frequency_hz']:.1f}")
row["limit"].set(headroom["status"])
self._update_power_meter(name, latest.total_p_w, headroom)
self._append_power_history(name, latest)
self.draw_power_history()
def _compute_gui_headroom(self, name: str, frame: PowerFrame) -> dict | None:
    """Return the smoothed headroom result for ``name``, or ``None``.

    ``None`` is returned when no headroom curve is loaded for the way.
    Otherwise the curve's result dict is returned with
    ``worst_headroom_db``, ``status`` and ``over_limit_bool`` replaced by
    values derived from the smoothed headroom. The smoothed value is kept
    in ``self.headroom_smoothed_db`` for the next call.
    """
    curve = self.headroom_curves.get(name)
    if curve is None:
        return None

    headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz)
    raw_db = float(headroom["worst_headroom_db"])

    # The smoothing entry is read live, so it may hold unparsable text.
    try:
        alpha = float(self.headroom_smoothing_var.get())
    except ValueError:
        alpha = HEADROOM_SMOOTHING_ALPHA
    # float("nan") parses fine and survives the min/max clamp below. It
    # would be stored as the next call's previous value, so fall back to
    # the default instead.
    if math.isnan(alpha):
        alpha = HEADROOM_SMOOTHING_ALPHA
    alpha = min(max(alpha, 0.0), 0.999999)

    smoothed_db = smooth_headroom_db(self.headroom_smoothed_db.get(name), raw_db, alpha)
    self.headroom_smoothed_db[name] = smoothed_db

    headroom["worst_headroom_db"] = smoothed_db
    headroom["status"] = headroom_status(smoothed_db)
    headroom["over_limit_bool"] = bool(math.isfinite(smoothed_db) and smoothed_db <= 0.0)
    return headroom
def _update_power_meter(self, name: str, power_w: float, headroom: dict | None = None) -> None:
    """Refresh the meter, status light and max-hold label of one way.

    With a headroom result whose ``critical_pmax_w`` is finite, the meter
    shows critical power as a percentage of that limit (capped at 100) and
    the colour follows ``headroom["status"]``. Otherwise the meter shows
    ``power_w`` against the fixed warning/critical thresholds.
    """
    row = self.power_rows.get(name)
    if row is None:
        return

    peak_hold_w = max(self.power_max_hold_w.get(name, 0.0), float(power_w))
    self.power_max_hold_w[name] = peak_hold_w

    palette = {
        "critical": ("#d62728", "PowerCritical.Horizontal.TProgressbar"),
        "warn": ("#e6a400", "PowerWarn.Horizontal.TProgressbar"),
        "ok": ("#22a447", "PowerOk.Horizontal.TProgressbar"),
    }
    meter = row["meter"]

    if headroom is not None and math.isfinite(float(headroom["critical_pmax_w"])):
        # Relative mode: percentage of the limit at the critical frequency.
        limit_w = max(float(headroom["critical_pmax_w"]), HEADROOM_EPS_W)
        used_w = max(float(headroom["critical_power_w"]), 0.0)
        meter_value = min((used_w / limit_w) * 100.0, 100.0)
        meter["maximum"] = 100.0
        status = headroom["status"]
        if status == "LIMIT EXCEEDED":
            level = "critical"
        elif status == "WARNING":
            level = "warn"
        else:
            level = "ok"
    else:
        # Absolute mode: watts against the fixed default thresholds.
        meter_value = min(max(float(power_w), 0.0), DEFAULT_POWER_CRITICAL_W)
        meter["maximum"] = DEFAULT_POWER_CRITICAL_W
        if power_w >= DEFAULT_POWER_CRITICAL_W:
            level = "critical"
        elif power_w >= DEFAULT_POWER_WARNING_W:
            level = "warn"
        else:
            level = "ok"

    color, style = palette[level]
    meter["value"] = meter_value
    meter.configure(style=style)
    row["light"].itemconfigure(row["light_id"], fill=color)
    row["max_p"].set(f"{peak_hold_w:.1f} W")
def _append_power_history(self, name: str, frame: PowerFrame) -> None:
now = time.monotonic()
if self.power_history_start_s is None:
self.power_history_start_s = now
t = now - self.power_history_start_s
history = self.power_history.setdefault(name, [])
history.append((t, frame.total_p_w, frame.total_s_va))
window_s = self._power_history_window_s()
keep_after = max(0.0, t - window_s * 1.5)
self.power_history[name] = [item for item in history if item[0] >= keep_after]
def _power_history_window_s(self) -> float:
try:
value = float(self.power_history_window_var.get().strip())
except ValueError:
value = 30.0
return value if value > 0.0 else 30.0
def _build_level_rows(self, channels_zero_based: list[int]) -> None:
for child in self.level_frame.winfo_children()[self.input_metric_static_widget_count:]:
child.destroy()
self.level_rows = []
self.input_metric_rows = {}
for row, channel in enumerate(channels_zero_based, start=2):
channel_label = self.ttk.Label(self.level_frame, text=f"Ch {channel + 1}", width=8)
channel_label.grid(row=row, column=0, sticky="w", pady=6)
vars_by_metric = {
"rms": self.tk.StringVar(value="-"),
"peak": self.tk.StringVar(value="-"),
"crest": self.tk.StringVar(value="-"),
"crest_db": self.tk.StringVar(value="-"),
"digital_peak": self.tk.StringVar(value="-"),
}
for col, key in enumerate(("rms", "peak", "crest", "crest_db", "digital_peak"), start=1):
self.ttk.Label(self.level_frame, textvariable=vars_by_metric[key], width=16, anchor="e").grid(
row=row,
column=col,
sticky="e",
padx=(0, 18 if col < 5 else 0),
pady=6,
)
self.input_metric_rows[channel] = vars_by_metric
self.draw_scope_grid()
self.draw_spectrum_grid()
def draw_scope_grid(self) -> None:
    """Redraw the oscilloscope grid, axis labels and channel legend.

    All items tagged "grid" and "trace" are removed first. The legend is
    only drawn while a configuration is active and is laid out right to
    left from the top-right corner.
    """
    canvas = self.scope_canvas
    for tag in ("grid", "trace"):
        canvas.delete(tag)

    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)
    y_limit_v = self._scope_y_limit_v()
    quarters = (0.0, 0.25, 0.5, 0.75, 1.0)

    # Horizontal lines; the centre (zero volt) line is brighter.
    for frac in quarters:
        y = int(frac * (height - 1))
        shade = "#555555" if frac == 0.5 else "#2a2a2a"
        canvas.create_line(0, y, width, y, fill=shade, tags="grid")
    # Vertical lines.
    for frac in quarters:
        x = int(frac * (width - 1))
        canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")

    label_style = {"anchor": "nw", "fill": "#bbbbbb", "tags": "grid"}
    canvas.create_text(6, 6, text=f"+{y_limit_v:g} V", **label_style)
    canvas.create_text(6, height // 2 + 4, text="0", **label_style)
    canvas.create_text(6, height - 20, text=f"-{y_limit_v:g} V", **label_style)

    if self.config is None:
        return

    legend = [
        (f"Ch {channel + 1}", OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)])
        for idx, channel in enumerate(self.config.channels_zero_based)
    ]
    right_edge = width - 10
    for caption, color in reversed(legend):
        item = canvas.create_text(right_edge, 6, anchor="ne", text=caption, fill=color, tags="grid")
        box = canvas.bbox(item)
        if box:
            # The next caption ends 12 px left of this one.
            right_edge = box[0] - 12
def _scope_y_limit_v(self) -> float:
text = self.waveform_y_limit_var.get().strip()
if text:
try:
manual_limit = float(text)
except ValueError:
manual_limit = 0.0
if manual_limit > 0.0:
return manual_limit
if self.config is None:
return DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT
return self.config.input_volts_per_sample_unit
def draw_waveform(self, block) -> None:
    """Plot the selected channels of ``block`` as oscilloscope traces.

    Samples are scaled to volts, decimated to roughly one sample per canvas
    pixel, clipped to the vertical limit and drawn as one polyline per
    channel tagged "trace". Nothing is drawn without an active
    configuration, without numpy, or with fewer than two samples.
    """
    if self.config is None or np is None:
        return

    canvas = self.scope_canvas
    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)

    picked = block[:, self.config.channels_zero_based].astype(np.float64, copy=False)
    volts = picked * self.config.input_volts_per_sample_unit
    total_frames = picked.shape[0]
    if total_frames < 2:
        return

    canvas.delete("trace")
    stride = max(1, total_frames // width)
    decimated = volts[::stride, :]
    point_count = decimated.shape[0]
    if point_count < 2:
        return

    x_scale = (width - 1) / (point_count - 1)
    y_mid = (height - 1) / 2.0
    y_limit_v = self._scope_y_limit_v()
    y_scale = y_mid / y_limit_v

    # The x coordinates are shared by every channel.
    xs = np.arange(point_count) * x_scale
    for trace_no in range(decimated.shape[1]):
        clipped = np.clip(decimated[:, trace_no], -y_limit_v, y_limit_v)
        ys = y_mid - clipped * y_scale
        # Interleave to the flat [x0, y0, x1, y1, ...] list Tk expects.
        points: list[float] = np.column_stack((xs, ys)).ravel().tolist()
        canvas.create_line(
            points,
            fill=OSCILLOSCOPE_TRACE_COLORS[trace_no % len(OSCILLOSCOPE_TRACE_COLORS)],
            width=1.5,
            smooth=False,
            tags="trace",
        )
def draw_spectrum_grid(self) -> None:
    """Redraw the static grid and axis captions of the spectrum canvas.

    All items tagged "grid" and "spectrum" are removed first, so any
    plotted spectrum is cleared as well.
    """
    canvas = self.spectrum_canvas
    for tag in ("grid", "spectrum"):
        canvas.delete(tag)

    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)
    quarters = (0.0, 0.25, 0.5, 0.75, 1.0)

    for frac in quarters:
        y = int(frac * (height - 1))
        shade = "#555555" if frac == 0.5 else "#2a2a2a"
        canvas.create_line(0, y, width, y, fill=shade, tags="grid")
    for frac in quarters:
        x = int(frac * (width - 1))
        canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")

    caption = {"fill": "#bbbbbb", "tags": "grid"}
    canvas.create_text(6, 6, anchor="nw", text="PSD dB V^2/Hz", **caption)
    canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", **caption)
    canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", **caption)
def draw_spectrum(self) -> None:
    """Plot the newest PSD frame of each channel on a log-frequency axis.

    Only bins at or above 20 Hz are drawn. The y range is rounded outward
    to multiples of 10 dB and spans at least 60 dB. The x axis runs from
    20 Hz to the highest last-bin frequency among the stored frames.
    """
    if np is None or not self.input_spectrum_frames:
        return

    canvas = self.spectrum_canvas
    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)
    canvas.delete("spectrum")

    # Convert every frame to dB, keeping only the bins from 20 Hz upwards.
    traces = []
    for idx, (channel, frame) in enumerate(sorted(self.input_spectrum_frames.items())):
        in_range = frame.frequencies_hz >= 20.0
        if np.count_nonzero(in_range) < 2:
            continue
        level_db = 10.0 * np.log10(np.maximum(frame.psd_v2_per_hz[in_range], 1e-30))
        traces.append((idx, channel, frame.frequencies_hz[in_range], level_db))
    if not traces:
        return

    pooled = np.concatenate([trace[3] for trace in traces])
    y_max = float(np.nanmax(pooled))
    y_min = float(np.nanmin(pooled))
    if not (np.isfinite(y_min) and np.isfinite(y_max)):
        return
    y_max = math.ceil((y_max + 3.0) / 10.0) * 10.0
    y_min = min(y_max - 60.0, math.floor((y_min - 3.0) / 10.0) * 10.0)
    if y_max <= y_min:
        y_max = y_min + 60.0

    f_min = 20.0
    f_max = max(float(frame.frequencies_hz[-1]) for frame in self.input_spectrum_frames.values())
    log_min = math.log10(f_min)
    log_max = math.log10(f_max)

    canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g}", fill="#888888", tags="spectrum")
    canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g}", fill="#888888", tags="spectrum")

    for idx, channel, freq, level_db in traces:
        x = (np.log10(freq) - log_min) / (log_max - log_min) * (width - 1)
        y = (y_max - level_db) / (y_max - y_min) * (height - 1)
        # Thin the trace to roughly one point per pixel column.
        step = max(1, len(x) // max(width, 1))
        clipped_y = np.clip(y[::step], 0, height - 1)
        points: list[float] = np.column_stack((x[::step], clipped_y)).ravel().tolist()
        if len(points) < 4:
            continue
        color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)]
        canvas.create_line(points, fill=color, width=1.4, smooth=False, tags="spectrum")
        canvas.create_text(
            width - 8,
            8 + 16 * idx,
            anchor="ne",
            text=f"Ch {channel + 1}",
            fill=color,
            tags="spectrum",
        )

    self.spectrum_status_var.set(
        f"PSD: {y_min:g} to {y_max:g} dB V^2/Hz, Hann/Welch, {len(traces)} channel(s)"
    )
def draw_power_history_grid(self) -> None:
    """Redraw the static grid and captions of the power-history canvas.

    All items tagged "grid" and "history" are removed first. The lower-left
    caption shows the currently selected window length.
    """
    canvas = self.power_history_canvas
    for tag in ("grid", "history"):
        canvas.delete(tag)

    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)
    quarters = (0.0, 0.25, 0.5, 0.75, 1.0)

    for frac in quarters:
        y = int(frac * (height - 1))
        shade = "#555555" if frac == 0.5 else "#2a2a2a"
        canvas.create_line(0, y, width, y, fill=shade, tags="grid")
    for frac in quarters:
        x = int(frac * (width - 1))
        canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")

    caption = {"fill": "#bbbbbb", "tags": "grid"}
    canvas.create_text(6, 6, anchor="nw", text="P solid / Sapp dashed", **caption)
    window_caption = f"-{self._power_history_window_s():g} s"
    canvas.create_text(6, height - 20, anchor="nw", text=window_caption, **caption)
    canvas.create_text(width - 6, height - 20, anchor="ne", text="now", **caption)
def draw_power_history(self) -> None:
    """Plot P (solid) and S (dashed) of every way over the moving window.

    With no history at all, only the grid is redrawn. Ways with fewer than
    two samples inside the window are skipped. The y axis covers all
    visible values, always includes zero, and gets 8 % padding on both
    sides.
    """
    canvas = self.power_history_canvas
    if not self.power_history:
        self.draw_power_history_grid()
        return

    canvas.delete("history")
    width = max(canvas.winfo_width(), 2)
    height = max(canvas.winfo_height(), 2)
    window_s = self._power_history_window_s()
    latest_t = max((items[-1][0] for items in self.power_history.values() if items), default=0.0)
    t_min = max(0.0, latest_t - window_s)

    traces = []
    visible_values: list[float] = []
    for idx, (name, items) in enumerate(sorted(self.power_history.items())):
        recent = [item for item in items if item[0] >= t_min]
        if len(recent) < 2:
            continue
        t = np.asarray([item[0] for item in recent], dtype=np.float64)
        p = np.asarray([item[1] for item in recent], dtype=np.float64)
        s = np.asarray([item[2] for item in recent], dtype=np.float64)
        traces.append((idx, name, t, p, s))
        visible_values.extend(p.tolist())
        visible_values.extend(s.tolist())
    if not traces:
        return

    y_max = max(visible_values)
    y_min = min(0.0, min(visible_values))
    if y_max <= y_min:
        y_max = y_min + 1.0
    margin = 0.08 * (y_max - y_min)
    y_max += margin
    y_min -= margin

    def to_points(t_values, y_values) -> list[float]:
        # Map time and value to canvas pixels, clipped to the canvas area.
        x = (t_values - (latest_t - window_s)) / window_s * (width - 1)
        y = (y_max - y_values) / (y_max - y_min) * (height - 1)
        pixels = np.column_stack((np.clip(x, 0, width - 1), np.clip(y, 0, height - 1)))
        return pixels.ravel().tolist()

    canvas.create_text(6, 22, anchor="nw", text=f"{y_max:.1f}", fill="#888888", tags="history")
    canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:.1f}", fill="#888888", tags="history")

    for idx, name, t, p, s in traces:
        color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)]
        active_line = to_points(t, p)
        apparent_line = to_points(t, s)
        if len(active_line) >= 4:
            canvas.create_line(active_line, fill=color, width=1.8, smooth=False, tags="history")
        if len(apparent_line) >= 4:
            canvas.create_line(
                apparent_line,
                fill=color,
                width=1.2,
                smooth=False,
                dash=(5, 3),
                tags="history",
            )
        canvas.create_text(
            width - 8,
            8 + 16 * idx,
            anchor="ne",
            text=f"{name} P/Sapp",
            fill=color,
            tags="history",
        )

    self.power_history_status_var.set(
        f"Window {window_s:g} s, y {y_min:.1f} to {y_max:.1f} W / VA"
    )
def start(self) -> None:
    """Start capturing with the current GUI settings.

    Does nothing when a capture is already running. Configuration errors
    are reported in an "Invalid input setup" dialog. Errors while creating
    or starting the stream, or while setting up the estimators, are
    reported in a "Start failed" dialog after the stream has been stopped.
    On success the readout rows are rebuilt and the controls switch to the
    running state.
    """
    if self.capture is not None:
        return

    try:
        config = self._parse_gui_config()
    except Exception as exc:
        self.messagebox.showerror("Invalid input setup", str(exc))
        return

    # The stream is constructed inside the try block so that a constructor
    # failure reaches the error dialog as well.
    capture = None
    try:
        capture = QueuedInputStream(
            sd=self.sd,
            device_index=config.device_index,
            channels_zero_based=config.channels_zero_based,
            samplerate=config.samplerate,
            blocksize=config.blocksize,
            max_queue_blocks=8,
        )
        capture.start()
        actual_samplerate = capture.actual_samplerate
        self._parse_power_settings(config, actual_samplerate)
    except Exception as exc:
        if capture is not None:
            capture.stop()
        self.messagebox.showerror("Start failed", str(exc))
        return

    self.capture = capture
    self.config = config
    self._build_level_rows(config.channels_zero_based)
    self.start_button.configure(state="disabled")
    self.stop_button.configure(state="normal")
    self.calibrate_button.configure(state="normal")
    self.device_combo.configure(state="disabled")
    self.status_var.set(f"Running at {actual_samplerate:g} Hz")
def stop(self) -> None:
    """Stop capturing and return the GUI to its idle state.

    The stream, if any, is stopped. All estimator, history and calibration
    state is discarded, every readout is reset to "-", the meters and
    lights go back to their idle look, both plots are cleared, and the
    controls switch to the stopped state.
    """
    if self.capture is not None:
        self.capture.stop()
    self.capture = None
    self.config = None
    self.calibration_state = None

    per_run_dicts = (
        "input_spectrum_estimators",
        "input_spectrum_buffers",
        "input_spectrum_frames",
        "power_estimators",
        "power_buffers",
        "power_history",
        "power_max_hold_w",
        "headroom_curves",
        "headroom_smoothed_db",
    )
    for attribute in per_run_dicts:
        setattr(self, attribute, {})
    self.power_history_start_s = None
    self.power_hop_size = 0

    for readouts in self.input_metric_rows.values():
        for variable in readouts.values():
            variable.set("-")

    readout_keys = ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p")
    for widgets in self.power_rows.values():
        for key in readout_keys:
            widgets[key].set("-")
        meter = widgets["meter"]
        meter["value"] = 0.0
        meter["maximum"] = DEFAULT_POWER_CRITICAL_W
        meter.configure(style="PowerOk.Horizontal.TProgressbar")
        widgets["light"].itemconfigure(widgets["light_id"], fill="#22a447")

    self.draw_spectrum_grid()
    self.draw_power_history_grid()

    self.start_button.configure(state="normal")
    self.stop_button.configure(state="disabled")
    self.calibrate_button.configure(state="disabled")
    self.device_combo.configure(state="readonly")
    self.status_var.set("Stopped")
def start_calibration(self) -> None:
    """Begin a timed calibration measurement against a known RMS voltage.

    Requires a running capture. The known voltage and the duration are read
    from the GUI; both must be positive finite numbers. On success
    ``self.calibration_state`` is initialised with zeroed accumulators and
    a monotonic deadline, and the calibrate button is disabled until the
    measurement completes.
    """
    if self.config is None or self.capture is None or np is None:
        self.messagebox.showerror("Calibration unavailable", "Start the input monitor first.")
        return

    try:
        known_rms_v = float(self.known_rms_v_var.get())
        duration_s = float(self.calibration_duration_var.get())
    except ValueError:
        self.messagebox.showerror("Invalid calibration", "Cal signal Vrms and seconds must be numbers.")
        return
    # float() accepts "nan" and "inf", and neither fails the "<= 0" checks
    # below. A NaN deadline would end the measurement on the first block,
    # an infinite one would never end it.
    if not (math.isfinite(known_rms_v) and math.isfinite(duration_s)):
        self.messagebox.showerror(
            "Invalid calibration", "Cal signal Vrms and seconds must be finite numbers."
        )
        return
    if known_rms_v <= 0:
        self.messagebox.showerror("Invalid calibration", "Cal signal Vrms must be positive.")
        return
    if duration_s <= 0:
        self.messagebox.showerror("Invalid calibration", "Calibration duration must be positive.")
        return

    channel_count = len(self.config.channels_zero_based)
    self.calibration_state = {
        "known_rms_v": known_rms_v,
        "deadline": time.monotonic() + duration_s,
        "sums": np.zeros(channel_count, dtype=np.float64),
        "frames": 0,
        "peak_fs": np.zeros(channel_count, dtype=np.float64),
    }
    self.calibrate_button.configure(state="disabled")
    self.status_var.set(f"Calibrating from {known_rms_v:g} Vrms...")
def _accumulate_calibration(self, block) -> None:
if self.calibration_state is None or self.config is None or np is None:
return
selected = block[:, self.config.channels_zero_based].astype(np.float64, copy=False)
self.calibration_state["sums"] += np.sum(selected * selected, axis=0)
self.calibration_state["frames"] += selected.shape[0]
self.calibration_state["peak_fs"] = np.maximum(
self.calibration_state["peak_fs"],
np.max(np.abs(selected), axis=0),
)
if time.monotonic() < self.calibration_state["deadline"]:
return
frames = self.calibration_state["frames"]
if frames <= 0:
self.calibration_state = None
self.calibrate_button.configure(state="normal")
self.messagebox.showerror("Calibration failed", "No input samples were captured.")
return
try:
measured_rms_fs = np.sqrt(self.calibration_state["sums"] / frames)
per_channel, average = compute_volts_per_sample_unit(
self.calibration_state["known_rms_v"],
measured_rms_fs,
)
except ValueError as exc:
self.calibration_state = None
self.calibrate_button.configure(state="normal")
self.messagebox.showerror("Calibration failed", str(exc))
return
peak_fs = self.calibration_state["peak_fs"]
known_rms_v = self.calibration_state["known_rms_v"]
self.calibration_state = None
self.calibration_var.set(f"{average:.9g}")
self.config = InputConfig(
device_index=self.config.device_index,
channels_zero_based=self.config.channels_zero_based,
samplerate=self.config.samplerate,
blocksize=self.config.blocksize,
input_volts_per_sample_unit=average,
update_interval_s=self.config.update_interval_s,
)
if self.capture is not None:
try:
self._parse_power_settings(self.config, self.capture.actual_samplerate)
except Exception as exc:
self.power_estimators = {}
self.power_buffers = {}
self.messagebox.showwarning(
"Power estimator disabled",
f"Voltage calibration was applied, but power estimation could not restart: {exc}",
)
self.draw_scope_grid()
self.calibrate_button.configure(state="normal")
cal_path = CalibrationStore().save(
volts_per_sample_unit=average,
known_rms_v=known_rms_v,
measured_rms_sample_by_channel={
str(channel + 1): float(measured_rms_fs[idx])
for idx, channel in enumerate(self.config.channels_zero_based)
},
peak_sample_by_channel={
str(channel + 1): float(peak_fs[idx])
for idx, channel in enumerate(self.config.channels_zero_based)
},
device_index=self.config.device_index,
channels_zero_based=self.config.channels_zero_based,
samplerate=self.config.samplerate,
source="gui",
)
details = []
for idx, channel in enumerate(self.config.channels_zero_based):
warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else ""
details.append(
f"Ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS, "
f"{per_channel[idx]:.9g} V/sample unit, peak {peak_fs[idx]:.6f} FS{warning}"
)
self.status_var.set(f"Voltage scale applied: {average:.9g} V / sample unit")
self.messagebox.showinfo(
"Calibration complete",
"\n".join(details)
+ f"\n\nAverage applied: {average:.9g} V / sample unit"
+ f"\nFull-scale sine equivalent: {average / math.sqrt(2.0):.9g} Vrms"
+ f"\nSaved: {cal_path}",
)
def poll_audio_queue(self) -> None:
    """Drain the capture queue, process the newest block and reschedule.

    Runs every 50 ms through ``root.after``. The next poll is scheduled in
    a ``finally`` clause so that an exception while processing one block
    does not silently stop all further polling.
    """
    try:
        # NOTE(review): only the newest queued block is kept; older blocks
        # taken from the queue in the same poll are discarded, so the
        # estimator buffers may not receive contiguous audio. Confirm that
        # this is intended.
        latest = None
        while True:
            try:
                if self.capture is None:
                    break
                latest = self.capture.queue.get_nowait()
            except queue.Empty:
                break

        if latest is not None and self.config is not None and self.capture is not None:
            block, status = latest
            self._accumulate_calibration(block)
            rms_fs, rms_v, peak_fs = compute_levels(
                block,
                self.config.channels_zero_based,
                self.config.input_volts_per_sample_unit,
            )
            for idx, channel in enumerate(self.config.channels_zero_based):
                row = self.input_metric_rows.get(channel)
                # Block-level readouts are only shown until the channel has
                # a spectrum frame.
                if row is not None and channel not in self.input_spectrum_frames:
                    peak_v = float(peak_fs[idx] * self.config.input_volts_per_sample_unit)
                    crest = peak_v / float(rms_v[idx]) if rms_v[idx] > 0 else 0.0
                    crest_db = 20.0 * math.log10(crest) if crest > 0 else float("-inf")
                    row["rms"].set(f"{rms_v[idx]:.3f} V")
                    row["peak"].set(f"{peak_v:.3f} V")
                    row["crest"].set(f"{crest:.2f}x")
                    row["crest_db"].set(f"{crest_db:.2f} dB")
                    row["digital_peak"].set(f"{peak_fs[idx]:.5f} FS")
            self._process_input_spectrum_block(block)
            self._process_power_block(block)
            self.draw_waveform(block)
            duration_ms = 1000.0 * block.shape[0] / self.capture.actual_samplerate
            self.scope_status_var.set(
                f"Newest block: {block.shape[0]} samples, {duration_ms:.1f} ms, "
                f"vertical scale +/-{self._scope_y_limit_v():g} V"
            )
            if status:
                self.status_var.set(f"Running at {self.capture.actual_samplerate:g} Hz | {status}")
    finally:
        self.root.after(50, self.poll_audio_queue)
def close(self) -> None:
    """Stop any running capture and destroy the root window.

    The window is destroyed even when stopping raises, so a failing stream
    shutdown cannot leave the application un-closable. The exception still
    propagates afterwards.
    """
    try:
        self.stop()
    finally:
        self.root.destroy()
def run(self) -> None:
    """Enter the Tk main loop of the root window."""
    root = self.root
    root.mainloop()
def run_gui(args: argparse.Namespace, sd) -> None:
    """Create the level-monitor GUI for ``sd`` and ``args`` and run it."""
    LevelMonitorGUI(sd, args).run()
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the soundcard input monitor.

    Options are declared as ``(flags, keyword-arguments)`` pairs and registered in table
    order, so the order of the table is the order in which the options are added.
    """
    cli = argparse.ArgumentParser(
        description="Monitor live soundcard input RMS/peak levels for DSP-output voltage."
    )

    option_table = (
        # --- Input device and stream -------------------------------------------------
        (
            ("--device",),
            {
                "type": int,
                "help": "sounddevice input device index. If omitted, an interactive prompt is shown.",
            },
        ),
        (
            ("--channels",),
            {
                "type": str,
                "help": "One-based input channel list, e.g. '1' or '1,2'. If omitted, prompted.",
            },
        ),
        (
            ("--samplerate",),
            {
                "type": float,
                "default": DEFAULT_SAMPLERATE,
                "help": "Sample rate in Hz. Omit to use the device default.",
            },
        ),
        (
            ("--blocksize",),
            {
                "type": int,
                "default": DEFAULT_BLOCKSIZE,
                "help": f"Audio block size in frames. Default: {DEFAULT_BLOCKSIZE}.",
            },
        ),
        # --- Voltage scale and calibration -------------------------------------------
        (
            ("--input-volts-per-sample-unit", "--input-calibration"),
            {
                "dest": "input_volts_per_sample_unit",
                "type": float,
                "default": None,
                "help": (
                    "Path-specific voltage scale in volts per captured sounddevice sample unit. "
                    "--input-calibration is accepted as a legacy alias. "
                    "Default: newest saved calibration, otherwise "
                    f"{DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT:g}."
                ),
            },
        ),
        (
            ("--calibrate-known-rms-v",),
            {
                "type": float,
                "help": (
                    "Run input calibration using a known true-RMS voltage at the selected input, "
                    "then apply the measured volts-per-sample-unit scale and start monitoring. "
                    "Use a steady REW sine tone, for example 1 kHz."
                ),
            },
        ),
        (
            ("--calibrate-only",),
            {
                "action": "store_true",
                "help": "With --calibrate-known-rms-v, print the measured voltage scale and exit.",
            },
        ),
        (
            ("--no-calibration-ready-prompt",),
            {
                "action": "store_true",
                "help": "Start calibration measurement immediately instead of waiting for Enter.",
            },
        ),
        (
            ("--calibration-duration",),
            {
                "type": float,
                "default": DEFAULT_CALIBRATION_DURATION_S,
                "help": f"Calibration measurement duration in seconds. Default: {DEFAULT_CALIBRATION_DURATION_S:g}.",
            },
        ),
        (
            ("--update-interval",),
            {
                "type": float,
                "default": DEFAULT_UPDATE_INTERVAL_S,
                "help": f"Console update interval in seconds. Default: {DEFAULT_UPDATE_INTERVAL_S:g}.",
            },
        ),
        # --- Power estimation: ways, impedance files, amplifier gain -----------------
        (
            ("--way",),
            {
                "type": PowerConsoleRunner.parse_way,
                "action": "append",
                "help": "Run power estimator for way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:impedance.txt:34.",
            },
        ),
        (
            ("--lf-impedance",),
            {"help": "REW impedance export for LF power estimation."},
        ),
        (
            ("--lf-channel",),
            {
                "type": parse_channel,
                "default": 0,
                "help": "One-based LF input channel for power estimation. Default: 1.",
            },
        ),
        (
            ("--lf-amp-gain-db",),
            {
                "type": float,
                "default": DEFAULT_AMP_GAIN_DB,
                "help": f"LF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.",
            },
        ),
        (
            ("--hf-impedance",),
            {"help": "REW impedance export for HF power estimation."},
        ),
        (
            ("--hf-channel",),
            {
                "type": parse_channel,
                "default": 1,
                "help": "One-based HF input channel for power estimation. Default: 2.",
            },
        ),
        (
            ("--hf-amp-gain-db",),
            {
                "type": float,
                "default": DEFAULT_AMP_GAIN_DB,
                "help": f"HF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.",
            },
        ),
        # --- Power estimator tuning --------------------------------------------------
        (
            ("--power-fft-size", "--fft-size"),
            {
                "dest": "power_fft_size",
                "type": int,
                "default": DEFAULT_POWER_FFT_SIZE,
                "help": f"Power estimator FFT size. Default: {DEFAULT_POWER_FFT_SIZE}.",
            },
        ),
        (
            ("--power-overlap", "--overlap"),
            {
                "dest": "power_overlap",
                "type": float,
                "default": DEFAULT_POWER_OVERLAP,
                "help": f"Power estimator overlap fraction. Default: {DEFAULT_POWER_OVERLAP:g}.",
            },
        ),
        (
            ("--power-smoothing-alpha", "--smoothing-alpha"),
            {
                "dest": "power_smoothing_alpha",
                "type": float,
                "default": DEFAULT_POWER_SMOOTHING_ALPHA,
                "help": f"Power estimator exponential smoothing alpha. Default: {DEFAULT_POWER_SMOOTHING_ALPHA:g}.",
            },
        ),
        (
            ("--power-update-interval",),
            {
                "type": float,
                "default": DEFAULT_POWER_UPDATE_INTERVAL_S,
                "help": f"Power estimator console update interval. Default: {DEFAULT_POWER_UPDATE_INTERVAL_S:g}.",
            },
        ),
        # --- WinISD headroom ---------------------------------------------------------
        (
            ("--winisd-max-power",),
            {
                "default": str(WINISD_MAX_POWER_FILE),
                "help": f"WinISD frequency-dependent max-power CSV for LF headroom. Default: {WINISD_MAX_POWER_FILE}.",
            },
        ),
        (
            ("--disable-winisd-headroom",),
            {
                "action": "store_true",
                "help": "Disable WinISD max-power headroom monitoring.",
            },
        ),
        (
            ("--headroom-smoothing-alpha",),
            {
                "type": float,
                "default": HEADROOM_SMOOTHING_ALPHA,
                "help": f"WinISD headroom smoothing alpha. Default: {HEADROOM_SMOOTHING_ALPHA:g}.",
            },
        ),
        (
            ("--self-test-winisd",),
            {
                "action": "store_true",
                "help": "Run synthetic WinISD headroom tests and exit.",
            },
        ),
        # --- Mode switches -----------------------------------------------------------
        (
            ("--list-devices",),
            {
                "action": "store_true",
                "help": "List input devices and exit.",
            },
        ),
        (
            ("--gui",),
            {
                "action": "store_true",
                "help": "Open the GUI with input-side voltage/PSD metrics and output-side power estimates.",
            },
        ),
    )

    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli
def _validate_cli_args(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
    """Reject out-of-range or non-finite numeric options through ``parser.error``.

    ``type=float`` accepts the spellings ``nan`` and ``inf``. NaN compares false against
    every bound, so a bare ``value <= 0`` test lets it through; finiteness is therefore
    checked together with positivity. Checks run in a fixed order so the first reported
    problem is stable.
    """

    def require_positive(value: float, flag: str) -> None:
        # Finiteness first: NaN would slip past the ``<= 0`` comparison below.
        if not math.isfinite(value):
            parser.error(f"{flag} must be a finite number")
        if value <= 0:
            parser.error(f"{flag} must be positive")

    require_positive(args.blocksize, "--blocksize")
    require_positive(args.input_volts_per_sample_unit, "--input-volts-per-sample-unit")
    if args.calibrate_known_rms_v is not None:
        require_positive(args.calibrate_known_rms_v, "--calibrate-known-rms-v")
    require_positive(args.calibration_duration, "--calibration-duration")
    require_positive(args.update_interval, "--update-interval")
    require_positive(args.power_update_interval, "--power-update-interval")
    require_positive(args.power_fft_size, "--power-fft-size")
    if args.power_fft_size % 2:
        parser.error("--power-fft-size must be even")
    # The chained comparisons are false for NaN, so these already reject it.
    if not 0.0 <= args.power_overlap < 1.0:
        parser.error("--power-overlap must be >= 0 and < 1")
    if not 0.0 <= args.power_smoothing_alpha < 1.0:
        parser.error("--power-smoothing-alpha must be >= 0 and < 1")
    if not 0.0 <= args.headroom_smoothing_alpha < 1.0:
        parser.error("--headroom-smoothing-alpha must be >= 0 and < 1")
    if args.samplerate is not None:
        require_positive(args.samplerate, "--samplerate")


def main(argv: list[str] | None = None) -> int:
    """Parse the command line, validate it, and dispatch to the selected mode.

    Modes are tried in this order: WinISD self-test, device listing, GUI, console power
    estimation (any of ``--way``/``--lf-impedance``/``--hf-impedance``), optional input
    calibration, and finally plain console level monitoring.

    Args:
        argv: Argument strings without the program name; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        ``0`` for every mode handled directly here, or the value returned by
        ``PowerConsoleRunner.run()`` in power-estimation mode. Invalid options leave
        through ``parser.error`` instead of returning.
    """
    parser = build_arg_parser()
    args = parser.parse_args(argv)

    # No explicit scale on the command line: fall back to the calibration store.
    if args.input_volts_per_sample_unit is None:
        args.input_volts_per_sample_unit = CalibrationStore().latest_scale()

    _validate_cli_args(parser, args)

    if args.self_test_winisd:
        run_self_test()
        # Parse whichever WinISD files exist so malformed exports surface here.
        for path in (Path(args.winisd_max_power), WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE):
            if path.exists():
                WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
        if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists():
            print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message)
        print("WinISD self-test OK")
        return 0

    # Audio dependencies are loaded only after the self-test path has had its chance to exit.
    sd = load_dependencies()

    if args.list_devices:
        SoundDeviceManager(sd).print_input_devices()
        return 0

    if args.gui:
        run_gui(args, sd)
        return 0

    if args.way or args.lf_impedance or args.hf_impedance:
        return PowerConsoleRunner(sd, args).run()

    config = resolve_config(args, sd)

    if args.calibrate_known_rms_v is not None:
        learned_calibration, measured_rms_fs, peak_fs = run_input_calibration(
            config,
            sd,
            args.calibrate_known_rms_v,
            args.calibration_duration,
            wait_for_ready=not args.no_calibration_ready_prompt,
        )
        # Per-channel results are stored under one-based channel numbers (as strings).
        cal_path = CalibrationStore().save(
            volts_per_sample_unit=learned_calibration,
            known_rms_v=args.calibrate_known_rms_v,
            measured_rms_sample_by_channel={
                str(channel + 1): float(measured_rms_fs[idx])
                for idx, channel in enumerate(config.channels_zero_based)
            },
            peak_sample_by_channel={
                str(channel + 1): float(peak_fs[idx])
                for idx, channel in enumerate(config.channels_zero_based)
            },
            device_index=config.device_index,
            channels_zero_based=config.channels_zero_based,
            samplerate=config.samplerate,
            source="cli",
        )
        print(f"Saved calibration: {cal_path}")
        if args.calibrate_only:
            return 0
        # Continue into monitoring with the freshly measured scale applied.
        config = InputConfig(
            device_index=config.device_index,
            channels_zero_based=config.channels_zero_based,
            samplerate=config.samplerate,
            blocksize=config.blocksize,
            input_volts_per_sample_unit=learned_calibration,
            update_interval_s=config.update_interval_s,
        )

    try:
        run_capture(config, sd)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end monitoring, so it is not treated as a failure.
        print("\nStopped.")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))