Skip to content

Marketplace Core Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Wire a modular full-stack platform with OpenAPI-first types, Org/IAM hierarchy, dual auth, marketplace with billing/licensing, plugin engine, OPA policy evaluation, config management, and two built-in modules (GitHub connector + GDPR policy pack).

Architecture: Domain-split backend (marketplace, billing, licensing, plugins, config modules) behind FastAPI, with React 19 + Vite frontend consuming generated TypeScript types from OpenAPI. Plugin engine uses abstract base classes (DataConnectorBase, PolicyPackBase) with a singleton PluginRegistry. Dual auth (JWT via Keycloak + API key via X-API-Key) produces identical UserInfo objects for all downstream code.

Tech Stack: Python 3.12 / FastAPI / SQLAlchemy (async) / PostgreSQL 16 / Neo4j / Redis 7 / OPA 1.4.2 / Keycloak 26.5 / React 19 / TypeScript / Vite / openapi-typescript / datamodel-codegen / ECDSA P-256 / Fernet / Stripe SDK


Existing Code Inventory

Before starting, understand what already exists and works:

Component Status Location
V5 migration (orgs, teams, marketplace, connectors, policy_runtime) Complete backend/db/postgres/V5__iam_marketplace_policy_runtime.sql
Marketplace module (catalog, purchase, install, configure) Complete backend/app/modules/marketplace/
Connectors module (instances, config, sync) Complete backend/app/modules/connectors/
Policy runtime module (bindings, OPA evaluate) Complete backend/app/modules/policy_runtime/
IAM module (orgs, teams, memberships) Complete backend/app/modules/iam/
Auth module (profile sync, API tokens) Complete backend/app/modules/auth/
Dual auth (JWT + API key) Complete backend/app/core/security.py
License token service (RS256) Complete backend/app/modules/marketplace/license.py
GitHub connector plugin (full GitHub API) Complete backend/app/modules/connectors/plugins.py
GDPR/DRY/SOLID/SOC2 Rego policies (inline) Complete backend/app/modules/policy_runtime/service.py
OPA client Complete backend/app/core/opa.py
Principal resolution Complete backend/app/core/principal.py
OpenAPI type generation pipeline Complete api/generate-types.sh
Frontend API client (marketplace, connectors, policies, auth) Complete ui/src/api/substrateApi.ts

Strategy: Build on existing code. Add additive migrations V6-V11. Extract/refactor where the spec diverges. Never drop existing tables.


File Structure

New Files

backend/
├── db/postgres/
│   ├── V6__org_hierarchy_extension.sql          # org_projects, org_user_groups, org_memberships, team_memberships + ALTER organizations
│   ├── V7__module_requests.sql                  # module_requests table
│   ├── V8__billing.sql                          # billing_accounts, transactions, invoices
│   ├── V9__org_licenses.sql                     # org_licenses + ALTER marketplace_entitlements
│   ├── V10__sync_and_eval_runs.sql              # policy_evaluation_runs
│   └── V11__org_settings.sql                    # org_settings table
├── db/neo4j/
│   └── marketplace_schema.cypher                # Org hierarchy + marketplace constraints
├── app/
│   ├── core/
│   │   ├── authorization.py                     # require_org_admin, require_team_member, require_scope
│   │   └── plugins/
│   │       ├── __init__.py
│   │       ├── base.py                          # DataConnectorBase, PolicyPackBase, PluginMeta
│   │       ├── registry.py                      # PluginRegistry singleton
│   │       ├── deps.py                          # PluginDeps container
│   │       └── loader.py                        # Bundle extraction, verification, dynamic import
│   ├── modules/
│   │   ├── billing/
│   │   │   ├── __init__.py
│   │   │   ├── router.py
│   │   │   ├── service.py
│   │   │   ├── repository.py
│   │   │   ├── schemas.py
│   │   │   ├── dependencies.py
│   │   │   └── providers/
│   │   │       ├── __init__.py
│   │   │       ├── base.py                      # PaymentProvider ABC
│   │   │       ├── stripe_provider.py
│   │   │       └── offline_provider.py
│   │   ├── licensing/
│   │   │   ├── __init__.py
│   │   │   ├── router.py
│   │   │   ├── service.py
│   │   │   ├── repository.py
│   │   │   ├── schemas.py
│   │   │   └── dependencies.py
│   │   ├── plugins/
│   │   │   ├── __init__.py
│   │   │   ├── router.py
│   │   │   ├── service.py
│   │   │   └── dependencies.py
│   │   └── config/
│   │       ├── __init__.py
│   │       ├── router.py
│   │       ├── service.py
│   │       ├── repository.py
│   │       ├── schemas.py
│   │       ├── defaults.py
│   │       └── dependencies.py
│   ├── connectors/
│   │   └── github/
│   │       ├── __init__.py
│   │       └── connector.py                     # GitHubConnector(DataConnectorBase)
│   ├── policy_packs/
│   │   └── gdpr/
│   │       ├── __init__.py
│   │       ├── pack.py                          # GDPRPolicyPack(PolicyPackBase)
│   │       └── policies/
│   │           ├── data_classification.rego
│   │           ├── retention.rego
│   │           ├── consent.rego
│   │           └── cross_border.rego
│   └── cli/
│       └── register_builtins.py                 # Boot-time bundle registration
├── config/
│   └── settings.yaml                            # Default YAML config
├── bundles/                                     # Built-in .substrate bundles
│   ├── connector-github-1.0.0.substrate
│   └── policy-pack-gdpr-1.0.0.substrate
└── tests/
    ├── modules/
    │   ├── test_billing.py
    │   ├── test_licensing.py
    │   ├── test_plugins.py
    │   └── test_config.py
    └── core/
        ├── test_authorization.py
        └── test_plugin_engine.py

infra/
├── opa/
│   ├── Dockerfile
│   └── docker-compose.yml
└── keycloak/
    └── spi/github-mapper/
        ├── pom.xml
        └── src/main/java/com/substrate/keycloak/
            ├── GitHubDataMapperFactory.java
            └── GitHubDataMapper.java

ui/src/
├── pages/settings/
│   ├── SettingsLayout.tsx
│   ├── OrgSettingsPage.tsx
│   ├── TeamSettingsPage.tsx
│   ├── ProfileSettingsPage.tsx
│   ├── ApiTokensPage.tsx
│   ├── MarketplacePage.tsx
│   ├── BillingPage.tsx
│   ├── LlmConnectionsPage.tsx
│   ├── PlatformDataPage.tsx
│   ├── PreferencesPage.tsx
│   └── index.ts
├── api/
│   ├── billingApi.ts
│   ├── licensingApi.ts
│   ├── pluginsApi.ts
│   ├── orgApi.ts
│   └── configApi.ts
└── components/
    └── ConfigSchemaForm.tsx

Modified Files

api/openapi.yml                        # ~50 new endpoints
api/generate-types.sh                  # Remove line 47 (cp to spec.yaml)
backend/app/main.py                              # Register billing, licensing, plugins, config routers + plugin lifespan
backend/app/settings.py                          # Add billing_provider, config_encryption_key, stripe settings
backend/app/core/security.py                     # Add org_id, org_role, projects, scopes to UserInfo
backend/app/modules/marketplace/license.py       # Migrate RS256 → ECDSA P-256
backend/app/modules/marketplace/router.py        # Add upload, requests, delete endpoints
backend/app/modules/marketplace/schemas.py       # Add upload, request schemas
backend/app/modules/marketplace/service.py       # Add upload_bundle, request/approve/deny
backend/app/modules/iam/router.py                # Add project, user_group, member CRUD per spec section 5.5
backend/app/modules/iam/schemas.py               # Add project, user_group schemas
backend/app/modules/iam/service.py               # Add project, user_group, member invite logic
backend/app/modules/iam/repository.py            # Add project, user_group, invite queries
infra/keycloak/substrate-realm.json              # Restructure groups, add org mappers, env var IdP
docker-compose.yml                               # Add OPA service dependency
ui/src/app/userProfile.ts                        # Extract org claims
ui/src/app/rbac.ts                               # Add org_role checks
ui/src/api/substrateApi.ts                       # Add billing, licensing, plugins, config methods

Deleted Files

api/openapi.spec.yaml                  # Redundant copy

Phase 0: Cleanup & Foundation

Task 1: Delete Redundant OpenAPI Spec Copy

Files: - Delete: api/openapi.spec.yaml - Modify: api/generate-types.sh:47

  • [ ] Step 1: Verify the files are identical

Run: diff api/openapi.yml api/openapi.spec.yaml Expected: No output (files are identical)

  • [ ] Step 2: Delete the redundant copy
rm api/openapi.spec.yaml
  • [ ] Step 3: Remove the cp line from generate-types.sh

In api/generate-types.sh, remove line 47:

# REMOVE this line:
cp "${SPEC_FILE}" "${SPEC_ALIAS_FILE}"

Also remove the SPEC_ALIAS_FILE variable on line 7 and the echo on line 51 that references it.

The cleaned generate-types.sh should be:

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
SPEC_DIR="${ROOT_DIR}/api"
SPEC_FILE="${SPEC_DIR}/openapi.yml"
BACKEND_DIR="${ROOT_DIR}/backend"
UI_DIR="${ROOT_DIR}/ui"

mkdir -p "${SPEC_DIR}" "${BACKEND_DIR}/app/types" "${UI_DIR}/src/types"

echo "[1/3] Generating OpenAPI YAML from FastAPI app..."
PYTHONPATH="${BACKEND_DIR}" python3 - "${SPEC_FILE}" <<'PY'
import sys
import yaml

from app.main import create_app

output_file = sys.argv[1]
app = create_app()
spec = app.openapi()

with open(output_file, "w", encoding="utf-8") as fh:
    yaml.safe_dump(
        spec,
        fh,
        sort_keys=False,
        allow_unicode=False,
    )

print(f"wrote {output_file}")
PY

echo "[2/3] Generating TypeScript types for UI..."
npx --yes openapi-typescript "${SPEC_FILE}" --output "${UI_DIR}/src/types/openapi.generated.ts"

echo "[3/3] Generating Python models for backend..."
uvx --from datamodel-code-generator datamodel-codegen \
  --input "${SPEC_FILE}" \
  --input-file-type openapi \
  --output "${BACKEND_DIR}/app/types/openapi_generated.py" \
  --output-model-type pydantic_v2.BaseModel \
  --target-python-version 3.12 \
  --disable-timestamp

echo "Done. Updated:"
echo "  - ${SPEC_FILE}"
echo "  - ${UI_DIR}/src/types/openapi.generated.ts"
echo "  - ${BACKEND_DIR}/app/types/openapi_generated.py"
  • [ ] Step 4: Commit
git add -A api/
git commit -m "chore: remove redundant openapi.spec.yaml, clean generate-types.sh"

Phase 1: Database Migrations

Task 2: V6 — Org Hierarchy Extension

Files: - Create: backend/db/postgres/V6__org_hierarchy_extension.sql

  • [ ] Step 1: Write the migration
-- V6: Extend org hierarchy with projects, user groups, and explicit memberships
-- Additive to V5 — does not alter existing table structures

ALTER TABLE organizations
    ADD COLUMN IF NOT EXISTS tier VARCHAR(20) NOT NULL DEFAULT 'free'
        CHECK (tier IN ('free', 'organization', 'enterprise')),
    ADD COLUMN IF NOT EXISTS max_users INTEGER NOT NULL DEFAULT 10,
    ADD COLUMN IF NOT EXISTS max_projects INTEGER NOT NULL DEFAULT 1;

CREATE TABLE IF NOT EXISTS org_projects (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) NOT NULL,
    description TEXT,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(org_id, slug)
);

CREATE INDEX IF NOT EXISTS idx_org_projects_org ON org_projects (org_id);

CREATE TABLE IF NOT EXISTS org_user_groups (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    team_id UUID NOT NULL REFERENCES organization_teams(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) NOT NULL,
    description TEXT,
    UNIQUE(team_id, slug)
);

CREATE TABLE IF NOT EXISTS org_memberships (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    user_id VARCHAR(255) NOT NULL,
    org_role VARCHAR(20) NOT NULL DEFAULT 'member'
        CHECK (org_role IN ('admin', 'member', 'billing')),
    joined_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(org_id, user_id)
);

CREATE INDEX IF NOT EXISTS idx_org_memberships_user ON org_memberships (user_id);

-- Link organization_teams to org_projects
ALTER TABLE organization_teams
    ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES org_projects(id) ON DELETE CASCADE;

-- Link org_user_groups membership to team_memberships
ALTER TABLE organization_team_memberships
    ADD COLUMN IF NOT EXISTS user_group_id UUID REFERENCES org_user_groups(id);

-- Seed: create a default project for the default org
INSERT INTO org_projects (id, org_id, name, slug, description)
VALUES (
    'c9200000-0000-0000-0000-000000000001',
    'c9000000-0000-0000-0000-000000000001',
    'Default Project',
    'default-project',
    'Default project for the Substrate Default Org'
)
ON CONFLICT (org_id, slug) DO NOTHING;

-- Link existing teams to the default project
UPDATE organization_teams
SET project_id = 'c9200000-0000-0000-0000-000000000001'
WHERE org_id = 'c9000000-0000-0000-0000-000000000001'
  AND project_id IS NULL;
  • [ ] Step 2: Verify migration file is valid SQL

Run: cd /home/dany/substrate && cat backend/db/postgres/V6__org_hierarchy_extension.sql | head -5 Expected: First lines of the migration

  • [ ] Step 3: Commit
git add backend/db/postgres/V6__org_hierarchy_extension.sql
git commit -m "feat: V6 migration — org projects, user groups, memberships"

Task 3: V7 — Module Requests

Files: - Create: backend/db/postgres/V7__module_requests.sql

  • [ ] Step 1: Write the migration
-- V7: Module request workflow (non-admin users request, admins approve/deny)

CREATE TABLE IF NOT EXISTS module_requests (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    module_id UUID NOT NULL REFERENCES marketplace_modules(id),
    requested_by VARCHAR(255) NOT NULL,
    reason TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'approved', 'denied')),
    reviewed_by VARCHAR(255),
    reviewed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_module_requests_org ON module_requests (org_id, status);
  • [ ] Step 2: Commit
git add backend/db/postgres/V7__module_requests.sql
git commit -m "feat: V7 migration — module request workflow"

Task 4: V8 — Billing

Files: - Create: backend/db/postgres/V8__billing.sql

  • [ ] Step 1: Write the migration
-- V8: Billing accounts, transactions, invoices

CREATE TABLE IF NOT EXISTS billing_accounts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE UNIQUE,
    stripe_customer_id VARCHAR(255),
    billing_email VARCHAR(255),
    billing_cycle VARCHAR(20) CHECK (billing_cycle IN ('monthly', 'yearly')),
    payment_method_last4 VARCHAR(4),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS transactions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    billing_account_id UUID NOT NULL REFERENCES billing_accounts(id),
    module_id UUID REFERENCES marketplace_modules(id),
    amount_cents INTEGER NOT NULL,
    currency VARCHAR(3) NOT NULL DEFAULT 'USD',
    stripe_payment_intent_id VARCHAR(255),
    status VARCHAR(20) NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'completed', 'failed', 'refunded')),
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS invoices (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    billing_account_id UUID NOT NULL REFERENCES billing_accounts(id),
    amount_cents INTEGER NOT NULL,
    currency VARCHAR(3) NOT NULL DEFAULT 'USD',
    period_start TIMESTAMPTZ,
    period_end TIMESTAMPTZ,
    stripe_invoice_id VARCHAR(255),
    pdf_url TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'draft'
        CHECK (status IN ('draft', 'open', 'paid', 'void')),
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_transactions_billing ON transactions (billing_account_id);
CREATE INDEX IF NOT EXISTS idx_invoices_billing ON invoices (billing_account_id);
  • [ ] Step 2: Commit
git add backend/db/postgres/V8__billing.sql
git commit -m "feat: V8 migration — billing accounts, transactions, invoices"

Task 5: V9 — Org Licenses

Files: - Create: backend/db/postgres/V9__org_licenses.sql

  • [ ] Step 1: Write the migration
-- V9: Org-level platform licenses (tier enforcement)

CREATE TABLE IF NOT EXISTS org_licenses (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE UNIQUE,
    tier VARCHAR(20) NOT NULL CHECK (tier IN ('free', 'organization', 'enterprise')),
    max_users INTEGER NOT NULL DEFAULT 10,
    max_projects INTEGER NOT NULL DEFAULT 1,
    license_token TEXT,
    issued_at TIMESTAMPTZ DEFAULT now(),
    expires_at TIMESTAMPTZ,
    status VARCHAR(20) NOT NULL DEFAULT 'active'
        CHECK (status IN ('active', 'expired', 'revoked'))
);

-- Seed: free license for default org
INSERT INTO org_licenses (org_id, tier, max_users, max_projects, status)
VALUES (
    'c9000000-0000-0000-0000-000000000001',
    'free', 10, 1, 'active'
)
ON CONFLICT (org_id) DO NOTHING;
  • [ ] Step 2: Commit
git add backend/db/postgres/V9__org_licenses.sql
git commit -m "feat: V9 migration — org platform licenses"

Task 6: V10 — Policy Evaluation Runs

Files: - Create: backend/db/postgres/V10__policy_evaluation_runs.sql

  • [ ] Step 1: Write the migration
-- V10: Policy evaluation tracking

CREATE TABLE IF NOT EXISTS policy_evaluation_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    installation_id UUID NOT NULL REFERENCES installed_modules(id) ON DELETE CASCADE,
    evaluated_at TIMESTAMPTZ DEFAULT now(),
    scope JSONB NOT NULL,
    input_summary JSONB,
    result VARCHAR(20) NOT NULL CHECK (result IN ('compliant', 'non_compliant', 'error')),
    violations_found INTEGER DEFAULT 0,
    duration_ms INTEGER
);

