Skip to content
Advertisement

Python-Kafka: Keep polling topic infinitely

I am using python-kafka to listen to a kafka topic and use that the records. I want to keep it polling infinitely without any exit. This is my code below:

def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)

This code just reads the data and exits directly. Is there a way to keep listening to topics even if message is not pushed to it?

Any relevant example where the topic is continuously monitored is also great for me.

Advertisement

Answer

Using confluent_kafka

import time
from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
    except:
        # Handle any exception here
        ...
    finally:
        consumer.close()
        print("Goodbye")

Using kafka-python

import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
    except:
        # Handle any exception here
        ...
    finally:
        consumer.close()
        print("Goodbye")
  
Advertisement