# -*- coding: utf-8 -*- import sys import os import re import struct import argparse import datetime from functools import partial from vcd import VCDWriter, writer from vcd.common import VarType header = """ ▓▓▓▓▓ ░░░ ███████ ████████ ▓ ▓ ▒ ░ ██ ██ Streaming Trace ▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility ▓ ▓ ▒ ░ ██ ██ ▓▓▓▓▓ ░░░ ███████ ██ (c) 2025 D.Hoeglinger """ def human_readable_size(size, decimal_places=2): for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']: if size < 1024.0 or unit == 'PiB': break size /= 1024.0 return f"{size:.{decimal_places}f} {unit}" def is_hex(s): return all(chr(c) in '0123456789abcdefABCDEF' for c in s) def bsd_hash(data): checksum = 0 for c in data: checksum = (checksum >> 1) + ((checksum & 1) << 15) checksum += ord(c) checksum = checksum & 0xFFFF return checksum def hex_to_int(hex_digit): if hex_digit.isdigit(): return int(hex_digit) elif hex_digit in 'ABCDEF': return ord(hex_digit) - ord('A') + 10 elif hex_digit in 'abcdef': return ord(hex_digit) - ord('a') + 10 else: raise ValueError("Invalid hexadecimal digit") def decode_hexstr(payload): value = bytearray() for i in range(0, len(payload), 2): b = hex_to_int(chr(payload[i])) b |= hex_to_int(chr(payload[i + 1])) << 4 value.append(b) return bytes(value) def hexstr(payload): return payload.hex().upper() def decode_binstr(payload): value = 0 for i,b in enumerate(payload): value |= b << (8 * i) return value class DecodeError(Exception): pass def cobs_decode(enc, delim): enc = list(enc) enc.append(delim) enc2 = enc[0:-1] length = len(enc2) if delim != 0x00: for i in range(0, len(enc2)): enc2[i] = enc[i] ^ delim dec = [] code = 0xFF block = 0 for i in range(0, length): byte = enc2[i] if block != 0: dec.append(byte) else: if (i + byte) > len(enc): raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}") if code != 0xFF: dec.append(0) code = byte block = code if code == 0: break block = block - 1 return bytes(dec) def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None: for i in range(mlen): data[stidx + i] = data[stidx - offset + i] def fastlz_decompress_lv1(datain, doutlen): opcode_0 = datain[0] datain_idx = 1 dataout = bytearray(doutlen) dataout_idx = 0; while True: op_type = opcode_0 >> 5 op_data = opcode_0 & 31 if op_type == 0b000: # literal run run = 1 + opcode_0 dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run] datain_idx += run dataout_idx += run elif op_type == 0b111: # long match opcode_1 = datain[datain_idx] datain_idx += 1 opcode_2 = datain[datain_idx] datain_idx += 1 match_len = 9 + opcode_1 ofs = (op_data << 8) + opcode_2 + 1 _memmove(dataout, dataout_idx, ofs, match_len) dataout_idx += match_len else: # short match opcode_1 = datain[datain_idx] datain_idx += 1 match_len = 2 + op_type ofs = (op_data << 8) + opcode_1 + 1 _memmove(dataout, dataout_idx, ofs, match_len) dataout_idx += match_len if datain_idx < len(datain): opcode_0 = datain[datain_idx] datain_idx += 1 else: break return bytes(dataout[:dataout_idx]) def scan_for_signals(directory, predefined_signals): library_files = ['st.c', 'st.h'] rx_events = re.compile(r'st_evtrace\(\"([^\"]+)\"') rx_scalars = re.compile(r'st_([usf])(8|16|32)trace\(\"([^\"]+)\"') rx_arrays = re.compile(r'st_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)') rx_strings = re.compile(r'st_strtrace\(\"([^\"]+)\"') signals = {} valid = True def add_variable(tag, variable): hash = bsd_hash(tag) if hash in signals: if signals[hash] != variable: print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[hash]}`") valid = False else: signals[hash] = variable for predef_signal in predefined_signals: sig,_ = predef_signal.split(":") add_variable(sig, predef_signal) for root, _, files in os.walk(directory): for file in files: if file.endswith(('.c', '.h')) and (file not in library_files): file_path = os.path.join(root, file) with open(file_path, 'r') as f: content = f.read() for match in rx_events.finditer(content): tag = match.group(1) variable = f"{tag}:event" add_variable(tag, variable) for match in rx_scalars.finditer(content): tag = match.group(3) variable = f"{tag}:{match.group(1)}{match.group(2)}" add_variable(tag, variable) for match in rx_arrays.finditer(content): tag = match.group(3) variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]" add_variable(tag, variable) for match in rx_strings.finditer(content): tag = match.group(1) variable = f"{tag}:string" add_variable(tag, variable) return list(signals.values()), valid class Filter: PREAMBLE = b"\x1b[s" EPILOUGE = b"\x1b[u" TAGCODE_LUT = { 0x11 : "D1", 0x12 : "D2", 0x14 : "D4", 0x21 : "V1", 0x22 : "V2", 0x24 : "V4", 0x31 : "A1", 0x32 : "A2", 0x34 : "A4", 0x44 : "S4", 0x54 : "F4", 0x60 : "EV" } FLAGS_COMPRESSED = (1 << 0) def __init__(self, on_value): self.preamble_i = 0 self.epilouge_i = 0 self.packet_buffer = [] self.noise_buffer = [] self.process_value = on_value self._on_noise = [] self.packet_counter = 0 self.packets_dropped = 0 def onnoise(self, cb): self._on_noise.append(cb) def _emit_noise(self, b): for cb in self._on_noise: cb(b) def process(self, b): if self.preamble_i == (len(self.PREAMBLE)): self.packet_buffer.append(b) if b == self.EPILOUGE[self.epilouge_i]: self.epilouge_i += 1 else: self.epilouge_i = 0 if self.epilouge_i == (len(self.EPILOUGE)): self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)]) self.packet_buffer = [] self.noise_buffer = [] self.preamble_i = 0 self.epilouge_i = 0 else: if b == self.PREAMBLE[self.preamble_i]: self.preamble_i += 1 self.noise_buffer.append(b) else: self.preamble_i = 0 for nb in self.noise_buffer: self._emit_noise(nb) self.noise_buffer = [] self._emit_noise(b) def disassemble_packet(self, packet): try: tagcode = packet[0] # ignore rubout packets if chr(tagcode) == ' ': return if tagcode not in self.TAGCODE_LUT: self.packets_dropped += 1 return tag = self.TAGCODE_LUT[tagcode] value = None offset = 1 match tag: case "D1"|"V1"|"A1": value = decode_binstr(packet[offset:offset+1]) offset += 1 case "D2"|"V2"|"A2": value = decode_binstr(packet[offset:offset+2]) offset += 2 case "D4"|"V4"|"A4"|"F4"|"S4": value = decode_binstr(packet[offset:offset+4]) offset += 4 sub = None if tag[0] == 'A' or tag[0] == 'S': sub = decode_binstr(packet[offset:offset+1]) offset += 1 hashtag = None if tag[0] != 'D': hashtag = decode_binstr(packet[-2:]) except Exception as ex: self.packets_dropped += 1 return self.process_value(hashtag, value, sub, tag) self.packet_counter += 1 def disassemble_macropacket(self, packet): flags = packet[0] payload = packet[1:] try: macropack = cobs_decode(payload, 0x1B) if self.FLAGS_COMPRESSED & flags: frames = fastlz_decompress_lv1(macropack, 1024) else: frames = macropack macropack_delim = 0x00 for pack in frames.split(bytes(bytearray([macropack_delim]))): if len(pack) == 0: continue try: decoded = cobs_decode(pack, macropack_delim) self.disassemble_packet(decoded) except: self.packets_dropped += 1 except: self.packets_dropped += 1 def process_packet(self, packet): if len(packet) == 0: return if is_hex(packet): packet = decode_hexstr(packet) if True or (len(packet) > 3 + 8 + 3): self.disassemble_macropacket(packet) else: self.disassemble_packet(packet) class Retagger: def __init__(self, on_value, tags=None): tags = tags or [] self._tag_lut = {bsd_hash(tag):tag for tag in tags} self.process_value = on_value self.packets_dropped = 0 def process(self, hashtag, number, sub, datatag): if hashtag in self._tag_lut or datatag[0] == 'D': tag = self._tag_lut.get(hashtag, None) self.process_value(tag, number, sub, datatag) else: self.packets_dropped += 1 class VcdSink: def __init__(self, fs, signals, timescale='1 us'): self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"ST v1.0.2") self.skalars = {} self.arrays = {} self.strings = {} self.varnames = {} self._onvalues = {} self._onanyvalues = [] self.timestamp = 0 self.packets_dropped = 0 for v in signals: hvar, vtype = v.split(":") hier, _, name = hvar.rpartition(".") arr = None s = vtype.split("[") if len(s) == 2: vtype, arr = s dsize = 32 dtype = 'integer' match vtype: case 'event': dtype = 'event' dsize = 32 case 'f32': dtype = 'real' dsize = 32 case 'u32'|'s32': dtype = 'integer' dsize = 32 case 'u16'|'s16': dtype = 'integer' dsize = 16 case 'u8'|'s8': dtype = 'integer' dsize = 8 case 'string': dtype = 'string' dsize = 8 self.varnames[hvar] = hvar if arr is not None: elems = int(arr.rstrip("]")) vars = [] for i in range(0, elems): vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize)) self.arrays[hvar] = vars elif dtype == 'string': self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""] else: self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize) def onvalue(self, tag, cb): self._onvalues[tag] = cb def onanyvalue(self, cb): self._onanyvalues.append(cb) def _emit(self, timestamp, tag, value, sub=None): for cb in self._onanyvalues: cb(timestamp, tag, value, sub) if tag in self._onvalues: self._onvalues[tag](timestamp, value, sub) def process(self, tag, value, sub, datatag): if datatag[0] == 'D': self.timestamp += value # array values elif datatag[0] == 'A': timestamp = self.timestamp try: self.writer.change(self.arrays[tag][sub], timestamp, value) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True) self.packets_dropped += 1 self._emit(timestamp, tag, value, sub) elif datatag == 'S4': timestamp = self.timestamp # unpack for i in range(0,4): char = value >> (i*8) & 0xFF if char != 0: self.strings[tag][1] += chr(char) # sub of 1 indicates end of string if sub == 1: try: string = self.strings[tag][1] self.writer.change(self.strings[tag][0], timestamp, string) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True) self.packets_dropped += 1 self.strings[tag][1] = "" self._emit(timestamp, tag, string, None) # skalar values elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'): timestamp = self.timestamp try: if self.skalars[tag].type == VarType.event: value = True elif datatag == 'F4': value = struct.unpack(">f", struct.pack(">L", value))[0] self.writer.change(self.skalars[tag], timestamp, value) except ValueError: print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True) self.packets_dropped += 1 self._emit(timestamp, tag, value, None) def main(): parser = argparse.ArgumentParser(description="scans stdin for ST packets and dumps the values into a VCD file") parser.add_argument('-d', '--dump', type=str, required=True, help='output IEEE 1364-2005 Value Change Dump (vcd) file') parser.add_argument('-t', '--timescale', type=str, default="1 us", help='period of one timestamp tick') parser.add_argument('-n', '--noise', type=str, default=None, help='store the stdin data sans packets in this file') parser.add_argument('-s', '--source', type=str, required=True, help='source tree to scan for trace marks') parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction, help='add additional signals tracing internal state of ST') parser.add_argument('--trace', action=argparse.BooleanOptionalAction, help='write out every trace that arrives') parser.add_argument('--tui', action=argparse.BooleanOptionalAction, help='enable TUI mode') args = parser.parse_args() print(header) tracefile = args.dump noisefile = args.noise source_tree = args.source timescale = args.timescale enable_diag = args.diagnostics enable_verbose_trace = args.trace enable_tui = args.tui predefined_signals = [] if enable_diag: predefined_signals += [ 'ST.BufferItems:u32', 'ST.BufferHealth:u8', 'ST.CompressionLevel:u8', 'ST.CompressionTime:u32', 'ST.RenderTime:u32', 'ST.ItemsSent:u32' ] signals, signals_valid = scan_for_signals(source_tree, predefined_signals) if not signals_valid: return signals.sort() tags = [k.split(":")[0] for k in signals] dfile = open(tracefile, 'w', encoding='utf-8') vcd_sink = VcdSink(dfile, signals, timescale) retagger = Retagger(vcd_sink.process, tags) packet_filter = Filter(retagger.process) nfile = None if noisefile: nfile = open(noisefile, 'wb') packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1))) if enable_tui: tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace) else: record(signals, packet_filter, vcd_sink, enable_verbose_trace) vcd_sink.writer.close() dfile.close() if nfile: nfile.close() print("Summary:") packet_count = packet_filter.packet_counter drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped trace_size = human_readable_size(os.path.getsize(tracefile)) print(f" - Packets received: {packet_count}") print(f" - Packets dropped: {drop_count}") print(f" - Trace file: {tracefile}") print(f" - Trace size: {trace_size}") def record(signals, packet_filter, vcd_sink, enable_verbose_trace): print("Signals:") for var in signals: print(f" - {var}") print() packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True)) def onval(timestamp, tag, value, sub): if sub is not None: print(f"### {timestamp:012} : {tag}[{sub}] <= {value}") else: print(f"### {timestamp:012} : {tag} <= {value}") if enable_verbose_trace: vcd_sink.onanyvalue(onval) print(" === BEGIN NOISE ===") try: for bstr in sys.stdin.buffer: for b in bstr: packet_filter.process(b) except KeyboardInterrupt: pass print() print(" === END NOISE ===") print() class NoiseLineBuffer: def __init__(self, online): self.buffer = "" self.online = online def add(self, b): t = chr(b) if t == "\n": self.online(self.buffer) self.buffer = "" else: self.buffer += t class TotalMaximumProgressUpdater: def __init__(self, progress, progress_task): self.progress = progress self.progress_task = progress_task self.maximum = 0 def update(self, value): if value > self.maximum: self.maximum = value self.progress.update(self.progress_task, total=self.maximum) self.progress.update(self.progress_task, completed=value, visible=True) class SignalTable: def __init__(self, signals): self.values = {} self.arraycache = {} self.types = {} for signal in signals: name,value_type = signal.split(":") self.values[name] = None if len(s := value_type.split("[")) > 1: value_type = s[0] array_len = int(s[1].split("]")[0], 0) self.arraycache[name] = [None] * array_len self.types[name] = value_type def update(self, time, signal, value, sub): try: value_str = "" value_type = self.types[signal] match value_type: case "u8"|"s8": value_str = f"x{value:02X}" case "u16"|"s16": value_str = f"x{value:04X}" case "u32"|"s32": value_str = f"x{value:08X}" case "string": value_str = value.rstrip("\r\n") case "event": value_str = str(time) if (signal in self.arraycache) and (sub is not None): if sub >= len(self.arraycache[signal]): self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1)) self.arraycache[signal][sub] = value_str value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]" self.values[signal] = value_str except Exception as e: print("EEE", e) print(f"{sub=} {signal=} {len(self.arraycache[signal])=}") print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e) def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace): try: from rich.console import Console from rich.text import Text from rich.layout import Layout from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn from rich.live import Live from rich.align import Align from rich.table import Table except: print("error: TUI mode requires the rich package") exit() if enable_verbose_trace: print("warning: verbose trace is not avaialble in TUI mode") print() console = Console() noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}")) trace_text = Text("") # set up progress bars progress_colums = [ TextColumn("[progress.description]{task.description}"), BarColumn(bar_width=80, complete_style="gold3", finished_style="red"), TaskProgressColumn(), MofNCompleteColumn() ] diag_progress = Progress(*progress_colums, transient=True, auto_refresh=False, refresh_per_second=1) buffer_health = diag_progress.add_task("[green]Buffer Health", total=255, visible=False) buffer_items = diag_progress.add_task("[green]Buffer Items", visible=False) items_sent = diag_progress.add_task("[blue]Items Sent", total=1024, visible=False) render_time = diag_progress.add_task("[blue]Render Time", total=1024, visible=False) comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100, visible=False) comp_time = diag_progress.add_task("[yellow]Compression Time", total=100, visible=False) buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items) items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent) render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time) comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time) # set up table layout and signal view signal_values = SignalTable(signals) vcd_sink.onanyvalue(signal_values.update) def generate_table(diag_progress, signal_values): grid = Table.grid(expand=False) grid.add_column(justify="left") signals_width = max([len(x) for x in signal_values.values.keys()]) + 2 sigtable = Table.grid(expand=False) sigtable.add_column(justify="left", width=10) sigtable.add_column(justify="left", width=signals_width) sigtable.add_column(justify="left") sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt for sig in signals: name, sigtype = sig.split(":") value = signal_values.values[name] text = Text("X" if value is None else value, style="red" if value is None else "green") sigtable.add_row(sigtype, name, text) grid.add_row(sigtable) grid.add_row(None) grid.add_row(diag_progress) return grid with Live(console=console, transient=True) as live_status: vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True)) vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value)) vcd_sink.onvalue("ST.ItemsSent", lambda _,value,sub: items_sent_tm.update(value)) vcd_sink.onvalue("ST.CompressionLevel", lambda _,value,sub: diag_progress.update(comp_lvl, completed=value, visible=True)) vcd_sink.onvalue("ST.CompressionTime", lambda _,value,sub: comp_time_tm.update(value)) vcd_sink.onvalue("ST.RenderTime", lambda _,value,sub: render_time_tm.update(value)) packet_filter.onnoise(noise_buffer.add) try: for bstr in sys.stdin.buffer: for b in bstr: packet_filter.process(b) live_status.update(generate_table(diag_progress, signal_values)) except KeyboardInterrupt: diag_progress.stop() print() if __name__ == '__main__': main()