Skip to content

Commit

Permalink
sdk: Switch to backon
Browse files Browse the repository at this point in the history
`backoff` hasn't been updated in years, and `backon` supports `wasm32-unknown-unknown`. Most of the `events_watcher` module builds on WASM now.
  • Loading branch information
Serial-ATA committed Sep 17, 2024
1 parent f723e74 commit a1468cf
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 113 deletions.
29 changes: 19 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ sp-session = { version = "35.0.0" }

async-trait = "0.1.82"
auto_impl = "1.2.0"
backoff = { version = "0.4.0", default-features = false }
backon = { version = "1.2.0", default-features = false }
bincode = "1.3.3"
cargo-generate = "0.21.3"
cfg-if = "1.0.0"
Expand Down
19 changes: 12 additions & 7 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ w3f-bls = { workspace = true }
hyper = { workspace = true, features = ["http1", "server"], optional = true }
hyper-util = { workspace = true, features = ["server"], optional = true }
prometheus = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"], optional = true }
tokio = { workspace = true, optional = true }

# Logging deps
log = { workspace = true }
Expand All @@ -51,7 +51,7 @@ sp-core = { workspace = true, features = ["full_crypto"] }
sp-io = { workspace = true }

# Event Watchers and Handlers
backoff = { workspace = true, optional = true }
backon = { workspace = true, optional = true }
subxt = { workspace = true, optional = true }
subxt-signer = { workspace = true, features = ["sr25519"] }
tangle-subxt = { workspace = true }
Expand All @@ -60,7 +60,7 @@ tangle-subxt = { workspace = true }
alloy-contract = { workspace = true }
alloy-network = { workspace = true }
alloy-primitives = { workspace = true }
alloy-provider = { workspace = true }
alloy-provider = { workspace = true, optional = true }
alloy-pubsub = { workspace = true }
alloy-rpc-types = { workspace = true }
alloy-signer = { workspace = true }
Expand Down Expand Up @@ -106,13 +106,14 @@ hyper = { workspace = true, features = ["client"] }
default = ["std"]

