Apply current_state snapshot before backfill

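Apply the pre-sync current_state snapshot in dependency order before
applying HLC-ordered peer log entries. This ensures the base records that
log entries reference already exist, so FK constraints are satisfied;
buffered changes waiting on a snapshot record are resolved as soon as that
record is inserted. Extend FK error handling to also recognize raw SQLite
"FOREIGN KEY constraint failed" errors: when the missing UUID can be
extracted from the error, the entry is buffered for retry; otherwise it is
logged and skipped.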
This commit is contained in:
Jamie Pine 2025-11-28 08:58:13 -08:00
parent 7c864e9588
commit 3791372f57

View File

@ -665,7 +665,129 @@ impl BackfillManager {
// Track max HLC for ACK (critical for pruning)
let max_hlc_in_batch = entries.last().map(|e| e.hlc);
// Apply entries in HLC order (already sorted from peer)
// IMPORTANT: Apply current_state snapshot FIRST (before HLC entries)
// The snapshot contains base data that peer log entries may depend on
// This prevents FK constraint errors when peer log references snapshot data
if let Some(state) = current_state {
if let Some(state_map) = state.as_object() {
// Get dependency-ordered list of models to prevent FK violations
// CRITICAL: Must apply parent models before children (e.g., user_metadata before user_metadata_tag)
let sync_order = match crate::infra::sync::registry::compute_registry_sync_order().await {
Ok(order) => order,
Err(e) => {
warn!("Failed to compute sync order, using unordered: {}", e);
// Fallback to unordered if dependency graph fails
state_map.keys().map(|k| k.clone()).collect::<Vec<_>>()
}
};
// Apply snapshot records in dependency order
for model_type in sync_order {
// Skip if model not in snapshot
let records_value = match state_map.get(&model_type) {
Some(val) => val,
None => continue,
};
if let Some(records_array) = records_value.as_array() {
info!(
model_type = %model_type,
count = records_array.len(),
"Applying current state snapshot for pre-sync data"
);
for record_value in records_array {
if let Some(record_obj) = record_value.as_object() {
if let (Some(uuid_value), Some(data)) =
(record_obj.get("uuid"), record_obj.get("data"))
{
if let Some(uuid_str) = uuid_value.as_str() {
if let Ok(record_uuid) = Uuid::parse_str(uuid_str) {
// Construct a synthetic SharedChangeEntry for application
// Generate HLC for ordering (pre-sync data gets current HLC)
let hlc = {
let mut hlc_gen = self
.peer_sync
.hlc_generator()
.lock()
.await;
hlc_gen.next()
};
let entry = crate::infra::sync::SharedChangeEntry {
hlc,
model_type: model_type.clone(),
record_uuid,
change_type: crate::infra::sync::ChangeType::Insert,
data: data.clone(),
};
let db = self.peer_sync.db().clone();
if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry, db).await {
warn!(
model_type = %model_type,
uuid = %record_uuid,
error = %e,
"Failed to apply current state record"
);
} else {
// Resolve any state changes waiting for this shared resource
let waiting_updates = self
.peer_sync
.dependency_tracker()
.resolve(record_uuid)
.await;
if !waiting_updates.is_empty() {
tracing::debug!(
resolved_uuid = %record_uuid,
model_type = %model_type,
waiting_count = waiting_updates.len(),
"Resolving dependencies after current_state snapshot"
);
for update in waiting_updates {
match update {
super::state::BufferedUpdate::StateChange(dependent_change) => {
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after current_state snapshot"
);
}
}
super::state::BufferedUpdate::SharedChange(dependent_entry) => {
// Retry the shared change now that its dependency exists
let entry_clone = dependent_entry.clone();
let db = self.peer_sync.db().clone();
if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await {
tracing::warn!(
error = %e,
record_uuid = %dependent_entry.record_uuid,
"Failed to apply dependent shared change after current_state snapshot"
);
}
}
}
}
}
}
}
}
}
}
}
}
}
}
}
// Now apply HLC-ordered peer log entries (after snapshot dependencies are available)
for entry in &entries {
// Attempt to apply the shared change
// If it fails due to missing FK dependency, buffer it for retry
@ -676,9 +798,9 @@ impl BackfillManager {
Err(e) => {
let error_str = e.to_string();
// Check if this is a FK dependency error
if error_str.contains("Sync dependency missing") {
// Extract the missing UUID and buffer this entry
// Check if this is a FK dependency error (from FK mapping or SQLite constraint)
if error_str.contains("Sync dependency missing") || error_str.contains("FOREIGN KEY constraint failed") {
// Try to extract the missing UUID from the error message
if let Some(missing_uuid) = super::dependency::extract_missing_dependency_uuid(&error_str) {
tracing::debug!(
record_uuid = %entry.record_uuid,
@ -698,13 +820,17 @@ impl BackfillManager {
continue; // Skip to next entry
} else {
// FK error but can't extract UUID (raw SQLite error)
// Without the missing UUID there is no dependency to buffer this entry against,
// so it is logged and skipped here rather than buffered
tracing::warn!(
error = %e,
record_uuid = %entry.record_uuid,
"Sync dependency missing but couldn't extract UUID, buffering: {}",
error_str
model_type = %entry.model_type,
"FK constraint failed but couldn't extract UUID - skipping (will retry from snapshot dependencies)"
);
// TODO: Consider buffering blind or failing
// Skip this record - if it's in the snapshot dependencies, it will be resolved
// If not, it will be re-requested in next backfill attempt
continue;
}
}
@ -834,126 +960,6 @@ impl BackfillManager {
last_hlc = Some(last_entry.hlc);
}
// Apply current_state snapshot (contains pre-sync data not in peer_log)
if let Some(state) = current_state {
if let Some(state_map) = state.as_object() {
// Get dependency-ordered list of models to prevent FK violations
// CRITICAL: Must apply parent models before children (e.g., user_metadata before user_metadata_tag)
let sync_order = match crate::infra::sync::registry::compute_registry_sync_order().await {
Ok(order) => order,
Err(e) => {
warn!("Failed to compute sync order, using unordered: {}", e);
// Fallback to unordered if dependency graph fails
state_map.keys().map(|k| k.clone()).collect::<Vec<_>>()
}
};
// Apply snapshot records in dependency order
for model_type in sync_order {
// Skip if model not in snapshot
let records_value = match state_map.get(&model_type) {
Some(val) => val,
None => continue,
};
if let Some(records_array) = records_value.as_array() {
info!(
model_type = %model_type,
count = records_array.len(),
"Applying current state snapshot for pre-sync data"
);
for record_value in records_array {
if let Some(record_obj) = record_value.as_object() {
if let (Some(uuid_value), Some(data)) =
(record_obj.get("uuid"), record_obj.get("data"))
{
if let Some(uuid_str) = uuid_value.as_str() {
if let Ok(record_uuid) = Uuid::parse_str(uuid_str) {
// Construct a synthetic SharedChangeEntry for application
// Generate HLC for ordering (pre-sync data gets current HLC)
let hlc = {
let mut hlc_gen = self
.peer_sync
.hlc_generator()
.lock()
.await;
hlc_gen.next()
};
let entry = crate::infra::sync::SharedChangeEntry {
hlc,
model_type: model_type.clone(),
record_uuid,
change_type: crate::infra::sync::ChangeType::Insert,
data: data.clone(),
};
let db = self.peer_sync.db().clone();
if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry, db).await {
warn!(
model_type = %model_type,
uuid = %record_uuid,
error = %e,
"Failed to apply current state record"
);
} else {
// Resolve any state changes waiting for this shared resource
let waiting_updates = self
.peer_sync
.dependency_tracker()
.resolve(record_uuid)
.await;
if !waiting_updates.is_empty() {
tracing::debug!(
resolved_uuid = %record_uuid,
model_type = %model_type,
waiting_count = waiting_updates.len(),
"Resolving dependencies after current_state snapshot"
);
for update in waiting_updates {
match update {
super::state::BufferedUpdate::StateChange(dependent_change) => {
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after current_state snapshot"
);
}
}
super::state::BufferedUpdate::SharedChange(dependent_entry) => {
// Retry the shared change now that its dependency exists
let entry_clone = dependent_entry.clone();
let db = self.peer_sync.db().clone();
if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await {
tracing::warn!(
error = %e,
record_uuid = %dependent_entry.record_uuid,
"Failed to apply dependent shared change after current_state snapshot"
);
}
}
}
}
}
}
}
}
}
}
}
}
}
}
}
// Continue if there are more entries
if !has_more || batch_size == 0 {
break;