KAFKA-18632: Added few share consumer multibroker tests. #18679

Open. Wants to merge 4 commits into trunk.
275 changes: 274 additions & 1 deletion core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -38,6 +39,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -48,6 +50,7 @@
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
@@ -57,7 +60,9 @@
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.Tag;
@@ -74,16 +79,21 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
}
}

@ClusterTest(
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
Review comment (Member): It seems to me that this override is unnecessary with 3 brokers and the default of 2 is better.

@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
Review comment (Member): Leave this as the default of 2 also, I think.

@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
Review comment (Member): Actually, the default replication factor of all three of these internal topics is also 3, which should be ideal for this test with 3 brokers. (A trimmed sketch follows the annotation.)

@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
}
)
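// Per the review comments above, a trimmed configuration is sketched here: dropping the
// min.isr and replication.factor overrides for share.coordinator.state.topic and
// transaction.state.log leaves the broker defaults (min.isr of 2, replication factor of 3),
// which suit this 3-broker cluster.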
public void testShareConsumerAfterCoordinatorMovement() throws Exception {
setup();
String topicName = "multipart";
String groupId = "multipartGrp";
createTopic(topicName, 3, 3);
Review comment (Member): Actually, this createTopic method knows the topic ID and discards it. I suggest changing the signature of this method to return a Uuid; then the topic ID is known without the extra call to Admin.describeTopics. (A sketch follows.)
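A sketch of that refactor (hedged: it assumes CreateTopicsResult.topicId(String) on the Admin client result, which recent versions provide):

private Uuid createTopic(String topicName, int numPartitions, int replicationFactor) {
    return assertDoesNotThrow(() -> {
        try (Admin admin = createAdminClient()) {
            // Returning the Uuid from createTopics avoids the describeTopics round trip below.
            return admin.createTopics(Collections.singleton(new NewTopic(topicName, numPartitions, (short) replicationFactor)))
                .topicId(topicName)
                .get();
        }
    }, "Failed to create topic");
}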


try (Admin admin = createAdminClient()) {
TopicPartition tpMulti = new TopicPartition(topicName, 0);

// get topic id
Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();

// produce some messages
try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
tpMulti.topic(),
tpMulti.partition(),
null,
"key".getBytes(),
"value".getBytes()
);
IntStream.range(0, 10).forEach(__ -> producer.send(record));
producer.flush();
}

// consume messages
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
shareConsumer.subscribe(List.of(topicName));
alterShareAutoOffsetReset(groupId, "earliest");
Review comment (Member): Experience with other tests tells us that it's best to set the auto-offset reset earlier in the test. It's an asynchronous action that may or may not take effect by the time consumption begins.

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(10, records.count());
}

// find the current share coordinator: the leader of the __share_group_state partition
// that this share-partition key hashes onto (3 partitions configured above)
SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
Review comment (Member): I'd prefer admin.describeTopics(...).allTopicNames().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME). I think that yields the same result more tersely. (A sketch follows the assertion below.)

.partitions().stream()
.filter(info -> info.partition() == shareGroupStateTp)
.map(info -> info.leader().id())
.toList();

assertEquals(1, curShareCoordNodeId.size());
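The terser form the reviewer prefers, as a sketch (the variable name is illustrative; it assumes the partitions() list is indexed by partition id, as describeTopics returns it):

int coordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
    .allTopicNames().get()
    .get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
    .partitions().get(shareGroupStateTp)
    .leader().id();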

// shutdown the coordinator
cluster.shutdownBroker(curShareCoordNodeId.get(0));

// give some breathing time
Review comment (Member): Could you refactor this to wait for the condition rather than sleeping for 2 seconds and expecting immediate compliance? (A sketch follows the assertions below.)

TimeUnit.SECONDS.sleep(2L);

List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
.partitions().stream()
.filter(info -> info.partition() == shareGroupStateTp)
.map(info -> info.leader().id())
.toList();

assertEquals(1, newShareCoordNodeId.size());
assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
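A sketch of the wait-for-condition refactor requested above, replacing the 2-second sleep (TestUtils.waitForCondition is already used elsewhere in this file; Node is org.apache.kafka.common.Node):

TestUtils.waitForCondition(() -> {
    Node leader = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
        .allTopicNames().get()
        .get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
        .partitions().get(shareGroupStateTp)
        .leader();
    // wait until a new leader is elected on a different broker
    return leader != null && leader.id() != curShareCoordNodeId.get(0);
}, 30_000L, () -> "share coordinator did not move off the shut-down broker");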

// produce again to the same topic partition
try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
tpMulti.topic(),
tpMulti.partition(),
null,
"key".getBytes(),
"value".getBytes()
);
IntStream.range(0, 10).forEach(__ -> producer.send(record));
producer.flush();
}

// Consuming all 20 messages should only be possible if the partition leader and the share
// coordinator have moved off the shut-down broker, since we only produce to partition 0 of the topic.
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
shareConsumer.subscribe(List.of(topicName));
alterShareAutoOffsetReset(groupId, "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(20, records.count());
}

verifyShareGroupStateTopicRecordsProduced();
}
}

@ClusterTest(
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
Review comment (Member): Same comments as the previous test regarding the min.isr and replication factor defaults, which are appropriate without overrides.

@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
}
)
public void testComplexShareConsumer() throws Exception {
setup();
String topicName = "multipart";
String groupId = "multipartGrp";
createTopic(topicName, 3, 3);
TopicPartition multiTp = new TopicPartition(topicName, 0);

ExecutorService executor = Executors.newCachedThreadPool();

AtomicBoolean prodDone = new AtomicBoolean(false);
AtomicInteger sentCount = new AtomicInteger(0);

// produce messages until signalled to stop
executor.execute(() -> {
while (!prodDone.get()) {
Review comment (Member): This creates a separate producer per loop iteration. I suggest re-using the producer across loop iterations; you can still use try-with-resources. (A sketch follows this block.)

try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
sentCount.incrementAndGet();
}
}
});
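The reviewer's reuse suggestion, sketched (one producer for the loop's lifetime, still closed by try-with-resources; the per-send flush moves outside the loop):

executor.execute(() -> {
    try (Producer<byte[], byte[]> producer = createProducer()) {
        while (!prodDone.get()) {
            producer.send(new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes()));
            sentCount.incrementAndGet();
        }
        producer.flush();
    }
});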

// init a complex share consumer
ComplexShareConsumer<byte[], byte[]> complexCons1 = new ComplexShareConsumer<>(
cluster.bootstrapServers(),
topicName,
groupId,
Map.of()
);

executor.execute(complexCons1);

// give the consumer time to read, then signal the producer to stop
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10L);
prodDone.set(true);
} catch (InterruptedException e) {
// ignore
}
});

// all readable messages are eventually read; RELEASEd records are redelivered, so the read count exceeds the sent count
TestUtils.waitForCondition(complexCons1::isDone, 30_000L, () -> "did not close!");
assertTrue(sentCount.get() < complexCons1.recordsRead());

executor.shutdownNow();

verifyShareGroupStateTopicRecordsProduced();
}

private int produceMessages(int messageCount) {
try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@@ -1905,9 +2090,13 @@ private <K, V> List<ConsumerRecord<K, V>> consumeRecords(ShareConsumer<K, V> con
}

private void createTopic(String topicName) {
createTopic(topicName, 1, 1);
}

private void createTopic(String topicName, int numPartitions, int replicationFactor) {
assertDoesNotThrow(() -> {
try (Admin admin = createAdminClient()) {
admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1))).all().get();
admin.createTopics(Collections.singleton(new NewTopic(topicName, numPartitions, (short) replicationFactor))).all().get();
}
}, "Failed to create topic");
}
@@ -2015,4 +2204,88 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
.get(60, TimeUnit.SECONDS), "Failed to alter configs");
}
}

private static class ComplexShareConsumer<K, V> implements Runnable {
Review comment (Member): This really needs a comment about what it's up to. (A possible doc comment follows.)
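One possible doc comment, sketched from what the class does (in practice it would sit above the class declaration):

/**
 * A Runnable share consumer that polls the given topic until a poll returns no
 * records, acknowledging each record with REJECT, ACCEPT or RELEASE based on
 * its offset and delivery count, so that the acknowledgement and redelivery
 * paths are all exercised.
 */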

public static final int POLL_TIMEOUT_MS = 15000;
public static final int MAX_DELIVERY_COUNT = 5;
Review comment (Member): This 5 is really ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT.
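As the reviewer notes, the constant could reference the server default rather than a bare 5 (a sketch; the ShareGroupConfig import is assumed):

public static final int MAX_DELIVERY_COUNT = ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT;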


private final String topicName;
private final Map<String, Object> configs = new HashMap<>();
private final AtomicBoolean isDone = new AtomicBoolean(false);
private final AtomicBoolean shouldLoop = new AtomicBoolean(true);
private final AtomicInteger readCount = new AtomicInteger(0);
private final Predicate<ConsumerRecords<K, V>> exitCriteria;
private final BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> processFunc;

ComplexShareConsumer(
String bootstrapServers,
String topicName,
String groupId,
Map<String, Object> additionalProperties
) {
this(
bootstrapServers,
topicName,
groupId,
additionalProperties,
records -> records.count() == 0,
(consumer, record) -> {
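// Derive a per-record target delivery count from the offset, in the range
// [0, MAX_DELIVERY_COUNT + 1]: a target of 0 means REJECT on first delivery;
// otherwise the record is RELEASEd (redelivered) until its delivery count
// reaches the target, then ACCEPTed. Targets above the broker's delivery
// limit simply exhaust their redeliveries.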
short deliveryCountBeforeAccept = (short) ((record.offset() + record.offset() / (MAX_DELIVERY_COUNT + 2)) % (MAX_DELIVERY_COUNT + 2));
if (deliveryCountBeforeAccept == 0) {
consumer.acknowledge(record, AcknowledgeType.REJECT);
} else if (record.deliveryCount().get() == deliveryCountBeforeAccept) {
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} else {
consumer.acknowledge(record, AcknowledgeType.RELEASE);
}
}
);
}

ComplexShareConsumer(
String bootstrapServers,
String topicName,
String groupId,
Map<String, Object> additionalProperties,
Predicate<ConsumerRecords<K, V>> exitCriteria,
BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> processFunc
) {
this.exitCriteria = Objects.requireNonNull(exitCriteria);
this.processFunc = Objects.requireNonNull(processFunc);
this.topicName = topicName;
this.configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
this.configs.putAll(additionalProperties);
this.configs.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.configs.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
}

void stop() {
shouldLoop.set(false);
}

@Override
public void run() {
try (ShareConsumer<K, V> consumer = new KafkaShareConsumer<>(configs)) {
consumer.subscribe(Set.of(this.topicName));
while (shouldLoop.get()) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
readCount.addAndGet(records.count());
if (exitCriteria.test(records)) {
break;
}
records.forEach(record -> processFunc.accept(consumer, record));
}
}
isDone.set(true);
}

boolean isDone() {
return isDone.get();
}

int recordsRead() {
return readCount.get();
}
}
}