
How to perform async commit when using kafka-python

I’m using the kafka-python library in my FastAPI consumer app, and I consume messages in batches of at most 100 records. Because the topic has heavy traffic and only one partition, consuming, processing, and committing need to be as fast as possible, so I want to use commit_async() instead of the synchronous commit().

But I can’t find a good example of commit_async(). I’m looking for an example with a callback so that I can log commit failures, but I’m not sure what arguments the callback takes or what fields those arguments contain.

Although the docs for commit_async mention the arguments, I’m not sure how to use them.

I need help completing my callback function on_commit(). Can someone please help?

Code

import logging as log

from kafka import KafkaConsumer

from message_handler_impl import MessageHandlerImpl


def on_commit():
    pass


class KafkaMessageConsumer:
    def __init__(self, bootstrap_servers: str, topic: str, group_id: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.group_id = group_id
        self.consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, group_id=group_id, enable_auto_commit=False, auto_offset_reset='latest')

    def consume_messages(self, max_poll_records: int,
                         message_handler: MessageHandlerImpl = MessageHandlerImpl()):
        try:
            while True:
                try:
                    msg_pack = self.consumer.poll(max_records=max_poll_records)
                    for topic_partition, messages in msg_pack.items():
                        message_handler.process_messages(messages)

                    self.consumer.commit_async(callback=on_commit)
                except Exception as e:
                    log.error("Error while consuming message due to: %s", e, exc_info=True)

        finally:
            log.error("Something went wrong, closing consumer...........")
            self.consumer.close()


if __name__ == "__main__":
    kafka_consumer = KafkaMessageConsumer("localhost:9092", "test-topic", "test-group")
    kafka_consumer.consume_messages(100)


Answer

The docs are fairly clear.

Called as callback(offsets, response) with response as either an Exception or an OffsetCommitResponse struct.

def on_commit(offsets, response):
    # per the docs, response is either an Exception or an OffsetCommitResponse struct
    if isinstance(response, Exception):
        print('commit failed for ' + str(offsets) + ': ' + repr(response))
    else:
        print('committed ' + str(offsets))
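
Since the goal in the question is to log commit failures, here is a minimal sketch of the same callback using the logging module instead of print. It relies only on the documented callback(offsets, response) contract. The logger name and the boolean return value are my own additions for illustration (the return value just makes the function easy to check without a broker), and treating any non-Exception response as success is an assumption on my part.

```python
import logging

# Hypothetical logger name, chosen for this example only.
log = logging.getLogger("kafka_commit")


def on_commit(offsets, response):
    """Callback intended for KafkaConsumer.commit_async(callback=on_commit).

    offsets: the offsets the commit request covered.
    response: an Exception on failure; any other value is treated as
        success here (an assumption, see the note above the code).
    Returns True on success and False on failure. The return value is
    only a convenience for testing the callback in isolation.
    """
    if isinstance(response, Exception):
        # Log the failure together with the offsets that were not committed.
        log.error("Offset commit failed for %s: %r", offsets, response)
        return False
    log.debug("Offset commit succeeded for %s", offsets)
    return True
```

You would pass it exactly as in the question, self.consumer.commit_async(callback=on_commit). Checking for Exception rather than for a specific success type keeps the callback from depending on what the success value looks like.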

I’m sure you could look at the source code and maybe find a unit test that covers a full example.

User contributions licensed under: CC BY-SA