pestdetector/sw/analysis/analyse.py
Dominic Höglinger 78102e2f02 Initial release of the pest detector project
This repository contains an attempt to detect wood pests
using three piezo sensors.
Each sensor records the time of flight of a noise source in their enclosing
area and calculates the coordinate as best it can.

This utilizes an ESP32's motor controller peripheral for exact timestamping
of three sensor events, which is then processed by a Python script.
The Python script takes the timestamps of an event,
and a photo of the sensor arrangement and places a marker where the noise
originates.
2025-05-20 20:02:50 +02:00

461 lines
16 KiB
Python

import sys
import asyncio
import serial_asyncio
import re
import numpy as np
from qasync import QEventLoop, QApplication
import pyqtgraph as pg
from pyqtgraph import functions as fn
from PyQt5 import QtWidgets, QtGui
from pyqtgraph.Qt import QtCore
from PyQt5.QtWidgets import (QMainWindow, QApplication, QVBoxLayout, QHBoxLayout,
QWidget, QGroupBox, QTextEdit, QPushButton)
from PyQt5.QtCore import pyqtSlot, QTimer
import tagfind
from trilateration import *#trilaterate, trilaterate_lin, trilaterate2
def distance(p1, p2):
return ((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)**0.5
class Calibration:
def __init__(self, ab, ac, ba, bc, ca, cb):
self.ab = ab
self.ac = ac
self.ba = ba
self.bc = bc
self.ca = ca
self.cb = cb
def get_c(self):
return (self.ab + self.ba) / 2
def get_b(self):
return (self.ac + self.ca) / 2
def get_a(self):
return (self.bc + self.cb) / 2
class OutputProtocol(asyncio.Protocol):
RX_CALIB = re.compile(r'AB: ([\d.]+)[\r\n]+AC: ([\d.]+)[\r\n]+BA: ([\d.]+)[\r\n]+BC: ([\d.]+)[\r\n]+CA: ([\d.]+)[\r\n]+CB: ([\d.]+)[\r\n]', re.MULTILINE)
RX_SIGNAL= re.compile(r'A: ([\d.]+)[\r\n]+B: ([\d.]+)[\r\n]+C: ([\d.]+)[\r\n]', re.MULTILINE)
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.rts = False # You can manipulate Serial object via transport
self.buffer = str()
self.incalib = False
self.calib_callback = None
self.inarm = False
self.sig_callback = None
def calibrate(self, cb):
self.buffer = str()
self.incalib = True
self.calib_callback = cb
transport.write(b'c\n')
def arm(self, cb):
self.buffer = str()
self.inarm = True
self.sig_callback = cb
transport.write(b'a\n')
def _parse_sig(self):
m = self.RX_SIGNAL.search(self.buffer)
if m:
groups = m.groups()
ts = [float(f) for f in groups]
self.sig_callback(*ts)
self.inarm = False
self.buffer = str()
def _parse_calib(self):
m = self.RX_CALIB.search(self.buffer)
if m:
groups = m.groups()
calib = Calibration(*[float(f) for f in groups])
self.calib_callback(calib)
self.incalib = False
self.buffer = str()
def data_received(self, data):
self.buffer += data.decode('ascii')
if self.incalib:
self._parse_calib()
if self.inarm:
self._parse_sig()
#print('RX', self.buffer)
def connection_lost(self, exc):
print('port closed')
self.transport.loop.stop()
def pause_writing(self):
print('pause writing')
print(self.transport.get_write_buffer_size())
def resume_writing(self):
print(self.transport.get_write_buffer_size())
print('resume writing')
def on_calib(cal):
print("Calibration data:")
print(f" - AB: {cal.ab}")
print(f" - AC: {cal.ac}")
print(f" - BA: {cal.ba}")
print(f" - BC: {cal.bc}")
print(f" - CA: {cal.ca}")
print(f" - CB: {cal.cb}")
class TriangleItem(QtWidgets.QGraphicsPolygonItem):
def __init__(self, polygon, parent=None):
super().__init__(polygon, parent)
self.setBrush(QtGui.QBrush(QtGui.QColor.fromRgb(255, 255, 255)))
class HyperTriange(pg.GraphicsObject):
"""A GraphItem displays graph information as
a set of nodes connected by lines (as in 'graph theory', not 'graphics').
Useful for drawing networks, trees, etc.
"""
def __init__(self, **kwds):
pg.GraphicsObject.__init__(self)
self.scatter = pg.ScatterPlotItem()
self.scatter.setParentItem(self)
self.points = None
self.picture = None
self.pen = 'default'
self.setData(**kwds)
def setData(self, **kwds):
"""
Change the data displayed by the graph.
============== =======================================================================
**Arguments:**
pos (N,2) array of the positions of each node in the graph.
adj (M,2) array of connection data. Each row contains indexes
of two nodes that are connected or None to hide lines
pen The pen to use when drawing lines between connected
nodes. May be one of:
* QPen
* a single argument to pass to pg.mkPen
* a record array of length M
with fields (red, green, blue, alpha, width). Note
that using this option may have a significant performance
cost.
* None (to disable connection drawing)
* 'default' to use the default foreground color.
symbolPen The pen(s) used for drawing nodes.
symbolBrush The brush(es) used for drawing nodes.
``**opts`` All other keyword arguments are given to
:func:`ScatterPlotItem.setData() <pyqtgraph.ScatterPlotItem.setData>`
to affect the appearance of nodes (symbol, size, brush,
etc.)
============== =======================================================================
"""
if 'points' in kwds:
self.points = kwds['points']
self._update()
if 'pen' in kwds:
self.setPen(kwds.pop('pen'))
self._update()
if 'symbolPen' in kwds:
kwds['pen'] = kwds.pop('symbolPen')
if 'symbolBrush' in kwds:
kwds['brush'] = kwds.pop('symbolBrush')
self.scatter.setData(**kwds)
self.informViewBoundsChanged()
def _update(self):
self.picture = None
self.prepareGeometryChange()
self.update()
def setPen(self, *args, **kwargs):
"""
Set the pen used to draw graph lines.
May be:
* None to disable line drawing
* Record array with fields (red, green, blue, alpha, width)
* Any set of arguments and keyword arguments accepted by
:func:`mkPen <pyqtgraph.mkPen>`.
* 'default' to use the default foreground color.
"""
if len(args) == 1 and len(kwargs) == 0:
self.pen = args[0]
else:
self.pen = fn.mkPen(*args, **kwargs)
self.picture = None
self.update()
def generatePicture(self):
self.picture = QtGui.QPicture()
if self.pen is None or self.points is None:
return
p = QtGui.QPainter(self.picture)
try:
pts = self.points
pen = self.pen
#p.setPen(pen)
p.setPen(fn.mkPen(color=(pen[0], pen[1], pen[2], pen[3]), width=1))
#p.setPen(fn.mkPen(color=(0xFF, 0xFF, 0xFF,0xFF), width=1))
#pen = pg.getConfigOption('foreground')
#p.setPen(fn.mkPen(pen))
p.drawLine(QtCore.QPointF(*pts[0]), QtCore.QPointF(*pts[1]))
p.drawLine(QtCore.QPointF(*pts[1]), QtCore.QPointF(*pts[2]))
p.drawLine(QtCore.QPointF(*pts[2]), QtCore.QPointF(*pts[0]))
finally:
p.end()
def paint(self, p, *args):
if self.picture is None:
self.generatePicture()
if pg.getConfigOption('antialias') is True:
p.setRenderHint(p.RenderHint.Antialiasing)
self.picture.play(p)
def boundingRect(self):
return self.scatter.boundingRect()
def dataBounds(self, *args, **kwds):
return self.scatter.dataBounds(*args, **kwds)
def pixelPadding(self):
return self.scatter.pixelPadding()
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, proto):
super().__init__()
self.proto = proto
#self.w = pg.GraphicsLayoutWidget()
#self.w = pg.GraphicsView(parent=None, useOpenGL=False)
self.w = pg.GraphicsLayoutWidget(show=True)
#self.setCentralWidget(self.w)
layout = QVBoxLayout()
#triangle = TriangleItem(p)
self.calib_triangle = HyperTriange(points=[[0, -0.5], [-0.5, 0], [0, 0.5]], pen=(255, 0, 0, 255))
#self.w.addItem(triangle)
#v_temporal = self.w.addViewBox(0, 0)
#v_temporal.setAspectLocked()
#v_temporal.addItem(self.calib_triangle)
#self.w.addLabel("Calibration", 1, 0)
self.image_plot = pg.ImageItem()
v_image = self.w.addViewBox(2, 0)
v_image.setAspectLocked(True)
v_image.addItem(self.image_plot)
#self.w.addItem(self.image_plot, 1, 0)
self.constellation = HyperTriange(points=[[0, 0], [0, 0], [0, 0]], pen=(255, 0, 0, 255))
v_image.addItem(self.constellation)
self.w.addLabel("Detection", 3, 0)
self.locations = []
self.trajectories = []
#v_image.addItem(self.location)
self.v_image = v_image
self.arm_btn = QPushButton("Arm")
self.arm_btn.clicked.connect(self.on_arm)
self.clear_btn = QPushButton("Clear")
self.clear_btn.clicked.connect(self.on_clear)
layout.addWidget(self.w)
layout.addWidget(self.arm_btn)
layout.addWidget(self.clear_btn)
widget = QWidget()
widget.setLayout(layout)
self.setCentralWidget(widget)
self.calibration = None
self.armtimer = QTimer()
self.armtimer.timeout.connect(self.on_arm)
self.status = pg.TextItem(text='READY', color=(200, 200, 200), html=None, anchor=(0, 0), border=(255,0,0), fill=(64,0,0), angle=0, rotateAxis=None, ensureInBounds=False)
v_image.addItem(self.status)
@pyqtSlot()
def on_clear(self):
for l in self.locations:
self.v_image.removeItem(l)
self.locations = []
for t in self.trajectories:
self.v_image.removeItem(t)
self.trajectories = []
@pyqtSlot()
def on_arm(self):
print('ARMING')
self.proto.arm(self.enter_signal)
self.armtimer.stop()
def enter_calib(self, calib):
self.calibration = calib
def enter_signal(self, ta, tb, tc):
#f = 2/240000000
f = 1
#t_pd = 0
#ta %= 0xFFFF
#tb %= 0xFFFF
#tc %= 0xFFFF
measurements = np.array([ta, tb, tc])
meas_sorted = np.sort(measurements)
min_idx = np.argmin(measurements)
#print("minidx", min_idx)
tmin = measurements[min_idx]
"""
diff_highest = meas_sorted[0] - meas_sorted[1]
if ((meas_sorted[0] - tmin) > diff_highest) or \
((meas_sorted[1] - tmin) > diff_highest):
print("Overflow detected")
measurements[min_idx] = measurements[min_idx] + 0xFFFFFFFF
ta, tb, tc = measurements
tmin = np.min(measurements)
"""
print("TA: ", ta)
print("TB: ", tb)
print("TC: ", tc)
print("TRA: ", ta-tmin)
print("TRB: ", tb-tmin)
print("TRC: ", tc-tmin)
t_pd = 22.8
ta -= t_pd
tb -= t_pd
tc -= t_pd
ta *= f
tb *= f
tc *= f
"""
solution,*_ = trilaterate(self.points['A'], self.points['B'], self.points['C'], ta, tb, tc)
#self.location.setData(pos=(solution[0], solution[1]))
self.location.setPos(solution[0], solution[1])
print(f"detection at {solution[0]}/{solution[1]}")
"""
#pixel_fact = 359.734/600
pixel_fact = 1
a = [c*pixel_fact for c in self.points['A']]
b = [c*pixel_fact for c in self.points['B']]
c = [c*pixel_fact for c in self.points['C']]
A = distance(b,c)
B = distance(a,c)
C = distance(a,b)
s_a = A/(self.calibration.get_a() * f)
s_b = B/(self.calibration.get_b() * f)
s_c = C/(self.calibration.get_c() * f)
#s_b *= 1000
#s_b = B/(500*f)
print("s_b", s_b)
#self.proto.arm(self.enter_signal)
self.armtimer.start(500)
"""
for l in self.locations:
self.v_image.removeItem(l)
self.locations = []
"""
#solution = tdoa(a,b,c,ta,tb,tc, v=s_b)
window = 3
v_index = 0
trajectory_x = []
trajectory_y = []
for i,v in enumerate(np.linspace(s_b/2, 5*s_b, 20)):
solution = hyper(a,b,c,ta,tb,tc, v=v)
print(f"v{i}={v}")
if (solution[0] > -self.image_width * window) and (solution[0] < self.image_width * window) and \
(solution[1] > -self.image_height * window) and (solution[1] < self.image_height * window):
trajectory_x.append(solution[0])
trajectory_y.append(solution[1])
if i == v_index:
arrow = pg.ArrowItem(pos=(solution[0]/pixel_fact, solution[1]/pixel_fact), brush=(0,255*(i/20),0), angle=-45)
self.v_image.addItem(arrow)
self.locations.append(arrow)
self.status.setText("DETECTED")
else:
self.status.setText("OUT OF BOUNDS")
plot = pg.PlotCurveItem(x=trajectory_x, y=trajectory_y, pen=(0,0,255))
self.trajectories.append(plot)
self.v_image.addItem(plot)
"""
s_b = 0.007547757513716785
solution = hyper(a,b,c,ta,tb,tc, v=s_b)
if (solution[0] > -self.image_width * window) and (solution[0] < self.image_width * window) and \
(solution[1] > -self.image_height * window) and (solution[1] < self.image_height * window):
arrow = pg.ArrowItem(pos=(solution[0]/pixel_fact, solution[1]/pixel_fact), brush=(255,0,0), angle=-45)
self.v_image.addItem(arrow)
self.locations.append(arrow)
self.status.setText("DETECTED")
else:
self.status.setText("OUT OF BOUNDS")
"""
#solutions = trilaterate_lin2(a,b,c,ta,tb,tc, v=s_b)
#solutions = trilaterate_lin(a,b,c,ta,tb,tc, v=s_b)
#solutions = trilaterate2(a, b, c, ta, tb, tc)#, v=s_b)
#solutions = trilaterate_lin(a,b,c,ta,tb,tc, v=s_b)
#for l in self.locations:
# self.v_image.removeItem(l)
# self.locations = []
#for solution in solutions:
# print("S:", solution)
# #if solution[2] < 0:
# arrow = pg.ArrowItem(pos=(solution[0]/pixel_fact, solution[1]/pixel_fact), angle=-45)
# self.v_image.addItem(arrow)
# self.locations.append(arrow)
# #else:
# # print("disregarded")
print("Solutions found: ", len(self.locations))
def set_image(self, image):
self.image_plot.setImage(image)
self.points = tagfind.find_tags(image)
self.image_width,self.image_height,_ = np.shape(image)
self.status.setPos(self.image_width/2, -10)
if self.points is None:
print("Points could not be detected")
else:
print("PA: ", self.points['A'])
print("PB: ", self.points['B'])
print("PC: ", self.points['C'])
self.constellation.setData(points=[self.points['A'], self.points['B'], self.points['C']])
app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
with loop:
coro = serial_asyncio.create_serial_connection(loop, OutputProtocol, '/dev/ttyUSB0', baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
#protocol.calibrate(on_calib)
main = MainWindow(protocol)
main.show()
image = tagfind.get_image('realideal1.jpg')
main.set_image(image)
#main.enter_signal(197244236935, 197244235709, 197244236682)
#main.enter_calib(Calibration(41329.000000, 142402.000000, 1757.000000 , 110173.000000, 1593.000000, 62337.000000))
main.enter_calib(Calibration(148495.000000, 145277.000000, 59769.000000,63758.000000,157637.000000,47112.000000))
loop.run_forever()