import os import socket import subprocess import threading import time from dataclasses import dataclass from typing import Optional, Callable, Dict, Any def _truthy(value: Optional[str]) -> bool: if value is None: return False return value.strip().lower() in {"1", "true", "yes", "y", "on"} def _get_lan_ip() -> Optional[str]: """Best-effort LAN IP detection. Uses a UDP socket trick (no packets actually need to be received) to learn which local interface would be used to reach the internet. """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] if ip and ip != "0.0.0.0": return ip finally: s.close() except Exception: pass try: hostname = socket.gethostname() ip = socket.gethostbyname(hostname) if ip and ip != "127.0.0.1": return ip except Exception: pass return None def build_public_url() -> str: """Build a URL that should be reachable from LAN clients.""" explicit = os.getenv("PUBLIC_URL", "").strip() if explicit: return explicit ip = _get_lan_ip() or os.getenv("HOST", "0.0.0.0") # If host is 0.0.0.0, it's not a client-reachable address. if ip in {"0.0.0.0", "127.0.0.1"}: ip = _get_lan_ip() or "raspberrypi.local" scheme = os.getenv("PUBLIC_SCHEME", "http").strip() or "http" return f"{scheme}://{ip}" from luma.core.interface.serial import i2c from luma.oled.device import ssd1306 def _init_oled_device(): address = int(os.getenv("OLED_I2C_ADDRESS", "0x3C"), 0) port = int(os.getenv("OLED_I2C_PORT", "1")) rotate = int(os.getenv("OLED_ROTATE", "0")) serial = i2c(port=port, address=address) return ssd1306(serial, width=128, height=64, rotate=rotate) def _make_qr_64(url: str): import qrcode from PIL import Image # Left: 64x64 QR qr = qrcode.QRCode( version=None, error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=2, border=1, ) qr.add_data(url) qr.make(fit=True) # Use explicit black/white to avoid near-black values like 1 that can # get thresholded to all-black when converted to 1-bit. qr_img = qr.make_image(fill_color="black", back_color="white").convert("1") # Fit into 64x64 (keep square) return qr_img.resize((64, 64), Image.NEAREST) def _format_state_lines(state: Optional[Dict[str, Any]]) -> list[str]: if not state: return ["state: n/a"] dis_steps = state.get("dis") phase = state.get("phase") freq = state.get("freq") p2p = state.get("p2p") def _fmt_float(value: Any, *, decimals: int) -> str: try: if value is None or isinstance(value, bool): return "-" f = float(value) return f"{f:.{decimals}f}" if decimals > 0 else f"{f:.0f}" except Exception: return "-" # Keep units explicit as requested; no labels; we'll right-align later. try: dis_mm = float(dis_steps or 0) / 1600.0 except Exception: dis_mm = None try: freq_khz = (float(freq) / 1000.0) if freq is not None else None except Exception: freq_khz = None lines: list[str] = [] lines.append(f"{_fmt_float(dis_mm, decimals=2)} mm") lines.append(f"{_fmt_float(freq_khz, decimals=2)} kHz") lines.append(f"{_fmt_float(phase, decimals=3)} rad") lines.append(f"{_fmt_float(p2p, decimals=0)} ADC") return lines def _load_mono_font(): from PIL import ImageFont # Prefer a real monospace TTF if available. candidates = [] env_path = os.getenv("OLED_FONT_PATH", "").strip() if env_path: candidates.append(env_path) candidates.extend( [ "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf", "/usr/share/fonts/truetype/liberation/LiberationMono-Regular.ttf", ] ) size = int(os.getenv("OLED_FONT_SIZE", "10")) for path in candidates: try: return ImageFont.truetype(path, size=size) except Exception: continue # Fallback: PIL default (often bitmap monospace) return ImageFont.load_default() def _max_chars_for_width(font, max_px: int) -> int: try: # Estimate monospace char width from '0' bbox = font.getbbox("0") char_w = max(1, bbox[2] - bbox[0]) return max(1, int(max_px // char_w)) except Exception: # Conservative fallback return max(1, int(max_px // 6)) def _get_wifi_ssid() -> str: """Best-effort current WiFi SSID. Tries common tools on Raspberry Pi OS. Never raises. """ env = os.getenv("WIFI_SSID", "").strip() if env: return env def _run(cmd: list[str]) -> str: try: p = subprocess.run(cmd, capture_output=True, text=True, timeout=0.5) if p.returncode == 0: return (p.stdout or "").strip() except Exception: return "" return "" # Most minimal dependency ssid = _run(["iwgetid", "-r"]) if ssid: return ssid # NetworkManager out = _run(["nmcli", "-t", "-f", "active,ssid", "dev", "wifi"]) if out: for line in out.splitlines(): # yes: if line.startswith("yes:"): return line.split(":", 1)[1].strip() # wpa_supplicant out = _run(["wpa_cli", "-i", "wlan0", "status"]) if out: for line in out.splitlines(): if line.startswith("ssid="): return line.split("=", 1)[1].strip() return "" def _make_wifi_line(ssid: str, max_chars: int, scroll_offset: int) -> str: """Build the right-top WiFi line with optional scrolling. The prefix is fixed; only the SSID part scrolls when it doesn't fit. """ prefix = "WiFi:" if max_chars <= 0: return "" ssid = (ssid or "").strip() or "-" # Keep prefix visible when possible. # We render as: "WiFi:" + " " + join = " " base = prefix + join if len(base) >= max_chars: # Not enough space even for full prefix; truncate prefix. return (prefix[:max_chars]).ljust(max_chars) avail = max_chars - len(base) if len(ssid) <= avail: return (base + ssid).ljust(max_chars) pad = os.getenv("OLED_SSID_SCROLL_PAD", " ") if not pad: pad = " " scroll_text = ssid + pad if not scroll_text: return (base + "-").ljust(max_chars) offset = scroll_offset % len(scroll_text) # Wrap-around slice of length `avail` doubled = scroll_text + scroll_text window = doubled[offset : offset + avail] return (base + window).ljust(max_chars) def _render_screen( qr_img_64, state: Optional[Dict[str, Any]], *, wifi_line: Optional[str] = None, font=None, max_chars: Optional[int] = None, ): from PIL import Image, ImageDraw canvas = Image.new("1", (128, 64), 0) canvas.paste(qr_img_64, (0, 0)) draw = ImageDraw.Draw(canvas) if font is None: font = _load_mono_font() x = 66 top_y = 0 if max_chars is None: max_chars = _max_chars_for_width(font, 128 - x) # Right-top: WiFi SSID ssid_text = wifi_line if wifi_line is not None else "" if len(ssid_text) > max_chars: ssid_text = ssid_text[:max_chars] draw.text((x, top_y), ssid_text, font=font, fill=255) # Right-bottom: state lines lines = _format_state_lines(state) line_h = 12 y = 64 - (len(lines) * line_h) # Don't overlap SSID line if y < line_h: y = line_h for line in lines: text = str(line) if len(text) > max_chars: text = text[-max_chars:] else: text = text.rjust(max_chars) draw.text((x, y), text, font=font, fill=255) y += line_h return canvas @dataclass class OLEDStartupDisplay: stop_event: threading.Event thread: threading.Thread _manager: Optional[OLEDStartupDisplay] = None def start_oled_startup_display() -> None: """Show server URL QR code + live state on a 128x64 I2C OLED. Safe by design: failures only log, never crash the API server. """ global _manager if _manager is not None: return if not _truthy(os.getenv("OLED_ENABLED", "1")): return stop_event = threading.Event() def _run(): # A few retries in case I2C isn't ready instantly. last_error: Optional[Exception] = None for _ in range(3): if stop_event.is_set(): return try: device = _init_oled_device() url = build_public_url() qr_img_64 = _make_qr_64(url) # Optional live updates using a state provider. state_provider: Optional[Callable[[], Dict[str, Any]]] = getattr( start_oled_startup_display, "state_provider", None ) refresh_s = float(os.getenv("OLED_REFRESH_S", "0.25")) if refresh_s <= 0: refresh_s = 0.25 ssid_refresh_s = float(os.getenv("OLED_SSID_REFRESH_S", "2.0")) if ssid_refresh_s <= 0: ssid_refresh_s = 2.0 last_ssid_check = 0.0 current_ssid = "" ssid_scroll_offset = 0 while not stop_event.is_set(): now = time.time() if (now - last_ssid_check) >= ssid_refresh_s: last_ssid_check = now new_ssid = _get_wifi_ssid() if new_ssid != current_ssid: current_ssid = new_ssid ssid_scroll_offset = 0 st = None if state_provider is not None: try: st = state_provider() except Exception: st = None # Cache font/width for consistent layout and less overhead. if "oled_font" not in locals(): oled_font = _load_mono_font() oled_max_chars = _max_chars_for_width(oled_font, 128 - 66) wifi_line = _make_wifi_line(current_ssid, oled_max_chars, ssid_scroll_offset) # Scroll only when SSID is actually too long to fit. if current_ssid and len((current_ssid or "").strip()) > max(0, oled_max_chars - len("WiFi: ")): ssid_scroll_offset += 1 image = _render_screen(qr_img_64, st, wifi_line=wifi_line, font=oled_font, max_chars=oled_max_chars) device.display(image) time.sleep(refresh_s) return except Exception as e: last_error = e time.sleep(0.5) if last_error is not None: print(f"[OLED] init/display failed: {last_error}") t = threading.Thread(target=_run, daemon=True) t.start() _manager = OLEDStartupDisplay(stop_event=stop_event, thread=t) def stop_oled_startup_display() -> None: global _manager if _manager is None: return _manager.stop_event.set() _manager = None