diff --git a/.gitignore b/.gitignore index c444d1c13..1f6dc1ac6 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ CODEGEN_LOCK blueprint.json blueprint*.json blueprint.lock +/blueprints/periodic-web-poller/node_modules/ diff --git a/Cargo.lock b/Cargo.lock index 3fc0a5572..f33ea97a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8228,6 +8228,29 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "periodic-web-poller-blueprint" +version = "0.1.1" +dependencies = [ + "async-trait", + "blueprint-metadata", + "color-eyre", + "ed25519-zebra 4.0.3", + "gadget-sdk", + "hex", + "k256", + "lock_api", + "parking_lot", + "reqwest 0.12.8", + "serde_json", + "sp-core 34.0.0", + "structopt", + "subxt-signer", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "pest" version = "2.7.13" diff --git a/Cargo.toml b/Cargo.toml index c9800dc6d..5e972176a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "blueprint-metadata", "blueprints/incredible-squaring", "blueprints/incredible-squaring-eigenlayer", + "blueprints/periodic-web-poller", "cli", "gadget-io", "blueprint-test-utils", @@ -47,6 +48,7 @@ blueprint-test-utils = { path = "./blueprint-test-utils" } gadget-sdk = { path = "./sdk", default-features = false, version = "0.1.2" } incredible-squaring-blueprint-eigenlayer = { path = "./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" } +periodic-web-poller-blueprint = { path = "./blueprints/periodic-web-poller", default-features = false, version = "0.1.1" } gadget-blueprint-proc-macro = { path = "./macros/blueprint-proc-macro", default-features = false, version = "0.1.2" } gadget-blueprint-proc-macro-core = { path = "./macros/blueprint-proc-macro-core", default-features = false, version = "0.1.2" } gadget-context-derive = { path = "./macros/context-derive", default-features = false, version = "0.1.1" } diff --git a/blueprints/incredible-squaring-eigenlayer/src/lib.rs b/blueprints/incredible-squaring-eigenlayer/src/lib.rs index 5c86d02d4..32e479577 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/lib.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/lib.rs @@ -65,12 +65,15 @@ impl Config for NodeConfig { id = 1, params(number_to_be_squared, task_created_block, quorum_numbers, quorum_threshold_percentage), result(_), - event_listener(EvmContractEventListener( - instance = IncredibleSquaringTaskManager, + event_listener( + listener = EvmContractEventListener( + instance = IncredibleSquaringTaskManager, + event = IncredibleSquaringTaskManager::NewTaskCreated, + event_converter = convert_event_to_inputs, + callback = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerCalls::respondToTask + ), event = IncredibleSquaringTaskManager::NewTaskCreated, - event_converter = convert_event_to_inputs, - callback = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerCalls::respondToTask - )), + ), )] pub async fn xsquare_eigen( number_to_be_squared: U256, diff --git a/blueprints/incredible-squaring/src/lib.rs b/blueprints/incredible-squaring/src/lib.rs index 8a3a25973..b3e1fd7ef 100644 --- a/blueprints/incredible-squaring/src/lib.rs +++ b/blueprints/incredible-squaring/src/lib.rs @@ -6,7 +6,10 @@ use std::convert::Infallible; id = 0, params(x), result(_), - event_listener(TangleEventListener::), + event_listener( + listener = TangleEventListener, + event = JobCalled, + ), verifier(evm = "IncredibleSquaringBlueprint") )] pub fn xsquare(x: u64) -> Result { diff --git a/blueprints/periodic-web-poller/Cargo.toml b/blueprints/periodic-web-poller/Cargo.toml new file mode 100644 index 000000000..67cc66240 --- /dev/null +++ b/blueprints/periodic-web-poller/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "periodic-web-poller-blueprint" +version = "0.1.1" +description = "A Simple Blueprint to demo how to run blueprints dependent on an arbitrary events" +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +publish = false + +[dependencies] +tracing = { workspace = true } +async-trait = { workspace = true } +gadget-sdk = { workspace = true, features = ["std"] } +color-eyre = { workspace = true } +lock_api = { workspace = true } +tokio = { workspace = true, default-features = false, features = ["full"] } +tokio-util = { workspace = true } +sp-core = { workspace = true } +subxt-signer = { workspace = true, features = ["sr25519", "subxt", "std"] } +parking_lot = { workspace = true } +ed25519-zebra = { workspace = true, features = ["pkcs8", "default", "der", "std", "serde", "pem"] } +structopt = { workspace = true } +hex = { workspace = true } +k256 = { workspace = true } +serde_json = { workspace = true } +reqwest = { workspace = true } + +[build-dependencies] +blueprint-metadata = { workspace = true } + +[features] +default = ["std"] +std = [] diff --git a/blueprints/periodic-web-poller/build.rs b/blueprints/periodic-web-poller/build.rs new file mode 100644 index 000000000..ebfbe1a37 --- /dev/null +++ b/blueprints/periodic-web-poller/build.rs @@ -0,0 +1,6 @@ +fn main() { + println!("cargo:rerun-if-changed=src/*"); + println!("cargo:rerun-if-changed=src/lib.rs"); + println!("cargo:rerun-if-changed=src/main.rs"); + blueprint_metadata::generate_json(); +} diff --git a/blueprints/periodic-web-poller/src/lib.rs b/blueprints/periodic-web-poller/src/lib.rs new file mode 100644 index 000000000..1a25e095b --- /dev/null +++ b/blueprints/periodic-web-poller/src/lib.rs @@ -0,0 +1,83 @@ +use async_trait::async_trait; +use gadget_sdk::event_listener::periodic::PeriodicEventListener; +use gadget_sdk::event_listener::EventListener; +use gadget_sdk::job; +use std::convert::Infallible; + +#[job( + id = 0, + params(value), + result(_), + event_listener( + listener = PeriodicEventListener<2000, WebPoller, serde_json::Value, reqwest::Client>, + event = serde_json::Value, + pre_processor = pre_process, + post_processor = post_process, + ), +)] +// Maps a boolean value obtained from pre-processing to a u8 value +pub async fn web_poller(value: bool, client: reqwest::Client) -> Result { + gadget_sdk::info!("Running web_poller on value: {value}"); + Ok(value as u8) +} + +// Maps a JSON response to a boolean value +async fn pre_process(event: serde_json::Value) -> Result { + gadget_sdk::info!("Running web_poller pre-processor on value: {event}"); + let completed = event["completed"].as_bool().unwrap_or(false); + Ok(completed) +} + +// Received the u8 value output from the job and performs any last post-processing +async fn post_process(job_output: u8) -> Result<(), gadget_sdk::Error> { + gadget_sdk::info!("Running web_poller post-processor on value: {job_output}"); + if job_output == 1 { + Ok(()) + } else { + Err(gadget_sdk::Error::Other( + "Job failed since query returned with a false status".to_string(), + )) + } +} + +/// Define an event listener that polls a webserver +pub struct WebPoller { + pub client: reqwest::Client, +} + +#[async_trait] +impl EventListener for WebPoller { + async fn new(context: &reqwest::Client) -> Result + where + Self: Sized, + { + Ok(Self { + client: context.clone(), + }) + } + + /// Implement the logic that polls the web server + async fn next_event(&mut self) -> Option { + // Send a GET request to the JSONPlaceholder API + let response = self + .client + .get("https://jsonplaceholder.typicode.com/todos/10") + .send() + .await + .ok()?; + + // Check if the request was successful + if response.status().is_success() { + // Parse the JSON response + let resp: serde_json::Value = response.json().await.ok()?; + Some(resp) + } else { + None + } + } + + /// Implement any handler logic when an event is received + async fn handle_event(&mut self, _event: serde_json::Value) -> Result<(), gadget_sdk::Error> { + unreachable!("Not called here") + } +} diff --git a/blueprints/periodic-web-poller/src/main.rs b/blueprints/periodic-web-poller/src/main.rs new file mode 100644 index 000000000..dcdaf3d38 --- /dev/null +++ b/blueprints/periodic-web-poller/src/main.rs @@ -0,0 +1,21 @@ +use color_eyre::Result; +use gadget_sdk::info; +use gadget_sdk::job_runner::MultiJobRunner; +use periodic_web_poller_blueprint as blueprint; + +#[gadget_sdk::main] +async fn main() { + let web_poller = blueprint::WebPollerEventHandler { + client: reqwest::Client::new(), + }; + + info!("~~~ Executing the periodic web poller ~~~"); + MultiJobRunner::new(None) + .with_job() + .finish(web_poller) + .run() + .await?; + + info!("Exiting..."); + Ok(()) +} diff --git a/cli/src/signer.rs b/cli/src/signer.rs index de87e1a59..03a4cdf39 100644 --- a/cli/src/signer.rs +++ b/cli/src/signer.rs @@ -52,7 +52,7 @@ pub fn load_signer_from_env() -> Result { }) .note(SURI_HELP_MSG)?; - let sp_core_keypair = sp_core::sr25519::Pair::from_string(&*s, None)?; + let sp_core_keypair = sp_core::sr25519::Pair::from_string(&s, None)?; Ok(TanglePairSigner::new(sp_core_keypair)) } diff --git a/docs/event_listeners.md b/docs/event_listeners.md index 29076297c..b84af184e 100644 --- a/docs/event_listeners.md +++ b/docs/event_listeners.md @@ -177,6 +177,7 @@ pub struct WebPoller { Then, implement `EventListener` for `WebPoller`: ```rust +#[async_trait::async_trait] impl EventListener for WebPoller { /// Build the event listener. Note that this time, we don't necessarily need the context async fn new(_context: &MyContext) -> Result @@ -217,17 +218,16 @@ Finally, register the event listener inside the `job` macro using `event_listene ```rust #[job( id = 0, - params(x), + params(value), result(_), event_listener(PeriodicEventListener::<6000, WebPoller, serde_json::Value, MyContext>), // <-- Register the event listener here - verifier(evm = "IncredibleSquaringBlueprint") )] pub fn hello_event_listener( - x: u64, + value: serde_json::Value, context: MyContext, - env: GadgetConfiguration, -) -> Result { - Ok(x.saturating_pow(2u32)) +) -> Result { + let completed = value["completed"].as_bool().unwrap_or(false); + Ok(completed) } ``` diff --git a/macros/blueprint-proc-macro-playground/src/lib.rs b/macros/blueprint-proc-macro-playground/src/lib.rs index 18ae40e50..823e2f668 100644 --- a/macros/blueprint-proc-macro-playground/src/lib.rs +++ b/macros/blueprint-proc-macro-playground/src/lib.rs @@ -28,7 +28,7 @@ pub struct MyContext; // ================== /// Simple Threshold (t) Keygen Job for n parties. -#[job(id = 0, params(n, t), event_listener(TangleEventListener), result(_))] +#[job(id = 0, params(n, t), event_listener(listener = TangleEventListener, event = JobCalled), result(_))] pub fn keygen(ctx: &MyContext, n: u16, t: u16) -> Result, Error> { let _ = (n, t, ctx); Ok(vec![0; 33]) @@ -38,7 +38,7 @@ pub fn keygen(ctx: &MyContext, n: u16, t: u16) -> Result, Error> { #[job( id = 1, params(keygen_id, data), - event_listener(TangleEventListener), + event_listener(listener = TangleEventListener, event = JobCalled), result(_) )] pub async fn sign(keygen_id: u64, data: Vec) -> Result, Error> { @@ -49,7 +49,7 @@ pub async fn sign(keygen_id: u64, data: Vec) -> Result, Error> { #[job( id = 2, params(keygen_id, new_t), - event_listener(TangleEventListener), + event_listener(listener = TangleEventListener, event = JobCalled), result(_) )] pub fn refresh(keygen_id: u64, new_t: Option) -> Result, Error> { @@ -58,7 +58,7 @@ pub fn refresh(keygen_id: u64, new_t: Option) -> Result, Error> { } /// Say hello to someone or the world. -#[job(id = 3, params(who), event_listener(TangleEventListener), result(_))] +#[job(id = 3, params(who), event_listener(listener = TangleEventListener, event = JobCalled), result(_))] pub fn say_hello(who: Option) -> Result { match who { Some(who) => Ok(format!("Hello, {}!", who)), @@ -84,7 +84,7 @@ pub fn on_request(nft_id: u64); #[report( job_id = 0, params(n, t, msgs), - event_listener(TangleEventListener), + event_listener(listener = TangleEventListener, event = JobCalled), result(_), report_type = "job", verifier(evm = "KeygenContract") @@ -96,7 +96,7 @@ fn report_keygen(n: u16, t: u16, msgs: Vec>) -> u32 { #[report( params(uptime, response_time, error_rate), - event_listener(TangleEventListener), + event_listener(listener = TangleEventListener, event = JobCalled), result(Vec), report_type = "qos", interval = 3600, diff --git a/macros/blueprint-proc-macro/src/job.rs b/macros/blueprint-proc-macro/src/job.rs index 02eefc621..ac7b50cc0 100644 --- a/macros/blueprint-proc-macro/src/job.rs +++ b/macros/blueprint-proc-macro/src/job.rs @@ -7,8 +7,8 @@ use proc_macro::TokenStream; use quote::{format_ident, quote, ToTokens}; use std::collections::HashSet; use syn::ext::IdentExt; -use syn::parse::{Parse, ParseStream}; -use syn::{Ident, ItemFn, LitInt, LitStr, PathArguments, Token, Type, TypePath}; +use syn::parse::{Parse, ParseBuffer, ParseStream}; +use syn::{Ident, Index, ItemFn, LitInt, LitStr, Token, Type}; /// Defines custom keywords for defining Job arguments mod kw { @@ -18,6 +18,9 @@ mod kw { syn::custom_keyword!(verifier); syn::custom_keyword!(evm); syn::custom_keyword!(event_listener); + syn::custom_keyword!(listener); + syn::custom_keyword!(pre_processor); + syn::custom_keyword!(post_processor); syn::custom_keyword!(protocol); syn::custom_keyword!(instance); syn::custom_keyword!(event); @@ -82,12 +85,12 @@ pub(crate) fn job_impl(args: &JobArgs, input: &ItemFn) -> syn::Result syn::Result (Vec, Vec) { let (event_handler_args, event_handler_arg_types) = get_event_handler_args(param_types, params); + let (_, _, struct_name) = generate_fn_name_and_struct(input, suffix); // Generate Event Listener, if not being skipped let mut event_listener_calls = vec![]; let event_listener_gen = if skip_codegen { vec![proc_macro2::TokenStream::default()] } else { - match &event_listeners.listeners { - Some(ref listeners) => { - let mut all_listeners = vec![]; - for (idx, listener_meta) in listeners.iter().enumerate() { - let listener_function_name = format_ident!( - "run_listener_{}_{}{}", - fn_name_string, - idx, - suffix.to_lowercase() - ); - let is_tangle = listener_meta.evm_args.is_none(); - // convert the listener var, which is just a struct name, to an ident - let listener = listener_meta.listener.to_token_stream(); - // if Listener == TangleEventListener or EvmContractEventListener, we need to use defaults - let listener_str = listener.to_string(); - let (_, _, struct_name) = generate_fn_name_and_struct(input, suffix); - - let type_args = if is_tangle { - proc_macro2::TokenStream::default() - } else { - quote! { } - }; + let mut all_listeners = vec![]; + for (idx, listener_meta) in event_listeners.listeners.iter().enumerate() { + let listener_function_name = format_ident!( + "run_listener_{}_{}{}", + fn_name_string, + idx, + suffix.to_lowercase() + ); + let is_tangle = listener_meta.evm_args.is_none(); + // convert the listener var, which is just a struct name, to an ident + let listener = listener_meta.listener.to_token_stream(); + // if Listener == TangleEventListener or EvmContractEventListener, we need to use defaults + let listener_str = listener.to_string(); + + let type_args = if is_tangle { + proc_macro2::TokenStream::default() + } else { + quote! { } + }; - let bounded_type_args = if is_tangle { - proc_macro2::TokenStream::default() - } else { - quote! { } - }; + let bounded_type_args = if is_tangle { + proc_macro2::TokenStream::default() + } else { + quote! { } + }; - let autogen_struct_name = quote! { #struct_name #type_args }; + let autogen_struct_name = quote! { #struct_name #type_args }; - // Check for special cases - let next_listener = if listener_str.contains("TangleEventListener") - || listener_str.contains("EvmContractEventListener") - { - // How to inject not just this event handler, but all event handlers here? - let wrapper = if is_tangle { - quote! { - gadget_sdk::event_listener::TangleEventWrapper<_> - } - } else { - quote! { - gadget_sdk::event_listener::EthereumHandlerWrapper<#autogen_struct_name, _> - } - }; + // Check for special cases + let next_listener = if listener_str.contains("TangleEventListener") + || listener_str.contains("EvmContractEventListener") + { + // How to inject not just this event handler, but all event handlers here? + let wrapper = if is_tangle { + quote! { + gadget_sdk::event_listener::TangleEventWrapper<_> + } + } else { + quote! { + gadget_sdk::event_listener::EthereumHandlerWrapper<#autogen_struct_name, _> + } + }; - let ctx_create = if is_tangle { - quote! { - (ctx.client.clone(), std::sync::Arc::new(ctx.clone()) as gadget_sdk::events_watcher::substrate::EventHandlerFor) - } - } else { - quote! { - (ctx.contract.clone(), std::sync::Arc::new(ctx.clone()) as std::sync::Arc<#autogen_struct_name>) - } - }; + let ctx_create = if is_tangle { + quote! { + (ctx.client.clone(), std::sync::Arc::new(ctx.clone()) as gadget_sdk::events_watcher::substrate::EventHandlerFor) + } + } else { + quote! { + (ctx.contract.clone(), std::sync::Arc::new(ctx.clone()) as std::sync::Arc<#autogen_struct_name>) + } + }; - if event_listener_calls.is_empty() { - event_listener_calls.push(quote! { - let mut listeners = vec![]; - }); - } + if event_listener_calls.is_empty() { + event_listener_calls.push(quote! { + let mut listeners = vec![]; + }); + } - event_listener_calls.push(quote! { + event_listener_calls.push(quote! { listeners.push(#listener_function_name(&self).await.expect("Event listener already initialized")); }); - quote! { - async fn #listener_function_name #bounded_type_args(ctx: &#autogen_struct_name) -> Option>>{ - static ONCE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); - if !ONCE.load(std::sync::atomic::Ordering::Relaxed) { - ONCE.store(true, std::sync::atomic::Ordering::Relaxed); - let (tx, rx) = gadget_sdk::tokio::sync::oneshot::channel(); - let ctx = #ctx_create; - let mut instance = <#wrapper as gadget_sdk::event_listener::EventListener::<_, _>>::new(&ctx).await.expect("Failed to create event listener"); - let task = async move { - let res = gadget_sdk::event_listener::EventListener::<_, _>::execute(&mut instance).await; - let _ = tx.send(res); - }; - gadget_sdk::tokio::task::spawn(task); - return Some(rx) - } - - None - } + quote! { + async fn #listener_function_name #bounded_type_args(ctx: &#autogen_struct_name) -> Option>>{ + static ONCE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + if !ONCE.load(std::sync::atomic::Ordering::Relaxed) { + ONCE.store(true, std::sync::atomic::Ordering::Relaxed); + let (tx, rx) = gadget_sdk::tokio::sync::oneshot::channel(); + let ctx = #ctx_create; + let mut instance = <#wrapper as gadget_sdk::event_listener::EventListener::<_, _>>::new(&ctx).await.expect("Failed to create event listener"); + let task = async move { + let res = gadget_sdk::event_listener::EventListener::<_, _>::execute(&mut instance).await; + let _ = tx.send(res); + }; + gadget_sdk::tokio::task::spawn(task); + return Some(rx) } + + None + } + } + } else { + // Generate the variable that we are passing as the context into EventListener::create(&mut ctx) + // We assume the first supplied event handler arg is the context we are injecting into the event listener + let (context, field_in_self) = event_handler_args + .first() + .map(|ctx| (quote! {self}, (*ctx).clone())) + .expect("No context found"); + + let context_ty = event_handler_arg_types + .first() + .map(|_ty| quote! {#struct_name}) + .unwrap_or_default(); + + if event_listener_calls.is_empty() { + event_listener_calls.push(quote! { + let mut listeners = vec![]; + }); + } + + event_listener_calls.push(quote! { + listeners.push(#listener_function_name(&#context).await.expect("Event listener already initialized")); + }); + + let event_type = &listener_meta.event; + // The event type is what gets sent through the pre_processor_function. + + let pre_processor_function = + if let Some(preprocessor) = &listener_meta.pre_processor { + quote! { #preprocessor } } else { - // Generate the variable that we are passing as the context into EventListener::create(&mut ctx) - // We assume the first supplied event handler arg is the context we are injecting into the event listener - let context = event_handler_args - .first() - .map(|ctx| quote! {self.#ctx}) - .unwrap_or_default(); - - let context_ty = event_handler_arg_types - .first() - .map(|ty| quote! {#ty}) - .unwrap_or_default(); - - if event_listener_calls.is_empty() { - event_listener_calls.push(quote! { - let mut listeners = vec![]; - }); - } + // identity transformation + quote! { |evt| async move { Ok(evt) } } + }; - event_listener_calls.push(quote! { - listeners.push(#listener_function_name(&#context).await.expect("Event listener already initialized")); - }); + // The job_processor is just the job function. Since it may contain multiple params, we need a new function to call it. + let fn_name_ident = &input.sig.ident; + let ordered_inputs = + get_fn_call_ordered(param_types, params, Some(quote! { CTX.get().unwrap() })); + let asyncness = get_asyncness(input); + // The below assumes param0 IS the event streamed from the event listener + let job_processor_wrapper = quote! { + move |param0| async move { + #fn_name_ident (#(#ordered_inputs)*) #asyncness .map_err(|err| gadget_sdk::Error::Other(err.to_string())) + } + }; + let post_processor_function = + if let Some(postprocessor) = &listener_meta.post_processor { + quote! { #postprocessor } + } else { + // no-op default + quote! { |_evt| async move { Ok(()) } } + }; - quote! { - async fn #listener_function_name(ctx: &#context_ty) -> Option>> { - static ONCE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); - if !ONCE.load(std::sync::atomic::Ordering::Relaxed) { - ONCE.store(true, std::sync::atomic::Ordering::Relaxed); - let (tx, rx) = gadget_sdk::tokio::sync::oneshot::channel(); - let mut instance = <#listener as gadget_sdk::event_listener::EventListener<_, _>>::new(ctx).await.expect("Failed to create event listener"); - let task = async move { - let res = gadget_sdk::event_listener::EventListener::execute(&mut instance).await; - let _ = tx.send(res); - }; - gadget_sdk::tokio::task::spawn(task); - return Some(rx) - } - - None + quote! { + async fn #listener_function_name(ctx: &#context_ty) -> Option>> { + static ONCE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + if !ONCE.load(std::sync::atomic::Ordering::Relaxed) { + ONCE.store(true, std::sync::atomic::Ordering::Relaxed); + let (tx, rx) = gadget_sdk::tokio::sync::oneshot::channel(); + static CTX: gadget_sdk::tokio::sync::OnceCell<#context_ty> = gadget_sdk::tokio::sync::OnceCell::const_new(); + if let Err(_err) = CTX.set(ctx.clone()) { + gadget_sdk::error!("Failed to set the context"); + return None; } + let job_processor = #job_processor_wrapper; + + let listener = <#listener as gadget_sdk::event_listener::EventListener<#event_type, _>>::new(&ctx. #field_in_self).await.expect("Failed to create event listener"); + let mut event_workflow = gadget_sdk::event_listener::executor::EventFlowWrapper::new( + listener, + #pre_processor_function, + job_processor, + #post_processor_function, + ); + + let task = async move { + let res = gadget_sdk::event_listener::executor::EventFlowExecutor::event_loop(&mut event_workflow).await; + let _ = tx.send(res); + }; + gadget_sdk::tokio::task::spawn(task); + return Some(rx) } - }; - all_listeners.push(next_listener); + None + } } + }; - all_listeners - } - None => vec![proc_macro2::TokenStream::default()], + all_listeners.push(next_listener); } + + all_listeners }; if event_listener_calls.is_empty() { @@ -396,6 +438,7 @@ pub fn generate_autogen_struct( let mut type_params_bounds = proc_macro2::TokenStream::default(); let mut type_params = proc_macro2::TokenStream::default(); + // Even if multiple tangle listeners, we only need this once if job_args.event_listener.has_tangle() { required_fields.push(quote! { pub service_id: u64, @@ -404,6 +447,7 @@ pub fn generate_autogen_struct( }) } + // Even if multiple evm listeners, we only need this once if job_args.event_listener.has_evm() { let (_, _, instance_wrapper_name, _) = get_evm_instance_data(&job_args.event_listener); @@ -417,6 +461,7 @@ pub fn generate_autogen_struct( } let combined_event_listener = generate_combined_event_listener_selector(&struct_name); + quote! { /// Event handler for the function #[doc = "[`"] @@ -440,21 +485,12 @@ pub fn generate_autogen_struct( } } -/// Generates the [`EventHandler`](gadget_sdk::events_watcher::evm::EventHandler) for a Job -#[allow(clippy::too_many_lines)] -pub fn generate_event_handler_for( - input: &ItemFn, - job_args: &JobArgs, +pub fn get_fn_call_ordered( param_types: &IndexMap, - params: &[FieldType], - result: &[FieldType], - suffix: &str, -) -> proc_macro2::TokenStream { - let (fn_name, _fn_name_string, struct_name) = generate_fn_name_and_struct(input, suffix); - let job_id = &job_args.id; - let event_listener_args = &job_args.event_listener; - - let (event_handler_args, _) = get_event_handler_args(param_types, &job_args.params); + params_from_job_args: &[Ident], + replacement_for_self: Option, +) -> Vec { + let (event_handler_args, _) = get_event_handler_args(param_types, params_from_job_args); let additional_var_indexes = event_handler_args .iter() @@ -463,7 +499,8 @@ pub fn generate_event_handler_for( // This has all params let mut job_var_idx = 0; - let fn_call_ordered = param_types + let this = replacement_for_self.unwrap_or_else(|| quote! { self }); + param_types .iter() .enumerate() .map(|(pos_in_all_args, (ident, ty))| { @@ -483,37 +520,41 @@ pub fn generate_event_handler_for( }; if is_ref && is_ref_mut { - quote! { &mut self.#ident, } + quote! { &mut #this .#ident, } } else if is_ref { - quote! { &self.#ident, } - } else { - quote! { self.#ident.clone(), } - } - }) - .collect::>(); - - let params_tokens = params - .iter() - .enumerate() - .map(|(i, t)| { - let ident = format_ident!("param{i}"); - let index = syn::Index::from(i); - // TODO: support multiple evm listeners - if event_listener_args.has_evm() { - quote! { - let #ident = inputs.#index; - } + quote! { &#this .#ident, } } else { - crate::tangle::field_type_to_param_token(&ident, t) + quote! { #this .#ident.clone(), } } }) - .collect::>(); + .collect::>() +} - let asyncness = if input.sig.asyncness.is_some() { +fn get_asyncness(input: &ItemFn) -> proc_macro2::TokenStream { + if input.sig.asyncness.is_some() { quote! {.await} } else { quote! {} - }; + } +} + +/// Generates the [`EventHandler`](gadget_sdk::events_watcher::evm::EventHandler) for a Job +#[allow(clippy::too_many_lines)] +pub fn generate_additional_logic( + input: &ItemFn, + job_args: &JobArgs, + param_types: &IndexMap, + params: &[FieldType], + results: &[FieldType], + suffix: &str, +) -> proc_macro2::TokenStream { + let (fn_name, _fn_name_string, struct_name) = generate_fn_name_and_struct(input, suffix); + let job_id = &job_args.id; + let event_listener_args = &job_args.event_listener; + let params_tokens = job_args.event_listener.get_param_name_tokenstream(params); + let fn_call_ordered = get_fn_call_ordered(param_types, &job_args.params, None); + + let asyncness = get_asyncness(input); let fn_call = quote! { let job_result = match #fn_name( @@ -528,46 +569,24 @@ pub fn generate_event_handler_for( }; }; - let result_tokens = if result.len() == 1 { - let ident = format_ident!("job_result"); - if event_listener_args.has_evm() { - vec![quote! { let #ident = job_result; }] - } else { - vec![crate::tangle::field_type_to_result_token( - &ident, &result[0], - )] + let result_tokens = job_args + .event_listener + .get_param_result_tokenstream(results); + + match job_args.event_listener.get_event_listener().listener_type { + ListenerType::Evm => { + generate_evm_event_handler(&struct_name, event_listener_args, ¶ms_tokens, &fn_call) } - } else { - result - .iter() - .enumerate() - .map(|(i, t)| { - let ident = format_ident!("result_{i}"); - if event_listener_args.has_evm() { - quote! { - let #ident = job_result[#i]; - } - } else { - let s = crate::tangle::field_type_to_result_token(&ident, t); - quote! { - let #ident = job_result[#i]; - #s - } - } - }) - .collect::>() - }; - if event_listener_args.has_evm() { - generate_evm_event_handler(&struct_name, event_listener_args, ¶ms_tokens, &fn_call) - } else { - generate_tangle_event_handler( + ListenerType::Tangle => generate_tangle_event_handler( &struct_name, job_id, ¶ms_tokens, &result_tokens, &fn_call, - ) + ), + + ListenerType::Custom => proc_macro2::TokenStream::default(), } } @@ -603,7 +622,7 @@ impl Parse for JobArgs { let mut id = None; let mut verifier = Verifier::None; let mut skip_codegen = false; - let mut event_listener = EventListenerArgs { listeners: None }; + let mut event_listener = EventListenerArgs { listeners: vec![] }; while !input.is_empty() { let lookahead = input.lookahead1(); @@ -767,7 +786,7 @@ enum Verifier { /// `#[job(event_listener(MyCustomListener, MyCustomListener2)]` /// Accepts an optional argument that specifies the event listener to use that implements EventListener pub(crate) struct EventListenerArgs { - listeners: Option>, + listeners: Vec, } #[derive(Debug, Eq, PartialEq)] @@ -779,25 +798,42 @@ pub enum ListenerType { #[derive(Debug)] pub(crate) struct SingleListener { - pub listener: TypePath, + pub listener: Type, pub evm_args: Option, - pub handler: Option, pub listener_type: ListenerType, + pub event: Type, + pub post_processor: Option, + pub pre_processor: Option, } -// TODO: Add support for below in order to allow postprocessing hooks -#[derive(Debug)] -#[allow(dead_code)] -pub struct SpecialHandlerArgs { - pub event_type: proc_macro2::TokenStream, - pub event_handler: Option, +/// Extracts a value from form: "tag = value" +fn extract_x_equals_y( + content: &ParseBuffer, + required: bool, + name: &str, +) -> syn::Result> { + if content.peek(Token![,]) { + let _ = content.parse::()?; + } + + if content.parse::().is_err() { + if required { + panic!("Expected keyword {name}, none supplied") + } else { + return Ok(None); + } + } + + if !content.peek(Token![=]) { + panic!("Expected = after variable {name}") + } + + let _ = content.parse::()?; + + let listener = content.parse::

()?; + Ok(Some(listener)) } -// Implement Parse for EventListener. kw::event_listener exists in the kw module. -// Note: MYCustomListener is a reference to a type struct or type enum, and not surrounded in quotation -// marks as such: #[job(event_listener(MyCustomListener, MyCustomEventListener2, ...))] -// importantly, MyCustomListener may be a struct or enum that has const or type params; parse those too, e.g.,: -// event_listener(PeriodicEventListener::<6000>) impl Parse for EventListenerArgs { fn parse(input: ParseStream) -> syn::Result { let _ = input.parse::()?; @@ -807,27 +843,34 @@ impl Parse for EventListenerArgs { let mut listeners = Vec::new(); // Parse a TypePath instead of a LitStr while !content.is_empty() { - let listener = content.parse::()?; - // let listener_tokens = quote! { #listener }; - // There are two possibilities: either the user does: - // event_listener(MyCustomListener, MyCustomListener2) - // or, have some with the format: event_listener(EvmContractEventListener(instance = IncredibleSquaringTaskManager, event = IncredibleSquaringTaskManager::NewTaskCreated, event_converter = convert_event_to_inputs, callback = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerCalls::respondToTask), MyCustomListener, MyCustomListener2) - // So, in case 1, we have the unique case where the first value is "EvmContractEventListener". If so, we need to parse the argument - // tokens like we do below. Otherwise, we just push the listener into the listeners vec - let full_ty = listener.path.segments.last().cloned().unwrap(); - let ty = &full_ty.ident; - let params = &full_ty.arguments; + let listener = extract_x_equals_y::(&content, true, "listener")? + .expect("No listener defined in listener block"); + + let ty_str = quote! { #listener }.to_string(); + let mut evm_args = None; + if ty_str.contains("EvmContractEventListener") { + evm_args = Some(content.parse::()?); + } + + let event = extract_x_equals_y::(&content, true, "event")? + .expect("No event defined in listener block"); + let pre_processor = + extract_x_equals_y::(&content, false, "pre_processor")?; + let post_processor = + extract_x_equals_y::(&content, false, "post_processor")?; // Create a listener. If this is an EvmContractEventListener, we need to specially parse the arguments // In the case of tangle and everything other listener type, we don't pass evm_args - let ty_str = ty.to_string(); - let mut this_listener = if ty_str.contains("EvmContractEventListener") { - let evm_args = content.parse::()?; + let ty_str = quote! { #listener }.to_string(); + let this_listener = if ty_str.contains("EvmContractEventListener") { + assert!(evm_args.is_some(), "EvmArgs must be passed"); SingleListener { listener, - evm_args: Some(evm_args), - handler: None, + evm_args, listener_type: ListenerType::Evm, + event, + post_processor, + pre_processor, } } else { let listener_type = if ty_str.contains("TangleEventListener") { @@ -835,36 +878,17 @@ impl Parse for EventListenerArgs { } else { ListenerType::Custom }; + SingleListener { listener, evm_args: None, - handler: None, listener_type, + event, + post_processor, + pre_processor, } }; - // Now, determine if this is a tangle listener that has unique code generation requirements - // We do not care about EVM here since it already has its own special handler via evm_args - if this_listener.listener_type == ListenerType::Tangle { - if let PathArguments::AngleBracketed(args) = params { - let args = &args.args; - if args.is_empty() || args.len() > 2 { - return Err(content.error("Expected 1 or 2 type parameter arguments")); - } - - // The first argument is the #event_type, the second, optionally, is the #event_handler that handles that event - let event_type = args[0].to_token_stream(); - let event_handler = args.get(0).map(|r| r.to_token_stream()); - let handler = SpecialHandlerArgs { - event_type, - event_handler, - }; - this_listener.handler = Some(handler); - } else { - panic!("Invalid type parameters specified for {ty_str}") - } - } - listeners.push(this_listener); if content.peek(Token![,]) { @@ -876,9 +900,11 @@ impl Parse for EventListenerArgs { return Err(content.error("Expected at least one event listener")); } - Ok(Self { - listeners: Some(listeners), - }) + if listeners.len() > 1 { + return Err(content.error("Only one event listener is currently supported")); + } + + Ok(Self { listeners }) } } @@ -909,27 +935,107 @@ pub(crate) struct EvmArgs { } impl EventListenerArgs { - /// Returns true if on EigenLayer - pub fn get_evm(&self) -> Option<&EvmArgs> { - self.listeners.as_ref().and_then(|listeners| { - listeners + fn get_event_listener(&self) -> &SingleListener { + &self.listeners[0] + } + + pub fn get_param_result_tokenstream( + &self, + fields: &[FieldType], + ) -> Vec { + let event_listener = self.get_event_listener(); + if fields.len() == 1 { + let ident = format_ident!("job_result"); + match event_listener.listener_type { + ListenerType::Evm => { + vec![quote! { let #ident = job_result; }] + } + + ListenerType::Tangle => { + vec![crate::tangle::field_type_to_result_token( + &ident, &fields[0], + )] + } + + ListenerType::Custom => { + vec![quote! { let #ident = job_result; }] + } + } + } else { + fields .iter() - .find_map(|listener| listener.evm_args.as_ref()) - }) + .enumerate() + .map(|(i, t)| { + let ident = format_ident!("result_{i}"); + match event_listener.listener_type { + ListenerType::Evm => { + quote! { + let #ident = job_result[#i]; + } + } + + ListenerType::Tangle => { + let s = crate::tangle::field_type_to_result_token(&ident, t); + quote! { + let #ident = job_result[#i]; + #s + } + } + + ListenerType::Custom => { + quote! { + let #ident = job_result[#i]; + } + } + } + }) + .collect::>() + } + } + + pub fn get_param_name_tokenstream( + &self, + params: &[FieldType], + ) -> Vec { + params + .iter() + .enumerate() + .map(|(i, t)| { + let ident = format_ident!("param{i}"); + let index = Index::from(i); + match self.get_event_listener().listener_type { + ListenerType::Tangle => crate::tangle::field_type_to_param_token(&ident, t), + ListenerType::Evm => { + quote! { + let #ident = inputs.#index; + } + } + // All other event listeners will return just one type + ListenerType::Custom => { + quote! { + let #ident = inputs.#index; + } + } + } + }) + .collect::>() + } + + /// Returns true if on EVM + pub fn get_evm(&self) -> Option<&EvmArgs> { + self.get_event_listener().evm_args.as_ref() } pub fn has_tangle(&self) -> bool { self.listeners - .as_ref() - .map(|r| r.iter().any(|r| r.listener_type == ListenerType::Tangle)) - .unwrap_or(false) + .iter() + .any(|r| r.listener_type == ListenerType::Tangle) } pub fn has_evm(&self) -> bool { self.listeners - .as_ref() - .map(|r| r.iter().any(|r| r.listener_type == ListenerType::Evm)) - .unwrap_or(false) + .iter() + .any(|r| r.listener_type == ListenerType::Evm) } /// Returns the Event Handler's Instance if on EigenLayer. Otherwise, returns None diff --git a/macros/blueprint-proc-macro/src/sdk_main.rs b/macros/blueprint-proc-macro/src/sdk_main.rs index 2222d49de..1ae56f1ec 100644 --- a/macros/blueprint-proc-macro/src/sdk_main.rs +++ b/macros/blueprint-proc-macro/src/sdk_main.rs @@ -40,6 +40,17 @@ pub(crate) fn sdk_main_impl(args: &SdkMainArgs, input: &ItemFn) -> syn::Result syn::Result Result<(), Box> { gadget_sdk::logging::setup_log(); - // Load the environment and create the gadget runner - let config: gadget_sdk::config::ContextConfig = gadget_sdk::structopt::StructOpt::from_args(); - let env = gadget_sdk::config::load(config.clone()).expect("Failed to load environment"); - gadget_sdk::utils::check_for_test(&config).expect("Failed to check for test"); - + #standard_setup inner_main(#env_passed_var).await?; Ok(()) } diff --git a/sdk/src/event_listener/executor.rs b/sdk/src/event_listener/executor.rs new file mode 100644 index 000000000..0ab1ad772 --- /dev/null +++ b/sdk/src/event_listener/executor.rs @@ -0,0 +1,281 @@ +use crate::event_listener::EventListener; +use async_trait::async_trait; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; + +/// [`EventFlowExecutor`]: Allows flexible and organized execution of events +/// +/// Provides the structure for the workflow of taking events and running them through a series of steps. +/// This is meant to be auto-implemented by the job macro onto the provided structs that implement T: EventListener. +#[async_trait] +pub trait EventFlowExecutor +where + T: Send + 'static, + Ctx: Send + 'static, + Self: EventListener, +{ + type PreprocessedEvent: Send + 'static; + type PreProcessor: ProcessorFunction< + T, + Result, + BoxedFuture>, + >; + type JobProcessor: ProcessorFunction< + Self::PreprocessedEvent, + Result, + BoxedFuture>, + >; + type JobProcessedEvent: Send + 'static; + type PostProcessor: ProcessorFunction< + Self::JobProcessedEvent, + Result<(), crate::Error>, + BoxedFuture>, + >; + + fn get_preprocessor(&mut self) -> &mut Self::PreProcessor; + fn get_job_processor(&mut self) -> &mut Self::JobProcessor; + fn get_postprocessor(&mut self) -> &mut Self::PostProcessor; + + async fn pre_process(&mut self, event: T) -> Result { + self.get_preprocessor()(event).await + } + + async fn process( + &mut self, + preprocessed_event: Self::PreprocessedEvent, + ) -> Result { + self.get_job_processor()(preprocessed_event).await + } + + async fn post_process( + &mut self, + job_output: Self::JobProcessedEvent, + ) -> Result<(), crate::Error> { + self.get_postprocessor()(job_output).await + } + + async fn event_loop(&mut self) -> Result<(), crate::Error> { + // TODO: add exponential backoff logic here + while let Some(event) = self.next_event().await { + let preprocessed_event = self.pre_process(event).await?; + let job_output = self.process(preprocessed_event).await?; + self.post_process(job_output).await?; + } + + Err(crate::Error::Other( + "Event loop ended unexpectedly".to_string(), + )) + } +} + +pub trait ProcessorFunction: Fn(Event) -> Fut +where + Fut: Future + Send + 'static, + Event: Send + 'static, + Out: Send + 'static, +{ +} + +// Blanket implementation of ProcessorFunction for all functions that satisfy the constraints +impl ProcessorFunction for F +where + F: Fn(Event) -> Fut, + Fut: Future + Send + 'static, + Event: Send + 'static, + Out: Send + 'static, +{ +} + +pub type BoxedFuture = Pin + Send + 'static>>; + +pub struct EventFlowWrapper< + Ctx: Send + 'static, + Event: Send + 'static, + PreProcessOut: Send + 'static, + JobOutput: Send + 'static, +> { + event_listener: Box>, + preprocessor: Box BoxedFuture> + Send>, + job_processor: + Box BoxedFuture> + Send>, + postprocessor: Box BoxedFuture> + Send>, + _pd: PhantomData, +} + +impl< + Ctx: Send + 'static, + Event: Send + 'static, + PreProcessOut: Send + 'static, + JobOutput: Send + 'static, + > EventFlowWrapper +{ + pub fn new( + event_listener: T, + preprocessor: Pre, + job_processor: Job, + postprocessor: Post, + ) -> Self + where + T: EventListener, + Pre: Fn(Event) -> PreFut + Send + 'static, + PreFut: Future> + Send + 'static, + Job: Fn(PreProcessOut) -> JobFut + Send + 'static, + JobFut: Future> + Send + 'static, + Post: Fn(JobOutput) -> PostFut + Send + 'static, + PostFut: Future> + Send + 'static, + { + Self { + event_listener: Box::new(event_listener), + preprocessor: Box::new(move |event| Box::pin(preprocessor(event))), + job_processor: Box::new(move |event| Box::pin(job_processor(event))), + postprocessor: Box::new(move |event| Box::pin(postprocessor(event))), + _pd: PhantomData, + } + } +} + +#[async_trait] +impl< + Ctx: Send + 'static, + Event: Send + 'static, + PreProcessOut: Send + 'static, + JobOutput: Send + 'static, + > EventFlowExecutor for EventFlowWrapper +{ + type PreprocessedEvent = PreProcessOut; + type PreProcessor = + Box BoxedFuture> + Send>; + type JobProcessor = Box< + dyn Fn( + Self::PreprocessedEvent, + ) -> BoxedFuture> + + Send, + >; + type JobProcessedEvent = JobOutput; + type PostProcessor = + Box BoxedFuture> + Send>; + + fn get_preprocessor(&mut self) -> &mut Self::PreProcessor { + &mut self.preprocessor + } + + fn get_job_processor(&mut self) -> &mut Self::JobProcessor { + &mut self.job_processor + } + + fn get_postprocessor(&mut self) -> &mut Self::PostProcessor { + &mut self.postprocessor + } +} + +#[async_trait] +impl< + Ctx: Send + 'static, + Event: Send + 'static, + PreProcessOut: Send + 'static, + JobOutput: Send + 'static, + > EventListener for EventFlowWrapper +{ + async fn new(_context: &Ctx) -> Result { + unreachable!("Not called here") + } + + async fn next_event(&mut self) -> Option { + self.event_listener.next_event().await + } + + async fn handle_event(&mut self, _event: Event) -> Result<(), crate::Error> { + unreachable!("Not called here") + } +} + +#[cfg(test)] +mod tests { + use crate::event_listener::executor::{EventFlowExecutor, EventFlowWrapper}; + use crate::event_listener::EventListener; + use crate::Error; + use async_trait::async_trait; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc; + use std::time::Duration; + + struct DummyEventListener(Arc); + + type TestEvent = Arc; + + #[async_trait] + impl EventListener> for DummyEventListener { + async fn new(context: &Arc) -> Result + where + Self: Sized, + { + Ok(Self(context.clone())) + } + + async fn next_event(&mut self) -> Option { + tokio::time::sleep(Duration::from_millis(1000)).await; + Some(self.0.clone()) + } + + async fn handle_event(&mut self, _event: TestEvent) -> Result<(), Error> { + unreachable!("Not handled here") + } + } + + async fn preprocess(event: TestEvent) -> Result<(u64, TestEvent), Error> { + let amount = event.fetch_add(1, Ordering::SeqCst) + 1; + Ok((amount, event)) + } + + async fn job_processor(preprocessed_event: (u64, TestEvent)) -> Result { + let amount = preprocessed_event.1.fetch_add(1, Ordering::SeqCst) + 1; + Ok(amount) + } + + async fn post_process(_job_output: u64) -> Result<(), Error> { + Ok(()) + } + + #[tokio::test] + async fn test_event_flow_executor_builds() { + let counter = Arc::new(AtomicU64::new(0)); + let _event_listener = EventFlowWrapper::new( + DummyEventListener(counter.clone()), + preprocess, + job_processor, + post_process, + ); + } + + #[tokio::test] + async fn test_event_flow_executor_executes() { + let counter = &Arc::new(AtomicU64::new(0)); + let mut event_listener = EventFlowWrapper::new( + DummyEventListener(counter.clone()), + preprocess, + job_processor, + post_process, + ); + + let executor = async move { event_listener.event_loop().await.expect("Executor failed") }; + let poller = async move { + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + if counter.load(Ordering::SeqCst) >= 2 { + break; + } + } + }; + + tokio::select! { + _res0 = executor => { + panic!("Executor failed") + }, + + _res1 = poller => { + assert_eq!(counter.load(Ordering::SeqCst), 2); + } + } + } +} diff --git a/sdk/src/event_listener/mod.rs b/sdk/src/event_listener/mod.rs index bf630b847..e05211623 100644 --- a/sdk/src/event_listener/mod.rs +++ b/sdk/src/event_listener/mod.rs @@ -20,14 +20,13 @@ use tokio::sync::Mutex; use tokio_retry::strategy::ExponentialBackoff; use tokio_retry::Retry; +pub mod executor; pub mod markers; pub mod periodic; /// The [`EventListener`] trait defines the interface for event listeners. #[async_trait] -pub trait EventListener: - Send + Sync + 'static -{ +pub trait EventListener: Send + 'static { async fn new(context: &Ctx) -> Result where Self: Sized; diff --git a/sdk/src/job_runner.rs b/sdk/src/job_runner.rs index f9c73c9fa..030aae27a 100644 --- a/sdk/src/job_runner.rs +++ b/sdk/src/job_runner.rs @@ -12,7 +12,7 @@ use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives: pub struct MultiJobRunner<'a> { pub(crate) enqueued_job_runners: EnqueuedJobRunners<'a>, - pub(crate) env: GadgetConfiguration, + pub(crate) env: Option>, } pub type EnqueuedJobRunners<'a> = Vec< @@ -56,7 +56,11 @@ impl<'a, K: InitializableEventHandler + Send + 'a> JobBuilder<'a, K> { where K: markers::IsTangle, { - let env = self.runner.env.clone(); + let env = self + .runner + .env + .clone() + .expect("Must have an env when using tangle"); self.with_registration((env, price_targets), tangle_registration) } @@ -75,12 +79,17 @@ impl<'a, K: InitializableEventHandler + Send + 'a> JobBuilder<'a, K> { pub fn finish(mut self, job_runner: K) -> MultiJobRunner<'a> { let registration = self.register_call.take(); - let test_mode = self.runner.env.test_mode; + let skip_registration = self + .runner + .env + .as_ref() + .map(|r| r.test_mode) + .unwrap_or(true); let task = Box::pin(async move { if let Some(registration) = registration { // Skip registration if in test mode - if !test_mode { + if !skip_registration { if let Err(err) = registration.await { crate::error!("Failed to register job: {err:?}"); return None; @@ -98,10 +107,10 @@ impl<'a, K: InitializableEventHandler + Send + 'a> JobBuilder<'a, K> { } impl<'a> MultiJobRunner<'a> { - pub fn new(env: &GadgetConfiguration) -> Self { + pub fn new>>>(env: T) -> Self { Self { enqueued_job_runners: Vec::new(), - env: env.clone(), + env: env.into().cloned(), } }