Skip to content

Commit

Permalink
propagate changes in all libs and binaries
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 28, 2024
1 parent c56fc6b commit 55834a1
Show file tree
Hide file tree
Showing 26 changed files with 121 additions and 87 deletions.
20 changes: 9 additions & 11 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{ChunkStoreHandle, ColumnDescriptor, QueryExpression};
use re_log_types::EntityPathFilter;
use re_query::QueryCache;
use re_query::QueryCacheHandle;

use crate::QueryHandle;

Expand All @@ -15,26 +15,24 @@ use re_chunk_store::ComponentColumnDescriptor;
// `TransportChunk` type until we migrate to `arrow-rs`.
// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime)
// will be trivial.
// TODO(cmc): add an `arrow` feature to `TransportChunk` in a follow-up PR and call it a day.
pub type RecordBatch = TransportChunk;

// --- Queries ---

/// A handle to our user-facing query engine.
///
/// Cheap to clone.
///
/// See the following methods:
/// * [`QueryEngine::schema`]: get the complete schema of the recording.
/// * [`QueryEngine::query`]: execute a [`QueryExpression`] on the recording.
//
// TODO(cmc): This needs to be a refcounted type that can easily be passed around: the ref has
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
#[derive(Clone)]
pub struct QueryEngine {
pub store: ChunkStoreHandle,
pub cache: &'a QueryCache,
pub cache: QueryCacheHandle,
}

impl QueryEngine<'_> {
impl QueryEngine {
/// Returns the full schema of the store.
///
/// This will include a column descriptor for every timeline and every component on every
Expand All @@ -60,8 +58,8 @@ impl QueryEngine<'_> {

/// Starts a new query by instantiating a [`QueryHandle`].
#[inline]
pub fn query(&self, query: QueryExpression) -> QueryHandle<'_> {
QueryHandle::new(self, query)
pub fn query(&self, query: QueryExpression) -> QueryHandle {
QueryHandle::new(self.clone(), query)
}

/// Returns an iterator over all the [`EntityPath`]s present in the database.
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk};
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue,
QueryExpression, SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
};
#[doc(no_inline)]
pub use self::external::re_log_types::{
EntityPath, EntityPathFilter, ResolvedTimeRange, TimeInt, Timeline,
};
#[doc(no_inline)]
pub use self::external::re_query::QueryCache;
pub use self::external::re_query::{QueryCache, QueryCacheHandle};

