Refactor change detection and enhance event handling in persistent indexing

- Improved the `emit_change_event` method in `DatabaseAdapter` to handle resource events more effectively, including detailed logging for event emissions.
- Updated the `PersistentEventHandler` to ensure paths are registered with the filesystem watcher upon startup, enhancing real-time monitoring capabilities.
- Introduced new event collectors for better diagnostics and logging of filesystem and core events during tests.
- Enhanced integration tests to cover various scenarios, including batch file operations and delete/restore patterns, ensuring accurate event handling and preventing duplicates.
This commit is contained in:
Jamie Pine 2025-12-09 18:00:44 -08:00
parent cf998edb03
commit 4efcf7466b
3 changed files with 1131 additions and 382 deletions

View File

@ -216,7 +216,8 @@ impl ChangeHandler for DatabaseAdapter {
state
.entry_id_cache
.insert(parent_path.to_path_buf(), parent_id);
} else if let Ok(Some(parent_id)) = DatabaseStorage::resolve_parent_id(&self.db, parent_path).await
} else if let Ok(Some(parent_id)) =
DatabaseStorage::resolve_parent_id(&self.db, parent_path).await
{
// Cache the parent ID for future lookups
state
@ -586,8 +587,46 @@ impl ChangeHandler for DatabaseAdapter {
async fn emit_change_event(&self, entry: &EntryRef, change_type: ChangeType) -> Result<()> {
use crate::domain::ResourceManager;
use crate::infra::event::Event;
if let Some(uuid) = entry.uuid {
tracing::debug!(
"emit_change_event called: {:?} for {} (uuid: {:?})",
change_type,
entry.path.display(),
entry.uuid
);
let Some(uuid) = entry.uuid else {
tracing::warn!(
"Entry has no UUID, cannot emit {:?} event for {}",
change_type,
entry.path.display()
);
return Ok(());
};
match change_type {
ChangeType::Deleted => {
// Emit ResourceDeleted event so frontend can remove from cache
// Use "file" resource_type to match ephemeral events (frontend listens for "file")
tracing::debug!(
"Emitting ResourceDeleted for persistent delete: {} (id: {})",
entry.path.display(),
uuid
);
self.context.events.emit(Event::ResourceDeleted {
resource_type: "file".to_string(),
resource_id: uuid,
});
}
ChangeType::Created | ChangeType::Modified | ChangeType::Moved => {
// Emit ResourceChanged event for UI updates
tracing::debug!(
"Emitting ResourceChanged for persistent {:?}: {} (id: {})",
change_type,
entry.path.display(),
uuid
);
let resource_manager =
ResourceManager::new(Arc::new(self.db.clone()), self.context.events.clone());
@ -596,10 +635,18 @@ impl ChangeHandler for DatabaseAdapter {
.await
{
tracing::warn!(
"Failed to emit resource event for {:?} entry: {}",
"Failed to emit resource event for {:?} entry {}: {}",
change_type,
entry.path.display(),
e
);
} else {
tracing::debug!(
"Successfully emitted ResourceChanged for {:?}: {}",
change_type,
entry.path.display()
);
}
}
}
@ -728,12 +775,7 @@ impl<'a> IndexPersistence for DatabaseAdapterForJob<'a> {
) -> JobResult<()> {
use crate::ops::indexing::database_storage::DatabaseStorage;
DatabaseStorage::link_to_content_identity(
self.ctx.library_db(),
entry_id,
path,
cas_id,
)
DatabaseStorage::link_to_content_identity(self.ctx.library_db(), entry_id, path, cas_id)
.await
.map(|_| ())
}

View File

@ -124,18 +124,46 @@ impl PersistentEventHandler {
{
let mut locations = self.locations.write().await;
locations.insert(root_path.clone(), meta.clone());
debug!(
"Added location to map. Total locations: {}",
locations.len()
);
}
// Create worker if handler is running
if self.is_running.load(Ordering::SeqCst) {
debug!(
"Handler is running, creating worker for location {}",
location_id
);
self.ensure_worker(meta).await?;
} else {
debug!(
"Handler not running yet, worker will be created on start for location {}",
location_id
);
}
// Register path with FsWatcher if connected
if let Some(fs_watcher) = self.fs_watcher.read().await.as_ref() {
debug!(
"Registering path {} with FsWatcher (recursive)",
root_path.display()
);
fs_watcher
.watch_path(&root_path, sd_fs_watcher::WatchConfig::recursive())
.await?;
info!(
"Successfully registered {} with FsWatcher for location {}",
root_path.display(),
location_id
);
} else {
warn!(
"FsWatcher not connected, cannot watch path {} for location {}",
root_path.display(),
location_id
);
}
Ok(())
@ -198,10 +226,35 @@ impl PersistentEventHandler {
debug!("Starting PersistentEventHandler");
// Create workers for all registered locations
// Create workers for all registered locations AND register paths with FsWatcher
// This is critical: locations may have been added before start() was called,
// when the FsWatcher wasn't connected yet, so we need to register them now.
let locations: Vec<LocationMeta> = self.locations.read().await.values().cloned().collect();
for meta in locations {
self.ensure_worker(meta).await?;
for meta in &locations {
self.ensure_worker(meta.clone()).await?;
// Register the path with the OS-level watcher (may have been skipped during add_location)
debug!(
"Registering path {} with FsWatcher for location {}",
meta.root_path.display(),
meta.id
);
if let Err(e) = fs_watcher
.watch_path(&meta.root_path, sd_fs_watcher::WatchConfig::recursive())
.await
{
error!(
"Failed to register path {} with FsWatcher: {}",
meta.root_path.display(),
e
);
} else {
info!(
"Successfully registered {} with FsWatcher for location {}",
meta.root_path.display(),
meta.id
);
}
}
// Start the event routing task
@ -284,11 +337,24 @@ impl PersistentEventHandler {
) -> Result<()> {
let locs = locations.read().await;
trace!(
"Routing event {:?} for path: {} (checking {} locations)",
event.kind,
event.path.display(),
locs.len()
);
// Find the best matching location (longest prefix match)
let mut best_match: Option<&LocationMeta> = None;
let mut longest_len = 0;
for (root_path, meta) in locs.iter() {
trace!(
" Checking location {} at {} for path {}",
meta.id,
root_path.display(),
event.path.display()
);
if event.path.starts_with(root_path) {
let len = root_path.as_os_str().len();
if len > longest_len {
@ -299,10 +365,21 @@ impl PersistentEventHandler {
}
let Some(location) = best_match else {
trace!("Event not under any location: {}", event.path.display());
debug!(
"Event not under any location: {} (registered locations: {:?})",
event.path.display(),
locs.keys().collect::<Vec<_>>()
);
return Ok(());
};
debug!(
"Routing {:?} event for {} to location {}",
event.kind,
event.path.display(),
location.id
);
// Send to worker
let workers_map = workers.read().await;
if let Some(tx) = workers_map.get(&location.id) {
@ -312,6 +389,12 @@ impl PersistentEventHandler {
location.id, e
);
}
} else {
warn!(
"No worker found for location {} (workers: {:?})",
location.id,
workers_map.keys().collect::<Vec<_>>()
);
}
Ok(())

File diff suppressed because it is too large Load Diff