# update_manager.py
#
# 2026-06-29




"""

update_manager.py — MEC Agent update engine (Python replacement for mec_updater.ps1).

 

All update logic runs inside api_server.exe so that SHA-256 verification details

are hidden from end-users. The old mec_updater.ps1 is no longer needed.

 

Public API

----------

  check_updates(install_root)                 -> dict

  stage_updates(install_root, progress_cb)    -> dict

  apply_staged(install_root)                  -> dict

  get_install_root()                          -> Path

  read_local_version(install_root)            -> dict

  read_update_config(install_root)            -> dict

  verify_sha256(file_path, expected)          -> bool

  fetch_version_index(install_root)           -> dict

"""

 

from __future__ import annotations

 

import hashlib

import json

import logging

import os

import re

import shutil

import ssl

import sys

import urllib.request

import zipfile

from datetime import datetime

from pathlib import Path

from typing import Callable, Optional


 

def _read_windows_user_env_var(name: str) -> Optional[str]:
    """Look up a per-user environment variable in ``HKCU\\Environment``.

    Used as a fallback when the running process did not inherit a variable
    that is nevertheless configured in the user's profile.

    Args:
        name: Variable name; compared case-insensitively against the
            registry value names.

    Returns:
        The stripped string value, or None when not running on Windows, the
        variable is absent or blank, or the registry cannot be read.
    """
    if os.name != "nt":
        return None

    try:
        import winreg

        wanted = name.lower()
        # The key handle is a context manager; leaving the block closes it.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Environment") as env_key:
            position = 0
            while True:
                # EnumValue raises OSError once position runs past the last
                # value, which ends the scan through the handler below.
                entry_name, entry_data, _entry_type = winreg.EnumValue(env_key, position)
                position += 1
                if entry_name.lower() != wanted:
                    continue
                if not isinstance(entry_data, str):
                    continue
                return entry_data.strip() or None
    except Exception:
        # Best-effort lookup: any failure (including "no more values")
        # simply means "not found".
        return None


 

# ---------------------------------------------------------------------------

# Install-root resolution

# ---------------------------------------------------------------------------

 

def get_install_root() -> Path:
    """Locate the MEC Agent installation root.

    The root is two levels above an anchor file:

    * frozen build (``sys.frozen`` set): the running executable, which lives
      in ``{install}/api_server/``;
    * dev mode: this source file, which lives in
      ``agent_packaging/api_server/``.
    """
    running_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if running_frozen else __file__
    return Path(anchor).parent.parent


 

# ---------------------------------------------------------------------------

# Logging — writes to logs/updater.log and stdout

# ---------------------------------------------------------------------------

 

def _get_logger(install_root: Path) -> logging.Logger:
    """Return the shared ``mec_updater`` logger, configuring it on first use.

    On the first call (no handlers attached yet) the logger gets a UTF-8 file
    handler writing to ``{install_root}/logs/updater.log`` and a stdout
    handler, both using the same timestamped format, and its level is set to
    INFO. Later calls return the already-configured logger unchanged, so
    *install_root* only matters the first time.
    """
    updater_log = logging.getLogger("mec_updater")
    if updater_log.handlers:
        return updater_log

    logs_path = install_root / "logs"
    logs_path.mkdir(parents=True, exist_ok=True)
    line_format = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

    sinks = (
        logging.FileHandler(str(logs_path / "updater.log"), encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    )
    for sink in sinks:
        sink.setFormatter(line_format)
    for sink in sinks:
        updater_log.addHandler(sink)

    updater_log.setLevel(logging.INFO)
    return updater_log


 

# ---------------------------------------------------------------------------

# Proxy skip flag — once a proxy attempt fails in this process session, skip

# proxy for all subsequent requests to avoid repeated "Proxy fetch failed" noise.

# ---------------------------------------------------------------------------

# Process-wide state: _fetch_json flips this to True the first time it has to
# retry a request without the proxy, and from then on uses only the direct opener.
_proxy_skip: bool = False


 

# ---------------------------------------------------------------------------

# Proxy helpers — mirror the logic in the old pega_proxy.cmd reader

# ---------------------------------------------------------------------------

 

def _read_proxy_url(install_root: Path) -> Optional[str]:
    """Resolve the HTTP(S) proxy URL to use for update traffic.

    Sources are consulted in this order, first hit wins:

    1. process environment: ``HTTP_PROXY``, ``http_proxy``, ``HTTPS_PROXY``,
       ``https_proxy`` (value is stripped);
    2. the Windows per-user environment (``HTTP_PROXY`` then ``HTTPS_PROXY``);
    3. the first ``set HTTP_PROXY=...`` / ``set HTTPS_PROXY=...`` line found in
       ``{install_root}/proxy/pega_proxy.cmd``.

    Returns None when no source yields a value.
    """
    for env_name in ("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy"):
        from_env = os.environ.get(env_name)
        if from_env:
            return from_env.strip()

    from_profile = (_read_windows_user_env_var("HTTP_PROXY")
                    or _read_windows_user_env_var("HTTPS_PROXY"))
    if from_profile:
        return from_profile

    cmd_file = install_root / "proxy" / "pega_proxy.cmd"
    if not cmd_file.exists():
        return None

    try:
        script_text = cmd_file.read_text(encoding="utf-8", errors="replace")
        for row in script_text.splitlines():
            hit = re.search(r'(?i)set\s+"?(?:HTTP|HTTPS)_PROXY=([^"\r\n]+)"?', row)
            if hit:
                return hit.group(1).strip()
    except Exception:
        # An unreadable proxy script is treated the same as "no proxy set".
        pass
    return None


 

def _read_no_proxy(install_root: Path) -> Optional[str]:
    """Resolve the NO_PROXY (proxy bypass) list.

    Sources are consulted in this order, first hit wins:

    1. process environment: ``NO_PROXY`` then ``no_proxy`` (value is stripped);
    2. the Windows per-user environment;
    3. the first ``set NO_PROXY=...`` line found in
       ``{install_root}/proxy/pega_proxy.cmd``.

    Returns None when no source yields a value.
    """
    for env_name in ("NO_PROXY", "no_proxy"):
        from_env = os.environ.get(env_name)
        if from_env:
            return from_env.strip()

    from_profile = (_read_windows_user_env_var("NO_PROXY")
                    or _read_windows_user_env_var("no_proxy"))
    if from_profile:
        return from_profile

    cmd_file = install_root / "proxy" / "pega_proxy.cmd"
    if not cmd_file.exists():
        return None

    try:
        script_text = cmd_file.read_text(encoding="utf-8", errors="replace")
        for row in script_text.splitlines():
            hit = re.search(r'(?i)set\s+"?NO_PROXY=([^"\r\n]+)"?', row)
            if hit:
                return hit.group(1).strip()
    except Exception:
        # An unreadable proxy script is treated the same as "no bypass list".
        pass
    return None


 

def _get_ca_bundle(install_root: Path) -> Optional[str]:
    """Return the path of ``proxy/PEGA-CA-02.pem`` as a string, or None if the file is absent."""
    bundle_path = install_root / "proxy" / "PEGA-CA-02.pem"
    if bundle_path.exists():
        return str(bundle_path)
    return None


 

def _make_opener(install_root: Path) -> urllib.request.OpenerDirector:
    """Build a urllib opener with the configured proxy and optional custom CA.

    The HTTPS handler uses the default trust store plus
    ``proxy/PEGA-CA-02.pem`` when that file exists. When a proxy URL is
    configured it is used for both http and https requests, except for hosts
    matched by the NO_PROXY value returned by ``_read_no_proxy``.

    Args:
        install_root: MEC Agent installation root.

    Returns:
        An ``OpenerDirector`` ready for ``.open(request, timeout=...)``.
    """
    proxy_url = _read_proxy_url(install_root)
    ca_bundle = _get_ca_bundle(install_root)

    ssl_ctx = ssl.create_default_context()
    if ca_bundle:
        ssl_ctx.load_verify_locations(ca_bundle)

    handlers: list = [urllib.request.HTTPSHandler(context=ssl_ctx)]
    if proxy_url:
        proxies: dict = {"http": proxy_url, "https": proxy_url}
        no_proxy = _read_no_proxy(install_root)
        # Looked up defensively: the helper is not part of the documented API.
        bypass_check = getattr(urllib.request, "proxy_bypass_environment", None)

        class _NoProxyAwareHandler(urllib.request.ProxyHandler):
            """ProxyHandler that also honours the NO_PROXY value we resolved.

            The stock handler decides bypass from the process environment /
            system settings only; a ``"no"`` entry in the proxies dict is
            never consulted, so a bypass list coming from the registry or
            pega_proxy.cmd would otherwise have no effect.
            """

            def proxy_open(self, req, proxy, scheme):
                if (no_proxy and bypass_check is not None and req.host
                        and bypass_check(req.host, {"no": no_proxy})):
                    # Returning None lets the request continue un-proxied.
                    return None
                return super().proxy_open(req, proxy, scheme)

        handlers.insert(0, _NoProxyAwareHandler(proxies))

    return urllib.request.build_opener(*handlers)


 

def _make_direct_opener(install_root: Path) -> urllib.request.OpenerDirector:
    """Build a urllib opener that never uses a proxy.

    An empty mapping is handed to ``ProxyHandler`` so that proxy
    auto-detection is disabled. TLS verification uses the default trust
    store plus the custom CA bundle when one is present.
    """
    custom_ca = _get_ca_bundle(install_root)
    tls_context = ssl.create_default_context()
    if custom_ca:
        tls_context.load_verify_locations(custom_ca)

    no_proxy_handler = urllib.request.ProxyHandler({})
    https_handler = urllib.request.HTTPSHandler(context=tls_context)
    return urllib.request.build_opener(no_proxy_handler, https_handler)


 

# ---------------------------------------------------------------------------

# SHA-256 verification  ← logic is compiled into the exe, not visible to users

# ---------------------------------------------------------------------------

 

def verify_sha256(file_path: Path, expected: str) -> bool:
    """Check *file_path* against an *expected* SHA-256 hex digest.

    The comparison is case-insensitive. When *expected* is empty or is a
    placeholder (contains ``"..."``) the file is not read at all and the check
    passes, which keeps manifests that omit hashes for large files working.

    Returns:
        True on match (or when the check is skipped), False otherwise.
    """
    if not expected:
        return True
    if "..." in expected:
        return True

    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # Hash in 64 KiB pieces so large archives never sit fully in memory.
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)

    return expected.lower() == digest.hexdigest().lower()


 

