
upgraded_kafka_client_version_to_3.9.0 #279

Open · wants to merge 1 commit into master
7 changes: 2 additions & 5 deletions build.gradle
@@ -57,8 +57,7 @@ ext.versions = [
     cassandra : '3.4.0',
     commons_cli : '1.3.1',
     thrift : '0.9.3',
-    kafka : '0.11.0.2',
-    zkclient : '0.10'
+    kafka : '3.9.0'
 ]
 
 ext.libraries = [
@@ -102,8 +101,6 @@ ext.libraries = [
commons_cli : "commons-cli:commons-cli:${versions.commons_cli}",
thrift : "org.apache.thrift:libthrift:${versions.thrift}",
kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}",
kafka : "org.apache.kafka:kafka_2.12:${versions.kafka}",
zkclient : "com.101tec:zkclient:${versions.zkclient}"
]

ext.tempto_core = project(':tempto-core')
@@ -113,7 +110,7 @@ ext.tempto_runner = project(':tempto-runner')
 ext.tempto_ldap = project(':tempto-ldap')
 ext.tempto_kafka = project(':tempto-kafka')
 ext.expected_result_generator = project(':expected-result-generator')
-ext.tempto_version = '1.53'
+ext.tempto_version = '1.54'
ZacBlanco (Dec 10, 2024):

This should only be upgraded and set to a non-SNAPSHOT version during the release process. It looks like it wasn't properly updated after the last release. Since we already released 1.53, this should be 1.54-SNAPSHOT. During release we will update it to 1.54, and then move to 1.55-SNAPSHOT afterwards.

So could you set this to 1.54-SNAPSHOT for now?

Author:

@ZacBlanco Done. Updated to 1.54-SNAPSHOT.

 ext.tempto_group = "io.prestodb.tempto"
 ext.isReleaseVersion = !tempto_version.endsWith("SNAPSHOT")

1 change: 1 addition & 0 deletions tempto-examples/build.gradle
@@ -41,6 +41,7 @@ jar {

 shadowJar {
     version = ''
+    zip64 = true

Reviewer:

why is this required?

Author:

@ZacBlanco On gradle build, I am getting this error:

    Execution failed for task ':tempto-examples:shadowJar'.
    > shadow.org.apache.tools.zip.Zip64RequiredException: archive contains more than 65535 entries.

Here, the ShadowJar plugin is creating a JAR file that exceeds the limit of 65,535 entries. This might be due to the limitation of the standard ZIP format, which ShadowJar uses by default.

ZacBlanco (Dec 6, 2024):

Got it, thanks for the explanation. This is fine then, I guess. We were probably close to the limit and the newer version pushed us over? It would be good to know the number of entries before and after this change, but I don't think it's worth trying to fix.
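For reference, the before/after entry counts can be checked with a few lines of java.util.zip. A minimal sketch, assuming the shaded jar lives at the usual shadowJar output path (the path below is a placeholder, not confirmed by this PR):

import java.io.IOException;
import java.util.zip.ZipFile;

// Counts the entries in a jar, e.g. to compare before/after the Kafka upgrade.
public class CountJarEntries
{
    public static void main(String[] args)
            throws IOException
    {
        // Placeholder path; point this at the actual shadowJar output.
        String jarPath = args.length > 0 ? args[0] : "tempto-examples/build/libs/tempto-examples-all.jar";
        try (ZipFile zip = new ZipFile(jarPath)) {
            // The classic ZIP format caps this count at 65,535; zip64 = true lifts that cap.
            System.out.println(jarPath + ": " + zip.size() + " entries");
        }
    }
}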

 }
 
 build.dependsOn.add([shadowJar])
3 changes: 1 addition & 2 deletions tempto-kafka/build.gradle
@@ -16,8 +16,7 @@ apply plugin: 'java'

 dependencies {
     compile tempto_core
-    compile libraries.kafka
-    compile libraries.zkclient
+    compile libraries.kafka_clients
 }

 // project information
KafkaTableManager.java
@@ -24,56 +24,54 @@
 import io.prestodb.tempto.internal.fulfillment.table.TableName;
 import io.prestodb.tempto.query.QueryExecutor;
 import io.prestodb.tempto.query.QueryResult;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;

 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
+import static org.slf4j.LoggerFactory.getLogger;


 @TableManager.Descriptor(tableDefinitionClass = KafkaTableDefinition.class, type = "KAFKA")
 @Singleton
 public class KafkaTableManager
         implements TableManager<KafkaTableDefinition>
 {
+    private static final Logger LOGGER = getLogger(KafkaTableManager.class);
     private final String databaseName;
     private final QueryExecutor prestoQueryExecutor;
     private final String brokerHost;
     private final Integer brokerPort;
     private final String prestoKafkaCatalog;
-    private final String zookeeperHost;
-    private final Integer zookeeperPort;

     @Inject
     public KafkaTableManager(
             @Named("databaseName") String databaseName,
             @Named("broker.host") String brokerHost,
             @Named("broker.port") int brokerPort,
-            @Named("zookeeper.host") String zookeeperHost,
-            @Named("zookeeper.port") int zookeeperPort,
             @Named("presto_database_name") String prestoDatabaseName,
             @Named("presto_kafka_catalog") String prestoKafkaCatalog,
             Injector injector)
     {
         this.databaseName = requireNonNull(databaseName, "databaseName is null");
         this.brokerHost = requireNonNull(brokerHost, "brokerHost is null");
         this.brokerPort = brokerPort;
-        this.zookeeperHost = requireNonNull(zookeeperHost, "zookeeperHost is null");
-        this.zookeeperPort = zookeeperPort;
         requireNonNull(injector, "injector is null");
         requireNonNull(prestoDatabaseName, "prestoDatabaseName is null");
         this.prestoQueryExecutor = injector.getInstance(Key.get(QueryExecutor.class, Names.named(prestoDatabaseName)));
@@ -95,6 +93,18 @@ public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition
         return new KafkaTableInstance(createdTableName, tableDefinition);
     }
 
+    private void withKafkaAdminClient(Consumer<AdminClient> routine) {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHost + ":" + brokerPort);
+        try (AdminClient adminClient = AdminClient.create(properties)) {
+            routine.accept(adminClient);
+        } catch (Exception e) {
+            LOGGER.error("An error occurred while performing an operation with Kafka AdminClient on broker "
+                    + brokerHost + ":" + brokerPort);
+            throw new RuntimeException("An error occurred while performing an operation with Kafka AdminClient", e);
+        }
+    }

agrawalreetika (Member, Dec 16, 2024):

What would happen if there is an issue or exception while doing the above? Should we add a catch and a logger for it?

Author:

@agrawalreetika Added a catch to handle the exception and log the error.

 
     private void verifyTableExistsInPresto(String schema, String name)
     {
         String sql = format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", prestoKafkaCatalog, schema, name);
@@ -104,34 +114,36 @@ private void verifyTableExistsInPresto(String schema, String name)
         }
     }
 
-    private void deleteTopic(String topic)
-    {
-        withZookeeper(zkUtils -> {
-            if (AdminUtils.topicExists(zkUtils, topic)) {
-                AdminUtils.deleteTopic(zkUtils, topic);
-
-                for (int checkTry = 0; checkTry < 5; ++checkTry) {
-                    if (!AdminUtils.topicExists(zkUtils, topic)) {
-                        return;
-                    }
-                    try {
-                        Thread.sleep(1_000);
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException("could not delete topic " + topic);
-                    }
-                }
-                throw new RuntimeException("could not delete topic " + topic);
-            }
-        });
-    }
+    private void deleteTopic(String topic) {
+        withKafkaAdminClient(adminClient -> {
+            try {
+                // Check if topic exists
+                if (!adminClient.listTopics().names().get().contains(topic)) {
+                    LOGGER.warn("Topic {} does not exist. Skipping deletion.", topic);
+                    return;
+                }
+                DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topic));
+                deleteTopicsResult.all().get();
+                try {
+                    Thread.sleep(1_000); // Wait for metadata propagation
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for topic deletion: " + topic, e);
+                }
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException("Failed to delete topic " + topic, e);
+            }
+        });
+    }
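The fixed one-second sleep assumes deletion metadata settles quickly. If that ever proves flaky, a polling variant in the spirit of the old ZooKeeper retry loop could replace it. A sketch, not part of this PR; the retry count and delay are arbitrary:

    // Sketch: poll topic metadata instead of sleeping a fixed second after deletion.
    private void awaitTopicDeletion(AdminClient adminClient, String topic)
            throws InterruptedException, ExecutionException
    {
        for (int attempt = 0; attempt < 5; attempt++) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                return; // the broker no longer reports the topic
            }
            Thread.sleep(1_000); // arbitrary delay between checks
        }
        throw new RuntimeException("topic " + topic + " is still listed after deletion");
    }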

-    private void createTopic(String topic, int partitionsCount, int replicationLevel)
-    {
-        withZookeeper(zkUtils -> {
-            Properties topicConfiguration = new Properties();
-            AdminUtils.createTopic(zkUtils, topic, partitionsCount, replicationLevel, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
+    private void createTopic(String topic, int partitionsCount, int replicationLevel) {
+        withKafkaAdminClient(adminClient -> {
+            NewTopic newTopic = new NewTopic(topic, partitionsCount, (short) replicationLevel);
+            try {
+                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException("Failed to create topic " + topic, e);
+            }
         });
     }
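If tests ever need per-topic settings, NewTopic also accepts a config map. A sketch under that assumption; the topic name and retention value are arbitrary examples, not part of this PR:

    // Sketch: create a topic with a per-topic config via NewTopic.configs().
    NewTopic configured = new NewTopic("example-topic", 1, (short) 1)
            .configs(Collections.singletonMap("retention.ms", "3600000")); // 1 hour, arbitrary
    adminClient.createTopics(Collections.singletonList(configured)).all().get();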

@@ -164,25 +176,6 @@ private void insertDataIntoTopic(String topic, KafkaDataSource dataSource)
         }
     }
 
-    private void withZookeeper(Consumer<ZkUtils> routine)
-    {
-        int sessionTimeOutInMs = 15_000;
-        int connectionTimeOutInMs = 10_000;
-        String zookeeperHosts = zookeeperHost + ":" + zookeeperPort;
-
-        ZkClient zkClient = new ZkClient(zookeeperHosts,
-                sessionTimeOutInMs,
-                connectionTimeOutInMs,
-                ZKStringSerializer$.MODULE$);
-        try {
-            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
-            routine.accept(zkUtils);
-        }
-        finally {
-            zkClient.close();
-        }
-    }

     @Override
     public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition tableDefinition, MutableTableRequirement.State state, TableHandle tableHandle)
     {