feat(io): support parallel IO for historical sync (#27)
* implement `block_hash` for `ParallelStateProvider`

* support getting the latest state provider in `Stage::execute`

* support parallel state provider for latest block

* use `ParallelStateProvider` in `ExecutionStage`
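
As a rough illustration (not part of this commit's diff), a caller holding the new `LatestStateProviderFactory` trait object could open a latest-block state provider with parallel IO as sketched below. The helper function `open_parallel_latest_state` is hypothetical; `LatestStateProviderFactory`, `StateProviderOptions`, `StateProviderBox`, and `ProviderResult` are the items introduced or used by the changes that follow, and the parallelism degree of 8 mirrors the value hardcoded in `ExecutionStage`.

```rust
// Hypothetical helper, not from this commit: shows how the new trait could be
// consumed. Only the trait/type names below appear in the diff.
use std::num::NonZero;

use reth_provider::{ProviderResult, StateProviderBox, StateProviderOptions};
use reth_stages_api::LatestStateProviderFactory;

/// Open a state provider for the latest block, requesting parallel IO.
fn open_parallel_latest_state(
    factory: &dyn LatestStateProviderFactory,
) -> ProviderResult<StateProviderBox> {
    // `ExecutionStage::execute_inner` in this commit requests 8 parallel
    // workers; `NonZero::new(8)` can never be `None`, so `unwrap` is safe.
    factory.latest(StateProviderOptions { parallel: NonZero::new(8).unwrap() })
}
```

In the pipeline, the `ProviderFactory` itself is passed as this factory (via the new impl in `pipeline/mod.rs`), and stages that do not override `execute_v2` fall back to the existing `execute` path.
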
nekomoto911 authored Jan 16, 2025
1 parent 377eb49 commit 4a95a05
Showing 8 changed files with 247 additions and 135 deletions.
15 changes: 11 additions & 4 deletions crates/stages/api/src/pipeline/mod.rs
@@ -1,15 +1,16 @@
mod ctrl;
mod event;
pub use crate::pipeline::ctrl::ControlFlow;
use crate::{PipelineTarget, StageCheckpoint, StageId};
use crate::{LatestStateProviderFactory, PipelineTarget, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, B256};
pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, ProviderResult,
StageCheckpointReader, StageCheckpointWriter, StateProviderBox, StateProviderOptions,
StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@@ -107,6 +108,12 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
}
}

impl<N: ProviderNodeTypes> LatestStateProviderFactory for ProviderFactory<N> {
fn latest(&self, opts: StateProviderOptions) -> ProviderResult<StateProviderBox> {
self.latest(opts)
}
}

impl<N: ProviderNodeTypes> Pipeline<N> {
/// Registers progress metrics for each registered stage
pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
@@ -435,7 +442,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
target,
});

match stage.execute(&provider_rw, exec_input) {
match stage.execute_v2(&provider_rw, &self.provider_factory, exec_input) {
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
20 changes: 19 additions & 1 deletion crates/stages/api/src/stage.rs
@@ -1,6 +1,8 @@
use crate::{error::StageError, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_provider::{BlockReader, ProviderError};
use reth_provider::{
BlockReader, ProviderError, ProviderResult, StateProviderBox, StateProviderOptions,
};
use std::{
cmp::{max, min},
future::{poll_fn, Future},
@@ -178,6 +180,12 @@ pub struct UnwindOutput {
pub checkpoint: StageCheckpoint,
}

/// A factory for creating a state provider for the latest block.
pub trait LatestStateProviderFactory {
/// Create a state provider for the latest block
fn latest(&self, opts: StateProviderOptions) -> ProviderResult<StateProviderBox>;
}

/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
@@ -233,6 +241,16 @@ pub trait Stage<Provider>: Send + Sync {
/// upon invoking this method.
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>;

/// Execute the stage with access to a [`LatestStateProviderFactory`]; the default implementation falls back to [`Stage::execute`].
fn execute_v2(
&mut self,
provider: &Provider,
_factory: &dyn LatestStateProviderFactory,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.execute(provider, input)
}

/// Post execution commit hook.
///
/// This is called after the stage has been executed and the data has been committed by the
264 changes: 153 additions & 111 deletions crates/stages/stages/src/stages/execution.rs
@@ -15,18 +15,19 @@ use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::UnifiedStorageWriter,
BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateChangeWriter, StateWriter, StaticFileProviderFactory, StatsReader,
TransactionVariant,
ProviderError, StateChangeWriter, StateProvider, StateProviderOptions, StateWriter,
StaticFileProviderFactory, StatsReader, TransactionVariant,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
ExecutionCheckpoint, ExecutionStageThresholds, Stage, StageCheckpoint, StageError, StageId,
UnwindInput, UnwindOutput,
ExecutionCheckpoint, ExecutionStageThresholds, LatestStateProviderFactory, Stage,
StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use std::{
cmp::Ordering,
num::NonZero,
ops::RangeInclusive,
sync::Arc,
task::{ready, Context, Poll},
@@ -194,6 +195,133 @@ where

/// Execute the stage
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
self.execute_inner(provider, None, input)
}

fn execute_v2(
&mut self,
provider: &Provider,
factory: &dyn LatestStateProviderFactory,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.execute_inner(provider, Some(factory), input)
}

fn post_execute_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_execute_commit_input.take() else { return Ok(()) };

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainCommitted { new: Arc::new(chain) });

Ok(())
}

/// Unwind the stage.
fn unwind(
&mut self,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}

// Unwind account and storage changesets, as well as receipts.
//
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.take_state(range.clone())?;

// Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range.
let blocks = provider.sealed_block_with_senders_range(range.clone())?;
let previous_input = self.post_unwind_commit_input.replace(Chain::new(
blocks,
bundle_state_with_receipts,
None,
));

debug_assert!(
previous_input.is_none(),
"Previous post unwind commit input wasn't processed"
);
if let Some(previous_input) = previous_input {
tracing::debug!(target: "sync::stages::execution", ?previous_input, "Previous post unwind commit input wasn't processed");
}
}

let static_file_provider = provider.static_file_provider();

// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.

// prepare_static_file_producer does a consistency check that will unwind static files
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer =
prepare_static_file_producer(provider, &static_file_provider, *range.start())?;
} else {
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
//
// If we hit this case, the receipts have already been unwound by the call to
// `take_state`.
}

// Update the checkpoint.
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_to)
};

Ok(UnwindOutput { checkpoint })
}

fn post_unwind_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_unwind_commit_input.take() else { return Ok(()) };

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ =
self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });

Ok(())
}
}

impl<E> ExecutionStage<E>
where
E: BlockExecutorProvider,
{
fn execute_inner<Provider>(
&mut self,
provider: &Provider,
factory: Option<&dyn LatestStateProviderFactory>,
input: ExecInput,
) -> Result<ExecOutput, StageError>
where
Provider:
DBProvider + BlockReader + StaticFileProviderFactory + StatsReader + StateChangeWriter,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a>>: StateWriter,
{
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
@@ -218,16 +346,29 @@ where
None
};

let db = StateProviderDatabase(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider(),
));
let mut executor =
if let Some(parallel_provider) = self.executor_provider.try_into_parallel_provider() {
EitherBatchExecutor::Parallel(parallel_provider.batch_executor(Arc::new(db)))
let mut executor = if let Some(parallel_provider) =
self.executor_provider.try_into_parallel_provider()
{
let db: Arc<dyn StateProvider> = if let Some(factory) = factory {
Arc::new(
factory.latest(StateProviderOptions { parallel: NonZero::new(8).unwrap() })?,
)
} else {
EitherBatchExecutor::Sequential(self.executor_provider.batch_executor(db))
Arc::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider(),
))
};
EitherBatchExecutor::Parallel(
parallel_provider.batch_executor(StateProviderDatabase(db)),
)
} else {
let db = StateProviderDatabase(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider(),
));
EitherBatchExecutor::Sequential(self.executor_provider.batch_executor(db))
};
executor.set_tip(max_block);
executor.set_prune_modes(prune_modes);

@@ -383,105 +524,6 @@ where
done,
})
}

fn post_execute_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_execute_commit_input.take() else { return Ok(()) };

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainCommitted { new: Arc::new(chain) });

Ok(())
}

/// Unwind the stage.
fn unwind(
&mut self,
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}

// Unwind account and storage changesets, as well as receipts.
//
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.take_state(range.clone())?;

// Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range.
let blocks = provider.sealed_block_with_senders_range(range.clone())?;
let previous_input = self.post_unwind_commit_input.replace(Chain::new(
blocks,
bundle_state_with_receipts,
None,
));

debug_assert!(
previous_input.is_none(),
"Previous post unwind commit input wasn't processed"
);
if let Some(previous_input) = previous_input {
tracing::debug!(target: "sync::stages::execution", ?previous_input, "Previous post unwind commit input wasn't processed");
}
}

let static_file_provider = provider.static_file_provider();

// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.

// prepare_static_file_producer does a consistency check that will unwind static files
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer =
prepare_static_file_producer(provider, &static_file_provider, *range.start())?;
} else {
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
//
// If we hit this case, the receipts have already been unwound by the call to
// `take_state`.
}

// Update the checkpoint.
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_to)
};

Ok(UnwindOutput { checkpoint })
}

fn post_unwind_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_unwind_commit_input.take() else { return Ok(()) };

// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ =
self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });

Ok(())
}
}

fn execution_checkpoint(
(Remaining hunks of execution.rs and the diffs for the other 5 changed files are not shown.)
