I am trying to execute an Apache Beam pipeline as a Dataflow job on Google Cloud Platform.
My project structure is as follows:
```
root_dir/
    __init__.py
    setup.py
    main.py
    utils/
        __init__.py
        log_util.py
        config_util.py
```
Here’s my setup.py
```python
import setuptools

setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
    ],
    packages=setuptools.find_packages()
)
```
Here’s my pipeline code:
```python
import math
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                    pipeline
                    | 'Read from BQ ' >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
                    | 'Count records' >> beam.combiners.Count.Globally()
                    | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")
            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()
```
Functionality of pipeline is
- Query against BigQuery Table.
- Count the total records fetched from querying.
- Print using the custom Log module present in utils folder.
I am running the job from Cloud Shell using the command python3 main.py.
Though the Dataflow job starts, the worker nodes throw an error after a few minutes saying "ModuleNotFoundError: No module named 'utils'".
The "utils" folder is present, and the same code works fine when executed with "DirectRunner".
log_util and config_util are custom utility files for logging and config fetching, respectively.
Also, I tried running with the setup_file option as python3 main.py --setup_file </path/of/setup.py>, which makes the job just freeze; it does not proceed even after 15 minutes.
How do I resolve the ModuleNotFoundError with “DataflowRunner”?
Answer
Posting as community wiki. As confirmed by @GopinathS the error and fix are as follows:
The error encountered by the workers is:

```
Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed.
```
To fix this, "apache-beam[gcp]>=2.20.0" is removed from the install_requires of setup.py, since '>=' pulls in the latest available version (2.32.0 as of this writing) while the workers' pre-installed version is only 2.28.0.
Updated setup.py:
```python
import setuptools

setuptools.setup(
    name='dataflow_example',
    version='1.0',
    install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
    ],
    packages=setuptools.find_packages()
)
```
Updated beam_options in the pipeline code:
```python
beam_options = {
    "project": self.project,
    "region": self.region,
    "job_name": "dataflow_example",
    "runner": "DataflowRunner",
    "temp_location": f"gs://{self.bucket}/temp_location/",
    "setup_file": "./setup.py"
}
```
Also make sure that you pass all the pipeline options at once and not partially.
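As a minimal, self-contained sketch of what passing everything at once looks like (the project, region, and bucket values below are placeholders; in the real code they come from self.config):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; in the actual pipeline these come from self.config.
beam_options = {
    "project": "my-project",
    "region": "us-central1",
    "job_name": "dataflow_example",
    "runner": "DataflowRunner",
    "temp_location": "gs://my-bucket/temp_location/",
    "setup_file": "./setup.py",
}

# Every option, including setup_file, is handed to PipelineOptions in a single
# call rather than being split between this dict and command-line arguments.
options = PipelineOptions(**beam_options, save_main_session=True)

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | "Create" >> beam.Create([1, 2, 3])  # stand-in transform
```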
If you pass --setup_file </path/of/setup.py> on the command line, then make sure to read the setup file path and append it to the already defined beam_options variable using an argument parser in your code.
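For example, a rough sketch of that approach, assuming "argument parser" refers to Python's standard argparse module and reusing the beam_options dict from the pipeline code (shortened here):

```python
import argparse

# beam_options as already defined in execute_pipeline (shortened here).
beam_options = {
    "runner": "DataflowRunner",
    # ... project, region, job_name, temp_location as before
}

# Read --setup_file from the command line; parse_known_args leaves any other
# flags untouched for Beam to consume.
parser = argparse.ArgumentParser()
parser.add_argument("--setup_file", default="./setup.py")
known_args, _ = parser.parse_known_args()

# Append the parsed path to the already defined beam_options dict
# before it is handed to PipelineOptions.
beam_options["setup_file"] = known_args.setup_file
```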
To avoid parsing the argument and appending it to beam_options, I instead added it directly to beam_options as "setup_file": "./setup.py".