st_record.py: TUI Update

st_record now features the "--tui" flag,
which when set uses the rich package to display live
diagnostic values via its progress bar renderable.

These are displayed regardless of the "--diagnostic" flag,
as this one only regards the recorded dump.
This commit is contained in:
Dominic Höglinger 2025-05-15 22:18:02 +02:00
parent b76a97e394
commit 93c6d6cdd2

View File

@ -218,16 +218,23 @@ class Filter:
FLAGS_COMPRESSED = (1 << 0) FLAGS_COMPRESSED = (1 << 0)
def __init__(self, on_value, on_noise): def __init__(self, on_value):
self.preamble_i = 0 self.preamble_i = 0
self.epilouge_i = 0 self.epilouge_i = 0
self.packet_buffer = [] self.packet_buffer = []
self.noise_buffer = [] self.noise_buffer = []
self.process_value = on_value self.process_value = on_value
self.process_noise = on_noise self._on_noise = []
self.packet_counter = 0 self.packet_counter = 0
self.packets_dropped = 0 self.packets_dropped = 0
def onnoise(self, cb):
self._on_noise.append(cb)
def _emit_noise(self, b):
for cb in self._on_noise:
cb(b)
def process(self, b): def process(self, b):
if self.preamble_i == (len(self.PREAMBLE)): if self.preamble_i == (len(self.PREAMBLE)):
self.packet_buffer.append(b) self.packet_buffer.append(b)
@ -239,6 +246,7 @@ class Filter:
if self.epilouge_i == (len(self.EPILOUGE)): if self.epilouge_i == (len(self.EPILOUGE)):
self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)]) self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)])
self.packet_buffer = [] self.packet_buffer = []
self.noise_buffer = []
self.preamble_i = 0 self.preamble_i = 0
self.epilouge_i = 0 self.epilouge_i = 0
else: else:
@ -248,9 +256,9 @@ class Filter:
else: else:
self.preamble_i = 0 self.preamble_i = 0
for nb in self.noise_buffer: for nb in self.noise_buffer:
self.process_noise(nb) self._emit_noise(nb)
self.noise_buffer = [] self.noise_buffer = []
self.process_noise(b) self._emit_noise(b)
def disassemble_packet(self, packet): def disassemble_packet(self, packet):
try: try:
@ -340,15 +348,16 @@ class Retagger:
self.packets_dropped += 1 self.packets_dropped += 1
class VcdSink: class VcdSink:
def __init__(self, fs, signals, timescale='1 us', verbose_trace=False): def __init__(self, fs, signals, timescale='1 us'):
self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0") self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0")
self.skalars = {} self.skalars = {}
self.arrays = {} self.arrays = {}
self.strings = {} self.strings = {}
self.varnames = {} self.varnames = {}
self._onvalues = {}
self._onanyvalues = []
self.timestamp = 0 self.timestamp = 0
self.packets_dropped = 0 self.packets_dropped = 0
self.verbose_trace = verbose_trace
for v in signals: for v in signals:
hvar, vtype = v.split(":") hvar, vtype = v.split(":")
hier, _, name = hvar.rpartition(".") hier, _, name = hvar.rpartition(".")
@ -391,6 +400,18 @@ class VcdSink:
else: else:
self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize) self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
def onvalue(self, tag, cb):
self._onvalues[tag] = cb
def onanyvalue(self, cb):
self._onanyvalues.append(cb)
def _emit(self, timestamp, tag, value, sub=None):
for cb in self._onanyvalues:
cb(timestamp, tag, value, sub)
if tag in self._onvalues:
self._onvalues[tag](timestamp, value, sub)
def process(self, tag, value, sub, datatag): def process(self, tag, value, sub, datatag):
if datatag[0] == 'D': if datatag[0] == 'D':
self.timestamp += value self.timestamp += value
@ -398,8 +419,6 @@ class VcdSink:
elif datatag[0] == 'A': elif datatag[0] == 'A':
timestamp = self.timestamp timestamp = self.timestamp
try: try:
if self.verbose_trace:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True)
self.writer.change(self.arrays[tag][sub], timestamp, value) self.writer.change(self.arrays[tag][sub], timestamp, value)
except ValueError: except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
@ -410,6 +429,7 @@ class VcdSink:
except: except:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
self.packets_dropped += 1 self.packets_dropped += 1
self._emit(timestamp, tag, value, sub)
elif datatag == 'S4': elif datatag == 'S4':
timestamp = self.timestamp timestamp = self.timestamp
# unpack # unpack
@ -421,8 +441,6 @@ class VcdSink:
if sub == 1: if sub == 1:
try: try:
string = self.strings[tag][1] string = self.strings[tag][1]
if self.verbose_trace:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{string}\"", flush=True)
self.writer.change(self.strings[tag][0], timestamp, string) self.writer.change(self.strings[tag][0], timestamp, string)
except ValueError: except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
@ -434,6 +452,7 @@ class VcdSink:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
self.packets_dropped += 1 self.packets_dropped += 1
self.strings[tag][1] = "" self.strings[tag][1] = ""
self._emit(timestamp, tag, string, None)
# skalar values # skalar values
elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'): elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
@ -443,8 +462,6 @@ class VcdSink:
value = True value = True
elif datatag == 'F4': elif datatag == 'F4':
value = struct.unpack(">f", struct.pack(">L", value))[0] value = struct.unpack(">f", struct.pack(">L", value))[0]
if self.verbose_trace:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value:08X}", flush=True)
self.writer.change(self.skalars[tag], timestamp, value) self.writer.change(self.skalars[tag], timestamp, value)
except ValueError: except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
@ -455,11 +472,7 @@ class VcdSink:
except: except:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True) print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
self.packets_dropped += 1 self.packets_dropped += 1
self._emit(timestamp, tag, value, None)
def process_noise(noisefile, b):
print(chr(b), end="", flush=True)
if noisefile:
noisefile.write(b.to_bytes(1))
def main(): def main():
parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file") parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file")
@ -475,6 +488,8 @@ def main():
help='add additional signals tracing internal state of PET') help='add additional signals tracing internal state of PET')
parser.add_argument('--trace', action=argparse.BooleanOptionalAction, parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
help='write out every trace that arrives') help='write out every trace that arrives')
parser.add_argument('--tui', action=argparse.BooleanOptionalAction,
help='enable TUI mode')
args = parser.parse_args() args = parser.parse_args()
print(header) print(header)
@ -485,16 +500,17 @@ def main():
timescale = args.timescale timescale = args.timescale
enable_diag = args.diagnostics enable_diag = args.diagnostics
enable_verbose_trace = args.trace enable_verbose_trace = args.trace
enable_tui = args.tui
predefined_signals = [] predefined_signals = []
if enable_diag: if enable_diag:
predefined_signals += [ predefined_signals += [
'SET.BufferItems:u32', 'ST.BufferItems:u32',
'SET.BufferHealth:u8', 'ST.BufferHealth:u8',
'SET.CompressionLevel:u8', 'ST.CompressionLevel:u8',
'SET.CompressionTime:u32', 'ST.CompressionTime:u32',
'SET.RenderTime:u32', 'ST.RenderTime:u32',
'SET.ItemsSent:u32' 'ST.ItemsSent:u32'
] ]
signals, signals_valid = scan_for_signals(source_tree, predefined_signals) signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
@ -507,31 +523,24 @@ def main():
dfile = open(tracefile, 'w', encoding='utf-8') dfile = open(tracefile, 'w', encoding='utf-8')
process_noise_p = partial(process_noise, None) vcd_sink = VcdSink(dfile, signals, timescale)
retagger = Retagger(vcd_sink.process, tags)
packet_filter = Filter(retagger.process)
nfile = None nfile = None
if noisefile: if noisefile:
nfile = open(noisefile, 'wb') nfile = open(noisefile, 'wb')
process_noise_p = partial(process_noise, nfile) packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
vcd_sink = VcdSink(dfile, signals, timescale, enable_verbose_trace)
retagger = Retagger(vcd_sink.process, tags)
packet_filter = Filter(retagger.process, process_noise_p)
print("Signals:") print("Signals:")
for var in signals: for var in signals:
print(f" - {var}") print(f" - {var}")
print() print()
print(" === BEGIN NOISE ===") if enable_tui:
try: tui_record(packet_filter, vcd_sink, enable_verbose_trace)
for bstr in sys.stdin.buffer: else:
for b in bstr: record(packet_filter, vcd_sink, enable_verbose_trace)
packet_filter.process(b)
except KeyboardInterrupt:
pass
print()
print(" === END NOISE ===")
print()
vcd_sink.writer.close() vcd_sink.writer.close()
dfile.close() dfile.close()
@ -547,5 +556,115 @@ def main():
print(f" - Trace file: {tracefile}") print(f" - Trace file: {tracefile}")
print(f" - Trace size: {trace_size}") print(f" - Trace size: {trace_size}")
def record(packet_filter, vcd_sink, enable_verbose_trace):
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub):
if sub is not None:
print(f"### {timestamp:012} : {tag}[{sub}] <= {value}")
else:
print(f"### {timestamp:012} : {tag} <= {value}")
if enable_verbose_trace:
vcd_sink.onanyvalue(onval)
print(" === BEGIN NOISE ===")
try:
for bstr in sys.stdin.buffer:
for b in bstr:
packet_filter.process(b)
except KeyboardInterrupt:
pass
print()
print(" === END NOISE ===")
print()
class NoiseLineBuffer:
def __init__(self, online):
self.buffer = ""
self.online = online
def add(self, b):
t = chr(b)
if t == "\n":
self.online(self.buffer)
self.buffer = ""
else:
self.buffer += t
class TotalMaximumProgressUpdater:
def __init__(self, progress, progress_task):
self.progress = progress
self.progress_task = progress_task
self.maximum = 0
def update(self, value):
if value > self.maximum:
self.maximum = value
self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value)
def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
try:
from rich.console import Console
from rich.text import Text
from rich.layout import Layout
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
from rich.live import Live
from rich.align import Align
except:
print("error: TUI mode requires the rich package")
return
if enable_verbose_trace:
print("warning: verbose trace is not avaialble in TUI mode")
print()
console = Console()
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("")
progress_colums = [
TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=80),
TaskProgressColumn(),
MofNCompleteColumn()
]
diag_progress = Progress(*progress_colums, transient=True, auto_refresh=False, refresh_per_second=1)
buffer_health = diag_progress.add_task("[green]Buffer Health", total=255)
buffer_items = diag_progress.add_task("[green]Buffer Items")
items_sent = diag_progress.add_task("[blue]Items Sent", total=1024)
render_time = diag_progress.add_task("[blue]Render Time", total=1024)
comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100)
comp_time = diag_progress.add_task("[yellow]Compression Time", total=100)
buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items)
items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent)
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
with Live(console=console, transient=True) as live_status:
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value))
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
vcd_sink.onvalue("ST.ItemsSent", lambda _,value,sub: items_sent_tm.update(value))
vcd_sink.onvalue("ST.CompressionLevel", lambda _,value,sub: diag_progress.update(comp_lvl, completed=value))
vcd_sink.onvalue("ST.CompressionTime", lambda _,value,sub: comp_time_tm.update(value))
vcd_sink.onvalue("ST.RenderTime", lambda _,value,sub: render_time_tm.update(value))
packet_filter.onnoise(noise_buffer.add)
try:
while True:
for b in sys.stdin.buffer.read(1):
packet_filter.process(b)
live_status.update(diag_progress)
except KeyboardInterrupt:
diag_progress.stop()
print()
if __name__ == '__main__': if __name__ == '__main__':
main() main()