Skip to content

Python API

Auto-generated reference for the public Python modules in the hub package.

When to use this

This reference is useful if you are extending the hub itself, writing tests, or scripting against the hub's internals. For most app development, the CLI and Hub API are sufficient.


hub.config

Configuration loading and directory resolution. The HubConfig dataclass holds all runtime settings; helper functions resolve the directories used by the hub at startup (sockets, logs, apps), creating them if they do not exist.

hub.config

Hub configuration loading and directory resolution.

HubConfig dataclass

Source code in hub/config.py
@dataclass
class HubConfig:
    """Runtime settings for the hub.

    Every field has a default, so ``HubConfig()`` with no arguments is a
    complete configuration.
    """

    # Host value; defaults to the IPv4 loopback address.
    host: str = "127.0.0.1"
    # Port number.
    port: int = 9100
    # Selects where sockets live: "xdg" puts them under the XDG data home,
    # any other value puts them under the project root.
    socket_mode: str = "xdg"  # "xdg" or "local"
    # Log level name, stored as a plain string.
    log_level: str = "info"
    # Explicit log directory; None means the default location is used.
    log_dir: Optional[str] = None

load_config(path=None)

Load hub configuration from a TOML file.

Defaults to the user config at $XDG_CONFIG_HOME/squareberg/config.toml. Falls back to built-in defaults if the file does not exist.

Source code in hub/config.py
def load_config(path: Path | None = None) -> HubConfig:
    """Build a ``HubConfig`` from a TOML file.

    When *path* is omitted, the user config at
    ``$XDG_CONFIG_HOME/squareberg/config.toml`` is used. A missing file is
    not an error: the built-in defaults are returned instead.

    Settings are read from the ``hub`` table and its ``sockets`` and
    ``logging`` sub-tables; any key that is absent takes its default.
    """
    config_file = get_config_path() if path is None else path

    if not config_file.exists():
        logger.debug("Config file not found at %s; using defaults.", config_file)
        return HubConfig()

    logger.debug("Loading config from %s", config_file)
    hub_table = _read_toml(config_file).get("hub", {})
    socket_table = hub_table.get("sockets", {})
    logging_table = hub_table.get("logging", {})

    return HubConfig(
        host=hub_table.get("host", "127.0.0.1"),
        port=hub_table.get("port", 9100),
        socket_mode=socket_table.get("mode", "xdg"),
        log_level=logging_table.get("level", "info"),
        # A missing or empty ``dir`` value is normalised to None.
        log_dir=logging_table.get("dir", "") or None,
    )

get_socket_dir(config=None)

Resolve the socket directory and create it if needed.

Source code in hub/config.py
def get_socket_dir(config: HubConfig | None = None) -> Path:
    """Return the directory that holds app sockets, creating it if missing.

    With no config, or with ``socket_mode == "xdg"``, the directory is
    ``<XDG data home>/squareberg/sockets``. Any other mode places it at
    ``<project root>/sockets``.
    """
    use_xdg = (config.socket_mode if config else "xdg") == "xdg"
    parent = _xdg_data_home() / "squareberg" if use_xdg else _project_root()

    target = parent / "sockets"
    target.mkdir(parents=True, exist_ok=True)
    return target

get_log_dir(config=None)

Resolve the log directory and create it if needed.

Source code in hub/config.py
def get_log_dir(config: HubConfig | None = None) -> Path:
    """Return the directory that holds log files, creating it if missing.

    A non-empty ``config.log_dir`` takes precedence; otherwise the directory
    is ``<XDG data home>/squareberg/logs``.
    """
    configured = config.log_dir if config else None
    target = Path(configured) if configured else _xdg_data_home() / "squareberg" / "logs"

    target.mkdir(parents=True, exist_ok=True)
    return target

get_apps_dir()

Return the apps directory under XDG_DATA_HOME, creating it if needed.

Source code in hub/config.py
def get_apps_dir() -> Path:
    """Return ``<XDG data home>/squareberg/apps``, creating it if missing."""
    target = _xdg_data_home().joinpath("squareberg", "apps")
    target.mkdir(parents=True, exist_ok=True)
    return target

hub.registry

App discovery and runtime status tracking. The Registry class scans the filesystem for manifests and maintains an in-memory map of app names to AppInfo objects.

hub.registry

App registry — discovers and tracks installed Squareberg apps.

AppInfo dataclass

Source code in hub/registry.py
@dataclass
class AppInfo:
    """Metadata and runtime status for one registered app."""

    name: str
    display_name: str
    description: str
    version: str
    status: str = "stopped"
    socket_path: Optional[str] = None
    frontend_dist_path: Optional[str] = None
    manifest_path: Optional[str] = None
    backend_module: Optional[str] = None
    app_dir: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the app's fields as a plain dict.

        ``app_dir`` is not part of the returned mapping; every other field
        is included, in declaration order.
        """
        exported = (
            "name",
            "display_name",
            "description",
            "version",
            "status",
            "socket_path",
            "frontend_dist_path",
            "manifest_path",
            "backend_module",
        )
        return {key: getattr(self, key) for key in exported}

Registry

Discovers apps from the filesystem and tracks their runtime status.

Source code in hub/registry.py
class Registry:
    """Discovers apps from the filesystem and tracks their runtime status.

    Apps are kept in an in-memory mapping from app name to ``AppInfo``. The
    mapping is rebuilt from scratch on every ``scan()``.
    """

    def __init__(self, apps_dir: Path, socket_dir: Path) -> None:
        """Remember where to look for apps and where their sockets live."""
        self._apps_dir = apps_dir
        self._socket_dir = socket_dir
        self._apps: dict[str, AppInfo] = {}

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------

    def scan(self) -> None:
        """Forget all known apps and re-discover them under apps_dir."""
        self._apps.clear()

        # A missing apps directory simply yields an empty registry.
        if self._apps_dir.is_dir():
            self._scan_directory(self._apps_dir)

        found = self._apps
        logger.info(
            "Registry scan complete: %d app(s) found — %s",
            len(found),
            ", ".join(found) or "(none)",
        )

    def _scan_directory(self, directory: Path) -> None:
        """Register every child directory that has .squareberg/manifest.toml.

        Children are visited in sorted order, so the first of two apps with
        the same name is the one that gets registered.
        """
        for entry in sorted(directory.iterdir()):
            manifest_file = entry / ".squareberg" / "manifest.toml"
            if entry.is_dir() and manifest_file.is_file():
                self._load_manifest(manifest_file, entry)

    @staticmethod
    def _resolve_frontend_dist(frontend_section: dict, app_dir: Path) -> str | None:
        """Pick the directory to serve for the first active frontend.

        A ``dist/`` subfolder is preferred (build-based frontends). Failing
        that, the frontend root itself is used when it directly contains an
        ``index.html`` (simple / pre-built frontends). Returns None when no
        frontend is active or neither location exists.
        """
        active = frontend_section.get("active", [])
        if not active:
            return None

        chosen = active[0]
        settings = frontend_section.get(chosen, {})
        root = app_dir / settings.get("path", f"frontend/{chosen}")

        built = root / "dist"
        if built.is_dir():
            return str(built)
        if (root / "index.html").is_file():
            return str(root)
        return None

    def _load_manifest(self, manifest_path: Path, app_dir: Path) -> None:
        """Parse one manifest.toml and register the app it describes.

        A manifest that cannot be read or parsed is logged and skipped. An
        app whose name is already registered is logged and skipped as well.
        """
        try:
            with open(manifest_path, "rb") as fh:
                data = tomllib.load(fh)
        except Exception:
            logger.warning("Failed to parse manifest: %s", manifest_path, exc_info=True)
            return

        app_section = data.get("app", {})
        backend_section = data.get("backend", {})
        frontend_section = data.get("frontend", {})

        # The directory name stands in when the manifest gives no app name.
        name = app_section.get("name", app_dir.name)
        frontend_dist = self._resolve_frontend_dist(frontend_section, app_dir)
        socket_path = str(self._socket_dir / f"{name}.sock")

        info = AppInfo(
            name=name,
            display_name=app_section.get("display_name", name),
            description=app_section.get("description", ""),
            version=app_section.get("version", "0.0.0"),
            status="stopped",
            socket_path=socket_path,
            frontend_dist_path=frontend_dist,
            manifest_path=str(manifest_path),
            backend_module=backend_section.get("module", "app:app"),
            app_dir=str(app_dir),
        )

        if name in self._apps:
            logger.warning("Duplicate app name '%s'; skipping %s", name, app_dir)
            return

        self._apps[name] = info
        logger.debug("Registered app: %s (%s)", name, app_dir)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get(self, name: str) -> AppInfo | None:
        """Return the app registered under *name*, or None."""
        return self._apps.get(name)

    def list(self) -> list[AppInfo]:
        """Return a new list of all registered apps, in registration order."""
        return [*self._apps.values()]

    def remove(self, name: str) -> bool:
        """Remove an app from the registry. Returns True if it was present."""
        if name not in self._apps:
            return False
        del self._apps[name]
        logger.info("Unregistered app: %s", name)
        return True

    def set_status(self, name: str, status: str) -> None:
        """Update the status of a registered app; unknown names are ignored."""
        if name not in self._apps:
            return
        self._apps[name].status = status

remove(name)

Remove an app from the registry. Returns True if it was present.

Source code in hub/registry.py
def remove(self, name: str) -> bool:
    """Drop *name* from the registry.

    Returns True when the app was registered and has been removed, False
    when there was nothing to remove.
    """
    if name not in self._apps:
        return False

    del self._apps[name]
    logger.info("Unregistered app: %s", name)
    return True

scan()

Scan apps_dir for installed apps.

Source code in hub/registry.py
def scan(self) -> None:
    """Forget all known apps and re-discover them under apps_dir."""
    self._apps.clear()

    # A missing apps directory simply yields an empty registry.
    if self._apps_dir.is_dir():
        self._scan_directory(self._apps_dir)

    found = self._apps
    logger.info(
        "Registry scan complete: %d app(s) found — %s",
        len(found),
        ", ".join(found) or "(none)",
    )

hub.manager

Process lifecycle management. ProcessManager spawns uvicorn subprocesses for apps, waits for their sockets to appear, and handles graceful shutdown.

hub.manager

Process manager — spawns, monitors, and stops app subprocesses.

ProcessManager

Manage app subprocesses — start, stop, health-check.

Source code in hub/manager.py
class ProcessManager:
    """Manage app subprocesses — start, stop, health-check.

    Each app runs as a uvicorn subprocess bound to a Unix socket. Spawned
    processes are tracked by app name, and status changes are written back
    to the registry through ``set_status``.
    """

    def __init__(self, registry: Registry, socket_dir: Path, log_dir: Path) -> None:
        """Store collaborators; no process is started here.

        Args:
            registry: Source of app metadata and sink for status updates.
            socket_dir: Kept on the instance. The socket paths this class
                actually uses come from each app's ``socket_path``.
            log_dir: Directory that receives one ``<name>.log`` per app.
        """
        self._registry = registry
        self._socket_dir = socket_dir
        self._log_dir = log_dir
        self._processes: dict[str, asyncio.subprocess.Process] = {}

    # ------------------------------------------------------------------
    # Start
    # ------------------------------------------------------------------

    async def start_app(self, name: str) -> None:
        """Start an app by name.

        Resolves the app's venv Python, spawns uvicorn on a Unix socket,
        and waits for the socket file to appear. Calling this for an app
        whose process is still alive is a no-op.

        Raises:
            ValueError: *name* is not in the registry.
            FileNotFoundError: No venv Python was found for the app.
            RuntimeError: The process exited before its socket appeared.
            TimeoutError: The socket did not appear within the poll timeout;
                the process is killed before this is raised.
        """
        if name in self._processes:
            proc = self._processes[name]
            if proc.returncode is None:
                logger.info("App '%s' is already running (pid %d).", name, proc.pid)
                return

        info = self._registry.get(name)
        if info is None:
            raise ValueError(f"Unknown app: {name}")

        app_dir = Path(info.app_dir)  # type: ignore[arg-type]
        python = self._resolve_python(app_dir)
        if python is None:
            raise FileNotFoundError(
                f"No Python venv found for app '{name}'. "
                f"Looked in {app_dir / '.venv'} and {app_dir / 'backend' / '.venv'}."
            )

        socket_path = Path(info.socket_path)  # type: ignore[arg-type]

        # Remove stale socket if present, so that the poll below only sees a
        # socket created by the new process.
        if socket_path.exists():
            socket_path.unlink()

        module = info.backend_module or "app:app"
        backend_dir = app_dir / "backend"
        if not backend_dir.is_dir():
            backend_dir = app_dir  # fallback: app root is the backend

        cmd = [
            str(python), "-m", "uvicorn",
            module,
            "--uds", str(socket_path),
            "--app-dir", str(backend_dir),
            "--log-level", "info",
        ]

        log_file_path = self._log_dir / f"{name}.log"

        # The child receives its own copy of the log descriptor when it is
        # spawned, so the hub's handle is closed as soon as the spawn returns
        # (or fails) instead of being leaked on every start.
        with open(log_file_path, "ab") as log_fh:
            logger.info("Starting app '%s': %s", name, " ".join(cmd))

            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=log_fh,
                stderr=log_fh,
                cwd=str(backend_dir),
            )
        self._processes[name] = proc

        # Wait for the socket file to appear.
        elapsed = 0.0
        while elapsed < _SOCKET_POLL_TIMEOUT:
            if proc.returncode is not None:
                self._registry.set_status(name, "error")
                raise RuntimeError(
                    f"App '{name}' exited immediately with code {proc.returncode}. "
                    f"Check logs at {log_file_path}"
                )
            if socket_path.exists():
                break
            await asyncio.sleep(_SOCKET_POLL_INTERVAL)
            elapsed += _SOCKET_POLL_INTERVAL
        else:
            # Timed out — kill the process.
            await self._kill(proc)
            self._registry.set_status(name, "error")
            raise TimeoutError(
                f"App '{name}' did not create socket within {_SOCKET_POLL_TIMEOUT}s. "
                f"Check logs at {log_file_path}"
            )

        self._registry.set_status(name, "running")
        logger.info("App '%s' is running (pid %d).", name, proc.pid)

    # ------------------------------------------------------------------
    # Stop
    # ------------------------------------------------------------------

    async def stop_app(self, name: str) -> None:
        """Stop a running app gracefully, escalating to SIGKILL if needed.

        The process is removed from the tracking table first. If it is not
        tracked, or has already exited, only the status and socket file are
        tidied up.
        """
        proc = self._processes.pop(name, None)
        if proc is None or proc.returncode is not None:
            self._registry.set_status(name, "stopped")
            self._cleanup_socket(name)
            return

        logger.info("Stopping app '%s' (pid %d)...", name, proc.pid)

        # Send SIGTERM; a process that vanished in the meantime is fine.
        try:
            proc.send_signal(signal.SIGTERM)
        except ProcessLookupError:
            pass

        try:
            await asyncio.wait_for(proc.wait(), timeout=_STOP_TIMEOUT)
        except asyncio.TimeoutError:
            logger.warning("App '%s' did not exit in time; sending SIGKILL.", name)
            await self._kill(proc)

        self._cleanup_socket(name)
        self._registry.set_status(name, "stopped")
        logger.info("App '%s' stopped.", name)

    # ------------------------------------------------------------------
    # Health check
    # ------------------------------------------------------------------

    async def health_check(self, name: str) -> bool:
        """Probe an app's /api/health endpoint via its Unix socket.

        Returns True only for an HTTP 200 response. An unknown app, a
        missing socket file, or any error during the request yields False.
        """
        info = self._registry.get(name)
        if info is None or info.socket_path is None:
            return False

        socket_path = Path(info.socket_path)
        if not socket_path.exists():
            return False

        try:
            # Imported here rather than at module level, so httpx is only
            # required once a health check actually runs.
            import httpx

            transport = httpx.AsyncHTTPTransport(uds=str(socket_path))
            async with httpx.AsyncClient(transport=transport) as client:
                resp = await client.get("http://localhost/api/health", timeout=3.0)
                return resp.status_code == 200
        except Exception:
            logger.debug("Health check failed for '%s'.", name, exc_info=True)
            return False

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------

    async def start_all(self) -> None:
        """Start all registered apps, one after another.

        A failure to start one app is logged and does not stop the rest.
        """
        for info in self._registry.list():
            try:
                await self.start_app(info.name)
            except Exception:
                logger.error("Failed to start app '%s'.", info.name, exc_info=True)

    async def stop_all(self) -> None:
        """Stop all tracked apps, one after another.

        A failure to stop one app is logged and does not stop the rest.
        """
        # Snapshot the names: stop_app removes entries while we iterate.
        names = list(self._processes.keys())
        for name in names:
            try:
                await self.stop_app(name)
            except Exception:
                logger.error("Failed to stop app '%s'.", name, exc_info=True)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_python(app_dir: Path) -> Path | None:
        """Find the venv Python binary for an app.

        Checks two locations, in order, and returns the first that is a file:
          1. <app_dir>/.venv/bin/python
          2. <app_dir>/backend/.venv/bin/python

        Returns None when neither exists.
        """
        for candidate in (
            app_dir / ".venv" / "bin" / "python",
            app_dir / "backend" / ".venv" / "bin" / "python",
        ):
            if candidate.is_file():
                return candidate
        return None

    @staticmethod
    async def _kill(proc: asyncio.subprocess.Process) -> None:
        """Kill *proc* and wait for it to exit; an already-gone process is ignored."""
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        await proc.wait()

    def _cleanup_socket(self, name: str) -> None:
        """Delete the app's socket file if it exists (best effort)."""
        info = self._registry.get(name)
        if info and info.socket_path:
            sock = Path(info.socket_path)
            if sock.exists():
                try:
                    sock.unlink()
                except OSError:
                    # Best effort: a leftover socket is removed on next start.
                    pass

health_check(name) async

Probe an app's /api/health endpoint via its Unix socket.

Source code in hub/manager.py
async def health_check(self, name: str) -> bool:
    """Report whether the app answers ``/api/health`` on its Unix socket.

    Returns True only for an HTTP 200 response. An unknown app, a missing
    socket file, or any error during the request yields False.
    """
    info = self._registry.get(name)
    socket_location = None if info is None else info.socket_path
    if socket_location is None:
        return False

    socket_path = Path(socket_location)
    if not socket_path.exists():
        return False

    try:
        import httpx

        async with httpx.AsyncClient(
            transport=httpx.AsyncHTTPTransport(uds=str(socket_path))
        ) as client:
            reply = await client.get("http://localhost/api/health", timeout=3.0)
            return reply.status_code == 200
    except Exception:
        logger.debug("Health check failed for '%s'.", name, exc_info=True)
        return False

start_all() async

Start all registered apps.

Source code in hub/manager.py
async def start_all(self) -> None:
    """Start every registered app, one after another.

    A failure to start one app is logged and does not stop the rest.
    """
    registered = self._registry.list()
    for entry in registered:
        try:
            await self.start_app(entry.name)
        except Exception:
            logger.error("Failed to start app '%s'.", entry.name, exc_info=True)

start_app(name) async

