Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
2
2022-073
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
2022-073
2022-073
Commits
a91069b6
Commit
a91069b6
authored
Oct 11, 2022
by
AdithyaKahawanugoda
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
kaldi configs added to git ignore, main py modified
parent
cab3cd1b
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
271 additions
and
158 deletions
+271
-158
web-app/backend/app.py
web-app/backend/app.py
+271
-158
No files found.
web-app/backend/app.py
View file @
a91069b6
...
...
@@ -14,6 +14,7 @@ import numpy as np
from
trainAnn
import
headPose
,
headDistence
,
facialExperssions
,
blinkCount
,
detector
from
keras.models
import
load_model
from
sklearn.preprocessing
import
OneHotEncoder
from
kaldiApp
import
upload_file
# Attention classes
attClass
=
{
0
:
'Low Attention'
,
1
:
'Mid Attention'
,
2
:
'High Attention'
}
...
...
@@ -29,109 +30,6 @@ app = Flask(__name__)
app
.
config
[
'UPLOAD_FOLDER'
]
=
UPLOAD_FOLDER
CORS
(
app
)
def
captureImage
(
link
,
frameSkip
):
global
lastAttention
model_new
=
load_model
(
"attScore.h5"
)
encoder2
=
OneHotEncoder
()
encoder2
.
fit_transform
(
np
.
array
([
0
,
1
,
2
])
.
reshape
(
-
1
,
1
))
allFrames
=
0
finalList
=
[
0
,
0
,
0
]
cap
=
cv
.
VideoCapture
(
link
)
while
cap
.
isOpened
():
success
,
image
=
cap
.
read
()
if
success
:
if
allFrames
%
frameSkip
==
0
:
headPoseImg
,
landMarks
,
state1
=
headPose
(
image
.
copy
())
headDisImg
,
headDis
,
state2
=
headDistence
(
image
.
copy
(),
detector
)
expImg
,
expression
,
state3
=
facialExperssions
(
image
.
copy
())
blinkCountImg
,
blinkTime
=
blinkCount
(
image
.
copy
(),
detector
)
if
state1
and
state2
and
state3
:
arr
=
[
list
(
map
(
float
,
landMarks
))
+
[
float
(
headDis
)]
+
expression
.
tolist
()[
0
]]
preClass
=
np
.
zeros
([
3
],
dtype
=
int
)
pre
=
list
(
model_new
.
predict
(
arr
)[
0
])
preClass
[
pre
.
index
(
max
(
pre
))]
=
1
final
=
encoder2
.
inverse_transform
([
preClass
])
finalList
[
final
[
0
][
0
]]
+=
1
headPoseImg
=
cv
.
resize
(
headPoseImg
,
(
640
,
380
))
headDisImg
=
cv
.
resize
(
headDisImg
,
(
640
,
380
))
blinkCountImg
=
cv
.
resize
(
blinkCountImg
,
(
640
,
380
))
expImg
=
cv
.
resize
(
expImg
,
(
640
,
380
))
image
=
cv
.
resize
(
image
,
(
640
,
380
))
# imgPlot = plotY.update(lastAttention, color=(255, 0, 0))
# hor = np.hstack((headPoseImg, headDisImg))
# hor2 = np.hstack((blinkCountImg, expImg))
# ver = np.vstack((hor, hor2))
# finaliImage = cv.putText(headPoseImg," ".join(list(map(lambda x:str(round(x,2)),finalList)))+" "+attClass[final[0][0]], (20,80), cv.FONT_HERSHEY_SIMPLEX, 0.7,(0,255,0), 2, cv.LINE_AA)
# cv.imshow("ImageStack", finaliImage)
else
:
break
if
cv
.
waitKey
(
5
)
&
0xFF
==
27
:
break
allFrames
+=
1
# if allFrames==1000:
# break
cap
.
release
()
return
list
(
map
(
lambda
x
:
round
(
x
/
sum
(
finalList
),
4
),
finalList
))
# create our model
IMG_SHAPE
=
(
224
,
224
,
3
)
def
get_model
():
letterClassificationmodel
=
load_model
(
'letterClassification.h5'
,
compile
=
False
)
return
letterClassificationmodel
# decode the imaeg coming from the request
def
decode_request
(
req
):
encoded
=
req
[
"image"
]
decoded
=
base64
.
b64decode
(
encoded
)
# print(decoded)
return
decoded
# preprocess image before sending it to the model
def
preprocess
(
decoded
):
# resize and convert in RGB in case image is in RGBA
pil_image
=
Image
.
open
(
io
.
BytesIO
(
decoded
))
.
resize
(
(
224
,
224
),
Image
.
LANCZOS
)
.
convert
(
"RGB"
)
image
=
np
.
asarray
(
pil_image
)
batch
=
np
.
expand_dims
(
image
,
axis
=
0
)
return
batch
# load model so it's in memory and not loaded each time there is a request
model
=
get_model
()
# function predict is called at each request
# for prediction
@
app
.
route
(
'/upload'
,
methods
=
[
'POST'
])
def
predictAttention
():
print
(
request
)
...
...
@@ -139,31 +37,172 @@ def predictAttention():
if
not
os
.
path
.
isdir
(
target
):
os
.
mkdir
(
target
)
file
=
request
.
files
[
'file'
]
testNo
=
request
.
form
.
get
(
'questionIndex'
)
filename
=
secure_filename
(
"Testing_vid"
)
destination
=
"/"
.
join
([
target
,
filename
])
file
.
save
(
destination
)
session
[
'uploadFilePath'
]
=
destination
scoreLst
=
captureImage
(
destination
,
10
)
db
.
db
[
'attentionlevelScore'
]
.
insert_one
({
"activityName"
:
"Manual_Attention_Test"
,
"questionIndex"
:
testNo
,
"attentionLevelScore"
:
scoreLst
,
})
print
(
"Prediction"
,
scoreLst
)
return
scoreLst
@
app
.
route
(
'/parents/'
,
methods
=
[
'POST'
,
'GET'
])
@
cross_origin
()
def
data2
():
# POST a data to database
if
request
.
method
==
'POST'
:
body
=
request
.
json
parentName
=
body
[
'parentName'
]
parentSignature
=
body
[
'parentSignature'
]
childCandidate
=
body
[
'childCandidate'
]
# db.users.insert_one({
db
.
db
[
'parents'
]
.
insert_one
({
"parentName"
:
parentName
,
"parentSignature"
:
parentSignature
,
"childCandidate"
:
childCandidate
,
})
return
jsonify
({
'status'
:
'Parents details are added to the system successfully!'
,
})
@
app
.
route
(
'/users/'
,
methods
=
[
'POST'
,
'GET'
])
@
cross_origin
()
def
data
():
# POST a data to database
if
request
.
method
==
'POST'
:
body
=
request
.
json
firstName
=
body
[
'firstName'
]
lastName
=
body
[
'lastName'
]
nationality
=
body
[
'nationality'
]
school
=
body
[
'school'
]
age
=
body
[
'age'
]
residence
=
body
[
'residence'
]
disorders
=
body
[
'disorders'
]
# db.users.insert_one({
db
.
db
[
'users'
]
.
insert_one
({
"firstName"
:
firstName
,
"lastName"
:
lastName
,
"nationality"
:
nationality
,
"school"
:
school
,
"age"
:
age
,
"residence"
:
residence
,
"disorders"
:
disorders
,
})
return
jsonify
({
'status'
:
'Candidate details are added to the system successfully!'
,
})
# GET all data from database
if
request
.
method
==
'GET'
:
allData
=
db
.
db
[
'users'
]
.
find
()
dataJson
=
[]
for
data
in
allData
:
id
=
data
[
'_id'
]
firstName
=
data
[
'firstName'
]
lastName
=
data
[
'lastName'
]
nationality
=
data
[
'nationality'
]
school
=
data
[
'school'
]
age
=
data
[
'age'
]
residence
=
data
[
'residence'
]
disorders
=
data
[
'disorders'
]
dataDict
=
{
'id'
:
str
(
id
),
'firstName'
:
firstName
,
'lastName'
:
lastName
,
'nationality'
:
nationality
,
'school'
:
school
,
'age'
:
age
,
'residence'
:
residence
,
'disorders'
:
disorders
}
dataJson
.
append
(
dataDict
)
return
jsonify
(
dataJson
)
@
app
.
route
(
'/users/<string:id>'
,
methods
=
[
'GET'
,
'DELETE'
,
'PUT'
])
def
onedata
(
id
):
# GET a specific data by id
if
request
.
method
==
'GET'
:
data
=
db
.
db
[
'users'
]
.
find_one
({
'_id'
:
ObjectId
(
id
)})
id
=
data
[
'_id'
]
firstName
=
data
[
'firstName'
]
lastName
=
data
[
'lastName'
]
nationality
=
data
[
'nationality'
]
school
=
data
[
'school'
]
age
=
data
[
'age'
]
residence
=
data
[
'residence'
]
disorders
=
data
[
'disorders'
]
dataDict
=
{
'id'
:
str
(
id
),
'firstName'
:
firstName
,
'lastName'
:
lastName
,
'nationality'
:
nationality
,
'school'
:
school
,
'age'
:
age
,
'residence'
:
residence
,
'disorders'
:
disorders
}
print
(
dataDict
)
return
jsonify
(
dataDict
)
# DELETE a data
if
request
.
method
==
'DELETE'
:
db
.
db
[
'users'
]
.
delete_many
({
'_id'
:
ObjectId
(
id
)})
print
(
'
\n
# Deletion successful #
\n
'
)
return
jsonify
({
'status'
:
'Data id: '
+
id
+
' is deleted!'
})
# UPDATE a data by id
if
request
.
method
==
'PUT'
:
body
=
request
.
json
firstName
=
body
[
'firstName'
]
lastName
=
body
[
'lastName'
]
nationality
=
body
[
'nationality'
]
school
=
body
[
'school'
]
age
=
body
[
'age'
]
residence
=
body
[
'residence'
]
disorders
=
body
[
'disorders'
]
db
.
db
[
'users'
]
.
update_one
(
{
'_id'
:
ObjectId
(
id
)},
{
"$set"
:
{
"firstName"
:
firstName
,
"lastName"
:
lastName
,
"nationality"
:
nationality
,
"school"
:
school
,
"age"
:
age
,
"residence"
:
residence
,
"disorders"
:
disorders
,
}
}
)
print
(
'
\n
# Update successful #
\n
'
)
return
jsonify
({
'status'
:
'Data id: '
+
id
+
' is updated!'
})
# Mental Cronometry API
@
app
.
route
(
"/predictletters"
,
methods
=
[
"POST"
])
def
predict
():
# get the data from the request and put ir under the right format
letterClassificationmodel
=
load_model
(
'letterClassification.h5'
,
compile
=
False
)
req
=
request
.
get_json
(
force
=
True
)
# print(req)
encoded
=
req
[
"image"
]
decoded
=
base64
.
b64decode
(
encoded
)
print
(
decoded
)
with
open
(
"imgt.png"
,
"wb"
)
as
fh
:
fh
.
write
(
base64
.
decodebytes
(
decoded
))
# print(decoded)
pil_image
=
Image
.
open
(
io
.
BytesIO
(
decoded
))
.
resize
(
(
224
,
224
),
Image
.
LANCZOS
)
.
convert
(
"RGB"
)
...
...
@@ -171,7 +210,6 @@ def predict():
image
.
save
(
"imgtest.png"
)
data
=
np
.
ndarray
(
shape
=
(
1
,
224
,
224
,
3
),
dtype
=
np
.
float32
)
# image = decoded # Image.open('img.png')
size
=
(
224
,
224
)
image_array
=
np
.
asarray
(
image
)
...
...
@@ -182,39 +220,27 @@ def predict():
word_dict
=
{
0
:
'B'
,
1
:
'D'
,
2
:
'M'
,
3
:
'P'
,
4
:
'R'
,
5
:
'S'
,
6
:
'U'
,
7
:
'Y'
}
prediction
=
word_dict
[
np
.
argmax
(
model
.
predict
(
data
))]
prediction
=
word_dict
[
np
.
argmax
(
letterClassification
model
.
predict
(
data
))]
prediction2
=
model
.
predict
(
data
)
# print(prediction)
# response = jsonify({"prediction": prediction})
# response.headers.add('Access-Control-Allow-Origin', '*')
# return response
prediction2
=
letterClassificationmodel
.
predict
(
data
)
response
=
{
"prediction"
:
prediction
}
print
(
prediction2
)
return
jsonify
(
response
)
# https://medium.com/analytics-vidhya/deploy-your-model-using-a-flask-web-service-461ccaef9ea0
@
app
.
route
(
"/predicthanddrwnshapes"
,
methods
=
[
"POST"
])
@
app
.
route
(
"/predicthanddrwnshapes"
,
methods
=
[
"POST"
])
def
predictHandDrawnShapes
():
handDrawnShapeClassificationmodel
=
load_model
(
'shapeClassification2.h5'
,
compile
=
False
)
# get the data from the request and put ir under the right format
req
=
request
.
get_json
(
force
=
True
)
# print(req)
encoded
=
req
[
"image"
]
decoded
=
base64
.
b64decode
(
encoded
)
print
(
decoded
)
with
open
(
"imgt.png"
,
"wb"
)
as
fh
:
fh
.
write
(
base64
.
decodebytes
(
decoded
))
# print(decoded)
pil_image
=
Image
.
open
(
io
.
BytesIO
(
decoded
))
.
resize
(
(
224
,
224
),
Image
.
LANCZOS
)
.
convert
(
"RGB"
)
...
...
@@ -222,7 +248,6 @@ def predictHandDrawnShapes():
image
.
save
(
"imgtest.png"
)
data
=
np
.
ndarray
(
shape
=
(
1
,
224
,
224
,
3
),
dtype
=
np
.
float32
)
# image = decoded # Image.open('img.png')
size
=
(
224
,
224
)
image_array
=
np
.
asarray
(
image
)
...
...
@@ -235,35 +260,24 @@ def predictHandDrawnShapes():
prediction
=
word_dict
[
np
.
argmax
(
handDrawnShapeClassificationmodel
.
predict
(
data
))]
prediction2
=
model
.
predict
(
data
)
# print(prediction)
# response = jsonify({"prediction": prediction})
# response.headers.add('Access-Control-Allow-Origin', '*')
# return response
prediction2
=
handDrawnShapeClassificationmodel
.
predict
(
data
)
response
=
{
"prediction"
:
prediction
}
print
(
prediction2
)
return
jsonify
(
response
)
@
app
.
route
(
"/predictShapePattern"
,
methods
=
[
"POST"
])
@
app
.
route
(
"/predictShapePattern"
,
methods
=
[
"POST"
])
def
predictShapePattern
():
handDrawnShapeClassificationm
odel
=
load_model
(
shapePatternM
odel
=
load_model
(
'shapeClassification1.h5'
,
compile
=
False
)
# get the data from the request and put ir under the right format
req
=
request
.
get_json
(
force
=
True
)
# print(req)
encoded
=
req
[
"image"
]
decoded
=
base64
.
b64decode
(
encoded
)
print
(
decoded
)
with
open
(
"imgt.png"
,
"wb"
)
as
fh
:
fh
.
write
(
base64
.
decodebytes
(
decoded
))
# print(decoded)
pil_image
=
Image
.
open
(
io
.
BytesIO
(
decoded
))
.
resize
(
(
224
,
224
),
Image
.
LANCZOS
)
.
convert
(
"RGB"
)
...
...
@@ -291,9 +305,8 @@ def predictShapePattern():
cv
.
imwrite
(
'right.jpg'
,
right_part
)
cv
.
imwrite
(
'left.jpg'
,
left_part
)
predArray
=
[]
# image = decoded # Image.open('img.png')
imgArray
=
[
"left.jpg"
,
"right.jpg"
]
for
img
in
imgArray
:
data
=
np
.
ndarray
(
shape
=
(
1
,
224
,
224
,
3
),
dtype
=
np
.
float32
)
...
...
@@ -304,26 +317,126 @@ def predictShapePattern():
normalized_image_array
=
(
image_array
.
astype
(
np
.
float32
)
/
127.0
)
-
1
data
[
0
]
=
normalized_image_array
word_dict
=
{
0
:
'circle'
,
1
:
'
triangle'
,
2
:
'star'
,
3
:
'squar
e'
}
word_dict
=
{
0
:
'circle'
,
1
:
'
square'
,
2
:
'star'
,
3
:
'triangl
e'
}
prediction
=
word_dict
[
np
.
argmax
(
handDrawnShapeClassificationmodel
.
predict
(
data
))]
prediction2
=
model
.
predict
(
data
)
response
=
{
"prediction"
:
prediction
}
shapePatternModel
.
predict
(
data
))]
predArray
.
append
(
prediction
)
prediction2
=
shapePatternModel
.
predict
(
data
)
print
(
prediction
)
# print(prediction)
# response = jsonify({"prediction": prediction})
# response.headers.add('Access-Control-Allow-Origin', '*')
# return response
# response = {"prediction": prediction}
response
=
{
"prediction"
:
predArray
}
# print(prediction2)
return
jsonify
(
response
)
# Reasoning IQ evaluation
@
app
.
route
(
'/predictArithmetic'
,
methods
=
[
'POST'
,
'GET'
])
@
cross_origin
()
def
predictArithmetic
():
if
request
.
method
==
'POST'
:
file
=
request
.
files
.
get
(
'file'
)
questionIndex
=
request
.
form
.
get
(
'questionIndex'
)
candidateID
=
request
.
form
.
get
(
'candidateID'
)
print
(
questionIndex
)
if
file
is
None
or
file
.
filename
==
""
:
return
jsonify
({
'error: no file'
})
try
:
file
.
save
(
"./"
+
file
.
filename
)
prediction
=
upload_file
(
file
)
data
=
{
'prediction'
:
prediction
}
db
.
db
[
'reasoningIQScore'
]
.
insert_one
({
"activityName"
:
"Arithmetic"
,
"candidateID"
:
candidateID
,
"questionIndex"
:
questionIndex
,
"transcription"
:
prediction
,
})
return
jsonify
(
data
)
except
:
return
jsonify
({
'error: Error during pipeline execution'
})
return
jsonify
({
'result: test'
})
@
app
.
route
(
'/predictPictureConcept'
,
methods
=
[
'POST'
,
'GET'
])
@
cross_origin
()
def
predictArithmetic
():
if
request
.
method
==
'POST'
:
file
=
request
.
files
.
get
(
'file'
)
questionIndex
=
request
.
form
.
get
(
'questionIndex'
)
candidateID
=
request
.
form
.
get
(
'candidateID'
)
print
(
questionIndex
)
if
file
is
None
or
file
.
filename
==
""
:
return
jsonify
({
'error: no file'
})
try
:
file
.
save
(
"./"
+
file
.
filename
)
prediction
=
upload_file
(
file
)
data
=
{
'prediction'
:
prediction
}
db
.
db
[
'reasoningIQScore'
]
.
insert_one
({
"activityName"
:
"Arithmetic"
,
"candidateID"
:
candidateID
,
"questionIndex"
:
questionIndex
,
"transcription"
:
prediction
,
})
return
jsonify
(
data
)
except
:
return
jsonify
({
'error: Error during pipeline execution'
})
return
jsonify
({
'result: test'
})
@
app
.
route
(
'/mentalChromScores'
,
methods
=
[
'POST'
])
@
cross_origin
()
def
mentalChromScores
():
body
=
request
.
json
activityName
=
body
[
'activityName'
]
score
=
body
[
'score'
]
actualResult
=
body
[
'actualResult'
]
predResult
=
body
[
'predResult'
]
createdTime
=
body
[
'createdTime'
]
totalTime
=
body
[
'totalTime'
]
# db.users.insert_one({
db
.
db
[
'mentalChronomrtryScore'
]
.
insert_one
({
"activityName"
:
activityName
,
"score"
:
score
,
"actualResult"
:
actualResult
,
"predResult"
:
predResult
,
"createdTime"
:
createdTime
,
"totalTime"
:
totalTime
,
})
return
jsonify
({
'status'
:
'MentalCrom scores are added to the system successfully!'
,
})
# Knowledge IQ evaluation
@
app
.
route
(
'/predictKnowledgeIq'
,
methods
=
[
'POST'
,
'GET'
])
@
cross_origin
()
def
predictKnowledgeIq
():
if
request
.
method
==
'POST'
:
file
=
request
.
files
.
get
(
'file'
)
questionIndex
=
request
.
form
.
get
(
'questionIndex'
)
print
(
questionIndex
)
if
file
is
None
or
file
.
filename
==
""
:
return
jsonify
({
'error: no file'
})
try
:
file
.
save
(
"./"
+
file
.
filename
)
prediction
=
transform_audio
(
file
.
filename
)
data
=
{
'prediction'
:
prediction
}
db
.
db
[
'knowledgeIQScore'
]
.
insert_one
({
"activityName"
:
"Colour Numbers"
,
"questionIndex"
:
questionIndex
,
"transcription"
:
prediction
,
})
return
jsonify
(
data
)
except
:
return
jsonify
({
'error: Error during pipeline execution'
})
return
jsonify
({
'result: test'
})
# @app.route("/testDB")
# def test():
# db.db.collection.insert_one({"name": "John"})
# return "Connected to the data base!"
# Running app
if
__name__
==
"__main__"
:
app
.
secret_key
=
os
.
urandom
(
24
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment