spacedrive/core/tests/resource_events_test.rs
Jamie Pine 69e825fe06 Revert sync to November 14th - Add new sync tests
1. Watermark Messages: - Nov 16: Simple - single my_state_watermark,
single state_watermark in response - Main: Complex - per-resource
my_resource_watermarks HashMap, resource counts, content hashes 2.
DataAvailableNotification: - Nov 16: Doesn't exist - Main: New message
type that triggers watermark exchanges 3. FK Mapper: - Nov 16: Sets
missing FKs to NULL, applies all records - Main: Marks records with
missing FKs as failed, filters them out 4. Watermark Storage (internal,
not in messages): - Nov 16: Single last_watermark per resource - Main:
Dual watermarks - cursor_watermark and
2025-11-24 14:27:32 -08:00

271 lines
7.4 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Test that verifies ResourceChanged events are emitted during indexing
//!
//! This test indexes a directory and collects all ResourceChanged events
//! to verify the normalized cache event system works end-to-end.
use sd_core::{
infra::{
db::entities,
event::{Event, EventSubscriber},
},
location::{create_location, IndexMode, LocationCreateArgs},
Core,
};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tempfile::TempDir;
use tokio::time::timeout;
use tracing::{info, warn};
/// Test fixture that tracks all ResourceChanged events
struct EventCollector {
events: Arc<tokio::sync::Mutex<Vec<Event>>>,
subscriber: EventSubscriber,
}
impl EventCollector {
fn new(event_bus: &Arc<sd_core::infra::event::EventBus>) -> Self {
Self {
events: Arc::new(tokio::sync::Mutex::new(Vec::new())),
subscriber: event_bus.subscribe(),
}
}
async fn collect_events(&mut self, duration: Duration) {
let events = self.events.clone();
let mut event_count = 0;
let mut batch_event_count = 0;
let timeout_result = timeout(duration, async {
loop {
match self.subscriber.recv().await {
Ok(event) => {
event_count += 1;
// Log the event
match &event {
Event::ResourceChanged { resource_type, .. } => {
eprintln!("Received ResourceChanged event: {}", resource_type);
}
Event::ResourceChangedBatch { resource_type, resources } => {
batch_event_count += 1;
let count = if let Some(arr) = resources.as_array() {
arr.len()
} else {
0
};
eprintln!("Received ResourceChangedBatch event #{}: {} ({} items)", batch_event_count, resource_type, count);
}
Event::IndexingCompleted { .. } => {
eprintln!("Indexing completed");
}
Event::JobCompleted { job_type, .. } => {
eprintln!("Job completed: {}", job_type);
}
_ => {}
}
// Store all events
events.lock().await.push(event);
}
Err(e) => {
eprintln!(" Event receive error: {:?}", e);
eprintln!(" This might indicate dropped events or channel overflow!");
break;
}
}
}
})
.await;
if timeout_result.is_err() {
eprintln!(" Event collection timed out (collected {} events, {} batch events)", event_count, batch_event_count);
}
}
async fn get_events(&self) -> Vec<Event> {
self.events.lock().await.clone()
}
/// Analyze collected events and return statistics
async fn analyze(&self) -> EventStats {
let events = self.events.lock().await;
let mut stats = EventStats::default();
for event in events.iter() {
match event {
Event::ResourceChanged { resource_type, .. } => {
*stats.resource_changed.entry(resource_type.clone()).or_insert(0) += 1;
}
Event::ResourceChangedBatch { resource_type, resources } => {
let count = if let Some(arr) = resources.as_array() {
arr.len()
} else {
1
};
*stats.resource_changed_batch.entry(resource_type.clone()).or_insert(0) += count;
}
Event::IndexingStarted { .. } => {
stats.indexing_started += 1;
}
Event::IndexingCompleted { .. } => {
stats.indexing_completed += 1;
}
Event::JobStarted { job_type, .. } => {
*stats.jobs_started.entry(job_type.clone()).or_insert(0) += 1;
}
Event::JobCompleted { job_type, .. } => {
*stats.jobs_completed.entry(job_type.clone()).or_insert(0) += 1;
}
_ => {}
}
}
stats
}
}
#[derive(Debug, Default)]
struct EventStats {
resource_changed: HashMap<String, usize>,
resource_changed_batch: HashMap<String, usize>,
indexing_started: usize,
indexing_completed: usize,
jobs_started: HashMap<String, usize>,
jobs_completed: HashMap<String, usize>,
}
impl EventStats {
fn print(&self) {
eprintln!("\nEvent Statistics:");
eprintln!("==================");
eprintln!("\nResourceChanged events:");
if self.resource_changed.is_empty() {
eprintln!(" (none)");
}
for (resource_type, count) in &self.resource_changed {
eprintln!(" {}{} events", resource_type, count);
}
eprintln!("\nResourceChangedBatch events:");
if self.resource_changed_batch.is_empty() {
eprintln!(" (none)");
}
for (resource_type, count) in &self.resource_changed_batch {
eprintln!(" {}{} resources", resource_type, count);
}
eprintln!("\nIndexing events:");
eprintln!(" Started: {}", self.indexing_started);
eprintln!(" Completed: {}", self.indexing_completed);
eprintln!("\n Job events:");
eprintln!(" Started:");
for (job_type, count) in &self.jobs_started {
eprintln!(" {}{}", job_type, count);
}
eprintln!(" Completed:");
for (job_type, count) in &self.jobs_completed {
eprintln!(" {}{}", job_type, count);
}
}
}
#[tokio::test]
async fn test_resource_events_during_indexing() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
eprintln!("\nStarting resource events test\n");
// Setup
let temp_dir = TempDir::new()?;
let core = Core::new(temp_dir.path().to_path_buf()).await?;
// Create library
let library = core
.libraries
.create_library("Resource Events Test Library", None, core.context.clone())
.await?;
eprintln!("Created test library");
// Use Desktop directory for real-world testing
let desktop_path = dirs::desktop_dir()
.expect("Could not find Desktop directory");
eprintln!("Using Desktop directory: {:?}", desktop_path);
// Register device
let db = library.db();
let device = core.device.to_device()?;
let device_record = match entities::device::Entity::find()
.filter(entities::device::Column::Uuid.eq(device.id))
.one(db.conn())
.await?
{
Some(existing) => existing,
None => {
let device_model: entities::device::ActiveModel = device.into();
device_model.insert(db.conn()).await?
}
};
// Start event collection
let event_bus = core.events.clone();
let collection_handle = {
let mut collector = EventCollector::new(&event_bus);
tokio::spawn(async move {
collector.collect_events(Duration::from_secs(60)).await;
collector
})
};
tokio::time::sleep(Duration::from_millis(100)).await;
// Create location and start indexing with Content mode
eprintln!("Starting Content mode indexing on Desktop...");
let location_args = LocationCreateArgs {
path: desktop_path.clone(),
name: Some("Desktop Test Location".to_string()),
index_mode: IndexMode::Content, // Content mode with hashing
};
let _location_db_id = create_location(
library.clone(),
&core.events,
location_args,
device_record.id,
)
.await?;
eprintln!("Waiting for indexing to complete (up to 2 minutes)...");
// Wait longer for Desktop indexing
tokio::time::sleep(Duration::from_secs(120)).await;
eprintln!("\nAnalyzing collected events...\n");
let collector = collection_handle.await.unwrap();
let stats = collector.analyze().await;
stats.print();
// Assertions
let total_events = collector.get_events().await.len();
eprintln!("\nTotal events received: {}", total_events);
let file_events = stats.resource_changed_batch.get("file").unwrap_or(&0);
eprintln!("File ResourceChangedBatch events: {}", file_events);
if *file_events > 0 {
eprintln!("\nSUCCESS: Normalized cache for Files is working!");
eprintln!(" Received {} file resource events during indexing", file_events);
} else {
eprintln!("\nFAIL: No file resource events received");
eprintln!(" The normalized cache system is not emitting file events");
}
Ok(())
}