#!/usr/bin/env python3
"""Scan a stepper-driven stage and log Rigol phase measurements to CSV.

The stage is moved in fixed increments through a STEP/DIR driver attached to
two GPIO pins. After every move, ``measure_phase`` is called and its result is
appended as one row of the output CSV file.
"""
import argparse
import csv
import os
import time
from datetime import datetime

from gpiozero import OutputDevice

from rigol_phase import DEV as RIGOL_DEV, measure_phase

# A "step" here means a full step (microstepping not considered). Full steps
# are converted to driver pulses automatically when microstepping is used.
BASE_STEPS_PER_MM = 200  # 200 full-steps = 1mm
MM_MIN = 0
MM_MAX = 300

# (CSV column name, key in the measurement dict). A key of None marks the
# position column, which is filled from the motor position rather than from
# the measurement. Header and rows are both generated from this table so the
# two can never get out of sync.
_CSV_COLUMNS = (
    ("ts", "ts"),
    ("mm", None),
    ("f0_hz", "f0_hz"),
    ("phi1_rad", "phi1_rad"),
    ("phi2_rad", "phi2_rad"),
    ("dphi_rad", "dphi_rad"),
    ("dphi_deg", "dphi_deg"),
    ("amp1_pp_adc", "amp1_pp_adc"),
    ("amp2_pp_adc", "amp2_pp_adc"),
    ("rigol_idn", "idn"),
    ("points_mode", "points_mode"),
    ("n", "n"),
)


class GPIOStepper:
    """STEP/DIR stepper driver interface using two gpiozero output pins."""

    def __init__(
        self,
        *,
        dir_pin: int,
        step_pin: int,
        microsteps: int = 8,
    ) -> None:
        """Claim the DIR and STEP pins and drive STEP low.

        Args:
            dir_pin: GPIO number of the direction pin.
            step_pin: GPIO number of the step (pulse) pin.
            microsteps: Pulses emitted per full step.

        Raises:
            ValueError: If ``microsteps`` is not positive.
        """
        if microsteps <= 0:
            raise ValueError("microsteps must be a positive integer")
        self.microsteps = int(microsteps)

        # gpiozero uses BCM pin numbering by default.
        self._dir = OutputDevice(dir_pin)
        try:
            self._step = OutputDevice(step_pin)
        except Exception:
            # Do not leave the DIR pin claimed if the STEP pin is unavailable.
            self._dir.close()
            raise
        self._step.off()

    def close(self) -> None:
        """Drive both pins low and release them (best effort, never raises)."""
        for dev in (self._step, self._dir):
            # Each action is attempted independently so that a failure to
            # switch a pin off does not prevent it from being released.
            for action in (dev.off, dev.close):
                try:
                    action()
                except Exception:
                    pass

    def ping(self) -> str:
        """Return the fixed identifier string of this backend."""
        return "GPIO"

    def set_dir(self, inc: bool) -> None:
        """Set the DIR pin high when ``inc`` is true, low otherwise."""
        self._dir.value = 1 if inc else 0

    def move_steps(self, full_steps: int, freq_hz: int) -> None:
        """Emit the pulses for ``full_steps`` full steps at ``freq_hz``.

        The pulse count is ``full_steps * microsteps``. Each pulse is a
        software-timed square wave with a half period of ``0.5 / freq_hz``
        seconds, so the call blocks until the move is complete.

        Raises:
            ValueError: If ``full_steps`` is negative or ``freq_hz`` is not
                positive.
        """
        if full_steps < 0:
            raise ValueError("steps must be >= 0")
        if freq_hz <= 0:
            raise ValueError("freq_hz must be > 0")

        pulses = int(full_steps) * self.microsteps
        if pulses <= 0:
            return

        half_period = 0.5 / float(freq_hz)
        for _ in range(pulses):
            self._step.on()
            time.sleep(half_period)
            self._step.off()
            time.sleep(half_period)


def _default_csv_path() -> str:
    """Return ``scan_<timestamp>.csv`` in the current working directory."""
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return os.path.join(os.getcwd(), f"scan_{ts}.csv")


def _ensure_csv_header(path: str) -> None:
    """Write the CSV header to ``path`` unless the file already has content."""
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return
    with open(path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow([name for name, _ in _CSV_COLUMNS])


def _append_row(path: str, *, mm: float, r: dict) -> None:
    """Append one measurement ``r`` taken at position ``mm`` to the CSV file.

    Keys missing from ``r`` are written as empty cells. The file is reopened
    and closed on every call, so each row is flushed as soon as it is written.
    """
    row = [mm if key is None else r.get(key) for _, key in _CSV_COLUMNS]
    with open(path, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(row)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the scan script."""
    ap = argparse.ArgumentParser(description="电机每1mm移动一次,并读取Rigol示波器数据写入CSV")
    ap.add_argument("--mm", type=float, required=True, help="启动时装置当前位置(mm),范围0-300")
    ap.add_argument("--end-mm", type=float, default=300.0, help="扫描终点(mm),默认300")
    ap.add_argument(
        "--step",
        type=int,
        default=40,
        help="每步移动(全步 steps),默认40(200 step = 1mm;会按细分自动换算为脉冲)",
    )
    ap.add_argument("--microsteps", type=int, default=8, help="驱动器细分倍数,默认8")
    ap.add_argument("--dir-pin", type=int, default=22, help="DIR 引脚(BCM编号),默认22")
    ap.add_argument("--step-pin", type=int, default=27, help="STEP 引脚(BCM编号),默认27")
    # The help text previously claimed a default of 500 while the real default
    # is 1200; the text now matches the value.
    ap.add_argument("--move-freq", type=int, default=1200, help="MOVE命令使用的频率Hz,默认1200")
    ap.add_argument("--settle-s", type=float, default=0.05, help="每次移动后等待再测量(秒),默认0.05")
    ap.add_argument("--rigol-dev", default=RIGOL_DEV, help="Rigol设备路径(默认/dev/usbtmc0)")
    ap.add_argument("--rigol-timeout", type=float, default=3.0)
    ap.add_argument("--rigol-mode", default="NORM", choices=["NORM", "RAW", "MAX"], help="WAV:POIN:MODE")
    ap.add_argument(
        "--live",
        action="store_true",
        help="开启 Matplotlib 实时图形监视器(相位差/峰峰值/频率 + 进度条)",
    )
    ap.add_argument("--csv", default="", help="输出CSV路径,默认当前目录scan_时间戳.csv")
    return ap


def _validate_args(args: argparse.Namespace) -> None:
    """Exit with a message if any command-line value is out of range."""
    if args.step <= 0:
        raise SystemExit("--step 必须是正整数")
    if args.microsteps <= 0:
        raise SystemExit("--microsteps 必须是正整数")
    if not (MM_MIN <= args.mm <= MM_MAX):
        raise SystemExit("--mm 超出范围 0-300")
    if not (MM_MIN <= args.end_mm <= MM_MAX):
        raise SystemExit("--end-mm 超出范围 0-300")
    # Reject these before any file or hardware is touched; otherwise they only
    # fail later, mid-scan, inside move_steps() / time.sleep().
    if args.move_freq <= 0:
        raise SystemExit("--move-freq 必须是正整数")
    if args.settle_s < 0:
        raise SystemExit("--settle-s 不能为负数")


def main() -> None:
    """Run the scan: measure at the start position, then move and measure
    repeatedly until the end position is reached, appending every result to
    the CSV file."""
    args = _build_arg_parser().parse_args()
    _validate_args(args)

    # Positions are tracked internally as integer step counts to avoid
    # accumulating floating-point error. These are full steps; the number of
    # pulses sent to the STEP pin is multiplied by microsteps.
    start_steps = int(round(args.mm * BASE_STEPS_PER_MM))
    end_steps = int(round(args.end_mm * BASE_STEPS_PER_MM))
    step_steps = int(args.step)

    csv_path = args.csv or _default_csv_path()
    _ensure_csv_header(csv_path)

    motor = GPIOStepper(
        dir_pin=args.dir_pin,
        step_pin=args.step_pin,
        microsteps=args.microsteps,
    )

    live = None
    if args.live:
        # The live monitor is optional: any failure to import or start it
        # only disables the monitor, the scan itself still runs.
        try:
            from live_monitor import LiveMonitor

            start_mm = start_steps / BASE_STEPS_PER_MM
            end_mm = end_steps / BASE_STEPS_PER_MM
            live = LiveMonitor(start_mm=start_mm, end_mm=end_mm, csv_path=csv_path)
        except Exception as e:
            print(f"[!] Live monitor disabled: {e}")
            live = None

    def live_add(mm: float, meas: dict) -> None:
        """Forward a point to the live monitor; disable it on first error."""
        nonlocal live
        if live is None:
            return
        try:
            live.add_point(mm=mm, meas=meas)
        except Exception as e:
            print(f"[!] Live monitor error (disabled): {e}")
            try:
                live.close()
            except Exception:
                pass
            live = None

    try:
        r = motor.ping()
        print(
            f"[+] Motor GPIO: DIR={args.dir_pin}, STEP={args.step_pin}, microsteps={args.microsteps} (PING -> {r})"
        )

        # Measure once before moving (this call also does the one-time IDN fetch).
        start_mm = start_steps / BASE_STEPS_PER_MM
        print(f"[+] Measure @ {start_mm:.3f}mm")
        meas = measure_phase(
            dev=args.rigol_dev,
            timeout=args.rigol_timeout,
            points_mode=args.rigol_mode,
            fetch_idn=True,
        )
        _append_row(csv_path, mm=start_mm, r=meas)
        live_add(start_mm, meas)
        print(f"[+] CSV: {csv_path}")

        cur_steps = start_steps
        if cur_steps == end_steps:
            print("[+] 已在终点,无需移动")
            return

        inc = end_steps > cur_steps
        direction_str = "+" if inc else "-"
        step_mm = step_steps / BASE_STEPS_PER_MM
        end_mm = end_steps / BASE_STEPS_PER_MM
        print(f"[+] Scan: {start_mm:.3f} -> {end_mm:.3f} (step {direction_str}{step_steps} steps = {step_mm:.3f}mm)")

        while cur_steps != end_steps:
            motor.set_dir(inc=inc)
            remaining = (end_steps - cur_steps) if inc else (cur_steps - end_steps)
            # The final move is shortened so the scan lands exactly on the end.
            move = min(step_steps, remaining)
            motor.move_steps(move, args.move_freq)
            time.sleep(args.settle_s)

            cur_steps += move if inc else -move
            cur_mm = cur_steps / BASE_STEPS_PER_MM
            print(f"[+] Measure @ {cur_mm:.3f}mm")
            meas = measure_phase(
                dev=args.rigol_dev,
                timeout=args.rigol_timeout,
                points_mode=args.rigol_mode,
                fetch_idn=False,
            )
            _append_row(csv_path, mm=cur_mm, r=meas)
            live_add(cur_mm, meas)
    except KeyboardInterrupt:
        print("\n[!] Interrupted")
    finally:
        motor.close()
        if live is not None:
            try:
                live.close()
            except Exception:
                pass


if __name__ == "__main__":
    main()