feat: Add library validation and selection logic to Context

- Implemented `validate_and_fix_library` method in the Context struct to ensure the current library selection is valid.
- If the stored library ID is not found, the method selects the first available library.
- Updated the main execution flow to call this new method, enhancing library management during command execution.
This commit is contained in:
Jamie Pine 2025-10-09 05:45:47 -07:00
parent 15b26e0fda
commit 4ef5d59ebe
6 changed files with 70 additions and 10 deletions

View File

@ -177,4 +177,37 @@ impl Context {
execute_core_query!(self, input); execute_core_query!(self, input);
Ok(output) Ok(output)
} }
/// Validate and fix the current library selection
/// If the stored library ID is not found, select the first available library
pub async fn validate_and_fix_library(&mut self) -> Result<()> {
if let Some(stored_library_id) = self.library_id {
let libraries = self.list_libraries().await?;
if libraries.is_empty() {
self.library_id = None;
self.cli_config.clear_current_library(&self.data_dir)?;
return Ok(());
}
let library_exists = libraries.iter().any(|lib| lib.id == stored_library_id);
if !library_exists {
if let Some(first_lib) = libraries.first() {
self.library_id = Some(first_lib.id);
self.cli_config
.set_current_library(first_lib.id, &self.data_dir)?;
}
}
} else {
let libraries = self.list_libraries().await?;
if let Some(first_lib) = libraries.first() {
self.library_id = Some(first_lib.id);
self.cli_config
.set_current_library(first_lib.id, &self.data_dir)?;
}
}
Ok(())
}
} }

View File

