diff --git a/.devcontainer/.env b/.devcontainer/.env index 484b326..7e7f783 100644 --- a/.devcontainer/.env +++ b/.devcontainer/.env @@ -8,5 +8,15 @@ AWS_S3_TEST_BUCKET=testbucket MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin +# Azure Blob tests +AZURE_STORAGE_ACCOUNT=devstoreaccount1 +AZURE_STORAGE_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" +AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;" +AZURE_STORAGE_ENDPOINT=http://localhost:10000/devstoreaccount1 +AZURE_ALLOW_HTTP=true +AZURE_TEST_CONTAINER_NAME=testcontainer +AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D" +AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D" + # Others RUST_TEST_THREADS=1 diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 522d00a..dfad9b6 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -12,6 +12,11 @@ RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g- curl lsb-release ca-certificates gnupg sudo git \ nano net-tools awscli +# install azure-cli +RUN curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | tee /etc/apt/keyrings/microsoft.gpg > /dev/null +RUN echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | tee /etc/apt/sources.list.d/azure-cli.list +RUN apt-get update && apt-get install -y azure-cli + # install Postgres RUN sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 616e225..a432f90 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -3,20 +3,23 @@ services: build: context: . dockerfile: Dockerfile - command: sleep infinity + entrypoint: "./entrypoint.sh" network_mode: host volumes: - ..:/workspace - ${USERPROFILE}${HOME}/.ssh:/home/rust/.ssh:ro - ${USERPROFILE}${HOME}/.ssh/known_hosts:/home/rust/.ssh/known_hosts:rw - ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro - - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:ro + - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw + - ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw + - ./entrypoint.sh:/entrypoint.sh env_file: - .env cap_add: - SYS_PTRACE depends_on: - minio + - azurite minio: image: minio/minio @@ -31,4 +34,16 @@ services: timeout: 2s retries: 3 volumes: - - ./minio-entrypoint.sh:/entrypoint.sh + - ./minio-entrypoint.sh:/entrypoint.sh + + azurite: + image: mcr.microsoft.com/azure-storage/azurite + env_file: + - .env + network_mode: host + restart: unless-stopped + healthcheck: + test: ["CMD", "nc", "-z", "localhost", "10000"] + interval: 6s + timeout: 2s + retries: 3 diff --git a/.devcontainer/entrypoint.sh b/.devcontainer/entrypoint.sh new file mode 100755 index 0000000..43944ed --- /dev/null +++ b/.devcontainer/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM + +# create azurite container +az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING + +sleep infinity diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4afdc02..9a16df7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -85,6 +85,11 @@ jobs: postgresql-client-${{ env.PG_MAJOR }} \ libpq-dev + - name: Install azure-cli + run: | + curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | sudo tee /etc/apt/keyrings/microsoft.gpg > /dev/null + echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | sudo tee /etc/apt/sources.list.d/azure-cli.list + sudo apt-get update && sudo apt-get install -y azure-cli - name: Install and configure pgrx run: | @@ -112,7 +117,6 @@ jobs: -p 9000:9000 \ --entrypoint "./entrypoint.sh" \ --volume ./.devcontainer/minio-entrypoint.sh:/entrypoint.sh \ - --name miniocontainer \ minio/minio while ! curl $AWS_ENDPOINT_URL; do @@ -120,6 +124,21 @@ jobs: sleep 1 done + - name: Start Azurite for Azure Blob Storage emulator tests + run: | + docker run -d \ + --env-file .devcontainer/.env \ + -p 10000:10000 \ + mcr.microsoft.com/azure-storage/azurite + + while ! curl $AZURE_STORAGE_ENDPOINT; do + echo "Waiting for $AZURE_STORAGE_ENDPOINT..." + sleep 1 + done + + # create container + az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING + - name: Run tests run: | # Run tests with coverage tool diff --git a/Cargo.lock b/Cargo.lock index 066b734..bfe857f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.24.2" @@ -25,7 +31,7 @@ checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "const-random", - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy", @@ -137,7 +143,7 @@ dependencies = [ "arrow-schema", "chrono", "half 2.4.1", - "hashbrown", + "hashbrown 0.15.2", "num", ] @@ -264,6 +270,28 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.4.0", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.83" @@ -323,7 +351,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "time", "tokio", @@ -358,7 +386,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http-body 0.4.6", "once_cell", @@ -484,7 +512,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 2.3.0", "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", @@ -526,7 +554,6 @@ dependencies = [ "base64-simd", "bytes", "bytes-utils", - "futures-core", "http 0.2.12", "http 1.2.0", "http-body 0.4.6", @@ -539,8 +566,6 @@ dependencies = [ "ryu", "serde", "time", - "tokio", - "tokio-util", ] [[package]] @@ -566,6 +591,52 @@ dependencies = [ "tracing", ] +[[package]] +name = "azure_core" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "dyn-clone", + "futures", + "getrandom 0.2.15", + "http-types", + "once_cell", + "paste", + "pin-project", + "quick-xml 0.31.0", + "rand 0.8.5", + "rustc_version 0.4.1", + "serde", + "serde_json", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" +dependencies = [ + "RustyXML", + "async-lock", + "async-trait", + "azure_core", + "bytes", + "serde", + "serde_derive", + "time", + "tracing", + "url", + "uuid", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -581,6 +652,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -876,6 +953,15 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-random" version = "0.1.18" @@ -891,7 +977,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "getrandom", + "getrandom 0.2.15", "once_cell", "tiny-keccak", ] @@ -997,6 +1083,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1021,6 +1108,21 @@ dependencies = [ "syn", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "either" version = "1.13.0" @@ -1063,6 +1165,33 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener 5.4.0", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -1079,6 +1208,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1180,6 +1318,21 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1231,6 +1384,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1240,7 +1404,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -1320,6 +1484,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1428,6 +1598,26 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.13.1", + "futures-lite", + "infer", + "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs", + "serde_urlencoded", + "url", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1718,7 +1908,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", +] + +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", ] [[package]] @@ -1957,7 +2162,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -2086,8 +2291,8 @@ dependencies = [ "md-5", "parking_lot", "percent-encoding", - "quick-xml", - "rand", + "quick-xml 0.36.2", + "rand 0.8.5", "reqwest", "ring", "serde", @@ -2120,6 +2325,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "outref" version = "0.5.1" @@ -2136,6 +2351,12 @@ dependencies = [ "supports-color 3.0.2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2180,7 +2401,7 @@ dependencies = [ "flate2", "futures", "half 2.4.1", - "hashbrown", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -2247,13 +2468,15 @@ dependencies = [ "arrow-schema", "aws-config", "aws-credential-types", - "aws-sdk-sts", + "azure_storage", "futures", + "home", "object_store", "once_cell", "parquet", "pgrx", "pgrx-tests", + "rust-ini", "tokio", "url", ] @@ -2377,7 +2600,7 @@ dependencies = [ "pgrx-pg-config", "postgres", "proptest", - "rand", + "rand 0.8.5", "regex", "serde", "serde_json", @@ -2403,6 +2626,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.15" @@ -2448,7 +2691,7 @@ dependencies = [ "hmac", "md-5", "memchr", - "rand", + "rand 0.8.5", "sha2", "stringprep", ] @@ -2499,8 +2742,8 @@ dependencies = [ "bitflags 2.6.0", "lazy_static", "num-traits", - "rand", - "rand_chacha", + "rand 0.8.5", + "rand_chacha 0.3.1", "rand_xorshift", "regex-syntax", "rusty-fork", @@ -2514,6 +2757,16 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quick-xml" version = "0.36.2" @@ -2549,8 +2802,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", - "getrandom", - "rand", + "getrandom 0.2.15", + "rand 0.8.5", "ring", "rustc-hash 2.1.0", "rustls 0.23.20", @@ -2591,6 +2844,19 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2598,8 +2864,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2609,7 +2885,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2618,7 +2903,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -2627,7 +2921,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" dependencies = [ - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2747,13 +3041,24 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", "windows-sys 0.52.0", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3070,6 +3375,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -3285,7 +3601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.3.0", "once_cell", "rustix", "windows-sys 0.59.0", @@ -3349,6 +3665,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", + "itoa", + "js-sys", "num-conv", "powerfmt", "serde", @@ -3452,7 +3770,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "rand", + "rand 0.8.5", "socket2", "tokio", "tokio-util", @@ -3563,6 +3881,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "try-lock" version = "0.2.5" @@ -3657,6 +3981,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -3683,7 +4008,8 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ - "getrandom", + "getrandom 0.2.15", + "serde", ] [[package]] @@ -3707,6 +4033,12 @@ dependencies = [ "libc", ] +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + [[package]] name = "walkdir" version = "2.5.0" @@ -3726,6 +4058,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index e77e462..c545672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,12 @@ pg_test = [] arrow = {version = "53", default-features = false} arrow-cast = {version = "53", default-features = false} arrow-schema = {version = "53", default-features = false} -aws-config = { version = "1", default-features = false, features = ["rustls"]} +aws-config = { version = "1", default-features = false, features = ["rustls","rt-tokio"] } aws-credential-types = {version = "1", default-features = false} -aws-sdk-sts = "1" +azure_storage = {version = "0.21", default-features = false} futures = "0.3" -object_store = {version = "0.11", default-features = false, features = ["aws"]} +home = "0.5" +object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]} once_cell = "1" parquet = {version = "53", default-features = false, features = [ "arrow", @@ -39,6 +40,7 @@ parquet = {version = "53", default-features = false, features = [ "object_store", ]} pgrx = "=0.12.9" +rust-ini = "0.21" tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]} url = "2" diff --git a/README.md b/README.md index aafc2f5..1190d42 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,13 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM ``` ## Object Store Support -`pg_parquet` supports reading and writing Parquet files from/to `S3` object store. Only the uris with `s3://` scheme is supported. +`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores. + +> [!NOTE] +> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. +> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. + +#### S3 Storage The simplest way to configure object storage is by creating the standard `~/.aws/credentials` and `~/.aws/config` files: @@ -182,10 +188,56 @@ Alternatively, you can use the following environment variables when starting pos - `AWS_PROFILE`: the name of the profile from the credentials and config file (default profile name is `default`) **(only via environment variables)** - `AWS_ALLOW_HTTP`: allows http endpoints **(only via environment variables)** +Config source priority order is shown below: +1. Environment variables, +2. Config file. -> [!NOTE] -> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. -> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. +Supported S3 uri formats are shown below: +- s3:// \ / \ +- https:// \.s3.amazonaws.com / \ +- https:// s3.amazonaws.com / \ / \ + +Supported authorization methods' priority order is shown below: +1. Temporary session tokens by assuming roles, +2. Long term credentials. + +#### Azure Blob Storage + +The simplest way to configure object storage is by creating the standard [`~/.azure/config`](https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest) file: + +```bash +$ cat ~/.azure/config +[storage] +account = devstoreaccount1 +key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== +``` + +Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client: +- `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob +- `AZURE_STORAGE_KEY`: the storage key of the Azure Blob +- `AZURE_STORAGE_CONNECTION_STRING`: the connection string for the Azure Blob (overrides any other config) +- `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob +- `AZURE_TENANT_ID`: the tenant id for client secret auth **(only via environment variables)** +- `AZURE_CLIENT_ID`: the client id for client secret auth **(only via environment variables)** +- `AZURE_CLIENT_SECRET`: the client secret for client secret auth **(only via environment variables)** +- `AZURE_STORAGE_ENDPOINT`: the endpoint **(only via environment variables)** +- `AZURE_CONFIG_FILE`: an alternative location for the config file **(only via environment variables)** +- `AZURE_ALLOW_HTTP`: allows http endpoints **(only via environment variables)** + +Config source priority order is shown below: +1. Connection string (read from environment variable or config file), +2. Environment variables, +3. Config file. + +Supported Azure Blob Storage uri formats are shown below: +- az:// \ / \ +- azure:// \ / \ +- https:// \.blob.core.windows.net / \ + +Supported authorization methods' priority order is shown below: +1. Bearer token via client secret, +2. Sas token, +3. Storage key. ## Copy Options `pg_parquet` supports the following options in the `COPY TO` command: diff --git a/src/arrow_parquet/parquet_reader.rs b/src/arrow_parquet/parquet_reader.rs index 6790513..b64b238 100644 --- a/src/arrow_parquet/parquet_reader.rs +++ b/src/arrow_parquet/parquet_reader.rs @@ -25,6 +25,7 @@ use crate::{ }, pgrx_utils::{collect_attributes_for, CollectAttributesFor}, type_compat::{geometry::reset_postgis_context, map::reset_map_context}, + PG_BACKEND_TOKIO_RUNTIME, }; use super::{ @@ -33,7 +34,7 @@ use super::{ schema_parser::{ ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes, }, - uri_utils::{parquet_reader_from_uri, PG_BACKEND_TOKIO_RUNTIME}, + uri_utils::parquet_reader_from_uri, }; pub(crate) struct ParquetReaderContext { diff --git a/src/arrow_parquet/parquet_writer.rs b/src/arrow_parquet/parquet_writer.rs index e93ea8b..4f5713f 100644 --- a/src/arrow_parquet/parquet_writer.rs +++ b/src/arrow_parquet/parquet_writer.rs @@ -15,10 +15,11 @@ use crate::{ schema_parser::{ parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes, }, - uri_utils::{parquet_writer_from_uri, PG_BACKEND_TOKIO_RUNTIME}, + uri_utils::parquet_writer_from_uri, }, pgrx_utils::{collect_attributes_for, CollectAttributesFor}, type_compat::{geometry::reset_postgis_context, map::reset_map_context}, + PG_BACKEND_TOKIO_RUNTIME, }; use super::pg_to_arrow::{ diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index 2ca62b0..6eadc91 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -1,14 +1,6 @@ -use std::{sync::Arc, sync::LazyLock}; +use std::{panic, sync::Arc}; use arrow::datatypes::SchemaRef; -use aws_config::BehaviorVersion; -use aws_credential_types::provider::ProvideCredentials; -use object_store::{ - aws::{AmazonS3, AmazonS3Builder}, - local::LocalFileSystem, - path::Path, - ObjectStore, -}; use parquet::{ arrow::{ arrow_to_parquet_schema, @@ -23,110 +15,16 @@ use pgrx::{ ereport, pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId}, }; -use tokio::runtime::Runtime; use url::Url; -use crate::arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE; +use crate::{ + arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE, object_store::create_object_store, + PG_BACKEND_TOKIO_RUNTIME, +}; const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read"; const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write"; -// PG_BACKEND_TOKIO_RUNTIME creates a tokio runtime that uses the current thread -// to run the tokio reactor. This uses the same thread that is running the Postgres backend. -pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)) -}); - -fn parse_bucket_and_key(uri: &Url) -> (String, String) { - debug_assert!(uri.scheme() == "s3"); - - let bucket = uri - .host_str() - .unwrap_or_else(|| panic!("bucket not found in uri: {}", uri)); - - let key = uri.path(); - - (bucket.to_string(), key.to_string()) -} - -fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc, Path) { - if uri.scheme() == "s3" { - let (bucket_name, key) = parse_bucket_and_key(uri); - - let storage_container = PG_BACKEND_TOKIO_RUNTIME - .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); - - let location = Path::from(key); - - (storage_container, location) - } else { - debug_assert!(uri.scheme() == "file"); - - let uri = uri_as_string(uri); - - if !copy_from { - // create or overwrite the local file - std::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(&uri) - .unwrap_or_else(|e| panic!("{}", e)); - } - - let storage_container = Arc::new(LocalFileSystem::new()); - - let location = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); - - (storage_container, location) - } -} - -// get_s3_object_store creates an AmazonS3 object store with the given bucket name. -// It is configured by environment variables and aws config files as fallback method. -// We need to read the config files to make the fallback method work since object_store -// does not provide a way to read them. Currently, we only support to extract -// "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_ENDPOINT_URL", -// and "AWS_REGION" from the config files. -async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { - let mut aws_s3_builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name); - - // first tries environment variables and then the config files - let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28()) - .load() - .await; - - if let Some(credential_provider) = sdk_config.credentials_provider() { - if let Ok(credentials) = credential_provider.provide_credentials().await { - // AWS_ACCESS_KEY_ID - aws_s3_builder = aws_s3_builder.with_access_key_id(credentials.access_key_id()); - - // AWS_SECRET_ACCESS_KEY - aws_s3_builder = aws_s3_builder.with_secret_access_key(credentials.secret_access_key()); - - if let Some(token) = credentials.session_token() { - // AWS_SESSION_TOKEN - aws_s3_builder = aws_s3_builder.with_token(token); - } - } - } - - // AWS_ENDPOINT_URL - if let Some(aws_endpoint_url) = sdk_config.endpoint_url() { - aws_s3_builder = aws_s3_builder.with_endpoint(aws_endpoint_url); - } - - // AWS_REGION - if let Some(aws_region) = sdk_config.region() { - aws_s3_builder = aws_s3_builder.with_region(aws_region.as_ref()); - } - - aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) -} - pub(crate) fn parse_uri(uri: &str) -> Url { if !uri.contains("://") { // local file @@ -134,16 +32,7 @@ pub(crate) fn parse_uri(uri: &str) -> Url { .unwrap_or_else(|_| panic!("not a valid file path: {}", uri)); } - let uri = Url::parse(uri).unwrap_or_else(|e| panic!("{}", e)); - - if uri.scheme() != "s3" { - panic!( - "unsupported uri {}. Only local files and URIs with s3:// prefix are supported.", - uri - ); - } - - uri + Url::parse(uri).unwrap_or_else(|e| panic!("{}", e)) } pub(crate) fn uri_as_string(uri: &Url) -> String { @@ -169,7 +58,7 @@ pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor { pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc { let copy_from = true; - let (parquet_object_store, location) = object_store_with_location(uri, copy_from); + let (parquet_object_store, location) = create_object_store(uri, copy_from); PG_BACKEND_TOKIO_RUNTIME.block_on(async { let object_store_meta = parquet_object_store @@ -192,7 +81,7 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc { pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream { let copy_from = true; - let (parquet_object_store, location) = object_store_with_location(uri, copy_from); + let (parquet_object_store, location) = create_object_store(uri, copy_from); PG_BACKEND_TOKIO_RUNTIME.block_on(async { let object_store_meta = parquet_object_store @@ -224,7 +113,7 @@ pub(crate) fn parquet_writer_from_uri( writer_props: WriterProperties, ) -> AsyncArrowWriter { let copy_from = false; - let (parquet_object_store, location) = object_store_with_location(uri, copy_from); + let (parquet_object_store, location) = create_object_store(uri, copy_from); let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location); diff --git a/src/lib.rs b/src/lib.rs index 100c80b..16aa1d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ +use std::sync::LazyLock; + use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK}; use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved; use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry}; +use tokio::runtime::Runtime; mod arrow_parquet; +mod object_store; mod parquet_copy_hook; mod parquet_udfs; #[cfg(any(test, feature = "pg_test"))] @@ -20,6 +24,15 @@ pgrx::pg_module_magic!(); extension_sql_file!("../sql/bootstrap.sql", name = "role_setup", bootstrap); +// PG_BACKEND_TOKIO_RUNTIME creates a tokio runtime that uses the current thread +// to run the tokio reactor. This uses the same thread that is running the Postgres backend. +pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)) +}); + #[pg_guard] pub extern "C" fn _PG_init() { GucRegistry::define_bool_guc( diff --git a/src/object_store.rs b/src/object_store.rs new file mode 100644 index 0000000..8d36032 --- /dev/null +++ b/src/object_store.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; + +use object_store::{path::Path, ObjectStore, ObjectStoreScheme}; +use url::Url; + +use crate::{ + arrow_parquet::uri_utils::uri_as_string, + object_store::{ + aws::create_s3_object_store, azure::create_azure_object_store, + local_file::create_local_file_object_store, + }, + PG_BACKEND_TOKIO_RUNTIME, +}; + +pub(crate) mod aws; +pub(crate) mod azure; +pub(crate) mod local_file; + +pub(crate) fn create_object_store(uri: &Url, copy_from: bool) -> (Arc, Path) { + let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| { + panic!( + "unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.", + uri + ) + }); + + // object_store crate can recognize a bunch of different schemes and paths, but we only support + // local, azure, and s3 schemes with a subset of all supported paths. + match scheme { + ObjectStoreScheme::AmazonS3 => { + let storage_container = Arc::new(create_s3_object_store(uri)); + + (storage_container, path) + } + ObjectStoreScheme::MicrosoftAzure => { + let storage_container = Arc::new(create_azure_object_store(uri)); + + (storage_container, path) + } + ObjectStoreScheme::Local => { + let storage_container = Arc::new(create_local_file_object_store(uri, copy_from)); + + let path = + Path::from_filesystem_path(uri_as_string(uri)).unwrap_or_else(|e| panic!("{}", e)); + + (storage_container, path) + } + _ => { + panic!( + "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.", + uri.scheme(), + uri + ); + } + } +} diff --git a/src/object_store/aws.rs b/src/object_store/aws.rs new file mode 100644 index 0000000..ed6a85e --- /dev/null +++ b/src/object_store/aws.rs @@ -0,0 +1,148 @@ +use aws_config::BehaviorVersion; +use aws_credential_types::provider::ProvideCredentials; +use object_store::aws::{AmazonS3, AmazonS3Builder}; +use url::Url; + +use super::PG_BACKEND_TOKIO_RUNTIME; + +// create_s3_object_store creates an AmazonS3 object store with the given bucket name. +// It is configured by environment variables and aws config files as fallback method. +// We need to read the config files to make the fallback method work since object_store +// does not provide a way to read them. Currently, we only support following environment +// variables and config parameters: +// - AWS_ACCESS_KEY_ID +// - AWS_SECRET_ACCESS_KEY +// - AWS_SESSION_TOKEN +// - AWS_ENDPOINT_URL +// - AWS_REGION +// - AWS_SHARED_CREDENTIALS_FILE (env var only) +// - AWS_CONFIG_FILE (env var only) +// - AWS_PROFILE (env var only) +// - AWS_ALLOW_HTTP (env var only, object_store specific) +pub(crate) fn create_s3_object_store(uri: &Url) -> AmazonS3 { + let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| { + panic!("unsupported s3 uri: {}", uri); + }); + + // we do not use builder::from_env() here because not all environment variables have + // a fallback to the config files + let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name); + + let aws_s3_config = AwsS3Config::load(); + + // allow http + aws_s3_builder = aws_s3_builder.with_allow_http(aws_s3_config.allow_http); + + // access key id + if let Some(access_key_id) = aws_s3_config.access_key_id { + aws_s3_builder = aws_s3_builder.with_access_key_id(access_key_id); + } + + // secret access key + if let Some(secret_access_key) = aws_s3_config.secret_access_key { + aws_s3_builder = aws_s3_builder.with_secret_access_key(secret_access_key); + } + + // session token + if let Some(session_token) = aws_s3_config.session_token { + aws_s3_builder = aws_s3_builder.with_token(session_token); + } + + // endpoint url + if let Some(endpoint_url) = aws_s3_config.endpoint_url { + aws_s3_builder = aws_s3_builder.with_endpoint(endpoint_url); + } + + // region + if let Some(region) = aws_s3_config.region { + aws_s3_builder = aws_s3_builder.with_region(region); + } + + aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + +fn parse_s3_bucket(uri: &Url) -> Option { + let host = uri.host_str()?; + + // s3(a)://{bucket}/key + if uri.scheme() == "s3" { + return Some(host.to_string()); + } + // https://s3.amazonaws.com/{bucket}/key + else if host == "s3.amazonaws.com" { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + + // Bucket name is the first part of the path + return Some( + path_segments + .first() + .expect("unexpected error during parsing s3 uri") + .to_string(), + ); + } + // https://{bucket}.s3.amazonaws.com/key + else if host.ends_with(".s3.amazonaws.com") { + let bucket_name = host.split('.').next()?; + return Some(bucket_name.to_string()); + } + + None +} + +// AwsS3Config is a struct that holds the configuration that is +// used to configure the AmazonS3 object store. object_store does +// not provide a way to read the config files, so we need to read +// them ourselves via aws sdk. +struct AwsS3Config { + region: Option, + access_key_id: Option, + secret_access_key: Option, + session_token: Option, + endpoint_url: Option, + allow_http: bool, +} + +impl AwsS3Config { + // load reads the s3 config from the environment variables first and config files as fallback. + fn load() -> Self { + let allow_http = if let Ok(allow_http) = std::env::var("AWS_ALLOW_HTTP") { + allow_http.parse().unwrap_or(false) + } else { + false + }; + + // first tries environment variables and then the config files + let sdk_config = PG_BACKEND_TOKIO_RUNTIME.block_on(async { + aws_config::defaults(BehaviorVersion::v2024_03_28()) + .load() + .await + }); + + let mut access_key_id = None; + let mut secret_access_key = None; + let mut session_token = None; + + if let Some(credential_provider) = sdk_config.credentials_provider() { + if let Ok(credentials) = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { credential_provider.provide_credentials().await }) + { + access_key_id = Some(credentials.access_key_id().to_string()); + secret_access_key = Some(credentials.secret_access_key().to_string()); + session_token = credentials.session_token().map(|t| t.to_string()); + } + } + + let endpoint_url = sdk_config.endpoint_url().map(|u| u.to_string()); + + let region = sdk_config.region().map(|r| r.as_ref().to_string()); + + Self { + region, + access_key_id, + secret_access_key, + session_token, + endpoint_url, + allow_http, + } + } +} diff --git a/src/object_store/azure.rs b/src/object_store/azure.rs new file mode 100644 index 0000000..c41c854 --- /dev/null +++ b/src/object_store/azure.rs @@ -0,0 +1,232 @@ +use azure_storage::{ConnectionString, EndpointProtocol}; +use home::home_dir; +use ini::Ini; +use object_store::azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}; +use url::Url; + +// create_azure_object_store creates a MicrosoftAzure object store with the given container name. +// It is configured by environment variables and azure config files as fallback method. +// We need to read the config files to make the fallback method work since object_store +// does not provide a way to read them. Currently, we only support following environment +// variables and config parameters: +// - AZURE_STORAGE_ACCOUNT +// - AZURE_STORAGE_KEY +// - AZURE_STORAGE_CONNECTION_STRING +// - AZURE_STORAGE_SAS_TOKEN +// - AZURE_CONFIG_FILE (env var only, object_store specific) +// - AZURE_STORAGE_ENDPOINT (env var only, object_store specific) +// - AZURE_ALLOW_HTTP (env var only, object_store specific) +pub(crate) fn create_azure_object_store(uri: &Url) -> MicrosoftAzure { + let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| { + panic!("unsupported azure blob storage uri: {}", uri); + }); + + let mut azure_builder = MicrosoftAzureBuilder::new().with_container_name(container_name); + + let azure_blob_config = AzureStorageConfig::load(); + + // allow http + azure_builder = azure_builder.with_allow_http(azure_blob_config.allow_http); + + // endpoint + if let Some(endpoint) = azure_blob_config.endpoint { + azure_builder = azure_builder.with_endpoint(endpoint); + } + + // sas token + if let Some(sas_token) = azure_blob_config.sas_token { + azure_builder = azure_builder.with_config(AzureConfigKey::SasKey, sas_token); + } + + // account name + if let Some(account_name) = azure_blob_config.account_name { + azure_builder = azure_builder.with_account(account_name); + } + + // account key + if let Some(account_key) = azure_blob_config.account_key { + azure_builder = azure_builder.with_access_key(account_key); + } + + // tenant id + if let Some(tenant_id) = azure_blob_config.tenant_id { + azure_builder = azure_builder.with_tenant_id(tenant_id); + } + + // client id + if let Some(client_id) = azure_blob_config.client_id { + azure_builder = azure_builder.with_client_id(client_id); + } + + // client secret + if let Some(client_secret) = azure_blob_config.client_secret { + azure_builder = azure_builder.with_client_secret(client_secret); + } + + azure_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + +fn parse_azure_blob_container(uri: &Url) -> Option { + let host = uri.host_str()?; + + // az(ure)://{container}/key + if uri.scheme() == "az" || uri.scheme() == "azure" { + return Some(host.to_string()); + } + // https://{account}.blob.core.windows.net/{container} + else if host.ends_with(".blob.core.windows.net") { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + + // Container name is the first part of the path + return Some( + path_segments + .first() + .expect("unexpected error during parsing azure blob uri") + .to_string(), + ); + } + + None +} + +// AzureStorageConfig is a struct that holds the configuration that is +// used to configure the Azure Blob Storage object store. object_store does +// not provide a way to read the config files, so we need to read +// them ourselves via rust-ini and azure sdk. +struct AzureStorageConfig { + account_name: Option, + account_key: Option, + sas_token: Option, + tenant_id: Option, + client_id: Option, + client_secret: Option, + endpoint: Option, + allow_http: bool, +} + +impl AzureStorageConfig { + // load reads the azure config from the environment variables first and config files as fallback. + // There is no proper azure sdk config crate that can read the config files. + // So, we need to read the config files manually from azure's ini config. + // See https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest + fn load() -> Self { + // ~/.azure/config + let azure_config_file_path = std::env::var("AZURE_CONFIG_FILE").unwrap_or( + home_dir() + .expect("failed to get home directory") + .join(".azure") + .join("config") + .to_str() + .expect("failed to convert path to string") + .to_string(), + ); + + let azure_config_content = Ini::load_from_file(&azure_config_file_path).ok(); + + let connection_string = match std::env::var("AZURE_STORAGE_CONNECTION_STRING") { + Ok(connection_string) => Some(connection_string), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("connection_string")) + .map(|connection_string| connection_string.to_string()), + }; + + // connection string overrides everything + if let Some(connection_string) = connection_string { + if let Ok(connection_string) = ConnectionString::new(&connection_string) { + return connection_string.into(); + } + } + + let account_name = match std::env::var("AZURE_STORAGE_ACCOUNT") { + Ok(account) => Some(account), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("account")) + .map(|account| account.to_string()), + }; + + let account_key = match std::env::var("AZURE_STORAGE_KEY") { + Ok(key) => Some(key), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("key")) + .map(|key| key.to_string()), + }; + + let sas_token = match std::env::var("AZURE_STORAGE_SAS_TOKEN") { + Ok(token) => Some(token), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("sas_token")) + .map(|token| token.to_string()), + }; + + // endpoint, object_store specific + let endpoint = std::env::var("AZURE_STORAGE_ENDPOINT").ok(); + + // allow http, object_store specific + let allow_http = std::env::var("AZURE_ALLOW_HTTP") + .ok() + .map(|allow_http| allow_http.parse().unwrap_or(false)) + .unwrap_or(false); + + // tenant id, object_store specific + let tenant_id = std::env::var("AZURE_TENANT_ID").ok(); + + // client id, object_store specific + let client_id = std::env::var("AZURE_CLIENT_ID").ok(); + + // client secret, object_store specific + let client_secret = std::env::var("AZURE_CLIENT_SECRET").ok(); + + AzureStorageConfig { + account_name, + account_key, + sas_token, + tenant_id, + client_id, + client_secret, + endpoint, + allow_http, + } + } +} + +impl From> for AzureStorageConfig { + fn from(connection_string: ConnectionString) -> Self { + let account_name = connection_string + .account_name + .map(|account_name| account_name.to_string()); + + let account_key = connection_string + .account_key + .map(|account_key| account_key.to_string()); + + let sas_token = connection_string.sas.map(|sas| sas.to_string()); + + let endpoint = connection_string + .blob_endpoint + .map(|blob_endpoint| blob_endpoint.to_string()); + + let allow_http = matches!( + connection_string.default_endpoints_protocol, + Some(EndpointProtocol::Http) + ); + + AzureStorageConfig { + account_name, + account_key, + sas_token, + tenant_id: None, + client_id: None, + client_secret: None, + endpoint, + allow_http, + } + } +} diff --git a/src/object_store/local_file.rs b/src/object_store/local_file.rs new file mode 100644 index 0000000..938dde4 --- /dev/null +++ b/src/object_store/local_file.rs @@ -0,0 +1,21 @@ +use object_store::local::LocalFileSystem; +use url::Url; + +use super::uri_as_string; + +// create_local_file_object_store creates a LocalFileSystem object store with the given path. +pub(crate) fn create_local_file_object_store(uri: &Url, copy_from: bool) -> LocalFileSystem { + let path = uri_as_string(uri); + + if !copy_from { + // create or overwrite the local file + std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(path) + .unwrap_or_else(|e| panic!("{}", e)); + } + + LocalFileSystem::new() +} diff --git a/src/pgrx_tests/object_store.rs b/src/pgrx_tests/object_store.rs index 561aab6..c5c9e83 100644 --- a/src/pgrx_tests/object_store.rs +++ b/src/pgrx_tests/object_store.rs @@ -2,26 +2,37 @@ mod tests { use std::io::Write; - use aws_config::BehaviorVersion; use pgrx::{pg_test, Spi}; use crate::pgrx_tests::common::TestTable; #[pg_test] - fn test_s3_object_store_from_env() { + fn test_s3_from_env() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); - let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); - - let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); - - test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); - test_table.assert_expected_and_result_rows(); + let s3_uris = [ + format!("s3://{}/pg_parquet_test.parquet", test_bucket_name), + format!( + "https://s3.amazonaws.com/{}/pg_parquet_test.parquet", + test_bucket_name + ), + format!( + "https://{}.s3.amazonaws.com/pg_parquet_test.parquet", + test_bucket_name + ), + ]; + + for s3_uri in s3_uris { + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } } #[pg_test] - fn test_s3_object_store_from_config_file() { + fn test_s3_from_config_file() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -35,21 +46,26 @@ mod tests { let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap(); std::env::remove_var("AWS_ENDPOINT_URL"); + let profile = "pg_parquet_test"; + // create a config file let aws_config_file_content = format!( - "[profile pg_parquet_test]\nregion = {}\naws_access_key_id = {}\naws_secret_access_key = {}\nendpoint_url = {}\n", - region, access_key_id, secret_access_key, endpoint + "[profile {profile}]\n\ + region={region}\n\ + aws_access_key_id={access_key_id}\n\ + aws_secret_access_key={secret_access_key}\n\ + endpoint_url={endpoint}\n" ); - std::env::set_var("AWS_PROFILE", "pg_parquet_test"); + std::env::set_var("AWS_PROFILE", profile); - let aws_config_file = "/tmp/aws_config"; - std::env::set_var("AWS_CONFIG_FILE", aws_config_file); + let aws_config_file_path = "/tmp/pg_parquet_aws_config"; + std::env::set_var("AWS_CONFIG_FILE", aws_config_file_path); let mut aws_config_file = std::fs::OpenOptions::new() .write(true) .truncate(true) .create(true) - .open(aws_config_file) + .open(aws_config_file_path) .unwrap(); aws_config_file @@ -62,6 +78,41 @@ mod tests { test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_table.assert_expected_and_result_rows(); + + // remove the config file + std::fs::remove_file(aws_config_file_path).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_wrong_access_key_id() { + std::env::set_var("AWS_ACCESS_KEY_ID", "wrong_access_key_id"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_wrong_secret_access_key() { + std::env::set_var("AWS_SECRET_ACCESS_KEY", "wrong_secret_access_key"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); } #[pg_test] @@ -146,7 +197,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_write_invalid_uri() { + fn test_s3_write_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let copy_to_command = format!( @@ -157,86 +208,359 @@ mod tests { } #[pg_test] - fn test_s3_object_store_with_temporary_token() { - let tokio_rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)); + #[should_panic(expected = "404 Not Found")] + fn test_s3_read_wrong_bucket() { + let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; - let s3_uri = tokio_rt.block_on(async { - let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; - let client = aws_sdk_sts::Client::new(&config); + let create_table_command = "CREATE TABLE test_table (a int);"; + Spi::run(create_table_command).unwrap(); - let assume_role_result = client - .assume_role() - .role_session_name("testsession") - .role_arn("arn:xxx:xxx:xxx:xxxx") - .send() - .await - .unwrap(); + let copy_from_command = format!("COPY test_table FROM '{}';", s3_uri); + Spi::run(copy_from_command.as_str()).unwrap(); + } - let assumed_creds = assume_role_result.credentials().unwrap(); + #[pg_test] + fn test_s3_temporary_token() { + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); - std::env::set_var("AWS_ACCESS_KEY_ID", assumed_creds.access_key_id()); - std::env::set_var("AWS_SECRET_ACCESS_KEY", assumed_creds.secret_access_key()); - std::env::set_var("AWS_SESSION_TOKEN", assumed_creds.session_token()); + // remove these to make sure the config file is used + let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap(); + std::env::remove_var("AWS_ACCESS_KEY_ID"); + let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap(); + std::env::remove_var("AWS_SECRET_ACCESS_KEY"); + let region = std::env::var("AWS_REGION").unwrap(); + std::env::remove_var("AWS_REGION"); + let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap(); + std::env::remove_var("AWS_ENDPOINT_URL"); + + let profile = "pg_parquet_test"; + + // create a config file + let aws_config_file_content = format!( + "[profile {profile}-source]\n\ + aws_access_key_id={access_key_id}\n\ + aws_secret_access_key={secret_access_key}\n\ + \n\ + [profile {profile}]\n\ + region={region}\n\ + source_profile={profile}-source\n\ + role_arn=arn:aws:iam::123456789012:dummy\n\ + endpoint_url={endpoint}\n" + ); + std::env::set_var("AWS_PROFILE", profile); + + let aws_config_file_path = "/tmp/pg_parquet_aws_config"; + std::env::set_var("AWS_CONFIG_FILE", aws_config_file_path); + + let mut aws_config_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(aws_config_file_path) + .unwrap(); - let test_bucket_name: String = - std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + aws_config_file + .write_all(aws_config_file_content.as_bytes()) + .unwrap(); - format!("s3://{}/pg_parquet_test.parquet", test_bucket_name) - }); + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_table.assert_expected_and_result_rows(); + + // remove the config file + std::fs::remove_file(aws_config_file_path).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "unsupported s3 uri")] + fn test_s3_unsupported_uri() { + let cloudflare_s3_uri = "https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(cloudflare_s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + fn test_azure_blob_from_env() { + // unset AZURE_STORAGE_CONNECTION_STRING to make sure the account name and key are used + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uris = [ + format!("az://{}/pg_parquet_test.parquet", test_container_name), + format!("azure://{}/pg_parquet_test.parquet", test_container_name), + format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ), + ]; + + for azure_blob_uri in azure_blob_uris { + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + } + + #[pg_test] + fn test_azure_from_config_file() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + // remove these to make sure the config file is used + let account_name = std::env::var("AZURE_STORAGE_ACCOUNT").unwrap(); + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + let account_key = std::env::var("AZURE_STORAGE_KEY").unwrap(); + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + + // create a config file + let azure_config_file_content = format!( + "[storage]\n\ + account={account_name}\n\ + key={account_key}" + ); + + let azure_config_file_path = "/tmp/pg_parquet_azure_config"; + std::env::set_var("AZURE_CONFIG_FILE", azure_config_file_path); + + let mut azure_config_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(azure_config_file_path) + .unwrap(); + + azure_config_file + .write_all(azure_config_file_content.as_bytes()) + .unwrap(); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + + // remove the config file + std::fs::remove_file(azure_config_file_path).unwrap(); + } + + #[pg_test] + fn test_azure_from_env_via_connection_string() { + // unset AZURE_STORAGE_ACCOUNT AND AZURE_STORAGE_KEY to make sure the connection string is used + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + std::env::remove_var("AZURE_STORAGE_KEY"); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + fn test_azure_from_config_via_connection_string() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + // remove these to make sure the config file is used + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + std::env::remove_var("AZURE_STORAGE_KEY"); + let connection_string = std::env::var("AZURE_STORAGE_CONNECTION_STRING").unwrap(); + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + + // create a config file + let azure_config_file_content = + format!("[storage]\nconnection_string = {connection_string}\n"); + + let azure_config_file_path = "/tmp/pg_parquet_azure_config"; + std::env::set_var("AZURE_CONFIG_FILE", azure_config_file_path); + + let mut azure_config_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(azure_config_file_path) + .unwrap(); + + azure_config_file + .write_all(azure_config_file_content.as_bytes()) + .unwrap(); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + + // remove the config file + std::fs::remove_file(azure_config_file_path).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "Account must be specified")] + fn test_azure_no_storage_account() { + // unset AZURE_STORAGE_CONNECTION_STRING to make sure the account name and key are used + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_wrong_storage_key() { + // unset AZURE_STORAGE_CONNECTION_STRING to make sure the account name and key are used + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + + let wrong_account_key = String::from("FFy8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="); + std::env::set_var("AZURE_STORAGE_KEY", wrong_account_key); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_azure_write_wrong_container() { + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/nonexistentcontainer", + test_account_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + fn test_azure_read_write_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let read_write_sas_token = std::env::var("AZURE_TEST_READ_WRITE_SAS") + .expect("AZURE_TEST_READ_WRITE_SAS not found"); + + // remove account key and connection string to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_write_sas_token); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); } #[pg_test] #[should_panic(expected = "403 Forbidden")] - fn test_s3_object_store_with_missing_temporary_token_fail() { - let tokio_rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)); + fn test_azure_read_only_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); - let s3_uri = tokio_rt.block_on(async { - let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; - let client = aws_sdk_sts::Client::new(&config); + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); - let assume_role_result = client - .assume_role() - .role_session_name("testsession") - .role_arn("arn:xxx:xxx:xxx:xxxx") - .send() - .await - .unwrap(); + let read_only_sas_token: String = + std::env::var("AZURE_TEST_READ_ONLY_SAS").expect("AZURE_TEST_READ_ONLY_SAS not found"); - let assumed_creds = assume_role_result.credentials().unwrap(); + // remove account key and connection string to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::remove_var("AZURE_STORAGE_CONNECTION_STRING"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_only_sas_token); - // we do not set the session token on purpose - std::env::set_var("AWS_ACCESS_KEY_ID", assumed_creds.access_key_id()); - std::env::set_var("AWS_SECRET_ACCESS_KEY", assumed_creds.secret_access_key()); + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); - let test_bucket_name: String = - std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } - format!("s3://{}/pg_parquet_test.parquet", test_bucket_name) - }); + #[pg_test] + #[should_panic(expected = "unsupported azure blob storage uri")] + fn test_azure_unsupported_uri() { + let fabric_azure_blob_uri = "https://ACCOUNT.dfs.fabric.microsoft.com".into(); - let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + let test_table = TestTable::::new("int4".into()).with_uri(fabric_azure_blob_uri); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_table.assert_expected_and_result_rows(); } #[pg_test] - #[should_panic(expected = "unsupported uri gs://testbucket")] + #[should_panic(expected = "unsupported scheme gs in uri gs://testbucket")] fn test_unsupported_uri() { let test_table = TestTable::::new("int4".into()).with_uri("gs://testbucket".to_string()); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_table.assert_expected_and_result_rows(); } + + #[pg_test] + #[should_panic(expected = "unrecognized uri dummy://testbucket")] + fn test_unrecognized_uri() { + let test_table = + TestTable::::new("int4".into()).with_uri("dummy://testbucket".to_string()); + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } }