I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I have developed this code to test the functionality:
import time
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

sensors_dag = DAG(
    "test_launch_sensors",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)

dummy_dag = DAG(
    "test_dummy_dag",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)

def print_context(ds, **context):
    pprint(context['conf'])

with dummy_dag:
    starts = DummyOperator(task_id="starts", dag=dummy_dag)
    empty = PythonOperator(
        task_id="empty",
        provide_context=True,
        python_callable=print_context,
        dag=dummy_dag,
    )
    ends = DummyOperator(task_id="ends", dag=dummy_dag)

    starts >> empty >> ends

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        external_task_id="ends",
        failed_states=["failed", "upstream_failed"],
        poke_interval=5,
        timeout=120,
    )
    trigger >> sensor
The idea is that one DAG triggers another with a TriggerDagRunOperator. This sets the execution_date to the same value in both DAGs. This works perfectly when the last task of dummy_dag, ends, finishes with the state success.
However, if I force the intermediate task to fail like so:
def print_context(ds, **context):
    pprint(context['conf'])
    raise Exception('ouch')
The sensor doesn't detect the failed or upstream_failed states, and it keeps running until it times out. I was using the failed_states parameter to indicate which states should be considered failures, but it seems not to be working.
Am I doing something wrong?
Answer
failed_states was added in Airflow 2.0; you'd set it to ["failed"] to configure the sensor to fail its own DAG run if the monitored DAG run failed. If given a task ID, it monitors the task state, otherwise it monitors the DAG run state.
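For illustration, a minimal Airflow 2.x sketch of the sensor from the question using failed_states (the DAG and task IDs are the ones from the example above):

# Airflow 2.x only: failed_states is honoured, so the sensor fails its own
# DAG run as soon as the monitored task reaches one of these states.
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_dag = ExternalTaskSensor(
    task_id="wait_for_dag",
    external_dag_id="test_dummy_dag",
    external_task_id="ends",  # monitor this task's state
    allowed_states=["success"],
    failed_states=["failed", "upstream_failed"],
    poke_interval=5,
    timeout=120,
)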
In Airflow 1.x, unfortunately, the ExternalTaskSensor operator only compares the DAG run or task state against allowed_states; as soon as the monitored DAG run or task reaches one of the allowed states, the sensor stops, and is then always marked as successful. By default, the sensor only looks for the SUCCESS state, so without a timeout it will just keep on poking forever if the monitored DAG run has failed. If you put failed in the allowed_states list, it will still only ever mark itself as successful.
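To make that pitfall concrete, here is a 1.10 sketch: listing failure states alongside success only makes the sensor stop poking sooner, and the sensor task itself still ends up successful either way:

# Airflow 1.10 sketch: this does NOT propagate the failure; the sensor merely
# stops poking once the "ends" task is in any of the listed states, and the
# sensor task is then marked successful regardless.
from airflow.sensors.external_task_sensor import ExternalTaskSensor

wait_for_dag = ExternalTaskSensor(
    task_id="wait_for_dag",
    external_dag_id="test_dummy_dag",
    external_task_id="ends",
    allowed_states=["success", "failed", "upstream_failed"],
    poke_interval=5,
    timeout=120,
)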
While you could use a timeout, like you I needed the sensor to fail its own DAG run if the external DAG run failed, as if the dependencies for the next task had not been met. This requires writing your own sensor, unfortunately.
Here is my implementation; it is a simplified version of the ExternalTaskSensor() class, adapted to my simpler needs (no need to check for a specific task id or for anything other than the same execution date):
from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class ExternalDagrunSensor(BaseSensorOperator):
    """
    Waits for a different DAG to complete; if the dagrun has failed, this
    task fails itself as well.

    :param external_dag_id: The dag_id that contains the task you want to wait for
    :type external_dag_id: str
    """

    template_fields = ["external_dag_id"]
    ui_color = "#19647e"

    @apply_defaults
    def __init__(self, external_dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context, session=None):
        dag_id, execution_date = self.external_dag_id, context["execution_date"]
        self.log.info("Poking for %s on %s ... ", dag_id, execution_date)
        state = (
            session.query(DagRun.state)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.execution_date == execution_date,
                DagRun.state.in_((State.SUCCESS, State.FAILED)),
            )
            .scalar()
        )
        if state == State.FAILED:
            raise AirflowFailException(
                f"The external DAG run {dag_id} {execution_date} has failed"
            )
        return state is not None
The base sensor implementation calls the poke() method repeatedly until it returns True (or the optional timeout is reached), and raising AirflowFailException sets the task state to failed immediately, with no retries. It is then up to the configuration of the downstream tasks whether they will be scheduled to run.
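For completeness, a sketch of how this custom sensor could replace the ExternalTaskSensor in the triggering DAG from the question; it assumes sensors_dag and dummy_dag are defined as above, and that the class was saved somewhere importable (the module name external_dagrun_sensor below is hypothetical):

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# hypothetical local module containing the ExternalDagrunSensor class above
from external_dagrun_sensor import ExternalDagrunSensor

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    # waits for the whole dummy_dag run and fails this DAG run if that run failed
    wait = ExternalDagrunSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        poke_interval=5,
        timeout=120,
    )
    trigger >> wait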