Gecikmeli işlem hatları oluşturma

Python ile Dask ile Paralel Programlama

James Fulton

Climate Informatics Researcher

Veri parçaları

Bir veri kümesinin sabit diske sığdığı, ancak RAM'e sığmadığını gösteren bir diyagram.

Bir veri kümesinin birden çok parçaya bölündüğünü gösteren bir diyagram. Tüm veri kümesi RAM'e sığmaz, ancak tek tek parçalar sığar.

Python ile Dask ile Paralel Programlama

Spotify şarkı veri kümesi

files = [
  '2005_tracks.csv',
  '2006_tracks.csv',
  '2007_tracks.csv',
  '2008_tracks.csv',
  '2009_tracks.csv',
  '2010_tracks.csv',
  ...
  '2020_tracks.csv',
]
Python ile Dask ile Paralel Programlama

Spotify şarkı veri kümesi

                       name  duration_ms release_date  ...
0     Aldrig (feat. Carmon)       247869   2019-01-01  ...
2  2019 - The Year to Build       288105   2019-01-01  ...
3                 Na zawsze       186812   2019-01-01  ...
4         Humo en la Trampa       258354   2019-01-01  ...
5                     Au Au       176000   2019-01-01  ...
...                     ...          ...          ...  ...
Python ile Dask ile Paralel Programlama

Veriyi analiz etme

import pandas as pd

maximums = []

for file in files:
    # Her dosyayı yükle
    df = pd.read_csv(file)

# Her dosyadaki en uzun parçayı bul max_length = df['duration_ms'].max()
# Bu maksimumu sakla maximums.append(max_length)
# Tüm maksimumların en büyüğünü bul absolute_maximum = max(maximums)
Python ile Dask ile Paralel Programlama

Veriyi analiz etme

import pandas as pd

maximums = []

for file in files:
    # Her dosyayı yükle
    df = delayed(pd.read_csv)(file) # <------- yüklemeyi geciktir
    # Her dosyadaki en uzun parçayı bul
    max_length = df['duration_ms'].max()
    # Bu maksimumu sakla
    maximums.append(max_length)

# Tüm maksimumların en büyüğünü bul
absolute_maximum = max(maximums)
Python ile Dask ile Paralel Programlama

Veriyi analiz etme

import pandas as pd

maximums = []

for file in files:
    # Her dosyayı yükle
    df = delayed(pd.read_csv)(file) # <------- yüklemeyi geciktir
    # Her dosyadaki en uzun parçayı bul
    max_length = df['duration_ms'].max()
    # Bu maksimumu sakla
    maximums.append(max_length)

# Tüm maksimumların en büyüğünü bul
absolute_maximum = delayed(max)(maximums) # <------- max() fonksiyonunu geciktir
Python ile Dask ile Paralel Programlama

Gecikmeli nesnenin metodlarını kullanma

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # .max() metodunu kullan
    max_length = df['duration_ms'].max()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-0602855d-3ee6-4c43-a4d2-...')
  • Gecikmeli nesnelerin metod ve özellikleri yeni gecikmeli nesneler döndürür
print(df.shape)
print(df.shape.compute())
Delayed('getattr-bc1e8838ab...')
(11907, 12)
Python ile Dask ile Paralel Programlama

Gecikmeli nesnenin metodlarını kullanma

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Var olmayan bir metodu kullan
    max_length = df['duration_ms'].fake()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-6c026036-5daf-4b2-...')
  • Metodlar .compute() çağrılana kadar çalışmaz
print(max_length.compute())
...
AttributeError: 'Series' object has no 
attribute 'fake'
Python ile Dask ile Paralel Programlama

Gecikmeli nesnenin metodlarını kullanma

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Gecikmeli nesneyi listeye ekle
    maximums.append(max_length)

# Gecikmeli nesneler listesinde gecikmeli max çalıştır
absolute_maximum = delayed(max)(maximums)

maximums gecikmeli nesnelerden oluşan bir listedir

print(maximums)
[Delayed('max-80b...'), 
Delayed('max-fa15d...', 
...]
Python ile Dask ile Paralel Programlama

Gecikmeli nesne listelerini hesaplama

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Gecikmeli nesneyi listeye ekle
    maximums.append(max_length)

# Tüm maksimumları hesapla
all_maximums = dask.compute(maximums)
print(all_maximums)
([2539418, 4368000, ...
... 4511716, 4864333],)
Python ile Dask ile Paralel Programlama

Gecikmeli nesne listelerini hesaplama

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file) 

    max_length = df['duration_ms'].max()

    maximums.append(max_length)

# Tüm maksimumları hesapla
all_maximums = dask.compute(maximums)[0]
print(all_maximums)
[2539418, 4368000, ...
... 4511716, 4864333]
Python ile Dask ile Paralel Programlama

Geciktirmek mi, geciktirmemek mi

def get_max_track(df):
    return df['duration_ms'].max()

for file in files:
    df = delayed(pd.read_csv)(file) 
    # Maksimumu bulmak için fonksiyonu kullan
    max_length = get_max_track(df)

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
Python ile Dask ile Paralel Programlama

Daha derin görev grafiği

absolute_maximum.visualize()

Dosyalar arasında mutlak maksimumu hesaplamak için gereken adımları gösteren bir görev grafiği. Grafik büyük ama basittir.

Python ile Dask ile Paralel Programlama

Hadi pratik yapalım!

Python ile Dask ile Paralel Programlama

Preparing Video For Download...