Start an app by name.

Resolves the app's venv Python, spawns uvicorn on a Unix socket, and waits for the socket file to appear.

Source code in hub/manager.py
async def start_app(self, name: str) -> None:
    """Start an app by name.

    Resolves the app's venv Python, spawns uvicorn on a Unix socket,
    and waits for the socket file to appear. Calling this for an app
    whose process is still alive is a no-op.

    Raises:
        ValueError: *name* is not in the registry.
        FileNotFoundError: No venv Python was found for the app.
        RuntimeError: The process exited before its socket appeared.
        TimeoutError: The socket did not appear within the poll timeout;
            the process is killed before this is raised.
    """
    if name in self._processes:
        proc = self._processes[name]
        if proc.returncode is None:
            logger.info("App '%s' is already running (pid %d).", name, proc.pid)
            return

    info = self._registry.get(name)
    if info is None:
        raise ValueError(f"Unknown app: {name}")

    app_dir = Path(info.app_dir)  # type: ignore[arg-type]
    python = self._resolve_python(app_dir)
    if python is None:
        raise FileNotFoundError(
            f"No Python venv found for app '{name}'. "
            f"Looked in {app_dir / '.venv'} and {app_dir / 'backend' / '.venv'}."
        )

    socket_path = Path(info.socket_path)  # type: ignore[arg-type]

    # Remove stale socket if present, so that the poll below only sees a
    # socket created by the new process.
    if socket_path.exists():
        socket_path.unlink()

    module = info.backend_module or "app:app"
    backend_dir = app_dir / "backend"
    if not backend_dir.is_dir():
        backend_dir = app_dir  # fallback: app root is the backend

    cmd = [
        str(python), "-m", "uvicorn",
        module,
        "--uds", str(socket_path),
        "--app-dir", str(backend_dir),
        "--log-level", "info",
    ]

    log_file_path = self._log_dir / f"{name}.log"

    # The child receives its own copy of the log descriptor when it is
    # spawned, so the hub's handle is closed as soon as the spawn returns
    # (or fails) instead of being leaked on every start.
    with open(log_file_path, "ab") as log_fh:
        logger.info("Starting app '%s': %s", name, " ".join(cmd))

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=log_fh,
            stderr=log_fh,
            cwd=str(backend_dir),
        )
    self._processes[name] = proc

    # Wait for the socket file to appear.
    elapsed = 0.0
    while elapsed < _SOCKET_POLL_TIMEOUT:
        if proc.returncode is not None:
            self._registry.set_status(name, "error")
            raise RuntimeError(
                f"App '{name}' exited immediately with code {proc.returncode}. "
                f"Check logs at {log_file_path}"
            )
        if socket_path.exists():
            break
        await asyncio.sleep(_SOCKET_POLL_INTERVAL)
        elapsed += _SOCKET_POLL_INTERVAL
    else:
        # Timed out — kill the process.
        await self._kill(proc)
        self._registry.set_status(name, "error")
        raise TimeoutError(
            f"App '{name}' did not create socket within {_SOCKET_POLL_TIMEOUT}s. "
            f"Check logs at {log_file_path}"
        )

    self._registry.set_status(name, "running")
    logger.info("App '%s' is running (pid %d).", name, proc.pid)

