# pendulum_transformer.py import argparse, math, os import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader import matplotlib.pyplot as plt # ============ 工具函数 ============ def moving_average(a, w=5): if w <= 1: return a pad = np.pad(a, (w//2, w-1-w//2), mode='edge') k = np.ones(w)/w return np.convolve(pad, k, mode='valid') def fit_circle_lstsq(x, y): # 解: x^2 + y^2 = 2 a x + 2 b y + c A = np.c_[2*x, 2*y, np.ones_like(x)] b = x**2 + y**2 sol, *_ = np.linalg.lstsq(A, b, rcond=None) a, b_, c = sol cx, cy = a, b_ r = math.sqrt(max(cx*cx + cy*cy + c, 0.0)) return cx, cy, r def wrap_to_pi(a): return (a + np.pi) % (2*np.pi) - np.pi def build_windows(arr, s_in=120, s_out=30, stride=10): # arr: [T, F] T, F = arr.shape X, Y, t_idx = [], [], [] for t in range(0, T - s_in - s_out + 1, stride): X.append(arr[t:t+s_in]) Y.append(arr[t+s_in:t+s_in+s_out]) t_idx.append(t) return np.array(X), np.array(Y), np.array(t_idx) # [N, S, F], [N, S, F], [N] def sincos_to_angle(s, c): return np.arctan2(s, c) def angle_rmse(pred_sc, true_sc): # pred_sc, true_sc: [..., 2] -> (sin, cos) ps, pc = pred_sc[...,0], pred_sc[...,1] ts, tc = true_sc[...,0], true_sc[...,1] pa = np.arctan2(ps, pc) ta = np.arctan2(ts, tc) diff = wrap_to_pi(pa - ta) return np.sqrt(np.mean(diff**2)) def split_even_indices(n, ratios=(0.7, 0.15, 0.15), block=50, seed=123): """将 [0..n-1] 均匀地分成三部分(train/val/test)""" if n <= 0: return np.array([], dtype=int), np.array([], dtype=int), np.array([], dtype=int) probs = np.array(ratios, dtype=np.float64) s = probs.sum() probs = probs / (s if s > 0 else 1.0) rng = np.random.default_rng(seed) idxs = np.arange(n) tr, va, te = [], [], [] for start in range(0, n, block): end = min(start + block, n) m = end - start target = probs * m base = np.floor(target).astype(int) rem = m - int(base.sum()) if rem > 0: frac = target - base order = np.argsort(-(frac + 1e-9 * rng.random(frac.shape))) for k in order[:rem]: base[k] += 1 local = idxs[start:end].copy() rng.shuffle(local) a, b, c = int(base[0]), int(base[1]), int(base[2]) tr.extend(local[:a]); va.extend(local[a:a+b]); te.extend(local[a+b:a+b+c]) return np.array(tr, dtype=int), np.array(va, dtype=int), np.array(te, dtype=int) # ============ 数据集 ============ class SeqDataset(Dataset): def __init__(self, X, Y, mu=None, std=None): self.X = X.astype(np.float32) self.Y = Y.astype(np.float32) if mu is None or std is None: mu = self.X.reshape(-1, self.X.shape[-1]).mean(0, keepdims=True) std = self.X.reshape(-1, self.X.shape[-1]).std(0, keepdims=True) + 1e-8 self.mu = mu.astype(np.float32) self.std = std.astype(np.float32) def __len__(self): return self.X.shape[0] def __getitem__(self, i): x = (self.X[i] - self.mu) / self.std y = (self.Y[i] - self.mu) / self.std return torch.from_numpy(x), torch.from_numpy(y) # ============ 位置编码 ============ class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=4096): super().__init__() pe = torch.zeros(max_len, d_model) pos = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1) div = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) * (-math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(pos * div) pe[:, 1::2] = torch.cos(pos * div) self.register_buffer('pe', pe.unsqueeze(0)) # [1, L, D] def forward(self, x): return x + self.pe[:, :x.size(1), :] # ============ 极简 Transformer ============ class TimeSeriesTransformer(nn.Module): def __init__(self, in_dim=2, d_model=256, nhead=4, enc_layers=3, dec_layers=3, d_ff=512, dropout=0.1, out_dim=2): super().__init__() self.in_proj = nn.Linear(in_dim, d_model) self.out_proj = nn.Linear(d_model, out_dim) self.tgt_proj = nn.Linear(out_dim, d_model) self.pos_enc = PositionalEncoding(d_model) self.tf = nn.Transformer( d_model=d_model, nhead=nhead, num_encoder_layers=enc_layers, num_decoder_layers=dec_layers, dim_feedforward=d_ff, dropout=dropout, batch_first=True ) def forward(self, src, tgt_in, tgt_mask=None): src = self.pos_enc(self.in_proj(src)) tgt = self.pos_enc(self.tgt_proj(tgt_in)) out = self.tf(src, tgt, tgt_mask=tgt_mask) # [B, S_out, D] return self.out_proj(out) # ============ 主流程 ============ def main(): ap = argparse.ArgumentParser() ap.add_argument("--csv", type=str, nargs='+', required=True, help="CSV files") ap.add_argument("--split", type=float, nargs=3, default=[0.7, 0.15, 0.15]) ap.add_argument("--s_in", type=int, default=180) ap.add_argument("--s_out", type=int, default=30) ap.add_argument("--stride", type=int, default=10) ap.add_argument("--epochs", type=int, default=10) ap.add_argument("--batch", type=int, default=64) ap.add_argument("--lr", type=float, default=1e-3) ap.add_argument("--ckpt", type=str, default="ckpt/pendulum_tf_base.pt") ap.add_argument("--eval_only", action="store_true") # 新增:评估相关 ap.add_argument("--eval_all", action="store_true", help="Evaluate on the entire test set") ap.add_argument("--eval_samples", type=int, default=6, help="How many random samples to plot from test set") ap.add_argument("--seed", type=int, default=123, help="Random seed for eval sampling") args = ap.parse_args() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 规范化路径 def _norm(p: str) -> str: return os.path.abspath(os.path.expanduser(p)) csv_list = sorted(set(_norm(p) for p in (args.csv or []))) if not csv_list: raise ValueError("No CSV files provided.") ratios = np.array(args.split, dtype=np.float64) s = ratios.sum() ratios = ratios / (s if s > 0 else 1.0) print(f"[Split] auto per-CSV even mode: files={len(csv_list)} | ratios train/val/test = {ratios.tolist()}") Xtr_list, Ytr_list, Xva_list, Yva_list, Xte_list, Yte_list = [], [], [], [], [], [] for i, csv_path in enumerate(csv_list): df = pd.read_csv(csv_path) x = df['x'].to_numpy(dtype=np.float64) y = df['y'].to_numpy(dtype=np.float64) # 圆拟合 => 悬点 cx, cy, r = fit_circle_lstsq(x, y) print(f"[{i+1}/{len(csv_list)}] {os.path.basename(csv_path)} pivot=({cx:.2f},{cy:.2f}), r~{r:.2f}") # 像素 -> 角度;竖直向下为 0 theta = np.arctan2(x - cx, cy - y) theta = np.unwrap(theta) sin_t, cos_t = np.sin(theta), np.cos(theta) feats = np.stack([sin_t, cos_t], axis=1) # [T, 2] # 滑窗 Xi, Yi, _ = build_windows(feats, s_in=args.s_in, s_out=args.s_out, stride=args.stride) if Xi.size == 0: continue tr_idx, va_idx, te_idx = split_even_indices(Xi.shape[0], ratios) if tr_idx.size: Xtr_list.append(Xi[tr_idx]); Ytr_list.append(Yi[tr_idx]) if va_idx.size: Xva_list.append(Xi[va_idx]); Yva_list.append(Yi[va_idx]) if te_idx.size: Xte_list.append(Xi[te_idx]); Yte_list.append(Yi[te_idx]) def _concat(lst, steps): if not lst: return np.zeros((0, steps, 2), dtype=np.float32) return np.concatenate(lst, axis=0).astype(np.float32) Xtr = _concat(Xtr_list, args.s_in) Ytr = _concat(Ytr_list, args.s_out) Xva = _concat(Xva_list, args.s_in) Yva = _concat(Yva_list, args.s_out) Xte = _concat(Xte_list, args.s_in) Yte = _concat(Yte_list, args.s_out) if Xtr.shape[0] == 0: raise ValueError("No training samples produced.") if Xva.shape[0] == 0: raise ValueError("No validation samples produced.") if Xte.shape[0] == 0: raise ValueError("No test samples produced.") # 标准化参数(仅基于训练集) base = Xtr mu = base.reshape(-1, base.shape[-1]).mean(0, keepdims=True).astype(np.float32) std = base.reshape(-1, base.shape[-1]).std(0, keepdims=True).astype(np.float32) + 1e-8 dtr = SeqDataset(Xtr, Ytr, mu, std) dva = SeqDataset(Xva, Yva, mu, std) dte = SeqDataset(Xte, Yte, mu, std) nw = 2 Ltr = DataLoader(dtr, batch_size=args.batch, shuffle=True, drop_last=True, pin_memory=True, num_workers=nw) Lva = DataLoader(dva, batch_size=args.batch, shuffle=False, pin_memory=True, num_workers=nw) Lte = DataLoader(dte, batch_size=args.batch, shuffle=False, pin_memory=True, num_workers=nw) cfg = dict(d_model=256, nhead=4, enc_layers=3, dec_layers=3, d_ff=512) net = TimeSeriesTransformer(in_dim=2, out_dim=2, **cfg).to(device) mse = nn.MSELoss() opt = torch.optim.AdamW(net.parameters(), lr=args.lr, weight_decay=1e-4) # ---- warmup + cosine 调度(按 step)---- total_steps = max(1, args.epochs * len(Ltr)) warmup = min(500, max(1, total_steps // 10)) def lr_lambda(step): if step < warmup: return float(step + 1) / float(warmup) progress = (step - warmup) / max(1, total_steps - warmup) cosine = 0.5 * (1.0 + math.cos(math.pi * progress)) min_factor = 0.01 return min_factor + (1 - min_factor) * cosine scheduler = torch.optim.lr_scheduler.LambdaLR(opt, lr_lambda=lr_lambda) # === 标准化 <-> 原始空间 转换 & 单位化 === mu_t = torch.from_numpy(mu).to(device) # [1, 2] std_t = torch.from_numpy(std).to(device) # [1, 2] def to_raw(z): return z * std_t + mu_t def to_std(z): return (z - mu_t) / std_t def unitize(z, eps=1e-6): n = torch.clamp(torch.linalg.norm(z, dim=-1, keepdim=True), min=eps) return z / n # === 预测一个 batch 的自回归函数(返回 raw 空间预测) === def rollout_raw(xb_std, steps): # xb_std: [B, S_in, 2] 标准化空间 tgt_tokens = xb_std[:, -1:, :2].clone() # 起始 token(标准化空间) for _ in range(steps): mask = nn.Transformer.generate_square_subsequent_mask(tgt_tokens.size(1)).to(device) out = net(xb_std, tgt_tokens, tgt_mask=mask) # 标准化 next_std = out[:, -1:, :] next_raw = to_raw(next_std) # -> 原始 next_raw = unitize(next_raw) # 单位圆约束 next_std = to_std(next_raw) # -> 标准化(与训练分布一致) tgt_tokens = torch.cat([tgt_tokens, next_std], dim=1) pred_std = tgt_tokens[:, 1:, :] # [B, steps, 2] 标准化 return to_raw(pred_std) # -> 原始 # === 改动一:闭环 rollout 损失所需的角度/方向损失 === def cosine_ang_loss(pred_sc_raw, true_sc_raw, eps=1e-6): """ pred_sc_raw/true_sc_raw: [B,S,2],分别是 RAW 空间下的 (sin,cos) 计算 1 - cos(Δθ) 的平均值,忽略向量长度,只关注方向(角度)差。 """ pu = pred_sc_raw / torch.clamp(torch.linalg.norm(pred_sc_raw, dim=-1, keepdim=True), min=eps) tu = true_sc_raw / torch.clamp(torch.linalg.norm(true_sc_raw, dim=-1, keepdim=True), min=eps) return (1.0 - (pu * tu).sum(dim=-1)).mean() # === 训练 or 加载 === os.makedirs(os.path.dirname(args.ckpt) or ".", exist_ok=True) if args.eval_only: if not os.path.exists(args.ckpt): raise FileNotFoundError(f"Checkpoint not found: {args.ckpt}. 请先训练一次或指定正确路径。") # 关键修复:显式关闭 weights_only 的安全反序列化限制(你信任本地 ckpt) payload = torch.load(args.ckpt, map_location=device, weights_only=False) # 若 ckpt 中包含 cfg,用 ckpt 的配置重建网络,避免结构不一致 ckpt_cfg = payload.get("cfg", None) if ckpt_cfg is not None: # 如果与当前构造的不同,则重建 if ckpt_cfg != cfg: net = TimeSeriesTransformer(in_dim=2, out_dim=2, **ckpt_cfg).to(device) net.load_state_dict(payload["model"], strict=True) mu = payload["mu"]; std = payload["std"] mu_t = torch.from_numpy(mu).to(device); std_t = torch.from_numpy(std).to(device) print(f"Loaded checkpoint from {args.ckpt}") else: # Sanity:原始空间下 ||yb|| 应 ~1 with torch.no_grad(): xb0, yb0 = next(iter(Ltr)) m_norm_raw = torch.linalg.norm(to_raw(yb0.to(device)), dim=-1).mean().item() print(f"[Sanity] mean ||yb|| in RAW space ~= {m_norm_raw:.4f}") best_val = float("inf") global_step = 0 for ep in range(1, args.epochs+1): net.train() total = 0.0 total_tf = 0.0 total_roll = 0.0 # ρ:闭环损失权重(线性升到 1.0;如需更稳可改为升到 0.5) rho = min(1.0, ep / max(1, args.epochs // 2)) for xb, yb in Ltr: xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True) B, S, F = yb.shape # ---------- (A) Teacher Forcing 分支(与原来一致) ---------- x_last = xb[:, -1:, :F] tgt_in = torch.cat([x_last, yb[:, :-1, :]], dim=1) mask = nn.Transformer.generate_square_subsequent_mask(S).to(device) pred_tf = net(xb, tgt_in, tgt_mask=mask) # 标准化空间 pred_tf_u = unitize(to_raw(pred_tf)) # 原始空间单位圆(只对方向约束) yb_u = unitize(to_raw(yb)) loss_tf = mse(pred_tf_u, yb_u) # ---------- (B) 闭环 rollout 分支(改动一的核心) ---------- with torch.no_grad(): y_true_raw = to_raw(yb) # RAW,用于对齐真值方向 y_pred_raw = rollout_raw(xb, steps=S) # RAW,模型自回归滚动预测 loss_roll = cosine_ang_loss(y_pred_raw, y_true_raw) # ---------- 合成总损失 ---------- loss = (1.0 - rho) * loss_tf + rho * loss_roll opt.zero_grad(set_to_none=True) loss.backward() torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0) opt.step() scheduler.step() total += loss.item() * xb.size(0) total_tf += loss_tf.item() * xb.size(0) total_roll += loss_roll.item() * xb.size(0) global_step += 1 tr_loss = total / len(dtr) tr_tf = total_tf / len(dtr) tr_rl = total_roll / len(dtr) # 验证:保持原先的 TF-式度量(也可扩展加闭环验证,这里先不改动) net.eval() with torch.no_grad(): total_v = 0.0 for xb, yb in Lva: xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True) x_last = xb[:, -1:, :2] tgt_in = torch.cat([x_last, yb[:, :-1, :]], dim=1) mask = nn.Transformer.generate_square_subsequent_mask(yb.size(1)).to(device) pred = net(xb, tgt_in, tgt_mask=mask) pred_u = unitize(to_raw(pred)) yb_u = unitize(to_raw(yb)) total_v += mse(pred_u, yb_u).item() * xb.size(0) va_loss = total_v / len(dva) cur_lr = scheduler.get_last_lr()[0] print(f"[Epoch {ep:02d}] ρ={rho:.2f} | train TOTAL={tr_loss:.6f} (TF={tr_tf:.6f}, ROLL={tr_rl:.6f}) | val MSE(u_raw)={va_loss:.6f} | lr={cur_lr:.6e}") if not np.isnan(va_loss) and va_loss < best_val: best_val = va_loss payload = {"model": net.state_dict(), "mu": mu, "std": std, "cfg": cfg} torch.save(payload, args.ckpt) print(f"Saved checkpoint -> {args.ckpt}") # ===== 评估 ===== net.eval() rng = np.random.default_rng(args.seed) with torch.no_grad(): # --- 1) 全量评估(可选) --- if args.eval_all: steps = args.s_out sum_sq = 0.0 n_total = 0 per_step_sq = np.zeros(steps, dtype=np.float64) total_samples = 0 for xb, yb in Lte: xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True) y_pred_raw = rollout_raw(xb, steps) # [B, steps, 2] raw y_true_raw = to_raw(yb) # [B, steps, 2] raw y_pred_np = y_pred_raw.cpu().numpy() y_true_np = y_true_raw.cpu().numpy() # 单位圆 -> 角度误差 n1 = np.clip(np.linalg.norm(y_pred_np, axis=-1, keepdims=True), 1e-6, None) n2 = np.clip(np.linalg.norm(y_true_np, axis=-1, keepdims=True), 1e-6, None) y_pred_u = y_pred_np / n1 y_true_u = y_true_np / n2 pa = np.arctan2(y_pred_u[...,0], y_pred_u[...,1]) # 注意顺序 (sin,cos) ta = np.arctan2(y_true_u[...,0], y_true_u[...,1]) diff = wrap_to_pi(pa - ta) # [B, steps] sum_sq += np.sum(diff**2) per_step_sq += np.sum(diff**2, axis=0) n_total += diff.size total_samples += diff.shape[0] overall_rmse = math.sqrt(sum_sq / n_total) per_step_rmse = np.sqrt(per_step_sq / max(1, total_samples)) print(f"[Test:ALL] overall angle RMSE = {overall_rmse:.6f} rad ({overall_rmse*180/np.pi:.3f} deg)") # 保存每步 RMSE 曲线 ts = np.arange(1, steps+1) plt.figure(figsize=(6,4)) plt.plot(ts, per_step_rmse * 180/np.pi, marker='o') plt.xlabel("horizon (step)"); plt.ylabel("RMSE (deg)") plt.title("Per-step angle RMSE on test set") plt.grid(True, linestyle="--", alpha=0.5) plt.tight_layout() plt.savefig("rmse_per_step.png", dpi=150) print("Saved plot -> rmse_per_step.png") # --- 2) 随机抽样 K 个样本可视化 --- X_list, Y_list = [], [] for xb, yb in Lte: X_list.append(xb); Y_list.append(yb) X_all = torch.cat(X_list, dim=0).to(device, non_blocking=True) Y_all = torch.cat(Y_list, dim=0).to(device, non_blocking=True) N = X_all.size(0) K = min(max(0, args.eval_samples), N) if K > 0: idxs = rng.choice(N, size=K, replace=False) steps = Y_all.size(1) y_pred_raw_all = rollout_raw(X_all[idxs], steps) # [K, steps, 2] y_true_raw_all = to_raw(Y_all[idxs]) # [K, steps, 2] def unit_np(z): n = np.linalg.norm(z, axis=-1, keepdims=True) n = np.clip(n, 1e-6, None) return z / n y_pred_np = y_pred_raw_all.cpu().numpy() y_true_np = y_true_raw_all.cpu().numpy() y_pred_u = unit_np(y_pred_np) y_true_u = unit_np(y_true_np) for j in range(K): ya = np.arctan2(y_true_u[j,:,0], y_true_u[j,:,1]) pa = np.arctan2(y_pred_u[j,:,0], y_pred_u[j,:,1]) pa_aligned = pa + 2*np.pi * np.round((ya - pa)/(2*np.pi)) plt.figure(figsize=(8,4)) plt.plot(np.arange(steps), ya, label="true θ") plt.plot(np.arange(steps), pa_aligned, '--', label="pred θ (aligned)") plt.xlabel("future step"); plt.ylabel("angle (rad)") plt.title(f"{steps}-step forecast (test sample {j})") plt.legend(); plt.tight_layout() out_png = f"pred_plot_sample_{j}.png" plt.savefig(out_png, dpi=150) plt.close() print(f"Saved plot -> {out_png}") # 如果没开 --eval_all,也输出一个“单 batch”对齐的整体 RMSE 以免落差太大 if not args.eval_all: steps = args.s_out xb, yb = X_all[:args.batch], Y_all[:args.batch] y_pred_raw = rollout_raw(xb, steps) y_true_raw = to_raw(yb) y_pred_np = y_pred_raw.cpu().numpy() y_true_np = y_true_raw.cpu().numpy() n1 = np.clip(np.linalg.norm(y_pred_np, axis=-1, keepdims=True), 1e-6, None) n2 = np.clip(np.linalg.norm(y_true_np, axis=-1, keepdims=True), 1e-6, None) y_pred_u = y_pred_np / n1; y_true_u = y_true_np / n2 pa = np.arctan2(y_pred_u[...,0], y_pred_u[...,1]) ta = np.arctan2(y_true_u[...,0], y_true_u[...,1]) diff = wrap_to_pi(pa - ta) rmse = math.sqrt(np.mean(diff**2)) print(f"[Test:Mini] angle RMSE = {rmse:.6f} rad ({rmse*180/np.pi:.3f} deg)") if __name__ == "__main__": main()