Marketplace Lifecycle Execution — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Make marketplace module install/enable/disable/retry/uninstall lifecycle production-real with plugin hook orchestration and rollback.
Architecture: Finite state machine on installed_modules.install_state with transitions enforced by ModuleLifecycleService. Plugin hooks (configure, install, activate, deactivate, uninstall, health_check) called at transition boundaries. Rollback on install failure, best-effort on uninstall.
Tech Stack: Python 3.12, FastAPI, asyncpg (via SQLAlchemy async), Pydantic v2, pytest + pytest-asyncio
Spec: docs/superpowers/specs/2026-03-28-marketplace-lifecycle-design.md
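The two failure policies named in the architecture note differ in what happens after a hook raises. The sketch below illustrates that difference with hypothetical stand-ins (FakePlugin, install_with_rollback, and uninstall_best_effort are illustrative names, not part of the codebase): a failed install removes the resource it created and ends in error, while a failed uninstall hook is tolerated and removal proceeds.

```python
import asyncio
from typing import Optional


class FakePlugin:
    """Hypothetical plugin whose hooks can be made to fail on demand."""

    def __init__(self, fail_on: Optional[str] = None) -> None:
        self.fail_on = fail_on
        self.calls: list[str] = []

    async def _hook(self, name: str) -> None:
        self.calls.append(name)
        if name == self.fail_on:
            raise RuntimeError(f"{name} failed")

    async def configure(self) -> None:
        await self._hook("configure")

    async def install(self) -> None:
        await self._hook("install")

    async def uninstall(self) -> None:
        await self._hook("uninstall")


async def install_with_rollback(plugin: FakePlugin, resources: set[str]) -> str:
    """provisioning -> installed, or -> error with created resources removed."""
    resources.add("connector_instance")  # resource is created before hooks run
    try:
        await plugin.configure()
        await plugin.install()
    except Exception:
        resources.discard("connector_instance")  # rollback of the partial install
        return "error"
    return "installed"


async def uninstall_best_effort(plugin: FakePlugin, resources: set[str]) -> str:
    """removing -> removed, even when the plugin's cleanup hook raises."""
    try:
        await plugin.uninstall()
    except Exception:
        pass  # a real service would log the failure and carry on
    resources.discard("connector_instance")
    return "removed"


async def demo() -> tuple[str, set[str], str, set[str]]:
    failed_resources: set[str] = set()
    state_a = await install_with_rollback(FakePlugin(fail_on="install"), failed_resources)
    live_resources = {"connector_instance"}
    state_b = await uninstall_best_effort(FakePlugin(fail_on="uninstall"), live_resources)
    return state_a, failed_resources, state_b, live_resources


result = asyncio.run(demo())
print(result)
```

In both cases no resource is left behind; what differs is the final state and whether the failure is surfaced to the caller.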
File Structure
| File | Responsibility |
|---|---|
| backend/db/postgres/V12__installed_modules_retry_count.sql | Create. Add retry_count column and extend the install_state CHECK constraint. |
| backend/app/modules/marketplace/lifecycle.py | Create. ModuleLifecycleService: owns all state transitions and plugin hook calls. |
| backend/app/modules/marketplace/schemas.py | Modify. Add action field to update request, add HealthStatusOut, add retry_count to output, add removing/removed to InstallState. |
| backend/app/modules/marketplace/repository.py | Modify. Add update_state_and_error() (with an increment_retry flag), clear_error_for_retry(), and get_retry_count(); change uninstall_module() to soft-delete. |
| backend/app/modules/marketplace/dependencies.py | Modify. Add get_lifecycle_service() factory. |
| backend/app/modules/marketplace/router.py | Modify. Wire action-based PATCH to lifecycle service, add health endpoint, wire DELETE to lifecycle. |
| backend/app/modules/marketplace/service.py | Modify. Remove install/update/uninstall logic, delegate to lifecycle service. |
| backend/tests/modules/test_marketplace_lifecycle.py | Create. Unit tests for state machine transitions, rollback, retry. |
| backend/tests/modules/test_marketplace.py | Modify. Update for new PATCH schema. |
Task 1: Database Migration — Add retry_count Column
Files:
- Create: backend/db/postgres/V12__installed_modules_retry_count.sql
- [ ] Step 1: Write the migration
-- V12: Add retry_count to installed_modules for lifecycle retry tracking.
-- Also add 'removing' and 'removed' to the install_state CHECK constraint.
ALTER TABLE installed_modules DROP CONSTRAINT IF EXISTS installed_modules_install_state_check;
ALTER TABLE installed_modules
ADD CONSTRAINT installed_modules_install_state_check
CHECK (install_state IN ('installed', 'disabled', 'provisioning', 'error', 'removing', 'removed'));
ALTER TABLE installed_modules
ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0;
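- [ ] Step 2: Verify the migration file exists
Run: python -c "open('backend/db/postgres/V12__installed_modules_retry_count.sql').read()"
Expected: No error. This only confirms the file is readable; the SQL itself is validated when the migration is applied to a database.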
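The V12__ prefix suggests a versioned-migration naming scheme in which each version number may be used only once. Assuming that convention holds for backend/db/postgres, a quick pre-flight check for a clashing version can be sketched as follows. The regex, the duplicate_versions helper, and every filename other than the V12 one from this plan are illustrative assumptions.

```python
import re
from collections import Counter

# Assumed naming pattern: V<number>__<description>.sql
VERSIONED = re.compile(r"^V(\d+)__([A-Za-z0-9_]+)\.sql$")


def duplicate_versions(filenames: list[str]) -> list[int]:
    """Return version numbers that appear on more than one migration file."""
    versions = []
    for name in filenames:
        match = VERSIONED.match(name)
        if match:
            versions.append(int(match.group(1)))
    counts = Counter(versions)
    return sorted(version for version, n in counts.items() if n > 1)


# Hypothetical directory listings; only the V12 name comes from this plan.
clean = ["V11__example_previous.sql", "V12__installed_modules_retry_count.sql"]
clash = clean + ["V12__some_other_change.sql"]

print(duplicate_versions(clean))
print(duplicate_versions(clash))
```

In practice the list would come from os.listdir() on the migrations directory.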
- [ ] Step 3: Commit
git add backend/db/postgres/V12__installed_modules_retry_count.sql
git commit -m "feat(db): add retry_count column and extended install_state constraint"
Task 2: Schema Updates — Action-Based PATCH + HealthStatusOut
Files:
- Modify: backend/app/modules/marketplace/schemas.py
- [ ] Step 1: Update InstallState to include new states
In backend/app/modules/marketplace/schemas.py, change line 11:
# before
InstallState = Literal["installed", "disabled", "provisioning", "error"]
# after
InstallState = Literal["installed", "disabled", "provisioning", "error", "removing", "removed"]
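The state list now lives in two places: the CHECK constraint from Task 1 and the InstallState Literal. A small consistency check can catch drift between them. The sketch below is a suggestion rather than part of the plan; states_in_check and the inline SQL string are illustrative, and the regex assumes the constraint is written as a single IN (...) list, as it is in the V12 migration.

```python
import re
from typing import Literal, get_args

InstallState = Literal["installed", "disabled", "provisioning", "error", "removing", "removed"]

# Copy of the constraint from the V12 migration, inlined so the sketch is self-contained.
MIGRATION_SQL = """
ALTER TABLE installed_modules
ADD CONSTRAINT installed_modules_install_state_check
CHECK (install_state IN ('installed', 'disabled', 'provisioning', 'error', 'removing', 'removed'));
"""


def states_in_check(sql: str) -> set[str]:
    """Extract the quoted values from `install_state IN (...)`."""
    match = re.search(r"install_state\s+IN\s*\(([^)]*)\)", sql, flags=re.IGNORECASE)
    if match is None:
        raise ValueError("no install_state IN (...) list found")
    return set(re.findall(r"'([^']+)'", match.group(1)))


db_states = states_in_check(MIGRATION_SQL)
schema_states = set(get_args(InstallState))
missing_in_db = schema_states - db_states          # states the database would reject
missing_in_schema = db_states - schema_states      # states the API could not serialize
```

A test along these lines could read the migration file from disk instead of using an inline string.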
- [ ] Step 2: Add retry_count to InstalledModuleOut
In backend/app/modules/marketplace/schemas.py, add after line 82 (last_error: str | None = None):
retry_count: int = 0
- [ ] Step 3: Replace UpdateInstalledModuleRequest with action-based schema
In backend/app/modules/marketplace/schemas.py, replace lines 98-100:
# before
class UpdateInstalledModuleRequest(BaseModel):
    install_state: InstallState | None = None
    channel: Literal["stable", "candidate"] | None = None

# after
LifecycleAction = Literal["enable", "disable", "retry"]

class UpdateInstalledModuleRequest(BaseModel):
    action: LifecycleAction | None = None
    channel: Literal["stable", "candidate"] | None = None
- [ ] Step 4: Add HealthStatusOut schema
Add at the end of backend/app/modules/marketplace/schemas.py:
class HealthStatusOut(BaseModel):
    status: Literal["healthy", "degraded", "unhealthy"]
    detail: str = ""
- [ ] Step 5: Run type check
Run: cd backend && python -m py_compile app/modules/marketplace/schemas.py
Expected: No output (success)
- [ ] Step 6: Commit
git add backend/app/modules/marketplace/schemas.py
git commit -m "feat(schemas): action-based PATCH, HealthStatusOut, retry_count, extended InstallState"
Task 3: Repository Updates — State Helpers + Soft-Delete
Files:
- Modify: backend/app/modules/marketplace/repository.py
- [ ] Step 1: Add update_state_and_error(), clear_error_for_retry(), and get_retry_count() methods
Add to MarketplaceRepository class in backend/app/modules/marketplace/repository.py, after update_installed_state (around line 427):
    async def update_state_and_error(
        self,
        *,
        org_id: str,
        installed_module_id: UUID,
        install_state: str,
        last_error: str | None = None,
        increment_retry: bool = False,
    ) -> None:
        sets = [
            "install_state = :install_state",
            "last_error = :last_error",
            "updated_at = now()",
        ]
        params: dict[str, Any] = {
            "org_id": org_id,
            "installed_module_id": installed_module_id,
            "install_state": install_state,
            "last_error": last_error,
        }
        if increment_retry:
            sets.append("retry_count = retry_count + 1")
        query = f"""
            UPDATE installed_modules
            SET {', '.join(sets)}
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = :installed_module_id
        """
        await self._execute(query, params)

    async def clear_error_for_retry(
        self,
        *,
        org_id: str,
        installed_module_id: UUID,
    ) -> None:
        query = """
            UPDATE installed_modules
            SET install_state = 'provisioning',
                last_error = NULL,
                updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = :installed_module_id
        """
        await self._execute(
            query,
            {"org_id": org_id, "installed_module_id": installed_module_id},
        )

    async def get_retry_count(self, *, org_id: str, installed_module_id: UUID) -> int:
        query = """
            SELECT retry_count FROM installed_modules
            WHERE org_id = CAST(:org_id AS uuid) AND id = :installed_module_id
        """
        row = await self._fetch_one(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        return row["retry_count"] if row else 0
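update_state_and_error() assembles its SET clause from a list of fragments. That pattern stays safe as long as only fixed strings are interpolated and every value travels as a bound parameter. A stand-alone sketch of the construction follows; build_state_update is a hypothetical helper used here only to make the query text inspectable without a database, and the org_id and id parameters are left out of params for brevity.

```python
from typing import Any, Optional


def build_state_update(
    install_state: str,
    last_error: Optional[str] = None,
    increment_retry: bool = False,
) -> tuple[str, dict[str, Any]]:
    """Return (query, params) mirroring the SET-clause construction above."""
    # Every fragment is a literal; caller-supplied values appear only in params.
    sets = [
        "install_state = :install_state",
        "last_error = :last_error",
        "updated_at = now()",
    ]
    if increment_retry:
        sets.append("retry_count = retry_count + 1")
    query = (
        "UPDATE installed_modules "
        f"SET {', '.join(sets)} "
        "WHERE org_id = CAST(:org_id AS uuid) AND id = :installed_module_id"
    )
    params = {"install_state": install_state, "last_error": last_error}
    return query, params


query, params = build_state_update("error", last_error="bad config", increment_retry=True)
plain_query, _ = build_state_update("installed")
print(query)
```

The error text never reaches the SQL string, so a plugin exception message containing quotes cannot alter the statement.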
- [ ] Step 2: Change uninstall_module to soft-delete
In backend/app/modules/marketplace/repository.py, replace the uninstall_module method (lines 653-666):
    async def uninstall_module(self, *, org_id: str, installation_id: str) -> bool:
        query = """
            UPDATE installed_modules
            SET install_state = 'removed',
                updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid)
              AND id = CAST(:installation_id AS uuid)
        """
        result = await self._execute(
            query,
            {
                "org_id": org_id,
                "installation_id": installation_id,
            },
        )
        return bool(result.rowcount and result.rowcount > 0)
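One consequence of switching to a soft delete is that the UPDATE matches a row whether or not it was already removed, so rowcount stays positive on repeat calls where a hard DELETE would have reported zero. The sketch below shows this with SQLite from the standard library standing in for Postgres, an assumption made purely so the example runs without a server; the table is reduced to two columns and soft_delete is an illustrative helper. If callers rely on the boolean to detect "nothing to uninstall", an extra install_state <> 'removed' predicate is one possible guard.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE installed_modules (id TEXT PRIMARY KEY, install_state TEXT NOT NULL)")
conn.execute("INSERT INTO installed_modules VALUES ('m1', 'installed')")


def soft_delete(module_id: str, *, guard: bool) -> bool:
    """Mark a row as removed; with guard=True, skip rows that are already removed."""
    sql = "UPDATE installed_modules SET install_state = 'removed' WHERE id = ?"
    if guard:
        sql += " AND install_state <> 'removed'"
    cursor = conn.execute(sql, (module_id,))
    return cursor.rowcount > 0


first = soft_delete("m1", guard=False)             # row flips to 'removed'
repeat_unguarded = soft_delete("m1", guard=False)  # the row still matches
repeat_guarded = soft_delete("m1", guard=True)     # no row matches any more
remaining = conn.execute("SELECT install_state FROM installed_modules").fetchall()
print(first, repeat_unguarded, repeat_guarded, remaining)
```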
- [ ] Step 3: Run type check
Run: cd backend && python -m py_compile app/modules/marketplace/repository.py
Expected: No output (success)
- [ ] Step 4: Commit
git add backend/app/modules/marketplace/repository.py
git commit -m "feat(repo): state+error update helpers, soft-delete uninstall, retry support"
Task 4: Lifecycle Service — State Machine + Hook Orchestration
Files:
- Create: backend/app/modules/marketplace/lifecycle.py
- Create: backend/tests/modules/test_marketplace_lifecycle.py
This is the core task. TDD: write tests first, then implement.
- [ ] Step 1: Write unit tests for the lifecycle service
Create backend/tests/modules/test_marketplace_lifecycle.py:
"""Unit tests for ModuleLifecycleService state machine."""
from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4

import pytest

from app.modules.marketplace.lifecycle import InvalidTransitionError, ModuleLifecycleService

MODULE_KEY = "connector.github"
ORG_ID = str(uuid4())
INSTALL_ID = uuid4()
MODULE_ID = uuid4()
USER_PROFILE_ID = uuid4()


def _make_installed_row(state: str, retry_count: int = 0, module_type: str = "connector") -> dict:
    return {
        "id": INSTALL_ID,
        "org_id": ORG_ID,
        "module_id": MODULE_ID,
        "module_key": MODULE_KEY,
        "module_name": "GitHub Data Connector",
        "module_version": "1.0.0",
        "module_publisher": "Substrate Labs",
        "module_type": module_type,
        "billing_mode": "free",
        "install_state": state,
        "channel": "stable",
        "artifact_ref": "registry.substrate.io/connectors/github@sha256:abc",
        "artifact_digest": "sha256:abc",
        "configured_json": {},
        "verification_json": {},
        "verified_at": None,
        "last_error": None,
        "retry_count": retry_count,
        "created_at": "2026-03-28T00:00:00",
        "updated_at": "2026-03-28T00:00:00",
    }


def _make_service() -> tuple[ModuleLifecycleService, AsyncMock, AsyncMock, AsyncMock, MagicMock]:
    repo = AsyncMock()
    connector_repo = AsyncMock()
    policy_runtime_svc = AsyncMock()
    plugin_registry = MagicMock()
    svc = ModuleLifecycleService(repo, connector_repo, policy_runtime_svc, plugin_registry)
    return svc, repo, connector_repo, policy_runtime_svc, plugin_registry


# --- Transition validation ---


@pytest.mark.asyncio
async def test_enable_from_disabled():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("disabled")
    repo.get_installed_by_id.return_value = row
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin
    updated_row = {**row, "install_state": "installed"}
    repo.get_installed_by_id.side_effect = [row, updated_row]
    result = await svc.enable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "installed"
    connector_repo.upsert_instance_config.assert_called_once()


@pytest.mark.asyncio
async def test_disable_from_installed():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin
    updated_row = {**row, "install_state": "disabled"}
    repo.get_installed_by_id.side_effect = [row, updated_row]
    result = await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "disabled"


@pytest.mark.asyncio
async def test_enable_from_installed_raises():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row
    with pytest.raises(InvalidTransitionError):
        await svc.enable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


@pytest.mark.asyncio
async def test_disable_from_error_raises():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("error")
    repo.get_installed_by_id.return_value = row
    with pytest.raises(InvalidTransitionError):
        await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


# --- Install rollback ---


@pytest.mark.asyncio
async def test_install_rollback_on_plugin_configure_failure():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    plugin = AsyncMock()
    plugin.configure.side_effect = RuntimeError("bad config")
    plugin_registry.get_or_create_instance.return_value = plugin
    connector_repo.ensure_instance_for_install.return_value = {"id": uuid4()}
    row_provisioning = _make_installed_row("provisioning")
    row_error = {**row_provisioning, "install_state": "error", "last_error": "bad config"}
    repo.get_installed_by_id.side_effect = [row_error]
    result = await svc.run_install(
        installed_module_id=INSTALL_ID,
        org_id=ORG_ID,
        user_profile_id=USER_PROFILE_ID,
        module_key=MODULE_KEY,
        module_type="connector",
        config={},
    )
    assert result["install_state"] == "error"
    assert result["last_error"] == "bad config"
    repo.update_state_and_error.assert_called()
    connector_repo.delete_instance.assert_called_once()


# --- Retry ---


@pytest.mark.asyncio
async def test_retry_from_error():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("error", retry_count=1)
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin
    connector_repo.ensure_instance_for_install.return_value = {"id": uuid4()}
    row_installed = {**row, "install_state": "installed", "retry_count": 2}
    repo.get_installed_by_id.side_effect = [row, row_installed]
    repo.get_retry_count.return_value = 1
    result = await svc.retry(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "installed"
    repo.clear_error_for_retry.assert_called_once()


@pytest.mark.asyncio
async def test_retry_blocked_after_max():
    svc, repo, *_ = _make_service()
    row = _make_installed_row("error", retry_count=3)
    repo.get_installed_by_id.return_value = row
    repo.get_retry_count.return_value = 3
    with pytest.raises(InvalidTransitionError, match="max retries"):
        await svc.retry(INSTALL_ID, ORG_ID, USER_PROFILE_ID)


# --- Uninstall best-effort ---


@pytest.mark.asyncio
async def test_uninstall_best_effort_on_plugin_failure():
    svc, repo, connector_repo, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row
    plugin = AsyncMock()
    plugin.uninstall.side_effect = RuntimeError("cleanup failed")
    plugin_registry.get_or_create_instance.return_value = plugin
    await svc.uninstall(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    repo.uninstall_module.assert_called_once()
    repo.record_event.assert_called()


# --- Health check ---


@pytest.mark.asyncio
async def test_health_check_calls_plugin():
    svc, repo, _, _, plugin_registry = _make_service()
    row = _make_installed_row("installed")
    repo.get_installed_by_id.return_value = row
    plugin = AsyncMock()
    plugin.health_check.return_value = MagicMock(healthy=True, message="ok", details={})
    plugin_registry.get_or_create_instance.return_value = plugin
    result = await svc.health_check(INSTALL_ID, ORG_ID)
    assert result.status == "healthy"
    # The MagicMock result is not a HealthStatus instance, so the service falls
    # back to its default "healthy" response. Assert the hook was awaited so the
    # test actually covers the plugin call.
    plugin.health_check.assert_awaited_once()


# --- Policy pack transitions ---


@pytest.mark.asyncio
async def test_disable_policy_pack_calls_deactivate():
    svc, repo, _, policy_svc, plugin_registry = _make_service()
    row = _make_installed_row("installed", module_type="policy_pack")
    updated = {**row, "install_state": "disabled"}
    repo.get_installed_by_id.side_effect = [row, updated]
    plugin = AsyncMock()
    plugin_registry.get_or_create_instance.return_value = plugin
    result = await svc.disable(INSTALL_ID, ORG_ID, USER_PROFILE_ID)
    assert result["install_state"] == "disabled"
    policy_svc.deactivate_binding.assert_called_once()
- [ ] Step 2: Run tests to verify they fail
Run: cd backend && python -m pytest tests/modules/test_marketplace_lifecycle.py -v 2>&1 | head -30
Expected: ModuleNotFoundError: No module named 'app.modules.marketplace.lifecycle'
- [ ] Step 3: Implement ModuleLifecycleService
Create backend/app/modules/marketplace/lifecycle.py:
"""Marketplace module lifecycle state machine and plugin hook orchestration."""
from __future__ import annotations

import logging
from typing import Any
from uuid import UUID

from app.core.plugins.base import HealthStatus
from app.core.plugins.registry import PluginRegistry
from app.modules.connectors.repository import ConnectorRepository
from app.modules.marketplace.repository import MarketplaceRepository
from app.modules.marketplace.schemas import HealthStatusOut
from app.modules.policy_runtime.service import PolicyRuntimeService

logger = logging.getLogger(__name__)

MAX_RETRIES = 3

# Legal state transitions: {(from, to)}
LEGAL_TRANSITIONS: set[tuple[str, str]] = {
    ("provisioning", "installed"),
    ("provisioning", "error"),
    ("error", "provisioning"),
    ("installed", "disabled"),
    ("disabled", "installed"),
    ("installed", "removing"),
    ("disabled", "removing"),
    # Needed so a module stuck in 'error' can be uninstalled, which is what the
    # retry-limit message in retry() tells the user to do.
    ("error", "removing"),
    ("removing", "removed"),
    ("removing", "error"),
}


class InvalidTransitionError(Exception):
    """Raised when a state transition is not allowed."""


class ModuleLifecycleService:
    """Owns all install_state transitions and plugin hook orchestration."""

    def __init__(
        self,
        repo: MarketplaceRepository,
        connector_repo: ConnectorRepository,
        policy_runtime_svc: PolicyRuntimeService,
        plugin_registry: PluginRegistry,
    ) -> None:
        self.repo = repo
        self.connector_repo = connector_repo
        self.policy_runtime = policy_runtime_svc
        self.plugin_registry = plugin_registry

    def _assert_transition(self, current: str, target: str) -> None:
        if (current, target) not in LEGAL_TRANSITIONS:
            raise InvalidTransitionError(
                f"Cannot transition from '{current}' to '{target}'"
            )

    async def _get_row(self, installed_module_id: UUID, org_id: str) -> dict[str, Any]:
        row = await self.repo.get_installed_by_id(org_id, installed_module_id)
        if row is None:
            raise InvalidTransitionError(f"Installed module not found: {installed_module_id}")
        return row

    def _resolve_plugin(self, module_key: str) -> Any:
        """Get or create a plugin instance. Returns None if not in registry."""
        try:
            return self.plugin_registry.get_or_create_instance(module_key)
        except KeyError:
            logger.warning("No plugin registered for %s, skipping hooks", module_key)
            return None

    async def _call_hook(self, plugin: Any, hook_name: str, *args: Any, **kwargs: Any) -> Any:
        """Call a plugin hook if it exists (duck-typing adapter)."""
        method = getattr(plugin, hook_name, None)
        if method is None:
            return None
        return await method(*args, **kwargs)

    # --- Install ---

    async def run_install(
        self,
        *,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
        module_key: str,
        module_type: str,
        config: dict[str, Any],
    ) -> dict[str, Any]:
        """Execute the provisioning → installed transition with plugin hooks."""
        plugin = self._resolve_plugin(module_key)
        resource_id: UUID | None = None
        try:
            if module_type == "connector":
                resource = await self.connector_repo.ensure_instance_for_install(
                    org_id=org_id,
                    installed_module_id=installed_module_id,
                    connector_key=module_key,
                    display_name=module_key,
                )
                resource_id = resource["id"]
                if plugin:
                    await self._call_hook(plugin, "configure", config)
                    await self._call_hook(plugin, "install", config)
                await self.connector_repo.upsert_instance_config(
                    org_id=org_id,
                    connector_key=module_key,
                    config_json=config,
                    enabled=True,
                )
            elif module_type == "policy_pack":
                binding = await self.policy_runtime.activate_pack_on_install(
                    org_id=org_id,
                    installed_module_id=installed_module_id,
                    module_key=module_key,
                    config_json=config,
                )
                resource_id = binding["id"]
                if plugin:
                    await self._call_hook(plugin, "get_policies")
                    await self._call_hook(plugin, "activate", config)
            await self.repo.update_state_and_error(
                org_id=org_id,
                installed_module_id=installed_module_id,
                install_state="installed",
                last_error=None,
            )
            await self.repo.record_event(
                org_id=org_id,
                user_profile_id=user_profile_id,
                event_type="marketplace.install.completed",
                module_id=None,
                payload={"module_key": module_key, "module_type": module_type},
            )
        except Exception as exc:
            logger.exception("Install failed for %s: %s", module_key, exc)
            # Rollback: clean up partially created resources
            await self._rollback_install(org_id, installed_module_id, module_type, resource_id)
            await self.repo.update_state_and_error(
                org_id=org_id,
                installed_module_id=installed_module_id,
                install_state="error",
                last_error=str(exc),
                increment_retry=True,
            )
        return await self._get_row(installed_module_id, org_id)

    async def _rollback_install(
        self,
        org_id: str,
        installed_module_id: UUID,
        module_type: str,
        resource_id: UUID | None,
    ) -> None:
        """Best-effort cleanup of resources created during install."""
        try:
            if module_type == "connector" and resource_id:
                await self.connector_repo.delete_instance(
                    org_id=org_id, instance_id=resource_id
                )
            elif module_type == "policy_pack" and resource_id:
                await self.policy_runtime.delete_binding(
                    org_id=org_id, binding_id=resource_id
                )
        except Exception as cleanup_exc:
            logger.warning(
                "Rollback cleanup failed for %s: %s", installed_module_id, cleanup_exc
            )

    # --- Enable ---

    async def enable(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "installed")
        plugin = self._resolve_plugin(row["module_key"])
        if row["module_type"] == "connector":
            await self.connector_repo.upsert_instance_config(
                org_id=org_id,
                connector_key=row["module_key"],
                config_json=row.get("configured_json", {}),
                enabled=True,
            )
        elif row["module_type"] == "policy_pack":
            if plugin:
                await self._call_hook(plugin, "activate", row.get("configured_json", {}))
            await self.policy_runtime.enable_binding(
                org_id=org_id, installed_module_id=installed_module_id
            )
        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="installed",
            last_error=None,
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.enabled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )
        return await self._get_row(installed_module_id, org_id)

    # --- Disable ---

    async def disable(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "disabled")
        plugin = self._resolve_plugin(row["module_key"])
        if row["module_type"] == "connector":
            await self.connector_repo.upsert_instance_config(
                org_id=org_id,
                connector_key=row["module_key"],
                config_json=row.get("configured_json", {}),
                enabled=False,
            )
        elif row["module_type"] == "policy_pack":
            if plugin:
                await self._call_hook(plugin, "deactivate")
            await self.policy_runtime.deactivate_binding(
                org_id=org_id, installed_module_id=installed_module_id
            )
        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="disabled",
            last_error=None,
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.disabled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )
        return await self._get_row(installed_module_id, org_id)

    # --- Retry ---

    async def retry(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> dict[str, Any]:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "provisioning")
        retry_count = await self.repo.get_retry_count(
            org_id=org_id, installed_module_id=installed_module_id
        )
        if retry_count >= MAX_RETRIES:
            raise InvalidTransitionError(
                f"Cannot retry: max retries ({MAX_RETRIES}) exceeded. Uninstall and reinstall."
            )
        await self.repo.clear_error_for_retry(
            org_id=org_id, installed_module_id=installed_module_id
        )
        return await self.run_install(
            installed_module_id=installed_module_id,
            org_id=org_id,
            user_profile_id=user_profile_id,
            module_key=row["module_key"],
            module_type=row["module_type"],
            config=row.get("configured_json", {}),
        )

    # --- Uninstall ---

    async def uninstall(
        self,
        installed_module_id: UUID,
        org_id: str,
        user_profile_id: UUID,
    ) -> None:
        row = await self._get_row(installed_module_id, org_id)
        self._assert_transition(row["install_state"], "removing")
        await self.repo.update_state_and_error(
            org_id=org_id,
            installed_module_id=installed_module_id,
            install_state="removing",
            last_error=None,
        )
        plugin = self._resolve_plugin(row["module_key"])
        # Best-effort plugin cleanup
        try:
            if row["module_type"] == "connector" and plugin:
                await self._call_hook(plugin, "uninstall", str(installed_module_id))
            elif row["module_type"] == "policy_pack" and plugin:
                await self._call_hook(plugin, "deactivate")
        except Exception as exc:
            logger.warning("Plugin cleanup failed during uninstall of %s: %s", installed_module_id, exc)
        # Delete resources regardless of plugin cleanup success
        try:
            if row["module_type"] == "connector":
                await self.connector_repo.delete_instance_by_module(
                    org_id=org_id, installed_module_id=installed_module_id
                )
            elif row["module_type"] == "policy_pack":
                await self.policy_runtime.delete_binding(
                    org_id=org_id, installed_module_id=installed_module_id
                )
        except Exception as exc:
            logger.warning("Resource cleanup failed during uninstall of %s: %s", installed_module_id, exc)
        await self.repo.uninstall_module(
            org_id=org_id, installation_id=str(installed_module_id)
        )
        await self.repo.record_event(
            org_id=org_id,
            user_profile_id=user_profile_id,
            event_type="marketplace.module.uninstalled",
            module_id=None,
            payload={"module_key": row["module_key"]},
        )

    # --- Health Check ---

    async def health_check(
        self,
        installed_module_id: UUID,
        org_id: str,
    ) -> HealthStatusOut:
        row = await self._get_row(installed_module_id, org_id)
        if row["install_state"] != "installed":
            raise InvalidTransitionError(
                f"Health check requires 'installed' state, got '{row['install_state']}'"
            )
        plugin = self._resolve_plugin(row["module_key"])
        if row["module_type"] == "connector" and plugin:
            health = await self._call_hook(plugin, "health_check", row.get("configured_json", {}))
            if health and isinstance(health, HealthStatus):
                return HealthStatusOut(
                    status="healthy" if health.healthy else "unhealthy",
                    detail=health.message,
                )
        if row["module_type"] == "policy_pack":
            binding_ok = await self.policy_runtime.check_binding_health(
                org_id=org_id, installed_module_id=installed_module_id
            )
            return HealthStatusOut(
                status="healthy" if binding_ok else "unhealthy",
                detail="OPA policy active" if binding_ok else "OPA policy not found",
            )
        return HealthStatusOut(status="healthy", detail="No health check available")
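Because run_install() increments retry_count on every failure, including the first install attempt, and retry() refuses once the count reaches MAX_RETRIES, the limit of 3 works out to three failed attempts in total: the original install plus two retries. The simulation below reproduces only that counting logic with a hypothetical attempts_allowed helper. It assumes every attempt fails and that the initial install also goes through run_install().

```python
MAX_RETRIES = 3


def attempts_allowed(max_retries: int = MAX_RETRIES) -> tuple[int, int]:
    """Return (total_failed_attempts, retries_granted) when every attempt fails."""
    retry_count = 0
    # Initial install fails: the except branch records the error and bumps the counter.
    retry_count += 1
    retries_granted = 0
    # retry() is allowed only while retry_count < max_retries.
    while retry_count < max_retries:
        retries_granted += 1
        retry_count += 1  # the retried install fails again
    return retry_count, retries_granted


total_failures, retries = attempts_allowed()
print(total_failures, retries)
```

If three user-initiated retries are intended, the comparison or the first-attempt increment would need adjusting; which reading is correct depends on the spec.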
- [ ] Step 4: Run tests
Run: cd backend && python -m pytest tests/modules/test_marketplace_lifecycle.py -v 2>&1 | tail -20
Expected: Tests pass (some may need small fixes based on exact mock wiring — adjust assertions to match the implementation)
- [ ] Step 5: Commit
git add backend/app/modules/marketplace/lifecycle.py backend/tests/modules/test_marketplace_lifecycle.py
git commit -m "feat: ModuleLifecycleService with state machine, hooks, rollback, and tests"
Task 5: Repository Cleanup Methods for Rollback
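The lifecycle service calls connector_repo.delete_instance() and connector_repo.delete_instance_by_module(), plus policy_runtime.delete_binding(), policy_runtime.enable_binding(), policy_runtime.deactivate_binding(), and policy_runtime.check_binding_health(). Some of these do not exist yet. The Task 4 unit tests pass without them because the collaborators are mocked, but the service would fail at runtime until they are added.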
Files:
- Modify: backend/app/modules/connectors/repository.py
- Modify: backend/app/modules/policy_runtime/service.py
- Modify: backend/app/modules/policy_runtime/repository.py
- [ ] Step 1: Add delete_instance and delete_instance_by_module to ConnectorRepository
Add to ConnectorRepository in backend/app/modules/connectors/repository.py at the end of the class:
    async def delete_instance(self, *, org_id: str, instance_id: UUID) -> bool:
        query = """
            DELETE FROM connector_instances
            WHERE org_id = CAST(:org_id AS uuid) AND id = :instance_id
        """
        result = await self._execute(query, {"org_id": org_id, "instance_id": instance_id})
        return bool(result.rowcount and result.rowcount > 0)

    async def delete_instance_by_module(self, *, org_id: str, installed_module_id: UUID) -> bool:
        query = """
            DELETE FROM connector_instances
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        result = await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        return bool(result.rowcount and result.rowcount > 0)
- [ ] Step 2: Add binding lifecycle methods to PolicyRuntimeService
Add to PolicyRuntimeService in backend/app/modules/policy_runtime/service.py at the end of the class:
    async def enable_binding(self, *, org_id: str, installed_module_id: UUID) -> None:
        await self.repo.update_binding_enabled(
            org_id=org_id, installed_module_id=installed_module_id, enabled=True
        )

    async def deactivate_binding(self, *, org_id: str, installed_module_id: UUID) -> None:
        await self.repo.update_binding_enabled(
            org_id=org_id, installed_module_id=installed_module_id, enabled=False
        )

    async def delete_binding(
        self,
        *,
        org_id: str,
        installed_module_id: UUID | None = None,
        binding_id: UUID | None = None,
    ) -> bool:
        return await self.repo.delete_binding(
            org_id=org_id, installed_module_id=installed_module_id, binding_id=binding_id
        )

    async def check_binding_health(self, *, org_id: str, installed_module_id: UUID) -> bool:
        binding = await self.repo.get_binding_by_module(org_id=org_id, installed_module_id=installed_module_id)
        if binding is None or not binding.get("enabled"):
            return False
        try:
            result = await self.opa.query(binding["decision_path"], {})
            return result is not None
        except Exception:
            return False
- [ ] Step 3: Add repository methods for PolicyRuntimeRepository
Add to PolicyRuntimeRepository in backend/app/modules/policy_runtime/repository.py at the end of the class:
    async def update_binding_enabled(
        self, *, org_id: str, installed_module_id: UUID, enabled: bool
    ) -> None:
        query = """
            UPDATE policy_pack_bindings
            SET enabled = :enabled, updated_at = now()
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id, "enabled": enabled})

    async def delete_binding(
        self, *, org_id: str, installed_module_id: UUID | None = None, binding_id: UUID | None = None
    ) -> bool:
        if installed_module_id is not None:
            query = """
                DELETE FROM policy_pack_bindings
                WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
            """
            result = await self._execute(query, {"org_id": org_id, "installed_module_id": installed_module_id})
        elif binding_id is not None:
            query = """
                DELETE FROM policy_pack_bindings
                WHERE org_id = CAST(:org_id AS uuid) AND id = :binding_id
            """
            result = await self._execute(query, {"org_id": org_id, "binding_id": binding_id})
        else:
            return False
        return bool(result.rowcount and result.rowcount > 0)

    async def get_binding_by_module(self, *, org_id: str, installed_module_id: UUID) -> dict | None:
        query = """
            SELECT id, org_id, installed_module_id, policy_pack_key, decision_path, enabled
            FROM policy_pack_bindings
            WHERE org_id = CAST(:org_id AS uuid) AND installed_module_id = :installed_module_id
        """
        return await self._fetch_one(query, {"org_id": org_id, "installed_module_id": installed_module_id})
- [ ] Step 4: Verify imports compile
Run: cd backend && python -m py_compile app/modules/connectors/repository.py && python -m py_compile app/modules/policy_runtime/service.py && python -m py_compile app/modules/policy_runtime/repository.py
Expected: No output (success)
- [ ] Step 5: Commit
git add backend/app/modules/connectors/repository.py backend/app/modules/policy_runtime/service.py backend/app/modules/policy_runtime/repository.py
git commit -m "feat: add delete/enable/deactivate/health methods for connector and policy repos"
Task 6: Dependency Injection — Wire Lifecycle Service
Files:
- Modify: backend/app/modules/marketplace/dependencies.py
- [ ] Step 1: Add get_lifecycle_service factory
Add to backend/app/modules/marketplace/dependencies.py:
# Add these imports at top:
from app.core.plugins.registry import PluginRegistry
from app.modules.marketplace.lifecycle import ModuleLifecycleService

# Add after get_marketplace_service:
def _get_plugin_registry() -> PluginRegistry:
    from app.modules.connectors.plugins import plugin_registry
    return plugin_registry


def get_lifecycle_service(
    repo: MarketplaceRepository = Depends(get_marketplace_repo),
    connector_repo: ConnectorRepository = Depends(get_connector_repo),
    policy_repo: PolicyRuntimeRepository = Depends(get_policy_runtime_repo),
    session: AsyncSession = Depends(get_session),
    opa_client: OpaClient = Depends(get_opa_client),
    plugin_registry: PluginRegistry = Depends(_get_plugin_registry),
) -> ModuleLifecycleService:
    policy_runtime = PolicyRuntimeService(policy_repo, session, opa_client)
    return ModuleLifecycleService(repo, connector_repo, policy_runtime, plugin_registry)
- [ ] Step 2: Verify import
Run: cd backend && python -m py_compile app/modules/marketplace/dependencies.py
Expected: No output (success)
- [ ] Step 3: Commit
git add backend/app/modules/marketplace/dependencies.py
git commit -m "feat(di): wire ModuleLifecycleService with plugin registry"
Task 7: Router Updates — Action-Based PATCH + Health Endpoint
Files:
- Modify: backend/app/modules/marketplace/router.py
- [ ] Step 1: Add imports for lifecycle service and health schema
At the top of backend/app/modules/marketplace/router.py, add:
from app.modules.marketplace.dependencies import get_lifecycle_service
from app.modules.marketplace.lifecycle import ModuleLifecycleService, InvalidTransitionError
from app.modules.marketplace.schemas import HealthStatusOut
- [ ] Step 2: Replace the PATCH endpoint
Replace the update_installed_module endpoint (lines 79-91) with:
@router.patch("/installed/{installed_module_id}", response_model=UpdateInstalledModuleResponse)
async def update_installed_module(
    installed_module_id: UUID,
    payload: UpdateInstalledModuleRequest,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    service: MarketplaceService = Depends(get_marketplace_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context, require_org_roles

    principal = await resolve_principal_context(lifecycle.repo.session, user)
    require_org_roles(principal, {"org_owner", "org_admin", "team_lead", "connector_manager", "policy_manager"})
    try:
        # Lifecycle actions take precedence; a channel change is only applied when no action matches.
        if payload.action == "enable":
            row = await lifecycle.enable(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.action == "disable":
            row = await lifecycle.disable(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.action == "retry":
            row = await lifecycle.retry(installed_module_id, principal.org_id, UUID(principal.user_profile_id))
        elif payload.channel:
            return await service.update_installed_module(
                user=user,
                installed_module_id=installed_module_id,
                install_state=None,
                channel=payload.channel,
            )
        else:
            raise HTTPException(status_code=400, detail="Must provide action or channel")
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return UpdateInstalledModuleResponse(installed_module=service._installed_row_to_out(row))
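The branch order in the PATCH handler above can be checked in isolation. The following is a minimal sketch only: `select_operation` and `LIFECYCLE_ACTIONS` are hypothetical names introduced for illustration and are not part of the plan's code. It mirrors the handler's precedence (a recognized action first, then a channel change, otherwise an error).

```python
from typing import Optional

# Hypothetical helper for illustration; the real handler inlines this logic.
LIFECYCLE_ACTIONS = {"enable", "disable", "retry"}


def select_operation(action: Optional[str], channel: Optional[str]) -> str:
    """Return the name of the branch the PATCH handler would take."""
    if action in LIFECYCLE_ACTIONS:
        # A recognized action wins even if a channel is also supplied.
        return f"lifecycle.{action}"
    if channel:
        # No recognized action: fall back to the channel update path.
        return "service.update_installed_module"
    # Neither field is usable; the handler answers with HTTP 400 here.
    raise ValueError("Must provide action or channel")
```

Note that a request carrying both an action and a channel takes the lifecycle branch, and the channel is ignored. If that is not the intended behavior, the schema or handler would need an explicit check.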
- [ ] Step 3: Replace the DELETE endpoint
Replace the uninstall_module endpoint (lines 144-150) with:
@router.delete("/installations/{installation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def uninstall_module(
    installation_id: UUID,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context, require_org_roles

    principal = await resolve_principal_context(lifecycle.repo.session, user)
    require_org_roles(principal, {"org_owner", "org_admin", "team_lead", "connector_manager", "policy_manager"})
    try:
        await lifecycle.uninstall(installation_id, principal.org_id, UUID(principal.user_profile_id))
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
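The PATCH and DELETE handlers both translate InvalidTransitionError into HTTP 400. As a rough illustration of the kind of guard that could raise it, here is a minimal sketch. The transition table, the state names, and `next_state` are assumptions made for this example; the authoritative states and transitions are defined in lifecycle.py and the spec, and `InvalidTransitionError` below is only a stand-in for the class imported from lifecycle.py.

```python
class InvalidTransitionError(Exception):
    """Stand-in for the exception imported from lifecycle.py."""


# Hypothetical transition table: (current_state, action) -> next_state.
# The real table lives in ModuleLifecycleService and may differ.
ALLOWED_TRANSITIONS = {
    ("disabled", "enable"): "installed",
    ("installed", "disable"): "disabled",
    ("failed", "retry"): "installed",
    ("installed", "uninstall"): "removing",
    ("disabled", "uninstall"): "removing",
}


def next_state(current: str, action: str) -> str:
    """Look up the target state, rejecting anything not in the table."""
    try:
        return ALLOWED_TRANSITIONS[(current, action)]
    except KeyError:
        raise InvalidTransitionError(
            f"Cannot {action} a module in state {current!r}"
        ) from None
```

Keeping the allowed transitions in one table means the router never has to know which states are valid; it only maps the exception to a status code.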
- [ ] Step 4: Add health check endpoint
Add after the configure endpoint:
@router.get("/installed/{installed_module_id}/health", response_model=HealthStatusOut)
async def get_module_health(
    installed_module_id: UUID,
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context

    principal = await resolve_principal_context(lifecycle.repo.session, user)
    try:
        return await lifecycle.health_check(installed_module_id, principal.org_id)
    except InvalidTransitionError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
- [ ] Step 5: Verify compilation
Run: cd backend && python -m py_compile app/modules/marketplace/router.py
Expected: No output (success)
- [ ] Step 6: Commit
git add backend/app/modules/marketplace/router.py
git commit -m "feat(router): action-based PATCH, lifecycle-powered DELETE, health endpoint"
Task 8: Service Cleanup — Remove Duplicated Logic¶
Files:
- Modify: backend/app/modules/marketplace/service.py
- [ ] Step 1: Simplify install_module to delegate resource creation to lifecycle
In backend/app/modules/marketplace/service.py, replace the block in install_module() that runs from the install_row upsert through the return (lines 259-311) with the code below. The method still handles entitlement/license validation; once the installed_modules row exists, resource creation is left to the lifecycle service, which the router calls:
install_row = await self.repo.upsert_install(
    org_id=effective_org_id,
    module_id=module["id"],
    installed_by_user_profile_id=UUID(principal.user_profile_id),
    artifact_ref=artifact_ref,
    artifact_digest=artifact_digest,
    config_json=config_json,
    verification_json=verification_json,
)
# Resource creation and plugin hooks are handled by the lifecycle service,
# which the router calls after this method returns.
installed_full = await self.repo.get_installed_by_id(effective_org_id, install_row["id"])
if installed_full is None:
    raise HTTPException(status_code=500, detail="Installed module row not found after upsert")
return InstallModuleResponse(
    installed_module=self._installed_row_to_out(installed_full),
    connector_instance_id=None,
    policy_binding_id=None,
)
- [ ] Step 2: Update _installed_row_to_out to include retry_count
In backend/app/modules/marketplace/service.py, add to _installed_row_to_out (around line 400):
retry_count=row.get("retry_count", 0),
Add this after the last_error=row.get("last_error"), line.
- [ ] Step 3: Update the install route to call lifecycle after service
In backend/app/modules/marketplace/router.py, update the install_module endpoint to call lifecycle:
@router.post("/install", response_model=InstallModuleResponse)
async def install_module(
    payload: InstallModuleRequest,
    service: MarketplaceService = Depends(get_marketplace_service),
    lifecycle: ModuleLifecycleService = Depends(get_lifecycle_service),
    user: UserInfo = Depends(get_current_user),
):
    from app.core.principal import resolve_principal_context

    result = await service.install_module(
        user=user,
        module_key=payload.module_key,
        org_id=payload.org_id,
        artifact_ref=payload.artifact_ref,
        artifact_digest=payload.artifact_digest,
        config_json=payload.config_json,
        license_token=payload.license_token,
    )
    # Run lifecycle hooks (configure, install) for the newly created installation
    principal = await resolve_principal_context(lifecycle.repo.session, user)
    module = await lifecycle.repo.get_module_by_key(payload.module_key)
    installed_row = await lifecycle.run_install(
        installed_module_id=result.installed_module.id,
        org_id=principal.org_id,
        user_profile_id=UUID(principal.user_profile_id),
        module_key=payload.module_key,
        module_type=module["module_type"],
        config=payload.config_json,
    )
    return InstallModuleResponse(
        installed_module=service._installed_row_to_out(installed_row),
        connector_instance_id=None,
        policy_binding_id=None,
    )
- [ ] Step 4: Verify compilation
Run: cd backend && python -m py_compile app/modules/marketplace/service.py && python -m py_compile app/modules/marketplace/router.py
Expected: No output (success)
- [ ] Step 5: Commit
git add backend/app/modules/marketplace/service.py backend/app/modules/marketplace/router.py
git commit -m "refactor: delegate install resource creation to lifecycle service"
Task 9: Update Existing Tests¶
Files:
- Modify: backend/tests/modules/test_marketplace.py
- [ ] Step 1: Update test for action-based PATCH
Add a new test to backend/tests/modules/test_marketplace.py:
@pytest.mark.asyncio
async def test_update_installed_module_action_enable(app, client):
    from unittest.mock import MagicMock, patch

    from app.modules.marketplace.dependencies import get_lifecycle_service

    installed_id = str(uuid4())
    mock_lifecycle = AsyncMock()
    mock_lifecycle.enable.return_value = {
        "id": installed_id,
        "org_id": str(uuid4()),
        "module_id": str(uuid4()),
        "module_key": "connector.github",
        "module_name": "GitHub",
        "module_version": "1.0.0",
        "module_publisher": "Substrate Labs",
        "module_type": "connector",
        "billing_mode": "free",
        "install_state": "installed",
        "channel": "stable",
        "artifact_ref": "ref",
        "artifact_digest": "digest",
        "configured_json": {},
        "verification_json": {},
        "verified_at": None,
        "last_error": None,
        "retry_count": 0,
        "created_at": "2026-03-28T00:00:00",
        "updated_at": "2026-03-28T00:00:00",
    }
    mock_lifecycle.repo = AsyncMock()

    principal = MagicMock()
    principal.org_id = str(uuid4())
    principal.user_profile_id = str(uuid4())

    # The endpoint imports resolve_principal_context from app.core.principal at call
    # time, so patch it on that module rather than on the router.
    app.dependency_overrides[get_lifecycle_service] = lambda: mock_lifecycle
    try:
        with patch("app.core.principal.resolve_principal_context", return_value=principal):
            resp = await client.patch(
                f"/api/v1/marketplace/installed/{installed_id}",
                json={"action": "enable"},
            )
    finally:
        # Always remove the override so a failure here cannot leak into other tests.
        app.dependency_overrides.pop(get_lifecycle_service, None)

    assert resp.status_code == 200
    assert resp.json()["installed_module"]["install_state"] == "installed"
- [ ] Step 2: Run all marketplace tests
Run: cd backend && python -m pytest tests/modules/test_marketplace.py tests/modules/test_marketplace_lifecycle.py -v 2>&1 | tail -20
Expected: All tests pass
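The test in Step 1 relies on `unittest.mock.patch` replacing an async function with an awaitable mock. The sketch below shows that behavior using only the standard library. The `principal_module` namespace and the `handler` coroutine are stand-ins invented for this example; they are not the project's real modules.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch


async def resolve_principal_context(session, user):
    # Stand-in for the real coroutine, which would need a database session.
    raise RuntimeError("would query the database")


# Stand-in for the module that owns the function being patched.
principal_module = SimpleNamespace(resolve_principal_context=resolve_principal_context)


async def handler():
    # The attribute is looked up at call time, much like a function-local import.
    principal = await principal_module.resolve_principal_context(None, None)
    return principal.org_id


def run_with_patch() -> str:
    fake_principal = SimpleNamespace(org_id="org-1")
    with patch.object(
        principal_module, "resolve_principal_context", return_value=fake_principal
    ) as mocked:
        # patch picks AsyncMock automatically because the original is an async function.
        assert isinstance(mocked, AsyncMock)
        return asyncio.run(handler())
```

The patch has to target the namespace where the function is looked up when the handler runs. Patching a different name that merely refers to the same function would leave the handler calling the original.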
- [ ] Step 3: Commit
git add backend/tests/modules/test_marketplace.py
git commit -m "test: update marketplace tests for action-based PATCH schema"
Task 10: Run Full Test Suite + Final Verification¶
Files: None (verification only)
- [ ] Step 1: Run all backend tests
Run: cd backend && python -m pytest tests/ -v --tb=short 2>&1 | tail -30
Expected: All tests pass
- [ ] Step 2: Verify compilation
Run: cd backend && python -m py_compile app/modules/marketplace/lifecycle.py && python -m py_compile app/modules/marketplace/schemas.py && python -m py_compile app/modules/marketplace/router.py && python -m py_compile app/modules/marketplace/service.py && python -m py_compile app/modules/marketplace/dependencies.py && echo "All OK"
Expected: All OK
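The chained py_compile commands can also be expressed as a small script, which reports every failing file instead of stopping at the first one. This is a sketch under the assumption that it is run from the backend directory; `compile_all` and `MODULE_FILES` are names chosen for this example.

```python
import py_compile
from pathlib import Path

# File names taken from the verification command above.
MODULE_FILES = ["lifecycle.py", "schemas.py", "router.py", "service.py", "dependencies.py"]


def compile_all(base: Path, names=MODULE_FILES) -> list[str]:
    """Byte-compile each file and return a list of failure messages."""
    failures = []
    for name in names:
        try:
            # doraise=True raises PyCompileError instead of printing to stderr.
            py_compile.compile(str(base / name), doraise=True)
        except (py_compile.PyCompileError, OSError) as exc:
            failures.append(f"{name}: {exc}")
    return failures


# Example usage from the backend directory:
#   failures = compile_all(Path("app/modules/marketplace"))
#   print("All OK" if not failures else "\n".join(failures))
```

py_compile only confirms that each file parses and compiles to bytecode. It does not resolve imports or check types, so the test suite in Step 1 remains the real verification.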
- [ ] Step 3: Update changelog
Add a section to changelog.md documenting:
- State machine implementation with 7 states
- Plugin lifecycle hook wiring
- Rollback strategy
- Action-based PATCH endpoint
- Health check endpoint
- Soft-delete uninstall
- New tests
- [ ] Step 4: Final commit
git add changelog.md
git commit -m "docs: update changelog with marketplace lifecycle implementation"