refactor: improve device filtering and broadcasting logic for synchronization

- Added logic to exclude the current device from sync partners, ensuring that only other paired devices are considered.
- Enhanced the filtering process to ensure devices have valid NodeId mappings, improving the accuracy of synchronization partners.
- Implemented concurrency limiting for broadcast tasks to prevent network overload, enhancing overall performance and reliability.
- Improved logging to provide detailed insights into the computed sync partners and broadcasting status.
This commit is contained in:
Jamie Pine 2025-11-16 15:23:33 -08:00
parent 91f8503426
commit be1c957f96
2 changed files with 57 additions and 14 deletions

View File

@ -337,29 +337,48 @@ impl NetworkTransport for NetworkingService {
.await
.map_err(|e| anyhow::anyhow!("Failed to query library devices: {}", e))?;
// 2. Get Iroh endpoint for checking connection state
// 2. Get our own device ID to exclude from partners
let our_device_id = self.device_id();
// 3. Get Iroh endpoint for checking connection state
let endpoint = self
.endpoint()
.ok_or_else(|| anyhow::anyhow!("Network endpoint not initialized"))?;
// 3. Get DeviceRegistry to check Iroh connection state
// 4. Get DeviceRegistry to check which devices have NodeId mappings (paired devices)
let device_registry_arc = self.device_registry();
let registry = device_registry_arc.read().await;
// 4. Filter to only devices that Iroh reports as connected
// 5. Filter to OTHER devices in this library that are paired
// We don't check Iroh connection state because:
// - Connections may be idle (no active streams) but still reachable
// - send_sync_message establishes connections on-demand
// - Better to attempt send and handle failure than skip paired devices
let sync_partners: Vec<Uuid> = library_devices
.iter()
.filter(|device| registry.is_node_connected(endpoint, device.uuid))
.filter(|device| {
// Exclude ourselves (can't sync with self)
if device.uuid == our_device_id {
return false;
}
// Must have NodeId mapping (paired via pairing protocol)
// This ensures device is known to our network layer
registry.get_node_id_for_device(device.uuid).is_some()
})
.map(|device| device.uuid)
.collect();
// tracing::info!(
// "Library-scoped sync partners: library={}, lib_devs={}, iroh_connected={}, partners={}",
// library_id,
// library_devices.len(),
// sync_partners.len(),
// sync_partners.len()
// );
tracing::debug!(
library_id = %library_id,
our_device_id = %our_device_id,
total_lib_devices = library_devices.len(),
sync_enabled_devices = library_devices.iter().filter(|d| d.sync_enabled).count(),
paired_devices = library_devices.iter().filter(|d| registry.get_node_id_for_device(d.uuid).is_some()).count(),
sync_partners = sync_partners.len(),
partner_uuids = ?sync_partners,
"Computed library sync partners"
);
Ok(sync_partners)
}

View File

@ -1940,8 +1940,13 @@ impl PeerSync {
}
}
// Spawn a broadcast task per model_type batch
// This sends 1 StateBatch message instead of N StateChange messages
// Spawn broadcast tasks with concurrency limiting
// This prevents overwhelming the network/receiver with too many concurrent sends
use futures::stream::{FuturesUnordered, StreamExt};
let mut broadcast_tasks = FuturesUnordered::new();
const MAX_CONCURRENT_BROADCASTS: usize = 10;
for (model_type, changes) in batches_by_model {
let library_id = library_id;
let network = network.clone();
@ -1952,7 +1957,7 @@ impl PeerSync {
let config = config.clone();
let last_realtime_activity_per_peer = last_realtime_activity_per_peer.clone();
tokio::spawn(async move {
let task = tokio::spawn(async move {
if let Err(e) = Self::broadcast_state_batch_static(
library_id,
model_type,
@ -1970,6 +1975,25 @@ impl PeerSync {
warn!(error = %e, "Failed to broadcast state batch in background task");
}
});
broadcast_tasks.push(task);
// Limit concurrent broadcasts to prevent overwhelming receiver
if broadcast_tasks.len() >= MAX_CONCURRENT_BROADCASTS {
// Wait for one to complete before spawning more
if let Some(result) = broadcast_tasks.next().await {
if let Err(e) = result {
warn!(error = %e, "Broadcast task panicked");
}
}
}
}
// Wait for remaining tasks to complete
while let Some(result) = broadcast_tasks.next().await {
if let Err(e) = result {
warn!(error = %e, "Broadcast task panicked");
}
}
info!(