Dask bag-bewerkingen

Parallel programmeren met Dask in Python

James Fulton

Climate Informatics Researcher

De map-methode

def number_of_words(s):
    word_list = s.split(' ')
    return len(word_list)

print(number_of_words('these are four words'))
4
Parallel programmeren met Dask in Python

De map-methode

string_list  = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag word_count_bag = string_bag.map(number_of_words)
print(word_count_bag)
dask.bag<number_of_words, npartitions=3>
Parallel programmeren met Dask in Python

De map-methode

string_list  = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
# Run compute method
print(word_count_bag.compute())
[4, 5, 7]
Parallel programmeren met Dask in Python

JSON-gegevens

  • In een voorbeeld-JSON-bestand,example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
text_bag = db.read_text('example*.json')

print(text_bag.take(1))
('{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...]}\n',)
  • Dit is gewoon een string
Parallel programmeren met Dask in Python

JSON omzetten van string naar dictionary

import json

text_bag = db.read_text('example*.json')

dict_bag = text_bag.map(json.loads)

print(dict_bag.take(1))
({"name": "Beth", 
  "employment": [{"role": "manager", "start_date": ...}, ...]
  ...},)
  • Dit is nu een Python-dictionary
Parallel programmeren met Dask in Python

Filteren

def is_new(employee_dict):
    """Check if employee has less than 1 years service"""
    return employee_dict['years_service'] < 1


# Select only the newer employees new_employee_bag = dict_bag.filter(is_new)
# Count all employees and new employees
print(dict_bag.count().compute(), new_employee_bag.count().compute())
261 49
Parallel programmeren met Dask in Python

Filteren

We kunnen een lambda-functie gebruiken voor hetzelfde

def is_new(employee_dict):
    """Check if employee has less than 1 years service"""
    return employee_dict['years_service'] < 1

# Kan ook met een lambda-functie i.p.v. de functie hierboven
new_employee_bag = dict_bag.filter(lambda x: x['years_service'] < 1)
Parallel programmeren met Dask in Python

Pluck-methode

  • In een voorbeeld-JSON-bestand, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
employment_bag = new_employee_bag.pluck('employment')

print(employment_bag.take(1))
([{"role": "manager", "start_date": ...}, ...],)
Parallel programmeren met Dask in Python

Pluck-methode

employment_bag = new_employee_bag.pluck('employment')

number_of_jobs_bag = employment_bag.map(len)

print(number_of_jobs_bag.take(1))
(4,)
Parallel programmeren met Dask in Python

Aggregaties

min_jobs = number_of_jobs_bag.min()
max_jobs = number_of_jobs_bag.max()
mean_jobs = number_of_jobs_bag.mean()

print(dask.compute(min_jobs, max_jobs, mean_jobs))
(0, 12, 3.142)
Parallel programmeren met Dask in Python

Laten we oefenen!

Parallel programmeren met Dask in Python

Preparing Video For Download...