Vertraagde pipelines bouwen

Parallel programmeren met Dask in Python

James Fulton

Climate Informatics Researcher

Chunks met data

Een diagram met een dataset die op een harde schijf past, maar niet in het RAM-geheugen.

Een diagram waarin de dataset in meerdere stukken is gesplitst. Het geheel past niet in RAM, maar de losse stukken wel.

Parallel programmeren met Dask in Python

Spotify-nummersdataset

files = [
  '2005_tracks.csv',
  '2006_tracks.csv',
  '2007_tracks.csv',
  '2008_tracks.csv',
  '2009_tracks.csv',
  '2010_tracks.csv',
  ...
  '2020_tracks.csv',
]
Parallel programmeren met Dask in Python

Spotify-nummersdataset

                       name  duration_ms release_date  ...
0     Aldrig (feat. Carmon)       247869   2019-01-01  ...
2  2019 - The Year to Build       288105   2019-01-01  ...
3                 Na zawsze       186812   2019-01-01  ...
4         Humo en la Trampa       258354   2019-01-01  ...
5                     Au Au       176000   2019-01-01  ...
...                     ...          ...          ...  ...
Parallel programmeren met Dask in Python

De data analyseren

import pandas as pd

maximums = []

for file in files:
    # Elk bestand laden
    df = pd.read_csv(file)

# Maximale tracklengte per bestand max_length = df['duration_ms'].max()
# Deze maximum opslaan maximums.append(max_length)
# Het maximum van alle maxima absolute_maximum = max(maximums)
Parallel programmeren met Dask in Python

De data analyseren

import pandas as pd

maximums = []

for file in files:
    # Elk bestand laden
    df = delayed(pd.read_csv)(file) # <------- laden uitstellen
    # Maximale tracklengte per bestand
    max_length = df['duration_ms'].max()
    # Deze maximum opslaan
    maximums.append(max_length)

# Het maximum van alle maxima
absolute_maximum = max(maximums)
Parallel programmeren met Dask in Python

De data analyseren

import pandas as pd

maximums = []

for file in files:
    # Elk bestand laden
    df = delayed(pd.read_csv)(file) # <------- laden uitstellen
    # Maximale tracklengte per bestand
    max_length = df['duration_ms'].max()
    # Deze maximum opslaan
    maximums.append(max_length)

# Het maximum van alle maxima vinden
absolute_maximum = delayed(max)(maximums) # <------- max() uitstellen
Parallel programmeren met Dask in Python

Methoden van een Delayed-object gebruiken

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Gebruik de .max()-methode
    max_length = df['duration_ms'].max()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-0602855d-3ee6-4c43-a4d2-...')
  • Methoden en eigenschappen van Delayed-objecten geven nieuwe Delayed-objecten terug
print(df.shape)
print(df.shape.compute())
Delayed('getattr-bc1e8838ab...')
(11907, 12)
Parallel programmeren met Dask in Python

Methoden van een Delayed-object gebruiken

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Gebruik een niet-bestaande methode
    max_length = df['duration_ms'].fake()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-6c026036-5daf-4b2-...')
  • Methoden draaien pas na .compute()
print(max_length.compute())
...
AttributeError: 'Series' object has no 
attribute 'fake'
Parallel programmeren met Dask in Python

Methoden van een Delayed-object gebruiken

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Delayed-object aan lijst toevoegen
    maximums.append(max_length)

# Uitgestelde max op lijst van Delayed-objecten
absolute_maximum = delayed(max)(maximums)

maximums is een lijst met Delayed-objecten

print(maximums)
[Delayed('max-80b...'), 
Delayed('max-fa15d...', 
...]
Parallel programmeren met Dask in Python

Lijsten met Delayed-objecten berekenen

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Delayed-object aan lijst toevoegen
    maximums.append(max_length)

# Alle maxima berekenen
all_maximums = dask.compute(maximums)
print(all_maximums)
([2539418, 4368000, ...
... 4511716, 4864333],)
Parallel programmeren met Dask in Python

Lijsten met Delayed-objecten berekenen

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file) 

    max_length = df['duration_ms'].max()

    maximums.append(max_length)

# Alle maxima berekenen
all_maximums = dask.compute(maximums)[0]
print(all_maximums)
[2539418, 4368000, ...
... 4511716, 4864333]
Parallel programmeren met Dask in Python

Wel of niet uitstellen

def get_max_track(df):
    return df['duration_ms'].max()

for file in files:
    df = delayed(pd.read_csv)(file) 
    # Functie gebruiken om max te vinden
    max_length = get_max_track(df)

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
Parallel programmeren met Dask in Python

Diepere taakgrafiek

absolute_maximum.visualize()

Een taakgrafiek met de stappen om het absolute maximum over bestanden te berekenen. De grafiek is groot maar eenvoudig.

Parallel programmeren met Dask in Python

Laten we oefenen!

Parallel programmeren met Dask in Python

Preparing Video For Download...