diff --git a/lgl_android_install.py b/lgl_android_install.py index 1b6eb3f..2c3117a 100755 --- a/lgl_android_install.py +++ b/lgl_android_install.py @@ -606,19 +606,23 @@ def configure_server(conn: ADBConnect, profile_dir: The desired output directory path for timeline. Existing files in the directory may be overwritten. ''' + verbose = False + # Create a server instance - instance = server.CommsServer(0) + instance = server.CommsServer(0, verbose) if timeline_file: # Import late to avoid pulling in transitive deps when unused from lglpy.comms import service_gpu_timeline - service_tl = service_gpu_timeline.GPUTimelineService(timeline_file) + service_tl = service_gpu_timeline.GPUTimelineService( + timeline_file, verbose) instance.register_endpoint(service_tl) if profile_dir: # Import late to avoid pulling in transitive deps when unused from lglpy.comms import service_gpu_profile - service_prof = service_gpu_profile.GPUProfileService(profile_dir) + service_prof = service_gpu_profile.GPUProfileService( + profile_dir, verbose) instance.register_endpoint(service_prof) # Start it running diff --git a/lglpy/comms/server.py b/lglpy/comms/server.py index 7ab9b3f..1be89c9 100644 --- a/lglpy/comms/server.py +++ b/lglpy/comms/server.py @@ -22,19 +22,23 @@ # ----------------------------------------------------------------------------- ''' -This module implements the server-side communications module that can accept -client connections from a layer driver, and dispatch messages to registered -service handler in the server. - -This module currently only accepts a single connection at a time and message -handling is synchronous inside the server. It is therefore not possible to -implement pseudo-host-driven event loops if the layer is using multiple -services concurrently - this needs threads per service. +This module implements the server-side of the communications module that can +accept connections from client layer drivers running on the device. The +protocol is service-based, and the server will dispatch messages to the +registered service handler for each message channel. + +The server is multi-threaded, allowing multiple layers to concurrently access +networked services provided by host-side implementations. However, within each +client connection messages are handled synchronously by a single worker thread. +It is therefore not possible to implement pseudo-host-driven event loops if a +layer is using multiple services concurrently - this needs threads per service +endpoint which is not yet implemented. ''' import enum import socket import struct +import threading from typing import Any, Optional @@ -143,7 +147,7 @@ class CommsServer: Class listening for client connection from a layer and handling messages. This implementation is designed to run in a thread, so has a run method - that will setup and listen on the server socket.q + that will setup and listen on the server socket. This implementation only handles a single layer connection at a time, but can handle multiple connections serially without restarting. @@ -173,7 +177,6 @@ def __init__(self, port: int, verbose: bool = False): self.register_endpoint(self) self.shutdown = False - self.sockd = None # type: Optional[socket.socket] self.sockl = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sockl.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -185,6 +188,9 @@ def __init__(self, port: int, verbose: bool = False): # Work out which port was assigned if not user-defined self.port = self.sockl.getsockname()[1] + # Pool of worker threads + self.workers: list[threading.Thread] = [] + def register_endpoint(self, endpoint: Any) -> int: ''' Register a new service endpoint with the server. @@ -235,55 +241,60 @@ def run(self) -> None: if self.verbose: print('Waiting for client connection') try: - self.sockd, _ = self.sockl.accept() + # Wait for a new client connection + sockd, _ = self.sockl.accept() if self.verbose: print(' + Client connected') - self.run_client() + # Spawn a worker thread for this client + thread = threading.Thread(target=self.run_client, args=(sockd,)) + self.workers.append(thread) + thread.start() - if self.verbose: - print(' + Client disconnected') - self.sockd.close() - self.sockd = None - - except ClientDropped: - if self.verbose: - print(' + Client disconnected') - if self.sockd: - self.sockd.close() - self.sockd = None + # Release old worker resources if they have completed + self.workers = [x for x in self.workers if x.is_alive()] except OSError: continue self.sockl.close() - def run_client(self) -> None: + def run_client(self, sockd: socket.socket) -> None: ''' Enter client message handler run loop. Raises: ClientDropped: The client disconnected from the socket. ''' - while not self.shutdown: - # Read the header - data = self.receive_data(Message.HEADER_LEN) - message = Message(data) - - # Read the payload if there is one - if message.payload_size: - data = self.receive_data(message.payload_size) - message.add_payload(data) - - # Dispatch to a service handler - endpoint = self.endpoints[message.endpoint_id] - response = endpoint.handle_message(message) + try: + while not self.shutdown: + # Read the header + data = self.receive_data(sockd, Message.HEADER_LEN) + message = Message(data) + + # Read the payload if there is one + if message.payload_size: + data = self.receive_data(sockd, message.payload_size) + message.add_payload(data) + + # Dispatch to a service handler + endpoint = self.endpoints[message.endpoint_id] + response = endpoint.handle_message(message) + + # Send a response for all TX_RX messages + if message.message_type == MessageType.TX_RX: + header = Response(message, response) + self.send_data(sockd, header.get_header()) + self.send_data(sockd, response) + + except ClientDropped: + pass + + finally: + if self.verbose: + print(' + Client disconnected') - # Send a response for all TX_RX messages - if message.message_type == MessageType.TX_RX: - header = Response(message, response) - self.send_data(header.get_header()) - self.send_data(response) + sockd.close() def stop(self) -> None: ''' @@ -294,14 +305,16 @@ def stop(self) -> None: if self.sockl is not None: self.sockl.close() - if self.sockd is not None: - self.sockd.shutdown(socket.SHUT_RDWR) + for worker in self.workers: + worker.join() - def receive_data(self, size: int) -> bytes: + @staticmethod + def receive_data(sockd: socket.socket, size: int) -> bytes: ''' Fetch a fixed size packet from the socket. Args: + sockd: The data socket. size: The length of the packet in bytes. Returns: @@ -310,31 +323,29 @@ def receive_data(self, size: int) -> bytes: Raises: ClientDropped: The client disconnected from the socket. ''' - assert self.sockd is not None - data = b'' while len(data) < size: - new_data = self.sockd.recv(size - len(data)) + new_data = sockd.recv(size - len(data)) if not new_data: raise ClientDropped() data = data + new_data return data - def send_data(self, data: bytes) -> None: + @staticmethod + def send_data(sockd: socket.socket, data: bytes) -> None: ''' Send a fixed size packet to the socket. Args: + sockd: The data socket. data: The binary data to send. Raises: ClientDropped: The client disconnected from the socket. ''' - assert self.sockd is not None - while len(data): - sent_bytes = self.sockd.send(data) + sent_bytes = sockd.send(data) if not sent_bytes: raise ClientDropped() data = data[sent_bytes:] diff --git a/lglpy/comms/service_gpu_profile.py b/lglpy/comms/service_gpu_profile.py index adea4c3..b2cec22 100644 --- a/lglpy/comms/service_gpu_profile.py +++ b/lglpy/comms/service_gpu_profile.py @@ -30,7 +30,7 @@ import enum import json import os -from typing import Any, Optional, TypedDict, Union +from typing import Optional, TypedDict, Union from lglpy.comms.server import Message @@ -89,6 +89,7 @@ def __init__(self, dir_path: str, verbose: bool = False): dir_path: Directory to write on the filesystem verbose: Should this use verbose logging? ''' + del verbose self.base_dir = dir_path # Sample mode is detected on the fly when we get our first data @@ -137,7 +138,7 @@ def handle_end_frame(self, message: EndFrameMessage): # Emit the CSV file print(f'Generating CSV for frame {self.frame_id}') path = os.path.join(self.base_dir, f'frame_{self.frame_id:05d}.csv') - with open(path, 'w', newline='') as handle: + with open(path, 'w', newline='', encoding='utf-8') as handle: writer = csv.writer(handle) writer.writerow(self.table_header) writer.writerows(self.table_data) @@ -249,8 +250,8 @@ def handle_frame_sample(self, message: FrameMessage): self.create_frame_data(message) print(f'Updating CSV for frame {self.frame_id}') - path = os.path.join(self.base_dir, f'capture.csv') - with open(path, 'w', newline='') as handle: + path = os.path.join(self.base_dir, 'capture.csv') + with open(path, 'w', newline='', encoding='utf-8') as handle: writer = csv.writer(handle) writer.writerow(self.table_header) writer.writerows(self.table_data) diff --git a/lglpy/comms/service_gpu_timeline.py b/lglpy/comms/service_gpu_timeline.py index fad9cd6..b9fa79d 100644 --- a/lglpy/comms/service_gpu_timeline.py +++ b/lglpy/comms/service_gpu_timeline.py @@ -165,19 +165,22 @@ def map_renderpass_binding(type, index: int | None) -> str: Map the PB encoded render pass attachment type to a description. ''' if type == timeline_pb2.RenderpassAttachmentType.undefined: - assert ((index is None) or (index == 0)) + assert (index is None) or (index == 0) return "U" - elif type == timeline_pb2.RenderpassAttachmentType.color: - assert (index is not None) + + if type == timeline_pb2.RenderpassAttachmentType.color: + assert index is not None return f"C{index}" - elif type == timeline_pb2.RenderpassAttachmentType.depth: - assert ((index is None) or (index == 0)) + + if type == timeline_pb2.RenderpassAttachmentType.depth: + assert (index is None) or (index == 0) return "D" - elif type == timeline_pb2.RenderpassAttachmentType.stencil: - assert ((index is None) or (index == 0)) + + if type == timeline_pb2.RenderpassAttachmentType.stencil: + assert (index is None) or (index == 0) return "S" - else: - assert False + + assert False def map_image_transfer_type(type) -> str: @@ -186,16 +189,20 @@ def map_image_transfer_type(type) -> str: ''' if type == timeline_pb2.ImageTransferType.unknown_image_transfer: return "Unknown" - elif type == timeline_pb2.ImageTransferType.clear_image: + + if type == timeline_pb2.ImageTransferType.clear_image: return "Clear image" - elif type == timeline_pb2.ImageTransferType.copy_image: + + if type == timeline_pb2.ImageTransferType.copy_image: return "Copy image" - elif type == timeline_pb2.ImageTransferType.copy_buffer_to_image: + + if type == timeline_pb2.ImageTransferType.copy_buffer_to_image: return "Copy buffer to image" - elif type == timeline_pb2.ImageTransferType.copy_image_to_buffer: + + if type == timeline_pb2.ImageTransferType.copy_image_to_buffer: return "Copy image to buffer" - else: - assert False + + assert False def map_buffer_transfer_type(type) -> str: @@ -204,12 +211,14 @@ def map_buffer_transfer_type(type) -> str: ''' if type == timeline_pb2.BufferTransferType.unknown_buffer_transfer: return "Unknown" - elif type == timeline_pb2.BufferTransferType.fill_buffer: + + if type == timeline_pb2.BufferTransferType.fill_buffer: return "Fill buffer" - elif type == timeline_pb2.BufferTransferType.copy_buffer: + + if type == timeline_pb2.BufferTransferType.copy_buffer: return "Copy buffer" - else: - assert False + + assert False def map_as_build_type(type) -> str: @@ -218,12 +227,14 @@ def map_as_build_type(type) -> str: ''' if type == timeline_pb2.AccelerationStructureBuildType.unknown_as_build: return "Unknown" - elif type == timeline_pb2.AccelerationStructureTransferType.fast_build: + + if type == timeline_pb2.AccelerationStructureTransferType.fast_build: return "Fast build" - elif type == timeline_pb2.AccelerationStructureTransferType.fast_trace: + + if type == timeline_pb2.AccelerationStructureTransferType.fast_trace: return "Fast trace" - else: - assert False + + assert False def map_as_transfer_type(type) -> str: @@ -231,16 +242,20 @@ def map_as_transfer_type(type) -> str: Map the PB encoded acceleration structure transfer to a description. ''' base_type = timeline_pb2.AccelerationStructureTransferType + if type == base_type.unknown_as_transfer: return "Unknown" - elif type == base_type.struct_to_struct: + + if type == base_type.struct_to_struct: return "Copy acceleration structure" - elif type == base_type.struct_to_mem: + + if type == base_type.struct_to_mem: return "Copy acceleration structure to mem" - elif type == base_type.mem_to_struct: + + if type == base_type.mem_to_struct: return "Copy mem to acceleration structure" - else: - assert False + + assert False def map_debug_label(labels: list[str] | None) -> list[str]: @@ -249,7 +264,8 @@ def map_debug_label(labels: list[str] | None) -> list[str]: ''' if labels is None: return [] - # need to convert it to a list from a RepeatedScalarContainer + + # Need to convert it to a list from a RepeatedScalarContainer return [str(label) for label in labels] @@ -292,7 +308,7 @@ def __init__(self, file_path: str, verbose: bool = False): file_path: File to write on the filesystem verbose: Should this use verbose logging? ''' - self.devices: dict[int, GPUDeviceState] = dict() + self.devices: dict[int, GPUDeviceState] = {} self.last_submit: SubmitMetadataType | None = None self.last_render_pass: RenderpassMetadataType | None = None # pylint: disable=consider-using-with diff --git a/lglpy/timeline/data/processed_trace.py b/lglpy/timeline/data/processed_trace.py index 73e7366..743fde0 100644 --- a/lglpy/timeline/data/processed_trace.py +++ b/lglpy/timeline/data/processed_trace.py @@ -61,7 +61,7 @@ class GPUWorkload: PARENS = re.compile(r'(\(.*\))') RESOLUTION = re.compile(r'\d+x\d+') WHITESPACE = re.compile(r'\s\s+') - MEMO: dict[str, str] = dict() + MEMO: dict[str, str] = {} @classmethod def memoize(cls, string: str) -> str: @@ -760,7 +760,7 @@ def get_transfer_size_str(self) -> str: ''' # If indirect then show a placeholder if self.pixel_count == -1: - return f'? pixels' + return '? pixels' s = 's' if self.pixel_count != 1 else '' label = f'{self.pixel_count} pixel{s}' @@ -844,7 +844,7 @@ def get_transfer_size_str(self) -> str: ''' # If indirect then show a placeholder if self.byte_count == -1: - return f'? bytes' + return '? bytes' s = 's' if self.byte_count != 1 else '' label = f'{self.byte_count} byte{s}' @@ -933,7 +933,7 @@ def get_transfer_size_str(self) -> str: ''' # If indirect then show a placeholder if self.primitive_count == -1: - return f'? primitives' + return '? primitives' s = 's' if self.primitive_count != 1 else '' label = f'{self.primitive_count} primitive{s}' @@ -1014,7 +1014,7 @@ def get_transfer_size_str(self) -> str: ''' # If indirect then show a placeholder if self.byte_count == -1: - return f'? bytes' + return '? bytes' s = 's' if self.byte_count != 1 else '' label = f'{self.byte_count} byte{s}' diff --git a/lglpy/timeline/data/raw_trace.py b/lglpy/timeline/data/raw_trace.py index 9cc0411..db02323 100644 --- a/lglpy/timeline/data/raw_trace.py +++ b/lglpy/timeline/data/raw_trace.py @@ -300,7 +300,7 @@ class MetadataWorkload: label_stack: Debug label stack, or None if no user labels. ''' - MEMO: dict[str, str] = dict() + MEMO: dict[str, str] = {} @classmethod def memoize(cls, string: str) -> str: diff --git a/lglpy/timeline/drawable/timeline_base.py b/lglpy/timeline/drawable/timeline_base.py index 9b97637..1a1102c 100644 --- a/lglpy/timeline/drawable/timeline_base.py +++ b/lglpy/timeline/drawable/timeline_base.py @@ -285,7 +285,7 @@ def remove_multiple_from_active_objects(self, old_objects): Args: old_objects: the sequence of objects to remove. ''' - removed_objects = {x for x in old_objects} + removed_objects = set(old_objects) new_active_objects = self.active_objects - removed_objects # If nothing left then reset the active state @@ -575,12 +575,12 @@ def on_mouse_single_click(self, button, x, y, mod): return True # If 'c' modifier then append selection to highlighted region - elif mod == 'c': + if mod == 'c': self.add_to_active_objects(clicked_object) return True # If 's' modifier then subtract selection to highlighted region - elif mod == 's': + if mod == 's': self.remove_from_active_objects(clicked_object) return True