"""Live soundcard input monitor for DSP-output voltage measurements. This first milestone intentionally does not do FFT, impedance loading, or power estimation. It only verifies the audio-interface input path and reports per-channel RMS/peak levels. """ from __future__ import annotations import argparse import json import math import queue import sys import time from dataclasses import dataclass from datetime import datetime from pathlib import Path from lspower.audio import ( QueuedInputStream, SoundDeviceManager, format_channel_list, parse_channel, parse_channel_list, ) from lspower.impedance import ImpedanceCurve from lspower.power import PowerEstimator, PowerFrame, VoltageSpectrumEstimator, VoltageSpectrumFrame, WayConfig from lspower.winisd import WinISDLimitCurve, check_excursion_scaling, headroom_status, run_self_test np = None # --------------------------------------------------------------------------- # User defaults # --------------------------------------------------------------------------- # None asks sounddevice/PortAudio to use the device default sample rate. DEFAULT_SAMPLERATE: float | None = None # Audio callback block size in frames. This is only for input-level monitoring. DEFAULT_BLOCKSIZE = 2048 DEFAULT_POWER_FFT_SIZE = 8192 DEFAULT_POWER_OVERLAP = 0.75 DEFAULT_POWER_SMOOTHING_ALPHA = 0.95 DEFAULT_POWER_UPDATE_INTERVAL_S = 0.25 DEFAULT_AMP_GAIN_DB = 32.0 DEFAULT_POWER_WARNING_W = 400.0 DEFAULT_POWER_CRITICAL_W = 550.0 WINISD_MAX_POWER_FILE = Path("winisd/max_power.csv") WINISD_EXCURSION_10W_FILE = Path("winisd/excursion_10w.csv") WINISD_EXCURSION_100W_FILE = Path("winisd/excursion_100w.csv") ENABLE_WINISD_HEADROOM = True HEADROOM_EPS_W = 1e-12 HEADROOM_SMOOTHING_ALPHA = 0.9 # Convert sounddevice's normalized floating-point samples to volts. # # The calibration factor is path-specific. It includes the analog input gain, # driver/API path, Windows recording level, and any mixer/routing gain before # sounddevice returns samples. It should not be interpreted as the MOTU's # hardware full-scale voltage unless all of those stages are known to be unity. # # Current measured reference: # Device path: UltraLite mk5 Line In 3-4 through Windows MME, device 3, ch 1 # Reference signal: 1 kHz sine, 2.32 Vrms at the input # Captured level: 0.021384357 sample units RMS DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V = 2.32 DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE = 0.021384357 DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT = ( DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V / DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE ) DEFAULT_INPUT_FULL_SCALE_SINE_RMS_V = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT / math.sqrt(2.0) # How often the live console line is refreshed. DEFAULT_UPDATE_INTERVAL_S = 0.1 # Calibration capture length. Use a steady REW sine generator tone, e.g. 1 kHz, # and enter the calibrated true-RMS voltage measured at the soundcard input. DEFAULT_CALIBRATION_DURATION_S = 3.0 MIN_CALIBRATION_RMS_FS = 1e-4 # Warn visually when any selected input approaches digital full scale. CLIPPING_WARNING_THRESHOLD_FS = 0.98 OSCILLOSCOPE_TRACE_COLORS = ("#1f77b4", "#d62728", "#2ca02c", "#9467bd") CALIBRATION_DIR = Path("calibrations") @dataclass(frozen=True) class InputConfig: device_index: int channels_zero_based: list[int] samplerate: float | None blocksize: int input_volts_per_sample_unit: float update_interval_s: float @property def stream_channel_count(self) -> int: """Open enough device channels to include the highest selected input.""" return max(self.channels_zero_based) + 1 class CalibrationStore: def __init__(self, directory: Path = CALIBRATION_DIR): self.directory = directory def latest(self) -> dict | None: if not self.directory.exists(): return None files = sorted(self.directory.glob("calibration_*.json")) for path in reversed(files): try: data = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue if "volts_per_sample_unit" in data: data["_path"] = str(path) return data return None def latest_scale(self, fallback: float = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT) -> float: data = self.latest() if data is None: return fallback try: scale = float(data["volts_per_sample_unit"]) except (TypeError, ValueError): return fallback return scale if scale > 0 else fallback def save( self, *, volts_per_sample_unit: float, known_rms_v: float, measured_rms_sample_by_channel: dict[str, float], peak_sample_by_channel: dict[str, float], device_index: int, channels_zero_based: list[int], samplerate: float | None, source: str, ) -> Path: self.directory.mkdir(parents=True, exist_ok=True) now = datetime.now() timestamp = now.strftime("%Y%m%d_%H%M%S") path = self.directory / f"calibration_{timestamp}.json" data = { "created_at": now.isoformat(timespec="seconds"), "source": source, "volts_per_sample_unit": volts_per_sample_unit, "equivalent_full_scale_sine_rms_v": volts_per_sample_unit / math.sqrt(2.0), "known_rms_v": known_rms_v, "measured_rms_sample_by_channel": measured_rms_sample_by_channel, "peak_sample_by_channel": peak_sample_by_channel, "device_index": device_index, "channels_one_based": [channel + 1 for channel in channels_zero_based], "samplerate": samplerate, } path.write_text(json.dumps(data, indent=2), encoding="utf-8") return path def load_dependencies(): global np try: import sounddevice as sd # type: ignore except ImportError as exc: raise SystemExit( "The 'sounddevice' package is required.\n" "Install it with: python -m pip install sounddevice" ) from exc try: import numpy as numpy_module # type: ignore except ImportError as exc: raise SystemExit( "The 'numpy' package is required.\n" "Install it with: python -m pip install numpy" ) from exc np = numpy_module return sd def resolve_config(args: argparse.Namespace, sd) -> InputConfig: device_manager = SoundDeviceManager(sd) input_devices = device_manager.print_input_devices() if args.device is None: device_index = device_manager.prompt_input_device_index(input_devices) else: device_index = args.device device = device_manager.device_info(device_index) max_input_channels = int(device["max_input_channels"]) if args.channels is None: channels_zero_based = device_manager.prompt_channels(max_input_channels) else: try: channels_zero_based = parse_channel_list(args.channels) device_manager.validate_channels(channels_zero_based, max_input_channels) except ValueError as exc: raise SystemExit(f"Invalid channel selection: {exc}") from exc return InputConfig( device_index=device_index, channels_zero_based=channels_zero_based, samplerate=args.samplerate, blocksize=args.blocksize, input_volts_per_sample_unit=args.input_volts_per_sample_unit, update_interval_s=args.update_interval, ) def compute_levels( block, selected_channels_zero_based: list[int], input_volts_per_sample_unit: float, ) -> tuple: if np is None: raise RuntimeError("numpy has not been loaded") selected = block[:, selected_channels_zero_based].astype(np.float64, copy=False) volts = selected * input_volts_per_sample_unit rms_fs = np.sqrt(np.mean(selected * selected, axis=0)) rms_v = np.sqrt(np.mean(volts * volts, axis=0)) peak_fs = np.max(np.abs(selected), axis=0) return rms_fs, rms_v, peak_fs def compute_volts_per_sample_unit(known_rms_v: float, measured_rms_sample) -> tuple: if np is None: raise RuntimeError("numpy has not been loaded") measured = np.asarray(measured_rms_sample, dtype=np.float64) if np.any(measured < MIN_CALIBRATION_RMS_FS): raise ValueError( "measured RMS is too close to zero; start REW's generator and check the selected input channel" ) per_channel = known_rms_v / measured return per_channel, float(np.mean(per_channel)) def format_levels( channels_zero_based: list[int], rms_fs, rms_v, peak_fs, ) -> str: parts = [] for idx, channel in enumerate(channels_zero_based): warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else "" parts.append( f"ch {channel + 1}: " f"rms={rms_fs[idx]:.6f} sample / {rms_v[idx]:.6f} V, " f"peak={peak_fs[idx]:.6f} FS{warning}" ) return " | ".join(parts) def smooth_headroom_db(previous: float | None, raw: float, alpha: float) -> float: if previous is None or not math.isfinite(previous) or not math.isfinite(raw) or alpha <= 0.0: return raw return alpha * previous + (1.0 - alpha) * raw def format_headroom(headroom: dict | None) -> str: if not headroom: return "WinISD headroom disabled" return ( f"WinISD headroom={headroom['worst_headroom_db']:+6.2f} dB " f"@ {headroom['critical_frequency_hz']:7.1f} Hz | {headroom['status']}" ) class PowerConsoleRunner: """Live console runner for estimated amplifier output power.""" def __init__(self, sd, args: argparse.Namespace): self.sd = sd self.args = args self.device_manager = SoundDeviceManager(sd) self.ways = self._build_way_configs() self.channels = sorted({way.channel_zero_based for way in self.ways}) self.device_index = self._resolve_device_index() self._validate_audio_config() self.impedances = { way.name: ImpedanceCurve.from_file(way.impedance_path) for way in self.ways } self.capture = QueuedInputStream( sd=sd, device_index=self.device_index, channels_zero_based=self.channels, samplerate=args.samplerate, blocksize=args.blocksize, ) self.estimators: dict[str, PowerEstimator] = {} self.buffers: dict[str, object] = {} self.headroom_curves: dict[str, WinISDLimitCurve] = {} self.headroom_smoothed_db: dict[str, float] = {} self.hop_size = 0 @staticmethod def parse_way(text: str) -> WayConfig: parts = text.split(":", 2) if len(parts) != 3: raise argparse.ArgumentTypeError( "way must be NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:woofer.csv:34" ) name = parts[0].strip() if not name: raise argparse.ArgumentTypeError("way name must not be empty") try: channel = parse_channel(parts[1].strip()) except ValueError as exc: raise argparse.ArgumentTypeError(str(exc)) from exc path_text = parts[2].strip() gain_db = DEFAULT_AMP_GAIN_DB if ":" in path_text: maybe_path, maybe_gain = path_text.rsplit(":", 1) try: gain_db = float(maybe_gain) path_text = maybe_path except ValueError: pass return WayConfig( name=name, channel_zero_based=channel, impedance_path=Path(path_text), amp_gain_db=gain_db, ) def _build_way_configs(self) -> list[WayConfig]: ways: list[WayConfig] = [] if self.args.way: ways.extend(self.args.way) if self.args.lf_impedance: ways.append( WayConfig( name="LF", channel_zero_based=self.args.lf_channel, impedance_path=Path(self.args.lf_impedance), amp_gain_db=self.args.lf_amp_gain_db, ) ) if self.args.hf_impedance: ways.append( WayConfig( name="HF", channel_zero_based=self.args.hf_channel, impedance_path=Path(self.args.hf_impedance), amp_gain_db=self.args.hf_amp_gain_db, ) ) if not ways: raise SystemExit( "Configure at least one way with --way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB] " "or --lf-impedance/--hf-impedance." ) seen: dict[int, str] = {} for way in ways: if way.channel_zero_based in seen: raise SystemExit( f"Input channel {way.channel_zero_based + 1} is used by both " f"{seen[way.channel_zero_based]} and {way.name}." ) seen[way.channel_zero_based] = way.name return ways def _resolve_device_index(self) -> int: entries = self.device_manager.print_input_devices() if self.args.device is not None: return self.args.device return self.device_manager.prompt_input_device_index(entries) def _validate_audio_config(self) -> None: device = self.device_manager.device_info(self.device_index) max_input_channels = int(device["max_input_channels"]) self.device_manager.validate_channels(self.channels, max_input_channels) def _create_estimators(self) -> None: actual_samplerate = self.capture.actual_samplerate self.hop_size = int(round(self.args.power_fft_size * (1.0 - self.args.power_overlap))) if self.hop_size <= 0: raise SystemExit("--power-overlap is too high; resulting hop size is zero") self.estimators = { way.name: PowerEstimator( way=way, impedance=self.impedances[way.name], samplerate=actual_samplerate, fft_size=self.args.power_fft_size, input_volts_per_sample_unit=self.args.input_volts_per_sample_unit, smoothing_alpha=self.args.power_smoothing_alpha, ) for way in self.ways } self.buffers = { way.name: np.zeros(0, dtype=np.float64) for way in self.ways } self._create_headroom_curves() def _create_headroom_curves(self) -> None: self.headroom_curves = {} self.headroom_smoothed_db = {} if self.args.disable_winisd_headroom: return path = Path(self.args.winisd_max_power) lf_ways = [way for way in self.ways if way.name.upper() == "LF"] if not lf_ways: return try: curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W) except Exception as exc: print(f"WinISD headroom disabled: {exc}", file=sys.stderr) return for way in lf_ways: self.headroom_curves[way.name] = curve def format_power_line(self, name: str, frame: PowerFrame) -> str: headroom = self._compute_headroom(name, frame) line = ( f"{name}: " f"Vdsp={frame.rms_input_v:9.4f} Vrms, " f"Vamp={frame.rms_amp_v:9.2f} Vrms, " f"P={frame.total_p_w:10.2f} W, " f"Q={frame.total_q_var:10.2f} var, " f"Sapp={frame.total_s_va:10.2f} VA" ) if headroom: line += ", " + format_headroom(headroom) return line def _compute_headroom(self, name: str, frame: PowerFrame) -> dict | None: curve = self.headroom_curves.get(name) if curve is None: return None headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz) raw = float(headroom["worst_headroom_db"]) smoothed = smooth_headroom_db( self.headroom_smoothed_db.get(name), raw, self.args.headroom_smoothing_alpha, ) self.headroom_smoothed_db[name] = smoothed headroom["worst_headroom_db"] = smoothed headroom["status"] = headroom_status(smoothed) headroom["over_limit_bool"] = bool(math.isfinite(smoothed) and smoothed <= 0.0) return headroom def run(self) -> int: try: self.capture.start() except Exception as exc: raise SystemExit(f"Could not open input stream: {exc}") from exc try: self._create_estimators() actual_samplerate = self.capture.actual_samplerate print() print("Starting DSP-output power estimator. Press Ctrl+C to stop.") print(f"Device index: {self.device_index}") print(f"Actual samplerate: {actual_samplerate:g} Hz") print( f"FFT size: {self.args.power_fft_size}, hop size: {self.hop_size}, " f"df: {actual_samplerate / self.args.power_fft_size:g} Hz" ) print(f"Input voltage scale: {self.args.input_volts_per_sample_unit:g} V / sample unit") print("Assumption: ideal linear amplifier voltage gain per way.") if self.headroom_curves: print(f"WinISD headroom: {Path(self.args.winisd_max_power)}") if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists(): try: print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message) except Exception as exc: print(f"Excursion check skipped: {exc}") else: print("WinISD headroom: disabled") for way in self.ways: curve = self.impedances[way.name] print( f" {way.name}: input ch {way.channel_zero_based + 1}, " f"amp gain {way.amp_gain_db:g} dB ({way.amp_gain_linear:g}x), " f"impedance {curve.source}, range {curve.frequency_hz[0]:g}-{curve.frequency_hz[-1]:g} Hz" ) print() latest_frames: dict[str, PowerFrame] = {} last_print = 0.0 while True: try: block, status = self.capture.queue.get(timeout=0.5) except queue.Empty: continue if status: print(f"\nStream status: {status}", file=sys.stderr) for way in self.ways: self.buffers[way.name] = np.concatenate( (self.buffers[way.name], block[:, way.channel_zero_based].astype(np.float64)) ) while len(self.buffers[way.name]) >= self.args.power_fft_size: latest_frames[way.name] = self.estimators[way.name].estimate( self.buffers[way.name][: self.args.power_fft_size] ) self.buffers[way.name] = self.buffers[way.name][self.hop_size :] now = time.monotonic() if latest_frames and now - last_print >= self.args.power_update_interval: lines = [ self.format_power_line(name, latest_frames[name]) for name in sorted(latest_frames) ] print("\r" + " | ".join(lines), end="", flush=True) last_print = now except KeyboardInterrupt: print("\nStopped.") return 0 finally: self.capture.stop() def run_input_calibration( config: InputConfig, sd, known_rms_v: float, duration_s: float, wait_for_ready: bool = True, ) -> tuple[float, object, object]: if np is None: raise RuntimeError("numpy has not been loaded") capture = QueuedInputStream( sd=sd, device_index=config.device_index, channels_zero_based=config.channels_zero_based, samplerate=config.samplerate, blocksize=config.blocksize, max_queue_blocks=32, ) try: capture.start() except Exception as exc: raise SystemExit(f"Could not open input stream: {exc}") from exc print() print("Input calibration") print("Use REW's generator with a steady sine tone, e.g. 1 kHz.") print("Keep the tone below clipping, then enter its calibrated true-RMS voltage.") print(f"Device index: {config.device_index}") print(f"Selected channels: {format_channel_list(config.channels_zero_based)}") print(f"Known RMS voltage: {known_rms_v:g} V") if wait_for_ready: input("Start the REW sine generator now, then press Enter to measure...") print(f"Measuring for {duration_s:g} s...") sums = np.zeros(len(config.channels_zero_based), dtype=np.float64) frame_count = 0 peak_fs = np.zeros(len(config.channels_zero_based), dtype=np.float64) deadline = time.monotonic() + duration_s try: while time.monotonic() < deadline: try: block, status = capture.queue.get(timeout=0.5) except queue.Empty: continue if status: print(f"Stream status: {status}") selected = block[:, config.channels_zero_based].astype(np.float64, copy=False) sums += np.sum(selected * selected, axis=0) frame_count += selected.shape[0] peak_fs = np.maximum(peak_fs, np.max(np.abs(selected), axis=0)) finally: capture.stop() if frame_count == 0: raise SystemExit("No input samples were captured during calibration.") measured_rms_fs = np.sqrt(sums / frame_count) try: per_channel, average = compute_volts_per_sample_unit(known_rms_v, measured_rms_fs) except ValueError as exc: measured_text = ", ".join( f"ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS" for idx, channel in enumerate(config.channels_zero_based) ) raise SystemExit(f"Calibration failed: {exc}\nMeasured: {measured_text}") from exc print() for idx, channel in enumerate(config.channels_zero_based): warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else "" print( f"ch {channel + 1}: measured {measured_rms_fs[idx]:.9f} sample units RMS, " f"peak {peak_fs[idx]:.6f} FS{warning}, " f"scale {per_channel[idx]:.9g} V / sample unit" ) print(f"Average voltage scale: {average:.9g} V / sample unit") print(f"Equivalent full-scale sine RMS: {average / math.sqrt(2.0):.9g} Vrms") print(f"Use: --input-volts-per-sample-unit {average:.9g}") return average, measured_rms_fs, peak_fs def run_capture(config: InputConfig, sd) -> None: capture = QueuedInputStream( sd=sd, device_index=config.device_index, channels_zero_based=config.channels_zero_based, samplerate=config.samplerate, blocksize=config.blocksize, max_queue_blocks=8, ) try: capture.start() except Exception as exc: raise SystemExit(f"Could not open input stream: {exc}") from exc print() print("Starting input monitor. Press Ctrl+C to stop.") print(f"Device index: {config.device_index}") print(f"Selected channels: {format_channel_list(config.channels_zero_based)}") print(f"Voltage scale: {config.input_volts_per_sample_unit:g} V / sample unit") print( "Equivalent full-scale sine RMS: " f"{config.input_volts_per_sample_unit / math.sqrt(2.0):g} Vrms" ) last_print = 0.0 try: actual_samplerate = capture.actual_samplerate print(f"Actual stream samplerate: {actual_samplerate:g} Hz") print() while True: try: block, status = capture.queue.get(timeout=0.5) except queue.Empty: continue now = time.monotonic() if now - last_print < config.update_interval_s: continue rms_fs, rms_v, peak_fs = compute_levels( block, config.channels_zero_based, config.input_volts_per_sample_unit, ) status_text = f" | status: {status}" if status else "" line = format_levels(config.channels_zero_based, rms_fs, rms_v, peak_fs) print(f"\r{line}{status_text}", end="", flush=True) last_print = now finally: capture.stop() class LevelMonitorGUI: def __init__(self, sd, args: argparse.Namespace): import tkinter as tk from tkinter import messagebox, ttk self.sd = sd self.args = args self.tk = tk self.messagebox = messagebox self.ttk = ttk self.root = tk.Tk() self.root.title("Soundcard Input Voltage Monitor") self.root.geometry("1180x820") self.root.minsize(980, 680) self.device_manager = SoundDeviceManager(sd) self.capture: QueuedInputStream | None = None self.config: InputConfig | None = None self.calibration_state: dict | None = None self.input_spectrum_estimators: dict[int, VoltageSpectrumEstimator] = {} self.input_spectrum_buffers: dict[int, object] = {} self.input_spectrum_frames: dict[int, VoltageSpectrumFrame] = {} self.power_estimators: dict[str, PowerEstimator] = {} self.power_buffers: dict[str, object] = {} self.power_rows: dict[str, dict] = {} self.power_history: dict[str, list[tuple[float, float, float]]] = {} self.power_max_hold_w: dict[str, float] = {} self.latest_power_frames: dict[str, PowerFrame] = {} self.latest_headroom_frames: dict[str, dict] = {} self.headroom_curves: dict[str, WinISDLimitCurve] = {} self.headroom_smoothed_db: dict[str, float] = {} self.power_history_start_s: float | None = None self.power_hop_size = 0 self.level_rows: list[dict] = [] self.input_metric_rows: dict[int, dict] = {} self.device_entries = self.device_manager.input_devices() if not self.device_entries: raise SystemExit("No input-capable sounddevice devices were found.") self.device_var = tk.StringVar() self.channels_var = tk.StringVar(value=args.channels or "1,2") self.samplerate_var = tk.StringVar( value="" if args.samplerate is None else f"{args.samplerate:g}" ) self.blocksize_var = tk.StringVar(value=str(args.blocksize)) self.calibration_var = tk.StringVar(value=f"{args.input_volts_per_sample_unit:g}") self.known_rms_v_var = tk.StringVar( value=( f"{args.calibrate_known_rms_v:g}" if args.calibrate_known_rms_v else f"{DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V:g}" ) ) self.calibration_duration_var = tk.StringVar(value=f"{args.calibration_duration:g}") self.waveform_y_limit_var = tk.StringVar(value="") self.power_history_window_var = tk.StringVar(value="30") self.power_fft_size_var = tk.StringVar(value=str(DEFAULT_POWER_FFT_SIZE)) self.power_overlap_var = tk.StringVar(value=f"{DEFAULT_POWER_OVERLAP:g}") self.power_smoothing_var = tk.StringVar(value=f"{DEFAULT_POWER_SMOOTHING_ALPHA:g}") self.winisd_enabled_var = tk.BooleanVar(value=not args.disable_winisd_headroom and ENABLE_WINISD_HEADROOM) self.winisd_max_power_var = tk.StringVar(value=str(args.winisd_max_power or WINISD_MAX_POWER_FILE)) self.headroom_smoothing_var = tk.StringVar(value=f"{args.headroom_smoothing_alpha:g}") self.winisd_status_var = tk.StringVar(value="WinISD headroom uses per-bin P/Pmax for LF.") self.lf_enabled_var = tk.BooleanVar(value=True) self.lf_channel_var = tk.StringVar(value="1") self.lf_impedance_var = tk.StringVar(value="impedance.txt" if Path("impedance.txt").exists() else "") self.lf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}") self.hf_enabled_var = tk.BooleanVar(value=False) self.hf_channel_var = tk.StringVar(value="2") self.hf_impedance_var = tk.StringVar(value="") self.hf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}") self.status_var = tk.StringVar(value="Stopped") self.power_style = ttk.Style() self.power_style.configure("PowerOk.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#22a447") self.power_style.configure("PowerWarn.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#e6a400") self.power_style.configure("PowerCritical.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#d62728") self._build_widgets() self._select_initial_device(args.device) self.root.protocol("WM_DELETE_WINDOW", self.close) self.root.after(50, self.poll_audio_queue) def _build_widgets(self) -> None: ttk = self.ttk outer = ttk.Frame(self.root, padding=12) outer.pack(fill="both", expand=True) config_frame = ttk.LabelFrame(outer, text="Input setup", padding=10) config_frame.pack(fill="x") config_frame.columnconfigure(1, weight=1) config_frame.columnconfigure(5, weight=1) ttk.Label(config_frame, text="Device").grid(row=0, column=0, sticky="w") self.device_combo = ttk.Combobox( config_frame, textvariable=self.device_var, values=[entry.label for entry in self.device_entries], state="readonly", ) self.device_combo.grid(row=0, column=1, columnspan=5, sticky="ew", padx=(8, 0)) ttk.Label(config_frame, text="Channels").grid(row=1, column=0, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.channels_var, width=12).grid( row=1, column=1, sticky="w", padx=(8, 16), pady=(8, 0) ) ttk.Label(config_frame, text="Sample rate").grid(row=1, column=2, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.samplerate_var, width=12).grid( row=1, column=3, sticky="w", padx=(8, 16), pady=(8, 0) ) ttk.Label(config_frame, text="Blocksize").grid(row=1, column=4, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.blocksize_var, width=10).grid( row=1, column=5, sticky="w", padx=(8, 0), pady=(8, 0) ) ttk.Label(config_frame, text="Voltage scale").grid(row=2, column=0, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.calibration_var, width=12).grid( row=2, column=1, sticky="w", padx=(8, 4), pady=(8, 0) ) ttk.Label(config_frame, text="V per sample unit").grid( row=2, column=2, sticky="w", padx=(0, 16), pady=(8, 0) ) self.start_button = ttk.Button(config_frame, text="Start", command=self.start) self.start_button.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(8, 0)) self.stop_button = ttk.Button(config_frame, text="Stop", command=self.stop, state="disabled") self.stop_button.grid(row=2, column=4, sticky="ew", padx=(0, 16), pady=(8, 0)) ttk.Label(config_frame, textvariable=self.status_var).grid( row=2, column=5, sticky="w", pady=(8, 0) ) ttk.Label(config_frame, text="Cal signal").grid(row=3, column=0, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.known_rms_v_var, width=12).grid( row=3, column=1, sticky="w", padx=(8, 4), pady=(8, 0) ) ttk.Label(config_frame, text="Vrms at input").grid( row=3, column=2, sticky="w", padx=(0, 16), pady=(8, 0) ) ttk.Label(config_frame, text="Seconds").grid(row=3, column=3, sticky="w", pady=(8, 0)) ttk.Entry(config_frame, textvariable=self.calibration_duration_var, width=10).grid( row=3, column=4, sticky="w", padx=(8, 16), pady=(8, 0) ) self.calibrate_button = ttk.Button( config_frame, text="Calibrate", command=self.start_calibration, state="disabled", ) self.calibrate_button.grid(row=3, column=5, sticky="ew", pady=(8, 0)) self.level_frame = ttk.LabelFrame(outer, text="Input-side metrics", padding=10) self.level_frame.pack(fill="x", pady=(12, 0)) self.level_frame.columnconfigure(6, weight=1) input_note = ttk.Label( self.level_frame, text="Measured at DSP output / amplifier input, using the current voltage scale.", ) input_note.grid(row=0, column=0, columnspan=7, sticky="w", pady=(0, 8)) for col, heading in enumerate(("Ch", "RMS voltage", "Peak voltage", "Crest factor", "Crest", "Digital peak")): ttk.Label(self.level_frame, text=heading).grid( row=1, column=col, sticky="w" if col == 0 else "e", padx=(0, 18 if col < 5 else 0), ) self.input_metric_static_widget_count = len(self.level_frame.winfo_children()) self.power_frame = ttk.LabelFrame(outer, text="Estimated amplifier output power", padding=10) self.power_frame.pack(fill="x", pady=(12, 0)) self.power_frame.columnconfigure(3, weight=1) power_note = ttk.Label( self.power_frame, text="Output side is estimated from DSP voltage, ideal amplifier gain, and the impedance curve.", ) power_note.grid(row=0, column=0, columnspan=10, sticky="w", pady=(0, 8)) ttk.Label(self.power_frame, text="FFT").grid(row=1, column=0, sticky="w") ttk.Entry(self.power_frame, textvariable=self.power_fft_size_var, width=8).grid( row=1, column=1, sticky="w", padx=(8, 16) ) ttk.Label(self.power_frame, text="Overlap").grid(row=1, column=2, sticky="e") ttk.Entry(self.power_frame, textvariable=self.power_overlap_var, width=8).grid( row=1, column=3, sticky="w", padx=(8, 16) ) ttk.Label(self.power_frame, text="Smooth").grid(row=1, column=4, sticky="e") ttk.Entry(self.power_frame, textvariable=self.power_smoothing_var, width=8).grid( row=1, column=5, sticky="w", padx=(8, 16) ) ttk.Checkbutton(self.power_frame, text="WinISD", variable=self.winisd_enabled_var).grid( row=2, column=0, sticky="w", pady=(8, 0) ) ttk.Entry(self.power_frame, textvariable=self.winisd_max_power_var, width=44).grid( row=2, column=1, columnspan=3, sticky="ew", padx=(8, 12), pady=(8, 0) ) ttk.Label(self.power_frame, text="Headroom smooth").grid(row=2, column=4, sticky="e", pady=(8, 0)) ttk.Entry(self.power_frame, textvariable=self.headroom_smoothing_var, width=8).grid( row=2, column=5, sticky="w", padx=(8, 16), pady=(8, 0) ) ttk.Label(self.power_frame, textvariable=self.winisd_status_var).grid( row=2, column=6, columnspan=9, sticky="w", pady=(8, 0) ) headings = ( "Way", "On", "Ch", "Impedance file", "Gain dB", "Vdsp", "Vamp", "P", "Q", "Sapp", "Headroom", "Crit Hz", "Limit", "P meter", "", "Max P", ) for col, heading in enumerate(headings): ttk.Label(self.power_frame, text=heading).grid( row=3, column=col, sticky="w" if col <= 4 else "e", pady=(8, 0), padx=(0, 12 if col < 9 else 0), ) self._build_power_config_row("LF", 4, self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var) self._build_power_config_row("HF", 5, self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var) plots_frame = ttk.LabelFrame(outer, text="Plot viewer", padding=10) plots_frame.pack(fill="both", expand=True, pady=(12, 0)) plots_frame.columnconfigure(0, weight=1) plots_frame.rowconfigure(0, weight=1) self.plot_tabs = ttk.Notebook(plots_frame) self.plot_tabs.grid(row=0, column=0, sticky="nsew") self.plot_tabs.bind("<>", lambda _event: self._redraw_active_plot()) self.scope_frame = ttk.Frame(self.plot_tabs, padding=10) self.scope_frame.rowconfigure(0, weight=1) self.scope_frame.columnconfigure(0, weight=1) self.scope_canvas = self.tk.Canvas( self.scope_frame, background="#111111", highlightthickness=0, height=260, ) self.scope_canvas.grid(row=0, column=0, sticky="nsew") self.scope_status_var = self.tk.StringVar(value="Waveform shows the newest input block in calibrated volts.") scope_controls = ttk.Frame(self.scope_frame) scope_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0)) scope_controls.columnconfigure(3, weight=1) ttk.Label(scope_controls, text="Y limit").grid(row=0, column=0, sticky="w") ttk.Entry(scope_controls, textvariable=self.waveform_y_limit_var, width=10).grid( row=0, column=1, sticky="w", padx=(8, 4) ) ttk.Label(scope_controls, text="V, blank = auto").grid(row=0, column=2, sticky="w") ttk.Label(scope_controls, textvariable=self.scope_status_var).grid( row=0, column=3, sticky="w", padx=(16, 0) ) self.waveform_y_limit_var.trace_add("write", lambda *_args: self.draw_scope_grid()) self.scope_canvas.bind("", lambda _event: self.draw_scope_grid()) self.draw_scope_grid() self.plot_tabs.add(self.scope_frame, text="Waveform") self.spectrum_frame = ttk.Frame(self.plot_tabs, padding=10) self.spectrum_frame.rowconfigure(0, weight=1) self.spectrum_frame.columnconfigure(0, weight=1) self.spectrum_canvas = self.tk.Canvas( self.spectrum_frame, background="#111111", highlightthickness=0, height=260, ) self.spectrum_canvas.grid(row=0, column=0, sticky="nsew") self.spectrum_status_var = self.tk.StringVar(value="PSD uses Hann/Welch scaling in V^2/Hz.") ttk.Label(self.spectrum_frame, textvariable=self.spectrum_status_var).grid( row=1, column=0, sticky="w", pady=(8, 0) ) self.spectrum_canvas.bind("", lambda _event: self.draw_spectrum_grid()) self.draw_spectrum_grid() self.plot_tabs.add(self.spectrum_frame, text="Input PSD") self.power_history_frame = ttk.Frame(self.plot_tabs, padding=10) self.power_history_frame.rowconfigure(0, weight=1) self.power_history_frame.columnconfigure(0, weight=1) self.power_history_canvas = self.tk.Canvas( self.power_history_frame, background="#111111", highlightthickness=0, height=220, ) self.power_history_canvas.grid(row=0, column=0, sticky="nsew") power_history_controls = ttk.Frame(self.power_history_frame) power_history_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0)) power_history_controls.columnconfigure(3, weight=1) ttk.Label(power_history_controls, text="Window").grid(row=0, column=0, sticky="w") ttk.Entry(power_history_controls, textvariable=self.power_history_window_var, width=10).grid( row=0, column=1, sticky="w", padx=(8, 4) ) ttk.Label(power_history_controls, text="s").grid(row=0, column=2, sticky="w") self.power_history_status_var = self.tk.StringVar(value="P and Sapp over the selected moving window.") ttk.Label(power_history_controls, textvariable=self.power_history_status_var).grid( row=0, column=3, sticky="w", padx=(16, 0) ) self.power_history_window_var.trace_add("write", lambda *_args: self.draw_power_history()) self.power_history_canvas.bind("", lambda _event: self.draw_power_history_grid()) self.draw_power_history_grid() self.plot_tabs.add(self.power_history_frame, text="Power history") self.power_limit_frame = ttk.Frame(self.plot_tabs, padding=10) self.power_limit_frame.rowconfigure(0, weight=1) self.power_limit_frame.columnconfigure(0, weight=1) self.power_limit_canvas = self.tk.Canvas( self.power_limit_frame, background="#111111", highlightthickness=0, height=240, ) self.power_limit_canvas.grid(row=0, column=0, sticky="nsew") self.power_limit_status_var = self.tk.StringVar(value="P_bin and WinISD Pmax over frequency.") ttk.Label(self.power_limit_frame, textvariable=self.power_limit_status_var).grid( row=1, column=0, sticky="w", pady=(8, 0) ) self.power_limit_canvas.bind("", lambda _event: self.draw_power_limit_grid()) self.draw_power_limit_grid() self.plot_tabs.add(self.power_limit_frame, text="P vs WinISD") self.headroom_frame = ttk.Frame(self.plot_tabs, padding=10) self.headroom_frame.rowconfigure(0, weight=1) self.headroom_frame.columnconfigure(0, weight=1) self.headroom_canvas = self.tk.Canvas( self.headroom_frame, background="#111111", highlightthickness=0, height=240, ) self.headroom_canvas.grid(row=0, column=0, sticky="nsew") self.headroom_status_var = self.tk.StringVar(value="H(f,t) = 10 log10(Pmax / P_bin).") ttk.Label(self.headroom_frame, textvariable=self.headroom_status_var).grid( row=1, column=0, sticky="w", pady=(8, 0) ) self.headroom_canvas.bind("", lambda _event: self.draw_headroom_grid()) self.draw_headroom_grid() self.plot_tabs.add(self.headroom_frame, text="Headroom") def _build_power_config_row( self, name: str, row: int, enabled_var, channel_var, impedance_var, gain_var, ) -> None: ttk = self.ttk ttk.Label(self.power_frame, text=name).grid(row=row, column=0, sticky="w", pady=4) ttk.Checkbutton(self.power_frame, variable=enabled_var).grid(row=row, column=1, sticky="w", pady=4) ttk.Entry(self.power_frame, textvariable=channel_var, width=5).grid(row=row, column=2, sticky="w", pady=4) ttk.Entry(self.power_frame, textvariable=impedance_var, width=44).grid(row=row, column=3, sticky="ew", padx=(8, 12), pady=4) ttk.Entry(self.power_frame, textvariable=gain_var, width=8).grid(row=row, column=4, sticky="w", pady=4) vars_by_metric = { "vdsp": self.tk.StringVar(value="-"), "vamp": self.tk.StringVar(value="-"), "p": self.tk.StringVar(value="-"), "q": self.tk.StringVar(value="-"), "s": self.tk.StringVar(value="-"), "headroom": self.tk.StringVar(value="-"), "crit_hz": self.tk.StringVar(value="-"), "limit": self.tk.StringVar(value="-"), "max_p": self.tk.StringVar(value="-"), } for offset, key in enumerate(("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit"), start=5): ttk.Label(self.power_frame, textvariable=vars_by_metric[key], width=12, anchor="e").grid( row=row, column=offset, sticky="e", padx=(0, 12 if offset < 9 else 0), pady=4 ) meter = ttk.Progressbar( self.power_frame, orient="horizontal", mode="determinate", maximum=DEFAULT_POWER_CRITICAL_W, style="PowerOk.Horizontal.TProgressbar", length=150, ) meter.grid(row=row, column=13, sticky="ew", padx=(8, 8), pady=4) light = self.tk.Canvas(self.power_frame, width=18, height=18, highlightthickness=0) light.grid(row=row, column=14, sticky="w", padx=(0, 6), pady=4) light_id = light.create_oval(3, 3, 15, 15, fill="#22a447", outline="#555555") ttk.Label(self.power_frame, textvariable=vars_by_metric["max_p"], width=12, anchor="e").grid( row=row, column=15, sticky="e", pady=4 ) vars_by_metric["meter"] = meter vars_by_metric["light"] = light vars_by_metric["light_id"] = light_id self.power_rows[name] = vars_by_metric def _redraw_active_plot(self) -> None: selected = self.plot_tabs.select() if selected == str(self.scope_frame): self.draw_scope_grid() elif selected == str(self.spectrum_frame): self.draw_spectrum_grid() self.draw_spectrum() elif selected == str(self.power_history_frame): self.draw_power_history_grid() self.draw_power_history() elif selected == str(self.power_limit_frame): self.draw_power_limit_grid() self.draw_power_limit() elif selected == str(self.headroom_frame): self.draw_headroom_grid() self.draw_headroom() def _select_initial_device(self, requested_index: int | None) -> None: selected = 0 if requested_index is not None: for idx, entry in enumerate(self.device_entries): if entry.index == requested_index: selected = idx break else: for idx, entry in enumerate(self.device_entries): if "UltraLite" in entry.label: selected = idx break self.device_combo.current(selected) def _parse_gui_config(self) -> InputConfig: combo_index = self.device_combo.current() if combo_index < 0: raise ValueError("select an input device") entry = self.device_entries[combo_index] device_index = entry.index device = entry.info max_input_channels = int(device["max_input_channels"]) channels = parse_channel_list(self.channels_var.get()) self.device_manager.validate_channels(channels, max_input_channels) samplerate_text = self.samplerate_var.get().strip() samplerate = None if samplerate_text == "" else float(samplerate_text) blocksize = int(self.blocksize_var.get()) calibration = float(self.calibration_var.get()) if samplerate is not None and samplerate <= 0: raise ValueError("sample rate must be positive") if blocksize <= 0: raise ValueError("blocksize must be positive") if calibration <= 0: raise ValueError("volts per sample unit must be positive") return InputConfig( device_index=device_index, channels_zero_based=channels, samplerate=samplerate, blocksize=blocksize, input_volts_per_sample_unit=calibration, update_interval_s=self.args.update_interval, ) def _parse_power_settings(self, config: InputConfig, actual_samplerate: float) -> None: if np is None: raise RuntimeError("numpy has not been loaded") fft_size = int(self.power_fft_size_var.get()) overlap = float(self.power_overlap_var.get()) smoothing = float(self.power_smoothing_var.get()) headroom_smoothing = float(self.headroom_smoothing_var.get()) if fft_size <= 0 or fft_size % 2: raise ValueError("power FFT size must be a positive even number") if not 0.0 <= overlap < 1.0: raise ValueError("power overlap must be >= 0 and < 1") if not 0.0 <= smoothing < 1.0: raise ValueError("power smoothing must be >= 0 and < 1") if not 0.0 <= headroom_smoothing < 1.0: raise ValueError("headroom smoothing must be >= 0 and < 1") hop_size = int(round(fft_size * (1.0 - overlap))) if hop_size <= 0: raise ValueError("power overlap is too high") ways: list[WayConfig] = [] specs = ( ("LF", self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var), ("HF", self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var), ) for name, enabled_var, channel_var, impedance_var, gain_var in specs: if not enabled_var.get(): continue impedance_path = impedance_var.get().strip() if not impedance_path: continue channel = parse_channel_list(channel_var.get())[0] if channel >= config.stream_channel_count: raise ValueError( f"{name} channel {channel + 1} is not captured; include it in Channels" ) ways.append( WayConfig( name=name, channel_zero_based=channel, impedance_path=Path(impedance_path), amp_gain_db=float(gain_var.get()), ) ) names = {way.name for way in ways} for name, row in self.power_rows.items(): if name not in names: for key in ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p"): row[key].set("-") row["meter"]["value"] = 0.0 row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W row["meter"].configure(style="PowerOk.Horizontal.TProgressbar") row["light"].itemconfigure(row["light_id"], fill="#22a447") self.power_estimators = {} self.power_buffers = {} self.power_history = {} self.power_max_hold_w = {} self.latest_power_frames = {} self.latest_headroom_frames = {} self.headroom_curves = {} self.headroom_smoothed_db = {} self.power_history_start_s = None self.power_hop_size = hop_size self.input_spectrum_estimators = {} self.input_spectrum_buffers = {} self.input_spectrum_frames = {} for channel in config.channels_zero_based: self.input_spectrum_estimators[channel] = VoltageSpectrumEstimator( samplerate=actual_samplerate, fft_size=fft_size, input_volts_per_sample_unit=config.input_volts_per_sample_unit, smoothing_alpha=smoothing, ) self.input_spectrum_buffers[channel] = np.zeros(0, dtype=np.float64) for way in ways: impedance = ImpedanceCurve.from_file(way.impedance_path) self.power_estimators[way.name] = PowerEstimator( way=way, impedance=impedance, samplerate=actual_samplerate, fft_size=fft_size, input_volts_per_sample_unit=config.input_volts_per_sample_unit, smoothing_alpha=smoothing, ) self.power_buffers[way.name] = np.zeros(0, dtype=np.float64) self.power_history[way.name] = [] self.power_max_hold_w[way.name] = 0.0 self._load_gui_winisd_curves() def _load_gui_winisd_curves(self) -> None: self.headroom_curves = {} self.headroom_smoothed_db = {} if not self.winisd_enabled_var.get(): self.winisd_status_var.set("WinISD headroom disabled.") return lf_estimator = self.power_estimators.get("LF") if lf_estimator is None: self.winisd_status_var.set("WinISD headroom disabled: LF way is not active.") return path = Path(self.winisd_max_power_var.get().strip() or WINISD_MAX_POWER_FILE) try: curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W) except Exception as exc: self.winisd_status_var.set(f"WinISD headroom disabled: {exc}") return self.headroom_curves["LF"] = curve status = ( f"WinISD headroom active: {path}, " f"{curve.frequency_hz[0]:.1f}-{curve.frequency_hz[-1]:.1f} Hz." ) excursion_10w = WINISD_EXCURSION_10W_FILE excursion_100w = WINISD_EXCURSION_100W_FILE if excursion_10w.exists() and excursion_100w.exists(): try: check = check_excursion_scaling(excursion_10w, excursion_100w) status += " " + check.message except Exception as exc: status += f" Excursion check skipped: {exc}" self.winisd_status_var.set(status) def _process_input_spectrum_block(self, block) -> None: if np is None or not self.input_spectrum_estimators: return for channel, estimator in self.input_spectrum_estimators.items(): samples = block[:, channel].astype(np.float64, copy=False) self.input_spectrum_buffers[channel] = np.concatenate( (self.input_spectrum_buffers[channel], samples) ) latest = None while len(self.input_spectrum_buffers[channel]) >= estimator.fft_size: latest = estimator.estimate( self.input_spectrum_buffers[channel][: estimator.fft_size] ) self.input_spectrum_buffers[channel] = self.input_spectrum_buffers[channel][ self.power_hop_size : ] if latest is None: continue self.input_spectrum_frames[channel] = latest row = self.input_metric_rows.get(channel) if row is not None: row["rms"].set(f"{latest.rms_v:.3f} V") row["peak"].set(f"{latest.peak_v:.3f} V") row["crest"].set(f"{latest.crest_factor:.2f}x") row["crest_db"].set(f"{latest.crest_factor_db:.2f} dB") digital_peak = latest.peak_v / self.config.input_volts_per_sample_unit if self.config else 0.0 row["digital_peak"].set(f"{digital_peak:.5f} FS") self.draw_spectrum() def _process_power_block(self, block) -> None: if np is None or not self.power_estimators: return for name, estimator in self.power_estimators.items(): way = estimator.way samples = block[:, way.channel_zero_based].astype(np.float64, copy=False) self.power_buffers[name] = np.concatenate((self.power_buffers[name], samples)) latest = None while len(self.power_buffers[name]) >= estimator.fft_size: latest = estimator.estimate(self.power_buffers[name][: estimator.fft_size]) self.power_buffers[name] = self.power_buffers[name][self.power_hop_size :] if latest is None: continue self.latest_power_frames[name] = latest row = self.power_rows.get(name) if row is None: continue row["vdsp"].set(f"{latest.rms_input_v:.3f} V") row["vamp"].set(f"{latest.rms_amp_v:.1f} V") row["p"].set(f"{latest.total_p_w:.1f} W") row["q"].set(f"{latest.total_q_var:.1f} var") row["s"].set(f"{latest.total_s_va:.1f} VA") headroom = self._compute_gui_headroom(name, latest) if headroom is None: self.latest_headroom_frames.pop(name, None) row["headroom"].set("-") row["crit_hz"].set("-") row["limit"].set("-") else: self.latest_headroom_frames[name] = headroom row["headroom"].set(f"{headroom['worst_headroom_db']:+.1f} dB") row["crit_hz"].set(f"{headroom['critical_frequency_hz']:.1f}") row["limit"].set(headroom["status"]) self._update_power_meter(name, latest.total_p_w, headroom) self._append_power_history(name, latest) self.draw_power_history() self.draw_power_limit() self.draw_headroom() def _compute_gui_headroom(self, name: str, frame: PowerFrame) -> dict | None: curve = self.headroom_curves.get(name) if curve is None: return None headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz) raw = float(headroom["worst_headroom_db"]) try: alpha = float(self.headroom_smoothing_var.get()) except ValueError: alpha = HEADROOM_SMOOTHING_ALPHA alpha = min(max(alpha, 0.0), 0.999999) smoothed = smooth_headroom_db(self.headroom_smoothed_db.get(name), raw, alpha) self.headroom_smoothed_db[name] = smoothed headroom["worst_headroom_db"] = smoothed headroom["status"] = headroom_status(smoothed) headroom["over_limit_bool"] = bool(math.isfinite(smoothed) and smoothed <= 0.0) return headroom def _update_power_meter(self, name: str, power_w: float, headroom: dict | None = None) -> None: row = self.power_rows.get(name) if row is None: return max_hold = max(self.power_max_hold_w.get(name, 0.0), float(power_w)) self.power_max_hold_w[name] = max_hold if headroom is not None and math.isfinite(float(headroom["critical_pmax_w"])): critical_pmax = max(float(headroom["critical_pmax_w"]), HEADROOM_EPS_W) critical_power = max(float(headroom["critical_power_w"]), 0.0) meter_value = min((critical_power / critical_pmax) * 100.0, 100.0) row["meter"]["maximum"] = 100.0 status = headroom["status"] if status == "LIMIT EXCEEDED": color = "#d62728" style = "PowerCritical.Horizontal.TProgressbar" elif status == "WARNING": color = "#e6a400" style = "PowerWarn.Horizontal.TProgressbar" else: color = "#22a447" style = "PowerOk.Horizontal.TProgressbar" else: meter_value = min(max(float(power_w), 0.0), DEFAULT_POWER_CRITICAL_W) row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W if power_w >= DEFAULT_POWER_CRITICAL_W: color = "#d62728" style = "PowerCritical.Horizontal.TProgressbar" elif power_w >= DEFAULT_POWER_WARNING_W: color = "#e6a400" style = "PowerWarn.Horizontal.TProgressbar" else: color = "#22a447" style = "PowerOk.Horizontal.TProgressbar" row["meter"]["value"] = meter_value row["meter"].configure(style=style) row["light"].itemconfigure(row["light_id"], fill=color) row["max_p"].set(f"{max_hold:.1f} W") def _append_power_history(self, name: str, frame: PowerFrame) -> None: now = time.monotonic() if self.power_history_start_s is None: self.power_history_start_s = now t = now - self.power_history_start_s history = self.power_history.setdefault(name, []) history.append((t, frame.total_p_w, frame.total_s_va)) window_s = self._power_history_window_s() keep_after = max(0.0, t - window_s * 1.5) self.power_history[name] = [item for item in history if item[0] >= keep_after] def _power_history_window_s(self) -> float: try: value = float(self.power_history_window_var.get().strip()) except ValueError: value = 30.0 return value if value > 0.0 else 30.0 def _build_level_rows(self, channels_zero_based: list[int]) -> None: for child in self.level_frame.winfo_children()[self.input_metric_static_widget_count:]: child.destroy() self.level_rows = [] self.input_metric_rows = {} for row, channel in enumerate(channels_zero_based, start=2): channel_label = self.ttk.Label(self.level_frame, text=f"Ch {channel + 1}", width=8) channel_label.grid(row=row, column=0, sticky="w", pady=6) vars_by_metric = { "rms": self.tk.StringVar(value="-"), "peak": self.tk.StringVar(value="-"), "crest": self.tk.StringVar(value="-"), "crest_db": self.tk.StringVar(value="-"), "digital_peak": self.tk.StringVar(value="-"), } for col, key in enumerate(("rms", "peak", "crest", "crest_db", "digital_peak"), start=1): self.ttk.Label(self.level_frame, textvariable=vars_by_metric[key], width=16, anchor="e").grid( row=row, column=col, sticky="e", padx=(0, 18 if col < 5 else 0), pady=6, ) self.input_metric_rows[channel] = vars_by_metric self.draw_scope_grid() self.draw_spectrum_grid() def draw_scope_grid(self) -> None: canvas = self.scope_canvas canvas.delete("grid") canvas.delete("trace") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) y_limit_v = self._scope_y_limit_v() for frac in (0.0, 0.25, 0.5, 0.75, 1.0): y = int(frac * (height - 1)) color = "#555555" if frac == 0.5 else "#2a2a2a" canvas.create_line(0, y, width, y, fill=color, tags="grid") for frac in (0.0, 0.25, 0.5, 0.75, 1.0): x = int(frac * (width - 1)) canvas.create_line(x, 0, x, height, fill="#242424", tags="grid") canvas.create_text(6, 6, anchor="nw", text=f"+{y_limit_v:g} V", fill="#bbbbbb", tags="grid") canvas.create_text(6, height // 2 + 4, anchor="nw", text="0", fill="#bbbbbb", tags="grid") canvas.create_text(6, height - 20, anchor="nw", text=f"-{y_limit_v:g} V", fill="#bbbbbb", tags="grid") if self.config is not None: legend_parts = [] for idx, channel in enumerate(self.config.channels_zero_based): color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)] legend_parts.append((f"Ch {channel + 1}", color)) x = width - 10 for label, color in reversed(legend_parts): text_id = canvas.create_text(x, 6, anchor="ne", text=label, fill=color, tags="grid") bbox = canvas.bbox(text_id) if bbox: x = bbox[0] - 12 def _scope_y_limit_v(self) -> float: text = self.waveform_y_limit_var.get().strip() if text: try: manual_limit = float(text) except ValueError: manual_limit = 0.0 if manual_limit > 0.0: return manual_limit if self.config is None: return DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT return self.config.input_volts_per_sample_unit def draw_waveform(self, block) -> None: if self.config is None or np is None: return canvas = self.scope_canvas width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) selected = block[:, self.config.channels_zero_based].astype(np.float64, copy=False) volts = selected * self.config.input_volts_per_sample_unit frame_count = selected.shape[0] if frame_count < 2: return canvas.delete("trace") samples_per_pixel = max(1, frame_count // width) decimated = volts[::samples_per_pixel, :] point_count = decimated.shape[0] if point_count < 2: return x_scale = (width - 1) / (point_count - 1) y_mid = (height - 1) / 2.0 y_limit_v = self._scope_y_limit_v() y_scale = y_mid / y_limit_v for channel_idx in range(decimated.shape[1]): waveform = np.clip( decimated[:, channel_idx], -y_limit_v, y_limit_v, ) points: list[float] = [] for sample_idx, sample in enumerate(waveform): points.extend((sample_idx * x_scale, y_mid - float(sample) * y_scale)) color = OSCILLOSCOPE_TRACE_COLORS[channel_idx % len(OSCILLOSCOPE_TRACE_COLORS)] canvas.create_line( points, fill=color, width=1.5, smooth=False, tags="trace", ) def draw_spectrum_grid(self) -> None: canvas = self.spectrum_canvas canvas.delete("grid") canvas.delete("spectrum") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) for frac in (0.0, 0.25, 0.5, 0.75, 1.0): y = int(frac * (height - 1)) color = "#555555" if frac == 0.5 else "#2a2a2a" canvas.create_line(0, y, width, y, fill=color, tags="grid") for frac in (0.0, 0.25, 0.5, 0.75, 1.0): x = int(frac * (width - 1)) canvas.create_line(x, 0, x, height, fill="#242424", tags="grid") canvas.create_text(6, 6, anchor="nw", text="PSD dB V^2/Hz", fill="#bbbbbb", tags="grid") canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid") canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid") def draw_spectrum(self) -> None: if np is None or not self.input_spectrum_frames: return canvas = self.spectrum_canvas width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) canvas.delete("spectrum") all_db = [] prepared = [] for idx, (channel, frame) in enumerate(sorted(self.input_spectrum_frames.items())): freq = frame.frequencies_hz mask = freq >= 20.0 if np.count_nonzero(mask) < 2: continue freq = freq[mask] db = 10.0 * np.log10(np.maximum(frame.psd_v2_per_hz[mask], 1e-30)) all_db.append(db) prepared.append((idx, channel, freq, db)) if not prepared: return all_values = np.concatenate(all_db) y_max = float(np.nanmax(all_values)) y_min = float(np.nanmin(all_values)) if not np.isfinite(y_min) or not np.isfinite(y_max): return y_max = math.ceil((y_max + 3.0) / 10.0) * 10.0 y_min = min(y_max - 60.0, math.floor((y_min - 3.0) / 10.0) * 10.0) if y_max <= y_min: y_max = y_min + 60.0 f_min = 20.0 f_max = max(float(frame.frequencies_hz[-1]) for frame in self.input_spectrum_frames.values()) log_min = math.log10(f_min) log_max = math.log10(f_max) canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g}", fill="#888888", tags="spectrum") canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g}", fill="#888888", tags="spectrum") for idx, channel, freq, db in prepared: x = (np.log10(freq) - log_min) / (log_max - log_min) * (width - 1) y = (y_max - db) / (y_max - y_min) * (height - 1) points: list[float] = [] step = max(1, len(x) // max(width, 1)) for px, py in zip(x[::step], y[::step]): points.extend((float(px), float(np.clip(py, 0, height - 1)))) if len(points) >= 4: color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)] canvas.create_line(points, fill=color, width=1.4, smooth=False, tags="spectrum") canvas.create_text( width - 8, 8 + 16 * idx, anchor="ne", text=f"Ch {channel + 1}", fill=color, tags="spectrum", ) self.spectrum_status_var.set( f"PSD: {y_min:g} to {y_max:g} dB V^2/Hz, Hann/Welch, {len(prepared)} channel(s)" ) def draw_power_history_grid(self) -> None: canvas = self.power_history_canvas canvas.delete("grid") canvas.delete("history") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) for frac in (0.0, 0.25, 0.5, 0.75, 1.0): y = int(frac * (height - 1)) color = "#555555" if frac == 0.5 else "#2a2a2a" canvas.create_line(0, y, width, y, fill=color, tags="grid") for frac in (0.0, 0.25, 0.5, 0.75, 1.0): x = int(frac * (width - 1)) canvas.create_line(x, 0, x, height, fill="#242424", tags="grid") canvas.create_text(6, 6, anchor="nw", text="P solid / Sapp dashed", fill="#bbbbbb", tags="grid") canvas.create_text(6, height - 20, anchor="nw", text=f"-{self._power_history_window_s():g} s", fill="#bbbbbb", tags="grid") canvas.create_text(width - 6, height - 20, anchor="ne", text="now", fill="#bbbbbb", tags="grid") def draw_power_history(self) -> None: canvas = self.power_history_canvas if not self.power_history: self.draw_power_history_grid() return canvas.delete("history") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) window_s = self._power_history_window_s() latest_t = max((items[-1][0] for items in self.power_history.values() if items), default=0.0) t_min = max(0.0, latest_t - window_s) prepared = [] all_values: list[float] = [] for idx, (name, items) in enumerate(sorted(self.power_history.items())): visible = [item for item in items if item[0] >= t_min] if len(visible) < 2: continue t = np.asarray([item[0] for item in visible], dtype=np.float64) p = np.asarray([item[1] for item in visible], dtype=np.float64) s = np.asarray([item[2] for item in visible], dtype=np.float64) prepared.append((idx, name, t, p, s)) all_values.extend(float(v) for v in p) all_values.extend(float(v) for v in s) if not prepared: return y_max = max(all_values) y_min = min(0.0, min(all_values)) if y_max <= y_min: y_max = y_min + 1.0 padding = 0.08 * (y_max - y_min) y_max += padding y_min -= padding def to_points(t_values, y_values) -> list[float]: x = (t_values - (latest_t - window_s)) / window_s * (width - 1) y = (y_max - y_values) / (y_max - y_min) * (height - 1) points: list[float] = [] for px, py in zip(x, y): points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1)))) return points canvas.create_text(6, 22, anchor="nw", text=f"{y_max:.1f}", fill="#888888", tags="history") canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:.1f}", fill="#888888", tags="history") for idx, name, t, p, s in prepared: color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)] p_points = to_points(t, p) s_points = to_points(t, s) if len(p_points) >= 4: canvas.create_line(p_points, fill=color, width=1.8, smooth=False, tags="history") if len(s_points) >= 4: canvas.create_line( s_points, fill=color, width=1.2, smooth=False, dash=(5, 3), tags="history", ) canvas.create_text( width - 8, 8 + 16 * idx, anchor="ne", text=f"{name} P/Sapp", fill=color, tags="history", ) self.power_history_status_var.set( f"Window {window_s:g} s, y {y_min:.1f} to {y_max:.1f} W / VA" ) def _winisd_plot_data(self): for name in sorted(self.latest_headroom_frames): frame = self.latest_power_frames.get(name) headroom = self.latest_headroom_frames.get(name) curve = self.headroom_curves.get(name) if frame is not None and headroom is not None and curve is not None: return name, frame, headroom, curve return None def draw_power_limit_grid(self) -> None: canvas = self.power_limit_canvas canvas.delete("grid") canvas.delete("limit") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) for frac in (0.0, 0.25, 0.5, 0.75, 1.0): y = int(frac * (height - 1)) color = "#555555" if frac == 0.5 else "#2a2a2a" canvas.create_line(0, y, width, y, fill=color, tags="grid") x = int(frac * (width - 1)) canvas.create_line(x, 0, x, height, fill="#242424", tags="grid") canvas.create_text(6, 6, anchor="nw", text="Power [W], log scale", fill="#bbbbbb", tags="grid") canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid") canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid") def draw_power_limit(self) -> None: data = self._winisd_plot_data() if np is None or data is None: return name, frame, headroom, curve = data canvas = self.power_limit_canvas canvas.delete("limit") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) freq = frame.frequencies_hz pmax = curve.interpolate(freq) p_bin = np.maximum(frame.p_w_per_bin, 0.0) mask = np.isfinite(freq) & np.isfinite(pmax) & (freq >= 20.0) & (pmax > HEADROOM_EPS_W) if np.count_nonzero(mask) < 2: return freq = freq[mask] pmax = pmax[mask] p_bin = p_bin[mask] p_plot = np.maximum(p_bin, 1e-9) f_min = max(20.0, float(freq[0])) f_max = float(freq[-1]) if f_max <= f_min: return log_f_min = math.log10(f_min) log_f_max = math.log10(f_max) positive_values = np.concatenate((p_plot[np.isfinite(p_plot)], pmax[np.isfinite(pmax)])) positive_values = positive_values[positive_values > 0.0] if len(positive_values) == 0: return y_min = max(1e-6, 10.0 ** math.floor(math.log10(float(np.nanmin(positive_values))))) y_max = 10.0 ** math.ceil(math.log10(float(np.nanmax(positive_values)) * 1.2)) if y_max <= y_min: y_max = y_min * 10.0 log_y_min = math.log10(y_min) log_y_max = math.log10(y_max) def to_points(y_values: np.ndarray) -> list[float]: x = (np.log10(freq) - log_f_min) / (log_f_max - log_f_min) * (width - 1) y = (log_y_max - np.log10(np.maximum(y_values, y_min))) / (log_y_max - log_y_min) * (height - 1) step = max(1, len(x) // max(width, 1)) points: list[float] = [] for px, py in zip(x[::step], y[::step]): points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1)))) return points p_points = to_points(p_plot) pmax_points = to_points(pmax) if len(p_points) >= 4: canvas.create_line(p_points, fill="#1f77b4", width=1.4, smooth=False, tags="limit") if len(pmax_points) >= 4: canvas.create_line(pmax_points, fill="#d62728", width=1.8, smooth=False, tags="limit") critical_f = float(headroom["critical_frequency_hz"]) if math.isfinite(critical_f) and f_min <= critical_f <= f_max: x = (math.log10(critical_f) - log_f_min) / (log_f_max - log_f_min) * (width - 1) canvas.create_line(x, 0, x, height, fill="#e6a400", width=1.4, dash=(4, 3), tags="limit") canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g} W", fill="#888888", tags="limit") canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g} W", fill="#888888", tags="limit") canvas.create_text(width - 8, 8, anchor="ne", text=f"{name} P_bin", fill="#1f77b4", tags="limit") canvas.create_text(width - 8, 24, anchor="ne", text="WinISD Pmax", fill="#d62728", tags="limit") self.power_limit_status_var.set( f"{name}: P_bin vs Pmax, critical {critical_f:.1f} Hz, " f"worst {headroom['worst_headroom_db']:+.1f} dB" ) def draw_headroom_grid(self) -> None: canvas = self.headroom_canvas canvas.delete("grid") canvas.delete("headroom") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) for frac in (0.0, 0.25, 0.5, 0.75, 1.0): y = int(frac * (height - 1)) color = "#555555" if frac == 0.5 else "#2a2a2a" canvas.create_line(0, y, width, y, fill=color, tags="grid") x = int(frac * (width - 1)) canvas.create_line(x, 0, x, height, fill="#242424", tags="grid") canvas.create_text(6, 6, anchor="nw", text="Headroom [dB]", fill="#bbbbbb", tags="grid") canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid") canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid") def draw_headroom(self) -> None: data = self._winisd_plot_data() if np is None or data is None: return name, frame, headroom, curve = data canvas = self.headroom_canvas canvas.delete("headroom") width = max(canvas.winfo_width(), 2) height = max(canvas.winfo_height(), 2) freq = frame.frequencies_hz pmax = curve.interpolate(freq) h_db = np.asarray(headroom["headroom_db_bin"], dtype=np.float64) mask = np.isfinite(freq) & np.isfinite(pmax) & np.isfinite(h_db) & (freq >= 20.0) & (pmax > HEADROOM_EPS_W) if np.count_nonzero(mask) < 2: return freq = freq[mask] h_db = h_db[mask] f_min = max(20.0, float(freq[0])) f_max = float(freq[-1]) if f_max <= f_min: return log_f_min = math.log10(f_min) log_f_max = math.log10(f_max) y_min = min(-6.0, math.floor((float(np.nanmin(h_db)) - 3.0) / 3.0) * 3.0) h_display_top = min(float(np.nanpercentile(h_db, 95.0)), 60.0) y_max = max(12.0, math.ceil((h_display_top + 3.0) / 3.0) * 3.0) if y_max <= y_min: y_max = y_min + 18.0 h_plot = np.clip(h_db, y_min, y_max) x = (np.log10(freq) - log_f_min) / (log_f_max - log_f_min) * (width - 1) y = (y_max - h_plot) / (y_max - y_min) * (height - 1) step = max(1, len(x) // max(width, 1)) points: list[float] = [] for px, py in zip(x[::step], y[::step]): points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1)))) if len(points) >= 4: canvas.create_line(points, fill="#1f77b4", width=1.5, smooth=False, tags="headroom") def y_for_db(value: float) -> float: return (y_max - value) / (y_max - y_min) * (height - 1) for value, color, label in ((0.0, "#d62728", "0 dB"), (6.0, "#e6a400", "+6 dB")): if y_min <= value <= y_max: y_line = y_for_db(value) canvas.create_line(0, y_line, width, y_line, fill=color, width=1.2, dash=(5, 3), tags="headroom") canvas.create_text(width - 8, y_line - 2, anchor="se", text=label, fill=color, tags="headroom") critical_f = float(headroom["critical_frequency_hz"]) if math.isfinite(critical_f) and f_min <= critical_f <= f_max: x_line = (math.log10(critical_f) - log_f_min) / (log_f_max - log_f_min) * (width - 1) canvas.create_line(x_line, 0, x_line, height, fill="#e6a400", width=1.4, dash=(4, 3), tags="headroom") canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g} dB", fill="#888888", tags="headroom") canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g} dB", fill="#888888", tags="headroom") canvas.create_text(width - 8, 8, anchor="ne", text=f"{name} H(f,t)", fill="#1f77b4", tags="headroom") self.headroom_status_var.set( f"{name}: worst {headroom['worst_headroom_db']:+.1f} dB " f"at {critical_f:.1f} Hz, {headroom['status']}" ) def start(self) -> None: if self.capture is not None: return try: config = self._parse_gui_config() except Exception as exc: self.messagebox.showerror("Invalid input setup", str(exc)) return capture = QueuedInputStream( sd=self.sd, device_index=config.device_index, channels_zero_based=config.channels_zero_based, samplerate=config.samplerate, blocksize=config.blocksize, max_queue_blocks=8, ) try: capture.start() actual_samplerate = capture.actual_samplerate self._parse_power_settings(config, actual_samplerate) except Exception as exc: capture.stop() self.messagebox.showerror("Start failed", str(exc)) return self.capture = capture self.config = config self._build_level_rows(config.channels_zero_based) self.start_button.configure(state="disabled") self.stop_button.configure(state="normal") self.calibrate_button.configure(state="normal") self.device_combo.configure(state="disabled") self.status_var.set(f"Running at {actual_samplerate:g} Hz") def stop(self) -> None: if self.capture is not None: self.capture.stop() self.capture = None self.config = None self.calibration_state = None self.input_spectrum_estimators = {} self.input_spectrum_buffers = {} self.input_spectrum_frames = {} self.power_estimators = {} self.power_buffers = {} self.power_history = {} self.power_max_hold_w = {} self.latest_power_frames = {} self.latest_headroom_frames = {} self.headroom_curves = {} self.headroom_smoothed_db = {} self.power_history_start_s = None self.power_hop_size = 0 for row in self.input_metric_rows.values(): for value_var in row.values(): value_var.set("-") for row in self.power_rows.values(): for key in ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p"): row[key].set("-") row["meter"]["value"] = 0.0 row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W row["meter"].configure(style="PowerOk.Horizontal.TProgressbar") row["light"].itemconfigure(row["light_id"], fill="#22a447") self.draw_spectrum_grid() self.draw_power_history_grid() self.draw_power_limit_grid() self.draw_headroom_grid() self.start_button.configure(state="normal") self.stop_button.configure(state="disabled") self.calibrate_button.configure(state="disabled") self.device_combo.configure(state="readonly") self.status_var.set("Stopped") def start_calibration(self) -> None: if self.config is None or self.capture is None or np is None: self.messagebox.showerror("Calibration unavailable", "Start the input monitor first.") return try: known_rms_v = float(self.known_rms_v_var.get()) duration_s = float(self.calibration_duration_var.get()) except ValueError: self.messagebox.showerror("Invalid calibration", "Cal signal Vrms and seconds must be numbers.") return if known_rms_v <= 0: self.messagebox.showerror("Invalid calibration", "Cal signal Vrms must be positive.") return if duration_s <= 0: self.messagebox.showerror("Invalid calibration", "Calibration duration must be positive.") return self.calibration_state = { "known_rms_v": known_rms_v, "deadline": time.monotonic() + duration_s, "sums": np.zeros(len(self.config.channels_zero_based), dtype=np.float64), "frames": 0, "peak_fs": np.zeros(len(self.config.channels_zero_based), dtype=np.float64), } self.calibrate_button.configure(state="disabled") self.status_var.set(f"Calibrating from {known_rms_v:g} Vrms...") def _accumulate_calibration(self, block) -> None: if self.calibration_state is None or self.config is None or np is None: return selected = block[:, self.config.channels_zero_based].astype(np.float64, copy=False) self.calibration_state["sums"] += np.sum(selected * selected, axis=0) self.calibration_state["frames"] += selected.shape[0] self.calibration_state["peak_fs"] = np.maximum( self.calibration_state["peak_fs"], np.max(np.abs(selected), axis=0), ) if time.monotonic() < self.calibration_state["deadline"]: return frames = self.calibration_state["frames"] if frames <= 0: self.calibration_state = None self.calibrate_button.configure(state="normal") self.messagebox.showerror("Calibration failed", "No input samples were captured.") return try: measured_rms_fs = np.sqrt(self.calibration_state["sums"] / frames) per_channel, average = compute_volts_per_sample_unit( self.calibration_state["known_rms_v"], measured_rms_fs, ) except ValueError as exc: self.calibration_state = None self.calibrate_button.configure(state="normal") self.messagebox.showerror("Calibration failed", str(exc)) return peak_fs = self.calibration_state["peak_fs"] known_rms_v = self.calibration_state["known_rms_v"] self.calibration_state = None self.calibration_var.set(f"{average:.9g}") self.config = InputConfig( device_index=self.config.device_index, channels_zero_based=self.config.channels_zero_based, samplerate=self.config.samplerate, blocksize=self.config.blocksize, input_volts_per_sample_unit=average, update_interval_s=self.config.update_interval_s, ) if self.capture is not None: try: self._parse_power_settings(self.config, self.capture.actual_samplerate) except Exception as exc: self.power_estimators = {} self.power_buffers = {} self.messagebox.showwarning( "Power estimator disabled", f"Voltage calibration was applied, but power estimation could not restart: {exc}", ) self.draw_scope_grid() self.calibrate_button.configure(state="normal") cal_path = CalibrationStore().save( volts_per_sample_unit=average, known_rms_v=known_rms_v, measured_rms_sample_by_channel={ str(channel + 1): float(measured_rms_fs[idx]) for idx, channel in enumerate(self.config.channels_zero_based) }, peak_sample_by_channel={ str(channel + 1): float(peak_fs[idx]) for idx, channel in enumerate(self.config.channels_zero_based) }, device_index=self.config.device_index, channels_zero_based=self.config.channels_zero_based, samplerate=self.config.samplerate, source="gui", ) details = [] for idx, channel in enumerate(self.config.channels_zero_based): warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else "" details.append( f"Ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS, " f"{per_channel[idx]:.9g} V/sample unit, peak {peak_fs[idx]:.6f} FS{warning}" ) self.status_var.set(f"Voltage scale applied: {average:.9g} V / sample unit") self.messagebox.showinfo( "Calibration complete", "\n".join(details) + f"\n\nAverage applied: {average:.9g} V / sample unit" + f"\nFull-scale sine equivalent: {average / math.sqrt(2.0):.9g} Vrms" + f"\nSaved: {cal_path}", ) def poll_audio_queue(self) -> None: latest = None while True: try: if self.capture is None: break latest = self.capture.queue.get_nowait() except queue.Empty: break if latest is not None and self.config is not None and self.capture is not None: block, status = latest self._accumulate_calibration(block) rms_fs, rms_v, peak_fs = compute_levels( block, self.config.channels_zero_based, self.config.input_volts_per_sample_unit, ) for idx, channel in enumerate(self.config.channels_zero_based): row = self.input_metric_rows.get(channel) if row is not None and channel not in self.input_spectrum_frames: peak_v = float(peak_fs[idx] * self.config.input_volts_per_sample_unit) crest = peak_v / float(rms_v[idx]) if rms_v[idx] > 0 else 0.0 crest_db = 20.0 * math.log10(crest) if crest > 0 else float("-inf") row["rms"].set(f"{rms_v[idx]:.3f} V") row["peak"].set(f"{peak_v:.3f} V") row["crest"].set(f"{crest:.2f}x") row["crest_db"].set(f"{crest_db:.2f} dB") row["digital_peak"].set(f"{peak_fs[idx]:.5f} FS") self._process_input_spectrum_block(block) self._process_power_block(block) self.draw_waveform(block) duration_ms = 1000.0 * block.shape[0] / self.capture.actual_samplerate self.scope_status_var.set( f"Newest block: {block.shape[0]} samples, {duration_ms:.1f} ms, " f"vertical scale +/-{self._scope_y_limit_v():g} V" ) if status: self.status_var.set(f"Running at {self.capture.actual_samplerate:g} Hz | {status}") self.root.after(50, self.poll_audio_queue) def close(self) -> None: self.stop() self.root.destroy() def run(self) -> None: self.root.mainloop() def run_gui(args: argparse.Namespace, sd) -> None: app = LevelMonitorGUI(sd, args) app.run() def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Monitor live soundcard input RMS/peak levels for DSP-output voltage." ) parser.add_argument( "--device", type=int, help="sounddevice input device index. If omitted, an interactive prompt is shown.", ) parser.add_argument( "--channels", type=str, help="One-based input channel list, e.g. '1' or '1,2'. If omitted, prompted.", ) parser.add_argument( "--samplerate", type=float, default=DEFAULT_SAMPLERATE, help="Sample rate in Hz. Omit to use the device default.", ) parser.add_argument( "--blocksize", type=int, default=DEFAULT_BLOCKSIZE, help=f"Audio block size in frames. Default: {DEFAULT_BLOCKSIZE}.", ) parser.add_argument( "--input-volts-per-sample-unit", "--input-calibration", dest="input_volts_per_sample_unit", type=float, default=None, help=( "Path-specific voltage scale in volts per captured sounddevice sample unit. " "--input-calibration is accepted as a legacy alias. " "Default: newest saved calibration, otherwise " f"{DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT:g}." ), ) parser.add_argument( "--calibrate-known-rms-v", type=float, help=( "Run input calibration using a known true-RMS voltage at the selected input, " "then apply the measured volts-per-sample-unit scale and start monitoring. " "Use a steady REW sine tone, for example 1 kHz." ), ) parser.add_argument( "--calibrate-only", action="store_true", help="With --calibrate-known-rms-v, print the measured voltage scale and exit.", ) parser.add_argument( "--no-calibration-ready-prompt", action="store_true", help="Start calibration measurement immediately instead of waiting for Enter.", ) parser.add_argument( "--calibration-duration", type=float, default=DEFAULT_CALIBRATION_DURATION_S, help=f"Calibration measurement duration in seconds. Default: {DEFAULT_CALIBRATION_DURATION_S:g}.", ) parser.add_argument( "--update-interval", type=float, default=DEFAULT_UPDATE_INTERVAL_S, help=f"Console update interval in seconds. Default: {DEFAULT_UPDATE_INTERVAL_S:g}.", ) parser.add_argument( "--way", type=PowerConsoleRunner.parse_way, action="append", help="Run power estimator for way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:impedance.txt:34.", ) parser.add_argument("--lf-impedance", help="REW impedance export for LF power estimation.") parser.add_argument( "--lf-channel", type=parse_channel, default=0, help="One-based LF input channel for power estimation. Default: 1.", ) parser.add_argument( "--lf-amp-gain-db", type=float, default=DEFAULT_AMP_GAIN_DB, help=f"LF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.", ) parser.add_argument("--hf-impedance", help="REW impedance export for HF power estimation.") parser.add_argument( "--hf-channel", type=parse_channel, default=1, help="One-based HF input channel for power estimation. Default: 2.", ) parser.add_argument( "--hf-amp-gain-db", type=float, default=DEFAULT_AMP_GAIN_DB, help=f"HF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.", ) parser.add_argument( "--power-fft-size", "--fft-size", dest="power_fft_size", type=int, default=DEFAULT_POWER_FFT_SIZE, help=f"Power estimator FFT size. Default: {DEFAULT_POWER_FFT_SIZE}.", ) parser.add_argument( "--power-overlap", "--overlap", dest="power_overlap", type=float, default=DEFAULT_POWER_OVERLAP, help=f"Power estimator overlap fraction. Default: {DEFAULT_POWER_OVERLAP:g}.", ) parser.add_argument( "--power-smoothing-alpha", "--smoothing-alpha", dest="power_smoothing_alpha", type=float, default=DEFAULT_POWER_SMOOTHING_ALPHA, help=f"Power estimator exponential smoothing alpha. Default: {DEFAULT_POWER_SMOOTHING_ALPHA:g}.", ) parser.add_argument( "--power-update-interval", type=float, default=DEFAULT_POWER_UPDATE_INTERVAL_S, help=f"Power estimator console update interval. Default: {DEFAULT_POWER_UPDATE_INTERVAL_S:g}.", ) parser.add_argument( "--winisd-max-power", default=str(WINISD_MAX_POWER_FILE), help=f"WinISD frequency-dependent max-power CSV for LF headroom. Default: {WINISD_MAX_POWER_FILE}.", ) parser.add_argument( "--disable-winisd-headroom", action="store_true", help="Disable WinISD max-power headroom monitoring.", ) parser.add_argument( "--headroom-smoothing-alpha", type=float, default=HEADROOM_SMOOTHING_ALPHA, help=f"WinISD headroom smoothing alpha. Default: {HEADROOM_SMOOTHING_ALPHA:g}.", ) parser.add_argument( "--self-test-winisd", action="store_true", help="Run synthetic WinISD headroom tests and exit.", ) parser.add_argument( "--list-devices", action="store_true", help="List input devices and exit.", ) parser.add_argument( "--gui", action="store_true", help="Open the GUI with input-side voltage/PSD metrics and output-side power estimates.", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_arg_parser() args = parser.parse_args(argv) if args.input_volts_per_sample_unit is None: args.input_volts_per_sample_unit = CalibrationStore().latest_scale() if args.blocksize <= 0: parser.error("--blocksize must be positive") if args.input_volts_per_sample_unit <= 0: parser.error("--input-volts-per-sample-unit must be positive") if args.calibrate_known_rms_v is not None and args.calibrate_known_rms_v <= 0: parser.error("--calibrate-known-rms-v must be positive") if args.calibration_duration <= 0: parser.error("--calibration-duration must be positive") if args.update_interval <= 0: parser.error("--update-interval must be positive") if args.power_update_interval <= 0: parser.error("--power-update-interval must be positive") if args.power_fft_size <= 0: parser.error("--power-fft-size must be positive") if args.power_fft_size % 2: parser.error("--power-fft-size must be even") if not 0.0 <= args.power_overlap < 1.0: parser.error("--power-overlap must be >= 0 and < 1") if not 0.0 <= args.power_smoothing_alpha < 1.0: parser.error("--power-smoothing-alpha must be >= 0 and < 1") if not 0.0 <= args.headroom_smoothing_alpha < 1.0: parser.error("--headroom-smoothing-alpha must be >= 0 and < 1") if args.samplerate is not None and args.samplerate <= 0: parser.error("--samplerate must be positive") if args.self_test_winisd: run_self_test() for path in (Path(args.winisd_max_power), WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE): if path.exists(): WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W) if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists(): print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message) print("WinISD self-test OK") return 0 sd = load_dependencies() if args.list_devices: SoundDeviceManager(sd).print_input_devices() return 0 if args.gui: run_gui(args, sd) return 0 if args.way or args.lf_impedance or args.hf_impedance: return PowerConsoleRunner(sd, args).run() config = resolve_config(args, sd) if args.calibrate_known_rms_v is not None: learned_calibration, measured_rms_fs, peak_fs = run_input_calibration( config, sd, args.calibrate_known_rms_v, args.calibration_duration, wait_for_ready=not args.no_calibration_ready_prompt, ) cal_path = CalibrationStore().save( volts_per_sample_unit=learned_calibration, known_rms_v=args.calibrate_known_rms_v, measured_rms_sample_by_channel={ str(channel + 1): float(measured_rms_fs[idx]) for idx, channel in enumerate(config.channels_zero_based) }, peak_sample_by_channel={ str(channel + 1): float(peak_fs[idx]) for idx, channel in enumerate(config.channels_zero_based) }, device_index=config.device_index, channels_zero_based=config.channels_zero_based, samplerate=config.samplerate, source="cli", ) print(f"Saved calibration: {cal_path}") if args.calibrate_only: return 0 config = InputConfig( device_index=config.device_index, channels_zero_based=config.channels_zero_based, samplerate=config.samplerate, blocksize=config.blocksize, input_volts_per_sample_unit=learned_calibration, update_interval_s=config.update_interval_s, ) try: run_capture(config, sd) except KeyboardInterrupt: print("\nStopped.") return 0 return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))