diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8eacc5f84..ca416ce3b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: forge update && cd blueprints/incredible-squaring-eigenlayer && forge build --root ./contracts - name: Run Clippy - run: cargo clippy --tests -- -D warnings + run: cargo clippy --tests --examples -- -D warnings testing: timeout-minutes: 90 @@ -122,3 +122,58 @@ jobs: - name: tests run: cargo nextest run --nocapture --package ${{ matrix.package }} ${{ matrix.package == 'gadget-sdk' && '--features getrandom,std' || '' }} + + integration-test-eigenlayer: + timeout-minutes: 90 + runs-on: macos-latest + steps: + - name: checkout code + uses: actions/checkout@v2 + + - name: Install Foundry + run: | + curl -L https://foundry.paradigm.xyz | bash + source /Users/runner/.bashrc + foundryup + + - name: Add Foundry to PATH + run: echo "${HOME}/.foundry/bin" >> $GITHUB_PATH + + - name: Forge build + run: forge update && cd blueprints/incredible-squaring-eigenlayer && forge build --root ./contracts && cd ../../ + + - name: setup-docker + run: brew install docker + + - name: Verify Forge and Docker installation + run: forge --version && docker --version + + - name: Run anvil in background + run: anvil --version && anvil & + + - name: install rust + uses: dtolnay/rust-toolchain@nightly + with: + toolchain: stable + + - uses: swatinem/rust-cache@v2 + with: + cache-on-failure: "true" + + - name: install protobuf + run: brew install protobuf gmp + + - name: Set Relevant M1 env vars + run: | + export LIBRARY_PATH=$LIBRARY_PATH:/opt/homebrew/lib + export INCLUDE_PATH=$INCLUDE_PATH:/opt/homebrew/include + + - name: install cargo-nextest + run: cargo install cargo-nextest --locked + + - name: Build (release) integration test + run: cargo build --release -p incredible-squaring-blueprint-eigenlayer + + - name: tests + run: RUST_LOG=gadget=trace cargo test --package blueprint-test-utils test_eigenlayer_incredible_squaring_blueprint -- --nocapture + diff --git a/Cargo.lock b/Cargo.lock index 82d45a805..b71b89b50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4774,6 +4774,7 @@ dependencies = [ "http-body-util", "hyper 1.5.0", "hyper-util", + "itertools 0.13.0", "k256", "libp2p", "lock_api", @@ -11924,6 +11925,16 @@ dependencies = [ "libc", ] +[[package]] +name = "tangle-raw-event-listener-blueprint" +version = "0.1.1" +dependencies = [ + "blueprint-metadata", + "color-eyre", + "gadget-sdk", + "tracing", +] + [[package]] name = "tangle-subxt" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 0931e770e..f5479d0ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "blueprints/incredible-squaring-eigenlayer", "blueprints/incredible-squaring-symbiotic", "blueprints/periodic-web-poller", + "blueprints/tangle-raw-event-listener", "cli", "gadget-io", "blueprint-test-utils", @@ -15,6 +16,7 @@ members = [ "macros/blueprint-proc-macro-core", "macros/playground", "macros/context-derive", + ] exclude = ["tangle-test-utils", "example"] @@ -49,6 +51,7 @@ incredible-squaring-blueprint = { path = "./blueprints/incredible-squaring", def incredible-squaring-blueprint-eigenlayer = { path = "./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" } incredible-squaring-blueprint-symbiotic = { path = "./blueprints/incredible-squaring-symbiotic", default-features = false, version = "0.1.1" } periodic-web-poller-blueprint = { path = "./blueprints/periodic-web-poller", default-features = false, 
version = "0.1.1" } +tangle-raw-event-listener-blueprint = { path = "./blueprints/tangle-raw-event-listener", default-features = false, version = "0.1.1" } gadget-blueprint-proc-macro = { path = "./macros/blueprint-proc-macro", default-features = false, version = "0.2.3" } gadget-blueprint-proc-macro-core = { path = "./macros/blueprint-proc-macro-core", default-features = false, version = "0.1.5" } gadget-context-derive = { path = "./macros/context-derive", default-features = false, version = "0.1.3" } diff --git a/blueprint-manager/src/sources/mod.rs b/blueprint-manager/src/sources/mod.rs index 64bd44779..0afce35e8 100644 --- a/blueprint-manager/src/sources/mod.rs +++ b/blueprint-manager/src/sources/mod.rs @@ -156,7 +156,6 @@ pub fn generate_process_arguments( format!("--ws-rpc-url={}", gadget_config.ws_rpc_url), format!("--keystore-uri={}", gadget_config.keystore_uri), format!("--chain={}", gadget_config.chain), - format!("-{}", "v".repeat(gadget_config.verbose as usize)), format!("--blueprint-id={}", blueprint_id), format!("--service-id={}", service_id), format!("--protocol={}", protocol), @@ -173,5 +172,10 @@ pub fn generate_process_arguments( arguments.push(format!("--keystore-password={}", keystore_password)); } + // Uses occurrences of clap short -v + if opt.verbose > 0 { + arguments.push(format!("-{}", "v".repeat(opt.verbose as usize))); + } + Ok(arguments) } diff --git a/blueprint-test-utils/src/helpers.rs b/blueprint-test-utils/src/helpers.rs index 0aa90ce5e..2fb3861ea 100644 --- a/blueprint-test-utils/src/helpers.rs +++ b/blueprint-test-utils/src/helpers.rs @@ -139,8 +139,8 @@ impl BlueprintProcessManager { format!("--ws-rpc-url={}", Url::parse(ws_endpoint).unwrap()), format!("--keystore-uri={}", keystore_uri_str.clone()), format!("--chain={}", SupportedChains::LocalTestnet), - format!("--vvv"), - format!("--pretty"), + "-vvv".to_string(), + "--pretty".to_string(), format!("--blueprint-id={}", instance_id), format!("--service-id={}", instance_id), format!("--protocol={}", protocol), @@ -168,10 +168,6 @@ impl BlueprintProcessManager { "--strategy-manager={}", EigenlayerContractAddresses::default().strategy_manager_address )); - arguments.push(format!( - "--avs-directory={}", - EigenlayerContractAddresses::default().avs_directory_address - )); } Protocol::Symbiotic => { arguments.push(format!( diff --git a/blueprint-test-utils/src/lib.rs b/blueprint-test-utils/src/lib.rs index a400aef0b..ad1d435b7 100644 --- a/blueprint-test-utils/src/lib.rs +++ b/blueprint-test-utils/src/lib.rs @@ -313,24 +313,6 @@ pub async fn request_service( Ok(()) } -/// Approves a service request. This is meant for testing, and will always approve the request. -pub async fn approve_service( - client: &TestClient, - caller: &TanglePairSigner, - request_id: u64, - restaking_percent: u8, -) -> Result<(), Box> { - let call = api::tx() - .services() - .approve(request_id, Percent(restaking_percent)); - let res = client - .tx() - .sign_and_submit_then_watch_default(&call, caller) - .await?; - wait_for_in_block_success(res).await?; - Ok(()) -} - pub async fn wait_for_in_block_success( mut res: TxProgress, ) -> Result<(), Box> { @@ -410,6 +392,39 @@ pub async fn get_next_call_id(client: &TestClient) -> Result Ok(res) } +/// Approves a service request. This is meant for testing, and will always approve the request. 
+pub async fn approve_service( + client: &TestClient, + caller: &TanglePairSigner, + request_id: u64, + restaking_percent: u8, +) -> Result<(), Box> { + gadget_sdk::info!("Approving service request ..."); + let call = api::tx() + .services() + .approve(request_id, Percent(restaking_percent)); + let res = client + .tx() + .sign_and_submit_then_watch_default(&call, caller) + .await?; + res.wait_for_finalized_success().await?; + Ok(()) +} + +pub async fn get_next_request_id(client: &TestClient) -> Result> { + gadget_sdk::info!("Fetching next request ID ..."); + let next_request_id_addr = api::storage().services().next_service_request_id(); + let next_request_id = client + .storage() + .at_latest() + .await + .expect("Failed to fetch latest block") + .fetch_or_default(&next_request_id_addr) + .await + .expect("Failed to fetch next request ID"); + Ok(next_request_id) +} + #[macro_export] macro_rules! test_blueprint { ( @@ -420,7 +435,7 @@ macro_rules! test_blueprint { [$($expected_output:expr),+] ) => { use $crate::{ - get_next_call_id, get_next_service_id, run_test_blueprint_manager, + get_next_call_id, run_test_blueprint_manager, submit_job, wait_for_completion_of_tangle_job, Opts, setup_log, }; @@ -457,11 +472,10 @@ macro_rules! test_blueprint { run_test_blueprint_manager, ) .await - .execute_with_async(move |client, handles| async move { + .execute_with_async(move |client, handles, blueprint| async move { let keypair = handles[0].sr25519_id().clone(); - let service_id = get_next_service_id(client) - .await - .expect("Failed to get next service id"); + let selected_service = &blueprint.services[0]; + let service_id = selected_service.id; let call_id = get_next_call_id(client) .await .expect("Failed to get next job id"); @@ -570,17 +584,14 @@ mod tests_standard { new_test_ext_blueprint_manager::<5, 1, (), _, _>((), opts, run_test_blueprint_manager) .await - .execute_with_async(move |client, handles| async move { + .execute_with_async(move |client, handles, blueprint| async move { // At this point, blueprint has been deployed, every node has registered // as an operator for the relevant services, and, all gadgets are running // What's left: Submit a job, wait for the job to finish, then assert the job results let keypair = handles[0].sr25519_id().clone(); - - let service_id = get_next_service_id(client) - .await - .expect("Failed to get next service id") - .saturating_sub(1); + let selected_service = &blueprint.services[0]; + let service_id = selected_service.id; let call_id = get_next_call_id(client) .await .expect("Failed to get next job id") diff --git a/blueprint-test-utils/src/test_ext.rs b/blueprint-test-utils/src/test_ext.rs index f3329c341..a64189c65 100644 --- a/blueprint-test-utils/src/test_ext.rs +++ b/blueprint-test-utils/src/test_ext.rs @@ -20,10 +20,8 @@ use blueprint_manager::executor::BlueprintManagerHandle; use blueprint_manager::sdk::entry::SendFuture; use cargo_tangle::deploy::Opts; use gadget_sdk::clients::tangle::runtime::TangleClient; -use gadget_sdk::tangle_subxt::subxt::OnlineClient; use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::PriceTargets; use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::services::calls::types::register::{Preferences, RegistrationArgs}; -use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api as api; use libp2p::Multiaddr; use std::collections::HashSet; use std::future::Future; @@ -38,6 +36,9 @@ use gadget_sdk::keystore::KeystoreUriSanitizer; use sp_core::Pair; use 
tracing::Instrument; use gadget_sdk::{error, info, warn}; +use gadget_sdk::clients::tangle::services::{RpcServicesWithBlueprint, ServicesClient}; +use gadget_sdk::subxt_core::config::Header; +use gadget_sdk::utils::get_client; const LOCAL_BIND_ADDR: &str = "127.0.0.1"; pub const NAME_IDS: [&str; 5] = ["Alice", "Bob", "Charlie", "Dave", "Eve"]; @@ -128,6 +129,9 @@ pub async fn new_test_ext_blueprint_manager< handles.push(handle); } + let local_tangle_node_ws = opts.ws_rpc_url.clone(); + let local_tangle_node_http = opts.http_rpc_url.clone(); + // Step 1: Create the blueprint using alice's identity let blueprint_id = match cargo_tangle::deploy::deploy_to_tangle(opts.clone()).await { Ok(id) => id, @@ -137,7 +141,7 @@ pub async fn new_test_ext_blueprint_manager< } }; - let client = OnlineClient::from_url(&opts.ws_rpc_url) + let client = get_client(&local_tangle_node_ws, &local_tangle_node_http) .await .expect("Failed to create an account-based localhost client"); @@ -147,9 +151,7 @@ pub async fn new_test_ext_blueprint_manager< // TODO: allow the function called to specify the registration args for handle in handles { - let client = OnlineClient::from_url(&opts.ws_rpc_url) - .await - .expect("Failed to create an account-based localhost client"); + let client = client.clone(); let registration_args = registration_args.clone(); let task = async move { @@ -198,11 +200,11 @@ pub async fn new_test_ext_blueprint_manager< futures_ordered.push_back(task); } - let mut handles = futures_ordered + let handles = futures_ordered .collect::>() .await; - // Step 3: register a service + // Step 3: request a service let all_nodes = handles .iter() .map(|handle| handle.sr25519_id().account_id().clone()) @@ -210,15 +212,6 @@ pub async fn new_test_ext_blueprint_manager< // Use Alice's account to register the service info!("Requesting service for blueprint ID {blueprint_id} using Alice's keys ..."); - let next_request_id_addr = api::storage().services().next_service_request_id(); - let next_request_id = client - .storage() - .at_latest() - .await - .expect("Failed to fetch latest block") - .fetch_or_default(&next_request_id_addr) - .await - .expect("Failed to fetch next request ID"); if let Err(err) = super::request_service(&client, handles[0].sr25519_id(), blueprint_id, all_nodes).await @@ -227,24 +220,53 @@ pub async fn new_test_ext_blueprint_manager< panic!("Failed to register service: {err}"); } - // Approve the service request for each node - let futures = handles.iter().map(|handle| async { - let keypair = handle.sr25519_id().clone(); - let percentage = 50; - info!( - "Approving service request {next_request_id} for {} with {percentage}%", - keypair.account_id() - ); - if let Err(err) = - super::approve_service(&client, &keypair, next_request_id, percentage).await - { - error!("Failed to approve service request: {err}"); - panic!("Failed to approve service request: {err}"); - } - }); + let next_request_id = super::get_next_request_id(&client) + .await + .expect("Failed to get next request ID") + .saturating_sub(1); + + // Step 2: Have each identity register to a blueprint + let mut futures_ordered = FuturesOrdered::new(); + + for handle in handles { + let client = client.clone(); + let task = async move { + let keypair = handle.sr25519_id().clone(); + if let Err(err) = super::approve_service(&client, &keypair, next_request_id, 20).await { + let _span = handle.span().enter(); + error!("Failed to approve service request {next_request_id}: {err}"); + panic!("Failed to approve service request {next_request_id}: 
{err}"); + } + + handle + }; - let futures_ordered = FuturesOrdered::from_iter(futures); - let _ = futures_ordered.collect::>().await; + futures_ordered.push_back(task); + } + + let mut handles = futures_ordered + .collect::>() + .await; + + let now = client + .blocks() + .at_latest() + .await + .expect("Unable to get block") + .header() + .hash() + .0; + let services_client = ServicesClient::new(client.clone()); + let blueprints = services_client + .query_operator_blueprints(now, handles[0].sr25519_id().account_id().clone()) + .await + .expect("Failed to query operator blueprints"); + assert!(!blueprints.is_empty(), "No blueprints found"); + + let blueprint = blueprints + .into_iter() + .find(|r| r.blueprint_id == blueprint_id) + .expect("Blueprint not found in operator's blueprints"); // Now, start every blueprint manager. With the blueprint submitted and every operator registered // to the blueprint, we can now start the blueprint manager, expecting that the blueprint manager @@ -268,6 +290,7 @@ pub async fn new_test_ext_blueprint_manager< client, handles, span, + blueprint, } } @@ -284,33 +307,42 @@ pub struct LocalhostTestExt { client: TangleClient, handles: Vec, span: tracing::Span, + blueprint: RpcServicesWithBlueprint, } impl LocalhostTestExt { /// An identity function (For future reverse-compatible changes) pub fn execute_with< - T: FnOnce(&TangleClient, &Vec) -> R + Send + 'static, + T: FnOnce(&TangleClient, &Vec, &RpcServicesWithBlueprint) -> R + + Send + + 'static, R: Send + 'static, >( &self, function: T, ) -> R { let _span = self.span.enter(); - function(&self.client, &self.handles) + function(&self.client, &self.handles, &self.blueprint) } /// An identity function (For future reverse-compatible changes) pub async fn execute_with_async< 'a, 'b: 'a, - T: FnOnce(&'a TangleClient, &'a Vec) -> R + Send + 'a, + T: FnOnce( + &'a TangleClient, + &'a Vec, + &'a RpcServicesWithBlueprint, + ) -> R + + Send + + 'a, R: Future + Send + 'a, Out: Send + 'b, >( &'a self, function: T, ) -> Out { - function(&self.client, &self.handles) + function(&self.client, &self.handles, &self.blueprint) .instrument(self.span.clone()) .await } diff --git a/blueprints/ecdsa-threshold-mpc/src/jobs/keygen.rs b/blueprints/ecdsa-threshold-mpc/src/jobs/keygen.rs index 89195acc9..77710c48f 100644 --- a/blueprints/ecdsa-threshold-mpc/src/jobs/keygen.rs +++ b/blueprints/ecdsa-threshold-mpc/src/jobs/keygen.rs @@ -29,7 +29,7 @@ use super::Context; #[sdk::job( id = 0, params(curve, t, num_keys, hd_wallet), - result(_), + verifier(evm = "HelloBlueprint") )] pub async fn keygen( diff --git a/blueprints/ecdsa-threshold-mpc/src/jobs/refresh.rs b/blueprints/ecdsa-threshold-mpc/src/jobs/refresh.rs index 873cab3c5..3e158e570 100644 --- a/blueprints/ecdsa-threshold-mpc/src/jobs/refresh.rs +++ b/blueprints/ecdsa-threshold-mpc/src/jobs/refresh.rs @@ -6,7 +6,7 @@ use color_eyre::Result; #[sdk::job( id = 2, params(keygen_id, new_parties, t), - result(_), + verifier(evm = "HelloBlueprint") )] pub fn refresh(keygen_id: u32, new_parties: Vec, t: u16) -> Result { diff --git a/blueprints/ecdsa-threshold-mpc/src/jobs/sign.rs b/blueprints/ecdsa-threshold-mpc/src/jobs/sign.rs index c6cd6666d..c6230b6ba 100644 --- a/blueprints/ecdsa-threshold-mpc/src/jobs/sign.rs +++ b/blueprints/ecdsa-threshold-mpc/src/jobs/sign.rs @@ -6,7 +6,7 @@ use color_eyre::Result; #[sdk::job( id = 1, params(msgs, is_prehashed), - result(_), + verifier(evm = "HelloBlueprint") )] pub fn sign(msgs: Vec, is_prehashed: bool) -> Result { diff --git 
a/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std b/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std index 035de35f5..1eea5bae1 160000 --- a/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std +++ b/blueprints/incredible-squaring-eigenlayer/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 035de35f5e366c8d6ed142aec4ccb57fe2dd87d4 +Subproject commit 1eea5bae12ae557d589f9f0f0edae2faa47cb262 diff --git a/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs b/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs index d8b97a078..8d65849c0 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/jobs/compute_x_square.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] use crate::contexts::client::{AggregatorClient, SignedTaskResponse}; -use crate::{noop, IncredibleSquaringTaskManager, INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING}; +use crate::{IncredibleSquaringTaskManager, INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING}; use alloy_primitives::keccak256; use alloy_primitives::{hex, Bytes, U256}; use alloy_sol_types::SolType; @@ -10,7 +10,7 @@ use color_eyre::Result; use eigensdk::crypto_bls::BlsKeyPair; use eigensdk::crypto_bls::OperatorId; use gadget_sdk::{error, info, job}; -use std::{convert::Infallible, ops::Deref, sync::OnceLock}; +use std::{convert::Infallible, ops::Deref}; use IncredibleSquaringTaskManager::TaskResponse; /// Returns x^2 saturating to [`u64::MAX`] if overflow occurs. diff --git a/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs b/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs index 3d1fe058a..efff5a1de 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/jobs/initialize_task.rs @@ -1,9 +1,9 @@ use crate::{ - contexts::aggregator::AggregatorContext, noop, IncredibleSquaringTaskManager, + contexts::aggregator::AggregatorContext, IncredibleSquaringTaskManager, INCREDIBLE_SQUARING_TASK_MANAGER_ABI_STRING, }; use gadget_sdk::{info, job}; -use std::{convert::Infallible, ops::Deref, sync::OnceLock}; +use std::{convert::Infallible, ops::Deref}; use IncredibleSquaringTaskManager::Task; const TASK_CHALLENGE_WINDOW_BLOCK: u32 = 100; diff --git a/blueprints/incredible-squaring-eigenlayer/src/main.rs b/blueprints/incredible-squaring-eigenlayer/src/main.rs index df7711ed1..45999285c 100644 --- a/blueprints/incredible-squaring-eigenlayer/src/main.rs +++ b/blueprints/incredible-squaring-eigenlayer/src/main.rs @@ -65,7 +65,8 @@ async fn main() { let aggregator_client = AggregatorClient::new(&server_address)?; let x_square_eigen = XsquareEigenEventHandler:: { ctx: aggregator_client, - contract: contract.clone().into(), + contract: contract.clone(), + contract_instance: Default::default(), }; let aggregator_context = AggregatorContext::new( @@ -80,12 +81,10 @@ async fn main() { let initialize_task = InitializeBlsTaskEventHandler:: { ctx: aggregator_context.clone(), - contract: contract.clone().into(), + contract, + contract_instance: Default::default(), }; - // let (handle, aggregator_shutdown_tx) = - // aggregator_context.start(env.ws_rpc_endpoint.clone()); - info!("~~~ Executing the incredible squaring blueprint ~~~"); let eigen_config = EigenlayerConfig {}; BlueprintRunner::new(eigen_config, env) diff --git a/blueprints/incredible-squaring-symbiotic/src/lib.rs b/blueprints/incredible-squaring-symbiotic/src/lib.rs 
index 665d3f0f1..3d3712d4a 100644 --- a/blueprints/incredible-squaring-symbiotic/src/lib.rs +++ b/blueprints/incredible-squaring-symbiotic/src/lib.rs @@ -2,7 +2,7 @@ use alloy_primitives::U256; use alloy_sol_types::sol; use gadget_sdk::{job, load_abi}; use serde::{Deserialize, Serialize}; -use std::{ops::Deref, sync::OnceLock}; +use std::ops::Deref; sol!( #[allow(missing_docs, clippy::too_many_arguments)] diff --git a/blueprints/incredible-squaring-symbiotic/src/main.rs b/blueprints/incredible-squaring-symbiotic/src/main.rs index cf45407f3..208526c29 100644 --- a/blueprints/incredible-squaring-symbiotic/src/main.rs +++ b/blueprints/incredible-squaring-symbiotic/src/main.rs @@ -36,7 +36,8 @@ async fn main() { let x_square = blueprint::XsquareEventHandler:: { context: blueprint::MyContext {}, - contract: contract.clone().into(), + contract: contract.clone(), + contract_instance: Default::default(), }; info!("~~~ Executing the incredible squaring blueprint ~~~"); diff --git a/blueprints/incredible-squaring/contracts/lib/forge-std b/blueprints/incredible-squaring/contracts/lib/forge-std index 035de35f5..1eea5bae1 160000 --- a/blueprints/incredible-squaring/contracts/lib/forge-std +++ b/blueprints/incredible-squaring/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 035de35f5e366c8d6ed142aec4ccb57fe2dd87d4 +Subproject commit 1eea5bae12ae557d589f9f0f0edae2faa47cb262 diff --git a/blueprints/incredible-squaring/src/lib.rs b/blueprints/incredible-squaring/src/lib.rs index c372608a3..cd49f92a9 100644 --- a/blueprints/incredible-squaring/src/lib.rs +++ b/blueprints/incredible-squaring/src/lib.rs @@ -6,17 +6,16 @@ use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::services::events::Job #[derive(Clone)] pub struct MyContext; -/// Returns x^2 saturating to [`u64::MAX`] if overflow occurs. #[job( id = 0, params(x), - result(_), event_listener( - listener = TangleEventListener, + listener = TangleEventListener, pre_processor = services_pre_processor, post_processor = services_post_processor, ), )] +/// Returns x^2 saturating to [`u64::MAX`] if overflow occurs. pub fn xsquare(x: u64, context: MyContext) -> Result { Ok(x.saturating_pow(2)) } diff --git a/blueprints/incredible-squaring/src/main.rs b/blueprints/incredible-squaring/src/main.rs index 31d72e11f..a252bd77e 100644 --- a/blueprints/incredible-squaring/src/main.rs +++ b/blueprints/incredible-squaring/src/main.rs @@ -1,5 +1,4 @@ use color_eyre::{eyre::eyre, Result}; -use gadget_sdk::config::protocol::TangleInstanceSettings; use gadget_sdk::info; use gadget_sdk::runners::tangle::TangleConfig; use gadget_sdk::runners::BlueprintRunner; @@ -13,10 +12,8 @@ async fn main() { info!("Starting the event watcher for {} ...", signer.account_id()); - let tangle_settings = env.protocol_specific.tangle()?; - let TangleInstanceSettings { service_id, .. 
} = tangle_settings; let x_square = blueprint::XsquareEventHandler { - service_id: *service_id, + service_id: env.service_id().expect("No service ID found"), context: blueprint::MyContext, client, signer, diff --git a/blueprints/periodic-web-poller/src/lib.rs b/blueprints/periodic-web-poller/src/lib.rs index 7e538db48..90dcba638 100644 --- a/blueprints/periodic-web-poller/src/lib.rs +++ b/blueprints/periodic-web-poller/src/lib.rs @@ -7,7 +7,7 @@ use std::convert::Infallible; #[job( id = 0, params(value), - result(_), + event_listener( listener = PeriodicEventListener<2000, WebPoller, serde_json::Value, reqwest::Client>, pre_processor = pre_process, diff --git a/blueprints/tangle-avs-blueprint/contracts/lib/forge-std b/blueprints/tangle-avs-blueprint/contracts/lib/forge-std index 035de35f5..1eea5bae1 160000 --- a/blueprints/tangle-avs-blueprint/contracts/lib/forge-std +++ b/blueprints/tangle-avs-blueprint/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 035de35f5e366c8d6ed142aec4ccb57fe2dd87d4 +Subproject commit 1eea5bae12ae557d589f9f0f0edae2faa47cb262 diff --git a/blueprints/tangle-raw-event-listener/Cargo.toml b/blueprints/tangle-raw-event-listener/Cargo.toml new file mode 100644 index 000000000..60e138ad7 --- /dev/null +++ b/blueprints/tangle-raw-event-listener/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "tangle-raw-event-listener-blueprint" +version = "0.1.1" +description = "A Simple Blueprint to demo how to listen to raw events from Tangle" +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +publish = false + +[dependencies] +gadget-sdk = { workspace = true, features = ["std"] } +color-eyre = { workspace = true } +tracing = { workspace = true } + +[build-dependencies] +blueprint-metadata = { workspace = true } + +[features] +default = ["std"] +std = [] \ No newline at end of file diff --git a/blueprints/tangle-raw-event-listener/build.rs b/blueprints/tangle-raw-event-listener/build.rs new file mode 100644 index 000000000..acb638b3c --- /dev/null +++ b/blueprints/tangle-raw-event-listener/build.rs @@ -0,0 +1,5 @@ +fn main() { + println!("cargo:rerun-if-changed=src/lib.rs"); + println!("cargo:rerun-if-changed=src/main.rs"); + blueprint_metadata::generate_json(); +} diff --git a/blueprints/tangle-raw-event-listener/src/lib.rs b/blueprints/tangle-raw-event-listener/src/lib.rs new file mode 100644 index 000000000..1ee1ee641 --- /dev/null +++ b/blueprints/tangle-raw-event-listener/src/lib.rs @@ -0,0 +1,24 @@ +use gadget_sdk::event_listener::tangle::{TangleEvent, TangleEventListener}; +use gadget_sdk::job; +use gadget_sdk::tangle_subxt::tangle_testnet_runtime::api; + +#[derive(Clone)] +pub struct MyContext; + +#[job( + id = 0, + event_listener( + listener = TangleEventListener, + ), +)] +pub fn raw(event: TangleEvent, context: MyContext) -> Result { + if let Some(balance_transfer) = event + .evt + .as_event::() + .ok() + .flatten() + { + gadget_sdk::info!("Found a balance transfer: {balance_transfer:?}"); + } + Ok(0) +} diff --git a/blueprints/tangle-raw-event-listener/src/main.rs b/blueprints/tangle-raw-event-listener/src/main.rs new file mode 100644 index 000000000..16b74e633 --- /dev/null +++ b/blueprints/tangle-raw-event-listener/src/main.rs @@ -0,0 +1,31 @@ +use color_eyre::{eyre::eyre, Result}; +use gadget_sdk::info; +use gadget_sdk::runners::{tangle::TangleConfig, BlueprintRunner}; +use gadget_sdk::tangle_subxt::subxt::tx::Signer; +use tangle_raw_event_listener_blueprint as blueprint; + 
+#[gadget_sdk::main(env)] +async fn main() { + let client = env.client().await.map_err(|e| eyre!(e))?; + let signer = env.first_sr25519_signer().map_err(|e| eyre!(e))?; + + info!("Starting the event watcher for {} ...", signer.account_id()); + + let x_square = blueprint::RawEventHandler { + service_id: env.service_id().expect("No service ID found"), + context: blueprint::MyContext, + client, + signer, + }; + + let tangle_config = TangleConfig::default(); + + info!("~~~ Executing the incredible squaring blueprint ~~~"); + BlueprintRunner::new(tangle_config, env) + .job(x_square) + .run() + .await?; + + info!("Exiting..."); + Ok(()) +} diff --git a/cli/src/deploy.rs b/cli/src/deploy.rs index 8594376df..a245b7fa2 100644 --- a/cli/src/deploy.rs +++ b/cli/src/deploy.rs @@ -1,5 +1,5 @@ use alloy_provider::network::TransactionBuilder; -use alloy_provider::Provider; +use alloy_provider::{Provider, WsConnect}; pub use alloy_signer_local::PrivateKeySigner; use color_eyre::eyre::{self, Context, ContextCompat, OptionExt, Result}; use gadget_blueprint_proc_macro_core::{BlueprintManager, ServiceBlueprint}; @@ -45,7 +45,7 @@ impl Debug for Opts { pub async fn generate_service_blueprint, T: AsRef>( manifest_metadata_path: P, pkg_name: Option<&String>, - http_rpc_url: T, + rpc_url: T, signer_evm: Option, ) -> Result { let manifest_path = manifest_metadata_path.into(); @@ -60,13 +60,7 @@ pub async fn generate_service_blueprint, T: AsRef>( let mut blueprint = tokio::task::spawn_blocking(move || load_blueprint_metadata(&package)).await??; build_contracts_if_needed(package_clone, &blueprint).context("Building contracts")?; - deploy_contracts_to_tangle( - http_rpc_url.as_ref(), - package_clone, - &mut blueprint, - signer_evm, - ) - .await?; + deploy_contracts_to_tangle(rpc_url.as_ref(), package_clone, &mut blueprint, signer_evm).await?; bake_blueprint(blueprint) } @@ -74,7 +68,7 @@ pub async fn generate_service_blueprint, T: AsRef>( pub async fn deploy_to_tangle( Opts { pkg_name, - http_rpc_url, + http_rpc_url: _, ws_rpc_url, manifest_path, signer, @@ -83,7 +77,7 @@ pub async fn deploy_to_tangle( ) -> Result { // Load the manifest file into cargo metadata let blueprint = - generate_service_blueprint(&manifest_path, pkg_name.as_ref(), &http_rpc_url, signer_evm) + generate_service_blueprint(&manifest_path, pkg_name.as_ref(), &ws_rpc_url, signer_evm) .await?; let signer = if let Some(signer) = signer { @@ -149,7 +143,7 @@ pub fn load_blueprint_metadata( } async fn deploy_contracts_to_tangle( - http_rpc_url: &str, + rpc_url: &str, package: &cargo_metadata::Package, blueprint: &mut ServiceBlueprint<'_>, signer_evm: Option, @@ -198,10 +192,13 @@ async fn deploy_contracts_to_tangle( }; let wallet = alloy_provider::network::EthereumWallet::from(signer); + assert!(rpc_url.starts_with("ws:")); + let provider = alloy_provider::ProviderBuilder::new() .with_recommended_fillers() .wallet(wallet) - .on_http(http_rpc_url.parse()?); + .on_ws(WsConnect::new(rpc_url)) + .await?; let chain_id = provider.get_chain_id().await?; eprintln!("Chain ID: {chain_id}"); diff --git a/macros/blueprint-proc-macro/src/event_listener/evm.rs b/macros/blueprint-proc-macro/src/event_listener/evm.rs index 48464b128..ca19e2fc8 100644 --- a/macros/blueprint-proc-macro/src/event_listener/evm.rs +++ b/macros/blueprint-proc-macro/src/event_listener/evm.rs @@ -26,8 +26,6 @@ pub(crate) fn generate_evm_event_handler( params_tokens: &[TokenStream], fn_call: &TokenStream, ) -> TokenStream { - let (instance_base, instance_name, instance_wrapper_name, 
_instance) = - get_evm_instance_data(event_handler); let event = event_handler .get_event_listener() .event @@ -38,7 +36,7 @@ pub(crate) fn generate_evm_event_handler( .pre_processor .as_ref() .unwrap(); - let callback = event_handler + let _callback = event_handler .get_event_listener() .post_processor .as_ref() @@ -51,117 +49,45 @@ pub(crate) fn generate_evm_event_handler( .expect("ABI String must exist"); quote! { - #[derive(Debug, Clone)] - pub struct #instance_wrapper_name { - instance: #instance_base::#instance_name, - contract_instance: OnceLock>, - } - - impl From<#instance_base::#instance_name> for #instance_wrapper_name - where - T: alloy_transport::Transport + Clone + Send + Sync + 'static, - P: alloy_provider::Provider + Clone + Send + Sync + 'static { - fn from(instance: #instance_base::#instance_name) -> Self { - Self::new(instance) + impl gadget_sdk::event_listener::evm_contracts::EvmContractInstance for #struct_name { + fn get_instance(&self) -> &alloy_contract::ContractInstance { + self.deref() } } - impl #instance_wrapper_name - where - T: alloy_transport::Transport + Clone + Send + Sync + 'static, - P: alloy_provider::Provider + Clone + Send + Sync + 'static, + impl Deref for #struct_name { - /// Constructor for creating a new [`#instance_wrapper_name`]. - pub fn new(instance: #instance_base::#instance_name) -> Self { - Self { - instance, - contract_instance: OnceLock::new(), - } - } - - /// Lazily creates the [`alloy_contract::ContractInstance`] if it does not exist, otherwise returning a reference to it. - #[allow(clippy::clone_on_copy)] - fn get_contract_instance(&self) -> &alloy_contract::ContractInstance { + type Target = alloy_contract::ContractInstance; + fn deref(&self) -> &Self::Target { self.contract_instance.get_or_init(|| { let abi_location = alloy_contract::Interface::new(alloy_json_abi::JsonAbi::from_json_str(&#abi_string).unwrap()); - alloy_contract::ContractInstance::new( - self.instance.address().clone(), - self.instance.provider().clone(), - abi_location, - ) + alloy_contract::ContractInstance::new(self.contract.address().clone(), self.contract.provider().clone(), abi_location ) }) } } - - impl Deref for #instance_wrapper_name - where - T: alloy_transport::Transport + Clone + Send + Sync + 'static, - P: alloy_provider::Provider + Clone + Send + Sync + 'static, - { - type Target = alloy_contract::ContractInstance; - - /// Dereferences the [`#instance_wrapper_name`] to its [`alloy_contract::ContractInstance`]. 
- fn deref(&self) -> &Self::Target { - self.get_contract_instance() - } - } - - #[automatically_derived] - #[async_trait::async_trait] - impl gadget_sdk::events_watcher::evm::EvmEventHandler for #struct_name - where - T: Clone + Send + Sync + gadget_sdk::events_watcher::evm::Config +'static, - #instance_wrapper_name : std::ops::Deref>, + #[gadget_sdk::async_trait::async_trait] + impl gadget_sdk::events_watcher::evm::EvmEventHandler for #struct_name { - type Contract = #instance_wrapper_name ; type Event = #event; const GENESIS_TX_HASH: alloy_primitives::FixedBytes<32> = alloy_primitives::FixedBytes([0; 32]); - async fn handle(&self, log: &gadget_sdk::alloy_rpc_types::Log, event: &Self::Event) -> Result<(), gadget_sdk::events_watcher::Error> { use alloy_provider::Provider; use alloy_sol_types::SolEvent; use alloy_sol_types::SolInterface; - let contract = &self.contract; - - // Convert the event to inputs let decoded: alloy_primitives::Log = ::decode_log(&log.inner, true)?; - let (_, index) = decoded.topics(); - // Convert the event to inputs using the event converter. - // TODO: If no converter is provided, the #[job] must consume the - // event directly, as specified in the `event = `. - - // let inputs = if let Some(converter) = #event_converter { - // converter(decoded.data) - // } else { - // decoded.data - // }; let inputs = #event_converter(decoded.data, index); // Apply the function #(#params_tokens)* #fn_call; - - // Call the callback with the job result - // TODO: Check if the callback is None - // if let Some(cb) = #callback { - // let call = cb(job_result); - // // Submit the transaction - // let tx = contract.provider().send_raw_transaction(call.abi_encode().as_ref()).await?; - // tx.watch().await?; - // } - let call = #callback(job_result); - // Submit the transaction - // let tx = contract.provider().send_raw_transaction(call.abi_encode().as_ref()).await?; - // tx.watch().await?; - Ok(()) } } - impl gadget_sdk::event_listener::markers::IsEvm for #struct_name {} + impl gadget_sdk::event_listener::markers::IsEvm for #struct_name {} } } diff --git a/macros/blueprint-proc-macro/src/job.rs b/macros/blueprint-proc-macro/src/job.rs index 8714dfd9a..ac3ea6ac8 100644 --- a/macros/blueprint-proc-macro/src/job.rs +++ b/macros/blueprint-proc-macro/src/job.rs @@ -66,7 +66,6 @@ pub(crate) fn job_impl(args: &JobArgs, input: &ItemFn) -> syn::Result, params: &[Ident], - result_types: &[FieldType], ) -> (Vec, Vec) { - let (event_handler_args, event_handler_arg_types) = get_event_handler_args(param_types, params); + let (mut event_handler_args, _event_handler_arg_types) = + get_event_handler_args(param_types, params); let (_, _, struct_name) = generate_fn_name_and_struct(input, suffix); let (fn_name_string, _job_def_name, job_id_name) = get_job_id_field_name(input); // Generate Event Listener, if not being skipped @@ -260,11 +259,11 @@ pub(crate) fn generate_event_workflow_tokenstream( let next_listener = if matches!(listener_meta.listener_type, ListenerType::Evm) { // How to inject not just this event handler, but all event handlers here? let wrapper = quote! { - gadget_sdk::event_listener::evm_contracts::EthereumHandlerWrapper<#autogen_struct_name, _> + gadget_sdk::event_listener::evm_contracts::EthereumHandlerWrapper<_, _, _> }; let ctx_create = quote! 
{ - (ctx.contract.clone(), std::sync::Arc::new(ctx.clone()) as std::sync::Arc<#autogen_struct_name>) + ctx.clone() }; if event_listener_calls.is_empty() { @@ -300,17 +299,39 @@ pub(crate) fn generate_event_workflow_tokenstream( // Generate the variable that we are passing as the context into EventListener::create(&mut ctx) // We assume the first supplied event handler arg is the context we are injecting into the event listener // Then, pass that into the EventFlowWrapper - let (context, field_in_self) = event_handler_args + let fn_name_ident = &input.sig.ident; + let static_ctx_get_override = quote! { CTX.get().unwrap() }; + let mut ordered_inputs = + get_fn_call_ordered(param_types, params, Some(static_ctx_get_override)); + + let asyncness = get_asyncness(input); + let call_id_static_name = get_current_call_id_field_name(input); + + // Raw events have no pre-processor, therefore their inputs are passed directly into the job function + // and NOT as job params + let is_raw = listener_meta.is_raw(); + + // TODO: task 001: find better way to identify which ident is the raw event + // for now, we assume the raw event is always listed first + if is_raw { + let _ = event_handler_args.remove(0); + } + + let field_in_self_getter = event_handler_args .first() - .map(|ctx| (quote! {self}, (*ctx).clone())) + .map(|field_in_self| { + // If is_raw, assume the actual context is the second param + quote! { ctx. #field_in_self .clone() } + }) .expect("No context found"); let autogen_struct_name = quote! { #struct_name #type_args }; + /* let context_ty = event_handler_arg_types .first() .map(|ty| quote! {#ty}) - .unwrap_or_default(); + .unwrap_or_default();*/ if event_listener_calls.is_empty() { event_listener_calls.push(quote! { @@ -319,7 +340,7 @@ pub(crate) fn generate_event_workflow_tokenstream( } event_listener_calls.push(quote! { - listeners.push(#listener_function_name(&#context).await.expect("Event listener already initialized")); + listeners.push(#listener_function_name(&self).await.expect("Event listener already initialized")); }); let pre_processor_function = @@ -331,13 +352,6 @@ pub(crate) fn generate_event_workflow_tokenstream( }; // The job_processor is just the job function. Since it may contain multiple params, we need a new function to call it. - let fn_name_ident = &input.sig.ident; - let static_ctx_get_override = quote! { CTX.get().unwrap() }; - let ordered_inputs = - get_fn_call_ordered(param_types, params, Some(static_ctx_get_override)); - - let asyncness = get_asyncness(input); - let call_id_static_name = get_current_call_id_field_name(input); let job_processor_wrapper = if matches!( listener_meta.listener_type, ListenerType::Tangle @@ -345,15 +359,28 @@ pub(crate) fn generate_event_workflow_tokenstream( let params = declared_params_to_field_types(params, param_types) .expect("Failed to generate params"); let params_tokens = event_listeners.get_param_name_tokenstream(¶ms, true); + + let job_processor_call = if params_tokens.is_empty() { + let second_param = ordered_inputs.pop().expect("Expected a context"); + quote! { + // If no args are specified, assume this job has no parameters and thus takes in the raw event + #fn_name_ident (param0, #second_param) #asyncness .map_err(|err| gadget_sdk::Error::Other(err.to_string())) + } + } else { + quote! { + let mut args_iter = param0.args.clone().into_iter(); + #(#params_tokens)* + #fn_name_ident (#(#ordered_inputs)*) #asyncness .map_err(|err| gadget_sdk::Error::Other(err.to_string())) + } + }; + quote! 
{ - move |event: gadget_sdk::event_listener::tangle::jobs::TangleJobEvent<#context_ty>| async move { - if let Some(call_id) = event.call_id { + move |param0: gadget_sdk::event_listener::tangle::TangleEvent<_, _>| async move { + if let Some(call_id) = param0.call_id { #call_id_static_name.store(call_id, std::sync::atomic::Ordering::Relaxed); } - let mut args_iter = event.args.clone().into_iter(); - #(#params_tokens)* - #fn_name_ident (#(#ordered_inputs)*) #asyncness .map_err(|err| gadget_sdk::Error::Other(err.to_string())) + #job_processor_call } } } else { @@ -368,16 +395,11 @@ pub(crate) fn generate_event_workflow_tokenstream( &listener_meta.post_processor { if matches!(listener_meta.listener_type, ListenerType::Tangle) { - let result_tokens = - event_listeners.get_param_result_tokenstream(result_types); quote! { |job_result| async move { let ctx = CTX.get().unwrap(); - let mut result = Vec::new(); - // TODO: Will need to decouple this from jobs - #(#result_tokens)* - let tangle_job_result = gadget_sdk::event_listener::tangle::jobs::TangleJobResult { - results: result, + let tangle_job_result = gadget_sdk::event_listener::tangle::TangleResult::<_> { + results: job_result, service_id: ctx.service_id, call_id: #call_id_static_name.load(std::sync::atomic::Ordering::Relaxed), client: ctx.client.clone(), @@ -395,22 +417,20 @@ pub(crate) fn generate_event_workflow_tokenstream( quote! { |_evt| async move { Ok(()) } } }; - let context_declaration = if matches!( - listener_meta.listener_type, - ListenerType::Tangle - ) { - quote! { - let context = gadget_sdk::event_listener::tangle::TangleListenerInput::<#context_ty> { - client: ctx.client.clone(), - signer: ctx.signer.clone(), - job_id: #job_id_name, - service_id: ctx.service_id, - context: ctx. #field_in_self .clone(), - }; - } - } else { - quote! { let context = ctx. #field_in_self .clone(); } - }; + let context_declaration = + if matches!(listener_meta.listener_type, ListenerType::Tangle) { + quote! { + let context = gadget_sdk::event_listener::tangle::TangleListenerInput { + client: ctx.client.clone(), + signer: ctx.signer.clone(), + job_id: #job_id_name, + service_id: ctx.service_id, + context: #field_in_self_getter, + }; + } + } else { + quote! { let context = #field_in_self_getter; } + }; quote! 
{ async fn #listener_function_name #bounded_type_args(ctx: &#autogen_struct_name) -> Option>> { @@ -524,7 +544,7 @@ pub fn generate_autogen_struct( let (event_handler_args, _) = get_event_handler_args(param_types, params); let mut additional_var_indexes = vec![]; - let additional_params = event_handler_args + let mut additional_params = event_handler_args .iter() .map(|ident| { let mut ty = param_types[*ident].clone(); @@ -540,6 +560,14 @@ pub fn generate_autogen_struct( }) .collect::>(); + // TODO: task 001: find better way to identify which ident is the raw event + // for now, we assume the raw event is always listed first + if event_listener_args.get_event_listener().is_raw() { + // We don't care to add the first event to the autogen struct + let _ = additional_var_indexes.remove(0); + let _ = additional_params.remove(0); + } + let mut required_fields = vec![]; let mut type_params_bounds = proc_macro2::TokenStream::default(); let mut type_params = proc_macro2::TokenStream::default(); @@ -555,10 +583,11 @@ pub fn generate_autogen_struct( // Even if multiple evm listeners, we only need this once if event_listener_args.has_evm() { - let (_, _, instance_wrapper_name, _) = get_evm_instance_data(event_listener_args); + let (_, _, _, instance_name) = get_evm_instance_data(event_listener_args); required_fields.push(quote! { - pub contract: #instance_wrapper_name, + pub contract: #instance_name, + pub contract_instance: std::sync::OnceLock>, }); type_params = quote! { }; @@ -579,7 +608,7 @@ pub fn generate_autogen_struct( #(#additional_params)* } - #[async_trait::async_trait] + #[gadget_sdk::async_trait::async_trait] impl #type_params_bounds gadget_sdk::events_watcher::InitializableEventHandler for #struct_name #type_params { async fn init_event_handler( &self, @@ -742,15 +771,11 @@ impl Parse for JobArgs { let id = id.ok_or_else(|| input.error("Missing `id` argument in attribute"))?; - if params.is_empty() { - return Err(input.error("Missing `params` argument in attribute")); - } - - let result = result.ok_or_else(|| input.error("Missing 'result' argument in attribute"))?; + let result = result.unwrap_or(ResultsKind::Infered); if let ResultsKind::Types(ref r) = result { if r.is_empty() { - return Err(input.error("Expected at least one parameter for the `result` attribute, or `_` to infer the type")); + return Err(input.error("`result` attribute empty, expected at least one parameter, or `_` to infer the type")); } } @@ -887,6 +912,12 @@ pub(crate) struct SingleListener { pub pre_processor: Option, } +impl SingleListener { + pub fn is_raw(&self) -> bool { + self.pre_processor.is_none() && matches!(self.listener_type, ListenerType::Tangle) + } +} + /// Extracts a value from form: "tag = value" fn extract_x_equals_y( content: &ParseBuffer, @@ -1028,60 +1059,6 @@ impl EventListenerArgs { &self.listeners[0] } - pub fn get_param_result_tokenstream( - &self, - fields: &[FieldType], - ) -> Vec { - let event_listener = self.get_event_listener(); - if fields.len() == 1 { - let ident = format_ident!("job_result"); - match event_listener.listener_type { - ListenerType::Evm => { - vec![quote! { let #ident = job_result; }] - } - - ListenerType::Tangle => { - vec![crate::tangle::field_type_to_result_token( - &ident, &fields[0], - )] - } - - ListenerType::Custom => { - vec![quote! { let #ident = job_result; }] - } - } - } else { - fields - .iter() - .enumerate() - .map(|(i, t)| { - let ident = format_ident!("result_{i}"); - match event_listener.listener_type { - ListenerType::Evm => { - quote! 
{ - let #ident = job_result[#i]; - } - } - - ListenerType::Tangle => { - let s = crate::tangle::field_type_to_result_token(&ident, t); - quote! { - let #ident = job_result[#i]; - #s - } - } - - ListenerType::Custom => { - quote! { - let #ident = job_result[#i]; - } - } - } - }) - .collect::>() - } - } - pub fn get_param_name_tokenstream( &self, params: &[FieldType], diff --git a/macros/blueprint-proc-macro/src/lib.rs b/macros/blueprint-proc-macro/src/lib.rs index 6579dacb8..64a56a963 100644 --- a/macros/blueprint-proc-macro/src/lib.rs +++ b/macros/blueprint-proc-macro/src/lib.rs @@ -123,7 +123,7 @@ pub fn load_abi(input: TokenStream) -> TokenStream { /// A procedural macro that annotates a function as a main function for the blueprint. /// -/// ```rust,no_run +/// ```ignore /// # use gadget_sdk as sdk; /// #[sdk::main(env)] /// pub async fn main() { diff --git a/macros/blueprint-proc-macro/src/report.rs b/macros/blueprint-proc-macro/src/report.rs index b587ab1a9..a196b4ad8 100644 --- a/macros/blueprint-proc-macro/src/report.rs +++ b/macros/blueprint-proc-macro/src/report.rs @@ -121,7 +121,6 @@ pub(crate) fn report_impl(args: &ReportArgs, input: &ItemFn) -> syn::Result { @@ -374,10 +372,6 @@ fn generate_qos_report_event_handler( let fn_name_string = fn_name.to_string(); const SUFFIX: &str = "QoSReportEventHandler"; - let syn::ReturnType::Type(_, result) = &input.sig.output else { - panic!("Report function must have a return type of Result where T is a tuple of the result fields"); - }; - let struct_name = format_ident!("{}{SUFFIX}", pascal_case(&fn_name_string)); let job_id = quote! { 0 }; // We don't care about job ID's for QOS let param_types = @@ -385,8 +379,6 @@ fn generate_qos_report_event_handler( // TODO: Allow passing all events, use a dummy value here that satisfies the trait bounds. For now QOS will // trigger only once a singular JobCalled event is received. let event_type = quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::services::events::JobResultSubmitted }; - let result_type = crate::job::declared_result_type_to_field_types(&args.result, result) - .expect("Failed to generate result types for job report"); let (event_listener_gen, event_listener_calls) = crate::job::generate_event_workflow_tokenstream( @@ -397,7 +389,6 @@ fn generate_qos_report_event_handler( args.skip_codegen, ¶m_types, &args.params, - &result_type, ); let interval = args @@ -412,7 +403,7 @@ fn generate_qos_report_event_handler( #(#event_listener_gen)* #[automatically_derived] - #[async_trait::async_trait] + #[gadget_sdk::async_trait::async_trait] impl gadget_sdk::events_watcher::substrate::EventHandler for #struct_name { async fn handle(&self, event: &#event_type) -> Result>, gadget_sdk::events_watcher::Error> { use std::time::Duration; @@ -452,7 +443,7 @@ fn generate_qos_report_event_handler( } } - #[async_trait::async_trait] + #[gadget_sdk::async_trait::async_trait] impl gadget_sdk::events_watcher::InitializableEventHandler for #struct_name { async fn init_event_handler(&self) -> Option>> { #(#event_listener_calls)* diff --git a/macros/blueprint-proc-macro/src/tangle/mod.rs b/macros/blueprint-proc-macro/src/tangle/mod.rs index 73ba526b8..77e963373 100644 --- a/macros/blueprint-proc-macro/src/tangle/mod.rs +++ b/macros/blueprint-proc-macro/src/tangle/mod.rs @@ -145,158 +145,3 @@ pub fn field_type_to_param_token( } } } - -pub fn field_type_to_result_token(ident: &Ident, t: &FieldType) -> proc_macro2::TokenStream { - match t { - FieldType::Void => quote! {}, - FieldType::Bool => { - quote! 
{ result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Bool(#ident)); } - } - FieldType::Uint8 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint8(#ident)); } - } - FieldType::Int8 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int8(#ident)); } - } - FieldType::Uint16 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint16(#ident)); } - } - FieldType::Int16 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int16(#ident)); } - } - FieldType::Uint32 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint32(#ident)); } - } - FieldType::Int32 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int32(#ident)); } - } - FieldType::Uint64 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint64(#ident)); } - } - FieldType::Int64 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int64(#ident)); } - } - FieldType::Uint128 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint128(#ident)); } - } - FieldType::U256 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::U256(#ident)); } - } - FieldType::Int128 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int128(#ident)); } - } - FieldType::Float64 => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Float64(#ident)); } - } - FieldType::String => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::String(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::BoundedString(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(#ident.into_bytes())))); } - } - FieldType::Bytes => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Bytes(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(#ident))); } - } - FieldType::Optional(t_x) => { - let v_ident = format_ident!("v"); - let tokens = field_type_to_result_token(&v_ident, t_x); - quote! 
{ - match #ident { - Some(v) => #tokens, - None => result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::None), - } - } - } - FieldType::Array(_, _) => todo!("Handle array"), - FieldType::List(t_x) => { - let inner_ident = format_ident!("{}_inner", ident); - let field = match **t_x { - FieldType::Void => unreachable!(), - FieldType::Bool => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Bool(item) } - } - FieldType::Uint8 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint8(item) } - } - FieldType::Int8 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int8(item) } - } - FieldType::Uint16 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint16(item) } - } - FieldType::Int16 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int16(item) } - } - FieldType::Uint32 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint32(item) } - } - FieldType::Int32 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int32(item) } - } - FieldType::Uint64 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint64(item) } - } - FieldType::Int64 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int64(item) } - } - FieldType::Uint128 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Uint128(item) } - } - FieldType::Int128 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Int128(item) } - } - FieldType::U256 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::U256(item) } - } - FieldType::Float64 => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Float64(item) } - } - FieldType::String => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::String(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::BoundedString(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(item.into_bytes()))) } - } - FieldType::Bytes => { - quote! { gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Bytes(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(item)) } - } - FieldType::Optional(_) => todo!("handle optionals into lists"), - FieldType::Array(_, _) => todo!("handle arrays into lists"), - FieldType::List(_) => todo!("handle nested lists"), - FieldType::Struct(_, _) => todo!("handle nested structs"), - FieldType::AccountId => { - quote! 
{ gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::AccountId(item) } - } - }; - let inner = quote! { - let #inner_ident = #ident.into_iter().map(|item| #field).collect::>(); - }; - - quote! { - #inner - result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::List(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(#inner_ident))); - } - } - FieldType::Struct(name, fields) => { - let field_tokens: Vec<_> = fields - .iter() - .map(|(field_name, field_type)| { - let field_ident = format_ident!("{}", field_name); - let inner_ident = format_ident!("{}_{}", ident, field_name); - let inner_token = field_type_to_result_token(&inner_ident, field_type); - quote! { - let #inner_ident = #ident.#field_ident; - let field_name = gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::BoundedString::::from(#field_name); - let field_value = Box::new(#inner_ident); - fields_vec.push((field_name, field_value)); - #inner_token - } - }) - .collect(); - - quote! { - #(#field_tokens)* - let struct_name = gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::BoundedString::::from(#name); - let fields_vec = vec![#(#field_tokens),*]; - result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::Struct(struct_name, gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec(fields_vec))); - } - } - FieldType::AccountId => { - quote! { result.push(gadget_sdk::tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field::AccountId(#ident)); } - } - } -} diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/01_missing_params.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/01_missing_params.stderr deleted file mode 100644 index 84f4bd2de..000000000 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/01_missing_params.stderr +++ /dev/null @@ -1,7 +0,0 @@ -error: unexpected end of input, Missing `params` argument in attribute - --> tests/invalid_cases/job/01_missing_params.rs:3:1 - | -3 | #[job(id = 0, result(Vec))] - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - | - = note: this error originates in the attribute macro `job` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/01_missing_params.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/01_non_result_return_type.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/01_missing_params.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/01_non_result_return_type.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/01_non_result_return_type.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/01_non_result_return_type.stderr new file mode 100644 index 000000000..7611780d7 --- /dev/null +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/01_non_result_return_type.stderr @@ -0,0 +1,5 @@ +error: Function must have a return type of Result where T is a tuple of the result fields + --> tests/invalid_cases/job/01_non_result_return_type.rs:4:22 + | +4 | fn keygen(n: u16) -> Vec { + | ^^^^^^^ diff --git 
a/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_params_syntax.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/02_invalid_params_syntax.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_params_syntax.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/02_invalid_params_syntax.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_params_syntax.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/02_invalid_params_syntax.stderr similarity index 62% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_params_syntax.stderr rename to macros/blueprint-proc-macro/tests/invalid_cases/job/02_invalid_params_syntax.stderr index c29f1baeb..ea81c6345 100644 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_params_syntax.stderr +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/02_invalid_params_syntax.stderr @@ -1,5 +1,5 @@ error: expected `,` - --> tests/invalid_cases/job/03_invalid_params_syntax.rs:3:24 + --> tests/invalid_cases/job/02_invalid_params_syntax.rs:3:24 | 3 | #[job(id = 0, params(n t), result(Vec))] | ^ diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.rs deleted file mode 100644 index a2314b0fa..000000000 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.rs +++ /dev/null @@ -1,8 +0,0 @@ -use gadget_blueprint_proc_macro::job; - -#[job(id = 0, params(n))] -fn keygen(n: u16) -> Vec { - Vec::new() -} - -fn main() {} diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.stderr deleted file mode 100644 index 1d743d179..000000000 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/02_missing_result.stderr +++ /dev/null @@ -1,7 +0,0 @@ -error: unexpected end of input, Missing 'result' argument in attribute - --> tests/invalid_cases/job/02_missing_result.rs:3:1 - | -3 | #[job(id = 0, params(n))] - | ^^^^^^^^^^^^^^^^^^^^^^^^^ - | - = note: this error originates in the attribute macro `job` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/04_invalid_result_syntax.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_result_syntax.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/04_invalid_result_syntax.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_result_syntax.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/04_invalid_result_syntax.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_result_syntax.stderr similarity index 67% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/04_invalid_result_syntax.stderr rename to macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_result_syntax.stderr index 68c404b26..6f0630970 100644 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/04_invalid_result_syntax.stderr +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/03_invalid_result_syntax.stderr @@ -1,5 +1,5 @@ error: expected `,` - --> tests/invalid_cases/job/04_invalid_result_syntax.rs:3:41 + --> tests/invalid_cases/job/03_invalid_result_syntax.rs:3:41 | 3 | #[job(id = 0, params(n), result(Vec String))] | ^^^^^^ diff --git 
a/macros/blueprint-proc-macro/tests/invalid_cases/job/05_function_without_return.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/04_function_without_return.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/05_function_without_return.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/04_function_without_return.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/05_function_without_return.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/04_function_without_return.stderr similarity index 83% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/05_function_without_return.stderr rename to macros/blueprint-proc-macro/tests/invalid_cases/job/04_function_without_return.stderr index dcede4e41..68d581718 100644 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/05_function_without_return.stderr +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/04_function_without_return.stderr @@ -1,5 +1,5 @@ error: Function must have a return type of Result where T is a tuple of the result fields - --> tests/invalid_cases/job/05_function_without_return.rs:3:1 + --> tests/invalid_cases/job/04_function_without_return.rs:3:1 | 3 | #[job(id = 0, params(n), result(_))] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/06_undefined_param.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/05_undefined_param.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/06_undefined_param.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/05_undefined_param.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/06_undefined_param.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/05_undefined_param.stderr similarity index 69% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/06_undefined_param.stderr rename to macros/blueprint-proc-macro/tests/invalid_cases/job/05_undefined_param.stderr index d8dce6b8b..5d6e0fa79 100644 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/06_undefined_param.stderr +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/05_undefined_param.stderr @@ -1,5 +1,5 @@ error: parameter not declared in the function - --> tests/invalid_cases/job/06_undefined_param.rs:3:25 + --> tests/invalid_cases/job/05_undefined_param.rs:3:25 | 3 | #[job(id = 0, params(n, t), result(Vec))] | ^ diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/07_missing_id.rs b/macros/blueprint-proc-macro/tests/invalid_cases/job/06_missing_id.rs similarity index 100% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/07_missing_id.rs rename to macros/blueprint-proc-macro/tests/invalid_cases/job/06_missing_id.rs diff --git a/macros/blueprint-proc-macro/tests/invalid_cases/job/07_missing_id.stderr b/macros/blueprint-proc-macro/tests/invalid_cases/job/06_missing_id.stderr similarity index 84% rename from macros/blueprint-proc-macro/tests/invalid_cases/job/07_missing_id.stderr rename to macros/blueprint-proc-macro/tests/invalid_cases/job/06_missing_id.stderr index 50518b3cf..832d673a7 100644 --- a/macros/blueprint-proc-macro/tests/invalid_cases/job/07_missing_id.stderr +++ b/macros/blueprint-proc-macro/tests/invalid_cases/job/06_missing_id.stderr @@ -1,5 +1,5 @@ error: unexpected end of input, Missing `id` argument in attribute - --> tests/invalid_cases/job/07_missing_id.rs:3:1 + --> tests/invalid_cases/job/06_missing_id.rs:3:1 | 3 | #[job(params(n), 
result(Vec))] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/macros/blueprint-proc-macro/tests/valid_cases/job/04_mixed_types.rs b/macros/blueprint-proc-macro/tests/valid_cases/job/04_mixed_types.rs index f9b1448ad..b7aecb635 100644 --- a/macros/blueprint-proc-macro/tests/valid_cases/job/04_mixed_types.rs +++ b/macros/blueprint-proc-macro/tests/valid_cases/job/04_mixed_types.rs @@ -1,7 +1,7 @@ use gadget_blueprint_proc_macro::job; use std::convert::Infallible; -#[job(id = 0, params(n, s), result(_))] +#[job(id = 0, params(n, s))] fn keygen(n: u16, s: String) -> Result, Infallible> { let _ = (n, s); Ok(Vec::new()) diff --git a/macros/blueprint-proc-macro/tests/valid_cases/job/06_infer_result.rs b/macros/blueprint-proc-macro/tests/valid_cases/job/06_infer_result.rs index 7666be25b..4748149a4 100644 --- a/macros/blueprint-proc-macro/tests/valid_cases/job/06_infer_result.rs +++ b/macros/blueprint-proc-macro/tests/valid_cases/job/06_infer_result.rs @@ -1,7 +1,7 @@ use gadget_blueprint_proc_macro::job; use std::convert::Infallible; -#[job(id = 0, params(n), result(_))] +#[job(id = 0, params(n))] fn keygen(n: u16) -> Result, Infallible> { let _ = n; Ok(Vec::new()) diff --git a/macros/context-derive/src/services.rs b/macros/context-derive/src/services.rs index 999de1dcf..2a5c44a91 100644 --- a/macros/context-derive/src/services.rs +++ b/macros/context-derive/src/services.rs @@ -22,7 +22,6 @@ pub fn generate_context_impl( quote! { impl #impl_generics gadget_sdk::ctx::ServicesContext for #name #ty_generics #where_clause { type Config = gadget_sdk::ext::subxt::PolkadotConfig; - /// Get the current blueprint information from the context. fn current_blueprint( &self, client: &gadget_sdk::ext::subxt::OnlineClient, @@ -52,7 +51,6 @@ pub fn generate_context_impl( } } - /// Query the current blueprint owner from the context. fn current_blueprint_owner( &self, client: &gadget_sdk::ext::subxt::OnlineClient, @@ -76,9 +74,6 @@ pub fn generate_context_impl( } } - /// Get the current service operators from the context. - /// This function will return a list of service operators that are selected to run this service - /// instance. 
fn current_service_operators( &self, client: &gadget_sdk::ext::subxt::OnlineClient, diff --git a/macros/context-derive/tests/ui/01_basic.rs b/macros/context-derive/tests/ui/01_basic.rs index 13cfd6b2a..d73d7a0d7 100644 --- a/macros/context-derive/tests/ui/01_basic.rs +++ b/macros/context-derive/tests/ui/01_basic.rs @@ -20,6 +20,5 @@ fn main() { let _services = ctx.current_service_operators(&tangle_client).await.unwrap(); }; - // Run the async block let _ = body; } diff --git a/macros/context-derive/tests/ui/02_unnamed_fields.rs b/macros/context-derive/tests/ui/02_unnamed_fields.rs index 55e4bc7b8..14c75cb39 100644 --- a/macros/context-derive/tests/ui/02_unnamed_fields.rs +++ b/macros/context-derive/tests/ui/02_unnamed_fields.rs @@ -12,6 +12,5 @@ fn main() { let tangle_client = ctx.tangle_client().await.unwrap(); let _services = ctx.current_service_operators(&tangle_client).await; }; - // Run the async block let _ = body; } diff --git a/macros/context-derive/tests/ui/03_generic_struct.rs b/macros/context-derive/tests/ui/03_generic_struct.rs index 0de0d29bd..e7c8c1768 100644 --- a/macros/context-derive/tests/ui/03_generic_struct.rs +++ b/macros/context-derive/tests/ui/03_generic_struct.rs @@ -22,6 +22,5 @@ fn main() { let _services = ctx.current_service_operators(&tangle_client).await.unwrap(); }; - // Run the async block let _ = body; } diff --git a/macros/playground/src/lib.rs b/macros/playground/src/lib.rs index 656497643..0a7567f4e 100644 --- a/macros/playground/src/lib.rs +++ b/macros/playground/src/lib.rs @@ -34,7 +34,7 @@ pub struct MyContext; // ================== /// Simple Threshold (t) Keygen Job for n parties. -#[job(id = 0, params(n, t), event_listener(listener = TangleEventListener, pre_processor = services_pre_processor), result(_))] +#[job(id = 0, params(n, t), event_listener(listener = TangleEventListener, pre_processor = services_pre_processor))] pub fn keygen(context: TangleClient, n: u16, t: u16) -> Result, Error> { let _ = (n, t, context); Ok(vec![0; 33]) @@ -44,8 +44,7 @@ pub fn keygen(context: TangleClient, n: u16, t: u16) -> Result, Error> { #[job( id = 1, params(keygen_id, data), - event_listener(listener = TangleEventListener, pre_processor = services_pre_processor), - result(_) + event_listener(listener = TangleEventListener, pre_processor = services_pre_processor), )] pub async fn sign(context: TangleClient, keygen_id: u64, data: Vec) -> Result, Error> { let _ = (keygen_id, data); @@ -56,11 +55,10 @@ pub async fn sign(context: TangleClient, keygen_id: u64, data: Vec) -> Resul id = 2, params(keygen_id, new_t), event_listener( - listener = TangleEventListener, + listener = TangleEventListener, pre_processor = services_pre_processor, post_processor = services_post_processor, ), - result(_) )] pub fn refresh( context: TangleClient, @@ -74,11 +72,10 @@ pub fn refresh( /// Say hello to someone or the world. 
#[job(id = 3, params(who), event_listener( - listener = TangleEventListener, + listener = TangleEventListener, pre_processor = services_pre_processor, post_processor = services_post_processor, - ), - result(_))] + ))] pub fn say_hello(context: TangleClient, who: Option) -> Result { match who { Some(who) => Ok(format!("Hello, {}!", who)), @@ -105,11 +102,11 @@ pub fn on_request(nft_id: u64); job_id = 0, params(n, t, msgs), event_listener( - listener = TangleEventListener, + listener = TangleEventListener, pre_processor = services_pre_processor, post_processor = services_post_processor, ), - result(_), + report_type = "job", verifier(evm = "KeygenContract") )] @@ -125,8 +122,8 @@ fn report_keygen( #[report( params(uptime, response_time, error_rate), - event_listener(listener = TangleEventListener, pre_processor = services_pre_processor,), - result(_), + event_listener(listener = TangleEventListener, pre_processor = services_pre_processor,), + report_type = "qos", interval = 3600, metric_thresholds(uptime = 99, response_time = 1000, error_rate = 5) @@ -234,7 +231,7 @@ mod tests { #[job( id = 0, params(value), - result(_), + event_listener( listener = PeriodicEventListener<1500, WebPoller, serde_json::Value, Arc>, pre_processor = pre_process, diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 85340b523..805a03273 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true [dependencies] +itertools = { workspace = true } auto_impl = { workspace = true } bollard = { workspace = true } elliptic-curve = { workspace = true, features = ["alloc", "sec1"] } diff --git a/sdk/src/config/gadget_config.rs b/sdk/src/config/gadget_config.rs index 208811401..03a8487ae 100644 --- a/sdk/src/config/gadget_config.rs +++ b/sdk/src/config/gadget_config.rs @@ -4,6 +4,7 @@ use crate::keystore::backend::GenericKeyStore; use crate::keystore::BackendExt; #[cfg(any(feature = "std", feature = "wasm"))] use crate::keystore::TanglePairSigner; +use crate::utils::get_client; use alloc::string::{String, ToString}; use core::fmt::Debug; use core::net::IpAddr; @@ -203,11 +204,15 @@ impl GadgetConfiguration { /// This function will return an error if we are unable to connect to the Tangle RPC endpoint. #[cfg(any(feature = "std", feature = "wasm"))] pub async fn client(&self) -> Result { - let client = - subxt::OnlineClient::::from_url( - self.ws_rpc_endpoint.clone(), - ) - .await?; - Ok(client) + get_client(&self.ws_rpc_endpoint, &self.http_rpc_endpoint) + .await + .map_err(|err| Error::BadRpcConnection(err.to_string())) + } + + /// Only relevant if this is a Tangle protocol. + pub fn service_id(&self) -> Option { + let tangle_settings = self.protocol_specific.tangle().ok()?; + let TangleInstanceSettings { service_id, .. } = tangle_settings; + Some(*service_id) } } diff --git a/sdk/src/config/mod.rs b/sdk/src/config/mod.rs index 5e09b701b..454473e0f 100644 --- a/sdk/src/config/mod.rs +++ b/sdk/src/config/mod.rs @@ -66,6 +66,8 @@ pub enum Error { /// Missing `SymbioticContractAddresses` #[error("Missing EigenlayerContractAddresses")] MissingSymbioticContractAddresses, + #[error("Bad RPC Connection: {0}")] + BadRpcConnection(String), } /// Loads the [`GadgetConfiguration`] from the current environment. diff --git a/sdk/src/ctx.rs b/sdk/src/ctx.rs index cea4985ea..dc7582a4b 100644 --- a/sdk/src/ctx.rs +++ b/sdk/src/ctx.rs @@ -21,7 +21,7 @@ //! } //! // By deriving KeystoreContext, you can now access the keystore client from your struct. //! -//! #[job(id = 0, params(who), result(_))] +//! 
#[job(id = 0, params(who))] //! async fn my_job(ctx: &MyContext, who: String) -> Result { //! // Access the keystore client from the context. //! let keystore = ctx.keystore(); diff --git a/sdk/src/event_listener/evm_contracts.rs b/sdk/src/event_listener/evm_contracts.rs index 646929e4c..de7116af2 100644 --- a/sdk/src/event_listener/evm_contracts.rs +++ b/sdk/src/event_listener/evm_contracts.rs @@ -3,37 +3,72 @@ use crate::event_listener::get_exponential_backoff; use crate::events_watcher::evm::{Config as ConfigT, EvmEventHandler}; use crate::store::LocalDatabase; use crate::{error, Error}; -use alloy_contract::Event; +use alloy_contract::{ContractInstance, Event}; +use alloy_network::Ethereum; use alloy_provider::Provider; use alloy_rpc_types::{BlockNumberOrTag, Filter}; use alloy_sol_types::SolEvent; -use std::sync::Arc; use tokio_retry::Retry; use tracing::{info, warn}; use uuid::Uuid; -pub struct EthereumHandlerWrapper, C: ConfigT> { - handler: Arc, - contract: W::Contract, +pub trait EthereumContractBound: + Clone + + Send + + Sync + + crate::events_watcher::evm::Config< + TH: alloy_transport::Transport + Clone + Send + Sync + 'static, + PH: alloy_provider::Provider<::TH> + + Clone + + Send + + Sync + + 'static, + > + 'static +{ +} + +// Impl EthereumContractBound for any T satisfying the bounds +impl EthereumContractBound for T where + T: Clone + + Send + + Sync + + crate::events_watcher::evm::Config< + TH: alloy_transport::Transport + Clone + Send + Sync + 'static, + PH: alloy_provider::Provider<::TH> + + Clone + + Send + + Sync + + 'static, + > + 'static +{ +} + +pub struct EthereumHandlerWrapper, T: EthereumContractBound, C: ConfigT> { + instance: W, chain_id: u64, local_db: LocalDatabase, - _phantom: std::marker::PhantomData, + _phantom: std::marker::PhantomData<(C, T)>, +} + +pub trait EvmContractInstance { + fn get_instance(&self) -> &ContractInstance; } -pub type EvmWatcherWrapperContext = (>::Contract, Arc); +pub type EvmWatcherWrapperContext = W; #[async_trait::async_trait] -impl> - EventListener< - Vec<(Watcher::Event, alloy_rpc_types::Log)>, - EvmWatcherWrapperContext, - > for EthereumHandlerWrapper +impl< + Config: ConfigT, + T: EthereumContractBound, + Watcher: Clone + EvmEventHandler + EvmContractInstance, + > EventListener, EvmWatcherWrapperContext> + for EthereumHandlerWrapper { - async fn new(context: &EvmWatcherWrapperContext) -> Result + async fn new(context: &EvmWatcherWrapperContext) -> Result where Self: Sized, { - let provider = context.0.provider().root(); + let provider = context.get_instance().provider().root(); // Add more detailed error handling and logging let chain_id = provider .get_chain_id() @@ -44,16 +79,16 @@ impl> Ok(Self { chain_id, local_db, - handler: context.1.clone(), - contract: context.0.clone(), + instance: context.clone(), _phantom: std::marker::PhantomData, }) } async fn next_event(&mut self) -> Option> { - let contract = &self.contract; + let contract = &self.instance; let step = 100; let target_block_number: u64 = contract + .get_instance() .provider() .get_block_number() .await @@ -62,7 +97,10 @@ impl> // Get the latest block number let block = self .local_db - .get(&format!("LAST_BLOCK_NUMBER_{}", contract.address())) + .get(&format!( + "LAST_BLOCK_NUMBER_{}", + contract.get_instance().address() + )) .unwrap_or(0); let should_cooldown = block >= target_block_number; @@ -73,22 +111,22 @@ impl> let dest_block = core::cmp::min(block + step, target_block_number); // Query events - let events_filter = Event::new(contract.provider(), 
Filter::new()) - .address(*contract.address()) + let events_filter = Event::new(contract.get_instance().provider(), Filter::new()) + .address(*contract.get_instance().address()) .from_block(BlockNumberOrTag::Number(block + 1)) .to_block(BlockNumberOrTag::Number(dest_block)) .event_signature(Watcher::Event::SIGNATURE_HASH); - info!("Querying events for filter, address: {}, from_block: {}, to_block: {}, event_signature: {}", contract.address(), block + 1, dest_block, Watcher::Event::SIGNATURE_HASH); + info!("Querying events for filter, address: {}, from_block: {}, to_block: {}, event_signature: {}", contract.get_instance().address(), block + 1, dest_block, Watcher::Event::SIGNATURE_HASH); match events_filter.query().await { Ok(events) => { self.local_db.set( - &format!("LAST_BLOCK_NUMBER_{}", contract.address()), + &format!("LAST_BLOCK_NUMBER_{}", contract.get_instance().address()), dest_block, ); self.local_db.set( - &format!("TARGET_BLOCK_{}", contract.address()), + &format!("TARGET_BLOCK_{}", contract.get_instance().address()), target_block_number, ); @@ -110,7 +148,7 @@ impl> let mut tasks = vec![]; for (event, log) in &events { let backoff = get_exponential_backoff::(); - let handler = self.handler.clone(); + let handler = self.instance.clone(); let task = async move { Retry::spawn(backoff, || async { handler.handle(log, event).await }).await }; @@ -149,7 +187,12 @@ impl> } } -impl> EthereumHandlerWrapper { +impl< + Config: ConfigT, + T: EthereumContractBound, + Watcher: Clone + EvmEventHandler + EvmContractInstance, + > EthereumHandlerWrapper +{ async fn run_event_loop(&mut self) -> Result<(), Error> { while let Some(events) = self.next_event().await { if events.is_empty() { diff --git a/sdk/src/event_listener/tangle/jobs.rs b/sdk/src/event_listener/tangle/jobs.rs index 148681335..490b6f31c 100644 --- a/sdk/src/event_listener/tangle/jobs.rs +++ b/sdk/src/event_listener/tangle/jobs.rs @@ -1,11 +1,7 @@ -use crate::clients::tangle::runtime::TangleClient; -use crate::event_listener::tangle::{BlockNumber, TangleEvent}; +use crate::event_listener::tangle::{TangleEvent, TangleResult, ValueIntoFieldType}; use crate::Error; use subxt_core::events::StaticEvent; -use subxt_core::utils::AccountId32; use tangle_subxt::tangle_testnet_runtime::api; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field; -use tangle_subxt::tangle_testnet_runtime::api::services::calls::types; use tangle_subxt::tangle_testnet_runtime::api::services::calls::types::call::{Job, ServiceId}; use tangle_subxt::tangle_testnet_runtime::api::services::events::{ job_called, JobCalled, JobResultSubmitted, @@ -15,7 +11,7 @@ pub trait ServicesJobPalletItem: StaticEvent { fn call_id(&self) -> job_called::CallId; fn job_id(&self) -> Job; fn service_id(&self) -> ServiceId; - fn args(self) -> Option>> { + fn args(self) -> Option { None } } @@ -33,8 +29,8 @@ impl ServicesJobPalletItem for JobCalled { self.service_id } - fn args(self) -> Option>> { - Some(self.args) + fn args(self) -> Option { + Some(self.args.clone()) } } @@ -52,9 +48,9 @@ impl ServicesJobPalletItem for JobResultSubmitted { } } -pub async fn services_pre_processor( - event: TangleEvent, -) -> Result, Error> { +pub async fn services_pre_processor( + mut event: TangleEvent, +) -> Result, Error> { let this_service_id = event.service_id; let this_job_id = event.job_id; crate::info!("Pre-processing event for sid/bid = {this_service_id}/{this_job_id} ..."); @@ -66,16 +62,9 @@ pub async fn services_pre_processor( if job == 
this_job_id && service_id == this_service_id { crate::info!("Found actionable event for sid/bid = {service_id}/{job} ..."); - return Ok(TangleJobEvent { - args, - context: event.context, - block_number: event.block_number, - signer: event.signer, - client: event.client, - call_id: Some(call_id), - job_id: this_job_id, - service_id: this_service_id, - }); + event.call_id = Some(call_id); + event.args = args; + return Ok(event); } } @@ -83,42 +72,23 @@ pub async fn services_pre_processor( } /// By default, the tangle post-processor takes in a job result and submits the result on-chain -pub async fn services_post_processor( - TangleJobResult { +pub async fn services_post_processor( + TangleResult { results, service_id, call_id, client, signer, - }: TangleJobResult, + }: TangleResult, ) -> Result<(), Error> { crate::info!("Submitting result on-chain for service {service_id} call_id {call_id} ..."); - let response = api::tx() - .services() - .submit_result(service_id, call_id, results); + let response = + api::tx() + .services() + .submit_result(service_id, call_id, vec![results.into_field_type()]); let _ = crate::tx::tangle::send(&client, &signer, &response) .await .map_err(|err| Error::Client(err.to_string()))?; crate::info!("Submitted result on-chain"); Ok(()) } - -pub struct TangleJobResult { - pub results: types::submit_result::Result, - pub service_id: ServiceId, - pub call_id: job_called::CallId, - pub client: TangleClient, - pub signer: crate::keystore::TanglePairSigner, -} - -// TODO: Move up this a module to make call_id generic -pub struct TangleJobEvent { - pub args: Vec>, - pub context: Ctx, - pub client: TangleClient, - pub signer: crate::keystore::TanglePairSigner, - pub block_number: BlockNumber, - pub call_id: Option, - pub job_id: Job, - pub service_id: ServiceId, -} diff --git a/sdk/src/event_listener/tangle/mod.rs b/sdk/src/event_listener/tangle/mod.rs index 4995ab285..02d22aecd 100644 --- a/sdk/src/event_listener/tangle/mod.rs +++ b/sdk/src/event_listener/tangle/mod.rs @@ -4,17 +4,24 @@ use crate::event_listener::EventListener; use crate::Error; use async_trait::async_trait; use gadget_blueprint_proc_macro_core::FieldType; -use sp_core::crypto::AccountId32; +pub use subxt_core::utils::AccountId32; use std::collections::VecDeque; use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use subxt::backend::StreamOfResults; use subxt_core::events::EventDetails; +use tangle_subxt::tangle_testnet_runtime::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; +use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::BoundedString; +pub use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::field::Field; use tangle_subxt::tangle_testnet_runtime::api::services::calls::types::call::{Job, ServiceId}; +use tangle_subxt::tangle_testnet_runtime::api::services::events::job_called; +use tangle_subxt::tangle_testnet_runtime::api::services::events::job_called::CallId; use tokio::sync::Mutex; pub mod jobs; -pub struct TangleEventListener { +pub struct TangleEventListener { current_block: Option, job_id: Job, service_id: ServiceId, @@ -22,8 +29,10 @@ pub struct TangleEventListener { context: Ctx, signer: crate::keystore::TanglePairSigner, client: TangleClient, + has_stopped: Arc, + stopper_tx: Arc>>>, enqueued_events: VecDeque>, - _pd: PhantomData, + _phantom: PhantomData, } pub type BlockNumber = u32; @@ -42,23 +51,39 @@ pub struct TangleListenerInput { /// Root 
events are preferred to be used as the Evt, as then the application can /// sort through a series of events to find the ones it is interested in for /// pre-processing. -pub struct TangleEvent { +#[derive(Clone)] +pub struct TangleEvent { pub evt: EventDetails, pub context: Ctx, + pub call_id: Option, + pub args: job_called::Args, pub block_number: BlockNumber, pub signer: crate::keystore::TanglePairSigner, pub client: TangleClient, pub job_id: Job, pub service_id: ServiceId, - _pd: PhantomData, + pub stopper: Arc>>>, + pub _phantom: PhantomData, +} + +impl TangleEvent { + /// Stops the event listener + pub fn stop(&self) -> bool { + let mut lock = self.stopper.lock(); + if let Some(tx) = lock.take() { + tx.send(()).is_ok() + } else { + false + } + } } -impl IsTangle for TangleEventListener {} +impl IsTangle for TangleEventListener {} #[async_trait] -impl - EventListener, TangleListenerInput> - for TangleEventListener +impl + EventListener, TangleListenerInput> + for TangleEventListener { async fn new(context: &TangleListenerInput) -> Result where @@ -73,6 +98,19 @@ impl } = context; let listener = Mutex::new(client.blocks().subscribe_finalized().await?); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let has_stopped = Arc::new(AtomicBool::new(false)); + + let has_stopped_clone = has_stopped.clone(); + + let background_task = async move { + let _ = rx.await; + has_stopped_clone.store(false, Ordering::SeqCst); + }; + + drop(tokio::task::spawn(background_task)); + Ok(Self { listener, current_block: None, @@ -81,23 +119,32 @@ impl context: context.clone(), client: client.clone(), signer: signer.clone(), + stopper_tx: Arc::new(parking_lot::Mutex::new(Some(tx))), + has_stopped, enqueued_events: VecDeque::new(), - _pd: PhantomData, + _phantom: PhantomData, }) } - async fn next_event(&mut self) -> Option> { + async fn next_event(&mut self) -> Option> { loop { + if self.has_stopped.load(Ordering::SeqCst) { + return None; + } + if let Some(evt) = self.enqueued_events.pop_front() { return Some(TangleEvent { evt, context: self.context.clone(), signer: self.signer.clone(), + call_id: None, + stopper: self.stopper_tx.clone(), + args: vec![], block_number: self.current_block?, client: self.client.clone(), job_id: self.job_id, service_id: self.service_id, - _pd: PhantomData, + _phantom: PhantomData, }); } @@ -122,32 +169,105 @@ impl evt, context: self.context.clone(), signer: self.signer.clone(), + call_id: None, + stopper: self.stopper_tx.clone(), + args: vec![], block_number, client: self.client.clone(), job_id: self.job_id, service_id: self.service_id, - _pd: PhantomData, + _phantom: PhantomData, }); } } } - async fn handle_event(&mut self, _event: TangleEvent) -> Result<(), Error> { + async fn handle_event(&mut self, _event: TangleEvent) -> Result<(), Error> { unimplemented!("placeholder; will be removed") } } -pub trait FieldTypeToValue: Sized { - fn to_value(&self, field_type: FieldType) -> Self; +pub trait FieldTypeIntoValue: Sized { + fn convert(field: Field, field_type: FieldType) -> Self; +} + +pub trait ValueIntoFieldType { + fn into_field_type(self) -> Field; +} + +macro_rules! 
impl_value_to_field_type { + ($($t:ty => $j:path),*) => { + $( + impl ValueIntoFieldType for $t { + fn into_field_type(self) -> Field { + $j(self) + } + } + )* + }; +} + +impl_value_to_field_type!( + u8 => Field::Uint8, + u16 => Field::Uint16, + u32 => Field::Uint32, + u64 => Field::Uint64, + i8 => Field::Int8, + i16 => Field::Int16, + i32 => Field::Int32, + i64 => Field::Int64, + bool => Field::Bool, + AccountId32 => Field::AccountId +); + +impl ValueIntoFieldType for Vec { + fn into_field_type(self) -> Field { + Field::Array(BoundedVec( + self.into_iter() + .map(ValueIntoFieldType::into_field_type) + .collect(), + )) + } +} + +impl ValueIntoFieldType for [T; N] { + fn into_field_type(self) -> Field { + Field::Array(BoundedVec( + self.into_iter() + .map(ValueIntoFieldType::into_field_type) + .collect(), + )) + } +} + +impl ValueIntoFieldType for Option { + fn into_field_type(self) -> Field { + match self { + Some(val) => val.into_field_type(), + None => Field::None, + } + } +} + +impl ValueIntoFieldType for String { + fn into_field_type(self) -> Field { + Field::String(BoundedString(BoundedVec(self.into_bytes()))) + } } macro_rules! impl_field_type_to_value { - ($($t:ty => $f:pat),*) => { + ($($t:ty => $f:pat => $j:path),*) => { $( - impl FieldTypeToValue for $t { - fn to_value(&self, field_type: FieldType) -> Self { + impl FieldTypeIntoValue for $t { + fn convert(field: Field, field_type: FieldType) -> Self { match field_type { - $f => self.clone(), + $f => { + let $j (val) = field else { + panic!("Invalid field type!"); + }; + + val + }, _ => panic!("Invalid field type!"), } } @@ -157,26 +277,36 @@ macro_rules! impl_field_type_to_value { } impl_field_type_to_value!( - u8 => FieldType::Uint8, - u16 => FieldType::Uint16, - u32 => FieldType::Uint32, - u64 => FieldType::Uint64, - i8 => FieldType::Int8, - i16 => FieldType::Int16, - i32 => FieldType::Int32, - i64 => FieldType::Int64, - u128 => FieldType::Uint128, - i128 => FieldType::Int128, - f64 => FieldType::Float64, - bool => FieldType::Bool, - AccountId32 => FieldType::AccountId + u16 => FieldType::Uint16 => Field::Uint16, + u32 => FieldType::Uint32 => Field::Uint32, + u64 => FieldType::Uint64 => Field::Uint64, + i8 => FieldType::Int8 => Field::Int8, + i16 => FieldType::Int16 => Field::Int16, + i32 => FieldType::Int32 => Field::Int32, + i64 => FieldType::Int64 => Field::Int64, + bool => FieldType::Bool => Field::Bool, + AccountId32 => FieldType::AccountId => Field::AccountId ); -impl FieldTypeToValue for String { - fn to_value(&self, field_type: FieldType) -> Self { +impl FieldTypeIntoValue for String { + fn convert(field: Field, field_type: FieldType) -> Self { match field_type { - FieldType::String => self.clone(), + FieldType::String => { + let Field::String(val) = field else { + panic!("Invalid field type!"); + }; + + String::from_utf8(val.0 .0).expect("Bad String from pallet Field") + } _ => panic!("Invalid field type!"), } } } + +pub struct TangleResult { + pub results: Res, + pub service_id: ServiceId, + pub call_id: job_called::CallId, + pub client: TangleClient, + pub signer: crate::keystore::TanglePairSigner, +} diff --git a/sdk/src/events_watcher/evm.rs b/sdk/src/events_watcher/evm.rs index 1e766facf..0488e47e5 100644 --- a/sdk/src/events_watcher/evm.rs +++ b/sdk/src/events_watcher/evm.rs @@ -59,8 +59,6 @@ impl EvmEvent for X {} /// EventWatcher trait exists for deployments that are smart-contract / EVM based #[async_trait::async_trait] pub trait EvmEventHandler: Send + Sync + 'static { - /// The contract that this event 
watcher is watching. - type Contract: EvmContract; /// The type of event this handler is for. type Event: EvmEvent; /// The genesis transaction hash for the contract. diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index dea03dbbd..09fb304e6 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -70,6 +70,7 @@ pub mod utils; // Re-exports pub use alloy_rpc_types; +pub use async_trait; pub use clap; pub use error::Error; pub use futures; diff --git a/sdk/src/network/messaging.rs b/sdk/src/network/messaging.rs new file mode 100644 index 000000000..15ebb480f --- /dev/null +++ b/sdk/src/network/messaging.rs @@ -0,0 +1,458 @@ +use async_trait::async_trait; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt::Display; +use std::hash::Hash; +use std::ops::Add; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::time::{sleep, Duration}; + +const OUTBOUND_POLL: Duration = Duration::from_millis(100); +const INBOUND_POLL: Duration = Duration::from_millis(100); + +#[async_trait] +pub trait MessageMetadata { + type JobId: Display + Hash + Eq + Copy + Send + Sync + 'static; + type PeerId: Display + Hash + Eq + Copy + Send + Sync + 'static; + type MessageId: Add + + Eq + + PartialEq + + Display + + Hash + + Ord + + PartialOrd + + Copy + + Send + + Sync + + 'static; + + fn job_id(&self) -> Self::JobId; + fn source_id(&self) -> Self::PeerId; + fn destination_id(&self) -> Self::PeerId; + fn message_id(&self) -> Self::MessageId; + fn contents(&self) -> &[u8]; +} + +#[async_trait] +pub trait Network { + type Message: MessageMetadata + Send + Sync + 'static; + + async fn next_message(&self) -> Option>; + async fn send_message(&self, message: &Payload) -> Result<(), NetworkError>; +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum Payload { + Ack { + job_id: M::JobId, + from_id: M::PeerId, + message_id: M::MessageId, + }, + Message(M), +} + +#[derive(Debug)] +pub enum NetworkError { + SendFailed(String), + ConnectionError(String), +} + +#[derive(Debug)] +pub enum BackendError { + StorageError(String), + NotFound, + Stopped, +} + +#[derive(Debug, Copy, Clone)] +pub enum DeliveryError { + NoReceiver, + ChannelClosed, +} + +// Modified Backend trait to handle both outbound and inbound messages +#[async_trait] +pub trait Backend { + async fn store_outbound(&self, message: M) -> Result<(), BackendError>; + async fn store_inbound(&self, message: M) -> Result<(), BackendError>; + async fn clear_message( + &self, + peer_id: M::PeerId, + job_id: M::JobId, + message_id: M::MessageId, + ) -> Result<(), BackendError>; + async fn get_pending_outbound(&self) -> Result, BackendError>; + async fn get_pending_inbound(&self) -> Result, BackendError>; +} + +#[async_trait] +pub trait LocalDelivery { + async fn deliver(&self, message: M) -> Result<(), DeliveryError>; +} + +// Add this new struct to track last ACKed message IDs +pub struct MessageTracker { + last_acked: HashMap<(M::JobId, M::PeerId), M::MessageId>, +} + +impl MessageTracker { + fn new() -> Self { + Self { + last_acked: HashMap::new(), + } + } + + fn update_ack(&mut self, job_id: M::JobId, peer_id: M::PeerId, msg_id: M::MessageId) { + let key = (job_id, peer_id); + match self.last_acked.get(&key) { + Some(last_id) => { + if msg_id == *last_id + 1usize { + let _ = self.last_acked.insert(key, msg_id); + } + } + None => { + // For the first message in a sequence, only accept if it's the initial message + let _ = self.last_acked.insert(key, msg_id); + } + } + } + + fn 
can_send(&self, job_id: &M::JobId, peer_id: &M::PeerId, msg_id: &M::MessageId) -> bool { + match self.last_acked.get(&(*job_id, *peer_id)) { + Some(last_id) => *msg_id == *last_id + 1usize, + None => true, // If there is no message for this job/peer_id combo, send it + } + } +} + +pub struct MessageSystem +where + M: MessageMetadata + Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, + B: Backend + Send + Sync + 'static, + L: LocalDelivery + Send + Sync + 'static, + N: Network + Send + Sync + 'static, +{ + backend: Arc, + local_delivery: Arc, + network: Arc, + is_running: Arc, + tracker: Arc>>, +} + +impl Clone for MessageSystem +where + M: MessageMetadata + Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, + B: Backend + Send + Sync + 'static, + L: LocalDelivery + Send + Sync + 'static, + N: Network + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + backend: self.backend.clone(), + local_delivery: self.local_delivery.clone(), + network: self.network.clone(), + is_running: self.is_running.clone(), + tracker: self.tracker.clone(), + } + } +} + +impl MessageSystem +where + M: MessageMetadata + Clone + Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, + B: Backend + Send + Sync + 'static, + L: LocalDelivery + Send + Sync + 'static, + N: Network + Send + Sync + 'static, +{ + pub fn new(backend: B, local_delivery: L, network: N) -> Self { + let this = Self { + backend: Arc::new(backend), + local_delivery: Arc::new(local_delivery), + network: Arc::new(network), + is_running: Arc::new(AtomicBool::new(true)), + tracker: Arc::new(RwLock::new(MessageTracker::new())), + }; + + this.spawn_background_tasks(); + + this + } + + fn spawn_background_tasks(&self) { + // Spawn outbound processing task + let self_clone = self.clone(); + let is_alive = self.is_running.clone(); + + let outbound_handle = tokio::spawn(async move { + loop { + self_clone.process_outbound().await; + sleep(OUTBOUND_POLL).await; + } + }); + + // Spawn inbound processing task + let self_clone = self.clone(); + let inbound_handle = tokio::spawn(async move { + loop { + self_clone.process_inbound().await; + sleep(INBOUND_POLL).await; + } + }); + + // Spawn network listener task + let self_clone = self.clone(); + let network_io_handle = tokio::spawn(async move { + self_clone.process_network_messages().await; + }); + + // Spawn a task that selects all three handles, and on any of them finishing, it will + // set the atomic bool to false + drop(tokio::spawn(async move { + tokio::select! 
{ + _ = outbound_handle => { + crate::error!("Outbound processing task prematurely ended"); + }, + _ = inbound_handle => { + crate::error!("Inbound processing task prematurely ended"); + }, + _ = network_io_handle => { + crate::error!("Network IO task prematurely ended"); + }, + } + + is_alive.store(false, std::sync::atomic::Ordering::Relaxed); + })); + } + + async fn process_outbound(&self) { + let pending_messages = match self.backend.get_pending_outbound().await { + Ok(messages) => messages, + Err(e) => { + eprintln!("Failed to get pending outbound messages: {:?}", e); + return; + } + }; + + // Group messages by (JobId, PeerId) pair + let mut grouped_messages: HashMap<(M::JobId, M::PeerId), Vec> = HashMap::new(); + for msg in pending_messages { + grouped_messages + .entry((msg.job_id(), msg.destination_id())) + .or_default() + .push(msg); + } + + // Process each group independently + let tracker = self.tracker.read().await; + for ((job_id, peer_id), mut messages) in grouped_messages { + // Sort messages by MessageId + messages.sort_by_key(|m| m.message_id()); + + // Find the first message we can send based on ACKs + if let Some(msg) = messages + .into_iter() + .find(|m| tracker.can_send(&job_id, &peer_id, &m.message_id())) + { + if let Err(e) = self.network.send_message(&Payload::Message(msg)).await { + eprintln!("Failed to send message: {:?}", e); + } + } + } + } + + async fn process_inbound(&self) { + let pending_messages = match self.backend.get_pending_inbound().await { + Ok(messages) => messages, + Err(e) => { + eprintln!("Failed to get pending inbound messages: {:?}", e); + return; + } + }; + + // Sort the pending messages in order by MessageID + let pending_messages: Vec = pending_messages + .into_iter() + .sorted_by_key(|r| r.message_id()) + .collect(); + + for message in pending_messages { + match self.local_delivery.deliver(message.clone()).await { + Ok(()) => { + // Create and send ACK + if let Err(e) = self + .network + .send_message(&self.create_ack_message(&message)) + .await + { + crate::error!("Failed to send ACK: {e:?}"); + continue; + } + + // Clear delivered message from backend + if let Err(e) = self + .backend + .clear_message(message.source_id(), message.job_id(), message.message_id()) + .await + { + crate::error!("Failed to clear delivered message: {e:?}"); + } + } + Err(e) => { + crate::error!("Failed to deliver message: {e:?}"); + } + } + } + } + + // Modify process_network_messages to update the tracker + async fn process_network_messages(&self) { + loop { + if let Some(message) = self.network.next_message().await { + match message { + Payload::Ack { + job_id, + from_id, + message_id, + } => { + // Update the tracker with the new ACK + let mut tracker = self.tracker.write().await; + tracker.update_ack(job_id, from_id, message_id); + + if let Err(e) = self + .backend + .clear_message(from_id, job_id, message_id) + .await + { + crate::error!("Failed to clear ACKed message: {e:?}"); + } + } + Payload::Message(msg) => { + if let Err(e) = self.backend.store_inbound(msg).await { + crate::error!("Failed to store inbound message: {e:?}"); + } + } + } + } + } + } + + pub async fn send_message(&self, message: M) -> Result<(), BackendError> { + if self.is_running.load(std::sync::atomic::Ordering::Relaxed) { + self.backend.store_outbound(message).await + } else { + Err(BackendError::Stopped) + } + } + + fn create_ack_message(&self, original_message: &M) -> Payload { + Payload::Ack { + job_id: original_message.job_id(), + from_id: original_message.source_id(), + message_id: 
original_message.message_id(), + } + } +} + +// Example InMemoryBackend implementation +pub struct InMemoryBackend { + outbound: Mailbox, + inbound: Mailbox, +} + +type Mailbox = + Arc>>; + +impl InMemoryBackend { + pub fn new() -> Self { + Self { + outbound: Arc::new(RwLock::new(HashMap::new())), + inbound: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl Default for InMemoryBackend { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Backend for InMemoryBackend { + async fn store_outbound(&self, message: M) -> Result<(), BackendError> { + let mut outbound = self.outbound.write().await; + let (job_id, source_id, message_id) = + (message.job_id(), message.source_id(), message.message_id()); + + if outbound + .insert( + ( + message.job_id(), + message.destination_id(), + message.message_id(), + ), + message, + ) + .is_some() + { + crate::warn!( + "Overwriting existing message in outbound storage jid={}/dest={}/id={}", + job_id, + source_id, + message_id + ); + } + Ok(()) + } + + async fn store_inbound(&self, message: M) -> Result<(), BackendError> { + let mut inbound = self.inbound.write().await; + let (job_id, source_id, message_id) = + (message.job_id(), message.source_id(), message.message_id()); + + if inbound + .insert( + (message.job_id(), message.source_id(), message.message_id()), + message, + ) + .is_some() + { + crate::warn!( + "Overwriting existing message in inbound storage jid={}/src={}/id={}", + job_id, + source_id, + message_id + ); + } + Ok(()) + } + + async fn clear_message( + &self, + peer_id: M::PeerId, + job_id: M::JobId, + message_id: M::MessageId, + ) -> Result<(), BackendError> { + // Try to remove from both outbound and inbound + let mut outbound = self.outbound.write().await; + let mut inbound = self.inbound.write().await; + + let _ = outbound.remove(&(job_id, peer_id, message_id)); + let _ = inbound.remove(&(job_id, peer_id, message_id)); + + Ok(()) + } + + async fn get_pending_outbound(&self) -> Result, BackendError> { + let outbound = self.outbound.read().await; + Ok(outbound.values().cloned().collect()) + } + + async fn get_pending_inbound(&self) -> Result, BackendError> { + let inbound = self.inbound.read().await; + Ok(inbound.values().cloned().collect()) + } +} diff --git a/sdk/src/network/mod.rs b/sdk/src/network/mod.rs index 593cacd6d..157e4dfa1 100644 --- a/sdk/src/network/mod.rs +++ b/sdk/src/network/mod.rs @@ -13,6 +13,7 @@ pub mod gossip; pub mod handlers; #[cfg(target_family = "wasm")] pub mod matchbox; +pub mod messaging; pub mod setup; #[derive(Debug, Serialize, Deserialize, Clone, Copy)] diff --git a/sdk/src/utils.rs b/sdk/src/utils.rs index 481048ae5..22f5e1002 100644 --- a/sdk/src/utils.rs +++ b/sdk/src/utils.rs @@ -1,6 +1,9 @@ +use crate::clients::tangle::runtime::TangleClient; use crate::config::{ContextConfig, GadgetCLICoreSettings}; use crate::info; use crate::keystore::KeystoreUriSanitizer; +use futures::future::select_ok; +use std::error::Error; use std::io::Write; #[allow(irrefutable_let_patterns)] @@ -24,3 +27,11 @@ pub fn check_for_test(config: &ContextConfig) -> Result<(), crate::Error> { Ok(()) } + +/// Returns either client. Allows flexibility in choosing between HTTP and WS clients +/// depending on the local setup +pub async fn get_client(ws_url: &str, http_url: &str) -> Result> { + let task0 = TangleClient::from_url(ws_url); + let task1 = TangleClient::from_url(http_url); + Ok(select_ok([Box::pin(task0), Box::pin(task1)]).await?.0) +}
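Illustrative usage sketches for the new SDK surface in this patch follow; none of them are part of the diff itself. First, the GadgetConfiguration::service_id accessor added in sdk/src/config/gadget_config.rs only yields a value when the protocol-specific settings are Tangle; the variable names and log lines below are placeholders.

// Hypothetical caller; `config` is a loaded GadgetConfiguration.
match config.service_id() {
    Some(service_id) => gadget_sdk::info!("Operating as Tangle service instance {service_id}"),
    None => gadget_sdk::info!("Protocol is not Tangle; no service id available"),
}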
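The get_client helper added in sdk/src/utils.rs races a WS and an HTTP connection with select_ok and returns whichever TangleClient connects first; GadgetConfiguration::client() now delegates to it. A minimal sketch with placeholder local-node endpoints:

// Both endpoints are attempted concurrently; the first successful connection is returned.
let client = gadget_sdk::utils::get_client("ws://127.0.0.1:9944", "http://127.0.0.1:9944").await?;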
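The ValueIntoFieldType trait added in sdk/src/event_listener/tangle/mod.rs maps Rust values onto pallet Field variants for result submission. A rough sketch of the conversions it provides; the bindings are purely illustrative:

use gadget_sdk::event_listener::tangle::ValueIntoFieldType;

let n = 42u64.into_field_type();                 // Field::Uint64(42)
let opt = Some(7u32).into_field_type();          // Field::Uint32(7); None would become Field::None
let s = String::from("alice").into_field_type(); // Field::String(BoundedString(BoundedVec(bytes)))
let xs = vec![1u8, 2, 3].into_field_type();      // Field::Array of Field::Uint8 values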
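With the reworked processors in sdk/src/event_listener/tangle/jobs.rs, services_pre_processor now returns the TangleEvent itself with call_id and args populated, and services_post_processor accepts a typed TangleResult, converting it with into_field_type() before submitting on-chain. The handler body below is a hypothetical sketch under that assumption, not code from the patch:

use gadget_sdk::event_listener::tangle::{jobs::services_post_processor, TangleResult};

// `event` is the TangleEvent handed back by services_pre_processor for a matching JobCalled.
let output: u64 = 25; // stand-in for real job logic
let result = TangleResult {
    results: output, // anything implementing ValueIntoFieldType
    service_id: event.service_id,
    call_id: event.call_id.expect("populated by services_pre_processor"),
    client: event.client.clone(),
    signer: event.signer.clone(),
};
services_post_processor(result).await?;

// A handler can also shut the listener down: event.stop() fires the oneshot
// consumed by the listener's background task.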
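Finally, the new sdk/src/network/messaging.rs module: MessageSystem persists outbound messages through a Backend, sends them in MessageId order per (JobId, PeerId) pair gated on ACKs, and hands inbound messages to a LocalDelivery. A minimal wiring sketch; MyMessage, ChannelDelivery and P2pNetwork are assumed implementations of MessageMetadata, LocalDelivery and Network, not types from this patch:

use gadget_sdk::network::messaging::{InMemoryBackend, MessageSystem};

// InMemoryBackend comes from the patch; the other three types are hypothetical impls.
let backend = InMemoryBackend::<MyMessage>::new();
let system = MessageSystem::new(backend, ChannelDelivery::new(), P2pNetwork::connect().await?);

// send_message only stores the message; the background tasks push it out once the
// previous MessageId for the same (job, destination) pair has been acknowledged.
let msg = MyMessage { job_id, source, destination, message_id, contents };
system.send_message(msg).await?;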