Skip to content

Commit

Permalink
KAFKA-16717 support alter share group offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and JimmyWang6 committed Jan 23, 2025
1 parent 239708f commit f657a44
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ public KafkaFuture<Void> all() {
return result;
}

}
}
27 changes: 27 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,33 @@ RemoveRaftVoterResult removeRaftVoter(
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
DescribeShareGroupsOptions options);

/**
* Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This operation is not transactional, so it may succeed for some partitions while fail for others.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterShareGroupOffsetsResult.
*/
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);


/**
* Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition.
* @return The AlterShareGroupOffsetsResult.
*/
default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
}

/**
* Describe some share groups in the cluster, with the default options.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* Options for the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;

import java.util.Map;

/**
* The result of the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {

private final KafkaFuture<Map<TopicPartition, Errors>> future;

AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
this.future = future;
}

/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
return null;
}

/**
* Return a future which succeeds if all the alter offsets succeed.
*/
public KafkaFuture<Void> all() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds
return delegate.describeShareGroups(groupIds, options);
}

@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
return delegate.alterShareGroupOffsets(groupId, offsets, options);
}

@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
return delegate.listShareGroupOffsets(groupSpecs, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3796,6 +3796,12 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
// TODO support alter share group offsets
throw new InvalidRequestException("The method is not yet implemented");
}

// To do in a follow-up PR
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public enum ApiKeys {
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS);
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS);


private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

import java.nio.ByteBuffer;

public class AlterShareGroupOffsetsRequest extends AbstractRequest {

private final AlterShareGroupOffsetsRequestData data;

public AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short apiVersion) {
super(ApiKeys.ALTER_PARTITION, apiVersion);
this.data = data;
}

@Override
public AlterShareGroupOffsetsRequestData data() {
return data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return null;
}

public static AlterShareGroupOffsetsRequest parse(ByteBuffer buffer, short version) {
return new AlterShareGroupOffsetsRequest(new AlterShareGroupOffsetsRequestData(new ByteBufferAccessor(buffer), version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 91,
"type": "request",
"listeners": ["broker"],
"name": "AlterShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
"about": "The topics to alter offsets for.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
"about": "Each partition to alter offsets for.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
]}
]}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

{
"apiKey": 91,
"type": "response",
"name": "AlterShareGroupOffsetsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - GROUP_NOT_EMPTY (version 0+)
// - KAFKA_STORAGE_ERROR (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,16 @@ public synchronized DescribeShareGroupsResult describeShareGroups(Collection<Str
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public synchronized ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
Expand Down Expand Up @@ -3190,6 +3191,11 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}

def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
val handleAlterGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest]
// TODO: Implement the AlterShareGroupOffsetsRequest handling
}

// Visible for Testing
def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: ShareAcknowledgeRequest,
topicIdNames: util.Map[Uuid, String],
Expand Down

0 comments on commit f657a44

Please sign in to comment.