Improve subscription handling and RPC reliability

- Flush RPC writer after sending responses to ensure delivery
- Add cancellation guard in useNormalizedQuery to avoid leaks
- Make SubscriptionManager concurrency-safe: deduplicate in-flight subscribes
- Track pending subscriptions to avoid races
- Introduce createSubscription helper and centralized cleanup logic
- Update docs with Sync State Machine, HLC update algorithm, retry queue, metrics, and protocol messages
- Reflect new config defaults for timeouts and intervals
Jamie Pine 2025-11-27 12:48:50 -08:00
parent 97cc3853b5
commit ace39839a2
4 changed files with 706 additions and 66 deletions

View File

@@ -342,6 +342,10 @@ impl RpcServer {
if let Err(_) = writer.write_all((response_json + "\n").as_bytes()).await {
break; // Connection closed
}
// Flush to ensure response is sent immediately
if let Err(_) = writer.flush().await {
break; // Connection closed
}
// For non-streaming requests, close connection after response
match response {
@@ -388,6 +392,9 @@ impl RpcServer {
if let Err(_) = writer.write_all((response_json + "\n").as_bytes()).await {
break; // Connection closed
}
if let Err(_) = writer.flush().await {
break; // Connection closed
}
}
}
}

View File

@@ -63,6 +63,71 @@ Any device can create or modify:
This ownership model eliminates most conflicts and simplifies synchronization.
## Sync State Machine
The sync service runs as a background process with well-defined state transitions:
```
Uninitialized → Backfilling → CatchingUp → Ready ⇄ Paused
```
### States
| State | Description |
| ----- | ----------- |
| `Uninitialized` | Device hasn't synced yet (no watermarks) |
| `Backfilling { peer, progress }` | Receiving initial state from a peer (0-100%) |
| `CatchingUp { buffered_count }` | Processing updates buffered during backfill |
| `Ready` | Fully synced, applying real-time updates |
| `Paused` | Sync disabled or device offline |
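For reference, these states map naturally onto a Rust enum. This is a minimal sketch mirroring the table above, not the exact type in the codebase; field names and types are illustrative:
```rust
use uuid::Uuid;

/// Sketch of the sync state machine states, mirroring the table above.
enum SyncState {
    Uninitialized,
    Backfilling { peer: Uuid, progress: u8 }, // progress: 0-100%
    CatchingUp { buffered_count: usize },
    Ready,
    Paused,
}
```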
### Transitions
```
Uninitialized
→ [peer available] → Backfilling
→ [already has data] → Ready
Backfilling
→ [complete] → CatchingUp
→ [peer disconnected] → save checkpoint, select new peer
CatchingUp
→ [buffer empty] → Ready
→ [5 consecutive failures] → Uninitialized (escalate to full backfill)
Ready
→ [offline] → Paused
→ [watermarks stale] → CatchingUp
Paused
→ [online] → Ready or CatchingUp
```
### Buffer Queue
During backfill, incoming real-time updates are buffered to prevent data loss (see the sketch after this list):
- **Max capacity**: 100,000 updates
- **Ordering**: Priority queue sorted by timestamp/HLC
- **Overflow handling**: Drops oldest updates to prevent OOM
- **Processing**: Drained in order during CatchingUp phase
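A minimal sketch of such a bounded, ordered buffer, assuming updates are keyed by `(timestamp, counter)`; the real buffer stores full updates with their HLC keys:
```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Bounded min-heap buffer: the oldest key pops first; overflow drops the oldest.
struct BackfillBuffer {
    heap: BinaryHeap<Reverse<(u64, u32)>>, // (timestamp, counter) keys
    max_capacity: usize,                   // e.g. 100_000
}

impl BackfillBuffer {
    fn push(&mut self, ts: u64, counter: u32) {
        if self.heap.len() >= self.max_capacity {
            self.heap.pop(); // drop the oldest update to bound memory
        }
        self.heap.push(Reverse((ts, counter)));
    }

    /// Drained in order during the CatchingUp phase.
    fn pop_oldest(&mut self) -> Option<(u64, u32)> {
        self.heap.pop().map(|Reverse(key)| key)
    }
}
```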
### Catch-Up Escalation
If incremental catch-up fails repeatedly, the system escalates:
```
Attempt 1: Wait 10s, retry
Attempt 2: Wait 20s, retry
Attempt 3: Wait 40s, retry
Attempt 4: Wait 80s, retry
Attempt 5: Wait 160s (capped), retry
After 5 failures: Reset to Uninitialized, trigger full backfill
```
This prevents permanent sync failures from transient network issues.
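The schedule reduces to a doubling delay with a cap; a minimal sketch (the constant and function names are illustrative):
```rust
const MAX_ATTEMPTS: u32 = 5;

/// Delay before catch-up attempt `attempt` (1-based): 10s doubled each
/// time, capped at 160s. After MAX_ATTEMPTS failures, reset to Uninitialized.
fn catchup_delay_secs(attempt: u32) -> u64 {
    (10u64 << (attempt - 1)).min(160)
}
```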
## Sync Protocols
### State-Based Sync (Device-Owned)
@@ -152,6 +217,49 @@ Properties:
- Any two HLCs can be compared
- No clock synchronization required
### HLC Update Algorithm
When generating or receiving an HLC, the system maintains causality:
```rust
fn generate(last: Option<HLC>, device_id: Uuid) -> HLC {
let physical = now_millis();
let (timestamp, counter) = match last {
Some(prev) if prev.timestamp >= physical => {
// Clock hasn't advanced, increment counter
(prev.timestamp, prev.counter + 1)
}
Some(_) => {
// Clock advanced, reset counter
(physical, 0)
}
None => (physical, 0),
};
HLC { timestamp, counter, device_id }
}
fn update(&mut self, received: HLC) {
let physical = now_millis();
let max_ts = max(self.timestamp, max(received.timestamp, physical));
self.counter = if max_ts == self.timestamp && max_ts == received.timestamp {
max(self.counter, received.counter) + 1
} else if max_ts == self.timestamp {
self.counter + 1
} else if max_ts == received.timestamp {
received.counter + 1
} else {
0 // Physical time advanced
};
self.timestamp = max_ts;
}
```
This ensures:
- Local events always have increasing HLCs
- Received events update local clock to maintain causality
- Clock drift is bounded by the max of all observed timestamps
### Conflict Resolution
Each shared model implements its own `apply_shared_change()` method, allowing per-model conflict resolution strategies. The `Syncable` trait provides this flexibility.
@@ -459,6 +567,35 @@ for (record, fk_field, missing_uuid) in result.failed {
actual data transfer.
</Info>
### Dependency Tracking
During backfill, records may arrive before their FK dependencies (e.g., an entry before its parent folder). The `DependencyTracker` handles this efficiently:
```rust
// Record fails FK resolution - parent doesn't exist yet
let error = "Foreign key lookup failed: parent_uuid abc-123 not found";
let missing_uuid = extract_missing_dependency_uuid(&error);
// Track the waiting record
dependency_tracker.add_dependency(missing_uuid, buffered_update);
// Later, when parent record arrives and is applied...
let waiting = dependency_tracker.resolve(parent_uuid);
for update in waiting {
// Retry applying - FK should resolve now
apply_update(update).await?;
}
```
This provides **O(n) targeted retry** instead of O(n²) "retry entire buffer" approaches:
| Approach | Records | FKs | Retries | Complexity |
| -------- | ------- | --- | ------- | ---------- |
| Retry all | 10,000 | 3 | 10,000 × 10,000 | O(n²) |
| Dependency tracking | 10,000 | 3 | ~100 targeted | O(n) |
The tracker maintains a map of `missing_uuid → Vec<waiting_updates>`. When a record is successfully applied, its UUID is checked against the tracker to resolve any waiting dependents.
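A minimal sketch of that core data structure, assuming the `add_dependency`/`resolve` API shown above; the update payload type is a placeholder:
```rust
use std::collections::HashMap;
use uuid::Uuid;

/// Maps missing_uuid → updates waiting on that record.
struct DependencyTracker<Update> {
    waiting: HashMap<Uuid, Vec<Update>>,
}

impl<Update> DependencyTracker<Update> {
    fn add_dependency(&mut self, missing_uuid: Uuid, update: Update) {
        self.waiting.entry(missing_uuid).or_default().push(update);
    }

    /// Called after a record is applied; returns only the updates that
    /// were blocked on it, giving O(n) targeted retries.
    fn resolve(&mut self, applied_uuid: Uuid) -> Vec<Update> {
        self.waiting.remove(&applied_uuid).unwrap_or_default()
    }
}
```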
## Sync Flows
<Info>
@@ -581,6 +718,57 @@ This is made possible by two core architectural principles:
This architecture provides significant redundancy and resilience, as the library can stay in sync as long as there is any path of connectivity between peers.
### Peer Selection
When starting a backfill, the system scores available peers to select the best source:
```rust
fn score(&self) -> i32 {
let mut score = 0;
// Prefer online peers
if self.is_online { score += 100; }
// Prefer peers with complete state
if self.has_complete_state { score += 50; }
// Prefer low latency (measured RTT)
score -= (self.latency_ms / 10) as i32;
// Prefer less busy peers
score -= (self.active_syncs * 10) as i32;
score
}
```
Peers are sorted by score (highest first). The best peer is selected for backfill. If that peer disconnects, the checkpoint is saved and a new peer is selected.
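Selection itself is then a sort; a sketch, assuming `peers` is a `Vec` of the candidate struct implementing `score()` above:
```rust
// Highest score first; the best-scoring peer becomes the backfill source.
peers.sort_by_key(|p| std::cmp::Reverse(p.score()));
let source = peers.first();
```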
### Deterministic UUIDs
System-provided resources use deterministic UUIDs (v5 namespace hashing) so they're identical across all devices:
```rust
// System tags have consistent UUIDs everywhere
let system_tag_uuid = deterministic_system_tag_uuid("system");
// Always: 550e8400-e29b-41d4-a716-446655440000 (example)
// Library-scoped defaults
let default_uuid = deterministic_library_default_uuid(library_id, "default_collection");
```
**Use deterministic UUIDs for:**
- System tags (system, screenshot, download, document, image, video, audio, hidden, archive, favorite)
- Built-in collections
- Library defaults
**Use random UUIDs for:**
- User-created tags (supports duplicate names in different contexts)
- User-created collections
- All user content
This prevents creation conflicts for system resources while allowing polymorphic naming for user content.
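A sketch of how these helpers can be built on UUID v5 namespace hashing; the namespace constant here is a stand-in, not the project's real value:
```rust
use uuid::Uuid;

/// Same name → same UUID on every device. NAMESPACE_OID is a stand-in
/// for the project's own fixed namespace UUID.
fn deterministic_system_tag_uuid(name: &str) -> Uuid {
    Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes())
}

/// Library-scoped defaults: namespace on the library's UUID so the same
/// name is stable within a library but distinct across libraries.
fn deterministic_library_default_uuid(library_id: Uuid, name: &str) -> Uuid {
    Uuid::new_v5(&library_id, name.as_bytes())
}
```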
### Delete Handling
<Info>
@@ -703,6 +891,118 @@ The peer log query returns the next batch starting after the provided HLC, maint
Both pagination strategies ensure all records are fetched exactly once, no records are skipped even with identical timestamps, and backfill is resumable from checkpoint if interrupted.
## Protocol Messages
The sync protocol uses JSON-serialized messages over Iroh/QUIC streams:
### Message Types
| Message | Direction | Purpose |
| ------- | --------- | ------- |
| `StateChange` | Broadcast | Single device-owned record update |
| `StateBatch` | Broadcast | Batch of device-owned records |
| `StateRequest` | Request | Pull device-owned data from peer |
| `StateResponse` | Response | Device-owned data with tombstones |
| `SharedChange` | Broadcast | Single shared resource update (HLC) |
| `SharedChangeBatch` | Broadcast | Batch of shared resource updates |
| `SharedChangeRequest` | Request | Pull shared changes since HLC |
| `SharedChangeResponse` | Response | Shared changes + state snapshot |
| `AckSharedChanges` | Broadcast | Acknowledge receipt (enables pruning) |
| `Heartbeat` | Broadcast | Peer status with watermarks |
| `WatermarkExchangeRequest` | Request | Request peer's sync progress |
| `WatermarkExchangeResponse` | Response | Peer's watermarks for catch-up |
| `Error` | Response | Error message |
### Message Structures
```rust
// Device-owned state change
StateChange {
library_id: Uuid,
model_type: String, // "location", "entry", etc.
record_uuid: Uuid,
device_id: Uuid, // Owner device
data: serde_json::Value, // Record as JSON
timestamp: DateTime<Utc>,
}
// Batch of device-owned changes
StateBatch {
library_id: Uuid,
model_type: String,
device_id: Uuid,
records: Vec<StateRecord>, // [{uuid, data, timestamp}, ...]
}
// Request device-owned state
StateRequest {
library_id: Uuid,
model_types: Vec<String>,
device_id: Option<Uuid>, // Specific device or all
since: Option<DateTime>, // Incremental sync
checkpoint: Option<String>, // Resume cursor
batch_size: usize,
}
// Response with device-owned state
StateResponse {
library_id: Uuid,
model_type: String,
device_id: Uuid,
records: Vec<StateRecord>,
deleted_uuids: Vec<Uuid>, // Tombstones
checkpoint: Option<String>, // Next page cursor
has_more: bool,
}
// Shared resource change (HLC-ordered)
SharedChange {
library_id: Uuid,
entry: SharedChangeEntry,
}
SharedChangeEntry {
hlc: HLC, // Ordering key
model_type: String,
record_uuid: Uuid,
change_type: ChangeType, // Insert, Update, Delete
data: serde_json::Value,
}
// Heartbeat with sync progress
Heartbeat {
library_id: Uuid,
device_id: Uuid,
timestamp: DateTime<Utc>,
state_watermark: Option<DateTime>, // Last state sync
shared_watermark: Option<HLC>, // Last shared change
}
// Watermark exchange for reconnection
WatermarkExchangeRequest {
library_id: Uuid,
device_id: Uuid,
my_state_watermark: Option<DateTime>,
my_shared_watermark: Option<HLC>,
}
WatermarkExchangeResponse {
library_id: Uuid,
device_id: Uuid,
state_watermark: Option<DateTime>,
shared_watermark: Option<HLC>,
needs_state_catchup: bool,
needs_shared_catchup: bool,
}
```
### Serialization
- **Format**: JSON via serde
- **Bidirectional streams**: 4-byte length prefix (big-endian) + JSON bytes (framing sketched below)
- **Unidirectional streams**: Direct JSON bytes
- **Timeout**: 30s for messages, 60s for backfill requests
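A sketch of the bidirectional-stream framing under those rules; the helper names are illustrative and error mapping is simplified:
```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn write_frame<W: AsyncWriteExt + Unpin>(
    writer: &mut W,
    msg: &serde_json::Value,
) -> std::io::Result<()> {
    let bytes = serde_json::to_vec(msg)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    // 4-byte big-endian length prefix, then the JSON bytes.
    writer.write_all(&(bytes.len() as u32).to_be_bytes()).await?;
    writer.write_all(&bytes).await?;
    writer.flush().await
}

async fn read_frame<R: AsyncReadExt + Unpin>(
    reader: &mut R,
) -> std::io::Result<serde_json::Value> {
    let mut len_buf = [0u8; 4];
    reader.read_exact(&mut len_buf).await?;
    let mut buf = vec![0u8; u32::from_be_bytes(len_buf) as usize];
    reader.read_exact(&mut buf).await?;
    serde_json::from_slice(&buf)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
```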
### Connection State Tracking
<Info>
@@ -732,6 +1032,42 @@ Some data is computed locally and never syncs:
These rebuild automatically from synced base data.
## Retry Queue
Failed sync messages are automatically retried with exponential backoff:
### Retry Behavior
| Attempt | Delay | Action |
| ------- | ----- | ------ |
| 1 | 5s | First retry |
| 2 | 10s | Second retry |
| 3 | 20s | Third retry |
| 4 | 40s | Fourth retry |
| 5 | 80s | Final retry |
| 6+ | - | Message dropped |
### How It Works
```
1. Broadcast fails (peer unreachable, timeout, etc.)
2. Message queued with next_retry = now + 5s
3. Background task checks queue every sync_loop_interval
4. Ready messages retried in order
5. Success: remove from queue
6. Failure: re-queue with doubled delay
7. After 5 attempts: drop and log warning
```
### Queue Management
- **Atomic processing**: Messages removed before retry to prevent duplicates
- **Ordered by next_retry**: Earliest messages processed first
- **No persistence**: Queue lost on restart (messages will re-sync via watermarks)
- **Metrics**: `retry_queue_depth` tracks current queue size
The retry queue handles transient network failures without blocking real-time sync. Permanent failures eventually resolve via watermark-based catch-up when the peer reconnects.
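A sketch of the entry type and re-queue policy described above; the message payload type is a placeholder:
```rust
use std::time::{Duration, Instant};

struct RetryEntry<Message> {
    message: Message,
    attempts: u32, // failed send attempts so far
    next_retry: Instant,
}

/// Re-queue after a failed send: 5s base delay doubled per attempt,
/// dropped after the fifth attempt (watermarks recover it later).
fn requeue<M>(queue: &mut Vec<RetryEntry<M>>, mut entry: RetryEntry<M>) {
    entry.attempts += 1;
    if entry.attempts > 5 {
        return; // drop; the peer converges via watermark-based catch-up
    }
    // 5s, 10s, 20s, 40s, 80s
    entry.next_retry = Instant::now() + Duration::from_secs(5u64 << (entry.attempts - 1));
    queue.push(entry);
    queue.sort_by_key(|e| e.next_retry); // earliest next_retry first
}
```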
## Portable Volumes & Ownership Changes
A key feature of Spacedrive is the ability to move external drives between devices without losing track of the data. This is handled through a special sync process that allows the "ownership" of a `Location` to change.
@@ -830,6 +1166,244 @@ RUST_LOG=sd_core::sync=debug cargo run
**Conflicts**: Check HLC implementation maintains ordering.
## Error Types
The sync system defines specific error types for different failure modes:
### Infrastructure Errors
```rust
/// HLC parsing failures
HLCError::ParseError(String)
/// Peer log database errors
PeerLogError {
ConnectionError(String), // Can't open sync.db
QueryError(String), // SQL query failed
SerializationError(String), // JSON encode/decode failed
ParseError(String), // Invalid data format
}
/// Watermark tracking errors
WatermarkError {
QueryError(String),
ParseError(String),
}
/// Checkpoint persistence errors
CheckpointError {
QueryError(String),
ParseError(String),
}
```
### Registry Errors
```rust
ApplyError {
UnknownModel(String), // Model not registered
MissingFkLookup(String), // FK mapper not configured
WrongSyncType { model, expected, got }, // Device-owned vs shared mismatch
MissingApplyFunction(String), // No apply handler
MissingQueryFunction(String), // No query handler
MissingDeletionHandler(String), // No deletion handler
DatabaseError(String), // DB operation failed
}
```
### Dependency Errors
```rust
DependencyError {
CircularDependency(String), // A → B → A detected
UnknownDependency(String, String), // Depends on unregistered model
NoModels, // Empty registry
}
```
### Transaction Errors
```rust
TxError {
Database(DbErr), // SeaORM error
SyncLog(String), // Peer log write failed
Serialization(serde_json::Error), // JSON error
InvalidModel(String), // Model validation failed
}
```
All errors implement `std::error::Error` and include context for debugging.
## Metrics & Observability
The sync system collects comprehensive metrics for monitoring and debugging.
### Metric Categories
**State Metrics**:
- `current_state` - Current sync state (Uninitialized, Backfilling, etc.)
- `state_entered_at` - When current state started
- `state_history` - Recent state transitions (ring buffer)
- `total_time_in_state` - Cumulative time per state
- `transition_count` - Number of state transitions
**Operation Metrics**:
- `broadcasts_sent` - Total broadcast messages sent
- `state_changes_broadcast` - Device-owned changes broadcast
- `shared_changes_broadcast` - Shared resource changes broadcast
- `changes_received` - Updates received from peers
- `changes_applied` - Successfully applied updates
- `changes_rejected` - Updates rejected (conflict, error)
- `active_backfill_sessions` - Concurrent backfills in progress
- `retry_queue_depth` - Messages waiting for retry
**Data Volume Metrics**:
- `entries_synced` - Records synced per model type
- `entries_by_device` - Records synced per peer device
- `bytes_sent` / `bytes_received` - Network bandwidth
- `last_sync_per_peer` - Last sync timestamp per device
- `last_sync_per_model` - Last sync timestamp per model
**Performance Metrics**:
- `broadcast_latency` - Time to broadcast to all peers (histogram)
- `apply_latency` - Time to apply received changes (histogram)
- `backfill_request_latency` - Backfill round-trip time (histogram)
- `peer_rtt_ms` - Per-peer round-trip time
- `watermark_lag_ms` - How far behind each peer is
- `hlc_physical_drift_ms` - Clock drift detected via HLC
- `hlc_counter_max` - Highest logical counter seen
**Error Metrics**:
- `total_errors` - Total error count
- `network_errors` - Connection/timeout failures
- `database_errors` - DB operation failures
- `apply_errors` - Change application failures
- `validation_errors` - Invalid data received
- `recent_errors` - Last N errors with details
- `conflicts_detected` - Concurrent modification conflicts
- `conflicts_resolved_by_hlc` - Conflicts resolved via HLC
### Histogram Metrics
Performance metrics use histograms with atomic min/max/avg tracking:
```rust
HistogramMetric {
count: AtomicU64, // Number of samples
sum: AtomicU64, // Sum for average
min: AtomicU64, // Minimum value
max: AtomicU64, // Maximum value
}
// Methods
histogram.avg() // Average latency
histogram.min() // Best case
histogram.max() // Worst case
histogram.count() // Sample count
```
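Recording a sample can stay lock-free with atomic min/max updates. A sketch against the struct above, assuming `min` is initialized to `u64::MAX`:
```rust
use std::sync::atomic::Ordering;

impl HistogramMetric {
    fn record(&self, value: u64) {
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value, Ordering::Relaxed);
        self.min.fetch_min(value, Ordering::Relaxed); // min starts at u64::MAX
        self.max.fetch_max(value, Ordering::Relaxed);
    }
}
```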
### Snapshots
Metrics can be captured as point-in-time snapshots:
```rust
let snapshot = sync_service.metrics().snapshot().await;
// Filter by time range
let recent = snapshot.filter_since(one_hour_ago);
// Filter by peer
let alice_metrics = snapshot.filter_by_peer(alice_device_id);
// Filter by model
let entry_metrics = snapshot.filter_by_model("entry");
```
### History
A ring buffer stores recent snapshots for time-series analysis:
```rust
MetricsHistory {
capacity: 1000, // Max snapshots retained
snapshots: VecDeque<SyncMetricsSnapshot>,
}
// Query methods
history.get_snapshots_since(timestamp)
history.get_snapshots_range(start, end)
history.get_latest_snapshot()
```
### Persistence
Metrics are persisted to the database every 5 minutes (configurable via `metrics_log_interval_secs`). This enables post-mortem analysis of sync issues.
## Sync Event Bus
The sync system uses a dedicated event bus separate from the general application event bus:
### Why Separate?
The general `EventBus` handles high-volume events (filesystem changes, job progress, UI updates). During heavy indexing, thousands of events per second can queue up.
The `SyncEventBus` is isolated to prevent sync events from being starved:
- **Capacity**: 10,000 events (vs 1,000 for general bus)
- **Priority**: Sync-critical events processed first
- **Droppable**: Metrics events can be dropped under load
### Event Types
```rust
enum SyncEvent {
// Device-owned state change ready to broadcast
StateChange {
library_id: Uuid,
model_type: String,
record_uuid: Uuid,
device_id: Uuid,
data: serde_json::Value,
timestamp: DateTime<Utc>,
},
// Shared resource change ready to broadcast
SharedChange {
library_id: Uuid,
entry: SharedChangeEntry,
},
// Metrics snapshot available
MetricsUpdated {
library_id: Uuid,
metrics: SyncMetricsSnapshot,
},
}
```
### Event Criticality
| Event | Critical | Can Drop |
| ----- | -------- | -------- |
| `StateChange` | Yes | No |
| `SharedChange` | Yes | No |
| `MetricsUpdated` | No | Yes |
Critical events trigger warnings if the bus lags. Non-critical events are silently dropped under load.
### Real-Time Batching
The event listener batches events before broadcasting:
```
1. Event arrives on SyncEventBus
2. Add to batch buffer
3. If buffer.len() >= 100 OR 50ms elapsed:
   a. Flush batch as single network message
   b. Reset buffer and timer
```
This reduces network overhead during rapid operations (e.g., bulk tagging).
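A sketch of that flush loop with tokio; `SyncEvent` is the enum above, `broadcast_batch` is a placeholder, and resetting the timer after a size-based flush is elided for brevity:
```rust
use std::time::Duration;
use tokio::sync::mpsc;

async fn broadcast_batch(batch: Vec<SyncEvent>) {
    // Placeholder: serialize `batch` into one network message and send it.
    let _ = batch;
}

async fn batch_loop(mut rx: mpsc::Receiver<SyncEvent>) {
    let mut buf: Vec<SyncEvent> = Vec::with_capacity(100);
    let mut ticker = tokio::time::interval(Duration::from_millis(50));
    loop {
        tokio::select! {
            Some(event) = rx.recv() => {
                buf.push(event);
                if buf.len() >= 100 {
                    // Size threshold reached: flush as one network message.
                    broadcast_batch(std::mem::take(&mut buf)).await;
                }
            }
            _ = ticker.tick() => {
                if !buf.is_empty() {
                    // Time threshold reached: flush whatever accumulated.
                    broadcast_batch(std::mem::take(&mut buf)).await;
                }
            }
        }
    }
}
```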
## Implementation Status
<Info>See `core/tests/sync_backfill_test.rs`, `core/tests/sync_realtime_test.rs`, and `core/tests/sync_metrics_test.rs` for the test suite.</Info>
@@ -857,28 +1431,47 @@ RUST_LOG=sd_core::sync=debug cargo run
**Device-Owned Models (4):**
- **Device** - Device records and metadata
- **Location** - Filesystem paths and mount points (with deletion)
- **Entry** - Files and folders within locations (with deletion + rebuild)
- **Volume** - Physical drives and volumes (with deletion)
| Model | Table | Dependencies | FK Mappings | Features |
| ----- | ----- | ------------ | ----------- | -------- |
| Device | `devices` | None | None | Root model |
| Location | `locations` | `device` | `device_id → devices`, `entry_id → entries` | with_deletion |
| Entry | `entries` | `content_identity`, `user_metadata` | `parent_id → entries`, `metadata_id → user_metadata`, `content_id → content_identities` | with_deletion, with_rebuild |
| Volume | `volumes` | `device` | None | with_deletion |
**Shared Models (15):**
- **Tag** - User-created labels
- **TagRelationship** - Tag hierarchy relationships (with rebuild)
- **Collection** - File groupings
- **CollectionEntry** - Many-to-many collection relationships
- **ContentIdentity** - Content-based file identification
- **UserMetadata** - User notes, ratings, and custom fields
- **UserMetadataTag** - Many-to-many tag relationships
- **AuditLog** - Action audit trail for compliance
- **Sidecar** - Generated files (thumbnails, previews, metadata caches)
- **Space** - User-defined workspace containers
- **SpaceGroup** - Groups within spaces
- **SpaceItem** - Items within space groups
- **VideoMediaData** - Video file metadata (duration, resolution, codec)
- **AudioMediaData** - Audio file metadata (bitrate, sample rate, channels)
- **ImageMediaData** - Image file metadata (dimensions, color space, EXIF)
| Model | Table | Dependencies | FK Mappings | Features |
| ----- | ----- | ------------ | ----------- | -------- |
| Tag | `tag` | None | None | - |
| TagRelationship | `tag_relationship` | `tag` | `parent_tag_id → tag`, `child_tag_id → tag` | with_rebuild |
| Collection | `collection` | None | None | - |
| CollectionEntry | `collection_entry` | `collection`, `entry` | `collection_id → collection`, `entry_id → entries` | - |
| ContentIdentity | `content_identities` | None | None | Deterministic UUID |
| UserMetadata | `user_metadata` | None | None | - |
| UserMetadataTag | `user_metadata_tag` | `user_metadata`, `tag` | `user_metadata_id → user_metadata`, `tag_id → tag`, `device_uuid → devices` | - |
| AuditLog | `audit_log` | None | None | - |
| Sidecar | `sidecar` | `content_identity` | `content_uuid → content_identities` | - |
| Space | `spaces` | None | None | - |
| SpaceGroup | `space_groups` | `space` | `space_id → spaces` | - |
| SpaceItem | `space_items` | `space`, `space_group` | `space_id → spaces`, `group_id → space_groups` | - |
| VideoMediaData | `video_media_data` | None | None | - |
| AudioMediaData | `audio_media_data` | None | None | - |
| ImageMediaData | `image_media_data` | None | None | - |
### Excluded Fields
Each model excludes certain fields from sync (local-only data):
| Model | Excluded Fields |
| ----- | --------------- |
| Device | `id` |
| Location | `id`, `scan_state`, `error_message`, `job_policies`, `created_at`, `updated_at` |
| Entry | `id`, `indexed_at` |
| Volume | `id`, `is_online`, `last_seen_at`, `last_speed_test_at`, `tracked_at` |
| ContentIdentity | `id`, `mime_type_id`, `kind_id`, `entry_count`, `*_media_data_id`, `first_seen_at`, `last_verified_at` |
| UserMetadata | `id`, `created_at`, `updated_at` |
| AuditLog | `id`, `created_at`, `updated_at`, `job_id` |
| Sidecar | `id`, `source_entry_id` |
All models sync automatically during creation, updates, and deletions. File indexing uses batch sync for both device-owned entries (`StateBatch`) and shared content identities (`SharedChangeBatch`) to reduce network overhead.
@@ -924,22 +1517,24 @@ SyncConfig {
shared_broadcast_batch_size: 100, // Shared records per broadcast
max_snapshot_size: 100_000, // Max records in state snapshot
realtime_batch_max_entries: 100, // Max entries before flush
realtime_batch_flush_interval_ms: 50, // Auto-flush interval (ms)
},
retention: RetentionConfig {
strategy: AcknowledgmentBased,
tombstone_max_retention_days: 7, // Hard limit for tombstone pruning
peer_log_max_retention_days: 7, // Hard limit for peer log pruning
force_full_sync_threshold_days: 25, // Force full sync if watermark older
},
network: NetworkConfig {
message_timeout_secs: 30, // Timeout for sync messages
backfill_request_timeout_secs: 60, // Timeout for backfill requests
sync_loop_interval_secs: 5, // Sync loop check interval
connection_check_interval_secs: 10, // How often to check peer connectivity
},
monitoring: MonitoringConfig {
pruning_interval_secs: 3600, // How often to prune sync.db (1 hour)
enable_metrics: true, // Enable sync metrics collection
metrics_log_interval_secs: 300, // Persist metrics every 5 minutes
},
}
```

View File

@@ -151,6 +151,7 @@ export function useNormalizedQuery<I, O>(
}
let unsubscribe: (() => void) | undefined;
let isCancelled = false;
const handleEvent = (event: Event) => {
handleResourceEvent(
@@ -172,10 +173,15 @@ export function useNormalizedQuery<I, O>(
handleEvent,
)
.then((unsub) => {
if (isCancelled) {
unsub();
} else {
unsubscribe = unsub;
}
});
return () => {
isCancelled = true;
unsubscribe?.();
};
}, [

View File

@@ -34,6 +34,7 @@ interface SubscriptionEntry {
export class SubscriptionManager {
private subscriptions = new Map<string, SubscriptionEntry>();
private pendingSubscriptions = new Map<string, Promise<SubscriptionEntry>>();
private transport: Transport;
constructor(transport: Transport) {
@@ -57,66 +58,97 @@ export class SubscriptionManager {
/**
* Subscribe to filtered events
* Reuses existing subscription if filter matches
* Handles concurrent subscription requests for the same filter
*/
async subscribe(
filter: EventFilter,
callback: (event: Event) => void,
): Promise<() => void> {
const key = this.getFilterKey(filter);
// Check if subscription already exists
let entry = this.subscriptions.get(key);
if (entry) {
entry.listeners.add(callback);
entry.refCount++;
return this.createCleanup(key, callback);
}
// Check if subscription is being created (concurrent request)
const pending = this.pendingSubscriptions.get(key);
if (pending) {
// Wait for the in-progress subscription to complete
entry = await pending;
entry.listeners.add(callback);
entry.refCount++;
return this.createCleanup(key, callback);
}
// Create new subscription
const subscriptionPromise = this.createSubscription(key, filter);
this.pendingSubscriptions.set(key, subscriptionPromise);
try {
entry = await subscriptionPromise;
entry.listeners.add(callback);
entry.refCount++;
return this.createCleanup(key, callback);
} finally {
this.pendingSubscriptions.delete(key);
}
}
private async createSubscription(
key: string,
filter: EventFilter,
): Promise<SubscriptionEntry> {
const eventTypes = filter.event_types ?? [
"ResourceChanged",
"ResourceChangedBatch",
"ResourceDeleted",
"Refresh",
];
const unsubscribe = await this.transport.subscribe(
(event) => {
// Broadcast event to all listeners
const currentEntry = this.subscriptions.get(key);
if (currentEntry) {
currentEntry.listeners.forEach((listener) => listener(event));
}
},
{
event_types: eventTypes,
filter: {
resource_type: filter.resource_type,
path_scope: filter.path_scope,
library_id: filter.library_id,
include_descendants: filter.include_descendants,
},
},
);
const entry: SubscriptionEntry = {
unsubscribe,
listeners: new Set(),
refCount: 0,
};
this.subscriptions.set(key, entry);
return entry;
}
private createCleanup(
key: string,
callback: (event: Event) => void,
): () => void {
return () => {
const currentEntry = this.subscriptions.get(key);
if (!currentEntry) return;
// Remove listener and decrement ref count
currentEntry.listeners.delete(callback);
currentEntry.refCount--;
// Cleanup subscription if no more listeners
if (currentEntry.refCount === 0) {
currentEntry.unsubscribe();
this.subscriptions.delete(key);