I'm using the kafka-python library in my FastAPI consumer app, consuming messages in batches of up to 100 records. Since the topic has huge traffic and has only one partition, consuming, processing, and committing should be as quick as possible, so I want to use commit_async() instead of the synchronous commit(). But I'm not able to find a good example
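A minimal sketch of a poll/process/commit_async loop with kafka-python, assuming manual offset management; the topic name and broker address are placeholders:

```python
def consume_loop(consumer, process_batch):
    """Poll up to 100 records, process them, then commit asynchronously.

    `consumer` is a kafka-python KafkaConsumer created with
    enable_auto_commit=False; commit_async() returns immediately and
    reports the result through the callback.
    """
    def on_commit(offsets, response):
        # kafka-python invokes this with either an exception or a commit response
        if isinstance(response, Exception):
            print(f"async commit failed: {response}")

    while True:
        batch = consumer.poll(timeout_ms=1000, max_records=100)
        if not batch:
            continue
        for _tp, records in batch.items():
            process_batch(records)
        # fire-and-forget: does not block the next poll
        consumer.commit_async(callback=on_commit)


def make_consumer():
    """Build the batch consumer (topic and broker names are placeholders)."""
    from kafka import KafkaConsumer  # pip install kafka-python

    return KafkaConsumer(
        "my-topic",                          # assumed topic name
        bootstrap_servers="localhost:9092",  # assumed broker address
        enable_auto_commit=False,
        max_poll_records=100,
    )
```

Because commit_async() does not wait for the broker's acknowledgement, the loop can start the next poll immediately; the callback is where a failed commit would surface.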
How can I create a Kafka stream from Python?
How can I create a Kafka stream directly from Python? I know that I need to use GQLAlchemy, but I don't know the exact commands. Answer: GQLAlchemy is a fully open-source Python library and Object Graph Mapper (OGM) – a link between graph database objects and Python objects. An Object Graph Mapper or OGM provides a developer-friendly workflow that allows
Confused about encoding and decoding with Kafka Connect
Here's a high-level view of my pipeline: MQTT -> Kafka/MQTT bridge [producer] -> Kafka Connect -> AWS Kinesis data stream -> Lambda function (decode Kinesis.record.data) -> store in DynamoDB. Basically, the full flow is working properly, but I'm getting a weird "JSON" format when I try to read kinesis.record.data. It isn't UTF-8. It includes utf8-encoded quoted-string-encoded-JSON records separated by a
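Kinesis delivers record data base64-encoded, and if the producer serialized the payload to JSON twice you end up with exactly this quoted-string-encoded JSON. A small decoder sketch handling both cases (the double encoding is an assumption about the producer):

```python
import base64
import json


def decode_kinesis_record(data):
    """Decode one Kinesis record payload: base64 -> UTF-8 -> JSON.

    If the producer JSON-encoded the payload twice, the first json.loads
    yields a string rather than an object, so we decode once more.
    """
    raw = base64.b64decode(data).decode("utf-8")
    obj = json.loads(raw)
    if isinstance(obj, str):  # quoted-string-encoded JSON
        obj = json.loads(obj)
    return obj
```

In the Lambda you would call this per record, e.g. `decode_kinesis_record(record["kinesis"]["data"])`, before writing to DynamoDB.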
Multi-topic consumer: identify the topic
I have a consumer that consumes from two topics with two different Avro schemas. The number of topics can grow in the future, so unioning schemas to read from multiple topics is not scalable. How do I identify/extract which topic the message is from, without deserializing it with the Avro schema? Wondering if topic name is encoded into any avro
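The topic name is not part of the Avro payload; it travels as message metadata (`record.topic` on a kafka-python ConsumerRecord, `msg.topic()` in confluent-kafka), so you can pick a schema before deserializing anything. A sketch of dispatch-by-topic, where the topic and handler names are hypothetical:

```python
def dispatch(topic, raw_value, handlers):
    """Route a raw (still-serialized) message to a per-topic handler.

    The topic name comes from message metadata, so no Avro decoding is
    needed to read it; each handler can apply its own schema.
    """
    handler = handlers.get(topic)
    if handler is None:
        raise ValueError(f"no handler registered for topic {topic!r}")
    return handler(raw_value)


# hypothetical per-topic deserializers keyed by topic name
handlers = {
    "orders": lambda raw: ("orders-schema", raw),
    "payments": lambda raw: ("payments-schema", raw),
}
```

In a consumer loop this becomes `dispatch(record.topic, record.value, handlers)` per record (kafka-python attribute names).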
Consuming Kafka Avro messages in Python
I am trying to consume Avro messages from Kafka in Python. We have it working in Java, but when trying to consume in a Jupyter notebook, parsing does not work. I followed the example given in the documentation (I've removed conf information for security reasons). This is the error I got: KafkaError{code=_KEY_DESERIALIZATION,val=-160,str="'str' object is not callable"} It
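The `'str' object is not callable` key-deserialization error typically means a plain string was supplied where confluent-kafka expects a deserializer *callable*. A hedged sketch of a DeserializingConsumer config where both entries are callables; the broker address, group id, and even the use of confluent-kafka are assumptions about the setup:

```python
def build_consumer_conf(schema_registry_client, value_schema_str):
    """Build a confluent-kafka DeserializingConsumer config.

    KafkaError{code=_KEY_DESERIALIZATION, str="'str' object is not callable"}
    usually indicates 'key.deserializer' was set to a string; both
    deserializer entries below must be callables.
    """
    from confluent_kafka.serialization import StringDeserializer
    from confluent_kafka.schema_registry.avro import AvroDeserializer

    return {
        "bootstrap.servers": "localhost:9092",  # assumed broker
        "group.id": "avro-demo",                # assumed group id
        "auto.offset.reset": "earliest",
        "key.deserializer": StringDeserializer("utf_8"),
        "value.deserializer": AvroDeserializer(
            schema_registry_client, value_schema_str
        ),
    }
```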
pyspark structured streaming kafka – py4j.protocol.Py4JJavaError: An error occurred while calling o41.save
I have a simple PySpark program which publishes data into Kafka. When I do a spark-submit, it gives an error. Command being run: Error: Spark version – 3.2.0; I have Confluent Kafka installed on my machine, here is the version: Here is the code: Any ideas what the issue is? The Spark version seems to be matching
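A Py4JJavaError on `o41.save` when writing to Kafka is most often a missing connector: the spark-sql-kafka package must be on the classpath and must match the Spark and Scala versions. A hedged example of the submit command, assuming Spark 3.2.0 built against Scala 2.12 and a hypothetical script name:

```shell
# Pull in the Kafka sink for Spark 3.2.0 at submit time
# (change _2.12 if your Spark build uses a different Scala version)
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 \
  kafka_publish.py
```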
Kafka consumer/producer with Python in WSL2 Ubuntu
This is a follow-up question to this thread. As advised in that thread, the possible cause of my Python code not working is that I was trying to connect to a remote server from WSL2, and there could be unknown issues with WSL2 Ubuntu. So I am testing that hypothesis with the following two approaches of communicating within
Kafka with Python: How to send a topic to PostgreSQL?
I need to use Kafka with Python. Specifically, I need to develop a very simple producer-consumer application that reads metrics from a device in real time and publishes them to a 'metrics' topic in Kafka. Then a consumer must subscribe to the 'metrics' topic and store that data in a PostgreSQL database. I tried to draw the architecture here:
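A minimal sketch of the consumer-side sink: parse JSON metric messages and insert them through any DB-API connection such as `psycopg2.connect(...)`. The message format and the `metrics(name, value, ts)` table layout are assumptions:

```python
import json


def metric_to_row(raw):
    """Parse one JSON metric message into an insertable row tuple.

    Assumes messages look like {"name": ..., "value": ..., "ts": ...}.
    """
    metric = json.loads(raw)
    return (metric["name"], metric["value"], metric["ts"])


def store_metrics(conn, raw_messages):
    """Insert a batch of consumed metric messages into PostgreSQL.

    `conn` is a DB-API connection (e.g. from psycopg2); committing once
    per batch keeps inserts cheap under steady traffic.
    """
    cur = conn.cursor()
    try:
        cur.executemany(
            "INSERT INTO metrics (name, value, ts) VALUES (%s, %s, %s)",
            [metric_to_row(m) for m in raw_messages],
        )
        conn.commit()
    finally:
        cur.close()
```

In the consumer loop you would call `store_metrics(conn, [m.value for m in batch])` after each poll (attribute names per kafka-python).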
Python-Kafka: Keep polling a topic infinitely
I am using python-kafka to listen to a Kafka topic and consume its records. I want it to keep polling infinitely without exiting. This is my code below: This code just reads the data and exits directly. Is there a way to keep listening to a topic even when no message is pushed to it? Any relevant example where
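kafka-python's KafkaConsumer blocks on iteration indefinitely by default; an early exit usually means `consumer_timeout_ms` was passed to the constructor (which raises StopIteration when no message arrives in time) or `poll()` was called only once. A sketch of the never-ending loop:

```python
def listen_forever(consumer, handle):
    """Iterate a kafka-python KafkaConsumer without ever exiting.

    Iteration blocks waiting for new records; leave consumer_timeout_ms
    at its default (infinity) or the loop will end when the topic is idle.
    """
    for message in consumer:
        handle(message)
```

The consumer itself would be built as `KafkaConsumer("my-topic", bootstrap_servers="localhost:9092")` (names assumed), with no `consumer_timeout_ms` argument.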
kafka-python raise UnrecognizedBrokerVersion Error
I am getting this error when constructing a KafkaProducer with the kafka-python package: The code is as follows: I am using Python 3.7 and an AWS MSK cluster. Answer: Solved it by just adding security_protocol="SSL" to the KafkaProducer as follows:
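Restating the accepted fix as a runnable sketch: MSK's TLS listener requires `security_protocol="SSL"`, otherwise the client probes the port in plaintext and version detection fails with UnrecognizedBrokerVersion. The broker address below is a placeholder:

```python
def make_msk_producer(bootstrap_servers):
    """Create a kafka-python producer for an AWS MSK TLS listener.

    Without security_protocol="SSL" the client speaks plaintext to a TLS
    port and raises UnrecognizedBrokerVersion during version probing.
    """
    from kafka import KafkaProducer  # pip install kafka-python

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,  # e.g. "b-1.mycluster...:9094"
        security_protocol="SSL",
    )
```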