# st/st_record.py
#
# 698 lines
# 24 KiB
# Python

# -*- coding: utf-8 -*-
import sys
import os
import re
import struct
import argparse
import datetime
from functools import partial
from vcd import VCDWriter, writer
from vcd.common import VarType
# Start-up banner; main() prints it once after parsing the command line.
header = """
▓▓▓▓▓ ░░░ ███████ ████████
▓ ▓ ▒ ░ ██ ██ Streaming Trace
▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility
▓ ▓ ▒ ░ ██ ██
▓▓▓▓▓ ░░░ ███████ ██ (c) 2025 D.Hoeglinger
"""
def human_readable_size(size, decimal_places=2):
    """Format a byte count using binary (1024-based) units.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PiB) is reached, then rendered with ``decimal_places`` decimals.
    """
    units = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB')
    index = 0
    # `not size < 1024.0` (rather than `>=`) keeps the original comparison.
    while index < len(units) - 1 and not size < 1024.0:
        size /= 1024.0
        index += 1
    return f"{size:.{decimal_places}f} {units[index]}"
def is_hex(s):
    """Return True when every byte value in *s* is an ASCII hex digit.

    *s* is an iterable of integer character codes (e.g. ``bytes`` or a list
    of ints). An empty input yields True.
    """
    hex_chars = '0123456789abcdefABCDEF'
    for code in s:
        if chr(code) not in hex_chars:
            return False
    return True
def bsd_hash(data):
    """Compute a 16-bit rotate-and-add checksum over the characters of *data*.

    For each character the accumulator is rotated right by one bit within
    16 bits, the character's code point is added, and the result is masked
    back to 16 bits.
    """
    acc = 0
    for ch in data:
        rotated = (acc >> 1) + ((acc & 1) << 15)
        acc = (rotated + ord(ch)) & 0xFFFF
    return acc
def hex_to_int(hex_digit):
    """Convert a single hexadecimal digit character to its value (0-15).

    Args:
        hex_digit: A one-character string, ``0-9``, ``a-f`` or ``A-F``.

    Returns:
        The integer value of the digit.

    Raises:
        ValueError: If *hex_digit* is not exactly one ASCII hex digit. This
            includes the empty string and multi-character strings, which a
            plain substring test against ``'ABCDEF'`` would let through.
    """
    digits = '0123456789abcdefABCDEF'
    if isinstance(hex_digit, str) and len(hex_digit) == 1:
        index = digits.find(hex_digit)
        if index >= 0:
            # Indices 16..21 are the upper-case letters A..F (values 10..15).
            return index if index < 16 else index - 6
    raise ValueError("Invalid hexadecimal digit")
def decode_hexstr(payload):
    """Decode a sequence of ASCII hex character codes into bytes.

    Characters are consumed in pairs; the FIRST character of each pair is
    the low nibble and the second is the high nibble.
    """
    decoded = bytearray()
    for pos in range(0, len(payload), 2):
        low = hex_to_int(chr(payload[pos]))
        high = hex_to_int(chr(payload[pos + 1]))
        decoded.append(low | (high << 4))
    return bytes(decoded)
def hexstr(payload):
    """Return the upper-case hexadecimal representation of *payload*."""
    lowercase = payload.hex()
    return lowercase.upper()
def decode_binstr(payload):
    """Assemble an unsigned integer from little-endian byte values.

    The element at index ``i`` contributes ``value << (8 * i)``; an empty
    payload yields 0.
    """
    result = 0
    shift = 0
    for byte in payload:
        result |= byte << shift
        shift += 8
    return result
class DecodeError(Exception):
    """Raised by cobs_decode when a block marker points past the packet."""
