#!/usr/bin/env python3
"""Meshtastic serial-to-HTTP bridge.

Reads framed ``FromRadio`` protobuf packets from a serial port and serves them
over ``GET /api/v1/fromradio?all=false``. Payloads received via
``PUT /api/v1/toradio`` are framed and written to the serial port. Serial
bytes that arrive outside a frame are treated as device log text and printed
line by line with a ``# `` prefix.
"""
import argparse
import queue
import threading
import time
from functools import partial
from http.server import HTTPServer, BaseHTTPRequestHandler

import serial
from meshtastic import mesh_pb2 as mt

# A frame is START1, START2, a 16-bit big-endian payload length, then payload.
START1 = 0x94
START2 = 0xC3
HEADER_LEN = 4
MAX_TO_FROM_RADIO_SIZE = 512

# Flush a log line that never sees a newline once it reaches this many bytes,
# so binary noise on the port cannot grow the log buffer without bound.
MAX_LOG_LINE_LEN = 1024

# How long a fromradio GET waits for a packet before answering with an empty
# body. The HTTP server handles one request at a time, so this must stay short.
FROMRADIO_WAIT_SECONDS = 0.3

verbose = False


def verbose_print(*args, **kwargs):
    """Forward to ``print`` only when the module-level ``verbose`` flag is set."""
    if verbose:
        print(*args, **kwargs)


def protobuf_writer(ser, toradio_bytes):
    """Frame ``toradio_bytes`` and write it to the serial port.

    Args:
        ser: Object providing ``write(bytes)`` and ``flush()``.
        toradio_bytes: Raw payload to send after the 4-byte header.

    Raises:
        ValueError: If the payload length does not fit the 16-bit length field.
    """
    buf_len = len(toradio_bytes)
    if buf_len > 0xFFFF:
        # The header would silently carry a truncated length and corrupt the
        # stream framing.
        raise ValueError(f"payload too large for frame header: {buf_len} bytes")
    if verbose:
        # Only build the hex dump when it will actually be printed.
        hex_payload = ' '.join(f"{x:02X}" for x in toradio_bytes)
        print(f"$ Tx Radio Packet len={buf_len} payload={hex_payload}")
    header = bytes([START1, START2, (buf_len >> 8) & 0xFF, buf_len & 0xFF])
    ser.write(header + toradio_bytes)
    ser.flush()


def _flush_log_line(log_buf):
    """Print the buffered log bytes as one ``# `` line and clear the buffer."""
    text = log_buf.decode('utf-8', errors='replace').rstrip('\r')
    log_buf.clear()
    print(f"# {text}")


def _handle_log_byte(log_buf, c):
    """Add one non-frame byte to the log buffer, printing on newline or overflow."""
    if c == 0x0A:  # '\n' terminates a log line
        _flush_log_line(log_buf)
        return
    log_buf.append(c)
    if len(log_buf) >= MAX_LOG_LINE_LEN:
        _flush_log_line(log_buf)


def protobuf_reader(ser, queue):
    """Read the serial port forever, splitting it into packets and log text.

    Complete frames are parsed into ``mt.FromRadio`` messages and put on
    ``queue``. Bytes seen while waiting for START1 are accumulated as log text
    and printed one line at a time. This function never returns; run it in its
    own thread.

    Args:
        ser: Object providing ``in_waiting`` and ``read(n)``.
        queue: Object providing ``put(item)`` that receives parsed messages.
    """
    rx_buf = bytearray()
    log_buf = bytearray()
    while True:
        pending = ser.in_waiting
        if pending <= 0:
            time.sleep(0.1)
            continue
        # Read everything that is waiting in one call; an empty result simply
        # skips the loop instead of raising on a missing byte.
        for c in ser.read(pending):
            ptr = len(rx_buf)
            rx_buf.append(c)
            if ptr == 0:
                if c == START1:
                    verbose_print("$ Radio RX START1")
                else:
                    # Not the start of a frame: this byte is log output.
                    rx_buf.clear()
                    _handle_log_byte(log_buf, c)
            elif ptr == 1:
                if c == START2:
                    verbose_print("$ Radio RX START2")
                else:
                    rx_buf.clear()
            elif ptr >= HEADER_LEN - 1:
                packetlen = (rx_buf[2] << 8) + rx_buf[3]
                if ptr == HEADER_LEN - 1 and packetlen > MAX_TO_FROM_RADIO_SIZE:
                    # Implausible length: drop the header and resynchronise.
                    rx_buf.clear()
                elif ptr + 1 >= packetlen + HEADER_LEN:
                    packet = bytes(rx_buf[HEADER_LEN:])
                    rx_buf.clear()
                    try:
                        fromradio = mt.FromRadio()
                        fromradio.ParseFromString(packet)
                        queue.put(fromradio)
                        verbose_print(f"$ Rx Radio Packet len={len(packet)}")
                    except Exception as ex:
                        # A malformed packet must not kill the reader thread.
                        print(f"$ Error: {ex}")


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the radio's packet queue and serial writer."""

    def __init__(self, ser, queue, *args, **kwargs):
        """Store the serial port and packet queue, then handle the request.

        Args:
            ser: Serial port object passed to ``protobuf_writer``.
            queue: Queue of parsed ``FromRadio`` messages to serve.
            *args, **kwargs: Forwarded to ``BaseHTTPRequestHandler``.
        """
        # The base-class constructor processes the request immediately, so the
        # attributes must exist before it is called.
        self.ser = ser
        self.queue = queue
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve the placeholder pages and the fromradio endpoint; 404 otherwise."""
        verbose_print(f"$ HTTP GET {self.path}")
        if self.path == "/index.html":
            self.send_response(200)
            self.end_headers()
        elif self.path == "/hotspot-detect.html":
            self.send_response(204)
            self.end_headers()
        elif self.path == "/api/v1/fromradio?all=false":
            self._send_fromradio()
        else:
            self.send_error(404)

    def _send_fromradio(self):
        """Send the next queued packet, or an empty body if none arrives in time."""
        # Waiting without a timeout would stall this single-threaded server,
        # including the PUT requests that make the radio produce packets.
        # NOTE(review): an empty 200 body is assumed to mean "no packet
        # available" to clients - confirm against the Meshtastic HTTP API.
        try:
            body = self.queue.get(timeout=FROMRADIO_WAIT_SECONDS).SerializeToString()
        except queue.Empty:
            body = b""
        try:
            self.send_response(200)
            self.send_header('Content-Type', 'application/x-protobuf')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except OSError as ex:
            # Headers may already be on the wire, so an error response can no
            # longer be sent; just report the failed write.
            print(f"$ Error: {ex}")

    def do_PUT(self):
        """Forward the request body of ``/api/v1/toradio`` to the serial port."""
        verbose_print(f"$ HTTP PUT {self.path}")
        if self.path != "/api/v1/toradio":
            self.send_error(404)
            return
        try:
            clen = int(self.headers['Content-Length'])
        except (TypeError, ValueError):
            clen = -1
        if clen < 0:
            # Without a valid length, rfile.read() would block until EOF.
            self.send_error(400, "Missing or invalid Content-Length")
            return
        if clen > MAX_TO_FROM_RADIO_SIZE:
            self.send_error(413)
            return
        try:
            protobuf_writer(self.ser, self.rfile.read(clen))
        except Exception as ex:
            print(f"$ Error: {ex}")
            self.send_error(500, str(ex))
            return
        # Acknowledge the write so the client is not left without a response.
        self.send_response(200)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging."""
        return


def main():
    """Parse the command line, start the serial reader thread and serve HTTP."""
    global verbose

    parser = argparse.ArgumentParser()
    parser.add_argument('--port', required=True, help='Serial device, e.g. /dev/ttyACM0 or COM7')
    parser.add_argument('--baud', default=115200, type=int, help='Serial baud rate')
    parser.add_argument('--bind', default='0.0.0.0:4403', help='IP address and port to listen to')
    parser.add_argument('--verbose', action='store_true', help='Verbose output')
    args = parser.parse_args()

    verbose = args.verbose

    host, sep, port = args.bind.rpartition(':')
    if not sep or not port.isdigit():
        parser.error(f"--bind must look like HOST:PORT, got {args.bind!r}")

    q = queue.Queue()
    ser = serial.Serial(args.port, args.baud, timeout=10)
    threading.Thread(target=protobuf_reader, args=(ser, q), daemon=True).start()

    httpd = HTTPServer((host, int(port)), partial(RequestHandler, ser, q))
    print("Meshtastic Serial to HTTP Bridge v0.1")
    print(f"$ Serving {args.port} on http://{host}:{port}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n$ Shutting down HTTP service")
    finally:
        # serve_forever() has already returned here; release the socket.
        httpd.server_close()


if __name__ == "__main__":
    main()