diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 2ecafb23c345..f82d05816251 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -9,14 +9,15 @@ service StorageNode { rpc Query(QueryRequest) returns (stream DataframePart) {} rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {} - rpc IndexCollection(IndexCollectionRequest) returns (IndexCollectionResponse) {} - // Collection index query response is a RecordBatch with 2 columns: + rpc CreateCollectionIndex(CreateCollectionIndexRequest) returns (CreateCollectionIndexResponse) {} + // Collection index query response is a RecordBatch with 3 columns: // - 'resource_id' column with the id of the resource // - timepoint column with the values reprensenting the points in time // where index query matches. What time points are matched depends on the type of // index that is queried. For example for vector search it might be timepoints where // top-K matches are found within *each* resource in the collection. For inverted index // it might be timepoints where the query string is found in the indexed column + // - 'data' column with the data that is returned for the matched timepoints rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (stream DataframePart) {} // metadata API calls @@ -42,18 +43,21 @@ message DataframePart { bytes payload = 1000; } -// ---------------- IndexCollection ------------------ +// ---------------- CreateCollectionIndex ------------------ -message IndexCollectionRequest { +message CreateCollectionIndexRequest { // which collection do we want to create index for Collection collection = 1; // what kind of index do we want to create and what are // its index specific properties IndexProperties properties = 2; // Component / column we want to index + // TODO - name of the lance table should be derived from the descriptor! rerun.common.v0.ComponentColumnDescriptor column = 3; // What is the filter index i.e. timeline for which we // will query the timepoints + // TODO(zehiko) this might go away and we might just index + // across all the timelines rerun.common.v0.IndexColumnSelector time_index = 4; } @@ -68,7 +72,7 @@ message IndexProperties { message InvertedIndex { bool store_position = 1; string base_tokenizer = 2; - // TODO(zehiko) add properties as needed + // TODO(zehiko) add other properties as needed } message VectorIvfPqIndex { @@ -85,10 +89,12 @@ enum VectorDistanceMetric { } message BTreeIndex { - // TODO(zehiko) as properties as needed + // TODO(zehiko) add properties as needed } -message IndexCollectionResponse {} +message CreateCollectionIndexResponse { + uint64 indexed_rows = 1; +} // ---------------- QueryCollectionIndex ------------------ @@ -97,13 +103,22 @@ message QueryCollectionIndexRequest { // Collection we want to run the query against on // If not specified, the default collection is queried Collection collection = 1; - // Index type specific query properties - IndexQuery query = 2; + // Index column that is queried + rerun.common.v0.ComponentColumnDescriptor column = 2; + // Query data - type of data is index specific. Caller must ensure + // to provide the right type. For vector search this should + // be a vector of appropriate size, for inverted index this should be a string. + // Query data is represented as a unit (single row) RecordBatch with 1 column. + DataframePart query = 3; + // Index type specific properties + IndexQueryProperties properties = 4; + // max number of rows to be returned + optional uint32 limit = 5; } -message IndexQuery { +message IndexQueryProperties { // specific index query properties based on the index type - oneof query { + oneof props { InvertedIndexQuery inverted = 1; VectorIndexQuery vector = 2; BTreeIndexQuery btree = 3; @@ -111,32 +126,14 @@ message IndexQuery { } message InvertedIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. It must be - // of utf8 type - DataframePart query = 1; // TODO(zehiko) add properties as needed } message VectorIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. It must be of - // type of float32 array - DataframePart query = 1; uint32 top_k = 2; } message BTreeIndexQuery { - // Query to execute represented as the arrow data - // Query should be a unit RecordBatch with 2 columns: - // - 'index' column with the name of the column we want to query - // - 'query' column with the value we want to query. The type should - // be of the same type as the indexed column - DataframePart query = 1; // TODO(zehiko) add properties as needed } diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index 4bc8820a0ee0..443d655ef599 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -20,7 +20,7 @@ impl ::prost::Name for DataframePart { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexCollectionRequest { +pub struct CreateCollectionIndexRequest { /// which collection do we want to create index for #[prost(message, optional, tag = "1")] pub collection: ::core::option::Option, @@ -29,21 +29,24 @@ pub struct IndexCollectionRequest { #[prost(message, optional, tag = "2")] pub properties: ::core::option::Option, /// Component / column we want to index + /// TODO - name of the lance table should be derived from the descriptor! #[prost(message, optional, tag = "3")] pub column: ::core::option::Option, /// What is the filter index i.e. timeline for which we /// will query the timepoints + /// TODO(zehiko) this might go away and we might just index + /// across all the timelines #[prost(message, optional, tag = "4")] pub time_index: ::core::option::Option, } -impl ::prost::Name for IndexCollectionRequest { - const NAME: &'static str = "IndexCollectionRequest"; +impl ::prost::Name for CreateCollectionIndexRequest { + const NAME: &'static str = "CreateCollectionIndexRequest"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexCollectionRequest".into() + "rerun.remote_store.v0.CreateCollectionIndexRequest".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexCollectionRequest".into() + "/rerun.remote_store.v0.CreateCollectionIndexRequest".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -77,7 +80,7 @@ impl ::prost::Name for IndexProperties { pub struct InvertedIndex { #[prost(bool, tag = "1")] pub store_position: bool, - /// TODO(zehiko) add properties as needed + /// TODO(zehiko) add other properties as needed #[prost(string, tag = "2")] pub base_tokenizer: ::prost::alloc::string::String, } @@ -110,7 +113,7 @@ impl ::prost::Name for VectorIvfPqIndex { "/rerun.remote_store.v0.VectorIvfPqIndex".into() } } -/// TODO(zehiko) as properties as needed +/// TODO(zehiko) add properties as needed #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct BTreeIndex {} impl ::prost::Name for BTreeIndex { @@ -124,15 +127,18 @@ impl ::prost::Name for BTreeIndex { } } #[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct IndexCollectionResponse {} -impl ::prost::Name for IndexCollectionResponse { - const NAME: &'static str = "IndexCollectionResponse"; +pub struct CreateCollectionIndexResponse { + #[prost(uint64, tag = "1")] + pub indexed_rows: u64, +} +impl ::prost::Name for CreateCollectionIndexResponse { + const NAME: &'static str = "CreateCollectionIndexResponse"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexCollectionResponse".into() + "rerun.remote_store.v0.CreateCollectionIndexResponse".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexCollectionResponse".into() + "/rerun.remote_store.v0.CreateCollectionIndexResponse".into() } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -141,9 +147,21 @@ pub struct QueryCollectionIndexRequest { /// If not specified, the default collection is queried #[prost(message, optional, tag = "1")] pub collection: ::core::option::Option, - /// Index type specific query properties + /// Index column that is queried #[prost(message, optional, tag = "2")] - pub query: ::core::option::Option, + pub column: ::core::option::Option, + /// Query data - type of data is index specific. Caller must ensure + /// to provide the right type. For vector search this should + /// be a vector of appropriate size, for inverted index this should be a string. + /// Query data is represented as a unit (single row) RecordBatch with 1 column. + #[prost(message, optional, tag = "3")] + pub query: ::core::option::Option, + /// Index type specific properties + #[prost(message, optional, tag = "4")] + pub properties: ::core::option::Option, + /// max number of rows to be returned + #[prost(uint32, optional, tag = "5")] + pub limit: ::core::option::Option, } impl ::prost::Name for QueryCollectionIndexRequest { const NAME: &'static str = "QueryCollectionIndexRequest"; @@ -155,17 +173,17 @@ impl ::prost::Name for QueryCollectionIndexRequest { "/rerun.remote_store.v0.QueryCollectionIndexRequest".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct IndexQuery { +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct IndexQueryProperties { /// specific index query properties based on the index type - #[prost(oneof = "index_query::Query", tags = "1, 2, 3")] - pub query: ::core::option::Option, + #[prost(oneof = "index_query_properties::Props", tags = "1, 2, 3")] + pub props: ::core::option::Option, } -/// Nested message and enum types in `IndexQuery`. -pub mod index_query { +/// Nested message and enum types in `IndexQueryProperties`. +pub mod index_query_properties { /// specific index query properties based on the index type - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Query { + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum Props { #[prost(message, tag = "1")] Inverted(super::InvertedIndexQuery), #[prost(message, tag = "2")] @@ -174,28 +192,19 @@ pub mod index_query { Btree(super::BTreeIndexQuery), } } -impl ::prost::Name for IndexQuery { - const NAME: &'static str = "IndexQuery"; +impl ::prost::Name for IndexQueryProperties { + const NAME: &'static str = "IndexQueryProperties"; const PACKAGE: &'static str = "rerun.remote_store.v0"; fn full_name() -> ::prost::alloc::string::String { - "rerun.remote_store.v0.IndexQuery".into() + "rerun.remote_store.v0.IndexQueryProperties".into() } fn type_url() -> ::prost::alloc::string::String { - "/rerun.remote_store.v0.IndexQuery".into() + "/rerun.remote_store.v0.IndexQueryProperties".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InvertedIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. It must be - /// of utf8 type - /// - /// TODO(zehiko) add properties as needed - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, -} +/// TODO(zehiko) add properties as needed +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct InvertedIndexQuery {} impl ::prost::Name for InvertedIndexQuery { const NAME: &'static str = "InvertedIndexQuery"; const PACKAGE: &'static str = "rerun.remote_store.v0"; @@ -206,15 +215,8 @@ impl ::prost::Name for InvertedIndexQuery { "/rerun.remote_store.v0.InvertedIndexQuery".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct VectorIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. It must be of - /// type of float32 array - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, #[prost(uint32, tag = "2")] pub top_k: u32, } @@ -228,18 +230,9 @@ impl ::prost::Name for VectorIndexQuery { "/rerun.remote_store.v0.VectorIndexQuery".into() } } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BTreeIndexQuery { - /// Query to execute represented as the arrow data - /// Query should be a unit RecordBatch with 2 columns: - /// - 'index' column with the name of the column we want to query - /// - 'query' column with the value we want to query. The type should - /// be of the same type as the indexed column - /// - /// TODO(zehiko) add properties as needed - #[prost(message, optional, tag = "1")] - pub query: ::core::option::Option, -} +/// TODO(zehiko) add properties as needed +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct BTreeIndexQuery {} impl ::prost::Name for BTreeIndexQuery { const NAME: &'static str = "BTreeIndexQuery"; const PACKAGE: &'static str = "rerun.remote_store.v0"; @@ -763,32 +756,33 @@ pub mod storage_node_client { )); self.inner.server_streaming(req, path, codec).await } - pub async fn index_collection( + pub async fn create_collection_index( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/IndexCollection", + "/rerun.remote_store.v0.StorageNode/CreateCollectionIndex", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.remote_store.v0.StorageNode", - "IndexCollection", + "CreateCollectionIndex", )); self.inner.unary(req, path, codec).await } - /// Collection index query response is a RecordBatch with 2 columns: + /// Collection index query response is a RecordBatch with 3 columns: /// - 'resource_id' column with the id of the resource /// - timepoint column with the values reprensenting the points in time /// where index query matches. What time points are matched depends on the type of /// index that is queried. For example for vector search it might be timepoints where /// top-K matches are found within *each* resource in the collection. For inverted index /// it might be timepoints where the query string is found in the indexed column + /// - 'data' column with the data that is returned for the matched timepoints pub async fn query_collection_index( &mut self, request: impl tonic::IntoRequest, @@ -966,22 +960,23 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - async fn index_collection( + async fn create_collection_index( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the QueryCollectionIndex method. type QueryCollectionIndexStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > + std::marker::Send + 'static; - /// Collection index query response is a RecordBatch with 2 columns: + /// Collection index query response is a RecordBatch with 3 columns: /// - 'resource_id' column with the id of the resource /// - timepoint column with the values reprensenting the points in time /// where index query matches. What time points are matched depends on the type of /// index that is queried. For example for vector search it might be timepoints where /// top-K matches are found within *each* resource in the collection. For inverted index /// it might be timepoints where the query string is found in the indexed column + /// - 'data' column with the data that is returned for the matched timepoints async fn query_collection_index( &self, request: tonic::Request, @@ -1178,21 +1173,22 @@ pub mod storage_node_server { }; Box::pin(fut) } - "/rerun.remote_store.v0.StorageNode/IndexCollection" => { + "/rerun.remote_store.v0.StorageNode/CreateCollectionIndex" => { #[allow(non_camel_case_types)] - struct IndexCollectionSvc(pub Arc); - impl tonic::server::UnaryService - for IndexCollectionSvc + struct CreateCollectionIndexSvc(pub Arc); + impl + tonic::server::UnaryService + for CreateCollectionIndexSvc { - type Response = super::IndexCollectionResponse; + type Response = super::CreateCollectionIndexResponse; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::index_collection(&inner, request).await + ::create_collection_index(&inner, request).await }; Box::pin(fut) } @@ -1203,7 +1199,7 @@ pub mod storage_node_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = IndexCollectionSvc(inner); + let method = CreateCollectionIndexSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config(