diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 55d07fe3..b4b604c5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,6 +18,7 @@ repos: args: ["-style=Google"] - id: cpplint # linter (or style-error checker) for Google C++ Style Guide - id: cppcheck # static analyzer of C/C++ code + args: ["--check-level=exhaustive"] - repo: https://github.com/charliermarsh/ruff-pre-commit rev: 'v0.0.255' # Ruff version. hooks: diff --git a/frontend/enso/enso_nic.py b/frontend/enso/enso_nic.py index 7d892533..c0865fc6 100644 --- a/frontend/enso/enso_nic.py +++ b/frontend/enso/enso_nic.py @@ -52,7 +52,7 @@ def __init__( tx_credits: int = DEFAULT_NB_TX_CREDITS, ethernet_port: int = DEFAULT_ETH_PORT, desc_per_pkt: bool = False, - latency_opt: bool = False, + latency_opt: bool = True, skip_config: bool = False, verbose: bool = False, log_file: Union[bool, TextIO] = False, diff --git a/scripts/load_bitstream.sh b/scripts/load_bitstream.sh index 353abe12..781238b2 100755 --- a/scripts/load_bitstream.sh +++ b/scripts/load_bitstream.sh @@ -8,8 +8,22 @@ DEVICE_ID="0000" FPGA_NB=${1:-"1-13"} +BITSTREAM_NAME="enso.sof" + cd $SCRIPT_DIR +# Check and download the bitstream +if ! [ -f $PWD/$BITSTREAM_NAME ]; then + $PWD/update_bitstream.sh --download + if [ $? -eq 0 ]; then + echo "Programming bitstream now..." + else + echo "Failed: Could not download bitstream!" + exit 1 + fi +fi + + # We use taskset and chrt to benefit from multiple cores even when they are # isolated from the linux scheduler. This significantly speeds up loading the # bitstream. Note that we use all but the last core. diff --git a/setup.sh b/setup.sh index 6623ced4..42f45a60 100755 --- a/setup.sh +++ b/setup.sh @@ -63,7 +63,7 @@ else fi # Setup the software. -./scripts/sw_setup.sh 16384 32768 false +./scripts/sw_setup.sh 16384 32768 true return_code=$? if [ $return_code -ne 0 ]; then diff --git a/software/examples/capture.cpp b/software/examples/capture.cpp index cd4c1ddf..8208159f 100644 --- a/software/examples/capture.cpp +++ b/software/examples/capture.cpp @@ -159,6 +159,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); socket_thread.join(); diff --git a/software/examples/echo.cpp b/software/examples/echo.cpp index 7d4cd291..fa2b8517 100644 --- a/software/examples/echo.cpp +++ b/software/examples/echo.cpp @@ -140,6 +140,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); for (auto& thread : threads) { diff --git a/software/examples/echo_copy.cpp b/software/examples/echo_copy.cpp index 1ef2d610..d9e28714 100644 --- a/software/examples/echo_copy.cpp +++ b/software/examples/echo_copy.cpp @@ -157,6 +157,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); for (auto& thread : threads) { diff --git a/software/examples/echo_event.cpp b/software/examples/echo_event.cpp index 859df939..b9ff50cf 100644 --- a/software/examples/echo_event.cpp +++ b/software/examples/echo_event.cpp @@ -140,6 +140,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); for (auto& thread : threads) { diff --git a/software/examples/echo_prefetch.cpp b/software/examples/echo_prefetch.cpp index 4eabc442..3e7dfa4e 100644 --- a/software/examples/echo_prefetch.cpp +++ b/software/examples/echo_prefetch.cpp @@ -151,6 +151,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); for (auto& thread : threads) { diff --git a/software/examples/ensogen.cpp b/software/examples/ensogen.cpp index a3570baf..971f8952 100644 --- a/software/examples/ensogen.cpp +++ b/software/examples/ensogen.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, Carnegie Mellon University + * Copyright (c) 2024, Carnegie Mellon University * * Redistribution and use in source and binary forms, with or without * modification, are permitted (subject to the limitations in the disclaimer @@ -30,9 +30,22 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +/** + * @file: ensogen.cpp + * + * @brief Packet generator program that uses the Enso library to send and + * receive packets. It uses libpcap to read and process packets from a pcap + * file. + * + * Example: + * + * sudo ./scripts/ensogen.sh ./scripts/sample_pcaps/2_64_1_2.pcap 100 \ + * --pcie-addr 65:00.0 + */ + #include #include -#include +#include #include #include #include @@ -58,10 +71,25 @@ #include #include +/****************************************************************************** + * Macros and Globals + *****************************************************************************/ // Number of loop iterations to wait before probing the TX notification buffer // again when reclaiming buffer space. #define TX_RECLAIM_DELAY 1024 +// Scientific notation for 10^6, treated as double. Used for stats calculations. +#define ONE_MILLION 1e6 + +// Scientific notation for 10^3, treated as double. Used for stats calculations. +#define ONE_THOUSAND 1e3 + +// Ethernet's per packet overhead added by the FPGA (in bytes). +#define FPGA_PACKET_OVERHEAD 24 + +// Minimum size of a packet aligned to cache (in bytes). +#define MIN_PACKET_ALIGNED_SIZE 64 + // If defined, ignore received packets. // #define IGNORE_RX @@ -101,19 +129,224 @@ // Number of transfers required to send a buffer full of packets. #define TRANSFERS_PER_BUFFER (((BUFFER_SIZE - 1) / enso::kMaxTransferLen) + 1) +// Macros for cmd line option names +#define CMD_OPT_HELP "help" +#define CMD_OPT_COUNT "count" +#define CMD_OPT_CORE "core" +#define CMD_OPT_QUEUES "queues" +#define CMD_OPT_SAVE "save" +#define CMD_OPT_SINGLE_CORE "single-core" +#define CMD_OPT_RTT "rtt" +#define CMD_OPT_RTT_HIST "rtt-hist" +#define CMD_OPT_RTT_HIST_OFF "rtt-hist-offset" +#define CMD_OPT_RTT_HIST_LEN "rtt-hist-len" +#define CMD_OPT_STATS_DELAY "stats-delay" +#define CMD_OPT_PCIE_ADDR "pcie-addr" + static volatile int keep_running = 1; static volatile int force_stop = 0; static volatile int rx_ready = 0; static volatile int rx_done = 0; static volatile int tx_done = 0; +using enso::Device; +using enso::RxPipe; +using enso::TxPipe; + +/****************************************************************************** + * Structure Definitions + *****************************************************************************/ +/** + * @brief Structure to store the command line arguments. + */ +struct parsed_args_t { + int core_id; + uint32_t nb_queues; + bool save; + bool single_core; + bool enable_rtt; + bool enable_rtt_history; + std::string hist_file; + std::string pcap_file; + std::string save_file; + uint16_t rate_num; + uint16_t rate_den; + uint64_t nb_pkts; + uint32_t rtt_hist_offset; + uint32_t rtt_hist_len; + uint32_t stats_delay; + std::string pcie_addr; +}; + +/** + * @brief Structure to store an Enso TxPipe object and attributes related + * to it. + */ +struct EnsoTxPipe { + explicit EnsoTxPipe(TxPipe* pipe) + : tx_pipe(pipe), nb_aligned_bytes(0), nb_raw_bytes(0), nb_pkts(0) {} + // Enso TxPipe + TxPipe* tx_pipe; + // Number of cache aligned bytes in the pipe + uint32_t nb_aligned_bytes; + // Number of raw bytes in the pipe + uint32_t nb_raw_bytes; + // Number of packets in the pipe + uint32_t nb_pkts; +}; + +/** + * @brief Structure to store variables needed for processing the PCAP + * file and are passed to the callback function. + */ +struct PcapHandlerContext { + PcapHandlerContext(std::unique_ptr& dev_, pcap_t* pcap_) + : dev(dev_), buf(NULL), free_flits_cur_pipe(0), pcap(pcap_) {} + // Pointer to Enso device + std::unique_ptr& dev; + // Pipes to store the packets from the PCAP file + std::vector tx_pipes; + // Pointer to the buffer of the current pipe + uint8_t* buf; + // Total number of free flits in the current pipe + uint32_t free_flits_cur_pipe; + // libpcap object associated with the opened PCAP file + pcap_t* pcap; +}; + +/** + * @brief Structure to store the Rx related stats. + */ +struct RxStats { + explicit RxStats(uint32_t rtt_hist_len = 0, uint32_t rtt_hist_offset = 0) + : pkts(0), + bytes(0), + nb_batches(0), + rtt_sum(0), + rtt_hist_len(rtt_hist_len), + rtt_hist_offset(rtt_hist_offset) { + if (rtt_hist_len > 0) { + rtt_hist = new uint64_t[rtt_hist_len](); + } + } + ~RxStats() { + if (rtt_hist_len > 0) { + delete[] rtt_hist; + } + } + + RxStats(const RxStats& other) = delete; + RxStats(RxStats&& other) = default; + RxStats& operator=(const RxStats& other) = delete; + RxStats& operator=(RxStats&& other) = delete; + + inline void add_rtt_to_hist(const uint32_t rtt) { + // Insert RTTs into the rtt_hist array if they are in its range, + // otherwise use the backup_rtt_hist. + if (unlikely((rtt >= (rtt_hist_len - rtt_hist_offset)) || + (rtt < rtt_hist_offset))) { + backup_rtt_hist[rtt]++; + } else { + rtt_hist[rtt - rtt_hist_offset]++; + } + } + + // Number of packets received + uint64_t pkts; + // Number of bytes received + uint64_t bytes; + // Number of RxNotifications or batches + uint64_t nb_batches; + // RTT calculation related + uint64_t rtt_sum; + const uint32_t rtt_hist_len; + const uint32_t rtt_hist_offset; + uint64_t* rtt_hist; + std::unordered_map backup_rtt_hist; +}; + +/** + * @brief Structure to store the variables needed by the receive_pkts + * function. + */ +struct RxArgs { + RxArgs(bool enbl_rtt, bool enbl_rtt_hist, std::unique_ptr& dev_) + : enable_rtt(enbl_rtt), enable_rtt_history(enbl_rtt_hist), dev(dev_) {} + // Check for whether RTT needs to be calculated + bool enable_rtt; + // Check for whether RTT history needs to be calculated + bool enable_rtt_history; + // Pointer to the Enso device + std::unique_ptr& dev; +}; + +/** + * @brief Structure to store the Tx related stats. + */ +struct TxStats { + TxStats() : pkts(0), bytes(0) {} + // Number of packets received + uint64_t pkts; + // Number of bytes received + uint64_t bytes; +}; + +/** + * @brief Structure to store the arguments needed by the transmit_pkts + * function. + */ +struct TxArgs { + TxArgs(std::vector& pipes, uint64_t total_aligned_bytes, + uint64_t total_raw_bytes, uint64_t pkts_in_last_pipe, + uint32_t pipes_size, std::unique_ptr& dev_) + : tx_pipes(pipes), + total_remaining_aligned_bytes(total_aligned_bytes), + total_remaining_raw_bytes(total_raw_bytes), + nb_pkts_in_last_pipe(pkts_in_last_pipe), + cur_ind(0), + total_pipes(pipes_size), + transmissions_pending(0), + ignored_reclaims(0), + dev(dev_) {} + // TxPipes handled by the thread + std::vector& tx_pipes; + // Number of aligned bytes that need to be sent + uint64_t total_remaining_aligned_bytes; + // Number of raw bytes that need to be sent + uint64_t total_remaining_raw_bytes; + // Number of packets in the last pipe - needed for stats calculation + uint64_t nb_pkts_in_last_pipe; + // Index in tx_pipes vector. Points to the current pipe being sent + uint32_t cur_ind; + // Total number of pipes in tx_pipes vector + uint32_t total_pipes; + // Total number of notifications created and sent by the application + uint32_t transmissions_pending; + // Used to track the number of times the thread did not check for notification + // consumption by the NIC + uint32_t ignored_reclaims; + // Pointer to the Enso device object + std::unique_ptr& dev; +}; + +/****************************************************************************** + * Function Definitions + *****************************************************************************/ +/** + * @brief Signal handler for SIGINT (Ctrl+C). + */ void int_handler(int signal __attribute__((unused))) { if (!keep_running) { + // user interrupted the second time, we force stop force_stop = 1; } + // user interrupted the first time, we signal the thread(s) to stop keep_running = 0; } +/** + * @brief Prints the help message on stdout. + */ static void print_usage(const char* program_name) { printf( "%s PCAP_FILE RATE_NUM RATE_DEN\n" @@ -155,20 +388,9 @@ static void print_usage(const char* program_name) { DEFAULT_HIST_LEN, DEFAULT_STATS_DELAY); } -#define CMD_OPT_HELP "help" -#define CMD_OPT_COUNT "count" -#define CMD_OPT_CORE "core" -#define CMD_OPT_QUEUES "queues" -#define CMD_OPT_SAVE "save" -#define CMD_OPT_SINGLE_CORE "single-core" -#define CMD_OPT_RTT "rtt" -#define CMD_OPT_RTT_HIST "rtt-hist" -#define CMD_OPT_RTT_HIST_OFF "rtt-hist-offset" -#define CMD_OPT_RTT_HIST_LEN "rtt-hist-len" -#define CMD_OPT_STATS_DELAY "stats-delay" -#define CMD_OPT_PCIE_ADDR "pcie-addr" - -// Map long options to short options. +/** + * Command line options related. Used in parse_args function. + */ enum { CMD_OPT_HELP_NUM = 256, CMD_OPT_COUNT_NUM, @@ -201,25 +423,16 @@ static const struct option long_options[] = { {CMD_OPT_PCIE_ADDR, required_argument, NULL, CMD_OPT_PCIE_ADDR_NUM}, {0, 0, 0, 0}}; -struct parsed_args_t { - int core_id; - uint32_t nb_queues; - bool save; - bool single_core; - bool enable_rtt; - bool enable_rtt_history; - std::string hist_file; - std::string pcap_file; - std::string save_file; - uint16_t rate_num; - uint16_t rate_den; - uint64_t nb_pkts; - uint32_t rtt_hist_offset; - uint32_t rtt_hist_len; - uint32_t stats_delay; - std::string pcie_addr; -}; - +/** + * @brief Parses the command line arguments. Called from the main function. + * + * @param argc Number of arguments entered by the user. + * @param argv Value of the arguments entered by the user. + * @param parsed_args Structure filled by this function after parsing the + * arguments and used in main(). + * + * @return 0 on success. -1 on failure. 1 for help message. + */ static int parse_args(int argc, char** argv, struct parsed_args_t& parsed_args) { int opt; @@ -302,182 +515,16 @@ static int parse_args(int argc, char** argv, return 0; } -// Adapted from ixy. -static void* get_huge_page(size_t size) { - static int id = 0; - int fd; - char huge_pages_path[128]; - - snprintf(huge_pages_path, sizeof(huge_pages_path), "/mnt/huge/ensogen:%i", - id); - ++id; - - fd = open(huge_pages_path, O_CREAT | O_RDWR, S_IRWXU); - if (fd == -1) { - std::cerr << "(" << errno << ") Problem opening huge page file descriptor" - << std::endl; - return NULL; - } - - if (ftruncate(fd, (off_t)size)) { - std::cerr << "(" << errno - << ") Could not truncate huge page to size: " << size - << std::endl; - close(fd); - unlink(huge_pages_path); - return NULL; - } - - void* virt_addr = (void*)mmap(NULL, size, PROT_READ | PROT_WRITE, - MAP_SHARED | MAP_HUGETLB, fd, 0); - - if (virt_addr == (void*)-1) { - std::cerr << "(" << errno << ") Could not mmap huge page" << std::endl; - close(fd); - unlink(huge_pages_path); - return NULL; - } - - if (mlock(virt_addr, size)) { - std::cerr << "(" << errno << ") Could not lock huge page" << std::endl; - munmap(virt_addr, size); - close(fd); - unlink(huge_pages_path); - return NULL; - } - - // Don't keep it around in the hugetlbfs. - close(fd); - unlink(huge_pages_path); - - return virt_addr; -} - -// Adapted from ixy. -static uint64_t virt_to_phys(void* virt) { - long pagesize = sysconf(_SC_PAGESIZE); - int fd = open("/proc/self/pagemap", O_RDONLY); - if (fd < 0) { - return 0; - } - // pagemap is an array of pointers for each normal-sized page - if (lseek(fd, (uintptr_t)virt / pagesize * sizeof(uintptr_t), SEEK_SET) < 0) { - close(fd); - return 0; - } - - uintptr_t phy = 0; - if (read(fd, &phy, sizeof(phy)) < 0) { - close(fd); - return 0; - } - close(fd); - - if (!phy) { - return 0; - } - // bits 0-54 are the page number - return (uint64_t)((phy & 0x7fffffffffffffULL) * pagesize + - ((uintptr_t)virt) % pagesize); -} - -struct EnsoPipe { - EnsoPipe(uint8_t* buf, uint32_t length, uint32_t good_bytes, uint32_t nb_pkts) - : buf(buf), length(length), good_bytes(good_bytes), nb_pkts(nb_pkts) { - phys_addr = virt_to_phys(buf); - } - uint8_t* buf; - uint32_t length; - uint32_t good_bytes; - uint32_t nb_pkts; - uint64_t phys_addr; -}; - -struct PcapHandlerContext { - std::vector enso_pipes; - uint32_t free_flits; - uint32_t hugepage_offset; - pcap_t* pcap; -}; - -struct RxStats { - explicit RxStats(uint32_t rtt_hist_len = 0, uint32_t rtt_hist_offset = 0) - : pkts(0), - bytes(0), - rtt_sum(0), - nb_batches(0), - rtt_hist_len(rtt_hist_len), - rtt_hist_offset(rtt_hist_offset) { - if (rtt_hist_len > 0) { - rtt_hist = new uint64_t[rtt_hist_len](); - } - } - ~RxStats() { - if (rtt_hist_len > 0) { - delete[] rtt_hist; - } - } - - RxStats(const RxStats& other) = delete; - RxStats(RxStats&& other) = default; - RxStats& operator=(const RxStats& other) = delete; - RxStats& operator=(RxStats&& other) = delete; - - inline void add_rtt_to_hist(const uint32_t rtt) { - // Insert RTTs into the rtt_hist array if they are in its range, - // otherwise use the backup_rtt_hist. - if (unlikely((rtt >= (rtt_hist_len - rtt_hist_offset)) || - (rtt < rtt_hist_offset))) { - backup_rtt_hist[rtt]++; - } else { - rtt_hist[rtt - rtt_hist_offset]++; - } - } - - uint64_t pkts; - uint64_t bytes; - uint64_t rtt_sum; - uint64_t nb_batches; - const uint32_t rtt_hist_len; - const uint32_t rtt_hist_offset; - uint64_t* rtt_hist; - std::unordered_map backup_rtt_hist; -}; - -struct RxArgs { - bool enable_rtt; - bool enable_rtt_history; - int socket_fd; -}; - -struct TxStats { - TxStats() : pkts(0), bytes(0) {} - uint64_t pkts; - uint64_t bytes; -}; - -struct TxArgs { - TxArgs(std::vector& enso_pipes, uint64_t total_bytes_to_send, - uint64_t total_good_bytes_to_send, uint64_t pkts_in_last_buffer, - int socket_fd) - : ignored_reclaims(0), - total_remaining_bytes(total_bytes_to_send), - total_remaining_good_bytes(total_good_bytes_to_send), - transmissions_pending(0), - pkts_in_last_buffer(pkts_in_last_buffer), - enso_pipes(enso_pipes), - current_enso_pipe(enso_pipes.begin()), - socket_fd(socket_fd) {} - uint64_t ignored_reclaims; - uint64_t total_remaining_bytes; - uint64_t total_remaining_good_bytes; - uint32_t transmissions_pending; - uint64_t pkts_in_last_buffer; - std::vector& enso_pipes; - std::vector::iterator current_enso_pipe; - int socket_fd; -}; - +/** + * @brief libpcap callback registered by the main function. Called for each + * packet present in the PCAP file. + * + * @param user Structure allocated in main to read and store relevant + * information. + * @param pkt_hdr Contains packet metadata like timestamp, length, etc. + * (UNUSED) + * @param pkt_bytes Packet data to be copied into a buffer. + */ void pcap_pkt_handler(u_char* user, const struct pcap_pkthdr* pkt_hdr, const u_char* pkt_bytes) { (void)pkt_hdr; @@ -490,141 +537,158 @@ void pcap_pkt_handler(u_char* user, const struct pcap_pkthdr* pkt_hdr, } uint32_t len = enso::get_pkt_len(pkt_bytes); - uint32_t nb_flits = (len - 1) / 64 + 1; - - if (nb_flits > context->free_flits) { - uint8_t* buf; - if ((context->hugepage_offset + BUFFER_SIZE) > HUGEPAGE_SIZE) { - // Need to allocate another huge page. - buf = (uint8_t*)get_huge_page(HUGEPAGE_SIZE); - if (buf == NULL) { - pcap_breakloop(context->pcap); - return; - } - context->hugepage_offset = BUFFER_SIZE; - } else { - struct EnsoPipe& enso_pipe = context->enso_pipes.back(); - buf = enso_pipe.buf + BUFFER_SIZE; - context->hugepage_offset += BUFFER_SIZE; + uint32_t nb_flits = (len - 1) / MIN_PACKET_ALIGNED_SIZE + 1; + + if (nb_flits > context->free_flits_cur_pipe) { + // initialize a new pipe + TxPipe* tx_pipe = context->dev->AllocateTxPipe(); + if (!tx_pipe) { + std::cerr << "Problem creating TX pipe" << std::endl; + pcap_breakloop(context->pcap); + return; } - context->enso_pipes.emplace_back(buf, 0, 0, 0); - context->free_flits = BUFFER_SIZE / 64; + struct EnsoTxPipe enso_tx_pipe(tx_pipe); + context->tx_pipes.push_back(enso_tx_pipe); + context->free_flits_cur_pipe = BUFFER_SIZE / MIN_PACKET_ALIGNED_SIZE; + context->buf = tx_pipe->buf(); } - struct EnsoPipe& enso_pipe = context->enso_pipes.back(); - uint8_t* dest = enso_pipe.buf + enso_pipe.length; - + // We copy the packets in the pipe's buffer in multiples of 64 bytes + // or MIN_PACKET_ALIGNED SIZE. However, we also keep track of the number + // of raw bytes on a per pipe basis since we need it for stats calculation. + struct EnsoTxPipe& tx_pipe = context->tx_pipes.back(); + uint8_t* dest = context->buf + tx_pipe.nb_aligned_bytes; memcpy(dest, pkt_bytes, len); - enso_pipe.length += nb_flits * 64; // Packets must be cache aligned. - enso_pipe.good_bytes += len; - ++(enso_pipe.nb_pkts); - context->free_flits -= nb_flits; + tx_pipe.nb_aligned_bytes += nb_flits * MIN_PACKET_ALIGNED_SIZE; + tx_pipe.nb_raw_bytes += len; + tx_pipe.nb_pkts++; + context->free_flits_cur_pipe -= nb_flits; } +/** + * @brief This function is used to receive packets. The approach used in this + * function is slightly different from the one described in Enso's library for + * the RxPipe abstraction (Allocate->Bind->Recv->Clear). We use the + * NextRxPipeToRecv abstraction to take advantage of notification prefetching + * and use fallback queues. + * + * @param rx_args Arguments needed by this function. See RxArgs definition. + * @param rx_stats Rx stats that need to be updated in every iteration. + * + * @return Number of packets received. + */ inline uint64_t receive_pkts(const struct RxArgs& rx_args, struct RxStats& rx_stats) { uint64_t nb_pkts = 0; #ifdef IGNORE_RX (void)rx_args; (void)rx_stats; -#else // IGNORE_RX - uint8_t* recv_buf; - int socket_fd; - int recv_len = enso::recv_select(rx_args.socket_fd, &socket_fd, - (void**)&recv_buf, RECV_BUF_LEN, 0); - - if (unlikely(recv_len < 0)) { - std::cerr << "Error receiving" << std::endl; - exit(7); +#else // IGNORE_RX + RxPipe* rx_pipe = rx_args.dev->NextRxPipeToRecv(); + if (unlikely(rx_pipe == nullptr)) { + return 0; } + auto batch = rx_pipe->PeekPkts(); + uint64_t recv_bytes = 0; + for (auto pkt : batch) { + uint16_t pkt_len = enso::get_pkt_len(pkt); - if (likely(recv_len > 0)) { - int processed_bytes = 0; - uint64_t recv_bytes = 0; - uint8_t* pkt = recv_buf; + if (rx_args.enable_rtt) { + uint32_t rtt = enso::get_pkt_rtt(pkt); + rx_stats.rtt_sum += rtt; - while (processed_bytes < recv_len) { - uint16_t pkt_len = enso::get_pkt_len(pkt); - uint16_t nb_flits = (pkt_len - 1) / 64 + 1; - uint16_t pkt_aligned_len = nb_flits * 64; - - if (rx_args.enable_rtt) { - uint32_t rtt = enso::get_pkt_rtt(pkt); - rx_stats.rtt_sum += rtt; - - if (rx_args.enable_rtt_history) { - rx_stats.add_rtt_to_hist(rtt); - } + if (rx_args.enable_rtt_history) { + rx_stats.add_rtt_to_hist(rtt); } - - pkt += pkt_aligned_len; - processed_bytes += pkt_aligned_len; - recv_bytes += pkt_len; - ++nb_pkts; } - rx_stats.pkts += nb_pkts; - ++(rx_stats.nb_batches); - rx_stats.bytes += recv_bytes; - enso::free_enso_pipe(socket_fd, recv_len); + recv_bytes += pkt_len; + nb_pkts++; } + + uint32_t batch_length = batch.processed_bytes(); + rx_pipe->ConfirmBytes(batch_length); + + rx_stats.pkts += nb_pkts; + rx_stats.nb_batches++; + rx_stats.bytes += recv_bytes; + + rx_pipe->Clear(); + #endif // IGNORE_RX return nb_pkts; } +/** + * @brief This function is called periodically to send packets and update + * the TX stats. In case too many transmissions are already pending it will + * wait for the NIC to process them before sending another batch. + * + * @param tx_args Arguments needed by this function. See TxArgs definition. + * @param tx_stats Tx stats that need to be updated in every iteration. + */ inline void transmit_pkts(struct TxArgs& tx_args, struct TxStats& tx_stats) { - // Avoid transmitting new data when the TX buffer is full. + // Avoid transmitting new data when too many TX notifications are pending const uint32_t buf_fill_thresh = enso::kNotificationBufSize - TRANSFERS_PER_BUFFER - 1; - if (likely(tx_args.transmissions_pending < buf_fill_thresh)) { - uint32_t transmission_length = (uint32_t)std::min( - (uint64_t)(BUFFER_SIZE), tx_args.total_remaining_bytes); - transmission_length = - std::min(transmission_length, tx_args.current_enso_pipe->length); - - uint32_t good_transmission_length = - (uint32_t)std::min(tx_args.total_remaining_good_bytes, - (uint64_t)tx_args.current_enso_pipe->good_bytes); - - uint64_t phys_addr = tx_args.current_enso_pipe->phys_addr; - - enso::send(tx_args.socket_fd, phys_addr, transmission_length, 0); - tx_stats.bytes += good_transmission_length; - ++tx_args.transmissions_pending; - - tx_args.total_remaining_bytes -= transmission_length; - tx_args.total_remaining_good_bytes -= good_transmission_length; - - if (unlikely(tx_args.total_remaining_bytes == 0)) { - tx_stats.pkts += tx_args.pkts_in_last_buffer; + struct EnsoTxPipe& cur_pipe = tx_args.tx_pipes[tx_args.cur_ind]; + uint32_t transmission_length = + std::min(tx_args.total_remaining_aligned_bytes, + (uint64_t)cur_pipe.nb_aligned_bytes); + uint32_t transmission_raw_length = std::min( + tx_args.total_remaining_raw_bytes, (uint64_t)cur_pipe.nb_raw_bytes); + + uint64_t buf_phys_addr = cur_pipe.tx_pipe->GetBufPhysAddr(); + tx_args.dev->SendBatch(buf_phys_addr, transmission_length); + tx_args.transmissions_pending++; + tx_args.total_remaining_aligned_bytes -= transmission_length; + tx_args.total_remaining_raw_bytes -= transmission_raw_length; + + // update the stats + // the stats need be calculated based on raw bytes + tx_stats.bytes += transmission_raw_length; + if (tx_args.total_remaining_aligned_bytes == 0) { keep_running = 0; + tx_stats.pkts += tx_args.nb_pkts_in_last_pipe; return; } + tx_stats.pkts += cur_pipe.nb_pkts; - // Move to next packet buffer. - tx_stats.pkts += tx_args.current_enso_pipe->nb_pkts; - tx_args.current_enso_pipe = std::next(tx_args.current_enso_pipe); - if (tx_args.current_enso_pipe == tx_args.enso_pipes.end()) { - tx_args.current_enso_pipe = tx_args.enso_pipes.begin(); - } + // move to the next pipe + tx_args.cur_ind = (tx_args.cur_ind + 1) % tx_args.total_pipes; } // Reclaim TX notification buffer space. if ((tx_args.transmissions_pending > (enso::kNotificationBufSize / 4))) { if (tx_args.ignored_reclaims > TX_RECLAIM_DELAY) { tx_args.ignored_reclaims = 0; - tx_args.transmissions_pending -= enso::get_completions(tx_args.socket_fd); + uint32_t num_processed = tx_args.dev->ConsumeBatches(); + if (num_processed > tx_args.transmissions_pending) { + tx_args.transmissions_pending = 0; + } else { + tx_args.transmissions_pending -= num_processed; + } } else { - ++tx_args.ignored_reclaims; + tx_args.ignored_reclaims++; } } } +/** + * @brief Waits until the NIC has consumed all the Tx notifications. + * + * @param tx_args Arguments needed by this function. See TxArgs definition. + */ inline void reclaim_all_buffers(struct TxArgs& tx_args) { - while (tx_args.transmissions_pending) { - tx_args.transmissions_pending -= enso::get_completions(tx_args.socket_fd); + while (tx_args.transmissions_pending > 0) { + uint32_t num_processed = tx_args.dev->ConsumeBatches(); + if (num_processed > tx_args.transmissions_pending) { + tx_args.transmissions_pending = 0; + break; + } + tx_args.transmissions_pending -= num_processed; } } @@ -639,19 +703,10 @@ int main(int argc, char** argv) { return 1; } - // Parse the PCI address in format 0000:00:00.0 or 00:00.0. - if (parsed_args.pcie_addr != "") { - uint32_t domain, bus, dev, func; - if (sscanf(parsed_args.pcie_addr.c_str(), "%x:%x:%x.%x", &domain, &bus, - &dev, &func) != 4) { - if (sscanf(parsed_args.pcie_addr.c_str(), "%x:%x.%x", &bus, &dev, - &func) != 3) { - std::cerr << "Invalid PCI address" << std::endl; - return 1; - } - } - uint16_t bdf = (bus << 8) | (dev << 3) | (func & 0x7); - enso::set_bdf(bdf); + std::unique_ptr dev = Device::Create(parsed_args.pcie_addr); + if (!dev) { + std::cerr << "Problem creating device" << std::endl; + exit(2); } char errbuf[PCAP_ERRBUF_SIZE]; @@ -662,13 +717,11 @@ int main(int argc, char** argv) { return 2; } - struct PcapHandlerContext context; - context.free_flits = 0; - context.hugepage_offset = HUGEPAGE_SIZE; - context.pcap = pcap; - std::vector& enso_pipes = context.enso_pipes; + struct PcapHandlerContext context(dev, pcap); + + std::vector& tx_pipes = context.tx_pipes; - // Initialize packet buffers with packets read from pcap file. + // Initialize pipes with packets read from pcap file. if (pcap_loop(pcap, 0, pcap_pkt_handler, (u_char*)&context) < 0) { std::cerr << "Error while reading pcap (" << pcap_geterr(pcap) << ")" << std::endl; @@ -677,70 +730,77 @@ int main(int argc, char** argv) { // For small pcaps we copy the same packets over the remaining of the // buffer. This reduces the number of transfers that we need to issue. - if ((enso_pipes.size() == 1) && - (enso_pipes.front().length < BUFFER_SIZE / 2)) { - EnsoPipe& buffer = enso_pipes.front(); - uint32_t original_buf_length = buffer.length; - uint32_t original_good_bytes = buffer.good_bytes; - uint32_t original_nb_pkts = buffer.nb_pkts; - while ((buffer.length + original_buf_length) <= BUFFER_SIZE) { - memcpy(buffer.buf + buffer.length, buffer.buf, original_buf_length); - buffer.length += original_buf_length; - buffer.good_bytes += original_good_bytes; - buffer.nb_pkts += original_nb_pkts; + // If there is only one pipe, nb_bytes contains the number of bytes + // in that pipe only. + if ((tx_pipes.size() == 1) && + (tx_pipes.front().nb_aligned_bytes < BUFFER_SIZE / 2)) { + struct EnsoTxPipe& tx_pipe = tx_pipes.front(); + uint8_t* pipe_buf = tx_pipe.tx_pipe->buf(); + uint32_t cur_buf_length = tx_pipe.nb_aligned_bytes; + uint32_t original_buf_length = cur_buf_length; + uint32_t original_nb_pkts = tx_pipe.nb_pkts; + uint32_t original_raw_bytes = tx_pipe.nb_raw_bytes; + while ((cur_buf_length + original_buf_length) <= BUFFER_SIZE) { + memcpy(pipe_buf + cur_buf_length, pipe_buf, original_buf_length); + cur_buf_length += original_buf_length; + tx_pipe.nb_pkts += original_nb_pkts; + tx_pipe.nb_raw_bytes += original_raw_bytes; } + tx_pipe.nb_aligned_bytes = cur_buf_length; } - uint64_t total_pkts_in_buffers = 0; - uint64_t total_bytes_in_buffers = 0; - uint64_t total_good_bytes_in_buffers = 0; - for (auto& buffer : enso_pipes) { - total_pkts_in_buffers += buffer.nb_pkts; - total_bytes_in_buffers += buffer.length; - total_good_bytes_in_buffers += buffer.good_bytes; + // calculate total aligned bytes, raw bytes and packets in all the pipes + uint64_t total_pkts_in_pipes = 0; + uint64_t total_aligned_bytes_in_pipes = 0; + uint64_t total_raw_bytes_in_pipes = 0; + for (auto& pipe : tx_pipes) { + total_pkts_in_pipes += pipe.nb_pkts; + total_aligned_bytes_in_pipes += pipe.nb_aligned_bytes; + total_raw_bytes_in_pipes += pipe.nb_raw_bytes; } - // To restrict the number of packets, we track the total number of bytes. - // This avoids the need to look at every sent packet only to figure out the - // number bytes to send in the very last buffer. But to be able to do this, - // we need to compute the total number of bytes that we have to send. - uint64_t total_bytes_to_send; - uint64_t total_good_bytes_to_send; - uint64_t pkts_in_last_buffer = 0; + // Handling the --count option. calculate the number of bytes that + // need to be sent. if the user requests 'x' packets, we start sending + // from the start of the first pipe (order is the same as the PCAP file) + // and wrap around if x is greater than total_pkts_in_pipes. + uint64_t total_aligned_bytes_to_send; + uint64_t total_raw_bytes_to_send; + uint64_t pkts_in_last_pipe = 0; if (parsed_args.nb_pkts > 0) { - uint64_t nb_pkts_remaining = parsed_args.nb_pkts % total_pkts_in_buffers; - uint64_t nb_full_iters = parsed_args.nb_pkts / total_pkts_in_buffers; + uint64_t nb_full_iters = parsed_args.nb_pkts / total_pkts_in_pipes; + uint64_t nb_pkts_remaining = parsed_args.nb_pkts % total_pkts_in_pipes; - total_bytes_to_send = nb_full_iters * total_bytes_in_buffers; - total_good_bytes_to_send = nb_full_iters * total_good_bytes_in_buffers; + total_aligned_bytes_to_send = nb_full_iters * total_aligned_bytes_in_pipes; + total_raw_bytes_to_send = nb_full_iters * total_raw_bytes_in_pipes; if (nb_pkts_remaining == 0) { - pkts_in_last_buffer = enso_pipes.back().nb_pkts; + pkts_in_last_pipe = tx_pipes.back().nb_pkts; } - for (auto& buffer : enso_pipes) { - if (nb_pkts_remaining < buffer.nb_pkts) { - uint8_t* pkt = buffer.buf; + // calculate the length of the first 'x % total_pkts_in_pipes' packets + for (auto& pipe : tx_pipes) { + if (nb_pkts_remaining < pipe.nb_pkts) { + uint8_t* pkt = pipe.tx_pipe->buf(); while (nb_pkts_remaining > 0) { uint16_t pkt_len = enso::get_pkt_len(pkt); uint16_t nb_flits = (pkt_len - 1) / 64 + 1; - total_bytes_to_send += nb_flits * 64; - --nb_pkts_remaining; - ++pkts_in_last_buffer; + total_aligned_bytes_to_send += nb_flits * 64; + nb_pkts_remaining--; + pkts_in_last_pipe++; pkt = enso::get_next_pkt(pkt); } break; } - total_bytes_to_send += buffer.length; - nb_pkts_remaining -= buffer.nb_pkts; + total_aligned_bytes_to_send += pipe.nb_aligned_bytes; + nb_pkts_remaining -= pipe.nb_pkts; } } else { // Treat nb_pkts == 0 as unbounded. The following value should be enough // to send 64-byte packets for around 400 years using Tb Ethernet. - total_bytes_to_send = 0xffffffffffffffff; - total_good_bytes_to_send = 0xffffffffffffffff; + total_aligned_bytes_to_send = 0xffffffffffffffff; + total_raw_bytes_to_send = 0xffffffffffffffff; } uint32_t rtt_hist_len = 0; @@ -761,38 +821,32 @@ int main(int argc, char** argv) { // When using single_core we use the same thread for RX and TX, otherwise we // launch separate threads for RX and TX. if (!parsed_args.single_core) { - std::thread rx_thread = std::thread([&parsed_args, &rx_stats] { + std::thread rx_thread = std::thread([&parsed_args, &rx_stats, &dev] { std::this_thread::sleep_for(std::chrono::milliseconds(500)); - std::vector socket_fds; + std::vector rx_pipes; - int socket_fd = 0; - for (uint32_t i = 0; i < parsed_args.nb_queues; ++i) { - socket_fd = enso::socket(AF_INET, SOCK_DGRAM, 0, true); - - if (socket_fd == -1) { - std::cerr << "Problem creating socket (" << errno - << "): " << strerror(errno) << std::endl; - exit(2); + for (uint32_t i = 0; i < parsed_args.nb_queues; i++) { + // we create fallback queues by passing true in AllocateRxPipe + RxPipe* rx_pipe = dev->AllocateRxPipe(true); + if (!rx_pipe) { + std::cerr << "Problem creating RX pipe" << std::endl; + exit(3); } - - socket_fds.push_back(socket_fd); + rx_pipes.push_back(rx_pipe); } - enso::enable_device_rate_limit(socket_fd, parsed_args.rate_num, - parsed_args.rate_den); - enso::enable_device_round_robin(socket_fd); + dev->EnableRateLimiting(parsed_args.rate_num, parsed_args.rate_den); + dev->EnableRoundRobin(); if (parsed_args.enable_rtt) { - enso::enable_device_timestamp(socket_fd); + dev->EnableTimeStamping(); } else { - enso::disable_device_timestamp(socket_fd); + dev->DisableTimeStamping(); } - RxArgs rx_args; - rx_args.enable_rtt = parsed_args.enable_rtt; - rx_args.enable_rtt_history = parsed_args.enable_rtt_history; - rx_args.socket_fd = socket_fd; + RxArgs rx_args(parsed_args.enable_rtt, parsed_args.enable_rtt_history, + dev); std::cout << "Running RX on core " << sched_getcpu() << std::endl; @@ -808,7 +862,7 @@ int main(int argc, char** argv) { while (!force_stop && (nb_iters_no_pkt < ITER_NO_PKT_THRESH)) { uint64_t nb_pkts = receive_pkts(rx_args, rx_stats); if (unlikely(nb_pkts == 0)) { - ++nb_iters_no_pkt; + nb_iters_no_pkt++; } else { nb_iters_no_pkt = 0; } @@ -816,38 +870,26 @@ int main(int argc, char** argv) { rx_done = true; - enso::disable_device_rate_limit(socket_fd); - enso::disable_device_round_robin(socket_fd); + dev->DisableRateLimiting(); + dev->DisableRoundRobin(); if (parsed_args.enable_rtt) { - enso::disable_device_timestamp(socket_fd); - } - - for (auto& s : socket_fds) { - enso::shutdown(s, SHUT_RDWR); + dev->DisableTimeStamping(); } }); std::thread tx_thread = std::thread( - [total_bytes_to_send, total_good_bytes_to_send, pkts_in_last_buffer, - &parsed_args, &enso_pipes, &tx_stats] { + [total_aligned_bytes_to_send, total_raw_bytes_to_send, + pkts_in_last_pipe, &parsed_args, &tx_stats, &dev, &tx_pipes] { std::this_thread::sleep_for(std::chrono::seconds(1)); - int socket_fd = enso::socket(AF_INET, SOCK_DGRAM, 0, false); - - if (socket_fd == -1) { - std::cerr << "Problem creating socket (" << errno - << "): " << strerror(errno) << std::endl; - exit(2); - } - while (!rx_ready) continue; std::cout << "Running TX on core " << sched_getcpu() << std::endl; - TxArgs tx_args(enso_pipes, total_bytes_to_send, - total_good_bytes_to_send, pkts_in_last_buffer, - socket_fd); + TxArgs tx_args(tx_pipes, total_aligned_bytes_to_send, + total_raw_bytes_to_send, pkts_in_last_pipe, + tx_pipes.size(), dev); while (keep_running) { transmit_pkts(tx_args, tx_stats); @@ -881,48 +923,44 @@ int main(int argc, char** argv) { threads.push_back(std::move(rx_thread)); threads.push_back(std::move(tx_thread)); - } else { // Send and receive packets within the same thread. - std::thread rx_tx_thread = std::thread( - [&parsed_args, &rx_stats, total_bytes_to_send, total_good_bytes_to_send, - pkts_in_last_buffer, &enso_pipes, &tx_stats] { + std::thread rx_tx_thread = + std::thread([total_aligned_bytes_to_send, total_raw_bytes_to_send, + pkts_in_last_pipe, &parsed_args, &tx_stats, &rx_stats, + &dev, &tx_pipes] { std::this_thread::sleep_for(std::chrono::milliseconds(500)); - std::vector socket_fds; - - int socket_fd = 0; - for (uint32_t i = 0; i < parsed_args.nb_queues; ++i) { - socket_fd = enso::socket(AF_INET, SOCK_DGRAM, 0, true); + std::vector rx_pipes; - if (socket_fd == -1) { - std::cerr << "Problem creating socket (" << errno - << "): " << strerror(errno) << std::endl; - exit(2); + for (uint32_t i = 0; i < parsed_args.nb_queues; i++) { + // we create fallback queues by passing true in AllocateRxPipe + RxPipe* rx_pipe = dev->AllocateRxPipe(true); + if (!rx_pipe) { + std::cerr << "Problem creating RX pipe" << std::endl; + exit(3); } - - socket_fds.push_back(socket_fd); + rx_pipes.push_back(rx_pipe); } - enso::enable_device_rate_limit(socket_fd, parsed_args.rate_num, - parsed_args.rate_den); - enso::enable_device_round_robin(socket_fd); + dev->EnableRateLimiting(parsed_args.rate_num, parsed_args.rate_den); + dev->EnableRoundRobin(); if (parsed_args.enable_rtt) { - enso::enable_device_timestamp(socket_fd); + dev->EnableTimeStamping(); + } else { + dev->DisableTimeStamping(); } std::cout << "Running RX and TX on core " << sched_getcpu() << std::endl; - RxArgs rx_args; - rx_args.enable_rtt = parsed_args.enable_rtt; - rx_args.enable_rtt_history = parsed_args.enable_rtt_history; - rx_args.socket_fd = socket_fd; + RxArgs rx_args(parsed_args.enable_rtt, parsed_args.enable_rtt_history, + dev); - TxArgs tx_args(enso_pipes, total_bytes_to_send, - total_good_bytes_to_send, pkts_in_last_buffer, - socket_fd); + TxArgs tx_args(tx_pipes, total_aligned_bytes_to_send, + total_raw_bytes_to_send, pkts_in_last_pipe, + tx_pipes.size(), dev); rx_ready = 1; @@ -939,7 +977,7 @@ int main(int argc, char** argv) { while (!force_stop && (nb_iters_no_pkt < ITER_NO_PKT_THRESH)) { uint64_t nb_pkts = receive_pkts(rx_args, rx_stats); if (unlikely(nb_pkts == 0)) { - ++nb_iters_no_pkt; + nb_iters_no_pkt++; } else { nb_iters_no_pkt = 0; } @@ -949,15 +987,11 @@ int main(int argc, char** argv) { reclaim_all_buffers(tx_args); - enso::disable_device_rate_limit(socket_fd); - enso::disable_device_round_robin(socket_fd); + dev->DisableRateLimiting(); + dev->DisableRoundRobin(); if (parsed_args.enable_rtt) { - enso::disable_device_timestamp(socket_fd); - } - - for (auto& s : socket_fds) { - enso::shutdown(s, SHUT_RDWR); + dev->DisableTimeStamping(); } }); @@ -1010,22 +1044,24 @@ int main(int argc, char** argv) { uint64_t tx_bytes = tx_stats.bytes; uint64_t tx_pkts = tx_stats.pkts; - double interval_s = parsed_args.stats_delay / 1000.; + double interval_s = (double)parsed_args.stats_delay / ONE_THOUSAND; uint64_t rx_pkt_diff = rx_pkts - last_rx_pkts; uint64_t rx_goodput_mbps = - (rx_bytes - last_rx_bytes) * 8. / (1e6 * interval_s); + (rx_bytes - last_rx_bytes) * 8. / (ONE_MILLION * interval_s); uint64_t rx_pkt_rate = (rx_pkt_diff / interval_s); - uint64_t rx_pkt_rate_kpps = rx_pkt_rate / 1e3; - uint64_t rx_tput_mbps = rx_goodput_mbps + 24 * 8 * rx_pkt_rate / 1e6; + uint64_t rx_pkt_rate_kpps = rx_pkt_rate / ONE_THOUSAND; + uint64_t rx_tput_mbps = + rx_goodput_mbps + FPGA_PACKET_OVERHEAD * 8 * rx_pkt_rate / ONE_MILLION; uint64_t tx_pkt_diff = tx_pkts - last_tx_pkts; uint64_t tx_goodput_mbps = - (tx_bytes - last_tx_bytes) * 8. / (1e6 * interval_s); + (tx_bytes - last_tx_bytes) * 8. / (ONE_MILLION * interval_s); uint64_t tx_tput_mbps = - (tx_bytes - last_tx_bytes + tx_pkt_diff * 24) * 8. / (1e6 * interval_s); + (tx_bytes - last_tx_bytes + tx_pkt_diff * FPGA_PACKET_OVERHEAD) * 8. / + (ONE_MILLION * interval_s); uint64_t tx_pkt_rate = (tx_pkt_diff / interval_s); - uint64_t tx_pkt_rate_kpps = tx_pkt_rate / 1e3; + uint64_t tx_pkt_rate_kpps = tx_pkt_rate / ONE_THOUSAND; uint64_t rtt_sum_ns = rx_stats.rtt_sum * enso::kNsPerTimestampCycle; uint64_t rtt_ns; @@ -1035,9 +1071,6 @@ int main(int argc, char** argv) { rtt_ns = 0; } - // TODO(sadok): don't print metrics that are unreliable before the first - // two samples. - std::cout << std::dec << " RX: Throughput: " << rx_tput_mbps << " Mbps" << " Rate: " << rx_pkt_rate_kpps << " kpps" << std::endl @@ -1081,7 +1114,7 @@ int main(int argc, char** argv) { std::ofstream hist_file; hist_file.open(parsed_args.hist_file); - for (uint32_t rtt = 0; rtt < parsed_args.rtt_hist_len; ++rtt) { + for (uint32_t rtt = 0; rtt < parsed_args.rtt_hist_len; rtt++) { if (rx_stats.rtt_hist[rtt] != 0) { uint32_t corrected_rtt = (rtt + parsed_args.rtt_hist_offset) * enso::kNsPerTimestampCycle; @@ -1113,12 +1146,6 @@ int main(int argc, char** argv) { thread.join(); } - for (auto& buffer : enso_pipes) { - // Only free hugepage-aligned buffers. - if ((buffer.phys_addr & (HUGEPAGE_SIZE - 1)) == 0) { - munmap(buffer.buf, HUGEPAGE_SIZE); - } - } - + dev.reset(); return ret; } diff --git a/software/examples/l2_forward.cpp b/software/examples/l2_forward.cpp index 8b12fe24..924ffa61 100644 --- a/software/examples/l2_forward.cpp +++ b/software/examples/l2_forward.cpp @@ -157,6 +157,7 @@ int main(int argc, const char* argv[]) { while (!setup_done) continue; // Wait for setup to be done. + std::cout << "The bandwidth statistics are approximated." << std::endl; show_stats(thread_stats, &keep_running); for (auto& thread : threads) { diff --git a/software/include/enso/meson.build b/software/include/enso/meson.build index 0371d3fd..b1beb264 100644 --- a/software/include/enso/meson.build +++ b/software/include/enso/meson.build @@ -5,8 +5,7 @@ public_enso_headers = files( 'ixy_helpers.h', 'internals.h', 'queue.h', - 'pipe.h', - 'socket.h' + 'pipe.h' ) install_headers(public_enso_headers, subdir: 'enso') diff --git a/software/include/enso/pipe.h b/software/include/enso/pipe.h index 90a386db..c90fa346 100644 --- a/software/include/enso/pipe.h +++ b/software/include/enso/pipe.h @@ -285,6 +285,27 @@ class Device { */ int DisableRoundRobin(); + /** + * @brief Sends a batch of packets to the NIC by creating and appending a Tx + * Notification. Use this function if you need to only send a batch + * and not process completions as done by `SendAndFree()`. + * + * @param phys_addr Physical address of the buffer that contains the packets. + * @param nb_bytes The number of bytes that need to be sent starting from the + * physical address. + */ + void SendBatch(uint64_t phys_addr, uint32_t nb_bytes); + + /** + * @brief Checks and returns the numbers of Tx Notifications consumed by the + * NIC. Use this function if you only need to check the number of + * notification consumed and not process them as done by + * `ProcessCompletions()`. + * + * @return number of Tx notifications successfully processed by the NIC. + */ + uint32_t ConsumeBatches(); + /** * @brief Gets the round robin status for the device. * @@ -843,6 +864,14 @@ class TxPipe { device_->Send(kId, phys_addr, nb_bytes); } + /* + * @brief Used to get the physical address of the pipe's buffer starting + * at offset of the current application data. + * + * @return Physical address of the buffer. + * */ + inline uint64_t GetBufPhysAddr() { return buf_phys_addr_ + app_begin_; } + /** * @brief Explicitly requests a best-effort buffer extension. * diff --git a/software/include/enso/socket.h b/software/include/enso/socket.h deleted file mode 100644 index 1655c38c..00000000 --- a/software/include/enso/socket.h +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2022, Carnegie Mellon University - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted (subject to the limitations in the disclaimer - * below) provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * * Neither the name of the copyright holder nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY - * THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND - * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT - * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; - * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * @file - * @brief Socket-like API. - * @deprecated Use the API defined in `pipe.h` instead. - * - * @author Hugo Sadok - */ - -#ifndef SOFTWARE_INCLUDE_ENSO_SOCKET_H_ -#define SOFTWARE_INCLUDE_ENSO_SOCKET_H_ - -#include -#include - -namespace enso { - -typedef unsigned short sa_family_t; -typedef unsigned int socklen_t; - -#define MAX_NB_CORES 128 -#define MAX_NB_SOCKETS MAX_NB_FLOWS - -void set_bdf(uint16_t bdf_); - -int socket(int domain, int type, int protocol, bool fallback) noexcept; - -int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) noexcept; - -uint64_t get_socket_phys_addr(int sockfd); - -void *get_socket_virt_addr(int sockfd); - -uint64_t convert_buf_addr_to_phys(int sockfd, void *addr); - -int shutdown(int sockfd, int how) noexcept; - -/* - * Receives packets using a POSIX-like interface. Here *buf is the address to a - * buffer allocated by the user. The function will copy the received data to - * this buffer. - */ -ssize_t recv(int sockfd, void *buf, size_t len, int flags); - -ssize_t recv_zc(int sockfd, void **buf, size_t len, int flags); - -ssize_t recv_select(int ref_sockfd, int *sockfd, void **buf, size_t len, - int flags); - -/* - * Send the bytes pointed by address `phys_addr` through the `sockfd` socket. - * There are two important differences to a traditional POSIX `send`: - * - Memory must be pinned (phys_addr needs to be a physical address); - * - It is not safe to change the buffer content until the transmission is done. - * - * This function blocks until it can send but returns before the transmission is - * over. To figure out when the transmission is over, use the `get_completions` - * function. - */ -ssize_t send(int sockfd, uint64_t phys_addr, size_t len, int flags); - -/* - * Return the number of transmission requests that were completed since the last - * call to this function. Since transmissions are always completed in order, one - * can figure out which transmissions were completed by keeping track of all the - * calls to `send`. There can be only up to `kMaxPendingTxRequests` requests - * completed between two calls to `send`. However, if `send` is called multiple - * times, without calling `get_completions` the number of completed requests can - * surpass `kMaxPendingTxRequests`. - */ -uint32_t get_completions(int ref_sockfd); - -/* - * Enable hardware timestamping for the device. This applies to all sockets. - */ -int enable_device_timestamp(int ref_sockfd); - -/* - * Disable hardware timestamping for the device. This applies to all sockets. - */ -int disable_device_timestamp(int ref_sockfd); - -/* - * Enable hardware rate limit for the device. This applies to all sockets. - */ -int enable_device_rate_limit(int ref_sockfd, uint16_t num, uint16_t den); - -/* - * Disable hardware rate limit for the device. This applies to all sockets. - */ -int disable_device_rate_limit(int ref_sockfd); - -/* - * Enable round robin for the device. This applies to all sockets. - */ -int enable_device_round_robin(int ref_sockfd); - -/* - * Disable round robin for the device. This applies to all sockets. - */ -int disable_device_round_robin(int ref_sockfd); - -/* - * Free packet buffer. Use this to free received packets. - */ -void free_enso_pipe(int sockfd, size_t len); - -void print_sock_stats(int sockfd); - -} // namespace enso - -#endif // SOFTWARE_INCLUDE_ENSO_SOCKET_H_ diff --git a/software/src/enso/meson.build b/software/src/enso/meson.build index 0f86c06d..c644f842 100644 --- a/software/src/enso/meson.build +++ b/software/src/enso/meson.build @@ -4,7 +4,6 @@ enso_sources = files( 'helpers.cpp', 'ixy_helpers.cpp', 'pipe.cpp', - 'socket.cpp', ) project_sources += enso_sources diff --git a/software/src/enso/pipe.cpp b/software/src/enso/pipe.cpp index 8d8cb722..a114d0ad 100644 --- a/software/src/enso/pipe.cpp +++ b/software/src/enso/pipe.cpp @@ -397,6 +397,14 @@ void Device::ProcessCompletions() { } } +void Device::SendBatch(uint64_t phys_addr, uint32_t nb_bytes) { + send_to_queue(¬ification_buf_pair_, phys_addr, nb_bytes); +} + +uint32_t Device::ConsumeBatches() { + return get_unreported_completions(¬ification_buf_pair_); +} + int Device::EnableTimeStamping() { return enable_timestamp(¬ification_buf_pair_); } diff --git a/software/src/enso/socket.cpp b/software/src/enso/socket.cpp deleted file mode 100644 index 7d34f09e..00000000 --- a/software/src/enso/socket.cpp +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright (c) 2022, Carnegie Mellon University - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted (subject to the limitations in the disclaimer - * below) provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * * Neither the name of the copyright holder nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY - * THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND - * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT - * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; - * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * @file - * @brief Socket-like API. - * @deprecated Use the API defined in `pipe.h` instead. - * - * @author Hugo Sadok - */ - -#include "enso/socket.h" - -#include -#include - -#include -#include - -#include "../pcie.h" - -namespace enso { - -static struct NotificationBufPair notification_buf_pair[MAX_NB_CORES]; - -// TODO(sadok) replace with hash table? -static struct SocketInternal open_sockets[MAX_NB_SOCKETS]; -static unsigned int nb_open_sockets = 0; -static uint16_t bdf = 0; - -// HACK(sadok): We need a better way to specify the BDF. -void set_bdf(uint16_t bdf_) { bdf = bdf_; } - -int socket([[maybe_unused]] int domain, [[maybe_unused]] int type, - [[maybe_unused]] int protocol, bool fallback) noexcept { - if (unlikely(nb_open_sockets >= MAX_NB_SOCKETS)) { - std::cerr << "Maximum number of sockets reached" << std::endl; - return -1; - } - - struct SocketInternal socket_entry; - - struct NotificationBufPair* nb_pair = ¬ification_buf_pair[sched_getcpu()]; - socket_entry.notification_buf_pair = nb_pair; - - struct RxEnsoPipeInternal* enso_pipe = &socket_entry.enso_pipe; - - int bar = -1; - int socket_id = dma_init(nb_pair, enso_pipe, bdf, bar, - std::string(kHugePageDefaultPrefix), fallback); - if (unlikely(socket_id < 0)) { - std::cerr << "Problem initializing DMA" << std::endl; - return -1; - } - - open_sockets[socket_id] = socket_entry; - - // FIXME(sadok): Use __sync_fetch_and_add to update atomically. - ++nb_open_sockets; - - return socket_id; -} - -int bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen) noexcept { - (void)addrlen; // Avoid unused warnings. - struct SocketInternal* socket = &open_sockets[sockfd]; - sockaddr_in* addr_in = (sockaddr_in*)addr; - - uint32_t enso_pipe_id = get_enso_pipe_id_from_socket(socket); - - // TODO(sadok): insert flow entry from kernel. - insert_flow_entry(socket->notification_buf_pair, ntohs(addr_in->sin_port), 0, - ntohl(addr_in->sin_addr.s_addr), 0, - 0x11, // TODO(sadok): support protocols other than UDP. - enso_pipe_id); - - return 0; -} - -/* - * Return physical address of the buffer associated with the socket. - */ -uint64_t get_socket_phys_addr(int sockfd) { - return open_sockets[sockfd].enso_pipe.buf_phys_addr; -} - -/* - * Return virtual address of the buffer associated with the socket. - */ -void* get_socket_virt_addr(int sockfd) { - return (void*)open_sockets[sockfd].enso_pipe.buf; -} - -/* - * Convert a socket buffer virtual address to physical address. - */ -uint64_t convert_buf_addr_to_phys(int sockfd, void* addr) { - return (uint64_t)addr + open_sockets[sockfd].enso_pipe.phys_buf_offset; -} - -ssize_t recv(int sockfd, void* buf, size_t len, int flags) { - (void)len; - (void)flags; - - void* ring_buf; - struct SocketInternal* socket = &open_sockets[sockfd]; - struct RxEnsoPipeInternal* enso_pipe = &socket->enso_pipe; - struct NotificationBufPair* notification_buf_pair = - socket->notification_buf_pair; - - get_new_tails(notification_buf_pair); - - ssize_t bytes_received = - get_next_batch_from_queue(enso_pipe, notification_buf_pair, &ring_buf); - - if (unlikely(bytes_received <= 0)) { - return bytes_received; - } - - memcpy(buf, ring_buf, bytes_received); - - advance_pipe(enso_pipe, bytes_received); - - return bytes_received; -} - -ssize_t recv_zc(int sockfd, void** buf, size_t len, int flags) { - (void)len; - (void)flags; - - struct SocketInternal* socket = &open_sockets[sockfd]; - struct RxEnsoPipeInternal* enso_pipe = &socket->enso_pipe; - struct NotificationBufPair* notification_buf_pair = - socket->notification_buf_pair; - - get_new_tails(notification_buf_pair); - - return get_next_batch_from_queue(enso_pipe, notification_buf_pair, buf); -} - -ssize_t recv_select(int ref_sockfd, int* sockfd, void** buf, size_t len, - int flags) { - (void)len; - (void)flags; - - struct NotificationBufPair* notification_buf_pair = - open_sockets[ref_sockfd].notification_buf_pair; - return get_next_batch(notification_buf_pair, open_sockets, sockfd, buf); -} - -ssize_t send(int sockfd, uint64_t phys_addr, size_t len, int flags) { - (void)flags; - return send_to_queue(open_sockets[sockfd].notification_buf_pair, phys_addr, - len); -} - -uint32_t get_completions(int ref_sockfd) { - struct NotificationBufPair* notification_buf_pair = - open_sockets[ref_sockfd].notification_buf_pair; - return get_unreported_completions(notification_buf_pair); -} - -void free_enso_pipe(int sockfd, size_t len) { - advance_pipe(&(open_sockets[sockfd].enso_pipe), len); -} - -int enable_device_timestamp(int ref_sockfd) { - if (nb_open_sockets == 0) { - return -2; - } - return enable_timestamp(open_sockets[ref_sockfd].notification_buf_pair); -} - -int disable_device_timestamp(int ref_sockfd) { - if (nb_open_sockets == 0) { - return -2; - } - return disable_timestamp(open_sockets[ref_sockfd].notification_buf_pair); -} - -int enable_device_rate_limit(int ref_sockfd, uint16_t num, uint16_t den) { - if (nb_open_sockets == 0) { - return -2; - } - return enable_rate_limit(open_sockets[ref_sockfd].notification_buf_pair, num, - den); -} - -int disable_device_rate_limit(int ref_sockfd) { - if (nb_open_sockets == 0) { - return -2; - } - return disable_rate_limit(open_sockets[ref_sockfd].notification_buf_pair); -} - -int enable_device_round_robin(int ref_sockfd) { - if (nb_open_sockets == 0) { - return -2; - } - return enable_round_robin(open_sockets[ref_sockfd].notification_buf_pair); -} - -int disable_device_round_robin(int ref_sockfd) { - if (nb_open_sockets == 0) { - return -2; - } - return disable_round_robin(open_sockets[ref_sockfd].notification_buf_pair); -} - -int shutdown(int sockfd, int how __attribute__((unused))) noexcept { - dma_finish(&open_sockets[sockfd]); - - // TODO(sadok): Remove entry from the NIC flow table. - - --nb_open_sockets; - - return 0; -} - -void print_sock_stats(int sockfd) { - struct SocketInternal* socket = &open_sockets[sockfd]; - print_stats(socket, socket->enso_pipe.id == 0); -} - -} // namespace enso