2456 lines
101 KiB
Python
2456 lines
101 KiB
Python
"""Live soundcard input monitor for DSP-output voltage measurements.
|
|
|
|
This first milestone intentionally does not do FFT, impedance loading, or
|
|
power estimation. It only verifies the audio-interface input path and reports
|
|
per-channel RMS/peak levels.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import queue
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from lspower.audio import (
|
|
QueuedInputStream,
|
|
SoundDeviceManager,
|
|
format_channel_list,
|
|
parse_channel,
|
|
parse_channel_list,
|
|
)
|
|
from lspower.impedance import ImpedanceCurve
|
|
from lspower.power import PowerEstimator, PowerFrame, VoltageSpectrumEstimator, VoltageSpectrumFrame, WayConfig
|
|
from lspower.winisd import WinISDLimitCurve, check_excursion_scaling, headroom_status, run_self_test
|
|
|
|
np = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User defaults
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# None asks sounddevice/PortAudio to use the device default sample rate.
|
|
DEFAULT_SAMPLERATE: float | None = None
|
|
|
|
# Audio callback block size in frames. This is only for input-level monitoring.
|
|
DEFAULT_BLOCKSIZE = 2048
|
|
DEFAULT_POWER_FFT_SIZE = 8192
|
|
DEFAULT_POWER_OVERLAP = 0.75
|
|
DEFAULT_POWER_SMOOTHING_ALPHA = 0.95
|
|
DEFAULT_POWER_UPDATE_INTERVAL_S = 0.25
|
|
DEFAULT_AMP_GAIN_DB = 32.0
|
|
DEFAULT_POWER_WARNING_W = 400.0
|
|
DEFAULT_POWER_CRITICAL_W = 550.0
|
|
WINISD_MAX_POWER_FILE = Path("winisd/max_power.csv")
|
|
WINISD_EXCURSION_10W_FILE = Path("winisd/excursion_10w.csv")
|
|
WINISD_EXCURSION_100W_FILE = Path("winisd/excursion_100w.csv")
|
|
ENABLE_WINISD_HEADROOM = True
|
|
HEADROOM_EPS_W = 1e-12
|
|
HEADROOM_SMOOTHING_ALPHA = 0.9
|
|
|
|
# Convert sounddevice's normalized floating-point samples to volts.
|
|
#
|
|
# The calibration factor is path-specific. It includes the analog input gain,
|
|
# driver/API path, Windows recording level, and any mixer/routing gain before
|
|
# sounddevice returns samples. It should not be interpreted as the MOTU's
|
|
# hardware full-scale voltage unless all of those stages are known to be unity.
|
|
#
|
|
# Current measured reference:
|
|
# Device path: UltraLite mk5 Line In 3-4 through Windows MME, device 3, ch 1
|
|
# Reference signal: 1 kHz sine, 2.32 Vrms at the input
|
|
# Captured level: 0.021384357 sample units RMS
|
|
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V = 2.32
|
|
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE = 0.021384357
|
|
DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT = (
|
|
DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V
|
|
/ DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_SAMPLE
|
|
)
|
|
DEFAULT_INPUT_FULL_SCALE_SINE_RMS_V = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT / math.sqrt(2.0)
|
|
|
|
# How often the live console line is refreshed.
|
|
DEFAULT_UPDATE_INTERVAL_S = 0.1
|
|
|
|
# Calibration capture length. Use a steady REW sine generator tone, e.g. 1 kHz,
|
|
# and enter the calibrated true-RMS voltage measured at the soundcard input.
|
|
DEFAULT_CALIBRATION_DURATION_S = 3.0
|
|
MIN_CALIBRATION_RMS_FS = 1e-4
|
|
|
|
# Warn visually when any selected input approaches digital full scale.
|
|
CLIPPING_WARNING_THRESHOLD_FS = 0.98
|
|
OSCILLOSCOPE_TRACE_COLORS = ("#1f77b4", "#d62728", "#2ca02c", "#9467bd")
|
|
CALIBRATION_DIR = Path("calibrations")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class InputConfig:
|
|
device_index: int
|
|
channels_zero_based: list[int]
|
|
samplerate: float | None
|
|
blocksize: int
|
|
input_volts_per_sample_unit: float
|
|
update_interval_s: float
|
|
|
|
@property
|
|
def stream_channel_count(self) -> int:
|
|
"""Open enough device channels to include the highest selected input."""
|
|
return max(self.channels_zero_based) + 1
|
|
|
|
|
|
class CalibrationStore:
|
|
def __init__(self, directory: Path = CALIBRATION_DIR):
|
|
self.directory = directory
|
|
|
|
def latest(self) -> dict | None:
|
|
if not self.directory.exists():
|
|
return None
|
|
files = sorted(self.directory.glob("calibration_*.json"))
|
|
for path in reversed(files):
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
if "volts_per_sample_unit" in data:
|
|
data["_path"] = str(path)
|
|
return data
|
|
return None
|
|
|
|
def latest_scale(self, fallback: float = DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT) -> float:
|
|
data = self.latest()
|
|
if data is None:
|
|
return fallback
|
|
try:
|
|
scale = float(data["volts_per_sample_unit"])
|
|
except (TypeError, ValueError):
|
|
return fallback
|
|
return scale if scale > 0 else fallback
|
|
|
|
def save(
|
|
self,
|
|
*,
|
|
volts_per_sample_unit: float,
|
|
known_rms_v: float,
|
|
measured_rms_sample_by_channel: dict[str, float],
|
|
peak_sample_by_channel: dict[str, float],
|
|
device_index: int,
|
|
channels_zero_based: list[int],
|
|
samplerate: float | None,
|
|
source: str,
|
|
) -> Path:
|
|
self.directory.mkdir(parents=True, exist_ok=True)
|
|
now = datetime.now()
|
|
timestamp = now.strftime("%Y%m%d_%H%M%S")
|
|
path = self.directory / f"calibration_{timestamp}.json"
|
|
data = {
|
|
"created_at": now.isoformat(timespec="seconds"),
|
|
"source": source,
|
|
"volts_per_sample_unit": volts_per_sample_unit,
|
|
"equivalent_full_scale_sine_rms_v": volts_per_sample_unit / math.sqrt(2.0),
|
|
"known_rms_v": known_rms_v,
|
|
"measured_rms_sample_by_channel": measured_rms_sample_by_channel,
|
|
"peak_sample_by_channel": peak_sample_by_channel,
|
|
"device_index": device_index,
|
|
"channels_one_based": [channel + 1 for channel in channels_zero_based],
|
|
"samplerate": samplerate,
|
|
}
|
|
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
|
|
return path
|
|
|
|
|
|
def load_dependencies():
|
|
global np
|
|
|
|
try:
|
|
import sounddevice as sd # type: ignore
|
|
except ImportError as exc:
|
|
raise SystemExit(
|
|
"The 'sounddevice' package is required.\n"
|
|
"Install it with: python -m pip install sounddevice"
|
|
) from exc
|
|
|
|
try:
|
|
import numpy as numpy_module # type: ignore
|
|
except ImportError as exc:
|
|
raise SystemExit(
|
|
"The 'numpy' package is required.\n"
|
|
"Install it with: python -m pip install numpy"
|
|
) from exc
|
|
|
|
np = numpy_module
|
|
return sd
|
|
|
|
|
|
def resolve_config(args: argparse.Namespace, sd) -> InputConfig:
|
|
device_manager = SoundDeviceManager(sd)
|
|
input_devices = device_manager.print_input_devices()
|
|
|
|
if args.device is None:
|
|
device_index = device_manager.prompt_input_device_index(input_devices)
|
|
else:
|
|
device_index = args.device
|
|
|
|
device = device_manager.device_info(device_index)
|
|
max_input_channels = int(device["max_input_channels"])
|
|
|
|
if args.channels is None:
|
|
channels_zero_based = device_manager.prompt_channels(max_input_channels)
|
|
else:
|
|
try:
|
|
channels_zero_based = parse_channel_list(args.channels)
|
|
device_manager.validate_channels(channels_zero_based, max_input_channels)
|
|
except ValueError as exc:
|
|
raise SystemExit(f"Invalid channel selection: {exc}") from exc
|
|
|
|
return InputConfig(
|
|
device_index=device_index,
|
|
channels_zero_based=channels_zero_based,
|
|
samplerate=args.samplerate,
|
|
blocksize=args.blocksize,
|
|
input_volts_per_sample_unit=args.input_volts_per_sample_unit,
|
|
update_interval_s=args.update_interval,
|
|
)
|
|
|
|
|
|
def compute_levels(
|
|
block,
|
|
selected_channels_zero_based: list[int],
|
|
input_volts_per_sample_unit: float,
|
|
) -> tuple:
|
|
if np is None:
|
|
raise RuntimeError("numpy has not been loaded")
|
|
selected = block[:, selected_channels_zero_based].astype(np.float64, copy=False)
|
|
volts = selected * input_volts_per_sample_unit
|
|
rms_fs = np.sqrt(np.mean(selected * selected, axis=0))
|
|
rms_v = np.sqrt(np.mean(volts * volts, axis=0))
|
|
peak_fs = np.max(np.abs(selected), axis=0)
|
|
return rms_fs, rms_v, peak_fs
|
|
|
|
|
|
def compute_volts_per_sample_unit(known_rms_v: float, measured_rms_sample) -> tuple:
|
|
if np is None:
|
|
raise RuntimeError("numpy has not been loaded")
|
|
measured = np.asarray(measured_rms_sample, dtype=np.float64)
|
|
if np.any(measured < MIN_CALIBRATION_RMS_FS):
|
|
raise ValueError(
|
|
"measured RMS is too close to zero; start REW's generator and check the selected input channel"
|
|
)
|
|
per_channel = known_rms_v / measured
|
|
return per_channel, float(np.mean(per_channel))
|
|
|
|
|
|
def format_levels(
|
|
channels_zero_based: list[int],
|
|
rms_fs,
|
|
rms_v,
|
|
peak_fs,
|
|
) -> str:
|
|
parts = []
|
|
for idx, channel in enumerate(channels_zero_based):
|
|
warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else ""
|
|
parts.append(
|
|
f"ch {channel + 1}: "
|
|
f"rms={rms_fs[idx]:.6f} sample / {rms_v[idx]:.6f} V, "
|
|
f"peak={peak_fs[idx]:.6f} FS{warning}"
|
|
)
|
|
return " | ".join(parts)
|
|
|
|
|
|
def smooth_headroom_db(previous: float | None, raw: float, alpha: float) -> float:
|
|
if previous is None or not math.isfinite(previous) or not math.isfinite(raw) or alpha <= 0.0:
|
|
return raw
|
|
return alpha * previous + (1.0 - alpha) * raw
|
|
|
|
|
|
def format_headroom(headroom: dict | None) -> str:
|
|
if not headroom:
|
|
return "WinISD headroom disabled"
|
|
return (
|
|
f"WinISD headroom={headroom['worst_headroom_db']:+6.2f} dB "
|
|
f"@ {headroom['critical_frequency_hz']:7.1f} Hz | {headroom['status']}"
|
|
)
|
|
|
|
|
|
class PowerConsoleRunner:
|
|
"""Live console runner for estimated amplifier output power."""
|
|
|
|
def __init__(self, sd, args: argparse.Namespace):
|
|
self.sd = sd
|
|
self.args = args
|
|
self.device_manager = SoundDeviceManager(sd)
|
|
self.ways = self._build_way_configs()
|
|
self.channels = sorted({way.channel_zero_based for way in self.ways})
|
|
self.device_index = self._resolve_device_index()
|
|
self._validate_audio_config()
|
|
self.impedances = {
|
|
way.name: ImpedanceCurve.from_file(way.impedance_path)
|
|
for way in self.ways
|
|
}
|
|
self.capture = QueuedInputStream(
|
|
sd=sd,
|
|
device_index=self.device_index,
|
|
channels_zero_based=self.channels,
|
|
samplerate=args.samplerate,
|
|
blocksize=args.blocksize,
|
|
)
|
|
self.estimators: dict[str, PowerEstimator] = {}
|
|
self.buffers: dict[str, object] = {}
|
|
self.headroom_curves: dict[str, WinISDLimitCurve] = {}
|
|
self.headroom_smoothed_db: dict[str, float] = {}
|
|
self.hop_size = 0
|
|
|
|
@staticmethod
|
|
def parse_way(text: str) -> WayConfig:
|
|
parts = text.split(":", 2)
|
|
if len(parts) != 3:
|
|
raise argparse.ArgumentTypeError(
|
|
"way must be NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:woofer.csv:34"
|
|
)
|
|
name = parts[0].strip()
|
|
if not name:
|
|
raise argparse.ArgumentTypeError("way name must not be empty")
|
|
try:
|
|
channel = parse_channel(parts[1].strip())
|
|
except ValueError as exc:
|
|
raise argparse.ArgumentTypeError(str(exc)) from exc
|
|
|
|
path_text = parts[2].strip()
|
|
gain_db = DEFAULT_AMP_GAIN_DB
|
|
if ":" in path_text:
|
|
maybe_path, maybe_gain = path_text.rsplit(":", 1)
|
|
try:
|
|
gain_db = float(maybe_gain)
|
|
path_text = maybe_path
|
|
except ValueError:
|
|
pass
|
|
return WayConfig(
|
|
name=name,
|
|
channel_zero_based=channel,
|
|
impedance_path=Path(path_text),
|
|
amp_gain_db=gain_db,
|
|
)
|
|
|
|
def _build_way_configs(self) -> list[WayConfig]:
|
|
ways: list[WayConfig] = []
|
|
if self.args.way:
|
|
ways.extend(self.args.way)
|
|
if self.args.lf_impedance:
|
|
ways.append(
|
|
WayConfig(
|
|
name="LF",
|
|
channel_zero_based=self.args.lf_channel,
|
|
impedance_path=Path(self.args.lf_impedance),
|
|
amp_gain_db=self.args.lf_amp_gain_db,
|
|
)
|
|
)
|
|
if self.args.hf_impedance:
|
|
ways.append(
|
|
WayConfig(
|
|
name="HF",
|
|
channel_zero_based=self.args.hf_channel,
|
|
impedance_path=Path(self.args.hf_impedance),
|
|
amp_gain_db=self.args.hf_amp_gain_db,
|
|
)
|
|
)
|
|
if not ways:
|
|
raise SystemExit(
|
|
"Configure at least one way with --way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB] "
|
|
"or --lf-impedance/--hf-impedance."
|
|
)
|
|
|
|
seen: dict[int, str] = {}
|
|
for way in ways:
|
|
if way.channel_zero_based in seen:
|
|
raise SystemExit(
|
|
f"Input channel {way.channel_zero_based + 1} is used by both "
|
|
f"{seen[way.channel_zero_based]} and {way.name}."
|
|
)
|
|
seen[way.channel_zero_based] = way.name
|
|
return ways
|
|
|
|
def _resolve_device_index(self) -> int:
|
|
entries = self.device_manager.print_input_devices()
|
|
if self.args.device is not None:
|
|
return self.args.device
|
|
return self.device_manager.prompt_input_device_index(entries)
|
|
|
|
def _validate_audio_config(self) -> None:
|
|
device = self.device_manager.device_info(self.device_index)
|
|
max_input_channels = int(device["max_input_channels"])
|
|
self.device_manager.validate_channels(self.channels, max_input_channels)
|
|
|
|
def _create_estimators(self) -> None:
|
|
actual_samplerate = self.capture.actual_samplerate
|
|
self.hop_size = int(round(self.args.power_fft_size * (1.0 - self.args.power_overlap)))
|
|
if self.hop_size <= 0:
|
|
raise SystemExit("--power-overlap is too high; resulting hop size is zero")
|
|
|
|
self.estimators = {
|
|
way.name: PowerEstimator(
|
|
way=way,
|
|
impedance=self.impedances[way.name],
|
|
samplerate=actual_samplerate,
|
|
fft_size=self.args.power_fft_size,
|
|
input_volts_per_sample_unit=self.args.input_volts_per_sample_unit,
|
|
smoothing_alpha=self.args.power_smoothing_alpha,
|
|
)
|
|
for way in self.ways
|
|
}
|
|
self.buffers = {
|
|
way.name: np.zeros(0, dtype=np.float64)
|
|
for way in self.ways
|
|
}
|
|
self._create_headroom_curves()
|
|
|
|
def _create_headroom_curves(self) -> None:
|
|
self.headroom_curves = {}
|
|
self.headroom_smoothed_db = {}
|
|
if self.args.disable_winisd_headroom:
|
|
return
|
|
|
|
path = Path(self.args.winisd_max_power)
|
|
lf_ways = [way for way in self.ways if way.name.upper() == "LF"]
|
|
if not lf_ways:
|
|
return
|
|
try:
|
|
curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
|
|
except Exception as exc:
|
|
print(f"WinISD headroom disabled: {exc}", file=sys.stderr)
|
|
return
|
|
|
|
for way in lf_ways:
|
|
self.headroom_curves[way.name] = curve
|
|
|
|
def format_power_line(self, name: str, frame: PowerFrame) -> str:
|
|
headroom = self._compute_headroom(name, frame)
|
|
line = (
|
|
f"{name}: "
|
|
f"Vdsp={frame.rms_input_v:9.4f} Vrms, "
|
|
f"Vamp={frame.rms_amp_v:9.2f} Vrms, "
|
|
f"P={frame.total_p_w:10.2f} W, "
|
|
f"Q={frame.total_q_var:10.2f} var, "
|
|
f"Sapp={frame.total_s_va:10.2f} VA"
|
|
)
|
|
if headroom:
|
|
line += ", " + format_headroom(headroom)
|
|
return line
|
|
|
|
def _compute_headroom(self, name: str, frame: PowerFrame) -> dict | None:
|
|
curve = self.headroom_curves.get(name)
|
|
if curve is None:
|
|
return None
|
|
headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz)
|
|
raw = float(headroom["worst_headroom_db"])
|
|
smoothed = smooth_headroom_db(
|
|
self.headroom_smoothed_db.get(name),
|
|
raw,
|
|
self.args.headroom_smoothing_alpha,
|
|
)
|
|
self.headroom_smoothed_db[name] = smoothed
|
|
headroom["worst_headroom_db"] = smoothed
|
|
headroom["status"] = headroom_status(smoothed)
|
|
headroom["over_limit_bool"] = bool(math.isfinite(smoothed) and smoothed <= 0.0)
|
|
return headroom
|
|
|
|
def run(self) -> int:
|
|
try:
|
|
self.capture.start()
|
|
except Exception as exc:
|
|
raise SystemExit(f"Could not open input stream: {exc}") from exc
|
|
|
|
try:
|
|
self._create_estimators()
|
|
actual_samplerate = self.capture.actual_samplerate
|
|
print()
|
|
print("Starting DSP-output power estimator. Press Ctrl+C to stop.")
|
|
print(f"Device index: {self.device_index}")
|
|
print(f"Actual samplerate: {actual_samplerate:g} Hz")
|
|
print(
|
|
f"FFT size: {self.args.power_fft_size}, hop size: {self.hop_size}, "
|
|
f"df: {actual_samplerate / self.args.power_fft_size:g} Hz"
|
|
)
|
|
print(f"Input voltage scale: {self.args.input_volts_per_sample_unit:g} V / sample unit")
|
|
print("Assumption: ideal linear amplifier voltage gain per way.")
|
|
if self.headroom_curves:
|
|
print(f"WinISD headroom: {Path(self.args.winisd_max_power)}")
|
|
if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists():
|
|
try:
|
|
print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message)
|
|
except Exception as exc:
|
|
print(f"Excursion check skipped: {exc}")
|
|
else:
|
|
print("WinISD headroom: disabled")
|
|
for way in self.ways:
|
|
curve = self.impedances[way.name]
|
|
print(
|
|
f" {way.name}: input ch {way.channel_zero_based + 1}, "
|
|
f"amp gain {way.amp_gain_db:g} dB ({way.amp_gain_linear:g}x), "
|
|
f"impedance {curve.source}, range {curve.frequency_hz[0]:g}-{curve.frequency_hz[-1]:g} Hz"
|
|
)
|
|
print()
|
|
|
|
latest_frames: dict[str, PowerFrame] = {}
|
|
last_print = 0.0
|
|
while True:
|
|
try:
|
|
block, status = self.capture.queue.get(timeout=0.5)
|
|
except queue.Empty:
|
|
continue
|
|
if status:
|
|
print(f"\nStream status: {status}", file=sys.stderr)
|
|
|
|
for way in self.ways:
|
|
self.buffers[way.name] = np.concatenate(
|
|
(self.buffers[way.name], block[:, way.channel_zero_based].astype(np.float64))
|
|
)
|
|
while len(self.buffers[way.name]) >= self.args.power_fft_size:
|
|
latest_frames[way.name] = self.estimators[way.name].estimate(
|
|
self.buffers[way.name][: self.args.power_fft_size]
|
|
)
|
|
self.buffers[way.name] = self.buffers[way.name][self.hop_size :]
|
|
|
|
now = time.monotonic()
|
|
if latest_frames and now - last_print >= self.args.power_update_interval:
|
|
lines = [
|
|
self.format_power_line(name, latest_frames[name])
|
|
for name in sorted(latest_frames)
|
|
]
|
|
print("\r" + " | ".join(lines), end="", flush=True)
|
|
last_print = now
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|
|
return 0
|
|
finally:
|
|
self.capture.stop()
|
|
|
|
|
|
def run_input_calibration(
|
|
config: InputConfig,
|
|
sd,
|
|
known_rms_v: float,
|
|
duration_s: float,
|
|
wait_for_ready: bool = True,
|
|
) -> tuple[float, object, object]:
|
|
if np is None:
|
|
raise RuntimeError("numpy has not been loaded")
|
|
|
|
capture = QueuedInputStream(
|
|
sd=sd,
|
|
device_index=config.device_index,
|
|
channels_zero_based=config.channels_zero_based,
|
|
samplerate=config.samplerate,
|
|
blocksize=config.blocksize,
|
|
max_queue_blocks=32,
|
|
)
|
|
try:
|
|
capture.start()
|
|
except Exception as exc:
|
|
raise SystemExit(f"Could not open input stream: {exc}") from exc
|
|
|
|
print()
|
|
print("Input calibration")
|
|
print("Use REW's generator with a steady sine tone, e.g. 1 kHz.")
|
|
print("Keep the tone below clipping, then enter its calibrated true-RMS voltage.")
|
|
print(f"Device index: {config.device_index}")
|
|
print(f"Selected channels: {format_channel_list(config.channels_zero_based)}")
|
|
print(f"Known RMS voltage: {known_rms_v:g} V")
|
|
if wait_for_ready:
|
|
input("Start the REW sine generator now, then press Enter to measure...")
|
|
print(f"Measuring for {duration_s:g} s...")
|
|
|
|
sums = np.zeros(len(config.channels_zero_based), dtype=np.float64)
|
|
frame_count = 0
|
|
peak_fs = np.zeros(len(config.channels_zero_based), dtype=np.float64)
|
|
deadline = time.monotonic() + duration_s
|
|
|
|
try:
|
|
while time.monotonic() < deadline:
|
|
try:
|
|
block, status = capture.queue.get(timeout=0.5)
|
|
except queue.Empty:
|
|
continue
|
|
if status:
|
|
print(f"Stream status: {status}")
|
|
|
|
selected = block[:, config.channels_zero_based].astype(np.float64, copy=False)
|
|
sums += np.sum(selected * selected, axis=0)
|
|
frame_count += selected.shape[0]
|
|
peak_fs = np.maximum(peak_fs, np.max(np.abs(selected), axis=0))
|
|
finally:
|
|
capture.stop()
|
|
|
|
if frame_count == 0:
|
|
raise SystemExit("No input samples were captured during calibration.")
|
|
|
|
measured_rms_fs = np.sqrt(sums / frame_count)
|
|
try:
|
|
per_channel, average = compute_volts_per_sample_unit(known_rms_v, measured_rms_fs)
|
|
except ValueError as exc:
|
|
measured_text = ", ".join(
|
|
f"ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS"
|
|
for idx, channel in enumerate(config.channels_zero_based)
|
|
)
|
|
raise SystemExit(f"Calibration failed: {exc}\nMeasured: {measured_text}") from exc
|
|
|
|
print()
|
|
for idx, channel in enumerate(config.channels_zero_based):
|
|
warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else ""
|
|
print(
|
|
f"ch {channel + 1}: measured {measured_rms_fs[idx]:.9f} sample units RMS, "
|
|
f"peak {peak_fs[idx]:.6f} FS{warning}, "
|
|
f"scale {per_channel[idx]:.9g} V / sample unit"
|
|
)
|
|
print(f"Average voltage scale: {average:.9g} V / sample unit")
|
|
print(f"Equivalent full-scale sine RMS: {average / math.sqrt(2.0):.9g} Vrms")
|
|
print(f"Use: --input-volts-per-sample-unit {average:.9g}")
|
|
return average, measured_rms_fs, peak_fs
|
|
|
|
|
|
def run_capture(config: InputConfig, sd) -> None:
|
|
capture = QueuedInputStream(
|
|
sd=sd,
|
|
device_index=config.device_index,
|
|
channels_zero_based=config.channels_zero_based,
|
|
samplerate=config.samplerate,
|
|
blocksize=config.blocksize,
|
|
max_queue_blocks=8,
|
|
)
|
|
try:
|
|
capture.start()
|
|
except Exception as exc:
|
|
raise SystemExit(f"Could not open input stream: {exc}") from exc
|
|
|
|
print()
|
|
print("Starting input monitor. Press Ctrl+C to stop.")
|
|
print(f"Device index: {config.device_index}")
|
|
print(f"Selected channels: {format_channel_list(config.channels_zero_based)}")
|
|
print(f"Voltage scale: {config.input_volts_per_sample_unit:g} V / sample unit")
|
|
print(
|
|
"Equivalent full-scale sine RMS: "
|
|
f"{config.input_volts_per_sample_unit / math.sqrt(2.0):g} Vrms"
|
|
)
|
|
|
|
last_print = 0.0
|
|
try:
|
|
actual_samplerate = capture.actual_samplerate
|
|
print(f"Actual stream samplerate: {actual_samplerate:g} Hz")
|
|
print()
|
|
|
|
while True:
|
|
try:
|
|
block, status = capture.queue.get(timeout=0.5)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
now = time.monotonic()
|
|
if now - last_print < config.update_interval_s:
|
|
continue
|
|
|
|
rms_fs, rms_v, peak_fs = compute_levels(
|
|
block,
|
|
config.channels_zero_based,
|
|
config.input_volts_per_sample_unit,
|
|
)
|
|
|
|
status_text = f" | status: {status}" if status else ""
|
|
line = format_levels(config.channels_zero_based, rms_fs, rms_v, peak_fs)
|
|
print(f"\r{line}{status_text}", end="", flush=True)
|
|
last_print = now
|
|
finally:
|
|
capture.stop()
|
|
|
|
|
|
class LevelMonitorGUI:
|
|
def __init__(self, sd, args: argparse.Namespace):
|
|
import tkinter as tk
|
|
from tkinter import messagebox, ttk
|
|
|
|
self.sd = sd
|
|
self.args = args
|
|
self.tk = tk
|
|
self.messagebox = messagebox
|
|
self.ttk = ttk
|
|
self.root = tk.Tk()
|
|
self.root.title("Soundcard Input Voltage Monitor")
|
|
self.root.geometry("1180x820")
|
|
self.root.minsize(980, 680)
|
|
|
|
self.device_manager = SoundDeviceManager(sd)
|
|
self.capture: QueuedInputStream | None = None
|
|
self.config: InputConfig | None = None
|
|
self.calibration_state: dict | None = None
|
|
self.input_spectrum_estimators: dict[int, VoltageSpectrumEstimator] = {}
|
|
self.input_spectrum_buffers: dict[int, object] = {}
|
|
self.input_spectrum_frames: dict[int, VoltageSpectrumFrame] = {}
|
|
self.power_estimators: dict[str, PowerEstimator] = {}
|
|
self.power_buffers: dict[str, object] = {}
|
|
self.power_rows: dict[str, dict] = {}
|
|
self.power_history: dict[str, list[tuple[float, float, float]]] = {}
|
|
self.power_max_hold_w: dict[str, float] = {}
|
|
self.latest_power_frames: dict[str, PowerFrame] = {}
|
|
self.latest_headroom_frames: dict[str, dict] = {}
|
|
self.headroom_curves: dict[str, WinISDLimitCurve] = {}
|
|
self.headroom_smoothed_db: dict[str, float] = {}
|
|
self.power_history_start_s: float | None = None
|
|
self.power_hop_size = 0
|
|
self.level_rows: list[dict] = []
|
|
self.input_metric_rows: dict[int, dict] = {}
|
|
self.device_entries = self.device_manager.input_devices()
|
|
|
|
if not self.device_entries:
|
|
raise SystemExit("No input-capable sounddevice devices were found.")
|
|
|
|
self.device_var = tk.StringVar()
|
|
self.channels_var = tk.StringVar(value=args.channels or "1,2")
|
|
self.samplerate_var = tk.StringVar(
|
|
value="" if args.samplerate is None else f"{args.samplerate:g}"
|
|
)
|
|
self.blocksize_var = tk.StringVar(value=str(args.blocksize))
|
|
self.calibration_var = tk.StringVar(value=f"{args.input_volts_per_sample_unit:g}")
|
|
self.known_rms_v_var = tk.StringVar(
|
|
value=(
|
|
f"{args.calibrate_known_rms_v:g}"
|
|
if args.calibrate_known_rms_v
|
|
else f"{DEFAULT_INPUT_CALIBRATION_REFERENCE_RMS_V:g}"
|
|
)
|
|
)
|
|
self.calibration_duration_var = tk.StringVar(value=f"{args.calibration_duration:g}")
|
|
self.waveform_y_limit_var = tk.StringVar(value="")
|
|
self.power_history_window_var = tk.StringVar(value="30")
|
|
self.power_fft_size_var = tk.StringVar(value=str(DEFAULT_POWER_FFT_SIZE))
|
|
self.power_overlap_var = tk.StringVar(value=f"{DEFAULT_POWER_OVERLAP:g}")
|
|
self.power_smoothing_var = tk.StringVar(value=f"{DEFAULT_POWER_SMOOTHING_ALPHA:g}")
|
|
self.winisd_enabled_var = tk.BooleanVar(value=not args.disable_winisd_headroom and ENABLE_WINISD_HEADROOM)
|
|
self.winisd_max_power_var = tk.StringVar(value=str(args.winisd_max_power or WINISD_MAX_POWER_FILE))
|
|
self.headroom_smoothing_var = tk.StringVar(value=f"{args.headroom_smoothing_alpha:g}")
|
|
self.winisd_status_var = tk.StringVar(value="WinISD headroom uses per-bin P/Pmax for LF.")
|
|
self.show_waveform_plot_var = tk.BooleanVar(value=True)
|
|
self.show_psd_plot_var = tk.BooleanVar(value=True)
|
|
self.show_power_history_plot_var = tk.BooleanVar(value=True)
|
|
self.show_power_limit_plot_var = tk.BooleanVar(value=True)
|
|
self.show_headroom_plot_var = tk.BooleanVar(value=True)
|
|
self.lf_enabled_var = tk.BooleanVar(value=True)
|
|
self.lf_channel_var = tk.StringVar(value="1")
|
|
self.lf_impedance_var = tk.StringVar(value="impedance.txt" if Path("impedance.txt").exists() else "")
|
|
self.lf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}")
|
|
self.hf_enabled_var = tk.BooleanVar(value=False)
|
|
self.hf_channel_var = tk.StringVar(value="2")
|
|
self.hf_impedance_var = tk.StringVar(value="")
|
|
self.hf_gain_var = tk.StringVar(value=f"{DEFAULT_AMP_GAIN_DB:g}")
|
|
self.status_var = tk.StringVar(value="Stopped")
|
|
self.power_style = ttk.Style()
|
|
self.power_style.configure("PowerOk.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#22a447")
|
|
self.power_style.configure("PowerWarn.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#e6a400")
|
|
self.power_style.configure("PowerCritical.Horizontal.TProgressbar", troughcolor="#d9d9d9", background="#d62728")
|
|
|
|
self._build_widgets()
|
|
self._select_initial_device(args.device)
|
|
self.root.protocol("WM_DELETE_WINDOW", self.close)
|
|
self.root.after(50, self.poll_audio_queue)
|
|
|
|
def _build_widgets(self) -> None:
|
|
ttk = self.ttk
|
|
|
|
outer = ttk.Frame(self.root, padding=12)
|
|
outer.pack(fill="both", expand=True)
|
|
|
|
config_frame = ttk.LabelFrame(outer, text="Input setup", padding=10)
|
|
config_frame.pack(fill="x")
|
|
config_frame.columnconfigure(1, weight=1)
|
|
config_frame.columnconfigure(5, weight=1)
|
|
|
|
ttk.Label(config_frame, text="Device").grid(row=0, column=0, sticky="w")
|
|
self.device_combo = ttk.Combobox(
|
|
config_frame,
|
|
textvariable=self.device_var,
|
|
values=[entry.label for entry in self.device_entries],
|
|
state="readonly",
|
|
)
|
|
self.device_combo.grid(row=0, column=1, columnspan=5, sticky="ew", padx=(8, 0))
|
|
|
|
ttk.Label(config_frame, text="Channels").grid(row=1, column=0, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.channels_var, width=12).grid(
|
|
row=1, column=1, sticky="w", padx=(8, 16), pady=(8, 0)
|
|
)
|
|
|
|
ttk.Label(config_frame, text="Sample rate").grid(row=1, column=2, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.samplerate_var, width=12).grid(
|
|
row=1, column=3, sticky="w", padx=(8, 16), pady=(8, 0)
|
|
)
|
|
|
|
ttk.Label(config_frame, text="Blocksize").grid(row=1, column=4, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.blocksize_var, width=10).grid(
|
|
row=1, column=5, sticky="w", padx=(8, 0), pady=(8, 0)
|
|
)
|
|
|
|
ttk.Label(config_frame, text="Voltage scale").grid(row=2, column=0, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.calibration_var, width=12).grid(
|
|
row=2, column=1, sticky="w", padx=(8, 4), pady=(8, 0)
|
|
)
|
|
ttk.Label(config_frame, text="V per sample unit").grid(
|
|
row=2, column=2, sticky="w", padx=(0, 16), pady=(8, 0)
|
|
)
|
|
|
|
self.start_button = ttk.Button(config_frame, text="Start", command=self.start)
|
|
self.start_button.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(8, 0))
|
|
self.stop_button = ttk.Button(config_frame, text="Stop", command=self.stop, state="disabled")
|
|
self.stop_button.grid(row=2, column=4, sticky="ew", padx=(0, 16), pady=(8, 0))
|
|
ttk.Label(config_frame, textvariable=self.status_var).grid(
|
|
row=2, column=5, sticky="w", pady=(8, 0)
|
|
)
|
|
|
|
ttk.Label(config_frame, text="Cal signal").grid(row=3, column=0, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.known_rms_v_var, width=12).grid(
|
|
row=3, column=1, sticky="w", padx=(8, 4), pady=(8, 0)
|
|
)
|
|
ttk.Label(config_frame, text="Vrms at input").grid(
|
|
row=3, column=2, sticky="w", padx=(0, 16), pady=(8, 0)
|
|
)
|
|
ttk.Label(config_frame, text="Seconds").grid(row=3, column=3, sticky="w", pady=(8, 0))
|
|
ttk.Entry(config_frame, textvariable=self.calibration_duration_var, width=10).grid(
|
|
row=3, column=4, sticky="w", padx=(8, 16), pady=(8, 0)
|
|
)
|
|
self.calibrate_button = ttk.Button(
|
|
config_frame,
|
|
text="Calibrate",
|
|
command=self.start_calibration,
|
|
state="disabled",
|
|
)
|
|
self.calibrate_button.grid(row=3, column=5, sticky="ew", pady=(8, 0))
|
|
|
|
self.level_frame = ttk.LabelFrame(outer, text="Input-side metrics", padding=10)
|
|
self.level_frame.pack(fill="x", pady=(12, 0))
|
|
self.level_frame.columnconfigure(6, weight=1)
|
|
|
|
input_note = ttk.Label(
|
|
self.level_frame,
|
|
text="Measured at DSP output / amplifier input, using the current voltage scale.",
|
|
)
|
|
input_note.grid(row=0, column=0, columnspan=7, sticky="w", pady=(0, 8))
|
|
|
|
for col, heading in enumerate(("Ch", "RMS voltage", "Peak voltage", "Crest factor", "Crest", "Digital peak")):
|
|
ttk.Label(self.level_frame, text=heading).grid(
|
|
row=1,
|
|
column=col,
|
|
sticky="w" if col == 0 else "e",
|
|
padx=(0, 18 if col < 5 else 0),
|
|
)
|
|
self.input_metric_static_widget_count = len(self.level_frame.winfo_children())
|
|
|
|
self.power_frame = ttk.LabelFrame(outer, text="Estimated amplifier output power", padding=10)
|
|
self.power_frame.pack(fill="x", pady=(12, 0))
|
|
self.power_frame.columnconfigure(3, weight=1)
|
|
|
|
power_note = ttk.Label(
|
|
self.power_frame,
|
|
text="Output side is estimated from DSP voltage, ideal amplifier gain, and the impedance curve.",
|
|
)
|
|
power_note.grid(row=0, column=0, columnspan=10, sticky="w", pady=(0, 8))
|
|
|
|
ttk.Label(self.power_frame, text="FFT").grid(row=1, column=0, sticky="w")
|
|
ttk.Entry(self.power_frame, textvariable=self.power_fft_size_var, width=8).grid(
|
|
row=1, column=1, sticky="w", padx=(8, 16)
|
|
)
|
|
ttk.Label(self.power_frame, text="Overlap").grid(row=1, column=2, sticky="e")
|
|
ttk.Entry(self.power_frame, textvariable=self.power_overlap_var, width=8).grid(
|
|
row=1, column=3, sticky="w", padx=(8, 16)
|
|
)
|
|
ttk.Label(self.power_frame, text="Smooth").grid(row=1, column=4, sticky="e")
|
|
ttk.Entry(self.power_frame, textvariable=self.power_smoothing_var, width=8).grid(
|
|
row=1, column=5, sticky="w", padx=(8, 16)
|
|
)
|
|
ttk.Checkbutton(self.power_frame, text="WinISD", variable=self.winisd_enabled_var).grid(
|
|
row=2, column=0, sticky="w", pady=(8, 0)
|
|
)
|
|
ttk.Entry(self.power_frame, textvariable=self.winisd_max_power_var, width=44).grid(
|
|
row=2, column=1, columnspan=3, sticky="ew", padx=(8, 12), pady=(8, 0)
|
|
)
|
|
ttk.Label(self.power_frame, text="Headroom smooth").grid(row=2, column=4, sticky="e", pady=(8, 0))
|
|
ttk.Entry(self.power_frame, textvariable=self.headroom_smoothing_var, width=8).grid(
|
|
row=2, column=5, sticky="w", padx=(8, 16), pady=(8, 0)
|
|
)
|
|
ttk.Label(self.power_frame, textvariable=self.winisd_status_var).grid(
|
|
row=2, column=6, columnspan=9, sticky="w", pady=(8, 0)
|
|
)
|
|
|
|
headings = (
|
|
"Way",
|
|
"On",
|
|
"Ch",
|
|
"Impedance file",
|
|
"Gain dB",
|
|
"Vdsp",
|
|
"Vamp",
|
|
"P",
|
|
"Q",
|
|
"Sapp",
|
|
"Headroom",
|
|
"Crit Hz",
|
|
"Limit",
|
|
"P meter",
|
|
"",
|
|
"Max P",
|
|
)
|
|
for col, heading in enumerate(headings):
|
|
ttk.Label(self.power_frame, text=heading).grid(
|
|
row=3,
|
|
column=col,
|
|
sticky="w" if col <= 4 else "e",
|
|
pady=(8, 0),
|
|
padx=(0, 12 if col < 9 else 0),
|
|
)
|
|
|
|
self._build_power_config_row("LF", 4, self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var)
|
|
self._build_power_config_row("HF", 5, self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var)
|
|
|
|
plots_frame = ttk.LabelFrame(outer, text="Plot viewer", padding=10)
|
|
plots_frame.pack(fill="both", expand=True, pady=(12, 0))
|
|
plots_frame.columnconfigure(0, weight=1)
|
|
plots_frame.rowconfigure(1, weight=1)
|
|
|
|
plot_selector = ttk.Frame(plots_frame)
|
|
plot_selector.grid(row=0, column=0, sticky="ew", pady=(0, 8))
|
|
plot_options = (
|
|
("Waveform", self.show_waveform_plot_var),
|
|
("Input PSD", self.show_psd_plot_var),
|
|
("Power history", self.show_power_history_plot_var),
|
|
("P vs WinISD", self.show_power_limit_plot_var),
|
|
("Headroom", self.show_headroom_plot_var),
|
|
)
|
|
for col, (label, variable) in enumerate(plot_options):
|
|
ttk.Checkbutton(
|
|
plot_selector,
|
|
text=label,
|
|
variable=variable,
|
|
command=self._layout_plot_tiles,
|
|
).grid(row=0, column=col, sticky="w", padx=(0, 16))
|
|
|
|
self.plot_area = ttk.Frame(plots_frame)
|
|
self.plot_area.grid(row=1, column=0, sticky="nsew")
|
|
self.plot_area.columnconfigure(0, weight=1)
|
|
self.plot_area.columnconfigure(1, weight=1)
|
|
for row in range(3):
|
|
self.plot_area.rowconfigure(row, weight=1)
|
|
|
|
self.scope_frame = ttk.LabelFrame(self.plot_area, text="Input waveform, calibrated voltage", padding=10)
|
|
self.scope_frame.rowconfigure(0, weight=1)
|
|
self.scope_frame.columnconfigure(0, weight=1)
|
|
|
|
self.scope_canvas = self.tk.Canvas(
|
|
self.scope_frame,
|
|
background="#111111",
|
|
highlightthickness=0,
|
|
height=260,
|
|
)
|
|
self.scope_canvas.grid(row=0, column=0, sticky="nsew")
|
|
self.scope_status_var = self.tk.StringVar(value="Waveform shows the newest input block in calibrated volts.")
|
|
scope_controls = ttk.Frame(self.scope_frame)
|
|
scope_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0))
|
|
scope_controls.columnconfigure(3, weight=1)
|
|
ttk.Label(scope_controls, text="Y limit").grid(row=0, column=0, sticky="w")
|
|
ttk.Entry(scope_controls, textvariable=self.waveform_y_limit_var, width=10).grid(
|
|
row=0, column=1, sticky="w", padx=(8, 4)
|
|
)
|
|
ttk.Label(scope_controls, text="V, blank = auto").grid(row=0, column=2, sticky="w")
|
|
ttk.Label(scope_controls, textvariable=self.scope_status_var).grid(
|
|
row=0, column=3, sticky="w", padx=(16, 0)
|
|
)
|
|
self.waveform_y_limit_var.trace_add("write", lambda *_args: self.draw_scope_grid())
|
|
self.scope_canvas.bind("<Configure>", lambda _event: self.draw_scope_grid())
|
|
self.draw_scope_grid()
|
|
|
|
self.spectrum_frame = ttk.LabelFrame(self.plot_area, text="Input PSD, Welch scaled", padding=10)
|
|
self.spectrum_frame.rowconfigure(0, weight=1)
|
|
self.spectrum_frame.columnconfigure(0, weight=1)
|
|
self.spectrum_canvas = self.tk.Canvas(
|
|
self.spectrum_frame,
|
|
background="#111111",
|
|
highlightthickness=0,
|
|
height=260,
|
|
)
|
|
self.spectrum_canvas.grid(row=0, column=0, sticky="nsew")
|
|
self.spectrum_status_var = self.tk.StringVar(value="PSD uses Hann/Welch scaling in V^2/Hz.")
|
|
ttk.Label(self.spectrum_frame, textvariable=self.spectrum_status_var).grid(
|
|
row=1, column=0, sticky="w", pady=(8, 0)
|
|
)
|
|
self.spectrum_canvas.bind("<Configure>", lambda _event: self.draw_spectrum_grid())
|
|
self.draw_spectrum_grid()
|
|
|
|
self.power_history_frame = ttk.LabelFrame(self.plot_area, text="Output power history", padding=10)
|
|
self.power_history_frame.rowconfigure(0, weight=1)
|
|
self.power_history_frame.columnconfigure(0, weight=1)
|
|
self.power_history_canvas = self.tk.Canvas(
|
|
self.power_history_frame,
|
|
background="#111111",
|
|
highlightthickness=0,
|
|
height=220,
|
|
)
|
|
self.power_history_canvas.grid(row=0, column=0, sticky="nsew")
|
|
power_history_controls = ttk.Frame(self.power_history_frame)
|
|
power_history_controls.grid(row=1, column=0, sticky="ew", pady=(8, 0))
|
|
power_history_controls.columnconfigure(3, weight=1)
|
|
ttk.Label(power_history_controls, text="Window").grid(row=0, column=0, sticky="w")
|
|
ttk.Entry(power_history_controls, textvariable=self.power_history_window_var, width=10).grid(
|
|
row=0, column=1, sticky="w", padx=(8, 4)
|
|
)
|
|
ttk.Label(power_history_controls, text="s").grid(row=0, column=2, sticky="w")
|
|
self.power_history_status_var = self.tk.StringVar(value="P and Sapp over the selected moving window.")
|
|
ttk.Label(power_history_controls, textvariable=self.power_history_status_var).grid(
|
|
row=0, column=3, sticky="w", padx=(16, 0)
|
|
)
|
|
self.power_history_window_var.trace_add("write", lambda *_args: self.draw_power_history())
|
|
self.power_history_canvas.bind("<Configure>", lambda _event: self.draw_power_history_grid())
|
|
self.draw_power_history_grid()
|
|
|
|
self.power_limit_frame = ttk.LabelFrame(self.plot_area, text="Power Spectrum vs WinISD Limit", padding=10)
|
|
self.power_limit_frame.rowconfigure(0, weight=1)
|
|
self.power_limit_frame.columnconfigure(0, weight=1)
|
|
self.power_limit_canvas = self.tk.Canvas(
|
|
self.power_limit_frame,
|
|
background="#111111",
|
|
highlightthickness=0,
|
|
height=240,
|
|
)
|
|
self.power_limit_canvas.grid(row=0, column=0, sticky="nsew")
|
|
self.power_limit_status_var = self.tk.StringVar(value="P_bin and WinISD Pmax over frequency.")
|
|
ttk.Label(self.power_limit_frame, textvariable=self.power_limit_status_var).grid(
|
|
row=1, column=0, sticky="w", pady=(8, 0)
|
|
)
|
|
self.power_limit_canvas.bind("<Configure>", lambda _event: self.draw_power_limit_grid())
|
|
self.draw_power_limit_grid()
|
|
|
|
self.headroom_frame = ttk.LabelFrame(self.plot_area, text="WinISD Headroom", padding=10)
|
|
self.headroom_frame.rowconfigure(0, weight=1)
|
|
self.headroom_frame.columnconfigure(0, weight=1)
|
|
self.headroom_canvas = self.tk.Canvas(
|
|
self.headroom_frame,
|
|
background="#111111",
|
|
highlightthickness=0,
|
|
height=240,
|
|
)
|
|
self.headroom_canvas.grid(row=0, column=0, sticky="nsew")
|
|
self.headroom_status_var = self.tk.StringVar(value="H(f,t) = 10 log10(Pmax / P_bin).")
|
|
ttk.Label(self.headroom_frame, textvariable=self.headroom_status_var).grid(
|
|
row=1, column=0, sticky="w", pady=(8, 0)
|
|
)
|
|
self.headroom_canvas.bind("<Configure>", lambda _event: self.draw_headroom_grid())
|
|
self.draw_headroom_grid()
|
|
self._layout_plot_tiles()
|
|
|
|
def _build_power_config_row(
|
|
self,
|
|
name: str,
|
|
row: int,
|
|
enabled_var,
|
|
channel_var,
|
|
impedance_var,
|
|
gain_var,
|
|
) -> None:
|
|
ttk = self.ttk
|
|
ttk.Label(self.power_frame, text=name).grid(row=row, column=0, sticky="w", pady=4)
|
|
ttk.Checkbutton(self.power_frame, variable=enabled_var).grid(row=row, column=1, sticky="w", pady=4)
|
|
ttk.Entry(self.power_frame, textvariable=channel_var, width=5).grid(row=row, column=2, sticky="w", pady=4)
|
|
ttk.Entry(self.power_frame, textvariable=impedance_var, width=44).grid(row=row, column=3, sticky="ew", padx=(8, 12), pady=4)
|
|
ttk.Entry(self.power_frame, textvariable=gain_var, width=8).grid(row=row, column=4, sticky="w", pady=4)
|
|
|
|
vars_by_metric = {
|
|
"vdsp": self.tk.StringVar(value="-"),
|
|
"vamp": self.tk.StringVar(value="-"),
|
|
"p": self.tk.StringVar(value="-"),
|
|
"q": self.tk.StringVar(value="-"),
|
|
"s": self.tk.StringVar(value="-"),
|
|
"headroom": self.tk.StringVar(value="-"),
|
|
"crit_hz": self.tk.StringVar(value="-"),
|
|
"limit": self.tk.StringVar(value="-"),
|
|
"max_p": self.tk.StringVar(value="-"),
|
|
}
|
|
for offset, key in enumerate(("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit"), start=5):
|
|
ttk.Label(self.power_frame, textvariable=vars_by_metric[key], width=12, anchor="e").grid(
|
|
row=row, column=offset, sticky="e", padx=(0, 12 if offset < 9 else 0), pady=4
|
|
)
|
|
meter = ttk.Progressbar(
|
|
self.power_frame,
|
|
orient="horizontal",
|
|
mode="determinate",
|
|
maximum=DEFAULT_POWER_CRITICAL_W,
|
|
style="PowerOk.Horizontal.TProgressbar",
|
|
length=150,
|
|
)
|
|
meter.grid(row=row, column=13, sticky="ew", padx=(8, 8), pady=4)
|
|
light = self.tk.Canvas(self.power_frame, width=18, height=18, highlightthickness=0)
|
|
light.grid(row=row, column=14, sticky="w", padx=(0, 6), pady=4)
|
|
light_id = light.create_oval(3, 3, 15, 15, fill="#22a447", outline="#555555")
|
|
ttk.Label(self.power_frame, textvariable=vars_by_metric["max_p"], width=12, anchor="e").grid(
|
|
row=row, column=15, sticky="e", pady=4
|
|
)
|
|
vars_by_metric["meter"] = meter
|
|
vars_by_metric["light"] = light
|
|
vars_by_metric["light_id"] = light_id
|
|
self.power_rows[name] = vars_by_metric
|
|
|
|
def _layout_plot_tiles(self) -> None:
|
|
plot_specs = (
|
|
(self.scope_frame, self.show_waveform_plot_var, self.draw_scope_grid),
|
|
(self.spectrum_frame, self.show_psd_plot_var, self.draw_spectrum_grid),
|
|
(self.power_history_frame, self.show_power_history_plot_var, self.draw_power_history_grid),
|
|
(self.power_limit_frame, self.show_power_limit_plot_var, self.draw_power_limit_grid),
|
|
(self.headroom_frame, self.show_headroom_plot_var, self.draw_headroom_grid),
|
|
)
|
|
active = [(frame, redraw) for frame, variable, redraw in plot_specs if variable.get()]
|
|
for frame, _variable, _redraw in plot_specs:
|
|
frame.grid_forget()
|
|
|
|
for idx, (frame, redraw) in enumerate(active):
|
|
row = idx // 2
|
|
col = idx % 2
|
|
frame.grid(
|
|
row=row,
|
|
column=col,
|
|
sticky="nsew",
|
|
padx=(0, 6) if col == 0 else (6, 0),
|
|
pady=(0, 12),
|
|
)
|
|
redraw()
|
|
|
|
if self.show_psd_plot_var.get():
|
|
self.draw_spectrum()
|
|
if self.show_power_history_plot_var.get():
|
|
self.draw_power_history()
|
|
if self.show_power_limit_plot_var.get():
|
|
self.draw_power_limit()
|
|
if self.show_headroom_plot_var.get():
|
|
self.draw_headroom()
|
|
|
|
def _select_initial_device(self, requested_index: int | None) -> None:
|
|
selected = 0
|
|
if requested_index is not None:
|
|
for idx, entry in enumerate(self.device_entries):
|
|
if entry.index == requested_index:
|
|
selected = idx
|
|
break
|
|
else:
|
|
for idx, entry in enumerate(self.device_entries):
|
|
if "UltraLite" in entry.label:
|
|
selected = idx
|
|
break
|
|
self.device_combo.current(selected)
|
|
|
|
def _parse_gui_config(self) -> InputConfig:
|
|
combo_index = self.device_combo.current()
|
|
if combo_index < 0:
|
|
raise ValueError("select an input device")
|
|
|
|
entry = self.device_entries[combo_index]
|
|
device_index = entry.index
|
|
device = entry.info
|
|
max_input_channels = int(device["max_input_channels"])
|
|
channels = parse_channel_list(self.channels_var.get())
|
|
self.device_manager.validate_channels(channels, max_input_channels)
|
|
|
|
samplerate_text = self.samplerate_var.get().strip()
|
|
samplerate = None if samplerate_text == "" else float(samplerate_text)
|
|
blocksize = int(self.blocksize_var.get())
|
|
calibration = float(self.calibration_var.get())
|
|
|
|
if samplerate is not None and samplerate <= 0:
|
|
raise ValueError("sample rate must be positive")
|
|
if blocksize <= 0:
|
|
raise ValueError("blocksize must be positive")
|
|
if calibration <= 0:
|
|
raise ValueError("volts per sample unit must be positive")
|
|
|
|
return InputConfig(
|
|
device_index=device_index,
|
|
channels_zero_based=channels,
|
|
samplerate=samplerate,
|
|
blocksize=blocksize,
|
|
input_volts_per_sample_unit=calibration,
|
|
update_interval_s=self.args.update_interval,
|
|
)
|
|
|
|
def _parse_power_settings(self, config: InputConfig, actual_samplerate: float) -> None:
|
|
if np is None:
|
|
raise RuntimeError("numpy has not been loaded")
|
|
|
|
fft_size = int(self.power_fft_size_var.get())
|
|
overlap = float(self.power_overlap_var.get())
|
|
smoothing = float(self.power_smoothing_var.get())
|
|
headroom_smoothing = float(self.headroom_smoothing_var.get())
|
|
if fft_size <= 0 or fft_size % 2:
|
|
raise ValueError("power FFT size must be a positive even number")
|
|
if not 0.0 <= overlap < 1.0:
|
|
raise ValueError("power overlap must be >= 0 and < 1")
|
|
if not 0.0 <= smoothing < 1.0:
|
|
raise ValueError("power smoothing must be >= 0 and < 1")
|
|
if not 0.0 <= headroom_smoothing < 1.0:
|
|
raise ValueError("headroom smoothing must be >= 0 and < 1")
|
|
|
|
hop_size = int(round(fft_size * (1.0 - overlap)))
|
|
if hop_size <= 0:
|
|
raise ValueError("power overlap is too high")
|
|
|
|
ways: list[WayConfig] = []
|
|
specs = (
|
|
("LF", self.lf_enabled_var, self.lf_channel_var, self.lf_impedance_var, self.lf_gain_var),
|
|
("HF", self.hf_enabled_var, self.hf_channel_var, self.hf_impedance_var, self.hf_gain_var),
|
|
)
|
|
for name, enabled_var, channel_var, impedance_var, gain_var in specs:
|
|
if not enabled_var.get():
|
|
continue
|
|
impedance_path = impedance_var.get().strip()
|
|
if not impedance_path:
|
|
continue
|
|
channel = parse_channel_list(channel_var.get())[0]
|
|
if channel >= config.stream_channel_count:
|
|
raise ValueError(
|
|
f"{name} channel {channel + 1} is not captured; include it in Channels"
|
|
)
|
|
ways.append(
|
|
WayConfig(
|
|
name=name,
|
|
channel_zero_based=channel,
|
|
impedance_path=Path(impedance_path),
|
|
amp_gain_db=float(gain_var.get()),
|
|
)
|
|
)
|
|
|
|
names = {way.name for way in ways}
|
|
for name, row in self.power_rows.items():
|
|
if name not in names:
|
|
for key in ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p"):
|
|
row[key].set("-")
|
|
row["meter"]["value"] = 0.0
|
|
row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W
|
|
row["meter"].configure(style="PowerOk.Horizontal.TProgressbar")
|
|
row["light"].itemconfigure(row["light_id"], fill="#22a447")
|
|
|
|
self.power_estimators = {}
|
|
self.power_buffers = {}
|
|
self.power_history = {}
|
|
self.power_max_hold_w = {}
|
|
self.latest_power_frames = {}
|
|
self.latest_headroom_frames = {}
|
|
self.headroom_curves = {}
|
|
self.headroom_smoothed_db = {}
|
|
self.power_history_start_s = None
|
|
self.power_hop_size = hop_size
|
|
self.input_spectrum_estimators = {}
|
|
self.input_spectrum_buffers = {}
|
|
self.input_spectrum_frames = {}
|
|
|
|
for channel in config.channels_zero_based:
|
|
self.input_spectrum_estimators[channel] = VoltageSpectrumEstimator(
|
|
samplerate=actual_samplerate,
|
|
fft_size=fft_size,
|
|
input_volts_per_sample_unit=config.input_volts_per_sample_unit,
|
|
smoothing_alpha=smoothing,
|
|
)
|
|
self.input_spectrum_buffers[channel] = np.zeros(0, dtype=np.float64)
|
|
|
|
for way in ways:
|
|
impedance = ImpedanceCurve.from_file(way.impedance_path)
|
|
self.power_estimators[way.name] = PowerEstimator(
|
|
way=way,
|
|
impedance=impedance,
|
|
samplerate=actual_samplerate,
|
|
fft_size=fft_size,
|
|
input_volts_per_sample_unit=config.input_volts_per_sample_unit,
|
|
smoothing_alpha=smoothing,
|
|
)
|
|
self.power_buffers[way.name] = np.zeros(0, dtype=np.float64)
|
|
self.power_history[way.name] = []
|
|
self.power_max_hold_w[way.name] = 0.0
|
|
|
|
self._load_gui_winisd_curves()
|
|
|
|
def _load_gui_winisd_curves(self) -> None:
|
|
self.headroom_curves = {}
|
|
self.headroom_smoothed_db = {}
|
|
if not self.winisd_enabled_var.get():
|
|
self.winisd_status_var.set("WinISD headroom disabled.")
|
|
return
|
|
|
|
lf_estimator = self.power_estimators.get("LF")
|
|
if lf_estimator is None:
|
|
self.winisd_status_var.set("WinISD headroom disabled: LF way is not active.")
|
|
return
|
|
|
|
path = Path(self.winisd_max_power_var.get().strip() or WINISD_MAX_POWER_FILE)
|
|
try:
|
|
curve = WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
|
|
except Exception as exc:
|
|
self.winisd_status_var.set(f"WinISD headroom disabled: {exc}")
|
|
return
|
|
|
|
self.headroom_curves["LF"] = curve
|
|
status = (
|
|
f"WinISD headroom active: {path}, "
|
|
f"{curve.frequency_hz[0]:.1f}-{curve.frequency_hz[-1]:.1f} Hz."
|
|
)
|
|
excursion_10w = WINISD_EXCURSION_10W_FILE
|
|
excursion_100w = WINISD_EXCURSION_100W_FILE
|
|
if excursion_10w.exists() and excursion_100w.exists():
|
|
try:
|
|
check = check_excursion_scaling(excursion_10w, excursion_100w)
|
|
status += " " + check.message
|
|
except Exception as exc:
|
|
status += f" Excursion check skipped: {exc}"
|
|
self.winisd_status_var.set(status)
|
|
|
|
def _process_input_spectrum_block(self, block) -> None:
|
|
if np is None or not self.input_spectrum_estimators:
|
|
return
|
|
|
|
for channel, estimator in self.input_spectrum_estimators.items():
|
|
samples = block[:, channel].astype(np.float64, copy=False)
|
|
self.input_spectrum_buffers[channel] = np.concatenate(
|
|
(self.input_spectrum_buffers[channel], samples)
|
|
)
|
|
|
|
latest = None
|
|
while len(self.input_spectrum_buffers[channel]) >= estimator.fft_size:
|
|
latest = estimator.estimate(
|
|
self.input_spectrum_buffers[channel][: estimator.fft_size]
|
|
)
|
|
self.input_spectrum_buffers[channel] = self.input_spectrum_buffers[channel][
|
|
self.power_hop_size :
|
|
]
|
|
|
|
if latest is None:
|
|
continue
|
|
|
|
self.input_spectrum_frames[channel] = latest
|
|
row = self.input_metric_rows.get(channel)
|
|
if row is not None:
|
|
row["rms"].set(f"{latest.rms_v:.3f} V")
|
|
row["peak"].set(f"{latest.peak_v:.3f} V")
|
|
row["crest"].set(f"{latest.crest_factor:.2f}x")
|
|
row["crest_db"].set(f"{latest.crest_factor_db:.2f} dB")
|
|
digital_peak = latest.peak_v / self.config.input_volts_per_sample_unit if self.config else 0.0
|
|
row["digital_peak"].set(f"{digital_peak:.5f} FS")
|
|
|
|
self.draw_spectrum()
|
|
|
|
def _process_power_block(self, block) -> None:
|
|
if np is None or not self.power_estimators:
|
|
return
|
|
|
|
for name, estimator in self.power_estimators.items():
|
|
way = estimator.way
|
|
samples = block[:, way.channel_zero_based].astype(np.float64, copy=False)
|
|
self.power_buffers[name] = np.concatenate((self.power_buffers[name], samples))
|
|
|
|
latest = None
|
|
while len(self.power_buffers[name]) >= estimator.fft_size:
|
|
latest = estimator.estimate(self.power_buffers[name][: estimator.fft_size])
|
|
self.power_buffers[name] = self.power_buffers[name][self.power_hop_size :]
|
|
|
|
if latest is None:
|
|
continue
|
|
|
|
self.latest_power_frames[name] = latest
|
|
row = self.power_rows.get(name)
|
|
if row is None:
|
|
continue
|
|
row["vdsp"].set(f"{latest.rms_input_v:.3f} V")
|
|
row["vamp"].set(f"{latest.rms_amp_v:.1f} V")
|
|
row["p"].set(f"{latest.total_p_w:.1f} W")
|
|
row["q"].set(f"{latest.total_q_var:.1f} var")
|
|
row["s"].set(f"{latest.total_s_va:.1f} VA")
|
|
headroom = self._compute_gui_headroom(name, latest)
|
|
if headroom is None:
|
|
self.latest_headroom_frames.pop(name, None)
|
|
row["headroom"].set("-")
|
|
row["crit_hz"].set("-")
|
|
row["limit"].set("-")
|
|
else:
|
|
self.latest_headroom_frames[name] = headroom
|
|
row["headroom"].set(f"{headroom['worst_headroom_db']:+.1f} dB")
|
|
row["crit_hz"].set(f"{headroom['critical_frequency_hz']:.1f}")
|
|
row["limit"].set(headroom["status"])
|
|
self._update_power_meter(name, latest.total_p_w, headroom)
|
|
self._append_power_history(name, latest)
|
|
|
|
self.draw_power_history()
|
|
self.draw_power_limit()
|
|
self.draw_headroom()
|
|
|
|
def _compute_gui_headroom(self, name: str, frame: PowerFrame) -> dict | None:
|
|
curve = self.headroom_curves.get(name)
|
|
if curve is None:
|
|
return None
|
|
headroom = curve.compute_headroom(frame.p_w_per_bin, frame.frequencies_hz)
|
|
raw = float(headroom["worst_headroom_db"])
|
|
try:
|
|
alpha = float(self.headroom_smoothing_var.get())
|
|
except ValueError:
|
|
alpha = HEADROOM_SMOOTHING_ALPHA
|
|
alpha = min(max(alpha, 0.0), 0.999999)
|
|
smoothed = smooth_headroom_db(self.headroom_smoothed_db.get(name), raw, alpha)
|
|
self.headroom_smoothed_db[name] = smoothed
|
|
headroom["worst_headroom_db"] = smoothed
|
|
headroom["status"] = headroom_status(smoothed)
|
|
headroom["over_limit_bool"] = bool(math.isfinite(smoothed) and smoothed <= 0.0)
|
|
return headroom
|
|
|
|
def _update_power_meter(self, name: str, power_w: float, headroom: dict | None = None) -> None:
|
|
row = self.power_rows.get(name)
|
|
if row is None:
|
|
return
|
|
|
|
max_hold = max(self.power_max_hold_w.get(name, 0.0), float(power_w))
|
|
self.power_max_hold_w[name] = max_hold
|
|
|
|
if headroom is not None and math.isfinite(float(headroom["critical_pmax_w"])):
|
|
critical_pmax = max(float(headroom["critical_pmax_w"]), HEADROOM_EPS_W)
|
|
critical_power = max(float(headroom["critical_power_w"]), 0.0)
|
|
meter_value = min((critical_power / critical_pmax) * 100.0, 100.0)
|
|
row["meter"]["maximum"] = 100.0
|
|
status = headroom["status"]
|
|
if status == "LIMIT EXCEEDED":
|
|
color = "#d62728"
|
|
style = "PowerCritical.Horizontal.TProgressbar"
|
|
elif status == "WARNING":
|
|
color = "#e6a400"
|
|
style = "PowerWarn.Horizontal.TProgressbar"
|
|
else:
|
|
color = "#22a447"
|
|
style = "PowerOk.Horizontal.TProgressbar"
|
|
else:
|
|
meter_value = min(max(float(power_w), 0.0), DEFAULT_POWER_CRITICAL_W)
|
|
row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W
|
|
if power_w >= DEFAULT_POWER_CRITICAL_W:
|
|
color = "#d62728"
|
|
style = "PowerCritical.Horizontal.TProgressbar"
|
|
elif power_w >= DEFAULT_POWER_WARNING_W:
|
|
color = "#e6a400"
|
|
style = "PowerWarn.Horizontal.TProgressbar"
|
|
else:
|
|
color = "#22a447"
|
|
style = "PowerOk.Horizontal.TProgressbar"
|
|
|
|
row["meter"]["value"] = meter_value
|
|
row["meter"].configure(style=style)
|
|
row["light"].itemconfigure(row["light_id"], fill=color)
|
|
row["max_p"].set(f"{max_hold:.1f} W")
|
|
|
|
def _append_power_history(self, name: str, frame: PowerFrame) -> None:
|
|
now = time.monotonic()
|
|
if self.power_history_start_s is None:
|
|
self.power_history_start_s = now
|
|
t = now - self.power_history_start_s
|
|
history = self.power_history.setdefault(name, [])
|
|
history.append((t, frame.total_p_w, frame.total_s_va))
|
|
window_s = self._power_history_window_s()
|
|
keep_after = max(0.0, t - window_s * 1.5)
|
|
self.power_history[name] = [item for item in history if item[0] >= keep_after]
|
|
|
|
def _power_history_window_s(self) -> float:
|
|
try:
|
|
value = float(self.power_history_window_var.get().strip())
|
|
except ValueError:
|
|
value = 30.0
|
|
return value if value > 0.0 else 30.0
|
|
|
|
def _build_level_rows(self, channels_zero_based: list[int]) -> None:
|
|
for child in self.level_frame.winfo_children()[self.input_metric_static_widget_count:]:
|
|
child.destroy()
|
|
self.level_rows = []
|
|
self.input_metric_rows = {}
|
|
|
|
for row, channel in enumerate(channels_zero_based, start=2):
|
|
channel_label = self.ttk.Label(self.level_frame, text=f"Ch {channel + 1}", width=8)
|
|
channel_label.grid(row=row, column=0, sticky="w", pady=6)
|
|
|
|
vars_by_metric = {
|
|
"rms": self.tk.StringVar(value="-"),
|
|
"peak": self.tk.StringVar(value="-"),
|
|
"crest": self.tk.StringVar(value="-"),
|
|
"crest_db": self.tk.StringVar(value="-"),
|
|
"digital_peak": self.tk.StringVar(value="-"),
|
|
}
|
|
for col, key in enumerate(("rms", "peak", "crest", "crest_db", "digital_peak"), start=1):
|
|
self.ttk.Label(self.level_frame, textvariable=vars_by_metric[key], width=16, anchor="e").grid(
|
|
row=row,
|
|
column=col,
|
|
sticky="e",
|
|
padx=(0, 18 if col < 5 else 0),
|
|
pady=6,
|
|
)
|
|
self.input_metric_rows[channel] = vars_by_metric
|
|
|
|
self.draw_scope_grid()
|
|
self.draw_spectrum_grid()
|
|
|
|
def draw_scope_grid(self) -> None:
|
|
canvas = self.scope_canvas
|
|
canvas.delete("grid")
|
|
canvas.delete("trace")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
y_limit_v = self._scope_y_limit_v()
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
y = int(frac * (height - 1))
|
|
color = "#555555" if frac == 0.5 else "#2a2a2a"
|
|
canvas.create_line(0, y, width, y, fill=color, tags="grid")
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
x = int(frac * (width - 1))
|
|
canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")
|
|
|
|
canvas.create_text(6, 6, anchor="nw", text=f"+{y_limit_v:g} V", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height // 2 + 4, anchor="nw", text="0", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height - 20, anchor="nw", text=f"-{y_limit_v:g} V", fill="#bbbbbb", tags="grid")
|
|
|
|
if self.config is not None:
|
|
legend_parts = []
|
|
for idx, channel in enumerate(self.config.channels_zero_based):
|
|
color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)]
|
|
legend_parts.append((f"Ch {channel + 1}", color))
|
|
x = width - 10
|
|
for label, color in reversed(legend_parts):
|
|
text_id = canvas.create_text(x, 6, anchor="ne", text=label, fill=color, tags="grid")
|
|
bbox = canvas.bbox(text_id)
|
|
if bbox:
|
|
x = bbox[0] - 12
|
|
|
|
def _scope_y_limit_v(self) -> float:
|
|
text = self.waveform_y_limit_var.get().strip()
|
|
if text:
|
|
try:
|
|
manual_limit = float(text)
|
|
except ValueError:
|
|
manual_limit = 0.0
|
|
if manual_limit > 0.0:
|
|
return manual_limit
|
|
if self.config is None:
|
|
return DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT
|
|
return self.config.input_volts_per_sample_unit
|
|
|
|
def draw_waveform(self, block) -> None:
|
|
if self.config is None or np is None:
|
|
return
|
|
|
|
canvas = self.scope_canvas
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
selected = block[:, self.config.channels_zero_based].astype(np.float64, copy=False)
|
|
volts = selected * self.config.input_volts_per_sample_unit
|
|
frame_count = selected.shape[0]
|
|
if frame_count < 2:
|
|
return
|
|
|
|
canvas.delete("trace")
|
|
samples_per_pixel = max(1, frame_count // width)
|
|
decimated = volts[::samples_per_pixel, :]
|
|
point_count = decimated.shape[0]
|
|
if point_count < 2:
|
|
return
|
|
|
|
x_scale = (width - 1) / (point_count - 1)
|
|
y_mid = (height - 1) / 2.0
|
|
y_limit_v = self._scope_y_limit_v()
|
|
y_scale = y_mid / y_limit_v
|
|
|
|
for channel_idx in range(decimated.shape[1]):
|
|
waveform = np.clip(
|
|
decimated[:, channel_idx],
|
|
-y_limit_v,
|
|
y_limit_v,
|
|
)
|
|
points: list[float] = []
|
|
for sample_idx, sample in enumerate(waveform):
|
|
points.extend((sample_idx * x_scale, y_mid - float(sample) * y_scale))
|
|
|
|
color = OSCILLOSCOPE_TRACE_COLORS[channel_idx % len(OSCILLOSCOPE_TRACE_COLORS)]
|
|
canvas.create_line(
|
|
points,
|
|
fill=color,
|
|
width=1.5,
|
|
smooth=False,
|
|
tags="trace",
|
|
)
|
|
|
|
def draw_spectrum_grid(self) -> None:
|
|
canvas = self.spectrum_canvas
|
|
canvas.delete("grid")
|
|
canvas.delete("spectrum")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
y = int(frac * (height - 1))
|
|
color = "#555555" if frac == 0.5 else "#2a2a2a"
|
|
canvas.create_line(0, y, width, y, fill=color, tags="grid")
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
x = int(frac * (width - 1))
|
|
canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")
|
|
|
|
canvas.create_text(6, 6, anchor="nw", text="PSD dB V^2/Hz", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid")
|
|
|
|
def draw_spectrum(self) -> None:
|
|
if np is None or not self.input_spectrum_frames:
|
|
return
|
|
|
|
canvas = self.spectrum_canvas
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
canvas.delete("spectrum")
|
|
|
|
all_db = []
|
|
prepared = []
|
|
for idx, (channel, frame) in enumerate(sorted(self.input_spectrum_frames.items())):
|
|
freq = frame.frequencies_hz
|
|
mask = freq >= 20.0
|
|
if np.count_nonzero(mask) < 2:
|
|
continue
|
|
freq = freq[mask]
|
|
db = 10.0 * np.log10(np.maximum(frame.psd_v2_per_hz[mask], 1e-30))
|
|
all_db.append(db)
|
|
prepared.append((idx, channel, freq, db))
|
|
|
|
if not prepared:
|
|
return
|
|
|
|
all_values = np.concatenate(all_db)
|
|
y_max = float(np.nanmax(all_values))
|
|
y_min = float(np.nanmin(all_values))
|
|
if not np.isfinite(y_min) or not np.isfinite(y_max):
|
|
return
|
|
y_max = math.ceil((y_max + 3.0) / 10.0) * 10.0
|
|
y_min = min(y_max - 60.0, math.floor((y_min - 3.0) / 10.0) * 10.0)
|
|
if y_max <= y_min:
|
|
y_max = y_min + 60.0
|
|
|
|
f_min = 20.0
|
|
f_max = max(float(frame.frequencies_hz[-1]) for frame in self.input_spectrum_frames.values())
|
|
log_min = math.log10(f_min)
|
|
log_max = math.log10(f_max)
|
|
|
|
canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g}", fill="#888888", tags="spectrum")
|
|
canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g}", fill="#888888", tags="spectrum")
|
|
|
|
for idx, channel, freq, db in prepared:
|
|
x = (np.log10(freq) - log_min) / (log_max - log_min) * (width - 1)
|
|
y = (y_max - db) / (y_max - y_min) * (height - 1)
|
|
points: list[float] = []
|
|
step = max(1, len(x) // max(width, 1))
|
|
for px, py in zip(x[::step], y[::step]):
|
|
points.extend((float(px), float(np.clip(py, 0, height - 1))))
|
|
if len(points) >= 4:
|
|
color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)]
|
|
canvas.create_line(points, fill=color, width=1.4, smooth=False, tags="spectrum")
|
|
canvas.create_text(
|
|
width - 8,
|
|
8 + 16 * idx,
|
|
anchor="ne",
|
|
text=f"Ch {channel + 1}",
|
|
fill=color,
|
|
tags="spectrum",
|
|
)
|
|
|
|
self.spectrum_status_var.set(
|
|
f"PSD: {y_min:g} to {y_max:g} dB V^2/Hz, Hann/Welch, {len(prepared)} channel(s)"
|
|
)
|
|
|
|
def draw_power_history_grid(self) -> None:
|
|
canvas = self.power_history_canvas
|
|
canvas.delete("grid")
|
|
canvas.delete("history")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
y = int(frac * (height - 1))
|
|
color = "#555555" if frac == 0.5 else "#2a2a2a"
|
|
canvas.create_line(0, y, width, y, fill=color, tags="grid")
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
x = int(frac * (width - 1))
|
|
canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")
|
|
|
|
canvas.create_text(6, 6, anchor="nw", text="P solid / Sapp dashed", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height - 20, anchor="nw", text=f"-{self._power_history_window_s():g} s", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(width - 6, height - 20, anchor="ne", text="now", fill="#bbbbbb", tags="grid")
|
|
|
|
def draw_power_history(self) -> None:
|
|
canvas = self.power_history_canvas
|
|
if not self.power_history:
|
|
self.draw_power_history_grid()
|
|
return
|
|
|
|
canvas.delete("history")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
window_s = self._power_history_window_s()
|
|
|
|
latest_t = max((items[-1][0] for items in self.power_history.values() if items), default=0.0)
|
|
t_min = max(0.0, latest_t - window_s)
|
|
prepared = []
|
|
all_values: list[float] = []
|
|
for idx, (name, items) in enumerate(sorted(self.power_history.items())):
|
|
visible = [item for item in items if item[0] >= t_min]
|
|
if len(visible) < 2:
|
|
continue
|
|
t = np.asarray([item[0] for item in visible], dtype=np.float64)
|
|
p = np.asarray([item[1] for item in visible], dtype=np.float64)
|
|
s = np.asarray([item[2] for item in visible], dtype=np.float64)
|
|
prepared.append((idx, name, t, p, s))
|
|
all_values.extend(float(v) for v in p)
|
|
all_values.extend(float(v) for v in s)
|
|
|
|
if not prepared:
|
|
return
|
|
|
|
y_max = max(all_values)
|
|
y_min = min(0.0, min(all_values))
|
|
if y_max <= y_min:
|
|
y_max = y_min + 1.0
|
|
padding = 0.08 * (y_max - y_min)
|
|
y_max += padding
|
|
y_min -= padding
|
|
|
|
def to_points(t_values, y_values) -> list[float]:
|
|
x = (t_values - (latest_t - window_s)) / window_s * (width - 1)
|
|
y = (y_max - y_values) / (y_max - y_min) * (height - 1)
|
|
points: list[float] = []
|
|
for px, py in zip(x, y):
|
|
points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1))))
|
|
return points
|
|
|
|
canvas.create_text(6, 22, anchor="nw", text=f"{y_max:.1f}", fill="#888888", tags="history")
|
|
canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:.1f}", fill="#888888", tags="history")
|
|
|
|
for idx, name, t, p, s in prepared:
|
|
color = OSCILLOSCOPE_TRACE_COLORS[idx % len(OSCILLOSCOPE_TRACE_COLORS)]
|
|
p_points = to_points(t, p)
|
|
s_points = to_points(t, s)
|
|
if len(p_points) >= 4:
|
|
canvas.create_line(p_points, fill=color, width=1.8, smooth=False, tags="history")
|
|
if len(s_points) >= 4:
|
|
canvas.create_line(
|
|
s_points,
|
|
fill=color,
|
|
width=1.2,
|
|
smooth=False,
|
|
dash=(5, 3),
|
|
tags="history",
|
|
)
|
|
canvas.create_text(
|
|
width - 8,
|
|
8 + 16 * idx,
|
|
anchor="ne",
|
|
text=f"{name} P/Sapp",
|
|
fill=color,
|
|
tags="history",
|
|
)
|
|
|
|
self.power_history_status_var.set(
|
|
f"Window {window_s:g} s, y {y_min:.1f} to {y_max:.1f} W / VA"
|
|
)
|
|
|
|
def _winisd_plot_data(self):
|
|
for name in sorted(self.latest_headroom_frames):
|
|
frame = self.latest_power_frames.get(name)
|
|
headroom = self.latest_headroom_frames.get(name)
|
|
curve = self.headroom_curves.get(name)
|
|
if frame is not None and headroom is not None and curve is not None:
|
|
return name, frame, headroom, curve
|
|
return None
|
|
|
|
def draw_power_limit_grid(self) -> None:
|
|
canvas = self.power_limit_canvas
|
|
canvas.delete("grid")
|
|
canvas.delete("limit")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
y = int(frac * (height - 1))
|
|
color = "#555555" if frac == 0.5 else "#2a2a2a"
|
|
canvas.create_line(0, y, width, y, fill=color, tags="grid")
|
|
x = int(frac * (width - 1))
|
|
canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")
|
|
|
|
canvas.create_text(6, 6, anchor="nw", text="Power [W], log scale", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid")
|
|
|
|
def draw_power_limit(self) -> None:
|
|
data = self._winisd_plot_data()
|
|
if np is None or data is None:
|
|
return
|
|
|
|
name, frame, headroom, curve = data
|
|
canvas = self.power_limit_canvas
|
|
canvas.delete("limit")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
freq = frame.frequencies_hz
|
|
pmax = curve.interpolate(freq)
|
|
p_bin = np.maximum(frame.p_w_per_bin, 0.0)
|
|
mask = np.isfinite(freq) & np.isfinite(pmax) & (freq >= 20.0) & (pmax > HEADROOM_EPS_W)
|
|
if np.count_nonzero(mask) < 2:
|
|
return
|
|
|
|
freq = freq[mask]
|
|
pmax = pmax[mask]
|
|
p_bin = p_bin[mask]
|
|
p_plot = np.maximum(p_bin, 1e-9)
|
|
|
|
f_min = max(20.0, float(freq[0]))
|
|
f_max = float(freq[-1])
|
|
if f_max <= f_min:
|
|
return
|
|
log_f_min = math.log10(f_min)
|
|
log_f_max = math.log10(f_max)
|
|
|
|
positive_values = np.concatenate((p_plot[np.isfinite(p_plot)], pmax[np.isfinite(pmax)]))
|
|
positive_values = positive_values[positive_values > 0.0]
|
|
if len(positive_values) == 0:
|
|
return
|
|
y_min = max(1e-6, 10.0 ** math.floor(math.log10(float(np.nanmin(positive_values)))))
|
|
y_max = 10.0 ** math.ceil(math.log10(float(np.nanmax(positive_values)) * 1.2))
|
|
if y_max <= y_min:
|
|
y_max = y_min * 10.0
|
|
log_y_min = math.log10(y_min)
|
|
log_y_max = math.log10(y_max)
|
|
|
|
def to_points(y_values: np.ndarray) -> list[float]:
|
|
x = (np.log10(freq) - log_f_min) / (log_f_max - log_f_min) * (width - 1)
|
|
y = (log_y_max - np.log10(np.maximum(y_values, y_min))) / (log_y_max - log_y_min) * (height - 1)
|
|
step = max(1, len(x) // max(width, 1))
|
|
points: list[float] = []
|
|
for px, py in zip(x[::step], y[::step]):
|
|
points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1))))
|
|
return points
|
|
|
|
p_points = to_points(p_plot)
|
|
pmax_points = to_points(pmax)
|
|
if len(p_points) >= 4:
|
|
canvas.create_line(p_points, fill="#1f77b4", width=1.4, smooth=False, tags="limit")
|
|
if len(pmax_points) >= 4:
|
|
canvas.create_line(pmax_points, fill="#d62728", width=1.8, smooth=False, tags="limit")
|
|
|
|
critical_f = float(headroom["critical_frequency_hz"])
|
|
if math.isfinite(critical_f) and f_min <= critical_f <= f_max:
|
|
x = (math.log10(critical_f) - log_f_min) / (log_f_max - log_f_min) * (width - 1)
|
|
canvas.create_line(x, 0, x, height, fill="#e6a400", width=1.4, dash=(4, 3), tags="limit")
|
|
|
|
canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g} W", fill="#888888", tags="limit")
|
|
canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g} W", fill="#888888", tags="limit")
|
|
canvas.create_text(width - 8, 8, anchor="ne", text=f"{name} P_bin", fill="#1f77b4", tags="limit")
|
|
canvas.create_text(width - 8, 24, anchor="ne", text="WinISD Pmax", fill="#d62728", tags="limit")
|
|
self.power_limit_status_var.set(
|
|
f"{name}: P_bin vs Pmax, critical {critical_f:.1f} Hz, "
|
|
f"worst {headroom['worst_headroom_db']:+.1f} dB"
|
|
)
|
|
|
|
def draw_headroom_grid(self) -> None:
|
|
canvas = self.headroom_canvas
|
|
canvas.delete("grid")
|
|
canvas.delete("headroom")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
for frac in (0.0, 0.25, 0.5, 0.75, 1.0):
|
|
y = int(frac * (height - 1))
|
|
color = "#555555" if frac == 0.5 else "#2a2a2a"
|
|
canvas.create_line(0, y, width, y, fill=color, tags="grid")
|
|
x = int(frac * (width - 1))
|
|
canvas.create_line(x, 0, x, height, fill="#242424", tags="grid")
|
|
|
|
canvas.create_text(6, 6, anchor="nw", text="Headroom [dB]", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(6, height - 20, anchor="nw", text="20 Hz", fill="#bbbbbb", tags="grid")
|
|
canvas.create_text(width - 6, height - 20, anchor="ne", text="Nyquist", fill="#bbbbbb", tags="grid")
|
|
|
|
def draw_headroom(self) -> None:
|
|
data = self._winisd_plot_data()
|
|
if np is None or data is None:
|
|
return
|
|
|
|
name, frame, headroom, curve = data
|
|
canvas = self.headroom_canvas
|
|
canvas.delete("headroom")
|
|
width = max(canvas.winfo_width(), 2)
|
|
height = max(canvas.winfo_height(), 2)
|
|
|
|
freq = frame.frequencies_hz
|
|
pmax = curve.interpolate(freq)
|
|
h_db = np.asarray(headroom["headroom_db_bin"], dtype=np.float64)
|
|
mask = np.isfinite(freq) & np.isfinite(pmax) & np.isfinite(h_db) & (freq >= 20.0) & (pmax > HEADROOM_EPS_W)
|
|
if np.count_nonzero(mask) < 2:
|
|
return
|
|
|
|
freq = freq[mask]
|
|
h_db = h_db[mask]
|
|
f_min = max(20.0, float(freq[0]))
|
|
f_max = float(freq[-1])
|
|
if f_max <= f_min:
|
|
return
|
|
log_f_min = math.log10(f_min)
|
|
log_f_max = math.log10(f_max)
|
|
|
|
y_min = min(-6.0, math.floor((float(np.nanmin(h_db)) - 3.0) / 3.0) * 3.0)
|
|
h_display_top = min(float(np.nanpercentile(h_db, 95.0)), 60.0)
|
|
y_max = max(12.0, math.ceil((h_display_top + 3.0) / 3.0) * 3.0)
|
|
if y_max <= y_min:
|
|
y_max = y_min + 18.0
|
|
h_plot = np.clip(h_db, y_min, y_max)
|
|
|
|
x = (np.log10(freq) - log_f_min) / (log_f_max - log_f_min) * (width - 1)
|
|
y = (y_max - h_plot) / (y_max - y_min) * (height - 1)
|
|
step = max(1, len(x) // max(width, 1))
|
|
points: list[float] = []
|
|
for px, py in zip(x[::step], y[::step]):
|
|
points.extend((float(np.clip(px, 0, width - 1)), float(np.clip(py, 0, height - 1))))
|
|
if len(points) >= 4:
|
|
canvas.create_line(points, fill="#1f77b4", width=1.5, smooth=False, tags="headroom")
|
|
|
|
def y_for_db(value: float) -> float:
|
|
return (y_max - value) / (y_max - y_min) * (height - 1)
|
|
|
|
for value, color, label in ((0.0, "#d62728", "0 dB"), (6.0, "#e6a400", "+6 dB")):
|
|
if y_min <= value <= y_max:
|
|
y_line = y_for_db(value)
|
|
canvas.create_line(0, y_line, width, y_line, fill=color, width=1.2, dash=(5, 3), tags="headroom")
|
|
canvas.create_text(width - 8, y_line - 2, anchor="se", text=label, fill=color, tags="headroom")
|
|
|
|
critical_f = float(headroom["critical_frequency_hz"])
|
|
if math.isfinite(critical_f) and f_min <= critical_f <= f_max:
|
|
x_line = (math.log10(critical_f) - log_f_min) / (log_f_max - log_f_min) * (width - 1)
|
|
canvas.create_line(x_line, 0, x_line, height, fill="#e6a400", width=1.4, dash=(4, 3), tags="headroom")
|
|
|
|
canvas.create_text(6, 22, anchor="nw", text=f"{y_max:g} dB", fill="#888888", tags="headroom")
|
|
canvas.create_text(6, height - 38, anchor="nw", text=f"{y_min:g} dB", fill="#888888", tags="headroom")
|
|
canvas.create_text(width - 8, 8, anchor="ne", text=f"{name} H(f,t)", fill="#1f77b4", tags="headroom")
|
|
self.headroom_status_var.set(
|
|
f"{name}: worst {headroom['worst_headroom_db']:+.1f} dB "
|
|
f"at {critical_f:.1f} Hz, {headroom['status']}"
|
|
)
|
|
|
|
def start(self) -> None:
|
|
if self.capture is not None:
|
|
return
|
|
|
|
try:
|
|
config = self._parse_gui_config()
|
|
except Exception as exc:
|
|
self.messagebox.showerror("Invalid input setup", str(exc))
|
|
return
|
|
|
|
capture = QueuedInputStream(
|
|
sd=self.sd,
|
|
device_index=config.device_index,
|
|
channels_zero_based=config.channels_zero_based,
|
|
samplerate=config.samplerate,
|
|
blocksize=config.blocksize,
|
|
max_queue_blocks=8,
|
|
)
|
|
|
|
try:
|
|
capture.start()
|
|
actual_samplerate = capture.actual_samplerate
|
|
self._parse_power_settings(config, actual_samplerate)
|
|
except Exception as exc:
|
|
capture.stop()
|
|
self.messagebox.showerror("Start failed", str(exc))
|
|
return
|
|
|
|
self.capture = capture
|
|
self.config = config
|
|
self._build_level_rows(config.channels_zero_based)
|
|
self.start_button.configure(state="disabled")
|
|
self.stop_button.configure(state="normal")
|
|
self.calibrate_button.configure(state="normal")
|
|
self.device_combo.configure(state="disabled")
|
|
self.status_var.set(f"Running at {actual_samplerate:g} Hz")
|
|
|
|
def stop(self) -> None:
|
|
if self.capture is not None:
|
|
self.capture.stop()
|
|
self.capture = None
|
|
self.config = None
|
|
self.calibration_state = None
|
|
self.input_spectrum_estimators = {}
|
|
self.input_spectrum_buffers = {}
|
|
self.input_spectrum_frames = {}
|
|
self.power_estimators = {}
|
|
self.power_buffers = {}
|
|
self.power_history = {}
|
|
self.power_max_hold_w = {}
|
|
self.latest_power_frames = {}
|
|
self.latest_headroom_frames = {}
|
|
self.headroom_curves = {}
|
|
self.headroom_smoothed_db = {}
|
|
self.power_history_start_s = None
|
|
self.power_hop_size = 0
|
|
for row in self.input_metric_rows.values():
|
|
for value_var in row.values():
|
|
value_var.set("-")
|
|
for row in self.power_rows.values():
|
|
for key in ("vdsp", "vamp", "p", "q", "s", "headroom", "crit_hz", "limit", "max_p"):
|
|
row[key].set("-")
|
|
row["meter"]["value"] = 0.0
|
|
row["meter"]["maximum"] = DEFAULT_POWER_CRITICAL_W
|
|
row["meter"].configure(style="PowerOk.Horizontal.TProgressbar")
|
|
row["light"].itemconfigure(row["light_id"], fill="#22a447")
|
|
self.draw_spectrum_grid()
|
|
self.draw_power_history_grid()
|
|
self.draw_power_limit_grid()
|
|
self.draw_headroom_grid()
|
|
self.start_button.configure(state="normal")
|
|
self.stop_button.configure(state="disabled")
|
|
self.calibrate_button.configure(state="disabled")
|
|
self.device_combo.configure(state="readonly")
|
|
self.status_var.set("Stopped")
|
|
|
|
def start_calibration(self) -> None:
|
|
if self.config is None or self.capture is None or np is None:
|
|
self.messagebox.showerror("Calibration unavailable", "Start the input monitor first.")
|
|
return
|
|
try:
|
|
known_rms_v = float(self.known_rms_v_var.get())
|
|
duration_s = float(self.calibration_duration_var.get())
|
|
except ValueError:
|
|
self.messagebox.showerror("Invalid calibration", "Cal signal Vrms and seconds must be numbers.")
|
|
return
|
|
if known_rms_v <= 0:
|
|
self.messagebox.showerror("Invalid calibration", "Cal signal Vrms must be positive.")
|
|
return
|
|
if duration_s <= 0:
|
|
self.messagebox.showerror("Invalid calibration", "Calibration duration must be positive.")
|
|
return
|
|
|
|
self.calibration_state = {
|
|
"known_rms_v": known_rms_v,
|
|
"deadline": time.monotonic() + duration_s,
|
|
"sums": np.zeros(len(self.config.channels_zero_based), dtype=np.float64),
|
|
"frames": 0,
|
|
"peak_fs": np.zeros(len(self.config.channels_zero_based), dtype=np.float64),
|
|
}
|
|
self.calibrate_button.configure(state="disabled")
|
|
self.status_var.set(f"Calibrating from {known_rms_v:g} Vrms...")
|
|
|
|
def _accumulate_calibration(self, block) -> None:
|
|
if self.calibration_state is None or self.config is None or np is None:
|
|
return
|
|
|
|
selected = block[:, self.config.channels_zero_based].astype(np.float64, copy=False)
|
|
self.calibration_state["sums"] += np.sum(selected * selected, axis=0)
|
|
self.calibration_state["frames"] += selected.shape[0]
|
|
self.calibration_state["peak_fs"] = np.maximum(
|
|
self.calibration_state["peak_fs"],
|
|
np.max(np.abs(selected), axis=0),
|
|
)
|
|
|
|
if time.monotonic() < self.calibration_state["deadline"]:
|
|
return
|
|
|
|
frames = self.calibration_state["frames"]
|
|
if frames <= 0:
|
|
self.calibration_state = None
|
|
self.calibrate_button.configure(state="normal")
|
|
self.messagebox.showerror("Calibration failed", "No input samples were captured.")
|
|
return
|
|
|
|
try:
|
|
measured_rms_fs = np.sqrt(self.calibration_state["sums"] / frames)
|
|
per_channel, average = compute_volts_per_sample_unit(
|
|
self.calibration_state["known_rms_v"],
|
|
measured_rms_fs,
|
|
)
|
|
except ValueError as exc:
|
|
self.calibration_state = None
|
|
self.calibrate_button.configure(state="normal")
|
|
self.messagebox.showerror("Calibration failed", str(exc))
|
|
return
|
|
|
|
peak_fs = self.calibration_state["peak_fs"]
|
|
known_rms_v = self.calibration_state["known_rms_v"]
|
|
self.calibration_state = None
|
|
self.calibration_var.set(f"{average:.9g}")
|
|
self.config = InputConfig(
|
|
device_index=self.config.device_index,
|
|
channels_zero_based=self.config.channels_zero_based,
|
|
samplerate=self.config.samplerate,
|
|
blocksize=self.config.blocksize,
|
|
input_volts_per_sample_unit=average,
|
|
update_interval_s=self.config.update_interval_s,
|
|
)
|
|
if self.capture is not None:
|
|
try:
|
|
self._parse_power_settings(self.config, self.capture.actual_samplerate)
|
|
except Exception as exc:
|
|
self.power_estimators = {}
|
|
self.power_buffers = {}
|
|
self.messagebox.showwarning(
|
|
"Power estimator disabled",
|
|
f"Voltage calibration was applied, but power estimation could not restart: {exc}",
|
|
)
|
|
self.draw_scope_grid()
|
|
self.calibrate_button.configure(state="normal")
|
|
|
|
cal_path = CalibrationStore().save(
|
|
volts_per_sample_unit=average,
|
|
known_rms_v=known_rms_v,
|
|
measured_rms_sample_by_channel={
|
|
str(channel + 1): float(measured_rms_fs[idx])
|
|
for idx, channel in enumerate(self.config.channels_zero_based)
|
|
},
|
|
peak_sample_by_channel={
|
|
str(channel + 1): float(peak_fs[idx])
|
|
for idx, channel in enumerate(self.config.channels_zero_based)
|
|
},
|
|
device_index=self.config.device_index,
|
|
channels_zero_based=self.config.channels_zero_based,
|
|
samplerate=self.config.samplerate,
|
|
source="gui",
|
|
)
|
|
|
|
details = []
|
|
for idx, channel in enumerate(self.config.channels_zero_based):
|
|
warning = " CLIP?" if peak_fs[idx] >= CLIPPING_WARNING_THRESHOLD_FS else ""
|
|
details.append(
|
|
f"Ch {channel + 1}: {measured_rms_fs[idx]:.9f} sample units RMS, "
|
|
f"{per_channel[idx]:.9g} V/sample unit, peak {peak_fs[idx]:.6f} FS{warning}"
|
|
)
|
|
self.status_var.set(f"Voltage scale applied: {average:.9g} V / sample unit")
|
|
self.messagebox.showinfo(
|
|
"Calibration complete",
|
|
"\n".join(details)
|
|
+ f"\n\nAverage applied: {average:.9g} V / sample unit"
|
|
+ f"\nFull-scale sine equivalent: {average / math.sqrt(2.0):.9g} Vrms"
|
|
+ f"\nSaved: {cal_path}",
|
|
)
|
|
|
|
def poll_audio_queue(self) -> None:
|
|
latest = None
|
|
while True:
|
|
try:
|
|
if self.capture is None:
|
|
break
|
|
latest = self.capture.queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
if latest is not None and self.config is not None and self.capture is not None:
|
|
block, status = latest
|
|
self._accumulate_calibration(block)
|
|
rms_fs, rms_v, peak_fs = compute_levels(
|
|
block,
|
|
self.config.channels_zero_based,
|
|
self.config.input_volts_per_sample_unit,
|
|
)
|
|
for idx, channel in enumerate(self.config.channels_zero_based):
|
|
row = self.input_metric_rows.get(channel)
|
|
if row is not None and channel not in self.input_spectrum_frames:
|
|
peak_v = float(peak_fs[idx] * self.config.input_volts_per_sample_unit)
|
|
crest = peak_v / float(rms_v[idx]) if rms_v[idx] > 0 else 0.0
|
|
crest_db = 20.0 * math.log10(crest) if crest > 0 else float("-inf")
|
|
row["rms"].set(f"{rms_v[idx]:.3f} V")
|
|
row["peak"].set(f"{peak_v:.3f} V")
|
|
row["crest"].set(f"{crest:.2f}x")
|
|
row["crest_db"].set(f"{crest_db:.2f} dB")
|
|
row["digital_peak"].set(f"{peak_fs[idx]:.5f} FS")
|
|
self._process_input_spectrum_block(block)
|
|
self._process_power_block(block)
|
|
self.draw_waveform(block)
|
|
duration_ms = 1000.0 * block.shape[0] / self.capture.actual_samplerate
|
|
self.scope_status_var.set(
|
|
f"Newest block: {block.shape[0]} samples, {duration_ms:.1f} ms, "
|
|
f"vertical scale +/-{self._scope_y_limit_v():g} V"
|
|
)
|
|
if status:
|
|
self.status_var.set(f"Running at {self.capture.actual_samplerate:g} Hz | {status}")
|
|
|
|
self.root.after(50, self.poll_audio_queue)
|
|
|
|
def close(self) -> None:
|
|
self.stop()
|
|
self.root.destroy()
|
|
|
|
def run(self) -> None:
|
|
self.root.mainloop()
|
|
|
|
|
|
def run_gui(args: argparse.Namespace, sd) -> None:
|
|
app = LevelMonitorGUI(sd, args)
|
|
app.run()
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="Monitor live soundcard input RMS/peak levels for DSP-output voltage."
|
|
)
|
|
parser.add_argument(
|
|
"--device",
|
|
type=int,
|
|
help="sounddevice input device index. If omitted, an interactive prompt is shown.",
|
|
)
|
|
parser.add_argument(
|
|
"--channels",
|
|
type=str,
|
|
help="One-based input channel list, e.g. '1' or '1,2'. If omitted, prompted.",
|
|
)
|
|
parser.add_argument(
|
|
"--samplerate",
|
|
type=float,
|
|
default=DEFAULT_SAMPLERATE,
|
|
help="Sample rate in Hz. Omit to use the device default.",
|
|
)
|
|
parser.add_argument(
|
|
"--blocksize",
|
|
type=int,
|
|
default=DEFAULT_BLOCKSIZE,
|
|
help=f"Audio block size in frames. Default: {DEFAULT_BLOCKSIZE}.",
|
|
)
|
|
parser.add_argument(
|
|
"--input-volts-per-sample-unit",
|
|
"--input-calibration",
|
|
dest="input_volts_per_sample_unit",
|
|
type=float,
|
|
default=None,
|
|
help=(
|
|
"Path-specific voltage scale in volts per captured sounddevice sample unit. "
|
|
"--input-calibration is accepted as a legacy alias. "
|
|
"Default: newest saved calibration, otherwise "
|
|
f"{DEFAULT_INPUT_VOLTS_PER_SAMPLE_UNIT:g}."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--calibrate-known-rms-v",
|
|
type=float,
|
|
help=(
|
|
"Run input calibration using a known true-RMS voltage at the selected input, "
|
|
"then apply the measured volts-per-sample-unit scale and start monitoring. "
|
|
"Use a steady REW sine tone, for example 1 kHz."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--calibrate-only",
|
|
action="store_true",
|
|
help="With --calibrate-known-rms-v, print the measured voltage scale and exit.",
|
|
)
|
|
parser.add_argument(
|
|
"--no-calibration-ready-prompt",
|
|
action="store_true",
|
|
help="Start calibration measurement immediately instead of waiting for Enter.",
|
|
)
|
|
parser.add_argument(
|
|
"--calibration-duration",
|
|
type=float,
|
|
default=DEFAULT_CALIBRATION_DURATION_S,
|
|
help=f"Calibration measurement duration in seconds. Default: {DEFAULT_CALIBRATION_DURATION_S:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--update-interval",
|
|
type=float,
|
|
default=DEFAULT_UPDATE_INTERVAL_S,
|
|
help=f"Console update interval in seconds. Default: {DEFAULT_UPDATE_INTERVAL_S:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--way",
|
|
type=PowerConsoleRunner.parse_way,
|
|
action="append",
|
|
help="Run power estimator for way NAME:CHANNEL:IMPEDANCE_FILE[:GAIN_DB], e.g. LF:1:impedance.txt:34.",
|
|
)
|
|
parser.add_argument("--lf-impedance", help="REW impedance export for LF power estimation.")
|
|
parser.add_argument(
|
|
"--lf-channel",
|
|
type=parse_channel,
|
|
default=0,
|
|
help="One-based LF input channel for power estimation. Default: 1.",
|
|
)
|
|
parser.add_argument(
|
|
"--lf-amp-gain-db",
|
|
type=float,
|
|
default=DEFAULT_AMP_GAIN_DB,
|
|
help=f"LF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.",
|
|
)
|
|
parser.add_argument("--hf-impedance", help="REW impedance export for HF power estimation.")
|
|
parser.add_argument(
|
|
"--hf-channel",
|
|
type=parse_channel,
|
|
default=1,
|
|
help="One-based HF input channel for power estimation. Default: 2.",
|
|
)
|
|
parser.add_argument(
|
|
"--hf-amp-gain-db",
|
|
type=float,
|
|
default=DEFAULT_AMP_GAIN_DB,
|
|
help=f"HF amplifier voltage gain in dB. Default: {DEFAULT_AMP_GAIN_DB:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--power-fft-size",
|
|
"--fft-size",
|
|
dest="power_fft_size",
|
|
type=int,
|
|
default=DEFAULT_POWER_FFT_SIZE,
|
|
help=f"Power estimator FFT size. Default: {DEFAULT_POWER_FFT_SIZE}.",
|
|
)
|
|
parser.add_argument(
|
|
"--power-overlap",
|
|
"--overlap",
|
|
dest="power_overlap",
|
|
type=float,
|
|
default=DEFAULT_POWER_OVERLAP,
|
|
help=f"Power estimator overlap fraction. Default: {DEFAULT_POWER_OVERLAP:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--power-smoothing-alpha",
|
|
"--smoothing-alpha",
|
|
dest="power_smoothing_alpha",
|
|
type=float,
|
|
default=DEFAULT_POWER_SMOOTHING_ALPHA,
|
|
help=f"Power estimator exponential smoothing alpha. Default: {DEFAULT_POWER_SMOOTHING_ALPHA:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--power-update-interval",
|
|
type=float,
|
|
default=DEFAULT_POWER_UPDATE_INTERVAL_S,
|
|
help=f"Power estimator console update interval. Default: {DEFAULT_POWER_UPDATE_INTERVAL_S:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--winisd-max-power",
|
|
default=str(WINISD_MAX_POWER_FILE),
|
|
help=f"WinISD frequency-dependent max-power CSV for LF headroom. Default: {WINISD_MAX_POWER_FILE}.",
|
|
)
|
|
parser.add_argument(
|
|
"--disable-winisd-headroom",
|
|
action="store_true",
|
|
help="Disable WinISD max-power headroom monitoring.",
|
|
)
|
|
parser.add_argument(
|
|
"--headroom-smoothing-alpha",
|
|
type=float,
|
|
default=HEADROOM_SMOOTHING_ALPHA,
|
|
help=f"WinISD headroom smoothing alpha. Default: {HEADROOM_SMOOTHING_ALPHA:g}.",
|
|
)
|
|
parser.add_argument(
|
|
"--self-test-winisd",
|
|
action="store_true",
|
|
help="Run synthetic WinISD headroom tests and exit.",
|
|
)
|
|
parser.add_argument(
|
|
"--list-devices",
|
|
action="store_true",
|
|
help="List input devices and exit.",
|
|
)
|
|
parser.add_argument(
|
|
"--gui",
|
|
action="store_true",
|
|
help="Open the GUI with input-side voltage/PSD metrics and output-side power estimates.",
|
|
)
|
|
return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
parser = build_arg_parser()
|
|
args = parser.parse_args(argv)
|
|
if args.input_volts_per_sample_unit is None:
|
|
args.input_volts_per_sample_unit = CalibrationStore().latest_scale()
|
|
|
|
if args.blocksize <= 0:
|
|
parser.error("--blocksize must be positive")
|
|
if args.input_volts_per_sample_unit <= 0:
|
|
parser.error("--input-volts-per-sample-unit must be positive")
|
|
if args.calibrate_known_rms_v is not None and args.calibrate_known_rms_v <= 0:
|
|
parser.error("--calibrate-known-rms-v must be positive")
|
|
if args.calibration_duration <= 0:
|
|
parser.error("--calibration-duration must be positive")
|
|
if args.update_interval <= 0:
|
|
parser.error("--update-interval must be positive")
|
|
if args.power_update_interval <= 0:
|
|
parser.error("--power-update-interval must be positive")
|
|
if args.power_fft_size <= 0:
|
|
parser.error("--power-fft-size must be positive")
|
|
if args.power_fft_size % 2:
|
|
parser.error("--power-fft-size must be even")
|
|
if not 0.0 <= args.power_overlap < 1.0:
|
|
parser.error("--power-overlap must be >= 0 and < 1")
|
|
if not 0.0 <= args.power_smoothing_alpha < 1.0:
|
|
parser.error("--power-smoothing-alpha must be >= 0 and < 1")
|
|
if not 0.0 <= args.headroom_smoothing_alpha < 1.0:
|
|
parser.error("--headroom-smoothing-alpha must be >= 0 and < 1")
|
|
if args.samplerate is not None and args.samplerate <= 0:
|
|
parser.error("--samplerate must be positive")
|
|
|
|
if args.self_test_winisd:
|
|
run_self_test()
|
|
for path in (Path(args.winisd_max_power), WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE):
|
|
if path.exists():
|
|
WinISDLimitCurve.from_file(path, eps_w=HEADROOM_EPS_W)
|
|
if WINISD_EXCURSION_10W_FILE.exists() and WINISD_EXCURSION_100W_FILE.exists():
|
|
print(check_excursion_scaling(WINISD_EXCURSION_10W_FILE, WINISD_EXCURSION_100W_FILE).message)
|
|
print("WinISD self-test OK")
|
|
return 0
|
|
|
|
sd = load_dependencies()
|
|
|
|
if args.list_devices:
|
|
SoundDeviceManager(sd).print_input_devices()
|
|
return 0
|
|
|
|
if args.gui:
|
|
run_gui(args, sd)
|
|
return 0
|
|
|
|
if args.way or args.lf_impedance or args.hf_impedance:
|
|
return PowerConsoleRunner(sd, args).run()
|
|
|
|
config = resolve_config(args, sd)
|
|
|
|
if args.calibrate_known_rms_v is not None:
|
|
learned_calibration, measured_rms_fs, peak_fs = run_input_calibration(
|
|
config,
|
|
sd,
|
|
args.calibrate_known_rms_v,
|
|
args.calibration_duration,
|
|
wait_for_ready=not args.no_calibration_ready_prompt,
|
|
)
|
|
cal_path = CalibrationStore().save(
|
|
volts_per_sample_unit=learned_calibration,
|
|
known_rms_v=args.calibrate_known_rms_v,
|
|
measured_rms_sample_by_channel={
|
|
str(channel + 1): float(measured_rms_fs[idx])
|
|
for idx, channel in enumerate(config.channels_zero_based)
|
|
},
|
|
peak_sample_by_channel={
|
|
str(channel + 1): float(peak_fs[idx])
|
|
for idx, channel in enumerate(config.channels_zero_based)
|
|
},
|
|
device_index=config.device_index,
|
|
channels_zero_based=config.channels_zero_based,
|
|
samplerate=config.samplerate,
|
|
source="cli",
|
|
)
|
|
print(f"Saved calibration: {cal_path}")
|
|
if args.calibrate_only:
|
|
return 0
|
|
config = InputConfig(
|
|
device_index=config.device_index,
|
|
channels_zero_based=config.channels_zero_based,
|
|
samplerate=config.samplerate,
|
|
blocksize=config.blocksize,
|
|
input_volts_per_sample_unit=learned_calibration,
|
|
update_interval_s=config.update_interval_s,
|
|
)
|
|
|
|
try:
|
|
run_capture(config, sd)
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|
|
return 0
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|