# ---------------------------------------------------------------------------

# Version comparison

# ---------------------------------------------------------------------------

 

def _compare_versions(local: str, remote: str) -> bool:
    """Return True if *remote* is strictly newer than *local* (upgrade check).

    Rules, applied in order:

    1. Both values are build numbers (``b<digits>``): compare numerically.
    2. Only *local* is a build number: legacy migration to the unified semver
       scheme, always treated as needing an upgrade.
    3. Both parse as dotted integers: compare numerically, padding the shorter
       one with zeros so that ``1.0`` and ``1.0.0`` are considered equal.
    4. Anything else (e.g. pre-release suffixes): fall back to "is different".
    """
    # Build-number style is kept for backward compatibility with very old
    # version.json files that tracked llama separately.
    bm = re.match(r"^b(\d+)$", remote)
    bl = re.match(r"^b(\d+)$", local)
    if bm and bl:
        return int(bm.group(1)) > int(bl.group(1))
    if bl and not bm:
        return True

    try:
        rv = [int(x) for x in remote.split(".")]
        lv = [int(x) for x in local.split(".")]
    except ValueError:
        return remote != local

    # Without padding, (1, 0, 0) > (1, 0) would report a spurious upgrade.
    width = max(len(rv), len(lv))
    rv.extend([0] * (width - len(rv)))
    lv.extend([0] * (width - len(lv)))
    return rv > lv


 

def _versions_differ(local: str, remote: str) -> bool:
    """Report whether two version strings differ, ignoring surrounding whitespace.

    Unlike ``_compare_versions`` this has no notion of ordering, so it also
    flags downgrades (forced install / rollback).
    """
    trimmed_local = local.strip()
    trimmed_remote = remote.strip()
    return trimmed_local != trimmed_remote


 

# ---------------------------------------------------------------------------

# Local version.json

# ---------------------------------------------------------------------------

 

def read_local_version(install_root: Path) -> dict:
    """Load ``{install_root}/update/version.json``.

    The file is decoded as UTF-8 with an optional BOM. When it is missing,
    unreadable or not valid JSON, a baseline record
    ``{"version": "0.0.0", "components": {}}`` is returned instead.
    """
    baseline = {"version": "0.0.0", "components": {}}
    version_file = install_root / "update" / "version.json"
    if not version_file.exists():
        return baseline
    try:
        text = version_file.read_text(encoding="utf-8-sig")
        return json.loads(text)
    except Exception:
        return baseline


 

def _write_local_version(install_root: Path, data: dict) -> None:
    """Persist *data* as ``{install_root}/update/version.json`` (UTF-8, indented).

    The top-level ``releaseNotes`` entry is left out of the written file: it
    may contain CJK characters that break PowerShell 5.1's
    ``Get-Content -Raw | ConvertFrom-Json`` (ANSI codepage), and the launcher
    never needs it; only the web UI does, and that reads the remote manifest
    directly. *data* itself is not modified.
    """
    version_file = install_root / "update" / "version.json"
    version_file.parent.mkdir(parents=True, exist_ok=True)

    payload = dict(data)
    payload.pop("releaseNotes", None)

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    version_file.write_text(serialized, encoding="utf-8")


 

# ---------------------------------------------------------------------------

# Update config

# ---------------------------------------------------------------------------

 

def read_update_config(install_root: Path) -> dict:
    """Load ``{install_root}/update/update_config.json``.

    The file is decoded as UTF-8 with an optional BOM. An empty dict is
    returned when the file is missing, unreadable or not valid JSON.
    """
    config_file = install_root / "update" / "update_config.json"
    if not config_file.exists():
        return {}
    try:
        text = config_file.read_text(encoding="utf-8-sig")
        return json.loads(text)
    except Exception:
        return {}


 

# ---------------------------------------------------------------------------

# Remote manifest / index fetch

# ---------------------------------------------------------------------------

 

