Skip to content

Commit

Permalink
KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer (#…
Browse files Browse the repository at this point in the history
…18532)

Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>
  • Loading branch information
lianetm authored Jan 22, 2025
1 parent f4d9039 commit 410065a
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

Expand Down Expand Up @@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",

// Tests will run for CONSUMER and CLASSIC group protocol, so set the group max size property
// required for each.
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,

ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
Expand Down Expand Up @@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10)

/*
Expand Down Expand Up @@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5)

def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
Expand Down Expand Up @@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = {
val numRecords = 1000
val newtopic = "newtopic"
Expand Down Expand Up @@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {

checkCloseGoodPath(numRecords, "group1")
checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
checkCloseWithClusterFailure(numRecords, "group4", "group5")
checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)
}

/**
Expand Down Expand Up @@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* there is no coordinator, but close should timeout and return. If close is invoked with a very
* large timeout, close should timeout after request timeout.
*/
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = {
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String,
groupProtocol: String): Unit = {
val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords)

val requestTimeout = 6000
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords)

Expand All @@ -319,17 +327,20 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
* Then, 1 consumer should be left out of the group.
*/
@Test
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421)
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum: String, groupProtocol: String): Unit = {
val group = "group-max-size-test"
val topic = "group-max-size-test"
val maxGroupSize = 2
val consumerCount = maxGroupSize + 1
val partitionCount = consumerCount * 2

this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)

Expand Down Expand Up @@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = {
val group = "fatal-exception-test"
val topic = "fatal-exception-test"
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
Expand Down Expand Up @@ -401,11 +414,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
// TODO: enable for all protocols after fix for not generating/blocking on unneeded
// FindCoordinator on close for the new consumer
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
val topic = "closetest"
createTopic(topic, 10, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
checkCloseDuringRebalance("group1", topic, executor, brokersAvailableDuringClose = true)
}
Expand Down

0 comments on commit 410065a

Please sign in to comment.