diff --git a/python/README.md b/python/README.md index ffa7c568..8350e75e 100644 --- a/python/README.md +++ b/python/README.md @@ -22,10 +22,15 @@ FusionEngine message specification. ### Applications +- [p1_capture](fusion_engine_client/applications/p1_capture.py) - Connect to a FusionEngine device in real time over + serial, TCP, UDP, or UNIX domain socket, and display incoming FusionEngine contents and/or log the incoming data to + disk - [p1_display](fusion_engine_client/applications/p1_display.py) - Generate plots of vehicle trajectory, GNSS signal status, wheel speed measurements, etc. from a file of logged FusionEngine messages - [p1_extract](fusion_engine_client/applications/p1_extract.py) - Extract FusionEngine messages from a binary file containing multiple data streams (e.g., interleaved RTCM and FusionEngine messages) +- [p1_filter](fusion_engine_client/applications/p1_filter.py) - Filter an incoming FusionEngine data stream, outputting + a new FusionEngine stream containing only the requested messages - [p1_lband_extract](fusion_engine_client/applications/p1_lband_extract.py) - Extract L-band data bits contained from a log of FusionEngine `LBandFrameMessage` messages - [p1_print](fusion_engine_client/applications/p1_print.py) - Print the contents of FusionEngine messages found in a @@ -59,13 +64,12 @@ FusionEngine message specification. optionally mixed with other binary data, and decode the contents using the `FusionEngineDecoder` helper class - [send_command.py](examples/send_command.py) - Send a command to a device over serial or TCP, and wait for a response - - [serial_client.py](examples/serial_client.py) - Connect to a device over a local serial port and decode messages - in real time to be displayed and/or logged to disk using the `FusionEngineDecoder` helper class - - [tcp_client.py](examples/tcp_client.py) - Connect to a device over TCP and decode messages in real time to be - displayed and/or logged to disk using the `FusionEngineDecoder` helper class + - [serial_client.py](examples/serial_client.py) - Connect to a device over a local serial port and decode/print + incoming FusionEngine messages + - [tcp_client.py](examples/tcp_client.py) - Connect to a device over TCP and decode messages in real time and + decode/print incoming FusionEngine messages - [udp_client.py](examples/udp_client.py) - Connect to a device over UDP and decode/display messages in real time - - Unlike [tcp_client.py](examples/tcp_client.py), currently assumes all incoming UDP packets contain - FusionEngine messages and does not use the `FusionEngineDecoder` helper class + and decode/print incoming FusionEngine messages - `fusion_engine_client` - Top-level Python package directory - `analysis` - [analyzer.py](fusion_engine_client/analysis/analyzer.py) - `Analyzer` class, used by diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 3b0d813b..ebe688cb 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from datetime import datetime import os import sys @@ -15,115 +14,41 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message -from fusion_engine_client.parsers import FusionEngineDecoder -from fusion_engine_client.utils import trace as logging from fusion_engine_client.utils.argument_parser import ArgumentParser +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ -Connect to an Point One device over serial and print out the incoming message -contents and/or log the messages to disk. +Connect to a Point One device over serial and print out the incoming message +contents. """) - parser.add_argument('-b', '--baud', type=int, default=460800, help="The serial baud rate to be used.") - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), - help="The format of the file to be generated when --output is enabled." - "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." - "If 'raw', create a generic binary file containing all incoming data.") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") - parser.add_argument('port', help="The serial device to use (e.g., /dev/ttyUSB0, COM1)") options = parser.parse_args() - if options.quiet: - options.display = False - - logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout) - logger = logging.getLogger('point_one.fusion_engine') - logger.setLevel(logging.INFO) - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - # Connect to the device. - port = serial.Serial(port=options.port, baudrate=options.baud) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) - bytes_received = 0 - messages_received = 0 - start_time = datetime.now() - last_print_time = start_time - while True: - # Read some data. - try: - received_data = port.read(1024) - bytes_received += len(received_data) - - if not options.quiet: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - last_print_time = now - except serial.SerialException as e: - print('Unexpected error reading from device:\r%s' % str(e)) - break - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format) - - # Close the serial port. - port.close() - - # Close the output file. - if output_file is not None: - output_file.close() - - if not options.quiet: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + transport = serial.Serial(port=options.port, baudrate=options.baud) + + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.read(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + except serial.SerialException as e: + print('Unexpected error reading from device:\r%s' % str(e)) + + # Close the transport when finished. + transport.close() diff --git a/python/examples/tcp_client.py b/python/examples/tcp_client.py index 8e3ce8cb..278e46d4 100755 --- a/python/examples/tcp_client.py +++ b/python/examples/tcp_client.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from datetime import datetime import os import socket import sys @@ -10,108 +9,40 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message -from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils.argument_parser import ArgumentParser +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ -Connect to an Point One device over TCP and print out the incoming message -contents and/or log the messages to disk. +Connect to a Point One device over TCP and print out the incoming message +contents. """) - - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), - help="The format of the file to be generated when --output is enabled." - "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." - "If 'raw', create a generic binary file containing all incoming data.") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") parser.add_argument('-p', '--port', type=int, default=30201, help="The FusionEngine TCP port on the data source.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") - parser.add_argument('hostname', help="The IP address or hostname of the data source.") options = parser.parse_args() - if options.quiet: - options.display = False - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - # Connect to the device. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((socket.gethostbyname(options.hostname), options.port)) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) - bytes_received = 0 - messages_received = 0 - start_time = datetime.now() - last_print_time = start_time - while True: - # Read some data. - try: - received_data = sock.recv(1024) - bytes_received += len(received_data) - - if not options.quiet: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - last_print_time = now - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format) - - # Close the socket. - sock.close() - - # Close the output file. - if output_file is not None: - output_file.close() - - if not options.quiet: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + transport.connect((socket.gethostbyname(options.hostname), options.port)) + + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.recv(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + + # Close the transport when finished. + transport.close() diff --git a/python/examples/udp_client.py b/python/examples/udp_client.py index 6983ef9f..b482c5fb 100755 --- a/python/examples/udp_client.py +++ b/python/examples/udp_client.py @@ -1,133 +1,49 @@ #!/usr/bin/env python3 -from datetime import datetime import os -import math import socket import sys -import time # Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports # if this application is being run directly out of the repository and is not installed as a pip package. root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message -from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils.argument_parser import ArgumentParser +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ Connect to a Point One device over UDP and print out the incoming message -contents and/or log the messages to disk. +contents. When using UDP, you must configure the device to send data to your machine. """) - - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw', 'csv'), - help="""\ -The format of the file to be generated when --output is enabled: -- p1log - (default), create a *.p1log file containing only FusionEngine messages." -- raw - create a generic binary file containing all incoming data." -- csv - create a csv of time vs message type.""") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") parser.add_argument('-p', '--port', type=int, default=30400, help="The FusionEngine UDP port on the data source.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") options = parser.parse_args() - if options.quiet: - options.display = False - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - generating_csv = (output_file is not None and options.format == 'csv') - - if generating_csv: - output_file.write(b'host_time,type,p1_time,sys_time\n') - # Connect to the device. - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('', options.port)) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) - bytes_received = 0 - messages_received = 0 - start_time = datetime.now() - last_print_time = start_time - while True: - # Read some data. - try: - received_data = sock.recv(1024) - bytes_received += len(received_data) - - if not options.quiet: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - last_print_time = now - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log or generating_csv: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format) - - if generating_csv: - p1_time = message.get_p1_time() - sys_time = message.get_system_time_sec() - p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' - sys_str = str(sys_time) if sys_time is not None and not math.isnan(sys_time) else '' - output_file.write(f'{time.monotonic()},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) - - - # Close the socket. - sock.close() - - # Close the output file. - if output_file is not None: - output_file.close() - - if not options.quiet: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + transport.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + transport.bind(('', options.port)) + + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.recv(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + + # Close the transport when finished. + transport.close() diff --git a/python/fusion_engine_client/applications/p1_capture.py b/python/fusion_engine_client/applications/p1_capture.py new file mode 100755 index 00000000..6cc09abe --- /dev/null +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 + +from typing import Union + +from datetime import datetime +import math +import os +import re +import select +import socket +import sys +import time + +import colorama + +try: + # pySerial is optional. + import serial + serial_supported = True +except ImportError: + serial_supported = False + # Dummy stand-in if pySerial is not installed. + class serial: + class SerialException: pass + +if __package__ is None or __package__ == "": + from import_utils import enable_relative_imports + __package__ = enable_relative_imports(__name__, __file__) + +from ..parsers import FusionEngineDecoder +from ..utils import trace as logging +from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction +from ..utils.print_utils import \ + DeviceSummary, add_print_format_argument, print_message, print_summary_table + +_logger = logging.getLogger('point_one.fusion_engine.applications.p1_capture') + + +def create_transport(descriptor: str) -> Union[socket.socket, serial.Serial]: + m = re.match(r'^tcp://([a-zA-Z0-9-_.]+)?:([0-9]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + transport.connect((socket.gethostbyname(m.group(1)), int(m.group(2)))) + return transport + + m = re.match(r'^udp://:([0-9]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + transport.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + transport.bind(('', int(m.group(1)))) + return transport + + m = re.match(r'^unix://([a-zA-Z0-9-_./]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + transport.connect(m.group(1)) + return transport + + m = re.match(r'^(?:(?:serial|tty)://)?([^:]+):([0-9]+)$', descriptor) + if m: + if serial_supported: + transport = serial.Serial(port=m.group(1), baudrate=int(m.group(2))) + return transport + else: + raise RuntimeError( + "This application requires pyserial. Please install (pip install pyserial) and run again.") + + raise ValueError('Unsupported transport descriptor.') + + +def main(): + # Parse command-line arguments. + parser = ArgumentParser(description="""\ +Connect to a Point One device and print out the incoming FusionEngine message +contents and/or log the messages to disk. +""") + add_print_format_argument(parser, '--display-format') + parser.add_argument( + '--display', action=ExtendedBooleanAction, default=True, + help="Print the incoming message contents to the console.") + parser.add_argument( + '-q', '--quiet', dest='quiet', action=ExtendedBooleanAction, default=False, + help="Do not print anything to the console.") + parser.add_argument( + '-s', '--summary', action=ExtendedBooleanAction, default=False, + help="Print a summary of the incoming messages instead of the message content.") + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") + + file_group = parser.add_argument_group('File Capture') + file_group.add_argument( + '-f', '--output-format', default='p1log', choices=('p1log', 'raw', 'csv'), + help="""\ +The format of the file to be generated when --output is enabled: +- p1log - Create a *.p1log file containing only FusionEngine messages (default) +- raw - Create a generic binary file containing all incoming data +- csv - Create a CSV file with the received message types and timestamps""") + file_group.add_argument( + '-o', '--output', type=str, + help="The path to a file where incoming data will be stored.") + + parser.add_argument( + 'transport', + help="""\ +The method used to communicate with the target device: +- tcp://HOSTNAME:PORT - Connect to the specified hostname (or IP address) and + port over TCP (e.g., tty://192.168.0.3:30201) +- udp://:PORT - Listen for incoming data on the specified UDP port (e.g., + udp://:12345) + Note: When using UDP, you must configure the device to send data to your + machine. +- unix://FILENAME - Connect to the specified UNIX domain socket file +- [tty://]DEVICE:BAUD - Connect to a serial device with the specified baud rate + (e.g., tty:///dev/ttyUSB0:460800 or /dev/ttyUSB0:460800) +""") + + options = parser.parse_args() + + if options.quiet: + options.display = False + + # Configure logging. + if options.verbose >= 1: + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', + stream=sys.stdout) + if options.verbose == 1: + logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG) + else: + logging.getLogger('point_one.fusion_engine.parsers').setLevel( + logging.getTraceLevel(depth=options.verbose - 1)) + else: + logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) + + # Connect to the device using the specified transport. + try: + transport = create_transport(options.transport) + except Exception as e: + _logger.error(str(e)) + sys.exit(1) + + # Open the output file if logging was requested. + if options.output is not None: + if options.format == 'p1log': + p1i_path = os.path.splitext(options.output)[0] + '.p1i' + if os.path.exists(p1i_path): + os.remove(p1i_path) + + output_file = open(options.output, 'wb') + else: + output_file = None + + generating_raw_log = (output_file is not None and options.format == 'raw') + generating_p1log = (output_file is not None and options.format == 'p1log') + generating_csv = (output_file is not None and options.format == 'csv') + + if generating_csv: + output_file.write(b'host_time,type,p1_time,sys_time\n') + + # If this is a TCP/UDP socket, configure it for non-blocking reads. We'll apply a read timeout with select() below. + read_timeout_sec = 1.0 + if isinstance(transport, socket.socket): + transport.setblocking(0) + # If this is a serial port, configure its read timeout. + else: + transport.timeout = read_timeout_sec + + # Listen for incoming data. + decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) + + bytes_received = 0 + messages_received = 0 + device_summary = DeviceSummary() + + start_time = datetime.now() + last_print_time = start_time + print_timeout_sec = 1.0 if options.summary else 5.0 + + def _print_status(now): + if options.summary: + # Clear the terminal. + print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') + _logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) + if options.summary: + print_summary_table(device_summary) + + try: + while True: + # Read some data. + try: + # If this is a TCP/UDP socket, use select() to implement a read timeout so we can wakeup periodically + # and print status if there's no incoming data. + if isinstance(transport, socket.socket): + ready = select.select([transport], [], [], read_timeout_sec) + if ready[0]: + received_data = transport.recv(1024) + else: + received_data = [] + # If this is a serial port, we set the read timeout above. + else: + received_data = transport.read(1024) + + bytes_received += len(received_data) + + now = datetime.now() + if not options.quiet: + if (now - last_print_time).total_seconds() > print_timeout_sec: + _print_status(now) + last_print_time = now + except serial.SerialException as e: + _logger.error('Unexpected error reading from device:\r%s' % str(e)) + break + + # If logging in raw format, write the data to disk as is. + if generating_raw_log: + output_file.write(received_data) + + # Decode the incoming data and print the contents of any complete messages. + # + # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: + # - So that we get a count of the number of incoming messages + # - So we print warnings if the CRC fails on any of the incoming data + # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any + # non-FusionEngine data in the stream + messages = decoder.on_data(received_data) + messages_received += len(messages) + + if options.display or generating_p1log: + for (header, message, raw_data) in messages: + device_summary.update(header, message) + + if generating_p1log: + output_file.write(raw_data) + + if generating_csv: + p1_time = message.get_p1_time() + sys_time = message.get_system_time_sec() + p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' + sys_str = str(sys_time) if sys_time is not None and not math.isnan(sys_time) else '' + output_file.write( + f'{time.monotonic()},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) + + if options.display: + if options.summary: + if (now - last_print_time).total_seconds() > 0.1: + _print_status(now) + else: + print_message(header, message, format=options.display_format) + except KeyboardInterrupt: + pass + + # Close the transport. + transport.close() + + # Close the output file. + if output_file is not None: + output_file.close() + + if not options.quiet and not options.summary: + now = datetime.now() + _print_status(now) + + +if __name__ == "__main__": + main() diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index 5f0fc1f1..efab1fd3 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from collections import defaultdict import sys if __package__ is None or __package__ == "": @@ -11,87 +10,14 @@ from ..parsers import MixedLogReader from ..utils import trace as logging from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction, CSVAction -from ..utils.bin_utils import bytes_to_hex from ..utils.log import locate_log, DEFAULT_LOG_BASE_DIR +from ..utils.print_utils import DeviceSummary, add_print_format_argument, print_message, print_summary_table from ..utils.time_range import TimeRange from ..utils.trace import HighlightFormatter, BrokenPipeStreamHandler _logger = logging.getLogger('point_one.fusion_engine.applications.print_contents') -def add_print_format_argument(parser, *arg_names): - parser.add_argument( - *arg_names, - choices=['binary', 'pretty', 'pretty-binary', 'pretty-binary-payload', - 'oneline', 'oneline-detailed', 'oneline-binary', 'oneline-binary-payload'], - default='pretty', - help="Specify the format used to print the message contents:\n" - "- Print the binary representation of each message on a single line, but no other details\n" - "- pretty - Print the message contents in a human-readable format (default)\n" - "- pretty-binary - Use `pretty` format, but include the binary representation of each message\n" - "- pretty-binary-payload - Like `pretty-binary`, but exclude the message header from the binary\n" - "- oneline - Print a summary of each message on a single line\n" - "- oneline-detailed - Print a one-line summary, including message offset details\n" - "- oneline-binary - Use `oneline-detailed` format, but include the binary representation of each message\n" - "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") - - -def print_message(header, contents, offset_bytes=None, format='pretty', bytes=None): - if format == 'binary': - if bytes is None: - raise ValueError('No data provided for binary format.') - parts = [] - elif isinstance(contents, MessagePayload): - if format.startswith('oneline'): - # The repr string should always start with the message type, then other contents: - # [POSE (10000), p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] - # We want to reformat and insert the additional details as follows for consistency: - # POSE (10000) [sequence=10, ... p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] - message_str = repr(contents).split('\n')[0] - message_str = message_str.replace('[', '', 1) - break_idx = message_str.find(',') - if break_idx >= 0: - message_str = f'{message_str[:break_idx]} [{message_str[(break_idx + 2):]}' - else: - message_str = message_str.rstrip(']') - parts = [message_str] - else: - parts = str(contents).split('\n') - else: - parts = [f'{header.get_type_string()} (unsupported)'] - - if format != 'oneline': - details = 'source_id=%d, sequence=%d, size=%d B' % (header.source_identifier, - header.sequence_number, - header.get_message_size()) - if offset_bytes is not None: - details += ', offset=%d B (0x%x)' % (offset_bytes, offset_bytes) - - idx = parts[0].find('[') - if idx < 0: - parts[0] += f' [{details}]' - else: - parts[0] = f'{parts[0][:(idx + 1)]}{details}, {parts[0][(idx + 1):]}' - - if bytes is None: - pass - elif format == 'binary': - byte_string = bytes_to_hex(bytes, bytes_per_row=-1, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, byte_string) - elif format == 'pretty-binary' or format == 'pretty-binary-payload': - if format.endswith('-payload'): - bytes = bytes[MessageHeader.calcsize():] - byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, " Binary:\n%s" % byte_string) - elif format == 'oneline-binary' or format == 'oneline-binary-payload': - if format.endswith('-payload'): - bytes = bytes[MessageHeader.calcsize():] - byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, byte_string) - - _logger.info('\n'.join(parts)) - - def main(): parser = ArgumentParser(description="""\ Decode and print the contents of messages contained in a *.p1log file or other @@ -236,9 +162,8 @@ def main(): total_decoded_messages = 0 total_messages = 0 bytes_decoded = 0 + device_summary = DeviceSummary() - def create_stats_entry(): return {'count': 0} - message_stats = defaultdict(create_stats_entry) try: for header, message, data, offset_bytes in reader: total_decoded_messages += 1 @@ -251,8 +176,7 @@ def create_stats_entry(): return {'count': 0} total_messages += 1 bytes_decoded += len(data) if options.summary: - entry = message_stats[header.message_type] - entry['count'] += 1 + device_summary.update(header, message) if message is not None: p1_time = message.get_p1_time() @@ -318,20 +242,7 @@ def create_stats_entry(): return {'count': 0} _logger.info('Total data read: %d B' % reader.get_bytes_read()) _logger.info('Selected data size: %d B' % bytes_decoded) _logger.info('') - - format_string = '| {:<50} | {:>5} | {:>8} |' - _logger.info(format_string.format('Message Name', 'Type', 'Count')) - _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])): - if type in message_type_to_class: - name = message_type_to_class[type].__name__ - elif type.is_unrecognized(): - name = str(type) - else: - name = f'Unsupported ({str(type)})' - _logger.info(format_string.format(name, int(type), info['count'])) - _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - _logger.info(format_string.format('Total', '', total_messages)) + print_summary_table(device_summary) elif total_messages == 0: _logger.warning('No valid FusionEngine messages found.') diff --git a/python/fusion_engine_client/utils/print_utils.py b/python/fusion_engine_client/utils/print_utils.py new file mode 100644 index 00000000..65962683 --- /dev/null +++ b/python/fusion_engine_client/utils/print_utils.py @@ -0,0 +1,148 @@ +from typing import Optional, Union + +import argparse +from collections import defaultdict + +from ..messages import * +from ..utils import trace as logging +from ..utils.bin_utils import bytes_to_hex + +_logger = logging.getLogger('point_one.fusion_engine.utils.print_utils') + + +def add_print_format_argument(parser: argparse._ActionsContainer, *arg_names): + parser.add_argument( + *arg_names, + choices=['binary', 'pretty', 'pretty-binary', 'pretty-binary-payload', + 'oneline', 'oneline-detailed', 'oneline-binary', 'oneline-binary-payload'], + default='pretty', + help="Specify the format used to print the message contents:\n" + "- Print the binary representation of each message on a single line, but no other details\n" + "- pretty - Print the message contents in a human-readable format (default)\n" + "- pretty-binary - Use `pretty` format, but include the binary representation of each message\n" + "- pretty-binary-payload - Like `pretty-binary`, but exclude the message header from the binary\n" + "- oneline - Print a summary of each message on a single line\n" + "- oneline-detailed - Print a one-line summary, including message offset details\n" + "- oneline-binary - Use `oneline-detailed` format, but include the binary representation of each message\n" + "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") + + +def print_message(header: MessageHeader, contents: Union[MessagePayload, bytes], + offset_bytes: Optional[int] = None, format: str = 'pretty', bytes: Optional[int] = None, + logger: Optional[logging.Logger] = None): + if logger is None: + logger = _logger + + if format == 'binary': + if bytes is None: + raise ValueError('No data provided for binary format.') + parts = [] + elif isinstance(contents, MessagePayload): + if format.startswith('oneline'): + # The repr string should always start with the message type, then other contents: + # [POSE (10000), p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] + # We want to reformat and insert the additional details as follows for consistency: + # POSE (10000) [sequence=10, ... p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] + message_str = repr(contents).split('\n')[0] + message_str = message_str.replace('[', '', 1) + break_idx = message_str.find(',') + if break_idx >= 0: + message_str = f'{message_str[:break_idx]} [{message_str[(break_idx + 2):]}' + else: + message_str = message_str.rstrip(']') + parts = [message_str] + else: + parts = str(contents).split('\n') + else: + parts = [f'{header.get_type_string()} (unsupported)'] + + if format != 'oneline': + details = 'source_id=%d, sequence=%d, size=%d B' % (header.source_identifier, + header.sequence_number, + header.get_message_size()) + if offset_bytes is not None: + details += ', offset=%d B (0x%x)' % (offset_bytes, offset_bytes) + + idx = parts[0].find('[') + if idx < 0: + parts[0] += f' [{details}]' + else: + parts[0] = f'{parts[0][:(idx + 1)]}{details}, {parts[0][(idx + 1):]}' + + if bytes is None: + pass + elif format == 'binary': + byte_string = bytes_to_hex(bytes, bytes_per_row=-1, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, byte_string) + elif format == 'pretty-binary' or format == 'pretty-binary-payload': + if format.endswith('-payload'): + bytes = bytes[MessageHeader.calcsize():] + byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, " Binary:\n%s" % byte_string) + elif format == 'oneline-binary' or format == 'oneline-binary-payload': + if format.endswith('-payload'): + bytes = bytes[MessageHeader.calcsize():] + byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, byte_string) + + logger.info('\n'.join(parts)) + + +class MessageStatsEntry: + def __init__(self): + self.count = 0 + self.total_bytes = 0 + + def update(self, header: MessageHeader, message: MessagePayload): + self.count += 1 + self.total_bytes = header.get_message_size() + + +class DeviceSummary: + def __init__(self): + self.device_id = None + self.version_info = None + self.stats = defaultdict(MessageStatsEntry) + + def update(self, header: MessageHeader, message: MessagePayload): + self.stats[header.message_type].update(header, message) + + if header.message_type == MessageType.DEVICE_ID: + self.device_id = message + elif header.message_type == MessageType.VERSION_INFO: + self.version_info = message + + +def print_summary_table(device_summary: DeviceSummary, logger: Optional[logging.Logger] = None): + if logger is None: + logger = _logger + + device_type = DeviceType.UNKNOWN + device_id = '' + if device_summary.device_id is not None: + device_type = device_summary.device_id.device_type + if len(device_summary.device_id.user_id_data) != 0: + device_id = DeviceIDMessage._get_str(device_summary.device_id.user_id_data) + logger.info(f'Device ID: {device_id} | ' + f'Device type: {"" if device_type == DeviceType.UNKNOWN else str(device_type)}') + + if device_summary.version_info is not None and device_summary.version_info.engine_version_str != "": + logger.info(f'Software version: {device_summary.version_info.engine_version_str}') + else: + logger.info(f'Software version: ') + + format_string = '| {:<50} | {:>5} | {:>8} |' + logger.info(format_string.format('Message Name', 'Type', 'Count')) + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + total_messages = 0 + for type, entry in sorted(device_summary.stats.items(), key=lambda x: int(x[0])): + if type in message_type_to_class: + name = message_type_to_class[type].__name__ + elif type.is_unrecognized(): + name = str(type) + else: + name = f'Unsupported ({str(type)})' + logger.info(format_string.format(name, int(type), entry.count)) + total_messages += entry.count + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + logger.info(format_string.format('Total', '', total_messages)) diff --git a/python/setup.py b/python/setup.py index 52f8208a..e0355d90 100644 --- a/python/setup.py +++ b/python/setup.py @@ -82,6 +82,7 @@ def find_version(*file_paths): packages=find_packages(where='.'), entry_points={ 'console_scripts': [ + 'p1_capture = fusion_engine_client.applications.p1_capture:main', 'p1_display = fusion_engine_client.applications.p1_display:main', 'p1_extract = fusion_engine_client.applications.p1_extract:main', 'p1_lband_extract = fusion_engine_client.applications.p1_lband_extract:main',