
How to pass parameters to Airflow on_success_callback and on_failure_callback

I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.

According to the Airflow documentation,

a context dictionary is passed as a single parameter to this function.

How can I pass another parameter to these callback methods?

Here is my code:

from datetime import datetime

from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

I intend to move the callbacks to another package and pass the email address as a parameter.


Answer

You can define a wrapper function inside your DAG file that calls the function from your package and passes the email address as an argument. Because the wrapper lives at the DAG level, you can also trim the context down there and pass along only the information the email needs.

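from datetime import datetime

from package import outer_task_success_callback

email = 'xyz@example.com'

def task_success_alert(context):
    # Pull only what the email needs out of the Airflow context
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    # Forward the extra parameter (email) to the packaged callback
    outer_task_success_callback(dag_id, task_id, email)

# task_failure_alert is assumed to be defined the same way,
# wrapping a failure callback from your package.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}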

This lets you customize the arguments before you call the function in your package.
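If you would rather not write a wrapper per DAG, another option is to bind the extra arguments with functools.partial, since Airflow only needs a callable that accepts the context. The following is a minimal sketch, not the only way to do it: task_alert, its status and send parameters, and the fake context at the end are hypothetical names used for illustration, and print stands in for a real mail function such as send_email_smtp.

```python
from functools import partial
from types import SimpleNamespace


# Hypothetical reusable callback that could live in a shared package.
# Airflow supplies `context`; `email` and `status` are the extra parameters.
def task_alert(context, email, status, send=print):
    ti = context["task_instance"]
    subject = "[Airflow] DAG {0} - Task {1}: {2}".format(
        ti.dag_id, ti.task_id, status
    )
    # `send` defaults to print here; swap in a real mail function in practice.
    send(email, subject)
    return subject


email = "xyz@example.com"

# partial() pre-fills email and status, leaving a callable that takes only context.
default_args = {
    "owner": "airflow",
    "on_success_callback": partial(task_alert, email=email, status="Success"),
    "on_failure_callback": partial(task_alert, email=email, status="Failed"),
}

# Stand-in for the context Airflow would pass, for trying this out locally.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="my_dag", task_id="my_task")
}
subject = default_args["on_success_callback"](fake_context)
```

The callback itself stays free of DAG-specific values, so each DAG can bind its own mailing list without redefining the function.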

On a side note, Airflow has built-in SMTP email functionality (for example, the email and email_on_failure task arguments), so you may not need to write your own solution for failure alerts.
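As a rough illustration of the built-in route, the task-level email arguments can go straight into default_args. This sketch assumes SMTP is already configured for your Airflow installation and reuses the example address from above; availability and behavior of these arguments may vary with your Airflow version. There is no equivalent built-in flag for success emails, so a success callback is still needed for those.

```python
from datetime import datetime

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2019, 6, 13),
    # Recipients for Airflow's built-in alert emails (example address).
    "email": ["xyz@example.com"],
    # Send an email when a task fails, but not on each retry.
    "email_on_failure": True,
    "email_on_retry": False,
}
```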
