KAFKA-18225: ClientQuotaCallback#updateClusterMetadata is unsupported by kraft #18196

Status: Open. Wants to merge 41 commits into base: trunk.

Commits (41):
5fe6d1a  wip (m1a2st, Dec 15, 2024)
b3edea5  add clusterId to DynamicClientQuotaPublisher (m1a2st, Dec 15, 2024)
15d7dbb  Draft a version (m1a2st, Dec 15, 2024)
4123c70  revert test (m1a2st, Dec 16, 2024)
765d7f1  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 16, 2024)
513b381  add new test (m1a2st, Dec 16, 2024)
74b1006  update the test (m1a2st, Dec 17, 2024)
d1cfe97  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 18, 2024)
a4758e7  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Dec 31, 2024)
dee9a65  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 8, 2025)
cd853b6  resolve conflict (m1a2st, Jan 8, 2025)
48a8b41  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 9, 2025)
6bf41a7  addressed by comments (m1a2st, Jan 9, 2025)
3fe1d7c  addressed by comments (m1a2st, Jan 11, 2025)
ed50ea8  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 11, 2025)
f87db91  fix var name (m1a2st, Jan 12, 2025)
57fad78  update delete topic (m1a2st, Jan 13, 2025)
f468e55  addressed by comment (m1a2st, Jan 13, 2025)
0c136b7  addressed by comment (m1a2st, Jan 13, 2025)
ac9f55b  addressed by comment (m1a2st, Jan 13, 2025)
909d8f9  addressed by comment (m1a2st, Jan 13, 2025)
5e88ee1  update when updateClusterMetadata success (m1a2st, Jan 13, 2025)
b3365f0  addressed by comment (m1a2st, Jan 15, 2025)
8e36e2f  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 16, 2025)
977a603  update the doc (m1a2st, Jan 16, 2025)
de16d0e  addressed the comment (m1a2st, Jan 16, 2025)
9f04283  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 18, 2025)
830c99c  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 19, 2025)
3a3e409  fix the test (m1a2st, Jan 19, 2025)
e48ca88  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 20, 2025)
6f679f1  fix the conflict (m1a2st, Jan 20, 2025)
be432e6  separator logic (m1a2st, Jan 20, 2025)
af4cee5  add license (m1a2st, Jan 20, 2025)
39f542e  revert unused change (m1a2st, Jan 20, 2025)
0bab1c7  revert unused change (m1a2st, Jan 20, 2025)
1ea0067  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 21, 2025)
929e87a  wip (m1a2st, Jan 24, 2025)
101cbdc  addressed by comments (m1a2st, Jan 24, 2025)
9c28db2  add new test (m1a2st, Jan 24, 2025)
9514964  Merge branch 'trunk' into KAFKA-18225 (m1a2st, Jan 26, 2025)
498ab89  resolve the conflict (m1a2st, Jan 26, 2025)
core/src/main/scala/kafka/server/BrokerServer.scala (1 change: 1 addition & 0 deletions)

@@ -509,6 +509,7 @@ class BrokerServer(
dynamicConfigHandlers.toMap,
"broker"),
new DynamicClientQuotaPublisher(
clusterId,
config,
sharedServer.metadataPublishingFaultHandler,
"broker",

core/src/main/scala/kafka/server/ControllerServer.scala (1 change: 1 addition & 0 deletions)

@@ -321,6 +321,7 @@ class ControllerServer(
// Set up the client quotas publisher. This will enable controller mutation quotas and any
// other quotas which are applicable.
metadataPublishers.add(new DynamicClientQuotaPublisher(
clusterId,
config,
sharedServer.metadataPublishingFaultHandler,
"controller",

@@ -21,10 +21,13 @@ import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.quota.ClientQuotaCallback


class DynamicClientQuotaPublisher(
clusterId: String,
conf: KafkaConfig,
faultHandler: FaultHandler,
nodeType: String,
@@ -48,6 +51,11 @@ class DynamicClientQuotaPublisher(
): Unit = {
val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
try {
val clientQuotaCallback = conf.getConfiguredInstance(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[ClientQuotaCallback])

Review comment (Member):
We should use the callback in quotaManagers rather than creating a new one! They are different instances, and so this approach can update the callback used by quotaManagers.
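
A minimal sketch of what that suggestion could look like, assuming the QuotaManagers instance for this broker/controller is reachable from the publisher and exposes its configured callback (the `clientQuotaCallback` accessor below is an assumption, not a confirmed API):

```scala
// Sketch only: reuse the callback owned by quotaManagers instead of building a second
// instance via getConfiguredInstance, so updateClusterMetadata affects the callback that
// actually enforces quotas. The quotaManagers accessor is assumed for illustration.
override def onMetadataUpdate(
  delta: MetadataDelta,
  newImage: MetadataImage,
  manifest: LoaderManifest
): Unit = {
  val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
  try {
    quotaManagers.clientQuotaCallback.foreach { callback =>
      // Rebuild the Cluster view from the new image and push it to the shared callback.
      callback.updateClusterMetadata(KRaftMetadataCache.toCluster(clusterId, newImage))
    }
    Option(delta.clientQuotasDelta()).foreach(clientQuotaMetadataManager.update)
  } catch {
    case t: Throwable => faultHandler.handleFault(
      s"Uncaught exception while publishing dynamic client quota changes from $deltaName", t)
  }
}
```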

if (clientQuotaCallback != null) {
chia7712 marked this conversation as resolved.
val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
clientQuotaCallback.updateClusterMetadata(cluster)

Review comment (Member):
I don't think this line is required.

}
Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
clientQuotaMetadataManager.update(clientQuotasDelta)
}

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala (134 changes: 97 additions & 37 deletions)

@@ -17,6 +17,7 @@

package kafka.server.metadata

import kafka.server.metadata.KRaftMetadataCache.{getOfflineReplicas, getRandomAliveBroker}
import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache}
import kafka.utils.Logging
import org.apache.kafka.admin.BrokerMetadata
@@ -209,28 +210,7 @@ class KRaftMetadataCache(
}
}
}

private def getOfflineReplicas(image: MetadataImage,
partition: PartitionRegistration,
listenerName: ListenerName): util.List[Integer] = {
val offlineReplicas = new util.ArrayList[Integer](0)
for (brokerId <- partition.replicas) {
Option(image.cluster().broker(brokerId)) match {
case None => offlineReplicas.add(brokerId)
case Some(broker) => if (isReplicaOffline(partition, listenerName, broker)) {
offlineReplicas.add(brokerId)
}
}
}
offlineReplicas
}

private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)

private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
!broker.hasOnlineDir(partition.directory(broker.id()))


/**
* Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
* be added dynamically, so a broker with a missing listener could be a transient error.
@@ -364,12 +344,7 @@ class KRaftMetadataCache(
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
}

override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage)

private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => new BrokerMetadata(b.id, b.rack))
}
override def getAliveBrokers(): Iterable[BrokerMetadata] = KRaftMetadataCache.getAliveBrokers(_currentImage)

override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
@@ -463,15 +438,6 @@ class KRaftMetadataCache(
getRandomAliveBroker(_currentImage)
}

private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
if (aliveBrokers.isEmpty) {
None
} else {
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
}
}

def getAliveBrokerEpoch(brokerId: Int): Option[Long] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
map(brokerRegistration => brokerRegistration.epoch())
@@ -565,3 +531,97 @@ class KRaftMetadataCache(
}
}

object KRaftMetadataCache {

Review comment (Member):
#18632 is trying to remove references to KRaftMetadataCache, so maybe we can move these helpers to MetadataCache?
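
A minimal sketch of that relocation, assuming the helper would live on the existing kafka.server.MetadataCache companion object and simply delegate for now (the final destination is still open in this thread):

```scala
import org.apache.kafka.common.Cluster
import org.apache.kafka.image.MetadataImage

// Sketch only: relocate the public entry point without changing behaviour.
object MetadataCache {
  def toCluster(clusterId: String, image: MetadataImage): Cluster =
    kafka.server.metadata.KRaftMetadataCache.toCluster(clusterId, image)
}
```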


def toCluster(clusterId: String, image: MetadataImage): Cluster = {

Review comment (TaiJuWu, Contributor, Dec 15, 2024):
This result is different from the old KRaftMetadataCache#toCluster (the old version filtered partitions based on the listener). If we decide to change this behavior, maybe we need to document it.
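
To make the difference concrete, a rough sketch (not the actual old implementation) contrasting listener-aware node resolution with the listener-agnostic resolution used by this version of toCluster; `BrokerRegistration.nodes()` is the new accessor added in this PR:

```scala
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.image.MetadataImage
import scala.jdk.CollectionConverters._

object ToClusterNodeResolution {
  // Old behaviour (roughly): a broker with no endpoint for the given listener contributes no
  // node, so partitions led by that broker were filtered out of the resulting Cluster.
  def nodesForListener(image: MetadataImage, listenerName: ListenerName): Map[Integer, Node] =
    image.cluster().brokers().values().asScala
      .filterNot(_.fenced())
      .flatMap { broker =>
        val node = broker.node(listenerName.value())
        if (node.isPresent) Some(Integer.valueOf(broker.id()) -> node.get()) else None
      }.toMap

  // This PR: every endpoint of every unfenced broker is included, regardless of listener.
  def allNodes(image: MetadataImage): Map[Integer, java.util.List[Node]] =
    image.cluster().brokers().values().asScala
      .filterNot(_.fenced())
      .map(broker => Integer.valueOf(broker.id()) -> broker.nodes())
      .toMap
}
```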

val brokerToNodes = new util.HashMap[Integer, java.util.List[Node]]
image.cluster().brokers()
.values().stream()
.filter(broker => !broker.fenced())
.forEach { broker => brokerToNodes.put(broker.id(), broker.nodes()) }

def nodes(id: Int): java.util.List[Node] = brokerToNodes.get(id)

val partitionInfos = new util.ArrayList[PartitionInfo]
val internalTopics = new util.HashSet[String]

def toArray(replicas: Array[Int]): Array[Node] = {
util.Arrays.stream(replicas)
.mapToObj(replica => nodes(replica))
.flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))
}


val topicImages = image.topics().topicsByName().values()
if (topicImages != null) {
topicImages.forEach { topic =>
topic.partitions().forEach { (key, value) =>
val partitionId = key
val partition = value
val nodes1 = nodes(partition.leader)
if (nodes1 != null) {
nodes1.forEach(node => {
partitionInfos.add(new PartitionInfo(topic.name(),
partitionId,
node,
toArray(partition.replicas),
toArray(partition.isr),
getOfflineReplicas(image, partition).stream()
.map(replica => nodes(replica))
.flatMap(replica => replica.stream()).toArray(size => new Array[Node](size))))
})
if (Topic.isInternal(topic.name())) {
internalTopics.add(topic.name())
}
}
}
}
}

val controllerNode = nodes(getRandomAliveBroker(image).getOrElse(-1)) match {
case null => Node.noNode()
case nodes => nodes.get(0)
}
// Note: the constructor of Cluster does not allow us to reference unregistered nodes.
// So, for example, if partition foo-0 has replicas [1, 2] but broker 2 is not
// registered, we pass its replicas as [1, -1]. This doesn't make a lot of sense, but
// we are duplicating the behavior of ZkMetadataCache, for now.
new Cluster(clusterId, brokerToNodes.values().stream().flatMap(n => n.stream()).collect(util.stream.Collectors.toList()),
partitionInfos, Collections.emptySet(), internalTopics, controllerNode)
}

private def getOfflineReplicas(image: MetadataImage,
partition: PartitionRegistration,
listenerName: ListenerName = null): util.List[Integer] = {
val offlineReplicas = new util.ArrayList[Integer](0)
for (brokerId <- partition.replicas) {
Option(image.cluster().broker(brokerId)) match {
case None => offlineReplicas.add(brokerId)
case Some(broker) => if (listenerName == null || isReplicaOffline(partition, listenerName, broker)) {
offlineReplicas.add(brokerId)
}
}
}
offlineReplicas
}

private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) =
broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition)

private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean =
!broker.hasOnlineDir(partition.directory(broker.id()))

private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
val aliveBrokers = getAliveBrokers(image).toList
if (aliveBrokers.isEmpty) {
None
} else {
Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
}
}

private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = {
image.cluster().brokers().values().asScala.filterNot(_.fenced()).
map(b => new BrokerMetadata(b.id, b.rack))
}
}

@@ -19,15 +19,16 @@ import kafka.api.GroupedUserQuotaCallback._
import kafka.security.{JaasModule, JaasTestUtils}
import kafka.server._
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.{Cluster, Reconfigurable}
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.quota._
import org.junit.jupiter.api.Assertions._
@@ -64,11 +65,13 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
this.serverConfig.setProperty(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, classOf[GroupedUserQuotaCallback].getName)
this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG}",
classOf[GroupedUserPrincipalBuilder].getName)
this.serverConfig.setProperty(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true")
val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext)
super.setUp(testInfo)

producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
@@ -86,12 +89,20 @@

override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
super.configureSecurityBeforeServersStart(testInfo)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
}

override def addFormatterSettings(formatter: Formatter): Unit = {
formatter.setScramArguments(
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
}

override def createPrivilegedAdminClient() = {
createAdminClient(bootstrapServers(), securityProtocol, trustStoreFile, clientSaslProperties,
kafkaClientSaslMechanism, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testCustomQuotaCallback(quorum: String, groupProtocol: String): Unit = {
// Large quota override, should not throttle
var brokerId = 0
@@ -179,14 +190,12 @@
}

private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
TestUtils.createTopic(zkClient, topic, assignment, servers)
TestUtils.createTopicWithAdmin(createAdminClient(), topic, brokers, controllerServers, numPartitions)

Review comment (Member):
You have to honor the leader so that the partition leader is hosted by the correct broker - otherwise, this test will get flaky in the future.

}
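
One possible way to honor the requested leader, assuming TestUtils.createTopicWithAdmin accepts a replicaAssignment map the way the old zk-based TestUtils.createTopic did (the parameter name is an assumption):

```scala
// Sketch only: pin each partition's single replica, and therefore its leader, to the requested broker.
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
  val assignment = (0 until numPartitions).map(i => i -> Seq(leader)).toMap
  TestUtils.createTopicWithAdmin(createAdminClient(), topic, brokers, controllerServers,
    replicaAssignment = assignment)
}
```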

private def createAdminClient(): Admin = {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.bootstrapServers(servers, new ListenerName("BROKER")))
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) =>
config.put(key.toString, value)
}
@@ -231,11 +240,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, JaasModule.scramLoginModule(user, password).toString)

GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId,
GroupedUser(user, userGroup, topic, brokerServers(leader), producerClientId, consumerClientId,
createProducer(), createConsumer(), adminClient)
}

case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker,
case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: BrokerServer,
producerClientId: String, consumerClientId: String,
override val producer: KafkaProducer[Array[Byte], Array[Byte]],
override val consumer: Consumer[Array[Byte], Array[Byte]],
@@ -316,7 +325,7 @@ object GroupedUserPrincipalBuilder {
}
}

class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder {
class GroupedUserPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
val securityProtocol = context.securityProtocol
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
@@ -396,7 +405,15 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
}

override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
principal match {
val user = principal.getName
val userGroup = group(user)
val newPrincipal = {
if (userGroup.isEmpty)
principal
else
GroupedUserPrincipal(user, userGroup)
}
newPrincipal match {
case groupPrincipal: GroupedUserPrincipal =>
val userGroup = groupPrincipal.userGroup
val quotaLimit = quotaOrDefault(userGroup, quotaType)
@@ -470,4 +487,3 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
}
}



@@ -235,6 +235,13 @@ public Optional<Node> node(String listenerName) {
}
return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced));
}

public List<Node> nodes() {
List<Node> nodes = new ArrayList<>();

Review comment (Member):
We can leverage an existing method, for example:

return listeners.keySet().stream().flatMap(l -> node(l).stream()).toList();

listeners.values()
.forEach(endpoint -> nodes.add(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced)));
return nodes;
}

public Map<String, VersionRange> supportedFeatures() {
return supportedFeatures;