Programmazione parallela con Dask in Python
James Fulton
Climate Informatics Researcher

import glob
video_filenames = glob.glob("*.mp4")
print(video_filenames)
['me_at_the_zoo.mp4', 'life_goes_on.mp4', 'guitar.mp4', 'hurt.mp4', ...]
import glob
video_filenames = glob.glob("*.mp4")
import dask.bag as db
filename_bag = db.from_sequence(video_filenames)
filename_bag.take(1)[0]
'me_at_the_zoo.mp4'
# Carica un singolo video
load_mp4("video.mp4")
{'video': array(
[[[ 51, 57, 37, ..., 227, 238, 168],
...,
[ 83, 125, 129, ..., 222, 148, 208]]]),
'audio': array([ 7. , 9. , 9.5, ..., -544.5, -551. , -558. ]),
'filename': 'video.mp4'}
data_bag = filename_bag.map(load_mp4)
data_bag.take(1)[0]
{'video': array(
[126, 162, 203, ..., 63, 58, 8],
...,
[ 58, 222, 170, ..., 234, 63, 81]]]),
'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
'filename': 'me_at_the_zoo.mp4'}
data_bag = filename_bag.map(load_mp4)
# Crea una lista vuota
data_list = []
# Aggiungi i file caricati in modalità delayed alla lista
for file in video_filenames:
data_list.append(dask.delayed(load_mp4)(file))
# Converti la lista di oggetti delayed in un Dask bag
data_bag = db.from_delayed(data_list)
# Converti il Dask bag in lista di oggetti delayed
data_list = data_bag.to_delayed()
transcribed_bag = data_bag.map(transcribe_audio)
transcribed_bag.take(1)[0]
{'video': array(
[126, 162, 203, ..., 63, 58, 8],
...,
[ 58, 222, 170, ..., 234, 63, 81]]]),
'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
'filename': 'me_at_the_zoo.mp4'
'transcript': "All right, so here we are in front of the, uh, elephants ...",
}
# Applica funzione personalizzata per rimuovere i video senza parlato
clean_bag = transcribed_bag.filter(transcript_is_not_blank)
# Applica sentiment analysis alle trascrizioni
sentiment_bag = clean_bag.map(analyze_transcript_sentiment)
# Rimuovi elementi indesiderati dal bag
keys_to_drop = ['video', 'audio']
final_bag = sentiment_bag.map(filter_dictionary, keys_to_drop=keys_to_drop)
# Converti in Dask DataFrame
df = final_bag.to_dataframe()
df.compute()
filename transcript sentiment
0 me_at_the_zoo.mp4 All right, so here ... positive
... ... ... ...
# Importa il modulo scipy per file .wav
from scipy.io import wavfile
# Carica frequenza di campionamento e array audio
sample_freq, audio = wavfile.read(filename)
# Campioni al secondo
print(sample_freq)
44100
# I dati audio
print(audio)
array([ 148, 142, 150, ..., -542, -546, -559], dtype=int16)
Programmazione parallela con Dask in Python