I am new to Airflow and I am wondering: how do I load a file from a GCS bucket into BigQuery?
So far, I have managed to export from BigQuery to a GCS bucket:
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')
Can someone help me modify my current code so that I can load a file from a GCS bucket into BigQuery?
Answer
For your requirement, you can use the GCSToBigQueryOperator, an Airflow operator that transfers files from Cloud Storage to BigQuery. For more information, check this link. You can try the code below.
gcs_to_bq_operator.py
import os

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago

DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'new-dataset')
TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'Country')

dag = models.DAG(
    dag_id='gcs_to_bq_operator',
    start_date=days_ago(2),
    schedule_interval='@once',
    tags=['example'],
)

# Create the target dataset before loading data into it.
create_test_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_airflow_test_dataset',
    dataset_id=DATASET_NAME,
    dag=dag,
)

# [START howto_operator_gcs_to_bigquery]
load_csv = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery_example',
    bucket='sample-bucket',
    source_objects=['cloud storage URI'],  # object path inside the bucket, e.g. 'data/file.csv'
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {'name': 'Year', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'number', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'result', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)
# [END howto_operator_gcs_to_bigquery]

# Make sure the dataset exists before the load runs.
create_test_dataset >> load_csv
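If you want to bolt this onto your existing DAG rather than create a new one, you can append a GCSToBigQueryOperator right after your export task. The sketch below is a rough outline, not a drop-in: output_bucket, output_object and bq_destination_table_id are placeholder names you need to replace with the bucket, object path and target table from your own pipeline, and you may prefer an explicit schema_fields list over autodetect.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# A minimal sketch: load the CSV exported above back into a BigQuery table.
# 'output_bucket', 'output_object' and 'bq_destination_table_id' are
# placeholders for your own values, not names from your current code.
load_questions_from_gcs = GCSToBigQueryOperator(
    task_id='load_recent_questions_from_gcs',
    bucket=output_bucket,                # bucket name only, without the 'gs://' prefix
    source_objects=[output_object],      # object path inside the bucket, e.g. 'recent_questions.csv'
    destination_project_dataset_table=bq_destination_table_id,
    source_format='CSV',
    skip_leading_rows=1,                 # skip the CSV header row written by the export
    autodetect=True,                     # let BigQuery infer the schema instead of schema_fields
    write_disposition='WRITE_TRUNCATE',
)

# Run the load only after the export to Cloud Storage has finished.
export_questions_to_gcs >> load_questions_from_gcs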