spacedrive/docs/core/jobs.mdx
2025-11-14 21:31:21 -08:00

379 lines
10 KiB
Plaintext

---
title: Job System
sidebarTitle: Jobs
---
The job system powers long-running operations in Spacedrive. It provides automatic persistence, progress tracking, and graceful interruption handling for tasks like indexing, file processing, and sync operations.
Jobs execute asynchronously with minimal boilerplate. They persist their state to survive crashes and resume where they left off. The system integrates with Spacedrive's task executor for efficient resource usage.
## Core Concepts
A job represents a resumable unit of work. Jobs report progress, handle interruptions, and maintain state across executions. The system manages job lifecycles automatically.
<Note>
Jobs are library-scoped. Each library maintains its own job database and execution queue.
</Note>
### Job Lifecycle
Jobs transition through defined states during execution:
<Steps>
<Step title="Queued">
Job created and waiting for execution. Initial state after dispatch.
</Step>
<Step title="Running">
Job actively executing. Progress updates flow to subscribers.
</Step>
<Step title="Paused">
Job interrupted but resumable. State persisted to database.
</Step>
<Step title="Completed">
Job finished successfully. Moved to history table.
</Step>
</Steps>
Failed or cancelled jobs cannot resume. The system distinguishes between recoverable interruptions and permanent failures.
### Key Components
The job system consists of several interconnected parts:
**Job Manager** coordinates all job operations. It maintains the job database, tracks running jobs, and handles lifecycle transitions. Located at `core/src/infra/job/manager.rs`.
**Job Registry** enables automatic job discovery. Jobs register themselves at compile time using the derive macro. The registry creates jobs dynamically from saved state. See `core/src/infra/job/registry.rs`.
**Job Context** provides execution environment. Jobs access the database, report progress, and interact with services through context. Implementation in `core/src/infra/job/context.rs`.
**Job Executor** bridges jobs with the task system. It manages interruption signals and checkpoint operations. Found at `core/src/infra/job/executor.rs`.
## Defining Jobs
Jobs implement two traits: `Job` for metadata and `JobHandler` for execution logic.
```rust
use sd_task_system::TaskId;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug)]
struct ProcessFilesJob {
location_id: i32,
files: Vec<PathBuf>,
processed: usize,
}
#[typetag::serde(name = "process_files")]
impl Job for ProcessFilesJob {
const NAME: &'static str = "process_files";
const VERSION: u32 = 1;
const IS_RESUMABLE: bool = true;
}
#[async_trait::async_trait]
impl JobHandler for ProcessFilesJob {
async fn run(&mut self, ctx: &JobContext) -> Result<Vec<i32>, JobError> {
// Skip already processed files when resuming
let files_to_process = &self.files[self.processed..];
for (idx, file) in files_to_process.iter().enumerate() {
// Check for interruption
if ctx.check_interrupted().await? {
return Err(JobError::Interrupted);
}
// Process file
process_file(file, ctx).await?;
// Update progress
self.processed += 1;
ctx.report_count_progress(self.processed, self.files.len()).await;
// Save checkpoint periodically
if idx % 100 == 0 {
ctx.checkpoint().await?;
}
}
Ok(vec![])
}
}
```
<Info>
The `#[typetag::serde]` attribute enables polymorphic serialization. Jobs must be serializable to support resumption.
</Info>
### Progress Reporting
Jobs communicate progress through the context. The system supports multiple progress types:
```rust
// Count-based progress
ctx.report_count_progress(current, total).await;
// Percentage progress
ctx.report_percentage_progress(0.75).await;
// Bytes processed
ctx.report_bytes_progress(processed_bytes, total_bytes).await;
// Custom structured data
ctx.report_structured_progress("phase", json!({
"stage": "analyzing",
"files_found": 1500
})).await;
```
Progress updates throttle automatically. The system batches updates to prevent database overhead.
### Error Handling
Jobs distinguish between recoverable and permanent errors:
```rust
// Recoverable - job can resume
if network_unavailable() {
return Err(JobError::Interrupted);
}
// Permanent failure - job cannot resume
if corrupt_data() {
return Err(JobError::Critical(
"Data corruption detected".into()
));
}
// Non-critical errors accumulate
ctx.report_non_critical_error("Skipped locked file").await;
```
<Warning>
Always check `ctx.check_interrupted()` in loops. This enables graceful shutdown and pause operations.
</Warning>
## Dispatching Jobs
The job manager provides typed and dynamic dispatch methods:
```rust
// Typed dispatch
let handle = manager.dispatch(ProcessFilesJob {
location_id: 1,
files: vec![path1, path2],
processed: 0,
}).await?;
// Dynamic dispatch by name
let handle = manager.dispatch_by_name(
"process_files",
json!({
"location_id": 1,
"files": ["path1", "path2"],
"processed": 0
})
).await?;
// Wait for completion
let result = handle.wait().await?;
```
Job handles provide status monitoring and progress streaming:
```rust
// Subscribe to status changes
let mut status_rx = handle.status();
while let Ok(status) = status_rx.recv().await {
match status {
JobStatus::Running => println!("Job started"),
JobStatus::Completed => break,
_ => {}
}
}
// Stream progress updates
let mut progress_rx = handle.progress();
while let Ok(progress) = progress_rx.recv().await {
if let Some(count) = progress.count {
println!("Processed {}/{}", count.current, count.total);
}
}
```
## Database Schema
Jobs persist to a dedicated SQLite database (`jobs.db`) with three tables:
<ResponseField name="jobs" type="table">
Active job records containing:
<Expandable title="columns">
<ResponseField name="id" type="UUID">
Unique job identifier
</ResponseField>
<ResponseField name="name" type="TEXT">
Job type name for registry lookup
</ResponseField>
<ResponseField name="version" type="INTEGER">
Schema version for migrations
</ResponseField>
<ResponseField name="status" type="TEXT">
Current job state
</ResponseField>
<ResponseField name="data" type="BLOB">
MessagePack serialized job state
</ResponseField>
<ResponseField name="progress" type="BLOB">
Latest progress snapshot
</ResponseField>
<ResponseField name="metrics" type="TEXT">
Performance statistics JSON
</ResponseField>
</Expandable>
</ResponseField>
<ResponseField name="job_history" type="table">
Completed jobs moved here for audit trails
</ResponseField>
<ResponseField name="job_checkpoints" type="table">
Resumption checkpoints for long-running jobs
</ResponseField>
## Advanced Features
### Job Versioning
Jobs specify versions for schema evolution:
```rust
impl Job for DataMigrationJob {
const VERSION: u32 = 2; // Increment when schema changes
}
```
The registry validates versions during resumption. Incompatible versions fail to load.
### Extension Jobs
The system supports WASM-based extension jobs:
```rust
manager.dispatch_extension_job(
extension_id,
job_name,
job_data
).await?;
```
Extensions run in isolated contexts with limited capabilities.
### Performance Considerations
The job system optimizes for throughput and resumability:
- Progress updates batch at 2-second intervals
- Checkpoints save incrementally
- Database operations use prepared statements
- Channels use bounded capacity to prevent memory growth
<Tip>
For high-frequency operations, batch work into larger chunks. This reduces checkpoint overhead and improves performance.
</Tip>
## Integration Points
Jobs integrate with core Spacedrive systems:
**Task System**: Jobs execute as tasks with configurable priority. The executor handles work distribution across threads.
**Event System**: State changes emit events for UI updates. Subscribe to `JOB_MANAGER_EVENTS` for notifications.
**Action System**: User actions spawn jobs with audit context. The system tracks who initiated operations.
**Library System**: Each library maintains independent job state. Jobs cannot access cross-library data.
## Common Patterns
### Batch Processing
Process items in chunks for efficiency:
```rust
const BATCH_SIZE: usize = 1000;
for (batch_idx, chunk) in items.chunks(BATCH_SIZE).enumerate() {
if ctx.check_interrupted().await? {
self.batch_idx = batch_idx;
return Err(JobError::Interrupted);
}
process_batch(chunk).await?;
ctx.report_count_progress(
batch_idx * BATCH_SIZE + chunk.len(),
items.len()
).await;
ctx.checkpoint().await?;
}
```
### Phased Execution
Split complex jobs into phases:
```rust
match self.phase {
Phase::Discovery => {
let items = discover_items(ctx).await?;
self.items = items;
self.phase = Phase::Processing;
ctx.checkpoint().await?;
}
Phase::Processing => {
process_items(&mut self.items, ctx).await?;
self.phase = Phase::Cleanup;
ctx.checkpoint().await?;
}
Phase::Cleanup => {
cleanup_resources(ctx).await?;
}
}
```
### Child Jobs
Spawn dependent jobs (feature in development):
```rust
let child_ids = ctx.spawn_children(vec![
AnalyzeFileJob { path: file1 },
AnalyzeFileJob { path: file2 },
]).await?;
ctx.wait_for_children(child_ids).await?;
```
## Debugging
Enable file-based logging for troubleshooting:
```rust
std::env::set_var("SD_JOBS_FILE_LOG", "1");
```
Logs write to `.spacedrive/jobs/{job_id}.log` with detailed execution traces.
Monitor job metrics through the context:
```rust
let metrics = ctx.get_metrics();
println!("Execution time: {}s", metrics.elapsed_seconds);
println!("Memory used: {}MB", metrics.memory_mb);
```
<Danger>
Never block the job executor thread. Use `tokio::task::spawn_blocking` for CPU-intensive work.
</Danger>
The job system provides the foundation for reliable background processing in Spacedrive. Its resumable design ensures operations complete despite interruptions, while the progress system keeps users informed of ongoing work.