Commit 226feab3 authored by NaweenTharuka

feat: Audio emotion detection py

parent c2e71338
import numpy as np
import librosa  # To extract speech features
import glob
import os


# Extract feature function
def extract_audio_features(file_name, should_augment=False, **kwargs):
    """
    Extract features from the audio file `file_name`.
    Features supported:
        - MFCC (mfcc)
        - Chroma (chroma)
        - MEL Spectrogram Frequency (mel)
    e.g.:
        `features = extract_audio_features(path, mel=True, mfcc=True)`
    """
    mfcc = kwargs.get("mfcc")
    chroma = kwargs.get("chroma")
    mel = kwargs.get("mel")
    # https://stackoverflow.com/questions/9458480/read-mp3-in-python-3
    # https://librosa.org/doc/latest/tutorial.html#quickstart
    # https://github.com/librosa/librosa/issues/1015
    X, sample_rate = librosa.load(file_name)
    if chroma:
        stft = np.abs(librosa.stft(X))
    result = np.array([])
    if mfcc:
        mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T, axis=0)
        result = np.hstack((result, mfccs))
        # print('mfccs shape', mfccs.shape)
    if mel:
        mel = np.mean(librosa.feature.melspectrogram(y=X, sr=sample_rate).T, axis=0)
        result = np.hstack((result, mel))
        # print('mel shape', mel.shape)
    if chroma:
        chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0)
        result = np.hstack((result, chroma))
        # print('chroma shape', chroma.shape)
    return result
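

# Minimal usage sketch; "samples/example.wav" is a hypothetical path, and the
# expected vector size assumes librosa defaults (40 MFCC + 128 mel bands + 12 chroma bins).
if __name__ == "__main__":
    features = extract_audio_features("samples/example.wav", mfcc=True, chroma=True, mel=True)
    print(features.shape)  # (180,) with the defaults above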
#!/usr/bin/env python3
# Author: Jan Cuhel
# Date: 2.5.2021
import os
import gtts
import librosa
import numpy as np
import pickle
from pydub import AudioSegment
from pydub.playback import play
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
import scipy
import speech_recognition as sr
# Import TF 2.X and make sure we're running eager.
import tensorflow.compat.v2 as tf
tf.enable_v2_behavior()
assert tf.executing_eagerly()
import warnings
warnings.filterwarnings('ignore')
from extract_audio_features import extract_audio_features
# Audio constants
DURATION_RAVDESS = 3
DURATION_IEMOCAP = 11
SAMPLING_RATE = 16000
input_length_iemocap = SAMPLING_RATE * DURATION_IEMOCAP
input_length_ravdess = SAMPLING_RATE * DURATION_RAVDESS
DEFAULT_FILE = 'microphone-results.wav'
# TRILL models
SER_TRILL_MODEL_IEMOCAP = '/content/mer-thesis-app/result_models/ser_trill_lstm_iemocap_model.h5'
SER_TRILL_MODEL_RAVDESS = '/content/mer-thesis-app/result_models/ser_trill_lstm_ravdess_model.h5'
MER_ELECTRA_TRILL = '/content/mer-thesis-app/result_models/mer_trill_electra_small_model.h5'
# Yamnet models
SER_YAMNET_MODEL_IEMOCAP = '/content/mer-thesis-app/result_models/ser_yamnet_iemocap_model.h5'
SER_YAMNET_MODEL_RAVDESS = '/content/mer-thesis-app/result_models/ser_yamnet_ravdess_model.h5'
MER_ELECTRA_YAMNET = '/content/mer-thesis-app/result_models/mer_electra_yamnet_iemocap_model.h5'
# TER Electra
TER_ELECTRA_IEMOCAP = '/content/mer-thesis-app/result_models/ter_electra_iemocap_model.h5'
TER_ELECTRA_PSYCHEXP = '/content/mer-thesis-app/result_models/ter_electra_model_psychexp.h5'
# Emotions available in the datasets
emotions_iemocap = ['neutral', 'happy', 'sad', 'angry']
emotions_ravdess = ['neutral', 'calm', 'happy', 'sad', 'angry', 'fearful', 'disgust', 'surprised']
emotion_psychexp = ['joy', 'fear', 'anger', 'sadness', 'disgust', 'shame', 'guilt']
# Language of the models
LANG = 'en'
# URL addresses for the audio embeddings
TRILL_URL = 'https://tfhub.dev/google/nonsemantic-speech-benchmark/trill-distilled/3'
YAMNET_URL = 'https://tfhub.dev/google/yamnet/1'
class DeepLearningModel():
    """ Definition of a class for a Deep Learning Emotion Recognition model """

    def __init__(self, model_filename, emotions=emotions_iemocap):
        self.model_filename = model_filename
        self.emotions = emotions
        self.model = self.load_model()

    def load_model(self):
        """ Loads the saved Keras model from `model_filename` (with TF Hub KerasLayer support) """
        return tf.keras.models.load_model(
            self.model_filename, custom_objects={'KerasLayer': hub.KerasLayer})
class TERModel(DeepLearningModel):
    """ Definition of a class for Text Emotion Recognition model (TER) """

    def __init__(self, model_filename, emotions=emotions_iemocap):
        super().__init__(model_filename, emotions)

    def predict_emotion(self, text):
        """ Predicts an emotion of the given text """
        X_text = np.array([text])
        # Make prediction
        pred_id = tf.argmax(self.model.predict(X_text), 1).numpy()[0]
        return self.emotions[pred_id]
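

# Usage sketch (illustrative only): wiring up the text-only model with the
# Electra IEMOCAP checkpoint path defined above; the sample sentence and the
# helper name are hypothetical, not part of the application flow.
def _demo_ter(text="I am so glad you came!"):
    ter = TERModel(TER_ELECTRA_IEMOCAP, emotions=emotions_iemocap)
    return ter.predict_emotion(text)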
class SERModel(DeepLearningModel):
    """ Definition of a class for Speech Emotion Recognition model (SER) """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, emotions)
        self.input_length = input_length
        self.embedding = hub.load(embedding_url)
        self.sample_rate = sample_rate

    def load_model(self):
        """ Loads the model """
        return tf.keras.models.load_model(self.model_filename)

    def predict_emotion(self, audio_file):
        """ Predicts an emotion of the given audio file """
        y, _ = librosa.load(audio_file, sr=self.sample_rate)
        # y, _ = librosa.effects.trim(y, top_db=25)
        # https://en.wikipedia.org/wiki/Wiener_filter
        # https://cs.wikipedia.org/wiki/Wiener%C5%AFv_filtr
        # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.wiener.html
        y = scipy.signal.wiener(y)
        if len(y) > self.input_length:
            # Cut to the same length
            y = y[0:self.input_length]
        elif self.input_length > len(y):
            # Pad the sequence
            max_offset = self.input_length - len(y)
            y = np.pad(y, (0, max_offset), "constant")
        X_audio = self.get_audio_embedding(y)
        # Make prediction
        pred_id = tf.argmax(self.model.predict(X_audio), 1).numpy()[0]
        return self.emotions[pred_id]

    def get_audio_embedding(self, audio):
        # Base implementation: feed the raw waveform as a batch of one
        return np.array([audio])
class TRILLSERModel(SERModel):
    """
    Definition of a class for Speech Emotion Recognition model (SER) that
    uses TRILL Embedding
    """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, embedding_url, emotions, input_length, sample_rate)

    def get_audio_embedding(self, audio):
        return np.array([self.embedding(samples=audio, sample_rate=self.sample_rate)['embedding'].numpy()])


class YAMNetSERModel(SERModel):
    """
    Definition of a class for Speech Emotion Recognition model (SER) that
    uses YAMNet as an Embedding
    """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, embedding_url, emotions, input_length, sample_rate)

    def get_audio_embedding(self, audio):
        # Get the embedding from the yamnet
        _, embeddings, _ = self.embedding(audio)
        return np.array([embeddings.numpy()])
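

# Usage sketch (illustrative only): instantiating the two SER variants above.
# The checkpoint paths and TF Hub URLs are the module constants; the helper
# name and the choice of checkpoints are hypothetical examples.
def _demo_ser(audio_file=DEFAULT_FILE):
    trill_ser = TRILLSERModel(SER_TRILL_MODEL_IEMOCAP, TRILL_URL,
                              emotions=emotions_iemocap,
                              input_length=input_length_iemocap)
    yamnet_ser = YAMNetSERModel(SER_YAMNET_MODEL_RAVDESS, YAMNET_URL,
                                emotions=emotions_ravdess,
                                input_length=input_length_ravdess)
    return trill_ser.predict_emotion(audio_file), yamnet_ser.predict_emotion(audio_file)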
class MERModel(DeepLearningModel):
    """ Definition of a class for Multimodal Emotion Recognition model (MER) """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, emotions)
        self.input_length = input_length
        self.embedding = hub.load(embedding_url)
        self.sample_rate = sample_rate

    def predict_emotion(self, text, audio_file):
        """ Predicts an emotion of the given text and audio file """
        y, _ = librosa.load(audio_file, sr=self.sample_rate)
        # y, _ = librosa.effects.trim(y, top_db=25)
        # https://en.wikipedia.org/wiki/Wiener_filter
        # https://cs.wikipedia.org/wiki/Wiener%C5%AFv_filtr
        # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.wiener.html
        y = scipy.signal.wiener(y)
        if len(y) > self.input_length:
            # Cut to the same length
            y = y[0:self.input_length]
        elif self.input_length > len(y):
            # Pad the sequence
            max_offset = self.input_length - len(y)
            y = np.pad(y, (0, max_offset), "constant")
        X_audio = self.get_audio_embedding(y)
        X_text = np.array([text])
        # Make prediction
        pred_id = tf.argmax(self.model.predict([X_text, X_audio]), 1).numpy()[0]
        return self.emotions[pred_id]

    def get_audio_embedding(self, audio):
        return np.array([audio])
class ElectraTRILLMERModel(MERModel):
    """
    Definition of a class for Multimodal Emotion Recognition model (MER) that
    uses TRILL Embedding
    """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, embedding_url, emotions, input_length, sample_rate)

    def get_audio_embedding(self, audio):
        return np.array([self.embedding(samples=audio, sample_rate=self.sample_rate)['embedding'].numpy()])


class ElectraYAMNetMERModel(MERModel):
    """
    Definition of a class for Multimodal Emotion Recognition model (MER) that
    uses YAMNet as an Embedding
    """

    def __init__(self, model_filename, embedding_url, emotions=emotions_iemocap, input_length=input_length_iemocap, sample_rate=SAMPLING_RATE):
        super().__init__(model_filename, embedding_url, emotions, input_length, sample_rate)

    def get_audio_embedding(self, audio):
        # Get the embedding from the yamnet
        _, embeddings, _ = self.embedding(audio)
        return np.array([embeddings.numpy()])
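

# Usage sketch (illustrative only): the multimodal model consumes both the
# transcript and the audio file. The checkpoint path and embedding URL are the
# module constants; the transcript, WAV path, and helper name are hypothetical.
def _demo_mer(text="That is wonderful news!", audio_file=DEFAULT_FILE):
    mer = ElectraTRILLMERModel(MER_ELECTRA_TRILL, TRILL_URL,
                               emotions=emotions_iemocap,
                               input_length=input_length_iemocap)
    return mer.predict_emotion(text, audio_file)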
def record_speech(lang=LANG, dur=DURATION_IEMOCAP, filepath=DEFAULT_FILE):
    """
    This function records speech from a microphone and returns the recognized text.
    params:
        - lang: the language of the recorded speech
        - dur: how long (in seconds) the function should record
        - filepath: path to the file where the audio recording should be saved
    returns:
        - text: transcript of the audio recording
        - filepath: where the audio recording was saved
    """
    # initialize the recognizer
    r = sr.Recognizer()
    try:
        with sr.Microphone() as source:
            print(f'Starting recording for the next {dur}s.\nPlease speak...')
            # read the audio data from the default microphone
            audio_data = r.record(source, duration=dur)
        print("Recording ended.\nRecognizing...")
        # convert speech to text
        text = r.recognize_google(audio_data, language=lang)
        print('Done.')
        print(f'\nYou\'ve said {text}.\n')
        # write audio to a WAV file
        with open(filepath, "wb") as f:
            f.write(audio_data.get_wav_data())
        print('Done.')
        return text, filepath
    except Exception:
        print('Something went wrong... Try to speak again')
        return None, None
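

# Usage sketch (illustrative only): chain the recorder with one of the speech
# models above. Everything referenced here is defined in this module; whether a
# microphone is reachable is environment-dependent, and the helper is hypothetical.
def _demo_record_and_classify():
    text, wav_path = record_speech(lang=LANG, dur=DURATION_IEMOCAP)
    if text is None:
        return None
    ser = TRILLSERModel(SER_TRILL_MODEL_IEMOCAP, TRILL_URL)
    return text, ser.predict_emotion(wav_path)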
"""
Resource: https://ricardodeazambuja.com/deep_learning/2019/03/09/audio_and_video_google_colab/
Author references:
To write this piece of code I took inspiration/code from a lot of places.
It was late night, so I'm not sure how much I created or just copied o.O
Here are some of the possible references:
https://blog.addpipe.com/recording-audio-in-the-browser-using-pure-html5-and-minimal-javascript/
https://stackoverflow.com/a/18650249
https://hacks.mozilla.org/2014/06/easy-audio-capture-with-the-mediarecorder-api/
https://air.ghost.io/recording-to-an-audio-file-using-html5-and-js/
https://stackoverflow.com/a/49019356
"""
from IPython.display import HTML, Audio, display
from google.colab.output import eval_js
from base64 import b64decode
import numpy as np
# from scipy.io.wavfile import read as wav_read
import librosa
import io
import ffmpeg
AUDIO_HTML = """
<script>
var my_div = document.createElement("DIV");
var my_p = document.createElement("P");
var my_btn = document.createElement("BUTTON");
var t = document.createTextNode("Press to start recording");

my_btn.appendChild(t);
//my_p.appendChild(my_btn);
my_div.appendChild(my_btn);
document.body.appendChild(my_div);

var base64data = 0;
var reader;
var recorder, gumStream;
var recordButton = my_btn;

var handleSuccess = function(stream) {
  gumStream = stream;
  var options = {
    //bitsPerSecond: 8000, //chrome seems to ignore, always 48k
    mimeType : 'audio/webm;codecs=opus'
    //mimeType : 'audio/webm;codecs=pcm'
  };
  //recorder = new MediaRecorder(stream, options);
  recorder = new MediaRecorder(stream);
  recorder.ondataavailable = function(e) {
    var url = URL.createObjectURL(e.data);
    var preview = document.createElement('audio');
    preview.controls = true;
    preview.src = url;
    document.body.appendChild(preview);

    reader = new FileReader();
    reader.readAsDataURL(e.data);
    reader.onloadend = function() {
      base64data = reader.result;
      //console.log("Inside FileReader:" + base64data);
    }
  };
  recorder.start();
};

recordButton.innerText = "Recording... press to stop";

navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess);

function toggleRecording() {
  if (recorder && recorder.state == "recording") {
    recorder.stop();
    gumStream.getAudioTracks()[0].stop();
    recordButton.innerText = "Saving the recording... pls wait!"
  }
}

// https://stackoverflow.com/a/951057
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

var data = new Promise(resolve => {
  //recordButton.addEventListener("click", toggleRecording);
  recordButton.onclick = () => {
    toggleRecording()
    sleep(2000).then(() => {
      // wait 2000ms for the data to be available...
      // ideally this should use something like await...
      //console.log("Inside data:" + base64data)
      resolve(base64data.toString())
    });
  }
});
</script>
"""
def get_audio():
    display(HTML(AUDIO_HTML))
    data = eval_js("data")
    binary = b64decode(data.split(',')[1])

    process = (ffmpeg
               .input('pipe:0')
               .output('pipe:1', format='wav')
               .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True, quiet=True, overwrite_output=True)
               )
    output, err = process.communicate(input=binary)

    riff_chunk_size = len(output) - 8
    # Break up the chunk size into four bytes, held in b.
    q = riff_chunk_size
    b = []
    for i in range(4):
        q, r = divmod(q, 256)
        b.append(r)

    # Replace bytes 4:8 in proc.stdout with the actual size of the RIFF chunk.
    riff = output[:4] + bytes(b) + output[8:]

    # sr, audio = wav_read(io.BytesIO(riff))
    audio, sr = librosa.load(io.BytesIO(riff), sr=16000)

    audio_file = 'audio.wav'
    with open(audio_file, 'wb') as f:
        f.write(riff)

    return audio, sr, audio_file
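

# Usage sketch (illustrative only, Colab environment assumed): record in the
# browser and inspect the result. `audio` is the 16 kHz waveform, `sr` the
# sampling rate, and `audio_file` the saved WAV path ready for a downstream
# classifier; the helper name is hypothetical.
def _demo_get_audio():
    audio, sr, audio_file = get_audio()
    print(f'Recorded {len(audio) / sr:.1f}s of audio, saved to {audio_file}')
    return audio_file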