Debugged crash handling, added date/time stamps, better protobuf handling

This commit is contained in:
Josh Conway 2024-09-12 00:58:32 +00:00
parent e5db8fe1f5
commit cd12ab44d7

View File

@ -13,6 +13,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from meshtastic import protocols, mesh_pb2, admin_pb2, portnums_pb2, telemetry_pb2, mqtt_pb2 from meshtastic import protocols, mesh_pb2, admin_pb2, portnums_pb2, telemetry_pb2, mqtt_pb2
from datetime import datetime from datetime import datetime
from base64 import b64encode, b64decode
# SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72 # SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72
# Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234' # Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234'
@ -31,6 +32,11 @@ def bytesToHexString(byteString):
hexString = byteString.hex() hexString = byteString.hex()
return hexString return hexString
def msb2lsb(msb):
#string version of this. ONLY supports 32 bit from the sender/receiver ID. Hacky
lsb = msb[6] + msb[7] + msb[4] + msb[5] + msb[2] + msb[3] + msb[0] + msb[1]
return lsb
##### END FUNCTIONS BLOCK ##### ##### END FUNCTIONS BLOCK #####
@ -111,11 +117,13 @@ def dataExtractor(data):
'data' : hexStringToBinary(data[32:len(data)]) 'data' : hexStringToBinary(data[32:len(data)])
} }
if debug: if debug:
print("dest " + str(meshPacketHex['dest'].hex()) + " sender " + str(meshPacketHex['sender'].hex())) print("##### PACKET DATA START #####")
print("id " + str(int(meshPacketHex['packetID'].hex(),16))) print("dest " + msb2lsb(str(meshPacketHex['dest'].hex())) + " sender " + msb2lsb(str(meshPacketHex['sender'].hex())) )
print("id " + msb2lsb(str(int(meshPacketHex['packetID'].hex(),16))) )
print("flags " + str(meshPacketHex['flags'].hex())) print("flags " + str(meshPacketHex['flags'].hex()))
print("chash " + str(meshPacketHex['channelHash'].hex())) print("chanhash " + str(meshPacketHex['channelHash'].hex()))
print("data " + str(meshPacketHex['data'].hex())) print("data " + str(meshPacketHex['data'].hex()))
print("##### PACKET DATA END #####")
return meshPacketHex return meshPacketHex
##### END DATA EXTRACTION BLOCK ##### ##### END DATA EXTRACTION BLOCK #####
@ -133,7 +141,8 @@ def dataDecryptor(meshPacketHex, aesKey):
aesNonce = meshPacketHex['packetID'] + b'\x00\x00\x00\x00' + meshPacketHex['sender'] + b'\x00\x00\x00\x00' aesNonce = meshPacketHex['packetID'] + b'\x00\x00\x00\x00' + meshPacketHex['sender'] + b'\x00\x00\x00\x00'
if debug: if debug:
print("Nonce binary is:", aesNonce.hex()) print("AES nonce is: ", aesNonce.hex())
print("AES key used: ", str(b64encode(aesKey)))
# print("Nonce length is:", len(aesNonce) ) # print("Nonce length is:", len(aesNonce) )
@ -148,28 +157,6 @@ def dataDecryptor(meshPacketHex, aesKey):
return decryptedOutput return decryptedOutput
###### END DECRYPTION PROCESS ##### ###### END DECRYPTION PROCESS #####
#
#
#
###### START PROTOBUF DECODER #####
#
#def decodeProtobuf(packetData):
# # print("Packet data:", packetData)
# data = mesh_pb2.Data()
# try:
# data.ParseFromString(packetData)
#
# handler = protocols.get(data.portnum)
# if handler.protobufFactory is None:
# pass
# else:
# pb = handler.protobufFactory()
# pb.ParseFromString(data.payload)
# except:
# data = "INVALID PROTOBUF:"
# return data
#
###### END PROTOBUF DECODER #####
@ -182,7 +169,7 @@ def decodeProtobuf(packetData, sourceID, destID):
try: try:
data.ParseFromString(packetData) data.ParseFromString(packetData)
except: except:
data = "INVALID PROTOBUF: " + str(packetData) data = "INVALID PROTOBUF"
return data return data
match data.portnum : match data.portnum :
@ -262,21 +249,6 @@ def decodeProtobuf(packetData, sourceID, destID):
return data return data
# # print("Packet data:", packetData)
# data = mesh_pb2.Data()
# try:
# data.ParseFromString(packetData)
#
# handler = protocols.get(data.portnum)
# if handler.protobufFactory is None:
# pass
# else:
# pb = handler.protobufFactory()
# pb.ParseFromString(data.payload)
# except:
# data = "INVALID PROTOBUF:"
# return data
##### END PROTOBUF DECODER ##### ##### END PROTOBUF DECODER #####
@ -293,13 +265,18 @@ def networkParse(ipAddr, port, aesKey):
while True: while True:
if socket.poll(10) != 0: if socket.poll(10) != 0:
msg = socket.recv() msg = socket.recv()
timeNow = datetime.now()
print("Datetime: " + timeNow.strftime("%Y-%m-%d %H:%M:%S"))
extractedData = dataExtractor(msg.hex()) extractedData = dataExtractor(msg.hex())
PacketID = extractedData['packetID'].hex() PacketID = extractedData['packetID'].hex()
if debug: if debug:
print("Packet: " + msg.hex()) print("Packet: " + msg.hex())
decryptedData = dataDecryptor(extractedData, aesKey) decryptedData = dataDecryptor(extractedData, aesKey)
protobufMessage = decodeProtobuf(decryptedData, extractedData['sender'].hex(), extractedData['dest'].hex()) protobufMessage = decodeProtobuf(decryptedData, msb2lsb(extractedData['sender'].hex()), msb2lsb(extractedData['dest'].hex()) )
print(protobufMessage) if (protobufMessage == "INVALID PROTOBUF: "):
print(decryptedData)
else:
print(protobufMessage + "\n")
else: else:
time.sleep(0.1) # wait 100ms and try again time.sleep(0.1) # wait 100ms and try again
@ -332,4 +309,3 @@ if __name__ == "__main__":
print(decryptedData) print(decryptedData)
else: else:
print(protobufMessage) print(protobufMessage)