diff --git a/apps/cli/src/context.rs b/apps/cli/src/context.rs index ab361d135..8ca111145 100644 --- a/apps/cli/src/context.rs +++ b/apps/cli/src/context.rs @@ -177,4 +177,37 @@ impl Context { execute_core_query!(self, input); Ok(output) } + + /// Validate and fix the current library selection + /// If the stored library ID is not found, select the first available library + pub async fn validate_and_fix_library(&mut self) -> Result<()> { + if let Some(stored_library_id) = self.library_id { + let libraries = self.list_libraries().await?; + + if libraries.is_empty() { + self.library_id = None; + self.cli_config.clear_current_library(&self.data_dir)?; + return Ok(()); + } + + let library_exists = libraries.iter().any(|lib| lib.id == stored_library_id); + + if !library_exists { + if let Some(first_lib) = libraries.first() { + self.library_id = Some(first_lib.id); + self.cli_config + .set_current_library(first_lib.id, &self.data_dir)?; + } + } + } else { + let libraries = self.list_libraries().await?; + if let Some(first_lib) = libraries.first() { + self.library_id = Some(first_lib.id); + self.cli_config + .set_current_library(first_lib.id, &self.data_dir)?; + } + } + + Ok(()) + } } diff --git a/apps/cli/src/main.rs b/apps/cli/src/main.rs index c4078b35c..e6736db0e 100644 --- a/apps/cli/src/main.rs +++ b/apps/cli/src/main.rs @@ -458,7 +458,10 @@ async fn run_client_command( } let core = CoreClient::new(socket_path.clone()); - let ctx = Context::new(core, format, data_dir, socket_path)?; + let mut ctx = Context::new(core, format, data_dir, socket_path)?; + + ctx.validate_and_fix_library().await?; + match command { Commands::Status => { let status: sd_core::ops::core::status::output::CoreStatus = diff --git a/core/src/infra/job/executor.rs b/core/src/infra/job/executor.rs index c641a3916..db8bade58 100644 --- a/core/src/infra/job/executor.rs +++ b/core/src/infra/job/executor.rs @@ -26,6 +26,7 @@ pub struct JobExecutor { pub struct JobExecutorState { pub job_id: JobId, + pub job_name: String, pub library: Arc, pub job_db: Arc, pub status_tx: watch::Sender, @@ -47,6 +48,7 @@ impl JobExecutor { pub fn new( job: J, job_id: JobId, + job_name: String, library: Arc, job_db: Arc, status_tx: watch::Sender, @@ -67,7 +69,7 @@ impl JobExecutor { let log_file = logs_dir.join(format!("{}.log", job_id)); match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) { Ok(logger) => { - let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, J::NAME)); + let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, &job_name)); Some(Arc::new(logger)) } Err(e) => { @@ -83,6 +85,7 @@ impl JobExecutor { job, state: JobExecutorState { job_id, + job_name, library, job_db, status_tx, @@ -151,7 +154,10 @@ impl Task for JobExecutor { if let Some(logger) = &self.state.file_logger { let _ = logger.log( "INFO", - &format!("Starting job {}: {}", self.state.job_id, J::NAME), + &format!( + "Starting job {}: {}", + self.state.job_id, self.state.job_name + ), ); } @@ -185,7 +191,10 @@ impl Task for JobExecutor { impl JobExecutor { async fn run_inner(&mut self, interrupter: &Interrupter) -> Result { - info!("Starting job {}: {}", self.state.job_id, J::NAME); + info!( + "Starting job {}: {}", + self.state.job_id, self.state.job_name + ); // Update status to running warn!( @@ -439,6 +448,7 @@ impl ErasedJob for JobExecutor { fn create_executor( self: Box, job_id: JobId, + job_name: String, library: std::sync::Arc, job_db: std::sync::Arc, status_tx: tokio::sync::watch::Sender, @@ -477,6 +487,7 @@ impl ErasedJob for JobExecutor { executor.state = JobExecutorState { job_id, + job_name, library, job_db, status_tx, diff --git a/core/src/infra/job/manager.rs b/core/src/infra/job/manager.rs index 0e42358c7..b49af7f92 100644 --- a/core/src/infra/job/manager.rs +++ b/core/src/infra/job/manager.rs @@ -118,7 +118,9 @@ impl JobManager { if REGISTRY.has_job(job_name) { // Create job instance from core registry let erased_job = REGISTRY.create_job(job_name, params)?; - return self.dispatch_erased_job(job_name, erased_job, priority, None).await; + return self + .dispatch_erased_job(job_name, erased_job, priority, None) + .await; } // Check if it's an extension job (contains colon) @@ -129,16 +131,20 @@ impl JobManager { if job_registry.has_job(job_name) { // Extract state JSON from params - let state_json = serde_json::to_string(¶ms) - .map_err(|e| JobError::serialization(format!("Failed to serialize params: {}", e)))?; + let state_json = serde_json::to_string(¶ms).map_err(|e| { + JobError::serialization(format!("Failed to serialize params: {}", e)) + })?; // Create WasmJob from registry let wasm_job = job_registry .create_wasm_job(job_name, state_json) .map_err(|e| JobError::NotFound(e))?; - // Dispatch the WasmJob - return self.dispatch_with_priority(wasm_job, priority, None).await; + // Box as ErasedJob and dispatch with the extension job name + let erased_job = Box::new(wasm_job) as Box; + return self + .dispatch_erased_job(job_name, erased_job, priority, None) + .await; } } } @@ -270,6 +276,7 @@ impl JobManager { // Create executor using the erased job let executor = erased_job.create_executor( job_id, + job_name.to_string(), library, self.db.clone(), status_tx.clone(), @@ -555,6 +562,7 @@ impl JobManager { let executor = JobExecutor::new( job, job_id, + J::NAME.to_string(), library, self.db.clone(), status_tx.clone(), @@ -1112,7 +1120,7 @@ impl JobManager { let job_name = job_record.name.clone(); let handle = JobHandle { id: job_id, - job_name, + job_name: job_name.clone(), task_handle: Arc::new(Mutex::new(None)), status_rx, progress_rx: broadcast_rx, @@ -1126,6 +1134,7 @@ impl JobManager { // Create executor using the erased job let executor = erased_job.create_executor( job_id, + job_name, library, self.db.clone(), status_tx.clone(), @@ -1479,6 +1488,7 @@ impl JobManager { // Create executor let executor = erased_job.create_executor( job_id, + job_name.clone(), library, self.db.clone(), status_tx.clone(), diff --git a/core/src/infra/job/types.rs b/core/src/infra/job/types.rs index 66e0ea2e0..93669ff91 100644 --- a/core/src/infra/job/types.rs +++ b/core/src/infra/job/types.rs @@ -126,6 +126,7 @@ pub trait ErasedJob: Send + Sync + std::fmt::Debug + 'static { fn create_executor( self: Box, job_id: JobId, + job_name: String, library: std::sync::Arc, job_db: std::sync::Arc, status_tx: tokio::sync::watch::Sender, diff --git a/crates/job-derive/src/lib.rs b/crates/job-derive/src/lib.rs index d2a8eb514..a1f38219c 100644 --- a/crates/job-derive/src/lib.rs +++ b/crates/job-derive/src/lib.rs @@ -63,6 +63,7 @@ pub fn derive_job(input: TokenStream) -> TokenStream { fn create_executor( self: Box, job_id: crate::infra::job::types::JobId, + job_name: String, library: std::sync::Arc, job_db: std::sync::Arc, status_tx: tokio::sync::watch::Sender, @@ -79,6 +80,7 @@ pub fn derive_job(input: TokenStream) -> TokenStream { Box::new(crate::infra::job::executor::JobExecutor::new( *self, job_id, + job_name, library, job_db, status_tx,