Commit 96452023 authored by Karunarathna P.M.J.I.'s avatar Karunarathna P.M.J.I.

Upload cv2CCTV.py

parent 2c4c4066
import numpy as np
from ultralytics import YOLO
import cv2
import cvzone
import math
from sort import *
import time
import torch
#from PIL import Image
import shutil
import os
from detecto import core, utils
# from detecto.visualize import show_labeled_image, plot_prediction_grid
from torchvision import transforms
import matplotlib.pyplot as plt
import pyrebase
# Firebase project configuration used to upload evidence images to Cloud Storage.
# NOTE(review): the API key and service-account file name are committed to
# source control — these should be moved to environment variables or a secrets
# store outside the repository.
firebaseConfig = {
    "apiKey": "AIzaSyAcltJ26-MoRUlsyAiYdztrlxtuBtjUKko",
    "authDomain": "cctv-584b2.firebaseapp.com",
    "databaseURL": "https://cctv-584b2-default-rtdb.firebaseio.com",
    "projectId": "cctv-584b2",
    "storageBucket": "cctv-584b2.appspot.com",
    "messagingSenderId": "972965540260",
    "appId": "1:972965540260:web:0e8bf3e4465d7bd55fd131",
    "measurementId": "G-GN9MTT80BZ",
    "serviceAccount": "Firebase_Service_Account_Keys.json"
}
# Initialise the app at import time; `storage` is the bucket handle used by
# read_vid() for uploads.
firebase = pyrebase.initialize_app(firebaseConfig)
storage = firebase.storage()
# YOLOv8-large general object detector; its class ids index into classNames.
model = YOLO("Yolo-Weights/yolov8l.pt")
# COCO class names in the order the YOLO weights emit them (80 classes).
classNames = ["person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat",
              "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
              "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella",
              "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
              "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
              "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
              "carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed",
              "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone",
              "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors",
              "teddy bear", "hair drier", "toothbrush"
              ]
# SORT multi-object tracker: keep lost tracks alive for 20 frames, require
# 3 consecutive hits before confirming a track, match at IoU >= 0.3.
tracker = Sort(max_age=20, min_hits=3, iou_threshold=0.3)
# Two virtual counting lines as (x1, y1, x2, y2) in the resized-frame
# coordinate system used by read_vid().
limits = [0, 200, 1344, 241]    # upper line: start of the measured stretch
limits1 = [0, 400, 1344, 540]   # lower line: end of the measured stretch
totalCount = []    # track ids that have crossed the upper line (counted once)
uniqIds = []       # {'id', 'foundAT'} records taken at the upper-line crossing
linepassIds = []   # {'id', 'passAT'} records taken at the lower-line crossing
timeDuration = []  # NOTE(review): never written or read in this file
fulldata = []      # per-vehicle result records returned by read_vid()
# Road-region mask; combined with each frame via cv2.bitwise_and, so it must
# have the same shape as the frame after the 0.5x resize in read_vid().
mask = cv2.imread("mask.png")
def calculate_speed(time_seconds, distance_m=20):
    """Return the speed in km/h for covering *distance_m* metres in *time_seconds* seconds.

    Parameters
    ----------
    time_seconds : float
        Travel time in seconds between the two counting lines.
    distance_m : float, optional
        Real-world distance between the lines in metres (default 20, the
        value previously hard-coded here).

    Returns
    -------
    float
        Speed in km/h; ``float('inf')`` when *time_seconds* is not positive.

    Notes
    -----
    The caller passes ``round(time1, 2)``, so a very fast crossing can
    legitimately round down to 0.0 — the original body raised
    ZeroDivisionError in that case.
    """
    if time_seconds <= 0:
        return float("inf")
    # Convert distance to kilometres and time to hours, then divide.
    distance_km = distance_m * 0.001
    time_hours = time_seconds / 3600
    return distance_km / time_hours
# Custom YOLOv5 number-plate detector, loaded from a local clone of the
# ultralytics/yolov5 repo ('ultralytics_yolov5_master') with weights 'best.pt'.
model_number = torch.hub.load('ultralytics_yolov5_master', 'custom', path='best.pt',force_reload=True,source='local')
# detecto character recogniser (one class per digit/letter) used to OCR the
# cropped plate; class order here must match the training label order.
model_ocr = core.Model.load('./model_weights.pth', ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M',
                                                   'N','O','P','Q','R','S','T','U','V','Z','X','Y','W'])
