diff --git a/Cargo.lock b/Cargo.lock index 2caa0bad..5f365aef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94476c7ef97af4c4d998b3f422c1b01d5211aad57c80ed200baf148d1f1efab6" dependencies = [ "bit_field", - "bitflags", + "bitflags 2.9.1", "log", ] @@ -19,6 +19,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1c330e503236d0b06386ae6cc42a513ef1ccc23c52b603c1b52f018564faf44" +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic_unique_refcell" version = "0.1.0" @@ -35,6 +46,12 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.9.1" @@ -49,6 +66,18 @@ dependencies = [ "intrusive_list", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "cfg-if" version = "1.0.1" @@ -61,6 +90,47 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" +[[package]] +name = "defmt" +version = "0.3.100" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0963443817029b2024136fc4dd07a5107eb8f977eaf18fcd1fdeb11306b64ad" +dependencies = [ + "defmt 1.0.1", +] + +[[package]] +name = "defmt" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548d977b6da32fa1d1fda2876453da1e7df63ad0304c8b3dae4dbe7b96f39b78" +dependencies = [ + "bitflags 1.3.2", + "defmt-macros", +] + +[[package]] +name = "defmt-macros" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d4fc12a85bcf441cfe44344c4b72d58493178ce635338a3f3b78943aceb258e" +dependencies = [ + "defmt-parser", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "defmt-parser" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d60334b3b2e7c9d91ef8150abfb6fa4c1c39ebbcf4a81c2e346aad939fee3e" +dependencies = [ + "thiserror", +] + [[package]] name = "either" version = "1.15.0" @@ -95,7 +165,7 @@ name = "eonix_hal" version = "0.1.0" dependencies = [ "acpi", - "bitflags", + "bitflags 2.9.1", "buddy_allocator", "cfg-if", "eonix_hal_macros", @@ -124,7 +194,7 @@ dependencies = [ name = "eonix_hal_traits" version = "0.1.0" dependencies = [ - "bitflags", + "bitflags 2.9.1", "eonix_mm", ] @@ -134,9 +204,11 @@ version = "0.1.0" dependencies = [ "acpi", "align_ext", + "async-trait", "atomic_unique_refcell", - "bitflags", + "bitflags 2.9.1", "buddy_allocator", + "bytes", "eonix_hal", "eonix_log", "eonix_macros", @@ -152,6 +224,7 @@ dependencies = [ "pointers", "posix_types", "slab_allocator", + "smoltcp", "virtio-drivers", "xmas-elf", ] @@ -176,7 +249,7 @@ dependencies = [ name = "eonix_mm" version = "0.1.0" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -254,7 +327,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a1a97344bde15b0ace15e265dab27228d4bdc37a0bfa8548c5645d7cfa6a144" 
dependencies = [ - "bitflags", + "bitflags 2.9.1", "log", ] @@ -264,6 +337,45 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784a4df722dc6267a04af36895398f59d21d07dce47232adf31ec0ff2fa45e67" +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + +[[package]] +name = "int-to-c-enum" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e24dde2fe29de031a6fc24642ea1a62cb8e273b6c9e8799fa5c7cf0e2f03f220" +dependencies = [ + "int-to-c-enum-derive", +] + +[[package]] +name = "int-to-c-enum-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dac1b8cfab94604ae1bcab8af8dd04cac15568a95cff50c562e6108457022f49" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "intrusive-collections" version = "0.9.7" @@ -299,9 +411,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c9f0d275c70310e2a9d2fc23250c5ac826a73fa828a5f256401f85c5c554283" dependencies = [ "bit_field", - "bitflags", + "bitflags 2.9.1", ] +[[package]] +name = "managed" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" + [[package]] name = "memoffset" version = "0.9.1" @@ -325,8 +443,31 @@ version = "0.1.0" name = "posix_types" version = "0.1.0" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cfg-if", + "int-to-c-enum", +] + +[[package]] +name = "proc-macro-error-attr2" 
+version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -401,6 +542,25 @@ dependencies = [ "intrusive_list", ] +[[package]] +name = "smoltcp" +version = "0.12.0" +source = "git+https://github.com/Shao-ZW/smoltcp.git?rev=f14baaf#f14baaf5f8dbeff90ddbae9b3371f7b3fab730a6" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "cfg-if", + "defmt 0.3.100", + "heapless", + "managed", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "syn" version = "2.0.103" @@ -444,7 +604,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe3f779fd88436e27b51540d9563c7454c8c814893a1e6f9bb6138bcac60627" dependencies = [ - "bitflags", + "bitflags 2.9.1", "embedded-io", "enumn", "log", diff --git a/Cargo.toml b/Cargo.toml index 15df5f15..1ed1d16c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ pointers = { path = "./crates/pointers" } posix_types = { path = "./crates/posix_types" } slab_allocator = { path = "./crates/slab_allocator" } +async-trait = "0.1.80" +bytes = { version = "1.10.0", default-features = false } bitflags = "2.6.0" intrusive-collections = "0.9.7" itertools = { version = "0.13.0", default-features = false } @@ -32,6 +34,26 @@ align_ext = "0.1.0" xmas-elf = "0.10.0" ext4_rs = "1.3.2" +[dependencies.smoltcp] +git = "https://github.com/Shao-ZW/smoltcp.git" +rev = "f14baaf" 
+default-features = false +features = [ + "alloc", + "medium-ethernet", + "medium-ip", + "proto-ipv4", + "proto-ipv6", + "socket-raw", + "socket-icmp", + "socket-udp", + "socket-tcp", + "socket-dns", + "proto-ipv4-fragmentation", + "async", + "reassembly-buffer-count-16", +] + [target.'cfg(any(target_arch = "riscv64", target_arch = "loongarch64"))'.dependencies] virtio-drivers = { version = "0.11.0" } diff --git a/crates/posix_types/Cargo.toml b/crates/posix_types/Cargo.toml index 5a74bc5d..ff41df77 100644 --- a/crates/posix_types/Cargo.toml +++ b/crates/posix_types/Cargo.toml @@ -6,3 +6,4 @@ edition = "2024" [dependencies] cfg-if = "1.0" bitflags = "2.6.0" +int-to-c-enum = "0.1.0" diff --git a/crates/posix_types/src/lib.rs b/crates/posix_types/src/lib.rs index dfe8d089..274ef1e7 100644 --- a/crates/posix_types/src/lib.rs +++ b/crates/posix_types/src/lib.rs @@ -3,6 +3,7 @@ pub mod constants; pub mod ctypes; pub mod namei; +pub mod net; pub mod open; pub mod poll; pub mod result; diff --git a/crates/posix_types/src/net.rs b/crates/posix_types/src/net.rs new file mode 100644 index 00000000..354192f0 --- /dev/null +++ b/crates/posix_types/src/net.rs @@ -0,0 +1,223 @@ +use core::net::Ipv4Addr; + +use bitflags::bitflags; +use int_to_c_enum::TryFromInt; + +// definition copy from asterinas + +#[repr(u32)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, TryFromInt)] +#[expect(non_camel_case_types)] +pub enum SockDomain { + AF_UNSPEC = 0, + /// Unix domain sockets + AF_UNIX = 1, + // POSIX name for AF_UNIX + // AF_LOCAL = 1, + /// Internet IP Protocol + AF_INET = 2, + /// Amateur Radio AX.25 + AF_AX25 = 3, + /// Novell IPX + AF_IPX = 4, + /// AppleTalk DDP + AF_APPLETALK = 5, + /// Amateur Radio NET/ROM + AF_NETROM = 6, + /// Multiprotocol bridge + AF_BRIDGE = 7, + /// ATM PVCs + AF_ATMPVC = 8, + /// Reserved for X.25 project + AF_X25 = 9, + /// IP version 6, + AF_INET6 = 10, + /// Amateur Radio X.25 PLP + AF_ROSE = 11, + /// Reserved for DECnet project + AF_DECnet = 12, + 
/// Reserved for 802.2LLC project + AF_NETBEUI = 13, + /// Security callback pseudo AF + AF_SECURITY = 14, + /// PF_KEY key management API + AF_KEY = 15, + AF_NETLINK = 16, + // Alias to emulate 4.4BSD + // AF_ROUTE = AF_NETLINK + /// Packet family + AF_PACKET = 17, + /// Ash + AF_ASH = 18, + /// Acorn Econet + AF_ECONET = 19, + /// ATM SVCs + AF_ATMSVC = 20, + /// RDS sockets + AF_RDS = 21, + /// Linux SNA Project (nutters!) + AF_SNA = 22, + /// IRDA sockets + AF_IRDA = 23, + /// PPPoX sockets + AF_PPPOX = 24, + /// Wanpipe API Sockets + AF_WANPIPE = 25, + /// Linux LLC + AF_LLC = 26, + /// Native InfiniBand address + AF_IB = 27, + /// MPLS + AF_MPLS = 28, + /// Controller Area Network + AF_CAN = 29, + /// TIPC sockets + AF_TIPC = 30, + /// Bluetooth sockets + AF_BLUETOOTH = 31, + /// IUCV sockets + AF_IUCV = 32, + /// RxRPC sockets + AF_RXRPC = 33, + /// mISDN sockets + AF_ISDN = 34, + /// Phonet sockets + AF_PHONET = 35, + /// IEEE802154 sockets + AF_IEEE802154 = 36, + /// CAIF sockets + AF_CAIF = 37, + /// Algorithm sockets + AF_ALG = 38, + /// NFC sockets + AF_NFC = 39, + /// vSockets + AF_VSOCK = 40, + /// Kernel Connection Multiplexor + AF_KCM = 41, + /// Qualcomm IPC Router + AF_QIPCRTR = 42, + /// smc sockets: reserve number for + /// PF_SMC protocol family that + /// reuses AF_INET address family + AF_SMC = 43, + /// XDP sockets + AF_XDP = 44, + /// Management component transport protocol + AF_MCTP = 45, +} + +pub const SOCK_TYPE_MASK: u32 = 0xf; + +bitflags! 
{ + #[repr(C)] + pub struct SockFlags: u32 { + const SOCK_NONBLOCK = 1 << 11; + const SOCK_CLOEXEC = 1 << 19; + } +} + +#[repr(u32)] +#[expect(non_camel_case_types)] +#[derive(Debug, Clone, Copy, TryFromInt)] +pub enum SockType { + /// Stream socket + SOCK_STREAM = 1, + /// Datagram socket + SOCK_DGRAM = 2, + /// Raw socket + SOCK_RAW = 3, + /// Reliably-delivered message + SOCK_RDM = 4, + /// Sequential packet socket + SOCK_SEQPACKET = 5, + /// Datagram Congestion Control Protocol socket + SOCK_DCCP = 6, + /// Linux specific way of getting packets at the dev level + SOCK_PACKET = 10, +} + +#[repr(u32)] +#[derive(Debug, Clone, Copy, TryFromInt)] +#[expect(non_camel_case_types)] +pub enum Protocol { + IPPROTO_IP = 0, /* Dummy protocol for TCP */ + IPPROTO_ICMP = 1, /* Internet Control Message Protocol */ + IPPROTO_IGMP = 2, /* Internet Group Management Protocol */ + IPPROTO_TCP = 6, /* Transmission Control Protocol */ + IPPROTO_EGP = 8, /* Exterior Gateway Protocol */ + IPPROTO_PUP = 12, /* PUP protocol */ + IPPROTO_UDP = 17, /* User Datagram Protocol */ + IPPROTO_IDP = 22, /* XNS IDP protocol */ + IPPROTO_TP = 29, /* SO Transport Protocol Class 4 */ + IPPROTO_DCCP = 33, /* Datagram Congestion Control Protocol */ + IPPROTO_IPV6 = 41, /* IPv6-in-IPv4 tunnelling */ + IPPROTO_RSVP = 46, /* RSVP Protocol */ + IPPROTO_GRE = 47, /* Cisco GRE tunnels (rfc 1701,1702) */ + IPPROTO_ESP = 50, /* Encapsulation Security Payload protocol */ + IPPROTO_AH = 51, /* Authentication Header protocol */ + IPPROTO_MTP = 92, /* Multicast Transport Protocol */ + IPPROTO_BEETPH = 94, /* IP option pseudo header for BEET */ + IPPROTO_ENCAP = 98, /* Encapsulation Header */ + IPPROTO_PIM = 103, /* Protocol Independent Multicast */ + IPPROTO_COMP = 108, /* Compression Header Protocol */ + IPPROTO_SCTP = 132, /* Stream Control Transport Protocol */ + IPPROTO_UDPLITE = 136, /* UDP-Lite (RFC 3828) */ + IPPROTO_MPLS = 137, /* MPLS in IP (RFC 4023) */ + IPPROTO_ETHERNET = 143, /* Ethernet-within-IPv6 
Encapsulation */ + IPPROTO_RAW = 255, /* Raw IP packets */ + IPPROTO_MPTCP = 262, /* Multipath TCP connection */ +} + +pub const ADDR_MAX_LEN: usize = 128; + +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct CSockAddr { + pub sa_family: u16, + pub bytes: [u8; ADDR_MAX_LEN - 2], + pub _align: [u64; 0], +} + +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct CSocketAddrInet { + /// Address family (AF_INET). + sin_family: u16, + /// Port number. + sin_port: u16, + /// IPv4 address. + sin_addr: u32, + /// Pad bytes to 16-byte `struct sockaddr`. + sin_zero: [u8; 8], +} + +impl CSocketAddrInet { + pub fn new(addr: Ipv4Addr, port: u16) -> Self { + Self { + sin_family: 2, // AF_INET = 2, + sin_port: port.to_be(), + sin_addr: u32::from_ne_bytes(addr.octets()), + sin_zero: [0; 8], + } + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy)] +pub struct MsgHdr { + /// Pointer to socket address structure + pub msg_name: usize, + /// Size of socket address + pub msg_namelen: u32, + /// Scatter/Gather iov array + pub msg_iov: usize, + /// The # of elements in msg_iov + pub msg_iovlen: u32, + /// Ancillary data + pub msg_control: usize, + /// Ancillary data buffer length + pub msg_controllen: u32, + /// Flags on received message + pub msg_flags: u32, +} diff --git a/crates/posix_types/src/open.rs b/crates/posix_types/src/open.rs index 758ea331..e0538b13 100644 --- a/crates/posix_types/src/open.rs +++ b/crates/posix_types/src/open.rs @@ -1,7 +1,7 @@ use bitflags::bitflags; bitflags! { - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, Default)] pub struct OpenFlags: u32 { /// Open for writing only const O_WRONLY = 0x1; @@ -25,13 +25,13 @@ bitflags! 
{ const O_CLOEXEC = 0x80000; } - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, Default)] pub struct FDFlags: u32 { /// Close on exec const FD_CLOEXEC = 0x1; } - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, Default)] pub struct AtFlags: u32 { /// Do not follow symbolic links const AT_SYMLINK_NOFOLLOW = 0x100; diff --git a/crates/posix_types/src/poll.rs b/crates/posix_types/src/poll.rs index 781f589f..e4cb5b84 100644 --- a/crates/posix_types/src/poll.rs +++ b/crates/posix_types/src/poll.rs @@ -1,5 +1,47 @@ -pub const FDSET_LENGTH: usize = 1024 / (8 * size_of::()); +// Forked from asterinas +pub const FD_SETSIZE: usize = 1024; +pub const USIZE_BITS: usize = core::mem::size_of::() * 8; +#[derive(Debug, Clone, Copy, PartialEq)] +#[repr(C)] pub struct FDSet { - fds_bits: [usize; FDSET_LENGTH], + fds_bits: [usize; FD_SETSIZE / USIZE_BITS], +} + +impl FDSet { + /// Equivalent to FD_SET. + pub fn set(&mut self, fd: u32) -> bool { + let fd = fd as usize; + if fd >= FD_SETSIZE { + return false; + } + self.fds_bits[fd / USIZE_BITS] |= 1 << (fd % USIZE_BITS); + true + } + + /// Equivalent to FD_CLR. + pub fn unset(&mut self, fd: u32) -> bool { + let fd = fd as usize; + if fd >= FD_SETSIZE { + return false; + } + self.fds_bits[fd / USIZE_BITS] &= !(1 << (fd % USIZE_BITS)); + true + } + + /// Equivalent to FD_ISSET. + pub fn is_set(&self, fd: u32) -> bool { + let fd = fd as usize; + if fd >= FD_SETSIZE { + return false; + } + (self.fds_bits[fd / USIZE_BITS] & (1 << (fd % USIZE_BITS))) != 0 + } + + /// Equivalent to FD_ZERO.
+ pub fn clear(&mut self) { + for slot in self.fds_bits.iter_mut() { + *slot = 0; + } + } } diff --git a/doc/net.md b/doc/net.md new file mode 100644 index 00000000..0b8b9698 --- /dev/null +++ b/doc/net.md @@ -0,0 +1,174 @@ +# 网络栈 + +Eonix的网络协议栈基于 smoltcp 设计。由于 smoltcp 是一个专为嵌入式环境设计的 Rust 网络栈,相关网络接口与 posix 规范并不对齐,我们通过对 smoltcp 进行面向 posix 网络规范的改造与封装,实现了从底层网络设备驱动到高层应用程序套接字的网络协议栈支持。Eonix 贯彻 smoltcp 设计理念,将网络功能划分为设备层,接口层,套接字层三个核心层次,优雅实现了网络协议栈的层次抽象。同时 Eonix 网络层利用 Rust 高效异步特性,有效提升网络性能和系统灵活性。 + +## 核心架构 + +整个网络模块分为三个核心层次: + +- 设备层 (Device Layer):负责与具体的物理或虚拟网络设备进行交互,如 VirtIO 网卡或回环设备。它实现了 smoltcp::phy::Device Trait,为上层提供统一的数据包收发接口。 +- 接口层 (Interface Layer):管理网络接口卡(NIC)的配置,包括 IP 地址、MAC 地址等。它维护了一个 smoltcp::iface::Interface 实例和一个套接字集合 (smoltcp::iface::SocketSet)。 +- 套接字层 (Socket Layer):提供用户态应用程序使用的抽象,如 TcpSocket 和 UdpSocket。它封装了 smoltcp 的底层套接字,并提供了 bind, listen, connect, send, recv 等标准 API,并支持异步操作。 + +## 设备层 + +设备层是整个网络栈的基础,它将物理硬件抽象为统一的 NetDevice 对象。NetDev Trait 定义了所有网络设备必须实现的接口,如 name(), mac_addr(), recv(), send() 等。这使得上层代码可以与具体的设备类型解耦。 + +``` rust +pub type NetDevice = Arc>; + +pub trait NetDev: Send { + fn name(&self) -> &'static str; + fn mac_addr(&self) -> Mac; + fn caps(&self) -> DeviceCapabilities; + fn can_receive(&self) -> bool; + fn can_send(&self) -> bool; + fn recv(&mut self) -> Result; + fn recycle_rx_buffer(&mut self, rx_buffer: RxBuffer) -> Result<(), NetDevError>; + fn send(&mut self, data: &[u8]) -> Result<(), NetDevError>; +} +``` + +smoltcp::phy::Device 实现: NetDev Trait 通过 impl smoltcp::phy::Device for dyn NetDev 来适配 smoltcp 的物理设备接口。它将 recv 和 send 方法映射到 smoltcp 的 receive 和 transmit 方法上,实现了协议栈与设备的交互。 + +``` rust +// 实现 smoltcp 的 Device Trait,将 NetDev 适配到协议栈 +impl smoltcp::phy::Device for dyn NetDev { + type RxToken<'a> = RxToken where Self: 'a; + type TxToken<'a> = TxToken<'a> where Self: 'a; + + fn receive(&mut self, _timestamp: smoltcp::time::Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + if self.can_receive() && self.can_send() { + let 
rx_buffer = self.recv().unwrap(); + Some((RxToken(rx_buffer), TxToken(self))) + } else { + None + } + } + // ... (transmit, capabilities 的实现) +} +``` + +## 接口层 + +接口层 (Iface) 负责管理一个网络接口的协议栈状态。 + +- Iface 结构体: 封装了 NetDevice、smoltcp::iface::Interface 和 smoltcp::iface::SocketSet。它将设备层和套接字层连接起来。 + +- 端口管理: 使用 BTreeSet<(SocketType, u16)> 来追踪已使用的端口,并提供了 alloc_port 和 bind_socket 方法来分配和绑定端口,防止端口冲突。 + +- 定时轮询: Iface::poll() 方法会定期调用 smoltcp::iface::Interface::poll() 来处理传入和传出的数据包,并更新套接字状态。这是整个异步 I/O 机制的核心驱动力。 + +``` rust +pub struct Iface { + device: NetDevice, + iface_inner: Interface, + used_ports: BTreeSet<(SocketType, u16)>, + sockets: SocketSet<'static>, +} + +impl Iface { + pub fn poll(&mut self) { + let mut device = Task::block_on(self.device.lock()); + let timestamp = smoltcp::time::Instant::from_millis(Instant::now().to_millis() as i64); + self.iface_inner.poll(timestamp, &mut *device, &mut self.sockets); + } +} +``` + +## 套接字层 + +套接字层提供了与 POSIX 类似但基于 Rust async/await 的 API。 + +- Socket Trait: 定义了通用的套接字接口,所有具体协议的套接字都必须实现此 Trait。 + +``` rust +#[async_trait] +pub trait Socket: Sync + Send { + fn local_addr(&self) -> Option; + fn remote_addr(&self) -> Option; + fn bind(&self, socket_addr: SocketAddr) -> KResult<()>; + fn listen(&self, backlog: usize) -> KResult<()>; + async fn connect(&self, remote_addr: SocketAddr) -> KResult<()>; + async fn accept(&self) -> KResult>; + async fn recv(&self, buffer: &mut dyn Buffer) -> KResult<(usize, RecvMetadata)>; + async fn send(&self, stream: &mut dyn Stream, send_meta: SendMetadata) -> KResult; + async fn poll(&self, events: PollEvent) -> KResult; +} +``` + +- TcpSocket/UdpSocket 结构体: 封装了 smoltcp socket,并在结构体中存储热点数据,实现更加细粒度的并发管理。 + +``` rust +pub struct TcpSocket { + bound_socket: RwLock>, + local_addr: RwLock>, + remote_addr: RwLock>, + is_nonblock: bool, +} +``` + +- 异步 I/O: recv() 和 send() 等方法被实现为 async fn,内部通过 Future 状态机来处理非阻塞操作。当 smoltcp 的套接字缓冲区不可用时,它会注册 waker,并在 poll 方法中唤醒任务。 + +``` rust +impl TcpSocket { + fn try_recv( +
&self, + buf: &mut dyn Buffer, + waker: &Waker, + ) -> KResult> { + // ... socket 并发访问控制 + if !socket.may_recv() { + Err(ENOTCONN) + } else if socket.can_recv() { + let len = socket + .recv(|rx_data| { + let _ = buf.fill(&rx_data[..]); + (buf.wrote(), buf.wrote()) + }) + .unwrap(); + Ok(Some(( + len, + RecvMetadata { + remote_addr: self.remote_addr().unwrap(), + }, + ))) + } else { + socket.register_recv_waker(waker); + Ok(None) + } + } +} + +impl Socket for TcpSocket { +/// ... bind、send等实现 +async fn recv(&self, buffer: &mut dyn Buffer) -> KResult<(usize, RecvMetadata)> { + struct RecvFuture<'a> { + socket: &'a TcpSocket, + buffer: &'a mut dyn Buffer, + } + + impl<'a> Future for RecvFuture<'a> { + type Output = KResult<(usize, RecvMetadata)>; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.try_recv(this.buffer, cx.waker()) { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + RecvFuture { + socket: self, + buffer, + } + .await + } +} +``` + +- 资源管理: TcpSocket::drop 方法确保在套接字被销毁时,正确地从接口的套接字集合中移除并释放端口资源,避免资源泄漏。 diff --git a/src/driver/virtio.rs b/src/driver/virtio.rs index ea0ff896..2267bed8 100644 --- a/src/driver/virtio.rs +++ b/src/driver/virtio.rs @@ -1,4 +1,8 @@ +mod hal; mod virtio_blk; +mod virtio_net; + +pub use virtio_net::VIRTIO_NET_NAME; #[cfg(not(any(target_arch = "riscv64", target_arch = "loongarch64")))] compile_error!("VirtIO drivers are only supported on RISC-V and LoongArch64 architecture"); diff --git a/src/driver/virtio/hal.rs b/src/driver/virtio/hal.rs new file mode 100644 index 00000000..e4c0aee5 --- /dev/null +++ b/src/driver/virtio/hal.rs @@ -0,0 +1,66 @@ +use crate::kernel::mem::{AsMemoryBlock, Page}; +use eonix_hal::mm::ArchPhysAccess; +use eonix_mm::{ + address::{Addr, PAddr, PhysAccess}, + paging::PFN, +}; +use 
virtio_drivers::Hal; + +pub struct HAL; + +unsafe impl Hal for HAL { + fn dma_alloc( + pages: usize, + _direction: virtio_drivers::BufferDirection, + ) -> (virtio_drivers::PhysAddr, core::ptr::NonNull) { + let page = Page::alloc_at_least(pages); + + let paddr = page.start().addr(); + let ptr = page.as_memblk().as_byte_ptr(); + page.into_raw(); + + (paddr, ptr) + } + + unsafe fn dma_dealloc( + paddr: virtio_drivers::PhysAddr, + _vaddr: core::ptr::NonNull, + _pages: usize, + ) -> i32 { + let pfn = PFN::from(PAddr::from(paddr)); + + unsafe { + // SAFETY: The caller ensures that the pfn corresponds to a valid + // page allocated by `dma_alloc`. + Page::from_raw(pfn); + } + + 0 + } + + unsafe fn mmio_phys_to_virt( + paddr: virtio_drivers::PhysAddr, + _size: usize, + ) -> core::ptr::NonNull { + unsafe { ArchPhysAccess::as_ptr(PAddr::from(paddr)) } + } + + unsafe fn share( + buffer: core::ptr::NonNull<[u8]>, + _direction: virtio_drivers::BufferDirection, + ) -> virtio_drivers::PhysAddr { + let paddr = unsafe { + // SAFETY: The caller ensures that the buffer is valid. 
+ ArchPhysAccess::from_ptr(buffer.cast::()) + }; + + paddr.addr() + } + + unsafe fn unshare( + _paddr: virtio_drivers::PhysAddr, + _buffer: core::ptr::NonNull<[u8]>, + _direction: virtio_drivers::BufferDirection, + ) { + } +} diff --git a/src/driver/virtio/loongarch64.rs b/src/driver/virtio/loongarch64.rs index bcd7e713..b350f56c 100644 --- a/src/driver/virtio/loongarch64.rs +++ b/src/driver/virtio/loongarch64.rs @@ -1,4 +1,4 @@ -use super::virtio_blk::HAL; +use crate::driver::virtio::hal::HAL; use crate::kernel::{ block::{make_device, BlockDevice}, constants::EIO, diff --git a/src/driver/virtio/riscv64.rs b/src/driver/virtio/riscv64.rs index 66f150c3..1fad99eb 100644 --- a/src/driver/virtio/riscv64.rs +++ b/src/driver/virtio/riscv64.rs @@ -1,23 +1,19 @@ -use super::virtio_blk::HAL; -use crate::kernel::{ - block::{make_device, BlockDevice}, - mem::{AsMemoryBlock, MemoryBlock, Page}, +use crate::{ + driver::virtio::{hal::HAL, VIRTIO_NET_NAME}, + kernel::block::{make_device, BlockDevice}, + net, }; + use alloc::{sync::Arc, vec::Vec}; -use core::num::NonZero; use eonix_hal::arch_exported::fdt::FDT; use eonix_hal::mm::ArchPhysAccess; use eonix_log::{println_info, println_warn}; -use eonix_mm::{ - address::{Addr, PAddr, PhysAccess}, - paging::PFN, -}; +use eonix_mm::address::{PAddr, PhysAccess}; use eonix_runtime::task::Task; use eonix_sync::Spin; use virtio_drivers::{ - device::blk::VirtIOBlk, + device::{blk::VirtIOBlk, net::VirtIONet}, transport::{mmio::MmioTransport, Transport}, - Hal, }; pub fn init() { @@ -61,11 +57,13 @@ pub fn init() { disk_id += 1; } virtio_drivers::transport::DeviceType::Network => { - println_info!( - "Initializing Virtio Network device at {:?} with size {:#x}", - base, - size - ); + const NET_QUEUE_SIZE: usize = 64; + const NET_BUF_LEN: usize = 2048; + let net_device = + VirtIONet::::new(transport, NET_BUF_LEN) + .expect("Failed to initialize VirtIO Net device"); + net::register_netdev(VIRTIO_NET_NAME, net_device) + .expect("Failed to 
register VirtIO Net device"); } virtio_drivers::transport::DeviceType::Console => { println_info!( diff --git a/src/driver/virtio/virtio_blk.rs b/src/driver/virtio/virtio_blk.rs index e8723cac..5afb2a30 100644 --- a/src/driver/virtio/virtio_blk.rs +++ b/src/driver/virtio/virtio_blk.rs @@ -1,78 +1,15 @@ use crate::{ + driver::virtio::hal::HAL, io::Chunks, kernel::{ block::{BlockDeviceRequest, BlockRequestQueue}, constants::EIO, - mem::{AsMemoryBlock, Page}, + mem::AsMemoryBlock, }, prelude::KResult, }; -use eonix_hal::mm::ArchPhysAccess; -use eonix_mm::{ - address::{Addr, PAddr, PhysAccess}, - paging::PFN, -}; use eonix_sync::Spin; -use virtio_drivers::{device::blk::VirtIOBlk, transport::Transport, Hal}; - -pub struct HAL; - -unsafe impl Hal for HAL { - fn dma_alloc( - pages: usize, - _direction: virtio_drivers::BufferDirection, - ) -> (virtio_drivers::PhysAddr, core::ptr::NonNull) { - let page = Page::alloc_at_least(pages); - - let paddr = page.start().addr(); - let ptr = page.as_memblk().as_byte_ptr(); - page.into_raw(); - - (paddr, ptr) - } - - unsafe fn dma_dealloc( - paddr: virtio_drivers::PhysAddr, - _vaddr: core::ptr::NonNull, - _pages: usize, - ) -> i32 { - let pfn = PFN::from(PAddr::from(paddr)); - - unsafe { - // SAFETY: The caller ensures that the pfn corresponds to a valid - // page allocated by `dma_alloc`. - Page::from_raw(pfn); - } - - 0 - } - - unsafe fn mmio_phys_to_virt( - paddr: virtio_drivers::PhysAddr, - _size: usize, - ) -> core::ptr::NonNull { - unsafe { ArchPhysAccess::as_ptr(PAddr::from(paddr)) } - } - - unsafe fn share( - buffer: core::ptr::NonNull<[u8]>, - _direction: virtio_drivers::BufferDirection, - ) -> virtio_drivers::PhysAddr { - let paddr = unsafe { - // SAFETY: The caller ensures that the buffer is valid. 
- ArchPhysAccess::from_ptr(buffer.cast::()) - }; - - paddr.addr() - } - - unsafe fn unshare( - _paddr: virtio_drivers::PhysAddr, - _buffer: core::ptr::NonNull<[u8]>, - _direction: virtio_drivers::BufferDirection, - ) { - } -} +use virtio_drivers::{device::blk::VirtIOBlk, transport::Transport}; impl BlockRequestQueue for Spin> where diff --git a/src/driver/virtio/virtio_net.rs b/src/driver/virtio/virtio_net.rs new file mode 100644 index 00000000..fa89a366 --- /dev/null +++ b/src/driver/virtio/virtio_net.rs @@ -0,0 +1,57 @@ +use crate::driver::virtio::hal::HAL; +use crate::net::device::{Mac, NetDev, NetDevError, RxBuffer}; +use eonix_log::println_debug; +use smoltcp::phy::{DeviceCapabilities, Medium}; + +use virtio_drivers::{ + device::net::{self, VirtIONet}, + transport::Transport, +}; + +pub static VIRTIO_NET_NAME: &'static str = "virtio_net"; + +impl NetDev for VirtIONet +where + T: Transport + Send + Sync, +{ + fn name(&self) -> &'static str { + VIRTIO_NET_NAME + } + + fn mac_addr(&self) -> Mac { + self.mac_address() + } + + fn caps(&self) -> DeviceCapabilities { + let mut caps = DeviceCapabilities::default(); + caps.max_transmission_unit = 1514; + caps.max_burst_size = None; + caps.medium = Medium::Ethernet; + caps + } + + fn can_receive(&self) -> bool { + self.can_recv() + } + + fn can_send(&self) -> bool { + self.can_send() + } + + fn recv(&mut self) -> Result { + self.receive().map_or_else( + |_| Err(NetDevError::Unknown), + |rx_buffer| Ok(RxBuffer::VirtIOBuffer(rx_buffer)), + ) + } + + fn recycle_rx_buffer(&mut self, rx_buffer: RxBuffer) -> Result<(), NetDevError> { + self.recycle_rx_buffer(rx_buffer.into_virtio_buffer().unwrap()) + .map_err(|_| NetDevError::Unknown) + } + + fn send(&mut self, data: &[u8]) -> Result<(), NetDevError> { + self.send(net::TxBuffer::from(data)) + .map_err(|_| NetDevError::Unknown) + } +} diff --git a/src/io.rs b/src/io.rs index f1eae9b9..76492202 100644 --- a/src/io.rs +++ b/src/io.rs @@ -30,7 +30,7 @@ impl FillResult { } } -pub 
trait Buffer { +pub trait Buffer: Send { fn total(&self) -> usize; fn wrote(&self) -> usize; @@ -49,7 +49,8 @@ pub trait Buffer { } } -pub trait Stream { +pub trait Stream: Send { + fn total(&self) -> usize; fn poll_data<'a>(&mut self, buf: &'a mut [u8]) -> KResult>; fn ignore(&mut self, len: usize) -> KResult>; } @@ -161,7 +162,7 @@ impl<'lt, T: Copy + Sized> UninitBuffer<'lt, T> { } } -impl<'lt, T: Copy + Sized> Buffer for UninitBuffer<'lt, T> { +impl<'lt, T: Copy + Sized + Send> Buffer for UninitBuffer<'lt, T> { fn total(&self) -> usize { self.buffer.total() } @@ -282,6 +283,10 @@ impl<'a> ByteStream<'a> { } impl<'a> Stream for ByteStream<'a> { + fn total(&self) -> usize { + self.data.len() + } + fn poll_data<'b>(&mut self, buf: &'b mut [u8]) -> KResult> { if self.cur >= self.data.len() { return Ok(None); diff --git a/src/kernel/chardev.rs b/src/kernel/chardev.rs index cd23fc14..a7550bd2 100644 --- a/src/kernel/chardev.rs +++ b/src/kernel/chardev.rs @@ -1,3 +1,5 @@ +use core::sync::atomic::{AtomicU8, Ordering}; + use super::{ block::make_device, console::get_console, @@ -119,6 +121,24 @@ impl VirtualCharDevice for ZeroDevice { } } +struct RandomDevice(AtomicU8); + +impl VirtualCharDevice for RandomDevice { + fn read(&self, buffer: &mut dyn Buffer) -> KResult { + // TODO: Copy from empty page. 
+ let mut data = self.0.load(Ordering::Relaxed); + while let false = buffer.fill(&[data])?.should_stop() { + data = (data as usize * 91 + 114514) as u8; + } + self.0.store(data, Ordering::Relaxed); + Ok(buffer.wrote()) + } + + fn write(&self, stream: &mut dyn Stream) -> KResult { + stream.ignore_all() + } +} + struct ConsoleDevice; impl VirtualCharDevice for ConsoleDevice { fn read(&self, buffer: &mut dyn Buffer) -> KResult { @@ -149,6 +169,18 @@ impl CharDevice { CharDeviceType::Virtual(Box::new(ZeroDevice)), )?; + Self::register( + make_device(1, 8), + Arc::from("random"), + CharDeviceType::Virtual(Box::new(RandomDevice(AtomicU8::new(114)))), + )?; + + Self::register( + make_device(1, 9), + Arc::from("urandom"), + CharDeviceType::Virtual(Box::new(RandomDevice(AtomicU8::new(91)))), + )?; + Self::register( make_device(5, 1), Arc::from("console"), diff --git a/src/kernel/constants.rs b/src/kernel/constants.rs index 4e11d66e..ed4ddf53 100644 --- a/src/kernel/constants.rs +++ b/src/kernel/constants.rs @@ -15,6 +15,7 @@ pub const SIG_SETMASK: u32 = 2; pub const CLOCK_REALTIME: u32 = 0; pub const CLOCK_MONOTONIC: u32 = 1; +pub const CLOCK_PROCESS_CPUTIME_ID: u32 = 2; pub const CLOCK_REALTIME_COARSE: u32 = 5; pub const EPERM: u32 = 1; @@ -44,6 +45,14 @@ pub const ERANGE: u32 = 34; pub const ENOSYS: u32 = 38; pub const ELOOP: u32 = 40; pub const EOVERFLOW: u32 = 75; +pub const ENOTSOCK: u32 = 88; +pub const EAFNOSUPPORT: u32 = 97; +pub const EADDRINUSE: u32 = 98; +pub const EADDRNOTAVAIL: u32 = 99; +pub const ECONNRESET: u32 = 104; +pub const EISCONN: u32 = 106; +pub const ENOTCONN: u32 = 107; +pub const ECONNREFUSED: u32 = 111; // pub const S_IFIFO: u32 = 0o010000; pub const S_IFCHR: u32 = 0o020000; diff --git a/src/kernel/mem/page_cache.rs b/src/kernel/mem/page_cache.rs index 863e538e..e0567d21 100644 --- a/src/kernel/mem/page_cache.rs +++ b/src/kernel/mem/page_cache.rs @@ -26,6 +26,8 @@ unsafe impl Sync for PageCache {} #[derive(Clone, Copy)] pub struct 
CachePage(RawPagePtr); +unsafe impl Send for CachePage {} + impl Buffer for CachePage { fn total(&self) -> usize { PAGE_SIZE diff --git a/src/kernel/syscall/file_rw.rs b/src/kernel/syscall/file_rw.rs index 5683b27e..8cd19086 100644 --- a/src/kernel/syscall/file_rw.rs +++ b/src/kernel/syscall/file_rw.rs @@ -1,4 +1,4 @@ -use core::time::Duration; +use core::time::{self, Duration}; use super::FromSyscallArg; use crate::io::IntoStream; @@ -23,7 +23,9 @@ use crate::{ path::Path, prelude::*, }; +use alloc::slice; use alloc::sync::Arc; +use bitflags::bitflags; use eonix_runtime::task::Task; use posix_types::ctypes::{Long, PtrT}; use posix_types::namei::RenameFlags; @@ -74,8 +76,8 @@ fn dentry_from( } #[eonix_macros::define_syscall(SYS_READ)] -fn read(fd: FD, buffer: *mut u8, bufsize: usize) -> KResult { - let mut buffer = UserBuffer::new(buffer, bufsize)?; +fn read(fd: FD, buffer_: *mut u8, bufsize: usize) -> KResult { + let mut buffer = UserBuffer::new(buffer_, bufsize)?; Task::block_on(thread.files.get(fd).ok_or(EBADF)?.read(&mut buffer, None)) } @@ -362,9 +364,9 @@ fn llseek(fd: FD, offset_high: u32, offset_low: u32, result: *mut u64, whence: u #[repr(C)] #[derive(Clone, Copy)] -struct IoVec { - base: PtrT, - len: Long, +pub struct IoVec { + pub base: PtrT, + pub len: Long, } #[eonix_macros::define_syscall(SYS_READV)] @@ -535,8 +537,8 @@ fn ppoll( #[eonix_macros::define_syscall(SYS_PSELECT6)] fn pselect6( nfds: u32, - _readfds: *mut FDSet, - _writefds: *mut FDSet, + readfds: *mut FDSet, + writefds: *mut FDSet, _exceptfds: *mut FDSet, timeout: *mut TimeSpec, _sigmask: *const (), @@ -544,24 +546,111 @@ fn pselect6( // According to [pthread6(2)](https://linux.die.net/man/2/pselect6): // Some code calls select() with all three sets empty, nfds zero, and // a non-NULL timeout as a fairly portable way to sleep with subsecond precision. 
- if nfds != 0 { - thread.raise(Signal::SIGSYS); - return Err(ENOSYS); + if nfds == 0 { + let timeout = UserPointerMut::new(timeout)?; + + // Read here to check for invalid pointers. + let _timeout_value = timeout.read()?; + + Task::block_on(sleep(Duration::from_millis(10))); + + timeout.write(TimeSpec { + tv_sec: 0, + tv_nsec: 0, + })?; + + return Ok(0); } - let timeout = UserPointerMut::new(timeout)?; - - // Read here to check for invalid pointers. - let _timeout_value = timeout.read()?; + let _time_out = if timeout.is_null() { + None + } else { + let timeout = UserPointer::new(timeout)?.read()?; + if timeout.tv_nsec == 0 && timeout.tv_sec == 0 { + return Ok(0); + } - Task::block_on(sleep(Duration::from_millis(10))); + Some(timeout) + }; - timeout.write(TimeSpec { - tv_sec: 0, - tv_nsec: 0, - })?; + let mut read_fds = if readfds.is_null() { + None + } else { + Some(UserPointer::new(readfds)?.read()?) + }; + let mut write_fds = if writefds.is_null() { + None + } else { + Some(UserPointer::new(writefds)?.read()?) 
+ }; + + let poll_fds = { + let mut poll_fds = Vec::with_capacity(nfds as usize); + for fd in 0..nfds { + let events = { + let readable = read_fds.as_ref().is_some_and(|fds| fds.is_set(fd)); + let writable = write_fds.as_ref().is_some_and(|fds| fds.is_set(fd)); + let mut events = PollEvent::empty(); + + if readable { + events |= PollEvent::Readable; + } + if writable { + events |= PollEvent::Writable; + } + events + }; + + if !events.is_empty() { + poll_fds.push((fd, events)); + } + } + poll_fds + }; - Ok(0) + if let Some(fds) = &mut read_fds { + fds.clear(); + } + if let Some(fds) = &mut write_fds { + fds.clear(); + } + + let mut tot = 0; + + loop { + for (fd, events) in &poll_fds { + let res = Task::block_on(thread.files.get(FD::from(*fd)).ok_or(EBADF)?.poll(*events))?; + + if res.contains(PollEvent::Readable) { + if let Some(fds) = &mut read_fds { + fds.set(*fd); + tot += 1; + } + } + if res.contains(PollEvent::Writable) { + if let Some(fds) = &mut write_fds { + fds.set(*fd); + tot += 1; + } + } + } + + if tot > 0 { + break; + } + + // Since we already have a background iface poll task, simply sleep for a while + Task::block_on(sleep(Duration::from_millis(100))); + } + + if let Some(fds) = read_fds { + UserPointerMut::new(readfds)?.write(fds)?; + } + if let Some(fds) = write_fds { + UserPointerMut::new(writefds)?.write(fds)?; + } + + Ok(tot) } #[cfg(target_arch = "x86_64")] diff --git a/src/kernel/syscall/net.rs b/src/kernel/syscall/net.rs index 82ec9152..b3dc5627 100644 --- a/src/kernel/syscall/net.rs +++ b/src/kernel/syscall/net.rs @@ -1,10 +1,376 @@ -use crate::kernel::constants::EINVAL; +use core::net::IpAddr; +use core::net::Ipv4Addr; +use core::net::SocketAddr; + +use crate::io::Buffer; +use crate::io::IntoStream; +use crate::kernel::constants::{EAFNOSUPPORT, EBADF, EINVAL, ENOTSOCK}; +use crate::kernel::syscall::file_rw::IoVec; +use crate::kernel::user::dataflow::CheckedUserPointer; +use crate::kernel::user::UserBuffer; +use 
crate::kernel::user::UserPointer; +use crate::kernel::user::UserPointerMut; +use crate::kernel::vfs::filearray::FD; +use crate::net::socket::tcp::TcpSocket; +use crate::net::socket::udp::UdpSocket; +use crate::net::socket::SendMetadata; use crate::prelude::*; +use bytes::Buf; +use eonix_runtime::task::Task; +use posix_types::ctypes::Long; +use posix_types::net::CSocketAddrInet; +use posix_types::net::{ + CSockAddr, MsgHdr, Protocol, SockDomain, SockFlags, SockType, ADDR_MAX_LEN, SOCK_TYPE_MASK, +}; use posix_types::syscall_no::*; +fn read_socket_addr(addr_ptr: *const CSockAddr, addrlen: usize) -> KResult { + if addrlen > ADDR_MAX_LEN || addrlen < 2 { + return Err(EINVAL); + } + + let raw_sockaddr = UserPointer::new(addr_ptr)?.read()?; + + match SockDomain::try_from(raw_sockaddr.sa_family as u32) { + Ok(SockDomain::AF_INET) => { + if addrlen < size_of::() { + return Err(EINVAL); + } + + let mut bytes = raw_sockaddr.bytes.as_slice(); + let port = bytes.get_u16(); + let addr_bits = bytes.get_u32(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(addr_bits)), port); + Ok(socket_addr) + } + _ => Err(EAFNOSUPPORT), + } +} + +fn write_socket_addr( + addr_ptr: *mut CSockAddr, + addrlen_ptr: *mut u32, + socket_addr: SocketAddr, +) -> KResult<()> { + match socket_addr { + SocketAddr::V4(addr) => { + let raw_socket = CSocketAddrInet::new(*addr.ip(), addr.port()); + UserPointerMut::new(addr_ptr as *mut CSocketAddrInet)?.write(raw_socket)?; + UserPointerMut::new(addrlen_ptr)?.write(size_of::() as u32)?; + Ok(()) + } + SocketAddr::V6(_) => panic!("IPv6 is not supported"), + } +} + #[eonix_macros::define_syscall(SYS_SOCKET)] -fn socket(_domain: u32, _socket_type: u32, _protocol: u32) -> KResult { - Err(EINVAL) +fn socket(domain: u32, type_: u32, protocol: u32) -> KResult { + let domain = SockDomain::try_from(domain).map_err(|_| EINVAL)?; + let sock_type = SockType::try_from(type_ & SOCK_TYPE_MASK).map_err(|_| EINVAL)?; + let sock_flags = 
SockFlags::from_bits_truncate(type_ & !SOCK_TYPE_MASK); + let protocol = Protocol::try_from(protocol).map_err(|_| EINVAL)?; + + let is_nonblock = sock_flags.contains(SockFlags::SOCK_NONBLOCK); + + let socket = match (domain, sock_type) { + (SockDomain::AF_INET, SockType::SOCK_STREAM) => match protocol { + Protocol::IPPROTO_IP | Protocol::IPPROTO_TCP => TcpSocket::new(is_nonblock) as _, + _ => return Err(EAFNOSUPPORT), + }, + (SockDomain::AF_INET, SockType::SOCK_DGRAM) => match protocol { + Protocol::IPPROTO_IP | Protocol::IPPROTO_UDP => UdpSocket::new(is_nonblock) as _, + _ => return Err(EAFNOSUPPORT), + }, + _ => return Err(EAFNOSUPPORT), + }; + + thread.files.socket(socket) +} + +#[eonix_macros::define_syscall(SYS_SETSOCKOPT)] +fn set_sockopt(fd: FD, level: u32, optname: u32, optval: *const u8, optlen: u32) -> KResult<()> { + Ok(()) +} + +#[eonix_macros::define_syscall(SYS_GETSOCKOPT)] +fn get_sockopt(fd: FD, level: u32, optname: u32, optval: *mut u8, optlen: *mut u32) -> KResult<()> { + const SOL_SOCKET: u32 = 1; + const SQL_TCP: u32 = 6; + + const SNDBUF: u32 = 7; + const RCVBUF: u32 = 8; + + const TCP_MAXSEG: u32 = 2; + + const MAX_SEGMENT_SIZE: u32 = 1460; + + if level == SOL_SOCKET { + if optname == SNDBUF || optname == RCVBUF { + UserPointerMut::new(optval as *mut u32)?.write(65536)?; + UserPointerMut::new(optlen as *mut u32)?.write(size_of::() as u32)?; + } + } else if level == SQL_TCP { + if optname == TCP_MAXSEG { + UserPointerMut::new(optval as *mut u32)?.write(MAX_SEGMENT_SIZE)?; + UserPointerMut::new(optlen as *mut u32)?.write(size_of::() as u32)?; + } + } + + Ok(()) +} + +#[eonix_macros::define_syscall(SYS_GETSOCKNAME)] +fn get_socktname(sockfd: FD, sockaddr_ptr: *mut CSockAddr, addrlen_ptr: *mut u32) -> KResult<()> { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? 
+ .ok_or(ENOTSOCK)?; + + let local_addr = socket.local_addr().unwrap(); + if sockaddr_ptr as usize != 0 { + write_socket_addr(sockaddr_ptr, addrlen_ptr, local_addr)?; + } + + Ok(()) +} + +#[eonix_macros::define_syscall(SYS_GETPEERNAME)] +fn get_peername(sockfd: FD, sockaddr_ptr: *mut CSockAddr, addrlen_ptr: *mut u32) -> KResult<()> { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let remote_addr = socket.remote_addr().unwrap(); + if sockaddr_ptr as usize != 0 { + write_socket_addr(sockaddr_ptr, addrlen_ptr, remote_addr)?; + } + Ok(()) +} + +#[eonix_macros::define_syscall(SYS_BIND)] +fn bind(sockfd: FD, sockaddr_ptr: *const CSockAddr, addrlen: u32) -> KResult<()> { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let socket_addr = read_socket_addr(sockaddr_ptr, addrlen as usize)?; + + let res = socket.bind(socket_addr); + res +} + +#[eonix_macros::define_syscall(SYS_LISTEN)] +fn listen(sockfd: FD, backlog: u32) -> KResult<()> { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let res = socket.listen(backlog as usize); + res +} + +#[eonix_macros::define_syscall(SYS_ACCEPT)] +fn accept(sockfd: FD, sockaddr_ptr: *mut CSockAddr, addrlen_ptr: *mut u32) -> KResult { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let accepted_socket = Task::block_on(socket.accept())?; + write_socket_addr( + sockaddr_ptr, + addrlen_ptr, + accepted_socket.remote_addr().unwrap(), + )?; + let res = thread.files.socket(accepted_socket); + res +} + +#[eonix_macros::define_syscall(SYS_CONNECT)] +fn connect(sockfd: FD, sockaddr_ptr: *const CSockAddr, addrlen: u32) -> KResult<()> { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? 
+ .ok_or(ENOTSOCK)?; + + let remote_addr = read_socket_addr(sockaddr_ptr, addrlen as usize)?; + + let res = Task::block_on(socket.connect(remote_addr)); + res +} + +#[eonix_macros::define_syscall(SYS_RECVMSG)] +fn recvmsg(sockfd: FD, msghdr_ptr: *mut MsgHdr, flags: u32) -> KResult { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let msghdr = UserPointer::new(msghdr_ptr)?.read()?; + + let mut iov_user = UserPointer::new(msghdr.msg_iov as *mut IoVec)?; + let iov_buffers = (0..msghdr.msg_iovlen) + .map(|_| { + let iov_result = iov_user.read()?; + iov_user = iov_user.offset(1)?; + Ok(iov_result) + }) + .filter_map(|iov_result| match iov_result { + Err(err) => Some(Err(err)), + Ok(IoVec { + len: Long::ZERO, .. + }) => None, + Ok(IoVec { base, len }) => Some(UserBuffer::new(base.addr() as *mut u8, len.get())), + }) + .collect::>>()?; + + let mut recv_metadata = None; + let mut tot = 0usize; + for mut buffer in iov_buffers.into_iter() { + let (nread, recv_meta) = Task::block_on(socket.recv(&mut buffer))?; + + if recv_metadata.is_none() { + recv_metadata = Some(recv_meta); + } else { + assert_eq!(recv_metadata, Some(recv_meta)); + } + + tot += nread; + if nread != buffer.total() { + break; + } + } + + if msghdr.msg_name != 0 { + let addrlen_ptr = unsafe { msghdr_ptr.byte_add(core::mem::size_of::()) as *mut u32 }; + write_socket_addr( + msghdr.msg_name as _, + addrlen_ptr, + recv_metadata.unwrap().remote_addr, + )?; + } + + Ok(tot) +} + +#[eonix_macros::define_syscall(SYS_RECVFROM)] +fn recvfrom( + sockfd: FD, + buf: *mut u8, + len: usize, + flags: u32, + srcaddr_ptr: *mut CSockAddr, + addrlen_ptr: *mut u32, +) -> KResult { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? 
+ .ok_or(ENOTSOCK)?; + + let (ret, recv_meta) = Task::block_on(socket.recv(&mut UserBuffer::new(buf, len)?))?; + + if srcaddr_ptr as usize != 0 { + write_socket_addr(srcaddr_ptr, addrlen_ptr, recv_meta.remote_addr)?; + } + + Ok(ret) +} + +#[eonix_macros::define_syscall(SYS_SENDMSG)] +fn sendmsg(sockfd: FD, msghdr: *const MsgHdr, flags: u32) -> KResult { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let msghdr = UserPointer::new(msghdr)?.read()?; + + let mut iov_user = UserPointer::new(msghdr.msg_iov as *const IoVec)?; + let iov_streams = (0..msghdr.msg_iovlen) + .map(|_| { + let iov_result = iov_user.read()?; + iov_user = iov_user.offset(1)?; + Ok(iov_result) + }) + .filter_map(|iov_result| match iov_result { + Err(err) => Some(Err(err)), + Ok(IoVec { + len: Long::ZERO, .. + }) => None, + Ok(IoVec { base, len }) => Some( + CheckedUserPointer::new(base.addr() as *mut u8, len.get()) + .map(|ptr| ptr.into_stream()), + ), + }) + .collect::>>()?; + + let remote_addr = if msghdr.msg_namelen == 0 { + None + } else { + Some(read_socket_addr( + msghdr.msg_name as _, + msghdr.msg_namelen as usize, + )?) + }; + + let mut tot = 0usize; + for mut stream in iov_streams.into_iter() { + let nread = Task::block_on(socket.send(&mut stream, SendMetadata { remote_addr }))?; + tot += nread; + + if nread == 0 || !stream.is_drained() { + break; + } + } + + Ok(tot) +} + +#[eonix_macros::define_syscall(SYS_SENDTO)] +fn sendto( + sockfd: FD, + buf: *const u8, + len: usize, + flags: u32, + dstaddr_ptr: *const CSockAddr, + addrlen: u32, +) -> KResult { + let socket = thread + .files + .get(sockfd) + .ok_or(EBADF)? + .get_socket()? + .ok_or(ENOTSOCK)?; + + let remote_addr = if addrlen == 0 { + None + } else { + Some(read_socket_addr(dstaddr_ptr, addrlen as usize)?) 
+ }; + + let mut user_stream = CheckedUserPointer::new(buf, len).map(|ptr| ptr.into_stream())?; + Task::block_on(socket.send(&mut user_stream, SendMetadata { remote_addr })) } pub fn keep_alive() {} diff --git a/src/kernel/syscall/sysinfo.rs b/src/kernel/syscall/sysinfo.rs index 5092c8a6..73e61a06 100644 --- a/src/kernel/syscall/sysinfo.rs +++ b/src/kernel/syscall/sysinfo.rs @@ -1,7 +1,10 @@ use crate::{ io::Buffer as _, kernel::{ - constants::{CLOCK_MONOTONIC, CLOCK_REALTIME, CLOCK_REALTIME_COARSE, EINTR, EINVAL}, + constants::{ + CLOCK_MONOTONIC, CLOCK_PROCESS_CPUTIME_ID, CLOCK_REALTIME, CLOCK_REALTIME_COARSE, + EINTR, EINVAL, + }, task::Thread, timer::{Instant, Ticks}, user::{UserBuffer, UserPointerMut}, @@ -100,6 +103,13 @@ fn do_clock_gettime64(_thread: &Thread, clock_id: u32, timespec: *mut TimeSpec) tv_nsec: 0, }) } + CLOCK_PROCESS_CPUTIME_ID => { + let uptime_secs = Ticks::since_boot().as_secs() / 10; + timespec.write(TimeSpec { + tv_sec: uptime_secs, + tv_nsec: 0, + }) + } clock_id => unimplemented!("Unsupported clock_id: {}", clock_id), } } diff --git a/src/kernel/task/loader/elf.rs b/src/kernel/task/loader/elf.rs index 073026a9..3068ca2b 100644 --- a/src/kernel/task/loader/elf.rs +++ b/src/kernel/task/loader/elf.rs @@ -124,7 +124,7 @@ impl_elf_addr!(u32); impl_elf_addr!(u64); pub trait ElfArch { - type Ea: ElfAddr + Clone + Copy; + type Ea: ElfAddr + Clone + Copy + Send; type Ph: ProgramHeader + Clone + Copy + Default; const DYN_BASE_ADDR: usize; diff --git a/src/kernel/timer.rs b/src/kernel/timer.rs index 9b6a3ff2..1040f0dd 100644 --- a/src/kernel/timer.rs +++ b/src/kernel/timer.rs @@ -8,6 +8,7 @@ use core::{ time::Duration, }; use eonix_hal::processor::CPU; +use eonix_log::println_debug; use eonix_sync::{Spin, SpinIrq as _}; use posix_types::stat::{StatXTimestamp, TimeSpec, TimeVal}; @@ -109,6 +110,10 @@ impl Instant { pub fn since_epoch(&self) -> Duration { Duration::new(self.secs_since_epoch, self.nsecs_within) } + + pub fn to_millis(&self) -> u64 { 
+ (self.secs_since_epoch * 1_000) + (self.nsecs_within / 1_000_000) as u64 + } } impl From for TimeSpec { @@ -177,6 +182,12 @@ pub fn timer_interrupt() { return; }; + // println_debug!( + // "time interrupt {:?} {:?} {:?}", + // current_tick, + // wakeup_tick, + // sleepers_to_wakeup.wakers.borrow().len() + // ); for waker in sleepers_to_wakeup.wakers.into_inner() { waker.wake(); } @@ -212,6 +223,7 @@ pub fn should_reschedule() -> bool { pub async fn sleep(duration: Duration) { let wakeup_tick = Ticks::now() + Ticks(duration.as_millis() as usize); + // println_debug!("sleep wakeup_tick{:?}", wakeup_tick); core::future::poll_fn(|ctx| { if Ticks::now() >= wakeup_tick { diff --git a/src/kernel/user/dataflow.rs b/src/kernel/user/dataflow.rs index 17dbd4c9..89609a74 100644 --- a/src/kernel/user/dataflow.rs +++ b/src/kernel/user/dataflow.rs @@ -16,6 +16,8 @@ pub struct CheckedUserPointer<'a> { _phantom: PhantomData<&'a ()>, } +unsafe impl<'a> Send for CheckedUserPointer<'a> {} + pub struct UserBuffer<'a> { ptr: CheckedUserPointer<'a>, size: usize, @@ -500,6 +502,10 @@ impl UserStream<'_> { } impl Stream for UserStream<'_> { + fn total(&self) -> usize { + self.len() + } + fn poll_data<'a>(&mut self, buf: &'a mut [u8]) -> KResult> { assert_preempt_enabled!("UserStream::poll_data"); diff --git a/src/kernel/vfs/file.rs b/src/kernel/vfs/file.rs index 49cb1d44..e2b7d771 100644 --- a/src/kernel/vfs/file.rs +++ b/src/kernel/vfs/file.rs @@ -14,6 +14,7 @@ use crate::{ vfs::inode::Inode, CharDevice, }, + net::socket::{SendMetadata, Socket}, prelude::*, sync::CondVar, }; @@ -80,6 +81,7 @@ pub enum FileType { PipeWrite(PipeWriteEnd), TTY(TerminalFile), CharDev(Arc), + Socket(Arc), } pub struct File { @@ -94,6 +96,13 @@ impl File { _ => Ok(None), } } + + pub fn get_socket(&self) -> KResult>> { + match &self.file_type { + FileType::Socket(socket) => Ok(Some(socket.clone())), + _ => Ok(None), + } + } } pub enum SeekOption { @@ -103,6 +112,7 @@ pub enum SeekOption { } bitflags! 
{ + #[derive(Clone, Copy, Debug)] pub struct PollEvent: u16 { const Readable = 0x0001; const Writable = 0x0002; @@ -484,6 +494,7 @@ impl FileType { FileType::PipeRead(pipe) => pipe.pipe.read(buffer).await, FileType::TTY(tty) => tty.read(buffer).await, FileType::CharDev(device) => device.read(buffer), + FileType::Socket(socket) => socket.recv(buffer).await.map(|res| res.0), _ => Err(EBADF), } } @@ -509,6 +520,7 @@ impl FileType { FileType::PipeWrite(pipe) => pipe.pipe.write(stream).await, FileType::TTY(tty) => tty.write(stream), FileType::CharDev(device) => device.write(stream), + FileType::Socket(socket) => socket.send(stream, SendMetadata::default()).await, _ => Err(EBADF), } } @@ -582,6 +594,7 @@ impl FileType { FileType::TTY(tty) => tty.poll(event).await, FileType::PipeRead(PipeReadEnd { pipe }) | FileType::PipeWrite(PipeWriteEnd { pipe }) => pipe.poll(event).await, + FileType::Socket(socket) => socket.poll(event), _ => unimplemented!("Poll event not supported."), } } diff --git a/src/kernel/vfs/filearray.rs b/src/kernel/vfs/filearray.rs index f8b06a12..16149302 100644 --- a/src/kernel/vfs/filearray.rs +++ b/src/kernel/vfs/filearray.rs @@ -3,26 +3,33 @@ use super::{ inode::Mode, s_ischr, Spin, }; -use crate::kernel::{ - constants::{ - EBADF, EISDIR, ENOTDIR, F_DUPFD, F_DUPFD_CLOEXEC, F_GETFD, F_GETFL, F_SETFD, F_SETFL, - }, - syscall::{FromSyscallArg, SyscallRetVal}, -}; use crate::{ kernel::{ console::get_console, constants::ENXIO, - vfs::{dentry::Dentry, file::Pipe, s_isdir, s_isreg}, + vfs::{ + dentry::Dentry, + file::{FileType, Pipe}, + s_isdir, s_isreg, + }, CharDevice, }, prelude::*, }; +use crate::{ + kernel::{ + constants::{ + EBADF, EISDIR, ENOTDIR, F_DUPFD, F_DUPFD_CLOEXEC, F_GETFD, F_GETFL, F_SETFD, F_SETFL, + }, + syscall::{FromSyscallArg, SyscallRetVal}, + }, + net::socket::Socket, +}; use alloc::{ collections::btree_map::{BTreeMap, Entry}, sync::Arc, }; -use core::sync::atomic::Ordering; +use core::sync::atomic::{AtomicU32, Ordering}; use 
itertools::{ FoldWhile::{Continue, Done}, Itertools, @@ -32,6 +39,12 @@ use posix_types::open::{FDFlags, OpenFlags}; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct FD(u32); +impl From for FD { + fn from(fd: u32) -> Self { + FD(fd) + } +} + #[derive(Clone)] struct OpenFile { flags: FDFlags, @@ -173,6 +186,17 @@ impl FileArray { Ok((read_fd, write_fd)) } + pub fn socket(&self, socket: Arc) -> KResult { + let mut inner = self.inner.lock(); + let sockfd = inner.next_fd(); + inner.do_insert( + sockfd, + FDFlags::default(), + File::new(OpenFlags::default(), FileType::Socket(socket)), + ); + Ok(sockfd) + } + pub fn open(&self, dentry: &Arc, flags: OpenFlags, mode: Mode) -> KResult { dentry.open_check(flags, mode)?; diff --git a/src/lib.rs b/src/lib.rs index 6fd82c40..8d12054f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ #![feature(arbitrary_self_types)] #![feature(get_mut_unchecked)] #![feature(macro_metavar_expr)] +#![feature(ip_from)] extern crate alloc; @@ -145,7 +146,7 @@ async fn init_process(early_kstack: PRange) { { // We might want the serial initialized as soon as possible. 
driver::serial::init().unwrap(); - driver::e1000e::register_e1000e_driver(); + // driver::e1000e::register_e1000e_driver(); driver::ahci::register_ahci_driver(); } @@ -153,7 +154,7 @@ async fn init_process(early_kstack: PRange) { { driver::serial::init().unwrap(); driver::virtio::init_virtio_devices(); - driver::e1000e::register_e1000e_driver(); + // driver::e1000e::register_e1000e_driver(); driver::ahci::register_ahci_driver(); driver::goldfish_rtc::probe(); } @@ -162,10 +163,12 @@ async fn init_process(early_kstack: PRange) { { driver::serial::init().unwrap(); driver::virtio::init_virtio_devices(); - driver::e1000e::register_e1000e_driver(); + // driver::e1000e::register_e1000e_driver(); driver::ahci::register_ahci_driver(); } + net::init().expect("Failed to initialize network"); + fs::tmpfs::init(); fs::procfs::init(); fs::fat32::init(); diff --git a/src/net.rs b/src/net.rs index 95fa3b19..3b1415f0 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1 +1,99 @@ +pub mod device; +pub mod iface; pub mod netdev; +pub mod socket; + +pub use device::register_netdev; + +use alloc::sync::Arc; +use core::time::Duration; +use eonix_log::{println_debug, println_warn}; +use eonix_runtime::{run::FutureRun, scheduler::Scheduler, task::Task}; +use eonix_sync::Mutex; +use smoltcp::wire::{Ipv4Address, Ipv4Cidr}; + +use crate::{ + driver::virtio::VIRTIO_NET_NAME, + kernel::{task::KernelStack, timer::sleep}, + net::{ + device::{ + get_netdev, + loopback::{Loopback, LOOPBACK_NAME}, + RxBuffer, NETDEVS, USED_RX_BUFFERS, + }, + iface::{Iface, IFACES}, + }, + prelude::KResult, +}; + +const VIRTIO_ADDRESS: Ipv4Address = Ipv4Address::new(10, 0, 2, 15); +const VIRTIO_ADDRESS_PREFIX_LEN: u8 = 24; +const VIRTIO_GATEWAY: Ipv4Address = Ipv4Address::new(10, 0, 2, 2); +const LOOPBACK_ADDRESS: Ipv4Address = Ipv4Address::new(127, 0, 0, 1); +const LOOPBACK_ADDRESS_PREFIX_LEN: u8 = 8; + +pub fn init() -> KResult<()> { + let mut ifaces = Task::block_on(IFACES.lock()); + let mut netdevs = 
Task::block_on(NETDEVS.lock()); + + netdevs.insert(LOOPBACK_NAME, Arc::new(Mutex::new(Loopback::new()))); + + for (name, netdev) in netdevs.iter() { + if *name == VIRTIO_NET_NAME { + let virtio_iface = Iface::new( + netdev.clone(), + Ipv4Cidr::new(VIRTIO_ADDRESS, VIRTIO_ADDRESS_PREFIX_LEN), + Some(VIRTIO_GATEWAY), + ); + ifaces.insert(name, Arc::new(Mutex::new(virtio_iface))); + } else if *name == LOOPBACK_NAME { + let loopback_iface = Iface::new( + netdev.clone(), + Ipv4Cidr::new(LOOPBACK_ADDRESS, LOOPBACK_ADDRESS_PREFIX_LEN), + None, + ); + ifaces.insert(name, Arc::new(Mutex::new(loopback_iface))); + } else { + println_warn!("Currently only virtio_net is supported"); + } + } + + drop(ifaces); + drop(netdevs); + + Scheduler::get().spawn::(FutureRun::new(ifaces_poll())); + + Ok(()) +} + +// Temporary spawn a task to poll network interfaces +// Better register soft irq handler +pub async fn ifaces_poll() { + loop { + let ifaces = IFACES.lock().await; + for iface in ifaces.values() { + let mut iface_guard = iface.lock().await; + iface_guard.poll(); + } + drop(ifaces); + + // Ugly since i have no time to redesign rx_recycle + let virio_netdev = get_netdev(VIRTIO_NET_NAME).unwrap(); + let mut virio_netdev_guard = virio_netdev.lock().await; + let mut used_rx_buffers = USED_RX_BUFFERS.lock().await; + + while let Some(rx_buffer) = used_rx_buffers.pop_front() { + match rx_buffer { + RxBuffer::VirtIOBuffer(_) => { + virio_netdev_guard.recycle_rx_buffer(rx_buffer).unwrap(); + } + _ => {} + } + } + + drop(virio_netdev_guard); + drop(used_rx_buffers); + + sleep(Duration::from_millis(50)).await; + } +} diff --git a/src/net/device.rs b/src/net/device.rs new file mode 100644 index 00000000..f95ff1b1 --- /dev/null +++ b/src/net/device.rs @@ -0,0 +1,138 @@ +pub mod loopback; + +use crate::prelude::KResult; +use alloc::{ + collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + sync::Arc, + vec, + vec::Vec, +}; +use eonix_log::println_debug; +use eonix_runtime::task::Task; +use 
eonix_sync::Mutex; + +pub use smoltcp::phy::DeviceCapabilities; + +pub type NetDevice = Arc>; +pub type Mac = [u8; 6]; + +pub static NETDEVS: Mutex> = Mutex::new(BTreeMap::new()); + +pub fn register_netdev(name: &'static str, netdev: impl NetDev + 'static) -> KResult<()> { + let netdev = Arc::new(Mutex::new(netdev)); + + let mut netdevs = Task::block_on(NETDEVS.lock()); + netdevs.insert(name, netdev); + drop(netdevs); + + Ok(()) +} + +pub fn get_netdev(name: &'static str) -> Option { + let netdevs = Task::block_on(NETDEVS.lock()); + netdevs.get(name).cloned() +} + +#[derive(Debug, Clone, Copy)] +pub enum NetDevError { + Unknown, +} + +pub enum RxBuffer { + VirtIOBuffer(virtio_drivers::device::net::RxBuffer), + LoopBackBuffer(Vec), +} + +impl RxBuffer { + fn packet(&self) -> &[u8] { + match self { + RxBuffer::VirtIOBuffer(rx_buffer) => rx_buffer.packet(), + RxBuffer::LoopBackBuffer(rx_buffer) => rx_buffer.as_slice(), + } + } + + pub fn into_virtio_buffer(self) -> Option { + match self { + RxBuffer::VirtIOBuffer(rx_buffer) => Some(rx_buffer), + _ => None, + } + } +} + +pub trait NetDev: Send { + fn name(&self) -> &'static str; + fn mac_addr(&self) -> Mac; + fn caps(&self) -> DeviceCapabilities; + + fn can_receive(&self) -> bool; + fn can_send(&self) -> bool; + + fn recv(&mut self) -> Result; + fn recycle_rx_buffer(&mut self, rx_buffer: RxBuffer) -> Result<(), NetDevError>; + fn send(&mut self, data: &[u8]) -> Result<(), NetDevError>; +} + +impl smoltcp::phy::Device for dyn NetDev { + type RxToken<'a> + = RxToken + where + Self: 'a; + + type TxToken<'a> + = TxToken<'a> + where + Self: 'a; + + fn receive( + &mut self, + _timestamp: smoltcp::time::Instant, + ) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + if self.can_receive() && self.can_send() { + let rx_buffer = self.recv().unwrap(); + Some((RxToken(rx_buffer), TxToken(self))) + } else { + None + } + } + + fn transmit(&mut self, _timestamp: smoltcp::time::Instant) -> Option> { + if self.can_send() { + 
Some(TxToken(self)) + } else { + None + } + } + + fn capabilities(&self) -> DeviceCapabilities { + self.caps() + } +} + +pub static USED_RX_BUFFERS: Mutex> = Mutex::new(VecDeque::new()); + +pub struct RxToken(RxBuffer); + +impl smoltcp::phy::RxToken for RxToken { + fn consume(self, f: F) -> R + where + F: FnOnce(&[u8]) -> R, + { + let res = f(self.0.packet()); + Task::block_on(USED_RX_BUFFERS.lock()).push_back(self.0); + res + } +} + +pub struct TxToken<'a>(&'a mut dyn NetDev); + +impl smoltcp::phy::TxToken for TxToken<'_> { + fn consume(self, len: usize, f: F) -> R + where + F: FnOnce(&mut [u8]) -> R, + { + let mut tx_data = vec![0; len]; + let res = f(&mut tx_data); + self.0.send(&tx_data).expect("Send packet failed"); + res + } +} diff --git a/src/net/device/loopback.rs b/src/net/device/loopback.rs new file mode 100644 index 00000000..1ad5a5b9 --- /dev/null +++ b/src/net/device/loopback.rs @@ -0,0 +1,62 @@ +use alloc::collections::VecDeque; +use alloc::vec::Vec; +use smoltcp::phy::{ChecksumCapabilities, DeviceCapabilities, Medium}; + +use crate::net::device::{Mac, NetDev, NetDevError, RxBuffer}; + +pub struct Loopback { + pub queue: VecDeque>, +} + +impl Loopback { + pub fn new() -> Loopback { + Loopback { + queue: VecDeque::new(), + } + } +} + +pub static LOOPBACK_NAME: &'static str = "loopback"; + +impl NetDev for Loopback { + fn name(&self) -> &'static str { + LOOPBACK_NAME + } + + fn mac_addr(&self) -> Mac { + [0, 0, 0, 0, 0, 0] + } + + fn can_receive(&self) -> bool { + !self.queue.is_empty() + } + + fn can_send(&self) -> bool { + true + } + + fn caps(&self) -> DeviceCapabilities { + let mut caps = DeviceCapabilities::default(); + caps.max_transmission_unit = 65535; + caps.max_burst_size = None; + caps.medium = Medium::Ip; + caps.checksum = ChecksumCapabilities::ignored(); + caps + } + + fn recv(&mut self) -> Result { + self.queue + .pop_back() + .map(|rx_buffer| RxBuffer::LoopBackBuffer(rx_buffer)) + .ok_or(NetDevError::Unknown) + } + + fn 
recycle_rx_buffer(&mut self, _rx_buffer: RxBuffer) -> Result<(), NetDevError> { + Ok(()) + } + + fn send(&mut self, data: &[u8]) -> Result<(), NetDevError> { + self.queue.push_back(Vec::from(data)); + Ok(()) + } +} diff --git a/src/net/iface.rs b/src/net/iface.rs new file mode 100644 index 00000000..26699192 --- /dev/null +++ b/src/net/iface.rs @@ -0,0 +1,184 @@ +use core::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use crate::driver::virtio::VIRTIO_NET_NAME; +use crate::kernel::constants::EADDRINUSE; +use crate::kernel::timer::Instant; +use crate::net::device::NetDevice; +use crate::net::socket::SocketType; +use crate::prelude::KResult; + +use alloc::collections::btree_map::BTreeMap; +use alloc::collections::btree_set::BTreeSet; +use alloc::sync::Arc; +use alloc::vec; +use eonix_runtime::task::Task; +use eonix_sync::Mutex; +use smoltcp::phy::Medium; +use smoltcp::{ + iface::{Config, Interface, SocketHandle, SocketSet}, + socket::{tcp, udp}, + wire::{self, EthernetAddress, Ipv4Cidr}, +}; + +pub static IFACES: Mutex> = Mutex::new(BTreeMap::new()); + +pub type NetIface = Arc>; + +pub struct Iface { + device: NetDevice, + iface_inner: Interface, + // Should distinguish TCP/UDP ports + used_ports: BTreeSet<(SocketType, u16)>, + sockets: SocketSet<'static>, +} + +unsafe impl Send for Iface {} + +const IP_LOCAL_PORT_START: u16 = 32768; +const IP_LOCAL_PORT_END: u16 = 60999; + +const TCP_RX_BUF_LEN: usize = 65536; +const TCP_TX_BUF_LEN: usize = 65536; +const UDP_RX_BUF_LEN: usize = 65536; +const UDP_TX_BUF_LEN: usize = 65536; + +impl Iface { + pub fn new(device: NetDevice, ip_cidr: Ipv4Cidr, gateway: Option) -> Self { + let iface_inner = { + let mut device = Task::block_on(device.lock()); + let config = match device.caps().medium { + Medium::Ethernet => Config::new(wire::HardwareAddress::Ethernet(EthernetAddress( + device.mac_addr(), + ))), + Medium::Ip => Config::new(wire::HardwareAddress::Ip), + }; + let now = smoltcp::time::Instant::from_millis(Instant::now().to_millis() 
as i64); + let mut iface = Interface::new(config, &mut *device, now); + iface.update_ip_addrs(|ip_addrs| ip_addrs.push(wire::IpCidr::Ipv4(ip_cidr)).unwrap()); + + if let Some(gateway) = gateway { + iface.routes_mut().add_default_ipv4_route(gateway).unwrap(); + } + + iface + }; + + Self { + device, + iface_inner, + used_ports: BTreeSet::new(), + sockets: SocketSet::new(vec![]), + } + } + + pub fn iface_and_sockets(&mut self) -> (&mut Interface, &mut SocketSet<'static>) { + (&mut self.iface_inner, &mut self.sockets) + } + + fn new_tcp_socket(&mut self) -> SocketHandle { + let rx_buffer = tcp::SocketBuffer::new(vec![0; TCP_RX_BUF_LEN]); + let tx_buffer = tcp::SocketBuffer::new(vec![0; TCP_TX_BUF_LEN]); + + self.sockets.add(tcp::Socket::new(rx_buffer, tx_buffer)) + } + + fn new_udp_socket(&mut self) -> SocketHandle { + let rx_buffer = + udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY; 8], vec![0; UDP_RX_BUF_LEN]); + let tx_buffer = + udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY; 8], vec![0; UDP_TX_BUF_LEN]); + + self.sockets.add(udp::Socket::new(rx_buffer, tx_buffer)) + } + + pub fn remove_socket(&mut self, handle: SocketHandle, port: u16, socket_type: SocketType) { + self.sockets.remove(handle); + // FIXME: may many sockets use on port + self.used_ports.remove(&(socket_type, port)); + } + + pub fn bind_socket( + &mut self, + bind_port: u16, + socket_type: SocketType, + ) -> KResult<(SocketAddr, SocketHandle)> { + if self.used_ports.contains(&(socket_type, bind_port)) { + return Err(EADDRINUSE); + } + + let port = if bind_port == 0 { + self.alloc_port(socket_type).ok_or(EADDRINUSE)? 
+ } else { + self.used_ports.insert((socket_type, bind_port)); + bind_port + }; + + let socket_addr = SocketAddr::new( + IpAddr::V4(self.iface_inner.ipv4_addr().expect("Set Ipv4 in construct")), + port, + ); + + let socket_handle = match socket_type { + SocketType::Tcp => self.new_tcp_socket(), + SocketType::Udp => self.new_udp_socket(), + }; + + Ok((socket_addr, socket_handle)) + } + + fn alloc_port(&mut self, socket_type: SocketType) -> Option { + // FIXME: more efficient way to allocate ports + for port in IP_LOCAL_PORT_START..=IP_LOCAL_PORT_END { + if !self.used_ports.contains(&(socket_type, port)) { + self.used_ports.insert((socket_type, port)); + return Some(port); + } + } + None + } + + pub fn ipv4_addr(&self) -> Option { + self.iface_inner.ipv4_addr() + } + + pub fn poll(&mut self) { + let mut device = Task::block_on(self.device.lock()); + let timestamp = smoltcp::time::Instant::from_millis(Instant::now().to_millis() as i64); + + self.iface_inner + .poll(timestamp, &mut *device, &mut self.sockets); + } +} + +pub fn get_relate_iface(ip_addr: IpAddr) -> Option { + let ifaces = Task::block_on(IFACES.lock()); + for iface in ifaces.values() { + let iface_guard = Task::block_on(iface.lock()); + for cidr in iface_guard.iface_inner.ip_addrs() { + if IpAddr::from(cidr.address()) == ip_addr { + return Some(iface.clone()); + } + } + } + + None +} + +pub fn get_ephemeral_iface(remote_addr: Option) -> Option { + let ifaces = Task::block_on(IFACES.lock()); + assert!(ifaces.len() > 0, "No network interfaces available"); + + if let Some(remote_addr) = remote_addr { + for iface in ifaces.values() { + let iface_guard = Task::block_on(iface.lock()); + for cidr in iface_guard.iface_inner.ip_addrs() { + if IpAddr::from(cidr.address()) == remote_addr { + return Some(iface.clone()); + } + } + } + } + + // FIXME: Temporary use virtio-net as our default iface + return ifaces.get(VIRTIO_NET_NAME).cloned(); +} diff --git a/src/net/socket.rs b/src/net/socket.rs new file mode 100644 
index 00000000..402b08cd --- /dev/null +++ b/src/net/socket.rs @@ -0,0 +1,181 @@ +use crate::{ + io::{Buffer, Stream}, + kernel::{ + constants::{EADDRINUSE, EINVAL}, + vfs::file::PollEvent, + }, + net::iface::{NetIface, IFACES}, + prelude::KResult, +}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use async_trait::async_trait; +use core::net::SocketAddr; +use eonix_runtime::task::Task; +use smoltcp::iface::SocketHandle; + +pub mod tcp; +pub mod udp; + +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Eq, Ord)] +pub enum SocketType { + Tcp, + Udp, +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct SendMetadata { + pub remote_addr: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct RecvMetadata { + pub remote_addr: SocketAddr, +} + +#[async_trait] +pub trait Socket: Sync + Send { + fn bind(&self, _socket_addr: SocketAddr) -> KResult<()> { + Err(EINVAL) + } + + fn listen(&self, _backlog: usize) -> KResult<()> { + Err(EINVAL) + } + + async fn connect(&self, _remote_addr: SocketAddr) -> KResult<()> { + Err(EINVAL) + } + + async fn accept(&self) -> KResult> { + Err(EINVAL) + } + + fn local_addr(&self) -> Option; + + fn remote_addr(&self) -> Option; + + async fn recv(&self, buffer: &mut dyn Buffer) -> KResult<(usize, RecvMetadata)>; + + async fn send(&self, stream: &mut dyn Stream, send_meta: SendMetadata) -> KResult; + + fn poll(&self, events: PollEvent) -> KResult; +} + +#[derive(Clone)] +pub enum BoundSocket { + BoundSingle(BoundSingle), + BoundAll(BoundAll), +} + +impl BoundSocket { + fn new_bind_all(bind_port: u16, socket_type: SocketType) -> KResult { + Ok(BoundSocket::BoundAll(BoundAll::new_bind( + bind_port, + socket_type, + )?)) + } + + fn new_bind_single( + iface: NetIface, + bind_port: u16, + socket_type: SocketType, + ) -> KResult<(Self, SocketAddr)> { + let (single, sock_addr) = BoundSingle::new_bind(iface, bind_port, socket_type)?; + Ok((BoundSocket::BoundSingle(single), sock_addr)) + } + + fn new_single(iface: NetIface, 
socket_handle: SocketHandle) -> KResult { + Ok(BoundSocket::BoundSingle(BoundSingle::new( + iface, + socket_handle, + ))) + } + + fn as_single_bound(&self) -> Option<&BoundSingle> { + match self { + BoundSocket::BoundSingle(single) => Some(single), + _ => None, + } + } + + fn as_all_bound(&self) -> Option<&BoundAll> { + match self { + BoundSocket::BoundAll(all) => Some(all), + _ => None, + } + } +} + +/// BoundAll is only used for socket listen all ifaces +#[derive(Clone)] +struct BoundAll { + port: u16, + // FIXME: need support IFACES dyn change + sockets: Vec, +} + +impl BoundAll { + fn new_bind(bind_port: u16, socket_type: SocketType) -> KResult { + let ifaces_guard = Task::block_on(IFACES.lock()); + + let mut sockets = Vec::new(); + for iface in ifaces_guard.values() { + // FIXME: handle err except eaddrinuse + if let Ok((item, _)) = BoundSingle::new_bind(iface.clone(), bind_port, socket_type) { + sockets.push(item); + } + } + + if sockets.len() == 0 { + return Err(EADDRINUSE); + } + + Ok(Self { + port: bind_port, + sockets, + }) + } +} + +#[derive(Clone)] +struct BoundSingle { + iface: NetIface, + socket_handle: SocketHandle, +} + +impl BoundSingle { + fn new(iface: NetIface, socket_handle: SocketHandle) -> Self { + Self { + iface, + socket_handle, + } + } + + fn new_bind( + iface: NetIface, + bind_port: u16, + socket_type: SocketType, + ) -> KResult<(Self, SocketAddr)> { + let (socket_addr, socket_handle) = { + let mut iface_guard = Task::block_on(iface.lock()); + + iface_guard.bind_socket(bind_port, socket_type)? 
+ }; + + Ok(( + Self { + iface, + socket_handle, + }, + socket_addr, + )) + } + + fn iface(&self) -> NetIface { + self.iface.clone() + } + + fn handle(&self) -> SocketHandle { + self.socket_handle + } +} diff --git a/src/net/socket/tcp.rs b/src/net/socket/tcp.rs new file mode 100644 index 00000000..0570a8ed --- /dev/null +++ b/src/net/socket/tcp.rs @@ -0,0 +1,505 @@ +use core::future::Future; +use core::net::SocketAddr; +use core::pin::Pin; +use core::task::{Poll, Waker}; + +use alloc::boxed::Box; +use alloc::sync::Arc; +use async_trait::async_trait; +use eonix_runtime::task::Task; +use eonix_sync::RwLock; +use smoltcp::socket::tcp; +use smoltcp::wire::IpListenEndpoint; +use smoltcp::{iface::SocketHandle, wire::IpEndpoint}; + +use crate::io::{Buffer, Stream}; +use crate::kernel::constants::{EADDRNOTAVAIL, EAGAIN, ECONNREFUSED, EINVAL, EISCONN, ENOTCONN}; +use crate::kernel::vfs::file::PollEvent; +use crate::net::iface::{get_ephemeral_iface, get_relate_iface, NetIface}; +use crate::net::socket::{BoundSocket, RecvMetadata, SendMetadata, Socket, SocketType}; +use crate::prelude::KResult; + +pub struct TcpSocket { + bound_socket: RwLock>, + local_addr: RwLock>, + remote_addr: RwLock>, + is_nonblock: bool, +} + +impl TcpSocket { + pub fn new(is_nonblock: bool) -> Arc { + Arc::new(Self { + bound_socket: RwLock::new(None), + local_addr: RwLock::new(None), + remote_addr: RwLock::new(None), + is_nonblock, + }) + } + + // Get single bound iface and socket handle + fn iface_and_handle(&self) -> Option<(NetIface, SocketHandle)> { + Task::block_on(self.bound_socket.read()) + .as_ref()? 
+ .as_single_bound() + .map(|bound_socket| (bound_socket.iface(), bound_socket.handle())) + } + + fn listen_impl( + iface: NetIface, + socket_handle: SocketHandle, + local_endpoint: T, + backlog: usize, + ) -> KResult<()> + where + T: Into, + { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + socket + .listen(local_endpoint, Some(backlog)) + .map_err(|_| EINVAL) + } + + fn accept_impl( + iface: NetIface, + socket_handle: SocketHandle, + waker: &Waker, + ) -> KResult>> { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + if socket.may_accept() { + match socket.accept() { + Ok(Some(new_socket)) => { + let socket_addr = SocketAddr::from(new_socket.local_endpoint().unwrap()); + let remote_addr = SocketAddr::from(new_socket.remote_endpoint().unwrap()); + let handle = iface_guard.iface_and_sockets().1.add(new_socket); + let accept_socket = Arc::new(TcpSocket { + bound_socket: RwLock::new(Some(BoundSocket::new_single( + iface.clone(), + handle, + )?)), + local_addr: RwLock::new(Some(socket_addr)), + remote_addr: RwLock::new(Some(remote_addr)), + is_nonblock: false, + }); + Ok(Some(accept_socket)) + } + Ok(None) => { + socket.register_recv_waker(waker); + Ok(None) + } + Err(_) => Err(EINVAL), + } + } else { + Err(EINVAL) + } + } + + fn try_accept(&self, waker: &Waker) -> KResult>> { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundSingle(single) => { + Self::accept_impl(single.iface(), single.handle(), waker) + } + BoundSocket::BoundAll(all) => { + for bound_socket in &all.sockets { + let res = + Self::accept_impl(bound_socket.iface(), bound_socket.handle(), waker)?; + if res.is_some() { + return Ok(res); + } + } + Ok(None) + } + } + } + + fn try_recv( + &self, + buf: &mut dyn Buffer, + waker: &Waker, 
+ ) -> KResult> { + let (iface, handle) = self.iface_and_handle().unwrap(); + + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(handle); + + if !socket.may_recv() { + Err(ENOTCONN) + } else if socket.can_recv() { + let len = socket + .recv(|rx_data| { + let _ = buf.fill(&rx_data[..]); + (buf.wrote(), buf.wrote()) + }) + .unwrap(); // unwrap() is safe due to the source code logic + Ok(Some(( + len, + RecvMetadata { + remote_addr: self.remote_addr().unwrap(), + }, + ))) + } else { + socket.register_recv_waker(waker); + Ok(None) + } + } + + fn try_send(&self, stream: &mut dyn Stream, waker: &Waker) -> KResult> { + let (iface, handle) = self.iface_and_handle().unwrap(); + + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(handle); + + if !socket.may_send() { + Err(ENOTCONN) + } else if socket.can_send() { + let result = socket + .send(|tx_data| { + let result = stream + .poll_data(tx_data) + .map(|data| data.map(|write_in| write_in.len()).unwrap_or(0)); + (result.unwrap_or(0), result) + }) + .unwrap(); // unwrap() is safe due to the source code logic + + result.map(|len| Some(len)) + } else { + socket.register_send_waker(waker); + Ok(None) + } + } + + fn check_connect(&self, waker: &Waker) -> KResult> { + let (iface, handle) = self.iface_and_handle().unwrap(); + + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(handle); + + if socket.state() == tcp::State::Established { + Ok(Some(())) + } else if !socket.is_active() { + Err(ECONNREFUSED) + } else { + socket.register_recv_waker(waker); + Ok(None) + } + } + + fn poll_impl( + iface: NetIface, + socket_handle: SocketHandle, + events: PollEvent, + ) -> KResult { + let mut iface_guard = Task::block_on(iface.lock()); + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + match 
socket.state() { + tcp::State::Established => { + let mut poll_state = PollEvent::empty(); + if events.contains(PollEvent::Readable) { + if !socket.may_recv() || socket.can_recv() { + poll_state |= PollEvent::Readable; + } + } + + if events.contains(PollEvent::Writable) { + if !socket.may_send() || socket.can_send() { + poll_state |= PollEvent::Writable; + } + } + Ok(poll_state) + } + tcp::State::Listen => { + let mut poll_state = PollEvent::empty(); + if events.contains(PollEvent::Readable) { + if socket.can_accept() { + poll_state |= PollEvent::Readable; + } + } + Ok(poll_state) + } + _ => Ok(events), + } + } +} + +#[async_trait] +impl Socket for TcpSocket { + fn local_addr(&self) -> Option { + Task::block_on(self.local_addr.read()).clone() + } + + fn remote_addr(&self) -> Option { + Task::block_on(self.remote_addr.read()).clone() + } + + fn bind(&self, socket_addr: SocketAddr) -> KResult<()> { + let mut bound_socket_guard = Task::block_on(self.bound_socket.write()); + + if bound_socket_guard.is_some() { + return Err(EINVAL); + } + + if socket_addr.ip().is_unspecified() { + *bound_socket_guard = Some(BoundSocket::new_bind_all( + socket_addr.port(), + SocketType::Tcp, + )?); + *Task::block_on(self.local_addr.write()) = Some(socket_addr); + } else { + let bind_iface = get_relate_iface(socket_addr.ip()).ok_or(EADDRNOTAVAIL)?; + + let (bound_socket, local_addr) = + BoundSocket::new_bind_single(bind_iface, socket_addr.port(), SocketType::Tcp)?; + + *bound_socket_guard = Some(bound_socket); + *Task::block_on(self.local_addr.write()) = Some(local_addr); + } + drop(bound_socket_guard); + + Ok(()) + } + + fn listen(&self, backlog: usize) -> KResult<()> { + let mut bound_socket_guard = Task::block_on(self.bound_socket.write()); + + if bound_socket_guard.is_none() { + // FIXME: should get a available port and bind to all iface + // need a way to get that port simply + // *bound_socket_guard = Some(BoundSocket::new_bind_all(available_port)?); + } + + 
drop(bound_socket_guard); + + let local_addr = self.local_addr().unwrap(); + + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + for item in &all.sockets { + let (iface, handle) = (item.iface(), item.handle()); + Self::listen_impl(iface, handle, local_addr.port(), backlog)?; + } + Ok(()) + } + BoundSocket::BoundSingle(single) => { + let (iface, handle) = (single.iface(), single.handle()); + Self::listen_impl(iface, handle, local_addr, backlog) + } + } + } + + async fn connect(&self, remote_addr: SocketAddr) -> KResult<()> { + let mut bound_socket_guard = self.bound_socket.write().await; + + if bound_socket_guard.is_none() { + let bind_iface = get_ephemeral_iface(Some(remote_addr.ip())); + let (bound_socket, local_addr) = + BoundSocket::new_bind_single(bind_iface.unwrap(), 0, SocketType::Tcp)?; + *bound_socket_guard = Some(bound_socket); + *Task::block_on(self.local_addr.write()) = Some(local_addr); + } + + drop(bound_socket_guard); + + let (iface, handle) = self.iface_and_handle().unwrap(); + + let mut iface_guard = iface.lock().await; + let (iface_inner, sockets) = iface_guard.iface_and_sockets(); + let socket = sockets.get_mut::(handle); + + socket + .connect( + iface_inner.context(), + IpEndpoint::from(remote_addr), + IpEndpoint::from(self.local_addr().unwrap()), + ) + .map_err(|err| match err { + tcp::ConnectError::Unaddressable => EADDRNOTAVAIL, + // FIXME: Should return EISCONN ? 
+ tcp::ConnectError::InvalidState => EISCONN, + })?; + + drop(iface_guard); + + *Task::block_on(self.remote_addr.write()) = Some(remote_addr); + + struct ConnectFuture<'a> { + socket: &'a TcpSocket, + } + + impl<'a> Future for ConnectFuture<'a> { + type Output = KResult<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.check_connect(cx.waker()) { + Ok(Some(_)) => Poll::Ready(Ok(())), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + ConnectFuture { socket: self }.await + } + + async fn accept(&self) -> KResult> { + struct AcceptFuture<'a> { + socket: &'a TcpSocket, + } + + impl<'a> Future for AcceptFuture<'a> { + type Output = KResult>; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.try_accept(cx.waker()) { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + AcceptFuture { socket: self }.await + } + + async fn recv(&self, buffer: &mut dyn Buffer) -> KResult<(usize, RecvMetadata)> { + struct RecvFuture<'a> { + socket: &'a TcpSocket, + buffer: &'a mut dyn Buffer, + } + + impl<'a> Future for RecvFuture<'a> { + type Output = KResult<(usize, RecvMetadata)>; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.try_recv(this.buffer, cx.waker()) { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + RecvFuture { + socket: self, + buffer, + } + .await + } + + async fn send(&self, stream: &mut dyn Stream, _send_meta: SendMetadata) -> KResult { + struct SendFuture<'a> { + 
socket: &'a TcpSocket, + stream: &'a mut dyn Stream, + } + + impl<'a> Future for SendFuture<'a> { + type Output = KResult; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.try_send(this.stream, cx.waker()) { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + SendFuture { + socket: self, + stream, + } + .await + } + + fn poll(&self, events: PollEvent) -> KResult { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundSingle(single) => { + Self::poll_impl(single.iface(), single.handle(), events) + } + BoundSocket::BoundAll(all) => { + let mut poll_state = PollEvent::empty(); + for bound_socket in &all.sockets { + poll_state |= + Self::poll_impl(bound_socket.iface(), bound_socket.handle(), events)? 
+ } + Ok(poll_state) + } + } + } +} + +impl Drop for TcpSocket { + fn drop(&mut self) { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + if bound_socket_guard.is_none() { + return; + } + + let port = self.local_addr().unwrap().port(); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + for item in &all.sockets { + close_impl(item.iface(), item.handle(), port); + } + } + BoundSocket::BoundSingle(single) => close_impl(single.iface(), single.handle(), port), + } + + fn close_impl(iface: NetIface, handle: SocketHandle, port: u16) { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(handle); + + socket.close(); + + iface_guard.poll(); + + iface_guard.remove_socket(handle, port, SocketType::Tcp); + } + } +} diff --git a/src/net/socket/udp.rs b/src/net/socket/udp.rs new file mode 100644 index 00000000..c65e5d36 --- /dev/null +++ b/src/net/socket/udp.rs @@ -0,0 +1,403 @@ +use core::future::Future; +use core::net::SocketAddr; +use core::pin::Pin; +use core::task::{Poll, Waker}; + +use alloc::boxed::Box; +use alloc::collections::btree_map::BTreeMap; +use alloc::sync::Arc; +use async_trait::async_trait; +use eonix_runtime::task::Task; +use eonix_sync::{RwLock, Spin}; +use smoltcp::socket::udp; +use smoltcp::wire::IpListenEndpoint; +use smoltcp::{iface::SocketHandle, wire::IpEndpoint}; + +use crate::io::{Buffer, Stream}; +use crate::kernel::constants::{EADDRNOTAVAIL, EAGAIN, EINVAL}; +use crate::kernel::vfs::file::PollEvent; +use crate::net::iface::{get_ephemeral_iface, get_relate_iface, NetIface}; +use crate::net::socket::{BoundSocket, RecvMetadata, SendMetadata, Socket, SocketType}; +use crate::prelude::KResult; + +// FIXME: +pub static UDP_PORT_MAP: Spin> = Spin::new(BTreeMap::new()); + +pub struct UdpSocket { + bound_socket: RwLock>, + local_addr: RwLock>, + remote_addr: RwLock>, + is_nonblock: bool, + // FIXME: can ensure the order + 
is_reuse_other: Spin, +} + +impl UdpSocket { + pub fn new(is_nonblock: bool) -> Arc { + Arc::new(Self { + bound_socket: RwLock::new(None), + local_addr: RwLock::new(None), + remote_addr: RwLock::new(None), + is_nonblock, + is_reuse_other: Spin::new(false), + }) + } + + // Get single bound iface and socket handle + fn iface_and_handle(&self) -> Option<(NetIface, SocketHandle)> { + Task::block_on(self.bound_socket.read()) + .as_ref()? + .as_single_bound() + .map(|bound_socket| (bound_socket.iface(), bound_socket.handle())) + } + + fn bind_impl(iface: NetIface, socket_handle: SocketHandle, endpoint: T) -> KResult<()> + where + T: Into, + { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + let _ = socket.bind(endpoint); + Ok(()) + } + + fn recv_impl( + iface: NetIface, + socket_handle: SocketHandle, + buf: &mut dyn Buffer, + waker: &Waker, + ) -> KResult> { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + match socket.recv() { + Ok((data, meta)) => { + buf.fill(data)?; + Ok(Some(( + buf.wrote(), + RecvMetadata { + remote_addr: meta.endpoint.into(), + }, + ))) + } + Err(udp::RecvError::Exhausted) => { + socket.register_recv_waker(waker); + Ok(None) + } + Err(udp::RecvError::Truncated) => Err(EINVAL), + } + } + + fn try_recv( + &self, + buf: &mut dyn Buffer, + waker: &Waker, + ) -> KResult> { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + for item in &all.sockets { + let ret = Self::recv_impl(item.iface(), item.handle(), buf, waker)?; + if ret.is_some() { + return Ok(ret); + } + } + Ok(None) + } + BoundSocket::BoundSingle(single) => { + Self::recv_impl(single.iface(), single.handle(), buf, waker) + } + } + } + + fn send_impl( + iface: NetIface, + socket_handle: SocketHandle, + 
stream: &mut dyn Stream, + remote_addr: SocketAddr, + waker: &Waker, + ) -> KResult> { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + match socket.send_with(stream.total(), IpEndpoint::from(remote_addr), |tx_buffer| { + stream + .poll_data(tx_buffer) + .unwrap() + .map(|data| data.len()) + .unwrap_or(0) + }) { + Ok(res) => Ok(Some(res)), + Err(udp::SendError::BufferFull) => { + socket.register_send_waker(waker); + Ok(None) + } + Err(udp::SendError::Unaddressable) => Err(EINVAL), + } + } + + fn try_send( + &self, + stream: &mut dyn Stream, + remote_addr: SocketAddr, + waker: &Waker, + ) -> KResult> { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + let iface = get_ephemeral_iface(Some(remote_addr.ip())).unwrap(); + let handle = all + .sockets + .iter() + .find(|item| Arc::ptr_eq(&item.iface, &iface)) + .unwrap() + .handle(); + Self::send_impl(iface, handle, stream, remote_addr, waker) + } + BoundSocket::BoundSingle(single) => { + Self::send_impl(single.iface(), single.handle(), stream, remote_addr, waker) + } + } + } + + fn poll_impl( + iface: NetIface, + socket_handle: SocketHandle, + events: PollEvent, + ) -> KResult { + let mut iface_guard = Task::block_on(iface.lock()); + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(socket_handle); + + let mut poll_state = PollEvent::empty(); + if events.contains(PollEvent::Readable) { + if socket.can_recv() { + poll_state |= PollEvent::Readable; + } + } + if events.contains(PollEvent::Writable) { + if socket.can_send() { + poll_state |= PollEvent::Writable; + } + } + Ok(poll_state) + } +} + +#[async_trait] +impl Socket for UdpSocket { + fn local_addr(&self) -> Option { + Task::block_on(self.local_addr.read()).clone() + } + + fn remote_addr(&self) -> Option { + Task::block_on(self.remote_addr.read()).clone() + 
} + + fn bind(&self, socket_addr: SocketAddr) -> KResult<()> { + let mut bound_socket_guard = Task::block_on(self.bound_socket.write()); + + if bound_socket_guard.is_some() { + return Err(EINVAL); + } + + *Task::block_on(self.local_addr.write()) = Some(socket_addr); + + if let Some(bound_sock) = UDP_PORT_MAP.lock().get(&socket_addr) { + *bound_socket_guard = Some(bound_sock.clone()); + *self.is_reuse_other.lock() = true; + return Ok(()); + } + + let (bound_socket, local_addr) = if socket_addr.ip().is_unspecified() { + ( + BoundSocket::new_bind_all(socket_addr.port(), SocketType::Udp)?, + socket_addr, + ) + } else { + let bind_iface = get_relate_iface(socket_addr.ip()).ok_or(EADDRNOTAVAIL)?; + BoundSocket::new_bind_single(bind_iface, socket_addr.port(), SocketType::Udp)? + }; + + *Task::block_on(self.local_addr.write()) = Some(local_addr); + UDP_PORT_MAP.lock().insert(local_addr, bound_socket.clone()); + *bound_socket_guard = Some(bound_socket); + drop(bound_socket_guard); + + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + for item in &all.sockets { + Self::bind_impl(item.iface(), item.handle(), socket_addr.port())? + } + } + BoundSocket::BoundSingle(single) => { + Self::bind_impl(single.iface(), single.handle(), socket_addr)? 
+ } + } + + Ok(()) + } + + async fn connect(&self, remote_addr: SocketAddr) -> KResult<()> { + *(self.remote_addr.write().await) = Some(remote_addr); + + Ok(()) + } + + async fn recv(&self, buffer: &mut dyn Buffer) -> KResult<(usize, RecvMetadata)> { + struct RecvFuture<'a> { + socket: &'a UdpSocket, + buffer: &'a mut dyn Buffer, + } + + impl<'a> Future for RecvFuture<'a> { + type Output = KResult<(usize, RecvMetadata)>; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this.socket.try_recv(this.buffer, cx.waker()) { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + RecvFuture { + socket: self, + buffer, + } + .await + } + + async fn send(&self, stream: &mut dyn Stream, send_meta: SendMetadata) -> KResult { + let remote_addr = if let Some(remote_addr) = send_meta.remote_addr { + remote_addr + } else { + Task::block_on(self.remote_addr.read()).clone().unwrap() + }; + + let mut bound_socket_guard = Task::block_on(self.bound_socket.write()); + if bound_socket_guard.is_none() { + let bind_iface = get_ephemeral_iface(Some(remote_addr.ip())).unwrap(); + let (bound_socket, local_addr) = + BoundSocket::new_bind_single(bind_iface.clone(), 0, SocketType::Udp)?; + let socket_handle = bound_socket.as_single_bound().unwrap().handle(); + Self::bind_impl(bind_iface, socket_handle, local_addr)?; + *bound_socket_guard = Some(bound_socket); + *Task::block_on(self.local_addr.write()) = Some(local_addr); + } + + drop(bound_socket_guard); + + struct SendFuture<'a> { + socket: &'a UdpSocket, + remote_addr: SocketAddr, + stream: &'a mut dyn Stream, + } + + impl<'a> Future for SendFuture<'a> { + type Output = KResult; + + fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll { + let this = self.get_mut(); + match this + .socket + .try_send(this.stream, this.remote_addr, 
cx.waker()) + { + Ok(Some(res)) => Poll::Ready(Ok(res)), + Ok(None) if this.socket.is_nonblock => Poll::Ready(Err(EAGAIN)), + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } + } + + SendFuture { + socket: self, + remote_addr, + stream, + } + .await + } + + fn poll(&self, events: PollEvent) -> KResult { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundSingle(single) => { + Self::poll_impl(single.iface(), single.handle(), events) + } + BoundSocket::BoundAll(all) => { + let mut poll_state = PollEvent::empty(); + for bound_socket in &all.sockets { + poll_state |= + Self::poll_impl(bound_socket.iface(), bound_socket.handle(), events)? + } + Ok(poll_state) + } + } + } +} + +impl Drop for UdpSocket { + fn drop(&mut self) { + let bound_socket_guard = Task::block_on(self.bound_socket.read()); + + if bound_socket_guard.is_none() { + return; + } + + if *self.is_reuse_other.lock() { + return; + } + + let local_addr = self.local_addr().unwrap(); + let port = local_addr.port(); + + UDP_PORT_MAP.lock().remove(&local_addr); + match bound_socket_guard.as_ref().unwrap() { + BoundSocket::BoundAll(all) => { + for item in &all.sockets { + close_impl(item.iface(), item.handle(), port); + } + } + BoundSocket::BoundSingle(single) => close_impl(single.iface(), single.handle(), port), + } + + fn close_impl(iface: NetIface, handle: SocketHandle, port: u16) { + let mut iface_guard = Task::block_on(iface.lock()); + + let socket = iface_guard + .iface_and_sockets() + .1 + .get_mut::(handle); + + socket.close(); + + iface_guard.poll(); + + iface_guard.remove_socket(handle, port, SocketType::Udp); + } + } +} diff --git a/user-programs/init_script_loongarch64.sh b/user-programs/init_script_loongarch64.sh index 66b960ff..01a66f1d 100644 --- a/user-programs/init_script_loongarch64.sh +++ b/user-programs/init_script_loongarch64.sh @@ -18,11 +18,15 @@ do_or_freeze() { freeze $@ } 
+do_or_freeze $BUSYBOX mkdir -p /tmp + do_or_freeze $BUSYBOX mkdir -p /dev do_or_freeze $BUSYBOX mknod -m 666 /dev/console c 5 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/null c 1 3 do_or_freeze $BUSYBOX mknod -m 666 /dev/zero c 1 5 +do_or_freeze $BUSYBOX mknod -m 666 /dev/random c 1 8 +do_or_freeze $BUSYBOX mknod -m 666 /dev/urandom c 1 9 do_or_freeze $BUSYBOX mknod -m 666 /dev/sda b 8 0 do_or_freeze $BUSYBOX mknod -m 666 /dev/sda1 b 8 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/sdb b 8 16 diff --git a/user-programs/init_script_riscv64.sh b/user-programs/init_script_riscv64.sh index 52b2628c..340fc480 100644 --- a/user-programs/init_script_riscv64.sh +++ b/user-programs/init_script_riscv64.sh @@ -18,11 +18,15 @@ do_or_freeze() { freeze $@ } +do_or_freeze $BUSYBOX mkdir -p /tmp + do_or_freeze $BUSYBOX mkdir -p /dev do_or_freeze $BUSYBOX mknod -m 666 /dev/console c 5 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/null c 1 3 do_or_freeze $BUSYBOX mknod -m 666 /dev/zero c 1 5 +do_or_freeze $BUSYBOX mknod -m 666 /dev/random c 1 8 +do_or_freeze $BUSYBOX mknod -m 666 /dev/urandom c 1 9 do_or_freeze $BUSYBOX mknod -m 666 /dev/vda b 8 0 do_or_freeze $BUSYBOX mknod -m 666 /dev/vda1 b 8 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/vdb b 8 16 diff --git a/user-programs/init_script_x86_64.sh b/user-programs/init_script_x86_64.sh index 91f7f18a..9ab21009 100644 --- a/user-programs/init_script_x86_64.sh +++ b/user-programs/init_script_x86_64.sh @@ -18,11 +18,15 @@ do_or_freeze() { freeze $@ } +do_or_freeze $BUSYBOX mkdir -p /tmp + do_or_freeze $BUSYBOX mkdir -p /dev do_or_freeze $BUSYBOX mknod -m 666 /dev/console c 5 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/null c 1 3 do_or_freeze $BUSYBOX mknod -m 666 /dev/zero c 1 5 +do_or_freeze $BUSYBOX mknod -m 666 /dev/random c 1 8 +do_or_freeze $BUSYBOX mknod -m 666 /dev/urandom c 1 9 do_or_freeze $BUSYBOX mknod -m 666 /dev/sda b 8 0 do_or_freeze $BUSYBOX mknod -m 666 /dev/sda1 b 8 1 do_or_freeze $BUSYBOX mknod -m 666 /dev/sdb b 8 16