Building delayed pipelines

Parallel Programming with Dask in Python

James Fulton

Climate Informatics Researcher

Chunks of data

A diagram showing a dataset small enough to fit on a hard disk, but too large for RAM.

A diagram showing a dataset split into several parts. The whole dataset is too large for RAM, but each individual piece fits.

Spotify songs dataset

files = [
  '2005_tracks.csv',
  '2006_tracks.csv',
  '2007_tracks.csv',
  '2008_tracks.csv',
  '2009_tracks.csv',
  '2010_tracks.csv',
  ...
  '2020_tracks.csv',
]
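
The visible entries suggest one file per year from 2005 to 2020. Assuming the elided entries follow the same naming pattern (an assumption, since the slide does not show them), the list could be built programmatically. A minimal sketch:

```python
# Assumes one file per year, named '<year>_tracks.csv', for 2005 through 2020
files = [f'{year}_tracks.csv' for year in range(2005, 2021)]

print(files[0], files[-1], len(files))
```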

Spotify songs dataset

                       name  duration_ms release_date  ...
0     Aldrig (feat. Carmon)       247869   2019-01-01  ...
2  2019 - The Year to Build       288105   2019-01-01  ...
3                 Na zawsze       186812   2019-01-01  ...
4         Humo en la Trampa       258354   2019-01-01  ...
5                     Au Au       176000   2019-01-01  ...
...                     ...          ...          ...  ...

Analyzing the data

import pandas as pd

maximums = []

for file in files:
    # Load each file
    df = pd.read_csv(file)
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
absolute_maximum = max(maximums)

Analyzing the data

import pandas as pd
from dask import delayed

maximums = []

for file in files:
    # Load each file
    df = delayed(pd.read_csv)(file) # <------- delay loading
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
absolute_maximum = max(maximums)

Analyzing the data

import pandas as pd
from dask import delayed

maximums = []

for file in files:
    # Load each file
    df = delayed(pd.read_csv)(file) # <------- delay loading
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
absolute_maximum = delayed(max)(maximums) # <------- delay max() function
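
The pipeline above only builds a recipe; nothing is read until `.compute()` is called. To try it end to end without the Spotify files, here is a minimal sketch that writes two tiny stand-in CSV files first. The file contents and the temporary directory are hypothetical and exist only for illustration.

```python
import os
import tempfile

import pandas as pd
from dask import delayed

# Write two tiny stand-in CSV files (hypothetical data, not the Spotify dataset)
tmp_dir = tempfile.mkdtemp()
sample_durations = {'2005_tracks.csv': [1000, 3000], '2006_tracks.csv': [2000, 5000]}
files = []
for name, durations in sample_durations.items():
    path = os.path.join(tmp_dir, name)
    pd.DataFrame({'duration_ms': durations}).to_csv(path, index=False)
    files.append(path)

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)           # nothing is read yet
    maximums.append(df['duration_ms'].max())  # still lazy

absolute_maximum = delayed(max)(maximums)     # lazy max over the lazy values

result = absolute_maximum.compute()           # the files are read here
print(result)
```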

Using the methods of a delayed object

import pandas as pd
from dask import delayed

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Use the .max() method
    max_length = df['duration_ms'].max()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-0602855d-3ee6-4c43-a4d2-...')
  • Methods and properties of Delayed objects return new Delayed objects
print(df.shape)
print(df.shape.compute())
Delayed('getattr-bc1e8838ab...')
(11907, 12)
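
The same behavior can be checked on a small in-memory table. This is a sketch with a hypothetical three-row DataFrame standing in for one CSV file; the names `small_df` and `lazy_df` are illustrative only.

```python
import pandas as pd
from dask import delayed
from dask.delayed import Delayed

# Hypothetical in-memory table standing in for one CSV file
small_df = pd.DataFrame({'duration_ms': [1000, 3000, 2000]})
lazy_df = delayed(small_df)

lazy_shape = lazy_df.shape                # property access returns a Delayed
lazy_max = lazy_df['duration_ms'].max()   # indexing and method call return a Delayed

is_lazy = isinstance(lazy_shape, Delayed) and isinstance(lazy_max, Delayed)

shape = lazy_shape.compute()              # real values only appear after compute
max_length = lazy_max.compute()
print(is_lazy, shape, max_length)
```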

Using the methods of a delayed object

import pandas as pd
from dask import delayed

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Use a method which doesn't exist
    max_length = df['duration_ms'].fake()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-6c026036-5daf-4b2-...')
  • Methods are not run until you call .compute()
print(max_length.compute())
...
AttributeError: 'Series' object has no attribute 'fake'
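
Because the mistake only surfaces at compute time, it can help to see the two stages separately. A minimal sketch, using a hypothetical two-row DataFrame in place of a CSV file:

```python
import pandas as pd
from dask import delayed

lazy_df = delayed(pd.DataFrame({'duration_ms': [1000, 3000]}))

# Building the task succeeds even though Series has no .fake() method
lazy_result = lazy_df['duration_ms'].fake()

try:
    lazy_result.compute()
    error_seen = False
except AttributeError as err:
    # The error is only raised once the task actually runs
    error_seen = True
    print('Failed at compute time:', err)
```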

Using the methods of a delayed object

import pandas as pd
from dask import delayed

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Add delayed object to list
    maximums.append(max_length)

# Run delayed max on delayed objects list
absolute_maximum = delayed(max)(maximums)

maximums is a list of delayed objects

print(maximums)
[Delayed('max-80b...'),
 Delayed('max-fa15d...'),
 ...]

Computing lists of delayed objects

import dask
import pandas as pd
from dask import delayed

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Add delayed object to list
    maximums.append(max_length)

# Compute all the maximums
all_maximums = dask.compute(maximums)
print(all_maximums)
([2539418, 4368000, ...
... 4511716, 4864333],)

Computing lists of delayed objects

import dask
import pandas as pd
from dask import delayed

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file) 

    max_length = df['duration_ms'].max()

    maximums.append(max_length)

# Compute all the maximums
all_maximums = dask.compute(maximums)[0]
print(all_maximums)
[2539418, 4368000, ...
... 4511716, 4864333]
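
The extra `[0]` is needed because `dask.compute()` returns a tuple with one entry per argument. The sketch below uses hypothetical stand-in numbers rather than real track lengths to show the three shapes you can get back.

```python
import dask
from dask import delayed

# Stand-in lazy values (hypothetical numbers, not real track lengths)
lazy_values = [delayed(max)([1, 2]), delayed(max)([3, 4])]

as_tuple = dask.compute(lazy_values)     # one argument in, one-element tuple out
as_list = dask.compute(lazy_values)[0]   # index 0 recovers the list of results
unpacked = dask.compute(*lazy_values)    # one argument per object, tuple of results

print(as_tuple, as_list, unpacked)
```

Passing the list as a single argument keeps the results together as a list, which is the form used on the slide.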

To delay or not to delay?

import pandas as pd
from dask import delayed

def get_max_track(df):
    return df['duration_ms'].max()

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file) 
    # Use function to find max
    max_length = get_max_track(df)

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
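
Here `get_max_track` is not wrapped in `delayed`, yet the result is still lazy, because the function only indexes the delayed object and calls a method on it. A sketch comparing the two options, with a hypothetical two-row DataFrame standing in for a file:

```python
import pandas as pd
from dask import delayed
from dask.delayed import Delayed

def get_max_track(df):
    return df['duration_ms'].max()

lazy_df = delayed(pd.DataFrame({'duration_ms': [1000, 3000]}))

# Option 1: call the plain function; its indexing and .max() each become lazy steps
unwrapped = get_max_track(lazy_df)

# Option 2: wrap the function; the whole function call becomes a single lazy step
wrapped = delayed(get_max_track)(lazy_df)

both_lazy = isinstance(unwrapped, Delayed) and isinstance(wrapped, Delayed)
results = (unwrapped.compute(), wrapped.compute())
print(both_lazy, results)
```

Both options give the same value here. Wrapping is likely to matter more when a function needs real values inside its body, for example to branch on them, since delayed objects do not support truth testing.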

Deeper task graph

absolute_maximum.visualize()

A task graph describing the steps needed to compute the absolute maximum across the files. The graph is large but simple.

Let's practice!
