diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 950c29db9b..d0f716978c 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -12,3 +12,6 @@ add_subdirectory (http) add_subdirectory (websocket) add_subdirectory (echo-op) + +# Repro tools for permessage-deflate receive path +add_subdirectory (repro) diff --git a/example/repro/CMakeLists.txt b/example/repro/CMakeLists.txt new file mode 100644 index 0000000000..1b7e74f9a5 --- /dev/null +++ b/example/repro/CMakeLists.txt @@ -0,0 +1,90 @@ +cmake_minimum_required(VERSION 3.16) +project(beast_pmd_repro LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +# 这两个任选其一传入: +# -DBOOST_SRC=(已执行 b2 headers / 可选 b2 stage) +# 或 +# -DBOOST_PREFIX=<安装前缀> +set(BOOST_SRC "" CACHE PATH "Boost superproject root (with ./boost after `b2 headers`)") +set(BOOST_PREFIX "" CACHE PATH "Boost install prefix") + +# 解析 include/lib 路径 +unset(BOOST_INC) +unset(BOOST_LIB) + +if(BOOST_SRC) + # 源码树模式 + set(BOOST_INC "${BOOST_SRC}") # 这里直接包含 $BOOST_SRC/boost/... + set(BOOST_LIB "${BOOST_SRC}/stage/lib") # 库来自 stage/lib + if(NOT EXISTS "${BOOST_INC}/boost/beast/core.hpp") + message(FATAL_ERROR + "Boost headers not found at ${BOOST_INC}/boost/...; run `b2 headers` in ${BOOST_SRC}") + endif() +elseif(BOOST_PREFIX) + # 安装前缀模式 + set(BOOST_INC "${BOOST_PREFIX}/include") + set(BOOST_LIB "${BOOST_PREFIX}/lib") + if(NOT EXISTS "${BOOST_INC}/boost/beast/core.hpp") + message(FATAL_ERROR + "Boost headers not found at ${BOOST_INC}/boost/...; reinstall headers to the prefix") + endif() +else() + # 自动尝试:优先用环境变量 BOOST_SRC,其次 $HOME/opt/boost-dev + if(DEFINED ENV{BOOST_SRC} AND EXISTS "$ENV{BOOST_SRC}/boost/beast/core.hpp") + set(BOOST_INC "$ENV{BOOST_SRC}") + set(BOOST_LIB "$ENV{BOOST_SRC}/stage/lib") + elseif(DEFINED ENV{HOME} AND EXISTS "$ENV{HOME}/opt/boost-dev/include/boost/beast/core.hpp") + set(BOOST_INC "$ENV{HOME}/opt/boost-dev/include") + set(BOOST_LIB "$ENV{HOME}/opt/boost-dev/lib") + else() + message(FATAL_ERROR + "Set -DBOOST_SRC= (after `b2 headers`) " + "or -DBOOST_PREFIX=.") + endif() +endif() + +message(STATUS "Using BOOST_INC=${BOOST_INC}") +message(STATUS "Using BOOST_LIB=${BOOST_LIB}") + +find_package(Threads REQUIRED) +find_package(OpenSSL QUIET) + +# ---- 定义接口库,集中注入 include/lib/link 规则 ---- +add_library(lib-asio INTERFACE) +target_include_directories(lib-asio INTERFACE "${BOOST_INC}") +target_link_directories(lib-asio INTERFACE "${BOOST_LIB}") + +# Boost.System(Beast/Asio 需要) +find_library(BOOST_SYSTEM_LIB NAMES boost_system PATHS "${BOOST_LIB}" NO_DEFAULT_PATH) +if(BOOST_SYSTEM_LIB) + target_link_libraries(lib-asio INTERFACE "${BOOST_SYSTEM_LIB}") +else() + # 若上面没找到,依赖 -L${BOOST_LIB} 后用普通名解析 + target_link_libraries(lib-asio INTERFACE boost_system) +endif() +target_link_libraries(lib-asio INTERFACE Threads::Threads) +if(OpenSSL_FOUND) + target_link_libraries(lib-asio INTERFACE OpenSSL::SSL OpenSSL::Crypto) +endif() + +# Beast 是 header-only,复用 asio 的设置 +add_library(lib-beast INTERFACE) +target_link_libraries(lib-beast INTERFACE lib-asio) + +# 让可执行文件运行时能直接找到 stage/lib 或 prefix/lib +add_link_options("-Wl,-rpath,${BOOST_LIB}") + +# ---- 你的四个可执行文件,保持原样 ---- +add_executable (beast_pmd_server server_smallbuf.cpp) +target_link_libraries(beast_pmd_server PRIVATE lib-beast lib-asio) + +add_executable (beast_pmd_client_raw client_raw_replay.cpp) +target_link_libraries(beast_pmd_client_raw PRIVATE lib-asio) + +add_executable (beast_pmd_server_writer server_pmd_writer.cpp) +target_link_libraries(beast_pmd_server_writer PRIVATE lib-beast lib-asio) + +add_executable (beast_pmd_client_smallbuf client_smallbuf.cpp) +target_link_libraries(beast_pmd_client_smallbuf PRIVATE lib-beast lib-asio) diff --git a/example/repro/README.md b/example/repro/README.md new file mode 100644 index 0000000000..f2265e5675 --- /dev/null +++ b/example/repro/README.md @@ -0,0 +1,42 @@ +# PMD Receive Path Repro (Client-Side) + +This pair of tools reproduces a client-side decompression fault when using `permessage-deflate` (with context takeover) by stressing the WebSocket receive path with a 1-byte read buffer. + +## Build + +```bash +cmake -S . -B build -DBeast_BUILD_EXAMPLES=ON +cmake --build build -j +``` + +## Run (One-Click Repro) + +1) Start the server that negotiates PMD and sends many messages: + +```bash +./build/example/repro/beast_pmd_server_writer 0.0.0.0 8080 200 256 +``` + +2) Start the client that enables PMD and reads with a tiny buffer (default 1 byte): + +```bash +./build/example/repro/beast_pmd_client_smallbuf 127.0.0.1 8080 / +``` + +You should soon observe `read_some ec=...` on the client side (e.g., inflate invalid data), along with a summary line reporting counts of read calls, total bytes, and completed messages. + +## Notes + +- Both peers enable `permessage-deflate` without `*_no_context_takeover`, window bits 15. +- The server sends many large, compressible messages to exercise cross-message context. +- The client constrains `avail_out` by reading 1 byte at a time to stress the end-of-message inflate phase. +- If it doesn’t reproduce quickly, increase message count/size on the server (e.g., `1000 512`). + +## Optional + +- The client accepts an optional `buf_bytes` argument to adjust read buffer (default 1): + +```bash +./beast_pmd_client_smallbuf [target] [buf_bytes] +``` + diff --git a/example/repro/client_raw_replay.cpp b/example/repro/client_raw_replay.cpp new file mode 100644 index 0000000000..3d07215e21 --- /dev/null +++ b/example/repro/client_raw_replay.cpp @@ -0,0 +1,110 @@ +// TCP raw replay client: connects to a WebSocket server and writes bytes +// from a file to the socket, optionally in fixed-size chunks with delay. +// Intended to replay application-layer bytes extracted from a plaintext +// WebSocket PCAP (client->server direction), including the HTTP upgrade +// and subsequent frames. + +#include +#include +#include +#include + +namespace net = boost::asio; +using tcp = net::ip::tcp; + +int main(int argc, char** argv) +{ + if(argc < 4) + { + std::cerr << "Usage: beast_pmd_client_raw [chunk_bytes] [delay_ms]\n"; + std::cerr << " binfile: binary blob of client->server TCP payload (concatenated)\n"; + std::cerr << " chunk_bytes: send in fixed-size chunks (default: all at once)\n"; + std::cerr << " delay_ms: sleep between chunks (default: 0)\n"; + return 64; + } + + std::string host = argv[1]; + std::string port = argv[2]; + std::string path = argv[3]; + std::size_t chunk = (argc > 4) ? static_cast(std::stoul(argv[4])) : 0; + unsigned delay_ms = (argc > 5) ? static_cast(std::stoul(argv[5])) : 0; + + try + { + // Load file + std::ifstream ifs(path, std::ios::binary); + if(!ifs) + { + std::cerr << "Cannot open file: " << path << std::endl; + return 66; + } + std::vector data((std::istreambuf_iterator(ifs)), {}); + std::cout << "Loaded " << data.size() << " bytes from " << path << std::endl; + if(data.empty()) + { + std::cerr << "File is empty." << std::endl; + return 65; + } + + net::io_context ioc; + tcp::resolver res{ioc}; + tcp::socket sock{ioc}; + + auto const results = res.resolve(host, port); + net::connect(sock, results); + sock.set_option(tcp::no_delay(true)); + std::cout << "Connected to " << host << ":" << port << std::endl; + + // Reader thread to drain server responses to avoid backpressure + std::atomic stop{false}; + std::thread reader([&] + { + std::array buf{}; + boost::system::error_code ec; + while(!stop.load()) + { + auto n = sock.read_some(net::buffer(buf), ec); + if(ec) + break; + (void)n; // discard + } + }); + + // Write data + boost::system::error_code ec; + if(chunk == 0) + { + net::write(sock, net::buffer(data), ec); + if(ec) std::cerr << "write error: " << ec.message() << std::endl; + } + else + { + std::size_t off = 0; + while(off < data.size()) + { + auto n = std::min(chunk, data.size() - off); + auto sent = net::write(sock, net::buffer(&data[off], n), ec); + if(ec) + { + std::cerr << "write error: " << ec.message() << std::endl; + break; + } + off += sent; + if(delay_ms) std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } + } + + stop = true; + sock.shutdown(tcp::socket::shutdown_both, ec); + sock.close(ec); + if(reader.joinable()) reader.join(); + } + catch(std::exception const& e) + { + std::cerr << "FATAL: " << e.what() << std::endl; + return 1; + } + + return 0; +} + diff --git a/example/repro/client_smallbuf.cpp b/example/repro/client_smallbuf.cpp new file mode 100644 index 0000000000..f77648f86b --- /dev/null +++ b/example/repro/client_smallbuf.cpp @@ -0,0 +1,102 @@ +// Beast WebSocket client that enables permessage-deflate and reads using a +// very small buffer (1 byte) to stress the receive/decompression path. + +#include +#include +#include +#include + +namespace net = boost::asio; +namespace beast = boost::beast; +namespace websocket = beast::websocket; +using tcp = net::ip::tcp; + +int main(int argc, char** argv) +{ + if(argc < 3) + { + std::cerr << "Usage: beast_pmd_client_smallbuf [target] [buf_bytes]\n"; + return 64; + } + std::string host = argv[1]; + std::string port = argv[2]; + std::string target = argc > 3 ? argv[3] : "/"; + std::size_t buf_bytes = argc > 4 ? static_cast(std::stoul(argv[4])) : 1; + + try + { + net::io_context ioc; + tcp::resolver res{ioc}; + tcp::socket sock{ioc}; + auto results = res.resolve(host, port); + net::connect(sock, results); + sock.set_option(tcp::no_delay(true)); + + websocket::stream ws{std::move(sock)}; + + // Enable PMD in client role (offer extension), with context takeover + websocket::permessage_deflate pmd; + pmd.client_enable = true; // offer PMD from client + pmd.server_enable = false; // irrelevant on client + pmd.server_no_context_takeover = false; + pmd.client_no_context_takeover = false; + pmd.server_max_window_bits = 15; + pmd.client_max_window_bits = 15; + pmd.msg_size_threshold = 0; + ws.set_option(pmd); + + // Upgrade + ws.handshake(host, target); + std::cout << "Handshake complete. PMD offered:" + << " client_enable=1 server_enable=0" + << " client_no_context_takeover=0 server_no_context_takeover=0" + << " win_bits(client/server)=15/15\n"; + std::cout << "Reading with tiny buffer of " << buf_bytes << " bytes..." << std::endl; + + // Log control frames + ws.control_callback([](websocket::frame_type ft, beast::string_view sv){ + char const* name = ft==websocket::frame_type::ping?"PING":(ft==websocket::frame_type::pong?"PONG":"CLOSE"); + std::cout << "[ctrl] " << name << " len=" << sv.size() << std::endl; + }); + + // Small buffer to constrain avail_out (1 byte) + std::vector tiny(buf_bytes?buf_bytes:1); + std::size_t total_bytes = 0; + std::size_t read_calls = 0; + std::size_t messages_done = 0; + for(;;) + { + beast::error_code ec; + auto n = ws.read_some(net::buffer(tiny.data(), tiny.size()), ec); + if(ec) + { + std::cout << "read_some ec=" << ec.message() << " (after " + << read_calls << " calls, " << total_bytes << " bytes, messages_done=" + << messages_done << ")" << std::endl; + break; + } + ++read_calls; + total_bytes += n; + if(ws.is_message_done()) + { + ++messages_done; + std::cout << "[message done] count=" << messages_done + << " total_bytes=" << total_bytes + << " type=" << (ws.got_binary()?"binary":"text") + << std::endl; + } + } + + beast::error_code ec; + ws.close(websocket::close_code::normal, ec); + std::cout << "Summary: read_calls=" << read_calls + << " total_bytes=" << total_bytes + << " messages_done=" << messages_done << std::endl; + } + catch(std::exception const& e) + { + std::cerr << "FATAL: " << e.what() << std::endl; + return 1; + } + return 0; +} diff --git a/example/repro/cm.txt b/example/repro/cm.txt new file mode 100644 index 0000000000..0d15a575f6 --- /dev/null +++ b/example/repro/cm.txt @@ -0,0 +1,73 @@ +cmake_minimum_required(VERSION 3.16) +project(beast_pmd_repro LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +# ---- 1) 选择 Boost 安装前缀(你之前装在 $HOME/opt/boost-dev)---- +# 可通过 -DBOOST_PREFIX=... 覆盖;否则依次尝试环境和默认路径 +set(BOOST_PREFIX "" CACHE PATH "Prefix where Boost (modified) is installed") +if(NOT BOOST_PREFIX AND DEFINED ENV{BOOST_PREFIX}) + set(BOOST_PREFIX "$ENV{BOOST_PREFIX}") +endif() +if(NOT BOOST_PREFIX AND DEFINED ENV{BOOST_ROOT}) + set(BOOST_PREFIX "$ENV{BOOST_ROOT}") +endif() +if(NOT BOOST_PREFIX) + set(BOOST_PREFIX "$ENV{HOME}/opt/boost-dev") +endif() +message(STATUS "BOOST_PREFIX = ${BOOST_PREFIX}") + +# 自检:头文件是否存在 +if(NOT EXISTS "${BOOST_PREFIX}/include/boost/beast/core.hpp") + message(FATAL_ERROR + "Boost headers not found at ${BOOST_PREFIX}/include. " + "Set -DBOOST_PREFIX to your Boost install prefix.") +endif() + +# ---- 2) 定义承载 include/link 的 INTERFACE 目标 ---- +find_package(Threads REQUIRED) +find_package(OpenSSL QUIET) + +add_library(lib-asio INTERFACE) +target_include_directories(lib-asio INTERFACE "${BOOST_PREFIX}/include") +target_link_directories(lib-asio INTERFACE "${BOOST_PREFIX}/lib") +target_link_libraries(lib-asio INTERFACE Threads::Threads) + +# Boost.System(Asio/Beast 常用依赖) +find_library(BOOST_SYSTEM_LIB NAMES boost_system PATHS "${BOOST_PREFIX}/lib" NO_DEFAULT_PATH) +if(NOT BOOST_SYSTEM_LIB) + message(FATAL_ERROR + "libboost_system not found in ${BOOST_PREFIX}/lib. " + "Please build/install Boost.System into that prefix.") +endif() +target_link_libraries(lib-asio INTERFACE "${BOOST_SYSTEM_LIB}") + +# 可选:如果你示例会用到 HTTPS,再把 OpenSSL 带上 +if(OpenSSL_FOUND) + target_link_libraries(lib-asio INTERFACE OpenSSL::SSL OpenSSL::Crypto) +endif() + +# Beast 是 header-only,这里让它继承 asio 的设置 +add_library(lib-beast INTERFACE) +target_link_libraries(lib-beast INTERFACE lib-asio) + +# 让生成的可执行文件无需设置 LD_LIBRARY_PATH 也能运行 +add_link_options("-Wl,-rpath,${BOOST_PREFIX}/lib") + +# ---- 3) 你原来的 target 保持不变 ---- +# Minimal repro tools for permessage-deflate receive path +add_executable (beast_pmd_server + server_smallbuf.cpp) +target_link_libraries(beast_pmd_server PRIVATE lib-beast lib-asio) + +add_executable (beast_pmd_client_raw + client_raw_replay.cpp) +target_link_libraries(beast_pmd_client_raw PRIVATE lib-asio) + +add_executable (beast_pmd_server_writer + server_pmd_writer.cpp) +target_link_libraries(beast_pmd_server_writer PRIVATE lib-beast lib-asio) + +add_executable (beast_pmd_client_smallbuf + client_smallbuf.cpp) +target_link_libraries(beast_pmd_client_smallbuf PRIVATE lib-beast lib-asio) diff --git a/example/repro/server_pmd_writer.cpp b/example/repro/server_pmd_writer.cpp new file mode 100644 index 0000000000..c5d5edb825 --- /dev/null +++ b/example/repro/server_pmd_writer.cpp @@ -0,0 +1,106 @@ +// Beast WebSocket server that negotiates permessage-deflate (with context +// takeover) and proactively sends many compressed messages to a client. +// Used to reproduce client-side receive/inflate issues. + +#include +#include +#include +#include + +namespace net = boost::asio; +namespace beast = boost::beast; +namespace websocket = beast::websocket; +using tcp = net::ip::tcp; + +int main(int argc, char** argv) +{ + try + { + // Args: [address] [port] [messages] [payload_repeat] + auto const address = net::ip::make_address(argc > 1 ? argv[1] : "0.0.0.0"); + unsigned short port = static_cast(argc > 2 ? std::atoi(argv[2]) : 8080); + int messages = argc > 3 ? std::atoi(argv[3]) : 200; // number of messages to send + int repeat = argc > 4 ? std::atoi(argv[4]) : 256; // payload repetition to enlarge data + + net::io_context ioc; + tcp::acceptor acceptor{ioc, tcp::endpoint{address, port}}; + std::cout << "beast_pmd_server_writer listening on " << address.to_string() << ":" << port << std::endl; + + for(;;) + { + tcp::socket socket{ioc}; + acceptor.accept(socket); + std::cout << "Accepted connection from " << socket.remote_endpoint() << std::endl; + + websocket::stream ws{std::move(socket)}; + + // Enable PMD with context takeover (the default in this tool) + websocket::permessage_deflate pmd; + pmd.server_enable = true; + pmd.client_enable = false; + pmd.server_no_context_takeover = false; // keep compressor context across messages + pmd.client_no_context_takeover = false; + pmd.server_max_window_bits = 15; + pmd.client_max_window_bits = 15; + pmd.msg_size_threshold = 0; + ws.set_option(pmd); + + ws.accept(); + std::cout << "Handshake complete. PMD offered:" + << " server_enable=1 client_enable=0" + << " server_no_context_takeover=0 client_no_context_takeover=0" + << " win_bits(server/client)=15/15\n"; + std::cout << "Sending " << messages << " messages..." << std::endl; + + ws.text(true); + std::string base = "The quick brown fox jumps over the lazy dog. "; + std::string payload; + payload.reserve(base.size() * repeat); + for(int i = 0; i < repeat; ++i) payload += base; + + // Send many messages to exercise context takeover + std::size_t total_uncompressed = 0; + std::size_t total_wire = 0; + int sent_count = 0; + for(int i = 0; i < messages; ++i) + { + std::string msg = "[" + std::to_string(i) + "] " + payload; + beast::error_code ec; + auto before = msg.size(); + auto wrote = ws.write(net::buffer(msg), ec); // compressed if PMD negotiated + if(ec) + { + std::cout << "write error: " << ec.message() << std::endl; + break; + } + ++sent_count; + total_uncompressed += before; + total_wire += wrote; + if((i % 10) == 0) + { + std::cout << "sent msg#" << i + << " uncompressed=" << before + << " wire=" << wrote << std::endl; + } + } + + std::cout << "Summary: messages_sent=" << sent_count + << " total_uncompressed=" << total_uncompressed + << " total_wire=" << total_wire; + if(total_uncompressed) + { + double ratio = static_cast(total_wire) / static_cast(total_uncompressed); + std::cout << " wire/uncompressed=" << ratio; + } + std::cout << std::endl; + + beast::error_code ec; + ws.close(websocket::close_code::normal, ec); + } + } + catch(std::exception const& e) + { + std::cerr << "FATAL: " << e.what() << std::endl; + return 1; + } +} diff --git a/example/repro/server_smallbuf.cpp b/example/repro/server_smallbuf.cpp new file mode 100644 index 0000000000..9099f11732 --- /dev/null +++ b/example/repro/server_smallbuf.cpp @@ -0,0 +1,76 @@ +// Minimal Beast WebSocket server that enables permessage-deflate and +// reads with a tiny buffer to stress the receive/inflate path. + +#include +#include +#include +#include + +namespace net = boost::asio; +namespace beast = boost::beast; +namespace websocket = beast::websocket; +using tcp = net::ip::tcp; + +int main(int argc, char** argv) +{ + try + { + // Args: [address] [port] + auto const address = net::ip::make_address(argc > 1 ? argv[1] : "0.0.0.0"); + unsigned short port = static_cast(argc > 2 ? std::atoi(argv[2]) : 8080); + + net::io_context ioc; + tcp::acceptor acceptor{ioc, tcp::endpoint{address, port}}; + std::cout << "beast_pmd_server listening on " << address.to_string() << ":" << port << std::endl; + + for(;;) + { + tcp::socket socket{ioc}; + acceptor.accept(socket); + std::cout << "Accepted connection from " << socket.remote_endpoint() << std::endl; + + websocket::stream ws{std::move(socket)}; + + // Enable permessage-deflate (with context takeover) + websocket::permessage_deflate pmd; + pmd.server_enable = true; // offer PMD in server role + pmd.client_enable = false; // irrelevant on server + pmd.server_no_context_takeover = false; // keep sliding window + pmd.client_no_context_takeover = false; // keep sliding window + pmd.server_max_window_bits = 15; // 32K window + pmd.client_max_window_bits = 15; + pmd.msg_size_threshold = 0; // compress all sizes + ws.set_option(pmd); + + // Perform HTTP Upgrade + ws.accept(); + std::cout << "Handshake complete (PMD enabled if negotiated)." << std::endl; + + // Read loop using very small user buffer to force small avail_out + // and trigger edge-cases around end-of-message inflate. + std::array tiny{}; // 1–2 bytes at a time + for(;;) + { + beast::error_code ec; + // read_some lets us strictly bound avail_out + std::size_t n = ws.read_some(net::buffer(tiny.data(), 1), ec); + if(ec) + { + std::cout << "read_some ec=" << ec.message() << std::endl; + break; + } + // Optionally dump data size to observe flow + (void)n; + } + + beast::error_code ec; + ws.close(websocket::close_code::normal, ec); + } + } + catch(std::exception const& e) + { + std::cerr << "FATAL: " << e.what() << std::endl; + return 1; + } +} + diff --git a/include/boost/beast/websocket/detail/impl_base.hpp b/include/boost/beast/websocket/detail/impl_base.hpp index 62aa1675d7..bc390d3f97 100644 --- a/include/boost/beast/websocket/detail/impl_base.hpp +++ b/include/boost/beast/websocket/detail/impl_base.hpp @@ -49,6 +49,11 @@ struct impl_base zlib::deflate_stream zo; zlib::inflate_stream zi; + + // Number of bytes of the RFC7692 end-of-message tail + // (0x00, 0x00, 0xff, 0xff) already consumed by inflate + // for the current message. Range: 0..4 + unsigned rd_tail_in = 0; }; std::unique_ptr pmd_; // pmd settings or nullptr @@ -70,6 +75,8 @@ struct impl_base if(pmd_) { pmd_->rd_set = rsv1; + // Reset tail tracker at the start of a new data message + pmd_->rd_tail_in = 0; return true; } return ! rsv1; // pmd not negotiated @@ -170,6 +177,11 @@ struct impl_base error_code& ec) { pmd_->zi.write(zs, flush, ec); + // zlib::error::need_buffers is non-fatal and indicates that + // more input and/or output space is needed. Do not propagate it + // to callers, since it is part of normal streaming operation. + if(ec == zlib::error::need_buffers) + ec = {}; } void @@ -182,6 +194,9 @@ struct impl_base { pmd_->zi.clear(); } + // At a message boundary, clear the tail tracker regardless + if(pmd_) + pmd_->rd_tail_in = 0; } template @@ -307,6 +322,22 @@ struct impl_base return n_bytes >= pmd_opts_.msg_size_threshold; } + // Return number of RFC7692 tail bytes already consumed (0..4) + unsigned rd_tail_in() const noexcept + { + return pmd_ ? pmd_->rd_tail_in : 0; + } + + // Advance tail consumption counter by n (saturating at 4) + void rd_tail_in_add(unsigned n) noexcept + { + if(!pmd_) return; + unsigned v = pmd_->rd_tail_in; + v += n; + if(v > 4) v = 4; + pmd_->rd_tail_in = v; + } + std::size_t read_size_hint_pmd( std::size_t initial_size, @@ -461,6 +492,10 @@ struct impl_base return false; } + // PMD tail tracking stubs (no-op when deflate unsupported) + unsigned rd_tail_in() const noexcept { return 0; } + void rd_tail_in_add(unsigned) noexcept {} + std::size_t read_size_hint_pmd( std::size_t initial_size, diff --git a/include/boost/beast/websocket/impl/read.hpp b/include/boost/beast/websocket/impl/read.hpp index 037ad609c4..df3b38dd66 100644 --- a/include/boost/beast/websocket/impl/read.hpp +++ b/include/boost/beast/websocket/impl/read.hpp @@ -578,11 +578,21 @@ class stream::read_some_op } else if(impl.rd_fh.fin) { - // append the empty block codes + // Append the RFC7692 tail (0x00,0x00,0xff,0xff), + // honoring any partial consumption from previous calls. static std::uint8_t constexpr empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; - zs.next_in = empty_block; - zs.avail_in = sizeof(empty_block); + auto consumed = impl.rd_tail_in(); + if(consumed < 4) + { + zs.next_in = empty_block + consumed; + zs.avail_in = sizeof(empty_block) - consumed; + } + else + { + zs.next_in = nullptr; + zs.avail_in = 0; + } fin = true; } else @@ -592,6 +602,8 @@ class stream::read_some_op impl.inflate(zs, zlib::Flush::sync, ec); if(impl.check_stop_now(ec)) goto upcall; + if(fin) + impl.rd_tail_in_add(static_cast(zs.total_in)); if(fin && zs.total_out == 0) { impl.do_context_takeover_read(impl.role); impl.rd_done = true; @@ -1344,11 +1356,21 @@ read_some( } else if(impl.rd_fh.fin) { - // append the empty block codes + // Append the RFC7692 tail (0x00,0x00,0xff,0xff), + // honoring any partial consumption from previous calls. static std::uint8_t constexpr empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; - zs.next_in = empty_block; - zs.avail_in = sizeof(empty_block); + auto consumed = impl.rd_tail_in(); + if(consumed < 4) + { + zs.next_in = empty_block + consumed; + zs.avail_in = sizeof(empty_block) - consumed; + } + else + { + zs.next_in = nullptr; + zs.avail_in = 0; + } fin = true; } else @@ -1358,6 +1380,8 @@ read_some( impl.inflate(zs, zlib::Flush::sync, ec); if(impl.check_stop_now(ec)) return bytes_written; + if (fin) + impl.rd_tail_in_add(static_cast(zs.total_in)); if (fin && zs.total_out == 0) { impl.do_context_takeover_read(impl.role); impl.rd_done = true;