I’m trying to run a simple Beam pipeline to extract data from a BigQuery table using SQL and push it to a GCS bucket.
import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
import pyarrow as pa

class SimplePipeline:
    def run_pipeline(self, known_args, pipeline_args, streaming_flag):
        pipeline_options = PipelineOptions(pipeline_args, streaming=streaming_flag, save_main_session=True)
        custom_options = pipeline_options.view_as(self.get_pipeline_options_class())
        log_info("Starting Pipeline")

        sql_query = "SELECT col1, col2, col3 FROM `project.dataset.table`"

        with Pipeline(options=pipeline_options) as pipeline:
            bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

            event_schema = pa.schema([
                pa.field("col1", "string", metadata={'parent': 'id'}),
                pa.field("col2", "string", metadata={'parent': 'name'}),
                pa.field("col3", "string", metadata={'parent': 'age'}),
            ])

            process_data = (
                pipeline
                | "ReadFromBigQuery" >> beam.io.Read(bq_source)
                | "Write to Parquet" >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix='gs://project-gcs/test/',
                    schema=event_schema,
                    file_name_suffix='.parquet')
            )
My requirement is to pass the SQL from a file (a simple .sql file) and not as a string. I want to modularize the SQL. So far, I’ve tried the following option – it did not work:
with open("file_query.sql", "r") as myfile:
    sql_query = myfile.read()
where the contents of my file_query.sql file look like:
"SELECT col1, col2, col3 FROM `project.dataset.table`"
Any help on this, folks?
Answer
Do you have any information on the failure you get? I ran nearly an exact copy of what you shared, and it works fine for me.
file_query.sql
SELECT * FROM `bigquery-public-data`.`baseball`.`games_post_wide`;
pipeline.py
with beam.Pipeline(options=pipeline_options) as pipeline, open("file_query.sql", "r") as file_content:
    sql_query = file_content.read()
    bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

    (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.Read(bq_source)
        | 'Print to Console' >> beam.Map(print)
    )
Running the above code prints rows to the console.
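For completeness, the same file-based query can be dropped into the Parquet pipeline from the question. This is only a sketch under the question's assumptions (the bucket path, schema fields, and file name come from the original snippets); the strip() calls are an addition, since the file contents shown in the question are wrapped in double quotes, and a query wrapped in quotes is not valid standard SQL:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pyarrow as pa

pipeline_options = PipelineOptions(save_main_session=True)

# The file should contain bare SQL; strip whitespace and any stray surrounding
# quotes before handing the text to BigQuery.
with open("file_query.sql", "r") as file_content:
    sql_query = file_content.read().strip().strip('"')

event_schema = pa.schema([
    pa.field("col1", "string"),
    pa.field("col2", "string"),
    pa.field("col3", "string"),
])

with beam.Pipeline(options=pipeline_options) as pipeline:
    bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)
    (
        pipeline
        | "ReadFromBigQuery" >> beam.io.Read(bq_source)
        | "Write to Parquet" >> beam.io.parquetio.WriteToParquet(
            file_path_prefix="gs://project-gcs/test/",
            schema=event_schema,
            file_name_suffix=".parquet")
    )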