Consuming Kafka Avro messages in Python

I am trying to consume Avro messages from Kafka in Python. We have this working in Java, but when I try to consume in a Jupyter notebook, parsing does not work. I followed the example given in the documentation (I've removed connection configuration for security reasons):

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import DeserializingConsumer


conf_schema = { 'url' : 'https:',  
                'basic.auth.user.info' : ':'}

Schema_registry_client = SchemaRegistryClient(conf_schema)

config = {  'bootstrap.servers' : '',
            'security.protocol' : '',
            'sasl.username' : '',
            'sasl.password' : '',
            'ssl.endpoint.identification.algorithm' : 'https',
            'sasl.mechanism' : 'PLAIN',
            'group.id' : '',
            'auto.offset.reset' : 'earliest',
            'key.deserializer' : '',
            'value.deserializer' : key_avro_deserializer
            }

c = DeserializingConsumer(config)
c.subscribe(['skyjack-export-aemp-20-external'])


import io

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

total_count = 0
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

This is the error I got:

KafkaError{code=_KEY_DESERIALIZATION,val=-160,str="'str' object is not callable"}

It is worth mentioning that I cannot directly put schema and Avro config in Kafka, similar to what I did in Java.


Answer

Your key deserialization is failing, not the Avro value deserialization. You have set an empty string as your key deserializer, which is neither a function nor a Deserializer instance.
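To see why that config value matters: the DeserializingConsumer treats whatever is configured under key.deserializer as a callable and invokes it on the raw key bytes. A minimal sketch of what goes wrong when that value is an empty string (the byte literal here is just an illustration):

```python
# The consumer invokes the configured key deserializer on the raw key
# bytes of every message. An empty string is not callable, which produces
# exactly the error shown in the question.
key_deserializer = ''  # what the question's config sets

try:
    # The consumer effectively does this for each message key:
    key_deserializer(b'raw-key-bytes', None)
except TypeError as e:
    print(e)  # 'str' object is not callable
```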

In any case, look at the example code:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

sr_conf = {'url': args.schema_registry}
schema_registry_client = SchemaRegistryClient(sr_conf)

avro_deserializer = AvroDeserializer(schema_registry_client,
                                     schema_str,
                                     dict_to_user)
string_deserializer = StringDeserializer('utf_8')

consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                 'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': args.group,
                 'auto.offset.reset': "earliest"}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])

while True:
    try:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        user = msg.value()
        if user is not None:
            print(user)
    except KeyboardInterrupt:
        break

consumer.close()

"cannot directly put schema and Avro config in Kafka, similar to what I did in Java"

I assume you mean "in Python" rather than "in Kafka"? Sure, you can. The example loads an .avsc file.
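As a sketch of that approach: an .avsc file is plain JSON, so it can be read and sanity-checked with the standard library before the schema string is handed to AvroDeserializer. The schema below is a made-up example, and the commented-out file name and deserializer wiring are assumptions for illustration:

```python
import json

# Hypothetical contents of a user.avsc file; normally you would do
# schema_str = open('user.avsc').read()
schema_str = """{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "long"}
  ]
}"""

# An .avsc file is plain JSON, so validate it before wiring up the consumer:
schema = json.loads(schema_str)
print(schema["name"])  # User

# With confluent-kafka installed, the same string is passed straight in:
# avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)
```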

User contributions licensed under: CC BY-SA