
How to run Airflow tasks synchronously

I have an Airflow DAG consisting of 3 steps:

  1. PythonOperator –> runs a query on AWS Athena and stores the generated file at a specific S3 path
  2. BashOperator –> increments an Airflow Variable used for tracking
  3. BashOperator –> takes the output (response) of task 1 and runs some code on top of it

What happens here is that the DAG completes within seconds, even though the Athena query is still running.

I want to make sure that the further steps run only after the file has been generated. Basically, I want this to be synchronous.


Answer

You can set up the tasks as:

# Airflow 2.x import paths
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def athena_task():
    # Run the Athena query and store the generated file on S3
    return


t1 = PythonOperator(
    task_id='athena_task',
    python_callable=athena_task,
)

t2 = BashOperator(
    task_id='variable_task',
    bash_command='',  # replace with the command that increments the Airflow Variable
)

t3 = BashOperator(
    task_id='process_task',
    bash_command='',  # replace with the command that processes the query output
)

t1 >> t2 >> t3

t2 will run only after t1 has completed successfully, and t3 will start only after t2 has completed successfully.
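Note, however, that the ordering alone only helps if t1 itself does not return until the query has finished and the file is in S3. If your callable merely submits the query, it has to wait for a terminal state before returning. Below is a minimal sketch using boto3, assuming the callable submits the query itself; the query, database, and output location are the same placeholders used in the operator example further down:

import time

import boto3


def athena_task():
    client = boto3.client('athena')

    # Submit the query; this call returns immediately with an execution id
    response = client.start_query_execution(
        QueryString='SELECT * FROM my_table',                # placeholder query
        QueryExecutionContext={'Database': 'my_database'},   # placeholder database
        ResultConfiguration={'OutputLocation': 's3://some-bucket/some-path/'},
    )
    query_id = response['QueryExecutionId']

    # Block until Athena reports a terminal state for this query
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(5)

    if state != 'SUCCEEDED':
        raise RuntimeError(f'Athena query finished in state {state}')

    return query_id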

Note that Airflow has an AWSAthenaOperator, which might save you the trouble of writing the code yourself. The operator submits a query to Athena and saves the output to the S3 path given by the output_location parameter:

# Requires the Amazon provider package (apache-airflow-providers-amazon);
# newer provider versions expose the same operator as AthenaOperator.
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator

run_query = AWSAthenaOperator(
    task_id='athena_task',
    query='SELECT * FROM my_table',
    output_location='s3://some-bucket/some-path/',
    database='my_database',
)
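The operator does not succeed until Athena reports that the query has finished, so the downstream tasks will only start once the result file exists in S3. It can replace the PythonOperator in the same dependency chain:

run_query >> t2 >> t3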