diff --git a/python scripts/meshtastic_gnuradio_decoder.py b/python scripts/meshtastic_gnuradio_decoder.py deleted file mode 100644 index 7b91342..0000000 --- a/python scripts/meshtastic_gnuradio_decoder.py +++ /dev/null @@ -1,212 +0,0 @@ -# Joint copyright of Josh Conway and discord user:winter_soldier#1984 -# License is GPL3 (Gnu public license version 3) - - -import sys -import os -import time -import argparse -import base64 -import socket -import zmq -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend -from meshtastic import protocols, mesh_pb2 - -# SDR output example data: ffffffffb45463dab971aa8c6308000078aacf76587a5a4cf4a20e2c1d0349ab3f72 -# Use default key. Result should be: b'\x08\x01\x12\x0eTestingCLU1234' - -##### START FUNCTIONS BLOCK ##### - -# Takes in a string encoded as hex, and emits them as a bytes encoded of the same hex representation - -def hexStringToBinary(hexString): - binString = bytes.fromhex(hexString) - return binString - -def bytesToHexString(byteString): - hexString = byteString.hex() - return hexString - -##### END FUNCTIONS BLOCK ##### - - - -##### START PARSE COMMANDLINE INPUT ##### - -parser = argparse.ArgumentParser(description='Process incoming command parmeters') -parser.add_argument('-i', '--input', action='store', dest='input', help='SDR capture of the full Meshtastic LoRa string') -parser.add_argument('-k', '--key', action='store',dest='key', help='AES key override in Base64') -parser.add_argument('-n', '--net', action='store',dest='net', help='Network TCP in ip or DNS. ZeroMQ protocol.') -parser.add_argument('-p', '--port', action='store',dest='port', help='Network port') -args = parser.parse_args() - -##### END PARSE COMMANDLINE INPUT ##### - - - -##### START AES KEY ASSIGNMENT BLOCK ##### - -def parseAESKey(aesKey): - - # We look if there's a "NOKEY" declaration, a key provided, or an absence of key. We do the right thing depending on each choice. - # The "NOKEY" is basically ham mode. You're forbidden from using encryption. - # If you dont provide a key, we use the default one. We try to make it easy on our users! - # Note this format is in Base64 - - try: - if args.key == "0" or args.key == "NOKEY" or args.key == "nokey" or args.key == "NONE" or args.key == "none" or args.key == "HAM" or args.key == "ham": - meshtasticFullKeyBase64 = "AAAAAAAAAAAAAAAAAAAAAA==" - elif ( len(args.key) > 0 ): - meshtasticFullKeyBase64 = args.key - except: - meshtasticFullKeyBase64 = "1PG7OiApB1nwvP+rz05pAQ==" - - - - # Validate the key is 128bit/32byte or 256bit/64byte long. Fail if not. - - aesKeyLength = len(base64.b64decode(meshtasticFullKeyBase64).hex()) - if (aesKeyLength == 32 or aesKeyLength == 64): - pass - else: - print("The included AES key appears to be invalid. The key length is" , aesKeyLength , "and is not the key length of 128 or 256 bits.") - sys.exit() - - - # Convert the key FROM Base64 TO hexadecimal. - return base64.b64decode(meshtasticFullKeyBase64.encode('ascii')) - -##### END AES KEY ASSIGNMENT BLOCK ##### - - - -##### START DATA EXTRACTION BLOCK ##### - -def dataExtractor(data): - - # Now we split the data into the appropriate meshtastic packet structure using https://meshtastic.org/docs/overview/mesh-algo/ - # NOTE: The data coming out of GnuRadio is MSB or big endian. We have to reverse byte order after this step. - - # destination : 4 bytes - # sender : 4 bytes - # packetID : 4 bytes - # flags : 1 byte - # channelHash : 1 byte - # reserved : 2 bytes - # data : 0-237 bytes - - meshPacketHex = { - 'dest' : hexStringToBinary(data[0:8]), - 'sender' : hexStringToBinary(data[8:16]), - 'packetID' : hexStringToBinary(data[16:24]), - 'flags' : hexStringToBinary(data[24:26]), - 'channelHash' : hexStringToBinary(data[26:28]), - 'reserved' : hexStringToBinary(data[28:32]), - 'data' : hexStringToBinary(data[32:len(data)]) - } - return meshPacketHex - -##### END DATA EXTRACTION BLOCK ##### - - - -##### START DECRYPTION PROCESS ##### - -def dataDecryptor(meshPacketHex, aesKey): - - # Build the nonce. This is (packetID)+(00000000)+(sender)+(00000000) for a total of 128bit - # Even though sender is a 32 bit number, internally its used as a 64 bit number. - # Needs to be a bytes array for AES function. - - aesNonce = meshPacketHex['packetID'] + b'\x00\x00\x00\x00' + meshPacketHex['sender'] + b'\x00\x00\x00\x00' - - # print("Nonce binary is:", aesNonce) - # print("Nonce length is:", len(aesNonce) ) - - - # Initialize the cipher - cipher = Cipher(algorithms.AES(meshtasticFullKeyHex), modes.CTR(aesNonce), backend=default_backend()) - decryptor = cipher.decryptor() - - # Do the decryption. Note, that this cipher is reversible, so running the cipher on encrypted gives decrypted, and running the cipher on decrypted gives encrypted. - decryptedOutput = decryptor.update(meshPacketHex['data']) + decryptor.finalize() - return decryptedOutput - -##### END DECRYPTION PROCESS ##### - - - -##### START PROTOBUF DECODER ##### - -def decodeProtobuf(packetData): - # print("Packet data:", packetData) - data = mesh_pb2.Data() - try: - data.ParseFromString(packetData) - - handler = protocols.get(data.portnum) - if handler.protobufFactory is None: - pass - else: - pb = handler.protobufFactory() - pb.ParseFromString(data.payload) - except: - data = "INVALID PROTOBUF:" - return data - -##### END PROTOBUF DECODER ##### - - - -##### START OPTIONAL NETWORK PROCESS ##### - -def networkParse(ipAddr, port, aesKey): - - context = zmq.Context() - socket = context.socket(zmq.SUB) - socket.connect("tcp://" + ipAddr + ":" + port) # connect, not bind, the PUB will bind, only 1 can bind - socket.setsockopt(zmq.SUBSCRIBE, b'') # subscribe to topic of all (needed or else it won't work) - - while True: - if socket.poll(10) != 0: - msg = socket.recv() - # Extracts data from network socket - extractedData = dataExtractor(msg.hex()) - # Decrypts the payload - decryptedData = dataDecryptor(extractedData, aesKey) - # Decodes the Protobuf if possible - protobufMessage = decodeProtobuf(decryptedData) - if(protobufMessage == "INVALID PROTOBUF:"): - print("INVALID PROTOBUF: ", end = '') - print(decryptedData) - else: - print(protobufMessage) - - else: - time.sleep(0.1) # wait 100ms and try again - -##### START OPTIONAL NETWORK PROCESS ##### - - -if __name__ == "__main__": - meshtasticFullKeyHex = parseAESKey(args.key) - - # Network branch. Doesnt exit, so we need IP Port and AES key - try: - if len(args.net) > 0 and len(args.port) > 0: - print(args.net, args.port) - networkParse(args.net, args.port, meshtasticFullKeyHex) - except: - # If we get a payload on commandline, decrypt and exit. - meshPacketHex = dataExtractor(args.input) - decryptedData = dataDecryptor(meshPacketHex, meshtasticFullKeyHex) - protobufMessage = decodeProtobuf(decryptedData) - if(protobufMessage == "INVALID PROTOBUF:"): - print("INVALID PROTOBUF: ", end = '') - print(decryptedData) - else: - print(protobufMessage) - - -