From 5b83daa1ce376f16f24094cc7b19c962e14395c0 Mon Sep 17 00:00:00 2001 From: CalvinConfluent Date: Thu, 23 Jan 2025 10:50:49 -0800 Subject: [PATCH 1/2] KAFKA-18634: Fix ELR metadata version issues --- .../controller/ReplicationControlManager.java | 4 +++- .../kafka/image/loader/MetadataLoader.java | 1 + .../image/publisher/SnapshotEmitter.java | 1 + .../image/writer/ImageWriterOptions.java | 21 +++++++++++++++++-- .../kafka/metadata/PartitionRegistration.java | 4 +++- .../controller/ClusterControlManagerTest.java | 1 + ...ontrollerMetadataMetricsPublisherTest.java | 2 ++ .../metrics/ControllerMetricsChangesTest.java | 1 + .../apache/kafka/image/ClusterImageTest.java | 1 + .../kafka/image/DelegationTokenImageTest.java | 9 ++++++-- .../apache/kafka/image/FeaturesImageTest.java | 5 ++++- .../kafka/image/ImageDowngradeTest.java | 1 + .../apache/kafka/image/MetadataImageTest.java | 3 +++ .../apache/kafka/image/ScramImageTest.java | 9 ++++++-- .../image/writer/ImageWriterOptionsTest.java | 19 ++++++++++++++++- .../metadata/PartitionRegistrationTest.java | 13 +++++++++++- .../kafka/server/common/MetadataVersion.java | 2 +- tests/kafkatest/version.py | 2 +- 18 files changed, 86 insertions(+), 13 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index b2e232cfdab34..93adb92374c91 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -798,6 +798,7 @@ private ApiError createTopic(ControllerRequestContext context, PartitionRegistration info = partEntry.getValue(); records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(). setMetadataVersion(featureControl.metadataVersion()). + setFinalizedFeatures(featureControl.latestFinalizedFeatures().featureMap()). build())); } return ApiError.NONE; @@ -1416,7 +1417,7 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List records) { - if (featureControl.metadataVersion().isElrSupported()) { + if (featureControl.isElrFeatureEnabled()) { // ELR is enabled, generate unclean shutdown partition change records generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); @@ -1890,6 +1891,7 @@ void createPartitions(ControllerRequestContext context, records.add(buildPartitionRegistration(partitionAssignment, isr) .toRecord(topicId, partitionId, new ImageWriterOptions.Builder(). setMetadataVersion(featureControl.metadataVersion()). + setFinalizedFeatures(featureControl.latestFinalizedFeatures().featureMap()). build())); partitionId++; } diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index 56f5f20268b63..2d9dba0b49edd 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -295,6 +295,7 @@ void initializeNewPublishers() { ImageReWriter writer = new ImageReWriter(delta); image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(image.features().metadataVersion()). + setFinalizedFeatures(image.features().finalizedVersions()). build()); // ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here. SnapshotManifest manifest = new SnapshotManifest( diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java index c3e08caf9f2f7..be5e9f1b10df2 100644 --- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java +++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java @@ -148,6 +148,7 @@ public void maybeEmit(MetadataImage image) { try { image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(image.features().metadataVersion()). + setFinalizedFeatures(image.features().finalizedVersions()). build()); writer.close(true); metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds()); diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java index f84fe1e66d671..c47bb0b8af2e1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java +++ b/metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java @@ -20,6 +20,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.MetadataVersion; +import java.util.Map; import java.util.function.Consumer; @@ -30,6 +31,7 @@ public final class ImageWriterOptions { public static class Builder { private MetadataVersion metadataVersion; private MetadataVersion requestedMetadataVersion; + private Map finalizedFeatures; private Consumer lossHandler = e -> { throw e; }; @@ -54,6 +56,11 @@ public Builder setMetadataVersion(MetadataVersion metadataVersion) { return this; } + public Builder setFinalizedFeatures(Map features) { + finalizedFeatures = features; + return this; + } + public MetadataVersion metadataVersion() { return metadataVersion; } @@ -62,33 +69,43 @@ public MetadataVersion requestedMetadataVersion() { return requestedMetadataVersion; } + public Map finalizedFeatures() { + return finalizedFeatures; + } + public Builder setLossHandler(Consumer lossHandler) { this.lossHandler = lossHandler; return this; } public ImageWriterOptions build() { - return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion); + return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion, finalizedFeatures); } } private final MetadataVersion metadataVersion; private final MetadataVersion requestedMetadataVersion; private final Consumer lossHandler; + private final Map finalizedFeatures; private ImageWriterOptions( MetadataVersion metadataVersion, Consumer lossHandler, - MetadataVersion orgMetadataVersion + MetadataVersion orgMetadataVersion, + Map finalizedFeatures ) { this.metadataVersion = metadataVersion; this.lossHandler = lossHandler; this.requestedMetadataVersion = orgMetadataVersion; + this.finalizedFeatures = finalizedFeatures; } public MetadataVersion metadataVersion() { return metadataVersion; } + public Map finalizedFeatures() { + return finalizedFeatures; + } public void handleLoss(String loss) { lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss)); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index 99a877a7953a9..3a1dac80c82fd 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -27,6 +27,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.slf4j.Logger; import java.util.Arrays; @@ -387,7 +388,8 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterO setLeaderRecoveryState(leaderRecoveryState.value()). setLeaderEpoch(leaderEpoch). setPartitionEpoch(partitionEpoch); - if (options.metadataVersion().isElrSupported()) { + if (options.finalizedFeatures().getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= + EligibleLeaderReplicasVersion.ELRV_1.featureLevel()) { // The following are tagged fields, we should only set them when there are some contents, in order to save // spaces. if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr)); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index e7d190339f44d..eebb24eac2d7e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -505,6 +505,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) { ImageWriterOptions options = new ImageWriterOptions.Builder(). setMetadataVersion(metadataVersion). + setFinalizedFeatures(Collections.emptyMap()). setLossHandler(__ -> { }). build(); assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java index 2d230d605127f..4111581ef9794 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.Optional; import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER; @@ -147,6 +148,7 @@ public void testLoadSnapshot() { ImageReWriter writer = new ImageReWriter(delta); IMAGE1.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(delta.image().features().metadataVersion()). + setFinalizedFeatures(Collections.emptyMap()). build()); env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true)); assertEquals(0, env.metrics.activeBrokerCount()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index 80f54daf27b10..74c7d3cf60814 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -131,6 +131,7 @@ public void testHandleDeletedTopic() { static { ImageWriterOptions options = new ImageWriterOptions.Builder(). + setFinalizedFeatures(Collections.emptyMap()). setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build(); // highest MV for PartitionRecord v0 TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap())); TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL). diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java index cf79d276b8474..74dec6dfe4e2c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java @@ -313,6 +313,7 @@ public void testHandleLossOfControllerRegistrations() { final AtomicReference lossString = new AtomicReference<>(""); testImage.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_3_6_IV2). + setFinalizedFeatures(Collections.emptyMap()). setLossHandler(loss -> lossString.compareAndSet("", loss.loss())). build()); assertEquals("controller registration data", lossString.get()); diff --git a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java index 7de0237068d1b..86876f32e02bc 100644 --- a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -141,7 +142,9 @@ private static List getImageRecords(DelegationTokenImage i @Test public void testEmptyWithInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_5_IV2).build(); + setMetadataVersion(MetadataVersion.IBP_3_5_IV2). + setFinalizedFeatures(Collections.emptyMap()). + build(); RecordListWriter writer = new RecordListWriter(); DelegationTokenImage.EMPTY.write(writer, imageWriterOptions); } @@ -149,7 +152,9 @@ public void testEmptyWithInvalidIBP() { @Test public void testImage1withInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_5_IV2).build(); + setMetadataVersion(MetadataVersion.IBP_3_5_IV2). + setFinalizedFeatures(Collections.emptyMap()). + build(); RecordListWriter writer = new RecordListWriter(); try { IMAGE1.write(writer, imageWriterOptions); diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java index 1df5ff655639f..4f9494102c39b 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java @@ -153,7 +153,10 @@ private static void testToImage(FeaturesImage image, List private static List getImageRecords(FeaturesImage image) { RecordListWriter writer = new RecordListWriter(); - image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build()); + image.write(writer, new ImageWriterOptions.Builder(). + setMetadataVersion(image.metadataVersion()). + setFinalizedFeatures(Collections.emptyMap()). + build()); return writer.records(); } diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java index ba636f3bdba96..d6bf8a86034c1 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java @@ -202,6 +202,7 @@ private static void writeWithExpectedLosses( RecordListWriter writer = new RecordListWriter(); image.write(writer, new ImageWriterOptions.Builder(). setMetadataVersion(metadataVersion). + setFinalizedFeatures(Collections.emptyMap()). setLossHandler(lossConsumer). build()); assertEquals(expectedLosses, lossConsumer.losses, "Failed to get expected metadata losses."); diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index dea6fbab71084..aac6d0be413f1 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Collections; import java.util.List; import java.util.Optional; @@ -94,6 +95,7 @@ public void testApplyDelta1() { // check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2 ImageWriterOptions options = new ImageWriterOptions.Builder() .setMetadataVersion(IMAGE1.features().metadataVersion()) + .setFinalizedFeatures(Collections.emptyMap()) .build(); List records = getImageRecords(IMAGE1, options); records.addAll(FeaturesImageTest.DELTA1_RECORDS); @@ -116,6 +118,7 @@ public void testImage2RoundTrip() { private static void testToImage(MetadataImage image) { testToImage(image, new ImageWriterOptions.Builder() .setMetadataVersion(image.features().metadataVersion()) + .setFinalizedFeatures(Collections.emptyMap()) .build(), Optional.empty()); } diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index 528b75fbe03cf..efd30767a2e5d 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -177,7 +178,9 @@ private static List getImageRecords(ScramImage image) { @Test public void testEmptyWithInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); + setMetadataVersion(MetadataVersion.IBP_3_4_IV0). + setFinalizedFeatures(Collections.emptyMap()). + build(); RecordListWriter writer = new RecordListWriter(); ScramImage.EMPTY.write(writer, imageWriterOptions); } @@ -185,7 +188,9 @@ public void testEmptyWithInvalidIBP() { @Test public void testImage1withInvalidIBP() { ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build(); + setMetadataVersion(MetadataVersion.IBP_3_4_IV0). + setFinalizedFeatures(Collections.emptyMap()). + build(); RecordListWriter writer = new RecordListWriter(); try { IMAGE1.write(writer, imageWriterOptions); diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java index c1c119f9b4cb5..b2ba39b7cd9b4 100644 --- a/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java @@ -17,14 +17,19 @@ package org.apache.kafka.image.writer; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.GroupVersion; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Collections; +import java.util.Map; import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -44,7 +49,8 @@ public void testSetMetadataVersion() { i++) { MetadataVersion version = MetadataVersion.VERSIONS[i]; ImageWriterOptions.Builder options = new ImageWriterOptions.Builder(). - setMetadataVersion(version); + setMetadataVersion(version). + setFinalizedFeatures(Collections.emptyMap()); if (i < MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.ordinal()) { assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, options.metadataVersion()); assertEquals(version, options.requestedMetadataVersion()); @@ -66,9 +72,20 @@ public void testHandleLoss() { Consumer customLossHandler = e -> assertEquals(formattedMessage, e.getMessage()); ImageWriterOptions options = new ImageWriterOptions.Builder() .setMetadataVersion(version) + .setFinalizedFeatures(Collections.emptyMap()) .setLossHandler(customLossHandler) .build(); options.handleLoss(expectedMessage); } } + + @Test + public void testSetFeatures() { + MetadataVersion version = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION; + ImageWriterOptions options = new ImageWriterOptions.Builder(). + setMetadataVersion(version). + setFinalizedFeatures(Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel())).build(); + assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), options.finalizedFeatures().get(EligibleLeaderReplicasVersion.FEATURE_NAME)); + assertFalse(options.finalizedFeatures().containsKey(GroupVersion.FEATURE_NAME)); + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index c7d1767889b5d..8e8ee9aad9a57 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.UnwritableMetadataException; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import net.jqwik.api.Arbitraries; @@ -43,7 +44,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -92,7 +95,9 @@ public void testRecordRoundTrip() { Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA"); int partitionId = 4; ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build()); // highest MV for PartitionRecord v0 + setMetadataVersion(MetadataVersion.IBP_3_7_IV0). + setFinalizedFeatures(Collections.emptyMap()). + build()); // highest MV for PartitionRecord v0 PartitionRegistration registrationB = new PartitionRegistration((PartitionRecord) record.message()); assertEquals(registrationA, registrationB); @@ -336,8 +341,13 @@ public void testPartitionRegistrationToRecord(MetadataVersion metadataVersion) { )); } List exceptions = new ArrayList<>(); + Map features = new HashMap<>(); + if (metadataVersion.isElrSupported()) { + features.put(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()); + } ImageWriterOptions options = new ImageWriterOptions.Builder(). setMetadataVersion(metadataVersion). + setFinalizedFeatures(features). setLossHandler(exceptions::add). build(); assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()), @@ -374,6 +384,7 @@ public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() { List exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). setMetadataVersion(MetadataVersion.IBP_4_0_IV1). + setFinalizedFeatures(Collections.emptyMap()). setLossHandler(exceptions::add). build(); assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 310a9242b2345..b2aad09b91ec8 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -152,7 +152,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ - public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV0; + public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV1; // If you change the value above please also update // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 6eeeedd44528d..7e212d60010ee 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -114,7 +114,7 @@ def get_version(node=None): LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java -LATEST_STABLE_METADATA_VERSION = "4.0-IV0" +LATEST_STABLE_METADATA_VERSION = "4.0-IV1" # 0.11.0.x versions V_0_11_0_3 = KafkaVersion("0.11.0.3") From 975a681eed47a908e7bca740f14ffe204958b42c Mon Sep 17 00:00:00 2001 From: CalvinConfluent Date: Thu, 23 Jan 2025 10:55:33 -0800 Subject: [PATCH 2/2] Minor --- .../org/apache/kafka/server/common/MetadataVersion.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index b2aad09b91ec8..36f7a6e956b91 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -113,15 +113,15 @@ public enum MetadataVersion { // Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848). IBP_4_0_IV0(22, "4.0", "IV0", false), + // Add ELR related supports (KIP-966). + IBP_4_0_IV1(23, "4.0", "IV1", true), + // // NOTE: MetadataVersions after this point are unstable and may be changed. // If users attempt to use an unstable MetadataVersion, they will get an error. // Please move this comment when updating the LATEST_PRODUCTION constant. // - // Add ELR related supports (KIP-966). - IBP_4_0_IV1(23, "4.0", "IV1", true), - // Bootstrap metadata version for transaction versions 1 and 2 (KIP-890) IBP_4_0_IV2(24, "4.0", "IV2", false),