Compare commits

...

2 Commits

Author SHA1 Message Date
1637a172fc st_record.py: Live view of signal values
The TUI is overhauled to display a live view of all received signals.
All values are rendered in hexadecimal, while events show their latest timestamp
and strings are stripped of trailing newlines.
2025-05-17 17:45:18 +02:00
c670df3ce0 WIP Signal TUI view 2025-05-17 16:08:39 +02:00

View File

@ -532,15 +532,10 @@ def main():
nfile = open(noisefile, 'wb')
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
print("Signals:")
for var in signals:
print(f" - {var}")
print()
if enable_tui:
tui_record(packet_filter, vcd_sink, enable_verbose_trace)
tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace)
else:
record(packet_filter, vcd_sink, enable_verbose_trace)
record(signals, packet_filter, vcd_sink, enable_verbose_trace)
vcd_sink.writer.close()
dfile.close()
@ -557,7 +552,12 @@ def main():
print(f" - Trace size: {trace_size}")
def record(packet_filter, vcd_sink, enable_verbose_trace):
def record(signals, packet_filter, vcd_sink, enable_verbose_trace):
print("Signals:")
for var in signals:
print(f" - {var}")
print()
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub):
@ -606,7 +606,51 @@ class TotalMaximumProgressUpdater:
self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value, visible=True)
def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
class SignalTable:
def __init__(self, signals):
self.values = {}
self.arraycache = {}
self.types = {}
for signal in signals:
name,value_type = signal.split(":")
self.values[name] = None
if len(s := value_type.split("[")) > 1:
value_type = s[0]
array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len
self.types[name] = value_type
def update(self, time, signal, value, sub):
try:
value_str = ""
value_type = self.types[signal]
match value_type:
case "u8"|"s8":
value_str = f"x{value:02X}"
case "u16"|"s16":
value_str = f"x{value:04X}"
case "u32"|"s32":
value_str = f"x{value:08X}"
case "string":
value_str = value.rstrip("\r\n")
case "event":
value_str = str(time)
if (signal in self.arraycache) and (sub is not None):
if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str
except Exception as e:
print("EEE", e)
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
try:
from rich.console import Console
from rich.text import Text
@ -614,6 +658,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
from rich.live import Live
from rich.align import Align
from rich.table import Table
except:
print("error: TUI mode requires the rich package")
exit()
@ -626,6 +671,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("")
# set up progress bars
progress_colums = [
TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
@ -646,6 +692,31 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view
signal_values = SignalTable(signals)
vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values):
grid = Table.grid(expand=False)
grid.add_column(justify="left")
signals_width = max([len(x) for x in signal_values.values.keys()]) + 2
sigtable = Table.grid(expand=False)
sigtable.add_column(justify="left", width=10)
sigtable.add_column(justify="left", width=signals_width)
sigtable.add_column(justify="left")
sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt
for sig in signals:
name, sigtype = sig.split(":")
value = signal_values.values[name]
text = Text("X" if value is None else value, style="red" if value is None else "green")
sigtable.add_row(sigtype, name, text)
grid.add_row(sigtable)
grid.add_row(None)
grid.add_row(diag_progress)
return grid
with Live(console=console, transient=True) as live_status:
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True))
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
@ -660,7 +731,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
for bstr in sys.stdin.buffer:
for b in bstr:
packet_filter.process(b)
live_status.update(diag_progress)
live_status.update(generate_table(diag_progress, signal_values))
except KeyboardInterrupt:
diag_progress.stop()