Refactor IndexerJob to separate job phase execution and ensure proper ephemeral indexing cleanup

- Moved the job phase logic into a new `run_job_phases` method for better organization and clarity.
- Updated the `run` method to always mark ephemeral indexing as complete, even on failure, preventing stuck indexing flags.
- Enhanced logging to report whether ephemeral indexing was marked complete after a successful run or after a failed one (the failure message includes the job error).
This commit is contained in:
Jamie Pine 2025-12-07 21:46:21 -08:00
parent aff2398563
commit 4a2590d418

View File

@ -674,22 +674,9 @@ impl DynJob for IndexerJob {
impl JobProgress for IndexerProgress {}
#[async_trait::async_trait]
impl JobHandler for IndexerJob {
type Output = IndexerOutput;
async fn run(&mut self, ctx: JobContext<'_>) -> JobResult<Self::Output> {
// Initialize timer
if self.timer.is_none() {
self.timer = Some(PhaseTimer::new());
}
// Initialize ephemeral index if needed
if self.config.is_ephemeral() && self.ephemeral_index.is_none() {
self.ephemeral_index = Some(Arc::new(RwLock::new(EphemeralIndex::new())));
ctx.log("Initialized ephemeral index for non-persistent job");
}
impl IndexerJob {
/// Inner implementation of the job phases (separated for cleanup guarantee)
async fn run_job_phases(&mut self, ctx: &JobContext<'_>) -> JobResult<IndexerOutput> {
// Initialize or restore state
// Ensure state is always created early to avoid serialization issues
if self.state.is_none() {
@ -983,21 +970,7 @@ impl JobHandler for IndexerJob {
}
}
// Mark ephemeral indexing as complete in the cache
if self.config.is_ephemeral() {
if let Some(local_path) = self.config.path.as_local_path() {
ctx.library()
.core_context()
.ephemeral_cache()
.mark_indexing_complete(local_path);
ctx.log(format!(
"Marked ephemeral indexing complete for: {}",
local_path.display()
));
}
}
// Generate final output
// Generate final output (cleanup happens in outer run() method)
Ok(IndexerOutput {
location_id: self.config.location_id,
stats: state.stats,
@ -1011,6 +984,52 @@ impl JobHandler for IndexerJob {
},
})
}
}
// JobHandler trait implementation
#[async_trait::async_trait]
impl JobHandler for IndexerJob {
type Output = IndexerOutput;
/// Entry point for the indexer job.
///
/// Prepares the phase timer and, for ephemeral configurations, the in-memory
/// index; delegates the actual work to `run_job_phases`; then marks ephemeral
/// indexing as complete in the cache whether or not the phases succeeded.
///
/// Returns the result of `run_job_phases` unchanged.
async fn run(&mut self, ctx: JobContext<'_>) -> JobResult<Self::Output> {
    // Create the phase timer only if one is not already present.
    self.timer.get_or_insert_with(PhaseTimer::new);

    // Ephemeral jobs need an in-memory index; create one if it is missing.
    let needs_index = self.config.is_ephemeral() && self.ephemeral_index.is_none();
    if needs_index {
        let fresh_index = EphemeralIndex::new();
        self.ephemeral_index = Some(Arc::new(RwLock::new(fresh_index)));
        ctx.log("Initialized ephemeral index for non-persistent job");
    }

    // Run the phases first and hold on to the outcome so the cleanup below
    // executes on both the success and the failure path.
    let outcome = self.run_job_phases(&ctx).await;

    // Only ephemeral jobs with a local path have an indexing flag to clear.
    let ephemeral_path = if self.config.is_ephemeral() {
        self.config.path.as_local_path()
    } else {
        None
    };

    // Always clear the flag, even when the job failed; otherwise it would
    // stay set forever.
    if let Some(local_path) = ephemeral_path {
        ctx.library()
            .core_context()
            .ephemeral_cache()
            .mark_indexing_complete(local_path);

        let message = match &outcome {
            Ok(_) => format!(
                "Marked ephemeral indexing complete for: {}",
                local_path.display()
            ),
            Err(err) => format!(
                "Marked ephemeral indexing complete (job failed: {}) for: {}",
                err,
                local_path.display()
            ),
        };
        ctx.log(message);
    }

    outcome
}
async fn on_resume(&mut self, ctx: &JobContext<'_>) -> JobResult {
// State is already loaded from serialization