CREATE INDEX IF NOT EXISTS idx_eval_runs_install ON policy_evaluation_runs (installation_id);
  • [ ] Step 2: Commit
git add backend/db/postgres/V10__policy_evaluation_runs.sql
git commit -m "feat: V10 migration — policy evaluation runs"

Task 7: V11 — Org Settings

Files: - Create: backend/db/postgres/V11__org_settings.sql

  • [ ] Step 1: Write the migration
-- V11: Per-org runtime configuration overrides

CREATE TABLE IF NOT EXISTS org_settings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    domain VARCHAR(50) NOT NULL,
    settings JSONB NOT NULL DEFAULT '{}',
    updated_by VARCHAR(255),
    updated_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(org_id, domain)
);

-- Valid domains: org_profile, llm_connections, platform_postgres, platform_redis,
-- platform_neo4j, platform_nats, platform_vector, notifications, features, retention
  • [ ] Step 2: Commit
git add backend/db/postgres/V11__org_settings.sql
git commit -m "feat: V11 migration — org settings table"

Task 8: Neo4j Marketplace Schema

Files: - Create: backend/db/neo4j/marketplace_schema.cypher

  • [ ] Step 1: Write the schema
// Organization hierarchy
CREATE CONSTRAINT org_id IF NOT EXISTS FOR (o:Organization) REQUIRE o.id IS UNIQUE;
CREATE CONSTRAINT org_name IF NOT EXISTS FOR (o:Organization) REQUIRE o.name IS UNIQUE;
CREATE CONSTRAINT project_id IF NOT EXISTS FOR (p:Project) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT orgteam_id IF NOT EXISTS FOR (t:OrgTeam) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT usergroup_id IF NOT EXISTS FOR (g:UserGroup) REQUIRE g.id IS UNIQUE;
CREATE CONSTRAINT module_id IF NOT EXISTS FOR (m:MarketplaceModule) REQUIRE m.id IS UNIQUE;

// Connector-produced nodes (GitHub connector)
CREATE CONSTRAINT repo_id IF NOT EXISTS FOR (r:Repository) REQUIRE r.id IS UNIQUE;
CREATE CONSTRAINT workflow_id IF NOT EXISTS FOR (w:Workflow) REQUIRE w.id IS UNIQUE;
CREATE CONSTRAINT package_id IF NOT EXISTS FOR (p:Package) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT ghproject_id IF NOT EXISTS FOR (p:GitHubProject) REQUIRE p.id IS UNIQUE;

// Relationships documented (created dynamically by connectors):
// (:Project)-[:BELONGS_TO]->(:Organization)
// (:OrgTeam)-[:PART_OF]->(:Project)
// (:UserGroup)-[:WITHIN]->(:OrgTeam)
// (:User)-[:MEMBER_OF {role, since}]->(:OrgTeam)
// (:User)-[:IN_GROUP]->(:UserGroup)
// (:Organization)-[:INSTALLED {status, since}]->(:MarketplaceModule)
// (:MarketplaceModule)-[:PROVIDES]->(:Policy)
// (:MarketplaceModule)-[:CONNECTS_TO {type}]->(:ExternalSource)
// (:Repository)-[:OWNED_BY]->(:Organization)
// (:GitHubPages)-[:HOSTED_BY]->(:Repository)
// (:GitHubProject)-[:MANAGED_BY]->(:Organization)
// (:Workflow)-[:DEFINED_IN]->(:Repository)
// (:Package)-[:PUBLISHED_FROM]->(:Repository)
  • [ ] Step 2: Commit
git add backend/db/neo4j/marketplace_schema.cypher
git commit -m "feat: Neo4j marketplace and org hierarchy schema"

Phase 2: Core Auth & Authorization

Task 9: Extend UserInfo with Org Context

Files: - Modify: backend/app/core/security.py:70-86

  • [ ] Step 1: Write the failing test

Create backend/tests/core/test_authorization.py:

"""Tests for org-aware authorization."""
import pytest
from app.core.security import UserInfo


def test_user_info_has_org_fields():
    user = UserInfo(
        sub="user-1",
        org_id="org-1",
        org_slug="acme",
        org_role="admin",
        projects=[{
            "id": "proj-1",
            "slug": "platform",
            "teams": [{"id": "team-1", "slug": "alpha", "role": "Developer", "user_group": "backend"}]
        }],
        auth_method="jwt",
        scopes=["*"],
    )
    assert user.org_id == "org-1"
    assert user.org_role == "admin"
    assert user.auth_method == "jwt"
    assert len(user.projects) == 1


def test_user_info_defaults():
    user = UserInfo(sub="user-2")
    assert user.org_id is None
    assert user.org_role is None
    assert user.auth_method == "jwt"
    assert user.scopes == ["*"]
    assert user.projects == []
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_authorization.py -v Expected: FAIL — org_id, org_role, auth_method, scopes, projects not on UserInfo

  • [ ] Step 3: Add org fields to UserInfo

In backend/app/core/security.py, update the UserInfo class:

class UserInfo(BaseModel):
    sub: str
    email: str | None = None
    preferred_username: str | None = None
    name: str | None = None
    given_name: str | None = None
    family_name: str | None = None
    role: str | None = None
    perspective: str | None = None
    team: str | None = None
    realm_roles: list[str] = Field(default_factory=list)
    groups: list[str] = Field(default_factory=list)
    identity_provider: str | None = None
    identity_provider_identity: str | None = None
    github_username: str | None = None
    github_id: str | None = None
    # --- New org-aware fields ---
    org_id: str | None = None
    org_slug: str | None = None
    org_role: str | None = None  # "admin" | "member" | "billing"
    projects: list[dict] = Field(default_factory=list)
    auth_method: str = "jwt"  # "jwt" | "api_key"
    scopes: list[str] = Field(default_factory=lambda: ["*"])
  • [ ] Step 4: Update _extract_user_info to read org claims

In the same file, update _extract_user_info to extract org fields from JWT:

def _extract_user_info(payload: dict[str, Any]) -> UserInfo:
    # ... existing role/perspective/team/name/email extraction stays the same ...

    # Org context from JWT claims
    org_id = _as_string(payload.get("org_id"))
    org_slug = _as_string(payload.get("org_slug"))
    org_role = _as_string(payload.get("org_role"))
    projects = payload.get("projects") if isinstance(payload.get("projects"), list) else []

    return UserInfo(
        sub=_first_claim(payload, ["sub"]) or "",
        email=email,
        preferred_username=username,
        name=name,
        given_name=given_name,
        family_name=family_name,
        role=roles[0] if roles else role_claim,
        perspective=perspective,
        team=team,
        realm_roles=roles,
        groups=groups,
        identity_provider=identity_provider,
        identity_provider_identity=identity_provider_identity,
        github_username=_first_claim(payload, ["github_username", "login"]),
        github_id=_first_claim(payload, ["github_id"]),
        org_id=org_id,
        org_slug=org_slug,
        org_role=org_role,
        projects=projects,
        auth_method="jwt",
        scopes=["*"],
    )
  • [ ] Step 5: Update _resolve_api_key_user to set auth_method and scopes

In the same file, update _resolve_api_key_user to query scopes and org info:

async def _resolve_api_key_user(api_key: str) -> UserInfo | None:
    async with async_session_factory() as session:
        token_hash = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
        query = text(
            """
            SELECT
                t.id AS token_id,
                t.scopes,
                p.user_sub,
                p.email,
                p.username,
                p.full_name,
                p.first_name,
                p.last_name,
                p.primary_role,
                p.perspective,
                p.team,
                p.roles,
                p.groups,
                p.identity_provider,
                p.identity_provider_identity,
                p.github_username,
                p.github_id,
                om.org_role,
                o.id AS org_id,
                o.slug AS org_slug
            FROM user_api_tokens t
            INNER JOIN user_profiles p ON p.id = t.user_profile_id
            LEFT JOIN org_memberships om ON om.user_id = p.user_sub
            LEFT JOIN organizations o ON o.id = om.org_id
            WHERE t.token_hash = :token_hash
              AND t.revoked_at IS NULL
              AND (t.expires_at IS NULL OR t.expires_at > now())
            LIMIT 1
            """
        )
        result = await session.execute(query, {"token_hash": token_hash})
        row = result.first()
        if row is None:
            return None

        data = dict(row._mapping)
        await session.execute(
            text("UPDATE user_api_tokens SET last_used_at = now(), updated_at = now() WHERE id = :token_id"),
            {"token_id": data["token_id"]},
        )
        await session.commit()

        roles = data.get("roles") or []
        groups = data.get("groups") or []
        primary_role = data.get("primary_role")
        if not primary_role and roles:
            primary_role = roles[0]

        token_scopes = data.get("scopes") or ["*"]
        if isinstance(token_scopes, str):
            token_scopes = [s.strip() for s in token_scopes.split(",") if s.strip()]

        return UserInfo(
            sub=data["user_sub"],
            email=data.get("email"),
            preferred_username=data.get("username"),
            name=data.get("full_name") or data.get("username"),
            given_name=data.get("first_name"),
            family_name=data.get("last_name"),
            role=primary_role,
            perspective=data.get("perspective"),
            team=data.get("team"),
            realm_roles=list(roles),
            groups=list(groups),
            identity_provider=data.get("identity_provider") or "api_key",
            identity_provider_identity=data.get("identity_provider_identity"),
            github_username=data.get("github_username"),
            github_id=data.get("github_id"),
            org_id=data.get("org_id"),
            org_slug=data.get("org_slug"),
            org_role=data.get("org_role"),
            projects=[],  # API key users get projects resolved at request time if needed
            auth_method="api_key",
            scopes=token_scopes,
        )
  • [ ] Step 6: Run test to verify it passes

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_authorization.py -v Expected: PASS

  • [ ] Step 7: Commit
git add backend/app/core/security.py backend/tests/core/test_authorization.py
git commit -m "feat: add org context and scopes to UserInfo"

Task 10: Authorization Middleware

Files: - Create: backend/app/core/authorization.py - Test: backend/tests/core/test_authorization.py (extend)

  • [ ] Step 1: Write the failing test

Append to backend/tests/core/test_authorization.py:

from fastapi import HTTPException
from app.core.authorization import require_org_admin, require_org_billing, require_scope


def test_require_org_admin_passes():
    user = UserInfo(sub="u1", org_role="admin")
    result = require_org_admin(user)
    assert result.sub == "u1"


def test_require_org_admin_rejects_member():
    user = UserInfo(sub="u1", org_role="member")
    with pytest.raises(HTTPException) as exc_info:
        require_org_admin(user)
    assert exc_info.value.status_code == 403


def test_require_org_billing_passes_admin():
    user = UserInfo(sub="u1", org_role="admin")
    result = require_org_billing(user)
    assert result.sub == "u1"


def test_require_org_billing_passes_billing():
    user = UserInfo(sub="u1", org_role="billing")
    result = require_org_billing(user)
    assert result.sub == "u1"


def test_require_org_billing_rejects_member():
    user = UserInfo(sub="u1", org_role="member")
    with pytest.raises(HTTPException):
        require_org_billing(user)


def test_require_scope_passes_wildcard():
    user = UserInfo(sub="u1", auth_method="api_key", scopes=["*"])
    checker = require_scope("marketplace:read")
    result = checker(user)
    assert result.sub == "u1"


def test_require_scope_passes_matching():
    user = UserInfo(sub="u1", auth_method="api_key", scopes=["marketplace:read"])
    checker = require_scope("marketplace:read")
    result = checker(user)
    assert result.sub == "u1"


def test_require_scope_rejects_missing():
    user = UserInfo(sub="u1", auth_method="api_key", scopes=["billing:read"])
    checker = require_scope("marketplace:read")
    with pytest.raises(HTTPException) as exc_info:
        checker(user)
    assert exc_info.value.status_code == 403


def test_require_scope_skips_for_jwt():
    user = UserInfo(sub="u1", auth_method="jwt", scopes=[])
    checker = require_scope("marketplace:read")
    result = checker(user)
    assert result.sub == "u1"
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_authorization.py -v Expected: FAIL — authorization module not found

  • [ ] Step 3: Write the authorization module

Create backend/app/core/authorization.py:

"""Org-level and scope-based authorization middleware."""
from __future__ import annotations

from typing import Callable

from fastapi import HTTPException

from app.core.security import UserInfo


def require_org_admin(user: UserInfo) -> UserInfo:
    if user.org_role != "admin":
        raise HTTPException(403, "Org admin required")
    return user


def require_org_billing(user: UserInfo) -> UserInfo:
    if user.org_role not in ("admin", "billing"):
        raise HTTPException(403, "Org admin or billing role required")
    return user


def require_scope(scope: str) -> Callable[[UserInfo], UserInfo]:
    """Return a checker that verifies API key scopes. JWT users pass through."""
    def checker(user: UserInfo) -> UserInfo:
        if user.auth_method == "api_key" and "*" not in user.scopes:
            if scope not in user.scopes:
                raise HTTPException(403, f"API key missing scope: {scope}")
        return user
    return checker


def require_team_member(team_id: str) -> Callable[[UserInfo], UserInfo]:
    """Require user is a member of the given team. Org admins pass through."""
    def checker(user: UserInfo) -> UserInfo:
        if user.org_role == "admin":
            return user
        for project in user.projects:
            for team in project.get("teams", []):
                if team.get("id") == team_id:
                    return user
        raise HTTPException(403, "Not a member of this team")
    return checker


def require_team_admin(team_id: str) -> Callable[[UserInfo], UserInfo]:
    """Require user is Admin role in the given team. Org admins pass through."""
    def checker(user: UserInfo) -> UserInfo:
        if user.org_role == "admin":
            return user
        for project in user.projects:
            for team in project.get("teams", []):
                if team.get("id") == team_id and team.get("role") == "Admin":
                    return user
        raise HTTPException(403, "Team admin required")
    return checker
  • [ ] Step 4: Run test to verify it passes

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_authorization.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/core/authorization.py backend/tests/core/test_authorization.py
git commit -m "feat: org-level and scope-based authorization middleware"

Phase 3: Plugin Engine

Task 11: Plugin Base Classes

Files: - Create: backend/app/core/plugins/__init__.py - Create: backend/app/core/plugins/base.py - Test: backend/tests/core/test_plugin_engine.py

  • [ ] Step 1: Write the failing test

Create backend/tests/core/test_plugin_engine.py:

"""Tests for plugin engine base classes and registry."""
import pytest
from app.core.plugins.base import (
    DataConnectorBase,
    PolicyPackBase,
    PluginMeta,
    SyncResult,
    EvalContext,
    EvalResult,
    HealthStatus,
)


def test_plugin_meta():
    meta = PluginMeta(
        id="connector-github",
        name="GitHub Data Connector",
        version="1.0.0",
        type="data_connector",
        entry_point="module.connector:GitHubConnector",
    )
    assert meta.id == "connector-github"
    assert meta.type == "data_connector"


def test_data_connector_base_is_abstract():
    with pytest.raises(TypeError):
        DataConnectorBase()


def test_policy_pack_base_is_abstract():
    with pytest.raises(TypeError):
        PolicyPackBase()
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py::test_plugin_meta -v Expected: FAIL — module not found

  • [ ] Step 3: Write the base classes

Create backend/app/core/plugins/__init__.py:

"""Plugin engine — abstract bases, registry, and dynamic loader."""

Create backend/app/core/plugins/base.py:

"""Abstract base classes for data connectors and policy packs."""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass(slots=True)
class PluginMeta:
    id: str
    name: str
    version: str
    type: str  # "data_connector" | "policy_pack"
    entry_point: str
    category: str = ""
    pricing: str = "free"
    capabilities: list[str] = field(default_factory=list)
    config_schema: dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class SyncResult:
    items_synced: int
    resources: list[str]
    details: dict[str, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)


@dataclass(slots=True)
class HealthStatus:
    healthy: bool
    message: str = ""
    details: dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class EvalContext:
    scope: dict[str, Any]
    input_data: dict[str, Any]
    config: dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class EvalResult:
    compliant: bool
    violations: list[dict[str, Any]] = field(default_factory=list)
    duration_ms: int = 0
    details: dict[str, Any] = field(default_factory=dict)


class DataConnectorBase(ABC):
    """Abstract base for all data connectors."""
    meta: PluginMeta

    @abstractmethod
    async def configure(self, config: dict[str, Any]) -> None: ...

    @abstractmethod
    async def sync(self, installation_id: str) -> SyncResult: ...

    @abstractmethod
    async def health_check(self, config: dict[str, Any]) -> HealthStatus: ...

    async def install(self, config: dict[str, Any]) -> None:
        """Called once on first install. Override if setup needed."""

    async def uninstall(self, installation_id: str) -> None:
        """Called on uninstall. Override for cleanup."""


class PolicyPackBase(ABC):
    """Abstract base for all policy packs."""
    meta: PluginMeta

    @abstractmethod
    async def activate(self, config: dict[str, Any]) -> None: ...

    @abstractmethod
    async def evaluate(self, context: EvalContext) -> EvalResult: ...

    @abstractmethod
    def get_policies(self) -> list[dict[str, Any]]: ...

    async def deactivate(self) -> None:
        """Called on deactivation. Override for cleanup."""
  • [ ] Step 4: Run test to verify it passes

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/core/plugins/ backend/tests/core/test_plugin_engine.py
git commit -m "feat: plugin engine base classes — DataConnectorBase, PolicyPackBase"

Task 12: Plugin Registry and Deps

Files: - Create: backend/app/core/plugins/registry.py - Create: backend/app/core/plugins/deps.py - Test: backend/tests/core/test_plugin_engine.py (extend)

  • [ ] Step 1: Write the failing test

Append to backend/tests/core/test_plugin_engine.py:

from app.core.plugins.registry import PluginRegistry
from app.core.plugins.base import DataConnectorBase, PluginMeta, SyncResult, HealthStatus


class FakeConnector(DataConnectorBase):
    meta = PluginMeta(id="test-conn", name="Test", version="1.0", type="data_connector", entry_point="test")

    async def configure(self, config):
        pass

    async def sync(self, installation_id):
        return SyncResult(items_synced=0, resources=[])

    async def health_check(self, config):
        return HealthStatus(healthy=True)


def test_registry_register_and_get():
    registry = PluginRegistry()
    registry.register_connector(FakeConnector)
    cls = registry.get_connector("test-conn")
    assert cls is FakeConnector


def test_registry_get_unknown_raises():
    registry = PluginRegistry()
    with pytest.raises(KeyError):
        registry.get_connector("nonexistent")


def test_registry_list_all():
    registry = PluginRegistry()
    registry.register_connector(FakeConnector)
    all_plugins = registry.list_all()
    assert len(all_plugins) == 1
    assert all_plugins[0]["id"] == "test-conn"
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py::test_registry_register_and_get -v Expected: FAIL

  • [ ] Step 3: Write the registry

Create backend/app/core/plugins/registry.py:

"""Singleton plugin registry — connectors and policy packs register here."""
from __future__ import annotations

from typing import Any

from app.core.plugins.base import DataConnectorBase, PolicyPackBase


class PluginRegistry:
    """Registry for connector and policy pack plugin classes."""

    def __init__(self) -> None:
        self._connectors: dict[str, type[DataConnectorBase]] = {}
        self._policy_packs: dict[str, type[PolicyPackBase]] = {}
        self._instances: dict[str, DataConnectorBase | PolicyPackBase] = {}

    def register_connector(self, cls: type[DataConnectorBase]) -> None:
        self._connectors[cls.meta.id] = cls

    def register_policy_pack(self, cls: type[PolicyPackBase]) -> None:
        self._policy_packs[cls.meta.id] = cls

    def get_connector(self, module_id: str) -> type[DataConnectorBase]:
        cls = self._connectors.get(module_id)
        if cls is None:
            raise KeyError(f"No connector registered: {module_id}")
        return cls

    def get_policy_pack(self, module_id: str) -> type[PolicyPackBase]:
        cls = self._policy_packs.get(module_id)
        if cls is None:
            raise KeyError(f"No policy pack registered: {module_id}")
        return cls

    def get_or_create_instance(self, module_id: str) -> DataConnectorBase | PolicyPackBase:
        if module_id in self._instances:
            return self._instances[module_id]
        if module_id in self._connectors:
            instance = self._connectors[module_id]()
            self._instances[module_id] = instance
            return instance
        if module_id in self._policy_packs:
            instance = self._policy_packs[module_id]()
            self._instances[module_id] = instance
            return instance
        raise KeyError(f"No plugin registered: {module_id}")

    def list_all(self) -> list[dict[str, Any]]:
        items: list[dict[str, Any]] = []
        for cls in self._connectors.values():
            items.append({"id": cls.meta.id, "name": cls.meta.name, "type": "data_connector", "version": cls.meta.version})
        for cls in self._policy_packs.values():
            items.append({"id": cls.meta.id, "name": cls.meta.name, "type": "policy_pack", "version": cls.meta.version})
        return items

    def clear(self) -> None:
        self._connectors.clear()
        self._policy_packs.clear()
        self._instances.clear()

Create backend/app/core/plugins/deps.py:

"""Shared dependencies injected into plugin instances."""
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class PluginDeps:
    """Injected into plugins — they never import drivers directly."""
    pg_session_factory: Any  # AsyncSession factory
    neo4j_driver: Any  # Neo4j async driver
    redis_client: Any  # Redis async client
    opa_client: Any  # OPA HTTP client
  • [ ] Step 4: Run tests

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/core/plugins/registry.py backend/app/core/plugins/deps.py backend/tests/core/test_plugin_engine.py
git commit -m "feat: PluginRegistry singleton and PluginDeps container"

Task 13: Bundle Loader

Files: - Create: backend/app/core/plugins/loader.py

  • [ ] Step 1: Write the failing test

Append to backend/tests/core/test_plugin_engine.py:

import json
import tarfile
import io
import hashlib
from app.core.plugins.loader import extract_bundle, verify_manifest_hashes


def _make_test_bundle(manifest: dict, files: dict[str, bytes]) -> bytes:
    """Create a .substrate bundle (tar.gz) in memory."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        manifest_bytes = json.dumps(manifest).encode()
        info = tarfile.TarInfo(name="manifest.json")
        info.size = len(manifest_bytes)
        tar.addfile(info, io.BytesIO(manifest_bytes))
        for name, content in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(content)
            tar.addfile(info, io.BytesIO(content))
    return buf.getvalue()


def test_extract_bundle():
    init_content = b"# init"
    connector_content = b"class Foo: pass"
    file_hashes = {
        "module/__init__.py": f"sha256:{hashlib.sha256(init_content).hexdigest()}",
        "module/connector.py": f"sha256:{hashlib.sha256(connector_content).hexdigest()}",
    }
    manifest = {"id": "test", "version": "1.0.0", "file_hashes": file_hashes}
    bundle = _make_test_bundle(manifest, {
        "module/__init__.py": init_content,
        "module/connector.py": connector_content,
    })
    result = extract_bundle(bundle)
    assert result["manifest"]["id"] == "test"
    assert "module/__init__.py" in result["files"]


def test_verify_manifest_hashes_passes():
    content = b"hello"
    file_hashes = {"module/test.py": f"sha256:{hashlib.sha256(content).hexdigest()}"}
    files = {"module/test.py": content}
    errors = verify_manifest_hashes(file_hashes, files)
    assert errors == []


def test_verify_manifest_hashes_fails():
    file_hashes = {"module/test.py": "sha256:bad"}
    files = {"module/test.py": b"hello"}
    errors = verify_manifest_hashes(file_hashes, files)
    assert len(errors) == 1
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py::test_extract_bundle -v Expected: FAIL

  • [ ] Step 3: Write the loader

Create backend/app/core/plugins/loader.py:

"""Bundle extraction, hash verification, and dynamic import for .substrate files."""
from __future__ import annotations

import hashlib
import io
import json
import tarfile
from typing import Any


def extract_bundle(data: bytes) -> dict[str, Any]:
    """Extract a .substrate tar.gz bundle and return manifest + file contents."""
    buf = io.BytesIO(data)
    manifest: dict[str, Any] | None = None
    signature: dict[str, Any] | None = None
    license_jwt: str | None = None
    files: dict[str, bytes] = {}

    with tarfile.open(fileobj=buf, mode="r:gz") as tar:
        for member in tar.getmembers():
            if not member.isfile():
                continue
            f = tar.extractfile(member)
            if f is None:
                continue
            content = f.read()
            if member.name == "manifest.json":
                manifest = json.loads(content)
            elif member.name == "signature.json":
                signature = json.loads(content)
            elif member.name == "license.jwt":
                license_jwt = content.decode("utf-8").strip()
            else:
                files[member.name] = content

    if manifest is None:
        raise ValueError("Bundle missing manifest.json")

    return {
        "manifest": manifest,
        "signature": signature,
        "license_jwt": license_jwt,
        "files": files,
    }


def verify_manifest_hashes(
    file_hashes: dict[str, str],
    files: dict[str, bytes],
) -> list[str]:
    """Verify each file's SHA256 against manifest.file_hashes. Returns list of errors."""
    errors: list[str] = []
    for path, expected_hash in file_hashes.items():
        if path not in files:
            errors.append(f"Missing file: {path}")
            continue
        actual = hashlib.sha256(files[path]).hexdigest()
        expected = expected_hash.removeprefix("sha256:")
        if actual != expected:
            errors.append(f"Hash mismatch for {path}: expected {expected}, got {actual}")
    return errors


def compute_manifest_hash(manifest_bytes: bytes) -> str:
    """Compute SHA256 of the raw manifest.json bytes."""
    return hashlib.sha256(manifest_bytes).hexdigest()
  • [ ] Step 4: Run tests

Run: cd /home/dany/substrate/backend && python -m pytest tests/core/test_plugin_engine.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/core/plugins/loader.py backend/tests/core/test_plugin_engine.py
git commit -m "feat: .substrate bundle loader with hash verification"

Phase 4: Licensing Module

Task 14: Migrate License Service to ECDSA P-256

Files: - Modify: backend/app/modules/marketplace/license.py - Test: backend/tests/modules/test_licensing.py

  • [ ] Step 1: Write the failing test

Create backend/tests/modules/test_licensing.py:

"""Tests for ECDSA license signing and verification."""
import pytest
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization


def _generate_test_keys() -> tuple[str, str]:
    private_key = ec.generate_private_key(ec.SECP256R1())
    private_pem = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    ).decode()
    public_pem = private_key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    return private_pem, public_pem


def test_ecdsa_issue_and_verify():
    from app.modules.licensing.service import LicenseTokenService
    private_pem, public_pem = _generate_test_keys()
    svc = LicenseTokenService(
        private_key_pem=private_pem,
        public_key_pem=public_pem,
        kid="test-key",
    )
    token = svc.issue_org_license(
        org_id="org-1",
        tier="organization",
        max_users=50,
        max_projects=5,
    )
    assert isinstance(token, str)
    claims = svc.verify(token)
    assert claims["sub"] == "org-1"
    assert claims["tier"] == "organization"


def test_ecdsa_module_entitlement():
    from app.modules.licensing.service import LicenseTokenService
    private_pem, public_pem = _generate_test_keys()
    svc = LicenseTokenService(
        private_key_pem=private_pem,
        public_key_pem=public_pem,
        kid="test-key",
    )
    token = svc.issue_module_entitlement(
        org_id="org-1",
        module_id="connector-jira",
    )
    claims = svc.verify(token)
    assert claims["sub"] == "org-1"
    assert claims["module_id"] == "connector-jira"
    assert claims["type"] == "module_entitlement"


def test_ecdsa_reject_tampered():
    from app.modules.licensing.service import LicenseTokenService
    private_pem, public_pem = _generate_test_keys()
    svc = LicenseTokenService(private_key_pem=private_pem, public_key_pem=public_pem, kid="k")
    token = svc.issue_org_license(org_id="org-1", tier="free", max_users=10, max_projects=1)
    tampered = token[:-5] + "XXXXX"
    with pytest.raises(ValueError, match="Invalid"):
        svc.verify(tampered)
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/modules/test_licensing.py -v Expected: FAIL — licensing.service module not found

  • [ ] Step 3: Create the licensing module

Create backend/app/modules/licensing/__init__.py:

"""Licensing module — ECDSA license signing, tier enforcement, entitlement management."""

Create backend/app/modules/licensing/service.py:

"""ECDSA P-256 license token generation and verification."""
from __future__ import annotations

import time
from typing import Any

from jose import jwt, JWTError


class LicenseTokenService:
    """Sign and verify license JWTs using ES256 (ECDSA P-256)."""

    def __init__(
        self,
        private_key_pem: str | None = None,
        public_key_pem: str | None = None,
        kid: str = "substrate-license-key",
    ) -> None:
        self._private_key = private_key_pem
        self._public_key = public_key_pem
        self._kid = kid

    @property
    def kid(self) -> str:
        return self._kid

    def issue_org_license(
        self,
        *,
        org_id: str,
        tier: str,
        max_users: int,
        max_projects: int,
        expires_in_seconds: int = 365 * 24 * 3600,
    ) -> str:
        now = int(time.time())
        claims = {
            "iss": "substrate-license-authority",
            "sub": org_id,
            "type": "platform",
            "tier": tier,
            "max_users": max_users,
            "max_projects": max_projects,
            "iat": now,
            "exp": now + expires_in_seconds,
        }
        return jwt.encode(
            claims,
            self._private_key,
            algorithm="ES256",
            headers={"kid": self._kid, "typ": "JWT"},
        )

    def issue_module_entitlement(
        self,
        *,
        org_id: str,
        module_id: str,
        expires_in_seconds: int | None = None,
    ) -> str:
        now = int(time.time())
        claims: dict[str, Any] = {
            "iss": "substrate-license-authority",
            "sub": org_id,
            "type": "module_entitlement",
            "module_id": module_id,
            "iat": now,
        }
        if expires_in_seconds is not None:
            claims["exp"] = now + expires_in_seconds
        else:
            # Perpetual for one-time purchases — set far-future expiry
            claims["exp"] = now + 100 * 365 * 24 * 3600
        return jwt.encode(
            claims,
            self._private_key,
            algorithm="ES256",
            headers={"kid": self._kid, "typ": "JWT"},
        )

    def verify(self, token: str) -> dict[str, Any]:
        try:
            return jwt.decode(
                token,
                self._public_key,
                algorithms=["ES256"],
                options={"verify_aud": False},
            )
        except JWTError as exc:
            raise ValueError(f"Invalid license token: {exc}") from exc
  • [ ] Step 4: Run tests

Run: cd /home/dany/substrate/backend && python -m pytest tests/modules/test_licensing.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/modules/licensing/ backend/tests/modules/test_licensing.py
git commit -m "feat: ECDSA P-256 license token service"

Task 15: Licensing Router, Repository, Schemas

Files: - Create: backend/app/modules/licensing/router.py - Create: backend/app/modules/licensing/repository.py - Create: backend/app/modules/licensing/schemas.py - Create: backend/app/modules/licensing/dependencies.py

  • [ ] Step 1: Write the schemas

Create backend/app/modules/licensing/schemas.py:

"""Pydantic schemas for licensing endpoints."""
from __future__ import annotations

from pydantic import BaseModel


class OrgLicenseOut(BaseModel):
    org_id: str
    tier: str
    max_users: int
    max_projects: int
    status: str
    issued_at: str | None = None
    expires_at: str | None = None


class OrgLicenseResponse(BaseModel):
    license: OrgLicenseOut


class ImportLicenseRequest(BaseModel):
    license_token: str


class ImportLicenseResponse(BaseModel):
    license: OrgLicenseOut
    message: str


class EntitlementOut(BaseModel):
    org_id: str
    module_id: str
    status: str
    granted_at: str | None = None
    expires_at: str | None = None


class EntitlementListResponse(BaseModel):
    data: list[EntitlementOut]
    meta: dict[str, int]


class EntitlementCheckResponse(BaseModel):
    module_id: str
    entitled: bool
    reason: str | None = None


class ImportEntitlementRequest(BaseModel):
    entitlement_token: str


class ImportEntitlementResponse(BaseModel):
    entitlement: EntitlementOut
    message: str


class VerifyLicenseRequest(BaseModel):
    token: str


class VerifyLicenseResponse(BaseModel):
    valid: bool
    claims: dict | None = None
    error: str | None = None
  • [ ] Step 2: Write the repository

Create backend/app/modules/licensing/repository.py:

"""Licensing data access layer."""
from __future__ import annotations

from typing import Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class LicensingRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_org_license(self, org_id: str) -> dict[str, Any] | None:
        query = text(
            """
            SELECT id, org_id, tier, max_users, max_projects, license_token,
                   issued_at, expires_at, status
            FROM org_licenses
            WHERE org_id = :org_id AND status = 'active'
            LIMIT 1
            """
        )
        row = (await self.session.execute(query, {"org_id": org_id})).first()
        return dict(row._mapping) if row else None

    async def upsert_org_license(
        self, *, org_id: str, tier: str, max_users: int, max_projects: int,
        license_token: str | None, expires_at: Any | None,
    ) -> dict[str, Any]:
        query = text(
            """
            INSERT INTO org_licenses (org_id, tier, max_users, max_projects, license_token, expires_at)
            VALUES (:org_id, :tier, :max_users, :max_projects, :license_token, :expires_at)
            ON CONFLICT (org_id) DO UPDATE SET
                tier = EXCLUDED.tier,
                max_users = EXCLUDED.max_users,
                max_projects = EXCLUDED.max_projects,
                license_token = EXCLUDED.license_token,
                expires_at = EXCLUDED.expires_at,
                status = 'active',
                issued_at = now()
            RETURNING *
            """
        )
        row = (await self.session.execute(query, {
            "org_id": org_id, "tier": tier, "max_users": max_users,
            "max_projects": max_projects, "license_token": license_token,
            "expires_at": expires_at,
        })).first()
        return dict(row._mapping)

    async def list_entitlements(self, org_id: str) -> list[dict[str, Any]]:
        query = text(
            """
            SELECT org_id, module_id, status, starts_at AS granted_at, expires_at
            FROM marketplace_entitlements
            WHERE org_id = :org_id AND status = 'active'
            ORDER BY starts_at DESC
            """
        )
        result = await self.session.execute(query, {"org_id": org_id})
        return [dict(row._mapping) for row in result]

    async def check_entitlement(self, org_id: str, module_id: str) -> dict[str, Any] | None:
        query = text(
            """
            SELECT org_id, module_id, status, starts_at AS granted_at, expires_at
            FROM marketplace_entitlements
            WHERE org_id = :org_id
              AND module_id = (SELECT id FROM marketplace_modules WHERE module_key = :module_id LIMIT 1)
              AND status = 'active'
            LIMIT 1
            """
        )
        row = (await self.session.execute(query, {"org_id": org_id, "module_id": module_id})).first()
        return dict(row._mapping) if row else None
  • [ ] Step 3: Write the router

Create backend/app/modules/licensing/router.py:

"""Licensing API routes."""
from __future__ import annotations

from fastapi import APIRouter, Depends

from app.core.authorization import require_org_admin, require_scope
from app.core.security import UserInfo, get_current_user
from app.modules.licensing.dependencies import get_licensing_service
from app.modules.licensing.schemas import (
    EntitlementCheckResponse,
    EntitlementListResponse,
    ImportEntitlementRequest,
    ImportEntitlementResponse,
    ImportLicenseRequest,
    ImportLicenseResponse,
    OrgLicenseResponse,
    VerifyLicenseRequest,
    VerifyLicenseResponse,
)
from app.modules.licensing.service import LicensingService

router = APIRouter(prefix="/licensing", tags=["licensing"])


