st/set_record.py
Dominic Höglinger a8c6119b56 set_record.py: Fixed input stream for Windows, omit null characters in string receive
This commit fixes two bugs: the first occurs when recording
on Windows, where the user is unable to stop the capture cleanly,
and the second concerns the VCD output being malformed due to null characters in strings.

Reading from the stdin buffer is generally better behaved on both platforms,
while not as performant. The user can now cancel the capture by terminating the source
program or issuing a keyboard interrupt.

The issue with string capture having trailing null characters is fixed;
those characters in the best case confuse GTKWave, and in the worst case make it segfault.
2025-05-14 20:44:31 +02:00

545 lines
18 KiB
Python

# -*- coding: utf-8 -*-
import sys
import os
import re
import struct
import argparse
import datetime
from functools import partial
from vcd import VCDWriter, writer
from vcd.common import VarType
# ASCII-art banner printed once at startup (see main())
header = """
▓▓▓▓▓ ░░░ ███████ ███████ ████████
▓ ▓ ▒ ░ ██ ██ ██ Streaming Event Trace
▓ ▓ ▓ ░░ ███████ █████ ██ VCD Recorder Utility
▓ ▓ ▒ ░ ██ ██ ██
▓▓▓▓▓ ░░░ ███████ ███████ ██ (c) 2025 D.Hoeglinger
"""
def human_readable_size(size, decimal_places=2):
    """Format a byte count as a human-readable string with binary prefixes.

    Divides by 1024 until the value drops below 1024 or the largest
    supported unit (PiB) is reached.
    """
    units = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB')
    index = 0
    while size >= 1024.0 and index < len(units) - 1:
        size /= 1024.0
        index += 1
    return f"{size:.{decimal_places}f} {units[index]}"
def is_hex(s):
    """Return True when every byte value in *s* spells an ASCII hex digit.

    *s* is an iterable of integer byte values (e.g. a packet buffer);
    an empty input is vacuously hexadecimal.
    """
    digits = '0123456789abcdefABCDEF'
    for c in s:
        if chr(c) not in digits:
            return False
    return True
def bsd_hash(data):
    """Compute the 16-bit BSD checksum of the string *data*.

    Classic rotate-right-by-one then add-the-character scheme; this is the
    hash the firmware uses to abbreviate signal tags on the wire.
    """
    h = 0
    for ch in data:
        # rotate right by one bit within 16 bits, then add the character
        h = ((h >> 1) | ((h & 1) << 15)) + ord(ch)
        h &= 0xFFFF
    return h
def hex_to_int(hex_digit):
    """Convert a single hexadecimal digit character to its value (0-15).

    Raises ValueError for any character that is not a hex digit.
    """
    if hex_digit.isdigit():
        return int(hex_digit)
    lowered = hex_digit.lower()
    if lowered in 'abcdef':
        return ord(lowered) - ord('a') + 10
    raise ValueError("Invalid hexadecimal digit")
def decode_hexstr(payload):
    """Decode a sequence of ASCII-hex byte values into raw bytes.

    NOTE(review): within each character pair the FIRST character is the
    LOW nibble and the second the HIGH nibble — swapped relative to
    conventional hex notation and to hexstr().  Presumably this matches
    the target's wire format; confirm against the firmware encoder.

    Odd-length input raises IndexError; non-hex characters raise
    ValueError.
    """
    hexmap = '0123456789abcdef'
    decoded = bytearray()
    for i in range(0, len(payload), 2):
        low = chr(payload[i]).lower()
        high = chr(payload[i + 1]).lower()
        if low not in hexmap or high not in hexmap:
            raise ValueError("Invalid hexadecimal digit")
        decoded.append(hexmap.index(low) | (hexmap.index(high) << 4))
    return bytes(decoded)
def hexstr(payload):
    """Render a bytes-like *payload* as an uppercase hexadecimal string."""
    raw = payload.hex()
    return raw.upper()
def decode_binstr(payload):
    """Decode *payload* as a little-endian unsigned integer.

    Accepts bytes or any iterable of byte values; an empty payload
    decodes to 0.  Uses int.from_bytes instead of the hand-rolled
    shift-and-or loop — identical result, C-speed, and clearer intent.
    """
    return int.from_bytes(bytes(payload), 'little')
class DecodeError(Exception):
    """Raised by cobs_decode when a length marker points past the packet end."""
    pass
def cobs_decode(enc, delim):
    """Decode a COBS-framed packet *enc* that uses *delim* as its frame byte.

    Standard COBS reserves 0x00; this variant supports an arbitrary
    delimiter by XOR-ing the payload with *delim* first (which maps every
    delimiter byte onto 0x00), then running the classic algorithm.

    Raises DecodeError when a group-length marker points past the end of
    the packet.  Returns the decoded payload as bytes.
    """
    enc = list(enc)
    # append the delimiter so the trailing implicit group gets terminated
    enc.append(delim)
    enc2 = enc[0:-1]
    length = len(enc2)
    if delim != 0x00:
        # normalize to classic COBS: XOR maps every delimiter byte to 0x00
        for i in range(0, len(enc2)):
            enc2[i] = enc[i] ^ delim
    dec = []
    code = 0xFF
    block = 0
    for i in range(0, length):
        byte = enc2[i]
        if block != 0:
            # inside a data run: copy the byte through unchanged
            dec.append(byte)
        else:
            # at a group header: `byte` is the distance to the next header
            if (i + byte) > len(enc):
                raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
            if code != 0xFF:
                # every group except a maximal (0xFF) one implies a zero byte
                dec.append(0)
            code = byte
            block = code
            if code == 0:
                break
        block = block - 1
    return bytes(dec)
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
for i in range(mlen):
data[stidx + i] = data[stidx - offset + i]
def fastlz_decompress_lv1(datain, doutlen):
    """Decompress a FastLZ level-1 stream *datain* into at most *doutlen* bytes.

    Instruction layout (first byte): the top 3 bits select the operation,
    the low 5 bits carry data.  000 = literal run, 111 = long match,
    anything else = short match.  Matches copy from already-decompressed
    output and may overlap it, hence the byte-wise _memmove.

    Returns the decompressed data (possibly shorter than *doutlen*).
    """
    opcode_0 = datain[0]
    datain_idx = 1
    dataout = bytearray(doutlen)
    dataout_idx = 0
    while True:
        op_type = opcode_0 >> 5
        op_data = opcode_0 & 31
        if op_type == 0b000:
            # literal run: copy `run` bytes straight from the input
            # (op_type is 0, so opcode_0 == op_data here)
            run = 1 + opcode_0
            dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run]
            datain_idx += run
            dataout_idx += run
        elif op_type == 0b111:
            # long match: two extra opcode bytes extend length and offset
            opcode_1 = datain[datain_idx]
            datain_idx += 1
            opcode_2 = datain[datain_idx]
            datain_idx += 1
            match_len = 9 + opcode_1
            ofs = (op_data << 8) + opcode_2 + 1
            _memmove(dataout, dataout_idx, ofs, match_len)
            dataout_idx += match_len
        else:
            # short match: length 3..8 encoded in the op type itself
            opcode_1 = datain[datain_idx]
            datain_idx += 1
            match_len = 2 + op_type
            ofs = (op_data << 8) + opcode_1 + 1
            _memmove(dataout, dataout_idx, ofs, match_len)
            dataout_idx += match_len
        if datain_idx < len(datain):
            opcode_0 = datain[datain_idx]
            datain_idx += 1
        else:
            break
    return bytes(dataout[:dataout_idx])