def cobs_decode(enc, delim):
    """Decode a COBS-style encoded frame whose bytes were XOR-ed with *delim*.

    Args:
        enc: Iterable of encoded byte values, without the frame delimiter.
        delim: Delimiter byte; when non-zero every input byte is XOR-ed with
            it before decoding.

    Returns:
        The decoded payload as ``bytes``.

    Raises:
        DecodeError: If a block marker points beyond the end of the frame.
    """
    raw = list(enc)
    # Markers are bounds-checked against the frame length including the
    # (implicit) trailing delimiter.
    limit = len(raw) + 1
    if delim != 0x00:
        stream = [value ^ delim for value in raw]
    else:
        stream = raw

    out = []
    code = 0xFF
    remaining = 0
    for i, byte in enumerate(stream):
        if remaining:
            # Inside a block: copy data bytes verbatim.
            out.append(byte)
        else:
            if i + byte > limit:
                raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
            # A previous block shorter than 0xFF stood for an encoded zero.
            if code != 0xFF:
                out.append(0)
            code = remaining = byte
            if not code:
                break
        remaining -= 1
    return bytes(out)
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
for i in range(mlen):
data[stidx + i] = data[stidx - offset + i]
def fastlz_decompress_lv1(datain, doutlen):
    """Decompress a FastLZ level-1 block.

    Each opcode's top three bits select the operation:

    * ``000`` - literal run of ``(opcode & 31) + 1`` bytes copied from input.
    * ``111`` - long match: length ``9 + next byte``, offset from the low
      five opcode bits and the byte after that.
    * other   - short match: length ``2 + type``, offset from the low five
      opcode bits and the next byte.

    Args:
        datain: Compressed input (``bytes``-like).
        doutlen: Expected decompressed size. Kept for interface
            compatibility; the output buffer grows as needed, so it neither
            truncates the result nor makes large outputs fail.

    Returns:
        The decompressed data as ``bytes``.

    Raises:
        ValueError: If the input is empty, an opcode is truncated, or a match
            refers to data before the start of the output. Such inputs used
            to produce fabricated zero bytes or an ``IndexError``.
    """
    in_len = len(datain)
    if in_len == 0:
        raise ValueError("FastLZ: empty input")

    out = bytearray()
    pos = 0
    while pos < in_len:
        opcode = datain[pos]
        pos += 1
        op_type = opcode >> 5
        op_data = opcode & 31

        if op_type == 0b000:
            # literal run
            run = 1 + op_data
            if pos + run > in_len:
                raise ValueError("FastLZ: truncated literal run")
            out += datain[pos:pos + run]
            pos += run
            continue

        if op_type == 0b111:
            # long match: extra length byte followed by the offset low byte
            if pos + 2 > in_len:
                raise ValueError("FastLZ: truncated long match")
            match_len = 9 + datain[pos]
            ofs = (op_data << 8) + datain[pos + 1] + 1
            pos += 2
        else:
            # short match: the length is carried in the opcode type bits
            if pos + 1 > in_len:
                raise ValueError("FastLZ: truncated short match")
            match_len = 2 + op_type
            ofs = (op_data << 8) + datain[pos] + 1
            pos += 1

        if ofs > len(out):
            raise ValueError("FastLZ: match offset before start of output")
        # Copy byte by byte: source and destination may overlap, and bytes
        # produced earlier in this match must be visible to later reads.
        src = len(out) - ofs
        for _ in range(match_len):
            out.append(out[src])
            src += 1

    return bytes(out)
def scan_for_signals(directory, predefined_signals):
    """Collect trace signal declarations from a C source tree.

    Walks *directory* for ``.c``/``.h`` files (skipping ``st.c``/``st.h``)
    and extracts the tags of ``st_evtrace``, ``st_<u|s|f><8|16|32>trace``,
    ``st_a<u|s><8|16|32>trace`` and ``st_strtrace`` calls.

    Args:
        directory: Root of the source tree to scan.
        predefined_signals: Iterable of ``"tag:type"`` strings registered
            before scanning.

    Returns:
        ``(signals, valid)``: *signals* is a list of ``"tag:type"`` strings
        (arrays as ``"tag:type[N]"``). *valid* is False when two different
        signals share a tag hash or an array length is not an integer
        literal; each such problem is printed.
    """
    library_files = ('st.c', 'st.h')
    rx_events = re.compile(r'st_evtrace\(\"([^\"]+)\"')
    rx_scalars = re.compile(r'st_([usf])(8|16|32)trace\(\"([^\"]+)\"')
    rx_arrays = re.compile(r'st_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
    rx_strings = re.compile(r'st_strtrace\(\"([^\"]+)\"')
    signals = {}
    valid = True

    def add_variable(tag, variable, origin):
        # `valid` lives in the enclosing scope; without `nonlocal` the
        # assignment below would only create a throw-away local.
        nonlocal valid
        tag_hash = bsd_hash(tag)
        known = signals.setdefault(tag_hash, variable)
        if known != variable:
            print(f"{origin}: error: variable `{variable}` has conflicting hash with `{known}`")
            valid = False

    for predef_signal in predefined_signals:
        sig, _ = predef_signal.split(":")
        add_variable(sig, predef_signal, "<predefined>")

    for root, _, files in os.walk(directory):
        for file in files:
            if not file.endswith(('.c', '.h')) or file in library_files:
                continue
            file_path = os.path.join(root, file)
            # Tolerate sources that are not valid UTF-8; only ASCII call
            # sites are of interest here.
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()
            for match in rx_events.finditer(content):
                tag = match.group(1)
                add_variable(tag, f"{tag}:event", file_path)
            for match in rx_scalars.finditer(content):
                tag = match.group(3)
                add_variable(tag, f"{tag}:{match.group(1)}{match.group(2)}", file_path)
            for match in rx_arrays.finditer(content):
                tag = match.group(3)
                try:
                    length = int(match.group(4), 0)
                except ValueError:
                    # e.g. a macro or variable name instead of a literal
                    print(f"{file_path}: error: array `{tag}` has non-literal length `{match.group(4)}`")
                    valid = False
                    continue
                add_variable(tag, f"{tag}:{match.group(1)}{match.group(2)}[{length}]", file_path)
            for match in rx_strings.finditer(content):
                tag = match.group(1)
                add_variable(tag, f"{tag}:string", file_path)
    return list(signals.values()), valid
class Filter:
    """Byte-wise splitter that separates ST packets from surrounding noise.

    Bytes between ``PREAMBLE`` and ``EPILOUGE`` form a packet and are
    decoded; every other byte is forwarded to the registered noise
    callbacks. Decoded values are passed to ``on_value`` as
    ``(hashtag, value, sub, tag)``.
    """

    PREAMBLE = b"\x1b[s"
    EPILOUGE = b"\x1b[u"
    # Tag code -> two-character type tag. The first character is the packet
    # kind; a digit in second position is the value width in bytes.
    TAGCODE_LUT = {
        0x11: "D1",
        0x12: "D2",
        0x14: "D4",
        0x21: "V1",
        0x22: "V2",
        0x24: "V4",
        0x31: "A1",
        0x32: "A2",
        0x34: "A4",
        0x44: "S4",
        0x54: "F4",
        0x60: "EV",
    }
    FLAGS_COMPRESSED = (1 << 0)

    def __init__(self, on_value):
        self.preamble_i = 0
        self.epilouge_i = 0
        self.packet_buffer = []
        self.noise_buffer = []
        self.process_value = on_value
        self._on_noise = []
        self.packet_counter = 0
        self.packets_dropped = 0

    def onnoise(self, cb):
        """Register *cb*, called with each byte that is not part of a packet."""
        self._on_noise.append(cb)

    def _emit_noise(self, b):
        for cb in self._on_noise:
            cb(b)

    def process(self, b):
        """Feed one input byte (an int) into the framing state machine."""
        if self.preamble_i == len(self.PREAMBLE):
            # Inside a packet: collect bytes until the epilogue is complete.
            self.packet_buffer.append(b)
            if b == self.EPILOUGE[self.epilouge_i]:
                self.epilouge_i += 1
            elif b == self.EPILOUGE[0]:
                # The mismatching byte may itself start the epilogue. Only
                # its first byte (ESC) can do so, because ESC occurs nowhere
                # else in the marker.
                self.epilouge_i = 1
            else:
                self.epilouge_i = 0
            if self.epilouge_i == len(self.EPILOUGE):
                packet = self.packet_buffer[:-len(self.EPILOUGE)]
                # Reset before decoding so the filter stays consistent even
                # if the decode path raises.
                self.packet_buffer = []
                self.noise_buffer = []
                self.preamble_i = 0
                self.epilouge_i = 0
                self.process_packet(packet)
        elif b == self.PREAMBLE[self.preamble_i]:
            # Possible preamble: hold the byte back until we know.
            self.preamble_i += 1
            self.noise_buffer.append(b)
        else:
            # Not a preamble after all: release the held-back bytes as noise.
            pending, self.noise_buffer = self.noise_buffer, []
            self.preamble_i = 0
            for nb in pending:
                self._emit_noise(nb)
            if b == self.PREAMBLE[0]:
                # e.g. "ESC ESC [ s": the second ESC starts the real preamble.
                self.preamble_i = 1
                self.noise_buffer.append(b)
            else:
                self._emit_noise(b)

    def disassemble_packet(self, packet):
        """Decode one trace packet and forward it to ``process_value``.

        Layout as read by this method: tag code, little-endian value of the
        width given by the tag, one sub-index byte for ``A``/``S`` tags, and
        (for all but ``D`` tags) a 16-bit little-endian hash in the last two
        bytes. Unknown, empty or truncated packets are counted in
        ``packets_dropped``; a tag code of 0x20 (space) is silently ignored.
        """
        try:
            tagcode = packet[0]
            # ignore rubout packets
            if tagcode == ord(' '):
                return
            tag = self.TAGCODE_LUT.get(tagcode)
            if tag is None:
                self.packets_dropped += 1
                return

            kind = tag[0]
            size = int(tag[1]) if tag[1].isdigit() else 0  # "EV" has no value
            has_sub = kind in ('A', 'S')
            has_hash = kind != 'D'
            # Slicing never fails on short input, so a truncated packet would
            # otherwise decode into a plausible-looking but wrong value.
            if len(packet) < 1 + size + has_sub + 2 * has_hash:
                self.packets_dropped += 1
                return

            offset = 1
            value = None
            if size:
                value = decode_binstr(packet[offset:offset + size])
                offset += size
            sub = None
            if has_sub:
                sub = decode_binstr(packet[offset:offset + 1])
                offset += 1
            hashtag = decode_binstr(packet[-2:]) if has_hash else None
        except Exception:
            self.packets_dropped += 1
            return
        self.process_value(hashtag, value, sub, tag)
        self.packet_counter += 1

    def disassemble_macropacket(self, packet):
        """Unpack a macropacket (flags byte + COBS payload) into packets.

        The payload is COBS-decoded with delimiter 0x1B, optionally
        FastLZ-decompressed (``FLAGS_COMPRESSED``), split on 0x00 and each
        non-empty frame is COBS-decoded again and disassembled. Any failure
        increments ``packets_dropped``; ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately not intercepted.
        """
        flags = packet[0]
        payload = packet[1:]
        try:
            macropack = cobs_decode(payload, 0x1B)
            if self.FLAGS_COMPRESSED & flags:
                frames = fastlz_decompress_lv1(macropack, 1024)
            else:
                frames = macropack
            macropack_delim = 0x00
            for pack in frames.split(bytes([macropack_delim])):
                if len(pack) == 0:
                    continue
                try:
                    decoded = cobs_decode(pack, macropack_delim)
                    self.disassemble_packet(decoded)
                except Exception:
                    self.packets_dropped += 1
        except Exception:
            self.packets_dropped += 1

    def process_packet(self, packet):
        """Decode the raw bytes found between preamble and epilogue.

        Packets consisting solely of ASCII hex digits are hex-decoded first.
        Every packet is then treated as a macropacket.
        """
        if len(packet) == 0:
            return
        if is_hex(packet):
            try:
                packet = decode_hexstr(packet)
            except (IndexError, ValueError):
                # odd number of hex digits: the final pair is incomplete
                self.packets_dropped += 1
                return
        self.disassemble_macropacket(packet)
class Retagger:
    """Translate 16-bit tag hashes back into tag names.

    Values whose hash is unknown are counted in ``packets_dropped`` instead
    of being forwarded, except for ``D`` packets, which are always forwarded
    (with whatever the lookup yields, normally ``None``).
    """

    def __init__(self, on_value, tags=None):
        lookup = {}
        for tag in (tags or []):
            lookup[bsd_hash(tag)] = tag
        self._tag_lut = lookup
        self.process_value = on_value
        self.packets_dropped = 0

    def process(self, hashtag, number, sub, datatag):
        """Forward one decoded value with its hash replaced by the tag name."""
        known = hashtag in self._tag_lut
        if not known and datatag[0] != 'D':
            self.packets_dropped += 1
            return
        self.process_value(self._tag_lut.get(hashtag, None), number, sub, datatag)
class VcdSink:
def __init__(self, fs, signals, timescale='1 us'):
self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"ST v1.0.2")
self.skalars = {}
self.arrays = {}
self.strings = {}
self.varnames = {}
self._onvalues = {}
self._onanyvalues = []
self.timestamp = 0
self.packets_dropped = 0
for v in signals:
hvar, vtype = v.split(":")
hier, _, name = hvar.rpartition(".")
arr = None
s = vtype.split("[")
if len(s) == 2:
vtype, arr = s
dsize = 32
dtype = 'integer'
match vtype:
case 'event':
dtype = 'event'
dsize = 32
case 'f32':
dtype = 'real'
dsize = 32
case 'u32'|'s32':
dtype = 'integer'
dsize = 32
case 'u16'|'s16':
dtype = 'integer'
dsize = 16
case 'u8'|'s8':
dtype = 'integer'
dsize = 8
case 'string':
dtype = 'string'
dsize = 8
self.varnames[hvar] = hvar
if arr is not None:
elems = int(arr.rstrip("]"))
vars = []
for i in range(0, elems):
vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize))
self.arrays[hvar] = vars
elif dtype == 'string':
self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
else:
self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
def onvalue(self, tag, cb):
self._onvalues[tag] = cb
def onanyvalue(self, cb):
self._onanyvalues.append(cb)
def _emit(self, timestamp, tag, value, sub=None):
for cb in self._onanyvalues:
cb(timestamp, tag, value, sub)
if tag in self._onvalues:
self._onvalues[tag](timestamp, value, sub)
def process(self, tag, value, sub, datatag):
if datatag[0] == 'D':
self.timestamp += value
# array values
elif datatag[0] == 'A':
timestamp = self.timestamp
try:
self.writer.change(self.arrays[tag][sub], timestamp, value)
except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
self.packets_dropped += 1
self._emit(timestamp, tag, value, sub)
elif datatag == 'S4':
timestamp = self.timestamp
# unpack
for i in range(0,4):
char = value >> (i*8) & 0xFF
if char != 0:
self.strings[tag][1] += chr(char)
# sub of 1 indicates end of string
if sub == 1:
try:
string = self.strings[tag][1]
self.writer.change(self.strings[tag][0], timestamp, string)
except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
self.packets_dropped += 1
self.strings[tag][1] = ""
self._emit(timestamp, tag, string, None)
# skalar values
elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
timestamp = self.timestamp
try:
if self.skalars[tag].type == VarType.event:
value = True
elif datatag == 'F4':
value = struct.unpack(">f", struct.pack(">L", value))[0]
self.writer.change(self.skalars[tag], timestamp, value)
except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
self.packets_dropped += 1
except:
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
self.packets_dropped += 1
self._emit(timestamp, tag, value, None)
def main():
    """Command-line entry point: record ST packets from stdin into a VCD file.

    Scans the source tree for trace marks, builds the decoding pipeline
    (Filter -> Retagger -> VcdSink), runs the plain or TUI recorder until
    stdin ends or the user interrupts, then prints a summary. Exits with
    status 1 when the signal scan reports an error.
    """
    parser = argparse.ArgumentParser(description="scans stdin for ST packets and dumps the values into a VCD file")
    parser.add_argument('-d', '--dump', type=str, required=True,
                        help='output IEEE 1364-2005 Value Change Dump (vcd) file')
    parser.add_argument('-t', '--timescale', type=str, default="1 us",
                        help='period of one timestamp tick')
    parser.add_argument('-n', '--noise', type=str, default=None,
                        help='store the stdin data sans packets in this file')
    parser.add_argument('-s', '--source', type=str, required=True,
                        help='source tree to scan for trace marks')
    parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
                        help='add additional signals tracing internal state of ST')
    parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
                        help='write out every trace that arrives')
    parser.add_argument('--tui', action=argparse.BooleanOptionalAction,
                        help='enable TUI mode')
    args = parser.parse_args()
    print(header)

    tracefile = args.dump
    noisefile = args.noise
    source_tree = args.source
    timescale = args.timescale
    enable_diag = args.diagnostics
    enable_verbose_trace = args.trace
    enable_tui = args.tui

    predefined_signals = []
    if enable_diag:
        predefined_signals += [
            'ST.BufferItems:u32',
            'ST.BufferHealth:u8',
            'ST.CompressionLevel:u8',
            'ST.CompressionTime:u32',
            'ST.RenderTime:u32',
            'ST.ItemsSent:u32'
        ]

    signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
    if not signals_valid:
        # the scan has already printed what is wrong
        sys.exit(1)
    signals.sort()
    tags = [k.split(":")[0] for k in signals]

    nfile = None
    with open(tracefile, 'w', encoding='utf-8') as dfile:
        vcd_sink = VcdSink(dfile, signals, timescale)
        try:
            retagger = Retagger(vcd_sink.process, tags)
            packet_filter = Filter(retagger.process)
            if noisefile:
                nfile = open(noisefile, 'wb')
                # bytes([b]) instead of b.to_bytes(1): the latter needs the
                # default byteorder that only exists from Python 3.11 on.
                packet_filter.onnoise(lambda b: nfile.write(bytes([b])))

            print("Signals:")
            for var in signals:
                print(f" - {var}")
            print()

            if enable_tui:
                tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace)
            else:
                record(packet_filter, vcd_sink, enable_verbose_trace)
        finally:
            # Flush and close the outputs even if recording fails.
            vcd_sink.writer.close()
            if nfile:
                nfile.close()

    print("Summary:")
    packet_count = packet_filter.packet_counter
    # Include values the retagger rejected for carrying an unknown tag hash.
    drop_count = packet_filter.packets_dropped + retagger.packets_dropped + vcd_sink.packets_dropped
    trace_size = human_readable_size(os.path.getsize(tracefile))
    print(f" - Packets received: {packet_count}")
    print(f" - Packets dropped: {drop_count}")
    print(f" - Trace file: {tracefile}")
    print(f" - Trace size: {trace_size}")
