Skip to content

Commit

Permalink
[improve] Support merklization pipeline (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang authored Jan 3, 2025
1 parent 3233832 commit 491fdb4
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 89 deletions.
8 changes: 8 additions & 0 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,14 @@ mod tests {
) -> ProviderResult<(B256, TrieUpdates)> {
Ok((B256::random(), TrieUpdates::default()))
}

fn state_root_with_updates_v2(&self,
state:HashedPostState,
hashed_state_vec: Vec<Arc<HashedPostState>>,
trie_updates_vec: Vec<Arc<TrieUpdates>>,
) -> ProviderResult<(B256,TrieUpdates)> {
Ok((B256::random(), TrieUpdates::default()))
}
}

impl StorageRootProvider for MockStateProvider {
Expand Down
21 changes: 20 additions & 1 deletion crates/chain-state/src/memory_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_trie::{
};
use std::{
collections::{HashMap, HashSet},
sync::OnceLock,
sync::{Arc, OnceLock},
};

/// A state provider that stores references to in-memory blocks along with their state as well as
Expand Down Expand Up @@ -119,6 +119,25 @@ impl StateRootProvider for MemoryOverlayStateProvider {
self.state_root_from_nodes_with_updates(TrieInput::from_state(state))
}

fn state_root_with_updates_v2(
&self,
state: HashedPostState,
hashed_state_vec: Vec<Arc<HashedPostState>>,
trie_updates_vec: Vec<Arc<TrieUpdates>>,
) -> ProviderResult<(B256, TrieUpdates)> {
let mut input = TrieInput::from_state(state);
let mut trie_state = MemoryOverlayTrieState::default();
hashed_state_vec.iter().for_each(|hashed_state| {
trie_state.state.extend_ref(hashed_state.as_ref());
});
trie_updates_vec.iter().for_each(|trie_updates| {
trie_state.nodes.extend_ref(trie_updates.as_ref());
});
let MemoryOverlayTrieState { nodes, state } = trie_state;
input.prepend_cached(nodes, state);
self.state_root_from_nodes_with_updates(input)
}

fn state_root_from_nodes_with_updates(
&self,
mut input: TrieInput,
Expand Down
96 changes: 40 additions & 56 deletions crates/gravity-storage/src/block_view_storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use async_trait::async_trait;
use reth_payload_builder::database::CachedReads;
use reth_primitives::{revm_primitives::Bytecode, Address, B256, U256};
use reth_revm::database::StateProviderDatabase;
use reth_storage_api::{errors::provider::ProviderError, StateProviderBox, StateProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::{db::BundleState, primitives::AccountInfo, DatabaseRef};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use std::{clone, collections::BTreeMap, sync::{Arc, Mutex}};

use crate::{GravityStorage, GravityStorageError};

Expand All @@ -18,9 +16,8 @@ pub struct BlockViewStorage<Client> {
struct BlockViewStorageInner {
state_provider_info: (B256, u64), // (block_hash, block_number),
block_number_to_view: BTreeMap<u64, Arc<CachedReads>>,
block_number_to_state: BTreeMap<u64, HashedPostState>,
block_number_to_trip_updates: BTreeMap<u64, TrieUpdates>,
block_number_to_hash: BTreeMap<u64, B256>,
block_number_to_state: BTreeMap<u64, Arc<HashedPostState>>,
block_number_to_trip_updates: BTreeMap<u64, Arc<TrieUpdates>>,
block_number_to_id: BTreeMap<u64, B256>,
}

Expand All @@ -37,40 +34,37 @@ fn get_state_provider<Client: StateProviderFactory + 'static>(
}

impl<Client: StateProviderFactory + 'static> BlockViewStorage<Client> {
pub fn new(client: Client, block_number: u64, block_hash: B256) -> Self {
Self { client, inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)) }
pub fn new(client: Client, latest_block_number: u64, latest_block_hash: B256, block_number_to_id: BTreeMap<u64, B256>) -> Self {
Self { client, inner: Mutex::new(BlockViewStorageInner::new(latest_block_number, latest_block_hash, block_number_to_id)) }
}
}

impl BlockViewStorageInner {
fn new(block_number: u64, block_hash: B256) -> Self {
let mut res = Self {
fn new(block_number: u64, block_hash: B256, block_number_to_id: BTreeMap<u64, B256>) -> Self {
Self {
state_provider_info: (block_hash, block_number),
block_number_to_view: BTreeMap::new(),
block_number_to_state: BTreeMap::new(),
block_number_to_trip_updates: BTreeMap::new(),
block_number_to_hash: BTreeMap::new(),
block_number_to_id: BTreeMap::new(),
};
res.block_number_to_hash.insert(block_number, block_hash);
res
block_number_to_id: block_number_to_id,
}
}
}

#[async_trait]
impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage<Client> {
type StateView = BlockViewProvider;

async fn get_state_view(
fn get_state_view(
&self,
target_block_number: u64,
) -> Result<(B256, Self::StateView), GravityStorageError> {
let storage = self.inner.lock().await;
let storage = self.inner.lock().unwrap();
if target_block_number == storage.state_provider_info.1 {
return Ok((
B256::ZERO,
*storage.block_number_to_id.get(&target_block_number).unwrap(),
BlockViewProvider::new(
vec![],
storage.block_number_to_id.clone(),
get_state_provider(&self.client, storage.state_provider_info.0)?,
),
));
Expand All @@ -89,11 +83,11 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
let block_hash = storage.state_provider_info.0;
Ok((
block_id,
BlockViewProvider::new(block_views, get_state_provider(&self.client, block_hash)?),
BlockViewProvider::new(block_views, storage.block_number_to_id.clone(), get_state_provider(&self.client, block_hash)?),
))
}

async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) {
fn insert_bundle_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) {
let mut cached = CachedReads::default();
for (addr, acc) in bundle_state.state().iter().map(|(a, acc)| (*a, acc)) {
if let Some(info) = acc.info.clone() {
Expand All @@ -104,58 +98,53 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
cached.insert_account(addr, info, storage);
}
}
let mut storage = self.inner.lock().await;
let mut storage = self.inner.lock().unwrap();
storage.block_number_to_view.insert(block_number, Arc::new(cached));
storage
.block_number_to_state
.insert(block_number, HashedPostState::from_bundle_state(&bundle_state.state));
.insert(block_number, Arc::new(HashedPostState::from_bundle_state(&bundle_state.state)));
storage.block_number_to_id.insert(block_number, block_id);
}

async fn insert_block_hash(&self, block_number: u64, block_hash: B256) {
let mut storage = self.inner.lock().await;
storage.block_number_to_hash.insert(block_number, block_hash);
}

async fn block_hash_by_number(&self, block_number: u64) -> Result<B256, GravityStorageError> {
let storage = self.inner.lock().await;
match storage.block_number_to_hash.get(&block_number) {
Some(block_hash) => Ok(*block_hash),
None => Err(GravityStorageError::TooNew(block_number)),
}
}

async fn update_canonical(&self, block_number: u64) {
let mut storage = self.inner.lock().await;
fn update_canonical(&self, block_number: u64, block_hash: B256) {
let mut storage = self.inner.lock().unwrap();
assert!(block_number > storage.state_provider_info.1);
let gc_block_number = storage.state_provider_info.1;
let block_hash = *storage.block_number_to_hash.get(&block_number).unwrap();
storage.state_provider_info = (block_hash, block_number);
storage.block_number_to_view.remove(&gc_block_number);
storage.block_number_to_hash.remove(&gc_block_number);
storage.block_number_to_id.remove(&gc_block_number);
storage.block_number_to_state.remove(&gc_block_number);
storage.block_number_to_trip_updates.remove(&gc_block_number);
}

async fn state_root_with_updates(
fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> Result<(B256, TrieUpdates), GravityStorageError> {
let block_hash = self.block_hash_by_number(block_number - 1).await?;
let state_provider = get_state_provider(&self.client, block_hash)?;
let hashed_state = HashedPostState::from_bundle_state(&bundle_state.state);
Ok(state_provider.state_root_with_updates(hashed_state).unwrap())
) -> Result<(B256, Arc<HashedPostState>, Arc<TrieUpdates>), GravityStorageError> {
let mut storage = self.inner.lock().unwrap();
let state_provider = get_state_provider(&self.client, storage.state_provider_info.0)?;
let mut hashed_state_vec = vec![];
let mut trie_updates_vec = vec![];
for number in storage.state_provider_info.1..block_number {
hashed_state_vec.push(storage.block_number_to_state.get(&number).unwrap().clone());
trie_updates_vec.push(storage.block_number_to_trip_updates.get(&number).unwrap().clone());
}
let hashed_state = storage.block_number_to_state.get(&block_number).unwrap().clone();
let (state_root, trie_updates) = state_provider.state_root_with_updates_v2(hashed_state.as_ref().clone(), hashed_state_vec, trie_updates_vec).unwrap();
let trie_updates = Arc::new(trie_updates);
storage.block_number_to_trip_updates.insert(block_number, trie_updates.clone());
Ok((state_root, hashed_state, trie_updates))
}
}

pub struct BlockViewProvider {
block_views: Vec<Arc<CachedReads>>,
block_number_to_id: BTreeMap<u64, B256>,
db: StateProviderDatabase<StateProviderBox>,
}

impl BlockViewProvider {
fn new(block_views: Vec<Arc<CachedReads>>, state_provider: StateProviderBox) -> Self {
Self { block_views, db: StateProviderDatabase::new(state_provider) }
fn new(block_views: Vec<Arc<CachedReads>>, block_number_to_id: BTreeMap<u64, B256>, state_provider: StateProviderBox) -> Self {
Self { block_views, block_number_to_id, db: StateProviderDatabase::new(state_provider) }
}
}

Expand Down Expand Up @@ -192,11 +181,6 @@ impl DatabaseRef for BlockViewProvider {
}

fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
for block_view in &self.block_views {
if let Some(hash) = block_view.block_hashes.get(&number) {
return Ok(*hash);
}
}
Ok(self.db.block_hash_ref(number)?)
Ok(*self.block_number_to_id.get(&number).unwrap())
}
}
19 changes: 6 additions & 13 deletions crates/gravity-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ pub mod block_view_storage;

use std::sync::Arc;

use async_trait::async_trait;
use reth_primitives::B256;
use reth_revm::DatabaseRef;
use reth_storage_api::errors::provider::ProviderError;
use reth_trie::updates::TrieUpdates;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::db::BundleState;

use thiserror::Error;
Expand Down Expand Up @@ -36,26 +35,20 @@ impl std::fmt::Display for GravityStorageError {
}
}

#[async_trait]
pub trait GravityStorage: Send + Sync + 'static {
type StateView: DatabaseRef<Error = ProviderError>;

async fn get_state_view(
fn get_state_view(
&self,
block_number: u64,
) -> Result<(B256, Self::StateView), GravityStorageError>;

async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState);
fn insert_bundle_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState);

async fn insert_block_hash(&self, block_number: u64, block_hash: B256);
fn update_canonical(&self, block_number: u64, block_hash: B256); // gc

async fn block_hash_by_number(&self, block_number: u64) -> Result<B256, GravityStorageError>;

async fn update_canonical(&self, block_number: u64); // gc

async fn state_root_with_updates(
fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> Result<(B256, TrieUpdates), GravityStorageError>;
) -> Result<(B256, Arc<HashedPostState>, Arc<TrieUpdates>), GravityStorageError>;
}
28 changes: 13 additions & 15 deletions crates/pipe-exec-layer-ext-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,31 @@ impl<Storage: GravityStorage> Core<Storage> {
let parent_block_header = self.execute_block_barrier.wait(block_number - 1).await.unwrap();
let (mut block, outcome) =
self.execute_ordered_block(ordered_block, &parent_block_header).await;
self.storage.insert_bundle_state(block_id, block_number, &outcome.state);
self.execute_block_barrier.notify(block_number, block.header.clone()).await.unwrap();

let execution_outcome = self.calculate_roots(&mut block, outcome);

let parent_hash = self.make_canonical_barrier.wait(block_number - 1).await.unwrap();
block.header.parent_hash = parent_hash;

// Merkling the state trie
let (state_root, trie_output) = self
let (state_root, hashed_state, trie_output) = self
.storage
.state_root_with_updates(block_number, &execution_outcome.state())
.await
.state_root_with_updates(block_number)
.unwrap();
block.header.state_root = state_root;

let parent_hash = self.make_canonical_barrier.wait(block_number - 1).await.unwrap();
block.header.parent_hash = parent_hash;

// Seal the block
let block = block.seal_slow();
let block_hash = block.hash();

// Commit the executed block hash to Coordinator
self.verify_executed_block_hash(ExecutedBlockMeta { block_id, block_hash }).await.unwrap();
self.storage.insert_block_hash(block_number, block_hash).await;

// Make the block canonical
self.make_canonical(block, execution_outcome, trie_output).await;
self.make_canonical(block, execution_outcome, hashed_state, trie_output).await;
self.storage.update_canonical(block_number, block_hash);
self.make_canonical_barrier.notify(block_number, block_hash).await;
}

Expand Down Expand Up @@ -304,7 +304,7 @@ impl<Storage: GravityStorage> Core<Storage> {
block.header.blob_gas_used = Some(blob_gas_used);
}

let (block_id, state) = self.storage.get_state_view(block.number - 1).await.unwrap();
let (block_id, state) = self.storage.get_state_view(block.number - 1).unwrap();
assert_eq!(block_id, ordered_block.id);
let db = State::builder().with_database_ref(state).with_bundle_update().build();

Expand Down Expand Up @@ -369,16 +369,16 @@ impl<Storage: GravityStorage> Core<Storage> {
&self,
sealed_block: SealedBlockWithSenders,
execution_outcome: ExecutionOutcome,
trie_output: TrieUpdates,
hashed_state: Arc<HashedPostState>,
trie_output: Arc<TrieUpdates>,
) {
// create the executed block data
let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state);
let executed_block = ExecutedBlock {
block: Arc::new(sealed_block.block),
senders: Arc::new(sealed_block.senders),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
trie: Arc::new(trie_output),
hashed_state: hashed_state,
trie: trie_output,
};

let block_number = executed_block.block.number;
Expand All @@ -400,8 +400,6 @@ impl<Storage: GravityStorage> Core<Storage> {
rx.await.unwrap();

debug!(target: "make_canonical", block_number=?block_number, "block made canonical");

self.storage.update_canonical(block_number).await;
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/rpc/rpc-eth-types/src/cache/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use reth_revm::{database::StateProviderDatabase, db::CacheDB, DatabaseRef};
use reth_storage_api::StateProvider;
use reth_trie::HashedStorage;
use revm::Database;
use std::sync::Arc;

/// Helper alias type for the state's [`CacheDB`]
pub type StateCacheDb<'a> = CacheDB<StateProviderDatabase<StateProviderTraitObjWrapper<'a>>>;
Expand Down Expand Up @@ -45,6 +46,15 @@ impl<'a> reth_storage_api::StateRootProvider for StateProviderTraitObjWrapper<'a
) -> reth_errors::ProviderResult<(B256, reth_trie::updates::TrieUpdates)> {
self.0.state_root_from_nodes_with_updates(input)
}

fn state_root_with_updates_v2(
&self,
state: reth_trie::HashedPostState,
hashed_state_vec: Vec<Arc<reth_trie::HashedPostState>>,
trie_updates_vec:Vec<Arc<reth_trie::updates::TrieUpdates>>,
) -> ProviderResult<(B256, reth_trie::updates::TrieUpdates)> {
todo!()
}
}

impl<'a> reth_storage_api::StorageRootProvider for StateProviderTraitObjWrapper<'a> {
Expand Down
Loading

0 comments on commit 491fdb4

Please sign in to comment.