Membangun pipeline tertunda

Pemrograman Paralel dengan Dask di Python

James Fulton

Climate Informatics Researcher

Potongan data

Diagram yang menunjukkan dataset cukup kecil untuk disimpan di hard drive, tetapi terlalu besar untuk muat di RAM.

Diagram yang menunjukkan dataset dibagi menjadi beberapa bagian. Seluruh dataset terlalu besar untuk RAM, tetapi tiap bagiannya muat.

Pemrograman Paralel dengan Dask di Python

Dataset lagu Spotify

files = [
  '2005_tracks.csv',
  '2006_tracks.csv',
  '2007_tracks.csv',
  '2008_tracks.csv',
  '2009_tracks.csv',
  '2010_tracks.csv',
  ...
  '2020_tracks.csv',
]
Pemrograman Paralel dengan Dask di Python

Dataset lagu Spotify

                       name  duration_ms release_date  ...
0     Aldrig (feat. Carmon)       247869   2019-01-01  ...
2  2019 - The Year to Build       288105   2019-01-01  ...
3                 Na zawsze       186812   2019-01-01  ...
4         Humo en la Trampa       258354   2019-01-01  ...
5                     Au Au       176000   2019-01-01  ...
...                     ...          ...          ...  ...
Pemrograman Paralel dengan Dask di Python

Menganalisis data

import pandas as pd

maximums = []

for file in files:
    # Muat tiap file
    df = pd.read_csv(file)

# Cari durasi lagu maksimum di tiap file max_length = df['duration_ms'].max()
# Simpan nilai maksimum maximums.append(max_length)
# Cari maksimum dari semua nilai maksimum absolute_maximum = max(maximums)
Pemrograman Paralel dengan Dask di Python

Menganalisis data

import pandas as pd

maximums = []

for file in files:
    # Muat tiap file
    df = delayed(pd.read_csv)(file) # <------- tunda pemuatan
    # Cari durasi lagu maksimum di tiap file
    max_length = df['duration_ms'].max()
    # Simpan nilai maksimum
    maximums.append(max_length)

# Cari maksimum dari semua nilai maksimum
absolute_maximum = max(maximums)
Pemrograman Paralel dengan Dask di Python

Menganalisis data

import pandas as pd

maximums = []

for file in files:
    # Muat tiap file
    df = delayed(pd.read_csv)(file) # <------- tunda pemuatan
    # Cari durasi lagu maksimum di tiap file
    max_length = df['duration_ms'].max()
    # Simpan nilai maksimum
    maximums.append(max_length)

# Cari maksimum dari semua nilai maksimum
absolute_maximum = delayed(max)(maximums) # <------- tunda fungsi max()
Pemrograman Paralel dengan Dask di Python

Menggunakan metode objek tertunda

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Gunakan metode .max()
    max_length = df['duration_ms'].max()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-0602855d-3ee6-4c43-a4d2-...')
  • Metode dan properti objek Delayed mengembalikan objek Delayed baru
print(df.shape)
print(df.shape.compute())
Delayed('getattr-bc1e8838ab...')
(11907, 12)
Pemrograman Paralel dengan Dask di Python

Menggunakan metode objek tertunda

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)
    # Gunakan metode yang tidak ada
    max_length = df['duration_ms'].fake()

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
print(max_length)
Delayed('max-6c026036-5daf-4b2-...')
  • Metode tidak dijalankan sampai .compute() dipanggil
print(max_length.compute())
...
AttributeError: 'Series' object has no 
attribute 'fake'
Pemrograman Paralel dengan Dask di Python

Menggunakan metode objek tertunda

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Tambahkan objek tertunda ke daftar
    maximums.append(max_length)

# Jalankan max tertunda pada daftar objek tertunda
absolute_maximum = delayed(max)(maximums)

maximums adalah daftar objek tertunda

print(maximums)
[Delayed('max-80b...'), 
Delayed('max-fa15d...', 
...]
Pemrograman Paralel dengan Dask di Python

Menghitung daftar objek tertunda

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file)

    max_length = df['duration_ms'].max()
    # Tambahkan objek tertunda ke daftar
    maximums.append(max_length)

# Hitung semua nilai maksimum
all_maximums = dask.compute(maximums)
print(all_maximums)
([2539418, 4368000, ...
... 4511716, 4864333],)
Pemrograman Paralel dengan Dask di Python

Menghitung daftar objek tertunda

import pandas as pd

maximums = []

for file in files:
    df = delayed(pd.read_csv)(file) 

    max_length = df['duration_ms'].max()

    maximums.append(max_length)

# Hitung semua nilai maksimum
all_maximums = dask.compute(maximums)[0]
print(all_maximums)
[2539418, 4368000, ...
... 4511716, 4864333]
Pemrograman Paralel dengan Dask di Python

Menunda atau tidak menunda

def get_max_track(df):
    return df['duration_ms'].max()

for file in files:
    df = delayed(pd.read_csv)(file) 
    # Gunakan fungsi untuk mencari maksimum
    max_length = get_max_track(df)

    maximums.append(max_length)


absolute_maximum = delayed(max)(maximums)
Pemrograman Paralel dengan Dask di Python

Graf tugas lebih dalam

absolute_maximum.visualize()

Graf tugas yang menjelaskan langkah untuk menghitung maksimum absolut antar file. Grafnya besar namun sederhana.

Pemrograman Paralel dengan Dask di Python

Ayo berlatih!

Pemrograman Paralel dengan Dask di Python

Preparing Video For Download...