Operazioni con Dask bag

Programmazione parallela con Dask in Python

James Fulton

Climate Informatics Researcher

Il metodo map

def number_of_words(s):
    word_list = s.split(' ')
    return len(word_list)

print(number_of_words('these are four words'))
4
Programmazione parallela con Dask in Python

Il metodo map

string_list  = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag word_count_bag = string_bag.map(number_of_words)
print(word_count_bag)
dask.bag<number_of_words, npartitions=3>
Programmazione parallela con Dask in Python

Il metodo map

string_list  = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
# Run compute method
print(word_count_bag.compute())
[4, 5, 7]
Programmazione parallela con Dask in Python

Dati JSON

  • In un file JSON di esempio,example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
text_bag = db.read_text('example*.json')

print(text_bag.take(1))
('{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...]}\n',)
  • È solo una stringa
Programmazione parallela con Dask in Python

Convertire JSON da stringa a dizionario

import json

text_bag = db.read_text('example*.json')

dict_bag = text_bag.map(json.loads)

print(dict_bag.take(1))
({"name": "Beth", 
  "employment": [{"role": "manager", "start_date": ...}, ...]
  ...},)
  • Ora è un dizionario Python
Programmazione parallela con Dask in Python

Filtraggio

def is_new(employee_dict):
    """Check if employee has less than 1 years service"""
    return employee_dict['years_service'] < 1


# Select only the newer employees new_employee_bag = dict_bag.filter(is_new)
# Count all employees and new employees
print(dict_bag.count().compute(), new_employee_bag.count().compute())
261 49
Programmazione parallela con Dask in Python

Filtraggio

Possiamo usare una funzione lambda per ottenere lo stesso risultato

def is_new(employee_dict):
    """Check if employee has less than 1 years service"""
    return employee_dict['years_service'] < 1

# Can use a lambda function instead of writing the function above
new_employee_bag = dict_bag.filter(lambda x: x['years_service'] < 1)
Programmazione parallela con Dask in Python

Metodo pluck

  • In un file JSON di esempio, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
employment_bag = new_employee_bag.pluck('employment')

print(employment_bag.take(1))
([{"role": "manager", "start_date": ...}, ...],)
Programmazione parallela con Dask in Python

Metodo pluck

employment_bag = new_employee_bag.pluck('employment')

number_of_jobs_bag = employment_bag.map(len)

print(number_of_jobs_bag.take(1))
(4,)
Programmazione parallela con Dask in Python

Aggregazioni

min_jobs = number_of_jobs_bag.min()
max_jobs = number_of_jobs_bag.max()
mean_jobs = number_of_jobs_bag.mean()

print(dask.compute(min_jobs, max_jobs, mean_jobs))
(0, 12, 3.142)
Programmazione parallela con Dask in Python

Passons à la pratique !

Programmazione parallela con Dask in Python

Preparing Video For Download...