🌐 Distributed Mode
Scale Spector horizontally across multiple nodes. The distributed architecture combines consistent-hash sharding, configurable replication, heartbeat-based membership, and parallel gRPC query fan-out with result merging.
🏗️ Architecture Overview
```mermaid
graph TD
    Client["👤 Client"] --> Coord["🧭 Query Coordinator<br/>Fan-out + Merge + Dedup"]
    Coord --> S0["💾 Shard 0<br/>(Primary)"]
    Coord --> S1["💾 Shard 1<br/>(Primary)"]
    Coord --> S2["💾 Shard 2<br/>(Primary)"]
    S0 --> R0["📋 Replica 0a"]
    S1 --> R1["📋 Replica 1a"]
    S2 --> R2["📋 Replica 2a"]
    MS["💓 Membership Service<br/>(Heartbeat)"] -.-> S0
    MS -.-> S1
    MS -.-> S2
```
🧩 Components
🔑 Shard Manager
`ConsistentHashShardManager` distributes documents across shards by applying consistent hashing to document IDs.
```mermaid
graph LR
    subgraph "Hash Ring"
        H1["Hash(doc-A) → Shard 0"]
        H2["Hash(doc-B) → Shard 2"]
        H3["Hash(doc-C) → Shard 1"]
    end
```
Properties:

- Each shard owns a range on a hash ring (using virtual nodes for even distribution)
- Document ID → hash → ring position → assigned shard (deterministic)
- Adding a shard migrates only the affected documents (minimal data movement)
- Shard count changes apply without a full cluster restart
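The properties above can be illustrated with a minimal consistent-hash ring in Java. This is a sketch, not the actual `ConsistentHashShardManager`. The class name `HashRing`, the MD5-based 64-bit hash, and the virtual-node key format are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring mapping document IDs to shard numbers. */
class HashRing {
    // Ring position -> shard number; each shard appears once per virtual node.
    private final TreeMap<Long, Integer> ring = new TreeMap<>();
    private final int virtualNodesPerShard;

    HashRing(int shardCount, int virtualNodesPerShard) {
        this.virtualNodesPerShard = virtualNodesPerShard;
        for (int shard = 0; shard < shardCount; shard++) {
            addShard(shard);
        }
    }

    /** Adds the shard's virtual nodes; existing ring positions are left untouched. */
    void addShard(int shard) {
        for (int v = 0; v < virtualNodesPerShard; v++) {
            ring.put(hash("shard-" + shard + "#vnode-" + v), shard);
        }
    }

    /** Walks clockwise from the document's hash to the first virtual node. */
    int shardFor(String docId) {
        Map.Entry<Long, Integer> entry = ring.ceilingEntry(hash(docId));
        // Past the last position: wrap around to the start of the ring.
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** First 8 bytes of the MD5 digest as a long (an arbitrary but stable choice). */
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is available in every standard JDK", e);
        }
    }
}
```

Adding a shard only inserts new virtual nodes, so each document either keeps its shard or moves to the newly added one. This matches the minimal-movement property listed above.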
📋 Replication Manager
Each shard maintains a configurable number of replicas for fault tolerance.
| Behavior | Details |
|---|---|
| Writes | Go to primary, replicate to all replicas within 2s |
| Reads | Served from any fully-synchronized replica |
| Primary failure | Replica promoted within 10 seconds |
| Recovery | Delta sync only (data changed since failure) |
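One way to implement the read rule is to tag every write with a per-shard sequence number. A replica then counts as fully synchronized once it has applied the primary's latest sequence. The sketch below assumes that scheme and the hypothetical names `ShardReplicaSet`, `writeToPrimary`, and `acknowledge`. This page does not say how Spector tracks replica progress.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bookkeeping for one shard's primary and its replicas. */
class ShardReplicaSet {
    private long primarySeq = 0;                                       // last write accepted by the primary
    private final Map<String, Long> appliedSeq = new LinkedHashMap<>(); // replica node -> last applied sequence

    ShardReplicaSet(List<String> replicaNodes) {
        for (String node : replicaNodes) {
            appliedSeq.put(node, 0L);
        }
    }

    /** A write lands on the primary first and receives the next sequence number. */
    long writeToPrimary() {
        return ++primarySeq;
    }

    /** Records that a replica has applied every write up to {@code seq}. */
    void acknowledge(String replicaNode, long seq) {
        appliedSeq.merge(replicaNode, seq, Math::max);
    }

    /** Replicas eligible to serve reads: those that have applied the primary's latest write. */
    List<String> readableReplicas() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : appliedSeq.entrySet()) {
            if (e.getValue() == primarySeq) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```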
💓 Membership Service
Heartbeat-based cluster membership tracking.
| Parameter | Default | Range |
|---|---|---|
| `heartbeatInterval` | 2s | 500ms–30s |
| `heartbeatTimeout` | 10s | 3s–120s |
Behavior:

- Nodes send periodic heartbeats to announce liveness
- Missing heartbeats beyond the timeout → node marked unavailable
- New nodes trigger shard rebalancing within 5 seconds
- All active nodes converge to the same membership view within 5 seconds
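The liveness rule amounts to comparing each node's last heartbeat time with `heartbeatTimeout`. The hypothetical `HeartbeatTracker` below takes the current time as a parameter, so it can be tested without a real clock. It validates both parameters against the ranges in the table and does not model rebalancing or view convergence. The requirement that the timeout exceed the interval is an added assumption.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical liveness tracker driven by received heartbeats. */
class HeartbeatTracker {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeenMillis = new HashMap<>();

    HeartbeatTracker(Duration interval, Duration timeout) {
        // Ranges come from the parameter table; "timeout > interval" is an assumption.
        requireRange("heartbeatInterval", interval, Duration.ofMillis(500), Duration.ofSeconds(30));
        requireRange("heartbeatTimeout", timeout, Duration.ofSeconds(3), Duration.ofSeconds(120));
        if (timeout.compareTo(interval) <= 0) {
            throw new IllegalArgumentException("heartbeatTimeout must exceed heartbeatInterval");
        }
        this.timeoutMillis = timeout.toMillis();
    }

    /** Records a heartbeat received from {@code nodeId} at {@code nowMillis}. */
    void onHeartbeat(String nodeId, long nowMillis) {
        lastSeenMillis.put(nodeId, nowMillis);
    }

    /** Nodes whose latest heartbeat is no older than the timeout; the rest are unavailable. */
    Set<String> activeNodes(long nowMillis) {
        Set<String> active = new TreeSet<>();
        lastSeenMillis.forEach((node, seen) -> {
            if (nowMillis - seen <= timeoutMillis) {
                active.add(node);
            }
        });
        return active;
    }

    private static void requireRange(String name, Duration value, Duration min, Duration max) {
        if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
            throw new IllegalArgumentException(name + " must be between " + min + " and " + max);
        }
    }
}
```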
🧭 Query Coordinator
```mermaid
sequenceDiagram
    participant Client as 👤 Client
    participant Coord as 🧭 Coordinator
    participant S0 as 💾 Shard 0
    participant S1 as 💾 Shard 1
    participant S2 as 💾 Shard 2
    Client->>Coord: Search request
    par Fan-out (parallel gRPC)
        Coord->>S0: Query
        Coord->>S1: Query
        Coord->>S2: Query
    end
    S0-->>Coord: Results
    S1-->>Coord: Results
    S2-->>Coord: Results
    Note over Coord: Merge by score + dedup by ID
    Coord-->>Client: ✨ Global top-K results
```
**Note:** If some shards time out, the coordinator returns partial results from the shards that responded, plus metadata identifying the unreachable shards.
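The coordinator's merge step (merge by score, dedup by ID, return the global top-K) and the partial-result behavior can be sketched with the JDK's `CompletableFuture` (Java 17 or later for records). `ShardClient`, `Hit`, `SearchResponse`, and the per-shard timeout are hypothetical stand-ins for Spector's gRPC stubs and response types. The dedup policy is also an assumption: when the same ID comes back from several shards, the sketch keeps the highest score.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

/** One scored document returned by a shard (hypothetical shape). */
record Hit(String docId, double score) {}

/** Merged top-K hits plus the shards that did not answer in time. */
record SearchResponse(List<Hit> hits, List<Integer> unreachableShards) {}

/** Hypothetical per-shard client; a real cluster would wrap a gRPC stub here. */
interface ShardClient {
    CompletableFuture<List<Hit>> query(String query);
}

class QueryCoordinator {
    private final Map<Integer, ShardClient> shards;
    private final long timeoutMillis;

    QueryCoordinator(Map<Integer, ShardClient> shards, long timeoutMillis) {
        this.shards = shards;
        this.timeoutMillis = timeoutMillis;
    }

    SearchResponse search(String query, int topK) {
        // Fan out: issue every shard query before waiting on any of them.
        Map<Integer, CompletableFuture<List<Hit>>> pending = new HashMap<>();
        shards.forEach((id, client) -> pending.put(id,
                client.query(query).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)));

        Map<String, Hit> bestById = new HashMap<>();
        List<Integer> unreachable = new ArrayList<>();
        for (Map.Entry<Integer, CompletableFuture<List<Hit>>> e : pending.entrySet()) {
            try {
                for (Hit hit : e.getValue().join()) {
                    // Dedup by document ID, keeping the higher-scoring copy.
                    bestById.merge(hit.docId(), hit, (a, b) -> a.score() >= b.score() ? a : b);
                }
            } catch (CompletionException | CancellationException ex) {
                // Timed out or failed: record the shard and keep the partial results.
                unreachable.add(e.getKey());
            }
        }
        // Merge: sort surviving hits by descending score and keep the top K.
        List<Hit> merged = new ArrayList<>(bestById.values());
        merged.sort(Comparator.comparingDouble(Hit::score).reversed());
        unreachable.sort(null);
        return new SearchResponse(List.copyOf(merged.subList(0, Math.min(topK, merged.size()))), unreachable);
    }
}
```

All shard queries start before any of them is joined, so the total wait is roughly that of the slowest shard (capped by the timeout), not the sum of all shard latencies.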
🚀 Deployment Guide
Prerequisites
- All nodes must run the same Spector version
- Nodes must be reachable via gRPC (default port: 9090)
- Network latency between nodes should be under 10 ms for optimal performance
Starting a Cluster
Node 1 (seed node):

```bash
java -jar spector-node.jar \
  --cluster-mode \
  --node-id node-1 \
  --grpc-port 9090 \
  --shard-count 4 \
  --replica-count 2 \
  --seeds node-1:9090
```

Node 2:

```bash
java -jar spector-node.jar \
  --cluster-mode \
  --node-id node-2 \
  --grpc-port 9090 \
  --shard-count 4 \
  --replica-count 2 \
  --seeds node-1:9090
```

Node 3:

```bash
java -jar spector-node.jar \
  --cluster-mode \
  --node-id node-3 \
  --grpc-port 9090 \
  --shard-count 4 \
  --replica-count 2 \
  --seeds node-1:9090
```
✅ Verifying Cluster Health
A healthy three-node cluster reports a status similar to the following:

```json
{
  "status": "RUNNING",
  "clusterMode": true,
  "activeNodes": 3,
  "shardCount": 4,
  "replicaCount": 2,
  "topology": {
    "node-1": {"status": "ACTIVE", "shards": [0, 1]},
    "node-2": {"status": "ACTIVE", "shards": [2, 3]},
    "node-3": {"status": "ACTIVE", "shards": ["0-replica", "2-replica"]}
  }
}
```
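A scripted health check can test this output by confirming that every shard from 0 to `shardCount - 1` has a primary on an `ACTIVE` node. The sketch below assumes the `topology` object has already been parsed into a map; JSON parsing is left out to keep it dependency-free. It also infers two conventions from the sample above: plain numbers denote primaries, and strings such as `"0-replica"` denote replicas. Neither convention is documented on this page. `TopologyCheck` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical check over a parsed "topology" object from the status response. */
class TopologyCheck {
    record NodeInfo(String status, List<?> shards) {}

    /** Returns the shard numbers that lack a primary on an ACTIVE node. */
    static Set<Integer> shardsWithoutActivePrimary(int shardCount, Map<String, NodeInfo> topology) {
        Set<Integer> missing = new TreeSet<>();
        for (int s = 0; s < shardCount; s++) {
            missing.add(s);
        }
        for (NodeInfo node : topology.values()) {
            if (!"ACTIVE".equals(node.status())) {
                continue; // shards on inactive nodes do not count as served
            }
            for (Object shard : node.shards()) {
                // Numbers are treated as primaries; strings like "0-replica" are skipped.
                if (shard instanceof Number n) {
                    missing.remove(n.intValue());
                }
            }
        }
        return missing;
    }
}
```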
🔒 gRPC TLS Setup
For production deployments, enable TLS on gRPC communication:
```bash
java -jar spector-node.jar \
  --cluster-mode \
  --grpc-port 9090 \
  --grpc-tls \
  --grpc-cert /path/to/cert.pem \
  --grpc-key /path/to/key.pem \
  --grpc-ca /path/to/ca.pem
```
🛡️ Failure Scenarios
💥 Node Failure
```mermaid
graph TD
    A["💥 Node fails"] --> B["💓 Heartbeat timeout detected"]
    B --> C["🚫 Node removed from routing"]
    C --> D["📋 Replica promoted to primary"]
    D --> E["✅ Queries continue from remaining nodes"]
```
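The promotion step can be sketched as follows: for each shard whose primary was on the failed node, promote the first replica that sits on a still-active node. `ShardAssignment` and the first-surviving-replica rule are assumptions for illustration. The page states only that a replica is promoted within 10 seconds, not how the candidate is chosen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical primary/replica placement for all shards. */
class ShardAssignment {
    final Map<Integer, String> primaryOf = new HashMap<>();
    final Map<Integer, List<String>> replicasOf = new HashMap<>();

    void assign(int shard, String primary, List<String> replicas) {
        primaryOf.put(shard, primary);
        replicasOf.put(shard, new ArrayList<>(replicas));
    }

    /**
     * Handles a node that exceeded the heartbeat timeout. Returns the shards left
     * without a primary because none of their replicas is on an active node.
     */
    List<Integer> onNodeFailed(String failedNode, Set<String> activeNodes) {
        List<Integer> orphaned = new ArrayList<>();
        for (Map.Entry<Integer, String> e : primaryOf.entrySet()) {
            List<String> replicas = replicasOf.get(e.getKey());
            replicas.remove(failedNode); // the failed node no longer holds a usable replica
            if (!e.getValue().equals(failedNode)) {
                continue;
            }
            String promoted = replicas.stream().filter(activeNodes::contains).findFirst().orElse(null);
            if (promoted == null) {
                orphaned.add(e.getKey());
            } else {
                replicas.remove(promoted);
                e.setValue(promoted); // route this shard's traffic to the new primary
            }
        }
        orphaned.sort(null);
        return orphaned;
    }
}
```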
🔄 Node Recovery
```mermaid
graph TD
    A["🔄 Node resumes heartbeats"] --> B["💓 Re-registered in membership"]
    B --> C["📋 Delta sync (only changed data)"]
    C --> D["✅ Node resumes serving reads/writes"]
```
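In a delta sync, the recovering node replays only the writes it missed instead of copying the whole shard. Below is a minimal sketch. It assumes the primary keeps an ordered per-shard change log with consecutive sequence numbers, and it ignores log truncation. `ChangeLog` and `Change` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** A single write recorded on the primary (hypothetical shape). */
record Change(long seq, String docId, String op) {}

/** Hypothetical append-only change log for one shard. */
class ChangeLog {
    private final List<Change> entries = new ArrayList<>();

    /** Appends a write; sequence numbers start at 1 and increase by one per change. */
    Change append(String docId, String op) {
        Change c = new Change(entries.size() + 1L, docId, op);
        entries.add(c);
        return c;
    }

    /** Changes after {@code lastAppliedSeq}: the delta a recovering replica must replay. */
    List<Change> since(long lastAppliedSeq) {
        // Entry at index i has seq i + 1, so the delta starts at index lastAppliedSeq.
        int from = (int) Math.min(Math.max(lastAppliedSeq, 0), entries.size());
        return List.copyOf(entries.subList(from, entries.size()));
    }
}
```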
🌐 Network Partition
- Nodes on each side continue serving their local shards
- Queries to unreachable shards return partial results with timeout metadata
- When the partition heals, membership reconverges and replicas sync
📈 Scaling Guidelines
| Cluster Size | Shards | Documents | Estimated Throughput |
|---|---|---|---|
| 2 nodes | 2–4 | Up to 500K | ~15K QPS |
| 4 nodes | 4–8 | Up to 2M | ~29K QPS |
| 8 nodes | 8–16 | Up to 5M | ~55K QPS |
| 16 nodes | 16–32 | Up to 10M | ~100K QPS |
**Note:** Throughput estimates assume 128-dimensional vectors, top-10 retrieval, and hybrid search. They are extrapolated from a measured single-node throughput of ~7.3K concurrent hybrid ops/s at 16 threads. Actual cluster throughput depends on network latency, shard balance, query routing overhead, and hardware homogeneity. These are projected estimates, not measured cluster benchmarks.
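Dividing each estimate by N × 7.3K gives the per-node scaling efficiency that the projection implies. This is arithmetic on the table above, not a measurement:

```latex
E_N = \frac{\mathrm{QPS}_N}{N \times 7.3\,\mathrm{K}}, \qquad
E_2 = \frac{15}{14.6} \approx 1.03, \quad
E_4 = \frac{29}{29.2} \approx 0.99, \quad
E_8 = \frac{55}{58.4} \approx 0.94, \quad
E_{16} = \frac{100}{116.8} \approx 0.86
```

The projection therefore assumes near-linear scaling that tapers to roughly 86% per node at 16 nodes. The two-node value above 1 is consistent with 14.6K having been rounded to ~15K.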
🔗 See Also
- Architecture Overview: Overall system architecture
- Configuration Guide: Cluster parameters
- Performance Tuning: Optimizing distributed performance