
Airflow ExternalTaskSensor doesn't fail when the external task fails

I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I developed this code to test the functionality:


The idea is that one DAG triggers another one with a TriggerDagRunOperator, which sets the execution_date to the same value in both DAGs. This works perfectly when the last task of dummy_dag (ends) finishes in the success state.

However, if I force the intermediate task to fail like so:


The sensor doesn't detect the failed or upstream_failed states and keeps running until it times out. I was using the failed_states parameter to indicate which states should be considered failures, but it doesn't seem to work.

Am I doing something wrong?


Answer

failed_states was added in Airflow 2.0; you'd set it to ["failed"] to configure the sensor to fail the current DAG run if the monitored DAG run failed. If given an external_task_id, it monitors that task's state; otherwise it monitors the DAG run state.
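To make the 2.0 semantics concrete, here is a plain-Python sketch of the three-way decision a sensor configured with allowed_states and failed_states makes on each poke. It is a simplified model, not Airflow's source: the function name evaluate_external_state and the "done" / "fail" / "wait" outcomes are hypothetical, and it needs no Airflow installation.

```python
from typing import Iterable, Optional

SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"


def evaluate_external_state(
    state: Optional[str],
    allowed_states: Iterable[str] = (SUCCESS,),
    failed_states: Iterable[str] = (),
) -> str:
    """Hypothetical model of one ExternalTaskSensor poke in Airflow 2.x.

    Returns "fail" if the monitored state is a failure state, "done" if it is
    an allowed state, and "wait" otherwise (still running, or no run yet).
    """
    # Failure states are checked first so the sensor fails fast.
    if state is not None and state in set(failed_states):
        return "fail"
    if state is not None and state in set(allowed_states):
        return "done"
    return "wait"


# A failed external run only fails the sensor if its state is in failed_states.
print(evaluate_external_state(FAILED))                           # wait
print(evaluate_external_state(FAILED, failed_states=[FAILED]))   # fail
print(evaluate_external_state(SUCCESS, failed_states=[FAILED]))  # done
# When monitoring a single task, upstream_failed can be treated as a failure too.
print(evaluate_external_state(UPSTREAM_FAILED, failed_states=[FAILED, UPSTREAM_FAILED]))  # fail
```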

In Airflow 1.x, unfortunately, the ExternalTaskSensor operation only compares DAG run or task state against allowed_states; as soon as the monitored DAG run or task reaches one of the allowed states, the sensor stops, and is then always marked as successful. By default, the sensor only looks for the SUCCESS state, so without a timeout it’ll just keep on poking forever if the monitored DAG run has failed. If you put failed in the allowed_states list, it will still only ever mark itself as successful.
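The 1.x behavior can be modelled the same way. In this plain-Python sketch (hypothetical names, not Airflow's source; a max_pokes counter stands in for the timeout), the only possible outcomes are success and timeout, whatever you put in allowed_states:

```python
from typing import Iterable, List


def run_1x_sensor(
    observed_states: List[str],
    allowed_states: Iterable[str] = ("success",),
    max_pokes: int = 5,
) -> str:
    """Hypothetical model of the Airflow 1.x ExternalTaskSensor poke loop.

    observed_states[i] is the state seen by poke number i; once the list runs
    out, the last state repeats (a finished run no longer changes).
    """
    if not observed_states:
        raise ValueError("observed_states must not be empty")
    allowed = set(allowed_states)
    for poke_number in range(max_pokes):
        state = observed_states[min(poke_number, len(observed_states) - 1)]
        # 1.x only asks "is the state allowed?"; there is no failure check.
        if state in allowed:
            return "success"
    return "timeout"


# Default allowed_states: a failed run never matches, so the sensor times out.
print(run_1x_sensor(["running", "failed"]))  # timeout
# Adding "failed" stops the sensor, but it is still marked as a success.
print(run_1x_sensor(["running", "failed"], allowed_states=["success", "failed"]))  # success
```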

While you could use a timeout, like you, I needed the sensor to fail its own DAG run if the external DAG run failed, as if the dependencies for the next task had not been met. Unfortunately, this requires writing your own sensor.

Here is my implementation; it is a simplified version of the ExternalTaskSensor() class, adapted to my simpler needs (no need to check for a specific task id or for anything other than the same execution date):


The base sensor implementation calls the poke() method repeatedly until it returns True (or until the optional timeout is reached). Raising AirflowFailException sets the task state to failed immediately, without retrying. Whether downstream tasks are then scheduled depends on their own configuration, such as their trigger rules.
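The fail-fast pattern can be illustrated without Airflow installed. In this sketch, a dictionary keyed by (dag_id, execution_date) is a hypothetical stand-in for the DagRun table, FailImmediately stands in for AirflowFailException, and run_sensor only mimics the base sensor's poke loop; none of these names are Airflow APIs.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for the DagRun table: (dag_id, execution_date) -> state.
RunTable = Dict[Tuple[str, datetime], str]


class FailImmediately(Exception):
    """Hypothetical stand-in for AirflowFailException: fail, no retries."""


def poke(runs: RunTable, external_dag_id: str, execution_date: datetime) -> bool:
    """One poke: look up the external run with the same execution date.

    Returns True on success, raises on failure, and returns False while the
    run is absent or still running, so the loop keeps waiting.
    """
    state: Optional[str] = runs.get((external_dag_id, execution_date))
    if state == "failed":
        raise FailImmediately(f"external DAG run {external_dag_id} {execution_date} failed")
    return state == "success"


def run_sensor(snapshots: List[RunTable], external_dag_id: str,
               execution_date: datetime, max_pokes: int = 10) -> str:
    """Mimic the base sensor loop and report the resulting task state.

    snapshots[i] is the run table seen by poke number i (the last one repeats);
    max_pokes stands in for the timeout.
    """
    for i in range(max_pokes):
        runs = snapshots[min(i, len(snapshots) - 1)]
        try:
            if poke(runs, external_dag_id, execution_date):
                return "success"
        except FailImmediately:
            return "failed"  # fail fast: no further pokes, no retries
    return "timeout"


day = datetime(2020, 7, 1)
running = {("dummy_dag", day): "running"}
print(run_sensor([running, {("dummy_dag", day): "failed"}], "dummy_dag", day))   # failed
print(run_sensor([running, {("dummy_dag", day): "success"}], "dummy_dag", day))  # success
print(run_sensor([{}], "dummy_dag", day))                                        # timeout
```

A real sensor would do the equivalent lookup against the metadata database inside poke(); raising instead of returning False is what turns a failed external run into an immediate task failure rather than a timeout.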
