Data Engineering'e Giriş
Vincent Vankrunkelsven
Data Engineer @ DataCamp
| customer_id | state | created_at | |
|---|---|---|---|
| 1 | [email protected] | New York | 2019-01-01 07:00:00 |
| customer_id | username | domain | |
|---|---|---|---|
| 1 | [email protected] | jane.doe | theweb.com |
customer_df # Müşteri verilerini içeren Pandas DataFrame'i # E-posta sütununu '@' sembolüne göre 2 sütuna böl split_email = customer_df.email.str.split("@", expand=True)# Bu noktada split_email, ilkinde @ öncesi, # ikincisinde @ sonrası olacak şekilde 2 sütun içerir # Ortaya çıkan DataFrame'den 2 yeni sütun oluştur. customer_df = customer_df.assign( username=split_email[0], domain=split_email[1], )
Verileri PySpark'a aktarın
import pyspark.sql spark = pyspark.sql.SparkSession.builder.getOrCreate()spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila","customer",properties={"user":"repl","password":"password"})
Yeni bir ratings tablosu
| customer_id | film_id | rating |
|---|---|---|
| 1 | 2 | 1 |
| 2 | 1 | 5 |
| 2 | 2 | 3 |
| ... | ... | ... |
Müşteri tablosu
| customer_id | first_name | last_name | ... |
|---|---|---|---|
| 1 | Jane | Doe | ... |
| 2 | Joe | Doe | ... |
| ... | ... | ... | ... |
customer_id, ratings tablosu ile örtüşür
customer_df # Müşteri verilerini içeren PySpark DataFrame'i ratings_df # Puan verilerini içeren PySpark DataFrame'i# Puanları müşteri bazında grupla ratings_per_customer = ratings_df.groupBy("customer_id").mean("rating")# Müşteri kimliği üzerinde join yap customer_df.join( ratings_per_customer, customer_df.customer_id==ratings_per_customer.customer_id )
Data Engineering'e Giriş