
Confused with encoding and decoding with Kafka connect

Here’s a high-level view of my pipeline:

MQTT -> Kafka/MQTT bridge [producer] -> Kafka connect -> AWS Kinesis data stream -> Lambda function (decode Kinesis.record.data) -> store DynamoDB

Basically, the full flow works, but I'm getting a weird "JSON" format when I try to read kinesis.record.data.

It isn't plain UTF-8. It contains UTF-8-encoded, quoted-string-encoded JSON records separated by binary headers (it looks like JSON that has gone through a quoted-printable filter and then been packed into a binary format). I don't think it's anything Kinesis-specific, and I've wasted countless hours debugging. It has to come from the Kafka producer or from something in between, such as Kafka Connect…

Here's the Python code I'm running to grab my metrics from various MQTT topics and send each message to my Kafka producer, using kafka-python:

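import json
from datetime import datetime

from kafka import KafkaProducer

# Serialize every value as UTF-8 encoded JSON before sending it to Kafka
kafka_producer = KafkaProducer(bootstrap_servers="10.0.0.129:9092",
                               value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# msg is the incoming MQTT message (e.g. inside an MQTT client's on_message callback)
data = {
    "datetime": str(datetime.now()),
    "topic": str(msg.topic),
    "value": str(msg.payload.decode()),
    "environment": "test-v1"
}
kafka_producer.send("greenforge-events", data)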
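Because value_serializer is an ordinary function, you can check in isolation what the producer actually hands to Kafka. A minimal sketch that applies the same serialization logic to one record copied from the decoded log further down (no Kafka broker needed):

```python
import json


def serialize(v):
    """Same logic as the producer's value_serializer: dict -> UTF-8 JSON bytes."""
    return json.dumps(v).encode("utf-8")


# One record copied from the decoded log in this question
sample = {
    "datetime": "2022-10-11 14:46:16.158318",
    "topic": "greenforge/uuid/utility1/casingtemperature",
    "value": '{"celsius": 8}',  # MQTT payload kept as a string, as in the code above
    "environment": "test-v1",
}

raw = serialize(sample)
print(raw[:14])                   # b'{"datetime": "'
print(raw.isascii())              # True: plain JSON text, no binary header
print(json.loads(raw) == sample)  # True: round-trips cleanly
```

The serialized bytes are plain JSON text, so the binary framing is not added here. The escaped quotes you see around "celsius" come from "value" being a JSON string nested inside JSON, not from any binary encoding.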

And I'm running Kafka Connect in distributed mode with the following worker properties:

bootstrap.servers=PLAINTEXT://kafka:9092
group.id=connect-cluster
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins

I've read a bit about encoding and decoding in Kafka Connect, but I'm new to this. As you can see from my Python code, I'm successfully sending my JSON (as a string; in the properties file I also tried setting the converters to JsonConverter, with no success)…

And here’s my connector config:

name=greenforges
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=greenforge-events
region=us-east-2
streamName=greenforge-prototype-stream
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from being put onto the Kinesis Producer once
# the threshold for outstanding records has been reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms) 
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
# Not all Kinesis Producer configuration options are exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

Finally, when I try reading the kinesis.record.data this is what I get:

const stream = "84mawgoBMBqfAQgAGpoBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE0MzY0OSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlycHJlc3N1cmUiLCAidmFsdWUiOiAie1wiaGVjdG8gcGFzY2Fsc1wiOiA2NDJ9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqdAQgAGpgBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE1ODMxOCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvY2FzaW5ndGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xNzc3MzgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xODUwNjAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMTkyNzg4IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqjAQgAGp4BeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjIwNTU3NiIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTkvaGlnaGxldmVsZmxvYXRzd2l0Y2giLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiB0cnVlfSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anwEIABqaAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjIyMDAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L2FpcnByZXNzdXJlIiwgInZhbHVlIjogIntcImhlY3RvIHBhc2NhbHNcIjogNzQ4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0argEIABqpAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjc4ODgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk2L2Nhc2luZ3RlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogMTUuMDMzODQ4NDg3NTEwNzY5fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anQEIABqYAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yNjU4ODIiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L3NvbGVub2lkdmFsdmUiLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiBmYWxzZX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12
MSJ9Gq8BCAAaqgF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMjcxMTg2IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5OS9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IC0yLjk5NDY5NTUwNzkxNjc0MDh9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqsAQgAGqcBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjE3MTI0NCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiAtMTMuMjE1NTQ4ODAzNjQxMTU3fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xNzc5OTMiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xODU2NzAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMTkzMTUzIiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqqAQgAGqUBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjIxOTIwMSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTYvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA0NS44MDQ0OTA2MDc4OTEwNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9Gq4BCAAaqQF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMjI5OTc3IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5Ni9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IDE1LjAzMzg0ODQ4NzUxMDc2OX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ90UczjB09DYqhb40gF9vKEA==";
        
var payload = Buffer.from(stream, 'base64').toString('utf8');
console.log('Decoded payload: %s', payload);

Here’s the log:

Decoded payload: ��
0��
{
    "datetime": "2022-10-11 14:46:16.143649",
    "topic": "greenforge/uuid/utility1/airpressure",
    "value": "{"hecto pascals": 642}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.158318",
    "topic": "greenforge/uuid/utility1/casingtemperature",
    "value": "{"celsius": 8}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.177738",
    "topic": "greenforge/uuid/utility1/co2",
    "value": "{"parts per million": 1346}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.185060",
    "topic": "greenforge/uuid/utility3/airtemperature",
    "value": "{"celsius": -22.902578536320426}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.192788",
    "topic": "greenforge/uuid/utility3/airpressure",
    "value": "{"hecto pascals": 1054}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.205576",
    "topic": "greenforge/uuid/utility9/highlevelfloatswitch",
    "value": "{"enabled": true}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.222200",
    "topic": "greenforge/uuid/utility9/airpressure",
    "value": "{"hecto pascals": 748}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.227888",
    "topic": "greenforge/uuid/utility6/casingtemperature",
    "value": "{"celsius": 15.033848487510769}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.265882",
    "topic": "greenforge/uuid/utility9/solenoidvalve",
    "value": "{"enabled": false}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:16.271186",
    "topic": "greenforge/uuid/utility9/casingtemperature",
    "value": "{"celsius": -2.9946955079167408}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.171244",
    "topic": "greenforge/uuid/utility1/airtemperature",
    "value": "{"celsius": -13.215548803641157}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.177993",
    "topic": "greenforge/uuid/utility1/co2",
    "value": "{"parts per million": 1346}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.185670",
    "topic": "greenforge/uuid/utility3/airtemperature",
    "value": "{"celsius": -22.902578536320426}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.193153",
    "topic": "greenforge/uuid/utility3/airpressure",
    "value": "{"hecto pascals": 1054}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.219201",
    "topic": "greenforge/uuid/utility6/airtemperature",
    "value": "{"celsius": 45.80449060789106}",
    "environment": "test-v1"
}
��
{
    "datetime": "2022-10-11 14:46:17.229977",
    "topic": "greenforge/uuid/utility6/casingtemperature",
    "value": "{"celsius": 15.033848487510769}",
    "environment": "test-v1"
}
�G3�=
��o� ��

I'm 110% sure that the weird characters represent the stream record's data structure, for example something along these lines:

(
 topic='greenforge-events',
 partition=0,
 offset=24,
 timestamp=21321421312312,
 key=None,
 value={JSON_Object},
 checksum=321321,
 serialized_key_size=-1,
 serialized_value_size=49
)

What am I missing here?


Answer

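Based on the printed payload, the records are being batched together before they reach Kinesis: with aggregation=true, the Kinesis Producer Library (KPL) used by the connector packs many Kafka records into a single Kinesis record using its own binary aggregation format. The JSON documents themselves come through intact.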
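You can confirm this from the data itself. In the KPL aggregation format, an aggregated record starts with the 4-byte magic number F3 89 9A C2, followed by a protobuf message and a 16-byte MD5 checksum. A short sketch that decodes only the first 24 base64 characters of the stream string from the question:

```python
import base64

# First 24 characters of the `stream` string from the question (24 base64 chars = 18 bytes)
prefix = "84mawgoBMBqfAQgAGpoBeyJk"
raw = base64.b64decode(prefix)

print(raw[:4].hex())    # f3899ac2  -> KPL magic number
print(raw[4:15].hex())  # 0a01301a9f0108001a9a01  -> protobuf framing
print(raw[15:])         # b'{"d'  -> start of the first JSON document
```

Read against the KPL's protobuf definition, the framing bytes are: 0a 01 30 (partition key table entry "0"), 1a 9f 01 (a nested record of 159 bytes), 08 00 (partition key index 0) and 1a 9a 01 (154 bytes of data, i.e. the first JSON document). Those are the "weird characters" between the JSON objects in the log.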

You’d probably want individual Kinesis records that you can parse and insert row-by-row into your downstream systems, so you can set

aggregation=false

https://github.com/awslabs/kinesis-kafka-connector#kafka-kinesis-streams-connectorproperties

Then JSON.parse(payload) in the Lambda function should work.
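For reference, here is roughly what that Lambda side looks like once aggregation=false, written in Python rather than Node.js for illustration. It is a minimal sketch, assuming each Kinesis record now holds exactly one JSON document from the producer; the handler name and the decision to also parse the nested "value" string are illustrative choices, not part of the connector:

```python
import base64
import json


def handler(event, context):
    """Turn each Kinesis record in a Lambda event into a dict (assumes aggregation=false)."""
    items = []
    for record in event["Records"]:
        # Lambda delivers the Kinesis payload base64-encoded
        raw = base64.b64decode(record["kinesis"]["data"])
        item = json.loads(raw)
        # The producer stored the MQTT payload as a JSON *string*; parse it if possible
        if isinstance(item.get("value"), str):
            try:
                item["value"] = json.loads(item["value"])
            except ValueError:
                pass  # leave non-JSON payloads as plain strings
        items.append(item)
    # Hypothetical next step: write `items` to DynamoDB, e.g. with boto3
    return items
```

If you write the result with boto3's DynamoDB resource, note that it rejects Python floats; passing parse_float=decimal.Decimal to json.loads is one common workaround.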

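If you do want to batch records together, you can do that yourself in Python before producing to Kafka, so that each Kafka message (and, with aggregation=false, each Kinesis record) carries a JSON array of events: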

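from datetime import datetime

kafka_producer = ...  # same KafkaProducer (with the JSON value_serializer) as in the question

BATCH_SIZE = 10  # for example, 10 events per Kafka message
events = []
for msg in mqtt_consumer:  # mqtt_consumer: your iterable of incoming MQTT messages
    events.append({
        "datetime": str(datetime.now()),
        "topic": msg.topic,
        "value": msg.payload.decode(),
        "environment": "test-v1"
    })
    if len(events) == BATCH_SIZE:
        kafka_producer.send("greenforge-events", events)
        events = []  # start a fresh list instead of mutating the one just sent
if events:  # send the remainder once the loop ends
    kafka_producer.send("greenforge-events", events)
kafka_producer.flush()  # block until all buffered messages are delivered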
User contributions licensed under: CC BY-SA