Here’s a high-level view of my pipeline:
MQTT -> Kafka/MQTT bridge [producer] -> Kafka Connect -> AWS Kinesis Data Stream -> Lambda function (decode kinesis.record.data) -> store in DynamoDB
Basically, the full flow is working properly, but I'm getting a weird "JSON" format when I try to read kinesis.record.data.
It isn't UTF-8. It contains UTF-8-encoded, quoted-string-encoded JSON records separated by binary headers (so it looks like JSON that has gone through a quoted-printable-style filter and then been packed into a binary format). I don't think it's anything Kinesis-specific, and I've been wasting countless hours debugging; it has to come from the Kafka producer or somewhere in between, like Kafka Connect…
Here's the Python code I'm running to grab my metrics from various MQTT topics and send each message to Kafka, using kafka-python:
from datetime import datetime
import json
from kafka import KafkaProducer

kafka_producer = KafkaProducer(
    bootstrap_servers="10.0.0.129:9092",
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# msg comes from the MQTT on_message callback
data = {
    "datetime": str(datetime.now()),
    "topic": str(msg.topic),
    "value": str(msg.payload.decode()),
    "environment": "test-v1"
}

kafka_producer.send("greenforge-events", data)
And I'm running Kafka Connect in distributed mode (connect-distributed) with the following worker properties:
bootstrap.servers=PLAINTEXT://kafka:9092
group.id=connect-cluster

#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka/plugins
I've read a bit about encoding and decoding in Kafka Connect, but I'm new to this. As you can see from my Python code, I'm successfully sending my JSON (as a string; in the worker properties I also tried JsonConverter, with no success)…
And here’s my connector config:
name=greenforges
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=greenforge-events
region=us-east-2
streamName=greenforge-prototype-stream
usePartitionAsHashKey=false
flushSync=true

# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true

# Whether to block new records from being put onto the Kinesis Producer if
# the threshold for outstanding records has been reached
pauseConsumption=true
outstandingRecordsThreshold=500000

# If outstanding records on producers are beyond threshold, sleep for the following period (in ms)
sleepPeriod=1000

# If outstanding records on producers are not cleared, sleep for the following cycles before killing the tasks
sleepCycles=10

# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
# Not all Kinesis producer configuration options have been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true
Finally, when I try reading kinesis.record.data, this is what I get:
const stream = "84mawgoBMBqfAQgAGpoBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE0MzY0OSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlycHJlc3N1cmUiLCAidmFsdWUiOiAie1wiaGVjdG8gcGFzY2Fsc1wiOiA2NDJ9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqdAQgAGpgBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjE1ODMxOCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvY2FzaW5ndGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xNzc3MzgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4xODUwNjAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMTkyNzg4IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqjAQgAGp4BeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE2LjIwNTU3NiIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTkvaGlnaGxldmVsZmxvYXRzd2l0Y2giLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiB0cnVlfSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anwEIABqaAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjIyMDAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L2FpcnByZXNzdXJlIiwgInZhbHVlIjogIntcImhlY3RvIHBhc2NhbHNcIjogNzQ4fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0argEIABqpAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yMjc4ODgiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk2L2Nhc2luZ3RlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogMTUuMDMzODQ4NDg3NTEwNzY5fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anQEIABqYAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNi4yNjU4ODIiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHk5L3NvbGVub2lkdmFsdmUiLCAidmFsdWUiOiAie1wiZW5hYmxlZFwiOiBmYWxzZX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9Gq8BCAAaqgF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTYuMjcxMTg2IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5OS9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IC0yLjk5NDY5NTUwNzkxNjc0MDh9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqsAQgAGqcBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjE3MTI0NCIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTEvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiAtMTMuMjE1NTQ4ODAzNjQxMTU3fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0anAEIABqXAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xNzc5OTMiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkxL2NvMiIsICJ2YWx1ZSI6ICJ7XCJwYXJ0cyBwZXIgbWlsbGlvblwiOiAxMzQ2fSIsICJlbnZpcm9ubWVudCI6ICJ0ZXN0LXYxIn0arAEIABqnAXsiZGF0ZXRpbWUiOiAiMjAyMi0xMC0xMSAxNDo0NjoxNy4xODU2NzAiLCAidG9waWMiOiAiZ3JlZW5mb3JnZS91dWlkL3V0aWxpdHkzL2FpcnRlbXBlcmF0dXJlIiwgInZhbHVlIjogIntcImNlbHNpdXNcIjogLTIyLjkwMjU3ODUzNjMyMDQyNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9GqABCAAamwF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMTkzMTUzIiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5My9haXJwcmVzc3VyZSIsICJ2YWx1ZSI6ICJ7XCJoZWN0byBwYXNjYWxzXCI6IDEwNTR9IiwgImVudmlyb25tZW50IjogInRlc3QtdjEifRqqAQgAGqUBeyJkYXRldGltZSI6ICIyMDIyLTEwLTExIDE0OjQ2OjE3LjIxOTIwMSIsICJ0b3BpYyI6ICJncmVlbmZvcmdlL3V1aWQvdXRpbGl0eTYvYWlydGVtcGVyYXR1cmUiLCAidmFsdWUiOiAie1wiY2Vsc2l1c1wiOiA0NS44MDQ0OTA2MDc4OTEwNn0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ9Gq4BCAAaqQF7ImRhdGV0aW1lIjogIjIwMjItMTAtMTEgMTQ6NDY6MTcuMjI5OTc3IiwgInRvcGljIjogImdyZWVuZm9yZ2UvdXVpZC91dGlsaXR5Ni9jYXNpbmd0ZW1wZXJhdHVyZSIsICJ2YWx1ZSI6ICJ7XCJjZWxzaXVzXCI6IDE1LjAzMzg0ODQ
4NzUxMDc2OX0iLCAiZW52aXJvbm1lbnQiOiAidGVzdC12MSJ90UczjB09DYqhb40gF9vKEA==";
var payload = Buffer.from(stream, 'base64').toString('utf8');
console.log('Decoded payload: %s', payload);
Here’s the log:
Decoded payload: �� 0��
{ "datetime": "2022-10-11 14:46:16.143649", "topic": "greenforge/uuid/utility1/airpressure", "value": "{"hecto pascals": 642}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.158318", "topic": "greenforge/uuid/utility1/casingtemperature", "value": "{"celsius": 8}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.177738", "topic": "greenforge/uuid/utility1/co2", "value": "{"parts per million": 1346}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.185060", "topic": "greenforge/uuid/utility3/airtemperature", "value": "{"celsius": -22.902578536320426}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.192788", "topic": "greenforge/uuid/utility3/airpressure", "value": "{"hecto pascals": 1054}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.205576", "topic": "greenforge/uuid/utility9/highlevelfloatswitch", "value": "{"enabled": true}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.222200", "topic": "greenforge/uuid/utility9/airpressure", "value": "{"hecto pascals": 748}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.227888", "topic": "greenforge/uuid/utility6/casingtemperature", "value": "{"celsius": 15.033848487510769}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.265882", "topic": "greenforge/uuid/utility9/solenoidvalve", "value": "{"enabled": false}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:16.271186", "topic": "greenforge/uuid/utility9/casingtemperature", "value": "{"celsius": -2.9946955079167408}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.171244", "topic": "greenforge/uuid/utility1/airtemperature", "value": "{"celsius": -13.215548803641157}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.177993", "topic": "greenforge/uuid/utility1/co2", "value": "{"parts per million": 1346}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.185670", "topic": "greenforge/uuid/utility3/airtemperature", "value": "{"celsius": -22.902578536320426}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.193153", "topic": "greenforge/uuid/utility3/airpressure", "value": "{"hecto pascals": 1054}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.219201", "topic": "greenforge/uuid/utility6/airtemperature", "value": "{"celsius": 45.80449060789106}", "environment": "test-v1" } ��
{ "datetime": "2022-10-11 14:46:17.229977", "topic": "greenforge/uuid/utility6/casingtemperature", "value": "{"celsius": 15.033848487510769}", "environment": "test-v1" } �G3�= ��o� ��
I'm 110% sure that the weird characters represent the stream record's data structure, for example something along these lines:
(
  topic='greenforge-events',
  partition=0,
  offset=24,
  timestamp=21321421312312,
  key=None,
  value={JSON_Object},
  checksum=321321,
  serialized_key_size=-1,
  serialized_value_size=49
)
What am I missing here?
Answer
Based on the printed payload, your messages are being batched together into single Kinesis records in a custom binary format. That is the Kinesis Producer Library (KPL) aggregation format, which the connector uses because of your aggregation=true setting.
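You can even check this against your dump: the first bytes of the base64 payload decode to the KPL aggregated-record magic number (0xF3 0x89 0x9A 0xC2), followed by protobuf-framed records. For instance, decoding just the start of your string:

console.log(Buffer.from('84mawgo=', 'base64'));
// <Buffer f3 89 9a c2 0a> -- f3 89 9a c2 is the KPL aggregation magic number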
You’d probably want individual Kinesis records that you can parse and insert row-by-row into your downstream systems, so you can set
aggregation=false
https://github.com/awslabs/kinesis-kafka-connector#kafka-kinesis-streams-connectorproperties
Then JSON.parse(payload) in the Lambda should work.
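As a minimal sketch, assuming a Node.js Lambda triggered by the Kinesis stream (the handler shape is the standard Kinesis event; the DynamoDB write is left as a stub):

exports.handler = async (event) => {
  for (const record of event.Records) {
    // Kinesis hands the record data to Lambda as base64-encoded bytes
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    // with aggregation=false, each record is exactly one JSON message
    const data = JSON.parse(payload);
    console.log('Decoded record: %j', data);
    // ...put `data` into DynamoDB here
  }
};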
If you did want to batch records together, you could do that on the Python side instead:
kafka_producer = ...

counter = 0
events = []

for msg in mqtt_consumer:
    data = {
        "datetime": str(datetime.now()),
        "topic": msg.topic,
        "value": msg.payload.decode(),
        "environment": "test-v1"
    }
    events.append(data)
    counter += 1

    if counter % 10 == 0:  # for example, 10 at a time
        kafka_producer.send("greenforge-events", events)
        events.clear()
        counter = 0

if events:  # send any remainder once the loop ends
    kafka_producer.send("greenforge-events", events)
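Note that with this approach each Kinesis record then carries a JSON array rather than a single object, so the Lambda side would parse and loop, along these lines (again a sketch):

const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
for (const data of JSON.parse(payload)) {
  // each element is one of the batched MQTT messages
  console.log('%j', data);
}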