diff --git a/python scripts/meshtastic_gnuradio_RX.py b/python scripts/meshtastic_gnuradio_RX.py index 7b91342..d13f061 100644 --- a/python scripts/meshtastic_gnuradio_RX.py +++ b/python scripts/meshtastic_gnuradio_RX.py @@ -11,11 +11,14 @@ import socket import zmq from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend -from meshtastic import protocols, mesh_pb2 +from meshtastic import protocols, mesh_pb2, admin_pb2, portnums_pb2, telemetry_pb2, mqtt_pb2 +from datetime import datetime # SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72 # Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234' +debug=False + ##### START FUNCTIONS BLOCK ##### # Takes in a string encoded as hex, and emits them as a bytes encoded of the same hex representation @@ -39,6 +42,7 @@ parser.add_argument('-i', '--input', action='store', dest='input', help='SDR cap parser.add_argument('-k', '--key', action='store',dest='key', help='AES key override in Base64') parser.add_argument('-n', '--net', action='store',dest='net', help='Network TCP in ip or DNS. ZeroMQ protocol.') parser.add_argument('-p', '--port', action='store',dest='port', help='Network port') +parser.add_argument('-d', '--debug', action='store_true',dest='debug', help='Print more debug messages') args = parser.parse_args() ##### END PARSE COMMANDLINE INPUT ##### @@ -70,7 +74,8 @@ def parseAESKey(aesKey): if (aesKeyLength == 32 or aesKeyLength == 64): pass else: - print("The included AES key appears to be invalid. The key length is" , aesKeyLength , "and is not the key length of 128 or 256 bits.") + if debug: + print("The included AES key appears to be invalid. The key length is" , aesKeyLength , "and is not the key length of 128 or 256 bits.") sys.exit() @@ -105,6 +110,12 @@ def dataExtractor(data): 'reserved' : hexStringToBinary(data[28:32]), 'data' : hexStringToBinary(data[32:len(data)]) } + if debug: + print("dest " + str(meshPacketHex['dest'].hex()) + " sender " + str(meshPacketHex['sender'].hex())) + print("id " + str(int(meshPacketHex['packetID'].hex(),16))) + print("flags " + str(meshPacketHex['flags'].hex())) + print("chash " + str(meshPacketHex['channelHash'].hex())) + print("data " + str(meshPacketHex['data'].hex())) return meshPacketHex ##### END DATA EXTRACTION BLOCK ##### @@ -121,7 +132,8 @@ def dataDecryptor(meshPacketHex, aesKey): aesNonce = meshPacketHex['packetID'] + b'\x00\x00\x00\x00' + meshPacketHex['sender'] + b'\x00\x00\x00\x00' - # print("Nonce binary is:", aesNonce) + if debug: + print("Nonce binary is:", aesNonce.hex()) # print("Nonce length is:", len(aesNonce) ) @@ -131,30 +143,140 @@ def dataDecryptor(meshPacketHex, aesKey): # Do the decryption. Note, that this cipher is reversible, so running the cipher on encrypted gives decrypted, and running the cipher on decrypted gives encrypted. decryptedOutput = decryptor.update(meshPacketHex['data']) + decryptor.finalize() + if debug: + print("dec: "+ decryptedOutput.hex()) return decryptedOutput -##### END DECRYPTION PROCESS ##### +###### END DECRYPTION PROCESS ##### +# +# +# +###### START PROTOBUF DECODER ##### +# +#def decodeProtobuf(packetData): +# # print("Packet data:", packetData) +# data = mesh_pb2.Data() +# try: +# data.ParseFromString(packetData) +# +# handler = protocols.get(data.portnum) +# if handler.protobufFactory is None: +# pass +# else: +# pb = handler.protobufFactory() +# pb.ParseFromString(data.payload) +# except: +# data = "INVALID PROTOBUF:" +# return data +# +###### END PROTOBUF DECODER ##### + ##### START PROTOBUF DECODER ##### -def decodeProtobuf(packetData): - # print("Packet data:", packetData) +def decodeProtobuf(packetData, sourceID, destID): + data = mesh_pb2.Data() try: data.ParseFromString(packetData) - - handler = protocols.get(data.portnum) - if handler.protobufFactory is None: - pass - else: - pb = handler.protobufFactory() - pb.ParseFromString(data.payload) except: - data = "INVALID PROTOBUF:" + data = "INVALID PROTOBUF: " + str(packetData) + return data + + match data.portnum : + case 0 : # UNKNOWN_APP + data = "UNKNOWN_APP To be implemented" + case 1 : # TEXT_MESSAGE_APP + text_payload = data.payload.decode('utf-8') + data = "TEXT_MESSAGE_APP " + str(sourceID) + " -> " + str(destID) + " " + str(text_payload) + case 2 : # REMOTE_HARDWARE_APP + data = "REMOTE_HARDWARE_APP To be implemented" + case 3 : # POSITION_APP + pos = mesh_pb2.Position() + pos.ParseFromString(data.payload) + latitude = pos.latitude_i * 1e-7 + longitude = pos.longitude_i * 1e-7 + data="POSITION_APP " + str(sourceID) + " -> " + str(destID) + " " + str(latitude) +"," + str(longitude) + case 4 : # NODEINFO_APP + info = mesh_pb2.User() + info.ParseFromString(data.payload) + data = "NODEINFO_APP " + str(info) + case 5 : # ROUTING_APP + rtng = mesh_pb2.Routing() + rtng.ParseFromString(data.payload) + data = "TELEMETRY_APP " + str(rtng) + case 6 : # ADMIN_APP + admn = admin_pb2.AdminMessage() + admn.ParseFromString(data.payload) + data = "ADMIN_APP " + str(admn) + case 7 : # TEXT_MESSAGE_COMPRESSED_APP + data = "TEXT_MESSAGE_COMPRESSED_APP To be implemented" + case 10 : # DETECTION_SENSOR_APP + data = "DETECTION_SENSOR_APP To be implemented" + case 32 : # REPLY_APP + data = "REPLY_APP To be implemented" + case 33 : # IP_TUNNEL_APP + data = "IP_TUNNEL_APP To be implemented" + case 34 : # PAXCOUNTER_APP + data = "PAXCOUNTER_APP To be implemented" + case 64 : # SERIAL_APP + print(" ") + case 65 : # STORE_FORWARD_APP + sfwd = mesh_pb2.StoreAndForward() + sfwd.ParseFromString(data.payload) + data = "STORE_FORWARD_APP " + str(sfwd) + case 67 : # TELEMETRY_APP + env = telemetry_pb2.Telemetry() + env.ParseFromString(data.payload) + data = "TELEMETRY_APP " + str(env) + case 68 : # ZPS_APP + z_info = mesh_pb2.zps() + z_info.ParseFromString(data.payload) + data = "ZPS_APP " + str(z_info) + case 69 : # SIMULATOR_APP + data = "SIMULATOR_APP To be implemented" + case 70 : # TRACEROUTE_APP + trct= mesh_pb2.RouteDiscovery() + trct.ParseFromString(data.payload) + data = "TRACEROUTE_APP " + str(sourceID) + " -> " + str(destID) + " " + str(trct) + case 71 : # NEIGHBORINFO_APP + ninfo = mesh_pb2.NeighborInfo() + ninfo.ParseFromString(data.payload) + data = "NEIGHBORINFO_APP " + str(ninfo) + case 72 : # ATAK_PLUGIN + data = "ATAK_PLUGIN To be implemented" + case 73 : # MAP_REPORT_APP + mrpt = mesh_pb2.MapReport() + mrpt.ParseFromString(data.payload) + data = "MAP_REPORT_APP " + str(mrpt) + case 74 : # POWERSTRESS_APP + data = "POWERSTRESS_APP To be implemented" + case 256 : # PRIVATE_APP + data = "PRIVATE_APP To be implemented" + case 257 : # ATAK_FORWARDER + data = "ATAK_FORWARD To be implemented" + case _: + data = "UNKNOWN PROTOBUF" + return data +# # print("Packet data:", packetData) +# data = mesh_pb2.Data() +# try: +# data.ParseFromString(packetData) +# +# handler = protocols.get(data.portnum) +# if handler.protobufFactory is None: +# pass +# else: +# pb = handler.protobufFactory() +# pb.ParseFromString(data.payload) +# except: +# data = "INVALID PROTOBUF:" +# return data + ##### END PROTOBUF DECODER ##### @@ -171,17 +293,13 @@ def networkParse(ipAddr, port, aesKey): while True: if socket.poll(10) != 0: msg = socket.recv() - # Extracts data from network socket extractedData = dataExtractor(msg.hex()) - # Decrypts the payload + PacketID = extractedData['packetID'].hex() + if debug: + print("Packet: " + msg.hex()) decryptedData = dataDecryptor(extractedData, aesKey) - # Decodes the Protobuf if possible - protobufMessage = decodeProtobuf(decryptedData) - if(protobufMessage == "INVALID PROTOBUF:"): - print("INVALID PROTOBUF: ", end = '') - print(decryptedData) - else: - print(protobufMessage) + protobufMessage = decodeProtobuf(decryptedData, extractedData['sender'].hex(), extractedData['dest'].hex()) + print(protobufMessage) else: time.sleep(0.1) # wait 100ms and try again @@ -194,19 +312,24 @@ if __name__ == "__main__": # Network branch. Doesnt exit, so we need IP Port and AES key try: + if args.debug: + debug=True if len(args.net) > 0 and len(args.port) > 0: - print(args.net, args.port) + if debug: + print(args.net, args.port) networkParse(args.net, args.port, meshtasticFullKeyHex) except: # If we get a payload on commandline, decrypt and exit. meshPacketHex = dataExtractor(args.input) + if debug: + print(meshPacketHex) decryptedData = dataDecryptor(meshPacketHex, meshtasticFullKeyHex) protobufMessage = decodeProtobuf(decryptedData) if(protobufMessage == "INVALID PROTOBUF:"): - print("INVALID PROTOBUF: ", end = '') - print(decryptedData) + if debug: + print("INVALID PROTOBUF: ", end = '') + if debug: + print(decryptedData) else: print(protobufMessage) - -