PySpark em escala

Introdução ao PySpark

Benjamin Schmidt

Data Engineer

Aproveitando a escala

  • PySpark funciona bem com gigabytes e terabytes de dados
  • Com PySpark, o objetivo é velocidade e eficiência
  • Entender a execução do PySpark traz mais eficiência
  • Use broadcast para gerenciar todo o cluster
joined_df = large_df.join(broadcast(small_df), 
                          on="key_column", how="inner")
joined_df.show()
Introdução ao PySpark

Planos de execução

# Usando explain() para ver o plano de execução
df.filter(df.Age > 40).select("Name").explain()
== Plano Físico ==
*(1) Filtro (isnotnull(Age) AND (Age > 30))
+- Scan ExistingRDD[Name:String, Age:Int]
1 https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.explain.html
Introdução ao PySpark

Cache e persistência de DataFrames

  • Cache: Armazena dados na memória para acesso mais rápido em datasets menores
  • Persistência: Armazena dados em diferentes níveis de armazenamento para datasets maiores
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)

# Cache do DataFrame
df.cache()

# Realizar múltiplas operações no DataFrame em cache df.filter(df["column1"] > 50).show() df.groupBy("column2").count().show()
Introdução ao PySpark

Persistência de DataFrames com diferentes níveis de armazenamento

# Persistir o DataFrame com nível de armazenamento
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)

# Realizar transformações result = df.groupBy("column3").agg({"column4": "sum"}) result.show() # Despersistir após o uso df.unpersist()
Introdução ao PySpark

Otimização do PySpark

  • Pequenas Seções: Mais dados, operação mais lenta: Prefira map() a groupby() pela seletividade
  • Joins de Broadcast: Broadcast usa todo o processamento, mesmo em datasets menores
  • Evite Ações Repetidas: Ações repetidas no mesmo dado custam tempo e processamento, sem benefício
Introdução ao PySpark

Vamos praticar!

Introdução ao PySpark

Preparing Video For Download...