I have a PySpark dataframe (source_df) in which there is a column with comma-separated values.
I am trying to replace those values with a lookup based on another dataframe (lookup_df).
source_df

A    B      T    ... followed by N unrelated columns ...
foo  a,b,c  sam
bar  k,a,c  bob
faz  b,a,f  sam
lookup_df

C  D
a  h1
b  h2
c  h3
output dataframe:

A    T    B      new_col      ... followed by N unrelated columns ...
foo  sam  a,b,c  h1,h2,h3
bar  bob  k,a,c  EMPTY,h1,h3
faz  sam  b,a,f  h2,h1,EMPTY
Column A is a primary key and is always unique. Column T is unique for a given value of A.
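For reference, a minimal way to build these sample dataframes (column names and values taken from the tables above; `spark` is assumed to be an existing SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

source_df = spark.createDataFrame(
    [('foo', 'a,b,c', 'sam'), ('bar', 'k,a,c', 'bob'), ('faz', 'b,a,f', 'sam')],
    ['A', 'B', 'T']
)
lookup_df = spark.createDataFrame(
    [('a', 'h1'), ('b', 'h2'), ('c', 'h3')],
    ['C', 'D']
)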
Answer
You can split and explode column B and do a left join against lookup_df. Then group back on the original columns, collect the matched D values and concatenate them with commas.
import pyspark.sql.functions as F

result = source_df.withColumn(
    'B_split',
    F.explode(F.split('B', ','))   # one row per comma-separated value in B
).alias('s').join(
    lookup_df.alias('l'),
    F.expr('s.B_split = l.C'),     # match each split value against the lookup key
    'left'
).drop('C').na.fill(
    'EMPTY', ['D']                 # unmatched values get the placeholder EMPTY
).groupBy(
    source_df.columns              # collapse back to one row per original row
).agg(
    F.concat_ws(',', F.collect_list('D')).alias('new_col')
)
result.show()
+---+-----+---+-----------+
|  A|    B|  T|    new_col|
+---+-----+---+-----------+
|foo|a,b,c|sam|   h1,h2,h3|
|faz|b,a,f|sam|h2,h1,EMPTY|
|bar|k,a,c|bob|EMPTY,h1,h3|
+---+-----+---+-----------+
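Note that collect_list does not guarantee element order after a shuffle, so the output above preserving the order of B is not strictly guaranteed on larger data. If that order matters, one possible variant (a sketch, assuming Spark 2.4+ for array_sort and transform) keeps the position of each split value and sorts by it before concatenating:

import pyspark.sql.functions as F

result_ordered = source_df.select(
    '*',
    # posexplode keeps the position of each value so the original order can be restored
    F.posexplode(F.split('B', ',')).alias('pos', 'B_split')
).alias('s').join(
    lookup_df.alias('l'),
    F.expr('s.B_split = l.C'),
    'left'
).drop('C').na.fill(
    'EMPTY', ['D']
).groupBy(
    source_df.columns
).agg(
    # sort the collected (pos, D) structs by position, then keep only D
    F.expr(
        "concat_ws(',', transform(array_sort(collect_list(struct(pos, D))), x -> x.D))"
    ).alias('new_col')
)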
