I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.
According to Airflow documentation,
a context dictionary is passed as a single parameter to this function.
How can I pass another parameter to these callback methods?
Here is my code:
from datetime import datetime

from airflow.utils.email import send_email_smtp

# dag_vars is defined elsewhere in the DAG file (not shown here).


def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1]
    )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1],
        datetime.now()
    )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)


def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1]
    )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1],
        datetime.now()
    )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}
I intend to move the callbacks to another package and pass the email address as a parameter.
Answer
You could define a function inside your DAG file that calls the function from your package, passing the email address as an argument. You can refine it further at the DAG level so that only the information the email needs is passed along.
from datetime import datetime

from package import outer_task_success_callback

email = 'xyz@example.com'


def task_success_alert(context):
    # Airflow passes only `context`; the extra arguments are added here.
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    outer_task_success_callback(dag_id, task_id, email)


# task_failure_alert is assumed to be defined the same way as task_success_alert.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}
This lets you customize the arguments in the DAG file before calling the function in your package.
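If you would rather not write a wrapper per DAG, one alternative is to bind the extra arguments with `functools.partial`, so that Airflow still calls the result with `context` alone. The following is only a minimal sketch: `notify`, `fake_send`, `outbox`, and the fake context are hypothetical names used for illustration, and `fake_send` stands in for `send_email_smtp` so the example runs without Airflow or an SMTP server.

```python
from functools import partial
from types import SimpleNamespace


def notify(context, email, status, send):
    """Hypothetical package-level callback.

    Airflow supplies `context`; `email`, `status`, and `send` are bound
    in the DAG file before the callback is registered.
    """
    ti = context["task_instance"]
    subject = "[Airflow] DAG {0} - Task {1}: {2}".format(ti.dag_id, ti.task_id, status)
    html_content = "DAG: {0}<br>Task: {1}".format(ti.dag_id, ti.task_id)
    send(email, subject, html_content)
    return subject


# Stand-in for send_email_smtp: records messages instead of sending them.
outbox = []


def fake_send(to, subject, html_content):
    outbox.append((to, subject, html_content))


email = "xyz@example.com"

# partial() fixes the extra arguments; the result takes only `context`.
on_success = partial(notify, email=email, status="Success", send=fake_send)
on_failure = partial(notify, email=email, status="Failed", send=fake_send)

default_args = {
    "owner": "airflow",
    "on_success_callback": on_success,
    "on_failure_callback": on_failure,
}

# Simulate how the callback is invoked: a single context argument.
fake_context = {"task_instance": SimpleNamespace(dag_id="my_dag", task_id="my_task")}
default_args["on_success_callback"](fake_context)
print(outbox[0][1])
```

In a real DAG you would pass `send_email_smtp` (or call it directly inside the packaged function) instead of `fake_send`, and the context would come from Airflow rather than from `SimpleNamespace`.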
On a side note, Airflow has built-in SMTP email functionality, for example the email and email_on_failure task arguments. Instead of writing your own solution, you can use that.