
How to pass the PostgreSQL query result into a variable in Airflow? (Postgres Operator or Postgres Hook)

I’m planning to use PostgreSQL as the provider of my task metadata, so I want to run a few queries, get some data, and pass it as a populated variable to another task. The problem is that when I use PostgresHook I do get the data, but it stays inside a Python function that I can’t access from other tasks. All I see is the line below in the log:

[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]

here is part of my code:

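import json

from airflow.providers.postgres.hooks.postgres import PostgresHook
from bson import json_util  # assumed source of json_util (ships with pymongo)


def _query_postgres(**context):
    """
    Queries Postgres and returns the rows as a JSON string.
    """

    postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
    conn = postgres.get_conn()
    cursor = conn.cursor()
    # execute() returns None; the rows are read from the cursor afterwards
    cursor.execute(" SELECT * FROM public.aramis_meta_task; ")

    # iterate over the cursor to get a list of row tuples
    details_dicts = [doc for doc in cursor]

    # serialize to json string
    details_json_string = json.dumps(details_dicts, default=json_util.default)

    # push under an explicit key; the return value is also stored as an XCom
    task_instance = context['task_instance']
    task_instance.xcom_push(key="my_value", value=details_json_string)
    return details_json_string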

But I don’t know which variable I should use to access it, or how to push it to XCom so that I can use the returned value as a parameter to another BashOperator task (Spark, for example).

PostgresOperator, on the other hand, only returns None as its result.

Answer

The trick behind XComs is that you push them in one task and pull them in another. If you want to use the XCom you pushed in the _query_postgres function in a BashOperator, you can use something like this:

puller = BashOperator(
    task_id="do_something_postgres_result",
    bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
    dag=dag,
)

You will need to replace the bash_command with the appropriate command, and set task_ids in xcom_pull() to the task_id of the task you created that invokes the _query_postgres function.
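The same pull also works from a Python task, which can be easier to debug than a templated bash command. The following is only a sketch: the function name use_postgres_result is made up for illustration, the task id is the placeholder used above, and it assumes Airflow 2, where a python_callable that declares a ti argument receives the running task instance.

```python
import json


def use_postgres_result(ti, upstream_task_id="query_postgres_task_id_here"):
    """Pull the JSON string pushed by the upstream task and decode it.

    Hypothetical helper: `ti` is the task instance Airflow passes to the
    callable, and `upstream_task_id` must match the task that runs
    _query_postgres.
    """
    raw = ti.xcom_pull(task_ids=upstream_task_id, key="my_value")
    if raw is None:
        # xcom_pull yields None when nothing was pushed under that key
        raise ValueError(
            f"No XCom 'my_value' found for task {upstream_task_id!r}"
        )
    # The upstream task pushed a JSON string, so decode it back into rows
    return json.loads(raw)
```

To use it, pass the function as the python_callable of a PythonOperator that runs downstream of the query task.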

Regarding PostgresOperator, it’s expected that it returns None. It’s not meant for data extraction (even if you run a SELECT query). The way you implemented that with the PostgresHook is okay.
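If you want a shorter version of the hook approach, something like the sketch below might work. It assumes the hook's get_records() method, which runs the query and returns the rows, and it swaps json_util.default for default=str so that dates and similar values are serialized as plain strings. The helper name rows_to_json is hypothetical, and the hook is passed in as an argument so the function can be tried without a database.

```python
import json


def rows_to_json(hook, sql):
    """Run `sql` through the given hook and return the rows as a JSON string.

    Hypothetical helper: `hook` is expected to be a PostgresHook, e.g.
    PostgresHook(postgres_conn_id="aramis_postgres_connection").
    """
    # get_records() executes the query and returns a list of row tuples
    rows = hook.get_records(sql)
    # default=str turns values json cannot encode (dates, Decimals) into strings
    return json.dumps(rows, default=str)
```

If the python_callable of your PythonOperator returns this string, Airflow stores it as an XCom under the key return_value, so the explicit xcom_push() becomes optional and the downstream task can call xcom_pull(task_ids=...) without a key.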

Some good sources to understand XComs:

  1. https://medium.com/analytics-vidhya/airflow-tricks-xcom-and-subdag-361ff5cd46ff
  2. https://precocityllc.com/blog/airflow-and-xcom-inter-task-communication-use-cases/
  3. https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py
User contributions licensed under: CC BY-SA