mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2025-12-11 20:15:30 +01:00
Implement unified change handling for indexing with filesystem watching support
- Introduced a new `handler.rs` module to manage filesystem change events for both persistent and ephemeral indexing. - Added a trait-based `ChangeHandler` interface to abstract operations for different storage backends. - Enhanced `EphemeralIndexCache` to support filesystem watching, allowing paths to be monitored for changes. - Implemented methods for registering and unregistering watched paths, improving responsiveness to filesystem events. - Updated the `LocationWatcher` to handle ephemeral watches and process events accordingly. - Added tests and documentation to ensure reliability and clarity of the new functionality.
This commit is contained in:
parent
3739b3f34f
commit
6bdc9a7055
@ -38,6 +38,7 @@ lütke
|
||||
marietti
|
||||
mbps
|
||||
mehrzad
|
||||
memmap
|
||||
Mjpeg
|
||||
Mmap
|
||||
mpscrr
|
||||
@ -77,6 +78,7 @@ tobiaslutke
|
||||
tokio
|
||||
tombstoned
|
||||
typecheck
|
||||
Uninit
|
||||
unwatch
|
||||
uuid
|
||||
vdfs
|
||||
|
||||
@ -11,6 +11,12 @@
|
||||
//!
|
||||
//! The cache tracks which paths have been indexed (ready) vs are currently
|
||||
//! being indexed (in progress).
|
||||
//!
|
||||
//! ## File Watching Support
|
||||
//!
|
||||
//! The cache can optionally track which paths should be monitored for filesystem
|
||||
//! changes. When a path is marked for watching, the watcher service can detect
|
||||
//! changes and update the ephemeral index via `EphemeralChangeHandler`.
|
||||
|
||||
use crate::ops::indexing::EphemeralIndex;
|
||||
use parking_lot::RwLock;
|
||||
@ -36,6 +42,9 @@ pub struct EphemeralIndexCache {
|
||||
/// Paths currently being indexed
|
||||
indexing_in_progress: RwLock<HashSet<PathBuf>>,
|
||||
|
||||
/// Paths registered for filesystem watching (subset of indexed_paths)
|
||||
watched_paths: RwLock<HashSet<PathBuf>>,
|
||||
|
||||
/// When the cache was created
|
||||
created_at: Instant,
|
||||
}
|
||||
@ -47,6 +56,7 @@ impl EphemeralIndexCache {
|
||||
index: Arc::new(TokioRwLock::new(EphemeralIndex::new()?)),
|
||||
indexed_paths: RwLock::new(HashSet::new()),
|
||||
indexing_in_progress: RwLock::new(HashSet::new()),
|
||||
watched_paths: RwLock::new(HashSet::new()),
|
||||
created_at: Instant::now(),
|
||||
})
|
||||
}
|
||||
@ -159,14 +169,92 @@ impl EphemeralIndexCache {
|
||||
self.indexing_in_progress.read().iter().cloned().collect()
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// File Watching Support
|
||||
// ========================================================================
|
||||
|
||||
/// Register a path for filesystem watching.
|
||||
///
|
||||
/// When registered, the watcher service will monitor this path for changes
|
||||
/// and update the ephemeral index via `EphemeralChangeHandler`. The path
|
||||
/// must already be indexed.
|
||||
pub fn register_for_watching(&self, path: PathBuf) -> bool {
|
||||
let indexed = self.indexed_paths.read();
|
||||
if !indexed.contains(&path) {
|
||||
return false;
|
||||
}
|
||||
drop(indexed);
|
||||
|
||||
let mut watched = self.watched_paths.write();
|
||||
watched.insert(path);
|
||||
true
|
||||
}
|
||||
|
||||
/// Unregister a path from filesystem watching.
|
||||
pub fn unregister_from_watching(&self, path: &Path) {
|
||||
let mut watched = self.watched_paths.write();
|
||||
watched.remove(path);
|
||||
}
|
||||
|
||||
/// Check if a path is registered for watching.
|
||||
pub fn is_watched(&self, path: &Path) -> bool {
|
||||
self.watched_paths.read().contains(path)
|
||||
}
|
||||
|
||||
/// Get all watched paths.
|
||||
pub fn watched_paths(&self) -> Vec<PathBuf> {
|
||||
self.watched_paths.read().iter().cloned().collect()
|
||||
}
|
||||
|
||||
/// Find the watched root path that contains the given path.
|
||||
///
|
||||
/// If the given path is under a watched directory, returns that directory.
|
||||
/// Used by the watcher to route events to the ephemeral handler.
|
||||
pub fn find_watched_root(&self, path: &Path) -> Option<PathBuf> {
|
||||
let watched = self.watched_paths.read();
|
||||
|
||||
// Find the longest matching watched path that is an ancestor of `path`
|
||||
let mut best_match: Option<&PathBuf> = None;
|
||||
let mut best_len = 0;
|
||||
|
||||
for watched_path in watched.iter() {
|
||||
if path.starts_with(watched_path) {
|
||||
let len = watched_path.as_os_str().len();
|
||||
if len > best_len {
|
||||
best_len = len;
|
||||
best_match = Some(watched_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
best_match.cloned()
|
||||
}
|
||||
|
||||
/// Check if any path in an event batch is under an ephemeral watched path.
|
||||
///
|
||||
/// Returns the watched root if found.
|
||||
pub fn find_watched_root_for_any<'a, I>(&self, paths: I) -> Option<PathBuf>
|
||||
where
|
||||
I: IntoIterator<Item = &'a Path>,
|
||||
{
|
||||
for path in paths {
|
||||
if let Some(root) = self.find_watched_root(path) {
|
||||
return Some(root);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Get cache statistics
|
||||
pub fn stats(&self) -> EphemeralIndexCacheStats {
|
||||
let indexed = self.indexed_paths.read();
|
||||
let in_progress = self.indexing_in_progress.read();
|
||||
let watched = self.watched_paths.read();
|
||||
|
||||
EphemeralIndexCacheStats {
|
||||
indexed_paths: indexed.len(),
|
||||
indexing_in_progress: in_progress.len(),
|
||||
watched_paths: watched.len(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -222,7 +310,8 @@ pub struct EphemeralIndexCacheStats {
|
||||
pub indexed_paths: usize,
|
||||
/// Number of paths currently being indexed
|
||||
pub indexing_in_progress: usize,
|
||||
// Legacy field names for compatibility
|
||||
/// Number of paths registered for filesystem watching
|
||||
pub watched_paths: usize,
|
||||
}
|
||||
|
||||
impl EphemeralIndexCacheStats {
|
||||
@ -328,4 +417,49 @@ mod tests {
|
||||
assert_eq!(stats.indexed_paths, 1);
|
||||
assert_eq!(stats.indexing_in_progress, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watch_registration() {
|
||||
let cache = EphemeralIndexCache::new().expect("failed to create cache");
|
||||
let path = PathBuf::from("/test/watched");
|
||||
|
||||
// Can't watch a path that's not indexed
|
||||
assert!(!cache.register_for_watching(path.clone()));
|
||||
assert!(!cache.is_watched(&path));
|
||||
|
||||
// Index the path first
|
||||
let _index = cache.create_for_indexing(path.clone());
|
||||
cache.mark_indexing_complete(&path);
|
||||
|
||||
// Now we can register for watching
|
||||
assert!(cache.register_for_watching(path.clone()));
|
||||
assert!(cache.is_watched(&path));
|
||||
|
||||
// Stats should reflect watched path
|
||||
let stats = cache.stats();
|
||||
assert_eq!(stats.watched_paths, 1);
|
||||
|
||||
// Unregister
|
||||
cache.unregister_from_watching(&path);
|
||||
assert!(!cache.is_watched(&path));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_watched_root() {
|
||||
let cache = EphemeralIndexCache::new().expect("failed to create cache");
|
||||
|
||||
let root = PathBuf::from("/mnt/nas");
|
||||
let child = PathBuf::from("/mnt/nas/documents/report.pdf");
|
||||
|
||||
// Index and watch the root
|
||||
let _index = cache.create_for_indexing(root.clone());
|
||||
cache.mark_indexing_complete(&root);
|
||||
cache.register_for_watching(root.clone());
|
||||
|
||||
// Child path should find the watched root
|
||||
assert_eq!(cache.find_watched_root(&child), Some(root.clone()));
|
||||
|
||||
// Unrelated path should not find a root
|
||||
assert_eq!(cache.find_watched_root(Path::new("/other/path")), None);
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,6 +41,7 @@ pub mod arena;
|
||||
pub mod cache;
|
||||
pub mod index_cache;
|
||||
pub mod registry;
|
||||
pub mod responder;
|
||||
pub mod types;
|
||||
|
||||
// Re-export public types
|
||||
|
||||
124
core/src/ops/indexing/ephemeral/responder.rs
Normal file
124
core/src/ops/indexing/ephemeral/responder.rs
Normal file
@ -0,0 +1,124 @@
|
||||
//! Ephemeral responder for updating in-memory indexes on filesystem changes.
|
||||
//!
|
||||
//! This module processes filesystem events against the ephemeral index cache.
|
||||
//! When a user is browsing an ephemeral directory (external drive, network share)
|
||||
//! and files change, the responder updates the in-memory index to reflect changes.
|
||||
//!
|
||||
//! ## Usage
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! use sd_core::ops::indexing::ephemeral::responder;
|
||||
//!
|
||||
//! // Check if an event should be handled by the ephemeral system
|
||||
//! if let Some(root) = responder::find_ephemeral_root(&path, &context) {
|
||||
//! responder::process_event(&context, &root, event_kind).await?;
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
use crate::context::CoreContext;
|
||||
use crate::infra::event::FsRawEventKind;
|
||||
use crate::ops::indexing::handler::{self, ChangeConfig, EphemeralChangeHandler};
|
||||
use crate::ops::indexing::rules::RuleToggles;
|
||||
use anyhow::Result;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Check if a path falls under an ephemeral watched directory.
|
||||
///
|
||||
/// Returns the watched root path if found.
|
||||
pub fn find_ephemeral_root(path: &Path, context: &CoreContext) -> Option<PathBuf> {
|
||||
context.ephemeral_cache().find_watched_root(path)
|
||||
}
|
||||
|
||||
/// Check if any path in a batch of events falls under an ephemeral watched directory.
|
||||
pub fn find_ephemeral_root_for_events(
|
||||
events: &[FsRawEventKind],
|
||||
context: &CoreContext,
|
||||
) -> Option<PathBuf> {
|
||||
let paths: Vec<&Path> = events
|
||||
.iter()
|
||||
.flat_map(|e| match e {
|
||||
FsRawEventKind::Create { path } => vec![path.as_path()],
|
||||
FsRawEventKind::Modify { path } => vec![path.as_path()],
|
||||
FsRawEventKind::Remove { path } => vec![path.as_path()],
|
||||
FsRawEventKind::Rename { from, to } => vec![from.as_path(), to.as_path()],
|
||||
})
|
||||
.collect();
|
||||
|
||||
context
|
||||
.ephemeral_cache()
|
||||
.find_watched_root_for_any(paths.into_iter())
|
||||
}
|
||||
|
||||
/// Process a batch of filesystem events against the ephemeral index.
|
||||
///
|
||||
/// Creates an `EphemeralChangeHandler` and processes the events using shared
|
||||
/// handler logic. The ephemeral index is updated in-place and ResourceChanged
|
||||
/// events are emitted for UI updates.
|
||||
pub async fn apply_batch(
|
||||
context: &Arc<CoreContext>,
|
||||
root_path: &Path,
|
||||
events: Vec<FsRawEventKind>,
|
||||
rule_toggles: RuleToggles,
|
||||
) -> Result<()> {
|
||||
if events.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let index = context.ephemeral_cache().get_global_index();
|
||||
let event_bus = context.events.clone();
|
||||
|
||||
let mut handler = EphemeralChangeHandler::new(index, event_bus, root_path.to_path_buf());
|
||||
|
||||
let config = ChangeConfig {
|
||||
rule_toggles,
|
||||
location_root: root_path,
|
||||
volume_backend: None, // Ephemeral paths typically don't use volume backends
|
||||
};
|
||||
|
||||
handler::apply_batch(&mut handler, events, &config).await
|
||||
}
|
||||
|
||||
/// Process a single filesystem event against the ephemeral index.
|
||||
pub async fn apply(
|
||||
context: &Arc<CoreContext>,
|
||||
root_path: &Path,
|
||||
event: FsRawEventKind,
|
||||
rule_toggles: RuleToggles,
|
||||
) -> Result<()> {
|
||||
apply_batch(context, root_path, vec![event], rule_toggles).await
|
||||
}
|
||||
|
||||
/// Register an ephemeral path for filesystem watching.
|
||||
///
|
||||
/// After calling this, filesystem events under the path will be detectable
|
||||
/// via `find_ephemeral_root`. The path must already be indexed in the
|
||||
/// ephemeral cache.
|
||||
///
|
||||
/// Returns true if registration succeeded, false if the path is not indexed.
|
||||
pub fn register_for_watching(context: &CoreContext, path: PathBuf) -> bool {
|
||||
context.ephemeral_cache().register_for_watching(path)
|
||||
}
|
||||
|
||||
/// Unregister an ephemeral path from filesystem watching.
|
||||
pub fn unregister_from_watching(context: &CoreContext, path: &Path) {
|
||||
context.ephemeral_cache().unregister_from_watching(path)
|
||||
}
|
||||
|
||||
/// Check if any ephemeral paths are being watched.
|
||||
pub fn has_watched_paths(context: &CoreContext) -> bool {
|
||||
!context.ephemeral_cache().watched_paths().is_empty()
|
||||
}
|
||||
|
||||
/// Get all currently watched ephemeral paths.
|
||||
pub fn watched_paths(context: &CoreContext) -> Vec<PathBuf> {
|
||||
context.ephemeral_cache().watched_paths()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// Integration tests would require a full CoreContext setup
|
||||
// Unit tests for the helper functions are covered by index_cache tests
|
||||
}
|
||||
1447
core/src/ops/indexing/handler.rs
Normal file
1447
core/src/ops/indexing/handler.rs
Normal file
File diff suppressed because it is too large
Load Diff
@ -620,6 +620,47 @@ impl EphemeralIndex {
|
||||
self.path_index.len()
|
||||
}
|
||||
|
||||
/// Check if an entry exists at the given path.
|
||||
pub fn has_entry(&self, path: &Path) -> bool {
|
||||
self.path_index.contains_key(path)
|
||||
}
|
||||
|
||||
/// Remove an entry at the given path.
|
||||
///
|
||||
/// Returns true if the entry was removed, false if it didn't exist.
|
||||
/// For directories, this only removes the directory entry itself, not its children.
|
||||
/// Use `remove_directory_tree` to remove a directory and all its descendants.
|
||||
pub fn remove_entry(&mut self, path: &Path) -> bool {
|
||||
let existed = self.path_index.remove(path).is_some();
|
||||
self.entry_uuids.remove(path);
|
||||
self.content_kinds.remove(path);
|
||||
existed
|
||||
}
|
||||
|
||||
/// Remove a directory and all its descendants.
|
||||
///
|
||||
/// Returns the number of entries removed.
|
||||
pub fn remove_directory_tree(&mut self, path: &Path) -> usize {
|
||||
let prefix = path.to_string_lossy().to_string();
|
||||
let keys_to_remove: Vec<_> = self
|
||||
.path_index
|
||||
.keys()
|
||||
.filter(|k| {
|
||||
let k_str = k.to_string_lossy();
|
||||
k_str == prefix || k_str.starts_with(&format!("{}/", prefix))
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let count = keys_to_remove.len();
|
||||
for key in keys_to_remove {
|
||||
self.path_index.remove(&key);
|
||||
self.entry_uuids.remove(&key);
|
||||
self.content_kinds.remove(&key);
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
/// Reconstructs paths for all entries and returns them as a HashMap.
|
||||
///
|
||||
/// For large indexes, this can be expensive since it walks the tree to rebuild
|
||||
|
||||
@ -25,6 +25,7 @@ pub mod change_detection;
|
||||
pub mod ctx;
|
||||
pub mod entry;
|
||||
pub mod ephemeral;
|
||||
pub mod handler;
|
||||
pub mod hierarchy;
|
||||
pub mod input;
|
||||
pub mod job;
|
||||
@ -43,6 +44,10 @@ pub use action::IndexingAction;
|
||||
pub use ctx::{IndexingCtx, ResponderCtx};
|
||||
pub use entry::{EntryMetadata, EntryProcessor};
|
||||
pub use ephemeral::EphemeralIndexCache;
|
||||
pub use handler::{
|
||||
apply_batch as apply_change_batch, ChangeConfig, ChangeHandler, ChangeType, EntryRef,
|
||||
EphemeralChangeHandler, PersistentChangeHandler,
|
||||
};
|
||||
pub use hierarchy::HierarchyQuery;
|
||||
pub use input::IndexInput;
|
||||
pub use job::{
|
||||
|
||||
@ -139,6 +139,8 @@ pub struct LocationWatcher {
|
||||
context: Arc<CoreContext>,
|
||||
/// Currently watched locations
|
||||
watched_locations: Arc<RwLock<HashMap<Uuid, WatchedLocation>>>,
|
||||
/// Ephemeral watches (shallow, non-recursive) keyed by path
|
||||
ephemeral_watches: Arc<RwLock<HashMap<PathBuf, EphemeralWatch>>>,
|
||||
/// File system watcher
|
||||
watcher: Arc<RwLock<Option<RecommendedWatcher>>>,
|
||||
/// Whether the service is running
|
||||
@ -170,6 +172,15 @@ pub struct WatchedLocation {
|
||||
pub rule_toggles: crate::ops::indexing::rules::RuleToggles,
|
||||
}
|
||||
|
||||
/// Information about an ephemeral watch (shallow, non-recursive)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EphemeralWatch {
|
||||
/// Path being watched
|
||||
pub path: PathBuf,
|
||||
/// Indexing rule toggles for filtering events
|
||||
pub rule_toggles: crate::ops::indexing::rules::RuleToggles,
|
||||
}
|
||||
|
||||
impl LocationWatcher {
|
||||
/// Create a new location watcher
|
||||
pub fn new(
|
||||
@ -184,6 +195,7 @@ impl LocationWatcher {
|
||||
events,
|
||||
context,
|
||||
watched_locations: Arc::new(RwLock::new(HashMap::new())),
|
||||
ephemeral_watches: Arc::new(RwLock::new(HashMap::new())),
|
||||
watcher: Arc::new(RwLock::new(None)),
|
||||
is_running: Arc::new(RwLock::new(false)),
|
||||
platform_handler,
|
||||
@ -508,6 +520,135 @@ impl LocationWatcher {
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Ephemeral Watch Support (shallow, non-recursive)
|
||||
// ========================================================================
|
||||
|
||||
/// Add an ephemeral watch for a directory (shallow, immediate children only).
|
||||
///
|
||||
/// Unlike location watches which are recursive, ephemeral watches only monitor
|
||||
/// immediate children of the watched directory. This is appropriate for ephemeral
|
||||
/// browsing where only the current directory's contents are indexed.
|
||||
///
|
||||
/// The path should already be indexed in the ephemeral cache before calling this.
|
||||
pub async fn add_ephemeral_watch(
|
||||
&self,
|
||||
path: PathBuf,
|
||||
rule_toggles: crate::ops::indexing::rules::RuleToggles,
|
||||
) -> Result<()> {
|
||||
// Check if path is valid
|
||||
if !path.exists() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Cannot watch non-existent path: {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
if !path.is_dir() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Cannot watch non-directory path: {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
// Check if already watching
|
||||
{
|
||||
let watches = self.ephemeral_watches.read().await;
|
||||
if watches.contains_key(&path) {
|
||||
debug!("Already watching ephemeral path: {}", path.display());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Register in ephemeral cache
|
||||
self.context
|
||||
.ephemeral_cache()
|
||||
.register_for_watching(path.clone());
|
||||
|
||||
// Add to our tracking
|
||||
{
|
||||
let mut watches = self.ephemeral_watches.write().await;
|
||||
watches.insert(
|
||||
path.clone(),
|
||||
EphemeralWatch {
|
||||
path: path.clone(),
|
||||
rule_toggles,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Add to file system watcher with NonRecursive mode
|
||||
if *self.is_running.read().await {
|
||||
if let Some(watcher) = self.watcher.write().await.as_mut() {
|
||||
watcher.watch(&path, RecursiveMode::NonRecursive)?;
|
||||
info!("Started shallow ephemeral watch for: {}", path.display());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove an ephemeral watch
|
||||
pub async fn remove_ephemeral_watch(&self, path: &Path) -> Result<()> {
|
||||
let watch = {
|
||||
let mut watches = self.ephemeral_watches.write().await;
|
||||
watches.remove(path)
|
||||
};
|
||||
|
||||
if let Some(watch) = watch {
|
||||
// Unregister from ephemeral cache
|
||||
self.context
|
||||
.ephemeral_cache()
|
||||
.unregister_from_watching(&watch.path);
|
||||
|
||||
// Remove from file system watcher
|
||||
if *self.is_running.read().await {
|
||||
if let Some(watcher) = self.watcher.write().await.as_mut() {
|
||||
if let Err(e) = watcher.unwatch(&watch.path) {
|
||||
warn!(
|
||||
"Failed to unwatch ephemeral path {}: {}",
|
||||
watch.path.display(),
|
||||
e
|
||||
);
|
||||
} else {
|
||||
info!("Stopped ephemeral watch for: {}", watch.path.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all ephemeral watches
|
||||
pub async fn get_ephemeral_watches(&self) -> Vec<EphemeralWatch> {
|
||||
self.ephemeral_watches
|
||||
.read()
|
||||
.await
|
||||
.values()
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Check if a path has an ephemeral watch
|
||||
pub async fn has_ephemeral_watch(&self, path: &Path) -> bool {
|
||||
self.ephemeral_watches.read().await.contains_key(path)
|
||||
}
|
||||
|
||||
/// Find the ephemeral watch that covers a given path (if any).
|
||||
///
|
||||
/// For shallow watches, only returns a match if the path is an immediate
|
||||
/// child of a watched directory.
|
||||
pub async fn find_ephemeral_watch_for_path(&self, path: &Path) -> Option<EphemeralWatch> {
|
||||
let watches = self.ephemeral_watches.read().await;
|
||||
|
||||
// Get the parent directory of the event path
|
||||
let parent = path.parent()?;
|
||||
|
||||
// Check if the parent is being watched
|
||||
watches.get(parent).cloned()
|
||||
}
|
||||
|
||||
/// Load existing locations from the database and add them to the watcher
|
||||
async fn load_existing_locations(&self) -> Result<()> {
|
||||
info!("Loading existing locations from database...");
|
||||
@ -674,11 +815,13 @@ impl LocationWatcher {
|
||||
async fn start_event_loop(&self) -> Result<()> {
|
||||
let platform_handler = self.platform_handler.clone();
|
||||
let watched_locations = self.watched_locations.clone();
|
||||
let ephemeral_watches = self.ephemeral_watches.clone();
|
||||
let workers = self.workers.clone();
|
||||
let is_running = self.is_running.clone();
|
||||
let debug_mode = self.config.debug_mode;
|
||||
let metrics = self.metrics.clone();
|
||||
let events = self.events.clone();
|
||||
let context = self.context.clone();
|
||||
|
||||
let (tx, mut rx) = mpsc::channel(self.config.event_buffer_size);
|
||||
let tx_clone = tx.clone();
|
||||
@ -731,6 +874,17 @@ impl LocationWatcher {
|
||||
}
|
||||
drop(locations);
|
||||
|
||||
// Watch all ephemeral paths (non-recursive/shallow)
|
||||
let ephemeral = ephemeral_watches.read().await;
|
||||
for watch in ephemeral.values() {
|
||||
watcher.watch(&watch.path, RecursiveMode::NonRecursive)?;
|
||||
info!(
|
||||
"Started shallow ephemeral watch for: {}",
|
||||
watch.path.display()
|
||||
);
|
||||
}
|
||||
drop(ephemeral);
|
||||
|
||||
// Store watcher
|
||||
*self.watcher.write().await = Some(watcher);
|
||||
|
||||
@ -762,6 +916,46 @@ impl LocationWatcher {
|
||||
FsRawEventKind::Rename { from, .. } => Some(from.as_path()),
|
||||
};
|
||||
|
||||
// First, check if this is an ephemeral watch event
|
||||
// For shallow watches, only process if path is immediate child
|
||||
let mut handled_by_ephemeral = false;
|
||||
if let Some(event_path) = event_path {
|
||||
let parent = event_path.parent();
|
||||
if let Some(parent_path) = parent {
|
||||
let ephemeral = ephemeral_watches.read().await;
|
||||
if let Some(watch) = ephemeral.get(parent_path) {
|
||||
debug!(
|
||||
"Ephemeral watch match for {}: parent {} is watched",
|
||||
event_path.display(),
|
||||
parent_path.display()
|
||||
);
|
||||
handled_by_ephemeral = true;
|
||||
|
||||
// Process via ephemeral handler
|
||||
let ctx = context.clone();
|
||||
let root = watch.path.clone();
|
||||
let toggles = watch.rule_toggles;
|
||||
let event_kind = kind.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::ops::indexing::ephemeral::responder::apply(
|
||||
&ctx,
|
||||
&root,
|
||||
event_kind,
|
||||
toggles,
|
||||
).await {
|
||||
warn!("Failed to process ephemeral event: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Skip location matching if handled by ephemeral
|
||||
if handled_by_ephemeral {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find the location for this event by matching path prefix
|
||||
// CRITICAL: Must match by path, not just library_id, to avoid routing
|
||||
// events to the wrong location when multiple locations exist in one library
|
||||
@ -995,6 +1189,7 @@ impl LocationWatcher {
|
||||
events: events.clone(),
|
||||
context: context.clone(),
|
||||
watched_locations: watched_locations.clone(),
|
||||
ephemeral_watches: Arc::new(RwLock::new(HashMap::new())),
|
||||
watcher: watcher_ref.clone(),
|
||||
is_running: is_running.clone(),
|
||||
platform_handler: platform_handler.clone(),
|
||||
@ -1033,6 +1228,7 @@ impl LocationWatcher {
|
||||
events: events.clone(),
|
||||
context: context.clone(),
|
||||
watched_locations: watched_locations.clone(),
|
||||
ephemeral_watches: Arc::new(RwLock::new(HashMap::new())),
|
||||
watcher: watcher_ref.clone(),
|
||||
is_running: is_running.clone(),
|
||||
platform_handler: platform_handler.clone(),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user