def number_plate(img):
    """Detect a number plate in *img*, OCR its characters, and return
    ``(plate_string, crop_filename)`` for the first detected plate.

    Side effects: draws the detection box and label on *img* and writes the
    cropped plate image to ``num_<k>.jpg`` in the working directory.

    NOTE(review): when the detector finds no plate, execution falls off the
    end of the function and implicitly returns ``None`` (not a 2-tuple) —
    callers that unpack the result must guard against that.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # NOTE(review): `gray` is single-channel, but COLOR_BGR2RGB expects a
    # 3-channel input — this looks like it should be COLOR_GRAY2RGB; confirm
    # against a run, since a cvtColor error here is silently absorbed by the
    # caller's except branch.
    img_rgb = cv2.cvtColor(gray, cv2.COLOR_BGR2RGB)
    # Run the YOLOv5 plate detector.
    results = model_number(img_rgb)
    # Resolve class names whether or not the model is wrapped in DataParallel.
    class_names = model_number.module.names if hasattr(model_number, 'module') else model_number.names
    jk = 0              # index used to name the crop files
    crp_img_num = ""    # filename of the last written crop
    for det in results.pred[0]:
        box = det[:4].cpu().numpy()          # xyxy box
        score = det[4].cpu().numpy()         # detector confidence
        labelx = int(det[5].cpu().numpy())   # class index
        x1, y1, x2, y2 = box.astype(int)
        # Draw bounding box on the input image (mutates the caller's array).
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
        crop_img_number = img[y1:y2, x1:x2]
        crp_img_num = f'num_{jk}.jpg'
        cv2.imwrite(crp_img_num, crop_img_number)
        jk = jk+1
        # OCR: per-character detection on the cropped plate.
        predictions = model_ocr.predict(crop_img_number)
        labels, boxes, scores = predictions
        # Keep only character detections above a fixed confidence threshold.
        thresh=0.5
        filtered_indices=np.where(scores>thresh)
        filtered_scores=scores[filtered_indices]
        filtered_boxes=boxes[filtered_indices]
        num_list = filtered_indices[0].tolist()
        print(labels)
        filtered_boxes_list = filtered_boxes.numpy()
        filtered_labels = [labels[i] for i in num_list]
        # Pair each character box with its label: rows of [x1, y1, x2, y2, label].
        combined_data = []
        for box, label in zip(filtered_boxes, filtered_labels):
            combined_data.append(np.concatenate((box, [label])))
        combined_data = np.array(combined_data)
        print(combined_data)
        try:
            # Convert the array to numeric, excluding the last column with non-numeric values.
            numeric_data = combined_data[:, :-1].astype(np.float64)
            # Sort characters left-to-right by their x1 coordinate.
            sorted_indices = np.argsort(numeric_data[:, 0])
            sorted_data = combined_data[sorted_indices]
            print(sorted_data)
            number_plate_str = ""
            for i in sorted_data:
                # i[4] is the character label appended above.
                number_plate_str =number_plate_str+i[4]
        except:
            # Any failure in the numeric conversion/sort (e.g. no characters
            # passed the threshold) yields an empty plate string.
            number_plate_str = ""
        print(number_plate_str)
        # Count the letters in the assembled string.
        letter_count = sum(1 for char in number_plate_str if char.isalpha())
        # Heuristic clean-up: drop a spurious leading letter unless it is 'A'
        # (Sri Lankan plates in this dataset apparently start with 'A').
        if letter_count >= 3 and number_plate_str[0].isalpha() and number_plate_str[0] != 'A':
            number_plate_str = number_plate_str[1:]
        print(number_plate_str)
        # Put label text above the detection box.
        label_text = f'{class_names[labelx]}: {score:.2f} : {number_plate_str}'
        cv2.putText(img, label_text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
        print(crp_img_num)
        print("crp_img_num ggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggg")
        print(number_plate_str)
        # NOTE(review): returning inside the loop means only the FIRST
        # detected plate is ever processed.
        return number_plate_str,crp_img_num
# Wall-clock timestamp recorded once, at module import time.
start_time = time.time()
def read_vid(vid):
    """Detect, track and speed-check vehicles in a video stream.

    For each tracked vehicle the function records the time it crosses the
    upper counting line (``limits``) and the lower one (``limits1``); the two
    timestamps give the travel time over the known 20 m stretch, from which
    the speed is derived via ``calculate_speed``. Vehicles over 30 km/h get
    their cropped image (and, when plate recognition succeeds, the plate
    crop) uploaded to Firebase storage, and a summary record is appended to
    the module-level ``fulldata`` list.

    Parameters
    ----------
    vid : str | int
        Video file path or camera index, passed to ``cv2.VideoCapture``.

    Returns
    -------
    list[dict]
        The module-level ``fulldata`` list (cumulative across calls), one
        record per over-speed event.

    Notes
    -----
    Mutates the module-level lists ``totalCount``, ``uniqIds``,
    ``linepassIds`` and ``fulldata``. Processing stops at end of stream or
    after 300 s of wall-clock time.
    """
    cap = cv2.VideoCapture(vid)
    # BUG FIX: the 300 s cutoff previously compared against the module-level,
    # import-time `start_time`, so calling read_vid more than 5 minutes after
    # import exited immediately. Time the run from this call instead.
    run_started = time.time()
    i = 0  # counter of uploaded over-speed crops
    while True:
        success, img = cap.read()
        if not success:  # end of stream or read failure
            break
        current_time = time.time()
        img = cv2.resize(img, None, fx=0.5, fy=0.5)
        # Restrict detection to the road region defined by the mask image.
        imgRegion = cv2.bitwise_and(img, mask)
        results = model(imgRegion, stream=True)
        detections = np.empty((0, 5))  # rows of [x1, y1, x2, y2, conf]
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0]
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                conf = math.ceil((box.conf[0] * 100)) / 100
                currentClass = classNames[int(box.cls[0])]
                # BUG FIX: the original condition was
                #   cls == "car" or ... or (cls == "motorbike" and conf > 0.3)
                # — `and` binds tighter than `or`, so the confidence gate only
                # applied to motorbikes. Apply it to all four vehicle classes.
                if currentClass in ("car", "truck", "bus", "motorbike") and conf > 0.3:
                    detections = np.vstack((detections, np.array([x1, y1, x2, y2, conf])))
        resultsTracker = tracker.update(detections)
        # Draw both counting lines in red; each is redrawn green on a crossing.
        cv2.line(img, (limits[0], limits[1]), (limits[2], limits[3]), (0, 0, 255), 5)
        cv2.line(img, (limits1[0], limits1[1]), (limits1[2], limits1[3]), (0, 0, 255), 5)
        for result in resultsTracker:
            x1, y1, x2, y2, track_id = result
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            w, h = x2 - x1, y2 - y1
            cvzone.putTextRect(img, f' {int(track_id)}', (max(0, x1), max(35, y1)),
                               scale=2, thickness=3, offset=10)
            cx, cy = x1 + w // 2, y1 + h // 2  # box centre
            # Upper line crossed for the first time: start this track's clock.
            if limits[0] < cx < limits[2] and limits[1] - 5 < cy < limits[1] + 25:
                if totalCount.count(track_id) == 0:
                    totalCount.append(track_id)
                    cv2.line(img, (limits[0], limits[1]), (limits[2], limits[3]), (0, 255, 0), 5)
                    foundAT = time.time()
                    uniqIds.append({'id': track_id, 'foundAT': int(foundAT)})
            # Lower line crossed: stop the clock and evaluate the speed.
            if limits1[0] < cx < limits1[2] and limits1[1] - 5 < cy < limits1[1] + 25:
                cv2.line(img, (limits1[0], limits1[1]), (limits1[2], limits1[3]), (0, 255, 0), 5)
                PassAT = time.time()
                uniqLineTime = {'id': track_id, 'passAT': int(PassAT)}
                linepassIds.append(uniqLineTime)
                for item in uniqIds:
                    if item['id'] != uniqLineTime['id']:
                        continue
                    foundAT_value = item['foundAT']
                    time1 = PassAT - foundAT_value
                    speed = calculate_speed(round(time1, 2))
                    if speed > 30:  # over-speed threshold in km/h
                        uid = item['id']
                        crop_img = img[y1:y2, x1:x2]
                        try:
                            number_plate_num, img_num = number_plate(crop_img)
                            storage.child("CCTV_IMG").child(img_num).put(img_num)
                            img_num_url = storage.child("CCTV_IMG").child(img_num).get_url(None)
                        except Exception:
                            # BUG FIX: the original handler called
                            # number_plate() a second time and unpacked its
                            # 2-tuple into a single name (and re-raised on the
                            # same failure). Fall back to empty values instead.
                            number_plate_num = ""
                            img_num_url = None
                        # Upload the full vehicle crop as evidence.
                        img_file_name = f'crop_{uid}.jpg'
                        cv2.imwrite(img_file_name, crop_img)
                        storage.child("CCTV_IMG").child(img_file_name).put(img_file_name)
                        img_url = storage.child("CCTV_IMG").child(img_file_name).get_url(None)
                        i = i + 1
                        fulldata.append({
                            'item_id': item['id'],
                            'pass_id': uniqLineTime['id'],
                            'foundAT': int(foundAT_value),
                            'passAT': int(PassAT),
                            'timeduration': round(time1, 2),
                            'speed': speed,
                            'number_plate_num': number_plate_num,
                            'img': img_url,
                            'number_img': img_num_url,
                        })
        # Hard stop after 5 minutes of processing.
        if current_time - run_started >= 300:
            break
    cap.release()
    # BUG FIX: this call was originally placed after `return`, so it never ran.
    cv2.destroyAllWindows()
    return fulldata
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment