feat: Implement job registration system for WASM extensions

- Introduced a new `ExtensionJobRegistry` to manage custom job types for extensions at runtime.
- Added `host_register_job` function to facilitate job registration from WASM extensions.
- Updated `PluginEnv` and `PluginManager` to include job registry functionality.
- Enhanced the `job` macro to support automatic job registration during plugin initialization.
- Updated documentation and tests to reflect the new job registration capabilities.
This commit is contained in:
Jamie Pine 2025-10-09 05:18:35 -07:00
parent 0db006107e
commit 705249019c
11 changed files with 443 additions and 59 deletions

View File

@ -21,6 +21,7 @@ pub struct PluginEnv {
pub api_dispatcher: Arc<crate::infra::api::ApiDispatcher>, // For creating sessions
pub permissions: ExtensionPermissions,
pub memory: Memory,
pub job_registry: Arc<super::job_registry::ExtensionJobRegistry>,
}
/// THE MAIN HOST FUNCTION - Generic Wire RPC
@ -399,3 +400,65 @@ pub fn host_job_increment_items(
tracing::debug!(extension = %plugin_env.extension_id, "Processed {} items", count);
// TODO: Update metrics
}
// === Extension Registration Functions ===
/// Register a job type for an extension
///
/// Called from plugin_init() to register custom job types
///
/// # Arguments
/// - `job_name_ptr`, `job_name_len`: Job name (e.g., "email_scan")
/// - `export_fn_ptr`, `export_fn_len`: WASM export function (e.g., "execute_email_scan")
/// - `resumable`: Whether the job supports resumption (1 = yes, 0 = no)
///
/// # Returns
/// 0 on success, 1 on error
pub fn host_register_job(
mut env: FunctionEnvMut<PluginEnv>,
job_name_ptr: WasmPtr<u8>,
job_name_len: u32,
export_fn_ptr: WasmPtr<u8>,
export_fn_len: u32,
resumable: u32,
) -> i32 {
let (plugin_env, mut store) = env.data_and_store_mut();
let memory = &plugin_env.memory;
let memory_view = memory.view(&store);
// Read job name
let job_name = match read_string_from_wasm(&memory_view, job_name_ptr, job_name_len) {
Ok(name) => name,
Err(e) => {
tracing::error!("Failed to read job name: {}", e);
return 1; // Error
}
};
// Read export function name
let export_fn = match read_string_from_wasm(&memory_view, export_fn_ptr, export_fn_len) {
Ok(name) => name,
Err(e) => {
tracing::error!("Failed to read export function name: {}", e);
return 1; // Error
}
};
let is_resumable = resumable != 0;
// Register the job synchronously (no async needed)
let result = plugin_env.job_registry.register(
plugin_env.extension_id.clone(),
job_name,
export_fn,
is_resumable,
);
match result {
Ok(()) => 0, // Success
Err(e) => {
tracing::error!("Failed to register job: {}", e);
1 // Error
}
}
}

View File

@ -0,0 +1,137 @@
//! Runtime job registry for WASM extensions
//!
//! Allows extensions to register custom job types at runtime that integrate
//! with the core job system.
use crate::infra::extension::WasmJob;
use std::collections::HashMap;
use std::sync::RwLock;
/// Metadata for a registered extension job
#[derive(Debug, Clone)]
pub struct ExtensionJobRegistration {
/// Extension ID (e.g., "finance")
pub extension_id: String,
/// Job name (e.g., "email_scan")
pub job_name: String,
/// Full qualified name (e.g., "finance:email_scan")
pub full_name: String,
/// WASM export function name (e.g., "execute_email_scan")
pub export_fn: String,
/// Whether this job supports resumption
pub resumable: bool,
}
/// Runtime registry for extension-defined jobs
pub struct ExtensionJobRegistry {
/// Map from full job name (e.g., "finance:email_scan") to registration
jobs: RwLock<HashMap<String, ExtensionJobRegistration>>,
}
impl ExtensionJobRegistry {
/// Create a new empty registry
pub fn new() -> Self {
Self {
jobs: RwLock::new(HashMap::new()),
}
}
/// Register a new extension job
pub fn register(
&self,
extension_id: String,
job_name: String,
export_fn: String,
resumable: bool,
) -> Result<(), String> {
let full_name = format!("{}:{}", extension_id, job_name);
let registration = ExtensionJobRegistration {
extension_id: extension_id.clone(),
job_name: job_name.clone(),
full_name: full_name.clone(),
export_fn: export_fn.clone(),
resumable,
};
let mut jobs = self.jobs.write().unwrap();
// Check for duplicates
if jobs.contains_key(&full_name) {
return Err(format!(
"Job '{}' is already registered by extension '{}'",
job_name, extension_id
));
}
tracing::info!("Registered extension job: {} -> {}", full_name, export_fn);
jobs.insert(full_name, registration);
Ok(())
}
/// Check if a job name is registered
pub fn has_job(&self, full_name: &str) -> bool {
self.jobs.read().unwrap().contains_key(full_name)
}
/// Get registration info for a job
pub fn get_job(&self, full_name: &str) -> Option<ExtensionJobRegistration> {
self.jobs.read().unwrap().get(full_name).cloned()
}
/// Create a WasmJob instance from a registered job name
pub fn create_wasm_job(&self, full_name: &str, state_json: String) -> Result<WasmJob, String> {
let registration = self
.get_job(full_name)
.ok_or_else(|| format!("Extension job '{}' not found", full_name))?;
Ok(WasmJob {
extension_id: registration.extension_id,
export_fn: registration.export_fn,
state_json,
is_resuming: false,
})
}
/// List all registered jobs for an extension
pub fn list_jobs_for_extension(&self, extension_id: &str) -> Vec<ExtensionJobRegistration> {
self.jobs
.read()
.unwrap()
.values()
.filter(|reg| reg.extension_id == extension_id)
.cloned()
.collect()
}
/// List all registered extension jobs
pub fn list_all_jobs(&self) -> Vec<ExtensionJobRegistration> {
self.jobs.read().unwrap().values().cloned().collect()
}
/// Unregister all jobs for an extension (called on unload)
pub fn unregister_extension_jobs(&self, extension_id: &str) -> usize {
let mut jobs = self.jobs.write().unwrap();
let before_count = jobs.len();
jobs.retain(|_, reg| reg.extension_id != extension_id);
let removed = before_count - jobs.len();
if removed > 0 {
tracing::info!(
"Unregistered {} job(s) for extension '{}'",
removed,
extension_id
);
}
removed
}
}
impl Default for ExtensionJobRegistry {
fn default() -> Self {
Self::new()
}
}

View File

@ -14,6 +14,7 @@ use wasmer::{imports, Function, FunctionEnv, Instance, Memory, Module, Store};
use crate::{context::CoreContext, infra::api::ApiDispatcher};
use super::host_functions::{self, host_spacedrive_call, host_spacedrive_log, PluginEnv};
use super::job_registry::ExtensionJobRegistry;
use super::permissions::ExtensionPermissions;
use super::types::{ExtensionManifest, LoadedPlugin};
@ -45,6 +46,7 @@ pub struct PluginManager {
plugin_dir: PathBuf,
core_context: Arc<CoreContext>,
api_dispatcher: Arc<ApiDispatcher>,
job_registry: Arc<ExtensionJobRegistry>,
}
impl PluginManager {
@ -62,9 +64,15 @@ impl PluginManager {
plugin_dir,
core_context,
api_dispatcher,
job_registry: Arc::new(ExtensionJobRegistry::new()),
}
}
/// Get the job registry for extension jobs
pub fn job_registry(&self) -> Arc<ExtensionJobRegistry> {
self.job_registry.clone()
}
/// Load a WASM plugin from directory
///
/// Expected structure:
@ -128,6 +136,7 @@ impl PluginManager {
api_dispatcher: self.api_dispatcher.clone(),
permissions,
memory: temp_memory,
job_registry: self.job_registry.clone(),
};
let env = FunctionEnv::new(&mut self.store, plugin_env);
@ -178,6 +187,13 @@ impl PluginManager {
&env,
host_functions::host_job_increment_items
),
// Extension registration functions
"register_job" => Function::new_typed_with_env(
&mut self.store,
&env,
host_functions::host_register_job
),
}
};

