Grundlagen von Big Data mit PySpark
Upendra Devisetty
Science Analyst, CyVerse
Collaborative Filtering findet Nutzer mit ähnlichen Interessen
Häufig für Empfehlungssysteme verwendet
Ansätze beim Collaborative Filtering:
User-User Collaborative Filtering: Findet Nutzer ähnlich zum Zielnutzer
Item-Item Collaborative Filtering: Findet/empfiehlt Items, die Items des Zielnutzers ähneln
Die Klasse Rating ist ein Wrapper um ein Tupel (user, product, rating)
Nützlich zum Parsen des RDD und Erstellen eines Tupels aus user, product und rating
from pyspark.mllib.recommendation import Rating
r = Rating(user = 1, product = 2, rating = 5.0)
(r[0], r[1], r[2])
(1, 2, 5.0)
Train-/Test-Split ist wichtig zur Bewertung prädiktiver Modelle
Meist geht ein größerer Anteil in das Training als in den Test
PySparks randomSplit() teilt per Zufall nach Gewichten und gibt mehrere RDDs zurück
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
training, test=data.randomSplit([0.6, 0.4])
training.collect()
test.collect()
[1, 2, 5, 6, 9, 10]
[3, 4, 7, 8]
Der ALS-Algorithmus (Alternating Least Squares) in spark.mllib bietet Collaborative Filtering
ALS.train(ratings, rank, iterations)
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()
[Rating(user=1, product=1, rating=1.0),
Rating(user=1, product=2, rating=2.0),
Rating(user=2, product=1, rating=2.0)]
model = ALS.train(ratings, rank=10, iterations=10)
Die Methode predictAll() gibt vorhergesagte Bewertungen für Nutzer-Produkt-Paare zurück
Sie nimmt ein RDD ohne Ratings und erzeugt die Bewertungen
unrated_RDD = sc.parallelize([(1, 2), (1, 1)])
predictions = model.predictAll(unrated_RDD)
predictions.collect()
[Rating(user=1, product=1, rating=1.0000278574351853),
Rating(user=1, product=2, rating=1.9890355703778122)]
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
[((1, 1), 1.0), ((1, 2), 2.0), ((2, 1), 2.0)]
preds = predictions.map(lambda x: ((x[0], x[1]), x[2])) preds.collect()[((1, 1), 1.000027857), ((1, 2), 1.9890355703)]
rates_preds = rates.join(preds)
rates_preds.collect()
[((1, 2), (2.0, 1.9890355703)), ((1, 1), (1.0, 1.000027857))]
Die MSE ist der Durchschnitt des Quadrats von (actual rating - predicted rating)
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
Grundlagen von Big Data mit PySpark