
PySpark: How to flatten nested arrays by merging values in Spark

I have 10000 JSON files with different ids, and each has 10000 names. How can I flatten the nested arrays by merging the values by int or str in PySpark?

EDIT: I have added the column name_10000_xvz to explain the data structure better. I have also updated the Notes, the input df, the required output df and the input JSON files.

Notes:

  • The input dataframe has more than 10000 columns (name_1_a, name_1000_xx, ...), so the column (array) names cannot be hardcoded, as that would require writing out 10000 names
  • id, date and val always follow the same naming convention across all columns and all JSON files
  • Array sizes can vary, but date and val are always present, so they can be hardcoded
  • date can differ in each array; for example name_1_a starts in 2001, but name_10000_xvz for id == 1 starts in 2000 and finishes in 2003, whereas for id == 2 it starts in 1990 and finishes in 2004

Input df:

root
 |-- id: long (nullable = true)
 |-- name_10000_xvz: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_1_b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- name_2_a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz                                                          |name_1_a                         |name_1_b                         |name_2_a                            |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2  |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1  |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}]                        |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+

Required output df:

+---+---------+----------+-----------+---------+----------------+
|id |   date  | name_1_a | name_1_b  |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1  |   2000  |     0    |    0      |   0     |        30      |
|1  |   2001  |     1    |    4      |   21    |        31      |
|1  |   2002  |     2    |    5      |   22    |        32      |
|1  |   2003  |     3    |    6      |   23    |        33      |
|2  |   1990  |     0    |    0      |   0     |        39      |
|2  |   2000  |     0    |    0      |   0     |        30      |
|2  |   2001  |     1    |    4      |   21    |        31      |
|2  |   2002  |     2    |    5      |   22    |        32      |
|2  |   2003  |     3    |    6      |   23    |        33      |
|2  |   2004  |     0    |    0      |   0     |        34      |
+---+---------+----------+-----------+---------+----------------+

To reproduce the input df:

df = spark.read.json(sc.parallelize([
  """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}}"""
]))
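
A quick sanity check (not part of the original question) to confirm that the reproduced DataFrame matches the schema and rows shown above:

df.printSchema()
df.show(truncate=False)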


Answer

UPDATE

As @werner mentioned, it is necessary to transform all the structs, appending the column name to each element.

import pyspark.sql.functions as f

# Collect the array column names dynamically, since they cannot be hardcoded.
names = [column for column in df.columns if column.startswith('name_')]

# Tag every struct with the name of the array it came from.
expressions = []
for name in names:
    expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# Merge all tagged arrays into one, explode it into rows with inline,
# then pivot the name column back into one column per array.
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
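
The required output in the question shows 0 where an array has no entry for a given date, while the pivot above produces nulls. Assuming zeros are indeed wanted, the nulls can be filled afterwards, for example:

# Fill the nulls produced by the pivot with 0 so the result matches the required output df
output_df.na.fill(0, subset=names).sort('id', 'date').show(truncate=False)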

OLD

Assuming:

  • the date values are always the same across all columns
  • name_1_a, name_1_b and name_2_a all have the same size

import pyspark.sql.functions as f

# Build one struct per array index, taking the i-th element of every array
# (this only works while all arrays are aligned, per the assumptions above).
output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> ' 
                                           'STRUCT(name_1_a[i].date AS date, ' 
                                           '       name_1_a[i].val AS name_1_a, ' 
                                           '       name_1_b[i].val AS name_1_b, ' 
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+