Enhance EphemeralIndex entry removal and introduce event collectors

- Updated `remove_entry` and `remove_directory_tree` methods in `EphemeralIndex` to ensure proper removal of entries from parent nodes' children lists.
- Introduced `FsEventCollector` and `CoreEventCollector` for improved event collection from the filesystem watcher and core event bus, respectively.
- Enhanced test harness to utilize the new collectors, ensuring accurate event logging and summary reporting.
- Added scenarios covering the delete + restore (trash/undo) pattern and the rapid create-and-write (screenshot) pattern, verifying both entry counts and `list_directory` children counts to guard against duplicate and orphaned entries.
This commit is contained in:
Jamie Pine 2025-12-09 17:01:49 -08:00
parent 1adabc7999
commit b16a015746
2 changed files with 546 additions and 23 deletions

View File

@ -448,16 +448,33 @@ impl EphemeralIndex {
/// For directories, this only removes the directory entry itself, not its children.
/// Use `remove_directory_tree` to remove a directory and all its descendants.
pub fn remove_entry(&mut self, path: &Path) -> bool {
let existed = self.path_index.remove(path).is_some();
// Get the entry ID before removing from path_index
let entry_id = self.path_index.remove(path);
self.entry_uuids.remove(path);
self.content_kinds.remove(path);
existed
// Also remove from parent's children list in arena
if let Some(id) = entry_id {
// Get the parent's entry ID
if let Some(parent_path) = path.parent() {
if let Some(&parent_id) = self.path_index.get(parent_path) {
if let Some(parent_node) = self.arena.get_mut(parent_id) {
parent_node.children.retain(|child_id| *child_id != id);
}
}
}
}
entry_id.is_some()
}
/// Remove a directory and all its descendants.
///
/// Returns the number of entries removed.
pub fn remove_directory_tree(&mut self, path: &Path) -> usize {
// First, get the entry ID for the root directory to remove from parent
let root_id = self.path_index.get(path).copied();
let prefix = path.to_string_lossy().to_string();
let keys_to_remove: Vec<_> = self
.path_index
@ -475,6 +492,18 @@ impl EphemeralIndex {
self.entry_uuids.remove(&key);
self.content_kinds.remove(&key);
}
// Remove root directory from parent's children list
if let Some(id) = root_id {
if let Some(parent_path) = path.parent() {
if let Some(&parent_id) = self.path_index.get(parent_path) {
if let Some(parent_node) = self.arena.get_mut(parent_id) {
parent_node.children.retain(|child_id| *child_id != id);
}
}
}
}
count
}

View File

@ -6,6 +6,7 @@
use sd_core::{
context::CoreContext,
infra::event::Event,
library::Library,
ops::indexing::{
job::{IndexScope, IndexerJob, IndexerJobConfig},
@ -21,16 +22,16 @@ use tokio::sync::Mutex;
use tokio::time::timeout;
// ============================================================================
// Event Collector for Debugging
// FsWatcher Event Collector (raw filesystem events)
// ============================================================================
/// Collects FsEvents for diagnostic output
struct EventCollector {
/// Collects FsEvents from the watcher for diagnostic output
struct FsEventCollector {
events: Arc<Mutex<Vec<(std::time::Instant, FsEvent)>>>,
start_time: std::time::Instant,
}
impl EventCollector {
impl FsEventCollector {
fn new() -> Self {
Self {
events: Arc::new(Mutex::new(Vec::new())),
@ -52,7 +53,7 @@ impl EventCollector {
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
eprintln!("Event collector lagged by {} events", n);
eprintln!("FsEvent collector lagged by {} events", n);
}
}
}
@ -95,7 +96,7 @@ impl EventCollector {
async fn print_summary(&self) {
let events = self.events.lock().await;
println!("\n=== Event Summary ===");
println!("\n=== FsWatcher Event Summary ===");
println!("Total events: {}", events.len());
let mut creates = 0;
@ -116,7 +117,215 @@ impl EventCollector {
println!(" Modifies: {}", modifies);
println!(" Removes: {}", removes);
println!(" Renames: {}", renames);
println!("===================\n");
println!("===============================\n");
}
}
// ============================================================================
// Core Event Collector (ResourceChanged events from event bus)
// ============================================================================
/// Collected core event with timestamp and extracted info
struct CollectedCoreEvent {
    /// Instant the event was observed; rendered as a +elapsed offset in dumps.
    timestamp: std::time::Instant,
    /// `resource_type` carried by the ResourceChanged event (e.g. "file", "directory").
    resource_type: String,
    // Extracted fields (each is None when absent from the resource JSON)
    /// "id" field of the resource, as a string.
    id: Option<String>,
    /// "name" field of the resource.
    name: Option<String>,
    /// "extension" field of the resource (without leading dot).
    extension: Option<String>,
    /// "content_kind" field of the resource.
    content_kind: Option<String>,
    /// Some(true) when the resource's "kind" field equals "Directory".
    is_dir: Option<bool>,
    /// "size" field of the resource, in bytes.
    size: Option<u64>,
    // Full resource JSON for detailed inspection (pretty-printed)
    resource_json: String,
}
/// Collects ResourceChanged events from the Core's event bus
struct CoreEventCollector {
    /// Shared event log; appended to by the background task spawned in
    /// `start_collecting`, read by the dump/summary methods.
    events: Arc<Mutex<Vec<CollectedCoreEvent>>>,
    /// Collector creation time; baseline for relative timestamps in dumps.
    start_time: std::time::Instant,
}
impl CoreEventCollector {
    /// Create an empty collector; `start_time` anchors the relative
    /// timestamps shown in dumps.
    fn new() -> Self {
        Self {
            events: Arc::new(Mutex::new(Vec::new())),
            start_time: std::time::Instant::now(),
        }
    }

    /// Start collecting events from the core's event bus.
    ///
    /// Spawns a detached task that subscribes to the bus and records every
    /// `ResourceChanged` event until the channel closes. A lagged receiver is
    /// reported on stderr and collection continues.
    fn start_collecting(&self, core: &Core) {
        let events = self.events.clone();
        let mut subscriber = core.events.subscribe();

        tokio::spawn(async move {
            loop {
                match subscriber.recv().await {
                    Ok(event) => {
                        if let Event::ResourceChanged {
                            resource_type,
                            resource,
                            ..
                        } = event
                        {
                            // Extract the fields we summarize from the resource JSON.
                            let id = resource
                                .get("id")
                                .and_then(|v| v.as_str())
                                .map(String::from);
                            let name = resource
                                .get("name")
                                .and_then(|v| v.as_str())
                                .map(String::from);
                            let extension = resource
                                .get("extension")
                                .and_then(|v| v.as_str())
                                .map(String::from);
                            let content_kind = resource
                                .get("content_kind")
                                .and_then(|v| v.as_str())
                                .map(String::from);
                            // Directory detection comes from the "kind" field.
                            let is_dir = resource
                                .get("kind")
                                .and_then(|v| v.as_str())
                                .map(|k| k == "Directory");
                            let size = resource.get("size").and_then(|v| v.as_u64());

                            // Keep a pretty-printed copy so dump_to_file can emit
                            // it indented line-by-line.
                            let resource_json =
                                serde_json::to_string_pretty(&resource).unwrap_or_default();

                            let collected = CollectedCoreEvent {
                                timestamp: std::time::Instant::now(),
                                resource_type,
                                id,
                                name,
                                extension,
                                content_kind,
                                is_dir,
                                size,
                                resource_json,
                            };

                            let mut events_lock = events.lock().await;
                            events_lock.push(collected);
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        eprintln!("Core event collector lagged by {} events", n);
                    }
                }
            }
        });
    }

    /// Dump collected events to a file.
    ///
    /// Writes a header, then one delimited section per event (summary line,
    /// optional id, and the full pretty-printed resource JSON).
    ///
    /// # Errors
    /// Returns any I/O error from creating or writing the file.
    async fn dump_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
        use std::io::Write;

        let events = self.events.lock().await;
        let mut file = std::fs::File::create(path)?;

        writeln!(file, "=== Core ResourceChanged Event Log ===")?;
        writeln!(file, "Total events collected: {}", events.len())?;
        writeln!(file)?;

        for (i, event) in events.iter().enumerate() {
            // Offset relative to collector creation, e.g. "+1.234s".
            let elapsed = event.timestamp.duration_since(self.start_time);
            writeln!(file, "{}", "-".repeat(70))?;
            writeln!(
                file,
                "[{:03}] +{:.3}s | ResourceChanged ({})",
                i,
                elapsed.as_secs_f64(),
                event.resource_type
            )?;
            writeln!(file, "{}", "-".repeat(70))?;

            // Summary line
            let name_str = event.name.as_deref().unwrap_or("?");
            let ext_str = event
                .extension
                .as_ref()
                .map(|e| format!(".{}", e))
                .unwrap_or_default();
            let kind_str = event.content_kind.as_deref().unwrap_or("unknown");
            let dir_str = if event.is_dir == Some(true) {
                " [DIR]"
            } else {
                ""
            };
            let size_str = event
                .size
                .map(|s| format!(" ({} bytes)", s))
                .unwrap_or_default();
            writeln!(
                file,
                " {} {}{}{}{} | kind: {}",
                if event.is_dir == Some(true) {
                    "📁"
                } else {
                    "📄"
                },
                name_str,
                ext_str,
                dir_str,
                size_str,
                kind_str
            )?;

            if let Some(ref id) = event.id {
                writeln!(file, " id: {}", id)?;
            }

            // Full JSON (indented)
            writeln!(file, "\n Full Resource JSON:")?;
            for line in event.resource_json.lines() {
                writeln!(file, " {}", line)?;
            }
            writeln!(file)?;
        }

        writeln!(file, "=== End of Core Event Log ===")?;
        Ok(())
    }

    /// Print a per-resource-type count summary to stdout.
    async fn print_summary(&self) {
        let events = self.events.lock().await;
        println!("\n=== Core Event Summary ===");
        println!("Total ResourceChanged events: {}", events.len());

        // Count by resource type
        let mut files = 0;
        let mut dirs = 0;
        let mut other = 0;
        for event in events.iter() {
            match event.resource_type.as_str() {
                "file" => files += 1,
                "directory" => dirs += 1,
                _ => other += 1,
            }
        }
        println!(" Files: {}", files);
        println!(" Directories: {}", dirs);
        if other > 0 {
            println!(" Other: {}", other);
        }
        println!("==========================\n");
    }
}
@ -132,7 +341,8 @@ struct TestHarness {
test_dir: PathBuf,
watcher: Arc<FsWatcherService>,
context: Arc<CoreContext>,
event_collector: EventCollector,
fs_event_collector: FsEventCollector,
core_event_collector: CoreEventCollector,
}
impl TestHarness {
@ -191,10 +401,15 @@ impl TestHarness {
watcher.start().await?;
println!("✓ Started watcher service");
// Create event collector and start collecting
let event_collector = EventCollector::new();
event_collector.start_collecting(&watcher);
println!("✓ Started event collector");
// Create FsEvent collector and start collecting from watcher
let fs_event_collector = FsEventCollector::new();
fs_event_collector.start_collecting(&watcher);
println!("✓ Started FsEvent collector");
// Create Core event collector and start collecting from event bus
let core_event_collector = CoreEventCollector::new();
core_event_collector.start_collecting(&core);
println!("✓ Started Core event collector");
// Run ephemeral indexing job
let sd_path = sd_core::domain::addressing::SdPath::local(test_dir.clone());
@ -233,7 +448,8 @@ impl TestHarness {
test_dir,
watcher,
context,
event_collector,
fs_event_collector,
core_event_collector,
})
}
@ -309,6 +525,32 @@ impl TestHarness {
Ok(())
}
/// Move a file out of the watched directory into `trash_dir` (simulates
/// sending it to the trash).
async fn move_to_trash(
    &self,
    name: &str,
    trash_dir: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
    let dest = trash_dir.join(name);
    tokio::fs::rename(self.path(name), &dest).await?;
    println!("Moved to trash: {} -> {}", name, dest.display());
    Ok(())
}
/// Move a previously trashed file back into the watched directory
/// (simulates undo-delete / restore).
async fn restore_from_trash(
    &self,
    name: &str,
    trash_dir: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
    let src = trash_dir.join(name);
    tokio::fs::rename(&src, self.path(name)).await?;
    println!("Restored from trash: {} -> {}", src.display(), name);
    Ok(())
}
/// Create multiple directories at the top level
async fn create_batch_dirs(&self, dirs: &[&str]) -> Result<(), Box<dyn std::error::Error>> {
for dir in dirs {
@ -422,6 +664,35 @@ impl TestHarness {
.count()
}
/// Children count as reported by `list_directory` (the same call the UI makes).
/// Critical check: this consults the arena's children list, not `path_index`.
async fn get_children_count(&self) -> usize {
    let index = self.context.ephemeral_cache().get_global_index();
    let guard = index.read().await;
    guard
        .list_directory(&self.test_dir)
        .map(|children| children.len())
        .unwrap_or_default()
}
/// Assert the `list_directory` children count equals `expected`
/// (catches the orphan bug that `path_index` counts alone would miss).
async fn verify_children_count(
    &self,
    expected: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let actual = self.get_children_count().await;
    // Guard clause: bail out with a descriptive error on mismatch.
    if actual != expected {
        return Err(format!(
            "Children count mismatch: expected {} but found {} (using list_directory)",
            expected, actual
        )
        .into());
    }
    println!("✓ Children count (list_directory) matches: {}", expected);
    Ok(())
}
/// Verify expected entry count
async fn verify_entry_count(&self, expected: usize) -> Result<(), Box<dyn std::error::Error>> {
let count = self.get_entry_count().await;
@ -483,17 +754,28 @@ impl TestHarness {
println!("=============================\n");
}
/// Dump collected events to file and print summary
/// Dump collected events to files and print summaries
async fn dump_events(&self) {
// Print summary to console
self.event_collector.print_summary().await;
// Print FsWatcher event summary
self.fs_event_collector.print_summary().await;
// Write detailed log to file
let log_path = std::env::temp_dir().join("ephemeral_watcher_events.log");
if let Err(e) = self.event_collector.dump_to_file(&log_path).await {
eprintln!("Failed to write event log: {}", e);
// Print Core event summary
self.core_event_collector.print_summary().await;
// Write FsWatcher events to file
let fs_log_path = std::env::temp_dir().join("ephemeral_watcher_fs_events.log");
if let Err(e) = self.fs_event_collector.dump_to_file(&fs_log_path).await {
eprintln!("Failed to write FsEvent log: {}", e);
} else {
println!("📝 Event log written to: {}", log_path.display());
println!("📝 FsEvent log written to: {}", fs_log_path.display());
}
// Write Core events to file
let core_log_path = std::env::temp_dir().join("ephemeral_watcher_core_events.log");
if let Err(e) = self.core_event_collector.dump_to_file(&core_log_path).await {
eprintln!("Failed to write Core event log: {}", e);
} else {
println!("📝 Core event log written to: {}", core_log_path.display());
}
}
@ -667,6 +949,218 @@ async fn run_test_scenarios(harness: &TestHarness) -> Result<(), Box<dyn std::er
println!("✓ All batch-deleted entries removed from index");
// ========================================================================
// Scenario 9: Delete + Undo (Restore) - Tests for duplicate entry bug
// ========================================================================
println!("\n--- Scenario 9: Delete + Undo (Restore) Pattern ---");
println!("This tests for the duplicate entry bug when files are restored after deletion");
// Create a temporary "trash" directory outside the watched directory
let trash_dir = std::env::temp_dir().join("sd_test_trash");
if trash_dir.exists() {
tokio::fs::remove_dir_all(&trash_dir).await?;
}
tokio::fs::create_dir_all(&trash_dir).await?;
println!("Created trash directory: {}", trash_dir.display());
// Create files to test delete + restore
let restore_files = [
"restore_test1.txt",
"restore_test2.txt",
"restore_test3.txt",
];
harness.create_batch_files(&restore_files).await?;
// Wait for creates to be processed
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify files are in the index
for file in &restore_files {
harness.verify_entry_exists(file).await?;
}
// Record entry count before delete (using both methods)
let count_before_delete = harness.get_entry_count().await;
let children_before_delete = harness.get_children_count().await;
println!("Entry count before delete: {}", count_before_delete);
println!(
"Children count (list_directory) before delete: {}",
children_before_delete
);
harness.dump_index_state().await;
// "Delete" files by moving to trash (simulates Finder trash)
println!("\nMoving files to trash (simulating delete)...");
for file in &restore_files {
harness.move_to_trash(file, &trash_dir).await?;
}
// Wait for deletes to be processed
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify files are removed from index
for file in &restore_files {
harness.verify_entry_not_exists(file).await?;
}
let count_after_delete = harness.get_entry_count().await;
let children_after_delete = harness.get_children_count().await;
println!("Entry count after delete: {}", count_after_delete);
println!(
"Children count (list_directory) after delete: {}",
children_after_delete
);
harness.dump_index_state().await;
// Verify count decreased by the number of deleted files
assert_eq!(
count_after_delete,
count_before_delete - restore_files.len(),
"Entry count should decrease by {} after delete",
restore_files.len()
);
// CRITICAL: Also verify children count using list_directory (like UI does)
assert_eq!(
children_after_delete,
children_before_delete - restore_files.len(),
"ORPHAN BUG: Children count (list_directory) should decrease by {} after delete. \
path_index shows {} but list_directory shows {}",
restore_files.len(),
count_after_delete,
children_after_delete
);
// "Restore" files by moving back from trash (simulates Undo delete)
println!("\nRestoring files from trash (simulating undo delete)...");
for file in &restore_files {
harness.restore_from_trash(file, &trash_dir).await?;
}
// Wait for restores to be processed
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify files are back in the index
for file in &restore_files {
harness.verify_entry_exists(file).await?;
}
let count_after_restore = harness.get_entry_count().await;
let children_after_restore = harness.get_children_count().await;
println!("Entry count after restore: {}", count_after_restore);
println!(
"Children count (list_directory) after restore: {}",
children_after_restore
);
harness.dump_index_state().await;
// THIS IS THE KEY ASSERTION - count should match the original
// If there's a duplicate entry bug, count_after_restore will be higher
assert_eq!(
count_after_restore, count_before_delete,
"DUPLICATE ENTRY BUG DETECTED: Entry count after restore ({}) should equal count before delete ({}). \
Expected {} entries but found {}. This indicates files were added without removing stale entries.",
count_after_restore,
count_before_delete,
count_before_delete,
count_after_restore
);
// CRITICAL: Also verify children count (this catches the orphan bug the UI sees)
assert_eq!(
children_after_restore, children_before_delete,
"ORPHAN BUG IN list_directory: Children count after restore ({}) should equal before delete ({}). \
path_index shows {} but list_directory shows {}. This is the bug you saw in the UI!",
children_after_restore,
children_before_delete,
count_after_restore,
children_after_restore
);
println!("✓ Delete + Restore pattern: entry count is correct (no duplicates)");
println!("✓ Delete + Restore pattern: children count (list_directory) is correct");
// Clean up trash directory
tokio::fs::remove_dir_all(&trash_dir).await?;
// ========================================================================
// Scenario 10: Screenshot Pattern (rapid create + write)
// ========================================================================
println!("\n--- Scenario 10: Screenshot Pattern (rapid create + write) ---");
println!("This tests for duplicate entries when file is created then immediately written");
let screenshot_file = "screenshot_test.png";
let screenshot_path = harness.path(screenshot_file);
// Record counts before
let children_before_screenshot = harness.get_children_count().await;
println!(
"Children count before screenshot: {}",
children_before_screenshot
);
// Simulate screenshot behavior (aggressive version):
// macOS creates file, writes header, then writes full data in rapid succession
// We'll do multiple rapid writes to try to trigger race conditions
// 1. Create with tiny content
tokio::fs::write(&screenshot_path, b"x").await?;
println!("Created with tiny content: {}", screenshot_file);
// 2. Immediate overwrites (no delay - trying to trigger race)
tokio::fs::write(&screenshot_path, b"xx").await?;
tokio::fs::write(&screenshot_path, b"xxx").await?;
// 3. Write "header"
let fake_png_header = vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A];
tokio::fs::write(&screenshot_path, &fake_png_header).await?;
println!("Wrote header ({} bytes)", fake_png_header.len());
// 4. Write full content (final state)
let mut full_content = fake_png_header.clone();
full_content.extend(vec![0u8; 50000]); // Simulate ~50KB image
tokio::fs::write(&screenshot_path, &full_content).await?;
println!(
"Wrote full content ({} bytes): {}",
full_content.len(),
screenshot_file
);
// Wait for watcher to process
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify only ONE entry exists
harness.verify_entry_exists(screenshot_file).await?;
let children_after_screenshot = harness.get_children_count().await;
println!(
"Children count after screenshot: {}",
children_after_screenshot
);
// Should have exactly ONE more entry (the screenshot file)
assert_eq!(
children_after_screenshot,
children_before_screenshot + 1,
"DUPLICATE ENTRY BUG: Screenshot pattern created {} entries instead of 1. \
Expected {} but got {} children",
children_after_screenshot - children_before_screenshot,
children_before_screenshot + 1,
children_after_screenshot
);
println!("✓ Screenshot pattern: only one entry created (no duplicates)");
// Clean up
harness.delete_file(screenshot_file).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
// Clean up test files for final state
for file in &restore_files {
harness.delete_file(file).await?;
}
tokio::time::sleep(Duration::from_millis(500)).await;
// ========================================================================
// Final State Verification
// ========================================================================