def record(packet_filter, vcd_sink, enable_verbose_trace):
    """Run the plain (non-TUI) recorder until stdin ends or Ctrl-C.

    Non-packet bytes are echoed to stdout as they arrive. With
    *enable_verbose_trace*, every value reaching the sink is printed too.
    """
    def echo_noise(b):
        print(chr(b), end="", flush=True)

    def onval(timestamp, tag, value, sub):
        if sub is None:
            print(f"### {timestamp:012} : {tag} <= {value}")
            return
        print(f"### {timestamp:012} : {tag}[{sub}] <= {value}")

    packet_filter.onnoise(echo_noise)
    if enable_verbose_trace:
        vcd_sink.onanyvalue(onval)

    print(" === BEGIN NOISE ===")
    try:
        for chunk in sys.stdin.buffer:
            for octet in chunk:
                packet_filter.process(octet)
    except KeyboardInterrupt:
        # Ctrl-C simply ends the recording.
        pass
    print()
    print(" === END NOISE ===")
    print()
class NoiseLineBuffer:
    """Collect noise bytes into lines and hand each finished line to a callback.

    ``online`` is called with the buffered text (without the newline) every
    time a ``"\\n"`` arrives; text after the last newline stays in ``buffer``.
    """

    def __init__(self, online):
        self.online = online
        self.buffer = ""

    def add(self, b):
        """Append the character with code *b*; flush on newline."""
        char = chr(b)
        if char != "\n":
            self.buffer += char
            return
        self.online(self.buffer)
        self.buffer = ""
