st_record.py: Live view of signal values

The TUI is overhauled to display a live view of all received signals.
All values are rendered in hexadecimal, while events show their latest timestamp
and strings are stripped of trailing newlines.
This commit is contained in:
Dominic Höglinger 2025-05-17 17:45:18 +02:00
parent c670df3ce0
commit 1637a172fc

View File

@ -532,15 +532,10 @@ def main():
nfile = open(noisefile, 'wb')
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
print("Signals:")
for var in signals:
print(f" - {var}")
print()
if enable_tui:
tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace)
else:
record(packet_filter, vcd_sink, enable_verbose_trace)
record(signals, packet_filter, vcd_sink, enable_verbose_trace)
vcd_sink.writer.close()
dfile.close()
@ -557,7 +552,12 @@ def main():
print(f" - Trace size: {trace_size}")
def record(packet_filter, vcd_sink, enable_verbose_trace):
def record(signals, packet_filter, vcd_sink, enable_verbose_trace):
print("Signals:")
for var in signals:
print(f" - {var}")
print()
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub):
@ -606,6 +606,50 @@ class TotalMaximumProgressUpdater:
self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value, visible=True)
class SignalTable:
def __init__(self, signals):
self.values = {}
self.arraycache = {}
self.types = {}
for signal in signals:
name,value_type = signal.split(":")
self.values[name] = None
if len(s := value_type.split("[")) > 1:
value_type = s[0]
array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len
self.types[name] = value_type
def update(self, time, signal, value, sub):
try:
value_str = ""
value_type = self.types[signal]
match value_type:
case "u8"|"s8":
value_str = f"x{value:02X}"
case "u16"|"s16":
value_str = f"x{value:04X}"
case "u32"|"s32":
value_str = f"x{value:08X}"
case "string":
value_str = value.rstrip("\r\n")
case "event":
value_str = str(time)
if (signal in self.arraycache) and (sub is not None):
if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str
except Exception as e:
print("EEE", e)
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
try:
from rich.console import Console
@ -649,28 +693,28 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view
signal_values = {name.split(":")[0]:None for name in signals}
def on_any_value(time,signal,value,sub):
signal_values[signal].plain(value)
vcd_sink.onanyvalue(on_any_value)
signal_values = SignalTable(signals)
vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values):
grid = Table.grid(expand=True)
grid = Table.grid(expand=False)
grid.add_column(justify="left")
grid.add_column(justify="left")
grid.add_column(justify="left")
grid.add_row(None) # this empty row is there to not leave behind a render on interrupt
signals_width = max([len(x) for x in signal_values.values.keys()]) + 2
sigtable = Table.grid(expand=False)
sigtable.add_column(justify="left", width=10)
sigtable.add_column(justify="left", width=signals_width)
sigtable.add_column(justify="left")
sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt
for sig in signals:
name, sigtype = sig.split(":")
value = signal_values[name]
value = signal_values.values[name]
text = Text("X" if value is None else value, style="red" if value is None else "green")
grid.add_row(sigtype, name, text)
grid.add_row("Diagnostics", diag_progress)
sigtable.add_row(sigtype, name, text)
grid.add_row(sigtable)
grid.add_row(None)
grid.add_row(diag_progress)
return grid
with Live(console=console, transient=True) as live_status: