From 0bae97746a4c2c921a6a79384e87e0bc30468e56 Mon Sep 17 00:00:00 2001
From: Jamie Pine
Date: Tue, 9 Dec 2025 19:41:23 -0800
Subject: [PATCH] Add EventBuffer for handling event subscriptions and race
 conditions

- Introduced a new EventBuffer struct to buffer recent events and manage time-based eviction.
- Implemented methods for adding events, retrieving matching events based on filters, and cleaning up expired events.
- Integrated EventBuffer into the RpcServer to handle subscription race conditions by replaying buffered events to new subscribers.
- Added a periodic cleanup task for the event buffer to prevent unbounded memory growth.
- Included unit tests to verify buffer size limits, time-based cleanup, and event filtering functionality.
---
 core/src/infra/daemon/event_buffer.rs | 252 ++++++++++++++++++++++++++
 core/src/infra/daemon/mod.rs          |   1 +
 core/src/infra/daemon/rpc.rs          |  41 ++++-
 3 files changed, 291 insertions(+), 3 deletions(-)
 create mode 100644 core/src/infra/daemon/event_buffer.rs

diff --git a/core/src/infra/daemon/event_buffer.rs b/core/src/infra/daemon/event_buffer.rs
new file mode 100644
index 000000000..ca250861b
--- /dev/null
+++ b/core/src/infra/daemon/event_buffer.rs
@@ -0,0 +1,252 @@
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::RwLock;
+
+use crate::infra::daemon::types::EventFilter;
+use crate::infra::event::Event;
+
+/// A buffered event with timestamp for time-based eviction
+#[derive(Debug, Clone)]
+struct BufferedEvent {
+    event: Arc<Event>,
+    timestamp: Instant,
+}
+
+/// Thread-safe event buffer with time-based eviction
+///
+/// Buffers recent events to handle subscription race conditions where events
+/// are emitted before subscriptions are created. When a new subscription is
+/// created, buffered events matching the subscription filter are replayed.
+pub struct EventBuffer {
+    events: Arc<RwLock<VecDeque<BufferedEvent>>>,
+    retention_duration: Duration,
+    max_size: usize,
+}
+
+impl EventBuffer {
+    /// Create a new event buffer with default settings
+    ///
+    /// - Retention: 5 seconds
+    /// - Max size: 100 events
+    pub fn new() -> Self {
+        Self {
+            events: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
+            retention_duration: Duration::from_secs(5),
+            max_size: 100,
+        }
+    }
+
+    /// Add an event to the buffer
+    ///
+    /// Events are wrapped in Arc to avoid expensive clones when replaying
+    /// to multiple subscriptions. If the buffer exceeds max_size, the oldest
+    /// events are evicted (FIFO).
+    pub async fn add_event(&self, event: Event) {
+        let mut events = self.events.write().await;
+
+        events.push_back(BufferedEvent {
+            event: Arc::new(event),
+            timestamp: Instant::now(),
+        });
+
+        // Enforce max size (evict oldest)
+        while events.len() > self.max_size {
+            events.pop_front();
+        }
+    }
+
+    /// Get buffered events that match the subscription filter
+    ///
+    /// Returns Arc-wrapped events to avoid cloning large event payloads.
+    /// Events are returned in chronological order (oldest first).
+    pub async fn get_matching_events(
+        &self,
+        event_types: &[String],
+        filter: &Option<EventFilter>,
+    ) -> Vec<Arc<Event>> {
+        let events = self.events.read().await;
+
+        events
+            .iter()
+            .filter_map(|buffered| {
+                if Self::matches_filter(&buffered.event, event_types, filter) {
+                    Some(Arc::clone(&buffered.event))
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
+    /// Remove events older than retention_duration
+    ///
+    /// Should be called periodically (e.g., every second) to prevent
+    /// unbounded memory growth.
+    pub async fn cleanup_expired(&self) {
+        let mut events = self.events.write().await;
+        let now = Instant::now();
+
+        events.retain(|buffered| now.duration_since(buffered.timestamp) < self.retention_duration);
+    }
+
+    /// Check if an event matches the subscription filter
+    ///
+    /// This is a copy of RpcServer::should_forward_event to avoid module coupling.
+    /// The logic must stay in sync with the main filtering implementation.
+    fn matches_filter(
+        event: &Event,
+        event_types: &[String],
+        filter: &Option<EventFilter>,
+    ) -> bool {
+        // If event_types is empty, forward all events
+        // Otherwise, treat event_types as an INCLUSION list (only forward these)
+        if !event_types.is_empty() {
+            let event_type = event.variant_name();
+
+            if !event_types.contains(&event_type.to_string()) {
+                return false;
+            }
+        }
+
+        // Apply additional filters if specified
+        if let Some(filter) = filter {
+            // Filter by resource type
+            if let Some(filter_resource_type) = &filter.resource_type {
+                if let Some(event_resource_type) = event.resource_type() {
+                    if event_resource_type != filter_resource_type {
+                        return false;
+                    }
+                } else {
+                    // Event is not a resource event, but filter expects one
+                    return false;
+                }
+            }
+
+            // Filter by path scope (for resource events)
+            if let Some(path_scope) = &filter.path_scope {
+                let include_descendants = filter.include_descendants.unwrap_or(false);
+                let affects = event.affects_path(path_scope, include_descendants);
+
+                if !affects {
+                    return false;
+                }
+            }
+
+            // Filter by job_id
+            match event {
+                Event::JobProgress { job_id, .. }
+                | Event::JobStarted { job_id, .. }
+                | Event::JobCompleted { job_id, .. }
+                | Event::JobFailed { job_id, .. }
+                | Event::JobCancelled { job_id, .. } => {
+                    if let Some(filter_job_id) = &filter.job_id {
+                        return job_id == filter_job_id;
+                    }
+                }
+                Event::LibraryCreated { id, .. }
+                | Event::LibraryOpened { id, .. }
+                | Event::LibraryClosed { id, .. } => {
+                    if let Some(filter_library_id) = &filter.library_id {
+                        return id == filter_library_id;
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::infra::event::Event;
+
+    #[tokio::test]
+    async fn test_buffer_size_limit() {
+        let buffer = EventBuffer::new();
+
+        // Add 150 events (exceeds max_size of 100)
+        for _ in 0..150 {
+            buffer
+                .add_event(Event::CoreStarted)
+                .await;
+        }
+
+        // Verify only the last 100 events are kept
+        let events = buffer.events.read().await;
+        assert_eq!(events.len(), 100);
+    }
+
+    #[tokio::test]
+    async fn test_time_based_cleanup() {
+        let buffer = EventBuffer {
+            events: Arc::new(RwLock::new(VecDeque::new())),
+            retention_duration: Duration::from_millis(100),
+            max_size: 100,
+        };
+
+        // Add event
+        buffer.add_event(Event::CoreStarted).await;
+
+        // Verify event exists
+        {
+            let events = buffer.events.read().await;
+            assert_eq!(events.len(), 1);
+        }
+
+        // Wait for expiration
+        tokio::time::sleep(Duration::from_millis(150)).await;
+
+        // Cleanup
+        buffer.cleanup_expired().await;
+
+        // Verify event was removed
+        let events = buffer.events.read().await;
+        assert_eq!(events.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_empty_filter_matches_all() {
+        let buffer = EventBuffer::new();
+
+        buffer.add_event(Event::CoreStarted).await;
+        buffer.add_event(Event::CoreShutdown).await;
+
+        // Empty filter should match all events
+        let matching = buffer.get_matching_events(&[], &None).await;
+        assert_eq!(matching.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_event_type_filtering() {
+        let buffer = EventBuffer::new();
+
+        buffer.add_event(Event::CoreStarted).await;
+        buffer.add_event(Event::CoreShutdown).await;
+
+        // Filter for only CoreStarted
+        let matching = buffer
+            .get_matching_events(&["CoreStarted".to_string()], &None)
+            .await;
+
+        assert_eq!(matching.len(), 1);
+        assert!(matches!(&*matching[0], Event::CoreStarted));
+    }
+
+    #[tokio::test]
+    async fn test_arc_cloning_is_cheap() {
+        let buffer = EventBuffer::new();
+
+        buffer.add_event(Event::CoreStarted).await;
+
+        // Get matching events multiple times (simulating multiple subscriptions)
+        let match1 = buffer.get_matching_events(&[], &None).await;
+        let _match2 = buffer.get_matching_events(&[], &None).await;
+
+        // Both should point to the same underlying event (Arc cloning)
+        assert_eq!(Arc::strong_count(&match1[0]), 3); // buffer + match1 + _match2
+    }
+}
diff --git a/core/src/infra/daemon/mod.rs b/core/src/infra/daemon/mod.rs
index 37b78d7c1..62bfc044d 100644
--- a/core/src/infra/daemon/mod.rs
+++ b/core/src/infra/daemon/mod.rs
@@ -3,6 +3,7 @@
 pub mod bootstrap;
 pub mod client;
 pub mod dispatch;
+pub mod event_buffer;
 pub mod health;
 pub mod rpc;
 pub mod types;
diff --git a/core/src/infra/daemon/rpc.rs b/core/src/infra/daemon/rpc.rs
index b3b915f73..cebe744fd 100644
--- a/core/src/infra/daemon/rpc.rs
+++ b/core/src/infra/daemon/rpc.rs
@@ -7,6 +7,7 @@
 use tokio::net::TcpListener;
 use tokio::sync::{mpsc, RwLock};
 use uuid::Uuid;
+use crate::infra::daemon::event_buffer::EventBuffer;
 use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse, EventFilter};
 use crate::infra::event::log_emitter::{set_global_log_bus, LogMessage};
 use crate::infra::event::{Event, EventSubscriber};
@@ -34,6 +35,8 @@ pub struct RpcServer {
     connection_count: Arc<AtomicUsize>,
     /// Maximum number of concurrent connections
     max_connections: usize,
+    /// Event buffer for handling subscription race conditions
+    event_buffer: Arc<EventBuffer>,
 }
 
 impl RpcServer {
@@ -47,6 +50,7 @@ impl RpcServer {
             connections: Arc::new(RwLock::new(HashMap::new())),
             connection_count: Arc::new(AtomicUsize::new(0)),
             max_connections: 100, // Reasonable limit for concurrent connections
+            event_buffer: Arc::new(EventBuffer::new()),
         }
     }
 
@@ -82,6 +86,7 @@ impl RpcServer {
             let shutdown_tx = self.shutdown_tx.clone();
             let connections = self.connections.clone();
             let connection_count = self.connection_count.clone();
+            let event_buffer = self.event_buffer.clone();
 
             // Increment connection counter
             connection_count.fetch_add(1, Ordering::Relaxed);
@@ -89,7 +94,7 @@ impl RpcServer {
             // Spawn task for concurrent request handling
             tokio::spawn(async move {
                 // Convert errors to strings to ensure Send
-                if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections, connection_count).await {
+                if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections, connection_count, event_buffer).await {
                     eprintln!("Connection error: {}", e);
                 }
             });
@@ -137,9 +142,13 @@ impl RpcServer {
         // Start main event broadcaster
         let mut event_subscriber = core.events.subscribe();
         let connections = self.connections.clone();
+        let event_buffer = self.event_buffer.clone();
 
         tokio::spawn(async move {
             while let Ok(event) = event_subscriber.recv().await {
+                // Add to buffer before broadcasting (for subscription race condition handling)
+                event_buffer.add_event(event.clone()).await;
+
                 let connections_read = connections.read().await;
 
                 // Broadcast event to all subscribed connections
@@ -167,6 +176,16 @@ impl RpcServer {
             }
         });
 
+        // Spawn periodic cleanup task for event buffer
+        let buffer_clone = self.event_buffer.clone();
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
+            loop {
+                interval.tick().await;
+                buffer_clone.cleanup_expired().await;
+            }
+        });
+
         // Note: Log messages are NOT broadcast as events anymore
         // They use a separate dedicated LogBus (core.logs)
         // Clients subscribe to logs separately, not through the event bus
@@ -299,6 +318,7 @@ impl RpcServer {
         shutdown_tx: mpsc::Sender<()>,
         connections: Arc<RwLock<HashMap<Uuid, Connection>>>,
         connection_count: Arc<AtomicUsize>,
+        event_buffer: Arc<EventBuffer>,
     ) -> Result<(), String> {
         let connection_id = Uuid::new_v4();
         let (mut reader, mut writer) = stream.into_split();
@@ -327,7 +347,8 @@ impl RpcServer {
                         &shutdown_tx,
                         &connections,
                         connection_id,
-                        &response_tx
+                        &response_tx,
+                        &event_buffer
                     ).await;
 
                     // Send response
@@ -411,6 +432,7 @@ impl RpcServer {
         connections: &Arc<RwLock<HashMap<Uuid, Connection>>>,
         connection_id: Uuid,
         response_tx: &mpsc::UnboundedSender<DaemonResponse>,
+        event_buffer: &Arc<EventBuffer>,
     ) -> DaemonResponse {
         match request {
             DaemonRequest::Ping => DaemonResponse::Pong,
@@ -443,7 +465,13 @@ impl RpcServer {
                 event_types,
                 filter,
             } => {
-                // Register connection for event streaming
+                // Step 1: Get buffered events BEFORE registering connection
+                // This prevents race between replay and live events
+                let matching_events = event_buffer
+                    .get_matching_events(&event_types, &filter)
+                    .await;
+
+                // Step 2: Register connection for event streaming (starts receiving live events)
                 let connection = Connection {
                     id: connection_id,
                     response_tx: response_tx.clone(),
@@ -453,6 +481,13 @@ impl RpcServer {
                 };
 
                 connections.write().await.insert(connection_id, connection);
+
+                // Step 3: Send buffered events in chronological order
+                // This ensures no gaps between buffered and live events
+                for event in matching_events {
+                    let _ = response_tx.send(DaemonResponse::Event((*event).clone()));
+                }
+
                 DaemonResponse::Subscribed
             }