I have a DAG in Airflow, and I want to limit the number of tasks running simultaneously in this DAG to, for example, 4. I know there is the ‘max_active_tasks_per_dag’ parameter, but it affects all DAGs, and in my case I want to set it only for my DAG. How can I do so? Is it even possible?
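As far as I know, Airflow 2.2+ accepts this limit per DAG: pass ‘max_active_tasks=4’ to the DAG constructor (older releases called the same argument ‘concurrency’). It applies only to that DAG, while ‘max_active_tasks_per_dag’ remains the default for DAGs that do not set it. The pure-Python sketch below does not use Airflow; it only models what such a cap means, using a counting semaphore so that at most 4 of 12 hypothetical tasks run at once even though 10 worker threads are available.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_ACTIVE_TASKS = 4  # the per-DAG cap the question asks for

slots = threading.BoundedSemaphore(MAX_ACTIVE_TASKS)
lock = threading.Lock()
running = 0  # tasks currently inside the capped section
peak = 0     # highest number of tasks observed running at the same time


def run_task(task_id: int) -> int:
    """Hypothetical task: waits for a free slot, then does some 'work'."""
    global running, peak
    with slots:  # at most MAX_ACTIVE_TASKS callers get past this point at once
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # stand-in for real work
        with lock:
            running -= 1
    return task_id


# More worker threads than slots, so the semaphore (not the pool) is the limit
with ThreadPoolExecutor(max_workers=10) as pool:
    finished = list(pool.map(run_task, range(12)))
```

In Airflow itself none of this is needed: the scheduler enforces the cap once the DAG declares it.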
Tag: airflow
How to return False when a file exists with FileSensor
I’m trying to invert FileSensor(): as it stands, it returns True if the file exists. But how can it return False when the file exists and True when it does not? Answer: You could create a PythonSensor that checks for the existence of the file with whatever logic you want.
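A minimal sketch of that callable, assuming Airflow 2.x, where PythonSensor lives in airflow.sensors.python and succeeds as soon as its python_callable returns a truthy value. The function name file_is_absent and the example path are hypothetical, and the Airflow wiring is shown only as a comment so the snippet runs without Airflow installed.

```python
import os


def file_is_absent(path: str) -> bool:
    """Inverse of a FileSensor check: True only when `path` does NOT exist.

    Used as a PythonSensor callable, the sensor keeps poking while the
    file is present and succeeds once it is gone.
    """
    return not os.path.exists(path)


# Hypothetical wiring inside a DAG (requires Airflow 2.x):
# from airflow.sensors.python import PythonSensor
# wait_for_removal = PythonSensor(
#     task_id="wait_for_file_removal",
#     python_callable=file_is_absent,
#     op_kwargs={"path": "/data/incoming/lock.txt"},  # hypothetical path
#     poke_interval=60,
# )
```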
Attempting to delete files in an S3 folder, but the command removes the entire directory itself
I have an S3 bucket with 4 folders, one of which is input/. After my Airflow DAG runs, a few lines at the end of the Python code attempt to delete all files in input/. This sometimes deletes only the files and sometimes deletes the directory itself. I am not sure why it is deleting even …
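One likely explanation, offered as an assumption since the delete code is not shown: S3 has no real directories. A folder such as input/ is either implied by the keys that share that prefix or backed by a zero-byte placeholder object whose key is exactly input/. Deleting every key listed under the prefix input/ also deletes that placeholder, and once no key starts with input/ the folder disappears from the console. The sketch below only selects which keys to delete, skipping keys that end in “/”. The listing would come from something like boto3’s list_objects_v2 and the result would be passed to delete_objects; if the folder must always stay visible, the placeholder can be recreated with put_object(Bucket=..., Key="input/"). The function name and the example keys are hypothetical.

```python
from typing import Iterable, List


def keys_to_delete(keys: Iterable[str], prefix: str = "input/") -> List[str]:
    """Return the file keys under `prefix`, leaving folder placeholders alone.

    Keys ending in "/" (such as "input/" itself) are zero-byte placeholder
    objects that make a prefix show up as a folder, so they are skipped.
    """
    if not prefix.endswith("/"):
        prefix += "/"
    return [k for k in keys if k.startswith(prefix) and not k.endswith("/")]


# Hypothetical listing, shaped like the keys a ListObjectsV2 call returns
listing = [
    "input/",
    "input/2022-01-19.csv",
    "input/archive/",
    "input/archive/old.csv",
    "output/result.csv",
]
to_delete = keys_to_delete(listing)
```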
Airflow GCSFileTransformOperator source object filename wildcard
I am working on a DAG that should read an XML file, apply some transformations to it, and land the result as a CSV. For this I am using GCSFileTransformOperator. My problem is that the filename ends with a 4-digit number that is different each day (File_20220119_4302); the next day the number will be different. I can …
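As far as I know, GCSFileTransformOperator expects a single source_object name rather than a wildcard, so one option is to resolve the name first. The pure-Python sketch below takes a list of object names (for example from GCSHook(...).list(bucket_name, prefix=...)) and picks the one matching File_<date>_<4 digits>, with an optional extension. The function name and the File_ prefix are assumptions based on the example in the question. The resolved name could then be pushed to XCom and templated into source_object, provided your provider version lists source_object among the operator’s template fields.

```python
import re
from typing import Iterable, Optional


def find_daily_object(
    names: Iterable[str], ds_nodash: str, prefix: str = "File_"
) -> Optional[str]:
    """Return the object whose base name is <prefix><date>_<4 digits>[.ext].

    `ds_nodash` is the run date as YYYYMMDD (Airflow exposes it as {{ ds_nodash }}).
    Returns None when no object for that date exists yet.
    """
    pattern = re.compile(rf"{re.escape(prefix)}{ds_nodash}_\d{{4}}(\.\w+)?")
    # Match on the last path segment so "folder/File_..." names also work
    matches = sorted(n for n in names if pattern.fullmatch(n.rsplit("/", 1)[-1]))
    return matches[0] if matches else None


# Hypothetical bucket listing
names = [
    "data/File_20220118_1111.xml",
    "data/File_20220119_4302.xml",
    "data/other.xml",
]
todays_object = find_daily_object(names, "20220119")
```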
Use existing Celery workers for Airflow’s CeleryExecutor workers
I am trying to introduce dynamic workflows into my landscape that involves multiple steps of different model inference, where the output from one model gets fed into another model. Currently we have few …
Airflow PythonOperator inside PythonOperator
How can I run a PythonOperator inside another PythonOperator? The idea is to call a “main” function as a PythonOperator, then run a few other PythonOperators inside it and schedule them …
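A sketch of one common alternative, not a definitive answer: operators become scheduled tasks when the DAG file is parsed, so an operator created inside another task’s callable is not scheduled on its own. If the inner steps do not need to be separate tasks, the “main” callable can simply call plain functions in order; if they need their own retries and logs, define them as separate tasks at the top level of the DAG file instead. The step functions below are hypothetical placeholders, and the Airflow wiring is only a comment.

```python
from typing import List


def extract() -> List[int]:
    # Hypothetical first step: pretend we fetched some rows
    return [1, 2, 3]


def transform(rows: List[int]) -> List[int]:
    # Hypothetical second step
    return [r * 10 for r in rows]


def load(rows: List[int]) -> int:
    # Hypothetical final step: return a summary instead of writing anywhere
    return sum(rows)


def main() -> int:
    """Callable for the single PythonOperator: runs the inner steps inline."""
    return load(transform(extract()))


# Hypothetical wiring (requires Airflow 2.x):
# from airflow.operators.python import PythonOperator
# run_main = PythonOperator(task_id="main", python_callable=main)
```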
Airflow/EC2 – Save CSV from DAG
I looked around but could not find anyone with a similar issue. I’m trying to save some results to a CSV file on my EC2 instance, but for some reason the return value is None. Here is …
Airflow issue with pathlib / configparser – ‘PosixPath’ object is not iterable
I am trying to containerize my Airflow setup. I’ve been tasked with keeping the environment the same and just moving it into a Docker container. We currently have Airflow and all our dependencies installed …
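One possible cause, stated as an assumption because the failing code is cut off: ConfigParser.read() only accepts a single path-like object such as a PosixPath from Python 3.6.1 onward. On older interpreters, or in any helper that expects a list of filenames, the Path gets iterated over instead, which raises “'PosixPath' object is not iterable”. Passing str(path) works across versions. The helper name, file name, and section below are hypothetical.

```python
import configparser
import tempfile
from pathlib import Path


def load_config(path: Path) -> configparser.ConfigParser:
    """Read an INI-style config file given as a pathlib.Path."""
    parser = configparser.ConfigParser()
    # str() avoids relying on read() accepting path-like objects (Python 3.6.1+)
    parser.read(str(path))
    return parser


# Demo with a hypothetical config file in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    cfg_path = Path(tmp) / "pipeline.cfg"
    cfg_path.write_text("[paths]\ndata_dir = /opt/airflow/data\n")
    data_dir = load_config(cfg_path).get("paths", "data_dir")
```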
Airflow ExternalTaskSensor doesn’t fail when the external task fails
I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I have developed this code to test the functionality: import time from datetime import datetime, …
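In Airflow 1.10.11, as far as I know, ExternalTaskSensor only checks allowed_states, so when the external task fails the sensor just keeps poking until it times out; newer releases (2.x, at least) accept a failed_states argument for exactly this case. On 1.10.11, one workaround is a custom sensor whose poke raises once the external task lands in a failure state. The pure-Python sketch below shows only that decision logic: the function name and the default state tuples are assumptions, looking up the external task instance’s state is left out, and a real sensor would raise AirflowException rather than RuntimeError.

```python
from typing import Iterable


def check_external_state(
    state: str,
    allowed_states: Iterable[str] = ("success",),
    failed_states: Iterable[str] = ("failed", "upstream_failed"),
) -> bool:
    """Decision logic for one sensor poke on an external task's state.

    Returns True when the sensor should succeed, False when it should keep
    waiting, and raises when the external task has failed so the sensor
    fails immediately instead of timing out.
    """
    if state in failed_states:
        # In a real Airflow sensor, raise airflow.exceptions.AirflowException here
        raise RuntimeError(f"External task ended in state {state!r}")
    return state in allowed_states
```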
AIRFLOW – Setting relationships from a list of operators
I have a list of operators that were appended to a list in a for loop. I would like to set relationships in my DAG so that each task is downstream of the previous one, in the order of the list. For example, task_list = …
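Airflow ships a helper for this: chain(*task_list) links each task to the next one (importable from airflow.models.baseoperator in 2.x and from airflow.utils.helpers in 1.10.x, as far as I know). The equivalent explicit loop pairs each task with its successor via zip. To keep the sketch runnable without Airflow, StubTask below is a hypothetical stand-in that only records what >> would link; with real operators the loop body stays the same.

```python
from typing import List


class StubTask:
    """Hypothetical stand-in for an Airflow operator (not part of Airflow)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List[str] = []

    def __rshift__(self, other: "StubTask") -> "StubTask":
        # Mimics `upstream >> downstream`: record the edge, return the right side
        self.downstream.append(other.task_id)
        return other


def link_in_order(tasks: List[StubTask]) -> None:
    """Set each task downstream of the previous one, in list order."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


task_list = [StubTask(f"task_{i}") for i in range(4)]
link_in_order(task_list)
```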