Skip to content
Advertisement

Multi Topic Consumer Identify topic

I have a consumer that consumes from two topics with two different avro schemas. The number of topics can grow in the future, so unioning schemas to read from multiple topics is not scalable.

from confluent_kafka import Consumer

# assume config for group.id and bootstrap servers are set

consumer = Consumer(config)
consumer.subscribe(["randomtopic1", "randomtopic2"])
msg = consumer.poll()
msg_value = message.value() #Should be in bytes or str

How do I identify/extract which topic the message is from, without deserializing it with the avro schema? Wondering if topic name is encoded into any avro headers, I came across this link where they deserialize in the form of bytes and somehow extract the topic name. Then they deserialize the message value with the schema.

Advertisement

Answer

There are no “Avro headers” in the consumed records. Kafka record headers don’t have the topic name (usually).

You can simply use message.topic(), then use an if statement against it to process the topics differently.

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement