Skip to content
Advertisement

Dataflow Bigquery-Bigquery pipeline executes on smaller data, but not the large production dataset

A little bit of a newbie to Dataflow here, but have succesfully created a pipleine that works well.

The pipleine reads in a query from BigQuery, applies a ParDo (NLP fucntion) and then writes the data to a new BigQuery table.

The dataset I am trying to process is roughly 500GB with 46M records.

When I try this with a subset of the same data (about 300k records) it works just fine and is speedy see below: enter image description here

When I try run this with the full dataset, it starts super fast, but then tapers off and ultimately fails. At this point the job failed and had added about 900k elements which was about 6-7GB and then the element count actually started decreasing.

enter image description here

I am using 250 workers and a n1-highmem-6 machine type

In the worker logs I get a few of these (about 10):

Info
2021-04-22 06:29:38.236 EDTRefreshing due to a 401 (attempt 1/2)

This was one of the final warnings:

2021-04-22 06:29:32.392 EDTS08:[85]: GetArticles/Read+[85]: GetArticles/_PassThroughThenCleanup/ParDo(PassThrough)/ParDo(PassThrough)+[85]: ExtractEntity+[85]: WriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal+[85]: WriteToBigQuery/BigQueryBatchFileLoads/AppendDestination+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/IdentityWorkaround+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Write+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(_ShardDestinations)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Write failed.

In the execution details there are multiple of these:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

I assume that these are from the larger texts in the dataset that can take a while to process, so after a little bit these items are processed and the next ones start.

There is also a few of these:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

All of this to say, its a bit perplexing for me, and not completely intuitive – even though the service when it works is awesome.

I am executing the job from a Jupyter noteboook (not using the interactive runner, just executing the script).

Main pipeline is below:

p = beam.Pipeline()

#Create a collection from Bigquery
articles = p | "GetArticles" >> beam.io.ReadFromBigQuery(query='SELECT id,uuid, company_id_id, title, full_text, FROM `MY TABLE` ', gcs_location=dataflow_gcs_location, project='my_project',use_standard_sql=True)

#Extract entities with NLP
entities = articles | "ExtractEntity" >> beam.ParDo(EntityExtraction()) 

#Write to bigquery 
entities | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('myproject:dataset.table', schema = schema,custom_gcs_temp_location=dataflow_gcs_location, create_disposition="CREATE_IF_NEEDED",write_disposition="WRITE_APPEND")  ```

What am I doing wrong? Is this a memory issue? Should I not be reading and writing to BigQuery like this and instead output to a file and create a table from that? Would love some help, and sorry for the long post, wanted to provide as much context as possible.

Advertisement

Answer

Maybe late to the party but I made some tests on a BigQuery table containing 7.7M rows with strings to process of approx. 350 words.

I ran the same kind of pipeline as you:

  1. Read data from BigQuery
  2. Clean the strings using python string library
  3. Using Spacy fr_core_news_lg model, get the lemmatized part of strings
  4. Write the data back to BigQuery (in a different table)

At the beginning I got the same problem as you, number of elements/sec going down with time.

I realized it was a problem with RAM. I changed the machine_type from custom-1-3072 to custom-1-15360-ext and went from the same profile as yours to this one:

case 2

I think Dataflow can handle big amount of rows with NLP model but you have to be sure you give enough RAM to the workers.

Also, using number_of_worker_harness_threads=1 to be sure Dataflow will not spawn multiple threads (and thus spliting ram into the threads) is important.

You can also take a look at this stack thread, the initial problem is the same.

Last thing, my worker’s CPU utilization went from: cpu not enough ram

To : enter image description here

Which is also a sign of lacking RAM.

EDIT: I ran my pipeline using the same data amount scale as you to be sure my test wasn’t biased, and the results are the same: amount of RAM seems to be the key to make the job run smoothly:

test 38M rows

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement