I’m trying to run a simple Beam pipeline that extracts data from a BigQuery (BQ) table using SQL and writes it to a GCS bucket as Parquet files.
```python
import apache_beam as beam
import pyarrow as pa
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions


class SimplePipeline:
    def run_pipeline(self, known_args, pipeline_args, streaming_flag):
        pipeline_options = PipelineOptions(pipeline_args, streaming=streaming_flag, save_main_session=True)
        # get_pipeline_options_class() and log_info() are defined elsewhere in my code.
        custom_options = pipeline_options.view_as(self.get_pipeline_options_class())
        log_info("Starting Pipeline")

        sql_query = "SELECT col1, col2, col3 FROM `project.dataset.table`"

        with Pipeline(options=pipeline_options) as pipeline:
            bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

            # Schema of the Parquet files written to GCS.
            apromore_event_schema = pa.schema([
                pa.field("col1", "string", metadata={'parent': 'id'}),
                pa.field("col2", "string", metadata={'parent': 'name'}),
                pa.field("col3", "string", metadata={'parent': 'age'}),
            ])

            process_data = (
                pipeline
                | "ReadFromBigQuery" >> beam.io.Read(bq_source)
                | "Write to Parquet" >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix='gs://project-gcs/test/',
                    schema=apromore_event_schema,
                    file_name_suffix='.parquet')
            )
```
My requirement is to pass the SQL from a file (a simple .sql file) rather than as a string, because I want to modularize the SQL. So far I’ve tried the following, but it did not work:
```python
with open("file_query.sql", "r") as myfile:
    sql_query = myfile.read()
```
where the contents of my file_query.sql file look like this:

```sql
"SELECT col1, col2, col3 FROM `project.dataset.table`"
```
Any help on this, folks?
Answer
What failure do you get? I ran almost an exact copy of what you shared, and it works fine for me.
file_query.sql

```sql
SELECT * FROM `bigquery-public-data`.`baseball`.`games_post_wide`;
```
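Note that this file contains only the bare SQL text. In the question, however, file_query.sql is shown with the query wrapped in double quotes. If those quotes are literally in the file, `read()` returns them as part of the string, so BigQuery receives a quoted string literal instead of a `SELECT` statement and would likely reject it with a syntax error. Removing the quotes from the file is the simplest fix. If the files can't be changed, here is a minimal defensive sketch; `clean_sql` and `read_sql_file` are hypothetical helper names, and the quote stripping is a simple heuristic that only removes one matching pair of outer quotes.

```python
def clean_sql(raw):
    """Normalize SQL text read from a file (hypothetical helper)."""
    sql = raw.strip()
    # If the whole query was saved wrapped in one pair of matching quotes
    # (e.g. copied from a Python string literal), drop them.
    if len(sql) >= 2 and sql[0] == sql[-1] and sql[0] in "\"'":
        sql = sql[1:-1].strip()
    return sql


def read_sql_file(path):
    """Read a .sql file and return the cleaned query text (hypothetical helper)."""
    with open(path, "r", encoding="utf-8") as f:
        return clean_sql(f.read())
```

With this, `sql_query = read_sql_file("file_query.sql")` can be passed to `BigQuerySource(query=sql_query, use_standard_sql=True)` exactly as before.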
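pipeline.py

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# With no flags given, PipelineOptions parses sys.argv (project, temp_location, runner, ...).
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as pipeline, \
        open("file_query.sql", "r") as file_content:
    # Read the whole .sql file into a string and use it as the query.
    sql_query = file_content.read()
    bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

    (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.Read(bq_source)
        | 'Print to Console' >> beam.Map(print)
    )
```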
Running the above code prints rows to the console.
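Since the goal is to modularize the SQL, one option is to keep each query in its own .sql file and fill in environment-specific parts, such as the project or dataset, when the pipeline is built. The sketch below is an assumption, not a Beam feature: `render_query`, `load_query`, and the `${project}`/`${dataset}` placeholders are hypothetical names, and it relies on Python's standard `string.Template`. It also shows resolving the path relative to the script, because `open("file_query.sql")` is resolved against the current working directory, not the script's directory.

```python
from pathlib import Path
from string import Template


def render_query(sql_text, **params):
    """Fill ${name} placeholders in SQL text (hypothetical helper)."""
    # substitute() raises KeyError for a placeholder with no value, so a
    # missing parameter fails before the pipeline is submitted.
    return Template(sql_text.strip()).substitute(**params)


def load_query(sql_path, **params):
    """Read a .sql file and render its placeholders (hypothetical helper)."""
    return render_query(Path(sql_path).read_text(encoding="utf-8"), **params)


# Example usage (hypothetical file contents and values): resolve the file next
# to this script instead of relying on the current working directory.
# sql_query = load_query(
#     Path(__file__).resolve().parent / "file_query.sql",
#     project="my-project", dataset="my_dataset")
```

This is plain text substitution, not BigQuery query parameters, so it is only appropriate for trusted values such as project and dataset names. A literal `$` in the SQL has to be written as `$$`.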