396 lines
11 KiB
Python
396 lines
11 KiB
Python
import os
|
|
import socket
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Optional, Callable, Dict, Any
|
|
|
|
|
|
def _truthy(value: Optional[str]) -> bool:
|
|
if value is None:
|
|
return False
|
|
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
|
|
|
|
|
|
def _get_lan_ip() -> Optional[str]:
|
|
"""Best-effort LAN IP detection.
|
|
|
|
Uses a UDP socket trick (no packets actually need to be received) to learn
|
|
which local interface would be used to reach the internet.
|
|
"""
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
s.connect(("8.8.8.8", 80))
|
|
ip = s.getsockname()[0]
|
|
if ip and ip != "0.0.0.0":
|
|
return ip
|
|
finally:
|
|
s.close()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
hostname = socket.gethostname()
|
|
ip = socket.gethostbyname(hostname)
|
|
if ip and ip != "127.0.0.1":
|
|
return ip
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def build_public_url() -> str:
    """Build a URL that should be reachable from LAN clients.

    Resolution order:
      1. ``PUBLIC_URL`` (trimmed) is returned verbatim when set.
      2. Otherwise the host is the detected LAN IP, else the ``HOST``
         environment variable.
      3. A blank, wildcard ("0.0.0.0") or "127.0.0.1" host is not something
         a client can connect to, so "raspberrypi.local" is used instead.

    The scheme comes from ``PUBLIC_SCHEME`` and defaults to "http".
    """
    explicit = os.getenv("PUBLIC_URL", "").strip()
    if explicit:
        return explicit

    # Probe the LAN address once; repeating the probe when it just failed
    # only adds another round of socket/DNS work.
    host = _get_lan_ip() or os.getenv("HOST", "").strip()

    # A blank HOST would otherwise produce the unusable URL "http://".
    if host in {"", "0.0.0.0", "127.0.0.1"}:
        host = "raspberrypi.local"

    scheme = os.getenv("PUBLIC_SCHEME", "http").strip() or "http"
    return f"{scheme}://{host}"
|
|
|
|
|
|
from luma.core.interface.serial import i2c
|
|
from luma.oled.device import ssd1306
|
|
|
|
def _init_oled_device():
    """Create the 128x64 SSD1306 device object from environment settings.

    Reads ``OLED_I2C_ADDRESS`` (default "0x3C"; parsed with base
    auto-detection, so hex and decimal both work), ``OLED_I2C_PORT``
    (default "1") and ``OLED_ROTATE`` (default "0"). All three are parsed
    before the I2C interface is created, so a malformed value raises
    ``ValueError`` without touching the bus.
    """
    address, port, rotate = (
        int(os.getenv("OLED_I2C_ADDRESS", "0x3C"), 0),
        int(os.getenv("OLED_I2C_PORT", "1")),
        int(os.getenv("OLED_ROTATE", "0")),
    )
    bus = i2c(port=port, address=address)
    return ssd1306(bus, width=128, height=64, rotate=rotate)
|
|
|
|
|
|
def _make_qr_64(url: str):
    """Render *url* as a 1-bit QR code image scaled to 64x64 pixels.

    The QR version is chosen automatically (``fit=True``) with error
    correction level M, a box size of 2 and a one-module border. The result
    is meant for the left half of the 128x64 screen.
    """
    import qrcode
    from PIL import Image

    builder = qrcode.QRCode(
        version=None,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=2,
        border=1,
    )
    builder.add_data(url)
    builder.make(fit=True)

    # Explicit black/white keeps near-black values from being thresholded
    # to all-black when the image is converted to 1-bit.
    rendered = builder.make_image(fill_color="black", back_color="white")
    bilevel = rendered.convert("1")

    # Force the square into exactly 64x64; nearest-neighbour keeps edges hard.
    return bilevel.resize((64, 64), Image.NEAREST)