std = [
"dep:backoff",
"dep:alloy-provider",
"dep:backon",
"dep:parking_lot",
"dep:hyper",
"dep:hyper-util",
"dep:subxt",
"dep:tokio",
"backoff/tokio",
"backon/tokio-sleep",
"getrandom",
"gadget-io/std",
"hex/std",
Expand All @@ -122,11 +123,15 @@ std = [
"sqlx",
"subxt/native",
"tangle-subxt/std",
"tokio/rt-multi-thread",
"tokio/parking_lot",
]
wasm = [
"dep:backoff",
"dep:backon",
"dep:subxt",
"backoff/wasm-bindgen",
"dep:tokio",
"backon/gloo-timers-sleep",
"gadget-io/wasm-bindgen",
"getrandom/js",
"subxt/web",
"tangle-subxt/web",
Expand Down
1 change: 1 addition & 0 deletions sdk/src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use alloc::boxed::Box;
use async_trait::async_trait;
use auto_impl::auto_impl;

#[cfg(feature = "std")]
pub mod tangle;

#[async_trait]
Expand Down
4 changes: 0 additions & 4 deletions sdk/src/events_watcher/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ pub enum Error {
// TODO: Add feature flag for substrate/tangle
#[error(transparent)]
Subxt(#[from] subxt::Error),
/// An error occurred in the backoff mechanism.
#[error(transparent)]
#[cfg(any(feature = "std", feature = "wasm"))]
Backoff(#[from] backoff::Error<subxt::Error>),
/// An error occurred in Alloy transport.
// TODO: Add feature flag for EVM/eigenlayer/etc.
#[error(transparent)]
Expand Down
36 changes: 13 additions & 23 deletions sdk/src/events_watcher/evm.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! EVM Event Watcher Module
use crate::events_watcher::{error::Error, ConstantWithMaxRetryCount};
use crate::events_watcher::error::Error;
use crate::events_watcher::retry::UnboundedConstantBuilder;
use crate::store::LocalDatabase;
use alloy_network::Network;
use alloy_network::ReceiptResponse;
Expand All @@ -10,6 +11,7 @@ use alloy_rpc_types::BlockNumberOrTag;
use alloy_rpc_types::{Filter, Log};
use alloy_sol_types::SolEvent;
use alloy_transport::Transport;
use backon::{ConstantBuilder, Retryable};
use futures::TryFutureExt;
use std::{ops::Deref, time::Duration};

Expand Down Expand Up @@ -78,14 +80,11 @@ pub trait EventHandlerWithRetry<T: Config>: EventHandler<T> + Send + Sync + 'sta
&self,
contract: &Self::Contract,
(event, log): (Self::Event, Log),
backoff: impl backoff::backoff::Backoff + Send + Sync + 'static,
backoff: impl backon::Backoff + Send + Sync + 'static,
) -> Result<(), Error> {
let ev = event.clone();
let wrapped_task = || {
self.handle_event(contract, (ev.clone(), log.clone()))
.map_err(backoff::Error::transient)
};
backoff::future::retry(backoff, wrapped_task).await?;
let wrapped_task = || self.handle_event(contract, (ev.clone(), log.clone()));
wrapped_task.retry(backoff).await?;
Ok(())
}
}
Expand Down Expand Up @@ -128,15 +127,14 @@ pub trait EventWatcher<T: Config>: Send + Sync {
handlers: Vec<EventHandlerFor<Self, T>>,
) -> Result<(), Error> {
let local_db = LocalDatabase::open("./db");
let backoff = backoff::backoff::Constant::new(Duration::from_secs(1));
let backoff = UnboundedConstantBuilder::new(Duration::from_secs(1));
let task = || async {
let step = 100;
let chain_id: u64 = contract
.provider()
.root()
.get_chain_id()
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;

// we only query this once, at the start of the events watcher.
Expand All @@ -145,7 +143,6 @@ pub trait EventWatcher<T: Config>: Send + Sync {
.provider()
.get_block_number()
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;

local_db.set(
Expand All @@ -157,8 +154,7 @@ pub trait EventWatcher<T: Config>: Send + Sync {
.provider()
.get_transaction_receipt(Self::GENESIS_TX_HASH)
.await
.map_err(Into::into)
.map_err(backoff::Error::transient)?
.map_err(Into::into)?
.map(|receipt| receipt.block_number().unwrap_or_default())
.unwrap_or_default();

Expand All @@ -174,11 +170,7 @@ pub trait EventWatcher<T: Config>: Send + Sync {
.to_block(BlockNumberOrTag::Number(dest_block)),
);

let events = events_filter
.query()
.await
.map_err(Into::into)
.map_err(backoff::Error::transient)?;
let events = events_filter.query().await.map_err(Into::into)?;
let number_of_events = events.len();
tracing::trace!("Found #{number_of_events} events");
for (event, log) in events {
Expand All @@ -188,10 +180,9 @@ pub trait EventWatcher<T: Config>: Send + Sync {
const MAX_RETRY_COUNT: usize = 5;
let tasks = handlers.iter().map(|handler| {
// a constant backoff with maximum retry count is used here.
let backoff = ConstantWithMaxRetryCount::new(
Duration::from_millis(100),
MAX_RETRY_COUNT,
);
let backoff = ConstantBuilder::default()
.with_delay(Duration::from_millis(100))
.with_max_times(MAX_RETRY_COUNT);
handler.handle_event_with_retry(
&contract,
(event.clone(), log.clone()),
Expand Down Expand Up @@ -239,7 +230,6 @@ pub trait EventWatcher<T: Config>: Send + Sync {
.provider()
.get_block_number()
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;
local_db.set(
&format!("TARGET_BLOCK_NUMBER_{}", contract.address()),
Expand All @@ -248,7 +238,7 @@ pub trait EventWatcher<T: Config>: Send + Sync {
}
}
};
backoff::future::retry(backoff, task).await?;
task.retry(backoff).await?;
Ok(())
}
}
41 changes: 2 additions & 39 deletions sdk/src/events_watcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,8 @@
pub mod error;
pub use error::Error;

#[cfg(feature = "std")]
pub mod evm;
mod retry;
pub mod substrate;
pub mod tangle;

use core::time::Duration;

/// Constant with Max Retry Count is a backoff policy which always returns
/// a constant duration, until it exceeds the maximum retry count.
#[derive(Debug, Clone, Copy)]
pub struct ConstantWithMaxRetryCount {
interval: Duration,
max_retry_count: usize,
count: usize,
}

impl ConstantWithMaxRetryCount {
/// Creates a new Constant backoff with `interval` and `max_retry_count`.
/// `interval` is the duration to wait between retries, and `max_retry_count` is the maximum
/// number of retries, after which we return `None` to indicate that we should stop retrying.
#[must_use]
pub fn new(interval: Duration, max_retry_count: usize) -> Self {
Self {
interval,
max_retry_count,
count: 0,
}
}
}

#[cfg(any(feature = "std", feature = "wasm"))]
impl backoff::backoff::Backoff for ConstantWithMaxRetryCount {
fn reset(&mut self) {
self.count = 0;
}

fn next_backoff(&mut self) -> Option<Duration> {
(self.count < self.max_retry_count).then(|| {
self.count += 1;
self.interval
})
}
}
34 changes: 34 additions & 0 deletions sdk/src/events_watcher/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use core::time::Duration;

/// A backoff policy which always returns a constant duration, with no maximum retry count.
#[derive(Debug, Clone, Copy)]
pub struct UnboundedConstantBuilder {
interval: Duration,
}

impl UnboundedConstantBuilder {
/// Creates a new unbounded Constant backoff
///
/// * `interval` is the duration to wait between retries.
#[must_use]
#[allow(dead_code)]
pub fn new(interval: Duration) -> Self {
Self { interval }
}
}

impl backon::BackoffBuilder for UnboundedConstantBuilder {
type Backoff = UnboundedConstantBuilder;

fn build(self) -> Self::Backoff {
self
}
}

impl Iterator for UnboundedConstantBuilder {
type Item = Duration;

fn next(&mut self) -> Option<Self::Item> {
Some(self.interval)
}
}
Loading

0 comments on commit a1468cf

Please sign in to comment.