Add Sync Events Export CLI and Protocol Support

- Add SyncEventsArgs to export sync events from the CLI
- Wire SyncCmd::Events and implement export_events to fetch, format, and write results
- Implement JSON, SQL, and Markdown exporters, with optional device data in the output
- Extend protocol with EventLogRequest and EventLogResponse
- Enable LogSyncHandler to handle event log requests and return logs
- Expose log_handler from BackfillManager for event logging
- Update docs with CLI examples and protocol overview
Jamie Pine 2025-12-03 21:58:29 -08:00
parent a84ccadfa9
commit dbcfcb3c56
8 changed files with 527 additions and 2 deletions

View File

@@ -1,5 +1,40 @@
use clap::Args;
#[derive(Args, Debug)]
pub struct SyncEventsArgs {
/// Export format (json, sql, markdown)
#[arg(short, long, default_value = "json", help = "Output format: json, sql, or markdown")]
pub format: String,
/// Output file (defaults to stdout)
#[arg(short, long, help = "Write output to file instead of stdout")]
pub output: Option<String>,
/// Time range: events since this time
#[arg(long, help = "Show events since (e.g., '1 hour ago', '2025-12-03 10:00:00')")]
pub since: Option<String>,
/// Filter by event type
#[arg(long, help = "Filter by event type (state_transition, backfill_session_started, etc.)")]
pub event_type: Option<String>,
/// Filter by correlation ID (show all events from a session)
#[arg(long, help = "Filter by correlation ID to trace a specific backfill session")]
pub correlation_id: Option<String>,
/// Filter by severity (debug, info, warning, error)
#[arg(long, help = "Filter by severity level")]
pub severity: Option<String>,
/// Maximum number of events to return
#[arg(long, default_value = "1000", help = "Maximum number of events")]
pub limit: u32,
/// Include device name in output
#[arg(long, help = "Include device name/ID in output")]
pub with_device: bool,
}
#[derive(Args, Debug)]
pub struct SyncMetricsArgs {
/// Show metrics for a specific time range

View File

@@ -11,6 +11,7 @@ use tokio::time::sleep;
use crate::context::Context;
use crate::util::prelude::*;
use sd_core::infra::sync::{EventSeverity, SyncEventQuery, SyncEventType};
use sd_core::ops::sync::get_metrics::GetSyncMetricsInput;
use sd_core::service::sync::state::DeviceSyncState;
@@ -20,10 +21,14 @@ use self::args::*;
pub enum SyncCmd {
/// Show sync metrics
Metrics(SyncMetricsArgs),
/// Export sync event log
Events(SyncEventsArgs),
}
pub async fn run(ctx: &Context, cmd: SyncCmd) -> Result<()> {
match cmd {
SyncCmd::Events(args) => export_events(ctx, args).await?,
SyncCmd::Metrics(args) => {
// Parse time filters
let since = if let Some(since_str) = &args.since {
@@ -405,3 +410,171 @@ fn format_bytes(bytes: u64) -> String {
format!("{:.2} GB", bytes as f64 / GB as f64)
}
}
async fn export_events(ctx: &Context, args: SyncEventsArgs) -> Result<()> {
let library_id = ctx.library_id.ok_or_else(|| {
anyhow::anyhow!("No library selected. Use 'sd library switch' to select a library first.")
})?;
// Build query
let mut query = SyncEventQuery::new(library_id).with_limit(args.limit);
// Parse time filter
if let Some(since_str) = &args.since {
let since = parse_time_filter(since_str)?;
query = query.with_time_range(since, Utc::now());
}
// Parse event type filter
if let Some(event_type_str) = &args.event_type {
if let Some(event_type) = SyncEventType::from_str(event_type_str) {
query = query.with_event_types(vec![event_type]);
} else {
return Err(anyhow::anyhow!("Invalid event type: {}", event_type_str));
}
}
// Parse correlation ID filter
if let Some(corr_id_str) = &args.correlation_id {
let corr_id = uuid::Uuid::parse_str(corr_id_str)?;
query = query.with_correlation_id(corr_id);
}
// Parse severity filter
if let Some(severity_str) = &args.severity {
if let Some(severity) = EventSeverity::from_str(severity_str) {
query = query.with_severities(vec![severity]);
} else {
return Err(anyhow::anyhow!("Invalid severity: {}", severity_str));
}
}
// Call the API
let input = sd_core::ops::sync::get_event_log::GetSyncEventLogInput {
start_time: query.time_range.map(|(start, _)| start),
end_time: query.time_range.map(|(_, end)| end),
event_types: query.event_types,
categories: query.categories,
severities: query.severities,
peer_id: query.peer_filter,
model_type: query.model_type_filter,
correlation_id: query.correlation_id,
limit: query.limit,
offset: query.offset,
};
let json_response = ctx.core.query(&input, Some(library_id)).await?;
let output: sd_core::ops::sync::get_event_log::GetSyncEventLogOutput =
serde_json::from_value(json_response)?;
// Format output
let formatted = match args.format.as_str() {
"json" => format_events_json(&output.events, args.with_device)?,
"sql" => format_events_sql(&output.events),
"markdown" | "md" => format_events_markdown(&output.events, args.with_device),
_ => return Err(anyhow::anyhow!("Unknown format: {}. Use json, sql, or markdown", args.format)),
};
// Write output
if let Some(output_file) = &args.output {
std::fs::write(output_file, formatted)?;
println!("Exported {} events to {}", output.events.len(), output_file);
} else {
println!("{}", formatted);
}
Ok(())
}
fn format_events_json(events: &[sd_core::infra::sync::SyncEventLog], with_device: bool) -> Result<String> {
if with_device {
Ok(serde_json::to_string_pretty(events)?)
} else {
// Strip device_id for cleaner output
let simplified: Vec<_> = events
.iter()
.map(|e| {
serde_json::json!({
"timestamp": e.timestamp,
"event_type": e.event_type,
"severity": e.severity,
"summary": e.summary,
"details": e.details,
"correlation_id": e.correlation_id,
"peer_device_id": e.peer_device_id,
"record_count": e.record_count,
"duration_ms": e.duration_ms,
})
})
.collect();
Ok(serde_json::to_string_pretty(&simplified)?)
}
}
fn format_events_sql(events: &[sd_core::infra::sync::SyncEventLog]) -> String {
let mut output = String::from("-- Sync Event Log Export\n");
output.push_str("-- Generated: ");
output.push_str(&Utc::now().to_rfc3339());
output.push_str("\n\n");
for event in events {
let details = event
.details
.as_ref()
.map(|d| serde_json::to_string(d).unwrap_or_default())
.unwrap_or_default();
let model_types = event
.model_types
.as_ref()
.map(|m| m.join(","))
.unwrap_or_default();
output.push_str(&format!(
"INSERT INTO sync_event_log (timestamp, device_id, event_type, category, severity, summary, details, correlation_id, peer_device_id, model_types, record_count, duration_ms) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', {}, {}, {}, {}, {}, {});\n",
event.timestamp.to_rfc3339(),
event.device_id,
event.event_type.as_str(),
event.category.as_str(),
event.severity.as_str(),
event.summary.replace("'", "''"), // Escape quotes
if details.is_empty() { "NULL".to_string() } else { format!("'{}'", details.replace("'", "''")) },
event.correlation_id.map(|id| format!("'{}'", id)).unwrap_or_else(|| "NULL".to_string()),
event.peer_device_id.map(|id| format!("'{}'", id)).unwrap_or_else(|| "NULL".to_string()),
if model_types.is_empty() { "NULL".to_string() } else { format!("'{}'", model_types) },
event.record_count.map(|c| c.to_string()).unwrap_or_else(|| "NULL".to_string()),
event.duration_ms.map(|d| d.to_string()).unwrap_or_else(|| "NULL".to_string()),
));
}
output
}
fn format_events_markdown(events: &[sd_core::infra::sync::SyncEventLog], with_device: bool) -> String {
let mut output = String::from("# Sync Event Log\n\n");
output.push_str(&format!("**Exported**: {}\n", Utc::now().to_rfc3339()));
output.push_str(&format!("**Event Count**: {}\n\n", events.len()));
output.push_str("| Timestamp | Event Type | Severity | Summary |\n");
output.push_str("|-----------|------------|----------|----------|\n");
for event in events {
output.push_str(&format!(
"| {} | {:?} | {:?} | {} |\n",
event.timestamp.format("%Y-%m-%d %H:%M:%S"),
event.event_type,
event.severity,
event.summary
));
}
if with_device && !events.is_empty() {
output.push_str("\n## Devices\n\n");
let device_ids: std::collections::HashSet<_> = events.iter().map(|e| e.device_id).collect();
for device_id in device_ids {
output.push_str(&format!("- {}\n", device_id));
}
}
output
}

View File

@@ -476,6 +476,53 @@ impl SyncProtocolHandler {
Ok(None)
}
SyncMessage::EventLogRequest {
library_id,
requesting_device,
since,
event_types,
correlation_id,
limit,
} => {
debug!(
from_device = %from_device,
requesting_device = %requesting_device,
"Processing event log request"
);
let backfill_manager = self.backfill_manager.as_ref().ok_or_else(|| {
NetworkingError::Protocol("BackfillManager not initialized".to_string())
})?;
let log_handler = backfill_manager.log_handler();
let response = log_handler
.handle_event_log_request(requesting_device, since, event_types, correlation_id, limit)
.await
.map_err(|e| {
NetworkingError::Protocol(format!("Failed to handle event log request: {}", e))
})?;
Ok(Some(response))
}
SyncMessage::EventLogResponse {
library_id,
responding_device,
events,
} => {
debug!(
from_device = %from_device,
responding_device = %responding_device,
event_count = events.len(),
"Received event log response from peer"
);
// EventLogResponse is handled by the cross-device fetcher waiting for it
// We don't process it here - it gets delivered to the waiting query
Ok(None)
}
SyncMessage::Error {
library_id,
message,

View File

@@ -117,6 +117,23 @@ pub enum SyncMessage {
needs_shared_catchup: bool, // If true, peer needs our shared changes
},
/// Request event log from peer
EventLogRequest {
library_id: Uuid,
requesting_device: Uuid,
since: Option<DateTime<Utc>>,
event_types: Option<Vec<String>>,
correlation_id: Option<Uuid>,
limit: u32,
},
/// Response with event log
EventLogResponse {
library_id: Uuid,
responding_device: Uuid,
events: Vec<serde_json::Value>, // Serialized SyncEventLog
},
/// Error response
Error { library_id: Uuid, message: String },
}
@@ -145,6 +162,8 @@ impl SyncMessage {
| SyncMessage::Heartbeat { library_id, .. }
| SyncMessage::WatermarkExchangeRequest { library_id, .. }
| SyncMessage::WatermarkExchangeResponse { library_id, .. }
| SyncMessage::EventLogRequest { library_id, .. }
| SyncMessage::EventLogResponse { library_id, .. }
| SyncMessage::Error { library_id, .. } => *library_id,
}
}

View File

@@ -70,6 +70,11 @@ impl BackfillManager {
&self.metrics
}
/// Get log handler for protocol operations
pub fn log_handler(&self) -> &Arc<LogSyncHandler> {
&self.log_handler
}
/// Deliver a StateResponse to waiting request
///
/// Called by protocol handler when StateResponse is received.

View File

@@ -201,11 +201,13 @@ impl SyncService {
);
// Create protocol handlers
- let log_handler = Arc::new(LogSyncHandler::new(
+ let mut log_handler = LogSyncHandler::new(
library_id,
library.db().clone(),
peer_sync.clone(),
- ));
+ );
+ log_handler.set_event_logger(event_logger.clone());
+ let log_handler = Arc::new(log_handler);
// Create backfill manager for automatic orchestration
let backfill_manager = Arc::new(BackfillManager::new(

View File

@@ -17,12 +17,14 @@ use tracing::{debug, info, warn};
use uuid::Uuid;
use super::peer::PeerSync;
use crate::infra::sync::{SyncEventLog, SyncEventQuery, SyncEventType};
/// Handle log-based sync messages (shared resources)
pub struct LogSyncHandler {
library_id: Uuid,
db: Arc<Database>,
peer_sync: Arc<PeerSync>,
event_logger: Option<Arc<crate::infra::sync::SyncEventLogger>>,
}
impl LogSyncHandler {
@@ -31,9 +33,15 @@ impl LogSyncHandler {
library_id,
db,
peer_sync,
event_logger: None,
}
}
/// Set the event logger (called after initialization)
pub fn set_event_logger(&mut self, logger: Arc<crate::infra::sync::SyncEventLogger>) {
self.event_logger = Some(logger);
}
/// Handle incoming SharedChange message
///
/// Uses the Syncable registry with conflict resolution strategies.
@@ -140,4 +148,68 @@ impl LogSyncHandler {
Ok(serde_json::Value::Object(json_map))
}
/// Handle EventLogRequest message
pub async fn handle_event_log_request(
&self,
requesting_device: Uuid,
since: Option<DateTime<Utc>>,
event_types: Option<Vec<String>>,
correlation_id: Option<Uuid>,
limit: u32,
) -> Result<SyncMessage> {
debug!(
requesting_device = %requesting_device,
since = ?since,
limit = limit,
"Handling event log request from peer"
);
// Build query
let mut query = SyncEventQuery::new(self.library_id).with_limit(limit);
if let Some(since_time) = since {
query = query.with_time_range(since_time, Utc::now());
}
if let Some(types_str) = event_types {
let types: Vec<SyncEventType> = types_str
.into_iter()
.filter_map(|s| SyncEventType::from_str(&s))
.collect();
if !types.is_empty() {
query = query.with_event_types(types);
}
}
if let Some(corr_id) = correlation_id {
query = query.with_correlation_id(corr_id);
}
// Query local events
let logger = self
.event_logger
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Event logger not initialized"))?;
let events = logger.query(query).await?;
// Serialize events to JSON
let events_json: Vec<serde_json::Value> = events
.into_iter()
.map(|e| serde_json::to_value(e))
.collect::<Result<Vec<_>, _>>()?;
info!(
event_count = events_json.len(),
requesting_device = %requesting_device,
"Responding to event log request"
);
Ok(SyncMessage::EventLogResponse {
library_id: self.library_id,
responding_device: self.peer_sync.device_id(),
events: events_json,
})
}
}

View File

@@ -114,6 +114,93 @@ CREATE INDEX idx_sync_event_log_type ON sync_event_log(event_type);
CREATE INDEX idx_sync_event_log_correlation ON sync_event_log(correlation_id);
```
## CLI Export
### Export Events from a Device
```bash
# Export as JSON (default)
sd sync events
# Export to file
sd sync events --output sync_events.json
# Export last hour only
sd sync events --since "1 hour ago" -o recent_events.json
# Export as SQL (for database import)
sd sync events --format sql -o events.sql
# Export as Markdown
sd sync events --format markdown -o events.md
# Filter by event type
sd sync events --event-type state_transition
# Filter by severity (errors only)
sd sync events --severity error
# Trace a specific backfill session (correlation IDs are full UUIDs)
sd sync events --correlation-id 550e8400-e29b-41d4-a716-446655440000
# Limit results
sd sync events --limit 500
```
### Cross-Device Timeline
To debug sync issues across multiple devices, export logs from each device and merge:
```bash
# On Device A (pass --with-device so device IDs survive the JSON export
# and the merged timeline can attribute each event)
sd sync events --since "2 hours ago" --with-device --output device_a.json
# On Device B
sd sync events --since "2 hours ago" --with-device --output device_b.json
# Merge chronologically (requires jq)
jq -s 'add | sort_by(.timestamp)' device_a.json device_b.json > timeline.json
# View merged timeline
jq '.[] | "\(.timestamp) | \(.device_id[0:8]) | \(.event_type) | \(.summary)"' timeline.json
```
### SQL Import for Analysis
```bash
# Export as SQL from both devices
sd sync events --format sql -o device_a.sql
sd sync events --format sql -o device_b.sql
# Create combined database
sqlite3 timeline.db < device_a.sql
sqlite3 timeline.db < device_b.sql
# Query cross-device timeline
sqlite3 timeline.db "
SELECT
timestamp,
substr(device_id, 1, 8) as device,
event_type,
summary
FROM sync_event_log
ORDER BY timestamp
"
# Find matching backfill sessions across devices
sqlite3 timeline.db "
SELECT
correlation_id,
COUNT(DISTINCT device_id) as device_count,
MIN(timestamp) as started,
MAX(timestamp) as completed
FROM sync_event_log
WHERE correlation_id IS NOT NULL
GROUP BY correlation_id
ORDER BY started DESC
"
```
## Query API
### Rust API
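The body of this section falls outside the diff context, but the builder calls in `export_events` above show the API's shape. A minimal sketch, assuming the `EventSeverity::Error` variant name and an `anyhow` error type (only the builder methods are taken from the code in this commit):
```rust
use chrono::{Duration, Utc};
use uuid::Uuid;

use sd_core::infra::sync::{EventSeverity, SyncEventLog, SyncEventLogger, SyncEventQuery};

// Fetch the last hour of error-level events for a library.
// SyncEventQuery's builder methods mirror the calls in export_events;
// SyncEventLogger::query is invoked the same way in handle_event_log_request.
async fn recent_errors(
    logger: &SyncEventLogger,
    library_id: Uuid,
) -> anyhow::Result<Vec<SyncEventLog>> {
    let query = SyncEventQuery::new(library_id)
        .with_time_range(Utc::now() - Duration::hours(1), Utc::now())
        .with_severities(vec![EventSeverity::Error]) // variant name assumed
        .with_limit(100);
    logger.query(query).await
}
```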
@@ -208,6 +295,35 @@ WHERE event_type = 'backfill_session_started'
ORDER BY timestamp DESC;
```
## Network Protocol
The system includes protocol messages for querying event logs from remote peers over the network:
**EventLogRequest** - Request events from a peer
```rust
EventLogRequest {
library_id: Uuid,
requesting_device: Uuid,
since: Option<DateTime<Utc>>,
event_types: Option<Vec<String>>,
correlation_id: Option<Uuid>,
limit: u32,
}
```
**EventLogResponse** - Peer's events
```rust
EventLogResponse {
library_id: Uuid,
responding_device: Uuid,
events: Vec<serde_json::Value>,
}
```
These messages are routed through `SyncProtocolHandler` and handled by `LogSyncHandler::handle_event_log_request()`.
**Status**: Protocol foundation complete. Automatic cross-device fetching can be implemented when needed by creating a cross-device fetcher that sends requests to all online peers and merges responses.
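For illustration, a minimal sketch of such a fetcher. The `request_response` closure stands in for a one-shot send-and-await primitive that the networking layer would need to expose; it, the fixed limit, and the sort strategy are assumptions, not part of this commit:
```rust
use chrono::{DateTime, Utc};
use uuid::Uuid;

// SyncMessage is the protocol enum extended by this commit; its module path is
// not shown in the diff, so the import is omitted here.
async fn fetch_events_from_peers<F, Fut>(
    library_id: Uuid,
    requesting_device: Uuid,
    peers: Vec<Uuid>,
    since: Option<DateTime<Utc>>,
    request_response: F, // hypothetical send-and-await primitive
) -> Vec<serde_json::Value>
where
    F: Fn(Uuid, SyncMessage) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<SyncMessage>>,
{
    let mut merged = Vec::new();
    for peer in peers {
        let request = SyncMessage::EventLogRequest {
            library_id,
            requesting_device,
            since,
            event_types: None,
            correlation_id: None,
            limit: 1000,
        };
        // Tolerate offline peers: skip failures instead of aborting the merge.
        if let Ok(SyncMessage::EventLogResponse { events, .. }) =
            request_response(peer, request).await
        {
            merged.extend(events);
        }
    }
    // Serialized events carry a consistent timestamp format, so a
    // lexicographic sort yields chronological order.
    merged.sort_by_key(|e| e["timestamp"].as_str().map(str::to_owned));
    merged
}
```
Requests could equally be issued concurrently (for example with `futures::future::join_all`) and deduplicated before merging; the sequential loop keeps the sketch short.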
## Debugging Scenarios
### "Why did sync fail?"
@@ -402,6 +518,62 @@ Tests verify:
- Persistence across restarts
- Batch aggregation
## Export Formats
### JSON Format
Structured output for programmatic processing:
```json
[
{
"timestamp": "2025-12-03T10:15:00Z",
"event_type": "StateTransition",
"severity": "Info",
"summary": "Ready → Backfilling",
"details": { "reason": "new device joined" },
"correlation_id": null,
"duration_ms": null
},
{
"timestamp": "2025-12-03T10:15:32Z",
"event_type": "BatchIngestion",
"severity": "Debug",
"summary": "Ingested batch of 45,000 records (25k entries, 15k tags, 5k locations) in 28s",
"record_count": 45000,
"duration_ms": 28000
}
]
```
### SQL Format
INSERT statements for database import:
```sql
-- Sync Event Log Export
-- Generated: 2025-12-03T10:20:00Z
INSERT INTO sync_event_log (...) VALUES (...);
INSERT INTO sync_event_log (...) VALUES (...);
```
### Markdown Format
Human-readable table format:
```markdown
# Sync Event Log
**Exported**: 2025-12-03T10:20:00Z
**Event Count**: 15
| Timestamp | Event Type | Severity | Summary |
|-----------|------------|----------|---------|
| 2025-12-03 10:15:00 | StateTransition | Info | Ready → Backfilling |
| 2025-12-03 10:15:32 | BatchIngestion | Debug | Ingested 45k records |
```
## Troubleshooting
**Events not appearing:**