171 lines
5.7 KiB
Python
171 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional
|
||
|
||
|
||
@dataclass
class LiveMonitor:
    """Interactive Matplotlib window that plots scan measurements as they arrive.

    On construction a 2x2 figure is created with four panels: phase
    difference, peak-to-peak amplitude of two channels, frequency, and a
    horizontal progress bar. Each call to :meth:`add_point` appends one
    sample to every curve, advances the progress bar and refreshes the
    window.

    Attributes:
        start_mm: Position at which the scan starts; used for the progress
            ratio and the figure subtitle.
        end_mm: Position at which the scan ends.
        csv_path: Optional path shown in the figure subtitle when non-empty.
        title: Window title (set on a best-effort basis).

    Raises:
        RuntimeError: From ``__post_init__`` when ``matplotlib.pyplot``
            cannot be imported.
    """

    start_mm: float
    end_mm: float
    csv_path: str = ""
    title: str = "Motor Scan Live Monitor"

    # Matplotlib module, figure and axes handles; populated in __post_init__.
    _plt: object = field(init=False, repr=False)
    _fig: object = field(init=False, repr=False)
    _ax_phase: object = field(init=False, repr=False)
    _ax_amp: object = field(init=False, repr=False)
    _ax_freq: object = field(init=False, repr=False)
    _ax_prog: object = field(init=False, repr=False)

    # Artists that are updated in place on every new sample.
    _line_phase: object = field(init=False, repr=False)
    _line_amp1: object = field(init=False, repr=False)
    _line_amp2: object = field(init=False, repr=False)
    _line_freq: object = field(init=False, repr=False)
    _prog_bar: object = field(init=False, repr=False)
    _prog_text: object = field(init=False, repr=False)

    # Accumulated samples; all lists grow in lockstep, one entry per point.
    _mm: list[float] = field(default_factory=list, repr=False)
    _dphi_deg: list[float] = field(default_factory=list, repr=False)
    _amp1_pp: list[float] = field(default_factory=list, repr=False)
    _amp2_pp: list[float] = field(default_factory=list, repr=False)
    _f0_hz: list[float] = field(default_factory=list, repr=False)

    def __post_init__(self) -> None:
        """Import Matplotlib lazily and build the figure, axes and artists."""
        # Imported here so that the module can be loaded without Matplotlib;
        # any import-time failure is reported as a RuntimeError.
        try:
            import matplotlib.pyplot as plt  # type: ignore
        except Exception as e:  # pragma: no cover
            raise RuntimeError(
                "Matplotlib 不可用;请先安装 matplotlib,或不要使用 --live"
            ) from e

        self._plt = plt
        plt.ion()

        self._fig = plt.figure(figsize=(12, 7))
        # Setting the window title is best-effort: failures are ignored.
        try:
            self._fig.canvas.manager.set_window_title(self.title)
        except Exception:
            pass

        gs = self._fig.add_gridspec(2, 2)
        self._ax_phase = self._fig.add_subplot(gs[0, 0])
        self._ax_amp = self._fig.add_subplot(gs[0, 1])
        self._ax_freq = self._fig.add_subplot(gs[1, 0])
        self._ax_prog = self._fig.add_subplot(gs[1, 1])

        # Top-left panel: phase difference versus position.
        (self._line_phase,) = self._ax_phase.plot([], [], lw=1.5)
        self._ax_phase.set_title("相位差 dphi")
        self._ax_phase.set_xlabel("位置 (mm)")
        self._ax_phase.set_ylabel("dphi (deg)")
        self._ax_phase.grid(True, alpha=0.3)

        # Top-right panel: peak-to-peak amplitude of both channels.
        (self._line_amp1,) = self._ax_amp.plot([], [], lw=1.5, label="CH1 pp")
        (self._line_amp2,) = self._ax_amp.plot([], [], lw=1.5, label="CH2 pp")
        self._ax_amp.set_title("峰峰值 (ADC counts)")
        self._ax_amp.set_xlabel("位置 (mm)")
        self._ax_amp.set_ylabel("pp")
        self._ax_amp.grid(True, alpha=0.3)
        self._ax_amp.legend(loc="best")

        # Bottom-left panel: frequency versus position.
        (self._line_freq,) = self._ax_freq.plot([], [], lw=1.5)
        self._ax_freq.set_title("频率 f0")
        self._ax_freq.set_xlabel("位置 (mm)")
        self._ax_freq.set_ylabel("Hz")
        self._ax_freq.grid(True, alpha=0.3)

        # Bottom-right panel: a single horizontal bar spanning 0..1.
        self._ax_prog.set_title("扫描进度")
        self._ax_prog.set_xlim(0.0, 1.0)
        self._ax_prog.set_ylim(-0.75, 0.75)
        self._ax_prog.set_yticks([])
        self._ax_prog.set_xlabel("完成比例")
        self._prog_bar = self._ax_prog.barh([0], [0.0], height=0.35)[0]
        self._prog_text = self._ax_prog.text(
            0.01,
            0.0,
            "",
            va="center",
            ha="left",
            transform=self._ax_prog.transData,
        )

        subtitle = f"{self.start_mm:.3f} -> {self.end_mm:.3f} mm"
        if self.csv_path:
            subtitle += f" | CSV: {self.csv_path}"
        self._fig.suptitle(subtitle)

        # Leave the top 5% of the figure free for the suptitle.
        self._fig.tight_layout(rect=(0, 0, 1, 0.95))
        self._flush()

    def close(self) -> None:
        """Close the figure window; any error while closing is ignored."""
        try:
            self._plt.close(self._fig)
        except Exception:
            pass

    def add_point(self, *, mm: float, meas: dict) -> None:
        """Append one sample and refresh every panel.

        Args:
            mm: Position of the sample; converted with ``float``.
            meas: Mapping read via ``.get`` for the keys ``"dphi_deg"``,
                ``"amp1_pp_adc"``, ``"amp2_pp_adc"`` and ``"f0_hz"``. Values
                are converted by ``_to_float``.
        """
        # Convert once so the stored sample and the progress bar always use
        # the same numeric value.
        position = float(mm)

        self._mm.append(position)
        self._dphi_deg.append(_to_float(meas.get("dphi_deg")))
        self._amp1_pp.append(_to_float(meas.get("amp1_pp_adc")))
        self._amp2_pp.append(_to_float(meas.get("amp2_pp_adc")))
        self._f0_hz.append(_to_float(meas.get("f0_hz")))

        self._line_phase.set_data(self._mm, self._dphi_deg)
        self._line_amp1.set_data(self._mm, self._amp1_pp)
        self._line_amp2.set_data(self._mm, self._amp2_pp)
        self._line_freq.set_data(self._mm, self._f0_hz)

        for ax in (self._ax_phase, self._ax_amp, self._ax_freq):
            self._autoscale(ax)
        self._update_progress(cur_mm=position)

        self._fig.canvas.draw_idle()
        self._flush()

    def _autoscale(self, ax: object) -> None:
        """Recompute data limits of *ax* and rescale its view (best-effort)."""
        try:
            ax.relim()
            ax.autoscale_view()
        except Exception:
            pass

    def _update_progress(self, *, cur_mm: float) -> None:
        """Resize the progress bar and reposition its label for *cur_mm*.

        The completed fraction is ``(cur_mm - start_mm) / (end_mm - start_mm)``
        clamped to ``[0, 1]``; a zero-length scan counts as complete. A
        non-finite position or fraction leaves the display unchanged.
        """
        import math  # function-scope import keeps this block self-contained

        if not math.isfinite(cur_mm):
            return

        span = self.end_mm - self.start_mm
        ratio = 1.0 if span == 0 else (cur_mm - self.start_mm) / span
        if not math.isfinite(ratio):
            return
        ratio = min(1.0, max(0.0, ratio))

        # Artist updates are best-effort so a display hiccup never aborts
        # the caller; the two artists are updated independently.
        try:
            self._prog_bar.set_width(ratio)
        except Exception:
            pass

        txt = f"{ratio * 100.0:6.2f}% | {cur_mm:.3f} mm"
        try:
            self._prog_text.set_text(txt)
            if ratio > 0.5:
                # Past the midpoint a left-aligned label would run off the
                # right edge of the axes, so draw it inside the bar instead.
                self._prog_text.set_horizontalalignment("right")
                self._prog_text.set_x(ratio - 0.01)
            else:
                self._prog_text.set_horizontalalignment("left")
                self._prog_text.set_x(ratio + 0.01)
        except Exception:
            pass

    def _flush(self) -> None:
        """Process pending GUI events so the window repaints (best-effort)."""
        try:
            self._fig.canvas.flush_events()
        except Exception:
            pass
        try:
            # pause is required to keep GUI responsive
            self._plt.pause(0.001)
        except Exception:
            pass
|
||
|
||
|
||
def _to_float(x: Optional[object]) -> float:
|
||
try:
|
||
return float(x) # type: ignore[arg-type]
|
||
except Exception:
|
||
return float("nan")
|