diff --git a/Cargo.lock b/Cargo.lock index 3e0e813f2..69f38cec7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1691,9 +1691,9 @@ dependencies = [ [[package]] name = "compio" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a7cc183295c36483f1c9616f43c4ac1a9030ce6d9321d6cebb4c4bb21164c4" +checksum = "9b84ee96a86948d04388f3a0b8c36b9f0a6b40b3528ac0d65737e53632fb37fe" dependencies = [ "compio-buf", "compio-driver", @@ -1710,9 +1710,9 @@ dependencies = [ [[package]] name = "compio-buf" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ebb4036bf394915196c09362e4fd5581ee8bf0f3302ab598bff9d646aea2061" +checksum = "3e8777c3ad31ab42f8a3a4a1bd629b78f688371df9b0f528d94dfbdbe5c945c9" dependencies = [ "arrayvec", "bytes", @@ -1721,33 +1721,37 @@ dependencies = [ [[package]] name = "compio-driver" -version = "0.10.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff5c12800e82a01d12046ccc29b014e1cbbb2fbe38c52534e0d40d4fc58881d5" +checksum = "042d449def75fb78af58e53e865dd1343c36255294466e0abd5464b70a525be4" dependencies = [ "cfg-if", "cfg_aliases", "compio-buf", "compio-log", "crossbeam-queue", - "flume 0.11.1", + "flume 0.12.0", "futures-util", "io-uring", "io_uring_buf_ring", "libc", "once_cell", "paste", + "pin-project-lite", "polling", "slab", + "smallvec", "socket2 0.6.2", + "synchrony", + "thin-cell", "windows-sys 0.61.2", ] [[package]] name = "compio-fs" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c568022f90c2e2e8ea7ff4c4e8fde500753b5b9b6b6d870e25b5e656f9ea2892" +checksum = "65ee36e1acf2cec4835efe9a986c012b2462c5ef53580e4ee84ae6d5a3d8e3b3" dependencies = [ "cfg-if", "cfg_aliases", @@ -1757,19 +1761,21 @@ dependencies = [ "compio-runtime", "libc", "os_pipe", + "pin-project-lite", "widestring", "windows-sys 0.61.2", ] [[package]] name = "compio-io" -version = "0.8.4" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1e64c6d723589492a4f5041394301e9903466a606f6d9bcc11e406f9f07e9ec" +checksum = "b914ea4883d9a5b44b328c04e4d23011f043228b0282d1e4b9100ce6507594cc" dependencies = [ "compio-buf", "futures-util", "paste", + "synchrony", ] [[package]] @@ -1795,9 +1801,9 @@ dependencies = [ [[package]] name = "compio-net" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bffab78b8a876111ca76450912ca6a5a164b0dd93973e342c5f438a6f478c735" +checksum = "14cdcd182c89864d05e522ea4f7a4a662675a636ac6e8c129565325e445032e6" dependencies = [ "cfg-if", "compio-buf", @@ -1814,9 +1820,9 @@ dependencies = [ [[package]] name = "compio-quic" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e101b05fe8608ce6fb2882ac331e211f2b0318449ae27c576c7456b4f1ec4e" +checksum = "256df80066ad4901c54a3d3e495df4e10384cb911b3e98d61f8275aba48321f9" dependencies = [ "cfg_aliases", "compio-buf", @@ -1824,21 +1830,22 @@ dependencies = [ "compio-log", "compio-net", "compio-runtime", - "flume 0.11.1", + "flume 0.12.0", "futures-util", "libc", "quinn-proto", "rustc-hash", "rustls", + "synchrony", "thiserror 2.0.18", "windows-sys 0.61.2", ] [[package]] name = "compio-runtime" -version = "0.10.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fd890a129a8086af857bbe18401689c130aa6ccfc7f3c029a7800f7256af3e" +checksum = "d6c1c71f011bdd9c8f30e97d877b606505ee6d241c7782cfaed172f66acbd9cd" dependencies = [ "async-task", "cfg-if", @@ -1859,9 +1866,9 @@ dependencies = [ [[package]] name = "compio-tls" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84cd9ca48815f384f1a30400848beebcd8c7ead2f57bfe28ebc5560babea88ec" +checksum = "4e462f3f836226cc293795c87d8e7df783ca7f88811e433ee79a9a2eace0b253" dependencies = [ "compio-buf", "compio-io", @@ -1872,9 +1879,9 @@ dependencies = [ [[package]] name = "compio-ws" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7281a15e8f638697415f9838030e41a92c8a8954ddccfc46556a413c16dd9a" +checksum = "32b0174d0a3da33ac73efddbe62a3fb046a9bc3a58124b2f8c1d2e0354e54222" dependencies = [ "compio-buf", "compio-io", @@ -2335,9 +2342,8 @@ dependencies = [ [[package]] name = "cyper" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7442418b16e89f9c04b91ea2d9e8bfd55529d6767e656d4221cfbca67a07ba61" +version = "0.8.0" +source = "git+https://github.com/compio-rs/cyper.git#b87baee246989d636a79a91c1a73ee7ac45776d9" dependencies = [ "async-stream", "base64 0.22.1", @@ -2362,9 +2368,8 @@ dependencies = [ [[package]] name = "cyper-axum" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343e0d0b2bbf5daacf8c7ddd42fca5816ced4b421485be3fbe6fd226a5728101" +version = "0.8.0" +source = "git+https://github.com/compio-rs/cyper.git#b87baee246989d636a79a91c1a73ee7ac45776d9" dependencies = [ "axum", "axum-core", @@ -2383,9 +2388,8 @@ dependencies = [ [[package]] name = "cyper-core" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4b86aa741e422dab7f730aa1ec5ab6bc26569e577fe2b8fe0ebf6d779b2325" +version = "0.8.0" +source = "git+https://github.com/compio-rs/cyper.git#b87baee246989d636a79a91c1a73ee7ac45776d9" dependencies = [ "compio", "futures-util", @@ -4197,7 +4201,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.2", "tokio", "tower-service", "tracing", @@ -4655,9 +4659,6 @@ dependencies = [ "clap", "comfy-table", "compio", - "compio-quic", - "compio-tls", - "compio-ws", "crossbeam", "derive_more", "err_trail", @@ -7114,7 +7115,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -7153,7 +7154,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.2", "tracing", "windows-sys 0.60.2", ] @@ -8263,10 +8264,6 @@ dependencies = [ "chrono", "clap", "compio", - "compio-net", - "compio-quic", - "compio-tls", - "compio-ws", "configs", "ctrlc", "cyper", @@ -8877,6 +8874,15 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synchrony" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de0208d3660701622272151bc63c35f5d32ca3d45c19785a9a8dc04dc797dc43" +dependencies = [ + "futures-util", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -9084,6 +9090,12 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "thin-cell" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4164c6c316ba9733b0ab021e7f9852c788a4b991b49c25820f1be48e1d41345b" + [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index b8ea12417..31b1b4407 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,26 +98,27 @@ chrono = { version = "0.4.43", features = ["serde"] } clap = { version = "4.5.54", features = ["derive", "wrap_help"] } colored = "3.1.1" comfy-table = "7.2.2" -compio = { version = "0.17.0", features = [ +compio = { version = "0.18.0", features = [ "runtime", "macros", "io-uring", "time", "rustls", + "ring", + "net", + "quic", + "tls", + "ws", + "fs", ] } -compio-io = "0.8.4" -compio-net = "0.10.0" -compio-quic = "0.6.0" -compio-tls = { version = "0.8.0", features = [ - "rustls", -], default-features = false } -compio-ws = "0.2.0" configs = { path = "core/configs", version = "0.1.0" } configs_derive = { path = "core/configs_derive", version = "0.1.0" } console-subscriber = "0.5.0" crossbeam = "0.8.4" -cyper = { version = "0.7.1", features = ["rustls"], default-features = false } -cyper-axum = { version = "0.7.1" } +cyper = { git = "https://github.com/compio-rs/cyper.git", features = [ + "rustls", +], default-features = false } +cyper-axum = { git = "https://github.com/compio-rs/cyper.git" } darling = "0.20" dashmap = "6.1.0" derive-new = "0.7.0" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 306f3bdb7..f460bb7f6 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -143,18 +143,18 @@ colorchoice: 1.0.4, "Apache-2.0 OR MIT", colored: 3.1.1, "MPL-2.0", combine: 4.6.7, "MIT", comfy-table: 7.2.2, "MIT", -compio: 0.17.0, "MIT", -compio-buf: 0.7.2, "MIT", -compio-driver: 0.10.0, "MIT", -compio-fs: 0.10.0, "MIT", -compio-io: 0.8.4, "MIT", +compio: 0.18.0, "MIT", +compio-buf: 0.8.0, "MIT", +compio-driver: 0.11.1, "MIT", +compio-fs: 0.11.0, "MIT", +compio-io: 0.9.0, "MIT", compio-log: 0.1.0, "MIT", compio-macros: 0.1.2, "MIT", -compio-net: 0.10.0, "MIT", -compio-quic: 0.6.0, "MIT", -compio-runtime: 0.10.1, "MIT", -compio-tls: 0.8.0, "MIT", -compio-ws: 0.2.0, "MIT", +compio-net: 0.11.0, "MIT", +compio-quic: 0.7.0, "MIT", +compio-runtime: 0.11.0, "MIT", +compio-tls: 0.9.0, "MIT", +compio-ws: 0.3.0, "MIT", compression-codecs: 0.4.36, "Apache-2.0 OR MIT", compression-core: 0.4.31, "Apache-2.0 OR MIT", concurrent-queue: 2.5.0, "Apache-2.0 OR MIT", @@ -199,9 +199,9 @@ cucumber-codegen: 0.22.1, "Apache-2.0 OR MIT", cucumber-expressions: 0.5.0, "Apache-2.0 OR MIT", curve25519-dalek: 4.1.3, "BSD-3-Clause", curve25519-dalek-derive: 0.1.1, "Apache-2.0 OR MIT", -cyper: 0.7.1, "MIT", -cyper-axum: 0.7.1, "MIT", -cyper-core: 0.7.1, "MIT", +cyper: 0.8.0, "MIT", +cyper-axum: 0.8.0, "MIT", +cyper-core: 0.8.0, "MIT", darling: 0.20.11, "MIT", darling: 0.21.3, "MIT", darling: 0.23.0, "MIT", @@ -763,6 +763,7 @@ subtle: 2.6.1, "BSD-3-Clause", syn: 1.0.109, "Apache-2.0 OR MIT", syn: 2.0.114, "Apache-2.0 OR MIT", sync_wrapper: 1.0.2, "Apache-2.0", +synchrony: 0.1.1, "MIT", synstructure: 0.13.2, "MIT", synthez: 0.4.0, "BlueOak-1.0.0", synthez-codegen: 0.4.0, "BlueOak-1.0.0", @@ -781,6 +782,7 @@ test-case-macros: 3.3.1, "MIT", testcontainers: 0.26.3, "Apache-2.0 OR MIT", testcontainers-modules: 0.14.0, "MIT", textwrap: 0.16.2, "MIT", +thin-cell: 0.1.2, "MIT", thiserror: 1.0.69, "Apache-2.0 OR MIT", thiserror: 2.0.18, "Apache-2.0 OR MIT", thiserror-impl: 1.0.69, "Apache-2.0 OR MIT", diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 74aea461b..ceed26159 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -45,9 +45,6 @@ chrono = { workspace = true } clap = { workspace = true } comfy-table = { workspace = true } compio = { workspace = true } -compio-quic = { workspace = true } -compio-tls = { workspace = true } -compio-ws = { workspace = true } crossbeam = { workspace = true } derive_more = { workspace = true } err_trail = { workspace = true } diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs index 601fe6a86..3c059cba3 100644 --- a/core/common/src/alloc/buffer.rs +++ b/core/common/src/alloc/buffer.rs @@ -18,7 +18,8 @@ use super::memory_pool::{BytesMutExt, memory_pool}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use compio::buf::{IoBuf, IoBufMut, SetBufInit}; +use compio::buf::{IoBuf, IoBufMut, SetLen}; +use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; /// A buffer wrapper that participates in memory pooling. @@ -267,7 +268,7 @@ impl Buf for PooledBuffer { } fn advance(&mut self, cnt: usize) { - self.inner.advance(cnt) + Buf::advance(&mut self.inner, cnt) } fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { @@ -275,32 +276,22 @@ impl Buf for PooledBuffer { } } -impl SetBufInit for PooledBuffer { - unsafe fn set_buf_init(&mut self, len: usize) { - if self.inner.len() <= len { - unsafe { - self.inner.set_len(len); - } - } +impl SetLen for PooledBuffer { + unsafe fn set_len(&mut self, len: usize) { + unsafe { self.inner.set_len(len) }; } } -unsafe impl IoBufMut for PooledBuffer { - fn as_buf_mut_ptr(&mut self) -> *mut u8 { - self.inner.as_mut_ptr() +impl IoBuf for PooledBuffer { + fn as_init(&self) -> &[u8] { + &self.inner[..] } } -unsafe impl IoBuf for PooledBuffer { - fn as_buf_ptr(&self) -> *const u8 { - self.inner.as_buf_ptr() - } - - fn buf_len(&self) -> usize { - self.inner.len() - } - - fn buf_capacity(&self) -> usize { - self.inner.capacity() +impl IoBufMut for PooledBuffer { + fn as_uninit(&mut self) -> &mut [MaybeUninit] { + let ptr = self.inner.as_mut_ptr().cast::>(); + let cap = self.inner.capacity(); + unsafe { std::slice::from_raw_parts_mut(ptr, cap) } } } diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs index 27c98d890..fe71b5ef8 100644 --- a/core/common/src/sender/mod.rs +++ b/core/common/src/sender/mod.rs @@ -34,8 +34,8 @@ use compio::BufResult; use compio::buf::IoBufMut; use compio::io::{AsyncReadExt, AsyncWriteExt}; use compio::net::TcpStream; -use compio_quic::{RecvStream, SendStream}; -use compio_tls::TlsStream; +use compio::quic::{RecvStream, SendStream}; +use compio::tls::TlsStream; use std::future::Future; use std::os::fd::{AsFd, OwnedFd}; use tracing::{debug, error}; diff --git a/core/common/src/sender/quic_sender.rs b/core/common/src/sender/quic_sender.rs index 025cec141..a8b86c378 100644 --- a/core/common/src/sender/quic_sender.rs +++ b/core/common/src/sender/quic_sender.rs @@ -20,8 +20,8 @@ use super::{PooledBuffer, Sender}; use crate::IggyError; use compio::BufResult; use compio::buf::IoBufMut; -use compio::io::AsyncReadExt; -use compio_quic::{ClosedStream, RecvStream, SendStream, WriteError}; +use compio::io::{AsyncReadExt, AsyncWriteExt}; +use compio::quic::{ClosedStream, RecvStream, SendStream}; use err_trail::ErrContext; use tracing::{debug, error}; @@ -73,10 +73,9 @@ impl Sender for QuicSender { debug!("Sending vectored response with status: {:?}...", STATUS_OK); let headers = [STATUS_OK, length].concat(); - self.send - .write_all(&headers) - .await - .error(|e: &WriteError| { + let BufResult(result, _) = self.send.write_all(headers).await; + result + .error(|e: &std::io::Error| { format!("{COMPONENT} (error: {e}) - failed to write headers to stream") }) .map_err(|_| IggyError::QuicError)?; @@ -84,17 +83,16 @@ impl Sender for QuicSender { let mut total_bytes_written = 0; for slice in slices { - let slice_data = &*slice; - if !slice_data.is_empty() { - self.send - .write_all(slice_data) - .await - .error(|e: &WriteError| { + let slice_len = slice.len(); + if slice_len > 0 { + let BufResult(result, _) = self.send.write_all(slice).await; + result + .error(|e: &std::io::Error| { format!("{COMPONENT} (error: {e}) - failed to write slice to stream") }) .map_err(|_| IggyError::QuicError)?; - total_bytes_written += slice_data.len(); + total_bytes_written += slice_len; } } @@ -123,10 +121,10 @@ impl QuicSender { status ); let length = (payload.len() as u32).to_le_bytes(); - self.send - .write_all(&[status, &length, payload].as_slice().concat()) - .await - .error(|e: &WriteError| { + let data = [status, &length, payload].concat(); + let BufResult(result, _) = self.send.write_all(data).await; + result + .error(|e: &std::io::Error| { format!("{COMPONENT} (error: {e}) - failed to write buffer to the stream") }) .map_err(|_| IggyError::QuicError)?; diff --git a/core/common/src/sender/tcp_tls_sender.rs b/core/common/src/sender/tcp_tls_sender.rs index 9522efe6d..81c7b287e 100644 --- a/core/common/src/sender/tcp_tls_sender.rs +++ b/core/common/src/sender/tcp_tls_sender.rs @@ -21,7 +21,7 @@ use crate::IggyError; use compio::buf::IoBufMut; use compio::io::AsyncWrite; use compio::net::TcpStream; -use compio_tls::TlsStream; +use compio::tls::TlsStream; use err_trail::ErrContext; const COMPONENT: &str = "TCP"; diff --git a/core/common/src/sender/websocket_sender.rs b/core/common/src/sender/websocket_sender.rs index a65cdb9c0..3af20df1b 100644 --- a/core/common/src/sender/websocket_sender.rs +++ b/core/common/src/sender/websocket_sender.rs @@ -22,8 +22,8 @@ use crate::alloc::buffer::PooledBuffer; use bytes::{BufMut, BytesMut}; use compio::buf::IoBufMut; use compio::net::TcpStream; -use compio_ws::WebSocketStream; -use compio_ws::tungstenite::{Error as TungsteniteError, Message}; +use compio::ws::WebSocketStream; +use compio::ws::tungstenite::{Error as TungsteniteError, Message}; use std::ptr; use tracing::{debug, warn}; @@ -107,8 +107,12 @@ impl Sender for WebSocketSender { let data_to_copy = self.read_buffer.split_to(required_len); unsafe { - ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len); - buffer.set_buf_init(required_len); + ptr::copy_nonoverlapping( + data_to_copy.as_ptr(), + buffer.buf_mut_ptr().cast::(), + required_len, + ); + buffer.set_len(required_len); } (Ok(()), buffer) diff --git a/core/common/src/sender/websocket_tls_sender.rs b/core/common/src/sender/websocket_tls_sender.rs index 834fa7dcf..0946a27c8 100644 --- a/core/common/src/sender/websocket_tls_sender.rs +++ b/core/common/src/sender/websocket_tls_sender.rs @@ -22,9 +22,9 @@ use crate::alloc::buffer::PooledBuffer; use bytes::{BufMut, BytesMut}; use compio::buf::IoBufMut; use compio::net::TcpStream; -use compio_tls::TlsStream; -use compio_ws::WebSocketStream; -use compio_ws::tungstenite::{Error as TungsteniteError, Message}; +use compio::tls::TlsStream; +use compio::ws::WebSocketStream; +use compio::ws::tungstenite::{Error as TungsteniteError, Message}; use std::ptr; use tracing::debug; @@ -95,8 +95,12 @@ impl Sender for WebSocketTlsSender { let data_to_copy = self.read_buffer.split_to(required_len); unsafe { - ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len); - buffer.set_buf_init(required_len); + ptr::copy_nonoverlapping( + data_to_copy.as_ptr(), + buffer.buf_mut_ptr().cast::(), + required_len, + ); + buffer.set_len(required_len); } (Ok(()), buffer) diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs index a58ad85a3..702924423 100644 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@ -295,7 +295,7 @@ impl TestServer { .and_then(|p| fs::read_to_string(p).ok()) .unwrap_or_else(|| "[No stderr log]".to_string()); - eprintln!( + panic!( "\n\n=== SERVER CRASHED ===\n\ The iggy-server process (PID {}) has died unexpectedly!\n\ This usually indicates a bug in the server.\n\n\ @@ -303,7 +303,6 @@ impl TestServer { === STDERR ===\n{}\n", pid, stdout_content, stderr_content ); - std::process::abort(); } thread::sleep(CHECK_INTERVAL); diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index c2644498d..ff5ad4b4c 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -49,10 +49,6 @@ bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } compio = { workspace = true } -compio-net = { workspace = true } -compio-quic = { workspace = true } -compio-tls = { workspace = true } -compio-ws = { workspace = true } configs = { workspace = true } ctrlc = { version = "3.5", features = ["termination"] } cyper = { workspace = true } diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs index a59242810..b420be7d2 100644 --- a/core/server/src/http/http_server.rs +++ b/core/server/src/http/http_server.rs @@ -35,8 +35,7 @@ use axum::extract::connect_info::Connected; use axum::http::Method; use axum::{Router, middleware}; use axum_server::tls_rustls::RustlsConfig; -use compio::net::TcpListener; -use compio_net::TcpOpts; +use compio::net::{SocketOpts, TcpListener}; use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::TransportProtocol; @@ -136,8 +135,8 @@ pub async fn start_http_server( } if !config.tls.enabled { - let opts = TcpOpts::new().reuse_port(true).reuse_address(true); - let listener = TcpListener::bind_with_options(config.address.clone(), opts) + let opts = SocketOpts::new().reuse_port(true).reuse_address(true); + let listener = TcpListener::bind_with_options(config.address.clone(), &opts) .await .unwrap_or_else(|_| panic!("Failed to bind to HTTP address {}", config.address)); let address = listener diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 69df4dadf..b9d14a9a8 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -22,7 +22,8 @@ use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::streaming::session::Session; use anyhow::anyhow; -use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; +use compio::io::AsyncReadExt; +use compio::quic::{Connection, Endpoint, RecvStream, SendStream}; use futures::FutureExt; use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError, SenderKind, TransportProtocol}; use std::rc::Rc; @@ -145,7 +146,7 @@ async fn accept_stream( _client_id: u32, ) -> Result, ConnectionError> { match connection.accept_bi().await { - Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => { + Err(compio::quic::ConnectionError::ApplicationClosed { .. }) => { info!("Connection closed"); Ok(None) } @@ -164,11 +165,13 @@ async fn handle_stream( ) -> anyhow::Result<()> { let (send_stream, mut recv_stream) = stream; - let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; - let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; + let length_buffer = [0u8; INITIAL_BYTES_LENGTH]; + let code_buffer = [0u8; INITIAL_BYTES_LENGTH]; - recv_stream.read_exact(&mut length_buffer[..]).await?; - recv_stream.read_exact(&mut code_buffer[..]).await?; + let compio::BufResult(result, length_buffer) = recv_stream.read_exact(length_buffer).await; + result?; + let compio::BufResult(result, code_buffer) = recv_stream.read_exact(code_buffer).await; + result?; let length = u32::from_le_bytes(length_buffer); let code = u32::from_le_bytes(code_buffer); diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index daed5638c..51c946299 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -23,7 +23,7 @@ use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::shard::transmission::event::ShardEvent; use anyhow::Result; -use compio_quic::{ +use compio::quic::{ Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, }; use err_trail::ErrContext; @@ -97,7 +97,7 @@ pub async fn spawn_quic_server( })?; let std_socket: std::net::UdpSocket = socket.into(); - let socket = compio_net::UdpSocket::from_std(std_socket).map_err(|e| { + let socket = compio::net::UdpSocket::from_std(std_socket).map_err(|e| { error!("Failed to convert std socket to compio socket: {:?}", e); iggy_common::IggyError::QuicError })?; diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index 2be3135ab..74dd71118 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -17,7 +17,7 @@ * under the License. */ -use compio_quic::{ConnectionError as QuicConnectionError, ReadError, WriteError}; +use compio::quic::{ConnectionError as QuicConnectionError, ReadError, WriteError}; use error_set::error_set; use std::array::TryFromSliceError; use std::io; diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 53756d5f8..bad3cec05 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -31,7 +31,7 @@ use crate::{ tcp_listener::cleanup_connection, }, }; -use compio_net::TcpStream; +use compio::net::TcpStream; use iggy_common::sharding::IggyNamespace; use iggy_common::{Identifier, IggyError, SenderKind, TransportProtocol}; use nix::sys::stat::SFlag; diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index d1bdc6a7c..9a514aa20 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -22,7 +22,7 @@ use crate::shard::IggyShard; use crate::shard::task_registry::{ShutdownToken, TaskRegistry}; use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{ConnectionAction, handle_connection, handle_error}; -use compio::net::{TcpListener, TcpOpts}; +use compio::net::{SocketOpts, TcpListener}; use err_trail::ErrContext; use futures::FutureExt; use iggy_common::{IggyError, SenderKind, TransportProtocol}; @@ -37,7 +37,7 @@ async fn create_listener( ) -> Result { // Required by the thread-per-core model... // We create bunch of sockets on different threads, that bind to exactly the same address and port. - let opts = TcpOpts::new().reuse_port(true).reuse_port(true); + let opts = SocketOpts::new().reuse_port(true).reuse_port(true); let opts = if config.override_defaults { let recv_buffer_size = config .recv_buffer_size @@ -59,7 +59,7 @@ async fn create_listener( } else { opts }; - TcpListener::bind_with_options(addr, opts).await + TcpListener::bind_with_options(addr, &opts).await } pub async fn start( diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 381a57c99..c0230774e 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -21,8 +21,8 @@ use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{handle_connection, handle_error}; -use compio::net::{TcpListener, TcpOpts}; -use compio_tls::TlsAcceptor; +use compio::net::{SocketOpts, TcpListener}; +use compio::tls::TlsAcceptor; use err_trail::ErrContext; use futures::FutureExt; use iggy_common::{IggyError, SenderKind, TransportProtocol}; @@ -127,7 +127,7 @@ async fn create_listener( ) -> Result { // Required by the thread-per-core model... // We create bunch of sockets on different threads, that bind to exactly the same address and port. - let opts = TcpOpts::new().reuse_port(true); + let opts = SocketOpts::new().reuse_port(true); let opts = if config.override_defaults { let recv_buffer_size = config .recv_buffer_size @@ -149,7 +149,7 @@ async fn create_listener( } else { opts }; - TcpListener::bind_with_options(addr, opts).await + TcpListener::bind_with_options(addr, &opts).await } async fn accept_loop( diff --git a/core/server/src/websocket/websocket_listener.rs b/core/server/src/websocket/websocket_listener.rs index 94f70a29c..045f15c03 100644 --- a/core/server/src/websocket/websocket_listener.rs +++ b/core/server/src/websocket/websocket_listener.rs @@ -21,9 +21,8 @@ use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::shard::transmission::event::ShardEvent; use crate::websocket::connection_handler::{handle_connection, handle_error}; -use compio::net::TcpListener; -use compio_net::TcpOpts; -use compio_ws::accept_async_with_config; +use compio::net::{SocketOpts, TcpListener}; +use compio::ws::accept_async_with_config; use err_trail::ErrContext; use futures::FutureExt; use iggy_common::TransportProtocol; @@ -35,8 +34,8 @@ use tracing::{debug, error, info}; async fn create_listener(addr: SocketAddr) -> Result { // Required by the thread-per-core model... // We create bunch of sockets on different threads, that bind to exactly the same address and port. - let opts = TcpOpts::new().reuse_port(true).reuse_address(true); - TcpListener::bind_with_options(addr, opts).await + let opts = SocketOpts::new().reuse_port(true).reuse_address(true); + TcpListener::bind_with_options(addr, &opts).await } pub async fn start( diff --git a/core/server/src/websocket/websocket_tls_listener.rs b/core/server/src/websocket/websocket_tls_listener.rs index 56d22a3f5..f6865a694 100644 --- a/core/server/src/websocket/websocket_tls_listener.rs +++ b/core/server/src/websocket/websocket_tls_listener.rs @@ -21,10 +21,9 @@ use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::shard::transmission::event::ShardEvent; use crate::websocket::connection_handler::{handle_connection, handle_error}; -use compio::net::TcpListener; -use compio_net::TcpOpts; -use compio_tls::TlsAcceptor; -use compio_ws::accept_async_with_config; +use compio::net::{SocketOpts, TcpListener}; +use compio::tls::TlsAcceptor; +use compio::ws::accept_async_with_config; use err_trail::ErrContext; use futures::FutureExt; use iggy_common::{IggyError, SenderKind, TransportProtocol, WebSocketTlsSender}; @@ -40,8 +39,8 @@ use tracing::{debug, error, info, trace, warn}; async fn create_listener(addr: SocketAddr) -> Result { // Required by the thread-per-core model... // We create bunch of sockets on different threads, that bind to exactly the same address and port. - let opts = TcpOpts::new().reuse_port(true).reuse_address(true); - TcpListener::bind_with_options(addr, opts).await + let opts = SocketOpts::new().reuse_port(true).reuse_address(true); + TcpListener::bind_with_options(addr, &opts).await } pub async fn start(