Parallel Programming with Dask in Python
James Fulton
Climate Informatics Researcher
# Yearly track files: 2005_tracks.csv through 2020_tracks.csv.
# Generated rather than written out, so the list holds only real file names.
# A literal `...` placeholder would be an Ellipsis object, not a path.
files = [f'{year}_tracks.csv' for year in range(2005, 2021)]
name duration_ms release_date ...
0 Aldrig (feat. Carmon) 247869 2019-01-01 ...
2 2019 - The Year to Build 288105 2019-01-01 ...
3 Na zawsze 186812 2019-01-01 ...
4 Humo en la Trampa 258354 2019-01-01 ...
5 Au Au 176000 2019-01-01 ...
... ... ... ... ...
import pandas as pd

# Sequential (non-Dask) version: each file is read and processed eagerly.
maximums = []
for file in files:
    # Load each file
    df = pd.read_csv(file)
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
absolute_maximum = max(maximums)
import pandas as pd

maximums = []
for file in files:
    # Load each file
    df = delayed(pd.read_csv)(file)  # <------- delay loading
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
# NOTE: max() itself is not delayed here. It is applied directly to a list of
# Delayed objects rather than to computed numbers. The next step wraps it as
# delayed(max) so the whole computation stays lazy.
absolute_maximum = max(maximums)
import pandas as pd

maximums = []
for file in files:
    # Load each file
    df = delayed(pd.read_csv)(file)  # <------- delay loading
    # Find maximum track length in each file
    max_length = df['duration_ms'].max()
    # Store this maximum
    maximums.append(max_length)

# Find the maximum of all the maximum lengths
# Wrapping max() in delayed() lets it take the list of Delayed objects lazily.
absolute_maximum = delayed(max)(maximums)  # <------- delay max() function
import pandas as pd

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    # Use the .max() method
    # Methods called on a Delayed object also return Delayed objects.
    max_length = df['duration_ms'].max()
    maximums.append(max_length)
absolute_maximum = delayed(max)(maximums)

# Printing a Delayed object shows its task key, not a computed value.
print(max_length)
Delayed('max-0602855d-3ee6-4c43-a4d2-...')
# Attribute access on a Delayed object is also lazy. The first print shows a
# Delayed 'getattr' task, and .compute() evaluates it to the actual shape.
print(df.shape)
print(df.shape.compute())
Delayed('getattr-bc1e8838ab...')
(11907, 12)
import pandas as pd

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    # Use a method which doesn't exist
    # NOTE: this is a deliberate error for demonstration. Because df is
    # Delayed, the bad call is only recorded here. It raises only when the
    # result is computed.
    max_length = df['duration_ms'].fake()
    maximums.append(max_length)
absolute_maximum = delayed(max)(maximums)

# Still prints a Delayed object; no error has been raised yet.
print(max_length)
Delayed('max-6c026036-5daf-4b2-...')
The error only appears once .compute() is used:
print(max_length.compute())
...
AttributeError: 'Series' object has no attribute 'fake'
import pandas as pd

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    max_length = df['duration_ms'].max()
    # Add delayed object to list
    maximums.append(max_length)

# Run delayed max on delayed objects list
absolute_maximum = delayed(max)(maximums)
maximums is a list of delayed objects:
# Printing the list shows the Delayed objects themselves, not computed values.
print(maximums)
[Delayed('max-80b...'),
 Delayed('max-fa15d...'),
 ...]
import pandas as pd

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    max_length = df['duration_ms'].max()
    # Add delayed object to list
    maximums.append(max_length)

# Compute all the maximums
# dask.compute() returns a tuple with one result per argument passed in,
# so the printed output is a 1-tuple wrapping the list of maximums.
all_maximums = dask.compute(maximums)
print(all_maximums)
([2539418, 4368000, ...
... 4511716, 4864333],)
import pandas as pd

maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    max_length = df['duration_ms'].max()
    maximums.append(max_length)

# Compute all the maximums
# [0] unwraps the single-element tuple returned by dask.compute(),
# leaving the plain list of computed maximums.
all_maximums = dask.compute(maximums)[0]
print(all_maximums)
[2539418, 4368000, ...
... 4511716, 4864333]
def get_max_track(df):
    """Return the maximum of the ``'duration_ms'`` column of *df*.

    Works on an ordinary pandas DataFrame. When *df* is a Delayed DataFrame
    (as in the loop that uses this function), the lookup and ``.max()`` are
    recorded lazily and the result is itself a Delayed object.
    """
    return df['duration_ms'].max()
# Start from an empty list, because the earlier snippets also append to
# `maximums`. Otherwise the visualized graph would include their tasks too.
maximums = []
for file in files:
    df = delayed(pd.read_csv)(file)
    # Use function to find max
    max_length = get_max_track(df)
    maximums.append(max_length)

absolute_maximum = delayed(max)(maximums)
# Render the task graph behind absolute_maximum.
absolute_maximum.visualize()
Parallel Programming with Dask in Python