diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index 66c8134caef3..74222856209e 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -131,6 +131,7 @@ public class DeltaLakeMergeSink
     private final boolean deletionVectorEnabled;
     private final Map<String, DeletionVectorEntry> deletionVectors;
     private final int randomPrefixLength;
+    private final Optional<String> shallowCloneSourceTableLocation;
 
     @Nullable
     private DeltaLakeCdfPageSink cdfPageSink;
@@ -155,7 +156,8 @@ public DeltaLakeMergeSink(
             FileFormatDataSourceStats fileFormatDataSourceStats,
             boolean deletionVectorEnabled,
             Map<String, DeletionVectorEntry> deletionVectors,
-            int randomPrefixLength)
+            int randomPrefixLength,
+            Optional<String> shallowCloneSourceTableLocation)
     {
         this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
         this.session = requireNonNull(session, "session is null");
@@ -184,6 +186,8 @@ public DeltaLakeMergeSink(
         this.deletionVectorEnabled = deletionVectorEnabled;
         this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
         this.randomPrefixLength = randomPrefixLength;
+        this.shallowCloneSourceTableLocation = requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");
+
         dataColumnsIndices = new int[tableColumnCount];
         dataAndRowIdColumnsIndices = new int[tableColumnCount + 1];
         for (int i = 0; i < tableColumnCount; i++) {
@@ -407,8 +411,8 @@ private Slice writeDeletionVector(
             long rowCount)
     {
         String tablePath = rootTableLocation.toString();
-        String sourceRelativePath = relativePath(tablePath, sourcePath);
-        DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceRelativePath);
+        String sourceReferencePath = getReferencedPath(tablePath, sourcePath);
+        DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceReferencePath);
 
         DeletionVectorEntry deletionVectorEntry;
         try {
@@ -420,14 +424,14 @@ private Slice writeDeletionVector(
 
         try {
             DataFileInfo newFileInfo = new DataFileInfo(
-                    sourceRelativePath,
+                    sourceReferencePath,
                     length,
                     lastModified.toEpochMilli(),
                     DATA,
                     deletion.partitionValues,
                     readStatistics(parquetMetadata, dataColumns, rowCount),
                     Optional.of(deletionVectorEntry));
-            DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
+            DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceReferencePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
             return utf8Slice(mergeResultJsonCodec.toJson(result));
         }
         catch (Throwable e) {
@@ -445,9 +449,9 @@ private Slice writeDeletionVector(
 
     private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
     {
-        String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath);
-        DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath);
-        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty());
+        String sourceReferencePath = getReferencedPath(rootTableLocation.toString(), sourcePath);
+        DeletionVectorEntry deletionVector = deletionVectors.get(sourceReferencePath);
+        DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.ofNullable(deletionVector), Optional.empty());
         return utf8Slice(mergeResultJsonCodec.toJson(result));
     }
 
@@ -457,16 +461,16 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
         try {
             String tablePath = rootTableLocation.toString();
             Location sourceLocation = Location.of(sourcePath);
-            String sourceRelativePath = relativePath(tablePath, sourcePath);
+            String sourceReferencePath = getReferencedPath(tablePath, sourcePath);
 
             Location targetLocation = sourceLocation.sibling(session.getQueryId() + "_" + randomUUID());
-            String targetRelativePath = relativePath(tablePath, targetLocation.toString());
+            String targetReferencePath = getReferencedPath(tablePath, targetLocation.toString());
 
             ParquetFileWriter fileWriter = createParquetFileWriter(targetLocation, dataColumns);
             DeltaLakeWriter writer = new DeltaLakeWriter(
                     fileWriter,
                     rootTableLocation,
-                    targetRelativePath,
+                    targetReferencePath,
                     deletion.partitionValues(),
                     writerStats,
                     dataColumns,
@@ -474,7 +478,7 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
 
             Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);
 
-            DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo);
+            DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.empty(), newFileInfo);
             return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
         }
         catch (IOException e) {
@@ -524,8 +528,8 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeColumnHandle> dataColumns)
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeTableHandle.java
-        Map<String, DeletionVectorEntry> deletionVectors)
+        Map<String, DeletionVectorEntry> deletionVectors,
+        Optional<String> shallowCloneSourceTableLocation)
         implements ConnectorMergeTableHandle
 {
     public DeltaLakeMergeTableHandle
@@ -33,6 +35,7 @@ public record DeltaLakeMergeTableHandle(
         requireNonNull(tableHandle, "tableHandle is null");
         requireNonNull(insertTableHandle, "insertTableHandle is null");
         deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
+        requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");
     }
 
     @Override
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index ab729bea2793..f1a835197975 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -69,14 +69,17 @@
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
+import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
 import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
+import io.trino.plugin.deltalake.transactionlog.Transaction;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
+import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
 import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
@@ -292,6 +295,7 @@
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
+import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
 import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
 import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
 import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
@@ -2064,6 +2068,18 @@ private void appendTableEntries(
 
     private static void appendAddFileEntries(TransactionLogWriter transactionLogWriter, List<DataFileInfo> dataFileInfos, List<String> partitionColumnNames, List<String> originalColumnNames, boolean dataChange)
             throws JsonProcessingException
+    {
+        appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumnNames, originalColumnNames, dataChange, Optional.empty());
+    }
+
+    private static void appendAddFileEntries(
+            TransactionLogWriter transactionLogWriter,
+            List<DataFileInfo> dataFileInfos,
+            List<String> partitionColumnNames,
+            List<String> originalColumnNames,
+            boolean dataChange,
+            Optional<String> cloneSourceLocation)
+            throws JsonProcessingException
     {
         Map<String, String> toOriginalColumnNames = originalColumnNames.stream()
                 .collect(toImmutableMap(name -> name.toLowerCase(ENGLISH), identity()));
@@ -2081,9 +2097,13 @@ private static void appendAddFileEntries(TransactionLogWrit
 
             partitionValues = unmodifiableMap(partitionValues);
 
+            String path = cloneSourceLocation.isPresent() && info.path().startsWith(cloneSourceLocation.get())
+                    ? info.path()
+                    : toUriFormat(info.path()); // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
+
             transactionLogWriter.appendAddFileEntry(
                     new AddFileEntry(
-                            toUriFormat(info.path()), // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
+                            path,
                             partitionValues,
                             info.size(),
                             info.creationTime(),
@@ -2448,7 +2468,52 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
         DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns);
 
         Map<String, DeletionVectorEntry> deletionVectors = loadDeletionVectors(session, handle);
-        return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors);
+        return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors, shallowCloneSourceTableLocation(session, handle));
+    }
+
+    private Optional<String> shallowCloneSourceTableLocation(ConnectorSession session, DeltaLakeTableHandle handle)
+    {
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        String sourceTableName = "";
+        try {
+            TransactionLogTail transactionLogTail = loadNewTail(fileSystem, handle.getLocation(), Optional.empty(), Optional.of(0L), DataSize.ofBytes(0));
+            List<Transaction> transactions = transactionLogTail.getTransactions();
+            for (Transaction transaction : transactions) {
+                Optional<CommitInfoEntry> cloneCommit = transaction.transactionEntries().getEntries(fileSystem)
+                        .map(DeltaLakeTransactionLogEntry::getCommitInfo)
+                        .filter(Objects::nonNull)
+                        .filter(commitInfoEntry -> commitInfoEntry.operation().equals("CLONE"))
+                        .findFirst();
+                if (cloneCommit.isEmpty()) {
+                    return Optional.empty();
+                }
+
+                // It's a cloned table
+                Map<String, String> operationParameters = cloneCommit.get().operationParameters();
+                if (!operationParameters.containsKey("source")) {
+                    return Optional.empty();
+                }
+
+                sourceTableName = operationParameters.get("source");
+                checkArgument(sourceTableName != null && sourceTableName.contains(".") && sourceTableName.split("\\.").length == 3, "Unexpected source table in operation_parameters: %s", sourceTableName);
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        String[] names = sourceTableName.split("\\.");
+        DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) getTableHandle(session, new SchemaTableName(names[1], names[2]), Optional.empty(), Optional.empty());
+        if (tableHandle == null) {
+            return Optional.empty();
+        }
+
+        String tableLocation = tableHandle.getLocation();
+        if (!tableLocation.endsWith("/")) {
+            tableLocation += "/";
+        }
+
+        return Optional.of(tableLocation);
     }
 
     private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession session, DeltaLakeTableHandle handle)
@@ -2577,14 +2642,22 @@ private long commitMergeOperation(
             appendCdcFilesInfos(transactionLogWriter, cdcFiles, partitionColumns);
         }
 
+        Optional<String> cloneSourceTableLocation = mergeHandle.shallowCloneSourceTableLocation();
         for (DeltaLakeMergeResult mergeResult : mergeResults) {
             if (mergeResult.oldFile().isEmpty()) {
                 continue;
             }
-            transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+
+            String oldFile = mergeResult.oldFile().get();
+            if (cloneSourceTableLocation.isPresent() && oldFile.startsWith(cloneSourceTableLocation.get())) {
+                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(oldFile, createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+            }
+            else {
+                transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(oldFile), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+            }
         }
 
-        appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);
+        appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true, cloneSourceTableLocation);
         transactionLogWriter.flush();
 
         return commitVersion;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
index f84ee23de32f..42ee2c08f89a 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java
@@ -201,7 +201,8 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
                 fileFormatDataSourceStats,
                 isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()),
                 merge.deletionVectors(),
-                getRandomPrefixLength(tableHandle.metadataEntry()));
+                getRandomPrefixLength(tableHandle.metadataEntry()),
+                merge.shallowCloneSourceTableLocation());
     }
 
     private DeltaLakeCdfPageSink createCdfPageSink(
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
index 98230e78aa5d..08c613b385aa 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
@@ -631,12 +631,16 @@ public void testDeleteWithNonPartitionFilter()
                 ImmutableMultiset.<FileOperation>builder()
                         .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
+                        // One more newStream to check whether the table is a cloned table
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
+                        // One more length call to check whether the table is a cloned table
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
                         .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
index d773c66a6547..3275b1eea54b 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
@@ -22,6 +22,7 @@
 import io.trino.testng.services.Flaky;
 import org.testng.annotations.Test;
 
+import java.sql.Date;
 import java.util.List;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -250,6 +251,75 @@ public void testReadFromSchemaChangedDeepCloneTable()
         testReadSchemaChangedCloneTable("DEEP", false);
     }
 
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testShallowCloneTableMerge()
+    {
+        String baseTable = "test_dl_base_table_" + randomNameSuffix();
+        String clonedTable = "test_dl_clone_tableV1_" + randomNameSuffix();
+        String directoryName = "databricks-merge-clone-compatibility-test-";
+        try {
+            onDelta().executeQuery("CREATE TABLE default." + baseTable +
+                    " (id INT, v STRING, part DATE) USING delta " +
+                    "LOCATION 's3://" + bucketName + "/" + directoryName + baseTable + "'");
+
+            onDelta().executeQuery("INSERT INTO default." + baseTable + " " +
+                    "VALUES (1, 'A', TIMESTAMP '2024-01-01'), " +
+                    "(2, 'B', TIMESTAMP '2024-01-01'), " +
+                    "(3, 'C', TIMESTAMP '2024-02-02'), " +
+                    "(4, 'D', TIMESTAMP '2024-02-02')");
+
+            onDelta().executeQuery("CREATE TABLE default." + clonedTable +
+                    " SHALLOW CLONE default." + baseTable +
+                    " LOCATION 's3://" + bucketName + "/" + directoryName + clonedTable + "'");
+
+            List<Row> expectedRows = ImmutableList.of(
+                    row(1, "A", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "C", Date.valueOf("2024-02-02")),
+                    row(4, "D", Date.valueOf("2024-02-02")));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+
+            // update on the cloned table
+            onTrino().executeQuery("UPDATE delta.default." + clonedTable + " SET v = 'xxx' WHERE id in (1,3)");
+            // the source table does not change
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+            List<Row> expectedRowsAfterUpdate = ImmutableList.of(
+                    row(1, "xxx", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "xxx", Date.valueOf("2024-02-02")),
+                    row(4, "D", Date.valueOf("2024-02-02")));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
+                    .containsOnly(expectedRowsAfterUpdate);
+
+            // merge on the cloned table
+            String mergeSql = format("""
+                    MERGE INTO %s t
+                    USING (VALUES (3, 'yyy', TIMESTAMP '2025-01-01'), (4, 'zzz', TIMESTAMP '2025-02-02'), (5, 'kkk', TIMESTAMP '2025-03-03')) AS s(id, v, part)
+                    ON (t.id = s.id)
+                    WHEN MATCHED AND s.v = 'zzz' THEN DELETE
+                    WHEN MATCHED THEN UPDATE SET v = s.v
+                    WHEN NOT MATCHED THEN INSERT (id, v, part) VALUES(s.id, s.v, s.part)
+                    """, "delta.default." + clonedTable);
+            onTrino().executeQuery(mergeSql);
+
+            List<Row> expectedRowsAfterMerge = ImmutableList.of(
+                    row(1, "xxx", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "yyy", Date.valueOf("2024-02-02")),
+                    row(5, "kkk", Date.valueOf("2025-03-03")));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
+                    .containsOnly(expectedRowsAfterMerge);
+        }
+        finally {
+            onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + baseTable);
+            onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + clonedTable);
+        }
+    }
+
     private void testReadSchemaChangedCloneTable(String cloneType, boolean partitioned)
     {
         String directoryName = "/databricks-compatibility-test-";