Create column from array of struct Pyspark

I’m pretty new to data processing. I have a deeply nested dataset with approximately this schema:

 |-- col1 : string
 |-- col2 : string
 |-- col3: struct
 |    |-- something : string
 |    |-- elem: array
 |    |    |-- registrationNumber: struct
 |    |    |     |-- registrationNumber : string
 |    |    |     |-- registrationNumberType : string
 |    |    |     |-- registrationCode : int

For the array, I will receive something like this. Keep in mind that the length is variable: I might receive no values, 10, or even more.

[
  {
    registrationNumber : 123456789
    registrationNumberType : VAT
    registrationCode : 1234
  },
  {
    registrationNumber : ABCDERTYU
    registrationNumberType : fiscal1
    registrationCode : 9876
  },
  {
    registrationNumber : 123456789
    registrationNumberType : foo
    registrationCode : 8765
  }
]

Is there a way to transform the schema to :

 |-- col1 : string
 |-- col2 : string
 |-- col3: struct
 |    |-- something : string
 |    |-- VAT: string
 |    |-- fiscal1: string 

with the VAT and fiscal1 values being the corresponding registrationNumber values. I basically need the VAT and fiscal1 values as their own columns.

Thanks so much

Edit:

Here is a sample JSON of col3:

{
        "col3": {
            "somestring": "xxxxxx",
            "registrationNumbers": [
              {
                "registrationNumber" : "something",
                "registrationNumberType" : "VAT"
              },
              {
                "registrationNumber" : "somethingelse",
                "registrationNumberType" : "fiscal1"
              },
              {
                "registrationNumber" : "something i dont need",
                "registrationNumberType" : "fiscal2"
              }
            ]
        }
}

And here is what I would like to have:

{
        "col3": {
            "somestring": "xxxxxx",
            "VAT" : "something",
            "fiscal1" : "somethingelse"
        }
}

Maybe I could create a dataframe from the array and the primary key, create VAT and fiscal1 columns in it, and finally join the two dataframes on the primary key to bring those columns back?


Answer

You can use the inline function to explode and expand the struct elements of the col3.registrationNumbers array, then keep only the rows whose registrationNumberType is either VAT or fiscal1, and pivot. After the pivot, rebuild the struct column col3 with the pivoted columns:

import pyspark.sql.functions as F

exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))

# Parentheses let the method chain span several lines without backslashes.
df1 = (
    df.selectExpr("*", "inline(col3.registrationNumbers)")
    .filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"]))
    .groupBy("col1", "col2", "col3")
    .pivot("registrationNumberType")
    .agg(F.first("registrationNumber"))
    .withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1")))
    .drop("VAT", "fiscal1")
)

df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# |    |-- somestring: string (nullable = true)
# |    |-- VAT: string (nullable = true)
# |    |-- fiscal1: string (nullable = true)

df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1   |col2   |col3                              |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+
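
If it helps to see what the filter + pivot does to a single record, here is a minimal plain-Python sketch of the same reshaping (no Spark involved). The helper name flatten_registrations and the WANTED_TYPES constant are made up for illustration, and the col1/col2 values are the same placeholders as in the example above. It is only meant to show the per-row logic, not to replace the DataFrame code.

```python
import json

# Sample record shaped like the question's JSON; col1/col2 are placeholder values.
record = json.loads("""
{
  "col1": "col1_XX",
  "col2": "col2_XX",
  "col3": {
    "somestring": "xxxxxx",
    "registrationNumbers": [
      {"registrationNumber": "something", "registrationNumberType": "VAT"},
      {"registrationNumber": "somethingelse", "registrationNumberType": "fiscal1"},
      {"registrationNumber": "something i dont need", "registrationNumberType": "fiscal2"}
    ]
  }
}
""")

# Hypothetical constant: the registration types that should become fields.
WANTED_TYPES = ("VAT", "fiscal1")


def flatten_registrations(row, wanted=WANTED_TYPES):
    """Return a copy of `row` whose col3 holds one field per wanted type."""
    col3 = row["col3"]
    # Start every wanted type at None so a missing type still gets a field.
    picked = dict.fromkeys(wanted)
    for entry in col3.get("registrationNumbers") or []:
        kind = entry["registrationNumberType"]
        # Keep the first value seen per type, roughly what F.first() does above.
        if kind in picked and picked[kind] is None:
            picked[kind] = entry["registrationNumber"]
    return {**row, "col3": {"somestring": col3["somestring"], **picked}}


flat = flatten_registrations(record)
print(json.dumps(flat["col3"]))
# {"somestring": "xxxxxx", "VAT": "something", "fiscal1": "somethingelse"}
```

One difference worth checking on your real data: the sketch keeps records whose array is empty (VAT and fiscal1 are just None), whereas in the Spark version, as far as I can tell, inline produces no rows for an empty or null array and the filter drops records without a VAT or fiscal1 entry, so those records would be missing from df1.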
User contributions licensed under: CC BY-SA