I am struggling to transform my pyspark dataframe which looks like this:
df = spark.createDataFrame([('0018aad4',[300, 450], ['{"v1": "blue"}', '{"v2": "red"}']), ('0018aad5',[300], ['{"v1": "blue"}'])],[ "id","Tlist", 'Tstring'])
df.show(2, False)

+--------+----------+-------------------------------+
|id      |Tlist     |Tstring                        |
+--------+----------+-------------------------------+
|0018aad4|[300, 450]|[{"v1": "blue"}, {"v2": "red"}]|
|0018aad5|[300]     |[{"v1": "blue"}]               |
+--------+----------+-------------------------------+
to this:
df_result = spark.createDataFrame([('0018aad4',[300, 450], 'blue', 'red'), ('0018aad5',[300], 'blue', None)],[ "id","Tlist", 'v1', 'v2'])
df_result.show(2, False)

+--------+----------+----+----+
|id      |Tlist     |v1  |v2  |
+--------+----------+----+----+
|0018aad4|[300, 450]|blue|red |
|0018aad5|[300]     |blue|null|
+--------+----------+----+----+
I tried a pivot and a bunch of other things, but I don’t get the result above.
Note that I don’t know the exact number of dicts in the column Tstring in advance.
Do you know how I can do this?
Answer
Using the transform function you can convert each element of the array into a map type. After that, you can use the aggregate function to merge the maps into one, explode it, then pivot the keys to get the desired output:
from pyspark.sql import functions as F

df1 = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"),
        F.col("Tstring")[0],
        lambda acc, x: F.map_concat(acc, x)
    )
).select(
    "id", "Tlist", F.explode("Tstring")
).groupby(
    "id", "Tlist"
).pivot("key").agg(F.first("value"))

df1.show(truncate=False)
#+--------+----------+----+----+
#|id      |Tlist     |v1  |v2  |
#+--------+----------+----+----+
#|0018aad4|[300, 450]|blue|red |
#|0018aad5|[300]     |blue|null|
#+--------+----------+----+----+
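If it helps to see what pivot receives, you can stop the pipeline right after the explode: each map entry becomes one row, and exploding a map column yields columns named key and value by default. This is just the pipeline above cut short; the commented output is what I’d expect on the sample data:

from pyspark.sql import functions as F

# Same steps as above, but stop before groupby/pivot to inspect the rows.
df_kv = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"),  # all maps after the first
        F.col("Tstring")[0],                         # first map as the initial value
        lambda acc, x: F.map_concat(acc, x)          # merge into a single map
    )
).select("id", "Tlist", F.explode("Tstring"))

df_kv.show(truncate=False)
#+--------+----------+---+-----+
#|id      |Tlist     |key|value|
#+--------+----------+---+-----+
#|0018aad4|[300, 450]|v1 |blue |
#|0018aad4|[300, 450]|v2 |red  |
#|0018aad5|[300]     |v1 |blue |
#+--------+----------+---+-----+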
I’m using Spark 3.1+, so higher-order functions such as transform are available in the DataFrame API, but you can do the same using expr for Spark < 3.1:
df1 = (df.withColumn("Tstring", F.expr("transform(Tstring, x -> from_json(x, 'map<string,string>'))"))
       .withColumn("Tstring", F.expr("aggregate(slice(Tstring, 2, size(Tstring)), Tstring[0], (acc, x) -> map_concat(acc, x))"))
       .select("id", "Tlist", F.explode("Tstring"))
       .groupby("id", "Tlist")
       .pivot("key")
       .agg(F.first("value"))
      )
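One caveat that applies to both versions: since Spark 3.0, map_concat raises an error when the maps being merged share a key. If the same key (say v1) can show up in more than one JSON string of a row, relax the dedup policy first. A minimal sketch, assuming last-value-wins is acceptable for your data:

# Assumption: duplicate keys can occur across a row's JSON strings.
# The default policy, EXCEPTION, makes map_concat fail on duplicates;
# LAST_WIN keeps the value from the last occurrence instead.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")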