I’m trying to store my database credentials using Airflow Connections and use them with PythonOperators. I noticed that if I pass the credentials to the PythonOperator as arguments, every variable gets logged, including the database password. So I switched to passing the connection object itself to the PythonOperator, as in the example below.
But the issue I have now is that Airflow opens a lot of these connections, even though this DAG is only scheduled to run daily, so I frequently run into the database's connection limit. How do I use a PostgresHook with a PythonOperator without opening so many connections for a data script in Airflow?
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

try:
    sys.path.append('/path/to/my/awesome/module/')
    from awesome_module import function_1, function_2
except ImportError:
    raise ImportError("Couldn't import awesome_module")

# Connection is opened at the top level of the DAG file
postgres_hook_object = PostgresHook("dedicated_bot_account")
with postgres_hook_object.get_conn() as con:

    t1 = PythonOperator(
        task_id='function_1',
        python_callable=function_1,
        dag=dag,
        op_kwargs={'conn': con}
    )

    t2 = PythonOperator(
        task_id='function_2',
        python_callable=function_2,
        dag=dag,
        op_args=[con, service]
    )
Answer
From the Airflow Slack I learned that top-level code in a DAG file is executed every time the scheduler parses the DAG, which is why a new connection was being opened on every scheduler refresh rather than only when the DAG actually runs.
It seems like best practice is to ensure the connection is opened only at task runtime by either:
- if the task callables are defined in the DAG file, moving the connection-opening code inside the Python function definition
- if the task callables are defined elsewhere, opening the connection inside the task itself and passing only the connection ID (see the sketch below). Note that if the connection information is passed as a plaintext variable, it will get logged
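For concreteness, here is a minimal sketch of that pattern, assuming the same "dedicated_bot_account" connection ID as in the question; the DAG id, schedule, and query are placeholders. The hook is created inside the callable, so only the connection ID string is passed to the operator and no database connection is opened while the scheduler parses the DAG file.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook


def function_1(conn_id):
    # The hook, and therefore the database connection, is created only when
    # the task actually runs, not when the scheduler parses the DAG file.
    hook = PostgresHook(postgres_conn_id=conn_id)
    con = hook.get_conn()
    try:
        with con.cursor() as cur:
            cur.execute("SELECT 1")  # placeholder query
            print(cur.fetchone())
    finally:
        con.close()


dag = DAG(
    dag_id="postgres_hook_inside_task",  # placeholder DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
)

t1 = PythonOperator(
    task_id="function_1",
    python_callable=function_1,
    op_kwargs={"conn_id": "dedicated_bot_account"},  # only the connection ID is passed
    dag=dag,
)

With this layout the only thing that can show up in the task logs is the connection ID; the credentials themselves stay in the Airflow connection store and are fetched by the hook at runtime.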