I want to pass parameters into an Airflow DAG and use them in a Python function. I can use the parameters in a BashOperator, but I can’t find any reference on how to use them in a Python function.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
)
I want to pass the following as parameters when triggering the DAG. “task1” works fine for me; I need to make “task2” work. Please guide me in correcting the above code so that I can pass parameters into it.
{"owner":"test_owner","table":"test_table"}
Answer
To pass arguments into the PythonOperator, use either op_args (for positional arguments) or op_kwargs (for keyword arguments). Both parameters are also template fields, so their values can be Jinja expressions as well.
Refactoring your code using op_kwargs:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
    # Pass the function itself (no parentheses); its arguments go in op_kwargs
    python_callable=test_func,
    op_kwargs={"owner": owner, "table": table},
    dag=dag,
    queue='cdp_node53',
)
Both tasks will now log INFO - test_owner.test_table.
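To illustrate why the original task2 did not work, here is a rough, Airflow-free sketch. MiniPythonOperator and qualified_name are hypothetical stand-ins invented for this example (they are not Airflow APIs), and the placeholder substitution only crudely imitates Jinja rendering. The point it tries to show: writing test_func(owner, table) calls the function immediately, while the DAG file is being parsed and the values are still raw template strings, whereas passing the function plus op_kwargs defers the call until the values have been rendered.

```python
# Hypothetical stand-in for illustration only; this is NOT Airflow's PythonOperator.
class MiniPythonOperator:
    def __init__(self, python_callable, op_kwargs=None):
        # Reject anything that is not a function, e.g. the result of calling one.
        if not callable(python_callable):
            raise TypeError("python_callable must be a callable, not its result")
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}

    def execute(self, conf):
        # Crude imitation of template rendering: substitute the
        # "{{ dag_run.conf['key'] }}" placeholders at run time.
        rendered = {}
        for name, value in self.op_kwargs.items():
            for key, conf_value in conf.items():
                value = value.replace("{{ dag_run.conf['%s'] }}" % key, conf_value)
            rendered[name] = value
        return self.python_callable(**rendered)


def qualified_name(owner, table):
    # Plays the role of test_func from the question, but returns instead of printing.
    return owner + "." + table


owner = "{{ dag_run.conf['owner'] }}"
table = "{{ dag_run.conf['table'] }}"

# Calling the function while defining the task runs it right away,
# with the unrendered template strings as arguments.
parse_time_result = qualified_name(owner, table)

# Passing the function and its keyword arguments defers the call until run time.
task2 = MiniPythonOperator(
    python_callable=qualified_name,
    op_kwargs={"owner": owner, "table": table},
)
run_time_result = task2.execute({"owner": "test_owner", "table": "test_table"})
print(run_time_result)  # test_owner.test_table
```

In the question's code, the value handed to python_callable was the return value of test_func (None, since it only prints) rather than the function itself, which is why only the op_kwargs form lets the operator call it later.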