diff --git a/include/seastar/net/api.hh b/include/seastar/net/api.hh index 1fa06158c36..49592f4df7c 100644 --- a/include/seastar/net/api.hh +++ b/include/seastar/net/api.hh @@ -129,6 +129,22 @@ public: void close(); }; +class input_buffer_factory { +public: + using buffer_t = temporary_buffer; + + virtual ~input_buffer_factory() = default; + /// Provide a rx buffer. Implementation is responsible for determining its size + /// and memory. This is useful when a network stack implementation does not put + /// extra requirements on these factors. The POSIX stack is the example here. + /// \param allocator Memory allocator \c connected_socket implementation prefers. + /// Maybe nullptr. + virtual buffer_t create(compat::polymorphic_allocator* allocator) = 0; + + // Give back to the factory unused part of a buffer obtained from it + virtual void return_unused(buffer_t&& buf) = 0; +}; + } /* namespace net */ /// \addtogroup networking-module @@ -156,7 +172,10 @@ public: /// Gets the input stream. /// /// Gets an object returning data sent from the remote endpoint. - input_stream input(); + /// \param ibf_hint optional factory of rx buffers. The decision + /// whether to use the factory is opt to an implementation of \c + /// connected_socket. + input_stream input(net::input_buffer_factory* ibf_hint = nullptr); /// Gets the output stream. /// /// Gets an object that sends data to the remote endpoint. diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh index 803f6db6266..f47db4f0a03 100644 --- a/include/seastar/net/posix-stack.hh +++ b/include/seastar/net/posix-stack.hh @@ -106,11 +106,10 @@ class posix_data_source_impl final : public data_source_impl { compat::polymorphic_allocator* _buffer_allocator; lw_shared_ptr _fd; temporary_buffer _buf; - size_t _buf_size; + net::input_buffer_factory* _buffer_factory; public: - explicit posix_data_source_impl(lw_shared_ptr fd, compat::polymorphic_allocator* allocator=memory::malloc_allocator, - size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)), - _buf(make_temporary_buffer(_buffer_allocator, buf_size)), _buf_size(buf_size) {} + explicit posix_data_source_impl(lw_shared_ptr fd, net::input_buffer_factory* ibf, compat::polymorphic_allocator* allocator=memory::malloc_allocator) + : _buffer_allocator(allocator), _fd(std::move(fd)), _buffer_factory(ibf) {} future> get() override; future<> close() override; }; diff --git a/include/seastar/net/stack.hh b/include/seastar/net/stack.hh index d0d4a0a097c..e980cf3c531 100644 --- a/include/seastar/net/stack.hh +++ b/include/seastar/net/stack.hh @@ -32,7 +32,7 @@ namespace net { class connected_socket_impl { public: virtual ~connected_socket_impl() {} - virtual data_source source() = 0; + virtual data_source source(input_buffer_factory* ibf = nullptr) = 0; virtual data_sink sink() = 0; virtual void shutdown_input() = 0; virtual void shutdown_output() = 0; diff --git a/src/net/native-stack-impl.hh b/src/net/native-stack-impl.hh index 4be2ca08f6a..dd0c7d019c7 100644 --- a/src/net/native-stack-impl.hh +++ b/src/net/native-stack-impl.hh @@ -86,7 +86,7 @@ class native_connected_socket_impl : public connected_socket_impl { public: explicit native_connected_socket_impl(lw_shared_ptr conn) : _conn(std::move(conn)) {} - virtual data_source source() override; + virtual data_source source(net::input_buffer_factory*) override; virtual data_sink sink() override; virtual void shutdown_input() override; virtual void shutdown_output() override; @@ -180,7 +180,7 @@ public: }; template -data_source native_connected_socket_impl::source() { +data_source native_connected_socket_impl::source(net::input_buffer_factory*) { return data_source(std::make_unique(_conn)); } diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc index 758deb0b32b..2c27ea9e062 100644 --- a/src/net/posix-stack.cc +++ b/src/net/posix-stack.cc @@ -116,8 +116,18 @@ class posix_connected_socket_impl final : public connected_socket_impl, posix_co explicit posix_connected_socket_impl(lw_shared_ptr fd, conntrack::handle&& handle, compat::polymorphic_allocator* allocator=memory::malloc_allocator) : _fd(std::move(fd)), _handle(std::move(handle)), _allocator(allocator) {} public: - virtual data_source source() override { - return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator)); + virtual data_source source(net::input_buffer_factory* ibf) override { + if (!ibf) { + static struct final : input_buffer_factory { + buffer_t create(compat::polymorphic_allocator* const allocator) override { + return make_temporary_buffer(allocator, 8192); + } + void return_unused(buffer_t&&) override { + } + } default_posix_inbuf_factory{}; + ibf = &default_posix_inbuf_factory; + } + return data_source(std::make_unique(_fd, ibf, _allocator)); } virtual data_sink sink() override { return data_sink(std::make_unique< posix_data_sink_impl>(_fd)); @@ -317,11 +327,13 @@ posix_ap_server_socket_impl::move_connected_socket(socket_address sa, future> posix_data_source_impl::get() { - return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) { - _buf.trim(size); - auto ret = std::move(_buf); - _buf = make_temporary_buffer(_buffer_allocator, _buf_size); - return make_ready_future>(std::move(ret)); + _buf = _buffer_factory->create(_buffer_allocator); + return _fd->read_some(_buf.get_write(), _buf.size()).then([this] (size_t size) { + if (size < _buf.size()) { + _buffer_factory->return_unused(_buf.share(size, _buf.size() - size)); + _buf.trim(size); + } + return make_ready_future>(std::move(_buf)); }); } diff --git a/src/net/stack.cc b/src/net/stack.cc index 4956b02eb9a..8111de56c7a 100644 --- a/src/net/stack.cc +++ b/src/net/stack.cc @@ -84,8 +84,8 @@ connected_socket& connected_socket::operator=(connected_socket&& cs) noexcept = connected_socket::~connected_socket() {} -input_stream connected_socket::input() { - return input_stream(_csi->source()); +input_stream connected_socket::input(net::input_buffer_factory* const ise) { + return input_stream(_csi->source(ise)); } output_stream connected_socket::output(size_t buffer_size) { diff --git a/src/net/tls.cc b/src/net/tls.cc index fd40900459c..c2c5316292c 100644 --- a/src/net/tls.cc +++ b/src/net/tls.cc @@ -1050,7 +1050,7 @@ class tls_connected_socket_impl : public net::connected_socket_impl, public sess class source_impl; class sink_impl; - data_source source() override; + data_source source(net::input_buffer_factory*) override; data_sink sink() override; void shutdown_input() override { @@ -1160,7 +1160,7 @@ class tls_socket_impl : public net::socket_impl { } -data_source tls::tls_connected_socket_impl::source() { +data_source tls::tls_connected_socket_impl::source(net::input_buffer_factory*) { return data_source(std::make_unique(_session)); }