In an individual DAG task, how do I build the URL with the help of a Python operator? I want to send the user a link to the latest log whenever an error occurs, so that they can open the page directly instead of navigating to it.
Answer
You can define a callback function and pass it to DAG() through its default arguments.
from airflow.operators.python import PythonOperator

def on_failure_callback_slack(context):
    # base_url and post_to_slack must be defined elsewhere (see the notes below).
    message = (
        f"Failure! airflow task: {context['task_instance'].task_id} failed "
        f"dag: {base_url}?dag_id={context['dag'].dag_id} "
        f"{str(context['task_instance'])}"
    )
    operator = PythonOperator(
        task_id="failure",
        python_callable=post_to_slack,
        op_kwargs={'message': message},
    )
    return operator.execute(context=context)
In the code above, post_to_slack() is just a utility function that posts to Slack with requests.post(...).
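The answer leaves post_to_slack() undefined. Below is a minimal sketch of what it could look like, assuming a Slack incoming webhook that accepts a JSON body with a "text" field. It uses urllib.request from the standard library rather than requests.post, only so that it has no third-party dependency. SLACK_WEBHOOK_URL and build_slack_request are hypothetical names, and the URL is a placeholder you would replace with your own.

```python
import json
import urllib.request

# Hypothetical placeholder: replace with your own Slack incoming-webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/REPLACE/WITH/YOURS"


def build_slack_request(message, webhook_url=SLACK_WEBHOOK_URL):
    """Build (but do not send) the POST request for a Slack incoming webhook."""
    body = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_to_slack(message, webhook_url=SLACK_WEBHOOK_URL):
    """Send the message to Slack and return the HTTP status code."""
    request = build_slack_request(message, webhook_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Keeping the request construction separate from the network call makes it possible to check the payload without actually posting anything.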
You can pass on_failure_callback_slack to the DAG, and it will post the message with the URL to Slack (or another medium of your choice). Note that you have to provide base_url yourself for the URL to work properly.
default_args = {"on_failure_callback": on_failure_callback_slack}
dag = DAG(dag_id='some_id', default_args=default_args)
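The callback above links to the DAG page, while the question asks for the log page. Airflow's TaskInstance has a log_url property that points at the log view for that task run. It is built from the webserver's base_url setting, so that setting has to be correct for the link to work. The following is a sketch that relies on that property. build_failure_message and make_failure_callback are hypothetical names, and fake_context is a made-up stand-in for the context dictionary that Airflow passes at runtime.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format a failure message that ends with the task's log URL."""
    ti = context["task_instance"]
    return (
        f"Failure! Airflow task {ti.task_id} in DAG {ti.dag_id} failed.\n"
        f"Log: {ti.log_url}"
    )


def make_failure_callback(send):
    """Return an on_failure_callback that passes the message to send()."""
    def on_failure(context):
        send(build_failure_message(context))
    return on_failure


# Made-up stand-in for the runtime context, for trying the formatting locally.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="extract",
        dag_id="some_id",
        log_url="http://localhost:8080/log?dag_id=some_id&task_id=extract",
    )
}

if __name__ == "__main__":
    print(build_failure_message(fake_context))
```

With this in place, default_args = {"on_failure_callback": make_failure_callback(post_to_slack)} would send the log link rather than the DAG link, and it calls the posting function directly instead of wrapping it in a PythonOperator.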
For more you can read here: https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html