motor-scan-server/old/motor_rigol_scan_csv.py
2026-01-28 14:40:48 +08:00

258 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import csv
import os
import time
from datetime import datetime
from gpiozero import OutputDevice
from rigol_phase import DEV as RIGOL_DEV, measure_phase
# "Steps" in this file are counted in full steps, ignoring microstepping.
# When microstepping is used, full steps are converted to a pulse count
# automatically (multiplied by the microstep factor when pulses are emitted).
BASE_STEPS_PER_MM = 200 # 200 full-steps = 1mm
# Bounds, in millimetres, used to validate the --mm and --end-mm arguments.
MM_MIN = 0
MM_MAX = 300
class GPIOStepper:
    """Drive a STEP/DIR stepper driver through two GPIO output pins.

    Distances are given in full steps; every full step is emitted as
    ``microsteps`` pulses on the STEP pin.
    """

    def __init__(
        self,
        *,
        dir_pin: int,
        step_pin: int,
        microsteps: int = 8,
    ) -> None:
        """Claim the DIR and STEP pins and drive STEP low.

        Args:
            dir_pin: GPIO number of the direction pin.
            step_pin: GPIO number of the step (pulse) pin.
            microsteps: Pulses emitted per full step; must be positive.

        Raises:
            ValueError: If ``microsteps`` is not positive.
        """
        if microsteps <= 0:
            raise ValueError("microsteps must be a positive integer")
        self.microsteps = int(microsteps)
        # gpiozero uses BCM pin numbering by default.
        self._dir = OutputDevice(dir_pin)
        try:
            self._step = OutputDevice(step_pin)
        except Exception:
            # Do not keep DIR claimed when STEP cannot be acquired.
            self._dir.close()
            raise
        self._step.off()

    def close(self) -> None:
        """Drive both pins low and release them.

        Cleanup is best-effort: errors from either pin are ignored so that
        the other pin is still handled and repeated calls never raise.
        """
        for device in (self._step, self._dir):
            try:
                device.off()
            except Exception:
                pass
            try:
                # Release the pin itself, not just its output level.
                device.close()
            except Exception:
                pass

    def ping(self) -> str:
        """Return the fixed backend identifier ``"GPIO"``."""
        return "GPIO"

    def set_dir(self, inc: bool) -> None:
        """Set the DIR pin: 1 when ``inc`` is truthy, 0 otherwise."""
        self._dir.value = 1 if inc else 0

    def move_steps(self, full_steps: int, freq_hz: int) -> None:
        """Emit the pulses for ``full_steps`` full steps, blocking until done.

        Args:
            full_steps: Number of full steps to move; zero is a no-op.
            freq_hz: Pulse frequency in Hz; each pulse is high for half a
                period and low for half a period.

        Raises:
            ValueError: If ``full_steps`` is negative or ``freq_hz`` is not
                positive.
        """
        if full_steps < 0:
            raise ValueError("steps must be >= 0")
        if freq_hz <= 0:
            raise ValueError("freq_hz must be > 0")
        pulses = int(full_steps) * self.microsteps
        if pulses <= 0:
            return
        half_period = 0.5 / float(freq_hz)
        step = self._step
        for _ in range(pulses):
            step.on()
            time.sleep(half_period)
            step.off()
            time.sleep(half_period)
def _default_csv_path() -> str:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return os.path.join(os.getcwd(), f"scan_{ts}.csv")
def _ensure_csv_header(path: str) -> None:
exists = os.path.exists(path) and os.path.getsize(path) > 0
if exists:
return
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(
[
"ts",
"mm",
"f0_hz",
"phi1_rad",
"phi2_rad",
"dphi_rad",
"dphi_deg",
"amp1_pp_adc",
"amp2_pp_adc",
"rigol_idn",
"points_mode",
"n",
]
)
def _append_row(path: str, *, mm: float, r: dict) -> None:
with open(path, "a", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(
[
r.get("ts"),
mm,
r.get("f0_hz"),
r.get("phi1_rad"),
r.get("phi2_rad"),
r.get("dphi_rad"),
r.get("dphi_deg"),
r.get("amp1_pp_adc"),
r.get("amp2_pp_adc"),
r.get("idn"),
r.get("points_mode"),
r.get("n"),
]
)
f.flush()
def main() -> None:
    """Run a stepped scan: move the motor, measure with the scope, log to CSV.

    Flow:
      1. Parse and validate the command-line arguments; invalid values end
         the program via ``SystemExit`` before any file or GPIO pin is used.
      2. Create the CSV header if needed and claim the motor pins.
      3. Measure once at the start position (with ``fetch_idn=True``), then
         repeatedly move ``--step`` full steps toward ``--end-mm`` (the last
         move is shortened to land exactly on the end position), wait
         ``--settle-s`` seconds, measure, and append a CSV row.

    Positions are tracked internally as integer full steps to avoid
    accumulating floating-point error. Ctrl-C stops the scan; the motor and
    the optional live monitor are always closed on exit.
    """
    ap = argparse.ArgumentParser(description="电机每1mm移动一次并读取Rigol示波器数据写入CSV")
    ap.add_argument("--mm", type=float, required=True, help="启动时装置当前位置(mm)范围0-300")
    ap.add_argument("--end-mm", type=float, default=300.0, help="扫描终点(mm)默认300")
    ap.add_argument(
        "--step",
        type=int,
        default=40,
        help="每步移动(全步 steps)默认40, 200 step = 1mm会按细分自动换算为脉冲",
    )
    ap.add_argument("--microsteps", type=int, default=8, help="驱动器细分倍数默认8")
    ap.add_argument("--dir-pin", type=int, default=22, help="DIR 引脚(BCM编号)默认22")
    ap.add_argument("--step-pin", type=int, default=27, help="STEP 引脚(BCM编号)默认27")
    # The help text previously advertised 500 while the default is 1200.
    ap.add_argument("--move-freq", type=int, default=1200, help="MOVE命令使用的频率Hz默认1200")
    ap.add_argument("--settle-s", type=float, default=0.05, help="每次移动后等待再测量(秒)默认0.05")
    ap.add_argument("--rigol-dev", default=RIGOL_DEV, help="Rigol设备路径(默认/dev/usbtmc0)")
    ap.add_argument("--rigol-timeout", type=float, default=3.0)
    ap.add_argument("--rigol-mode", default="NORM", choices=["NORM", "RAW", "MAX"], help="WAV:POIN:MODE")
    ap.add_argument(
        "--live",
        action="store_true",
        help="开启 Matplotlib 实时图形监视器(相位差/峰峰值/频率 + 进度条)",
    )
    ap.add_argument("--csv", default="", help="输出CSV路径默认当前目录scan_时间戳.csv")
    args = ap.parse_args()

    # Reject bad arguments up front, before the CSV is created or pins are
    # claimed, instead of failing halfway through the first move.
    if args.step <= 0:
        raise SystemExit("--step 必须是正整数")
    if args.microsteps <= 0:
        raise SystemExit("--microsteps 必须是正整数")
    if args.move_freq <= 0:
        raise SystemExit("--move-freq 必须是正整数")
    # Written as "not >=" so that NaN is rejected as well.
    if not (args.settle_s >= 0):
        raise SystemExit("--settle-s 不能为负数")
    if not (MM_MIN <= args.mm <= MM_MAX):
        raise SystemExit("--mm 超出范围 0-300")
    if not (MM_MIN <= args.end_mm <= MM_MAX):
        raise SystemExit("--end-mm 超出范围 0-300")

    # Use integer step counts as the internal position unit to avoid
    # accumulated floating-point error. These are full steps; the pulse
    # count sent to the STEP pin is multiplied by the microstep factor.
    start_steps = int(round(args.mm * BASE_STEPS_PER_MM))
    end_steps = int(round(args.end_mm * BASE_STEPS_PER_MM))
    step_steps = int(args.step)
    start_mm = start_steps / BASE_STEPS_PER_MM
    end_mm = end_steps / BASE_STEPS_PER_MM

    csv_path = args.csv or _default_csv_path()
    _ensure_csv_header(csv_path)

    motor = GPIOStepper(
        dir_pin=args.dir_pin,
        step_pin=args.step_pin,
        microsteps=args.microsteps,
    )

    live = None
    if args.live:
        # The live monitor is optional: any failure to set it up only
        # disables the display, the scan itself still runs.
        try:
            from live_monitor import LiveMonitor

            live = LiveMonitor(start_mm=start_mm, end_mm=end_mm, csv_path=csv_path)
        except Exception as e:
            print(f"[!] Live monitor disabled: {e}")
            live = None

    def live_add(mm: float, meas: dict) -> None:
        """Forward one point to the live monitor; disable it on any error."""
        nonlocal live
        if live is None:
            return
        try:
            live.add_point(mm=mm, meas=meas)
        except Exception as e:
            print(f"[!] Live monitor error (disabled): {e}")
            try:
                live.close()
            except Exception:
                pass
            live = None

    try:
        r = motor.ping()
        print(
            f"[+] Motor GPIO: DIR={args.dir_pin}, STEP={args.step_pin}, microsteps={args.microsteps} (PING -> {r})"
        )

        # Measure once at the start position, including the one-time IDN query.
        print(f"[+] Measure @ {start_mm:.3f}mm")
        meas = measure_phase(dev=args.rigol_dev, timeout=args.rigol_timeout, points_mode=args.rigol_mode, fetch_idn=True)
        _append_row(csv_path, mm=start_mm, r=meas)
        live_add(start_mm, meas)
        print(f"[+] CSV: {csv_path}")

        cur_steps = start_steps
        if cur_steps == end_steps:
            print("[+] 已在终点,无需移动")
            return

        inc = end_steps > cur_steps
        direction_str = "+" if inc else "-"
        step_mm = step_steps / BASE_STEPS_PER_MM
        print(f"[+] Scan: {start_mm:.3f} -> {end_mm:.3f} (step {direction_str}{step_steps} steps = {step_mm:.3f}mm)")

        # The direction is fixed for the whole scan, so set it once.
        motor.set_dir(inc=inc)
        while cur_steps != end_steps:
            # Shorten the final move so the scan lands exactly on the end.
            move = min(step_steps, abs(end_steps - cur_steps))
            motor.move_steps(move, args.move_freq)
            time.sleep(args.settle_s)
            cur_steps += move if inc else -move

            cur_mm = cur_steps / BASE_STEPS_PER_MM
            print(f"[+] Measure @ {cur_mm:.3f}mm")
            meas = measure_phase(dev=args.rigol_dev, timeout=args.rigol_timeout, points_mode=args.rigol_mode, fetch_idn=False)
            _append_row(csv_path, mm=cur_mm, r=meas)
            live_add(cur_mm, meas)
    except KeyboardInterrupt:
        print("\n[!] Interrupted")
    finally:
        motor.close()
        if live is not None:
            try:
                live.close()
            except Exception:
                pass
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()