From 621062d6c081e209f07302ae14548e2125b49ac2 Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Wed, 29 Jan 2025 22:33:37 -0500 Subject: [PATCH 1/3] feat: Add multinode test executor --- blueprints/incredible-squaring/src/tests.rs | 6 +- crates/testing-utils/core/src/harness.rs | 3 - crates/testing-utils/tangle/src/harness.rs | 376 ++++++++++++++++++-- 3 files changed, 341 insertions(+), 44 deletions(-) diff --git a/blueprints/incredible-squaring/src/tests.rs b/blueprints/incredible-squaring/src/tests.rs index e27faaa12..90d60dca1 100644 --- a/blueprints/incredible-squaring/src/tests.rs +++ b/blueprints/incredible-squaring/src/tests.rs @@ -1,4 +1,6 @@ use crate::{MyContext, XsquareEventHandler}; +use blueprint_sdk::config::GadgetConfiguration; +use blueprint_sdk::event_listeners::core::InitializableEventHandler; use blueprint_sdk::logging::setup_log; use blueprint_sdk::testing::tempfile; use blueprint_sdk::testing::utils::harness::TestHarness; @@ -22,9 +24,7 @@ async fn test_incredible_squaring() -> Result<()> { }; // Initialize event handler - let handler = XsquareEventHandler::new(&env.clone(), blueprint_ctx) - .await - .unwrap(); + let handler = XsquareEventHandler::new(&env.clone(), blueprint_ctx).await?; // Setup service let (mut test_env, service_id) = harness.setup_services().await?; diff --git a/crates/testing-utils/core/src/harness.rs b/crates/testing-utils/core/src/harness.rs index 5981fa80f..0cb037676 100644 --- a/crates/testing-utils/core/src/harness.rs +++ b/crates/testing-utils/core/src/harness.rs @@ -16,9 +16,6 @@ pub trait TestHarness { /// Gets the gadget configuration fn env(&self) -> &GadgetConfiguration; - - /// Gets the configuration - fn config(&self) -> &Self::Config; } /// Base implementation of a test harness that can be used by specific implementations diff --git a/crates/testing-utils/tangle/src/harness.rs b/crates/testing-utils/tangle/src/harness.rs index ad1a1310f..8398a2015 100644 --- a/crates/testing-utils/tangle/src/harness.rs +++ b/crates/testing-utils/tangle/src/harness.rs @@ -17,15 +17,24 @@ use gadget_core_testing_utils::{ runner::TestEnv, }; use gadget_crypto_tangle_pair_signer::TanglePairSigner; +use gadget_event_listeners::core::InitializableEventHandler; use gadget_keystore::backends::Backend; use gadget_keystore::crypto::sp_core::{SpEcdsa, SpSr25519}; +use gadget_macros::ext::futures::stream::FuturesUnordered; +use gadget_macros::ext::futures::TryStreamExt; +use gadget_runners::core::error::RunnerError; use gadget_runners::tangle::tangle::{PriceTargets, TangleConfig}; use sp_core::Pair; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; use tangle_subxt::tangle_testnet_runtime::api::services::{ calls::types::{call::Job, register::Preferences}, events::JobResultSubmitted, }; use tempfile::TempDir; +use tokio::sync::Mutex; +use tokio::task::JoinError; use url::Url; /// Configuration for the Tangle test harness @@ -36,24 +45,209 @@ pub struct TangleTestConfig { } /// Test harness for Tangle network tests -pub struct TangleTestHarness { - base: BaseTestHarness, +pub struct TangleTestHarness { pub http_endpoint: Url, pub ws_endpoint: Url, client: TangleClient, pub sr25519_signer: TanglePairSigner, pub ecdsa_signer: TanglePairSigner, pub alloy_key: alloy_signer_local::PrivateKeySigner, + pub client_envs: Vec, _temp_dir: tempfile::TempDir, _node: crate::node::testnet::SubstrateNode, } +/// Manages multiple Tangle test environments and executes jobs in parallel across different configurations. +/// +/// [`MultiNodeTangleTestEnv`] allows adding, starting, and stopping jobs on demand, facilitating concurrent testing +/// across multiple Tangle nodes. It handles the coordination of job execution and allows introspection on the state +/// of the system as a whole (either all jobs are running successfully, or a specific job has failed which it interpreted +/// as a global failure in the testing context. +pub struct MultiNodeTangleTestEnv { + envs: Vec, + start_tx: Option>, + command_tx: tokio::sync::mpsc::UnboundedSender, + dead_rx: Option>, +} + +enum NodeExecutorCommand { + StartJob(usize, Box), + Stop(usize), +} + +trait JobCreator: Fn(GadgetConfiguration) -> Box {} +impl Box> JobCreator for T {} + +impl MultiNodeTangleTestEnv { + /// Creates a new `MultiNodeTangleTestEnv` that can execute jobs in parallel across multiple chains. + /// + /// The `test_envs` parameter should be a `HashMap` where the key is the job ID and the value is the + /// `TangleTestEnv` to use for that job. The `envs` parameter should be a vector of + /// `GadgetConfiguration`s that will be used to create the event handlers for each job. + /// + /// The `MultiNodeTangleTestEnv` will not execute any jobs until the `execute` method is called. + /// When `execute` is called, the `MultiNodeTangleTestEnv` will start a background task that will + /// execute the jobs in parallel. The background task will not shut down until all jobs have + /// completed or the `MultiNodeTangleTestEnv` is dropped. + /// + /// The `MultiNodeTangleTestEnv` provides methods for starting and stopping jobs. The `start_job` + /// method takes a job ID and a closure that returns an `InitializableEventHandler`. The closure + /// will be called with the `GadgetConfiguration` for the specified job ID, and the returned event + /// handler will be used to execute the job. The `stop_job` method takes a job ID and will signal + /// the background task to stop the job with the specified ID. + /// + /// The `MultiNodeTangleTestEnv` also provides a method for getting the `GadgetConfiguration` for + /// a given job ID. This can be used to get the configuration for a job without starting the job. + pub async fn new( + mut test_envs: HashMap, + envs: Vec, + ) -> Self { + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel(); + let (dead_tx, dead_rx) = tokio::sync::oneshot::channel(); + let envs_clone = envs.clone(); + + // This task will not run until the user has triggered it to begin + let background_task = async move { + if start_rx.await.is_err() { + gadget_logging::warn!("MultiNodeTangleTestEnv was dropped without executing"); + } + + // Allows stopping a running job + let mut handles = HashMap::new(); + + while let Some(command) = command_rx.recv().await { + match command { + NodeExecutorCommand::StartJob(job_id, creator) => { + let job = creator(envs_clone[job_id].clone()); + if let Some(stopper_rx) = job.init_event_handler().await { + let (control_tx, mut control_rx) = + tokio::sync::oneshot::channel::<()>(); + let task = async move { + tokio::select! { + _ = control_rx => { + gadget_logging::info!("[SHUTDOWN] Job {job_id} manually shutdown"); + }, + res = stopper_rx.recv() => { + gadget_logging::info!("[SHUTDOWN] Job {job_id} completed with result {res}"); + } + }; + }; + + handles.insert(job_id, control_tx); + } else { + gadget_logging::warn!( + "Failed to initialize event handler for job {job_id}" + ); + } + } + + NodeExecutorCommand::Stop(job_id) => { + gadget_logging::info!("[SHUTDOWN] Job {job_id} shutdown signal received"); + if let Some(control_tx) = handles.remove(job_id) { + let _ = control_tx.send(()); + } else { + gadget_logging::warn!("Failed to stop job {job_id}"); + } + } + } + } + }; + + drop(tokio::spawn(background_task)); + + Self { + envs, + command_tx, + start_tx: Some(start_tx), + dead_rx: Some(dead_rx), + } + } + + /// Adds a job to the test harness to be executed when the test is run. + /// + /// The `job_creator` parameter is a function that takes a `GadgetConfiguration` as an argument + /// and returns a boxed `InitializableEventHandler`. The job creator is called with the + /// `GadgetConfiguration` corresponding to the environment the job is running in. + /// + /// The job is added to the end of the list of jobs and can be stopped using the `stop_job` + /// method. + /// + /// # Errors + /// + /// If the job cannot be added to the test harness, an error is returned. + pub fn add_job K, K: InitializableEventHandler>( + &mut self, + job_creator: T, + ) -> Result<&mut Self, RunnerError> { + self.command_tx + .send(NodeExecutorCommand::StartJob( + self.envs.len(), + Box::new(job_creator), + )) + .map_err(|err| RunnerError::Other(err.to_string()))?; + Ok(self) + } + + /// Stops a job with the given job ID. Returns an error if the job is not found or if + /// the job is not running. + pub fn stop_job(&mut self, job_id: usize) -> Result<&mut Self, RunnerError> { + self.command_tx + .send(NodeExecutorCommand::Stop(job_id)) + .map_err(|err| RunnerError::Other(err.to_string()))?; + Ok(self) + } + + /// Begins the execution of the jobs in parallel and in the background + /// + /// Any jobs preloaded via `add_job` will be executed after this function is called + /// Consequent jobs may still be added via `add_job` + pub fn start(&mut self) -> Result<(), RunnerError> { + self.start_tx + .take() + .ok_or_else(|| RunnerError::Other("Test harness already started".to_string()))?; + /* + Old logic for reference: + let mut handles = FuturesUnordered::new(); + for test_env in &mut self.test_envs { + handles.push(tokio::spawn(test_env.run_runner())); + } + + async move { + let mut results = handles.try_collect::>().await + .map_err(|err| RunnerError::Other(err.to_string()))?; + results.pop().expect("Should have at least one result") + }*/ + Ok(()) + } + + /// Returns `true` if the test harness has errored, `false` otherwise. + /// + /// If the test harness has errored, the `dead_rx` oneshot receiver is taken and + /// awaited. If the receiver is `None`, `true` is returned immediately. + /// + /// The return value of this function is `true` if the test harness has errored, + /// regardless of whether the `dead_rx` was taken and awaited or not. + pub async fn has_errored(&mut self) -> bool { + if let Some(mut rx) = self.dead_rx.take() { + let _ = rx.await; + } + + true + } +} + +const ENDOWED_TEST_NAMES: [&str; 5] = ["Alice", "Bob", "Charlie", "Dave", "Eve"]; + #[async_trait::async_trait] -impl TestHarness for TangleTestHarness { +impl TestHarness for TangleTestHarness { type Config = TangleTestConfig; type Error = Error; async fn setup(test_dir: TempDir) -> Result { + assert!(N <= 5, "Cannot setup more than 5 services"); + assert_ne!(N, 0, "Cannot setup 0 services"); + // Start Local Tangle Node let node = run(NodeConfig::new(false)) .await @@ -61,27 +255,40 @@ impl TestHarness for TangleTestHarness { let http_endpoint = Url::parse(&format!("http://127.0.0.1:{}", node.ws_port()))?; let ws_endpoint = Url::parse(&format!("ws://127.0.0.1:{}", node.ws_port()))?; - // Setup testing directory - let test_dir_path = test_dir.path().to_string_lossy().into_owned(); - inject_tangle_key(&test_dir_path, "//Alice")?; - - // Create context config - let context_config = ContextConfig::create_tangle_config( - http_endpoint.clone(), - ws_endpoint.clone(), - test_dir_path, - None, - SupportedChains::LocalTestnet, - 0, - Some(0), - ); - - // Load environment - let mut env = - gadget_config::load(context_config).map_err(|e| Error::Setup(e.to_string()))?; + let mut client_envs = vec![]; + + for idx in 0..N { + // Setup testing directory + let name = ENDOWED_TEST_NAMES[idx]; + let test_dir_path = test_dir + .path() + .join(format!("./{}", name.to_ascii_lowercase())) + .to_string_lossy() + .into_owned(); + tokio::fs::create_dir_all(&test_dir_path).await?; + inject_tangle_key(&test_dir_path, &format!("//{name}"))?; + + // Create context config + let context_config = ContextConfig::create_tangle_config( + http_endpoint.clone(), + ws_endpoint.clone(), + test_dir_path, + None, + SupportedChains::LocalTestnet, + 0, + Some(0), + ); + + // Load environment + let mut env = + gadget_config::load(context_config).map_err(|e| Error::Setup(e.to_string()))?; + + // Always set test mode, dont require callers to set env vars + env.test_mode = true; + client_envs.push(env); + } - // Always set test mode, dont require callers to set env vars - env.test_mode = true; + let alice_env = &client_envs[0]; // Create config let config = TangleTestConfig { @@ -89,10 +296,8 @@ impl TestHarness for TangleTestHarness { ws_endpoint: Some(ws_endpoint.clone()), }; - let base = BaseTestHarness::new(env.clone(), config); - // Setup signers - let keystore = env.keystore(); + let keystore = alice_env.keystore(); let sr25519_public = keystore.first_local::()?; let sr25519_pair = keystore.get_secret::(&sr25519_public)?; let sr25519_signer = TanglePairSigner::new(sr25519_pair.0); @@ -104,9 +309,9 @@ impl TestHarness for TangleTestHarness { .alloy_key() .map_err(|e| Error::Setup(e.to_string()))?; - let client = env.tangle_client().await?; + let client = alice_env.tangle_client().await?; let harness = Self { - base, + client_envs, http_endpoint, ws_endpoint, client, @@ -127,15 +332,11 @@ impl TestHarness for TangleTestHarness { } fn env(&self) -> &GadgetConfiguration { - &self.base.env - } - - fn config(&self) -> &Self::Config { - &self.base.config + &self.client_envs[0] } } -impl TangleTestHarness { +impl TangleTestHarness { /// Gets a reference to the Tangle client pub fn client(&self) -> &TangleClient { &self.client @@ -208,7 +409,7 @@ impl TangleTestHarness { } /// Sets up a complete service environment with initialized event handlers - pub async fn setup_services(&self) -> Result<(TangleTestEnv, u64), Error> { + pub async fn setup_services(&self) -> Result<(Vec, u64), Error> { // Deploy blueprint let blueprint_id = self.deploy_blueprint().await?; @@ -223,10 +424,14 @@ impl TangleTestHarness { .await .map_err(|e| Error::Setup(e.to_string()))?; - // Create and spawn test environment - let test_env = TangleTestEnv::new(TangleConfig::default(), self.env().clone())?; + let mut test_envs = vec![]; + for env in &self.client_envs { + // Create and spawn test environment + let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?; + test_envs.push(test_env); + } - Ok((test_env, service_id)) + Ok((test_envs, service_id)) } /// Executes a job and verifies its output matches the expected result @@ -260,3 +465,98 @@ impl TangleTestHarness { Ok(results) } } + +#[cfg(test)] +mod tests { + use super::*; + use gadget_core::test_utils::assert_ok; + use std::path::PathBuf; + + #[tokio::test] + async fn test_harness_setup() { + let test_dir = TempDir::new().unwrap(); + let harness = TangleTestHarness::<1>::setup(test_dir).await; + assert!(harness.is_ok(), "Harness setup should succeed"); + + let harness = harness.unwrap(); + assert!(harness.client().is_connected().await, "Client should be connected"); + assert_eq!(harness.client_envs.len(), 1, "Should have 1 client env"); + } + + #[tokio::test] + async fn test_harness_setup_with_multiple_services() { + let test_dir = TempDir::new().unwrap(); + let harness = TangleTestHarness::<3>::setup(test_dir).await; + assert!(harness.is_ok(), "Harness setup should succeed"); + + let harness = harness.unwrap(); + assert_eq!(harness.client_envs.len(), 3, "Should have 3 client envs"); + + // Verify each environment has unique keys + let keys: Vec<_> = harness + .client_envs + .iter() + .map(|env| env.keystore().first_local::().unwrap()) + .collect(); + assert_eq!(keys.len(), 3, "Should have 3 unique keys"); + assert!(keys[0] != keys[1] && keys[1] != keys[2], "Keys should be unique"); + } + + #[tokio::test] + async fn test_deploy_mbsm() { + let test_dir = TempDir::new().unwrap(); + let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); + + // MBSM should be deployed during setup + let latest_revision = transactions::get_latest_mbsm_revision(harness.client()) + .await + .unwrap(); + assert!(latest_revision.is_some(), "MBSM should be deployed"); + } + + #[tokio::test] + async fn test_execute_job() { + let test_dir = TempDir::new().unwrap(); + let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); + + // First set up a service + let (test_envs, service_id) = harness.setup_services().await.unwrap(); + assert!(!test_envs.is_empty(), "Should have test environments"); + assert!(service_id > 0, "Should have valid service ID"); + + // Execute a simple job + let inputs = vec![InputValue::U64(42)]; + let expected = vec![OutputValue::U64(42)]; + let result = harness.execute_job(service_id, 0, inputs, expected).await; + assert_ok!(result, "Job execution should succeed"); + } + + #[tokio::test] + async fn test_create_deploy_opts() { + let test_dir = TempDir::new().unwrap(); + let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); + + let manifest_path = PathBuf::from("Cargo.toml"); + let opts = harness.create_deploy_opts(manifest_path.clone()); + + assert_eq!(opts.manifest_path, manifest_path); + assert!(opts.signer.is_some(), "Should have SR25519 signer"); + assert!(opts.signer_evm.is_some(), "Should have EVM signer"); + assert!(opts.http_rpc_url.starts_with("http://"), "Should have HTTP URL"); + assert!(opts.ws_rpc_url.starts_with("ws://"), "Should have WebSocket URL"); + } + + #[tokio::test] + #[should_panic(expected = "Cannot setup more than 5 services")] + async fn test_harness_setup_exceeds_max_services() { + let test_dir = TempDir::new().unwrap(); + let _harness = TangleTestHarness::<6>::setup(test_dir).await.unwrap(); + } + + #[tokio::test] + #[should_panic(expected = "Cannot setup 0 services")] + async fn test_harness_setup_zero_services() { + let test_dir = TempDir::new().unwrap(); + let _harness = TangleTestHarness::<0>::setup(test_dir).await.unwrap(); + } +} From 93e68e980243c6023126f9727c480b8b5ccb138e Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Thu, 30 Jan 2025 10:01:17 -0500 Subject: [PATCH 2/3] feat: jobs per node static -> jobs per node dynamic --- blueprints/incredible-squaring/src/tests.rs | 26 ++-- crates/testing-utils/tangle/src/harness.rs | 134 +++++++++++--------- crates/testing-utils/tangle/src/tests.rs | 12 +- 3 files changed, 92 insertions(+), 80 deletions(-) diff --git a/blueprints/incredible-squaring/src/tests.rs b/blueprints/incredible-squaring/src/tests.rs index 90d60dca1..92622036f 100644 --- a/blueprints/incredible-squaring/src/tests.rs +++ b/blueprints/incredible-squaring/src/tests.rs @@ -14,25 +14,23 @@ async fn test_incredible_squaring() -> Result<()> { // Initialize test harness (node, keys, deployment) let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; - let env = harness.env().clone(); + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; - // Create blueprint-specific context - let blueprint_ctx = MyContext { - env: env.clone(), - call_id: None, - }; + let xsquare_creator = |env| async move { + // Create blueprint-specific context + let blueprint_ctx = MyContext { + env: env.clone(), + call_id: None, + }; - // Initialize event handler - let handler = XsquareEventHandler::new(&env.clone(), blueprint_ctx).await?; + // Initialize event handler + XsquareEventHandler::new(&env, blueprint_ctx).await? + }; // Setup service let (mut test_env, service_id) = harness.setup_services().await?; - test_env.add_job(handler); - - tokio::spawn(async move { - test_env.run_runner().await.unwrap(); - }); + test_env.add_job(xsquare_creator)?; + test_env.start()?; // Execute job and verify result let results = harness diff --git a/crates/testing-utils/tangle/src/harness.rs b/crates/testing-utils/tangle/src/harness.rs index 8398a2015..c99d05ba2 100644 --- a/crates/testing-utils/tangle/src/harness.rs +++ b/crates/testing-utils/tangle/src/harness.rs @@ -12,21 +12,15 @@ use crate::{ use gadget_client_tangle::client::TangleClient; use gadget_config::{supported_chains::SupportedChains, ContextConfig, GadgetConfiguration}; use gadget_contexts::{keystore::KeystoreContext, tangle::TangleClientContext}; -use gadget_core_testing_utils::{ - harness::{BaseTestHarness, TestHarness}, - runner::TestEnv, -}; +use gadget_core_testing_utils::{harness::TestHarness, runner::TestEnv}; use gadget_crypto_tangle_pair_signer::TanglePairSigner; use gadget_event_listeners::core::InitializableEventHandler; use gadget_keystore::backends::Backend; use gadget_keystore::crypto::sp_core::{SpEcdsa, SpSr25519}; -use gadget_macros::ext::futures::stream::FuturesUnordered; -use gadget_macros::ext::futures::TryStreamExt; use gadget_runners::core::error::RunnerError; use gadget_runners::tangle::tangle::{PriceTargets, TangleConfig}; use sp_core::Pair; use std::collections::HashMap; -use std::future::Future; use std::sync::Arc; use tangle_subxt::tangle_testnet_runtime::api::services::{ calls::types::{call::Job, register::Preferences}, @@ -34,7 +28,6 @@ use tangle_subxt::tangle_testnet_runtime::api::services::{ }; use tempfile::TempDir; use tokio::sync::Mutex; -use tokio::task::JoinError; use url::Url; /// Configuration for the Tangle test harness @@ -68,22 +61,32 @@ pub struct MultiNodeTangleTestEnv { start_tx: Option>, command_tx: tokio::sync::mpsc::UnboundedSender, dead_rx: Option>, + running_test_nodes: Arc>>, } enum NodeExecutorCommand { StartJob(usize, Box), - Stop(usize), + StopJob(usize), } -trait JobCreator: Fn(GadgetConfiguration) -> Box {} -impl Box> JobCreator for T {} +trait JobCreator: + Fn(GadgetConfiguration) -> Box + Send + 'static +{ +} +impl< + T: Fn(GadgetConfiguration) -> Box + + Send + + 'static, + > JobCreator for T +{ +} impl MultiNodeTangleTestEnv { /// Creates a new `MultiNodeTangleTestEnv` that can execute jobs in parallel across multiple chains. /// - /// The `test_envs` parameter should be a `HashMap` where the key is the job ID and the value is the - /// `TangleTestEnv` to use for that job. The `envs` parameter should be a vector of - /// `GadgetConfiguration`s that will be used to create the event handlers for each job. + /// The `test_envs` parameter should be a `HashMap` where the key is the node ID and the value is the + /// `TangleTestEnv` to use for that node. The `envs` parameter should be a vector of + /// `GadgetConfiguration`s that will be used to create the event handlers for each node. /// /// The `MultiNodeTangleTestEnv` will not execute any jobs until the `execute` method is called. /// When `execute` is called, the `MultiNodeTangleTestEnv` will start a background task that will @@ -98,8 +101,8 @@ impl MultiNodeTangleTestEnv { /// /// The `MultiNodeTangleTestEnv` also provides a method for getting the `GadgetConfiguration` for /// a given job ID. This can be used to get the configuration for a job without starting the job. - pub async fn new( - mut test_envs: HashMap, + pub fn new( + initial_node_envs: HashMap, envs: Vec, ) -> Self { let (start_tx, start_rx) = tokio::sync::oneshot::channel(); @@ -120,16 +123,16 @@ impl MultiNodeTangleTestEnv { match command { NodeExecutorCommand::StartJob(job_id, creator) => { let job = creator(envs_clone[job_id].clone()); + // TODO: Job should run inside node test env if let Some(stopper_rx) = job.init_event_handler().await { - let (control_tx, mut control_rx) = - tokio::sync::oneshot::channel::<()>(); + let (control_tx, control_rx) = tokio::sync::oneshot::channel::<()>(); let task = async move { tokio::select! { _ = control_rx => { gadget_logging::info!("[SHUTDOWN] Job {job_id} manually shutdown"); }, - res = stopper_rx.recv() => { - gadget_logging::info!("[SHUTDOWN] Job {job_id} completed with result {res}"); + res = stopper_rx => { + gadget_logging::info!("[SHUTDOWN] Job {job_id} completed with result {res:?}"); } }; }; @@ -142,9 +145,9 @@ impl MultiNodeTangleTestEnv { } } - NodeExecutorCommand::Stop(job_id) => { + NodeExecutorCommand::StopJob(job_id) => { gadget_logging::info!("[SHUTDOWN] Job {job_id} shutdown signal received"); - if let Some(control_tx) = handles.remove(job_id) { + if let Some(control_tx) = handles.remove(&job_id) { let _ = control_tx.send(()); } else { gadget_logging::warn!("Failed to stop job {job_id}"); @@ -161,6 +164,7 @@ impl MultiNodeTangleTestEnv { command_tx, start_tx: Some(start_tx), dead_rx: Some(dead_rx), + running_test_nodes: Arc::new(Mutex::new(initial_node_envs)), } } @@ -176,14 +180,17 @@ impl MultiNodeTangleTestEnv { /// # Errors /// /// If the job cannot be added to the test harness, an error is returned. - pub fn add_job K, K: InitializableEventHandler>( + pub fn add_job< + T: Fn(GadgetConfiguration) -> K + Send + 'static, + K: InitializableEventHandler + Send + 'static, + >( &mut self, job_creator: T, ) -> Result<&mut Self, RunnerError> { self.command_tx .send(NodeExecutorCommand::StartJob( self.envs.len(), - Box::new(job_creator), + Box::new(move |env| Box::new(job_creator(env)) as Box<_>), )) .map_err(|err| RunnerError::Other(err.to_string()))?; Ok(self) @@ -193,7 +200,7 @@ impl MultiNodeTangleTestEnv { /// the job is not running. pub fn stop_job(&mut self, job_id: usize) -> Result<&mut Self, RunnerError> { self.command_tx - .send(NodeExecutorCommand::Stop(job_id)) + .send(NodeExecutorCommand::StopJob(job_id)) .map_err(|err| RunnerError::Other(err.to_string()))?; Ok(self) } @@ -221,19 +228,14 @@ impl MultiNodeTangleTestEnv { Ok(()) } - /// Returns `true` if the test harness has errored, `false` otherwise. - /// - /// If the test harness has errored, the `dead_rx` oneshot receiver is taken and - /// awaited. If the receiver is `None`, `true` is returned immediately. - /// - /// The return value of this function is `true` if the test harness has errored, - /// regardless of whether the `dead_rx` was taken and awaited or not. - pub async fn has_errored(&mut self) -> bool { - if let Some(mut rx) = self.dead_rx.take() { + pub async fn is_empty(&self) -> bool { + self.running_test_nodes.lock().await.is_empty() + } + + pub async fn wait_for_error(&mut self) { + if let Some(rx) = self.dead_rx.take() { let _ = rx.await; } - - true } } @@ -257,9 +259,8 @@ impl TestHarness for TangleTestHarness { let mut client_envs = vec![]; - for idx in 0..N { + for name in ENDOWED_TEST_NAMES.iter().take(N) { // Setup testing directory - let name = ENDOWED_TEST_NAMES[idx]; let test_dir_path = test_dir .path() .join(format!("./{}", name.to_ascii_lowercase())) @@ -409,7 +410,7 @@ impl TangleTestHarness { } /// Sets up a complete service environment with initialized event handlers - pub async fn setup_services(&self) -> Result<(Vec, u64), Error> { + pub async fn setup_services(&self) -> Result<(MultiNodeTangleTestEnv, u64), Error> { // Deploy blueprint let blueprint_id = self.deploy_blueprint().await?; @@ -424,14 +425,16 @@ impl TangleTestHarness { .await .map_err(|e| Error::Setup(e.to_string()))?; - let mut test_envs = vec![]; - for env in &self.client_envs { + let mut test_envs = HashMap::new(); + for (id, env) in self.client_envs.iter().enumerate() { // Create and spawn test environment let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?; - test_envs.push(test_env); + test_envs.insert(id, test_env); } - Ok((test_envs, service_id)) + let executor = MultiNodeTangleTestEnv::new(test_envs, self.client_envs.clone()); + + Ok((executor, service_id)) } /// Executes a job and verifies its output matches the expected result @@ -469,7 +472,6 @@ impl TangleTestHarness { #[cfg(test)] mod tests { use super::*; - use gadget_core::test_utils::assert_ok; use std::path::PathBuf; #[tokio::test] @@ -477,9 +479,12 @@ mod tests { let test_dir = TempDir::new().unwrap(); let harness = TangleTestHarness::<1>::setup(test_dir).await; assert!(harness.is_ok(), "Harness setup should succeed"); - + let harness = harness.unwrap(); - assert!(harness.client().is_connected().await, "Client should be connected"); + assert!( + harness.client().now().await.is_some(), + "Client should be connected to live chain" + ); assert_eq!(harness.client_envs.len(), 1, "Should have 1 client env"); } @@ -488,10 +493,10 @@ mod tests { let test_dir = TempDir::new().unwrap(); let harness = TangleTestHarness::<3>::setup(test_dir).await; assert!(harness.is_ok(), "Harness setup should succeed"); - + let harness = harness.unwrap(); assert_eq!(harness.client_envs.len(), 3, "Should have 3 client envs"); - + // Verify each environment has unique keys let keys: Vec<_> = harness .client_envs @@ -499,14 +504,17 @@ mod tests { .map(|env| env.keystore().first_local::().unwrap()) .collect(); assert_eq!(keys.len(), 3, "Should have 3 unique keys"); - assert!(keys[0] != keys[1] && keys[1] != keys[2], "Keys should be unique"); + assert!( + keys[0] != keys[1] && keys[1] != keys[2], + "Keys should be unique" + ); } #[tokio::test] async fn test_deploy_mbsm() { let test_dir = TempDir::new().unwrap(); let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); - + // MBSM should be deployed during setup let latest_revision = transactions::get_latest_mbsm_revision(harness.client()) .await @@ -518,32 +526,38 @@ mod tests { async fn test_execute_job() { let test_dir = TempDir::new().unwrap(); let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); - + // First set up a service let (test_envs, service_id) = harness.setup_services().await.unwrap(); - assert!(!test_envs.is_empty(), "Should have test environments"); - assert!(service_id > 0, "Should have valid service ID"); + assert!(!test_envs.is_empty().await, "Should have test environments"); + assert_eq!(service_id, 0, "Should have valid service ID = 0"); // Execute a simple job - let inputs = vec![InputValue::U64(42)]; - let expected = vec![OutputValue::U64(42)]; + let inputs = vec![InputValue::Uint64(42)]; + let expected = vec![OutputValue::Uint64(42)]; let result = harness.execute_job(service_id, 0, inputs, expected).await; - assert_ok!(result, "Job execution should succeed"); + assert!(result.is_ok(), "Job execution should succeed"); } #[tokio::test] async fn test_create_deploy_opts() { let test_dir = TempDir::new().unwrap(); let harness = TangleTestHarness::<1>::setup(test_dir).await.unwrap(); - + let manifest_path = PathBuf::from("Cargo.toml"); let opts = harness.create_deploy_opts(manifest_path.clone()); - + assert_eq!(opts.manifest_path, manifest_path); assert!(opts.signer.is_some(), "Should have SR25519 signer"); assert!(opts.signer_evm.is_some(), "Should have EVM signer"); - assert!(opts.http_rpc_url.starts_with("http://"), "Should have HTTP URL"); - assert!(opts.ws_rpc_url.starts_with("ws://"), "Should have WebSocket URL"); + assert!( + opts.http_rpc_url.starts_with("http://"), + "Should have HTTP URL" + ); + assert!( + opts.ws_rpc_url.starts_with("ws://"), + "Should have WebSocket URL" + ); } #[tokio::test] diff --git a/crates/testing-utils/tangle/src/tests.rs b/crates/testing-utils/tangle/src/tests.rs index 9e3e5eb8e..0ef033310 100644 --- a/crates/testing-utils/tangle/src/tests.rs +++ b/crates/testing-utils/tangle/src/tests.rs @@ -11,7 +11,7 @@ async fn test_client_initialization() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; assert!( harness @@ -32,7 +32,7 @@ async fn test_operator_metadata() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; // Get operator metadata for the test account let metadata = harness @@ -53,7 +53,7 @@ async fn test_services_client() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; let services = harness.client().services_client(); // Test blueprint queries @@ -91,7 +91,7 @@ async fn test_events_client() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; // Test event subscription let latest = harness.client().latest_event().await; @@ -114,7 +114,7 @@ async fn test_gadget_services_client() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; // Test operator set retrieval let operators = harness.client().get_operators().await.unwrap(); @@ -140,7 +140,7 @@ async fn test_service_operators() -> Result<(), Error> { setup_log(); let temp_dir = tempfile::TempDir::new()?; - let harness = TangleTestHarness::setup(temp_dir).await?; + let harness = TangleTestHarness::<1>::setup(temp_dir).await?; let services = harness.client().services_client(); // Get current block hash From 29e33fea39bc44258bbb513e4bd7d517ab8d455a Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Thu, 30 Jan 2025 22:27:43 -0500 Subject: [PATCH 3/3] feat: add correct logic, trait type needs attn --- Cargo.lock | 1 + blueprints/incredible-squaring/src/tests.rs | 8 +- crates/event-listeners/core/Cargo.toml | 1 + crates/event-listeners/core/src/lib.rs | 2 + crates/testing-utils/tangle/src/harness.rs | 295 +++++++++++++------- 5 files changed, 197 insertions(+), 110 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2cbc2756..4f4d93c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6945,6 +6945,7 @@ name = "gadget-event-listeners-core" version = "0.1.0" dependencies = [ "async-trait", + "auto_impl", "futures", "gadget-logging", "gadget-std", diff --git a/blueprints/incredible-squaring/src/tests.rs b/blueprints/incredible-squaring/src/tests.rs index 92622036f..252279229 100644 --- a/blueprints/incredible-squaring/src/tests.rs +++ b/blueprints/incredible-squaring/src/tests.rs @@ -1,10 +1,8 @@ use crate::{MyContext, XsquareEventHandler}; use blueprint_sdk::config::GadgetConfiguration; -use blueprint_sdk::event_listeners::core::InitializableEventHandler; use blueprint_sdk::logging::setup_log; use blueprint_sdk::testing::tempfile; use blueprint_sdk::testing::utils::harness::TestHarness; -use blueprint_sdk::testing::utils::runner::TestEnv; use blueprint_sdk::testing::utils::tangle::{InputValue, OutputValue, TangleTestHarness}; use color_eyre::Result; @@ -16,7 +14,7 @@ async fn test_incredible_squaring() -> Result<()> { let temp_dir = tempfile::TempDir::new()?; let harness = TangleTestHarness::<1>::setup(temp_dir).await?; - let xsquare_creator = |env| async move { + let xsquare_creator = |env: GadgetConfiguration| async move { // Create blueprint-specific context let blueprint_ctx = MyContext { env: env.clone(), @@ -24,7 +22,9 @@ async fn test_incredible_squaring() -> Result<()> { }; // Initialize event handler - XsquareEventHandler::new(&env, blueprint_ctx).await? + XsquareEventHandler::new(&env, blueprint_ctx) + .await + .expect("Failed to create event handler") }; // Setup service diff --git a/crates/event-listeners/core/Cargo.toml b/crates/event-listeners/core/Cargo.toml index adfbd36eb..2cddb5356 100644 --- a/crates/event-listeners/core/Cargo.toml +++ b/crates/event-listeners/core/Cargo.toml @@ -10,6 +10,7 @@ gadget-std = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync"] } +auto_impl = { workspace = true } [features] default = ["std"] diff --git a/crates/event-listeners/core/src/lib.rs b/crates/event-listeners/core/src/lib.rs index e39c97635..799147fca 100644 --- a/crates/event-listeners/core/src/lib.rs +++ b/crates/event-listeners/core/src/lib.rs @@ -8,6 +8,7 @@ pub mod executor; pub mod testing; use async_trait::async_trait; +use auto_impl::auto_impl; use exponential_backoff::ExponentialBackoff; use gadget_std::iter::Take; @@ -29,6 +30,7 @@ pub fn get_exponential_backoff() -> Take { } #[async_trait] +#[auto_impl(Arc, Box)] pub trait InitializableEventHandler { async fn init_event_handler( &self, diff --git a/crates/testing-utils/tangle/src/harness.rs b/crates/testing-utils/tangle/src/harness.rs index c99d05ba2..df1afe56d 100644 --- a/crates/testing-utils/tangle/src/harness.rs +++ b/crates/testing-utils/tangle/src/harness.rs @@ -21,17 +21,19 @@ use gadget_runners::core::error::RunnerError; use gadget_runners::tangle::tangle::{PriceTargets, TangleConfig}; use sp_core::Pair; use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tangle_subxt::tangle_testnet_runtime::api::services::{ calls::types::{call::Job, register::Preferences}, events::JobResultSubmitted, }; use tempfile::TempDir; -use tokio::sync::Mutex; use url::Url; /// Configuration for the Tangle test harness -#[derive(Default)] +#[derive(Default, Clone)] pub struct TangleTestConfig { pub http_endpoint: Option, pub ws_endpoint: Option, @@ -46,6 +48,7 @@ pub struct TangleTestHarness { pub ecdsa_signer: TanglePairSigner, pub alloy_key: alloy_signer_local::PrivateKeySigner, pub client_envs: Vec, + config: TangleTestConfig, _temp_dir: tempfile::TempDir, _node: crate::node::testnet::SubstrateNode, } @@ -57,25 +60,55 @@ pub struct TangleTestHarness { /// of the system as a whole (either all jobs are running successfully, or a specific job has failed which it interpreted /// as a global failure in the testing context. pub struct MultiNodeTangleTestEnv { - envs: Vec, start_tx: Option>, - command_tx: tokio::sync::mpsc::UnboundedSender, + command_tx: tokio::sync::mpsc::UnboundedSender, dead_rx: Option>, - running_test_nodes: Arc>>, + running_test_nodes: Arc, + jobs_added_count: usize, } -enum NodeExecutorCommand { - StartJob(usize, Box), - StopJob(usize), +enum MultiNodeExecutorCommand { + AddJob(usize, Arc), + AddNode(usize), + RemoveNode(usize), + Shutdown, } trait JobCreator: - Fn(GadgetConfiguration) -> Box + Send + 'static + Fn( + GadgetConfiguration, + ) -> Pin< + Box< + dyn Future< + Output = Result< + Box, + RunnerError, + >, + > + Send + + Sync + + 'static, + >, + > + Send + + Sync + + 'static { } impl< - T: Fn(GadgetConfiguration) -> Box - + Send + T: Fn( + GadgetConfiguration, + ) -> Pin< + Box< + dyn Future< + Output = Result< + Box, + RunnerError, + >, + > + Send + + Sync + + 'static, + >, + > + Send + + Sync + 'static, > JobCreator for T { @@ -101,15 +134,19 @@ impl MultiNodeTangleTestEnv { /// /// The `MultiNodeTangleTestEnv` also provides a method for getting the `GadgetConfiguration` for /// a given job ID. This can be used to get the configuration for a job without starting the job. - pub fn new( - initial_node_envs: HashMap, - envs: Vec, - ) -> Self { + pub fn new(count: usize, tangle_config: TangleTestConfig) -> Self { let (start_tx, start_rx) = tokio::sync::oneshot::channel(); let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel(); let (dead_tx, dead_rx) = tokio::sync::oneshot::channel(); - let envs_clone = envs.clone(); + let running_test_nodes = Arc::new(AtomicUsize::new(0)); + // Ensure that adding nodes is the first operation handled once started + for idx in 0..count { + command_tx + .send(MultiNodeExecutorCommand::AddNode(idx)) + .expect("Failed to add node"); + } + let running_count = running_test_nodes.clone(); // This task will not run until the user has triggered it to begin let background_task = async move { if start_rx.await.is_err() { @@ -118,53 +155,71 @@ impl MultiNodeTangleTestEnv { // Allows stopping a running job let mut handles = HashMap::new(); - + let mut jobs: Vec<(usize, Arc)> = vec![]; while let Some(command) = command_rx.recv().await { match command { - NodeExecutorCommand::StartJob(job_id, creator) => { - let job = creator(envs_clone[job_id].clone()); - // TODO: Job should run inside node test env - if let Some(stopper_rx) = job.init_event_handler().await { - let (control_tx, control_rx) = tokio::sync::oneshot::channel::<()>(); - let task = async move { - tokio::select! { - _ = control_rx => { - gadget_logging::info!("[SHUTDOWN] Job {job_id} manually shutdown"); - }, - res = stopper_rx => { - gadget_logging::info!("[SHUTDOWN] Job {job_id} completed with result {res:?}"); - } - }; - }; - - handles.insert(job_id, control_tx); - } else { - gadget_logging::warn!( - "Failed to initialize event handler for job {job_id}" - ); + MultiNodeExecutorCommand::AddNode(node_id) => { + let env = generate_env_from_node_id( + node_id, + tangle_config.http_endpoint.clone().expect("Should exist"), + tangle_config.ws_endpoint.clone().expect("Should exist"), + ) + .await + .unwrap(); + + let mut node = TangleTestEnv::new(TangleConfig::default(), env.clone())?; + // Add all jobs to the node + for (_job_id, creator) in jobs.clone() { + let job = creator(env.clone()).await; + node.add_job(job); } + + let (node_control_tx, node_control_rx) = + tokio::sync::oneshot::channel::<()>(); + running_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let running_count_on_drop = running_count.clone(); + drop(tokio::spawn(async move { + tokio::select! { + _ = node_control_rx => { + gadget_logging::info!("Node {node_id} shutting down by request"); + }, + res = node.run_runner() => { + gadget_logging::warn!("Node {node_id} shutting down: {res:?}"); + } + } + + running_count_on_drop.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + })); + + handles.insert(node_id, node_control_tx); } - NodeExecutorCommand::StopJob(job_id) => { - gadget_logging::info!("[SHUTDOWN] Job {job_id} shutdown signal received"); - if let Some(control_tx) = handles.remove(&job_id) { - let _ = control_tx.send(()); - } else { - gadget_logging::warn!("Failed to stop job {job_id}"); - } + MultiNodeExecutorCommand::RemoveNode(node_id) => { + handles.remove(&node_id); + } + + MultiNodeExecutorCommand::AddJob(job_id, creator) => { + jobs.push((job_id, creator)); + } + + MultiNodeExecutorCommand::Shutdown => { + let _ = dead_tx.send(RunnerError::Other("Shutting down".to_string())); + break; } } } + + Ok::<_, RunnerError>(()) }; drop(tokio::spawn(background_task)); Self { - envs, command_tx, start_tx: Some(start_tx), dead_rx: Some(dead_rx), - running_test_nodes: Arc::new(Mutex::new(initial_node_envs)), + running_test_nodes, + jobs_added_count: 0, } } @@ -181,26 +236,52 @@ impl MultiNodeTangleTestEnv { /// /// If the job cannot be added to the test harness, an error is returned. pub fn add_job< - T: Fn(GadgetConfiguration) -> K + Send + 'static, - K: InitializableEventHandler + Send + 'static, + T: Fn(GadgetConfiguration) -> F + Send + Sync + 'static, + F: Future> + Send + Sync + 'static, + K: InitializableEventHandler + Send + Sync + 'static, >( &mut self, job_creator: T, ) -> Result<&mut Self, RunnerError> { self.command_tx - .send(NodeExecutorCommand::StartJob( - self.envs.len(), - Box::new(move |env| Box::new(job_creator(env)) as Box<_>), + .send(MultiNodeExecutorCommand::AddJob( + self.jobs_added_count, + Arc::new(move |env| { + Box::pin(async move { Box::new(job_creator(env).await) as Box<_> }) + }), )) .map_err(|err| RunnerError::Other(err.to_string()))?; + self.jobs_added_count += 1; + Ok(self) } - /// Stops a job with the given job ID. Returns an error if the job is not found or if - /// the job is not running. - pub fn stop_job(&mut self, job_id: usize) -> Result<&mut Self, RunnerError> { + /// Adds a new node to the test harness. + /// + /// The `node_id` parameter specifies the ID of the node to be added. + /// + /// The node is added to the test harness, and all jobs that are currently running + /// will be executed on the new node. + /// + /// If the node cannot be added to the test harness, an error is returned. + pub fn add_node(&mut self, node_id: usize) -> Result<&mut Self, RunnerError> { self.command_tx - .send(NodeExecutorCommand::StopJob(job_id)) + .send(MultiNodeExecutorCommand::AddNode(node_id)) + .map_err(|err| RunnerError::Other(err.to_string()))?; + Ok(self) + } + + /// Removes a node from the test harness. + /// + /// The `node_id` parameter specifies the ID of the node to be removed. + /// + /// The node is removed from the test harness, and all jobs that are currently + /// running on the node will be terminated. + /// + /// If the node cannot be removed from the test harness, an error is returned. + pub fn remove_node(&mut self, node_id: usize) -> Result<&mut Self, RunnerError> { + self.command_tx + .send(MultiNodeExecutorCommand::RemoveNode(node_id)) .map_err(|err| RunnerError::Other(err.to_string()))?; Ok(self) } @@ -213,23 +294,17 @@ impl MultiNodeTangleTestEnv { self.start_tx .take() .ok_or_else(|| RunnerError::Other("Test harness already started".to_string()))?; - /* - Old logic for reference: - let mut handles = FuturesUnordered::new(); - for test_env in &mut self.test_envs { - handles.push(tokio::spawn(test_env.run_runner())); - } - - async move { - let mut results = handles.try_collect::>().await - .map_err(|err| RunnerError::Other(err.to_string()))?; - results.pop().expect("Should have at least one result") - }*/ Ok(()) } - pub async fn is_empty(&self) -> bool { - self.running_test_nodes.lock().await.is_empty() + pub fn is_empty(&self) -> bool { + self.running_test_nodes.load(Ordering::SeqCst) == 0 + } + + pub fn shutdown(&self) { + self.command_tx + .send(MultiNodeExecutorCommand::Shutdown) + .expect("Failed to send shutdown command"); } pub async fn wait_for_error(&mut self) { @@ -240,6 +315,44 @@ impl MultiNodeTangleTestEnv { } const ENDOWED_TEST_NAMES: [&str; 5] = ["Alice", "Bob", "Charlie", "Dave", "Eve"]; +async fn generate_env_from_node_id( + id: usize, + http_endpoint: Url, + ws_endpoint: Url, +) -> Result { + if id >= ENDOWED_TEST_NAMES.len() { + return Err(RunnerError::Other(format!( + "Invalid node id {id}, must be less than {}", + ENDOWED_TEST_NAMES.len() + ))); + } + + let name = ENDOWED_TEST_NAMES[id]; + let test_dir_path = format!("./{}", name.to_ascii_lowercase()); + tokio::fs::create_dir_all(&test_dir_path).await?; + inject_tangle_key(&test_dir_path, &format!("//{name}")) + .map_err(|err| RunnerError::Other(err.to_string()))?; + + // Create context config + let context_config = ContextConfig::create_tangle_config( + http_endpoint, + ws_endpoint, + test_dir_path, + None, + SupportedChains::LocalTestnet, + 0, + Some(0), + ); + + // Load environment + let mut env = gadget_config::load(context_config) + .map_err(|e| Error::Setup(e.to_string())) + .map_err(|err| RunnerError::Other(err.to_string()))?; + + // Always set test mode, dont require callers to set env vars + env.test_mode = true; + Ok(env) +} #[async_trait::async_trait] impl TestHarness for TangleTestHarness { @@ -259,33 +372,9 @@ impl TestHarness for TangleTestHarness { let mut client_envs = vec![]; - for name in ENDOWED_TEST_NAMES.iter().take(N) { - // Setup testing directory - let test_dir_path = test_dir - .path() - .join(format!("./{}", name.to_ascii_lowercase())) - .to_string_lossy() - .into_owned(); - tokio::fs::create_dir_all(&test_dir_path).await?; - inject_tangle_key(&test_dir_path, &format!("//{name}"))?; - - // Create context config - let context_config = ContextConfig::create_tangle_config( - http_endpoint.clone(), - ws_endpoint.clone(), - test_dir_path, - None, - SupportedChains::LocalTestnet, - 0, - Some(0), - ); - - // Load environment - let mut env = - gadget_config::load(context_config).map_err(|e| Error::Setup(e.to_string()))?; - - // Always set test mode, dont require callers to set env vars - env.test_mode = true; + for idx in 0..N { + let env = + generate_env_from_node_id(idx, http_endpoint.clone(), ws_endpoint.clone()).await?; client_envs.push(env); } @@ -320,6 +409,7 @@ impl TestHarness for TangleTestHarness { ecdsa_signer, alloy_key, _temp_dir: test_dir, + config, _node: node, }; @@ -425,14 +515,7 @@ impl TangleTestHarness { .await .map_err(|e| Error::Setup(e.to_string()))?; - let mut test_envs = HashMap::new(); - for (id, env) in self.client_envs.iter().enumerate() { - // Create and spawn test environment - let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?; - test_envs.insert(id, test_env); - } - - let executor = MultiNodeTangleTestEnv::new(test_envs, self.client_envs.clone()); + let executor = MultiNodeTangleTestEnv::new(N, self.config.clone()); Ok((executor, service_id)) } @@ -529,7 +612,7 @@ mod tests { // First set up a service let (test_envs, service_id) = harness.setup_services().await.unwrap(); - assert!(!test_envs.is_empty().await, "Should have test environments"); + assert!(!test_envs.is_empty(), "Should have test environments"); assert_eq!(service_id, 0, "Should have valid service ID = 0"); // Execute a simple job