I am using kafka-python to listen to a Kafka topic and process the records. I want it to keep polling indefinitely without exiting. This is my code:
def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)
This code just reads the data and then exits immediately. Is there a way to keep listening to the topic even when no messages are being pushed to it?
Any relevant example in which a topic is monitored continuously would also be helpful.
Answer
Using confluent_kafka
import time
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

try:
    while True:
        # poll() waits up to 10 seconds and returns None if nothing arrived
        message = consumer.poll(10.0)
        if message is None:
            time.sleep(120)  # Sleep for 2 minutes
            continue
        if message.error():
            print(f"Consumer error: {message.error()}")
            continue
        print(f"Received message: {message.value().decode('utf-8')}")
except KeyboardInterrupt:
    # Ctrl+C ends the loop; handle any other exception you expect here
    pass
finally:
    # Close once, on exit, so the consumer leaves the group cleanly
    consumer.close()
    print("Goodbye")
Using kafka-python
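import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

try:
    while True:
        # poll() takes milliseconds and returns {TopicPartition: [records]};
        # the dict is empty if nothing arrived within the timeout
        batches = consumer.poll(timeout_ms=10000)
        if not batches:
            time.sleep(120)  # Sleep for 2 minutes
            continue
        for records in batches.values():
            for record in records:
                # value is an attribute (bytes), not a method
                print(f"Received message: {record.value.decode('utf-8')}")
except KeyboardInterrupt:
    # Ctrl+C ends the loop; handle any other exception you expect here
    pass
finally:
    # Close once, on exit, so the consumer leaves the group cleanly
    consumer.close()
    print("Goodbye")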
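If you need to stop the loop from elsewhere (a signal handler, another thread), one possible shape is to check a stop flag on each pass. This is only a sketch: `run_polling_loop`, `handle`, and `stop_event` are hypothetical names made up for illustration, not part of either library. The function assumes a consumer that behaves like kafka-python's `KafkaConsumer`, where `poll(timeout_ms=...)` returns a dict of record lists that is empty when nothing arrived.

```python
import threading
from typing import Any, Callable


def run_polling_loop(
    consumer: Any,
    handle: Callable[[Any], None],
    stop_event: threading.Event,
    timeout_ms: int = 1000,
) -> int:
    """Poll until stop_event is set, then close the consumer exactly once.

    Assumption: consumer.poll(timeout_ms=...) returns a dict mapping
    partitions to lists of records, as kafka-python's KafkaConsumer does.
    Returns the number of records passed to handle().
    """
    handled = 0
    try:
        while not stop_event.is_set():
            # An empty dict just means "nothing yet"; keep listening.
            batches = consumer.poll(timeout_ms=timeout_ms)
            for records in batches.values():
                for record in records:
                    handle(record)
                    handled += 1
    finally:
        # Runs on normal stop and on exceptions alike.
        consumer.close()
    return handled
```

With a real consumer you would pass the `KafkaConsumer` instance from the example above, a callback such as `lambda r: print(r.value)`, and a `threading.Event()` that your shutdown code sets. A short `timeout_ms` keeps the loop responsive to the stop flag without busy-waiting.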