Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Cherry pick Streams's State directory lock back off retry to 3.8 #18660

Open
wants to merge 1 commit into
base: 3.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,6 @@ synchronized boolean lock(final TaskId taskId) {
throw new IllegalStateException("The state directory has been deleted");
} else {
lockedTasksToOwner.put(taskId, Thread.currentThread());
// make sure the task directory actually exists, and create it if not
getOrCreateDirectoryForTask(taskId);
return true;
}
}
Expand Down Expand Up @@ -679,5 +677,4 @@ public int hashCode() {
return Objects.hash(file, namedTopology);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
Expand Down Expand Up @@ -103,6 +104,8 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();

private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();

private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
Expand Down Expand Up @@ -1006,14 +1009,22 @@ private void addTasksToStateUpdater() {
}

private void addTaskToStateUpdater(final Task task) {
final long nowMs = time.milliseconds();
try {
task.initializeIfNeeded();
stateUpdater.add(task);
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
tasks.addPendingTasksToInit(Collections.singleton(task));
}
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
}
}

Expand Down Expand Up @@ -1769,7 +1780,6 @@ private Stream<Task> standbyTaskStream() {
return standbyTasksInTaskRegistry;
}
}

// For testing only.
int commitAll() {
return commit(tasks.allTasks());
Expand Down Expand Up @@ -2116,4 +2126,37 @@ boolean needsInitializationOrRestoration() {
void addTask(final Task task) {
tasks.addTask(task);
}

private boolean canTryInitializeTask(final TaskId taskId, final long nowMs) {
return !taskIdToBackoffRecord.containsKey(taskId) || taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
}

private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
if (taskIdToBackoffRecord.containsKey(taskId)) {
taskIdToBackoffRecord.get(taskId).recordAttempt(nowMs);
} else {
taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
}
}

public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1000, 2, 10000, 0.5);


public BackoffRecord(final long nowMs) {
this.attempts = 1;
this.lastAttemptMs = nowMs;
}

public void recordAttempt(final long nowMs) {
this.attempts++;
this.lastAttemptMs = nowMs;
}

public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ public void shouldRecordCommitLatency() {
topologyMetadata.buildAndRewriteTopology();

final TaskManager taskManager = new TaskManager(
null,
new MockTime(),
changelogReader,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,54 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}

@Test
public void shouldRetryInitializationWithBackoffWhenInitializationFails() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized due to LockException, task01 should be initialized
verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(tasks).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
verify(stateUpdater, never()).add(task00);
verify(stateUpdater).add(task01);

time.sleep(500);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized since the backoff period has not passed
verify(task00, times(1)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater, never()).add(task00);

time.sleep(5000);

// task00 should call initialize since the backoff period has passed
doNothing().when(task00).initializeIfNeeded();
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

verify(task00, times(2)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater).add(task00);
}

@Test
public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
Expand Down
Loading