Skip to content

Commit

Permalink
init barrier in new_pipe_exec_layer_api
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Jan 2, 2025
1 parent c3ad429 commit 60074cd
Showing 1 changed file with 18 additions and 29 deletions.
47 changes: 18 additions & 29 deletions crates/pipe-exec-layer-ext-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,6 @@ struct PipeBarrier<Result> {
inner: Mutex<PipeBarrierInner<Result>>,
}

impl<Result> Default for PipeBarrier<Result> {
fn default() -> Self {
Self {
inner: Mutex::new(PipeBarrierInner {
block_number: 0,
result: None,
waiters: Vec::new(),
closed: false,
}),
}
}
}

#[derive(Debug)]
struct PipeBarrierInner<Result> {
block_number: u64,
Expand All @@ -113,6 +100,17 @@ struct PipeBarrierInner<Result> {
}

impl<Result> PipeBarrier<Result> {
fn new(block_number: u64, result: Result) -> Self {
Self {
inner: Mutex::new(PipeBarrierInner {
block_number,
result: Some(result),
waiters: Vec::new(),
closed: false,
}),
}
}

/// Wait until the block number is reached and return the result.
/// Returns `None` if the barrier has been closed.
async fn wait(&self, block_number: u64) -> Option<Result> {
Expand Down Expand Up @@ -167,19 +165,7 @@ impl<Result> PipeBarrier<Result> {
}

impl<Storage: GravityStorage> PipeExecService<Storage> {
async fn run(mut self, latest_block_header: Header, latest_block_hash: B256) {
let mut latest_block_number = latest_block_header.number;
self.core
.execute_block_barrier
.notify(latest_block_number, latest_block_header)
.await
.unwrap();
self.core
.make_canonical_barrier
.notify(latest_block_number, latest_block_hash)
.await
.unwrap();

async fn run(mut self, mut latest_block_number: u64) {
loop {
let ordered_block = match self.ordered_block_rx.recv().await {
Some(ordered_block) => ordered_block,
Expand Down Expand Up @@ -479,12 +465,15 @@ pub fn new_pipe_exec_layer_api<Storage: GravityStorage>(
evm_config: EthEvmConfig::new(chain_spec.clone()),
chain_spec,
event_tx,
execute_block_barrier: PipeBarrier::default(),
make_canonical_barrier: PipeBarrier::default(),
execute_block_barrier: PipeBarrier::new(
latest_block_header.number,
latest_block_header,
),
make_canonical_barrier: PipeBarrier::new(latest_block_header.number, latest_block_hash),
}),
ordered_block_rx,
};
tokio::spawn(service.run(latest_block_header, latest_block_hash));
tokio::spawn(service.run(latest_block_header.number));

PIPE_EXEC_LAYER_EXT.get_or_init(|| PipeExecLayerExt { event_rx: Mutex::new(event_rx) });

Expand Down

0 comments on commit 60074cd

Please sign in to comment.