st_record.py: Update signals in TUI mode each 0.2s and add floats

This commit is contained in:
Dominic Höglinger 2025-05-17 22:19:43 +02:00
parent 5b9bda35af
commit 26319a72d3

View File

@ -26,6 +26,25 @@ def human_readable_size(size, decimal_places=2):
size /= 1024.0 size /= 1024.0
return f"{size:.{decimal_places}f} {unit}" return f"{size:.{decimal_places}f} {unit}"
def parse_time(timestring):
prefixes = {
'p':10e-12,
'n':10e-9,
'u':10e-6,
'm':10e-3,
}
for prefix,factor in prefixes.items():
unit = prefix+'s'
# found unit in string
if unit in timestring:
# remove unit from string, strip any whitespace before it and parse as float
time = float(timestring.split(unit)[0].rstrip(" "))
return time*factor
# no unit attached, assuming seconds
return float(timestring)
def is_hex(s): def is_hex(s):
return all(chr(c) in '0123456789abcdefABCDEF' for c in s) return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
@ -532,8 +551,17 @@ def main():
nfile = open(noisefile, 'wb') nfile = open(noisefile, 'wb')
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
# remove diagnostic signal from view
signals = [s for s in signals if s not in predefined_signals]
# convert timescale to seconds
timescale = parse_time(timescale)
# start recording
if enable_tui: if enable_tui:
tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace) if enable_verbose_trace:
print("warning: verbose trace is not avaialble in TUI mode")
print()
tui_record(signals, packet_filter, vcd_sink, timescale)
else: else:
record(signals, packet_filter, vcd_sink, enable_verbose_trace) record(signals, packet_filter, vcd_sink, enable_verbose_trace)
@ -607,10 +635,12 @@ class TotalMaximumProgressUpdater:
self.progress.update(self.progress_task, completed=value, visible=True) self.progress.update(self.progress_task, completed=value, visible=True)
class SignalTable: class SignalTable:
def __init__(self, signals): def __init__(self, signals, update_interval=1000000):
self.values = {} self.values = {}
self.arraycache = {} self.arraycache = {}
self.types = {} self.types = {}
self.lastupdate = {}
self.update_interval = update_interval
for signal in signals: for signal in signals:
name,value_type = signal.split(":") name,value_type = signal.split(":")
@ -620,9 +650,17 @@ class SignalTable:
array_len = int(s[1].split("]")[0], 0) array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len self.arraycache[name] = [None] * array_len
self.types[name] = value_type self.types[name] = value_type
self.lastupdate[name] = 0
def update(self, time, signal, value, sub): def update(self, time, signal, value, sub):
try: if signal in self.values:
not_enough_time_passed = (time - self.lastupdate[signal]) < self.update_interval
is_array = signal in self.arraycache
is_last_array_index = is_array and (sub == (len(self.arraycache[signal])))
if not_enough_time_passed and is_last_array_index:
return
self.lastupdate[signal] = time
value_str = "" value_str = ""
value_type = self.types[signal] value_type = self.types[signal]
@ -633,24 +671,22 @@ class SignalTable:
value_str = f"x{value:04X}" value_str = f"x{value:04X}"
case "u32"|"s32": case "u32"|"s32":
value_str = f"x{value:08X}" value_str = f"x{value:08X}"
case "f32":
value_str = str(value)
case "string": case "string":
value_str = value.rstrip("\r\n") value_str = value.rstrip("\r\n")
case "event": case "event":
value_str = str(time) value_str = str(time)
if (signal in self.arraycache) and (sub is not None): if is_array and (sub is not None):
if sub >= len(self.arraycache[signal]): if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1)) self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]" value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str self.values[signal] = value_str
except Exception as e:
print("EEE", e)
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace): def tui_record(signals, packet_filter, vcd_sink, timescale):
try: try:
from rich.console import Console from rich.console import Console
from rich.text import Text from rich.text import Text
@ -663,10 +699,6 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
print("error: TUI mode requires the rich package") print("error: TUI mode requires the rich package")
exit() exit()
if enable_verbose_trace:
print("warning: verbose trace is not avaialble in TUI mode")
print()
console = Console() console = Console()
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}")) noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("") trace_text = Text("")
@ -693,7 +725,9 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view # set up table layout and signal view
signal_values = SignalTable(signals) # set its update interval to 0.2 seconds in ticks
signal_update_interval = int(0.2/timescale)
signal_values = SignalTable(signals, signal_update_interval)
vcd_sink.onanyvalue(signal_values.update) vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values): def generate_table(diag_progress, signal_values):