# pendulum_transformer.py
import argparse, math, os
|
||
import numpy as np
|
||
import pandas as pd
|
||
import torch
|
||
import torch.nn as nn
|
||
from torch.utils.data import Dataset, DataLoader
|
||
import matplotlib.pyplot as plt
|
||
|
||
# ============ Utility functions ============
def moving_average(a, w=5):
    """Smooth a 1-D sequence with a centered width-*w* window (edge-padded)."""
    if w <= 1:
        return a
    left, right = w // 2, w - 1 - w // 2
    padded = np.pad(a, (left, right), mode='edge')
    kernel = np.full(w, 1.0 / w)
    return np.convolve(padded, kernel, mode='valid')
def fit_circle_lstsq(x, y):
    """Least-squares circle fit.

    Solves the linear system x^2 + y^2 = 2*a*x + 2*b*y + c; the center is
    (a, b) and the radius is sqrt(a^2 + b^2 + c) (clamped at 0 for safety).
    Returns (cx, cy, r).
    """
    design = np.c_[2 * x, 2 * y, np.ones_like(x)]
    rhs = x ** 2 + y ** 2
    solution, *_ = np.linalg.lstsq(design, rhs, rcond=None)
    cx, cy, c = solution
    radius = math.sqrt(max(cx * cx + cy * cy + c, 0.0))
    return cx, cy, radius
def wrap_to_pi(a):
    """Wrap angle(s) in radians into the interval [-pi, pi)."""
    shifted = a + np.pi
    return shifted % (2 * np.pi) - np.pi
def build_windows(arr, s_in=120, s_out=30, stride=10):
    """Cut a [T, F] series into (input, target) window pairs.

    Returns X [N, s_in, F], Y [N, s_out, F], and the start index of each
    window; N may be 0 when the series is shorter than s_in + s_out.
    """
    T, F = arr.shape
    starts = list(range(0, T - s_in - s_out + 1, stride))
    X = [arr[t:t + s_in] for t in starts]
    Y = [arr[t + s_in:t + s_in + s_out] for t in starts]
    return np.array(X), np.array(Y), np.array(starts)
def sincos_to_angle(s, c):
    """Recover angle(s) in radians from (sin, cos) components."""
    angle = np.arctan2(s, c)
    return angle
def angle_rmse(pred_sc, true_sc):
    """RMSE of the angular error between two (sin, cos) encodings.

    pred_sc, true_sc: [..., 2] arrays whose last axis is (sin, cos).
    The difference is wrapped into [-pi, pi) before squaring.
    """
    pred_angle = np.arctan2(pred_sc[..., 0], pred_sc[..., 1])
    true_angle = np.arctan2(true_sc[..., 0], true_sc[..., 1])
    # wrap the signed difference into [-pi, pi)
    err = (pred_angle - true_angle + np.pi) % (2 * np.pi) - np.pi
    return np.sqrt(np.mean(err ** 2))
def split_even_indices(n, ratios=(0.7, 0.15, 0.15), block=50, seed=123):
    """Evenly split indices [0..n-1] into (train, val, test), block by block.

    Each contiguous block of `block` indices is shuffled and divided according
    to `ratios` (largest-remainder rounding, with a tiny random jitter to
    break ties reproducibly), so all three splits sample the whole range.
    """
    if n <= 0:
        return np.array([], dtype=int), np.array([], dtype=int), np.array([], dtype=int)
    weights = np.array(ratios, dtype=np.float64)
    total = weights.sum()
    weights = weights / (total if total > 0 else 1.0)
    rng = np.random.default_rng(seed)
    all_idx = np.arange(n)
    train_part, val_part, test_part = [], [], []
    for lo in range(0, n, block):
        hi = min(lo + block, n)
        count = hi - lo
        quota = weights * count
        alloc = np.floor(quota).astype(int)
        leftover = count - int(alloc.sum())
        if leftover > 0:
            # largest-remainder rounding; jitter breaks exact ties
            frac = quota - alloc
            order = np.argsort(-(frac + 1e-9 * rng.random(frac.shape)))
            for j in order[:leftover]:
                alloc[j] += 1
        chunk = all_idx[lo:hi].copy()
        rng.shuffle(chunk)
        n_tr, n_va, n_te = int(alloc[0]), int(alloc[1]), int(alloc[2])
        train_part.extend(chunk[:n_tr])
        val_part.extend(chunk[n_tr:n_tr + n_va])
        test_part.extend(chunk[n_tr + n_va:n_tr + n_va + n_te])
    return (np.array(train_part, dtype=int),
            np.array(val_part, dtype=int),
            np.array(test_part, dtype=int))
