InOrderConsumer doesn't work as intended. (#913)
* Adding the logic to use the offset store instead

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <[email protected]>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <[email protected]>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Thomas Cooper <[email protected]>

Update baseplate/frameworks/queue_consumer/kafka.py

Co-authored-by: Sky Kistler <[email protected]>

Linting errors

Formatting?

Black formatting

Black formatting

Black formatting

Linting

Black linting again

Another attempt

* Revert "Adding the logic to use the offset store instead"

This reverts commit b1c8e96.

* The correct tests

* Formatting

* Adding more comments
AdrielVelazquez authored May 20, 2024
1 parent 9ea10ec commit 0b6e40d
Showing 2 changed files with 24 additions and 6 deletions.
24 changes: 20 additions & 4 deletions baseplate/frameworks/queue_consumer/kafka.py
@@ -404,7 +404,9 @@ class InOrderConsumerFactory(_BaseKafkaQueueConsumerFactory):
This will run a single `KafkaConsumerWorker` that reads messages from Kafka and
puts them into an internal work queue. Then it will run a single `KafkaMessageHandler`
that reads messages from the internal work queue, processes them with the
`handler_fn`, and then commits each message's offset to Kafka.
`handler_fn`, and then commits each message's offset to the Kafka consumer's internal offset store.
The Kafka consumer then commits the stored offsets back to Kafka on the auto.commit.interval.ms interval, which defaults to 5 seconds.
This one-at-a-time, in-order processing ensures that when a failure happens
during processing we don't commit its offset (or the offset of any later
@@ -419,6 +421,19 @@ class InOrderConsumerFactory(_BaseKafkaQueueConsumerFactory):
If you need more control, you can create the :py:class:`~confluent_kafka.Consumer`
yourself and use the constructor directly.
UPDATE: The InOrderConsumerFactory can NEVER achieve in-order, exactly-once message processing.
Exactly-once processing in Kafka starts with the producer enabling transactions,
and downstream consumers reading exclusively from offsets committed within a transaction.
Secondly, without keys defined on the producer's messages, messages are sent round-robin across all partitions in the topic.
This means that newer messages could be consumed before older ones if the consumers of the partitions holding the newer messages are faster.
Instead, some improvements retain the current behaviour while putting less pressure on Kafka than committing every single offset.
Rather than committing every single message's offset back to Kafka,
the consumer now commits each offset to its local offset store, and commits the highest seen value for each partition at a defined interval (auto.commit.interval.ms).
"enable.auto.offset.store" is set to false to give our application explicit control over when offsets are stored.
"""

# we need to ensure that only a single message handler worker exists (max_concurrency = 1)
@@ -444,8 +459,9 @@ def _consumer_config(cls) -> Dict[str, Any]:
# after message processing, to make sure offsets are not auto-committed
# prior to processing has finished.
"max.poll.interval.ms": 300000,
# disable offset autocommit, we'll manually commit.
"enable.auto.commit": "false",
# Enable offset autocommit, but disable offset store.
"enable.auto.commit": "true",
"enable.auto.offset.store": "false",
}
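With "enable.auto.offset.store" disabled, the processing loop decides when a message's offset becomes eligible for the next auto-commit. A hedged sketch of that ordering follows; `StubConsumer`, `StubMessage`, and `consume_in_order` are hypothetical names standing in for `confluent_kafka.Consumer` so the sketch runs without a broker.

```python
# Hedged sketch (not baseplate code): process the message first, then store
# its offset. The stubs below mimic the small slice of the Consumer API used.
class StubMessage:
    def __init__(self, offset: int) -> None:
        self._offset = offset

    def offset(self) -> int:
        return self._offset

class StubConsumer:
    def __init__(self, messages) -> None:
        self._messages = list(messages)
        self.stored = []  # offsets handed to the local offset store

    def poll(self, timeout: float = 1.0):
        return self._messages.pop(0) if self._messages else None

    def store_offsets(self, message: StubMessage) -> None:
        # The real client records offset + 1 per partition; we just log the call.
        self.stored.append(message.offset())

def consume_in_order(consumer, handler_fn) -> None:
    while (msg := consumer.poll()) is not None:
        handler_fn(msg)              # process the message first...
        consumer.store_offsets(msg)  # ...then mark its offset safe to commit

consumer = StubConsumer(StubMessage(o) for o in range(3))
consume_in_order(consumer, lambda m: None)
print(consumer.stored)  # [0, 1, 2]
```

Because storing happens only after `handler_fn` returns, a crash mid-processing leaves the offset unstored, so auto-commit never advances past the failed message.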

def build_message_handler(self) -> KafkaMessageHandler:
@@ -464,7 +480,7 @@ def commit_offset(
message.offset(),
)
with context.span.make_child("kafka.commit"):
self.consumer.commit(message=message, asynchronous=False)
self.consumer.store_offsets(message=message)

return KafkaMessageHandler(
self.baseplate,
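The UPDATE note in the docstring says genuine exactly-once processing would instead require transactional producing and read-committed consuming. A hedged sketch of those settings, with illustrative placeholder values that are not baseplate configuration:

```python
# Hedged sketch of exactly-once prerequisites (placeholders, not baseplate
# config): a transactional producer plus a read-committed consumer.
producer_config = {
    "bootstrap.servers": "localhost:9092",     # placeholder broker
    # Enables the idempotent, transactional producer; messages become
    # visible to read-committed consumers only when the transaction commits.
    "transactional.id": "example-txn-producer",
}
consumer_config = {
    "bootstrap.servers": "localhost:9092",     # placeholder broker
    "group.id": "example-group",               # placeholder group
    # Only deliver messages from committed transactions.
    "isolation.level": "read_committed",
}
```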
6 changes: 4 additions & 2 deletions tests/unit/frameworks/queue_consumer/kafka_tests.py
@@ -416,7 +416,8 @@ def test_make_kafka_consumer(
"heartbeat.interval.ms": 3000,
"session.timeout.ms": 10000,
"max.poll.interval.ms": 300000,
"enable.auto.commit": "false",
"enable.auto.commit": "true",
"enable.auto.offset.store": "false",
"queued.max.messages.kbytes": 10000,
}
)
@@ -448,8 +449,9 @@ def test_make_kafka_consumer_unknown_topic(
"heartbeat.interval.ms": 3000,
"session.timeout.ms": 10000,
"max.poll.interval.ms": 300000,
"enable.auto.commit": "false",
"enable.auto.commit": "true",
"queued.max.messages.kbytes": 100,
"enable.auto.offset.store": "false",
}
)
mock_consumer.subscribe.assert_not_called()
