I’m pretty new to data processing. I have a deeply nested dataset that have this approximately this schema :
|-- col1 : string |-- col2 : string |-- col3: struct | |-- something : string | |-- elem: array | | |-- registrationNumber: struct | | | |-- registrationNumber : string | | | |-- registrationNumberType : string | | | |-- registrationCode : int
For the array, I will receive something like this. Keep in mind that the length is variable, I might receive no value or 10 or even more
[
{
registrationNumber : 123456789
registrationNumberType : VAT
registrationCode : 1234
},
{
registrationNumber : ABCDERTYU
registrationNumberType : fiscal1
registrationCode : 9876
},
{
registrationNumber : 123456789
registrationNumberType : foo
registrationCode : 8765
}
]
Is there a way to transform the schema to :
|-- col1 : string |-- col2 : string |-- col3: struct | |-- something : string | |-- VAT: string | |-- fiscal1: string
with VAT and fiscal1 value being the registrationNumber value.
I basically need to get a column with the VAT and the fiscal1 value as column
Thanks so much
Edit:
Here is a sample json of col3
{
"col3": {
"somestring": "xxxxxx",
"registrationNumbers": [
{
'registrationNumber' : 'something',
'registrationNumberType' : 'VAT'
},
{
'registrationNumber' : 'somethingelse',
'registrationNumberType' : 'fiscal1'
},
{
'registrationNumber' : 'something i dont need',
'registrationNumberType' : 'fiscal2'
}
]
}
}
and here is what I would like to have :
{
"col3": {
"somestring": "xxxxxx",
"VAT" : "something"
"fiscal1" : "somethingelse"
}
}
Maybe I can, create a dataframe using the array and the primary key, create VAT and fiscal1 columns and select data from the new dataframe to input in the column?
Finally to join the 2 dataframes using the primary key
Advertisement
Answer
You can use inline function to explode and expand the struct elements of col3.registrationNumbers array, then filter only rows with registrationNumberType either VAT or fiscal1 and pivot. After pivot, update the struct column col3with the pivoted columns:
import pyspark.sql.functions as F
exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))
df1 = df.selectExpr("*", "inline(col3.registrationNumbers)")
.filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"]))
.groupBy("col1", "col2", "col3")
.pivot("registrationNumberType")
.agg(F.first("registrationNumber"))
.withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1")))
.drop("VAT", "fiscal1")
df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# | |-- somestring: string (nullable = true)
# | |-- VAT: string (nullable = true)
# | |-- fiscal1: string (nullable = true)
df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1 |col2 |col3 |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+