I’m using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark’s ALS implementation requires numeric IDs).
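A minimal sketch of that indexing step, assuming a ratings DataFrame named ratings with string columns userId and productId (these names are placeholders, not from the original code):

from pyspark.ml.feature import StringIndexer

# Fit one indexer per string ID column.
user_indexer_model = StringIndexer(
    inputCol="userId", outputCol="userIdIndex").fit(ratings)
product_indexer_model = StringIndexer(
    inputCol="productId", outputCol="productIdIndex").fit(ratings)

# Replace the string IDs with numeric indices before fitting ALS.
indexed = product_indexer_model.transform(
    user_indexer_model.transform(ratings))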
After I’ve fitted the model, I can get the top 3 recommendations for each user like so:
recs = model.recommendForAllUsers(3)
The recs DataFrame looks like this:
+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
I want to create a huge JSON dump of this DataFrame, which I can do like so:
(recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs"))
A sample of these JSON records looks like:
{ "userIdIndex": 1580, "recommendations": [ { "productIdIndex": 10096, "rating": 3.6725707 }, { "productIdIndex": 10141, "rating": 3.61542 }, { "productIdIndex": 11591, "rating": 3.536216 } ] }
The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.
How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can’t quite figure out how, since the data is nested in an array inside the recs DataFrame.
I tried using a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like a Pipeline doesn’t support these indexers in that arrangement.
Cheers!
Answer
In both cases you’ll need access to the list of labels. This can be obtained either from a StringIndexerModel:
user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels
or column metadata.
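If you go the metadata route, StringIndexer stores its labels as ML-attribute metadata on the output column. A minimal sketch, assuming indexed is a DataFrame that was transformed by the fitted indexer models:

# The labels are stored under the "ml_attr" key of the column metadata.
user_labels = indexed.schema["userIdIndex"].metadata["ml_attr"]["vals"]
product_labels = indexed.schema["productIdIndex"].metadata["ml_attr"]["vals"]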
For userIdIndex you can just apply IndexToString:
from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)

user_id_to_label.transform(recs)
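You can then drop the now-redundant index column if you like (a small usage sketch; with_user_id is a hypothetical name):

with_user_id = (user_id_to_label
    .transform(recs)
    .drop("userIdIndex"))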
For recommendations you’ll need either a udf or an expression like this:
from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

# Build an array literal of product labels, then index into it with
# each recommendation's productIdIndex.
product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
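The udf route mentioned above could look roughly like this (a sketch, not the original answer’s code; translate_recs is a hypothetical name, and product_labels is the label list obtained earlier):

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType, FloatType, StringType, StructField, StructType)

# Output schema: an array of (productId, rating) structs with string IDs.
rec_schema = ArrayType(StructType([
    StructField("productId", StringType()),
    StructField("rating", FloatType()),
]))

@udf(rec_schema)
def translate_recs(recommendations):
    # Map each numeric productIdIndex back to its original string label.
    return [(product_labels[row.productIdIndex], row.rating)
            for row in recommendations]

recs.withColumn("recommendations", translate_recs("recommendations"))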