Dataflow BigQuery to BigQuery

I am trying to create a Dataflow script that reads from BigQuery and writes back to BigQuery. Our main table is massive and exceeds the extraction limits, so I'd like to create a smaller table (as the result of a query) containing all the relevant information.

The SQL query 'Select * from table.orders where paid = false limit 10' is a simple one, just to make sure it works. The real query is more complex and joins multiple tables within the same project.

This seems to work, but what can I do to test it? Also, how can I get it to run automatically every morning?

import logging

import apache_beam as beam

PROJECT = 'experimental'
BUCKET = 'temp1/python2'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=test1',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/temp/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Read the table rows into a PCollection.
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
                query='Select * from `table.orders` where paid = false limit 10',
                use_standard_sql=True))

        # Write the output using a "Write" transform that has side effects.
        rows | 'Write' >> beam.io.WriteToBigQuery(
                table='orders_test',
                dataset='external',
                project='experimental',
                schema='field1:type1,field2:type2,field3:type3',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


Answer

Running daily: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions describes scheduling Dataflow pipelines using the App Engine cron service or Cloud Functions.
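
A rough sketch of the Cloud Functions route, under some assumptions: the pipeline above is first staged as a Dataflow template (by adding --template_location to the pipeline options), the template path and job name below are hypothetical, and google-api-python-client is available to the function. An App Engine cron job (or any scheduler that can hit the function's HTTP trigger) then invokes it each morning.

from googleapiclient.discovery import build

PROJECT = 'experimental'
# Hypothetical GCS path where the pipeline was staged as a template.
TEMPLATE = 'gs://temp1/python2/templates/bq_to_bq'

def launch_pipeline(request):
    # HTTP-triggered entry point; schedule it to fire every morning.
    dataflow = build('dataflow', 'v1b3')
    response = dataflow.projects().templates().launch(
        projectId=PROJECT,
        gcsPath=TEMPLATE,
        body={'jobName': 'bq-to-bq-daily'},
    ).execute()
    return str(response)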

Testing – you can try running against a smaller data set to check the end-to-end behavior. If you were running user code (not just a read and a write), you could test it with data from a file and check the expected results. But since this pipeline is just a read and a write, you need to test against BigQuery itself.
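
One lightweight option is to run the pipeline locally against the small query by swapping --runner=DataflowRunner for DirectRunner. And if you later add transforms between the read and the write, Beam's testing utilities let you check them against in-memory rows with no BigQuery involved. A minimal sketch, where mark_unpaid is a hypothetical stand-in for your own logic:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def mark_unpaid(row):
    # Example transform: flag rows whose 'paid' field is False.
    return dict(row, flagged=not row['paid'])

def test_mark_unpaid():
    fake_rows = [
        {'order_id': 1, 'paid': False},
        {'order_id': 2, 'paid': True},
    ]
    with TestPipeline() as p:
        output = (p
                  | beam.Create(fake_rows)
                  | beam.Map(mark_unpaid))
        assert_that(output, equal_to([
            {'order_id': 1, 'paid': False, 'flagged': True},
            {'order_id': 2, 'paid': True, 'flagged': False},
        ]))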
