Dask bag operations

Parallel Programming with Dask in Python

James Fulton

Climate Informatics Researcher

The map method

def number_of_words(s):
    word_list = s.split(' ')
    return len(word_list)

print(number_of_words('these are four words'))
4
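One detail of `number_of_words` worth knowing: `split(' ')` splits on every single space, so repeated spaces or an empty string produce empty-string items that get counted as words. The sketch below compares it with argument-free `split()`, which splits on runs of whitespace. The name `number_of_words_ws` and the sample strings are hypothetical, used only for this illustration.

```python
def number_of_words(s):
    # Version from the slide: splits on each single space character
    word_list = s.split(' ')
    return len(word_list)


def number_of_words_ws(s):
    # Hypothetical variant: split() with no argument splits on runs of
    # whitespace and drops empty strings
    return len(s.split())


single_spaced = 'these are four words'
double_spaced = 'these  are four words'  # two spaces after 'these'

# Compare both versions on ordinary, double-spaced, and empty input
print(number_of_words(single_spaced), number_of_words_ws(single_spaced))
print(number_of_words(double_spaced), number_of_words_ws(double_spaced))
print(number_of_words(''), number_of_words_ws(''))
```

Which behaviour is wanted depends on the data; for clean, single-spaced text the two versions agree.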

The map method

import dask.bag as db

string_list = [
  'these are four words',
  'but these are five words',
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
print(word_count_bag)
dask.bag<number_of_words, npartitions=3>

The map method

string_list = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
# Run compute method
print(word_count_bag.compute())
[4, 5, 7]
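The pieces from the slides above can be put together into one script that runs on its own. This is a minimal sketch rather than the course's exact code: the `scheduler='synchronous'` argument is an assumption added here so that everything runs in the current process, which keeps a standalone script simple.

```python
import dask.bag as db


def number_of_words(s):
    # Count the space-separated items in a string
    return len(s.split(' '))


string_list = [
    'these are four words',
    'but these are five words',
    'and these are seven words in total',
]

string_bag = db.from_sequence(string_list)

# map is lazy: this only records the work to be done
word_count_bag = string_bag.map(number_of_words)

# Assumption for this sketch: run in the current process instead of the
# default scheduler
word_counts = word_count_bag.compute(scheduler='synchronous')
print(word_counts)
```

Dropping the `scheduler` argument falls back to the bag's default scheduler, as in the slides.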

JSON data

  • Inside an example JSON file, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
text_bag = db.read_text('example*.json')

print(text_bag.take(1))
('{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...]}\n',)
  • Each element is just a string, not yet a parsed object

Converting JSON from string to dictionary

import json

text_bag = db.read_text('example*.json')

dict_bag = text_bag.map(json.loads)

print(dict_bag.take(1))
({"name": "Beth", 
  "employment": [{"role": "manager", "start_date": ...}, ...]
  ...},)
  • Each element is now a Python dictionary
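`json.loads` can be tried on a single line outside of Dask to see what `map(json.loads)` does to each element. The line below is a made-up stand-in for one element of `text_bag` (the real fields are elided in the slides). Note that the trailing newline kept by `read_text` does not bother the parser.

```python
import json

# Hypothetical line, standing in for one element of text_bag
line = '{"name": "Beth", "employment": [{"role": "manager"}]}\n'

# Whitespace after the JSON document, such as the newline, is ignored
record = json.loads(line)

print(type(line).__name__, type(record).__name__)
print(record['name'], record['employment'][0]['role'])
```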

Filtering

def is_new(employee_dict):
    """Check if employee has less than 1 year of service"""
    return employee_dict['years_service'] < 1


# Select only the newer employees
new_employee_bag = dict_bag.filter(is_new)
# Count all employees and new employees
print(dict_bag.count().compute(), new_employee_bag.count().compute())
261 49
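`is_new` assumes every record has a `years_service` key; a record without it would raise a `KeyError` once the bag is computed. Below is a minimal sketch of a more tolerant predicate, assuming records that lack the key should simply be excluded. The name `is_new_safe` and the sample records are hypothetical, and the predicate is exercised on a plain list here so it can be checked without Dask.

```python
def is_new_safe(employee_dict):
    """Check for less than 1 year of service, tolerating a missing key"""
    years = employee_dict.get('years_service')
    return years is not None and years < 1


# Hypothetical records for illustration
sample_records = [
    {'name': 'A', 'years_service': 0.5},
    {'name': 'B', 'years_service': 3},
    {'name': 'C'},  # no 'years_service' key
]

new_names = [r['name'] for r in sample_records if is_new_safe(r)]
print(new_names)
```

The same function can be passed to the bag in place of `is_new`, as in `dict_bag.filter(is_new_safe)`.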

Filtering

We can use a lambda function to do the same thing

def is_new(employee_dict):
    """Check if employee has less than 1 year of service"""
    return employee_dict['years_service'] < 1

# Can use a lambda function instead of writing the function above
new_employee_bag = dict_bag.filter(lambda x: x['years_service'] < 1)

Pluck method

  • Inside an example JSON file, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
employment_bag = new_employee_bag.pluck('employment')

print(employment_bag.take(1))
([{"role": "manager", "start_date": ...}, ...],)

Pluck method

employment_bag = new_employee_bag.pluck('employment')

number_of_jobs_bag = employment_bag.map(len)

print(number_of_jobs_bag.take(1))
(4,)
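If some records might lack the plucked key, `pluck` also accepts a `default` value that is used for those elements. The sketch below uses made-up records, and `scheduler='synchronous'` is an assumption added so the example runs in the current process.

```python
import dask.bag as db

# Hypothetical records; the second one has no 'employment' key
records = [
    {'name': 'A', 'employment': [{'role': 'analyst'}, {'role': 'manager'}]},
    {'name': 'B'},
]

record_bag = db.from_sequence(records)

# default=[] is used for elements that lack the key
employment_bag = record_bag.pluck('employment', default=[])
number_of_jobs_bag = employment_bag.map(len)

# Assumption for this sketch: run in the current process
job_counts = number_of_jobs_bag.compute(scheduler='synchronous')
print(job_counts)
```

Without `default`, the missing key would surface as an error when the bag is computed.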

Aggregations

import dask

min_jobs = number_of_jobs_bag.min()
max_jobs = number_of_jobs_bag.max()
mean_jobs = number_of_jobs_bag.mean()

print(dask.compute(min_jobs, max_jobs, mean_jobs))
(0, 12, 3.142)
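The aggregation methods are lazy too, so passing all three results to a single `dask.compute` call lets Dask evaluate them together instead of in three separate runs. A small self-contained sketch follows; the job counts are invented for illustration, and `scheduler='synchronous'` is an assumption added to keep the example in one process.

```python
import dask
import dask.bag as db

# Hypothetical job counts, standing in for number_of_jobs_bag
number_of_jobs_bag = db.from_sequence([4, 0, 12, 3])

# Each call returns a lazy result; nothing is computed yet
min_jobs = number_of_jobs_bag.min()
max_jobs = number_of_jobs_bag.max()
mean_jobs = number_of_jobs_bag.mean()

# One compute call returns a tuple with one value per argument
results = dask.compute(min_jobs, max_jobs, mean_jobs, scheduler='synchronous')
print(results)
```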

Let's practice!
