Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18634: Fix ELR metadata version issues #18680

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ private ApiError createTopic(ControllerRequestContext context,
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
setFinalizedFeatures(featureControl.latestFinalizedFeatures().featureMap()).
build()));
}
return ApiError.NONE;
Expand Down Expand Up @@ -1416,7 +1417,7 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMe
* @param records The record list to append to.
*/
void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported()) {
if (featureControl.isElrFeatureEnabled()) {
// ELR is enabled, generate unclean shutdown partition change records
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
Expand Down Expand Up @@ -1890,6 +1891,7 @@ void createPartitions(ControllerRequestContext context,
records.add(buildPartitionRegistration(partitionAssignment, isr)
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
setFinalizedFeatures(featureControl.latestFinalizedFeatures().featureMap()).
build()));
partitionId++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ void initializeNewPublishers() {
ImageReWriter writer = new ImageReWriter(delta);
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
setFinalizedFeatures(image.features().finalizedVersions()).
build());
// ImageReWriter#close invokes finishSnapshot, so we don't need to invoke it here.
SnapshotManifest manifest = new SnapshotManifest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void maybeEmit(MetadataImage image) {
try {
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.features().metadataVersion()).
setFinalizedFeatures(image.features().finalizedVersions()).
build());
writer.close(true);
metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.Map;
import java.util.function.Consumer;


Expand All @@ -30,6 +31,7 @@ public final class ImageWriterOptions {
public static class Builder {
private MetadataVersion metadataVersion;
private MetadataVersion requestedMetadataVersion;
private Map<String, Short> finalizedFeatures;
private Consumer<UnwritableMetadataException> lossHandler = e -> {
throw e;
};
Expand All @@ -54,6 +56,11 @@ public Builder setMetadataVersion(MetadataVersion metadataVersion) {
return this;
}

public Builder setFinalizedFeatures(Map<String, Short> features) {
finalizedFeatures = features;
return this;
}

public MetadataVersion metadataVersion() {
return metadataVersion;
}
Expand All @@ -62,33 +69,43 @@ public MetadataVersion requestedMetadataVersion() {
return requestedMetadataVersion;
}

public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}

public Builder setLossHandler(Consumer<UnwritableMetadataException> lossHandler) {
this.lossHandler = lossHandler;
return this;
}

public ImageWriterOptions build() {
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion);
return new ImageWriterOptions(metadataVersion, lossHandler, requestedMetadataVersion, finalizedFeatures);
}
}

private final MetadataVersion metadataVersion;
private final MetadataVersion requestedMetadataVersion;
private final Consumer<UnwritableMetadataException> lossHandler;
private final Map<String, Short> finalizedFeatures;

private ImageWriterOptions(
MetadataVersion metadataVersion,
Consumer<UnwritableMetadataException> lossHandler,
MetadataVersion orgMetadataVersion
MetadataVersion orgMetadataVersion,
Map<String, Short> finalizedFeatures
) {
this.metadataVersion = metadataVersion;
this.lossHandler = lossHandler;
this.requestedMetadataVersion = orgMetadataVersion;
this.finalizedFeatures = finalizedFeatures;
}

public MetadataVersion metadataVersion() {
return metadataVersion;
}
public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}

public void handleLoss(String loss) {
lossHandler.accept(new UnwritableMetadataException(requestedMetadataVersion, loss));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.slf4j.Logger;

import java.util.Arrays;
Expand Down Expand Up @@ -387,7 +388,8 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterO
setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch);
if (options.metadataVersion().isElrSupported()) {
if (options.finalizedFeatures().getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()) {
// The following are tagged fields, we should only set them when there are some contents, in order to save
// spaces.
if (elr.length > 0) record.setEligibleLeaderReplicas(Replicas.toList(elr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) {

ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setFinalizedFeatures(Collections.emptyMap()).
setLossHandler(__ -> { }).
build();
assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Optional;

import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NON_PREFERRED_LEADER;
Expand Down Expand Up @@ -147,6 +148,7 @@ public void testLoadSnapshot() {
ImageReWriter writer = new ImageReWriter(delta);
IMAGE1.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(delta.image().features().metadataVersion()).
setFinalizedFeatures(Collections.emptyMap()).
build());
env.publisher.onMetadataUpdate(delta, IMAGE1, fakeManifest(true));
assertEquals(0, env.metrics.activeBrokerCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void testHandleDeletedTopic() {

static {
ImageWriterOptions options = new ImageWriterOptions.Builder().
setFinalizedFeatures(Collections.emptyMap()).
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build(); // highest MV for PartitionRecord v0
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ public void testHandleLossOfControllerRegistrations() {
final AtomicReference<String> lossString = new AtomicReference<>("");
testImage.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_6_IV2).
setFinalizedFeatures(Collections.emptyMap()).
setLossHandler(loss -> lossString.compareAndSet("", loss.loss())).
build());
assertEquals("controller registration data", lossString.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -141,15 +142,19 @@ private static List<ApiMessageAndVersion> getImageRecords(DelegationTokenImage i
@Test
public void testEmptyWithInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_5_IV2).build();
setMetadataVersion(MetadataVersion.IBP_3_5_IV2).
setFinalizedFeatures(Collections.emptyMap()).
build();
RecordListWriter writer = new RecordListWriter();
DelegationTokenImage.EMPTY.write(writer, imageWriterOptions);
}

@Test
public void testImage1withInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_5_IV2).build();
setMetadataVersion(MetadataVersion.IBP_3_5_IV2).
setFinalizedFeatures(Collections.emptyMap()).
build();
RecordListWriter writer = new RecordListWriter();
try {
IMAGE1.write(writer, imageWriterOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ private static void testToImage(FeaturesImage image, List<ApiMessageAndVersion>

private static List<ApiMessageAndVersion> getImageRecords(FeaturesImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build());
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(image.metadataVersion()).
setFinalizedFeatures(Collections.emptyMap()).
build());
return writer.records();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private static void writeWithExpectedLosses(
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setFinalizedFeatures(Collections.emptyMap()).
setLossHandler(lossConsumer).
build());
assertEquals(expectedLosses, lossConsumer.losses, "Failed to get expected metadata losses.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -94,6 +95,7 @@ public void testApplyDelta1() {
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
ImageWriterOptions options = new ImageWriterOptions.Builder()
.setMetadataVersion(IMAGE1.features().metadataVersion())
.setFinalizedFeatures(Collections.emptyMap())
.build();
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1, options);
records.addAll(FeaturesImageTest.DELTA1_RECORDS);
Expand All @@ -116,6 +118,7 @@ public void testImage2RoundTrip() {
private static void testToImage(MetadataImage image) {
testToImage(image, new ImageWriterOptions.Builder()
.setMetadataVersion(image.features().metadataVersion())
.setFinalizedFeatures(Collections.emptyMap())
.build(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -177,15 +178,19 @@ private static List<ApiMessageAndVersion> getImageRecords(ScramImage image) {
@Test
public void testEmptyWithInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setFinalizedFeatures(Collections.emptyMap()).
build();
RecordListWriter writer = new RecordListWriter();
ScramImage.EMPTY.write(writer, imageWriterOptions);
}

@Test
public void testImage1withInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setFinalizedFeatures(Collections.emptyMap()).
build();
RecordListWriter writer = new RecordListWriter();
try {
IMAGE1.write(writer, imageWriterOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

package org.apache.kafka.image.writer;

import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.GroupVersion;
import org.apache.kafka.server.common.MetadataVersion;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;


Expand All @@ -44,7 +49,8 @@ public void testSetMetadataVersion() {
i++) {
MetadataVersion version = MetadataVersion.VERSIONS[i];
ImageWriterOptions.Builder options = new ImageWriterOptions.Builder().
setMetadataVersion(version);
setMetadataVersion(version).
setFinalizedFeatures(Collections.emptyMap());
if (i < MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.ordinal()) {
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, options.metadataVersion());
assertEquals(version, options.requestedMetadataVersion());
Expand All @@ -66,9 +72,20 @@ public void testHandleLoss() {
Consumer<UnwritableMetadataException> customLossHandler = e -> assertEquals(formattedMessage, e.getMessage());
ImageWriterOptions options = new ImageWriterOptions.Builder()
.setMetadataVersion(version)
.setFinalizedFeatures(Collections.emptyMap())
.setLossHandler(customLossHandler)
.build();
options.handleLoss(expectedMessage);
}
}

@Test
public void testSetFeatures() {
MetadataVersion version = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(version).
setFinalizedFeatures(Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel())).build();
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), options.finalizedFeatures().get(EligibleLeaderReplicasVersion.FEATURE_NAME));
assertFalse(options.finalizedFeatures().containsKey(GroupVersion.FEATURE_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.UnwritableMetadataException;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;

import net.jqwik.api.Arbitraries;
Expand All @@ -43,7 +44,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -92,7 +95,9 @@ public void testRecordRoundTrip() {
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build()); // highest MV for PartitionRecord v0
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).
setFinalizedFeatures(Collections.emptyMap()).
build()); // highest MV for PartitionRecord v0
PartitionRegistration registrationB =
new PartitionRegistration((PartitionRecord) record.message());
assertEquals(registrationA, registrationB);
Expand Down Expand Up @@ -336,8 +341,13 @@ public void testPartitionRegistrationToRecord(MetadataVersion metadataVersion) {
));
}
List<UnwritableMetadataException> exceptions = new ArrayList<>();
Map<String, Short> features = new HashMap<>();
if (metadataVersion.isElrSupported()) {
features.put(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
}
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setFinalizedFeatures(features).
setLossHandler(exceptions::add).
build();
assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()),
Expand Down Expand Up @@ -374,6 +384,7 @@ public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_4_0_IV1).
setFinalizedFeatures(Collections.emptyMap()).
setLossHandler(exceptions::add).
build();
assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options));
Expand Down
Loading
Loading