View File

@ -18,11 +18,13 @@
//! - `types`: Shared types and manifest format
mod host_functions;
mod job_registry;
mod manager;
mod permissions;
mod types;
mod wasm_job;
pub use job_registry::{ExtensionJobRegistration, ExtensionJobRegistry};
pub use manager::PluginManager;
pub use permissions::{ExtensionPermissions, PermissionError};
pub use types::{ExtensionManifest, PluginManifest};

View File

@ -114,17 +114,50 @@ impl JobManager {
params: serde_json::Value,
priority: JobPriority,
) -> JobResult<JobHandle> {
// Check if job type is registered
if !REGISTRY.has_job(job_name) {
return Err(JobError::NotFound(format!(
"Job type '{}' not registered",
job_name
)));
// Try core job registry first
if REGISTRY.has_job(job_name) {
// Create job instance from core registry
let erased_job = REGISTRY.create_job(job_name, params)?;
return self.dispatch_erased_job(job_name, erased_job, priority, None).await;
}
// Create job instance
let erased_job = REGISTRY.create_job(job_name, params)?;
// Check if it's an extension job (contains colon)
if job_name.contains(':') {
// Try extension job registry
if let Some(plugin_manager) = self.context.get_plugin_manager().await {
let job_registry = plugin_manager.read().await.job_registry();
if job_registry.has_job(job_name) {
// Extract state JSON from params
let state_json = serde_json::to_string(&params)
.map_err(|e| JobError::serialization(format!("Failed to serialize params: {}", e)))?;
// Create WasmJob from registry
let wasm_job = job_registry
.create_wasm_job(job_name, state_json)
.map_err(|e| JobError::NotFound(e))?;
// Dispatch the WasmJob
return self.dispatch_with_priority(wasm_job, priority, None).await;
}
}
}
// Job not found in either registry
Err(JobError::NotFound(format!(
"Job type '{}' not registered",
job_name
)))
}
/// Helper method to dispatch an erased job (extracted from dispatch_by_name)
async fn dispatch_erased_job(
&self,
job_name: &str,
erased_job: Box<dyn ErasedJob>,
priority: JobPriority,
action_context: Option<ActionContext>,
) -> JobResult<JobHandle> {
let job_id = JobId::new();
info!("Dispatching job {} ({}): {}", job_id, job_name, job_name);

View File

@ -2,7 +2,7 @@
//!
//! Tests that we can dispatch and execute WASM jobs
use sd_core::{infra::extension::WasmJob, Core};
use sd_core::Core;
use std::path::PathBuf;
use tempfile::TempDir;
@ -65,27 +65,19 @@ async fn test_dispatch_wasm_job() {
tracing::info!("✅ Extension loaded");
// 4. Create WasmJob
let wasm_job = WasmJob {
extension_id: "test-extension".to_string(),
export_fn: "execute_test_counter".to_string(),
state_json: serde_json::json!({
"current": 0,
"target": 10,
"processed": []
})
.to_string(),
is_resuming: false,
};
tracing::info!("Created WasmJob");
// 4. Dispatch job
// 4. Dispatch job by name (auto-registered as "test-extension:counter")
let job_handle = library
.jobs()
.dispatch(wasm_job)
.dispatch_by_name(
"test-extension:counter",
serde_json::json!({
"current": 0,
"target": 10,
"processed": []
}),
)
.await
.expect("Should dispatch WasmJob");
.expect("Should dispatch extension job by name");
tracing::info!("✅ WASM job dispatched: {}", job_handle.id());

View File

@ -2,41 +2,118 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Expr, ItemStruct, Lit, Meta};
use syn::{
parse::{Parse, ParseStream},
parse_macro_input, Expr, Ident, ItemStruct, LitStr, Result, Token,
};
struct ExtensionArgs {
id: String,
name: String,
version: String,
jobs: Vec<Ident>,
}
impl Parse for ExtensionArgs {
fn parse(input: ParseStream) -> Result<Self> {
let mut id = None;
let mut name = None;
let mut version = None;
let mut jobs = Vec::new();
while !input.is_empty() {
let ident: Ident = input.parse()?;
input.parse::<Token![=]>()?;
match ident.to_string().as_str() {
"id" => {
let lit: LitStr = input.parse()?;
id = Some(lit.value());
}
"name" => {
let lit: LitStr = input.parse()?;
name = Some(lit.value());
}
"version" => {
let lit: LitStr = input.parse()?;
version = Some(lit.value());
}
"jobs" => {
let content;
syn::bracketed!(content in input);
while !content.is_empty() {
jobs.push(content.parse()?);
if content.peek(Token![,]) {
content.parse::<Token![,]>()?;
}
}
}
_ => return Err(input.error("unknown parameter")),
}
if input.peek(Token![,]) {
input.parse::<Token![,]>()?;
}
}
Ok(ExtensionArgs {
id: id.ok_or_else(|| input.error("missing id parameter"))?,
name: name.ok_or_else(|| input.error("missing name parameter"))?,
version: version.ok_or_else(|| input.error("missing version parameter"))?,
jobs,
})
}
}
pub fn extension_impl(args: TokenStream, input: TokenStream) -> TokenStream {
let input_struct = parse_macro_input!(input as ItemStruct);
let args = parse_macro_input!(args as ExtensionArgs);
// Parse attributes manually for syn 2.0
let parser = syn::meta::parser(|meta| {
// We'll extract what we need here
Ok(())
let ext_id = &args.id;
let ext_name = &args.name;
let ext_version = &args.version;
// Generate job registration code
let job_registrations = args.jobs.iter().map(|job_fn| {
let register_fn = quote::format_ident!("__register_{}", job_fn);
quote! {
{
let (name, export_fn, resumable) = #register_fn();
::spacedrive_sdk::ffi::log_info(&format!("Registering job: {}", name));
if let Err(_) = ::spacedrive_sdk::ffi::register_job_with_host(
name,
export_fn,
resumable
) {
::spacedrive_sdk::ffi::log_error(&format!("Failed to register job: {}", name));
return 1;
}
}
}
});
let _ = syn::parse::Parser::parse(parser, args);
// For now, use default values
// TODO: Properly parse attributes with syn 2.0 API
let ext_id = "test-beautiful";
let ext_name = "Test Extension (Beautiful API)";
let ext_version = "0.1.0";
let struct_name = &input_struct.ident;
let expanded = quote! {
#input_struct
// Generate plugin_init
// Generate plugin_init with auto-registration
#[no_mangle]
pub extern "C" fn plugin_init() -> i32 {
::spacedrive_sdk::ffi::log_info(&format!(
"{} v{} initializing...",
#ext_name,
#ext_version
));
// Register all jobs
#(#job_registrations)*
::spacedrive_sdk::ffi::log_info(&format!(
"✓ {} v{} initialized!",
#ext_name,
#ext_version
));
// TODO: Auto-register jobs/queries/actions here
0 // Success
}
@ -49,16 +126,7 @@ pub fn extension_impl(args: TokenStream, input: TokenStream) -> TokenStream {
));
0 // Success
}
// Extension metadata (for manifest generation)
#[cfg(feature = "manifest")]
pub const EXTENSION_METADATA: ExtensionMetadata = ExtensionMetadata {
id: #ext_id,
name: #ext_name,
version: #ext_version,
};
};
TokenStream::from(expanded)
}

View File

@ -2,11 +2,29 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, FnArg, ItemFn, Type};
use syn::{parse_macro_input, FnArg, ItemFn, LitStr, Type};
pub fn job_impl(_args: TokenStream, input: TokenStream) -> TokenStream {
pub fn job_impl(args: TokenStream, input: TokenStream) -> TokenStream {
let input_fn = parse_macro_input!(input as ItemFn);
// Parse job name from args if provided
let job_name: Option<String> = if !args.is_empty() {
// Try to parse as name = "value"
let args_str = args.to_string();
if let Some(name_value) = args_str.strip_prefix("name = \"") {
if let Some(name) = name_value.strip_suffix("\"") {
Some(name.to_string())
} else {
None
}
} else {
// Try direct string literal
syn::parse::<LitStr>(args).ok().map(|lit| lit.value())
}
} else {
None
};
// Extract function info
let fn_name = &input_fn.sig.ident;
let fn_attrs = &input_fn.attrs;
@ -18,11 +36,31 @@ pub fn job_impl(_args: TokenStream, input: TokenStream) -> TokenStream {
// Expected signature: async fn name(ctx: &JobContext, state: &mut State) -> Result<()>
let state_type = extract_state_type(&input_fn);
// Generate registration function if job name is provided
let registration_fn = if let Some(ref name) = job_name {
let export_name_str = export_name.to_string();
let register_fn_name = syn::Ident::new(&format!("__register_{}", fn_name), fn_name.span());
quote! {
// Generate a registration helper function
// This will be called by plugin_init
#[doc(hidden)]
pub fn #register_fn_name() -> (&'static str, &'static str, bool) {
(#name, #export_name_str, true)
}
}
} else {
quote! {}
};
let expanded = quote! {
// Keep original function for internal use
#(#fn_attrs)*
#input_fn
// Registration helper function
#registration_fn
// Generate FFI export
#[no_mangle]
pub extern "C" fn #export_name(

View File

@ -6,6 +6,13 @@
#[link(wasm_import_module = "spacedrive")]
extern "C" {
fn spacedrive_log(level: u32, msg_ptr: *const u8, msg_len: usize);
fn register_job(
job_name_ptr: *const u8,
job_name_len: u32,
export_fn_ptr: *const u8,
export_fn_len: u32,
resumable: u32,
) -> i32;
}
/// Log a message (info level)
@ -52,3 +59,24 @@ pub extern "C" fn wasm_free(ptr: *mut u8, size: i32) {
unsafe { std::alloc::dealloc(ptr, layout) };
}
}
/// Register a job with the extension system
///
/// Called automatically by #[extension] macro during plugin_init()
pub fn register_job_with_host(job_name: &str, export_fn: &str, resumable: bool) -> Result<(), ()> {
let result = unsafe {
register_job(
job_name.as_ptr(),
job_name.len() as u32,
export_fn.as_ptr(),
export_fn.len() as u32,
if resumable { 1 } else { 0 },
)
};
if result == 0 {
Ok(())
} else {
Err(())
}
}

View File

@ -7,9 +7,15 @@ use spacedrive_sdk::prelude::*;
use spacedrive_sdk::{extension, job};
// Extension Definition
// The #[extension] macro generates plugin initialization and cleanup functions.
// The #[extension] macro generates plugin_init() and plugin_cleanup().
// List jobs in the jobs parameter for automatic registration.
#[extension(id = "test-extension", name = "Test Extension", version = "0.1.0")]
#[extension(
id = "test-extension",
name = "Test Extension",
version = "0.1.0",
jobs = [test_counter],
)]
struct TestExtension;
// Job State Definition
@ -24,8 +30,9 @@ pub struct CounterState {
// Job Implementation
// The #[job] macro handles FFI bindings, serialization, and error handling.
// The name parameter enables automatic registration (extension-id:name format).
#[job]
#[job(name = "counter")]
fn test_counter(ctx: &JobContext, state: &mut CounterState) -> Result<()> {
ctx.log(&format!(
"Starting counter (current: {}, target: {})",