How can I run a PythonOperator inside another PythonOperator? The idea is to call a “main” function as a PythonOperator and then run a few other PythonOperators inside it, and schedule them. The code is: Answer I think you are expecting that Airflow will create these operators/tasks in the UI, but that is not going to happen. With that code you
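A minimal sketch of the idea behind that answer: operators instantiated inside a python_callable are never registered with the DAG, so the scheduler and UI won't see them. The function and step names below are hypothetical; the point is that the inner steps are plain function calls, not tasks.

```python
def step_one():
    return "one"

def step_two():
    return "two"

def main(**context):
    # Ordinary function calls, executed sequentially inside the single
    # "main" task; Airflow does not treat these as separate tasks.
    return [step_one(), step_two()]
```

In the DAG file you would then wire only `main` via one PythonOperator; if you want each step visible in the UI, each one must be its own operator defined at module level.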
Tag: airflow
Airflow/EC2 – Save CSV from DAG
I was looking around but I could not find anyone with a similar issue. I’m trying to save some results to a CSV on my EC2 instance, but for some reason the return value is None. Here is what I have: I have to get this output into a CSV, hence the use of Pandas. This is
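A hedged sketch of the pattern usually suggested for this, assuming pandas is available and the worker can write to the path; the data and file name here are made up:

```python
import pandas as pd

def save_results(**context):
    # Hypothetical data; in the question the rows come from an upstream step.
    df = pd.DataFrame({"id": [1, 2], "value": [10.5, 20.25]})
    path = "/tmp/results.csv"  # an absolute path the Airflow worker can write to
    df.to_csv(path, index=False)
    return path  # returning the path (not the DataFrame) keeps the XCom small
```

If the function silently returns None, it usually means the return statement was never reached in the callable that the PythonOperator actually runs.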
Airflow issue with pathlib / configparser – ‘PosixPath’ object is not iterable
I am trying to containerize my Airflow setup. I’ve been tasked to keep the environment the same, just move it into a Docker container. We currently have Airflow and all our dependencies installed within an Anaconda environment. So what I’ve done is create a custom Docker image that installs Anaconda and creates my environment. The problem is, our current environment
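For the “'PosixPath' object is not iterable” part of the title, a small stdlib-only reproduction of the usual workaround, under the assumption that some configparser wrapper is iterating its argument and choking on a bare PosixPath; wrapping the path in a list (or calling str() on it) sidesteps that:

```python
import configparser
import tempfile
from pathlib import Path

# Write a throwaway config file for the demo (contents are hypothetical).
cfg_path = Path(tempfile.gettempdir()) / "airflow_demo.cfg"
cfg_path.write_text("[core]\nexecutor = LocalExecutor\n")

parser = configparser.ConfigParser()
parser.read([cfg_path])  # a list of paths is always accepted; str(cfg_path) also works
executor = parser.get("core", "executor")
```

Modern configparser accepts path-like objects directly, so the error typically points at older helper code in the environment being containerized.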
Airflow ExternalTaskSensor doesn’t fail when External Task fails
I was trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate some DAGs. I developed this code to test the functionality: The idea is that one DAG triggers another with a TriggerDagRunOperator. This sets the execution_date to the same value in both DAGs. This works perfectly when the state of the dummy_dag’s last task ends,
AIRFLOW – Setting relationships from a list of operators
I have a list of operators that were appended to a list in a for loop. I would like to set relationships for my DAG so that each task is set downstream in the order of the list. For example Thank you Answer There’s a helper function created for that, which will be much more performant than the accepted answer. See
Airflow HttpSensor using a default host
I’m trying to poll some endpoint to wait until the Last-Modified header shows the endpoint has been updated in the last five minutes (the default poke interval for the HttpSensor). In the Airflow logs, I see the following: As the logs show, the hostname it’s using is: Using connection to: id: http_default. Host: https://www.httpbin.org/, and so when it goes to
How to use DatabaseHook objects with PythonOperator in Airflow without running out of connections?
I’m trying to store my database credentials using Airflow Connections and use them with PythonOperators. I noticed that if I pass the credentials to the PythonOperator then every variable gets logged, including the database password. So I moved to passing the connection object itself to the PythonOperator, per the example below. But the issue I have now is that airflow
Need help running spark-submit in Apache Airflow
I am a relatively new user of Python and Airflow and am having a very difficult time getting spark-submit to run in an Airflow task. My goal is to get the following DAG task to run successfully. I know the problem lies with Airflow and not with bash, because when I run the command spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar in
How to pass parameters to Airflow on_success_callback and on_failure_callback
I have implemented email alerts on success and failure using on_success_callback and on_failure_callback. According to the Airflow documentation, a context dictionary is passed as a single parameter to these functions. How can I pass another parameter to these callback methods? Here is my code. I intend to move the callbacks to another package and pass the email address as a parameter. Answer
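The standard answer is `functools.partial`: bind the extra argument ahead of time and hand Airflow a callable that still takes only the context. A minimal sketch with a hypothetical address and message format:

```python
from functools import partial

def notify(email, context):
    # 'context' is the dict Airflow passes to the callback at run time;
    # 'email' is the extra parameter bound in advance via partial().
    return f"notify {email}: task={context.get('task')}"

# Pass the partial as the callback, e.g.
#   DAG(..., default_args={"on_failure_callback": partial(notify, "oncall@example.com")})
on_failure = partial(notify, "oncall@example.com")
```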
AirflowException – python_callable must be callable
I made a small change to an existing workflow, and it has broken Airflow. Here is the code: Here is the error I’m receiving: airflow.exceptions.AirflowException: python_callable param must be callable Answer Seems like you are passing trigger_report itself as the python_callable. Is this intentional? Does it already have a value? (Probably, otherwise you would’ve gotten a NameError: name ‘trigger_report’ is
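The distinction the answer is driving at, in a self-contained sketch (function name taken from the question, body hypothetical):

```python
def trigger_report(**context):
    return "report triggered"

# Wrong: python_callable=trigger_report()  -> passes the *return value* (a string),
#        which triggers "python_callable param must be callable".
# Right: python_callable=trigger_report    -> passes the function object itself.
right_argument = trigger_report
wrong_argument = trigger_report()
```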