I am new to Spark and I am having a silly “what’s-the-best-approach” issue. Basically, I have a map (dict) that I would like to loop over. During each iteration, I want to search through a column in a Spark dataframe using an rlike regex and assign the key of the dict to a new column using withColumn
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
The data sample is shown below
+--------------------+-----------+
|                  id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri|       Soap|
|fpJatwxTeObcbuJH25UI|  Detergent|
|MdK1q5gBygIGFYyvbz8J|       Milk|
+--------------------+-----------+
I want to get a dataframe that looks like this:
+--------------------+-----------+---------+
|                  id|item_bought|    class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri|       Soap|  Laundry|
|fpJatwxTeObcbuJH25UI|  Detergent|  Laundry|
|MdK1q5gBygIGFYyvbz8J|       Milk|Groceries|
+--------------------+-----------+---------+
I have over 100M records and I want an approach that follows Spark best practices (distributed computing). One approach that comes to mind is to loop through the map and use rlike (or contains) for the regex search, as shown below:
from pyspark.sql.functions import col

for key, value in maps.items():
    pattern = '|'.join([f'(?i){x}' for x in value])  # (?i) -> ignore case
    df = df.withColumn("class", col("item_bought").rlike(pattern))
But this returns true or false for the rlike search, whereas I want the new column to hold the matching key instead of true/false.
Also, considering that I have 100M (up to 150M) records, is looping through the map the best approach?
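For reference, one way to make a loop like the one above write the key instead of true/false would be to build a single chained when()/otherwise() expression. A minimal sketch, assuming the df and maps shown above and pyspark.sql.functions imported as F:

from pyspark.sql import functions as F

# Build one column expression that yields the matching class key, or null if nothing matches
class_col = F.lit(None)
for key, keywords in maps.items():
    pattern = '(?i)(' + '|'.join(keywords) + ')'  # one case-insensitive pattern per class
    class_col = F.when(F.col('item_bought').rlike(pattern), F.lit(key)).otherwise(class_col)

df = df.withColumn('class', class_col)

The Python loop only builds the expression on the driver; the actual matching still runs as a single distributed pass over the data.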
EDIT
What if the item_bought values in the df had special characters (or some extra text)?
+--------------------+----------------+
|                  id|     item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J|            Milk|
+--------------------+----------------+
I don’t want to do a text cleanup first; I just want to assign classes based on a regex keyword search.
Answer
For your situation, I would turn the map into a dataframe. I assume the resulting dataframe will be relatively small, so use a broadcast join. What this does is distribute the small df to each worker node, avoiding a shuffle.
from pyspark.sql.functions import explode, initcap, broadcast, regexp_extract

#Create df from maps
df_ref = (spark.createDataFrame(list(maps.items()), schema=('class', 'item_bought'))
          .withColumn('item_bought', explode('item_bought'))
          .withColumn('item_bought', initcap('item_bought')))
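For reference, the df_ref built from the maps above should contain one row per keyword, roughly like this (row order may differ):

+---------+---------------+
|    class|    item_bought|
+---------+---------------+
|groceries|  Hot Chocolate|
|groceries|           Milk|
|groceries|          Sugar|
|groceries|          Flour|
|  laundry|           Soap|
|  laundry|      Detergent|
|  laundry|Fabric Softener|
+---------+---------------+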
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
+-----------+--------------------+---------+
|item_bought|                  id|    class|
+-----------+--------------------+---------+
|       Soap|uiq7Zq52Bww4pZXc3xri|  laundry|
|  Detergent|fpJatwxTeObcbuJH25UI|  laundry|
|       Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+
Following your edit
df_ref = (spark.createDataFrame(list(maps.items()), schema=('class', 'item_bought1'))
          .withColumn('item_bought1', explode('item_bought1'))
          .withColumn('item_bought1', initcap('item_bought1')))

#Extract the leading word, then broadcast join on it
(df.withColumn('item_bought1', regexp_extract('item_bought', '^[A-Za-z]+', 0))
   .join(broadcast(df_ref), how='left', on='item_bought1')
   .show())
+------------+--------------------+----------------+---------+
|item_bought1|                  id|     item_bought|    class|
+------------+--------------------+----------------+---------+
|        Soap|uiq7Zq52Bww4pZXc3xri|            Soap|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|       Detergent|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|
|        Soap|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|
+------------+--------------------+----------------+---------+
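If the keywords can appear anywhere in item_bought (including multi-word ones like "fabric softener"), a variation on the same broadcast-join idea is to join on a case-insensitive contains condition instead of extracting the leading word. A minimal sketch, assuming spark, df and maps as defined above:

from pyspark.sql.functions import broadcast, explode, lower

# One row per (class, keyword); keywords stay lowercase
df_kw = (spark.createDataFrame(list(maps.items()), schema=('class', 'keyword'))
         .withColumn('keyword', explode('keyword')))

# Non-equi join: a row gets a class if item_bought contains the keyword (case-insensitive)
(df.join(broadcast(df_kw), lower(df['item_bought']).contains(df_kw['keyword']), how='left')
   .drop('keyword')
   .show())

Because this is a non-equi join it falls back to a broadcast nested loop join, which is still fine as long as df_kw stays small. Note that if an item matches keywords from more than one class it will produce one row per match, so you may need to de-duplicate or pick a priority afterwards.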