st/set_record.py
Dominic Höglinger bf27293543 set_record.py: Add verbose tracing option
The previously commented-out print statements that trace arriving values
are now switchable via the --trace flag.
2025-05-15 20:01:11 +02:00


# -*- coding: utf-8 -*-
import sys
import os
import re
import struct
import argparse
import datetime
from functools import partial
from vcd import VCDWriter, writer
from vcd.common import VarType
header = """
▓▓▓▓▓ ░░░ ███████ ███████ ████████
▓ ▓ ▒ ░ ██ ██ ██ Streaming Event Trace
▓ ▓ ▓ ░░ ███████ █████ ██ VCD Recorder Utility
▓ ▓ ▒ ░ ██ ██ ██
▓▓▓▓▓ ░░░ ███████ ███████ ██ (c) 2025 D.Hoeglinger
"""
def human_readable_size(size, decimal_places=2):
    for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
        if size < 1024.0 or unit == 'PiB':
            break
        size /= 1024.0
    return f"{size:.{decimal_places}f} {unit}"
def is_hex(s):
    return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
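# 16-bit BSD checksum (rotate-right-and-add) used to hash signal tag strings
# into the two-byte identifiers carried inside the trace packets.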
def bsd_hash(data):
    checksum = 0
    for c in data:
        checksum = (checksum >> 1) + ((checksum & 1) << 15)
        checksum += ord(c)
        checksum = checksum & 0xFFFF
    return checksum
def hex_to_int(hex_digit):
    if hex_digit.isdigit():
        return int(hex_digit)
    elif hex_digit in 'ABCDEF':
        return ord(hex_digit) - ord('A') + 10
    elif hex_digit in 'abcdef':
        return ord(hex_digit) - ord('a') + 10
    else:
        raise ValueError("Invalid hexadecimal digit")
def decode_hexstr(payload):
    value = bytearray()
    for i in range(0, len(payload), 2):
        b = hex_to_int(chr(payload[i]))
        b |= hex_to_int(chr(payload[i + 1])) << 4
        value.append(b)
    return bytes(value)
def hexstr(payload):
    return payload.hex().upper()
def decode_binstr(payload):
    value = 0
    for i, b in enumerate(payload):
        value |= b << (8 * i)
    return value
class DecodeError(Exception):
    pass
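# COBS (Consistent Overhead Byte Stuffing) decoder with a configurable frame
# delimiter: for a non-zero delimiter every byte is first XORed with it, which
# reduces the problem to the standard zero-delimited COBS decode.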
def cobs_decode(enc, delim):
    enc = list(enc)
    enc.append(delim)
    enc2 = enc[0:-1]
    length = len(enc2)
    if delim != 0x00:
        for i in range(0, len(enc2)):
            enc2[i] = enc[i] ^ delim
    dec = []
    code = 0xFF
    block = 0
    for i in range(0, length):
        byte = enc2[i]
        if block != 0:
            dec.append(byte)
        else:
            if (i + byte) > len(enc):
                raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
            if code != 0xFF:
                dec.append(0)
            code = byte
            block = code
            if code == 0:
                break
        block = block - 1
    return bytes(dec)
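# Byte-wise copy (like the memmove in the reference C implementation) so that
# overlapping back-references replicate data correctly during decompression.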
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
    for i in range(mlen):
        data[stidx + i] = data[stidx - offset + i]
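# Decompressor for FastLZ level-1 blocks: each opcode byte selects either a
# literal run or a short/long back-reference match into the output buffer.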
def fastlz_decompress_lv1(datain, doutlen):
    opcode_0 = datain[0]
    datain_idx = 1
    dataout = bytearray(doutlen)
    dataout_idx = 0
    while True:
        op_type = opcode_0 >> 5
        op_data = opcode_0 & 31
        if op_type == 0b000:
            # literal run
            run = 1 + opcode_0
            dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run]
            datain_idx += run
            dataout_idx += run
        elif op_type == 0b111:
            # long match
            opcode_1 = datain[datain_idx]
            datain_idx += 1
            opcode_2 = datain[datain_idx]
            datain_idx += 1
            match_len = 9 + opcode_1
            ofs = (op_data << 8) + opcode_2 + 1
            _memmove(dataout, dataout_idx, ofs, match_len)
            dataout_idx += match_len
        else:
            # short match
            opcode_1 = datain[datain_idx]
            datain_idx += 1
            match_len = 2 + op_type
            ofs = (op_data << 8) + opcode_1 + 1
            _memmove(dataout, dataout_idx, ofs, match_len)
            dataout_idx += match_len
        if datain_idx < len(datain):
            opcode_0 = datain[datain_idx]
            datain_idx += 1
        else:
            break
    return bytes(dataout[:dataout_idx])
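# Walk the source tree and collect every pet_*trace("tag", ...) call site,
# mapping the BSD hash of each tag to a "tag:type" signal descriptor and
# flagging hash collisions between different tags.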
def scan_for_signals(directory, predefined_signals):
    library_files = ['pet.c', 'pet.h']
    rx_events = re.compile(r'pet_evtrace\(\"([^\"]+)\"')
    rx_scalars = re.compile(r'pet_([usf])(8|16|32)trace\(\"([^\"]+)\"')
    rx_arrays = re.compile(r'pet_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
    rx_strings = re.compile(r'pet_strtrace\(\"([^\"]+)\"')
    signals = {}
    valid = True
    file_path = "<predefined>"
    def add_variable(tag, variable):
        nonlocal valid
        tag_hash = bsd_hash(tag)
        if tag_hash in signals:
            if signals[tag_hash] != variable:
                print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[tag_hash]}`")
                valid = False
        else:
            signals[tag_hash] = variable
    for predef_signal in predefined_signals:
        sig, _ = predef_signal.split(":")
        add_variable(sig, predef_signal)
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(('.c', '.h')) and (file not in library_files):
                file_path = os.path.join(root, file)
                with open(file_path, 'r') as f:
                    content = f.read()
                for match in rx_events.finditer(content):
                    tag = match.group(1)
                    variable = f"{tag}:event"
                    add_variable(tag, variable)
                for match in rx_scalars.finditer(content):
                    tag = match.group(3)
                    variable = f"{tag}:{match.group(1)}{match.group(2)}"
                    add_variable(tag, variable)
                for match in rx_arrays.finditer(content):
                    tag = match.group(3)
                    variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4), 0)}]"
                    add_variable(tag, variable)
                for match in rx_strings.finditer(content):
                    tag = match.group(1)
                    variable = f"{tag}:string"
                    add_variable(tag, variable)
    return list(signals.values()), valid
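# Scans the raw byte stream for trace packets wrapped between the ANSI
# "save cursor" (ESC [ s) and "restore cursor" (ESC [ u) escape sequences and
# forwards everything else as console noise.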
class Filter:
    PREAMBLE = b"\x1b[s"
    EPILOGUE = b"\x1b[u"
    TAGCODE_LUT = {
        0x11: "D1",
        0x12: "D2",
        0x14: "D4",
        0x21: "V1",
        0x22: "V2",
        0x24: "V4",
        0x31: "A1",
        0x32: "A2",
        0x34: "A4",
        0x44: "S4",
        0x54: "F4",
        0x60: "EV"
    }
    FLAGS_COMPRESSED = (1 << 0)
    def __init__(self, on_value, on_noise):
        self.preamble_i = 0
        self.epilogue_i = 0
        self.packet_buffer = []
        self.noise_buffer = []
        self.process_value = on_value
        self.process_noise = on_noise
        self.packet_counter = 0
        self.packets_dropped = 0
    def process(self, b):
        if self.preamble_i == (len(self.PREAMBLE)):
            self.packet_buffer.append(b)
            if b == self.EPILOGUE[self.epilogue_i]:
                self.epilogue_i += 1
            else:
                self.epilogue_i = 0
            if self.epilogue_i == (len(self.EPILOGUE)):
                self.process_packet(self.packet_buffer[:-len(self.EPILOGUE)])
                self.packet_buffer = []
                self.preamble_i = 0
                self.epilogue_i = 0
        else:
            if b == self.PREAMBLE[self.preamble_i]:
                self.preamble_i += 1
                self.noise_buffer.append(b)
            else:
                self.preamble_i = 0
                for nb in self.noise_buffer:
                    self.process_noise(nb)
                self.noise_buffer = []
                self.process_noise(b)
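    # A decoded packet is laid out as: one tag-code byte, a 1/2/4-byte
    # little-endian value, an optional sub-index byte (arrays and strings)
    # and, for everything except timestamp deltas ('D'), a two-byte tag hash.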
    def disassemble_packet(self, packet):
        try:
            tagcode = packet[0]
            # ignore rubout packets
            if chr(tagcode) == ' ':
                return
            if tagcode not in self.TAGCODE_LUT:
                self.packets_dropped += 1
                return
            tag = self.TAGCODE_LUT[tagcode]
            value = None
            offset = 1
            match tag:
                case "D1" | "V1" | "A1":
                    value = decode_binstr(packet[offset:offset + 1])
                    offset += 1
                case "D2" | "V2" | "A2":
                    value = decode_binstr(packet[offset:offset + 2])
                    offset += 2
                case "D4" | "V4" | "A4" | "F4" | "S4":
                    value = decode_binstr(packet[offset:offset + 4])
                    offset += 4
            sub = None
            if tag[0] == 'A' or tag[0] == 'S':
                sub = decode_binstr(packet[offset:offset + 1])
                offset += 1
            hashtag = None
            if tag[0] != 'D':
                hashtag = decode_binstr(packet[-2:])
        except Exception:
            self.packets_dropped += 1
            return
        self.process_value(hashtag, value, sub, tag)
        self.packet_counter += 1
    def disassemble_macropacket(self, packet):
        flags = packet[0]
        payload = packet[1:]
        try:
            macropack = cobs_decode(payload, 0x1B)
            if self.FLAGS_COMPRESSED & flags:
                frames = fastlz_decompress_lv1(macropack, 1024)
            else:
                frames = macropack
            macropack_delim = 0x00
            for pack in frames.split(bytes(bytearray([macropack_delim]))):
                if len(pack) == 0:
                    continue
                try:
                    decoded = cobs_decode(pack, macropack_delim)
                    self.disassemble_packet(decoded)
                except Exception:
                    self.packets_dropped += 1
        except Exception:
            self.packets_dropped += 1
    def process_packet(self, packet):
        if len(packet) == 0:
            return
        if is_hex(packet):
            packet = decode_hexstr(packet)
        # currently every packet is handled as a macro packet
        if True or (len(packet) > 3 + 8 + 3):
            self.disassemble_macropacket(packet)
        else:
            self.disassemble_packet(packet)
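# Translates the 16-bit tag hashes carried in the packets back into the
# human-readable tag names discovered by scan_for_signals().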
class Retagger:
    def __init__(self, on_value, tags=None):
        tags = tags or []
        self._tag_lut = {bsd_hash(tag): tag for tag in tags}
        self.process_value = on_value
        self.packets_dropped = 0
    def process(self, hashtag, number, sub, datatag):
        if hashtag in self._tag_lut or datatag[0] == 'D':
            tag = self._tag_lut.get(hashtag, None)
            self.process_value(tag, number, sub, datatag)
        else:
            self.packets_dropped += 1
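# Registers one VCD variable per discovered signal (scalars, arrays and
# strings) and turns the retagged value stream into timestamped value changes.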
class VcdSink:
    def __init__(self, fs, signals, timescale='1 us', verbose_trace=False):
        self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version="PET v1.0")
        self.scalars = {}
        self.arrays = {}
        self.strings = {}
        self.varnames = {}
        self.timestamp = 0
        self.packets_dropped = 0
        self.verbose_trace = verbose_trace
        for v in signals:
            hvar, vtype = v.split(":")
            hier, _, name = hvar.rpartition(".")
            arr = None
            s = vtype.split("[")
            if len(s) == 2:
                vtype, arr = s
            dsize = 32
            dtype = 'integer'
            match vtype:
                case 'event':
                    dtype = 'event'
                    dsize = 32
                case 'f32':
                    dtype = 'real'
                    dsize = 32
                case 'u32' | 's32':
                    dtype = 'integer'
                    dsize = 32
                case 'u16' | 's16':
                    dtype = 'integer'
                    dsize = 16
                case 'u8' | 's8':
                    dtype = 'integer'
                    dsize = 8
                case 'string':
                    dtype = 'string'
                    dsize = 8
            self.varnames[hvar] = hvar
            if arr is not None:
                elems = int(arr.rstrip("]"))
                vars = []
                for i in range(0, elems):
                    vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i + 1)}]", 'wire', size=dsize))
                self.arrays[hvar] = vars
            elif dtype == 'string':
                self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
            else:
                self.scalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
    def process(self, tag, value, sub, datatag):
        if datatag[0] == 'D':
            self.timestamp += value
        # array values
        elif datatag[0] == 'A':
            timestamp = self.timestamp
            try:
                if self.verbose_trace:
                    print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True)
                self.writer.change(self.arrays[tag][sub], timestamp, value)
            except ValueError:
                print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
                self.packets_dropped += 1
            except writer.VCDPhaseError:
                print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
                self.packets_dropped += 1
            except Exception:
                print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
                self.packets_dropped += 1
        # string values arrive as 4-byte chunks and are flushed on sub == 1
        elif datatag == 'S4':
            timestamp = self.timestamp
            # unpack
            for i in range(0, 4):
                char = value >> (i * 8) & 0xFF
                if char != 0:
                    self.strings[tag][1] += chr(char)
            # sub of 1 indicates end of string
            if sub == 1:
                try:
                    string = self.strings[tag][1]
                    if self.verbose_trace:
                        print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{string}\"", flush=True)
                    self.writer.change(self.strings[tag][0], timestamp, string)
                except ValueError:
                    print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
                    self.packets_dropped += 1
                except writer.VCDPhaseError:
                    print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
                    self.packets_dropped += 1
                except Exception:
                    print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
                    self.packets_dropped += 1
                self.strings[tag][1] = ""
        # scalar values
        elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
            timestamp = self.timestamp
            try:
                if self.scalars[tag].type == VarType.event:
                    value = True
                elif datatag == 'F4':
                    value = struct.unpack(">f", struct.pack(">L", value))[0]
                if self.verbose_trace:
                    print(f"### {timestamp:012} : {self.varnames[tag]} <= {value}", flush=True)
                self.writer.change(self.scalars[tag], timestamp, value)
            except ValueError:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
                self.packets_dropped += 1
            except writer.VCDPhaseError:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
                self.packets_dropped += 1
            except Exception:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
                self.packets_dropped += 1
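# Bytes that never became part of a packet are echoed to the terminal and,
# if requested via --noise, mirrored into a capture file.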
def process_noise(noisefile, b):
    print(chr(b), end="", flush=True)
    if noisefile:
        noisefile.write(b.to_bytes(1, "big"))
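# Example invocation (illustrative only: the serial device and source tree
# paths below are placeholders, any byte stream piped into stdin will do):
#
#   cat /dev/ttyUSB0 | python3 set_record.py -s ./firmware -d trace.vcd \
#       --noise console.log --diagnostics --trace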
def main():
    parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file")
    parser.add_argument('-d', '--dump', type=str, required=True,
                        help='output IEEE 1364-2005 Value Change Dump (vcd) file')
    parser.add_argument('-t', '--timescale', type=str, default="1 us",
                        help='period of one timestamp tick')
    parser.add_argument('-n', '--noise', type=str, default=None,
                        help='store the stdin data sans packets in this file')
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='source tree to scan for trace marks')
    parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
                        help='add additional signals tracing internal state of PET')
    parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
                        help='write out every trace that arrives')
    args = parser.parse_args()
    print(header)
    tracefile = args.dump
    noisefile = args.noise
    source_tree = args.source
    timescale = args.timescale
    enable_diag = args.diagnostics
    enable_verbose_trace = args.trace
    predefined_signals = []
    if enable_diag:
        predefined_signals += [
            'SET.BufferItems:u32',
            'SET.BufferHealth:u8',
            'SET.CompressionLevel:u8',
            'SET.CompressionTime:u32',
            'SET.RenderTime:u32',
            'SET.ItemsSent:u32'
        ]
    signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
    if not signals_valid:
        return
    signals.sort()
    tags = [k.split(":")[0] for k in signals]
    dfile = open(tracefile, 'w', encoding='utf-8')
    process_noise_p = partial(process_noise, None)
    nfile = None
    if noisefile:
        nfile = open(noisefile, 'wb')
        process_noise_p = partial(process_noise, nfile)
    vcd_sink = VcdSink(dfile, signals, timescale, enable_verbose_trace)
    retagger = Retagger(vcd_sink.process, tags)
    packet_filter = Filter(retagger.process, process_noise_p)
    print("Signals:")
    for var in signals:
        print(f" - {var}")
    print()
    print(" === BEGIN NOISE ===")
    try:
        for bstr in sys.stdin.buffer:
            for b in bstr:
                packet_filter.process(b)
    except KeyboardInterrupt:
        pass
    print()
    print(" === END NOISE ===")
    print()
    vcd_sink.writer.close()
    dfile.close()
    if nfile:
        nfile.close()
    print("Summary:")
    packet_count = packet_filter.packet_counter
    drop_count = packet_filter.packets_dropped + retagger.packets_dropped + vcd_sink.packets_dropped
    trace_size = human_readable_size(os.path.getsize(tracefile))
    print(f" - Packets received: {packet_count}")
    print(f" - Packets dropped: {drop_count}")
    print(f" - Trace file: {tracefile}")
    print(f" - Trace size: {trace_size}")
if __name__ == '__main__':
    main()