
JSON explode – return filtered array of records

I have some JSON that I have exploded; however, I need to filter the result to the rows where the “locale” is en_GB, and return only that data in the dataframe.

I currently have

from pyspark.sql import functions as F
from pyspark.sql.functions import *

jsonDF2 = jsonDF.withColumn('d', explode(col('d.picklistOptions.results.picklistLabels.results')))
jsonDF2 = jsonDF2.select(jsonDF2.d.optionId, jsonDF2.d.label, jsonDF2.d.locale).filter(array_contains(col("d.locale"), "en_GB"))
jsonDF2.show(10)

However, this does exactly what it says: it returns the rows where en_GB appears anywhere in the locale array. What I actually want is just the matching en_GB entry from each row, in tabular form.

+--------------------+--------------------+--------------------+
|          d.optionId|             d.label|            d.locale|
+--------------------+--------------------+--------------------+
|[45024, 45024, 45...|[Theft the compan...|[en_GB, en_US, th...|
|[45046, 45046, 45...|[Back to home tow...|[th_TH, en_US, en...|
|[45031, 45031, 45...|[Non-confirmation...|[en_GB, en_US, th...|
|[45023, 45023, 45...|[Commit a serious...|[en_GB, en_US, th...|
|[45015, 45015, 45...|[Absence more tha...|[en_GB, en_US, th...|
|[45017, 45017, 45...|[Resignation, Res...|[th_TH, en_US, en...|
|[45039, 45039, 45...|[Be an alien, Be ...|[th_TH, en_US, en...|
|[45048, 45048, 45...|[Education, Educa...|[th_TH, en_US, en...|
|[45043, 45043, 45...|[Farming, Farming...|[th_TH, en_US, en...|
|[45040, 45040, 45...|[Death, Death, De...|[en_GB, en_US, th...|
+--------------------+--------------------+--------------------+

So basically I would want just

+--------------------+--------------------+--------------------+
|          d.optionId|             d.label|            d.locale|
+--------------------+--------------------+--------------------+
|45024               |Theft the compan... |en_GB               |
|45624               |Back to home tow... |en_GB               |
+--------------------+--------------------+--------------------+

Where have I gone wrong? Can someone please explain?

Update

I have changed the code to this

from pyspark.sql import functions as F
from pyspark.sql.functions import *

jsonDF2 = jsonDF.withColumn('d', explode(col('d.picklistOptions.results.picklistLabels.results')))

jsonDF2 = jsonDF2.select(explode(jsonDF2.d.optionId), jsonDF2.d.label,jsonDF2.d.locale).withColumnRenamed("col","optionId").distinct()
jsonDF2 = jsonDF2.select(jsonDF2.optionId, explode(jsonDF2.d.label),jsonDF2.d.locale).withColumnRenamed("col","label").distinct()
jsonDF2.show(100,truncate=False)

Result:

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|optionId|d.label                                                                                                                                                     |d.locale             |
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+
|45995   |[ลาออก โอนย้ายระหว่างภาค, Resignation and Transfer across Region]                                                                                           |[th_TH, en_GB]       |
|45016   |[Absence after training No show, Absence after training No show, Absence after training No show]                                                            |[th_TH, en_US, en_GB]|

It then fails with

AttributeError: 'DataFrame' object has no attribute 'd'

But the column is called “d.label”, so why can it not be found?

Dataframe Schema after initial explode.

root
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- __metadata: struct (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- uri: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- locale: string (nullable = true)
 |    |    |-- optionId: string (nullable = true)
 |    |    |-- picklistOption: struct (nullable = true)
 |    |    |    |-- __deferred: struct (nullable = true)
 |    |    |    |    |-- uri: string (nullable = true)

This is how it looks after my initial select and after exploding “optionId”, as shown in your example. (Screenshot not reproduced here.)

UPDATE 2

JSON Example

{
    "d": {
        "__metadata": {
            "uri": "https://someapi.insomeplace.com/odata/v2/Picklist('cust_resignReason')",
            "type": "SFOData.Picklist"
        },
        "picklistId": "cust_resignReason",
        "picklistOptions": {
            "results": [
                {
                    "__metadata": {
                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45024L)",
                        "type": "SFOData.PicklistOption"
                    },
                    "id": "45024",
                    "minValue": "0",
                    "externalCode": null,
                    "maxValue": "0",
                    "optionValue": "-1",
                    "sortOrder": 10,
                    "mdfExternalCode": "Theft_(company)",
                    "status": "ACTIVE",
                    "parentPicklistOption": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45024L)/parentPicklistOption"
                        }
                    },
                    "picklistLabels": {
                        "results": [
                            {
                                "__metadata": {
                                    "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_GB',optionId=45024L)",
                                    "type": "SFOData.PicklistLabel"
                                },
                                "optionId": "45024",
                                "locale": "en_GB",
                                "id": "177967",
                                "label": "Theft the company property",
                                "picklistOption": {
                                    "__deferred": {
                                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_GB',optionId=45024L)/picklistOption"
                                    }
                                }
                            },
                            {
                                "__metadata": {
                                    "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_US',optionId=45024L)",
                                    "type": "SFOData.PicklistLabel"
                                },
                                "optionId": "45024",
                                "locale": "en_US",
                                "id": "177968",
                                "label": "Theft the company property",
                                "picklistOption": {
                                    "__deferred": {
                                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_US',optionId=45024L)/picklistOption"
                                    }
                                }
                            },
                            {
                                "__metadata": {
                                    "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='th_TH',optionId=45024L)",
                                    "type": "SFOData.PicklistLabel"
                                },
                                "optionId": "45024",
                                "locale": "th_TH",
                                "id": "177969",
                                "label": "Theft the company property",
                                "picklistOption": {
                                    "__deferred": {
                                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='th_TH',optionId=45024L)/picklistOption"
                                    }
                                }
                            }
                        ]
                    },
                    "picklist": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45024L)/picklist"
                        }
                    },
                    "childPicklistOptions": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45024L)/childPicklistOptions"
                        }
                    }
                },
                {
                    "__metadata": {
                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45995L)",
                        "type": "SFOData.PicklistOption"
                    },
                    "id": "45995",
                    "minValue": "0",
                    "externalCode": "Transfer",
                    "maxValue": "0",
                    "optionValue": "-1",
                    "sortOrder": 40,
                    "mdfExternalCode": "Transfer",
                    "status": "ACTIVE",
                    "parentPicklistOption": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45995L)/parentPicklistOption"
                        }
                    },
                    "picklistLabels": {
                        "results": [
                            {
                                "__metadata": {
                                    "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='th_TH',optionId=45995L)",
                                    "type": "SFOData.PicklistLabel"
                                },
                                "optionId": "45995",
                                "locale": "th_TH",
                                "id": "181793",
                                "label": "ลาออก โอนย้ายระหว่างภาค",
                                "picklistOption": {
                                    "__deferred": {
                                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='th_TH',optionId=45995L)/picklistOption"
                                    }
                                }
                            },
                            {
                                "__metadata": {
                                    "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_GB',optionId=45995L)",
                                    "type": "SFOData.PicklistLabel"
                                },
                                "optionId": "45995",
                                "locale": "en_GB",
                                "id": "181791",
                                "label": "Resignation and Transfer across Region",
                                "picklistOption": {
                                    "__deferred": {
                                        "uri": "https://someapi.insomeplace.com/odata/v2/PicklistLabel(locale='en_GB',optionId=45995L)/picklistOption"
                                    }
                                }
                            }
                        ]
                    },
                    "picklist": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45995L)/picklist"
                        }
                    },
                    "childPicklistOptions": {
                        "__deferred": {
                            "uri": "https://someapi.insomeplace.com/odata/v2/PicklistOption(45995L)/childPicklistOptions"
                        }
                    }
                }
            ]
        }
    }
}


Answer

My initial sample data:

+--------------------+--------------------+--------------------+
|            optionId|               label|              locale|
+--------------------+--------------------+--------------------+
|      [45024, 45024]|[Theft the compan...|      [en_GB, en_US]|
|[45624, 45624, 45...|[Back to home tow...|[th_TH, en_US, en...|
|      [45048, 45048]|[Education, Educa...|      [en_TH, en_US]|
+--------------------+--------------------+--------------------+

d.optionId, d.label and d.locale each hold arrays, and within a row the d.optionId and d.label arrays repeat the same value. So you can apply the explode() function again to d.optionId and d.label, using distinct() to collapse the duplicate rows that exploding an array of identical elements produces.

jsonDF2 = jsonDF2.select(explode(jsonDF2.optionId),jsonDF2.label,jsonDF2.locale).withColumnRenamed("col","optionId").distinct()
jsonDF2 = jsonDF2.select(jsonDF2.optionId,explode(jsonDF2.label),jsonDF2.locale).withColumnRenamed("col","label").distinct()
jsonDF2.show(100,truncate=False)

+--------+-----------------+---------------------+
|optionId|label            |locale               |
+--------+-----------------+---------------------+
|45624   |Back to home town|[th_TH, en_US, en_GB]|
|45048   |Education        |[en_TH, en_US]       |
|45024   |Theft the company|[en_GB, en_US]       |
+--------+-----------------+---------------------+

Now your requirement is for d.locale to also be a single value, filtered on the condition that en_GB is in d.locale. Since you already know the value you are filtering on, you can select only d.optionId and d.label from jsonDF2 and add a d.locale column with the literal ‘en_GB’. The following code will help:

solution_df = jsonDF2.select(jsonDF2.optionId, jsonDF2.label).filter(array_contains(col("locale"), "en_GB")).withColumn("locale", lit("en_GB"))
solution_df.show(truncate=False)

+--------+-----------------+------+
|optionId|label            |locale|
+--------+-----------------+------+
|45624   |Back to home town|en_GB |
|45024   |Theft the company|en_GB |
+--------+-----------------+------+

Updated Code:

jsonDF2 = jsonDF2.select(explode(col('optionId')),'`d.label`','`d.locale`').withColumnRenamed("col","optionId").distinct()
jsonDF2 = jsonDF2.select('optionId',explode(jsonDF2['`d.label`']),'`d.locale`').withColumnRenamed("col","d.label").distinct()

solution_df = jsonDF2.select('optionId', '`d.label`').filter(array_contains(col("`d.locale`"), "en_GB")).withColumn("d.locale", lit("en_GB"))
solution_df.show(truncate=False)

Update #2: A more straightforward way to achieve your requirement is to convert the PySpark dataframe to a pandas dataframe and perform the operations shown below.

#same as your code

jsonDF2 = jsonDF.withColumn('d', explode(col('d.picklistOptions.results.picklistLabels.results')))

jsonDF2.printSchema()

#selecting the columns
jsonDF3 = jsonDF2.select(jsonDF2.d.optionId, jsonDF2.d.label,jsonDF2.d.locale)

#showing the selected data
jsonDF3.show(truncate=False)

+---------------------+------------------------------------------------------------------------------------+---------------------+
|d.optionId           |d.label                                                                             |d.locale             |
+---------------------+------------------------------------------------------------------------------------+---------------------+
|[45024, 45024, 45024]|[Theft the company property, Theft the company property, Theft the company property]|[en_GB, en_US, th_TH]|
|[45995, 45995]       |[ลาออก โอนย้ายระหว่างภาค, Resignation and Transfer across Region]                   |[th_TH, en_GB]       |
+---------------------+------------------------------------------------------------------------------------+---------------------+

Now convert the PySpark dataframe to a pandas dataframe and use its iterrows() function to extract the data row by row.

jsonDF_pd = jsonDF3.toPandas()

final = []
for _, row in jsonDF_pd.iterrows():
    option_ids, labels, locales = row[0], row[1], row[2]
    # emit one output row per (optionId, label, locale) triple
    for j in range(len(option_ids)):
        final.append([option_ids[j], labels[j], str(locales[j])])
#print(final)
#Use this 'final' list as the input data to create a dataframe:
cols = ['optionId', 'label', 'locale']
finalDF = spark.createDataFrame(final, cols)
finalDF.show()

+--------+--------------------+------+
|optionId|               label|locale|
+--------+--------------------+------+
|   45024|Theft the company...| en_GB|
|   45024|Theft the company...| en_US|
|   45024|Theft the company...| th_TH|
|   45995|ลาออก โอนย้ายระหว...| th_TH|
|   45995|Resignation and T...| en_GB|
+--------+--------------------+------+
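As an aside, the same row-wise flattening can be done inside pandas itself: DataFrame.explode accepts a list of columns (pandas >= 1.3) and explodes the parallel lists in lockstep, which avoids the explicit iterrows() loop. A minimal sketch with made-up sample data mirroring jsonDF_pd:

```python
import pandas as pd

# Made-up sample mirroring the shape of jsonDF_pd above.
pdf = pd.DataFrame({
    "optionId": [["45024", "45024", "45024"], ["45995", "45995"]],
    "label": [["Theft the company property"] * 3,
              ["ลาออก โอนย้ายระหว่างภาค", "Resignation and Transfer across Region"]],
    "locale": [["en_GB", "en_US", "th_TH"], ["th_TH", "en_GB"]],
})

# Explode all three list columns together: one row per parallel element.
flat = pdf.explode(["optionId", "label", "locale"], ignore_index=True)
gb = flat[flat["locale"] == "en_GB"]
```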

Using filter:

finalDF.select('optionId','label','locale').filter(col('locale')=='en_GB').show(truncate=False)

+--------+--------------------------------------+------+
|optionId|label                                 |locale|
+--------+--------------------------------------+------+
|45024   |Theft the company property            |en_GB |
|45995   |Resignation and Transfer across Region|en_GB |
+--------+--------------------------------------+------+
