Buffer shared changes when FK dep missing

This commit is contained in:
Jamie Pine 2025-11-28 08:37:27 -08:00
parent e4c60c58ca
commit 7c864e9588

View File

@ -667,7 +667,52 @@ impl BackfillManager {
// Apply entries in HLC order (already sorted from peer)
for entry in &entries {
self.log_handler.handle_shared_change(entry.clone()).await?;
// Attempt to apply the shared change
// If it fails due to missing FK dependency, buffer it for retry
match self.log_handler.handle_shared_change(entry.clone()).await {
Ok(()) => {
// Success - resolve any waiting dependencies
}
Err(e) => {
let error_str = e.to_string();
// Check if this is a FK dependency error
if error_str.contains("Sync dependency missing") {
// Extract the missing UUID and buffer this entry
if let Some(missing_uuid) = super::dependency::extract_missing_dependency_uuid(&error_str) {
tracing::debug!(
record_uuid = %entry.record_uuid,
model_type = %entry.model_type,
missing_uuid = %missing_uuid,
"Shared resource has missing FK dependency, buffering for retry"
);
// Buffer this shared change for retry when dependency arrives
self.peer_sync
.dependency_tracker()
.add_dependency(
missing_uuid,
super::state::BufferedUpdate::SharedChange(entry.clone()),
)
.await;
continue; // Skip to next entry
} else {
tracing::warn!(
error = %e,
record_uuid = %entry.record_uuid,
"Sync dependency missing but couldn't extract UUID, buffering: {}",
error_str
);
// TODO: Consider buffering blind or failing
continue;
}
}
// Non-dependency error - fail backfill
return Err(e);
}
}
// Resolve any state changes waiting for this shared resource
// This handles cross-type dependencies (e.g., entries waiting for content_identities)
@ -686,18 +731,29 @@ impl BackfillManager {
);
for update in waiting_updates {
if let super::state::BufferedUpdate::StateChange(dependent_change) = update
{
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after shared resource backfill"
);
match update {
super::state::BufferedUpdate::StateChange(dependent_change) => {
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after shared resource backfill"
);
}
}
super::state::BufferedUpdate::SharedChange(dependent_entry) => {
// Retry the shared change now that its dependency exists
if let Err(e) = self.log_handler.handle_shared_change(dependent_entry.clone()).await {
tracing::warn!(
error = %e,
record_uuid = %dependent_entry.record_uuid,
"Failed to apply dependent shared change after parent resolved"
);
}
}
}
}
@ -858,20 +914,31 @@ impl BackfillManager {
);
for update in waiting_updates {
if let super::state::BufferedUpdate::StateChange(
dependent_change,
) = update
{
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after current_state snapshot"
);
match update {
super::state::BufferedUpdate::StateChange(dependent_change) => {
if let Err(e) = self
.peer_sync
.apply_state_change(dependent_change.clone())
.await
{
tracing::warn!(
error = %e,
record_uuid = %dependent_change.record_uuid,
"Failed to apply dependent state change after current_state snapshot"
);
}
}
super::state::BufferedUpdate::SharedChange(dependent_entry) => {
// Retry the shared change now that its dependency exists
let entry_clone = dependent_entry.clone();
let db = self.peer_sync.db().clone();
if let Err(e) = crate::infra::sync::registry::apply_shared_change(entry_clone, db).await {
tracing::warn!(
error = %e,
record_uuid = %dependent_entry.record_uuid,
"Failed to apply dependent shared change after current_state snapshot"
);
}
}
}
}