server.py

2026-06-29




"""
api_server/server.py - Standalone OpenAI-compatible API server for nanobot.

Uses nanobot.api.server (aiohttp) + nanobot.nanobot.Nanobot.from_config
to expose /v1/chat/completions, /v1/models, and /health.

CLI usage:
  python server.py --config <path> --workspace <path> [--port 47391]
                   [--model nanobot] [--timeout 120] [--parent-pid <PID>]
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import re
import sys
import threading
import uuid
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path

import httpx
import mimetypes

_CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, PUT, PATCH, DELETE, POST, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization",
    "Access-Control-Max-Age": "86400",
}

# Application version — read dynamically from update/version.json so the
# /api/version endpoint always reflects the actually installed version
# (not a stale compile-time constant).  Falls back to the value below only
# when version.json cannot be found (e.g. very early dev runs).
_APP_VERSION_FALLBACK = "0.3.9"


def _read_app_version() -> str:
    """Read the installed version from update/version.json.

    Works in both frozen (PyInstaller) and dev modes because it uses the same
    install-root logic as update_manager.get_install_root().
    """
    try:
        if getattr(sys, "frozen", False):
            install_root = Path(sys.executable).parent.parent
        else:
            install_root = Path(__file__).parent.parent
        vf = install_root / "update" / "version.json"
        if vf.exists():
            data = json.loads(vf.read_text(encoding="utf-8-sig"))
            v = data.get("version", "")
            if v and v != "0.0.0":
                return v
    except Exception:
        pass
    return _APP_VERSION_FALLBACK


_APP_VERSION = _read_app_version()


# ---------------------------------------------------------------------------
# Parent-process watchdog
# ---------------------------------------------------------------------------

def _is_pid_alive(pid: int) -> bool:
    """Return True if *pid* refers to a running process.

    Uses the Windows kernel API on Windows because ``os.kill(pid, 0)``
    maps to ``CTRL_C_EVENT`` (signal value 0) on Windows, which sends an
    unwanted control event to the target process group and may raise
    ``OSError`` when the two processes belong to different console groups,
    causing the watchdog to incorrectly conclude the parent is gone.
    """
    if os.name == "nt":
        import ctypes
        kernel32 = ctypes.windll.kernel32  # type: ignore[attr-defined]
        PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
        STILL_ACTIVE = 259
        handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
        if not handle:
            return False
        try:
            exit_code = ctypes.c_ulong(0)
            ok = kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))
            return bool(ok) and exit_code.value == STILL_ACTIVE
        finally:
            kernel32.CloseHandle(handle)
    else:
        try:
            os.kill(pid, 0)
            return True
        except ProcessLookupError:
            return False
        except PermissionError:
            return True  # process exists but we lack permission to signal it


def _start_parent_watchdog(parent_pid: int) -> None:
    """Exit this process when the parent process (launcher) disappears."""
    import time

    def _watch() -> None:
        while True:
            time.sleep(5)
            if not _is_pid_alive(parent_pid):
                os._exit(0)

    t = threading.Thread(target=_watch, daemon=True)
    t.start()


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def _parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Nanobot OpenAI-compatible API server")
    p.add_argument("--config",            default=None, help="Path to nanobot config.json")
    p.add_argument("--workspace",         default=None, help="Override workspace directory")
    p.add_argument("--port",              type=int, default=47391, help="HTTP port to listen on (default: 47391)")
    p.add_argument("--host",              default="127.0.0.1", help="Host to bind (default: 127.0.0.1)")
    p.add_argument("--model",             default="nanobot", help="Model name reported in responses")
    p.add_argument("--timeout",           type=float, default=120.0, help="Per-request timeout in seconds")
    p.add_argument("--parent-pid",        type=int, default=0, dest="parent_pid",
                   help="If set, exit when this PID disappears")
    p.add_argument("--mec-wiki",           default=None, dest="mec_wiki",
                   help="Path to the bundled ein-wiki folder (contains runtime/node.exe and server.js)")
    p.add_argument("--mec-wiki-vault",     default=None, dest="mec_wiki_vault",
                   help="Default vault path passed to Ein-Wiki server (WIKI_VAULT_PATH env var)")
    p.add_argument("--apply-staged",       action="store_true", dest="apply_staged",
                   help="Apply staged update and exit (invoked by restart helper)")
    p.add_argument("--install-root",       default="", dest="install_root",
                   help="Installation root path for --apply-staged mode")
    return p.parse_args()


async def _main() -> None:
    args = _parse_args()

    if args.parent_pid:
        _start_parent_watchdog(args.parent_pid)

    # Resolve config path
    config_path: Path | None = None
    if args.config:
        config_path = Path(args.config).expanduser().resolve()
        if not config_path.exists():
            print(f"[api_server] ERROR: config not found: {config_path}", file=sys.stderr)
            sys.exit(1)

    from aiohttp import web

    @web.middleware
    async def cors_middleware(request: web.Request, handler):
        # Handle preflight (OPTIONS) for any route, including unregistered ones.
        if request.method == "OPTIONS":
            return web.Response(headers=_CORS_HEADERS)
        try:
            response = await handler(request)
        except web.HTTPException as exc:
            # Framework-generated errors (404, 405, etc.) — attach CORS headers.
            for k, v in _CORS_HEADERS.items():
                exc.headers[k] = v
            raise
        # Attach CORS headers to every successful response.
        for k, v in _CORS_HEADERS.items():
            response.headers[k] = v
        return response

    app = web.Application(middlewares=[cors_middleware])

    # ---------------------------------------------------------------------------
    # User overrides endpoint — available immediately, no nanobot required.
    # Stored in <config_dir>/user_overrides.json.
    # ---------------------------------------------------------------------------

    def _overrides_path() -> Path | None:
        return config_path.parent / "user_overrides.json" if config_path else None

    async def _handle_options(request: web.Request) -> web.Response:
        return web.Response(headers=_CORS_HEADERS)

    async def _handle_get_overrides(request: web.Request) -> web.Response:
        path = _overrides_path()
        data: dict = {}
        if path and path.exists():
            try:
                data = json.loads(path.read_text(encoding="utf-8-sig"))
            except Exception:
                pass
        api_key = data.get("apiKey", "")
        masked = (api_key[:4] + "***" + api_key[-4:]) if len(api_key) > 8 else ("***" if api_key else "")
        return web.Response(
            text=json.dumps({
                "hasApiKey": bool(api_key),
                "maskedApiKey": masked,
                "whisperModelPath": data.get("whisperModelPath", ""),
            }),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_patch_overrides(request: web.Request) -> web.Response:
        path = _overrides_path()
        if not path:
            return web.Response(status=503, text="config path unknown", headers=_CORS_HEADERS)
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)

        existing: dict = {}
        if path.exists():
            try:
                existing = json.loads(path.read_text(encoding="utf-8-sig"))
            except Exception:
                pass

        if "apiKey" in body:
            existing["apiKey"] = str(body["apiKey"])

        if "whisperModelPath" in body:
            existing["whisperModelPath"] = str(body["whisperModelPath"]).strip()
            # Clear cached model so next request reloads from the new path
            _whisper_model_cache.clear()

        # Patch the live runtime config.json so the change takes effect without restart.
        if config_path and config_path.exists() and "apiKey" in body:
            try:
                raw = config_path.read_text(encoding="utf-8")
                escaped = existing["apiKey"].replace('"', '\\"')
                raw = re.sub(
                    r'(?s)("custom"\s*:\s*\{[^}]*?"apiKey"\s*:\s*)"[^"]*"',
                    r'\1"' + escaped + '"',
                    raw,
                    count=1,
                )
                config_path.write_text(raw, encoding="utf-8")
            except Exception as e:
                print(f"[api_server] WARNING: could not patch runtime config: {e}", file=sys.stderr)

        path.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
        return web.Response(
            text=json.dumps({"ok": True}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_health(request: web.Request) -> web.Response:
        return web.Response(text=json.dumps({"status": "ok"}), content_type="application/json")

    # ---------------------------------------------------------------------------
    # Sidebar-state endpoint — chat title overrides & sidebar preferences.
    # Stored in <config_dir>/sidebar_state.json.
    # ---------------------------------------------------------------------------

    _SIDEBAR_STATE_DEFAULT: dict = {
        "schema_version": 1,
        "pinned_keys": [],
        "archived_keys": [],
        "title_overrides": {},
        "project_name_overrides": {},
        "tags_by_key": {},
        "collapsed_groups": {},
        "view": {
            "density": "comfortable",
            "show_previews": True,
            "show_timestamps": False,
            "show_archived": False,
            "sort": "updated_desc",
        },
    }

    _MAX_STATE_FILE_BYTES = 256 * 1024
    _MAX_LIST_ITEMS = 2000
    _MAX_MAP_ITEMS = 2000
    _MAX_KEY_LEN = 512
    _MAX_TITLE_LEN = 160
    _MAX_TAG_LEN = 40
    _MAX_TAGS_PER_KEY = 12

    def _sidebar_state_path() -> Path | None:
        return config_path.parent / "sidebar_state.json" if config_path else None

    def _clean_str(val: object, max_len: int) -> str | None:
        """Return stripped string (capped at max_len), or None if empty/invalid."""
        if not isinstance(val, str):
            return None
        s = val.strip()[:max_len]
        return s if s else None

    def _normalize_sidebar_state(raw: dict) -> dict:
        """Normalize an arbitrary dict into a valid sidebar-state v1 structure."""
        out: dict = {
            "schema_version": 1,
            "pinned_keys": [],
            "archived_keys": [],
            "title_overrides": {},
            "project_name_overrides": {},
            "tags_by_key": {},
            "collapsed_groups": {},
            "view": dict(_SIDEBAR_STATE_DEFAULT["view"]),
        }

        # Lists: pinned_keys, archived_keys
        for list_key in ("pinned_keys", "archived_keys"):
            val = raw.get(list_key)
            if isinstance(val, list):
                seen: set[str] = set()
                for item in val:
                    s = _clean_str(item, _MAX_KEY_LEN)
                    if s and s not in seen:
                        seen.add(s)
                        out[list_key].append(s)  # type: ignore[literal-required]
                        if len(out[list_key]) >= _MAX_LIST_ITEMS:  # type: ignore[literal-required]
                            break

        # Maps: title_overrides, project_name_overrides
        for map_key in ("title_overrides", "project_name_overrides"):
            val = raw.get(map_key)
            if isinstance(val, dict):
                count = 0
                for k, v in val.items():
                    ck = _clean_str(k, _MAX_KEY_LEN)
                    cv = _clean_str(v, _MAX_TITLE_LEN)
                    if ck is None:
                        continue
                    if cv is None:
                        continue  # empty value = delete, so skip it
                    out[map_key][ck] = cv  # type: ignore[literal-required]
                    count += 1
                    if count >= _MAX_MAP_ITEMS:
                        break

        # tags_by_key
        val = raw.get("tags_by_key")
        if isinstance(val, dict):
            count = 0
            for k, v in val.items():
                ck = _clean_str(k, _MAX_KEY_LEN)
                if ck is None:
                    continue
                if not isinstance(v, list):
                    continue
                tags: list[str] = []
                for t in v:
                    ct = _clean_str(t, _MAX_TAG_LEN)
                    if ct:
                        tags.append(ct)
                    if len(tags) >= _MAX_TAGS_PER_KEY:
                        break
                if tags:
                    out["tags_by_key"][ck] = tags
                    count += 1
                    if count >= _MAX_MAP_ITEMS:
                        break

        # collapsed_groups
        val = raw.get("collapsed_groups")
        if isinstance(val, dict):
            for k, v in val.items():
                ck = _clean_str(k, _MAX_KEY_LEN)
                if ck is None:
                    continue
                out["collapsed_groups"][ck] = bool(v)

        # view
        val = raw.get("view")
        if isinstance(val, dict):
            v = out["view"]
            d = val.get("density")
            if d in ("comfortable", "compact"):
                v["density"] = d
            for flag in ("show_previews", "show_timestamps", "show_archived"):
                if flag in val:
                    v[flag] = bool(val[flag])
            s = val.get("sort")
            if s in ("updated_desc", "created_desc", "title_asc"):
                v["sort"] = s

        return out

    def _read_sidebar_state() -> dict:
        """Read sidebar_state.json; return normalized state (default on error).

        If a ``.json.tmp`` staging file exists and is newer than the primary
        ``.json`` (left behind by a failed ``os.replace()``), we recover
        from it so that data is not lost.
        """
        path = _sidebar_state_path()
        tmp = path.with_suffix(".json.tmp") if path else None
        # Decide which file to read: prefer the newer one.
        read_path = None
        if path and path.exists():
            read_path = path
            # If a tmp staging file exists and is newer, it holds the latest data.
            if tmp and tmp.exists():
                try:
                    if tmp.stat().st_mtime > path.stat().st_mtime:
                        read_path = tmp
                except OSError:
                    pass
        elif tmp and tmp.exists():
            # Primary file missing — recover from tmp.
            read_path = tmp

        if read_path:
            try:
                raw = json.loads(read_path.read_text(encoding="utf-8-sig"))
                if isinstance(raw, dict):
                    recovered = _normalize_sidebar_state(raw)
                    # If we read from tmp, promote it to the primary location.
                    if read_path is tmp:
                        try:
                            tmp.replace(path)
                        except OSError:
                            pass
                    return recovered
            except Exception:
                pass
        return dict(_SIDEBAR_STATE_DEFAULT)

    def _write_sidebar_state(raw: dict) -> dict:
        """Normalize, set updated_at, atomically write; return normalized state.

        Uses a ``.json.tmp`` staging file + ``os.replace()`` for atomicity.
        On Windows, anti-virus or search-indexer locks can cause the replace
        to fail transiently, so we retry a few times.  If atomic replace
        still fails we fall back to ``shutil.copy2`` and finally to a
        direct ``write_text`` so the primary ``.json`` is always updated.
        """
        import time as _time
        import shutil as _shutil
        state = _normalize_sidebar_state(raw)
        state["updated_at"] = __import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat()
        path = _sidebar_state_path()
        if path:
            tmp = path.with_suffix(".json.tmp")
            payload = json.dumps(state, ensure_ascii=False, indent=2)
            _written = False
            # --- Strategy 1: atomic via tmp + os.replace (with retries) ---
            _max_retries = 5
            _retry_delay_s = 0.05
            for _attempt in range(_max_retries + 1):
                try:
                    tmp.write_text(payload, encoding="utf-8")
                    tmp.replace(path)
                    _written = True
                    break
                except OSError as e:
                    if _attempt < _max_retries:
                        _time.sleep(_retry_delay_s)
                        _retry_delay_s *= 2
                    else:
                        print(
                            f"[api_server] WARN tmp.replace() failed after "
                            f"{_max_retries + 1} attempts: {e}; trying fallback",
                            file=sys.stderr,
                        )
            # --- Strategy 2: shutil.copy2(tmp, path) + unlink tmp ---
            if not _written:
                try:
                    _shutil.copy2(str(tmp), str(path))
                    try:
                        tmp.unlink()
                    except OSError:
                        pass
                    _written = True
                except OSError as e2:
                    print(
                        f"[api_server] WARN shutil.copy2 fallback failed: {e2}; "
                        f"trying direct write",
                        file=sys.stderr,
                    )
            # --- Strategy 3: direct write (non-atomic but reliable) ---
            if not _written:
                try:
                    path.write_text(payload, encoding="utf-8")
                    _written = True
                    try:
                        tmp.unlink()
                    except OSError:
                        pass
                except OSError as e3:
                    print(
                        f"[api_server] ERROR all write strategies failed for "
                        f"sidebar_state.json: {e3}",
                        file=sys.stderr,
                    )
            # fsync for durability when we succeeded
            if _written:
                try:
                    fd = os.open(str(path), os.O_RDONLY)
                    os.fsync(fd)
                    os.close(fd)
                except Exception:
                    pass
        return state

    async def _handle_get_sidebar_state(request: web.Request) -> web.Response:
        state = _read_sidebar_state()
        return web.Response(
            text=json.dumps(state, ensure_ascii=False),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_patch_sidebar_state(request: web.Request) -> web.Response:
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
        if not isinstance(body, dict):
            return web.Response(status=400, text="state must be an object", headers=_CORS_HEADERS)

        # Merge with existing state: only fields present in the patch body are updated.
        # For dict fields (title_overrides, project_name_overrides, tags_by_key,
        # collapsed_groups) we deep-merge so that partial patches don't erase
        # entries that were not included in the patch body.  For list and view
        # fields we replace wholesale (as before).
        existing = _read_sidebar_state()
        _dict_keys = {
            "title_overrides", "project_name_overrides",
            "tags_by_key", "collapsed_groups",
        }
        for key in (
            "pinned_keys", "archived_keys",
            "title_overrides", "project_name_overrides",
            "tags_by_key", "collapsed_groups", "view",
        ):
            if key in body:
                if key in _dict_keys and isinstance(body[key], dict):
                    existing.setdefault(key, {})
                    existing[key].update(body[key])
                else:
                    existing[key] = body[key]

        state = _write_sidebar_state(existing)
        return web.Response(
            text=json.dumps(state, ensure_ascii=False),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/user-overrides", _handle_options)
    app.router.add_get("/api/user-overrides", _handle_get_overrides)
    app.router.add_patch("/api/user-overrides", _handle_patch_overrides)
    app.router.add_get("/health", _handle_health)

    app.router.add_route("OPTIONS", "/api/sidebar-state", _handle_options)
    app.router.add_get("/api/sidebar-state", _handle_get_sidebar_state)
    app.router.add_patch("/api/sidebar-state", _handle_patch_sidebar_state)

    # ---------------------------------------------------------------------------
    # Auto-start endpoint — check / toggle Windows auto-start (Startup shortcut
    # + Registry Run key).  No restart needed; changes take effect immediately.
    # ---------------------------------------------------------------------------

    import winreg

    _AUTOSTART_REG_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
    _AUTOSTART_REG_NAME = "MEC_Agent"

    def _autostart_shortcut_path() -> Path:
        """Return the path of the Startup-folder shortcut."""
        startup = Path(os.environ.get("APPDATA", "")) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
        return startup / "MEC Agent.lnk"

    def _autostart_vbs_path() -> Path:
        """Return the path of launch_mec_agent.vbs relative to install root."""
        return _get_install_root() / "launcher" / "launch_mec_agent.vbs"

    def _check_autostart_enabled() -> dict:
        """Check both Startup shortcut and Registry Run key."""
        shortcut_exists = _autostart_shortcut_path().exists()
        reg_exists = False
        try:
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_READ) as key:
                val, _ = winreg.QueryValueEx(key, _AUTOSTART_REG_NAME)
                reg_exists = True
        except (FileNotFoundError, OSError):
            pass
        enabled = shortcut_exists or reg_exists
        return {
            "enabled": enabled,
            "shortcut_exists": shortcut_exists,
            "registry_exists": reg_exists,
        }

    def _set_autostart(enabled: bool) -> dict:
        """Enable or disable auto-start via both Startup shortcut and Registry Run key."""
        shortcut_path = _autostart_shortcut_path()
        vbs_path = _autostart_vbs_path()

        if enabled:
            # --- Create Startup-folder shortcut using PowerShell ---
            # Use WScript.Shell COM to create a .lnk file (no extra deps).
            # We use -EncodedCommand (Base64 UTF-16LE) to avoid all quoting /
            # escaping hell between Python → cmd → PowerShell layers.
            wscript = Path(os.environ.get("SystemRoot", r"C:\Windows")) / "system32" / "wscript.exe"
            icon_path = _get_install_root() / "mec_agent.ico"
            ps_script = (
                f'$ws = New-Object -ComObject WScript.Shell\n'
                f'$sc = $ws.CreateShortcut(\'{shortcut_path}\')\n'
                f'$sc.TargetPath = \'{wscript}\'\n'
                f'$sc.Arguments = \'"{vbs_path}"\'\n'
                f'$sc.WorkingDirectory = \'{vbs_path.parent}\'\n'
                f'$sc.IconLocation = \'{icon_path},0\'\n'
                f'$sc.Description = \'MEC Agent\'\n'
                f'$sc.Save()'
            )
            import base64
            import subprocess
            encoded = base64.b64encode(ps_script.encode("utf-16-le")).decode("ascii")
            subprocess.run(
                ["powershell.exe", "-NoProfile", "-NonInteractive", "-EncodedCommand", encoded],
                check=True, timeout=10,
            )

            # --- Create Registry Run key ---
            _system_root = os.environ.get("SystemRoot", r"C:\Windows")
            _wscript = os.path.join(_system_root, "system32", "wscript.exe")
            cmd_value = f'"{_wscript}" "{vbs_path}"'
            try:
                with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_SET_VALUE) as key:
                    winreg.SetValueEx(key, _AUTOSTART_REG_NAME, 0, winreg.REG_SZ, cmd_value)
            except OSError as exc:
                import logging
                logging.getLogger("api_server").warning("Failed to set autostart registry key: %s", exc)

        else:
            # --- Remove Startup-folder shortcut ---
            if shortcut_path.exists():
                try:
                    shortcut_path.unlink()
                except OSError:
                    pass

            # --- Remove Registry Run key ---
            try:
                with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_SET_VALUE) as key:
                    winreg.DeleteValue(key, _AUTOSTART_REG_NAME)
            except (FileNotFoundError, OSError):
                pass

        return _check_autostart_enabled()

    async def _handle_get_autostart(request: web.Request) -> web.Response:
        status = _check_autostart_enabled()
        return web.Response(
            text=json.dumps(status),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_patch_autostart(request: web.Request) -> web.Response:
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
        enabled = body.get("enabled")
        if enabled is None or not isinstance(enabled, bool):
            return web.Response(
                status=400,
                text=json.dumps({"error": "enabled (boolean) field required"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        status = _set_autostart(enabled)
        return web.Response(
            text=json.dumps(status),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/autostart", _handle_options)
    app.router.add_get("/api/autostart", _handle_get_autostart)
    app.router.add_patch("/api/autostart", _handle_patch_autostart)

    # ---------------------------------------------------------------------------
    # Local image proxy — serves image files from allowed directories so that
    # browser <img> elements can display file:/// paths (e.g. Playwright MCP
    # screenshots) that the browser security model would otherwise block.
    # ---------------------------------------------------------------------------

    _LOCAL_IMAGE_ALLOWED_DIRS: list[Path] = []
    # populated after workspace_path is resolved (see below)

    async def _handle_local_image(request: web.Request) -> web.Response:
        """Serve an image file from an allowed local directory.

        The ``path`` query parameter is the absolute local filesystem path.
        Only files inside the allowed directories are served; 403 otherwise.
        """
        raw_path = request.query.get("path", "")
        if not raw_path:
            return web.Response(status=400, text="missing path parameter",
                                headers=_CORS_HEADERS)
        try:
            # Handle file:/// URLs and plain paths
            if raw_path.startswith("file:///"):
                raw_path = raw_path[len("file:///"):]
            elif raw_path.startswith("file://"):
                raw_path = raw_path[len("file://"):]
            # URL-decode (e.g. %20 → space)
            raw_path = urllib.parse.unquote(raw_path)
            candidate = Path(raw_path).resolve()
        except (OSError, ValueError):
            return web.Response(status=400, text="invalid path",
                                headers=_CORS_HEADERS)
        # Security: only serve files from allowed directories
        allowed = False
        for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
            try:
                candidate.relative_to(allowed_dir.resolve())
                allowed = True
                break
            except (ValueError, OSError):
                continue
        if not allowed:
            return web.Response(status=403, text="path not in allowed directories",
                                headers=_CORS_HEADERS)
        # Playwright screenshot paths can drift between workspace/.playwright-mcp
        # and repo/.playwright-mcp across different launch modes. When the given
        # file is missing, try the same filename in other allowed screenshot dirs.
        if (not candidate.is_file()) and candidate.name and candidate.parent.name == ".playwright-mcp":
            for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
                try:
                    alt = (allowed_dir / candidate.name).resolve()
                except (OSError, ValueError):
                    continue
                if alt.is_file():
                    candidate = alt
                    break
        if not candidate.is_file():
            return web.Response(status=404, text="file not found",
                                headers=_CORS_HEADERS)
        try:
            body = candidate.read_bytes()
        except OSError:
            return web.Response(status=500, text="read error",
                                headers=_CORS_HEADERS)
        mime, _ = mimetypes.guess_type(candidate.name)
        if not mime or not mime.startswith("image/"):
            return web.Response(status=403, text="not an image file",
                                headers=_CORS_HEADERS)
        return web.Response(
            body=body,
            content_type=mime,
            headers={
                **_CORS_HEADERS,
                "Cache-Control": "private, max-age=3600",
                "X-Content-Type-Options": "nosniff",
            },
        )

    app.router.add_route("OPTIONS", "/api/local-image", _handle_options)
    app.router.add_get("/api/local-image", _handle_local_image)

    # ---------------------------------------------------------------------------
    # Open local file — opens a file or directory with the OS default handler
    # (e.g. Explorer for folders, default editor for files). Used by the UI
    # when the user clicks a file path link in chat messages.
    # ---------------------------------------------------------------------------

    async def _handle_open_file(request: web.Request) -> web.Response:
        """Open a local file/directory with the OS default handler.

        POST body: { "path": "D:\\folder\\file.txt" }
        Security: only paths within allowed directories are permitted.
        """
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text="invalid JSON",
                                headers=_CORS_HEADERS)
        raw_path = body.get("path", "") if isinstance(body, dict) else ""
        if not raw_path:
            return web.Response(status=400, text="missing path",
                                headers=_CORS_HEADERS)
        try:
            if raw_path.startswith("file:///"):
                raw_path = raw_path[len("file:///"):]
            elif raw_path.startswith("file://"):
                raw_path = raw_path[len("file://"):]
            raw_path = urllib.parse.unquote(raw_path)
            candidate = Path(raw_path).resolve()
        except (OSError, ValueError):
            return web.Response(status=400, text="invalid path",
                                headers=_CORS_HEADERS)

        # Security: only allow paths inside allowed directories
        allowed = False
        for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
            try:
                candidate.relative_to(allowed_dir.resolve())
                allowed = True
                break
            except (ValueError, OSError):
                continue
        # Also allow any existing path that is not hidden / system-critical
        # as a fallback for paths outside workspace (e.g. user project dirs)
        if not allowed and candidate.exists():
            # Block obvious system paths
            str_path = str(candidate).lower()
            _blocked_prefixes = (
                "c:\\windows", "c:\\program files", "c:\\programdata",
                "/usr/", "/etc/", "/sbin/", "/boot/",
            )
            if not any(str_path.startswith(p) for p in _blocked_prefixes):
                allowed = True

        if not allowed:
            return web.Response(status=403, text="path not in allowed directories",
                                headers=_CORS_HEADERS)

        try:
            if os.name == "nt":
                os.startfile(str(candidate))
            else:
                import subprocess
                subprocess.Popen(["xdg-open" if sys.platform == "linux" else "open", str(candidate)])
        except OSError as exc:
            return web.Response(status=500, text=f"open error: {exc}",
                                headers=_CORS_HEADERS)
        return web.json_response({"ok": True}, headers=_CORS_HEADERS)

    app.router.add_route("OPTIONS", "/api/open-file", _handle_options)
    app.router.add_post("/api/open-file", _handle_open_file)

    # ---------------------------------------------------------------------------
    # Open folder — opens Windows Explorer / macOS Finder / Linux file manager
    # focusing on a specific file.
    # ---------------------------------------------------------------------------

    async def _handle_open_folder(request: web.Request) -> web.Response:
        """POST /api/open-folder — open file manager and select a file.

        Body: { "path": "D:\\folder\\file.txt" }
        """
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
        raw_path = body.get("path", "") if isinstance(body, dict) else ""
        if not raw_path:
            return web.Response(status=400, text="missing path", headers=_CORS_HEADERS)
        try:
            if raw_path.startswith("file:///"):
                raw_path = raw_path[len("file:///"):]
            elif raw_path.startswith("file://"):
                raw_path = raw_path[len("file://"):]
            raw_path = urllib.parse.unquote(raw_path)
            candidate = Path(raw_path).resolve()
        except (OSError, ValueError):
            return web.Response(status=400, text="invalid path", headers=_CORS_HEADERS)

        # Re-use same security logic as _handle_open_file
        allowed = False
        for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
            try:
                candidate.relative_to(allowed_dir.resolve())
                allowed = True
                break
            except (ValueError, OSError):
                continue
        if not allowed and candidate.exists():
            str_path = str(candidate).lower()
            _blocked_prefixes = (
                "c:\\windows", "c:\\program files", "c:\\programdata",
                "/usr/", "/etc/", "/sbin/", "/boot/",
            )
            if not any(str_path.startswith(p) for p in _blocked_prefixes):
                allowed = True

        if not allowed:
            return web.Response(status=403, text="path not in allowed directories",
                                headers=_CORS_HEADERS)

        try:
            if os.name == "nt":
                import subprocess
                subprocess.Popen(["explorer", "/select,", str(candidate)])
            elif sys.platform == "darwin":
                import subprocess
                subprocess.Popen(["open", "-R", str(candidate)])
            else:
                import subprocess
                # Fallback: open parent directory
                parent = str(candidate.parent) if candidate.exists() else str(candidate)
                subprocess.Popen(["xdg-open", parent])
        except OSError as exc:
            return web.Response(status=500, text=f"open error: {exc}",
                                headers=_CORS_HEADERS)
        return web.json_response({"ok": True}, headers=_CORS_HEADERS)

    app.router.add_route("OPTIONS", "/api/open-folder", _handle_options)
    app.router.add_post("/api/open-folder", _handle_open_folder)

    # ---------------------------------------------------------------------------
    # Proxy settings endpoints — read / test / set HTTP proxy
    # ---------------------------------------------------------------------------

    _PROXY_HOST_DEFAULT = "172.18.193.161"
    _PROXY_PORT_DEFAULT = 80
    _NO_PROXY_DEFAULT = "localhost,127.0.0.1,.pegatroncorp.com,172.18.212.48,172.18.212.49"

    def _mask_proxy_password(url: str) -> str:
        """Return proxy URL with password masked as ***."""
        try:
            parsed = urllib.parse.urlparse(url)
            if parsed.username and parsed.password:
                userinfo = f"{parsed.username}:{parsed.password}"
                netloc = parsed.netloc.replace(userinfo, f"{parsed.username}:***", 1)
                return urllib.parse.urlunparse(parsed._replace(netloc=netloc))
        except Exception:
            pass
        return url

    def _parse_proxy_env() -> dict:
        """Parse HTTP_PROXY / HTTPS_PROXY / NO_PROXY from os.environ."""
        http_proxy = (os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy") or "").strip()
        https_proxy = (os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") or "").strip()
        no_proxy = (os.environ.get("NO_PROXY") or os.environ.get("no_proxy") or "").strip()
        if (not http_proxy or not https_proxy or not no_proxy) and os.name == "nt":
            try:
                from update_manager import _read_windows_user_env_var

                if not http_proxy:
                    http_proxy = (_read_windows_user_env_var("HTTP_PROXY") or _read_windows_user_env_var("http_proxy") or "").strip()
                if not https_proxy:
                    https_proxy = (_read_windows_user_env_var("HTTPS_PROXY") or _read_windows_user_env_var("https_proxy") or "").strip()
                if not no_proxy:
                    no_proxy = (_read_windows_user_env_var("NO_PROXY") or _read_windows_user_env_var("no_proxy") or "").strip()
            except Exception:
                pass
        proxy_url = http_proxy or https_proxy
        has_proxy = bool(proxy_url)
        result: dict = {
            "httpProxy": _mask_proxy_password(http_proxy) if http_proxy else "",
            "httpsProxy": _mask_proxy_password(https_proxy) if https_proxy else "",
            "hasProxy": has_proxy,
            "proxyHost": "",
            "proxyPort": None,
            "proxyUser": "",
            "noProxy": no_proxy,
        }
        if has_proxy:
            try:
                parsed = urllib.parse.urlparse(proxy_url)
                result["proxyHost"] = parsed.hostname or ""
                result["proxyPort"] = parsed.port
                result["proxyUser"] = parsed.username or ""
            except Exception:
                pass
        return result

    def _set_user_env_var(name: str, value: str) -> None:
        """Write a Windows user environment variable (persistent) and update current process."""
        if os.name == "nt":
            import winreg
            import ctypes
            key = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Environment",
                0,
                winreg.KEY_SET_VALUE,
            )
            winreg.SetValueEx(key, name, 0, winreg.REG_SZ, value)
            winreg.CloseKey(key)
            # Notify other applications to reload environment
            ctypes.windll.user32.SendMessageW(0xFFFF, 0x001A, 0, "Environment")  # type: ignore[attr-defined]
        # Also update current process environment (immediate effect)
        os.environ[name] = value

    async def _handle_get_proxy(request: web.Request) -> web.Response:
        """GET /api/proxy — return current proxy status."""
        return web.Response(
            text=json.dumps(_parse_proxy_env()),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_test_proxy(request: web.Request) -> web.Response:
        """POST /api/proxy/test — test proxy connectivity."""
        import time

        proxy_url = (os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")
                     or os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") or "").strip()
        if not proxy_url and os.name == "nt":
            try:
                from update_manager import _read_windows_user_env_var

                proxy_url = (_read_windows_user_env_var("HTTP_PROXY") or _read_windows_user_env_var("http_proxy")
                             or _read_windows_user_env_var("HTTPS_PROXY") or _read_windows_user_env_var("https_proxy") or "").strip()
            except Exception:
                proxy_url = ""

        if not proxy_url:
            return web.Response(
                text=json.dumps({
                    "success": False,
                    "target": "",
                    "statusCode": None,
                    "responseTimeMs": None,
                    "error": "No proxy configured (HTTP_PROXY / HTTPS_PROXY not set)",
                }),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        # Build opener using existing update_manager helpers
        install_root = Path(__file__).parent.parent if not getattr(sys, "frozen", False) else Path(sys.executable).parent.parent
        from update_manager import _make_opener, _get_ca_bundle

        # Fallback test targets: try gstatic first, then msftconnecttest if blocked
        _TEST_TARGETS = [
            "https://connectivitycheck.gstatic.com/generate_204",
            "http://www.msftconnecttest.com/connecttest.txt",
        ]

        opener = _make_opener(install_root)
        start = time.monotonic()
        last_error: str = ""
        last_target: str = ""
        last_status_code = None

        for test_target in _TEST_TARGETS:
            try:
                req = urllib.request.Request(test_target, method="GET")
                req.add_header("User-Agent", "MEC-Agent-ProxyTest/1.0")
                resp = opener.open(req, timeout=10)
                elapsed_ms = int((time.monotonic() - start) * 1000)
                status_code = resp.getcode()
                resp.close()
                return web.Response(
                    text=json.dumps({
                        "success": True,
                        "target": test_target,
                        "statusCode": status_code,
                        "responseTimeMs": elapsed_ms,
                        "error": None,
                    }),
                    content_type="application/json",
                    headers=_CORS_HEADERS,
                )
            except urllib.error.HTTPError as exc:
                last_target = test_target
                last_status_code = exc.code
                last_error = f"{exc.code} {exc.reason}"
                continue
            except Exception as exc:
                last_target = test_target
                last_error = str(exc)
                continue

        # All targets failed
        elapsed_ms = int((time.monotonic() - start) * 1000)
        return web.Response(
            text=json.dumps({
                "success": False,
                "target": last_target,
                "statusCode": last_status_code,
                "responseTimeMs": elapsed_ms,
                "error": last_error,
            }),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_patch_proxy(request: web.Request) -> web.Response:
        """PATCH /api/proxy — set proxy credentials and write to env vars."""
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        username = str(body.get("username", "")).strip()
        password = str(body.get("password", "")).strip()
        proxy_host = str(body.get("proxyHost", _PROXY_HOST_DEFAULT)).strip()
        proxy_port = body.get("proxyPort", _PROXY_PORT_DEFAULT)
        no_proxy = str(body.get("noProxy", _NO_PROXY_DEFAULT)).strip()

        if not username or not password:
            return web.Response(
                status=400,
                text=json.dumps({"error": "username and password are required"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        # Build proxy URL: http://user:pass@host:port
        encoded_user = urllib.parse.quote(username, safe="")
        encoded_pass = urllib.parse.quote(password, safe="")
        proxy_url = f"http://{encoded_user}:{encoded_pass}@{proxy_host}:{proxy_port}"

        # Write to Windows user environment variables (persistent)
        _set_user_env_var("HTTP_PROXY", proxy_url)
        _set_user_env_var("HTTPS_PROXY", proxy_url)
        if no_proxy:
            _set_user_env_var("NO_PROXY", no_proxy)
            _set_user_env_var("no_proxy", no_proxy)

        masked_url = _mask_proxy_password(proxy_url)
        return web.Response(
            text=json.dumps({
                "ok": True,
                "httpProxy": masked_url,
                "message": "已寫入使用者環境變數,並更新當前進程環境",
            }),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/proxy", _handle_options)
    app.router.add_route("OPTIONS", "/api/proxy/test", _handle_options)
    app.router.add_get("/api/proxy", _handle_get_proxy)
    app.router.add_post("/api/proxy/test", _handle_test_proxy)
    app.router.add_patch("/api/proxy", _handle_patch_proxy)

    # ---------------------------------------------------------------------------
    # Pega AI connection endpoint — verify provider apiBase + apiKey
    # ---------------------------------------------------------------------------

    def _read_pega_ai_config() -> dict:
        """Read the active custom provider settings from config.json."""
        result = {
            "provider": "custom",
            "model": "",
            "apiBase": "",
            "hasApiKey": False,
            "maskedApiKey": "",
            "configured": False,
        }
        if not config_path or not config_path.exists():
            return result
        try:
            data = json.loads(config_path.read_text(encoding="utf-8-sig"))
            defaults = data.get("agents", {}).get("defaults", {}) or {}
            provider_name = str(defaults.get("provider", "custom") or "custom").strip()
            providers = data.get("providers", {}) or {}
            provider_cfg = providers.get(provider_name, {}) if isinstance(providers, dict) else {}
            api_base = str(provider_cfg.get("apiBase", "") or "").strip()
            api_key = str(provider_cfg.get("apiKey", "") or "").strip()
            model = str(defaults.get("model", "") or "").strip()
            masked = (api_key[:4] + "***" + api_key[-4:]) if len(api_key) > 8 else ("***" if api_key else "")
            result.update({
                "provider": provider_name,
                "model": model,
                "apiBase": api_base,
                "hasApiKey": bool(api_key),
                "maskedApiKey": masked,
                "configured": bool(api_base and api_key and model),
            })
        except Exception:
            pass
        return result

    async def _handle_get_pega_ai_status(request: web.Request) -> web.Response:
        """GET /api/pegaai/status — return configured provider info without secrets."""
        return web.Response(
            text=json.dumps(_read_pega_ai_config()),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_test_pega_ai(request: web.Request) -> web.Response:
        """POST /api/pegaai/test — verify apiBase + apiKey by issuing a tiny chat request."""
        cfg = _read_pega_ai_config()
        api_base = cfg.get("apiBase", "")
        api_key = ""
        model = cfg.get("model", "")

        if not config_path or not config_path.exists():
            return web.Response(
                text=json.dumps({
                    "success": False,
                    "apiBase": "",
                    "model": "",
                    "statusCode": None,
                    "responseTimeMs": None,
                    "error": "config.json not found",
                }),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        try:
            data = json.loads(config_path.read_text(encoding="utf-8-sig"))
            defaults = data.get("agents", {}).get("defaults", {}) or {}
            provider_name = str(defaults.get("provider", "custom") or "custom").strip()
            providers = data.get("providers", {}) or {}
            provider_cfg = providers.get(provider_name, {}) if isinstance(providers, dict) else {}
            api_base = str(provider_cfg.get("apiBase", "") or "").strip()
            api_key = str(provider_cfg.get("apiKey", "") or "").strip()
            model = str(defaults.get("model", "") or "").strip()
        except Exception as exc:
            return web.Response(
                text=json.dumps({
                    "success": False,
                    "apiBase": "",
                    "model": "",
                    "statusCode": None,
                    "responseTimeMs": None,
                    "error": f"Failed to read config: {exc}",
                }),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        if not api_base or not api_key or not model:
            return web.Response(
                text=json.dumps({
                    "success": False,
                    "apiBase": api_base,
                    "model": model,
                    "statusCode": None,
                    "responseTimeMs": None,
                    "error": "apiBase/apiKey/model is missing",
                }),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        chat_url = f"{api_base.rstrip('/')}/chat/completions"
        payload = {
            "model": model,
            "messages": [
                {"role": "user", "content": "請只回覆一個字:ok"},
            ],
            "max_tokens": 1,
            "temperature": 0,
            "stream": False,
        }

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        import time
        start = time.monotonic()
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(20.0), trust_env=True) as client:
                response = await client.post(chat_url, headers=headers, json=payload)
            elapsed_ms = int((time.monotonic() - start) * 1000)
            if response.status_code >= 400:
                body_text = response.text[:500] if response.text else ""
                return web.Response(
                    text=json.dumps({
                        "success": False,
                        "apiBase": api_base,
                        "model": model,
                        "statusCode": response.status_code,
                        "responseTimeMs": elapsed_ms,
                        "error": body_text or f"HTTP {response.status_code}",
                    }, ensure_ascii=False),
                    content_type="application/json",
                    headers=_CORS_HEADERS,
                )
            reply_text = ""
            try:
                body = response.json()
                choices = body.get("choices", []) if isinstance(body, dict) else []
                if choices:
                    message = choices[0].get("message", {}) if isinstance(choices[0], dict) else {}
                    reply_text = str(message.get("content", "") or "").strip()
            except Exception:
                pass
            return web.Response(
                text=json.dumps({
                    "success": True,
                    "apiBase": api_base,
                    "model": model,
                    "statusCode": response.status_code,
                    "responseTimeMs": elapsed_ms,
                    "reply": reply_text,
                    "error": None,
                }, ensure_ascii=False),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as exc:
            elapsed_ms = int((time.monotonic() - start) * 1000)
            return web.Response(
                text=json.dumps({
                    "success": False,
                    "apiBase": api_base,
                    "model": model,
                    "statusCode": None,
                    "responseTimeMs": elapsed_ms,
                    "error": str(exc),
                }, ensure_ascii=False),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    app.router.add_route("OPTIONS", "/api/pegaai/status", _handle_options)
    app.router.add_route("OPTIONS", "/api/pegaai/test", _handle_options)
    app.router.add_get("/api/pegaai/status", _handle_get_pega_ai_status)
    app.router.add_post("/api/pegaai/test", _handle_test_pega_ai)

    # ---------------------------------------------------------------------------
    # Ein-Wiki launch endpoints
    # ---------------------------------------------------------------------------

    _WIKI_URL = "http://localhost:3001"

    def _wiki_paths() -> "tuple[str, str] | None":
        """Return (node_exe, server_js) if the ein-wiki folder is present, else None."""
        d = args.mec_wiki or ""
        if not d:
            return None
        node_exe = os.path.join(d, "runtime", "node.exe")
        server_js = os.path.join(d, "server.js")
        if os.path.exists(node_exe) and os.path.exists(server_js):
            return node_exe, server_js
        return None

    async def _handle_wiki_status(request: web.Request) -> web.Response:
        return web.Response(
            text=json.dumps({"available": _wiki_paths() is not None}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_wiki_start(request: web.Request) -> web.Response:
        paths = _wiki_paths()
        if not paths:
            return web.Response(
                status=404,
                text=json.dumps({"error": "Ein-Wiki not found"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        node_exe, server_js = paths
        wiki_dir = os.path.dirname(server_js)
        import subprocess
        wiki_env = os.environ.copy()
        vault_path = getattr(args, "mec_wiki_vault", None) or ""
        cmd = [node_exe, server_js]
        if vault_path:
            cmd += ["--vault", vault_path]
            wiki_env["WIKI_VAULT_PATH"] = vault_path
        subprocess.Popen(
            cmd,
            cwd=wiki_dir,
            env=wiki_env,
            creationflags=0x00000008,  # DETACHED_PROCESS
            close_fds=True,
        )
        return web.Response(
            text=json.dumps({"ok": True, "url": _WIKI_URL}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/wiki/status", _handle_options)
    app.router.add_route("OPTIONS", "/api/wiki/start",  _handle_options)
    app.router.add_get("/api/wiki/status",  _handle_wiki_status)
    app.router.add_post("/api/wiki/start",  _handle_wiki_start)

    # ---------------------------------------------------------------------------
    # Log viewer endpoint — read last N lines from log files
    # ---------------------------------------------------------------------------

    def _get_log_dir() -> Path | None:
        """Return the logs/ directory next to the installation root."""
        try:
            if getattr(sys, "frozen", False):
                # Running as PyInstaller exe: <install>/api_server/api_server.exe
                install_root = Path(sys.executable).parent.parent
            else:
                # Running as script: <install>/api_server/server.py
                install_root = Path(__file__).parent.parent
            return install_root / "logs"
        except Exception:
            return None

    _LOG_FILE_MAP: dict[str, str] = {
        "nanobot_out": "mec_agent_gateway.out.log",
        "nanobot_err": "mec_agent_gateway.err.log",
        "api_out":     "api_server.out.log",
        "api_err":     "api_server.err.log",
        "llama_out":   "llama_server.out.log",
        "llama_err":   "llama_server.err.log",
    }

    async def _handle_get_logs(request: web.Request) -> web.Response:
        log_dir = _get_log_dir()
        if not log_dir:
            return web.Response(
                status=503,
                text=json.dumps({"error": "log directory unavailable"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        file_key = request.rel_url.query.get("file", "nanobot_out")
        if file_key not in _LOG_FILE_MAP:
            file_key = "nanobot_out"
        filename = _LOG_FILE_MAP[file_key]
        try:
            lines_param = min(max(int(request.rel_url.query.get("lines", "300")), 1), 5000)
        except ValueError:
            lines_param = 300

        log_path = log_dir / filename
        lines: list[str] = []
        if log_path.exists():
            try:
                content = log_path.read_text(encoding="utf-8", errors="replace")
                all_lines = content.splitlines()
                lines = all_lines[-lines_param:]
            except Exception as exc:
                lines = [f"[error reading log: {exc}]"]

        available = [k for k, v in _LOG_FILE_MAP.items() if (log_dir / v).exists()]
        return web.Response(
            text=json.dumps({
                "file": file_key,
                "filename": filename,
                "lines": lines,
                "available": available,
            }, ensure_ascii=False),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/logs", _handle_options)
    app.router.add_get("/api/logs", _handle_get_logs)

    # ---------------------------------------------------------------------------
    # Version endpoint
    # ---------------------------------------------------------------------------

    async def _handle_get_version(_request: web.Request) -> web.Response:
        return web.Response(
            text=json.dumps({"ui": _APP_VERSION, "backend": _APP_VERSION}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/version", _handle_options)
    app.router.add_get("/api/version", _handle_get_version)

    # ---------------------------------------------------------------------------
    # Changelog endpoint — serve parsed CHANGELOG.md as structured JSON
    # ---------------------------------------------------------------------------

    _CHANGELOG_CACHE: dict = {"mtime": 0.0, "data": None}

    def _parse_changelog(text: str) -> list:
        """Parse a Keep-a-Changelog formatted string into a list of version entries.

        Each entry: { version, date, sections: { Added|Fixed|Changed|...: [items] } }
        """
        import re as _re
        entries: list = []
        current_entry = None
        current_section = None

        for raw_line in text.splitlines():
            line = raw_line

            # Version heading: ## [X.Y.Z] - YYYY-MM-DD  or  ## [Unreleased]
            m = _re.match(r'^##\s+\[([^\]]+)\](?:\s*-\s*(\d{4}-\d{2}-\d{2}))?', line)
            if m:
                if current_entry is not None:
                    entries.append(current_entry)
                current_entry = {
                    "version": m.group(1),
                    "date": m.group(2) or "",
                    "sections": {},
                }
                current_section = None
                continue

            # Section heading: ### Added, ### Fixed, ### Changed, etc.
            m2 = _re.match(r'^###\s+(\w+)', line)
            if m2 and current_entry is not None:
                current_section = m2.group(1)
                current_entry["sections"][current_section] = []
                continue

            # Item line: - description
            if current_entry is not None and current_section is not None:
                item = line.strip()
                if item.startswith("- "):
                    current_entry["sections"][current_section].append(item[2:])
                elif item == "":
                    continue
                # Skip --- separator lines

        if current_entry is not None:
            entries.append(current_entry)

        return entries

    async def _handle_get_changelog(_request: web.Request) -> web.Response:
        """GET /api/changelog — return parsed CHANGELOG.md as structured JSON."""
        nonlocal _CHANGELOG_CACHE
        try:
            install_root = _get_install_root()
            # CHANGELOG.md lives alongside version.json in update/ directory.
            candidates = [
                install_root / "update" / "CHANGELOG.md",
                install_root / "CHANGELOG.md",
                install_root.parent / "CHANGELOG.md",
            ]
            changelog_path = None
            for c in candidates:
                if c.exists():
                    changelog_path = c
                    break

            if changelog_path is None:
                return web.Response(
                    status=404,
                    text=json.dumps({"error": "CHANGELOG.md not found"}),
                    content_type="application/json",
                    headers=_CORS_HEADERS,
                )

            mtime = changelog_path.stat().st_mtime
            if _CHANGELOG_CACHE["data"] is not None and _CHANGELOG_CACHE["mtime"] == mtime:
                data = _CHANGELOG_CACHE["data"]
            else:
                raw = changelog_path.read_text(encoding="utf-8")
                data = _parse_changelog(raw)
                _CHANGELOG_CACHE = {"mtime": mtime, "data": data}

            return web.Response(
                text=json.dumps(data, ensure_ascii=False),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            return web.Response(
                status=500,
                text=json.dumps({"error": str(e)}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    app.router.add_route("OPTIONS", "/api/changelog", _handle_options)
    app.router.add_get("/api/changelog", _handle_get_changelog)

    # ---------------------------------------------------------------------------
    # Speech-to-text transcription endpoint — uses local faster-whisper model
    # ---------------------------------------------------------------------------

    def _get_whisper_model_path() -> Path:
        """Return path to the faster-whisper model directory.

        Priority:
          1. Value stored in user_overrides.json (user-configurable via Settings UI)
          2. Bundled path: <api_server_dir>/whisper/faster_whisper_tiny
        """
        overrides_file = _overrides_path()
        if overrides_file and overrides_file.exists():
            try:
                data = json.loads(overrides_file.read_text(encoding="utf-8-sig"))
                custom = data.get("whisperModelPath", "").strip()
                if custom:
                    return Path(custom)
            except Exception:
                pass
        # Default bundled path
        if getattr(sys, "frozen", False):
            base = Path(sys.executable).parent
        else:
            base = Path(__file__).parent
        return base / "whisper" / "faster_whisper_tiny"

    _whisper_model_cache: dict = {}

    def _load_whisper_model():
        """Load and cache the faster-whisper model (lazy, thread-safe enough for single-worker use)."""
        if "model" in _whisper_model_cache:
            return _whisper_model_cache["model"]
        model_path = _get_whisper_model_path()
        if not model_path.exists():
            raise FileNotFoundError(f"Whisper model not found at: {model_path}")
        try:
            from faster_whisper import WhisperModel  # type: ignore
            model = WhisperModel(str(model_path), device="cpu", compute_type="int8")
            _whisper_model_cache["model"] = model
            print(f"[api_server] Whisper model loaded from: {model_path}", flush=True)
            return model
        except ImportError:
            raise ImportError("faster-whisper is not installed. Run: pip install faster-whisper")

    async def _handle_transcribe(request: web.Request) -> web.Response:
        """POST /api/transcribe — accept audio file (multipart or raw), return transcribed text."""
        import tempfile
        import asyncio

        try:
            # Accept multipart/form-data (field name: "audio") or raw body
            content_type = request.headers.get("Content-Type", "")
            if "multipart/form-data" in content_type:
                reader = await request.multipart()
                field = await reader.next()
                if field is None or field.name != "audio":
                    return web.Response(
                        status=400,
                        text=json.dumps({"error": "expected multipart field 'audio'"}),
                        content_type="application/json",
                        headers=_CORS_HEADERS,
                    )
                audio_bytes = await field.read(decode=True)
                filename = field.filename or "audio.webm"
            else:
                audio_bytes = await request.read()
                filename = "audio.webm"

            if not audio_bytes:
                return web.Response(
                    status=400,
                    text=json.dumps({"error": "empty audio data"}),
                    content_type="application/json",
                    headers=_CORS_HEADERS,
                )

            # Write to temp file with correct extension for ffmpeg detection
            suffix = Path(filename).suffix or ".webm"
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                tmp.write(audio_bytes)
                tmp_path = tmp.name

            try:
                loop = asyncio.get_event_loop()
                transcript = await loop.run_in_executor(None, _do_transcribe, tmp_path)
            finally:
                try:
                    os.unlink(tmp_path)
                except Exception:
                    pass

            return web.Response(
                text=json.dumps({"text": transcript}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except FileNotFoundError as e:
            return web.Response(
                status=503,
                text=json.dumps({"error": str(e)}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except ImportError as e:
            return web.Response(
                status=503,
                text=json.dumps({"error": str(e)}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            print(f"[api_server] transcribe error: {e}", file=sys.stderr)
            return web.Response(
                status=500,
                text=json.dumps({"error": f"transcription failed: {e}"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    def _do_transcribe(audio_path: str) -> str:
        """Run faster-whisper transcription synchronously (called in executor)."""
        model = _load_whisper_model()
        segments, _info = model.transcribe(audio_path, beam_size=5)
        return "".join(seg.text for seg in segments).strip()

    app.router.add_route("OPTIONS", "/api/transcribe", _handle_options)
    app.router.add_post("/api/transcribe", _handle_transcribe)
    _UPLOAD_MAX_MB_DEFAULT = 200
    _UPLOAD_CHUNK = 1024 * 1024  # 1 MB per read_chunk
    _uploads_dir_state = {"value": None}

    def _get_upload_max_mb() -> int:
        """Read upload max MB from config.json, fallback to 200."""
        if config_path and config_path.exists():
            try:
                data = json.loads(config_path.read_text(encoding="utf-8-sig"))
                return int(data.get("api", {}).get("uploadMaxMb", _UPLOAD_MAX_MB_DEFAULT))
            except Exception:
                pass
        return _UPLOAD_MAX_MB_DEFAULT

    def _get_uploads_dir() -> Path | None:
        """Lazy-resolve the uploads directory so workspace_path is ready."""
        print(f"[api_server] debug workspace_path={workspace_path!r}")
        if _uploads_dir_state["value"] is None and workspace_path:
            _uploads_dir_state["value"] = workspace_path / "uploads"
            try:
                _uploads_dir_state["value"].mkdir(parents=True, exist_ok=True)
            except Exception as e:
                print(f"[api_server] cannot create uploads dir: {e}", file=sys.stderr)
                _uploads_dir_state["value"] = None
        return _uploads_dir_state["value"]

    async def _handle_upload(request: web.Request) -> web.Response:
        """POST /api/upload — multipart file upload. Returns {path,name,size}."""
        uploads_dir = _get_uploads_dir()
        if not uploads_dir:
            return web.Response(
                status=503,
                text=json.dumps({"error": "workspace unavailable"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        max_mb = _get_upload_max_mb()
        max_bytes = max_mb * 1024 * 1024

        def _size_exceeded_resp() -> web.Response:
            return web.Response(
                status=413,
                text=json.dumps({
                    "error": f"total upload exceeds {max_mb} MB",
                    "maxSizeMb": max_mb,
                    "suggestion": (
                        "對於大型檔案,請直接在對話中輸入檔案路徑"
                        "(例如 D:\\design\\part.stp),"
                        "AI Agent 可直接讀取本機檔案。"
                    ),
                    "suggestionEn": (
                        "For large files, enter the file path directly in the chat"
                        " (e.g. D:\\design\\part.stp)."
                        " The AI Agent can read local files directly."
                    ),
                    "code": "UPLOAD_SIZE_EXCEEDED",
                }),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

        try:
            reader = await request.multipart()
            results: list[dict] = []
            total_size = 0
            while True:
                field = await reader.next()
                if field is None:
                    break
                # Skip non-file fields
                if not getattr(field, "filename", None):
                    # ponytail: drain non-file fields with chunked read
                    while await field.read_chunk(_UPLOAD_CHUNK):
                        pass
                    continue
                name = field.filename or "unnamed"
                safe_name = Path(name).name
                dest_name = f"{uuid.uuid4().hex}_{safe_name}"
                dest_path = uploads_dir / dest_name
                file_size = 0
                try:
                    with open(dest_path, "wb") as f:
                        while True:
                            chunk = await field.read_chunk(_UPLOAD_CHUNK)
                            if not chunk:
                                break
                            total_size += len(chunk)
                            file_size += len(chunk)
                            if total_size > max_bytes:
                                # Exceeded — remove partial file and reject
                                try:
                                    dest_path.unlink(missing_ok=True)
                                except Exception:
                                    pass
                                return _size_exceeded_resp()
                            f.write(chunk)
                except Exception as e:
                    try:
                        dest_path.unlink(missing_ok=True)
                    except Exception:
                        pass
                    raise
                if file_size == 0:
                    try:
                        dest_path.unlink(missing_ok=True)
                    except Exception:
                        pass
                    continue
                results.append({
                    "path": str(dest_path),
                    "name": safe_name,
                    "size": file_size,
                })
            if not results:
                return web.Response(
                    status=400,
                    text=json.dumps({"error": "no files received"}),
                    content_type="application/json",
                    headers=_CORS_HEADERS,
                )
            return web.Response(
                text=json.dumps(results),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            print(f"[api_server] upload error: {e}", file=sys.stderr)
            return web.Response(
                status=500,
                text=json.dumps({"error": f"upload failed: {e}"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    async def _handle_upload_config(_request: web.Request) -> web.Response:
        """GET /api/upload/config — return current upload size limit."""
        return web.Response(
            text=json.dumps({"maxSizeMb": _get_upload_max_mb()}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/upload", _handle_options)
    app.router.add_post("/api/upload", _handle_upload)
    app.router.add_get("/api/upload/config", _handle_upload_config)

    import getpass as _getpass

    async def _handle_whoami(_request: web.Request) -> web.Response:
        username = ""
        try:
            username = _getpass.getuser()
        except Exception:
            try:
                username = os.getlogin()
            except Exception:
                username = "User"
        return web.Response(
            text=json.dumps({"username": username}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/whoami", _handle_options)
    app.router.add_get("/api/whoami", _handle_whoami)

    # ---------------------------------------------------------------------------
    # Static docs endpoint — serve HTML files from the docs/ directory
    # ---------------------------------------------------------------------------

    def _get_docs_dir() -> Path:
        if getattr(sys, "frozen", False):
            # Running as PyInstaller exe: <install_root>/api_server/api_server.exe
            # Serve docs from the directory next to the exe so files can be
            # updated independently without rebuilding the executable.
            return Path(sys.executable).parent / "docs"
        return Path(__file__).parent / "docs"

    async def _handle_doc(request: web.Request) -> web.Response:
        filename = request.match_info.get("filename", "")
        import re as _re
        if not _re.match(r'^[a-zA-Z0-9\-]+\.html$', filename):
            return web.Response(status=404, text="Not found", headers=_CORS_HEADERS)
        doc_path = _get_docs_dir() / filename
        if not doc_path.exists():
            return web.Response(status=404, text="Not found", headers=_CORS_HEADERS)
        try:
            content = doc_path.read_text(encoding="utf-8")
            return web.Response(text=content, content_type="text/html",
                                charset="utf-8", headers=_CORS_HEADERS)
        except Exception as e:
            return web.Response(status=500, text=str(e), headers=_CORS_HEADERS)

    app.router.add_route("OPTIONS", "/api/docs/{filename}", _handle_options)
    app.router.add_get("/api/docs/{filename}", _handle_doc)

    # ---------------------------------------------------------------------------
    # Management endpoints — Cron & Git backed by nanobot SDK
    # ---------------------------------------------------------------------------

    # Resolve workspace path: prefer --workspace arg, fall back to nanobot SDK config.
    workspace_path: Path | None = None
    if args.workspace:
        workspace_path = Path(args.workspace).expanduser().resolve()
    else:
        try:
            from nanobot.config.loader import load_config  # type: ignore
            _cfg = load_config(config_path)  # uses config_path or ~/.nanobot/config.json
            workspace_path = _cfg.workspace_path.resolve()
        except Exception as _e:
            print(f"[api_server] WARNING: could not resolve workspace via nanobot SDK: {_e}", file=sys.stderr)
            # Manual fallback: parse config.json
            if config_path and config_path.exists():
                try:
                    _cfg_data = json.loads(config_path.read_text(encoding="utf-8-sig"))
                    _ws_str = (_cfg_data.get("agents", {}).get("defaults", {}) or {}).get("workspace", "")
                    if _ws_str:
                        workspace_path = Path(os.path.expanduser(_ws_str)).resolve()
                except Exception as _e2:
                    print(f"[api_server] WARNING: could not resolve workspace from config: {_e2}", file=sys.stderr)

    # If still None (no --config/--workspace passed), try environment variable or common paths.
    if not workspace_path:
        _data_dir = os.environ.get("NANOBOT_DATA_DIR") or os.environ.get("MEC_DATA_DIR")
        if _data_dir:
            workspace_path = (Path(_data_dir) / ".nanobot" / "workspace").resolve()
        else:
            # Probe known default locations: launch_mec_agent.ps1 uses D:\MEC_AgentData on machines with D:
            for _candidate in [
                Path(os.environ.get("LOCALAPPDATA", "~")) / "MEC_Agent" / ".nanobot" / "workspace",
                Path("D:\\MEC_AgentData\\.nanobot\\workspace"),
                Path.home() / ".mec_agent" / "workspace",
                Path.home() / ".nanobot" / "workspace",
            ]:
                try:
                    _c = Path(os.path.expanduser(str(_candidate))).resolve()
                    if (_c / "cron").exists() or (_c / "sessions").exists() or _c.exists():
                        workspace_path = _c
                        break
                except Exception:
                    continue

    if workspace_path:
        print(f"[api_server] workspace_path resolved to: {workspace_path}", flush=True)
        # Populate local-image proxy allowed directories now that workspace_path is known
        _LOCAL_IMAGE_ALLOWED_DIRS.append(workspace_path / ".playwright-mcp")
    else:
        print("[api_server] WARNING: workspace_path could not be determined; management endpoints will return 503", file=sys.stderr, flush=True)

    # Playwright screenshots can be written under different roots depending on
    # launch mode (dev repo root vs installed MEC_Agent root). Allow both.
    _local_playwright_dirs: list[Path] = []
    if getattr(sys, "frozen", False):
        # api_server.exe -> <install_root>\api_server\api_server.exe
        # install root is the parent of api_server directory.
        _install_root = Path(sys.executable).resolve().parent.parent
        _local_playwright_dirs.append(_install_root / ".playwright-mcp")
        _local_playwright_dirs.append(_install_root.parent / ".playwright-mcp")
    else:
        # Dev/source run:
        # .../agent_packaging/api_server/server.py
        # - parent.parent   => agent_packaging
        # - parent.parent.parent => repo root
        _api_server_dir = Path(__file__).resolve().parent
        _agent_packaging_root = _api_server_dir.parent
        _local_playwright_dirs.append(_agent_packaging_root / ".playwright-mcp")
        _local_playwright_dirs.append(_agent_packaging_root.parent / ".playwright-mcp")

    for _d in _local_playwright_dirs:
        if _d not in _LOCAL_IMAGE_ALLOWED_DIRS:
            _LOCAL_IMAGE_ALLOWED_DIRS.append(_d)

    # Also allow Playwright MCP screenshots from the dev workspace
    _dev_ws = Path("D:/MEC_AgentData_dev/.mec_agent/workspace/.playwright-mcp")
    if _dev_ws.exists():
        _LOCAL_IMAGE_ALLOWED_DIRS.append(_dev_ws)

    # ── Integrity monitor: restore protected workspace files on startup ──────
    if workspace_path:
        from integrity_monitor import restore_protected_files, start_monitor
        restore_protected_files(workspace_path)
        start_monitor(workspace_path, interval=120)

    # ── Long-lived CronService + NotificationManager ─────────────────────────
    # Previously each CRUD handler called _make_cron_service() which created a
    # throwaway instance.  We now keep a single long-lived instance so that the
    # on_job callback fires when scheduled jobs are due.

    cron_svc = None           # type: ignore[assignment]
    notification_mgr = None   # type: ignore[assignment]

    def _cron_job_to_dict(job) -> dict:
        from dataclasses import asdict
        return asdict(job)

    async def _cron_on_job(job) -> str | None:
        """CronService on_job callback — push SSE/balloon notification for any job with a message."""
        # Any cron job that has a message should trigger a Toast / balloon
        # notification, regardless of channel.  This covers:
        #   - channel="notification"  (created via the Schedule Management UI)
        #   - channel="websocket"     (created via AI chat)
        #   - channel=null             (system jobs with a user-facing message)
        # When channel is "notification" we also return "ok" to suppress the
        # default agent_turn delivery so the user is not notified twice.
        if job.payload.message and notification_mgr:
            await notification_mgr.push(
                title=job.name,
                message=job.payload.message,
                job_id=job.id,
            )
        if job.payload.channel == "notification":
            return "ok"
        return None

    if workspace_path:
        try:
            from nanobot.cron.service import CronService  # type: ignore
            cron_svc = CronService(
                store_path=workspace_path / "cron" / "jobs.json",
                on_job=_cron_on_job,
            )
            # ── Cross-process safety for shared jobs.json ──────────────────
            # Both api_server and nanobot gateway run separate CronService
            # instances sharing the same jobs.json.  During _on_timer the
            # service holds an in-memory snapshot and skips disk reads; when
            # _save_store() is called it overwrites the file with whatever is
            # in memory, wiping any jobs added by the other process in the
            # meantime.
            #
            # We cannot modify nanobot source, so we apply two mitigations
            # here in api_server:
            #
            #   1. Monkey-patch our own _save_store to merge disk-only jobs
            #      before writing (prevents *us* from overwriting *gateway's*
            #      new jobs).
            #   2. After add_job, also append to the action file so that
            #      gateway's _merge_action() can recover jobs that were
            #      overwritten by gateway's own _save_store() during its
            #      _on_timer window.
            _orig_save_store = cron_svc._save_store  # type: ignore[attr-defined]
            from nanobot.cron.types import CronJob as _CronJob  # type: ignore

            def _patched_save_store():
                """Merge disk-only jobs before writing to prevent cross-process data loss."""
                if cron_svc._store and cron_svc.store_path.exists():
                    try:
                        disk_data = json.loads(
                            cron_svc.store_path.read_text(encoding="utf-8")
                        )
                        disk_ids = {j.get("id") for j in disk_data.get("jobs", [])}
                        mem_ids = {j.id for j in cron_svc._store.jobs}
                        new_ids = disk_ids - mem_ids
                        if new_ids:
                            for j in disk_data.get("jobs", []):
                                if j.get("id") in new_ids:
                                    try:
                                        cron_svc._store.jobs.append(
                                            _CronJob.from_dict(j)
                                        )
                                    except Exception:
                                        pass
                    except Exception:
                        pass
                _orig_save_store()

            cron_svc._save_store = _patched_save_store  # type: ignore[attr-defined]

            # Helper: also write to action file after add_job so gateway's
            # _merge_action() can recover overwritten jobs.
            def _append_cron_action(action: str, params: dict):
                """Append a line to the cron action file for cross-process sync."""
                action_path = cron_svc.store_path.parent / "action.jsonl"
                try:
                    action_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(action_path, "a", encoding="utf-8") as f:
                        f.write(json.dumps({"action": action, "params": params}, ensure_ascii=False) + "\n")
                except Exception:
                    pass  # Best-effort; action file is a safety net
        except Exception as _e:
            print(f"[api_server] WARNING: CronService unavailable: {_e}", file=sys.stderr)

        try:
            from notification_manager import NotificationManager
            notification_mgr = NotificationManager(workspace_path=workspace_path)
        except Exception as _e:
            print(f"[api_server] WARNING: NotificationManager unavailable: {_e}", file=sys.stderr)

    # -- CRON CRUD --

    async def _handle_list_cron(request: web.Request) -> web.Response:
        if not cron_svc:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            jobs = cron_svc.list_jobs(include_disabled=True)
            return web.Response(
                text=json.dumps([_cron_job_to_dict(j) for j in jobs]),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            return web.Response(status=500, text=json.dumps({"error": str(e)}),
                                content_type="application/json", headers=_CORS_HEADERS)

    async def _handle_add_cron_job(request: web.Request) -> web.Response:
        """POST /api/manage/cron — add a new scheduled reminder."""
        if not cron_svc:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        name = (body.get("name") or "").strip()
        message = (body.get("message") or "").strip()
        if not name or not message:
            return web.Response(status=400, text=json.dumps({"error": "name and message required"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        schedule_data = body.get("schedule", {})
        from nanobot.cron.types import CronSchedule
        schedule = CronSchedule(
            kind=schedule_data.get("kind", "every"),
            at_ms=schedule_data.get("atMs") or schedule_data.get("at_ms"),
            every_ms=schedule_data.get("everyMs") or schedule_data.get("every_ms"),
            expr=schedule_data.get("expr"),
            tz=schedule_data.get("tz"),
        )
        # Validate: every schedule needs its required fields populated
        if schedule.kind == "every" and (not schedule.every_ms or schedule.every_ms <= 0):
            return web.Response(status=400, text=json.dumps({"error": "everyMs required for interval schedule"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        if schedule.kind == "at" and (not schedule.at_ms or schedule.at_ms <= 0):
            return web.Response(status=400, text=json.dumps({"error": "atMs required for one-shot schedule"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        if schedule.kind == "cron" and not schedule.expr:
            return web.Response(status=400, text=json.dumps({"error": "expr required for cron schedule"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        channel = body.get("channel", "notification")
        job = cron_svc.add_job(
            name=name,
            schedule=schedule,
            message=message,
            deliver=False,
            channel=channel,
        )
        # Also write to the cron action file so gateway's _merge_action()
        # can recover this job if its _save_store() overwrites jobs.json
        # during its _on_timer window.
        from dataclasses import asdict as _asdict
        _append_cron_action("add", _asdict(job))
        return web.Response(
            status=201,
            text=json.dumps(_cron_job_to_dict(job)),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_patch_cron_job(request: web.Request) -> web.Response:
        job_id = request.match_info.get("job_id", "")
        if not cron_svc:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        job = None
        if "enabled" in body:
            job = cron_svc.enable_job(job_id, bool(body["enabled"]))
            if job is None:
                return web.Response(status=404, text=json.dumps({"error": "not_found"}),
                                    content_type="application/json", headers=_CORS_HEADERS)
            # Write update action so gateway's _merge_action() applies the
            # new enabled state instead of recovering the old one from add.
            from dataclasses import asdict as _asdict
            _append_cron_action("update", _asdict(job))

        if "name" in body or "message" in body:
            kwargs: dict = {}
            if "name" in body:
                kwargs["name"] = str(body["name"])
            if "message" in body:
                kwargs["message"] = str(body["message"])
            result = cron_svc.update_job(job_id, **kwargs)
            if result in ("not_found", "protected"):
                return web.Response(status=404 if result == "not_found" else 403,
                                    text=json.dumps({"error": result}),
                                    content_type="application/json", headers=_CORS_HEADERS)
            job = result
            # Write update action so gateway's _merge_action() applies the
            # new name/message instead of recovering old values from add.
            from dataclasses import asdict as _asdict
            _append_cron_action("update", _asdict(job))

        if job is None:
            job = cron_svc.get_job(job_id)

        return web.Response(
            text=json.dumps(_cron_job_to_dict(job) if job else {}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_delete_cron_job(request: web.Request) -> web.Response:
        job_id = request.match_info.get("job_id", "")
        if not cron_svc:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        result = cron_svc.remove_job(job_id)
        if result == "not_found":
            return web.Response(status=404, text=json.dumps({"error": "not_found"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        if result == "protected":
            return web.Response(status=403, text=json.dumps({"error": "protected system job"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        # Also write a delete action so gateway's _merge_action() removes
        # the job instead of recovering it from a previous add action.
        _append_cron_action("del", {"job_id": job_id})
        return web.Response(
            text=json.dumps({"deleted": True}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    # -- NOTIFICATIONS (SSE + list + mark-read) --

    async def _handle_notifications_stream(request: web.Request) -> web.StreamResponse:
        """SSE endpoint: /api/notifications/stream.
        Clients connect with EventSource and receive push events in real time."""
        if not notification_mgr:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        resp = web.StreamResponse(
            status=200,
            headers={
                "Content-Type": "text/event-stream",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                **_CORS_HEADERS,
            },
        )
        await resp.prepare(request)
        # Send an initial comment to ensure the connection is established
        await resp.write(b": connected\n\n")
        q = notification_mgr.subscribe_sse()
        try:
            while True:
                event = await q.get()
                data = json.dumps(event, ensure_ascii=False)
                await resp.write(f"data: {data}\n\n".encode("utf-8"))
        except (ConnectionResetError, asyncio.CancelledError, ConnectionError):
            pass
        finally:
            notification_mgr.unsubscribe_sse(q)
        return resp

    async def _handle_list_notifications(request: web.Request) -> web.Response:
        """GET /api/notifications — recent notification list."""
        if not notification_mgr:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        limit = int(request.rel_url.query.get("limit", "50"))
        events = await notification_mgr.list_recent(limit=limit)
        return web.Response(
            text=json.dumps(events),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_mark_notification_read(request: web.Request) -> web.Response:
        """PATCH /api/notifications/{id}/read."""
        if not notification_mgr:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        event_id = request.match_info.get("id", "")
        found = await notification_mgr.mark_read(event_id)
        if not found:
            return web.Response(status=404, text=json.dumps({"error": "not_found"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        return web.Response(
            text=json.dumps({"read": True}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    # Register management routes
    app.router.add_route("OPTIONS", "/api/manage/cron",          _handle_options)
    app.router.add_route("OPTIONS", "/api/manage/cron/{job_id}", _handle_options)
    app.router.add_route("OPTIONS", "/api/notifications",        _handle_options)
    app.router.add_route("OPTIONS", "/api/notifications/stream",  _handle_options)
    app.router.add_route("OPTIONS", "/api/notifications/{id}/read", _handle_options)
    app.router.add_get(    "/api/manage/cron",            _handle_list_cron)
    app.router.add_post(   "/api/manage/cron",            _handle_add_cron_job)
    app.router.add_patch(  "/api/manage/cron/{job_id}",   _handle_patch_cron_job)
    app.router.add_delete( "/api/manage/cron/{job_id}",   _handle_delete_cron_job)
    app.router.add_get(    "/api/notifications/stream",  _handle_notifications_stream)
    app.router.add_get(    "/api/notifications",         _handle_list_notifications)
    app.router.add_patch(  "/api/notifications/{id}/read", _handle_mark_notification_read)

    # ---------------------------------------------------------------------------
    # Workspace filesystem endpoints — generic file access + derived views
    # ---------------------------------------------------------------------------

    _WS_ALLOWED_WRITE = {
        "SOUL.md",
        "AGENTS.md", "TOOLS.md", "USER.md", "HEARTBEAT.md",
        "memory/MEMORY.md",
    }

    def _safe_ws_path(rel: str) -> Path | None:
        """Resolve rel relative to workspace; return None if path traversal or workspace unknown."""
        if not workspace_path:
            return None
        rel = rel.replace("\\", "/").lstrip("/")
        parts = rel.split("/")
        if ".." in parts or "" in parts[:-1]:
            return None
        try:
            p = (workspace_path / rel).resolve()
            p.relative_to(workspace_path.resolve())
            return p
        except (ValueError, OSError):
            return None

    async def _handle_ws_read_file(request: web.Request) -> web.Response:
        rel = request.rel_url.query.get("path", "")
        if not rel:
            return web.Response(status=400, text=json.dumps({"error": "missing path"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        p = _safe_ws_path(rel)
        if p is None:
            return web.Response(
                status=503 if not workspace_path else 400,
                text=json.dumps({"error": "workspace unavailable" if not workspace_path else "invalid path"}),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        if not p.exists():
            return web.Response(text=json.dumps({"exists": False, "content": ""}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            content = p.read_text(encoding="utf-8", errors="replace")
            return web.Response(text=json.dumps({"exists": True, "content": content}),
                                content_type="application/json", headers=_CORS_HEADERS)
        except Exception as e:
            return web.Response(status=500, text=json.dumps({"error": str(e)}),
                                content_type="application/json", headers=_CORS_HEADERS)

    async def _handle_ws_write_file(request: web.Request) -> web.Response:
        rel = request.rel_url.query.get("path", "")
        if not rel:
            return web.Response(status=400, text=json.dumps({"error": "missing path"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        # Security: only allow writes to explicitly whitelisted relative paths
        norm = rel.replace("\\", "/").lstrip("/")
        if norm not in _WS_ALLOWED_WRITE:
            return web.Response(status=403, text=json.dumps({"error": "write not permitted for this path"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        p = _safe_ws_path(rel)
        if p is None:
            return web.Response(status=503 if not workspace_path else 400,
                                text=json.dumps({"error": "workspace unavailable" if not workspace_path else "invalid path"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        content = body.get("content", "")
        if not isinstance(content, str):
            return web.Response(status=400, text=json.dumps({"error": "content must be a string"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            p.parent.mkdir(parents=True, exist_ok=True)
            p.write_text(content, encoding="utf-8")
            return web.Response(text=json.dumps({"ok": True}),
                                content_type="application/json", headers=_CORS_HEADERS)
        except Exception as e:
            return web.Response(status=500, text=json.dumps({"error": str(e)}),
                                content_type="application/json", headers=_CORS_HEADERS)

    async def _handle_ws_skills(request: web.Request) -> web.Response:
        if not workspace_path:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        skills_dir = workspace_path / "skills"
        results: list[dict] = []
        if skills_dir.is_dir():
            for entry in sorted(skills_dir.iterdir()):
                if not entry.is_dir():
                    continue
                skill_md = entry / "SKILL.md"
                preview = ""
                if skill_md.exists():
                    try:
                        text = skill_md.read_text(encoding="utf-8", errors="replace")
                        for line in text.splitlines():
                            stripped = line.strip().lstrip("#").strip()
                            if stripped:
                                preview = stripped
                                break
                    except Exception:
                        pass
                results.append({
                    "name": entry.name,
                    "has_skill_md": skill_md.exists(),
                    "preview": preview,
                })
        return web.Response(text=json.dumps(results),
                            content_type="application/json", headers=_CORS_HEADERS)

    # ---------------------------------------------------------------------------
    # Context-usage endpoint — estimate prompt tokens for a session
    # ---------------------------------------------------------------------------

    # Cache for the ContextBuilder and tool-definitions token overhead.
    # Built once on first request; rebuilt if the workspace or config changes.
    _context_builder_cache: dict[str, Any] = {}

    def _make_session_manager():
        """Create a SessionManager for the resolved workspace (lazy import)."""
        if not workspace_path:
            return None
        try:
            from nanobot.session import SessionManager  # type: ignore
            return SessionManager(workspace_path)
        except Exception as _e:
            print(f"[api_server] WARNING: SessionManager unavailable: {_e}", file=sys.stderr)
            return None

    def _get_or_create_context_builder():
        """Return a (cached) ContextBuilder + tool-overhead for the workspace.

        The ContextBuilder builds the full system prompt (identity, bootstrap
        files, memory, skills), so token estimates include that overhead.
        Tool definitions are estimated separately and cached as a fixed
        overhead (they don't vary per session).
        """
        cache_key = str(workspace_path)
        cached = _context_builder_cache.get(cache_key)
        if cached is not None:
            return cached

        result: dict[str, Any] = {"context_builder": None, "tool_token_overhead": 0}
        try:
            from nanobot.agent.context import ContextBuilder  # type: ignore
            result["context_builder"] = ContextBuilder(workspace_path)
        except Exception as _e:
            print(f"[api_server] WARNING: ContextBuilder unavailable: {_e}", file=sys.stderr)

        # --- Tool token overhead estimation ---
        # Build a lightweight ToolRegistry and register all known tools.
        # Token count for tool definitions is constant per nanobot version,
        # so we compute it once and cache the result.
        try:
            from nanobot.agent.tools.registry import ToolRegistry  # type: ignore
            # nanobot 0.2.1: AskUserTool removed, GlobTool → FindFilesTool,
            # NotebookEditTool removed, tool system uses ToolLoader + entry_points
            from nanobot.agent.tools.filesystem import (  # type: ignore
                ReadFileTool, WriteFileTool, EditFileTool, ListDirTool,
            )
            from nanobot.agent.tools.search import FindFilesTool, GrepTool  # type: ignore
            from nanobot.agent.tools.shell import ExecTool  # type: ignore
            from nanobot.agent.tools.web import WebSearchTool, WebFetchTool  # type: ignore
            from nanobot.agent.tools.message import MessageTool  # type: ignore
            from nanobot.utils.helpers import estimate_prompt_tokens  # type: ignore

            reg = ToolRegistry()
            _ws = workspace_path
            # Tools with simple constructors (v0.2.1: 10 lightweight tools)
            reg.register(ReadFileTool(workspace=_ws))
            reg.register(WriteFileTool(workspace=_ws))
            reg.register(EditFileTool(workspace=_ws))
            reg.register(ListDirTool(workspace=_ws))
            reg.register(FindFilesTool(workspace=_ws))
            reg.register(GrepTool(workspace=_ws))
            reg.register(ExecTool(
                working_dir=str(_ws), timeout=120,
                restrict_to_workspace=False, sandbox=False,
            ))
            reg.register(WebSearchTool())
            reg.register(WebFetchTool())
            reg.register(MessageTool(send_callback=lambda x: None, workspace=_ws))

            tool_defs = reg.get_definitions()
            lightweight_tokens = estimate_prompt_tokens([], tool_defs)

            # SpawnTool, CronTool, LongTaskTool, SelfTool need heavy deps
            # (AgentLoop, SubagentManager, CronService) that we cannot construct here.
            # Add a static estimate for their schema overhead.
            _HEAVY_TOOL_OVERHEAD = 400  # ~100 tokens per skipped tool
            result["tool_token_overhead"] = lightweight_tokens + _HEAVY_TOOL_OVERHEAD
            print(
                f"[api_server] Tool token overhead: {lightweight_tokens} "
                f"(10 tools) + {_HEAVY_TOOL_OVERHEAD} (4 skipped) = "
                f"{result['tool_token_overhead']}",
                file=sys.stderr,
            )
        except Exception as _te:
            print(f"[api_server] INFO: tool overhead estimation failed: {_te}", file=sys.stderr)
            # Fallback: nanobot default ~15 tools ≈ 3000 tokens
            result["tool_token_overhead"] = 3000

        _context_builder_cache[cache_key] = result
        return result

    async def _handle_context_usage(request: web.Request) -> web.Response:
        """Return context window usage estimate for a session.

        Reads the session JSONL from workspace, builds the full prompt context
        (system prompt + skills + memory + runtime context + session history),
        and estimates prompt tokens via tiktoken.  This matches the gateway's
        Consolidator estimate closely enough for a percentage display.

        This endpoint lives in api_server (not nanobot gateway) so that nanobot
        source code does not need modification — easier version management.
        """
        key = request.match_info.get("key", "")
        if not key:
            return web.Response(status=400, text=json.dumps({"error": "missing session key"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        sm = _make_session_manager()
        if not sm:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        data = sm.read_session_file(key)
        if data is None:
            return web.Response(status=404, text=json.dumps({"error": "session not found"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        messages = data.get("messages", [])

        # Read context window config
        context_window_tokens = 65536  # nanobot default
        max_completion_tokens = 8192   # nanobot default
        try:
            from nanobot.config.loader import load_config  # type: ignore
            _cfg = load_config(config_path)
            context_window_tokens = _cfg.agents.defaults.context_window_tokens
            max_completion_tokens = _cfg.agents.defaults.max_tokens
        except Exception:
            pass  # keep defaults

        # Budget mirrors Consolidator formula:
        #   ctx_window - max_completion - _SAFETY_BUFFER(1024)
        safety_buffer = 1024
        input_budget = max(context_window_tokens - max_completion_tokens - safety_buffer, 1)

        # Build full prompt messages via ContextBuilder (includes system prompt,
        # skills, memory, runtime context) and add tool-definition overhead.
        cb_info = _get_or_create_context_builder()
        cb = cb_info.get("context_builder")
        tool_overhead = cb_info.get("tool_token_overhead", 0)
        estimated = 0
        source = "none"

        if cb is not None:
            try:
                from nanobot.utils.helpers import estimate_prompt_tokens  # type: ignore
                channel, chat_id = (key.split(":", 1) + [""])[:2]
                probe_messages = cb.build_messages(
                    history=messages,
                    current_message="[token-probe]",
                    channel=channel or None,
                    chat_id=chat_id or None,
                )
                base_tokens = estimate_prompt_tokens(probe_messages)
                estimated = base_tokens + tool_overhead
                source = "tiktoken+context"
            except Exception as _e:
                print(f"[api_server] WARNING: ContextBuilder token estimation failed: {_e}", file=sys.stderr)
                # Fallback: session-only estimate + static overhead
                try:
                    from nanobot.utils.helpers import estimate_prompt_tokens as _ept  # type: ignore
                    estimated = _ept(messages) + 7800 + tool_overhead  # ~7800 = avg system+runtime overhead
                    source = "tiktoken+overhead"
                except Exception:
                    estimated = 0
                    source = "none"
        else:
            # No ContextBuilder — raw session estimate only
            try:
                from nanobot.utils.helpers import estimate_prompt_tokens  # type: ignore
                estimated = estimate_prompt_tokens(messages)
                source = "tiktoken"
            except Exception:
                estimated = 0
                source = "none"

        percent = min(int((estimated / input_budget) * 100), 999) if input_budget > 0 else 0

        payload = {
            "context_window_tokens": context_window_tokens,
            "estimated_prompt_tokens": estimated,
            "input_budget": input_budget,
            "max_completion_tokens": max_completion_tokens,
            "percent": percent,
            "source": source,
            "last_usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "cached_tokens": 0,
                "total_tokens": 0,
            },
        }
        return web.Response(text=json.dumps(payload),
                            content_type="application/json", headers=_CORS_HEADERS)

    app.router.add_route("OPTIONS", "/api/sessions/{key}/context-usage", _handle_options)
    app.router.add_get("/api/sessions/{key}/context-usage", _handle_context_usage)

    # ---------------------------------------------------------------------------
    # Subagent history endpoint — reads session JSONL and returns all
    # subagent results (completed / failed) for a given session key.
    # This lives in api_server (not nanobot) so we don't modify nanobot source.
    # ---------------------------------------------------------------------------

    async def _handle_subagents(request: web.Request) -> web.Response:
        """Return subagent history for a session.

        Reads the session JSONL, filters messages with
        ``injected_event == "subagent_result"``, and returns structured
        subagent entries including real task_id, label, status, and full
        (un-truncated) result text.
        """
        key = request.match_info.get("key", "")
        if not key:
            return web.Response(status=400, text=json.dumps({"error": "missing session key"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        sm = _make_session_manager()
        if not sm:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        data = sm.read_session_file(key)
        if data is None:
            return web.Response(status=404, text=json.dumps({"error": "session not found"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        messages = data.get("messages", [])
        subagents: list[dict[str, Any]] = []
        # Track labels already added to avoid duplicates when both the
        # injected_event entry AND the drained user-role entry exist.
        seen_labels: set[str] = set()

        def _parse_subagent_entry(content: str, task_id: str, timestamp: str) -> dict[str, Any] | None:
            """Parse a subagent announcement into a structured entry.

            Handles both formats:
            1. role=assistant + injected_event="subagent_result" (intended but
               currently not persisted to JSONL)
            2. role=user with content starting with "[Subagent '...' ...]"
               (the _drain_pending path that converts InboundMessage → user msg)
            """
            if not isinstance(content, str) or not content.strip():
                return None

            # Format: [Subagent 'label' completed successfully/failed]
            header_match = re.match(
                r"^\[Subagent\s+'([^']+)'\s+(completed successfully|failed)\]",
                content,
            )
            if not header_match:
                return None

            label = header_match.group(1)
            status_text = header_match.group(2)
            status = "completed" if status_text.startswith("completed") else "failed"

            # Extract task description
            task_match = re.search(
                r"\nTask:\s*\n([\s\S]*?)(?=\nResult:|\nCompleted steps:|\nFailure:|\nSummarize|$)",
                content,
            )
            task_desc = task_match.group(1).strip() if task_match else None

            # Extract completed steps
            steps_match = re.search(
                r"Completed steps:\s*\n([\s\S]*?)(?=\n\nFailure:|\n\nSummarize|\nResult:|$)",
                content,
            )
            completed_steps = None
            if steps_match:
                completed_steps = [
                    re.sub(r"^-\s*", "", line).strip()
                    for line in steps_match.group(1).splitlines()
                    if line.strip()
                ]

            # Extract failure detail
            failure_match = re.search(
                r"Failure:\s*\n([\s\S]*?)(?=\n\nSummarize|\nResult:|$)",
                content,
            )
            failure_detail = failure_match.group(1).strip() if failure_match else None

            # Extract result body — the text between "Result:" and either
            # "Completed steps:" or "Summarize".  When the subagent failed
            # the Result section may only contain the steps/failure blocks,
            # so result_text can be None.
            result_match = re.search(
                r"\nResult:\s*\n([\s\S]*?)(?=\nCompleted steps:|\nSummarize|$)",
                content,
            )
            result_text = result_match.group(1).strip() if result_match else None
            if result_text == "":
                result_text = None

            entry: dict[str, Any] = {
                "task_id": task_id,
                "label": label,
                "status": status,
                "timestamp": timestamp,
            }
            if task_desc:
                entry["task"] = task_desc
            if completed_steps:
                entry["completed_steps"] = completed_steps
            if failure_detail:
                entry["failure_detail"] = failure_detail
            if result_text:
                entry["result_text"] = result_text

            return entry

        for m in messages:
            if not isinstance(m, dict):
                continue

            content = m.get("content", "")
            role = m.get("role", "")

            # Path 1: role=assistant with injected_event="subagent_result"
            # (intended nanobot persistence — currently not present in JSONL
            # but may work in future versions)
            if m.get("injected_event") == "subagent_result":
                task_id = m.get("subagent_task_id", "")
                timestamp = m.get("timestamp", "")
                entry = _parse_subagent_entry(content, task_id, timestamp)
                if entry and entry["label"] not in seen_labels:
                    seen_labels.add(entry["label"])
                    subagents.append(entry)
                continue

            # Path 2: role=user with content starting with [Subagent '...' ...]
            # This is the actual format present in session JSONL because
            # _drain_pending converts InboundMessage → {"role": "user", ...}
            # and subagent announcements arrive this way.
            if role == "user" and isinstance(content, str) and content.startswith("[Subagent "):
                task_id = m.get("subagent_task_id", "")
                timestamp = m.get("timestamp", "")
                entry = _parse_subagent_entry(content, task_id, timestamp)
                if entry and entry["label"] not in seen_labels:
                    seen_labels.add(entry["label"])
                    subagents.append(entry)
                continue

        return web.Response(
            text=json.dumps({"subagents": subagents}, ensure_ascii=False),
            content_type="application/json",
            charset="utf-8",
            headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/subagents/{key}", _handle_options)
    app.router.add_get("/api/subagents/{key}", _handle_subagents)

    # ---------------------------------------------------------------------------
    # Session media endpoint — reads the nanobot WebUI transcript JSONL
    # and returns assistant media_urls so the UI can restore attachments
    # after a page refresh without modifying nanobot source.
    # ---------------------------------------------------------------------------

    def _read_transcript_media(session_key: str) -> list[dict[str, Any]]:
        """Read the session JSONL for *session_key* and return assistant
        media entries in order.

        nanobot's persisted session JSONL contains the raw local file paths
        in the ``media`` field but does NOT persist ``media_urls`` (signed).
        We walk the JSONL, locate assistant turns that have a ``media`` list,
        and build ``media_urls`` entries from the raw paths so the UI can
        restore file attachments after a page refresh.
        """
        import glob

        if not workspace_path:
            return []

        sessions_dir = workspace_path / "sessions"
        if not sessions_dir.is_dir():
            return []

        # Match the websocket JSONL file for this session key.
        # nanobot names files: websocket_<key>.jsonl where <key> is the
        # full session key. The nanobot gateway stores with underscore instead
        # of colon (websocket_7bd3ce...jsonl instead of websocket:7bd3ce...jsonl).
        candidates = []
        # Try original key and underscore-version
        search_keys = [session_key, session_key.replace(":", "_")]
        for sk in search_keys:
            for pattern in [
                f"{sk}.jsonl",
                f"*{sk}*.jsonl",
            ]:
                candidates.extend(sessions_dir.glob(pattern))

        # De-duplicate
        seen = set()
        candidates = [p for p in candidates if not (p in seen or seen.add(p))]

        if not candidates:
            return []

        # Pick the most recently modified file.
        candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
        path = candidates[0]

        # Simple extension-based media kind inference.
        _IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg"})
        _VIDEO_EXTS = frozenset({".mp4", ".mov", ".webm"})

        def _kind_from_name(name: str) -> str:
            ext = Path(name).suffix.lower()
            if ext in _IMAGE_EXTS:
                return "image"
            if ext in _VIDEO_EXTS:
                return "video"
            return "file"

        results: list[dict[str, Any]] = []
        try:
            with open(path, "r", encoding="utf-8") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(rec, dict):
                        continue
                    if rec.get("role") != "assistant":
                        continue
                    media = rec.get("media")
                    if not isinstance(media, list) or not media:
                        # Emit an empty placeholder so the frontend can keep
                        # media entries aligned by assistant turn index.
                        results.append({"media_urls": []})
                        continue
                    urls: list[dict[str, str]] = []
                    # Prefer media_urls if they were persisted
                    # (likely empty because nanobot doesn't persist them).
                    media_urls = rec.get("media_urls")
                    if isinstance(media_urls, list) and media_urls:
                        for mu in media_urls:
                            if isinstance(mu, dict) and mu.get("url"):
                                name = str(mu.get("name", ""))
                                urls.append({
                                    "url": str(mu["url"]),
                                    "name": name,
                                    "kind": str(mu.get("kind", _kind_from_name(name))),
                                })
                    # Fallback: raw local paths from the ``media`` field.
                    if not urls:
                        for p in media:
                            if isinstance(p, str) and p:
                                name = Path(p).name
                                urls.append({
                                    "url": p,
                                    "name": name,
                                    "kind": _kind_from_name(name),
                                })
                    if urls:
                        results.append({"media_urls": urls})
        except Exception as e:
            print(f"[api_server] _read_transcript_media error: {e}", file=sys.stderr)
            return []

        return results

    async def _handle_session_media(request: web.Request) -> web.Response:
        key = request.match_info.get("key", "")
        if not key:
            return web.Response(
                status=400,
                text=json.dumps({"error": "missing session key"}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        try:
            data = _read_transcript_media(key)
            return web.Response(
                text=json.dumps({"key": key, "media": data}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            print(f"[api_server] session-media error: {e}", file=sys.stderr)
            return web.Response(
                status=500,
                text=json.dumps({"error": str(e)}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    app.router.add_route("OPTIONS", "/api/session-media/{key}", _handle_options)
    app.router.add_get("/api/session-media/{key}", _handle_session_media)

    # ---------------------------------------------------------------------------
    # Session files endpoint — list all uploaded / generated files across sessions
    # ---------------------------------------------------------------------------

    def _scan_session_files() -> list[dict[str, Any]]:
        """Scan sessions directory and uploads dir for all session-related files."""
        import time

        results: list[dict[str, Any]] = []
        if not workspace_path:
            return results

        sessions_dir = workspace_path / "sessions"
        uploads_dir = _get_uploads_dir()

        _IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg"})
        _VIDEO_EXTS = frozenset({".mp4", ".mov", ".webm"})

        def _kind_from_name(name: str) -> str:
            ext = Path(name).suffix.lower()
            if ext in _IMAGE_EXTS:
                return "image"
            if ext in _VIDEO_EXTS:
                return "video"
            return "file"

        # 1. Scan uploads directory for all uploaded files
        if uploads_dir and uploads_dir.is_dir():
            for f in uploads_dir.iterdir():
                if f.is_file():
                    stat = f.stat()
                    name = f.name
                    session_key = ""
                    if "_" in name:
                        prefix = name.split("_", 1)[0]
                        if len(prefix) >= 32:
                            session_key = prefix[:32]
                    results.append({
                        "name": name,
                        "path": str(f),
                        "kind": "upload",
                        "type": _kind_from_name(name),
                        "sessionKey": session_key,
                        "sessionPreview": "",
                        "size": stat.st_size,
                        "time": stat.st_mtime,
                    })

        # 2. Scan session JSONL files for assistant-generated files and user media
        if not sessions_dir.is_dir():
            return results

        for sess_file in sessions_dir.glob("websocket_*.jsonl"):
            if not sess_file.is_file():
                continue
            stem = sess_file.stem
            session_key = stem.replace("websocket_", "", 1)
            session_preview = ""
            try:
                with open(sess_file, "r", encoding="utf-8") as fh:
                    for line in fh:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            rec = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if not isinstance(rec, dict):
                            continue

                        if rec.get("role") == "assistant" and not session_preview:
                            content = rec.get("content", "")
                            session_preview = rec.get("preview", "") or (content[:80] if isinstance(content, str) else "")

                        # User uploads via media_urls
                        if rec.get("role") == "user":
                            media_urls = rec.get("media_urls")
                            if isinstance(media_urls, list) and media_urls:
                                for mu in media_urls:
                                    if isinstance(mu, dict) and mu.get("url"):
                                        name = str(mu.get("name", ""))
                                        url = str(mu["url"])
                                        file_path = ""
                                        if url.startswith("http://") or url.startswith("https://"):
                                            continue
                                        if url.startswith("file://"):
                                            file_path = url[len("file://"):]
                                            if file_path.startswith("/"):
                                                file_path = file_path[1:]
                                        else:
                                            file_path = url
                                        already = any(
                                            r["kind"] == "upload" and r["path"] == file_path
                                            for r in results
                                        )
                                        if not already and file_path and Path(file_path).is_file():
                                            pst = Path(file_path).stat()
                                            results.append({
                                                "name": name or Path(file_path).name,
                                                "path": file_path,
                                                "kind": "upload",
                                                "type": str(mu.get("kind", _kind_from_name(name))),
                                                "sessionKey": session_key,
                                                "sessionPreview": session_preview,
                                                "size": pst.st_size,
                                                "time": pst.st_mtime,
                                            })

                        # Assistant generated files
                        if rec.get("role") == "assistant":
                            media = rec.get("media")
                            media_urls = rec.get("media_urls")
                            urls: list[dict[str, str]] = []
                            if isinstance(media_urls, list) and media_urls:
                                for mu in media_urls:
                                    if isinstance(mu, dict) and mu.get("url"):
                                        name = str(mu.get("name", ""))
                                        urls.append({
                                            "url": str(mu["url"]),
                                            "name": name or Path(str(mu["url"])).name,
                                            "kind": str(mu.get("kind", _kind_from_name(name))),
                                        })
                            if not urls and isinstance(media, list) and media:
                                for p in media:
                                    if isinstance(p, str) and p:
                                        urls.append({
                                            "url": p,
                                            "name": Path(p).name,
                                            "kind": _kind_from_name(Path(p).name),
                                        })
                            for u in urls:
                                url = u["url"]
                                file_path = ""
                                if url.startswith("http://") or url.startswith("https://"):
                                    continue
                                if url.startswith("file://"):
                                    file_path = url[len("file://"):]
                                    if file_path.startswith("/"):
                                        file_path = file_path[1:]
                                else:
                                    file_path = url
                                if not file_path or not Path(file_path).is_file():
                                    continue
                                already = any(r["path"] == file_path for r in results)
                                if already:
                                    for r in results:
                                        if r["path"] == file_path and not r.get("sessionKey"):
                                            r["sessionKey"] = session_key
                                            r["sessionPreview"] = session_preview or r.get("sessionPreview", "")
                                    continue
                                pst = Path(file_path).stat()
                                results.append({
                                    "name": u["name"],
                                    "path": file_path,
                                    "kind": "generated",
                                    "type": u["kind"],
                                    "sessionKey": session_key,
                                    "sessionPreview": session_preview,
                                    "size": pst.st_size,
                                    "time": pst.st_mtime,
                                })
            except Exception as e:
                print(f"[api_server] _scan_session_files error reading {sess_file}: {e}", file=sys.stderr)
                continue

        # 3. Mark sessionExists for each file
        existing_keys: set[str] = set()
        if sessions_dir.is_dir():
            for sf in sessions_dir.glob("websocket_*.jsonl"):
                if sf.is_file():
                    existing_keys.add(sf.stem.replace("websocket_", "", 1))
        for r in results:
            r["sessionExists"] = bool(r.get("sessionKey")) and r["sessionKey"] in existing_keys

        results.sort(key=lambda x: x.get("time", 0), reverse=True)
        return results

    async def _handle_session_files(request: web.Request) -> web.Response:
        """GET /api/session-files — list all uploaded & generated files across sessions."""
        try:
            data = _scan_session_files()
            return web.Response(
                text=json.dumps({"files": data}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )
        except Exception as e:
            print(f"[api_server] session-files error: {e}", file=sys.stderr)
            return web.Response(
                status=500,
                text=json.dumps({"error": str(e)}),
                content_type="application/json",
                headers=_CORS_HEADERS,
            )

    app.router.add_route("OPTIONS", "/api/session-files", _handle_options)
    app.router.add_get("/api/session-files", _handle_session_files)

    # ---------------------------------------------------------------------------
    # Skills Pool endpoints — fetch available skills from the remote release
    # server; fall back to local skills_pools/ when the server is unreachable.
    # ---------------------------------------------------------------------------

    _SKILLS_POOLS_REMOTE_BASE = "https://did.pegatroncorp.com/mec_agent_release/skills_pools/"
    _sp_skill_to_cat: dict[str, str] = {}  # skill_name → category_name (in-memory cache)

    def _get_skills_pools_root() -> Path | None:
        """Return the local skills_pools/ root (offline fallback).

        Searches:
          1. <api_server_dir>/skills_pools/  (dev mode & bundled)
          2. <install_root>/skills_pools/     (fallback)
        """
        if getattr(sys, "frozen", False):
            base = Path(sys.executable).parent
        else:
            base = Path(__file__).parent
        candidate = base / "skills_pools"
        if candidate.is_dir():
            return candidate
        # fallback: install root
        candidate2 = base.parent / "skills_pools"
        if candidate2.is_dir():
            return candidate2
        return None

    def _read_skill_info(skill_dir: Path) -> dict:
        """Read SKILL.md metadata and optional info.md from a skill directory."""
        skill_md = skill_dir / "SKILL.md"
        name = skill_dir.name
        preview = ""
        if skill_md.exists():
            try:
                text = skill_md.read_text(encoding="utf-8", errors="replace")
                in_frontmatter = False
                frontmatter_done = False
                frontmatter_count = 0
                for line in text.splitlines():
                    stripped_raw = line.strip()
                    # Skip YAML frontmatter (--- ... ---)
                    if stripped_raw == "---":
                        frontmatter_count += 1
                        if frontmatter_count == 1:
                            in_frontmatter = True
                            continue
                        elif frontmatter_count == 2:
                            in_frontmatter = False
                            frontmatter_done = True
                            continue
                    if in_frontmatter:
                        continue
                    stripped = stripped_raw.lstrip("#").strip()
                    if stripped:
                        preview = stripped
                        break
            except Exception:
                pass
        info = ""
        info_md = skill_dir / "info.md"
        if info_md.exists():
            try:
                raw = info_md.read_text(encoding="utf-8", errors="replace").strip()
                info = raw.replace("\n", " ")[:100]
            except Exception:
                pass
        return {
            "name": name,
            "has_skill_md": skill_md.exists(),
            "preview": preview,
            "info": info,
        }

    # ── Remote-fetch helpers ─────────────────────────────────────────────────

    _sp_ssl_ctx_cache: list = []  # mutable singleton — populated on first call

    def _sp_get_ssl_ctx():
        """Return a cached SSL context (built once) that trusts the Pegatron CA bundle."""
        if _sp_ssl_ctx_cache:
            return _sp_ssl_ctx_cache[0]
        import ssl
        ctx = ssl.create_default_context()
        candidates = [
            os.environ.get("REQUESTS_CA_BUNDLE", ""),
            os.environ.get("SSL_CERT_FILE", ""),
        ]
        if config_path:
            candidates.append(str(config_path.parent / "pegatron-ca-bundle.pem"))
        if getattr(sys, "frozen", False):
            candidates.append(str(Path(sys.executable).parent.parent / "config" / "pegatron-ca-bundle.pem"))
        else:
            candidates.append(str(Path(__file__).parent.parent / "config" / "pegatron-ca-bundle.pem"))
        for ca in candidates:
            if ca and os.path.exists(ca):
                try:
                    ctx.load_verify_locations(ca)
                except Exception:
                    pass
                break
        _sp_ssl_ctx_cache.append(ctx)
        return ctx

    def _sp_parse_dirs(html: str, base_url: str = "") -> list[str]:
        """Extract direct-child directory names from an IIS-style HTML listing.

        Pass ``base_url`` (the URL of the page being parsed) so that parent /
        ancestor directory links are automatically filtered out — only entries
        that are exactly one level deeper than ``base_url`` are returned.
        """
        from urllib.parse import unquote as _uq, urlparse as _up
        base_path = _up(base_url).path if base_url else ""
        if base_path and not base_path.endswith("/"):
            base_path += "/"
        result = []
        for link in re.findall(r'href=["\']([^"\'>?#]+/)["\']', html, re.IGNORECASE):
            if "://" in link:
                path = _up(link).path
            elif link.startswith("/"):
                path = link
            else:
                continue  # bare relative href — unexpected from IIS
            if base_path:
                if not path.startswith(base_path):
                    continue
                remainder = path[len(base_path):].rstrip("/")
                if "/" in remainder:  # not a direct child
                    continue
                name = _uq(remainder)
            else:
                name = _uq(path.rstrip("/").rsplit("/", 1)[-1])
            if name and name not in (".", "..", "__pycache__"):
                result.append(name)
        return result

    def _sp_parse_files(html: str, base_url: str = "") -> list[str]:
        """Extract direct-child file names from an IIS-style HTML listing."""
        from urllib.parse import unquote as _uq, urlparse as _up
        base_path = _up(base_url).path if base_url else ""
        if base_path and not base_path.endswith("/"):
            base_path += "/"
        result = []
        for link in re.findall(r'href=["\']([^"\'>?#]+)["\']', html, re.IGNORECASE):
            if link.endswith("/"):
                continue  # directory
            if "://" in link:
                path = _up(link).path
            elif link.startswith("/"):
                path = link
            else:
                continue
            if base_path:
                if not path.startswith(base_path):
                    continue
                remainder = path[len(base_path):]
                if "/" in remainder:  # not a direct file
                    continue
                fname = _uq(remainder)
            else:
                fname = _uq(path.rsplit("/", 1)[-1])
            if fname and not fname.endswith(".pyc"):
                result.append(fname)
        return result

    def _sp_url_join(base: str, *parts: str) -> str:
        """Append URL-encoded path segments to base URL."""
        from urllib.parse import quote as _q
        url = base.rstrip("/")
        for p in parts:
            url += "/" + _q(p, safe="")
        return url

    def _sp_extract_preview(md_text: str) -> str:
        """Return first meaningful line from SKILL.md (skips YAML frontmatter)."""
        in_fm = False
        fm_count = 0
        for line in md_text.splitlines():
            s = line.strip()
            if s == "---":
                fm_count += 1
                in_fm = (fm_count == 1)
                if fm_count == 2:
                    in_fm = False
                continue
            if in_fm:
                continue
            clean = s.lstrip("#").strip()
            if clean:
                return clean
        return ""

    async def _sp_fetch_text(session, url: str) -> str:
        """GET url and return decoded text; raises on HTTP error."""
        import aiohttp as _ah
        async with session.get(url, timeout=_ah.ClientTimeout(total=20), ssl=_sp_get_ssl_ctx()) as r:
            r.raise_for_status()
            return await r.text(encoding="utf-8", errors="replace")

    async def _sp_fetch_bytes(session, url: str) -> bytes:
        """GET url and return raw bytes."""
        import aiohttp as _ah
        async with session.get(url, timeout=_ah.ClientTimeout(total=20), ssl=_sp_get_ssl_ctx()) as r:
            r.raise_for_status()
            return await r.read()

    async def _sp_download_dir(session, remote_url: str, dest: Path, sem) -> list[str]:
        """Recursively download a remote directory into dest (sem limits concurrency).

        Returns a list of file paths that **failed** to download so the caller
        can decide whether to retry or report.
        """
        html = await _sp_fetch_text(session, remote_url)
        dirs = _sp_parse_dirs(html, remote_url)
        files = _sp_parse_files(html, remote_url)
        dest.mkdir(parents=True, exist_ok=True)

        failed: list[str] = []  # collect file names that could not be downloaded

        async def _dl_file(fname: str) -> None:
            async with sem:
                try:
                    data = await _sp_fetch_bytes(session, _sp_url_join(remote_url, fname))
                    (dest / fname).write_bytes(data)
                except Exception as e:
                    failed.append(fname)
                    print(f"[api_server] WARNING: file dl {_sp_url_join(remote_url, fname)}: {e}",
                          file=sys.stderr)

        async def _dl_sub(dname: str) -> list[str]:
            return await _sp_download_dir(session, _sp_url_join(remote_url, dname) + "/",
                                          dest / dname, sem)

        sub_results: list[list[str]] = await asyncio.gather(
            *[_dl_file(f) for f in files],
            *[_dl_sub(d) for d in dirs],
            return_exceptions=True,
        )
        # Merge sub-directory failures
        for sr in sub_results:
            if isinstance(sr, list):
                failed.extend(sr)
        return failed

    # ── Endpoint handlers ────────────────────────────────────────────────────

    async def _handle_skills_pools_list(request: web.Request) -> web.Response:
        """GET /api/skills-pools — fetch categories/skills from remote server.
        Only fetches category listing pages (1 + N_cat requests total); individual
        skill pages are NOT fetched at list time.  has_skill_md is assumed True.
        Falls back to local skills_pools/ when the remote is unreachable.
        """
        active_skills: set[str] = set()
        if workspace_path:
            ws_skills = workspace_path / "skills"
            if ws_skills.is_dir():
                active_skills = {e.name for e in ws_skills.iterdir() if e.is_dir()}

        try:
            import aiohttp as _ah
            async with _ah.ClientSession() as session:
                # ── Step 1: root listing ─────────────────────────────────────
                root_html = await _sp_fetch_text(session, _SKILLS_POOLS_REMOTE_BASE)
                cat_names = _sp_parse_dirs(root_html, _SKILLS_POOLS_REMOTE_BASE)

                # ── Step 2: one request per category, concurrent ─────────────
                async def _fetch_cat(cat_name: str) -> dict:
                    cat_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, cat_name) + "/"
                    try:
                        cat_html = await _sp_fetch_text(session, cat_url)
                        skill_names = _sp_parse_dirs(cat_html, cat_url)
                    except Exception as e:
                        print(f"[api_server] WARNING: category {cat_name}: {e}", file=sys.stderr)
                        return {"name": cat_name, "required": cat_name.lower() == "defult",
                                "skills": []}
                    # Register skill→category map (used by apply)
                    for sn in skill_names:
                        _sp_skill_to_cat[sn] = cat_name

                    async def _fetch_skill_info(sn: str) -> str:
                        info_url = _sp_url_join(cat_url, sn, "info.md")
                        try:
                            info_text = await _sp_fetch_text(session, info_url)
                            return info_text.strip().replace("\n", " ")[:30]
                        except Exception:
                            return ""

                    infos = await asyncio.gather(*[_fetch_skill_info(sn) for sn in skill_names])
                    skills = [
                        {"name": sn, "has_skill_md": True, "preview": "",
                         "info": info, "active": sn in active_skills}
                        for sn, info in zip(sorted(skill_names), infos)
                    ]
                    return {"name": cat_name, "required": cat_name.lower() == "defult",
                            "skills": skills}

                categories = list(await asyncio.gather(
                    *[_fetch_cat(cn) for cn in sorted(cat_names)]))

            return web.Response(
                text=json.dumps({"categories": categories, "source": "remote",
                                 "remote_base": _SKILLS_POOLS_REMOTE_BASE}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

        except Exception as e:
            print(f"[api_server] WARNING: remote skills pool failed ({e}), using local fallback",
                  file=sys.stderr)
            pools_root = _get_skills_pools_root()
            if not pools_root:
                return web.Response(
                    status=503,
                    text=json.dumps({"error": f"Remote unavailable: {e}; no local fallback found."}),
                    content_type="application/json", headers=_CORS_HEADERS)
            cats: list[dict] = []
            for cat_dir in sorted(pools_root.iterdir()):
                if not cat_dir.is_dir():
                    continue
                skills_local: list[dict] = []
                for sd in sorted(cat_dir.iterdir()):
                    if not sd.is_dir():
                        continue
                    info = _read_skill_info(sd)
                    info["active"] = sd.name in active_skills
                    _sp_skill_to_cat[sd.name] = cat_dir.name
                    skills_local.append(info)
                cats.append({"name": cat_dir.name,
                             "required": cat_dir.name.lower() == "defult",
                             "skills": skills_local})
            return web.Response(
                text=json.dumps({"categories": cats, "source": "local",
                                 "remote_base": _SKILLS_POOLS_REMOTE_BASE}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

    async def _handle_skills_pools_apply(request: web.Request) -> web.Response:
        """POST /api/skills-pools/apply — download selected skills from remote server.
        Falls back to local copy when the remote is unreachable.
        """
        if not workspace_path:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        try:
            body = await request.json()
        except Exception:
            return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
                                content_type="application/json", headers=_CORS_HEADERS)

        selected: set[str] = set(body.get("skills", []))

        # Ensure skill→category map is populated (re-fetch if cache is empty)
        skill_to_cat: dict[str, str] = dict(_sp_skill_to_cat)
        if not skill_to_cat:
            try:
                import aiohttp as _ah
                async with _ah.ClientSession() as _s:
                    _root = await _sp_fetch_text(_s, _SKILLS_POOLS_REMOTE_BASE)
                    for _cn in _sp_parse_dirs(_root, _SKILLS_POOLS_REMOTE_BASE):
                        _cat_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, _cn) + "/"
                        _cat_html = await _sp_fetch_text(_s, _cat_url)
                        for _sn in _sp_parse_dirs(_cat_html, _cat_url):
                            skill_to_cat[_sn] = _cn
                            _sp_skill_to_cat[_sn] = _cn
            except Exception as e:
                print(f"[api_server] WARNING: could not build skill→cat map: {e}", file=sys.stderr)

        # Always include defult skills
        defult_skills: set[str] = {s for s, c in skill_to_cat.items() if c.lower() == "defult"}
        local_root = _get_skills_pools_root()
        if local_root and (local_root / "defult").is_dir():
            for _d in (local_root / "defult").iterdir():
                if _d.is_dir():
                    defult_skills.add(_d.name)
        selected.update(defult_skills)

        import shutil
        ws_skills_dir = workspace_path / "skills"
        ws_skills_dir.mkdir(parents=True, exist_ok=True)

        # Remove deselected skills that belong to a known pool
        all_pool_skills = set(skill_to_cat.keys())
        locked_skills: set[str] = set()  # skills that could not be removed due to file lock
        for existing in ws_skills_dir.iterdir():
            if existing.is_dir() and existing.name not in selected and existing.name in all_pool_skills:
                try:
                    shutil.rmtree(existing)
                except Exception as e:
                    locked_skills.add(existing.name)
                    print(f"[api_server] WARNING: remove {existing.name} skipped (locked?): {e}",
                          file=sys.stderr)

        copied: list[str] = []
        skipped: list[str] = list(locked_skills)  # pre-populate with lock failures

        try:
            import aiohttp as _ah
            sem = asyncio.Semaphore(20)

            async with _ah.ClientSession() as session:
                async def _install(skill_name: str) -> None:
                    # Skip if removal of this skill previously failed (still locked)
                    if skill_name in locked_skills:
                        return
                    cat_name = skill_to_cat.get(skill_name)
                    if not cat_name:
                        skipped.append(skill_name)
                        return
                    skill_dst = ws_skills_dir / skill_name
                    # If the skill directory already exists, remove it first so that
                    # a fresh, complete download is performed.  This avoids stale
                    # partial downloads persisting when the remote skill is updated.
                    if skill_dst.exists():
                        try:
                            shutil.rmtree(skill_dst)
                        except Exception as _rm_err:
                            # Cannot remove (file lock?) — keep as-is, skip re-download
                            print(f"[api_server] WARNING: cannot remove existing {skill_name}: {_rm_err}",
                                  file=sys.stderr)
                            copied.append(skill_name)
                            return
                    skill_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, cat_name, skill_name) + "/"
                    try:
                        failed_files = await _sp_download_dir(session, skill_url, skill_dst, sem)
                        if failed_files:
                            print(f"[api_server] WARNING: {skill_name} — {len(failed_files)} file(s) failed: {failed_files}",
                                  file=sys.stderr)
                        copied.append(skill_name)
                    except Exception as e:
                        print(f"[api_server] WARNING: remote install {skill_name} ({e}); trying local",
                              file=sys.stderr)
                        # Local fallback
                        if local_root:
                            for _cat in local_root.iterdir():
                                local_src = _cat / skill_name
                                if _cat.is_dir() and local_src.is_dir():
                                    try:
                                        shutil.copytree(local_src, skill_dst)
                                        copied.append(skill_name)
                                        return
                                    except Exception:
                                        pass
                        skipped.append(skill_name)

                await asyncio.gather(*[_install(sn) for sn in sorted(selected)],
                                     return_exceptions=True)

        except Exception as e:
            print(f"[api_server] WARNING: remote apply failed ({e}), pure local fallback",
                  file=sys.stderr)
            if not local_root:
                return web.Response(
                    status=503,
                    text=json.dumps({"error": f"Remote and local unavailable: {e}"}),
                    content_type="application/json", headers=_CORS_HEADERS)
            for cat_dir in local_root.iterdir():
                if not cat_dir.is_dir():
                    continue
                for sn in sorted(selected):
                    src = cat_dir / sn
                    if not src.is_dir():
                        continue
                    dst = ws_skills_dir / sn
                    try:
                        if dst.exists():
                            # Remove existing so a fresh copy replaces potentially
                            # stale / partial content.
                            try:
                                shutil.rmtree(dst)
                            except Exception:
                                copied.append(sn)
                                continue
                        shutil.copytree(src, dst)
                        copied.append(sn)
                    except Exception as ce:
                        skipped.append(sn)
                        print(f"[api_server] WARNING: local copy {sn}: {ce}", file=sys.stderr)

        return web.Response(
            text=json.dumps({"ok": True, "copied": sorted(set(copied)), "skipped": sorted(set(skipped))}),
            content_type="application/json",
            headers=_CORS_HEADERS,
        )

    async def _handle_ws_git_log(request: web.Request) -> web.Response:
        if not workspace_path:
            return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
                                content_type="application/json", headers=_CORS_HEADERS)
        git_dir = workspace_path / ".git"
        if not git_dir.exists():
            return web.Response(text=json.dumps({"initialized": False, "commits": []}),
                                content_type="application/json", headers=_CORS_HEADERS)
        import subprocess as _subprocess
        try:
            max_count = min(int(request.rel_url.query.get("max", "30")), 200)
        except ValueError:
            max_count = 30
        try:
            result = _subprocess.run(
                ["git", "log", f"--max-count={max_count}",
                 "--pretty=format:%H\x1f%s\x1f%an\x1f%aI"],
                capture_output=True, text=True,
                cwd=str(workspace_path), timeout=10,
            )
            commits: list[dict] = []
            for line in result.stdout.splitlines():
                parts = line.split("\x1f", 3)
                if len(parts) == 4:
                    commits.append({"sha": parts[0], "message": parts[1],
                                    "author": parts[2], "date": parts[3]})
            return web.Response(text=json.dumps({"initialized": True, "commits": commits}),
                                content_type="application/json", headers=_CORS_HEADERS)
        except Exception as e:
            return web.Response(status=500, text=json.dumps({"error": str(e)}),
                                content_type="application/json", headers=_CORS_HEADERS)

    # Register workspace filesystem routes
    app.router.add_route("OPTIONS", "/api/ws/file",    _handle_options)
    app.router.add_route("OPTIONS", "/api/ws/skills",  _handle_options)
    app.router.add_route("OPTIONS", "/api/ws/git-log", _handle_options)
    app.router.add_route("OPTIONS", "/api/skills-pools",       _handle_options)
    app.router.add_route("OPTIONS", "/api/skills-pools/apply", _handle_options)
    app.router.add_get("/api/ws/file",    _handle_ws_read_file)
    app.router.add_put("/api/ws/file",    _handle_ws_write_file)
    app.router.add_get("/api/ws/skills",  _handle_ws_skills)
    app.router.add_get("/api/ws/git-log", _handle_ws_git_log)
    app.router.add_get("/api/skills-pools",       _handle_skills_pools_list)
    app.router.add_post("/api/skills-pools/apply", _handle_skills_pools_apply)

    # ---------------------------------------------------------------------------
    # Auto-update endpoints
    # ---------------------------------------------------------------------------

    def _get_install_root() -> Path:
        """Return the installation root (parent of api_server/)."""
        if getattr(sys, "frozen", False):
            return Path(sys.executable).parent.parent
        return Path(__file__).parent.parent

    def _get_local_version() -> dict:
        from update_manager import read_local_version
        return read_local_version(_get_install_root())

    def _get_update_config() -> dict:
        from update_manager import read_update_config
        return read_update_config(_get_install_root())

    # Track background staging process
    _update_state: dict = {"status": "idle", "progress": None, "error": None, "process": None, "staged_count": 0, "stage_warning": None, "target_version": ""}

    async def _handle_update_check(request: web.Request) -> web.Response:
        """GET /api/update/check — compare local vs remote versions (pure Python, no PS1)."""
        from update_manager import check_updates
        # Optional ?manifestUrl= to preview a specific historical version
        manifest_url = request.rel_url.query.get("manifestUrl", "")
        loop = asyncio.get_event_loop()
        try:
            data = await loop.run_in_executor(None, check_updates, _get_install_root(), manifest_url)
            return web.Response(
                text=json.dumps(data, ensure_ascii=False),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        except RuntimeError as e:
            return web.Response(
                status=400, text=json.dumps({"error": str(e)}),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        except Exception as e:
            return web.Response(
                status=500, text=json.dumps({"error": str(e)}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

    async def _handle_update_status(request: web.Request) -> web.Response:
        """GET /api/update/status — return current update state and local version."""
        local_ver = _get_local_version()
        pending_flag = _get_install_root() / "update" / "_pending_update"
        progress_file = _get_install_root() / "update" / "_progress.json"

        # Read progress from updater's progress file if available
        updater_progress = None
        if progress_file.exists():
            try:
                updater_progress = json.loads(progress_file.read_text(encoding="utf-8-sig"))
            except Exception:
                pass

        return web.Response(
            text=json.dumps({
                "status": _update_state["status"],
                "progress": _update_state["progress"],
                "error": _update_state["error"],
                "stageWarning": _update_state.get("stage_warning"),
                "stagedCount": _update_state.get("staged_count", 0),
                "targetVersion": _update_state.get("target_version", ""),
                "currentVersion": local_ver.get("version", _APP_VERSION),
                "components": local_ver.get("components", {}),
                "hasPendingUpdate": pending_flag.exists(),
                "updaterProgress": updater_progress,
            }, ensure_ascii=False),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    async def _handle_update_stage(request: web.Request) -> web.Response:
        """POST /api/update/stage — download and SHA-256-verify ZIPs (pure Python, no PS1).

        Optional JSON body:
          { "manifestUrl": "http://server/v0.3.0/manifest.json", "force": true }
        When manifestUrl + force are provided this performs a rollback download.
        """
        if _update_state["status"] == "staging":
            return web.Response(
                text=json.dumps({"error": "staging already in progress"}),
                status=409, content_type="application/json", headers=_CORS_HEADERS,
            )

        # Parse optional body
        body: dict = {}
        try:
            if request.content_length and request.content_length > 0:
                body = await request.json()
        except Exception:
            pass

        override_manifest_url: str = body.get("manifestUrl", "")
        force_install: bool = bool(body.get("force", False))

        if not override_manifest_url and not _get_update_config().get("manifestUrl", ""):
            return web.Response(
                status=400, text=json.dumps({"error": "update.manifestUrl not configured"}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

        _update_state["status"] = "staging"
        _update_state["error"] = None
        _update_state["progress"] = "downloading"

        from update_manager import stage_updates

        def _run_stage() -> None:
            try:
                result       = stage_updates(
                    _get_install_root(),
                    manifest_url=override_manifest_url,
                    force=force_install,
                )
                staged_count = int(result.get("staged", 0))
                failed_count = int(result.get("failed", 0))
                failed_names = list(result.get("failedComponents", []))
                failed_label = ", ".join(failed_names) if failed_names else f"{failed_count} component(s)"

                if staged_count == 0 and failed_count > 0:
                    _update_state["status"] = "error"
                    _update_state["error"]  = f"Failed to download: {failed_label}"
                else:
                    _update_state["status"]        = "staged"
                    _update_state["progress"]      = "ready"
                    _update_state["staged_count"]  = staged_count
                    _update_state["target_version"] = result.get("targetVersion", "")
                    _update_state["stage_warning"] = (
                        f"Partial update: {failed_label} failed checksum or download"
                        if failed_count > 0 else None
                    )
            except Exception as e:
                _update_state["status"] = "error"
                _update_state["error"]  = str(e)

        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, _run_stage)

        return web.Response(
            text=json.dumps({"ok": True, "status": "staging"}),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    async def _handle_update_apply(request: web.Request) -> web.Response:
        """POST /api/update/restart — signal the launcher to restart and apply staged updates."""
        pending_flag = _get_install_root() / "update" / "_pending_update"
        if not pending_flag.exists():
            return web.Response(
                status=400, text=json.dumps({"error": "no staged update to apply"}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

        # Write restart signal — launcher polls for this file and performs the restart
        signal_file = _get_install_root() / "update" / "_restart_signal"
        signal_file.parent.mkdir(parents=True, exist_ok=True)
        signal_file.write_text("apply_update", encoding="utf-8")
        return web.Response(
            text=json.dumps({"ok": True, "status": "applying"}),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    async def _handle_restart_agent(request: web.Request) -> web.Response:
        """POST /api/restart — signal the launcher to restart the MEC Agent."""
        # Write restart signal — launcher polls for this file and performs the restart.
        # This avoids relying on Windows Task Scheduler which may not have access to
        # the user's interactive session when triggered from the api_server process.
        signal_file = _get_install_root() / "update" / "_restart_signal"
        signal_file.parent.mkdir(parents=True, exist_ok=True)
        signal_file.write_text("restart", encoding="utf-8")
        return web.Response(
            text=json.dumps({"ok": True, "status": "restarting"}),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    async def _handle_update_history(request: web.Request) -> web.Response:
        """GET /api/update/history — return server-side version index (all releases)."""
        from update_manager import fetch_version_index
        loop = asyncio.get_event_loop()
        try:
            data = await loop.run_in_executor(None, fetch_version_index, _get_install_root())
            return web.Response(
                text=json.dumps(data, ensure_ascii=False),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        except RuntimeError as e:
            return web.Response(
                status=400, text=json.dumps({"error": str(e)}),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        except Exception as e:
            return web.Response(
                status=500, text=json.dumps({"error": str(e)}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

    async def _handle_update_rollback(request: web.Request) -> web.Response:
        """POST /api/update/rollback — stage a specific historical version for installation.

        Request body: { "manifestUrl": "http://server/v0.3.0/manifest.json" }

        This is a convenience alias for POST /api/update/stage with force=true and
        a custom manifestUrl. The client can reuse the same staging/progress/restart
        flow as for a normal update.
        """
        if _update_state["status"] == "staging":
            return web.Response(
                text=json.dumps({"error": "staging already in progress"}),
                status=409, content_type="application/json", headers=_CORS_HEADERS,
            )

        try:
            body: dict = await request.json()
        except Exception:
            return web.Response(
                status=400, text=json.dumps({"error": "request body must be JSON with {manifestUrl}"}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

        manifest_url: str = body.get("manifestUrl", "")
        if not manifest_url:
            return web.Response(
                status=400, text=json.dumps({"error": "manifestUrl is required"}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

        _update_state["status"] = "staging"
        _update_state["error"] = None
        _update_state["progress"] = "downloading"
        _update_state["target_version"] = ""

        from update_manager import stage_updates

        def _run_rollback() -> None:
            try:
                result       = stage_updates(
                    _get_install_root(),
                    manifest_url=manifest_url,
                    force=True,
                )
                staged_count = int(result.get("staged", 0))
                failed_count = int(result.get("failed", 0))
                failed_names = list(result.get("failedComponents", []))
                failed_label = ", ".join(failed_names) if failed_names else f"{failed_count} component(s)"

                if staged_count == 0 and failed_count > 0:
                    _update_state["status"] = "error"
                    _update_state["error"]  = f"Failed to download: {failed_label}"
                else:
                    _update_state["status"]         = "staged"
                    _update_state["progress"]       = "ready"
                    _update_state["staged_count"]   = staged_count
                    _update_state["target_version"] = result.get("targetVersion", "")
                    _update_state["stage_warning"]  = (
                        f"Partial rollback: {failed_label} failed checksum or download"
                        if failed_count > 0 else None
                    )
            except Exception as e:
                _update_state["status"] = "error"
                _update_state["error"]  = str(e)

        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, _run_rollback)

        return web.Response(
            text=json.dumps({"ok": True, "status": "staging"}),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    # ── Model management endpoints ─────────────────────────────────────────

    async def _handle_models_status(request: web.Request) -> web.Response:
        """GET /api/models/status — check whether the active server model is installed."""
        from update_manager import check_model_update, read_model_config
        try:
            root = _get_install_root()
            status = await asyncio.get_event_loop().run_in_executor(
                None, check_model_update, root
            )
            local_cfg = read_model_config(root)
            return web.Response(
                text=json.dumps({
                    **status,
                    "downloadDate":      local_cfg.get("downloadDate", ""),
                    "downloadStatus":    _model_dl_state["status"],
                    "bytesDownloaded":   _model_dl_state["bytesDownloaded"],
                    "totalBytes":        _model_dl_state["totalBytes"],
                    "downloadError":     _model_dl_state["error"],
                }),
                content_type="application/json", headers=_CORS_HEADERS,
            )
        except Exception as e:
            return web.Response(
                status=500, text=json.dumps({"error": str(e)}),
                content_type="application/json", headers=_CORS_HEADERS,
            )

    # Global state for model download progress
    _model_dl_state: dict = {
        "status": "idle", "error": None, "filename": "",
        "bytesDownloaded": 0, "totalBytes": 0,
    }

    async def _handle_models_download(request: web.Request) -> web.Response:
        """POST /api/models/download — start downloading the active model in the background."""
        from update_manager import download_model
        if _model_dl_state["status"] == "downloading":
            return web.Response(
                text=json.dumps({"ok": False, "error": "download already in progress"}),
                status=409, content_type="application/json", headers=_CORS_HEADERS,
            )

        _model_dl_state["status"] = "downloading"
        _model_dl_state["error"]  = None
        _model_dl_state["filename"] = ""
        _model_dl_state["bytesDownloaded"] = 0
        _model_dl_state["totalBytes"] = 0

        def _on_progress(phase: str, current: int, total: int, component: str) -> None:
            if phase == "model_download":
                _model_dl_state["bytesDownloaded"] = current
                _model_dl_state["totalBytes"] = total

        def _run_dl() -> None:
            try:
                result = download_model(_get_install_root(), progress_cb=_on_progress)
                _model_dl_state["status"]   = "done"
                _model_dl_state["filename"] = result.get("filename", "")
            except Exception as e:
                _model_dl_state["status"] = "error"
                _model_dl_state["error"]  = str(e)

        asyncio.get_event_loop().run_in_executor(None, _run_dl)
        return web.Response(
            text=json.dumps({"ok": True, "status": "downloading"}),
            content_type="application/json", headers=_CORS_HEADERS,
        )

    app.router.add_route("OPTIONS", "/api/restart",              _handle_options)
    app.router.add_route("OPTIONS", "/api/update/check",         _handle_options)
    app.router.add_route("OPTIONS", "/api/update/status",        _handle_options)
    app.router.add_route("OPTIONS", "/api/update/stage",         _handle_options)
    app.router.add_route("OPTIONS", "/api/update/restart",       _handle_options)
    app.router.add_route("OPTIONS", "/api/update/history",       _handle_options)
    app.router.add_route("OPTIONS", "/api/update/rollback",      _handle_options)
    app.router.add_route("OPTIONS", "/api/models/status",        _handle_options)
    app.router.add_route("OPTIONS", "/api/models/download",      _handle_options)
    app.router.add_post("/api/restart",              _handle_restart_agent)
    app.router.add_get("/api/update/check",          _handle_update_check)
    app.router.add_get("/api/update/status",         _handle_update_status)
    app.router.add_post("/api/update/stage",         _handle_update_stage)
    app.router.add_post("/api/update/restart",       _handle_update_apply)
    app.router.add_get("/api/update/history",        _handle_update_history)
    app.router.add_post("/api/update/rollback",      _handle_update_rollback)
    app.router.add_get("/api/models/status",         _handle_models_status)
    app.router.add_post("/api/models/download",      _handle_models_download)

    # Start listening immediately so the UI can reach us without waiting for nanobot to load.
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, args.host, args.port, reuse_address=True)
    await site.start()
    print(f"[api_server] Listening on http://{args.host}:{args.port}", flush=True)

    # ── Start CronService (with on_job callback for notifications) ────────────
    if cron_svc:
        try:
            await cron_svc.start()
            print("[api_server] CronService started", flush=True)
        except Exception as _e:
            print(f"[api_server] WARNING: CronService start failed: {_e}", file=sys.stderr)

    # Block until process is killed
    try:
        await asyncio.Event().wait()
    finally:
        if cron_svc:
            try:
                cron_svc.stop()
            except Exception:
                pass
        await runner.cleanup()


def _run_apply_staged() -> None:
    """Apply staged update and exit. Invoked when --apply-staged is passed.

    This runs synchronously — api_server is not started, no web listener is bound.
    Called by the launcher after all running services have been stopped.
    """
    import argparse as _ap
    p = _ap.ArgumentParser(add_help=False)
    p.add_argument("--apply-staged", action="store_true")
    p.add_argument("--install-root", default="", dest="install_root")
    args, _ = p.parse_known_args()

    from update_manager import apply_staged, get_install_root
    install_root = Path(args.install_root).resolve() if args.install_root else get_install_root()
    result = apply_staged(install_root)
    print(json.dumps(result))
    sys.exit(0 if result.get("applied") or result.get("components", 0) > 0 else 1)


def main() -> None:
    # Detect --apply-staged early — before importing aiohttp or starting anything
    if "--apply-staged" in sys.argv:
        _run_apply_staged()
        return
    asyncio.run(_main())


if __name__ == "__main__":
    main()
 







Login to like - 0 Likes



Comments...


No Comments Yet...



Add Comment...



shumin

A graduated biotechnology engineer. Now is a software engineer


Latest Posts



Footer with Icons