From 7c864e9588e8950c5d5ccf138d7f63d7de5d83ef Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Fri, 28 Nov 2025 08:37:27 -0800 Subject: [PATCH] Buffer shared changes when FK dep missing --- core/src/service/sync/backfill.rs | 121 +++++++++++++++++++++++------- 1 file changed, 94 insertions(+), 27 deletions(-) diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs index 0c61acb53..cc5a0837d 100644 --- a/core/src/service/sync/backfill.rs +++ b/core/src/service/sync/backfill.rs @@ -667,7 +667,52 @@ impl BackfillManager { // Apply entries in HLC order (already sorted from peer) for entry in &entries { - self.log_handler.handle_shared_change(entry.clone()).await?; + // Attempt to apply the shared change + // If it fails due to missing FK dependency, buffer it for retry + match self.log_handler.handle_shared_change(entry.clone()).await { + Ok(()) => { + // Success - resolve any waiting dependencies + } + Err(e) => { + let error_str = e.to_string(); + + // Check if this is a FK dependency error + if error_str.contains("Sync dependency missing") { + // Extract the missing UUID and buffer this entry + if let Some(missing_uuid) = super::dependency::extract_missing_dependency_uuid(&error_str) { + tracing::debug!( + record_uuid = %entry.record_uuid, + model_type = %entry.model_type, + missing_uuid = %missing_uuid, + "Shared resource has missing FK dependency, buffering for retry" + ); + + // Buffer this shared change for retry when dependency arrives + self.peer_sync + .dependency_tracker() + .add_dependency( + missing_uuid, + super::state::BufferedUpdate::SharedChange(entry.clone()), + ) + .await; + + continue; // Skip to next entry + } else { + tracing::warn!( + error = %e, + record_uuid = %entry.record_uuid, + "Sync dependency missing but couldn't extract UUID, buffering: {}", + error_str + ); + // TODO: Consider buffering blind or failing + continue; + } + } + + // Non-dependency error - fail backfill + return Err(e); + } + } // Resolve any state changes waiting for this shared resource // This handles cross-type dependencies (e.g., entries waiting for content_identities) @@ -686,18 +731,29 @@ impl BackfillManager { ); for update in waiting_updates { - if let super::state::BufferedUpdate::StateChange(dependent_change) = update - { - if let Err(e) = self - .peer_sync - .apply_state_change(dependent_change.clone()) - .await - { - tracing::warn!( - error = %e, - record_uuid = %dependent_change.record_uuid, - "Failed to apply dependent state change after shared resource backfill" - ); + match update { + super::state::BufferedUpdate::StateChange(dependent_change) => { + if let Err(e) = self + .peer_sync + .apply_state_change(dependent_change.clone()) + .await + { + tracing::warn!( + error = %e, + record_uuid = %dependent_change.record_uuid, + "Failed to apply dependent state change after shared resource backfill" + ); + } + } + super::state::BufferedUpdate::SharedChange(dependent_entry) => { + // Retry the shared change now that its dependency exists + if let Err(e) = self.log_handler.handle_shared_change(dependent_entry.clone()).await { + tracing::warn!( + error = %e, + record_uuid = %dependent_entry.record_uuid, + "Failed to apply dependent shared change after parent resolved" + ); + } } } } @@ -858,20 +914,31 @@ impl BackfillManager { ); for update in waiting_updates { - if let super::state::BufferedUpdate::StateChange( - dependent_change, - ) = update - { - if let Err(e) = self - .peer_sync - .apply_state_change(dependent_change.clone()) - .await - { - tracing::warn!( - error = %e, - record_uuid = %dependent_change.record_uuid, - "Failed to apply dependent state change after current_state snapshot" - ); + match update { + super::state::BufferedUpdate::StateChange(dependent_change) => { + if let Err(e) = self + .peer_sync + .apply_state_change(dependent_change.clone()) + .await + { + tracing::warn!( + error = %e, + record_uuid = %dependent_change.record_uuid, + "Failed to apply dependent state change after current_state snapshot" + ); + } + } + super::state::BufferedUpdate::SharedChange(dependent_entry) => { + // Retry the shared change now that its dependency exists + let entry_clone = dependent_entry.clone(); + let db = self.peer_sync.db().clone(); + if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await { + tracing::warn!( + error = %e, + record_uuid = %dependent_entry.record_uuid, + "Failed to apply dependent shared change after current_state snapshot" + ); + } } } }