I am working on a DAG that should read an XML file, do some transformations to it, and land the result as a CSV. For this I am using GCSFileTransformOperator.
Example:

```python
xml_to_csv = GCSFileTransformOperator(
    task_id='xml_to_csv',
    source_bucket='source_bucket',
    source_object='raw/dt=2022-01-19/File_20220119_4302.xml',
    destination_bucket='destination_bucket',
    destination_object='csv_format/dt=2022-01-19/File_20220119_4302.csv',
    transform_script=['/path_to_script/transform_script.py'],
)
```
My problem is that the filename ends with a 4-digit number that is different each day (File_20220119_4302). The next day the number will be different.
I can use templates for the execution date: {{ ds }}, {{ ds_nodash }}, but I am not sure what to do with the number.
I have tried wildcards like File_20220119_*.xml, with no success.
Answer
I dug into the GCSFileTransformOperator code, and I don't think wildcards will work: the templated fields only render fixed values derived from the execution time (as described on the templates reference page), while the source file will have a completely different filename each day.
My solution would be to add a PythonOperator as an extra step that finds your input file first. Depending on your Airflow version, you can use the TaskFlow API or XCom to pass the filename along.
```python
def look_file(*args, **kwargs):
    # Look for the day's file in the bucket, e.g. by listing objects
    # under the date prefix, and return its object path.
    file_found_path = ...  # e.g. 'raw/dt=2022-01-19/File_20220119_4302.xml'
    return {'file_found': file_found_path}

file_found = PythonOperator(
    task_id='file_searcher',
    python_callable=look_file,
    dag=dag,
)

xml_to_csv = GCSFileTransformOperator(
    task_id='xml_to_csv',
    source_bucket='source_bucket',
    # source_object is a templated field, so the value returned by
    # file_searcher can be pulled from XCom at render time:
    source_object="{{ ti.xcom_pull(task_ids='file_searcher')['file_found'] }}",
    destination_bucket='destination_bucket',
    destination_object='csv_format/dt=2022-01-19/File_20220119_4302.csv',
    transform_script=['/path_to_script/transform_script.py'],
)

file_found >> xml_to_csv
```
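The lookup itself can be as simple as matching the day's filename pattern against the object names in the bucket (in Airflow you could fetch those names with the Google provider's GCSHook.list, for example). A minimal sketch of just the matching logic, with a hypothetical helper name `find_daily_file` and pure-Python inputs so it runs standalone:

```python
import fnmatch


def find_daily_file(object_names, ds_nodash):
    """Return the first object matching File_<ds_nodash>_*.xml for the given day.

    object_names: list of GCS object paths (e.g. from a bucket listing);
    ds_nodash: Airflow's execution date without dashes, e.g. '20220119'.
    """
    pattern = f'raw/dt=*/File_{ds_nodash}_*.xml'
    matches = [name for name in object_names if fnmatch.fnmatch(name, pattern)]
    if not matches:
        raise FileNotFoundError(f'no input file found for {ds_nodash}')
    return matches[0]


# Example with made-up object names:
objects = [
    'raw/dt=2022-01-18/File_20220118_1177.xml',
    'raw/dt=2022-01-19/File_20220119_4302.xml',
]
print(find_daily_file(objects, '20220119'))  # raw/dt=2022-01-19/File_20220119_4302.xml
```

Inside `look_file` you would call such a helper with the real bucket listing and return the match, so the varying 4-digit suffix never has to be known in advance.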