handles protobufs, better error handling

This commit is contained in:
Josh Conway 2024-06-04 06:14:00 +00:00
parent f9b7a046d6
commit 887ce6c81d

View File

@ -11,7 +11,7 @@ import socket
import zmq import zmq
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from meshtastic import protocols, mesh_pb2
# SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72 # SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72
# Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234' # Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234'
@ -41,8 +41,6 @@ parser.add_argument('-n', '--net', action='store',dest='net', help='Network TCP
parser.add_argument('-p', '--port', action='store',dest='port', help='Network port') parser.add_argument('-p', '--port', action='store',dest='port', help='Network port')
args = parser.parse_args() args = parser.parse_args()
##### END PARSE COMMANDLINE INPUT ##### ##### END PARSE COMMANDLINE INPUT #####
@ -137,6 +135,30 @@ def dataDecryptor(meshPacketHex, aesKey):
##### END DECRYPTION PROCESS ##### ##### END DECRYPTION PROCESS #####
##### START PROTOBUF DECODER #####
def decodeProtobuf(packetData):
# print("Packet data:", packetData)
data = mesh_pb2.Data()
try:
data.ParseFromString(packetData)
handler = protocols.get(data.portnum)
if handler.protobufFactory is None:
pass
else:
pb = handler.protobufFactory()
pb.ParseFromString(data.payload)
except:
data = "INVALID PROTOBUF:"
return data
##### END PROTOBUF DECODER #####
##### START OPTIONAL NETWORK PROCESS ##### ##### START OPTIONAL NETWORK PROCESS #####
def networkParse(ipAddr, port, aesKey): def networkParse(ipAddr, port, aesKey):
@ -151,22 +173,19 @@ def networkParse(ipAddr, port, aesKey):
msg = socket.recv() # grab the message msg = socket.recv() # grab the message
extractedData = dataExtractor(msg.hex()) extractedData = dataExtractor(msg.hex())
decryptedData = dataDecryptor(extractedData, aesKey) decryptedData = dataDecryptor(extractedData, aesKey)
print(decryptedData) # print(decryptedData)
print(decodeProtobuf(decryptedData))
else: else:
time.sleep(0.1) # wait 100ms and try again time.sleep(0.1) # wait 100ms and try again
##### START OPTIONAL NETWORK PROCESS ##### ##### START OPTIONAL NETWORK PROCESS #####
if __name__ == "__main__": if __name__ == "__main__":
meshtasticFullKeyHex = parseAESKey(args.key) meshtasticFullKeyHex = parseAESKey(args.key)
# Network branch. Doesnt exit, so we need IP Port and AES key # Network branch. Doesnt exit, so we need IP Port and AES key
try: try:
print("do we have ip and port?")
if len(args.net) > 0 and len(args.port) > 0: if len(args.net) > 0 and len(args.port) > 0:
print(args.net, args.port) print(args.net, args.port)
networkParse(args.net, args.port, meshtasticFullKeyHex) networkParse(args.net, args.port, meshtasticFullKeyHex)
@ -174,8 +193,12 @@ if __name__ == "__main__":
# If we get a payload on commandline, decrypt and exit. # If we get a payload on commandline, decrypt and exit.
meshPacketHex = dataExtractor(args.input) meshPacketHex = dataExtractor(args.input)
decryptedData = dataDecryptor(meshPacketHex, meshtasticFullKeyHex) decryptedData = dataDecryptor(meshPacketHex, meshtasticFullKeyHex)
protobufMessage = decodeProtobuf(decryptedData)
if(protobufMessage == "INVALID PROTOBUF:"):
print("INVALID PROTOBUF: ", end = '')
print(decryptedData) print(decryptedData)
else:
print(protobufMessage)