Compare commits
No commits in common. "27f06ba208e2f53ea507df141d8422daab606865" and "5b9bda35af2c204651615d91e05c29f8ae545760" have entirely different histories.
27f06ba208
...
5b9bda35af
Binary file not shown.
|
Before Width: | Height: | Size: 370 KiB After Width: | Height: | Size: 318 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 29 KiB |
@ -1,4 +1,10 @@
|
|||||||

|
```
|
||||||
|
▓▓▓▓▓ ░░░ ███████ ████████
|
||||||
|
▓ ▓ ▒ ░ ██ ██ Streaming Trace
|
||||||
|
▓ ▓ ▓ ░░ ███████ ██ A tiny signal tracing library
|
||||||
|
▓ ▓ ▒ ░ ██ ██
|
||||||
|
▓▓▓▓▓ ░░░ ███████ ██ Version 1.0.2 Alpha
|
||||||
|
```
|
||||||
|
|
||||||
# Introduction
|
# Introduction
|
||||||
|
|
||||||
|
|||||||
64
st_record.py
64
st_record.py
@ -15,7 +15,7 @@ header = """
|
|||||||
▓ ▓ ▒ ░ ██ ██ Streaming Trace
|
▓ ▓ ▒ ░ ██ ██ Streaming Trace
|
||||||
▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility
|
▓ ▓ ▓ ░░ ███████ ██ VCD Recorder Utility
|
||||||
▓ ▓ ▒ ░ ██ ██
|
▓ ▓ ▒ ░ ██ ██
|
||||||
▓▓▓▓▓ ░░░ ███████ ██ Version 1.0.16
|
▓▓▓▓▓ ░░░ ███████ ██ (c) 2025 D.Hoeglinger
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -26,25 +26,6 @@ def human_readable_size(size, decimal_places=2):
|
|||||||
size /= 1024.0
|
size /= 1024.0
|
||||||
return f"{size:.{decimal_places}f} {unit}"
|
return f"{size:.{decimal_places}f} {unit}"
|
||||||
|
|
||||||
def parse_time(timestring):
|
|
||||||
prefixes = {
|
|
||||||
'p':10e-12,
|
|
||||||
'n':10e-9,
|
|
||||||
'u':10e-6,
|
|
||||||
'm':10e-3,
|
|
||||||
}
|
|
||||||
for prefix,factor in prefixes.items():
|
|
||||||
unit = prefix+'s'
|
|
||||||
# found unit in string
|
|
||||||
if unit in timestring:
|
|
||||||
# remove unit from string, strip any whitespace before it and parse as float
|
|
||||||
time = float(timestring.split(unit)[0].rstrip(" "))
|
|
||||||
return time*factor
|
|
||||||
|
|
||||||
# no unit attached, assuming seconds
|
|
||||||
return float(timestring)
|
|
||||||
|
|
||||||
|
|
||||||
def is_hex(s):
|
def is_hex(s):
|
||||||
return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
|
return all(chr(c) in '0123456789abcdefABCDEF' for c in s)
|
||||||
|
|
||||||
@ -551,17 +532,8 @@ def main():
|
|||||||
nfile = open(noisefile, 'wb')
|
nfile = open(noisefile, 'wb')
|
||||||
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
|
packet_filter.onnoise(lambda b: nfile.write(b.to_bytes(1)))
|
||||||
|
|
||||||
# remove diagnostic signal from view
|
|
||||||
signals = [s for s in signals if s not in predefined_signals]
|
|
||||||
# convert timescale to seconds
|
|
||||||
timescale = parse_time(timescale)
|
|
||||||
|
|
||||||
# start recording
|
|
||||||
if enable_tui:
|
if enable_tui:
|
||||||
if enable_verbose_trace:
|
tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace)
|
||||||
print("warning: verbose trace is not avaialble in TUI mode")
|
|
||||||
print()
|
|
||||||
tui_record(signals, packet_filter, vcd_sink, timescale)
|
|
||||||
else:
|
else:
|
||||||
record(signals, packet_filter, vcd_sink, enable_verbose_trace)
|
record(signals, packet_filter, vcd_sink, enable_verbose_trace)
|
||||||
|
|
||||||
@ -635,12 +607,10 @@ class TotalMaximumProgressUpdater:
|
|||||||
self.progress.update(self.progress_task, completed=value, visible=True)
|
self.progress.update(self.progress_task, completed=value, visible=True)
|
||||||
|
|
||||||
class SignalTable:
|
class SignalTable:
|
||||||
def __init__(self, signals, update_interval=1000000):
|
def __init__(self, signals):
|
||||||
self.values = {}
|
self.values = {}
|
||||||
self.arraycache = {}
|
self.arraycache = {}
|
||||||
self.types = {}
|
self.types = {}
|
||||||
self.lastupdate = {}
|
|
||||||
self.update_interval = update_interval
|
|
||||||
|
|
||||||
for signal in signals:
|
for signal in signals:
|
||||||
name,value_type = signal.split(":")
|
name,value_type = signal.split(":")
|
||||||
@ -650,17 +620,9 @@ class SignalTable:
|
|||||||
array_len = int(s[1].split("]")[0], 0)
|
array_len = int(s[1].split("]")[0], 0)
|
||||||
self.arraycache[name] = [None] * array_len
|
self.arraycache[name] = [None] * array_len
|
||||||
self.types[name] = value_type
|
self.types[name] = value_type
|
||||||
self.lastupdate[name] = 0
|
|
||||||
|
|
||||||
def update(self, time, signal, value, sub):
|
def update(self, time, signal, value, sub):
|
||||||
if signal in self.values:
|
try:
|
||||||
not_enough_time_passed = (time - self.lastupdate[signal]) < self.update_interval
|
|
||||||
is_array = signal in self.arraycache
|
|
||||||
is_last_array_index = is_array and (sub == (len(self.arraycache[signal])))
|
|
||||||
if not_enough_time_passed and is_last_array_index:
|
|
||||||
return
|
|
||||||
self.lastupdate[signal] = time
|
|
||||||
|
|
||||||
value_str = ""
|
value_str = ""
|
||||||
value_type = self.types[signal]
|
value_type = self.types[signal]
|
||||||
|
|
||||||
@ -671,22 +633,24 @@ class SignalTable:
|
|||||||
value_str = f"x{value:04X}"
|
value_str = f"x{value:04X}"
|
||||||
case "u32"|"s32":
|
case "u32"|"s32":
|
||||||
value_str = f"x{value:08X}"
|
value_str = f"x{value:08X}"
|
||||||
case "f32":
|
|
||||||
value_str = str(value)
|
|
||||||
case "string":
|
case "string":
|
||||||
value_str = value.rstrip("\r\n")
|
value_str = value.rstrip("\r\n")
|
||||||
case "event":
|
case "event":
|
||||||
value_str = str(time)
|
value_str = str(time)
|
||||||
|
|
||||||
if is_array and (sub is not None):
|
if (signal in self.arraycache) and (sub is not None):
|
||||||
if sub >= len(self.arraycache[signal]):
|
if sub >= len(self.arraycache[signal]):
|
||||||
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
|
self.arraycache[signal].extend([0] * (sub - len(self.arraycache[signal]) + 1))
|
||||||
self.arraycache[signal][sub] = value_str
|
self.arraycache[signal][sub] = value_str
|
||||||
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
|
value_str = "[" + ", ".join(['X' if x is None else x for x in self.arraycache[signal]]) + "]"
|
||||||
|
|
||||||
self.values[signal] = value_str
|
self.values[signal] = value_str
|
||||||
|
except Exception as e:
|
||||||
|
print("EEE", e)
|
||||||
|
print(f"{sub=} {signal=} {len(self.arraycache[signal])=}")
|
||||||
|
print('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
|
||||||
|
|
||||||
def tui_record(signals, packet_filter, vcd_sink, timescale):
|
def tui_record(signals, packet_filter, vcd_sink, enable_verbose_trace):
|
||||||
try:
|
try:
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
@ -699,6 +663,10 @@ def tui_record(signals, packet_filter, vcd_sink, timescale):
|
|||||||
print("error: TUI mode requires the rich package")
|
print("error: TUI mode requires the rich package")
|
||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
if enable_verbose_trace:
|
||||||
|
print("warning: verbose trace is not avaialble in TUI mode")
|
||||||
|
print()
|
||||||
|
|
||||||
console = Console()
|
console = Console()
|
||||||
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
|
noise_buffer = NoiseLineBuffer(lambda text: console.print(f"[blue]{text}"))
|
||||||
trace_text = Text("")
|
trace_text = Text("")
|
||||||
@ -725,9 +693,7 @@ def tui_record(signals, packet_filter, vcd_sink, timescale):
|
|||||||
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
|
comp_time_tm = TotalMaximumProgressUpdater(diag_progress, comp_time)
|
||||||
|
|
||||||
# set up table layout and signal view
|
# set up table layout and signal view
|
||||||
# set its update interval to 0.2 seconds in ticks
|
signal_values = SignalTable(signals)
|
||||||
signal_update_interval = int(0.2/timescale)
|
|
||||||
signal_values = SignalTable(signals, signal_update_interval)
|
|
||||||
vcd_sink.onanyvalue(signal_values.update)
|
vcd_sink.onanyvalue(signal_values.update)
|
||||||
|
|
||||||
def generate_table(diag_progress, signal_values):
|
def generate_table(diag_progress, signal_values):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user