PySpark reversing StringIndexer in nested array

I’m using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark’s ALS model requires numeric IDs).
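
For reference, the indexing step looked roughly like this (the ratings DataFrame and the userId/productId column names are placeholders, not my real ones):

from pyspark.ml.feature import StringIndexer

# Hypothetical input DataFrame with string IDs; names are placeholders.
user_indexer_model = StringIndexer(
    inputCol="userId", outputCol="userIdIndex").fit(ratings)
product_indexer_model = StringIndexer(
    inputCol="productId", outputCol="productIdIndex").fit(ratings)

indexed = product_indexer_model.transform(
    user_indexer_model.transform(ratings))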

After I’ve fitted the model, I can get the top 3 recommendations for each user like so:

recs = (
    model
    .recommendForAllUsers(3)
)

The recs dataframe looks like so:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

I want to create a huge JSON dump from this dataframe, and I can do so like this:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

and a sample of these JSON records is:

{
  "userIdIndex": 1580,
  "recommendations": [
    {
      "productIdIndex": 10096,
      "rating": 3.6725707
    },
    {
      "productIdIndex": 10141,
      "rating": 3.61542
    },
    {
      "productIdIndex": 11591,
      "rating": 3.536216
    }
  ]
}

The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.

How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can’t quite figure out how, since the data is nested in an array inside the recs DataFrame.

I tried to use a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like the Pipeline doesn’t support these indexers in this way.

Cheers!

Answer

In both cases you’ll need access to the list of labels. This can be obtained using either a StringIndexerModel

user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels

or column metadata.
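
For the metadata route, a sketch (assuming you read it from a DataFrame where the metadata survived, e.g. the indexed training data):

# Assumes the indexed columns still carry the "ml_attr" metadata
# that StringIndexer attaches to its output columns.
user_labels = indexed.schema["userIdIndex"].metadata["ml_attr"]["vals"]
product_labels = indexed.schema["productIdIndex"].metadata["ml_attr"]["vals"]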

For userIdIndex you can just apply IndexToString:

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)

For recommendations you’ll need either a udf or an expression like this:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
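
The udf alternative could look roughly like this (a sketch; the output schema mirrors the struct fields built in the expression above):

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType, FloatType, StringType, StructField, StructType)

# Output schema matches the expression version: original string
# product IDs paired with the predicted ratings.
recs_schema = ArrayType(StructType([
    StructField("productId", StringType()),
    StructField("rating", FloatType()),
]))

@udf(recs_schema)
def relabel(recommendations):
    # Map each productIdIndex back to its original string label.
    return [(product_labels[r.productIdIndex], float(r.rating))
            for r in recommendations]

recs.withColumn("recommendations", relabel("recommendations"))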