# -*- coding: utf-8 -*- # original: https://raw.githubusercontent.com/UedaTakeyuki/slider/master/mh_z19.py # # © Takeyuki UEDA 2015 - import serial import time import subprocess import traceback import struct import platform import argparse import sys import json import os.path # setting version = "0.3.9" serial_dev = 'COM3' # major version of running python p_ver = platform.python_version_tuple()[0] def set_serialdevice(serialdevicename): global serial_dev serial_dev = serialdevicename def connect_serial(): return serial.Serial(serial_dev, baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1.0) def mh_z19(ser): try: while 1: result=ser.write(b"\xff\x01\x86\x00\x00\x00\x00\x00\x79") s=ser.read(9) if p_ver == '2': if len(s) >= 4 and s[0] == "\xff" and s[1] == "\x86": return {'co2': ord(s[2])*256 + ord(s[3])} break else: if len(s) >= 4 and s[0] == 0xff and s[1] == 0x86: return {'co2': s[2]*256 + s[3]} break except: traceback.print_exc() def read(ser): result = mh_z19(ser) if result is not None: return result def read_all(ser): try: while 1: result=ser.write(b"\xff\x01\x86\x00\x00\x00\x00\x00\x79") s=ser.read(9) if p_ver == '2': if len(s) >= 9 and s[0] == "\xff" and s[1] == "\x86": return {'co2': ord(s[2])*256 + ord(s[3]), 'temperature': ord(s[4]) - 40, 'TT': ord(s[4]), 'SS': ord(s[5]), 'UhUl': ord(s[6])*256 + ord(s[7]) } break else: if len(s) >= 9 and s[0] == 0xff and s[1] == 0x86: return {'co2': s[2]*256 + s[3], 'temperature': s[4] - 40, 'TT': s[4], 'SS': s[5], 'UhUl': s[6]*256 + s[7] } break except: traceback.print_exc() if result is not None: return result def abc_on(): ser = connect_serial() result=ser.write(b"\xff\x01\x79\xa0\x00\x00\x00\x00\xe6") ser.close() def abc_off(): ser = connect_serial() result=ser.write(b"\xff\x01\x79\x00\x00\x00\x00\x00\x86") ser.close() def span_point_calibration(span): ser = connect_serial() if p_ver == '2': b3 = span / 256; else: b3 = span // 256; byte3 = struct.pack('B', b3) b4 = span % 256; byte4 = struct.pack('B', b4) c = checksum([0x01, 0x88, b3, b4]) request = b"\xff\x01\x88" + byte3 + byte4 + b"\x00\x00\x00" + c result = ser.write(request) ser.close() def zero_point_calibration(): ser = connect_serial() request = b"\xff\x01\x87\x00\x00\x00\x00\x00\x78" result = ser.write(request) ser.close() def detection_range_5000(): ser = connect_serial() request = b"\xff\x01\x99\x00\x00\x00\x13\x88\xcb" result = ser.write(request) ser.close() def detection_range_2000(): ser = connect_serial() request = b"\xff\x01\x99\x00\x00\x00\x07\xd0\x8F" result = ser.write(request) ser.close() def checksum(array): return struct.pack('B', 0xff - (sum(array) % 0x100) + 1) if __name__ == '__main__': # value = read() # print (value) parser = argparse.ArgumentParser( description='''return CO2 concentration as object as {'co2': 416}''', ) group = parser.add_mutually_exclusive_group() group.add_argument("--serial_device", type=str, help='''Use this serial device file''') group.add_argument("--version", action='store_true', help='''show version''') group.add_argument("--all", action='store_true', help='''return all (co2, temperature, TT, SS and UhUl) as json''') group.add_argument("--abc_on", action='store_true', help='''Set ABC functionality on model B as ON.''') group.add_argument("--abc_off", action='store_true', help='''Set ABC functionality on model B as OFF.''') parser.add_argument("--span_point_calibration", type=int, help='''Call calibration function with SPAN point''') parser.add_argument("--zero_point_calibration", action='store_true', help='''Call calibration function with ZERO point''') parser.add_argument("--detection_range_5000", action='store_true', help='''Set detection range as 5000''') parser.add_argument("--detection_range_2000", action='store_true', help='''Set detection range as 2000''') args = parser.parse_args() if args.serial_device is not None: set_serialdevice(args.serial_device) if args.abc_on: abc_on() print ("Set ABC logic as on.") elif args.abc_off: abc_off() print ("Set ABC logic as off.") elif args.span_point_calibration is not None: span_point_calibration(args.span_point_calibration) print ("Call Calibration with SPAN point.") elif args.zero_point_calibration: print ("Call Calibration with ZERO point.") zero_point_calibration() elif args.detection_range_5000: detection_range_5000() print ("Set Detection range as 5000.") elif args.detection_range_2000: detection_range_2000() print ("Set Detection range as 2000.") elif args.version: print (version) elif args.all: value = read_all(ser) print (json.dumps(value)) else: ser = connect_serial() while True: value = read_all(ser) print (json.dumps(value)) time.sleep(5) sys.exit(0)