Airflow ExternalTaskSensor doesn’t fail when the external task fails

I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I developed this code to test the functionality:

from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

sensors_dag = DAG(
    "test_launch_sensors",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)

dummy_dag = DAG(
    "test_dummy_dag",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0, 0, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)


def print_context(ds, **context):
    pprint(context['conf'])


with dummy_dag:
    starts = DummyOperator(task_id="starts", dag=dummy_dag)
    empty = PythonOperator(
        task_id="empty",
        provide_context=True,
        python_callable=print_context,
        dag=dummy_dag,
    )
    ends = DummyOperator(task_id="ends", dag=dummy_dag)

    starts >> empty >> ends

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        external_task_id="ends",
        failed_states=["failed", "upstream_failed"],
        poke_interval=5,
        timeout=120,
    )
    trigger >> sensor

The idea is that one DAG triggers another one with a TriggerDagRunOperator, which sets the execution_date to the same value in both DAGs. This works perfectly when the state of dummy_dag’s last task, ends, is success.

However, if I force the intermediate task to fail like so:

def print_context(ds, **context):
    pprint(context['conf'])
    raise Exception('ouch')

The sensor doesn’t detect the failed or upstream_failed states, and it keeps running until it times out. I was using the failed_states parameter to indicate which states should be considered failures, but it seems it has no effect.

Am I doing something wrong?

Answer

failed_states was only added in Airflow 2.0; there you’d set it to ["failed"] to configure the sensor to fail its own task if the monitored DAG run failed. If given an external task ID, the sensor monitors that task’s state; otherwise it monitors the DAG run state.
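
For comparison, on Airflow 2.x the sensor from your question could be configured roughly like this (a sketch, not tested here; note that the import path moved in 2.0):

from airflow.sensors.external_task import ExternalTaskSensor

sensor = ExternalTaskSensor(
    task_id="wait_for_dag",
    external_dag_id="test_dummy_dag",
    external_task_id="ends",      # omit to monitor the DAG run state instead
    allowed_states=["success"],
    failed_states=["failed"],     # fails the sensor task as soon as "ends" fails
    poke_interval=5,
    timeout=120,
)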

In Airflow 1.x, unfortunately, the ExternalTaskSensor only compares the DAG run or task state against allowed_states; as soon as the monitored DAG run or task reaches one of the allowed states, the sensor stops poking and is always marked as successful. By default, the sensor only looks for the SUCCESS state, so without a timeout it will keep poking forever if the monitored DAG run has failed. Even if you put failed in the allowed_states list, the sensor will still only ever mark itself as successful.
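
To see why, here is a rough paraphrase of what the 1.10 poke() check boils down to; this is not the actual source, just the shape of the logic, and poke_paraphrase is a made-up name:

from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sqlalchemy import func

@provide_session
def poke_paraphrase(external_dag_id, external_task_id, execution_date,
                    allowed_states=(State.SUCCESS,), session=None):
    # Count task instances that have already reached one of the allowed
    # states; a FAILED task never matches this filter, so the poke just
    # returns False forever and the sensor keeps rescheduling itself.
    count = (
        session.query(func.count())
        .filter(
            TaskInstance.dag_id == external_dag_id,
            TaskInstance.task_id == external_task_id,
            TaskInstance.state.in_(allowed_states),
            TaskInstance.execution_date == execution_date,
        )
        .scalar()
    )
    return count == 1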

While you could use a timeout, like you I needed the sensor to fail its own DAG run if the external DAG run failed, as if the dependencies for the next task had not been met. Unfortunately, this requires writing your own sensor.

Here is my implementation; it is a simplified version of the ExternalTaskSensor class, adapted to my simpler needs (no need to check for a specific task ID or for anything other than the same execution date):

from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State

class ExternalDagrunSensor(BaseSensorOperator):
    """
    Waits for a different DAG to complete; if the dagrun has failed, this
    task fails itself as well.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    """

    template_fields = ["external_dag_id"]
    ui_color = "#19647e"

    @apply_defaults
    def __init__(self, external_dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context, session=None):
        dag_id, execution_date = self.external_dag_id, context["execution_date"]
        self.log.info("Poking for %s on %s ... ", dag_id, execution_date)

        # The query returns a state only once the run is terminal
        # (success or failed); a still-running DAG run yields None.
        state = (
            session.query(DagRun.state)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.execution_date == execution_date,
                DagRun.state.in_((State.SUCCESS, State.FAILED)),
            )
            .scalar()
        )
        if state == State.FAILED:
            raise AirflowFailException(
                f"The external DAG run {dag_id} {execution_date} has failed"
            )
        return state is not None

The base sensor implementation calls the poke() method repeatedly until it returns True (or the optional timeout is reached), and by raising AirflowFailException the task state is set to failed immediately, with no retries. It is then up to the configuration of the downstream tasks whether they will be scheduled to run.
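
Dropped into the sensors_dag from your question, the custom sensor would replace the original one like so (a sketch reusing the trigger operator exactly as you defined it):

with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    # Fails this DAG run as soon as the triggered dummy_dag run fails,
    # instead of poking until the timeout.
    sensor = ExternalDagrunSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        poke_interval=5,
        timeout=120,
    )
    trigger >> sensor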
