I have a Dataflow pipeline that fetches data from Pub/Sub, prepares it for insertion into BigQuery, and then writes it to the table.
It works fine: it generates the schema automatically and recognises which datatype to use for each field.
However, the data we use with it can vary vastly in format. For example, we can get both A and B for a single column:
A {"name":"John"}
B {"name":["Albert", "Einstein"]}
If message A is inserted first, inserting message B afterwards does not work.
If I do it the other way around, it does work.
I always get the following error:
INFO:root:Error: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/project/projectname/jobs?uploadType=resumable: Provided Schema does not match Table project:test_dataset.test_table. Field cars has changed mode from NULLABLE to REPEATED with loading dataframe
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7fcb9003f2c0>, due to an exception.
Traceback (most recent call last):
........
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
.....
Provided Schema does not match Table project.test_table. Field cars has changed mode from NULLABLE to REPEATED
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "newmain.py", line 211, in process
    if load_job and load_job.errors:
UnboundLocalError: local variable 'load_job' referenced before assignment
Below is the code
import argparse
import json
import logging
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.cloud import bigquery


class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        # function to help with the json -> bq schema transformations
        generator = SchemaGenerator(input_format='dict', quoted_values_are_strings=True, keep_nulls=True)

        # Get original schema to assist the deduce_schema function. If the table doesn't exist
        # proceed with empty original_schema_map
        try:
            table = self.client.get_table(table_id)
            original_schema = table.schema
            self.client.schema_to_json(original_schema, "original_schema.json")
            with open("original_schema.json") as f:
                original_schema = json.load(f)
                original_schema_map, original_schema_error_logs = generator.deduce_schema(input_data=original_schema)
        except Exception:
            logging.info(f"{table_id} table not exists. Proceed without getting schema")
            original_schema_map = {}

        # convert dataframe to dict
        json_text = df.to_dict('records')

        # generate the new schema, we need to write it to a file because schema_from_json only accepts json file as input
        schema_map, error_logs = generator.deduce_schema(input_data=json_text, schema_map=original_schema_map)
        schema = generator.flatten_schema(schema_map)

        schema_file_name = "schema_map.json"
        with open(schema_file_name, "w") as output_file:
            json.dump(schema, output_file)

        # convert the generated schema to a version that BQ understands
        bq_schema = self.client.schema_from_json(schema_file_name)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=bq_schema
        )
        job_config.schema = bq_schema

        try:
            load_job = self.client.load_table_from_json(
                json_text,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f'Loaded {len(df)} rows.')

        except Exception as error:
            logging.info(f'Error: {error} with loading dataframe')

            if load_job and load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")


def run(argv):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
    options = pipeline_options.view_as(JobOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=options.input_subscription)
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ(project_id=options.project_id, bq_dataset=options.bigquery_dataset, bq_table=options.bigquery_table))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)
Is there a way to change the restrictions of the table to make this work?
Answer
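BigQuery isn’t a document database; it is a column-oriented database in which every column has a fixed type and mode. In addition, you can’t update the schema of existing columns in the way you need: you can add or remove columns, or relax a REQUIRED column to NULLABLE, but you can’t change a NULLABLE column to REPEATED.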
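To illustrate why message B is rejected once message A has defined the column, here is a minimal sketch of the mode check. The helpers `infer_mode` and `mode_change_allowed` are hypothetical names written for this illustration; they are a simplified model of the rule described above, not part of the BigQuery client library.

```python
def infer_mode(value):
    """Return the column mode a JSON value would map to (simplified model)."""
    return "REPEATED" if isinstance(value, list) else "NULLABLE"


def mode_change_allowed(old_mode, new_mode):
    """Simplified rule: a column keeps its mode, or is relaxed REQUIRED -> NULLABLE."""
    return old_mode == new_mode or (old_mode == "REQUIRED" and new_mode == "NULLABLE")


message_a = {"name": "John"}
message_b = {"name": ["Albert", "Einstein"]}

existing_mode = infer_mode(message_a["name"])  # mode created by the first message
incoming_mode = infer_mode(message_b["name"])  # mode the second message asks for

print(existing_mode, incoming_mode, mode_change_allowed(existing_mode, incoming_mode))
# prints: NULLABLE REPEATED False
```

The sketch only models changes to the table schema; it says nothing about how a load job treats individual values.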
For your use case, because you can’t know or predict the most generic schema for each of your fields, the safest option is to store the raw JSON as a string and then use BigQuery’s JSON functions to post-process your data in SQL.
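A minimal sketch of that approach, under some assumptions: the column name `raw_json`, the table name `test_table_raw`, and the helper `to_raw_row` are made up for this example. Each Pub/Sub payload is wrapped as a single STRING value, so the table schema never changes whatever shape the message has. The query string is not executed here; it only shows how `JSON_VALUE` and `JSON_VALUE_ARRAY` might be used afterwards.

```python
import json


def to_raw_row(message_bytes):
    """Wrap one Pub/Sub payload as a row with a single STRING column."""
    text = message_bytes.decode("utf-8")
    json.loads(text)  # raises ValueError early if the payload is not valid JSON
    return {"raw_json": text}  # "raw_json" is a hypothetical column name


rows = [
    to_raw_row(b'{"name":"John"}'),
    to_raw_row(b'{"name":["Albert", "Einstein"]}'),
]

# Hypothetical post-processing query; table and column names are assumptions.
# JSON_VALUE targets the scalar shape and JSON_VALUE_ARRAY the array shape.
QUERY = """
SELECT
  JSON_VALUE(raw_json, '$.name') AS name_scalar,
  JSON_VALUE_ARRAY(raw_json, '$.name') AS name_array
FROM `project.test_dataset.test_table_raw`
"""

print(rows)
```

The resulting `rows` could then be passed to `load_table_from_json` with a fixed one-column schema, so neither schema deduction nor schema update options would be needed for this table.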