@ -458,7 +458,10 @@ async fn run_client_command(
} }
let core = CoreClient::new(socket_path.clone()); let core = CoreClient::new(socket_path.clone());
let ctx = Context::new(core, format, data_dir, socket_path)?; let mut ctx = Context::new(core, format, data_dir, socket_path)?;
ctx.validate_and_fix_library().await?;
match command { match command {
Commands::Status => { Commands::Status => {
let status: sd_core::ops::core::status::output::CoreStatus = let status: sd_core::ops::core::status::output::CoreStatus =

View File

@ -26,6 +26,7 @@ pub struct JobExecutor<J: JobHandler> {
pub struct JobExecutorState { pub struct JobExecutorState {
pub job_id: JobId, pub job_id: JobId,
pub job_name: String,
pub library: Arc<Library>, pub library: Arc<Library>,
pub job_db: Arc<JobDb>, pub job_db: Arc<JobDb>,
pub status_tx: watch::Sender<super::types::JobStatus>, pub status_tx: watch::Sender<super::types::JobStatus>,
@ -47,6 +48,7 @@ impl<J: JobHandler> JobExecutor<J> {
pub fn new( pub fn new(
job: J, job: J,
job_id: JobId, job_id: JobId,
job_name: String,
library: Arc<Library>, library: Arc<Library>,
job_db: Arc<JobDb>, job_db: Arc<JobDb>,
status_tx: watch::Sender<super::types::JobStatus>, status_tx: watch::Sender<super::types::JobStatus>,
@ -67,7 +69,7 @@ impl<J: JobHandler> JobExecutor<J> {
let log_file = logs_dir.join(format!("{}.log", job_id)); let log_file = logs_dir.join(format!("{}.log", job_id));
match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) { match super::logger::FileJobLogger::new(job_id, log_file, config.clone()) {
Ok(logger) => { Ok(logger) => {
let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, J::NAME)); let _ = logger.log("INFO", &format!("Job {} ({}) starting", job_id, &job_name));
Some(Arc::new(logger)) Some(Arc::new(logger))
} }
Err(e) => { Err(e) => {
@ -83,6 +85,7 @@ impl<J: JobHandler> JobExecutor<J> {
job, job,
state: JobExecutorState { state: JobExecutorState {
job_id, job_id,
job_name,
library, library,
job_db, job_db,
status_tx, status_tx,
@ -151,7 +154,10 @@ impl<J: JobHandler> Task<JobError> for JobExecutor<J> {
if let Some(logger) = &self.state.file_logger { if let Some(logger) = &self.state.file_logger {
let _ = logger.log( let _ = logger.log(
"INFO", "INFO",
&format!("Starting job {}: {}", self.state.job_id, J::NAME), &format!(
"Starting job {}: {}",
self.state.job_id, self.state.job_name
),
); );
} }
@ -185,7 +191,10 @@ impl<J: JobHandler> Task<JobError> for JobExecutor<J> {
impl<J: JobHandler> JobExecutor<J> { impl<J: JobHandler> JobExecutor<J> {
async fn run_inner(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, JobError> { async fn run_inner(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, JobError> {
info!("Starting job {}: {}", self.state.job_id, J::NAME); info!(
"Starting job {}: {}",
self.state.job_id, self.state.job_name
);
// Update status to running // Update status to running
warn!( warn!(
@ -439,6 +448,7 @@ impl<J: JobHandler + std::fmt::Debug> ErasedJob for JobExecutor<J> {
fn create_executor( fn create_executor(
self: Box<Self>, self: Box<Self>,
job_id: JobId, job_id: JobId,
job_name: String,
library: std::sync::Arc<crate::library::Library>, library: std::sync::Arc<crate::library::Library>,
job_db: std::sync::Arc<crate::infra::job::database::JobDb>, job_db: std::sync::Arc<crate::infra::job::database::JobDb>,
status_tx: tokio::sync::watch::Sender<JobStatus>, status_tx: tokio::sync::watch::Sender<JobStatus>,
@ -477,6 +487,7 @@ impl<J: JobHandler + std::fmt::Debug> ErasedJob for JobExecutor<J> {
executor.state = JobExecutorState { executor.state = JobExecutorState {
job_id, job_id,
job_name,
library, library,
job_db, job_db,
status_tx, status_tx,

View File

@ -118,7 +118,9 @@ impl JobManager {
if REGISTRY.has_job(job_name) { if REGISTRY.has_job(job_name) {
// Create job instance from core registry // Create job instance from core registry
let erased_job = REGISTRY.create_job(job_name, params)?; let erased_job = REGISTRY.create_job(job_name, params)?;
return self.dispatch_erased_job(job_name, erased_job, priority, None).await; return self
.dispatch_erased_job(job_name, erased_job, priority, None)
.await;
} }
// Check if it's an extension job (contains colon) // Check if it's an extension job (contains colon)
@ -129,16 +131,20 @@ impl JobManager {
if job_registry.has_job(job_name) { if job_registry.has_job(job_name) {
// Extract state JSON from params // Extract state JSON from params
let state_json = serde_json::to_string(&params) let state_json = serde_json::to_string(&params).map_err(|e| {
.map_err(|e| JobError::serialization(format!("Failed to serialize params: {}", e)))?; JobError::serialization(format!("Failed to serialize params: {}", e))
})?;
// Create WasmJob from registry // Create WasmJob from registry
let wasm_job = job_registry let wasm_job = job_registry
.create_wasm_job(job_name, state_json) .create_wasm_job(job_name, state_json)
.map_err(|e| JobError::NotFound(e))?; .map_err(|e| JobError::NotFound(e))?;
// Dispatch the WasmJob // Box as ErasedJob and dispatch with the extension job name
return self.dispatch_with_priority(wasm_job, priority, None).await; let erased_job = Box::new(wasm_job) as Box<dyn ErasedJob>;
return self
.dispatch_erased_job(job_name, erased_job, priority, None)
.await;
} }
} }
} }
@ -270,6 +276,7 @@ impl JobManager {
// Create executor using the erased job // Create executor using the erased job
let executor = erased_job.create_executor( let executor = erased_job.create_executor(
job_id, job_id,
job_name.to_string(),
library, library,
self.db.clone(), self.db.clone(),
status_tx.clone(), status_tx.clone(),
@ -555,6 +562,7 @@ impl JobManager {
let executor = JobExecutor::new( let executor = JobExecutor::new(
job, job,
job_id, job_id,
J::NAME.to_string(),
library, library,
self.db.clone(), self.db.clone(),
status_tx.clone(), status_tx.clone(),
@ -1112,7 +1120,7 @@ impl JobManager {
let job_name = job_record.name.clone(); let job_name = job_record.name.clone();
let handle = JobHandle { let handle = JobHandle {
id: job_id, id: job_id,
job_name, job_name: job_name.clone(),
task_handle: Arc::new(Mutex::new(None)), task_handle: Arc::new(Mutex::new(None)),
status_rx, status_rx,
progress_rx: broadcast_rx, progress_rx: broadcast_rx,
@ -1126,6 +1134,7 @@ impl JobManager {
// Create executor using the erased job // Create executor using the erased job
let executor = erased_job.create_executor( let executor = erased_job.create_executor(
job_id, job_id,
job_name,
library, library,
self.db.clone(), self.db.clone(),
status_tx.clone(), status_tx.clone(),
@ -1479,6 +1488,7 @@ impl JobManager {
// Create executor // Create executor
let executor = erased_job.create_executor( let executor = erased_job.create_executor(
job_id, job_id,
job_name.clone(),
library, library,
self.db.clone(), self.db.clone(),
status_tx.clone(), status_tx.clone(),

View File

@ -126,6 +126,7 @@ pub trait ErasedJob: Send + Sync + std::fmt::Debug + 'static {
fn create_executor( fn create_executor(
self: Box<Self>, self: Box<Self>,
job_id: JobId, job_id: JobId,
job_name: String,
library: std::sync::Arc<crate::library::Library>, library: std::sync::Arc<crate::library::Library>,
job_db: std::sync::Arc<crate::infra::job::database::JobDb>, job_db: std::sync::Arc<crate::infra::job::database::JobDb>,
status_tx: tokio::sync::watch::Sender<JobStatus>, status_tx: tokio::sync::watch::Sender<JobStatus>,

View File

@ -63,6 +63,7 @@ pub fn derive_job(input: TokenStream) -> TokenStream {
fn create_executor( fn create_executor(
self: Box<Self>, self: Box<Self>,
job_id: crate::infra::job::types::JobId, job_id: crate::infra::job::types::JobId,
job_name: String,
library: std::sync::Arc<crate::library::Library>, library: std::sync::Arc<crate::library::Library>,
job_db: std::sync::Arc<crate::infra::job::database::JobDb>, job_db: std::sync::Arc<crate::infra::job::database::JobDb>,
status_tx: tokio::sync::watch::Sender<crate::infra::job::types::JobStatus>, status_tx: tokio::sync::watch::Sender<crate::infra::job::types::JobStatus>,
@ -79,6 +80,7 @@ pub fn derive_job(input: TokenStream) -> TokenStream {
Box::new(crate::infra::job::executor::JobExecutor::new( Box::new(crate::infra::job::executor::JobExecutor::new(
*self, *self,
job_id, job_id,
job_name,
library, library,
job_db, job_db,
status_tx, status_tx,