doorlock.py
3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#-*- coding:utf-8 -*-
import time
import RPi.GPIO as GPIO
from multiprocessing import Queue
import os
PIN = {
'Motor_MT_N':17,
'Motor_MT_P':4
}
class Motor:
LEFT = 0
RIGHT = 1
def __init__(self):
self.pwmN = GPIO.PWM(PIN['Motor_MT_N'], 100)
self.pwmP = GPIO.PWM(PIN['Motor_MT_P'], 100)
self.pwmN.start(0)
self.pwmP.start(0)
def rotate(self, direction):
if direction == Motor.LEFT:
GPIO.output(PIN['Motor_MT_N'], GPIO.HIGH)
GPIO.output(PIN['Motor_MT_P'], GPIO.LOW)
self.pwmN.ChangeDutyCycle(50)
else:
GPIO.output(PIN['Motor_MT_N'], GPIO.LOW)
GPIO.output(PIN['Motor_MT_P'], GPIO.HIGH)
self.pwmP.ChangeDutyCycle(50)
def stop(self):
self.pwmP.ChangeDutyCycle(0)
self.pwmN.ChangeDutyCycle(0)
def RFIDProcess(signalQueue):
while True:
################## 이곳을 지우고 코드를 작성해주세요 ################
# RFID ID가 등록된 기기의 ID인 경우 success에 True를 넣습니다.
#
# RFID 태그가 된 경우 API에 요청을 보내 (GET /api/device) ID 목록을
# 가져온 후 이 목록 안에 태그된 기기의 ID가 있는지 여부를 확인하는 방식으로
# 동작하면 될 것 같습니다.
#
# ID 목록을 미리 받아온 후 비교하도록 하면 ID 목록 업데이트가 안 될 수 있으니
# 태그가 된 경우 ID 목록을 받아오도록 해주세요.
#
# success가 True인 경우 모터가 회전합니다.
#
# 아래 코드는 테스트를 위한 코드입니다. 아래 코드까지 지우고 작성해주세요.
time.sleep(30)
success = True
##############################################################
if success:
print("등록된 RFID ID가 확인됨")
signalQueue.put("RFID")
def RemoteProcess(signalQueue):
while True:
################## 이곳을 지우고 코드를 작성해주세요 ################
# 원격 잠금해제 요청이 들어온 경우 success에 True를 넣습니다.
# 원격 잠금해제 요청은 IPC로 처리합니다.
# 지우님과 협업하여 작업해주세요.
#
# 제 생각으로는 한 파일에 대해서 (ex ~/IPC.txt) API에서는 write하고
# 도어락 프로세스에서는 read하는 방법으로 하면 될 것 같습니다.
# 원격 잠금해제 요청이 들어온 경우 API에서 write하도록 하면 되겠죠..?
#
# success가 True인 경우 모터가 회전합니다.
#
# 아래 코드는 테스트를 위한 코드입니다. 아래 코드까지 지우고 작성해주세요.
time.sleep(13)
success = True
##############################################################
if success:
print("원격 잠금해제 요청이 들어옴")
signalQueue.put("Remote")
def signalProcess(signalQueue):
pid = os.fork()
if pid == 0:
RFIDProcess(signalQueue)
else:
RemoteProcess(signalQueue)
def doorProcess(doorQueue):
motor = Motor()
while True:
signal = doorQueue.get()
print("{} 신호를 받아 문 열기 동작 수행 시작".format(signal))
if signal is not None:
print("문 열림")
motor.rotate(Motor.LEFT)
time.sleep(0.5)
motor.stop()
time.sleep(5) # 열린 후 5초 지나면 닫힘
print("문 닫힘")
motor.rotate(Motor.RIGHT)
time.sleep(0.5)
motor.stop()
if __name__ == '__main__':
try:
GPIO.setmode(GPIO.BCM)
GPIO.setup(PIN['Motor_MT_N'], GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(PIN['Motor_MT_P'], GPIO.OUT, initial=GPIO.LOW)
signalQueue = Queue()
pid = os.fork()
if pid == 0:
doorQueue = Queue()
pid = os.fork()
if pid == 0:
while True:
signal = signalQueue.get()
print("{} 신호가 들어와 전달 준비".format(signal))
if signal is not None:
doorQueue.put(signal)
else:
doorProcess(doorQueue)
else:
signalProcess(signalQueue)
except Exception as e:
print(e)
finally:
GPIO.cleanup()