diff --git a/crates/pipe-exec-layer-ext-v2/src/lib.rs b/crates/pipe-exec-layer-ext-v2/src/lib.rs index 0cd0c22d6..1fc8ea959 100644 --- a/crates/pipe-exec-layer-ext-v2/src/lib.rs +++ b/crates/pipe-exec-layer-ext-v2/src/lib.rs @@ -91,19 +91,6 @@ struct PipeBarrier { inner: Mutex>, } -impl Default for PipeBarrier { - fn default() -> Self { - Self { - inner: Mutex::new(PipeBarrierInner { - block_number: 0, - result: None, - waiters: Vec::new(), - closed: false, - }), - } - } -} - #[derive(Debug)] struct PipeBarrierInner { block_number: u64, @@ -113,6 +100,17 @@ struct PipeBarrierInner { } impl PipeBarrier { + fn new(block_number: u64, result: Result) -> Self { + Self { + inner: Mutex::new(PipeBarrierInner { + block_number, + result: Some(result), + waiters: Vec::new(), + closed: false, + }), + } + } + /// Wait until the block number is reached and return the result. /// Returns `None` if the barrier has been closed. async fn wait(&self, block_number: u64) -> Option { @@ -167,19 +165,7 @@ impl PipeBarrier { } impl PipeExecService { - async fn run(mut self, latest_block_header: Header, latest_block_hash: B256) { - let mut latest_block_number = latest_block_header.number; - self.core - .execute_block_barrier - .notify(latest_block_number, latest_block_header) - .await - .unwrap(); - self.core - .make_canonical_barrier - .notify(latest_block_number, latest_block_hash) - .await - .unwrap(); - + async fn run(mut self, mut latest_block_number: u64) { loop { let ordered_block = match self.ordered_block_rx.recv().await { Some(ordered_block) => ordered_block, @@ -471,6 +457,7 @@ pub fn new_pipe_exec_layer_api( let (verified_block_hash_tx, verified_block_hash_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let latest_block_number = latest_block_header.number; let service = PipeExecService { core: Arc::new(Core { executed_block_hash_tx, @@ -479,12 +466,12 @@ pub fn new_pipe_exec_layer_api( evm_config: EthEvmConfig::new(chain_spec.clone()), chain_spec, event_tx, - execute_block_barrier: PipeBarrier::default(), - make_canonical_barrier: PipeBarrier::default(), + execute_block_barrier: PipeBarrier::new(latest_block_number, latest_block_header), + make_canonical_barrier: PipeBarrier::new(latest_block_number, latest_block_hash), }), ordered_block_rx, }; - tokio::spawn(service.run(latest_block_header, latest_block_hash)); + tokio::spawn(service.run(latest_block_number)); PIPE_EXEC_LAYER_EXT.get_or_init(|| PipeExecLayerExt { event_rx: Mutex::new(event_rx) });