
Confused about encoding and decoding with Kafka Connect

Here’s a high-level view of my pipeline:

MQTT -> Kafka/MQTT bridge [producer] -> Kafka Connect -> AWS Kinesis data stream -> Lambda function (decode kinesis.record.data) -> store in DynamoDB

Basically, the full flow is working properly, but I’m getting a weird “JSON” format when I try to read kinesis.record.data.

It isn’t UTF-8. It contains UTF-8-encoded, quoted-string-encoded JSON records separated by binary headers (so it looks like JSON that has gone through something like a quoted-printable filter and then been packed into a binary format). I don’t think it’s anything Kinesis-specific, and I’ve been wasting countless hours debugging; it has to come from the Kafka producer or from something in between, i.e. Kafka Connect…

Here’s the Python code I’m running to grab my metrics from various MQTT topics and send each message to my Kafka producer, using kafka-python:

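The snippet isn’t shown above, so here is a minimal sketch of that kind of bridge, assuming paho-mqtt for the MQTT subscription and kafka-python’s KafkaProducer with a JSON value serializer (broker address, MQTT host and topic names are placeholders, not the actual values from the question):

# Illustrative MQTT -> Kafka bridge (assumed libraries: paho-mqtt, kafka-python).
import json

import paho.mqtt.client as mqtt
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    # Serialize each message as a UTF-8 JSON string, matching the
    # "JSON as a string" approach described in the question.
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def on_message(client, userdata, msg):
    # Assume the MQTT payload is itself JSON; parse it and forward it to Kafka.
    metric = json.loads(msg.payload)
    producer.send("metrics", metric)

mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883)
mqtt_client.subscribe("sensors/#")
mqtt_client.loop_forever()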

And I’m running Kafka Connect in distributed mode (connect-distributed) with these worker properties:

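The worker properties aren’t shown either; the part that matters for this question is the converter configuration, which in connect-distributed.properties usually looks something like the lines below (illustrative only, the rest of the worker config is omitted):

# Converter settings in connect-distributed.properties (illustrative).
# StringConverter passes the JSON payload through as an opaque string;
# JsonConverter parses it, and schemas.enable=false drops the schema/payload envelope.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schemas.enable=false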

I’ve read a bit about encoding and decoding in Kafka Connect, but I’m new to this. As you can see from my Python code, I’m successfully sending my JSON as a string (in the properties file I also tried setting the converter to JsonConverter, with no success)…

And here’s my connector config:

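For the awslabs kinesis-kafka-connector (linked in the answer below), a Kinesis Streams sink registered through the Connect REST API would look roughly like this; stream name, region and topic are placeholders, and the property names are taken from that connector’s README:

{
  "name": "kinesis-sink",
  "config": {
    "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
    "tasks.max": "1",
    "topics": "metrics",
    "region": "us-east-1",
    "streamName": "my-kinesis-stream"
  }
}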

Finally, when I try reading kinesis.record.data, this is what I get:

[kinesis.record.data dump omitted: quoted JSON strings interleaved with non-UTF-8 header bytes]

Here’s the log:

[log output omitted]

I’m 110% sure that the weird characters represent the stream record data structure, for example something along these lines:

[example record structure omitted]

What am I missing here?


Answer

Based on the printed payload, the data is being batched together into the Kinesis records by the connector, in some custom binary format.

You’d probably want individual Kinesis records that you can parse and insert row-by-row into your downstream systems, so you can set:

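The exact line isn’t preserved here, but the setting in question is presumably the connector’s record-aggregation flag documented in the README below, i.e. something like:

# In the Kinesis Streams connector properties: turn off record aggregation so each
# Kafka record becomes its own Kinesis record instead of part of an aggregated binary blob.
aggregation=false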

https://github.com/awslabs/kinesis-kafka-connector#kafka-kinesis-streams-connectorproperties

Then JSON.parse(payload) in the lambda should work.
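If the Lambda were written in Python instead of Node, the equivalent decoding step would look roughly like this (Kinesis delivers each record’s data base64-encoded inside the event):

import base64
import json

def handler(event, context):
    # Each Kinesis record arrives base64-encoded under event["Records"][i]["kinesis"]["data"].
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        item = json.loads(payload)  # plain JSON once aggregation is disabled
        # ... write `item` to DynamoDB here ...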

If you did want to batch records together, you can do that in Python:

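The original example isn’t shown; as a sketch, one way is to accumulate messages and send them as a single JSON array (batch size, topic and broker address are arbitrary):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

batch = []

def add_metric(metric, batch_size=10):
    # Collect records client-side and ship them as one JSON array once the batch
    # is full, so any batching is explicit and the payload stays plain JSON.
    batch.append(metric)
    if len(batch) >= batch_size:
        producer.send("metrics", list(batch))
        batch.clear()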