#!/usr/bin/env python3
"""Plot phase difference and peak-to-peak amplitude from motor-scan CSV files.

Each input CSV is expected to provide the columns ``mm``, ``f0_hz``,
``dphi_rad``, ``dphi_deg``, ``amp1_pp_adc`` and ``amp2_pp_adc``.  Missing or
unparsable cells are loaded as NaN.  The figure has two panels: phase
difference (degrees) versus position, and the two amplitude channels versus
position.
"""
from __future__ import annotations

import argparse
import csv
import glob
import math
import os
import sys
import warnings
from dataclasses import dataclass

import numpy as np

# File name of the CJK font used for the Chinese axis labels and titles.
_PINGFANG_TTF_NAME = "PingFangSC-Regular.ttf"

# CSV columns read by load_scan_csv(); the names match the ScanSeries fields.
_SCAN_COLUMNS = (
    "mm",
    "f0_hz",
    "dphi_rad",
    "dphi_deg",
    "amp1_pp_adc",
    "amp2_pp_adc",
)


def _should_use_agg(save_path: str | None) -> bool:
    """Return True when the non-interactive Agg backend should be selected.

    Agg is used whenever the figure is saved to a file.  Otherwise it is used
    only when no X11/Wayland display is advertised.  That environment check is
    skipped on macOS and Windows, whose GUI backends do not rely on
    ``DISPLAY`` / ``WAYLAND_DISPLAY``; forcing Agg there would make
    ``plt.show()`` open no window.
    """
    if save_path:
        return True
    if sys.platform in ("darwin", "win32"):
        return False
    # Headless environment
    return not (os.getenv("DISPLAY") or os.getenv("WAYLAND_DISPLAY"))


def _parse_float(x: object) -> float:
    """Convert a CSV cell to float; None, blank or unparsable cells give NaN."""
    if x is None:
        return float("nan")
    try:
        text = str(x).strip()
        return float(text) if text else float("nan")
    except (TypeError, ValueError):
        return float("nan")


@dataclass
class ScanSeries:
    """Columns of one scan CSV, all sorted by ascending ``mm``."""

    label: str  # legend label (defaults to the CSV file name)
    mm: np.ndarray  # values of the "mm" column
    f0_hz: np.ndarray  # values of the "f0_hz" column
    dphi_rad: np.ndarray  # values of the "dphi_rad" column
    dphi_deg: np.ndarray  # values of the "dphi_deg" column
    amp1_pp_adc: np.ndarray  # values of the "amp1_pp_adc" column
    amp2_pp_adc: np.ndarray  # values of the "amp2_pp_adc" column


