From 21dfa867f08f0aa98d56d8eb2ce6f55f2cbe6f29 Mon Sep 17 00:00:00 2001 From: Nick Axworthy <48474755+nickaxgit@users.noreply.github.com> Date: Sat, 20 Jul 2024 12:54:37 +0100 Subject: [PATCH 1/4] UDP Conn implemented (#29) * Added UDPConn, commented some areas for easier understanding, suggested a name change or two * some tidying of udpConn.go * Changes as requested (whitespace, UDP close method, extract anonymous function, errUnsupported returns, function comments etc) * fixes two isuues spotted by b0ch3nski * more linebreak issues fixed - appologies * Tiny fix to itcphandler comment (although it has that c# smell) --- eth/headers.go | 3 +- stacks/port_tcp.go | 34 ++++++- stacks/port_udp.go | 19 ++-- stacks/portstack.go | 68 +++++++------- stacks/ring.go | 3 +- stacks/tcpconn.go | 13 ++- stacks/udpconn.go | 219 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 307 insertions(+), 52 deletions(-) create mode 100644 stacks/udpconn.go diff --git a/eth/headers.go b/eth/headers.go index debc8bd..c687488 100644 --- a/eth/headers.go +++ b/eth/headers.go @@ -228,6 +228,7 @@ func (ehdr EthernetHeader) AssertType() EtherType { return EtherType(ehdr.SizeOr // Put marshals the ethernet frame onto buf. buf needs to be 14 bytes in length or Put panics. func (ehdr *EthernetHeader) Put(buf []byte) { _ = buf[13] + copy(buf[0:], ehdr.Destination[0:]) copy(buf[6:], ehdr.Source[0:]) binary.BigEndian.PutUint16(buf[12:14], ehdr.SizeOrEtherType) @@ -256,7 +257,7 @@ func (ehdr *EthernetHeader) String() string { // IHL returns the internet header length in 32bit words and is guaranteed to be within 0..15. // Valid values for IHL are 5..15. When multiplied by 4 this yields number of bytes of the header, 20..60. -func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf } +func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf } //low four bits func (iphdr *IPv4Header) Version() uint8 { return iphdr.VersionAndIHL >> 4 } func (iphdr *IPv4Header) DSCP() uint8 { return iphdr.ToS >> 2 } func (iphdr *IPv4Header) ECN() uint8 { return iphdr.ToS & 0b11 } diff --git a/stacks/port_tcp.go b/stacks/port_tcp.go index c221612..8274f40 100644 --- a/stacks/port_tcp.go +++ b/stacks/port_tcp.go @@ -9,12 +9,13 @@ import ( "github.com/soypat/seqs/eth" ) -// tcphandler represents a user provided function for handling incoming TCP packets on a port. -// Incoming data is sent inside the `pkt` TCPPacket argument when pkt.HasPacket returns true. -// Outgoing data is stored into the `response` byte slice. The function must return the number of +// itcphandler represents a user provided function for handling incoming TCP packets on a port. +// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by RecvEth) +// Outgoing data is written into the `dst` byte slice (from the tx ring buffer). The function must return the number of // bytes written to `response` and an error. -// +// TCPConn provides an implemntation of this interface - note .send is ONLY called by HandleEth // See [PortStack] for information on how to use this function and other port handlers. +// note TCPConn is our implementation of this interface type itcphandler interface { send(dst []byte) (n int, err error) recv(pkt *TCPPacket) error @@ -51,7 +52,7 @@ func (port *tcpPort) HandleEth(dst []byte) (n int, err error) { return n, err } -// Open sets the UDP handler and opens the port. +// Open sets the TCP handler and opens the port. func (port *tcpPort) Open(portNum uint16, handler itcphandler) { if portNum == 0 || handler == nil { panic("invalid port or nil handler" + strconv.Itoa(int(port.port))) @@ -193,6 +194,29 @@ func (pkt *TCPPacket) CalculateHeaders(seg seqs.Segment, payload []byte) { pkt.TCP.Checksum = pkt.TCP.CalculateChecksumIPv4(&pkt.IP, nil, payload) } +func (pkt *UDPPacket) CalculateHeaders(payload []byte) { + const ipLenInWords = 5 + pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4) + + // IPv4 frame. + pkt.IP.Protocol = 17 // UDP + pkt.IP.TTL = 64 + pkt.IP.ID = prand16(pkt.IP.ID) + pkt.IP.VersionAndIHL = ipLenInWords // Sets IHL: No IP options. Version set automatically. + pkt.IP.TotalLength = 4*ipLenInWords + eth.SizeUDPHeader + uint16(len(payload)) + // TODO(soypat): Document how to handle ToS. For now just use ToS used by other side. + pkt.IP.Flags = 0 // packet.IP.ToS = 0 + pkt.IP.Checksum = pkt.IP.CalculateChecksum() + + pkt.UDP = eth.UDPHeader{ + SourcePort: pkt.UDP.SourcePort, + DestinationPort: pkt.UDP.DestinationPort, + Checksum: pkt.UDP.CalculateChecksumIPv4(&pkt.IP, payload), + Length: uint16(len(payload) + 8), + } + +} + // prand16 generates a pseudo random number from a seed. func prand16(seed uint16) uint16 { // 16bit Xorshift https://en.wikipedia.org/wiki/Xorshift diff --git a/stacks/port_udp.go b/stacks/port_udp.go index c332a7e..b77d820 100644 --- a/stacks/port_udp.go +++ b/stacks/port_udp.go @@ -16,8 +16,8 @@ type iudphandler interface { } type udpPort struct { - ihandler iudphandler - port uint16 + handler iudphandler + port uint16 } func (port udpPort) Port() uint16 { return port.port } @@ -25,32 +25,35 @@ func (port udpPort) Port() uint16 { return port.port } // IsPendingHandling returns true if there are packet(s) pending handling. func (port *udpPort) IsPendingHandling() bool { // return port.port != 0 && port.ihandler.isPendingHandling() - return port.port != 0 && port.ihandler.isPendingHandling() + return port.port != 0 && port.handler.isPendingHandling() } // HandleEth writes the socket's response into dst to be sent over an ethernet interface. // HandleEth can return 0 bytes written and a nil error to indicate no action must be taken. func (port *udpPort) HandleEth(dst []byte) (int, error) { - if port.ihandler == nil { + + if port.handler == nil { panic("nil udp handler on port " + strconv.Itoa(int(port.port))) } - return port.ihandler.send(dst) + + return port.handler.send(dst) } // Open sets the UDP handler and opens the port. +// This is effectively a constructor for the port NewUDPPort() - would be an alternative name func (port *udpPort) Open(portNum uint16, h iudphandler) { if portNum == 0 || h == nil { panic("invalid port or nil handler" + strconv.Itoa(int(port.port))) } else if port.port != 0 { panic("port already open") } - port.ihandler = h + port.handler = h port.port = portNum } func (port *udpPort) Close() { port.port = 0 // Port 0 flags the port is inactive. - port.ihandler = nil + port.handler = nil } // UDP socket can be forced to respond even if no packet has been received @@ -70,7 +73,7 @@ func (pkt *UDPPacket) PutHeaders(b []byte) { panic("short UDPPacket buffer") } if pkt.IP.IHL() != 5 { - panic("UDPPacket.PutHeaders expects no IP options") + panic("UDPPacket.PutHeaders expects no IP options " + strconv.Itoa(int(pkt.IP.IHL()))) } pkt.Eth.Put(b) pkt.IP.Put(b[eth.SizeEthernetHeader:]) diff --git a/stacks/portstack.go b/stacks/portstack.go index c5533f4..b47ea37 100644 --- a/stacks/portstack.go +++ b/stacks/portstack.go @@ -18,6 +18,12 @@ const ( arpOpWait = 0xffff ) +type socket interface { + Close() + IsPendingHandling() bool + HandleEth(dst []byte) (int, error) +} + var modernAge = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) type ethernethandler = func(ehdr *eth.EthernetHeader, ethPayload []byte) error @@ -56,7 +62,7 @@ func NewPortStack(cfg PortStackConfig) *PortStack { var ErrFlagPending = errors.New("seqs: pending data") -// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RcvEth]. +// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RecvEth]. // This implementation limits itself basic header validation and port matching. // Users of PortStack are expected to implement connection state, packet buffering and retransmission logic. // - In the case of TCP this means implementing the TCP state machine. @@ -167,7 +173,6 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { if len(payload) < eth.SizeEthernetHeader+eth.SizeIPv4Header { return errPacketSmol } else if len(payload) > int(ps.mtu) { - println("recv", payload, ps.mtu) return errPacketExceedsMTU } ps.trace("Stack.RecvEth:start", slog.Int("plen", len(payload))) @@ -267,8 +272,8 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { pkt.Eth = *ehdr pkt.IP = ihdr // TODO(soypat): Don't ignore IP options. pkt.UDP = uhdr - copy(pkt.payload[:], payload) - err = port.ihandler.recv(pkt) + copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet + err = port.handler.recv(pkt) //<-- where the magic happens - invoking recv(), passes the arrived packet so in can be placed in the RX ring buffer if err == io.EOF { // Special case; EOF is flag to close port err = nil @@ -388,37 +393,10 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { return n, nil } - type Socket interface { - Close() - IsPendingHandling() bool - HandleEth(dst []byte) (int, error) - } - - handleSocket := func(dst []byte, sock Socket) (int, bool, error) { - if !sock.IsPendingHandling() { - return 0, false, nil // Nothing to handle, just skip. - } - // Socket has an unhandled packet. - n, err := sock.HandleEth(dst) - if err == ErrFlagPending { - // Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error. - return n, true, nil - } - if err != nil { - sock.Close() - if err == io.EOF { - // Special case: If error is EOF we don't return it to caller but we do write the packet if any. - err = nil - } else { - n = 0 // Clear n on unknown error and return error up the call stack. - } - } - return n, sock.IsPendingHandling(), err - } - isDebug := ps.isLogEnabled(slog.LevelDebug) socketPending := false if ps.pendingUDPv4 > 0 { + for i := range ps.portsUDP { n, pending, err := handleSocket(dst, &ps.portsUDP[i]) if pending { @@ -427,6 +405,7 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { if err != nil { return 0, err } else if n > 0 { + if isDebug { ps.debug("UDP:send", slog.Int("plen", n)) } @@ -462,6 +441,31 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { return 0, nil // Nothing handled. } +func handleSocket(dst []byte, sock socket) (int, bool, error) { + + //note sock is a UDPport or TCPport - things that impliment the Sock interface + + if !sock.IsPendingHandling() { + return 0, false, nil // Nothing to handle, just skip. + } + // Socket has an unhandled packet. + n, err := sock.HandleEth(dst) + if err == ErrFlagPending { + // Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error. + return n, true, nil + } + if err != nil { + sock.Close() + if err == io.EOF { + // Special case: If error is EOF we don't return it to caller but we do write the packet if any. + err = nil + } else { + n = 0 // Clear n on unknown error and return error up the call stack. + } + } + return n, sock.IsPendingHandling(), err +} + // IsPendingHandling checks if a call to HandleEth could possibly result in a packet being generated by the PortStack. func (ps *PortStack) IsPendingHandling() bool { return ps.pendingUDPv4 > 0 || ps.pendingTCPv4 > 0 || ps.arpClient.isPending() diff --git a/stacks/ring.go b/stacks/ring.go index 93843de..e5adb35 100644 --- a/stacks/ring.go +++ b/stacks/ring.go @@ -14,6 +14,7 @@ type ring struct { end int } +// Write writes b[] into the ring buffer at the current write position (r.end) - wrapping if needed, and increasing r.end (possibly wrapping) func (r *ring) Write(b []byte) (int, error) { free := r.Free() if len(b) > free { @@ -105,7 +106,7 @@ func (r *ring) onReadEnd() { r.off = 0 // Wrap around. } if r.off == r.end { - r.Reset() // We read everything, reset. + r.Reset() // We have read everything, reset. (reduces split reads - can get the data in one chunk more often) } } diff --git a/stacks/tcpconn.go b/stacks/tcpconn.go index 544e885..da18f67 100644 --- a/stacks/tcpconn.go +++ b/stacks/tcpconn.go @@ -39,7 +39,7 @@ type TCPConn struct { // remote is the IP+port address of remote. remote netip.AddrPort localPort uint16 - remoteMAC [6]byte + remoteMAC [6]byte //this is the local peer's MAC address (or often, that of the router/gateway for Internet traffic) abortErr error closing bool // connid is a conenction counter that is incremented each time a new @@ -160,8 +160,8 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) { } } -// Read reads data from the socket's input buffer. If the buffer is empty, -// Read will block until data is available. +// Read reads data from the socket's input (RX) (ring) buffer... populating b[].. +// If the rx buffer is empty, Read will block until data is available. func (sock *TCPConn) Read(b []byte) (int, error) { err := sock.checkPipeOpen() if err != nil { @@ -331,6 +331,7 @@ func (sock *TCPConn) checkPipeOpen() error { return nil } +// recv is called by the PortStack.RecvEth when a packet is received on the network interface, pkt is (a pointer to) the arrived packet. func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { sock.trace("TCPConn.recv:start") prevState := sock.scb.State() @@ -380,6 +381,7 @@ func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { return err } +// Send this handler is called by the underlying stack and populates response[] from the TX ring buffer, with data to be sent as a packet func (sock *TCPConn) send(response []byte) (n int, err error) { defer sock.trace("TCPConn.send:start") if !sock.remote.IsValid() { @@ -414,14 +416,15 @@ func (sock *TCPConn) send(response []byte) (n int, err error) { var payload []byte if available > 0 { payload = response[sizeTCPNoOptions : sizeTCPNoOptions+seg.DATALEN] + //we are reading out of the TX ring buffer, data to encapsulate and send n, err = sock.tx.Read(payload) if err != nil && err != io.EOF || n != int(seg.DATALEN) { - panic("bug in handleUser") // This is a bug in ring buffer or a race condition. + panic("unexpected condition in seqs.TCPConn.send") // This is a bug in ring buffer or a race condition. } } sock.setSrcDest(&sock.pkt) sock.pkt.CalculateHeaders(seg, payload) - sock.pkt.PutHeaders(response) + sock.pkt.PutHeaders(response) //puts the headers in the response bytes array (around the payload) if prevState != sock.scb.State() { sock.info("TCP:tx-statechange", slog.Uint64("port", uint64(sock.localPort)), slog.String("old", prevState.String()), slog.String("new", sock.scb.State().String()), slog.String("txflags", seg.Flags.String())) } diff --git a/stacks/udpconn.go b/stacks/udpconn.go new file mode 100644 index 0000000..7d3e7ae --- /dev/null +++ b/stacks/udpconn.go @@ -0,0 +1,219 @@ +package stacks + +//UDP is 'connectionless' so this is only a connection in so far as it's a bunch of packets arriving on a common port +//Packets can fail to arrive, or arrive out of order, or arrive more than once - this is the nature of UDP +//It is very easy to build a reliable protocol on top of UDP + +import ( + "errors" + "github.com/soypat/seqs/eth" + "github.com/soypat/seqs/internal" + "io" + "log/slog" + "net" + "net/netip" + "time" +) + +var _ net.Conn = &UDPConn{} //net.conn is part of the standard go library - it's an interface we must implement + +// The handler is a set of methods (primarily send and recv) that the underlying port stack will call, to pass packets for processing +// it is an interface we need to implement +var _ iudphandler = (*UDPConn)(nil) + +var defaultUDPbuffSize = uint16(4096) //a bit arbitrary + +const ( + //defaultSocketSize = 2048 + sizeUDPNoOptions = eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeUDPHeader +) + +type UDPConn struct { + stack *PortStack + lastRx time.Time + pkt UDPPacket //this is a reusable 'scratchpad' packet used for sending + tx ring //the ring buffers contain unpacketised data + rx ring + remote netip.AddrPort + localPort uint16 + remoteMAC [6]byte //this is a local peer - OR the ROUTER/Gateways mac address + raddr, laddr net.UDPAddr +} + +// this is an identical structure to TCPConnConfig - but I didn't want to change the original name without discussions +type UDPConnConfig struct { + TxBufSize uint16 + RxBufSize uint16 +} + +func NewUDPConn(stack *PortStack, cfg UDPConnConfig) (*UDPConn, error) { + + if cfg.RxBufSize == 0 { + cfg.RxBufSize = defaultUDPbuffSize + } + if cfg.TxBufSize == 0 { + cfg.TxBufSize = defaultUDPbuffSize + } + + buf := make([]byte, cfg.RxBufSize+cfg.TxBufSize) //I guess slicing this single buffer is a heap allocation optimisation or something - but I'm not really a fan TX and RX buffers would be a lot more readable + + sock := makeUDPConn(stack, buf[:cfg.TxBufSize], buf[cfg.TxBufSize:cfg.TxBufSize+cfg.RxBufSize]) + sock.trace("NewUDPConn:end") + return &sock, nil + +} + +func makeUDPConn(stack *PortStack, tx, rx []byte) UDPConn { + return UDPConn{ + stack: stack, + tx: ring{buf: tx}, + rx: ring{buf: rx}, + } +} + +// OpenDialUDP won't really do anything - other than choose a local outbound port) ??? +func (sock *UDPConn) OpenDialUDP(localPort uint16, remoteMAC [6]byte, remote netip.AddrPort) error { + + sock.trace("UDPConn.OpenDialUDP:start") + return sock.openstack(localPort, remoteMAC, remote) + +} + +func (sock *UDPConn) openstack(localPortNum uint16, remoteMAC [6]byte, remote netip.AddrPort) error { //}, iss seqs.Value, remoteMAC [6]byte, remoteAddr netip.AddrPort) error { + + sock.stack.OpenUDP(localPortNum, sock) + err := sock.open(localPortNum, remoteMAC, remote) + + return err + +} + +func (sock *UDPConn) open(localPortNum uint16, remoteMAC [6]byte, remoteAddr netip.AddrPort) error { + + sock.remoteMAC = remoteMAC //this is our router/gateway MAC address - we never get to know the remote MAC address (that would be a security issue) + sock.remote = remoteAddr + sock.localPort = localPortNum + sock.rx.Reset() + sock.tx.Reset() + + return nil +} + +func (u *UDPConn) abort() { + // There is no connection per se to abort +} + +func (u *UDPConn) SetReadDeadline(t time.Time) error { + return errors.ErrUnsupported +} + +func (u *UDPConn) SetWriteDeadline(t time.Time) error { + return errors.ErrUnsupported +} + +func (sock *UDPConn) isPendingHandling() bool { + + // much simpler than TCP - may need expanding?? + return (sock.tx.Buffered() > 0) || (sock.rx.Buffered() > 0) + +} + +// Read reads from the underlying RX ring buffer, throws an EOF error if no data is available +func (u *UDPConn) Read(b []byte) (n int, err error) { + return u.rx.Read(b) //read from the rx ring buffer into b +} + +// Write writes into the underlying tx ring buffer - calls to the portStacks handleEth() will send the (queued) data +func (sock *UDPConn) Write(b []byte) (n int, err error) { + err = sock.stack.FlagPendingUDP(sock.localPort) + if err != nil { + return 0, err + } + return sock.tx.Write(b) +} + +func (u *UDPConn) Close() error { + return u.stack.CloseUDP(u.localPort) +} + +func (sock *UDPConn) LocalAddr() net.Addr { + + sock.laddr = net.UDPAddr{ + IP: sock.stack.ip[:], + Port: int(sock.localPort), + } + return &sock.laddr + +} + +func (sock *UDPConn) RemoteAddr() net.Addr { + sock.raddr = net.UDPAddr{ + IP: sock.remote.Addr().AsSlice(), + Port: int(sock.remote.Port()), + } + return &sock.raddr +} + +func (u *UDPConn) SetDeadline(t time.Time) error { + return errors.ErrUnsupported +} + +func (sock *UDPConn) trace(msg string, attrs ...slog.Attr) { + internal.LogAttrs(sock.stack.logger, internal.LevelTrace, msg, attrs...) +} + +// recv takes (the contents of ) a packet it and puts it in the RX ring buffer +func (sock *UDPConn) recv(pkt *UDPPacket) (err error) { + sock.trace("UDP.recv:start") + + remotePort := sock.remote.Port() + if remotePort != 0 && pkt.UDP.SourcePort != remotePort { + return nil // This packet came from a different client (remote port) to the one we are interacting with. + } + sock.lastRx = pkt.Rx + // By this point we know that the packet is valid and contains data, we process it. + payload := pkt.Payload() + _, err = sock.rx.Write(payload) //write into the UDPconns rx (ring) buffer + + return err //which is hopefully nil - but could be errRingBufferFull +} + +// send - this handler is called regularly by the underlying stack (HandleEth) and populates response[] from the TX ring buffer, with data to be sent as a packet +func (sock *UDPConn) send(response []byte) (n int, err error) { + + sock.trace("UDPConn.send:start") + + if !sock.remote.IsValid() { + return 0, nil // No remote address yet, yield. + } + + available := min(sock.tx.Buffered(), len(response)-sizeUDPNoOptions) + + var payload []byte + if available > 0 { + payload = response[sizeUDPNoOptions : sizeUDPNoOptions+available] //this is a reference to the payload section of the response[] slice we are populating + + //we are reading out of the TX ring buffer, data to encapsulate and send + n, err = sock.tx.Read(payload) //fill the payload section from the TX ring buffer + + if err != nil && err != io.EOF || n != int(available) { + panic("bug in UDPCpn.send reading from TX buffer") + } + } + + sock.setSrcDest(&sock.pkt) + sock.pkt.CalculateHeaders(payload) + sock.pkt.PutHeaders(response) //the sock (conn object) has the local and remote port data to be able to embed in the packet + + return sizeUDPNoOptions + n, err +} + +func (sock *UDPConn) setSrcDest(pkt *UDPPacket) { + pkt.Eth.Source = sock.stack.HardwareAddr6() + pkt.IP.Source = sock.stack.ip + pkt.UDP.SourcePort = sock.localPort + + pkt.IP.Destination = sock.remote.Addr().As4() + pkt.UDP.DestinationPort = sock.remote.Port() + pkt.Eth.Destination = sock.remoteMAC +} From 42d2e4b31477505e216e7e31119d6cb7bca38ec7 Mon Sep 17 00:00:00 2001 From: Patricio Whittingslow Date: Sat, 20 Jul 2024 12:16:08 -0300 Subject: [PATCH 2/4] stacks.UDPConn: implement net.Conn somewhat faithfully --- stacks/portstack.go | 28 ++++--- stacks/tcpconn.go | 5 +- stacks/udpconn.go | 189 ++++++++++++++++++++++++++++++++------------ 3 files changed, 159 insertions(+), 63 deletions(-) diff --git a/stacks/portstack.go b/stacks/portstack.go index b47ea37..614132c 100644 --- a/stacks/portstack.go +++ b/stacks/portstack.go @@ -162,9 +162,9 @@ func (ps *PortStack) MTU() uint16 { return ps.mtu } func (ps *PortStack) HardwareAddr6() [6]byte { return ps.mac } // RecvEth validates an ethernet+ipv4 frame in payload. If it is OK then it -// defers response handling of the packets during a call to [Stack.HandleEth]. +// defers response handling of the packets during a call to [PortStack.HandleEth]. // -// If [Stack.HandleEth] is not called often enough prevent packet queue from +// If [PortStack.HandleEth] is not called often enough prevent packet queue from // filling up on a socket RecvEth will start to return [ErrDroppedPacket]. func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { // defer ps.trace("RecvEth:end") @@ -359,12 +359,18 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { return err } +// HandleEth searches for a socket with a pending packet and writes the response +// into the dst argument. The length written to dst is returned. +// [ErrFlagPending] can be returned by value by a handler to indicate the packet was +// not processed and that a future call to HandleEth is required to complete. +// +// If a handler returns any other error the port is closed. func (ps *PortStack) HandleEth(dst []byte) (n int, err error) { isTrace := ps.isLogEnabled(internal.LevelTrace) n, err = ps.handleEth(dst) if n > 0 && err == nil { if isTrace { - ps.trace("Stack: HandleEth", slog.Int("plen", n)) + ps.trace("Stack:HandleEth", slog.Int("plen", n)) } ps.lastTx = ps.now() ps.processedPackets++ @@ -374,12 +380,6 @@ func (ps *PortStack) HandleEth(dst []byte) (n int, err error) { return n, err } -// HandleEth searches for a socket with a pending packet and writes the response -// into the dst argument. The length written to dst is returned. -// [ErrFlagPending] can be returned by value by a handler to indicate the packet was -// not processed and that a future call to HandleEth is required to complete. -// -// If a handler returns any other error the port is closed. func (ps *PortStack) handleEth(dst []byte) (n int, err error) { switch { case len(dst) < int(ps.mtu): @@ -442,9 +442,7 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { } func handleSocket(dst []byte, sock socket) (int, bool, error) { - //note sock is a UDPport or TCPport - things that impliment the Sock interface - if !sock.IsPendingHandling() { return 0, false, nil // Nothing to handle, just skip. } @@ -642,3 +640,11 @@ func bytesAttr(name string, b []byte) slog.Attr { Value: slog.StringValue(string(b)), } } + +func contiguous2Bufs(b1, b2 int) ([]byte, []byte) { + buf := make([]byte, b1+b2) + // make sure first buffer's capacity does not bleed into second buffer to avoid append interference. + buf1 := buf[:b1:b1] + buf2 := buf[b1 : b1+b2] + return buf1, buf2 +} diff --git a/stacks/tcpconn.go b/stacks/tcpconn.go index da18f67..d1bca69 100644 --- a/stacks/tcpconn.go +++ b/stacks/tcpconn.go @@ -62,8 +62,8 @@ func NewTCPConn(stack *PortStack, cfg TCPConnConfig) (*TCPConn, error) { if cfg.TxBufSize == 0 { cfg.TxBufSize = defaultSocketSize } - buf := make([]byte, cfg.RxBufSize+cfg.TxBufSize) - sock := makeTCPConn(stack, buf[:cfg.TxBufSize], buf[cfg.TxBufSize:cfg.TxBufSize+cfg.RxBufSize]) + tx, rx := contiguous2Bufs(int(cfg.TxBufSize), int(cfg.RxBufSize)) + sock := makeTCPConn(stack, tx, rx) sock.trace("NewTCPConn:end") return &sock, nil } @@ -273,6 +273,7 @@ func (sock *TCPConn) openstack(state seqs.State, localPortNum uint16, iss seqs.V } err = sock.open(state, localPortNum, iss, remoteMAC, remoteAddr) if err != nil { + // On failure ensure PortStack socket is released to avoid leak. sock.stack.CloseTCP(localPortNum) } return err diff --git a/stacks/udpconn.go b/stacks/udpconn.go index 7d3e7ae..bf1f8c7 100644 --- a/stacks/udpconn.go +++ b/stacks/udpconn.go @@ -6,13 +6,16 @@ package stacks import ( "errors" - "github.com/soypat/seqs/eth" - "github.com/soypat/seqs/internal" "io" "log/slog" "net" "net/netip" + "os" + "runtime" "time" + + "github.com/soypat/seqs/eth" + "github.com/soypat/seqs/internal" ) var _ net.Conn = &UDPConn{} //net.conn is part of the standard go library - it's an interface we must implement @@ -38,6 +41,10 @@ type UDPConn struct { localPort uint16 remoteMAC [6]byte //this is a local peer - OR the ROUTER/Gateways mac address raddr, laddr net.UDPAddr + // Read and Write deadlines. + rdeadline, wdeadline time.Time + connid uint8 + closing bool } // this is an identical structure to TCPConnConfig - but I didn't want to change the original name without discussions @@ -55,9 +62,8 @@ func NewUDPConn(stack *PortStack, cfg UDPConnConfig) (*UDPConn, error) { cfg.TxBufSize = defaultUDPbuffSize } - buf := make([]byte, cfg.RxBufSize+cfg.TxBufSize) //I guess slicing this single buffer is a heap allocation optimisation or something - but I'm not really a fan TX and RX buffers would be a lot more readable - - sock := makeUDPConn(stack, buf[:cfg.TxBufSize], buf[cfg.TxBufSize:cfg.TxBufSize+cfg.RxBufSize]) + tx, rx := contiguous2Bufs(int(cfg.TxBufSize), int(cfg.RxBufSize)) + sock := makeUDPConn(stack, tx, rx) sock.trace("NewUDPConn:end") return &sock, nil @@ -73,77 +79,147 @@ func makeUDPConn(stack *PortStack, tx, rx []byte) UDPConn { // OpenDialUDP won't really do anything - other than choose a local outbound port) ??? func (sock *UDPConn) OpenDialUDP(localPort uint16, remoteMAC [6]byte, remote netip.AddrPort) error { - + if !remote.IsValid() { + return errors.New("invalid netip.AddrPort") + } sock.trace("UDPConn.OpenDialUDP:start") - return sock.openstack(localPort, remoteMAC, remote) - -} - -func (sock *UDPConn) openstack(localPortNum uint16, remoteMAC [6]byte, remote netip.AddrPort) error { //}, iss seqs.Value, remoteMAC [6]byte, remoteAddr netip.AddrPort) error { - - sock.stack.OpenUDP(localPortNum, sock) - err := sock.open(localPortNum, remoteMAC, remote) - - return err - -} - -func (sock *UDPConn) open(localPortNum uint16, remoteMAC [6]byte, remoteAddr netip.AddrPort) error { - + err := sock.stack.OpenUDP(localPort, sock) + if err != nil { + return err + } + sock.closing = false + sock.connid++ sock.remoteMAC = remoteMAC //this is our router/gateway MAC address - we never get to know the remote MAC address (that would be a security issue) - sock.remote = remoteAddr - sock.localPort = localPortNum + sock.remote = remote + sock.localPort = localPort sock.rx.Reset() sock.tx.Reset() - return nil } -func (u *UDPConn) abort() { - // There is no connection per se to abort +// abort deletes connection state and fails all pending Read/Write calls. +func (sock *UDPConn) abort() { + sock.rx.Reset() + sock.tx.Reset() + *sock = UDPConn{ + stack: sock.stack, + tx: sock.tx, + rx: sock.tx, + connid: sock.connid + 1, + } } -func (u *UDPConn) SetReadDeadline(t time.Time) error { - return errors.ErrUnsupported +func (sock *UDPConn) isPendingHandling() bool { + // much simpler than TCP - may need expanding?? + return sock.closing || sock.tx.Buffered() > 0 } -func (u *UDPConn) SetWriteDeadline(t time.Time) error { - return errors.ErrUnsupported +// Read reads from the underlying RX ring buffer, throws an EOF error if no data is available +func (sock *UDPConn) Read(b []byte) (int, error) { + err := sock.checkPipeClosed() + if err != nil { + return 0, err + } + connid := sock.connid + backoff := internal.NewBackoff(internal.BackoffHasPriority) + for sock.rx.Buffered() == 0 { + if connid != sock.connid { + return 0, net.ErrClosed + } else if !sock.wdeadline.IsZero() && time.Since(sock.wdeadline) > 0 { + return 0, os.ErrDeadlineExceeded + } + backoff.Miss() + } + return sock.rx.Read(b) //read from the rx ring buffer into b } -func (sock *UDPConn) isPendingHandling() bool { - - // much simpler than TCP - may need expanding?? - return (sock.tx.Buffered() > 0) || (sock.rx.Buffered() > 0) - +func (sock *UDPConn) BufferedInput() int { + if sock.closing || sock.localPort == 0 { + return 0 + } + return sock.rx.Buffered() } -// Read reads from the underlying RX ring buffer, throws an EOF error if no data is available -func (u *UDPConn) Read(b []byte) (n int, err error) { - return u.rx.Read(b) //read from the rx ring buffer into b +func (sock *UDPConn) FlushOutputBuffer() error { + sock.trace("UDPConn.FlushOutputBuffer:start") + if err := sock.checkPipeClosed(); err != nil { + return err + } + backoff := internal.NewBackoff(internal.BackoffHasPriority) + connid := sock.connid + for sock.tx.Buffered() > 0 { + backoff.Miss() + if connid != sock.connid { + return net.ErrClosed + } + } + return nil } // Write writes into the underlying tx ring buffer - calls to the portStacks handleEth() will send the (queued) data -func (sock *UDPConn) Write(b []byte) (n int, err error) { +func (sock *UDPConn) Write(b []byte) (int, error) { + err := sock.checkPipeClosed() + if err != nil { + return 0, err + } err = sock.stack.FlagPendingUDP(sock.localPort) if err != nil { return 0, err } - return sock.tx.Write(b) + connid := sock.connid + totalLen := len(b) + backoff := internal.NewBackoff(internal.BackoffHasPriority) + + for { + if connid != sock.connid { + // Important check before writing to buffer- maybe connection was aborted during sleep. + return totalLen - len(b), net.ErrClosed + } + n, _ := sock.tx.Write(b) + b = b[n:] + if len(b) == 0 { + break + } + if n == 0 && !sock.wdeadline.IsZero() && time.Since(sock.wdeadline) > 0 { + return totalLen - len(b), os.ErrDeadlineExceeded + } else if n == 0 { + backoff.Miss() + } else { + backoff.Hit() + runtime.Gosched() + } + err = sock.stack.FlagPendingUDP(sock.localPort) + if err != nil { + return totalLen - len(b), err + } + } + return totalLen, nil } -func (u *UDPConn) Close() error { - return u.stack.CloseUDP(u.localPort) +func (sock *UDPConn) Close() error { + if sock.localPort == 0 { + return net.ErrClosed + } + sock.closing = true + sock.stack.FlagPendingUDP(sock.localPort) + return nil } -func (sock *UDPConn) LocalAddr() net.Addr { +func (sock *UDPConn) checkPipeClosed() error { + if sock.closing { + return io.EOF + } else if sock.localPort == 0 { + return net.ErrClosed + } + return nil +} +func (sock *UDPConn) LocalAddr() net.Addr { sock.laddr = net.UDPAddr{ IP: sock.stack.ip[:], Port: int(sock.localPort), } return &sock.laddr - } func (sock *UDPConn) RemoteAddr() net.Addr { @@ -154,17 +230,28 @@ func (sock *UDPConn) RemoteAddr() net.Addr { return &sock.raddr } -func (u *UDPConn) SetDeadline(t time.Time) error { - return errors.ErrUnsupported +func (sock *UDPConn) SetDeadline(t time.Time) error { + sock.SetReadDeadline(t) + sock.SetWriteDeadline(t) + return nil } -func (sock *UDPConn) trace(msg string, attrs ...slog.Attr) { - internal.LogAttrs(sock.stack.logger, internal.LevelTrace, msg, attrs...) +func (sock *UDPConn) SetReadDeadline(t time.Time) error { + sock.rdeadline = t + return nil +} + +func (sock *UDPConn) SetWriteDeadline(t time.Time) error { + sock.wdeadline = t + return nil } // recv takes (the contents of ) a packet it and puts it in the RX ring buffer func (sock *UDPConn) recv(pkt *UDPPacket) (err error) { sock.trace("UDP.recv:start") + if sock.closing { + return io.EOF + } remotePort := sock.remote.Port() if remotePort != 0 && pkt.UDP.SourcePort != remotePort { @@ -180,9 +267,7 @@ func (sock *UDPConn) recv(pkt *UDPPacket) (err error) { // send - this handler is called regularly by the underlying stack (HandleEth) and populates response[] from the TX ring buffer, with data to be sent as a packet func (sock *UDPConn) send(response []byte) (n int, err error) { - sock.trace("UDPConn.send:start") - if !sock.remote.IsValid() { return 0, nil // No remote address yet, yield. } @@ -217,3 +302,7 @@ func (sock *UDPConn) setSrcDest(pkt *UDPPacket) { pkt.UDP.DestinationPort = sock.remote.Port() pkt.Eth.Destination = sock.remoteMAC } + +func (sock *UDPConn) trace(msg string, attrs ...slog.Attr) { + internal.LogAttrs(sock.stack.logger, internal.LevelTrace, msg, attrs...) +} From 74e7a6bc7a705fc113a59f65165f64c47b2c9a44 Mon Sep 17 00:00:00 2001 From: Patricio Whittingslow Date: Sat, 20 Jul 2024 13:38:06 -0300 Subject: [PATCH 3/4] PortStack.SendEth->PutOutboundEth refactor --- stacks/arp.go | 6 ++++-- stacks/dhcp_client.go | 6 ++++-- stacks/dhcp_server.go | 6 ++++-- stacks/dns_client.go | 6 ++++-- stacks/ntp_client.go | 6 ++++-- stacks/port_tcp.go | 22 +++++++++++++--------- stacks/port_udp.go | 15 +++++++++------ stacks/portstack.go | 40 ++++++++++++++++++++-------------------- stacks/stacks_test.go | 4 ++-- stacks/tcpconn.go | 8 ++++---- stacks/tcplistener.go | 12 +++++++----- stacks/udpconn.go | 10 +++++----- 12 files changed, 80 insertions(+), 61 deletions(-) diff --git a/stacks/arp.go b/stacks/arp.go index bd8e02b..27f6591 100644 --- a/stacks/arp.go +++ b/stacks/arp.go @@ -95,7 +95,8 @@ func (c *arpClient) pendingOutReqARPv4() bool { return c.result.Operation == 1 // User asked for a ARP request. } -func (c *arpClient) handle(dst []byte) (n int) { +// putOutboundEth implements [iudphandler] interface. +func (c *arpClient) putOutboundEth(dst []byte) (n int) { pendingOutReq := c.pendingOutReqARPv4() switch { case pendingOutReq: @@ -131,7 +132,8 @@ func (c *arpClient) handle(dst []byte) (n int) { return n } -func (c *arpClient) recv(ahdr *eth.ARPv4Header) error { +// recvEth implements [iudphandler] interface. +func (c *arpClient) recvEth(ahdr *eth.ARPv4Header) error { if ahdr.HardwareLength != 6 || ahdr.ProtoLength != 4 || ahdr.HardwareType != 1 || ahdr.AssertEtherType() != eth.EtherTypeIPv4 { return errARPUnsupported // Ignore ARP unsupported requests. } diff --git a/stacks/dhcp_client.go b/stacks/dhcp_client.go index 1a03c35..9e4eab2 100644 --- a/stacks/dhcp_client.go +++ b/stacks/dhcp_client.go @@ -243,7 +243,8 @@ func getDefaultParams() []byte { return unsafe.Slice((*byte)(unsafe.Pointer(ptr)), len(dhcpDefaultParamReqList)) } -func (d *DHCPClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (d *DHCPClient) putOutboundEth(dst []byte) (n int, err error) { if d.isAborted() { return 0, io.EOF } else if !d.isPendingHandling() { @@ -340,7 +341,8 @@ func (d *DHCPClient) send(dst []byte) (n int, err error) { return ptr, nil } -func (d *DHCPClient) recv(pkt *UDPPacket) (err error) { +// recvEth implements [iudphandler] interface. +func (d *DHCPClient) recvEth(pkt *UDPPacket) (err error) { if d.isAborted() { return io.EOF } diff --git a/stacks/dhcp_server.go b/stacks/dhcp_server.go index f0ff9cc..23d300e 100644 --- a/stacks/dhcp_server.go +++ b/stacks/dhcp_server.go @@ -45,7 +45,8 @@ func (d *DHCPServer) Start() error { return d.stack.OpenUDP(d.port, d) } -func (d *DHCPServer) recv(pkt *UDPPacket) (err error) { +// recvEth implements [iudphandler] interface. +func (d *DHCPServer) recvEth(pkt *UDPPacket) (err error) { if d.isAborted() { return io.EOF // Signal to close socket. } @@ -57,7 +58,8 @@ func (d *DHCPServer) recv(pkt *UDPPacket) (err error) { return nil } -func (d *DHCPServer) send(dst []byte) (int, error) { +// putOutboundEth implements [iudphandler] interface. +func (d *DHCPServer) putOutboundEth(dst []byte) (int, error) { if d.isAborted() { return 0, io.EOF // Signal to close socket. } diff --git a/stacks/dns_client.go b/stacks/dns_client.go index ed438bc..3f0ed3c 100644 --- a/stacks/dns_client.go +++ b/stacks/dns_client.go @@ -68,7 +68,8 @@ func (dnsc *DNSClient) StartResolve(cfg DNSResolveConfig) error { return nil } -func (dnsc *DNSClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (dnsc *DNSClient) putOutboundEth(dst []byte) (n int, err error) { if dnsc.state == dnsAborted { return 0, io.EOF } else if dnsc.state != dnsSendQuery { @@ -101,7 +102,8 @@ func (dnsc *DNSClient) send(dst []byte) (n int, err error) { return payloadOffset + int(msgLen), nil } -func (dnsc *DNSClient) recv(pkt *UDPPacket) error { +// recvEth implements the [iudphandler] interface. +func (dnsc *DNSClient) recvEth(pkt *UDPPacket) error { if dnsc.state == dnsAborted { return io.EOF } else if dnsc.state != dnsAwaitResponse { diff --git a/stacks/ntp_client.go b/stacks/ntp_client.go index 90ed4e2..94298dc 100644 --- a/stacks/ntp_client.go +++ b/stacks/ntp_client.go @@ -64,7 +64,8 @@ func (nc *NTPClient) BeginDefaultRequest(hwaddr [6]byte, raddr netip.Addr) error return nil } -func (nc *NTPClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (nc *NTPClient) putOutboundEth(dst []byte) (n int, err error) { const ( payloadoffset = eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeUDPHeader ToS = 192 @@ -101,7 +102,8 @@ func (nc *NTPClient) send(dst []byte) (n int, err error) { return payloadoffset + ntp.SizeHeader, nil } -func (nc *NTPClient) recv(pkt *UDPPacket) (err error) { +// recvEth implements the [iudphandler] interface. +func (nc *NTPClient) recvEth(pkt *UDPPacket) (err error) { if nc.isAborted() || nc.IsDone() { return io.EOF } diff --git a/stacks/port_tcp.go b/stacks/port_tcp.go index 8274f40..b38b7f9 100644 --- a/stacks/port_tcp.go +++ b/stacks/port_tcp.go @@ -10,15 +10,19 @@ import ( ) // itcphandler represents a user provided function for handling incoming TCP packets on a port. -// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by RecvEth) +// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by [PortStack.RecvEth]) // Outgoing data is written into the `dst` byte slice (from the tx ring buffer). The function must return the number of // bytes written to `response` and an error. -// TCPConn provides an implemntation of this interface - note .send is ONLY called by HandleEth +// TCPConn provides an implemntation of this interface - note .send is ONLY called by [PortStack.PutOutboundEth] // See [PortStack] for information on how to use this function and other port handlers. -// note TCPConn is our implementation of this interface +// Note [TCPConn] is our implementation of this interface type itcphandler interface { - send(dst []byte) (n int, err error) - recv(pkt *TCPPacket) error + // putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates + // response from the TX ring buffer, with data to be sent as a packet and returns n bytes written. + // See [PortStack] for more information. + putOutboundEth(response []byte) (n int, err error) + // recvEth called by the [PortStack.RecvEth] method when a packet is received on the network interface, pkt is (a pointer to) the arrived packet. + recvEth(pkt *TCPPacket) error // needsHandling() bool isPendingHandling() bool abort() @@ -37,14 +41,14 @@ func (port *tcpPort) IsPendingHandling() bool { return port.port != 0 && port.handler.isPendingHandling() } -// HandleEth writes the socket's response into dst to be sent over an ethernet interface. -// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken. -func (port *tcpPort) HandleEth(dst []byte) (n int, err error) { +// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface. +// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken. +func (port *tcpPort) PutOutboundEth(dst []byte) (n int, err error) { if port.handler == nil { panic("nil tcp handler on port " + strconv.Itoa(int(port.port))) } - n, err = port.handler.send(dst) + n, err = port.handler.putOutboundEth(dst) port.p = false if err == ErrFlagPending { port.p = true diff --git a/stacks/port_udp.go b/stacks/port_udp.go index b77d820..7eb7b08 100644 --- a/stacks/port_udp.go +++ b/stacks/port_udp.go @@ -8,8 +8,11 @@ import ( ) type iudphandler interface { - send(dst []byte) (n int, err error) - recv(pkt *UDPPacket) error + // putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates + // response from the TX ring buffer, with data to be sent as a packet and returns n bytes written. + // See [PortStack] for more information. + putOutboundEth(response []byte) (n int, err error) + recvEth(pkt *UDPPacket) error // needsHandling() bool isPendingHandling() bool abort() @@ -28,15 +31,15 @@ func (port *udpPort) IsPendingHandling() bool { return port.port != 0 && port.handler.isPendingHandling() } -// HandleEth writes the socket's response into dst to be sent over an ethernet interface. -// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken. -func (port *udpPort) HandleEth(dst []byte) (int, error) { +// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface. +// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken. +func (port *udpPort) PutOutboundEth(dst []byte) (int, error) { if port.handler == nil { panic("nil udp handler on port " + strconv.Itoa(int(port.port))) } - return port.handler.send(dst) + return port.handler.putOutboundEth(dst) } // Open sets the UDP handler and opens the port. diff --git a/stacks/portstack.go b/stacks/portstack.go index 614132c..f9e52bd 100644 --- a/stacks/portstack.go +++ b/stacks/portstack.go @@ -21,7 +21,7 @@ const ( type socket interface { Close() IsPendingHandling() bool - HandleEth(dst []byte) (int, error) + PutOutboundEth(dst []byte) (int, error) } var modernAge = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) @@ -32,7 +32,7 @@ type PortStackConfig struct { MaxOpenPortsUDP int MaxOpenPortsTCP int // GlobalHandler processes all incoming ethernet frames before they reach the port handlers. - // If GlobalHandler returns an error the frame is discarded and PortStack.HandleEth returns the error. + // If GlobalHandler returns an error the frame is discarded and PortStack.PutOutboundEth returns the error. // GlobalHandler ethernethandler Logger *slog.Logger MAC [6]byte @@ -70,7 +70,7 @@ var ErrFlagPending = errors.New("seqs: pending data") // // # Notes on PortStack handlers // -// - While PortStack.HandleEth has yet to find a outgoing packet it will look for +// - While PortStack.PutOutboundEth has yet to find a outgoing packet it will look for // a port that has a pending packet or has been flagged as pending and call its handler. // // - A call to a handler may or may not have an incoming packet ready to process. @@ -88,7 +88,7 @@ var ErrFlagPending = errors.New("seqs: pending data") // // - ErrFlagPending: When returned by the handler then the port is flagged as // pending and the written data is handled normally if there is any. If no data is written -// the call to HandleEth proceeds looking for another port to handle. +// the call to PutOutboundEth proceeds looking for another port to handle. // // - ErrFlagPending: When returned by the handler then for UDP/TCP implementations the // incoming packet argument `pkt` is flagged as not present in future calls to the handler in pkt.HasPacket calls. @@ -162,9 +162,9 @@ func (ps *PortStack) MTU() uint16 { return ps.mtu } func (ps *PortStack) HardwareAddr6() [6]byte { return ps.mac } // RecvEth validates an ethernet+ipv4 frame in payload. If it is OK then it -// defers response handling of the packets during a call to [PortStack.HandleEth]. +// defers response handling of the packets during a call to [PortStack.PutOutboundEth]. // -// If [PortStack.HandleEth] is not called often enough prevent packet queue from +// If [PortStack.PutOutboundEth] is not called often enough prevent packet queue from // filling up on a socket RecvEth will start to return [ErrDroppedPacket]. func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { // defer ps.trace("RecvEth:end") @@ -198,7 +198,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { return errPacketSmol } ps.auxARP = eth.DecodeARPv4Header(payload[eth.SizeEthernetHeader:]) - return ps.arpClient.recv(&ps.auxARP) + return ps.arpClient.recvEth(&ps.auxARP) } // IP parsing block. var ipOffset uint8 @@ -272,8 +272,8 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { pkt.Eth = *ehdr pkt.IP = ihdr // TODO(soypat): Don't ignore IP options. pkt.UDP = uhdr - copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet - err = port.handler.recv(pkt) //<-- where the magic happens - invoking recv(), passes the arrived packet so in can be placed in the RX ring buffer + copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet + err = port.handler.recvEth(pkt) //<-- where the magic happens - invoking recvEth(), passes the arrived packet so in can be placed in the RX ring buffer if err == io.EOF { // Special case; EOF is flag to close port err = nil @@ -341,7 +341,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { n := copy(pkt.data[:], ipOptions) n += copy(pkt.data[n:], tcpOptions) copy(pkt.data[n:], payload) - err = port.handler.recv(pkt) + err = port.handler.recvEth(pkt) if err == io.EOF { // Special case; EOF is flag to close port err = nil @@ -359,28 +359,28 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { return err } -// HandleEth searches for a socket with a pending packet and writes the response +// PutOutboundEth searches for a socket with a pending packet and writes the response // into the dst argument. The length written to dst is returned. // [ErrFlagPending] can be returned by value by a handler to indicate the packet was -// not processed and that a future call to HandleEth is required to complete. +// not processed and that a future call to PutOutboundEth is required to complete. // // If a handler returns any other error the port is closed. -func (ps *PortStack) HandleEth(dst []byte) (n int, err error) { +func (ps *PortStack) PutOutboundEth(dst []byte) (n int, err error) { isTrace := ps.isLogEnabled(internal.LevelTrace) - n, err = ps.handleEth(dst) + n, err = ps.putOutboundEth(dst) if n > 0 && err == nil { if isTrace { - ps.trace("Stack:HandleEth", slog.Int("plen", n)) + ps.trace("Stack:PutOutboundEth", slog.Int("plen", n)) } ps.lastTx = ps.now() ps.processedPackets++ } else if err != nil && ps.isLogEnabled(slog.LevelError) { - ps.error("Stack:HandleEth", slog.String("err", err.Error())) + ps.error("Stack:PutOutboundEth", slog.String("err", err.Error())) } return n, err } -func (ps *PortStack) handleEth(dst []byte) (n int, err error) { +func (ps *PortStack) putOutboundEth(dst []byte) (n int, err error) { switch { case len(dst) < int(ps.mtu): return 0, io.ErrShortBuffer @@ -388,7 +388,7 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { case !ps.IsPendingHandling(): return 0, nil // No remaining packets to handle. } - n = ps.arpClient.handle(dst) + n = ps.arpClient.putOutboundEth(dst) if n != 0 { return n, nil } @@ -447,7 +447,7 @@ func handleSocket(dst []byte, sock socket) (int, bool, error) { return 0, false, nil // Nothing to handle, just skip. } // Socket has an unhandled packet. - n, err := sock.HandleEth(dst) + n, err := sock.PutOutboundEth(dst) if err == ErrFlagPending { // Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error. return n, true, nil @@ -464,7 +464,7 @@ func handleSocket(dst []byte, sock socket) (int, bool, error) { return n, sock.IsPendingHandling(), err } -// IsPendingHandling checks if a call to HandleEth could possibly result in a packet being generated by the PortStack. +// IsPendingHandling checks if a call to PutOutboundEth could possibly result in a packet being generated by the PortStack. func (ps *PortStack) IsPendingHandling() bool { return ps.pendingUDPv4 > 0 || ps.pendingTCPv4 > 0 || ps.arpClient.isPending() } diff --git a/stacks/stacks_test.go b/stacks/stacks_test.go index ae0dd8e..c2df58e 100644 --- a/stacks/stacks_test.go +++ b/stacks/stacks_test.go @@ -828,7 +828,7 @@ func (egr *Exchanger) HandleTx(t *testing.T) (pkts, bytesSent int) { var err error for istack := 0; istack < len(egr.Stacks); istack++ { // This first for loop generates packets "in-flight" contained in `pipes` data structure. - egr.pipesN[istack], err = egr.Stacks[istack].HandleEth(egr.pipes[istack][:]) + egr.pipesN[istack], err = egr.Stacks[istack].PutOutboundEth(egr.pipes[istack][:]) egr.handleErr(t, err, "send", istack) bytesSent += egr.pipesN[istack] if egr.pipesN[istack] > 0 { @@ -1025,7 +1025,7 @@ func checkNoMoreDataSent(t *testing.T, msg string, egr *Exchanger) { handleTx := func() (newTxs int) { txOld := txs for istack := 0; istack < len(egr.Stacks); istack++ { - n, _ := egr.Stacks[istack].HandleEth(buf) + n, _ := egr.Stacks[istack].PutOutboundEth(buf) if n > 0 { txs++ data += n diff --git a/stacks/tcpconn.go b/stacks/tcpconn.go index d1bca69..282532a 100644 --- a/stacks/tcpconn.go +++ b/stacks/tcpconn.go @@ -332,8 +332,8 @@ func (sock *TCPConn) checkPipeOpen() error { return nil } -// recv is called by the PortStack.RecvEth when a packet is received on the network interface, pkt is (a pointer to) the arrived packet. -func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { +// recvEth implements the [itcphandler] interface. +func (sock *TCPConn) recvEth(pkt *TCPPacket) (err error) { sock.trace("TCPConn.recv:start") prevState := sock.scb.State() if prevState.IsClosed() { @@ -382,8 +382,8 @@ func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { return err } -// Send this handler is called by the underlying stack and populates response[] from the TX ring buffer, with data to be sent as a packet -func (sock *TCPConn) send(response []byte) (n int, err error) { +// Send this handler is called by the underlying stack [PortStack.PutOutboundEth] method and populates response[] from the TX ring buffer, with data to be sent as a packet +func (sock *TCPConn) putOutboundEth(response []byte) (n int, err error) { defer sock.trace("TCPConn.send:start") if !sock.remote.IsValid() { return 0, nil // No remote address yet, yield. diff --git a/stacks/tcplistener.go b/stacks/tcplistener.go index a9bea17..e00f74a 100644 --- a/stacks/tcplistener.go +++ b/stacks/tcplistener.go @@ -103,7 +103,8 @@ func (l *TCPListener) Addr() net.Addr { return &l.laddr } -func (l *TCPListener) send(dst []byte) (n int, err error) { +// putOutboundEth implements the [itcphandler] interface. +func (l *TCPListener) putOutboundEth(dst []byte) (n int, err error) { if !l.isOpen() { return 0, io.EOF } @@ -112,7 +113,7 @@ func (l *TCPListener) send(dst []byte) (n int, err error) { if conn.LocalPort() == 0 || !conn.isPendingHandling() { continue } - n, err = conn.send(dst) + n, err = conn.putOutboundEth(dst) if err == io.EOF { l.freeConnForReuse(i) err = nil @@ -125,7 +126,8 @@ func (l *TCPListener) send(dst []byte) (n int, err error) { return 0, nil } -func (l *TCPListener) recv(pkt *TCPPacket) error { +// recvEth implements the [itcphandler] interface. +func (l *TCPListener) recvEth(pkt *TCPPacket) error { if !l.isOpen() { return io.EOF } @@ -143,7 +145,7 @@ func (l *TCPListener) recv(pkt *TCPPacket) error { pkt.IP.Source != conn.remote.Addr().As4() { continue // Not for this connection. } - err := conn.recv(pkt) + err := conn.recvEth(pkt) if err == io.EOF { l.freeConnForReuse(connidx) err = nil @@ -154,7 +156,7 @@ func (l *TCPListener) recv(pkt *TCPPacket) error { l.trace("lst:noconn2recv") return ErrDroppedPacket // No available connection to receive packet. } - err := freeconn.recv(pkt) + err := freeconn.recvEth(pkt) if err == io.EOF { l.freeConnForReuse(connidx) freeconn.abort() diff --git a/stacks/udpconn.go b/stacks/udpconn.go index bf1f8c7..d2c5eeb 100644 --- a/stacks/udpconn.go +++ b/stacks/udpconn.go @@ -156,7 +156,7 @@ func (sock *UDPConn) FlushOutputBuffer() error { return nil } -// Write writes into the underlying tx ring buffer - calls to the portStacks handleEth() will send the (queued) data +// Write writes into the underlying tx ring buffer - calls to the portStacks [PortStack.PutOutboundEth] will send the (queued) data func (sock *UDPConn) Write(b []byte) (int, error) { err := sock.checkPipeClosed() if err != nil { @@ -246,8 +246,8 @@ func (sock *UDPConn) SetWriteDeadline(t time.Time) error { return nil } -// recv takes (the contents of ) a packet it and puts it in the RX ring buffer -func (sock *UDPConn) recv(pkt *UDPPacket) (err error) { +// recvEth implements the [iudphandler] interface. +func (sock *UDPConn) recvEth(pkt *UDPPacket) (err error) { sock.trace("UDP.recv:start") if sock.closing { return io.EOF @@ -265,8 +265,8 @@ func (sock *UDPConn) recv(pkt *UDPPacket) (err error) { return err //which is hopefully nil - but could be errRingBufferFull } -// send - this handler is called regularly by the underlying stack (HandleEth) and populates response[] from the TX ring buffer, with data to be sent as a packet -func (sock *UDPConn) send(response []byte) (n int, err error) { +// putOutboundEth implements the [iudphandler] interface. +func (sock *UDPConn) putOutboundEth(response []byte) (n int, err error) { sock.trace("UDPConn.send:start") if !sock.remote.IsValid() { return 0, nil // No remote address yet, yield. From 7d47ab1463856a63f4dad904bec8824101541740 Mon Sep 17 00:00:00 2001 From: soypat Date: Sun, 21 Jul 2024 12:33:48 -0300 Subject: [PATCH 4/4] UDPConn: add test and fix checksum calculation in sent packets --- stacks/port_tcp.go | 23 ---------------- stacks/port_udp.go | 23 ++++++++++++++++ stacks/stacks_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++ stacks/udpconn.go | 4 +++ 4 files changed, 90 insertions(+), 23 deletions(-) diff --git a/stacks/port_tcp.go b/stacks/port_tcp.go index b38b7f9..9df7062 100644 --- a/stacks/port_tcp.go +++ b/stacks/port_tcp.go @@ -198,29 +198,6 @@ func (pkt *TCPPacket) CalculateHeaders(seg seqs.Segment, payload []byte) { pkt.TCP.Checksum = pkt.TCP.CalculateChecksumIPv4(&pkt.IP, nil, payload) } -func (pkt *UDPPacket) CalculateHeaders(payload []byte) { - const ipLenInWords = 5 - pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4) - - // IPv4 frame. - pkt.IP.Protocol = 17 // UDP - pkt.IP.TTL = 64 - pkt.IP.ID = prand16(pkt.IP.ID) - pkt.IP.VersionAndIHL = ipLenInWords // Sets IHL: No IP options. Version set automatically. - pkt.IP.TotalLength = 4*ipLenInWords + eth.SizeUDPHeader + uint16(len(payload)) - // TODO(soypat): Document how to handle ToS. For now just use ToS used by other side. - pkt.IP.Flags = 0 // packet.IP.ToS = 0 - pkt.IP.Checksum = pkt.IP.CalculateChecksum() - - pkt.UDP = eth.UDPHeader{ - SourcePort: pkt.UDP.SourcePort, - DestinationPort: pkt.UDP.DestinationPort, - Checksum: pkt.UDP.CalculateChecksumIPv4(&pkt.IP, payload), - Length: uint16(len(payload) + 8), - } - -} - // prand16 generates a pseudo random number from a seed. func prand16(seed uint16) uint16 { // 16bit Xorshift https://en.wikipedia.org/wiki/Xorshift diff --git a/stacks/port_udp.go b/stacks/port_udp.go index 7eb7b08..c250e64 100644 --- a/stacks/port_udp.go +++ b/stacks/port_udp.go @@ -93,3 +93,26 @@ func (pkt *UDPPacket) Payload() []byte { } return pkt.payload[:uLen] } + +func (pkt *UDPPacket) CalculateHeaders(payload []byte) { + const ipLenInWords = 5 + pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4) + + // IPv4 frame. + pkt.IP.Protocol = 17 // UDP + pkt.IP.TTL = 64 + pkt.IP.ID = prand16(pkt.IP.ID) + pkt.IP.VersionAndIHL = ipLenInWords // Sets IHL: No IP options. Version set automatically. + pkt.IP.TotalLength = 4*ipLenInWords + eth.SizeUDPHeader + uint16(len(payload)) + // TODO(soypat): Document how to handle ToS. For now just use ToS used by other side. + pkt.IP.Flags = 0 // packet.IP.ToS = 0 + pkt.IP.Checksum = pkt.IP.CalculateChecksum() + + pkt.UDP = eth.UDPHeader{ + SourcePort: pkt.UDP.SourcePort, + DestinationPort: pkt.UDP.DestinationPort, + Checksum: 0, + Length: uint16(len(payload) + eth.SizeUDPHeader), + } + pkt.UDP.Checksum = pkt.UDP.CalculateChecksumIPv4(&pkt.IP, payload) +} diff --git a/stacks/stacks_test.go b/stacks/stacks_test.go index c2df58e..5b84068 100644 --- a/stacks/stacks_test.go +++ b/stacks/stacks_test.go @@ -39,6 +39,43 @@ const ( synack = seqs.FlagSYN | seqs.FlagACK ) +func TestUDP(t *testing.T) { + const mtu = defaultMTU + const bufsize = mtu / 2 + const nmessages = 100 + conn1, conn2 := createUDPPair(t, bufsize, bufsize, mtu) + ex := NewExchanger(conn1.PortStack(), conn2.PortStack()) + rng := rand.New(rand.NewSource(1)) + var send, recv [mtu]byte + rng.Read(send[:]) + for test := 0; test < nmessages; test++ { + n := rng.Int() % bufsize + ngot, err := conn1.Write(send[:n]) + if err != nil { + t.Fatal(err) + } else if n != ngot { + t.Errorf("want %d bytes written, got %d", n, ngot) + } + _, sent := ex.DoExchanges(t, 1) + if sent <= 0 { + t.Fatal("no data sent") + } + if conn2.BufferedInput() == 0 { + t.Fatal("no buffered input") + } + nrecv, err := conn2.Read(recv[:]) + if err != nil { + t.Fatal(err) + } else if nrecv != n { + t.Errorf("want %d bytes received, got %d", n, nrecv) + } + if !bytes.Equal(send[:n], recv[:n]) { + t.Errorf("receive/send mismatch") + } + recv = [mtu]byte{} // Zero receive buffer. + } +} + func TestDNS(t *testing.T) { const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network. const questionHost = "www.go.dev" @@ -932,6 +969,32 @@ func createTCPClientListenerPair(t *testing.T, clientSizes, listenerSizes, maxLi return client, listener } +func createUDPPair(t *testing.T, bufsize1, bufsize2, mtu uint16) (one, two *stacks.UDPConn) { + t.Helper() + const ( + p1, p2 = 1024, 2048 + ) + Stacks := createPortStacks(t, 2, mtu) + c1, err := stacks.NewUDPConn(Stacks[0], stacks.UDPConnConfig{TxBufSize: bufsize1, RxBufSize: bufsize1}) + if err != nil { + t.Fatal(err) + } + err = c1.OpenDialUDP(p1, Stacks[1].HardwareAddr6(), netip.AddrPortFrom(Stacks[1].Addr(), p2)) + if err != nil { + t.Fatal(err) + } + + c2, err := stacks.NewUDPConn(Stacks[1], stacks.UDPConnConfig{TxBufSize: bufsize2, RxBufSize: bufsize2}) + if err != nil { + t.Fatal(err) + } + err = c2.OpenDialUDP(p2, Stacks[0].HardwareAddr6(), netip.AddrPortFrom(Stacks[0].Addr(), p1)) + if err != nil { + t.Fatal(err) + } + return c1, c2 +} + func createTCPClientServerPair(t *testing.T, clientSizes, serverSizes, mtu uint16) (client, server *stacks.TCPConn) { t.Helper() const ( diff --git a/stacks/udpconn.go b/stacks/udpconn.go index d2c5eeb..194332c 100644 --- a/stacks/udpconn.go +++ b/stacks/udpconn.go @@ -97,6 +97,10 @@ func (sock *UDPConn) OpenDialUDP(localPort uint16, remoteMAC [6]byte, remote net return nil } +func (sock *UDPConn) PortStack() *PortStack { + return sock.stack +} + // abort deletes connection state and fails all pending Read/Write calls. func (sock *UDPConn) abort() { sock.rx.Reset()