motor-scan-server/old/visualize_scan_csv.py
2026-01-28 14:40:48 +08:00

229 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import glob
import math
import os
from dataclasses import dataclass
import numpy as np
def _should_use_agg(save_path: str | None) -> bool:
if save_path:
return True
# Headless environment
return not (os.getenv("DISPLAY") or os.getenv("WAYLAND_DISPLAY"))
def _parse_float(x: object) -> float:
try:
if x is None:
return float("nan")
s = str(x).strip()
if not s:
return float("nan")
return float(s)
except Exception:
return float("nan")
@dataclass
class ScanSeries:
label: str
mm: np.ndarray
f0_hz: np.ndarray
dphi_rad: np.ndarray
dphi_deg: np.ndarray
amp1_pp_adc: np.ndarray
amp2_pp_adc: np.ndarray
def load_scan_csv(path: str, *, label: str | None = None) -> ScanSeries:
mm: list[float] = []
f0_hz: list[float] = []
dphi_rad: list[float] = []
dphi_deg: list[float] = []
amp1: list[float] = []
amp2: list[float] = []
with open(path, "r", encoding="utf-8", newline="") as f:
r = csv.DictReader(f)
for row in r:
mm.append(_parse_float(row.get("mm")))
f0_hz.append(_parse_float(row.get("f0_hz")))
dphi_rad.append(_parse_float(row.get("dphi_rad")))
dphi_deg.append(_parse_float(row.get("dphi_deg")))
amp1.append(_parse_float(row.get("amp1_pp_adc")))
amp2.append(_parse_float(row.get("amp2_pp_adc")))
arr_mm = np.asarray(mm, dtype=float)
order = np.argsort(arr_mm)
series_label = label or os.path.basename(path)
return ScanSeries(
label=series_label,
mm=arr_mm[order],
f0_hz=np.asarray(f0_hz, dtype=float)[order],
dphi_rad=np.asarray(dphi_rad, dtype=float)[order],
dphi_deg=np.asarray(dphi_deg, dtype=float)[order],
amp1_pp_adc=np.asarray(amp1, dtype=float)[order],
amp2_pp_adc=np.asarray(amp2, dtype=float)[order],
)
def nan_moving_average(y: np.ndarray, window: int) -> np.ndarray:
if window <= 1:
return y
y = np.asarray(y, dtype=float)
n = y.size
out = np.full(n, np.nan, dtype=float)
half = window // 2
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
out[i] = np.nanmean(y[lo:hi])
return out
def _mask_range(x: np.ndarray, *, x_min: float | None, x_max: float | None) -> np.ndarray:
m = np.isfinite(x)
if x_min is not None:
m &= x >= x_min
if x_max is not None:
m &= x <= x_max
return m
def _get_pingfang_font_properties(ttf_path: str):
"""Return (FontProperties, name) for PingFang SC from a TTF file."""
from matplotlib import font_manager
if not os.path.exists(ttf_path):
raise FileNotFoundError(
f"未找到字体文件:{ttf_path}(请确认 PingFangSC-Regular.ttf 已放在项目目录,或已安装到 ~/.local/share/fonts"
)
# Ensure Matplotlib can see this font even if fontconfig cache is stale.
try:
font_manager.fontManager.addfont(ttf_path)
except Exception:
pass
fp = font_manager.FontProperties(fname=ttf_path)
return fp, fp.get_name()
def main() -> None:
ap = argparse.ArgumentParser(description="可视化 motor_rigol_scan_csv.py 输出的扫描 CSV")
ap.add_argument(
"csv",
nargs="+",
help="CSV 路径(支持通配符),例如 scan_*.csv",
)
ap.add_argument("--unwrap", action="store_true", help="对相位差 dphi 做 unwrap基于 dphi_rad")
ap.add_argument("--smooth", type=int, default=1, help="平滑窗口(点数)默认1=不平滑")
ap.add_argument("--mm-min", type=float, default=None, help="仅显示 mm >= mm-min")
ap.add_argument("--mm-max", type=float, default=None, help="仅显示 mm <= mm-max")
ap.add_argument("--save", default="", help="保存为图片(PNG/PDF等),例如 out.png为空则弹窗显示")
ap.add_argument("--title", default="Motor Scan", help="图标题")
args = ap.parse_args()
paths: list[str] = []
for p in args.csv:
expanded = sorted(glob.glob(p))
if expanded:
paths.extend(expanded)
else:
paths.append(p)
if not paths:
raise SystemExit("未找到任何 CSV")
save_path = args.save.strip() or None
if _should_use_agg(save_path):
import matplotlib
matplotlib.use("Agg")
import matplotlib
pingfang_ttf = os.path.join(os.path.dirname(__file__), "PingFangSC-Regular.ttf")
cjk_fp, cjk_name = _get_pingfang_font_properties(pingfang_ttf)
matplotlib.rcParams["font.family"] = cjk_name
matplotlib.rcParams["axes.unicode_minus"] = False
print(f"[+] Font: {cjk_name} ({pingfang_ttf})")
import matplotlib.pyplot as plt # type: ignore
series_list: list[ScanSeries] = []
for path in paths:
if not os.path.exists(path):
raise SystemExit(f"CSV 不存在:{path}")
series_list.append(load_scan_csv(path))
fig = plt.figure(figsize=(12, 5.5))
gs = fig.add_gridspec(1, 2)
ax_phase = fig.add_subplot(gs[0, 0])
ax_amp = fig.add_subplot(gs[0, 1])
ax_phase.set_title("相位差", fontproperties=cjk_fp)
ax_phase.set_xlabel("位置 (mm)", fontproperties=cjk_fp)
ax_phase.set_ylabel("相位差 (度)", fontproperties=cjk_fp)
ax_phase.grid(True, alpha=0.3)
ax_amp.set_title("峰峰值", fontproperties=cjk_fp)
ax_amp.set_xlabel("位置 (mm)", fontproperties=cjk_fp)
ax_amp.set_ylabel("峰峰值 (ADC)", fontproperties=cjk_fp)
ax_amp.grid(True, alpha=0.3)
for s in series_list:
mm_mask = _mask_range(s.mm, x_min=args.mm_min, x_max=args.mm_max)
mm = s.mm[mm_mask]
amp1 = s.amp1_pp_adc[mm_mask]
amp2 = s.amp2_pp_adc[mm_mask]
# phase (deg)
if args.unwrap:
dphi_r = s.dphi_rad[mm_mask]
dphi_r_unwrapped = np.unwrap(dphi_r)
dphi_deg = dphi_r_unwrapped * 180.0 / math.pi
else:
dphi_deg = s.dphi_deg[mm_mask]
# scatter raw
ax_phase.plot(mm, dphi_deg, ".", ms=3, alpha=0.6, label=s.label)
ax_amp.plot(mm, amp1, ".", ms=3, alpha=0.35, label=f"{s.label} CH1")
ax_amp.plot(mm, amp2, ".", ms=3, alpha=0.35, label=f"{s.label} CH2")
# smoothed overlay
if args.smooth and args.smooth > 1:
ax_phase.plot(mm, nan_moving_average(dphi_deg, args.smooth), "-", lw=1.2, alpha=0.9)
ax_amp.plot(mm, nan_moving_average(amp1, args.smooth), "-", lw=1.2, alpha=0.9)
ax_amp.plot(mm, nan_moving_average(amp2, args.smooth), "-", lw=1.2, alpha=0.9)
# legends (avoid huge)
ax_phase.legend(loc="best", fontsize=8)
ax_amp.legend(loc="best", fontsize=7, ncols=2)
subtitle = " | ".join([s.label for s in series_list])
# 中文标题使用 CJK 字体subtitle 一般是文件名/ASCII用默认字体。
fig.suptitle(f"{args.title}", fontproperties=cjk_fp)
fig.text(0.5, 0.965, subtitle, ha="center", va="top", fontsize=9)
fig.tight_layout(rect=(0, 0, 1, 0.95))
if save_path:
fig.savefig(save_path, dpi=160)
print(f"[+] Saved: {save_path}")
else:
plt.show()
if __name__ == "__main__":
main()