How can I run a PythonOperator inside another PythonOperator? The idea is to call a “main” function as a PythonOperator and then run a few other PythonOperators inside it and schedule them. The code is: Answer: I think you are expecting that Airflow will create these operators/tasks in the UI but t…
Tag: airflow
Airflow/EC2 – Save CSV from DAG
I looked around but could not find anyone with a similar issue. I’m trying to save some results to a CSV on my EC2 instance, but for some reason the return value is None. Here is what I have: I have to get this output into a CSV, hence the use of Pandas. This is
Airflow issue with pathlib / configparser – ‘PosixPath’ object is not iterable
I am trying to containerize my Airflow setup. I’ve been tasked with keeping the environment the same and just moving it into a Docker container. We currently have Airflow and all our dependencies installed within an Anaconda environment. So I’ve created a custom Docker image that installs anacon…
Airflow ExternalTaskSensor doesn’t fail when the external task fails
I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I have developed this code to test the functionality: The idea is that one DAG triggers another one with a TriggerDagRunOperator. This sets the execution_date to the same value in both DAGs. This works perfectly wh…
AIRFLOW – Setting relationships from a list of operators
I have a list of operators that were appended to a list in a for loop. I would like to set a relationship for my DAG where each task is set downstream in the order of the list. For example: Thank you. Answer: There’s a helper function created for that, which will be much more performant than the accepted answer …
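The excerpt is cut off before it names the helper, so the following is only a minimal sketch of the underlying idea: pairing each item in the list with its successor and linking them. `Task` and `link_in_order` are hypothetical stand-ins written in plain Python, not Airflow classes or functions. Airflow does ship a `chain` helper, but whether that is the one the answer refers to is an assumption.

```python
from typing import List


class Task:
    """Hypothetical stand-in for an Airflow operator; not Airflow's own class."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        # Mimic `a >> b`: register b as downstream of a and return b.
        self.downstream.append(other)
        return other


def link_in_order(tasks: List[Task]) -> None:
    """Set each task downstream of the one before it in the list."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


# Build the list in a loop, as in the question, then link it in one pass.
tasks = [Task(f"task_{i}") for i in range(4)]
link_in_order(tasks)
```

Zipping the list with itself shifted by one yields each consecutive pair exactly once, so the linking is a single pass over the list.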
Airflow HttpSensor using a default host
I’m trying to poll some endpoint to wait until the Last-Modified header shows the endpoint has been updated in the last five minutes (the default poke interval for the HttpSensor). In the Airflow logs, I see the following: As the logs show, the hostname it’s using is: Using connection to: id: http…
How to use DatabaseHook objects with PythonOperator in Airflow without running out of connections?
I’m trying to store my database credentials using Airflow Connections and use them with PythonOperators. I noticed that if I pass the credentials to the PythonOperator then every variable gets logged, including the database password. So I switched to passing the connection object itself to the PythonOperator,…
Need help running spark-submit in Apache Airflow
I am relatively new to Python and Airflow and am having a very difficult time getting spark-submit to run in an Airflow task. My goal is to get the following DAG task to run successfully. I know the problem lies with Airflow and not with Bash because when I run the command spark-submit --class …
How to pass parameters to Airflow on_success_callback and on_failure_callback
I have implemented email alerts on success and failure using on_success_callback and on_failure_callback. According to the Airflow documentation, a context dictionary is passed as a single parameter to this function. How can I pass another parameter to these callback methods? Here is my code: I intend to move the …
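One common Python approach to this kind of problem, sketched here without Airflow itself, is to bind the extra argument ahead of time with `functools.partial`, so the resulting callable still accepts the single context argument the question describes. The `notify` function, the `recipient` parameter, the address, and the simplified context dictionary are all hypothetical, and this is not necessarily the approach the original answer takes.

```python
from functools import partial
from typing import Any, Dict, List

sent: List[str] = []  # collects messages instead of sending real email


def notify(context: Dict[str, Any], recipient: str) -> None:
    """Callback taking the context plus one extra, hypothetical parameter."""
    sent.append(f"{recipient}: {context['task_id']} finished")


# Bind the extra argument up front; the result still takes one positional
# argument, which is how the question says the context is delivered.
on_success = partial(notify, recipient="team@example.com")

# Simulate the callback being invoked with a (simplified) context dictionary.
on_success({"task_id": "load_data"})
```

A closure or a small `lambda context: notify(context, recipient=...)` would work the same way; `partial` just keeps the binding explicit.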
AirFlowException – Python_Callable must be callable
I made a small change to an existing workflow, and it has broken Airflow. Here is the code: Here is the error I’m receiving: airflow.exceptions.AirflowException: python_callable param must be callable Answer: It seems like you are passing trigger_report itself as the python_callable. Is this intentional? do…
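The asker's code is not shown, so the sketch below only illustrates what the error message is checking: whether the object handed over as `python_callable` passes Python's built-in `callable()`. One frequent way to fail that check (an assumption here, not necessarily the asker's case) is passing the result of calling a function instead of the function object. The body of `trigger_report` is a placeholder.

```python
def trigger_report() -> str:
    """Hypothetical task function; the body is a placeholder."""
    return "report triggered"


# The function object itself: execution is deferred to whoever receives it.
as_reference = trigger_report

# The result of calling it: here a plain string, which cannot be called.
as_call_result = trigger_report()

reference_ok = callable(as_reference)
call_result_ok = callable(as_call_result)
print(reference_ok, call_result_ok)
```

Running the same `callable()` check on whatever is bound to the name being passed is a quick way to confirm which case applies.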