Big Data Fundamentals with PySpark
Upendra Devisetty
Science Analyst, CyVerse
Collaborative filtering finds users that share common interests
Collaborative filtering is commonly used for recommender systems
Collaborative filtering approaches:
User-User Collaborative filtering: Finds users that are similar to the target user
Item-Item Collaborative filtering: Finds and recommends items that are similar to items rated by the target user
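The item-item idea can be illustrated with a minimal plain-Python sketch (hypothetical toy data, not Spark code): two items are "similar" when the rating vectors they receive across users point in a similar direction, measured here with cosine similarity.

```python
from math import sqrt

# Hypothetical user -> {item: rating} data for illustration only
ratings = {
    "alice": {"item1": 5.0, "item2": 3.0},
    "bob":   {"item1": 4.0, "item2": 2.0},
    "carol": {"item1": 1.0, "item2": 5.0},
}

def item_vector(item):
    # Ratings of one item across all users (0.0 if the user never rated it)
    return [ratings[u].get(item, 0.0) for u in sorted(ratings)]

def cosine(a, b):
    # Cosine similarity: dot product divided by the product of vector norms
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))

sim = cosine(item_vector("item1"), item_vector("item2"))
```

A value near 1.0 would mean the two items are rated similarly by the same users, so one could be recommended to users who liked the other.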
The Rating class is a wrapper around a tuple of (user, product, rating)
Useful for parsing the RDD and creating a tuple of (user, product, rating)
from pyspark.mllib.recommendation import Rating
r = Rating(user = 1, product = 2, rating = 5.0)
(r[0], r[1], r[2])
(1, 2, 5.0)
Splitting data into training and testing sets is important for evaluating predictive models
Typically a larger portion of the data is assigned to training than to testing
PySpark's randomSplit() method randomly splits with the provided weights and returns multiple RDDs
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
training, test = data.randomSplit([0.6, 0.4])
training.collect()
[1, 2, 5, 6, 9, 10]
test.collect()
[3, 4, 7, 8]
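What randomSplit() does can be mimicked in plain Python (a sketch of the idea, not Spark's implementation): each element lands in a split with probability proportional to the weights, so the split sizes are only approximately 60/40. Note that PySpark's randomSplit() also accepts an optional seed argument for reproducible splits.

```python
import random

def random_split(data, weights, seed=42):
    """Sketch of RDD.randomSplit: route each element to a split
    with probability proportional to the (normalized) weights."""
    rng = random.Random(seed)
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    splits = [[] for _ in weights]
    for x in data:
        r = rng.random()
        for i, bound in enumerate(cumulative):
            if r <= bound:
                splits[i].append(x)
                break
    return splits

training, test = random_split(list(range(1, 11)), [0.6, 0.4])
```

Because assignment is per element, every input ends up in exactly one split, but exact 6/4 sizes are not guaranteed.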
Alternating Least Squares (ALS) algorithm in spark.mllib provides collaborative filtering
from pyspark.mllib.recommendation import ALS
ALS.train(ratings, rank, iterations)
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()
[Rating(user=1, product=1, rating=1.0),
Rating(user=1, product=2, rating=2.0),
Rating(user=2, product=1, rating=2.0)]
model = ALS.train(ratings, rank=10, iterations=10)
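The alternating idea behind ALS can be sketched in plain Python for the simplest case, rank 1 (illustrative only, not Spark's implementation): factor the observed ratings R[i][j] into a product u[i] * v[j], repeatedly solving for u with v held fixed and then for v with u held fixed.

```python
# Hypothetical observed ratings: (user, product) -> rating, matching the
# three Rating objects above; the (1, 1) cell is unobserved.
observed = {(0, 0): 1.0, (0, 1): 2.0, (1, 0): 2.0}
n_users, n_items = 2, 2
u = [1.0] * n_users  # user factors
v = [1.0] * n_items  # product factors

for _ in range(20):  # alternate least-squares updates
    for i in range(n_users):
        num = sum(r * v[j] for (ui, j), r in observed.items() if ui == i)
        den = sum(v[j] ** 2 for (ui, j), _ in observed.items() if ui == i)
        if den:
            u[i] = num / den  # closed-form least-squares solution for u[i]
    for j in range(n_items):
        num = sum(r * u[i] for (i, vj), r in observed.items() if vj == j)
        den = sum(u[i] ** 2 for (i, vj), _ in observed.items() if vj == j)
        if den:
            v[j] = num / den  # closed-form least-squares solution for v[j]

# Every cell, observed or not, now has a predicted rating u[i] * v[j]
pred = {(i, j): u[i] * v[j] for i in range(n_users) for j in range(n_items)}
```

Spark's ALS does the same alternation with rank-many factors per user and product, plus regularization and distributed execution; the unobserved (1, 1) cell gets a prediction for free, which is exactly what predictAll() exploits below.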
The predictAll() method returns a list of predicted ratings for the input user-product pairs
The method takes in an RDD of (user, product) pairs without ratings and generates predicted ratings for them
unrated_RDD = sc.parallelize([(1, 2), (1, 1)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=1.0000278574351853),
Rating(user=1, product=2, rating=1.9890355703778122)]
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
[((1, 1), 1.000027857), ((1, 2), 1.9890355703)]
rates_preds = rates.join(preds)
rates_preds.collect()
[((1, 2), (2.0, 1.9890355703)), ((1, 1), (1.0, 1.000027857))]
The MSE is the average value of the square of (actual rating - predicted rating)
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
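The same computation can be checked in plain Python on the joined values shown above; RMSE (the square root of MSE, a commonly used companion metric) is added here because it is on the same scale as the ratings themselves.

```python
# Plain-Python stand-in for the joined RDD above:
# ((user, product), (actual rating, predicted rating))
rates_preds = [((1, 2), (2.0, 1.9890355703)), ((1, 1), (1.0, 1.000027857))]

MSE = sum((actual - pred) ** 2
          for _, (actual, pred) in rates_preds) / len(rates_preds)
RMSE = MSE ** 0.5  # root mean squared error, same units as the ratings
```

A small MSE here simply confirms the model reproduces the ratings it was trained on; a held-out test set (via randomSplit(), as above) is needed for an honest evaluation.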