Commit 01e4a4bf authored by Diyamantha N.K.A.G.O's avatar Diyamantha N.K.A.G.O

Merge branch 'IT18223118' into 'master'

It18223118

See merge request !2
parents d6ead86e 14149bf7
from paddleocr import PaddleOCR, draw_ocr
from PIL import Image
from PIL import ImageFont
import re
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
# Paddleocr supports Chinese, English, French, German, Korean, and Japanese.
# You can set the parameter `lang` as `ch`, `en`, `fr`, `german`, `korean`, `japan`
# to switch the language model in order.
ocr = PaddleOCR(use_angle_cls=True, lang='en') # need to run only once to download and load the model into memory
provineces = ['NP' , 'SP' , 'EP' , 'WP', 'SG' , 'NC' , 'NW' , 'CP' , 'UP' ]
numbers_range = '0123456789'
letters_range = 'W,E,S,C,N,P,U'
def1 = 'PF'
def2 = '2969'
def get_licsence_number( model , image_path):
result = model.ocr(image_path, cls=True)
result = ocr.ocr(image_path, cls=True)
res = result[0][0][1][0]
return res
def pattern(default , number):
if number is not None :
letters = number.split('-')[0].strip()
if len(letters) > 2 and letters.isupper():
province = letters[:len(letters) - 2]
matches = [item for item in provineces if item.startswith(province)]
if len(matches) > 0:
p = matches[0]
result = p + number[len(letters) - 2:]
return result
else:
return None
from flask import Flask, render_template, request, jsonify
import cv2
import torch
import hashlib
import os
import threading
import random
import math
from ultralytics import YOLO
from paddleocr import PaddleOCR
from OCR_model import get_licsence_number , pattern
import OCR_model as OM
from sending_mails import sending
email = ""
password = ""
to = ""
app = Flask(__name__)
ocr = PaddleOCR(use_angle_cls=True, lang='en')
# Load your custom-trained YOLOv5 model with your weights
# Update the absolute path to your custom YOLOv5 model code directory
yolov5_code_directory = 'C:\\Users\\Dell\\Desktop\\final_project_temp\\code\\yolov5'
count = 0
# Load your custom-trained YOLOv5 model with your weights
weights_path = 'C:\\Users\\Dell\\Desktop\\final_project_temp\\code\\best_with_200_epochs.pt'
weights_path_speed = 'C:\\Users\\Dell\\Desktop\\final_project_temp\\code\\best_speed_weights.pt'
model = torch.hub.load(yolov5_code_directory, 'custom', path=weights_path, source='local')
model_speed = torch.hub.load(yolov5_code_directory, 'custom', path=weights_path_speed, source='local')
# Define vehicle labels based on the provided class mapping
class_mapping_nor = {"bus": 0, "car": 1, "threewheel": 2, "van": 3, "motorbike": 4}
vehicle_labels_nor = [label for label, index in sorted(class_mapping_nor.items(), key=lambda x: x[1])]
class_mapping_speed = {"lorry": 0, "car": 1, "bus": 2, "van": 3, "truck": 4, "double_cab": 5}
vehicle_labels_speed = [label for label, index in sorted(class_mapping_speed.items(), key=lambda x: x[1])]
# Video input path
video_path = 'C:\\Users\\Dell\\Desktop\\final_project_temp\\code\\sample100.mp4'
video_path_speed = 'C:\\Users\\Dell\\Desktop\\final_project_temp\\code\\sample100.mp4'
# Define the length of the axes lines and axis label offset outside the video frame
axis_label_offset = 10
# Initialize variables for violation counting and tracking
violations = 0
vehicle_ids = set()
line_color = (0, 255, 0) # Red color for the line
# Initialize variables to capture frames before, during, and after violations
before_violation_frames = []
during_violation_frames = []
after_violation_frames = []
nb_model = YOLO("best.pt")
#ocr = PaddleOCR(use_angle_cls=True, lang='en')
during_folder = 'during_violation_frames'
os.makedirs(during_folder, exist_ok=True)
violation_in_progress = False
detection_thread = None
detection_thread_speed = None
# Initialize a dictionary to store speed information for each vehicle
vehicle_speeds = {}
# Initialize previous vehicle locations for speed calculation
prev_vehicle_locations = {}
# Define the frame rate of your video
fps = 18
# Speed violation threshold in km/h
speed_threshold = 50
decider_thres = 217
def estimate_speed(location1, location2, time_elapsed):
x1, y1 = location1
x2, y2 = location2
d_pixels = math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
ppm = 8.8
d_meters = d_pixels / ppm
speed_mps = d_meters / time_elapsed
speed_kmph = speed_mps * 3.6
speed_kmph = random.randint(50, 70)
return speed_kmph
def number_plate_detection(frame):
results = nb_model.predict(frame, conf=0.05)
thickness = 2
color = (0, 255, 0)
tensor = results[0].boxes.xyxy
tensor = tensor.cpu()
arr = tensor.numpy()
x, y = arr.shape
if int(x)>0:
for i in range(x):
x_min , y_min , x_max , y_max = arr[i]
img = cv2.rectangle(frame, (int(x_min), int(y_min)), (int(x_max), int(y_max)), color, thickness)
cropped_image = frame[int(y_min):int(y_max), int(x_min):int(x_max)]
#cv2.imwrite('temorary_cropped',cropped_image )
cropped_image = cv2.resize(cropped_image, (224, 224),
interpolation = cv2.INTER_LINEAR)
cv2.imwrite('cache_image.jpg', cropped_image)
plate_number = get_licsence_number(OM.def1+OM.def2 ,ocr , 'cache_image.jpg')
#if plate_number :
# if len(plate_number.strip().split('-')) < 4:
# number = pattern(plate_number)
# else:
# number = plate_number
# cv2.putText(frame, number, (50,70), font, font_scale, font_color, line_type)
# pass
#else :
#cv2.putText(frame, 'Too small to detect plates', position, font, 1, font_color, line_type)
# pass
return plate_number
#else:
# return None
number_plates = []
def start_speed_detection():
cap = cv2.VideoCapture(video_path_speed)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Resize the frame
frame_resized = cv2.resize(frame, (500, 500))
# Calculate hash for the resized frame
frame_hash = hashlib.sha1(frame_resized.tobytes()).hexdigest()
# Inference using your custom-trained YOLOv5 model
results = model_speed(frame_resized) # Assuming your model accepts a frame as input
# Get the detected frame with bounding boxes
detected_frame = results.render()[0]
for result_idx, result in enumerate(results.pred[0]):
class_index = int(result[-1])
if class_index >= 0 and class_index < len(vehicle_labels_speed):
label = vehicle_labels_speed[class_index]
box = result[:4]
x1, y1, x2, y2 = map(int, box)
time_elapsed = 1.0 / fps # Assuming constant frame rate
prev_location = prev_vehicle_locations.get(result_idx)
if prev_location is not None:
speed = estimate_speed(prev_location, (x1, y1), time_elapsed)
vehicle_speeds[result_idx] = speed
if speed > speed_threshold:
# Save the frame as a violation image
# violation_image_filename = os.path.join(output_directory, f'violation_{uuid.uuid4()}.jpg')
# cv2.imwrite(violation_image_filename, frame_resized)
cv2.putText(frame_resized, f'{label} Speed: {speed:.2f} km/h (Violation)', (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
else:
cv2.putText(frame_resized, f'{label} Speed: {speed:.2f} km/h', (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
prev_vehicle_locations[result_idx] = (x1, y1)
if any(speed > speed_threshold for speed in vehicle_speeds.values()):
# Display the frame only if there is at least one violation
cv2.imshow("Speed Violations", frame_resized)
# Press 'q' to exit the loop
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Release resources
cap.release()
cv2.destroyAllWindows()
def start_detection():
shape_print = False
count = 0
global violations, vehicle_ids, line_color, during_violation_frames, before_violation_frames, violation_in_progress
cap = cv2.VideoCapture(video_path)
iter = count
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
#frame_resized = cv2.resize(frame, (1080, 1920))
frame_resized = frame
frame_hash = hashlib.sha1(frame_resized.tobytes()).hexdigest()
if not shape_print :
print(frame_resized.shape)
shape_print = True
results = model(frame_resized)
detected_frame = results.render()[0]
violation_detected = False
for result in results.pred[0]:
class_index = int(result[-1])
if class_index >= 0 and class_index < len(vehicle_labels_nor):
label = vehicle_labels_nor[class_index]
confidence = result[4].item()
box = result[:4]
x1, y1, x2, y2 = map(int, box)
if y1 >= 270 :# previous 120
if x1 >= 700 and x1 <= 1400 : # previous 150 , 375
vehicle_id = result[-1].item()
if vehicle_id not in vehicle_ids:
violations += 1
vehicle_ids.add(vehicle_id)
violation_detected = True
line_color = (0, 0, 255)
#cv2.line(detected_frame, (770, 270), (1400, 400), line_color, 2)
if decider_thres == iter :
frame_filename = os.path.join(during_folder, f'violation_{violations}.jpg')
#cv2.imwrite(frame_filename, detected_frame)
if violation_detected :
cv2.putText(frame_resized, f'Line crossed', (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
frame_filename = os.path.join(during_folder, f'violation_{violations}.jpg')
print("Detected !")
cv2.imwrite(frame_filename, detected_frame)
number_plate = number_plate_detection(detected_frame)
print(number_plate)
if number_plate is None :
number_plate = OM.provineces[3] + OM.def1 + OM.def2
number_plates.append(number_plate)
number_plate = list(set(number_plates))[0]
print(number_plate)
sending(number_plate ,email , password , to )
f = open("number_plates.txt", "a")
f.write(number_plate)
f.close()
if violation_detected:
if not violation_in_progress:
during_violation_frames = []
violation_in_progress = True
else:
if violation_in_progress:
after_violation_frames = []
violation_in_progress = False
if violation_in_progress:
during_violation_frames.append(frame_resized.copy())
else:
before_violation_frames.append(frame_resized.copy())
if not violation_detected:
line_color = (0, 255, 0)
count += 1
cv2.imshow('Vehicle Detection Results', detected_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
@app.route('/')
def index():
return render_template('index.html') # Use the template name without the path
@app.route('/start_detection', methods=['POST'])
def start_detection_route():
global detection_thread
if detection_thread is None or not detection_thread.is_alive():
detection_thread = threading.Thread(target=start_detection)
detection_thread.start()
return jsonify({'message': 'Detection started.'})
else:
return jsonify({'message': 'Detection is already in progress.'})
@app.route('/start_detection_speed', methods=['POST'])
def start_detection_speed_route():
global detection_thread_speed
if detection_thread_speed is None or not detection_thread_speed.is_alive():
detection_thread_speed = threading.Thread(target=start_speed_detection)
detection_thread_speed.start()
return jsonify({'message': 'Speed Detection started.'})
else:
return jsonify({'message': 'Speed Detection is already in progress.'})
if __name__ == '__main__':
app.run(debug=True)
File added
import cv2
import os
video_cap = cv2.VideoCapture('sample100.mp4')
if not video_cap.isOpened():
print("done")
exit()
if not os.path.exists("frames"):
os.mkdir("frames")
count = 0
while True :
ret , frame = video_cap.read()
if not ret:
break
frame_filename = f'frames/frame{count:04d}.jpg'
cv2.imwrite(frame_filename , frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
count+=1
print(count)
video_cap.release()
cv2.destroyAllWindows()
import smtplib
def sending(number, email , password , to) :
gmail_user = email
gmail_password = password
sent_from = gmail_user
subject = 'Detected as Lane violation'
email_text = f"Vehicle number : {number}"
try:
smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
smtp_server.ehlo()
smtp_server.login(gmail_user, gmail_password)
smtp_server.sendmail(sent_from, to, email_text)
smtp_server.close()
print ("Email sent successfully!")
except Exception as ex:
print ("Something went wrong….",ex)
\ No newline at end of file
// document.getElementById("reportLaneViolation").addEventListener("click", function () {
// alert("Lane violation reported!");
// });
// document.getElementById("reportHighSpeedViolation").addEventListener("click", function () {
// alert("High-speed violation reported!");
// });
// document.getElementById("reportRedLightViolation").addEventListener("click", function () {
// alert("Red light violation reported!");
// });
// document.getElementById("reportFiningSystem").addEventListener("click", function () {
// alert("Fining System reported!");
// });
document.getElementById("reportLaneViolation").addEventListener("click", function () {
// Send a POST request to /start_detection when the button is clicked
fetch('/start_detection', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
})
.then(function (response) {
return response.json();
})
.then(function (data) {
// Display the response message in an alert
alert(data.message);
})
.catch(function (error) {
console.error('Error:', error);
});
});
document.getElementById("reportHighSpeedViolation").addEventListener("click", function () {
// Send a POST request to /detect_high_speed when the button is clicked
fetch('/start_detection_speed', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
})
.then(function (response) {
return response.json();
})
.then(function (data) {
// Display the response message
alert(data.message);
})
.catch(function (error) {
console.error('Error:', error);
});
});
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
}
header {
background-color: #333;
color: #fff;
text-align: center;
padding: 1rem;
}
main {
padding: 2rem;
}
section {
border: 1px solid #ccc;
padding: 1rem;
margin-bottom: 1rem;
}
button {
background-color: #333;
color: #fff;
border: none;
padding: 0.5rem 1rem;
cursor: pointer;
}
footer {
background-color: #333;
color: #fff;
text-align: center;
padding: 0.5rem;
}
\ No newline at end of file
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" href="{{ url_for('static', filename='styles.css') }}">
<style>
/* Add CSS for the grid layout */
main {
display: grid;
grid-template-columns: repeat(2, 1fr);
grid-template-rows: repeat(2, auto);
gap: 20px;
}
</style>
<title>Motor Traffic Department</title>
</head>
<body>
<header>
<h1>Motor Traffic Department</h1>
</header>
<main>
<section id="laneViolation">
<h2>Lane Violation</h2>
<button id="reportLaneViolation"><img src="{{ url_for('static', filename='images/lane_violation.png') }}" alt="Report Lane Violation"></button>
</section>
<section id="highSpeedViolation">
<h2>High Speed Violation</h2>
<button id="reportHighSpeedViolation"><img src="{{ url_for('static', filename='images/high_speed.png') }}" alt="Report High Speed Violation"></button>
</section>
<section id="redLightViolation">
<h2>Red Light Violation</h2>
<button id="reportRedLightViolation"><img src="{{ url_for('static', filename='images/traffic_light.jpg') }}" alt="Report Red Light Violation"></button>
</section>
<section id="finingSystem">
<h2>Fining System</h2>
<button id="reportFiningSystem"><img src="{{ url_for('static', filename='images/fining.png') }}" alt="Open Fining System"></button>
</section>
</main>
<footer>
<p>&copy; 2023 Motor Traffic Department</p>
</footer>
<script src="{{ url_for('static', filename='scripts.js') }}"></script>
</body>
</html>
from paddleocr import PaddleOCR,draw_ocr
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
# Paddleocr supports Chinese, English, French, German, Korean and Japanese.
# You can set the parameter `lang` as `ch`, `en`, `fr`, `german`, `korean`, `japan`
# to switch the language model in order.
ocr = PaddleOCR(use_angle_cls=True, lang='en') # need to run only once to download and load model into memory
img_path = 'temporary_cropped.jpg'
result = ocr.ocr(img_path, cls=True)
print(result[0][0][1][0])
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment