I am trying to consume Avro messages from Kafka in Python. We have this working in Java, but when I try to consume in a Jupyter notebook, parsing does not work. I followed the example given in the documentation (I've removed the connection details for security reasons):
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import DeserializingConsumer

conf_schema = {
    'url': 'https:',
    'basic.auth.user.info': ':'
}

Schema_registry_client = SchemaRegistryClient(conf_schema)

config = {
    'bootstrap.servers': '',
    'security.protocol': '',
    'sasl.username': '',
    'sasl.password': '',
    'ssl.endpoint.identification.algorithm': 'https',
    'sasl.mechanism': 'PLAIN',
    'group.id': '',
    'auto.offset.reset': 'earliest',
    'key.deserializer': '',
    'value.deserializer': key_avro_deserializer,
    'ssl.endpoint.identification.algorithm': 'https'
}

c = DeserializingConsumer(config)
c.subscribe(['skyjack-export-aemp-20-external'])

import io

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

total_count = 0
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
This is the error I got:
KafkaError{code=_KEY_DESERIALIZATION,val=-160,str="'str' object is not callable"}
It is worth mentioning that I cannot directly put schema and Avro config in Kafka, similar to what I did in Java.
Answer
Your key deserialization is failing, not the Avro value deserialization. You have set an empty string as your key deserializer, and a string is neither a function nor a Deserializer instance.
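As a minimal sketch of the fix (assuming your record keys are plain UTF-8 strings, and assuming you build an AvroDeserializer for the value as in the full example below), both deserializer entries in your config need to be callables or Deserializer instances, not strings:

from confluent_kafka.serialization import StringDeserializer

# Assumption: keys are plain UTF-8 strings; use a different deserializer
# if your keys are Avro-encoded as well.
string_deserializer = StringDeserializer('utf_8')

config['key.deserializer'] = string_deserializer   # a Deserializer instance, not ''
config['value.deserializer'] = avro_deserializer   # AvroDeserializer built from the Schema Registry client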
In any case, look at the example code:
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

sr_conf = {'url': args.schema_registry}
schema_registry_client = SchemaRegistryClient(sr_conf)

avro_deserializer = AvroDeserializer(schema_registry_client,
                                     schema_str,
                                     dict_to_user)
string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                 'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': args.group,
                 'auto.offset.reset': "earliest"}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

while True:
    try:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        user = msg.value()
        if user is not None:
            print(user)
    except KeyboardInterrupt:
        break

consumer.close()
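For reference, dict_to_user in that example is just a from_dict callback that turns the decoded dict into an application object. A minimal sketch (the class and field names here are illustrative, not taken from your schema):

class User:
    def __init__(self, name=None, favorite_number=None, favorite_color=None):
        self.name = name
        self.favorite_number = favorite_number
        self.favorite_color = favorite_color

def dict_to_user(obj, ctx):
    # Called by AvroDeserializer after decoding; obj is the decoded dict,
    # ctx is a SerializationContext.
    if obj is None:
        return None
    return User(name=obj['name'],
                favorite_number=obj['favorite_number'],
                favorite_color=obj['favorite_color'])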
"cannot directly put schema and Avro config in Kafka, similar to what I did in Java."

I assume you mean "in Python" rather than "in Kafka"? Sure, you can. The example above loads an .avsc file.
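As a minimal sketch of that (the filename user.avsc and the registry URL are placeholders; the from_dict callback is optional):

from pathlib import Path
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Placeholder path: point this at your own .avsc schema file.
schema_str = Path("user.avsc").read_text()

schema_registry_client = SchemaRegistryClient({'url': 'https://<your-schema-registry>'})
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)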