258 lines
8.1 KiB
Python
258 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import csv
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
|
||
from gpiozero import OutputDevice
|
||
|
||
from rigol_phase import DEV as RIGOL_DEV, measure_phase
|
||
|
||
|
||
# "Step" here always means a full step (microstepping is not considered).
# Full steps are converted to STEP-pin pulses automatically once the
# microstep factor is known.
BASE_STEPS_PER_MM = 200  # 200 full steps = 1 mm

# Allowed position range, in mm, for the start and end positions.
MM_MIN = 0
MM_MAX = 300
|
||
|
||
|
||
class GPIOStepper:
    """Drive a STEP/DIR stepper driver through two gpiozero output pins.

    Distances are expressed in full steps; every full step is emitted as
    ``microsteps`` pulses on the STEP pin.
    """

    def __init__(
        self,
        *,
        dir_pin: int,
        step_pin: int,
        microsteps: int = 8,
    ) -> None:
        """Claim the DIR and STEP pins and drive STEP low.

        Args:
            dir_pin: Pin number of the direction line.
            step_pin: Pin number of the step (pulse) line.
            microsteps: Pulses emitted per full step; must be positive.

        Raises:
            ValueError: If ``microsteps`` is not positive.
        """
        if microsteps <= 0:
            raise ValueError("microsteps must be a positive integer")
        self.microsteps = int(microsteps)

        # gpiozero uses BCM pin numbering by default.
        self._dir = OutputDevice(dir_pin)
        try:
            self._step = OutputDevice(step_pin)
            self._step.off()
        except Exception:
            # Do not leave the DIR pin claimed when STEP cannot be set up.
            self._dir.close()
            raise

    def close(self) -> None:
        """Drive both pins low and release them.

        Cleanup is best effort: a failure on one pin must not prevent the
        other pin from being released, so errors are deliberately ignored.
        """
        for device in (self._step, self._dir):
            try:
                device.off()
            except Exception:
                pass
            try:
                # Release the pin so it does not stay claimed by this process.
                device.close()
            except Exception:
                pass

    def ping(self) -> str:
        """Return the fixed identifier ``"GPIO"`` for this backend."""
        return "GPIO"

    def set_dir(self, inc: bool) -> None:
        """Set the DIR pin: 1 when ``inc`` is true, otherwise 0."""
        value = 1 if inc else 0
        self._dir.value = value

    def move_steps(self, full_steps: int, freq_hz: int) -> None:
        """Emit the pulses for ``full_steps`` full steps, blocking until done.

        Args:
            full_steps: Number of full steps to move; ``0`` is a no-op.
            freq_hz: Pulse frequency in Hz (one pulse = one high/low cycle).

        Raises:
            ValueError: If ``full_steps`` is negative or ``freq_hz`` is not
                positive.
        """
        if full_steps < 0:
            raise ValueError("steps must be >= 0")
        if freq_hz <= 0:
            raise ValueError("freq_hz must be > 0")

        pulses = int(full_steps) * self.microsteps
        if pulses <= 0:
            return

        # Each pulse is half a period high followed by half a period low.
        half_period = 0.5 / float(freq_hz)
        for _ in range(pulses):
            self._step.on()
            time.sleep(half_period)
            self._step.off()
            time.sleep(half_period)
