I’m trying to use Hadoop to compute some statistics from a JSON file, such as the average number of stars per category or the language with the most reviews. To do this I am using mrjob, and I found this code:
import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep

# A "word" is a run of word characters and apostrophes.  The backslash in
# ``\w`` is required: without it the class only matches the literal
# characters ``w`` and ``'``.
WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):
    """Two-step job that finds the most frequent non-stop word in the input.

    Step 1 counts the occurrences of every word that is not in the stop-word
    list; step 2 picks the (count, word) pair with the highest count.
    """

    # Default stop-word file made available to the tasks.
    FILES = ['stop_words.txt']

    OUTPUT_PROTOCOL = JSONValueProtocol

    def configure_args(self):
        """Register the optional ``--stop-words-file`` command-line argument."""
        super(MRMostUsedWord, self).configure_args()

        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def mapper_init(self):
        """Load the stop words into ``self.stop_words`` before mapping starts."""
        # Fall back to the bundled file when no alternate file was given.
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'

        with open(stop_words_path) as f:
            self.stop_words = {line.strip() for line in f}

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for each lowercased non-stop word in ``line``."""
        for match in WORD_RE.findall(line):
            word = match.lower()
            if word not in self.stop_words:
                yield (word, 1)

    def combiner_count_words(self, word, counts):
        """Yield ``(word, partial_sum)`` for the counts seen so far."""
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        """Yield ``(None, (total, word))`` for each word.

        Using the single key ``None`` sends all (num_occurrences, word) pairs
        to the same reducer.  The count comes first in the pair so that
        Python's ``max()`` compares pairs by count.
        """
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        """Yield the pair with the highest count; the incoming key is ignored.

        Each item of ``word_count_pairs`` is ``(count, word)``, so yielding
        one results in key=count, value=word.
        """
        try:
            yield max(word_count_pairs)
        except ValueError:
            # max() raises ValueError on an empty iterable: there were no
            # words at all, so deliberately emit nothing.
            pass

    def steps(self):
        """Return the two steps: count the words, then select the maximum."""
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()
It finds the most used word, but I am not sure how to do the same with JSON attributes instead of words.
A sample of the json:
{"review_id": "en_0690095", "product_id": "product_en_0440378", "reviewer_id": "reviewer_en_0133349", "stars": "1", "review_body": "the cabinet dot were all detached from backing… got me", "review_title": "Not use able", "language": "en", "product_category": "home_improvement"}
{"review_id": "en_0311558", "product_id": "product_en_0399702", "reviewer_id": "reviewer_en_0152034", "stars": "1", "review_body": "I received my first order of this product and it was broke so I ordered it again. The second one was broke in more places than the first. I can’t blame the shipping process as it’s shrink wrapped and boxed.", "review_title": "The product is junk.", "language": "en", "product_category": "home"}
Advertisement
Answer
For me, it was enough to parse each input line with json.loads, like this:
# Decode one input line as a JSON document and bind the result to `review`.
# NOTE(review): as shown, the parsed record is never yielded or used further;
# presumably the snippet is truncated and continues past this line -- confirm.
def mapper(self, _, line): review = json.loads(line)