I’m planning to use PostgreSQL as my task meta info provider, so I want to run a few queries, get some data, and pass it as a filled variable to another task. The problem is that when I use PostgresHook I get the data, but it’s inside a Python method that I can’t access. In fact, I see the line below:
[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]
Here is part of my code:
```python
import json

from airflow.providers.postgres.hooks.postgres import PostgresHook
from bson import json_util


def _query_postgres(**context):
    """Queries Postgres and pushes the results to XCom as a JSON string."""
    postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
    conn = postgres.get_conn()
    cursor = conn.cursor()
    # execute() returns None; the rows are read from the cursor afterwards
    cursor.execute("SELECT * FROM public.aramis_meta_task;")
    # iterate over the cursor to get a list of rows (one tuple per row)
    details_dicts = [doc for doc in cursor]
    cursor.close()
    conn.close()
    # serialize to a JSON string; json_util.default handles types such as datetime
    details_json_string = json.dumps(details_dicts, default=json_util.default)
    task_instance = context["task_instance"]
    task_instance.xcom_push(key="my_value", value=details_json_string)
    return details_json_string
```
But I don’t know which variable I should use to access it, or how to push it to XCom so that I can use the returned value as a parameter to another BashOperator task (Spark, for example).
PostgresOperator, on the other hand, only returns None as a result.
Answer
The trick behind XComs is that you push them in one task and pull them in another. If you want to use the XCom you pushed in the _query_postgres function in a BashOperator, you can use something like this:
```python
puller = BashOperator(
    task_id="do_something_postgres_result",
    bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
    dag=dag,
)
```
You will need to replace the bash_command with the appropriate one, and change task_ids in xcom_pull() to the task_id of the task you created that invokes the _query_postgres function.
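One practical caveat, assuming the XCom holds the JSON string from the question: Jinja renders the pulled value verbatim into bash_command, so the shell then interprets its double quotes and spaces. The sketch below uses only Python's standard library (json, shlex) to show the effect; the row data is hypothetical, with an apostrophe included because it breaks naive single-quoting. shlex.quote wraps the string so it reaches the command as one argument. In a DAG, one possible (untested here) way to apply it is DAG(..., user_defined_filters={"shquote": shlex.quote}) together with {{ task_instance.xcom_pull(...) | shquote }} in the template.

```python
import json
import shlex

# Hypothetical rows shaped like the XCom value in the question's log line.
rows = [[1, "inf_account", "O'Brien's table"]]
payload = json.dumps(rows)  # the JSON string that would be pushed to XCom

# Raw interpolation: the shell splits on spaces and strips the double quotes.
naive_args = shlex.split("some-bash-command " + payload)

# Quoted interpolation: the whole JSON string arrives as a single argument.
safe_args = shlex.split("some-bash-command " + shlex.quote(payload))

# The receiving script (e.g. a Spark job) can parse its argument back into rows.
received_rows = json.loads(safe_args[1])
```

Here naive_args ends up as several mangled fragments, while safe_args holds the command name plus the original JSON string, which json.loads turns back into the same rows.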
Regarding PostgresOperator, it’s okay that it returns None. It’s not meant for data extraction (even if you run a SELECT query). The way you implemented that with the PostgresHook is okay.
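If you want the "list of dicts" that the comment in the question mentions, note that iterating a DB-API cursor yields tuples. The column names are available in cursor.description (part of the DB-API 2.0 spec, which psycopg2 implements), so each row can be zipped with them. This minimal sketch uses the standard-library sqlite3 module as a stand-in for the connection returned by PostgresHook.get_conn(), so it runs without a database server; the columns and data are hypothetical. With psycopg2 specifically, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) is another way to get dict-like rows.

```python
import json
import sqlite3


def rows_as_dicts(cursor):
    """Turn the remaining rows of a DB-API cursor into a list of dicts."""
    # description holds one sequence per column; item 0 is the column name
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# Stand-in for PostgresHook(...).get_conn(); hypothetical columns and data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE aramis_meta_task (id INTEGER, name TEXT)")
conn.execute("INSERT INTO aramis_meta_task VALUES (1, 'inf_account')")

cursor = conn.cursor()
cursor.execute("SELECT * FROM aramis_meta_task")
details = rows_as_dicts(cursor)
details_json_string = json.dumps(details)  # same shape you would push to XCom
conn.close()
```

Keyed rows make the downstream consumer independent of column order, at the cost of a slightly larger JSON payload.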
Some good sources to understand XComs: