Building Recommendation Engines with PySpark
Jamen Long
Data Scientist at Nike
# Split data
(training_data, test_data) = movie_ratings.randomSplit([0.8, 0.2])
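randomSplit is nondeterministic by default, so for a reproducible split you can pass a seed (the value 42 here is an arbitrary choice):
# Same 80/20 split, but repeatable across runs
(training_data, test_data) = movie_ratings.randomSplit([0.8, 0.2], seed=42)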
# Build ALS model
from pyspark.ml.recommendation import ALS
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          rank=25, maxIter=100, regParam=0.05, nonnegative=True,
          coldStartStrategy="drop", implicitPrefs=False)
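Note that coldStartStrategy="drop" discards test rows whose user or movie never appeared in the training split, so the evaluation below is not polluted by NaN predictions.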
# Fit model to training data
model = als.fit(training_data)
# Generate predictions on test_data
predictions = model.transform(test_data)
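Before scoring the whole test set, it can help to eyeball a few rows; transform() returns the input columns plus a new prediction column:
# Inspect a handful of predicted ratings
predictions.show(5)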
# Tell Spark how to evaluate predictions
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
# Obtain and print RMSE
rmse = evaluator.evaluate(predictions)
print ("RMSE: "), rmse
RMSE: 1.45
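An RMSE of 1.45 means the predicted ratings miss the actual ratings by roughly 1.45 stars on average, which leaves clear room for improvement through hyperparameter tuning.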
Spark's pyspark.ml.tuning module provides two tools for that tuning:
ParamGridBuilder()
CrossValidator()
# Imports ParamGridBuilder package
from pyspark.ml.tuning import ParamGridBuilder
# Creates a ParamGridBuilder and adds hyperparameters (values come next)
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, []) \
    .addGrid(als.maxIter, []) \
    .addGrid(als.regParam, [])
# Imports ParamGridBuilder package
from pyspark.ml.tuning import ParamGridBuilder
# Creates a ParamGridBuilder and adds hyperparameters and values
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 40, 80, 120]) \
    .addGrid(als.maxIter, [5, 100, 250, 500]) \
    .addGrid(als.regParam, [0.05, 0.1, 1.5]) \
    .build()
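This grid holds 4 x 4 x 3 = 48 parameter combinations; with the 5-fold cross validation set up below, Spark will fit 48 x 5 = 240 models, so training can take a while.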
# Imports CrossValidator package
from pyspark.ml.tuning import CrossValidator
# Creates cross validator and tells Spark what to use when training
# and evaluating a model
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)
# Imports CrossValidator package
from pyspark.ml.tuning import CrossValidator
# Instantiates a cross validator
cv = CrossValidator()
# Imports CrossValidator package
from pyspark.ml.tuning import CrossValidator
# Tells Spark what to use when training a model
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid)
# Imports CrossValidator package
from pyspark.ml.tuning import CrossValidator
# Tells Spark which algorithm and hyperparameter values to try, how to
# evaluate each model, and the number of folds to use during training
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)
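With numFolds=5, each parameter combination is trained on four fifths of the training data and validated on the held-out fifth, rotating through all five folds; the combination with the best average RMSE wins.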
# Create training and test set (80/20 split)
(training, test) = movie_ratings.randomSplit([0.8, 0.2])
# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True,
          implicitPrefs=False)
# Create training and test set (80/20 split)
(training, test) = movie_ratings.randomSplit([0.8, 0.2])
# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True,
          implicitPrefs=False)
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 40, 80, 120]) \
    .addGrid(als.maxIter, [5, 100, 250, 500]) \
    .addGrid(als.regParam, [0.05, 0.1, 1.5]) \
    .build()
# Create training and test set (80/20 split)
(training, test) = movie_ratings.randomSplit([0.8, 0.2])
# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True,
          implicitPrefs=False)
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 40, 80, 120]) \
    .addGrid(als.maxIter, [5, 100, 250, 500]) \
    .addGrid(als.regParam, [0.05, 0.1, 1.5]) \
    .build()
# Tell Spark how to evaluate model performance
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True,
          implicitPrefs=False)
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 40, 80, 120]) \
    .addGrid(als.maxIter, [5, 100, 250, 500]) \
    .addGrid(als.regParam, [0.05, 0.1, 1.5]) \
    .build()
# Tell Spark how to evaluate model performance
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
# Build cross validation step using CrossValidator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 40, 80, 120]) \
    .addGrid(als.maxIter, [5, 100, 250, 500]) \
    .addGrid(als.regParam, [0.05, 0.1, 1.5]) \
    .build()
# Tell Spark how to evaluate model performance
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
# Build cross validation step using CrossValidator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)
# Run the cv on the training data
model = cv.fit(training)
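cv.fit() trains every combination in the grid across all folds and returns a CrossValidatorModel; Spark then refits the winning parameter combination on the full training set, and that refit model is what bestModel exposes below.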
# Extract best combination of values from cross validation
best_model = model.bestModel
# Generate test set predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)
# Print evaluation metrics and model parameters
print ("**Best Model**")
print ("RMSE = "), rmse
print (" Rank: "), best_model.rank
print (" MaxIter: "), best_model._java_obj.parent().getMaxIter()
print (" RegParam: "), best_model._java_obj.parent().getRegParam()