고원빈

Merge branch 'Hardware'

Showing 45 changed files with 1591 additions and 0 deletions
# 스마트 약병 IoT 하드웨어 및 연결단 개발
## 지도교수
+ 유인태 교수님
## 개발담당자
+ 2015100552 컴퓨터공학과 윤형선
## 일정
+ 2021.05.03: Init(개인 아두이노 코드 git에서 이전)
+ 2021.05.11: Arduino 개발 중단, RaspberryPi Pico로 플랫폼 이전, C++ -> MicroPython
+ 2021.05.12: RaspberryPi Pico v0.1 code complete. Now working on rpi4 with MQTT.
+ 2021.05.13: RaspberryPi4 v0.1 code complete. with MQTT. + Pico code bug fix.
+ 2021.05.13: Fritzing design add.
+ 2021.05.16: RPI4 Code update
+ 2021.05.22: 3D Modeling file add.(Ender3 Pro)
+ 2021.05.25: 3D Printer gcode file updated.(Ender3 Pro+PLA)
\ No newline at end of file
No preview for this file type
This diff could not be displayed because it is too large.
No preview for this file type
This diff could not be displayed because it is too large.
No preview for this file type
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
No preview for this file type
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
No preview for this file type
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
No preview for this file type
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
No preview for this file type
# RaspberryPi4용 코드
- rpi.py
```
라즈베리파이 메인코드
```
- serverside_mqtt_stest.py
```
server에서 데이터 보낼 예제
```
- serverside_mqtt_rtest.py
```
server에서 수신한 데이터
```
python rpi.py동작 후, test코드 활용해서 테스트 가능
paho-mqtt
\ No newline at end of file
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from bluetooth import *
import time
MQTT_BLOCK = False
BT_MAC_ADDRESS = '00:18:91:D8:24:39'
MED_BOTT_NO = '1'
client_socket = BluetoothSocket( RFCOMM )
client_mqtt = mqtt.Client()
# --------------------------------------------------- #
# INTERNAL FUNCTIONS
# --------------------------------------------------- #
def _send_data(msg:str):
''' Bluetooth를 통해 데이터를 송신한다
'''
client_socket.send(msg)
def _recv_data():
''' Bluetooth를 통해 데이터를 수신한다
'''
# 소켓에 2초간 받을 수 있는 시간을 준다
client_socket.settimeout(2)
# 데이터 받을 변수
data = ''
try:
timeout_start = time.time()
timeout = 2
while time.time() < timeout_start + timeout:
data += client_socket.recv(1024).decode('utf-8')
except:
print('INFO: DATA RECV TIMEOUT')
finally:
return data
def _sub_mqtt(client, userdata, flags, rc):
''' MQTT에서 subscribe한다
'''
print("Connected with result code "+str(rc))
client.subscribe(f'bottle/1/stb')
def _work_mqtt(client, userdata, msg):
''' MQTT에서 데이터를 받으면 알맞게 로직을 수행한다
'''
print('DOING SOMETHING')
global MQTT_BLOCK
MQTT_BLOCK = True
received_msg = msg.payload.decode('utf-8')
data = ''
data_list = []
# 만약 req메시지를 받았다면
if received_msg == 'req':
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
while len(data_list) != 4:
_send_data('REQ')
data = _recv_data()
data_list = data.split('/')
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
# 데이터를 Publish한다
_pub_mqtt(f'bottle/{MED_BOTT_NO}/bts', data, 'localhost')
# 만약 res메시지를 받았다면
elif received_msg.split('/')[0] == 'res':
_send_data(received_msg.split('/')[1])
data = _recv_data()
data_list = data.split('/')
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
while len(data_list) != 4:
_send_data('REQ')
data = _recv_data()
data_list = data.split('/')
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
# 데이터를 Publish한다
# _pub_mqtt(f'bottle/{MED_BOTT_NO}/bts', data, 'localhost')
print("DEBUG")
print(data)
print(data_list)
print(received_msg)
MQTT_BLOCK = False
def _pub_mqtt(topic:str, payload:str, hostname:str):
''' MQTT를 통해 데이터를 publish한다
'''
publish.single(
topic=topic,
payload=payload,
hostname=hostname)
# --------------------------------------------------- #
# MAIN
# --------------------------------------------------- #
def _run():
# CONNECT BT
client_socket.connect((BT_MAC_ADDRESS, 1))
print('bluetooth connected!')
# SUBSCRIBE MQTT
client_mqtt.on_connect = _sub_mqtt
client_mqtt.on_message = _work_mqtt
client_mqtt.connect_async("localhost")
client_mqtt.loop_start()
while True:
if MQTT_BLOCK is False:
# 항상 데이터를 받을 상태로 있는다
data = _recv_data()
# 만약 데이터가 ''가 아니면 무언가 온 것이므로 처리한다
if data != '':
print('DATA IN')
data_list = data.split('/')
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
# 만약 데이터가 불량하게 왔을 경우, 제대로 올때까지 반복시도한다
while len(data_list) != 4:
data = _recv_data()
data_list = data.split('/')
# 데이터가 손실되어 왔을 경우 에러 처리
if len(data_list[0]) == 0:
data_list = []
elif len(data_list[3]) == 0:
data_list = []
# 데이터를 Publish한다
_pub_mqtt(f'bottle/{MED_BOTT_NO}/bts', data, 'localhost')
client_socket.close()
if __name__ == '__main__':
_run()
\ No newline at end of file
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("bottle/1/bts") # 요거 수정해서 확인하세요
def on_message(client, userdata, msg):
print(msg.topic)
print(msg.payload.decode('utf-8'))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect_async("localhost")
client.loop_start()
while True:
print("TESTing")
time.sleep(1)
\ No newline at end of file
import paho.mqtt.publish as publish
# hub -> server
# msgs = \
# [
# {
# 'topic':"bottle/1/bts",
# 'payload':"1/30.4/32.2/1"
# }
# ]
# server -> hub with date
# msgs = \
# [
# {
# 'topic':"bottle/1/stb",
# 'payload':"res/0513"
# }
# ]
# server -> hub without date
msgs = \
[
{
'topic':"bottle/1/stb",
'payload':"req"
}
]
publish.multiple(msgs, hostname="localhost")
"""
MicroPython TM1637 quad 7-segment LED display driver
https://github.com/mcauser/micropython-tm1637
MIT License
Copyright (c) 2016 Mike Causer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from micropython import const
from machine import Pin
from time import sleep_us, sleep_ms
TM1637_CMD1 = const(64) # 0x40 data command
TM1637_CMD2 = const(192) # 0xC0 address command
TM1637_CMD3 = const(128) # 0x80 display control command
TM1637_DSP_ON = const(8) # 0x08 display on
TM1637_DELAY = const(10) # 10us delay between clk/dio pulses
TM1637_MSB = const(128) # msb is the decimal point or the colon depending on your display
# 0-9, a-z, blank, dash, star
_SEGMENTS = bytearray(b'\x3F\x06\x5B\x4F\x66\x6D\x7D\x07\x7F\x6F\x77\x7C\x39\x5E\x79\x71\x3D\x76\x06\x1E\x76\x38\x55\x54\x3F\x73\x67\x50\x6D\x78\x3E\x1C\x2A\x76\x6E\x5B\x00\x40\x63')
class TM1637(object):
"""Library for quad 7-segment LED modules based on the TM1637 LED driver."""
def __init__(self, clk, dio, brightness=7):
self.clk = clk
self.dio = dio
if not 0 <= brightness <= 7:
raise ValueError("Brightness out of range")
self._brightness = brightness
self.clk.init(Pin.OUT, value=0)
self.dio.init(Pin.OUT, value=0)
sleep_us(TM1637_DELAY)
self._write_data_cmd()
self._write_dsp_ctrl()
def _start(self):
self.dio(0)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
def _stop(self):
self.dio(0)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.dio(1)
def _write_data_cmd(self):
# automatic address increment, normal mode
self._start()
self._write_byte(TM1637_CMD1)
self._stop()
def _write_dsp_ctrl(self):
# display on, set brightness
self._start()
self._write_byte(TM1637_CMD3 | TM1637_DSP_ON | self._brightness)
self._stop()
def _write_byte(self, b):
for i in range(8):
self.dio((b >> i) & 1)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
def brightness(self, val=None):
"""Set the display brightness 0-7."""
# brightness 0 = 1/16th pulse width
# brightness 7 = 14/16th pulse width
if val is None:
return self._brightness
if not 0 <= val <= 7:
raise ValueError("Brightness out of range")
self._brightness = val
self._write_data_cmd()
self._write_dsp_ctrl()
def write(self, segments, pos=0):
"""Display up to 6 segments moving right from a given position.
The MSB in the 2nd segment controls the colon between the 2nd
and 3rd segments."""
if not 0 <= pos <= 5:
raise ValueError("Position out of range")
self._write_data_cmd()
self._start()
self._write_byte(TM1637_CMD2 | pos)
for seg in segments:
self._write_byte(seg)
self._stop()
self._write_dsp_ctrl()
def encode_digit(self, digit):
"""Convert a character 0-9, a-f to a segment."""
return _SEGMENTS[digit & 0x0f]
def encode_string(self, string):
"""Convert an up to 4 character length string containing 0-9, a-z,
space, dash, star to an array of segments, matching the length of the
source string."""
segments = bytearray(len(string))
for i in range(len(string)):
segments[i] = self.encode_char(string[i])
return segments
def encode_char(self, char):
"""Convert a character 0-9, a-z, space, dash or star to a segment."""
o = ord(char)
if o == 32:
return _SEGMENTS[36] # space
if o == 42:
return _SEGMENTS[38] # star/degrees
if o == 45:
return _SEGMENTS[37] # dash
if o >= 65 and o <= 90:
return _SEGMENTS[o-55] # uppercase A-Z
if o >= 97 and o <= 122:
return _SEGMENTS[o-87] # lowercase a-z
if o >= 48 and o <= 57:
return _SEGMENTS[o-48] # 0-9
raise ValueError("Character out of range: {:d} '{:s}'".format(o, chr(o)))
def hex(self, val):
"""Display a hex value 0x0000 through 0xffff, right aligned."""
string = '{:04x}'.format(val & 0xffff)
self.write(self.encode_string(string))
def number(self, num):
"""Display a numeric value -999 through 9999, right aligned."""
# limit to range -999 to 9999
num = max(-999, min(num, 9999))
string = '{0: >4d}'.format(num)
self.write(self.encode_string(string))
def numbers(self, num1, num2, colon=True):
"""Display two numeric values -9 through 99, with leading zeros
and separated by a colon."""
num1 = max(-9, min(num1, 99))
num2 = max(-9, min(num2, 99))
segments = self.encode_string('{0:0>2d}{1:0>2d}'.format(num1, num2))
if colon:
segments[1] |= 0x80 # colon on
self.write(segments)
def temperature(self, num):
if num < -9:
self.show('lo') # low
elif num > 99:
self.show('hi') # high
else:
string = '{0: >2d}'.format(num)
self.write(self.encode_string(string))
self.write([_SEGMENTS[38], _SEGMENTS[12]], 2) # degrees C
def show(self, string, colon=False):
segments = self.encode_string(string)
if len(segments) > 1 and colon:
segments[1] |= 128
self.write(segments[:4])
def scroll(self, string, delay=250):
segments = string if isinstance(string, list) else self.encode_string(string)
data = [0] * 8
data[4:0] = list(segments)
for i in range(len(segments) + 5):
self.write(data[0+i:4+i])
sleep_ms(delay)
class TM1637Decimal(TM1637):
"""Library for quad 7-segment LED modules based on the TM1637 LED driver.
This class is meant to be used with decimal display modules (modules
that have a decimal point after each 7-segment LED).
"""
def encode_string(self, string):
"""Convert a string to LED segments.
Convert an up to 4 character length string containing 0-9, a-z,
space, dash, star and '.' to an array of segments, matching the length of
the source string."""
segments = bytearray(len(string.replace('.','')))
j = 0
for i in range(len(string)):
if string[i] == '.' and j > 0:
segments[j-1] |= TM1637_MSB
continue
segments[j] = self.encode_char(string[i])
j += 1
return segments
import uos
import machine
import utime
uart0 = machine.UART(0,baudrate=9600)
# --------------------------------------------------- #
# INTERNAL FUNCTIONS
# --------------------------------------------------- #
def _clartBuf(uart=uart0):
''' Clear Buffer
'''
while uart.any():
print(uart.read(1))
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def send_data_bt(sdata:str):
''' Send string data using Bluetooth
'''
_clartBuf()
uart0.write(sdata)
def recv_data_bt():
''' Receive string data using Bluetooth
'''
_clartBuf()
prvMills = utime.ticks_ms()
received_data = ''
while (utime.ticks_ms()-prvMills)<2000:
if uart0.any():
single_data = uart0.read(1)
received_data += single_data.decode('utf-8')
return received_data
\ No newline at end of file
''' Original code from https://www.raspberrypi.org/forums/viewtopic.php?t=303606
'''
import utime
import rp2
from rp2 import PIO, asm_pio
from machine import Pin
# --------------------------------------------------- #
# INIT
# --------------------------------------------------- #
# For VCC
dht_pwr = Pin(14, Pin.OUT)
# For data
dht_data = Pin(15, Pin.IN, Pin.PULL_UP)
# Power on
dht_pwr.value(1)
# Create empty state
sm=rp2.StateMachine(1)
# Wait for DHT22 to start up
utime.sleep(2)
# --------------------------------------------------- #
# INIT2
# --------------------------------------------------- #
@asm_pio(set_init=(PIO.OUT_HIGH),autopush=True, push_thresh=8) #output one byte at a time
def DHT22():
#drive output low for at least 20ms
set(pindirs,1) #set pin to output
set(pins,0) #set pin low
set(y,31) #prepare countdown, y*x*100cycles
label ('waity')
set(x,31)
label ('waitx')
nop() [30]
jmp(x_dec,'waitx') #decrement x reg every 100 cycles
jmp(y_dec,'waity') #decrement y reg every time x reaches zero
#begin reading from device
set(pindirs,0) #set pin to input
wait(1,pin,0) #check pin is high before starting
wait(0,pin,0)
wait(1,pin,0)
wait(0,pin,0) #wait for start of data
#read databit
label('readdata')
set(x,21) #reset x register to count down from 20
wait(1,pin,0) #wait for high signal
label('countdown')
jmp(pin,'continue') #if pin still high continue counting
#pin is low before countdown is complete - bit '0' detected
set(y,0)
in_(y, 1) #shift '0' into the isr
jmp('readdata') #read the next bit
label('continue')
jmp(x_dec,'countdown') #decrement x reg and continue counting if x!=0
#pin is still high after countdown complete - bit '1' detected
set(y,1)
in_(y, 1) #shift one bit into the isr
wait(0,pin,0) #wait for low signal (next bit)
jmp('readdata') #read the next bit
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_dht():
data=[]
total=0
sm.init(DHT22,freq=1600000,set_base=dht_data,in_base=dht_data,jmp_pin=dht_data)
sm.active(1)
for i in range(5): #data should be 40 bits (5 bytes) long
data.append(sm.get()) #read byte
#check checksum (lowest 8 bits of the sum of the first 4 bytes)
for i in range(4):
total=total+data[i]
if((total & 255) == data[4]):
humidity=((data[0]<<8) + data[1])/10.0
temperature=(((data[2] &0x7f) << 8) + data[3]) /10.0
if (data[2] & 0x80) == 0x80:
temperature = -temperature
return [humidity, temperature]
else:
return False
\ No newline at end of file
import tm1637
from machine import Pin
from utime import sleep
display4 = tm1637.TM1637(clk=Pin(12), dio=Pin(13))
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_tm1637(data:str):
display4.show(data)
def off_tm1637():
display4.show(' ')
# # Show a word
# mydisplay.show("Pico")
# sleep(1)
# #blank the screen
# mydisplay.show(" ")
# sleep(1)
# #show numbers
# mydisplay.number(-123)
# sleep(1)
# #show a time with colon
# mydisplay.numbers(12,59)
# sleep(1)
# #adjust the brightness to make it lower
# mydisplay.brightness(0)
# sleep(1)
# #show scrolling text
# mydisplay.scroll("Hello World 123", delay=200)
# sleep(1)
# #show temperature
# mydisplay.temperature(99)
# sleep(1)
# #blank the screen again
# mydisplay.show(" ")
\ No newline at end of file
import array, utime
from machine import Pin
import rp2
import neopixel
import dht
import bluetoooth as bto
import ultrasonic
import reed
import display4
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def _collect_sensor_datas(reed_data:int) -> str:
# Collect Humidity, Temperature
dht_data = dht.work_dht()
if dht_data == False:
dht_data = [0,0]
# Collect Ultrasonic distance
ultrasonic_data = ultrasonic.work_sr04()
# Make data string
send_data_str = str(reed_data) + '/' + str(dht_data[1]) + '/' + str(dht_data[0]) + '/' + str(ultrasonic_data)
return send_data_str
# --------------------------------------------------- #
# LOOP ENTRYPOINT
# --------------------------------------------------- #
def _run():
# INIT REED STATE
reed_data = -1
# LOOP
while True:
# ------------------------------------------- #
# DEFAULT LOOP
# ------------------------------------------- #
# Get data using BT(Standby)
input_data = bto.recv_data_bt()
# Get reed data from reed sensor
current_reed_data = reed.work_reed()
# ------------------------------------------- #
# IF CONDITION MET
# ------------------------------------------- #
if input_data != '' or reed_data != current_reed_data:
# Refine BT data
input_data = input_data.strip()
# Test code
print('INPUT FOUND ', input_data)
# IF INPUT MEANS GET MESSAGE or MEDICINE LID STATUS CHANGED
if input_data == 'REQ' or reed_data != current_reed_data:
# Send data using BT
bto.send_data_bt(_collect_sensor_datas(current_reed_data))
else:
# Refine BT data
input_data = input_data.strip()
display4.work_tm1637(input_data)
neopixel.work_led()
display4.off_tm1637()
# Send data using BT
bto.send_data_bt(_collect_sensor_datas(current_reed_data))
# Update reed state
reed_data = current_reed_data
if __name__ == '__main__':
_run()
\ No newline at end of file
''' Original code from https://core-electronics.com.au/tutorials/how-to-use-ws2812b-rgb-leds-with-raspberry-pi-pico.html
'''
import array, utime
from machine import Pin
import rp2
# --------------------------------------------------- #
# INIT
# --------------------------------------------------- #
NUM_LEDS = 8
PIN_NUM = 22
BLACK = (0, 0, 0)
RED = (255, 0, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)
CYAN = (0, 255, 255)
BLUE = (0, 0, 255)
PURPLE = (180, 0, 255)
WHITE = (255, 255, 255)
COLORS = (BLACK, RED, YELLOW, GREEN, CYAN, BLUE, PURPLE, WHITE, BLACK)
# --------------------------------------------------- #
# INIT2
# --------------------------------------------------- #
@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
def ws2812():
T1 = 2
T2 = 5
T3 = 3
wrap_target()
label("bitloop")
out(x, 1) .side(0) [T3 - 1]
jmp(not_x, "do_zero") .side(1) [T1 - 1]
jmp("bitloop") .side(1) [T2 - 1]
label("do_zero")
nop() .side(0) [T2 - 1]
wrap()
# Create the StateMachine with the ws2812 program, outputting on pin
sm = rp2.StateMachine(0, ws2812, freq=8_000_000, sideset_base=Pin(PIN_NUM))
# Start the StateMachine, it will wait for data on its FIFO.
sm.active(1)
# Display a pattern on the LEDs via an array of LED RGB values.
ar = array.array("I", [0 for _ in range(NUM_LEDS)])
# --------------------------------------------------- #
# INTERNAL FUNCTIONS
# --------------------------------------------------- #
def _pixels_show(brightness:int = 0.2):
dimmer_ar = array.array("I", [0 for _ in range(NUM_LEDS)])
for i,c in enumerate(ar):
r = int(((c >> 8) & 0xFF) * brightness)
g = int(((c >> 16) & 0xFF) * brightness)
b = int((c & 0xFF) * brightness)
dimmer_ar[i] = (g<<16) + (r<<8) + b
sm.put(dimmer_ar, 8)
utime.sleep_ms(10)
def _pixels_set(i, color):
ar[i] = (color[1]<<16) + (color[0]<<8) + color[2]
def _pixels_fill(color):
for i in range(len(ar)):
_pixels_set(i, color)
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_led(brightness:int = 0.2):
for color in COLORS:
_pixels_fill(color)
_pixels_show(brightness)
utime.sleep(0.4)
from machine import Pin, Signal
data_pin = Pin(16, Pin.IN)
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_reed() -> int:
# 1 = Lid opened, 0 = Lid closed
return data_pin.value()
''' Original code from https://www.iottrends.tech/blog/how-to-use-ultrasonic-sensor-with-raspberry-pi-pico/
'''
from machine import Pin
import utime
trigger = Pin(26, Pin.OUT)
echo = Pin(27, Pin.IN)
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_sr04():
trigger.low()
utime.sleep_us(2)
trigger.high()
utime.sleep_us(5)
trigger.low()
while echo.value() == 0:
signaloff = utime.ticks_us()
while echo.value() == 1:
signalon = utime.ticks_us()
timepassed = signalon - signaloff
distance = (timepassed * 0.0330) / 2
return distance
import uos
import machine
import utime
uart0 = machine.UART(0,baudrate=9600)
def _clartBuf(uart=uart0):
print("Clear UART buffer "+ str(uart))
while uart.any():
print(uart.read(1))
def send_data_bt(sdata:str):
_clartBuf()
uart0.write(sdata)
def recv_data_bt():
_clartBuf()
prvMills = utime.ticks_ms()
received_data = ''
while (utime.ticks_ms()-prvMills)<2000:
if uart0.any():
single_data = uart0.read(1)
received_data += single_data.decode('utf-8')
return received_data
import utime
import rp2
from rp2 import PIO, asm_pio
from machine import Pin
dht_pwr = Pin(14, Pin.OUT) #connect GPIO 14 to '+' on DHT11
dht_data = Pin(15, Pin.IN, Pin.PULL_UP) #connect GPIO 15 to 'out' on DHT11
dht_pwr.value(1) #power on DHT11
sm=rp2.StateMachine(1) #create empty state machine
utime.sleep(2) #wait for DHT11 to start up
@asm_pio(set_init=(PIO.OUT_HIGH),autopush=True, push_thresh=8) #output one byte at a time
def DHT22():
#drive output low for at least 20ms
set(pindirs,1) #set pin to output
set(pins,0) #set pin low
set(y,31) #prepare countdown, y*x*100cycles
label ('waity')
set(x,31)
label ('waitx')
nop() [30]
jmp(x_dec,'waitx') #decrement x reg every 100 cycles
jmp(y_dec,'waity') #decrement y reg every time x reaches zero
#begin reading from device
set(pindirs,0) #set pin to input
wait(1,pin,0) #check pin is high before starting
wait(0,pin,0)
wait(1,pin,0)
wait(0,pin,0) #wait for start of data
#read databit
label('readdata')
set(x,21) #reset x register to count down from 20
wait(1,pin,0) #wait for high signal
label('countdown')
jmp(pin,'continue') #if pin still high continue counting
#pin is low before countdown is complete - bit '0' detected
set(y,0)
in_(y, 1) #shift '0' into the isr
jmp('readdata') #read the next bit
label('continue')
jmp(x_dec,'countdown') #decrement x reg and continue counting if x!=0
#pin is still high after countdown complete - bit '1' detected
set(y,1)
in_(y, 1) #shift one bit into the isr
wait(0,pin,0) #wait for low signal (next bit)
jmp('readdata') #read the next bit
# --------------------------------------------------- #
# ENTRYPOINT
# --------------------------------------------------- #
def work_dht():
print("DHT22 WORKING")
data=[]
total=0
sm.init(DHT22,freq=1600000,set_base=dht_data,in_base=dht_data,jmp_pin=dht_data) #start state machine
#state machine frequency adjusted so that PIO countdown during 'readdata' ends somewhere between the
#duration of a '0' and a '1' high signal
sm.active(1)
for i in range(5): #data should be 40 bits (5 bytes) long
data.append(sm.get()) #read byte
print("data: " + str(data))
#check checksum (lowest 8 bits of the sum of the first 4 bytes)
for i in range(4):
total=total+data[i]
if((total & 255) == data[4]):
humidity=((data[0]<<8) + data[1])/10.0 #DHT11 provides integer humidity (no decimal part)
temperature=(((data[2] &0x7f) << 8) + data[3]) /10.0 #DHT11 provides signed integer temperature (no decimal part)
if (data[2] & 0x80) == 0x80:
temperature = -temperature
return [humidity, temperature]
else:
return False
import tm1637
from machine import Pin
from utime import sleep
display4 = tm1637.TM1637(clk=Pin(12), dio=Pin(13))
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_tm1637(data:str):
display4.show(data)
def off_tm1637():
display4.show(' ')
import array, utime
from machine import Pin
import rp2
import neopixel
import dht
import bluetoooth as bto
import ultrasonic
import reed
import display4
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def _collect_sensor_datas(reed_data:int) -> str:
# Collect Humidity, Temperature
dht_data = dht.work_dht()
if dht_data == False:
dht_data = [0,0]
# Collect Ultrasonic distance
ultrasonic_data = ultrasonic.work_sr04()
# Make data string
send_data_str = str(reed_data) + '/' + str(dht_data[1]) + '/' + str(dht_data[0]) + '/' + str(ultrasonic_data)
return send_data_str
# --------------------------------------------------- #
# LOOP ENTRYPOINT
# --------------------------------------------------- #
def _run():
# INIT REED STATE
reed_data = -1
display4.off_tm1637()
# LOOP
while True:
# ------------------------------------------- #
# DEFAULT LOOP
# ------------------------------------------- #
# Get data using BT(Standby)
input_data = bto.recv_data_bt()
# Get reed data from reed sensor
current_reed_data = reed.work_reed()
# ------------------------------------------- #
# IF CONDITION MET
# ------------------------------------------- #
if input_data != '' or reed_data != current_reed_data:
# Refine BT data
input_data = input_data.strip()
# Test code
print('INPUT FOUND ', input_data)
# IF INPUT MEANS GET MESSAGE or MEDICINE LID STATUS CHANGED
if input_data == 'REQ' or reed_data != current_reed_data:
# Send data using BT
bto.send_data_bt(_collect_sensor_datas(reed_data))
else:
# Refine BT data
input_data = input_data.strip()
display4.work_tm1637(input_data)
neopixel.work_led()
display4.off_tm1637()
# Send data using BT
bto.send_data_bt(_collect_sensor_datas(reed_data))
# Update reed state
reed_data = current_reed_data
if __name__ == '__main__':
_run()
import array, utime
from machine import Pin
import rp2
NUM_LEDS = 8
PIN_NUM = 22
BLACK = (0, 0, 0)
RED = (255, 0, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)
CYAN = (0, 255, 255)
BLUE = (0, 0, 255)
PURPLE = (180, 0, 255)
WHITE = (255, 255, 255)
COLORS = (BLACK, RED, YELLOW, GREEN, CYAN, BLUE, PURPLE, WHITE, BLACK)
@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
def ws2812():
T1 = 2
T2 = 5
T3 = 3
wrap_target()
label("bitloop")
out(x, 1) .side(0) [T3 - 1]
jmp(not_x, "do_zero") .side(1) [T1 - 1]
jmp("bitloop") .side(1) [T2 - 1]
label("do_zero")
nop() .side(0) [T2 - 1]
wrap()
# Create the StateMachine with the ws2812 program, outputting on pin
sm = rp2.StateMachine(0, ws2812, freq=8_000_000, sideset_base=Pin(PIN_NUM))
# Start the StateMachine, it will wait for data on its FIFO.
sm.active(1)
# Display a pattern on the LEDs via an array of LED RGB values.
ar = array.array("I", [0 for _ in range(NUM_LEDS)])
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def _pixels_show(brightness:int = 0.2):
dimmer_ar = array.array("I", [0 for _ in range(NUM_LEDS)])
for i,c in enumerate(ar):
r = int(((c >> 8) & 0xFF) * brightness)
g = int(((c >> 16) & 0xFF) * brightness)
b = int((c & 0xFF) * brightness)
dimmer_ar[i] = (g<<16) + (r<<8) + b
sm.put(dimmer_ar, 8)
utime.sleep_ms(10)
def _pixels_set(i, color):
ar[i] = (color[1]<<16) + (color[0]<<8) + color[2]
def _pixels_fill(color):
for i in range(len(ar)):
_pixels_set(i, color)
# --------------------------------------------------- #
# ENTRYPOINT
# --------------------------------------------------- #
def work_led(brightness:int = 0.2):
print("WS2812B WORKING")
for color in COLORS:
_pixels_fill(color)
_pixels_show(brightness)
utime.sleep(0.4)
from machine import Pin, Signal
data_pin = Pin(16, Pin.IN)
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_reed() -> int:
# 1 = Lid opened, 0 = Lid closed
return data_pin.value()
"""
MicroPython TM1637 quad 7-segment LED display driver
https://github.com/mcauser/micropython-tm1637
MIT License
Copyright (c) 2016 Mike Causer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from micropython import const
from machine import Pin
from time import sleep_us, sleep_ms
TM1637_CMD1 = const(64) # 0x40 data command
TM1637_CMD2 = const(192) # 0xC0 address command
TM1637_CMD3 = const(128) # 0x80 display control command
TM1637_DSP_ON = const(8) # 0x08 display on
TM1637_DELAY = const(10) # 10us delay between clk/dio pulses
TM1637_MSB = const(128) # msb is the decimal point or the colon depending on your display
# 0-9, a-z, blank, dash, star
_SEGMENTS = bytearray(b'\x3F\x06\x5B\x4F\x66\x6D\x7D\x07\x7F\x6F\x77\x7C\x39\x5E\x79\x71\x3D\x76\x06\x1E\x76\x38\x55\x54\x3F\x73\x67\x50\x6D\x78\x3E\x1C\x2A\x76\x6E\x5B\x00\x40\x63')
class TM1637(object):
"""Library for quad 7-segment LED modules based on the TM1637 LED driver."""
def __init__(self, clk, dio, brightness=7):
self.clk = clk
self.dio = dio
if not 0 <= brightness <= 7:
raise ValueError("Brightness out of range")
self._brightness = brightness
self.clk.init(Pin.OUT, value=0)
self.dio.init(Pin.OUT, value=0)
sleep_us(TM1637_DELAY)
self._write_data_cmd()
self._write_dsp_ctrl()
def _start(self):
self.dio(0)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
def _stop(self):
self.dio(0)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.dio(1)
def _write_data_cmd(self):
# automatic address increment, normal mode
self._start()
self._write_byte(TM1637_CMD1)
self._stop()
def _write_dsp_ctrl(self):
# display on, set brightness
self._start()
self._write_byte(TM1637_CMD3 | TM1637_DSP_ON | self._brightness)
self._stop()
def _write_byte(self, b):
for i in range(8):
self.dio((b >> i) & 1)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
self.clk(1)
sleep_us(TM1637_DELAY)
self.clk(0)
sleep_us(TM1637_DELAY)
def brightness(self, val=None):
"""Set the display brightness 0-7."""
# brightness 0 = 1/16th pulse width
# brightness 7 = 14/16th pulse width
if val is None:
return self._brightness
if not 0 <= val <= 7:
raise ValueError("Brightness out of range")
self._brightness = val
self._write_data_cmd()
self._write_dsp_ctrl()
def write(self, segments, pos=0):
"""Display up to 6 segments moving right from a given position.
The MSB in the 2nd segment controls the colon between the 2nd
and 3rd segments."""
if not 0 <= pos <= 5:
raise ValueError("Position out of range")
self._write_data_cmd()
self._start()
self._write_byte(TM1637_CMD2 | pos)
for seg in segments:
self._write_byte(seg)
self._stop()
self._write_dsp_ctrl()
def encode_digit(self, digit):
"""Convert a character 0-9, a-f to a segment."""
return _SEGMENTS[digit & 0x0f]
def encode_string(self, string):
"""Convert an up to 4 character length string containing 0-9, a-z,
space, dash, star to an array of segments, matching the length of the
source string."""
segments = bytearray(len(string))
for i in range(len(string)):
segments[i] = self.encode_char(string[i])
return segments
def encode_char(self, char):
"""Convert a character 0-9, a-z, space, dash or star to a segment."""
o = ord(char)
if o == 32:
return _SEGMENTS[36] # space
if o == 42:
return _SEGMENTS[38] # star/degrees
if o == 45:
return _SEGMENTS[37] # dash
if o >= 65 and o <= 90:
return _SEGMENTS[o-55] # uppercase A-Z
if o >= 97 and o <= 122:
return _SEGMENTS[o-87] # lowercase a-z
if o >= 48 and o <= 57:
return _SEGMENTS[o-48] # 0-9
raise ValueError("Character out of range: {:d} '{:s}'".format(o, chr(o)))
def hex(self, val):
"""Display a hex value 0x0000 through 0xffff, right aligned."""
string = '{:04x}'.format(val & 0xffff)
self.write(self.encode_string(string))
def number(self, num):
"""Display a numeric value -999 through 9999, right aligned."""
# limit to range -999 to 9999
num = max(-999, min(num, 9999))
string = '{0: >4d}'.format(num)
self.write(self.encode_string(string))
def numbers(self, num1, num2, colon=True):
"""Display two numeric values -9 through 99, with leading zeros
and separated by a colon."""
num1 = max(-9, min(num1, 99))
num2 = max(-9, min(num2, 99))
segments = self.encode_string('{0:0>2d}{1:0>2d}'.format(num1, num2))
if colon:
segments[1] |= 0x80 # colon on
self.write(segments)
def temperature(self, num):
if num < -9:
self.show('lo') # low
elif num > 99:
self.show('hi') # high
else:
string = '{0: >2d}'.format(num)
self.write(self.encode_string(string))
self.write([_SEGMENTS[38], _SEGMENTS[12]], 2) # degrees C
def show(self, string, colon=False):
segments = self.encode_string(string)
if len(segments) > 1 and colon:
segments[1] |= 128
self.write(segments[:4])
def scroll(self, string, delay=250):
segments = string if isinstance(string, list) else self.encode_string(string)
data = [0] * 8
data[4:0] = list(segments)
for i in range(len(segments) + 5):
self.write(data[0+i:4+i])
sleep_ms(delay)
class TM1637Decimal(TM1637):
"""Library for quad 7-segment LED modules based on the TM1637 LED driver.
This class is meant to be used with decimal display modules (modules
that have a decimal point after each 7-segment LED).
"""
def encode_string(self, string):
"""Convert a string to LED segments.
Convert an up to 4 character length string containing 0-9, a-z,
space, dash, star and '.' to an array of segments, matching the length of
the source string."""
segments = bytearray(len(string.replace('.','')))
j = 0
for i in range(len(string)):
if string[i] == '.' and j > 0:
segments[j-1] |= TM1637_MSB
continue
segments[j] = self.encode_char(string[i])
j += 1
return segments
''' Original code from https://www.iottrends.tech/blog/how-to-use-ultrasonic-sensor-with-raspberry-pi-pico/
'''
from machine import Pin
import utime
trigger = Pin(26, Pin.OUT)
echo = Pin(27, Pin.IN)
# --------------------------------------------------- #
# FUNCTIONS
# --------------------------------------------------- #
def work_sr04():
trigger.low()
utime.sleep_us(2)
trigger.high()
utime.sleep_us(5)
trigger.low()
while echo.value() == 0:
signaloff = utime.ticks_us()
while echo.value() == 1:
signalon = utime.ticks_us()
timepassed = signalon - signaloff
distance = (timepassed * 0.0330) / 2
return distance
#include <SoftwareSerial.h>
#include <DHT.h>
#include <TM1637Display.h>
#include <Adafruit_NeoPixel.h>
#include <string.h>
//-------------Bluetooth--------------//
#define BLUETXPIN 2
#define BLUERXPIN 3
const byte numChars = 15;
char inputdata[numChars]; // an array to store the received data
boolean newData = false;
//--------Temperature, Humidity--------//
#define DHTPIN A2
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);
//-------------Ultrasonic-------------//
#define USTRIGPIN A0
#define USECHOPIN A1
long duration;
long distance;
//--------------Magnetic--------------//
#define MAGPIN 13
int mag_value = 0;
SoftwareSerial wirelessSerial(BLUETXPIN, BLUERXPIN);
//--------------7Segment--------------//
#define TM1637CLKPIN 5
#define TM1637DIOPIN 4
TM1637Display display = TM1637Display(TM1637CLKPIN, TM1637DIOPIN);
// All segments on:
const uint8_t data[] = {0xff, 0xff, 0xff, 0xff};
// All segments off:
const uint8_t blank[] = {0x00, 0x00, 0x00, 0x00};
//--------------NeoPixel--------------//
#define NEOPIXELPIN 6
#define NUMLED 8
Adafruit_NeoPixel strip = Adafruit_NeoPixel(NUMLED, NEOPIXELPIN, NEO_GRB + NEO_KHZ800);
//------------Entrypoint--------------//
void setup() {
// Temp, Hum
dht.begin();
// Ultrasonic
pinMode(USTRIGPIN, OUTPUT);
pinMode(USECHOPIN, INPUT);
// Magnetic
pinMode(MAGPIN, INPUT);
// Bluetooth
wirelessSerial.begin(9600);
// 7Segment
display.clear();
display.setBrightness(7);
// NeoPixel
strip.begin();
strip.show();
// Test
Serial.begin(9600);
}
//------------Functions-------------//
void recvWithEndMarker() {
static byte ndx = 0;
char endMarker = '\n';
char rc;
while (wirelessSerial.available() > 0 && newData == false) {
rc = wirelessSerial.read();
if (rc != endMarker) {
inputdata[ndx] = rc;
ndx++;
if (ndx >= numChars) {
ndx = numChars - 1;
}
}
else {
inputdata[ndx] = '\0'; // terminate the string
ndx = 0;
newData = true;
}
}
if (newData == true) {
Serial.print("TESTTSETESTST ...... ");
Serial.println(inputdata);
Serial.println(strlen(inputdata));
int inputdata_len = strlen(inputdata) - 10;
Serial.print("INPUTDATALEN: ");
Serial.println(inputdata_len);
inputdata[inputdata_len] = '\0';
Serial.println(inputdata);
Serial.println(strlen(inputdata));
}
}
void showNewData() {
Serial.print("This just in ... ");
Serial.println(inputdata);
Serial.println(strlen(inputdata));
}
void ultrasonicSensor() {
// Send signal
digitalWrite(USTRIGPIN, LOW);
delayMicroseconds(2);
digitalWrite(USTRIGPIN, HIGH);
delayMicroseconds(10);
digitalWrite(USTRIGPIN, LOW);
// Save duration(Oneway x 2)
duration = pulseIn(USECHOPIN, HIGH);
// Calculate distance
Serial.print("Duration: ");
Serial.println(duration);
distance = (duration * 17) / 1000 ;
// Test
Serial.print("Distance: ");
Serial.print(distance);
Serial.println("cm");
// Send thw BLE
wirelessSerial.write(distance);
}
void temphumSensor() {
// Get humidity and temperature
float h = dht.readHumidity();
float t = dht.readTemperature();
// Test
Serial.print("Humidity: ");
Serial.print(h);
Serial.print(" %\t");
Serial.print("Temperature: ");
Serial.print(t);
Serial.println(" *C");
// Send thw BLE
wirelessSerial.write(h);
wirelessSerial.write(t);
}
void magneticSensor() {
// Get mag value
mag_value = digitalRead(MAGPIN);
// Test
Serial.print("Magnetic: ");
Serial.println(mag_value);
// Send thw BLE
wirelessSerial.write(mag_value);
}
void cycleNeoPixel() {
for (int i = 0; i < NUMLED; i++) {
strip.setPixelColor(i % NUMLED, 0, 0, 0);
strip.setPixelColor((i + 1) % NUMLED, 0, 1, 2);
strip.setPixelColor((i + 2) % NUMLED, 0, 4, 3);
strip.setPixelColor((i + 3) % NUMLED, 0, 16, 9);
strip.setPixelColor((i + 4) % NUMLED, 0, 30, 16);
strip.setPixelColor((i + 5) % NUMLED, 0, 60, 31);
strip.setPixelColor((i + 6) % NUMLED, 0, 0, 0);
strip.setPixelColor((i + 7) % NUMLED, 1, 2, 0);
strip.show();
delay(80);
}
}
void offNeoPixel() {
for (int i = 0; i < NUMLED; i++) {
strip.setPixelColor(i, 0, 0, 0);
}
strip.show();
}
void doseDisplay() {
Serial.print("Dose: ");
Serial.println(inputdata);
Serial.println(int(inputdata[0] - '0'));
for (int i = 0; i < 10; i++) {
cycleNeoPixel();
display.showNumberDec(int(inputdata[0] - '0'), false, 1, 3);
}
display.clear();
offNeoPixel();
}
void dateDisplay() {
Serial.print("Last time: ");
Serial.print(inputdata[0]);
Serial.print(inputdata[1]);
Serial.print(inputdata[2]);
Serial.println(inputdata[3]);
display.showNumberDec(int(inputdata[0] - '0'), false, 1, 0);
display.showNumberDec(int(inputdata[1] - '0'), false, 1, 1);
display.showNumberDec(int(inputdata[2] - '0'), false, 1, 2);
display.showNumberDec(int(inputdata[3] - '0'), false, 1, 3);
delay(5000);
display.clear();
}
//---------------Main----------------//
void loop() {
// Data
if (newData == false) {
recvWithEndMarker();
}
// Call Functions
if (newData == true) {
showNewData();
// Menu
if (inputdata[0] == 'A') {
ultrasonicSensor();
}
else if (inputdata[0] == 'B') {
temphumSensor();
}
else if (inputdata[0] == 'C') {
magneticSensor();
}
else if (strlen(inputdata) < 4) {
doseDisplay();
} else {
dateDisplay();
}
newData = false;
}
}
\ No newline at end of file