PySpark'e Giriş
Benjamin Schmidt
Data Engineer
joined_df = large_df.join(broadcast(small_df),
on="key_column", how="inner")
joined_df.show()
# Yürütme planını görmek için explain() kullanma
df.filter(df.Age > 40).select("Name").explain()
== Fiziksel Plan ==
*(1) Filtre (isnotnull(Age) AND (Age > 30))
+- Scan ExistingRDD[Name:String, Age:Int]
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True) # DataFrame'i önbelleğe al df.cache()# Önbelleğe alınmış DataFrame üzerinde birden fazla işlem yap df.filter(df["column1"] > 50).show() df.groupBy("column2").count().show()
# Depolama seviyesi ile DataFrame'i kalıcı kılma from pyspark import StorageLevel df.persist(StorageLevel.MEMORY_AND_DISK)# Dönüşümleri gerçekleştir result = df.groupBy("column3").agg({"column4": "sum"}) result.show() # Kullanımdan sonra kalıcılığı kaldır df.unpersist()
map() gibi seçici yöntemleri groupby() yerine tercih edinPySpark'e Giriş