192 lines
5.3 KiB
Python
192 lines
5.3 KiB
Python
import os
|
|
from typing import Any, Optional
|
|
|
|
|
|
def _truthy(value: Optional[str]) -> bool:
|
|
if value is None:
|
|
return False
|
|
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
|
|
|
|
|
|
def _nmcli():
    """Import the ``nmcli`` wrapper lazily, apply best-effort setup, return it.

    Every setup step is optional: any exception it raises is swallowed so
    that a wrapper lacking a given hook still yields a usable module.
    """
    import nmcli as client  # type: ignore

    # Pin the locale so command output stays in a stable, English-ish form.
    try:
        client.set_lang("C.UTF-8")
    except Exception:
        pass

    # The python wrapper uses sudo by default; when already running as root
    # there is no need for it. ``geteuid`` is absent on some platforms.
    try:
        running_as_root = hasattr(os, "geteuid") and os.geteuid() == 0
        if running_as_root:
            client.disable_use_sudo()
    except Exception:
        pass

    # Explicit opt-out through the environment, independent of the root check.
    sudo_opt_out = os.getenv("NMCLI_DISABLE_SUDO")
    if _truthy(sudo_opt_out):
        try:
            client.disable_use_sudo()
        except Exception:
            pass

    return client
|
|
|
|
|
|
def get_wifi_ifname() -> str:
    """Return the Wi-Fi interface name taken from ``WIFI_IFNAME``.

    Falls back to "wlan0" when the variable is unset or contains only
    whitespace.
    """
    fallback = "wlan0"
    configured = os.getenv("WIFI_IFNAME", fallback).strip()
    return configured if configured else fallback
|
|
|
|
|
|
def get_current_ssid(*, ifname: Optional[str] = None) -> Optional[str]:
    """Return the SSID of the Wi-Fi network *ifname* is connected to, else None.

    Args:
        ifname: Interface to inspect; defaults to ``get_wifi_ifname()``.

    Returns:
        The SSID as a string, or None when no client connection is found.
        Both lookups are best-effort: any exception raised by the nmcli
        wrapper is swallowed and treated as "nothing found".
    """
    nmcli = _nmcli()
    ifname = ifname or get_wifi_ifname()

    # Preferred: find the "in_use" AP in the cached scan list (no rescan).
    try:
        for ap in nmcli.device.wifi(ifname=ifname, rescan=False) or []:
            in_use = getattr(ap, "in_use", None)
            ssid = getattr(ap, "ssid", None)
            if in_use in (True, "*", "yes") and ssid:
                return str(ssid)
    except Exception:
        pass

    # Fallback: device status provides the active connection name, which
    # often equals the SSID.
    try:
        # Our own hotspot profile also shows up as a "connected" Wi-Fi device;
        # it is not a client connection, so it must never be reported here.
        ap_con_name = _hotspot_config()["con_name"]
        for dev in nmcli.device.status() or []:
            # NOTE(review): the nmcli wrapper's Device record appears to expose
            # ``device_type``; ``type`` is kept for backward compatibility.
            dtype = (
                getattr(dev, "device_type", None)
                or getattr(dev, "type", None)
                or ""
            ).lower()
            if "wifi" not in dtype and "wireless" not in dtype:
                continue
            dev_name = getattr(dev, "device", None) or getattr(dev, "ifname", None) or ""
            if dev_name != ifname:
                continue
            state = (getattr(dev, "state", "") or "").lower()
            # Prefix match: a substring test would also accept "disconnected".
            if not state.startswith("connected"):
                continue
            conn = getattr(dev, "connection", None) or getattr(dev, "con", None)
            if not conn or conn in ("--", "-") or conn == ap_con_name:
                continue
            return str(conn)
    except Exception:
        pass

    return None
|
|
|
|
|
|
def wifi_connect(
    ssid: str,
    password: str = "",
    *,
    ifname: Optional[str] = None,
    wait: Optional[int] = None,
) -> None:
    """Connect to the given SSID using NetworkManager (nmcli wrapper).

    Args:
        ssid: Network name; surrounding whitespace is stripped.
        password: Passphrase; an empty string is passed through as-is.
        ifname: Interface to use; defaults to ``get_wifi_ifname()``.
        wait: Seconds handed to the wrapper's ``wait`` option. When None it
            is read from ``WIFI_CONNECT_WAIT`` (default 30; a blank value
            also means 30).

    Raises:
        ValueError: If *ssid* is empty, or ``WIFI_CONNECT_WAIT`` is set to a
            non-integer value.
        Exception: Whatever the nmcli wrapper raises when connecting fails.
    """
    # Validate everything up front so a bad call fails fast, before nmcli is
    # imported/configured or the radio is touched.
    ssid = (ssid or "").strip()
    if not ssid:
        raise ValueError("ssid is required")

    if wait is None:
        raw_wait = (os.getenv("WIFI_CONNECT_WAIT") or "").strip() or "30"
        try:
            wait = int(raw_wait)
        except ValueError:
            raise ValueError(
                f"WIFI_CONNECT_WAIT must be an integer number of seconds, got {raw_wait!r}"
            ) from None

    nmcli = _nmcli()
    ifname = ifname or get_wifi_ifname()

    # Ensure Wi-Fi radio is on (best-effort; the connect call below reports
    # the real failure if the radio could not be enabled).
    try:
        nmcli.radio.wifi_on()
    except Exception:
        pass

    nmcli.device.wifi_connect(ssid, password or "", ifname=ifname, wait=wait)
|
|
|
|
|
|
def _hotspot_config() -> dict[str, Any]:
    """Collect the hotspot (AP) settings from the environment.

    Returns:
        A dict with keys ``ifname``, ``con_name``, ``ssid``, ``password``,
        ``band`` and ``channel``. Blank values fall back to the built-in
        defaults, except ``AP_BAND`` and ``AP_CHANNEL``: setting either to
        a blank string yields ``None`` for that key.

    Raises:
        ValueError: If ``AP_CHANNEL`` is set to a non-integer value.
    """
    # A blank AP_CHANNEL mirrors a blank AP_BAND (-> None) instead of
    # crashing on int("").
    # NOTE(review): presumably None lets NetworkManager pick the channel —
    # confirm against the wrapper's wifi_hotspot().
    channel_raw = os.getenv("AP_CHANNEL", "6").strip()
    return {
        "ifname": get_wifi_ifname(),
        "con_name": os.getenv("AP_CON_NAME", "motor-scan-ap").strip() or "motor-scan-ap",
        "ssid": os.getenv("AP_SSID", "motor-scan").strip() or "motor-scan",
        "password": os.getenv("AP_PASSWORD", "motor-scan-1234").strip() or "motor-scan-1234",
        "band": os.getenv("AP_BAND", "bg").strip() or None,
        "channel": int(channel_raw) if channel_raw else None,
    }
|
|
|
|
|
|
def hotspot_up() -> dict[str, Any]:
    """Bring up the configured access point (hotspot) and describe it.

    This function does not check for an existing Wi-Fi connection itself;
    callers that only want an AP when offline must check first.

    Returns:
        A dict with ``mode`` set to "ap", the ``ifname``, ``con_name``,
        ``ssid``, ``channel`` and ``band`` that were used, and ``hotspot``
        holding the wrapper's result (its ``__dict__`` when that is
        non-empty, otherwise its ``str()``).

    Raises:
        ValueError: If the configured password is shorter than 8 characters.
        Exception: Whatever the nmcli wrapper raises when starting the AP.
    """
    client = _nmcli()
    settings = _hotspot_config()

    # NetworkManager requires WPA2 password >= 8 chars if provided.
    secret = settings["password"]
    if secret and len(secret) < 8:
        raise ValueError("AP_PASSWORD must be at least 8 characters")

    # Best-effort: switch the radio on; a failure here is ignored.
    try:
        client.radio.wifi_on()
    except Exception:
        pass

    hotspot_keys = ("ifname", "con_name", "ssid", "band", "channel", "password")
    started = client.device.wifi_hotspot(
        **{key: settings[key] for key in hotspot_keys}
    )

    summary: dict[str, Any] = {"mode": "ap"}
    for key in ("ifname", "con_name", "ssid", "channel", "band"):
        summary[key] = settings[key]
    summary["hotspot"] = getattr(started, "__dict__", None) or str(started)
    return summary
|
|
|
|
|
|
def hotspot_down() -> None:
    """Deactivate the hotspot connection and optionally delete its profile.

    Both steps are best-effort: exceptions from the nmcli wrapper are
    ignored. The profile is deleted only when ``AP_DELETE_ON_CONNECT`` is
    set to a truthy value.
    """
    client = _nmcli()
    ap_name = _hotspot_config()["con_name"]

    try:
        client.connection.down(ap_name, wait=10)
    except Exception:
        pass

    if not _truthy(os.getenv("AP_DELETE_ON_CONNECT", "0")):
        return

    try:
        client.connection.delete(ap_name, wait=10)
    except Exception:
        pass
|
|
|
|
|
|
def get_wifi_mode() -> str:
    """Return 'client', 'ap' or 'none' (best-effort).

    'client' is returned when ``get_current_ssid()`` finds a connection,
    'ap' when the configured hotspot profile is among the active
    connections, and 'none' otherwise (including when the active-connection
    lookup raises).
    """
    if get_current_ssid():
        return "client"

    nmcli = _nmcli()
    ap_name = _hotspot_config()["con_name"]
    try:
        for entry in nmcli.connection.show_all(active=True) or []:
            if isinstance(entry, dict):
                # NOTE(review): the nmcli wrapper appears to return
                # dict-shaped detail records here; the exact key for the
                # profile name is not confirmed, so several are tried.
                name = (
                    entry.get("NAME")
                    or entry.get("name")
                    or entry.get("connection.id")
                    or entry.get("GENERAL.NAME")
                )
            else:
                name = getattr(entry, "name", None) or getattr(entry, "connection", None)
            if name == ap_name:
                return "ap"
    except Exception:
        pass

    return "none"
|
|
|
|
|
|
def ensure_wifi_or_start_ap() -> dict[str, Any]:
    """Report the current Wi-Fi client connection, or start the hotspot.

    Returns:
        ``{"mode": "disabled"}`` when ``WIFI_MANAGE_ENABLED`` is set to a
        non-truthy value (it defaults to "1"); ``{"mode": "client",
        "ssid": ...}`` when a connection is found; otherwise the result of
        ``hotspot_up()``.
    """
    manage_flag = os.getenv("WIFI_MANAGE_ENABLED", "1")
    if _truthy(manage_flag):
        current = get_current_ssid()
        return {"mode": "client", "ssid": current} if current else hotspot_up()
    return {"mode": "disabled"}
|