
initpos LATEST triggers InvalidArgumentException #13

Closed
cizmazia opened this issue Feb 8, 2021 · 13 comments

Comments

@cizmazia

cizmazia commented Feb 8, 2021

Hey, could you please advise me on how to resolve the following issue?

With the configuration flink.stream.initpos=LATEST, AWSUtil converts the position to AT_TIMESTAMP, starting from the current time. However, the Kinesis client created by AWSUtil does not accept the timestamp and throws InvalidArgumentException "The timestampInMillis parameter cannot be greater than the currentTimestampInMillis":

  • timestampInMillis: 1612801872966000 (the 3 extra zeros look suspicious: 1612801872966 × 1000 = 1612801872966000, as if a millisecond value had been multiplied by 1000)
  • currentTimestampInMillis: 1612801873011
software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.model.InvalidArgumentException: The timestampInMillis parameter cannot be greater than the currentTimestampInMillis. timestampInMillis: 1612801872966000, currentTimestampInMillis: 1612801873011 (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: 0ed7be10-6a2b-11eb-a035-a72b181167f6; Proxy: null)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2839)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2806)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2795)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1399)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1370)
	at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:313)
	at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:304)
	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:175)
	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:95)
	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:88)
	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:39)
	at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:501)
	at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:473)
	at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:569)
	at software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:350)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

Versions

  • amazon-kinesis-connector-flink v2.0.0
  • flink-streaming-java_2.12 v1.11.1
  • openjdk 11.0.10
@cizmazia cizmazia changed the title flink.stream.initpos LATEST triggers InvalidArgumentException WIP flink.stream.initpos LATEST triggers InvalidArgumentException Feb 8, 2021
@cizmazia cizmazia changed the title WIP flink.stream.initpos LATEST triggers InvalidArgumentException WIP initpos LATEST triggers InvalidArgumentException Feb 8, 2021
@cizmazia cizmazia changed the title WIP initpos LATEST triggers InvalidArgumentException initpos LATEST triggers InvalidArgumentException Feb 8, 2021
@dannycranmer
Contributor

Hello, the 3 extra zeros do look suspicious indeed. It looks like the request would succeed if they were not there. I have not seen this error before. Please confirm how you are running Flink: is it local/self-hosted or managed (AWS KDA, etc.)?

To consume from LATEST in light of this error, the only thing I can think of is to use AT_TIMESTAMP directly when configuring the stream, for example. However, this might result in the same error; I would be interested to see the result:

import java.text.SimpleDateFormat;
import java.util.Date;

// Format "now" as a timestamp string and start the consumer from it.
String format = "yyyy-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());

consumerProperties.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
consumerProperties.put(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
consumerProperties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");

If you are in a position to compile the connector from source, that is an option for now until we resolve the problem. You can modify the method here to simply return `StartingPosition.restartFromSequenceNumber(sequenceNumber);`
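For illustration, a minimal sketch of that workaround; the enclosing method name and signature are assumptions, only the returned expression comes from the suggestion above:

// Hypothetical shape of the patched connector method; the actual name and
// signature in the connector source may differ. The idea is to skip the
// LATEST -> AT_TIMESTAMP translation and always restart from the stored
// sequence number.
public static StartingPosition getRestartStartingPosition(final SequenceNumber sequenceNumber) {
    return StartingPosition.restartFromSequenceNumber(sequenceNumber);
}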

At the moment I am struggling to reproduce the problem, so more information would be useful.

cizmazia added a commit to cizmazia/amazon-kinesis-data-analytics-java-examples that referenced this issue Feb 9, 2021
awslabs/amazon-kinesis-connector-flink#13

Steps:

docker-compose up -d

export AWS_DEFAULT_REGION=eu-west-1
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
aws kinesis create-stream --endpoint-url=http://localhost:4566 --stream-name ExampleInputStream --shard-count 1
aws kinesis create-stream --endpoint-url=http://localhost:4566 --stream-name ExampleOutputStream --shard-count 1

mvn clean test
@cizmazia
Author

cizmazia commented Feb 9, 2021

Thanks for getting back!

Please take a look here to see how to reproduce the issue:
cizmazia/amazon-kinesis-data-analytics-java-examples#1

cizmazia added a commit to cizmazia/amazon-kinesis-data-analytics-java-examples that referenced this issue Feb 9, 2021
awslabs/amazon-kinesis-connector-flink#13

Steps:

docker-compose up -d

export AWS_DEFAULT_REGION=us-west-2
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
aws kinesis create-stream --endpoint-url=http://localhost:4566 --stream-name ExampleInputStream --shard-count 1

mvn clean test
@dannycranmer
Contributor

Thanks for the self-contained reproduction, very nice. I am fairly certain this is a kinesalite issue; the linked code looks like it will add the three zeros.

The AWS SDK uses a Date for the timestamp, which is being created correctly in the connector.

The issue is not observed when hitting the real Kinesis service.

The kinesalite change was added in a commit indicating it is being fixed.

It looks like kinesalite was broken in v3.3.3, and the Flink e2e tests use v3.3.1, so they are unaffected.

@cizmazia
Author

Thanks for tracking this down. Much appreciated!

@cizmazia
Author

Related mhart/kinesalite#70

@cizmazia
Author

The breaking change was introduced in kinesalite v1.12.0.

@dannycranmer
Contributor

Yeah, you are right. The e2e test uses TRIM_HORIZON, so it does not encounter the problem. Thanks for the heads up.

@davidtag

@dannycranmer I am running into a similar issue on the real Kinesis service when CBOR is disabled, but not when it is enabled (the default).

I've confirmed I get this error either when I set this property in my job:

System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")

or when I set the environment variable AWS_CBOR_DISABLE=1, as they do in the Flink tests you linked above.

I know disabling CBOR is not required for the real Kinesis service, but it is required for Localstack, which uses kinesalite behind the scenes; this impacts the ability to run integration tests or develop locally.

I did a bit of digging, and I think the issue is related to how requests are serialized to JSON. I set up a local proxy server to inspect the messages sent from my Flink job to kinesalite. When calling GetShardIterator (the request header included 'X-Amz-Target': 'Kinesis_20131202.GetShardIterator'), the payload is:

b'{"StreamName":"events-local","ShardId":"shardId-000000000000","ShardIteratorType":"AT_TIMESTAMP","Timestamp":1618016505720}'

Notice that the Timestamp field is in milliseconds. The AWS API Reference says it should be in seconds. So I think kinesalite's multiplying by 1000 is in fact correct, and the issue seems to be the serialization of the timestamp when using the JSON API.
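A minimal sketch of the suspected unit mismatch, using the value from the payload above:

// If the receiver treats a millisecond timestamp as seconds and converts it
// back to millis, the value is effectively multiplied by 1000 -- which
// matches the three extra zeros in the error message.
long sentMillis = 1618016505720L;              // value in the JSON payload (milliseconds)
double expectedSeconds = sentMillis / 1000.0;  // what the JSON API expects: 1618016505.720
long misinterpreted = sentMillis * 1000L;      // millis read as seconds, converted back to millis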

@dannycranmer
Contributor

Hey @davidtag. Thanks for the investigation and information. I will also take a look; since the Flink connector does not serialise the record, I wonder if there is a bug in the AWS SDK. I will keep you posted.

@davidtag

Thanks @dannycranmer. Any insight into where the issue may be?

@dannycranmer
Contributor

I conducted a simple test (SDK v1.11.844 and v1.11.1030) and it does look like an SDK or Kinesis issue. When I disable CBOR I see the following error, which is coming from the Kinesis service:

com.amazonaws.services.kinesis.model.InvalidArgumentException: The timestampInMillis parameter cannot be greater than the currentTimestampInMillis. timestampInMillis: 1622282661795000, currentTimestampInMillis: 1622299742369 (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: d0a3fa2b-2ede-acb7-898f-acb53f9f2a81; Proxy: null)

Here is my test code; the case with CBOR enabled passes.

import static org.junit.Assert.assertNotNull;

import java.util.Date;

import org.junit.Test;

import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

    @Test
    public void testSdkWithCborDisabled() {
        // Disabling CBOR forces the SDK to serialize the request as JSON.
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

        testGetShardIterator();
    }

    @Test
    public void testSdkWithCbor() {
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "false");

        testGetShardIterator();
    }

    private void testGetShardIterator() {
        AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
                .withRegion("us-east-1")
                .build();

        // AT_TIMESTAMP with an explicit millisecond timestamp, mirroring what
        // the connector sends for LATEST.
        GetShardIteratorRequest request = new GetShardIteratorRequest()
                .withStreamName("input-stream")
                .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
                .withTimestamp(new Date(1622282661795L))
                .withShardId("shard-00000001");

        assertNotNull(kinesisClient.getShardIterator(request));
    }

I inspected the HTTP payload, and in both cases the full millisecond timestamp is being submitted. I have raised an issue in the SDK project: aws/aws-sdk-java#2588

@SOD-DOB

SOD-DOB commented Jun 10, 2021

Hi @dannycranmer,
I am a Flink rookie. Our data is in a Kinesis stream. I would like to know the difference between the Apache Flink and AWS connectors, and the current issues with each.
My versions:
Flink: 1.11
JDK: 1.8

@dannycranmer
Contributor

Hey @SOD-DOB, the Apache Flink connector adds a layer of abstraction over underlying libraries to allow Flink to read from and write to Kinesis stream shards. To integrate any external data with Flink, you must use a Flink source/sink interface so the systems can talk to each other. Typically, Flink connector implementations are thin layers over off-the-shelf libraries, for example the AWS SDK or the Kafka SDK.

For Kinesis, the following libraries are used:

  • Kinesis Source > AWS SDK
  • Kinesis Sink > Kinesis Producer Library (KPL) > AWS SDK

The difference between this library and the Apache Flink Kinesis connector is that we have backported some features to earlier versions of Apache Flink, in order to allow customers to leverage these features with Kinesis Data Analytics:

Feature                  Flink Version  AWS Kinesis Connector  Apache Flink Kinesis Connector
EFO (Enhanced Fan-Out)   1.8            Supported              Not supported
EFO (Enhanced Fan-Out)   1.11           Supported              Not supported
EFO (Enhanced Fan-Out)   1.12           N/A                    Supported
EFO (Enhanced Fan-Out)   1.13           N/A                    Supported
Table API                1.8            Not supported          Not supported
Table API                1.11           Supported              Not supported
Table API                1.12           N/A                    Supported
Table API                1.13           N/A                    Supported

In terms of bug fix parity, we try our best to port bug fixes across all versions of Flink.

You only need to use this connector if you want to use EFO or the Table API; otherwise, you can use the Apache connector. A minimal example of wiring this connector into a job is sketched below.
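A minimal sketch of using this connector in a Flink job; the region, stream name, and initial position are placeholders, and the config class package is assumed to mirror the shaded package seen in the stack trace above:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer;
import software.amazon.kinesis.connectors.flink.config.AWSConfigConstants;
import software.amazon.kinesis.connectors.flink.config.ConsumerConfigConstants;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder region and initial position; adjust for your setup.
Properties consumerProperties = new Properties();
consumerProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProperties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

// "input-stream" is a placeholder stream name.
DataStream<String> stream = env.addSource(new FlinkKinesisConsumer<>(
        "input-stream", new SimpleStringSchema(), consumerProperties));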