def scan_for_signals(directory, predefined_signals):
    """Walk *directory* for C sources and collect PET trace signal declarations.

    Scans every ``.c``/``.h`` file (except the PET library itself) for
    ``pet_*trace("tag", ...)`` calls and builds ``tag:type`` descriptors.
    Signals are keyed by the BSD hash of their tag — the same 16-bit hash
    the firmware sends on the wire — so two different tags hashing to the
    same value are reported as a conflict.

    Returns ``(signals, valid)``: the list of descriptors and a flag that
    is False when any hash conflict was found.
    """
    library_files = ['pet.c', 'pet.h']
    rx_events = re.compile(r'pet_evtrace\(\"([^\"]+)\"')
    rx_scalars = re.compile(r'pet_([usf])(8|16|32)trace\(\"([^\"]+)\"')
    rx_arrays = re.compile(r'pet_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
    rx_strings = re.compile(r'pet_strtrace\(\"([^\"]+)\"')
    signals = {}
    valid = True
    def add_variable(tag, variable, origin):
        # BUGFIX: `valid` lives in the enclosing scope; without `nonlocal`
        # the assignment below created a dead local and conflicts were
        # never reported to the caller.
        nonlocal valid
        tag_hash = bsd_hash(tag)
        if tag_hash in signals:
            if signals[tag_hash] != variable:
                # BUGFIX: origin is passed in explicitly; the old code read
                # `file_path`, which is unbound for predefined signals.
                print(f"{origin}: error: variable `{variable}` has conflicting hash with `{signals[tag_hash]}`")
                valid = False
        else:
            signals[tag_hash] = variable
    for predef_signal in predefined_signals:
        sig, _ = predef_signal.split(":")
        add_variable(sig, predef_signal, "<predefined>")
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(('.c', '.h')) and (file not in library_files):
                file_path = os.path.join(root, file)
                with open(file_path, 'r') as f:
                    content = f.read()
                for match in rx_events.finditer(content):
                    tag = match.group(1)
                    add_variable(tag, f"{tag}:event", file_path)
                for match in rx_scalars.finditer(content):
                    tag = match.group(3)
                    add_variable(tag, f"{tag}:{match.group(1)}{match.group(2)}", file_path)
                for match in rx_arrays.finditer(content):
                    tag = match.group(3)
                    # group(4) is the array length, decimal or 0x-prefixed hex
                    add_variable(tag, f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]", file_path)
                for match in rx_strings.finditer(content):
                    tag = match.group(1)
                    add_variable(tag, f"{tag}:string", file_path)
    return list(signals.values()), valid
class Filter:
PREAMBLE = b"\x1b[s"
EPILOUGE = b"\x1b[u"
TAGCODE_LUT = {
0x11 : "D1",
0x12 : "D2",
0x14 : "D4",
0x21 : "V1",
0x22 : "V2",
0x24 : "V4",
0x31 : "A1",
0x32 : "A2",
0x34 : "A4",
0x44 : "S4",
0x54 : "F4",
0x60 : "EV"
}
FLAGS_COMPRESSED = (1 << 0)
def __init__(self, on_value, on_noise):
self.preamble_i = 0
self.epilouge_i = 0
self.packet_buffer = []
self.noise_buffer = []
self.process_value = on_value
self.process_noise = on_noise
self.packet_counter = 0
self.packets_dropped = 0
def process(self, b):
if self.preamble_i == (len(self.PREAMBLE)):
self.packet_buffer.append(b)
if b == self.EPILOUGE[self.epilouge_i]:
self.epilouge_i += 1
else:
self.epilouge_i = 0
if self.epilouge_i == (len(self.EPILOUGE)):
self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)])
self.packet_buffer = []
self.preamble_i = 0
self.epilouge_i = 0
else:
if b == self.PREAMBLE[self.preamble_i]:
self.preamble_i += 1
self.noise_buffer.append(b)
else:
self.preamble_i = 0
for nb in self.noise_buffer:
self.process_noise(nb)
self.noise_buffer = []
self.process_noise(b)
def disassemble_packet(self, packet):
try:
tagcode = packet[0]
# ignore rubout packets
if chr(tagcode) == ' ':
return
if tagcode not in self.TAGCODE_LUT:
self.packets_dropped += 1
return
tag = self.TAGCODE_LUT[tagcode]
value = None
offset = 1
match tag:
case "D1"|"V1"|"A1":
value = decode_binstr(packet[offset:offset+1])
offset += 1
case "D2"|"V2"|"A2":
value = decode_binstr(packet[offset:offset+2])
offset += 2
case "D4"|"V4"|"A4"|"F4"|"S4":
value = decode_binstr(packet[offset:offset+4])
offset += 4
sub = None
if tag[0] == 'A' or tag[0] == 'S':
sub = decode_binstr(packet[offset:offset+1])
offset += 1
hashtag = None
if tag[0] != 'D':
hashtag = decode_binstr(packet[-2:])
except Exception as ex:
self.packets_dropped += 1
return
self.process_value(hashtag, value, sub, tag)
self.packet_counter += 1
def disassemble_macropacket(self, packet):
flags = packet[0]
payload = packet[1:]
try:
macropack = cobs_decode(payload, 0x1B)
if self.FLAGS_COMPRESSED & flags:
frames = fastlz_decompress_lv1(macropack, 1024)
else:
frames = macropack
macropack_delim = 0x00
for pack in frames.split(bytes(bytearray([macropack_delim]))):
if len(pack) == 0:
continue
try:
decoded = cobs_decode(pack, macropack_delim)
self.disassemble_packet(decoded)
except:
self.packets_dropped += 1
except:
self.packets_dropped += 1
def process_packet(self, packet):
if len(packet) == 0:
return
if is_hex(packet):
packet = decode_hexstr(packet)
if True or (len(packet) > 3 + 8 + 3):
self.disassemble_macropacket(packet)
else:
self.disassemble_packet(packet)
class Retagger:
    """Restores source-level tag names from their 16-bit wire hashes.

    The firmware only transmits the BSD hash of each tag; this class maps
    it back to the tag string recovered from the source tree.  Timestamp
    delta packets ('D*') carry no hashtag and are forwarded untouched.
    """

    def __init__(self, on_value, tags=None):
        if not tags:
            tags = []
        self._tag_lut = {}
        for name in tags:
            self._tag_lut[bsd_hash(name)] = name
        self.process_value = on_value
        self.packets_dropped = 0

    def process(self, hashtag, number, sub, datatag):
        """Forward one decoded value with its hash resolved to a tag name.

        Values whose hash is unknown (and that are not timestamp deltas)
        are counted as dropped instead of being forwarded.
        """
        known = hashtag in self._tag_lut
        if not known and datatag[0] != 'D':
            self.packets_dropped += 1
            return
        self.process_value(self._tag_lut.get(hashtag, None), number, sub, datatag)
