I’m using the kafka-python library for my FastAPI consumer app, and I’m consuming messages in batches of at most 100 records. Since the topic has huge traffic and only one partition, consuming, processing and committing should be as quick as possible, so I want to use `commit_async()` instead of the synchronous `commit()`.
But I’m not able to find a good example of `commit_async()`. I’m looking for an example of `commit_async()` with a callback so that I can log commit failures, but I’m not sure what arguments that callback function takes and what fields those arguments contain.
Although the docs for `commit_async` mention the arguments, I’m not completely sure how to use them.
I need help completing my callback function `on_commit()`. Could someone please help here?
Code
```python
import logging as log

from kafka import KafkaConsumer

from message_handler_impl import MessageHandlerImpl


def on_commit():
    pass


class KafkaMessageConsumer:
    def __init__(self, bootstrap_servers: str, topic: str, group_id: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.group_id = group_id
        self.consumer = KafkaConsumer(topic,
                                      bootstrap_servers=bootstrap_servers,
                                      group_id=group_id,
                                      enable_auto_commit=False,
                                      auto_offset_reset='latest')

    def consume_messages(self, max_poll_records: int,
                         message_handler: MessageHandlerImpl = MessageHandlerImpl()):
        try:
            while True:
                try:
                    msg_pack = self.consumer.poll(max_records=max_poll_records)
                    for topic_partition, messages in msg_pack.items():
                        message_handler.process_messages(messages)
                        self.consumer.commit_async(callback=on_commit)
                except Exception as e:
                    log.error("Error while consuming message due to: %s", e, exc_info=True)
        finally:
            log.error("Something went wrong, closing consumer...........")
            self.consumer.close()


if __name__ == "__main__":
    kafka_consumer = KafkaMessageConsumer("localhost:9092", "test-topic", "test-group")
    kafka_consumer.consume_messages(100)
```
Answer
The docs are fairly clear. The callback is:
Called as `callback(offsets, response)` with `response` as either an `Exception` or an `OffsetCommitResponse` struct.
```python
def on_commit(offsets, response):
    # response is either an Exception (commit failed) or the commit result
    if isinstance(response, Exception):
        print(response)  # exception
    else:
        print('committed ' + str(offsets))
```
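Since the goal in the question is to log commit failures, here is a minimal sketch of the same callback using the `logging` module instead of `print`. It only checks `isinstance(response, Exception)`, so it does not depend on any particular attribute of `OffsetCommitResponse`, and it still works if the success value handed to the callback is something else (e.g. `None`). The description of `offsets` as a dict of `TopicPartition` to `OffsetAndMetadata` is my understanding of what kafka-python passes when `commit_async()` is called without explicit offsets; treat it as an assumption.

```python
import logging

log = logging.getLogger(__name__)


def on_commit(offsets, response):
    """Callback for consumer.commit_async(callback=on_commit).

    offsets:  the offsets that were sent (assumed: TopicPartition -> OffsetAndMetadata).
    response: an Exception if the commit failed; anything else is treated as success.
    """
    if isinstance(response, Exception):
        # Commit failed: log the offsets and the error so it can be investigated
        log.error("Offset commit failed for %s: %r", offsets, response)
    else:
        log.debug("Committed offsets %s", offsets)
```

It plugs into the question's code unchanged, as `self.consumer.commit_async(callback=on_commit)`.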
I’m sure you could look at the source code and maybe find a unit test that covers a full example.
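For a fuller picture, here is a minimal sketch of the question's poll/process/commit loop built around `commit_async()`. It assumes `consumer` is a kafka-python `KafkaConsumer` created with `enable_auto_commit=False`, as in the question; the function name `consume` and the `stop` hook are hypothetical, added only so the loop can exit cleanly. It commits once per non-empty poll rather than once per partition (identical behaviour with a single partition), and on a clean shutdown it does one blocking `commit()`, a common pattern because an async commit still in flight when the consumer closes may never complete or report back.

```python
def consume(consumer, process_messages, on_commit, max_records=100,
            stop=lambda: False):
    """Poll, process and commit asynchronously until stop() returns True.

    Assumes `consumer` is a kafka-python KafkaConsumer created with
    enable_auto_commit=False; `stop` is a hypothetical shutdown hook.
    """
    try:
        while not stop():
            # poll() returns {TopicPartition: [ConsumerRecord, ...]}
            msg_pack = consumer.poll(timeout_ms=1000, max_records=max_records)
            for _tp, messages in msg_pack.items():
                process_messages(messages)
            if msg_pack:
                # Non-blocking: success or failure is reported later via on_commit
                consumer.commit_async(callback=on_commit)
        # Clean shutdown only: one blocking commit so the final offsets are
        # stored even if the last async commit had not completed yet
        consumer.commit()
    finally:
        # Runs on both clean exit and error; if process_messages raised, the
        # failed batch was never committed and will be redelivered on restart
        consumer.close()
```

Inside the question's class this would be called as `consume(self.consumer, message_handler.process_messages, on_commit)`. Letting a processing error propagate (instead of catching it and continuing, as the question's loop does) keeps the semantics at-least-once, since the next `commit_async()` would otherwise commit past the batch that failed.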