I am trying to create a Dataflow script that reads from BigQuery and writes back to BigQuery. Our main table is massive and breaks the extraction step, so I'd like to create a simple table (the result of a query) containing all the relevant information.
The SQL query 'Select * from table.orders where paid = false limit 10' is a simple one just to make sure it works. The real query is more complex and pulls from multiple tables within the same project.
This seems to work, but what can I do to test it? Also, how can I get it to run automatically every morning?
import logging
import argparse
import apache_beam as beam

PROJECT = 'experimental'
BUCKET = 'temp1/python2'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=test1',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:
        # Read the table rows into a PCollection.
        rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            query='Select * from `table.orders` where paid = false limit 10',
            use_standard_sql=True))

        # Write the output using a "Write" transform that has side effects.
        rows | 'Write' >> beam.io.WriteToBigQuery(
            table='orders_test',
            dataset='external',
            project='experimental',
            schema='field1:type1,field2:type2,field3:type3',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Answer
Running daily: you can schedule the pipeline with the App Engine Cron Service or Cloud Functions, as described here: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions
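As a rough sketch of the Cloud Functions route, the snippet below launches the pipeline on each scheduled invocation, assuming you have first staged the pipeline as a Dataflow template; the template path, job name, and function name are hypothetical placeholders, not part of your existing code. With the App Engine cron option, the same launch call would sit behind a simple HTTP handler that cron hits each morning.

# Minimal sketch: launch a staged Dataflow template from a scheduled trigger.
# The template path, job name, and function name below are hypothetical.
from googleapiclient.discovery import build

PROJECT = 'experimental'
TEMPLATE_PATH = 'gs://temp1/python2/templates/bq_to_bq'  # hypothetical template location


def launch_daily_job(event=None, context=None):
    """Entry point for a scheduled Cloud Function invocation."""
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId=PROJECT,
        gcsPath=TEMPLATE_PATH,
        body={
            'jobName': 'orders-extract-daily',
            'environment': {
                'tempLocation': 'gs://temp1/python2/staging/',
            },
        },
    )
    response = request.execute()
    print(response)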
Testing: you can run against a smaller data set to check the results. If you were running user code (not just a read/write) you could test locally by feeding in data from a file and checking the expected output. But since you are only doing a read and a write, you will need to test against BigQuery itself.
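If you later add transform logic of your own, a small unit test along these lines captures the "feed in known data, check expected results" idea; it uses Beam's testing utilities, and the filter_unpaid function and field names are made up for illustration.

# Sketch of testing transform logic without touching BigQuery.
# filter_unpaid and the row fields are hypothetical examples.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def filter_unpaid(row):
    # Hypothetical user code you might want to test in isolation.
    return not row['paid']


def test_filter_unpaid():
    rows = [
        {'order_id': 1, 'paid': True},
        {'order_id': 2, 'paid': False},
    ]
    with TestPipeline() as p:
        output = (
            p
            | beam.Create(rows)            # stand-in for the BigQuery read
            | beam.Filter(filter_unpaid))  # the user code under test
        assert_that(output, equal_to([{'order_id': 2, 'paid': False}]))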