From 1637a172fca7cef2f352066252f5bc41dd7b8d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominic=20H=C3=B6glinger?= Date: Sat, 17 May 2025 17:45:18 +0200 Subject: [PATCH] st_record.py: Live view of signal values The TUI is overhauled to display a live view of all received signals. All values are rendered in hexadecimal, while events show their latest timestamp and strings are stripped of trailing newlines. --- st_record.py | 88 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 22 deletions(-) diff --git a/st_record.py b/st_record.py index 157fc2c..4c6b2c4 100644 --- a/st_record.py +++ b/st_record.py @@ -532,15 +532,10 @@ def main(): nfile = open(noisefile, 'wb') packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) - print("Signals:") - for var in signals: - print(f" - {var}") - print() - if enable_tui: tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace) else: - record(packet_filter, vcd_sink, enable_verbose_trace) + record(signals, packet_filter, vcd_sink, enable_verbose_trace) vcd_sink.writer.close() dfile.close() @@ -557,7 +552,12 @@ def main(): print(f" - Trace size: {trace_size}") -def record(packet_filter, vcd_sink, enable_verbose_trace): +def record(signals, packet_filter, vcd_sink, enable_verbose_trace): + print("Signals:") + for var in signals: + print(f" - {var}") + print() + packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True)) def onval(timestamp, tag, value, sub): @@ -606,6 +606,50 @@ class TotalMaximumProgressUpdater: self.progress.update(self.progress_task, total=self.maximum) self.progress.update(self.progress_task, completed=value, visible=True) +class SignalTable: + def __init__(self, signals): + self.values = {} + self.arraycache = {} + self.types = {} + + for signal in signals: + name,value_type = signal.split(":") + self.values[name] = None + if len(s := value_type.split("[")) > 1: + value_type = s[0] + array_len = int(s[1].split("]")[0], 0) + self.arraycache[name] = [None] * array_len + self.types[name] = value_type + + def update(self, time, signal, value, sub): + try: + value_str = "" + value_type = self.types[signal] + + match value_type: + case "u8"|"s8": + value_str = f"x{value:02X}" + case "u16"|"s16": + value_str = f"x{value:04X}" + case "u32"|"s32": + value_str = f"x{value:08X}" + case "string": + value_str = value.rstrip("\r\n") + case "event": + value_str = str(time) + + if (signal in self.arraycache) and (sub is not None): + if sub >= len(self.arraycache[signal]): + self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1)) + self.arraycache[signal][sub] = value_str + value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]" + + self.values[signal] = value_str + except Exception as e: + print("EEE", e) + print(f"{sub=} {signal=} {len(self.arraycache[signal])=}") + print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e) + def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace): try: from rich.console import Console @@ -649,28 +693,28 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace): comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) # set up table layout and signal view - signal_values = {name.split(":")[0]:None for name in signals} - - - def on_any_value(time,signal,value,sub): - signal_values[signal].plain(value) - - vcd_sink.onanyvalue(on_any_value) + signal_values = SignalTable(signals) + vcd_sink.onanyvalue(signal_values.update) def generate_table(diag_progress, signal_values): - grid = Table.grid(expand=True) + grid = Table.grid(expand=False) grid.add_column(justify="left") - grid.add_column(justify="left") - grid.add_column(justify="left") - grid.add_row(None) # this empty row is there to not leave behind a render on interrupt + + signals_width = max([len(x) for x in signal_values.values.keys()]) + 2 + sigtable = Table.grid(expand=False) + sigtable.add_column(justify="left", width=10) + sigtable.add_column(justify="left", width=signals_width) + sigtable.add_column(justify="left") + sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt for sig in signals: name, sigtype = sig.split(":") - value = signal_values[name] + value = signal_values.values[name] text = Text("X" if value is None else value, style="red" if value is None else "green") - grid.add_row(sigtype, name, text) - - grid.add_row("Diagnostics", diag_progress) + sigtable.add_row(sigtype, name, text) + grid.add_row(sigtable) + grid.add_row(None) + grid.add_row(diag_progress) return grid with Live(console=console, transient=True) as live_status: