From 00eebd6c23bbf63ca2b0a6e18d7dffb7188f8875 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Fri, 28 Nov 2025 09:35:40 -0800 Subject: [PATCH] Add per peer watermark store for shared sync - Introduce PeerWatermarkStore to track max HLC received per peer for shared resource incremental sync - Initialize the peer_received_watermarks table at startup - Persist received watermarks during backfill and after backfill completion - Query the max watermark across peers via PeerWatermarkStore - Expose the new module and export PeerWatermarkStore from sync --- core/src/infra/sync/mod.rs | 2 + core/src/infra/sync/peer_log.rs | 5 + core/src/infra/sync/peer_watermarks.rs | 364 +++++++++++++++++++++++++ core/src/service/sync/backfill.rs | 7 +- core/src/service/sync/peer.rs | 32 ++- 5 files changed, 400 insertions(+), 10 deletions(-) create mode 100644 core/src/infra/sync/peer_watermarks.rs diff --git a/core/src/infra/sync/mod.rs b/core/src/infra/sync/mod.rs index 995d92d48..c64161bb9 100644 --- a/core/src/infra/sync/mod.rs +++ b/core/src/infra/sync/mod.rs @@ -18,6 +18,7 @@ pub mod event_bus; pub mod fk_mapper; pub mod hlc; pub mod peer_log; +pub mod peer_watermarks; pub mod registry; pub mod syncable; pub mod transaction; @@ -48,3 +49,4 @@ pub use syncable::Syncable; pub use transaction::{BulkOperation, BulkOperationMetadata, TransactionManager, TxError}; pub use transport::NetworkTransport; pub use watermarks::{ResourceWatermarkStore, WatermarkError}; +pub use peer_watermarks::PeerWatermarkStore; diff --git a/core/src/infra/sync/peer_log.rs b/core/src/infra/sync/peer_log.rs index d0f304501..eec8180cf 100644 --- a/core/src/infra/sync/peer_log.rs +++ b/core/src/infra/sync/peer_log.rs @@ -102,6 +102,11 @@ impl PeerLog { .await .map_err(|e| PeerLogError::QueryError(e.to_string()))?; + // peer_received_watermarks table (shared resource incremental sync) + super::peer_watermarks::PeerWatermarkStore::init_table(conn) + .await + .map_err(|e| PeerLogError::QueryError(e.to_string()))?; + // backfill_checkpoints table (resumable backfill) super::checkpoints::BackfillCheckpointStore::init_table(conn) .await diff --git a/core/src/infra/sync/peer_watermarks.rs b/core/src/infra/sync/peer_watermarks.rs new file mode 100644 index 000000000..c0bbbc5fe --- /dev/null +++ b/core/src/infra/sync/peer_watermarks.rs @@ -0,0 +1,364 @@ +//! Peer received watermark tracking for shared resource incremental sync +//! +//! Tracks the maximum HLC received from each peer for shared resources (tags, metadata, etc). +//! This enables incremental catch-up by requesting only entries newer than the watermark. + +use crate::infra::sync::hlc::HLC; +use sea_orm::{ConnectionTrait, DbBackend, Statement}; +use std::collections::HashMap; +use uuid::Uuid; + +/// Peer watermark tracking for shared resource incremental sync +/// +/// Manages per-peer watermarks in sync.db to track the maximum HLC received +/// from each peer. This prevents re-syncing shared resources that have already +/// been received. +pub struct PeerWatermarkStore { + device_uuid: Uuid, +} + +impl PeerWatermarkStore { + /// Create a new peer watermark store for a device + pub fn new(device_uuid: Uuid) -> Self { + Self { device_uuid } + } + + /// Initialize the peer_received_watermarks table in sync.db + pub async fn init_table(conn: &C) -> Result<(), WatermarkError> { + // Create main table + conn.execute(Statement::from_string( + DbBackend::Sqlite, + r#" + CREATE TABLE IF NOT EXISTS peer_received_watermarks ( + device_uuid TEXT NOT NULL, + peer_device_uuid TEXT NOT NULL, + max_received_hlc TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (device_uuid, peer_device_uuid) + ) + "# + .to_string(), + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + // Create index for efficient peer queries + conn.execute(Statement::from_string( + DbBackend::Sqlite, + "CREATE INDEX IF NOT EXISTS idx_peer_received_watermarks_peer + ON peer_received_watermarks(peer_device_uuid)" + .to_string(), + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + Ok(()) + } + + /// Get max HLC received from a specific peer + pub async fn get( + &self, + conn: &C, + peer_device_uuid: Uuid, + ) -> Result, WatermarkError> { + let row = conn + .query_one(Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT max_received_hlc FROM peer_received_watermarks + WHERE device_uuid = ? AND peer_device_uuid = ?", + vec![ + self.device_uuid.to_string().into(), + peer_device_uuid.to_string().into(), + ], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + match row { + Some(row) => { + let hlc_str: String = row + .try_get("", "max_received_hlc") + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let hlc = hlc_str + .parse() + .map_err(|e: crate::infra::sync::hlc::HLCError| { + WatermarkError::ParseError(e.to_string()) + })?; + + Ok(Some(hlc)) + } + None => Ok(None), + } + } + + /// Update max HLC received from peer (only if newer) + pub async fn upsert( + &self, + conn: &C, + peer_device_uuid: Uuid, + received_hlc: HLC, + ) -> Result<(), WatermarkError> { + // Prevent self-watermarks + if peer_device_uuid == self.device_uuid { + tracing::warn!( + device_uuid = %self.device_uuid, + peer_device_uuid = %peer_device_uuid, + "Attempted to track received HLC from self - skipping" + ); + return Ok(()); + } + + // Check if newer before updating + let existing = self.get(conn, peer_device_uuid).await?; + + if let Some(existing_hlc) = existing { + if received_hlc <= existing_hlc { + return Ok(()); + } + } + + // Upsert + conn.execute(Statement::from_sql_and_values( + DbBackend::Sqlite, + r#" + INSERT INTO peer_received_watermarks + (device_uuid, peer_device_uuid, max_received_hlc, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT (device_uuid, peer_device_uuid) + DO UPDATE SET + max_received_hlc = excluded.max_received_hlc, + updated_at = excluded.updated_at + "#, + vec![ + self.device_uuid.to_string().into(), + peer_device_uuid.to_string().into(), + received_hlc.to_string().into(), + chrono::Utc::now().to_rfc3339().into(), + ], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + Ok(()) + } + + /// Get all received watermarks (for diagnostics) + pub async fn get_all( + &self, + conn: &C, + ) -> Result, WatermarkError> { + let rows = conn + .query_all(Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT peer_device_uuid, max_received_hlc FROM peer_received_watermarks + WHERE device_uuid = ?", + vec![self.device_uuid.to_string().into()], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let mut results = HashMap::new(); + for row in rows { + let peer_str: String = row + .try_get("", "peer_device_uuid") + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + let hlc_str: String = row + .try_get("", "max_received_hlc") + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + if let (Ok(peer_uuid), Ok(hlc)) = (Uuid::parse_str(&peer_str), hlc_str.parse()) { + results.insert(peer_uuid, hlc); + } + } + + Ok(results) + } + + /// Get the maximum HLC received from any peer (for catch-up decisions) + pub async fn get_max_across_all_peers( + &self, + conn: &C, + ) -> Result, WatermarkError> { + let row = conn + .query_one(Statement::from_sql_and_values( + DbBackend::Sqlite, + "SELECT MAX(max_received_hlc) as max_hlc FROM peer_received_watermarks + WHERE device_uuid = ?", + vec![self.device_uuid.to_string().into()], + )) + .await + .map_err(|e| WatermarkError::QueryError(e.to_string()))?; + + match row { + Some(row) => { + let hlc_str: Option = row.try_get("", "max_hlc").ok(); + + match hlc_str { + Some(s) => { + let hlc = s.parse().map_err(|e: crate::infra::sync::hlc::HLCError| { + WatermarkError::ParseError(e.to_string()) + })?; + Ok(Some(hlc)) + } + None => Ok(None), + } + } + None => Ok(None), + } + } +} + +/// Watermark errors +#[derive(Debug, thiserror::Error)] +pub enum WatermarkError { + #[error("Database query error: {0}")] + QueryError(String), + + #[error("Parse error: {0}")] + ParseError(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use sea_orm::Database; + use tempfile::TempDir; + + async fn create_test_db() -> (sea_orm::DatabaseConnection, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test_peer_watermarks.db"); + let database_url = format!("sqlite://{}?mode=rwc", db_path.display()); + let conn = Database::connect(&database_url).await.unwrap(); + + PeerWatermarkStore::init_table(&conn).await.unwrap(); + + (conn, temp_dir) + } + + #[tokio::test] + async fn test_peer_watermark_persistence() { + let (conn, _temp) = create_test_db().await; + + let device_uuid = Uuid::new_v4(); + let peer = Uuid::new_v4(); + let store = PeerWatermarkStore::new(device_uuid); + + let hlc = HLC::now(peer); + + // Store watermark + store.upsert(&conn, peer, hlc).await.unwrap(); + + // Query back + let retrieved = store.get(&conn, peer).await.unwrap(); + assert_eq!(retrieved, Some(hlc)); + } + + #[tokio::test] + async fn test_prevent_self_watermark() { + let (conn, _temp) = create_test_db().await; + + let device_uuid = Uuid::new_v4(); + let store = PeerWatermarkStore::new(device_uuid); + + let hlc = HLC::now(device_uuid); + + // Attempt to track self + let result = store.upsert(&conn, device_uuid, hlc).await; + assert!(result.is_ok()); + + // Verify not stored + let retrieved = store.get(&conn, device_uuid).await.unwrap(); + assert_eq!(retrieved, None); + } + + #[tokio::test] + async fn test_watermark_only_advances() { + let (conn, _temp) = create_test_db().await; + + let device_uuid = Uuid::new_v4(); + let peer = Uuid::new_v4(); + let store = PeerWatermarkStore::new(device_uuid); + + // Store HLC(1000) + let hlc1 = HLC { + timestamp: 1000, + counter: 0, + device_id: peer, + }; + store.upsert(&conn, peer, hlc1).await.unwrap(); + + // Try to store older HLC(500) + let hlc2 = HLC { + timestamp: 500, + counter: 0, + device_id: peer, + }; + store.upsert(&conn, peer, hlc2).await.unwrap(); + + // Should still be HLC(1000) + let retrieved = store.get(&conn, peer).await.unwrap(); + assert_eq!(retrieved.unwrap().timestamp, 1000); + } + + #[tokio::test] + async fn test_get_max_across_all_peers() { + let (conn, _temp) = create_test_db().await; + + let device_uuid = Uuid::new_v4(); + let store = PeerWatermarkStore::new(device_uuid); + + let peer1 = Uuid::new_v4(); + let peer2 = Uuid::new_v4(); + let peer3 = Uuid::new_v4(); + + // Store different HLCs from different peers + let hlc1 = HLC { + timestamp: 1000, + counter: 0, + device_id: peer1, + }; + let hlc2 = HLC { + timestamp: 2000, + counter: 0, + device_id: peer2, + }; + let hlc3 = HLC { + timestamp: 1500, + counter: 0, + device_id: peer3, + }; + + store.upsert(&conn, peer1, hlc1).await.unwrap(); + store.upsert(&conn, peer2, hlc2).await.unwrap(); + store.upsert(&conn, peer3, hlc3).await.unwrap(); + + // Get max should return peer2's HLC(2000) + let max = store.get_max_across_all_peers(&conn).await.unwrap(); + assert!(max.is_some()); + assert_eq!(max.unwrap().timestamp, 2000); + } + + #[tokio::test] + async fn test_get_all() { + let (conn, _temp) = create_test_db().await; + + let device_uuid = Uuid::new_v4(); + let store = PeerWatermarkStore::new(device_uuid); + + let peer1 = Uuid::new_v4(); + let peer2 = Uuid::new_v4(); + + let hlc1 = HLC::now(peer1); + let hlc2 = HLC::now(peer2); + + store.upsert(&conn, peer1, hlc1).await.unwrap(); + store.upsert(&conn, peer2, hlc2).await.unwrap(); + + let all = store.get_all(&conn).await.unwrap(); + assert_eq!(all.len(), 2); + assert_eq!(all.get(&peer1), Some(&hlc1)); + assert_eq!(all.get(&peer2), Some(&hlc2)); + } +} diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs index ad90c9713..0a01860b2 100644 --- a/core/src/service/sync/backfill.rs +++ b/core/src/service/sync/backfill.rs @@ -159,7 +159,7 @@ impl BackfillManager { self.peer_sync.transition_to_ready().await?; // Phase 5: Set initial watermarks from actual received data (not local DB query) - self.set_initial_watermarks_after_backfill(final_state_checkpoint, max_shared_hlc) + self.set_initial_watermarks_after_backfill(selected_peer, final_state_checkpoint, max_shared_hlc) .await?; // Record metrics @@ -222,7 +222,7 @@ impl BackfillManager { .await?; // Update watermarks from actual received data (not local DB query) - self.set_initial_watermarks_after_backfill(final_state_checkpoint, max_shared_hlc) + self.set_initial_watermarks_after_backfill(peer, final_state_checkpoint, max_shared_hlc) .await?; // Record metrics @@ -1074,11 +1074,12 @@ impl BackfillManager { /// Uses actual checkpoints from received data, not local database queries async fn set_initial_watermarks_after_backfill( &self, + peer: Uuid, final_state_checkpoint: Option, max_shared_hlc: Option, ) -> Result<()> { self.peer_sync - .set_initial_watermarks(final_state_checkpoint, max_shared_hlc) + .set_initial_watermarks(peer, final_state_checkpoint, max_shared_hlc) .await } } diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 4a2919ed7..322cf4cb2 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -327,7 +327,7 @@ impl PeerSync { /// Query watermarks from sync.db (per-resource aggregation) /// /// For state watermark: Returns the maximum (most recent) timestamp across all resources - /// For shared watermark: Returns the maximum HLC from peer log + /// For shared watermark: Returns the maximum HLC received from any peer async fn query_device_watermarks( device_id: Uuid, peer_log: &Arc, @@ -348,14 +348,18 @@ impl PeerSync { } }; - // Get max shared watermark from peer log - let shared_watermark = match peer_log.get_max_hlc().await { + // Get max shared watermark from peer_received_watermarks (not our peer log!) + // This tracks what we've RECEIVED from peers, not what we've SENT + let shared_watermark = match crate::infra::sync::PeerWatermarkStore::new(device_id) + .get_max_across_all_peers(peer_log.conn()) + .await + { Ok(max) => max, Err(e) => { warn!( device_id = %device_id, error = %e, - "Failed to query max HLC from peer log" + "Failed to query received HLC watermarks from peers" ); None } @@ -368,7 +372,7 @@ impl PeerSync { /// /// Returns (state_watermark, shared_watermark) aggregated from sync.db. /// State watermark: Maximum timestamp across all per-resource watermarks. - /// Shared watermark (HLC): Maximum HLC from peer log. + /// Shared watermark (HLC): Maximum HLC received from any peer. pub async fn get_watermarks(&self) -> (Option>, Option) { Self::query_device_watermarks(self.device_id, &self.peer_log).await } @@ -389,16 +393,18 @@ impl PeerSync { Ok(()) } - /// Mark backfill complete by updating last_sync_at + /// Mark backfill complete by updating last_sync_at and persisting shared watermark /// /// Note: Per-resource watermarks are now tracked automatically as data is received. - /// This method only updates the last_sync_at timestamp. + /// This method updates the last_sync_at timestamp and persists the shared watermark. pub async fn set_initial_watermarks( &self, + peer: Uuid, _final_state_checkpoint: Option, max_shared_hlc: Option, ) -> Result<()> { use crate::infra::db::entities; + use crate::infra::sync::PeerWatermarkStore; use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter}; let now = chrono::Utc::now(); @@ -410,6 +416,18 @@ impl PeerSync { hlc = %hlc, "Updated HLC generator from backfill max HLC" ); + + // Persist the received watermark to survive restarts + PeerWatermarkStore::new(self.device_id) + .upsert(self.peer_log.conn(), peer, hlc) + .await + .map_err(|e| anyhow::anyhow!("Failed to persist peer watermark: {}", e))?; + + info!( + peer = %peer, + max_received_hlc = %hlc, + "Persisted shared watermark for peer" + ); } // Update last_sync_at to mark backfill complete