From 8ae1958f19d289bf072bdc67869d538aaf403cb6 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Tue, 2 Sep 2025 21:29:35 -0400 Subject: [PATCH 01/11] websocket plugin --- avaje-jex-websockets/pom.xml | 19 + .../io/avaje/jex/websocket/JexWebSocket.java | 53 +++ .../io/avaje/jex/websocket/WebSocket.java | 47 +++ .../websocket/WebSocketExchangeHandler.java | 19 + .../avaje/jex/websocket/WebSocketFrame.java | 110 +++++ .../jex/websocket/WebSocketListener.java | 56 +++ .../avaje/jex/websocket/WebSocketPlugin.java | 28 ++ .../io/avaje/jex/websocket/WsContext.java | 185 +++++++++ .../jex/websocket/exception/CloseCode.java | 35 ++ .../exception/WebSocketException.java | 32 ++ .../websocket/internal/AbstractWebSocket.java | 262 ++++++++++++ .../jex/websocket/internal/CloseFrame.java | 44 ++ .../avaje/jex/websocket/internal/LICENSE.md | 14 + .../avaje/jex/websocket/internal/State.java | 42 ++ .../io/avaje/jex/websocket/internal/Util.java | 67 +++ .../avaje/jex/websocket/internal/WSFrame.java | 390 ++++++++++++++++++ .../websocket/internal/WebSocketHandler.java | 70 ++++ .../src/main/java/module-info.java | 23 ++ .../internal/EchoWebSocketHandler.java | 20 + .../internal/WebSocketClientUtil.java | 72 ++++ .../jex/websocket/internal/WebSocketTest.java | 76 ++++ .../java/io/avaje/jex/core/JdkContext.java | 9 + .../jex/core/json/JacksonJsonService.java | 16 +- .../avaje/jex/core/json/JsonbJsonService.java | 5 + .../main/java/io/avaje/jex/http/Context.java | 8 + .../java/io/avaje/jex/spi/JsonService.java | 11 +- pom.xml | 1 + 27 files changed, 1711 insertions(+), 3 deletions(-) create mode 100644 avaje-jex-websockets/pom.xml create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java create mode 100644 avaje-jex-websockets/src/main/java/module-info.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java create mode 100644 avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml new file mode 100644 index 00000000..eaf440a2 --- /dev/null +++ b/avaje-jex-websockets/pom.xml @@ -0,0 +1,19 @@ + + 4.0.0 + + io.avaje + avaje-jex-parent + 3.3-RC4 + + avaje-jex-websockets + + io.avaje + avaje-jex + + + io.avaje + avaje-jex-test + test + + + \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java new file mode 100644 index 00000000..649dfa63 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java @@ -0,0 +1,53 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.internal.AbstractWebSocket; + +class JexWebSocket extends AbstractWebSocket { + + private final WebSocketListener listener; + private final Context ctx; + + JexWebSocket(Context ctx, WebSocketListener listener) { + super(ctx.exchange()); + this.listener = listener; + this.ctx = ctx; + } + + @Override + protected void onClose(CloseCode code, String reason, boolean initiatedByRemote) { + listener.onClose(new WsClose(ctx, this, code, reason, initiatedByRemote)); + } + + @Override + protected void onMessage(WebSocketFrame frame) { + switch (frame.opCode()) { + case TEXT -> listener.onMessage(new WsMessage(ctx, this, frame, frame.textPayload())); + case BINARY -> + listener.onBinaryMessage(new WsBinaryMessage(ctx, this, frame, frame.binaryPayload())); + default -> throw new IllegalArgumentException("Unexpected value: "); + } + } + + @Override + protected void onPong(WebSocketFrame pong) { + listener.onPong(new WsPong(ctx, this, pong)); + } + + @Override + protected void onOpen() { + listener.onOpen(new WsOpen(ctx, this)); + } + + @Override + protected void onError(Exception exception) { + listener.onError(new WsError(ctx, this, exception)); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java new file mode 100644 index 00000000..e8948c39 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java @@ -0,0 +1,47 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.websocket.exception.CloseCode; + +/** + * Represents a WebSocket connection, providing methods to interact with the connection. + * Allows sending and receiving messages, pinging, and closing the connection. + */ +public interface WebSocket { + + /** + * Checks if the WebSocket connection is open. + * + * @return true if the connection is open, false otherwise + */ + boolean isOpen(); + + /** + * Closes the WebSocket connection with the specified close code and reason. + * + * @param code the close code indicating the reason for closure + * @param reason the reason for closing the connection + * @param initiatedByRemote true if the close was initiated by the remote endpoint, false otherwise + */ + void close(CloseCode code, String reason, boolean initiatedByRemote); + + /** + * Sends a ping frame with the specified payload to the remote endpoint. + * + * @param payload the ping payload as a byte array + */ + void ping(byte[] payload); + + /** + * Sends a binary message to the remote endpoint. + * + * @param payload the binary payload as a byte array + */ + void send(byte[] payload); + + /** + * Sends a text message to the remote endpoint. + * + * @param payload the text payload as a string + */ + void send(String payload); +} \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java new file mode 100644 index 00000000..fd7e3b66 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java @@ -0,0 +1,19 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.internal.WebSocketHandler; + +class WebSocketExchangeHandler extends WebSocketHandler { + + private final WebSocketListener listener; + + WebSocketExchangeHandler(WebSocketListener listener) { + this.listener = listener; + } + + @Override + protected JexWebSocket openWebSocket(Context exchange) { + + return new JexWebSocket(exchange, listener); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java new file mode 100644 index 00000000..39b57f10 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java @@ -0,0 +1,110 @@ +package io.avaje.jex.websocket; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Represents a WebSocket frame as defined by RFC 6455. Provides access to frame payload, masking, + * opcode, and frame control information. + */ +public interface WebSocketFrame { + + /** + * Returns the binary payload of the frame. + * + * @return the binary payload as a byte array + */ + byte[] binaryPayload(); + + /** + * Returns the masking key used for the frame, if present. + * + * @return the masking key as a byte array, or null if not masked + */ + byte[] maskingKey(); + + /** + * Returns the opcode of the frame. + * + * @return the opcode + */ + OpCode opCode(); + + /** + * Returns the text payload of the frame, if applicable. + * + * @return the text payload, or null if not a text frame + */ + String textPayload(); + + /** + * Indicates if this frame is the final fragment in a message. + * + * @return true if final fragment, false otherwise + */ + boolean isFin(); + + /** + * Indicates if the frame is masked. + * + * @return true if masked, false otherwise + */ + boolean isMasked(); + + /** + * Writes the frame to the given output stream in WebSocket frame format. + * + * @param out the output stream to write to + * @throws IOException if an I/O error occurs + */ + void write(OutputStream out) throws IOException; + + /** WebSocket opcodes */ + public enum OpCode { + CONTINUATION(0), + TEXT(1), + BINARY(2), + CLOSE(8), + PING(9), + PONG(10); + + private final byte code; + private static final Map VALUES = + Arrays.stream(values()).collect(Collectors.toMap(OpCode::value, e -> e)); + + OpCode(int code) { + this.code = (byte) code; + } + + /** + * Finds the OpCode corresponding to the given byte value. + * + * @param value the opcode value + * @return the matching OpCode, or null if not found + */ + public static OpCode find(byte value) { + return VALUES.get(value); + } + + /** + * Returns the byte value of this opcode. + * + * @return the opcode value + */ + public byte value() { + return this.code; + } + + /** + * Indicates if this opcode is a control frame (close, ping, pong). + * + * @return true if control frame, false otherwise + */ + public boolean isControlFrame() { + return this == CLOSE || this == PING || this == PONG; + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java new file mode 100644 index 00000000..9b6870f7 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -0,0 +1,56 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; + +/** + * Holds the different WebSocket handlers for a specific {@link WsHandlerEntry} or the WebSocket + * log. + */ +public interface WebSocketListener { + /** + * Called when a binary message is received. + * + * @param binaryPayload the binary payload + */ + default void onBinaryMessage(WsBinaryMessage binaryPayload) {} + + /** + * Called when the websocket is closed. + * + * @param wsClose the close context + */ + default void onClose(WsClose wsClose) {} + + /** + * Called when a text message is received. + * + * @param message the text message + */ + default void onMessage(WsMessage message) {} + + /** + * Called when the websocket is opened. + * + * @param wsOpenContext the open context + */ + default void onOpen(WsOpen wsOpen) {} + + /** + * Called when a pong is received. + * + * @param pong the pong + */ + default void onPong(WsPong wsPong) {} + + /** + * Called when an error occurs. + * + * @param wsError the error + */ + default void onError(WsError wsError) {} +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java new file mode 100644 index 00000000..105feabf --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -0,0 +1,28 @@ +package io.avaje.jex.websocket; + +import java.util.ArrayList; +import java.util.List; + +import io.avaje.jex.Jex; +import io.avaje.jex.Routing; +import io.avaje.jex.security.Role; +import io.avaje.jex.spi.JexPlugin; + +public class WebSocketPlugin implements JexPlugin { + + private final List handlers = new ArrayList<>(); + + public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { + handlers.add(r -> r.get(path, new WebSocketExchangeHandler(listener), roles)); + return this; + } + + @Override + public void apply(Jex jex) { + jex.routing().addAll(handlers); + } + + public static WebSocketPlugin create() { + return new WebSocketPlugin(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java new file mode 100644 index 00000000..52f22a42 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -0,0 +1,185 @@ +package io.avaje.jex.websocket; + +import java.lang.reflect.Type; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.exception.CloseCode; + +/** The context for a WebSocket event */ +public abstract sealed class WsContext { + + protected final Context ctx; + private final WebSocket ws; + + protected WsContext(Context ctx, WebSocket ws) { + this.ctx = ctx; + this.ws = ws; + } + + /** + * Serializes object to a JSON-string using the registered JsonMapper and sends it over the socket + */ + public void send(Object message) { + ws.send(ctx.jsonService().toJsonString(message)); + } + + /** Sends a String over the socket */ + public void send(String message) { + ws.send(message); + } + + /** Sends a byte[] over the socket */ + public void send(byte[] message) { + ws.send(message); + } + + /** Sends a ping over the socket */ + public void sendPing() { + sendPing(null); + } + + /** Sends a ping over the socket */ + public void sendPing(byte[] applicationData) { + ws.ping(applicationData != null ? applicationData : new byte[0]); + } + + /** + * Return the request Context. + * + * @return the request + */ + public Context ctx() { + return ctx; + } + + /** + * Return the Websocket. + * + * @return the request + */ + public WebSocket ws() { + return ws; + } + + /** Close the session */ + public void closeSession() { + ws.close(CloseCode.NORMAL_CLOSURE, "cya", false); + } + + /** Close the session with a CloseCode */ + public void closeSession(CloseCode code) { + ws.close(code, "", false); + } + + /** Close the session with a code and reason */ + public void closeSession(CloseCode code, String reason) { + ws.close(code, reason, false); + } + + public static final class WsOpen extends WsContext { + WsOpen(Context ctx, WebSocket ws) { + super(ctx, ws); + } + } + + public static final class WsPong extends WsMessageCtx { + WsPong(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws, wsFrame); + } + } + + public static final class WsError extends WsContext { + private final Exception error; + + WsError(Context ctx, WebSocket ws, Exception error) { + super(ctx, ws); + this.error = error; + } + + /** Get the Throwable error that occurred */ + public Exception error() { + return error; + } + } + + public static final class WsClose extends WsContext { + private final CloseCode closeCode; + private final String reason; + private final boolean initiatedByRemote; + + WsClose( + Context ctx, WebSocket ws, CloseCode closeCode, String reason, boolean initiatedByRemote) { + super(ctx, ws); + this.closeCode = closeCode; + this.reason = reason; + this.initiatedByRemote = initiatedByRemote; + } + + /** The int status for why connection was closed */ + public CloseCode closeCode() { + return closeCode; + } + + /** The reason for the close */ + public String reason() { + return reason; + } + + /** True if the close was initiated by the remote endpoint */ + public boolean initiatedByRemote() { + return initiatedByRemote; + } + } + + public abstract static sealed class WsMessageCtx extends WsContext { + private final WebSocketFrame wsFrame; + + WsMessageCtx(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws); + this.wsFrame = wsFrame; + } + + /** Get the underlying frame */ + public WebSocketFrame wsFrame() { + return wsFrame; + } + } + + public static final class WsBinaryMessage extends WsMessageCtx { + private final byte[] data; + + WsBinaryMessage(Context ctx, WebSocket ws, WebSocketFrame wsFrame, byte[] data) { + super(ctx, ws, wsFrame); + this.data = data; + } + + /** Get the binary data of the message */ + public byte[] data() { + return data; + } + } + + public static final class WsMessage extends WsMessageCtx { + private final String message; + + WsMessage(Context ctx, WebSocket ws, WebSocketFrame frame, String message) { + super(ctx, ws, frame); + this.message = message; + } + + /** Receive a string message from the client */ + public String message() { + return message; + } + + /** Receive a message from the client as a class */ + public T messageAsClass(Type type) { + return ctx.jsonService().fromJson(type, message); + } + + /** See Also: messageAsClass(Type) */ + public T messageAsClass(Class clazz) { + return messageAsClass((Type) clazz); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java new file mode 100644 index 00000000..4b92320e --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -0,0 +1,35 @@ +package io.avaje.jex.websocket.exception; + +public enum CloseCode { + NORMAL_CLOSURE(1000), + GOING_AWAY(1001), + PROTOCOL_ERROR(1002), + UNSUPPORTED_DATA(1003), + NO_STATUS_RCVD(1005), + ABNORMAL_CLOSURE(1006), + INVALID_FRAME_PAYLOAD_DATA(1007), + POLICY_VIOLATION(1008), + MESSAGE_TOO_BIG(1009), + MANDATORY_EXT(1010), + INTERNAL_SERVER_ERROR(1011), + TLS_HANDSHAKE(1015); + + public static CloseCode find(int value) { + for (CloseCode code : values()) { + if (code.getValue() == value) { + return code; + } + } + return null; + } + + private final int code; + + CloseCode(int code) { + this.code = code; + } + + public int getValue() { + return this.code; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java new file mode 100644 index 00000000..28002f20 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java @@ -0,0 +1,32 @@ +package io.avaje.jex.websocket.exception; + +public class WebSocketException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final CloseCode code; + + private final String reason; + + public WebSocketException(CloseCode code, String reason) { + this(code, reason, null); + } + + public WebSocketException(CloseCode code, String reason, Exception cause) { + super(code + ": " + reason, cause); + this.code = code; + this.reason = reason; + } + + public WebSocketException(Exception cause) { + this(CloseCode.INTERNAL_SERVER_ERROR, cause.toString(), cause); + } + + public CloseCode getCode() { + return this.code; + } + + public String getReason() { + return this.reason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java new file mode 100644 index 00000000..b84b0b5b --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -0,0 +1,262 @@ +package io.avaje.jex.websocket.internal; + +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.INFO; +import static java.lang.System.Logger.Level.TRACE; + +import java.io.EOFException; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.sun.net.httpserver.HttpExchange; + +import io.avaje.jex.websocket.WebSocket; +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.WebSocketFrame.OpCode; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public abstract class AbstractWebSocket implements WebSocket { + + private final List continuousFrames = new LinkedList<>(); + + private OpCode continuousOpCode = null; + private final InputStream in; + private Lock lock = new ReentrantLock(); + protected final System.Logger log = System.getLogger("io.avaje.jex.websocket"); + private final OutputStream out; + private volatile State state = State.UNCONNECTED; + private final URI uri; + + protected AbstractWebSocket(HttpExchange exchange) { + this.uri = exchange.getRequestURI(); + log.log(INFO, "connecting websocket {0}", uri); + this.state = State.CONNECTING; + this.in = exchange.getRequestBody(); + this.out = exchange.getResponseBody(); + } + + @Override + public void close(CloseCode code, String reason, boolean initiatedByRemote) { + log.log(INFO, "closing websocket {0}", uri); + + var oldState = this.state; + this.state = State.CLOSING; + if (oldState == State.OPEN) { + sendFrame(new CloseFrame(code, reason)); + } else { + doClose(code, reason, initiatedByRemote); + } + } + + void doClose(CloseCode code, String reason, boolean initiatedByRemote) { + if (this.state == State.CLOSED) { + return; + } + try (in; out) { + // just close the streams + } catch (IOException expected) { + // Expected + } + this.state = State.CLOSED; + onClose(code, reason, initiatedByRemote); + } + + private void handleCloseFrame(WebSocketFrame frame) { + var code = CloseCode.NORMAL_CLOSURE; + var reason = ""; + if (frame instanceof CloseFrame cf) { + code = cf.getCloseCode(); + reason = cf.getCloseReason(); + } + log.log( + TRACE, + "handleCloseFrame: {0}, code={1}, reason={2}, state {3}", + uri, + code, + reason, + this.state); + if (this.state == State.CLOSING) { + // Answer for my requested close + doClose(code, reason, false); + } else { + close(code, reason, true); + } + } + + private void handleFrameFragment(WebSocketFrame frame) { + if (frame.opCode() != OpCode.CONTINUATION) { + // First + if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Previous continuous frame sequence not completed."); + } + this.continuousOpCode = frame.opCode(); + this.continuousFrames.clear(); + this.continuousFrames.add(frame); + } else if (frame.isFin()) { + // Last + if (this.continuousOpCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } + this.continuousFrames.add(frame); + onMessage(new WSFrame(this.continuousOpCode, this.continuousFrames)); + this.continuousOpCode = null; + this.continuousFrames.clear(); + } else if (this.continuousOpCode == null) { + // Unexpected + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } else { + // Intermediate + this.continuousFrames.add(frame); + } + } + + private void handleWebsocketFrame(WebSocketFrame frame) { + onFrameReceived(frame); + if (frame.opCode() == OpCode.CLOSE) { + handleCloseFrame(frame); + } else if (frame.opCode() == OpCode.PING) { + sendFrame(new WSFrame(OpCode.PONG, true, frame.binaryPayload())); + } else if (frame.opCode() == OpCode.PONG) { + onPong(frame); + } else if (!frame.isFin() || frame.opCode() == OpCode.CONTINUATION) { + handleFrameFragment(frame); + } else if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence not completed."); + } else if (frame.opCode() == OpCode.TEXT || frame.opCode() == OpCode.BINARY) { + onMessage(frame); + } else { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Non control or continuous frame expected."); + } + } + + @Override + public boolean isOpen() { + return state == State.OPEN; + } + + protected void onFrameReceived(WebSocketFrame frame) { + log.log(TRACE, "frame received: {0}", frame); + } + + /** + * Debug method. Do not Override unless for debug purposes!
+ * This method is called before actually sending the frame. + * + * @param frame The sent WebSocket Frame. + */ + protected void onFrameSent(WebSocketFrame frame) { + log.log(TRACE, "frame sent: {0}", frame); + } + + protected abstract void onClose(CloseCode code, String reason, boolean initiatedByRemote); + + protected abstract void onError(Exception exception); + + protected abstract void onMessage(WebSocketFrame message) throws WebSocketException; + + protected abstract void onOpen() throws WebSocketException; + + protected abstract void onPong(WebSocketFrame pong) throws WebSocketException; + + @Override + public void ping(byte[] payload) { + sendFrame(new WSFrame(OpCode.PING, true, payload)); + } + + void readWebsocket() { + try { + state = State.OPEN; + log.log(DEBUG, "websocket open {0}", uri); + onOpen(); + while (this.state == State.OPEN) { + handleWebsocketFrame(WSFrame.read(in)); + } + } catch (EOFException e) { + log.log(TRACE, "exception on websocket", e); + onError(e); + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } catch (Exception e) { + onError(e); + if (e instanceof WebSocketException wse) { + doClose(wse.getCode(), wse.getReason(), false); + } else { + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } + } finally { + doClose( + CloseCode.INTERNAL_SERVER_ERROR, + "Handler terminated without closing the connection.", + false); + log.log(TRACE, "readWebsocket() exiting {0}", uri); + } + } + + @Override + public void send(byte[] payload) { + sendFrame(new WSFrame(OpCode.BINARY, true, payload)); + } + + @Override + public void send(String payload) { + sendFrame(new WSFrame(OpCode.TEXT, true, payload)); + } + + public void sendFrame(WebSocketFrame frame) { + lock.lock(); + try { + onFrameSent(frame); + frame.write(this.out); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + lock.unlock(); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java new file mode 100644 index 00000000..a7dadec0 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java @@ -0,0 +1,44 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.exception.CloseCode; + +public final class CloseFrame extends WSFrame { + + private static byte[] generatePayload(CloseCode code, String closeReason) { + if (code != null) { + var reasonBytes = text2Binary(closeReason); + var payload = new byte[reasonBytes.length + 2]; + payload[0] = (byte) (code.getValue() >> 8 & 0xFF); + payload[1] = (byte) (code.getValue() & 0xFF); + System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length); + return payload; + } + return new byte[0]; + } + + private CloseCode closeCode; + + private String closeReason; + + public CloseFrame(CloseCode code, String closeReason) { + super(OpCode.CLOSE, true, generatePayload(code, closeReason)); + } + + CloseFrame(WSFrame wrap) { + super(wrap); + assert wrap.opCode() == OpCode.CLOSE; + if (wrap.binaryPayload().length >= 2) { + this.closeCode = + CloseCode.find((wrap.binaryPayload()[0] & 0xFF) << 8 | wrap.binaryPayload()[1] & 0xFF); + this.closeReason = binary2Text(binaryPayload(), 2, binaryPayload().length - 2); + } + } + + public CloseCode getCloseCode() { + return this.closeCode; + } + + public String getCloseReason() { + return this.closeReason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md new file mode 100644 index 00000000..4b21b270 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md @@ -0,0 +1,14 @@ +Most of the code in the websockets package is covered under the following license: + +Copyright (c) 2012-2013 by Paul S. Hawke, 2001,2005-2013 by Jarno Elonen, 2010 by Konstantinos Togias +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +* Neither the name of the NanoHttpd organization nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java new file mode 100644 index 00000000..8b9d7160 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java @@ -0,0 +1,42 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +public enum State { + UNCONNECTED, + CONNECTING, + OPEN, + CLOSING, + CLOSED +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java new file mode 100644 index 00000000..6133b464 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java @@ -0,0 +1,67 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +class Util { + + public static final String HEADER_UPGRADE = "Upgrade"; + + public static final String HEADER_UPGRADE_VALUE = "websocket"; + + public static final String HEADER_CONNECTION = "Connection"; + + public static final String HEADER_WEBSOCKET_VERSION = "sec-websocket-version"; + + public static final String HEADER_WEBSOCKET_VERSION_VALUE = "13"; + + public static final String HEADER_WEBSOCKET_KEY = "sec-websocket-key"; + + public static final String HEADER_WEBSOCKET_ACCEPT = "sec-websocket-accept"; + + public static final String HEADER_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; + + private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + public static String makeAcceptKey(String key) throws NoSuchAlgorithmException { + var md = MessageDigest.getInstance("SHA-1"); + var text = key + Util.WEBSOCKET_KEY_MAGIC; + md.update(text.getBytes(), 0, text.length()); + var sha1hash = md.digest(); + return Base64.getEncoder().encodeToString(sha1hash); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java new file mode 100644 index 00000000..2b3218a3 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java @@ -0,0 +1,390 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public sealed class WSFrame implements WebSocketFrame permits CloseFrame { + + static final Charset TEXT_CHARSET = StandardCharsets.UTF_8; + + static String binary2Text(byte[] payload) { + return new String(payload, WSFrame.TEXT_CHARSET); + } + + static String binary2Text(byte[] payload, int offset, int length) { + return new String(payload, offset, length, WSFrame.TEXT_CHARSET); + } + + private static int checkedRead(int read) throws IOException { + if (read < 0) { + throw new EOFException(); + } + return read; + } + + static WSFrame read(InputStream in) throws IOException { + var head = (byte) checkedRead(in.read()); + var fin = (head & 0x80) != 0; + var opCode = OpCode.find((byte) (head & 0x0F)); + if ((head & 0x70) != 0) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "The reserved bits (" + Integer.toBinaryString(head & 0x70) + ") must be 0."); + } + if (opCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Received frame with reserved/unknown opcode " + (head & 0x0F) + "."); + } + if (opCode.isControlFrame() && !fin) { + throw new WebSocketException(CloseCode.PROTOCOL_ERROR, "Fragmented control frame."); + } + + var frame = new WSFrame(opCode, fin); + frame.readPayloadInfo(in); + frame.readPayload(in); + if (frame.opCode() == OpCode.CLOSE) { + return new CloseFrame(frame); + } + return frame; + } + + static byte[] text2Binary(String payload) { + return payload.getBytes(WSFrame.TEXT_CHARSET); + } + + private OpCode opCode; + + private boolean fin; + + private byte[] maskingKey; + + private byte[] payload; + + // --------------------------------GETTERS--------------------------------- + + private int payloadLength; + + private String payloadString; + + private WSFrame(OpCode opCode, boolean fin) { + setOpCode(opCode); + setFin(fin); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setBinaryPayload(payload); + } + + public WSFrame(OpCode opCode, boolean fin, String payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, String payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setTextPayload(payload); + } + + public WSFrame(OpCode opCode, List fragments) throws WebSocketException { + setOpCode(opCode); + setFin(true); + + var length = 0L; + for (var inter : fragments) { + length += inter.binaryPayload().length; + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + var payload = new byte[this.payloadLength]; + var offset = 0; + for (var inter : fragments) { + System.arraycopy(inter.binaryPayload(), 0, payload, offset, inter.binaryPayload().length); + offset += inter.binaryPayload().length; + } + setBinaryPayload(payload); + } + + public WSFrame(WSFrame clone) { + setOpCode(clone.opCode()); + setFin(clone.isFin()); + setBinaryPayload(clone.binaryPayload()); + setMaskingKey(clone.maskingKey()); + } + + @Override + public byte[] binaryPayload() { + return this.payload; + } + + @Override + public byte[] maskingKey() { + return this.maskingKey; + } + + @Override + public OpCode opCode() { + return this.opCode; + } + + // --------------------------------SERIALIZATION--------------------------- + + @Override + public String textPayload() { + if (this.payloadString == null) { + this.payloadString = binary2Text(binaryPayload()); + } + return this.payloadString; + } + + @Override + public boolean isFin() { + return this.fin; + } + + @Override + public boolean isMasked() { + return this.maskingKey != null && this.maskingKey.length == 4; + } + + private String payloadToString() { + if (this.payload == null) { + return "null"; + } + final var sb = new StringBuilder(); + sb.append('[').append(this.payload.length).append("b] "); + if (opCode() == OpCode.TEXT) { + var text = textPayload(); + if (text.length() > 100) { + sb.append(text.substring(0, 100)).append("..."); + } else { + sb.append(text); + } + } else { + sb.append("0x"); + for (var i = 0; i < Math.min(this.payload.length, 50); ++i) { + sb.append(Integer.toHexString(this.payload[i] & 0xFF)); + } + if (this.payload.length > 50) { + sb.append("..."); + } + } + return sb.toString(); + } + + private void readPayload(InputStream in) throws IOException { + this.payload = new byte[this.payloadLength]; + var read = 0; + while (read < this.payloadLength) { + read += checkedRead(in.read(this.payload, read, this.payloadLength - read)); + } + + if (isMasked()) { + for (var i = 0; i < this.payload.length; i++) { + this.payload[i] ^= this.maskingKey[i % 4]; + } + } + + // Test for Unicode errors + if (opCode() == OpCode.TEXT) { + this.payloadString = binary2Text(binaryPayload()); + } + } + + // --------------------------------ENCODING-------------------------------- + + private void readPayloadInfo(InputStream in) throws IOException { + var b = (byte) checkedRead(in.read()); + var masked = (b & 0x80) != 0; + + this.payloadLength = (byte) (0x7F & b); + if (this.payloadLength == 126) { + // checkedRead must return int for this to work + this.payloadLength = (checkedRead(in.read()) << 8 | checkedRead(in.read())) & 0xFFFF; + if (this.payloadLength < 126) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 2byte length. (not using minimal length encoding)"); + } + } else if (this.payloadLength == 127) { + var length = + (long) checkedRead(in.read()) << 56 + | (long) checkedRead(in.read()) << 48 + | (long) checkedRead(in.read()) << 40 + | (long) checkedRead(in.read()) << 32 + | checkedRead(in.read()) << 24 + | checkedRead(in.read()) << 16 + | checkedRead(in.read()) << 8 + | checkedRead(in.read()); + if (length < 65536) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 4byte length. (not using minimal length encoding)"); + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + } + + if (this.opCode.isControlFrame()) { + if (this.payloadLength > 125) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Control frame with payload length > 125 bytes."); + } + if (this.opCode == OpCode.CLOSE && this.payloadLength == 1) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Received close frame with payload len 1."); + } + } + + if (masked) { + this.maskingKey = new byte[4]; + var read = 0; + while (read < this.maskingKey.length) { + read += checkedRead(in.read(this.maskingKey, read, this.maskingKey.length - read)); + } + } + } + + void setBinaryPayload(byte[] payload) { + this.payload = payload; + this.payloadLength = payload.length; + this.payloadString = null; + } + + void setFin(boolean fin) { + this.fin = fin; + } + + void setMaskingKey(byte[] maskingKey) { + if (maskingKey != null && maskingKey.length != 4) { + throw new IllegalArgumentException( + "MaskingKey " + Arrays.toString(maskingKey) + " hasn't length 4"); + } + this.maskingKey = maskingKey; + } + + void setOpCode(OpCode opcode) { + this.opCode = opcode; + } + + void setTextPayload(String payload) { + this.payload = text2Binary(payload); + this.payloadLength = payload.length(); + this.payloadString = payload; + } + + // --------------------------------CONSTANTS------------------------------- + + void setUnmasked() { + setMaskingKey(null); + } + + @Override + public String toString() { + final var sb = new StringBuilder("WS["); + sb.append(opCode()); + sb.append(", ").append(isFin() ? "fin" : "inter"); + sb.append(", ").append(isMasked() ? "masked" : "unmasked"); + sb.append(", ").append(payloadToString()); + sb.append(']'); + return sb.toString(); + } + + // ------------------------------------------------------------------------ + + @Override + public void write(OutputStream out) throws IOException { + byte header = 0; + if (this.fin) { + header |= 0x80; + } + header |= this.opCode.value() & 0x0F; + out.write(header); + + this.payloadLength = binaryPayload().length; + if (this.payloadLength <= 125) { + out.write(isMasked() ? 0x80 | (byte) this.payloadLength : (byte) this.payloadLength); + } else { + if (this.payloadLength <= 0xFFFF) { + out.write(isMasked() ? 0xFE : 126); + } else { + out.write(isMasked() ? 0xFF : 127); + out.write(this.payloadLength >>> 56 & 0); // integer only + // contains + // 31 bit + out.write(this.payloadLength >>> 48 & 0); + out.write(this.payloadLength >>> 40 & 0); + out.write(this.payloadLength >>> 32 & 0); + out.write(this.payloadLength >>> 24); + out.write(this.payloadLength >>> 16); + } + out.write(this.payloadLength >>> 8); + out.write(this.payloadLength); + } + + if (isMasked()) { + out.write(this.maskingKey); + for (var i = 0; i < this.payloadLength; i++) { + out.write(binaryPayload()[i] ^ this.maskingKey[i % 4]); + } + } else { + out.write(binaryPayload()); + } + out.flush(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java new file mode 100644 index 00000000..de4ca60b --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java @@ -0,0 +1,70 @@ +package io.avaje.jex.websocket.internal; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; + +import com.sun.net.httpserver.Headers; + +import io.avaje.jex.http.BadRequestException; +import io.avaje.jex.http.Context; +import io.avaje.jex.http.ExchangeHandler; +import io.avaje.jex.http.HttpResponseException; +import io.avaje.jex.http.HttpStatus; +import io.avaje.jex.http.InternalServerErrorException; + +public abstract class WebSocketHandler implements ExchangeHandler { + + @Override + public void handle(Context ctx) throws IOException { + var headers = ctx.requestHeaders(); + + if (!isWebsocketRequested(headers)) { + throw new HttpResponseException(HttpStatus.UPGRADE_REQUIRED_426, "Not a websocket request"); + } + + if (!Util.HEADER_WEBSOCKET_VERSION_VALUE.equalsIgnoreCase( + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)) + || !headers.containsKey(Util.HEADER_WEBSOCKET_KEY)) { + throw new BadRequestException( + "Invalid Websocket-Version " + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)); + } + + var webSocket = openWebSocket(ctx); + + try { + ctx.header( + Util.HEADER_WEBSOCKET_ACCEPT, + Util.makeAcceptKey(headers.getFirst(Util.HEADER_WEBSOCKET_KEY))); + } catch (NoSuchAlgorithmException e) { + throw new InternalServerErrorException( + "The SHA-1 Algorithm required for websockets is not available on the server."); + } + + if (headers.containsKey(Util.HEADER_WEBSOCKET_PROTOCOL)) { + ctx.header( + Util.HEADER_WEBSOCKET_PROTOCOL, + headers.getFirst(Util.HEADER_WEBSOCKET_PROTOCOL).split(",")[0]); + } + + ctx.header(Util.HEADER_UPGRADE, Util.HEADER_UPGRADE_VALUE); + ctx.header(Util.HEADER_CONNECTION, Util.HEADER_UPGRADE); + ctx.writeEmpty(101); + + // this won't return until websocket is closed + webSocket.readWebsocket(); + } + + private static boolean isWebsocketRequested(Headers headers) { + // check if Upgrade connection + var values = headers.get(Util.HEADER_CONNECTION); + if (values == null + || values.stream().filter(Util.HEADER_UPGRADE::equalsIgnoreCase).findAny().isEmpty()) { + return false; + } + // check for proper upgrade type + var upgrade = headers.getFirst(Util.HEADER_UPGRADE); + return Util.HEADER_UPGRADE_VALUE.equalsIgnoreCase(upgrade); + } + + protected abstract AbstractWebSocket openWebSocket(Context exchange); +} diff --git a/avaje-jex-websockets/src/main/java/module-info.java b/avaje-jex-websockets/src/main/java/module-info.java new file mode 100644 index 00000000..598e7a5a --- /dev/null +++ b/avaje-jex-websockets/src/main/java/module-info.java @@ -0,0 +1,23 @@ +/** + * Defines the Static Content API for serving static resources with Jex - see {@link io.avaje.jex.staticcontent.StaticContent}. + * + *
{@code
+ * var staticContent = StaticContentService.createCP("/public").httpPath("/").directoryIndex("index.html");
+ * final Jex.Server app = Jex.create()
+ *   .plugin(staticContent)
+ *   .port(8080)
+ *   .start();
+ *
+ * app.shutdown();
+ *
+ * }
+ */ +module io.avaje.jex.websocket { + + exports io.avaje.jex.websocket; + exports io.avaje.jex.websocket.exception; + + requires transitive io.avaje.jex; + requires static java.logging; + +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java new file mode 100644 index 00000000..ca048aae --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -0,0 +1,20 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.WebSocketListener; +import io.avaje.jex.websocket.WsContext.WsMessage; + +public class EchoWebSocketHandler implements WebSocketListener { + + private StringBuilder sb = new StringBuilder(); + + @Override + public void onMessage(WsMessage message) { + sb.append(message.message()); + if (message.wsFrame().isFin()) { + String msg = sb.toString(); + sb = new StringBuilder(); + message.send(msg); + } + message.closeSession(); + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java new file mode 100644 index 00000000..cbc1b50b --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -0,0 +1,72 @@ +package io.avaje.jex.websocket.internal; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class WebSocketClientUtil { + + public static WebSocket createWSC( + int port, String path, final Consumer onTextCallback, Runnable onCloseCallback) + throws InterruptedException { + + HttpClient client = HttpClient.newHttpClient(); + + CountDownLatch waitForOpen = new CountDownLatch(1); + + CompletableFuture future = + client + .newWebSocketBuilder() + .buildAsync( + URI.create("ws://localhost:" + port + path), + new WebSocket.Listener() { + StringBuilder text = new StringBuilder(); + + @Override + public CompletionStage onText( + WebSocket webSocket, CharSequence data, boolean last) { + text.append(data); + if (last) { + onTextCallback.accept(text.toString()); + text = new StringBuilder(); + } + webSocket.request(1); + + return null; + } + + @Override + public CompletionStage onClose( + WebSocket webSocket, int statusCode, String reason) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + } + + @Override + public void onOpen(WebSocket webSocket) { + waitForOpen.countDown(); + webSocket.request(1); + } + }); + + WebSocket ws = future.join(); + if (!waitForOpen.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("websocket did not open"); + } + return ws; + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java new file mode 100644 index 00000000..e488e78d --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java @@ -0,0 +1,76 @@ +package io.avaje.jex.websocket.internal; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.avaje.jex.Jex; +import io.avaje.jex.test.TestPair; +import io.avaje.jex.websocket.WebSocketPlugin; + +public class WebSocketTest { + + static { + // System.setProperty("jdk.httpclient.HttpClient.log", "all"); + // System.setProperty("jdk.internal.httpclient.websocket.debug", "true"); + } + + private static final String path = "/ws"; + + TestPair server; + + @BeforeEach + public void setUp() throws IOException { + + var jex = Jex.create(); + + WebSocketPlugin p = WebSocketPlugin.create(); + p.ws(path, new EchoWebSocketHandler()); + jex.plugin(p); + server = TestPair.create(jex); + + Logger logger = Logger.getLogger(WebSocketTest.class.getName()); + ConsoleHandler ch = new ConsoleHandler(); + logger.setLevel(Level.ALL); + ch.setLevel(Level.ALL); + logger.addHandler(ch); + } + + @AfterEach + public void tearDown() { + server.shutdown(); + } + + @Test + public void testEcho() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + var client = + WebSocketClientUtil.createWSC( + server.port(), + path, + s -> { + if ("a_message".equals(s)) { + latch.countDown(); + } else { + fail("received wrong message"); + } + }, + null); + + client.sendText("a_message", true); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("did not receive message"); + } + System.err.println("closing client"); + } +} diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java index 0aeae922..32375b0d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java @@ -541,6 +541,15 @@ public void write(String content) { write(content.getBytes(StandardCharsets.UTF_8)); } + @Override + public void writeEmpty(int statusCode) { + try { + exchange.sendResponseHeaders(statusCode, -1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public JsonService jsonService() { return mgr.jsonService(); diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java index ac8ee4b5..dac3e48e 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java @@ -24,7 +24,8 @@ public final class JacksonJsonService implements JsonService { /** Create with defaults for Jackson */ public JacksonJsonService() { - this.mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + this.mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } /** Create with a Jackson instance that might have custom configuration. */ @@ -53,7 +54,8 @@ public T fromJson(Type type, byte[] data) { } private JavaType javaType(Type type) { - return javaTypes.computeIfAbsent(type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); + return javaTypes.computeIfAbsent( + type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); } @Override @@ -101,4 +103,14 @@ private void write(Iterator iterator, final JsonGenerator generator) { throw new UncheckedIOException(e); } } + + @Override + public T fromJson(Type type, String data) { + try { + final var javaType = javaType(type); + return mapper.readValue(data, javaType); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java index e07f4bc3..ac6dab76 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java @@ -61,4 +61,9 @@ public void toJsonStream(Iterator iterator, OutputStream os) { } } } + + @Override + public T fromJson(Type type, String data) { + return jsonb.type(type).fromJson(data); + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java index 9d01a098..66257626 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java +++ b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java @@ -556,6 +556,14 @@ default String userAgent() { return header(Constants.USER_AGENT); } + /** Writes Nothing. */ + void writeEmpty(int statusCode); + + /** Writes Nothing. */ + default void writeEmpty(HttpStatus statusCode) { + writeEmpty(statusCode.status()); + } + /** * Writes the given bytes directly to the response. * diff --git a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java index bab6d9f0..2325bd2d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java @@ -35,10 +35,19 @@ public non-sealed interface JsonService extends JexExtension { String toJsonString(Object bean); /** - * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * **Read a Java Object from JSON string** + * + *

Deserializes a Java object from a JSON string * * @param type the Type object of the desired type * @param is the input stream containing the JSON data + * @return the serialized JSON string + */ + T fromJson(Type type, String string); + + /** + * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * * @return the deserialized object */ T fromJson(Type type, InputStream is); diff --git a/pom.xml b/pom.xml index ebc07e2c..a31791eb 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,7 @@ avaje-jex-static-content avaje-jex-test avaje-jex-ssl + avaje-jex-websockets From 146c146aedc8516d48ebade730419fe8465e18b1 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Thu, 4 Sep 2025 01:36:37 -0400 Subject: [PATCH 02/11] will of D --- .../jex/websocket/{JexWebSocket.java => DWebSocket.java} | 4 ++-- ...bSocketExchangeHandler.java => DWebSocketHandler.java} | 8 ++++---- .../main/java/io/avaje/jex/websocket/WebSocketPlugin.java | 2 +- .../main/java/io/avaje/jex/websocket/internal/Util.java | 8 -------- .../jex/websocket/internal/EchoWebSocketHandler.java | 8 +++++++- .../avaje/jex/websocket/internal/WebSocketClientUtil.java | 1 + 6 files changed, 15 insertions(+), 16 deletions(-) rename avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/{JexWebSocket.java => DWebSocket.java} (93%) rename avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/{WebSocketExchangeHandler.java => DWebSocketHandler.java} (50%) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java similarity index 93% rename from avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java rename to avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java index 649dfa63..f66b3f1d 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/JexWebSocket.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java @@ -10,12 +10,12 @@ import io.avaje.jex.websocket.exception.CloseCode; import io.avaje.jex.websocket.internal.AbstractWebSocket; -class JexWebSocket extends AbstractWebSocket { +class DWebSocket extends AbstractWebSocket { private final WebSocketListener listener; private final Context ctx; - JexWebSocket(Context ctx, WebSocketListener listener) { + DWebSocket(Context ctx, WebSocketListener listener) { super(ctx.exchange()); this.listener = listener; this.ctx = ctx; diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java similarity index 50% rename from avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java rename to avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java index fd7e3b66..4a05476a 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketExchangeHandler.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java @@ -3,17 +3,17 @@ import io.avaje.jex.http.Context; import io.avaje.jex.websocket.internal.WebSocketHandler; -class WebSocketExchangeHandler extends WebSocketHandler { +class DWebSocketHandler extends WebSocketHandler { private final WebSocketListener listener; - WebSocketExchangeHandler(WebSocketListener listener) { + DWebSocketHandler(WebSocketListener listener) { this.listener = listener; } @Override - protected JexWebSocket openWebSocket(Context exchange) { + protected DWebSocket openWebSocket(Context exchange) { - return new JexWebSocket(exchange, listener); + return new DWebSocket(exchange, listener); } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index 105feabf..f8dd6512 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -13,7 +13,7 @@ public class WebSocketPlugin implements JexPlugin { private final List handlers = new ArrayList<>(); public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { - handlers.add(r -> r.get(path, new WebSocketExchangeHandler(listener), roles)); + handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); return this; } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java index 6133b464..a5a168d2 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java @@ -40,21 +40,13 @@ class Util { public static final String HEADER_UPGRADE = "Upgrade"; - public static final String HEADER_UPGRADE_VALUE = "websocket"; - public static final String HEADER_CONNECTION = "Connection"; - public static final String HEADER_WEBSOCKET_VERSION = "sec-websocket-version"; - public static final String HEADER_WEBSOCKET_VERSION_VALUE = "13"; - public static final String HEADER_WEBSOCKET_KEY = "sec-websocket-key"; - public static final String HEADER_WEBSOCKET_ACCEPT = "sec-websocket-accept"; - public static final String HEADER_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; - private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; public static String makeAcceptKey(String key) throws NoSuchAlgorithmException { diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java index ca048aae..62d166d0 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -1,6 +1,7 @@ package io.avaje.jex.websocket.internal; import io.avaje.jex.websocket.WebSocketListener; +import io.avaje.jex.websocket.WsContext.WsError; import io.avaje.jex.websocket.WsContext.WsMessage; public class EchoWebSocketHandler implements WebSocketListener { @@ -15,6 +16,11 @@ public void onMessage(WsMessage message) { sb = new StringBuilder(); message.send(msg); } - message.closeSession(); + // message.closeSession(); + } + + @Override + public void onError(WsError wsError) { + wsError.error().printStackTrace(); } } diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java index cbc1b50b..f1c5fbe7 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -54,6 +54,7 @@ public void onError(WebSocket webSocket, Throwable error) { if (onCloseCallback != null) { onCloseCallback.run(); } + error.printStackTrace(); } @Override From b1ef4218742d7065b95d926dcad1ab09150362cd Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Thu, 4 Sep 2025 15:25:22 -0400 Subject: [PATCH 03/11] builder --- .../avaje/jex/websocket/ListenerBuilder.java | 132 ++++++++++++++++++ .../jex/websocket/WebSocketListener.java | 66 +++++++++ .../avaje/jex/websocket/WebSocketPlugin.java | 12 +- .../io/avaje/jex/websocket/WsContext.java | 9 ++ .../internal/EchoWebSocketHandler.java | 3 +- .../jex/websocket/internal/WebSocketTest.java | 3 +- 6 files changed, 219 insertions(+), 6 deletions(-) create mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java new file mode 100644 index 00000000..8e37750f --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java @@ -0,0 +1,132 @@ +package io.avaje.jex.websocket; + +import java.util.function.Consumer; + +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; + +/** A builder for creating a {@link WebSocketListener} with specific event handlers. */ +class ListenerBuilder implements WebSocketListener.Builder { + private Consumer onOpen; + private Consumer onMessage; + private Consumer onBinaryMessage; + private Consumer onClose; + private Consumer onPong; + private Consumer onError; + + /** + * Set the handler for the WebSocket open event. + * + * @param handler Consumer for {@link WsOpen} + * @return this builder + */ + @Override + public ListenerBuilder onOpen(Consumer handler) { + this.onOpen = handler; + return this; + } + + /** + * Set the handler for the WebSocket text message event. + * + * @param handler Consumer for {@link WsMessage} + * @return this builder + */ + @Override + public ListenerBuilder onMessage(Consumer handler) { + this.onMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket binary message event. + * + * @param handler Consumer for {@link WsBinaryMessage} + * @return this builder + */ + @Override + public ListenerBuilder onBinaryMessage(Consumer handler) { + this.onBinaryMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket close event. + * + * @param handler Consumer for {@link WsClose} + * @return this builder + */ + @Override + public ListenerBuilder onClose(Consumer handler) { + this.onClose = handler; + return this; + } + + /** + * Set the handler for the WebSocket pong event. + * + * @param handler Consumer for {@link WsPong} + * @return this builder + */ + @Override + public ListenerBuilder onPong(Consumer handler) { + this.onPong = handler; + return this; + } + + /** + * Set the handler for the WebSocket error event. + * + * @param handler Consumer for {@link WsError} + * @return this builder + */ + @Override + public ListenerBuilder onError(Consumer handler) { + this.onError = handler; + return this; + } + + /** + * Build a {@link WebSocketListener} implementation using the configured handlers. + * + * @return a new {@link WebSocketListener} instance + */ + @Override + public WebSocketListener build() { + return new WebSocketListener() { + @Override + public void onOpen(WsOpen wsOpen) { + if (onOpen != null) onOpen.accept(wsOpen); + } + + @Override + public void onMessage(WsMessage message) { + if (onMessage != null) onMessage.accept(message); + } + + @Override + public void onBinaryMessage(WsBinaryMessage binaryPayload) { + if (onBinaryMessage != null) onBinaryMessage.accept(binaryPayload); + } + + @Override + public void onClose(WsClose wsClose) { + if (onClose != null) onClose.accept(wsClose); + } + + @Override + public void onPong(WsPong wsPong) { + if (onPong != null) onPong.accept(wsPong); + } + + @Override + public void onError(WsError wsError) { + if (onError != null) onError.accept(wsError); + } + }; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java index 9b6870f7..c11d0dd4 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -1,5 +1,7 @@ package io.avaje.jex.websocket; +import java.util.function.Consumer; + import io.avaje.jex.websocket.WsContext.WsBinaryMessage; import io.avaje.jex.websocket.WsContext.WsClose; import io.avaje.jex.websocket.WsContext.WsError; @@ -12,6 +14,16 @@ * log. */ public interface WebSocketListener { + + /** + * Create a builder for a WebSocketListener. + * + * @return the builder + */ + static Builder builder() { + return new ListenerBuilder(); + } + /** * Called when a binary message is received. * @@ -53,4 +65,58 @@ default void onPong(WsPong wsPong) {} * @param wsError the error */ default void onError(WsError wsError) {} + + interface Builder { + + /** + * Set the handler for the WebSocket open event. + * + * @param handler Consumer for {@link WsOpen} + * @return this builder + */ + Builder onOpen(Consumer handler); + + /** + * Set the handler for the WebSocket text message event. + * + * @param handler Consumer for {@link WsMessage} + * @return this builder + */ + Builder onMessage(Consumer handler); + + /** + * Set the handler for the WebSocket binary message event. + * + * @param handler Consumer for {@link WsBinaryMessage} + * @return this builder + */ + Builder onBinaryMessage(Consumer handler); + + /** + * Set the handler for the WebSocket close event. + * + * @param handler Consumer for {@link WsClose} + * @return this builder + */ + Builder onClose(Consumer handler); + + /** + * Set the handler for the WebSocket pong event. + * + * @param handler Consumer for {@link WsPong} + * @return this builder + */ + Builder onPong(Consumer handler); + + /** + * Set the handler for the WebSocket error event. + * + * @param handler Consumer for {@link WsError} + * @return this builder + */ + Builder onError(Consumer handler); + + /** Build the WebSocketListener. */ + WebSocketListener build(); + } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index f8dd6512..06070a78 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -2,15 +2,23 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import io.avaje.jex.Jex; -import io.avaje.jex.Routing; +import io.avaje.jex.Routing.HttpService; import io.avaje.jex.security.Role; import io.avaje.jex.spi.JexPlugin; +import io.avaje.jex.websocket.WebSocketListener.Builder; public class WebSocketPlugin implements JexPlugin { - private final List handlers = new ArrayList<>(); + private final List handlers = new ArrayList<>(); + + public WebSocketPlugin ws(String path, Consumer consumer, Role... roles) { + var builder = WebSocketListener.builder(); + consumer.accept(builder); + return ws(path, builder.build(), roles); + } public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java index 52f22a42..319eb3e9 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -143,6 +143,15 @@ public abstract static sealed class WsMessageCtx extends WsContext { public WebSocketFrame wsFrame() { return wsFrame; } + + /** + * Indicates if this frame is the final fragment in a message. + * + * @return true if final fragment, false otherwise + */ + public boolean isFin() { + return wsFrame.isFin(); + } } public static final class WsBinaryMessage extends WsMessageCtx { diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java index 62d166d0..4679ab25 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -11,12 +11,11 @@ public class EchoWebSocketHandler implements WebSocketListener { @Override public void onMessage(WsMessage message) { sb.append(message.message()); - if (message.wsFrame().isFin()) { + if (message.isFin()) { String msg = sb.toString(); sb = new StringBuilder(); message.send(msg); } - // message.closeSession(); } @Override diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java index e488e78d..bdaa034f 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java @@ -2,7 +2,6 @@ import static org.junit.jupiter.api.Assertions.fail; -import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.ConsoleHandler; @@ -29,7 +28,7 @@ public class WebSocketTest { TestPair server; @BeforeEach - public void setUp() throws IOException { + void setUp() { var jex = Jex.create(); From d6814e5d0f10ef6ecadd044391c87fcbc92def15 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 14 Sep 2025 22:32:00 -0400 Subject: [PATCH 04/11] remove write from interface --- .../java/io/avaje/jex/websocket/WebSocketFrame.java | 10 ---------- .../jex/websocket/internal/AbstractWebSocket.java | 2 +- .../java/io/avaje/jex/websocket/internal/WSFrame.java | 4 +--- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java index 39b57f10..84bed8e3 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java @@ -1,7 +1,5 @@ package io.avaje.jex.websocket; -import java.io.IOException; -import java.io.OutputStream; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; @@ -54,14 +52,6 @@ public interface WebSocketFrame { */ boolean isMasked(); - /** - * Writes the frame to the given output stream in WebSocket frame format. - * - * @param out the output stream to write to - * @throws IOException if an I/O error occurs - */ - void write(OutputStream out) throws IOException; - /** WebSocket opcodes */ public enum OpCode { CONTINUATION(0), diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java index b84b0b5b..85e0cf77 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -248,7 +248,7 @@ public void send(String payload) { sendFrame(new WSFrame(OpCode.TEXT, true, payload)); } - public void sendFrame(WebSocketFrame frame) { + public void sendFrame(WSFrame frame) { lock.lock(); try { onFrameSent(frame); diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java index 2b3218a3..f41d4134 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java @@ -346,9 +346,7 @@ public String toString() { } // ------------------------------------------------------------------------ - - @Override - public void write(OutputStream out) throws IOException { + void write(OutputStream out) throws IOException { byte header = 0; if (this.fin) { header |= 0x80; From 826e68be82de9d345a158445277bfc2eff56fd46 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Fri, 10 Oct 2025 10:16:48 -0400 Subject: [PATCH 05/11] Update pom.xml --- avaje-jex-websockets/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml index eaf440a2..05d20b66 100644 --- a/avaje-jex-websockets/pom.xml +++ b/avaje-jex-websockets/pom.xml @@ -3,7 +3,7 @@ io.avaje avaje-jex-parent - 3.3-RC4 + 3.3-RC5 avaje-jex-websockets From 94d9fa3bbe1490d72f475bf84a376bc58e89e8c5 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Wed, 15 Oct 2025 01:05:02 -0400 Subject: [PATCH 06/11] throw if less than JDK 26 --- avaje-jex-websockets/pom.xml | 20 +++++++++++++++++-- .../avaje/jex/websocket/WebSocketPlugin.java | 5 +++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml index 05d20b66..be41717a 100644 --- a/avaje-jex-websockets/pom.xml +++ b/avaje-jex-websockets/pom.xml @@ -5,7 +5,8 @@ avaje-jex-parent 3.3-RC5 - avaje-jex-websockets + avaje-jex-websocket + io.avaje avaje-jex @@ -16,4 +17,19 @@ test - \ No newline at end of file + + + lessThan26 + + [21, 25] + + + + io.github.robaho + httpserver + test + + + + + diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index 06070a78..2242e836 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -28,6 +28,11 @@ public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles @Override public void apply(Jex jex) { jex.routing().addAll(handlers); + if (Runtime.version().feature() < 26 + && jex.config().serverProvider().getClass().getPackageName().indexOf("sun.") != -1) { + throw new IllegalStateException( + "WebSocket support requires Java 26+ when using the default JDK server provider. Upgrade your JDK or use a different server provider such as robaho httpserver"); + } } public static WebSocketPlugin create() { From 9efc13d07875951c98db8604aa2d2d7a7488a98c Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 2 Nov 2025 00:32:15 -0400 Subject: [PATCH 07/11] test with robaho for now --- avaje-jex-websockets/pom.xml | 14 ++------------ .../io/avaje/jex/websocket/WebSocketPlugin.java | 12 ++++++++---- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml index be41717a..b23e18ad 100644 --- a/avaje-jex-websockets/pom.xml +++ b/avaje-jex-websockets/pom.xml @@ -3,7 +3,7 @@ io.avaje avaje-jex-parent - 3.3-RC5 + 3.3 avaje-jex-websocket @@ -16,20 +16,10 @@ avaje-jex-test test - - - - lessThan26 - - [21, 25] - - io.github.robaho httpserver test - - - + diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index 2242e836..e2c65553 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -28,10 +28,14 @@ public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles @Override public void apply(Jex jex) { jex.routing().addAll(handlers); - if (Runtime.version().feature() < 26 - && jex.config().serverProvider().getClass().getPackageName().indexOf("sun.") != -1) { - throw new IllegalStateException( - "WebSocket support requires Java 26+ when using the default JDK server provider. Upgrade your JDK or use a different server provider such as robaho httpserver"); + + var provider = jex.config().serverProvider().getClass().getPackageName(); + + if (Runtime.version().feature() < 27 && provider.indexOf("sun.") != -1 + || provider.indexOf("jetty.") != -1) { + throw new UnsupportedOperationException( + "WebSocket not supported for this version of %s, use a newer/different server provider" + .formatted(jex.config().serverProvider().getClass())); } } From 374926aa06dcb7d7ab1661a72eb2e029ee04de1b Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 2 Nov 2025 00:41:26 -0400 Subject: [PATCH 08/11] faster test --- .../jex/websocket/exception/CloseCode.java | 23 ++++++++++++------- .../jex/websocket/internal/CloseFrame.java | 4 ++-- .../internal/WebSocketClientUtil.java | 4 +++- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java index 4b92320e..b810a95b 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -1,6 +1,10 @@ package io.avaje.jex.websocket.exception; +import java.util.HashMap; +import java.util.Map; + public enum CloseCode { + // Enum constants remain the same NORMAL_CLOSURE(1000), GOING_AWAY(1001), PROTOCOL_ERROR(1002), @@ -14,22 +18,25 @@ public enum CloseCode { INTERNAL_SERVER_ERROR(1011), TLS_HANDSHAKE(1015); - public static CloseCode find(int value) { + private final int code; + + private static final Map CODES_MAP = new HashMap<>(); + + static { for (CloseCode code : values()) { - if (code.getValue() == value) { - return code; - } + CODES_MAP.put(code.code(), code); } - return null; } - private final int code; - CloseCode(int code) { this.code = code; } - public int getValue() { + public int code() { return this.code; } + + public static CloseCode find(int value) { + return CODES_MAP.get(value); + } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java index a7dadec0..8c621cf1 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java @@ -8,8 +8,8 @@ private static byte[] generatePayload(CloseCode code, String closeReason) { if (code != null) { var reasonBytes = text2Binary(closeReason); var payload = new byte[reasonBytes.length + 2]; - payload[0] = (byte) (code.getValue() >> 8 & 0xFF); - payload[1] = (byte) (code.getValue() & 0xFF); + payload[0] = (byte) (code.code() >> 8 & 0xFF); + payload[1] = (byte) (code.code() & 0xFF); System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length); return payload; } diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java index f1c5fbe7..94e7ee4c 100644 --- a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -9,6 +9,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import io.avaje.jex.websocket.exception.CloseCode; + public class WebSocketClientUtil { public static WebSocket createWSC( @@ -36,7 +38,7 @@ public CompletionStage onText( text = new StringBuilder(); } webSocket.request(1); - + webSocket.sendClose(CloseCode.NORMAL_CLOSURE.code(), "cya"); return null; } From 2873394408f1d064778af725688d748a285dc629 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 2 Nov 2025 22:39:58 -0500 Subject: [PATCH 09/11] Update CloseCode.java --- .../main/java/io/avaje/jex/websocket/exception/CloseCode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java index b810a95b..b7d1131f 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -4,7 +4,7 @@ import java.util.Map; public enum CloseCode { - // Enum constants remain the same + NORMAL_CLOSURE(1000), GOING_AWAY(1001), PROTOCOL_ERROR(1002), @@ -20,7 +20,7 @@ public enum CloseCode { private final int code; - private static final Map CODES_MAP = new HashMap<>(); + private static final Map CODES_MAP = HashMap.newHashMap(values().length); static { for (CloseCode code : values()) { From f65e0a29fd81bbf549f3f30d7d1d9a6264a09b64 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 2 Nov 2025 23:04:36 -0500 Subject: [PATCH 10/11] doc --- .../avaje/jex/websocket/WebSocketPlugin.java | 46 +++++- .../io/avaje/jex/websocket/WsContext.java | 149 +++++++++++++++--- .../jex/websocket/exception/CloseCode.java | 17 +- .../exception/WebSocketException.java | 41 ++++- .../websocket/internal/AbstractWebSocket.java | 2 +- 5 files changed, 223 insertions(+), 32 deletions(-) diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java index e2c65553..c7c8e272 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -8,23 +8,60 @@ import io.avaje.jex.Routing.HttpService; import io.avaje.jex.security.Role; import io.avaje.jex.spi.JexPlugin; -import io.avaje.jex.websocket.WebSocketListener.Builder; +/** + * A plugin for the Jex web framework to simplify the registration of WebSocket handlers. + * + *

This class provides a fluent API for mapping specific URL paths to {@link WebSocketListener} + * implementations and integrates them into the Jex application's routing. + * + *

**Note on Server Compatibility:** WebSocket support may be limited or unavailable with older versions of the default + * JDK server provider or certain third-party providers like Jetty, requiring a newer version or an + * alternative server implementation to function correctly. + */ public class WebSocketPlugin implements JexPlugin { private final List handlers = new ArrayList<>(); - public WebSocketPlugin ws(String path, Consumer consumer, Role... roles) { + /** + * Registers a WebSocket listener for a given path using a fluent builder approach. + * + * @param path The URL path to which the WebSocket endpoint will be mapped (e.g., "/ws/chat"). + * @param consumer A {@code Consumer} that configures the {@link WebSocketListener.Builder} to + * create the listener. + * @param roles Optional roles required to access this WebSocket endpoint. + * @return This {@code WebSocketPlugin} instance for method chaining. + */ + public WebSocketPlugin ws( + String path, Consumer consumer, Role... roles) { var builder = WebSocketListener.builder(); consumer.accept(builder); return ws(path, builder.build(), roles); } + /** + * Registers a pre-built {@link WebSocketListener} for a given path. + * + * @param path The URL path to which the WebSocket endpoint will be mapped (e.g., "/ws/chat"). + * @param listener The {@link WebSocketListener} instance that handles WebSocket events. + * @param roles Optional roles required to access this WebSocket endpoint. + * @return This {@code WebSocketPlugin} instance for method chaining. + */ public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); return this; } + /** + * Applies the plugin to the Jex application. + * + *

This method adds all registered WebSocket handlers to the Jex router and checks for server + * provider compatibility before application startup. + * + * @param jex The {@link Jex} instance to which the plugin is being applied. + * @throws UnsupportedOperationException if the current server provider is detected as + * incompatible with WebSocket functionality. + */ @Override public void apply(Jex jex) { jex.routing().addAll(handlers); @@ -39,6 +76,11 @@ public void apply(Jex jex) { } } + /** + * Creates a new instance of the {@code WebSocketPlugin}. + * + * @return A new {@code WebSocketPlugin}. + */ public static WebSocketPlugin create() { return new WebSocketPlugin(); } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java index 319eb3e9..b41ee266 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -5,7 +5,13 @@ import io.avaje.jex.http.Context; import io.avaje.jex.websocket.exception.CloseCode; -/** The context for a WebSocket event */ +/** + * The abstract sealed base class that provides the context for a specific WebSocket event. + * + *

This class encapsulates the underlying {@link Context} (request context) and the {@link + * WebSocket} connection, offering methods for sending messages and controlling the session. + * Subclasses represent specific WebSocket events (e.g., Open, Message, Close, Error). + */ public abstract sealed class WsContext { protected final Context ctx; @@ -17,77 +23,114 @@ protected WsContext(Context ctx, WebSocket ws) { } /** - * Serializes object to a JSON-string using the registered JsonMapper and sends it over the socket + * Serializes an object to a JSON string using the registered {@code JsonService} and sends it as + * a text frame over the socket. + * + * @param message The object to be serialized and sent. */ public void send(Object message) { - ws.send(ctx.jsonService().toJsonString(message)); + send(ctx.jsonService().toJsonString(message)); } - /** Sends a String over the socket */ + /** + * Sends a {@code String} message (Text Frame) over the socket. + * + * @param message The string message to send. + */ public void send(String message) { ws.send(message); } - /** Sends a byte[] over the socket */ + /** + * Sends a {@code byte[]} message (Binary Frame) over the socket. + * + * @param message The binary data to send. + */ public void send(byte[] message) { ws.send(message); } - /** Sends a ping over the socket */ + /** Sends a Ping control frame over the socket */ public void sendPing() { sendPing(null); } - /** Sends a ping over the socket */ + /** + * Sends a Ping control frame over the socket. + * + * @param applicationData Optional application data to include in the Ping frame. + */ public void sendPing(byte[] applicationData) { ws.ping(applicationData != null ? applicationData : new byte[0]); } /** - * Return the request Context. + * Returns the underlying HTTP request {@code Context}. This provides access to request headers, + * path parameters, and attributes. * - * @return the request + * @return The request {@code Context}. */ public Context ctx() { return ctx; } /** - * Return the Websocket. + * Returns the underlying {@code WebSocket} session object. * - * @return the request + * @return The {@code WebSocket} session object. */ public WebSocket ws() { return ws; } - /** Close the session */ + /** Closes the WebSocket session gracefully with a default reason. */ public void closeSession() { - ws.close(CloseCode.NORMAL_CLOSURE, "cya", false); + ws.close(CloseCode.NORMAL_CLOSURE, "Normally closed", false); } - /** Close the session with a CloseCode */ + /** + * Closes the WebSocket session with a specified {@link CloseCode} and an empty reason string. + * + * @param code The {@link CloseCode} to send. + */ public void closeSession(CloseCode code) { ws.close(code, "", false); } - /** Close the session with a code and reason */ + /** + * Closes the WebSocket session with a specified {@link CloseCode} and a descriptive reason. + * + * @param code The {@link CloseCode} to send. + * @param reason A descriptive string explaining why the session is being closed. + */ public void closeSession(CloseCode code, String reason) { ws.close(code, reason, false); } + /** + * Represents the context for an open event. This event occurs when a new connection is + * established and the handshake is complete. + */ public static final class WsOpen extends WsContext { WsOpen(Context ctx, WebSocket ws) { super(ctx, ws); } } + /** + * Represents the context for a Pong control frame received from the remote endpoint. Pongs are + * typically received in response to a Ping sent by this endpoint. + */ public static final class WsPong extends WsMessageCtx { WsPong(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { super(ctx, ws, wsFrame); } } + /** + * Represents the context for an error event. This is triggered when an unhandled exception occurs + * during the lifecycle of the connection. + */ public static final class WsError extends WsContext { private final Exception error; @@ -96,12 +139,20 @@ public static final class WsError extends WsContext { this.error = error; } - /** Get the Throwable error that occurred */ + /** + * Gets the {@code Exception} that caused the error event. + * + * @return The underlying {@code Exception}. + */ public Exception error() { return error; } } + /** + * Represents the context for a close event. This event is triggered when the connection is + * closed, either locally or by the remote endpoint. + */ public static final class WsClose extends WsContext { private final CloseCode closeCode; private final String reason; @@ -115,22 +166,39 @@ public static final class WsClose extends WsContext { this.initiatedByRemote = initiatedByRemote; } - /** The int status for why connection was closed */ + /** + * Gets the {@link CloseCode} provided when the connection was closed. + * + * @return The {@link CloseCode} indicating the reason for closure. + */ public CloseCode closeCode() { return closeCode; } - /** The reason for the close */ + /** + * Gets the descriptive reason string for the close event, as provided by the closing endpoint. + * + * @return The reason string. + */ public String reason() { return reason; } - /** True if the close was initiated by the remote endpoint */ + /** + * Indicates whether the close handshake was initiated by the remote endpoint (true) or by the + * local endpoint (false). + * + * @return {@code true} if closed by the remote; {@code false} if closed locally. + */ public boolean initiatedByRemote() { return initiatedByRemote; } } + /** + * The abstract sealed base class for WebSocket contexts that involve receiving a data frame + * (e.g., Text, Binary, Pong). + */ public abstract static sealed class WsMessageCtx extends WsContext { private final WebSocketFrame wsFrame; @@ -139,21 +207,27 @@ public abstract static sealed class WsMessageCtx extends WsContext { this.wsFrame = wsFrame; } - /** Get the underlying frame */ + /** + * Gets the underlying raw {@code WebSocketFrame}. This is useful for inspecting frame metadata + * like opcode, RSV bits, etc. + * + * @return The raw {@code WebSocketFrame}. + */ public WebSocketFrame wsFrame() { return wsFrame; } /** - * Indicates if this frame is the final fragment in a message. + * Indicates if this frame is the final fragment of a fragmented message. * - * @return true if final fragment, false otherwise + * @return {@code true} if this is the final fragment (FIN bit is set), {@code false} otherwise. */ public boolean isFin() { return wsFrame.isFin(); } } + /** Represents the context for a binary message received from the remote endpoint. */ public static final class WsBinaryMessage extends WsMessageCtx { private final byte[] data; @@ -162,12 +236,17 @@ public static final class WsBinaryMessage extends WsMessageCtx { this.data = data; } - /** Get the binary data of the message */ + /** + * Gets the raw binary data (payload) of the message. + * + * @return The message content as a byte array. + */ public byte[] data() { return data; } } + /** Represents the context for a text message received from the remote endpoint. */ public static final class WsMessage extends WsMessageCtx { private final String message; @@ -176,17 +255,35 @@ public static final class WsMessage extends WsMessageCtx { this.message = message; } - /** Receive a string message from the client */ + /** + * Gets the text message received from the client. + * + * @return The message content as a {@code String}. + */ public String message() { return message; } - /** Receive a message from the client as a class */ + /** + * Deserializes the received JSON string message into an object of the specified {@code Type}. + * This uses the application's registered {@code JsonService}. + * + * @param The target type. + * @param type The {@code Type} (e.g., a generic type) to deserialize the message into. + * @return The deserialized object. + */ public T messageAsClass(Type type) { return ctx.jsonService().fromJson(type, message); } - /** See Also: messageAsClass(Type) */ + /** + * Deserializes the received JSON string message into an object of the specified {@code Class}. + * + * @param The target class type. + * @param clazz The {@code Class} to deserialize the message into. + * @return The deserialized object. + * @see #messageAsClass(Type) + */ public T messageAsClass(Class clazz) { return messageAsClass((Type) clazz); } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java index b7d1131f..db922c51 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -3,8 +3,11 @@ import java.util.HashMap; import java.util.Map; +/** + * Websocket Close Codes. These codes are used to indicate the reason why a WebSocket connection has + * been closed. + */ public enum CloseCode { - NORMAL_CLOSURE(1000), GOING_AWAY(1001), PROTOCOL_ERROR(1002), @@ -32,10 +35,22 @@ public enum CloseCode { this.code = code; } + /** + * Returns the integer value of this close code. + * + * @return The integer close code. + */ public int code() { return this.code; } + /** + * Finds the {@code CloseCode} enum constant corresponding to the given integer value. + * + * @param value The integer value of the close code to find. + * @return The corresponding {@code CloseCode} enum constant, or {@code null} if no match is + * found. + */ public static CloseCode find(int value) { return CODES_MAP.get(value); } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java index 28002f20..ec05c149 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java @@ -1,5 +1,11 @@ package io.avaje.jex.websocket.exception; +/** + * An unchecked exception specifically for signaling errors that occur during WebSocket + * communication. This exception wraps a standard {@link CloseCode} and a descriptive reason, making + * it easy to communicate the cause of a connection failure or protocol violation in a way that + * aligns with the WebSocket protocol. + */ public class WebSocketException extends RuntimeException { private static final long serialVersionUID = 1L; @@ -8,25 +14,56 @@ public class WebSocketException extends RuntimeException { private final String reason; + /** + * Constructs a new {@code WebSocketException} with a specific close code and reason. The + * exception will have no cause. + * + * @param code The {@link CloseCode} that indicates the nature of the error. + * @param reason A descriptive message explaining the error. + */ public WebSocketException(CloseCode code, String reason) { this(code, reason, null); } + /** + * Constructs a new {@code WebSocketException} with a specific close code, reason, and a cause. + * + * @param code The {@link CloseCode} that indicates the nature of the error. + * @param reason A descriptive message explaining the error. + * @param cause The underlying exception that caused this {@code WebSocketException}. + */ public WebSocketException(CloseCode code, String reason, Exception cause) { super(code + ": " + reason, cause); this.code = code; this.reason = reason; } + /** + * Constructs a new {@code WebSocketException} from an existing exception. It defaults to {@link + * CloseCode#INTERNAL_SERVER_ERROR} for the close code, using the cause's {@code toString()} + * method for the reason. + * + * @param cause The underlying exception that caused this {@code WebSocketException}. + */ public WebSocketException(Exception cause) { this(CloseCode.INTERNAL_SERVER_ERROR, cause.toString(), cause); } - public CloseCode getCode() { + /** + * Returns the WebSocket close code associated with this exception. + * + * @return The {@link CloseCode} enum value. + */ + public CloseCode code() { return this.code; } - public String getReason() { + /** + * Returns the descriptive reason associated with this exception. + * + * @return The descriptive reason string. + */ + public String reason() { return this.reason; } } diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java index 85e0cf77..8f6453ea 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -225,7 +225,7 @@ void readWebsocket() { } catch (Exception e) { onError(e); if (e instanceof WebSocketException wse) { - doClose(wse.getCode(), wse.getReason(), false); + doClose(wse.code(), wse.reason(), false); } else { doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); } From 32acc9110cbe391f89a493bb5eae310595891dbd Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Sun, 9 Nov 2025 14:19:58 -0500 Subject: [PATCH 11/11] nested builder --- .../avaje/jex/websocket/ListenerBuilder.java | 132 ------------------ .../jex/websocket/WebSocketListener.java | 84 +++++++++-- 2 files changed, 74 insertions(+), 142 deletions(-) delete mode 100644 avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java deleted file mode 100644 index 8e37750f..00000000 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/ListenerBuilder.java +++ /dev/null @@ -1,132 +0,0 @@ -package io.avaje.jex.websocket; - -import java.util.function.Consumer; - -import io.avaje.jex.websocket.WsContext.WsBinaryMessage; -import io.avaje.jex.websocket.WsContext.WsClose; -import io.avaje.jex.websocket.WsContext.WsError; -import io.avaje.jex.websocket.WsContext.WsMessage; -import io.avaje.jex.websocket.WsContext.WsOpen; -import io.avaje.jex.websocket.WsContext.WsPong; - -/** A builder for creating a {@link WebSocketListener} with specific event handlers. */ -class ListenerBuilder implements WebSocketListener.Builder { - private Consumer onOpen; - private Consumer onMessage; - private Consumer onBinaryMessage; - private Consumer onClose; - private Consumer onPong; - private Consumer onError; - - /** - * Set the handler for the WebSocket open event. - * - * @param handler Consumer for {@link WsOpen} - * @return this builder - */ - @Override - public ListenerBuilder onOpen(Consumer handler) { - this.onOpen = handler; - return this; - } - - /** - * Set the handler for the WebSocket text message event. - * - * @param handler Consumer for {@link WsMessage} - * @return this builder - */ - @Override - public ListenerBuilder onMessage(Consumer handler) { - this.onMessage = handler; - return this; - } - - /** - * Set the handler for the WebSocket binary message event. - * - * @param handler Consumer for {@link WsBinaryMessage} - * @return this builder - */ - @Override - public ListenerBuilder onBinaryMessage(Consumer handler) { - this.onBinaryMessage = handler; - return this; - } - - /** - * Set the handler for the WebSocket close event. - * - * @param handler Consumer for {@link WsClose} - * @return this builder - */ - @Override - public ListenerBuilder onClose(Consumer handler) { - this.onClose = handler; - return this; - } - - /** - * Set the handler for the WebSocket pong event. - * - * @param handler Consumer for {@link WsPong} - * @return this builder - */ - @Override - public ListenerBuilder onPong(Consumer handler) { - this.onPong = handler; - return this; - } - - /** - * Set the handler for the WebSocket error event. - * - * @param handler Consumer for {@link WsError} - * @return this builder - */ - @Override - public ListenerBuilder onError(Consumer handler) { - this.onError = handler; - return this; - } - - /** - * Build a {@link WebSocketListener} implementation using the configured handlers. - * - * @return a new {@link WebSocketListener} instance - */ - @Override - public WebSocketListener build() { - return new WebSocketListener() { - @Override - public void onOpen(WsOpen wsOpen) { - if (onOpen != null) onOpen.accept(wsOpen); - } - - @Override - public void onMessage(WsMessage message) { - if (onMessage != null) onMessage.accept(message); - } - - @Override - public void onBinaryMessage(WsBinaryMessage binaryPayload) { - if (onBinaryMessage != null) onBinaryMessage.accept(binaryPayload); - } - - @Override - public void onClose(WsClose wsClose) { - if (onClose != null) onClose.accept(wsClose); - } - - @Override - public void onPong(WsPong wsPong) { - if (onPong != null) onPong.accept(wsPong); - } - - @Override - public void onError(WsError wsError) { - if (onError != null) onError.accept(wsError); - } - }; - } -} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java index c11d0dd4..cca01a27 100644 --- a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -21,7 +21,7 @@ public interface WebSocketListener { * @return the builder */ static Builder builder() { - return new ListenerBuilder(); + return new Builder(); } /** @@ -66,7 +66,16 @@ default void onPong(WsPong wsPong) {} */ default void onError(WsError wsError) {} - interface Builder { + /** A builder for creating a {@link WebSocketListener} with specific event handlers. */ + final class Builder { + private Consumer onOpen; + private Consumer onMessage; + private Consumer onBinaryMessage; + private Consumer onClose; + private Consumer onPong; + private Consumer onError; + + private Builder() {} /** * Set the handler for the WebSocket open event. @@ -74,7 +83,10 @@ interface Builder { * @param handler Consumer for {@link WsOpen} * @return this builder */ - Builder onOpen(Consumer handler); + public Builder onOpen(Consumer handler) { + this.onOpen = handler; + return this; + } /** * Set the handler for the WebSocket text message event. @@ -82,7 +94,10 @@ interface Builder { * @param handler Consumer for {@link WsMessage} * @return this builder */ - Builder onMessage(Consumer handler); + public Builder onMessage(Consumer handler) { + this.onMessage = handler; + return this; + } /** * Set the handler for the WebSocket binary message event. @@ -90,7 +105,10 @@ interface Builder { * @param handler Consumer for {@link WsBinaryMessage} * @return this builder */ - Builder onBinaryMessage(Consumer handler); + public Builder onBinaryMessage(Consumer handler) { + this.onBinaryMessage = handler; + return this; + } /** * Set the handler for the WebSocket close event. @@ -98,7 +116,10 @@ interface Builder { * @param handler Consumer for {@link WsClose} * @return this builder */ - Builder onClose(Consumer handler); + public Builder onClose(Consumer handler) { + this.onClose = handler; + return this; + } /** * Set the handler for the WebSocket pong event. @@ -106,7 +127,10 @@ interface Builder { * @param handler Consumer for {@link WsPong} * @return this builder */ - Builder onPong(Consumer handler); + public Builder onPong(Consumer handler) { + this.onPong = handler; + return this; + } /** * Set the handler for the WebSocket error event. @@ -114,9 +138,49 @@ interface Builder { * @param handler Consumer for {@link WsError} * @return this builder */ - Builder onError(Consumer handler); + public Builder onError(Consumer handler) { + this.onError = handler; + return this; + } - /** Build the WebSocketListener. */ - WebSocketListener build(); + /** + * Build a {@link WebSocketListener} implementation using the configured handlers. + * + * @return a new {@link WebSocketListener} instance + */ + public WebSocketListener build() { + return new WebSocketListener() { + + @Override + public void onOpen(WsOpen wsOpen) { + if (onOpen != null) onOpen.accept(wsOpen); + } + + @Override + public void onMessage(WsMessage message) { + if (onMessage != null) onMessage.accept(message); + } + + @Override + public void onBinaryMessage(WsBinaryMessage binaryPayload) { + if (onBinaryMessage != null) onBinaryMessage.accept(binaryPayload); + } + + @Override + public void onClose(WsClose wsClose) { + if (onClose != null) onClose.accept(wsClose); + } + + @Override + public void onPong(WsPong wsPong) { + if (onPong != null) onPong.accept(wsPong); + } + + @Override + public void onError(WsError wsError) { + if (onError != null) onError.accept(wsError); + } + }; + } } }