I am new to Spark and I am having a silly “what’s-the-best-approach” issue. Basically, I have a map(dict) that I would like to loop over. During each iteration, I want to search through a column in a spark dataframe using rlike
regex and assign the key of the dict to a new column using withColumn
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"], "laundry": ["soap", "detergent", "fabric softener"] }
The data sample is shown below
+--------------------+-----------+ | id|item_bought| +--------------------+-----------+ |uiq7Zq52Bww4pZXc3xri| Soap| |fpJatwxTeObcbuJH25UI| Detergent| |MdK1q5gBygIGFYyvbz8J| Milk| +--------------------+-----------+
I want to get a dataframe that looks like this:
+--------------------+-----------+---------+ | id|item_bought| class| +--------------------+-----------+---------+ |uiq7Zq52Bww4pZXc3xri| Soap| Laundry| |fpJatwxTeObcbuJH25UI| Detergent| Laundry| |MdK1q5gBygIGFYyvbz8J| Milk|Groceries| +--------------------+-----------+---------+
I have over 100M records and I want an approach that uses Spark’s best practices (distributed computing). One approach that comes to mind is to loop through the map and use rlike
or str.contains
for the regex search as shown below:
for key, value in maps.items(): pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case df.withColumn("class", col("item_bought").rlike(pattern))
But this returns true
or false
for the rlike search. I want to substitute the true or false with the key
value.
Also, considering that I have 100M (up to 150M) records, is looping through the map the best approach?
EDIT
What if the items_bought
in the df
had special characters (or some extra text)?
+--------------------+----------------+ | id| item_bought| +--------------------+----------------+ |uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| |fpJatwxTeObcbuJH25UI|Detergent x.ju2i| |MdK1q5gBygIGFYyvbz8J| Milk| +--------------------+----------------+
I don’t wanna do a text clean up first, just assign classes based on regex keyword search
Advertisement
Answer
With you situation, I will turn the map into a dataframe. I assume the resultant dataframe will be relatively small. Use abroadcast join. What this does is that it distribute the small df to each worker node avoiding a shuffle.
#Create df from maps df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought')) #Broadcast join df.join(broadcast(df_ref), how='left', on='item_bought').show() +-----------+--------------------+---------+ |item_bought| id| class| +-----------+--------------------+---------+ | Soap|uiq7Zq52Bww4pZXc3xri| laundry| | Detergent|fpJatwxTeObcbuJH25UI| laundry| | Milk|MdK1q5gBygIGFYyvbz8J|groceries| +-----------+--------------------+---------+
Following your edit
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1')) df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show() +------------+--------------------+----------------+---------+ |item_bought1| id| item_bought| class| +------------+--------------------+----------------+---------+ | Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry| | Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry| | Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries| | Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry| | Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry| | Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
+————+——————–+—————-+———+