refactor: improve code readability and consistency in sync watermark tests; format and organize test setup and logging for better clarity

This commit is contained in:
Jamie Pine 2025-11-14 06:52:29 -08:00
parent 3e78e8c3c8
commit 1ae1bcea6b
3 changed files with 159 additions and 73 deletions

View File

@ -4,7 +4,10 @@ use sd_core::{
infra::sync::{NetworkTransport, Syncable},
service::{network::protocol::sync::messages::SyncMessage, sync::SyncService},
};
use std::{collections::HashMap, sync::{Arc, Weak}};
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use tokio::sync::Mutex;
use uuid::Uuid;
@ -44,15 +47,30 @@ impl MockTransport {
let history = Arc::new(Mutex::new(Vec::new()));
let sync_services = Arc::new(Mutex::new(HashMap::new()));
let transport_a = Self::new(device_a, vec![device_b], queues.clone(), history.clone(), sync_services.clone());
let transport_b = Self::new(device_b, vec![device_a], queues.clone(), history.clone(), sync_services.clone());
let transport_a = Self::new(
device_a,
vec![device_b],
queues.clone(),
history.clone(),
sync_services.clone(),
);
let transport_b = Self::new(
device_b,
vec![device_a],
queues.clone(),
history.clone(),
sync_services.clone(),
);
(transport_a, transport_b)
}
/// Register a sync service for request/response handling
pub async fn register_sync_service(&self, device_id: Uuid, sync_service: Weak<SyncService>) {
self.sync_services.lock().await.insert(device_id, sync_service);
self.sync_services
.lock()
.await
.insert(device_id, sync_service);
}
/// Process incoming messages by delivering them to the sync service
@ -80,31 +98,45 @@ impl MockTransport {
} => {
sync_service
.peer_sync()
.on_state_change_received(sd_core::service::sync::state::StateChangeMessage {
model_type,
record_uuid,
device_id,
data,
timestamp,
})
.on_state_change_received(
sd_core::service::sync::state::StateChangeMessage {
model_type,
record_uuid,
device_id,
data,
timestamp,
},
)
.await?;
}
SyncMessage::SharedChange { library_id: _, entry } => {
sync_service.peer_sync().on_shared_change_received(entry).await?;
SyncMessage::SharedChange {
library_id: _,
entry,
} => {
sync_service
.peer_sync()
.on_shared_change_received(entry)
.await?;
}
SyncMessage::AckSharedChanges {
library_id: _,
from_device,
up_to_hlc,
} => {
sync_service.peer_sync().on_ack_received(from_device, up_to_hlc).await?;
sync_service
.peer_sync()
.on_ack_received(from_device, up_to_hlc)
.await?;
}
SyncMessage::SharedChangeRequest {
library_id,
since_hlc,
limit,
} => {
let (entries, has_more) = sync_service.peer_sync().get_shared_changes(since_hlc, limit).await?;
let (entries, has_more) = sync_service
.peer_sync()
.get_shared_changes(since_hlc, limit)
.await?;
let current_state = if since_hlc.is_none() {
Some(sync_service.peer_sync().get_full_shared_state().await?)
} else {
@ -120,23 +152,37 @@ impl MockTransport {
self.send_sync_message(sender, response).await?;
}
SyncMessage::SharedChangeResponse { entries, current_state, .. } => {
SyncMessage::SharedChangeResponse {
entries,
current_state,
..
} => {
// Deliver to backfill manager (it handles duplicate/unexpected responses gracefully)
let _ = sync_service.backfill_manager().deliver_shared_response(message_clone).await;
let _ = sync_service
.backfill_manager()
.deliver_shared_response(message_clone)
.await;
// Also process entries directly (for tests that send manual requests)
for entry in entries {
sync_service.peer_sync().on_shared_change_received(entry).await?;
sync_service
.peer_sync()
.on_shared_change_received(entry)
.await?;
}
// Apply current_state snapshot if provided
if let Some(state) = current_state {
if let Some(tags) = state["tag"].as_array() {
for tag_data in tags {
let uuid: Uuid = Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
let uuid: Uuid =
Uuid::parse_str(tag_data["uuid"].as_str().unwrap())?;
let data = tag_data["data"].clone();
use sd_core::infra::{db::entities, sync::{ChangeType, SharedChangeEntry, HLC}};
use sd_core::infra::{
db::entities,
sync::{ChangeType, SharedChangeEntry, HLC},
};
entities::tag::Model::apply_shared_change(
SharedChangeEntry {
hlc: HLC::now(self.my_device_id),
@ -158,10 +204,16 @@ impl MockTransport {
my_state_watermark: peer_state_watermark,
my_shared_watermark: peer_shared_watermark,
} => {
let (our_state_watermark, our_shared_watermark) = sync_service.peer_sync().get_watermarks().await;
let (our_state_watermark, our_shared_watermark) =
sync_service.peer_sync().get_watermarks().await;
let needs_state_catchup = matches!((peer_state_watermark, our_state_watermark), (Some(p), Some(o)) if o > p) || matches!((peer_state_watermark, our_state_watermark), (None, Some(_)));
let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p) || matches!((peer_shared_watermark, our_shared_watermark), (None, Some(_)));
let needs_state_catchup = matches!((peer_state_watermark, our_state_watermark), (Some(p), Some(o)) if o > p)
|| matches!((peer_state_watermark, our_state_watermark), (None, Some(_)));
let needs_shared_catchup = matches!((peer_shared_watermark, our_shared_watermark), (Some(p), Some(o)) if o > p)
|| matches!(
(peer_shared_watermark, our_shared_watermark),
(None, Some(_))
);
let response = SyncMessage::WatermarkExchangeResponse {
library_id,
@ -202,19 +254,22 @@ impl MockTransport {
batch_size: _,
} => {
let response = SyncMessage::StateResponse {
library_id,
model_type: model_types.first().cloned().unwrap_or_default(),
device_id: requested_device_id.unwrap_or(self.my_device_id),
records: vec![],
deleted_uuids: vec![],
has_more: false,
checkpoint: None,
};
library_id,
model_type: model_types.first().cloned().unwrap_or_default(),
device_id: requested_device_id.unwrap_or(self.my_device_id),
records: vec![],
deleted_uuids: vec![],
has_more: false,
checkpoint: None,
};
self.send_sync_message(sender, response).await?;
}
SyncMessage::StateResponse { .. } => {
sync_service.backfill_manager().deliver_state_response(message_clone).await?;
sync_service
.backfill_manager()
.deliver_state_response(message_clone)
.await?;
}
_ => {}
}
@ -248,7 +303,11 @@ impl MockTransport {
#[async_trait::async_trait]
impl NetworkTransport for MockTransport {
async fn send_sync_message(&self, target_device: Uuid, message: SyncMessage) -> anyhow::Result<()> {
async fn send_sync_message(
&self,
target_device: Uuid,
message: SyncMessage,
) -> anyhow::Result<()> {
if !self.connected_peers.contains(&target_device) {
return Err(anyhow::anyhow!("device {} not connected", target_device));
}
@ -275,7 +334,7 @@ impl NetworkTransport for MockTransport {
) -> anyhow::Result<SyncMessage> {
// For testing: invoke the actual protocol handler on the target device
// This simulates the bidirectional stream request/response pattern
if !self.connected_peers.contains(&target_device) {
return Err(anyhow::anyhow!("device {} not connected", target_device));
}
@ -286,7 +345,12 @@ impl NetworkTransport for MockTransport {
services
.get(&target_device)
.and_then(|weak| weak.upgrade())
.ok_or_else(|| anyhow::anyhow!("Target sync service not registered for device {}", target_device))?
.ok_or_else(|| {
anyhow::anyhow!(
"Target sync service not registered for device {}",
target_device
)
})?
};
// Record in history
@ -341,9 +405,9 @@ impl NetworkTransport for MockTransport {
let has_more = records.len() >= *batch_size;
let next_checkpoint = if has_more {
records.last().map(|r| {
format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid)
})
records
.last()
.map(|r| format!("{}|{}", r.timestamp.to_rfc3339(), r.uuid))
} else {
None
};
@ -358,7 +422,9 @@ impl NetworkTransport for MockTransport {
has_more,
}
}
SyncMessage::SharedChangeRequest { since_hlc, limit, .. } => {
SyncMessage::SharedChangeRequest {
since_hlc, limit, ..
} => {
// Query actual shared changes from target device
let (entries, has_more) = sync_service
.peer_sync()

View File

@ -108,20 +108,34 @@ impl WatermarkTestSetup {
let (transport_a, transport_b) = MockTransport::new_pair(device_a_id, device_b_id);
library_a
.init_sync_service(device_a_id, transport_a.clone() as Arc<dyn NetworkTransport>)
.init_sync_service(
device_a_id,
transport_a.clone() as Arc<dyn NetworkTransport>,
)
.await?;
library_b
.init_sync_service(device_b_id, transport_b.clone() as Arc<dyn NetworkTransport>)
.init_sync_service(
device_b_id,
transport_b.clone() as Arc<dyn NetworkTransport>,
)
.await?;
// Register sync services with transports for request/response BEFORE backfill starts
if let Some(sync_a) = library_a.sync_service() {
transport_a.register_sync_service(device_a_id, Arc::downgrade(&sync_a)).await;
transport_b.register_sync_service(device_a_id, Arc::downgrade(&sync_a)).await; // Register on both
transport_a
.register_sync_service(device_a_id, Arc::downgrade(&sync_a))
.await;
transport_b
.register_sync_service(device_a_id, Arc::downgrade(&sync_a))
.await; // Register on both
}
if let Some(sync_b) = library_b.sync_service() {
transport_a.register_sync_service(device_b_id, Arc::downgrade(&sync_b)).await; // Register on both
transport_b.register_sync_service(device_b_id, Arc::downgrade(&sync_b)).await;
transport_a
.register_sync_service(device_b_id, Arc::downgrade(&sync_b))
.await; // Register on both
transport_b
.register_sync_service(device_b_id, Arc::downgrade(&sync_b))
.await;
}
let setup = Self {
@ -315,9 +329,9 @@ impl WatermarkTestSetup {
timestamp: chrono::DateTime<chrono::Utc>,
) -> anyhow::Result<entities::volume::Model> {
use sd_core::domain::volume::VolumeFingerprint;
let fingerprint = VolumeFingerprint::new(name, 1_000_000_000, "ext4");
let volume = entities::volume::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
uuid: Set(Uuid::new_v4()),
@ -395,11 +409,7 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
for i in 0..50 {
let timestamp = base_time + chrono::Duration::minutes(i as i64 % 5);
let location = setup
.create_location_with_timestamp(
&format!("Location_{}", i),
device_a.id,
timestamp,
)
.create_location_with_timestamp(&format!("Location_{}", i), device_a.id, timestamp)
.await?;
setup
.library_a
@ -417,7 +427,8 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
println!("Creating 12,000 entries with timestamps at T+20 minutes onwards...");
println!("(This will be batched into multiple syncs)");
for i in 0..12_000 {
let timestamp = base_time + chrono::Duration::minutes(20) + chrono::Duration::seconds(i as i64);
let timestamp =
base_time + chrono::Duration::minutes(20) + chrono::Duration::seconds(i as i64);
let entry = setup
.create_entry_with_timestamp(&format!("Entry_{}", i), 0, timestamp)
.await?;
@ -454,14 +465,17 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
// Check how much synced
let locations_partial = setup.count_locations(&setup.library_b).await?;
let entries_partial = setup.count_entries(&setup.library_b).await?;
println!("\nAfter first backfill pass:");
println!(" Locations: {}", locations_partial);
println!(" Entries: {}", entries_partial);
// Get watermark after partial sync
let watermark_after_partial = setup.get_device_watermark().await?;
println!("\nWatermark after partial sync: {:?}", watermark_after_partial);
println!(
"\nWatermark after partial sync: {:?}",
watermark_after_partial
);
println!("\n=== Phase 3: Simulate disconnection and create MORE entries ===\n");
@ -486,7 +500,10 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
let entries_mid = setup.count_entries(&setup.library_b).await?;
let watermark_mid = setup.get_device_watermark().await?;
println!("After syncing some new entries: {} total entries", entries_mid);
println!(
"After syncing some new entries: {} total entries",
entries_mid
);
println!("Watermark advanced to: {:?}", watermark_mid);
println!("\n=== Phase 4: Reconnect and trigger catchup ===\n");
@ -496,11 +513,12 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
println!("Triggering watermark-based catchup...");
let sync_b = setup.library_b.sync_service().unwrap();
let backfill_mgr = sync_b.backfill_manager();
// Get current watermark before catchup
let (state_watermark_before, shared_watermark_before) = sync_b.peer_sync().get_watermarks().await;
let (state_watermark_before, shared_watermark_before) =
sync_b.peer_sync().get_watermarks().await;
println!("Using watermark for catchup: {:?}", state_watermark_before);
// Call catch_up_from_peer directly (watermark exchange doesn't trigger it due to TODO)
backfill_mgr
.catch_up_from_peer(
@ -525,7 +543,10 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
println!("Sync Results:");
println!(" Locations: {} / 50 expected", locations_synced);
println!(" Entries: {} / 15,000 expected (12k initial + 3k more)", entries_synced);
println!(
" Entries: {} / 15,000 expected (12k initial + 3k more)",
entries_synced
);
println!("\n=== Assertions ===\n");
@ -550,4 +571,3 @@ async fn test_watermark_bug_with_10k_mixed_resources() -> anyhow::Result<()> {
Ok(())
}

View File

@ -166,18 +166,19 @@ async fn test_global_watermark_filters_earlier_resources() -> anyhow::Result<()>
.await?;
// Query entries with watermark
let entries_result = entities::entry::Model::query_for_sync(
None,
Some(watermark_at_t25),
None,
1000,
db,
)
.await?;
let entries_result =
entities::entry::Model::query_for_sync(None, Some(watermark_at_t25), None, 1000, db)
.await?;
println!("Query Results:");
println!(" Locations returned: {} (expected 0 due to watermark filter)", locations_result.len());
println!(" Entries returned: {} (expected some, those >= T+25)", entries_result.len());
println!(
" Locations returned: {} (expected 0 due to watermark filter)",
locations_result.len()
);
println!(
" Entries returned: {} (expected some, those >= T+25)",
entries_result.len()
);
println!();
println!("=== Demonstrating the Bug ===\n");
@ -250,4 +251,3 @@ async fn test_per_resource_watermark_solution() -> anyhow::Result<()> {
Ok(())
}