Skip to content

Commit

Permalink
Event Workflows (phase 1: Custom listeners) (#359)
Browse files Browse the repository at this point in the history
* Add wrapper types + trait structure to enforce structure for event flows

* Decoupling/refactor and ensure integration test passes

* periodic web poller working and further refactor of macro code

* Everything compiling, integration test passes
  • Loading branch information
tbraun96 authored Oct 18, 2024
1 parent a2d4646 commit 81916d4
Show file tree
Hide file tree
Showing 17 changed files with 874 additions and 295 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ CODEGEN_LOCK
blueprint.json
blueprint*.json
blueprint.lock
/blueprints/periodic-web-poller/node_modules/
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"blueprint-metadata",
"blueprints/incredible-squaring",
"blueprints/incredible-squaring-eigenlayer",
"blueprints/periodic-web-poller",
"cli",
"gadget-io",
"blueprint-test-utils",
Expand Down Expand Up @@ -47,6 +48,7 @@ blueprint-test-utils = { path = "./blueprint-test-utils" }
gadget-sdk = { path = "./sdk", default-features = false, version = "0.1.2" }

incredible-squaring-blueprint-eigenlayer = { path = "./blueprints/incredible-squaring-eigenlayer", default-features = false, version = "0.1.1" }
periodic-web-poller-blueprint = { path = "./blueprints/periodic-web-poller", default-features = false, version = "0.1.1" }
gadget-blueprint-proc-macro = { path = "./macros/blueprint-proc-macro", default-features = false, version = "0.1.2" }
gadget-blueprint-proc-macro-core = { path = "./macros/blueprint-proc-macro-core", default-features = false, version = "0.1.2" }
gadget-context-derive = { path = "./macros/context-derive", default-features = false, version = "0.1.1" }
Expand Down
13 changes: 8 additions & 5 deletions blueprints/incredible-squaring-eigenlayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ impl Config for NodeConfig {
id = 1,
params(number_to_be_squared, task_created_block, quorum_numbers, quorum_threshold_percentage),
result(_),
event_listener(EvmContractEventListener(
instance = IncredibleSquaringTaskManager,
event_listener(
listener = EvmContractEventListener(
instance = IncredibleSquaringTaskManager,
event = IncredibleSquaringTaskManager::NewTaskCreated,
event_converter = convert_event_to_inputs,
callback = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerCalls::respondToTask
),
event = IncredibleSquaringTaskManager::NewTaskCreated,
event_converter = convert_event_to_inputs,
callback = IncredibleSquaringTaskManager::IncredibleSquaringTaskManagerCalls::respondToTask
)),
),
)]
pub async fn xsquare_eigen(
number_to_be_squared: U256,
Expand Down
5 changes: 4 additions & 1 deletion blueprints/incredible-squaring/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::convert::Infallible;
id = 0,
params(x),
result(_),
event_listener(TangleEventListener::<JobCalled>),
event_listener(
listener = TangleEventListener,
event = JobCalled,
),
verifier(evm = "IncredibleSquaringBlueprint")
)]
pub fn xsquare(x: u64) -> Result<u64, Infallible> {
Expand Down
35 changes: 35 additions & 0 deletions blueprints/periodic-web-poller/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "periodic-web-poller-blueprint"
version = "0.1.1"
description = "A Simple Blueprint to demo how to run blueprints dependent on an arbitrary events"
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
publish = false

[dependencies]
tracing = { workspace = true }
async-trait = { workspace = true }
gadget-sdk = { workspace = true, features = ["std"] }
color-eyre = { workspace = true }
lock_api = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["full"] }
tokio-util = { workspace = true }
sp-core = { workspace = true }
subxt-signer = { workspace = true, features = ["sr25519", "subxt", "std"] }
parking_lot = { workspace = true }
ed25519-zebra = { workspace = true, features = ["pkcs8", "default", "der", "std", "serde", "pem"] }
structopt = { workspace = true }
hex = { workspace = true }
k256 = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }

