Marketplace Lifecycle Execution — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Make marketplace module install/enable/disable/retry/uninstall lifecycle production-real with plugin hook orchestration and rollback.

Architecture: Finite state machine on installed_modules.install_state with transitions enforced by ModuleLifecycleService. Plugin hooks (configure, install, activate, deactivate, uninstall, health_check) called at transition boundaries. Rollback on install failure, best-effort on uninstall.

Tech Stack: Python 3.12, FastAPI, asyncpg (via SQLAlchemy async), Pydantic v2, pytest + pytest-asyncio

Spec: docs/superpowers/specs/2026-03-28-marketplace-lifecycle-design.md


File Structure

| File | Responsibility |
| --- | --- |
| backend/db/postgres/V12__installed_modules_retry_count.sql | Create. Add retry_count column and extend the install_state CHECK constraint with removing/removed. |
| backend/app/modules/marketplace/lifecycle.py | Create. ModuleLifecycleService: owns all state transitions and plugin hook calls. |
| backend/app/modules/marketplace/schemas.py | Modify. Add action field to update request, add HealthStatusOut, add retry_count to output, add removing/removed to InstallState. |
| backend/app/modules/marketplace/repository.py | Modify. Add update_state_and_error() (with an increment_retry flag), clear_error_for_retry(), and get_retry_count(); change uninstall_module() to soft-delete. |
| backend/app/modules/marketplace/dependencies.py | Modify. Add get_lifecycle_service() factory. |
| backend/app/modules/marketplace/router.py | Modify. Wire action-based PATCH to lifecycle service, add health endpoint, wire DELETE to lifecycle. |
| backend/app/modules/marketplace/service.py | Modify. Remove install/update/uninstall logic, delegate to lifecycle service. |
| backend/tests/modules/test_marketplace_lifecycle.py | Create. Unit tests for state machine transitions, rollback, retry. |
| backend/tests/modules/test_marketplace.py | Modify. Update for new PATCH schema. |

Task 1: Database Migration — Add retry_count Column

Files:
- Create: backend/db/postgres/V12__installed_modules_retry_count.sql

  • [ ] Step 1: Write the migration
-- V12: Add retry_count to installed_modules for lifecycle retry tracking.
-- Also add 'removing' and 'removed' to the install_state CHECK constraint.

ALTER TABLE installed_modules DROP CONSTRAINT IF EXISTS installed_modules_install_state_check;

ALTER TABLE installed_modules
    ADD CONSTRAINT installed_modules_install_state_check
    CHECK (install_state IN ('installed', 'disabled', 'provisioning', 'error', 'removing', 'removed'));

ALTER TABLE installed_modules
    ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0;
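  • [ ] Step 2: Verify the migration file exists and is readable

Run: python -c "open('backend/db/postgres/V12__installed_modules_retry_count.sql').read()"
Expected: No error. This confirms only that the file is present and readable; the SQL itself is validated when the migration is applied to a database.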
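
Reading the file proves only that it exists. A slightly stronger pre-flight check, sketched here on the assumption that the migration text matches Step 1, looks for the statements that later tasks depend on. The names `missing_fragments`, `normalize`, and `REQUIRED_FRAGMENTS` are hypothetical helpers for illustration and are not part of this plan; the check looks for text only and does not validate SQL syntax.

```python
"""Sketch: confirm a migration's text mentions the expected changes.

All names here are illustrative assumptions, not part of the plan.
This checks for the presence of text only; it does not parse SQL.
"""
import re

# Fragments the later tasks rely on (lower-case, single-spaced).
REQUIRED_FRAGMENTS = (
    "add column if not exists retry_count",
    "'removing'",
    "'removed'",
)


def normalize(sql: str) -> str:
    """Lower-case the SQL and collapse whitespace runs to single spaces."""
    return re.sub(r"\s+", " ", sql.lower()).strip()


def missing_fragments(sql: str) -> list[str]:
    """Return the required fragments that do not appear in the SQL text."""
    text = normalize(sql)
    return [frag for frag in REQUIRED_FRAGMENTS if frag not in text]


# A copy of the statements from Step 1, used as sample input.
SAMPLE = """
ALTER TABLE installed_modules
    ADD CONSTRAINT installed_modules_install_state_check
    CHECK (install_state IN ('installed', 'disabled', 'provisioning', 'error', 'removing', 'removed'));

ALTER TABLE installed_modules
    ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0;
"""

if __name__ == "__main__":
    print(missing_fragments(SAMPLE))
```

To check the real file, pass its contents to `missing_fragments` instead of `SAMPLE`; an empty list means every expected fragment was found.
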

  • [ ] Step 3: Commit
git add backend/db/postgres/V12__installed_modules_retry_count.sql
git commit -m "feat(db): add retry_count column and extended install_state constraint"

Task 2: Schema Updates — Action-Based PATCH + HealthStatusOut

Files:
- Modify: backend/app/modules/marketplace/schemas.py

  • [ ] Step 1: Update InstallState to include new states

In backend/app/modules/marketplace/schemas.py, change line 11:

# before
InstallState = Literal["installed", "disabled", "provisioning", "error"]

# after
InstallState = Literal["installed", "disabled", "provisioning", "error", "removing", "removed"]
  • [ ] Step 2: Add retry_count to InstalledModuleOut

In backend/app/modules/marketplace/schemas.py, add after line 82 (last_error: str | None = None):

    retry_count: int = 0
  • [ ] Step 3: Replace UpdateInstalledModuleRequest with action-based schema

In backend/app/modules/marketplace/schemas.py, replace lines 98-100:

# before
class UpdateInstalledModuleRequest(BaseModel):
    install_state: InstallState | None = None
    channel: Literal["stable", "candidate"] | None = None

# after
LifecycleAction = Literal["enable", "disable", "retry"]


class UpdateInstalledModuleRequest(BaseModel):
    action: LifecycleAction | None = None
    channel: Literal["stable", "candidate"] | None = None
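
With the action-based schema, the client names an intent and the server decides the resulting state. The sketch below shows one way that mapping could look, using the transitions this plan defines (disabled to installed, installed to disabled, error to provisioning). `ACTION_TRANSITIONS` and `resolve_action` are hypothetical names for illustration; in the plan itself this logic lives in ModuleLifecycleService.

```python
"""Sketch: map a PATCH action to the state it requires and the state it targets.

ACTION_TRANSITIONS and resolve_action are illustrative assumptions; the plan's
real enforcement happens inside ModuleLifecycleService.
"""
from typing import Literal, get_args

LifecycleAction = Literal["enable", "disable", "retry"]

# action -> (state the module must currently be in, state the action targets)
ACTION_TRANSITIONS: dict[str, tuple[str, str]] = {
    "enable": ("disabled", "installed"),
    "disable": ("installed", "disabled"),
    "retry": ("error", "provisioning"),
}


def resolve_action(action: str, current_state: str) -> str:
    """Return the target state for an action, or raise ValueError if not allowed."""
    if action not in get_args(LifecycleAction):
        raise ValueError(f"Unknown action: {action!r}")
    required, target = ACTION_TRANSITIONS[action]
    if current_state != required:
        raise ValueError(
            f"Action {action!r} requires state {required!r}, got {current_state!r}"
        )
    return target


if __name__ == "__main__":
    print(resolve_action("enable", "disabled"))
```

Because clients can no longer send install_state directly, states such as provisioning, removing, and removed can only be reached through server-side code paths.
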
  • [ ] Step 4: Add HealthStatusOut schema

Add at the end of backend/app/modules/marketplace/schemas.py:

class HealthStatusOut(BaseModel):
    status: Literal["healthy", "degraded", "unhealthy"]
    detail: str = ""
  • [ ] Step 5: Run syntax check

Run: cd backend && python -m py_compile app/modules/marketplace/schemas.py
Expected: No output (success). py_compile catches syntax errors only; it does not check types.

  • [ ] Step 6: Commit
git add backend/app/modules/marketplace/schemas.py
git commit -m "feat(schemas): action-based PATCH, HealthStatusOut, retry_count, extended InstallState"

Task 3: Repository Updates — State Helpers + Soft-Delete

Files:
- Modify: backend/app/modules/marketplace/repository.py

  • [ ] Step 1: Add state helper methods: update_state_and_error(), clear_error_for_retry(), get_retry_count()

Add to MarketplaceRepository class in backend/app/modules/marketplace/repository.py, after update_installed_state (around line 427):

    async def update_state_and_error(
        self,
        *,
        org_id: str,
        installed_module_id: UUID,
        install_state: str,
        last_error: str | None = None,
        increment_retry: bool = False,
    ) -> None:
        sets = [
            "install_state = :install_state",
            "last_error = :last_error",
            "updated_at = now()",
        ]
        params: dict[str, Any] = {
            "org_id": org_id,
            "installed_module_id": installed_module_id,
            "install_state": install_state,
            "last_error": last_error,
        }
        if increment_retry:
            sets.append("retry_count = retry_count + 1")

        query = f"""
            UPDATE installed_modules
            SET {', '.join(sets)}
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = :installed_module_id
        """
        await self._execute(query, params)

    async def clear_error_for_retry(
        self,
        *,
        org_id: str,
        installed_module_id: UUID,
    ) -> None:
        query = """
            UPDATE installed_modules
            SET install_state = 'provisioning',
                last_error = NULL,
                updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = :installed_module_id
        """
        await self._execute(
            query,
            {"org_id": org_id, "installed_module_id": installed_module_id},
        )

    async def get_retry_count(self, *, org_id: str, installed_module_id: UUID) -> int:
        query = """
            SELECT retry_count FROM installed_modules
            WHERE org_id = CAST(:org_id AS uuid) AND id = :installed_module_id
        """
        row = await self._fetch_one(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        return row["retry_count"] if row else 0
  • [ ] Step 2: Change uninstall_module to soft-delete

In backend/app/modules/marketplace/repository.py, replace the uninstall_module method (lines 653-666):

    async def uninstall_module(self, *, org_id: str, installation_id: str) -> bool:
        query = """
            UPDATE installed_modules
            SET install_state = 'removed',
                updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = CAST(:installation_id AS uuid)
        """
        result = await self._execute(
            query,
            {
                "org_id": org_id,
                "installation_id": installation_id,
            },
        )
        return bool(result.rowcount and result.rowcount > 0)
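
The soft-delete keeps the row and only flips install_state, while the return value still reports whether a row matched. The sketch below reproduces that behavior with the standard-library sqlite3 module as a stand-in for Postgres. The three-column table, `make_db`, and `soft_delete` are simplifications assumed for illustration; the CAST to uuid is omitted because SQLite has no uuid type.

```python
"""Sketch: soft-delete semantics, using sqlite3 as a stand-in for Postgres.

The reduced table and the helper names are illustrative assumptions.
"""
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory table with one installed module."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE installed_modules ("
        " id TEXT PRIMARY KEY,"
        " org_id TEXT NOT NULL,"
        " install_state TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT INTO installed_modules (id, org_id, install_state)"
        " VALUES ('m1', 'org1', 'installed')"
    )
    return conn


def soft_delete(conn: sqlite3.Connection, org_id: str, installation_id: str) -> bool:
    """Mark the row as removed; return True if a row matched."""
    cur = conn.execute(
        "UPDATE installed_modules"
        " SET install_state = 'removed'"
        " WHERE org_id = :org_id AND id = :installation_id",
        {"org_id": org_id, "installation_id": installation_id},
    )
    return cur.rowcount > 0


if __name__ == "__main__":
    db = make_db()
    print(soft_delete(db, "org1", "m1"))
    print(db.execute("SELECT id, install_state FROM installed_modules").fetchall())
```

Note that the UPDATE matches a row that is already removed, so a repeated uninstall also returns True. If callers need to tell the two cases apart, one option is to add a condition such as `AND install_state <> 'removed'` to the WHERE clause.
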
  • [ ] Step 3: Run syntax check

Run: cd backend && python -m py_compile app/modules/marketplace/repository.py
Expected: No output (success). py_compile catches syntax errors only; it does not check types.

  • [ ] Step 4: Commit
git add backend/app/modules/marketplace/repository.py
git commit -m "feat(repo): state+error update helpers, soft-delete uninstall, retry support"

Task 4: Lifecycle Service — State Machine + Hook Orchestration

Files:
- Create: backend/app/modules/marketplace/lifecycle.py
- Create: backend/tests/modules/test_marketplace_lifecycle.py

This is the core task. TDD: write tests first, then implement.

  • [ ] Step 1: Write unit tests for the lifecycle service

Create backend/tests/modules/test_marketplace_lifecycle.py:

"""Unit tests for ModuleLifecycleService state machine."""
from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4

import pytest

from app.modules.marketplace.lifecycle import ModuleLifecycleService, InvalidTransitionError

MODULE_KEY = "connector.github"
ORG_ID = str(uuid4())
INSTALL_ID = uuid4()
MODULE_ID = uuid4()
USER_PROFILE_ID = uuid4()


def _make_installed_row(state: str, retry_count: int = 0, module_type: str = "connector") -> dict:
    return {
        "id": INSTALL_ID,
        "org_id": ORG_ID,
        "module_id": MODULE_ID,
        "module_key": MODULE_KEY,
        "module_name": "GitHub Data Connector",
        "module_version": "1.0.0",
        "module_publisher": "Substrate Labs",
        "module_type": module_type,
        "billing_mode": "free",
        "install_state": state,
        "channel": "stable",
        "artifact_ref": "registry.substrate.io/connectors/github@sha256:abc",
        "artifact_digest": "sha256:abc",
        "configured_json": {},
        "verification_json": {},
        "verified_at": None,
        "last_error": None,
        "retry_count": retry_count,
        "created_at": "2026-03-28T00:00:00",
        "updated_at": "2026-03-28T00:00:00",
    }


def _make_service() -> tuple[ModuleLifecycleService, AsyncMock, AsyncMock, AsyncMock, MagicMock]:
    repo = AsyncMock()
    connector_repo = AsyncMock()
    policy_runtime_svc = AsyncMock()
    plugin_registry = MagicMock()
    svc = ModuleLifecycleService(repo, connector_repo, policy_runtime_svc, plugin_registry)
    return svc, repo, connector_repo, policy_runtime_svc, plugin_registry


# --- Transition validation ---

@pytest.mark.asyncio
async def test_enable_from_disabled():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("disabled")
    repo.get_installed_by_id.return_value = row
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin

    updated_row = {**row, "install_state": "installed"}
    repo.get_installed_by_id.side_effect = [row, updated_row]

    result = await svc.enable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "installed"
    connector_repo.upsert_instance_config.assert_called_once()


@pytest.mark.asyncio
async def test_disable_from_installed():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin

    updated_row = {**row, "install_state": "disabled"}
    repo.get_installed_by_id.side_effect = [row, updated_row]

    result = await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "disabled"


@pytest.mark.asyncio
async def test_enable_from_installed_raises():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row

    with pytest.raises(InvalidTransitionError):
        await svc.enable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


@pytest.mark.asyncio
async def test_disable_from_error_raises():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("error")
    repo.get_installed_by_id.return_value = row

    with pytest.raises(InvalidTransitionError):
        await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


# --- Install rollback ---

@pytest.mark.asyncio
async def test_install_rollback_on_plugin_configure_failure():
    svc, repo, connector_repo, _, plugin_registry = _make_service()

    plugin = AsyncMock()
    plugin.configure.side_effect = RuntimeError("bad config")
    plugin_registry.get_or_create_instance.return_value = plugin

    connector_repo.ensure_instance_for_install.return_value = {"id": uuid4()}

    row_provisioning = _make_installed_row("provisioning")
    row_error = {**row_provisioning, "install_state": "error", "last_error": "bad config"}
    repo.get_installed_by_id.side_effect = [row_error]

    result = await svc.run_install(
        installed_module_id=INSTALL_ID,
        org_id=ORG_ID,
        user_profile_id=USER_PROFILE_ID,
        module_key=MODULE_KEY,
        module_type="connector",
        config={}
    )
    assert result["install_state"] == "error"
    assert result["last_error"] == "bad config"
    repo.update_state_and_error.assert_called()
    connector_repo.delete_instance.assert_called_once()


# --- Retry ---

@pytest.mark.asyncio
async def test_retry_from_error():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("error", retry_count=1)

    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin
    connector_repo.ensure_instance_for_install.return_value = {"id": uuid4()}

    row_installed = {**row, "install_state": "installed", "retry_count": 2}
    repo.get_installed_by_id.side_effect = [row, row_installed]
    repo.get_retry_count.return_value = 1

    result = await svc.retry(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "installed"
    repo.clear_error_for_retry.assert_called_once()


@pytest.mark.asyncio
async def test_retry_blocked_after_max():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("error", retry_count=3)
    repo.get_installed_by_id.return_value = row
    repo.get_retry_count.return_value = 3

    with pytest.raises(InvalidTransitionError, match="max retries"):
        await svc.retry(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


# --- Uninstall best-effort ---

@pytest.mark.asyncio
async def test_uninstall_best_effort_on_plugin_failure():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row

    plugin = AsyncMock()
    plugin.uninstall.side_effect = RuntimeError("cleanup failed")
    plugin_registry.get_or_create_instance.return_value = plugin

    await svc.uninstall(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    repo.uninstall_module.assert_called_once()
    repo.record_event.assert_called()


# --- Health check ---

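@pytest.mark.asyncio
async def test_health_check_calls_plugin():
    from app.core.plugins.base import HealthStatus

    svc, repo, _, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row

    plugin = AsyncMock()
    # spec=HealthStatus lets the mock pass the service's isinstance() check;
    # a bare MagicMock would fall through to the "no health check" default.
    plugin.health_check.return_value = MagicMock(
        spec=HealthStatus, healthy=True, message="ok", details={}
    )
    plugin_registry.get_or_create_instance.return_value = plugin

    result = await svc.health_check(INSTALL_ID, ORG_ID)
    assert result.status == "healthy"
    assert result.detail == "ok"
    plugin.health_check.assert_awaited_once()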


# --- Policy pack transitions ---

@pytest.mark.asyncio
async def test_disable_policy_pack_calls_deactivate():
    svc, repo, _, policy_svc, plugin_registry = _make_service()
    row = _make_installed_row("installed", module_type="policy_pack")
    updated = {**row, "install_state": "disabled"}
    repo.get_installed_by_id.side_effect = [row, updated]

    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin

    result = await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "disabled"
    policy_svc.deactivate_binding.assert_called_once()
  • [ ] Step 2: Run tests to verify they fail

Run: cd backend && python -m pytest tests/modules/test_marketplace_lifecycle.py -v 2>&1 | head -30
Expected: ModuleNotFoundError: No module named 'app.modules.marketplace.lifecycle'

  • [ ] Step 3: Implement ModuleLifecycleService

Create backend/app/modules/marketplace/lifecycle.py:

"""Marketplace module lifecycle state machine and plugin hook orchestration."""
from __future__ import annotations

import logging
from typing import Any
from uuid import UUID

from app.core.plugins.base import HealthStatus
from app.core.plugins.registry import PluginRegistry
from app.modules.connectors.repository import ConnectorRepository
from app.modules.marketplace.repository import MarketplaceRepository
from app.modules.marketplace.schemas import HealthStatusOut
from app.modules.policy_runtime.service import PolicyRuntimeService

logger = logging.getLogger(__name__)

MAX_RETRIES = 3

# Legal state transitions: {(from, to)}
LEGAL_TRANSITIONS: set[tuple[str, str]] = {
    ("provisioning", "installed"),
    ("provisioning", "error"),
    ("error", "provisioning"),
    # Needed so a module stuck in 'error' can be uninstalled, which is what
    # the max-retries message in retry() tells the user to do.
    ("error", "removing"),
    ("installed", "disabled"),
    ("disabled", "installed"),
    ("installed", "removing"),
    ("disabled", "removing"),
    ("removing", "removed"),
    ("removing", "error"),
}


class InvalidTransitionError(Exception):
    """Raised when a state transition is not allowed."""


class ModuleLifecycleService:
    """Owns all install_state transitions and plugin hook orchestration."""

    def __init__(
        self,
        repo: MarketplaceRepository,
        connector_repo: ConnectorRepository,
        policy_runtime_svc: PolicyRuntimeService,
        plugin_registry: PluginRegistry,
    ) -> None:
        self.repo = repo
        self.connector_repo = connector_repo
        self.policy_runtime = policy_runtime_svc
        self.plugin_registry = plugin_registry

    def _assert_transition(self, current: str, target: str) -> None:
        if (current, target) not in LEGAL_TRANSITIONS:
            raise InvalidTransitionError(
                f"Cannot transition from '{current}' to '{target}'"
            )

    async def _get_row(self, installed_module_id: UUID, org_id: str) -> dict[str, Any]:
        row = await self.repo.get_installed_by_id(org_id, installed_module_id)
        if row is None:
            raise InvalidTransitionError(f"Installed module not found: {installed_module_id}")
        return row

    def _resolve_plugin(self, module_key: str) -> Any:
        """Get or create a plugin instance. Returns None if not in registry."""
        try:
            return self.plugin_registry.get_or_create_instance(module_key)
        except KeyError:
            logger.warning("No plugin registered for %s, skipping hooks", module_key)
            return None

    async def _call_hook(self, plugin: Any, hook_name: str, *args: Any, **kwargs: Any) -> Any:
        """Call a plugin hook if it exists (duck-typing adapter)."""
        method = getattr(plugin, hook_name, None)
        if method is None:
            return None
        return await method(*args, **kwargs)

    # --- Install ---

    async def run_install(
        self,
        *,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
        module_key: str,
        module_type: str,
        config: dict[str, Any],
    ) -> dict[str, Any]:
        """Execute the provisioning → installed transition with plugin hooks."""
        plugin = self._resolve_plugin(module_key)
        resource_id: UUID | None = None

        try:
            if module_type == "connector":
                resource = await self.connector_repo.ensure_instance_for_install(
                    org_id=org_id,
                    installed_module_id=installed_module_id,
                    connector_key=module_key,
                    display_name=module_key,
                )
                resource_id = resource["id"]
                if plugin:
                    await self._call_hook(plugin, "configure", config)
                    await self._call_hook(plugin, "install", config)
                await self.connector_repo.upsert_instance_config(
                    org_id=org_id,
                    connector_key=module_key,
                    config_json=config,
                    enabled=True,
                )

            elif module_type == "policy_pack":
                binding = await self.policy_runtime.activate_pack_on_install(
                    org_id=org_id,
                    installed_module_id=installed_module_id,
                    module_key=module_key,
                    config_json=config,
                )
                resource_id = binding["id"]
                if plugin:
                    await self._call_hook(plugin, "get_policies")
                    await self._call_hook(plugin, "activate", config)

            await self.repo.update_state_and_error(
                org_id=org_id,
                installed_module_id=installed_module_id,
                install_state="installed",
                last_error=None,
            )

            await self.repo.record_event(
                org_id=org_id,
                user_profile_id=user_profile_id,
                event_type="marketplace.install.completed",
                module_id=None,
                payload={"module_key": module_key, "module_type": module_type},
            )

        except Exception as exc:
            logger.exception("Install failed for %s: %s", module_key, exc)
            # Rollback: clean up partially created resources
            await self._rollback_install(org_id, installed_module_id, module_type, resource_id)
            await self.repo.update_state_and_error(
                org_id=org_id,
                installed_module_id=installed_module_id,
                install_state="error",
                last_error=str(exc),
                increment_retry=True,
            )

        return await self._get_row(installed_module_id, org_id)

    async def _rollback_install(
        self,
        org_id: str,
        installed_module_id: UUID,
        module_type: str,
        resource_id: UUID | None,
    ) -> None:
        """Best-effort cleanup of resources created during install."""
        try:
            if module_type == "connector" and resource_id:
                await self.connector_repo.delete_instance(
                    org_id=org_id, instance_id=resource_id
                )
            elif module_type == "policy_pack" and resource_id:
                await self.policy_runtime.delete_binding(
                    org_id=org_id, binding_id=resource_id
                )
        except Exception as cleanup_exc:
            logger.warning(
                "Rollback cleanup failed for %s: %s", installed_module_id, cleanup_exc
            )

    # --- Enable ---

    async def enable(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "installed")
        plugin = self._resolve_plugin(row["module_key"])

        if row["module_type"] == "connector":
            await self.connector_repo.upsert_instance_config(
                org_id=org_id,
                connector_key=row["module_key"],
                config_json=row.get("configured_json", {}),
                enabled=True,
            )
        elif row["module_type"] == "policy_pack":
            if plugin:
                await self._call_hook(plugin, "activate", row.get("configured_json", {}))
            await self.policy_runtime.enable_binding(
                org_id=org_id, installed_module_id=installed_module_id
            )

        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="installed",
            last_error=None,
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.enabled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )
        return await self._get_row(installed_module_id, org_id)

    # --- Disable ---

    async def disable(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "disabled")
        plugin = self._resolve_plugin(row["module_key"])

        if row["module_type"] == "connector":
            await self.connector_repo.upsert_instance_config(
                org_id=org_id,
                connector_key=row["module_key"],
                config_json=row.get("configured_json", {}),
                enabled=False,
            )
        elif row["module_type"] == "policy_pack":
            if plugin:
                await self._call_hook(plugin, "deactivate")
            await self.policy_runtime.deactivate_binding(
                org_id=org_id, installed_module_id=installed_module_id
            )

        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="disabled",
            last_error=None,
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.disabled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )
        return await self._get_row(installed_module_id, org_id)

    # --- Retry ---

    async def retry(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "provisioning")

        retry_count = await self.repo.get_retry_count(
            org_id=org_id, installed_module_id=installed_module_id
        )
        if retry_count >= MAX_RETRIES:
            raise InvalidTransitionError(
                f"Cannot retry: max retries ({MAX_RETRIES}) exceeded. Uninstall and reinstall."
            )

        await self.repo.clear_error_for_retry(
            org_id=org_id, installed_module_id=installed_module_id
        )

        return await self.run_install(
            installed_module_id=installed_module_id,
            org_id=org_id,
            user_profile_id=user_profile_id,
            module_key=row["module_key"],
            module_type=row["module_type"],
            config=row.get("configured_json", {}),
        )

    # --- Uninstall ---

    async def uninstall(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> None:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "removing")

        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="removing",
            last_error=None,
        )

        plugin = self._resolve_plugin(row["module_key"])

        # Best-effort plugin cleanup
        try:
            if row["module_type"] == "connector" and plugin:
                await self._call_hook(plugin, "uninstall", str(installed_module_id))
            elif row["module_type"] == "policy_pack" and plugin:
                await self._call_hook(plugin, "deactivate")
        except Exception as exc:
            logger.warning("Plugin cleanup failed during uninstall of %s: %s", installed_module_id, exc)

        # Delete resources regardless of plugin cleanup success
        try:
            if row["module_type"] == "connector":
                await self.connector_repo.delete_instance_by_module(
                    org_id=org_id, installed_module_id=installed_module_id
                )
            elif row["module_type"] == "policy_pack":
                await self.policy_runtime.delete_binding(
                    org_id=org_id, installed_module_id=installed_module_id
                )
        except Exception as exc:
            logger.warning("Resource cleanup failed during uninstall of %s: %s", installed_module_id, exc)

        await self.repo.uninstall_module(
            org_id=org_id, installation_id=str(installed_module_id)
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.uninstalled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )

    # --- Health Check ---

    async def health_check(
        self,
        installed_module_id: UUID,
        org_id: str,
    ) -> HealthStatusOut:
        row = await self._get_row(installed_module_id, org_id)
        if row["install_state"] != "installed":
            raise InvalidTransitionError(
                f"Health check requires 'installed' state, got '{row['install_state']}'"
            )

        plugin = self._resolve_plugin(row["module_key"])

        if row["module_type"] == "connector" and plugin:
            health = await self._call_hook(plugin, "health_check", row.get("configured_json", {}))
            if health and isinstance(health, HealthStatus):
                return HealthStatusOut(
                    status="healthy" if health.healthy else "unhealthy",
                    detail=health.message,
                )

        if row["module_type"] == "policy_pack":
            binding_ok = await self.policy_runtime.check_binding_health(
                org_id=org_id, installed_module_id=installed_module_id
            )
            return HealthStatusOut(
                status="healthy" if binding_ok else "unhealthy",
                detail="OPA policy active" if binding_ok else "OPA policy not found",
            )

        return HealthStatusOut(status="healthy", detail="No health check available")
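
Because run_install increments retry_count on every failure, including the first install attempt, the MAX_RETRIES guard in retry() limits total failed attempts rather than retries alone. The simulation below is a sketch of that counter logic only, with no database or plugins involved. `FakeModule`, `fail_install`, `can_retry`, and `count_retries_until_blocked` are hypothetical names for illustration.

```python
"""Sketch: how many retries the counter logic allows when every attempt fails.

Models only the retry_count bookkeeping from lifecycle.py; all names except
MAX_RETRIES are illustrative assumptions.
"""
from dataclasses import dataclass

MAX_RETRIES = 3  # same value as in lifecycle.py


@dataclass
class FakeModule:
    install_state: str = "provisioning"
    retry_count: int = 0


def fail_install(module: FakeModule) -> None:
    """Mirror the failure path of run_install: mark error, bump the counter."""
    module.install_state = "error"
    module.retry_count += 1


def can_retry(module: FakeModule) -> bool:
    """Mirror the guard in retry(): error state and budget not exhausted."""
    return module.install_state == "error" and module.retry_count < MAX_RETRIES


def count_retries_until_blocked() -> int:
    """Count the retries permitted when the install never succeeds."""
    module = FakeModule()
    fail_install(module)  # the initial install attempt fails
    retries = 0
    while can_retry(module):
        module.install_state = "provisioning"  # what clear_error_for_retry does
        fail_install(module)  # the retried install fails again
        retries += 1
    return retries


if __name__ == "__main__":
    print(count_retries_until_blocked())
```

With MAX_RETRIES = 3 the simulation allows two retries after the initial failure, because the first failure already consumes one unit of the budget. If three retries after the first failure are intended, one option is to increment the counter only on the retry path.
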
  • [ ] Step 4: Run tests

Run: cd backend && python -m pytest tests/modules/test_marketplace_lifecycle.py -v 2>&1 | tail -20
Expected: Tests pass. If a test fails because of mock wiring, adjust the mock setup or assertions to match the implementation.

  • [ ] Step 5: Commit
git add backend/app/modules/marketplace/lifecycle.py backend/tests/modules/test_marketplace_lifecycle.py
git commit -m "feat: ModuleLifecycleService with state machine, hooks, rollback, and tests"

Task 5: Repository Cleanup Methods for Rollback

The lifecycle service calls connector_repo.delete_instance() and connector_repo.delete_instance_by_module(), as well as policy_runtime.delete_binding(), policy_runtime.enable_binding(), policy_runtime.deactivate_binding(), and policy_runtime.check_binding_health(). Some of these don't exist yet.

Files:
- Modify: backend/app/modules/connectors/repository.py
- Modify: backend/app/modules/policy_runtime/service.py
- Modify: backend/app/modules/policy_runtime/repository.py

  • [ ] Step 1: Add delete_instance and delete_instance_by_module to ConnectorRepository

Add to ConnectorRepository in backend/app/modules/connectors/repository.py at the end of the class:

    async def delete_instance(self, *, org_id: str, instance_id: UUID) -> bool:
        query = """
            DELETE FROM connector_instances
            WHERE org_id = CAST(:org_id AS uuid) AND id = :instance_id
        """
        result = await self._execute(query, {"org_id": org_id, "instance_id": instance_id})
        return bool(result.rowcount and result.rowcount > 0)

    async def delete_instance_by_module(self, *, org_id: str, installed_module_id: UUID) -> bool:
        query = """
            DELETE FROM connector_instances
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        result = await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        return bool(result.rowcount and result.rowcount > 0)
  • [ ] Step 2: Add binding lifecycle methods to PolicyRuntimeService

Add to PolicyRuntimeService in backend/app/modules/policy_runtime/service.py at the end of the class:

    async def enable_binding(self, *, org_id: str, installed_module_id: UUID) -> None:
        await self.repo.update_binding_enabled(
            org_id=org_id, installed_module_id=installed_module_id, enabled=True
        )

    async def deactivate_binding(self, *, org_id: str, installed_module_id: UUID) -> None:
        await self.repo.update_binding_enabled(
            org_id=org_id, installed_module_id=installed_module_id, enabled=False
        )

    async def delete_binding(self, *, org_id: str, installed_module_id: UUID | None = None, binding_id: UUID | None = None) -> bool:
        return await self.repo.delete_binding(
            org_id=org_id, installed_module_id=installed_module_id, binding_id=binding_id
        )

    async def check_binding_health(self, *, org_id: str, installed_module_id: UUID) -> bool:
        binding = await self.repo.get_binding_by_module(org_id=org_id, installed_module_id=installed_module_id)
        if binding is None or not binding.get("enabled"):
            return False
        try:
            result = await self.opa.query(binding["decision_path"], {})
            return result is not None
        except Exception:
            return False
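
The behavior of check_binding_health can be exercised without a database or OPA server. The sketch below restates the same logic as a free function and runs it against hypothetical fakes (FakeRepo and FakeOpa are illustrative names; the real repo and OPA client interfaces are assumed to match the calls shown above).

```python
import asyncio
from uuid import uuid4


class FakeRepo:
    """Hypothetical repo that returns a fixed binding row (or None)."""

    def __init__(self, binding):
        self.binding = binding

    async def get_binding_by_module(self, *, org_id, installed_module_id):
        return self.binding


class FakeOpa:
    """Hypothetical OPA client that returns a fixed result or raises."""

    def __init__(self, result=None, error=None):
        self.result = result
        self.error = error

    async def query(self, decision_path, input_doc):
        if self.error is not None:
            raise self.error
        return self.result


async def check_binding_health(repo, opa, *, org_id, installed_module_id) -> bool:
    # Same decision logic as the service method above.
    binding = await repo.get_binding_by_module(org_id=org_id, installed_module_id=installed_module_id)
    if binding is None or not binding.get("enabled"):
        return False
    try:
        result = await opa.query(binding["decision_path"], {})
    except Exception:
        return False
    return result is not None


def run_case(binding, opa) -> bool:
    return asyncio.run(
        check_binding_health(FakeRepo(binding), opa, org_id="org-1", installed_module_id=uuid4())
    )
```

Four cases are worth covering in the unit tests: no binding, a disabled binding, an OPA error, and a healthy binding with a non-None result.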
  • [ ] Step 3: Add repository methods for PolicyRuntimeRepository

Add to PolicyRuntimeRepository in backend/app/modules/policy_runtime/repository.py at the end of the class:

    async def update_binding_enabled(
        self, *, org_id: str, installed_module_id: UUID, enabled: bool
    ) -> None:
        query = """
            UPDATE policy_pack_bindings
            SET enabled = :enabled, updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id, "enabled": enabled})

    async def delete_binding(
        self, *, org_id: str, installed_module_id: UUID | None = None, binding_id: UUID | None = None
    ) -> bool:
        if installed_module_id:
            query = """
                DELETE FROM policy_pack_bindings
                WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
            """
            result = await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        elif binding_id:
            query = """
                DELETE FROM policy_pack_bindings
                WHERE org_id = CAST(:org_id AS uuid) AND id = :binding_id
            """
            result = await self._execute(query, {"org_id": org_id, "binding_id": binding_id})
        else:
            return False
        return bool(result.rowcount and result.rowcount > 0)

    async def get_binding_by_module(self, *, org_id: str, installed_module_id: UUID) -> dict | None:
        query = """
            SELECT id, org_id, installed_module_id, policy_pack_key, decision_path, enabled
            FROM policy_pack_bindings
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        return await self._fetch_one(query, {"org_id": org_id, "installed_module_id": installed_module_id})
  • [ ] Step 4: Verify the files compile

Run: cd backend && python -m py_compile app/modules/connectors/repository.py && python -m py_compile app/modules/policy_runtime/service.py && python -m py_compile app/modules/policy_runtime/repository.py
Expected: No output (success)

  • [ ] Step 5: Commit
git add backend/app/modules/connectors/repository.py backend/app/modules/policy_runtime/service.py backend/app/modules/policy_runtime/repository.py
git commit -m "feat: add delete/enable/deactivate/health methods for connector and policy repos"

Task 6: Dependency Injection — Wire Lifecycle Service

Files:
- Modify: backend/app/modules/marketplace/dependencies.py

  • [ ] Step 1: Add get_lifecycle_service factory

Add to backend/app/modules/marketplace/dependencies.py:

# Add these imports at top:
from app.core.plugins.registry import PluginRegistry
from app.modules.marketplace.lifecycle import ModuleLifecycleService

# Add after get_marketplace_service:
def _get_plugin_registry() -> PluginRegistry:
    from app.modules.connectors.plugins import plugin_registry
    return plugin_registry


def get_lifecycle_service(
    repo: MarketplaceRepository = Depends(get_marketplace_repo),
    connector_repo: ConnectorRepository = Depends(get_connector_repo),
    policy_repo: PolicyRuntimeRepository = Depends(get_policy_runtime_repo),
    session: AsyncSession = Depends(get_session),
    opa_client: OpaClient = Depends(get_opa_client),
    plugin_registry: PluginRegistry = Depends(_get_plugin_registry),
) -> ModuleLifecycleService:
    policy_runtime = PolicyRuntimeService(policy_repo, session, opa_client)
    return ModuleLifecycleService(repo, connector_repo, policy_runtime, plugin_registry)
  • [ ] Step 2: Verify compilation

Run: cd backend && python -m py_compile app/modules/marketplace/dependencies.py
Expected: No output (success)

  • [ ] Step 3: Commit
git add backend/app/modules/marketplace/dependencies.py
git commit -m "feat(di): wire ModuleLifecycleService with plugin registry"

Task 7: Router Updates — Action-Based PATCH + Health Endpoint

Files:
- Modify: backend/app/modules/marketplace/router.py

  • [ ] Step 1: Add imports for lifecycle service and health schema

At the top of backend/app/modules/marketplace/router.py, add:

from app.modules.marketplace.dependencies import get_lifecycle_service
from app.modules.marketplace.lifecycle import ModuleLifecycleService, InvalidTransitionError
from app.modules.marketplace.schemas import HealthStatusOut
  • [ ] Step 2: Replace the PATCH endpoint

Replace the update_installed_module endpoint (lines 79-91) with:

@router.patch("/installed/{installed_module_id}", response_model=UpdateInstalledModuleResponse)
async def update_installed_module(
    installed_module_id: UUID,
    payload: UpdateInstalledModuleRequest,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    service: MarketplaceService = Depends(get_marketplace_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context, require_org_roles
    principal = await resolve_principal_context(lifecycle.repo.session, user)
    require_org_roles(principal, {"org_owner", "org_admin", "team_lead", "connector_manager", "policy_manager"})

    try:
        if payload.action == "enable":
            row = await lifecycle.enable(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.action == "disable":
            row = await lifecycle.disable(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.action == "retry":
            row = await lifecycle.retry(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.channel:
            return await service.update_installed_module(
                user=user,
                installed_module_id=installed_module_id,
                install_state=None,
                channel=payload.channel,
            )
        else:
            raise HTTPException(status_code=400, detail="Must provide action or channel")
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    return UpdateInstalledModuleResponse(installed_module=service._installed_row_to_out(row))
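
The action branch of this endpoint can also be expressed as a lookup against an allow-list, which keeps the set of supported actions in one place. The sketch below is an alternative to the if/elif chain, not what the plan prescribes: dispatch_action and FakeLifecycle are hypothetical names, the state strings are placeholders, and the exception class is a local stand-in for the one in lifecycle.py.

```python
import asyncio
from uuid import uuid4


class InvalidTransitionError(Exception):
    """Local stand-in for the lifecycle module's exception."""


class FakeLifecycle:
    """Hypothetical lifecycle service with the three action methods."""

    async def enable(self, installed_module_id, org_id, user_profile_id):
        return {"id": installed_module_id, "install_state": "installed"}

    async def disable(self, installed_module_id, org_id, user_profile_id):
        return {"id": installed_module_id, "install_state": "disabled"}

    async def retry(self, installed_module_id, org_id, user_profile_id):
        # Placeholder rule: pretend the module is not in a retryable state.
        raise InvalidTransitionError("retry is not allowed from the current state")


SUPPORTED_ACTIONS = ("enable", "disable", "retry")


async def dispatch_action(lifecycle, action, installed_module_id, org_id, user_profile_id):
    """Route an action string to the lifecycle method of the same name."""
    if action not in SUPPORTED_ACTIONS:
        # Reject anything outside the allow-list before touching getattr.
        raise ValueError(f"Unsupported action: {action!r}")
    handler = getattr(lifecycle, action)
    return await handler(installed_module_id, org_id, user_profile_id)
```

In the router, ValueError and InvalidTransitionError would both be translated to an HTTPException, as the endpoint above already does for invalid transitions.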
  • [ ] Step 3: Replace the DELETE endpoint

Replace the uninstall_module endpoint (lines 144-150) with:

@router.delete("/installations/{installation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def uninstall_module(
    installation_id: UUID,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context, require_org_roles
    principal = await resolve_principal_context(lifecycle.repo.session, user)
    require_org_roles(principal, {"org_owner", "org_admin", "team_lead", "connector_manager", "policy_manager"})

    try:
        await lifecycle.uninstall(installation_id, principal.org_id, UUID(principal.user_profile_id))
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
  • [ ] Step 4: Add health check endpoint

Add after the configure endpoint:

@router.get("/installed/{installed_module_id}/health", response_model=HealthStatusOut)
async def get_module_health(
    installed_module_id: UUID,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context
    principal = await resolve_principal_context(lifecycle.repo.session, user)

    try:
        return await lifecycle.health_check(installed_module_id, principal.org_id)
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
  • [ ] Step 5: Verify compilation

Run: cd backend && python -m py_compile app/modules/marketplace/router.py
Expected: No output (success)

  • [ ] Step 6: Commit
git add backend/app/modules/marketplace/router.py
git commit -m "feat(router): action-based PATCH, lifecycle-powered DELETE, health endpoint"

Task 8: Service Cleanup — Remove Duplicated Logic

Files:
- Modify: backend/app/modules/marketplace/service.py

  • [ ] Step 1: Simplify install_module to delegate resource creation to lifecycle

In backend/app/modules/marketplace/service.py, replace the block in install_module() that runs from the install_row upsert through the return statement (lines 259-311). The method still handles entitlement and license validation, but after creating the installed_modules row it returns and leaves resource creation to the lifecycle service, which the router calls. Replace lines 259-311 with:

        install_row = await self.repo.upsert_install(
            org_id=effective_org_id,
            module_id=module["id"],
            installed_by_user_profile_id=UUID(principal.user_profile_id),
            artifact_ref=artifact_ref,
            artifact_digest=artifact_digest,
            config_json=config_json,
            verification_json=verification_json,
        )

        # Resource creation and plugin hooks are handled by the lifecycle service,
        # which the router calls after this method returns.

        installed_full = await self.repo.get_installed_by_id(effective_org_id, install_row["id"])
        if installed_full is None:
            raise HTTPException(status_code=500, detail="Installed module row not found after upsert")

        return InstallModuleResponse(
            installed_module=self._installed_row_to_out(installed_full),
            connector_instance_id=None,
            policy_binding_id=None,
        )
  • [ ] Step 2: Update _installed_row_to_out to include retry_count

In backend/app/modules/marketplace/service.py, add to _installed_row_to_out (around line 400):

        retry_count=row.get("retry_count", 0),

Add this after the last_error=row.get("last_error"), line.

  • [ ] Step 3: Update the install route to call lifecycle after service

In backend/app/modules/marketplace/router.py, update the install_module endpoint to call lifecycle:

@router.post("/install", response_model=InstallModuleResponse)
async def install_module(
    payload: InstallModuleRequest,
    service: MarketplaceService = Depends(get_marketplace_service),
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    result = await service.install_module(
        user=user,
        module_key=payload.module_key,
        org_id=payload.org_id,
        artifact_ref=payload.artifact_ref,
        artifact_digest=payload.artifact_digest,
        config_json=payload.config_json,
        license_token=payload.license_token,
    )

    # Run lifecycle hooks (configure, install) for the newly created installation
    from app.core.principal import resolve_principal_context
    principal = await resolve_principal_context(lifecycle.repo.session, user)
    module = await lifecycle.repo.get_module_by_key(payload.module_key)

    installed_row = await lifecycle.run_install(
        installed_module_id=result.installed_module.id,
        org_id=principal.org_id,
        user_profile_id=UUID(principal.user_profile_id),
        module_key=payload.module_key,
        module_type=module["module_type"],
        config=payload.config_json,
    )

    return InstallModuleResponse(
        installed_module=service._installed_row_to_out(installed_row),
        connector_instance_id=None,
        policy_binding_id=None,
    )
  • [ ] Step 4: Verify compilation

Run: cd backend && python -m py_compile app/modules/marketplace/service.py && python -m py_compile app/modules/marketplace/router.py
Expected: No output (success)

  • [ ] Step 5: Commit
git add backend/app/modules/marketplace/service.py backend/app/modules/marketplace/router.py
git commit -m "refactor: delegate install resource creation to lifecycle service"

Task 9: Update Existing Tests

Files:
- Modify: backend/tests/modules/test_marketplace.py

  • [ ] Step 1: Update test for action-based PATCH

Add a new test to backend/tests/modules/test_marketplace.py:

@pytest.mark.asyncio
async def test_update_installed_module_action_enable(app, client):
    installed_id = str(uuid4())

    mock_lifecycle = AsyncMock()
    mock_lifecycle.enable.return_value = {
        "id": installed_id,
        "org_id": str(uuid4()),
        "module_id": str(uuid4()),
        "module_key": "connector.github",
        "module_name": "GitHub",
        "module_version": "1.0.0",
        "module_publisher": "Substrate Labs",
        "module_type": "connector",
        "billing_mode": "free",
        "install_state": "installed",
        "channel": "stable",
        "artifact_ref": "ref",
        "artifact_digest": "digest",
        "configured_json": {},
        "verification_json": {},
        "verified_at": None,
        "last_error": None,
        "retry_count": 0,
        "created_at": "2026-03-28T00:00:00",
        "updated_at": "2026-03-28T00:00:00",
    }
    mock_lifecycle.repo = AsyncMock()

    from app.modules.marketplace.dependencies import get_lifecycle_service
    from unittest.mock import patch, MagicMock

    principal = MagicMock()
    principal.org_id = str(uuid4())
    principal.user_profile_id = str(uuid4())

    # The endpoint imports resolve_principal_context inside its body, so patch
    # the name where it is defined rather than on the router module.
    app.dependency_overrides[get_lifecycle_service] = lambda: mock_lifecycle
    try:
        with patch(
            "app.core.principal.resolve_principal_context",
            new=AsyncMock(return_value=principal),
        ):
            resp = await client.patch(
                f"/api/v1/marketplace/installed/{installed_id}",
                json={"action": "enable"},
            )
    finally:
        # Always remove the override so a failure here cannot leak into other tests.
        app.dependency_overrides.pop(get_lifecycle_service, None)

    assert resp.status_code == 200
    assert resp.json()["installed_module"]["install_state"] == "installed"
  • [ ] Step 2: Run all marketplace tests

Run: cd backend && python -m pytest tests/modules/test_marketplace.py tests/modules/test_marketplace_lifecycle.py -v 2>&1 | tail -20
Expected: All tests pass

  • [ ] Step 3: Commit
git add backend/tests/modules/test_marketplace.py
git commit -m "test: update marketplace tests for action-based PATCH schema"

Task 10: Run Full Test Suite + Final Verification

Files: None (verification only)

  • [ ] Step 1: Run all backend tests

Run: cd backend && python -m pytest tests/ -v --tb=short 2>&1 | tail -30
Expected: All tests pass

  • [ ] Step 2: Verify compilation

Run: cd backend && python -m py_compile app/modules/marketplace/lifecycle.py && python -m py_compile app/modules/marketplace/schemas.py && python -m py_compile app/modules/marketplace/router.py && python -m py_compile app/modules/marketplace/service.py && python -m py_compile app/modules/marketplace/dependencies.py && echo "All OK" Expected: All OK
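
Note that py_compile only byte-compiles a file: it catches syntax errors but does not resolve imports or check types. The sketch below illustrates the difference on a throwaway file; syntax_ok and imports_ok are hypothetical helpers written for this illustration, and the module name inside the file is deliberately nonexistent.

```python
import importlib.util
import py_compile
import tempfile
from pathlib import Path


def syntax_ok(path: Path) -> bool:
    """True if the file byte-compiles, i.e. has no syntax errors."""
    try:
        py_compile.compile(str(path), doraise=True)
        return True
    except py_compile.PyCompileError:
        return False


def imports_ok(path: Path) -> bool:
    """True if executing the module's top level raises no ImportError."""
    spec = importlib.util.spec_from_file_location("checked_module", path)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
        return True
    except ImportError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "example.py"
    # Valid syntax, but the import cannot be resolved.
    source.write_text("import module_that_does_not_exist_xyz\n")
    compiled = syntax_ok(source)
    importable = imports_ok(source)
```

Here the file compiles but cannot be imported, so a broken import in any of the files above would pass this step and only surface when the test suite from Step 1 loads the module. Executing a module runs its top-level code, so an import check like this belongs in a test environment.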

  • [ ] Step 3: Update changelog

Add a section to changelog.md documenting:
- State machine implementation with 7 states
- Plugin lifecycle hook wiring
- Rollback strategy
- Action-based PATCH endpoint
- Health check endpoint
- Soft-delete uninstall
- New tests

  • [ ] Step 4: Final commit
git add changelog.md
git commit -m "docs: update changelog with marketplace lifecycle implementation"