Skip to content

Commit

Permalink
KAFKA-18310: Flaky AbstractCoordinatorTest
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Jan 23, 2025
1 parent 94a1bfb commit 2bfce44
Showing 1 changed file with 69 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -1435,7 +1434,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // "KAFKA-18310"
@Test
public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
setupCoordinator();
Expand All @@ -1456,14 +1454,34 @@ public boolean matches(AbstractRequest body) {
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()");
// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

// the join group completes in this poll()
consumerClient.poll(mockTime.timer(0));
// the join group completes in this ensureActiveGroup() or it's already completed if exceptionCaught is false
coordinator.ensureActiveGroup();

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand All @@ -1472,7 +1490,6 @@ public boolean matches(AbstractRequest body) {
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // "KAFKA-18310"
@Test
public void testWakeupAfterSyncGroupReceived() throws Exception {
setupCoordinator();
Expand All @@ -1488,15 +1505,32 @@ public void testWakeupAfterSyncGroupReceived() throws Exception {
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

coordinator.ensureActiveGroup();

Expand All @@ -1506,7 +1540,6 @@ public void testWakeupAfterSyncGroupReceived() throws Exception {
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // KAFKA-15474 and KAFKA-18310
@Test
public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
setupCoordinator();
Expand All @@ -1522,11 +1555,32 @@ public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exceptio
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()");
// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

coordinator.ensureActiveGroup();

Expand Down

0 comments on commit 2bfce44

Please sign in to comment.