From 2df97a60aa247c09758671d669acc5791adb54fd Mon Sep 17 00:00:00 2001 From: feie9454 Date: Wed, 28 Jan 2026 14:40:48 +0800 Subject: [PATCH] wifi --- main.py | 61 +++ .../PingFangSC-Regular.ttf | Bin .../motor_rigol_scan_csv.py | 0 .../scan_20260120_162345.csv | 0 .../scan_20260120_180933.csv | 0 .../visualize_scan_csv.py | 0 oled_qr.py | 395 ++++++++++++++++++ pyproject.toml | 4 + uv.lock | 92 ++++ wifi_control.py | 191 +++++++++ 10 files changed, 743 insertions(+) rename PingFangSC-Regular.ttf => old/PingFangSC-Regular.ttf (100%) rename motor_rigol_scan_csv.py => old/motor_rigol_scan_csv.py (100%) rename scan_20260120_162345.csv => old/scan_20260120_162345.csv (100%) rename scan_20260120_180933.csv => old/scan_20260120_180933.csv (100%) rename visualize_scan_csv.py => old/visualize_scan_csv.py (100%) create mode 100644 oled_qr.py create mode 100644 wifi_control.py diff --git a/main.py b/main.py index 6c46ff3..dbd6f15 100644 --- a/main.py +++ b/main.py @@ -13,9 +13,70 @@ import asyncio from typing import Optional, Callable, List, Dict, Any from pydantic import BaseModel from rigol_phase import measure_phase, DEV as RIGOL_DEV +from oled_qr import start_oled_startup_display +from wifi_control import get_current_ssid, wifi_connect, ensure_wifi_or_start_ap, hotspot_down app = FastAPI() + +@app.on_event("startup") +def _startup_oled() -> None: + # Non-fatal: if OLED is missing/not wired, the service still starts. + def _state_provider(): + with state_lock: + # keep it small and serialization-friendly + return { + "dis": state.get("dis"), + "phase": state.get("phase"), + "freq": state.get("freq"), + "p2p": state.get("p2p"), + "speed": state.get("speed"), + "tasks": list(state.get("tasks", [])), + } + + # Inject provider into oled module without creating import cycles. + setattr(start_oled_startup_display, "state_provider", _state_provider) + start_oled_startup_display() + + +@app.on_event("startup") +def _startup_wifi() -> None: + # If not connected to any WiFi, bring up an AP for provisioning. + # Non-fatal: any errors should not prevent API server from starting. + try: + ensure_wifi_or_start_ap() + except Exception as e: + print(f"[WiFi] startup failed: {e}") + + +class WifiConnectRequest(BaseModel): + ssid: str + password: str = "" + ifname: Optional[str] = None + wait: Optional[int] = None + + +@app.get("/wifi") +def wifi_get(): + ssid = None + try: + ssid = get_current_ssid() + except Exception as e: + raise HTTPException(status_code=500, detail=f"get ssid failed: {e}") + return {"ssid": ssid} + + +@app.post("/wifi") +def wifi_post(req: WifiConnectRequest): + try: + # Stop AP if it was started, then connect as client. + hotspot_down() + wifi_connect(req.ssid, req.password, ifname=req.ifname, wait=req.wait) + ssid = get_current_ssid(ifname=req.ifname) + return {"status": "connected", "ssid": ssid or req.ssid} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + class BatchTaskItem(BaseModel): cmd: str args: Dict[str, Any] = {} diff --git a/PingFangSC-Regular.ttf b/old/PingFangSC-Regular.ttf similarity index 100% rename from PingFangSC-Regular.ttf rename to old/PingFangSC-Regular.ttf diff --git a/motor_rigol_scan_csv.py b/old/motor_rigol_scan_csv.py similarity index 100% rename from motor_rigol_scan_csv.py rename to old/motor_rigol_scan_csv.py diff --git a/scan_20260120_162345.csv b/old/scan_20260120_162345.csv similarity index 100% rename from scan_20260120_162345.csv rename to old/scan_20260120_162345.csv diff --git a/scan_20260120_180933.csv b/old/scan_20260120_180933.csv similarity index 100% rename from scan_20260120_180933.csv rename to old/scan_20260120_180933.csv diff --git a/visualize_scan_csv.py b/old/visualize_scan_csv.py similarity index 100% rename from visualize_scan_csv.py rename to old/visualize_scan_csv.py diff --git a/oled_qr.py b/oled_qr.py new file mode 100644 index 0000000..ff71553 --- /dev/null +++ b/oled_qr.py @@ -0,0 +1,395 @@ +import os +import socket +import subprocess +import threading +import time +from dataclasses import dataclass +from typing import Optional, Callable, Dict, Any + + +def _truthy(value: Optional[str]) -> bool: + if value is None: + return False + return value.strip().lower() in {"1", "true", "yes", "y", "on"} + + +def _get_lan_ip() -> Optional[str]: + """Best-effort LAN IP detection. + + Uses a UDP socket trick (no packets actually need to be received) to learn + which local interface would be used to reach the internet. + """ + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + if ip and ip != "0.0.0.0": + return ip + finally: + s.close() + except Exception: + pass + + try: + hostname = socket.gethostname() + ip = socket.gethostbyname(hostname) + if ip and ip != "127.0.0.1": + return ip + except Exception: + pass + + return None + + +def build_public_url() -> str: + """Build a URL that should be reachable from LAN clients.""" + explicit = os.getenv("PUBLIC_URL", "").strip() + if explicit: + return explicit + + ip = _get_lan_ip() or os.getenv("HOST", "0.0.0.0") + + # If host is 0.0.0.0, it's not a client-reachable address. + if ip in {"0.0.0.0", "127.0.0.1"}: + ip = _get_lan_ip() or "raspberrypi.local" + + scheme = os.getenv("PUBLIC_SCHEME", "http").strip() or "http" + return f"{scheme}://{ip}" + + +from luma.core.interface.serial import i2c +from luma.oled.device import ssd1306 + +def _init_oled_device(): + address = int(os.getenv("OLED_I2C_ADDRESS", "0x3C"), 0) + port = int(os.getenv("OLED_I2C_PORT", "1")) + rotate = int(os.getenv("OLED_ROTATE", "0")) + + serial = i2c(port=port, address=address) + + return ssd1306(serial, width=128, height=64, rotate=rotate) + + +def _make_qr_64(url: str): + import qrcode + from PIL import Image + + # Left: 64x64 QR + qr = qrcode.QRCode( + version=None, + error_correction=qrcode.constants.ERROR_CORRECT_M, + box_size=2, + border=1, + ) + qr.add_data(url) + qr.make(fit=True) + + # Use explicit black/white to avoid near-black values like 1 that can + # get thresholded to all-black when converted to 1-bit. + qr_img = qr.make_image(fill_color="black", back_color="white").convert("1") + # Fit into 64x64 (keep square) + return qr_img.resize((64, 64), Image.NEAREST) + + +def _format_state_lines(state: Optional[Dict[str, Any]]) -> list[str]: + if not state: + return ["state: n/a"] + + dis_steps = state.get("dis") + phase = state.get("phase") + freq = state.get("freq") + p2p = state.get("p2p") + + def _fmt_float(value: Any, *, decimals: int) -> str: + try: + if value is None or isinstance(value, bool): + return "-" + f = float(value) + return f"{f:.{decimals}f}" if decimals > 0 else f"{f:.0f}" + except Exception: + return "-" + + # Keep units explicit as requested; no labels; we'll right-align later. + try: + dis_mm = float(dis_steps or 0) / 1600.0 + except Exception: + dis_mm = None + + try: + freq_khz = (float(freq) / 1000.0) if freq is not None else None + except Exception: + freq_khz = None + + lines: list[str] = [] + lines.append(f"{_fmt_float(dis_mm, decimals=2)} mm") + lines.append(f"{_fmt_float(freq_khz, decimals=2)} kHz") + lines.append(f"{_fmt_float(phase, decimals=3)} rad") + lines.append(f"{_fmt_float(p2p, decimals=0)} ADC") + return lines + + +def _load_mono_font(): + from PIL import ImageFont + + # Prefer a real monospace TTF if available. + candidates = [] + env_path = os.getenv("OLED_FONT_PATH", "").strip() + if env_path: + candidates.append(env_path) + candidates.extend( + [ + "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", + "/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf", + "/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf", + ] + ) + + size = int(os.getenv("OLED_FONT_SIZE", "10")) + for path in candidates: + try: + return ImageFont.truetype(path, size=size) + except Exception: + continue + + # Fallback: PIL default (often bitmap monospace) + return ImageFont.load_default() + + +def _max_chars_for_width(font, max_px: int) -> int: + try: + # Estimate monospace char width from '0' + bbox = font.getbbox("0") + char_w = max(1, bbox[2] - bbox[0]) + return max(1, int(max_px // char_w)) + except Exception: + # Conservative fallback + return max(1, int(max_px // 6)) + + +def _get_wifi_ssid() -> str: + """Best-effort current WiFi SSID. + + Tries common tools on Raspberry Pi OS. Never raises. + """ + env = os.getenv("WIFI_SSID", "").strip() + if env: + return env + + def _run(cmd: list[str]) -> str: + try: + p = subprocess.run(cmd, capture_output=True, text=True, timeout=0.5) + if p.returncode == 0: + return (p.stdout or "").strip() + except Exception: + return "" + return "" + + # Most minimal dependency + ssid = _run(["iwgetid", "-r"]) + if ssid: + return ssid + + # NetworkManager + out = _run(["nmcli", "-t", "-f", "active,ssid", "dev", "wifi"]) + if out: + for line in out.splitlines(): + # yes: + if line.startswith("yes:"): + return line.split(":", 1)[1].strip() + + # wpa_supplicant + out = _run(["wpa_cli", "-i", "wlan0", "status"]) + if out: + for line in out.splitlines(): + if line.startswith("ssid="): + return line.split("=", 1)[1].strip() + + return "" + + +def _make_wifi_line(ssid: str, max_chars: int, scroll_offset: int) -> str: + """Build the right-top WiFi line with optional scrolling. + + The prefix is fixed; only the SSID part scrolls when it doesn't fit. + """ + prefix = "WiFi:" + if max_chars <= 0: + return "" + + ssid = (ssid or "").strip() or "-" + + # Keep prefix visible when possible. + # We render as: "WiFi:" + " " + + join = " " + base = prefix + join + + if len(base) >= max_chars: + # Not enough space even for full prefix; truncate prefix. + return (prefix[:max_chars]).ljust(max_chars) + + avail = max_chars - len(base) + if len(ssid) <= avail: + return (base + ssid).ljust(max_chars) + + pad = os.getenv("OLED_SSID_SCROLL_PAD", " ") + if not pad: + pad = " " + scroll_text = ssid + pad + if not scroll_text: + return (base + "-").ljust(max_chars) + + offset = scroll_offset % len(scroll_text) + # Wrap-around slice of length `avail` + doubled = scroll_text + scroll_text + window = doubled[offset : offset + avail] + return (base + window).ljust(max_chars) + + +def _render_screen( + qr_img_64, + state: Optional[Dict[str, Any]], + *, + wifi_line: Optional[str] = None, + font=None, + max_chars: Optional[int] = None, +): + from PIL import Image, ImageDraw + + canvas = Image.new("1", (128, 64), 0) + canvas.paste(qr_img_64, (0, 0)) + + draw = ImageDraw.Draw(canvas) + if font is None: + font = _load_mono_font() + + x = 66 + top_y = 0 + if max_chars is None: + max_chars = _max_chars_for_width(font, 128 - x) + + # Right-top: WiFi SSID + ssid_text = wifi_line if wifi_line is not None else "" + if len(ssid_text) > max_chars: + ssid_text = ssid_text[:max_chars] + draw.text((x, top_y), ssid_text, font=font, fill=255) + + # Right-bottom: state lines + lines = _format_state_lines(state) + line_h = 12 + y = 64 - (len(lines) * line_h) + # Don't overlap SSID line + if y < line_h: + y = line_h + + for line in lines: + text = str(line) + if len(text) > max_chars: + text = text[-max_chars:] + else: + text = text.rjust(max_chars) + draw.text((x, y), text, font=font, fill=255) + y += line_h + + return canvas + + +@dataclass +class OLEDStartupDisplay: + stop_event: threading.Event + thread: threading.Thread + + +_manager: Optional[OLEDStartupDisplay] = None + + +def start_oled_startup_display() -> None: + """Show server URL QR code + live state on a 128x64 I2C OLED. + + Safe by design: failures only log, never crash the API server. + """ + global _manager + + if _manager is not None: + return + + if not _truthy(os.getenv("OLED_ENABLED", "1")): + return + + stop_event = threading.Event() + + def _run(): + # A few retries in case I2C isn't ready instantly. + last_error: Optional[Exception] = None + for _ in range(3): + if stop_event.is_set(): + return + try: + device = _init_oled_device() + url = build_public_url() + qr_img_64 = _make_qr_64(url) + + # Optional live updates using a state provider. + state_provider: Optional[Callable[[], Dict[str, Any]]] = getattr( + start_oled_startup_display, "state_provider", None + ) + refresh_s = float(os.getenv("OLED_REFRESH_S", "0.25")) + if refresh_s <= 0: + refresh_s = 0.25 + + ssid_refresh_s = float(os.getenv("OLED_SSID_REFRESH_S", "2.0")) + if ssid_refresh_s <= 0: + ssid_refresh_s = 2.0 + last_ssid_check = 0.0 + current_ssid = "" + ssid_scroll_offset = 0 + + while not stop_event.is_set(): + now = time.time() + if (now - last_ssid_check) >= ssid_refresh_s: + last_ssid_check = now + new_ssid = _get_wifi_ssid() + if new_ssid != current_ssid: + current_ssid = new_ssid + ssid_scroll_offset = 0 + + st = None + if state_provider is not None: + try: + st = state_provider() + except Exception: + st = None + + # Cache font/width for consistent layout and less overhead. + if "oled_font" not in locals(): + oled_font = _load_mono_font() + oled_max_chars = _max_chars_for_width(oled_font, 128 - 66) + + wifi_line = _make_wifi_line(current_ssid, oled_max_chars, ssid_scroll_offset) + # Scroll only when SSID is actually too long to fit. + if current_ssid and len((current_ssid or "").strip()) > max(0, oled_max_chars - len("WiFi: ")): + ssid_scroll_offset += 1 + + image = _render_screen(qr_img_64, st, wifi_line=wifi_line, font=oled_font, max_chars=oled_max_chars) + device.display(image) + time.sleep(refresh_s) + + return + except Exception as e: + last_error = e + time.sleep(0.5) + + if last_error is not None: + print(f"[OLED] init/display failed: {last_error}") + + t = threading.Thread(target=_run, daemon=True) + t.start() + _manager = OLEDStartupDisplay(stop_event=stop_event, thread=t) + + +def stop_oled_startup_display() -> None: + global _manager + if _manager is None: + return + _manager.stop_event.set() + _manager = None diff --git a/pyproject.toml b/pyproject.toml index 3f40e33..ed6f0e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,13 +7,17 @@ requires-python = ">=3.13" dependencies = [ "fastapi>=0.128.0", "gpiozero>=2.0.1", + "luma.oled", "matplotlib>=3.8.0", + "nmcli", "numpy>=2.4.0", + "pillow>=10.0.0", "pigpio>=1.78", "pyserial>=3.5", "pyusb>=1.3.1", "pyvisa>=1.16.0", "pyvisa-py>=0.8.1", + "qrcode[pil]>=7.4.2", "uvicorn[standard]>=0.35.0", ] diff --git a/uv.lock b/uv.lock index f0f9aab..bfb23cb 100644 --- a/uv.lock +++ b/uv.lock @@ -32,6 +32,29 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592, upload-time = "2026-01-06T11:45:19.497Z" }, ] +[[package]] +name = "cbor2" +version = "5.8.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d9/8e/8b4fdde28e42ffcd741a37f4ffa9fb59cd4fe01625b544dfcfd9ccb54f01/cbor2-5.8.0.tar.gz", hash = "sha256:b19c35fcae9688ac01ef75bad5db27300c2537eb4ee00ed07e05d8456a0d4931", size = 107825, upload-time = "2025-12-30T18:44:22.455Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a6/0d/5a3f20bafaefeb2c1903d961416f051c0950f0d09e7297a3aa6941596b29/cbor2-5.8.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:6d8d104480845e2f28c6165b4c961bbe58d08cb5638f368375cfcae051c28015", size = 70332, upload-time = "2025-12-30T18:43:54.694Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/57/66/177a3f089e69db69c987453ab4934086408c3338551e4984734597be9f80/cbor2-5.8.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:43efee947e5ab67d406d6e0dc61b5dee9d2f5e89ae176f90677a3741a20ca2e7", size = 285985, upload-time = "2025-12-30T18:43:55.733Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b7/8e/9e17b8e4ed80a2ce97e2dfa5915c169dbb31599409ddb830f514b57f96cc/cbor2-5.8.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:be7ae582f50be539e09c134966d0fd63723fc4789b8dff1f6c2e3f24ae3eaf32", size = 285173, upload-time = "2025-12-30T18:43:57.321Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/cc/33/9f92e107d78f88ac22723ac15d0259d220ba98c1d855e51796317f4c4114/cbor2-5.8.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:50f5c709561a71ea7970b4cd2bf9eda4eccacc0aac212577080fdfe64183e7f5", size = 278395, upload-time = "2025-12-30T18:43:58.497Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/2f/3f/46b80050a4a35ce5cf7903693864a9fdea7213567dc8faa6e25cb375c182/cbor2-5.8.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a6790ecc73aa93e76d2d9076fc42bf91a9e69f2295e5fa702e776dbe986465bd", size = 278330, upload-time = "2025-12-30T18:43:59.656Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/eb/d2/d41f8c04c783a4d204e364be2d38043d4f732a3bed6f4c732e321cf34c7b/cbor2-5.8.0-cp313-cp313-win_amd64.whl", hash = "sha256:c114af8099fa65a19a514db87ce7a06e942d8fea2730afd49be39f8e16e7f5e0", size = 69841, upload-time = "2025-12-30T18:44:01.159Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/1b/8c/0397a82f6e67665009951453c83058e4c77ba54b9a9017ede56d6870306c/cbor2-5.8.0-cp313-cp313-win_arm64.whl", hash = "sha256:ab3ba00494ad8669a459b12a558448d309c271fa4f89b116ad496ee35db38fea", size = 64982, upload-time = "2025-12-30T18:44:02.138Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/4b/0c/0654233d7543ac8a50f4785f172430ddc97538ba418eb305d6e529d1a120/cbor2-5.8.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:ad72381477133046ce217617d839ea4e9454f8b77d9a6351b229e214102daeb7", size = 70710, upload-time = "2025-12-30T18:44:03.209Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/84/62/4671d24e557d7f5a74a01b422c538925140c0495e57decde7e566f91d029/cbor2-5.8.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6da25190fad3434ce99876b11d4ca6b8828df6ca232cf7344cd14ae1166fb718", size = 285005, upload-time = "2025-12-30T18:44:05.109Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/87/85/0c67d763a08e848c9a80d7e4723ba497cce676f41bc7ca1828ae90a0a872/cbor2-5.8.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c13919e3a24c5a6d286551fa288848a4cedc3e507c58a722ccd134e461217d99", size = 282435, upload-time = "2025-12-30T18:44:06.465Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b2/01/0650972b4dbfbebcfbe37cbba7fc3cd9019a8da6397ab3446e07175e342b/cbor2-5.8.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:f8c40d32e5972047a777f9bf730870828f3cf1c43b3eb96fd0429c57a1d3b9e6", size = 277493, upload-time = "2025-12-30T18:44:07.609Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b3/6c/7704a4f32adc7f10f3b41ec067f500a4458f7606397af5e4cf2d368fd288/cbor2-5.8.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:7627894bc0b3d5d0807f31e3107e11b996205470c4429dc2bb4ef8bfe7f64e1e", size = 276085, upload-time = "2025-12-30T18:44:09.021Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/88/6d/e43452347630efe8133f5304127539100d937c138c0996d27ec63963ec2c/cbor2-5.8.0-cp314-cp314-win_amd64.whl", hash = "sha256:b51c5e59becae746ca4de2bbaa8a2f5c64a68fec05cea62941b1a84a8335f7d1", size = 71657, upload-time = "2025-12-30T18:44:10.162Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/8b/66/9a780ef34ab10a0437666232e885378cdd5f60197b1b5e61a62499e5a10a/cbor2-5.8.0-cp314-cp314-win_arm64.whl", hash = "sha256:53b630f4db4b9f477ad84077283dd17ecf9894738aa17ef4938c369958e02a71", size = 67171, upload-time = "2025-12-30T18:44:11.619Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/d6/4f/101071f880b4da05771128c0b89f41e334cff044dee05fb013c8f4be661c/cbor2-5.8.0-py3-none-any.whl", hash = "sha256:3727d80f539567b03a7aa11890e57798c67092c38df9e6c23abb059e0f65069c", size = 24374, upload-time = "2025-12-30T18:44:21.476Z" }, +] + [[package]] name = "click" version = "8.3.1" @@ -288,6 +311,32 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/80/be/3578e8afd18c88cdf9cb4cffde75a96d2be38c5a903f1ed0ceec061bd09e/kiwisolver-1.4.9-cp314-cp314t-win_arm64.whl", hash = "sha256:4a48a2ce79d65d363597ef7b567ce3d14d68783d2b2263d98db3d9477805ba32", size = 70260, upload-time = "2025-08-10T21:27:36.606Z" }, ] +[[package]] +name = "luma-core" +version = "2.5.3" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "cbor2" }, + { name = "pillow" }, + { name = "smbus2" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/e6/a3/0abb456daf2279483579bed6cf2a7305f93f56ab89f0f238f206fffce303/luma_core-2.5.3.tar.gz", hash = "sha256:ecfb1c12fc32f8ee6cff0f613804b2609387c17547f739d002649f2e6d56ec2f", size = 105745, upload-time = "2025-12-16T21:56:28.065Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/59/de/eb014859db3b59eaa35b157451121fbd8cffb96da8f4f52b4fa223fe0bc7/luma_core-2.5.3-py3-none-any.whl", hash = "sha256:ad466acb7bc805ad87cf1ed591d1d0588c3fa9900cba338d4eebf02a4226b95c", size = 72744, upload-time = "2025-12-16T21:56:26.277Z" }, +] + +[[package]] +name = "luma-oled" +version = "3.14.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "luma-core" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/95/a7/c5a51d4980a3a4db3b63570731eeac61f2e1142d5462f7e2866ed9d87070/luma_oled-3.14.0.tar.gz", hash = "sha256:36218565eda0614c8cf44ef42cb9a5904ddf808e4516e99ddae111fc93c5a206", size = 16429131, upload-time = "2024-11-02T23:20:47.97Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/08/a4/8bbfef1670dc46105e7c93508b99356e8214f9e75d0ddcf24159b654c61d/luma.oled-3.14.0-py2.py3-none-any.whl", hash = "sha256:c1a2063242e1732889be9e3508440de728bf5a5dbef8005b64248b8fd1c145fb", size = 32791, upload-time = "2024-11-02T23:20:45.874Z" }, +] + [[package]] name = "matplotlib" version = "3.10.8" @@ -342,13 +391,17 @@ source = { virtual = "." } dependencies = [ { name = "fastapi" }, { name = "gpiozero" }, + { name = "luma-oled" }, { name = "matplotlib" }, + { name = "nmcli" }, { name = "numpy" }, { name = "pigpio" }, + { name = "pillow" }, { name = "pyserial" }, { name = "pyusb" }, { name = "pyvisa" }, { name = "pyvisa-py" }, + { name = "qrcode", extra = ["pil"] }, { name = "uvicorn", extra = ["standard"] }, ] @@ -356,16 +409,29 @@ dependencies = [ requires-dist = [ { name = "fastapi", specifier = ">=0.128.0" }, { name = "gpiozero", specifier = ">=2.0.1" }, + { name = "luma-oled" }, { name = "matplotlib", specifier = ">=3.8.0" }, + { name = "nmcli" }, { name = "numpy", specifier = ">=2.4.0" }, { name = "pigpio", specifier = ">=1.78" }, + { name = "pillow", specifier = ">=10.0.0" }, { name = "pyserial", specifier = ">=3.5" }, { name = "pyusb", specifier = ">=1.3.1" }, { name = "pyvisa", specifier = ">=1.16.0" }, { name = "pyvisa-py", specifier = ">=0.8.1" }, + { name = "qrcode", extras = ["pil"], specifier = ">=7.4.2" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.35.0" }, ] +[[package]] +name = "nmcli" +version = "1.7.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/1f/fd/01e2cc2d84cf52653e59a2ce4fe0ebb79b388523be753befe843064394c5/nmcli-1.7.0.tar.gz", hash = "sha256:4fb17b6c33d276a264a27b7109fa1d70987570536fa8852b51830f9f7732f982", size = 21209, upload-time = "2026-01-03T02:15:20.137Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/8a/b7/648f9f6989c6c2860ebb403c762137578ca412fe9932b85ab33dc21517fa/nmcli-1.7.0-py3-none-any.whl", hash = "sha256:27144f03600d5e53c09cfa500cd7160dea61f83c3669ccd736401793b65ce980", size = 19905, upload-time = "2026-01-03T02:15:18.799Z" }, +] + [[package]] name = "numpy" version = "2.4.1" @@ -669,6 +735,23 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" }, ] +[[package]] +name = "qrcode" +version = "8.2" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/8f/b2/7fc2931bfae0af02d5f53b174e9cf701adbb35f39d69c2af63d4a39f81a9/qrcode-8.2.tar.gz", hash = "sha256:35c3f2a4172b33136ab9f6b3ef1c00260dd2f66f858f24d88418a015f446506c", size = 43317, upload-time = "2025-05-01T15:44:24.726Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/dd/b8/d2d6d731733f51684bbf76bf34dab3b70a9148e8f2cef2bb544fccec681a/qrcode-8.2-py3-none-any.whl", hash = "sha256:16e64e0716c14960108e85d853062c9e8bba5ca8252c0b4d0231b9df4060ff4f", size = 45986, upload-time = "2025-05-01T15:44:22.781Z" }, +] + +[package.optional-dependencies] +pil = [ + { name = "pillow" }, +] + [[package]] name = "setuptools" version = "80.9.0" @@ -687,6 +770,15 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "smbus2" +version = "0.6.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/4e/36/afafd43770caae69f04e21402552a8f94a072def46a002fab9357f4852ce/smbus2-0.6.0.tar.gz", hash = "sha256:9b5ff1e998e114730f9dfe0c4babbef06c92468cfb61eaa684e30f225661b95b", size = 17403, upload-time = "2025-12-20T09:02:52.017Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/a5/cf/2e1d6805da6f9c9b3a4358076ff2e072d828ba7fed124edc1b729e210c55/smbus2-0.6.0-py2.py3-none-any.whl", hash = "sha256:03d83d2a9a4afc5ddca0698ccabf101cb3de52bc5aefd7b76778ffb27ff654e0", size = 11849, upload-time = "2025-12-20T09:02:51.219Z" }, +] + [[package]] name = "starlette" version = "0.50.0" diff --git a/wifi_control.py b/wifi_control.py new file mode 100644 index 0000000..925b337 --- /dev/null +++ b/wifi_control.py @@ -0,0 +1,191 @@ +import os +from typing import Any, Optional + + +def _truthy(value: Optional[str]) -> bool: + if value is None: + return False + return value.strip().lower() in {"1", "true", "yes", "y", "on"} + + +def _nmcli(): + import nmcli # type: ignore + + # Ensure stable English-ish outputs internally. + try: + nmcli.set_lang("C.UTF-8") + except Exception: + pass + + # The python wrapper uses sudo by default. If we run as root, disable sudo. + try: + if hasattr(os, "geteuid") and os.geteuid() == 0: + nmcli.disable_use_sudo() + except Exception: + pass + + if _truthy(os.getenv("NMCLI_DISABLE_SUDO")): + try: + nmcli.disable_use_sudo() + except Exception: + pass + + return nmcli + + +def get_wifi_ifname() -> str: + return os.getenv("WIFI_IFNAME", "wlan0").strip() or "wlan0" + + +def get_current_ssid(*, ifname: Optional[str] = None) -> Optional[str]: + """Return current connected WiFi SSID if any, else None.""" + nmcli = _nmcli() + ifname = ifname or get_wifi_ifname() + + # Preferred: find "in_use" AP from scan list (no rescan by default). + try: + aps = nmcli.device.wifi(ifname=ifname, rescan=False) + for ap in aps or []: + in_use = getattr(ap, "in_use", None) + ssid = getattr(ap, "ssid", None) + if in_use in (True, "*", "yes") and ssid: + return str(ssid) + except Exception: + pass + + # Fallback: device status provides active connection name (often equals SSID). + try: + devices = nmcli.device.status() + for d in devices or []: + dtype = (getattr(d, "type", "") or "").lower() + if "wifi" not in dtype and "wireless" not in dtype: + continue + if (getattr(d, "device", None) or getattr(d, "ifname", None) or "") != ifname: + continue + state = (getattr(d, "state", "") or "").lower() + conn = getattr(d, "connection", None) or getattr(d, "con", None) + if "connected" in state and conn and conn not in {"--", "-"}: + return str(conn) + except Exception: + pass + + return None + + +def wifi_connect( + ssid: str, + password: str = "", + *, + ifname: Optional[str] = None, + wait: Optional[int] = None, +) -> None: + """Connect to the given SSID using NetworkManager (nmcli wrapper).""" + nmcli = _nmcli() + ifname = ifname or get_wifi_ifname() + + ssid = (ssid or "").strip() + if not ssid: + raise ValueError("ssid is required") + + if wait is None: + wait = int(os.getenv("WIFI_CONNECT_WAIT", "30")) + + # Ensure Wi-Fi radio is on. + try: + nmcli.radio.wifi_on() + except Exception: + pass + + nmcli.device.wifi_connect(ssid, password or "", ifname=ifname, wait=wait) + + +def _hotspot_config() -> dict[str, Any]: + return { + "ifname": get_wifi_ifname(), + "con_name": os.getenv("AP_CON_NAME", "motor-scan-ap").strip() or "motor-scan-ap", + "ssid": os.getenv("AP_SSID", "motor-scan").strip() or "motor-scan", + "password": os.getenv("AP_PASSWORD", "motor-scan-1234").strip() or "motor-scan-1234", + "band": os.getenv("AP_BAND", "bg").strip() or None, + "channel": int(os.getenv("AP_CHANNEL", "6")), + } + + +def hotspot_up() -> dict[str, Any]: + """Start an AP (hotspot) if not connected to WiFi.""" + nmcli = _nmcli() + cfg = _hotspot_config() + + # NetworkManager requires WPA2 password >= 8 chars if provided. + if cfg["password"] and len(cfg["password"]) < 8: + raise ValueError("AP_PASSWORD must be at least 8 characters") + + try: + nmcli.radio.wifi_on() + except Exception: + pass + + hs = nmcli.device.wifi_hotspot( + ifname=cfg["ifname"], + con_name=cfg["con_name"], + ssid=cfg["ssid"], + band=cfg["band"], + channel=cfg["channel"], + password=cfg["password"], + ) + + return { + "mode": "ap", + "ifname": cfg["ifname"], + "con_name": cfg["con_name"], + "ssid": cfg["ssid"], + "channel": cfg["channel"], + "band": cfg["band"], + "hotspot": getattr(hs, "__dict__", None) or str(hs), + } + + +def hotspot_down() -> None: + nmcli = _nmcli() + cfg = _hotspot_config() + try: + nmcli.connection.down(cfg["con_name"], wait=10) + except Exception: + pass + + if _truthy(os.getenv("AP_DELETE_ON_CONNECT", "0")): + try: + nmcli.connection.delete(cfg["con_name"], wait=10) + except Exception: + pass + + +def get_wifi_mode() -> str: + """Return 'client', 'ap' or 'none' (best-effort).""" + ssid = get_current_ssid() + if ssid: + return "client" + + nmcli = _nmcli() + cfg = _hotspot_config() + try: + active = nmcli.connection.show_all(active=True) + for c in active or []: + name = getattr(c, "name", None) or getattr(c, "connection", None) + if name == cfg["con_name"]: + return "ap" + except Exception: + pass + + return "none" + + +def ensure_wifi_or_start_ap() -> dict[str, Any]: + """If no WiFi connection, start AP; return the resulting mode and details.""" + if not _truthy(os.getenv("WIFI_MANAGE_ENABLED", "1")): + return {"mode": "disabled"} + + ssid = get_current_ssid() + if ssid: + return {"mode": "client", "ssid": ssid} + + return hotspot_up()