MRJob: I’m having a client error while using EMR

I’m new to mrjob and EMR and I’m still trying to figure out how things work. I’m getting this error when I run my script:

python3 MovieSimilarities.py -r emr --items=ml-100k/u.item ml-100k/u.data > sims2t.txt

No configs found; falling back on auto-configuration
No configs specified for emr runner
Using s3://mrjob-35beccaf67be4929/tmp/ as our temp dir on S3
Creating temp directory /tmp/MovieSimilarities.hostname.20201101.164744.518416
uploading working dir files to s3://mrjob-35beccaf67be4929/tmp/MovieSimilarities.hostname.20201101.164744.518416/files/wd...
Copying other local files to s3://mrjob-35beccaf67be4929/tmp/MovieSimilarities.hostname.20201101.164744.518416/files/
Created new cluster j-320TQKHQJ683U
Added EMR tags to cluster j-320TQKHQJ683U: __mrjob_label=MovieSimilarities, __mrjob_owner=hostname, __mrjob_version=0.7.4
Waiting for Step 1 of 3 (s-1WHEBVTU60KAA) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  master node is ec2-44-234-63-159.us-west-2.compute.amazonaws.com
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:52
  COMPLETED
Attempting to fetch counters from logs...
Waiting for cluster (j-320TQKHQJ683U) to terminate...
  TERMINATING
  TERMINATED
Looking for step log in s3://mrjob-35beccaf67be4929/tmp/logs/j-320TQKHQJ683U/steps/s-1WHEBVTU60KAA...
  Parsing step log: s3://mrjob-35beccaf67be4929/tmp/logs/j-320TQKHQJ683U/steps/s-1WHEBVTU60KAA/syslog.gz
Counters: 60
    File Input Format Counters 
        Bytes Read=1994689
    File Output Format Counters 
        Bytes Written=1397908
    File System Counters
        FILE: Number of bytes read=658079
        FILE: Number of bytes written=2552888
        FILE: Number of large read operations=0
        FILE: Number of read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=568
        HDFS: Number of bytes read erasure-coded=0
        HDFS: Number of bytes written=1397908
        HDFS: Number of large read operations=0
        HDFS: Number of read operations=13
        HDFS: Number of write operations=2
        S3: Number of bytes read=1994689
        S3: Number of bytes written=0
        S3: Number of large read operations=0
        S3: Number of read operations=0
        S3: Number of write operations=0
    Job Counters 
        Data-local map tasks=4
        Killed map tasks=1
        Launched map tasks=4
        Launched reduce tasks=1
        Total megabyte-milliseconds taken by all map tasks=91127808
        Total megabyte-milliseconds taken by all reduce tasks=17491968
        Total time spent by all map tasks (ms)=29664
        Total time spent by all maps in occupied slots (ms)=2847744
        Total time spent by all reduce tasks (ms)=2847
        Total time spent by all reduces in occupied slots (ms)=546624
        Total vcore-milliseconds taken by all map tasks=29664
        Total vcore-milliseconds taken by all reduce tasks=2847
    Map-Reduce Framework
        CPU time spent (ms)=23910
        Combine input records=0
        Combine output records=0
        Failed Shuffles=0
        GC time elapsed (ms)=834
        Input split bytes=568
        Map input records=100000
        Map output bytes=1879173
        Map output materialized bytes=683872
        Map output records=100000
        Merged Map outputs=4
        Peak Map Physical memory (bytes)=712859648
        Peak Map Virtual memory (bytes)=4446281728
        Peak Reduce Physical memory (bytes)=230252544
        Peak Reduce Virtual memory (bytes)=7088242688
        Physical memory (bytes) snapshot=2708877312
        Reduce input groups=943
        Reduce input records=100000
        Reduce output records=943
        Reduce shuffle bytes=683872
        Shuffled Maps =4
        Spilled Records=200000
        Total committed heap usage (bytes)=2690646016
        Virtual memory (bytes) snapshot=24827822080
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
Terminating cluster: j-320TQKHQJ683U
Traceback (most recent call last):
  File "MovieSimilarities.py", line 129, in <module>
    MovieSimilarities.run()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/job.py", line 636, in run_job
    runner.run()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/runner.py", line 503, in run
    self._run()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/emr.py", line 705, in _run
    self._finish_run()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/emr.py", line 710, in _finish_run
    self._wait_for_steps_to_complete()
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/emr.py", line 1570, in _wait_for_steps_to_complete
    self._add_steps_to_cluster(
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/emr.py", line 1537, in _add_steps_to_cluster
    step_ids = emr_client.add_job_flow_steps(**steps_kwargs)['StepIds']
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/mrjob/retry.py", line 108, in call_and_maybe_retry
    return f(*args, **kwargs)
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/hostname/PycharmProjects/Taming-Big-Data-with-MapReduce-and-Hadoop/venv/lib/python3.8/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the AddJobFlowSteps operation: A job flow that is shutting down, terminated, or finished may not be modified.

Here’s the code:

from mrjob.job import MRJob
from mrjob.step import MRStep
from math import sqrt

from itertools import combinations


class MovieSimilarities(MRJob):

    def __init__(self, args=None):
        super().__init__(args)
        # movieID -> movie name lookup, filled in by load_movie_names()
        self.movieNames = {}

    def configure_args(self):
        super(MovieSimilarities, self).configure_args()
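        # --items is a file arg: mrjob uploads the file to the cluster and
        # makes it available in each task's working directory (as "u.item").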
        self.add_file_arg('--items', help='Path to u.item')

    def load_movie_names(self):
        # Load database of movie names.
        with open("u.item", encoding='ascii', errors='ignore') as f:
            for line in f:
                fields = line.split('|')
                self.movieNames[int(fields[0])] = fields[1]

    def steps(self):
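        # Three chained MapReduce steps:
        #   1. Parse the raw ratings and group them by user.
        #   2. Emit every pair of movies each user rated, then score
        #      each pair with cosine similarity.
        #   3. Re-key by (movie name, score) so the output comes out sorted.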
        return [
            MRStep(mapper=self.mapper_parse_input,
                   reducer=self.reducer_ratings_by_user),
            MRStep(mapper=self.mapper_create_item_pairs,
                   reducer=self.reducer_compute_similarity),
            MRStep(mapper=self.mapper_sort_similarities,
                   mapper_init=self.load_movie_names,
                   reducer=self.reducer_output_similarities)]

    def mapper_parse_input(self, key, line):
        # Outputs userID => (movieID, rating)
        # u.data fields are tab-separated: userID, movieID, rating, timestamp
        (userID, movieID, rating, timestamp) = line.split('\t')
        yield userID, (movieID, float(rating))

    def reducer_ratings_by_user(self, user_id, itemRatings):
        # Group (item, rating) pairs by userID

        ratings = []
        for movieID, rating in itemRatings:
            ratings.append((movieID, rating))

        yield user_id, ratings

    def mapper_create_item_pairs(self, user_id, itemRatings):
        # Find every pair of movies each user has seen, and emit
        # each pair with its associated ratings

        # "combinations" finds every possible pair from the list of movies
        # this user viewed.
        for itemRating1, itemRating2 in combinations(itemRatings, 2):
            movieID1 = itemRating1[0]
            rating1 = itemRating1[1]
            movieID2 = itemRating2[0]
            rating2 = itemRating2[1]

            # Produce both orders so sims are bi-directional
            yield (movieID1, movieID2), (rating1, rating2)
            yield (movieID2, movieID1), (rating2, rating1)

    def cosine_similarity(self, ratingPairs):
        # Computes the cosine similarity metric between two
        # rating vectors.
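        #
        #   sim(x, y) = (x . y) / (||x|| * ||y||)
        #             = sum(x_i * y_i) / (sqrt(sum(x_i^2)) * sqrt(sum(y_i^2)))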
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        numerator = sum_xy
        denominator = sqrt(sum_xx) * sqrt(sum_yy)

        score = 0
        if (denominator):
            score = (numerator / (float(denominator)))

        return (score, numPairs)

    def reducer_compute_similarity(self, moviePair, ratingPairs):
        # Compute the similarity score between the ratings vectors
        # for each movie pair viewed by multiple people

        # Output movie pair => score, number of co-ratings

        score, numPairs = self.cosine_similarity(ratingPairs)

        # Enforce a minimum score and minimum number of co-ratings
        # to ensure quality
        if numPairs > 10 and score > 0.95:
            yield moviePair, (score, numPairs)

    def mapper_sort_similarities(self, moviePair, scores):
        # Shuffle things around so the key is (movie1, score)
        # so we have meaningfully sorted results.
        score, n = scores
        movie1, movie2 = moviePair

        yield (self.movieNames[int(movie1)], score), \
              (self.movieNames[int(movie2)], n)

    def reducer_output_similarities(self, movieScore, similarN):
        # Output the results.
        # Movie => Similar Movie, score, number of co-ratings
        movie1, score = movieScore
        for movie2, n in similarN:
            yield movie1, (movie2, score, n)


if __name__ == '__main__':
    MovieSimilarities.run()

Here’s the link to get the data: files.grouplens.org/datasets/movielens/ml-100k.zip

I have exported my aws_access_key_id and aws_secret_access_key in my .bashrc and restarted my shell.
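
For reference, here’s roughly what those exports look like in my .bashrc (actual key values redacted; as far as I can tell, these uppercase variable names are the ones boto3 reads from the environment):

export AWS_ACCESS_KEY_ID=...          # redacted
export AWS_SECRET_ACCESS_KEY=...      # redacted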

I need help understanding what I’m doing wrong. What does botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the AddJobFlowSteps operation: A job flow that is shutting down, terminated, or finished may not be modified. mean?


Answer

The botocore package is actually deprecated, and since mrjob relies on the botocore package, mrjob is now broken. Sorry for the inconvenience.