def load_scan_csv(path: str, *, label: str | None = None) -> ScanSeries:
    """Load one scan CSV into a ScanSeries sorted by position.

    Args:
        path: CSV file to read.
        label: Series label; defaults to the base name of ``path``.

    Returns:
        A ScanSeries whose arrays are ordered by ascending ``mm``.  Rows
        whose ``mm`` is NaN end up last; rows with equal ``mm`` keep their
        file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    columns: dict[str, list[float]] = {name: [] for name in _SCAN_COLUMNS}

    # "utf-8-sig" strips a leading BOM (as written by some spreadsheet
    # exports); without that the first header would read "\ufeffmm" and the
    # column would silently load as all-NaN.  Plain UTF-8 files are unaffected.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        for row in csv.DictReader(f):
            for name, values in columns.items():
                values.append(_parse_float(row.get(name)))

    arrays = {
        name: np.asarray(values, dtype=float)
        for name, values in columns.items()
    }
    # Stable sort so duplicate positions keep acquisition order.
    order = np.argsort(arrays["mm"], kind="stable")
    return ScanSeries(
        label=label or os.path.basename(path),
        **{name: arr[order] for name, arr in arrays.items()},
    )


def nan_moving_average(y: np.ndarray, window: int) -> np.ndarray:
    """Centered moving average that ignores NaN samples.

    The window around index ``i`` spans ``i - window // 2`` to
    ``i + window // 2`` inclusive (clipped at the array ends), so an even
    ``window`` effectively covers ``window + 1`` samples.

    Args:
        y: 1-D sequence of samples.
        window: Window size in samples; values <= 1 disable smoothing.

    Returns:
        A float array of the same length as ``y``.  Positions whose whole
        window is NaN are NaN.
    """
    y = np.asarray(y, dtype=float)
    if window <= 1:
        return y

    n = y.size
    out = np.full(n, np.nan, dtype=float)
    half = window // 2
    with warnings.catch_warnings():
        # An all-NaN window legitimately yields NaN; silence NumPy's
        # "Mean of empty slice" RuntimeWarning for that case.
        warnings.simplefilter("ignore", category=RuntimeWarning)
        for i in range(n):
            lo = max(0, i - half)
            hi = min(n, i + half + 1)
            out[i] = np.nanmean(y[lo:hi])
    return out


def nan_unwrap(phase_rad: np.ndarray) -> np.ndarray:
    """Unwrap a 1-D phase array (radians) while skipping non-finite samples.

    ``np.unwrap`` accumulates corrections with a cumulative sum, so a single
    NaN would turn every later sample into NaN.  Here only the finite samples
    are unwrapped (as one contiguous sequence) and non-finite positions are
    returned as NaN.
    """
    phase = np.asarray(phase_rad, dtype=float)
    out = np.full(phase.shape, np.nan, dtype=float)
    finite = np.isfinite(phase)
    if finite.any():
        out[finite] = np.unwrap(phase[finite])
    return out


def _mask_range(
    x: np.ndarray, *, x_min: float | None, x_max: float | None
) -> np.ndarray:
    """Boolean mask of finite ``x`` values within [x_min, x_max].

    A bound given as None is not applied.
    """
    m = np.isfinite(x)
    if x_min is not None:
        m &= x >= x_min
    if x_max is not None:
        m &= x <= x_max
    return m


def _find_pingfang_ttf() -> str:
    """Locate the PingFang SC TTF file.

    Looks next to this script first, then in ``~/.local/share/fonts`` (the
    two places named by the "font not found" error message).  When neither
    exists the script-directory path is returned so the caller reports that
    location.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [
        os.path.join(script_dir, _PINGFANG_TTF_NAME),
        os.path.join(
            os.path.expanduser("~"),
            ".local",
            "share",
            "fonts",
            _PINGFANG_TTF_NAME,
        ),
    ]
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return candidates[0]


def _get_pingfang_font_properties(ttf_path: str):
    """Return (FontProperties, name) for PingFang SC from a TTF file.

    Raises:
        FileNotFoundError: If ``ttf_path`` does not exist.
    """
    from matplotlib import font_manager

    if not os.path.exists(ttf_path):
        raise FileNotFoundError(
            f"未找到字体文件:{ttf_path}(请确认 PingFangSC-Regular.ttf 已放在项目目录,或已安装到 ~/.local/share/fonts)"
        )

    # Ensure Matplotlib can see this font even if fontconfig cache is stale.
    # Registration is best-effort: FontProperties(fname=...) below still
    # points at the file directly if it fails.
    try:
        font_manager.fontManager.addfont(ttf_path)
    except Exception:
        pass

    fp = font_manager.FontProperties(fname=ttf_path)
    return fp, fp.get_name()


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the plotting script."""
    ap = argparse.ArgumentParser(description="可视化 motor_rigol_scan_csv.py 输出的扫描 CSV")
    ap.add_argument(
        "csv",
        nargs="+",
        help="CSV 路径(支持通配符),例如 scan_*.csv",
    )
    ap.add_argument("--unwrap", action="store_true", help="对相位差 dphi 做 unwrap(基于 dphi_rad)")
    ap.add_argument("--smooth", type=int, default=1, help="平滑窗口(点数),默认1=不平滑")
    ap.add_argument("--mm-min", type=float, default=None, help="仅显示 mm >= mm-min")
    ap.add_argument("--mm-max", type=float, default=None, help="仅显示 mm <= mm-max")
    ap.add_argument("--save", default="", help="保存为图片(PNG/PDF等),例如 out.png;为空则弹窗显示")
    ap.add_argument("--title", default="Motor Scan", help="图标题")
    return ap


def _expand_csv_args(patterns: list[str]) -> list[str]:
    """Expand glob patterns; a pattern with no match is kept verbatim.

    Keeping unmatched patterns lets the caller report them as missing files.
    """
    paths: list[str] = []
    for pattern in patterns:
        expanded = sorted(glob.glob(pattern))
        if expanded:
            paths.extend(expanded)
        else:
            paths.append(pattern)
    return paths


def main() -> None:
    """Parse arguments, load the CSV files and show or save the figure."""
    args = _build_arg_parser().parse_args()

    paths = _expand_csv_args(args.csv)
    if not paths:
        raise SystemExit("未找到任何 CSV")

    save_path = args.save.strip() or None

    import matplotlib

    # The backend must be chosen before pyplot is imported.
    if _should_use_agg(save_path):
        matplotlib.use("Agg")

    pingfang_ttf = _find_pingfang_ttf()
    cjk_fp, cjk_name = _get_pingfang_font_properties(pingfang_ttf)
    matplotlib.rcParams["font.family"] = cjk_name
    matplotlib.rcParams["axes.unicode_minus"] = False
    print(f"[+] Font: {cjk_name} ({pingfang_ttf})")

    import matplotlib.pyplot as plt  # type: ignore

    series_list: list[ScanSeries] = []
    for path in paths:
        if not os.path.exists(path):
            raise SystemExit(f"CSV 不存在:{path}")
        series_list.append(load_scan_csv(path))

    fig = plt.figure(figsize=(12, 5.5))
    gs = fig.add_gridspec(1, 2)
    ax_phase = fig.add_subplot(gs[0, 0])
    ax_amp = fig.add_subplot(gs[0, 1])

    ax_phase.set_title("相位差", fontproperties=cjk_fp)
    ax_phase.set_xlabel("位置 (mm)", fontproperties=cjk_fp)
    ax_phase.set_ylabel("相位差 (度)", fontproperties=cjk_fp)
    ax_phase.grid(True, alpha=0.3)

    ax_amp.set_title("峰峰值", fontproperties=cjk_fp)
    ax_amp.set_xlabel("位置 (mm)", fontproperties=cjk_fp)
    ax_amp.set_ylabel("峰峰值 (ADC)", fontproperties=cjk_fp)
    ax_amp.grid(True, alpha=0.3)

    for s in series_list:
        mm_mask = _mask_range(s.mm, x_min=args.mm_min, x_max=args.mm_max)
        mm = s.mm[mm_mask]
        amp1 = s.amp1_pp_adc[mm_mask]
        amp2 = s.amp2_pp_adc[mm_mask]

        # phase (deg)
        if args.unwrap:
            # NaN-tolerant unwrap: one bad sample must not blank the rest.
            dphi_deg = nan_unwrap(s.dphi_rad[mm_mask]) * 180.0 / math.pi
        else:
            dphi_deg = s.dphi_deg[mm_mask]

        # scatter raw
        (raw_phase,) = ax_phase.plot(
            mm, dphi_deg, ".", ms=3, alpha=0.6, label=s.label
        )
        (raw_amp1,) = ax_amp.plot(
            mm, amp1, ".", ms=3, alpha=0.35, label=f"{s.label} CH1"
        )
        (raw_amp2,) = ax_amp.plot(
            mm, amp2, ".", ms=3, alpha=0.35, label=f"{s.label} CH2"
        )

        # smoothed overlay, drawn in the colour of the matching raw scatter
        # so each curve can be attributed to its series in the legend
        if args.smooth > 1:
            overlays = (
                (ax_phase, dphi_deg, raw_phase),
                (ax_amp, amp1, raw_amp1),
                (ax_amp, amp2, raw_amp2),
            )
            for ax, values, raw_line in overlays:
                ax.plot(
                    mm,
                    nan_moving_average(values, args.smooth),
                    "-",
                    lw=1.2,
                    alpha=0.9,
                    color=raw_line.get_color(),
                )

    # legends (avoid huge)
    ax_phase.legend(loc="best", fontsize=8)
    ax_amp.legend(loc="best", fontsize=7, ncols=2)

    subtitle = " | ".join([s.label for s in series_list])
    # The title uses the CJK font; the subtitle is normally file names/ASCII
    # and uses the default font.  The title, the subtitle and the axes area
    # are placed in separate vertical bands so they do not overlap.
    fig.suptitle(f"{args.title}", fontproperties=cjk_fp, y=0.995)
    fig.text(0.5, 0.955, subtitle, ha="center", va="top", fontsize=9)

    fig.tight_layout(rect=(0, 0, 1, 0.93))

    if save_path:
        fig.savefig(save_path, dpi=160)
        print(f"[+] Saved: {save_path}")
    else:
        plt.show()


if __name__ == "__main__":
    main()