# ============ Dataset ============
class SeqDataset(Dataset):
    """Windowed (input, target) sequence dataset with z-score normalization.

    When mu/std are not supplied, per-feature statistics are computed from X
    itself (a small epsilon keeps std away from zero).
    """

    def __init__(self, X, Y, mu=None, std=None):
        self.X = X.astype(np.float32)
        self.Y = Y.astype(np.float32)
        if mu is None or std is None:
            flat = self.X.reshape(-1, self.X.shape[-1])
            mu = flat.mean(0, keepdims=True)
            std = flat.std(0, keepdims=True) + 1e-8
        self.mu = mu.astype(np.float32)
        self.std = std.astype(np.float32)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, i):
        normed_x = (self.X[i] - self.mu) / self.std
        normed_y = (self.Y[i] - self.mu) / self.std
        return torch.from_numpy(normed_x), torch.from_numpy(normed_y)
# ============ Positional encoding ============
class PositionalEncoding(nn.Module):
    """Additive sinusoidal positional encoding (Vaswani et al., 2017).

    A [1, max_len, d_model] table is precomputed and registered as a buffer:
    even channels carry sin, odd channels carry cos, with wavelengths forming
    a geometric progression up to 10000 * 2*pi.
    """

    def __init__(self, d_model, max_len=4096):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        # Fix: for odd d_model the cos half has one column fewer than `div`,
        # so slice `div` to the cos width (a no-op for even d_model).
        pe[:, 1::2] = torch.cos(pos * div[: d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))  # [1, L, D]

    def forward(self, x):
        # x: [B, S, D] -> adds the first S rows of the table (broadcast over B).
        return x + self.pe[:, :x.size(1), :]
# ============ Minimal Transformer ============
class TimeSeriesTransformer(nn.Module):
    """Encoder-decoder Transformer for multivariate sequence forecasting.

    Source and target sequences are linearly projected to d_model, given
    sinusoidal positions, run through nn.Transformer (batch_first=True), and
    projected back to out_dim.
    """

    def __init__(self, in_dim=2, d_model=256, nhead=4, enc_layers=3, dec_layers=3, d_ff=512, dropout=0.1, out_dim=2):
        super().__init__()
        # NB: submodules are created in this exact order so parameter
        # initialization consumes the RNG identically across builds.
        self.in_proj = nn.Linear(in_dim, d_model)
        self.out_proj = nn.Linear(d_model, out_dim)
        self.tgt_proj = nn.Linear(out_dim, d_model)
        self.pos_enc = PositionalEncoding(d_model)
        self.tf = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=enc_layers,
            num_decoder_layers=dec_layers,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True,
        )

    def forward(self, src, tgt_in, tgt_mask=None):
        enc_in = self.pos_enc(self.in_proj(src))
        dec_in = self.pos_enc(self.tgt_proj(tgt_in))
        decoded = self.tf(enc_in, dec_in, tgt_mask=tgt_mask)  # [B, S_out, D]
        return self.out_proj(decoded)