[build-dependencies]
blueprint-metadata = { workspace = true }

[features]
default = ["std"]
std = []
6 changes: 6 additions & 0 deletions blueprints/periodic-web-poller/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() {
println!("cargo:rerun-if-changed=src/*");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/main.rs");
blueprint_metadata::generate_json();
}
83 changes: 83 additions & 0 deletions blueprints/periodic-web-poller/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use async_trait::async_trait;
use gadget_sdk::event_listener::periodic::PeriodicEventListener;
use gadget_sdk::event_listener::EventListener;
use gadget_sdk::job;
use std::convert::Infallible;

#[job(
id = 0,
params(value),
result(_),
event_listener(
listener = PeriodicEventListener<2000, WebPoller, serde_json::Value, reqwest::Client>,
event = serde_json::Value,
pre_processor = pre_process,
post_processor = post_process,
),
)]
// Maps a boolean value obtained from pre-processing to a u8 value
pub async fn web_poller(value: bool, client: reqwest::Client) -> Result<u8, Infallible> {
gadget_sdk::info!("Running web_poller on value: {value}");
Ok(value as u8)
}

// Maps a JSON response to a boolean value
async fn pre_process(event: serde_json::Value) -> Result<bool, gadget_sdk::Error> {
gadget_sdk::info!("Running web_poller pre-processor on value: {event}");
let completed = event["completed"].as_bool().unwrap_or(false);
Ok(completed)
}

// Received the u8 value output from the job and performs any last post-processing
async fn post_process(job_output: u8) -> Result<(), gadget_sdk::Error> {
gadget_sdk::info!("Running web_poller post-processor on value: {job_output}");
if job_output == 1 {
Ok(())
} else {
Err(gadget_sdk::Error::Other(
"Job failed since query returned with a false status".to_string(),
))
}
}

/// Define an event listener that polls a webserver
pub struct WebPoller {
pub client: reqwest::Client,
}

#[async_trait]
impl EventListener<serde_json::Value, reqwest::Client> for WebPoller {
async fn new(context: &reqwest::Client) -> Result<Self, gadget_sdk::Error>
where
Self: Sized,
{
Ok(Self {
client: context.clone(),
})
}

/// Implement the logic that polls the web server
async fn next_event(&mut self) -> Option<serde_json::Value> {
// Send a GET request to the JSONPlaceholder API
let response = self
.client
.get("https://jsonplaceholder.typicode.com/todos/10")
.send()
.await
.ok()?;

// Check if the request was successful
if response.status().is_success() {
// Parse the JSON response
let resp: serde_json::Value = response.json().await.ok()?;
Some(resp)
} else {
None
}
}

/// Implement any handler logic when an event is received
async fn handle_event(&mut self, _event: serde_json::Value) -> Result<(), gadget_sdk::Error> {
unreachable!("Not called here")
}
}
21 changes: 21 additions & 0 deletions blueprints/periodic-web-poller/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use color_eyre::Result;
use gadget_sdk::info;
use gadget_sdk::job_runner::MultiJobRunner;
use periodic_web_poller_blueprint as blueprint;

#[gadget_sdk::main]
async fn main() {
let web_poller = blueprint::WebPollerEventHandler {
client: reqwest::Client::new(),
};

info!("~~~ Executing the periodic web poller ~~~");
MultiJobRunner::new(None)
.with_job()
.finish(web_poller)
.run()
.await?;

info!("Exiting...");
Ok(())
}
2 changes: 1 addition & 1 deletion cli/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub fn load_signer_from_env() -> Result<TanglePairSigner> {
})
.note(SURI_HELP_MSG)?;

let sp_core_keypair = sp_core::sr25519::Pair::from_string(&*s, None)?;
let sp_core_keypair = sp_core::sr25519::Pair::from_string(&s, None)?;
Ok(TanglePairSigner::new(sp_core_keypair))
}

Expand Down
12 changes: 6 additions & 6 deletions docs/event_listeners.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ pub struct WebPoller {
Then, implement `EventListener` for `WebPoller`:

```rust
#[async_trait::async_trait]
impl EventListener<serde_json::Value, MyContext> for WebPoller {
/// Build the event listener. Note that this time, we don't necessarily need the context
async fn new(_context: &MyContext) -> Result<Self, Error>
Expand Down Expand Up @@ -217,17 +218,16 @@ Finally, register the event listener inside the `job` macro using `event_listene
```rust
#[job(
id = 0,
params(x),
params(value),
result(_),
event_listener(PeriodicEventListener::<6000, WebPoller, serde_json::Value, MyContext>), // <-- Register the event listener here
verifier(evm = "IncredibleSquaringBlueprint")
)]
pub fn hello_event_listener(
x: u64,
value: serde_json::Value,
context: MyContext,
env: GadgetConfiguration<parking_lot::RawRwLock>,
) -> Result<u64, Infallible> {
Ok(x.saturating_pow(2u32))
) -> Result<bool, Infallible> {
let completed = value["completed"].as_bool().unwrap_or(false);
Ok(completed)
}
```

Expand Down
12 changes: 6 additions & 6 deletions macros/blueprint-proc-macro-playground/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct MyContext;
// ==================

/// Simple Threshold (t) Keygen Job for n parties.
#[job(id = 0, params(n, t), event_listener(TangleEventListener<JobCalled>), result(_))]
#[job(id = 0, params(n, t), event_listener(listener = TangleEventListener, event = JobCalled), result(_))]
pub fn keygen(ctx: &MyContext, n: u16, t: u16) -> Result<Vec<u8>, Error> {
let _ = (n, t, ctx);
Ok(vec![0; 33])
Expand All @@ -38,7 +38,7 @@ pub fn keygen(ctx: &MyContext, n: u16, t: u16) -> Result<Vec<u8>, Error> {
#[job(
id = 1,
params(keygen_id, data),
event_listener(TangleEventListener<JobCalled>),
event_listener(listener = TangleEventListener, event = JobCalled),
result(_)
)]
pub async fn sign(keygen_id: u64, data: Vec<u8>) -> Result<Vec<u8>, Error> {
Expand All @@ -49,7 +49,7 @@ pub async fn sign(keygen_id: u64, data: Vec<u8>) -> Result<Vec<u8>, Error> {
#[job(
id = 2,
params(keygen_id, new_t),
event_listener(TangleEventListener<JobCalled>),
event_listener(listener = TangleEventListener, event = JobCalled),
result(_)
)]
pub fn refresh(keygen_id: u64, new_t: Option<u8>) -> Result<Vec<u64>, Error> {
Expand All @@ -58,7 +58,7 @@ pub fn refresh(keygen_id: u64, new_t: Option<u8>) -> Result<Vec<u64>, Error> {
}

/// Say hello to someone or the world.
#[job(id = 3, params(who), event_listener(TangleEventListener<JobCalled>), result(_))]
#[job(id = 3, params(who), event_listener(listener = TangleEventListener, event = JobCalled), result(_))]
pub fn say_hello(who: Option<String>) -> Result<String, Error> {
match who {
Some(who) => Ok(format!("Hello, {}!", who)),
Expand All @@ -84,7 +84,7 @@ pub fn on_request(nft_id: u64);
#[report(
job_id = 0,
params(n, t, msgs),
event_listener(TangleEventListener<JobCalled>),
event_listener(listener = TangleEventListener, event = JobCalled),
result(_),
report_type = "job",
verifier(evm = "KeygenContract")
Expand All @@ -96,7 +96,7 @@ fn report_keygen(n: u16, t: u16, msgs: Vec<Vec<u8>>) -> u32 {

#[report(
params(uptime, response_time, error_rate),
event_listener(TangleEventListener<JobCalled>),
event_listener(listener = TangleEventListener, event = JobCalled),
result(Vec<u8>),
report_type = "qos",
interval = 3600,
Expand Down
Loading

0 comments on commit 81916d4

Please sign in to comment.