# Run_test04.py (10.1 KB)
from mylib import config, thread
from mylib.mailer import Mailer
from mylib.detection_test01 import detect_people
from imutils.video import VideoStream, FPS
from scipy.spatial import distance as dist
import numpy as np
import argparse, imutils, cv2, os, time, schedule, sys

#----------------------------Parse req. arguments------------------------------#
# Command-line interface; every flag is optional and falls back to its default.
ap = argparse.ArgumentParser()
# -i/--input: video file to analyse (an empty value selects the live stream)
ap.add_argument(
	"-i", "--input",
	default="",
	type=str,
	help="path to (optional) input video file",
)
# -o/--output: where to save the annotated video (an empty value disables saving)
ap.add_argument(
	"-o", "--output",
	default="",
	type=str,
	help="path to (optional) output video file",
)
# -d/--display: values above 0 show the annotated frames in a window
ap.add_argument(
	"-d", "--display",
	default=1,
	type=int,
	help="whether or not output frame should be displayed",
)
# expose the parsed namespace as a plain dict so options are read as args["name"]
args = vars(ap.parse_args())
#------------------------------------------------------------------------------#

# load the COCO class labels our YOLO model was trained on
labelsPath = os.path.sep.join([config.MODEL_PATH, "coco.names"])
# read through a context manager so the file handle is closed right away
with open(labelsPath) as labels_file:
	LABELS = labels_file.read().strip().split("\n")

# derive the paths to the YOLO weights and model configuration
weightsPath = os.path.sep.join([config.MODEL_PATH, "yolov3.weights"])
configPath = os.path.sep.join([config.MODEL_PATH, "yolov3.cfg"])

# load our YOLO object detector trained on COCO dataset (80 classes)
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)

# check if we are going to use GPU
if config.USE_GPU:
	# set CUDA as the preferable backend and target
	print("")
	print("[INFO] Looking for GPU")
	net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
	net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

