I’m trying to make a DAG with params that can be triggered with a dag_id/task_id. The goal of this DAG is to set the state of the last executed task to “success” and to continue the pipeline from this point.
exemple of pipeline:
In my dag I want to be able to set “run_that” to success and automatically run “run_them” as a result of the new state change.
Here is what I did from now:
import airflow from airflow.models import DagRun, TaskInstance, DagBag from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.utils.trigger_rule import TriggerRule from airflow.utils.state import State from airflow.operators.python_operator import PythonOperator import pendulum from wrapper import ScriptLauncher, handleErrorSlack, handleErrorMail from datetime import timedelta, datetime default_args = { 'owner': 'tozzi', 'depends_on_past': False, 'start_date': pendulum.datetime(2022, 12, 19, tz='Europe/Paris'), 'retries': 0, 'retry_delay': timedelta(minutes=5), 'xcom_push': True, 'catchup': False, 'params': { 'dag_id': 'my_dag', 'task_id': 'run_that', } } def last_exec(dag_id, task_id, session): task_instances = ( session.query(TaskInstance) .filter(TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id) .all() ) task_instances.sort(key=lambda x: x.execution_date, reverse=True) if task_instances: return task_instances[0] return None def set_last_task_success(**kwargs): dag_id = kwargs['dag_id'] task_id = kwargs['task_id'] session = airflow.settings.Session() task_instance = last_exec(dag_id, task_id, session) if (task_instance is not None): task_instance.state = 'success' # task_instance = TaskInstance(task_id=task_id, execution_date=last_task_instance.execution_date) task_instance.run(session=session, ignore_ti_state=True, ignore_task_deps=True) session.commit() session.close() doc_md=f"""## Set the given task_id to success of the given dag_id""" # launched remotely launcher = ScriptLauncher(default_args, "@once", 'set_task_to_success', ['airflow'], doc_md) dag = launcher.dag; set_to_success = PythonOperator( task_id='set_to_success', provide_context=True, python_callable=set_last_task_success, dag=dag, op_kwargs={ 'dag_id': '{{ params.dag_id }}', 'task_id': '{{ params.task_id }}', } )
The task_instance.run(...)
call fail here with this error : “AttributeError: ‘TaskInstance’ object has no attribute ‘task'”, the state change is correctly working tho. What should I change so it rerun the “run_them” task when I change the state of the “run_that” task?
Advertisement
Answer
As of Airflow 2.5.0 you can update the state of any existing task via the Airflow REST API.