Big Data Fundamentals met PySpark
Upendra Devisetty
Science Analyst, CyVerse
Collaborative filtering vindt gebruikers met vergelijkbare interesses
Veelgebruikt voor aanbevelingssystemen
Benaderingen voor collaborative filtering:
User-User collaborative filtering: vindt gebruikers die lijken op de doelgebruiker
Item-Item collaborative filtering: vindt en raadt items aan die lijken op items van de doelgebruiker
De Rating-klasse is een wrapper rond de tuple (user, product, rating)
Handig om de RDD te parsen en een tuple van user, product en rating te maken
from pyspark.mllib.recommendation import Rating
r = Rating(user = 1, product = 2, rating = 5.0)
(r[0], r[1], r[2])
(1, 2, 5.0)
Train-test-splits zijn belangrijk voor het evalueren van voorspellende modellen
Meestal gaat een groter deel naar training dan naar test
PySpark's randomSplit() splitst willekeurig volgens gewichten en geeft meerdere RDD's terug
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
training, test=data.randomSplit([0.6, 0.4])
training.collect()
test.collect()
[1, 2, 5, 6, 9, 10]
[3, 4, 7, 8]
De Alternating Least Squares (ALS)-algoritme in spark.mllib biedt collaborative filtering
ALS.train(ratings, rank, iterations)
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()
[Rating(user=1, product=1, rating=1.0),
Rating(user=1, product=2, rating=2.0),
Rating(user=2, product=1, rating=2.0)]
model = ALS.train(ratings, rank=10, iterations=10)
De methode predictAll() geeft voorspelde ratings voor user-productparen
De methode neemt een RDD zonder ratings en genereert de ratings
unrated_RDD = sc.parallelize([(1, 2), (1, 1)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=1.0000278574351853),
Rating(user=1, product=2, rating=1.9890355703778122)]
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]
preds = predictions.map(lambda x: ((x[0], x[1]), x[2])) preds.collect()[((1, 1), 1.000027857), ((1, 2), 1.9890355703)]
rates_preds = rates.join(preds)
rates_preds.collect()
[((1, 2), (2.0, 1.9890355703)), ((1, 1), (1.0, 1.000027857))]
De MSE is het gemiddelde van (werkelijke rating - voorspelde rating) in het kwadraat
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
Big Data Fundamentals met PySpark