
Spark ERROR in cluster: ModuleNotFoundError: No module named 'cst_utils'

I have a Spark program written in Python. The structure of the program is like this:

  cst_utils.py
  bn_utils.py
  ep_utils.py
  main.py

Each of cst_utils.py, bn_utils.py, and ep_utils.py has a function called spark_func(sc). In main.py I create a SparkContext, sc, and pass it to each spark_func like this:

  from pyspark import SparkConf, SparkContext

  import cst_utils as cu
  import bn_utils as bu
  import ep_utils as eu

  spark_conf = (
      SparkConf()
      .setAppName('app_name')
      .setMaster("spark://x.x.x.x:7077")
      .set('spark.executor.memory', "8g")
      .set('spark.executor.cores', 4)
      .set('spark.task.cpus', 2)
  )

  sc = SparkContext(conf=spark_conf)

  cu.spark_func(sc)
  bu.spark_func(sc)
  eu.spark_func(sc)
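For context, each module's `spark_func` has roughly this shape (a hypothetical sketch — the real bodies do the project's ETL work; this one just counts even numbers to show the API):

```python
# cst_utils.py -- hypothetical sketch of one module's entry point.
# It receives the shared SparkContext from main.py and runs its own job.
def spark_func(sc):
    rdd = sc.parallelize(range(100))                 # distribute some data
    return rdd.filter(lambda x: x % 2 == 0).count()  # run an action on it
```

The point is that the modules hold only functions; the single SparkContext is created once in main.py and shared.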

I configured a Spark cluster with one master and two slaves, all running Ubuntu 20.04. I set the master IP in spark-env.sh and set up passwordless SSH so the master node can access each slave node without authentication. I run these commands on the nodes:

MASTER NODE:

   ./start-master.sh

SLAVES:

   ./start-worker.sh spark://x.x.x.x:7077
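The two steps above can also be driven from the master alone (assuming the slave hostnames are listed in conf/workers — conf/slaves on older Spark releases — so the launch scripts can SSH into them):

```shell
   ./start-all.sh
```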

The cluster is up, because I can see the Spark UI at this address in the browser:

  http://x.x.x.x:8080

But when I run the program with this command:

  /opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 main.py

I receive this error:

    22/02/16 16:39:20 INFO SparkContext: Starting job: count at /home/hs/Desktop/etl/cst_utils.py:442
    22/02/16 16:39:20 INFO DAGScheduler: Registering RDD 2 (reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434) as input to shuffle 0
    22/02/16 16:39:20 INFO DAGScheduler: Got job 0 (count at /home/hs/Desktop/etl/cst_utils.py:442) with 1 output partitions
    22/02/16 16:39:20 INFO DAGScheduler: Final stage: ResultStage 1 (count at /home/hs/Desktop/etl/cst_utils.py:442)
    22/02/16 16:39:20 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    22/02/16 16:39:20 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    22/02/16 16:39:20 INFO DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434), which has no missing parents
    22/02/16 16:39:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KiB, free 366.3 MiB)
    22/02/16 16:39:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.9 KiB, free 366.3 MiB)
    22/02/16 16:39:20 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on x.x.x.x:43875 (size: 5.9 KiB, free: 366.3 MiB)
    22/02/16 16:39:20 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
    22/02/16 16:39:20 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/hs/Desktop/etl/cst_utils.py:434) (first 15 tasks are for partitions Vector(0))
    22/02/16 16:39:20 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
    22/02/16 16:39:21 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (z.z.z.z:39668) with ID 1,  ResourceProfileId 0
    22/02/16 16:39:21 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (y.y.y.y:46330) with ID 0,  ResourceProfileId 0
    22/02/16 16:39:21 INFO BlockManagerMasterEndpoint: Registering block manager y.y.y.y:34159 with 4.1 GiB RAM, BlockManagerId(0, y.y.y.y, 34159, None)
    22/02/16 16:39:21 INFO BlockManagerMasterEndpoint: Registering block manager z.z.z.z:42231 with 4.1 GiB RAM, BlockManagerId(1, z.z.z.z, 42231, None)
    22/02/16 16:39:21 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (y.y.y.y, executor 0, partition 0, PROCESS_LOCAL, 4481 bytes) taskResourceAssignments Map()
    22/02/16 16:39:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on y.y.y.y:34159 (size: 5.9 KiB, free: 4.1 GiB)
    22/02/16 16:39:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (y.y.y.y executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
        func, profiler, deserializer, serializer = read_command(pickleSer, infile)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
        command = serializer._read_with_length(file)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
        return self.loads(obj)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
        return pickle.loads(obj, encoding=encoding)
    ModuleNotFoundError: No module named 'cst_utils'

The program is in the same path on every node, as is the Spark installation.

In fact, when I run the program in local mode, it runs without any issue. However, to run locally I use this configuration for the SparkContext:

 from pyspark import SparkConf, SparkContext

 spark_conf = (
     SparkConf()
     .setAppName('app_name')
     .setMaster("local[4]")
     .set('spark.executor.memory', "8g")
     .set('spark.executor.cores', 4)
     .set('spark.task.cpus', 1)
 )

 sc = SparkContext(conf=spark_conf)

Update 1:

I also use a virtual environment and install all the packages in it, to distribute them among the nodes. In detail:

  1. Install the venv package for Python:

    sudo apt install python3.8-venv
    
  2. Create the virtual environment:

    python3 -m venv my_venv
    
  3. Activate the environment:

    source my_venv/bin/activate
    
  4. Install venv-pack, which packs all the packages installed in the environment:

    pip install venv-pack
    
  5. Pack the packages:

    venv-pack -o my_venv.tar.gz
    

Moreover, as the Spark documentation says, I put all the .py files of the project in a folder and compressed it into a .zip file.
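For example, the zip can be built with a short helper (a sketch; the module names are illustrative). The archive must keep the .py files at its root so a plain `import cst_utils` still works on the executors:

```python
# Bundle the project's .py files into my_files.zip for --py-files.
import os
import zipfile

def make_zip(paths, out="my_files.zip"):
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in paths:
            # arcname drops any directory prefix so the module sits
            # at the zip root and is importable by name
            zf.write(p, arcname=os.path.basename(p))
    return out

# make_zip(["cst_utils.py", "bn_utils.py", "ep_utils.py"])
```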

Finally, after bringing up the cluster, I run this command:

  /opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 --archives my_venv.tar.gz#environment --py-files my_files.zip main.py

But it ends up with this error:

  Traceback (most recent call last):
    File "/home/spark/Desktop/etl/main.py", line 3, in <module>
      import cst_utils as cu
    File "/home/spark/Desktop/etl/cst_utils.py", line 5, in <module>
      import group_state as gs
    File "/home/spark/Desktop/etl/group_state.py", line 1, in <module>
      import numpy as np
  ModuleNotFoundError: No module named 'numpy'

Would you please help me figure out what is wrong with running the code on the cluster?

Any help would be really appreciated.


Answer

Problem solved.

First, I installed all the packages on each node with this command:

 python3 -m pip install PACKAGE

Then, when I run the program, I must list all the .py files used in the program after --py-files, like this:

 /opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 \
     --files sparkConfig.json \
     --py-files cst_utils.py,grouping.py,group_state.py,g_utils.py,csts.py,oracle_connection.py,config.py,brn_utils.py,emp_utils.py \
     main.py

After that, I no longer get any errors about importing the files.
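As an alternative to installing every package on each node by hand, the virtual-environment route from Update 1 can also work; the Spark docs on Python package management note that the shipped environment must be selected via the PYSPARK_PYTHON variable (a sketch, assuming Spark 3.1+ where spark-submit supports --archives, and that my_venv.tar.gz and my_files.zip were built as described in the question):

```shell
# Make the workers use the Python from the unpacked archive
# ("environment" is the unpack directory named after the #).
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

/opt/spark/bin/spark-submit --master spark://x.x.x.x:7077 \
    --archives my_venv.tar.gz#environment \
    --py-files my_files.zip \
    main.py
```

Without PYSPARK_PYTHON, the workers fall back to the system Python, which can explain errors like the earlier "No module named 'numpy'".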

User contributions licensed under: CC BY-SA