# determine only the *output* layer names that we need from YOLO.
# Depending on the OpenCV build, getUnconnectedOutLayers() yields either an
# Nx1 array or a flat array of 1-based layer ids; flattening accepts both,
# whereas indexing each entry with [0] fails on the flat form.
ln = net.getLayerNames()
ln = [ln[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]

# choose the frame source: the file given with --input when there is one,
# the configured camera/stream URL otherwise
if args.get("input", False):
	print("[INFO] Starting the video..")
	vs = cv2.VideoCapture(args["input"])
else:
	print("[INFO] Starting the live stream..")
	vs = cv2.VideoCapture(config.url)

# the video writer is created later, once an output frame exists
writer = None
# start the FPS counter
fps = FPS().start()

### test
# shared state for the interactive calibration step
# title of the window in which the calibration points are clicked
win_name = "ROI ,MIN_DISTANCE setting"
# number of calibration points clicked so far
pts_cnt = 0
# reference distance in warped pixels; set by onMouse after the sixth click
dist_std = 0
# perspective transform matrix; set by onMouse after the sixth click
mtrx = 0

def onMouse(event, x, y, flags, param):
	"""Mouse callback that collects the six calibration clicks.

	The first four left-clicks are read as the region-of-interest corners in
	the order top-left, bottom-left, bottom-right, top-right; the fifth and
	sixth are two points whose separation becomes the reference distance.
	On the sixth click the perspective matrix (global ``mtrx``) and the warped
	reference distance (global ``dist_std``) are computed, and the warped
	region is shown in the 'scanned' window.

	Reads the module-level ``draw``, ``frame``, ``pts`` and ``win_name``.
	``flags`` and ``param`` belong to the OpenCV callback signature and are
	not used.
	"""
	global pts_cnt, mtrx, dist_std

	if event != cv2.EVENT_LBUTTONDOWN:
		return
	# pts only has six rows; ignore extra clicks made after calibration is
	# complete instead of indexing past the end of the buffer
	if pts_cnt >= 6:
		return

	# mark the clicked position with a green dot
	cv2.circle(draw, (x, y), 10, (0, 255, 0), -1)
	cv2.imshow(win_name, draw)

	# store the mouse coordinates
	pts[pts_cnt] = [x, y]
	pts_cnt += 1
	if pts_cnt < 6:
		return

	topLeft, bottomLeft, bottomRight, topRight, ptA, ptB = pts[:6]

	# the four source corners before the transform
	pts1 = np.float32([topLeft, topRight, bottomRight, bottomLeft])
	standard_dist = [ptA, ptB]
	print(standard_dist)

	# size of the warped image: the larger of the two horizontal spans is the
	# width, the larger of the two vertical spans is the height
	w1 = abs(bottomRight[0] - bottomLeft[0])
	w2 = abs(topRight[0] - topLeft[0])
	h1 = abs(topRight[1] - bottomRight[1])
	h2 = abs(topLeft[1] - bottomLeft[1])
	# pts stores float32 values, but warpPerspective needs an integer size
	width = int(max(w1, w2))
	height = int(max(h1, h2))

	# the four destination corners after the transform
	pts2 = np.float32([[0, 0], [width, 0], [width, height], [0, height]])

	# compute the transform matrix and apply the perspective warp
	mtrx = cv2.getPerspectiveTransform(pts1, pts2)
	result = cv2.warpPerspective(frame, mtrx, (width, height))

	warped_standard_dist = np.array(get_transformed_points(standard_dist, mtrx))
	print(warped_standard_dist)

	warped_D = dist.cdist(warped_standard_dist, warped_standard_dist, metric="euclidean")
	# there are exactly two reference points, so the single off-diagonal
	# entry of the distance matrix is the reference distance
	dist_std = warped_D[0, 1]
	print("Warped Standard Social pixel distance : {}".format(dist_std))

	cv2.imshow('scanned', result)
	cv2.waitKey(1)

def get_transformed_points(pts, mtrx):
	"""Map every point in ``pts`` through the perspective matrix ``mtrx``.

	Returns a list with one ``(x, y)`` tuple per input point, in input order;
	the warped coordinates are converted with ``int()``.
	"""
	def warp_one(point):
		# wrap the point in two extra list levels before handing it to
		# perspectiveTransform, then unwrap the single result again
		wrapped = np.array([[point]], dtype="float32")
		warped = cv2.perspectiveTransform(wrapped, mtrx)[0][0]
		return (int(warped[0]), int(warped[1]))

	return [warp_one(point) for point in pts]

# Interactive calibration. Each pass shows one frame and waits for a key:
#   f             - continue: ends calibration once all six points have been
#                   clicked, otherwise moves on to the next frame
#   q             - quit the program
#   any other key - discard the clicked points and retry on the next frame
while pts_cnt < 6:
	(grabbed, frame) = vs.read()
	# a failed read yields no frame, which would otherwise crash in copy() below
	if not grabbed:
		print("[INFO] No frame available for calibration. Exit System...")
		sys.exit(1)

	frame = imutils.resize(frame, width=1000)
	draw = frame.copy()
	# every frame starts with an empty selection; the counter is reset together
	# with the buffer so clicks from an earlier frame cannot leave zeroed slots
	pts = np.zeros((6, 2), dtype=np.float32)
	pts_cnt = 0

	cv2.imshow(win_name, frame)
	cv2.setMouseCallback(win_name, onMouse)

	# block until a key is pressed; clicks are delivered to onMouse meanwhile
	k = cv2.waitKey(0) & 0xFF
	if k == ord("q"):
		print("[INFO] Exit System...")
		sys.exit()
	if k != ord("f"):
		pts_cnt = 0

# calibration finished: close the selection and preview windows
cv2.destroyAllWindows()

### test end

# The perspective matrix and the reference distance are calibrated on a frame
# resized to CALIBRATION_WIDTH, while detection runs on a frame resized to
# DETECTION_WIDTH, so detected foot points are rescaled to calibration
# coordinates before they are warped.
CALIBRATION_WIDTH = 1000
DETECTION_WIDTH = 700

# the class index of "person" does not change between frames
person_idx = LABELS.index("person")

# loop over the frames from the video stream
while True:
	(grabbed, frame) = vs.read()
	# if the frame was not grabbed, then we have reached the end of the stream
	if not grabbed:
		break

	# resize the frame and then detect people (and only people) in it
	frame = imutils.resize(frame, width=DETECTION_WIDTH)
	results = detect_people(frame, net, ln, personIdx=person_idx)

	# initialize the set of indexes that violate the social distance limit
	serious = set()
	# abnormal = set()

	# ensure there are *at least* two people detections (required in
	# order to compute our pairwise distance maps)
	if len(results) >= 2:
		# extract the foot point of every detection
		feets = np.array([r[3] for r in results], dtype="float32")

		# bring the foot points into calibration-frame coordinates, warp them
		# with the calibrated matrix and compute all pairwise distances
		coord_scale = CALIBRATION_WIDTH / float(frame.shape[1])
		warped_feets = np.array(get_transformed_points(feets * coord_scale, mtrx))
		new_D = dist.cdist(warped_feets, warped_feets, metric="euclidean")

		# loop over the upper triangular of the distance matrix
		for i in range(0, new_D.shape[0]):
			for j in range(i + 1, new_D.shape[1]):
				# check to see if the warped distance between the pair is
				# below the calibrated reference distance
				if new_D[i, j] < dist_std:
					# update our violation set with the indexes of the pair
					print("Violation: Warped Social pixel distance : {}".format(new_D[i, j]))
					serious.add(i)
					serious.add(j)

	# loop over the results
	for (i, (prob, bbox, centroid, feet)) in enumerate(results):
		# extract the bounding box and foot coordinates, then
		# initialize the color of the annotation
		(startX, startY, endX, endY) = bbox
		(fX, fY) = feet
		color = (0, 255, 0)	# green

		# if the index exists within the violation set, then update the color
		if i in serious:
			color = (0, 0, 255)	# red
		# elif i in abnormal:
		# 	color = (0, 255, 255) #orange = (0, 165, 255)

		# draw (1) a bounding box around the person and (2) the
		# foot point of the person
		cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2)
		cv2.circle(frame, (fX, fY), 5, color, 2)	# img, center, radius, color, thickness

	## test
	# draw some of the parameters
	# NOTE(review): this label shows config.MAX_DISTANCE, while the violation
	# check above compares against the calibrated dist_std - confirm intent
	Safe_Distance = "Safe distance: >{} px".format(config.MAX_DISTANCE)
	cv2.putText(frame, Safe_Distance, (470, frame.shape[0] - 25),
		cv2.FONT_HERSHEY_COMPLEX, 0.60, (255, 0, 0), 2)
	# Threshold = "Threshold limit: {}".format(config.Threshold)
	# cv2.putText(frame, Threshold, (470, frame.shape[0] - 50),
	# 	cv2.FONT_HERSHEY_COMPLEX, 0.60, (255, 0, 0), 2)

	# draw the total number of social distancing violations on the output frame
	text = "Total serious violations: {}".format(len(serious))
	cv2.putText(frame, text, (10, frame.shape[0] - 55),
		cv2.FONT_HERSHEY_COMPLEX, 0.70, (0, 0, 255), 2)

	# text1 = "Total abnormal violations: {}".format(len(abnormal))
	# cv2.putText(frame, text1, (10, frame.shape[0] - 25),
	# 	cv2.FONT_HERSHEY_COMPLEX, 0.70, (0, 255, 255), 2)
	## test end

# #------------------------------Alert function----------------------------------#
# 	if len(serious) >= config.Threshold:
# 		cv2.putText(frame, "-ALERT: Violations over limit-", (10, frame.shape[0] - 80),
# 			cv2.FONT_HERSHEY_COMPLEX, 0.60, (0, 0, 255), 2)
# 		if config.ALERT:
# 			print("")
# 			print('[INFO] Sending mail...')
# 			Mailer().send(config.MAIL)
# 			print('[INFO] Mail sent')
# 		#config.ALERT = False
# #------------------------------------------------------------------------------#
	# check to see if the output frame should be displayed to our screen
	if args["display"] > 0:
		# show the output frame
		cv2.imshow("Real-Time Monitoring/Analysis Window", frame)
		key = cv2.waitKey(1) & 0xFF

		# if the `q` key was pressed, break from the loop
		if key == ord("q"):
			break
	# update the FPS counter
	fps.update()

	# if an output video file path has been supplied and the video
	# writer has not been initialized, do so now
	if args["output"] != "" and writer is None:
		# initialize our video writer
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 25,
			(frame.shape[1], frame.shape[0]), True)

	# if the video writer is not None, write the frame to the output video file
	if writer is not None:
		writer.write(frame)

# stop the timer and display FPS information
fps.stop()
print("===========================")
print("[INFO] Elasped time: {:.2f}".format(fps.elapsed()))
print("[INFO] Approx. FPS: {:.2f}".format(fps.fps()))

# release the capture and, when one was opened, the writer so that the
# output video file is flushed and closed properly
vs.release()
if writer is not None:
	writer.release()

# close any open windows
cv2.destroyAllWindows()