I am using kafka-python to listen to a Kafka topic and use its records. I want it to keep polling indefinitely without exiting. This is my code:
    def test():
        consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
        for msg in consumer:
            print(msg.value)
This code just reads the data and exits immediately. Is there a way to keep listening to the topic even when no messages are being pushed to it?
Any relevant example where the topic is continuously monitored would also be helpful.
Answer
Using confluent_kafka
    import time
    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-consumer-1',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['topicName'])

    try:
        while True:
            message = consumer.poll(10.0)  # Wait up to 10 seconds for a message
            if message is None:
                time.sleep(120)  # Nothing arrived: sleep for 2 minutes, then poll again
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            print(f"Received message: {message.value().decode('utf-8')}")
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the loop; handle any other exception here
    finally:
        # Close once, after the loop ends, not on every iteration
        consumer.close()
        print("Goodbye")
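To try the loop without a running broker, it can be wrapped in a function that accepts any object with a `poll(timeout)` method, plus a `threading.Event` that stops it cleanly. This is only a minimal sketch and not part of confluent_kafka: `consume_until_stopped`, `handle`, and `stop_event` are hypothetical names, and the consumer is assumed to behave like `confluent_kafka.Consumer` (`poll()` returns `None` on timeout, otherwise a message with `error()` and `value()` methods).

```python
import threading


def consume_until_stopped(consumer, handle, stop_event, poll_timeout=1.0):
    """Poll `consumer` until `stop_event` is set; return how many messages were handled.

    `consumer` is assumed to follow the confluent_kafka.Consumer interface:
    poll(timeout) returns None when nothing arrived, otherwise a message
    object exposing error() and value().
    """
    handled = 0
    try:
        while not stop_event.is_set():
            message = consumer.poll(poll_timeout)
            if message is None:
                continue  # No message within the timeout; keep listening
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            handle(message.value())
            handled += 1
    finally:
        consumer.close()  # Always close the consumer, even after an exception
    return handled
```

With a real consumer, one option is to set the event from a signal handler, for example `signal.signal(signal.SIGTERM, lambda *_: stop_event.set())`, so the process keeps listening until it is asked to stop.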
Using kafka-python
    import time
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        group_id='my-consumer-1',
    )
    consumer.subscribe(['topicName'])

    try:
        while True:
            # poll() takes milliseconds and returns a dict mapping
            # TopicPartition to a list of records (empty if nothing arrived)
            batches = consumer.poll(timeout_ms=10000)
            if not batches:
                time.sleep(120)  # Sleep for 2 minutes
                continue
            for records in batches.values():
                for record in records:
                    print(f"Received message: {record.value.decode('utf-8')}")
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the loop; handle any other exception here
    finally:
        # Close once, after the loop ends, not on every iteration
        consumer.close()
        print("Goodbye")
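Note that kafka-python's `poll()` returns a dictionary of record lists keyed by partition rather than a single message, so a small helper that flattens one poll result can keep the loop body simple. The following is a minimal sketch: `iter_values` is a hypothetical helper name, `Record` is a stand-in for kafka-python's `ConsumerRecord`, and record values are assumed to be UTF-8 encoded bytes (records whose value is `None` are skipped).

```python
from collections import namedtuple


def iter_values(batches, encoding="utf-8"):
    """Yield decoded values from one kafka-python style poll() result.

    `batches` is assumed to be a dict mapping a partition key to a list of
    records, each exposing a `.value` attribute holding bytes or None.
    """
    for records in batches.values():
        for record in records:
            if record.value is None:
                continue  # Record without a payload; nothing to decode
            yield record.value.decode(encoding)


# Stand-in for kafka-python's ConsumerRecord, used only for this illustration.
Record = namedtuple("Record", ["offset", "value"])

sample = {
    "topicName-0": [Record(0, b"first"), Record(1, None)],
    "topicName-1": [Record(0, b"second")],
}
print(list(iter_values(sample)))  # ['first', 'second']
```

Inside a polling loop, `for text in iter_values(consumer.poll(timeout_ms=10000)): print(text)` processes whatever arrived and does nothing when the result is empty.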