Skip to content
Advertisement

Airflow PythonOperator inside PythonOperator

How can I run a PythonOperator inside another one PythonOperator?

The idea is: to call “main” function as a PythonOperator and then run a few other PythonOperators inside and scheduler them

The code is:

def printFunction(value):
    time.sleep(5)
    print(value)

def main():
    for i in range(10):
        task_2 = PythonOperator(
                        task_id='loop_task_2'+str(i),
                        python_callable = printFunction,
                        op_kwargs = {'value':i},
                        dag=dag,
                )
        task_3 = PythonOperator(
                        task_id='loop_task_3'+str(i),
                        python_callable = printFunction,
                        op_kwargs = {'value':i},
                        dag=dag,
                )
        task_2>>task_3

task = PythonOperator(
                        task_id='tor_task',
                        python_callable = main,
                        op_kwargs = {},
                        dag=dag,
                )

Advertisement

Answer

I think you are expecting that Airflow will create these operators/tasks in the UI but that is not going to happen. With that code you will have only tor_task in the UI. It’s not very clear what you are trying to do but it seems that it would be better to just call printFunction directly in the loop.

In any case if you must use operator inside operator you should call the execute() method. See this answer for more information

User contributions licensed under: CC BY-SA
5 People found this is helpful
Advertisement