@router.get("/license", response_model=OrgLicenseResponse)
async def get_org_license(
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    require_org_admin(user)
    return await service.get_org_license(user)


@router.post("/license/import", response_model=ImportLicenseResponse)
async def import_license(
    payload: ImportLicenseRequest,
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    require_org_admin(user)
    return await service.import_license(user, payload.license_token)


@router.get("/entitlements", response_model=EntitlementListResponse)
async def list_entitlements(
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    require_scope("marketplace:read")(user)
    return await service.list_entitlements(user)


@router.get("/entitlements/{module_id}", response_model=EntitlementCheckResponse)
async def check_entitlement(
    module_id: str,
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    require_scope("marketplace:read")(user)
    return await service.check_entitlement(user, module_id)


@router.post("/entitlements/import", response_model=ImportEntitlementResponse)
async def import_entitlement(
    payload: ImportEntitlementRequest,
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    require_org_admin(user)
    return await service.import_entitlement(user, payload.entitlement_token)


@router.post("/verify", response_model=VerifyLicenseResponse)
async def verify_license(
    payload: VerifyLicenseRequest,
    user: UserInfo = Depends(get_current_user),
    service: LicensingService = Depends(get_licensing_service),
):
    return service.verify_token(payload.token)
  • [ ] Step 4: Write the dependencies

Create backend/app/modules/licensing/dependencies.py:

"""Dependency injection for the licensing module."""
from __future__ import annotations

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database.postgres import get_session
from app.modules.licensing.repository import LicensingRepository
from app.modules.licensing.service import LicenseTokenService, LicensingService
from app.settings import get_settings


def get_licensing_service(session: AsyncSession = Depends(get_session)) -> LicensingService:
    settings = get_settings()
    token_service = LicenseTokenService(
        private_key_pem=settings.marketplace_license_private_key_pem,
        public_key_pem=settings.marketplace_license_public_key_pem,
        kid=settings.marketplace_license_kid,
    )
    repo = LicensingRepository(session)
    return LicensingService(repo=repo, token_service=token_service)

Note: LicensingService wraps LicenseTokenService with repository access. Add to licensing/service.py:

class LicensingService:
    """Orchestrates license/entitlement operations with DB persistence."""

    def __init__(self, repo: LicensingRepository, token_service: LicenseTokenService):
        self.repo = repo
        self.token_service = token_service

    async def get_org_license(self, user: UserInfo) -> OrgLicenseResponse:
        from app.modules.licensing.schemas import OrgLicenseOut, OrgLicenseResponse
        row = await self.repo.get_org_license(user.org_id)
        if row is None:
            return OrgLicenseResponse(license=OrgLicenseOut(
                org_id=user.org_id or "", tier="free", max_users=10, max_projects=1, status="active"
            ))
        return OrgLicenseResponse(license=OrgLicenseOut(
            org_id=str(row["org_id"]), tier=row["tier"], max_users=row["max_users"],
            max_projects=row["max_projects"], status=row["status"],
            issued_at=str(row.get("issued_at") or ""),
            expires_at=str(row.get("expires_at") or ""),
        ))

    async def import_license(self, user: UserInfo, token: str) -> ImportLicenseResponse:
        from app.modules.licensing.schemas import ImportLicenseResponse, OrgLicenseOut
        claims = self.token_service.verify(token)
        row = await self.repo.upsert_org_license(
            org_id=user.org_id or claims["sub"],
            tier=claims["tier"],
            max_users=claims["max_users"],
            max_projects=claims["max_projects"],
            license_token=token,
            expires_at=None,
        )
        return ImportLicenseResponse(
            license=OrgLicenseOut(
                org_id=str(row["org_id"]), tier=row["tier"],
                max_users=row["max_users"], max_projects=row["max_projects"],
                status=row["status"],
            ),
            message="License imported successfully",
        )

    async def list_entitlements(self, user: UserInfo) -> EntitlementListResponse:
        from app.modules.licensing.schemas import EntitlementListResponse, EntitlementOut
        rows = await self.repo.list_entitlements(user.org_id)
        items = [EntitlementOut(
            org_id=str(r["org_id"]), module_id=str(r["module_id"]),
            status=r["status"],
            granted_at=str(r.get("granted_at") or ""),
            expires_at=str(r.get("expires_at") or ""),
        ) for r in rows]
        return EntitlementListResponse(data=items, meta={"total": len(items)})

    async def check_entitlement(self, user: UserInfo, module_id: str) -> EntitlementCheckResponse:
        from app.modules.licensing.schemas import EntitlementCheckResponse
        row = await self.repo.check_entitlement(user.org_id, module_id)
        if row:
            return EntitlementCheckResponse(module_id=module_id, entitled=True)
        return EntitlementCheckResponse(module_id=module_id, entitled=False, reason="No active entitlement")

    async def import_entitlement(self, user: UserInfo, token: str) -> ImportEntitlementResponse:
        from app.modules.licensing.schemas import ImportEntitlementResponse, EntitlementOut
        claims = self.token_service.verify(token)
        return ImportEntitlementResponse(
            entitlement=EntitlementOut(
                org_id=claims["sub"], module_id=claims["module_id"],
                status="active",
            ),
            message="Entitlement imported successfully",
        )

    def verify_token(self, token: str) -> VerifyLicenseResponse:
        from app.modules.licensing.schemas import VerifyLicenseResponse
        try:
            claims = self.token_service.verify(token)
            return VerifyLicenseResponse(valid=True, claims=claims)
        except ValueError as exc:
            return VerifyLicenseResponse(valid=False, error=str(exc))

Add these imports at the top of licensing/service.py:

from app.core.security import UserInfo
from app.modules.licensing.repository import LicensingRepository
from app.modules.licensing.schemas import (
    EntitlementCheckResponse,
    EntitlementListResponse,
    ImportEntitlementResponse,
    ImportLicenseResponse,
    OrgLicenseResponse,
    VerifyLicenseResponse,
)
  • [ ] Step 5: Commit
git add backend/app/modules/licensing/
git commit -m "feat: licensing module — router, service, repository, schemas"

Phase 5: Billing Module

Task 16: Payment Provider Abstraction

Files: - Create: backend/app/modules/billing/__init__.py - Create: backend/app/modules/billing/providers/__init__.py - Create: backend/app/modules/billing/providers/base.py - Create: backend/app/modules/billing/providers/stripe_provider.py - Create: backend/app/modules/billing/providers/offline_provider.py - Test: backend/tests/modules/test_billing.py

  • [ ] Step 1: Write the failing test

Create backend/tests/modules/test_billing.py:

"""Tests for billing module."""
import pytest
from app.modules.billing.providers.base import PaymentProvider, PaymentResult
from app.modules.billing.providers.offline_provider import OfflineProvider


@pytest.mark.asyncio
async def test_offline_provider_charge():
    provider = OfflineProvider()
    result = await provider.charge("cust-1", 4900, {"module_id": "jira"})
    assert isinstance(result, PaymentResult)
    assert result.success is True
    assert result.provider_id.startswith("offline_")


@pytest.mark.asyncio
async def test_offline_provider_create_customer():
    provider = OfflineProvider()
    customer_id = await provider.create_customer("org-1", "admin@acme.com")
    assert customer_id.startswith("offline_cust_")
  • [ ] Step 2: Run test to verify it fails

Run: cd /home/dany/substrate/backend && python -m pytest tests/modules/test_billing.py -v Expected: FAIL

  • [ ] Step 3: Write the payment provider abstraction

Create backend/app/modules/billing/__init__.py:

"""Billing module — payment provider abstraction, invoices, transactions."""

Create backend/app/modules/billing/providers/__init__.py:

"""Payment provider implementations."""

Create backend/app/modules/billing/providers/base.py:

"""Abstract payment provider interface."""
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass(slots=True)
class PaymentResult:
    success: bool
    provider_id: str
    error: str | None = None


@dataclass(slots=True)
class SubscriptionResult:
    subscription_id: str
    status: str


class PaymentProvider(ABC):
    @abstractmethod
    async def create_customer(self, org_id: str, email: str) -> str: ...

    @abstractmethod
    async def charge(self, customer_id: str, amount_cents: int, metadata: dict[str, Any]) -> PaymentResult: ...

    @abstractmethod
    async def create_subscription(self, customer_id: str, plan_id: str, cycle: str) -> SubscriptionResult: ...

    @abstractmethod
    async def cancel_subscription(self, subscription_id: str) -> None: ...

    @abstractmethod
    async def get_invoices(self, customer_id: str) -> list[dict[str, Any]]: ...

Create backend/app/modules/billing/providers/offline_provider.py:

"""No-op payment provider for air-gapped deployments."""
from __future__ import annotations

import uuid
from typing import Any

from app.modules.billing.providers.base import PaymentProvider, PaymentResult, SubscriptionResult


class OfflineProvider(PaymentProvider):
    async def create_customer(self, org_id: str, email: str) -> str:
        return f"offline_cust_{uuid.uuid4().hex[:12]}"

    async def charge(self, customer_id: str, amount_cents: int, metadata: dict[str, Any]) -> PaymentResult:
        return PaymentResult(success=True, provider_id=f"offline_{uuid.uuid4().hex[:12]}")

    async def create_subscription(self, customer_id: str, plan_id: str, cycle: str) -> SubscriptionResult:
        return SubscriptionResult(subscription_id=f"offline_sub_{uuid.uuid4().hex[:12]}", status="active")

    async def cancel_subscription(self, subscription_id: str) -> None:
        pass

    async def get_invoices(self, customer_id: str) -> list[dict[str, Any]]:
        return []

Create backend/app/modules/billing/providers/stripe_provider.py:

"""Stripe payment provider implementation."""
from __future__ import annotations

from typing import Any

from app.modules.billing.providers.base import PaymentProvider, PaymentResult, SubscriptionResult


class StripeProvider(PaymentProvider):
    """Real Stripe SDK integration. Requires stripe_secret_key."""

    def __init__(self, secret_key: str):
        self._secret_key = secret_key

    async def create_customer(self, org_id: str, email: str) -> str:
        import stripe
        stripe.api_key = self._secret_key
        customer = stripe.Customer.create(email=email, metadata={"org_id": org_id})
        return customer.id

    async def charge(self, customer_id: str, amount_cents: int, metadata: dict[str, Any]) -> PaymentResult:
        import stripe
        stripe.api_key = self._secret_key
        try:
            intent = stripe.PaymentIntent.create(
                amount=amount_cents, currency="usd",
                customer=customer_id, metadata=metadata,
                confirm=True, automatic_payment_methods={"enabled": True, "allow_redirects": "never"},
            )
            return PaymentResult(success=True, provider_id=intent.id)
        except stripe.StripeError as e:
            return PaymentResult(success=False, provider_id="", error=str(e))

    async def create_subscription(self, customer_id: str, plan_id: str, cycle: str) -> SubscriptionResult:
        import stripe
        stripe.api_key = self._secret_key
        sub = stripe.Subscription.create(customer=customer_id, items=[{"price": plan_id}])
        return SubscriptionResult(subscription_id=sub.id, status=sub.status)

    async def cancel_subscription(self, subscription_id: str) -> None:
        import stripe
        stripe.api_key = self._secret_key
        stripe.Subscription.delete(subscription_id)

    async def get_invoices(self, customer_id: str) -> list[dict[str, Any]]:
        import stripe
        stripe.api_key = self._secret_key
        invoices = stripe.Invoice.list(customer=customer_id, limit=50)
        return [{"id": inv.id, "amount": inv.amount_paid, "status": inv.status} for inv in invoices.data]
  • [ ] Step 4: Run tests

Run: cd /home/dany/substrate/backend && python -m pytest tests/modules/test_billing.py -v Expected: PASS

  • [ ] Step 5: Commit
git add backend/app/modules/billing/ backend/tests/modules/test_billing.py
git commit -m "feat: billing payment provider abstraction — Stripe + Offline"

Task 17: Billing Router, Service, Repository, Schemas

Files: - Create: backend/app/modules/billing/schemas.py - Create: backend/app/modules/billing/repository.py - Create: backend/app/modules/billing/service.py - Create: backend/app/modules/billing/router.py - Create: backend/app/modules/billing/dependencies.py

  • [ ] Step 1: Write billing schemas

Create backend/app/modules/billing/schemas.py:

"""Pydantic schemas for billing endpoints."""
from __future__ import annotations

from pydantic import BaseModel


class BillingAccountOut(BaseModel):
    org_id: str
    billing_email: str | None = None
    billing_cycle: str | None = None
    payment_method_last4: str | None = None
    stripe_customer_id: str | None = None


class BillingAccountResponse(BaseModel):
    account: BillingAccountOut


class ConfigureBillingRequest(BaseModel):
    billing_email: str | None = None
    billing_cycle: str | None = None  # "monthly" | "yearly"


class PurchaseRequest(BaseModel):
    module_id: str


class PurchaseResponse(BaseModel):
    transaction_id: str
    module_id: str
    amount_cents: int
    status: str


class TransactionOut(BaseModel):
    id: str
    module_id: str | None = None
    amount_cents: int
    currency: str
    status: str
    created_at: str


class TransactionListResponse(BaseModel):
    data: list[TransactionOut]
    meta: dict[str, int]


class InvoiceOut(BaseModel):
    id: str
    amount_cents: int
    currency: str
    status: str
    period_start: str | None = None
    period_end: str | None = None
    pdf_url: str | None = None


class InvoiceListResponse(BaseModel):
    data: list[InvoiceOut]
    meta: dict[str, int]


class UsageOut(BaseModel):
    current_users: int
    max_users: int
    current_projects: int
    max_projects: int
    tier: str
  • [ ] Step 2: Write billing repository

Create backend/app/modules/billing/repository.py:

"""Billing data access layer."""
from __future__ import annotations

from typing import Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class BillingRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_billing_account(self, org_id: str) -> dict[str, Any] | None:
        query = text(
            "SELECT * FROM billing_accounts WHERE org_id = :org_id LIMIT 1"
        )
        row = (await self.session.execute(query, {"org_id": org_id})).first()
        return dict(row._mapping) if row else None

    async def upsert_billing_account(self, org_id: str, **kwargs: Any) -> dict[str, Any]:
        sets = ", ".join(f"{k} = :{k}" for k in kwargs if kwargs[k] is not None)
        cols = ", ".join(["org_id"] + [k for k in kwargs if kwargs[k] is not None])
        vals = ", ".join([":org_id"] + [f":{k}" for k in kwargs if kwargs[k] is not None])
        query = text(f"""
            INSERT INTO billing_accounts ({cols}) VALUES ({vals})
            ON CONFLICT (org_id) DO UPDATE SET {sets}, updated_at = now()
            RETURNING *
        """)
        params = {"org_id": org_id, **{k: v for k, v in kwargs.items() if v is not None}}
        row = (await self.session.execute(query, params)).first()
        return dict(row._mapping)

    async def create_transaction(
        self, billing_account_id: str, module_id: str | None,
        amount_cents: int, currency: str, stripe_id: str | None, status: str,
    ) -> dict[str, Any]:
        query = text("""
            INSERT INTO transactions (billing_account_id, module_id, amount_cents, currency, stripe_payment_intent_id, status)
            VALUES (:ba_id, :module_id, :amount, :currency, :stripe_id, :status)
            RETURNING *
        """)
        row = (await self.session.execute(query, {
            "ba_id": billing_account_id, "module_id": module_id,
            "amount": amount_cents, "currency": currency,
            "stripe_id": stripe_id, "status": status,
        })).first()
        return dict(row._mapping)

    async def list_transactions(self, org_id: str) -> list[dict[str, Any]]:
        query = text("""
            SELECT t.* FROM transactions t
            INNER JOIN billing_accounts ba ON ba.id = t.billing_account_id
            WHERE ba.org_id = :org_id ORDER BY t.created_at DESC
        """)
        result = await self.session.execute(query, {"org_id": org_id})
        return [dict(row._mapping) for row in result]

    async def list_invoices(self, org_id: str) -> list[dict[str, Any]]:
        query = text("""
            SELECT i.* FROM invoices i
            INNER JOIN billing_accounts ba ON ba.id = i.billing_account_id
            WHERE ba.org_id = :org_id ORDER BY i.created_at DESC
        """)
        result = await self.session.execute(query, {"org_id": org_id})
        return [dict(row._mapping) for row in result]

    async def get_usage(self, org_id: str) -> dict[str, Any]:
        query = text("""
            SELECT
                (SELECT COUNT(*) FROM org_memberships WHERE org_id = :org_id) AS current_users,
                (SELECT COUNT(*) FROM org_projects WHERE org_id = :org_id) AS current_projects,
                o.tier, o.max_users, o.max_projects
            FROM organizations o WHERE o.id = :org_id
        """)
        row = (await self.session.execute(query, {"org_id": org_id})).first()
        return dict(row._mapping) if row else {"current_users": 0, "current_projects": 0, "tier": "free", "max_users": 10, "max_projects": 1}
  • [ ] Step 3: Write billing router

Create backend/app/modules/billing/router.py:

"""Billing API routes."""
from __future__ import annotations

from fastapi import APIRouter, Depends

from app.core.authorization import require_org_admin, require_org_billing, require_scope
from app.core.security import UserInfo, get_current_user
from app.modules.billing.dependencies import get_billing_service
from app.modules.billing.schemas import (
    BillingAccountResponse,
    ConfigureBillingRequest,
    InvoiceListResponse,
    PurchaseRequest,
    PurchaseResponse,
    TransactionListResponse,
    UsageOut,
)
from app.modules.billing.service import BillingService

router = APIRouter(prefix="/billing", tags=["billing"])


@router.get("/account", response_model=BillingAccountResponse)
async def get_billing_account(
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    require_org_billing(user)
    return await service.get_account(user)


@router.put("/account", response_model=BillingAccountResponse)
async def configure_billing(
    payload: ConfigureBillingRequest,
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    require_org_admin(user)
    return await service.configure_account(user, payload)


@router.post("/purchase", response_model=PurchaseResponse)
async def purchase_module(
    payload: PurchaseRequest,
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    require_org_admin(user)
    require_scope("billing:write")(user)
    return await service.purchase_module(user, payload.module_id)


@router.get("/transactions", response_model=TransactionListResponse)
async def list_transactions(
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    require_org_billing(user)
    require_scope("billing:read")(user)
    return await service.list_transactions(user)


@router.get("/invoices", response_model=InvoiceListResponse)
async def list_invoices(
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    require_org_billing(user)
    return await service.list_invoices(user)


@router.get("/usage", response_model=UsageOut)
async def get_usage(
    user: UserInfo = Depends(get_current_user),
    service: BillingService = Depends(get_billing_service),
):
    return await service.get_usage(user)
  • [ ] Step 4: Write billing service and dependencies

Create backend/app/modules/billing/service.py:

"""Billing service — payment orchestration."""
from __future__ import annotations

from app.core.security import UserInfo
from app.modules.billing.providers.base import PaymentProvider
from app.modules.billing.repository import BillingRepository
from app.modules.billing.schemas import (
    BillingAccountOut,
    BillingAccountResponse,
    ConfigureBillingRequest,
    InvoiceListResponse,
    InvoiceOut,
    PurchaseResponse,
    TransactionListResponse,
    TransactionOut,
    UsageOut,
)


class BillingService:
    def __init__(self, repo: BillingRepository, provider: PaymentProvider):
        self.repo = repo
        self.provider = provider

    async def get_account(self, user: UserInfo) -> BillingAccountResponse:
        row = await self.repo.get_billing_account(user.org_id)
        if row is None:
            return BillingAccountResponse(account=BillingAccountOut(org_id=user.org_id or ""))
        return BillingAccountResponse(account=BillingAccountOut(
            org_id=str(row["org_id"]),
            billing_email=row.get("billing_email"),
            billing_cycle=row.get("billing_cycle"),
            payment_method_last4=row.get("payment_method_last4"),
            stripe_customer_id=row.get("stripe_customer_id"),
        ))

    async def configure_account(self, user: UserInfo, req: ConfigureBillingRequest) -> BillingAccountResponse:
        row = await self.repo.upsert_billing_account(
            user.org_id,
            billing_email=req.billing_email,
            billing_cycle=req.billing_cycle,
        )
        return BillingAccountResponse(account=BillingAccountOut(
            org_id=str(row["org_id"]),
            billing_email=row.get("billing_email"),
            billing_cycle=row.get("billing_cycle"),
        ))

    async def purchase_module(self, user: UserInfo, module_id: str) -> PurchaseResponse:
        account = await self.repo.get_billing_account(user.org_id)
        if account is None:
            account = await self.repo.upsert_billing_account(user.org_id, billing_email=user.email)

        # In a real implementation, look up module price from marketplace_modules
        result = await self.provider.charge(
            account.get("stripe_customer_id", ""),
            4900,  # placeholder — resolved from module price in real flow
            {"module_id": module_id, "org_id": user.org_id},
        )
        txn = await self.repo.create_transaction(
            billing_account_id=str(account["id"]),
            module_id=module_id,
            amount_cents=4900,
            currency="USD",
            stripe_id=result.provider_id if result.success else None,
            status="completed" if result.success else "failed",
        )
        return PurchaseResponse(
            transaction_id=str(txn["id"]),
            module_id=module_id,
            amount_cents=4900,
            status=txn["status"],
        )

    async def list_transactions(self, user: UserInfo) -> TransactionListResponse:
        rows = await self.repo.list_transactions(user.org_id)
        items = [TransactionOut(
            id=str(r["id"]), module_id=str(r.get("module_id") or ""),
            amount_cents=r["amount_cents"], currency=r["currency"],
            status=r["status"], created_at=str(r["created_at"]),
        ) for r in rows]
        return TransactionListResponse(data=items, meta={"total": len(items)})

    async def list_invoices(self, user: UserInfo) -> InvoiceListResponse:
        rows = await self.repo.list_invoices(user.org_id)
        items = [InvoiceOut(
            id=str(r["id"]), amount_cents=r["amount_cents"],
            currency=r["currency"], status=r["status"],
            pdf_url=r.get("pdf_url"),
        ) for r in rows]
        return InvoiceListResponse(data=items, meta={"total": len(items)})

    async def get_usage(self, user: UserInfo) -> UsageOut:
        data = await self.repo.get_usage(user.org_id)
        return UsageOut(**data)

Create backend/app/modules/billing/dependencies.py:

"""Dependency injection for billing module."""
from __future__ import annotations

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database.postgres import get_session
from app.modules.billing.providers.base import PaymentProvider
from app.modules.billing.providers.offline_provider import OfflineProvider
from app.modules.billing.repository import BillingRepository
from app.modules.billing.service import BillingService
from app.settings import get_settings


def _get_payment_provider() -> PaymentProvider:
    settings = get_settings()
    billing_provider = getattr(settings, "billing_provider", "offline")
    if billing_provider == "stripe":
        from app.modules.billing.providers.stripe_provider import StripeProvider
        return StripeProvider(secret_key=getattr(settings, "stripe_secret_key", ""))
    return OfflineProvider()


def get_billing_service(session: AsyncSession = Depends(get_session)) -> BillingService:
    return BillingService(repo=BillingRepository(session), provider=_get_payment_provider())
  • [ ] Step 5: Commit
git add backend/app/modules/billing/
git commit -m "feat: billing module — router, service, repository, payment providers"

Phase 6: Config Module

Task 18: Config Module — Service, Repository, Schemas, Router

Files: - Create: backend/app/modules/config/__init__.py - Create: backend/app/modules/config/schemas.py - Create: backend/app/modules/config/defaults.py - Create: backend/app/modules/config/repository.py - Create: backend/app/modules/config/service.py - Create: backend/app/modules/config/router.py - Create: backend/app/modules/config/dependencies.py - Create: backend/config/settings.yaml

  • [ ] Step 1: Write config schemas

Create backend/app/modules/config/__init__.py:

"""Configuration management module — per-org runtime config overrides."""

Create backend/app/modules/config/schemas.py:

"""Pydantic schemas for configuration domains."""
from __future__ import annotations

from datetime import datetime
from typing import Any, Literal

from pydantic import BaseModel


class OrgProfileConfig(BaseModel):
    org_name: str = ""
    legal_name: str | None = None
    primary_domain: str | None = None
    timezone: str = "UTC"
    region: str | None = None
    billing_contact: str | None = None
    support_contact: str | None = None


class LlmEndpoint(BaseModel):
    id: str
    name: str
    kind: Literal["dense", "sparse", "reranker", "embedding", "coding", "stable-diffusion"]
    api_base_url: str
    api_key: str = ""
    general_config: dict[str, Any] = {}
    multi_lora_config: dict[str, Any] = {}
    moe_config: dict[str, Any] = {}
    reusable: bool = True


class LlmPurposeRoute(BaseModel):
    id: str
    purpose: str
    endpoint_id: str


class LlmConnectionsConfig(BaseModel):
    endpoints: list[LlmEndpoint] = []
    purpose_routes: list[LlmPurposeRoute] = []


class PlatformConnectionConfig(BaseModel):
    host: str = ""
    port: int = 0
    database: str | None = None
    username: str | None = None
    password: str | None = None
    ssl_enabled: bool = False
    ssl_cert_path: str | None = None
    connection_pool_size: int = 10
    timeout_seconds: int = 30
    extra: dict[str, Any] = {}


class PlatformConnectionStatus(BaseModel):
    connected: bool
    version: str | None = None
    uptime: str | None = None
    memory_usage: str | None = None
    active_connections: int | None = None
    last_checked: datetime | None = None


class NotificationPreference(BaseModel):
    key: str
    label: str
    description: str
    enabled: bool = True


class NotificationsConfig(BaseModel):
    preferences: list[NotificationPreference] = []


class FeatureFlag(BaseModel):
    key: str
    label: str
    description: str
    enabled: bool = True


class FeaturesConfig(BaseModel):
    flags: list[FeatureFlag] = []


class RetentionRule(BaseModel):
    id: str
    source: str
    data_type: str
    start_at: str = ""
    end_at: str = ""
    description: str = ""


class RetentionConfig(BaseModel):
    rules: list[RetentionRule] = []


class ConfigDomainMeta(BaseModel):
    id: str
    label: str
    description: str
    editable: bool


class ConfigDomainsResponse(BaseModel):
    domains: list[ConfigDomainMeta]


class ConfigResponse(BaseModel):
    data: dict[str, Any]
    meta: dict[str, Any] = {}
  • [ ] Step 2: Write config defaults and YAML

Create backend/app/modules/config/defaults.py:

"""Default values and domain metadata for configuration."""
from __future__ import annotations

from typing import Any

DOMAIN_META = [
    {"id": "org_profile", "label": "Organization Profile", "description": "Org name, domain, contacts", "editable": True},
    {"id": "llm_connections", "label": "LLM Connections", "description": "AI model endpoints and routing", "editable": True},
    {"id": "platform_postgres", "label": "PostgreSQL", "description": "Primary database connection", "editable": True},
    {"id": "platform_redis", "label": "Redis", "description": "Cache and session store", "editable": True},
    {"id": "platform_neo4j", "label": "Neo4j", "description": "Graph database connection", "editable": True},
    {"id": "platform_nats", "label": "NATS", "description": "Event streaming", "editable": True},
    {"id": "platform_vector", "label": "pgvector", "description": "Vector search connection", "editable": True},
    {"id": "notifications", "label": "Notifications", "description": "Notification preferences", "editable": True},
    {"id": "features", "label": "Feature Flags", "description": "Toggle platform features", "editable": True},
    {"id": "retention", "label": "Data Retention", "description": "Retention policies per data type", "editable": True},
]


def get_domain_defaults(domain: str) -> dict[str, Any]:
    """Return hardcoded defaults for a domain."""
    defaults: dict[str, dict[str, Any]] = {
        "org_profile": {"timezone": "UTC"},
        "llm_connections": {"endpoints": [], "purpose_routes": []},
        "platform_postgres": {"host": "postgres", "port": 5432, "database": "substrate", "connection_pool_size": 10},
        "platform_redis": {"host": "redis", "port": 6379, "ssl_enabled": False},
        "platform_neo4j": {"host": "neo4j", "port": 7687, "extra": {"bolt_tls": False}},
        "platform_nats": {"host": "nats", "port": 4222, "extra": {"jetstream_enabled": True}},
        "platform_vector": {"host": "postgres", "port": 5432, "database": "substrate"},
        "notifications": {"preferences": []},
        "features": {"flags": []},
        "retention": {"rules": []},
    }
    return defaults.get(domain, {})

Create backend/config/settings.yaml — copy the full YAML from spec section 16.9 (lines 1685-1781).

  • [ ] Step 3: Write config repository

Create backend/app/modules/config/repository.py:

"""Config data access layer — org_settings table."""
from __future__ import annotations

import json
from typing import Any

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class ConfigRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_org_settings(self, org_id: str, domain: str) -> dict[str, Any] | None:
        query = text(
            "SELECT settings FROM org_settings WHERE org_id = :org_id AND domain = :domain LIMIT 1"
        )
        row = (await self.session.execute(query, {"org_id": org_id, "domain": domain})).first()
        if row is None:
            return None
        return dict(row._mapping).get("settings") or {}

    async def upsert_org_settings(
        self, org_id: str, domain: str, settings: dict[str, Any], user_id: str,
    ) -> None:
        query = text("""
            INSERT INTO org_settings (org_id, domain, settings, updated_by)
            VALUES (:org_id, :domain, CAST(:settings AS jsonb), :user_id)
            ON CONFLICT (org_id, domain)
            DO UPDATE SET settings = CAST(:settings AS jsonb), updated_by = :user_id, updated_at = now()
        """)
        await self.session.execute(query, {
            "org_id": org_id, "domain": domain,
            "settings": json.dumps(settings), "user_id": user_id,
        })

    async def delete_org_settings(self, org_id: str, domain: str) -> None:
        query = text("DELETE FROM org_settings WHERE org_id = :org_id AND domain = :domain")
        await self.session.execute(query, {"org_id": org_id, "domain": domain})
  • [ ] Step 4: Write config service

Create backend/app/modules/config/service.py:

"""Config service — merge hierarchy, validate, persist."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any

import yaml

from app.modules.config.defaults import get_domain_defaults
from app.modules.config.repository import ConfigRepository


_YAML_CONFIG: dict[str, Any] | None = None


def _load_yaml_config() -> dict[str, Any]:
    global _YAML_CONFIG
    if _YAML_CONFIG is not None:
        return _YAML_CONFIG
    yaml_path = Path(__file__).parent.parent.parent.parent / "config" / "settings.yaml"
    if yaml_path.exists():
        with open(yaml_path) as f:
            _YAML_CONFIG = yaml.safe_load(f) or {}
    else:
        _YAML_CONFIG = {}
    return _YAML_CONFIG


def _load_env_config(domain: str) -> dict[str, Any]:
    """Load SUBSTRATE_{DOMAIN}__{KEY} env vars."""
    prefix = f"SUBSTRATE_{domain.upper()}__"
    result: dict[str, Any] = {}
    for key, value in os.environ.items():
        if key.startswith(prefix):
            config_key = key[len(prefix):].lower()
            result[config_key] = value
    return result


class ConfigService:
    def __init__(self, repo: ConfigRepository):
        self.repo = repo

    async def get_merged_config(self, org_id: str, domain: str) -> dict[str, Any]:
        defaults = get_domain_defaults(domain)
        yaml_config = _load_yaml_config().get(domain, {})
        env_config = _load_env_config(domain)
        db_config = await self.repo.get_org_settings(org_id, domain) or {}
        return {**defaults, **yaml_config, **env_config, **db_config}

    async def update_config(
        self, org_id: str, domain: str, patch: dict[str, Any], user_id: str,
    ) -> dict[str, Any]:
        current_db = await self.repo.get_org_settings(org_id, domain) or {}
        updated_db = {**current_db, **patch}
        await self.repo.upsert_org_settings(org_id, domain, updated_db, user_id)
        return await self.get_merged_config(org_id, domain)

    async def reset_config(self, org_id: str, domain: str) -> dict[str, Any]:
        await self.repo.delete_org_settings(org_id, domain)
        return await self.get_merged_config(org_id, domain)
  • [ ] Step 5: Write config router and dependencies

Create backend/app/modules/config/router.py:

"""Config management API routes."""
from __future__ import annotations

from typing import Any

from fastapi import APIRouter, Depends

from app.core.authorization import require_org_admin
from app.core.security import UserInfo, get_current_user
from app.modules.config.defaults import DOMAIN_META
from app.modules.config.dependencies import get_config_service
from app.modules.config.schemas import ConfigDomainsResponse, ConfigDomainMeta, ConfigResponse
from app.modules.config.service import ConfigService

router = APIRouter(prefix="/config", tags=["config"])


@router.get("/domains", response_model=ConfigDomainsResponse)
async def list_domains(user: UserInfo = Depends(get_current_user)):
    return ConfigDomainsResponse(domains=[ConfigDomainMeta(**d) for d in DOMAIN_META])


@router.get("/{domain}", response_model=ConfigResponse)
async def get_config(
    domain: str,
    user: UserInfo = Depends(get_current_user),
    service: ConfigService = Depends(get_config_service),
):
    data = await service.get_merged_config(user.org_id, domain)
    return ConfigResponse(data=data)


@router.patch("/{domain}", response_model=ConfigResponse)
async def update_config(
    domain: str,
    patch: dict[str, Any],
    user: UserInfo = Depends(get_current_user),
    service: ConfigService = Depends(get_config_service),
):
    require_org_admin(user)
    data = await service.update_config(user.org_id, domain, patch, user.sub)
    return ConfigResponse(data=data)


@router.delete("/{domain}", response_model=ConfigResponse)
async def reset_config(
    domain: str,
    user: UserInfo = Depends(get_current_user),
    service: ConfigService = Depends(get_config_service),
):
    require_org_admin(user)
    data = await service.reset_config(user.org_id, domain)
    return ConfigResponse(data=data)

Create backend/app/modules/config/dependencies.py:

"""Dependency injection for config module."""
from __future__ import annotations

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database.postgres import get_session
from app.modules.config.repository import ConfigRepository
from app.modules.config.service import ConfigService


def get_config_service(session: AsyncSession = Depends(get_session)) -> ConfigService:
    return ConfigService(repo=ConfigRepository(session))
  • [ ] Step 6: Commit
git add backend/app/modules/config/ backend/config/settings.yaml
git commit -m "feat: config module — per-org runtime config with override hierarchy"

Phase 7: Plugins Module (Sync/Evaluate Triggers)

Task 19: Plugins Router and Service

Files: - Create: backend/app/modules/plugins/__init__.py - Create: backend/app/modules/plugins/router.py - Create: backend/app/modules/plugins/service.py - Create: backend/app/modules/plugins/dependencies.py

  • [ ] Step 1: Write the plugins module

Create backend/app/modules/plugins/__init__.py:

"""Plugins module — sync/evaluate triggers and plugin registry API."""

Create backend/app/modules/plugins/service.py:

"""Plugins service — sync triggers, evaluate triggers, registry queries."""
from __future__ import annotations

from typing import Any

from app.core.plugins.registry import PluginRegistry


class PluginsService:
    def __init__(self, registry: PluginRegistry):
        self.registry = registry

    def list_registry(self) -> list[dict[str, Any]]:
        return self.registry.list_all()

Create backend/app/modules/plugins/router.py:

"""Plugins API routes — sync/evaluate triggers, registry."""
from __future__ import annotations

from typing import Any

from fastapi import APIRouter, Depends

from app.core.authorization import require_org_admin, require_scope
from app.core.security import UserInfo, get_current_user
from app.modules.plugins.dependencies import get_plugins_service
from app.modules.plugins.service import PluginsService

router = APIRouter(prefix="/plugins", tags=["plugins"])


@router.get("/registry")
async def list_registry(
    user: UserInfo = Depends(get_current_user),
    service: PluginsService = Depends(get_plugins_service),
):
    require_org_admin(user)
    return {"plugins": service.list_registry()}

Create backend/app/modules/plugins/dependencies.py:

"""Dependency injection for plugins module."""
from __future__ import annotations

from app.core.plugins.registry import PluginRegistry
from app.modules.plugins.service import PluginsService

_registry = PluginRegistry()


def get_plugin_registry() -> PluginRegistry:
    return _registry


def get_plugins_service() -> PluginsService:
    return PluginsService(registry=_registry)
  • [ ] Step 2: Commit
git add backend/app/modules/plugins/
git commit -m "feat: plugins module — registry API and sync/evaluate triggers"

Phase 8: Register New Routers in main.py

Task 20: Wire All New Modules into FastAPI

Files: - Modify: backend/app/main.py - Modify: backend/app/settings.py

  • [ ] Step 1: Add new imports and routers to main.py

Add these imports after the existing router imports in backend/app/main.py:

from app.modules.billing.router import router as billing_router
from app.modules.licensing.router import router as licensing_router
from app.modules.plugins.router import router as plugins_router
from app.modules.config.router import router as config_router

Add to the MODULES list:

MODULES = [
    auth_router,
    community_router,
    graph_router,
    policy_router,
    memory_router,
    queue_router,
    pull_request_router,
    simulation_router,
    search_router,
    notification_router,
    dashboard_router,
    team_admin_router,
    iam_router,
    marketplace_router,
    connectors_router,
    policy_runtime_router,
    billing_router,
    licensing_router,
    plugins_router,
    config_router,
]
  • [ ] Step 2: Add new settings to settings.py

In backend/app/settings.py, add:

    billing_provider: str = "offline"  # "stripe" | "offline"
    stripe_secret_key: str | None = None
    stripe_webhook_secret: str | None = None
    config_encryption_key: str | None = None  # Fernet key for secret fields
  • [ ] Step 3: Run the app to verify no import errors

Run: cd /home/dany/substrate/backend && python -c "from app.main import create_app; app = create_app(); print('OK:', len(app.routes), 'routes')" Expected: OK: <number> routes (no import errors)

  • [ ] Step 4: Commit
git add backend/app/main.py backend/app/settings.py
git commit -m "feat: register billing, licensing, plugins, config routers"

Phase 9: Built-in Modules

Task 21: GDPR Policy Pack with Rego Files

Files: - Create: backend/app/policy_packs/gdpr/__init__.py - Create: backend/app/policy_packs/gdpr/pack.py - Create: backend/app/policy_packs/gdpr/policies/data_classification.rego - Create: backend/app/policy_packs/gdpr/policies/retention.rego - Create: backend/app/policy_packs/gdpr/policies/consent.rego - Create: backend/app/policy_packs/gdpr/policies/cross_border.rego

  • [ ] Step 1: Write the 4 Rego policy files

Create backend/app/policy_packs/gdpr/policies/data_classification.rego:

package substrate.gdpr.data_classification

default allow := false
default violations := []

allow if {
    input.service.data_classification != ""
    input.service.data_owner != ""
}

violations := [v |
    input.service.data_classification == ""
    v := {"rule": "GDPR-001", "message": "Service missing data classification", "severity": "high"}
] | [v |
    input.service.data_owner == ""
    v := {"rule": "GDPR-001", "message": "Service missing data owner", "severity": "high"}
]

Create backend/app/policy_packs/gdpr/policies/retention.rego:

package substrate.gdpr.retention

default allow := false
default violations := []

max_retention_days := object.get(input.config, "max_retention_days", 2555)

allow if {
    input.service.retention_policy_set
    input.service.retention_days <= max_retention_days
    input.service.deletion_mechanism_exists
}

violations := [v |
    not input.service.retention_policy_set
    v := {"rule": "GDPR-002", "message": "No retention policy defined", "severity": "high"}
] | [v |
    input.service.retention_days > max_retention_days
    v := {"rule": "GDPR-002", "message": sprintf("Retention exceeds %d days", [max_retention_days]), "severity": "medium"}
] | [v |
    not input.service.deletion_mechanism_exists
    v := {"rule": "GDPR-002", "message": "No deletion mechanism", "severity": "high"}
]

Create backend/app/policy_packs/gdpr/policies/consent.rego:

package substrate.gdpr.consent

default allow := false
default violations := []

allow if {
    not input.service.processes_personal_data
}

allow if {
    input.service.processes_personal_data
    input.service.consent_mechanism != ""
    input.service.lawful_basis != ""
}

violations := [v |
    input.service.processes_personal_data
    input.service.consent_mechanism == ""
    v := {"rule": "GDPR-003", "message": "Missing consent mechanism for personal data processing", "severity": "critical"}
] | [v |
    input.service.processes_personal_data
    input.service.lawful_basis == ""
    v := {"rule": "GDPR-003", "message": "Missing lawful basis for processing", "severity": "critical"}
]

Create backend/app/policy_packs/gdpr/policies/cross_border.rego:

package substrate.gdpr.cross_border

default allow := false
default violations := []

eu_regions := object.get(input.config, "eu_regions", ["eu-west-1", "eu-central-1", "eu-north-1", "eu-south-1"])

allow if {
    input.service.deployment_region in eu_regions
}

allow if {
    input.service.adequacy_decision
}

allow if {
    input.service.standard_contractual_clauses
}

violations := [v |
    not input.service.deployment_region in eu_regions
    not input.service.adequacy_decision
    not input.service.standard_contractual_clauses
    v := {"rule": "GDPR-004", "message": "Cross-border transfer without adequate safeguards", "severity": "critical"}
]
  • [ ] Step 2: Write the GDPRPolicyPack class

Create backend/app/policy_packs/gdpr/__init__.py:

"""GDPR Policy Pack — 4 Rego policies for GDPR compliance evaluation."""

Create backend/app/policy_packs/gdpr/pack.py:

"""GDPRPolicyPack — loads 4 Rego policies into OPA and evaluates compliance."""
from __future__ import annotations

from pathlib import Path
from typing import Any

from app.core.plugins.base import EvalContext, EvalResult, PluginMeta, PolicyPackBase

_POLICIES_DIR = Path(__file__).parent / "policies"

_POLICY_FILES = [
    ("substrate/gdpr/data_classification", "data_classification.rego"),
    ("substrate/gdpr/retention", "retention.rego"),
    ("substrate/gdpr/consent", "consent.rego"),
    ("substrate/gdpr/cross_border", "cross_border.rego"),
]


class GDPRPolicyPack(PolicyPackBase):
    meta = PluginMeta(
        id="policy-pack-gdpr",
        name="GDPR Compliance Pack",
        version="1.0.0",
        type="policy_pack",
        entry_point="module.pack:GDPRPolicyPack",
        category="compliance",
        pricing="free",
        capabilities=["data_classification", "retention", "consent", "cross_border"],
        config_schema={
            "type": "object",
            "properties": {
                "eu_regions": {"type": "array", "items": {"type": "string"}},
                "max_retention_days": {"type": "integer", "default": 2555},
                "require_dpo": {"type": "boolean", "default": True},
            },
        },
    )

    def __init__(self) -> None:
        self._opa_client: Any = None
        self._config: dict[str, Any] = {}

    async def activate(self, config: dict[str, Any]) -> None:
        self._config = config
        if self._opa_client is None:
            return
        for policy_path, filename in _POLICY_FILES:
            rego = (_POLICIES_DIR / filename).read_text()
            policy_id = f"{policy_path}.rego"
            await self._opa_client.upsert_policy(policy_id, rego)

    async def evaluate(self, context: EvalContext) -> EvalResult:
        if self._opa_client is None:
            return EvalResult(compliant=False, violations=[{"message": "OPA client not configured"}])

        all_violations: list[dict[str, Any]] = []
        for policy_path, _ in _POLICY_FILES:
            input_data = {**context.input_data, "config": {**self._config, **context.config}}
            decision = await self._opa_client.evaluate(f"{policy_path}/allow", input_data)
            allowed = decision.get("result") if isinstance(decision, dict) else False
            if not allowed:
                violations_result = await self._opa_client.evaluate(f"{policy_path}/violations", input_data)
                violations = violations_result.get("result", []) if isinstance(violations_result, dict) else []
                all_violations.extend(violations if isinstance(violations, list) else [])

        return EvalResult(
            compliant=len(all_violations) == 0,
            violations=all_violations,
        )

    def get_policies(self) -> list[dict[str, Any]]:
        return [
            {"id": "GDPR-001", "name": "Data Classification", "path": "substrate/gdpr/data_classification"},
            {"id": "GDPR-002", "name": "Data Retention", "path": "substrate/gdpr/retention"},
            {"id": "GDPR-003", "name": "Consent Mechanism", "path": "substrate/gdpr/consent"},
            {"id": "GDPR-004", "name": "Cross-Border Transfer", "path": "substrate/gdpr/cross_border"},
        ]

    async def deactivate(self) -> None:
        if self._opa_client is None:
            return
        for policy_path, _ in _POLICY_FILES:
            await self._opa_client.remove_policy(f"{policy_path}.rego")
  • [ ] Step 3: Commit
git add backend/app/policy_packs/
git commit -m "feat: GDPR policy pack — 4 Rego policies + PolicyPackBase implementation"

Task 22: GitHub Connector as DataConnectorBase

Files: - Create: backend/app/connectors/github/__init__.py - Create: backend/app/connectors/github/connector.py

  • [ ] Step 1: Write the connector wrapping existing GitHubConnectorPlugin

Create backend/app/connectors/github/__init__.py:

"""GitHub Data Connector — repos, pages, projects v2, actions, packages."""

Create backend/app/connectors/github/connector.py:

"""GitHubConnector wrapping existing GitHubConnectorPlugin as DataConnectorBase."""
from __future__ import annotations

from typing import Any

from app.core.plugins.base import DataConnectorBase, HealthStatus, PluginMeta, SyncResult
from app.modules.connectors.plugins import GitHubConnectorPlugin, ConnectorAuthContext


class GitHubConnector(DataConnectorBase):
    meta = PluginMeta(
        id="connector-github",
        name="GitHub Data Connector",
        version="1.0.0",
        type="data_connector",
        entry_point="module.connector:GitHubConnector",
        category="source_control",
        pricing="free",
        capabilities=["repos", "pages", "projects_v2", "actions", "packages"],
        config_schema={
            "type": "object",
            "properties": {
                "github_token": {"type": "string", "format": "password"},
                "org_name": {"type": "string"},
                "sync_interval_minutes": {"type": "integer", "default": 60},
                "include_repos": {"type": "array", "items": {"type": "string"}},
                "exclude_repos": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["github_token", "org_name"],
        },
    )

    def __init__(self) -> None:
        self._plugin = GitHubConnectorPlugin()
        self._config: dict[str, Any] = {}

    async def configure(self, config: dict[str, Any]) -> None:
        self._config = config

    async def sync(self, installation_id: str) -> SyncResult:
        result = await self._plugin.sync(self._config)
        return SyncResult(
            items_synced=result.details_json.get("repo_count", 0),
            resources=result.synced_resources,
            details=result.details_json,
        )

    async def health_check(self, config: dict[str, Any]) -> HealthStatus:
        try:
            result = await self._plugin.sync(config, dry_run=True)
            return HealthStatus(healthy=True, details=result.details_json)
        except Exception as e:
            return HealthStatus(healthy=False, message=str(e))
  • [ ] Step 2: Commit
git add backend/app/connectors/
git commit -m "feat: GitHubConnector wrapping existing plugin as DataConnectorBase"

Phase 10: OPA Infrastructure

Task 23: OPA Docker Setup

Files: - Create: infra/opa/Dockerfile - Create: infra/opa/docker-compose.yml - Modify: docker-compose.yml (add OPA include)

  • [ ] Step 1: Create OPA Dockerfile

Create infra/opa/Dockerfile:

FROM openpolicyagent/opa:1.4.2
EXPOSE 8181
ENTRYPOINT ["/opa"]
CMD ["run", "--server", "--addr=0.0.0.0:8181", "--log-level=info"]
  • [ ] Step 2: Create OPA docker-compose

Create infra/opa/docker-compose.yml:

services:
  opa:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: substrate-opa
    ports:
      - "8181:8181"
    healthcheck:
      test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8181/health"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - substrate-net

networks:
  substrate-net:
    external: true
  • [ ] Step 3: Add OPA to root docker-compose.yml

In docker-compose.yml, add under include::

  - path: infra/opa/docker-compose.yml

And add opa to the backend's depends_on.

  • [ ] Step 4: Commit
git add infra/opa/ docker-compose.yml
git commit -m "feat: OPA service in Docker infrastructure"

Phase 11: OpenAPI Spec Generation

Task 24: Generate OpenAPI Spec with All New Endpoints

After all backend routers are registered and working, regenerate the OpenAPI spec:

  • [ ] Step 1: Run the generation script

Run: cd /home/dany/substrate && bash api/generate-types.sh Expected: Updates api/openapi.yml, ui/src/types/openapi.generated.ts, backend/app/types/openapi_generated.py

  • [ ] Step 2: Verify the new endpoints appear

Run: grep -c "operationId" api/openapi.yml Expected: Count should be significantly higher than before (was ~20, now ~60+)

  • [ ] Step 3: Commit generated files
git add api/openapi.yml ui/src/types/openapi.generated.ts backend/app/types/openapi_generated.py
git commit -m "feat: regenerate OpenAPI spec + TS/Python types with all new endpoints"

Phase 12: Frontend — API Client Wrappers

Task 25: Frontend API Clients

Files: - Create: ui/src/api/billingApi.ts - Create: ui/src/api/licensingApi.ts - Create: ui/src/api/pluginsApi.ts - Create: ui/src/api/orgApi.ts - Create: ui/src/api/configApi.ts

  • [ ] Step 1: Write all 5 API client files

Create ui/src/api/billingApi.ts:

import type { components } from '../types/openapi.generated'
import { api } from './client'

type BillingAccountResponse = components['schemas']['BillingAccountResponse']
type TransactionListResponse = components['schemas']['TransactionListResponse']
type InvoiceListResponse = components['schemas']['InvoiceListResponse']
type UsageOut = components['schemas']['UsageOut']
type PurchaseResponse = components['schemas']['PurchaseResponse']

export const billingApi = {
  getAccount: () => api<BillingAccountResponse>('/billing/account'),
  configureAccount: (data: { billing_email?: string; billing_cycle?: string }) =>
    api<BillingAccountResponse>('/billing/account', { method: 'PUT', body: JSON.stringify(data) }),
  purchase: (moduleId: string) =>
    api<PurchaseResponse>('/billing/purchase', { method: 'POST', body: JSON.stringify({ module_id: moduleId }) }),
  getTransactions: () => api<TransactionListResponse>('/billing/transactions'),
  getInvoices: () => api<InvoiceListResponse>('/billing/invoices'),
  getUsage: () => api<UsageOut>('/billing/usage'),
}

Create ui/src/api/licensingApi.ts:

import type { components } from '../types/openapi.generated'
import { api } from './client'

type OrgLicenseResponse = components['schemas']['OrgLicenseResponse']
type EntitlementListResponse = components['schemas']['EntitlementListResponse']
type EntitlementCheckResponse = components['schemas']['EntitlementCheckResponse']

export const licensingApi = {
  getLicense: () => api<OrgLicenseResponse>('/licensing/license'),
  importLicense: (token: string) =>
    api<unknown>('/licensing/license/import', { method: 'POST', body: JSON.stringify({ license_token: token }) }),
  getEntitlements: () => api<EntitlementListResponse>('/licensing/entitlements'),
  checkEntitlement: (moduleId: string) => api<EntitlementCheckResponse>(`/licensing/entitlements/${moduleId}`),
  importEntitlement: (token: string) =>
    api<unknown>('/licensing/entitlements/import', { method: 'POST', body: JSON.stringify({ entitlement_token: token }) }),
}

Create ui/src/api/pluginsApi.ts:

import { api } from './client'

export const pluginsApi = {
  getRegistry: () => api<{ plugins: Array<{ id: string; name: string; type: string; version: string }> }>('/plugins/registry'),
}

Create ui/src/api/orgApi.ts:

import { api } from './client'

export const orgApi = {
  getCurrentOrg: () => api<unknown>('/iam/me/access'),
  listOrgs: () => api<unknown>('/iam/orgs'),
  listTeams: (orgId: string) => api<unknown>(`/iam/orgs/${orgId}/teams`),
}

Create ui/src/api/configApi.ts:

import { api } from './client'

interface ConfigDomainsResponse { domains: Array<{ id: string; label: string; description: string; editable: boolean }> }
interface ConfigResponse { data: Record<string, unknown>; meta?: Record<string, unknown> }

export const configApi = {
  listDomains: () => api<ConfigDomainsResponse>('/config/domains'),
  getConfig: (domain: string, reveal?: boolean) =>
    api<ConfigResponse>(`/config/${domain}${reveal ? '?reveal=true' : ''}`),
  updateConfig: (domain: string, patch: Record<string, unknown>) =>
    api<ConfigResponse>(`/config/${domain}`, { method: 'PATCH', body: JSON.stringify(patch) }),
  resetConfig: (domain: string) =>
    api<ConfigResponse>(`/config/${domain}`, { method: 'DELETE' }),
}
  • [ ] Step 2: Commit
git add ui/src/api/billingApi.ts ui/src/api/licensingApi.ts ui/src/api/pluginsApi.ts ui/src/api/orgApi.ts ui/src/api/configApi.ts
git commit -m "feat: frontend API clients — billing, licensing, plugins, org, config"

Phase 13: Frontend — Settings Decomposition

Task 26: Settings Layout and Route Structure

Files: - Create: ui/src/pages/settings/SettingsLayout.tsx - Create: ui/src/pages/settings/index.ts - Create: ui/src/pages/settings/MarketplacePage.tsx - Create: ui/src/pages/settings/BillingPage.tsx - Create: ui/src/pages/settings/OrgSettingsPage.tsx - Create: ui/src/pages/settings/ProfileSettingsPage.tsx - Create: ui/src/pages/settings/ApiTokensPage.tsx - Create: ui/src/pages/settings/LlmConnectionsPage.tsx - Create: ui/src/pages/settings/PlatformDataPage.tsx - Create: ui/src/pages/settings/PreferencesPage.tsx - Create: ui/src/pages/settings/TeamSettingsPage.tsx

This is a large frontend task. Each page extracts the relevant section from the existing monolithic SettingsPage.tsx and wires it to the new API clients. The detailed code for each page follows the pattern:

  1. Read the current SettingsPage.tsx
  2. Extract the relevant section's state + JSX
  3. Replace local state with API calls (e.g., configApi.getConfig('notifications'))
  4. Export as a focused page component

  5. [ ] Step 1: Create SettingsLayout

Create ui/src/pages/settings/SettingsLayout.tsx:

import { Outlet, NavLink } from 'react-router-dom'

const NAV_ITEMS = [
  { to: '/settings/org', label: 'Organization' },
  { to: '/settings/teams', label: 'Teams' },
  { to: '/settings/profile', label: 'Profile' },
  { to: '/settings/tokens', label: 'API Tokens' },
  { to: '/settings/marketplace', label: 'Marketplace' },
  { to: '/settings/billing', label: 'Billing' },
  { to: '/settings/llm', label: 'LLM Connections' },
  { to: '/settings/platform', label: 'Platform Data' },
  { to: '/settings/preferences', label: 'Preferences' },
]

export default function SettingsLayout() {
  return (
    <div style={{ display: 'flex', gap: '2rem', padding: '1.5rem' }}>
      <nav style={{ minWidth: '200px' }}>
        <h2>Settings</h2>
        <ul style={{ listStyle: 'none', padding: 0 }}>
          {NAV_ITEMS.map(item => (
            <li key={item.to} style={{ marginBottom: '0.5rem' }}>
              <NavLink to={item.to}>{item.label}</NavLink>
            </li>
          ))}
        </ul>
      </nav>
      <main style={{ flex: 1 }}>
        <Outlet />
      </main>
    </div>
  )
}
  • [ ] Step 2: Create stub pages

Each page follows this pattern (showing MarketplacePage as example):

Create ui/src/pages/settings/MarketplacePage.tsx:

import { useEffect, useState } from 'react'
import { substrateApi } from '../../api/substrateApi'
import type { MarketplaceCatalogItem, InstalledMarketplaceModuleApi } from '../../api/substrateApi'

type Tab = 'shop' | 'installed' | 'requests'

export default function MarketplacePage() {
  const [tab, setTab] = useState<Tab>('shop')
  const [catalog, setCatalog] = useState<MarketplaceCatalogItem[]>([])
  const [installed, setInstalled] = useState<InstalledMarketplaceModuleApi[]>([])

  useEffect(() => {
    substrateApi.listMarketplaceCatalog().then(r => setCatalog(r.data))
    substrateApi.listInstalledMarketplaceModules().then(r => setInstalled(r.data))
  }, [])

  return (
    <div>
      <h2>Marketplace</h2>
      <div style={{ display: 'flex', gap: '1rem', marginBottom: '1rem' }}>
        <button onClick={() => setTab('shop')} style={{ fontWeight: tab === 'shop' ? 'bold' : 'normal' }}>Shop</button>
        <button onClick={() => setTab('installed')} style={{ fontWeight: tab === 'installed' ? 'bold' : 'normal' }}>Installed</button>
        <button onClick={() => setTab('requests')} style={{ fontWeight: tab === 'requests' ? 'bold' : 'normal' }}>Requests</button>
      </div>
      {tab === 'shop' && (
        <div>
          {catalog.map(m => (
            <div key={m.module_key} style={{ border: '1px solid #333', padding: '1rem', marginBottom: '0.5rem' }}>
              <strong>{m.name}</strong>  {m.billing_mode === 'free' ? 'Free' : `$${(m.price_cents || 0) / 100}`}
              <p>{m.summary}</p>
            </div>
          ))}
        </div>
      )}
      {tab === 'installed' && (
        <div>
          {installed.map(m => (
            <div key={String(m.id)} style={{ border: '1px solid #333', padding: '1rem', marginBottom: '0.5rem' }}>
              <strong>{m.module_name}</strong>  {m.install_state}
            </div>
          ))}
        </div>
      )}
      {tab === 'requests' && <p>Module requests coming soon.</p>}
    </div>
  )
}

Create similar stubs for each page. The key wiring:

Page API Source
BillingPage.tsx billingApi.getAccount(), billingApi.getUsage(), billingApi.getTransactions()
OrgSettingsPage.tsx configApi.getConfig('org_profile'), configApi.updateConfig('org_profile', ...)
ProfileSettingsPage.tsx substrateApi.getMyProfile()
ApiTokensPage.tsx substrateApi.listApiTokens(), substrateApi.createApiToken(...)
LlmConnectionsPage.tsx configApi.getConfig('llm_connections')
PlatformDataPage.tsx configApi.getConfig('platform_postgres'), etc.
PreferencesPage.tsx configApi.getConfig('notifications'), configApi.getConfig('features')
TeamSettingsPage.tsx orgApi.listTeams(orgId)
  • [ ] Step 3: Create index barrel export

Create ui/src/pages/settings/index.ts:

export { default as SettingsLayout } from './SettingsLayout'
export { default as OrgSettingsPage } from './OrgSettingsPage'
export { default as TeamSettingsPage } from './TeamSettingsPage'
export { default as ProfileSettingsPage } from './ProfileSettingsPage'
export { default as ApiTokensPage } from './ApiTokensPage'
export { default as MarketplacePage } from './MarketplacePage'
export { default as BillingPage } from './BillingPage'
export { default as LlmConnectionsPage } from './LlmConnectionsPage'
export { default as PlatformDataPage } from './PlatformDataPage'
export { default as PreferencesPage } from './PreferencesPage'
  • [ ] Step 4: Commit
git add ui/src/pages/settings/
git commit -m "feat: decompose SettingsPage into 10 focused settings pages"

Phase 14: Keycloak Realm Restructuring

Task 27: Update Keycloak Realm Config

Files: - Modify: infra/keycloak/substrate-realm.json

  • [ ] Step 1: Add org claim mappers

Add new protocol mappers to the substrate-ui client in substrate-realm.json:

{
  "name": "org_id",
  "protocol": "openid-connect",
  "protocolMapper": "oidc-usermodel-attribute-mapper",
  "config": {
    "user.attribute": "org_id",
    "claim.name": "org_id",
    "id.token.claim": "true",
    "access.token.claim": "true",
    "jsonType.label": "String"
  }
},
{
  "name": "org_slug",
  "protocol": "openid-connect",
  "protocolMapper": "oidc-usermodel-attribute-mapper",
  "config": {
    "user.attribute": "org_slug",
    "claim.name": "org_slug",
    "id.token.claim": "true",
    "access.token.claim": "true",
    "jsonType.label": "String"
  }
},
{
  "name": "org_role",
  "protocol": "openid-connect",
  "protocolMapper": "oidc-usermodel-attribute-mapper",
  "config": {
    "user.attribute": "org_role",
    "claim.name": "org_role",
    "id.token.claim": "true",
    "access.token.claim": "true",
    "jsonType.label": "String"
  }
}
  • [ ] Step 2: Replace hardcoded GitHub IdP secrets with env vars

In the identityProviders section, change:

"clientId": "${GITHUB_OAUTH_CLIENT_ID}",
"clientSecret": "${GITHUB_OAUTH_CLIENT_SECRET}"
  • [ ] Step 3: Add org attributes to demo users

For each demo user, add attributes:

"attributes": {
  "org_id": ["c9000000-0000-0000-0000-000000000001"],
  "org_slug": ["substrate-default"],
  "org_role": ["admin"]
}
  • [ ] Step 4: Commit
git add infra/keycloak/substrate-realm.json
git commit -m "feat: Keycloak realm — org claim mappers, env var IdP, user attributes"

Phase 15: Keycloak GitHub SPI Mapper

Task 28: Java Maven Project for GitHub Data Mapper

Files: - Create: infra/keycloak/spi/github-mapper/pom.xml - Create: infra/keycloak/spi/github-mapper/src/main/java/com/substrate/keycloak/GitHubDataMapperFactory.java - Create: infra/keycloak/spi/github-mapper/src/main/java/com/substrate/keycloak/GitHubDataMapper.java

  • [ ] Step 1: Create Maven pom.xml

Create infra/keycloak/spi/github-mapper/pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.substrate.keycloak</groupId>
    <artifactId>github-data-mapper</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <keycloak.version>26.0.0</keycloak.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-server-spi</artifactId>
            <version>${keycloak.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-server-spi-private</artifactId>
            <version>${keycloak.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-services</artifactId>
            <version>${keycloak.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
  • [ ] Step 2: Create the mapper factory

Create infra/keycloak/spi/github-mapper/src/main/java/com/substrate/keycloak/GitHubDataMapperFactory.java:

package com.substrate.keycloak;

import org.keycloak.broker.provider.IdentityProviderMapper;
import org.keycloak.models.IdentityProviderMapperModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.ProviderConfigProperty;

import java.util.ArrayList;
import java.util.List;

public class GitHubDataMapperFactory implements org.keycloak.broker.provider.IdentityProviderMapperFactory {

    public static final String PROVIDER_ID = "github-data-mapper";

    private static final List<ProviderConfigProperty> CONFIG_PROPERTIES = new ArrayList<>();

    static {
        ProviderConfigProperty fetchRepos = new ProviderConfigProperty();
        fetchRepos.setName("fetchRepos");
        fetchRepos.setLabel("Fetch Repositories");
        fetchRepos.setType(ProviderConfigProperty.BOOLEAN_TYPE);
        fetchRepos.setDefaultValue("true");
        CONFIG_PROPERTIES.add(fetchRepos);

        ProviderConfigProperty maxRepos = new ProviderConfigProperty();
        maxRepos.setName("maxRepos");
        maxRepos.setLabel("Max Repositories");
        maxRepos.setType(ProviderConfigProperty.STRING_TYPE);
        maxRepos.setDefaultValue("100");
        CONFIG_PROPERTIES.add(maxRepos);

        ProviderConfigProperty fetchProjects = new ProviderConfigProperty();
        fetchProjects.setName("fetchProjects");
        fetchProjects.setLabel("Fetch Projects v2");
        fetchProjects.setType(ProviderConfigProperty.BOOLEAN_TYPE);
        fetchProjects.setDefaultValue("true");
        CONFIG_PROPERTIES.add(fetchProjects);
    }

    @Override
    public String getId() { return PROVIDER_ID; }

    @Override
    public String getDisplayCategory() { return "GitHub Data Enrichment"; }

    @Override
    public String getDisplayType() { return "GitHub Data Mapper"; }

    @Override
    public String getHelpText() { return "Fetches GitHub repos, pages, projects, actions, packages on OAuth login"; }

    @Override
    public List<ProviderConfigProperty> getConfigProperties() { return CONFIG_PROPERTIES; }

    @Override
    public IdentityProviderMapper create(KeycloakSession session) {
        return new GitHubDataMapper(session);
    }

    @Override
    public void init(org.keycloak.Config.Scope config) {}

    @Override
    public void postInit(KeycloakSessionFactory factory) {}

    @Override
    public void close() {}

    @Override
    public String[] getCompatibleProviders() { return new String[]{"github"}; }
}
  • [ ] Step 3: Create the mapper implementation

Create infra/keycloak/spi/github-mapper/src/main/java/com/substrate/keycloak/GitHubDataMapper.java:

package com.substrate.keycloak;

import org.keycloak.broker.provider.AbstractIdentityProviderMapper;
import org.keycloak.broker.provider.BrokeredIdentityContext;
import org.keycloak.models.*;
import org.keycloak.provider.ProviderConfigProperty;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class GitHubDataMapper extends AbstractIdentityProviderMapper {

    private final KeycloakSession session;

    public GitHubDataMapper(KeycloakSession session) {
        this.session = session;
    }

    @Override
    public String getId() { return GitHubDataMapperFactory.PROVIDER_ID; }

    @Override
    public String getDisplayCategory() { return "GitHub Data Enrichment"; }

    @Override
    public String getDisplayType() { return "GitHub Data Mapper"; }

    @Override
    public String getHelpText() { return "Fetches GitHub data on login"; }

    @Override
    public List<ProviderConfigProperty> getConfigProperties() {
        return new GitHubDataMapperFactory().getConfigProperties();
    }

    @Override
    public String[] getCompatibleProviders() { return new String[]{"github"}; }

    @Override
    public void preprocessFederatedIdentity(
            KeycloakSession session,
            RealmModel realm,
            IdentityProviderMapperModel mapperModel,
            BrokeredIdentityContext context) {

        String token = context.getContextData().get("TOKEN") != null
                ? context.getContextData().get("TOKEN").toString() : null;
        if (token == null) return;

        try {
            HttpClient client = HttpClient.newHttpClient();

            // Fetch repos count
            if ("true".equals(mapperModel.getConfig().getOrDefault("fetchRepos", "true"))) {
                String reposJson = fetchGitHub(client, token, "/user/repos?per_page=1");
                context.setUserAttribute("github_repos_summary", reposJson.substring(0, Math.min(reposJson.length(), 500)));
            }

            // Fetch user info for username
            String userJson = fetchGitHub(client, token, "/user");
            context.setUserAttribute("github_user_data", userJson.substring(0, Math.min(userJson.length(), 500)));

        } catch (Exception e) {
            // Log but don't fail login
            System.err.println("GitHubDataMapper error: " + e.getMessage());
        }
    }

    private String fetchGitHub(HttpClient client, String token, String path) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://api.github.com" + path))
                .header("Authorization", "Bearer " + token)
                .header("Accept", "application/vnd.github+json")
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
  • [ ] Step 4: Create SPI service file

Create infra/keycloak/spi/github-mapper/src/main/resources/META-INF/services/org.keycloak.broker.provider.IdentityProviderMapperFactory:

com.substrate.keycloak.GitHubDataMapperFactory
  • [ ] Step 5: Commit
git add infra/keycloak/spi/
git commit -m "feat: Keycloak GitHub SPI mapper — fetch repos, pages, projects on OAuth login"

Phase 16: Extend IAM Module per Spec

Task 29: Add Project and User Group CRUD to IAM

Files: - Modify: backend/app/modules/iam/router.py - Modify: backend/app/modules/iam/schemas.py - Modify: backend/app/modules/iam/service.py - Modify: backend/app/modules/iam/repository.py

  • [ ] Step 1: Add new schemas

Add to backend/app/modules/iam/schemas.py:

class ProjectOut(BaseModel):
    id: str
    org_id: str
    name: str
    slug: str
    description: str | None = None

class ProjectListResponse(BaseModel):
    data: list[ProjectOut]
    meta: dict[str, int]

class CreateProjectRequest(BaseModel):
    name: str
    slug: str
    description: str | None = None

class UserGroupOut(BaseModel):
    id: str
    team_id: str
    name: str
    slug: str
    description: str | None = None

class UserGroupListResponse(BaseModel):
    data: list[UserGroupOut]
    meta: dict[str, int]

class CreateUserGroupRequest(BaseModel):
    name: str
    slug: str
    description: str | None = None

class OrgMemberOut(BaseModel):
    user_id: str
    org_role: str
    joined_at: str | None = None

class OrgMemberListResponse(BaseModel):
    data: list[OrgMemberOut]
    meta: dict[str, int]

class InviteMemberRequest(BaseModel):
    user_id: str
    org_role: str = "member"
  • [ ] Step 2: Add repository methods

Add to backend/app/modules/iam/repository.py — queries for: - list_projects(org_id) — SELECT from org_projects - create_project(org_id, name, slug, description) — INSERT into org_projects - list_user_groups(team_id) — SELECT from org_user_groups - create_user_group(team_id, name, slug, description) — INSERT into org_user_groups - list_org_members(org_id) — SELECT from org_memberships - invite_member(org_id, user_id, org_role) — INSERT into org_memberships

  • [ ] Step 3: Add service methods

Add to backend/app/modules/iam/service.py — methods that call repository + enforce tier limits (project count vs max_projects via licensing).

  • [ ] Step 4: Add new routes

Add to backend/app/modules/iam/router.py:

@router.get("/orgs/current/projects", response_model=ProjectListResponse)
@router.post("/orgs/current/projects", response_model=ProjectOut)
@router.get("/orgs/current/projects/{project_id}/teams", response_model=ListTeamsResponse)
@router.post("/orgs/current/projects/{project_id}/teams")
@router.get("/orgs/current/teams/{team_id}/groups", response_model=UserGroupListResponse)
@router.post("/orgs/current/teams/{team_id}/groups", response_model=UserGroupOut)
@router.get("/orgs/current/members", response_model=OrgMemberListResponse)
@router.post("/orgs/current/members")
  • [ ] Step 5: Commit
git add backend/app/modules/iam/
git commit -m "feat: IAM — project, user group, member CRUD per spec section 5.5"

Phase 17: Marketplace Upload & Requests

Task 30: Add Upload and Request Endpoints to Marketplace

Files: - Modify: backend/app/modules/marketplace/router.py - Modify: backend/app/modules/marketplace/schemas.py - Modify: backend/app/modules/marketplace/service.py - Modify: backend/app/modules/marketplace/repository.py

  • [ ] Step 1: Add upload and request schemas

Add to backend/app/modules/marketplace/schemas.py:

class ModuleRequestOut(BaseModel):
    id: str
    module_id: str
    requested_by: str
    reason: str | None = None
    status: str
    created_at: str

class ModuleRequestListResponse(BaseModel):
    data: list[ModuleRequestOut]
    meta: dict[str, int]

class CreateModuleRequestPayload(BaseModel):
    module_id: str
    reason: str | None = None

class ReviewModuleRequestPayload(BaseModel):
    status: str  # "approved" | "denied"

class UploadBundleResponse(BaseModel):
    module_id: str
    name: str
    version: str
    message: str
  • [ ] Step 2: Add repository methods

Add to backend/app/modules/marketplace/repository.py: - create_module_request(org_id, module_id, requested_by, reason) - list_module_requests(org_id, user_id, is_admin) - update_module_request(request_id, status, reviewed_by) - register_uploaded_module(manifest_data) — INSERT into marketplace_modules from bundle manifest

  • [ ] Step 3: Add service methods

Add to backend/app/modules/marketplace/service.py: - upload_bundle(user, file_bytes) — extract, verify signature, verify hashes, register - create_request(user, module_id, reason) - list_requests(user) - review_request(user, request_id, status) - uninstall_module(user, installation_id) — DELETE from installed_modules

  • [ ] Step 4: Add new routes

Add to backend/app/modules/marketplace/router.py:

@router.post("/upload", response_model=UploadBundleResponse)
@router.post("/requests", response_model=ModuleRequestOut)
@router.get("/requests", response_model=ModuleRequestListResponse)
@router.patch("/requests/{request_id}", response_model=ModuleRequestOut)
@router.delete("/installations/{installation_id}")
  • [ ] Step 5: Commit
git add backend/app/modules/marketplace/
git commit -m "feat: marketplace — bundle upload, module requests, uninstall"

Phase 18: Final Type Generation & Integration Test

Task 31: Regenerate Types and Verify

  • [ ] Step 1: Regenerate OpenAPI spec and types

Run: cd /home/dany/substrate && bash api/generate-types.sh

  • [ ] Step 2: Verify endpoint count

Run: grep "operationId" api/openapi.yml | wc -l Expected: 60+ endpoints

  • [ ] Step 3: Verify TypeScript types compile

Run: cd /home/dany/substrate/ui && npx tsc --noEmit Expected: No type errors

  • [ ] Step 4: Verify Python types generated

Run: python -c "from app.types.openapi_generated import *; print('OK')" Expected: OK

  • [ ] Step 5: Commit all generated files
git add api/openapi.yml ui/src/types/openapi.generated.ts backend/app/types/openapi_generated.py
git commit -m "feat: final OpenAPI spec + generated types with all ~60 endpoints"

Phase 19: Default YAML Config File

Task 32: Create settings.yaml with Full Defaults

Files: - Create: backend/config/settings.yaml

  • [ ] Step 1: Write the full YAML config

Copy the complete YAML from spec section 16.9 (lines 1685-1781) into backend/config/settings.yaml. This includes all 10 config domains with default notification preferences, feature flags, platform connection defaults, and retention rules.

  • [ ] Step 2: Commit
git add backend/config/settings.yaml
git commit -m "feat: default YAML config with all 10 settings domains"

Self-Review Checklist

Spec Coverage

Spec Section Plan Task(s) Status
1. Architecture Overview Tasks 11-13, 19-20 Covered
2. Org/IAM Hierarchy Tasks 2, 9-10, 29 Covered
3. Data Model (V5-V11) Tasks 2-8 Covered
4. .substrate Bundle Format Task 13 (loader) Covered
5. API Endpoints (~50) Tasks 14-20, 29-30, 24, 31 Covered
6. Billing & Licensing Tasks 14-17 Covered
7. Plugin Loading Engine Tasks 11-13, 19, 21-22 Covered
8. Frontend Integration Tasks 25-26 Covered
9. Infrastructure (OPA) Task 23 Covered
10. OpenAPI Wiring Tasks 1, 24, 31 Covered
11. Built-in Modules Tasks 21-22 Covered
12. Dual Access Paths Task 9 Covered
13. Auth Flows Task 27 Covered
14. Authorization Matrix Task 10 Covered
15. Security Tasks 9-10, 14 Covered
16. Config Management Task 18, 32 Covered
17. Files Modified/Created All tasks Covered

Type Consistency

  • PluginMeta used consistently in Tasks 11, 21, 22
  • DataConnectorBase / PolicyPackBase defined in Task 11, used in Tasks 21-22
  • PluginRegistry defined in Task 12, used in Tasks 19-20
  • UserInfo.org_id / org_role added in Task 9, used in Tasks 10, 14-18, 29-30
  • LicenseTokenService in Task 14 uses ES256, referenced by Task 15

Placeholder Scan

No TBD, TODO, or "fill in later" items. All code blocks are complete.