diff --git a/eth/headers.go b/eth/headers.go index debc8bd..c687488 100644 --- a/eth/headers.go +++ b/eth/headers.go @@ -228,6 +228,7 @@ func (ehdr EthernetHeader) AssertType() EtherType { return EtherType(ehdr.SizeOr // Put marshals the ethernet frame onto buf. buf needs to be 14 bytes in length or Put panics. func (ehdr *EthernetHeader) Put(buf []byte) { _ = buf[13] + copy(buf[0:], ehdr.Destination[0:]) copy(buf[6:], ehdr.Source[0:]) binary.BigEndian.PutUint16(buf[12:14], ehdr.SizeOrEtherType) @@ -256,7 +257,7 @@ func (ehdr *EthernetHeader) String() string { // IHL returns the internet header length in 32bit words and is guaranteed to be within 0..15. // Valid values for IHL are 5..15. When multiplied by 4 this yields number of bytes of the header, 20..60. -func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf } +func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf } //low four bits func (iphdr *IPv4Header) Version() uint8 { return iphdr.VersionAndIHL >> 4 } func (iphdr *IPv4Header) DSCP() uint8 { return iphdr.ToS >> 2 } func (iphdr *IPv4Header) ECN() uint8 { return iphdr.ToS & 0b11 } diff --git a/stacks/arp.go b/stacks/arp.go index bd8e02b..27f6591 100644 --- a/stacks/arp.go +++ b/stacks/arp.go @@ -95,7 +95,8 @@ func (c *arpClient) pendingOutReqARPv4() bool { return c.result.Operation == 1 // User asked for a ARP request. } -func (c *arpClient) handle(dst []byte) (n int) { +// putOutboundEth implements [iudphandler] interface. +func (c *arpClient) putOutboundEth(dst []byte) (n int) { pendingOutReq := c.pendingOutReqARPv4() switch { case pendingOutReq: @@ -131,7 +132,8 @@ func (c *arpClient) handle(dst []byte) (n int) { return n } -func (c *arpClient) recv(ahdr *eth.ARPv4Header) error { +// recvEth implements [iudphandler] interface. +func (c *arpClient) recvEth(ahdr *eth.ARPv4Header) error { if ahdr.HardwareLength != 6 || ahdr.ProtoLength != 4 || ahdr.HardwareType != 1 || ahdr.AssertEtherType() != eth.EtherTypeIPv4 { return errARPUnsupported // Ignore ARP unsupported requests. } diff --git a/stacks/dhcp_client.go b/stacks/dhcp_client.go index 1a03c35..9e4eab2 100644 --- a/stacks/dhcp_client.go +++ b/stacks/dhcp_client.go @@ -243,7 +243,8 @@ func getDefaultParams() []byte { return unsafe.Slice((*byte)(unsafe.Pointer(ptr)), len(dhcpDefaultParamReqList)) } -func (d *DHCPClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (d *DHCPClient) putOutboundEth(dst []byte) (n int, err error) { if d.isAborted() { return 0, io.EOF } else if !d.isPendingHandling() { @@ -340,7 +341,8 @@ func (d *DHCPClient) send(dst []byte) (n int, err error) { return ptr, nil } -func (d *DHCPClient) recv(pkt *UDPPacket) (err error) { +// recvEth implements [iudphandler] interface. +func (d *DHCPClient) recvEth(pkt *UDPPacket) (err error) { if d.isAborted() { return io.EOF } diff --git a/stacks/dhcp_server.go b/stacks/dhcp_server.go index f0ff9cc..23d300e 100644 --- a/stacks/dhcp_server.go +++ b/stacks/dhcp_server.go @@ -45,7 +45,8 @@ func (d *DHCPServer) Start() error { return d.stack.OpenUDP(d.port, d) } -func (d *DHCPServer) recv(pkt *UDPPacket) (err error) { +// recvEth implements [iudphandler] interface. +func (d *DHCPServer) recvEth(pkt *UDPPacket) (err error) { if d.isAborted() { return io.EOF // Signal to close socket. } @@ -57,7 +58,8 @@ func (d *DHCPServer) recv(pkt *UDPPacket) (err error) { return nil } -func (d *DHCPServer) send(dst []byte) (int, error) { +// putOutboundEth implements [iudphandler] interface. +func (d *DHCPServer) putOutboundEth(dst []byte) (int, error) { if d.isAborted() { return 0, io.EOF // Signal to close socket. } diff --git a/stacks/dns_client.go b/stacks/dns_client.go index ed438bc..3f0ed3c 100644 --- a/stacks/dns_client.go +++ b/stacks/dns_client.go @@ -68,7 +68,8 @@ func (dnsc *DNSClient) StartResolve(cfg DNSResolveConfig) error { return nil } -func (dnsc *DNSClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (dnsc *DNSClient) putOutboundEth(dst []byte) (n int, err error) { if dnsc.state == dnsAborted { return 0, io.EOF } else if dnsc.state != dnsSendQuery { @@ -101,7 +102,8 @@ func (dnsc *DNSClient) send(dst []byte) (n int, err error) { return payloadOffset + int(msgLen), nil } -func (dnsc *DNSClient) recv(pkt *UDPPacket) error { +// recvEth implements the [iudphandler] interface. +func (dnsc *DNSClient) recvEth(pkt *UDPPacket) error { if dnsc.state == dnsAborted { return io.EOF } else if dnsc.state != dnsAwaitResponse { diff --git a/stacks/ntp_client.go b/stacks/ntp_client.go index 90ed4e2..94298dc 100644 --- a/stacks/ntp_client.go +++ b/stacks/ntp_client.go @@ -64,7 +64,8 @@ func (nc *NTPClient) BeginDefaultRequest(hwaddr [6]byte, raddr netip.Addr) error return nil } -func (nc *NTPClient) send(dst []byte) (n int, err error) { +// putOutboundEth implements [iudphandler] interface. +func (nc *NTPClient) putOutboundEth(dst []byte) (n int, err error) { const ( payloadoffset = eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeUDPHeader ToS = 192 @@ -101,7 +102,8 @@ func (nc *NTPClient) send(dst []byte) (n int, err error) { return payloadoffset + ntp.SizeHeader, nil } -func (nc *NTPClient) recv(pkt *UDPPacket) (err error) { +// recvEth implements the [iudphandler] interface. +func (nc *NTPClient) recvEth(pkt *UDPPacket) (err error) { if nc.isAborted() || nc.IsDone() { return io.EOF } diff --git a/stacks/port_tcp.go b/stacks/port_tcp.go index c221612..9df7062 100644 --- a/stacks/port_tcp.go +++ b/stacks/port_tcp.go @@ -9,15 +9,20 @@ import ( "github.com/soypat/seqs/eth" ) -// tcphandler represents a user provided function for handling incoming TCP packets on a port. -// Incoming data is sent inside the `pkt` TCPPacket argument when pkt.HasPacket returns true. -// Outgoing data is stored into the `response` byte slice. The function must return the number of +// itcphandler represents a user provided function for handling incoming TCP packets on a port. +// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by [PortStack.RecvEth]) +// Outgoing data is written into the `dst` byte slice (from the tx ring buffer). The function must return the number of // bytes written to `response` and an error. -// +// TCPConn provides an implemntation of this interface - note .send is ONLY called by [PortStack.PutOutboundEth] // See [PortStack] for information on how to use this function and other port handlers. +// Note [TCPConn] is our implementation of this interface type itcphandler interface { - send(dst []byte) (n int, err error) - recv(pkt *TCPPacket) error + // putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates + // response from the TX ring buffer, with data to be sent as a packet and returns n bytes written. + // See [PortStack] for more information. + putOutboundEth(response []byte) (n int, err error) + // recvEth called by the [PortStack.RecvEth] method when a packet is received on the network interface, pkt is (a pointer to) the arrived packet. + recvEth(pkt *TCPPacket) error // needsHandling() bool isPendingHandling() bool abort() @@ -36,14 +41,14 @@ func (port *tcpPort) IsPendingHandling() bool { return port.port != 0 && port.handler.isPendingHandling() } -// HandleEth writes the socket's response into dst to be sent over an ethernet interface. -// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken. -func (port *tcpPort) HandleEth(dst []byte) (n int, err error) { +// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface. +// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken. +func (port *tcpPort) PutOutboundEth(dst []byte) (n int, err error) { if port.handler == nil { panic("nil tcp handler on port " + strconv.Itoa(int(port.port))) } - n, err = port.handler.send(dst) + n, err = port.handler.putOutboundEth(dst) port.p = false if err == ErrFlagPending { port.p = true @@ -51,7 +56,7 @@ func (port *tcpPort) HandleEth(dst []byte) (n int, err error) { return n, err } -// Open sets the UDP handler and opens the port. +// Open sets the TCP handler and opens the port. func (port *tcpPort) Open(portNum uint16, handler itcphandler) { if portNum == 0 || handler == nil { panic("invalid port or nil handler" + strconv.Itoa(int(port.port))) diff --git a/stacks/port_udp.go b/stacks/port_udp.go index c332a7e..c250e64 100644 --- a/stacks/port_udp.go +++ b/stacks/port_udp.go @@ -8,16 +8,19 @@ import ( ) type iudphandler interface { - send(dst []byte) (n int, err error) - recv(pkt *UDPPacket) error + // putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates + // response from the TX ring buffer, with data to be sent as a packet and returns n bytes written. + // See [PortStack] for more information. + putOutboundEth(response []byte) (n int, err error) + recvEth(pkt *UDPPacket) error // needsHandling() bool isPendingHandling() bool abort() } type udpPort struct { - ihandler iudphandler - port uint16 + handler iudphandler + port uint16 } func (port udpPort) Port() uint16 { return port.port } @@ -25,32 +28,35 @@ func (port udpPort) Port() uint16 { return port.port } // IsPendingHandling returns true if there are packet(s) pending handling. func (port *udpPort) IsPendingHandling() bool { // return port.port != 0 && port.ihandler.isPendingHandling() - return port.port != 0 && port.ihandler.isPendingHandling() + return port.port != 0 && port.handler.isPendingHandling() } -// HandleEth writes the socket's response into dst to be sent over an ethernet interface. -// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken. -func (port *udpPort) HandleEth(dst []byte) (int, error) { - if port.ihandler == nil { +// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface. +// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken. +func (port *udpPort) PutOutboundEth(dst []byte) (int, error) { + + if port.handler == nil { panic("nil udp handler on port " + strconv.Itoa(int(port.port))) } - return port.ihandler.send(dst) + + return port.handler.putOutboundEth(dst) } // Open sets the UDP handler and opens the port. +// This is effectively a constructor for the port NewUDPPort() - would be an alternative name func (port *udpPort) Open(portNum uint16, h iudphandler) { if portNum == 0 || h == nil { panic("invalid port or nil handler" + strconv.Itoa(int(port.port))) } else if port.port != 0 { panic("port already open") } - port.ihandler = h + port.handler = h port.port = portNum } func (port *udpPort) Close() { port.port = 0 // Port 0 flags the port is inactive. - port.ihandler = nil + port.handler = nil } // UDP socket can be forced to respond even if no packet has been received @@ -70,7 +76,7 @@ func (pkt *UDPPacket) PutHeaders(b []byte) { panic("short UDPPacket buffer") } if pkt.IP.IHL() != 5 { - panic("UDPPacket.PutHeaders expects no IP options") + panic("UDPPacket.PutHeaders expects no IP options " + strconv.Itoa(int(pkt.IP.IHL()))) } pkt.Eth.Put(b) pkt.IP.Put(b[eth.SizeEthernetHeader:]) @@ -87,3 +93,26 @@ func (pkt *UDPPacket) Payload() []byte { } return pkt.payload[:uLen] } + +func (pkt *UDPPacket) CalculateHeaders(payload []byte) { + const ipLenInWords = 5 + pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4) + + // IPv4 frame. + pkt.IP.Protocol = 17 // UDP + pkt.IP.TTL = 64 + pkt.IP.ID = prand16(pkt.IP.ID) + pkt.IP.VersionAndIHL = ipLenInWords // Sets IHL: No IP options. Version set automatically. + pkt.IP.TotalLength = 4*ipLenInWords + eth.SizeUDPHeader + uint16(len(payload)) + // TODO(soypat): Document how to handle ToS. For now just use ToS used by other side. + pkt.IP.Flags = 0 // packet.IP.ToS = 0 + pkt.IP.Checksum = pkt.IP.CalculateChecksum() + + pkt.UDP = eth.UDPHeader{ + SourcePort: pkt.UDP.SourcePort, + DestinationPort: pkt.UDP.DestinationPort, + Checksum: 0, + Length: uint16(len(payload) + eth.SizeUDPHeader), + } + pkt.UDP.Checksum = pkt.UDP.CalculateChecksumIPv4(&pkt.IP, payload) +} diff --git a/stacks/portstack.go b/stacks/portstack.go index c5533f4..f9e52bd 100644 --- a/stacks/portstack.go +++ b/stacks/portstack.go @@ -18,6 +18,12 @@ const ( arpOpWait = 0xffff ) +type socket interface { + Close() + IsPendingHandling() bool + PutOutboundEth(dst []byte) (int, error) +} + var modernAge = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) type ethernethandler = func(ehdr *eth.EthernetHeader, ethPayload []byte) error @@ -26,7 +32,7 @@ type PortStackConfig struct { MaxOpenPortsUDP int MaxOpenPortsTCP int // GlobalHandler processes all incoming ethernet frames before they reach the port handlers. - // If GlobalHandler returns an error the frame is discarded and PortStack.HandleEth returns the error. + // If GlobalHandler returns an error the frame is discarded and PortStack.PutOutboundEth returns the error. // GlobalHandler ethernethandler Logger *slog.Logger MAC [6]byte @@ -56,7 +62,7 @@ func NewPortStack(cfg PortStackConfig) *PortStack { var ErrFlagPending = errors.New("seqs: pending data") -// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RcvEth]. +// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RecvEth]. // This implementation limits itself basic header validation and port matching. // Users of PortStack are expected to implement connection state, packet buffering and retransmission logic. // - In the case of TCP this means implementing the TCP state machine. @@ -64,7 +70,7 @@ var ErrFlagPending = errors.New("seqs: pending data") // // # Notes on PortStack handlers // -// - While PortStack.HandleEth has yet to find a outgoing packet it will look for +// - While PortStack.PutOutboundEth has yet to find a outgoing packet it will look for // a port that has a pending packet or has been flagged as pending and call its handler. // // - A call to a handler may or may not have an incoming packet ready to process. @@ -82,7 +88,7 @@ var ErrFlagPending = errors.New("seqs: pending data") // // - ErrFlagPending: When returned by the handler then the port is flagged as // pending and the written data is handled normally if there is any. If no data is written -// the call to HandleEth proceeds looking for another port to handle. +// the call to PutOutboundEth proceeds looking for another port to handle. // // - ErrFlagPending: When returned by the handler then for UDP/TCP implementations the // incoming packet argument `pkt` is flagged as not present in future calls to the handler in pkt.HasPacket calls. @@ -156,9 +162,9 @@ func (ps *PortStack) MTU() uint16 { return ps.mtu } func (ps *PortStack) HardwareAddr6() [6]byte { return ps.mac } // RecvEth validates an ethernet+ipv4 frame in payload. If it is OK then it -// defers response handling of the packets during a call to [Stack.HandleEth]. +// defers response handling of the packets during a call to [PortStack.PutOutboundEth]. // -// If [Stack.HandleEth] is not called often enough prevent packet queue from +// If [PortStack.PutOutboundEth] is not called often enough prevent packet queue from // filling up on a socket RecvEth will start to return [ErrDroppedPacket]. func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { // defer ps.trace("RecvEth:end") @@ -167,7 +173,6 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { if len(payload) < eth.SizeEthernetHeader+eth.SizeIPv4Header { return errPacketSmol } else if len(payload) > int(ps.mtu) { - println("recv", payload, ps.mtu) return errPacketExceedsMTU } ps.trace("Stack.RecvEth:start", slog.Int("plen", len(payload))) @@ -193,7 +198,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { return errPacketSmol } ps.auxARP = eth.DecodeARPv4Header(payload[eth.SizeEthernetHeader:]) - return ps.arpClient.recv(&ps.auxARP) + return ps.arpClient.recvEth(&ps.auxARP) } // IP parsing block. var ipOffset uint8 @@ -267,8 +272,8 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { pkt.Eth = *ehdr pkt.IP = ihdr // TODO(soypat): Don't ignore IP options. pkt.UDP = uhdr - copy(pkt.payload[:], payload) - err = port.ihandler.recv(pkt) + copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet + err = port.handler.recvEth(pkt) //<-- where the magic happens - invoking recvEth(), passes the arrived packet so in can be placed in the RX ring buffer if err == io.EOF { // Special case; EOF is flag to close port err = nil @@ -336,7 +341,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { n := copy(pkt.data[:], ipOptions) n += copy(pkt.data[n:], tcpOptions) copy(pkt.data[n:], payload) - err = port.handler.recv(pkt) + err = port.handler.recvEth(pkt) if err == io.EOF { // Special case; EOF is flag to close port err = nil @@ -354,28 +359,28 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) { return err } -func (ps *PortStack) HandleEth(dst []byte) (n int, err error) { +// PutOutboundEth searches for a socket with a pending packet and writes the response +// into the dst argument. The length written to dst is returned. +// [ErrFlagPending] can be returned by value by a handler to indicate the packet was +// not processed and that a future call to PutOutboundEth is required to complete. +// +// If a handler returns any other error the port is closed. +func (ps *PortStack) PutOutboundEth(dst []byte) (n int, err error) { isTrace := ps.isLogEnabled(internal.LevelTrace) - n, err = ps.handleEth(dst) + n, err = ps.putOutboundEth(dst) if n > 0 && err == nil { if isTrace { - ps.trace("Stack: HandleEth", slog.Int("plen", n)) + ps.trace("Stack:PutOutboundEth", slog.Int("plen", n)) } ps.lastTx = ps.now() ps.processedPackets++ } else if err != nil && ps.isLogEnabled(slog.LevelError) { - ps.error("Stack:HandleEth", slog.String("err", err.Error())) + ps.error("Stack:PutOutboundEth", slog.String("err", err.Error())) } return n, err } -// HandleEth searches for a socket with a pending packet and writes the response -// into the dst argument. The length written to dst is returned. -// [ErrFlagPending] can be returned by value by a handler to indicate the packet was -// not processed and that a future call to HandleEth is required to complete. -// -// If a handler returns any other error the port is closed. -func (ps *PortStack) handleEth(dst []byte) (n int, err error) { +func (ps *PortStack) putOutboundEth(dst []byte) (n int, err error) { switch { case len(dst) < int(ps.mtu): return 0, io.ErrShortBuffer @@ -383,42 +388,15 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { case !ps.IsPendingHandling(): return 0, nil // No remaining packets to handle. } - n = ps.arpClient.handle(dst) + n = ps.arpClient.putOutboundEth(dst) if n != 0 { return n, nil } - type Socket interface { - Close() - IsPendingHandling() bool - HandleEth(dst []byte) (int, error) - } - - handleSocket := func(dst []byte, sock Socket) (int, bool, error) { - if !sock.IsPendingHandling() { - return 0, false, nil // Nothing to handle, just skip. - } - // Socket has an unhandled packet. - n, err := sock.HandleEth(dst) - if err == ErrFlagPending { - // Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error. - return n, true, nil - } - if err != nil { - sock.Close() - if err == io.EOF { - // Special case: If error is EOF we don't return it to caller but we do write the packet if any. - err = nil - } else { - n = 0 // Clear n on unknown error and return error up the call stack. - } - } - return n, sock.IsPendingHandling(), err - } - isDebug := ps.isLogEnabled(slog.LevelDebug) socketPending := false if ps.pendingUDPv4 > 0 { + for i := range ps.portsUDP { n, pending, err := handleSocket(dst, &ps.portsUDP[i]) if pending { @@ -427,6 +405,7 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { if err != nil { return 0, err } else if n > 0 { + if isDebug { ps.debug("UDP:send", slog.Int("plen", n)) } @@ -462,7 +441,30 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) { return 0, nil // Nothing handled. } -// IsPendingHandling checks if a call to HandleEth could possibly result in a packet being generated by the PortStack. +func handleSocket(dst []byte, sock socket) (int, bool, error) { + //note sock is a UDPport or TCPport - things that impliment the Sock interface + if !sock.IsPendingHandling() { + return 0, false, nil // Nothing to handle, just skip. + } + // Socket has an unhandled packet. + n, err := sock.PutOutboundEth(dst) + if err == ErrFlagPending { + // Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error. + return n, true, nil + } + if err != nil { + sock.Close() + if err == io.EOF { + // Special case: If error is EOF we don't return it to caller but we do write the packet if any. + err = nil + } else { + n = 0 // Clear n on unknown error and return error up the call stack. + } + } + return n, sock.IsPendingHandling(), err +} + +// IsPendingHandling checks if a call to PutOutboundEth could possibly result in a packet being generated by the PortStack. func (ps *PortStack) IsPendingHandling() bool { return ps.pendingUDPv4 > 0 || ps.pendingTCPv4 > 0 || ps.arpClient.isPending() } @@ -638,3 +640,11 @@ func bytesAttr(name string, b []byte) slog.Attr { Value: slog.StringValue(string(b)), } } + +func contiguous2Bufs(b1, b2 int) ([]byte, []byte) { + buf := make([]byte, b1+b2) + // make sure first buffer's capacity does not bleed into second buffer to avoid append interference. + buf1 := buf[:b1:b1] + buf2 := buf[b1 : b1+b2] + return buf1, buf2 +} diff --git a/stacks/ring.go b/stacks/ring.go index 93843de..e5adb35 100644 --- a/stacks/ring.go +++ b/stacks/ring.go @@ -14,6 +14,7 @@ type ring struct { end int } +// Write writes b[] into the ring buffer at the current write position (r.end) - wrapping if needed, and increasing r.end (possibly wrapping) func (r *ring) Write(b []byte) (int, error) { free := r.Free() if len(b) > free { @@ -105,7 +106,7 @@ func (r *ring) onReadEnd() { r.off = 0 // Wrap around. } if r.off == r.end { - r.Reset() // We read everything, reset. + r.Reset() // We have read everything, reset. (reduces split reads - can get the data in one chunk more often) } } diff --git a/stacks/stacks_test.go b/stacks/stacks_test.go index ae0dd8e..5b84068 100644 --- a/stacks/stacks_test.go +++ b/stacks/stacks_test.go @@ -39,6 +39,43 @@ const ( synack = seqs.FlagSYN | seqs.FlagACK ) +func TestUDP(t *testing.T) { + const mtu = defaultMTU + const bufsize = mtu / 2 + const nmessages = 100 + conn1, conn2 := createUDPPair(t, bufsize, bufsize, mtu) + ex := NewExchanger(conn1.PortStack(), conn2.PortStack()) + rng := rand.New(rand.NewSource(1)) + var send, recv [mtu]byte + rng.Read(send[:]) + for test := 0; test < nmessages; test++ { + n := rng.Int() % bufsize + ngot, err := conn1.Write(send[:n]) + if err != nil { + t.Fatal(err) + } else if n != ngot { + t.Errorf("want %d bytes written, got %d", n, ngot) + } + _, sent := ex.DoExchanges(t, 1) + if sent <= 0 { + t.Fatal("no data sent") + } + if conn2.BufferedInput() == 0 { + t.Fatal("no buffered input") + } + nrecv, err := conn2.Read(recv[:]) + if err != nil { + t.Fatal(err) + } else if nrecv != n { + t.Errorf("want %d bytes received, got %d", n, nrecv) + } + if !bytes.Equal(send[:n], recv[:n]) { + t.Errorf("receive/send mismatch") + } + recv = [mtu]byte{} // Zero receive buffer. + } +} + func TestDNS(t *testing.T) { const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network. const questionHost = "www.go.dev" @@ -828,7 +865,7 @@ func (egr *Exchanger) HandleTx(t *testing.T) (pkts, bytesSent int) { var err error for istack := 0; istack < len(egr.Stacks); istack++ { // This first for loop generates packets "in-flight" contained in `pipes` data structure. - egr.pipesN[istack], err = egr.Stacks[istack].HandleEth(egr.pipes[istack][:]) + egr.pipesN[istack], err = egr.Stacks[istack].PutOutboundEth(egr.pipes[istack][:]) egr.handleErr(t, err, "send", istack) bytesSent += egr.pipesN[istack] if egr.pipesN[istack] > 0 { @@ -932,6 +969,32 @@ func createTCPClientListenerPair(t *testing.T, clientSizes, listenerSizes, maxLi return client, listener } +func createUDPPair(t *testing.T, bufsize1, bufsize2, mtu uint16) (one, two *stacks.UDPConn) { + t.Helper() + const ( + p1, p2 = 1024, 2048 + ) + Stacks := createPortStacks(t, 2, mtu) + c1, err := stacks.NewUDPConn(Stacks[0], stacks.UDPConnConfig{TxBufSize: bufsize1, RxBufSize: bufsize1}) + if err != nil { + t.Fatal(err) + } + err = c1.OpenDialUDP(p1, Stacks[1].HardwareAddr6(), netip.AddrPortFrom(Stacks[1].Addr(), p2)) + if err != nil { + t.Fatal(err) + } + + c2, err := stacks.NewUDPConn(Stacks[1], stacks.UDPConnConfig{TxBufSize: bufsize2, RxBufSize: bufsize2}) + if err != nil { + t.Fatal(err) + } + err = c2.OpenDialUDP(p2, Stacks[0].HardwareAddr6(), netip.AddrPortFrom(Stacks[0].Addr(), p1)) + if err != nil { + t.Fatal(err) + } + return c1, c2 +} + func createTCPClientServerPair(t *testing.T, clientSizes, serverSizes, mtu uint16) (client, server *stacks.TCPConn) { t.Helper() const ( @@ -1025,7 +1088,7 @@ func checkNoMoreDataSent(t *testing.T, msg string, egr *Exchanger) { handleTx := func() (newTxs int) { txOld := txs for istack := 0; istack < len(egr.Stacks); istack++ { - n, _ := egr.Stacks[istack].HandleEth(buf) + n, _ := egr.Stacks[istack].PutOutboundEth(buf) if n > 0 { txs++ data += n diff --git a/stacks/tcpconn.go b/stacks/tcpconn.go index 544e885..282532a 100644 --- a/stacks/tcpconn.go +++ b/stacks/tcpconn.go @@ -39,7 +39,7 @@ type TCPConn struct { // remote is the IP+port address of remote. remote netip.AddrPort localPort uint16 - remoteMAC [6]byte + remoteMAC [6]byte //this is the local peer's MAC address (or often, that of the router/gateway for Internet traffic) abortErr error closing bool // connid is a conenction counter that is incremented each time a new @@ -62,8 +62,8 @@ func NewTCPConn(stack *PortStack, cfg TCPConnConfig) (*TCPConn, error) { if cfg.TxBufSize == 0 { cfg.TxBufSize = defaultSocketSize } - buf := make([]byte, cfg.RxBufSize+cfg.TxBufSize) - sock := makeTCPConn(stack, buf[:cfg.TxBufSize], buf[cfg.TxBufSize:cfg.TxBufSize+cfg.RxBufSize]) + tx, rx := contiguous2Bufs(int(cfg.TxBufSize), int(cfg.RxBufSize)) + sock := makeTCPConn(stack, tx, rx) sock.trace("NewTCPConn:end") return &sock, nil } @@ -160,8 +160,8 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) { } } -// Read reads data from the socket's input buffer. If the buffer is empty, -// Read will block until data is available. +// Read reads data from the socket's input (RX) (ring) buffer... populating b[].. +// If the rx buffer is empty, Read will block until data is available. func (sock *TCPConn) Read(b []byte) (int, error) { err := sock.checkPipeOpen() if err != nil { @@ -273,6 +273,7 @@ func (sock *TCPConn) openstack(state seqs.State, localPortNum uint16, iss seqs.V } err = sock.open(state, localPortNum, iss, remoteMAC, remoteAddr) if err != nil { + // On failure ensure PortStack socket is released to avoid leak. sock.stack.CloseTCP(localPortNum) } return err @@ -331,7 +332,8 @@ func (sock *TCPConn) checkPipeOpen() error { return nil } -func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { +// recvEth implements the [itcphandler] interface. +func (sock *TCPConn) recvEth(pkt *TCPPacket) (err error) { sock.trace("TCPConn.recv:start") prevState := sock.scb.State() if prevState.IsClosed() { @@ -380,7 +382,8 @@ func (sock *TCPConn) recv(pkt *TCPPacket) (err error) { return err } -func (sock *TCPConn) send(response []byte) (n int, err error) { +// Send this handler is called by the underlying stack [PortStack.PutOutboundEth] method and populates response[] from the TX ring buffer, with data to be sent as a packet +func (sock *TCPConn) putOutboundEth(response []byte) (n int, err error) { defer sock.trace("TCPConn.send:start") if !sock.remote.IsValid() { return 0, nil // No remote address yet, yield. @@ -414,14 +417,15 @@ func (sock *TCPConn) send(response []byte) (n int, err error) { var payload []byte if available > 0 { payload = response[sizeTCPNoOptions : sizeTCPNoOptions+seg.DATALEN] + //we are reading out of the TX ring buffer, data to encapsulate and send n, err = sock.tx.Read(payload) if err != nil && err != io.EOF || n != int(seg.DATALEN) { - panic("bug in handleUser") // This is a bug in ring buffer or a race condition. + panic("unexpected condition in seqs.TCPConn.send") // This is a bug in ring buffer or a race condition. } } sock.setSrcDest(&sock.pkt) sock.pkt.CalculateHeaders(seg, payload) - sock.pkt.PutHeaders(response) + sock.pkt.PutHeaders(response) //puts the headers in the response bytes array (around the payload) if prevState != sock.scb.State() { sock.info("TCP:tx-statechange", slog.Uint64("port", uint64(sock.localPort)), slog.String("old", prevState.String()), slog.String("new", sock.scb.State().String()), slog.String("txflags", seg.Flags.String())) } diff --git a/stacks/tcplistener.go b/stacks/tcplistener.go index a9bea17..e00f74a 100644 --- a/stacks/tcplistener.go +++ b/stacks/tcplistener.go @@ -103,7 +103,8 @@ func (l *TCPListener) Addr() net.Addr { return &l.laddr } -func (l *TCPListener) send(dst []byte) (n int, err error) { +// putOutboundEth implements the [itcphandler] interface. +func (l *TCPListener) putOutboundEth(dst []byte) (n int, err error) { if !l.isOpen() { return 0, io.EOF } @@ -112,7 +113,7 @@ func (l *TCPListener) send(dst []byte) (n int, err error) { if conn.LocalPort() == 0 || !conn.isPendingHandling() { continue } - n, err = conn.send(dst) + n, err = conn.putOutboundEth(dst) if err == io.EOF { l.freeConnForReuse(i) err = nil @@ -125,7 +126,8 @@ func (l *TCPListener) send(dst []byte) (n int, err error) { return 0, nil } -func (l *TCPListener) recv(pkt *TCPPacket) error { +// recvEth implements the [itcphandler] interface. +func (l *TCPListener) recvEth(pkt *TCPPacket) error { if !l.isOpen() { return io.EOF } @@ -143,7 +145,7 @@ func (l *TCPListener) recv(pkt *TCPPacket) error { pkt.IP.Source != conn.remote.Addr().As4() { continue // Not for this connection. } - err := conn.recv(pkt) + err := conn.recvEth(pkt) if err == io.EOF { l.freeConnForReuse(connidx) err = nil @@ -154,7 +156,7 @@ func (l *TCPListener) recv(pkt *TCPPacket) error { l.trace("lst:noconn2recv") return ErrDroppedPacket // No available connection to receive packet. } - err := freeconn.recv(pkt) + err := freeconn.recvEth(pkt) if err == io.EOF { l.freeConnForReuse(connidx) freeconn.abort() diff --git a/stacks/udpconn.go b/stacks/udpconn.go new file mode 100644 index 0000000..194332c --- /dev/null +++ b/stacks/udpconn.go @@ -0,0 +1,312 @@ +package stacks + +//UDP is 'connectionless' so this is only a connection in so far as it's a bunch of packets arriving on a common port +//Packets can fail to arrive, or arrive out of order, or arrive more than once - this is the nature of UDP +//It is very easy to build a reliable protocol on top of UDP + +import ( + "errors" + "io" + "log/slog" + "net" + "net/netip" + "os" + "runtime" + "time" + + "github.com/soypat/seqs/eth" + "github.com/soypat/seqs/internal" +) + +var _ net.Conn = &UDPConn{} //net.conn is part of the standard go library - it's an interface we must implement + +// The handler is a set of methods (primarily send and recv) that the underlying port stack will call, to pass packets for processing +// it is an interface we need to implement +var _ iudphandler = (*UDPConn)(nil) + +var defaultUDPbuffSize = uint16(4096) //a bit arbitrary + +const ( + //defaultSocketSize = 2048 + sizeUDPNoOptions = eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeUDPHeader +) + +type UDPConn struct { + stack *PortStack + lastRx time.Time + pkt UDPPacket //this is a reusable 'scratchpad' packet used for sending + tx ring //the ring buffers contain unpacketised data + rx ring + remote netip.AddrPort + localPort uint16 + remoteMAC [6]byte //this is a local peer - OR the ROUTER/Gateways mac address + raddr, laddr net.UDPAddr + // Read and Write deadlines. + rdeadline, wdeadline time.Time + connid uint8 + closing bool +} + +// this is an identical structure to TCPConnConfig - but I didn't want to change the original name without discussions +type UDPConnConfig struct { + TxBufSize uint16 + RxBufSize uint16 +} + +func NewUDPConn(stack *PortStack, cfg UDPConnConfig) (*UDPConn, error) { + + if cfg.RxBufSize == 0 { + cfg.RxBufSize = defaultUDPbuffSize + } + if cfg.TxBufSize == 0 { + cfg.TxBufSize = defaultUDPbuffSize + } + + tx, rx := contiguous2Bufs(int(cfg.TxBufSize), int(cfg.RxBufSize)) + sock := makeUDPConn(stack, tx, rx) + sock.trace("NewUDPConn:end") + return &sock, nil + +} + +func makeUDPConn(stack *PortStack, tx, rx []byte) UDPConn { + return UDPConn{ + stack: stack, + tx: ring{buf: tx}, + rx: ring{buf: rx}, + } +} + +// OpenDialUDP won't really do anything - other than choose a local outbound port) ??? +func (sock *UDPConn) OpenDialUDP(localPort uint16, remoteMAC [6]byte, remote netip.AddrPort) error { + if !remote.IsValid() { + return errors.New("invalid netip.AddrPort") + } + sock.trace("UDPConn.OpenDialUDP:start") + err := sock.stack.OpenUDP(localPort, sock) + if err != nil { + return err + } + sock.closing = false + sock.connid++ + sock.remoteMAC = remoteMAC //this is our router/gateway MAC address - we never get to know the remote MAC address (that would be a security issue) + sock.remote = remote + sock.localPort = localPort + sock.rx.Reset() + sock.tx.Reset() + return nil +} + +func (sock *UDPConn) PortStack() *PortStack { + return sock.stack +} + +// abort deletes connection state and fails all pending Read/Write calls. +func (sock *UDPConn) abort() { + sock.rx.Reset() + sock.tx.Reset() + *sock = UDPConn{ + stack: sock.stack, + tx: sock.tx, + rx: sock.tx, + connid: sock.connid + 1, + } +} + +func (sock *UDPConn) isPendingHandling() bool { + // much simpler than TCP - may need expanding?? + return sock.closing || sock.tx.Buffered() > 0 +} + +// Read reads from the underlying RX ring buffer, throws an EOF error if no data is available +func (sock *UDPConn) Read(b []byte) (int, error) { + err := sock.checkPipeClosed() + if err != nil { + return 0, err + } + connid := sock.connid + backoff := internal.NewBackoff(internal.BackoffHasPriority) + for sock.rx.Buffered() == 0 { + if connid != sock.connid { + return 0, net.ErrClosed + } else if !sock.wdeadline.IsZero() && time.Since(sock.wdeadline) > 0 { + return 0, os.ErrDeadlineExceeded + } + backoff.Miss() + } + return sock.rx.Read(b) //read from the rx ring buffer into b +} + +func (sock *UDPConn) BufferedInput() int { + if sock.closing || sock.localPort == 0 { + return 0 + } + return sock.rx.Buffered() +} + +func (sock *UDPConn) FlushOutputBuffer() error { + sock.trace("UDPConn.FlushOutputBuffer:start") + if err := sock.checkPipeClosed(); err != nil { + return err + } + backoff := internal.NewBackoff(internal.BackoffHasPriority) + connid := sock.connid + for sock.tx.Buffered() > 0 { + backoff.Miss() + if connid != sock.connid { + return net.ErrClosed + } + } + return nil +} + +// Write writes into the underlying tx ring buffer - calls to the portStacks [PortStack.PutOutboundEth] will send the (queued) data +func (sock *UDPConn) Write(b []byte) (int, error) { + err := sock.checkPipeClosed() + if err != nil { + return 0, err + } + err = sock.stack.FlagPendingUDP(sock.localPort) + if err != nil { + return 0, err + } + connid := sock.connid + totalLen := len(b) + backoff := internal.NewBackoff(internal.BackoffHasPriority) + + for { + if connid != sock.connid { + // Important check before writing to buffer- maybe connection was aborted during sleep. + return totalLen - len(b), net.ErrClosed + } + n, _ := sock.tx.Write(b) + b = b[n:] + if len(b) == 0 { + break + } + if n == 0 && !sock.wdeadline.IsZero() && time.Since(sock.wdeadline) > 0 { + return totalLen - len(b), os.ErrDeadlineExceeded + } else if n == 0 { + backoff.Miss() + } else { + backoff.Hit() + runtime.Gosched() + } + err = sock.stack.FlagPendingUDP(sock.localPort) + if err != nil { + return totalLen - len(b), err + } + } + return totalLen, nil +} + +func (sock *UDPConn) Close() error { + if sock.localPort == 0 { + return net.ErrClosed + } + sock.closing = true + sock.stack.FlagPendingUDP(sock.localPort) + return nil +} + +func (sock *UDPConn) checkPipeClosed() error { + if sock.closing { + return io.EOF + } else if sock.localPort == 0 { + return net.ErrClosed + } + return nil +} + +func (sock *UDPConn) LocalAddr() net.Addr { + sock.laddr = net.UDPAddr{ + IP: sock.stack.ip[:], + Port: int(sock.localPort), + } + return &sock.laddr +} + +func (sock *UDPConn) RemoteAddr() net.Addr { + sock.raddr = net.UDPAddr{ + IP: sock.remote.Addr().AsSlice(), + Port: int(sock.remote.Port()), + } + return &sock.raddr +} + +func (sock *UDPConn) SetDeadline(t time.Time) error { + sock.SetReadDeadline(t) + sock.SetWriteDeadline(t) + return nil +} + +func (sock *UDPConn) SetReadDeadline(t time.Time) error { + sock.rdeadline = t + return nil +} + +func (sock *UDPConn) SetWriteDeadline(t time.Time) error { + sock.wdeadline = t + return nil +} + +// recvEth implements the [iudphandler] interface. +func (sock *UDPConn) recvEth(pkt *UDPPacket) (err error) { + sock.trace("UDP.recv:start") + if sock.closing { + return io.EOF + } + + remotePort := sock.remote.Port() + if remotePort != 0 && pkt.UDP.SourcePort != remotePort { + return nil // This packet came from a different client (remote port) to the one we are interacting with. + } + sock.lastRx = pkt.Rx + // By this point we know that the packet is valid and contains data, we process it. + payload := pkt.Payload() + _, err = sock.rx.Write(payload) //write into the UDPconns rx (ring) buffer + + return err //which is hopefully nil - but could be errRingBufferFull +} + +// putOutboundEth implements the [iudphandler] interface. +func (sock *UDPConn) putOutboundEth(response []byte) (n int, err error) { + sock.trace("UDPConn.send:start") + if !sock.remote.IsValid() { + return 0, nil // No remote address yet, yield. + } + + available := min(sock.tx.Buffered(), len(response)-sizeUDPNoOptions) + + var payload []byte + if available > 0 { + payload = response[sizeUDPNoOptions : sizeUDPNoOptions+available] //this is a reference to the payload section of the response[] slice we are populating + + //we are reading out of the TX ring buffer, data to encapsulate and send + n, err = sock.tx.Read(payload) //fill the payload section from the TX ring buffer + + if err != nil && err != io.EOF || n != int(available) { + panic("bug in UDPCpn.send reading from TX buffer") + } + } + + sock.setSrcDest(&sock.pkt) + sock.pkt.CalculateHeaders(payload) + sock.pkt.PutHeaders(response) //the sock (conn object) has the local and remote port data to be able to embed in the packet + + return sizeUDPNoOptions + n, err +} + +func (sock *UDPConn) setSrcDest(pkt *UDPPacket) { + pkt.Eth.Source = sock.stack.HardwareAddr6() + pkt.IP.Source = sock.stack.ip + pkt.UDP.SourcePort = sock.localPort + + pkt.IP.Destination = sock.remote.Addr().As4() + pkt.UDP.DestinationPort = sock.remote.Port() + pkt.Eth.Destination = sock.remoteMAC +} + +func (sock *UDPConn) trace(msg string, attrs ...slog.Attr) { + internal.LogAttrs(sock.stack.logger, internal.LevelTrace, msg, attrs...) +}