diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..f1edc58 Binary files /dev/null and b/.coverage differ diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..129c150 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,13 @@ +[run] +branch = True +source = jsocket +omit = + */__init__.py + +[report] +show_missing = True +skip_covered = True +exclude_lines = + pragma: no cover + if __name__ == .__main__. + diff --git a/Makefile b/Makefile index ee3a35b..86588f1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,26 @@ -.PHONY: test-behave +.PHONY: test-behave test-pytest-cov test-behave-cov coverage lint test-behave: PYTHONPATH=. behave -f progress2 + +# Pytest coverage (terminal report) +test-pytest-cov: + pytest -q --cov=jsocket --cov-branch --cov-report=term-missing + +# Behave coverage (appends to same .coverage data) +test-behave-cov: + coverage run -a -m behave -f progress2 + +# Combined coverage: erase, run pytest+behave, show and export XML/HTML +coverage: + coverage erase + pytest -q --cov=jsocket --cov-branch --cov-report=term + coverage run -a -m behave -f progress2 + coverage report -m + coverage xml -o coverage.xml + coverage html -d .coverage_html + +# Static analysis with pylint; fail if score below threshold +lint: + mkdir -p .pylint.d + PYLINTHOME=.pylint.d pylint jsocket tests features/steps --fail-under=9.0 --persistent=n diff --git a/coverage.xml b/coverage.xml new file mode 100644 index 0000000..48dc476 --- /dev/null +++ b/coverage.xml @@ -0,0 +1,310 @@ + + + + + + /home/chris/python-json-socket/jsocket + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/features/steps/steps.py b/features/steps/steps.py index 2444bbe..8245fb7 100644 --- a/features/steps/steps.py +++ b/features/steps/steps.py @@ -1,3 +1,10 @@ +"""Behave step implementations for json socket scenarios. + +Note: Pylint is unaware of Behave's decorator callables, so we disable the +"not-callable" check for this file. +""" +# pylint: disable=not-callable, missing-function-docstring + import json import logging import time @@ -12,8 +19,10 @@ class MyServer(jsocket.ThreadedServer): + """Simple echo server used by Behave scenarios.""" + def __init__(self, **kwargs): - super(MyServer, self).__init__(**kwargs) + super().__init__(**kwargs) self.timeout = 2.0 def _process_message(self, obj): @@ -60,7 +69,7 @@ def server_sends_object(context, obj): def client_sees_message(context, obj): expected = json.loads(obj) msg = context.jsonclient.read_obj() - assert msg == expected, "%s" % expected + assert msg == expected, f"{expected}" @then(r"within (\d+(?:\.\d+)?) seconds the server is connected") @@ -138,6 +147,8 @@ def client_attempts_read_with_timeout(context, seconds): def client_read_fails(context): e = getattr(context, 'client_read_exception', None) # Either a socket.timeout or a RuntimeError("socket connection broken") is acceptable - assert e is not None, "client read unexpectedly succeeded: %r" % getattr(context, 'client_read_value', None) - acceptable = isinstance(e, socket.timeout) or (isinstance(e, RuntimeError) and 'socket connection broken' in str(e)) + assert e is not None, f"client read unexpectedly succeeded: {getattr(context, 'client_read_value', None)!r}" + acceptable = isinstance(e, socket.timeout) or ( + isinstance(e, RuntimeError) and 'socket connection broken' in str(e) + ) assert acceptable, f"unexpected exception type: {type(e)} {e}" diff --git a/jsocket/jsocket_base.py b/jsocket/jsocket_base.py index 7d612c5..50016c8 100644 --- a/jsocket/jsocket_base.py +++ b/jsocket/jsocket_base.py @@ -30,30 +30,37 @@ logger = logging.getLogger("jsocket") -class JsonSocket(object): +class JsonSocket: + """Lightweight JSON-over-TCP socket wrapper with length-prefixed framing.""" + def __init__(self, address='127.0.0.1', port=5489, timeout=2.0): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.conn = self.socket self._timeout = timeout self._address = address self._port = port + # Ensure the primary socket respects timeout for accept/connect operations + self.socket.settimeout(self._timeout) def send_obj(self, obj): + """Send a JSON-serializable object over the connection.""" msg = json.dumps(obj, ensure_ascii=False) if self.socket: payload = msg.encode('utf-8') - frmt = "=%ds" % len(payload) + frmt = f"={len(payload)}s" packed_msg = struct.pack(frmt, payload) packed_hdr = struct.pack('!I', len(packed_msg)) self._send(packed_hdr) self._send(packed_msg) def _send(self, msg): + """Send all bytes in `msg` to the peer.""" sent = 0 while sent < len(msg): sent += self.conn.send(msg[sent:]) def _read(self, size): + """Read exactly `size` bytes from the peer or raise on disconnect.""" data = b'' while len(data) < size: data_tmp = self.conn.recv(size - len(data)) @@ -63,51 +70,80 @@ def _read(self, size): return data def _msg_length(self): + """Read and unpack the 4-byte big-endian length header.""" d = self._read(4) s = struct.unpack('!I', d) return s[0] def read_obj(self): + """Read a full message and decode it as JSON, returning a Python object.""" size = self._msg_length() data = self._read(size) - frmt = "=%ds" % size + frmt = f"={size}s" msg = struct.unpack(frmt, data) return json.loads(msg[0].decode('utf-8')) def close(self): + """Close active connection and the listening socket if open.""" logger.debug("closing all connections") self._close_connection() self._close_socket() def _close_socket(self): + """Best-effort shutdown and close of the main socket.""" logger.debug("closing main socket") - if self.socket.fileno() != -1: - self.socket.shutdown(socket.SHUT_RDWR) - self.socket.close() + try: + if self.socket and self.socket.fileno() != -1: + try: + self.socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass + try: + self.socket.close() + except OSError: + pass + except OSError: + pass def _close_connection(self): + """Best-effort shutdown and close of the accepted connection socket.""" logger.debug("closing the connection socket") - if self.conn.fileno() != -1: - self.conn.shutdown(socket.SHUT_RDWR) - self.conn.close() + try: + if self.conn and self.conn is not self.socket and self.conn.fileno() != -1: + try: + self.conn.shutdown(socket.SHUT_RDWR) + except OSError: + pass + try: + self.conn.close() + except OSError: + pass + except OSError: + pass def _get_timeout(self): + """Get the current socket timeout in seconds.""" return self._timeout def _set_timeout(self, timeout): + """Set the socket timeout in seconds and apply to the main socket.""" self._timeout = timeout self.socket.settimeout(timeout) def _get_address(self): + """Return the configured bind address.""" return self._address def _set_address(self, address): + """No-op: address is read-only after initialization.""" pass def _get_port(self): + """Return the configured bind port.""" return self._port def _set_port(self, port): + """No-op: port is read-only after initialization.""" pass timeout = property(_get_timeout, _set_timeout, doc='Get/set the socket timeout') @@ -116,97 +152,58 @@ def _set_port(self, port): class JsonServer(JsonSocket): + """Server socket that accepts one connection at a time.""" + def __init__(self, address='127.0.0.1', port=5489): - super(JsonServer, self).__init__(address, port) + super().__init__(address, port) self._bind() def _bind(self): self.socket.bind((self.address, self.port)) def _listen(self): - self.socket.listen(1) + self.socket.listen(5) def _accept(self): return self.socket.accept() def accept_connection(self): + """Listen and accept a single client connection; set timeout accordingly.""" self._listen() self.conn, addr = self._accept() self.conn.settimeout(self.timeout) - logger.debug("connection accepted, conn socket (%s,%d,%d)" % (addr[0], addr[1], self.conn.gettimeout())) + logger.debug( + "connection accepted, conn socket (%s,%d,%s)", addr[0], addr[1], str(self.conn.gettimeout()) + ) + + def _reset_connection_ref(self): + """Reset the server's connection reference to the listening socket.""" + self.conn = self.socket def _is_connected(self): try: return (self.conn is not None) and (self.conn is not self.socket) and (self.conn.fileno() != -1) - except Exception: + except (OSError, AttributeError): return False connected = property(_is_connected, doc="True if server has an active client connection") class JsonClient(JsonSocket): + """Client socket for connecting to a JsonServer and exchanging JSON messages.""" + def __init__(self, address='127.0.0.1', port=5489): - super(JsonClient, self).__init__(address, port) + super().__init__(address, port) def connect(self): - for i in range(10): + """Attempt to connect to the server up to 10 times with backoff.""" + for _ in range(10): try: self.socket.connect((self.address, self.port)) except socket.error as msg: - logger.error("SockThread Error: %s" % msg) + logger.error("SockThread Error: %s", msg) time.sleep(3) continue logger.info("...Socket Connected") return True return False - - -if __name__ == "__main__": - """ basic json echo server """ - import threading - logger.setLevel(logging.DEBUG) - FORMAT = '[%(asctime)-15s][%(levelname)s][%(module)s][%(funcName)s] %(message)s' - logging.basicConfig(format=FORMAT) - - def server_thread(): - logger.debug("starting JsonServer") - server = JsonServer() - server.accept_connection() - while 1: - try: - msg = server.read_obj() - logger.info("server received: %s" % msg) - server.send_obj(msg) - except socket.timeout as e: - logger.debug("server socket.timeout: %s" % e) - continue - except Exception as e: - logger.error("server: %s" % e) - break - - server.close() - - t = threading.Timer(1, server_thread) - t.start() - - time.sleep(2) - logger.debug("starting JsonClient") - - client = JsonClient() - client.connect() - - i = 0 - while i < 10: - client.send_obj({"i": i}) - try: - msg = client.read_obj() - logger.info("client received: %s" % msg) - except socket.timeout as e: - logger.debug("client socket.timeout: %s" % e) - continue - except Exception as e: - logger.error("client: %s" % e) - break - i = i + 1 - - client.close() diff --git a/jsocket/tserver.py b/jsocket/tserver.py index 2bed77b..82a6cdc 100644 --- a/jsocket/tserver.py +++ b/jsocket/tserver.py @@ -22,22 +22,24 @@ """ __version__ = "1.0.3" -import jsocket.jsocket_base as jsocket_base import threading import socket import time import logging import abc from typing import Optional +from jsocket import jsocket_base logger = logging.getLogger("jsocket.tserver") class ThreadedServer(threading.Thread, jsocket_base.JsonServer, metaclass=abc.ABCMeta): + """Single-threaded server that accepts one connection and processes messages in its thread.""" + def __init__(self, **kwargs): threading.Thread.__init__(self) jsocket_base.JsonServer.__init__(self, **kwargs) - self._isAlive = False + self._is_alive = False @abc.abstractmethod def _process_message(self, obj) -> Optional[dict]: @@ -52,22 +54,26 @@ def _process_message(self, obj) -> Optional[dict]: return None def run(self): - while self._isAlive: + # Ensure the run loop is active even when run() is invoked directly + # (tests may call run() in a separate thread without invoking start()). + if not self._is_alive: + self._is_alive = True + while self._is_alive: try: self.accept_connection() except socket.timeout as e: - logger.debug("socket.timeout: %s" % e) + logger.debug("socket.timeout: %s", e) continue - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught # Avoid noisy error logs during normal shutdown/sequencing - if self._isAlive: + if self._is_alive: logger.debug("accept_connection error: %s", e) else: logger.debug("server stopping; accept loop exiting") break continue - while self._isAlive: + while self._is_alive: try: obj = self.read_obj() resp_obj = self._process_message(obj) @@ -75,9 +81,9 @@ def run(self): logger.debug("message has a response") self.send_obj(resp_obj) except socket.timeout as e: - logger.debug("socket.timeout: %s" % e) + logger.debug("socket.timeout: %s", e) continue - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught # Treat client disconnects as normal; keep logs at info/debug msg = str(e) if isinstance(e, RuntimeError) and 'socket connection broken' in msg: @@ -89,7 +95,7 @@ def run(self): # Ensure sockets are cleaned up when the server stops try: self.close() - except Exception: + except OSError: pass def start(self): @@ -98,8 +104,8 @@ def start(self): @retval None """ - self._isAlive = True - super(ThreadedServer, self).start() + self._is_alive = True + super().start() logger.debug("Threaded Server has been started.") def stop(self): @@ -108,15 +114,17 @@ def stop(self): @retval None """ - self._isAlive = False + self._is_alive = False logger.debug("Threaded Server has been stopped.") class ServerFactoryThread(threading.Thread, jsocket_base.JsonSocket, metaclass=abc.ABCMeta): + """Per-connection worker thread used by ServerFactory.""" + def __init__(self, **kwargs): threading.Thread.__init__(self, **kwargs) jsocket_base.JsonSocket.__init__(self, **kwargs) - self._isAlive = False + self._is_alive = False def swap_socket(self, new_sock): """ Swaps the existing socket with a new one. Useful for setting socket after a new connection. @@ -124,7 +132,6 @@ def swap_socket(self, new_sock): @param new_sock socket to replace the existing default jsocket.JsonSocket object @retval None """ - del self.socket self.socket = new_sock self.conn = self.socket @@ -132,7 +139,7 @@ def run(self): """ Should exit when client closes socket conn. Can force an exit with force_stop. """ - while self._isAlive: + while self._is_alive: try: obj = self.read_obj() resp_obj = self._process_message(obj) @@ -140,11 +147,11 @@ def run(self): logger.debug("message has a response") self.send_obj(resp_obj) except socket.timeout as e: - logger.debug("socket.timeout: %s" % e) + logger.debug("socket.timeout: %s", e) continue - except Exception as e: - logger.info("client connection broken, exit and close connection socket") - self._isAlive = False + except Exception as e: # pylint: disable=broad-exception-caught + logger.info("client connection broken, closing connection: %s", e) + self._is_alive = False break self._close_connection() @@ -164,8 +171,8 @@ def start(self): @retval None """ - self._isAlive = True - super(ServerFactoryThread, self).start() + self._is_alive = True + super().start() logger.debug("ServerFactoryThread has been started.") def force_stop(self): @@ -175,11 +182,12 @@ def force_stop(self): @retval None """ - self._isAlive = False + self._is_alive = False logger.debug("ServerFactoryThread has been stopped.") class ServerFactory(ThreadedServer): + """Accepts clients and spawns a ServerFactoryThread per connection.""" def __init__(self, server_thread, **kwargs): ThreadedServer.__init__(self, address=kwargs['address'], port=kwargs['port']) if not issubclass(server_thread, ServerFactoryThread): @@ -195,20 +203,28 @@ def _process_message(self, obj) -> Optional[dict]: return None def run(self): - while self._isAlive: + # Ensure the run loop is active even when run() is invoked directly + # (tests may call run() in a separate thread without invoking start()). + if not self._is_alive: + self._is_alive = True + while self._is_alive: tmp = self._thread_type(**self._thread_args) self._purge_threads() - while not self.connected and self._isAlive: + while not self.connected and self._is_alive: try: self.accept_connection() except socket.timeout as e: - logger.debug("socket.timeout: %s" % e) + logger.debug("socket.timeout: %s", e) continue - except Exception as e: - logger.exception(e) + except Exception as e: # pylint: disable=broad-exception-caught + logger.exception("accept error: %s", e) continue else: - tmp.swap_socket(self.conn) + # Hand off the accepted connection to the worker + accepted_conn = self.conn + # Reset server connection reference so we can accept again + self._reset_connection_ref() + tmp.swap_socket(accepted_conn) tmp.start() self._threads.append(tmp) break @@ -223,9 +239,17 @@ def stop_all(self): t.join() def _purge_threads(self): - for t in self._threads: - if not t.is_alive(): - self._threads.remove(t) + # Rebuild list to avoid mutating while iterating + self._threads = [t for t in self._threads if t.is_alive()] + + def stop(self): + # Stop accepting and stop all workers + self._is_alive = False + try: + self.stop_all() + except Exception: # pylint: disable=broad-exception-caught + pass + logger.debug("ServerFactory has been stopped.") def _wait_to_exit(self): while self._get_num_of_active_threads(): diff --git a/requirements-dev.txt b/requirements-dev.txt index 09e1af7..e5c0dab 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,6 @@ behave>=1.2.6 pytest>=7.0 pytest-timeout>=2.1 +coverage>=7.5 +pytest-cov>=4.1 +pylint>=3.2 diff --git a/tests/test_additional_coverage.py b/tests/test_additional_coverage.py new file mode 100644 index 0000000..9bdda6d --- /dev/null +++ b/tests/test_additional_coverage.py @@ -0,0 +1,156 @@ +"""Additional tests to exercise error/edge paths for higher coverage. + +These tests avoid real network I/O by monkeypatching/stubbing where possible. +""" + +import threading +import time +import socket +import pytest + +import jsocket + + +def test_client_connect_failure_returns_false(monkeypatch): + """JsonClient.connect should return False after repeated failures.""" + + def fail_connect(self, addr): # pylint: disable=unused-argument + raise OSError("boom") + + # Short-circuit sleeps so the test is fast + monkeypatch.setattr(jsocket.jsocket_base.time, "sleep", lambda *_: None) + monkeypatch.setattr(socket.socket, "connect", fail_connect, raising=True) + + try: + client = jsocket.JsonClient(address="127.0.0.1", port=9) + except PermissionError as e: # sandboxed environments may forbid sockets + pytest.skip(f"Socket creation blocked: {e}") + assert client.connect() is False + + +def test_close_idempotent_and_connected_guard(): + """close() is safe to call multiple times; connected guard tolerates None.""" + # Client close idempotence + try: + c = jsocket.JsonClient(address="127.0.0.1", port=0) + except PermissionError as e: # sandboxed environments may forbid sockets + pytest.skip(f"Socket creation blocked: {e}") + c.close() + c.close() # should not raise + + # Server connected property guard (None conn) + try: + s = jsocket.JsonServer(address="127.0.0.1", port=0) + except PermissionError as e: # sandboxed environments may forbid sockets + pytest.skip(f"Socket creation blocked: {e}") + s.conn = None + assert s.connected is False + s.close() + + +def test_threadedserver_timeout_then_exception_triggers_close(monkeypatch): + """ThreadedServer should ignore timeouts and close on generic exceptions.""" + + class ProbeServer(jsocket.ThreadedServer): + def __init__(self): + # Do not call super to avoid binding sockets + threading.Thread.__init__(self) + self._is_alive = True + self._close_calls = 0 + self._reads = iter([ + lambda: (_ for _ in ()).throw(socket.timeout("t")), + lambda: (_ for _ in ()).throw(ValueError("boom")), + ]) + + def accept_connection(self): + # No-op; simulate an accepted connection + return None + + def read_obj(self): + # First a timeout, then an exception, then timeouts until stopped + st = getattr(self, "_state", 0) + if st == 0: + self._state = 1 + raise socket.timeout("t") + if st == 1: + self._state = 2 + raise ValueError("boom") + raise socket.timeout("t") + + def _process_message(self, obj): # pragma: no cover - not reached + return None + + def _close_connection(self): + self._close_calls += 1 + + def close(self): + # Avoid base close touching real sockets + return None + + srv = ProbeServer() + srv.start() + # Let the loop process the two read attempts + time.sleep(0.1) + srv.stop() + srv.join(timeout=1.0) + # One close due to ValueError path + assert srv._close_calls == 1 + + +def test_serverfactorythread_exception_closes_connection(): + """ServerFactoryThread should close the connection when handler raises.""" + + class BoomWorker(jsocket.ServerFactoryThread): + def __init__(self): + # Avoid base JsonSocket init + threading.Thread.__init__(self) + self._is_alive = True + self.closed = False + + def _process_message(self, obj): # pylint: disable=unused-argument + raise ValueError("boom") + + def read_obj(self): + return {"echo": 1} + + def _close_connection(self): + self.closed = True + + w = BoomWorker() + w.start() + w.join(timeout=1.0) + assert w.closed is True + + +def test_serverfactory_accept_error_branch(monkeypatch): + """ServerFactory should continue on accept() errors and then stop cleanly.""" + + class EchoWorker(jsocket.ServerFactoryThread): + def __init__(self): + threading.Thread.__init__(self) + self._is_alive = False + + def _process_message(self, obj): # pragma: no cover - not used here + return None + + # Real factory to get run loop; we'll stub accept_connection to raise + try: + server = jsocket.ServerFactory(EchoWorker, address="127.0.0.1", port=0) + except PermissionError as e: # sandboxed environments may forbid sockets + pytest.skip(f"Socket creation blocked: {e}") + + calls = {"n": 0} + + def flappy_accept(): + calls["n"] += 1 + if calls["n"] == 1: + raise RuntimeError("accept failed") + # On second call, request stop + server._is_alive = False + + monkeypatch.setattr(server, "accept_connection", flappy_accept) + + t = threading.Thread(target=server.run, daemon=True) + t.start() + t.join(timeout=1.0) + assert calls["n"] >= 1 diff --git a/tests/test_e2e.py b/tests/test_e2e.py index d4cd431..85b6f34 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -1,20 +1,22 @@ +"""Pytest end-to-end tests for basic client/server echo.""" + import time -import socket import pytest import jsocket class EchoServer(jsocket.ThreadedServer): + """Minimal echo server for tests.""" def __init__(self, **kwargs): super().__init__(**kwargs) self.timeout = 2.0 - self.isConnected = False + self.is_connected = False def _process_message(self, obj): if obj != '': if obj.get('message') == 'new connection': - self.isConnected = True + self.is_connected = True # echo back if present if 'echo' in obj: return obj @@ -23,6 +25,7 @@ def _process_message(self, obj): @pytest.mark.timeout(10) def test_end_to_end_echo_and_connection(): + """Server accepts a connection and echoes payloads end-to-end.""" try: server = EchoServer(address='127.0.0.1', port=0) except PermissionError as e: @@ -39,7 +42,7 @@ def test_end_to_end_echo_and_connection(): # Signal connection and wait briefly for server to process client.send_obj({"message": "new connection"}) time.sleep(0.2) - assert server.isConnected is True + assert server.is_connected is True # Echo round-trip payload = {"echo": "hello", "i": 1} diff --git a/tests/test_listener_persistence.py b/tests/test_listener_persistence.py index c5ec80f..1f57264 100644 --- a/tests/test_listener_persistence.py +++ b/tests/test_listener_persistence.py @@ -1,11 +1,13 @@ +"""Pytest: server should accept multiple clients sequentially without restart.""" + import time import pytest -import socket import jsocket class EchoServer(jsocket.ThreadedServer): + """Echo server used to verify listener persistence.""" def __init__(self, **kwargs): super().__init__(**kwargs) self.timeout = 1.0 @@ -53,4 +55,3 @@ def test_server_accepts_multiple_clients_sequentially(): finally: server.stop() server.join(timeout=3) - diff --git a/tests/test_serverfactory_concurrent.py b/tests/test_serverfactory_concurrent.py new file mode 100644 index 0000000..057381d --- /dev/null +++ b/tests/test_serverfactory_concurrent.py @@ -0,0 +1,65 @@ +"""Pytest: ServerFactory should handle two clients concurrently.""" + +import time +import pytest + +import jsocket + + +class EchoWorker(jsocket.ServerFactoryThread): + """Worker that echoes messages containing 'echo' key.""" + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.timeout = 1.0 + + def _process_message(self, obj): + if isinstance(obj, dict) and 'echo' in obj: + return obj + return None + + +@pytest.mark.timeout(15) +def test_serverfactory_handles_two_clients_concurrently(): + """Two clients can connect and receive echoes concurrently.""" + try: + server = jsocket.ServerFactory(EchoWorker, address='127.0.0.1', port=0) + except PermissionError as e: + pytest.skip(f"Socket creation blocked: {e}") + + _, port = server.socket.getsockname() + server.start() + + try: + c1 = jsocket.JsonClient(address='127.0.0.1', port=port) + c2 = jsocket.JsonClient(address='127.0.0.1', port=port) + assert c1.connect() is True + assert c2.connect() is True + c1.timeout = 1.5 + c2.timeout = 1.5 + + # Send from both clients without closing the first + p1 = {"echo": "alpha"} + p2 = {"echo": "beta"} + c1.send_obj(p1) + c2.send_obj(p2) + + # Both should receive echoes without waiting for the other to disconnect + r1 = c1.read_obj() + r2 = c2.read_obj() + assert r1 == p1 + assert r2 == p2 + + # At some point, we should have at least two active workers + time.sleep(0.2) + assert getattr(server, 'active', 0) >= 2 + + c1.close() + c2.close() + finally: + try: + if hasattr(server, 'stop_all'): + server.stop_all() + except Exception: + pass + server.stop() + server.join(timeout=3) diff --git a/tests/test_serverfactory_serialization.py b/tests/test_serverfactory_serialization.py new file mode 100644 index 0000000..adc14f8 --- /dev/null +++ b/tests/test_serverfactory_serialization.py @@ -0,0 +1,75 @@ +"""Pytest: ServerFactory concurrency behavior (updated to allow multiple clients).""" + +import time +import pytest + +import jsocket + + +class EchoWorker(jsocket.ServerFactoryThread): + """Worker that echoes messages containing 'echo' key.""" + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.timeout = 1.0 + + def _process_message(self, obj): + # Echo payloads that include 'echo' for verification + if isinstance(obj, dict) and 'echo' in obj: + return obj + return None + + +@pytest.mark.timeout(15) +def test_serverfactory_accepts_multiple_active_clients_concurrently(): + """ServerFactory accepts a second client while the first is active. + + Updated behavior: ServerFactory accepts additional clients while one is + already active, starting a new worker per connection. + """ + try: + server = jsocket.ServerFactory(EchoWorker, address='127.0.0.1', port=0) + except PermissionError as e: + pytest.skip(f"Socket creation blocked: {e}") + + # Discover ephemeral port and start server + _, port = server.socket.getsockname() + server.start() + + try: + # First client establishes a connection and gets echoed responses + c1 = jsocket.JsonClient(address='127.0.0.1', port=port) + assert c1.connect() is True + c1.timeout = 1.0 + payload1 = {"echo": "one"} + c1.send_obj(payload1) + echoed1 = c1.read_obj() + assert echoed1 == payload1 + + # Second client connects while the first is still active + c2 = jsocket.JsonClient(address='127.0.0.1', port=port) + assert c2.connect() is True + c2.timeout = 1.0 + payload2 = {"echo": "two"} + c2.send_obj(payload2) + + # Both clients should receive responses + echoed2 = c2.read_obj() + assert echoed2 == payload2 + + # Active workers should reach at least 2 + time.sleep(0.2) + assert getattr(server, 'active', 0) >= 2 + + # Clean up clients + c2.close() + c1.close() + finally: + # Stop the server thread and join + try: + # Ensure any worker threads are stopped to avoid hangs + if hasattr(server, 'stop_all'): + server.stop_all() + except Exception: + pass + server.stop() + server.join(timeout=3)