pub mod external {
pub use re_chunk;
Expand Down
38 changes: 21 additions & 17 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ use crate::{QueryEngine, RecordBatch};
/// Cheaply created via [`QueryEngine::query`].
///
/// See [`QueryHandle::next_row`] or [`QueryHandle::into_iter`].
pub struct QueryHandle<'a> {
pub struct QueryHandle {
/// Handle to the [`QueryEngine`].
pub(crate) engine: &'a QueryEngine<'a>,
pub(crate) engine: QueryEngine,

/// The original query expression used to instantiate this handle.
pub(crate) query: QueryExpression,
Expand Down Expand Up @@ -140,8 +140,8 @@ struct QueryHandleState {
unique_index_values: Vec<IndexValue>,
}

impl<'a> QueryHandle<'a> {
pub(crate) fn new(engine: &'a QueryEngine<'a>, query: QueryExpression) -> Self {
impl QueryHandle {
pub(crate) fn new(engine: QueryEngine, query: QueryExpression) -> Self {
Self {
engine,
query,
Expand All @@ -150,7 +150,7 @@ impl<'a> QueryHandle<'a> {
}
}

impl QueryHandle<'_> {
impl QueryHandle {
/// Lazily initialize internal private state.
///
/// It is important that query handles stay cheap to create.
Expand All @@ -163,6 +163,7 @@ impl QueryHandle<'_> {
re_tracing::profile_scope!("init");

let store = self.engine.store.read();
let query_cache = self.engine.cache.read();

// The timeline doesn't matter if we're running in static-only mode.
let filtered_index = self.query.filtered_index.unwrap_or_default();
Expand Down Expand Up @@ -328,7 +329,7 @@ impl QueryHandle<'_> {
let query =
re_chunk::LatestAtQuery::new(Timeline::default(), TimeInt::STATIC);

let results = self.engine.cache.latest_at(
let results = query_cache.latest_at(
&query,
&descr.entity_path,
[descr.component_name],
Expand Down Expand Up @@ -587,7 +588,11 @@ impl QueryHandle<'_> {
//
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
// not so much in an SDK context. Make it configurable.
let results = self.engine.cache.range(query, entity_path, component_names);
let results = self
.engine
.cache
.read()
.range(query, entity_path, component_names);

debug_assert!(
results.components.len() <= 1,
Expand Down Expand Up @@ -793,6 +798,8 @@ impl QueryHandle<'_> {
pub fn next_row(&self) -> Option<Vec<Box<dyn ArrowArray>>> {
re_tracing::profile_function!();

let query_cache = self.engine.cache.read();

/// Temporary state used to resolve the streaming join for the current iteration.
#[derive(Debug)]
struct StreamingJoinStateEntry<'a> {
Expand Down Expand Up @@ -994,11 +1001,8 @@ impl QueryHandle<'_> {
let query =
re_chunk::LatestAtQuery::new(state.filtered_index, *cur_index_value);

let results = self.engine.cache.latest_at(
&query,
&descr.entity_path,
[descr.component_name],
);
let results =
query_cache.latest_at(&query, &descr.entity_path, [descr.component_name]);

*streaming_state = results
.components
Expand Down Expand Up @@ -1169,28 +1173,28 @@ impl QueryHandle<'_> {
}
}

impl<'a> QueryHandle<'a> {
impl QueryHandle {
/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn iter(&'a self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> + 'a {
pub fn iter(&self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> + '_ {
std::iter::from_fn(move || self.next_row())
}

/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> + 'a {
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn ArrowArray>>> {
std::iter::from_fn(move || self.next_row())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn batch_iter(&'a self) -> impl Iterator<Item = RecordBatch> + 'a {
pub fn batch_iter(&self) -> impl Iterator<Item = RecordBatch> + '_ {
std::iter::from_fn(move || self.next_row_batch())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_batch_iter(self) -> impl Iterator<Item = RecordBatch> + 'a {
pub fn into_batch_iter(self) -> impl Iterator<Item = RecordBatch> {
std::iter::from_fn(move || self.next_row_batch())
}
}
Expand Down
23 changes: 15 additions & 8 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use re_log_types::{
ApplicationId, EntityPath, EntityPathHash, LogMsg, ResolvedTimeRange, ResolvedTimeRangeF,
SetStoreInfo, StoreId, StoreInfo, StoreKind, Timeline,
};
use re_query::{QueryCache, QueryCacheHandle};

use crate::{Error, TimesPerTimeline};

Expand Down Expand Up @@ -65,7 +66,7 @@ pub struct EntityDb {
data_store: ChunkStoreHandle,

/// Query caches for the data in [`Self::data_store`].
query_caches: re_query::QueryCache,
query_caches: QueryCacheHandle,

stats: IngestionStatistics,
}
Expand All @@ -77,7 +78,7 @@ impl EntityDb {

pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
let data_store = ChunkStoreHandle::new(ChunkStore::new(store_id.clone(), store_config));
let query_caches = re_query::QueryCache::new(data_store.clone());
let query_caches = QueryCacheHandle::new(QueryCache::new(data_store.clone()));

Self {
data_source: None,
Expand Down Expand Up @@ -112,14 +113,15 @@ impl EntityDb {
}

#[inline]
pub fn query_caches(&self) -> &re_query::QueryCache {
pub fn query_caches(&self) -> &QueryCacheHandle {
&self.query_caches
}

pub fn query_engine(&self) -> re_dataframe::QueryEngine<'_> {
#[inline]
pub fn query_engine(&self) -> re_dataframe::QueryEngine {
re_dataframe::QueryEngine {
store: self.store().clone(),
cache: self.query_caches(),
cache: self.query_caches().clone(),
}
}

Expand All @@ -136,6 +138,7 @@ impl EntityDb {
component_names: impl IntoIterator<Item = re_types_core::ComponentName>,
) -> re_query::LatestAtResults {
self.query_caches()
.read()
.latest_at(query, entity_path, component_names)
}

Expand All @@ -155,6 +158,7 @@ impl EntityDb {
) -> Option<((TimeInt, RowId), C)> {
let results = self
.query_caches()
.read()
.latest_at(query, entity_path, [C::name()]);
results
.component_mono()
Expand All @@ -177,6 +181,7 @@ impl EntityDb {
) -> Option<((TimeInt, RowId), C)> {
let results = self
.query_caches()
.read()
.latest_at(query, entity_path, [C::name()]);
results
.component_mono_quiet()
Expand Down Expand Up @@ -360,7 +365,7 @@ impl EntityDb {
// Update our internal views by notifying them of resulting [`ChunkStoreEvent`]s.
self.times_per_timeline.on_events(&store_events);
self.time_histogram_per_timeline.on_events(&store_events);
self.query_caches.on_events(&store_events);
self.query_caches.write().on_events(&store_events);
self.tree.on_store_additions(&store_events);

// It is possible for writes to trigger deletions: specifically in the case of
Expand Down Expand Up @@ -421,7 +426,9 @@ impl EntityDb {
// to regain some space.
// See <https://github.com/rerun-io/rerun/issues/7369#issuecomment-2335164098> for the
// complete rationale.
self.query_caches.purge_fraction_of_ram(fraction_to_purge);
self.query_caches
.write()
.purge_fraction_of_ram(fraction_to_purge);
}

store_events
Expand Down Expand Up @@ -499,7 +506,7 @@ impl EntityDb {
re_tracing::profile_function!();

self.times_per_timeline.on_events(store_events);
self.query_caches.on_events(store_events);
self.query_caches.write().on_events(store_events);
self.time_histogram_per_timeline.on_events(store_events);
self.tree.on_store_deletions(store, store_events);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub mod dataframe {
/// Everything needed to build custom `ChunkStoreSubscriber`s.
pub use re_entity_db::external::re_chunk_store::{
ChunkStore, ChunkStoreConfig, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent,
ChunkStoreGeneration, ChunkStoreSubscriber, VersionPolicy,
ChunkStoreGeneration, ChunkStoreHandle, ChunkStoreSubscriber, VersionPolicy,
};
pub use re_log_types::StoreKind;

Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_data_ui/src/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl DataUi for ComponentPath {
} else {
let results = db
.query_caches()
.read()
.latest_at(query, entity_path, [*component_name]);
if let Some(unit) = results.components.get(component_name) {
crate::ComponentPathLatestAtResults {
Expand Down
7 changes: 4 additions & 3 deletions crates/viewer/re_data_ui/src/instance_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ fn latest_at(
let components: Vec<(ComponentName, UnitChunkShared)> = components
.iter()
.filter_map(|&component_name| {
let mut results = db
.query_caches()
.latest_at(query, entity_path, [component_name]);
let mut results =
db.query_caches()
.read()
.latest_at(query, entity_path, [component_name]);

// We ignore components that are unset at this point in time
results
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_selection_panel/src/defaults_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ fn active_defaults(
.into_iter()
.filter(|c| {
db.query_caches()
.read()
.latest_at(query, &view.defaults_path, [*c])
.component_batch_raw(c)
.map_or(false, |data| !data.is_empty())
Expand Down
Loading

0 comments on commit 55834a1

Please sign in to comment.