
How do I retrieve airflow url link to my latest task log in a DAG?

In an individual DAG task, how do I build the URL of the latest task log with the help of a PythonOperator? I intend to send the link directly to the user whenever an error occurs, so that they can open the log page without having to navigate to it.

Answer

You can define a callback function and pass it to DAG() through its default arguments.

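from airflow.operators.python import PythonOperator

def on_failure_callback_slack(context):
    # base_url and post_to_slack must be defined elsewhere in your code
    message = (
        f"Failure! airflow task: {context['task_instance'].task_id} failed "
        f"dag: {base_url}?dag_id={context['dag'].dag_id} "
        f"{str(context['task_instance'])}"
    )

    # Reuse a PythonOperator to run the Slack helper with the message
    operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message})

    return operator.execute(context=context)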

In the above code, post_to_slack() is just a utility function that posts the message to Slack with requests.post(...).
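The helper itself is not shown in the answer, so here is a minimal sketch of what it might look like. It assumes a Slack incoming webhook, which accepts a JSON body with a "text" field, and it uses the standard library's urllib.request in place of requests.post so that it has no third-party dependency. The build_slack_request name and the placeholder webhook URL are hypothetical.

```python
import json
import urllib.request


def build_slack_request(message, webhook_url):
    """Build (but do not send) a POST request carrying the message as JSON."""
    payload = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# The default URL is a placeholder (assumption); substitute your own webhook.
def post_to_slack(message, webhook_url="https://hooks.slack.com/services/REPLACE/ME"):
    """Send the message to the webhook and return the HTTP status code."""
    request = build_slack_request(message, webhook_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Keeping the request construction separate from the network call makes it possible to check the payload without actually contacting Slack.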

You can pass this function to the DAG, and it will post the message with the URL to Slack (or another medium of your choice). Note that you have to provide base_url yourself for the URL to work properly.

from airflow import DAG

default_args = {"on_failure_callback": on_failure_callback_slack}
dag = DAG(dag_id='some_id', default_args=default_args)
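Since the question asks for a link to the task log specifically, it may help to know that in Airflow 2.x the task instance in the callback context exposes a log_url property, which Airflow builds from the base_url setting in the [webserver] section of its configuration. The sketch below formats a message around that property. The build_failure_message name is hypothetical, and the context is a hand-made stand-in with made-up values so the example runs without an Airflow installation.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format a failure notice that includes a direct link to the task log."""
    ti = context["task_instance"]
    return (
        f"Failure! Airflow task {ti.task_id} in DAG {ti.dag_id} failed. "
        f"Log: {ti.log_url}"
    )


# Stand-in for the context Airflow passes to a callback (values are made up).
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="load_data",
        dag_id="some_id",
        log_url="http://localhost:8080/log?dag_id=some_id&task_id=load_data",
    )
}
print(build_failure_message(fake_context))
```

Inside a real callback you would pass the context argument that Airflow supplies, and the resulting string could be handed to whatever notification helper you use.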

For more you can read here: https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html

User contributions licensed under: CC BY-SA