
ModuleNotFoundError in Dataflow job

I am trying to execute an Apache Beam pipeline as a Dataflow job on Google Cloud Platform.

My project structure is as follows:

root_dir/
  __init__.py
  setup.py
  main.py
  utils/
    __init__.py
    log_util.py
    config_util.py

Here’s my setup.py:

import setuptools

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

Here’s my pipeline code:

import math
import apache_beam as beam

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                        pipeline
                        | 'Read from BQ ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
                        | 'Count records' >> beam.combiners.Count.Globally()
                        | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")

            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process  - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

The functionality of the pipeline is:

  1. Query against BigQuery Table.
  2. Count the total records fetched from querying.
  3. Print using the custom Log module present in utils folder.

I am running the job from Cloud Shell using the command python3 main.py

Though the Dataflow job starts, the worker nodes throw an error after a few minutes saying “ModuleNotFoundError: No module named ‘utils'”

The “utils” folder is present, and the same code works fine when executed with the “DirectRunner”.

log_util and config_util are custom utility modules for logging and config fetching, respectively.
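
For reference, hypothetical minimal stand-ins that are compatible with the calls used in the pipeline (LogUtil.log_n_notify and ConfigUtil.get_config) might look like the sketch below; the real modules are not shown here, so this is only an assumption about their interface.

# utils/log_util.py - hypothetical stand-in matching the calls in main.py
import logging


class LogUtil:
    _logger = logging.getLogger("dataflow_example")

    @classmethod
    def log_n_notify(cls, log_type="info", msg=""):
        # Log locally; the real module may also send notifications.
        getattr(cls._logger, log_type, cls._logger.info)(msg)


# utils/config_util.py - hypothetical stand-in for config fetching
class ConfigUtil:

    @staticmethod
    def get_config(module_config=None):
        # The real module presumably loads these values from a config source.
        return {
            "project": "<project_id>",
            "location": "<region>",
            "core_bucket": "<core_bucket>",
        }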

Also, I tried running with the setup_file option as python3 main.py --setup_file </path/of/setup.py>, which makes the job just freeze; it does not proceed even after 15 minutes.

How do I resolve the ModuleNotFoundError with “DataflowRunner”?


Answer

Posting as community wiki. As confirmed by @GopinathS the error and fix are as follows:

The error encountered by the workers is Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed.

To fix this, “apache-beam[gcp]>=2.20.0” is removed from install_requires in setup.py, since ‘>=’ installs the latest available version (2.32.0 as of this writing) while the workers’ version is only 2.28.0.

Updated setup.py:

import setuptools

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

Updated beam_options in the pipeline code:

    beam_options = {
        "project": self.project,
        "region": self.region,
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": f"gs://{self.bucket}/temp_location/",
        "setup_file": "./setup.py"
    }

Also make sure that you pass all the pipeline options at once and not partially.

If you pass --setup_file </path/of/setup.py> on the command line, then make sure to read the setup file path and append it to the already defined beam_options variable using an argument parser in your code.
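
A minimal sketch of that approach, using argparse (the placeholder values stand in for what self.config provides inside execute_pipeline; they are assumptions for illustration):

import argparse

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: in the actual pipeline these values come from self.config
# inside DataflowExample.execute_pipeline().
project = "<project_id>"
region = "<region>"
bucket = "<core_bucket>"

# parse_known_args() picks up --setup_file and leaves any other flags alone.
parser = argparse.ArgumentParser()
parser.add_argument("--setup_file", default="./setup.py",
                    help="Path to the setup.py that packages the utils module")
known_args, _ = parser.parse_known_args()

beam_options = {
    "project": project,
    "region": region,
    "job_name": "dataflow_example",
    "runner": "DataflowRunner",
    "temp_location": f"gs://{bucket}/temp_location/",
    "setup_file": known_args.setup_file,  # appended from the command line
}

options = PipelineOptions(**beam_options, save_main_session=True)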

To avoid parsing the argument and appending it into beam_options, I instead added it directly in beam_options as "setup_file": "./setup.py".
