Usare qualsiasi dato nei Dask bag

Programmazione parallela con Dask in Python

James Fulton

Climate Informatics Researcher

Formati dati misti complessi

Un diagramma che mostra un dataset con video e audio.

Programmazione parallela con Dask in Python

Creare un Dask bag

import glob

video_filenames = glob.glob("*.mp4")
print(video_filenames)
['me_at_the_zoo.mp4', 'life_goes_on.mp4', 'guitar.mp4', 'hurt.mp4', ...]
Programmazione parallela con Dask in Python

Creare un Dask bag

import glob

video_filenames = glob.glob("*.mp4")
import dask.bag as db

filename_bag = db.from_sequence(video_filenames)
filename_bag.take(1)[0]
'me_at_the_zoo.mp4'
Programmazione parallela con Dask in Python

Caricare dati personalizzati

# Carica un singolo video
load_mp4("video.mp4")
{'video': array(
         [[[ 51,  57,  37, ..., 227, 238, 168],
         ...,
         [ 83, 125, 129, ..., 222, 148, 208]]]),
 'audio': array([   7. ,    9. ,    9.5, ..., -544.5, -551. , -558. ]),
 'filename': 'video.mp4'}
Programmazione parallela con Dask in Python

Caricare dati personalizzati

data_bag = filename_bag.map(load_mp4)
data_bag.take(1)[0]
{'video': array(
         [126, 162, 203, ...,  63,  58,   8],
         ...,
         [ 58, 222, 170, ..., 234,  63,  81]]]),
 'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
 'filename': 'me_at_the_zoo.mp4'}
Programmazione parallela con Dask in Python

Caricare dati personalizzati

data_bag = filename_bag.map(load_mp4)
# Crea una lista vuota
data_list = []

# Aggiungi i file caricati in modalità delayed alla lista
for file in video_filenames:
    data_list.append(dask.delayed(load_mp4)(file))
Programmazione parallela con Dask in Python

Lista di oggetti delayed vs Dask bag

# Converti la lista di oggetti delayed in un Dask bag
data_bag = db.from_delayed(data_list)
# Converti il Dask bag in lista di oggetti delayed
data_list = data_bag.to_delayed()
Programmazione parallela con Dask in Python

Analisi successive

transcribed_bag = data_bag.map(transcribe_audio)
transcribed_bag.take(1)[0]
{'video': array(
         [126, 162, 203, ...,  63,  58,   8],
         ...,
         [ 58, 222, 170, ..., 234,  63,  81]]]),
 'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
 'filename': 'me_at_the_zoo.mp4'
 'transcript': "All right, so here we are in front of the, uh, elephants ...",
}
Programmazione parallela con Dask in Python

Analisi successive

# Applica funzione personalizzata per rimuovere i video senza parlato
clean_bag = transcribed_bag.filter(transcript_is_not_blank)

# Applica sentiment analysis alle trascrizioni
sentiment_bag = clean_bag.map(analyze_transcript_sentiment)
# Rimuovi elementi indesiderati dal bag
keys_to_drop = ['video', 'audio']
final_bag = sentiment_bag.map(filter_dictionary, keys_to_drop=keys_to_drop)

# Converti in Dask DataFrame
df = final_bag.to_dataframe()
Programmazione parallela con Dask in Python

Risultati

df.compute()
            filename              transcript        sentiment
0  me_at_the_zoo.mp4  All right, so here ...         positive
...              ...                     ...              ...
Programmazione parallela con Dask in Python

Uso di file .wav

# Importa il modulo scipy per file .wav
from scipy.io import wavfile

# Carica frequenza di campionamento e array audio
sample_freq, audio = wavfile.read(filename)
Programmazione parallela con Dask in Python

Uso di file .wav

# Campioni al secondo
print(sample_freq)
44100
# I dati audio
print(audio)
array([ 148,  142,  150, ..., -542, -546, -559], dtype=int16)
Programmazione parallela con Dask in Python

Passiamo alla pratica!

Programmazione parallela con Dask in Python

Preparing Video For Download...