Skip to content
Advertisement

Kafka with python: How to send topic to postgreSQL?

I am urged to use Kafka with python. Moreover, I need to develop a very simple producer-consumer application that reads metrics from a device in real-time and then publishes them to a topic ‘metrics’ in Kafka. Then a consumer must subscribe to the ‘metrics’ topic and store those data to a postgreSQL database.

I tried to draw the architecture here:

           +-----------+        Fetch metrics every 1 second          +--------------+                                           
           |Biometric  |     {heartrate, oxygen level, temprature}     |              |                                           
           |generation ------------------------------------------------  producer.py |                                           
           |device     |                                              |              |                                           
           +-----------+                                              +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |Publish metrics in "metrics" topic, every 1 second
                                                                              |{heartrate, oxygen level, tempature}              
                                                                              |         JSON format                              
                                                                              |                                                  
                                                                              |                                                  
                                                                      +-------|------+                                           
                                                                      |              |                                           
                                                                      |    KAFKA     |                                           
                                                                      |              |                                           
                                                                      +-------|------+                                           
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              |                                                  
                                                                              | Subscribe to "metrics" topic and fetch           
                 -                                                            | the JSON every 1 second                          
                                                                              |                                                  
          +-------------+                                              +------|------+                                           
          |             |            Send data to postgreSQL           |             |                                           
          | postgreSQL  ------------------------------------------------ consumer.py |                                           
          |             |                                              |             |                                           
          +-------------+                                              +-------------+                                           

Now, this is how I (with zero Kafka experience) have imagined this app. I managed to get everything to the consumer.

It is very easy for me now to connect to a postgreSQL database and send those data to it. But I am confused. I read everywhere that the connection to a such database must occur through a Kafka Connector (?). Is it wrong to just send the data I receive in the consumer to postgres manually? Why would I use a ‘Kafka connector’ here? At last, I am not aware of any python kafka connectors, which complicates this even more for me.

Could someone help me clear things up?

Advertisement

Answer

If you want to push data to kafka in JSON format I recently wrote a simple example over here.

You can also find the kafka python docs

For the Kafka -> PostgreSQL connection, you might want to use Kafka Connect JDBC sink. Kafka Connect is a series of pre-built connector that allows you to push or pull (source or sink in kafka connect terms) data from Kafka by just writing a config file, without having to code or re-invent the wheel over and over again. Kafka connect is NOT language dependant, since all you need is to deploy it in your Kafka environment and set correctly the config file.

Just pay attention, if you’re planning to use Kafka connect to push data to PostgreSQL, you might need either

  • to create the source stream in AVRO format
  • to add the schema specification to your JSON message (more info here
User contributions licensed under: CC BY-SA
5 People found this is helpful
Advertisement