Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18618: Improve leader change handling of acknowledgements #18672

Draft
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

/**
 * Pairs a set of {@link Acknowledgements} with the id of the node to use for acknowledging them.
 *
 * <p>Instances are simple holders: both values are supplied at construction and are exposed
 * unchanged through the accessors.
 */
public class NodeAcknowledgements {
    private final int nodeId;
    private final Acknowledgements acknowledgements;

    /**
     * Creates a holder for the given node id and acknowledgements.
     *
     * @param nodeId           the id of the node to use for acknowledging
     * @param acknowledgements the acknowledgements associated with that node
     */
    public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
        this.acknowledgements = acknowledgements;
        this.nodeId = nodeId;
    }

    /**
     * @return the node id supplied at construction
     */
    public int getNodeId() {
        return this.nodeId;
    }

    /**
     * @return the acknowledgements supplied at construction (the same reference, not a copy)
     */
    public Acknowledgements getAcknowledgements() {
        return this.acknowledgements;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* to keep track of aborted transactions or the need to keep track of fetch position.
*/
public class ShareCompletedFetch {

final int nodeId;
final TopicIdPartition partition;
final ShareFetchResponseData.PartitionData partitionData;
final short requestVersion;
Expand All @@ -79,12 +79,14 @@ public class ShareCompletedFetch {

ShareCompletedFetch(final LogContext logContext,
final BufferSupplier decompressionBufferSupplier,
final int nodeId,
final TopicIdPartition partition,
final ShareFetchResponseData.PartitionData partitionData,
final ShareFetchMetricsAggregator metricAggregator,
final short requestVersion) {
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.nodeId = nodeId;
this.partition = partition;
this.partitionData = partitionData;
this.metricAggregator = metricAggregator;
Expand Down Expand Up @@ -156,7 +158,7 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
final int maxRecords,
final boolean checkCrcs) {
// Creating an empty ShareInFlightBatch
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(partition);
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition);

if (cachedBatchException != null) {
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
private ShareFetch<K, V> pollForFetches(final Timer timer) {
long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());

Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = currentFetch.takeAcknowledgedRecords();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = currentFetch.takeAcknowledgedRecords();

// If data is available already, return it immediately
final ShareFetch<K, V> fetch = collect(acknowledgementsMap);
Expand All @@ -636,7 +636,7 @@ private ShareFetch<K, V> pollForFetches(final Timer timer) {
return collect(Collections.emptyMap());
}

private ShareFetch<K, V> collect(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
Expand Down Expand Up @@ -709,7 +709,7 @@ public Map<TopicIdPartition, Optional<KafkaException>> commitSync(final Duration
acknowledgeBatchIfImplicitAcknowledgement(false);

Timer requestTimer = time.timer(timeout.toMillis());
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = acknowledgementsToSend();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (acknowledgementsMap.isEmpty()) {
return Collections.emptyMap();
} else {
Expand Down Expand Up @@ -757,7 +757,7 @@ public void commitAsync() {
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement(false);

Map<TopicIdPartition, Acknowledgements> acknowledgementsMap = acknowledgementsToSend();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = acknowledgementsToSend();
if (!acknowledgementsMap.isEmpty()) {
ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap);
applicationEventHandler.add(event);
Expand Down Expand Up @@ -1045,7 +1045,7 @@ private void acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) {
/**
* Returns any ready acknowledgements to be sent to the cluster.
*/
private Map<TopicIdPartition, Acknowledgements> acknowledgementsToSend() {
private Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsToSend() {
return currentFetch.takeAcknowledgedRecords();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,15 @@ public void acknowledgeAll(final AcknowledgeType type) {
* to send. If some records were not acknowledged, the in-flight records will not be empty after this
* method.
*
* @return The map of acknowledgements to send
* @return The map of acknowledgements to send, along with node information
*/
public Map<TopicIdPartition, Acknowledgements> takeAcknowledgedRecords() {
Map<TopicIdPartition, Acknowledgements> acknowledgementMap = new LinkedHashMap<>();
public Map<TopicIdPartition, NodeAcknowledgements> takeAcknowledgedRecords() {
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementMap = new LinkedHashMap<>();
batches.forEach((tip, batch) -> {
int nodeId = batch.nodeId();
Acknowledgements acknowledgements = batch.takeAcknowledgedRecords();
if (!acknowledgements.isEmpty())
acknowledgementMap.put(tip, acknowledgements);
acknowledgementMap.put(tip, new NodeAcknowledgements(nodeId, acknowledgements));
});
return acknowledgementMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
import java.util.TreeSet;

public class ShareInFlightBatch<K, V> {
private final int nodeId;
final TopicIdPartition partition;
private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
private final Set<Long> acknowledgedRecords;
private Acknowledgements acknowledgements;
private KafkaException exception;
private boolean hasCachedException = false;

public ShareInFlightBatch(TopicIdPartition partition) {
public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
this.nodeId = nodeId;
this.partition = partition;
inFlightRecords = new TreeMap<>();
acknowledgedRecords = new TreeSet<>();
Expand Down Expand Up @@ -87,6 +89,10 @@ int numRecords() {
return inFlightRecords.size();
}

/**
 * Returns the node id held by this batch.
 *
 * @return the value of the {@code nodeId} field
 */
int nodeId() {
    return this.nodeId;
}

Acknowledgements takeAcknowledgedRecords() {
// Usually, all records will be acknowledged, so we can just clear the in-flight records leaving
// an empty batch, which will trigger more fetching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public class ShareSessionHandler {
*/
private final LinkedHashMap<TopicPartition, TopicIdPartition> sessionPartitions;

/*
/**
* The partitions to be included in the next ShareFetch request.
*/
private LinkedHashMap<TopicPartition, TopicIdPartition> nextPartitions;

/*
/**
* The acknowledgements to be included in the next ShareFetch/ShareAcknowledge request.
*/
private LinkedHashMap<TopicIdPartition, Acknowledgements> nextAcknowledgements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;

import java.util.Map;

public class ShareAcknowledgeAsyncEvent extends ApplicationEvent {

private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;

public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public ShareAcknowledgeAsyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
super(Type.SHARE_ACKNOWLEDGE_ASYNC);
this.acknowledgementsMap = acknowledgementsMap;
}

public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;

import java.util.Map;

public class ShareAcknowledgeOnCloseEvent extends CompletableApplicationEvent<Void> {

private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;

public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs);
this.acknowledgementsMap = acknowledgementsMap;
}

public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;

import java.util.Map;

public class ShareAcknowledgeSyncEvent extends CompletableApplicationEvent<Map<TopicIdPartition, Acknowledgements>> {

private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;

public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_SYNC, deadlineMs);
this.acknowledgementsMap = acknowledgementsMap;
}

public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.common.TopicIdPartition;

import java.util.Map;

public class ShareFetchEvent extends ApplicationEvent {

private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;

public ShareFetchEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
super(Type.SHARE_FETCH);
this.acknowledgementsMap = acknowledgementsMap;
}

public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.Partit
return new ShareCompletedFetch(
logContext,
BufferSupplier.create(),
0,
TIP,
partitionData,
shareFetchMetricsAggregator,
Expand Down
Loading
Loading