Dask bag operations

Parallel Programming with Dask in Python

James Fulton

Climate Informatics Researcher

The map method

def number_of_words(s):
    word_list = s.split(' ')
    return len(word_list)

print(number_of_words('these are four words'))
4
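
One detail worth checking before mapping this function over real text: `s.split(' ')` splits on every single space, so repeated spaces produce empty strings that are counted as words. The following is a minimal sketch comparing it with the no-argument `split()`; the name `number_of_words_ws` and the `messy` string are hypothetical and not part of the course code.

```python
def number_of_words(s):
    # Original version: split on each single space character
    return len(s.split(' '))


def number_of_words_ws(s):
    # Hypothetical variant: split on any run of whitespace
    return len(s.split())


clean = 'these are four words'
messy = 'these  are four words'  # two spaces after "these"

# Both versions agree on cleanly spaced text
print(number_of_words(clean), number_of_words_ws(clean))
# They differ once a doubled space appears
print(number_of_words(messy), number_of_words_ws(messy))
```

Which behavior is wanted depends on the data; the slides use the single-space version, so the rest of the examples keep it.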

The map method

import dask.bag as db

string_list = [
  'these are four words',
  'but these are five words',
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
print(word_count_bag)
dask.bag<number_of_words, npartitions=3>

The map method

string_list  = [
  'these are four words',
  'but these are five words', 
  'and these are seven words in total'
]
# Create bag from list above
string_bag = db.from_sequence(string_list)

# Apply function to each element in bag
word_count_bag = string_bag.map(number_of_words)
# Run compute method
print(word_count_bag.compute())
[4, 5, 7]
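
The difference between the lazy bag printed earlier and the computed list has a close analogue in plain Python, where the built-in `map` also defers work until its result is consumed. The sketch below is only an analogy and does not use Dask; the `calls` list is a hypothetical addition that makes the deferred execution visible.

```python
calls = []  # hypothetical log of every string the function was called on


def number_of_words(s):
    calls.append(s)
    return len(s.split(' '))


string_list = [
    'these are four words',
    'but these are five words',
    'and these are seven words in total',
]

# Building the map object runs nothing yet, much like string_bag.map(...)
lazy_counts = map(number_of_words, string_list)
calls_before = len(calls)

# Consuming the iterator plays the role of .compute()
word_counts = list(lazy_counts)
calls_after = len(calls)

print(calls_before, calls_after, word_counts)
```

The analogy stops at laziness: a Dask bag additionally splits the data into partitions and can process them in parallel.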

JSON data

  • Inside an example JSON file, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
text_bag = db.read_text('example*.json')

print(text_bag.take(1))
('{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...]}\n',)
  • This is just a string

Converting JSON from string to dictionary

import json

text_bag = db.read_text('example*.json')

dict_bag = text_bag.map(json.loads)

print(dict_bag.take(1))
({"name": "Beth", 
  "employment": [{"role": "manager", "start_date": ...}, ...]
  ...},)
  • This is now a Python dictionary
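
To see what `json.loads` does to a single element, it can help to run it on one line by hand. This is a minimal sketch using only the standard library; the two lines below are hypothetical stand-ins for the contents of example_0.json, with invented field values.

```python
import json

# Hypothetical lines in the one-object-per-line style shown above
lines = [
    '{"name": "Beth", "employment": [{"role": "manager"}]}\n',
    '{"name": "Omar", "employment": [{"role": "analyst"}]}\n',
]

first_raw = lines[0]
# json.loads ignores the trailing newline and returns a dict
first_dict = json.loads(first_raw)

print(type(first_raw).__name__, type(first_dict).__name__)
print(first_dict['name'], first_dict['employment'][0]['role'])
```

Mapping `json.loads` over the bag applies this same conversion to every line of every matching file.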

Filtering

def is_new(employee_dict):
    """Check if employee has less than 1 year of service"""
    return employee_dict['years_service'] < 1


# Select only the newer employees
new_employee_bag = dict_bag.filter(is_new)
# Count all employees and new employees
print(dict_bag.count().compute(), new_employee_bag.count().compute())
261 49

Filtering

We can do the same with a lambda function

def is_new(employee_dict):
    """Check if employee has less than 1 year of service"""
    return employee_dict['years_service'] < 1

# Can use a lambda function instead of writing the function above
new_employee_bag = dict_bag.filter(lambda x: x['years_service'] < 1)
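
The named function and the lambda are interchangeable as predicates. A minimal sketch of that equivalence, using the built-in `filter` on a plain list instead of a Dask bag; the three records and their `years_service` values are hypothetical.

```python
# Hypothetical employee records; the values are invented for illustration
employees = [
    {'name': 'Beth', 'years_service': 0.5},
    {'name': 'Omar', 'years_service': 3},
    {'name': 'Fang', 'years_service': 0.2},
]


def is_new(employee_dict):
    """Check if employee has less than 1 year of service"""
    return employee_dict['years_service'] < 1


# Same predicate, once as a named function and once as a lambda
by_function = list(filter(is_new, employees))
by_lambda = list(filter(lambda x: x['years_service'] < 1, employees))

print([e['name'] for e in by_function])
print(by_function == by_lambda)
```

A named function is easier to document and reuse, while a lambda keeps a one-off condition next to the call that uses it.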

The pluck method

  • Inside an example JSON file, example_0.json
{"name": "Beth", "employment": [{"role": "manager", "start_date": ...}, ...], ...}
{"name": "Omar", "employment": [{"role": "analyst", "start_date": ...}, ...], ...}
{"name": "Fang", "employment": [{"role": "engineer", "start_date": ...}, ...], ...}
...
employment_bag = new_employee_bag.pluck('employment')

print(employment_bag.take(1))
([{"role": "manager", "start_date": ...}, ...],)

The pluck method

employment_bag = new_employee_bag.pluck('employment')

number_of_jobs_bag = employment_bag.map(len)

print(number_of_jobs_bag.take(1))
(4,)
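
Conceptually, `pluck('employment')` pulls one key out of every dictionary, and `map(len)` then counts the entries in each resulting list. The plain-Python sketch below mirrors those two steps on a small list; the records are hypothetical and the list comprehension is a stand-in for the bag method, not how Dask implements it.

```python
# Hypothetical records with invented employment histories
records = [
    {'name': 'Beth', 'employment': [{'role': 'manager'}, {'role': 'analyst'}]},
    {'name': 'Omar', 'employment': [{'role': 'analyst'}]},
    {'name': 'Fang', 'employment': []},
]

# Stand-in for new_employee_bag.pluck('employment')
employment_lists = [record['employment'] for record in records]

# Stand-in for employment_bag.map(len)
number_of_jobs = list(map(len, employment_lists))

print(number_of_jobs)
```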

Aggregations

import dask

min_jobs = number_of_jobs_bag.min()
max_jobs = number_of_jobs_bag.max()
mean_jobs = number_of_jobs_bag.mean()

print(dask.compute(min_jobs, max_jobs, mean_jobs))
(0, 12, 3.142)
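
The three aggregations above are lazy, and passing them to a single `dask.compute` call evaluates them together and returns a tuple. As a rough sanity check of what each one means, the same summary can be computed on a small in-memory list with the standard library; the job counts below are hypothetical.

```python
from statistics import mean

# Hypothetical number-of-jobs values, one per employee
number_of_jobs = [2, 1, 0, 4, 3]

# Same three statistics, returned as a tuple like dask.compute does
summary = (min(number_of_jobs), max(number_of_jobs), mean(number_of_jobs))

print(summary)
```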

Let's practice!
