📥 Ingestion Pipeline¶
Unified ingestion: document → chunk → embed → target. A single
IngestionPipelinewith builder configuration handles all ingestion — for both search engine and cognitive memory. The pipeline decides how to process content; theIngestionTargetdecides where to store it.
Architecture¶
All entry points (CLI, MCP, Server) route ingestion through SpectorRuntime:
CLI/MCP/Server → SpectorRuntime.ingestion() → IngestionHandler → IngestionPipeline
│
┌─────┴─────┐
▼ ▼
EngineIngestionTarget CognitiveIngestionTarget
(SEARCH mode) (MEMORY mode)
IngestionPipeline(inspector-ingestion) — unified chunk → embed → store orchestrator with builder patternIngestionTarget(inspector-ingestion) — abstraction for storage backends (engine or memory)IngestionHandler(inspector-runtime) — thin routing layer over the pipelineFileDiscoveryService(inspector-ingestion) — pure file discovery + title extraction utility
Module: spector-ingestion¶
The ingestion module is a low-level utility with no dependency on engine, runtime, or memory. It defines the pipeline and the IngestionTarget interface that downstream modules implement.
Key classes:
| Class | Purpose |
|---|---|
IngestionPipeline |
Builder-configured orchestrator — chunk → embed → store |
IngestionTarget |
Interface for storage backends (ingest(id, text, vector)) |
IngestionResult |
Outcome with chunk counts, failures, timing |
FileDiscoveryService |
File discovery, title extraction, config-driven filtering |
🔄 Pipeline Flow¶
flowchart LR
A["📄 Document"] --> B{"Content > threshold?"}
B -->|Yes| C["✂️ TextChunker<br/>Config-driven<br/>chunk size + overlap"]
B -->|No| D["Direct embed"]
C --> E["🧠 Parallel Embedding<br/>Virtual threads<br/>ParallelEmbeddingPipeline"]
D --> E
E --> F["💾 IngestionTarget<br/>Engine or Cognitive"]
F --> G["✅ IngestionResult"]
🏗️ Builder Pattern¶
The pipeline is configured once via a builder, then reused for all ingestion in a session:
// Read chunking config from spector.yml
var ingestionConfig = SpectorConfigFactory.ingestionDefaults(props);
var pipeline = IngestionPipeline.builder()
.target(engineTarget) // or cognitiveTarget
.embeddingProvider(embedder) // for auto-embedding
.chunking(new TextChunker(
ingestionConfig.chunkSize(),
ingestionConfig.chunkOverlap()))
.chunkThreshold(ingestionConfig.chunkSize())
.build();
The pipeline automatically selects a strategy based on content:
| Content | Strategy | Description |
|---|---|---|
| ≤ threshold | Direct | Embed whole text, store as single doc |
| > threshold | Chunked | Split via TextChunker, embed in parallel, store each chunk |
| Pre-embedded | Passthrough | Skip embedding, store vector directly |
| File path | Streaming | StreamingChunker for bounded-memory processing |
🎯 IngestionTarget Interface¶
The pipeline is decoupled from storage — it writes to any IngestionTarget:
public interface IngestionTarget {
void ingest(String id, String text, float[] vector);
default void storeParentMetadata(String parentId, int chunkCount) {}
default void onBatchComplete() {}
}
Implementations¶
| Target | Module | What it does |
|---|---|---|
EngineIngestionTarget |
spector-engine |
VectorStore → VectorIndex (HNSW/IVF/Spectrum) → KeywordIndex (BM25) |
CognitiveIngestionTarget |
spector-memory |
Synaptic tags → Surprise detection → ICNU fusion → Quantize → Tier route → WAL |
This decoupling enables:
- Testing — Mock the target for unit tests
- Rebuilding indexes — Point at a fresh index during reindexing
- Multi-tenant setups — Route documents to different targets
- Custom stores — Write to external systems alongside Spector
Virtual Thread Parallelism¶
Embedding calls (I/O-bound, network) run in parallel using the ParallelEmbeddingPipeline:
sequenceDiagram
participant Pipeline as 📥 IngestionPipeline
participant Chunker as ✂️ TextChunker
participant Embed as 🧠 ParallelEmbeddingPipeline
participant VT1 as Virtual Thread 1
participant VT2 as Virtual Thread 2
participant Target as 💾 IngestionTarget
Pipeline->>Chunker: chunk(document)
Chunker-->>Pipeline: List<Chunk>
Pipeline->>Embed: embed(chunkTexts)
par Batch 1
Embed->>VT1: embedBatch([c1,c2,c3,c4])
and Batch 2
Embed->>VT2: embedBatch([c5,c6,c7,c8])
end
VT1-->>Embed: vectors[0..3]
VT2-->>Embed: vectors[4..7]
Embed-->>Pipeline: List<PipelineEmbeddingResult>
loop For each successful embedding
Pipeline->>Target: ingest(chunkId, text, vector)
end
Pipeline-->>Pipeline: IngestionResult
Note
CPU-bound work (chunking, keyword tokenization, SIMD index insertion) runs synchronously on the caller's virtual thread. Only the embedding I/O call is parallelized. This avoids context-switch overhead on hot paths.
📋 Ingestion Modes¶
Text Ingestion (auto-chunked)¶
// Pipeline decides whether to chunk based on content length vs. threshold
IngestionResult result = pipeline.ingest("doc-1", longDocumentText);
Pre-embedded (skip embedding)¶
// For pre-computed vectors — no chunking, no embedding
IngestionResult result = pipeline.ingest("doc-1", "Hello world", precomputedVector);
Streaming File Ingestion¶
For multi-GB files that can't fit in memory:
IngestionResult result = pipeline.ingest(
Path.of("corpus.txt"), "corpus");
// Bounded memory: only ~2× chunkSize held at once via StreamingChunker
📊 Result Tracking¶
Every ingestion operation returns an IngestionResult:
public record IngestionResult(
String documentId,
int chunksStored,
List<String> failures, // chunk IDs that failed
long durationMs
) {}
Properties:
- Failed chunks don't halt the pipeline — other chunks continue
- Failure reasons are logged at WARN level
isFullSuccess()returns true only if all chunks succeeded- Timing includes chunking + embedding + storage
🧠 Cognitive Target Pipeline¶
When the CognitiveIngestionTarget receives a chunk from the unified pipeline, it executes the cognitive processing steps:
IngestionPipeline CognitiveIngestionTarget
│ │
│ ingest(id, text, vector) │
├──────────────────────────────────────────► │
│ ├── 2. Encode synaptic tags (Bloom filter)
│ ├── 3. Compute surprise (Dopamine)
│ ├── 3b. ICNU fusion (if hints provided)
│ ├── 4. Flashbulb check (extreme surprise)
│ ├── 5. Quantize to INT8
│ ├── 6. Build cognitive header
│ ├── 7. Write to tier store
│ ├── 8. Register in MemoryIndex
│ └── 9. WAL append
SpectorMemory.remember() calls CognitiveIngestionTarget.ingestCognitive() directly with full cognitive parameters (type, tags, source, ICNU hints).
⚡ Design Decisions¶
Why not Reactor?¶
The pipeline uses virtual threads instead of Project Reactor because:
| Concern | Virtual Threads | Reactor |
|---|---|---|
| Embedding I/O | Native async via VT | Requires Mono.fromCallable wrapping |
| Error handling | try/catch, intuitive | onErrorResume chains |
| Debugging | Normal stack traces | Operator assembly traces |
| Testing | Standard JUnit | StepVerifier complexity |
| Dependencies | Zero (JDK only) | reactor-core + reactor-netty |
Why a unified pipeline?¶
Consolidating from 3 separate ingestion paths:
- Single code path — Same chunking + embedding logic for search and memory
- Config-driven — Chunk size, overlap, threshold all read from
spector.yml - No OOM — Streaming chunker ensures bounded memory for large files
- Extensible — New targets only need to implement
IngestionTarget.ingest()
Why a separate module?¶
Extracting ingestion from SpectorEngine:
- Testability — Pipeline can be unit-tested with a mock
IngestionTarget - Reusability — Bulk ingestion tools don't need the full engine
- Clarity — Ingestion logic is isolated from search/lifecycle concerns
- Extensibility — Custom pipelines can compose different chunkers/embedders
🔗 See Also¶
- RAG Pipeline — Retrieval and context assembly
- Architecture Overview — Module dependency graph
- REST API Reference — Ingest endpoints
- Configuration Guide — Chunking and embedding parameters