# ============ Main pipeline ============
def main():
    """Train/evaluate a seq2seq Transformer on pendulum angle (sin, cos) series.

    Pipeline: read CSVs of bob pixel positions -> fit the pivot circle ->
    convert to (sin, cos) features (no 2*pi discontinuity) -> sliding windows
    -> per-CSV even train/val/test split -> teacher-forcing + closed-loop
    rollout training -> autoregressive rollout evaluation with angle-RMSE
    reporting and plots.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--csv", type=str, nargs='+', required=True, help="CSV files")
    ap.add_argument("--split", type=float, nargs=3, default=[0.7, 0.15, 0.15])
    ap.add_argument("--s_in", type=int, default=180)
    ap.add_argument("--s_out", type=int, default=30)
    ap.add_argument("--stride", type=int, default=10)
    ap.add_argument("--epochs", type=int, default=10)
    ap.add_argument("--batch", type=int, default=64)
    ap.add_argument("--lr", type=float, default=1e-3)
    ap.add_argument("--ckpt", type=str, default="ckpt/pendulum_tf_base.pt")
    ap.add_argument("--eval_only", action="store_true")
    # Added: evaluation-related options.
    ap.add_argument("--eval_all", action="store_true", help="Evaluate on the entire test set")
    ap.add_argument("--eval_samples", type=int, default=6, help="How many random samples to plot from test set")
    ap.add_argument("--seed", type=int, default=123, help="Random seed for eval sampling")
    args = ap.parse_args()

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Normalize paths (expand ~, make absolute) so duplicates collapse below.
    def _norm(p: str) -> str:
        return os.path.abspath(os.path.expanduser(p))

    csv_list = sorted(set(_norm(p) for p in (args.csv or [])))
    if not csv_list:
        raise ValueError("No CSV files provided.")

    # Renormalize the three split ratios so they sum to 1.
    ratios = np.array(args.split, dtype=np.float64)
    s = ratios.sum()
    ratios = ratios / (s if s > 0 else 1.0)
    print(f"[Split] auto per-CSV even mode: files={len(csv_list)} | ratios train/val/test = {ratios.tolist()}")

    Xtr_list, Ytr_list, Xva_list, Yva_list, Xte_list, Yte_list = [], [], [], [], [], []

    for i, csv_path in enumerate(csv_list):
        df = pd.read_csv(csv_path)
        x = df['x'].to_numpy(dtype=np.float64)
        y = df['y'].to_numpy(dtype=np.float64)

        # Circle fit => pivot (suspension point) of the pendulum.
        cx, cy, r = fit_circle_lstsq(x, y)
        print(f"[{i+1}/{len(csv_list)}] {os.path.basename(csv_path)} pivot=({cx:.2f},{cy:.2f}), r~{r:.2f}")

        # Pixels -> angle; straight down is 0 (image y axis points down, hence cy - y).
        theta = np.arctan2(x - cx, cy - y)
        theta = np.unwrap(theta)
        sin_t, cos_t = np.sin(theta), np.cos(theta)
        feats = np.stack([sin_t, cos_t], axis=1)  # [T, 2]

        # Sliding windows.
        Xi, Yi, _ = build_windows(feats, s_in=args.s_in, s_out=args.s_out, stride=args.stride)
        if Xi.size == 0:
            continue
        tr_idx, va_idx, te_idx = split_even_indices(Xi.shape[0], ratios)
        if tr_idx.size: Xtr_list.append(Xi[tr_idx]); Ytr_list.append(Yi[tr_idx])
        if va_idx.size: Xva_list.append(Xi[va_idx]); Yva_list.append(Yi[va_idx])
        if te_idx.size: Xte_list.append(Xi[te_idx]); Yte_list.append(Yi[te_idx])

    # Concatenate per-CSV window lists; empty list -> correctly shaped empty array.
    def _concat(lst, steps):
        if not lst:
            return np.zeros((0, steps, 2), dtype=np.float32)
        return np.concatenate(lst, axis=0).astype(np.float32)

    Xtr = _concat(Xtr_list, args.s_in)
    Ytr = _concat(Ytr_list, args.s_out)
    Xva = _concat(Xva_list, args.s_in)
    Yva = _concat(Yva_list, args.s_out)
    Xte = _concat(Xte_list, args.s_in)
    Yte = _concat(Yte_list, args.s_out)

    if Xtr.shape[0] == 0: raise ValueError("No training samples produced.")
    if Xva.shape[0] == 0: raise ValueError("No validation samples produced.")
    if Xte.shape[0] == 0: raise ValueError("No test samples produced.")

    # Normalization statistics computed on the training set only.
    base = Xtr
    mu = base.reshape(-1, base.shape[-1]).mean(0, keepdims=True).astype(np.float32)
    std = base.reshape(-1, base.shape[-1]).std(0, keepdims=True).astype(np.float32) + 1e-8

    dtr = SeqDataset(Xtr, Ytr, mu, std)
    dva = SeqDataset(Xva, Yva, mu, std)
    dte = SeqDataset(Xte, Yte, mu, std)

    nw = 2  # DataLoader worker processes
    Ltr = DataLoader(dtr, batch_size=args.batch, shuffle=True, drop_last=True, pin_memory=True, num_workers=nw)
    Lva = DataLoader(dva, batch_size=args.batch, shuffle=False, pin_memory=True, num_workers=nw)
    Lte = DataLoader(dte, batch_size=args.batch, shuffle=False, pin_memory=True, num_workers=nw)

    cfg = dict(d_model=256, nhead=4, enc_layers=3, dec_layers=3, d_ff=512)
    net = TimeSeriesTransformer(in_dim=2, out_dim=2, **cfg).to(device)

    mse = nn.MSELoss()
    opt = torch.optim.AdamW(net.parameters(), lr=args.lr, weight_decay=1e-4)

    # ---- warmup + cosine schedule (per optimizer step) ----
    total_steps = max(1, args.epochs * len(Ltr))
    warmup = min(500, max(1, total_steps // 10))

    def lr_lambda(step):
        # Linear warmup, then cosine decay with a 1%-of-base-lr floor.
        if step < warmup:
            return float(step + 1) / float(warmup)
        progress = (step - warmup) / max(1, total_steps - warmup)
        cosine = 0.5 * (1.0 + math.cos(math.pi * progress))
        min_factor = 0.01
        return min_factor + (1 - min_factor) * cosine

    scheduler = torch.optim.lr_scheduler.LambdaLR(opt, lr_lambda=lr_lambda)

    # === standardized <-> raw space conversion & unit-circle projection ===
    # NOTE: to_raw/to_std close over mu_t/std_t, which are rebound when a
    # checkpoint is loaded below — the closures pick up the new values.
    mu_t = torch.from_numpy(mu).to(device)    # [1, 2]
    std_t = torch.from_numpy(std).to(device)  # [1, 2]

    def to_raw(z): return z * std_t + mu_t
    def to_std(z): return (z - mu_t) / std_t

    def unitize(z, eps=1e-6):
        # Project (sin, cos) vectors onto the unit circle (guard against /0).
        n = torch.clamp(torch.linalg.norm(z, dim=-1, keepdim=True), min=eps)
        return z / n

    # === autoregressive prediction for one batch (returns RAW-space forecast) ===
    def rollout_raw(xb_std, steps):
        # xb_std: [B, S_in, 2] in standardized space
        tgt_tokens = xb_std[:, -1:, :2].clone()  # start token (standardized space)
        for _ in range(steps):
            mask = nn.Transformer.generate_square_subsequent_mask(tgt_tokens.size(1)).to(device)
            out = net(xb_std, tgt_tokens, tgt_mask=mask)  # standardized
            next_std = out[:, -1:, :]
            next_raw = to_raw(next_std)   # -> raw
            next_raw = unitize(next_raw)  # unit-circle constraint
            next_std = to_std(next_raw)   # -> standardized (matches training distribution)
            tgt_tokens = torch.cat([tgt_tokens, next_std], dim=1)
        pred_std = tgt_tokens[:, 1:, :]  # [B, steps, 2] standardized
        return to_raw(pred_std)  # -> raw

    # === change #1: angle/direction loss for the closed-loop rollout branch ===
    def cosine_ang_loss(pred_sc_raw, true_sc_raw, eps=1e-6):
        """
        pred_sc_raw/true_sc_raw: [B,S,2], (sin, cos) pairs in RAW space.
        Returns the mean of 1 - cos(delta_theta): vector length is ignored,
        only the direction (angle) difference is penalized.
        """
        pu = pred_sc_raw / torch.clamp(torch.linalg.norm(pred_sc_raw, dim=-1, keepdim=True), min=eps)
        tu = true_sc_raw / torch.clamp(torch.linalg.norm(true_sc_raw, dim=-1, keepdim=True), min=eps)
        return (1.0 - (pu * tu).sum(dim=-1)).mean()

    # === train or load ===
    os.makedirs(os.path.dirname(args.ckpt) or ".", exist_ok=True)
    if args.eval_only:
        if not os.path.exists(args.ckpt):
            raise FileNotFoundError(f"Checkpoint not found: {args.ckpt}. 请先训练一次或指定正确路径。")
        # Key fix: explicitly disable the weights_only safe-deserialization
        # restriction (the local checkpoint is trusted; do NOT do this for
        # checkpoints from untrusted sources).
        payload = torch.load(args.ckpt, map_location=device, weights_only=False)
        # If the checkpoint carries a cfg, rebuild the net from it to avoid a
        # structure mismatch with the freshly constructed one.
        ckpt_cfg = payload.get("cfg", None)
        if ckpt_cfg is not None:
            # Rebuild only when it differs from the current construction.
            if ckpt_cfg != cfg:
                net = TimeSeriesTransformer(in_dim=2, out_dim=2, **ckpt_cfg).to(device)
        net.load_state_dict(payload["model"], strict=True)

        # Restore the normalization stats the checkpoint was trained with.
        mu = payload["mu"]; std = payload["std"]
        mu_t = torch.from_numpy(mu).to(device); std_t = torch.from_numpy(std).to(device)
        print(f"Loaded checkpoint from {args.ckpt}")
    else:
        # Sanity check: in raw space ||yb|| should be ~1 (points on the unit circle).
        with torch.no_grad():
            xb0, yb0 = next(iter(Ltr))
            m_norm_raw = torch.linalg.norm(to_raw(yb0.to(device)), dim=-1).mean().item()
        print(f"[Sanity] mean ||yb|| in RAW space ~= {m_norm_raw:.4f}")

        best_val = float("inf")
        global_step = 0
        for ep in range(1, args.epochs+1):
            net.train()
            total = 0.0
            total_tf = 0.0
            total_roll = 0.0

            # rho: closed-loop loss weight (ramps linearly to 1.0 by mid-training;
            # ramp to 0.5 instead if more stability is needed).
            rho = min(1.0, ep / max(1, args.epochs // 2))

            for xb, yb in Ltr:
                xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True)
                B, S, F = yb.shape

                # ---------- (A) teacher-forcing branch (unchanged) ----------
                # Decoder input = last observed token followed by the shifted targets.
                x_last = xb[:, -1:, :F]
                tgt_in = torch.cat([x_last, yb[:, :-1, :]], dim=1)
                mask = nn.Transformer.generate_square_subsequent_mask(S).to(device)

                pred_tf = net(xb, tgt_in, tgt_mask=mask)  # standardized space
                pred_tf_u = unitize(to_raw(pred_tf))  # raw-space unit circle (direction-only constraint)
                yb_u = unitize(to_raw(yb))
                loss_tf = mse(pred_tf_u, yb_u)

                # ---------- (B) closed-loop rollout branch (core of change #1) ----------
                with torch.no_grad():
                    y_true_raw = to_raw(yb)  # RAW; ground-truth directions for alignment
                # NOTE(review): rollout_raw is assumed to sit OUTSIDE the no_grad
                # block so loss_roll is differentiable; if it were inside, the
                # rho-weighted term would contribute no gradient — confirm intent.
                y_pred_raw = rollout_raw(xb, steps=S)  # RAW; autoregressive rollout
                loss_roll = cosine_ang_loss(y_pred_raw, y_true_raw)

                # ---------- combined total loss ----------
                loss = (1.0 - rho) * loss_tf + rho * loss_roll

                opt.zero_grad(set_to_none=True)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
                opt.step()
                scheduler.step()

                total += loss.item() * xb.size(0)
                total_tf += loss_tf.item() * xb.size(0)
                total_roll += loss_roll.item() * xb.size(0)
                global_step += 1

            tr_loss = total / len(dtr)
            tr_tf = total_tf / len(dtr)
            tr_rl = total_roll / len(dtr)

            # Validation: keep the original teacher-forcing-style metric
            # (a closed-loop validation could be added later; unchanged for now).
            net.eval()
            with torch.no_grad():
                total_v = 0.0
                for xb, yb in Lva:
                    xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True)
                    x_last = xb[:, -1:, :2]
                    tgt_in = torch.cat([x_last, yb[:, :-1, :]], dim=1)
                    mask = nn.Transformer.generate_square_subsequent_mask(yb.size(1)).to(device)
                    pred = net(xb, tgt_in, tgt_mask=mask)
                    pred_u = unitize(to_raw(pred))
                    yb_u = unitize(to_raw(yb))
                    total_v += mse(pred_u, yb_u).item() * xb.size(0)
            va_loss = total_v / len(dva)

            cur_lr = scheduler.get_last_lr()[0]
            print(f"[Epoch {ep:02d}] ρ={rho:.2f} | train TOTAL={tr_loss:.6f} (TF={tr_tf:.6f}, ROLL={tr_rl:.6f}) | val MSE(u_raw)={va_loss:.6f} | lr={cur_lr:.6e}")

            # Keep the checkpoint with the best validation loss.
            if not np.isnan(va_loss) and va_loss < best_val:
                best_val = va_loss
                payload = {"model": net.state_dict(), "mu": mu, "std": std, "cfg": cfg}
                torch.save(payload, args.ckpt)
                print(f"Saved checkpoint -> {args.ckpt}")

    # ===== evaluation =====
    net.eval()
    rng = np.random.default_rng(args.seed)

    with torch.no_grad():
        # --- 1) full test-set evaluation (optional) ---
        if args.eval_all:
            steps = args.s_out
            sum_sq = 0.0
            n_total = 0
            per_step_sq = np.zeros(steps, dtype=np.float64)
            total_samples = 0

            for xb, yb in Lte:
                xb = xb.to(device, non_blocking=True); yb = yb.to(device, non_blocking=True)
                y_pred_raw = rollout_raw(xb, steps)  # [B, steps, 2] raw
                y_true_raw = to_raw(yb)              # [B, steps, 2] raw
                y_pred_np = y_pred_raw.cpu().numpy()
                y_true_np = y_true_raw.cpu().numpy()

                # unit circle -> angle error
                n1 = np.clip(np.linalg.norm(y_pred_np, axis=-1, keepdims=True), 1e-6, None)
                n2 = np.clip(np.linalg.norm(y_true_np, axis=-1, keepdims=True), 1e-6, None)
                y_pred_u = y_pred_np / n1
                y_true_u = y_true_np / n2

                pa = np.arctan2(y_pred_u[...,0], y_pred_u[...,1])  # NB argument order: (sin, cos)
                ta = np.arctan2(y_true_u[...,0], y_true_u[...,1])
                diff = wrap_to_pi(pa - ta)  # [B, steps]

                sum_sq += np.sum(diff**2)
                per_step_sq += np.sum(diff**2, axis=0)
                n_total += diff.size
                total_samples += diff.shape[0]

            overall_rmse = math.sqrt(sum_sq / n_total)
            per_step_rmse = np.sqrt(per_step_sq / max(1, total_samples))

            print(f"[Test:ALL] overall angle RMSE = {overall_rmse:.6f} rad ({overall_rmse*180/np.pi:.3f} deg)")
            # Save the per-step RMSE curve.
            ts = np.arange(1, steps+1)
            plt.figure(figsize=(6,4))
            plt.plot(ts, per_step_rmse * 180/np.pi, marker='o')
            plt.xlabel("horizon (step)"); plt.ylabel("RMSE (deg)")
            plt.title("Per-step angle RMSE on test set")
            plt.grid(True, linestyle="--", alpha=0.5)
            plt.tight_layout()
            plt.savefig("rmse_per_step.png", dpi=150)
            print("Saved plot -> rmse_per_step.png")

        # --- 2) plot K randomly sampled test windows ---
        X_list, Y_list = [], []
        for xb, yb in Lte:
            X_list.append(xb); Y_list.append(yb)
        X_all = torch.cat(X_list, dim=0).to(device, non_blocking=True)
        Y_all = torch.cat(Y_list, dim=0).to(device, non_blocking=True)

        N = X_all.size(0)
        K = min(max(0, args.eval_samples), N)
        if K > 0:
            idxs = rng.choice(N, size=K, replace=False)
            steps = Y_all.size(1)
            y_pred_raw_all = rollout_raw(X_all[idxs], steps)  # [K, steps, 2]
            y_true_raw_all = to_raw(Y_all[idxs])              # [K, steps, 2]

            def unit_np(z):
                # NumPy counterpart of unitize().
                n = np.linalg.norm(z, axis=-1, keepdims=True)
                n = np.clip(n, 1e-6, None)
                return z / n

            y_pred_np = y_pred_raw_all.cpu().numpy()
            y_true_np = y_true_raw_all.cpu().numpy()
            y_pred_u = unit_np(y_pred_np)
            y_true_u = unit_np(y_true_np)

            for j in range(K):
                ya = np.arctan2(y_true_u[j,:,0], y_true_u[j,:,1])
                pa = np.arctan2(y_pred_u[j,:,0], y_pred_u[j,:,1])
                # Shift the prediction by whole turns so it plots on the same
                # branch as the ground truth.
                pa_aligned = pa + 2*np.pi * np.round((ya - pa)/(2*np.pi))

                plt.figure(figsize=(8,4))
                plt.plot(np.arange(steps), ya, label="true θ")
                plt.plot(np.arange(steps), pa_aligned, '--', label="pred θ (aligned)")
                plt.xlabel("future step"); plt.ylabel("angle (rad)")
                plt.title(f"{steps}-step forecast (test sample {j})")
                plt.legend(); plt.tight_layout()
                out_png = f"pred_plot_sample_{j}.png"
                plt.savefig(out_png, dpi=150)
                plt.close()
                print(f"Saved plot -> {out_png}")

        # Without --eval_all, still report a single-batch overall RMSE so the
        # reported numbers stay comparable.
        if not args.eval_all:
            steps = args.s_out
            xb, yb = X_all[:args.batch], Y_all[:args.batch]
            y_pred_raw = rollout_raw(xb, steps)
            y_true_raw = to_raw(yb)
            y_pred_np = y_pred_raw.cpu().numpy()
            y_true_np = y_true_raw.cpu().numpy()
            n1 = np.clip(np.linalg.norm(y_pred_np, axis=-1, keepdims=True), 1e-6, None)
            n2 = np.clip(np.linalg.norm(y_true_np, axis=-1, keepdims=True), 1e-6, None)
            y_pred_u = y_pred_np / n1; y_true_u = y_true_np / n2
            pa = np.arctan2(y_pred_u[...,0], y_pred_u[...,1])
            ta = np.arctan2(y_true_u[...,0], y_true_u[...,1])
            diff = wrap_to_pi(pa - ta)
            rmse = math.sqrt(np.mean(diff**2))
            print(f"[Test:Mini] angle RMSE = {rmse:.6f} rad ({rmse*180/np.pi:.3f} deg)")
# Script entry point.
if __name__ == "__main__":
    main()