def _fetch_json(url: str, install_root: Path, timeout: int = 30) -> dict:
    """Download *url* and parse the body as JSON.

    When a proxy is configured and has not already failed in this process,
    the proxied opener is tried first and the direct opener second;
    otherwise only the direct opener is used. Falling through to the second
    opener sets the module-level ``_proxy_skip`` flag so later calls go
    direct straight away.

    Args:
        url: Address to fetch.
        install_root: MEC Agent installation root (proxy/CA/log lookup).
        timeout: Per-attempt timeout in seconds.

    Raises:
        RuntimeError: every attempt failed, or the body is not valid JSON.
    """
    global _proxy_skip
    log = _get_logger(install_root)
    log.info(f"Fetching {url}")

    # Short-circuit keeps the proxy lookup from running once the flag is set.
    proxy_first = not _proxy_skip and bool(_read_proxy_url(install_root))
    candidates = []
    if proxy_first:
        candidates.append(_make_opener(install_root))
    candidates.append(_make_direct_opener(install_root))

    failure: Exception = RuntimeError("no opener")
    body = ""
    for position, candidate in enumerate(candidates):
        if position > 0:
            # Reaching a second opener means the proxied attempt failed.
            _proxy_skip = True
            log.warning("Proxy fetch failed, retrying without proxy...")
        request = urllib.request.Request(url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
        try:
            with candidate.open(request, timeout=timeout) as response:
                body = response.read().decode("utf-8-sig")  # strip BOM
        except Exception as exc:
            failure = exc
            continue
        break
    else:
        raise RuntimeError(f"Failed to fetch {url}: {failure}") from failure

    try:
        return json.loads(body)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Invalid JSON from {url}: {e}") from e


 

def _fetch_manifest(url: str, install_root: Path) -> dict:
    """Download and parse the manifest at *url* via ``_fetch_json`` (default timeout)."""
    manifest = _fetch_json(url, install_root)
    return manifest


 

def fetch_version_index(install_root: Path) -> dict:
    """Fetch the version-history index (``index.json``) from the update server.

    The index URL is derived from the configured ``manifestUrl``: trailing
    path segments that are a ``*.json`` filename or a version directory
    (``Mec_Agent_<n>_<n>...`` or legacy ``v<n>.<n>...``) are removed and
    ``index.json`` is appended, e.g.

        http://server/Mec_Agent_0_3_1/manifest.json -> http://server/index.json
        http://server/v0.3.1/manifest.json          -> http://server/index.json

    Returns:
        The parsed index, expected to look like
        ``{ latest, versions: [{version, buildDate, releaseNotes, manifestUrl}] }``.

    Raises:
        RuntimeError: ``manifestUrl`` is not configured, or the fetch failed.
    """
    manifest_url = read_update_config(install_root).get("manifestUrl", "")
    if not manifest_url:
        raise RuntimeError("update.manifestUrl not configured in update/update_config.json")

    from urllib.parse import urlparse

    def _is_disposable(segment: str) -> bool:
        # Filename, current-style version folder, or legacy version folder.
        if segment.endswith(".json"):
            return True
        if re.match(r"^Mec_Agent_\d+_\d+", segment):
            return True
        return bool(re.match(r"^v\d+\.\d+", segment))

    location = urlparse(manifest_url)
    segments = location.path.rstrip("/").split("/")
    while segments and _is_disposable(segments[-1]):
        segments.pop()

    root_path = "/".join(segments) + "/"
    index_url = f"{location.scheme}://{location.netloc}{root_path}index.json"

    try:
        return _fetch_json(index_url, install_root)
    except Exception as e:
        raise RuntimeError(f"Failed to fetch version index: {e}") from e


 

# ---------------------------------------------------------------------------

# Progress file — read by launcher splash screen and mecui

# ---------------------------------------------------------------------------

 

def _write_progress(install_root: Path, phase: str, current: int, total: int, component: str) -> None:
    """Write a progress snapshot to ``{install_root}/update/_progress.json``.

    The snapshot holds ``phase``, ``current``, ``total``, ``component`` and a
    local-time ``timestamp`` (``YYYY-MM-DD HH:MM:SS``). Failures while
    serializing or writing are ignored so that progress reporting can never
    abort an update; creating the ``update`` directory is not covered by
    that guard.
    """
    progress_file = install_root / "update" / "_progress.json"
    progress_file.parent.mkdir(parents=True, exist_ok=True)
    try:
        snapshot = {
            "phase": phase,
            "current": current,
            "total": total,
            "component": component,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        progress_file.write_text(json.dumps(snapshot, ensure_ascii=False), encoding="utf-8")
    except Exception:
        pass


 

def _remove_progress(install_root: Path) -> None:
    """Delete ``{install_root}/update/_progress.json``; a missing file is not an error."""
    progress_file = install_root / "update" / "_progress.json"
    progress_file.unlink(missing_ok=True)


 

# ---------------------------------------------------------------------------

# ZIP safe extraction — guards against zip-slip path traversal

# ---------------------------------------------------------------------------

 

def _safe_extract_zip(zip_path: Path, extract_dir: Path) -> None:
    """Extract *zip_path* into *extract_dir* after a zip-slip check.

    Every member name is resolved relative to the (resolved) extraction
    directory first; the archive is extracted only if all of them land on or
    below that directory.

    Raises:
        ValueError: a member would be written outside *extract_dir*. Nothing
            has been extracted when this is raised.
    """
    base = extract_dir.resolve()
    base_text = str(base)
    child_prefix = base_text + os.sep

    def _stays_inside(member_name: str) -> bool:
        landed = str((base / member_name).resolve())
        return landed == base_text or landed.startswith(child_prefix)

    with zipfile.ZipFile(zip_path, "r") as archive:
        for member_name in archive.namelist():
            if not _stays_inside(member_name):
                raise ValueError(f"Zip-slip attempt in archive: {member_name!r}")
        archive.extractall(extract_dir)


 

def _is_self_executable(filepath: Path) -> bool:
    """Tell whether *filepath* is the executable this process is running from.

    On Windows a running .exe cannot be deleted or overwritten; callers use
    this to skip the locked file and defer its replacement to the launcher.

    Always False in dev mode (``sys.frozen`` not set), and False when the
    files cannot be compared (e.g. *filepath* does not exist).
    """
    if getattr(sys, "frozen", False):
        try:
            candidate = filepath.resolve()
            running = Path(sys.executable).resolve()
            return candidate.samefile(running)
        except OSError:
            return False
    # dev mode: there is no locked exe to protect
    return False


 

def _extract_flat_scripts(
    archive: Path,
    target_dir: Path,
    label: str,
    log: logging.Logger,
) -> None:
    """Copy every file member of *archive* into *target_dir*, flattened to its base name.

    Members whose base name starts with ``_`` are skipped. Extraction errors
    are logged as warnings and never raised; creating *target_dir* is not
    covered by that guard.
    """
    target_dir.mkdir(parents=True, exist_ok=True)
    try:
        with zipfile.ZipFile(str(archive), "r") as zf:
            for entry in zf.infolist():
                if entry.is_dir():
                    continue
                # Entries may be flat (launch_mec_agent.ps1) or sit in a
                # subdirectory; only the base name is used, which also means
                # a member can never be written outside target_dir.
                name = Path(entry.filename).name
                if not name or name.startswith("_"):
                    continue
                with zf.open(entry) as src, open(target_dir / name, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                log.info(f"Pre-applied {label} script: {name}")
    except Exception as e:
        log.warning(f"Failed to pre-apply {label}: {e}")


def _pre_apply_launcher_and_updater(
    install_root: Path,
    staged_dir: Path,
    remote_manifest: dict,
    log: logging.Logger,
) -> None:
    """Extract launcher & updater scripts from staged ZIPs BEFORE writing _pending_update.

    This addresses a chicken-and-egg problem between launcher generations:

    - the v0.3.11 launcher only knows "api_server.exe --apply-staged";
    - the v0.3.12 launcher uses mec_updater.ps1 (file-by-file replace);
    - by extracting the new launcher scripts now, the next launcher restart
      already runs the new code, and mec_updater.ps1 is in place for it.

    Per the original author this is safe because launcher scripts are not
    running while api_server.exe stages the update and script files are not
    locked by Windows. If extraction fails the update still proceeds with the
    old launcher.

    Args:
        install_root: MEC Agent installation root.
        staged_dir: Directory holding the downloaded component ZIPs.
        remote_manifest: Parsed manifest; an archive is only applied when its
            component name appears under ``components``.
        log: Logger receiving per-file info lines and failure warnings.
    """
    components = remote_manifest.get("components", {})

    # (component name, staged archive, directory under install_root)
    plan = (
        ("launcher", "launcher.zip", "launcher"),
        ("updater", "updater.zip", "update"),
    )
    for label, archive_name, target_name in plan:
        archive = staged_dir / archive_name
        if archive.exists() and label in components:
            _extract_flat_scripts(archive, install_root / target_name, label, log)


 

def _replace_directory_files(
    src_dir: Path,
    dst_dir: Path,
    preserve: list[str],
    log: logging.Logger,
    install_root: Path,
) -> bool:
    """Replace *dst_dir* contents with *src_dir* **file-by-file**.

    Unlike shutil.rmtree + copytree, this strategy:
    - Preserves subdirectories listed in *preserve* (moved aside & restored).
    - Skips the currently running exe (if it lives inside dst_dir).
    - Saves a pending copy of the new exe so the launcher can swap it later.
    - Removes files in dst_dir that are NOT present in src_dir (clean update).

    Args:
        src_dir: Directory holding the new version. It is also used as the
            temporary parking place for preserved subdirectories
            (``_preserve_<name>``), so it is modified during the call.
        dst_dir: Installed directory to update; created if missing.
        preserve: Names of direct subdirectories of *dst_dir* to keep.
        log: Logger for progress and warnings.
        install_root: Installation root; the deferred exe and its flag file
            are written to ``{install_root}/update``.

    Returns:
        True when replacement of the running executable was deferred (a copy
        was saved as ``_pending_new_api_server.exe`` and the
        ``_exe_pending_replace`` flag written), False otherwise. Files that
        could not be removed or copied are only logged; they do not affect
        the return value.
    """
    update_dir = install_root / "update"

    # ── 1. Move preserved subdirs aside ──────────────────────────────────
    # NOTE(review): there is no try/finally around steps 2-4, so an exception
    # raised there leaves the preserved directories parked inside src_dir.
    saved_dirs: dict[str, Path] = {}
    if preserve and dst_dir.exists():
        for sub in preserve:
            sub_path = dst_dir / sub
            if sub_path.is_dir():
                tmp = src_dir / f"_preserve_{sub}"
                shutil.move(str(sub_path), str(tmp))
                saved_dirs[sub] = tmp
                log.info(f"Preserved subdirectory for clean update: {sub_path}")

    # ── 2. Build set of relative paths in the NEW version ────────────────
    # Holds both files and directories, relative to src_dir.
    # NOTE(review): `dirs` is not pruned here (unlike step 4), so os.walk
    # still descends into the parked _preserve_* directories and records
    # their contents under "_preserve_<name>/..." paths.
    new_files: set[str] = set()
    for root, dirs, files in os.walk(src_dir):
        rel_root = Path(root).relative_to(src_dir)
        for d in dirs:
            if d.startswith("_preserve_"):
                continue
            new_files.add(str(rel_root / d) if str(rel_root) != "." else d)
        for f in files:
            new_files.add(str(rel_root / f) if str(rel_root) != "." else f)

    # ── 3. Remove stale files/dirs from old version ─────────────────────
    if dst_dir.exists():
        # Bottom-up walk so a directory is visited after its children and
        # can be removed once it has become empty.
        for root, dirs, files in os.walk(dst_dir, topdown=False):
            rel_root = Path(root).relative_to(dst_dir)
            for f in files:
                rel = str(rel_root / f) if str(rel_root) != "." else f
                if rel not in new_files:
                    old_file = Path(root) / f
                    if _is_self_executable(old_file):
                        log.warning(f"Skipping removal of running exe (will be stale): {old_file}")
                        continue
                    try:
                        old_file.unlink()
                        log.debug(f"Removed stale file: {old_file}")
                    except (PermissionError, OSError) as exc:
                        log.warning(f"Could not remove stale file {old_file}: {exc}")
            for d in dirs:
                rel = str(rel_root / d) if str(rel_root) != "." else d
                empty_dir = Path(root) / d
                if rel not in new_files:
                    try:
                        # Only directories that ended up empty are removed.
                        if not any(empty_dir.iterdir()):
                            empty_dir.rmdir()
                            log.debug(f"Removed stale empty dir: {empty_dir}")
                    except (PermissionError, OSError) as exc:
                        log.debug(f"Could not remove stale dir {empty_dir}: {exc}")

    # ── 4. Ensure dst_dir exists, then copy new files ────────────────────
    dst_dir.mkdir(parents=True, exist_ok=True)
    exe_deferred = False
    for root, dirs, files in os.walk(src_dir):
        rel_root = Path(root).relative_to(src_dir)
        # Skip _preserve_ directories (in-place prune stops os.walk descending)
        dirs[:] = [d for d in dirs if not d.startswith("_preserve_")]
        for d in dirs:
            dest = dst_dir / rel_root / d
            dest.mkdir(parents=True, exist_ok=True)
        for f in files:
            src_file = Path(root) / f
            dest = dst_dir / rel_root / f
            if _is_self_executable(dest):
                # Cannot overwrite the running exe — save it for deferred replace
                # (this copy is outside the try below, so a failure here propagates)
                pending_exe = update_dir / "_pending_new_api_server.exe"
                shutil.copy2(src_file, pending_exe)
                flag_file = update_dir / "_exe_pending_replace"
                flag_file.write_text("pending", encoding="utf-8")
                log.info(f"Deferred exe replacement: saved new exe to {pending_exe}")
                exe_deferred = True
                continue
            try:
                shutil.copy2(src_file, dest)
            except (PermissionError, OSError) as exc:
                log.warning(f"Could not copy {src_file} -> {dest}: {exc}")
                # For non-exe files, this is still an error but we continue

    # ── 5. Restore preserved subdirectories ──────────────────────────────
    # NOTE(review): if the new version itself ships a directory named like a
    # preserved one, dst_dir/<sub> already exists here and shutil.move would
    # place the parked directory inside it — confirm packages never do that.
    for sub, tmp in saved_dirs.items():
        restore_path = dst_dir / sub
        shutil.move(str(tmp), str(restore_path))
        log.info(f"Restored preserved subdirectory: {restore_path}")

    return exe_deferred


 

# ---------------------------------------------------------------------------

# Component → installation-directory mapping

# ---------------------------------------------------------------------------

 

# Maps a manifest component name to where it is installed. Each entry has:
#   "type"             - "file" (single file) or "dir" (directory)
#   "target"           - path relative to the installation root
#   "preserve_subdirs" - optional; subdirectory names under the target that are
#                        listed for preservation (presumably passed as `preserve`
#                        to _replace_directory_files — confirm in apply_staged)
# Note that "whisper" targets api_server/whisper, which is also listed in
# api_server's preserve_subdirs.
_COMPONENT_TARGETS: dict[str, dict] = {
    "mec_agent":          {"type": "file", "target": "mec_agent.exe"},
    "api_server":         {"type": "dir",  "target": "api_server",
                           "preserve_subdirs": ["whisper", "skills_pools"]},
    "whisper":            {"type": "dir",  "target": os.path.join("api_server", "whisper")},
    "mecui":              {"type": "dir",  "target": os.path.join("web", "mecui")},
    "skills":             {"type": "dir",  "target": "skills"},
    "python_embed":       {"type": "dir",  "target": "python"},
    "llama_cpp":          {"type": "dir",  "target": "llama"},
    "whl":                {"type": "dir",  "target": "whl"},
    "model":              {"type": "dir",  "target": "models"},
    "launcher":           {"type": "dir",  "target": "launcher"},
    "config":             {"type": "dir",  "target": "config"},
    "proxy":              {"type": "dir",  "target": "proxy"},
    "downloads":          {"type": "dir",  "target": "downloads"},
    "ein_wiki":           {"type": "dir",  "target": os.path.join("bundled", "ein-wiki")},
    "workspace_defaults": {"type": "dir",  "target": "workspace-defaults"},
    "updater":            {"type": "file", "target": os.path.join("update", "mec_updater.ps1")},
}


 

# ===========================================================================

# Public API

# ===========================================================================

 

def check_updates(install_root: Path, manifest_url: str = "") -> dict:
    """Compare local version.json against the remote manifest.

    When *manifest_url* is provided it is used directly (rollback preview for a
    specific historical version). Otherwise the latest version is discovered via
    index.json (derived from the configured manifestUrl) so that a client sitting
    on v0.3.1 can detect v0.4.0 even though its manifestUrl still points at the
    Mec_Agent_0_3_1 folder. If the index cannot be fetched or lacks a usable
    entry, the configured manifestUrl is used instead.

    A component is reported whenever its local and remote version strings
    differ, regardless of direction; ``isDowngrade`` tells the caller whether
    the local overall version is newer than the remote one.

    Args:
        install_root: MEC Agent installation root.
        manifest_url: Optional explicit manifest URL.

    Returns:
        A dict with keys ``hasUpdate``, ``isDowngrade``, ``currentVersion``,
        ``remoteVersion``, ``releaseNotes``, ``manifestUrl`` (the manifest
        actually compared against) and ``updates`` (list of
        ``{component, localVersion, remoteVersion, size, url}``).

    Raises:
        RuntimeError: ``manifestUrl`` is not configured, or the manifest
            cannot be fetched or parsed.
    """
    cfg = read_update_config(install_root)
    base_url = cfg.get("manifestUrl", "")
    if not base_url:
        raise RuntimeError("update.manifestUrl not configured in update/update_config.json")

    if manifest_url:
        # Explicit override (rollback): use as-is.
        url = manifest_url
    else:
        # Normal check: discover the latest version via index.json so clients on
        # older versions can detect newer releases.
        try:
            index = fetch_version_index(install_root)
            latest_ver = index.get("latest", "")
            versions = index.get("versions", [])
            latest_entry = next(
                (v for v in versions if v.get("version") == latest_ver),
                versions[0] if versions else None,
            )
            url = latest_entry["manifestUrl"] if latest_entry else base_url
        except Exception:
            # Deliberate best-effort: an unreachable or malformed index falls
            # back to the configured manifest.
            url = base_url

    local = read_local_version(install_root)
    remote = _fetch_manifest(url, install_root)

    local_ver_str = local.get("version", "0.0.0")
    remote_ver_str = remote.get("version", "0.0.0")
    # Arguments are swapped on purpose: "local is newer than remote".
    is_downgrade = _compare_versions(remote_ver_str, local_ver_str)

    local_comps: dict = local.get("components") or {}
    # Guard against `"components": null`, mirroring the local side.
    remote_comps: dict = remote.get("components") or {}
    updates = []
    for name, remote_comp in remote_comps.items():
        lc = local_comps.get(name, {})
        lv = lc.get("version", "0.0.0") if isinstance(lc, dict) else str(lc)
        # Accept a bare version value for a remote component the same way a
        # bare local value is accepted above.
        if isinstance(remote_comp, dict):
            rv = remote_comp.get("version", "0.0.0")
            size = remote_comp.get("size", 0)
            comp_url = remote_comp.get("url", "")
        else:
            rv, size, comp_url = str(remote_comp), 0, ""
        if _versions_differ(lv, rv):
            updates.append({
                "component":     name,
                "localVersion":  lv,
                "remoteVersion": rv,
                "size":          size,
                "url":           comp_url,
            })

    return {
        "hasUpdate":      len(updates) > 0,
        "isDowngrade":    is_downgrade,
        "currentVersion": local_ver_str,
        "remoteVersion":  remote_ver_str,
        "releaseNotes":   remote.get("releaseNotes", ""),
        "manifestUrl":    url,
        "updates":        updates,
    }


 

def stage_updates(
    install_root: Path,
    progress_cb: Optional[Callable[[str, int, int, str], None]] = None,
    manifest_url: str = "",
    force: bool = False,
) -> dict:
    """Download and SHA-256-verify component ZIPs into update/_staged_update/.

    The staging directory is recreated from scratch on every call, and any
    ``update/_pending_update`` flag left by an earlier staging run is removed
    with it.  The flag is only written again when at least one component was
    downloaded and verified in this run.

    Args:
        install_root: MEC Agent installation root.
        progress_cb:  Called as ("downloading", current, total, component)
                      right before each component download starts.
        manifest_url: Override manifest URL (used for rollback to a specific version).
                      When empty, the latest manifest is looked up through the
                      version index; if that lookup fails, the configured
                      manifestUrl is used instead.
        force:        When True, download all components that differ regardless of
                      whether the remote version is newer (enables rollback / downgrade).

    Returns: { staged, failed, failedComponents[], targetVersion }

    Raises:
        RuntimeError: when no manifest URL could be determined.
        Errors raised while fetching the manifest are not caught here.
    """
    log = _get_logger(install_root)
    global _proxy_skip
    cfg = read_update_config(install_root)
    base_url = cfg.get("manifestUrl", "")

    if manifest_url:
        # Explicit override (rollback / version-specific install): use as-is.
        url = manifest_url
    else:
        # Normal upgrade: discover the latest manifest via index.json so that a
        # client sitting on v0.3.1 stages v0.4.0, not v0.3.1 again.
        try:
            index = fetch_version_index(install_root)
            latest_ver = index.get("latest", "")
            versions = index.get("versions", [])
            # Prefer the entry flagged as latest; otherwise take the first entry.
            latest_entry = next(
                (v for v in versions if v.get("version") == latest_ver),
                versions[0] if versions else None,
            )
            url = latest_entry["manifestUrl"] if latest_entry else base_url
        except Exception:
            # Index unreachable or malformed: fall back to the configured manifest.
            url = base_url

    if not url:
        raise RuntimeError("update.manifestUrl not configured")

    local = read_local_version(install_root)
    remote = _fetch_manifest(url, install_root)

    staged_dir = install_root / "update" / "_staged_update"
    pending_flag = install_root / "update" / "_pending_update"
    staged_manifest = staged_dir / "manifest.json"

    # Reset the staging area.  The pending flag describes the contents of
    # staged_dir, so it must go as well; otherwise a run that stages nothing
    # would leave a flag pointing at an emptied directory.
    pending_flag.unlink(missing_ok=True)
    if staged_dir.exists():
        shutil.rmtree(staged_dir)
    staged_dir.mkdir(parents=True, exist_ok=True)

    local_comps: dict = local.get("components") or {}

    def _needs_download(name: str, rc: dict) -> bool:
        # Local component entries may be dicts or bare values; bare values are
        # treated as the version string itself.
        lc = local_comps.get(name, {})
        lv = lc.get("version", "0.0.0") if isinstance(lc, dict) else str(lc)
        rv = rc.get("version", "0.0.0")
        if force:
            # Rollback / forced install: download anything that differs
            return _versions_differ(lv, rv)
        # Normal upgrade: only download if remote is newer
        return _compare_versions(lv, rv)

    to_download = [
        (name, rc)
        for name, rc in remote.get("components", {}).items()
        if _needs_download(name, rc)
    ]

    total = len(to_download)
    downloaded_count = 0
    failed_count = 0
    failed_components: list[str] = []

    for idx, (name, rc) in enumerate(to_download, 1):
        url_dl = rc.get("url", "")
        if not url_dl:
            # Skipped components are counted neither as staged nor as failed.
            log.warning(f"No download URL for component '{name}' — skipping")
            continue

        zip_file = staged_dir / f"{name}.zip"
        log.info(f"Downloading {name} v{rc.get('version', '?')} from {url_dl} ...")
        _write_progress(install_root, "downloading", idx, total, name)
        if progress_cb:
            progress_cb("downloading", idx, total, name)

        # Adaptive timeout: min 300 s, +300 s per GB, max 7200 s (2 h).
        # A missing or malformed size only affects the timeout, so it is
        # treated as 0 instead of aborting the whole staging run.
        try:
            declared_bytes = int(rc.get("size", 0) or 0)
        except (TypeError, ValueError):
            declared_bytes = 0
        timeout = max(300, min(7200, 300 + ((declared_bytes + 2**30 - 1) // 2**30) * 300))
        log.info(f"  Timeout: {timeout}s (declared {declared_bytes / 2**20:.1f} MB)")

        try:
            last_dl_err: Exception = RuntimeError("no opener")
            success = False
            # Re-evaluate opener list each iteration: proxy may have been marked bad
            # by a previous download in this same loop.
            cur_openers = (
                [_make_direct_opener(install_root)]
                if _proxy_skip or not _read_proxy_url(install_root)
                else [_make_opener(install_root), _make_direct_opener(install_root)]
            )
            for dl_attempt, dl_opener in enumerate(cur_openers):
                if dl_attempt > 0:
                    # The first (proxy) opener failed: remember that for the
                    # rest of the process and retry directly.
                    _proxy_skip = True
                    log.warning(f"Proxy download failed for {name}, retrying without proxy...")
                fresh_req = urllib.request.Request(url_dl, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
                try:
                    with dl_opener.open(fresh_req, timeout=timeout) as resp, open(zip_file, "wb") as f:
                        shutil.copyfileobj(resp, f, 65536)
                    success = True
                    break
                except Exception as dl_err:
                    last_dl_err = dl_err
                    # Never leave a partial ZIP behind for the next attempt.
                    zip_file.unlink(missing_ok=True)
            if not success:
                raise last_dl_err

            # SHA-256 verification — this logic lives inside the compiled exe
            expected_hash = rc.get("sha256", "")
            if not verify_sha256(zip_file, expected_hash):
                log.error(f"SHA-256 mismatch for {name} — expected {expected_hash}")
                zip_file.unlink(missing_ok=True)
                failed_components.append(name)
                failed_count += 1
                continue

            downloaded_count += 1
            log.info(f"Downloaded {name} OK ({zip_file.stat().st_size / 2**20:.1f} MB)")

        except Exception as e:
            log.error(f"Failed to download {name}: {e}")
            zip_file.unlink(missing_ok=True)
            failed_components.append(name)
            failed_count += 1

    # Persist the remote manifest so apply_staged can use it
    staged_manifest.write_text(json.dumps(remote, ensure_ascii=False, indent=2), encoding="utf-8")

    target_version = remote.get("version", "")
    if downloaded_count > 0:
        pending_flag.parent.mkdir(parents=True, exist_ok=True)

        # ── Pre-apply launcher & updater before writing _pending_update ────
        # CRITICAL: When upgrading from v0.3.11 (or older), the OLD launcher
        # only knows how to call "api_server.exe --apply-staged", which uses
        # shutil.rmtree and hits WinError 32 on the running exe.  The NEW
        # launcher uses mec_updater.ps1 instead.  By extracting the new
        # launcher scripts NOW (before _pending_update is written), the next
        # launcher restart will already run the NEW code path.
        #
        # Similarly, pre-apply mec_updater.ps1 so the new launcher finds it.
        #
        # NOTE(review): the module docstring says mec_updater.ps1 is no longer
        # needed, which contradicts the paragraph above — confirm which is current.
        try:
            _pre_apply_launcher_and_updater(install_root, staged_dir, remote, log)
        except Exception as e:
            log.warning(f"Pre-apply launcher/updater failed (non-fatal): {e}")

        pending_flag.write_text("staged", encoding="utf-8")
        log.info(f"Staging complete: {downloaded_count} downloaded, {failed_count} failed. Target: v{target_version}")
        _write_progress(install_root, "staged", downloaded_count, total, "")
    else:
        log.info("No components staged (all up-to-date or all downloads failed).")
        _remove_progress(install_root)

    return {
        "staged":           downloaded_count,
        "failed":           failed_count,
        "failedComponents": failed_components,
        "targetVersion":    target_version,
    }


 

def apply_staged(install_root: Path) -> dict:
    """Apply ZIPs in update/_staged_update/ to the installation directories.

    Every component of the staged manifest whose ZIP is present is extracted
    and installed according to ``_COMPONENT_TARGETS``.  A failure in one
    component is logged and counted; the remaining components are still
    processed.  Only components that were actually installed get their version
    recorded in the local version.json.  The pending flag, the staging
    directory and the progress file are removed at the end regardless of the
    outcome.

    NOTE(review): an earlier docstring promised a per-component backup and
    rollback.  Nothing in this function does that; whether
    ``_replace_directory_files`` keeps backups could not be confirmed from here.

    Returns: { applied, components, errors, newVersion } — plus ``reason``
    when nothing was attempted because the pending flag or the staged
    manifest is missing.
    """
    log = _get_logger(install_root)
    update_dir      = install_root / "update"
    staged_dir      = update_dir / "_staged_update"
    staged_manifest = staged_dir / "manifest.json"
    pending_flag    = update_dir / "_pending_update"

    if not pending_flag.exists():
        log.info("No pending update found.")
        return {"applied": False, "components": 0, "errors": 0, "newVersion": "", "reason": "no_pending_update"}

    if not staged_manifest.exists():
        log.error("Staged manifest not found.")
        return {"applied": False, "components": 0, "errors": 0, "newVersion": "", "reason": "missing_manifest"}

    remote = json.loads(staged_manifest.read_text(encoding="utf-8-sig"))
    remote_comps: dict = remote.get("components") or {}

    # Only components whose ZIP was actually staged are candidates.
    apply_components = [
        (name, rc)
        for name, rc in remote_comps.items()
        if (staged_dir / f"{name}.zip").exists()
    ]
    total = len(apply_components)
    applied_names: list[str] = []
    error_count = 0

    for idx, (name, rc) in enumerate(apply_components, 1):
        zip_file = staged_dir / f"{name}.zip"
        _write_progress(install_root, "applying", idx, total, name)

        mapping = _COMPONENT_TARGETS.get(name)
        if not mapping:
            log.warning(f"Unknown component '{name}' — skipping (updater component is deprecated)")
            continue

        target_path = install_root / mapping["target"]

        try:
            # ── Extract ──────────────────────────────────────────────────
            extract_dir = staged_dir / f"_extract_{name}"
            if extract_dir.exists():
                shutil.rmtree(extract_dir)
            extract_dir.mkdir(parents=True, exist_ok=True)
            _safe_extract_zip(zip_file, extract_dir)

            # ── Install ──────────────────────────────────────────────────
            if mapping["type"] == "file":
                # Single-file component: locate the file anywhere inside the
                # archive and copy it over the target.
                fname = Path(mapping["target"]).name
                src_file = next(extract_dir.rglob(fname), None)
                if src_file is None:
                    # Nothing was installed, so this must count as a failure
                    # rather than bump the component's recorded version.
                    raise FileNotFoundError(f"Expected file '{fname}' not found in {name} archive")
                target_path.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_file, target_path)
                log.info(f"Updated file: {target_path}")
            else:
                # ── Directory component — use file-by-file replacement ────
                # The old approach (shutil.rmtree + copytree) fails when the
                # running api_server.exe lives inside the target directory because
                # Windows locks the file (WinError 32 / OSError(32)).
                #
                # New approach: _replace_directory_files() copies files one by
                # one, skips the locked exe, and saves a pending copy for the
                # launcher to swap on next startup.
                preserve = mapping.get("preserve_subdirs", [])
                exe_deferred = _replace_directory_files(
                    extract_dir, target_path, preserve, log, install_root,
                )
                if exe_deferred:
                    log.info(f"Updated directory (exe deferred): {target_path}")
                else:
                    log.info(f"Updated directory (clean): {target_path}")

            shutil.rmtree(extract_dir, ignore_errors=True)
            applied_names.append(name)

        except Exception as e:
            log.error(f"Failed to apply {name}: {e}")
            error_count += 1

    applied_count = len(applied_names)

    # ── Update local version.json ─────────────────────────────────────────
    if applied_count > 0:
        try:
            new_local = read_local_version(install_root)
            # NOTE(review): the top-level version is bumped even when some
            # components failed; per-component versions below stay accurate.
            new_local["version"]   = remote.get("version", new_local.get("version", "0.0.0"))
            new_local["buildDate"] = remote.get("buildDate", "")
            comps = new_local.get("components")
            if not isinstance(comps, dict):
                comps = {}
                new_local["components"] = comps
            # Record versions only for components that were really installed;
            # failed or skipped ones must still show up as outdated later.
            for name in applied_names:
                entry = comps.get(name)
                if not isinstance(entry, dict):
                    # Bare (non-dict) entries are replaced by the dict form.
                    entry = {}
                    comps[name] = entry
                entry["version"] = remote_comps.get(name, {}).get("version", "")
            _write_local_version(install_root, new_local)
            log.info(f"Updated local version.json to v{new_local['version']}")
        except Exception as e:
            log.warning(f"Could not update local version.json: {e}")

    # ── Download remote CHANGELOG.md ──────────────────────────────────────
    # After a successful update, fetch the latest CHANGELOG.md from the
    # update server so the /api/changelog endpoint can serve it.  The URL is
    # derived from any component's download URL (same versioned directory).
    if applied_count > 0:
        try:
            changelog_url = None
            for comp in remote_comps.values():
                comp_url = comp.get("url", "")
                if comp_url:
                    # e.g. .../updates/v0.3.8/api_server-1.0.4.zip → .../updates/v0.3.8/CHANGELOG.md
                    changelog_url = comp_url.rsplit("/", 1)[0] + "/CHANGELOG.md"
                    break
            if changelog_url:
                log.info(f"Downloading CHANGELOG.md from {changelog_url}")
                cl_req = urllib.request.Request(changelog_url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
                # Proxy first (when configured and not marked bad), then direct.
                cl_openers = (
                    [_make_direct_opener(install_root)]
                    if _proxy_skip or not _read_proxy_url(install_root)
                    else [_make_opener(install_root), _make_direct_opener(install_root)]
                )
                for cl_opener in cl_openers:
                    try:
                        with cl_opener.open(cl_req, timeout=30) as cl_resp:
                            cl_data = cl_resp.read()
                        cl_dest = install_root / "update" / "CHANGELOG.md"
                        cl_dest.parent.mkdir(parents=True, exist_ok=True)
                        cl_dest.write_bytes(cl_data)
                        log.info(f"Updated CHANGELOG.md ({len(cl_data)} bytes)")
                        break
                    except Exception as cl_err:
                        log.debug(f"CHANGELOG.md download attempt failed: {cl_err}")
                else:
                    # Loop finished without a successful download.
                    log.warning("Could not download CHANGELOG.md from update server (non-fatal)")
            else:
                log.debug("No component URL found — skipping CHANGELOG.md download")
        except Exception as e:
            log.warning(f"Failed to update CHANGELOG.md: {e} (non-fatal)")

    # ── Cleanup ───────────────────────────────────────────────────────────
    pending_flag.unlink(missing_ok=True)
    shutil.rmtree(staged_dir, ignore_errors=True)
    _remove_progress(install_root)

    return {
        "applied":    applied_count > 0,
        "components": applied_count,
        "errors":     error_count,
        "newVersion": remote.get("version", ""),
    }


 

# ---------------------------------------------------------------------------

# Model management

# ---------------------------------------------------------------------------

 

def read_model_config(install_root: Path) -> dict:
    """Read local model tracking config (which model is currently installed).

    Returns the parsed contents of update/model_config.json.  An empty dict is
    returned when the file is missing, cannot be read, is not valid JSON, or
    does not contain a JSON object, so callers can always use ``.get()``.
    """
    mf = install_root / "update" / "model_config.json"
    try:
        # utf-8-sig tolerates a leading byte-order mark.
        data = json.loads(mf.read_text(encoding="utf-8-sig"))
    except (OSError, ValueError):
        # Best-effort read: a missing/unreadable file (OSError) or bad
        # encoding/JSON (ValueError) means "no model recorded".
        return {}
    return data if isinstance(data, dict) else {}


 

def _write_model_config(install_root: Path, data: dict) -> None:

    mf = install_root / "update" / "model_config.json"

    mf.parent.mkdir(parents=True, exist_ok=True)

    mf.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")


 

def fetch_model_index(install_root: Path) -> dict:
    """Fetch models/index.json from the update server.

    The URL is taken from the ``modelsUrl`` entry of the update config and
    handed to ``_fetch_json``.

    Raises:
        RuntimeError: when ``modelsUrl`` is missing or empty in the config.
    """
    models_url = read_update_config(install_root).get("modelsUrl", "")
    if models_url:
        return _fetch_json(models_url, install_root)
    raise RuntimeError("modelsUrl not configured in update/update_config.json")


 

def check_model_update(install_root: Path) -> dict:
    """Check whether the active server model differs from the locally installed model.

    An update is reported when the locally recorded filename differs from the
    server's active filename, or when the recorded file is not on disk.

    Returns:
        hasUpdate       bool
        activeFilename  str   — filename declared as active on the server
        activeModel     dict  — full model entry from server index
        localFilename   str   — currently installed model filename (empty if none)
        localExists     bool  — whether the local file actually exists on disk

    Raises:
        RuntimeError: when the index has no ``active`` field, or the active
            filename has no matching entry in the ``models`` list.
        Errors from fetching the index are not caught here.
    """
    index = fetch_model_index(install_root)
    active_filename = index.get("active", "")
    if not active_filename:
        raise RuntimeError("models/index.json has no 'active' field")

    # Index entries by filename.  Malformed entries (non-dicts, or dicts
    # without "filename") are ignored so that one bad entry cannot hide a
    # perfectly valid active model behind a KeyError/TypeError.
    models = {
        m["filename"]: m
        for m in (index.get("models") or [])
        if isinstance(m, dict) and "filename" in m
    }
    active_model = models.get(active_filename)
    if not active_model:
        raise RuntimeError(f"Active model '{active_filename}' not found in models list")

    local_cfg = read_model_config(install_root)
    local_filename = local_cfg.get("filename", "")

    models_dir = install_root / "models"
    # Without a recorded filename there is nothing to look for on disk.
    local_exists = (models_dir / local_filename).exists() if local_filename else False
    has_update = (local_filename != active_filename) or not local_exists

    return {
        "hasUpdate":      has_update,
        "activeFilename": active_filename,
        "activeModel":    active_model,
        "localFilename":  local_filename,
        "localExists":    local_exists,
    }


 

def download_model(
    install_root: Path,
    progress_cb: Optional[Callable[[str, int, int, str], None]] = None,
) -> dict:
    """Download the active model from the server, replacing any previous model.

    The file is downloaded to a temporary name inside ``models/``, verified
    against the declared SHA-256 (when one is given), and only then put in
    place.  The previously recorded model file is deleted after the new one
    has been installed, so a failed install never leaves the machine without
    a model.  Finally update/model_config.json is rewritten.

    Args:
        install_root: installation root path
        progress_cb:  optional callback(phase, current, total, component);
                      called with phase "model_download" (byte counts) during
                      the transfer and once with ("downloaded", 1, 1, filename).

    Returns: { ok: bool, filename: str }

    Raises:
        RuntimeError: when the model entry has no URL, every download attempt
            fails, the SHA-256 does not match, or a ZIP does not contain the
            expected file.
        Errors from ``check_model_update`` are not caught here.
    """
    log = _get_logger(install_root)
    global _proxy_skip
    status = check_model_update(install_root)
    active_model = status["activeModel"]
    active_filename = status["activeFilename"]

    models_dir = install_root / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    url = active_model.get("url", "")
    if not url:
        raise RuntimeError(f"No URL for model '{active_filename}'")

    expected_sha256 = active_model.get("sha256", "")
    # The size only drives the timeout and progress totals, so a missing,
    # null or malformed value is treated as "unknown" (0) rather than fatal.
    try:
        expected_size = int(active_model.get("size", 0) or 0)
    except (TypeError, ValueError):
        expected_size = 0

    # Support ZIP-packaged models (same scheme as other components).
    # When the URL ends with .zip, we download the ZIP, verify its SHA-256,
    # then extract the .gguf from it.
    download_as_zip = url.lower().endswith(".zip")

    dest_file = models_dir / active_filename
    tmp_dl    = models_dir / ((active_filename + ".zip.tmp") if download_as_zip else (active_filename + ".tmp"))

    # Large timeout for GGUF files (can be several GB)
    timeout = (
        max(600, min(7200, 300 + ((expected_size + 2**30 - 1) // 2**30) * 300))
        if expected_size else 3600
    )
    log.info(f"Downloading model {active_filename} from {url} (timeout: {timeout}s)")
    _write_progress(install_root, "model_download", 0, expected_size, active_filename)
    if progress_cb:
        progress_cb("model_download", 0, expected_size, active_filename)

    # Proxy first (when configured and not already marked bad), then direct.
    cur_openers = (
        [_make_direct_opener(install_root)]
        if _proxy_skip or not _read_proxy_url(install_root)
        else [_make_opener(install_root), _make_direct_opener(install_root)]
    )

    last_err: Exception = RuntimeError("no opener")
    success = False
    for attempt, opener in enumerate(cur_openers):
        if attempt > 0:
            _proxy_skip = True
            log.warning("Proxy download failed for model, retrying without proxy...")
        req = urllib.request.Request(url, headers={"User-Agent": "MEC-Agent-Updater/1.0"})
        try:
            bytes_downloaded = 0
            # Progress is reported every ~5 MB (approximately) to avoid flooding the file.
            progress_interval = 5 * 2**20
            next_progress_at = progress_interval
            with opener.open(req, timeout=timeout) as resp, open(tmp_dl, "wb") as f:
                while True:
                    chunk = resp.read(65536)
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_downloaded += len(chunk)
                    if bytes_downloaded >= next_progress_at:
                        _write_progress(install_root, "model_download",
                                        bytes_downloaded, expected_size, active_filename)
                        if progress_cb:
                            progress_cb("model_download", bytes_downloaded,
                                        expected_size, active_filename)
                        next_progress_at = bytes_downloaded + progress_interval
            success = True
            break
        except Exception as e:
            last_err = e
            # Drop the partial file before the next attempt.
            tmp_dl.unlink(missing_ok=True)

    if not success:
        raise RuntimeError(f"Failed to download model: {last_err}") from last_err

    # Verify SHA-256 of the downloaded file (ZIP or raw gguf)
    # NOTE(review): verification is skipped when the index declares no hash or
    # a placeholder containing "..."; such downloads are installed unverified.
    if expected_sha256 and "..." not in expected_sha256:
        if not verify_sha256(tmp_dl, expected_sha256):
            tmp_dl.unlink(missing_ok=True)
            raise RuntimeError(f"SHA256 mismatch for model {active_filename}")

    # Remember which model was installed before, for the cleanup further down.
    local_cfg = read_model_config(install_root)
    old_filename = local_cfg.get("filename", "")

    if download_as_zip:
        # Extract .gguf from ZIP; extraction goes straight into models_dir.
        log.info(f"Extracting {active_filename} from ZIP...")
        try:
            _safe_extract_zip(tmp_dl, models_dir)
            if not dest_file.exists():
                # Try to find it inside a subdirectory
                found = next(models_dir.rglob(active_filename), None)
                if found is None:
                    raise RuntimeError(f"{active_filename} not found inside downloaded ZIP")
                found.replace(dest_file)
        finally:
            # The ZIP is no longer needed whether extraction worked or not.
            tmp_dl.unlink(missing_ok=True)
    else:
        # Atomically move the verified download over any existing file.
        tmp_dl.replace(dest_file)

    # Remove the old model file only now that the new one is in place.
    if old_filename and old_filename != active_filename:
        old_file = models_dir / old_filename
        if old_file.exists():
            try:
                old_file.unlink()
                log.info(f"Removed old model: {old_filename}")
            except Exception as e:
                log.warning(f"Could not remove old model {old_filename}: {e}")

    _write_model_config(install_root, {
        "filename":     active_filename,
        "downloadDate": datetime.now().strftime("%Y-%m-%d"),
    })

    if progress_cb:
        progress_cb("downloaded", 1, 1, active_filename)

    log.info(f"Model {active_filename} downloaded successfully")
    return {"ok": True, "filename": active_filename}

 







Login to like - 0 Likes



Comments...


No Comments Yet...



Add Comment...



shumin

A biotechnology engineering graduate, now working as a software engineer.


Latest Posts



Footer with Icons