Big Data Fundamentals met PySpark
Upendra Devisetty
Science Analyst, CyVerse
Datasets uit de praktijk zijn meestal key/value-paren
Elke rij is een key die naar één of meer waarden verwijst
Pair RDD is een speciale datastructuur voor dit soort datasets
Pair RDD: key is de identifier, value de data
Twee gangbare manieren om pair RDD’s te maken
Zet data om naar key/value-vorm voor een pair RDD
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)
my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
Alle gewone transformaties werken op pair RDD’s
Geef functies door die op key-value-paren werken, niet op losse elementen
Voorbeelden van pair RDD-transformaties
reduceByKey(func): combineert waarden met dezelfde key
groupByKey(): groepeert waarden met dezelfde key
sortByKey(): retourneert een RDD gesorteerd op key
join(): voegt twee pair RDD’s samen op basis van hun key
reduceByKey() combineert waarden met dezelfde key
Voert parallelle bewerkingen uit per key in de dataset
Het is een transformatie, geen actie
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)]) pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y) pairRDD_reducebykey.collect()[('Neymar', 22), ('Ronaldo', 34), ('Messi', 47)]
sortByKey() sorteert een pair RDD op key
Retourneert een RDD gesorteerd op key, oplopend of aflopend
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0])) pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()[(47, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]
groupByKey() groepeert alle waarden met dezelfde key in de pair RDDairports = [("US", "JFK"),("UK", "LHR"),("FR", "CDG"),("US", "SFO")] regularRDD = sc.parallelize(airports) pairRDD_group = regularRDD.groupByKey().collect() for cont, air in pairRDD_group: print(cont, list(air))FR ['CDG'] US ['JFK', 'SFO'] UK ['LHR']
join()-transformatie voegt twee pair RDD’s samen op basis van hun keyRDD1 = sc.parallelize([("Messi", 34),("Ronaldo", 32),("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])
RDD1.join(RDD2).collect()[('Neymar', (24, 120)), ('Ronaldo', (32, 80)), ('Messi', (34, 100))]
Big Data Fundamentals met PySpark