Building Recommendation Engines with PySpark
Jamen Long
Data Scientist at Nike
$$\text{RMSE} = \sqrt{\frac{1}{N}\sum_{i=1}^{N}\left(y_{\text{pred},i} - y_{\text{actual},i}\right)^2}$$
+----+------+
|pred|actual|
+----+------+
| 5| 4.5|
| 3| 3.5|
| 4| 4|
| 2| 1|
+----+------+
+----+------+----+
|pred|actual|diff|
+----+------+----+
| 5| 4.5| 0.5|
| 3| 3.5|-0.5|
| 4| 4| 0.0|
| 2| 1| 1.0|
+----+------+----+
+----+------+----+-------+
|pred|actual|diff|diff_sq|
+----+------+----+-------+
| 5| 4.5| 0.5| 0.25|
| 3| 3.5|-0.5| 0.25|
| 4| 4| 0.0| 0.00|
| 2| 1| 1.0| 1.00|
+----+------+----+-------+
+----+------+----+-------+
|pred|actual|diff|diff_sq|
+----+------+----+-------+
| 5| 4.5| 0.5| 0.25|
| 3| 3.5|-0.5| 0.25|
| 4| 4| 0.0| 0.00|
| 2| 1| 1.0| 1.00|
+----+------+----+-------+
sum of diff_sq = 1.5
+----+------+----+-------+
|pred|actual|diff|diff_sq|
+----+------+----+-------+
| 5| 4.5| 0.5| 0.25|
| 3| 3.5|-0.5| 0.25|
| 4| 4| 0.0| 0.00|
| 2| 1| 1.0| 1.00|
+----+------+----+-------+
sum of diff_sq = 1.5
avg of diff_sq = 1.5 / 4 = 0.375
+----+------+----+-------+
|pred|actual|diff|diff_sq|
+----+------+----+-------+
| 5| 4.5| 0.5| 0.25|
| 3| 3.5|-0.5| 0.25|
| 4| 4| 0.0| 0.00|
| 2| 1| 1.0| 1.00|
+----+------+----+-------+
sum of diff_sq = 1.5
avg of diff_sq = 1.5 / 4 = 0.375
RMSE = square root of avg of diff_sq = √0.375 ≈ 0.61
# Generate the top n recommendations for every user and keep the result:
# the original bare call had no receiver and discarded its return value,
# so the .show() below referenced a name that was never assigned.
# NOTE(review): `model` is assumed to be the fitted ALSModel and `n` the
# number of recommendations per user; neither is defined in this snippet.
ALS_recommendations = model.recommendForAllUsers(n)  # n is an integer
ALS_recommendations.show()
+------+---------------------+
|userId| recommendations|
+------+---------------------+
| 360|[[65037, 4.491346]...|
| 246|[[3414, 4.8967672]...|
| 346|[[4565, 4.9247236]...|
| 476|[[83318,4.9556283]...|
| 367|[[4632, 4.7018986]...|
| 539|[[1172, 5.2528191]...|
| 599|[[6413, 4.7284415]...|
| 220|[[80, 4.4857406]...|
| 301|[[66665, 5.190159]...|
| 173|[[65037, 4.316745]...|
+------+---------------------+
# Make the recommendations DataFrame queryable from Spark SQL as
# "ALS_recs_temp". createOrReplaceTempView is PySpark's documented
# replacement for the deprecated registerTempTable.
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, prediction). The query spans several lines, so it must
# be a triple-quoted string; a plain "..." literal cannot contain newlines.
clean_recs = spark.sql("""SELECT userId,
                                 movieIds_and_ratings.movieId AS movieId,
                                 movieIds_and_ratings.rating AS prediction
                          FROM ALS_recs_temp
                          LATERAL VIEW explode(recommendations) exploded_table
                              AS movieIds_and_ratings""")

# Intermediate step: explode the array into one row per recommendation,
# but keep each element as a single struct column (MovieRec).
# Column name fixed from the typo `uderId` to `userId`.
exploded_recs = spark.sql("""SELECT userId,
                                    explode(recommendations) AS MovieRec
                             FROM ALS_recs_temp""")
exploded_recs.show()
+------+---------------------------------------+
|userId| MovieRec|
+------+---------------------------------------+
| 360|{"movieId": 65037, "rating": 4.4913464}|
| 360|{"movieId": 59684, "rating": 4.4832921}|
| 360|{"movieId": 31435, "rating": 4.4822811}|
| 360|{"movieId": 593, "rating": 4.456215} |
| 360|{"movieId": 67504, "rating": 4.4028492}|
| 360|{"movieId": 83411, "rating": 4.3391834}|
| 360|{"movieId": 83318, "rating": 4.3199939}|
| 360|{"movieId": 83359, "rating": 4.3000213}|
| 360|{"movieId": 76170, "rating": 4.2987138}|
| 360|{"movieId": 17, "rating": 4.2539403} |
| 360|{"movieId": 2112, "rating": 4.11893843}|
+------+---------------------------------------+
# Make the recommendations DataFrame queryable from Spark SQL as
# "ALS_recs_temp". createOrReplaceTempView is PySpark's documented
# replacement for the deprecated registerTempTable.
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, prediction). The query spans several lines, so it must
# be a triple-quoted string; a plain "..." literal cannot contain newlines.
clean_recs = spark.sql("""SELECT userId,
                                 movieIds_and_ratings.movieId AS movieId,
                                 movieIds_and_ratings.rating AS prediction
                          FROM ALS_recs_temp
                          LATERAL VIEW explode(recommendations) exploded_table
                              AS movieIds_and_ratings""")
# Make the recommendations DataFrame queryable from Spark SQL as
# "ALS_recs_temp". createOrReplaceTempView is PySpark's documented
# replacement for the deprecated registerTempTable.
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, prediction). The query spans several lines, so it must
# be a triple-quoted string; a plain "..." literal cannot contain newlines.
clean_recs = spark.sql("""SELECT userId,
                                 movieIds_and_ratings.movieId AS movieId,
                                 movieIds_and_ratings.rating AS prediction
                          FROM ALS_recs_temp
                          LATERAL VIEW explode(recommendations) exploded_table
                              AS movieIds_and_ratings""")
clean_recs.show()
+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
| 360| 65037| 4.491346|
| 360| 59684| 4.491346|
| 360| 34135| 4.491346|
| 360| 593| 4.453185|
| 360| 67504| 4.389951|
| 360| 83411| 4.389944|
| 360| 83318| 4.389938|
| 360| 83359| 4.373281|
| 360| 76173| 4.190159|
| 360| 5114| 4.116745|
+------+-------+----------+
# Attach movie metadata (e.g. the title) to each recommendation. A left
# join keeps every recommendation row, even if its movieId has no match
# in movie_info.
(
    clean_recs
    .join(movie_info, on=["movieId"], how="left")
    .show()
)
+------+-------+----------+--------------------+
|userId|movieId|prediction| title|
+------+-------+----------+--------------------+
| 360| 65037| 4.491346| Ben X (2007)|
| 360| 59684| 4.491346| Lake of Fire (2006)|
| 360| 34135| 4.491346|Rory O Shea Was H...|
| 360| 593| 4.453185|Silence of the La...|
| 360| 67504| 4.389951|Land of Silence a...|
| 360| 83411| 4.389944| Cops (1922)|
| 360| 83318| 4.389938| Goat, The (1921)|
| 360| 83359| 4.373281| Play House, The(...|
| 360| 76173| 4.190159| Micmacs (Micmacs...|
| 360| 5114| 4.116745|Bad and the Beaut...|
+------+-------+----------+--------------------+
# Attach each user's own rating (if any) to every recommendation, matching
# on both userId and movieId. Because this is a left join, recommendations
# for movies the user has not rated come back with a null `rating`.
# The first expression only builds the joined DataFrame (its value is not
# kept); the second builds it again and displays it.
clean_recs.join(movie_ratings, on=["userId", "movieId"], how="left")
(
    clean_recs
    .join(movie_ratings, on=["userId", "movieId"], how="left")
    .show()
)
+------+-------+----------+------+
|userId|movieId|prediction|rating|
+------+-------+----------+------+
| 173| 318| 4.947126| null|
| 150| 318| 4.066513| 5.0|
| 369| 318| 4.514297| 5.0|
| 27| 318| 4.523860| null|
| 42| 318| 4.568357| 5.0|
| 662| 318| 4.242076| 5.0|
| 250| 318| 5.042126| 5.0|
| 94| 318| 4.291757| 5.0|
| 515| 318| 5.165822| null|
| 109| 318| 4.885314| 5.0|
+------+-------+----------+------+
# Keep only recommendations for movies the user has not rated yet. After
# the left join those rows have a null `rating`. The surrounding
# parentheses let the method chain span several lines; in the original,
# a line starting with `.` was a SyntaxError.
(
    clean_recs.join(movie_ratings, ["userId", "movieId"], "left")
    .filter(movie_ratings.rating.isNull())
    .show()
)
+------+-------+----------+------+
|userId|movieId|prediction|rating|
+------+-------+----------+------+
| 173| 318| 4.947126| null|
| 27| 318| 4.523860| null|
| 515| 318| 5.165822| null|
| 275| 318| 5.171431| null|
| 503| 318| 4.308533| null|
| 106| 318| 4.688634| null|
| 249| 318| 4.759836| null|
| 368| 318| 3.589334| null|
| 581| 318| 4.717382| null|
| 208| 318| 3.920525| null|
+------+-------+----------+------+
Building Recommendation Engines with PySpark