diff --git a/st_record.py b/st_record.py index 86fca12..b2658ba 100644 --- a/st_record.py +++ b/st_record.py @@ -218,16 +218,23 @@ class Filter: FLAGS_COMPRESSED = (1 << 0) - def __init__(self, on_value, on_noise): + def __init__(self, on_value): self.preamble_i = 0 self.epilouge_i = 0 self.packet_buffer = [] self.noise_buffer = [] self.process_value = on_value - self.process_noise = on_noise + self._on_noise = [] self.packet_counter = 0 self.packets_dropped = 0 + def onnoise(self, cb): + self._on_noise.append(cb) + + def _emit_noise(self, b): + for cb in self._on_noise: + cb(b) + def process(self, b): if self.preamble_i == (len(self.PREAMBLE)): self.packet_buffer.append(b) @@ -239,6 +246,7 @@ class Filter: if self.epilouge_i == (len(self.EPILOUGE)): self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)]) self.packet_buffer = [] + self.noise_buffer = [] self.preamble_i = 0 self.epilouge_i = 0 else: @@ -248,9 +256,9 @@ class Filter: else: self.preamble_i = 0 for nb in self.noise_buffer: - self.process_noise(nb) + self._emit_noise(nb) self.noise_buffer = [] - self.process_noise(b) + self._emit_noise(b) def disassemble_packet(self, packet): try: @@ -340,15 +348,16 @@ class Retagger: self.packets_dropped += 1 class VcdSink: - def __init__(self, fs, signals, timescale='1 us', verbose_trace=False): + def __init__(self, fs, signals, timescale='1 us'): self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0") self.skalars = {} self.arrays = {} self.strings = {} self.varnames = {} + self._onvalues = {} + self._onanyvalues = [] self.timestamp = 0 self.packets_dropped = 0 - self.verbose_trace = verbose_trace for v in signals: hvar, vtype = v.split(":") hier, _, name = hvar.rpartition(".") @@ -391,6 +400,18 @@ class VcdSink: else: self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize) + def onvalue(self, tag, cb): + self._onvalues[tag] = cb + + def onanyvalue(self, cb): + self._onanyvalues.append(cb) + + def _emit(self, timestamp, tag, value, sub=None): + for cb in self._onanyvalues: + cb(timestamp, tag, value, sub) + if tag in self._onvalues: + self._onvalues[tag](timestamp, value, sub) + def process(self, tag, value, sub, datatag): if datatag[0] == 'D': self.timestamp += value @@ -398,8 +419,6 @@ class VcdSink: elif datatag[0] == 'A': timestamp = self.timestamp try: - if self.verbose_trace: - print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True) self.writer.change(self.arrays[tag][sub], timestamp, value) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True) @@ -410,6 +429,7 @@ class VcdSink: except: print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True) self.packets_dropped += 1 + self._emit(timestamp, tag, value, sub) elif datatag == 'S4': timestamp = self.timestamp # unpack @@ -421,8 +441,6 @@ class VcdSink: if sub == 1: try: string = self.strings[tag][1] - if self.verbose_trace: - print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{string}\"", flush=True) self.writer.change(self.strings[tag][0], timestamp, string) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True) @@ -434,6 +452,7 @@ class VcdSink: print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True) self.packets_dropped += 1 self.strings[tag][1] = "" + self._emit(timestamp, tag, string, None) # skalar values elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'): @@ -443,8 +462,6 @@ class VcdSink: value = True elif datatag == 'F4': value = struct.unpack(">f", struct.pack(">L", value))[0] - if self.verbose_trace: - print(f"### {timestamp:012} : {self.varnames[tag]} <= {value:08X}", flush=True) self.writer.change(self.skalars[tag], timestamp, value) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True) @@ -455,11 +472,7 @@ class VcdSink: except: print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True) self.packets_dropped += 1 - -def process_noise(noisefile, b): - print(chr(b), end="", flush=True) - if noisefile: - noisefile.write(b.to_bytes(1)) + self._emit(timestamp, tag, value, None) def main(): parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file") @@ -475,6 +488,8 @@ def main(): help='add additional signals tracing internal state of PET') parser.add_argument('--trace', action=argparse.BooleanOptionalAction, help='write out every trace that arrives') + parser.add_argument('--tui', action=argparse.BooleanOptionalAction, + help='enable TUI mode') args = parser.parse_args() print(header) @@ -485,16 +500,17 @@ def main(): timescale = args.timescale enable_diag = args.diagnostics enable_verbose_trace = args.trace + enable_tui = args.tui predefined_signals = [] if enable_diag: predefined_signals += [ - 'SET.BufferItems:u32', - 'SET.BufferHealth:u8', - 'SET.CompressionLevel:u8', - 'SET.CompressionTime:u32', - 'SET.RenderTime:u32', - 'SET.ItemsSent:u32' + 'ST.BufferItems:u32', + 'ST.BufferHealth:u8', + 'ST.CompressionLevel:u8', + 'ST.CompressionTime:u32', + 'ST.RenderTime:u32', + 'ST.ItemsSent:u32' ] signals, signals_valid = scan_for_signals(source_tree, predefined_signals) @@ -507,31 +523,24 @@ def main(): dfile = open(tracefile, 'w', encoding='utf-8') - process_noise_p = partial(process_noise, None) + vcd_sink = VcdSink(dfile, signals, timescale) + retagger = Retagger(vcd_sink.process, tags) + packet_filter = Filter(retagger.process) + nfile = None if noisefile: nfile = open(noisefile, 'wb') - process_noise_p = partial(process_noise, nfile) + packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) - vcd_sink = VcdSink(dfile, signals, timescale, enable_verbose_trace) - retagger = Retagger(vcd_sink.process, tags) - packet_filter = Filter(retagger.process, process_noise_p) print("Signals:") for var in signals: print(f" - {var}") print() - print(" === BEGIN NOISE ===") - try: - for bstr in sys.stdin.buffer: - for b in bstr: - packet_filter.process(b) - except KeyboardInterrupt: - pass - - print() - print(" === END NOISE ===") - print() + if enable_tui: + tui_record(packet_filter, vcd_sink, enable_verbose_trace) + else: + record(packet_filter, vcd_sink, enable_verbose_trace) vcd_sink.writer.close() dfile.close() @@ -547,5 +556,115 @@ def main(): print(f" - Trace file: {tracefile}") print(f" - Trace size: {trace_size}") + +def record(packet_filter, vcd_sink, enable_verbose_trace): + packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True)) + + def onval(timestamp, tag, value, sub): + if sub is not None: + print(f"### {timestamp:012} : {tag}[{sub}] <= {value}") + else: + print(f"### {timestamp:012} : {tag} <= {value}") + + if enable_verbose_trace: + vcd_sink.onanyvalue(onval) + + print(" === BEGIN NOISE ===") + try: + for bstr in sys.stdin.buffer: + for b in bstr: + packet_filter.process(b) + except KeyboardInterrupt: + pass + + print() + print(" === END NOISE ===") + print() + +class NoiseLineBuffer: + def __init__(self, online): + self.buffer = "" + self.online = online + + def add(self, b): + t = chr(b) + if t == "\n": + self.online(self.buffer) + self.buffer = "" + else: + self.buffer += t + +class TotalMaximumProgressUpdater: + def __init__(self, progress, progress_task): + self.progress = progress + self.progress_task = progress_task + self.maximum = 0 + + def update(self, value): + if value > self.maximum: + self.maximum = value + self.progress.update(self.progress_task, total=self.maximum) + self.progress.update(self.progress_task, completed=value) + +def tui_record(packet_filter, vcd_sink, enable_verbose_trace): + try: + from rich.console import Console + from rich.text import Text + from rich.layout import Layout + from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn + from rich.live import Live + from rich.align import Align + except: + print("error: TUI mode requires the rich package") + return + + if enable_verbose_trace: + print("warning: verbose trace is not avaialble in TUI mode") + print() + + console = Console() + noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}")) + trace_text = Text("") + + progress_colums = [ + TextColumn("[progress.description]{task.description}"), + BarColumn(bar_width=80), + TaskProgressColumn(), + MofNCompleteColumn() + + ] + diag_progress = Progress(*progress_colums, transient=True, auto_refresh=False, refresh_per_second=1) + buffer_health = diag_progress.add_task("[green]Buffer Health", total=255) + buffer_items = diag_progress.add_task("[green]Buffer Items") + items_sent = diag_progress.add_task("[blue]Items Sent", total=1024) + render_time = diag_progress.add_task("[blue]Render Time", total=1024) + comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100) + comp_time = diag_progress.add_task("[yellow]Compression Time", total=100) + + buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items) + items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent) + render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time) + comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) + + with Live(console=console, transient=True) as live_status: + vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value)) + vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value)) + vcd_sink.onvalue("ST.ItemsSent", lambda _,value,sub: items_sent_tm.update(value)) + vcd_sink.onvalue("ST.CompressionLevel", lambda _,value,sub: diag_progress.update(comp_lvl, completed=value)) + vcd_sink.onvalue("ST.CompressionTime", lambda _,value,sub: comp_time_tm.update(value)) + vcd_sink.onvalue("ST.RenderTime", lambda _,value,sub: render_time_tm.update(value)) + + packet_filter.onnoise(noise_buffer.add) + + try: + while True: + for b in sys.stdin.buffer.read(1): + packet_filter.process(b) + live_status.update(diag_progress) + except KeyboardInterrupt: + diag_progress.stop() + + print() + if __name__ == '__main__': main()