[KAFKA-16720] AdminClient Support for ListShareGroupOffsets (2/2) #18671
+942
−102
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
KAFKA-16720 aims at adding the support for the ListShareGroupOffsets AdminClient functionality. This PR is a followup on #18571. The previous PR had added the handling for
ReadShareGroupStateSummaryRequest
. This one continues to make changes for handling theDescribeShareGroupOffsetsRequest
The Flow diagram as per the KIP:
Key Changes in the PR:
listShareGroupOffsets()
inKafkaAdminClient
and adding the corresponding tests toKafkaAdminClientTest
.DescribeShareGroupOffsetsHandler
class. Here is where the request is created and it also houses the handling of the same and conversions needed. One thing to note here is that, I have specified thebuildBatchedRequest()
method in such a way that the request is for one singlegroupId
though it specifies a Set. I think we need to update the KIP to ensure thatlistShareGroupOffsets()
takes a(String, ListShareGroupOffsetsSpec)
instead of a Map given that theDescribeShareGroupOffsetsRequest
can currently handle only a single groupId request. Here it diverges from the waylistConsumerGroupOffsets
is handled as theOffsetFetchRequest
supports fetching offsets for multiple groups together (from v8)KafkaApis
forhandleDescribeShareGroupOffsetsRequest
. One thing to be noted here is that theDescribeShareGroupOffsetsRequest
uses the topic Name while theReadShareGroupStateSummaryRequest
needs the topic Uuid. Hence the conversion takes place inKafkaApis
itself before relaying the same to the GroupCoordinator.GroupCoordinator
now hasdescribeShareGroupOffsets
method where it handles theDesribeShareGroupOffsets
request. Since here the call was to be made via the persister, code has been added for the same.GroupCoordinatorService
now additionally has the Persister. This is passed from theBrokerServer
which passes theNoOpPersister
in case KIP-932 isn't enabled on the clusters orDefaultStatePersister
in case Share Groups are enabled.ShareGroupCommand
(and the tests) to make a call to thelistShareGroupOffsets
method to get the start offset.