class VcdSink:
def __init__(self, fs, signals, timescale='1 us'):
self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0")
self.skalars = {}
self.arrays = {}
self.strings = {}
self.varnames = {}
self.timestamp = 0
self.packets_dropped = 0
for v in signals:
hvar, vtype = v.split(":")
hier, _, name = hvar.rpartition(".")
arr = None
s = vtype.split("[")
if len(s) == 2:
vtype, arr = s
dsize = 32
dtype = 'integer'
match vtype:
case 'event':
dtype = 'event'
dsize = 32
case 'f32':
dtype = 'real'
dsize = 32
case 'u32'|'s32':
dtype = 'integer'
dsize = 32
case 'u16'|'s16':
dtype = 'integer'
dsize = 16
case 'u8'|'s8':
dtype = 'integer'
dsize = 8
case 'string':
dtype = 'string'
dsize = 8
self.varnames[hvar] = hvar
if arr is not None:
elems = int(arr.rstrip("]"))
vars = []
for i in range(0, elems):
vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize))
self.arrays[hvar] = vars
elif dtype == 'string':
self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
else:
self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
def process(self, tag, value, sub, datatag):
if datatag[0] == 'D':
self.timestamp += value
# array values
elif datatag[0] == 'A':
timestamp = self.timestamp
try:
#print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [OK] ", flush=True)
self.writer.change(self.arrays[tag][sub], timestamp, value)
except ValueError:
print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012X} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
self.packets_dropped += 1
elif datatag == 'S4':
timestamp = self.timestamp
# unpack
for i in range(0,4):
char = value >> (i*8) & 0xFF
if char != 0:
self.strings[tag][1] += chr(char)
# sub of 1 indicates end of string
if sub == 1:
try:
string = self.strings[tag][1]
#print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\"", flush=True)
self.writer.change(self.strings[tag][0], timestamp, self.strings[tag][1])
except ValueError:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
self.packets_dropped += 1
self.strings[tag][1] = ""
# skalar values
elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
timestamp = self.timestamp
try:
if self.skalars[tag].type == VarType.event:
value = True
elif datatag == 'F4':
value = struct.unpack(">f", struct.pack(">L", value))[0]
#print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value:08X}", flush=True)
self.writer.change(self.skalars[tag], timestamp, value)
except ValueError:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012X} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
self.packets_dropped += 1
def process_noise(noisefile, b):
    """Handle one non-packet byte: echo it to stdout and optionally log it.

    *noisefile* is a binary file object or None; *b* is an integer byte
    value.
    """
    print(chr(b), end="", flush=True)
    if noisefile:
        # BUGFIX: bytes((b,)) instead of b.to_bytes(1) — to_bytes() only
        # gained a default byteorder argument in Python 3.11, so the old
        # call raised TypeError on older interpreters.
        noisefile.write(bytes((b,)))
def main():
    """Entry point: scan the source tree for signals, then convert PET
    packets arriving on stdin into a VCD file until EOF or Ctrl-C."""
    parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file")
    parser.add_argument('-d', '--dump', type=str, required=True,
                        help='output IEEE 1364-2005 Value Change Dump (vcd) file')
    parser.add_argument('-t', '--timescale', type=str, default="1 us",
                        help='period of one timestamp tick')
    parser.add_argument('-n', '--noise', type=str, default=None,
                        help='store the stdin data sans packets in this file')
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='source tree to scan for trace marks')
    parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
                        help='add additional signals tracing internal state of PET')
    args = parser.parse_args()
    print(header)
    tracefile = args.dump
    noisefile = args.noise
    source_tree = args.source
    timescale = args.timescale
    enable_diag = args.diagnostics
    predefined_signals = []
    if enable_diag:
        # internal PET health signals emitted by the firmware itself
        predefined_signals += [
            'PET.BufferItems:u32',
            'PET.BufferHealth:u8',
            'PET.CompressionLevel:u8',
            'PET.CompressionTime:u32',
            'PET.RenderTime:u32',
            'PET.ItemsSent:u32'
        ]
    signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
    if not signals_valid:
        # hash conflicts were already reported; abort instead of recording garbage
        return
    signals.sort()
    tags = [k.split(":")[0] for k in signals]
    dfile = open(tracefile, 'w', encoding='utf-8')
    process_noise_p = partial(process_noise, None)
    nfile = None
    if noisefile:
        nfile = open(noisefile, 'wb')
        process_noise_p = partial(process_noise, nfile)
    # pipeline: byte stream -> Filter -> Retagger -> VcdSink
    vcd_sink = VcdSink(dfile, signals, timescale)
    retagger = Retagger(vcd_sink.process, tags)
    packet_filter = Filter(retagger.process, process_noise_p)
    print("Signals:")
    for var in signals:
        print(f" - {var}")
    print()
    print(" === BEGIN NOISE ===")
    try:
        # read via the buffered stdin object: behaves consistently on
        # Windows and still lets a KeyboardInterrupt end the capture
        for bstr in sys.stdin.buffer:
            for b in bstr:
                # BUGFIX: was `petf.process(b)` — `petf` is never defined;
                # the filter instance is named `packet_filter`, so the old
                # code raised NameError on the very first input byte.
                packet_filter.process(b)
    except KeyboardInterrupt:
        pass
    print()
    print(" === END NOISE ===")
    print()
    vcd_sink.writer.close()
    dfile.close()
    if nfile:
        nfile.close()
    print("Summary:")
    packet_count = packet_filter.packet_counter
    drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped
    trace_size = human_readable_size(os.path.getsize(tracefile))
    print(f" - Packets received: {packet_count}")
    print(f" - Packets dropped: {drop_count}")
    print(f" - Trace file: {tracefile}")
    print(f" - Trace size: {trace_size}")
# run only when executed as a script, not when imported as a module
if __name__ == '__main__':
    main()