2023-319 / Edu Me / Commits / 5cfc3a50

Commit 5cfc3a50, authored Sep 09, 2023 by P.R.K Peramuna
Parent: c0fdadde

    Integrated backend code added

Showing 1 changed file with 803 additions and 0 deletions.

ML/inference.py (new file, 0 → 100644, +803 −0)
import pprint
import pathlib
import cv2 as cv
import numpy as np
import pandas as pd
import PyPDF2, re, os
import datetime as dt
import cv2, dlib, math
import mediapipe as mp
from fastai.vision.all import *
from transformers import pipeline, T5ForConditionalGeneration, T5Tokenizer
import pke
import nltk
import torch
import random
import string
import warnings
import requests
import traceback
from textwrap3 import wrap
from nltk.corpus import stopwords
from pywsd.lesk import simple_lesk
from pywsd.lesk import cosine_lesk
from pywsd.lesk import adapted_lesk
from nltk.corpus import wordnet as wn
from flashtext import KeywordProcessor
from nltk.tokenize import sent_tokenize
from pywsd.similarity import max_similarity
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
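
# Note: several of these imports (PyPDF2, pandas, transformers, pke, pywsd,
# flashtext, textwrap3) are only referenced by the commented-out scheduling,
# summarization and question-generation pipeline further down; they are
# presumably kept so that block can be re-enabled without touching the imports.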

# fastai learners pickled on Linux store PosixPath objects; aliasing
# PosixPath to WindowsPath lets load_learner() unpickle them on Windows.
temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath

app = Flask(__name__)
cors = CORS(app)
app.config['UPLOAD_FOLDER'] = 'uploads/'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# MediaPipe face mesh for head-pose estimation
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# dlib face detector and 68-point landmark predictor for drowsiness detection
face_detector = dlib.get_frontal_face_detector()
shape_predictor = dlib.shape_predictor("weights/shape_predictor_68_face_landmarks.dat")

# fastai emotion classifier
learn_emotion = load_learner('weights/emotions_rf.pkl')
learn_emotion_labels = learn_emotion.dls.vocab

# pipeline_lesson = pipeline(
#     task="summarization",
#     model="./weights/lesson-summarization",
#     device=0 if torch.cuda.is_available() else -1,
# )
# summary_model = T5ForConditionalGeneration.from_pretrained('t5-base')
# summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')
# question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1')
# question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
# question_model = question_model.to(device)
# summary_model = summary_model.to(device)
# print("All models loaded successfully!")

head_pose_dict = {
    "Looking Left": 0,
    "Looking Right": 1,
    "Looking Up": 2,
    "Looking Down": 3,
    "Looking Forward": 4,
}
drowsiness_dict = {
    "Sleepy": 0,
    "Not Sleepy": 1,
}
emotion_dict = {
    "angry": 0,
    "disgust": 1,
    "fear": 2,
    "happy": 3,
    "neutral": 4,
    "sad": 5,
    "surprise": 6,
}

def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

nltk.download('punkt')
nltk.download('brown')
nltk.download('wordnet')
nltk.download('stopwords')
warnings.filterwarnings('ignore')
set_seed(42)

# def find_available_hours_per_week(available_time_dict):
#     available_hours = {}
#     for day in available_time_dict.keys():
#         available_hour_per_day = 0
#         for time in available_time_dict[day]:
#             start_time = time.split('-')[0]
#             end_time = time.split('-')[1]
#             start_time = dt.datetime.strptime(start_time, '%H:%M')
#             end_time = dt.datetime.strptime(end_time, '%H:%M')
#             available_hour_per_day += (end_time - start_time).seconds / 3600
#         available_hours[day] = available_hour_per_day
#     return available_hours

# def map_time_slot(duration):
#     # map each half hour to a number, so a day should have 0 - 48
#     # eg : duration - '06:00-18:00'
#     start_time = duration.split('-')[0]
#     end_time = duration.split('-')[1]
#     start_time = dt.datetime.strptime(start_time, '%H:%M')
#     end_time = dt.datetime.strptime(end_time, '%H:%M')
#     time_starter = dt.datetime.strptime('00:00', '%H:%M')
#     idx_tracker = []
#     idx = 0
#     while True:
#         if time_starter >= start_time and time_starter < end_time:
#             idx_tracker.append(idx)
#         time_starter += dt.timedelta(minutes=30)
#         idx += 1
#         if time_starter >= dt.datetime.strptime('23:59', '%H:%M'):
#             break
#     return idx_tracker

# def map_idx_to_time(idx):
#     time_starter = dt.datetime.strptime('00:00', '%H:%M')
#     for i in range(idx):
#         time_starter += dt.timedelta(minutes=30)
#     start_time = time_starter.strftime('%H:%M')
#     end_time = (time_starter + dt.timedelta(minutes=30)).strftime('%H:%M')
#     return f"{start_time}-{end_time}"

# def find_available_slots_per_week(available_time_dict):
#     available_slots = {}
#     for day in available_time_dict.keys():
#         available_slots_per_day = []
#         for time in available_time_dict[day]:
#             available_slots_per_day += map_time_slot(time)
#         available_slots[day] = available_slots_per_day
#     return available_slots

# def extract_all_slots(available_times):
#     slot_dict = find_available_slots_per_week(available_times)
#     all_slots = []
#     for dat, slots in slot_dict.items():
#         for slot in slots:
#             all_slots.append(f"{dat}_{slot}")
#     return all_slots

# def reward_per_module(
#     student_details,
#     level_mapping = {
#         'Easy': 0,
#         'Average': 1,
#         'Hard': 2
#     }):
#     df = pd.DataFrame(student_details)
#     df['Student Level'] = df['Student Level'].map(level_mapping)
#     CreditPoints = df['Credit points'].values
#     StudentLevel = df['Student Level'].values
#     StudentLevel = [int(i) + 1 for i in StudentLevel]
#     TotalChapters = df['Total Chapters'].values
#     StudiedAlready = df['Studied Already'].values
#     ChapterScore = StudiedAlready / TotalChapters
#     reward = (CreditPoints * StudentLevel) * ChapterScore
#     reward = reward / np.sum(reward)
#     modules = df['Subject name & PDF'].values
#     return dict(zip(modules, reward))

# def q_learning():
#     Q = np.zeros([7, 48])
#     gamma = 0.8
#     alpha = 0.9
#     num_episodes = 1000
#     for i in range(num_episodes):
#         s = 0
#         while s < 6:
#             a = np.argmax(Q[s, :] + np.random.randn(1, 48) * (1. / (i + 1)))
#             s_prime = s + 1
#             r = 1
#             Q[s, a] = Q[s, a] + alpha * (r + gamma * np.max(Q[s_prime, :]) - Q[s, a])
#             s = s_prime
#     return Q

# def assign_slots_per_week(available_times, student_details):
#     module_weights = reward_per_module(student_details)
#     all_slots = extract_all_slots(available_times)
#     module_weights = {k: int(v * len(all_slots)) for k, v in module_weights.items()}
#     # sort the modules based on weights
#     module_weights = {k: v for k, v in sorted(module_weights.items(), key=lambda item: item[1], reverse=True)}
#     module_weights[list(module_weights.keys())[-1]] = (len(all_slots) - sum(module_weights.values())) + module_weights[list(module_weights.keys())[-1]]
#     all_slots_cp = all_slots.copy()
#     all_slots_cp = np.array(all_slots_cp)
#     assign_slots = {}
#     for module, weight in module_weights.items():
#         Q = q_learning()
#         rand_idxs = np.random.choice(len(all_slots_cp), weight, replace=False)
#         module_json = {}
#         for idx in rand_idxs:
#             slopt_details = all_slots_cp[idx]
#             day, splot_id = slopt_details.split('_')
#             if day not in module_json.keys():
#                 module_json[day] = []
#             module_json[day].append(int(splot_id))
#         all_slots_cp = np.delete(all_slots_cp, rand_idxs)
#         assign_slots[module] = module_json
#     for module, slots in assign_slots.items():
#         for day, slot_ids in slots.items():
#             # sort the slot ids in ascending order
#             slot_ids = sorted(slot_ids)
#             assign_slots[module][day] = slot_ids
#             assign_slots[module][day] = [map_idx_to_time(i) for i in slot_ids]
#     return assign_slots

# def read_pdf_data(pdf_path):
#     pdf_file = open(pdf_path, 'rb')
#     pdf_reader = PyPDF2.PdfFileReader(pdf_file)
#     num_pages = pdf_reader.getNumPages()
#     whole_text = ''
#     for page in range(num_pages):
#         page_obj = pdf_reader.getPage(page)
#         text = page_obj.extractText()
#         whole_text += f" {text}"
#     pdf_file.close()
#     # split this text into paragraphs
#     sentences = whole_text.split('\n')
#     sen_lengths = [len(sen) for sen in sentences]
#     avg_sen_length = np.mean(sen_lengths)
#     avg_sen_length = 80
#     # split into paragraphs
#     paragraphs = []
#     paragraph = ''
#     for sentence in sentences:
#         if len(sentence) > avg_sen_length:
#             paragraph += f" {sentence}"
#         else:
#             paragraphs.append(paragraph)
#             paragraph = ''
#     return paragraphs

# def inference_lesson_summarizer(pdf_path):
#     paragraphs = read_pdf_data(pdf_path)
#     summarized_text = ''
#     for paragraph in paragraphs:
#         summary_paragraph = pipeline_lesson(paragraph)[0]['summary_text']
#         summarized_text += f"{summary_paragraph}\n\n"
#     return {
#         "summary_text": f"{summarized_text}"
#     }

# def postprocesstext(content):
#     final = ""
#     for sent in sent_tokenize(content):
#         sent = sent.capitalize()
#         final = final + " " + sent
#     return final

# def summarizer(
#     text,
#     model,
#     tokenizer,
#     max_len = 512
# ):
#     text = text.strip().replace("\n", " ")
#     text = "summarize: " + text
#     encoding = tokenizer.encode_plus(
#         text,
#         max_length=max_len,
#         pad_to_max_length=False,
#         return_tensors="pt",
#         truncation=True
#     ).to(device)
#     input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
#     outs = model.generate(
#         input_ids=input_ids,
#         attention_mask=attention_mask,
#         early_stopping=True,
#         num_beams=3,
#         num_return_sequences=1,
#         no_repeat_ngram_size=2,
#         min_length=75,
#         max_length=300
#     )
#     dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
#     summary = dec[0]
#     summary = postprocesstext(summary)
#     summary = summary.strip()
#     return summary

# def get_nouns_multipartite(content):
#     out = []
#     try:
#         extractor = pke.unsupervised.MultipartiteRank()
#         extractor.load_document(input=content, language='en')
#         pos = {'PROPN', 'NOUN'}
#         stoplist = list(string.punctuation)
#         stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
#         stoplist += stopwords.words('english')
#         extractor.candidate_selection(pos=pos)
#         extractor.candidate_weighting(alpha=1.1,
#                                       threshold=0.75,
#                                       method='average')
#         keyphrases = extractor.get_n_best(n=15)
#         for val in keyphrases:
#             out.append(val[0])
#     except:
#         out = []
#         traceback.print_exc()
#     return out

# def get_keywords(
#     originaltext,
#     summarytext,
#     n_questions = 8
# ):
#     keywords = get_nouns_multipartite(originaltext)
#     # print("keywords unsummarized: ", keywords)
#     keyword_processor = KeywordProcessor()
#     for keyword in keywords:
#         keyword_processor.add_keyword(keyword)
#     keywords_found = keyword_processor.extract_keywords(summarytext)
#     keywords_found = list(set(keywords_found))
#     # print("keywords_found in summarized: ", keywords_found)
#     important_keywords = []
#     for keyword in keywords:
#         if keyword in keywords_found:
#             important_keywords.append(keyword)
#     return important_keywords[:n_questions] if len(important_keywords) > n_questions else important_keywords

# def get_question(context, answer, model, tokenizer):
#     text = "context: {} answer: {}".format(context, answer)
#     encoding = tokenizer.encode_plus(text, max_length=384, pad_to_max_length=False, truncation=True, return_tensors="pt").to(device)
#     input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
#     outs = model.generate(
#         input_ids=input_ids,
#         attention_mask=attention_mask,
#         early_stopping=True,
#         num_beams=5,
#         num_return_sequences=1,
#         no_repeat_ngram_size=2,
#         max_length=72
#     )
#     dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
#     Question = dec[0].replace("question:", "")
#     Question = Question.strip()
#     return Question

# def get_distractors_wordnet(syn, word):
#     distractors = []
#     word = word.lower()
#     orig_word = word
#     if len(word.split()) > 0:
#         word = word.replace(" ", "_")
#     hypernym = syn.hypernyms()
#     if len(hypernym) == 0:
#         return distractors
#     for item in hypernym[0].hyponyms():
#         name = item.lemmas()[0].name()
#         if name == orig_word:
#             continue
#         name = name.replace("_", " ")
#         name = " ".join(w.capitalize() for w in name.split())
#         if name is not None and name not in distractors:
#             distractors.append(name)
#     return distractors

# def get_wordsense(sent, word):
#     word = word.lower()
#     if len(word.split()) > 0:
#         word = word.replace(" ", "_")
#     synsets = wn.synsets(word, 'n')
#     if synsets:
#         wup = max_similarity(sent, word, 'wup', pos='n')
#         adapted_lesk_output = adapted_lesk(sent, word, pos='n')
#         lowest_index = min(synsets.index(wup), synsets.index(adapted_lesk_output))
#         return synsets[lowest_index]
#     else:
#         return None

# def get_distractors_conceptnet(word):
#     word = word.lower()
#     original_word = word
#     if len(word.split()) > 0:
#         word = word.replace(" ", "_")
#     distractor_list = []
#     url = "http://api.conceptnet.io/query?node=/c/en/%s/n&rel=/r/PartOf&start=/c/en/%s&limit=20" % (word, word)
#     obj = requests.get(url).json()
#     for edge in obj['edges']:
#         link = edge['end']['term']
#         url2 = "http://api.conceptnet.io/query?node=%s&rel=/r/PartOf&end=%s&limit=20" % (link, link)
#         obj2 = requests.get(url2).json()
#         for edge in obj2['edges']:
#             word2 = edge['start']['label']
#             if word2 not in distractor_list and original_word.lower() not in word2.lower():
#                 distractor_list.append(word2)
#     return distractor_list

# def get_distractors_ensemble(q, a):
#     try:
#         wordsense = get_wordsense(q, a)
#         if wordsense:
#             distractors = get_distractors_wordnet(wordsense, a)
#             if len(distractors) == 0:
#                 distractors = get_distractors_conceptnet(a)
#             if len(distractors) != 0:
#                 distractors = [dis.capitalize() for dis in distractors if dis.lower() not in a.lower()]
#             return distractors
#         else:
#             distractors = get_distractors_conceptnet(a)
#             if len(distractors) != 0:
#                 distractors = [dis.capitalize() for dis in distractors if dis.lower() not in a.lower()]
#             return distractors
#     except:
#         return []

# def qna_generation_pipeline(pdf_path, n_questions):
#     pdf_file = open(pdf_path, 'rb')
#     pdf_reader = PyPDF2.PdfFileReader(pdf_file)
#     num_pages = pdf_reader.getNumPages()
#     whole_text = ''
#     for page in range(num_pages):
#         page_obj = pdf_reader.getPage(page)
#         text = page_obj.extractText()
#         whole_text += f" {text}"
#     pdf_file.close()
#     summarized_text = summarizer(whole_text, summary_model, summary_tokenizer)
#     imp_keywords = get_keywords(whole_text, summarized_text, n_questions=n_questions)
#     data = []
#     for answer in imp_keywords:
#         q_json = {}
#         ques = get_question(summarized_text, answer, question_model, question_tokenizer)
#         distractors = get_distractors_ensemble(ques, answer)
#         answer = answer.capitalize()
#         choices = [answer] + distractors[:3]
#         random.shuffle(choices)
#         q_json["question"] = ques
#         q_json["answer"] = answer
#         q_json["choices"] = choices
#         data.append(q_json)
#     return data

def mid(p1, p2):
    # midpoint of two dlib landmark points
    return int((p1.x + p2.x) / 2), int((p1.y + p2.y) / 2)

def eye_aspect_ratio(eye_landmark, face_roi_landmark):
    left_point = (face_roi_landmark.part(eye_landmark[0]).x, face_roi_landmark.part(eye_landmark[0]).y)
    right_point = (face_roi_landmark.part(eye_landmark[3]).x, face_roi_landmark.part(eye_landmark[3]).y)
    center_top = mid(face_roi_landmark.part(eye_landmark[1]), face_roi_landmark.part(eye_landmark[2]))
    center_bottom = mid(face_roi_landmark.part(eye_landmark[5]), face_roi_landmark.part(eye_landmark[4]))
    hor_line_length = math.hypot(left_point[0] - right_point[0], left_point[1] - right_point[1])
    ver_line_length = math.hypot(center_top[0] - center_bottom[0], center_top[1] - center_bottom[1])
    ratio = hor_line_length / ver_line_length
    return ratio
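
# Note the orientation of this ratio: horizontal eye width divided by vertical
# eye opening, so the value grows as the eye closes. That is why
# drowsiness_detection below treats large values (> 4.0) as an indicator of
# closing eyes, the inverse of the usual eye-aspect-ratio convention.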

def mouth_aspect_ratio(lips_landmark, face_roi_landmark):
    left_point = (face_roi_landmark.part(lips_landmark[0]).x, face_roi_landmark.part(lips_landmark[0]).y)
    right_point = (face_roi_landmark.part(lips_landmark[2]).x, face_roi_landmark.part(lips_landmark[2]).y)
    center_top = (face_roi_landmark.part(lips_landmark[1]).x, face_roi_landmark.part(lips_landmark[1]).y)
    center_bottom = (face_roi_landmark.part(lips_landmark[3]).x, face_roi_landmark.part(lips_landmark[3]).y)
    hor_line_length = math.hypot(left_point[0] - right_point[0], left_point[1] - right_point[1])
    ver_line_length = math.hypot(center_top[0] - center_bottom[0], center_top[1] - center_bottom[1])
    if hor_line_length == 0:  # degenerate detection; avoid division by zero
        return ver_line_length
    ratio = ver_line_length / hor_line_length
    return ratio

def predict_emotion(img_path):
    img = cv.imread(img_path)
    img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    img = cv.resize(img, (48, 48))
    img = PILImage.create(img)
    probs_emotion = learn_emotion.predict(img)[-1]
    emotions = {learn_emotion_labels[i]: float(probs_emotion[i]) for i in range(len(learn_emotion_labels))}
    return emotions
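
# Illustrative usage (the probabilities are hypothetical, not from this commit):
#   predict_emotion('uploads/frame.jpg')
#   # -> {'angry': 0.02, 'fear': 0.03, 'happy': 0.85, ...}
# The keys come from learn_emotion_labels, i.e. the class vocabulary stored in
# weights/emotions_rf.pkl.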

def head_pose_estimation(video_path, is_visualize=False):
    if video_path is not None:
        cap = cv2.VideoCapture(video_path)
    else:
        cap = cv2.VideoCapture(0)  # fall back to the default webcam
    head_pose_state = []
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fps = max(fps, 1)  # guard against sources that report 0 FPS
    counter = 0
    while cap.isOpened():
        _, image = cap.read()
        if image is None:
            break
        # sample roughly one frame per second
        if counter % fps == 0:
            image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
            image.flags.writeable = False
            results = face_mesh.process(image)
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            img_h, img_w, _ = image.shape
            face_3d = []
            face_2d = []
            if results.multi_face_landmarks:
                for face_landmarks in results.multi_face_landmarks:
                    for idx, lm in enumerate(face_landmarks.landmark):
                        if idx == 33 or idx == 263 or idx == 1 or idx == 61 or idx == 291 or idx == 199:
                            if idx == 1:
                                nose_2d = (lm.x * img_w, lm.y * img_h)
                                nose_3d = (lm.x * img_w, lm.y * img_h, lm.z * 8000)
                            x, y = int(lm.x * img_w), int(lm.y * img_h)
                            face_2d.append([x, y])
                            face_3d.append([x, y, lm.z])
                    face_2d = np.array(face_2d, dtype=np.float64)
                    face_3d = np.array(face_3d, dtype=np.float64)
                    # approximate camera intrinsics from the image size
                    focal_length = 1 * img_w
                    cam_matrix = np.array([[focal_length, 0, img_h / 2],
                                           [0, focal_length, img_w / 2],
                                           [0, 0, 1]])
                    dist_matrix = np.zeros((4, 1), dtype=np.float64)
                    success, rot_vec, trans_vec = cv2.solvePnP(face_3d, face_2d, cam_matrix, dist_matrix)
                    rmat, jac = cv2.Rodrigues(rot_vec)
                    angles, mtxR, mtxQ, Qx, Qy, Qz = cv2.RQDecomp3x3(rmat)
                    x = angles[0] * 360
                    y = angles[1] * 360
                    if y < -10:
                        text = "Looking Left"
                    elif y > 10:
                        text = "Looking Right"
                    elif x < -10:
                        text = "Looking Down"
                    elif x > 10:
                        text = "Looking Up"
                    else:
                        text = "Looking Forward"
                    head_pose_state.append(head_pose_dict[text])
                    if is_visualize:
                        nose_3d_projection, _ = cv2.projectPoints(nose_3d, rot_vec, trans_vec, cam_matrix, dist_matrix)
                        p1 = (int(nose_2d[0]), int(nose_2d[1]))
                        p2 = (int(nose_3d_projection[0][0][0]), int(nose_3d_projection[0][0][1]))
                        cv2.line(image, p1, p2, (255, 0, 0), 2)
                        cv2.putText(image, text, (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            if is_visualize:
                cv2.imshow('Head Pose Estimation', image)
                if cv2.waitKey(30) & 0xFF == 27:  # Esc quits the preview
                    break
        counter += 1
    cap.release()
    return head_pose_state
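
# head_pose_estimation solves a Perspective-n-Point problem: six MediaPipe
# landmarks (indices 1, 33, 61, 199, 263, 291, commonly identified as the nose
# tip, eye corners, mouth corners and chin) are matched against their image
# projections, the rotation from cv2.solvePnP is converted to Euler angles via
# cv2.Rodrigues and cv2.RQDecomp3x3, and the scaled pitch/yaw values are
# bucketed into the five head_pose_dict classes using +/-10 thresholds.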

def drowsiness_detection(video_path, font=cv2.FONT_HERSHEY_TRIPLEX):
    # if video_path is not None:
    #     cap = cv2.VideoCapture(video_path)
    # else:
    #     cap = cv2.VideoCapture(0)
    cap = cv2.VideoCapture(video_path)
    count = 0
    drowsiness_state = []
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fps = 1  # override: score every frame instead of one per second
    counter = 0
    while cap.isOpened():
        _, img = cap.read()
        if img is None:
            break
        if counter % fps == 0:
            img = cv2.flip(img, 1)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            faces = face_detector(gray)
            for face_roi in faces:
                landmark_list = shape_predictor(gray, face_roi)
                # dlib 68-point indices: 36-41 left eye, 42-47 right eye
                left_eye_ratio = eye_aspect_ratio([36, 37, 38, 39, 40, 41], landmark_list)
                right_eye_ratio = eye_aspect_ratio([42, 43, 44, 45, 46, 47], landmark_list)
                eye_open_ratio = (left_eye_ratio + right_eye_ratio) / 2
                inner_lip_ratio = mouth_aspect_ratio([60, 62, 64, 66], landmark_list)
                outter_lip_ratio = mouth_aspect_ratio([48, 51, 54, 57], landmark_list)
                mouth_open_ratio = (inner_lip_ratio + outter_lip_ratio) / 2
                if (mouth_open_ratio > 0.380 and eye_open_ratio > 4.0) or eye_open_ratio > 4.30:
                    count += 1
                else:
                    count = 0
                if count > 10:
                    drowsiness_state.append(drowsiness_dict["Sleepy"])
                else:
                    drowsiness_state.append(drowsiness_dict["Not Sleepy"])
        counter += 1
    cap.release()
    # n_sl = np.random.randint(75, 90) / 100
    # sl = 1 - n_sl
    # drowsiness_state = {
    #     "Not Sleepy": n_sl,
    #     "Sleepy": sl
    # }
    return drowsiness_state
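
# The 0.380 / 4.0 / 4.30 cut-offs and the count > 10 debounce appear to be
# empirically tuned: a frame is only labelled "Sleepy" once more than ten
# consecutive sampled frames show a yawning mouth together with closing eyes,
# or strongly closed eyes on their own.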

def emotion_detection(video_path, is_visualize=False):
    if video_path is not None:
        cap = cv2.VideoCapture(video_path)
    else:
        cap = cv2.VideoCapture(0)
    emotion_state = []
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fps = 1  # override: classify every frame
    counter = 0
    while cap.isOpened():
        _, img = cap.read()
        if img is None:
            break
        if counter % fps == 0:
            img = cv2.flip(img, 1)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            faces = face_detector(gray)
            for face_roi in faces:
                x, y = face_roi.left(), face_roi.top()
                x1, y1 = face_roi.right(), face_roi.bottom()
                cv2.rectangle(img, (x, y), (x1, y1), (0, 255, 0), 2)
                roi_gray = gray[y:y1, x:x1]
                roi_color = img[y:y1, x:x1]
                # the emotion model expects 48x48 grayscale crops
                roi_gray = cv2.resize(roi_gray, (48, 48))
                roi_gray = PILImage.create(roi_gray)
                probs_emotion = learn_emotion.predict(roi_gray)[-1]
                emotions = {learn_emotion_labels[i]: float(probs_emotion[i]) for i in range(len(learn_emotion_labels))}
                emotion = max(emotions, key=emotions.get)
                if is_visualize:
                    cv2.putText(img, emotion, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))
                emotion_state.append(emotion_dict[emotion])
            if is_visualize:
                cv2.imshow("Emotion Detection", img)
                if cv2.waitKey(1) & 0xFF == 27:
                    break
        counter += 1
    cap.release()
    if is_visualize:
        cv2.destroyAllWindows()
    return emotion_state

def inference_attention_analyzer(video_path):
    head_pose_state = head_pose_estimation(video_path)
    print("head_pose_state", head_pose_state)
    drowsiness_state = drowsiness_detection(video_path)
    print("drowsiness_state", drowsiness_state)
    emotion_state = emotion_detection(video_path)
    print("emotion_state", emotion_state)
    # minlength keeps each distribution aligned with its label dict even when
    # the highest-numbered class never occurs
    head_pose_distribution = np.bincount(head_pose_state, minlength=len(head_pose_dict))
    drowsiness_distribution = np.bincount(drowsiness_state, minlength=len(drowsiness_dict))
    emotion_distribution = np.bincount(emotion_state, minlength=len(emotion_dict))
    head_pose_percentage = head_pose_distribution / len(head_pose_state) * 100
    drowsiness_percentage = drowsiness_distribution / len(drowsiness_state) * 100
    emotion_percentage = emotion_distribution / len(emotion_state) * 100
    head_pose_response = {key: f"{round(value, 2)}%" for key, value in zip(head_pose_dict.keys(), head_pose_percentage)}
    drowsiness_response = {key: f"{round(value, 2)}%" for key, value in zip(drowsiness_dict.keys(), drowsiness_percentage)}
    emotion_response = {key: f"{round(value, 2)}%" for key, value in zip(emotion_dict.keys(), emotion_percentage)}
    return {
        "head_pose": head_pose_response,
        "drowsiness": drowsiness_response,
        "emotion": emotion_response
    }

video_path = 'videos/111.mp4'
# print(inference_attention_analyzer(video_path))
print(drowsiness_detection(video_path))
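
# The Flask app, CORS handler and upload folder are configured above, but this
# commit registers no routes. A minimal sketch of how the analyzer might be
# exposed over HTTP; the endpoint and form-field names below are illustrative
# assumptions, not part of this commit:
#
# @app.route('/analyze', methods=['POST'])
# def analyze():
#     video = request.files['video']            # uploaded video file
#     save_path = os.path.join(app.config['UPLOAD_FOLDER'], video.filename)
#     video.save(save_path)                     # persist before reading with OpenCV
#     return jsonify(inference_attention_analyzer(save_path))
#
# if __name__ == '__main__':
#     app.run(host='0.0.0.0', port=5000)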