# MIT License
#
# Original author: Zoltan Szarvas
# https://github.com/szazo/DHT11_Python
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#!/usr/bin/python3
"""Read temperature and humidity from a DHT11 / DHT22 / AM2302 sensor.

Command-line usage::

    dht.py [11|22|2302] [gpio#]

The sensor is sampled by busy-polling a single GPIO pin through RPi.GPIO and
decoding the lengths of the high pulses into 40 bits (4 data bytes followed by
1 checksum byte).
"""
import time
import sys

try:
    import RPi.GPIO as GPIO
except ImportError:
    # Keep the module importable off-device so the pure decoding helpers can
    # still be used; main() reports the missing dependency before any GPIO use.
    GPIO = None


class DHTResult:
    """DHTxx sensor result returned by the DHT.read() method."""

    ERR_NO_ERROR = 0
    ERR_MISSING_DATA = 1
    ERR_CRC = 2
    ERR_NOT_FOUND = 3

    # Class-level defaults; every instance overrides them in __init__.
    error_code = ERR_NO_ERROR
    temperature = -1
    humidity = -1

    def __init__(self, error_code, temperature, humidity):
        """Store the outcome of one read attempt.

        error_code is one of the ERR_* constants; temperature and humidity are
        the decoded values (0 when the read failed).
        """
        self.error_code = error_code
        self.temperature = temperature
        self.humidity = humidity

    def is_valid(self):
        """Return True when the read completed without error."""
        return self.error_code == DHTResult.ERR_NO_ERROR


class DHT:
    """DHTxx sensor reader class for Raspberry Pi."""

    __pin = 0
    __isDht11 = True

    def __init__(self, pin, isDht11):
        """Remember the GPIO pin and whether the sensor is a DHT11.

        pin is passed to GPIO.setup/GPIO.input as-is; isDht11 selects the
        DHT11 decoding, otherwise the DHT22 decoding is used.
        """
        self.__pin = pin
        self.__isDht11 = isDht11

    def read(self):
        """Trigger the sensor once and decode its reply.

        Returns a DHTResult. On failure the error code is ERR_NOT_FOUND (no
        data pulses seen), ERR_MISSING_DATA (pulse count is not 40) or ERR_CRC
        (checksum mismatch), with temperature and humidity set to 0.
        """
        GPIO.setup(self.__pin, GPIO.OUT)

        # send initial high
        self.__send_and_sleep(GPIO.HIGH, 0.05)

        # pull down to low
        self.__send_and_sleep(GPIO.LOW, 0.02)

        # change to input using pull up
        GPIO.setup(self.__pin, GPIO.IN, GPIO.PUD_UP)

        # collect data into an array
        data = self.__collect_input()

        # parse lengths of all data pull up periods
        pull_up_lengths = self.__parse_data_pull_up_lengths(data)
        if not pull_up_lengths:
            return DHTResult(DHTResult.ERR_NOT_FOUND, 0, 0)

        # if bit count mismatch, return error (4 byte data + 1 byte checksum)
        if len(pull_up_lengths) != 40:
            return DHTResult(DHTResult.ERR_MISSING_DATA, 0, 0)

        # calculate bits from lengths of the pull up periods
        bits = self.__calculate_bits(pull_up_lengths)

        # we have the bits, calculate bytes
        the_bytes = self.__bits_to_bytes(bits)

        # calculate checksum and check
        checksum = self.__calculate_checksum(the_bytes)
        if the_bytes[4] != checksum:
            return DHTResult(DHTResult.ERR_CRC, 0, 0)

        # ok, we have valid data
        temperature, humidity = self.__decode_values(the_bytes)
        return DHTResult(DHTResult.ERR_NO_ERROR, temperature, humidity)

    def __decode_values(self, the_bytes):
        """Convert the four data bytes into (temperature, humidity).

        The meaning of the sensor bytes:
          the_bytes[0]: humidity int
          the_bytes[1]: humidity decimal
          the_bytes[2]: temperature int
          the_bytes[3]: temperature decimal
        """
        if self.__isDht11:
            # DHT11: integer part plus tenths.
            temperature = the_bytes[2] + float(the_bytes[3]) / 10
            humidity = the_bytes[0] + float(the_bytes[1]) / 10
            return temperature, humidity

        # DHT22: 16-bit values in tenths; the top bit of the temperature high
        # byte is the sign flag, so it is masked out of the magnitude.
        c = float(((the_bytes[2] & 0x7F) << 8) + the_bytes[3]) / 10
        if c > 125:
            # Out-of-range magnitude: fall back to the raw high byte.
            c = the_bytes[2]
        if the_bytes[2] & 0x80:
            c = -c
        humidity = ((the_bytes[0] << 8) + the_bytes[1]) / 10.00
        return c, humidity

    def __send_and_sleep(self, output, sleep):
        """Drive the pin to `output` and hold it for `sleep` seconds."""
        GPIO.output(self.__pin, output)
        time.sleep(sleep)

    def __collect_input(self):
        """Sample the pin until its level stops changing; return all samples."""
        # collect the data while unchanged found
        unchanged_count = 0

        # this is used to determine where is the end of the data
        max_unchanged_count = 100

        last = -1
        data = []
        while True:
            current = GPIO.input(self.__pin)
            data.append(current)
            if last != current:
                unchanged_count = 0
                last = current
            else:
                unchanged_count += 1
                if unchanged_count > max_unchanged_count:
                    break

        return data

    def __parse_data_pull_up_lengths(self, data):
        """Return the length, in samples, of every data pull up period.

        A small state machine skips the initial low/high/low preamble and then
        measures each high period that is terminated by a low sample.
        """
        STATE_INIT_PULL_DOWN = 1
        STATE_INIT_PULL_UP = 2
        STATE_DATA_FIRST_PULL_DOWN = 3
        STATE_DATA_PULL_UP = 4
        STATE_DATA_PULL_DOWN = 5

        state = STATE_INIT_PULL_DOWN

        lengths = []  # will contain the lengths of data pull up periods
        current_length = 0  # will contain the length of the previous period

        for current in data:
            current_length += 1

            if state == STATE_INIT_PULL_DOWN:
                if current == GPIO.LOW:
                    # ok, we got the initial pull down
                    state = STATE_INIT_PULL_UP
            elif state == STATE_INIT_PULL_UP:
                if current == GPIO.HIGH:
                    # ok, we got the initial pull up
                    state = STATE_DATA_FIRST_PULL_DOWN
            elif state == STATE_DATA_FIRST_PULL_DOWN:
                if current == GPIO.LOW:
                    # we have the initial pull down,
                    # the next will be the data pull up
                    state = STATE_DATA_PULL_UP
            elif state == STATE_DATA_PULL_UP:
                if current == GPIO.HIGH:
                    # data pulled up, the length of this pull up
                    # will determine whether it is 0 or 1
                    current_length = 0
                    state = STATE_DATA_PULL_DOWN
            elif state == STATE_DATA_PULL_DOWN:
                if current == GPIO.LOW:
                    # pulled down, we store the length of
                    # the previous pull up period
                    lengths.append(current_length)
                    state = STATE_DATA_PULL_UP

        return lengths

    def __calculate_bits(self, pull_up_lengths):
        """Classify each pull up length as a 1 (long) or 0 (short) bit.

        The threshold is the midpoint between the shortest and longest period;
        a length strictly above it is a True bit. An empty input gives [].
        """
        if not pull_up_lengths:
            return []

        shortest_pull_up = min(pull_up_lengths)
        longest_pull_up = max(pull_up_lengths)

        # use the halfway to determine whether the period it is long or short
        halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 2
        return [length > halfway for length in pull_up_lengths]

    def __bits_to_bytes(self, bits):
        """Pack bits, most significant first, into a list of byte values.

        Trailing bits that do not fill a whole byte are discarded.
        """
        the_bytes = []
        byte = 0

        for position, bit in enumerate(bits, start=1):
            byte = byte << 1
            if bit:
                byte = byte | 1
            if position % 8 == 0:
                the_bytes.append(byte)
                byte = 0

        return the_bytes

    def __calculate_checksum(self, the_bytes):
        """Return the low 8 bits of the sum of the four data bytes."""
        return (the_bytes[0] + the_bytes[1] + the_bytes[2] + the_bytes[3]) & 255


# read data
MAX_ATTEMPTS = 15
MAX_NOT_FOUND_ATTEMPTS = 3


def _parse_args(argv):
    """Parse command line parameters into (isDht11, pin).

    Raises ValueError carrying the message to show the user when the argument
    count, the sensor name or the GPIO number is not acceptable.
    """
    if len(argv) != 3:
        raise ValueError('usage: dht.py [11|22|2302] [gpio#]')

    sensor = argv[1]
    if sensor not in ("11", "22", "2302"):
        raise ValueError(
            'Sensor "{}" is not valid. Use 11, 22 or 2302'.format(sensor))

    try:
        pin = int(argv[2])
    except ValueError:
        pin = None
    if pin is None or pin < 2 or pin > 27:
        # Report the raw argument: no integer exists when int() failed.
        raise ValueError('Gpio {} is not valid'.format(argv[2]))

    return sensor == "11", pin


def main(argv=None):
    """Run the command-line reader and return the process exit status.

    argv defaults to sys.argv. Returns 1 for invalid arguments or a missing
    RPi.GPIO module, otherwise 0 (also when no valid reading was obtained).
    """
    if argv is None:
        argv = sys.argv

    try:
        isDht11, pin = _parse_args(argv)
    except ValueError as error:
        print(error)
        return 1

    if GPIO is None:
        print('RPi.GPIO is not available; this script must run on a '
              'Raspberry Pi with RPi.GPIO installed')
        return 1

    # initialize GPIO
    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BCM)

    try:
        dht = DHT(pin, isDht11)
        result = dht.read()
        not_found_attempts = 0

        # make some attempts, because someone may not be successful
        for _ in range(MAX_ATTEMPTS):
            if result.is_valid() or not_found_attempts == MAX_NOT_FOUND_ATTEMPTS:
                break
            time.sleep(2)
            result = dht.read()
            if result.error_code == DHTResult.ERR_NOT_FOUND:
                not_found_attempts += 1
            else:
                not_found_attempts = 0

        # print result
        if result.is_valid():
            print('Temp: {0:0.1f} C Humidity: {1:0.1f} %'.format(
                result.temperature, result.humidity))
        else:
            print('Failed to get reading. Is the sensor connected? '
                  'Is the pin number correct?')
    finally:
        # clean the gpio, even if reading was interrupted or raised
        GPIO.cleanup()

    return 0


if __name__ == "__main__":
    sys.exit(main())