Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18310: Flaky AbstractCoordinatorTest #18665

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -1435,7 +1434,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // "KAFKA-18310"
@Test
public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
setupCoordinator();
Expand All @@ -1456,14 +1454,34 @@ public boolean matches(AbstractRequest body) {
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()");
// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

// the join group completes in this poll()
consumerClient.poll(mockTime.timer(0));
// the join group completes in this ensureActiveGroup() or it's already completed if exceptionCaught is false
coordinator.ensureActiveGroup();

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand All @@ -1472,7 +1490,6 @@ public boolean matches(AbstractRequest body) {
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // "KAFKA-18310"
@Test
public void testWakeupAfterSyncGroupReceived() throws Exception {
setupCoordinator();
Expand All @@ -1488,15 +1505,32 @@ public void testWakeupAfterSyncGroupReceived() throws Exception {
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

coordinator.ensureActiveGroup();

Expand All @@ -1506,7 +1540,6 @@ public void testWakeupAfterSyncGroupReceived() throws Exception {
awaitFirstHeartbeat(heartbeatReceived);
}

@Tag("flaky") // KAFKA-15474 and KAFKA-18310
@Test
public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
setupCoordinator();
Expand All @@ -1522,11 +1555,32 @@ public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exceptio
}, syncGroupResponse(Errors.NONE));
AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();

assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()");
// If joinFuture finishes too fast, coordinator#ensureActiveGroup doesn't throw WakeupException,
// because there is no next ConsumerNetworkClient#poll to trigger RequestMatcher from SyncGroupRequest.
boolean exceptionCaught = false;
try {
coordinator.ensureActiveGroup();
} catch (WakeupException ignored) {
exceptionCaught = true;
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
if (exceptionCaught) {
assertEquals(0, coordinator.onJoinCompleteInvokes);
assertFalse(heartbeatReceived.get());
} else {
// Although the first ensureActiveGroup() doesn't throw WakeupException,
// heartbeat thread in the background triggers KafkaClient#pollNoWakeup to complete SyncGroupRequest.
// The ConsumerNetworkClient#wakeup is true, so there should have WakeupException from KafkaClient#poll.
TestUtils.waitForCondition(() -> {
try {
consumerClient.poll(mockTime.timer(0));
return false;
} catch (WakeupException e) {
return true;
}
}, "Should have WakeupException from poll");
}

coordinator.ensureActiveGroup();

Expand Down
Loading