Name change due to confusion with the common programming verb "set". Once again I cannot settle on a name.:
552 lines
19 KiB
Python
552 lines
19 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import struct
|
|
import argparse
|
|
import datetime
|
|
from functools import partial
|
|
from vcd import VCDWriter, writer
|
|
from vcd.common import VarType
|
|
|
|
header = """
|
|
▓▓▓▓▓ ░░░ ███████ ███████ ████████
|
|
▓ ▓ ▒ ░ ██ ██ ██ Streaming Event Trace
|
|
▓ ▓ ▓ ░░ ███████ █████ ██ VCD Recorder Utility
|
|
▓ ▓ ▒ ░ ██ ██ ██
|
|
▓▓▓▓▓ ░░░ ███████ ███████ ██ (c) 2025 D.Hoeglinger
|
|
|
|
"""
|
|
|
|
def human_readable_size(size, decimal_places=2):
|
|
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
|
|
if size < 1024.0 or unit == 'PiB':
|
|
break
|
|
size /= 1024.0
|
|
return f"{size:.{decimal_places}f} {unit}"
|
|
|
|
def is_hex(s):
|
|
return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
|
|
|
|
def bsd_hash(data):
|
|
checksum = 0
|
|
for c in data:
|
|
checksum = (checksum >> 1) + ((checksum & 1) << 15)
|
|
checksum += ord(c)
|
|
checksum = checksum & 0xFFFF
|
|
return checksum
|
|
|
|
def hex_to_int(hex_digit):
|
|
if hex_digit.isdigit():
|
|
return int(hex_digit)
|
|
elif hex_digit in 'ABCDEF':
|
|
return ord(hex_digit) - ord('A') + 10
|
|
elif hex_digit in 'abcdef':
|
|
return ord(hex_digit) - ord('a') + 10
|
|
else:
|
|
raise ValueError("Invalid hexadecimal digit")
|
|
|
|
def decode_hexstr(payload):
|
|
value = bytearray()
|
|
for i in range(0, len(payload), 2):
|
|
b = hex_to_int(chr(payload[i]))
|
|
b |= hex_to_int(chr(payload[i + 1])) << 4
|
|
value.append(b)
|
|
return bytes(value)
|
|
|
|
def hexstr(payload):
|
|
return payload.hex().upper()
|
|
|
|
def decode_binstr(payload):
|
|
value = 0
|
|
for i,b in enumerate(payload):
|
|
value |= b << (8 * i)
|
|
return value
|
|
|
|
class DecodeError(Exception):
|
|
pass
|
|
|
|
def cobs_decode(enc, delim):
|
|
enc = list(enc)
|
|
enc.append(delim)
|
|
enc2 = enc[0:-1]
|
|
length = len(enc2)
|
|
if delim != 0x00:
|
|
for i in range(0, len(enc2)):
|
|
enc2[i] = enc[i] ^ delim
|
|
dec = []
|
|
code = 0xFF
|
|
block = 0
|
|
for i in range(0, length):
|
|
byte = enc2[i]
|
|
if block != 0:
|
|
dec.append(byte)
|
|
else:
|
|
if (i + byte) > len(enc):
|
|
raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
|
|
if code != 0xFF:
|
|
dec.append(0)
|
|
code = byte
|
|
block = code
|
|
if code == 0:
|
|
break
|
|
block = block - 1
|
|
return bytes(dec)
|
|
|
|
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
|
|
for i in range(mlen):
|
|
data[stidx + i] = data[stidx - offset + i]
|
|
|
|
def fastlz_decompress_lv1(datain, doutlen):
|
|
opcode_0 = datain[0]
|
|
datain_idx = 1
|
|
|
|
dataout = bytearray(doutlen)
|
|
dataout_idx = 0;
|
|
|
|
while True:
|
|
op_type = opcode_0 >> 5
|
|
op_data = opcode_0 & 31
|
|
|
|
if op_type == 0b000:
|
|
# literal run
|
|
run = 1 + opcode_0
|
|
dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run]
|
|
datain_idx += run
|
|
dataout_idx += run
|
|
|
|
elif op_type == 0b111:
|
|
# long match
|
|
opcode_1 = datain[datain_idx]
|
|
datain_idx += 1
|
|
opcode_2 = datain[datain_idx]
|
|
datain_idx += 1
|
|
|
|
match_len = 9 + opcode_1
|
|
ofs = (op_data << 8) + opcode_2 + 1
|
|
|
|
_memmove(dataout, dataout_idx, ofs, match_len)
|
|
dataout_idx += match_len
|
|
|
|
else:
|
|
# short match
|
|
opcode_1 = datain[datain_idx]
|
|
datain_idx += 1
|
|
|
|
match_len = 2 + op_type
|
|
ofs = (op_data << 8) + opcode_1 + 1
|
|
|
|
_memmove(dataout, dataout_idx, ofs, match_len)
|
|
dataout_idx += match_len
|
|
|
|
if datain_idx < len(datain):
|
|
opcode_0 = datain[datain_idx]
|
|
datain_idx += 1
|
|
else:
|
|
break
|
|
|
|
return bytes(dataout[:dataout_idx])
|
|
|
|
def scan_for_signals(directory, predefined_signals):
|
|
library_files = ['pet.c', 'pet.h']
|
|
rx_events = re.compile(r'pet_evtrace\(\"([^\"]+)\"')
|
|
rx_scalars = re.compile(r'pet_([usf])(8|16|32)trace\(\"([^\"]+)\"')
|
|
rx_arrays = re.compile(r'pet_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
|
|
rx_strings = re.compile(r'pet_strtrace\(\"([^\"]+)\"')
|
|
signals = {}
|
|
valid = True
|
|
|
|
def add_variable(tag, variable):
|
|
hash = bsd_hash(tag)
|
|
if hash in signals:
|
|
if signals[hash] != variable:
|
|
print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[hash]}`")
|
|
valid = False
|
|
else:
|
|
signals[hash] = variable
|
|
|
|
for predef_signal in predefined_signals:
|
|
sig,_ = predef_signal.split(":")
|
|
add_variable(sig, predef_signal)
|
|
|
|
for root, _, files in os.walk(directory):
|
|
for file in files:
|
|
if file.endswith(('.c', '.h')) and (file not in library_files):
|
|
file_path = os.path.join(root, file)
|
|
with open(file_path, 'r') as f:
|
|
content = f.read()
|
|
for match in rx_events.finditer(content):
|
|
tag = match.group(1)
|
|
variable = f"{tag}:event"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_scalars.finditer(content):
|
|
tag = match.group(3)
|
|
variable = f"{tag}:{match.group(1)}{match.group(2)}"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_arrays.finditer(content):
|
|
tag = match.group(3)
|
|
variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_strings.finditer(content):
|
|
tag = match.group(1)
|
|
variable = f"{tag}:string"
|
|
add_variable(tag, variable)
|
|
|
|
return list(signals.values()), valid
|
|
|
|
class Filter:
|
|
PREAMBLE = b"\x1b[s"
|
|
EPILOUGE = b"\x1b[u"
|
|
TAGCODE_LUT = {
|
|
0x11 : "D1",
|
|
0x12 : "D2",
|
|
0x14 : "D4",
|
|
0x21 : "V1",
|
|
0x22 : "V2",
|
|
0x24 : "V4",
|
|
0x31 : "A1",
|
|
0x32 : "A2",
|
|
0x34 : "A4",
|
|
0x44 : "S4",
|
|
0x54 : "F4",
|
|
0x60 : "EV"
|
|
}
|
|
|
|
FLAGS_COMPRESSED = (1 << 0)
|
|
|
|
def __init__(self, on_value, on_noise):
|
|
self.preamble_i = 0
|
|
self.epilouge_i = 0
|
|
self.packet_buffer = []
|
|
self.noise_buffer = []
|
|
self.process_value = on_value
|
|
self.process_noise = on_noise
|
|
self.packet_counter = 0
|
|
self.packets_dropped = 0
|
|
|
|
def process(self, b):
|
|
if self.preamble_i == (len(self.PREAMBLE)):
|
|
self.packet_buffer.append(b)
|
|
if b == self.EPILOUGE[self.epilouge_i]:
|
|
self.epilouge_i += 1
|
|
else:
|
|
self.epilouge_i = 0
|
|
|
|
if self.epilouge_i == (len(self.EPILOUGE)):
|
|
self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)])
|
|
self.packet_buffer = []
|
|
self.preamble_i = 0
|
|
self.epilouge_i = 0
|
|
else:
|
|
if b == self.PREAMBLE[self.preamble_i]:
|
|
self.preamble_i += 1
|
|
self.noise_buffer.append(b)
|
|
else:
|
|
self.preamble_i = 0
|
|
for nb in self.noise_buffer:
|
|
self.process_noise(nb)
|
|
self.noise_buffer = []
|
|
self.process_noise(b)
|
|
|
|
def disassemble_packet(self, packet):
|
|
try:
|
|
tagcode = packet[0]
|
|
# ignore rubout packets
|
|
if chr(tagcode) == ' ':
|
|
return
|
|
|
|
if tagcode not in self.TAGCODE_LUT:
|
|
self.packets_dropped += 1
|
|
return
|
|
|
|
tag = self.TAGCODE_LUT[tagcode]
|
|
|
|
value = None
|
|
offset = 1
|
|
match tag:
|
|
case "D1"|"V1"|"A1":
|
|
value = decode_binstr(packet[offset:offset+1])
|
|
offset += 1
|
|
case "D2"|"V2"|"A2":
|
|
value = decode_binstr(packet[offset:offset+2])
|
|
offset += 2
|
|
case "D4"|"V4"|"A4"|"F4"|"S4":
|
|
value = decode_binstr(packet[offset:offset+4])
|
|
offset += 4
|
|
sub = None
|
|
if tag[0] == 'A' or tag[0] == 'S':
|
|
sub = decode_binstr(packet[offset:offset+1])
|
|
offset += 1
|
|
|
|
hashtag = None
|
|
if tag[0] != 'D':
|
|
hashtag = decode_binstr(packet[-2:])
|
|
except Exception as ex:
|
|
self.packets_dropped += 1
|
|
return
|
|
|
|
self.process_value(hashtag, value, sub, tag)
|
|
self.packet_counter += 1
|
|
|
|
def disassemble_macropacket(self, packet):
|
|
flags = packet[0]
|
|
payload = packet[1:]
|
|
|
|
try:
|
|
macropack = cobs_decode(payload, 0x1B)
|
|
if self.FLAGS_COMPRESSED & flags:
|
|
frames = fastlz_decompress_lv1(macropack, 1024)
|
|
else:
|
|
frames = macropack
|
|
macropack_delim = 0x00
|
|
for pack in frames.split(bytes(bytearray([macropack_delim]))):
|
|
if len(pack) == 0:
|
|
continue
|
|
try:
|
|
decoded = cobs_decode(pack, macropack_delim)
|
|
self.disassemble_packet(decoded)
|
|
except:
|
|
self.packets_dropped += 1
|
|
except:
|
|
self.packets_dropped += 1
|
|
|
|
|
|
def process_packet(self, packet):
|
|
if len(packet) == 0:
|
|
return
|
|
if is_hex(packet):
|
|
packet = decode_hexstr(packet)
|
|
if True or (len(packet) > 3 + 8 + 3):
|
|
self.disassemble_macropacket(packet)
|
|
else:
|
|
self.disassemble_packet(packet)
|
|
|
|
class Retagger:
|
|
def __init__(self, on_value, tags=None):
|
|
tags = tags or []
|
|
self._tag_lut = {bsd_hash(tag):tag for tag in tags}
|
|
self.process_value = on_value
|
|
self.packets_dropped = 0
|
|
|
|
def process(self, hashtag, number, sub, datatag):
|
|
if hashtag in self._tag_lut or datatag[0] == 'D':
|
|
tag = self._tag_lut.get(hashtag, None)
|
|
self.process_value(tag, number, sub, datatag)
|
|
else:
|
|
self.packets_dropped += 1
|
|
|
|
class VcdSink:
|
|
def __init__(self, fs, signals, timescale='1 us', verbose_trace=False):
|
|
self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0")
|
|
self.skalars = {}
|
|
self.arrays = {}
|
|
self.strings = {}
|
|
self.varnames = {}
|
|
self.timestamp = 0
|
|
self.packets_dropped = 0
|
|
self.verbose_trace = verbose_trace
|
|
for v in signals:
|
|
hvar, vtype = v.split(":")
|
|
hier, _, name = hvar.rpartition(".")
|
|
arr = None
|
|
s = vtype.split("[")
|
|
if len(s) == 2:
|
|
vtype, arr = s
|
|
dsize = 32
|
|
dtype = 'integer'
|
|
match vtype:
|
|
case 'event':
|
|
dtype = 'event'
|
|
dsize = 32
|
|
case 'f32':
|
|
dtype = 'real'
|
|
dsize = 32
|
|
case 'u32'|'s32':
|
|
dtype = 'integer'
|
|
dsize = 32
|
|
case 'u16'|'s16':
|
|
dtype = 'integer'
|
|
dsize = 16
|
|
case 'u8'|'s8':
|
|
dtype = 'integer'
|
|
dsize = 8
|
|
case 'string':
|
|
dtype = 'string'
|
|
dsize = 8
|
|
|
|
self.varnames[hvar] = hvar
|
|
|
|
if arr is not None:
|
|
elems = int(arr.rstrip("]"))
|
|
vars = []
|
|
for i in range(0, elems):
|
|
vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize))
|
|
self.arrays[hvar] = vars
|
|
elif dtype == 'string':
|
|
self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
|
|
else:
|
|
self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
|
|
|
|
def process(self, tag, value, sub, datatag):
|
|
if datatag[0] == 'D':
|
|
self.timestamp += value
|
|
# array values
|
|
elif datatag[0] == 'A':
|
|
timestamp = self.timestamp
|
|
try:
|
|
if self.verbose_trace:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True)
|
|
self.writer.change(self.arrays[tag][sub], timestamp, value)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
elif datatag == 'S4':
|
|
timestamp = self.timestamp
|
|
# unpack
|
|
for i in range(0,4):
|
|
char = value >> (i*8) & 0xFF
|
|
if char != 0:
|
|
self.strings[tag][1] += chr(char)
|
|
# sub of 1 indicates end of string
|
|
if sub == 1:
|
|
try:
|
|
string = self.strings[tag][1]
|
|
if self.verbose_trace:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{string}\"", flush=True)
|
|
self.writer.change(self.strings[tag][0], timestamp, string)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
self.strings[tag][1] = ""
|
|
|
|
# skalar values
|
|
elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
|
|
timestamp = self.timestamp
|
|
try:
|
|
if self.skalars[tag].type == VarType.event:
|
|
value = True
|
|
elif datatag == 'F4':
|
|
value = struct.unpack(">f", struct.pack(">L", value))[0]
|
|
if self.verbose_trace:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value:08X}", flush=True)
|
|
self.writer.change(self.skalars[tag], timestamp, value)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
|
|
def process_noise(noisefile, b):
|
|
print(chr(b), end="", flush=True)
|
|
if noisefile:
|
|
noisefile.write(b.to_bytes(1))
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file")
|
|
parser.add_argument('-d', '--dump', type=str, required=True,
|
|
help='output IEEE 1364-2005 Value Change Dump (vcd) file')
|
|
parser.add_argument('-t', '--timescale', type=str, default="1 us",
|
|
help='period of one timestamp tick')
|
|
parser.add_argument('-n', '--noise', type=str, default=None,
|
|
help='store the stdin data sans packets in this file')
|
|
parser.add_argument('-s', '--source', type=str, required=True,
|
|
help='source tree to scan for trace marks')
|
|
parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
|
|
help='add additional signals tracing internal state of PET')
|
|
parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
|
|
help='write out every trace that arrives')
|
|
args = parser.parse_args()
|
|
|
|
print(header)
|
|
|
|
tracefile = args.dump
|
|
noisefile = args.noise
|
|
source_tree = args.source
|
|
timescale = args.timescale
|
|
enable_diag = args.diagnostics
|
|
enable_verbose_trace = args.trace
|
|
|
|
predefined_signals = []
|
|
if enable_diag:
|
|
predefined_signals += [
|
|
'SET.BufferItems:u32',
|
|
'SET.BufferHealth:u8',
|
|
'SET.CompressionLevel:u8',
|
|
'SET.CompressionTime:u32',
|
|
'SET.RenderTime:u32',
|
|
'SET.ItemsSent:u32'
|
|
]
|
|
|
|
signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
|
|
|
|
if not signals_valid:
|
|
return
|
|
|
|
signals.sort()
|
|
tags = [k.split(":")[0] for k in signals]
|
|
|
|
dfile = open(tracefile, 'w', encoding='utf-8')
|
|
|
|
process_noise_p = partial(process_noise, None)
|
|
nfile = None
|
|
if noisefile:
|
|
nfile = open(noisefile, 'wb')
|
|
process_noise_p = partial(process_noise, nfile)
|
|
|
|
vcd_sink = VcdSink(dfile, signals, timescale, enable_verbose_trace)
|
|
retagger = Retagger(vcd_sink.process, tags)
|
|
packet_filter = Filter(retagger.process, process_noise_p)
|
|
print("Signals:")
|
|
for var in signals:
|
|
print(f" - {var}")
|
|
print()
|
|
|
|
print(" === BEGIN NOISE ===")
|
|
try:
|
|
for bstr in sys.stdin.buffer:
|
|
for b in bstr:
|
|
packet_filter.process(b)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
print()
|
|
print(" === END NOISE ===")
|
|
print()
|
|
|
|
vcd_sink.writer.close()
|
|
dfile.close()
|
|
if nfile:
|
|
nfile.close()
|
|
|
|
print("Summary:")
|
|
packet_count = packet_filter.packet_counter
|
|
drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped
|
|
trace_size = human_readable_size(os.path.getsize(tracefile))
|
|
print(f" - Packets received: {packet_count}")
|
|
print(f" - Packets dropped: {drop_count}")
|
|
print(f" - Trace file: {tracefile}")
|
|
print(f" - Trace size: {trace_size}")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|