I’m using the kafka-python library for my FastAPI consumer app, and I’m consuming messages in batches of at most 100 records. Since the topic has huge traffic and has only one partition, consuming, processing and committing should be as quick as possible, hence I want to use commit_async() instead …
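A minimal sketch of the poll-process-commit_async loop described above, assuming a kafka-python KafkaConsumer created with enable_auto_commit=False. The consumer is passed in as a parameter so the loop can be exercised with a fake; the helper names (consume_batches, log_commit_result) are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional

MAX_RECORDS = 100  # batch size mentioned in the question


def log_commit_result(offsets: Dict[Any, Any], response: Any) -> None:
    # kafka-python calls the callback as callback(offsets, response);
    # response is an Exception when the commit failed.
    if isinstance(response, Exception):
        print(f"async commit failed for {offsets}: {response}")


def consume_batches(
    consumer: Any,
    handle_batch: Callable[[List[Any]], None],
    max_iterations: Optional[int] = None,
) -> int:
    """Poll up to MAX_RECORDS records, process them, then commit without blocking."""
    processed = 0
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        batch = consumer.poll(timeout_ms=500, max_records=MAX_RECORDS)
        records = [r for partition_records in batch.values() for r in partition_records]
        if not records:
            continue
        handle_batch(records)
        processed += len(records)
        # With no offsets argument, the current consumed positions are committed.
        consumer.commit_async(callback=log_commit_result)
    return processed
```

Asynchronous commits are not retried, so a common design choice is to keep commit_async() in the hot loop and call the blocking commit() once on shutdown, so the last processed positions are not lost.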
Tag: apache-kafka
How can I create Kafka stream from Python?
How can I create a Kafka stream directly from Python? I know that I need to use GQLAlchemy, but I don’t know the exact commands.
Answer: GQLAlchemy is a fully open-source Python library and Object Graph Mapper (OGM), a link between graph database objects and Python objects. An Object Graph Mapper o…
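A rough sketch, assuming the graph database behind GQLAlchemy is Memgraph and that a transformation procedure is already loaded. It only builds Memgraph's `CREATE KAFKA STREAM` / `START STREAM` Cypher statements and sends them through `Memgraph.execute`; GQLAlchemy also provides dedicated stream classes, so check its documentation for those. The stream, topic and procedure names are hypothetical placeholders.

```python
from typing import List, Sequence


def create_kafka_stream_query(
    name: str,
    topics: Sequence[str],
    transform: str,
    bootstrap_servers: str = "localhost:9092",
) -> str:
    # Memgraph syntax: CREATE KAFKA STREAM <name> TOPICS <t1>, <t2> TRANSFORM <procedure> ...
    return (
        f"CREATE KAFKA STREAM {name} "
        f"TOPICS {', '.join(topics)} "
        f"TRANSFORM {transform} "
        f"BOOTSTRAP_SERVERS '{bootstrap_servers}'"
    )


def stream_setup_queries(name: str, topics: Sequence[str], transform: str) -> List[str]:
    # A newly created stream is not consuming yet; START STREAM begins consumption.
    return [create_kafka_stream_query(name, topics, transform), f"START STREAM {name}"]


def run_with_gqlalchemy(queries: Sequence[str]) -> None:
    # Imported lazily so the query builders above work without GQLAlchemy installed.
    from gqlalchemy import Memgraph

    memgraph = Memgraph(host="127.0.0.1", port=7687)
    for query in queries:
        memgraph.execute(query)
```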
Confused by encoding and decoding with Kafka Connect
Here’s a high-level view of my pipeline: MQTT -> Kafka/MQTT bridge [producer] -> Kafka Connect -> AWS Kinesis data stream -> Lambda function (decode Kinesis.record.data) -> store in DynamoDB. Basically, the full flow is working properly, but I’m getting a weird “json” format …
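A sketch of the Lambda decoding step, under stated assumptions. Kinesis hands Lambda each payload base64-encoded under `record["kinesis"]["data"]`. The sketch further assumes the bytes are JSON written by Kafka Connect's JsonConverter: with `schemas.enable=true` that converter wraps every value as `{"schema": ..., "payload": ...}`, which may explain an unexpected JSON shape, so the envelope is unwrapped when present. The DynamoDB write is left out.

```python
import base64
import json
from typing import Any, Dict, List


def decode_record(record: Dict[str, Any]) -> Any:
    # Kinesis record data arrives base64-encoded in the Lambda event.
    raw = base64.b64decode(record["kinesis"]["data"])
    value = json.loads(raw.decode("utf-8"))
    # Assumption: strip the JsonConverter schema/payload envelope if present.
    if isinstance(value, dict) and set(value) == {"schema", "payload"}:
        return value["payload"]
    return value


def lambda_handler(event: Dict[str, Any], context: Any) -> List[Any]:
    items = [decode_record(r) for r in event.get("Records", [])]
    # Writing the items to DynamoDB (e.g. with boto3) would go here.
    return items
```

If the envelope turns out to be the cause, setting `value.converter.schemas.enable=false` on the connector is the usual way to stop it from being produced in the first place.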
Multi-topic consumer: identify the topic
I have a consumer that consumes from two topics with two different avro schemas. The number of topics can grow in the future, so unioning schemas to read from multiple topics is not scalable. How do I identify/extract which topic the message is from, without deserializing it with the avro schema? Wondering if…
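One way to avoid unioning schemas, sketched under assumptions: kafka-python's ConsumerRecord carries a `.topic` attribute (confluent-kafka exposes `msg.topic()` instead), so the raw bytes can be routed to a per-topic deserializer before any Avro decoding happens. `TopicRouter` and the `Record` stand-in are hypothetical names for illustration.

```python
from collections import namedtuple
from typing import Any, Callable, Dict

# Stand-in with the two ConsumerRecord fields this sketch uses.
Record = namedtuple("Record", ["topic", "value"])


class TopicRouter:
    """Dispatch raw record bytes to a deserializer chosen by topic name."""

    def __init__(self) -> None:
        self._deserializers: Dict[str, Callable[[bytes], Any]] = {}

    def register(self, topic: str, deserializer: Callable[[bytes], Any]) -> None:
        # Supporting a new topic only requires registering one more deserializer.
        self._deserializers[topic] = deserializer

    def decode(self, record: Any) -> Any:
        try:
            deserializer = self._deserializers[record.topic]
        except KeyError:
            raise ValueError(f"no deserializer registered for topic {record.topic!r}") from None
        return deserializer(record.value)
```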
Consuming Kafka Avro messages in Python
I am trying to consume messages from Kafka Avro in Python. We have it in Java, and it’s working, but when I try to consume them in a Jupyter notebook, parsing does not work. I followed the example given in the documentation (I’ve removed the conf information for security reasons): This is the error I…
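A sketch of one frequent cause, assuming the Java side uses Confluent's KafkaAvroSerializer: every value is then prefixed with a 5-byte header (magic byte 0 and a 4-byte big-endian schema ID), and passing the whole value to a plain Avro reader fails to parse. The function name is hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def split_confluent_avro(value: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Confluent-framed message value."""
    if len(value) < 5:
        raise ValueError("message too short for the Confluent wire format")
    # ">bI": 1-byte magic, 4-byte unsigned big-endian schema ID, no padding.
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, value[5:]
```

The body can then be decoded with the writer schema for that ID, for example via `fastavro.schemaless_reader(io.BytesIO(body), parsed_schema)`. Alternatively, confluent-kafka's Schema Registry `AvroDeserializer` handles the header and schema lookup itself.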
pyspark structured streaming kafka – py4j.protocol.Py4JJavaError: An error occurred while calling o41.save
I have a simple PySpark program that publishes data into Kafka. When I run spark-submit, it gives an error. Command being run: Error: Spark version: 3.2.0. I have Confluent Kafka installed on my machine; here is the version: Here is the code: Any ideas what the issue is? The Spark version seems to b…
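A sketch that assumes the `save` failure comes from the Kafka data source missing from the classpath, a common cause when writing with `format("kafka")` from PySpark. The connector artifact has to match both the Spark version (3.2.0 here) and Spark's Scala build (2.12 for the default Spark 3.2 downloads). The helper names are hypothetical.

```python
from typing import Any, List


def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    # Maven coordinate of Spark's Kafka connector for Structured Streaming / batch.
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def spark_submit_args(script: str, spark_version: str) -> List[str]:
    # Equivalent to: spark-submit --packages <coordinate> <script>
    return ["spark-submit", "--packages", kafka_package(spark_version), script]


def write_to_kafka(df: Any, bootstrap_servers: str, topic: str) -> None:
    # The Kafka sink needs a "value" column (string or binary); here each row
    # is serialized to JSON. Requires pyspark and a reachable broker.
    (
        df.selectExpr("to_json(struct(*)) AS value")
        .write.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .save()
    )
```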
Kafka consumer/producer with Python in WSL2 Ubuntu
This is a follow-up question to this thread. As advised in that thread, a possible reason my Python code is not working is that I was trying to connect to a remote server from WSL2, and there could be unknown issues with WSL2 Ubuntu. So I am testing that hypothesis with the following two approaches …
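A small sketch for separating network problems from client-library problems: run it inside WSL2 against the broker's host and port (placeholders). If the plain TCP connection fails, no Kafka client will work either. A successful connection only proves the bootstrap address is reachable; the client then reconnects to the broker's advertised listeners, which must also be reachable from WSL2.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False
```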
Kafka with Python: how to send a topic to PostgreSQL?
I have been asked to use Kafka with Python. Moreover, I need to develop a very simple producer-consumer application that reads metrics from a device in real time and then publishes them to a topic ‘metrics’ in Kafka. Then a consumer must subscribe to the ‘metrics’ topic and store those data t…
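A minimal sketch of the storage side, under assumptions not in the question: each message value is JSON like `{"device": ..., "value": ...}`, and a hypothetical table `metrics(device TEXT, value REAL)` already exists. It accepts any DB-API 2.0 connection; psycopg2 uses the `%s` placeholder, while sqlite3 (placeholder `?`) can stand in for PostgreSQL when trying it locally.

```python
import json
from typing import Any, Iterable, List, Tuple


def to_rows(values: Iterable[bytes]) -> List[Tuple[Any, Any]]:
    # Assumed message shape: {"device": "<id>", "value": <number>}
    rows = []
    for raw in values:
        metric = json.loads(raw)
        rows.append((metric["device"], metric["value"]))
    return rows


def store_metrics(conn: Any, values: Iterable[bytes], placeholder: str = "%s") -> int:
    """Insert a batch of consumed message values and commit once per batch."""
    rows = to_rows(values)
    sql = f"INSERT INTO metrics (device, value) VALUES ({placeholder}, {placeholder})"
    cur = conn.cursor()
    cur.executemany(sql, rows)
    conn.commit()  # one commit per batch rather than per message
    return len(rows)
```

In a kafka-python consumer loop this would be called with something like `[record.value for record in records]` for each polled batch.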
Python-Kafka: Keep polling topic infinitely
I am using python-kafka to listen to a Kafka topic and use the records. I want to keep it polling indefinitely without ever exiting. This is my code: This code just reads the data and exits immediately. Is there a way to keep listening to the topic even when no messages are being pushed to it? Any relevant example wher…
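A sketch assuming kafka-python: `poll()` returns an empty dict when nothing arrives within `timeout_ms`, so looping over it keeps the consumer alive while the topic is idle. (Iterating with `for msg in consumer` also blocks indefinitely unless `consumer_timeout_ms` is set.) The `should_stop` hook is a hypothetical addition for clean shutdown.

```python
from typing import Any, Callable


def poll_forever(
    consumer: Any,
    handle: Callable[[Any], None],
    should_stop: Callable[[], bool] = lambda: False,
) -> None:
    """Keep polling until should_stop() returns True, then close the consumer."""
    try:
        while not should_stop():
            # {TopicPartition: [ConsumerRecord, ...]}, possibly empty
            batch = consumer.poll(timeout_ms=1000)
            for records in batch.values():
                for record in records:
                    handle(record)
    finally:
        consumer.close()
```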
kafka-python raise UnrecognizedBrokerVersion Error
I am getting this error when constructing KafkaProducer with the kafka-python package: The code is as follows: I am using Python 3.7 and an AWS MSK cluster.
Answer: Solved it by adding security_protocol="SSL" to the KafkaProducer as follows:
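A sketch of the producer settings with the fix from the answer applied. The bootstrap address is a placeholder (MSK's TLS listener conventionally uses port 9094), and the explanation in the comment is a likely mechanism, not something the answer states.

```python
from typing import Any, Dict


def msk_producer_config(bootstrap_servers: str) -> Dict[str, Any]:
    return {
        "bootstrap_servers": bootstrap_servers,
        # Likely cause of the error: a plaintext version probe against a
        # TLS-only listener fails, surfacing as UnrecognizedBrokerVersion.
        "security_protocol": "SSL",
    }


def make_producer(bootstrap_servers: str):
    from kafka import KafkaProducer  # lazy import: requires kafka-python

    return KafkaProducer(**msk_producer_config(bootstrap_servers))
```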