
Trigger Airflow DAG manually with parameters and pass them into a Python function

I want to pass parameters into an Airflow DAG and use them in a Python function. I can use the parameters in a BashOperator, but I can't find any reference for using them in a PythonOperator.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
) 

task2 = PythonOperator(
    task_id='test_task2',
    python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
) 

I want to pass the config below as parameters when triggering the DAG. task1 works fine for me; I need to make task2 work as well. Please guide me on correcting the above code so that I can pass parameters into it.

{"owner":"test_owner","table":"test_table"}


Answer

For passing arguments into the PythonOperator you should use either op_args (for positional arguments) or op_kwargs (for keyword arguments). Both parameters are also template fields, so their values can be Jinja expressions as well.

Refactoring your code using op_kwargs:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
)

task2 = PythonOperator(
    task_id='test_task2',
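    # Pass the callable itself (no parentheses); op_kwargs is a templated field, so the Jinja values render at run time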
    python_callable=test_func,
    op_kwargs={"owner": owner, "table": table},
    dag=dag,
    queue='cdp_node53',
)

Both tasks will now log INFO - test_owner.test_table.
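
For completeness, the positional op_args variant mentioned above would look like the sketch below (the task_id test_task3 is just an illustrative name, reusing the same test_func):

task3 = PythonOperator(
    task_id='test_task3',
    python_callable=test_func,
    # op_args is also templated, so the Jinja expressions render at run time
    op_args=[owner, table],
    dag=dag,
    queue='cdp_node53',
)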

User contributions licensed under: CC BY-SA