Refractor to use feasycom_ble

This commit is contained in:
Odd Stråbø 2021-11-05 06:51:35 +01:00
parent 30246b9355
commit c91c819a42

View file

@ -5,10 +5,13 @@ import datetime
import struct import struct
import sys import sys
import time import time
from io import RawIOBase
from bluepy import btle from bluepy import btle
from libscrc import modbus from libscrc import modbus
from feasycom_ble import BTLEUart
MAC = "DC:0D:30:9C:61:BA" MAC = "DC:0D:30:9C:61:BA"
INTERVAL = 15 INTERVAL = 15
@ -158,14 +161,8 @@ CMD_ = b"\xff\x78\x00\x00\x00\x01"
STATUS = {} STATUS = {}
def parsePacket(data): # GET_BATTERY_STATE
timestamp = datetime.datetime.utcnow().isoformat(" ") def parse_battery_state(data: bytes) -> dict:
# prefix = data[0]
operation = data[1]
cc = data[2]
res = None
if operation == 3:
if cc == 0x0E: # GET_BATTERY_STATE
res = dict( res = dict(
zip( zip(
( (
@ -178,7 +175,7 @@ def parsePacket(data):
"load_current", "load_current",
"load_power", "load_power",
), ),
struct.unpack("!xxxHHHbbHHHxx", data), struct.unpack("!HHHbbHHH", data),
) )
) )
res["battery_voltage"] /= 10 res["battery_voltage"] /= 10
@ -187,43 +184,28 @@ def parsePacket(data):
res["load_current"] /= 100 res["load_current"] /= 100
STATUS.update(res) STATUS.update(res)
elif cc == 0x08: # GET_PANEL_STATUS (OR version) log(str(res))
return res
# GET_PANEL_STATUS
def parse_panel_status(data: bytes) -> dict:
res = dict( res = dict(
zip( zip(
("panel_voltage", "panel_current", "panel_power", "load_enabled"), ("panel_voltage", "panel_current", "panel_power", "load_enabled"),
struct.unpack("!xxxHHHx?xx", data), struct.unpack("!HHHx?", data),
) )
) )
res["panel_voltage"] /= 10 res["panel_voltage"] /= 10
res["panel_current"] /= 100 res["panel_current"] /= 100
STATUS.update(res) STATUS.update(res)
elif operation == 6 and cc == 1:
res = dict(zip(("load_enabled",), struct.unpack("!xxxxx?xx", data)))
STATUS.update(res)
if res: # elif operation == 6 and cc == 1:
print(timestamp, res) # res = dict(zip(("load_enabled",), struct.unpack("!xxxxx?xx", data)))
# STATUS.update(res)
log(str(res))
return res return res
print(timestamp, data)
sys.stdout.flush()
class Delegate(btle.DefaultDelegate):
data = bytearray()
def handleNotification(self, cHandle, data):
# print(cHandle, data, dlen)
self.data.extend(data)
c_crc = modbus(bytes(self.data[:-2]))
# byte order is inverted in regards to libscrc output
d_crc = self.data[-1] << 8 | self.data[-2]
# print(hex(c_crc), hex(d_crc))
if c_crc == d_crc:
parsePacket(self.data)
self.data.clear()
def write(fh, data): def write(fh, data):
@ -238,18 +220,6 @@ def construct_request(address, words=1, action=ACTION_READ, marker=0xFF):
return struct.pack("!BBHH", marker, action, address, words) return struct.pack("!BBHH", marker, action, address, words)
def poll(dev: btle.Peripheral, timeout: float = 1) -> bool:
start = time.time()
while not dev.waitForNotifications(0.2):
if time.time() < start + timeout:
return False
while dev.waitForNotifications(0.2):
pass
return True
def log(string: str): def log(string: str):
print(datetime.datetime.utcnow().isoformat(" "), string) print(datetime.datetime.utcnow().isoformat(" "), string)
sys.stdout.flush() sys.stdout.flush()
@ -275,51 +245,50 @@ def parse_packet(data):
return payload return payload
def readMemory(fh: RawIOBase, address: int, words: int = 1):
write(fh, construct_request(address, words=words))
header = fh.read(3)
if header and len(header) == 3:
tag, operation, size = header
data = fh.read(size)
_crc = fh.read(2)
if data and _crc:
crc = struct.unpack_from("<H", _crc)[0]
calculated_crc = modbus(bytes([tag, operation, size, *data]))
if crc == calculated_crc:
return data
if __name__ == "__main__": if __name__ == "__main__":
dlgt = Delegate()
prev = time.time() - INTERVAL prev = time.time() - INTERVAL
while True: while True:
try: try:
log("Connecting...") log("Connecting...")
with btle.Peripheral(MAC).withDelegate(dlgt) as dev: with BTLEUart(MAC, timeout=10) as dev:
from feasycom_ble import WRITE_DEVICE
wd = dev.getCharacteristics(uuid=WRITE_DEVICE)[0]
log("Connected.") log("Connected.")
poll(dev) # write(dev, construct_request(0, 32))
write(wd, construct_request(0, 32))
poll(dev)
poll(dev)
poll(dev)
# Memory dump # Memory dump
# for address in range(0, 0x10000, 16): # for address in range(0, 0x10000, 16):
# log(f"Reading 0x{address:04X}...") # log(f"Reading 0x{address:04X}...")
# write(wd, construct_request(address, 16)) # write(wd, construct_request(address, 16))
# poll(dev)
# poll(dev)
# poll(dev)
# poll(dev)
# poll(dev)
while True: while True:
poll(dev)
now = time.time() now = time.time()
diff = now - prev diff = now - prev
if diff >= INTERVAL: if diff >= INTERVAL:
prev += INTERVAL prev += INTERVAL
write(wd, construct_request(0x0107, 4)) # CMD_GET_PANEL_STATUS res = readMemory(dev, 0x0107, 4) # CMD_GET_PANEL_STATUS
poll(dev) if res:
parse_panel_status(res)
write(wd, construct_request(0x0100, 7)) # CMD_GET_BATTERY_STATE res = readMemory(dev, 0x0100, 7) # CMD_GET_BATTERY_STATE
poll(dev) if res:
parse_battery_state(res)
# if STATUS.get('load_enabled'): # if STATUS.get('load_enabled'):
# write(wd, CMD_DISABLE_LOAD) # write(wd, CMD_DISABLE_LOAD)