Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion debugger/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@
USER_PTYPE_PREPROCESSED = "ptype_output_preprocessed"

# Amount of time to allow select.select() to wait for program stdout before timing out
TIMEOUT_DURATION = 0.05
TIMEOUT_DURATION = 0.6

# Range of ports to give to debug sessions for communicating between debugger
# server and gdb subprocess.
# Ports used will be in range [MIN_SOCKET_PORT, MIN_SOCKET_PORT + PORT_RANGE)
MIN_SOCKET_PORT = 11000
PORT_RANGE = 100
28 changes: 23 additions & 5 deletions debugger/src/gdb_scripts/DebugSession.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import gdb
import datetime

from src.gdb_scripts.Publisher import Publisher
from src.gdb_scripts.custom_next import CustomNextCommand
from src.gdb_scripts.parse_functions import (
get_type_decl_strs,
Expand All @@ -9,9 +11,8 @@
from src.gdb_scripts.iomanager import IOManager
from src.constants import CUSTOM_NEXT_COMMAND_NAME


class DebugSession:
def __init__(self, user_socket_id: str, program_name: str):
def __init__(self, user_socket_id: str, ipc_port: int, program_name: str):
print("Initializing DebugSession instance...")

self.user_socket_id = user_socket_id
Expand All @@ -22,7 +23,12 @@ def __init__(self, user_socket_id: str, program_name: str):

# Load program to debug. Make sure it is compiled with -g flag to
# include debug symbols
gdb.execute(f"file {program_name}")
# from_tty=True option makes the output of this command show up in
# the gdb's paginated output stream. Equivalent to:
# output = gdb.execute(cmd, to_string=True)
# gdb.write(output)
gdb.execute(f"file {program_name}", from_tty=True)
gdb.flush()

"""
Necessary to store these three information here because the output
Expand All @@ -41,11 +47,21 @@ def __init__(self, user_socket_id: str, program_name: str):

self.io_manager = IOManager(user_socket_id=self.user_socket_id)

# Restrict the publisher socket to only listen for subscribers
# connections coming from the loopback network interface (localhost).
# Bc the debugger server running the subscriber threads should be
# running on the same machine as this publisher.
host = '127.0.0.1'
print(f"begin create publisher at datetime {datetime.datetime.now()}")
self.publisher = Publisher.create_publisher(host, ipc_port)
print(f"created publisher at datetime {datetime.datetime.now()}")
gdb.flush()

# Start the debug session
gdb.execute("start")
gdb.execute("start", from_tty=True)

# Make stdout stream unbuffered
gdb.execute("call setbuf(stdout, (void *) 0)")
gdb.execute("call setbuf(stdout, (void *) 0)", from_tty=True)

def get_cached_type_decl_strs(self):
return self.type_decl_strs
Expand All @@ -55,3 +71,5 @@ def get_cached_parsed_type_decls(self):

def get_cached_parsed_fn_decls(self):
return self.parsed_fn_decls

print(f"Sourced DebugSession.py")
197 changes: 197 additions & 0 deletions debugger/src/gdb_scripts/Publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import gdb
import multiprocessing.connection
import multiprocessing
import socket
import threading
from typing import Any, Dict, Callable

DEBUG_PUBLISHER: bool = True

SUBSCRIPTION_THREAD_JOIN_TIMEOUT: int = 2

class Subscription:
def __init__(self, addr, conn: socket.socket, remove_self: Callable[[], None]):
self.conn = conn
self.addr = addr
self.thread = threading.Thread(target=self.__handle_subscription)
self.thread.start()
self.remove_self = remove_self

def __handle_subscription(self):
with self.conn:
try:
while True:
data = self.conn.recv(1024)
if not data:
DEBUG_PUBLISHER and print(f"None data received by subscriber {self.addr}, terminating subscription.")
break
except Exception as e:
print(f"Error with subscriber {self.addr} receiving data, terminating subscription")
raise e

## `self.conn` will automatically close when execution leaves the
# `with self.conn` block.
self.remove_self(self.addr)

@staticmethod
def create_subscription(addr, conn: socket.socket):
return Subscription(addr, conn)

def request_unsubscribe(self):
try:
# Poitely ask the subscriber to unsubscribe
self.conn.sendall(b"please unsubscribe")
except Exception as e:
print(f"Error sending to {self.conn.getpeername()}:")
raise e

DEBUG_PUBLISHER and print(f"Waiting for thread {self.thread.name} handling subscriber {self.addr} to finish...")
self.thread.join(SUBSCRIPTION_THREAD_JOIN_TIMEOUT)
if self.thread.is_alive():
raise RuntimeError(f"Thread join timed out. Thread {self.thread.name}, timeout {SUBSCRIPTION_THREAD_JOIN_TIMEOUT}")
else:
DEBUG_PUBLISHER and print(f"Thread {self.thread.name} handling subscriber {self.addr} finished.")


class Publisher:
def __init__(self, host: str, port: int):
self.host = host
self.port = port

self.subscriptions: Dict[Any, Subscription] = dict()
self.subscriptions_lock: threading.Lock = threading.Lock()

self.publisher_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.publisher_socket.bind((self.host, self.port))
## Allow reusing the same port after Ctrl+C
self.publisher_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.publisher_socket.listen()
print(f"Publisher socket listening at {self.publisher_socket.getsockname()}. Waiting for subscribers...")
gdb.flush()

self.accept_subscriptions_thread = threading.Thread(target=self.__accept_subscriptions)
self.stop_accepting_subscriptions: threading.Event = threading.Event()
self.accept_subscriptions_thread.start()
print(f"Publisher accepting connections at {self.host} {self.port}")

# publisher_ready_event_fd = int(os.environ['PUBLISHER_READY_EVENT_FD'])
# publisher_ready_event = multiprocessing.connection.Connection(publisher_ready_event_fd)
# publisher_ready_event.set()

# parent_conn_fd = int(os.environ['PARENT_CONN_FD'])
# print(f"parent_conn_fd in subprocess: {parent_conn_fd}")
# parent_conn = multiprocessing.connection.Connection(parent_conn_fd)
# parent_conn.send("publisher ready")

gdb.flush()

@staticmethod
def create_publisher(host: str, port: int):
return Publisher(host, port)

def __enter__(self):
'''
`.__enter__()` and `.__exit__()` are implemented for the context manager
protocol, making this class a 'context manager object'.

```
with Publisher.create_publisher(HOST, PORT) as publisher:
## do stuff with publisher
publisher.broadcast
```

`.__enter__()` is called by the `with` statement to enter the runtime
context.

The return value of `.__enter__()` is put into the `as publisher`
variable.
'''
return self

def __exit__(self, exc_type, exc_value, exc_tb):
'''
`.__exit__()` is called when execution leaves the context of the `with`
block.
'''
self.close()

def __accept_subscriptions(self):
'''
Intended to be a private method (but Python doesn't have a concept of
private method methods, so we use leading underscores __ to indicate
that this should be used as a private method). Undergoes name mangling
so you cannot use publisher.__accept_subscriptions() outside this class.
https://docs.python.org/2/tutorial/classes.html#private-variables-and-class-local-references
'''
while not self.stop_accepting_subscriptions.is_set():
try:
# The purpose of this timeout is to perpetually go back to the
# condition of the containing while loop, which makes sure
# we are still accepting new subscriptions.
# Note: This looks like polling for connections but its not
# really. The `self.publisher_socket.accept()` does not work by
# polling (at a high level), it waits.
self.publisher_socket.settimeout(3)
conn, addr = self.publisher_socket.accept()
DEBUG_PUBLISHER and print(f"New subscriber connection to publisher accepted: {addr}")

if not self.stop_accepting_subscriptions.is_set():
self.__add_subcription(
addr,
Subscription(
addr,
conn,
lambda addr: self.__remove_subscription(addr)
)
)
else:
break
except socket.timeout:
continue
except socket.error as e:
if self.stop_accepting_subscriptions.is_set():
print(f'socket error raised while publisher no longer accepting connections:')
print(e)
break
else:
raise e

print(f'Terminate __accept_subscriptions')

def __add_subcription(self, addr, subscription: Subscription):
with self.subscriptions_lock:
self.subscriptions[addr] = subscription
DEBUG_PUBLISHER and print(f"Added subscription to socket {addr}")

def __remove_subscription(self, addr):
with self.subscriptions_lock:
del self.subscriptions[addr]
DEBUG_PUBLISHER and print(f"Removed subscription to socket {addr}")

def broadcast(self, message: str):
with self.subscriptions_lock:
for subscription in self.subscriptions.values():
try:
subscription.conn.sendall(message.encode())
print(f"Published to {subscription.conn.getpeername()}: {message}")
except Exception as e:
print(f"Error sending to {subscription.conn.getpeername()}: {e}")

def close(self):
## Trigger event to signal accept_subcriptions thread to stop accepting
## new subscriptions.
self.stop_accepting_subscriptions.set()

DEBUG_PUBLISHER and print(f"Waiting for thread {self.accept_subscriptions_thread.name} handling accepting new subscribers to finish...")
self.accept_subscriptions_thread.join()
DEBUG_PUBLISHER and print(f"Thread {self.accept_subscriptions_thread.name} handling accepting new subscribers has finished.")

## Ask all subscribers to unsubscribe.
## Does not guarantee that they will actually subscribe.
## But if they do then the thread handling that subscription should
## terminate and the subscription should remove itself from the
## publisher's list of subscriptions.
for subscription in list(self.subscriptions.values()):
subscription.request_unsubscribe()

self.publisher_socket.close()
10 changes: 5 additions & 5 deletions debugger/src/gdb_scripts/custom_next.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import os
from pprint import pprint
import gdb
import os
import re
import subprocess
from pprint import pprint
from pycparser import parse_file, c_ast
import re
from src.constants import USER_MALLOC_CALL_FILE_NAME, USER_MALLOC_CALL_PREPROCESSED, USER_PTYPE_FILE_NAME, USER_PTYPE_PREPROCESSED
from src.utils import create_abs_file_path
import re
from src.utils.path_utils import create_abs_file_path

from src.gdb_scripts.use_socketio_connection import useSocketIOConnection, enable_socketio_client_emit

Expand Down Expand Up @@ -375,6 +374,7 @@ def invoke(self, arg=None, from_tty=None):

send_backend_data_to_server(
self.user_socket_id, backend_data=backend_data)
self.debug_session.publisher.broadcast("\n\n\n============================\nHELLO I AM THE PUBLISHER IN GDB SUBPROCESS SENDING MSG TO THE SUBSCRIBER IN DEBUGGER SERVER\n============================\n\n\n")

print(f"\n=== Finished running update_backend_state in gdb instance\n\n")
return backend_data
Expand Down
74 changes: 74 additions & 0 deletions debugger/src/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'''
Tools for interprocess communication between the main debugger server
and the gdb subprocesses that it has spawned.

Event-based, modelled on publisher-subscriber model so that:
- messages being received from another process are detected instantaneously
- no explicit polling or busy waiting; we use threads for publisher to accept
subscribers and for subscribers to listen to publisher

Closely related to the Subscriber in this module is the Publisher in
debugger/gdb_scripts/Publisher.py
The Publisher implementation is placed there instead of here because it is
technically a gdb script which, like all other gdb scripts, runs inside
gdb subprocesses. It just happens to be written in python, but it is not
the same python runtime as this program and other python programs initiated
from server.py.
'''

import socket
import threading

from utils.socket_utils import connect_and_retry

# If this module is imported into some other file using `from ipc import *` then the list `__all__`
# defines all the things included in the import. So `connect_and_retry` won't be
# imported.
__all__ = ["Subscriber"]

class Subscriber:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.thread = threading.Thread(target=self.listen)

@staticmethod
def create_subscriber(host: str, port: int):
return Subscriber(host, port)

# def start(self, publisher_ready_event):
# timeout = 5
# result = publisher_ready_event.wait(timeout)
# if result:
# print(f"publisher triggered ready event, subscriber start thread")
# self.thread.start()
# else:
# raise RuntimeError(f"Event timeout: publisher_ready_event was not set to true after {timeout} seconds.")

# def start(self, child_conn: multiprocessing.connection.Connection):
# timeout = 10
# if child_conn.poll(timeout):
# message = child_conn.recv()
# if message == "publisher ready":
# self.thread.start()
# else:
# raise RuntimeError("Publisher reported not ready.")
# else:
# # raise RuntimeError(f"Timeout: publisher did not respond after {timeout} seconds.")
# print(f"Timeout: publisher did not respond after {timeout} seconds.")

def start(self):
self.thread.start()

def listen(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
connect_and_retry(sock, (self.host, self.port), [1,2,4])

print(f"Subscriber with socket {sock.getsockname()} listening to socket {sock.getpeername()}")

while True:
data = sock.recv(4096)
if not data or data.decode().lower() == "please unsubscribe":
print(f"Subscriber stop listening to socket {sock.getsockname()}.")
break
print('Subscriber received', repr(data.decode()))
Loading