I have a consumer that reads from two topics, each with a different Avro schema. The number of topics can grow in the future, so unioning schemas to read from multiple topics is not scalable.
```python
from confluent_kafka import Consumer

# Assume config sets group.id and bootstrap.servers
consumer = Consumer(config)
consumer.subscribe(["randomtopic1", "randomtopic2"])

msg = consumer.poll()
msg_value = msg.value()  # Should be in bytes or str
```
How do I identify which topic a message came from without deserializing it with the Avro schema? I wondered whether the topic name is encoded in any Avro headers, and I came across this link, where they read the message as bytes and somehow extract the topic name. Then they deserialize the message value with the schema.
Answer
There are no “Avro headers” in the consumed records. Kafka record headers don’t have the topic name (usually).
You can simply use message.topic(), then use an if statement against it to process the topics differently.
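As a rough sketch of that idea, the snippet below routes each message by its topic name before looking at the payload. The handler functions, the HANDLERS table, and the FakeMessage stand-in are hypothetical names made up for illustration; only topic(), value(), and error() are methods of the real confluent_kafka message object. A dictionary lookup is used here instead of a chain of if statements, which may be easier to extend as topics are added.

```python
from typing import Callable, Dict, Optional


# Hypothetical per-topic handlers. A real handler would decode the payload
# with the Avro schema that belongs to its topic.
def handle_topic1(payload: bytes) -> str:
    return f"topic1 handler got {len(payload)} bytes"


def handle_topic2(payload: bytes) -> str:
    return f"topic2 handler got {len(payload)} bytes"


# Topic name -> handler. Adding a topic means adding one entry here.
HANDLERS: Dict[str, Callable[[bytes], str]] = {
    "randomtopic1": handle_topic1,
    "randomtopic2": handle_topic2,
}


def dispatch(msg) -> Optional[str]:
    """Pick a handler from msg.topic() without deserializing the payload first."""
    if msg is None:  # poll() returned nothing within the timeout
        return None
    if msg.error():  # an error event rather than a data record
        raise RuntimeError(str(msg.error()))
    handler = HANDLERS.get(msg.topic())
    if handler is None:
        raise KeyError(f"no handler registered for topic {msg.topic()!r}")
    return handler(msg.value())


class FakeMessage:
    """Stand-in with the same topic()/value()/error() methods, for trying this without a broker."""

    def __init__(self, topic: str, value: bytes):
        self._topic = topic
        self._value = value

    def topic(self) -> str:
        return self._topic

    def value(self) -> bytes:
        return self._value

    def error(self):
        return None


print(dispatch(FakeMessage("randomtopic2", b"\x00\x01")))
```

With the consumer from the question, the same function could be called as dispatch(consumer.poll(1.0)) inside the polling loop.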