In TUI mode, bar displays are now hidden by default until its diagnostic metric was received to declutter the UI. This means that for instance the compression metrics are hidden when this feature is not utilized in the traced program, or all metrics are hidden if it does not call `st_diagtrace`. The bars are now styled to not give an impression of being a progress bar. The completed portion is colored gold while a completed bar is indicated by red, as this usually indicates a new maximum value. A failing rich import in TUI mode now exits after its error message.
671 lines
23 KiB
Python
671 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import struct
|
|
import argparse
|
|
import datetime
|
|
from functools import partial
|
|
from vcd import VCDWriter, writer
|
|
from vcd.common import VarType
|
|
|
|
header = """
|
|
▓▓▓▓▓ ░░░ ███████ ████████
|
|
▓ ▓ ▒ ░ ██ ██ Streaming Trace
|
|
▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility
|
|
▓ ▓ ▒ ░ ██ ██
|
|
▓▓▓▓▓ ░░░ ███████ ██ (c) 2025 D.Hoeglinger
|
|
|
|
"""
|
|
|
|
def human_readable_size(size, decimal_places=2):
|
|
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
|
|
if size < 1024.0 or unit == 'PiB':
|
|
break
|
|
size /= 1024.0
|
|
return f"{size:.{decimal_places}f} {unit}"
|
|
|
|
def is_hex(s):
|
|
return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
|
|
|
|
def bsd_hash(data):
|
|
checksum = 0
|
|
for c in data:
|
|
checksum = (checksum >> 1) + ((checksum & 1) << 15)
|
|
checksum += ord(c)
|
|
checksum = checksum & 0xFFFF
|
|
return checksum
|
|
|
|
def hex_to_int(hex_digit):
|
|
if hex_digit.isdigit():
|
|
return int(hex_digit)
|
|
elif hex_digit in 'ABCDEF':
|
|
return ord(hex_digit) - ord('A') + 10
|
|
elif hex_digit in 'abcdef':
|
|
return ord(hex_digit) - ord('a') + 10
|
|
else:
|
|
raise ValueError("Invalid hexadecimal digit")
|
|
|
|
def decode_hexstr(payload):
|
|
value = bytearray()
|
|
for i in range(0, len(payload), 2):
|
|
b = hex_to_int(chr(payload[i]))
|
|
b |= hex_to_int(chr(payload[i + 1])) << 4
|
|
value.append(b)
|
|
return bytes(value)
|
|
|
|
def hexstr(payload):
|
|
return payload.hex().upper()
|
|
|
|
def decode_binstr(payload):
|
|
value = 0
|
|
for i,b in enumerate(payload):
|
|
value |= b << (8 * i)
|
|
return value
|
|
|
|
class DecodeError(Exception):
|
|
pass
|
|
|
|
def cobs_decode(enc, delim):
|
|
enc = list(enc)
|
|
enc.append(delim)
|
|
enc2 = enc[0:-1]
|
|
length = len(enc2)
|
|
if delim != 0x00:
|
|
for i in range(0, len(enc2)):
|
|
enc2[i] = enc[i] ^ delim
|
|
dec = []
|
|
code = 0xFF
|
|
block = 0
|
|
for i in range(0, length):
|
|
byte = enc2[i]
|
|
if block != 0:
|
|
dec.append(byte)
|
|
else:
|
|
if (i + byte) > len(enc):
|
|
raise DecodeError(f"Marker pointing to end of packet, at {i}, marker={byte}")
|
|
if code != 0xFF:
|
|
dec.append(0)
|
|
code = byte
|
|
block = code
|
|
if code == 0:
|
|
break
|
|
block = block - 1
|
|
return bytes(dec)
|
|
|
|
def _memmove(data: bytearray, stidx: int, offset: int, mlen: int) -> None:
|
|
for i in range(mlen):
|
|
data[stidx + i] = data[stidx - offset + i]
|
|
|
|
def fastlz_decompress_lv1(datain, doutlen):
|
|
opcode_0 = datain[0]
|
|
datain_idx = 1
|
|
|
|
dataout = bytearray(doutlen)
|
|
dataout_idx = 0;
|
|
|
|
while True:
|
|
op_type = opcode_0 >> 5
|
|
op_data = opcode_0 & 31
|
|
|
|
if op_type == 0b000:
|
|
# literal run
|
|
run = 1 + opcode_0
|
|
dataout[dataout_idx:dataout_idx + run] = datain[datain_idx:datain_idx + run]
|
|
datain_idx += run
|
|
dataout_idx += run
|
|
|
|
elif op_type == 0b111:
|
|
# long match
|
|
opcode_1 = datain[datain_idx]
|
|
datain_idx += 1
|
|
opcode_2 = datain[datain_idx]
|
|
datain_idx += 1
|
|
|
|
match_len = 9 + opcode_1
|
|
ofs = (op_data << 8) + opcode_2 + 1
|
|
|
|
_memmove(dataout, dataout_idx, ofs, match_len)
|
|
dataout_idx += match_len
|
|
|
|
else:
|
|
# short match
|
|
opcode_1 = datain[datain_idx]
|
|
datain_idx += 1
|
|
|
|
match_len = 2 + op_type
|
|
ofs = (op_data << 8) + opcode_1 + 1
|
|
|
|
_memmove(dataout, dataout_idx, ofs, match_len)
|
|
dataout_idx += match_len
|
|
|
|
if datain_idx < len(datain):
|
|
opcode_0 = datain[datain_idx]
|
|
datain_idx += 1
|
|
else:
|
|
break
|
|
|
|
return bytes(dataout[:dataout_idx])
|
|
|
|
def scan_for_signals(directory, predefined_signals):
|
|
library_files = ['st.c', 'st.h']
|
|
rx_events = re.compile(r'st_evtrace\(\"([^\"]+)\"')
|
|
rx_scalars = re.compile(r'st_([usf])(8|16|32)trace\(\"([^\"]+)\"')
|
|
rx_arrays = re.compile(r'st_a([us])(8|16|32)trace\(\"([^\"]+)\"\,\s*[^,]+,\s*((?:0x)?[a-zA-Z0-9]+)')
|
|
rx_strings = re.compile(r'st_strtrace\(\"([^\"]+)\"')
|
|
signals = {}
|
|
valid = True
|
|
|
|
def add_variable(tag, variable):
|
|
hash = bsd_hash(tag)
|
|
if hash in signals:
|
|
if signals[hash] != variable:
|
|
print(f"{file_path}: error: variable `{variable}` has conflicting hash with `{signals[hash]}`")
|
|
valid = False
|
|
else:
|
|
signals[hash] = variable
|
|
|
|
for predef_signal in predefined_signals:
|
|
sig,_ = predef_signal.split(":")
|
|
add_variable(sig, predef_signal)
|
|
|
|
for root, _, files in os.walk(directory):
|
|
for file in files:
|
|
if file.endswith(('.c', '.h')) and (file not in library_files):
|
|
file_path = os.path.join(root, file)
|
|
with open(file_path, 'r') as f:
|
|
content = f.read()
|
|
for match in rx_events.finditer(content):
|
|
tag = match.group(1)
|
|
variable = f"{tag}:event"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_scalars.finditer(content):
|
|
tag = match.group(3)
|
|
variable = f"{tag}:{match.group(1)}{match.group(2)}"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_arrays.finditer(content):
|
|
tag = match.group(3)
|
|
variable = f"{tag}:{match.group(1)}{match.group(2)}[{int(match.group(4),0)}]"
|
|
add_variable(tag, variable)
|
|
|
|
for match in rx_strings.finditer(content):
|
|
tag = match.group(1)
|
|
variable = f"{tag}:string"
|
|
add_variable(tag, variable)
|
|
|
|
return list(signals.values()), valid
|
|
|
|
class Filter:
|
|
PREAMBLE = b"\x1b[s"
|
|
EPILOUGE = b"\x1b[u"
|
|
TAGCODE_LUT = {
|
|
0x11 : "D1",
|
|
0x12 : "D2",
|
|
0x14 : "D4",
|
|
0x21 : "V1",
|
|
0x22 : "V2",
|
|
0x24 : "V4",
|
|
0x31 : "A1",
|
|
0x32 : "A2",
|
|
0x34 : "A4",
|
|
0x44 : "S4",
|
|
0x54 : "F4",
|
|
0x60 : "EV"
|
|
}
|
|
|
|
FLAGS_COMPRESSED = (1 << 0)
|
|
|
|
def __init__(self, on_value):
|
|
self.preamble_i = 0
|
|
self.epilouge_i = 0
|
|
self.packet_buffer = []
|
|
self.noise_buffer = []
|
|
self.process_value = on_value
|
|
self._on_noise = []
|
|
self.packet_counter = 0
|
|
self.packets_dropped = 0
|
|
|
|
def onnoise(self, cb):
|
|
self._on_noise.append(cb)
|
|
|
|
def _emit_noise(self, b):
|
|
for cb in self._on_noise:
|
|
cb(b)
|
|
|
|
def process(self, b):
|
|
if self.preamble_i == (len(self.PREAMBLE)):
|
|
self.packet_buffer.append(b)
|
|
if b == self.EPILOUGE[self.epilouge_i]:
|
|
self.epilouge_i += 1
|
|
else:
|
|
self.epilouge_i = 0
|
|
|
|
if self.epilouge_i == (len(self.EPILOUGE)):
|
|
self.process_packet(self.packet_buffer[:-len(self.EPILOUGE)])
|
|
self.packet_buffer = []
|
|
self.noise_buffer = []
|
|
self.preamble_i = 0
|
|
self.epilouge_i = 0
|
|
else:
|
|
if b == self.PREAMBLE[self.preamble_i]:
|
|
self.preamble_i += 1
|
|
self.noise_buffer.append(b)
|
|
else:
|
|
self.preamble_i = 0
|
|
for nb in self.noise_buffer:
|
|
self._emit_noise(nb)
|
|
self.noise_buffer = []
|
|
self._emit_noise(b)
|
|
|
|
def disassemble_packet(self, packet):
|
|
try:
|
|
tagcode = packet[0]
|
|
# ignore rubout packets
|
|
if chr(tagcode) == ' ':
|
|
return
|
|
|
|
if tagcode not in self.TAGCODE_LUT:
|
|
self.packets_dropped += 1
|
|
return
|
|
|
|
tag = self.TAGCODE_LUT[tagcode]
|
|
|
|
value = None
|
|
offset = 1
|
|
match tag:
|
|
case "D1"|"V1"|"A1":
|
|
value = decode_binstr(packet[offset:offset+1])
|
|
offset += 1
|
|
case "D2"|"V2"|"A2":
|
|
value = decode_binstr(packet[offset:offset+2])
|
|
offset += 2
|
|
case "D4"|"V4"|"A4"|"F4"|"S4":
|
|
value = decode_binstr(packet[offset:offset+4])
|
|
offset += 4
|
|
sub = None
|
|
if tag[0] == 'A' or tag[0] == 'S':
|
|
sub = decode_binstr(packet[offset:offset+1])
|
|
offset += 1
|
|
|
|
hashtag = None
|
|
if tag[0] != 'D':
|
|
hashtag = decode_binstr(packet[-2:])
|
|
except Exception as ex:
|
|
self.packets_dropped += 1
|
|
return
|
|
|
|
self.process_value(hashtag, value, sub, tag)
|
|
self.packet_counter += 1
|
|
|
|
def disassemble_macropacket(self, packet):
|
|
flags = packet[0]
|
|
payload = packet[1:]
|
|
|
|
try:
|
|
macropack = cobs_decode(payload, 0x1B)
|
|
if self.FLAGS_COMPRESSED & flags:
|
|
frames = fastlz_decompress_lv1(macropack, 1024)
|
|
else:
|
|
frames = macropack
|
|
macropack_delim = 0x00
|
|
for pack in frames.split(bytes(bytearray([macropack_delim]))):
|
|
if len(pack) == 0:
|
|
continue
|
|
try:
|
|
decoded = cobs_decode(pack, macropack_delim)
|
|
self.disassemble_packet(decoded)
|
|
except:
|
|
self.packets_dropped += 1
|
|
except:
|
|
self.packets_dropped += 1
|
|
|
|
|
|
def process_packet(self, packet):
|
|
if len(packet) == 0:
|
|
return
|
|
if is_hex(packet):
|
|
packet = decode_hexstr(packet)
|
|
if True or (len(packet) > 3 + 8 + 3):
|
|
self.disassemble_macropacket(packet)
|
|
else:
|
|
self.disassemble_packet(packet)
|
|
|
|
class Retagger:
|
|
def __init__(self, on_value, tags=None):
|
|
tags = tags or []
|
|
self._tag_lut = {bsd_hash(tag):tag for tag in tags}
|
|
self.process_value = on_value
|
|
self.packets_dropped = 0
|
|
|
|
def process(self, hashtag, number, sub, datatag):
|
|
if hashtag in self._tag_lut or datatag[0] == 'D':
|
|
tag = self._tag_lut.get(hashtag, None)
|
|
self.process_value(tag, number, sub, datatag)
|
|
else:
|
|
self.packets_dropped += 1
|
|
|
|
class VcdSink:
|
|
def __init__(self, fs, signals, timescale='1 us'):
|
|
self.writer = VCDWriter(fs, timescale=timescale, date=datetime.datetime.now().isoformat(), version=f"PET v1.0")
|
|
self.skalars = {}
|
|
self.arrays = {}
|
|
self.strings = {}
|
|
self.varnames = {}
|
|
self._onvalues = {}
|
|
self._onanyvalues = []
|
|
self.timestamp = 0
|
|
self.packets_dropped = 0
|
|
for v in signals:
|
|
hvar, vtype = v.split(":")
|
|
hier, _, name = hvar.rpartition(".")
|
|
arr = None
|
|
s = vtype.split("[")
|
|
if len(s) == 2:
|
|
vtype, arr = s
|
|
dsize = 32
|
|
dtype = 'integer'
|
|
match vtype:
|
|
case 'event':
|
|
dtype = 'event'
|
|
dsize = 32
|
|
case 'f32':
|
|
dtype = 'real'
|
|
dsize = 32
|
|
case 'u32'|'s32':
|
|
dtype = 'integer'
|
|
dsize = 32
|
|
case 'u16'|'s16':
|
|
dtype = 'integer'
|
|
dsize = 16
|
|
case 'u8'|'s8':
|
|
dtype = 'integer'
|
|
dsize = 8
|
|
case 'string':
|
|
dtype = 'string'
|
|
dsize = 8
|
|
|
|
self.varnames[hvar] = hvar
|
|
|
|
if arr is not None:
|
|
elems = int(arr.rstrip("]"))
|
|
vars = []
|
|
for i in range(0, elems):
|
|
vars.append(self.writer.register_var(hvar, f"{name}[{i}:{(i+1)}]", 'wire', size=dsize))
|
|
self.arrays[hvar] = vars
|
|
elif dtype == 'string':
|
|
self.strings[hvar] = [self.writer.register_var(hier, name, dtype, size=dsize), ""]
|
|
else:
|
|
self.skalars[hvar] = self.writer.register_var(hier, name, dtype, size=dsize)
|
|
|
|
def onvalue(self, tag, cb):
|
|
self._onvalues[tag] = cb
|
|
|
|
def onanyvalue(self, cb):
|
|
self._onanyvalues.append(cb)
|
|
|
|
def _emit(self, timestamp, tag, value, sub=None):
|
|
for cb in self._onanyvalues:
|
|
cb(timestamp, tag, value, sub)
|
|
if tag in self._onvalues:
|
|
self._onvalues[tag](timestamp, value, sub)
|
|
|
|
def process(self, tag, value, sub, datatag):
|
|
if datatag[0] == 'D':
|
|
self.timestamp += value
|
|
# array values
|
|
elif datatag[0] == 'A':
|
|
timestamp = self.timestamp
|
|
try:
|
|
self.writer.change(self.arrays[tag][sub], timestamp, value)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]}[{sub}] <= {value} [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
self._emit(timestamp, tag, value, sub)
|
|
elif datatag == 'S4':
|
|
timestamp = self.timestamp
|
|
# unpack
|
|
for i in range(0,4):
|
|
char = value >> (i*8) & 0xFF
|
|
if char != 0:
|
|
self.strings[tag][1] += chr(char)
|
|
# sub of 1 indicates end of string
|
|
if sub == 1:
|
|
try:
|
|
string = self.strings[tag][1]
|
|
self.writer.change(self.strings[tag][0], timestamp, string)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= \"{self.strings[tag][1]}\" [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
self.strings[tag][1] = ""
|
|
self._emit(timestamp, tag, string, None)
|
|
|
|
# skalar values
|
|
elif (datatag == 'EV') or (datatag[0] == 'V') or (datatag[0] == 'F'):
|
|
timestamp = self.timestamp
|
|
try:
|
|
if self.skalars[tag].type == VarType.event:
|
|
value = True
|
|
elif datatag == 'F4':
|
|
value = struct.unpack(">f", struct.pack(">L", value))[0]
|
|
self.writer.change(self.skalars[tag], timestamp, value)
|
|
except ValueError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [VAL_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except writer.VCDPhaseError:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [PHA_ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
except:
|
|
print(f"### {timestamp:012} : {self.varnames[tag]} <= {value} [ERR] ", flush=True)
|
|
self.packets_dropped += 1
|
|
self._emit(timestamp, tag, value, None)
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="scans stdin for PET packets and dumps the values into a VCD file")
|
|
parser.add_argument('-d', '--dump', type=str, required=True,
|
|
help='output IEEE 1364-2005 Value Change Dump (vcd) file')
|
|
parser.add_argument('-t', '--timescale', type=str, default="1 us",
|
|
help='period of one timestamp tick')
|
|
parser.add_argument('-n', '--noise', type=str, default=None,
|
|
help='store the stdin data sans packets in this file')
|
|
parser.add_argument('-s', '--source', type=str, required=True,
|
|
help='source tree to scan for trace marks')
|
|
parser.add_argument('--diagnostics', action=argparse.BooleanOptionalAction,
|
|
help='add additional signals tracing internal state of PET')
|
|
parser.add_argument('--trace', action=argparse.BooleanOptionalAction,
|
|
help='write out every trace that arrives')
|
|
parser.add_argument('--tui', action=argparse.BooleanOptionalAction,
|
|
help='enable TUI mode')
|
|
args = parser.parse_args()
|
|
|
|
print(header)
|
|
|
|
tracefile = args.dump
|
|
noisefile = args.noise
|
|
source_tree = args.source
|
|
timescale = args.timescale
|
|
enable_diag = args.diagnostics
|
|
enable_verbose_trace = args.trace
|
|
enable_tui = args.tui
|
|
|
|
predefined_signals = []
|
|
if enable_diag:
|
|
predefined_signals += [
|
|
'ST.BufferItems:u32',
|
|
'ST.BufferHealth:u8',
|
|
'ST.CompressionLevel:u8',
|
|
'ST.CompressionTime:u32',
|
|
'ST.RenderTime:u32',
|
|
'ST.ItemsSent:u32'
|
|
]
|
|
|
|
signals, signals_valid = scan_for_signals(source_tree, predefined_signals)
|
|
|
|
if not signals_valid:
|
|
return
|
|
|
|
signals.sort()
|
|
tags = [k.split(":")[0] for k in signals]
|
|
|
|
dfile = open(tracefile, 'w', encoding='utf-8')
|
|
|
|
vcd_sink = VcdSink(dfile, signals, timescale)
|
|
retagger = Retagger(vcd_sink.process, tags)
|
|
packet_filter = Filter(retagger.process)
|
|
|
|
nfile = None
|
|
if noisefile:
|
|
nfile = open(noisefile, 'wb')
|
|
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
|
|
|
|
print("Signals:")
|
|
for var in signals:
|
|
print(f" - {var}")
|
|
print()
|
|
|
|
if enable_tui:
|
|
tui_record(packet_filter, vcd_sink, enable_verbose_trace)
|
|
else:
|
|
record(packet_filter, vcd_sink, enable_verbose_trace)
|
|
|
|
vcd_sink.writer.close()
|
|
dfile.close()
|
|
if nfile:
|
|
nfile.close()
|
|
|
|
print("Summary:")
|
|
packet_count = packet_filter.packet_counter
|
|
drop_count = packet_filter.packets_dropped + vcd_sink.packets_dropped
|
|
trace_size = human_readable_size(os.path.getsize(tracefile))
|
|
print(f" - Packets received: {packet_count}")
|
|
print(f" - Packets dropped: {drop_count}")
|
|
print(f" - Trace file: {tracefile}")
|
|
print(f" - Trace size: {trace_size}")
|
|
|
|
|
|
def record(packet_filter, vcd_sink, enable_verbose_trace):
|
|
packet_filter.onnoise(lambda b: print(chr(b), end="", flush=True))
|
|
|
|
def onval(timestamp, tag, value, sub):
|
|
if sub is not None:
|
|
print(f"### {timestamp:012} : {tag}[{sub}] <= {value}")
|
|
else:
|
|
print(f"### {timestamp:012} : {tag} <= {value}")
|
|
|
|
if enable_verbose_trace:
|
|
vcd_sink.onanyvalue(onval)
|
|
|
|
print(" === BEGIN NOISE ===")
|
|
try:
|
|
for bstr in sys.stdin.buffer:
|
|
for b in bstr:
|
|
packet_filter.process(b)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
print()
|
|
print(" === END NOISE ===")
|
|
print()
|
|
|
|
class NoiseLineBuffer:
|
|
def __init__(self, online):
|
|
self.buffer = ""
|
|
self.online = online
|
|
|
|
def add(self, b):
|
|
t = chr(b)
|
|
if t == "\n":
|
|
self.online(self.buffer)
|
|
self.buffer = ""
|
|
else:
|
|
self.buffer += t
|
|
|
|
class TotalMaximumProgressUpdater:
|
|
def __init__(self, progress, progress_task):
|
|
self.progress = progress
|
|
self.progress_task = progress_task
|
|
self.maximum = 0
|
|
|
|
def update(self, value):
|
|
if value > self.maximum:
|
|
self.maximum = value
|
|
self.progress.update(self.progress_task, total=self.maximum)
|
|
self.progress.update(self.progress_task, completed=value, visible=True)
|
|
|
|
def tui_record(packet_filter, vcd_sink, enable_verbose_trace):
|
|
try:
|
|
from rich.console import Console
|
|
from rich.text import Text
|
|
from rich.layout import Layout
|
|
from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, MofNCompleteColumn
|
|
from rich.live import Live
|
|
from rich.align import Align
|
|
except:
|
|
print("error: TUI mode requires the rich package")
|
|
exit()
|
|
|
|
if enable_verbose_trace:
|
|
print("warning: verbose trace is not avaialble in TUI mode")
|
|
print()
|
|
|
|
console = Console()
|
|
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
|
|
trace_text = Text("")
|
|
|
|
progress_colums = [
|
|
TextColumn("[progress.description]{task.description}"),
|
|
BarColumn(bar_width=80, complete_style="gold3", finished_style="red"),
|
|
TaskProgressColumn(),
|
|
MofNCompleteColumn()
|
|
|
|
]
|
|
diag_progress = Progress(*progress_colums, transient=True, auto_refresh=False, refresh_per_second=1)
|
|
buffer_health = diag_progress.add_task("[green]Buffer Health", total=255, visible=False)
|
|
buffer_items = diag_progress.add_task("[green]Buffer Items", visible=False)
|
|
items_sent = diag_progress.add_task("[blue]Items Sent", total=1024, visible=False)
|
|
render_time = diag_progress.add_task("[blue]Render Time", total=1024, visible=False)
|
|
comp_lvl = diag_progress.add_task("[yellow]Compression Level", total=100, visible=False)
|
|
comp_time = diag_progress.add_task("[yellow]Compression Time", total=100, visible=False)
|
|
|
|
buffer_items_tm = TotalMaximumProgressUpdater(diag_progress, buffer_items)
|
|
items_sent_tm = TotalMaximumProgressUpdater(diag_progress, items_sent)
|
|
render_time_tm = TotalMaximumProgressUpdater(diag_progress, render_time)
|
|
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
|
|
|
|
with Live(console=console, transient=True) as live_status:
|
|
vcd_sink.onvalue("ST.BufferHealth", lambda _,value,sub: diag_progress.update(buffer_health, completed=value, visible=True))
|
|
vcd_sink.onvalue("ST.BufferItems", lambda _,value,sub: buffer_items_tm.update(value))
|
|
vcd_sink.onvalue("ST.ItemsSent", lambda _,value,sub: items_sent_tm.update(value))
|
|
vcd_sink.onvalue("ST.CompressionLevel", lambda _,value,sub: diag_progress.update(comp_lvl, completed=value, visible=True))
|
|
vcd_sink.onvalue("ST.CompressionTime", lambda _,value,sub: comp_time_tm.update(value))
|
|
vcd_sink.onvalue("ST.RenderTime", lambda _,value,sub: render_time_tm.update(value))
|
|
|
|
packet_filter.onnoise(noise_buffer.add)
|
|
|
|
try:
|
|
for bstr in sys.stdin.buffer:
|
|
for b in bstr:
|
|
packet_filter.process(b)
|
|
live_status.update(diag_progress)
|
|
except KeyboardInterrupt:
|
|
diag_progress.stop()
|
|
|
|
print()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|