I have a PySpark dataframe (source_df) in which there is a column with comma-separated values. I am trying to replace those values with a lookup based on another dataframe (lookup_df).
source_df:

```
A    B      T    ... followed by N unrelated columns ...
foo  a,b,c  sam
bar  k,a,c  bob
faz  b,a,f  sam
```
lookup_df:

```
C  D
a  h1
b  h2
c  h3
```
output dataframe:

```
A    T    B      new_col      ... followed by N unrelated columns ...
foo  sam  a,b,c  h1,h2,h3
bar  bob  k,a,c  EMPTY,h1,h3
faz  sam  b,a,f  h2,h1,EMPTY
```
Column A is a primary key and is always unique. Column T is unique for a given value of A.
Answer
You can split and explode column B, do a left join against lookup_df, then group back by the original columns, collect the D values, and concatenate them with commas.
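For reference, a minimal setup that reproduces the example data from the question (dataframe and column names taken from the question; the N unrelated columns are omitted):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Example data as shown in the question
source_df = spark.createDataFrame(
    [('foo', 'a,b,c', 'sam'), ('bar', 'k,a,c', 'bob'), ('faz', 'b,a,f', 'sam')],
    ['A', 'B', 'T']
)
lookup_df = spark.createDataFrame(
    [('a', 'h1'), ('b', 'h2'), ('c', 'h3')],
    ['C', 'D']
)
```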
```python
import pyspark.sql.functions as F

result = source_df.withColumn(
    'B_split', F.explode(F.split('B', ','))
).alias('s').join(
    lookup_df.alias('l'),
    F.expr('s.B_split = l.C'),
    'left'
).drop('C').na.fill(
    'EMPTY', ['D']
).groupBy(
    source_df.columns
).agg(
    F.concat_ws(',', F.collect_list('D')).alias('new_col')
)

result.show()
```

```
+---+-----+---+-----------+
|  A|    B|  T|    new_col|
+---+-----+---+-----------+
|foo|a,b,c|sam|   h1,h2,h3|
|faz|b,a,f|sam|h2,h1,EMPTY|
|bar|k,a,c|bob|EMPTY,h1,h3|
+---+-----+---+-----------+
```
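One caveat: `collect_list` after a `groupBy` does not formally guarantee that the collected values keep the order of the elements in B, even though the output above happens to line up. If the order matters, here is a sketch of a variant that pins it explicitly using `posexplode` (requires Spark 2.4+ for `array_sort` and `transform`; same `source_df`/`lookup_df` names assumed as above):

```python
import pyspark.sql.functions as F

# posexplode keeps each element's position so we can restore the
# original order of B after the shuffle introduced by groupBy.
result = source_df.select(
    '*', F.posexplode(F.split('B', ',')).alias('pos', 'B_split')
).alias('s').join(
    lookup_df.alias('l'),
    F.col('s.B_split') == F.col('l.C'),
    'left'
).drop('C').na.fill('EMPTY', ['D']).groupBy(
    source_df.columns
).agg(
    # Sort the collected (pos, D) structs by position, then keep only D
    F.concat_ws(
        ',',
        F.expr("transform(array_sort(collect_list(struct(pos, D))), x -> x.D)")
    ).alias('new_col')
)
```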