In the dag below, sensor A is set to soft_fail = True, because I’d like to skip B and C if A fails. The problem is I’d still like to get an email alert when A fails. But when soft_fail is true, A is marked as success when the sensor doesn’t detect anything, and no email alert would be sent out. Could someone please help to point out how to achieve this? Many thanks.
A (sensor, soft_fail = True) >> B >> C
Advertisement
Answer
Airflow sensor is marked as skipped
(not success
) when it fails and soft_fail
is True
.
There is no option to add email on skip not a callback. But you can create a new task from the operator EmailOperator
, which run when the sensor A is marked as skipped. Unfortunately, there is no trigger rule to run a task when upstream is skipped, but you can create a new operator which check the state of A and send the email based on it.
from airflow.operators.email import EmailOperator from airflow.utils.context import Context from airflow.utils.state import TaskInstanceState from airflow.utils.trigger_rule import TriggerRule class MyNotifier(EmailOperator): def __int__(self, monitor_task_id: str, notify_on_state: str, *args, **kwargs): self.monitor_task_id = monitor_task_id self.notify_on_state = notify_on_state super().__init__(*args, **kwargs) def execute(self, context: Context): task_to_check = context["dag_run"].get_task_instance(task_id=self.monitor_task_id) if task_to_check.state == self.notify_on_state: super().execute(context) notification_task = MyNotifier( task_id="sensor_skip_notifier", monitor_task_id="A", trigger_rule=TriggerRule.ALL_DONE, # to run the task when A is done regardless the state notify_on_state=TaskInstanceState.SKIPPED, to="<email>", subject="<subject>", html_content="<content>", # you can use jinja to add run info ) A >> notification_task