stop_all() async

Stop all running apps.

Source code in hub/manager.py
async def stop_all(self) -> None:
    """Stop every tracked app, one after another.

    A failure to stop one app is logged and does not stop the rest.
    """
    # Iterate over a snapshot: stop_app removes entries as it goes.
    for app_name in tuple(self._processes):
        try:
            await self.stop_app(app_name)
        except Exception:
            logger.error("Failed to stop app '%s'.", app_name, exc_info=True)

stop_app(name) async

Stop a running app gracefully, escalating to SIGKILL if needed.

Source code in hub/manager.py
async def stop_app(self, name: str) -> None:
    """Stop an app: SIGTERM first, SIGKILL if it does not exit in time.

    The process is removed from the tracking table first. If it is not
    tracked, or has already exited, only the status and socket file are
    tidied up.
    """
    proc = self._processes.pop(name, None)
    still_running = proc is not None and proc.returncode is None

    if not still_running:
        self._registry.set_status(name, "stopped")
        self._cleanup_socket(name)
        return

    logger.info("Stopping app '%s' (pid %d)...", name, proc.pid)

    # Ask politely; a process that vanished in the meantime is fine.
    try:
        proc.send_signal(signal.SIGTERM)
    except ProcessLookupError:
        pass

    try:
        await asyncio.wait_for(proc.wait(), timeout=_STOP_TIMEOUT)
    except asyncio.TimeoutError:
        logger.warning("App '%s' did not exit in time; sending SIGKILL.", name)
        await self._kill(proc)

    self._cleanup_socket(name)
    self._registry.set_status(name, "stopped")
    logger.info("App '%s' stopped.", name)