|
||
|
||
|
||
def _default_csv_path() -> str:
    """Return ``scan_<YYYYmmdd_HHMMSS>.csv`` inside the current working directory."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"scan_{stamp}.csv"
    return os.path.join(os.getcwd(), filename)
|
||
|
||
|
||
def _ensure_csv_header(path: str) -> None:
    """Write the CSV header row to ``path`` unless the file already has content.

    A missing or zero-length file is (re)created containing only the header;
    a non-empty file is left untouched.
    """
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return

    columns = [
        "ts",
        "mm",
        "f0_hz",
        "phi1_rad",
        "phi2_rad",
        "dphi_rad",
        "dphi_deg",
        "amp1_pp_adc",
        "amp2_pp_adc",
        "rigol_idn",
        "points_mode",
        "n",
    ]
    with open(path, "w", newline="", encoding="utf-8") as fh:
        csv.writer(fh).writerow(columns)
|
||
|
||
|
||
def _append_row(path: str, *, mm: float, r: dict) -> None:
    """Append one measurement row to the CSV file at ``path``.

    Args:
        path: CSV file to append to.
        mm: Position written to the second column.
        r: Measurement mapping; keys that are absent are written as empty
            cells.
    """
    # Keys read from ``r`` for the columns that follow "ts" and the position.
    trailing_keys = (
        "f0_hz",
        "phi1_rad",
        "phi2_rad",
        "dphi_rad",
        "dphi_deg",
        "amp1_pp_adc",
        "amp2_pp_adc",
        "idn",
        "points_mode",
        "n",
    )
    row = [r.get("ts"), mm]
    row.extend(r.get(key) for key in trailing_keys)

    with open(path, "a", newline="", encoding="utf-8") as fh:
        csv.writer(fh).writerow(row)
        fh.flush()
|
||
|
||
|
||
def main() -> None:
    """Scan the stage from ``--mm`` to ``--end-mm``, measuring at every stop.

    Parses the command line, takes one measurement at the starting position,
    then repeatedly moves ``--step`` full steps towards the end position
    (the final move is shortened so the scan lands exactly on it), waits
    ``--settle-s`` seconds and appends one ``measure_phase`` result per stop
    to the CSV file. Ctrl-C stops the scan; the motor pins and the optional
    live monitor are always closed on the way out.

    Raises:
        SystemExit: If a command-line value is out of range.
    """
    ap = argparse.ArgumentParser(description="电机每1mm移动一次,并读取Rigol示波器数据写入CSV")
    ap.add_argument("--mm", type=float, required=True, help="启动时装置当前位置(mm),范围0-300")
    ap.add_argument("--end-mm", type=float, default=300.0, help="扫描终点(mm),默认300")
    ap.add_argument(
        "--step",
        type=int,
        default=40,
        help="每步移动(全步 steps),默认40(200 step = 1mm;会按细分自动换算为脉冲)",
    )

    ap.add_argument("--microsteps", type=int, default=8, help="驱动器细分倍数,默认8")
    ap.add_argument("--dir-pin", type=int, default=22, help="DIR 引脚(BCM编号),默认22")
    ap.add_argument("--step-pin", type=int, default=27, help="STEP 引脚(BCM编号),默认27")

    # The help text now states the real default (1200); it used to say 500.
    ap.add_argument("--move-freq", type=int, default=1200, help="MOVE命令使用的频率Hz,默认1200")
    ap.add_argument("--settle-s", type=float, default=0.05, help="每次移动后等待再测量(秒),默认0.05")

    ap.add_argument("--rigol-dev", default=RIGOL_DEV, help="Rigol设备路径(默认/dev/usbtmc0)")
    ap.add_argument("--rigol-timeout", type=float, default=3.0)
    ap.add_argument("--rigol-mode", default="NORM", choices=["NORM", "RAW", "MAX"], help="WAV:POIN:MODE")

    ap.add_argument(
        "--live",
        action="store_true",
        help="开启 Matplotlib 实时图形监视器(相位差/峰峰值/频率 + 进度条)",
    )

    ap.add_argument("--csv", default="", help="输出CSV路径,默认当前目录scan_时间戳.csv")
    args = ap.parse_args()

    # Reject bad values up front, before any pin is claimed or any
    # measurement is taken.
    if args.step <= 0:
        raise SystemExit("--step 必须是正整数")
    if args.microsteps <= 0:
        raise SystemExit("--microsteps 必须是正整数")
    if args.move_freq <= 0:
        raise SystemExit("--move-freq 必须是正整数")
    # Written as "not >=" so that NaN is rejected as well.
    if not (args.settle_s >= 0):
        raise SystemExit("--settle-s 不能为负数")

    if not (MM_MIN <= args.mm <= MM_MAX):
        raise SystemExit("--mm 超出范围 0-300")
    if not (MM_MIN <= args.end_mm <= MM_MAX):
        raise SystemExit("--end-mm 超出范围 0-300")

    # Positions are tracked internally as integer step counts to avoid
    # accumulating floating-point error.  These are full steps; the count
    # is multiplied by the microstep factor when pulses go to the STEP pin.
    start_steps = int(round(args.mm * BASE_STEPS_PER_MM))
    end_steps = int(round(args.end_mm * BASE_STEPS_PER_MM))
    step_steps = int(args.step)
    start_mm = start_steps / BASE_STEPS_PER_MM
    end_mm = end_steps / BASE_STEPS_PER_MM

    csv_path = args.csv or _default_csv_path()
    _ensure_csv_header(csv_path)

    motor = GPIOStepper(
        dir_pin=args.dir_pin,
        step_pin=args.step_pin,
        microsteps=args.microsteps,
    )

    live = None
    if args.live:
        try:
            # Imported lazily so the monitor module is only needed with --live.
            from live_monitor import LiveMonitor

            live = LiveMonitor(start_mm=start_mm, end_mm=end_mm, csv_path=csv_path)
        except Exception as e:
            # The live view is optional: report the problem and scan without it.
            print(f"[!] Live monitor disabled: {e}")
            live = None

    def live_add(mm: float, meas: dict) -> None:
        """Forward one point to the live monitor; disable it on any error."""
        nonlocal live
        if live is None:
            return
        try:
            live.add_point(mm=mm, meas=meas)
        except Exception as e:
            print(f"[!] Live monitor error (disabled): {e}")
            try:
                live.close()
            except Exception:
                pass
            live = None

    try:
        r = motor.ping()
        print(
            f"[+] Motor GPIO: DIR={args.dir_pin}, STEP={args.step_pin}, microsteps={args.microsteps} (PING -> {r})"
        )

        # Measure once at the starting position (this call also fetches the IDN).
        print(f"[+] Measure @ {start_mm:.3f}mm")
        meas = measure_phase(dev=args.rigol_dev, timeout=args.rigol_timeout, points_mode=args.rigol_mode, fetch_idn=True)
        _append_row(csv_path, mm=start_mm, r=meas)
        live_add(start_mm, meas)
        print(f"[+] CSV: {csv_path}")

        cur_steps = start_steps
        if cur_steps == end_steps:
            print("[+] 已在终点,无需移动")
            return

        inc = end_steps > cur_steps
        direction_str = "+" if inc else "-"
        step_mm = step_steps / BASE_STEPS_PER_MM
        print(f"[+] Scan: {start_mm:.3f} -> {end_mm:.3f} (step {direction_str}{step_steps} steps = {step_mm:.3f}mm)")

        # The direction never changes during a scan, so set it once.
        motor.set_dir(inc=inc)

        while cur_steps != end_steps:
            # Shorten the last move so the scan ends exactly on end_steps.
            remaining = abs(end_steps - cur_steps)
            move = min(step_steps, remaining)
            motor.move_steps(move, args.move_freq)
            time.sleep(args.settle_s)

            cur_steps += move if inc else -move
            cur_mm = cur_steps / BASE_STEPS_PER_MM
            print(f"[+] Measure @ {cur_mm:.3f}mm")
            meas = measure_phase(dev=args.rigol_dev, timeout=args.rigol_timeout, points_mode=args.rigol_mode, fetch_idn=False)
            _append_row(csv_path, mm=cur_mm, r=meas)
            live_add(cur_mm, meas)

    except KeyboardInterrupt:
        print("\n[!] Interrupted")
    finally:
        motor.close()
        if live is not None:
            try:
                live.close()
            except Exception:
                pass
|
||
|
||
|
||
# Run the scan only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|