2021-11-02 22:43:16 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
2021-11-03 21:25:39 +00:00
|
|
|
from ast import literal_eval
|
|
|
|
from typing import Iterable, List
|
2021-11-02 22:43:16 +00:00
|
|
|
|
|
|
|
from table_drawing import table
|
|
|
|
|
2023-01-07 21:18:30 +00:00
|
|
|
from srnemqtt.protocol import parse_packet
|
2023-01-07 17:24:41 +00:00
|
|
|
|
2021-11-02 22:43:16 +00:00
|
|
|
|
2021-11-03 21:25:39 +00:00
|
|
|
def memory_table(
|
|
|
|
data: Iterable[int],
|
|
|
|
start: int = 0,
|
|
|
|
wordsize: int = 2,
|
|
|
|
skip_nullrows: bool = True,
|
|
|
|
):
|
2021-11-02 22:43:16 +00:00
|
|
|
data_iter = iter(data)
|
2021-11-03 21:25:39 +00:00
|
|
|
data_rows: List[List[str]] = []
|
2021-11-02 22:43:16 +00:00
|
|
|
position = start
|
|
|
|
try:
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
row_id = position
|
|
|
|
row = []
|
|
|
|
for _ in range(16):
|
|
|
|
try:
|
|
|
|
cell = []
|
|
|
|
for _ in range(wordsize):
|
2021-11-03 21:25:39 +00:00
|
|
|
d = data_iter.__next__()
|
2021-11-02 22:43:16 +00:00
|
|
|
cell.append(d)
|
|
|
|
finally:
|
|
|
|
if cell:
|
2021-11-03 21:25:39 +00:00
|
|
|
position += 1
|
2021-11-02 22:43:16 +00:00
|
|
|
row.append(cell)
|
|
|
|
finally:
|
|
|
|
if row:
|
2021-11-03 21:25:39 +00:00
|
|
|
if skip_nullrows:
|
|
|
|
is_null = not any([any(cell) for cell in row])
|
|
|
|
if is_null:
|
|
|
|
if data_rows[-1]:
|
|
|
|
data_rows.append([])
|
|
|
|
continue
|
|
|
|
row_hex = [
|
|
|
|
" ".join([f"{n:02X}" if n is not None else " " for n in cell])
|
|
|
|
for cell in row
|
|
|
|
]
|
2021-11-02 22:43:16 +00:00
|
|
|
row_ascii = [
|
|
|
|
" ".join(
|
|
|
|
[
|
2021-11-03 21:25:39 +00:00
|
|
|
" " + chr(n)
|
|
|
|
if n is not None and chr(n).isprintable()
|
|
|
|
else " "
|
2021-11-02 22:43:16 +00:00
|
|
|
for n in cell
|
|
|
|
]
|
|
|
|
)
|
|
|
|
for cell in row
|
|
|
|
]
|
|
|
|
row_head = f"{row_id:04X}"[:3] + "·"
|
|
|
|
data_rows.append([row_head] + row_hex)
|
|
|
|
if " ".join(row_ascii).strip():
|
|
|
|
data_rows.append([""] + row_ascii)
|
|
|
|
except StopIteration:
|
|
|
|
pass
|
|
|
|
|
|
|
|
headers = [""] + [
|
|
|
|
("" if wordsize % 2 else " ") + "···{:01X}".format(x) for x in range(16)
|
|
|
|
]
|
|
|
|
return table(
|
|
|
|
data_rows,
|
|
|
|
headers=headers,
|
|
|
|
justify="^",
|
2021-11-03 21:25:39 +00:00
|
|
|
# header_interval=8 * 2,
|
|
|
|
collapse_empty=True,
|
2021-11-02 22:43:16 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
2021-11-03 21:25:39 +00:00
|
|
|
def parse_log(fh, chunksize=32):
|
|
|
|
# address = None
|
|
|
|
for line in fh.readlines():
|
|
|
|
if " " not in line:
|
|
|
|
continue
|
|
|
|
|
|
|
|
date, time, text = line.strip().split(" ", 2)
|
|
|
|
|
|
|
|
if text.startswith("Reading"):
|
|
|
|
_, addr_txt = text.split(" ")
|
|
|
|
# address = int(addr_txt.strip("."), 16)
|
|
|
|
|
|
|
|
elif text.startswith("bytearray"):
|
|
|
|
data_txt: str = text.split("(", 1)[1].strip(")")
|
|
|
|
packet = literal_eval(data_txt)
|
|
|
|
# print(address, packet)
|
2021-11-02 22:43:16 +00:00
|
|
|
|
2021-11-03 21:25:39 +00:00
|
|
|
data = parse_packet(packet)
|
|
|
|
dlen = len(data)
|
|
|
|
for i in range(chunksize):
|
|
|
|
if i < dlen:
|
|
|
|
yield data[i]
|
|
|
|
else:
|
|
|
|
yield None
|
|
|
|
|
|
|
|
|
|
|
|
with open("z_solar copy.log") as fh:
|
|
|
|
data = list(parse_log(fh))
|
|
|
|
# print(data)
|
|
|
|
|
|
|
|
# data = list(range(256))
|
|
|
|
|
|
|
|
|
|
|
|
print(
|
|
|
|
memory_table(
|
|
|
|
data,
|
|
|
|
wordsize=2,
|
|
|
|
skip_nullrows=True,
|
|
|
|
)
|
|
|
|
)
|
2021-11-02 22:43:16 +00:00
|
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
# rows = [[f"{x:03X}·"] for x in range(32)]
|
|
|
|
|
|
|
|
|
|
|
|
#
|