From dbcfcb3c560590845fcb9172c3304a7a33fc7ceb Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Wed, 3 Dec 2025 21:58:29 -0800 Subject: [PATCH] Add Sync Events Export CLI and Protocol Support - Add SyncEventsArgs to export sync events from the CLI - Wire SyncCmd::Events and implement export_events to fetch - format and write results - Implement JSON, SQL, and Markdown exporters - with optional device data in the output - Extend protocol with EventLogRequest and EventLogResponse - Enable LogSyncHandler to handle event log requests and return logs - Expose log_handler from BackfillManager for event logging - Update docs with CLI examples and protocol overview --- apps/cli/src/domains/sync/args.rs | 35 ++++ apps/cli/src/domains/sync/mod.rs | 173 ++++++++++++++++++ .../service/network/protocol/sync/handler.rs | 47 +++++ .../service/network/protocol/sync/messages.rs | 19 ++ core/src/service/sync/backfill.rs | 5 + core/src/service/sync/mod.rs | 6 +- core/src/service/sync/protocol_handler.rs | 72 ++++++++ docs/core/sync-event-log.mdx | 172 +++++++++++++++++ 8 files changed, 527 insertions(+), 2 deletions(-) diff --git a/apps/cli/src/domains/sync/args.rs b/apps/cli/src/domains/sync/args.rs index 7286901e8..0e90e834e 100644 --- a/apps/cli/src/domains/sync/args.rs +++ b/apps/cli/src/domains/sync/args.rs @@ -1,5 +1,40 @@ use clap::Args; +#[derive(Args, Debug)] +pub struct SyncEventsArgs { + /// Export format (json, sql, markdown) + #[arg(short, long, default_value = "json", help = "Output format: json, sql, or markdown")] + pub format: String, + + /// Output file (defaults to stdout) + #[arg(short, long, help = "Write output to file instead of stdout")] + pub output: Option, + + /// Time range: events since this time + #[arg(long, help = "Show events since (e.g., '1 hour ago', '2025-12-03 10:00:00')")] + pub since: Option, + + /// Filter by event type + #[arg(long, help = "Filter by event type (state_transition, backfill_session_started, etc.)")] + pub event_type: Option, + + 
/// Filter by correlation ID (show all events from a session) + #[arg(long, help = "Filter by correlation ID to trace a specific backfill session")] + pub correlation_id: Option, + + /// Filter by severity (debug, info, warning, error) + #[arg(long, help = "Filter by severity level")] + pub severity: Option, + + /// Maximum number of events to return + #[arg(long, default_value = "1000", help = "Maximum number of events")] + pub limit: u32, + + /// Include device name in output + #[arg(long, help = "Include device name/ID in output")] + pub with_device: bool, +} + #[derive(Args, Debug)] pub struct SyncMetricsArgs { /// Show metrics for a specific time range diff --git a/apps/cli/src/domains/sync/mod.rs b/apps/cli/src/domains/sync/mod.rs index 4b2141a0b..15387b4a9 100644 --- a/apps/cli/src/domains/sync/mod.rs +++ b/apps/cli/src/domains/sync/mod.rs @@ -11,6 +11,7 @@ use tokio::time::sleep; use crate::context::Context; use crate::util::prelude::*; +use sd_core::infra::sync::{EventSeverity, SyncEventQuery, SyncEventType}; use sd_core::ops::sync::get_metrics::GetSyncMetricsInput; use sd_core::service::sync::state::DeviceSyncState; @@ -20,10 +21,14 @@ use self::args::*; pub enum SyncCmd { /// Show sync metrics Metrics(SyncMetricsArgs), + + /// Export sync event log + Events(SyncEventsArgs), } pub async fn run(ctx: &Context, cmd: SyncCmd) -> Result<()> { match cmd { + SyncCmd::Events(args) => export_events(ctx, args).await?, SyncCmd::Metrics(args) => { // Parse time filters let since = if let Some(since_str) = &args.since { @@ -405,3 +410,171 @@ fn format_bytes(bytes: u64) -> String { format!("{:.2} GB", bytes as f64 / GB as f64) } } + +async fn export_events(ctx: &Context, args: SyncEventsArgs) -> Result<()> { + let library_id = ctx.library_id.ok_or_else(|| { + anyhow::anyhow!("No library selected. 
Use 'sd library switch' to select a library first.") + })?; + + // Build query + let mut query = SyncEventQuery::new(library_id).with_limit(args.limit); + + // Parse time filter + if let Some(since_str) = &args.since { + let since = parse_time_filter(since_str)?; + query = query.with_time_range(since, Utc::now()); + } + + // Parse event type filter + if let Some(event_type_str) = &args.event_type { + if let Some(event_type) = SyncEventType::from_str(event_type_str) { + query = query.with_event_types(vec![event_type]); + } else { + return Err(anyhow::anyhow!("Invalid event type: {}", event_type_str)); + } + } + + // Parse correlation ID filter + if let Some(corr_id_str) = &args.correlation_id { + let corr_id = uuid::Uuid::parse_str(corr_id_str)?; + query = query.with_correlation_id(corr_id); + } + + // Parse severity filter + if let Some(severity_str) = &args.severity { + if let Some(severity) = EventSeverity::from_str(severity_str) { + query = query.with_severities(vec![severity]); + } else { + return Err(anyhow::anyhow!("Invalid severity: {}", severity_str)); + } + } + + // Call the API + let input = sd_core::ops::sync::get_event_log::GetSyncEventLogInput { + start_time: query.time_range.map(|(start, _)| start), + end_time: query.time_range.map(|(_, end)| end), + event_types: query.event_types, + categories: query.categories, + severities: query.severities, + peer_id: query.peer_filter, + model_type: query.model_type_filter, + correlation_id: query.correlation_id, + limit: query.limit, + offset: query.offset, + }; + + let json_response = ctx.core.query(&input, Some(library_id)).await?; + let output: sd_core::ops::sync::get_event_log::GetSyncEventLogOutput = + serde_json::from_value(json_response)?; + + // Format output + let formatted = match args.format.as_str() { + "json" => format_events_json(&output.events, args.with_device)?, + "sql" => format_events_sql(&output.events), + "markdown" | "md" => format_events_markdown(&output.events, args.with_device), + _ => 
return Err(anyhow::anyhow!("Unknown format: {}. Use json, sql, or markdown", args.format)), + }; + + // Write output + if let Some(output_file) = &args.output { + std::fs::write(output_file, formatted)?; + println!("Exported {} events to {}", output.events.len(), output_file); + } else { + println!("{}", formatted); + } + + Ok(()) +} + +fn format_events_json(events: &[sd_core::infra::sync::SyncEventLog], with_device: bool) -> Result { + if with_device { + Ok(serde_json::to_string_pretty(events)?) + } else { + // Strip device_id for cleaner output + let simplified: Vec<_> = events + .iter() + .map(|e| { + serde_json::json!({ + "timestamp": e.timestamp, + "event_type": e.event_type, + "severity": e.severity, + "summary": e.summary, + "details": e.details, + "correlation_id": e.correlation_id, + "peer_device_id": e.peer_device_id, + "record_count": e.record_count, + "duration_ms": e.duration_ms, + }) + }) + .collect(); + Ok(serde_json::to_string_pretty(&simplified)?) + } +} + +fn format_events_sql(events: &[sd_core::infra::sync::SyncEventLog]) -> String { + let mut output = String::from("-- Sync Event Log Export\n"); + output.push_str("-- Generated: "); + output.push_str(&Utc::now().to_rfc3339()); + output.push_str("\n\n"); + + for event in events { + let details = event + .details + .as_ref() + .map(|d| serde_json::to_string(d).unwrap_or_default()) + .unwrap_or_default(); + + let model_types = event + .model_types + .as_ref() + .map(|m| m.join(",")) + .unwrap_or_default(); + + output.push_str(&format!( + "INSERT INTO sync_event_log (timestamp, device_id, event_type, category, severity, summary, details, correlation_id, peer_device_id, model_types, record_count, duration_ms) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', {}, {}, {}, {}, {}, {});\n", + event.timestamp.to_rfc3339(), + event.device_id, + event.event_type.as_str(), + event.category.as_str(), + event.severity.as_str(), + event.summary.replace("'", "''"), // Escape quotes + if details.is_empty() { 
"NULL".to_string() } else { format!("'{}'", details.replace("'", "''")) }, + event.correlation_id.map(|id| format!("'{}'", id)).unwrap_or_else(|| "NULL".to_string()), + event.peer_device_id.map(|id| format!("'{}'", id)).unwrap_or_else(|| "NULL".to_string()), + if model_types.is_empty() { "NULL".to_string() } else { format!("'{}'", model_types) }, + event.record_count.map(|c| c.to_string()).unwrap_or_else(|| "NULL".to_string()), + event.duration_ms.map(|d| d.to_string()).unwrap_or_else(|| "NULL".to_string()), + )); + } + + output +} + +fn format_events_markdown(events: &[sd_core::infra::sync::SyncEventLog], with_device: bool) -> String { + let mut output = String::from("# Sync Event Log\n\n"); + output.push_str(&format!("**Exported**: {}\n", Utc::now().to_rfc3339())); + output.push_str(&format!("**Event Count**: {}\n\n", events.len())); + + output.push_str("| Timestamp | Event Type | Severity | Summary |\n"); + output.push_str("|-----------|------------|----------|----------|\n"); + + for event in events { + output.push_str(&format!( + "| {} | {:?} | {:?} | {} |\n", + event.timestamp.format("%Y-%m-%d %H:%M:%S"), + event.event_type, + event.severity, + event.summary + )); + } + + if with_device && !events.is_empty() { + output.push_str("\n## Devices\n\n"); + let device_ids: std::collections::HashSet<_> = events.iter().map(|e| e.device_id).collect(); + for device_id in device_ids { + output.push_str(&format!("- {}\n", device_id)); + } + } + + output +} diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index 272d36f8c..8095d08ab 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -476,6 +476,53 @@ impl SyncProtocolHandler { Ok(None) } + SyncMessage::EventLogRequest { + library_id, + requesting_device, + since, + event_types, + correlation_id, + limit, + } => { + debug!( + from_device = %from_device, + requesting_device = 
%requesting_device, + "Processing event log request" + ); + + let backfill_manager = self.backfill_manager.as_ref().ok_or_else(|| { + NetworkingError::Protocol("BackfillManager not initialized".to_string()) + })?; + + let log_handler = backfill_manager.log_handler(); + + let response = log_handler + .handle_event_log_request(requesting_device, since, event_types, correlation_id, limit) + .await + .map_err(|e| { + NetworkingError::Protocol(format!("Failed to handle event log request: {}", e)) + })?; + + Ok(Some(response)) + } + + SyncMessage::EventLogResponse { + library_id, + responding_device, + events, + } => { + debug!( + from_device = %from_device, + responding_device = %responding_device, + event_count = events.len(), + "Received event log response from peer" + ); + + // EventLogResponse is handled by the cross-device fetcher waiting for it + // We don't process it here - it gets delivered to the waiting query + Ok(None) + } + SyncMessage::Error { library_id, message, diff --git a/core/src/service/network/protocol/sync/messages.rs b/core/src/service/network/protocol/sync/messages.rs index 849fb731e..610dd7688 100644 --- a/core/src/service/network/protocol/sync/messages.rs +++ b/core/src/service/network/protocol/sync/messages.rs @@ -117,6 +117,23 @@ pub enum SyncMessage { needs_shared_catchup: bool, // If true, peer needs our shared changes }, + /// Request event log from peer + EventLogRequest { + library_id: Uuid, + requesting_device: Uuid, + since: Option>, + event_types: Option>, + correlation_id: Option, + limit: u32, + }, + + /// Response with event log + EventLogResponse { + library_id: Uuid, + responding_device: Uuid, + events: Vec, // Serialized SyncEventLog + }, + /// Error response Error { library_id: Uuid, message: String }, } @@ -145,6 +162,8 @@ impl SyncMessage { | SyncMessage::Heartbeat { library_id, .. } | SyncMessage::WatermarkExchangeRequest { library_id, .. } | SyncMessage::WatermarkExchangeResponse { library_id, .. 
} + | SyncMessage::EventLogRequest { library_id, .. } + | SyncMessage::EventLogResponse { library_id, .. } | SyncMessage::Error { library_id, .. } => *library_id, } } diff --git a/core/src/service/sync/backfill.rs b/core/src/service/sync/backfill.rs index bb93997eb..13ce07081 100644 --- a/core/src/service/sync/backfill.rs +++ b/core/src/service/sync/backfill.rs @@ -70,6 +70,11 @@ impl BackfillManager { &self.metrics } + /// Get log handler for protocol operations + pub fn log_handler(&self) -> &Arc { + &self.log_handler + } + /// Deliver a StateResponse to waiting request /// /// Called by protocol handler when StateResponse is received. diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index cbf5c1bb5..910fd342e 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -201,11 +201,13 @@ impl SyncService { ); // Create protocol handlers - let log_handler = Arc::new(LogSyncHandler::new( + let mut log_handler = LogSyncHandler::new( library_id, library.db().clone(), peer_sync.clone(), - )); + ); + log_handler.set_event_logger(event_logger.clone()); + let log_handler = Arc::new(log_handler); // Create backfill manager for automatic orchestration let backfill_manager = Arc::new(BackfillManager::new( diff --git a/core/src/service/sync/protocol_handler.rs b/core/src/service/sync/protocol_handler.rs index 12a4875fa..140381233 100644 --- a/core/src/service/sync/protocol_handler.rs +++ b/core/src/service/sync/protocol_handler.rs @@ -17,12 +17,14 @@ use tracing::{debug, info, warn}; use uuid::Uuid; use super::peer::PeerSync; +use crate::infra::sync::{SyncEventLog, SyncEventQuery, SyncEventType}; /// Handle log-based sync messages (shared resources) pub struct LogSyncHandler { library_id: Uuid, db: Arc, peer_sync: Arc, + event_logger: Option>, } impl LogSyncHandler { @@ -31,9 +33,15 @@ impl LogSyncHandler { library_id, db, peer_sync, + event_logger: None, } } + /// Set the event logger (called after initialization) + pub fn 
set_event_logger(&mut self, logger: Arc) { + self.event_logger = Some(logger); + } + /// Handle incoming SharedChange message /// /// Uses the Syncable registry with conflict resolution strategies. @@ -140,4 +148,68 @@ impl LogSyncHandler { Ok(serde_json::Value::Object(json_map)) } + + /// Handle EventLogRequest message + pub async fn handle_event_log_request( + &self, + requesting_device: Uuid, + since: Option>, + event_types: Option>, + correlation_id: Option, + limit: u32, + ) -> Result { + debug!( + requesting_device = %requesting_device, + since = ?since, + limit = limit, + "Handling event log request from peer" + ); + + // Build query + let mut query = SyncEventQuery::new(self.library_id).with_limit(limit); + + if let Some(since_time) = since { + query = query.with_time_range(since_time, Utc::now()); + } + + if let Some(types_str) = event_types { + let types: Vec = types_str + .into_iter() + .filter_map(|s| SyncEventType::from_str(&s)) + .collect(); + if !types.is_empty() { + query = query.with_event_types(types); + } + } + + if let Some(corr_id) = correlation_id { + query = query.with_correlation_id(corr_id); + } + + // Query local events + let logger = self + .event_logger + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Event logger not initialized"))?; + + let events = logger.query(query).await?; + + // Serialize events to JSON + let events_json: Vec = events + .into_iter() + .map(|e| serde_json::to_value(e)) + .collect::, _>>()?; + + info!( + event_count = events_json.len(), + requesting_device = %requesting_device, + "Responding to event log request" + ); + + Ok(SyncMessage::EventLogResponse { + library_id: self.library_id, + responding_device: self.peer_sync.device_id(), + events: events_json, + }) + } } diff --git a/docs/core/sync-event-log.mdx b/docs/core/sync-event-log.mdx index 90c49b4ef..f976b3b2d 100644 --- a/docs/core/sync-event-log.mdx +++ b/docs/core/sync-event-log.mdx @@ -114,6 +114,93 @@ CREATE INDEX idx_sync_event_log_type ON 
sync_event_log(event_type); CREATE INDEX idx_sync_event_log_correlation ON sync_event_log(correlation_id); ``` +## CLI Export + +### Export Events from a Device + +```bash +# Export as JSON (default) +sd sync events + +# Export to file +sd sync events --output sync_events.json + +# Export last hour only +sd sync events --since "1 hour ago" -o recent_events.json + +# Export as SQL (for database import) +sd sync events --format sql -o events.sql + +# Export as Markdown +sd sync events --format markdown -o events.md + +# Filter by event type +sd sync events --event-type state_transition + +# Filter by severity (errors only) +sd sync events --severity error + +# Trace specific backfill session (the correlation ID must be a full UUID) +sd sync events --correlation-id 550e8400-e29b-41d4-a716-446655440000 + +# Limit results +sd sync events --limit 500 +``` + +### Cross-Device Timeline + +To debug sync issues across multiple devices, export logs from each device with `--with-device` (the default JSON output omits `device_id`) and merge: + +```bash +# On Device A +sd sync events --since "2 hours ago" --with-device --output device_a.json + +# On Device B +sd sync events --since "2 hours ago" --with-device --output device_b.json + +# Merge chronologically (requires jq) +jq -s 'add | sort_by(.timestamp)' device_a.json device_b.json > timeline.json + +# View merged timeline +jq '.[] | "\(.timestamp) | \(.device_id[0:8]) | \(.event_type) | \(.summary)"' timeline.json +``` + +### SQL Import for Analysis + +```bash +# Export as SQL from both devices +sd sync events --format sql -o device_a.sql +sd sync events --format sql -o device_b.sql + +# Create combined database +sqlite3 timeline.db < device_a.sql +sqlite3 timeline.db < device_b.sql + +# Query cross-device timeline +sqlite3 timeline.db " +SELECT + timestamp, + substr(device_id, 1, 8) as device, + event_type, + summary +FROM 
sync_event_log +WHERE correlation_id IS NOT NULL +GROUP BY correlation_id +ORDER BY started DESC +" +``` + ## Query API ### Rust API @@ -208,6 +295,35 @@ WHERE event_type = 'backfill_session_started' ORDER BY timestamp DESC; ``` +## Network Protocol + +The system includes protocol messages for querying event logs from remote peers over the network: + +**EventLogRequest** - Request events from a peer +```rust +EventLogRequest { + library_id: Uuid, + requesting_device: Uuid, + since: Option<DateTime<Utc>>, + event_types: Option<Vec<String>>, + correlation_id: Option<Uuid>, + limit: u32, +} +``` + +**EventLogResponse** - Peer's events +```rust +EventLogResponse { + library_id: Uuid, + responding_device: Uuid, + events: Vec<serde_json::Value>, +} +``` + +These messages are routed through `SyncProtocolHandler` and handled by `LogSyncHandler::handle_event_log_request()`. + +**Status**: Protocol foundation complete. Automatic cross-device fetching can be implemented when needed by creating a cross-device fetcher that sends requests to all online peers and merges responses. + ## Debugging Scenarios ### "Why did sync fail?" @@ -402,6 +518,62 @@ Tests verify: - Persistence across restarts - Batch aggregation +## Export Formats + +### JSON Format + +Structured output for programmatic processing: + +```json +[ + { + "timestamp": "2025-12-03T10:15:00Z", + "event_type": "StateTransition", + "severity": "Info", + "summary": "Ready → Backfilling", + "details": { "reason": "new device joined" }, + "correlation_id": null, + "duration_ms": null + }, + { + "timestamp": "2025-12-03T10:15:32Z", + "event_type": "BatchIngestion", + "severity": "Debug", + "summary": "Ingested batch of 45,000 records (25k entries, 15k tags, 5k locations) in 28s", + "record_count": 45000, + "duration_ms": 28000 + } +] +``` + +### SQL Format + +INSERT statements for database import: + +```sql +-- Sync Event Log Export +-- Generated: 2025-12-03T10:20:00Z + +INSERT INTO sync_event_log (...) VALUES (...); +INSERT INTO sync_event_log (...) 
VALUES (...); +``` + +### Markdown Format + +Human-readable table format: + +```markdown +# Sync Event Log + +**Exported**: 2025-12-03T10:20:00Z +**Event Count**: 15 + +| Timestamp | Event Type | Severity | Summary | +|-----------|------------|----------|---------| +| 2025-12-03 10:15:00 | StateTransition | Info | Ready → Backfilling | +| 2025-12-03 10:15:32 | BatchIngestion | Debug | Ingested 45k records | +``` + ## Troubleshooting **Events not appearing:**