# st/st_record.py
# -*- coding: utf-8 -*-
import sys
import os
import re
import struct
import argparse
import datetime
from functools import partial
from vcd import VCDWriter, writer
from vcd.common import VarType
header = """
▓▓▓▓▓ ░░░ ███████ ████████
▓ ▓ ▒ ░ ██ ██ Streaming Trace
▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility
▓ ▓ ▒ ░ ██ ██
▓▓▓▓▓ ░░░ ███████ ██ Version 1.0.16
"""
def human_readable_size(size, decimal_places=2):
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
if size < 1024.0 or unit == 'PiB':
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
def parse_time(timestring):
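    """Parse a time string with an optional p/n/u/m prefix into seconds.

    Examples: "1 us" -> 1e-6, "10ms" -> 0.01; a bare number is taken as seconds."""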
    prefixes = {
        'p': 1e-12,
        'n': 1e-9,
        'u': 1e-6,
        'm': 1e-3,
    }
for prefix,factor in prefixes.items():
unit = prefix+'s'
# found unit in string
if unit in timestring:
# remove unit from string, strip any whitespace before it and parse as float
time = float(timestring.split(unit)[0].rstrip(" "))
return time*factor
# no unit attached, assuming seconds
return float(timestring)
def is_hex(s):
return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
def bsd_hash(data):
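    """16-bit BSD checksum of a tag string: rotate right by one bit, add the next
    character, mask to 16 bits. Must match the hash computed on the target,
    since packets identify their tag only by this hash."""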
checksum = 0
for c in data:
checksum = (checksum >> 1) + ((checksum & 1) << 15)
checksum += ord(c)
checksum = checksum & 0xFFFF
return checksum
def hex_to_int(hex_digit):
if hex_digit.isdigit():
return int(hex_digit)
elif hex_digit in 'ABCDEF':
return ord(hex_digit) - ord('A') + 10
elif hex_digit in 'abcdef':
return ord(hex_digit) - ord('a') + 10
else:
raise ValueError("Invalid hexadecimal digit")
def decode_hexstr(payload):
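    """Decode an ASCII-hex payload into raw bytes.

    Note the nibble order: within each digit pair the first character is the
    LOW nibble, matching how the target appears to render hex-mode packets."""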
value = bytearray()
for i in range(0, len(payload), 2):
b = hex_to_int(chr(payload[i]))
b |= hex_to_int(chr(payload[i + 1])) << 4
value.append(b)
return bytes(value)
def hexstr(payload):
return payload.hex().upper()
def decode_binstr(payload):
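    """Interpret `payload` as a little-endian unsigned integer (first byte is the
    least significant); equivalent to int.from_bytes(payload, 'little')."""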
value = 0
for i,b in enumerate(payload):
value |= b << (8 * i)
return value
class DecodeError(Exception):
pass
def cobs_decode(enc, delim):
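    """Decode a COBS-framed block whose reserved byte is `delim`.

    For a non-zero delimiter the input is first XOR-ed with `delim`, turning it
    back into a classic zero-delimited COBS stream, which is then decoded with
    the removed zero bytes re-inserted. Raises DecodeError if a code byte points
    past the end of the packet."""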
enc = list(enc)
enc.append(delim)
enc2 = enc[0:-1]
length = len(enc2)
if delim != 0x00:
for i in range(0, len(enc2)):
enc2[i] = enc[i] ^ delim
dec = []
code = 0xFF
block = 0
for i in range(0, length):
byte = enc2[i]
if block != 0:
dec.append(byte)
else:
if (i + byte) > len(enc):
raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
if code != 0xFF:
dec.append(0)
code = byte
block = code
if code == 0:
break
block = block - 1
return bytes(dec)
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
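    """Copy `mlen` bytes within `data` from `stidx - offset` to `stidx`, one byte
    at a time, so overlapping LZ77-style back-references (offset < length)
    correctly replicate the bytes just written."""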
for i in range(mlen):
data[stidx + i] = data[stidx - offset + i]
def fastlz_decompress_lv1(datain, doutlen):
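    """Decompress a FastLZ level-1 block.

    The top three bits of each opcode select a literal run (0b000), a long
    match (0b111) or a short match; matches are copied from the already
    decompressed output via _memmove(). `doutlen` pre-sizes the output buffer
    and the result is trimmed to the bytes actually produced."""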
opcode_0 = datain[0]
datain_idx = 1
dataout = bytearray(doutlen)
    dataout_idx = 0
while True:
op_type = opcode_0 >> 5
op_data = opcode_0 & 31
if op_type == 0b000:
# literal run
run = 1 + opcode_0
dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run]
datain_idx += run
dataout_idx += run
elif op_type == 0b111:
# long match
opcode_1 = datain[datain_idx]
datain_idx += 1
opcode_2 = datain[datain_idx]
datain_idx += 1
match_len = 9 + opcode_1
ofs = (op_data << 8) + opcode_2 + 1
_memmove(dataout, dataout_idx, ofs, match_len)
dataout_idx += match_len
else:
# short match
opcode_1 = datain[datain_idx]
datain_idx += 1
match_len = 2 + op_type
ofs = (op_data << 8) + opcode_1 + 1
_memmove(dataout, dataout_idx, ofs, match_len)
dataout_idx += match_len
if datain_idx < len(datain):
opcode_0 = datain[datain_idx]
datain_idx += 1
else:
break
return bytes(dataout[:dataout_idx])
def scan_for_signals(directory, predefined_signals):
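    """Walk `directory` for st_*trace() calls in C sources and build the signal list.

    Every hit is recorded as "tag:type" ("event", "u8".."f32", "u8[N]" for
    arrays, "string"). Tags are hashed with bsd_hash() so collisions can be
    reported, because packets identify a tag only by its 16-bit hash; the array
    length argument has to be a numeric literal for the regex to capture it.
    Returns (signals, valid)."""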
library_files = ['st.c', 'st.h']
rx_events = re.compile(r'st_evtrace\(\"([^\"]+)\"')
rx_scalars = re.compile(r'st_([usf])(8|16|32)trace\(\"([^\"]+)\"')
rx_arrays = re.compile(r'st_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
rx_strings = re.compile(r'st_strtrace\(\"([^\"]+)\"')
signals = {}
valid = True
    def add_variable(tag, variable):
        nonlocal valid
        hash = bsd_hash(tag)
        if hash in signals:
            if signals[hash] != variable:
                print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[hash]}`")
                valid = False
        else:
            signals[hash] = variable
    # predefined signals are registered first; report any conflict against a pseudo path
    file_path = "<predefined>"
    for predef_signal in predefined_signals:
        sig, _ = predef_signal.split(":")
        add_variable(sig, predef_signal)
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(('.c', '.h')) and (file not in library_files):
file_path = os.path.join(root, file)
with open(file_path, 'r') as f:
content = f.read()
for match in rx_events.finditer(content):
tag = match.group(1)
variable = f"{tag}:event"
add_variable(tag, variable)
for match in rx_scalars.finditer(content):
tag = match.group(3)
variable = f"{tag}:{match.group(1)}{match.group(2)}"
add_variable(tag, variable)
for match in rx_arrays.finditer(content):
tag = match.group(3)
variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]"
add_variable(tag, variable)
for match in rx_strings.finditer(content):
tag = match.group(1)
variable = f"{tag}:string"
add_variable(tag, variable)
return list(signals.values()), valid
class Filter:
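    """Extracts ST packets from a noisy byte stream.

    A packet is framed between PREAMBLE (ESC[s) and EPILOGUE (ESC[u), the ANSI
    save/restore-cursor sequences. Bytes outside a frame are forwarded one at a
    time to the callbacks registered with onnoise(); bytes inside a frame are
    collected and handed to process_packet()."""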
PREAMBLE = b"\x1b[s"
    EPILOGUE = b"\x1b[u"
TAGCODE_LUT = {
0x11 : "D1",
0x12 : "D2",
0x14 : "D4",
0x21 : "V1",
0x22 : "V2",
0x24 : "V4",
0x31 : "A1",
0x32 : "A2",
0x34 : "A4",
0x44 : "S4",
0x54 : "F4",
0x60 : "EV"
}
FLAGS_COMPRESSED = (1 << 0)
def __init__(self, on_value):
self.preamble_i = 0
        self.epilogue_i = 0
self.packet_buffer = []
self.noise_buffer = []
self.process_value = on_value
self._on_noise = []
self.packet_counter = 0
self.packets_dropped = 0
def onnoise(self, cb):
self._on_noise.append(cb)
def _emit_noise(self, b):
for cb in self._on_noise:
cb(b)
def process(self, b):
if self.preamble_i == (len(self.PREAMBLE)):
self.packet_buffer.append(b)
            if b == self.EPILOGUE[self.epilogue_i]:
                self.epilogue_i += 1
            else:
                self.epilogue_i = 0
            if self.epilogue_i == (len(self.EPILOGUE)):
                self.process_packet(self.packet_buffer[:-len(self.EPILOGUE)])
                self.packet_buffer = []
                self.noise_buffer = []
                self.preamble_i = 0
                self.epilogue_i = 0
else:
if b == self.PREAMBLE[self.preamble_i]:
self.preamble_i += 1
self.noise_buffer.append(b)
else:
self.preamble_i = 0
for nb in self.noise_buffer:
self._emit_noise(nb)
self.noise_buffer = []
self._emit_noise(b)
def disassemble_packet(self, packet):
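        """Decode a single ST packet: [tagcode][value][sub?][hash16].

        The tag code selects data type and width (D = timestamp delta, V =
        scalar, A = array element, S = string chunk, F = float, EV = event).
        A and S packets carry an extra sub byte (array index / end-of-string
        flag) and every non-D packet ends with the 16-bit BSD hash of its tag.
        Rubout packets (a single space) are ignored; anything unparsable is
        counted as dropped."""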
try:
tagcode = packet[0]
# ignore rubout packets
if chr(tagcode) == ' ':
return
if tagcode not in self.TAGCODE_LUT:
self.packets_dropped += 1
return
tag = self.TAGCODE_LUT[tagcode]
value = None
offset = 1
match tag:
case "D1"|"V1"|"A1":
value = decode_binstr(packet[offset:offset+1])
offset += 1
case "D2"|"V2"|"A2":
value = decode_binstr(packet[offset:offset+2])
offset += 2
case "D4"|"V4"|"A4"|"F4"|"S4":
value = decode_binstr(packet[offset:offset+4])
offset += 4
sub = None
if tag[0] == 'A' or tag[0] == 'S':
sub = decode_binstr(packet[offset:offset+1])
offset += 1
hashtag = None
if tag[0] != 'D':
hashtag = decode_binstr(packet[-2:])
except Exception as ex:
self.packets_dropped += 1
return
self.process_value(hashtag, value, sub, tag)
self.packet_counter += 1
def disassemble_macropacket(self, packet):
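        """Unpack a macro packet: one flags byte followed by a COBS(0x1B) payload.

        The payload is optionally FastLZ-compressed (FLAGS_COMPRESSED) and then
        consists of zero-delimited, individually COBS-encoded ST packets, each
        of which is handed to disassemble_packet(). Decode failures are counted
        in packets_dropped."""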
flags = packet[0]
payload = packet[1:]
try:
macropack = cobs_decode(payload, 0x1B)
if self.FLAGS_COMPRESSED & flags:
frames = fastlz_decompress_lv1(macropack, 1024)
else:
frames = macropack
macropack_delim = 0x00
for pack in frames.split(bytes(bytearray([macropack_delim]))):
if len(pack) == 0:
continue
try:
decoded = cobs_decode(pack, macropack_delim)
self.disassemble_packet(decoded)
            except Exception:
                self.packets_dropped += 1
        except Exception:
            self.packets_dropped += 1
    def process_packet(self, packet):
        if len(packet) == 0:
            return
        if is_hex(packet):
            packet = decode_hexstr(packet)
        # the raw single-packet path is disabled: every framed packet is treated
        # as a macro packet (flags byte + COBS-encoded payload)
        self.disassemble_macropacket(packet)
class Retagger:
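    """Maps the 16-bit tag hash carried in each packet back to its source tag.

    Timestamp (D*) packets pass through untouched; packets whose hash is not in
    the lookup table are counted as dropped."""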
def __init__(self, on_value, tags=None):
tags = tags or []
self._tag_lut = {bsd_hash(tag):tag for tag in tags}
self.process_value = on_value
self.packets_dropped = 0
def process(self, hashtag, number, sub, datatag):
if hashtag in self._tag_lut or datatag[0] == 'D':
tag = self._tag_lut.get(hashtag, None)
self.process_value(tag, number, sub, datatag)
else:
self.packets_dropped += 1
class VcdSink:
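    """Writes decoded trace values into a VCD file via pyvcd's VCDWriter.

    Variables are registered up front from the "tag:type" descriptions produced
    by scan_for_signals(): scalars/floats/events as single variables, arrays as
    one wire per element, strings as a string variable plus an accumulation
    buffer. A running timestamp is advanced by the D* delta packets."""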
def __init__(self, fs, signals, timescale='1 us'):
        self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version="ST v1.0.2")
        self.scalars = {}
self.arrays = {}
self.strings = {}
self.varnames = {}
self._onvalues = {}
self._onanyvalues = []
self.timestamp = 0
self.packets_dropped = 0
for v in signals:
hvar, vtype = v.split(":")
hier, _, name = hvar.rpartition(".")
arr = None
s = vtype.split("[")
if len(s) == 2:
vtype, arr = s
dsize = 32
dtype = 'integer'
match vtype:
case 'event':
dtype = 'event'
dsize = 32
case 'f32':
dtype = 'real'
dsize = 32
case 'u32'|'s32':
dtype = 'integer'
dsize = 32
case 'u16'|'s16':
dtype = 'integer'
dsize = 16
case 'u8'|'s8':
dtype = 'integer'
dsize = 8
case 'string':
dtype = 'string'
dsize = 8
self.varnames[hvar] = hvar
if arr is not None:
elems = int(arr.rstrip("]"))
vars = []
for i in range(0, elems):
vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize))
self.arrays[hvar] = vars
elif dtype == 'string':
self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
else:
                self.scalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
def onvalue(self, tag, cb):
self._onvalues[tag] = cb
def onanyvalue(self, cb):
self._onanyvalues.append(cb)
def _emit(self, timestamp, tag, value, sub=None):
for cb in self._onanyvalues:
cb(timestamp, tag, value, sub)
if tag in self._onvalues:
self._onvalues[tag](timestamp, value, sub)
def process(self, tag, value, sub, datatag):
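        """Apply one decoded value to the VCD.

        D* packets advance the running timestamp by `value` ticks; A* packets
        write one array element; S4 packets accumulate up to four characters at
        a time (sub == 1 marks the end of the string); V*/F4/EV packets write
        scalar, float or event variables. Write errors are logged and counted
        as dropped."""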
if datatag[0] == 'D':
self.timestamp += value
# array values
elif datatag[0] == 'A':
timestamp = self.timestamp
try:
self.writer.change(self.arrays[tag][sub], timestamp, value)
except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
self.packets_dropped += 1
            except Exception:
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
self.packets_dropped += 1
self._emit(timestamp, tag, value, sub)
elif datatag == 'S4':
timestamp = self.timestamp
# unpack
for i in range(0,4):
char = value >> (i*8) & 0xFF
if char != 0:
self.strings[tag][1] += chr(char)
# sub of 1 indicates end of string
if sub == 1:
try:
string = self.strings[tag][1]
self.writer.change(self.strings[tag][0], timestamp, string)
except ValueError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
self.packets_dropped += 1
except writer.VCDPhaseError:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
self.packets_dropped += 1
                except Exception:
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
self.packets_dropped += 1
self.strings[tag][1] = ""
self._emit(timestamp, tag, string, None)
        # scalar values
        elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
            timestamp = self.timestamp
            try:
                if self.scalars[tag].type == VarType.event:
                    value = True
                elif datatag == 'F4':
                    value = struct.unpack(">f", struct.pack(">L", value))[0]
                self.writer.change(self.scalars[tag], timestamp, value)
            except ValueError:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
                self.packets_dropped += 1
            except writer.VCDPhaseError:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
                self.packets_dropped += 1
            except Exception:
                print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
                self.packets_dropped += 1
            self._emit(timestamp, tag, value, None)
def main():
parser = argparse.ArgumentParser(description="scans stdin for ST packets and dumps the values into a VCD file")
parser.add_argument('-d', '--dump', type=str, required=True,
help='output IEEE 1364-2005 Value Change Dump (vcd) file')
parser.add_argument('-t', '--timescale', type=str, default="1 us",
help='period of one timestamp tick')
parser.add_argument('-n', '--noise', type=str, default=None,
help='store the stdin data sans packets in this file')
parser.add_argument('-s', '--source', type=str, required=True,
help='source tree to scan for trace marks')
parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
help='add additional signals tracing internal state of ST')
parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
help='write out every trace that arrives')
parser.add_argument('--tui', action=argparse.BooleanOptionalAction,
help='enable TUI mode')
args = parser.parse_args()
print(header)
tracefile = args.dump
noisefile = args.noise
source_tree = args.source
timescale = args.timescale
enable_diag = args.diagnostics
enable_verbose_trace = args.trace
enable_tui = args.tui
predefined_signals = []
if enable_diag:
predefined_signals += [
'ST.BufferItems:u32',
'ST.BufferHealth:u8',
'ST.CompressionLevel:u8',
'ST.CompressionTime:u32',
'ST.RenderTime:u32',
'ST.ItemsSent:u32'
]
signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
if not signals_valid:
return
signals.sort()
tags = [k.split(":")[0] for k in signals]
dfile = open(tracefile, 'w', encoding='utf-8')
vcd_sink = VcdSink(dfile, signals, timescale)
retagger = Retagger(vcd_sink.process, tags)
packet_filter = Filter(retagger.process)
nfile = None
if noisefile:
nfile = open(noisefile, 'wb')
        packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1, "big")))
    # remove diagnostic signals from view
    signals = [s for s in signals if s not in predefined_signals]
# convert timescale to seconds
timescale = parse_time(timescale)
# start recording
if enable_tui:
if enable_verbose_trace:
print("warning: verbose trace is not avaialble in TUI mode")
print()
tui_record(signals, packet_filter, vcd_sink, timescale)
else:
record(signals, packet_filter, vcd_sink, enable_verbose_trace)
vcd_sink.writer.close()
dfile.close()
if nfile:
nfile.close()
print("Summary:")
packet_count = packet_filter.packet_counter
drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped
trace_size = human_readable_size(os.path.getsize(tracefile))
print(f" - Packets received: {packet_count}")
print(f" - Packets dropped: {drop_count}")
print(f" - Trace file: {tracefile}")
print(f" - Trace size: {trace_size}")
def record(signals, packet_filter, vcd_sink, enable_verbose_trace):
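    """Plain (non-TUI) recording loop: print the signal list, echo noise bytes to
    stdout as they arrive, optionally print every decoded value, and feed stdin
    into the packet filter until EOF or Ctrl-C."""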
print("Signals:")
for var in signals:
print(f" - {var}")
print()
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
def onval(timestamp, tag, value, sub):
if sub is not None:
print(f"### {timestamp:012} : {tag}[{sub}] <= {value}")
else:
print(f"### {timestamp:012} : {tag} <= {value}")
if enable_verbose_trace:
vcd_sink.onanyvalue(onval)
print(" === BEGIN NOISE ===")
try:
for bstr in sys.stdin.buffer:
for b in bstr:
packet_filter.process(b)
except KeyboardInterrupt:
pass
print()
print(" === END NOISE ===")
print()
class NoiseLineBuffer:
def __init__(self, online):
self.buffer = ""
self.online = online
def add(self, b):
t = chr(b)
if t == "\n":
self.online(self.buffer)
self.buffer = ""
else:
self.buffer += t
class TotalMaximumProgressUpdater:
def __init__(self, progress, progress_task):
self.progress = progress
self.progress_task = progress_task
self.maximum = 0
def update(self, value):
if value > self.maximum:
self.maximum = value
self.progress.update(self.progress_task, total=self.maximum)
self.progress.update(self.progress_task, completed=value, visible=True)
class SignalTable:
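    """Holds the most recent displayable value of every signal for the TUI.

    Values are pre-formatted as hex/float/string text and array elements are
    merged into a single "[...]" row; update_interval (in timestamp ticks) is
    meant to rate-limit how often a row is re-rendered."""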
def __init__(self, signals, update_interval=1000000):
self.values = {}
self.arraycache = {}
self.types = {}
self.lastupdate = {}
self.update_interval = update_interval
for signal in signals:
name,value_type = signal.split(":")
self.values[name] = None
if len(s := value_type.split("[")) > 1:
value_type = s[0]
array_len = int(s[1].split("]")[0], 0)
self.arraycache[name] = [None] * array_len
self.types[name] = value_type
self.lastupdate[name] = 0
def update(self, time, signal, value, sub):
if signal in self.values:
not_enough_time_passed = (time - self.lastupdate[signal]) < self.update_interval
is_array = signal in self.arraycache
is_last_array_index = is_array and (sub == (len(self.arraycache[signal])))
if not_enough_time_passed and is_last_array_index:
return
self.lastupdate[signal] = time
value_str = ""
value_type = self.types[signal]
match value_type:
case "u8"|"s8":
value_str = f"x{value:02X}"
case "u16"|"s16":
value_str = f"x{value:04X}"
case "u32"|"s32":
value_str = f"x{value:08X}"
case "f32":
value_str = str(value)
case "string":
value_str = value.rstrip("\r\n")
case "event":
value_str = str(time)
if is_array and (sub is not None):
if sub >= len(self.arraycache[signal]):
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
self.arraycache[signal][sub] = value_str
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
self.values[signal] = value_str
def tui_record(signals, packet_filter, vcd_sink, timescale):
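    """Interactive recording loop built on the rich package: a live table of the
    current signal values plus progress bars driven by the ST.* diagnostic
    signals, with non-packet (noise) bytes printed line by line through the
    console while stdin is fed into the packet filter."""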
try:
from rich.console import Console
from rich.text import Text
from rich.layout import Layout
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
from rich.live import Live
from rich.align import Align
from rich.table import Table
    except ImportError:
        print("error: TUI mode requires the rich package")
        sys.exit(1)
console = Console()
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
trace_text = Text("")
# set up progress bars
    progress_columns = [
TextColumn("[progress.description]{task.description}"),
BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
TaskProgressColumn(),
MofNCompleteColumn()
]
    diag_progress = Progress(*progress_columns, transient=True, auto_refresh=False, refresh_per_second=1)
buffer_health = diag_progress.add_task("[green]Buffer Health", total=255, visible=False)
buffer_items = diag_progress.add_task("[green]Buffer Items", visible=False)
items_sent = diag_progress.add_task("[blue]Items Sent", total=1024, visible=False)
render_time = diag_progress.add_task("[blue]Render Time", total=1024, visible=False)
comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100, visible=False)
comp_time = diag_progress.add_task("[yellow]Compression Time", total=100, visible=False)
buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items)
items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent)
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
# set up table layout and signal view
# set its update interval to 0.2 seconds in ticks
signal_update_interval = int(0.2/timescale)
signal_values = SignalTable(signals, signal_update_interval)
vcd_sink.onanyvalue(signal_values.update)
def generate_table(diag_progress, signal_values):
grid = Table.grid(expand=False)
grid.add_column(justify="left")
signals_width = max([len(x) for x in signal_values.values.keys()]) + 2
sigtable = Table.grid(expand=False)
sigtable.add_column(justify="left", width=10)
sigtable.add_column(justify="left", width=signals_width)
sigtable.add_column(justify="left")
sigtable.add_row(None) # this empty row is there to not leave behind a render on interrupt
for sig in signals:
name, sigtype = sig.split(":")
value = signal_values.values[name]
text = Text("X" if value is None else value, style="red" if value is None else "green")
sigtable.add_row(sigtype, name, text)
grid.add_row(sigtable)
grid.add_row(None)
grid.add_row(diag_progress)
return grid
with Live(console=console, transient=True) as live_status:
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True))
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
vcd_sink.onvalue("ST.ItemsSent", lambda _,value,sub: items_sent_tm.update(value))
vcd_sink.onvalue("ST.CompressionLevel", lambda _,value,sub: diag_progress.update(comp_lvl, completed=value, visible=True))
vcd_sink.onvalue("ST.CompressionTime", lambda _,value,sub: comp_time_tm.update(value))
vcd_sink.onvalue("ST.RenderTime", lambda _,value,sub: render_time_tm.update(value))
packet_filter.onnoise(noise_buffer.add)
try:
for bstr in sys.stdin.buffer:
for b in bstr:
packet_filter.process(b)
live_status.update(generate_table(diag_progress, signal_values))
except KeyboardInterrupt:
diag_progress.stop()
print()
if __name__ == '__main__':
main()