Adds basic geoparquet support (#94)
We already write the postgis `geometry` column as WKB-formatted binary, as specified by the [geoparquet spec](https://geoparquet.org/releases/v1.1.0/).

With this PR, we also write basic geoparquet metadata into the key-value metadata of the parquet file. We support only the basic info required by the spec, enough to be interoperable with [duckdb](https://github.com/duckdb/duckdb/blob/2e533ec9dfaa82baac24ae1104e501af723a565a/extension/parquet/geo_parquet.cpp#L179).
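For reference, the `geo` value has the shape below; a minimal sketch using `serde_json`, where the column name `geom` and the single `Point` geometry type are hypothetical, while the field names, the `WKB` encoding, and version `1.1.0` follow the spec:

```rust
use serde_json::{json, Value};

// Sketch of the "geo" key-value metadata written to the parquet footer,
// assuming one geometry column named "geom" (hypothetical). Only the
// fields required by geoparquet 1.1.0 are emitted.
fn example_geo_metadata() -> Value {
    json!({
        "version": "1.1.0",
        "primary_column": "geom",
        "columns": {
            "geom": {
                "encoding": "WKB",
                "geometry_types": ["Point"]
            }
        }
    })
}
```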
aykut-bozkurt authored Jan 20, 2025
1 parent fcb5036 commit 30fb0c6
Showing 6 changed files with 279 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -41,6 +41,8 @@ parquet = {version = "53", default-features = false, features = [
 ]}
 pgrx = "=0.12.9"
 rust-ini = "0.21"
+serde = "1"
+serde_json = "1"
 tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
 url = "2"
2 changes: 1 addition & 1 deletion README.md
@@ -294,7 +294,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
 > * `numeric` is allowed by Postgres. (precision and scale not specified). These are represented by a default precision (38) and scale (9) instead of writing them as string. You get runtime error if your table tries to read or write a numeric value which is not allowed by the default precision and scale (29 integral digits before decimal point, 9 digits after decimal point).
 > - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
 > - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
-> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
+> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, specified by [geoparquet spec](https://geoparquet.org/releases/v1.1.0/), when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
 > - (5) `crunchy_map` is dependent on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with `MAP` logical type when `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
 
 > [!WARNING]
51 changes: 35 additions & 16 deletions src/arrow_parquet/parquet_writer.rs
@@ -5,8 +5,9 @@ use arrow_schema::SchemaRef;
 use parquet::{
     arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
     file::properties::{EnabledStatistics, WriterProperties},
+    format::KeyValue,
 };
-use pgrx::{heap_tuple::PgHeapTuple, pg_sys::RECORDOID, AllocatedByRust, PgTupleDesc};
+use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 use url::Url;
 
 use crate::{
@@ -18,7 +19,10 @@ use crate::{
         uri_utils::parquet_writer_from_uri,
     },
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
-    type_compat::{geometry::reset_postgis_context, map::reset_map_context},
+    type_compat::{
+        geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
+        map::reset_map_context,
+    },
     PG_BACKEND_TOKIO_RUNTIME,
 };

@@ -42,25 +46,11 @@ impl ParquetWriterContext {
         compression_level: i32,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
-        debug_assert!(tupledesc.oid() == RECORDOID);
-
         // Postgis and Map contexts are used throughout writing the parquet file.
         // We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
         reset_postgis_context();
         reset_map_context();
 
-        let writer_props = WriterProperties::builder()
-            .set_statistics_enabled(EnabledStatistics::Page)
-            .set_compression(
-                PgParquetCompressionWithLevel {
-                    compression,
-                    compression_level,
-                }
-                .into(),
-            )
-            .set_created_by("pg_parquet".to_string())
-            .build();
-
         let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
 
         pgrx::debug2!(
@@ -71,6 +61,8 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
+        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+
         let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
 
         let attribute_contexts =
@@ -83,6 +75,33 @@ impl ParquetWriterContext {
         }
     }
 
+    fn writer_props(
+        tupledesc: &PgTupleDesc,
+        compression: PgParquetCompression,
+        compression_level: i32,
+    ) -> WriterProperties {
+        let compression = PgParquetCompressionWithLevel {
+            compression,
+            compression_level,
+        };
+
+        let mut writer_props_builder = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Page)
+            .set_compression(compression.into())
+            .set_created_by("pg_parquet".to_string());
+
+        let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
+
+        if geometry_columns_metadata_value.is_some() {
+            let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);
+
+            writer_props_builder =
+                writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
+        }
+
+        writer_props_builder.build()
+    }
+
     pub(crate) fn write_new_row_group(
         &mut self,
         tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
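The JSON metadata itself is produced by `geoparquet_metadata_json_from_tupledesc` in `src/type_compat/geometry.rs`, which is not expanded in this view. Below is a sketch of the metadata types involved, inferred from the imports and assertions in the test file that follows rather than copied from the source; the inner struct name `GeometryColumnMetadata` is hypothetical:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Inferred shape of the geoparquet metadata types; the real definitions
// live in src/type_compat/geometry.rs. Unit variant names deserialize from
// the plain strings ("WKB", "Point", ...) that the geoparquet spec mandates.
#[derive(Serialize, Deserialize)]
pub struct GeometryColumnsMetadata {
    pub version: String,
    pub primary_column: String,
    pub columns: HashMap<String, GeometryColumnMetadata>, // column name -> metadata
}

// Hypothetical name for the per-column entry.
#[derive(Serialize, Deserialize)]
pub struct GeometryColumnMetadata {
    pub encoding: GeometryEncoding,
    pub geometry_types: Vec<GeometryType>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum GeometryEncoding {
    WKB,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum GeometryType {
    Point,
    LineString,
    Polygon,
    MultiPoint,
    MultiLineString,
    MultiPolygon,
    GeometryCollection,
}
```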
117 changes: 115 additions & 2 deletions src/pgrx_tests/copy_type_roundtrip.rs
@@ -8,18 +8,20 @@ mod tests {
         LOCAL_TEST_FILE_PATH,
     };
     use crate::type_compat::fallback_to_text::FallbackToText;
-    use crate::type_compat::geometry::Geometry;
+    use crate::type_compat::geometry::{
+        Geometry, GeometryColumnsMetadata, GeometryEncoding, GeometryType,
+    };
     use crate::type_compat::map::Map;
     use crate::type_compat::pg_arrow_type_conversions::{
         DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE,
     };
     use pgrx::pg_sys::Oid;
-    use pgrx::pg_test;
     use pgrx::{
         composite_type,
         datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
         AnyNumeric, Spi,
     };
+    use pgrx::{pg_test, JsonB};
 
     #[pg_test]
     fn test_int2() {
@@ -946,6 +948,117 @@ mod tests {
         test_table.assert_expected_and_result_rows();
     }
 
+    #[pg_test]
+    fn test_geometry_geoparquet_metadata() {
+        // Skip the test if postgis extension is not available
+        if !extension_exists("postgis") {
+            return;
+        }
+
+        let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;";
+        Spi::run(query).unwrap();
+
+        let copy_to_query = format!(
+            "COPY (SELECT ST_GeomFromText('POINT(1 1)')::geometry(point) as a,
+                          ST_GeomFromText('LINESTRING(0 0, 1 1)')::geometry(linestring) as b,
+                          ST_GeomFromText('POLYGON((0 0, 1 1, 2 2, 0 0))')::geometry(polygon) as c,
+                          ST_GeomFromText('MULTIPOINT((0 0), (1 1))')::geometry(multipoint) as d,
+                          ST_GeomFromText('MULTILINESTRING((0 0, 1 1), (2 2, 3 3))')::geometry(multilinestring) as e,
+                          ST_GeomFromText('MULTIPOLYGON(((0 0, 1 1, 2 2, 0 0)), ((3 3, 4 4, 5 5, 3 3)))')::geometry(multipolygon) as f,
+                          ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(0 0, 1 1))')::geometry(geometrycollection) as g
+            )
+            TO '{LOCAL_TEST_FILE_PATH}' WITH (format parquet);",
+        );
+        Spi::run(copy_to_query.as_str()).unwrap();
+
+        // Check geoparquet metadata
+        let geoparquet_metadata_query = format!(
+            "select encode(value, 'escape')::jsonb
+             from parquet.kv_metadata('{LOCAL_TEST_FILE_PATH}')
+             where encode(key, 'escape') = 'geo';",
+        );
+        let geoparquet_metadata_json = Spi::get_one::<JsonB>(geoparquet_metadata_query.as_str())
+            .unwrap()
+            .unwrap();
+
+        let geoparquet_metadata: GeometryColumnsMetadata =
+            serde_json::from_value(geoparquet_metadata_json.0).unwrap();
+
+        // assert common metadata
+        assert_eq!(geoparquet_metadata.version, "1.1.0");
+        assert_eq!(geoparquet_metadata.primary_column, "a");
+
+        // point
+        assert_eq!(
+            geoparquet_metadata.columns.get("a").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("a").unwrap().geometry_types,
+            vec![GeometryType::Point]
+        );
+
+        // linestring
+        assert_eq!(
+            geoparquet_metadata.columns.get("b").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("b").unwrap().geometry_types,
+            vec![GeometryType::LineString]
+        );
+
+        // polygon
+        assert_eq!(
+            geoparquet_metadata.columns.get("c").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("c").unwrap().geometry_types,
+            vec![GeometryType::Polygon]
+        );
+
+        // multipoint
+        assert_eq!(
+            geoparquet_metadata.columns.get("d").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("d").unwrap().geometry_types,
+            vec![GeometryType::MultiPoint]
+        );
+
+        // multilinestring
+        assert_eq!(
+            geoparquet_metadata.columns.get("e").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("e").unwrap().geometry_types,
+            vec![GeometryType::MultiLineString]
+        );
+
+        // multipolygon
+        assert_eq!(
+            geoparquet_metadata.columns.get("f").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("f").unwrap().geometry_types,
+            vec![GeometryType::MultiPolygon]
+        );
+
+        // geometrycollection
+        assert_eq!(
+            geoparquet_metadata.columns.get("g").unwrap().encoding,
+            GeometryEncoding::WKB
+        );
+        assert_eq!(
+            geoparquet_metadata.columns.get("g").unwrap().geometry_types,
+            vec![GeometryType::GeometryCollection]
+        );
+    }
+
     #[pg_test]
     fn test_complex_composite() {
         Spi::run("CREATE TYPE dog AS (name text, age int);").unwrap();
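Outside of Postgres, the written footer can also be sanity-checked with the `parquet` crate directly; a hedged sketch (not part of this PR) that extracts the raw `geo` value from a file, with path and error handling illustrative only:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Reads the "geo" key-value metadata from a parquet file's footer,
// returning the raw JSON string if the key is present.
fn read_geo_metadata(path: &str) -> Option<String> {
    let file = File::open(path).expect("parquet file should exist");
    let reader = SerializedFileReader::new(file).expect("valid parquet footer");

    reader
        .metadata()
        .file_metadata()
        .key_value_metadata()?
        .iter()
        .find(|kv| kv.key == "geo")
        .and_then(|kv| kv.value.clone())
}
```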