Parallel Programming with Dask in Python
James Fulton
Climate Informatics Researcher
import pandas as pd

# Read a single CSV file
pandas_df = pd.read_csv("dataset/chunk1.csv")
This reads a single CSV file into memory immediately.
import dask.dataframe as dd

# Lazily read all CSV files
dask_df = dd.read_csv("dataset/*.csv")
This lazily reads all of the CSVs in the dataset directory; no data is loaded yet.
print(dask_df)
Dask DataFrame Structure:
ID col1 col2 col3 col4 ...
npartitions=3
int64 object object int64 float64 ...
... ... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ... ...
Dask Name: getitem, 3 tasks
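Each partition is itself a pandas-like DataFrame that can be inspected on its own. A minimal sketch, assuming the 3-partition DataFrame above:

# Number of partitions in the lazy DataFrame
print(dask_df.npartitions)  # 3

# Grab a single partition; this is still lazy until .compute()
first_part = dask_df.get_partition(0)
print(first_part.compute().head())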
import dask

# Render the task graph of the lazy DataFrame
dask.visualize(dask_df)
# Set the maximum memory per chunk
dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")
print(dask_df)
Dask DataFrame Structure:
ID col1 col2 col3 col4 ...
npartitions=7
int64 object object int64 float64 ...
... ... ... ... ... ...
... ... ... ... ... ...
... ... ... ... ... ...
Dask Name: getitem, 7 tasks
# Set the maximum memory per chunk
dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")
Why 7 partitions?
size file
9M dataset/chunk1.csv # becomes 1 partition
18M dataset/chunk2.csv # becomes 2 partitions
32M dataset/chunk3.csv # becomes 4 partitions
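A quick sanity check, assuming the three files listed above: each file is split into ceil(file size / blocksize) chunks, which you can confirm via npartitions.

dask_df = dd.read_csv("dataset/*.csv", blocksize="10MB")

# ceil(9/10) + ceil(18/10) + ceil(32/10) = 1 + 2 + 4 = 7
print(dask_df.npartitions)  # 7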
# Selecting and assigning columns works like pandas (lazy)
col1 = dask_df['col1']
dask_df['double_col1'] = 2 * col1

# Aggregations, groupbys, and reductions are lazy too
dask_df.std()
dask_df.min()
dask_df.groupby(col1).mean()
dask_df.nlargest(n=3, columns='col1')
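All of the expressions above only build a task graph; nothing is executed until a result is requested. A minimal sketch of that behavior, reusing the DataFrame from earlier:

# Still lazy: a Dask object, not numbers
lazy_means = dask_df.groupby('col1').mean()
print(type(lazy_means))

# Only now are the CSVs read and the groupby run in parallel
print(lazy_means.compute())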
import pandas as pd

# Convert string to datetime
pandas_df['start_date'] = pd.to_datetime(pandas_df['start_date'])

# Access datetime attributes
pandas_df['start_date'].dt.year
pandas_df['start_date'].dt.day
pandas_df['start_date'].dt.hour
pandas_df['start_date'].dt.minute
import dask.dataframe as dd

# Convert string to datetime
dask_df['start_date'] = dd.to_datetime(dask_df['start_date'])

# Access datetime attributes (still lazy)
dask_df['start_date'].dt.year
dask_df['start_date'].dt.day
dask_df['start_date'].dt.hour
dask_df['start_date'].dt.minute
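The datetime accessors are lazy too. A small sketch, assuming the start_date column above, of counting rows per year:

years = dask_df['start_date'].dt.year

# Triggers the actual parsing and counting
print(years.value_counts().compute())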
# Show 5 rows
print(dask_df.head())
ID double_col1 col1 col2 col3 ...
0 543795 20 10 436 0 ...
1 874535 24 12 268 0 ...
2 781326 62 31 211 0 ...
3 112457 18 9 898 1 ...
4 103256 142 71 663 0 ...
# Convert the lazy Dask DataFrame to an in-memory pandas DataFrame
results_df = dask_df.compute()
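When several results are needed from the same DataFrame, dask.compute can evaluate them together, so the underlying files are only read once. A minimal sketch:

import dask

# One pass over the data produces both results
col_min, col_max = dask.compute(dask_df['col1'].min(), dask_df['col1'].max())
print(col_min, col_max)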
# 7 partitions (chunks), so 7 output files
dask_df.to_csv('answer/part-*.csv')
part-0.csv
part-1.csv
part-2.csv
part-3.csv
part-4.csv
part-5.csv
part-6.csv
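If a single output file is preferred over one file per partition, to_csv also accepts a single_file flag; note this serializes the write, giving up some parallelism.

# Write all partitions into one CSV (sequential write)
dask_df.to_csv('answer.csv', single_file=True)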
# Read from parquet
dask_df = dd.read_parquet('dataset_parquet')

# Write to parquet
dask_df.to_parquet('answer_parquet')
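Because Parquet is a columnar format, Dask can load only the columns a computation needs. A minimal sketch, assuming the directory written above:

# Only these columns are read from disk
subset = dd.read_parquet('answer_parquet', columns=['col1', 'col2'])
print(subset.head())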
Parallel Programming with Dask in Python