diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs
index cc5a0837d..ad90c9713 100644
--- a/core/src/service/sync/backfill.rs
+++ b/core/src/service/sync/backfill.rs
@@ -665,7 +665,129 @@ impl BackfillManager {
         // Track max HLC for ACK (critical for pruning)
         let max_hlc_in_batch = entries.last().map(|e| e.hlc);
 
-        // Apply entries in HLC order (already sorted from peer)
+        // IMPORTANT: Apply current_state snapshot FIRST (before HLC entries)
+        // The snapshot contains base data that peer log entries may depend on
+        // This prevents FK constraint errors when peer log references snapshot data
+        if let Some(state) = current_state {
+            if let Some(state_map) = state.as_object() {
+                // Get dependency-ordered list of models to prevent FK violations
+                // CRITICAL: Must apply parent models before children (e.g., user_metadata before user_metadata_tag)
+                let sync_order = match crate::infra::sync::registry::compute_registry_sync_order().await {
+                    Ok(order) => order,
+                    Err(e) => {
+                        warn!("Failed to compute sync order, using unordered: {}", e);
+                        // Fallback to unordered if dependency graph fails
+                        state_map.keys().map(|k| k.clone()).collect::<Vec<String>>()
+                    }
+                };
+
+                // Apply snapshot records in dependency order
+                for model_type in sync_order {
+                    // Skip if model not in snapshot
+                    let records_value = match state_map.get(&model_type) {
+                        Some(val) => val,
+                        None => continue,
+                    };
+
+                    if let Some(records_array) = records_value.as_array() {
+                        info!(
+                            model_type = %model_type,
+                            count = records_array.len(),
+                            "Applying current state snapshot for pre-sync data"
+                        );
+
+                        for record_value in records_array {
+                            if let Some(record_obj) = record_value.as_object() {
+                                if let (Some(uuid_value), Some(data)) =
+                                    (record_obj.get("uuid"), record_obj.get("data"))
+                                {
+                                    if let Some(uuid_str) = uuid_value.as_str() {
+                                        if let Ok(record_uuid) = Uuid::parse_str(uuid_str) {
+                                            // Construct a synthetic SharedChangeEntry for application
+                                            // Generate HLC for ordering (pre-sync data gets current HLC)
+                                            let hlc = {
+                                                let mut hlc_gen = self
+                                                    .peer_sync
+                                                    .hlc_generator()
+                                                    .lock()
+                                                    .await;
+                                                hlc_gen.next()
+                                            };
+
+                                            let entry = crate::infra::sync::SharedChangeEntry {
+                                                hlc,
+                                                model_type: model_type.clone(),
+                                                record_uuid,
+                                                change_type: crate::infra::sync::ChangeType::Insert,
+                                                data: data.clone(),
+                                            };
+
+                                            let db = self.peer_sync.db().clone();
+                                            if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry, db).await {
+                                                warn!(
+                                                    model_type = %model_type,
+                                                    uuid = %record_uuid,
+                                                    error = %e,
+                                                    "Failed to apply current state record"
+                                                );
+                                            } else {
+                                                // Resolve any state changes waiting for this shared resource
+                                                let waiting_updates = self
+                                                    .peer_sync
+                                                    .dependency_tracker()
+                                                    .resolve(record_uuid)
+                                                    .await;
+
+                                                if !waiting_updates.is_empty() {
+                                                    tracing::debug!(
+                                                        resolved_uuid = %record_uuid,
+                                                        model_type = %model_type,
+                                                        waiting_count = waiting_updates.len(),
+                                                        "Resolving dependencies after current_state snapshot"
+                                                    );
+
+                                                    for update in waiting_updates {
+                                                        match update {
+                                                            super::state::BufferedUpdate::StateChange(dependent_change) => {
+                                                                if let Err(e) = self
+                                                                    .peer_sync
+                                                                    .apply_state_change(dependent_change.clone())
+                                                                    .await
+                                                                {
+                                                                    tracing::warn!(
+                                                                        error = %e,
+                                                                        record_uuid = %dependent_change.record_uuid,
+                                                                        "Failed to apply dependent state change after current_state snapshot"
+                                                                    );
+                                                                }
+                                                            }
+                                                            super::state::BufferedUpdate::SharedChange(dependent_entry) => {
+                                                                // Retry the shared change now that its dependency exists
+                                                                let entry_clone = dependent_entry.clone();
+                                                                let db = self.peer_sync.db().clone();
+                                                                if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await {
+                                                                    tracing::warn!(
+                                                                        error = %e,
+                                                                        record_uuid = %dependent_entry.record_uuid,
+                                                                        "Failed to apply dependent shared change after current_state snapshot"
+                                                                    );
+                                                                }
+                                                            }
+                                                        }
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        // Now apply HLC-ordered peer log entries (after snapshot dependencies are available)
         for entry in &entries {
             // Attempt to apply the shared change
             // If it fails due to missing FK dependency, buffer it for retry
@@ -676,9 +798,9 @@ impl BackfillManager {
                 Err(e) => {
                     let error_str = e.to_string();
 
-                    // Check if this is a FK dependency error
-                    if error_str.contains("Sync dependency missing") {
-                        // Extract the missing UUID and buffer this entry
+                    // Check if this is a FK dependency error (from FK mapping or SQLite constraint)
+                    if error_str.contains("Sync dependency missing") || error_str.contains("FOREIGN KEY constraint failed") {
+                        // Try to extract the missing UUID from the error message
                         if let Some(missing_uuid) = super::dependency::extract_missing_dependency_uuid(&error_str) {
                             tracing::debug!(
                                 record_uuid = %entry.record_uuid,
@@ -698,13 +820,17 @@ impl BackfillManager {
 
                             continue; // Skip to next entry
                         } else {
+                            // FK error but can't extract UUID (raw SQLite error)
+                            // This handles the case where a peer log entry depends on snapshot data,
+                            // which is now applied before the peer log, so the record can be skipped safely
                             tracing::warn!(
                                 error = %e,
                                 record_uuid = %entry.record_uuid,
-                                "Sync dependency missing but couldn't extract UUID, buffering: {}",
-                                error_str
+                                model_type = %entry.model_type,
+                                "FK constraint failed but couldn't extract UUID - skipping (will retry from snapshot dependencies)"
                             );
-                            // TODO: Consider buffering blind or failing
+                            // Skip this record - if it's in the snapshot dependencies, it will be resolved
+                            // If not, it will be re-requested in next backfill attempt
                             continue;
                         }
                     }
@@ -834,126 +960,6 @@ impl BackfillManager {
             last_hlc = Some(last_entry.hlc);
         }
 
-        // Apply current_state snapshot (contains pre-sync data not in peer_log)
-        if let Some(state) = current_state {
-            if let Some(state_map) = state.as_object() {
-                // Get dependency-ordered list of models to prevent FK violations
-                // CRITICAL: Must apply parent models before children (e.g., user_metadata before user_metadata_tag)
-                let sync_order = match crate::infra::sync::registry::compute_registry_sync_order().await {
-                    Ok(order) => order,
-                    Err(e) => {
-                        warn!("Failed to compute sync order, using unordered: {}", e);
-                        // Fallback to unordered if dependency graph fails
-                        state_map.keys().map(|k| k.clone()).collect::<Vec<String>>()
-                    }
-                };
-
-                // Apply snapshot records in dependency order
-                for model_type in sync_order {
-                    // Skip if model not in snapshot
-                    let records_value = match state_map.get(&model_type) {
-                        Some(val) => val,
-                        None => continue,
-                    };
-
-                    if let Some(records_array) = records_value.as_array() {
-                        info!(
-                            model_type = %model_type,
-                            count = records_array.len(),
-                            "Applying current state snapshot for pre-sync data"
-                        );
-
-                        for record_value in records_array {
-                            if let Some(record_obj) = record_value.as_object() {
-                                if let (Some(uuid_value), Some(data)) =
-                                    (record_obj.get("uuid"), record_obj.get("data"))
-                                {
-                                    if let Some(uuid_str) = uuid_value.as_str() {
-                                        if let Ok(record_uuid) = Uuid::parse_str(uuid_str) {
-                                            // Construct a synthetic SharedChangeEntry for application
-                                            // Generate HLC for ordering (pre-sync data gets current HLC)
-                                            let hlc = {
-                                                let mut hlc_gen = self
-                                                    .peer_sync
-                                                    .hlc_generator()
-                                                    .lock()
-                                                    .await;
-                                                hlc_gen.next()
-                                            };
-
-                                            let entry = crate::infra::sync::SharedChangeEntry {
-                                                hlc,
-                                                model_type: model_type.clone(),
-                                                record_uuid,
-                                                change_type: crate::infra::sync::ChangeType::Insert,
-                                                data: data.clone(),
-                                            };
-
-                                            let db = self.peer_sync.db().clone();
-                                            if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry, db).await {
-                                                warn!(
-                                                    model_type = %model_type,
-                                                    uuid = %record_uuid,
-                                                    error = %e,
-                                                    "Failed to apply current state record"
-                                                );
-                                            } else {
-                                                // Resolve any state changes waiting for this shared resource
-                                                let waiting_updates = self
-                                                    .peer_sync
-                                                    .dependency_tracker()
-                                                    .resolve(record_uuid)
-                                                    .await;
-
-                                                if !waiting_updates.is_empty() {
-                                                    tracing::debug!(
-                                                        resolved_uuid = %record_uuid,
-                                                        model_type = %model_type,
-                                                        waiting_count = waiting_updates.len(),
-                                                        "Resolving dependencies after current_state snapshot"
-                                                    );
-
-                                                    for update in waiting_updates {
-                                                        match update {
-                                                            super::state::BufferedUpdate::StateChange(dependent_change) => {
-                                                                if let Err(e) = self
-                                                                    .peer_sync
-                                                                    .apply_state_change(dependent_change.clone())
-                                                                    .await
-                                                                {
-                                                                    tracing::warn!(
-                                                                        error = %e,
-                                                                        record_uuid = %dependent_change.record_uuid,
-                                                                        "Failed to apply dependent state change after current_state snapshot"
-                                                                    );
-                                                                }
-                                                            }
-                                                            super::state::BufferedUpdate::SharedChange(dependent_entry) => {
-                                                                // Retry the shared change now that its dependency exists
-                                                                let entry_clone = dependent_entry.clone();
-                                                                let db = self.peer_sync.db().clone();
-                                                                if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await {
-                                                                    tracing::warn!(
-                                                                        error = %e,
-                                                                        record_uuid = %dependent_entry.record_uuid,
-                                                                        "Failed to apply dependent shared change after current_state snapshot"
-                                                                    );
-                                                                }
-                                                            }
-                                                        }
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
         // Continue if there are more entries
         if !has_more || batch_size == 0 {
             break;