I am new to Airflow, and I am trying to automate the scheduling of a Python pipeline. My project youtubecollection01 uses custom-built modules, so when I run the DAG it fails with ModuleNotFoundError: No module named 'Authentication'.
This is how my project is structured:
This is my dag file:
```python
# This is to initialize the file as a DAG file
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
# from airflow.utils.dates import days_ago

from youtubecollectiontier01.src.__main__ import main

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # 'start_date': days_ago(1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# curate dag
with DAG('collect_layer_01',
         start_date=datetime(2022, 7, 25),
         schedule_interval='@daily',
         catchup=False,
         default_args=default_args) as dag:

    curate = PythonOperator(
        task_id='collect_tier_01',  # name for the task you would like to execute
        python_callable=main,  # the name of your python function
        provide_context=True,
        dag=dag)
```
I am importing the main function from __main__.py; however, inside main I import other classes from Authentication.py, ChannelClass.py and Common.py, and those are the imports Airflow fails to resolve.
Why are these imports failing? Is it a directory issue or an Airflow issue? I tried moving the project under plugins and running it, but that did not work either. Any feedback would be highly appreciated!
Thank you!
Answer
Up until the last part, you set everything up according to the tutorials! Also, thank you for a well-documented question.
If you have not changed the PYTHONPATH for Airflow, you can see the default setup by running:
$ airflow info
In the "Paths info" section, you get “airflow_home”, “system_path”, “python_path” and “airflow_on_path”.
Within “python_path” you’ll see that Airflow adds the dags, plugins and config folders of your Airflow home to the Python path, so every import is resolved starting from one of those folders.
You can read more about this in the “Modules Management” page of the Airflow documentation.
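If you want to check the same thing from Python, for example by calling it inside a PythonOperator callable so the output lands in the task log, a small helper along these lines prints sys.path (the list Python actually searches) and reports whether each import can be resolved. This is a minimal sketch: the helper names is_importable and report are hypothetical, and the module names are the ones from the question.

```python
import importlib.util
import sys


def is_importable(module_name: str) -> bool:
    """Return True if module_name can be located on the current sys.path.

    For dotted names, find_spec imports the parent packages first,
    so any parent __init__.py runs as a side effect.
    """
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # A parent package of a dotted name could not be found.
        return False


def report(module_names):
    """Map each module name to whether it can be imported right now."""
    return {name: is_importable(name) for name in module_names}


if __name__ == "__main__":
    print("sys.path:")
    for entry in sys.path:
        print("   ", entry)
    print(report([
        "Authentication",                              # bare import from the question
        "youtubecollectiontier01.src.Authentication",  # package-qualified import
    ]))
```

Under Airflow's default setup you would expect the first name to report False and the second to report True.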
Now, I think the problem with your code can be fixed with a small change.
In your main code you import:
from Authentication import Authentication
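In a default setup, Airflow doesn’t know where that is! Only the dags, plugins and config folders are on its Python path. Authentication.py sits inside youtubecollectiontier01/src rather than directly in one of those folders, so the bare name Authentication cannot be resolved.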
If you import it this way:
from youtubecollectiontier01.src.Authentication import Authentication
This is the same style you already use in the DAG file, and I believe it will work. The same goes for your other classes: ChannelClass, Common, etc.
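To see the difference without Airflow in the loop, the sketch below rebuilds a minimal copy of the layout in a temporary folder, puts only that folder on sys.path (mimicking what Airflow does for the dags folder), and imports youtubecollectiontier01.src.__main__ once with each import style. It assumes the project sits directly in the dags folder; the one-line Authentication class and the helpers build_project and try_import are hypothetical stand-ins, not your real code.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_project(dags: Path, qualified: bool) -> None:
    """Write a tiny stand-in for dags/youtubecollectiontier01/src/."""
    src = dags / "youtubecollectiontier01" / "src"
    src.mkdir(parents=True)
    # Hypothetical placeholder for the real Authentication class.
    (src / "Authentication.py").write_text(
        "class Authentication:\n    name = 'auth'\n"
    )
    if qualified:
        # Package-qualified import, resolved from the dags folder.
        import_line = "from youtubecollectiontier01.src.Authentication import Authentication\n"
    else:
        # Bare import: only works if src/ itself is on sys.path.
        import_line = "from Authentication import Authentication\n"
    (src / "__main__.py").write_text(
        import_line + "\n\ndef main():\n    return Authentication.name\n"
    )


def try_import(qualified: bool) -> str:
    """Import __main__ the way the DAG file does and report the outcome."""
    with tempfile.TemporaryDirectory() as tmp:
        dags = Path(tmp) / "dags"
        build_project(dags, qualified)
        # Mimic Airflow: the dags folder (not src/) is on sys.path.
        sys.path.insert(0, str(dags))
        importlib.invalidate_caches()  # the files were created at runtime
        try:
            module = importlib.import_module("youtubecollectiontier01.src.__main__")
            return module.main()
        except ModuleNotFoundError as exc:
            return f"ModuleNotFoundError: {exc}"
        finally:
            sys.path.remove(str(dags))
            # Drop cached modules so each run starts from a clean state.
            for name in list(sys.modules):
                if name == "Authentication" or name.startswith("youtubecollectiontier01"):
                    del sys.modules[name]


if __name__ == "__main__":
    print(try_import(qualified=False))  # ModuleNotFoundError: No module named 'Authentication'
    print(try_import(qualified=True))   # auth
```

The bare import reproduces exactly the error from the question, while the package-qualified one succeeds with nothing on sys.path except the dags folder.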
Waiting to hear from you!