I’m using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark’s ALS implementation requires numeric IDs).
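The indexing step looks roughly like this (ratings and the column names are placeholders for my actual DataFrame):

from pyspark.ml.feature import StringIndexer

# One indexer per string ID column; ratings is the raw input DataFrame
user_indexer_model = StringIndexer(
    inputCol="userId", outputCol="userIdIndex").fit(ratings)
product_indexer_model = StringIndexer(
    inputCol="productId", outputCol="productIdIndex").fit(ratings)

indexed = product_indexer_model.transform(
    user_indexer_model.transform(ratings))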
After I’ve fitted the model, I can get the top 3 recommendations for each user like so:
recs = (
model
.recommendForAllUsers(3)
)
The recs DataFrame looks like this:
+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
I want to create a huge JSON dump of this DataFrame, and I can do so like this:
(
recs
.toJSON()
.saveAsTextFile("name_i_must_hide.recs")
)
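(toJSON turns the DataFrame into an RDD of JSON strings, one document per row, which is why saveAsTextFile works here.)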
and a sample of these JSON documents looks like this:
{
"userIdIndex": 1580,
"recommendations": [
{
"productIdIndex": 10096,
"rating": 3.6725707
},
{
"productIdIndex": 10141,
"rating": 3.61542
},
{
"productIdIndex": 11591,
"rating": 3.536216
}
]
}
The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.
How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can’t quite figure out how, since the data is nested in an array inside the recs DataFrame.
I tried to use a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like it doesn’t support these indexers.
Cheers!
Answer
In both cases you’ll need access to the lists of labels. These can be retrieved using either a StringIndexerModel:
user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels
or column metadata.
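If the fitted indexer models are no longer at hand, here is a sketch of the metadata route, assuming indexed is the DataFrame the StringIndexer models produced (StringIndexer records its labels in the column’s ML attribute metadata):

# Assuming `indexed` is the output of the StringIndexer models
user_labels = indexed.schema["userIdIndex"].metadata["ml_attr"]["vals"]
product_labels = indexed.schema["productIdIndex"].metadata["ml_attr"]["vals"]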
For userIdIndex you can just apply IndexToString:
from pyspark.ml.feature import IndexToString

# Map userIdIndex back to the original string user IDs using the saved labels
user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)

user_id_to_label.transform(recs)
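transform returns a new DataFrame with a userId string column appended next to userIdIndex; the index column is left in place, so you can drop it afterwards if you don’t need it.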
For recommendations you’ll need either a udf or an expression like this:
from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems passed to recommendForAllUsers

# An array literal of product labels, indexable by a column value
product_labels_ = array(*[lit(x) for x in product_labels])

# Rebuild the recommendations array, replacing each productIdIndex
# with the corresponding original productId
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
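If you’d rather take the udf route mentioned above, a minimal sketch could look like this (the helper name and return schema are illustrative, and product_labels is the plain Python list captured from the indexer model):

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType, FloatType, StringType, StructField, StructType)

# Same shape as before, but with the original string productId
rec_schema = ArrayType(StructType([
    StructField("productId", StringType()),
    StructField("rating", FloatType()),
]))

@udf(rec_schema)
def index_to_product(recommendations):
    # product_labels is captured from the enclosing scope
    if recommendations is None:
        return None
    return [(product_labels[rec.productIdIndex], float(rec.rating))
            for rec in recommendations]

recs.withColumn("recommendations", index_to_product("recommendations"))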