# -*- coding: utf-8 -*- import sys import os import re import struct import argparse import datetime from functools import partial from vcd import VCDWriter, writer from vcd.common import VarType header = """ ▓▓▓▓▓ ░░░ ███████ ███████ ████████ ▓ ▓ ▒ ░ ██ ██ ██ Streaming Event Trace ▓ ▓ ▓ ░░ ███████ █████ ██ VCD Recorder Utility ▓ ▓ ▒ ░ ██ ██ ██ ▓▓▓▓▓ ░░░ ███████ ███████ ██ (c) 2025 D.Hoeglinger """ def human_readable_size(size, decimal_places=2): for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']: if size < 1024.0 or unit == 'PiB': break size /= 1024.0 return f"{size:.{decimal_places}f} {unit}" def is_hex(s): return all(chr(c) in '0123456789abcdefABCDEF' for c in s) def bsd_hash(data): checksum = 0 for c in data: checksum = (checksum >> 1) + ((checksum & 1) << 15) checksum += ord(c) checksum = checksum & 0xFFFF return checksum def hex_to_int(hex_digit): if hex_digit.isdigit(): return int(hex_digit) elif hex_digit in 'ABCDEF': return ord(hex_digit) - ord('A') + 10 elif hex_digit in 'abcdef': return ord(hex_digit) - ord('a') + 10 else: raise ValueError("Invalid hexadecimal digit") def decode_hexstr(payload): value = bytearray() for i in range(0, len(payload), 2): b = hex_to_int(chr(payload[i])) b |= hex_to_int(chr(payload[i + 1])) << 4 value.append(b) return bytes(value) def hexstr(payload): return payload.hex().upper() def decode_binstr(payload): value = 0 for i,b in enumerate(payload): value |= b << (8 * i) return value class DecodeError(Exception): pass def cobs_decode(enc, delim): enc = list(enc) enc.append(delim) enc2 = enc[0:-1] length = len(enc2) if delim != 0x00: for i in range(0, len(enc2)): enc2[i] = enc[i] ^ delim dec = [] code = 0xFF block = 0 for i in range(0, length): byte = enc2[i] if block != 0: dec.append(byte) else: if (i + byte) > len(enc): raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}") if code != 0xFF: dec.append(0) code = byte block = code if code == 0: break block = block - 1 return bytes(dec) def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None: for i in range(mlen): data[stidx + i] = data[stidx - offset + i] def fastlz_decompress_lv1(datain, doutlen): opcode_0 = datain[0] datain_idx = 1 dataout = bytearray(doutlen) dataout_idx = 0; while True: op_type = opcode_0 >> 5 op_data = opcode_0 & 31 if op_type == 0b000: # literal run run = 1 + opcode_0 dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run] datain_idx += run dataout_idx += run elif op_type == 0b111: # long match opcode_1 = datain[datain_idx] datain_idx += 1 opcode_2 = datain[datain_idx] datain_idx += 1 match_len = 9 + opcode_1 ofs = (op_data << 8) + opcode_2 + 1 _memmove(dataout, dataout_idx, ofs, match_len) dataout_idx += match_len else: # short match opcode_1 = datain[datain_idx] datain_idx += 1 match_len = 2 + op_type ofs = (op_data << 8) + opcode_1 + 1 _memmove(dataout, dataout_idx, ofs, match_len) dataout_idx += match_len if datain_idx < len(datain): opcode_0 = datain[datain_idx] datain_idx += 1 else: break return bytes(dataout[:dataout_idx]) def scan_for_signals(directory, predefined_signals): library_files = ['pet.c', 'pet.h'] rx_events = re.compile(r'pet_evtrace\(\"([^\"]+)\"') rx_scalars = re.compile(r'pet_([usf])(8|16|32)trace\(\"([^\"]+)\"') rx_arrays = re.compile(r'pet_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)') rx_strings = re.compile(r'pet_strtrace\(\"([^\"]+)\"') signals = {} valid = True def add_variable(tag, variable): hash = bsd_hash(tag) if hash in signals: if signals[hash] != variable: print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[hash]}`") valid = False else: signals[hash] = variable for predef_signal in predefined_signals: sig,_ = predef_signal.split(":") add_variable(sig, predef_signal) for root, _, files in os.walk(directory): for file in files: if file.endswith(('.c', '.h')) and (file not in library_files): file_path = os.path.join(root, file) with open(file_path, 'r') as f: content = f.read() for match in rx_events.finditer(content): tag = match.group(1) variable = f"{tag}:event" add_variable(tag, variable) for match in rx_scalars.finditer(content): tag = match.group(3) variable = f"{tag}:{match.group(1)}{match.group(2)}" add_variable(tag, variable) for match in rx_arrays.finditer(content): tag = match.group(3) variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]" add_variable(tag, variable) for match in rx_strings.finditer(content): tag = match.group(1) variable = f"{tag}:string" add_variable(tag, variable) return list(signals.values()), valid class Filter: PREAMBLE = b"\x1b[s" EPILOUGE = b"\x1b[u" TAGCODE_LUT = { 0x11 : "D1", 0x12 : "D2", 0x14 : "D4", 0x21 : "V1", 0x22 : "V2", 0x24 : "V4", 0x31 : "A1", 0x32 : "A2", 0x34 : "A4", 0x44 : "S4", 0x54 : "F4", 0x60 : "EV" } FLAGS_COMPRESSED = (1 << 0) def __init__(self, on_value, on_noise): self.preamble_i = 0 self.epilouge_i = 0 self.packet_buffer = [] self.noise_buffer = [] self.process_value = on_value self.process_noise = on_noise self.packet_counter = 0 self.packets_dropped = 0 def process(self, b): if self.preamble_i == (len(self.PREAMBLE)): self.packet_buffer.append(b) if b == self.EPILOUGE[self.epilouge_i]: self.epilouge_i += 1 else: self.epilouge_i = 0 if self.epilouge_i == (len(self.EPILOUGE)): self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)]) self.packet_buffer = [] self.preamble_i = 0 self.epilouge_i = 0 else: if b == self.PREAMBLE[self.preamble_i]: self.preamble_i += 1 self.noise_buffer.append(b) else: self.preamble_i = 0 for nb in self.noise_buffer: self.process_noise(nb) self.noise_buffer = [] self.process_noise(b) def disassemble_packet(self, packet): try: tagcode = packet[0] # ignore rubout packets if chr(tagcode) == ' ': return if tagcode not in self.TAGCODE_LUT: self.packets_dropped += 1 return tag = self.TAGCODE_LUT[tagcode] value = None offset = 1 match tag: case "D1"|"V1"|"A1": value = decode_binstr(packet[offset:offset+1]) offset += 1 case "D2"|"V2"|"A2": value = decode_binstr(packet[offset:offset+2]) offset += 2 case "D4"|"V4"|"A4"|"F4"|"S4": value = decode_binstr(packet[offset:offset+4]) offset += 4 sub = None if tag[0] == 'A' or tag[0] == 'S': sub = decode_binstr(packet[offset:offset+1]) offset += 1 hashtag = None if tag[0] != 'D': hashtag = decode_binstr(packet[-2:]) except Exception as ex: self.packets_dropped += 1 return self.process_value(hashtag, value, sub, tag) self.packet_counter += 1 def disassemble_macropacket(self, packet): flags = packet[0] payload = packet[1:] try: macropack = cobs_decode(payload, 0x1B) if self.FLAGS_COMPRESSED & flags: frames = fastlz_decompress_lv1(macropack, 1024) else: frames = macropack macropack_delim = 0x00 for pack in frames.split(bytes(bytearray([macropack_delim]))): if len(pack) == 0: continue try: decoded = cobs_decode(pack, macropack_delim) self.disassemble_packet(decoded) except: self.packets_dropped += 1 except: self.packets_dropped += 1 def process_packet(self, packet): if len(packet) == 0: return if is_hex(packet): packet = decode_hexstr(packet) if True or (len(packet) > 3 + 8 + 3): self.disassemble_macropacket(packet) else: self.disassemble_packet(packet) class Retagger: def __init__(self, on_value, tags=None): tags = tags or [] self._tag_lut = {bsd_hash(tag):tag for tag in tags} self.process_value = on_value self.packets_dropped = 0 def process(self, hashtag, number, sub, datatag): if hashtag in self._tag_lut or datatag[0] == 'D': tag = self._tag_lut.get(hashtag, None) self.process_value(tag, number, sub, datatag) else: self.packets_dropped += 1 class VcdSink: def __init__(self, fs, signals, timescale='1 us'): self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0") self.skalars = {} self.arrays = {} self.strings = {} self.varnames = {} self.timestamp = 0 self.packets_dropped = 0 for v in signals: hvar, vtype = v.split(":") hier, _, name = hvar.rpartition(".") arr = None s = vtype.split("[") if len(s) == 2: vtype, arr = s dsize = 32 dtype = 'integer' match vtype: case 'event': dtype = 'event' dsize = 32 case 'f32': dtype = 'real' dsize = 32 case 'u32'|'s32': dtype = 'integer' dsize = 32 case 'u16'|'s16': dtype = 'integer' dsize = 16 case 'u8'|'s8': dtype = 'integer' dsize = 8 case 'string': dtype = 'string' dsize = 8 self.varnames[hvar] = hvar if arr is not None: elems = int(arr.rstrip("]")) vars = [] for i in range(0, elems): vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize)) self.arrays[hvar] = vars elif dtype == 'string': self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""] else: self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize) def process(self, tag, value, sub, datatag): if datatag[0] == 'D': self.timestamp += value # array values elif datatag[0] == 'A': timestamp = self.timestamp try: #print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True) self.writer.change(self.arrays[tag][sub], timestamp, value) except ValueError: print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True) self.packets_dropped += 1 elif datatag == 'S4': timestamp = self.timestamp # unpack for i in range(0,4): char = value >> (i*8) & 0xFF if char != 0: self.strings[tag][1] += chr(char) # sub of 1 indicates end of string if sub == 1: try: string = self.strings[tag][1] #print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\"", flush=True) self.writer.change(self.strings[tag][0], timestamp, self.strings[tag][1]) except ValueError: print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True) self.packets_dropped += 1 self.strings[tag][1] = "" # skalar values elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'): timestamp = self.timestamp try: if self.skalars[tag].type == VarType.event: value = True elif datatag == 'F4': value = struct.unpack(">f", struct.pack(">L", value))[0] #print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value:08X}", flush=True) self.writer.change(self.skalars[tag], timestamp, value) except ValueError: print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True) self.packets_dropped += 1 except writer.VCDPhaseError: print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True) self.packets_dropped += 1 except: print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [ERR] ", flush=True) self.packets_dropped += 1 def process_noise(noisefile, b): print(chr(b), end="", flush=True) if noisefile: noisefile.write(b.to_bytes(1)) def main(): parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file") parser.add_argument('-d', '--dump', type=str, required=True, help='output IEEE 1364-2005 Value Change Dump (vcd) file') parser.add_argument('-t', '--timescale', type=str, default="1 us", help='period of one timestamp tick') parser.add_argument('-n', '--noise', type=str, default=None, help='store the stdin data sans packets in this file') parser.add_argument('-s', '--source', type=str, required=True, help='source tree to scan for trace marks') parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction, help='add additional signals tracing internal state of PET') args = parser.parse_args() print(header) tracefile = args.dump noisefile = args.noise source_tree = args.source timescale = args.timescale enable_diag = args.diagnostics predefined_signals = [] if enable_diag: predefined_signals += [ 'PET.BufferItems:u32', 'PET.BufferHealth:u8', 'PET.CompressionLevel:u8', 'PET.CompressionTime:u32', 'PET.RenderTime:u32', 'PET.ItemsSent:u32' ] signals, signals_valid = scan_for_signals(source_tree, predefined_signals) if not signals_valid: return signals.sort() tags = [k.split(":")[0] for k in signals] dfile = open(tracefile, 'w', encoding='utf-8') process_noise_p = partial(process_noise, None) nfile = None if noisefile: nfile = open(noisefile, 'wb') process_noise_p = partial(process_noise, nfile) vcd_sink = VcdSink(dfile, signals, timescale) retagger = Retagger(vcd_sink.process, tags) packet_filter = Filter(retagger.process, process_noise_p) print("Signals:") for var in signals: print(f" - {var}") print() print(" === BEGIN NOISE ===") try: for bstr in sys.stdin.buffer: for b in bstr: packet_filter.process(b) except KeyboardInterrupt: pass print() print(" === END NOISE ===") print() vcd_sink.writer.close() dfile.close() if nfile: nfile.close() print("Summary:") packet_count = packet_filter.packet_counter drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped trace_size = human_readable_size(os.path.getsize(tracefile)) print(f" - Packets received: {packet_count}") print(f" - Packets dropped: {drop_count}") print(f" - Trace file: {tracefile}") print(f" - Trace size: {trace_size}") if __name__ == '__main__': main()