"""
update_manager.py — MEC Agent update engine (Python replacement for mec_updater.ps1).
All update logic runs inside api_server.exe so that SHA-256 verification details
are hidden from end-users. The old mec_updater.ps1 is no longer needed.
Public API
----------
check_updates(install_root, manifest_url="") -> dict
stage_updates(install_root, progress_cb=None, manifest_url="", force=False) -> dict
apply_staged(install_root) -> dict
fetch_version_index(install_root) -> dict
get_install_root() -> Path
read_local_version(install_root) -> dict
read_update_config(install_root) -> dict
verify_sha256(file_path, expected) -> bool
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import shutil
import ssl
import sys
import urllib.request
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
def _read_windows_user_env_var(name: str) -> Optional[str]:
    """Read a Windows user environment variable from HKCU\\Environment.

    This is a fallback for cases where the current process did not inherit
    the variable but the user profile already has it configured.

    Args:
        name: Variable name. Registry value names are matched
            case-insensitively by Windows, so no manual scan is needed.

    Returns:
        The stripped string value, or None when not running on Windows, the
        value is missing, is not a string, is blank, or the registry cannot
        be read.
    """
    if os.name != "nt":
        return None
    try:
        import winreg  # Windows-only module, hence the lazy import.

        # The key handle is a context manager, so it is closed on every path.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Environment") as key:
            value_data, _value_type = winreg.QueryValueEx(key, name)
    except Exception:
        # Missing value (FileNotFoundError), access problems, or import
        # failure: this lookup is best-effort, so report "not configured".
        return None
    if not isinstance(value_data, str):
        return None
    return value_data.strip() or None
# ---------------------------------------------------------------------------
# Install-root resolution
# ---------------------------------------------------------------------------
def get_install_root() -> Path:
    """Locate the MEC Agent installation root.

    The root is always two levels above an anchor file:
      * frozen build (PyInstaller onefile): the anchor is api_server.exe,
        which lives in {install}/api_server/
      * dev mode: the anchor is this source file, which lives in
        agent_packaging/api_server/
    """
    running_frozen = getattr(sys, "frozen", False)
    anchor = Path(sys.executable) if running_frozen else Path(__file__)
    return anchor.parent.parent
# ---------------------------------------------------------------------------
# Logging — writes to logs/updater.log and stdout
# ---------------------------------------------------------------------------
def _get_logger(install_root: Path) -> logging.Logger:
    """Return the shared "mec_updater" logger, configuring it on first use.

    On the first call the logger gets a UTF-8 file handler writing to
    {install_root}/logs/updater.log plus a stdout stream handler, and its
    level is set to INFO. Later calls return the already-configured logger.
    """
    logger = logging.getLogger("mec_updater")
    if logger.handlers:
        # Already configured earlier in this process.
        return logger

    log_dir = install_root / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    formatter = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

    file_handler = logging.FileHandler(str(log_dir / "updater.log"), encoding="utf-8")
    file_handler.setFormatter(formatter)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)

    for handler in (file_handler, stream_handler):
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
# ---------------------------------------------------------------------------
# Proxy skip flag — once a proxy attempt fails in this process session, skip
# proxy for all subsequent requests to avoid repeated "Proxy fetch failed" noise.
# Module-level state: set to True by _fetch_json() and stage_updates() when
# they fall back from the proxy opener to the direct opener; it is never
# reset to False within this module.
# ---------------------------------------------------------------------------
_proxy_skip: bool = False
# ---------------------------------------------------------------------------
# Proxy helpers — mirror the logic in the old pega_proxy.cmd reader
# ---------------------------------------------------------------------------
def _read_proxy_url(install_root: Path) -> Optional[str]:
    """Resolve the HTTP(S) proxy URL, or None when no proxy is configured.

    Sources are consulted in this order; the first non-empty one wins:
      1. process environment (HTTP_PROXY, http_proxy, HTTPS_PROXY, https_proxy)
      2. Windows user environment in the registry (HTTP_PROXY, HTTPS_PROXY)
      3. the first matching ``set ..._PROXY=`` line in proxy/pega_proxy.cmd
    """
    for env_name in ("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy"):
        from_env = os.environ.get(env_name)
        if from_env:
            return from_env.strip()

    for reg_name in ("HTTP_PROXY", "HTTPS_PROXY"):
        from_registry = _read_windows_user_env_var(reg_name)
        if from_registry:
            return from_registry

    cmd_file = install_root / "proxy" / "pega_proxy.cmd"
    if not cmd_file.exists():
        return None
    try:
        content = cmd_file.read_text(encoding="utf-8", errors="replace")
        for text_line in content.splitlines():
            found = re.search(r'(?i)set\s+"?(?:HTTP|HTTPS)_PROXY=([^"\r\n]+)"?', text_line)
            if found:
                return found.group(1).strip()
    except Exception:
        # Best-effort: an unreadable script means "no proxy configured".
        pass
    return None
def _read_no_proxy(install_root: Path) -> Optional[str]:
    """Resolve the NO_PROXY bypass list, or None when none is configured.

    Sources are consulted in this order; the first non-empty one wins:
      1. process environment (NO_PROXY, no_proxy)
      2. Windows user environment in the registry (NO_PROXY, no_proxy)
      3. the first ``set NO_PROXY=`` line in proxy/pega_proxy.cmd
    """
    variable_names = ("NO_PROXY", "no_proxy")

    for env_name in variable_names:
        from_env = os.environ.get(env_name)
        if from_env:
            return from_env.strip()

    for reg_name in variable_names:
        from_registry = _read_windows_user_env_var(reg_name)
        if from_registry:
            return from_registry

    cmd_file = install_root / "proxy" / "pega_proxy.cmd"
    if not cmd_file.exists():
        return None
    try:
        content = cmd_file.read_text(encoding="utf-8", errors="replace")
        for text_line in content.splitlines():
            found = re.search(r'(?i)set\s+"?NO_PROXY=([^"\r\n]+)"?', text_line)
            if found:
                return found.group(1).strip()
    except Exception:
        # Best-effort: an unreadable script means "no bypass list".
        pass
    return None
def _get_ca_bundle(install_root: Path) -> Optional[str]:
    """Return the path of proxy/PEGA-CA-02.pem as a string, or None if absent."""
    pem_path = install_root / "proxy" / "PEGA-CA-02.pem"
    if pem_path.exists():
        return str(pem_path)
    return None
def _make_opener(install_root: Path) -> urllib.request.OpenerDirector:
    """Build a urllib opener with optional proxy + custom CA bundle.

    The HTTPS handler always uses a default SSL context, extended with the
    bundled CA file when one exists. When a proxy URL is configured, a proxy
    handler for both http and https is placed first in the handler list.

    The stock ``urllib.request.ProxyHandler`` only consults the *process
    environment* (or the Windows internet settings) for bypass rules; a
    ``"no"`` entry in the mapping passed to it is not used for bypassing.
    A NO_PROXY value obtained from the registry or pega_proxy.cmd would
    therefore be ignored, so a small subclass applies it explicitly.
    """
    proxy_url = _read_proxy_url(install_root)
    ca_bundle = _get_ca_bundle(install_root)

    ssl_ctx = ssl.create_default_context()
    if ca_bundle:
        ssl_ctx.load_verify_locations(ca_bundle)

    handlers: list = [urllib.request.HTTPSHandler(context=ssl_ctx)]
    if proxy_url:
        no_proxy = _read_no_proxy(install_root)

        class _NoProxyAwareHandler(urllib.request.ProxyHandler):
            """ProxyHandler that also honours the configured NO_PROXY list."""

            def proxy_open(self, req, proxy, scheme):
                # Returning None tells the opener chain "do not proxy this
                # request", the same signal the base class uses for bypass.
                if no_proxy and req.host and urllib.request.proxy_bypass_environment(
                    req.host, {"no": no_proxy}
                ):
                    return None
                # The base class still applies environment-based bypass rules.
                return super().proxy_open(req, proxy, scheme)

        proxies: dict = {"http": proxy_url, "https": proxy_url}
        handlers.insert(0, _NoProxyAwareHandler(proxies))
    return urllib.request.build_opener(*handlers)
def _make_direct_opener(install_root: Path) -> urllib.request.OpenerDirector:
    """Build a urllib opener that connects directly, ignoring every proxy.

    An empty proxy mapping is passed to ProxyHandler so that no proxy is
    used; the HTTPS handler trusts the default CAs plus the bundled CA file
    when that file exists.
    """
    context = ssl.create_default_context()
    bundle = _get_ca_bundle(install_root)
    if bundle:
        context.load_verify_locations(bundle)

    no_proxy_handler = urllib.request.ProxyHandler({})
    https_handler = urllib.request.HTTPSHandler(context=context)
    return urllib.request.build_opener(no_proxy_handler, https_handler)
# ---------------------------------------------------------------------------
# SHA-256 verification ← logic is compiled into the exe, not visible to users
# ---------------------------------------------------------------------------
def verify_sha256(file_path: Path, expected: str) -> bool:
    """Check *file_path* against the *expected* SHA-256 hex digest.

    Verification is skipped (True is returned) when *expected* is empty or
    is a placeholder containing "..."; this keeps manifests that omit hashes
    for large files working. Otherwise the file is hashed in 64 KiB chunks
    and the digests are compared case-insensitively.
    """
    verification_disabled = not expected or "..." in expected
    if verification_disabled:
        return True

    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return expected.lower() == digest.hexdigest().lower()
# ---------------------------------------------------------------------------
# Version comparison
# ---------------------------------------------------------------------------
def _compare_versions(local: str, remote: str) -> bool:
    """Return True if *remote* is strictly newer than *local* (upgrade check).

    Rules, in order:
      1. Both use the legacy build-number form ("b1234"): compare the numbers.
      2. Only *local* is a build number: legacy migration to the unified
         semver scheme, always treated as needing an upgrade.
      3. Both are dotted integers: compare numerically, padding the shorter
         one with zeros so "1.0" and "1.0.0" are considered equal.
      4. Anything else (non-numeric parts): fall back to "differs".

    Surrounding whitespace is ignored, matching _versions_differ().
    """
    local = local.strip()
    remote = remote.strip()

    # All components now use semver (unified app version). Build-number style
    # is kept for very old version.json files that tracked llama separately.
    build_remote = re.match(r"^b(\d+)$", remote)
    build_local = re.match(r"^b(\d+)$", local)
    if build_remote and build_local:
        return int(build_remote.group(1)) > int(build_local.group(1))
    if build_local and not build_remote:
        return True

    try:
        remote_parts = [int(part) for part in remote.split(".")]
        local_parts = [int(part) for part in local.split(".")]
    except ValueError:
        return remote != local

    # Pad to equal length; without this (1, 0, 0) > (1, 0) would report an
    # upgrade between two spellings of the same version.
    width = max(len(remote_parts), len(local_parts))
    remote_parts += [0] * (width - len(remote_parts))
    local_parts += [0] * (width - len(local_parts))
    return remote_parts > local_parts
def _versions_differ(local: str, remote: str) -> bool:
    """Tell whether the two version strings differ, ignoring outer whitespace.

    Used for forced installs / rollback, where any difference (newer or
    older) counts.
    """
    installed = local.strip()
    offered = remote.strip()
    return not (installed == offered)
# ---------------------------------------------------------------------------
# Local version.json
# ---------------------------------------------------------------------------
def read_local_version(install_root: Path) -> dict:
    """Load update/version.json (a UTF-8 BOM is tolerated).

    A missing, unreadable, or unparsable file yields the baseline
    ``{"version": "0.0.0", "components": {}}``.
    """
    baseline = {"version": "0.0.0", "components": {}}
    version_file = install_root / "update" / "version.json"
    if not version_file.exists():
        return baseline
    try:
        text = version_file.read_text(encoding="utf-8-sig")
        return json.loads(text)
    except Exception:
        return baseline
def _write_local_version(install_root: Path, data: dict) -> None:
    """Persist *data* to update/version.json, minus the ``releaseNotes`` key.

    releaseNotes is dropped because it may contain CJK characters that break
    PowerShell 5.1's ``Get-Content -Raw | ConvertFrom-Json`` (ANSI codepage).
    The launcher never needs it; only the web UI consumes release notes, and
    it reads them from the remote manifest directly.
    """
    version_file = install_root / "update" / "version.json"
    version_file.parent.mkdir(parents=True, exist_ok=True)

    payload = dict(data)
    payload.pop("releaseNotes", None)

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    version_file.write_text(serialized, encoding="utf-8")
# ---------------------------------------------------------------------------
# Update config
# ---------------------------------------------------------------------------
def read_update_config(install_root: Path) -> dict:
    """Load update/update_config.json (a UTF-8 BOM is tolerated).

    Returns an empty dict when the file is missing, unreadable, or is not
    valid JSON.
    """
    config_file = install_root / "update" / "update_config.json"
    if not config_file.exists():
        return {}
    try:
        text = config_file.read_text(encoding="utf-8-sig")
        return json.loads(text)
    except Exception:
        return {}
# ---------------------------------------------------------------------------
# Remote manifest / index fetch
# ---------------------------------------------------------------------------
def _fetch_json(url: str, install_root: Path, timeout: int = 30) -> dict:
    """Fetch JSON from *url* with proxy + direct-connection fallback.

    Args:
        url: Address of the JSON document.
        install_root: Installation root; used for logging and for locating
            the proxy / CA configuration.
        timeout: Timeout in seconds passed to each ``opener.open`` call.

    Returns:
        The value produced by ``json.loads`` on the response body.

    Raises:
        RuntimeError: when every opener fails (chained to the last error),
            or when the body is not valid JSON.

    Side effect: sets the module-level ``_proxy_skip`` flag to True as soon
    as the direct-connection fallback is attempted.
    """
    global _proxy_skip
    log = _get_logger(install_root)
    log.info(f"Fetching {url}")
    # Use only direct connection if proxy has already failed in this session.
    if _proxy_skip or not _read_proxy_url(install_root):
        openers = [_make_direct_opener(install_root)]
    else:
        openers = [_make_opener(install_root), _make_direct_opener(install_root)]
    last_err: Exception = RuntimeError("no opener")
    raw = ""
    for attempt, opener in enumerate(openers):
        if attempt > 0:
            # Reaching a second opener means the proxy attempt raised.
            # NOTE(review): any exception counts here, including an HTTP
            # error returned by the server through a working proxy.
            _proxy_skip = True
            log.warning("Proxy fetch failed, retrying without proxy...")
        # A new Request object is built for every attempt.
        fresh_req = urllib.request.Request(url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
        try:
            with opener.open(fresh_req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8-sig")  # strip BOM
            break
        except Exception as e:
            last_err = e
    else:
        # for/else: runs only when no attempt hit ``break`` (all failed).
        raise RuntimeError(f"Failed to fetch {url}: {last_err}") from last_err
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Invalid JSON from {url}: {e}") from e
def _fetch_manifest(url: str, install_root: Path) -> dict:
    """Download and parse the component manifest at *url*.

    Thin wrapper around _fetch_json() using its default timeout.
    """
    manifest = _fetch_json(url, install_root)
    return manifest
def fetch_version_index(install_root: Path) -> dict:
    """Fetch the version history index (index.json) from the update server.

    The index URL is derived from the configured manifestUrl: trailing path
    segments that are a ``*.json`` file name or a version directory are
    removed, and ``index.json`` is appended to what remains, e.g.

        http://server/Mec_Agent_0_3_1/manifest.json -> http://server/index.json
        http://server/v0.3.1/manifest.json          -> http://server/index.json (legacy)

    Returns: { latest, versions: [{version, buildDate, releaseNotes, manifestUrl}] }

    Raises:
        RuntimeError: when manifestUrl is not configured or the fetch fails.
    """
    from urllib.parse import urlparse

    manifest_url = read_update_config(install_root).get("manifestUrl", "")
    if not manifest_url:
        raise RuntimeError("update.manifestUrl not configured in update/update_config.json")

    def _is_strippable(segment: str) -> bool:
        # File name, "Mec_Agent_0_3_1"-style directory, or legacy "v0.3.1".
        return bool(
            segment.endswith(".json")
            or re.match(r"^Mec_Agent_\d+_\d+", segment)
            or re.match(r"^v\d+\.\d+", segment)
        )

    location = urlparse(manifest_url)
    segments = location.path.rstrip("/").split("/")
    while segments and _is_strippable(segments[-1]):
        segments.pop()

    root_path = "/".join(segments) + "/"
    index_url = f"{location.scheme}://{location.netloc}{root_path}index.json"
    try:
        return _fetch_json(index_url, install_root)
    except Exception as e:
        raise RuntimeError(f"Failed to fetch version index: {e}") from e
# ---------------------------------------------------------------------------
# Progress file — read by launcher splash screen and mecui
# ---------------------------------------------------------------------------
def _write_progress(install_root: Path, phase: str, current: int, total: int, component: str) -> None:
    """Write the current progress snapshot to update/_progress.json.

    The snapshot holds phase, current, total, component and a local-time
    timestamp. Failures while serializing or writing the file are ignored;
    creating the parent directory is not covered by that guard.
    """
    progress_file = install_root / "update" / "_progress.json"
    progress_file.parent.mkdir(parents=True, exist_ok=True)
    try:
        snapshot = {
            "phase": phase,
            "current": current,
            "total": total,
            "component": component,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        progress_file.write_text(json.dumps(snapshot, ensure_ascii=False), encoding="utf-8")
    except Exception:
        # Progress reporting is cosmetic; never let it break an update.
        pass
def _remove_progress(install_root: Path) -> None:
    """Delete update/_progress.json; a missing file is not an error."""
    progress_file = install_root / "update" / "_progress.json"
    progress_file.unlink(missing_ok=True)
# ---------------------------------------------------------------------------
# ZIP safe extraction — guards against zip-slip path traversal
# ---------------------------------------------------------------------------
def _safe_extract_zip(zip_path: Path, extract_dir: Path) -> None:
    """Extract *zip_path* into *extract_dir*, refusing path-traversal members.

    Every member name is resolved against the extraction directory first;
    nothing is extracted unless all members stay inside it.

    Raises:
        ValueError: if any member would land outside *extract_dir*.
    """
    base = extract_dir.resolve()
    base_text = str(base)
    child_prefix = base_text + os.sep

    with zipfile.ZipFile(zip_path, "r") as archive:
        for entry_name in archive.namelist():
            landing = str((base / entry_name).resolve())
            stays_inside = landing == base_text or landing.startswith(child_prefix)
            if not stays_inside:
                raise ValueError(f"Zip-slip attempt in archive: {entry_name!r}")
        archive.extractall(extract_dir)
def _is_self_executable(filepath: Path) -> bool:
    """Tell whether *filepath* is the executable running this process.

    On Windows a running .exe cannot be deleted or overwritten, so callers
    use this to skip the locked file and defer its replacement to the
    launcher. Always False in dev mode (not a frozen build), and False when
    the comparison itself fails with an OSError.
    """
    frozen = getattr(sys, "frozen", False)
    if not frozen:
        return False  # dev mode — there is no locked exe
    try:
        candidate = filepath.resolve()
        running = Path(sys.executable).resolve()
        return candidate.samefile(running)
    except OSError:
        return False
def _pre_apply_flat_zip(zip_path: Path, dest_dir: Path, label: str, log: logging.Logger) -> None:
    """Copy every file in *zip_path* flat into *dest_dir* (best-effort).

    Only the base name of each entry is used, so entries may be stored flat
    or inside a subdirectory. Directory entries and files whose name starts
    with "_" are skipped. Extraction errors are logged as warnings using
    *label* and never raised.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    try:
        with zipfile.ZipFile(str(zip_path), "r") as zf:
            for entry in zf.infolist():
                if entry.is_dir():
                    continue
                name = Path(entry.filename).name
                if not name or name.startswith("_"):
                    continue
                with zf.open(entry) as src, open(dest_dir / name, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                log.info(f"Pre-applied {label} script: {name}")
    except Exception as e:
        log.warning(f"Failed to pre-apply {label}: {e}")


def _pre_apply_launcher_and_updater(
    install_root: Path,
    staged_dir: Path,
    remote_manifest: dict,
    log: logging.Logger,
) -> None:
    """Extract launcher & updater from staged ZIPs BEFORE writing _pending_update.

    Design rationale (carried over from the original notes): an older
    launcher only knows "api_server.exe --apply-staged", while a newer one
    uses mec_updater.ps1 (file-by-file replace, no locked-exe issue).
    Extracting the new launcher scripts and the updater script now means the
    next launcher restart already runs the new code path.

    This is considered safe because the launcher scripts are not running
    while api_server stages the update, and a failed extraction is only
    logged; staging continues and the old launcher falls back to
    "api_server.exe --apply-staged".

    Args:
        install_root: Installation root; files land in its ``launcher`` and
            ``update`` subdirectories.
        staged_dir: Directory holding the staged ``launcher.zip`` /
            ``updater.zip``.
        remote_manifest: Manifest dict; a ZIP is only pre-applied when its
            component name appears under ``components``.
        log: Logger receiving one line per extracted file, or a warning.
    """
    components = remote_manifest.get("components", {})

    # 1. Launcher scripts from launcher.zip -> {install_root}/launcher
    launcher_zip = staged_dir / "launcher.zip"
    if launcher_zip.exists() and "launcher" in components:
        _pre_apply_flat_zip(launcher_zip, install_root / "launcher", "launcher", log)

    # 2. mec_updater.ps1 from updater.zip -> {install_root}/update
    updater_zip = staged_dir / "updater.zip"
    if updater_zip.exists() and "updater" in components:
        _pre_apply_flat_zip(updater_zip, install_root / "update", "updater", log)
def _replace_directory_files(
    src_dir: Path,
    dst_dir: Path,
    preserve: list[str],
    log: logging.Logger,
    install_root: Path,
) -> bool:
    """Replace *dst_dir* contents with *src_dir* **file-by-file**.

    Unlike shutil.rmtree + copytree, this strategy:
      - Preserves subdirectories listed in *preserve* (moved aside & restored).
      - Skips the currently running exe (if it lives inside dst_dir).
      - Saves a pending copy of the new exe so the launcher can swap it later.
      - Removes files in dst_dir that are NOT present in src_dir (clean update).

    Args:
        src_dir: Directory holding the new version. Preserved subdirectories
            are temporarily parked inside it as ``_preserve_<name>``.
        dst_dir: Installation directory to update; created if missing.
        preserve: Names of direct subdirectories of *dst_dir* to keep.
        log: Logger for progress and warnings.
        install_root: Installation root; its ``update`` subdirectory receives
            the deferred exe copy and the ``_exe_pending_replace`` flag file.

    Returns:
        True when replacement of the running exe was deferred, i.e. the new
        exe was saved as update/_pending_new_api_server.exe; False otherwise.
        Individual copy/remove failures are only logged and do not affect the
        return value.
    """
    update_dir = install_root / "update"
    # ── 1. Move preserved subdirs aside ──────────────────────────────────
    saved_dirs: dict[str, Path] = {}
    if preserve and dst_dir.exists():
        for sub in preserve:
            sub_path = dst_dir / sub
            if sub_path.is_dir():
                # Parked inside src_dir under a reserved "_preserve_" prefix.
                tmp = src_dir / f"_preserve_{sub}"
                shutil.move(str(sub_path), str(tmp))
                saved_dirs[sub] = tmp
                log.info(f"Preserved subdirectory for clean update: {sub_path}")
    # ── 2. Build set of relative paths in the NEW version ────────────────
    new_files: set[str] = set()
    for root, dirs, files in os.walk(src_dir):
        rel_root = Path(root).relative_to(src_dir)
        for d in dirs:
            # NOTE(review): this only skips recording the directory name;
            # ``dirs`` is not pruned, so os.walk still descends into the
            # parked "_preserve_*" trees and their contents are recorded
            # below with a "_preserve_..." relative prefix.
            if d.startswith("_preserve_"):
                continue
            new_files.add(str(rel_root / d) if str(rel_root) != "." else d)
        for f in files:
            new_files.add(str(rel_root / f) if str(rel_root) != "." else f)
    # ── 3. Remove stale files/dirs from old version ─────────────────────
    if dst_dir.exists():
        # Bottom-up walk so a directory's contents are handled before the
        # directory itself is considered for removal.
        for root, dirs, files in os.walk(dst_dir, topdown=False):
            rel_root = Path(root).relative_to(dst_dir)
            for f in files:
                rel = str(rel_root / f) if str(rel_root) != "." else f
                if rel not in new_files:
                    old_file = Path(root) / f
                    if _is_self_executable(old_file):
                        log.warning(f"Skipping removal of running exe (will be stale): {old_file}")
                        continue
                    try:
                        old_file.unlink()
                        log.debug(f"Removed stale file: {old_file}")
                    except (PermissionError, OSError) as exc:
                        log.warning(f"Could not remove stale file {old_file}: {exc}")
            for d in dirs:
                rel = str(rel_root / d) if str(rel_root) != "." else d
                empty_dir = Path(root) / d
                if rel not in new_files:
                    try:
                        # Only directories that ended up empty are removed.
                        if not any(empty_dir.iterdir()):
                            empty_dir.rmdir()
                            log.debug(f"Removed stale empty dir: {empty_dir}")
                    except (PermissionError, OSError) as exc:
                        log.debug(f"Could not remove stale dir {empty_dir}: {exc}")
    # ── 4. Ensure dst_dir exists, then copy new files ────────────────────
    dst_dir.mkdir(parents=True, exist_ok=True)
    exe_deferred = False
    for root, dirs, files in os.walk(src_dir):
        rel_root = Path(root).relative_to(src_dir)
        # Skip _preserve_ directories (in-place prune stops os.walk descending)
        dirs[:] = [d for d in dirs if not d.startswith("_preserve_")]
        for d in dirs:
            dest = dst_dir / rel_root / d
            dest.mkdir(parents=True, exist_ok=True)
        for f in files:
            src_file = Path(root) / f
            dest = dst_dir / rel_root / f
            if _is_self_executable(dest):
                # Cannot overwrite the running exe — save it for deferred replace
                pending_exe = update_dir / "_pending_new_api_server.exe"
                shutil.copy2(src_file, pending_exe)
                flag_file = update_dir / "_exe_pending_replace"
                flag_file.write_text("pending", encoding="utf-8")
                log.info(f"Deferred exe replacement: saved new exe to {pending_exe}")
                exe_deferred = True
                continue
            try:
                shutil.copy2(src_file, dest)
            except (PermissionError, OSError) as exc:
                log.warning(f"Could not copy {src_file} -> {dest}: {exc}")
                # For non-exe files, this is still an error but we continue
    # ── 5. Restore preserved subdirectories ──────────────────────────────
    # NOTE(review): there is no try/finally around steps 2-4, so an exception
    # escaping them leaves the preserved directories parked inside src_dir.
    for sub, tmp in saved_dirs.items():
        restore_path = dst_dir / sub
        shutil.move(str(tmp), str(restore_path))
        log.info(f"Restored preserved subdirectory: {restore_path}")
    return exe_deferred
# ---------------------------------------------------------------------------
# Component → installation-directory mapping
# ---------------------------------------------------------------------------
# Maps a manifest component name to where it is installed.
# Each entry has:
#   "type":   "file" or "dir"
#   "target": path relative to the installation root
#   "preserve_subdirs" (optional): subdirectory names listed for a "dir"
#       component; for api_server these are whisper and skills_pools
#       (whisper is also the target directory of its own component below).
_COMPONENT_TARGETS: dict[str, dict] = {
    "mec_agent": {"type": "file", "target": "mec_agent.exe"},
    "api_server": {"type": "dir", "target": "api_server",
    "preserve_subdirs": ["whisper", "skills_pools"]},
    "whisper": {"type": "dir", "target": os.path.join("api_server", "whisper")},
    "mecui": {"type": "dir", "target": os.path.join("web", "mecui")},
    "skills": {"type": "dir", "target": "skills"},
    "python_embed": {"type": "dir", "target": "python"},
    "llama_cpp": {"type": "dir", "target": "llama"},
    "whl": {"type": "dir", "target": "whl"},
    "model": {"type": "dir", "target": "models"},
    "launcher": {"type": "dir", "target": "launcher"},
    "config": {"type": "dir", "target": "config"},
    "proxy": {"type": "dir", "target": "proxy"},
    "downloads": {"type": "dir", "target": "downloads"},
    "ein_wiki": {"type": "dir", "target": os.path.join("bundled", "ein-wiki")},
    "workspace_defaults": {"type": "dir", "target": "workspace-defaults"},
    "updater": {"type": "file", "target": os.path.join("update", "mec_updater.ps1")},
}
# ===========================================================================
# Public API
# ===========================================================================
def check_updates(install_root: Path, manifest_url: str = "") -> dict:
    """Compare the local version.json with a remote manifest.

    Manifest selection:
      * *manifest_url* given: used as-is (rollback preview for a specific
        historical version).
      * otherwise: the latest version is looked up in index.json (derived
        from the configured manifestUrl), so a client on v0.3.1 can detect
        v0.4.0 even though its manifestUrl still points at the
        Mec_Agent_0_3_1 folder. If the index cannot be used, the configured
        manifestUrl is the fallback.

    A component is listed in ``updates`` whenever its local and remote
    version strings differ, in either direction.

    Returns a dict with keys:
        hasUpdate, isDowngrade, currentVersion, remoteVersion, releaseNotes,
        manifestUrl, updates[]

    Raises:
        RuntimeError: when manifestUrl is not configured, or the manifest
            cannot be fetched.
    """
    configured_url = read_update_config(install_root).get("manifestUrl", "")
    if not configured_url:
        raise RuntimeError("update.manifestUrl not configured in update/update_config.json")

    if manifest_url:
        # Explicit override (rollback): use as-is.
        url = manifest_url
    else:
        try:
            index = fetch_version_index(install_root)
            wanted = index.get("latest", "")
            entries = index.get("versions", [])
            # Default to the first listed entry when "latest" matches none.
            chosen = entries[0] if entries else None
            for entry in entries:
                if entry.get("version") == wanted:
                    chosen = entry
                    break
            url = chosen["manifestUrl"] if chosen else configured_url
        except Exception:
            # index.json unreachable or malformed: use the configured manifest.
            url = configured_url

    local = read_local_version(install_root)
    remote = _fetch_manifest(url, install_root)

    current_version = local.get("version", "0.0.0")
    remote_version = remote.get("version", "0.0.0")
    # Arguments are swapped on purpose: True when local is newer than remote.
    downgrade = _compare_versions(remote_version, current_version)

    installed: dict = local.get("components") or {}
    updates = []
    for name, remote_comp in remote.get("components", {}).items():
        local_entry = installed.get(name, {})
        if isinstance(local_entry, dict):
            local_version = local_entry.get("version", "0.0.0")
        else:
            local_version = str(local_entry)
        offered_version = remote_comp.get("version", "0.0.0")
        if not _versions_differ(local_version, offered_version):
            continue
        updates.append({
            "component": name,
            "localVersion": local_version,
            "remoteVersion": offered_version,
            "size": remote_comp.get("size", 0),
            "url": remote_comp.get("url", ""),
        })

    return {
        "hasUpdate": bool(updates),
        "isDowngrade": downgrade,
        "currentVersion": current_version,
        "remoteVersion": remote_version,
        "releaseNotes": remote.get("releaseNotes", ""),
        "manifestUrl": url,
        "updates": updates,
    }
def stage_updates(
    install_root: Path,
    progress_cb: Optional[Callable[[str, int, int, str], None]] = None,
    manifest_url: str = "",
    force: bool = False,
) -> dict:
    """Download and SHA-256-verify component ZIPs into update/_staged_update/.

    Any existing staging directory is deleted first. The remote manifest is
    always written to _staged_update/manifest.json; the _pending_update flag
    is only written when at least one component was downloaded and verified.

    Args:
        install_root: MEC Agent installation root.
        progress_cb:  Called as (phase, current, total, component) on each step.
        manifest_url: Override manifest URL (used for rollback to a specific version).
                      When empty, the latest manifest is discovered via index.json,
                      falling back to the configured manifestUrl.
        force:        When True, download all components that differ regardless of
                      whether the remote version is newer (enables rollback / downgrade).

    Returns: { staged, failed, failedComponents[], targetVersion }

    Raises:
        RuntimeError: when no manifest URL can be determined, or the manifest
            cannot be fetched.
    """
    log = _get_logger(install_root)
    global _proxy_skip
    cfg = read_update_config(install_root)
    base_url = cfg.get("manifestUrl", "")
    if manifest_url:
        # Explicit override (rollback / version-specific install): use as-is.
        url = manifest_url
    else:
        # Normal upgrade: discover the latest manifest via index.json so that a
        # client sitting on v0.3.1 stages v0.4.0, not v0.3.1 again.
        try:
            index = fetch_version_index(install_root)
            latest_ver = index.get("latest", "")
            versions = index.get("versions", [])
            # Falls back to the first listed entry when "latest" matches none.
            latest_entry = next(
                (v for v in versions if v.get("version") == latest_ver),
                versions[0] if versions else None,
            )
            url = latest_entry["manifestUrl"] if latest_entry else base_url
        except Exception:
            url = base_url
    if not url:
        raise RuntimeError("update.manifestUrl not configured")
    local = read_local_version(install_root)
    remote = _fetch_manifest(url, install_root)
    staged_dir = install_root / "update" / "_staged_update"
    pending_flag = install_root / "update" / "_pending_update"
    staged_manifest = staged_dir / "manifest.json"
    # Start from an empty staging directory on every run.
    # NOTE(review): a _pending_update flag left by an earlier run is not
    # removed here.
    if staged_dir.exists():
        shutil.rmtree(staged_dir)
    staged_dir.mkdir(parents=True, exist_ok=True)
    local_comps: dict = local.get("components") or {}

    def _needs_download(name: str, rc: dict) -> bool:
        # Decide per component whether its ZIP has to be downloaded.
        lc = local_comps.get(name, {})
        lv = (lc.get("version", "0.0.0") if isinstance(lc, dict) else str(lc))
        rv = rc.get("version", "0.0.0")
        if force:
            # Rollback / forced install: download anything that differs
            return _versions_differ(lv, rv)
        else:
            # Normal upgrade: only download if remote is newer
            return _compare_versions(lv, rv)

    to_download = [
        (name, rc)
        for name, rc in remote.get("components", {}).items()
        if _needs_download(name, rc)
    ]
    total = len(to_download)
    downloaded_count = 0
    failed_count = 0
    failed_components: list[str] = []
    for idx, (name, rc) in enumerate(to_download, 1):
        url_dl = rc.get("url", "")
        if not url_dl:
            # Skipped components are counted neither as staged nor as failed.
            log.warning(f"No download URL for component '{name}' — skipping")
            continue
        zip_file = staged_dir / f"{name}.zip"
        log.info(f"Downloading {name} v{rc.get('version', '?')} from {url_dl} ...")
        _write_progress(install_root, "downloading", idx, total, name)
        if progress_cb:
            progress_cb("downloading", idx, total, name)
        # Adaptive timeout: min 300 s, +300 s per GB, max 7200 s (2 h)
        # NOTE(review): int() runs outside the try below, so a non-numeric
        # "size" in the manifest raises out of stage_updates.
        declared_bytes = int(rc.get("size", 0) or 0)
        timeout = max(300, min(7200, 300 + ((declared_bytes + 2**30 - 1) // 2**30) * 300))
        log.info(f"  Timeout: {timeout}s (declared {declared_bytes / 2**20:.1f} MB)")
        try:
            last_dl_err: Exception = RuntimeError("no opener")
            success = False
            # Re-evaluate opener list each iteration: proxy may have been marked bad
            # by a previous download in this same loop.
            cur_openers = (
                [_make_direct_opener(install_root)]
                if _proxy_skip or not _read_proxy_url(install_root)
                else [_make_opener(install_root), _make_direct_opener(install_root)]
            )
            for dl_attempt, dl_opener in enumerate(cur_openers):
                if dl_attempt > 0:
                    # Second opener reached: the proxy attempt raised.
                    _proxy_skip = True
                    log.warning(f"Proxy download failed for {name}, retrying without proxy...")
                fresh_req = urllib.request.Request(url_dl, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
                try:
                    with dl_opener.open(fresh_req, timeout=timeout) as resp:
                        with open(zip_file, "wb") as f:
                            # Stream to disk in 64 KiB chunks.
                            while True:
                                chunk = resp.read(65536)
                                if not chunk:
                                    break
                                f.write(chunk)
                    success = True
                    break
                except Exception as e:
                    last_dl_err = e
                    # Drop the partial file before the next attempt.
                    zip_file.unlink(missing_ok=True)
            if not success:
                raise last_dl_err
            # SHA-256 verification — this logic lives inside the compiled exe
            expected_hash = rc.get("sha256", "")
            if not verify_sha256(zip_file, expected_hash):
                log.error(f"SHA-256 mismatch for {name} — expected {expected_hash}")
                zip_file.unlink(missing_ok=True)
                failed_components.append(name)
                failed_count += 1
                continue
            downloaded_count += 1
            log.info(f"Downloaded {name} OK ({zip_file.stat().st_size / 2**20:.1f} MB)")
        except Exception as e:
            # One failed component does not abort the remaining downloads.
            log.error(f"Failed to download {name}: {e}")
            zip_file.unlink(missing_ok=True)
            failed_components.append(name)
            failed_count += 1
    # Persist the remote manifest so apply_staged can use it
    staged_manifest.write_text(json.dumps(remote, ensure_ascii=False, indent=2), encoding="utf-8")
    target_version = remote.get("version", "")
    if downloaded_count > 0:
        pending_flag.parent.mkdir(parents=True, exist_ok=True)
        # ── Pre-apply launcher & updater before writing _pending_update ────
        # CRITICAL: When upgrading from v0.3.11 (or older), the OLD launcher
        # only knows how to call "api_server.exe --apply-staged", which uses
        # shutil.rmtree and hits WinError 32 on the running exe.  The NEW
        # launcher uses mec_updater.ps1 instead.  By extracting the new
        # launcher scripts NOW (before _pending_update is written), the next
        # launcher restart will already run the NEW code path.
        #
        # Similarly, pre-apply mec_updater.ps1 so the new launcher finds it.
        try:
            _pre_apply_launcher_and_updater(install_root, staged_dir, remote, log)
        except Exception as e:
            log.warning(f"Pre-apply launcher/updater failed (non-fatal): {e}")
        # The flag is written even when some components failed, as long as
        # at least one was staged.
        pending_flag.write_text("staged", encoding="utf-8")
        log.info(f"Staging complete: {downloaded_count} downloaded, {failed_count} failed. Target: v{target_version}")
        _write_progress(install_root, "staged", downloaded_count, total, "")
    else:
        log.info("No components staged (all up-to-date or all downloads failed).")
        _remove_progress(install_root)
    return {
        "staged": downloaded_count,
        "failed": failed_count,
        "failedComponents": failed_components,
        "targetVersion": target_version,
    }
def apply_staged(install_root: Path) -> dict:
    """Apply the ZIPs staged in update/_staged_update/ to the installation.

    Both the ``update/_pending_update`` flag and the staged ``manifest.json``
    must exist; otherwise nothing is touched and a ``reason`` is returned.

    Every manifest component that has a staged ``<name>.zip`` is extracted to
    a scratch directory and installed according to its ``_COMPONENT_TARGETS``
    entry: ``"file"`` components copy a single file, all other components go
    through ``_replace_directory_files``. A failure in one component is logged
    and counted; the remaining components are still processed.

    When at least one component was installed, the local version.json is
    updated and CHANGELOG.md is fetched from the update server (best effort).
    Components whose installation raised are NOT stamped with the new version
    in version.json, so the file does not claim an update that never landed.

    After processing, the pending flag, the staging directory and the
    progress file are removed.

    NOTE(review): no backup/rollback step is visible in this function; if one
    exists it presumably lives in ``_replace_directory_files`` — confirm.

    Args:
        install_root: installation root path

    Returns: { applied, components, errors, newVersion } (plus ``reason`` when
    there was nothing to apply)
    """
    log = _get_logger(install_root)
    update_dir = install_root / "update"
    staged_dir = update_dir / "_staged_update"
    staged_manifest = staged_dir / "manifest.json"
    pending_flag = update_dir / "_pending_update"

    if not pending_flag.exists():
        log.info("No pending update found.")
        return {"applied": False, "components": 0, "errors": 0, "newVersion": "", "reason": "no_pending_update"}
    if not staged_manifest.exists():
        log.error("Staged manifest not found.")
        return {"applied": False, "components": 0, "errors": 0, "newVersion": "", "reason": "missing_manifest"}

    remote = json.loads(staged_manifest.read_text(encoding="utf-8-sig"))
    remote_components = remote.get("components", {})

    # Only components that were actually downloaded during staging.
    apply_components = [
        (name, rc)
        for name, rc in remote_components.items()
        if (staged_dir / f"{name}.zip").exists()
    ]
    total = len(apply_components)
    applied_count = 0
    error_count = 0
    # Names of components whose installation raised; they must not be
    # recorded as updated in version.json.
    failed_names = set()

    for idx, (name, _rc) in enumerate(apply_components, 1):
        zip_file = staged_dir / f"{name}.zip"
        _write_progress(install_root, "applying", idx, total, name)

        mapping = _COMPONENT_TARGETS.get(name)
        if not mapping:
            log.warning(f"Unknown component '{name}' — skipping (updater component is deprecated)")
            continue

        target_path = install_root / mapping["target"]
        try:
            # ── Extract ──────────────────────────────────────────────────
            extract_dir = staged_dir / f"_extract_{name}"
            if extract_dir.exists():
                shutil.rmtree(extract_dir)
            extract_dir.mkdir(parents=True, exist_ok=True)
            _safe_extract_zip(zip_file, extract_dir)

            # ── Install ──────────────────────────────────────────────────
            if mapping["type"] == "file":
                # Single-file component: locate the file anywhere inside the
                # extracted archive and copy it over the target.
                fname = Path(mapping["target"]).name
                src_file = next(extract_dir.rglob(fname), None)
                if src_file:
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src_file, target_path)
                    log.info(f"Updated file: {target_path}")
                else:
                    log.warning(f"Expected file '{fname}' not found in {name} archive")
            else:
                # ── Directory component — use file-by-file replacement ────
                # shutil.rmtree + copytree fails when the running
                # api_server.exe lives inside the target directory because
                # Windows locks the file (WinError 32 / OSError(32)).
                # _replace_directory_files() copies files one by one; its
                # return value tells us whether the exe swap was deferred.
                preserve = mapping.get("preserve_subdirs", [])
                exe_deferred = _replace_directory_files(
                    extract_dir, target_path, preserve, log, install_root,
                )
                if exe_deferred:
                    log.info(f"Updated directory (exe deferred): {target_path}")
                else:
                    log.info(f"Updated directory (clean): {target_path}")

            shutil.rmtree(extract_dir, ignore_errors=True)
            applied_count += 1
        except Exception as e:
            log.error(f"Failed to apply {name}: {e}")
            error_count += 1
            failed_names.add(name)

    # ── Update local version.json ─────────────────────────────────────────
    if applied_count > 0:
        try:
            new_local = read_local_version(install_root)
            new_local["version"] = remote.get("version", new_local.get("version", "0.0.0"))
            new_local["buildDate"] = remote.get("buildDate", "")
            comps = new_local.setdefault("components", {})
            for name, _rc in apply_components:
                if name in failed_names:
                    # Installation raised: keep the previously recorded
                    # version so version.json stays truthful.
                    continue
                if (staged_dir / f"{name}.zip").exists():
                    comps.setdefault(name, {})["version"] = remote_components.get(name, {}).get("version", "")
            _write_local_version(install_root, new_local)
            log.info(f"Updated local version.json to v{new_local['version']}")
        except Exception as e:
            log.warning(f"Could not update local version.json: {e}")

    # ── Download remote CHANGELOG.md ──────────────────────────────────────
    # After a successful update, fetch the latest CHANGELOG.md from the
    # update server so the /api/changelog endpoint can serve it. The URL is
    # derived from any component's download URL (same versioned directory).
    if applied_count > 0:
        try:
            _changelog_url = None
            for _rc in remote_components.values():
                _comp_url = _rc.get("url", "")
                if _comp_url:
                    # e.g. .../updates/v0.3.8/api_server-1.0.4.zip → .../updates/v0.3.8/CHANGELOG.md
                    _changelog_url = _comp_url.rsplit("/", 1)[0] + "/CHANGELOG.md"
                    break

            if _changelog_url:
                log.info(f"Downloading CHANGELOG.md from {_changelog_url}")
                _cl_req = urllib.request.Request(_changelog_url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
                # Try the proxy opener first (when configured and not already
                # marked as skipped), then fall back to a direct connection.
                _cl_openers = (
                    [_make_direct_opener(install_root)]
                    if _proxy_skip or not _read_proxy_url(install_root)
                    else [_make_opener(install_root), _make_direct_opener(install_root)]
                )
                _cl_ok = False
                for _cl_opener in _cl_openers:
                    try:
                        with _cl_opener.open(_cl_req, timeout=30) as _cl_resp:
                            _cl_data = _cl_resp.read()
                        _cl_dest = install_root / "update" / "CHANGELOG.md"
                        _cl_dest.parent.mkdir(parents=True, exist_ok=True)
                        _cl_dest.write_bytes(_cl_data)
                        log.info(f"Updated CHANGELOG.md ({len(_cl_data)} bytes)")
                        _cl_ok = True
                        break
                    except Exception as _cl_e:
                        log.debug(f"CHANGELOG.md download attempt failed: {_cl_e}")
                        continue
                if not _cl_ok:
                    log.warning("Could not download CHANGELOG.md from update server (non-fatal)")
            else:
                log.debug("No component URL found — skipping CHANGELOG.md download")
        except Exception as e:
            log.warning(f"Failed to update CHANGELOG.md: {e} (non-fatal)")

    # ── Cleanup ───────────────────────────────────────────────────────────
    pending_flag.unlink(missing_ok=True)
    shutil.rmtree(staged_dir, ignore_errors=True)
    _remove_progress(install_root)

    return {
        "applied": applied_count > 0,
        "components": applied_count,
        "errors": error_count,
        "newVersion": remote.get("version", ""),
    }
# ---------------------------------------------------------------------------
# Model management
# ---------------------------------------------------------------------------
def read_model_config(install_root: Path) -> dict:
    """Read local model tracking config (which model is currently installed).

    The config lives at ``<install_root>/update/model_config.json`` and is
    decoded as UTF-8 (a leading BOM is tolerated).

    Args:
        install_root: installation root path

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, cannot be
        read or decoded, or does not contain a JSON object. Callers use
        ``.get()`` on the result, so a non-dict payload is treated as empty.
    """
    mf = install_root / "update" / "model_config.json"
    try:
        data = json.loads(mf.read_text(encoding="utf-8-sig"))
    except (OSError, ValueError):
        # Missing/unreadable file (OSError) or bad encoding / invalid JSON
        # (UnicodeDecodeError and JSONDecodeError are both ValueError).
        return {}
    return data if isinstance(data, dict) else {}
def _write_model_config(install_root: Path, data: dict) -> None:
    """Persist the model tracking config to update/model_config.json.

    The ``update`` directory is created when missing. ``data`` is serialized
    as indented JSON with non-ASCII characters kept as-is, and written as
    UTF-8.
    """
    config_dir = install_root / "update"
    config_dir.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    (config_dir / "model_config.json").write_text(payload, encoding="utf-8")
def fetch_model_index(install_root: Path) -> dict:
    """Fetch models/index.json from the update server.

    The index URL is taken from the ``modelsUrl`` key of the update config.

    Raises:
        RuntimeError: when ``modelsUrl`` is missing or empty.
    """
    models_url = read_update_config(install_root).get("modelsUrl", "")
    if models_url:
        return _fetch_json(models_url, install_root)
    raise RuntimeError("modelsUrl not configured in update/update_config.json")
def check_model_update(install_root: Path) -> dict:
    """Check whether the active server model differs from the locally installed model.

    An update is reported when the locally recorded filename is not the
    server's active filename, or when the recorded file is absent from the
    ``models`` directory.

    Returns:
        hasUpdate       bool
        activeFilename  str  — filename declared as active on the server
        activeModel     dict — full model entry from server index
        localFilename   str  — currently installed model filename (empty if none)
        localExists     bool — whether the local file actually exists on disk

    Raises:
        RuntimeError: when the index has no ``active`` field, or the active
            filename is not present in the index's ``models`` list.
    """
    server_index = fetch_model_index(install_root)

    active_filename = server_index.get("active", "")
    if not active_filename:
        raise RuntimeError("models/index.json has no 'active' field")

    # Index the server's model entries by filename (later duplicates win).
    entries_by_name = {}
    for entry in server_index.get("models", []):
        entries_by_name[entry["filename"]] = entry

    active_model = entries_by_name.get(active_filename)
    if not active_model:
        raise RuntimeError(f"Active model '{active_filename}' not found in models list")

    local_filename = read_model_config(install_root).get("filename", "")
    if local_filename:
        local_exists = (install_root / "models" / local_filename).exists()
    else:
        local_exists = False

    up_to_date = local_filename == active_filename and local_exists
    return {
        "hasUpdate": not up_to_date,
        "activeFilename": active_filename,
        "activeModel": active_model,
        "localFilename": local_filename,
        "localExists": local_exists,
    }
def download_model(
    install_root: Path,
    progress_cb: Optional[Callable[[str, int, int, str], None]] = None,
) -> dict:
    """Download the active model from the server, replacing any previous model.

    The file is streamed to a temporary file inside ``<install_root>/models``,
    verified against the index's SHA-256 (skipped when the hash is empty or
    contains the ``...`` placeholder), extracted when the URL ends with
    ``.zip``, and then moved into place. The previously recorded model file is
    deleted only after the new one is in place, so a failure while moving
    never leaves the installation without a model.

    When a proxy is configured, the proxy opener is tried first and a direct
    connection second; falling back sets the module-level ``_proxy_skip``.

    Args:
        install_root: installation root path
        progress_cb:  optional callback(phase, current, total, component)

    Returns: { ok: bool, filename: str }

    Raises:
        RuntimeError: no URL for the active model, every download attempt
            failed, SHA-256 mismatch, or the model file is missing from the
            downloaded ZIP. Errors from ``check_model_update`` propagate.
    """
    log = _get_logger(install_root)
    global _proxy_skip

    status = check_model_update(install_root)
    active_model = status["activeModel"]
    active_filename = status["activeFilename"]

    models_dir = install_root / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    url = active_model.get("url", "")
    if not url:
        raise RuntimeError(f"No URL for model '{active_filename}'")

    expected_sha256 = active_model.get("sha256", "")
    # `or 0` tolerates an explicit `"size": null` in the index.
    expected_size = int(active_model.get("size") or 0)

    # Support ZIP-packaged models (same scheme as other components).
    # When the URL ends with .zip, we download the ZIP, verify its SHA-256,
    # then extract the .gguf from it.
    download_as_zip = url.lower().endswith(".zip")
    dest_file = models_dir / active_filename
    tmp_dl = models_dir / ((active_filename + ".zip.tmp") if download_as_zip else (active_filename + ".tmp"))

    # Large timeout for GGUF files (can be several GB): 300 s base plus 300 s
    # per started GiB, clamped to [600, 7200]; 3600 s when size is unknown.
    timeout = (
        max(600, min(7200, 300 + ((expected_size + 2**30 - 1) // 2**30) * 300))
        if expected_size else 3600
    )
    log.info(f"Downloading model {active_filename} from {url} (timeout: {timeout}s)")
    _write_progress(install_root, "model_download", 0, expected_size, active_filename)
    if progress_cb:
        progress_cb("model_download", 0, expected_size, active_filename)

    cur_openers = (
        [_make_direct_opener(install_root)]
        if _proxy_skip or not _read_proxy_url(install_root)
        else [_make_opener(install_root), _make_direct_opener(install_root)]
    )

    last_err: Exception = RuntimeError("no opener")
    success = False
    for attempt, opener in enumerate(cur_openers):
        if attempt > 0:
            # The first (proxy) attempt failed; remember to skip the proxy.
            _proxy_skip = True
            log.warning("Proxy download failed for model, retrying without proxy...")
        req = urllib.request.Request(url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
        try:
            bytes_downloaded = 0
            # Progress is reported every ~5 MB (approximately) to avoid flooding the file.
            progress_interval = 5 * 2**20
            next_progress_at = progress_interval
            with opener.open(req, timeout=timeout) as resp:
                with open(tmp_dl, "wb") as f:
                    while True:
                        chunk = resp.read(65536)
                        if not chunk:
                            break
                        f.write(chunk)
                        bytes_downloaded += len(chunk)
                        if bytes_downloaded >= next_progress_at:
                            _write_progress(install_root, "model_download",
                                            bytes_downloaded, expected_size, active_filename)
                            if progress_cb:
                                progress_cb("model_download", bytes_downloaded,
                                            expected_size, active_filename)
                            next_progress_at = bytes_downloaded + progress_interval
            success = True
            break
        except Exception as e:
            last_err = e
            tmp_dl.unlink(missing_ok=True)

    if not success:
        # Chain the underlying error so the traceback shows the real cause.
        raise RuntimeError(f"Failed to download model: {last_err}") from last_err

    # Verify SHA-256 of the downloaded file (ZIP or raw gguf)
    if expected_sha256 and "..." not in expected_sha256:
        if not verify_sha256(tmp_dl, expected_sha256):
            tmp_dl.unlink(missing_ok=True)
            raise RuntimeError(f"SHA256 mismatch for model {active_filename}")

    # Extract .gguf from ZIP if required
    if download_as_zip:
        log.info(f"Extracting {active_filename} from ZIP...")
        try:
            _safe_extract_zip(tmp_dl, models_dir)
            extracted = models_dir / active_filename
            if not extracted.exists():
                # Try to find it inside a subdirectory
                found = next(models_dir.rglob(active_filename), None)
                if found:
                    found.rename(extracted)
                else:
                    raise RuntimeError(f"{active_filename} not found inside downloaded ZIP")
            tmp_dl.unlink(missing_ok=True)
        except Exception:
            tmp_dl.unlink(missing_ok=True)
            raise
        tmp_file = extracted
    else:
        tmp_file = tmp_dl

    # Move tmp to final destination (skip if ZIP extraction already placed the
    # file correctly). Path.replace overwrites an existing destination in one
    # step, so there is no window in which neither file exists.
    if tmp_file != dest_file:
        tmp_file.replace(dest_file)

    # Remove the old model file only now that the new one is in place.
    old_filename = read_model_config(install_root).get("filename", "")
    if old_filename and old_filename != active_filename:
        old_file = models_dir / old_filename
        try:
            # samefile guards against names that differ only by case on a
            # case-insensitive filesystem and therefore denote the new model.
            if old_file.exists() and not old_file.samefile(dest_file):
                old_file.unlink()
                log.info(f"Removed old model: {old_filename}")
        except OSError as e:
            log.warning(f"Could not remove old model {old_filename}: {e}")

    _write_model_config(install_root, {
        "filename": active_filename,
        "downloadDate": datetime.now().strftime("%Y-%m-%d"),
    })
    if progress_cb:
        progress_cb("downloaded", 1, 1, active_filename)
    log.info(f"Model {active_filename} downloaded successfully")
    return {"ok": True, "filename": active_filename}
# (Web-page residue, not part of the module: "Comments... / No Comments Yet...")