Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 93 additions & 184 deletions python_files/tests/pytestadapter/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import pathlib
import select
import socket
import subprocess
import sys
Expand All @@ -14,10 +15,6 @@
import uuid
from typing import Any, Dict, List, Optional, Tuple

if sys.platform == "win32":
from namedpipe import NPopen


script_dir = pathlib.Path(__file__).parent.parent.parent
script_dir_child = pathlib.Path(__file__).parent.parent
sys.path.append(os.fspath(script_dir))
Expand Down Expand Up @@ -128,79 +125,95 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
print("json decode error")


def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Event):
# Open the FIFO for reading
fifo_path = pathlib.Path(pipe_name)
with fifo_path.open() as fifo:
print("Waiting for data...")
while True:
if completed.is_set():
break # Exit loop if completed event is set
data = fifo.read() # This will block until data is available
if len(data) == 0:
# If data is empty, assume EOF
break
print(f"Received: {data}")
result.append(data)
if sys.platform == "win32":
from namedpipe import NPopen

@contextlib.contextmanager
def pipe_setup_and_listen(pipe_name: str, result: List[str]):
# For Windows, named pipes have a specific naming convention.
pipe_path = f"\\\\.\\pipe\\{pipe_name}"

def _listen_on_pipe_new(listener, result: List[str], completed: threading.Event):
"""Listen on the named pipe or Unix domain socket for JSON data from the server.
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
completed = threading.Event()

Created as a separate function for clarity in threading context.
"""
# Windows design
if sys.platform == "win32":
all_data: list = []
stream = listener.wait()
while True:
# Read data from collection
close = stream.closed
if close:
break
data = stream.readlines()
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
print("attempt another connection")
except socket.timeout:
# On timeout, append all collected data to result and return
# result.append("".join(all_data))
return
data_decoded = "".join(data)
all_data.append(data_decoded)
# Append all collected data to result array
result.append("".join(all_data))
else: # Unix design
connection, _ = listener.socket.accept()
listener.socket.settimeout(1)
all_data: list = []
while True:
# Reading from connection
data: bytes = connection.recv(1024 * 1024)
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
connection, _ = listener.socket.accept()
except socket.timeout:
# On timeout, append all collected data to result and return
result.append("".join(all_data))
return
all_data.append(data.decode("utf-8"))
# Append all collected data to result array
result.append("".join(all_data))


def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event):
result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd)
completed.set()
return result
def listen():
all_data: list = []
stream = pipe.wait()
while True:
# Read data from collection
close = stream.closed
if close:
break
data = stream.readlines()
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
print("attempt another connection")
except socket.timeout:
# On timeout, append all collected data to result and return
# result.append("".join(all_data))
return
data_decoded = "".join(data)
all_data.append(data_decoded)
# Append all collected data to result array
result.append("".join(all_data))

thread = threading.Thread(target=listen)
thread.start()
try:
yield pipe_path
finally:
completed.set()
thread.join()
else:

@contextlib.contextmanager
def pipe_setup_and_listen(pipe_name: str, result: List[str]):
# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
pipe_path = pathlib.Path(
xdg_runtime_dir if xdg_runtime_dir else tempfile.gettempdir(),
pipe_name,
)
os.mkfifo(pipe_path)

completed = threading.Event()

def listen():
# When using blocking IO, open blocks forever if the subprocess compleates but never
# opens the pipe for writing (which may happen if there is an error early in the
# subprocess.) Hence we go to the effort of using non-blocking io so that we can
# break out of this function if that happens.
fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
try:
all_data = bytearray()
while True:
if completed.is_set():
break

# Wait till the pipe has data to read, with a timeout.
rlist, _, _ = select.select([fd], [], [], 0.1)
if rlist:
# Data is available, read it.
data = os.read(fd, 1024)
if not data:
# Empty data indicates EOF.
break
all_data.extend(data)
result.append(all_data.decode())
finally:
os.close(fd)

thread = threading.Thread(target=listen)
thread.start()
try:
yield pipe_path
finally:
completed.set()
thread.join()


def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]:
Expand Down Expand Up @@ -293,80 +306,20 @@ def runner_with_cwd_env(
*args,
]

# Generate pipe name, pipe name specific per OS type.

# Windows design
if sys.platform == "win32":
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
# Update the environment with the pipe name and PYTHONPATH.
env = os.environ.copy()
env.update(
{
"TEST_RUN_PIPE": pipe.path,
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)
# if additional environment variables are passed, add them to the environment
if env_add:
env.update(env_add)

completed = threading.Event()

result = [] # result is a string array to store the data during threading
t1: threading.Thread = threading.Thread(
target=_listen_on_pipe_new, args=(pipe, result, completed)
)
t1.start()

t2 = threading.Thread(
target=_run_test_code,
args=(process_args, env, path, completed),
)
t2.start()

t1.join()
t2.join()

return process_data_received(result[0]) if result else None
else: # Unix design
# Update the environment with the pipe name and PYTHONPATH.
result = [] # result is a string array to store the data during threading
with pipe_setup_and_listen(pipe_name, result) as pipe_path:
env = os.environ.copy()
env.update(
{
"TEST_RUN_PIPE": pipe_name,
"TEST_RUN_PIPE": pipe_path,
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)
# if additional environment variables are passed, add them to the environment
if env_add:
env.update(env_add)
# server = UnixPipeServer(pipe_name)
# server.start()
#################
# Create the FIFO (named pipe) if it doesn't exist
# if not pathlib.Path.exists(pipe_name):
os.mkfifo(pipe_name)
#################

completed = threading.Event()

result = [] # result is a string array to store the data during threading
t1: threading.Thread = threading.Thread(
target=_listen_on_fifo, args=(pipe_name, result, completed)
)
t1.start()

t2: threading.Thread = threading.Thread(
target=_run_test_code,
args=(process_args, env, path, completed),
)

t2.start()

t1.join()
t2.join()

return process_data_received(result[0]) if result else None
subprocess.run(process_args, env=env, cwd=path)
return process_data_received(result[0]) if result else None


def find_test_line_number(test_name: str, test_file_path) -> str:
Expand Down Expand Up @@ -422,48 +375,4 @@ def generate_random_pipe_name(prefix=""):
if not prefix:
prefix = "python-ext-rpc"

# For Windows, named pipes have a specific naming convention.
if sys.platform == "win32":
return f"\\\\.\\pipe\\{prefix}-{random_suffix}"

# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
if xdg_runtime_dir:
return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}") # noqa: PTH118
else:
return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}") # noqa: PTH118


class UnixPipeServer:
def __init__(self, name):
self.name = name
self.is_windows = sys.platform == "win32"
if self.is_windows:
raise NotImplementedError(
"This class is only intended for Unix-like systems, not Windows."
)
else:
# For Unix-like systems, use a Unix domain socket.
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Ensure the socket does not already exist
try:
os.unlink(self.name) # noqa: PTH108
except OSError:
if os.path.exists(self.name): # noqa: PTH110
raise

def start(self):
if self.is_windows:
raise NotImplementedError(
"This class is only intended for Unix-like systems, not Windows."
)
else:
# Bind the socket to the address and listen for incoming connections.
self.socket.bind(self.name)
self.socket.listen(1)
print(f"Server listening on {self.name}")

def stop(self):
# Clean up the server socket.
self.socket.close()
print("Server stopped.")
return f"{prefix}-{random_suffix}"