class TotalMaximumProgressUpdater:
    """Drive a progress task whose total tracks the largest value seen.

    ``progress`` must offer ``update(task, **fields)``; ``progress_task`` is
    the task handle passed back to it.
    """

    def __init__(self, progress, progress_task):
        self.progress = progress
        self.progress_task = progress_task
        self.maximum = 0

    def update(self, value):
        """Show *value* as completed, first raising the total if it is a new maximum."""
        task = self.progress_task
        if value > self.maximum:
            self.maximum = value
            self.progress.update(task, total=value)
        self.progress.update(task, completed=value, visible=True)
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
    """Run the recorder with a live `rich` table of signal values.

    Noise is printed line by line above the live view. The table lists every
    signal with its latest value (a red ``X`` until one arrives) and a set of
    diagnostic progress bars fed by the ``ST.*`` signals. Exits with status 1
    when the ``rich`` package is missing.
    """
    try:
        from rich.console import Console
        from rich.text import Text
        from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
        from rich.live import Live
        from rich.table import Table
    except ImportError:
        print("error: TUI mode requires the rich package")
        sys.exit(1)

    if enable_verbose_trace:
        print("warning: verbose trace is not avaialble in TUI mode")
        print()

    console = Console()
    noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))

    # set up progress bars
    progress_columns = [
        TextColumn("[progress.description]{task.description}"),
        BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
        TaskProgressColumn(),
        MofNCompleteColumn()
    ]
    diag_progress = Progress(*progress_columns, transient=True, auto_refresh=False, refresh_per_second=1)
    buffer_health = diag_progress.add_task("[green]Buffer Health", total=255, visible=False)
    buffer_items = diag_progress.add_task("[green]Buffer Items", visible=False)
    items_sent = diag_progress.add_task("[blue]Items Sent", total=1024, visible=False)
    render_time = diag_progress.add_task("[blue]Render Time", total=1024, visible=False)
    comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100, visible=False)
    comp_time = diag_progress.add_task("[yellow]Compression Time", total=100, visible=False)
    buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items)
    items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent)
    render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
    comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)

    # set up table layout and signal view; None means "no value seen yet"
    signal_values = {name.split(":")[0]: None for name in signals}

    def on_any_value(time, signal, value, sub):
        # Store the latest value as text. Entries start out as None, so they
        # must be replaced rather than mutated.
        signal_values[signal] = str(value)

    vcd_sink.onanyvalue(on_any_value)

    def generate_table(diag_progress, signal_values):
        grid = Table.grid(expand=True)
        grid.add_column(justify="left")
        grid.add_column(justify="left")
        grid.add_column(justify="left")
        grid.add_row(None)  # this empty row is there to not leave behind a render on interrupt
        for sig in signals:
            name, sigtype = sig.split(":")
            value = signal_values[name]
            text = Text("X" if value is None else value, style="red" if value is None else "green")
            grid.add_row(sigtype, name, text)
        grid.add_row("Diagnostics", diag_progress)
        return grid

    with Live(console=console, transient=True) as live_status:
        vcd_sink.onvalue("ST.BufferHealth", lambda _, value, sub: diag_progress.update(buffer_health, completed=value, visible=True))
        vcd_sink.onvalue("ST.BufferItems", lambda _, value, sub: buffer_items_tm.update(value))
        vcd_sink.onvalue("ST.ItemsSent", lambda _, value, sub: items_sent_tm.update(value))
        vcd_sink.onvalue("ST.CompressionLevel", lambda _, value, sub: diag_progress.update(comp_lvl, completed=value, visible=True))
        vcd_sink.onvalue("ST.CompressionTime", lambda _, value, sub: comp_time_tm.update(value))
        vcd_sink.onvalue("ST.RenderTime", lambda _, value, sub: render_time_tm.update(value))
        packet_filter.onnoise(noise_buffer.add)
        try:
            for bstr in sys.stdin.buffer:
                for b in bstr:
                    packet_filter.process(b)
                # redraw once per chunk read from stdin
                live_status.update(generate_table(diag_progress, signal_values))
        except KeyboardInterrupt:
            diag_progress.stop()
    print()
# Run the recorder only when executed as a script, not when imported.
if __name__ == '__main__':
    main()