Add support for direct serial connection #1

Merged
oddstr13 merged 6 commits from configurable-source-device into master 2023-04-07 22:35:07 +00:00
11 changed files with 125 additions and 13 deletions

View File

@ -2,7 +2,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.0.1
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
@ -23,7 +23,7 @@ repos:
- id: detect-private-key
- repo: https://github.com/editorconfig-checker/editorconfig-checker.python
rev: 2.3.54
rev: 2.7.1
hooks:
- id: editorconfig-checker
args:
@ -35,16 +35,19 @@ repos:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910-1
rev: v1.2.0
hooks:
- id: mypy
args:
- "--install-types"
- "--non-interactive"
- repo: https://github.com/psf/black
rev: 21.10b0
rev: 23.3.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.9.3
rev: 5.12.0
hooks:
- id: isort

View File

@ -1,7 +1,8 @@
# Connector
The connector is a RJ12 (6P6C, phone connector with all 6 positions populated)
The interface uses RS-232 levels (±15V)
The connector is a RJ12 (6P6C, phone connector with all 6 positions populated).
The interface uses RS-232 levels (±5V), make sure your adaptor can handle this! (Some adaptors require higher voltages).
TODO: Triple check RS-232 voltages.
## Pinout

12
config-example.yaml Normal file
View File

@ -0,0 +1,12 @@
consumers:
stdio.StdoutConsumer: {}
interface:
name: serial.SerialInterface
params:
port: /dev/ttyUSB0
baudrate: 9600
timeout: 2
# name: feasycom.FeasycomInterface
# params:
# mac: DC:0D:30:9C:61:BA
# timeout: 5

29
misc/test_serial.py Normal file
View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
import os
import sys
from time import sleep
from serial import Serial
print(sys.path)
sys.path.insert(1, os.path.dirname(os.path.dirname(sys.argv[0])))
# from srnemqtt.constants import MAC
# from srnemqtt.lib.feasycom_ble import BTLEUart
from srnemqtt.protocol import construct_request, write # noqa: E402
for rate in [1200, 2400, 4800, 9600, 115200]:
print(rate)
with Serial("/dev/ttyUSB0", baudrate=rate, timeout=2) as x:
sleep(2)
print(x)
write(x, construct_request(0x0E, words=3))
print(x.read(3))
print(x.read(6))
print(x.read(2))
# x.timeout = 2
# print(x.read())
# print(x.read(1))

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from serial import Serial
with Serial("/dev/ttyUSB0", baudrate=9600, timeout=2) as x:
x.write(b"Hello, World!")
print(x.read(13))
print(x.read(13))

View File

@ -3,3 +3,6 @@ rrdtool
bluepy
libscrc
paho-mqtt
pyserial
types-PyYAML

View File

@ -5,28 +5,34 @@ import time
from decimal import Decimal
from typing import cast
from bluepy import btle
from bluepy.btle import BTLEDisconnectError
from serial import SerialException
from .config import get_config, get_consumers
from .constants import MAC
from .lib.feasycom_ble import BTLEUart
from .config import get_config, get_consumers, get_interface
from .protocol import parse_battery_state, parse_historical_entry, try_read_parse
from .solar_types import DataName
from .util import Periodical, log
class CommunicationError(BTLEDisconnectError, SerialException, IOError):
pass
def main():
conf = get_config()
consumers = get_consumers(conf)
per_voltages = Periodical(interval=15)
per_current_hist = Periodical(interval=60)
# import serial
# ser = serial.Serial()
try:
while True:
try:
log("Connecting...")
with BTLEUart(MAC, timeout=5) as dev:
with get_interface() as dev:
log("Connected.")
# write(dev, construct_request(0, 32))
@ -96,7 +102,7 @@ def main():
# else:
# write(wd, CMD_ENABLE_LOAD)
except btle.BTLEDisconnectError:
except CommunicationError:
log("ERROR: Disconnected")
time.sleep(1)

View File

@ -6,6 +6,8 @@ from typing import Any, Dict, List, Optional, Type
import yaml
from srnemqtt.interfaces import BaseInterface
from .consumers import BaseConsumer
@ -52,6 +54,30 @@ def get_consumers(conf: Optional[Dict[str, Any]] = None) -> List[BaseConsumer]:
return consumers
def _get_interface(name: str) -> Type[BaseInterface]:
mod_name, cls_name = name.rsplit(".", 1)
mod = importlib.import_module(f".interfaces.{mod_name}", package=__package__)
res = getattr(mod, cls_name)
assert issubclass(res, BaseInterface)
return res
def get_interface(conf: Optional[Dict[str, Any]] = None) -> BaseInterface:
if conf is None:
conf = get_config()
name = conf["interface"]["name"]
params = conf["interface"].get("params", {})
mod = _get_interface(name)
assert mod
return mod(**params)
if __name__ == "__main__":
conf = get_config()

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from abc import ABCMeta
from io import RawIOBase
class BaseInterface(RawIOBase, metaclass=ABCMeta):
pass

View File

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from ..lib.feasycom_ble import BTLEUart
from . import BaseInterface
class FeasycomInterface(BTLEUart, BaseInterface):
pass
# BTLEUart(mac=MAC, timeout=5)

View File

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
import serial
from . import BaseInterface
class SerialInterface(serial.Serial, BaseInterface):
pass