"""
api_server/server.py - Standalone OpenAI-compatible API server for nanobot.
Uses nanobot.api.server (aiohttp) + nanobot.nanobot.Nanobot.from_config
to expose /v1/chat/completions, /v1/models, and /health.
CLI usage:
python server.py --config <path> --workspace <path> [--port 47391]
[--model nanobot] [--timeout 120] [--parent-pid <PID>]
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
import threading
import uuid
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
import httpx
import mimetypes
_CORS_HEADERS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT, PATCH, DELETE, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
"Access-Control-Max-Age": "86400",
}
# Application version — read dynamically from update/version.json so the
# /api/version endpoint always reflects the actually installed version
# (not a stale compile-time constant). Falls back to the value below only
# when version.json cannot be found (e.g. very early dev runs).
_APP_VERSION_FALLBACK = "0.3.9"
def _read_app_version() -> str:
"""Read the installed version from update/version.json.
Works in both frozen (PyInstaller) and dev modes because it uses the same
install-root logic as update_manager.get_install_root().
"""
try:
if getattr(sys, "frozen", False):
install_root = Path(sys.executable).parent.parent
else:
install_root = Path(__file__).parent.parent
vf = install_root / "update" / "version.json"
if vf.exists():
data = json.loads(vf.read_text(encoding="utf-8-sig"))
v = data.get("version", "")
if v and v != "0.0.0":
return v
except Exception:
pass
return _APP_VERSION_FALLBACK
_APP_VERSION = _read_app_version()
# ---------------------------------------------------------------------------
# Parent-process watchdog
# ---------------------------------------------------------------------------
def _is_pid_alive(pid: int) -> bool:
"""Return True if *pid* refers to a running process.
Uses the Windows kernel API on Windows because ``os.kill(pid, 0)``
maps to ``CTRL_C_EVENT`` (signal value 0) on Windows, which sends an
unwanted control event to the target process group and may raise
``OSError`` when the two processes belong to different console groups,
causing the watchdog to incorrectly conclude the parent is gone.
"""
if os.name == "nt":
import ctypes
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
STILL_ACTIVE = 259
handle = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
if not handle:
return False
try:
exit_code = ctypes.c_ulong(0)
ok = kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))
return bool(ok) and exit_code.value == STILL_ACTIVE
finally:
kernel32.CloseHandle(handle)
else:
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
return True # process exists but we lack permission to signal it
def _start_parent_watchdog(parent_pid: int) -> None:
"""Exit this process when the parent process (launcher) disappears."""
import time
def _watch() -> None:
while True:
time.sleep(5)
if not _is_pid_alive(parent_pid):
os._exit(0)
t = threading.Thread(target=_watch, daemon=True)
t.start()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Nanobot OpenAI-compatible API server")
p.add_argument("--config", default=None, help="Path to nanobot config.json")
p.add_argument("--workspace", default=None, help="Override workspace directory")
p.add_argument("--port", type=int, default=47391, help="HTTP port to listen on (default: 47391)")
p.add_argument("--host", default="127.0.0.1", help="Host to bind (default: 127.0.0.1)")
p.add_argument("--model", default="nanobot", help="Model name reported in responses")
p.add_argument("--timeout", type=float, default=120.0, help="Per-request timeout in seconds")
p.add_argument("--parent-pid", type=int, default=0, dest="parent_pid",
help="If set, exit when this PID disappears")
p.add_argument("--mec-wiki", default=None, dest="mec_wiki",
help="Path to the bundled ein-wiki folder (contains runtime/node.exe and server.js)")
p.add_argument("--mec-wiki-vault", default=None, dest="mec_wiki_vault",
help="Default vault path passed to Ein-Wiki server (WIKI_VAULT_PATH env var)")
p.add_argument("--apply-staged", action="store_true", dest="apply_staged",
help="Apply staged update and exit (invoked by restart helper)")
p.add_argument("--install-root", default="", dest="install_root",
help="Installation root path for --apply-staged mode")
return p.parse_args()
async def _main() -> None:
args = _parse_args()
if args.parent_pid:
_start_parent_watchdog(args.parent_pid)
# Resolve config path
config_path: Path | None = None
if args.config:
config_path = Path(args.config).expanduser().resolve()
if not config_path.exists():
print(f"[api_server] ERROR: config not found: {config_path}", file=sys.stderr)
sys.exit(1)
from aiohttp import web
@web.middleware
async def cors_middleware(request: web.Request, handler):
# Handle preflight (OPTIONS) for any route, including unregistered ones.
if request.method == "OPTIONS":
return web.Response(headers=_CORS_HEADERS)
try:
response = await handler(request)
except web.HTTPException as exc:
# Framework-generated errors (404, 405, etc.) — attach CORS headers.
for k, v in _CORS_HEADERS.items():
exc.headers[k] = v
raise
# Attach CORS headers to every successful response.
for k, v in _CORS_HEADERS.items():
response.headers[k] = v
return response
app = web.Application(middlewares=[cors_middleware])
# ---------------------------------------------------------------------------
# User overrides endpoint — available immediately, no nanobot required.
# Stored in <config_dir>/user_overrides.json.
# ---------------------------------------------------------------------------
def _overrides_path() -> Path | None:
return config_path.parent / "user_overrides.json" if config_path else None
async def _handle_options(request: web.Request) -> web.Response:
return web.Response(headers=_CORS_HEADERS)
async def _handle_get_overrides(request: web.Request) -> web.Response:
path = _overrides_path()
data: dict = {}
if path and path.exists():
try:
data = json.loads(path.read_text(encoding="utf-8-sig"))
except Exception:
pass
api_key = data.get("apiKey", "")
masked = (api_key[:4] + "***" + api_key[-4:]) if len(api_key) > 8 else ("***" if api_key else "")
return web.Response(
text=json.dumps({
"hasApiKey": bool(api_key),
"maskedApiKey": masked,
"whisperModelPath": data.get("whisperModelPath", ""),
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_patch_overrides(request: web.Request) -> web.Response:
path = _overrides_path()
if not path:
return web.Response(status=503, text="config path unknown", headers=_CORS_HEADERS)
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
existing: dict = {}
if path.exists():
try:
existing = json.loads(path.read_text(encoding="utf-8-sig"))
except Exception:
pass
if "apiKey" in body:
existing["apiKey"] = str(body["apiKey"])
if "whisperModelPath" in body:
existing["whisperModelPath"] = str(body["whisperModelPath"]).strip()
# Clear cached model so next request reloads from the new path
_whisper_model_cache.clear()
# Patch the live runtime config.json so the change takes effect without restart.
if config_path and config_path.exists() and "apiKey" in body:
try:
raw = config_path.read_text(encoding="utf-8")
escaped = existing["apiKey"].replace('"', '\\"')
raw = re.sub(
r'(?s)("custom"\s*:\s*\{[^}]*?"apiKey"\s*:\s*)"[^"]*"',
r'\1"' + escaped + '"',
raw,
count=1,
)
config_path.write_text(raw, encoding="utf-8")
except Exception as e:
print(f"[api_server] WARNING: could not patch runtime config: {e}", file=sys.stderr)
path.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
return web.Response(
text=json.dumps({"ok": True}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_health(request: web.Request) -> web.Response:
return web.Response(text=json.dumps({"status": "ok"}), content_type="application/json")
# ---------------------------------------------------------------------------
# Sidebar-state endpoint — chat title overrides & sidebar preferences.
# Stored in <config_dir>/sidebar_state.json.
# ---------------------------------------------------------------------------
_SIDEBAR_STATE_DEFAULT: dict = {
"schema_version": 1,
"pinned_keys": [],
"archived_keys": [],
"title_overrides": {},
"project_name_overrides": {},
"tags_by_key": {},
"collapsed_groups": {},
"view": {
"density": "comfortable",
"show_previews": True,
"show_timestamps": False,
"show_archived": False,
"sort": "updated_desc",
},
}
_MAX_STATE_FILE_BYTES = 256 * 1024
_MAX_LIST_ITEMS = 2000
_MAX_MAP_ITEMS = 2000
_MAX_KEY_LEN = 512
_MAX_TITLE_LEN = 160
_MAX_TAG_LEN = 40
_MAX_TAGS_PER_KEY = 12
def _sidebar_state_path() -> Path | None:
return config_path.parent / "sidebar_state.json" if config_path else None
def _clean_str(val: object, max_len: int) -> str | None:
"""Return stripped string (capped at max_len), or None if empty/invalid."""
if not isinstance(val, str):
return None
s = val.strip()[:max_len]
return s if s else None
def _normalize_sidebar_state(raw: dict) -> dict:
"""Normalize an arbitrary dict into a valid sidebar-state v1 structure."""
out: dict = {
"schema_version": 1,
"pinned_keys": [],
"archived_keys": [],
"title_overrides": {},
"project_name_overrides": {},
"tags_by_key": {},
"collapsed_groups": {},
"view": dict(_SIDEBAR_STATE_DEFAULT["view"]),
}
# Lists: pinned_keys, archived_keys
for list_key in ("pinned_keys", "archived_keys"):
val = raw.get(list_key)
if isinstance(val, list):
seen: set[str] = set()
for item in val:
s = _clean_str(item, _MAX_KEY_LEN)
if s and s not in seen:
seen.add(s)
out[list_key].append(s) # type: ignore[literal-required]
if len(out[list_key]) >= _MAX_LIST_ITEMS: # type: ignore[literal-required]
break
# Maps: title_overrides, project_name_overrides
for map_key in ("title_overrides", "project_name_overrides"):
val = raw.get(map_key)
if isinstance(val, dict):
count = 0
for k, v in val.items():
ck = _clean_str(k, _MAX_KEY_LEN)
cv = _clean_str(v, _MAX_TITLE_LEN)
if ck is None:
continue
if cv is None:
continue # empty value = delete, so skip it
out[map_key][ck] = cv # type: ignore[literal-required]
count += 1
if count >= _MAX_MAP_ITEMS:
break
# tags_by_key
val = raw.get("tags_by_key")
if isinstance(val, dict):
count = 0
for k, v in val.items():
ck = _clean_str(k, _MAX_KEY_LEN)
if ck is None:
continue
if not isinstance(v, list):
continue
tags: list[str] = []
for t in v:
ct = _clean_str(t, _MAX_TAG_LEN)
if ct:
tags.append(ct)
if len(tags) >= _MAX_TAGS_PER_KEY:
break
if tags:
out["tags_by_key"][ck] = tags
count += 1
if count >= _MAX_MAP_ITEMS:
break
# collapsed_groups
val = raw.get("collapsed_groups")
if isinstance(val, dict):
for k, v in val.items():
ck = _clean_str(k, _MAX_KEY_LEN)
if ck is None:
continue
out["collapsed_groups"][ck] = bool(v)
# view
val = raw.get("view")
if isinstance(val, dict):
v = out["view"]
d = val.get("density")
if d in ("comfortable", "compact"):
v["density"] = d
for flag in ("show_previews", "show_timestamps", "show_archived"):
if flag in val:
v[flag] = bool(val[flag])
s = val.get("sort")
if s in ("updated_desc", "created_desc", "title_asc"):
v["sort"] = s
return out
def _read_sidebar_state() -> dict:
"""Read sidebar_state.json; return normalized state (default on error).
If a ``.json.tmp`` staging file exists and is newer than the primary
``.json`` (left behind by a failed ``os.replace()``), we recover
from it so that data is not lost.
"""
path = _sidebar_state_path()
tmp = path.with_suffix(".json.tmp") if path else None
# Decide which file to read: prefer the newer one.
read_path = None
if path and path.exists():
read_path = path
# If a tmp staging file exists and is newer, it holds the latest data.
if tmp and tmp.exists():
try:
if tmp.stat().st_mtime > path.stat().st_mtime:
read_path = tmp
except OSError:
pass
elif tmp and tmp.exists():
# Primary file missing — recover from tmp.
read_path = tmp
if read_path:
try:
raw = json.loads(read_path.read_text(encoding="utf-8-sig"))
if isinstance(raw, dict):
recovered = _normalize_sidebar_state(raw)
# If we read from tmp, promote it to the primary location.
if read_path is tmp:
try:
tmp.replace(path)
except OSError:
pass
return recovered
except Exception:
pass
return dict(_SIDEBAR_STATE_DEFAULT)
def _write_sidebar_state(raw: dict) -> dict:
"""Normalize, set updated_at, atomically write; return normalized state.
Uses a ``.json.tmp`` staging file + ``os.replace()`` for atomicity.
On Windows, anti-virus or search-indexer locks can cause the replace
to fail transiently, so we retry a few times. If atomic replace
still fails we fall back to ``shutil.copy2`` and finally to a
direct ``write_text`` so the primary ``.json`` is always updated.
"""
import time as _time
import shutil as _shutil
state = _normalize_sidebar_state(raw)
state["updated_at"] = __import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat()
path = _sidebar_state_path()
if path:
tmp = path.with_suffix(".json.tmp")
payload = json.dumps(state, ensure_ascii=False, indent=2)
_written = False
# --- Strategy 1: atomic via tmp + os.replace (with retries) ---
_max_retries = 5
_retry_delay_s = 0.05
for _attempt in range(_max_retries + 1):
try:
tmp.write_text(payload, encoding="utf-8")
tmp.replace(path)
_written = True
break
except OSError as e:
if _attempt < _max_retries:
_time.sleep(_retry_delay_s)
_retry_delay_s *= 2
else:
print(
f"[api_server] WARN tmp.replace() failed after "
f"{_max_retries + 1} attempts: {e}; trying fallback",
file=sys.stderr,
)
# --- Strategy 2: shutil.copy2(tmp, path) + unlink tmp ---
if not _written:
try:
_shutil.copy2(str(tmp), str(path))
try:
tmp.unlink()
except OSError:
pass
_written = True
except OSError as e2:
print(
f"[api_server] WARN shutil.copy2 fallback failed: {e2}; "
f"trying direct write",
file=sys.stderr,
)
# --- Strategy 3: direct write (non-atomic but reliable) ---
if not _written:
try:
path.write_text(payload, encoding="utf-8")
_written = True
try:
tmp.unlink()
except OSError:
pass
except OSError as e3:
print(
f"[api_server] ERROR all write strategies failed for "
f"sidebar_state.json: {e3}",
file=sys.stderr,
)
# fsync for durability when we succeeded
if _written:
try:
fd = os.open(str(path), os.O_RDONLY)
os.fsync(fd)
os.close(fd)
except Exception:
pass
return state
async def _handle_get_sidebar_state(request: web.Request) -> web.Response:
state = _read_sidebar_state()
return web.Response(
text=json.dumps(state, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_patch_sidebar_state(request: web.Request) -> web.Response:
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
if not isinstance(body, dict):
return web.Response(status=400, text="state must be an object", headers=_CORS_HEADERS)
# Merge with existing state: only fields present in the patch body are updated.
# For dict fields (title_overrides, project_name_overrides, tags_by_key,
# collapsed_groups) we deep-merge so that partial patches don't erase
# entries that were not included in the patch body. For list and view
# fields we replace wholesale (as before).
existing = _read_sidebar_state()
_dict_keys = {
"title_overrides", "project_name_overrides",
"tags_by_key", "collapsed_groups",
}
for key in (
"pinned_keys", "archived_keys",
"title_overrides", "project_name_overrides",
"tags_by_key", "collapsed_groups", "view",
):
if key in body:
if key in _dict_keys and isinstance(body[key], dict):
existing.setdefault(key, {})
existing[key].update(body[key])
else:
existing[key] = body[key]
state = _write_sidebar_state(existing)
return web.Response(
text=json.dumps(state, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/user-overrides", _handle_options)
app.router.add_get("/api/user-overrides", _handle_get_overrides)
app.router.add_patch("/api/user-overrides", _handle_patch_overrides)
app.router.add_get("/health", _handle_health)
app.router.add_route("OPTIONS", "/api/sidebar-state", _handle_options)
app.router.add_get("/api/sidebar-state", _handle_get_sidebar_state)
app.router.add_patch("/api/sidebar-state", _handle_patch_sidebar_state)
# ---------------------------------------------------------------------------
# Auto-start endpoint — check / toggle Windows auto-start (Startup shortcut
# + Registry Run key). No restart needed; changes take effect immediately.
# ---------------------------------------------------------------------------
import winreg
_AUTOSTART_REG_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
_AUTOSTART_REG_NAME = "MEC_Agent"
def _autostart_shortcut_path() -> Path:
"""Return the path of the Startup-folder shortcut."""
startup = Path(os.environ.get("APPDATA", "")) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
return startup / "MEC Agent.lnk"
def _autostart_vbs_path() -> Path:
"""Return the path of launch_mec_agent.vbs relative to install root."""
return _get_install_root() / "launcher" / "launch_mec_agent.vbs"
def _check_autostart_enabled() -> dict:
"""Check both Startup shortcut and Registry Run key."""
shortcut_exists = _autostart_shortcut_path().exists()
reg_exists = False
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_READ) as key:
val, _ = winreg.QueryValueEx(key, _AUTOSTART_REG_NAME)
reg_exists = True
except (FileNotFoundError, OSError):
pass
enabled = shortcut_exists or reg_exists
return {
"enabled": enabled,
"shortcut_exists": shortcut_exists,
"registry_exists": reg_exists,
}
def _set_autostart(enabled: bool) -> dict:
"""Enable or disable auto-start via both Startup shortcut and Registry Run key."""
shortcut_path = _autostart_shortcut_path()
vbs_path = _autostart_vbs_path()
if enabled:
# --- Create Startup-folder shortcut using PowerShell ---
# Use WScript.Shell COM to create a .lnk file (no extra deps).
# We use -EncodedCommand (Base64 UTF-16LE) to avoid all quoting /
# escaping hell between Python → cmd → PowerShell layers.
wscript = Path(os.environ.get("SystemRoot", r"C:\Windows")) / "system32" / "wscript.exe"
icon_path = _get_install_root() / "mec_agent.ico"
ps_script = (
f'$ws = New-Object -ComObject WScript.Shell\n'
f'$sc = $ws.CreateShortcut(\'{shortcut_path}\')\n'
f'$sc.TargetPath = \'{wscript}\'\n'
f'$sc.Arguments = \'"{vbs_path}"\'\n'
f'$sc.WorkingDirectory = \'{vbs_path.parent}\'\n'
f'$sc.IconLocation = \'{icon_path},0\'\n'
f'$sc.Description = \'MEC Agent\'\n'
f'$sc.Save()'
)
import base64
import subprocess
encoded = base64.b64encode(ps_script.encode("utf-16-le")).decode("ascii")
subprocess.run(
["powershell.exe", "-NoProfile", "-NonInteractive", "-EncodedCommand", encoded],
check=True, timeout=10,
)
# --- Create Registry Run key ---
_system_root = os.environ.get("SystemRoot", r"C:\Windows")
_wscript = os.path.join(_system_root, "system32", "wscript.exe")
cmd_value = f'"{_wscript}" "{vbs_path}"'
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_SET_VALUE) as key:
winreg.SetValueEx(key, _AUTOSTART_REG_NAME, 0, winreg.REG_SZ, cmd_value)
except OSError as exc:
import logging
logging.getLogger("api_server").warning("Failed to set autostart registry key: %s", exc)
else:
# --- Remove Startup-folder shortcut ---
if shortcut_path.exists():
try:
shortcut_path.unlink()
except OSError:
pass
# --- Remove Registry Run key ---
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _AUTOSTART_REG_KEY, 0, winreg.KEY_SET_VALUE) as key:
winreg.DeleteValue(key, _AUTOSTART_REG_NAME)
except (FileNotFoundError, OSError):
pass
return _check_autostart_enabled()
async def _handle_get_autostart(request: web.Request) -> web.Response:
status = _check_autostart_enabled()
return web.Response(
text=json.dumps(status),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_patch_autostart(request: web.Request) -> web.Response:
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
enabled = body.get("enabled")
if enabled is None or not isinstance(enabled, bool):
return web.Response(
status=400,
text=json.dumps({"error": "enabled (boolean) field required"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
status = _set_autostart(enabled)
return web.Response(
text=json.dumps(status),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/autostart", _handle_options)
app.router.add_get("/api/autostart", _handle_get_autostart)
app.router.add_patch("/api/autostart", _handle_patch_autostart)
# ---------------------------------------------------------------------------
# Local image proxy — serves image files from allowed directories so that
# browser <img> elements can display file:/// paths (e.g. Playwright MCP
# screenshots) that the browser security model would otherwise block.
# ---------------------------------------------------------------------------
_LOCAL_IMAGE_ALLOWED_DIRS: list[Path] = []
# populated after workspace_path is resolved (see below)
async def _handle_local_image(request: web.Request) -> web.Response:
"""Serve an image file from an allowed local directory.
The ``path`` query parameter is the absolute local filesystem path.
Only files inside the allowed directories are served; 403 otherwise.
"""
raw_path = request.query.get("path", "")
if not raw_path:
return web.Response(status=400, text="missing path parameter",
headers=_CORS_HEADERS)
try:
# Handle file:/// URLs and plain paths
if raw_path.startswith("file:///"):
raw_path = raw_path[len("file:///"):]
elif raw_path.startswith("file://"):
raw_path = raw_path[len("file://"):]
# URL-decode (e.g. %20 → space)
raw_path = urllib.parse.unquote(raw_path)
candidate = Path(raw_path).resolve()
except (OSError, ValueError):
return web.Response(status=400, text="invalid path",
headers=_CORS_HEADERS)
# Security: only serve files from allowed directories
allowed = False
for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
try:
candidate.relative_to(allowed_dir.resolve())
allowed = True
break
except (ValueError, OSError):
continue
if not allowed:
return web.Response(status=403, text="path not in allowed directories",
headers=_CORS_HEADERS)
# Playwright screenshot paths can drift between workspace/.playwright-mcp
# and repo/.playwright-mcp across different launch modes. When the given
# file is missing, try the same filename in other allowed screenshot dirs.
if (not candidate.is_file()) and candidate.name and candidate.parent.name == ".playwright-mcp":
for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
try:
alt = (allowed_dir / candidate.name).resolve()
except (OSError, ValueError):
continue
if alt.is_file():
candidate = alt
break
if not candidate.is_file():
return web.Response(status=404, text="file not found",
headers=_CORS_HEADERS)
try:
body = candidate.read_bytes()
except OSError:
return web.Response(status=500, text="read error",
headers=_CORS_HEADERS)
mime, _ = mimetypes.guess_type(candidate.name)
if not mime or not mime.startswith("image/"):
return web.Response(status=403, text="not an image file",
headers=_CORS_HEADERS)
return web.Response(
body=body,
content_type=mime,
headers={
**_CORS_HEADERS,
"Cache-Control": "private, max-age=3600",
"X-Content-Type-Options": "nosniff",
},
)
app.router.add_route("OPTIONS", "/api/local-image", _handle_options)
app.router.add_get("/api/local-image", _handle_local_image)
# ---------------------------------------------------------------------------
# Open local file — opens a file or directory with the OS default handler
# (e.g. Explorer for folders, default editor for files). Used by the UI
# when the user clicks a file path link in chat messages.
# ---------------------------------------------------------------------------
async def _handle_open_file(request: web.Request) -> web.Response:
"""Open a local file/directory with the OS default handler.
POST body: { "path": "D:\\folder\\file.txt" }
Security: only paths within allowed directories are permitted.
"""
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="invalid JSON",
headers=_CORS_HEADERS)
raw_path = body.get("path", "") if isinstance(body, dict) else ""
if not raw_path:
return web.Response(status=400, text="missing path",
headers=_CORS_HEADERS)
try:
if raw_path.startswith("file:///"):
raw_path = raw_path[len("file:///"):]
elif raw_path.startswith("file://"):
raw_path = raw_path[len("file://"):]
raw_path = urllib.parse.unquote(raw_path)
candidate = Path(raw_path).resolve()
except (OSError, ValueError):
return web.Response(status=400, text="invalid path",
headers=_CORS_HEADERS)
# Security: only allow paths inside allowed directories
allowed = False
for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
try:
candidate.relative_to(allowed_dir.resolve())
allowed = True
break
except (ValueError, OSError):
continue
# Also allow any existing path that is not hidden / system-critical
# as a fallback for paths outside workspace (e.g. user project dirs)
if not allowed and candidate.exists():
# Block obvious system paths
str_path = str(candidate).lower()
_blocked_prefixes = (
"c:\\windows", "c:\\program files", "c:\\programdata",
"/usr/", "/etc/", "/sbin/", "/boot/",
)
if not any(str_path.startswith(p) for p in _blocked_prefixes):
allowed = True
if not allowed:
return web.Response(status=403, text="path not in allowed directories",
headers=_CORS_HEADERS)
try:
if os.name == "nt":
os.startfile(str(candidate))
else:
import subprocess
subprocess.Popen(["xdg-open" if sys.platform == "linux" else "open", str(candidate)])
except OSError as exc:
return web.Response(status=500, text=f"open error: {exc}",
headers=_CORS_HEADERS)
return web.json_response({"ok": True}, headers=_CORS_HEADERS)
app.router.add_route("OPTIONS", "/api/open-file", _handle_options)
app.router.add_post("/api/open-file", _handle_open_file)
# ---------------------------------------------------------------------------
# Open folder — opens Windows Explorer / macOS Finder / Linux file manager
# focusing on a specific file.
# ---------------------------------------------------------------------------
async def _handle_open_folder(request: web.Request) -> web.Response:
"""POST /api/open-folder — open file manager and select a file.
Body: { "path": "D:\\folder\\file.txt" }
"""
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="invalid JSON", headers=_CORS_HEADERS)
raw_path = body.get("path", "") if isinstance(body, dict) else ""
if not raw_path:
return web.Response(status=400, text="missing path", headers=_CORS_HEADERS)
try:
if raw_path.startswith("file:///"):
raw_path = raw_path[len("file:///"):]
elif raw_path.startswith("file://"):
raw_path = raw_path[len("file://"):]
raw_path = urllib.parse.unquote(raw_path)
candidate = Path(raw_path).resolve()
except (OSError, ValueError):
return web.Response(status=400, text="invalid path", headers=_CORS_HEADERS)
# Re-use same security logic as _handle_open_file
allowed = False
for allowed_dir in _LOCAL_IMAGE_ALLOWED_DIRS:
try:
candidate.relative_to(allowed_dir.resolve())
allowed = True
break
except (ValueError, OSError):
continue
if not allowed and candidate.exists():
str_path = str(candidate).lower()
_blocked_prefixes = (
"c:\\windows", "c:\\program files", "c:\\programdata",
"/usr/", "/etc/", "/sbin/", "/boot/",
)
if not any(str_path.startswith(p) for p in _blocked_prefixes):
allowed = True
if not allowed:
return web.Response(status=403, text="path not in allowed directories",
headers=_CORS_HEADERS)
try:
if os.name == "nt":
import subprocess
subprocess.Popen(["explorer", "/select,", str(candidate)])
elif sys.platform == "darwin":
import subprocess
subprocess.Popen(["open", "-R", str(candidate)])
else:
import subprocess
# Fallback: open parent directory
parent = str(candidate.parent) if candidate.exists() else str(candidate)
subprocess.Popen(["xdg-open", parent])
except OSError as exc:
return web.Response(status=500, text=f"open error: {exc}",
headers=_CORS_HEADERS)
return web.json_response({"ok": True}, headers=_CORS_HEADERS)
app.router.add_route("OPTIONS", "/api/open-folder", _handle_options)
app.router.add_post("/api/open-folder", _handle_open_folder)
# ---------------------------------------------------------------------------
# Proxy settings endpoints — read / test / set HTTP proxy
# ---------------------------------------------------------------------------
_PROXY_HOST_DEFAULT = "172.18.193.161"
_PROXY_PORT_DEFAULT = 80
_NO_PROXY_DEFAULT = "localhost,127.0.0.1,.pegatroncorp.com,172.18.212.48,172.18.212.49"
def _mask_proxy_password(url: str) -> str:
"""Return proxy URL with password masked as ***."""
try:
parsed = urllib.parse.urlparse(url)
if parsed.username and parsed.password:
userinfo = f"{parsed.username}:{parsed.password}"
netloc = parsed.netloc.replace(userinfo, f"{parsed.username}:***", 1)
return urllib.parse.urlunparse(parsed._replace(netloc=netloc))
except Exception:
pass
return url
def _parse_proxy_env() -> dict:
"""Parse HTTP_PROXY / HTTPS_PROXY / NO_PROXY from os.environ."""
http_proxy = (os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy") or "").strip()
https_proxy = (os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") or "").strip()
no_proxy = (os.environ.get("NO_PROXY") or os.environ.get("no_proxy") or "").strip()
if (not http_proxy or not https_proxy or not no_proxy) and os.name == "nt":
try:
from update_manager import _read_windows_user_env_var
if not http_proxy:
http_proxy = (_read_windows_user_env_var("HTTP_PROXY") or _read_windows_user_env_var("http_proxy") or "").strip()
if not https_proxy:
https_proxy = (_read_windows_user_env_var("HTTPS_PROXY") or _read_windows_user_env_var("https_proxy") or "").strip()
if not no_proxy:
no_proxy = (_read_windows_user_env_var("NO_PROXY") or _read_windows_user_env_var("no_proxy") or "").strip()
except Exception:
pass
proxy_url = http_proxy or https_proxy
has_proxy = bool(proxy_url)
result: dict = {
"httpProxy": _mask_proxy_password(http_proxy) if http_proxy else "",
"httpsProxy": _mask_proxy_password(https_proxy) if https_proxy else "",
"hasProxy": has_proxy,
"proxyHost": "",
"proxyPort": None,
"proxyUser": "",
"noProxy": no_proxy,
}
if has_proxy:
try:
parsed = urllib.parse.urlparse(proxy_url)
result["proxyHost"] = parsed.hostname or ""
result["proxyPort"] = parsed.port
result["proxyUser"] = parsed.username or ""
except Exception:
pass
return result
def _set_user_env_var(name: str, value: str) -> None:
"""Write a Windows user environment variable (persistent) and update current process."""
if os.name == "nt":
import winreg
import ctypes
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Environment",
0,
winreg.KEY_SET_VALUE,
)
winreg.SetValueEx(key, name, 0, winreg.REG_SZ, value)
winreg.CloseKey(key)
# Notify other applications to reload environment
ctypes.windll.user32.SendMessageW(0xFFFF, 0x001A, 0, "Environment") # type: ignore[attr-defined]
# Also update current process environment (immediate effect)
os.environ[name] = value
async def _handle_get_proxy(request: web.Request) -> web.Response:
"""GET /api/proxy — return current proxy status."""
return web.Response(
text=json.dumps(_parse_proxy_env()),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_test_proxy(request: web.Request) -> web.Response:
"""POST /api/proxy/test — test proxy connectivity."""
import time
proxy_url = (os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy")
or os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") or "").strip()
if not proxy_url and os.name == "nt":
try:
from update_manager import _read_windows_user_env_var
proxy_url = (_read_windows_user_env_var("HTTP_PROXY") or _read_windows_user_env_var("http_proxy")
or _read_windows_user_env_var("HTTPS_PROXY") or _read_windows_user_env_var("https_proxy") or "").strip()
except Exception:
proxy_url = ""
if not proxy_url:
return web.Response(
text=json.dumps({
"success": False,
"target": "",
"statusCode": None,
"responseTimeMs": None,
"error": "No proxy configured (HTTP_PROXY / HTTPS_PROXY not set)",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
# Build opener using existing update_manager helpers
install_root = Path(__file__).parent.parent if not getattr(sys, "frozen", False) else Path(sys.executable).parent.parent
from update_manager import _make_opener, _get_ca_bundle
# Fallback test targets: try gstatic first, then msftconnecttest if blocked
_TEST_TARGETS = [
"https://connectivitycheck.gstatic.com/generate_204",
"http://www.msftconnecttest.com/connecttest.txt",
]
opener = _make_opener(install_root)
start = time.monotonic()
last_error: str = ""
last_target: str = ""
last_status_code = None
for test_target in _TEST_TARGETS:
try:
req = urllib.request.Request(test_target, method="GET")
req.add_header("User-Agent", "MEC-Agent-ProxyTest/1.0")
resp = opener.open(req, timeout=10)
elapsed_ms = int((time.monotonic() - start) * 1000)
status_code = resp.getcode()
resp.close()
return web.Response(
text=json.dumps({
"success": True,
"target": test_target,
"statusCode": status_code,
"responseTimeMs": elapsed_ms,
"error": None,
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except urllib.error.HTTPError as exc:
last_target = test_target
last_status_code = exc.code
last_error = f"{exc.code} {exc.reason}"
continue
except Exception as exc:
last_target = test_target
last_error = str(exc)
continue
# All targets failed
elapsed_ms = int((time.monotonic() - start) * 1000)
return web.Response(
text=json.dumps({
"success": False,
"target": last_target,
"statusCode": last_status_code,
"responseTimeMs": elapsed_ms,
"error": last_error,
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_patch_proxy(request: web.Request) -> web.Response:
"""PATCH /api/proxy — set proxy credentials and write to env vars."""
try:
body = await request.json()
except Exception:
return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
content_type="application/json", headers=_CORS_HEADERS)
username = str(body.get("username", "")).strip()
password = str(body.get("password", "")).strip()
proxy_host = str(body.get("proxyHost", _PROXY_HOST_DEFAULT)).strip()
proxy_port = body.get("proxyPort", _PROXY_PORT_DEFAULT)
no_proxy = str(body.get("noProxy", _NO_PROXY_DEFAULT)).strip()
if not username or not password:
return web.Response(
status=400,
text=json.dumps({"error": "username and password are required"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
# Build proxy URL: http://user:pass@host:port
encoded_user = urllib.parse.quote(username, safe="")
encoded_pass = urllib.parse.quote(password, safe="")
proxy_url = f"http://{encoded_user}:{encoded_pass}@{proxy_host}:{proxy_port}"
# Write to Windows user environment variables (persistent)
_set_user_env_var("HTTP_PROXY", proxy_url)
_set_user_env_var("HTTPS_PROXY", proxy_url)
if no_proxy:
_set_user_env_var("NO_PROXY", no_proxy)
_set_user_env_var("no_proxy", no_proxy)
masked_url = _mask_proxy_password(proxy_url)
return web.Response(
text=json.dumps({
"ok": True,
"httpProxy": masked_url,
"message": "已寫入使用者環境變數,並更新當前進程環境",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/proxy", _handle_options)
app.router.add_route("OPTIONS", "/api/proxy/test", _handle_options)
app.router.add_get("/api/proxy", _handle_get_proxy)
app.router.add_post("/api/proxy/test", _handle_test_proxy)
app.router.add_patch("/api/proxy", _handle_patch_proxy)
# ---------------------------------------------------------------------------
# Pega AI connection endpoint — verify provider apiBase + apiKey
# ---------------------------------------------------------------------------
def _read_pega_ai_config() -> dict:
"""Read the active custom provider settings from config.json."""
result = {
"provider": "custom",
"model": "",
"apiBase": "",
"hasApiKey": False,
"maskedApiKey": "",
"configured": False,
}
if not config_path or not config_path.exists():
return result
try:
data = json.loads(config_path.read_text(encoding="utf-8-sig"))
defaults = data.get("agents", {}).get("defaults", {}) or {}
provider_name = str(defaults.get("provider", "custom") or "custom").strip()
providers = data.get("providers", {}) or {}
provider_cfg = providers.get(provider_name, {}) if isinstance(providers, dict) else {}
api_base = str(provider_cfg.get("apiBase", "") or "").strip()
api_key = str(provider_cfg.get("apiKey", "") or "").strip()
model = str(defaults.get("model", "") or "").strip()
masked = (api_key[:4] + "***" + api_key[-4:]) if len(api_key) > 8 else ("***" if api_key else "")
result.update({
"provider": provider_name,
"model": model,
"apiBase": api_base,
"hasApiKey": bool(api_key),
"maskedApiKey": masked,
"configured": bool(api_base and api_key and model),
})
except Exception:
pass
return result
async def _handle_get_pega_ai_status(request: web.Request) -> web.Response:
"""GET /api/pegaai/status — return configured provider info without secrets."""
return web.Response(
text=json.dumps(_read_pega_ai_config()),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_test_pega_ai(request: web.Request) -> web.Response:
"""POST /api/pegaai/test — verify apiBase + apiKey by issuing a tiny chat request."""
cfg = _read_pega_ai_config()
api_base = cfg.get("apiBase", "")
api_key = ""
model = cfg.get("model", "")
if not config_path or not config_path.exists():
return web.Response(
text=json.dumps({
"success": False,
"apiBase": "",
"model": "",
"statusCode": None,
"responseTimeMs": None,
"error": "config.json not found",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
try:
data = json.loads(config_path.read_text(encoding="utf-8-sig"))
defaults = data.get("agents", {}).get("defaults", {}) or {}
provider_name = str(defaults.get("provider", "custom") or "custom").strip()
providers = data.get("providers", {}) or {}
provider_cfg = providers.get(provider_name, {}) if isinstance(providers, dict) else {}
api_base = str(provider_cfg.get("apiBase", "") or "").strip()
api_key = str(provider_cfg.get("apiKey", "") or "").strip()
model = str(defaults.get("model", "") or "").strip()
except Exception as exc:
return web.Response(
text=json.dumps({
"success": False,
"apiBase": "",
"model": "",
"statusCode": None,
"responseTimeMs": None,
"error": f"Failed to read config: {exc}",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
if not api_base or not api_key or not model:
return web.Response(
text=json.dumps({
"success": False,
"apiBase": api_base,
"model": model,
"statusCode": None,
"responseTimeMs": None,
"error": "apiBase/apiKey/model is missing",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
chat_url = f"{api_base.rstrip('/')}/chat/completions"
payload = {
"model": model,
"messages": [
{"role": "user", "content": "請只回覆一個字:ok"},
],
"max_tokens": 1,
"temperature": 0,
"stream": False,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json",
}
import time
start = time.monotonic()
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0), trust_env=True) as client:
response = await client.post(chat_url, headers=headers, json=payload)
elapsed_ms = int((time.monotonic() - start) * 1000)
if response.status_code >= 400:
body_text = response.text[:500] if response.text else ""
return web.Response(
text=json.dumps({
"success": False,
"apiBase": api_base,
"model": model,
"statusCode": response.status_code,
"responseTimeMs": elapsed_ms,
"error": body_text or f"HTTP {response.status_code}",
}, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
reply_text = ""
try:
body = response.json()
choices = body.get("choices", []) if isinstance(body, dict) else []
if choices:
message = choices[0].get("message", {}) if isinstance(choices[0], dict) else {}
reply_text = str(message.get("content", "") or "").strip()
except Exception:
pass
return web.Response(
text=json.dumps({
"success": True,
"apiBase": api_base,
"model": model,
"statusCode": response.status_code,
"responseTimeMs": elapsed_ms,
"reply": reply_text,
"error": None,
}, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as exc:
elapsed_ms = int((time.monotonic() - start) * 1000)
return web.Response(
text=json.dumps({
"success": False,
"apiBase": api_base,
"model": model,
"statusCode": None,
"responseTimeMs": elapsed_ms,
"error": str(exc),
}, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/pegaai/status", _handle_options)
app.router.add_route("OPTIONS", "/api/pegaai/test", _handle_options)
app.router.add_get("/api/pegaai/status", _handle_get_pega_ai_status)
app.router.add_post("/api/pegaai/test", _handle_test_pega_ai)
# ---------------------------------------------------------------------------
# Ein-Wiki launch endpoints
# ---------------------------------------------------------------------------
_WIKI_URL = "http://localhost:3001"
def _wiki_paths() -> "tuple[str, str] | None":
"""Return (node_exe, server_js) if the ein-wiki folder is present, else None."""
d = args.mec_wiki or ""
if not d:
return None
node_exe = os.path.join(d, "runtime", "node.exe")
server_js = os.path.join(d, "server.js")
if os.path.exists(node_exe) and os.path.exists(server_js):
return node_exe, server_js
return None
async def _handle_wiki_status(request: web.Request) -> web.Response:
return web.Response(
text=json.dumps({"available": _wiki_paths() is not None}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_wiki_start(request: web.Request) -> web.Response:
paths = _wiki_paths()
if not paths:
return web.Response(
status=404,
text=json.dumps({"error": "Ein-Wiki not found"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
node_exe, server_js = paths
wiki_dir = os.path.dirname(server_js)
import subprocess
wiki_env = os.environ.copy()
vault_path = getattr(args, "mec_wiki_vault", None) or ""
cmd = [node_exe, server_js]
if vault_path:
cmd += ["--vault", vault_path]
wiki_env["WIKI_VAULT_PATH"] = vault_path
subprocess.Popen(
cmd,
cwd=wiki_dir,
env=wiki_env,
creationflags=0x00000008, # DETACHED_PROCESS
close_fds=True,
)
return web.Response(
text=json.dumps({"ok": True, "url": _WIKI_URL}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/wiki/status", _handle_options)
app.router.add_route("OPTIONS", "/api/wiki/start", _handle_options)
app.router.add_get("/api/wiki/status", _handle_wiki_status)
app.router.add_post("/api/wiki/start", _handle_wiki_start)
# ---------------------------------------------------------------------------
# Log viewer endpoint — read last N lines from log files
# ---------------------------------------------------------------------------
def _get_log_dir() -> Path | None:
"""Return the logs/ directory next to the installation root."""
try:
if getattr(sys, "frozen", False):
# Running as PyInstaller exe: <install>/api_server/api_server.exe
install_root = Path(sys.executable).parent.parent
else:
# Running as script: <install>/api_server/server.py
install_root = Path(__file__).parent.parent
return install_root / "logs"
except Exception:
return None
_LOG_FILE_MAP: dict[str, str] = {
"nanobot_out": "mec_agent_gateway.out.log",
"nanobot_err": "mec_agent_gateway.err.log",
"api_out": "api_server.out.log",
"api_err": "api_server.err.log",
"llama_out": "llama_server.out.log",
"llama_err": "llama_server.err.log",
}
async def _handle_get_logs(request: web.Request) -> web.Response:
log_dir = _get_log_dir()
if not log_dir:
return web.Response(
status=503,
text=json.dumps({"error": "log directory unavailable"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
file_key = request.rel_url.query.get("file", "nanobot_out")
if file_key not in _LOG_FILE_MAP:
file_key = "nanobot_out"
filename = _LOG_FILE_MAP[file_key]
try:
lines_param = min(max(int(request.rel_url.query.get("lines", "300")), 1), 5000)
except ValueError:
lines_param = 300
log_path = log_dir / filename
lines: list[str] = []
if log_path.exists():
try:
content = log_path.read_text(encoding="utf-8", errors="replace")
all_lines = content.splitlines()
lines = all_lines[-lines_param:]
except Exception as exc:
lines = [f"[error reading log: {exc}]"]
available = [k for k, v in _LOG_FILE_MAP.items() if (log_dir / v).exists()]
return web.Response(
text=json.dumps({
"file": file_key,
"filename": filename,
"lines": lines,
"available": available,
}, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/logs", _handle_options)
app.router.add_get("/api/logs", _handle_get_logs)
# ---------------------------------------------------------------------------
# Version endpoint
# ---------------------------------------------------------------------------
async def _handle_get_version(_request: web.Request) -> web.Response:
return web.Response(
text=json.dumps({"ui": _APP_VERSION, "backend": _APP_VERSION}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/version", _handle_options)
app.router.add_get("/api/version", _handle_get_version)
# ---------------------------------------------------------------------------
# Changelog endpoint — serve parsed CHANGELOG.md as structured JSON
# ---------------------------------------------------------------------------
_CHANGELOG_CACHE: dict = {"mtime": 0.0, "data": None}
def _parse_changelog(text: str) -> list:
"""Parse a Keep-a-Changelog formatted string into a list of version entries.
Each entry: { version, date, sections: { Added|Fixed|Changed|...: [items] } }
"""
import re as _re
entries: list = []
current_entry = None
current_section = None
for raw_line in text.splitlines():
line = raw_line
# Version heading: ## [X.Y.Z] - YYYY-MM-DD or ## [Unreleased]
m = _re.match(r'^##\s+\[([^\]]+)\](?:\s*-\s*(\d{4}-\d{2}-\d{2}))?', line)
if m:
if current_entry is not None:
entries.append(current_entry)
current_entry = {
"version": m.group(1),
"date": m.group(2) or "",
"sections": {},
}
current_section = None
continue
# Section heading: ### Added, ### Fixed, ### Changed, etc.
m2 = _re.match(r'^###\s+(\w+)', line)
if m2 and current_entry is not None:
current_section = m2.group(1)
current_entry["sections"][current_section] = []
continue
# Item line: - description
if current_entry is not None and current_section is not None:
item = line.strip()
if item.startswith("- "):
current_entry["sections"][current_section].append(item[2:])
elif item == "":
continue
# Skip --- separator lines
if current_entry is not None:
entries.append(current_entry)
return entries
async def _handle_get_changelog(_request: web.Request) -> web.Response:
"""GET /api/changelog — return parsed CHANGELOG.md as structured JSON."""
nonlocal _CHANGELOG_CACHE
try:
install_root = _get_install_root()
# CHANGELOG.md lives alongside version.json in update/ directory.
candidates = [
install_root / "update" / "CHANGELOG.md",
install_root / "CHANGELOG.md",
install_root.parent / "CHANGELOG.md",
]
changelog_path = None
for c in candidates:
if c.exists():
changelog_path = c
break
if changelog_path is None:
return web.Response(
status=404,
text=json.dumps({"error": "CHANGELOG.md not found"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
mtime = changelog_path.stat().st_mtime
if _CHANGELOG_CACHE["data"] is not None and _CHANGELOG_CACHE["mtime"] == mtime:
data = _CHANGELOG_CACHE["data"]
else:
raw = changelog_path.read_text(encoding="utf-8")
data = _parse_changelog(raw)
_CHANGELOG_CACHE = {"mtime": mtime, "data": data}
return web.Response(
text=json.dumps(data, ensure_ascii=False),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
return web.Response(
status=500,
text=json.dumps({"error": str(e)}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/changelog", _handle_options)
app.router.add_get("/api/changelog", _handle_get_changelog)
# ---------------------------------------------------------------------------
# Speech-to-text transcription endpoint — uses local faster-whisper model
# ---------------------------------------------------------------------------
def _get_whisper_model_path() -> Path:
"""Return path to the faster-whisper model directory.
Priority:
1. Value stored in user_overrides.json (user-configurable via Settings UI)
2. Bundled path: <api_server_dir>/whisper/faster_whisper_tiny
"""
overrides_file = _overrides_path()
if overrides_file and overrides_file.exists():
try:
data = json.loads(overrides_file.read_text(encoding="utf-8-sig"))
custom = data.get("whisperModelPath", "").strip()
if custom:
return Path(custom)
except Exception:
pass
# Default bundled path
if getattr(sys, "frozen", False):
base = Path(sys.executable).parent
else:
base = Path(__file__).parent
return base / "whisper" / "faster_whisper_tiny"
_whisper_model_cache: dict = {}
def _load_whisper_model():
"""Load and cache the faster-whisper model (lazy, thread-safe enough for single-worker use)."""
if "model" in _whisper_model_cache:
return _whisper_model_cache["model"]
model_path = _get_whisper_model_path()
if not model_path.exists():
raise FileNotFoundError(f"Whisper model not found at: {model_path}")
try:
from faster_whisper import WhisperModel # type: ignore
model = WhisperModel(str(model_path), device="cpu", compute_type="int8")
_whisper_model_cache["model"] = model
print(f"[api_server] Whisper model loaded from: {model_path}", flush=True)
return model
except ImportError:
raise ImportError("faster-whisper is not installed. Run: pip install faster-whisper")
async def _handle_transcribe(request: web.Request) -> web.Response:
"""POST /api/transcribe — accept audio file (multipart or raw), return transcribed text."""
import tempfile
import asyncio
try:
# Accept multipart/form-data (field name: "audio") or raw body
content_type = request.headers.get("Content-Type", "")
if "multipart/form-data" in content_type:
reader = await request.multipart()
field = await reader.next()
if field is None or field.name != "audio":
return web.Response(
status=400,
text=json.dumps({"error": "expected multipart field 'audio'"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
audio_bytes = await field.read(decode=True)
filename = field.filename or "audio.webm"
else:
audio_bytes = await request.read()
filename = "audio.webm"
if not audio_bytes:
return web.Response(
status=400,
text=json.dumps({"error": "empty audio data"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
# Write to temp file with correct extension for ffmpeg detection
suffix = Path(filename).suffix or ".webm"
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
tmp.write(audio_bytes)
tmp_path = tmp.name
try:
loop = asyncio.get_event_loop()
transcript = await loop.run_in_executor(None, _do_transcribe, tmp_path)
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
return web.Response(
text=json.dumps({"text": transcript}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except FileNotFoundError as e:
return web.Response(
status=503,
text=json.dumps({"error": str(e)}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except ImportError as e:
return web.Response(
status=503,
text=json.dumps({"error": str(e)}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
print(f"[api_server] transcribe error: {e}", file=sys.stderr)
return web.Response(
status=500,
text=json.dumps({"error": f"transcription failed: {e}"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
def _do_transcribe(audio_path: str) -> str:
"""Run faster-whisper transcription synchronously (called in executor)."""
model = _load_whisper_model()
segments, _info = model.transcribe(audio_path, beam_size=5)
return "".join(seg.text for seg in segments).strip()
app.router.add_route("OPTIONS", "/api/transcribe", _handle_options)
app.router.add_post("/api/transcribe", _handle_transcribe)
_UPLOAD_MAX_MB_DEFAULT = 200
_UPLOAD_CHUNK = 1024 * 1024 # 1 MB per read_chunk
_uploads_dir_state = {"value": None}
def _get_upload_max_mb() -> int:
"""Read upload max MB from config.json, fallback to 200."""
if config_path and config_path.exists():
try:
data = json.loads(config_path.read_text(encoding="utf-8-sig"))
return int(data.get("api", {}).get("uploadMaxMb", _UPLOAD_MAX_MB_DEFAULT))
except Exception:
pass
return _UPLOAD_MAX_MB_DEFAULT
def _get_uploads_dir() -> Path | None:
"""Lazy-resolve the uploads directory so workspace_path is ready."""
print(f"[api_server] debug workspace_path={workspace_path!r}")
if _uploads_dir_state["value"] is None and workspace_path:
_uploads_dir_state["value"] = workspace_path / "uploads"
try:
_uploads_dir_state["value"].mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[api_server] cannot create uploads dir: {e}", file=sys.stderr)
_uploads_dir_state["value"] = None
return _uploads_dir_state["value"]
async def _handle_upload(request: web.Request) -> web.Response:
"""POST /api/upload — multipart file upload. Returns {path,name,size}."""
uploads_dir = _get_uploads_dir()
if not uploads_dir:
return web.Response(
status=503,
text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
max_mb = _get_upload_max_mb()
max_bytes = max_mb * 1024 * 1024
def _size_exceeded_resp() -> web.Response:
return web.Response(
status=413,
text=json.dumps({
"error": f"total upload exceeds {max_mb} MB",
"maxSizeMb": max_mb,
"suggestion": (
"對於大型檔案,請直接在對話中輸入檔案路徑"
"(例如 D:\\design\\part.stp),"
"AI Agent 可直接讀取本機檔案。"
),
"suggestionEn": (
"For large files, enter the file path directly in the chat"
" (e.g. D:\\design\\part.stp)."
" The AI Agent can read local files directly."
),
"code": "UPLOAD_SIZE_EXCEEDED",
}),
content_type="application/json",
headers=_CORS_HEADERS,
)
try:
reader = await request.multipart()
results: list[dict] = []
total_size = 0
while True:
field = await reader.next()
if field is None:
break
# Skip non-file fields
if not getattr(field, "filename", None):
# ponytail: drain non-file fields with chunked read
while await field.read_chunk(_UPLOAD_CHUNK):
pass
continue
name = field.filename or "unnamed"
safe_name = Path(name).name
dest_name = f"{uuid.uuid4().hex}_{safe_name}"
dest_path = uploads_dir / dest_name
file_size = 0
try:
with open(dest_path, "wb") as f:
while True:
chunk = await field.read_chunk(_UPLOAD_CHUNK)
if not chunk:
break
total_size += len(chunk)
file_size += len(chunk)
if total_size > max_bytes:
# Exceeded — remove partial file and reject
try:
dest_path.unlink(missing_ok=True)
except Exception:
pass
return _size_exceeded_resp()
f.write(chunk)
except Exception as e:
try:
dest_path.unlink(missing_ok=True)
except Exception:
pass
raise
if file_size == 0:
try:
dest_path.unlink(missing_ok=True)
except Exception:
pass
continue
results.append({
"path": str(dest_path),
"name": safe_name,
"size": file_size,
})
if not results:
return web.Response(
status=400,
text=json.dumps({"error": "no files received"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
return web.Response(
text=json.dumps(results),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
print(f"[api_server] upload error: {e}", file=sys.stderr)
return web.Response(
status=500,
text=json.dumps({"error": f"upload failed: {e}"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_upload_config(_request: web.Request) -> web.Response:
"""GET /api/upload/config — return current upload size limit."""
return web.Response(
text=json.dumps({"maxSizeMb": _get_upload_max_mb()}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/upload", _handle_options)
app.router.add_post("/api/upload", _handle_upload)
app.router.add_get("/api/upload/config", _handle_upload_config)
import getpass as _getpass
async def _handle_whoami(_request: web.Request) -> web.Response:
username = ""
try:
username = _getpass.getuser()
except Exception:
try:
username = os.getlogin()
except Exception:
username = "User"
return web.Response(
text=json.dumps({"username": username}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/whoami", _handle_options)
app.router.add_get("/api/whoami", _handle_whoami)
# ---------------------------------------------------------------------------
# Static docs endpoint — serve HTML files from the docs/ directory
# ---------------------------------------------------------------------------
def _get_docs_dir() -> Path:
if getattr(sys, "frozen", False):
# Running as PyInstaller exe: <install_root>/api_server/api_server.exe
# Serve docs from the directory next to the exe so files can be
# updated independently without rebuilding the executable.
return Path(sys.executable).parent / "docs"
return Path(__file__).parent / "docs"
async def _handle_doc(request: web.Request) -> web.Response:
filename = request.match_info.get("filename", "")
import re as _re
if not _re.match(r'^[a-zA-Z0-9\-]+\.html$', filename):
return web.Response(status=404, text="Not found", headers=_CORS_HEADERS)
doc_path = _get_docs_dir() / filename
if not doc_path.exists():
return web.Response(status=404, text="Not found", headers=_CORS_HEADERS)
try:
content = doc_path.read_text(encoding="utf-8")
return web.Response(text=content, content_type="text/html",
charset="utf-8", headers=_CORS_HEADERS)
except Exception as e:
return web.Response(status=500, text=str(e), headers=_CORS_HEADERS)
app.router.add_route("OPTIONS", "/api/docs/{filename}", _handle_options)
app.router.add_get("/api/docs/{filename}", _handle_doc)
# ---------------------------------------------------------------------------
# Management endpoints — Cron & Git backed by nanobot SDK
# ---------------------------------------------------------------------------
# Resolve workspace path: prefer --workspace arg, fall back to nanobot SDK config.
workspace_path: Path | None = None
if args.workspace:
workspace_path = Path(args.workspace).expanduser().resolve()
else:
try:
from nanobot.config.loader import load_config # type: ignore
_cfg = load_config(config_path) # uses config_path or ~/.nanobot/config.json
workspace_path = _cfg.workspace_path.resolve()
except Exception as _e:
print(f"[api_server] WARNING: could not resolve workspace via nanobot SDK: {_e}", file=sys.stderr)
# Manual fallback: parse config.json
if config_path and config_path.exists():
try:
_cfg_data = json.loads(config_path.read_text(encoding="utf-8-sig"))
_ws_str = (_cfg_data.get("agents", {}).get("defaults", {}) or {}).get("workspace", "")
if _ws_str:
workspace_path = Path(os.path.expanduser(_ws_str)).resolve()
except Exception as _e2:
print(f"[api_server] WARNING: could not resolve workspace from config: {_e2}", file=sys.stderr)
# If still None (no --config/--workspace passed), try environment variable or common paths.
if not workspace_path:
_data_dir = os.environ.get("NANOBOT_DATA_DIR") or os.environ.get("MEC_DATA_DIR")
if _data_dir:
workspace_path = (Path(_data_dir) / ".nanobot" / "workspace").resolve()
else:
# Probe known default locations: launch_mec_agent.ps1 uses D:\MEC_AgentData on machines with D:
for _candidate in [
Path(os.environ.get("LOCALAPPDATA", "~")) / "MEC_Agent" / ".nanobot" / "workspace",
Path("D:\\MEC_AgentData\\.nanobot\\workspace"),
Path.home() / ".mec_agent" / "workspace",
Path.home() / ".nanobot" / "workspace",
]:
try:
_c = Path(os.path.expanduser(str(_candidate))).resolve()
if (_c / "cron").exists() or (_c / "sessions").exists() or _c.exists():
workspace_path = _c
break
except Exception:
continue
if workspace_path:
print(f"[api_server] workspace_path resolved to: {workspace_path}", flush=True)
# Populate local-image proxy allowed directories now that workspace_path is known
_LOCAL_IMAGE_ALLOWED_DIRS.append(workspace_path / ".playwright-mcp")
else:
print("[api_server] WARNING: workspace_path could not be determined; management endpoints will return 503", file=sys.stderr, flush=True)
# Playwright screenshots can be written under different roots depending on
# launch mode (dev repo root vs installed MEC_Agent root). Allow both.
_local_playwright_dirs: list[Path] = []
if getattr(sys, "frozen", False):
# api_server.exe -> <install_root>\api_server\api_server.exe
# install root is the parent of api_server directory.
_install_root = Path(sys.executable).resolve().parent.parent
_local_playwright_dirs.append(_install_root / ".playwright-mcp")
_local_playwright_dirs.append(_install_root.parent / ".playwright-mcp")
else:
# Dev/source run:
# .../agent_packaging/api_server/server.py
# - parent.parent => agent_packaging
# - parent.parent.parent => repo root
_api_server_dir = Path(__file__).resolve().parent
_agent_packaging_root = _api_server_dir.parent
_local_playwright_dirs.append(_agent_packaging_root / ".playwright-mcp")
_local_playwright_dirs.append(_agent_packaging_root.parent / ".playwright-mcp")
for _d in _local_playwright_dirs:
if _d not in _LOCAL_IMAGE_ALLOWED_DIRS:
_LOCAL_IMAGE_ALLOWED_DIRS.append(_d)
# Also allow Playwright MCP screenshots from the dev workspace
_dev_ws = Path("D:/MEC_AgentData_dev/.mec_agent/workspace/.playwright-mcp")
if _dev_ws.exists():
_LOCAL_IMAGE_ALLOWED_DIRS.append(_dev_ws)
# ── Integrity monitor: restore protected workspace files on startup ──────
if workspace_path:
from integrity_monitor import restore_protected_files, start_monitor
restore_protected_files(workspace_path)
start_monitor(workspace_path, interval=120)
# ── Long-lived CronService + NotificationManager ─────────────────────────
# Previously each CRUD handler called _make_cron_service() which created a
# throwaway instance. We now keep a single long-lived instance so that the
# on_job callback fires when scheduled jobs are due.
cron_svc = None # type: ignore[assignment]
notification_mgr = None # type: ignore[assignment]
def _cron_job_to_dict(job) -> dict:
from dataclasses import asdict
return asdict(job)
async def _cron_on_job(job) -> str | None:
"""CronService on_job callback — push SSE/balloon notification for any job with a message."""
# Any cron job that has a message should trigger a Toast / balloon
# notification, regardless of channel. This covers:
# - channel="notification" (created via the Schedule Management UI)
# - channel="websocket" (created via AI chat)
# - channel=null (system jobs with a user-facing message)
# When channel is "notification" we also return "ok" to suppress the
# default agent_turn delivery so the user is not notified twice.
if job.payload.message and notification_mgr:
await notification_mgr.push(
title=job.name,
message=job.payload.message,
job_id=job.id,
)
if job.payload.channel == "notification":
return "ok"
return None
if workspace_path:
try:
from nanobot.cron.service import CronService # type: ignore
cron_svc = CronService(
store_path=workspace_path / "cron" / "jobs.json",
on_job=_cron_on_job,
)
# ── Cross-process safety for shared jobs.json ──────────────────
# Both api_server and nanobot gateway run separate CronService
# instances sharing the same jobs.json. During _on_timer the
# service holds an in-memory snapshot and skips disk reads; when
# _save_store() is called it overwrites the file with whatever is
# in memory, wiping any jobs added by the other process in the
# meantime.
#
# We cannot modify nanobot source, so we apply two mitigations
# here in api_server:
#
# 1. Monkey-patch our own _save_store to merge disk-only jobs
# before writing (prevents *us* from overwriting *gateway's*
# new jobs).
# 2. After add_job, also append to the action file so that
# gateway's _merge_action() can recover jobs that were
# overwritten by gateway's own _save_store() during its
# _on_timer window.
_orig_save_store = cron_svc._save_store # type: ignore[attr-defined]
from nanobot.cron.types import CronJob as _CronJob # type: ignore
def _patched_save_store():
"""Merge disk-only jobs before writing to prevent cross-process data loss."""
if cron_svc._store and cron_svc.store_path.exists():
try:
disk_data = json.loads(
cron_svc.store_path.read_text(encoding="utf-8")
)
disk_ids = {j.get("id") for j in disk_data.get("jobs", [])}
mem_ids = {j.id for j in cron_svc._store.jobs}
new_ids = disk_ids - mem_ids
if new_ids:
for j in disk_data.get("jobs", []):
if j.get("id") in new_ids:
try:
cron_svc._store.jobs.append(
_CronJob.from_dict(j)
)
except Exception:
pass
except Exception:
pass
_orig_save_store()
cron_svc._save_store = _patched_save_store # type: ignore[attr-defined]
# Helper: also write to action file after add_job so gateway's
# _merge_action() can recover overwritten jobs.
def _append_cron_action(action: str, params: dict):
"""Append a line to the cron action file for cross-process sync."""
action_path = cron_svc.store_path.parent / "action.jsonl"
try:
action_path.parent.mkdir(parents=True, exist_ok=True)
with open(action_path, "a", encoding="utf-8") as f:
f.write(json.dumps({"action": action, "params": params}, ensure_ascii=False) + "\n")
except Exception:
pass # Best-effort; action file is a safety net
except Exception as _e:
print(f"[api_server] WARNING: CronService unavailable: {_e}", file=sys.stderr)
try:
from notification_manager import NotificationManager
notification_mgr = NotificationManager(workspace_path=workspace_path)
except Exception as _e:
print(f"[api_server] WARNING: NotificationManager unavailable: {_e}", file=sys.stderr)
# -- CRON CRUD --
async def _handle_list_cron(request: web.Request) -> web.Response:
if not cron_svc:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
jobs = cron_svc.list_jobs(include_disabled=True)
return web.Response(
text=json.dumps([_cron_job_to_dict(j) for j in jobs]),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
return web.Response(status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS)
async def _handle_add_cron_job(request: web.Request) -> web.Response:
"""POST /api/manage/cron — add a new scheduled reminder."""
if not cron_svc:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
body = await request.json()
except Exception:
return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
content_type="application/json", headers=_CORS_HEADERS)
name = (body.get("name") or "").strip()
message = (body.get("message") or "").strip()
if not name or not message:
return web.Response(status=400, text=json.dumps({"error": "name and message required"}),
content_type="application/json", headers=_CORS_HEADERS)
schedule_data = body.get("schedule", {})
from nanobot.cron.types import CronSchedule
schedule = CronSchedule(
kind=schedule_data.get("kind", "every"),
at_ms=schedule_data.get("atMs") or schedule_data.get("at_ms"),
every_ms=schedule_data.get("everyMs") or schedule_data.get("every_ms"),
expr=schedule_data.get("expr"),
tz=schedule_data.get("tz"),
)
# Validate: every schedule needs its required fields populated
if schedule.kind == "every" and (not schedule.every_ms or schedule.every_ms <= 0):
return web.Response(status=400, text=json.dumps({"error": "everyMs required for interval schedule"}),
content_type="application/json", headers=_CORS_HEADERS)
if schedule.kind == "at" and (not schedule.at_ms or schedule.at_ms <= 0):
return web.Response(status=400, text=json.dumps({"error": "atMs required for one-shot schedule"}),
content_type="application/json", headers=_CORS_HEADERS)
if schedule.kind == "cron" and not schedule.expr:
return web.Response(status=400, text=json.dumps({"error": "expr required for cron schedule"}),
content_type="application/json", headers=_CORS_HEADERS)
channel = body.get("channel", "notification")
job = cron_svc.add_job(
name=name,
schedule=schedule,
message=message,
deliver=False,
channel=channel,
)
# Also write to the cron action file so gateway's _merge_action()
# can recover this job if its _save_store() overwrites jobs.json
# during its _on_timer window.
from dataclasses import asdict as _asdict
_append_cron_action("add", _asdict(job))
return web.Response(
status=201,
text=json.dumps(_cron_job_to_dict(job)),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_patch_cron_job(request: web.Request) -> web.Response:
job_id = request.match_info.get("job_id", "")
if not cron_svc:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
body = await request.json()
except Exception:
return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
content_type="application/json", headers=_CORS_HEADERS)
job = None
if "enabled" in body:
job = cron_svc.enable_job(job_id, bool(body["enabled"]))
if job is None:
return web.Response(status=404, text=json.dumps({"error": "not_found"}),
content_type="application/json", headers=_CORS_HEADERS)
# Write update action so gateway's _merge_action() applies the
# new enabled state instead of recovering the old one from add.
from dataclasses import asdict as _asdict
_append_cron_action("update", _asdict(job))
if "name" in body or "message" in body:
kwargs: dict = {}
if "name" in body:
kwargs["name"] = str(body["name"])
if "message" in body:
kwargs["message"] = str(body["message"])
result = cron_svc.update_job(job_id, **kwargs)
if result in ("not_found", "protected"):
return web.Response(status=404 if result == "not_found" else 403,
text=json.dumps({"error": result}),
content_type="application/json", headers=_CORS_HEADERS)
job = result
# Write update action so gateway's _merge_action() applies the
# new name/message instead of recovering old values from add.
from dataclasses import asdict as _asdict
_append_cron_action("update", _asdict(job))
if job is None:
job = cron_svc.get_job(job_id)
return web.Response(
text=json.dumps(_cron_job_to_dict(job) if job else {}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_delete_cron_job(request: web.Request) -> web.Response:
job_id = request.match_info.get("job_id", "")
if not cron_svc:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
result = cron_svc.remove_job(job_id)
if result == "not_found":
return web.Response(status=404, text=json.dumps({"error": "not_found"}),
content_type="application/json", headers=_CORS_HEADERS)
if result == "protected":
return web.Response(status=403, text=json.dumps({"error": "protected system job"}),
content_type="application/json", headers=_CORS_HEADERS)
# Also write a delete action so gateway's _merge_action() removes
# the job instead of recovering it from a previous add action.
_append_cron_action("del", {"job_id": job_id})
return web.Response(
text=json.dumps({"deleted": True}),
content_type="application/json",
headers=_CORS_HEADERS,
)
# -- NOTIFICATIONS (SSE + list + mark-read) --
async def _handle_notifications_stream(request: web.Request) -> web.StreamResponse:
"""SSE endpoint: /api/notifications/stream.
Clients connect with EventSource and receive push events in real time."""
if not notification_mgr:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
resp = web.StreamResponse(
status=200,
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
**_CORS_HEADERS,
},
)
await resp.prepare(request)
# Send an initial comment to ensure the connection is established
await resp.write(b": connected\n\n")
q = notification_mgr.subscribe_sse()
try:
while True:
event = await q.get()
data = json.dumps(event, ensure_ascii=False)
await resp.write(f"data: {data}\n\n".encode("utf-8"))
except (ConnectionResetError, asyncio.CancelledError, ConnectionError):
pass
finally:
notification_mgr.unsubscribe_sse(q)
return resp
async def _handle_list_notifications(request: web.Request) -> web.Response:
"""GET /api/notifications — recent notification list."""
if not notification_mgr:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
limit = int(request.rel_url.query.get("limit", "50"))
events = await notification_mgr.list_recent(limit=limit)
return web.Response(
text=json.dumps(events),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_mark_notification_read(request: web.Request) -> web.Response:
"""PATCH /api/notifications/{id}/read."""
if not notification_mgr:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
event_id = request.match_info.get("id", "")
found = await notification_mgr.mark_read(event_id)
if not found:
return web.Response(status=404, text=json.dumps({"error": "not_found"}),
content_type="application/json", headers=_CORS_HEADERS)
return web.Response(
text=json.dumps({"read": True}),
content_type="application/json",
headers=_CORS_HEADERS,
)
# Register management routes
app.router.add_route("OPTIONS", "/api/manage/cron", _handle_options)
app.router.add_route("OPTIONS", "/api/manage/cron/{job_id}", _handle_options)
app.router.add_route("OPTIONS", "/api/notifications", _handle_options)
app.router.add_route("OPTIONS", "/api/notifications/stream", _handle_options)
app.router.add_route("OPTIONS", "/api/notifications/{id}/read", _handle_options)
app.router.add_get( "/api/manage/cron", _handle_list_cron)
app.router.add_post( "/api/manage/cron", _handle_add_cron_job)
app.router.add_patch( "/api/manage/cron/{job_id}", _handle_patch_cron_job)
app.router.add_delete( "/api/manage/cron/{job_id}", _handle_delete_cron_job)
app.router.add_get( "/api/notifications/stream", _handle_notifications_stream)
app.router.add_get( "/api/notifications", _handle_list_notifications)
app.router.add_patch( "/api/notifications/{id}/read", _handle_mark_notification_read)
# ---------------------------------------------------------------------------
# Workspace filesystem endpoints — generic file access + derived views
# ---------------------------------------------------------------------------
_WS_ALLOWED_WRITE = {
"SOUL.md",
"AGENTS.md", "TOOLS.md", "USER.md", "HEARTBEAT.md",
"memory/MEMORY.md",
}
def _safe_ws_path(rel: str) -> Path | None:
"""Resolve rel relative to workspace; return None if path traversal or workspace unknown."""
if not workspace_path:
return None
rel = rel.replace("\\", "/").lstrip("/")
parts = rel.split("/")
if ".." in parts or "" in parts[:-1]:
return None
try:
p = (workspace_path / rel).resolve()
p.relative_to(workspace_path.resolve())
return p
except (ValueError, OSError):
return None
async def _handle_ws_read_file(request: web.Request) -> web.Response:
rel = request.rel_url.query.get("path", "")
if not rel:
return web.Response(status=400, text=json.dumps({"error": "missing path"}),
content_type="application/json", headers=_CORS_HEADERS)
p = _safe_ws_path(rel)
if p is None:
return web.Response(
status=503 if not workspace_path else 400,
text=json.dumps({"error": "workspace unavailable" if not workspace_path else "invalid path"}),
content_type="application/json", headers=_CORS_HEADERS,
)
if not p.exists():
return web.Response(text=json.dumps({"exists": False, "content": ""}),
content_type="application/json", headers=_CORS_HEADERS)
try:
content = p.read_text(encoding="utf-8", errors="replace")
return web.Response(text=json.dumps({"exists": True, "content": content}),
content_type="application/json", headers=_CORS_HEADERS)
except Exception as e:
return web.Response(status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS)
async def _handle_ws_write_file(request: web.Request) -> web.Response:
rel = request.rel_url.query.get("path", "")
if not rel:
return web.Response(status=400, text=json.dumps({"error": "missing path"}),
content_type="application/json", headers=_CORS_HEADERS)
# Security: only allow writes to explicitly whitelisted relative paths
norm = rel.replace("\\", "/").lstrip("/")
if norm not in _WS_ALLOWED_WRITE:
return web.Response(status=403, text=json.dumps({"error": "write not permitted for this path"}),
content_type="application/json", headers=_CORS_HEADERS)
p = _safe_ws_path(rel)
if p is None:
return web.Response(status=503 if not workspace_path else 400,
text=json.dumps({"error": "workspace unavailable" if not workspace_path else "invalid path"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
body = await request.json()
except Exception:
return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
content_type="application/json", headers=_CORS_HEADERS)
content = body.get("content", "")
if not isinstance(content, str):
return web.Response(status=400, text=json.dumps({"error": "content must be a string"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content, encoding="utf-8")
return web.Response(text=json.dumps({"ok": True}),
content_type="application/json", headers=_CORS_HEADERS)
except Exception as e:
return web.Response(status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS)
async def _handle_ws_skills(request: web.Request) -> web.Response:
if not workspace_path:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
skills_dir = workspace_path / "skills"
results: list[dict] = []
if skills_dir.is_dir():
for entry in sorted(skills_dir.iterdir()):
if not entry.is_dir():
continue
skill_md = entry / "SKILL.md"
preview = ""
if skill_md.exists():
try:
text = skill_md.read_text(encoding="utf-8", errors="replace")
for line in text.splitlines():
stripped = line.strip().lstrip("#").strip()
if stripped:
preview = stripped
break
except Exception:
pass
results.append({
"name": entry.name,
"has_skill_md": skill_md.exists(),
"preview": preview,
})
return web.Response(text=json.dumps(results),
content_type="application/json", headers=_CORS_HEADERS)
# ---------------------------------------------------------------------------
# Context-usage endpoint — estimate prompt tokens for a session
# ---------------------------------------------------------------------------
# Cache for the ContextBuilder and tool-definitions token overhead.
# Built once on first request; rebuilt if the workspace or config changes.
_context_builder_cache: dict[str, Any] = {}
def _make_session_manager():
"""Create a SessionManager for the resolved workspace (lazy import)."""
if not workspace_path:
return None
try:
from nanobot.session import SessionManager # type: ignore
return SessionManager(workspace_path)
except Exception as _e:
print(f"[api_server] WARNING: SessionManager unavailable: {_e}", file=sys.stderr)
return None
def _get_or_create_context_builder():
"""Return a (cached) ContextBuilder + tool-overhead for the workspace.
The ContextBuilder builds the full system prompt (identity, bootstrap
files, memory, skills), so token estimates include that overhead.
Tool definitions are estimated separately and cached as a fixed
overhead (they don't vary per session).
"""
cache_key = str(workspace_path)
cached = _context_builder_cache.get(cache_key)
if cached is not None:
return cached
result: dict[str, Any] = {"context_builder": None, "tool_token_overhead": 0}
try:
from nanobot.agent.context import ContextBuilder # type: ignore
result["context_builder"] = ContextBuilder(workspace_path)
except Exception as _e:
print(f"[api_server] WARNING: ContextBuilder unavailable: {_e}", file=sys.stderr)
# --- Tool token overhead estimation ---
# Build a lightweight ToolRegistry and register all known tools.
# Token count for tool definitions is constant per nanobot version,
# so we compute it once and cache the result.
try:
from nanobot.agent.tools.registry import ToolRegistry # type: ignore
# nanobot 0.2.1: AskUserTool removed, GlobTool → FindFilesTool,
# NotebookEditTool removed, tool system uses ToolLoader + entry_points
from nanobot.agent.tools.filesystem import ( # type: ignore
ReadFileTool, WriteFileTool, EditFileTool, ListDirTool,
)
from nanobot.agent.tools.search import FindFilesTool, GrepTool # type: ignore
from nanobot.agent.tools.shell import ExecTool # type: ignore
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool # type: ignore
from nanobot.agent.tools.message import MessageTool # type: ignore
from nanobot.utils.helpers import estimate_prompt_tokens # type: ignore
reg = ToolRegistry()
_ws = workspace_path
# Tools with simple constructors (v0.2.1: 10 lightweight tools)
reg.register(ReadFileTool(workspace=_ws))
reg.register(WriteFileTool(workspace=_ws))
reg.register(EditFileTool(workspace=_ws))
reg.register(ListDirTool(workspace=_ws))
reg.register(FindFilesTool(workspace=_ws))
reg.register(GrepTool(workspace=_ws))
reg.register(ExecTool(
working_dir=str(_ws), timeout=120,
restrict_to_workspace=False, sandbox=False,
))
reg.register(WebSearchTool())
reg.register(WebFetchTool())
reg.register(MessageTool(send_callback=lambda x: None, workspace=_ws))
tool_defs = reg.get_definitions()
lightweight_tokens = estimate_prompt_tokens([], tool_defs)
# SpawnTool, CronTool, LongTaskTool, SelfTool need heavy deps
# (AgentLoop, SubagentManager, CronService) that we cannot construct here.
# Add a static estimate for their schema overhead.
_HEAVY_TOOL_OVERHEAD = 400 # ~100 tokens per skipped tool
result["tool_token_overhead"] = lightweight_tokens + _HEAVY_TOOL_OVERHEAD
print(
f"[api_server] Tool token overhead: {lightweight_tokens} "
f"(10 tools) + {_HEAVY_TOOL_OVERHEAD} (4 skipped) = "
f"{result['tool_token_overhead']}",
file=sys.stderr,
)
except Exception as _te:
print(f"[api_server] INFO: tool overhead estimation failed: {_te}", file=sys.stderr)
# Fallback: nanobot default ~15 tools ≈ 3000 tokens
result["tool_token_overhead"] = 3000
_context_builder_cache[cache_key] = result
return result
async def _handle_context_usage(request: web.Request) -> web.Response:
"""Return context window usage estimate for a session.
Reads the session JSONL from workspace, builds the full prompt context
(system prompt + skills + memory + runtime context + session history),
and estimates prompt tokens via tiktoken. This matches the gateway's
Consolidator estimate closely enough for a percentage display.
This endpoint lives in api_server (not nanobot gateway) so that nanobot
source code does not need modification — easier version management.
"""
key = request.match_info.get("key", "")
if not key:
return web.Response(status=400, text=json.dumps({"error": "missing session key"}),
content_type="application/json", headers=_CORS_HEADERS)
sm = _make_session_manager()
if not sm:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
data = sm.read_session_file(key)
if data is None:
return web.Response(status=404, text=json.dumps({"error": "session not found"}),
content_type="application/json", headers=_CORS_HEADERS)
messages = data.get("messages", [])
# Read context window config
context_window_tokens = 65536 # nanobot default
max_completion_tokens = 8192 # nanobot default
try:
from nanobot.config.loader import load_config # type: ignore
_cfg = load_config(config_path)
context_window_tokens = _cfg.agents.defaults.context_window_tokens
max_completion_tokens = _cfg.agents.defaults.max_tokens
except Exception:
pass # keep defaults
# Budget mirrors Consolidator formula:
# ctx_window - max_completion - _SAFETY_BUFFER(1024)
safety_buffer = 1024
input_budget = max(context_window_tokens - max_completion_tokens - safety_buffer, 1)
# Build full prompt messages via ContextBuilder (includes system prompt,
# skills, memory, runtime context) and add tool-definition overhead.
cb_info = _get_or_create_context_builder()
cb = cb_info.get("context_builder")
tool_overhead = cb_info.get("tool_token_overhead", 0)
estimated = 0
source = "none"
if cb is not None:
try:
from nanobot.utils.helpers import estimate_prompt_tokens # type: ignore
channel, chat_id = (key.split(":", 1) + [""])[:2]
probe_messages = cb.build_messages(
history=messages,
current_message="[token-probe]",
channel=channel or None,
chat_id=chat_id or None,
)
base_tokens = estimate_prompt_tokens(probe_messages)
estimated = base_tokens + tool_overhead
source = "tiktoken+context"
except Exception as _e:
print(f"[api_server] WARNING: ContextBuilder token estimation failed: {_e}", file=sys.stderr)
# Fallback: session-only estimate + static overhead
try:
from nanobot.utils.helpers import estimate_prompt_tokens as _ept # type: ignore
estimated = _ept(messages) + 7800 + tool_overhead # ~7800 = avg system+runtime overhead
source = "tiktoken+overhead"
except Exception:
estimated = 0
source = "none"
else:
# No ContextBuilder — raw session estimate only
try:
from nanobot.utils.helpers import estimate_prompt_tokens # type: ignore
estimated = estimate_prompt_tokens(messages)
source = "tiktoken"
except Exception:
estimated = 0
source = "none"
percent = min(int((estimated / input_budget) * 100), 999) if input_budget > 0 else 0
payload = {
"context_window_tokens": context_window_tokens,
"estimated_prompt_tokens": estimated,
"input_budget": input_budget,
"max_completion_tokens": max_completion_tokens,
"percent": percent,
"source": source,
"last_usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"cached_tokens": 0,
"total_tokens": 0,
},
}
return web.Response(text=json.dumps(payload),
content_type="application/json", headers=_CORS_HEADERS)
app.router.add_route("OPTIONS", "/api/sessions/{key}/context-usage", _handle_options)
app.router.add_get("/api/sessions/{key}/context-usage", _handle_context_usage)
# ---------------------------------------------------------------------------
# Subagent history endpoint — reads session JSONL and returns all
# subagent results (completed / failed) for a given session key.
# This lives in api_server (not nanobot) so we don't modify nanobot source.
# ---------------------------------------------------------------------------
async def _handle_subagents(request: web.Request) -> web.Response:
"""Return subagent history for a session.
Reads the session JSONL, filters messages with
``injected_event == "subagent_result"``, and returns structured
subagent entries including real task_id, label, status, and full
(un-truncated) result text.
"""
key = request.match_info.get("key", "")
if not key:
return web.Response(status=400, text=json.dumps({"error": "missing session key"}),
content_type="application/json", headers=_CORS_HEADERS)
sm = _make_session_manager()
if not sm:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
data = sm.read_session_file(key)
if data is None:
return web.Response(status=404, text=json.dumps({"error": "session not found"}),
content_type="application/json", headers=_CORS_HEADERS)
messages = data.get("messages", [])
subagents: list[dict[str, Any]] = []
# Track labels already added to avoid duplicates when both the
# injected_event entry AND the drained user-role entry exist.
seen_labels: set[str] = set()
def _parse_subagent_entry(content: str, task_id: str, timestamp: str) -> dict[str, Any] | None:
"""Parse a subagent announcement into a structured entry.
Handles both formats:
1. role=assistant + injected_event="subagent_result" (intended but
currently not persisted to JSONL)
2. role=user with content starting with "[Subagent '...' ...]"
(the _drain_pending path that converts InboundMessage → user msg)
"""
if not isinstance(content, str) or not content.strip():
return None
# Format: [Subagent 'label' completed successfully/failed]
header_match = re.match(
r"^\[Subagent\s+'([^']+)'\s+(completed successfully|failed)\]",
content,
)
if not header_match:
return None
label = header_match.group(1)
status_text = header_match.group(2)
status = "completed" if status_text.startswith("completed") else "failed"
# Extract task description
task_match = re.search(
r"\nTask:\s*\n([\s\S]*?)(?=\nResult:|\nCompleted steps:|\nFailure:|\nSummarize|$)",
content,
)
task_desc = task_match.group(1).strip() if task_match else None
# Extract completed steps
steps_match = re.search(
r"Completed steps:\s*\n([\s\S]*?)(?=\n\nFailure:|\n\nSummarize|\nResult:|$)",
content,
)
completed_steps = None
if steps_match:
completed_steps = [
re.sub(r"^-\s*", "", line).strip()
for line in steps_match.group(1).splitlines()
if line.strip()
]
# Extract failure detail
failure_match = re.search(
r"Failure:\s*\n([\s\S]*?)(?=\n\nSummarize|\nResult:|$)",
content,
)
failure_detail = failure_match.group(1).strip() if failure_match else None
# Extract result body — the text between "Result:" and either
# "Completed steps:" or "Summarize". When the subagent failed
# the Result section may only contain the steps/failure blocks,
# so result_text can be None.
result_match = re.search(
r"\nResult:\s*\n([\s\S]*?)(?=\nCompleted steps:|\nSummarize|$)",
content,
)
result_text = result_match.group(1).strip() if result_match else None
if result_text == "":
result_text = None
entry: dict[str, Any] = {
"task_id": task_id,
"label": label,
"status": status,
"timestamp": timestamp,
}
if task_desc:
entry["task"] = task_desc
if completed_steps:
entry["completed_steps"] = completed_steps
if failure_detail:
entry["failure_detail"] = failure_detail
if result_text:
entry["result_text"] = result_text
return entry
for m in messages:
if not isinstance(m, dict):
continue
content = m.get("content", "")
role = m.get("role", "")
# Path 1: role=assistant with injected_event="subagent_result"
# (intended nanobot persistence — currently not present in JSONL
# but may work in future versions)
if m.get("injected_event") == "subagent_result":
task_id = m.get("subagent_task_id", "")
timestamp = m.get("timestamp", "")
entry = _parse_subagent_entry(content, task_id, timestamp)
if entry and entry["label"] not in seen_labels:
seen_labels.add(entry["label"])
subagents.append(entry)
continue
# Path 2: role=user with content starting with [Subagent '...' ...]
# This is the actual format present in session JSONL because
# _drain_pending converts InboundMessage → {"role": "user", ...}
# and subagent announcements arrive this way.
if role == "user" and isinstance(content, str) and content.startswith("[Subagent "):
task_id = m.get("subagent_task_id", "")
timestamp = m.get("timestamp", "")
entry = _parse_subagent_entry(content, task_id, timestamp)
if entry and entry["label"] not in seen_labels:
seen_labels.add(entry["label"])
subagents.append(entry)
continue
return web.Response(
text=json.dumps({"subagents": subagents}, ensure_ascii=False),
content_type="application/json",
charset="utf-8",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/subagents/{key}", _handle_options)
app.router.add_get("/api/subagents/{key}", _handle_subagents)
# ---------------------------------------------------------------------------
# Session media endpoint — reads the nanobot WebUI transcript JSONL
# and returns assistant media_urls so the UI can restore attachments
# after a page refresh without modifying nanobot source.
# ---------------------------------------------------------------------------
def _read_transcript_media(session_key: str) -> list[dict[str, Any]]:
"""Read the session JSONL for *session_key* and return assistant
media entries in order.
nanobot's persisted session JSONL contains the raw local file paths
in the ``media`` field but does NOT persist ``media_urls`` (signed).
We walk the JSONL, locate assistant turns that have a ``media`` list,
and build ``media_urls`` entries from the raw paths so the UI can
restore file attachments after a page refresh.
"""
import glob
if not workspace_path:
return []
sessions_dir = workspace_path / "sessions"
if not sessions_dir.is_dir():
return []
# Match the websocket JSONL file for this session key.
# nanobot names files: websocket_<key>.jsonl where <key> is the
# full session key. The nanobot gateway stores with underscore instead
# of colon (websocket_7bd3ce...jsonl instead of websocket:7bd3ce...jsonl).
candidates = []
# Try original key and underscore-version
search_keys = [session_key, session_key.replace(":", "_")]
for sk in search_keys:
for pattern in [
f"{sk}.jsonl",
f"*{sk}*.jsonl",
]:
candidates.extend(sessions_dir.glob(pattern))
# De-duplicate
seen = set()
candidates = [p for p in candidates if not (p in seen or seen.add(p))]
if not candidates:
return []
# Pick the most recently modified file.
candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
path = candidates[0]
# Simple extension-based media kind inference.
_IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg"})
_VIDEO_EXTS = frozenset({".mp4", ".mov", ".webm"})
def _kind_from_name(name: str) -> str:
ext = Path(name).suffix.lower()
if ext in _IMAGE_EXTS:
return "image"
if ext in _VIDEO_EXTS:
return "video"
return "file"
results: list[dict[str, Any]] = []
try:
with open(path, "r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
except json.JSONDecodeError:
continue
if not isinstance(rec, dict):
continue
if rec.get("role") != "assistant":
continue
media = rec.get("media")
if not isinstance(media, list) or not media:
# Emit an empty placeholder so the frontend can keep
# media entries aligned by assistant turn index.
results.append({"media_urls": []})
continue
urls: list[dict[str, str]] = []
# Prefer media_urls if they were persisted
# (likely empty because nanobot doesn't persist them).
media_urls = rec.get("media_urls")
if isinstance(media_urls, list) and media_urls:
for mu in media_urls:
if isinstance(mu, dict) and mu.get("url"):
name = str(mu.get("name", ""))
urls.append({
"url": str(mu["url"]),
"name": name,
"kind": str(mu.get("kind", _kind_from_name(name))),
})
# Fallback: raw local paths from the ``media`` field.
if not urls:
for p in media:
if isinstance(p, str) and p:
name = Path(p).name
urls.append({
"url": p,
"name": name,
"kind": _kind_from_name(name),
})
if urls:
results.append({"media_urls": urls})
except Exception as e:
print(f"[api_server] _read_transcript_media error: {e}", file=sys.stderr)
return []
return results
async def _handle_session_media(request: web.Request) -> web.Response:
key = request.match_info.get("key", "")
if not key:
return web.Response(
status=400,
text=json.dumps({"error": "missing session key"}),
content_type="application/json",
headers=_CORS_HEADERS,
)
try:
data = _read_transcript_media(key)
return web.Response(
text=json.dumps({"key": key, "media": data}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
print(f"[api_server] session-media error: {e}", file=sys.stderr)
return web.Response(
status=500,
text=json.dumps({"error": str(e)}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/session-media/{key}", _handle_options)
app.router.add_get("/api/session-media/{key}", _handle_session_media)
# ---------------------------------------------------------------------------
# Session files endpoint — list all uploaded / generated files across sessions
# ---------------------------------------------------------------------------
def _scan_session_files() -> list[dict[str, Any]]:
"""Scan sessions directory and uploads dir for all session-related files."""
import time
results: list[dict[str, Any]] = []
if not workspace_path:
return results
sessions_dir = workspace_path / "sessions"
uploads_dir = _get_uploads_dir()
_IMAGE_EXTS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg"})
_VIDEO_EXTS = frozenset({".mp4", ".mov", ".webm"})
def _kind_from_name(name: str) -> str:
ext = Path(name).suffix.lower()
if ext in _IMAGE_EXTS:
return "image"
if ext in _VIDEO_EXTS:
return "video"
return "file"
# 1. Scan uploads directory for all uploaded files
if uploads_dir and uploads_dir.is_dir():
for f in uploads_dir.iterdir():
if f.is_file():
stat = f.stat()
name = f.name
session_key = ""
if "_" in name:
prefix = name.split("_", 1)[0]
if len(prefix) >= 32:
session_key = prefix[:32]
results.append({
"name": name,
"path": str(f),
"kind": "upload",
"type": _kind_from_name(name),
"sessionKey": session_key,
"sessionPreview": "",
"size": stat.st_size,
"time": stat.st_mtime,
})
# 2. Scan session JSONL files for assistant-generated files and user media
if not sessions_dir.is_dir():
return results
for sess_file in sessions_dir.glob("websocket_*.jsonl"):
if not sess_file.is_file():
continue
stem = sess_file.stem
session_key = stem.replace("websocket_", "", 1)
session_preview = ""
try:
with open(sess_file, "r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
except json.JSONDecodeError:
continue
if not isinstance(rec, dict):
continue
if rec.get("role") == "assistant" and not session_preview:
content = rec.get("content", "")
session_preview = rec.get("preview", "") or (content[:80] if isinstance(content, str) else "")
# User uploads via media_urls
if rec.get("role") == "user":
media_urls = rec.get("media_urls")
if isinstance(media_urls, list) and media_urls:
for mu in media_urls:
if isinstance(mu, dict) and mu.get("url"):
name = str(mu.get("name", ""))
url = str(mu["url"])
file_path = ""
if url.startswith("http://") or url.startswith("https://"):
continue
if url.startswith("file://"):
file_path = url[len("file://"):]
if file_path.startswith("/"):
file_path = file_path[1:]
else:
file_path = url
already = any(
r["kind"] == "upload" and r["path"] == file_path
for r in results
)
if not already and file_path and Path(file_path).is_file():
pst = Path(file_path).stat()
results.append({
"name": name or Path(file_path).name,
"path": file_path,
"kind": "upload",
"type": str(mu.get("kind", _kind_from_name(name))),
"sessionKey": session_key,
"sessionPreview": session_preview,
"size": pst.st_size,
"time": pst.st_mtime,
})
# Assistant generated files
if rec.get("role") == "assistant":
media = rec.get("media")
media_urls = rec.get("media_urls")
urls: list[dict[str, str]] = []
if isinstance(media_urls, list) and media_urls:
for mu in media_urls:
if isinstance(mu, dict) and mu.get("url"):
name = str(mu.get("name", ""))
urls.append({
"url": str(mu["url"]),
"name": name or Path(str(mu["url"])).name,
"kind": str(mu.get("kind", _kind_from_name(name))),
})
if not urls and isinstance(media, list) and media:
for p in media:
if isinstance(p, str) and p:
urls.append({
"url": p,
"name": Path(p).name,
"kind": _kind_from_name(Path(p).name),
})
for u in urls:
url = u["url"]
file_path = ""
if url.startswith("http://") or url.startswith("https://"):
continue
if url.startswith("file://"):
file_path = url[len("file://"):]
if file_path.startswith("/"):
file_path = file_path[1:]
else:
file_path = url
if not file_path or not Path(file_path).is_file():
continue
already = any(r["path"] == file_path for r in results)
if already:
for r in results:
if r["path"] == file_path and not r.get("sessionKey"):
r["sessionKey"] = session_key
r["sessionPreview"] = session_preview or r.get("sessionPreview", "")
continue
pst = Path(file_path).stat()
results.append({
"name": u["name"],
"path": file_path,
"kind": "generated",
"type": u["kind"],
"sessionKey": session_key,
"sessionPreview": session_preview,
"size": pst.st_size,
"time": pst.st_mtime,
})
except Exception as e:
print(f"[api_server] _scan_session_files error reading {sess_file}: {e}", file=sys.stderr)
continue
# 3. Mark sessionExists for each file
existing_keys: set[str] = set()
if sessions_dir.is_dir():
for sf in sessions_dir.glob("websocket_*.jsonl"):
if sf.is_file():
existing_keys.add(sf.stem.replace("websocket_", "", 1))
for r in results:
r["sessionExists"] = bool(r.get("sessionKey")) and r["sessionKey"] in existing_keys
results.sort(key=lambda x: x.get("time", 0), reverse=True)
return results
async def _handle_session_files(request: web.Request) -> web.Response:
"""GET /api/session-files — list all uploaded & generated files across sessions."""
try:
data = _scan_session_files()
return web.Response(
text=json.dumps({"files": data}),
content_type="application/json",
headers=_CORS_HEADERS,
)
except Exception as e:
print(f"[api_server] session-files error: {e}", file=sys.stderr)
return web.Response(
status=500,
text=json.dumps({"error": str(e)}),
content_type="application/json",
headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/session-files", _handle_options)
app.router.add_get("/api/session-files", _handle_session_files)
# ---------------------------------------------------------------------------
# Skills Pool endpoints — fetch available skills from the remote release
# server; fall back to local skills_pools/ when the server is unreachable.
# ---------------------------------------------------------------------------
_SKILLS_POOLS_REMOTE_BASE = "https://did.pegatroncorp.com/mec_agent_release/skills_pools/"
_sp_skill_to_cat: dict[str, str] = {} # skill_name → category_name (in-memory cache)
def _get_skills_pools_root() -> Path | None:
"""Return the local skills_pools/ root (offline fallback).
Searches:
1. <api_server_dir>/skills_pools/ (dev mode & bundled)
2. <install_root>/skills_pools/ (fallback)
"""
if getattr(sys, "frozen", False):
base = Path(sys.executable).parent
else:
base = Path(__file__).parent
candidate = base / "skills_pools"
if candidate.is_dir():
return candidate
# fallback: install root
candidate2 = base.parent / "skills_pools"
if candidate2.is_dir():
return candidate2
return None
def _read_skill_info(skill_dir: Path) -> dict:
"""Read SKILL.md metadata and optional info.md from a skill directory."""
skill_md = skill_dir / "SKILL.md"
name = skill_dir.name
preview = ""
if skill_md.exists():
try:
text = skill_md.read_text(encoding="utf-8", errors="replace")
in_frontmatter = False
frontmatter_done = False
frontmatter_count = 0
for line in text.splitlines():
stripped_raw = line.strip()
# Skip YAML frontmatter (--- ... ---)
if stripped_raw == "---":
frontmatter_count += 1
if frontmatter_count == 1:
in_frontmatter = True
continue
elif frontmatter_count == 2:
in_frontmatter = False
frontmatter_done = True
continue
if in_frontmatter:
continue
stripped = stripped_raw.lstrip("#").strip()
if stripped:
preview = stripped
break
except Exception:
pass
info = ""
info_md = skill_dir / "info.md"
if info_md.exists():
try:
raw = info_md.read_text(encoding="utf-8", errors="replace").strip()
info = raw.replace("\n", " ")[:100]
except Exception:
pass
return {
"name": name,
"has_skill_md": skill_md.exists(),
"preview": preview,
"info": info,
}
# ── Remote-fetch helpers ─────────────────────────────────────────────────
_sp_ssl_ctx_cache: list = [] # mutable singleton — populated on first call
def _sp_get_ssl_ctx():
"""Return a cached SSL context (built once) that trusts the Pegatron CA bundle."""
if _sp_ssl_ctx_cache:
return _sp_ssl_ctx_cache[0]
import ssl
ctx = ssl.create_default_context()
candidates = [
os.environ.get("REQUESTS_CA_BUNDLE", ""),
os.environ.get("SSL_CERT_FILE", ""),
]
if config_path:
candidates.append(str(config_path.parent / "pegatron-ca-bundle.pem"))
if getattr(sys, "frozen", False):
candidates.append(str(Path(sys.executable).parent.parent / "config" / "pegatron-ca-bundle.pem"))
else:
candidates.append(str(Path(__file__).parent.parent / "config" / "pegatron-ca-bundle.pem"))
for ca in candidates:
if ca and os.path.exists(ca):
try:
ctx.load_verify_locations(ca)
except Exception:
pass
break
_sp_ssl_ctx_cache.append(ctx)
return ctx
def _sp_parse_dirs(html: str, base_url: str = "") -> list[str]:
"""Extract direct-child directory names from an IIS-style HTML listing.
Pass ``base_url`` (the URL of the page being parsed) so that parent /
ancestor directory links are automatically filtered out — only entries
that are exactly one level deeper than ``base_url`` are returned.
"""
from urllib.parse import unquote as _uq, urlparse as _up
base_path = _up(base_url).path if base_url else ""
if base_path and not base_path.endswith("/"):
base_path += "/"
result = []
for link in re.findall(r'href=["\']([^"\'>?#]+/)["\']', html, re.IGNORECASE):
if "://" in link:
path = _up(link).path
elif link.startswith("/"):
path = link
else:
continue # bare relative href — unexpected from IIS
if base_path:
if not path.startswith(base_path):
continue
remainder = path[len(base_path):].rstrip("/")
if "/" in remainder: # not a direct child
continue
name = _uq(remainder)
else:
name = _uq(path.rstrip("/").rsplit("/", 1)[-1])
if name and name not in (".", "..", "__pycache__"):
result.append(name)
return result
def _sp_parse_files(html: str, base_url: str = "") -> list[str]:
"""Extract direct-child file names from an IIS-style HTML listing."""
from urllib.parse import unquote as _uq, urlparse as _up
base_path = _up(base_url).path if base_url else ""
if base_path and not base_path.endswith("/"):
base_path += "/"
result = []
for link in re.findall(r'href=["\']([^"\'>?#]+)["\']', html, re.IGNORECASE):
if link.endswith("/"):
continue # directory
if "://" in link:
path = _up(link).path
elif link.startswith("/"):
path = link
else:
continue
if base_path:
if not path.startswith(base_path):
continue
remainder = path[len(base_path):]
if "/" in remainder: # not a direct file
continue
fname = _uq(remainder)
else:
fname = _uq(path.rsplit("/", 1)[-1])
if fname and not fname.endswith(".pyc"):
result.append(fname)
return result
def _sp_url_join(base: str, *parts: str) -> str:
"""Append URL-encoded path segments to base URL."""
from urllib.parse import quote as _q
url = base.rstrip("/")
for p in parts:
url += "/" + _q(p, safe="")
return url
def _sp_extract_preview(md_text: str) -> str:
"""Return first meaningful line from SKILL.md (skips YAML frontmatter)."""
in_fm = False
fm_count = 0
for line in md_text.splitlines():
s = line.strip()
if s == "---":
fm_count += 1
in_fm = (fm_count == 1)
if fm_count == 2:
in_fm = False
continue
if in_fm:
continue
clean = s.lstrip("#").strip()
if clean:
return clean
return ""
async def _sp_fetch_text(session, url: str) -> str:
"""GET url and return decoded text; raises on HTTP error."""
import aiohttp as _ah
async with session.get(url, timeout=_ah.ClientTimeout(total=20), ssl=_sp_get_ssl_ctx()) as r:
r.raise_for_status()
return await r.text(encoding="utf-8", errors="replace")
async def _sp_fetch_bytes(session, url: str) -> bytes:
"""GET url and return raw bytes."""
import aiohttp as _ah
async with session.get(url, timeout=_ah.ClientTimeout(total=20), ssl=_sp_get_ssl_ctx()) as r:
r.raise_for_status()
return await r.read()
async def _sp_download_dir(session, remote_url: str, dest: Path, sem) -> list[str]:
"""Recursively download a remote directory into dest (sem limits concurrency).
Returns a list of file paths that **failed** to download so the caller
can decide whether to retry or report.
"""
html = await _sp_fetch_text(session, remote_url)
dirs = _sp_parse_dirs(html, remote_url)
files = _sp_parse_files(html, remote_url)
dest.mkdir(parents=True, exist_ok=True)
failed: list[str] = [] # collect file names that could not be downloaded
async def _dl_file(fname: str) -> None:
async with sem:
try:
data = await _sp_fetch_bytes(session, _sp_url_join(remote_url, fname))
(dest / fname).write_bytes(data)
except Exception as e:
failed.append(fname)
print(f"[api_server] WARNING: file dl {_sp_url_join(remote_url, fname)}: {e}",
file=sys.stderr)
async def _dl_sub(dname: str) -> list[str]:
return await _sp_download_dir(session, _sp_url_join(remote_url, dname) + "/",
dest / dname, sem)
sub_results: list[list[str]] = await asyncio.gather(
*[_dl_file(f) for f in files],
*[_dl_sub(d) for d in dirs],
return_exceptions=True,
)
# Merge sub-directory failures
for sr in sub_results:
if isinstance(sr, list):
failed.extend(sr)
return failed
# ── Endpoint handlers ────────────────────────────────────────────────────
async def _handle_skills_pools_list(request: web.Request) -> web.Response:
"""GET /api/skills-pools — fetch categories/skills from remote server.
Only fetches category listing pages (1 + N_cat requests total); individual
skill pages are NOT fetched at list time. has_skill_md is assumed True.
Falls back to local skills_pools/ when the remote is unreachable.
"""
active_skills: set[str] = set()
if workspace_path:
ws_skills = workspace_path / "skills"
if ws_skills.is_dir():
active_skills = {e.name for e in ws_skills.iterdir() if e.is_dir()}
try:
import aiohttp as _ah
async with _ah.ClientSession() as session:
# ── Step 1: root listing ─────────────────────────────────────
root_html = await _sp_fetch_text(session, _SKILLS_POOLS_REMOTE_BASE)
cat_names = _sp_parse_dirs(root_html, _SKILLS_POOLS_REMOTE_BASE)
# ── Step 2: one request per category, concurrent ─────────────
async def _fetch_cat(cat_name: str) -> dict:
cat_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, cat_name) + "/"
try:
cat_html = await _sp_fetch_text(session, cat_url)
skill_names = _sp_parse_dirs(cat_html, cat_url)
except Exception as e:
print(f"[api_server] WARNING: category {cat_name}: {e}", file=sys.stderr)
return {"name": cat_name, "required": cat_name.lower() == "defult",
"skills": []}
# Register skill→category map (used by apply)
for sn in skill_names:
_sp_skill_to_cat[sn] = cat_name
async def _fetch_skill_info(sn: str) -> str:
info_url = _sp_url_join(cat_url, sn, "info.md")
try:
info_text = await _sp_fetch_text(session, info_url)
return info_text.strip().replace("\n", " ")[:30]
except Exception:
return ""
infos = await asyncio.gather(*[_fetch_skill_info(sn) for sn in skill_names])
skills = [
{"name": sn, "has_skill_md": True, "preview": "",
"info": info, "active": sn in active_skills}
for sn, info in zip(sorted(skill_names), infos)
]
return {"name": cat_name, "required": cat_name.lower() == "defult",
"skills": skills}
categories = list(await asyncio.gather(
*[_fetch_cat(cn) for cn in sorted(cat_names)]))
return web.Response(
text=json.dumps({"categories": categories, "source": "remote",
"remote_base": _SKILLS_POOLS_REMOTE_BASE}),
content_type="application/json", headers=_CORS_HEADERS,
)
except Exception as e:
print(f"[api_server] WARNING: remote skills pool failed ({e}), using local fallback",
file=sys.stderr)
pools_root = _get_skills_pools_root()
if not pools_root:
return web.Response(
status=503,
text=json.dumps({"error": f"Remote unavailable: {e}; no local fallback found."}),
content_type="application/json", headers=_CORS_HEADERS)
cats: list[dict] = []
for cat_dir in sorted(pools_root.iterdir()):
if not cat_dir.is_dir():
continue
skills_local: list[dict] = []
for sd in sorted(cat_dir.iterdir()):
if not sd.is_dir():
continue
info = _read_skill_info(sd)
info["active"] = sd.name in active_skills
_sp_skill_to_cat[sd.name] = cat_dir.name
skills_local.append(info)
cats.append({"name": cat_dir.name,
"required": cat_dir.name.lower() == "defult",
"skills": skills_local})
return web.Response(
text=json.dumps({"categories": cats, "source": "local",
"remote_base": _SKILLS_POOLS_REMOTE_BASE}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_skills_pools_apply(request: web.Request) -> web.Response:
"""POST /api/skills-pools/apply — download selected skills from remote server.
Falls back to local copy when the remote is unreachable.
"""
if not workspace_path:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
try:
body = await request.json()
except Exception:
return web.Response(status=400, text=json.dumps({"error": "invalid JSON"}),
content_type="application/json", headers=_CORS_HEADERS)
selected: set[str] = set(body.get("skills", []))
# Ensure skill→category map is populated (re-fetch if cache is empty)
skill_to_cat: dict[str, str] = dict(_sp_skill_to_cat)
if not skill_to_cat:
try:
import aiohttp as _ah
async with _ah.ClientSession() as _s:
_root = await _sp_fetch_text(_s, _SKILLS_POOLS_REMOTE_BASE)
for _cn in _sp_parse_dirs(_root, _SKILLS_POOLS_REMOTE_BASE):
_cat_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, _cn) + "/"
_cat_html = await _sp_fetch_text(_s, _cat_url)
for _sn in _sp_parse_dirs(_cat_html, _cat_url):
skill_to_cat[_sn] = _cn
_sp_skill_to_cat[_sn] = _cn
except Exception as e:
print(f"[api_server] WARNING: could not build skill→cat map: {e}", file=sys.stderr)
# Always include defult skills
defult_skills: set[str] = {s for s, c in skill_to_cat.items() if c.lower() == "defult"}
local_root = _get_skills_pools_root()
if local_root and (local_root / "defult").is_dir():
for _d in (local_root / "defult").iterdir():
if _d.is_dir():
defult_skills.add(_d.name)
selected.update(defult_skills)
import shutil
ws_skills_dir = workspace_path / "skills"
ws_skills_dir.mkdir(parents=True, exist_ok=True)
# Remove deselected skills that belong to a known pool
all_pool_skills = set(skill_to_cat.keys())
locked_skills: set[str] = set() # skills that could not be removed due to file lock
for existing in ws_skills_dir.iterdir():
if existing.is_dir() and existing.name not in selected and existing.name in all_pool_skills:
try:
shutil.rmtree(existing)
except Exception as e:
locked_skills.add(existing.name)
print(f"[api_server] WARNING: remove {existing.name} skipped (locked?): {e}",
file=sys.stderr)
copied: list[str] = []
skipped: list[str] = list(locked_skills) # pre-populate with lock failures
try:
import aiohttp as _ah
sem = asyncio.Semaphore(20)
async with _ah.ClientSession() as session:
async def _install(skill_name: str) -> None:
# Skip if removal of this skill previously failed (still locked)
if skill_name in locked_skills:
return
cat_name = skill_to_cat.get(skill_name)
if not cat_name:
skipped.append(skill_name)
return
skill_dst = ws_skills_dir / skill_name
# If the skill directory already exists, remove it first so that
# a fresh, complete download is performed. This avoids stale
# partial downloads persisting when the remote skill is updated.
if skill_dst.exists():
try:
shutil.rmtree(skill_dst)
except Exception as _rm_err:
# Cannot remove (file lock?) — keep as-is, skip re-download
print(f"[api_server] WARNING: cannot remove existing {skill_name}: {_rm_err}",
file=sys.stderr)
copied.append(skill_name)
return
skill_url = _sp_url_join(_SKILLS_POOLS_REMOTE_BASE, cat_name, skill_name) + "/"
try:
failed_files = await _sp_download_dir(session, skill_url, skill_dst, sem)
if failed_files:
print(f"[api_server] WARNING: {skill_name} — {len(failed_files)} file(s) failed: {failed_files}",
file=sys.stderr)
copied.append(skill_name)
except Exception as e:
print(f"[api_server] WARNING: remote install {skill_name} ({e}); trying local",
file=sys.stderr)
# Local fallback
if local_root:
for _cat in local_root.iterdir():
local_src = _cat / skill_name
if _cat.is_dir() and local_src.is_dir():
try:
shutil.copytree(local_src, skill_dst)
copied.append(skill_name)
return
except Exception:
pass
skipped.append(skill_name)
await asyncio.gather(*[_install(sn) for sn in sorted(selected)],
return_exceptions=True)
except Exception as e:
print(f"[api_server] WARNING: remote apply failed ({e}), pure local fallback",
file=sys.stderr)
if not local_root:
return web.Response(
status=503,
text=json.dumps({"error": f"Remote and local unavailable: {e}"}),
content_type="application/json", headers=_CORS_HEADERS)
for cat_dir in local_root.iterdir():
if not cat_dir.is_dir():
continue
for sn in sorted(selected):
src = cat_dir / sn
if not src.is_dir():
continue
dst = ws_skills_dir / sn
try:
if dst.exists():
# Remove existing so a fresh copy replaces potentially
# stale / partial content.
try:
shutil.rmtree(dst)
except Exception:
copied.append(sn)
continue
shutil.copytree(src, dst)
copied.append(sn)
except Exception as ce:
skipped.append(sn)
print(f"[api_server] WARNING: local copy {sn}: {ce}", file=sys.stderr)
return web.Response(
text=json.dumps({"ok": True, "copied": sorted(set(copied)), "skipped": sorted(set(skipped))}),
content_type="application/json",
headers=_CORS_HEADERS,
)
async def _handle_ws_git_log(request: web.Request) -> web.Response:
if not workspace_path:
return web.Response(status=503, text=json.dumps({"error": "workspace unavailable"}),
content_type="application/json", headers=_CORS_HEADERS)
git_dir = workspace_path / ".git"
if not git_dir.exists():
return web.Response(text=json.dumps({"initialized": False, "commits": []}),
content_type="application/json", headers=_CORS_HEADERS)
import subprocess as _subprocess
try:
max_count = min(int(request.rel_url.query.get("max", "30")), 200)
except ValueError:
max_count = 30
try:
result = _subprocess.run(
["git", "log", f"--max-count={max_count}",
"--pretty=format:%H\x1f%s\x1f%an\x1f%aI"],
capture_output=True, text=True,
cwd=str(workspace_path), timeout=10,
)
commits: list[dict] = []
for line in result.stdout.splitlines():
parts = line.split("\x1f", 3)
if len(parts) == 4:
commits.append({"sha": parts[0], "message": parts[1],
"author": parts[2], "date": parts[3]})
return web.Response(text=json.dumps({"initialized": True, "commits": commits}),
content_type="application/json", headers=_CORS_HEADERS)
except Exception as e:
return web.Response(status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS)
# Register workspace filesystem routes
app.router.add_route("OPTIONS", "/api/ws/file", _handle_options)
app.router.add_route("OPTIONS", "/api/ws/skills", _handle_options)
app.router.add_route("OPTIONS", "/api/ws/git-log", _handle_options)
app.router.add_route("OPTIONS", "/api/skills-pools", _handle_options)
app.router.add_route("OPTIONS", "/api/skills-pools/apply", _handle_options)
app.router.add_get("/api/ws/file", _handle_ws_read_file)
app.router.add_put("/api/ws/file", _handle_ws_write_file)
app.router.add_get("/api/ws/skills", _handle_ws_skills)
app.router.add_get("/api/ws/git-log", _handle_ws_git_log)
app.router.add_get("/api/skills-pools", _handle_skills_pools_list)
app.router.add_post("/api/skills-pools/apply", _handle_skills_pools_apply)
# ---------------------------------------------------------------------------
# Auto-update endpoints
# ---------------------------------------------------------------------------
def _get_install_root() -> Path:
"""Return the installation root (parent of api_server/)."""
if getattr(sys, "frozen", False):
return Path(sys.executable).parent.parent
return Path(__file__).parent.parent
def _get_local_version() -> dict:
from update_manager import read_local_version
return read_local_version(_get_install_root())
def _get_update_config() -> dict:
from update_manager import read_update_config
return read_update_config(_get_install_root())
# Track background staging process
_update_state: dict = {"status": "idle", "progress": None, "error": None, "process": None, "staged_count": 0, "stage_warning": None, "target_version": ""}
async def _handle_update_check(request: web.Request) -> web.Response:
"""GET /api/update/check — compare local vs remote versions (pure Python, no PS1)."""
from update_manager import check_updates
# Optional ?manifestUrl= to preview a specific historical version
manifest_url = request.rel_url.query.get("manifestUrl", "")
loop = asyncio.get_event_loop()
try:
data = await loop.run_in_executor(None, check_updates, _get_install_root(), manifest_url)
return web.Response(
text=json.dumps(data, ensure_ascii=False),
content_type="application/json", headers=_CORS_HEADERS,
)
except RuntimeError as e:
return web.Response(
status=400, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS,
)
except Exception as e:
return web.Response(
status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_update_status(request: web.Request) -> web.Response:
"""GET /api/update/status — return current update state and local version."""
local_ver = _get_local_version()
pending_flag = _get_install_root() / "update" / "_pending_update"
progress_file = _get_install_root() / "update" / "_progress.json"
# Read progress from updater's progress file if available
updater_progress = None
if progress_file.exists():
try:
updater_progress = json.loads(progress_file.read_text(encoding="utf-8-sig"))
except Exception:
pass
return web.Response(
text=json.dumps({
"status": _update_state["status"],
"progress": _update_state["progress"],
"error": _update_state["error"],
"stageWarning": _update_state.get("stage_warning"),
"stagedCount": _update_state.get("staged_count", 0),
"targetVersion": _update_state.get("target_version", ""),
"currentVersion": local_ver.get("version", _APP_VERSION),
"components": local_ver.get("components", {}),
"hasPendingUpdate": pending_flag.exists(),
"updaterProgress": updater_progress,
}, ensure_ascii=False),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_update_stage(request: web.Request) -> web.Response:
"""POST /api/update/stage — download and SHA-256-verify ZIPs (pure Python, no PS1).
Optional JSON body:
{ "manifestUrl": "http://server/v0.3.0/manifest.json", "force": true }
When manifestUrl + force are provided this performs a rollback download.
"""
if _update_state["status"] == "staging":
return web.Response(
text=json.dumps({"error": "staging already in progress"}),
status=409, content_type="application/json", headers=_CORS_HEADERS,
)
# Parse optional body
body: dict = {}
try:
if request.content_length and request.content_length > 0:
body = await request.json()
except Exception:
pass
override_manifest_url: str = body.get("manifestUrl", "")
force_install: bool = bool(body.get("force", False))
if not override_manifest_url and not _get_update_config().get("manifestUrl", ""):
return web.Response(
status=400, text=json.dumps({"error": "update.manifestUrl not configured"}),
content_type="application/json", headers=_CORS_HEADERS,
)
_update_state["status"] = "staging"
_update_state["error"] = None
_update_state["progress"] = "downloading"
from update_manager import stage_updates
def _run_stage() -> None:
try:
result = stage_updates(
_get_install_root(),
manifest_url=override_manifest_url,
force=force_install,
)
staged_count = int(result.get("staged", 0))
failed_count = int(result.get("failed", 0))
failed_names = list(result.get("failedComponents", []))
failed_label = ", ".join(failed_names) if failed_names else f"{failed_count} component(s)"
if staged_count == 0 and failed_count > 0:
_update_state["status"] = "error"
_update_state["error"] = f"Failed to download: {failed_label}"
else:
_update_state["status"] = "staged"
_update_state["progress"] = "ready"
_update_state["staged_count"] = staged_count
_update_state["target_version"] = result.get("targetVersion", "")
_update_state["stage_warning"] = (
f"Partial update: {failed_label} failed checksum or download"
if failed_count > 0 else None
)
except Exception as e:
_update_state["status"] = "error"
_update_state["error"] = str(e)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, _run_stage)
return web.Response(
text=json.dumps({"ok": True, "status": "staging"}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_update_apply(request: web.Request) -> web.Response:
"""POST /api/update/restart — signal the launcher to restart and apply staged updates."""
pending_flag = _get_install_root() / "update" / "_pending_update"
if not pending_flag.exists():
return web.Response(
status=400, text=json.dumps({"error": "no staged update to apply"}),
content_type="application/json", headers=_CORS_HEADERS,
)
# Write restart signal — launcher polls for this file and performs the restart
signal_file = _get_install_root() / "update" / "_restart_signal"
signal_file.parent.mkdir(parents=True, exist_ok=True)
signal_file.write_text("apply_update", encoding="utf-8")
return web.Response(
text=json.dumps({"ok": True, "status": "applying"}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_restart_agent(request: web.Request) -> web.Response:
"""POST /api/restart — signal the launcher to restart the MEC Agent."""
# Write restart signal — launcher polls for this file and performs the restart.
# This avoids relying on Windows Task Scheduler which may not have access to
# the user's interactive session when triggered from the api_server process.
signal_file = _get_install_root() / "update" / "_restart_signal"
signal_file.parent.mkdir(parents=True, exist_ok=True)
signal_file.write_text("restart", encoding="utf-8")
return web.Response(
text=json.dumps({"ok": True, "status": "restarting"}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_update_history(request: web.Request) -> web.Response:
"""GET /api/update/history — return server-side version index (all releases)."""
from update_manager import fetch_version_index
loop = asyncio.get_event_loop()
try:
data = await loop.run_in_executor(None, fetch_version_index, _get_install_root())
return web.Response(
text=json.dumps(data, ensure_ascii=False),
content_type="application/json", headers=_CORS_HEADERS,
)
except RuntimeError as e:
return web.Response(
status=400, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS,
)
except Exception as e:
return web.Response(
status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS,
)
async def _handle_update_rollback(request: web.Request) -> web.Response:
"""POST /api/update/rollback — stage a specific historical version for installation.
Request body: { "manifestUrl": "http://server/v0.3.0/manifest.json" }
This is a convenience alias for POST /api/update/stage with force=true and
a custom manifestUrl. The client can reuse the same staging/progress/restart
flow as for a normal update.
"""
if _update_state["status"] == "staging":
return web.Response(
text=json.dumps({"error": "staging already in progress"}),
status=409, content_type="application/json", headers=_CORS_HEADERS,
)
try:
body: dict = await request.json()
except Exception:
return web.Response(
status=400, text=json.dumps({"error": "request body must be JSON with {manifestUrl}"}),
content_type="application/json", headers=_CORS_HEADERS,
)
manifest_url: str = body.get("manifestUrl", "")
if not manifest_url:
return web.Response(
status=400, text=json.dumps({"error": "manifestUrl is required"}),
content_type="application/json", headers=_CORS_HEADERS,
)
_update_state["status"] = "staging"
_update_state["error"] = None
_update_state["progress"] = "downloading"
_update_state["target_version"] = ""
from update_manager import stage_updates
def _run_rollback() -> None:
try:
result = stage_updates(
_get_install_root(),
manifest_url=manifest_url,
force=True,
)
staged_count = int(result.get("staged", 0))
failed_count = int(result.get("failed", 0))
failed_names = list(result.get("failedComponents", []))
failed_label = ", ".join(failed_names) if failed_names else f"{failed_count} component(s)"
if staged_count == 0 and failed_count > 0:
_update_state["status"] = "error"
_update_state["error"] = f"Failed to download: {failed_label}"
else:
_update_state["status"] = "staged"
_update_state["progress"] = "ready"
_update_state["staged_count"] = staged_count
_update_state["target_version"] = result.get("targetVersion", "")
_update_state["stage_warning"] = (
f"Partial rollback: {failed_label} failed checksum or download"
if failed_count > 0 else None
)
except Exception as e:
_update_state["status"] = "error"
_update_state["error"] = str(e)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, _run_rollback)
return web.Response(
text=json.dumps({"ok": True, "status": "staging"}),
content_type="application/json", headers=_CORS_HEADERS,
)
# ── Model management endpoints ─────────────────────────────────────────
async def _handle_models_status(request: web.Request) -> web.Response:
"""GET /api/models/status — check whether the active server model is installed."""
from update_manager import check_model_update, read_model_config
try:
root = _get_install_root()
status = await asyncio.get_event_loop().run_in_executor(
None, check_model_update, root
)
local_cfg = read_model_config(root)
return web.Response(
text=json.dumps({
**status,
"downloadDate": local_cfg.get("downloadDate", ""),
"downloadStatus": _model_dl_state["status"],
"bytesDownloaded": _model_dl_state["bytesDownloaded"],
"totalBytes": _model_dl_state["totalBytes"],
"downloadError": _model_dl_state["error"],
}),
content_type="application/json", headers=_CORS_HEADERS,
)
except Exception as e:
return web.Response(
status=500, text=json.dumps({"error": str(e)}),
content_type="application/json", headers=_CORS_HEADERS,
)
# Global state for model download progress
_model_dl_state: dict = {
"status": "idle", "error": None, "filename": "",
"bytesDownloaded": 0, "totalBytes": 0,
}
async def _handle_models_download(request: web.Request) -> web.Response:
"""POST /api/models/download — start downloading the active model in the background."""
from update_manager import download_model
if _model_dl_state["status"] == "downloading":
return web.Response(
text=json.dumps({"ok": False, "error": "download already in progress"}),
status=409, content_type="application/json", headers=_CORS_HEADERS,
)
_model_dl_state["status"] = "downloading"
_model_dl_state["error"] = None
_model_dl_state["filename"] = ""
_model_dl_state["bytesDownloaded"] = 0
_model_dl_state["totalBytes"] = 0
def _on_progress(phase: str, current: int, total: int, component: str) -> None:
if phase == "model_download":
_model_dl_state["bytesDownloaded"] = current
_model_dl_state["totalBytes"] = total
def _run_dl() -> None:
try:
result = download_model(_get_install_root(), progress_cb=_on_progress)
_model_dl_state["status"] = "done"
_model_dl_state["filename"] = result.get("filename", "")
except Exception as e:
_model_dl_state["status"] = "error"
_model_dl_state["error"] = str(e)
asyncio.get_event_loop().run_in_executor(None, _run_dl)
return web.Response(
text=json.dumps({"ok": True, "status": "downloading"}),
content_type="application/json", headers=_CORS_HEADERS,
)
app.router.add_route("OPTIONS", "/api/restart", _handle_options)
app.router.add_route("OPTIONS", "/api/update/check", _handle_options)
app.router.add_route("OPTIONS", "/api/update/status", _handle_options)
app.router.add_route("OPTIONS", "/api/update/stage", _handle_options)
app.router.add_route("OPTIONS", "/api/update/restart", _handle_options)
app.router.add_route("OPTIONS", "/api/update/history", _handle_options)
app.router.add_route("OPTIONS", "/api/update/rollback", _handle_options)
app.router.add_route("OPTIONS", "/api/models/status", _handle_options)
app.router.add_route("OPTIONS", "/api/models/download", _handle_options)
app.router.add_post("/api/restart", _handle_restart_agent)
app.router.add_get("/api/update/check", _handle_update_check)
app.router.add_get("/api/update/status", _handle_update_status)
app.router.add_post("/api/update/stage", _handle_update_stage)
app.router.add_post("/api/update/restart", _handle_update_apply)
app.router.add_get("/api/update/history", _handle_update_history)
app.router.add_post("/api/update/rollback", _handle_update_rollback)
app.router.add_get("/api/models/status", _handle_models_status)
app.router.add_post("/api/models/download", _handle_models_download)
# Start listening immediately so the UI can reach us without waiting for nanobot to load.
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, args.host, args.port, reuse_address=True)
await site.start()
print(f"[api_server] Listening on http://{args.host}:{args.port}", flush=True)
# ── Start CronService (with on_job callback for notifications) ────────────
if cron_svc:
try:
await cron_svc.start()
print("[api_server] CronService started", flush=True)
except Exception as _e:
print(f"[api_server] WARNING: CronService start failed: {_e}", file=sys.stderr)
# Block until process is killed
try:
await asyncio.Event().wait()
finally:
if cron_svc:
try:
cron_svc.stop()
except Exception:
pass
await runner.cleanup()
def _run_apply_staged() -> None:
"""Apply staged update and exit. Invoked when --apply-staged is passed.
This runs synchronously — api_server is not started, no web listener is bound.
Called by the launcher after all running services have been stopped.
"""
import argparse as _ap
p = _ap.ArgumentParser(add_help=False)
p.add_argument("--apply-staged", action="store_true")
p.add_argument("--install-root", default="", dest="install_root")
args, _ = p.parse_known_args()
from update_manager import apply_staged, get_install_root
install_root = Path(args.install_root).resolve() if args.install_root else get_install_root()
result = apply_staged(install_root)
print(json.dumps(result))
sys.exit(0 if result.get("applied") or result.get("components", 0) > 0 else 1)
def main() -> None:
# Detect --apply-staged early — before importing aiohttp or starting anything
if "--apply-staged" in sys.argv:
_run_apply_staged()
return
asyncio.run(_main())
if __name__ == "__main__":
main()
Comments...
No Comments Yet...