Compare commits

...

2 Commits

Author SHA1 Message Date
1637a172fc st_record.py: Live view of signal values
The TUI is overhauled to display a live view of all received signals.
All values are rendered in hexadecimal, while events show their latest timestamp
and strings are stripped of trailing newlines.
2025-05-17 17:45:18 +02:00
c670df3ce0 WIP Signal TUI view 2025-05-17 16:08:39 +02:00

View File

@ -532,15 +532,10 @@ def main():
nfile = open(noisefile, 'wb') nfile = open(noisefile, 'wb')
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
print("Signals:")
for var in signals:
print(f" - {var}")
print()
if enable_tui: if enable_tui:
tui_record(packet_filter, vcd_sink, enable_verbose_trace) tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace)
else: else:
record(packet_filter, vcd_sink, enable_verbose_trace) record(signals, packet_filter, vcd_sink, enable_verbose_trace)
vcd_sink.writer.close() vcd_sink.writer.close()
dfile.close() dfile.close()
@ -557,7 +552,12 @@ def main():
print(f" - Trace size: {trace_size}") print(f" - Trace size: {trace_size}")
def record(packet_filter, vcd_sink, enable_verbose_trace): def record(signals, packet_filter, vcd_sink, enable_verbose_trace):
print("Signals:")
for var in signals:
print(f" - {var}")
print()
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True)) packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub): def onval(timestamp, tag, value, sub):
@ -606,7 +606,51 @@ class TotalMaximumProgressUpdater:
self.progress.update(self.progress_task, total=self.maximum) self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value, visible=True) self.progress.update(self.progress_task, completed=value, visible=True)
def tui_record(packet_filter, vcd_sink, enable_verbose_trace): class SignalTable:
def __init__(self, signals):
self.values = {}
self.arraycache = {}
self.types = {}
for signal in signals:
name,value_type = signal.split(":")
self.values[name] = None
if len(s := value_type.split("[")) > 1:
value_type = s[0]
array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len
self.types[name] = value_type
def update(self, time, signal, value, sub):
try:
value_str = ""
value_type = self.types[signal]
match value_type:
case "u8"|"s8":
value_str = f"x{value:02X}"
case "u16"|"s16":
value_str = f"x{value:04X}"
case "u32"|"s32":
value_str = f"x{value:08X}"
case "string":
value_str = value.rstrip("\r\n")
case "event":
value_str = str(time)
if (signal in self.arraycache) and (sub is not None):
if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str
except Exception as e:
print("EEE", e)
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
try: try:
from rich.console import Console from rich.console import Console
from rich.text import Text from rich.text import Text
@ -614,6 +658,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
from rich.live import Live from rich.live import Live
from rich.align import Align from rich.align import Align
from rich.table import Table
except: except:
print("error: TUI mode requires the rich package") print("error: TUI mode requires the rich package")
exit() exit()
@ -626,6 +671,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}")) noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("") trace_text = Text("")
# set up progress bars
progress_colums = [ progress_colums = [
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=80, complete_style="gold3", finished_style="red"), BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
@ -646,6 +692,31 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time) render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view
signal_values = SignalTable(signals)
vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values):
grid = Table.grid(expand=False)
grid.add_column(justify="left")
signals_width = max([len(x) for x in signal_values.values.keys()]) + 2
sigtable = Table.grid(expand=False)
sigtable.add_column(justify="left", width=10)
sigtable.add_column(justify="left", width=signals_width)
sigtable.add_column(justify="left")
sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt
for sig in signals:
name, sigtype = sig.split(":")
value = signal_values.values[name]
text = Text("X" if value is None else value, style="red" if value is None else "green")
sigtable.add_row(sigtype, name, text)
grid.add_row(sigtable)
grid.add_row(None)
grid.add_row(diag_progress)
return grid
with Live(console=console, transient=True) as live_status: with Live(console=console, transient=True) as live_status:
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True)) vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True))
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value)) vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
@ -660,7 +731,7 @@ def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
for bstr in sys.stdin.buffer: for bstr in sys.stdin.buffer:
for b in bstr: for b in bstr:
packet_filter.process(b) packet_filter.process(b)
live_status.update(diag_progress) live_status.update(generate_table(diag_progress, signal_values))
except KeyboardInterrupt: except KeyboardInterrupt:
diag_progress.stop() diag_progress.stop()