
How to use DatabaseHook objects with PythonOperator in Airflow without running out of connections?

I’m trying to store my database credentials using Airflow Connections and use them with PythonOperators. I noticed that if I pass the credentials to the PythonOperator, every variable gets logged, including the database password. So I switched to passing the connection object itself to the PythonOperator, as in the example below.

But now Airflow creates a ton of these connection objects, even though this DAG is only scheduled to run daily, and I often hit the database's connection limit. How do I use a PostgresHook with a PythonOperator for a data script in Airflow without using a ton of connections?

import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

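try:
    sys.path.append('/path/to/my/awesome/module/')
    from awesome_module import function_1, function_2
except ImportError as err:
    raise ImportError("Couldn't import awesome_module") from err

# DAG definition (not shown in the original snippet; dag_id and
# start_date are placeholders). The DAG is scheduled to run daily.
dag = DAG(
    dag_id='awesome_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(days=1),
)

postgres_hook_object = PostgresHook(postgres_conn_id="dedicated_bot_account")

with postgres_hook_object.get_conn() as con:
    t1 = PythonOperator(
        task_id='function_1',
        python_callable=function_1,
        dag=dag,
        op_kwargs={'conn': con},
    )

    t2 = PythonOperator(
        task_id='function_2',
        python_callable=function_2,
        dag=dag,
        op_args=[con, service],  # `service` is not defined in this snippet
    )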


Answer

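From the Airflow Slack I learned that the top-level code in a DAG file runs every time the scheduler parses (refreshes) the file, not at the DAG's schedule. The scheduler re-parses DAG files far more often than once a day (how often is governed by settings such as `min_file_process_interval` in the `[scheduler]` config section), so the module-level `get_conn()` call opens a new connection on every parse.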

It seems like best practice is to ensure the connection is opened only at task runtime by either:

  1. If the task callables are defined in the DAG file, move the code that opens the connection inside the Python function itself.
  2. If the callables are defined elsewhere (e.g. in an imported module), have them open the connection themselves when the task runs. Pass only the connection ID (e.g. "dedicated_bot_account"), not the credentials: anything passed in plaintext as an argument will get logged.
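A minimal sketch of this approach, assuming the Airflow 1.10-era import path used in the question: the callable receives only the connection ID and opens (and closes) the connection itself at run time. The function body, the placeholder query, and the `hook_cls` parameter (added only so the sketch can be exercised without a real database) are hypothetical, not part of the original answer.

```python
def function_1(conn_id, hook_cls=None):
    """Task callable: open a connection at run time, use it, then close it."""
    if hook_cls is None:
        # Imported inside the callable so parsing the DAG file does not pay
        # for it. This is the Airflow 1.10 path used in the question.
        from airflow.hooks.postgres_hook import PostgresHook
        hook_cls = PostgresHook

    hook = hook_cls(postgres_conn_id=conn_id)
    conn = hook.get_conn()  # the connection is opened only now, inside the task
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")  # placeholder query
            result = cur.fetchone()
        conn.commit()
        return result
    finally:
        conn.close()  # release the connection when the task finishes
```

In the DAG file, the task would then be wired up as `PythonOperator(task_id='function_1', python_callable=function_1, op_kwargs={'conn_id': 'dedicated_bot_account'}, dag=dag)`, with no hook or connection created at module level. Only the connection ID shows up in the task's arguments, so the password stays out of the logs. The same function works for option 2 if it lives in `awesome_module` instead of the DAG file.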
User contributions licensed under: CC BY-SA