I have an Airflow DAG comprising three steps:
- PythonOperator –> runs a query on AWS Athena and stores the generated file at a specific S3 path
- BashOperator –> increments an Airflow variable for tracking
- BashOperator –> takes the output (response) of task 1 and runs some code on top of it.
What happens is that the DAG completes within seconds even though the Athena query is still running.
I want to make sure the downstream steps run only after the file is generated. Basically, I want this to be synchronous.
Answer
You can set the tasks as:
```python
def athena_task():
    # Add your code
    return

t1 = PythonOperator(
    task_id='athena_task',
    python_callable=athena_task,
)

t2 = BashOperator(
    task_id='variable_task',
    bash_command='',  # replace with relevant command
)

t3 = BashOperator(
    task_id='process_task',
    bash_command='',  # replace with relevant command
)

t1 >> t2 >> t3
```
t2 will run only after t1 completes successfully, and t3 will start only after t2 completes successfully.
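Note that the chain above only helps if t1 actually blocks until the query finishes: boto3's `start_query_execution` returns immediately, so `athena_task` should poll the query status before returning. Below is a minimal sketch of such a polling loop; the `poll_until_done` helper and the stubbed status source are illustrative, and in a real task `get_status` would wrap boto3's `athena.get_query_execution` and return the `QueryExecution['Status']['State']` string:

```python
import time

def poll_until_done(get_status, poll_interval=1.0, timeout=300.0):
    """Block until get_status() returns a terminal state, or raise on timeout.

    Hypothetical helper: in a real Athena task, get_status would call
    boto3.client('athena').get_query_execution(QueryExecutionId=...) and
    return the query's current state string.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        state = get_status()
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state
        time.sleep(poll_interval)
    raise TimeoutError('query did not finish in time')

# Demo with a stubbed status source: RUNNING twice, then SUCCEEDED.
states = iter(['RUNNING', 'RUNNING', 'SUCCEEDED'])
final = poll_until_done(lambda: next(states), poll_interval=0.01)
print(final)  # SUCCEEDED
```

With this in place, t1 only reports success once the query has actually reached a terminal state, so t2 and t3 see the generated file.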
Note that Airflow has an AWSAthenaOperator, which might save you the trouble of writing the code yourself. The operator submits a query to Athena and saves the output to an S3 path set via the output_location parameter:
```python
run_query = AWSAthenaOperator(
    task_id='athena_task',
    query='SELECT * FROM my_table',
    output_location='s3://some-bucket/some-path/',
    database='my_database',
)
```