Programmazione parallela con Dask in Python
James Fulton
Climate Informatics Researcher
import pandas as pd# Leggi un singolo file CSV pandas_df = pd.read_csv( "dataset/chunk1.csv" )
Questo legge subito un singolo file CSV.
import dask.dataframe as dd# Leggi pigramente tutti i file CSV dask_df = dd.read_csv( "dataset/*.csv" )
Questo legge in modo lazy tutti i CSV nella cartella dataset.
print(dask_df)
Struttura Dask DataFrame:
ID col1 col2 col3 col4 ...
npartitions=3
int64 object object int64 float64 ...
... ... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ... ...
Dask Name: getitem, 3 tasks
dask.visualize(dask_df)

# Imposta la memoria massima di un blocco dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")print(dask_df)
Struttura Dask DataFrame:
ID col1 col2 col3 col4 ...
npartitions=7
int64 object object int64 float64 ...
... ... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ... ...
Dask Name: getitem, 7 tasks
# Imposta la memoria massima di un blocco
dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")
Perché 7 partizioni?
size file
9M dataset/chunk1.csv
18M dataset/chunk2.csv
32M dataset/chunk3.csv
# Imposta la memoria massima di un blocco
dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")
Perché 7 partizioni?
size file
9M dataset/chunk1.csv # diventa 1 partizione
18M dataset/chunk2.csv # diventa 2 partizioni
32M dataset/chunk3.csv # diventa 4 partizioni
col1 = dask_df['col1']
dask_df['double_col1'] = 2 * col1
dask_df.std()
dask_df.min()
dask_df.groupby(col1).mean()
dask_df.nlargest(n=3, columns='col1')
import pandas as pd # Converti stringa in datetime pd.to_datetime(pandas_df['start_date'])# Attributi datetime pandas_df['start_date'].dt.year pandas_df['start_date'].dt.day pandas_df['start_date'].dt.hour pandas_df['start_date'].dt.minute
import dask.dataframe as dd # Converti stringa in datetime dd.to_datetime(dask_df['start_date'])# Attributi datetime dask_df['start_date'].dt.year dask_df['start_date'].dt.day dask_df['start_date'].dt.hour dask_df['start_date'].dt.minute
# Mostra 5 righe
print(dask_df.head())
ID double_col1 col1 col2 col3 ...
0 543795 20 10 436 0 ...
1 874535 24 12 268 0 ...
2 781326 62 31 211 0 ...
3 112457 18 9 898 1 ...
4 103256 142 71 663 0 ...
# Converte il Dask DataFrame lazy in un pandas DataFrame in memoria
results_df = df.compute()
# 7 partizioni (blocchi) quindi 7 file di output
dask_df.to_csv('answer/part-*.csv')
part-0.csv
part-1.csv
part-2.csv
part-3.csv
part-4.csv
part-5.csv
part-6.csv
# Leggi da Parquet
dask_df = dd.read_parquet('dataset_parquet')
# Salva in Parquet
dask_df.to_parquet('answer_parquet')
Programmazione parallela con Dask in Python