Ingestion Service¶
The Ingestion Service is the sensor array of the Substrate platform, responsible for building and continuously maintaining the Unified Multimodal Knowledge Base (UMKB). It is the construction pipeline for the platform's single source of truth: every node, edge, and embedding in the Observed Graph originates here.
Responsibility¶
Transform heterogeneous raw data — source code, infrastructure configuration, project planning data, runtime host state, documentation, and institutional memory — into a unified, queryable graph of typed nodes, directional edges, and vector embeddings. The Ingestion Service ensures that the Observed Graph reflects ground truth in near real-time, providing the factual substrate against which all governance, reasoning, and simulation operate.
Architecture Overview¶
The Ingestion Service is event-driven. Webhooks and scheduled pollers feed raw events into NATS JetStream topic streams. Celery workers consume from those streams, execute connector logic, and write results to:
- Neo4j 5.x — graph nodes and edges (Observed Graph)
- PostgreSQL 16 + pgvector — vector embeddings stored in an HNSW index, ingestion audit logs, deduplication metadata
- Redis 7 — distributed locks (SET NX EX) per ingestion event, result cache keyed on SHA-256 content hash
Deduplication is enforced at two levels: (1) a Redis result cache keyed on SHA-256 content hash prevents redundant processing of identical payloads, and (2) a distributed Redis lock per ingestion event prevents parallel workers from processing the same event simultaneously.
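The two-level deduplication scheme can be sketched as follows. This is a minimal illustration, not the production worker: `FakeRedis` is an in-memory stand-in for the real Redis client, and the key names (`ingest:result:`, `ingest:lock:`) are illustrative.

```python
import hashlib

class FakeRedis:
    """Minimal in-memory stand-in for the two Redis commands used here (illustrative only)."""
    def __init__(self):
        self.store = {}
    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None          # SET NX fails when the key already exists
        self.store[key] = value
        return True
    def exists(self, key):
        return key in self.store

def handle_event(redis, payload: bytes) -> str:
    digest = hashlib.sha256(payload).hexdigest()
    # Level 1: result cache -- identical payloads are acknowledged and skipped.
    if redis.exists(f"ingest:result:{digest}"):
        return "skipped-duplicate"
    # Level 2: distributed lock (SET NX EX) -- only one worker processes the event.
    if not redis.set(f"ingest:lock:{digest}", "1", nx=True, ex=300):
        return "skipped-locked"
    # ... connector logic would run here ...
    redis.set(f"ingest:result:{digest}", "done")
    return "processed"
```

The real client call (`redis.set(key, value, nx=True, ex=300)`) has the same shape in redis-py, so the stub can be swapped out directly.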
Data Connectors¶
Connector Priority Table¶
| Connector | Data Ingested | MVP Priority | Trigger / Frequency |
|---|---|---|---|
| GitHub (Repos + Code) | AST-parsed code graph, PR events, commits, CODEOWNERS, releases, ADRs in markdown | Must Have | Webhook: PR open/update/merge, push |
| GitHub Projects v2 | Sprint items, custom fields, iteration data, status updates | Must Have | Webhook: projects_v2_item; GraphQL poll hourly |
| GitHub Pages | Published docs sites, staleness delta vs code commits | Must Have | Poll: 6-hour |
| SSH Runtime Connector | Running processes, port bindings, packages, config checksums | Must Have | Scheduled: 15 min per host; on-demand |
| Terraform | InfraResource nodes, HOSTS/CONNECTS_TO edges, state diffs | Must Have | Webhook: post-apply or file watcher |
| Kubernetes | Deployments, Services, Ingress, ConfigMaps — live watch | Must Have | K8s Watch API: continuous |
| Jira | Tickets, epics, sprints, component links | Nice to Have (MVP) | Webhook: issue create/update |
| Confluence | Pages, ADRs, post-mortems, runbooks | Nice to Have (MVP) | Webhook: page save |
| Slack | Decision threads via keyword trigger | Nice to Have (v1.1) | Event API: message.channels |
| Datadog | Runtime alert signals, anomaly events, SLO state | Nice to Have (v1.1) | Webhook: monitor alerts |
Specialized Connector Details¶
GitHub (Repos + Code)¶
The GitHub connector registers as either an OAuth App or a GitHub App against the target organization and installs webhooks for pull_request, push, and check_run events (ING-01).
On each push or PR event, the connector clones or shallow-fetches the repository, then invokes the Tree-sitter + stack-graphs Rust CLI to parse every changed source file into an Abstract Syntax Tree (ING-02). The stack-graphs engine performs compiler-level name resolution, producing Module, Function, Class, and Import nodes tagged with file path and line-range metadata.
Edges extracted include (ING-03):
- `CALLS` — direct function invocation between two Function nodes
- `DEPENDS_ON` — module-level import or package dependency
- `IMPORTS` — file-level import relationship
- `DEFINES` — ownership of a Function or Class by a Module
On PR open events, the connector computes an incremental graph delta (ING-04): only files present in the PR diff are re-parsed. The resulting subgraph delta — new nodes, removed nodes, changed edges — is published to the substrate.ingestion.code.delta NATS subject for downstream consumption by the Governance and Reasoning services.
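The delta computation itself reduces to a set difference over the re-parsed subgraph. A minimal sketch, assuming nodes and edges are hashable identifiers (the NATS publish step is omitted):

```python
def compute_delta(prev_nodes: set, new_nodes: set,
                  prev_edges: set, new_edges: set) -> dict:
    """Diff the re-parsed subgraph for the PR's changed files against the prior subgraph."""
    return {
        "nodes_added":   sorted(new_nodes - prev_nodes),
        "nodes_removed": sorted(prev_nodes - new_nodes),
        "edges_added":   sorted(new_edges - prev_edges),
        "edges_removed": sorted(prev_edges - new_edges),
    }
```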
ADRs stored as markdown files in the repository are ingested as DecisionNode entries with WHY edges linking them to the relevant service or module nodes (ING-11).
GitHub Projects v2¶
GitHub Classic Projects reached end-of-life in April 2025. All sprint and project planning integration uses the GraphQL API exclusively.
The connector queries the ProjectV2 object to extract:
- `SprintNode` entities from iteration fields
- `ProjectItem` nodes representing individual work items
- Custom fields: text, number, date, single-select, and iteration types
- Status update values: `ON_TRACK`, `AT_RISK`, `OFF_TRACK`, `COMPLETE` — mapped to `SprintNode.health` graph properties (ING-05)
Rate limits apply: 5,000 points/hour for Personal Access Tokens, 10,000 points/hour for GitHub Apps on Enterprise Cloud. Pagination is cursor-based with a maximum of 100 items per page.
In addition to webhook delivery for projects_v2_item events, a GraphQL hourly poll is scheduled as a consistency backstop to catch any missed webhook deliveries.
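The hourly poll drains the cursor-paginated GraphQL connection page by page. A generic sketch of that loop, assuming `fetch_page(cursor)` wraps the actual GraphQL request and returns `(items, end_cursor, has_next_page)` mirroring the `pageInfo { endCursor, hasNextPage }` shape:

```python
def paginate(fetch_page, page_size=100):
    """Drain a cursor-paginated GraphQL connection (max 100 items per page)."""
    items, cursor = [], None
    while True:
        batch, cursor, has_next = fetch_page(cursor)
        items.extend(batch)
        if not has_next:
            return items
```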
GitHub Pages¶
The GitHub Pages connector calls the Pages REST API to retrieve the last build timestamp for each configured Pages site. It then compares that timestamp against the last code commit in the source branch to compute a documentation staleness signal (ING-06). This signal is attached to the corresponding documentation node as a staleness_delta_days property and surfaced by the Proactive Maintenance Service.
Polling frequency: every 6 hours.
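The staleness signal itself is a simple timestamp comparison. A sketch, assuming both timestamps arrive as ISO-8601 strings (the function name is illustrative):

```python
from datetime import datetime

def staleness_delta_days(last_build: str, last_commit: str) -> int:
    """Days the published Pages site lags the source branch (0 when the build is newer)."""
    build = datetime.fromisoformat(last_build)
    commit = datetime.fromisoformat(last_commit)
    return max(0, (commit - build).days)
```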
SSH Runtime Connector (Agentless)¶
No existing IDP platform implements agentless SSH-based runtime verification at this level of fidelity. The SSH Runtime Connector closes the gap between what the graph believes is deployed and what is actually running on production hosts.
Architecture: A single JSON-outputting shell script is deployed to each target host via SSH. No persistent daemon, no agent installation. Round-trips are minimised by collecting all inspection data in a single script invocation and returning a structured JSON document (ING-07).
Security model:
- HashiCorp Vault SSH CA issues ephemeral certificates with a 5-minute TTL
- Connections route through a ProxyJump bastion; SSH agent forwarding is never used
- `ForceCommand` on the target restricts the session to the single inspection script
- SSH multiplexing via `ControlMaster=auto` and `ControlPersist=600s` reduces execution time by more than 50% across multi-host batches
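Put together, one inspection run amounts to a single `ssh` invocation with these options. A sketch of the argv assembly; the option names are standard OpenSSH, but the hostnames and paths are purely illustrative:

```python
def build_ssh_command(host: str, bastion: str, cert_path: str, script: str) -> list:
    """Assemble the argv for one agentless inspection run."""
    return [
        "ssh",
        "-o", f"ProxyJump={bastion}",          # route via the bastion; no agent forwarding
        "-o", "ControlMaster=auto",            # multiplex connections across a batch
        "-o", "ControlPersist=600s",
        "-o", f"CertificateFile={cert_path}",  # Vault-signed cert, 5-minute TTL
        host,
        script,                                # ForceCommand on the host pins this anyway
    ]
```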
Verification checks performed per host:
| Check | Command | Graph Comparison Target |
|---|---|---|
| Running services | `systemctl list-units --type=service --state=running --output=json` | Service nodes in Observed Graph |
| Port bindings | `ss -tlnp --json` | Declared port mappings on InfraResource nodes |
| Package drift | `dpkg -l \| awk \| sha256sum` | Dependency nodes + lockfile hash |
| Config integrity | `sha256sum` of critical config files | Baselines stored at last deploy |
| Container state | `docker inspect` | K8s/Docker service declarations |
| Env var presence | Hash only (no values exported) | Required env vars declared in service manifest |
The output JSON is parsed and diffed against the current Observed Graph. Any divergence is published as a RuntimeDriftEvent to NATS for the Governance Service to evaluate against the substrate/runtime-drift policy threshold (ING-07, GOV-12).
Execution schedule: every 15 minutes per host, plus on-demand trigger via API or workflow.
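The diff step can be sketched as a per-check set comparison. The check keys and event fields below are illustrative; the real Observed Graph comparison runs against Neo4j rather than an in-memory dict:

```python
def detect_drift(host: str, observed: dict, expected: dict) -> list:
    """Compare one host's inspection JSON against graph expectations; emit drift events."""
    events = []
    for check, expected_vals in expected.items():
        got = set(observed.get(check, []))
        want = set(expected_vals)
        if got != want:
            events.append({
                "type": "RuntimeDriftEvent",
                "host": host,
                "check": check,
                "missing": sorted(want - got),       # declared but not running
                "unexpected": sorted(got - want),    # running but not declared
            })
    return events
```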
Terraform¶
The Terraform connector ingests state files either via a post-apply webhook from Terraform Cloud/Enterprise or by watching a configured state file path (ING-08).
From each state file, the connector extracts:
- `InfraResource` nodes for every managed resource (EC2 instances, RDS clusters, S3 buckets, load balancers, etc.)
- `HOSTS` edges connecting infrastructure resources to the services they run
- `CONNECTS_TO` edges representing declared network connectivity between resources
On each ingestion, the new state is diffed against the prior state stored in Neo4j. Drift — resources added, removed, or with changed properties outside a known deployment window — is flagged as a TerraformDriftEvent.
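A minimal sketch of that state diff, assuming each state has been reduced to a map of resource address to attributes (the real comparison runs against the prior state in Neo4j, and the deployment-window filter is omitted):

```python
def diff_state(prev: dict, new: dict) -> dict:
    """Diff two {address: attributes} maps extracted from Terraform state files."""
    added = sorted(new.keys() - prev.keys())
    removed = sorted(prev.keys() - new.keys())
    changed = sorted(a for a in prev.keys() & new.keys() if prev[a] != new[a])
    return {"added": added, "removed": removed, "changed": changed}
```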
Kubernetes¶
The Kubernetes connector establishes a Kubernetes Watch API connection (using the watch=true query parameter on the relevant resource endpoints) to receive real-time change notifications for (ING-09):
- `Deployment` objects → `ServiceNode` with replica count, image tag, resource limits
- `Service` objects → `InfraResource` with port mappings and selector labels
- `Ingress` objects → routing rules as `ROUTES_TO` edges
- `ConfigMap` objects → configuration nodes with `CONFIGURES` edges to dependent services
The Watch connection is long-lived and reconnects with exponential backoff on disconnection. The Kubernetes connector maintains the live cluster state in the Observed Graph continuously, not on a schedule.
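The reconnect schedule is plain exponential backoff. A sketch of the delay generator; the base, factor, and cap values are illustrative, not specified by the connector:

```python
import itertools

def backoff_delays(base=1.0, factor=2.0, cap=60.0):
    """Yield reconnect delays in seconds: 1, 2, 4, ... capped at `cap`."""
    for attempt in itertools.count():
        yield min(cap, base * factor ** attempt)
```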
Jira (Nice to Have — MVP)¶
Jira tickets are ingested as IntentAssertion nodes and linked to the service nodes they describe via entity resolution (ING-17). Epics produce EpicNode entries; sprints produce SprintNode entries. Component links map to service ownership edges. Ingestion is triggered by issue_created and issue_updated webhooks.
Confluence (Nice to Have — MVP)¶
Confluence pages are ingested as Policy or Documentation nodes depending on their space and label classification (ING-18). The connector detects orphaned documentation (doc node with no matching live service node) and undocumented services (service node with no DOCUMENTS edge). Ingestion is triggered by page_created and page_updated webhooks.
ADRs and post-mortems stored in Confluence are parsed and cross-linked with their GitHub markdown counterparts through entity resolution to avoid duplicate DecisionNode entries.
Slack (Nice to Have — v1.1)¶
Slack decision threads matching configured keyword triggers or channel subscriptions are ingested as IntentAssertion nodes (ING-20). The connector uses the Slack Event API (message.channels) and applies a Dense extract-lora classifier to identify messages containing architectural decisions, risk acknowledgements, or scope commitments before creating graph entries.
Cross-Cutting Concerns¶
Entity Resolution (ING-10)¶
Every ingestion pipeline passes candidate node identifiers through the Dense resolve-lora adapter before writing to Neo4j. This canonicalises variant representations of the same entity — for example, "Service A", "srv-a", "service-a", and "ServiceA" — into a single canonical node with ALIAS edges recording the source-system identifiers.
Entity resolution runs as a synchronous step within the Celery task prior to graph writes. If the resolver confidence is below 0.6, the candidate node is written to the verification queue (see Proactive Maintenance Service) rather than directly to the graph.
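The routing logic around the confidence threshold can be sketched as below. `resolve` stands in for the Dense resolve-lora call and is assumed to return `(canonical_name, confidence)`; the return tuples are illustrative, not the actual queue or write API:

```python
def route_candidate(candidate: dict, resolve) -> tuple:
    """Route a candidate node by resolver confidence (0.6 threshold per the text above)."""
    canonical, confidence = resolve(candidate["name"])
    if confidence < 0.6:
        # Low confidence: park in the verification queue, do not write to the graph.
        return ("verification_queue", candidate["name"])
    # High confidence: canonical node plus an ALIAS edge recording the source identifier.
    return ("graph_write", {"canonical": canonical, "alias": candidate["name"]})
```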
Institutional Memory Ingestion¶
ADRs (ING-11): Architectural Decision Records are ingested from both GitHub markdown files and linked Confluence pages. Each produces a DecisionNode with:
- `title`, `status` (Accepted/Superseded/Deprecated), `date`, `context`, `decision`, `consequences` properties
- `WHY` edge to the service or component the decision governs
- `SUPERSEDES` edge to any prior ADR it replaces
Post-mortems (ING-12): Post-incident review documents from Confluence and GitHub are parsed to extract FailurePattern nodes. Each FailurePattern carries:
- `incident_date`, `severity`, `affected_services`, `root_cause_summary` properties
- `CAUSED_BY` edges to the contributing code or infrastructure nodes
- `AFFECTED` edges to all service nodes impacted
CVE / Advisory Feed (ING-13)¶
The GitHub Advisory Database and CVE feed are polled every 15 minutes. Each advisory is classified for relevance to the current dependency graph using the Dense extract-lora adapter. If the advisory references a package that appears in any DEPENDS_ON edge, the affected dependency node is tagged with a cve_ids array property and a CVE_AFFECTS edge is created to a VulnerabilityNode. A CveAlertEvent is published to NATS for Governance Service evaluation.
Embeddings and Deduplication (ING-14, ING-15, ING-16)¶
Every node written to the graph also receives a bge-m3 vector embedding (1024-dimensional) generated via the embedding service on DGX Spark port 8003. Embeddings are stored in the pgvector HNSW index in PostgreSQL with cosine distance metric.
Before inserting any new node, a cosine similarity query is run against the HNSW index. Nodes with similarity above 0.95 to an existing node are flagged for deduplication review rather than inserted as new entries.
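The pre-insert check is a nearest-neighbour similarity test against the 0.95 threshold. In production this is a pgvector HNSW query; the plain-Python scan below only illustrates the decision:

```python
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def needs_dedup_review(candidate, neighbours, threshold=0.95):
    """True if any existing embedding exceeds the similarity threshold for the candidate."""
    return any(cosine_similarity(candidate, n) >= threshold for n in neighbours)
```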
Deduplication of ingestion events is enforced via a Redis result cache keyed on the SHA-256 hash of the raw event payload (ING-15). If a cache hit is found, the event is acknowledged and discarded without processing.
Each ingestion event also acquires a distributed Redis lock using SET NX EX 300 keyed on the event's content hash (ING-16). This prevents two Celery workers from processing the same event concurrently during a transient cache miss.
Functional Requirements¶
| ID | Requirement | Priority |
|---|---|---|
| ING-01 | Connect to GitHub via OAuth App or GitHub App; register webhooks for PR, push, check_run events | Must Have |
| ING-02 | Clone repositories; parse AST via Tree-sitter + stack-graphs Rust CLI; extract Module/Function/Class/Import nodes with file+line metadata | Must Have |
| ING-03 | Extract CALLS, DEPENDS_ON, IMPORTS, DEFINES edges from stack-graph with compiler-level name resolution precision | Must Have |
| ING-04 | On PR open: incremental graph delta — only changed files re-parsed and diffed; result published to NATS | Must Have |
| ING-05 | Ingest GitHub Projects v2 via GraphQL API; extract SprintNode, ProjectItem nodes; map status updates to graph properties | Must Have |
| ING-06 | Ingest GitHub Pages via Pages REST API; compare last build date to last code commit; extract documentation staleness signal | Must Have |
| ING-07 | SSH Runtime Connector: Vault-signed ephemeral cert, ProxyJump connection, inspection script execution, JSON output parse, diff against graph | Must Have |
| ING-08 | Parse Terraform state files; extract InfraResource nodes with HOSTS, CONNECTS_TO edges; detect drift from prior state | Must Have |
| ING-09 | Watch Kubernetes API; extract Deployment, Service, Ingress, ConfigMap nodes; maintain sync with live cluster state | Must Have |
| ING-10 | Entity resolution: canonicalise "Service A" / "srv-a" / "service-a" across all sources using Dense resolve-lora | Must Have |
| ING-11 | Ingest ADRs from GitHub markdown files and linked Confluence pages; extract DecisionNode with WHY edges; capture rationale as institutional memory | Must Have |
| ING-12 | Ingest post-mortems from Confluence/GitHub; extract FailurePattern nodes with CAUSED_BY and AFFECTED edges | Must Have |
| ING-13 | Poll GitHub Advisory/CVE feed every 15 minutes; classify relevance using Dense extract-lora; identify affected dependency nodes | Must Have |
| ING-14 | Generate bge-m3 embeddings for all nodes; store in pgvector HNSW index; run cosine deduplication check pre-insert | Must Have |
| ING-15 | Deduplicate ingestion events via Redis result cache keyed on SHA-256 content hash | Must Have |
| ING-16 | Distributed lock per ingestion event via Redis SET NX EX to prevent parallel duplicate processing | Must Have |
| ING-17 | Ingest Jira tickets as IntentAssertion nodes linked to service nodes via entity resolution | Nice to Have (MVP) |
| ING-18 | Ingest Confluence pages as Policy/Documentation nodes; detect orphaned docs and undocumented services | Nice to Have (MVP) |
| ING-19 | Ingest PR review comments as design rationale MemoryNodes linked to code nodes | Nice to Have (v1.1) |
| ING-20 | Ingest Slack decision threads via keyword/channel trigger as IntentAssertion nodes | Nice to Have (v1.1) |
Node and Edge Taxonomy¶
The Ingestion Service is the sole authoritative writer for the following node types in the Observed Graph:
| Node Type | Source Connectors | Key Properties |
|---|---|---|
| `ServiceNode` | GitHub, K8s, Terraform, SSH | name, language, repo_url, criticality (PageRank) |
| `FunctionNode` | GitHub (AST) | name, file_path, line_start, line_end, signature |
| `ModuleNode` | GitHub (AST) | name, file_path, language |
| `ClassNode` | GitHub (AST) | name, file_path, line_start |
| `InfraResourceNode` | Terraform, K8s, SSH | provider, resource_type, region, host |
| `SprintNode` | GitHub Projects v2, Jira | title, start_date, end_date, health |
| `ProjectItemNode` | GitHub Projects v2, Jira | title, status, assignee, sprint_id |
| `DecisionNode` | GitHub (ADR), Confluence | title, status, date, rationale |
| `FailurePatternNode` | GitHub, Confluence | incident_date, severity, root_cause_summary |
| `VulnerabilityNode` | GitHub Advisory, CVE feed | cve_id, severity, affected_packages |
| `DocumentationNode` | GitHub Pages, Confluence | url, last_build_date, staleness_delta_days |
| `IntentAssertionNode` | Jira, Slack | source, text, linked_service, confidence |
Infrastructure Dependencies¶
| Component | Role in Ingestion Service |
|---|---|
| NATS JetStream | Event bus: ingestion events published to substrate.ingestion.> subjects; at-least-once delivery guarantee |
| Celery | Distributed task queue consuming from NATS; handles retries with exponential backoff |
| Redis 7 | Distributed locks (SET NX EX) + SHA-256 result cache for deduplication |
| Neo4j 5.x | Primary graph store; all nodes and edges written here |
| PostgreSQL 16 + pgvector | HNSW vector index for bge-m3 embeddings; ingestion audit log |
| DGX Spark port 8001 | Dense 70B + resolve-lora: entity resolution, relevance classification |
| DGX Spark port 8003 | bge-m3: vector embedding generation for all ingested nodes |
| HashiCorp Vault | SSH CA for ephemeral certificate issuance (SSH Runtime Connector) |