motor-scan-server/wifi_control.py
2026-01-28 14:40:48 +08:00

192 lines
5.3 KiB
Python

import os
from typing import Any, Optional
def _truthy(value: Optional[str]) -> bool:
    """Interpret an environment-style string as a boolean flag.

    Returns True only when *value*, after trimming whitespace and lowercasing,
    is one of ``1``, ``true``, ``yes``, ``y`` or ``on``. ``None`` and every
    other string yield False.
    """
    normalized = "" if value is None else value.strip().lower()
    return normalized in ("1", "true", "yes", "y", "on")
def _nmcli():
    """Import the ``nmcli`` wrapper lazily, configure it, and return the module.

    Every configuration step is best-effort: a failure in any of them is
    ignored so that the module is still returned to the caller.
    """
    import nmcli as nm  # type: ignore

    # Pin the output language so the wrapper sees stable, English-like text.
    try:
        nm.set_lang("C.UTF-8")
    except Exception:
        pass

    # The wrapper prefixes commands with sudo by default; that is pointless
    # when the process already runs as root.
    try:
        running_as_root = hasattr(os, "geteuid") and os.geteuid() == 0
        if running_as_root:
            nm.disable_use_sudo()
    except Exception:
        pass

    # Explicit opt-out via the NMCLI_DISABLE_SUDO environment flag.
    if not _truthy(os.getenv("NMCLI_DISABLE_SUDO")):
        return nm
    try:
        nm.disable_use_sudo()
    except Exception:
        pass
    return nm
def get_wifi_ifname() -> str:
    """Return the Wi-Fi interface name taken from ``WIFI_IFNAME``.

    The value is whitespace-trimmed; an unset or blank variable yields
    ``"wlan0"``.
    """
    fallback = "wlan0"
    configured = os.getenv("WIFI_IFNAME", fallback).strip()
    return configured if configured else fallback
def _in_use_ssid(aps: Any) -> Optional[str]:
    """Return the SSID of the first scan entry flagged as in use, else None."""
    for ap in aps or []:
        in_use = getattr(ap, "in_use", None)
        ssid = getattr(ap, "ssid", None)
        # True is what the wrapper reports; "*" / "yes" cover raw nmcli values.
        if in_use in (True, "*", "yes") and ssid:
            return str(ssid)
    return None


def _connected_profile(
    devices: Any, ifname: str, ignored: Optional[str] = None
) -> Optional[str]:
    """Return the active connection name of Wi-Fi device *ifname*, else None.

    *ignored* names a connection profile that must not be reported (used to
    skip the hotspot profile, which is not a client connection).
    """
    for dev in devices or []:
        # The wrapper's Device objects expose ``device_type``; ``type`` is
        # kept as a secondary spelling for other object shapes.
        dev_type = (
            getattr(dev, "device_type", None) or getattr(dev, "type", None) or ""
        ).lower()
        if "wifi" not in dev_type and "wireless" not in dev_type:
            continue
        dev_name = getattr(dev, "device", None) or getattr(dev, "ifname", None) or ""
        if dev_name != ifname:
            continue
        state = (getattr(dev, "state", "") or "").lower()
        # A substring test would also accept "disconnected"; anchor at the
        # start so only "connected" / "connected (...)" states qualify.
        if not state.startswith("connected"):
            continue
        conn = getattr(dev, "connection", None) or getattr(dev, "con", None)
        if not conn or conn in {"--", "-"} or conn == ignored:
            continue
        return str(conn)
    return None


def get_current_ssid(*, ifname: Optional[str] = None) -> Optional[str]:
    """Return the SSID of the currently connected Wi-Fi network, else None.

    Args:
        ifname: Interface to inspect; defaults to ``get_wifi_ifname()``.

    Returns:
        The SSID of the access point marked in use in the cached scan list.
        If that lookup yields nothing, the name of the active connection on
        the interface (often equal to the SSID). ``None`` when neither lookup
        finds a client connection.

    Both lookups are best-effort: any exception they raise is swallowed and
    treated as "nothing found". The hotspot's own connection profile is never
    reported, because an interface running the AP also shows up as
    "connected" in the device status.
    """
    nmcli = _nmcli()
    ifname = ifname or get_wifi_ifname()

    # Preferred: the in-use AP from the scan list (no rescan requested).
    try:
        ssid = _in_use_ssid(nmcli.device.wifi(ifname=ifname, rescan=False))
        if ssid:
            return ssid
    except Exception:
        pass

    # Fallback: the device status carries the active connection name.
    try:
        ap_con_name = _hotspot_config()["con_name"]
        return _connected_profile(nmcli.device.status(), ifname, ap_con_name)
    except Exception:
        pass
    return None
def wifi_connect(
    ssid: str,
    password: str = "",
    *,
    ifname: Optional[str] = None,
    wait: Optional[int] = None,
) -> None:
    """Connect to the given SSID using NetworkManager (nmcli wrapper).

    Args:
        ssid: Network name; surrounding whitespace is stripped.
        password: Passphrase; an empty or falsy value is passed on as "".
        ifname: Interface to use; defaults to ``get_wifi_ifname()``.
        wait: Value forwarded as the wrapper's ``wait`` argument. When
            omitted it is read from ``WIFI_CONNECT_WAIT``; an unset, blank
            or non-numeric value falls back to 30.

    Raises:
        ValueError: If *ssid* is empty after stripping.
        Exception: Whatever the wrapper's ``device.wifi_connect`` raises is
            propagated unchanged.
    """
    # Validate first so bad input fails fast, before the wrapper is imported
    # or NetworkManager is touched.
    ssid = (ssid or "").strip()
    if not ssid:
        raise ValueError("ssid is required")

    if wait is None:
        raw_wait = (os.getenv("WIFI_CONNECT_WAIT") or "").strip()
        try:
            wait = int(raw_wait) if raw_wait else 30
        except ValueError:
            # A malformed setting must not masquerade as an input error.
            wait = 30

    nmcli = _nmcli()
    ifname = ifname or get_wifi_ifname()

    # Ensure the Wi-Fi radio is on (best-effort).
    try:
        nmcli.radio.wifi_on()
    except Exception:
        pass
    nmcli.device.wifi_connect(ssid, password or "", ifname=ifname, wait=wait)
def _hotspot_config() -> dict[str, Any]:
    """Collect the hotspot (AP) settings from environment variables.

    Returns a dict with these keys:
        ifname:   result of ``get_wifi_ifname()``.
        con_name: ``AP_CON_NAME``, default ``"motor-scan-ap"``.
        ssid:     ``AP_SSID``, default ``"motor-scan"``.
        password: ``AP_PASSWORD``, default ``"motor-scan-1234"``.
        band:     ``AP_BAND``, default ``"bg"``; ``None`` when set but blank.
        channel:  ``AP_CHANNEL`` as int, default 6.

    String values are whitespace-trimmed. A blank or non-numeric
    ``AP_CHANNEL`` falls back to 6 instead of raising, so callers that only
    need ``con_name`` are not broken by an unrelated bad setting.
    """
    default_channel = 6
    raw_channel = os.getenv("AP_CHANNEL", str(default_channel)).strip()
    try:
        channel = int(raw_channel) if raw_channel else default_channel
    except ValueError:
        channel = default_channel

    return {
        "ifname": get_wifi_ifname(),
        "con_name": os.getenv("AP_CON_NAME", "motor-scan-ap").strip() or "motor-scan-ap",
        "ssid": os.getenv("AP_SSID", "motor-scan").strip() or "motor-scan",
        "password": os.getenv("AP_PASSWORD", "motor-scan-1234").strip() or "motor-scan-1234",
        "band": os.getenv("AP_BAND", "bg").strip() or None,
        "channel": channel,
    }
def hotspot_up() -> dict[str, Any]:
    """Start the configured access point (hotspot) through the nmcli wrapper.

    This function does not check the current connection state itself; it
    always attempts to bring the hotspot up.

    Returns:
        A dict with ``mode`` set to ``"ap"``, the ``ifname``, ``con_name``,
        ``ssid``, ``channel`` and ``band`` that were used, and ``hotspot``:
        the ``__dict__`` of the object returned by the wrapper, or its string
        form when that is missing or empty.

    Raises:
        ValueError: If the configured password is non-empty but shorter than
            8 characters.
    """
    nm = _nmcli()
    settings = _hotspot_config()

    # NetworkManager requires a WPA2 password of >= 8 chars if one is given.
    passphrase = settings["password"]
    if passphrase and len(passphrase) < 8:
        raise ValueError("AP_PASSWORD must be at least 8 characters")

    # Turning the radio on is best-effort.
    try:
        nm.radio.wifi_on()
    except Exception:
        pass

    hotspot = nm.device.wifi_hotspot(
        ifname=settings["ifname"],
        con_name=settings["con_name"],
        ssid=settings["ssid"],
        band=settings["band"],
        channel=settings["channel"],
        password=passphrase,
    )

    # NOTE(review): if the returned object carries the passphrase, it is
    # exposed through the "hotspot" entry below — confirm this is intended.
    details = getattr(hotspot, "__dict__", None) or str(hotspot)

    summary: dict[str, Any] = {"mode": "ap"}
    for key in ("ifname", "con_name", "ssid", "channel", "band"):
        summary[key] = settings[key]
    summary["hotspot"] = details
    return summary
def hotspot_down() -> None:
    """Deactivate the hotspot connection and optionally delete its profile.

    Both steps are best-effort: errors from the wrapper are ignored. The
    profile is deleted only when the ``AP_DELETE_ON_CONNECT`` environment
    flag is truthy (default ``"0"``).
    """
    nm = _nmcli()
    con_name = _hotspot_config()["con_name"]

    try:
        nm.connection.down(con_name, wait=10)
    except Exception:
        pass

    if not _truthy(os.getenv("AP_DELETE_ON_CONNECT", "0")):
        return
    try:
        nm.connection.delete(con_name, wait=10)
    except Exception:
        pass
def get_wifi_mode() -> str:
    """Classify the current Wi-Fi state as 'client', 'ap' or 'none'.

    'client' is returned when ``get_current_ssid()`` reports an SSID. If it
    does not, 'ap' is returned when the hotspot connection name appears among
    the active connections. Anything else, including a failure while listing
    connections, yields 'none'.
    """
    if get_current_ssid():
        return "client"

    nm = _nmcli()
    ap_name = _hotspot_config()["con_name"]
    try:
        active_connections = nm.connection.show_all(active=True) or []
        hotspot_active = any(
            (getattr(con, "name", None) or getattr(con, "connection", None)) == ap_name
            for con in active_connections
        )
        if hotspot_active:
            return "ap"
    except Exception:
        return "none"
    return "none"
def ensure_wifi_or_start_ap() -> dict[str, Any]:
    """Report the current Wi-Fi client connection, or start the hotspot.

    Returns:
        ``{"mode": "disabled"}`` when the ``WIFI_MANAGE_ENABLED`` flag
        (default ``"1"``) is not truthy; ``{"mode": "client", "ssid": ...}``
        when an SSID is currently connected; otherwise the result of
        ``hotspot_up()``.
    """
    managed = _truthy(os.getenv("WIFI_MANAGE_ENABLED", "1"))
    if not managed:
        return {"mode": "disabled"}

    current = get_current_ssid()
    return {"mode": "client", "ssid": current} if current else hotspot_up()