Skip to content
Advertisement

How to perform async commit when using kafka-python

I’m using kafka-python library for my fastapi consumer app and I’m consuming messages in batch with maximum of 100 records. Since the topic has huge traffic and have only one partition, consuming, processing and committing should be as quick as possible hence I want to use commit_async(), instead of synchronous commit().

But I’m not able to find a good example of commit_async(). I’m looking for an example for commit_async() with callback so that I can log in case of commit failure. But I’m not sure what does that callback function takes as argument and what field those arguments contain.

However the docs related to commit_async mentions the arguments, I’m not completely sure how to use them.

I need help in completing my callback function on_commit(), someone please help here

Code

JavaScript

Advertisement

Answer

The docs are fairly clear.

Called as callback(offsets, response) with response as either an Exception or an OffsetCommitResponse struct.

JavaScript

I’m sure you could look at the source code an maybe find a unit test that covers a full example

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement