Compare commits

..

No commits in common. "1637a172fca7cef2f352066252f5bc41dd7b8d5a" and "90787ec97e1a6a8a51333a7af4aacf27070b458d" have entirely different histories.

View File

@ -532,10 +532,15 @@ def main():
nfile = open(noisefile, 'wb') nfile = open(noisefile, 'wb')
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
print("Signals:")
for var in signals:
print(f" - {var}")
print()
if enable_tui: if enable_tui:
tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace) tui_record(packet_filter, vcd_sink, enable_verbose_trace)
else: else:
record(signals, packet_filter, vcd_sink, enable_verbose_trace) record(packet_filter, vcd_sink, enable_verbose_trace)
vcd_sink.writer.close() vcd_sink.writer.close()
dfile.close() dfile.close()
@ -552,12 +557,7 @@ def main():
print(f" - Trace size: {trace_size}") print(f" - Trace size: {trace_size}")
def record(signals, packet_filter, vcd_sink, enable_verbose_trace): def record(packet_filter, vcd_sink, enable_verbose_trace):
print("Signals:")
for var in signals:
print(f" - {var}")
print()
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True)) packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub): def onval(timestamp, tag, value, sub):
@ -606,51 +606,7 @@ class TotalMaximumProgressUpdater:
self.progress.update(self.progress_task, total=self.maximum) self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value, visible=True) self.progress.update(self.progress_task, completed=value, visible=True)
class SignalTable: def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
def __init__(self, signals):
self.values = {}
self.arraycache = {}
self.types = {}
for signal in signals:
name,value_type = signal.split(":")
self.values[name] = None
if len(s := value_type.split("[")) > 1:
value_type = s[0]
array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len
self.types[name] = value_type
def update(self, time, signal, value, sub):
try:
value_str = ""
value_type = self.types[signal]
match value_type:
case "u8"|"s8":
value_str = f"x{value:02X}"
case "u16"|"s16":
value_str = f"x{value:04X}"
case "u32"|"s32":
value_str = f"x{value:08X}"
case "string":
value_str = value.rstrip("\r\n")
case "event":
value_str = str(time)
if (signal in self.arraycache) and (sub is not None):
if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str
except Exception as e:
print("EEE", e)
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
try: try:
from rich.console import Console from rich.console import Console
from rich.text import Text from rich.text import Text
@ -658,7 +614,6 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
from rich.live import Live from rich.live import Live
from rich.align import Align from rich.align import Align
from rich.table import Table
except: except:
print("error: TUI mode requires the rich package") print("error: TUI mode requires the rich package")
exit() exit()
@ -671,7 +626,6 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}")) noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("") trace_text = Text("")
# set up progress bars
progress_colums = [ progress_colums = [
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=80, complete_style="gold3", finished_style="red"), BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
@ -692,31 +646,6 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time) render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view
signal_values = SignalTable(signals)
vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values):
grid = Table.grid(expand=False)
grid.add_column(justify="left")
signals_width = max([len(x) for x in signal_values.values.keys()]) + 2
sigtable = Table.grid(expand=False)
sigtable.add_column(justify="left", width=10)
sigtable.add_column(justify="left", width=signals_width)
sigtable.add_column(justify="left")
sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt
for sig in signals:
name, sigtype = sig.split(":")
value = signal_values.values[name]
text = Text("X" if value is None else value, style="red" if value is None else "green")
sigtable.add_row(sigtype, name, text)
grid.add_row(sigtable)
grid.add_row(None)
grid.add_row(diag_progress)
return grid
with Live(console=console, transient=True) as live_status: with Live(console=console, transient=True) as live_status:
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True)) vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True))
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value)) vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
@ -731,7 +660,7 @@ def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
for bstr in sys.stdin.buffer: for bstr in sys.stdin.buffer:
for b in bstr: for b in bstr:
packet_filter.process(b) packet_filter.process(b)
live_status.update(generate_table(diag_progress, signal_values)) live_status.update(diag_progress)
except KeyboardInterrupt: except KeyboardInterrupt:
diag_progress.stop() diag_progress.stop()