|
|
|
|
|
|
def _format_state_lines(state: Optional[Dict[str, Any]]) -> list[str]:
|
|
if not state:
|
|
return ["state: n/a"]
|
|
|
|
dis_steps = state.get("dis")
|
|
phase = state.get("phase")
|
|
freq = state.get("freq")
|
|
p2p = state.get("p2p")
|
|
|
|
def _fmt_float(value: Any, *, decimals: int) -> str:
|
|
try:
|
|
if value is None or isinstance(value, bool):
|
|
return "-"
|
|
f = float(value)
|
|
return f"{f:.{decimals}f}" if decimals > 0 else f"{f:.0f}"
|
|
except Exception:
|
|
return "-"
|
|
|
|
# Keep units explicit as requested; no labels; we'll right-align later.
|
|
try:
|
|
dis_mm = float(dis_steps or 0) / 1600.0
|
|
except Exception:
|
|
dis_mm = None
|
|
|
|
try:
|
|
freq_khz = (float(freq) / 1000.0) if freq is not None else None
|
|
except Exception:
|
|
freq_khz = None
|
|
|
|
lines: list[str] = []
|
|
lines.append(f"{_fmt_float(dis_mm, decimals=2)} mm")
|
|
lines.append(f"{_fmt_float(freq_khz, decimals=2)} kHz")
|
|
lines.append(f"{_fmt_float(phase, decimals=3)} rad")
|
|
lines.append(f"{_fmt_float(p2p, decimals=0)} ADC")
|
|
return lines
|
|
|
|
|
|
def _load_mono_font():
    """Load a monospace font for the OLED text column.

    Tries, in order, the path in ``OLED_FONT_PATH`` (when set) and then a
    few well-known DejaVu/Liberation mono TTF locations, at the point size
    given by ``OLED_FONT_SIZE`` (default "10"). The first file that loads
    wins; if none does, PIL's built-in default font is returned.
    """
    from PIL import ImageFont

    search_paths = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf",
    ]
    override = os.getenv("OLED_FONT_PATH", "").strip()
    if override:
        # A user-supplied font takes priority over the system candidates.
        search_paths.insert(0, override)

    point_size = int(os.getenv("OLED_FONT_SIZE", "10"))
    for font_file in search_paths:
        try:
            return ImageFont.truetype(font_file, size=point_size)
        except Exception:
            # Missing or unreadable file: move on to the next candidate.
            pass

    # Fallback: PIL default (often bitmap monospace)
    return ImageFont.load_default()
|
|
|
|
|
|
def _max_chars_for_width(font, max_px: int) -> int:
|
|
try:
|
|
# Estimate monospace char width from '0'
|
|
bbox = font.getbbox("0")
|
|
char_w = max(1, bbox[2] - bbox[0])
|
|
return max(1, int(max_px // char_w))
|
|
except Exception:
|
|
# Conservative fallback
|
|
return max(1, int(max_px // 6))
|
|
|
|
|
|
def _get_wifi_ssid() -> str:
|
|
"""Best-effort current WiFi SSID.
|
|
|
|
Tries common tools on Raspberry Pi OS. Never raises.
|
|
"""
|
|
env = os.getenv("WIFI_SSID", "").strip()
|
|
if env:
|
|
return env
|
|
|
|
def _run(cmd: list[str]) -> str:
|
|
try:
|
|
p = subprocess.run(cmd, capture_output=True, text=True, timeout=0.5)
|
|
if p.returncode == 0:
|
|
return (p.stdout or "").strip()
|
|
except Exception:
|
|
return ""
|
|
return ""
|
|
|
|
# Most minimal dependency
|
|
ssid = _run(["iwgetid", "-r"])
|
|
if ssid:
|
|
return ssid
|
|
|
|
# NetworkManager
|
|
out = _run(["nmcli", "-t", "-f", "active,ssid", "dev", "wifi"])
|
|
if out:
|
|
for line in out.splitlines():
|
|
# yes:<ssid>
|
|
if line.startswith("yes:"):
|
|
return line.split(":", 1)[1].strip()
|
|
|
|
# wpa_supplicant
|
|
out = _run(["wpa_cli", "-i", "wlan0", "status"])
|
|
if out:
|
|
for line in out.splitlines():
|
|
if line.startswith("ssid="):
|
|
return line.split("=", 1)[1].strip()
|
|
|
|
return ""
|
|
|
|
|
|
def _make_wifi_line(ssid: str, max_chars: int, scroll_offset: int) -> str:
|
|
"""Build the right-top WiFi line with optional scrolling.
|
|
|
|
The prefix is fixed; only the SSID part scrolls when it doesn't fit.
|
|
"""
|
|
prefix = "WiFi:"
|
|
if max_chars <= 0:
|
|
return ""
|
|
|
|
ssid = (ssid or "").strip() or "-"
|
|
|
|
# Keep prefix visible when possible.
|
|
# We render as: "WiFi:" + " " + <ssid-part>
|
|
join = " "
|
|
base = prefix + join
|
|
|
|
if len(base) >= max_chars:
|
|
# Not enough space even for full prefix; truncate prefix.
|
|
return (prefix[:max_chars]).ljust(max_chars)
|
|
|
|
avail = max_chars - len(base)
|
|
if len(ssid) <= avail:
|
|
return (base + ssid).ljust(max_chars)
|
|
|
|
pad = os.getenv("OLED_SSID_SCROLL_PAD", " ")
|
|
if not pad:
|
|
pad = " "
|
|
scroll_text = ssid + pad
|
|
if not scroll_text:
|
|
return (base + "-").ljust(max_chars)
|
|
|
|
offset = scroll_offset % len(scroll_text)
|
|
# Wrap-around slice of length `avail`
|
|
doubled = scroll_text + scroll_text
|
|
window = doubled[offset : offset + avail]
|
|
return (base + window).ljust(max_chars)
|
|
|
|
|
|
def _render_screen(
    qr_img_64,
    state: Optional[Dict[str, Any]],
    *,
    wifi_line: Optional[str] = None,
    font=None,
    max_chars: Optional[int] = None,
):
    """Compose one 128x64 1-bit frame: QR code on the left, text on the right.

    Args:
        qr_img_64: image pasted at the top-left corner.
        state: mapping passed to ``_format_state_lines`` for the bottom rows.
        wifi_line: text for the top row of the right column ("" if None).
        font: font to draw with; loaded via ``_load_mono_font`` when None.
        max_chars: characters per row; derived from the font when None.

    Returns:
        The composed PIL image.
    """
    from PIL import Image, ImageDraw

    frame = Image.new("1", (128, 64), 0)
    frame.paste(qr_img_64, (0, 0))
    pen = ImageDraw.Draw(frame)

    if font is None:
        font = _load_mono_font()

    text_x = 66
    if max_chars is None:
        max_chars = _max_chars_for_width(font, 128 - text_x)

    # Right-top: WiFi SSID, clipped to the column width.
    header = "" if wifi_line is None else wifi_line
    pen.text((text_x, 0), header[:max_chars], font=font, fill=255)

    # Right-bottom: state rows, bottom-aligned but never above the second
    # text row so they cannot overlap the SSID line.
    rows = _format_state_lines(state)
    row_h = 12
    first_y = max(row_h, 64 - len(rows) * row_h)

    for index, row in enumerate(rows):
        label = str(row)
        if len(label) > max_chars:
            # Too long: keep the tail so the unit stays visible.
            label = label[-max_chars:]
        else:
            label = label.rjust(max_chars)
        pen.text((text_x, first_y + index * row_h), label, font=font, fill=255)

    return frame
|
|
|
|
|
|
@dataclass
class OLEDStartupDisplay:
    """Handle for a started OLED worker: its stop flag and its thread."""

    # Set to ask the worker loop to exit.
    stop_event: threading.Event
    # The background thread that drives the display.
    thread: threading.Thread


# Module-wide handle for the running worker; None when none is registered.
_manager: Optional[OLEDStartupDisplay] = None
|
|
|
|
|
|
def start_oled_startup_display() -> None:
    """Show server URL QR code + live state on a 128x64 I2C OLED.

    Starts one daemon thread that initialises the display, renders the QR
    code for ``build_public_url()`` and then redraws the screen in a loop.
    Does nothing if a worker is already registered or if ``OLED_ENABLED``
    is not truthy.

    Configuration (environment):
        OLED_REFRESH_S: seconds between redraws (default 0.25).
        OLED_SSID_REFRESH_S: seconds between SSID lookups (default 2.0).

    Live state is obtained by calling the optional ``state_provider``
    attribute of this function, if one has been attached; it is looked up
    once per initialisation attempt.

    Safe by design: failures only log, never crash the API server. The
    init/draw sequence is attempted up to three times before giving up.
    """
    global _manager

    if _manager is not None:
        return

    if not _truthy(os.getenv("OLED_ENABLED", "1")):
        return

    stop_event = threading.Event()

    def _run():
        # A few retries in case I2C isn't ready instantly.
        last_error: Optional[Exception] = None
        for _ in range(3):
            if stop_event.is_set():
                return
            try:
                device = _init_oled_device()
                url = build_public_url()
                qr_img_64 = _make_qr_64(url)

                # Optional live updates using a state provider.
                state_provider: Optional[Callable[[], Dict[str, Any]]] = getattr(
                    start_oled_startup_display, "state_provider", None
                )

                refresh_s = float(os.getenv("OLED_REFRESH_S", "0.25"))
                if refresh_s <= 0:
                    refresh_s = 0.25

                ssid_refresh_s = float(os.getenv("OLED_SSID_REFRESH_S", "2.0"))
                if ssid_refresh_s <= 0:
                    ssid_refresh_s = 2.0

                # Load the font and derive the layout once, before the draw
                # loop, for consistent layout and less per-frame overhead.
                oled_font = _load_mono_font()
                oled_max_chars = _max_chars_for_width(oled_font, 128 - 66)
                ssid_room = max(0, oled_max_chars - len("WiFi: "))

                # Monotonic time: wall-clock adjustments must not stall or
                # burst the SSID refresh. Starting at "now" makes the first
                # loop iteration look the SSID up immediately.
                next_ssid_check = time.monotonic()
                current_ssid = ""
                ssid_scroll_offset = 0

                while not stop_event.is_set():
                    now = time.monotonic()
                    if now >= next_ssid_check:
                        next_ssid_check = now + ssid_refresh_s
                        new_ssid = _get_wifi_ssid()
                        if new_ssid != current_ssid:
                            current_ssid = new_ssid
                            ssid_scroll_offset = 0

                    st = None
                    if state_provider is not None:
                        try:
                            st = state_provider()
                        except Exception:
                            # A failing provider just blanks the state rows.
                            st = None

                    wifi_line = _make_wifi_line(
                        current_ssid, oled_max_chars, ssid_scroll_offset
                    )
                    # Scroll only when SSID is actually too long to fit.
                    if current_ssid and len(current_ssid.strip()) > ssid_room:
                        ssid_scroll_offset += 1

                    image = _render_screen(
                        qr_img_64,
                        st,
                        wifi_line=wifi_line,
                        font=oled_font,
                        max_chars=oled_max_chars,
                    )
                    device.display(image)

                    # Wait on the event instead of sleeping so a stop
                    # request ends the pause immediately.
                    stop_event.wait(refresh_s)

                return
            except Exception as e:
                last_error = e
                # Back off before retrying, but wake up early on stop.
                stop_event.wait(0.5)

        if last_error is not None:
            print(f"[OLED] init/display failed: {last_error}")

    t = threading.Thread(target=_run, daemon=True)
    t.start()
    _manager = OLEDStartupDisplay(stop_event=stop_event, thread=t)
|
|
|
|
|
|
def stop_oled_startup_display() -> None:
    """Ask the OLED worker to stop and drop the module-level handle.

    Sets the worker's stop event and resets ``_manager`` to ``None``; it
    does not wait for the thread to finish. A no-op when nothing is
    registered.
    """
    global _manager

    active = _manager
    if active is not None:
        active.stop_event.set()
        _manager = None
|