# -*- coding: utf-8 -*- from ast import literal_eval from typing import Iterable, List from table_drawing import table from srnemqtt.protocol import parse_packet def memory_table( data: Iterable[int], start: int = 0, wordsize: int = 2, skip_nullrows: bool = True, ): data_iter = iter(data) data_rows: List[List[str]] = [] position = start try: while True: try: row_id = position row = [] for _ in range(16): try: cell = [] for _ in range(wordsize): d = data_iter.__next__() cell.append(d) finally: if cell: position += 1 row.append(cell) finally: if row: if skip_nullrows: is_null = not any([any(cell) for cell in row]) if is_null: if data_rows[-1]: data_rows.append([]) continue row_hex = [ " ".join([f"{n:02X}" if n is not None else " " for n in cell]) for cell in row ] row_ascii = [ " ".join( [ " " + chr(n) if n is not None and chr(n).isprintable() else " " for n in cell ] ) for cell in row ] row_head = f"{row_id:04X}"[:3] + "·" data_rows.append([row_head] + row_hex) if " ".join(row_ascii).strip(): data_rows.append([""] + row_ascii) except StopIteration: pass headers = [""] + [ ("" if wordsize % 2 else " ") + "···{:01X}".format(x) for x in range(16) ] return table( data_rows, headers=headers, justify="^", # header_interval=8 * 2, collapse_empty=True, ) def parse_log(fh, chunksize=32): # address = None for line in fh.readlines(): if " " not in line: continue date, time, text = line.strip().split(" ", 2) if text.startswith("Reading"): _, addr_txt = text.split(" ") # address = int(addr_txt.strip("."), 16) elif text.startswith("bytearray"): data_txt: str = text.split("(", 1)[1].strip(")") packet = literal_eval(data_txt) # print(address, packet) data = parse_packet(packet) dlen = len(data) for i in range(chunksize): if i < dlen: yield data[i] else: yield None if __name__ == "__main__": with open("z_solar copy.log") as fh: data = list(parse_log(fh)) # print(data) # data = list(range(256)) print( memory_table( data, wordsize=2, skip_nullrows=True, ) ) # # rows = [[f"{x:03X}·"] for x in range(32)] #