Enhance event handling for resource deletion in ephemeral writer

- Updated the `delete` method in `MemoryAdapter` to emit a `ResourceDeleted` event, allowing the frontend to update its cache accordingly.
- Modified `CollectedCoreEvent` structure to include an `event_type` field for better event categorization.
- Enhanced event collection logic in `CoreEventCollector` to handle both `ResourceChanged` and `ResourceDeleted` events, improving event summary reporting.
This commit is contained in:
Jamie Pine 2025-12-09 17:09:47 -08:00
parent b16a015746
commit cf998edb03
2 changed files with 107 additions and 67 deletions

View File

@ -5,7 +5,7 @@
//! Both pipelines share the same entry storage logic, UUID generation, and event
//! emission, eliminating code duplication.
//!
use crate::infra::event::EventBus;
use crate::infra::event::{Event, EventBus};
use crate::infra::job::prelude::{JobError, JobResult};
use crate::ops::indexing::change_detection::handler::{build_dir_entry, ChangeHandler};
use crate::ops::indexing::change_detection::types::{ChangeType, EntryRef};
@ -212,6 +212,9 @@ impl ChangeHandler for MemoryAdapter {
}
async fn delete(&mut self, entry: &EntryRef) -> Result<()> {
// Get the UUID before deleting (needed for event)
let uuid = entry.uuid;
{
let mut index = self.index.write().await;
@ -222,6 +225,19 @@ impl ChangeHandler for MemoryAdapter {
}
}
// Emit ResourceDeleted event so frontend can remove from cache
if let Some(resource_id) = uuid {
tracing::debug!(
"Emitting ResourceDeleted for ephemeral delete: {} (id: {})",
entry.path.display(),
resource_id
);
self.event_bus.emit(Event::ResourceDeleted {
resource_type: "file".to_string(),
resource_id,
});
}
Ok(())
}

View File

@ -128,6 +128,7 @@ impl FsEventCollector {
/// Collected core event with timestamp and extracted info
struct CollectedCoreEvent {
timestamp: std::time::Instant,
event_type: String, // "ResourceChanged" or "ResourceDeleted"
resource_type: String,
// Extracted fields
id: Option<String>,
@ -163,59 +164,85 @@ impl CoreEventCollector {
loop {
match subscriber.recv().await {
Ok(event) => {
if let Event::ResourceChanged {
resource_type,
resource,
..
} = event
{
// Extract various fields from resource
let id = resource
.get("id")
.and_then(|v| v.as_str())
.map(String::from);
let name = resource
.get("name")
.and_then(|v| v.as_str())
.map(String::from);
let extension = resource
.get("extension")
.and_then(|v| v.as_str())
.map(String::from);
let content_kind = resource
.get("content_kind")
.and_then(|v| v.as_str())
.map(String::from);
// Check if it's a directory from "kind" field
let is_dir = resource
.get("kind")
.and_then(|v| v.as_str())
.map(|k| k == "Directory");
let size = resource.get("size").and_then(|v| v.as_u64());
// Store compact JSON representation
let resource_json =
serde_json::to_string_pretty(&resource).unwrap_or_default();
let collected = CollectedCoreEvent {
timestamp: std::time::Instant::now(),
match event {
Event::ResourceChanged {
resource_type,
id,
name,
extension,
content_kind,
is_dir,
size,
resource_json,
};
resource,
..
} => {
// Extract various fields from resource
let id = resource
.get("id")
.and_then(|v| v.as_str())
.map(String::from);
let mut events_lock = events.lock().await;
events_lock.push(collected);
let name = resource
.get("name")
.and_then(|v| v.as_str())
.map(String::from);
let extension = resource
.get("extension")
.and_then(|v| v.as_str())
.map(String::from);
let content_kind = resource
.get("content_kind")
.and_then(|v| v.as_str())
.map(String::from);
// Check if it's a directory from "kind" field
let is_dir = resource
.get("kind")
.and_then(|v| v.as_str())
.map(|k| k == "Directory");
let size = resource.get("size").and_then(|v| v.as_u64());
// Store compact JSON representation
let resource_json =
serde_json::to_string_pretty(&resource).unwrap_or_default();
let collected = CollectedCoreEvent {
timestamp: std::time::Instant::now(),
event_type: "ResourceChanged".to_string(),
resource_type,
id,
name,
extension,
content_kind,
is_dir,
size,
resource_json,
};
let mut events_lock = events.lock().await;
events_lock.push(collected);
}
Event::ResourceDeleted {
resource_type,
resource_id,
} => {
let collected = CollectedCoreEvent {
timestamp: std::time::Instant::now(),
event_type: "ResourceDeleted".to_string(),
resource_type,
id: Some(resource_id.to_string()),
name: None,
extension: None,
content_kind: None,
is_dir: None,
size: None,
resource_json: format!(
"{{\"deleted_id\": \"{}\"}}",
resource_id
),
};
let mut events_lock = events.lock().await;
events_lock.push(collected);
}
_ => {} // Ignore other event types
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
@ -243,9 +270,10 @@ impl CoreEventCollector {
writeln!(file, "{}", "-".repeat(70))?;
writeln!(
file,
"[{:03}] +{:.3}s | ResourceChanged ({})",
"[{:03}] +{:.3}s | {} ({})",
i,
elapsed.as_secs_f64(),
event.event_type,
event.resource_type
)?;
writeln!(file, "{}", "-".repeat(70))?;
@ -305,26 +333,22 @@ impl CoreEventCollector {
let events = self.events.lock().await;
println!("\n=== Core Event Summary ===");
println!("Total ResourceChanged events: {}", events.len());
println!("Total events: {}", events.len());
// Count by resource type
let mut files = 0;
let mut dirs = 0;
let mut other = 0;
// Count by event type
let mut changed = 0;
let mut deleted = 0;
for event in events.iter() {
match event.resource_type.as_str() {
"file" => files += 1,
"directory" => dirs += 1,
_ => other += 1,
match event.event_type.as_str() {
"ResourceChanged" => changed += 1,
"ResourceDeleted" => deleted += 1,
_ => {}
}
}
println!(" Files: {}", files);
println!(" Directories: {}", dirs);
if other > 0 {
println!(" Other: {}", other);
}
println!(" ResourceChanged: {}", changed);
println!(" ResourceDeleted: {}", deleted);
println!("==========================\n");
}
}