I am using kafka-python to listen to a Kafka topic and process its records. I want the consumer to keep polling indefinitely, without ever exiting. This is my code:
Python
def test():
    """Print the value of every record published to topic 'abc'.

    Consumption starts from the earliest available offset when the consumer
    has no committed position. The loop below never returns on its own:
    iterating a KafkaConsumer blocks waiting for new records for as long as
    ``consumer_timeout_ms`` is left at its default (no timeout).
    """
    # The broker address must be passed as a keyword. Every positional
    # argument to KafkaConsumer is a topic name, so a positional
    # 'localhost:9092' would subscribe to a topic with that name instead.
    consumer = KafkaConsumer(
        'abc',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
    )
    for msg in consumer:
        print(msg.value)
This code just reads the existing data and then exits immediately. Is there a way to keep listening to the topic even when no new messages are being pushed to it?
A relevant example in which a topic is monitored continuously would also be very helpful.
Advertisement
Answer
Using confluent_kafka
Python
from confluent_kafka import Consumer

# Long-running consumer: keeps polling 'topicName' until interrupted (Ctrl+C).
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['topicName'])

# The try/finally wraps the WHOLE loop so the consumer is closed exactly once,
# on shutdown. Closing it inside the loop would end consumption after the
# first iteration.
try:
    while True:
        # poll() blocks for up to 10 seconds waiting for a message, so no
        # extra sleep is needed while the topic is idle.
        message = consumer.poll(10.0)

        if message is None:
            # Timed out with nothing to read: just poll again.
            continue

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        payload = message.value()
        # A message may carry no value (None); only decode real payloads.
        text = payload.decode('utf-8') if payload is not None else None
        print(f"Received message: {text}")
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this otherwise endless loop.
    pass
finally:
    consumer.close()
    print("Goodbye")
Using kafka-python
Python
from kafka import KafkaConsumer

# Long-running consumer: keeps polling 'topicName' until interrupted (Ctrl+C).
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

# The try/finally wraps the WHOLE loop so the consumer is closed exactly once,
# on shutdown rather than after the first poll.
try:
    while True:
        # kafka-python's poll() takes its timeout in MILLISECONDS and returns
        # a dict mapping each TopicPartition to a list of records. The dict
        # is empty when nothing arrived within the timeout, in which case the
        # loop simply polls again.
        batches = consumer.poll(timeout_ms=10_000)

        for records in batches.values():
            for record in records:
                # ConsumerRecord.value is an attribute (raw bytes unless a
                # value_deserializer is configured) and may be None.
                payload = record.value
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Received message: {text}")
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this otherwise endless loop.
    pass
finally:
    consumer.close()
    print("Goodbye")