From 504829971c41f0f97aba240b4de81e1965b87fd3 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 24 Jan 2025 11:32:27 +0100 Subject: [PATCH 01/10] WIP --- .../proto/rerun/v0/remote_store.proto | 46 ++++ .../re_protos/src/v0/rerun.remote_store.v0.rs | 226 ++++++++++++++++++ 2 files changed, 272 insertions(+) diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 86dbdd42bb31..256632d1d87a 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -9,6 +9,9 @@ service StorageNode { rpc Query(QueryRequest) returns (stream DataframePart) {} rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {} + rpc IndexCollection(IndexCollectionRequest) returns (IndexCollectionResponse) {} + rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (QueryCollectionIndexResponse) {} + // metadata API calls rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {} rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {} @@ -32,6 +35,49 @@ message DataframePart { bytes payload = 1000; } +// ---------------- IndexCollection ------------------ + +message IndexCollectionRequest { + // which collection do we want to create index for + string collection_name = 1; + // what kind of index do we want to create + IndexType index_type = 2; + // Component / column we want to index + rerun.common.v0.ComponentColumnDescriptor column = 3; +} + +enum IndexType { + // unused + _UNUSED_TYP = 0; + + // index type for full text search + INVERTED = 1; + + // index type for vector search + // TODO(zehiko) we can expose others when needed + VECTOR_IVF_PF = 2; + + // B-tree index suitable for range and sorted access queries + // on numerical, string and binary data + BTREE = 3; + + // TODO(zehiko) add others as needed +} + +message IndexCollectionResponse {} + + +// ---------------- QueryCollectionIndex ------------------ + +message QueryCollectionIndexRequest { + +} + +message QueryCollectionIndexResponse { + // TODO(zehiko) we need to define the response format +} + + // ---------------- GetRecordingSchema ------------------ message GetRecordingSchemaRequest { diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index 13a5f0d65df5..e667e4f9e273 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -20,6 +20,65 @@ impl ::prost::Name for DataframePart { } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexCollectionRequest { + /// which collection do we want to create index for + #[prost(string, tag = "1")] + pub collection_name: ::prost::alloc::string::String, + /// what kind of index do we want to create + #[prost(enumeration = "IndexType", tag = "2")] + pub index_type: i32, + /// Component / column we want to index + #[prost(message, optional, tag = "3")] + pub column: ::core::option::Option, +} +impl ::prost::Name for IndexCollectionRequest { + const NAME: &'static str = "IndexCollectionRequest"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.IndexCollectionRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.IndexCollectionRequest".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct IndexCollectionResponse {} +impl ::prost::Name for IndexCollectionResponse { + const NAME: &'static str = "IndexCollectionResponse"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.IndexCollectionResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.IndexCollectionResponse".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct QueryCollectionIndexRequest {} +impl ::prost::Name for QueryCollectionIndexRequest { + const NAME: &'static str = "QueryCollectionIndexRequest"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.QueryCollectionIndexRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.QueryCollectionIndexRequest".into() + } +} +/// TODO(zehiko) we need to define the response format +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct QueryCollectionIndexResponse {} +impl ::prost::Name for QueryCollectionIndexResponse { + const NAME: &'static str = "QueryCollectionIndexResponse"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.QueryCollectionIndexResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.QueryCollectionIndexResponse".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct GetRecordingSchemaRequest { #[prost(message, optional, tag = "1")] pub recording_id: ::core::option::Option, @@ -306,6 +365,44 @@ impl ::prost::Name for RemoteStoreError { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum IndexType { + /// unused + UnusedTyp = 0, + /// index type for full text search + Inverted = 1, + /// index type for vector search + /// TODO(zehiko) we can expose others when needed + VectorIvfPf = 2, + /// B-tree index suitable for range and sorted access queries + /// on numerical, string and binary data + Btree = 3, +} +impl IndexType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::UnusedTyp => "_UNUSED_TYP", + Self::Inverted => "INVERTED", + Self::VectorIvfPf => "VECTOR_IVF_PF", + Self::Btree => "BTREE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "_UNUSED_TYP" => Some(Self::UnusedTyp), + "INVERTED" => Some(Self::Inverted), + "VECTOR_IVF_PF" => Some(Self::VectorIvfPf), + "BTREE" => Some(Self::Btree), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum RecordingType { Rrd = 0, } @@ -485,6 +582,44 @@ pub mod storage_node_client { )); self.inner.server_streaming(req, path, codec).await } + pub async fn index_collection( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.remote_store.v0.StorageNode/IndexCollection", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.remote_store.v0.StorageNode", + "IndexCollection", + )); + self.inner.unary(req, path, codec).await + } + pub async fn query_collection_index( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.remote_store.v0.StorageNode/QueryCollectionIndex", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.remote_store.v0.StorageNode", + "QueryCollectionIndex", + )); + self.inner.unary(req, path, codec).await + } /// metadata API calls pub async fn query_catalog( &mut self, @@ -641,6 +776,14 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn index_collection( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn query_collection_index( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the QueryCatalog method. type QueryCatalogStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, @@ -833,6 +976,89 @@ pub mod storage_node_server { }; Box::pin(fut) } + "/rerun.remote_store.v0.StorageNode/IndexCollection" => { + #[allow(non_camel_case_types)] + struct IndexCollectionSvc(pub Arc); + impl tonic::server::UnaryService + for IndexCollectionSvc + { + type Response = super::IndexCollectionResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::index_collection(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IndexCollectionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/rerun.remote_store.v0.StorageNode/QueryCollectionIndex" => { + #[allow(non_camel_case_types)] + struct QueryCollectionIndexSvc(pub Arc); + impl + tonic::server::UnaryService + for QueryCollectionIndexSvc + { + type Response = super::QueryCollectionIndexResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::query_collection_index(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = QueryCollectionIndexSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/rerun.remote_store.v0.StorageNode/QueryCatalog" => { #[allow(non_camel_case_types)] struct QueryCatalogSvc(pub Arc); From c58ae6341ca230c074d1abd9cc90a442560d494c Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 24 Jan 2025 11:32:47 +0100 Subject: [PATCH 02/10] rebase --- pixi.toml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pixi.toml b/pixi.toml index 6d30be21d927..c30864607e6c 100644 --- a/pixi.toml +++ b/pixi.toml @@ -450,8 +450,8 @@ cpp-prepare-msvc = "cmake -G 'Visual Studio 17 2022' -B build-msvc -S ." aiohttp = ">=3.9.3,<3.10" # For `zombie_todos.py` attrs = ">=23.1.0" -clang-tools = "16.0.6" # clang-format -cmake = "3.27.6" +clang-tools = "16.0.6.*" # clang-format +cmake = "3.27.6.*" colorama = ">=0.4.6,<0.5" doxygen = "1.9.7.*" # Make sure to use a version that is compatible with the theme we're using, see https://github.com/jothepro/doxygen-awesome-css/blob/v2.2.1/README.md fd-find = ">=10.1.0" # Used by `cpp-fmt` to find C++ files @@ -459,16 +459,16 @@ flatbuffers = ">=23" gitignore-parser = ">=0.1.9" gitpython = ">=3.1.40" jinja2 = ">=3.1.3,<3.2" # For `build_screenshot_compare.py` and other utilities that build websites. -mypy = "1.14.1" +mypy = "1.14.1.*" nasm = ">=2.16" # Required by https://github.com/memorysafety/rav1d for native video support -ninja = "1.11.1" +ninja = "1.11.1.*" numpy = ">=1.23,<2" prettier = "3.2.5.*" -pyarrow = "18.0.0" +pyarrow = "18.0.0.*" pytest = ">=7" pytest-benchmark = ">=4.0.0,<4.1" python = "=3.11" # We use the latest Python version here, so we get the latest mypy etc, EXCEPT 3.12 is too new for some of our examples. We run our CI tests on ALL supported versions though. -ruff = "0.3.5" +ruff = "0.3.5.*" semver = ">=2.13,<2.14" taplo = "=0.9.1" tomlkit = "0.12.3.*" @@ -490,10 +490,10 @@ parso = ">=0.8.4, <0.9" [feature.wheel-build.dependencies] binaryen = "117.*" # for `wasm-opt` -maturin = "1.5.1" +maturin = "1.5.1.*" packaging = ">=24.0,<25" # For `publish_wheels.py` pip = ">=23" -pyarrow = "18.0.0" +pyarrow = "18.0.0.*" nodejs = ">=20.12" # rerun_notebook needs nodejs to build the wheel wheel = ">=0.38,<0.39" @@ -511,13 +511,13 @@ sysroot_linux-64 = ">=2.17,<3" # rustc 1.64+ requires glibc 2.17+, see https://b sysroot_linux-aarch64 = ">=2.17,<3" # rustc 1.64+ requires glibc 2.17+, see https://blog.rust-lang.org/2022/08/01/Increasing-glibc-kernel-requirements.html [feature.cpp.target.unix.dependencies] -clang = "16.0.6" -ninja = "1.11.1" +clang = "16.0.6.*" +ninja = "1.11.1.*" c-compiler = "1.6.0.*" cxx-compiler = "1.6.0.*" [feature.cpp.target.win-64.dependencies] -vs2022_win-64 = "19.37.32822" +vs2022_win-64 = "19.37.32822.*" [feature.cpp.pypi-dependencies] ghp-import = "==2.1.0" # for CI documentation handling From 23ca90ad4f63dce1606f31d2d383e0c2d6970d1b Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 24 Jan 2025 15:09:14 +0100 Subject: [PATCH 03/10] export ComponentName from re_dataframe --- crates/store/re_dataframe/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs index c77a068fb424..ab7a5b2c60ce 100644 --- a/crates/store/re_dataframe/src/lib.rs +++ b/crates/store/re_dataframe/src/lib.rs @@ -21,11 +21,15 @@ pub use self::external::re_log_types::{ #[doc(no_inline)] pub use self::external::re_query::{QueryCache, QueryCacheHandle, StorageEngine}; +#[doc(no_inline)] +pub use self::external::re_types_core::ComponentName; + pub mod external { pub use re_chunk; pub use re_chunk_store; pub use re_log_types; pub use re_query; + pub use re_types_core; pub use arrow; } From da5b2247a27e574caab80de5e70ef64b3dff6cfd Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:32:33 +0100 Subject: [PATCH 04/10] grpc spec definition improvements --- .../src/protobuf_conversions.rs | 100 +++--- .../proto/rerun/v0/remote_store.proto | 100 ++++-- .../re_protos/src/v0/rerun.remote_store.v0.rs | 289 +++++++++++++++--- 3 files changed, 386 insertions(+), 103 deletions(-) diff --git a/crates/store/re_chunk_store/src/protobuf_conversions.rs b/crates/store/re_chunk_store/src/protobuf_conversions.rs index c0a22234925f..8556b4571c5f 100644 --- a/crates/store/re_chunk_store/src/protobuf_conversions.rs +++ b/crates/store/re_chunk_store/src/protobuf_conversions.rs @@ -258,6 +258,27 @@ impl From for re_protos::common::v0::Query { } } +impl TryFrom + for re_protos::common::v0::ComponentColumnDescriptor +{ + type Error = TypeConversionError; + + fn try_from(value: crate::ComponentColumnDescriptor) -> Result { + Ok(Self { + entity_path: Some(value.entity_path.into()), + archetype_name: value.archetype_name.map(|an| an.to_string()), + archetype_field_name: value.archetype_field_name.map(|afn| afn.to_string()), + component_name: value.component_name.to_string(), + datatype: serde_json::to_string(&value.store_datatype) + .map_err(|err| invalid_field!(Self, "component column descriptor", err))?, + is_static: value.is_static, + is_tombstone: value.is_tombstone, + is_semantically_empty: value.is_semantically_empty, + is_indicator: value.is_indicator, + }) + } +} + impl TryFrom for re_protos::common::v0::ColumnDescriptor { type Error = TypeConversionError; @@ -280,24 +301,7 @@ impl TryFrom for re_protos::common::v0::ColumnDescripto crate::ColumnDescriptor::Component(component_descriptor) => Ok(Self { descriptor_type: Some( re_protos::common::v0::column_descriptor::DescriptorType::ComponentColumn( - re_protos::common::v0::ComponentColumnDescriptor { - entity_path: Some(component_descriptor.entity_path.into()), - archetype_name: component_descriptor - .archetype_name - .map(|an| an.to_string()), - archetype_field_name: component_descriptor - .archetype_field_name - .map(|afn| afn.to_string()), - component_name: component_descriptor.component_name.to_string(), - datatype: serde_json::to_string(&component_descriptor.store_datatype) - .map_err(|err| { - invalid_field!(Self, "component column descriptor", err) - })?, - is_static: component_descriptor.is_static, - is_tombstone: component_descriptor.is_tombstone, - is_semantically_empty: component_descriptor.is_semantically_empty, - is_indicator: component_descriptor.is_indicator, - }, + component_descriptor.try_into()?, ), ), }), @@ -305,6 +309,40 @@ impl TryFrom for re_protos::common::v0::ColumnDescripto } } +impl TryFrom + for crate::ComponentColumnDescriptor +{ + type Error = TypeConversionError; + + fn try_from( + value: re_protos::common::v0::ComponentColumnDescriptor, + ) -> Result { + Ok(Self { + entity_path: value + .entity_path + .ok_or(missing_field!( + re_protos::common::v0::ComponentColumnDescriptor, + "entity_path", + ))? + .try_into()?, + archetype_name: value.archetype_name.map(Into::into), + archetype_field_name: value.archetype_field_name.map(Into::into), + component_name: value.component_name.into(), + store_datatype: serde_json::from_str(&value.datatype).map_err(|err| { + invalid_field!( + re_protos::common::v0::ColumnDescriptor, + "component column descriptor", + err + ) + })?, + is_static: value.is_static, + is_tombstone: value.is_tombstone, + is_semantically_empty: value.is_semantically_empty, + is_indicator: value.is_indicator, + }) + } +} + impl TryFrom for crate::ColumnDescriptor { type Error = TypeConversionError; @@ -335,31 +373,7 @@ impl TryFrom for crate::ColumnDescripto })), re_protos::common::v0::column_descriptor::DescriptorType::ComponentColumn( component_descriptor, - ) => Ok(Self::Component(crate::ComponentColumnDescriptor { - entity_path: component_descriptor - .entity_path - .ok_or(missing_field!( - re_protos::common::v0::ComponentColumnDescriptor, - "entity_path", - ))? - .try_into()?, - archetype_name: component_descriptor.archetype_name.map(Into::into), - archetype_field_name: component_descriptor.archetype_field_name.map(Into::into), - component_name: component_descriptor.component_name.into(), - store_datatype: serde_json::from_str(&component_descriptor.datatype).map_err( - |err| { - invalid_field!( - re_protos::common::v0::ColumnDescriptor, - "component column descriptor", - err - ) - }, - )?, - is_static: component_descriptor.is_static, - is_tombstone: component_descriptor.is_tombstone, - is_semantically_empty: component_descriptor.is_semantically_empty, - is_indicator: component_descriptor.is_indicator, - })), + ) => Ok(Self::Component(component_descriptor.try_into()?)), } } } diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 256632d1d87a..48b0deb00010 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -10,7 +10,14 @@ service StorageNode { rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {} rpc IndexCollection(IndexCollectionRequest) returns (IndexCollectionResponse) {} - rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (QueryCollectionIndexResponse) {} + // Collection index query response is a RecordBatch with 2 columns: + // - 'resource_id' column with the id of the resource + // - timepoint column with the values reprensenting the points in time + // where index query matches. What time points are matched depends on the type of + // index that is queried. For example for vector search it might be timepoints where + // top-K matches are found within *each* resource in the collection. For inverted index + // it might be timepoints where the query string is found in the indexed column + rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (stream DataframePart) {} // metadata API calls rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {} @@ -39,29 +46,46 @@ message DataframePart { message IndexCollectionRequest { // which collection do we want to create index for - string collection_name = 1; - // what kind of index do we want to create + Collection collection = 1; + // what kind of index do we want to create and what are + // its index specific properties IndexType index_type = 2; // Component / column we want to index rerun.common.v0.ComponentColumnDescriptor column = 3; + // What is the filter index i.e. timeline for which we + // will query the timepoints + rerun.common.v0.IndexColumnSelector time_index = 4; } -enum IndexType { - // unused - _UNUSED_TYP = 0; +message IndexType { + oneof typ { + InvertedIndex inverted = 1; + VectorIvfPqIndex vector = 2; + BTreeIndex btree = 3; + } +} + +message InvertedIndex { + bool store_position = 1; + string base_tokenizer = 2; + // TODO(zehiko) add properties as needed +} - // index type for full text search - INVERTED = 1; +message VectorIvfPqIndex { + uint32 num_partitions = 1; + uint32 num_sub_vectors = 2; + VectorDistanceMetric distance_metrics = 3; +} - // index type for vector search - // TODO(zehiko) we can expose others when needed - VECTOR_IVF_PF = 2; +enum VectorDistanceMetric { + L2 = 0; + COSINE = 1; + DOT = 2; + HAMMING = 3; +} - // B-tree index suitable for range and sorted access queries - // on numerical, string and binary data - BTREE = 3; +message BTreeIndex { - // TODO(zehiko) add others as needed } message IndexCollectionResponse {} @@ -70,13 +94,55 @@ message IndexCollectionResponse {} // ---------------- QueryCollectionIndex ------------------ message QueryCollectionIndexRequest { + // Collection we want to run the query against on + // If not specified, the default collection is queried + Collection collection = 1; + // Index type specific query properties + IndexQueryData index = 2; +} + +message IndexQueryData { + // specific index properties based on the index type + oneof index_query { + InvertedIndexQuery inverted = 1; + VectorIndexQuery vector = 2; + BTreeIndexQuery btree = 3; + } +} +message InvertedIndexQuery { + // Query to execute represented as the arrow data + // Query should be a unit RecordBatch with 2 columns: + // - 'index' column with the name of the column we want to query + // - 'query' column with the value we want to query. It must be + // of utf8 type + DataframePart query = 1; + // TODO(zehiko) add properties as needed } -message QueryCollectionIndexResponse { - // TODO(zehiko) we need to define the response format +message VectorIndexQuery { + // Query to execute represented as the arrow data + // Query should be a unit RecordBatch with 2 columns: + // - 'index' column with the name of the column we want to query + // - 'query' column with the value we want to query. It must be of + // type of float32 array + DataframePart query = 1; + uint32 top_k = 2; } +message BTreeIndexQuery { + // Query to execute represented as the arrow data + // Query should be a unit RecordBatch with 2 columns: + // - 'index' column with the name of the column we want to query + // - 'query' column with the value we want to query. The type should + // be of the same type as the indexed column + DataframePart query = 1; + // TODO(zehiko) add properties as needed +} + +message Collection { + string name = 1; +} // ---------------- GetRecordingSchema ------------------ diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index e667e4f9e273..7a47eafc419a 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -22,14 +22,19 @@ impl ::prost::Name for DataframePart { #[derive(Clone, PartialEq, ::prost::Message)] pub struct IndexCollectionRequest { /// which collection do we want to create index for - #[prost(string, tag = "1")] - pub collection_name: ::prost::alloc::string::String, - /// what kind of index do we want to create - #[prost(enumeration = "IndexType", tag = "2")] - pub index_type: i32, + #[prost(message, optional, tag = "1")] + pub collection: ::core::option::Option, + /// what kind of index do we want to create and what are + /// its index specific properties + #[prost(message, optional, tag = "2")] + pub index_type: ::core::option::Option, /// Component / column we want to index #[prost(message, optional, tag = "3")] pub column: ::core::option::Option, + /// What is the filter index i.e. timeline for which we + /// will query the timepoints + #[prost(message, optional, tag = "4")] + pub time_index: ::core::option::Option, } impl ::prost::Name for IndexCollectionRequest { const NAME: &'static str = "IndexCollectionRequest"; @@ -41,6 +46,82 @@ impl ::prost::Name for IndexCollectionRequest { "/rerun.remote_store.v0.IndexCollectionRequest".into() } } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexType { + #[prost(oneof = "index_type::Typ", tags = "1, 2, 3")] + pub typ: ::core::option::Option, +} +/// Nested message and enum types in `IndexType`. +pub mod index_type { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Typ { + #[prost(message, tag = "1")] + Inverted(super::InvertedIndex), + #[prost(message, tag = "2")] + Vector(super::VectorIvfPqIndex), + #[prost(message, tag = "3")] + Btree(super::BTreeIndex), + } +} +impl ::prost::Name for IndexType { + const NAME: &'static str = "IndexType"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.IndexType".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.IndexType".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InvertedIndex { + #[prost(bool, tag = "1")] + pub store_position: bool, + /// TODO(zehiko) add properties as needed + #[prost(string, tag = "2")] + pub base_tokenizer: ::prost::alloc::string::String, +} +impl ::prost::Name for InvertedIndex { + const NAME: &'static str = "InvertedIndex"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.InvertedIndex".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.InvertedIndex".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct VectorIvfPqIndex { + #[prost(uint32, tag = "1")] + pub num_partitions: u32, + #[prost(uint32, tag = "2")] + pub num_sub_vectors: u32, + #[prost(enumeration = "VectorDistanceMetric", tag = "3")] + pub distance_metrics: i32, +} +impl ::prost::Name for VectorIvfPqIndex { + const NAME: &'static str = "VectorIvfPqIndex"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.VectorIvfPqIndex".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.VectorIvfPqIndex".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct BTreeIndex {} +impl ::prost::Name for BTreeIndex { + const NAME: &'static str = "BTreeIndex"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.BTreeIndex".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.BTreeIndex".into() + } +} #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct IndexCollectionResponse {} impl ::prost::Name for IndexCollectionResponse { @@ -53,8 +134,16 @@ impl ::prost::Name for IndexCollectionResponse { "/rerun.remote_store.v0.IndexCollectionResponse".into() } } -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct QueryCollectionIndexRequest {} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueryCollectionIndexRequest { + /// Collection we want to run the query against on + /// If not specified, the default collection is queried + #[prost(message, optional, tag = "1")] + pub collection: ::core::option::Option, + /// Index type specific query properties + #[prost(message, optional, tag = "2")] + pub index: ::core::option::Option, +} impl ::prost::Name for QueryCollectionIndexRequest { const NAME: &'static str = "QueryCollectionIndexRequest"; const PACKAGE: &'static str = "rerun.remote_store.v0"; @@ -65,17 +154,114 @@ impl ::prost::Name for QueryCollectionIndexRequest { "/rerun.remote_store.v0.QueryCollectionIndexRequest".into() } } -/// TODO(zehiko) we need to define the response format -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct QueryCollectionIndexResponse {} -impl ::prost::Name for QueryCollectionIndexResponse { - const NAME: &'static str = "QueryCollectionIndexResponse"; +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexQueryData { + /// specific index properties based on the index type + #[prost(oneof = "index_query_data::IndexQuery", tags = "1, 2, 3")] + pub index_query: ::core::option::Option, +} +/// Nested message and enum types in `IndexQueryData`. +pub mod index_query_data { + /// specific index properties based on the index type + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum IndexQuery { + #[prost(message, tag = "1")] + Inverted(super::InvertedIndexQuery), + #[prost(message, tag = "2")] + Vector(super::VectorIndexQuery), + #[prost(message, tag = "3")] + Btree(super::BTreeIndexQuery), + } +} +impl ::prost::Name for IndexQueryData { + const NAME: &'static str = "IndexQueryData"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.IndexQueryData".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.IndexQueryData".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InvertedIndexQuery { + /// Query to execute represented as the arrow data + /// Query should be a unit RecordBatch with 2 columns: + /// - 'index' column with the name of the column we want to query + /// - 'query' column with the value we want to query. It must be + /// of utf8 type + /// + /// TODO(zehiko) add properties as needed + #[prost(message, optional, tag = "1")] + pub query: ::core::option::Option, +} +impl ::prost::Name for InvertedIndexQuery { + const NAME: &'static str = "InvertedIndexQuery"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.InvertedIndexQuery".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.InvertedIndexQuery".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VectorIndexQuery { + /// Query to execute represented as the arrow data + /// Query should be a unit RecordBatch with 2 columns: + /// - 'index' column with the name of the column we want to query + /// - 'query' column with the value we want to query. It must be of + /// type of float32 array + #[prost(message, optional, tag = "1")] + pub query: ::core::option::Option, + #[prost(uint32, tag = "2")] + pub top_k: u32, +} +impl ::prost::Name for VectorIndexQuery { + const NAME: &'static str = "VectorIndexQuery"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.VectorIndexQuery".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.VectorIndexQuery".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BTreeIndexQuery { + /// Query to execute represented as the arrow data + /// Query should be a unit RecordBatch with 2 columns: + /// - 'index' column with the name of the column we want to query + /// - 'query' column with the value we want to query. The type should + /// be of the same type as the indexed column + /// + /// TODO(zehiko) add properties as needed + #[prost(message, optional, tag = "1")] + pub query: ::core::option::Option, +} +impl ::prost::Name for BTreeIndexQuery { + const NAME: &'static str = "BTreeIndexQuery"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.BTreeIndexQuery".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.BTreeIndexQuery".into() + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Collection { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, +} +impl ::prost::Name for Collection { + const NAME: &'static str = "Collection"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.QueryCollectionIndexResponse".into() + "rerun.remote_store.v0.Collection".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.QueryCollectionIndexResponse".into() + "/rerun.remote_store.v0.Collection".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -365,38 +551,32 @@ impl ::prost::Name for RemoteStoreError { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] -pub enum IndexType { - /// unused - UnusedTyp = 0, - /// index type for full text search - Inverted = 1, - /// index type for vector search - /// TODO(zehiko) we can expose others when needed - VectorIvfPf = 2, - /// B-tree index suitable for range and sorted access queries - /// on numerical, string and binary data - Btree = 3, -} -impl IndexType { +pub enum VectorDistanceMetric { + L2 = 0, + Cosine = 1, + Dot = 2, + Hamming = 3, +} +impl VectorDistanceMetric { /// String value of the enum field names used in the ProtoBuf definition. /// /// The values are not transformed in any way and thus are considered stable /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - Self::UnusedTyp => "_UNUSED_TYP", - Self::Inverted => "INVERTED", - Self::VectorIvfPf => "VECTOR_IVF_PF", - Self::Btree => "BTREE", + Self::L2 => "L2", + Self::Cosine => "COSINE", + Self::Dot => "DOT", + Self::Hamming => "HAMMING", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { - "_UNUSED_TYP" => Some(Self::UnusedTyp), - "INVERTED" => Some(Self::Inverted), - "VECTOR_IVF_PF" => Some(Self::VectorIvfPf), - "BTREE" => Some(Self::Btree), + "L2" => Some(Self::L2), + "COSINE" => Some(Self::Cosine), + "DOT" => Some(Self::Dot), + "HAMMING" => Some(Self::Hamming), _ => None, } } @@ -601,11 +781,20 @@ pub mod storage_node_client { )); self.inner.unary(req, path, codec).await } + /// Collection index query response is a RecordBatch with 2 columns: + /// - 'resource_id' column with the id of the resource + /// - timepoint column with the values reprensenting the points in time + /// where index query matches. What time points are matched depends on the type of + /// index that is queried. For example for vector search it might be timepoints where + /// top-K matches are found within *each* resource in the collection. For inverted index + /// it might be timepoints where the query string is found in the indexed column pub async fn query_collection_index( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; @@ -618,7 +807,7 @@ pub mod storage_node_client { "rerun.remote_store.v0.StorageNode", "QueryCollectionIndex", )); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } /// metadata API calls pub async fn query_catalog( @@ -780,10 +969,22 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the QueryCollectionIndex method. + type QueryCollectionIndexStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + + 'static; + /// Collection index query response is a RecordBatch with 2 columns: + /// - 'resource_id' column with the id of the resource + /// - timepoint column with the values reprensenting the points in time + /// where index query matches. What time points are matched depends on the type of + /// index that is queried. For example for vector search it might be timepoints where + /// top-K matches are found within *each* resource in the collection. For inverted index + /// it might be timepoints where the query string is found in the indexed column async fn query_collection_index( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the QueryCatalog method. type QueryCatalogStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, @@ -1021,11 +1222,13 @@ pub mod storage_node_server { #[allow(non_camel_case_types)] struct QueryCollectionIndexSvc(pub Arc); impl - tonic::server::UnaryService + tonic::server::ServerStreamingService for QueryCollectionIndexSvc { - type Response = super::QueryCollectionIndexResponse; - type Future = BoxFuture, tonic::Status>; + type Response = super::DataframePart; + type ResponseStream = T::QueryCollectionIndexStream; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -1054,7 +1257,7 @@ pub mod storage_node_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) From 463ae846b95b06b35235c07e673da4e2f112ba00 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Tue, 28 Jan 2025 11:54:04 +0100 Subject: [PATCH 05/10] few more updated to the spec --- .../proto/rerun/v0/remote_store.proto | 16 +++--- .../re_protos/src/v0/rerun.remote_store.v0.rs | 51 ++++++++++--------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 48b0deb00010..2ecafb23c345 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -49,7 +49,7 @@ message IndexCollectionRequest { Collection collection = 1; // what kind of index do we want to create and what are // its index specific properties - IndexType index_type = 2; + IndexProperties properties = 2; // Component / column we want to index rerun.common.v0.ComponentColumnDescriptor column = 3; // What is the filter index i.e. timeline for which we @@ -57,8 +57,8 @@ message IndexCollectionRequest { rerun.common.v0.IndexColumnSelector time_index = 4; } -message IndexType { - oneof typ { +message IndexProperties { + oneof props { InvertedIndex inverted = 1; VectorIvfPqIndex vector = 2; BTreeIndex btree = 3; @@ -85,7 +85,7 @@ enum VectorDistanceMetric { } message BTreeIndex { - + // TODO(zehiko) as properties as needed } message IndexCollectionResponse {} @@ -98,12 +98,12 @@ message QueryCollectionIndexRequest { // If not specified, the default collection is queried Collection collection = 1; // Index type specific query properties - IndexQueryData index = 2; + IndexQuery query = 2; } -message IndexQueryData { - // specific index properties based on the index type - oneof index_query { +message IndexQuery { + // specific index query properties based on the index type + oneof query { InvertedIndexQuery inverted = 1; VectorIndexQuery vector = 2; BTreeIndexQuery btree = 3; diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index 7a47eafc419a..4bc8820a0ee0 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -27,7 +27,7 @@ pub struct IndexCollectionRequest { /// what kind of index do we want to create and what are /// its index specific properties #[prost(message, optional, tag = "2")] - pub index_type: ::core::option::Option, + pub properties: ::core::option::Option, /// Component / column we want to index #[prost(message, optional, tag = "3")] pub column: ::core::option::Option, @@ -47,14 +47,14 @@ impl ::prost::Name for IndexCollectionRequest { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexType { - #[prost(oneof = "index_type::Typ", tags = "1, 2, 3")] - pub typ: ::core::option::Option, +pub struct IndexProperties { + #[prost(oneof = "index_properties::Props", tags = "1, 2, 3")] + pub props: ::core::option::Option, } -/// Nested message and enum types in `IndexType`. -pub mod index_type { +/// Nested message and enum types in `IndexProperties`. +pub mod index_properties { #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Typ { + pub enum Props { #[prost(message, tag = "1")] Inverted(super::InvertedIndex), #[prost(message, tag = "2")] @@ -63,14 +63,14 @@ pub mod index_type { Btree(super::BTreeIndex), } } -impl ::prost::Name for IndexType { - const NAME: &'static str = "IndexType"; +impl ::prost::Name for IndexProperties { + const NAME: &'static str = "IndexProperties"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexType".into() + "rerun.remote_store.v0.IndexProperties".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexType".into() + "/rerun.remote_store.v0.IndexProperties".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -110,6 +110,7 @@ impl ::prost::Name for VectorIvfPqIndex { "/rerun.remote_store.v0.VectorIvfPqIndex".into() } } +/// TODO(zehiko) as properties as needed #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct BTreeIndex {} impl ::prost::Name for BTreeIndex { @@ -142,7 +143,7 @@ pub struct QueryCollectionIndexRequest { pub collection: ::core::option::Option, /// Index type specific query properties #[prost(message, optional, tag = "2")] - pub index: ::core::option::Option, + pub query: ::core::option::Option, } impl ::prost::Name for QueryCollectionIndexRequest { const NAME: &'static str = "QueryCollectionIndexRequest"; @@ -155,16 +156,16 @@ impl ::prost::Name for QueryCollectionIndexRequest { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexQueryData { - /// specific index properties based on the index type - #[prost(oneof = "index_query_data::IndexQuery", tags = "1, 2, 3")] - pub index_query: ::core::option::Option, -} -/// Nested message and enum types in `IndexQueryData`. -pub mod index_query_data { - /// specific index properties based on the index type +pub struct IndexQuery { + /// specific index query properties based on the index type + #[prost(oneof = "index_query::Query", tags = "1, 2, 3")] + pub query: ::core::option::Option, +} +/// Nested message and enum types in `IndexQuery`. +pub mod index_query { + /// specific index query properties based on the index type #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum IndexQuery { + pub enum Query { #[prost(message, tag = "1")] Inverted(super::InvertedIndexQuery), #[prost(message, tag = "2")] @@ -173,14 +174,14 @@ pub mod index_query_data { Btree(super::BTreeIndexQuery), } } -impl ::prost::Name for IndexQueryData { - const NAME: &'static str = "IndexQueryData"; +impl ::prost::Name for IndexQuery { + const NAME: &'static str = "IndexQuery"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexQueryData".into() + "rerun.remote_store.v0.IndexQuery".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexQueryData".into() + "/rerun.remote_store.v0.IndexQuery".into() } } #[derive(Clone, PartialEq, ::prost::Message)] From 43abe0da11c8dbf1034e737ff34ec9323d57bda0 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Wed, 29 Jan 2025 17:56:36 +0100 Subject: [PATCH 06/10] small improvements --- .../proto/rerun/v0/remote_store.proto | 55 +++---- .../re_protos/src/v0/rerun.remote_store.v0.rs | 148 +++++++++--------- 2 files changed, 98 insertions(+), 105 deletions(-) diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 2ecafb23c345..f82d05816251 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -9,14 +9,15 @@ service StorageNode { rpc Query(QueryRequest) returns (stream DataframePart) {} rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {} - rpc IndexCollection(IndexCollectionRequest) returns (IndexCollectionResponse) {} - // Collection index query response is a RecordBatch with 2 columns: + rpc CreateCollectionIndex(CreateCollectionIndexRequest) returns (CreateCollectionIndexResponse) {} + // Collection index query response is a RecordBatch with 3 columns: // - 'resource_id' column with the id of the resource // - timepoint column with the values reprensenting the points in time // where index query matches. What time points are matched depends on the type of // index that is queried. For example for vector search it might be timepoints where // top-K matches are found within *each* resource in the collection. For inverted index // it might be timepoints where the query string is found in the indexed column + // - 'data' column with the data that is returned for the matched timepoints rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (stream DataframePart) {} // metadata API calls @@ -42,18 +43,21 @@ message DataframePart { bytes payload = 1000; } -// ---------------- IndexCollection ------------------ +// ---------------- CreateCollectionIndex ------------------ -message IndexCollectionRequest { +message CreateCollectionIndexRequest { // which collection do we want to create index for Collection collection = 1; // what kind of index do we want to create and what are // its index specific properties IndexProperties properties = 2; // Component / column we want to index + // TODO - name of the lance table should be derived from the descriptor! rerun.common.v0.ComponentColumnDescriptor column = 3; // What is the filter index i.e. timeline for which we // will query the timepoints + // TODO(zehiko) this might go away and we might just index + // across all the timelines rerun.common.v0.IndexColumnSelector time_index = 4; } @@ -68,7 +72,7 @@ message IndexProperties { message InvertedIndex { bool store_position = 1; string base_tokenizer = 2; - // TODO(zehiko) add properties as needed + // TODO(zehiko) add other properties as needed } message VectorIvfPqIndex { @@ -85,10 +89,12 @@ enum VectorDistanceMetric { } message BTreeIndex { - // TODO(zehiko) as properties as needed + // TODO(zehiko) add properties as needed } -message IndexCollectionResponse {} +message CreateCollectionIndexResponse { + uint64 indexed_rows = 1; +} // ---------------- QueryCollectionIndex ------------------ @@ -97,13 +103,22 @@ message QueryCollectionIndexRequest { // Collection we want to run the query against on // If not specified, the default collection is queried Collection collection = 1; - // Index type specific query properties - IndexQuery query = 2; + // Index column that is queried + rerun.common.v0.ComponentColumnDescriptor column = 2; + // Query data - type of data is index specific. Caller must ensure + // to provide the right type. For vector search this should + // be a vector of appropriate size, for inverted index this should be a string. + // Query data is represented as a unit (single row) RecordBatch with 1 column. + DataframePart query = 3; + // Index type specific properties + IndexQueryProperties properties = 4; + // max number of rows to be returned + optional uint32 limit = 5; } -message IndexQuery { +message IndexQueryProperties { // specific index query properties based on the index type - oneof query { + oneof props { InvertedIndexQuery inverted = 1; VectorIndexQuery vector = 2; BTreeIndexQuery btree = 3; @@ -111,32 +126,14 @@ message IndexQuery { } message InvertedIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. It must be - // of utf8 type - DataframePart query = 1; // TODO(zehiko) add properties as needed } message VectorIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. It must be of - // type of float32 array - DataframePart query = 1; uint32 top_k = 2; } message BTreeIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. The type should - // be of the same type as the indexed column - DataframePart query = 1; // TODO(zehiko) add properties as needed } diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index 4bc8820a0ee0..443d655ef599 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -20,7 +20,7 @@ impl ::prost::Name for DataframePart { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexCollectionRequest { +pub struct CreateCollectionIndexRequest { /// which collection do we want to create index for #[prost(message, optional, tag = "1")] pub collection: ::core::option::Option, @@ -29,21 +29,24 @@ pub struct IndexCollectionRequest { #[prost(message, optional, tag = "2")] pub properties: ::core::option::Option, /// Component / column we want to index + /// TODO - name of the lance table should be derived from the descriptor! #[prost(message, optional, tag = "3")] pub column: ::core::option::Option, /// What is the filter index i.e. timeline for which we /// will query the timepoints + /// TODO(zehiko) this might go away and we might just index + /// across all the timelines #[prost(message, optional, tag = "4")] pub time_index: ::core::option::Option, } -impl ::prost::Name for IndexCollectionRequest { - const NAME: &'static str = "IndexCollectionRequest"; +impl ::prost::Name for CreateCollectionIndexRequest { + const NAME: &'static str = "CreateCollectionIndexRequest"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexCollectionRequest".into() + "rerun.remote_store.v0.CreateCollectionIndexRequest".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexCollectionRequest".into() + "/rerun.remote_store.v0.CreateCollectionIndexRequest".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -77,7 +80,7 @@ impl ::prost::Name for IndexProperties { pub struct InvertedIndex { #[prost(bool, tag = "1")] pub store_position: bool, - /// TODO(zehiko) add properties as needed + /// TODO(zehiko) add other properties as needed #[prost(string, tag = "2")] pub base_tokenizer: ::prost::alloc::string::String, } @@ -110,7 +113,7 @@ impl ::prost::Name for VectorIvfPqIndex { "/rerun.remote_store.v0.VectorIvfPqIndex".into() } } -/// TODO(zehiko) as properties as needed +/// TODO(zehiko) add properties as needed #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct BTreeIndex {} impl ::prost::Name for BTreeIndex { @@ -124,15 +127,18 @@ impl ::prost::Name for BTreeIndex { } } #[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct IndexCollectionResponse {} -impl ::prost::Name for IndexCollectionResponse { - const NAME: &'static str = "IndexCollectionResponse"; +pub struct CreateCollectionIndexResponse { + #[prost(uint64, tag = "1")] + pub indexed_rows: u64, +} +impl ::prost::Name for CreateCollectionIndexResponse { + const NAME: &'static str = "CreateCollectionIndexResponse"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexCollectionResponse".into() + "rerun.remote_store.v0.CreateCollectionIndexResponse".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexCollectionResponse".into() + "/rerun.remote_store.v0.CreateCollectionIndexResponse".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -141,9 +147,21 @@ pub struct QueryCollectionIndexRequest { /// If not specified, the default collection is queried #[prost(message, optional, tag = "1")] pub collection: ::core::option::Option, - /// Index type specific query properties + /// Index column that is queried #[prost(message, optional, tag = "2")] - pub query: ::core::option::Option, + pub column: ::core::option::Option, + /// Query data - type of data is index specific. Caller must ensure + /// to provide the right type. For vector search this should + /// be a vector of appropriate size, for inverted index this should be a string. + /// Query data is represented as a unit (single row) RecordBatch with 1 column. + #[prost(message, optional, tag = "3")] + pub query: ::core::option::Option, + /// Index type specific properties + #[prost(message, optional, tag = "4")] + pub properties: ::core::option::Option, + /// max number of rows to be returned + #[prost(uint32, optional, tag = "5")] + pub limit: ::core::option::Option, } impl ::prost::Name for QueryCollectionIndexRequest { const NAME: &'static str = "QueryCollectionIndexRequest"; @@ -155,17 +173,17 @@ impl ::prost::Name for QueryCollectionIndexRequest { "/rerun.remote_store.v0.QueryCollectionIndexRequest".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexQuery { +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct IndexQueryProperties { /// specific index query properties based on the index type - #[prost(oneof = "index_query::Query", tags = "1, 2, 3")] - pub query: ::core::option::Option, + #[prost(oneof = "index_query_properties::Props", tags = "1, 2, 3")] + pub props: ::core::option::Option, } -/// Nested message and enum types in `IndexQuery`. -pub mod index_query { +/// Nested message and enum types in `IndexQueryProperties`. +pub mod index_query_properties { /// specific index query properties based on the index type - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Query { + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum Props { #[prost(message, tag = "1")] Inverted(super::InvertedIndexQuery), #[prost(message, tag = "2")] @@ -174,28 +192,19 @@ pub mod index_query { Btree(super::BTreeIndexQuery), } } -impl ::prost::Name for IndexQuery { - const NAME: &'static str = "IndexQuery"; +impl ::prost::Name for IndexQueryProperties { + const NAME: &'static str = "IndexQueryProperties"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexQuery".into() + "rerun.remote_store.v0.IndexQueryProperties".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexQuery".into() + "/rerun.remote_store.v0.IndexQueryProperties".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InvertedIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. It must be - /// of utf8 type - /// - /// TODO(zehiko) add properties as needed - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, -} +/// TODO(zehiko) add properties as needed +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct InvertedIndexQuery {} impl ::prost::Name for InvertedIndexQuery { const NAME: &'static str = "InvertedIndexQuery"; const PACKAGE: &'static str = "rerun.remote_store.v0"; @@ -206,15 +215,8 @@ impl ::prost::Name for InvertedIndexQuery { "/rerun.remote_store.v0.InvertedIndexQuery".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct VectorIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. It must be of - /// type of float32 array - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, #[prost(uint32, tag = "2")] pub top_k: u32, } @@ -228,18 +230,9 @@ impl ::prost::Name for VectorIndexQuery { "/rerun.remote_store.v0.VectorIndexQuery".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BTreeIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. The type should - /// be of the same type as the indexed column - /// - /// TODO(zehiko) add properties as needed - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, -} +/// TODO(zehiko) add properties as needed +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct BTreeIndexQuery {} impl ::prost::Name for BTreeIndexQuery { const NAME: &'static str = "BTreeIndexQuery"; const PACKAGE: &'static str = "rerun.remote_store.v0"; @@ -763,32 +756,33 @@ pub mod storage_node_client { )); self.inner.server_streaming(req, path, codec).await } - pub async fn index_collection( + pub async fn create_collection_index( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/IndexCollection", + "/rerun.remote_store.v0.StorageNode/CreateCollectionIndex", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.remote_store.v0.StorageNode", - "IndexCollection", + "CreateCollectionIndex", )); self.inner.unary(req, path, codec).await } - /// Collection index query response is a RecordBatch with 2 columns: + /// Collection index query response is a RecordBatch with 3 columns: /// - 'resource_id' column with the id of the resource /// - timepoint column with the values reprensenting the points in time /// where index query matches. What time points are matched depends on the type of /// index that is queried. For example for vector search it might be timepoints where /// top-K matches are found within *each* resource in the collection. For inverted index /// it might be timepoints where the query string is found in the indexed column + /// - 'data' column with the data that is returned for the matched timepoints pub async fn query_collection_index( &mut self, request: impl tonic::IntoRequest, @@ -966,22 +960,23 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - async fn index_collection( + async fn create_collection_index( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the QueryCollectionIndex method. type QueryCollectionIndexStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > + std::marker::Send + 'static; - /// Collection index query response is a RecordBatch with 2 columns: + /// Collection index query response is a RecordBatch with 3 columns: /// - 'resource_id' column with the id of the resource /// - timepoint column with the values reprensenting the points in time /// where index query matches. What time points are matched depends on the type of /// index that is queried. For example for vector search it might be timepoints where /// top-K matches are found within *each* resource in the collection. For inverted index /// it might be timepoints where the query string is found in the indexed column + /// - 'data' column with the data that is returned for the matched timepoints async fn query_collection_index( &self, request: tonic::Request, @@ -1178,21 +1173,22 @@ pub mod storage_node_server { }; Box::pin(fut) } - "/rerun.remote_store.v0.StorageNode/IndexCollection" => { + "/rerun.remote_store.v0.StorageNode/CreateCollectionIndex" => { #[allow(non_camel_case_types)] - struct IndexCollectionSvc(pub Arc); - impl tonic::server::UnaryService - for IndexCollectionSvc + struct CreateCollectionIndexSvc(pub Arc); + impl + tonic::server::UnaryService + for CreateCollectionIndexSvc { - type Response = super::IndexCollectionResponse; + type Response = super::CreateCollectionIndexResponse; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::index_collection(&inner, request).await + ::create_collection_index(&inner, request).await }; Box::pin(fut) } @@ -1203,7 +1199,7 @@ pub mod storage_node_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = IndexCollectionSvc(inner); + let method = CreateCollectionIndexSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( From 7ed9ab2360aa5f5916f4508a4aacbdcecd0d746c Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 30 Jan 2025 08:28:08 +0100 Subject: [PATCH 07/10] fix rebase issues and bring back descriptor that was removed --- .../re_protos/proto/rerun/v0/common.proto | 24 +++++++++++ .../store/re_protos/src/v0/rerun.common.v0.rs | 43 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/crates/store/re_protos/proto/rerun/v0/common.proto b/crates/store/re_protos/proto/rerun/v0/common.proto index fe32374a5de1..a8530bda650c 100644 --- a/crates/store/re_protos/proto/rerun/v0/common.proto +++ b/crates/store/re_protos/proto/rerun/v0/common.proto @@ -102,6 +102,30 @@ message Query { SparseFillStrategy sparse_fill_strategy = 11; } +// Describes a data/component column, such as `Position3D`. +message ComponentColumnDescriptor { + // The path of the entity. + EntityPath entity_path = 1; + // Optional name of the `Archetype` associated with this data. + optional string archetype_name = 2; + // Optional name of the field within `Archetype` associated with this data. + optional string archetype_field_name = 3; + // Semantic name associated with this data. + string component_name = 4; + // The Arrow datatype of the column. + // + // Currently this is a JSON-encoded `arrow-rs` `DataType`. + string datatype = 5; + // Whether the column is a static column. + bool is_static = 6; + // Whether the column is a tombstone column. + bool is_tombstone = 7; + // Whether the column is an indicator column. + bool is_indicator = 8; + // Whether the column is semantically empty. + bool is_semantically_empty = 9; +} + message ColumnSelection { repeated ColumnSelector columns = 1; diff --git a/crates/store/re_protos/src/v0/rerun.common.v0.rs b/crates/store/re_protos/src/v0/rerun.common.v0.rs index 806f6810ed96..60b5cb15f303 100644 --- a/crates/store/re_protos/src/v0/rerun.common.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.common.v0.rs @@ -160,6 +160,49 @@ impl ::prost::Name for Query { "/rerun.common.v0.Query".into() } } +/// Describes a data/component column, such as `Position3D`. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ComponentColumnDescriptor { + /// The path of the entity. + #[prost(message, optional, tag = "1")] + pub entity_path: ::core::option::Option, + /// Optional name of the `Archetype` associated with this data. + #[prost(string, optional, tag = "2")] + pub archetype_name: ::core::option::Option<::prost::alloc::string::String>, + /// Optional name of the field within `Archetype` associated with this data. + #[prost(string, optional, tag = "3")] + pub archetype_field_name: ::core::option::Option<::prost::alloc::string::String>, + /// Semantic name associated with this data. + #[prost(string, tag = "4")] + pub component_name: ::prost::alloc::string::String, + /// The Arrow datatype of the column. + /// + /// Currently this is a JSON-encoded `arrow-rs` `DataType`. + #[prost(string, tag = "5")] + pub datatype: ::prost::alloc::string::String, + /// Whether the column is a static column. + #[prost(bool, tag = "6")] + pub is_static: bool, + /// Whether the column is a tombstone column. + #[prost(bool, tag = "7")] + pub is_tombstone: bool, + /// Whether the column is an indicator column. + #[prost(bool, tag = "8")] + pub is_indicator: bool, + /// Whether the column is semantically empty. + #[prost(bool, tag = "9")] + pub is_semantically_empty: bool, +} +impl ::prost::Name for ComponentColumnDescriptor { + const NAME: &'static str = "ComponentColumnDescriptor"; + const PACKAGE: &'static str = "rerun.common.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.common.v0.ComponentColumnDescriptor".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.common.v0.ComponentColumnDescriptor".into() + } +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnSelection { #[prost(message, repeated, tag = "1")] From d3722fbce33cea488ad17d820f5d4a4c28a66a59 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 30 Jan 2025 09:32:21 +0100 Subject: [PATCH 08/10] replace ComponentColumnDescriptor that's not really nicely serializable with IndedColumn --- .../src/protobuf_conversions.rs | 55 ------------------- .../re_protos/proto/rerun/v0/common.proto | 25 --------- .../proto/rerun/v0/remote_store.proto | 17 +++++- .../store/re_protos/src/v0/rerun.common.v0.rs | 43 --------------- .../re_protos/src/v0/rerun.remote_store.v0.rs | 31 ++++++++++- 5 files changed, 42 insertions(+), 129 deletions(-) diff --git a/crates/store/re_chunk_store/src/protobuf_conversions.rs b/crates/store/re_chunk_store/src/protobuf_conversions.rs index e0aa77717f3f..3d593b6993fd 100644 --- a/crates/store/re_chunk_store/src/protobuf_conversions.rs +++ b/crates/store/re_chunk_store/src/protobuf_conversions.rs @@ -256,61 +256,6 @@ impl From for re_protos::common::v0::Query { } } -impl TryFrom - for re_protos::common::v0::ComponentColumnDescriptor -{ - type Error = TypeConversionError; - - fn try_from(value: crate::ComponentColumnDescriptor) -> Result { - Ok(Self { - entity_path: Some(value.entity_path.into()), - archetype_name: value.archetype_name.map(|an| an.to_string()), - archetype_field_name: value.archetype_field_name.map(|afn| afn.to_string()), - component_name: value.component_name.to_string(), - datatype: serde_json::to_string(&value.store_datatype) - .map_err(|err| invalid_field!(Self, "component column descriptor", err))?, - is_static: value.is_static, - is_tombstone: value.is_tombstone, - is_semantically_empty: value.is_semantically_empty, - is_indicator: value.is_indicator, - }) - } -} - -impl TryFrom - for crate::ComponentColumnDescriptor -{ - type Error = TypeConversionError; - - fn try_from( - value: re_protos::common::v0::ComponentColumnDescriptor, - ) -> Result { - Ok(Self { - entity_path: value - .entity_path - .ok_or(missing_field!( - re_protos::common::v0::ComponentColumnDescriptor, - "entity_path", - ))? - .try_into()?, - archetype_name: value.archetype_name.map(Into::into), - archetype_field_name: value.archetype_field_name.map(Into::into), - component_name: value.component_name.into(), - store_datatype: serde_json::from_str(&value.datatype).map_err(|err| { - invalid_field!( - re_protos::common::v0::ColumnDescriptor, - "component column descriptor", - err - ) - })?, - is_static: value.is_static, - is_tombstone: value.is_tombstone, - is_semantically_empty: value.is_semantically_empty, - is_indicator: value.is_indicator, - }) - } -} - #[cfg(test)] mod tests { use re_protos::common::v0::{ diff --git a/crates/store/re_protos/proto/rerun/v0/common.proto b/crates/store/re_protos/proto/rerun/v0/common.proto index a8530bda650c..3efa7174be55 100644 --- a/crates/store/re_protos/proto/rerun/v0/common.proto +++ b/crates/store/re_protos/proto/rerun/v0/common.proto @@ -102,31 +102,6 @@ message Query { SparseFillStrategy sparse_fill_strategy = 11; } -// Describes a data/component column, such as `Position3D`. -message ComponentColumnDescriptor { - // The path of the entity. - EntityPath entity_path = 1; - // Optional name of the `Archetype` associated with this data. - optional string archetype_name = 2; - // Optional name of the field within `Archetype` associated with this data. - optional string archetype_field_name = 3; - // Semantic name associated with this data. - string component_name = 4; - // The Arrow datatype of the column. - // - // Currently this is a JSON-encoded `arrow-rs` `DataType`. - string datatype = 5; - // Whether the column is a static column. - bool is_static = 6; - // Whether the column is a tombstone column. - bool is_tombstone = 7; - // Whether the column is an indicator column. - bool is_indicator = 8; - // Whether the column is semantically empty. - bool is_semantically_empty = 9; -} - - message ColumnSelection { repeated ColumnSelector columns = 1; } diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 10c41feedcda..fa3da37f8b51 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -45,6 +45,18 @@ message DataframePart { // ---------------- CreateCollectionIndex ------------------ +// used to define which column we want to index +message IndexColumn { + // The path of the entity. + rerun.common.v0.EntityPath entity_path = 1; + // Optional name of the `Archetype` associated with this data. + optional string archetype_name = 2; + // Optional name of the field within `Archetype` associated with this data. + optional string archetype_field_name = 3; + // Semantic name associated with this data. + string component_name = 4; +} + message CreateCollectionIndexRequest { // which collection do we want to create index for Collection collection = 1; @@ -52,8 +64,7 @@ message CreateCollectionIndexRequest { // its index specific properties IndexProperties properties = 2; // Component / column we want to index - // TODO - name of the lance table should be derived from the descriptor! - rerun.common.v0.ComponentColumnDescriptor column = 3; + IndexColumn column = 3; // What is the filter index i.e. timeline for which we // will query the timepoints // TODO(zehiko) this might go away and we might just index @@ -104,7 +115,7 @@ message QueryCollectionIndexRequest { // If not specified, the default collection is queried Collection collection = 1; // Index column that is queried - rerun.common.v0.ComponentColumnDescriptor column = 2; + IndexColumn column = 2; // Query data - type of data is index specific. Caller must ensure // to provide the right type. For vector search this should // be a vector of appropriate size, for inverted index this should be a string. diff --git a/crates/store/re_protos/src/v0/rerun.common.v0.rs b/crates/store/re_protos/src/v0/rerun.common.v0.rs index 60b5cb15f303..806f6810ed96 100644 --- a/crates/store/re_protos/src/v0/rerun.common.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.common.v0.rs @@ -160,49 +160,6 @@ impl ::prost::Name for Query { "/rerun.common.v0.Query".into() } } -/// Describes a data/component column, such as `Position3D`. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ComponentColumnDescriptor { - /// The path of the entity. - #[prost(message, optional, tag = "1")] - pub entity_path: ::core::option::Option, - /// Optional name of the `Archetype` associated with this data. - #[prost(string, optional, tag = "2")] - pub archetype_name: ::core::option::Option<::prost::alloc::string::String>, - /// Optional name of the field within `Archetype` associated with this data. - #[prost(string, optional, tag = "3")] - pub archetype_field_name: ::core::option::Option<::prost::alloc::string::String>, - /// Semantic name associated with this data. - #[prost(string, tag = "4")] - pub component_name: ::prost::alloc::string::String, - /// The Arrow datatype of the column. - /// - /// Currently this is a JSON-encoded `arrow-rs` `DataType`. - #[prost(string, tag = "5")] - pub datatype: ::prost::alloc::string::String, - /// Whether the column is a static column. - #[prost(bool, tag = "6")] - pub is_static: bool, - /// Whether the column is a tombstone column. - #[prost(bool, tag = "7")] - pub is_tombstone: bool, - /// Whether the column is an indicator column. - #[prost(bool, tag = "8")] - pub is_indicator: bool, - /// Whether the column is semantically empty. - #[prost(bool, tag = "9")] - pub is_semantically_empty: bool, -} -impl ::prost::Name for ComponentColumnDescriptor { - const NAME: &'static str = "ComponentColumnDescriptor"; - const PACKAGE: &'static str = "rerun.common.v0"; - fn full_name() -> ::prost::alloc::string::String { - "rerun.common.v0.ComponentColumnDescriptor".into() - } - fn type_url() -> ::prost::alloc::string::String { - "/rerun.common.v0.ComponentColumnDescriptor".into() - } -} #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnSelection { #[prost(message, repeated, tag = "1")] diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index becc822095f1..ce2ff4972d88 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -19,6 +19,32 @@ impl ::prost::Name for DataframePart { "/rerun.remote_store.v0.DataframePart".into() } } +/// used to define which column we want to index +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexColumn { + /// The path of the entity. + #[prost(message, optional, tag = "1")] + pub entity_path: ::core::option::Option, + /// Optional name of the `Archetype` associated with this data. + #[prost(string, optional, tag = "2")] + pub archetype_name: ::core::option::Option<::prost::alloc::string::String>, + /// Optional name of the field within `Archetype` associated with this data. + #[prost(string, optional, tag = "3")] + pub archetype_field_name: ::core::option::Option<::prost::alloc::string::String>, + /// Semantic name associated with this data. + #[prost(string, tag = "4")] + pub component_name: ::prost::alloc::string::String, +} +impl ::prost::Name for IndexColumn { + const NAME: &'static str = "IndexColumn"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.IndexColumn".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.IndexColumn".into() + } +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct CreateCollectionIndexRequest { /// which collection do we want to create index for @@ -29,9 +55,8 @@ pub struct CreateCollectionIndexRequest { #[prost(message, optional, tag = "2")] pub properties: ::core::option::Option, /// Component / column we want to index - /// TODO - name of the lance table should be derived from the descriptor! #[prost(message, optional, tag = "3")] - pub column: ::core::option::Option, + pub column: ::core::option::Option, /// What is the filter index i.e. timeline for which we /// will query the timepoints /// TODO(zehiko) this might go away and we might just index @@ -149,7 +174,7 @@ pub struct QueryCollectionIndexRequest { pub collection: ::core::option::Option, /// Index column that is queried #[prost(message, optional, tag = "2")] - pub column: ::core::option::Option, + pub column: ::core::option::Option, /// Query data - type of data is index specific. Caller must ensure /// to provide the right type. For vector search this should /// be a vector of appropriate size, for inverted index this should be a string. From 8ab34a8943f901af499b162939984bdc9192f45d Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 30 Jan 2025 16:26:59 +0100 Subject: [PATCH 09/10] hacky way to expose this through python bindings --- examples/python/remote/metadata.py | 22 ++ pixi.toml | 2 +- rerun_py/rerun_bindings/rerun_bindings.pyi | 156 ++++++++++++- rerun_py/rerun_bindings/types.py | 7 + rerun_py/rerun_sdk/rerun/remote.py | 8 +- rerun_py/src/dataframe.rs | 18 +- rerun_py/src/remote.rs | 251 ++++++++++++++++++++- 7 files changed, 452 insertions(+), 12 deletions(-) diff --git a/examples/python/remote/metadata.py b/examples/python/remote/metadata.py index 8e83de23d801..78db2fe58e3d 100644 --- a/examples/python/remote/metadata.py +++ b/examples/python/remote/metadata.py @@ -17,6 +17,8 @@ print_schema_cmd = subparsers.add_parser("print-schema", help="Print schema for a recording in the catalog") register_cmd = subparsers.add_parser("register", help="Register a new recording") update_cmd = subparsers.add_parser("update", help="Update metadata for a recording") + create_index_cmd = subparsers.add_parser("create-index", help="Create an index for a collection") + query_collection_index_cmd = subparsers.add_parser("query-collection-index", help="Query an index for a collection") update_cmd.add_argument("id", help="ID of the recording to update") update_cmd.add_argument("key", help="Key of the metadata to update") @@ -29,6 +31,12 @@ print_schema_cmd.add_argument("recording_id", help="Recording ID to print schema for") + create_index_cmd.add_argument("--entity-path", help="entity path of component to index") + create_index_cmd.add_argument("--index-column", help="index column name") + + query_collection_index_cmd.add_argument("--entity-path", help="entity path of component to index") + query_collection_index_cmd.add_argument("--index-column", help="index column name") + args = parser.parse_args() # Register the new rrd @@ -68,3 +76,17 @@ schema = conn.get_recording_schema(id) for column in schema: print(column) + elif args.subcommand == "create-index": + props = rr.remote.VectorIndexProperties(5, 16, rr.remote.VectorDistanceMetric.L2) + column = rr.dataframe.ComponentColumnSelector(args.entity_path, args.index_column) + + index = rr.dataframe.IndexColumnSelector("log_tick") + + conn.create_collection_index("my_collection", props, column, index) + elif args.subcommand == "query-collection-index": + column = rr.dataframe.ComponentColumnSelector(args.entity_path, args.index_column) + query = pa.array([1, 2, 3], type=pa.int64()) + properties = rr.remote.VectorIndexQueryProperties(10) + + result = conn.query_collection_index("my_collection", query, column, properties, 10) + print(result) diff --git a/pixi.toml b/pixi.toml index c30864607e6c..5e84577e1cfa 100644 --- a/pixi.toml +++ b/pixi.toml @@ -298,7 +298,7 @@ js-build-all = { cmd = "yarn --cwd rerun_js workspaces run build", depends-on = # configured to not install outside venv (which is a good practice). PIP_REQUIRE_VIRTUALENV=0 disables this check. # - RERUN_ALLOW_MISSING_BIN is needed to allow maturin to run without the `rerun` binary being part of the rerun-sdk # package. -py-build-common = { cmd = "PIP_REQUIRE_VIRTUALENV=0 RERUN_ALLOW_MISSING_BIN=1 maturin develop --manifest-path rerun_py/Cargo.toml --extras=tests", depends-on = [ +py-build-common = { cmd = "PIP_REQUIRE_VIRTUALENV=0 RERUN_ALLOW_MISSING_BIN=1 maturin develop --manifest-path rerun_py/Cargo.toml --extras=tests --features extra,remote", depends-on = [ "rerun-build", # We need to build rerun-cli since it is bundled in the python package. ] } diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index 442a1a843351..d590b78a2984 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -1,9 +1,18 @@ import os +from enum import Enum from typing import Iterator, Optional, Sequence, Union import pyarrow as pa -from .types import AnyColumn, AnyComponentColumn, ComponentLike, IndexValuesLike, TableLike, ViewContentsLike +from .types import ( + AnyColumn, + AnyComponentColumn, + AnyIndexQueryProperties, + ComponentLike, + IndexValuesLike, + TableLike, + ViewContentsLike, +) class IndexColumnDescriptor: """ @@ -134,6 +143,94 @@ class ComponentColumnSelector: """ ... +class VectorDistanceMetric(Enum): + """Which distance metric for use for vector index.""" + + L2 = 1 + COSINE = 2 + DOT = 3 + HAMMING = 4 + ... + +class VectorIndexProperties: + """Properties that define which kind of index is to be created.""" + + def __init__(self, num_partitions: int, num_sub_vectors: int, distance_metric: VectorDistanceMetric): + """ + Create new `VectorIndexProperties`. + + Parameters + ---------- + num_partitions : int + The number of partitions for the index. + num_sub_vectors : int + The number of sub-vectors for the index. + distance_metric : VectorDistanceMetric + The distance metric to use for the index. + + """ + + ... + + @property + def num_partitions(self) -> int: + """ + Number of partitions. + + This property is read-only. + """ + ... + + @property + def num_sub_vectors(self) -> int: + """ + Number of sub-vectors. + + This property is read-only. + """ + ... + + @property + def distance_metric(self) -> VectorDistanceMetric: + """ + Vector distance metric. + + This property is read-only. + """ + ... + +class VectorIndexQueryProperties: + """Vector index search properties.""" + + def __init__(self, top_k: int): + """ + Create new `VectorIndexQueryProperties`. + + Parameters + ---------- + top_k : int + The number of results to return. + + """ + + ... + + @property + def top_k(self) -> int: + """ + Number of results to return. + + This property is read-only. + """ + ... + +class InvertedIndexQueryProperties: + """Inverted index search properties.""" + + def __init__(self, columns: list[str]): + """Create new `InvertedIndexQueryProperties`.""" + ... + class Schema: """ The schema representing a set of available columns. @@ -679,6 +776,63 @@ class StorageNodeClient: """ ... + def create_collection_index( + self, + collection: str, + # TODO(zehiko) enum of different properties + properties: VectorIndexProperties, + column: ComponentColumnSelector, + time_index: IndexColumnSelector, + ) -> None: + """ + Create a collection index. + + Parameters + ---------- + collection : str + The name of the collection. + properties : VectorIndexProperties + The properties of the collection index. + column : ComponentColumnSelector + The component column to index. + time_index : IndexColumnSelector + The index column to use for the time index. + + """ + ... + + def query_collection_index( + self, + collection: str, + query: TableLike, + column: ComponentColumnSelector, + properties: AnyIndexQueryProperties, + limit: Optional[int], + ) -> pa.RecordBatchReader: + """ + Query a collection index. + + Parameters + ---------- + collection : str + The name of the collection. + query : pa.RecordBatch + The query to run. + column : ComponentColumnSelector + The component column to query. + properties : IndexQueryProperties + The properties of the query. + limit : Optional[int] + The maximum number of results to return. + + Returns + ------- + pa.RecordBatchReader + The results of the query. + + """ + ... + def connect(addr: str) -> StorageNodeClient: """ Load a rerun archive from an RRD file. diff --git a/rerun_py/rerun_bindings/types.py b/rerun_py/rerun_bindings/types.py index a38e70036d34..254644f0db64 100644 --- a/rerun_py/rerun_bindings/types.py +++ b/rerun_py/rerun_bindings/types.py @@ -15,6 +15,8 @@ ComponentColumnSelector as ComponentColumnSelector, IndexColumnDescriptor as IndexColumnDescriptor, IndexColumnSelector as IndexColumnSelector, + InvertedIndexQueryProperties as InvertedIndexQueryProperties, + VectorIndexQueryProperties as VectorIndexQueryProperties, ) AnyColumn: TypeAlias = Union[ @@ -72,3 +74,8 @@ """ A type alias for TableLike pyarrow objects. """ + +AnyIndexQueryProperties: TypeAlias = Union[VectorIndexQueryProperties, InvertedIndexQueryProperties] +""" +A type alias for any index query properties. +""" diff --git a/rerun_py/rerun_sdk/rerun/remote.py b/rerun_py/rerun_sdk/rerun/remote.py index 5fad8347a3e0..2959beb74ee4 100644 --- a/rerun_py/rerun_sdk/rerun/remote.py +++ b/rerun_py/rerun_sdk/rerun/remote.py @@ -2,10 +2,16 @@ try: from rerun_bindings import ( + InvertedIndexQueryProperties as InvertedIndexQueryProperties, StorageNodeClient as StorageNodeClient, + VectorDistanceMetric as VectorDistanceMetric, + VectorIndexProperties as VectorIndexProperties, + VectorIndexQueryProperties as VectorIndexQueryProperties, connect as connect, ) -except ImportError: + +except ImportError as e: + print("import failed: ", e) def connect(addr: str) -> StorageNodeClient: raise NotImplementedError("Rerun SDK was built without the `remote` feature enabled.") diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 01fd441f1fd3..9d03268e5519 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -111,7 +111,7 @@ impl From for PyIndexColumnDescriptor { /// The name of the index to select. Usually the name of a timeline. #[pyclass(frozen, name = "IndexColumnSelector")] #[derive(Clone)] -struct PyIndexColumnSelector(TimeColumnSelector); +pub struct PyIndexColumnSelector(TimeColumnSelector); #[pymethods] impl PyIndexColumnSelector { @@ -138,6 +138,12 @@ impl PyIndexColumnSelector { } } +impl From for TimeColumnSelector { + fn from(selector: PyIndexColumnSelector) -> Self { + selector.0 + } +} + /// The descriptor of a component column. /// /// Component columns contain the data for a specific component of an entity. @@ -147,7 +153,7 @@ impl PyIndexColumnSelector { /// column, use [`ComponentColumnSelector`][rerun.dataframe.ComponentColumnSelector]. #[pyclass(frozen, name = "ComponentColumnDescriptor")] #[derive(Clone)] -struct PyComponentColumnDescriptor(ComponentColumnDescriptor); +pub struct PyComponentColumnDescriptor(ComponentColumnDescriptor); impl From for PyComponentColumnDescriptor { fn from(desc: ComponentColumnDescriptor) -> Self { @@ -212,7 +218,7 @@ impl From for ComponentColumnDescriptor { /// The component to select #[pyclass(frozen, name = "ComponentColumnSelector")] #[derive(Clone)] -struct PyComponentColumnSelector(ComponentColumnSelector); +pub struct PyComponentColumnSelector(ComponentColumnSelector); #[pymethods] impl PyComponentColumnSelector { @@ -251,6 +257,12 @@ impl PyComponentColumnSelector { } } +impl From for ComponentColumnSelector { + fn from(selector: PyComponentColumnSelector) -> Self { + selector.0 + } +} + /// A type alias for any component-column-like object. #[derive(FromPyObject)] enum AnyColumn { diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 6670e4455cd3..3521b001157a 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -19,25 +19,34 @@ use tokio_stream::StreamExt; use re_arrow_util::ArrowArrayDowncastRef as _; use re_chunk::Chunk; use re_chunk_store::ChunkStore; -use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector}; +use re_dataframe::{ + ChunkStoreHandle, ComponentColumnSelector, QueryExpression, SparseFillStrategy, + TimeColumnSelector, ViewContentsSelector, +}; use re_grpc_client::TonicStatusError; use re_log_encoding::codec::wire::{decoder::Decode, encoder::Encode}; use re_log_types::{EntityPathFilter, StoreInfo, StoreSource}; use re_protos::{ - common::v0::RecordingId, + common::v0::{EntityPath, IndexColumnSelector, RecordingId}, remote_store::v0::{ - storage_node_client::StorageNodeClient, CatalogFilter, ColumnProjection, - FetchRecordingRequest, GetRecordingSchemaRequest, QueryCatalogRequest, QueryRequest, - RecordingType, RegisterRecordingRequest, UpdateCatalogRequest, + index_properties::Props, storage_node_client::StorageNodeClient, CatalogFilter, Collection, + ColumnProjection, FetchRecordingRequest, GetRecordingSchemaRequest, IndexColumn, + QueryCatalogRequest, QueryCollectionIndexRequest, QueryRequest, RecordingType, + RegisterRecordingRequest, UpdateCatalogRequest, VectorIvfPqIndex, }, }; use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline}; -use crate::dataframe::{ComponentLike, PyRecording, PyRecordingHandle, PyRecordingView, PySchema}; +use crate::dataframe::{ + ComponentLike, PyComponentColumnSelector, PyIndexColumnSelector, PyRecording, + PyRecordingHandle, PyRecordingView, PySchema, +}; /// Register the `rerun.remote` module. pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_function(wrap_pyfunction!(connect, m)?)?; @@ -360,6 +369,192 @@ impl PyStorageNodeClient { }) } + /// Create a collection index. + /// + /// Parameters + /// ---------- + /// collection : str + /// The name of the collection to create the index for. + /// properties : VectorIndexProperties + /// The properties of the index. + /// column : AnyColumn + /// The column to index. + /// time_index : AnyColumn + /// The time index. + #[pyo3(signature = ( + collection, + properties, + column, + time_index, + ))] + fn create_collection_index( + &mut self, + collection: String, + properties: PyVectorIndexProperties, + column: PyComponentColumnSelector, + time_index: PyIndexColumnSelector, + ) -> PyResult<()> { + self.runtime.block_on(async { + let time_selector: TimeColumnSelector = time_index.into(); + let column_selector: ComponentColumnSelector = column.into(); + + let index_column = IndexColumn { + entity_path: Some(EntityPath { + path: column_selector.entity_path.to_string(), + }), + archetype_name: None, + archetype_field_name: None, + component_name: column_selector.component_name, + }; + + let time_index = IndexColumnSelector { + timeline: Some(re_protos::common::v0::Timeline { + name: time_selector.timeline.to_string(), + }), + }; + + self.client + .create_collection_index( + re_protos::remote_store::v0::CreateCollectionIndexRequest { + collection: Some(Collection { name: collection }), + properties: Some(re_protos::remote_store::v0::IndexProperties { + props: Some(Props::Vector(VectorIvfPqIndex { + num_partitions: properties.num_partitions, + num_sub_vectors: properties.num_sub_vectors, + + distance_metrics: match properties.distance_metric { + PyVectorDistanceMetric::L2 => { + re_protos::remote_store::v0::VectorDistanceMetric::L2 as i32 + } + PyVectorDistanceMetric::Cosine => { + re_protos::remote_store::v0::VectorDistanceMetric::Cosine + as i32 + } + PyVectorDistanceMetric::Dot => { + re_protos::remote_store::v0::VectorDistanceMetric::Dot + as i32 + } + PyVectorDistanceMetric::Hamming => { + re_protos::remote_store::v0::VectorDistanceMetric::Hamming + as i32 + } + }, + })), + }), + column: Some(index_column), + time_index: Some(time_index.into()), + }, + ) + .await + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + + Ok(()) + }) + } + + /// Query the collection index. + /// + /// Parameters + /// ---------- + /// collection : str + /// The name of the collection to query. + /// query : Table | RecordBatch + /// The query to run. + /// column : ComponentColumnSelector + /// The component column to query. + /// properties : IndexQueryProperties + /// The properties of the query. + /// limit : Optional[int] + /// The maximum number of results to return. + /// Returns + /// ------- + /// pa.RecordBatchReader + /// The result of the query. + #[pyo3(signature = ( + collection, + query, + column, + properties, + limit = None + ))] + fn query_collection_index( + &mut self, + collection: String, + query: MetadataLike, + column: PyComponentColumnSelector, + properties: PyIndexQueryProperties, + limit: Option, + ) -> PyResult>> { + let reader = self.runtime.block_on(async { + let column_selector: ComponentColumnSelector = column.into(); + let query = query.into_record_batch()?; + + let transport_chunks = self + .client + .query_collection_index(QueryCollectionIndexRequest { + collection: Some(Collection { name: collection }), + column: Some(IndexColumn { + entity_path: Some(EntityPath { + path: column_selector.entity_path.to_string(), + }), + archetype_name: None, + archetype_field_name: None, + component_name: column_selector.component_name, + }), + properties: Some(match properties { + PyIndexQueryProperties::Vector { top_k } => { + re_protos::remote_store::v0::IndexQueryProperties { + props: Some(re_protos::remote_store::v0::index_query_properties::Props::Vector( + re_protos::remote_store::v0::VectorIndexQuery { top_k }, + )), + } + } + PyIndexQueryProperties::Inverted { columns: _ } => { + re_protos::remote_store::v0::IndexQueryProperties { + props: Some(re_protos::remote_store::v0::index_query_properties::Props::Inverted( + re_protos::remote_store::v0::InvertedIndexQuery {}, + )), + } + } + }), + query: Some( + query + .encode() + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, + ), + limit, + }) + .await + .map_err(|err| PyRuntimeError::new_err(err.to_string()))? + .into_inner() + .map(|resp| { + resp.and_then(|r| { + r.decode() + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + }) + .collect::, _>>() + .await + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + + let record_batches: Vec> = + transport_chunks.into_iter().map(Ok).collect(); + + // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in + // the gRPC APIs somehow. + let schema = record_batches + .first() + .and_then(|batch| batch.as_ref().ok().map(|batch| batch.schema())) + .unwrap_or(std::sync::Arc::new(ArrowSchema::empty())); + + let reader = RecordBatchIterator::new(record_batches, schema); + + Ok::<_, PyErr>(reader) + })?; + + Ok(PyArrowType(Box::new(reader))) + } + /// Update the catalog metadata for one or more recordings. /// /// The updates are provided as a pyarrow Table or RecordBatch containing the metadata to update. @@ -510,6 +705,50 @@ impl PyStorageNodeClient { } } +#[pyclass(frozen, name = "VectorIndexProperties")] +#[derive(Clone)] +struct PyVectorIndexProperties { + num_partitions: u32, + num_sub_vectors: u32, + distance_metric: PyVectorDistanceMetric, +} + +#[pymethods] +impl PyVectorIndexProperties { + #[new] + #[pyo3( + text_signature = "(self, num_partitions: int, num_sub_vectors: int, distance_metric: VectorDistanceMetric)" + )] + fn new( + num_partitions: u32, + num_sub_vectors: u32, + distance_metric: PyVectorDistanceMetric, + ) -> Self { + Self { + num_partitions, + num_sub_vectors, + distance_metric, + } + } +} + +#[pyclass(name = "VectorDistanceMetric", eq, eq_int)] +#[derive(Clone, Debug, PartialEq)] +enum PyVectorDistanceMetric { + L2, + Cosine, + Dot, + Hamming, +} + +/// A type alias for metadata. +#[derive(FromPyObject)] +enum PyIndexQueryProperties { + Vector { top_k: u32 }, + // TODO(zehiko) remove this as it's unnecessary + Inverted { columns: Vec }, +} + /// A type alias for metadata. #[derive(FromPyObject)] enum MetadataLike { From 33d70db0f783345674ec88665d8957577ba35ed6 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 30 Jan 2025 16:35:02 +0100 Subject: [PATCH 10/10] Revert "hacky way to expose this through python bindings" This reverts commit 8ab34a8943f901af499b162939984bdc9192f45d. --- examples/python/remote/metadata.py | 22 -- pixi.toml | 2 +- rerun_py/rerun_bindings/rerun_bindings.pyi | 156 +------------ rerun_py/rerun_bindings/types.py | 7 - rerun_py/rerun_sdk/rerun/remote.py | 8 +- rerun_py/src/dataframe.rs | 18 +- rerun_py/src/remote.rs | 251 +-------------------- 7 files changed, 12 insertions(+), 452 deletions(-) diff --git a/examples/python/remote/metadata.py b/examples/python/remote/metadata.py index 78db2fe58e3d..8e83de23d801 100644 --- a/examples/python/remote/metadata.py +++ b/examples/python/remote/metadata.py @@ -17,8 +17,6 @@ print_schema_cmd = subparsers.add_parser("print-schema", help="Print schema for a recording in the catalog") register_cmd = subparsers.add_parser("register", help="Register a new recording") update_cmd = subparsers.add_parser("update", help="Update metadata for a recording") - create_index_cmd = subparsers.add_parser("create-index", help="Create an index for a collection") - query_collection_index_cmd = subparsers.add_parser("query-collection-index", help="Query an index for a collection") update_cmd.add_argument("id", help="ID of the recording to update") update_cmd.add_argument("key", help="Key of the metadata to update") @@ -31,12 +29,6 @@ print_schema_cmd.add_argument("recording_id", help="Recording ID to print schema for") - create_index_cmd.add_argument("--entity-path", help="entity path of component to index") - create_index_cmd.add_argument("--index-column", help="index column name") - - query_collection_index_cmd.add_argument("--entity-path", help="entity path of component to index") - query_collection_index_cmd.add_argument("--index-column", help="index column name") - args = parser.parse_args() # Register the new rrd @@ -76,17 +68,3 @@ schema = conn.get_recording_schema(id) for column in schema: print(column) - elif args.subcommand == "create-index": - props = rr.remote.VectorIndexProperties(5, 16, rr.remote.VectorDistanceMetric.L2) - column = rr.dataframe.ComponentColumnSelector(args.entity_path, args.index_column) - - index = rr.dataframe.IndexColumnSelector("log_tick") - - conn.create_collection_index("my_collection", props, column, index) - elif args.subcommand == "query-collection-index": - column = rr.dataframe.ComponentColumnSelector(args.entity_path, args.index_column) - query = pa.array([1, 2, 3], type=pa.int64()) - properties = rr.remote.VectorIndexQueryProperties(10) - - result = conn.query_collection_index("my_collection", query, column, properties, 10) - print(result) diff --git a/pixi.toml b/pixi.toml index 5e84577e1cfa..c30864607e6c 100644 --- a/pixi.toml +++ b/pixi.toml @@ -298,7 +298,7 @@ js-build-all = { cmd = "yarn --cwd rerun_js workspaces run build", depends-on = # configured to not install outside venv (which is a good practice). PIP_REQUIRE_VIRTUALENV=0 disables this check. # - RERUN_ALLOW_MISSING_BIN is needed to allow maturin to run without the `rerun` binary being part of the rerun-sdk # package. -py-build-common = { cmd = "PIP_REQUIRE_VIRTUALENV=0 RERUN_ALLOW_MISSING_BIN=1 maturin develop --manifest-path rerun_py/Cargo.toml --extras=tests --features extra,remote", depends-on = [ +py-build-common = { cmd = "PIP_REQUIRE_VIRTUALENV=0 RERUN_ALLOW_MISSING_BIN=1 maturin develop --manifest-path rerun_py/Cargo.toml --extras=tests", depends-on = [ "rerun-build", # We need to build rerun-cli since it is bundled in the python package. ] } diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index d590b78a2984..442a1a843351 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -1,18 +1,9 @@ import os -from enum import Enum from typing import Iterator, Optional, Sequence, Union import pyarrow as pa -from .types import ( - AnyColumn, - AnyComponentColumn, - AnyIndexQueryProperties, - ComponentLike, - IndexValuesLike, - TableLike, - ViewContentsLike, -) +from .types import AnyColumn, AnyComponentColumn, ComponentLike, IndexValuesLike, TableLike, ViewContentsLike class IndexColumnDescriptor: """ @@ -143,94 +134,6 @@ class ComponentColumnSelector: """ ... -class VectorDistanceMetric(Enum): - """Which distance metric for use for vector index.""" - - L2 = 1 - COSINE = 2 - DOT = 3 - HAMMING = 4 - ... - -class VectorIndexProperties: - """Properties that define which kind of index is to be created.""" - - def __init__(self, num_partitions: int, num_sub_vectors: int, distance_metric: VectorDistanceMetric): - """ - Create new `VectorIndexProperties`. - - Parameters - ---------- - num_partitions : int - The number of partitions for the index. - num_sub_vectors : int - The number of sub-vectors for the index. - distance_metric : VectorDistanceMetric - The distance metric to use for the index. - - """ - - ... - - @property - def num_partitions(self) -> int: - """ - Number of partitions. - - This property is read-only. - """ - ... - - @property - def num_sub_vectors(self) -> int: - """ - Number of sub-vectors. - - This property is read-only. - """ - ... - - @property - def distance_metric(self) -> VectorDistanceMetric: - """ - Vector distance metric. - - This property is read-only. - """ - ... - -class VectorIndexQueryProperties: - """Vector index search properties.""" - - def __init__(self, top_k: int): - """ - Create new `VectorIndexQueryProperties`. - - Parameters - ---------- - top_k : int - The number of results to return. - - """ - - ... - - @property - def top_k(self) -> int: - """ - Number of results to return. - - This property is read-only. - """ - ... - -class InvertedIndexQueryProperties: - """Inverted index search properties.""" - - def __init__(self, columns: list[str]): - """Create new `InvertedIndexQueryProperties`.""" - ... - class Schema: """ The schema representing a set of available columns. @@ -776,63 +679,6 @@ class StorageNodeClient: """ ... - def create_collection_index( - self, - collection: str, - # TODO(zehiko) enum of different properties - properties: VectorIndexProperties, - column: ComponentColumnSelector, - time_index: IndexColumnSelector, - ) -> None: - """ - Create a collection index. - - Parameters - ---------- - collection : str - The name of the collection. - properties : VectorIndexProperties - The properties of the collection index. - column : ComponentColumnSelector - The component column to index. - time_index : IndexColumnSelector - The index column to use for the time index. - - """ - ... - - def query_collection_index( - self, - collection: str, - query: TableLike, - column: ComponentColumnSelector, - properties: AnyIndexQueryProperties, - limit: Optional[int], - ) -> pa.RecordBatchReader: - """ - Query a collection index. - - Parameters - ---------- - collection : str - The name of the collection. - query : pa.RecordBatch - The query to run. - column : ComponentColumnSelector - The component column to query. - properties : IndexQueryProperties - The properties of the query. - limit : Optional[int] - The maximum number of results to return. - - Returns - ------- - pa.RecordBatchReader - The results of the query. - - """ - ... - def connect(addr: str) -> StorageNodeClient: """ Load a rerun archive from an RRD file. diff --git a/rerun_py/rerun_bindings/types.py b/rerun_py/rerun_bindings/types.py index 254644f0db64..a38e70036d34 100644 --- a/rerun_py/rerun_bindings/types.py +++ b/rerun_py/rerun_bindings/types.py @@ -15,8 +15,6 @@ ComponentColumnSelector as ComponentColumnSelector, IndexColumnDescriptor as IndexColumnDescriptor, IndexColumnSelector as IndexColumnSelector, - InvertedIndexQueryProperties as InvertedIndexQueryProperties, - VectorIndexQueryProperties as VectorIndexQueryProperties, ) AnyColumn: TypeAlias = Union[ @@ -74,8 +72,3 @@ """ A type alias for TableLike pyarrow objects. """ - -AnyIndexQueryProperties: TypeAlias = Union[VectorIndexQueryProperties, InvertedIndexQueryProperties] -""" -A type alias for any index query properties. -""" diff --git a/rerun_py/rerun_sdk/rerun/remote.py b/rerun_py/rerun_sdk/rerun/remote.py index 2959beb74ee4..5fad8347a3e0 100644 --- a/rerun_py/rerun_sdk/rerun/remote.py +++ b/rerun_py/rerun_sdk/rerun/remote.py @@ -2,16 +2,10 @@ try: from rerun_bindings import ( - InvertedIndexQueryProperties as InvertedIndexQueryProperties, StorageNodeClient as StorageNodeClient, - VectorDistanceMetric as VectorDistanceMetric, - VectorIndexProperties as VectorIndexProperties, - VectorIndexQueryProperties as VectorIndexQueryProperties, connect as connect, ) - -except ImportError as e: - print("import failed: ", e) +except ImportError: def connect(addr: str) -> StorageNodeClient: raise NotImplementedError("Rerun SDK was built without the `remote` feature enabled.") diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 9d03268e5519..01fd441f1fd3 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -111,7 +111,7 @@ impl From for PyIndexColumnDescriptor { /// The name of the index to select. Usually the name of a timeline. #[pyclass(frozen, name = "IndexColumnSelector")] #[derive(Clone)] -pub struct PyIndexColumnSelector(TimeColumnSelector); +struct PyIndexColumnSelector(TimeColumnSelector); #[pymethods] impl PyIndexColumnSelector { @@ -138,12 +138,6 @@ impl PyIndexColumnSelector { } } -impl From for TimeColumnSelector { - fn from(selector: PyIndexColumnSelector) -> Self { - selector.0 - } -} - /// The descriptor of a component column. /// /// Component columns contain the data for a specific component of an entity. @@ -153,7 +147,7 @@ impl From for TimeColumnSelector { /// column, use [`ComponentColumnSelector`][rerun.dataframe.ComponentColumnSelector]. #[pyclass(frozen, name = "ComponentColumnDescriptor")] #[derive(Clone)] -pub struct PyComponentColumnDescriptor(ComponentColumnDescriptor); +struct PyComponentColumnDescriptor(ComponentColumnDescriptor); impl From for PyComponentColumnDescriptor { fn from(desc: ComponentColumnDescriptor) -> Self { @@ -218,7 +212,7 @@ impl From for ComponentColumnDescriptor { /// The component to select #[pyclass(frozen, name = "ComponentColumnSelector")] #[derive(Clone)] -pub struct PyComponentColumnSelector(ComponentColumnSelector); +struct PyComponentColumnSelector(ComponentColumnSelector); #[pymethods] impl PyComponentColumnSelector { @@ -257,12 +251,6 @@ impl PyComponentColumnSelector { } } -impl From for ComponentColumnSelector { - fn from(selector: PyComponentColumnSelector) -> Self { - selector.0 - } -} - /// A type alias for any component-column-like object. #[derive(FromPyObject)] enum AnyColumn { diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 3521b001157a..6670e4455cd3 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -19,34 +19,25 @@ use tokio_stream::StreamExt; use re_arrow_util::ArrowArrayDowncastRef as _; use re_chunk::Chunk; use re_chunk_store::ChunkStore; -use re_dataframe::{ - ChunkStoreHandle, ComponentColumnSelector, QueryExpression, SparseFillStrategy, - TimeColumnSelector, ViewContentsSelector, -}; +use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector}; use re_grpc_client::TonicStatusError; use re_log_encoding::codec::wire::{decoder::Decode, encoder::Encode}; use re_log_types::{EntityPathFilter, StoreInfo, StoreSource}; use re_protos::{ - common::v0::{EntityPath, IndexColumnSelector, RecordingId}, + common::v0::RecordingId, remote_store::v0::{ - index_properties::Props, storage_node_client::StorageNodeClient, CatalogFilter, Collection, - ColumnProjection, FetchRecordingRequest, GetRecordingSchemaRequest, IndexColumn, - QueryCatalogRequest, QueryCollectionIndexRequest, QueryRequest, RecordingType, - RegisterRecordingRequest, UpdateCatalogRequest, VectorIvfPqIndex, + storage_node_client::StorageNodeClient, CatalogFilter, ColumnProjection, + FetchRecordingRequest, GetRecordingSchemaRequest, QueryCatalogRequest, QueryRequest, + RecordingType, RegisterRecordingRequest, UpdateCatalogRequest, }, }; use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline}; -use crate::dataframe::{ - ComponentLike, PyComponentColumnSelector, PyIndexColumnSelector, PyRecording, - PyRecordingHandle, PyRecordingView, PySchema, -}; +use crate::dataframe::{ComponentLike, PyRecording, PyRecordingHandle, PyRecordingView, PySchema}; /// Register the `rerun.remote` module. pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; - m.add_class::()?; - m.add_class::()?; m.add_function(wrap_pyfunction!(connect, m)?)?; @@ -369,192 +360,6 @@ impl PyStorageNodeClient { }) } - /// Create a collection index. - /// - /// Parameters - /// ---------- - /// collection : str - /// The name of the collection to create the index for. - /// properties : VectorIndexProperties - /// The properties of the index. - /// column : AnyColumn - /// The column to index. - /// time_index : AnyColumn - /// The time index. - #[pyo3(signature = ( - collection, - properties, - column, - time_index, - ))] - fn create_collection_index( - &mut self, - collection: String, - properties: PyVectorIndexProperties, - column: PyComponentColumnSelector, - time_index: PyIndexColumnSelector, - ) -> PyResult<()> { - self.runtime.block_on(async { - let time_selector: TimeColumnSelector = time_index.into(); - let column_selector: ComponentColumnSelector = column.into(); - - let index_column = IndexColumn { - entity_path: Some(EntityPath { - path: column_selector.entity_path.to_string(), - }), - archetype_name: None, - archetype_field_name: None, - component_name: column_selector.component_name, - }; - - let time_index = IndexColumnSelector { - timeline: Some(re_protos::common::v0::Timeline { - name: time_selector.timeline.to_string(), - }), - }; - - self.client - .create_collection_index( - re_protos::remote_store::v0::CreateCollectionIndexRequest { - collection: Some(Collection { name: collection }), - properties: Some(re_protos::remote_store::v0::IndexProperties { - props: Some(Props::Vector(VectorIvfPqIndex { - num_partitions: properties.num_partitions, - num_sub_vectors: properties.num_sub_vectors, - - distance_metrics: match properties.distance_metric { - PyVectorDistanceMetric::L2 => { - re_protos::remote_store::v0::VectorDistanceMetric::L2 as i32 - } - PyVectorDistanceMetric::Cosine => { - re_protos::remote_store::v0::VectorDistanceMetric::Cosine - as i32 - } - PyVectorDistanceMetric::Dot => { - re_protos::remote_store::v0::VectorDistanceMetric::Dot - as i32 - } - PyVectorDistanceMetric::Hamming => { - re_protos::remote_store::v0::VectorDistanceMetric::Hamming - as i32 - } - }, - })), - }), - column: Some(index_column), - time_index: Some(time_index.into()), - }, - ) - .await - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - - Ok(()) - }) - } - - /// Query the collection index. - /// - /// Parameters - /// ---------- - /// collection : str - /// The name of the collection to query. - /// query : Table | RecordBatch - /// The query to run. - /// column : ComponentColumnSelector - /// The component column to query. - /// properties : IndexQueryProperties - /// The properties of the query. - /// limit : Optional[int] - /// The maximum number of results to return. - /// Returns - /// ------- - /// pa.RecordBatchReader - /// The result of the query. - #[pyo3(signature = ( - collection, - query, - column, - properties, - limit = None - ))] - fn query_collection_index( - &mut self, - collection: String, - query: MetadataLike, - column: PyComponentColumnSelector, - properties: PyIndexQueryProperties, - limit: Option, - ) -> PyResult>> { - let reader = self.runtime.block_on(async { - let column_selector: ComponentColumnSelector = column.into(); - let query = query.into_record_batch()?; - - let transport_chunks = self - .client - .query_collection_index(QueryCollectionIndexRequest { - collection: Some(Collection { name: collection }), - column: Some(IndexColumn { - entity_path: Some(EntityPath { - path: column_selector.entity_path.to_string(), - }), - archetype_name: None, - archetype_field_name: None, - component_name: column_selector.component_name, - }), - properties: Some(match properties { - PyIndexQueryProperties::Vector { top_k } => { - re_protos::remote_store::v0::IndexQueryProperties { - props: Some(re_protos::remote_store::v0::index_query_properties::Props::Vector( - re_protos::remote_store::v0::VectorIndexQuery { top_k }, - )), - } - } - PyIndexQueryProperties::Inverted { columns: _ } => { - re_protos::remote_store::v0::IndexQueryProperties { - props: Some(re_protos::remote_store::v0::index_query_properties::Props::Inverted( - re_protos::remote_store::v0::InvertedIndexQuery {}, - )), - } - } - }), - query: Some( - query - .encode() - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, - ), - limit, - }) - .await - .map_err(|err| PyRuntimeError::new_err(err.to_string()))? - .into_inner() - .map(|resp| { - resp.and_then(|r| { - r.decode() - .map_err(|err| tonic::Status::internal(err.to_string())) - }) - }) - .collect::, _>>() - .await - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - - let record_batches: Vec> = - transport_chunks.into_iter().map(Ok).collect(); - - // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in - // the gRPC APIs somehow. - let schema = record_batches - .first() - .and_then(|batch| batch.as_ref().ok().map(|batch| batch.schema())) - .unwrap_or(std::sync::Arc::new(ArrowSchema::empty())); - - let reader = RecordBatchIterator::new(record_batches, schema); - - Ok::<_, PyErr>(reader) - })?; - - Ok(PyArrowType(Box::new(reader))) - } - /// Update the catalog metadata for one or more recordings. /// /// The updates are provided as a pyarrow Table or RecordBatch containing the metadata to update. @@ -705,50 +510,6 @@ impl PyStorageNodeClient { } } -#[pyclass(frozen, name = "VectorIndexProperties")] -#[derive(Clone)] -struct PyVectorIndexProperties { - num_partitions: u32, - num_sub_vectors: u32, - distance_metric: PyVectorDistanceMetric, -} - -#[pymethods] -impl PyVectorIndexProperties { - #[new] - #[pyo3( - text_signature = "(self, num_partitions: int, num_sub_vectors: int, distance_metric: VectorDistanceMetric)" - )] - fn new( - num_partitions: u32, - num_sub_vectors: u32, - distance_metric: PyVectorDistanceMetric, - ) -> Self { - Self { - num_partitions, - num_sub_vectors, - distance_metric, - } - } -} - -#[pyclass(name = "VectorDistanceMetric", eq, eq_int)] -#[derive(Clone, Debug, PartialEq)] -enum PyVectorDistanceMetric { - L2, - Cosine, - Dot, - Hamming, -} - -/// A type alias for metadata. -#[derive(FromPyObject)] -enum PyIndexQueryProperties { - Vector { top_k: u32 }, - // TODO(zehiko) remove this as it's unnecessary - Inverted { columns: Vec }, -} - /// A type alias for metadata. #[derive(FromPyObject)] enum MetadataLike {