To preface, I’m fairly new to Docker, Airflow & Stack Overflow. I’ve got an instance of Airflow running in Docker on an Ubuntu (20.04.3) VM. I’m trying to get openpyxl installed at build time in order to use it as the engine for pd.read_excel. Here’s the Dockerfile with the install command. The requirements.txt file looks like this, and the docker-compose.yaml file looks…
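As a quick illustration of what the build is aiming for, here is a minimal sketch of a task callable that reads an Excel file with the openpyxl engine; it assumes openpyxl is listed in requirements.txt and installed into the image at build time, and the file path is purely illustrative.

```python
# Minimal sketch: read an Excel file with the openpyxl engine inside a task
# callable. pd.read_excel raises ImportError if openpyxl is missing from the
# image, so this also doubles as a check that the build-time install worked.
import pandas as pd

def load_workbook_to_df(path: str = "/opt/airflow/data/input.xlsx") -> int:
    df = pd.read_excel(path, engine="openpyxl")
    return len(df)  # row count pushed to XCom if used as a PythonOperator callable
```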
Tag: airflow
Airflow GCSFileTransformOperator source object filename wildcard
I am working on a DAG that should read an XML file, do some transformations to it, and land the result as a CSV. For this I am using GCSFileTransformOperator. Example: My problem is that the filename ends with a 4-digit number that is different each day (File_20220119_4302). The next day the number will be different. I can…
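One hedged way around this, assuming source_object does not accept wildcards, is to resolve the exact object name in an upstream task with GCSHook.list() using the date prefix and pass it to GCSFileTransformOperator via XCom. The bucket name, prefixes, DAG id, and script path below are illustrative assumptions, not values from the question.

```python
# Sketch: resolve the daily filename (unknown trailing digits) by listing the
# bucket with a date prefix, then feed the exact object name to the transform.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperator

def find_daily_file(ds_nodash, **_):
    # e.g. matches "File_20220119_4302.xml" regardless of the trailing digits
    hook = GCSHook()
    matches = hook.list(bucket_name="my-bucket", prefix=f"File_{ds_nodash}_")
    return matches[0]

with DAG(dag_id="xml_to_csv", start_date=datetime(2022, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    resolve = PythonOperator(task_id="resolve_filename", python_callable=find_daily_file)

    transform = GCSFileTransformOperator(
        task_id="transform_xml",
        source_bucket="my-bucket",
        source_object="{{ ti.xcom_pull(task_ids='resolve_filename') }}",
        destination_bucket="my-bucket",
        destination_object="transformed/{{ ds_nodash }}.csv",
        transform_script=["python", "/opt/airflow/scripts/xml_to_csv.py"],
    )

    resolve >> transform
```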
Why is Airflow returning an error while requesting a REST API?
I have Python code that requests a REST API. The API has more than 5,000 pages, so I tried to request all of them, but I always get an error at around the 2,000th request. The error is "KeyError: 'items'", raised by df = pd.json_normalize(json_data["items"]). How can I solve this problem? P.S. Locally, the code works fine. Answer: I found a…
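A common cause is a page that comes back without an "items" key (rate limiting, an error payload, or simply the end of the data). The following is a hedged sketch of defensive pagination that checks the payload before normalizing; the URL, query parameters, and backoff values are assumptions.

```python
# Sketch: paginate an API defensively so a missing "items" key is logged and
# handled instead of raising KeyError deep inside the loop.
import time
import requests
import pandas as pd

def fetch_all_pages(base_url: str, max_pages: int = 5000) -> pd.DataFrame:
    frames = []
    for page in range(1, max_pages + 1):
        resp = requests.get(base_url, params={"page": page}, timeout=30)
        if resp.status_code != 200:
            time.sleep(5)  # back off once on e.g. 429/5xx, then retry
            resp = requests.get(base_url, params={"page": page}, timeout=30)
        json_data = resp.json()
        items = json_data.get("items")
        if not items:
            print(f"No 'items' in page {page}: {json_data}")
            break
        frames.append(pd.json_normalize(items))
    return pd.concat(frames, ignore_index=True)
```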
How to pass the PostgreSQL query result into a variable in Airflow? (Postgres Operator or Postgres Hook)
I’m planning to use PostgreSQL as my task meta-info provider, so I want to run a few queries, get some data, and pass it as a variable to another task. The problem is that when I use PostgresHook I get the data, but it’s inside a Python method that I can’t access; in fact I see the below line…
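A minimal sketch of one way to do this, assuming the result should travel between tasks via XCom: PostgresHook.get_records() returns the rows as a list, and returning that list from the callable pushes it to XCom for the downstream task to pull. The connection id, SQL, and task ids are illustrative.

```python
# Sketch: fetch rows with PostgresHook in one task, pull them via XCom in another.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_meta(**_):
    hook = PostgresHook(postgres_conn_id="my_postgres")
    rows = hook.get_records("SELECT id, status FROM task_meta WHERE status = 'ready'")
    return rows  # pushed to XCom as this task's return_value

def use_meta(ti, **_):
    rows = ti.xcom_pull(task_ids="fetch_meta")
    for row_id, status in rows:
        print(row_id, status)
```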
Airflow DAG script: print a value in the logs
I was passing the JSON {“Column”: “ABC123”} in Airflow before triggering it, and in the DAG script I have written the code below. I want to print the value as 123 in the Airflow logs, but it is not printing; the DAG run succeeds, but the value I passed never appears in the logs…
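A hedged sketch of one way this could look: read the triggering JSON from dag_run.conf inside a Python callable and log only the digits of "ABC123". Whether anything shows up in the task log depends on the callable actually being executed by a task; the key name comes from the question, everything else is an assumption.

```python
# Sketch: pull "Column" from the trigger conf and log its digits ("ABC123" -> "123").
import logging
import re

def print_column_digits(dag_run=None, **_):
    value = (dag_run.conf or {}).get("Column", "")
    digits = re.sub(r"\D", "", value)
    logging.info("Column digits: %s", digits)
    return digits
```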
How to run Airflow tasks synchronously
I have an Airflow DAG comprising 2-3 steps: a PythonOperator that runs a query on AWS Athena and stores the generated file at a specific S3 path; a BashOperator that increments an Airflow Variable for tracking; and a BashOperator that takes the output (response) of task 1 and runs some code on top of it. What happens here is that the DAG gets completed within…
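If the problem is that the Athena query is kicked off asynchronously and the downstream tasks start before the S3 result exists, one hedged option is to make the first step effectively synchronous by polling the query state before the task returns. The database name, output location, and region below are assumptions.

```python
# Sketch: start an Athena query and block until it finishes, so downstream
# tasks only run once the result file is actually on S3.
import time
import boto3

def run_athena_query(sql: str) -> str:
    client = boto3.client("athena", region_name="us-east-1")
    qid = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "my_db"},
        ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},
    )["QueryExecutionId"]

    while True:
        state = client.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(5)

    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {qid} ended in state {state}")
    return f"s3://my-bucket/athena-results/{qid}.csv"
```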
Decode a UTF-8-encoded XCom value from SSHOperator
I have two Airflow tasks that I want to communicate. The SSHOperator returns the last line printed, in this case “remote_IP”. However, the SSHOperator’s return value is UTF-8 encoded. How can the SSHOperator Read_remote_IP return the value non-encoded? Also, how can the BashOperator Read_SSH_Output decode the encoded value? Answer: My current solution is to introduce another Python operator to convert…
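A hedged sketch of that extra Python task: pull the SSHOperator’s XCom and decode it. Depending on the Airflow version, the pushed value may be raw bytes or base64-encoded bytes, so both cases are handled; the task ids Read_remote_IP and Read_SSH_Output are taken from the question, the rest is an assumption.

```python
# Sketch: normalize the SSHOperator's XCom into a plain string for downstream tasks.
import base64

def decode_remote_ip(ti, **_):
    raw = ti.xcom_pull(task_ids="Read_remote_IP")
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    try:
        decoded = base64.b64decode(raw).decode("utf-8").strip()
    except Exception:
        decoded = raw.strip()  # value was already plain UTF-8 text
    return decoded  # plain string, usable by Read_SSH_Output via xcom_pull
```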
How to install packages in Airflow (docker-compose)?
The question is very similar to one already available. The only difference is that I run Airflow in Docker. Step by step: put docker-compose.yaml in the PyCharm project, put requirements.txt in the PyCharm project, run docker-compose up, run the DAG and receive a ModuleNotFoundError. I want to start Airflow using docker-compose with the dependencies from requirements.txt. These dependencies should be available by…
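Assuming the packages from requirements.txt were installed into the containers (for example via the _PIP_ADDITIONAL_REQUIREMENTS variable in the official docker-compose.yaml, or by building a custom image), a small sanity-check task like the sketch below can fail fast with a clear message instead of a ModuleNotFoundError deep inside the DAG. The module name is illustrative.

```python
# Sketch: verify at runtime that a dependency is importable inside the worker container.
import importlib

def assert_installed(module_name: str = "openpyxl") -> str:
    mod = importlib.import_module(module_name)  # raises ModuleNotFoundError if missing
    return getattr(mod, "__version__", "unknown")
```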
Trigger an Airflow DAG manually with parameters and pass them into a Python function
I want to pass parameters into an Airflow DAG and use them in a Python function. I can use the parameters in a BashOperator, but I can’t find any reference for using them in a Python function. I want to pass the below as parameters while triggering the DAG. “task1” works fine for me; I need to make “task2” work. Please guide me to correct…
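A hedged sketch of the Python side: the same JSON that the BashOperator reads via {{ dag_run.conf }} can be read from the task context inside a PythonOperator callable. The DAG id, task id, and conf key below are assumptions.

```python
# Sketch: read trigger-time parameters from dag_run.conf inside a Python callable.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def task2_callable(dag_run=None, **_):
    conf = dag_run.conf or {}           # JSON supplied when triggering the DAG
    name = conf.get("name", "default")  # "name" is an assumed key
    print(f"Received parameter name={name}")
    return name

with DAG(dag_id="manual_trigger_params", start_date=datetime(2022, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    task2 = PythonOperator(task_id="task2", python_callable=task2_callable)
```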
Use existing Celery workers for Airflow’s CeleryExecutor workers
I am trying to introduce dynamic workflows into my landscape involving multiple steps of model inference, where the output of one model is fed into another model. Currently we have a few Celery workers spread across hosts to manage the inference chain. As the complexity increases, we are attempting to build workflows on the fly. For that purpose, I got…
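One common pattern, sketched here under assumptions, is to route Airflow tasks to specific Celery queues with the operator’s queue argument and point the existing worker hosts at those queues (e.g. airflow celery worker --queues inference_gpu). Whether pre-existing, non-Airflow Celery workers can be reused directly is a separate question; the queue names, DAG id, and task names are illustrative.

```python
# Sketch: pin each inference step to a named Celery queue served by a dedicated worker host.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_model_a(**_):
    print("model A inference")

def run_model_b(ti, **_):
    print("model B inference on", ti.xcom_pull(task_ids="model_a"))

with DAG(dag_id="inference_chain", start_date=datetime(2022, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    model_a = PythonOperator(task_id="model_a", python_callable=run_model_a,
                             queue="inference_gpu")
    model_b = PythonOperator(task_id="model_b", python_callable=run_model_b,
                             queue="inference_cpu")
    model_a >> model_b
```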