Add EventBuffer to handle event subscription race conditions

- Introduced a new EventBuffer struct to buffer recent events and manage time-based eviction.
- Implemented methods for adding events, retrieving events that match a subscription filter, and cleaning up expired events (a brief usage sketch follows the change summary below).
- Integrated EventBuffer into the RpcServer to handle subscription race conditions by replaying buffered events to new subscribers.
- Added periodic cleanup task for the event buffer to prevent unbounded memory growth.
- Included unit tests to verify buffer size limits, time-based cleanup, and event filtering functionality.
Jamie Pine 2025-12-09 19:41:23 -08:00
parent 9438965ffd
commit 0bae97746a
3 changed files with 291 additions and 3 deletions
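
The EventBuffer added in this commit has a small surface: add_event, get_matching_events, and cleanup_expired. The sketch below is illustrative only and not part of the diff; it assumes the crate paths shown in the changes (crate::infra::daemon::event_buffer::EventBuffer, crate::infra::event::Event) and reuses the Event::CoreStarted variant that the new unit tests exercise.

use std::sync::Arc;
use crate::infra::daemon::event_buffer::EventBuffer;
use crate::infra::event::Event;

// Illustrative walkthrough of the buffer API added in this commit.
async fn buffer_walkthrough() {
    let buffer = EventBuffer::new(); // defaults: 5 s retention, 100-event cap

    // Events are buffered as they are broadcast by the server...
    buffer.add_event(Event::CoreStarted).await;

    // ...and can be replayed to a late subscriber. An empty event_types
    // slice with no EventFilter matches every buffered event.
    let replay: Vec<Arc<Event>> = buffer.get_matching_events(&[], &None).await;
    assert_eq!(replay.len(), 1);

    // The server calls this once per second to drop events older than 5 s.
    buffer.cleanup_expired().await;
}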


@ -0,0 +1,252 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use crate::infra::daemon::types::EventFilter;
use crate::infra::event::Event;
/// A buffered event with timestamp for time-based eviction
#[derive(Debug, Clone)]
struct BufferedEvent {
event: Arc<Event>,
timestamp: Instant,
}
/// Thread-safe event buffer with time-based eviction
///
/// Buffers recent events to handle subscription race conditions where events
/// are emitted before subscriptions are created. When a new subscription is
/// created, buffered events matching the subscription filter are replayed.
pub struct EventBuffer {
events: Arc<RwLock<VecDeque<BufferedEvent>>>,
retention_duration: Duration,
max_size: usize,
}
impl EventBuffer {
/// Create a new event buffer with default settings
///
/// - Retention: 5 seconds
/// - Max size: 100 events
pub fn new() -> Self {
Self {
events: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
retention_duration: Duration::from_secs(5),
max_size: 100,
}
}
/// Add an event to the buffer
///
/// Events are wrapped in Arc to avoid expensive clones when replaying
/// to multiple subscriptions. If the buffer exceeds max_size, the oldest
/// events are evicted (FIFO).
pub async fn add_event(&self, event: Event) {
let mut events = self.events.write().await;
events.push_back(BufferedEvent {
event: Arc::new(event),
timestamp: Instant::now(),
});
// Enforce max size (evict oldest)
while events.len() > self.max_size {
events.pop_front();
}
}
/// Get buffered events that match the subscription filter
///
/// Returns Arc<Event> to avoid cloning large event payloads.
/// Events are returned in chronological order (oldest first).
pub async fn get_matching_events(
&self,
event_types: &[String],
filter: &Option<EventFilter>,
) -> Vec<Arc<Event>> {
let events = self.events.read().await;
events
.iter()
.filter_map(|buffered| {
if Self::matches_filter(&buffered.event, event_types, filter) {
Some(Arc::clone(&buffered.event))
} else {
None
}
})
.collect()
}
/// Remove events older than retention_duration
///
/// Should be called periodically (e.g., every 1 second) to prevent
/// unbounded memory growth.
pub async fn cleanup_expired(&self) {
let mut events = self.events.write().await;
let now = Instant::now();
events.retain(|buffered| now.duration_since(buffered.timestamp) < self.retention_duration);
}
/// Check if an event matches the subscription filter
///
/// This is a copy of RpcServer::should_forward_event to avoid module coupling.
/// The logic must stay in sync with the main filtering implementation.
fn matches_filter(
event: &Event,
event_types: &[String],
filter: &Option<EventFilter>,
) -> bool {
// If event_types is empty, forward all events
// Otherwise, treat event_types as an INCLUSION list (only forward these)
if !event_types.is_empty() {
let event_type = event.variant_name();
if !event_types.contains(&event_type.to_string()) {
return false;
}
}
// Apply additional filters if specified
if let Some(filter) = filter {
// Filter by resource type
if let Some(filter_resource_type) = &filter.resource_type {
if let Some(event_resource_type) = event.resource_type() {
if event_resource_type != filter_resource_type {
return false;
}
} else {
// Event is not a resource event, but filter expects one
return false;
}
}
// Filter by path scope (for resource events)
if let Some(path_scope) = &filter.path_scope {
let include_descendants = filter.include_descendants.unwrap_or(false);
let affects = event.affects_path(path_scope, include_descendants);
if !affects {
return false;
}
}
// Filter by job_id
match event {
Event::JobProgress { job_id, .. }
| Event::JobStarted { job_id, .. }
| Event::JobCompleted { job_id, .. }
| Event::JobFailed { job_id, .. }
| Event::JobCancelled { job_id, .. } => {
if let Some(filter_job_id) = &filter.job_id {
return job_id == filter_job_id;
}
}
Event::LibraryCreated { id, .. }
| Event::LibraryOpened { id, .. }
| Event::LibraryClosed { id, .. } => {
if let Some(filter_library_id) = &filter.library_id {
return id == filter_library_id;
}
}
_ => {}
}
}
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::infra::event::Event;
#[tokio::test]
async fn test_buffer_size_limit() {
let buffer = EventBuffer::new();
// Add 150 events (exceeds max_size of 100)
for _ in 0..150 {
buffer.add_event(Event::CoreStarted).await;
}
// Verify only last 100 events are kept
let events = buffer.events.read().await;
assert_eq!(events.len(), 100);
}
#[tokio::test]
async fn test_time_based_cleanup() {
let buffer = EventBuffer {
events: Arc::new(RwLock::new(VecDeque::new())),
retention_duration: Duration::from_millis(100),
max_size: 100,
};
// Add event
buffer.add_event(Event::CoreStarted).await;
// Verify event exists
{
let events = buffer.events.read().await;
assert_eq!(events.len(), 1);
}
// Wait for expiration
tokio::time::sleep(Duration::from_millis(150)).await;
// Cleanup
buffer.cleanup_expired().await;
// Verify event was removed
let events = buffer.events.read().await;
assert_eq!(events.len(), 0);
}
#[tokio::test]
async fn test_empty_filter_matches_all() {
let buffer = EventBuffer::new();
buffer.add_event(Event::CoreStarted).await;
buffer.add_event(Event::CoreShutdown).await;
// Empty filter should match all events
let matching = buffer.get_matching_events(&[], &None).await;
assert_eq!(matching.len(), 2);
}
#[tokio::test]
async fn test_event_type_filtering() {
let buffer = EventBuffer::new();
buffer.add_event(Event::CoreStarted).await;
buffer.add_event(Event::CoreShutdown).await;
// Filter for only CoreStarted
let matching = buffer
.get_matching_events(&["CoreStarted".to_string()], &None)
.await;
assert_eq!(matching.len(), 1);
assert!(matches!(&*matching[0], Event::CoreStarted));
}
#[tokio::test]
async fn test_arc_cloning_is_cheap() {
let buffer = EventBuffer::new();
buffer.add_event(Event::CoreStarted).await;
// Get matching events multiple times (simulating multiple subscriptions)
let match1 = buffer.get_matching_events(&[], &None).await;
let match2 = buffer.get_matching_events(&[], &None).await;
// Both handles should point to the same underlying event (Arc clones, not deep copies)
assert!(Arc::ptr_eq(&match1[0], &match2[0]));
assert_eq!(Arc::strong_count(&match1[0]), 3); // buffer + match1 + match2
}
}
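
The tests above exercise the buffer in isolation. To make the race condition concrete before the RpcServer changes further below, here is a standalone sketch, again not part of the commit; the tokio mpsc channel merely stands in for a daemon connection. An event is emitted before any subscriber exists, and replaying the buffered events at subscription time lets the late subscriber still observe it.

use std::sync::Arc;
use tokio::sync::mpsc;
use crate::infra::daemon::event_buffer::EventBuffer;
use crate::infra::event::Event;

async fn replay_closes_the_gap() {
    let buffer = EventBuffer::new();

    // 1. An event is emitted (and buffered) before anyone has subscribed.
    buffer.add_event(Event::CoreStarted).await;

    // 2. A subscriber arrives late; this channel stands in for a connection.
    let (tx, mut rx) = mpsc::unbounded_channel::<Arc<Event>>();

    // 3. Replay buffered events that match the new subscription, mirroring
    //    the replay step added to the RpcServer subscribe handling below.
    for event in buffer.get_matching_events(&[], &None).await {
        let _ = tx.send(event);
    }

    // The late subscriber still receives the pre-subscription event.
    assert!(matches!(&*rx.recv().await.unwrap(), Event::CoreStarted));
}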


@ -3,6 +3,7 @@
pub mod bootstrap;
pub mod client;
pub mod dispatch;
pub mod event_buffer;
pub mod health;
pub mod rpc;
pub mod types;


@ -7,6 +7,7 @@ use tokio::net::TcpListener;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use crate::infra::daemon::event_buffer::EventBuffer;
use crate::infra::daemon::types::{DaemonError, DaemonRequest, DaemonResponse, EventFilter};
use crate::infra::event::log_emitter::{set_global_log_bus, LogMessage};
use crate::infra::event::{Event, EventSubscriber};
@ -34,6 +35,8 @@ pub struct RpcServer {
connection_count: Arc<AtomicUsize>,
/// Maximum number of concurrent connections
max_connections: usize,
/// Event buffer for handling subscription race conditions
event_buffer: Arc<EventBuffer>,
}
impl RpcServer {
@ -47,6 +50,7 @@ impl RpcServer {
connections: Arc::new(RwLock::new(HashMap::new())),
connection_count: Arc::new(AtomicUsize::new(0)),
max_connections: 100, // Reasonable limit for concurrent connections
event_buffer: Arc::new(EventBuffer::new()),
}
}
@ -82,6 +86,7 @@ impl RpcServer {
let shutdown_tx = self.shutdown_tx.clone();
let connections = self.connections.clone();
let connection_count = self.connection_count.clone();
let event_buffer = self.event_buffer.clone();
// Increment connection counter
connection_count.fetch_add(1, Ordering::Relaxed);
@ -89,7 +94,7 @@ impl RpcServer {
// Spawn task for concurrent request handling
tokio::spawn(async move {
// Convert errors to strings to ensure Send
if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections, connection_count).await {
if let Err(e) = Self::handle_connection(stream, core, shutdown_tx, connections, connection_count, event_buffer).await {
eprintln!("Connection error: {}", e);
}
});
@ -137,9 +142,13 @@ impl RpcServer {
// Start main event broadcaster
let mut event_subscriber = core.events.subscribe();
let connections = self.connections.clone();
let event_buffer = self.event_buffer.clone();
tokio::spawn(async move {
while let Ok(event) = event_subscriber.recv().await {
// Add to buffer before broadcasting (for subscription race condition handling)
event_buffer.add_event(event.clone()).await;
let connections_read = connections.read().await;
// Broadcast event to all subscribed connections
@ -167,6 +176,16 @@ impl RpcServer {
}
});
// Spawn periodic cleanup task for event buffer
let buffer_clone = self.event_buffer.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
interval.tick().await;
buffer_clone.cleanup_expired().await;
}
});
// Note: Log messages are NOT broadcast as events anymore
// They use a separate dedicated LogBus (core.logs)
// Clients subscribe to logs separately, not through the event bus
@ -299,6 +318,7 @@ impl RpcServer {
shutdown_tx: mpsc::Sender<()>,
connections: Arc<RwLock<HashMap<Uuid, Connection>>>,
connection_count: Arc<AtomicUsize>,
event_buffer: Arc<EventBuffer>,
) -> Result<(), String> {
let connection_id = Uuid::new_v4();
let (mut reader, mut writer) = stream.into_split();
@ -327,7 +347,8 @@ impl RpcServer {
&shutdown_tx,
&connections,
connection_id,
&response_tx
&response_tx,
&event_buffer
).await;
// Send response
@ -411,6 +432,7 @@ impl RpcServer {
connections: &Arc<RwLock<HashMap<Uuid, Connection>>>,
connection_id: Uuid,
response_tx: &mpsc::UnboundedSender<DaemonResponse>,
event_buffer: &Arc<EventBuffer>,
) -> DaemonResponse {
match request {
DaemonRequest::Ping => DaemonResponse::Pong,
@ -443,7 +465,13 @@ impl RpcServer {
event_types,
filter,
} => {
// Register connection for event streaming
// Step 1: Get buffered events BEFORE registering connection
// This prevents race between replay and live events
let matching_events = event_buffer
.get_matching_events(&event_types, &filter)
.await;
// Step 2: Register connection for event streaming (starts receiving live events)
let connection = Connection {
id: connection_id,
response_tx: response_tx.clone(),
@ -453,6 +481,13 @@ impl RpcServer {
};
connections.write().await.insert(connection_id, connection);
// Step 3: Send buffered events in chronological order
// This ensures no gaps between buffered and live events
for event in matching_events {
let _ = response_tx.send(DaemonResponse::Event((*event).clone()));
}
DaemonResponse::Subscribed
}