How to load a BigQuery table from a file in GCS Bucket using Airflow?

I am new to Airflow, and I am wondering how to load a file from a GCS bucket into BigQuery.

So far, I have managed to export from BigQuery to a GCS bucket:

bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')

Can someone help me modify my current code so that I can take a file from a GCS bucket and load it into BigQuery?

Answer

For your requirement, you can use GCSToBigQueryOperator, an Airflow operator that transfers files from Cloud Storage to BigQuery. For more information, see the Airflow documentation for this operator. You can try the code below.

gcs_to_bq_operator.py

import os
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago

# BigQuery dataset IDs may only contain letters, digits, and underscores.
DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'new_dataset')
TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'Country')

dag = models.DAG(
    dag_id='gcs_to_bq_operator',
    start_date=days_ago(2),
    schedule_interval='@once',
    tags=['example'],
)

# Create the target dataset before loading data into it.
create_test_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME, dag=dag
)

# Load the CSV from Cloud Storage into the BigQuery table.
load_csv = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery_example',
    bucket='sample-bucket',
    # Object path(s) inside the bucket, not full gs:// URIs.
    source_objects=['path/to/your-file.csv'],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {'name': 'Year', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'number', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'result', 'type': 'INTEGER', 'mode': 'NULLABLE'}
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

# Make sure the dataset exists before the load runs.
create_test_dataset >> load_csv
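If you would rather not maintain the schema by hand, the operator can also let BigQuery infer it from the CSV itself. A minimal sketch, using the same placeholder bucket and object path as above (replace them with your own) and assuming your file has a header row:

# Sketch: same load, but letting BigQuery autodetect the schema from the CSV.
# 'sample-bucket' and 'path/to/your-file.csv' are placeholders, not real resources.
load_csv_autodetect = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery_autodetect',
    bucket='sample-bucket',
    source_objects=['path/to/your-file.csv'],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    source_format='CSV',
    skip_leading_rows=1,   # skip the header row
    autodetect=True,       # let BigQuery infer column names and types
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

In your own DAG, you would point bucket and source_objects at the file you already export with BigQueryToCloudStorageOperator and chain the tasks, e.g. export_questions_to_gcs >> load_csv.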