I have 10,000 JSON files, each with a different id, and each containing 10,000 name arrays. How can I flatten these nested arrays in PySpark by merging their values on a common key (an int or a str)?
EDIT: I have added the column name_10000_xvz to better explain the data structure. I have updated the Notes, the input df, the required output df, and the input JSON files as well.
Notes:
- The input dataframe has more than 10,000 columns (name_1_a, name_1000_xx, ...), so the column (array) names cannot be hardcoded, as that would require writing out 10,000 names (see the sketch after this list).
- `id`, `date` and `val` always follow the same naming convention across all columns and all JSON files.
- Array sizes can vary, but `date` and `val` are always present, so those two field names can be hardcoded.
- `date` can differ in each array; for example, name_1_a starts with 2001, but name_10000_xvz for id == 1 starts with 2000 and finishes with 2004, whereas for id == 2 it starts with 1990 and finishes with 2004.
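Because the array names cannot be hardcoded, they would have to be discovered at runtime. A minimal sketch of that discovery, assuming the input dataframe `df` built in the snippet further below and that every array column starts with the `name_` prefix:

```python
# Collect the array column names dynamically instead of writing out 10,000 names.
name_columns = [c for c in df.columns if c.startswith('name_')]
print(len(name_columns), name_columns[:3])
```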
Input df:
```
root
 |-- id: long (nullable = true)
 |-- name_10000_xvz: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_2_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz                                                          |name_1_a                         |name_1_b                         |name_2_a                            |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2  |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1  |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}]                        |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
```
Required output df:
```
+---+---------+----------+-----------+---------+----------------+
|id | date    | name_1_a | name_1_b  |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1  | 2000    | 0        | 0         | 0       | 30             |
|1  | 2001    | 1        | 4         | 21      | 31             |
|1  | 2002    | 2        | 5         | 22      | 32             |
|1  | 2003    | 3        | 6         | 23      | 33             |
|2  | 1990    | 0        | 0         | 0       | 39             |
|2  | 2000    | 0        | 0         | 0       | 30             |
|2  | 2001    | 1        | 4         | 21      | 31             |
|2  | 2002    | 2        | 5         | 22      | 32             |
|2  | 2003    | 3        | 6         | 23      | 33             |
|2  | 2004    | 0        | 0         | 0       | 34             |
+---+---------+----------+-----------+---------+----------------+
```
To reproduce input df:
```python
df = spark.read.json(sc.parallelize([
    """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
    """{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))
```
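A quick check that the parsed dataframe matches the schema and sample rows shown above:

```python
df.printSchema()
df.show(truncate=False)
```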
Useful links:
- How to flatten data frame with dynamic nested structs / arrays in PySpark
- https://docs.databricks.com/_static/notebooks/higher-order-functions.html
Answer
UPDATE
As @werner mentioned, it is necessary to transform all the structs so that the column name is appended to each of them.
```python
import pyspark.sql.functions as f

names = [column for column in df.columns if column.startswith('name_')]

expressions = []
for name in names:
    expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
```

```
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
```
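The required output df shows 0 where the pivoted result contains null, so a small follow-up step (assuming 0 is indeed the intended placeholder for missing values) would be:

```python
# Replace the nulls produced by the pivot with 0 to match the required output.
output_df = output_df.na.fill(0, subset=names)
output_df.sort('id', 'date').show(truncate=False)
```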
OLD
Assuming:
- the `date` values are always the same across all the columns name_1_a, name_1_b, name_2_a
- their array sizes are equal
```python
import pyspark.sql.functions as f

output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> '
                                           'STRUCT(name_1_a[i].date AS date, '
                                           '       name_1_a[i].val AS name_1_a, '
                                           '       name_1_b[i].val AS name_1_b, '
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
```

```
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+
```
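Note that this older approach indexes every array by the positions of name_1_a, so it only holds under the assumptions above (identical dates, equal sizes) and cannot handle name_10000_xvz, whose dates differ per id; that is why the UPDATE uses flatten + pivot instead. Under those same assumptions, a roughly equivalent sketch (not part of the original answer; it assumes Spark 3.x, where arrays_zip keeps the column names as struct field names) could use arrays_zip:

```python
import pyspark.sql.functions as f

# Zip the equal-length arrays positionally, explode, then pull out the fields.
alt_df = (df
          .withColumn('z', f.explode(f.arrays_zip('name_1_a', 'name_1_b', 'name_2_a')))
          .select('id',
                  f.col('z.name_1_a.date').alias('date'),
                  f.col('z.name_1_a.val').alias('name_1_a'),
                  f.col('z.name_1_b.val').alias('name_1_b'),
                  f.col('z.name_2_a.val').alias('name_2_a')))

alt_df.sort('id', 'date').show(truncate=False)
```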