parallel indexing rayon without rayon due to async requirement

This commit is contained in:
Jamie Pine 2025-12-07 15:45:26 -08:00
parent d1392607ad
commit 8fc01634f6
4 changed files with 383 additions and 33 deletions

View File

@ -19,9 +19,10 @@ cli = []
[dependencies]
# Async runtime
async-trait = "0.1"
futures = "0.3"
tokio = { version = "1.40", features = ["full"] }
async-channel = { workspace = true }
async-trait = "0.1"
futures = "0.3"
tokio = { version = "1.40", features = ["full"] }
# Database
sea-orm = { version = "1.1", features = [

View File

@ -9,7 +9,9 @@ use crate::{
state::{DirEntry, EntryKind, IndexError, IndexPhase, IndexerProgress, IndexerState},
},
};
use async_channel as chan;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;
use std::{path::Path, sync::Arc};
@ -22,7 +24,7 @@ impl crate::ops::indexing::rules::MetadataForIndexerRules for SimpleMetadata {
}
}
/// Run the discovery phase of indexing
/// Run the discovery phase of indexing with parallel directory walking
pub async fn run_discovery_phase(
state: &mut IndexerState,
ctx: &JobContext<'_>,
@ -31,33 +33,382 @@ pub async fn run_discovery_phase(
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
cloud_url_base: Option<String>,
) -> Result<(), JobError> {
let concurrency = state.discovery_concurrency;
if concurrency <= 1 {
// Fall back to sequential discovery for concurrency = 1
return run_discovery_phase_sequential(
state,
ctx,
root_path,
rule_toggles,
volume_backend,
cloud_url_base,
)
.await;
}
ctx.log(format!(
"Discovery phase starting from: {}",
root_path.display()
"Discovery phase starting from: {} (concurrency: {})",
root_path.display(),
concurrency
));
ctx.log(format!(
"Initial directories to walk: {}",
state.dirs_to_walk.len()
));
let mut skipped_count = 0u64;
run_parallel_discovery(state, ctx, root_path, rule_toggles, volume_backend, cloud_url_base)
.await
}
let toggles = rule_toggles;
/// Parallel discovery implementation using Rayon-style work-stealing
async fn run_parallel_discovery(
state: &mut IndexerState,
ctx: &JobContext<'_>,
root_path: &Path,
rule_toggles: RuleToggles,
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
cloud_url_base: Option<String>,
) -> Result<(), JobError> {
let concurrency = state.discovery_concurrency;
// Use unbounded channels to avoid backpressure/deadlock issues
let (work_tx, work_rx) = chan::unbounded::<PathBuf>();
let (result_tx, result_rx) = chan::unbounded::<DiscoveryResult>();
// Atomic counter tracking work in progress + shutdown signal
// INVARIANT: incremented BEFORE sending to work channel, decremented AFTER processing
let pending_work = Arc::new(AtomicUsize::new(0));
let skipped_count = Arc::new(AtomicU64::new(0));
let shutdown = Arc::new(AtomicBool::new(false));
// Seed initial work
while let Some(dir) = state.dirs_to_walk.pop_front() {
pending_work.fetch_add(1, Ordering::Release);
work_tx
.send(dir)
.await
.map_err(|_| JobError::execution("Work channel closed"))?;
}
// Spawn worker tasks
let mut workers = Vec::new();
for worker_id in 0..concurrency {
let work_rx = work_rx.clone();
let work_tx = work_tx.clone();
let result_tx = result_tx.clone();
let pending_work = Arc::clone(&pending_work);
let skipped_count = Arc::clone(&skipped_count);
let shutdown = Arc::clone(&shutdown);
let root_path = root_path.to_path_buf();
let volume_backend = volume_backend.cloned();
let cloud_url_base = cloud_url_base.clone();
let worker = tokio::spawn(async move {
discovery_worker_rayon(
worker_id,
work_rx,
work_tx,
result_tx,
pending_work,
skipped_count,
shutdown,
root_path,
rule_toggles,
volume_backend,
cloud_url_base,
)
.await
});
workers.push(worker);
}
// Monitor task: signals shutdown when all work is done
let monitor = tokio::spawn({
let shutdown = Arc::clone(&shutdown);
let pending_work = Arc::clone(&pending_work);
async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
if pending_work.load(Ordering::Acquire) == 0 {
shutdown.store(true, Ordering::Release);
break;
}
}
}
});
// Drop our copies so channels close when workers are done
drop(work_tx);
drop(result_tx);
// Collect results
let mut total_processed = 0u64;
while let Ok(result) = result_rx.recv().await {
match result {
DiscoveryResult::Entry(entry) => {
state.pending_entries.push(entry);
total_processed += 1;
if state.should_create_batch() {
let batch = state.create_batch();
state.entry_batches.push(batch);
}
}
DiscoveryResult::Stats {
files,
dirs,
symlinks,
bytes,
} => {
state.stats.files += files;
state.stats.dirs += dirs;
state.stats.symlinks += symlinks;
state.stats.bytes += bytes;
}
DiscoveryResult::Error(error) => {
state.add_error(error);
}
DiscoveryResult::Progress { dirs_queued } => {
let indexer_progress = IndexerProgress {
phase: IndexPhase::Discovery { dirs_queued },
current_path: root_path.display().to_string(),
total_found: state.stats,
processing_rate: state.calculate_rate(),
estimated_remaining: state.estimate_remaining(),
scope: None,
persistence: None,
is_ephemeral: false,
action_context: None,
};
ctx.progress(Progress::generic(indexer_progress.to_generic_progress()));
state.items_since_last_update += 1;
}
DiscoveryResult::QueueDirectories(_) => {
// Workers queue directly, this shouldn't happen
unreachable!("Workers should not send QueueDirectories in Rayon-style mode");
}
}
ctx.check_interrupt().await?;
}
// Wait for monitor and workers
monitor
.await
.map_err(|e| JobError::execution(format!("Monitor task failed: {}", e)))?;
for worker in workers {
worker
.await
.map_err(|e| JobError::execution(format!("Worker task failed: {}", e)))?;
}
// Final batch
if !state.pending_entries.is_empty() {
let final_batch_size = state.pending_entries.len();
ctx.log(format!(
"Creating final batch with {} entries",
final_batch_size
));
let batch = state.create_batch();
state.entry_batches.push(batch);
}
let skipped = skipped_count.load(Ordering::SeqCst);
state.stats.skipped = skipped;
ctx.log(format!(
"Parallel discovery complete: {} files, {} dirs, {} symlinks, {} skipped, {} batches created",
state.stats.files,
state.stats.dirs,
state.stats.symlinks,
skipped,
state.entry_batches.len()
));
state.phase = crate::ops::indexing::state::Phase::Processing;
Ok(())
}
/// Result types sent from workers back to coordinator
enum DiscoveryResult {
Entry(DirEntry),
QueueDirectories(Vec<PathBuf>),
Stats {
files: u64,
dirs: u64,
symlinks: u64,
bytes: u64,
},
Error(IndexError),
Progress { dirs_queued: usize },
}
/// Rayon-style worker: processes directories and directly enqueues new work
async fn discovery_worker_rayon(
_worker_id: usize,
work_rx: chan::Receiver<PathBuf>,
work_tx: chan::Sender<PathBuf>,
result_tx: chan::Sender<DiscoveryResult>,
pending_work: Arc<AtomicUsize>,
skipped_count: Arc<AtomicU64>,
shutdown: Arc<AtomicBool>,
root_path: PathBuf,
rule_toggles: RuleToggles,
volume_backend: Option<Arc<dyn crate::volume::VolumeBackend>>,
cloud_url_base: Option<String>,
) {
let mut seen_paths = std::collections::HashSet::new();
loop {
// Check shutdown signal
if shutdown.load(Ordering::Acquire) {
break;
}
// Try to get work with a timeout to periodically check shutdown
let dir_path = match tokio::time::timeout(
tokio::time::Duration::from_millis(50),
work_rx.recv(),
)
.await
{
Ok(Ok(path)) => path,
Ok(Err(_)) => break, // Channel closed
Err(_) => continue, // Timeout, check shutdown flag again
};
// Skip if already seen (handles symlink loops)
if !seen_paths.insert(dir_path.clone()) {
pending_work.fetch_sub(1, Ordering::Release);
continue;
}
// Build rules for this directory
let dir_ruler = build_default_ruler(rule_toggles, &root_path, &dir_path).await;
// Read directory
match read_directory(&dir_path, volume_backend.as_ref(), cloud_url_base.as_deref()).await
{
Ok(entries) => {
let mut local_stats = LocalStats::default();
for entry in entries {
// Apply rules
let decision = dir_ruler
.evaluate_path(
&entry.path,
&SimpleMetadata {
is_dir: matches!(entry.kind, EntryKind::Directory),
},
)
.await;
if matches!(decision, Ok(RulerDecision::Reject)) {
skipped_count.fetch_add(1, Ordering::Relaxed);
continue;
}
if let Err(err) = decision {
let _ = result_tx
.send(DiscoveryResult::Error(IndexError::FilterCheck {
path: entry.path.to_string_lossy().to_string(),
error: err.to_string(),
}))
.await;
continue;
}
match entry.kind {
EntryKind::Directory => {
local_stats.dirs += 1;
// Rayon-style: increment BEFORE queueing, worker directly enqueues
pending_work.fetch_add(1, Ordering::Release);
if work_tx.send(entry.path.clone()).await.is_err() {
// Channel closed, decrement and continue
pending_work.fetch_sub(1, Ordering::Release);
}
let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
}
EntryKind::File => {
local_stats.files += 1;
local_stats.bytes += entry.size;
let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
}
EntryKind::Symlink => {
local_stats.symlinks += 1;
let _ = result_tx.send(DiscoveryResult::Entry(entry)).await;
}
}
}
// Send stats update
let _ = result_tx
.send(DiscoveryResult::Stats {
files: local_stats.files,
dirs: local_stats.dirs,
symlinks: local_stats.symlinks,
bytes: local_stats.bytes,
})
.await;
// Send progress update
let dirs_queued = pending_work.load(Ordering::Acquire);
let _ = result_tx
.send(DiscoveryResult::Progress { dirs_queued })
.await;
}
Err(e) => {
let _ = result_tx
.send(DiscoveryResult::Error(IndexError::ReadDir {
path: dir_path.to_string_lossy().to_string(),
error: e.to_string(),
}))
.await;
}
}
// Decrement AFTER processing complete
pending_work.fetch_sub(1, Ordering::Release);
}
}
#[derive(Default)]
struct LocalStats {
files: u64,
dirs: u64,
symlinks: u64,
bytes: u64,
}
/// Sequential discovery fallback (original implementation)
async fn run_discovery_phase_sequential(
state: &mut IndexerState,
ctx: &JobContext<'_>,
root_path: &Path,
rule_toggles: RuleToggles,
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
cloud_url_base: Option<String>,
) -> Result<(), JobError> {
ctx.log(format!(
"Discovery phase starting from: {} (sequential mode)",
root_path.display()
));
let mut skipped_count = 0u64;
while let Some(dir_path) = state.dirs_to_walk.pop_front() {
ctx.check_interrupt().await?;
// Skip if already seen (handles symlink loops)
if !state.seen_paths.insert(dir_path.clone()) {
continue;
}
// Build rules in the context of the current directory for gitignore behavior
let dir_ruler = build_default_ruler(toggles, root_path, &dir_path).await;
let dir_ruler = build_default_ruler(rule_toggles, root_path, &dir_path).await;
// Do not skip the directory itself by rules; only apply rules to its entries
// Update progress
let indexer_progress = IndexerProgress {
phase: IndexPhase::Discovery {
dirs_queued: state.dirs_to_walk.len(),
@ -69,21 +420,18 @@ pub async fn run_discovery_phase(
scope: None,
persistence: None,
is_ephemeral: false,
action_context: None, // TODO: Pass action context from job state
action_context: None,
};
ctx.progress(Progress::generic(indexer_progress.to_generic_progress()));
// Read directory entries with per-dir FS timing
match read_directory(&dir_path, volume_backend, cloud_url_base.as_deref()).await {
Ok(entries) => {
let entry_count = entries.len();
let mut added_count = 0;
for entry in entries {
// Check for interruption during entry processing
ctx.check_interrupt().await?;
// Skip filtered entries via rules engine
let decision = dir_ruler
.evaluate_path(
&entry.path,
@ -95,7 +443,6 @@ pub async fn run_discovery_phase(
if matches!(decision, Ok(RulerDecision::Reject)) {
state.stats.skipped += 1;
skipped_count += 1;
eprintln!("[discovery] Filtered entry: {}", entry.path.display());
continue;
}
if let Err(err) = decision {
@ -135,7 +482,6 @@ pub async fn run_discovery_phase(
));
}
// Batch entries
if state.should_create_batch() {
let batch = state.create_batch();
state.entry_batches.push(batch);
@ -151,13 +497,9 @@ pub async fn run_discovery_phase(
}
}
// Update rate tracking
state.items_since_last_update += 1;
// State is automatically saved during job serialization on shutdown
}
// Final batch
if !state.pending_entries.is_empty() {
let final_batch_size = state.pending_entries.len();
ctx.log(format!(

View File

@ -135,6 +135,11 @@ impl IndexerState {
dirs_to_walk.push_back(path.to_path_buf());
}
// Use half of available CPU cores for parallel discovery (Rayon-style)
let discovery_concurrency = std::thread::available_parallelism()
.map(|n| usize::max(n.get() / 2, 1))
.unwrap_or(4);
Self {
phase: Phase::Discovery,
started_at: Instant::now(),
@ -150,7 +155,7 @@ impl IndexerState {
last_progress_time: Instant::now(),
items_since_last_update: 0,
batch_size: 1000,
discovery_concurrency: 1,
discovery_concurrency,
dirs_channel_capacity: 4096,
entries_channel_capacity: 16384,
}

View File

@ -173,15 +173,16 @@ async fn test_location_indexing() -> Result<(), Box<dyn std::error::Error>> {
// 8. Verify indexed entries in database
// Helper to get all entry IDs under the location
let get_location_entry_ids = || async {
let descendant_ids = entry_closure::Entity::find()
.filter(entry_closure::Column::AncestorId.eq(location_entry_id))
let location_id = location_entry_id.expect("Location should have entry_id");
let descendant_ids: Vec<i32> = entry_closure::Entity::find()
.filter(entry_closure::Column::AncestorId.eq(location_id))
.all(db.conn())
.await?
.into_iter()
.map(|ec| ec.descendant_id)
.collect::<Vec<i32>>();
.collect();
let mut all_ids = vec![location_entry_id];
let mut all_ids = vec![location_id];
all_ids.extend(descendant_ids);
Ok::<Vec<i32>, anyhow::Error>(all_ids)
};
@ -337,15 +338,16 @@ async fn test_incremental_indexing() -> Result<(), Box<dyn std::error::Error>> {
}
// Get all entry IDs under this location
let descendant_ids = entry_closure::Entity::find()
.filter(entry_closure::Column::AncestorId.eq(location_entry_id))
let location_id = location_entry_id.expect("Location should have entry_id");
let descendant_ids: Vec<i32> = entry_closure::Entity::find()
.filter(entry_closure::Column::AncestorId.eq(location_id))
.all(db.conn())
.await?
.into_iter()
.map(|ec| ec.descendant_id)
.collect::<Vec<i32>>();
.collect();
let mut all_entry_ids = vec![location_entry_id];
let mut all_entry_ids = vec![location_id];
all_entry_ids.extend(descendant_ids);
let initial_file_count = entities::entry::Entity::find()