Parallel Programming with Dask in Python
James Fulton
Climate Informatics Researcher
def number_of_words(s):
    """Return the number of words in the string ``s``.

    Words are runs of non-whitespace characters. Splitting on arbitrary
    whitespace (rather than on a single ``' '``) means an empty string
    counts as 0 words and repeated, leading or trailing spaces do not
    produce phantom empty "words".

    Args:
        s: The text to count words in.

    Returns:
        The word count as an int.
    """
    # str.split() with no argument collapses consecutive whitespace and
    # returns [] for an empty or all-whitespace string.
    word_list = s.split()
    return len(word_list)
# Quick check: the sentence passed in below contains four words
print(number_of_words('these are four words'))
4
string_list = [
    'these are four words',
    'but these are five words',
    'and these are seven words in total',
]

# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)

# Printing the bag shows its description, not the computed word counts
print(word_count_bag)
dask.bag<number_of_words, npartitions=3>
# Sentences whose words will be counted
string_list = [
    'these are four words',
    'but these are five words',
    'and these are seven words in total',
]

# Wrap the list in a bag
string_bag = db.from_sequence(string_list)

# Map the word-count function over every element of the bag
word_count_bag = string_bag.map(number_of_words)

# compute() runs the work and returns the per-sentence counts
print(word_count_bag.compute())
[4, 5, 7]
example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
# Load the files matching 'example*.json' into a bag; as the output below
# shows, each element is a raw line of text (still a string, not parsed JSON)
text_bag = db.read_text('example*.json')
# Peek at the first element; take(1) returns it wrapped in a tuple
print(text_bag.take(1))
('{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...]}\n',)
import json

# Read the raw text lines, one string per element
text_bag = db.read_text('example*.json')

# Parse each JSON line into a Python dict
dict_bag = text_bag.map(json.loads)

# Peek at the first parsed record
print(dict_bag.take(1))
({"name": "Beth",
"employment": [{"role": "manager", "start_date": ...}, ...]
...},)
def is_new(employee_dict):
    """Check if employee has less than 1 year of service.

    Args:
        employee_dict: Mapping for one employee; must contain a
            'years_service' entry comparable with a number.

    Returns:
        True if the 'years_service' value is below 1, otherwise False.

    Raises:
        KeyError: If a dict is passed that has no 'years_service' key.
    """
    return employee_dict['years_service'] < 1
# Select only the newer employees
new_employee_bag = dict_bag.filter(is_new)

# Count all employees and new employees
print(dict_bag.count().compute(), new_employee_bag.count().compute())
261 49
We can use a lambda function to do the same thing
def is_new(employee_dict):
    """Check if employee has less than 1 year of service.

    Args:
        employee_dict: Mapping for one employee; must contain a
            'years_service' entry comparable with a number.

    Returns:
        True if the 'years_service' value is below 1, otherwise False.

    Raises:
        KeyError: If a dict is passed that has no 'years_service' key.
    """
    return employee_dict['years_service'] < 1
# An inline lambda expresses the same test as the named is_new function above
new_employee_bag = dict_bag.filter(
    lambda employee: employee['years_service'] < 1
)
example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
# Keep only the value stored under the 'employment' key of each record
employment_bag = new_employee_bag.pluck('employment')
# Peek at the first element: one employee's list of employment entries
print(employment_bag.take(1))
([{"role": "manager", "start_date": ...}, ...],)
# Extract each new employee's 'employment' list
employment_bag = new_employee_bag.pluck('employment')
# The length of each list is that employee's number of employment entries
number_of_jobs_bag = employment_bag.map(len)
# Peek at the first count
print(number_of_jobs_bag.take(1))
(4,)
# Set up the three reductions over the per-employee job counts
min_jobs, max_jobs, mean_jobs = (
    number_of_jobs_bag.min(),
    number_of_jobs_bag.max(),
    number_of_jobs_bag.mean(),
)

# Evaluate all three with a single dask.compute call and print the tuple
print(dask.compute(min_jobs, max_jobs, mean_jobs))
(0, 12, 3.142)
Parallel Programming with Dask in Python