Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Support merklization pipeline #20

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,14 @@ mod tests {
) -> ProviderResult<(B256, TrieUpdates)> {
Ok((B256::random(), TrieUpdates::default()))
}

fn state_root_with_updates_v2(&self,
state:HashedPostState,
hashed_state_vec: Vec<Arc<HashedPostState>>,
trie_updates_vec: Vec<Arc<TrieUpdates>>,
) -> ProviderResult<(B256,TrieUpdates)> {
Ok((B256::random(), TrieUpdates::default()))
}
}

impl StorageRootProvider for MockStateProvider {
Expand Down
21 changes: 20 additions & 1 deletion crates/chain-state/src/memory_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_trie::{
};
use std::{
collections::{HashMap, HashSet},
sync::OnceLock,
sync::{Arc, OnceLock},
};

/// A state provider that stores references to in-memory blocks along with their state as well as
Expand Down Expand Up @@ -119,6 +119,25 @@ impl StateRootProvider for MemoryOverlayStateProvider {
self.state_root_from_nodes_with_updates(TrieInput::from_state(state))
}

fn state_root_with_updates_v2(
&self,
state: HashedPostState,
hashed_state_vec: Vec<Arc<HashedPostState>>,
trie_updates_vec: Vec<Arc<TrieUpdates>>,
) -> ProviderResult<(B256, TrieUpdates)> {
let mut input = TrieInput::from_state(state);
let mut trie_state = MemoryOverlayTrieState::default();
hashed_state_vec.iter().for_each(|hashed_state| {
trie_state.state.extend_ref(hashed_state.as_ref());
});
trie_updates_vec.iter().for_each(|trie_updates| {
trie_state.nodes.extend_ref(trie_updates.as_ref());
});
let MemoryOverlayTrieState { nodes, state } = trie_state;
input.prepend_cached(nodes, state);
self.state_root_from_nodes_with_updates(input)
}

fn state_root_from_nodes_with_updates(
&self,
mut input: TrieInput,
Expand Down
96 changes: 40 additions & 56 deletions crates/gravity-storage/src/block_view_storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use async_trait::async_trait;
use reth_payload_builder::database::CachedReads;
use reth_primitives::{revm_primitives::Bytecode, Address, B256, U256};
use reth_revm::database::StateProviderDatabase;
use reth_storage_api::{errors::provider::ProviderError, StateProviderBox, StateProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::{db::BundleState, primitives::AccountInfo, DatabaseRef};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use std::{clone, collections::BTreeMap, sync::{Arc, Mutex}};

use crate::{GravityStorage, GravityStorageError};

Expand All @@ -18,9 +16,8 @@ pub struct BlockViewStorage<Client> {
struct BlockViewStorageInner {
state_provider_info: (B256, u64), // (block_hash, block_number),
block_number_to_view: BTreeMap<u64, Arc<CachedReads>>,
block_number_to_state: BTreeMap<u64, HashedPostState>,
block_number_to_trip_updates: BTreeMap<u64, TrieUpdates>,
block_number_to_hash: BTreeMap<u64, B256>,
block_number_to_state: BTreeMap<u64, Arc<HashedPostState>>,
block_number_to_trip_updates: BTreeMap<u64, Arc<TrieUpdates>>,
block_number_to_id: BTreeMap<u64, B256>,
}

Expand All @@ -37,40 +34,37 @@ fn get_state_provider<Client: StateProviderFactory + 'static>(
}

impl<Client: StateProviderFactory + 'static> BlockViewStorage<Client> {
pub fn new(client: Client, block_number: u64, block_hash: B256) -> Self {
Self { client, inner: Mutex::new(BlockViewStorageInner::new(block_number, block_hash)) }
pub fn new(client: Client, latest_block_number: u64, latest_block_hash: B256, block_number_to_id: BTreeMap<u64, B256>) -> Self {
Self { client, inner: Mutex::new(BlockViewStorageInner::new(latest_block_number, latest_block_hash, block_number_to_id)) }
}
}

impl BlockViewStorageInner {
fn new(block_number: u64, block_hash: B256) -> Self {
let mut res = Self {
fn new(block_number: u64, block_hash: B256, block_number_to_id: BTreeMap<u64, B256>) -> Self {
Self {
state_provider_info: (block_hash, block_number),
block_number_to_view: BTreeMap::new(),
block_number_to_state: BTreeMap::new(),
block_number_to_trip_updates: BTreeMap::new(),
block_number_to_hash: BTreeMap::new(),
block_number_to_id: BTreeMap::new(),
};
res.block_number_to_hash.insert(block_number, block_hash);
res
block_number_to_id: block_number_to_id,
}
}
}

#[async_trait]
impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage<Client> {
type StateView = BlockViewProvider;

async fn get_state_view(
fn get_state_view(
&self,
target_block_number: u64,
) -> Result<(B256, Self::StateView), GravityStorageError> {
let storage = self.inner.lock().await;
let storage = self.inner.lock().unwrap();
if target_block_number == storage.state_provider_info.1 {
return Ok((
B256::ZERO,
*storage.block_number_to_id.get(&target_block_number).unwrap(),
BlockViewProvider::new(
vec![],
storage.block_number_to_id.clone(),
get_state_provider(&self.client, storage.state_provider_info.0)?,
),
));
Expand All @@ -89,11 +83,11 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
let block_hash = storage.state_provider_info.0;
Ok((
block_id,
BlockViewProvider::new(block_views, get_state_provider(&self.client, block_hash)?),
BlockViewProvider::new(block_views, storage.block_number_to_id.clone(), get_state_provider(&self.client, block_hash)?),
))
}

async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) {
fn insert_bundle_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState) {
let mut cached = CachedReads::default();
for (addr, acc) in bundle_state.state().iter().map(|(a, acc)| (*a, acc)) {
if let Some(info) = acc.info.clone() {
Expand All @@ -104,58 +98,53 @@ impl<Client: StateProviderFactory + 'static> GravityStorage for BlockViewStorage
cached.insert_account(addr, info, storage);
}
}
let mut storage = self.inner.lock().await;
let mut storage = self.inner.lock().unwrap();
storage.block_number_to_view.insert(block_number, Arc::new(cached));
storage
.block_number_to_state
.insert(block_number, HashedPostState::from_bundle_state(&bundle_state.state));
.insert(block_number, Arc::new(HashedPostState::from_bundle_state(&bundle_state.state)));
storage.block_number_to_id.insert(block_number, block_id);
}

async fn insert_block_hash(&self, block_number: u64, block_hash: B256) {
let mut storage = self.inner.lock().await;
storage.block_number_to_hash.insert(block_number, block_hash);
}

async fn block_hash_by_number(&self, block_number: u64) -> Result<B256, GravityStorageError> {
let storage = self.inner.lock().await;
match storage.block_number_to_hash.get(&block_number) {
Some(block_hash) => Ok(*block_hash),
None => Err(GravityStorageError::TooNew(block_number)),
}
}

async fn update_canonical(&self, block_number: u64) {
let mut storage = self.inner.lock().await;
fn update_canonical(&self, block_number: u64, block_hash: B256) {
let mut storage = self.inner.lock().unwrap();
assert!(block_number > storage.state_provider_info.1);
let gc_block_number = storage.state_provider_info.1;
let block_hash = *storage.block_number_to_hash.get(&block_number).unwrap();
storage.state_provider_info = (block_hash, block_number);
storage.block_number_to_view.remove(&gc_block_number);
storage.block_number_to_hash.remove(&gc_block_number);
storage.block_number_to_id.remove(&gc_block_number);
storage.block_number_to_state.remove(&gc_block_number);
storage.block_number_to_trip_updates.remove(&gc_block_number);
}

async fn state_root_with_updates(
fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> Result<(B256, TrieUpdates), GravityStorageError> {
let block_hash = self.block_hash_by_number(block_number - 1).await?;
let state_provider = get_state_provider(&self.client, block_hash)?;
let hashed_state = HashedPostState::from_bundle_state(&bundle_state.state);
Ok(state_provider.state_root_with_updates(hashed_state).unwrap())
) -> Result<(B256, Arc<HashedPostState>, Arc<TrieUpdates>), GravityStorageError> {
let mut storage = self.inner.lock().unwrap();
let state_provider = get_state_provider(&self.client, storage.state_provider_info.0)?;
let mut hashed_state_vec = vec![];
let mut trie_updates_vec = vec![];
for number in storage.state_provider_info.1..block_number {
hashed_state_vec.push(storage.block_number_to_state.get(&number).unwrap().clone());
trie_updates_vec.push(storage.block_number_to_trip_updates.get(&number).unwrap().clone());
}
let hashed_state = storage.block_number_to_state.get(&block_number).unwrap().clone();
let (state_root, trie_updates) = state_provider.state_root_with_updates_v2(hashed_state.as_ref().clone(), hashed_state_vec, trie_updates_vec).unwrap();
let trie_updates = Arc::new(trie_updates);
storage.block_number_to_trip_updates.insert(block_number, trie_updates.clone());
Ok((state_root, hashed_state, trie_updates))
}
}

pub struct BlockViewProvider {
block_views: Vec<Arc<CachedReads>>,
block_number_to_id: BTreeMap<u64, B256>,
db: StateProviderDatabase<StateProviderBox>,
}

impl BlockViewProvider {
fn new(block_views: Vec<Arc<CachedReads>>, state_provider: StateProviderBox) -> Self {
Self { block_views, db: StateProviderDatabase::new(state_provider) }
fn new(block_views: Vec<Arc<CachedReads>>, block_number_to_id: BTreeMap<u64, B256>, state_provider: StateProviderBox) -> Self {
Self { block_views, block_number_to_id, db: StateProviderDatabase::new(state_provider) }
}
}

Expand Down Expand Up @@ -192,11 +181,6 @@ impl DatabaseRef for BlockViewProvider {
}

fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
for block_view in &self.block_views {
if let Some(hash) = block_view.block_hashes.get(&number) {
return Ok(*hash);
}
}
Ok(self.db.block_hash_ref(number)?)
Ok(*self.block_number_to_id.get(&number).unwrap())
}
}
19 changes: 6 additions & 13 deletions crates/gravity-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ pub mod block_view_storage;

use std::sync::Arc;

use async_trait::async_trait;
use reth_primitives::B256;
use reth_revm::DatabaseRef;
use reth_storage_api::errors::provider::ProviderError;
use reth_trie::updates::TrieUpdates;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use revm::db::BundleState;

use thiserror::Error;
Expand Down Expand Up @@ -36,26 +35,20 @@ impl std::fmt::Display for GravityStorageError {
}
}

#[async_trait]
pub trait GravityStorage: Send + Sync + 'static {
type StateView: DatabaseRef<Error = ProviderError>;

async fn get_state_view(
fn get_state_view(
&self,
block_number: u64,
) -> Result<(B256, Self::StateView), GravityStorageError>;

async fn commit_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState);
fn insert_bundle_state(&self, block_id: B256, block_number: u64, bundle_state: &BundleState);

async fn insert_block_hash(&self, block_number: u64, block_hash: B256);
fn update_canonical(&self, block_number: u64, block_hash: B256); // gc

async fn block_hash_by_number(&self, block_number: u64) -> Result<B256, GravityStorageError>;

async fn update_canonical(&self, block_number: u64); // gc

async fn state_root_with_updates(
fn state_root_with_updates(
&self,
block_number: u64,
bundle_state: &BundleState,
) -> Result<(B256, TrieUpdates), GravityStorageError>;
) -> Result<(B256, Arc<HashedPostState>, Arc<TrieUpdates>), GravityStorageError>;
}
28 changes: 13 additions & 15 deletions crates/pipe-exec-layer-ext-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,31 @@ impl<Storage: GravityStorage> Core<Storage> {
let parent_block_header = self.execute_block_barrier.wait(block_number - 1).await.unwrap();
let (mut block, outcome) =
self.execute_ordered_block(ordered_block, &parent_block_header).await;
self.storage.insert_bundle_state(block_id, block_number, &outcome.state);
self.execute_block_barrier.notify(block_number, block.header.clone()).await.unwrap();

let execution_outcome = self.calculate_roots(&mut block, outcome);

let parent_hash = self.make_canonical_barrier.wait(block_number - 1).await.unwrap();
block.header.parent_hash = parent_hash;

// Merkling the state trie
let (state_root, trie_output) = self
let (state_root, hashed_state, trie_output) = self
.storage
.state_root_with_updates(block_number, &execution_outcome.state())
.await
.state_root_with_updates(block_number)
.unwrap();
block.header.state_root = state_root;

let parent_hash = self.make_canonical_barrier.wait(block_number - 1).await.unwrap();
block.header.parent_hash = parent_hash;

// Seal the block
let block = block.seal_slow();
let block_hash = block.hash();

// Commit the executed block hash to Coordinator
self.verify_executed_block_hash(ExecutedBlockMeta { block_id, block_hash }).await.unwrap();
self.storage.insert_block_hash(block_number, block_hash).await;

// Make the block canonical
self.make_canonical(block, execution_outcome, trie_output).await;
self.make_canonical(block, execution_outcome, hashed_state, trie_output).await;
self.storage.update_canonical(block_number, block_hash);
self.make_canonical_barrier.notify(block_number, block_hash).await;
}

Expand Down Expand Up @@ -304,7 +304,7 @@ impl<Storage: GravityStorage> Core<Storage> {
block.header.blob_gas_used = Some(blob_gas_used);
}

let (block_id, state) = self.storage.get_state_view(block.number - 1).await.unwrap();
let (block_id, state) = self.storage.get_state_view(block.number - 1).unwrap();
assert_eq!(block_id, ordered_block.id);
let db = State::builder().with_database_ref(state).with_bundle_update().build();

Expand Down Expand Up @@ -369,16 +369,16 @@ impl<Storage: GravityStorage> Core<Storage> {
&self,
sealed_block: SealedBlockWithSenders,
execution_outcome: ExecutionOutcome,
trie_output: TrieUpdates,
hashed_state: Arc<HashedPostState>,
trie_output: Arc<TrieUpdates>,
) {
// create the executed block data
let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state);
let executed_block = ExecutedBlock {
block: Arc::new(sealed_block.block),
senders: Arc::new(sealed_block.senders),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
trie: Arc::new(trie_output),
hashed_state: hashed_state,
trie: trie_output,
};

let block_number = executed_block.block.number;
Expand All @@ -400,8 +400,6 @@ impl<Storage: GravityStorage> Core<Storage> {
rx.await.unwrap();

debug!(target: "make_canonical", block_number=?block_number, "block made canonical");

self.storage.update_canonical(block_number).await;
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/rpc/rpc-eth-types/src/cache/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use reth_revm::{database::StateProviderDatabase, db::CacheDB, DatabaseRef};
use reth_storage_api::StateProvider;
use reth_trie::HashedStorage;
use revm::Database;
use std::sync::Arc;

/// Helper alias type for the state's [`CacheDB`]
pub type StateCacheDb<'a> = CacheDB<StateProviderDatabase<StateProviderTraitObjWrapper<'a>>>;
Expand Down Expand Up @@ -45,6 +46,15 @@ impl<'a> reth_storage_api::StateRootProvider for StateProviderTraitObjWrapper<'a
) -> reth_errors::ProviderResult<(B256, reth_trie::updates::TrieUpdates)> {
self.0.state_root_from_nodes_with_updates(input)
}

fn state_root_with_updates_v2(
&self,
state: reth_trie::HashedPostState,
hashed_state_vec: Vec<Arc<reth_trie::HashedPostState>>,
trie_updates_vec:Vec<Arc<reth_trie::updates::TrieUpdates>>,
) -> ProviderResult<(B256, reth_trie::updates::TrieUpdates)> {
todo!()
}
}

impl<'a> reth_storage_api::StorageRootProvider for StateProviderTraitObjWrapper<'a> {
Expand Down
Loading
Loading