Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions control_plane.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include "common.h"
#include "countdown_cond.h"
#include "flow.h"
#include "hexdump.h"
#include "lib.h"
#include "logging.h"
Expand Down Expand Up @@ -392,21 +394,66 @@ struct print_io_stats_info {
uint64_t start_ns;
uint64_t prev_ns;
struct io_stats prev;
int protocol; /* SOCK_STREAM, SOCK_DGRAM, etc. */
};

static void print_io_stats(struct print_io_stats_info *s)
{
const uint64_t now = clock_now();
const double dt = 1e-9 * (now - s->prev_ns);
struct io_stats cur = {}, prev = s->prev;
uint32_t rtt_sum = 0;
uint32_t rttvar_sum = 0;
uint32_t rtt_min = UINT32_MAX;
uint32_t rtt_max = 0;
int rtt_count = 0;
double elapsed_time;
double tx_mbps;
double rx_mbps;

/* Accumulate per-thread stats */
for (int i = 0; i < s->num_threads; i++) {
cur.tx_ops += s->t[i].io_stats.tx_ops;
cur.tx_bytes += s->t[i].io_stats.tx_bytes;
cur.rx_ops += s->t[i].io_stats.rx_ops;
cur.rx_bytes += s->t[i].io_stats.rx_bytes;

/* skip RTT collection for non-TCP protocols */
if (s->protocol != SOCK_STREAM)
continue;

/* collect RTT stats from all valid flows in this thread */
for (int j = 0; j < s->t[i].flow_space; j++) {
struct tcp_info tcp_info = {};
struct flow *f;
socklen_t len;
int fd;

f = s->t[i].flows[j];
if (!f)
continue;

fd = flow_fd(f);
if (fd < 0)
continue;

len = sizeof(tcp_info);
if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &tcp_info, &len) != 0)
continue;

/* a value of 0 often means the RTT hasn't been measured yet */
if (tcp_info.tcpi_rtt == 0)
continue;

/* update RTT stats for this flow */
rtt_sum += tcp_info.tcpi_rtt;
rttvar_sum += tcp_info.tcpi_rttvar;
rtt_min = MIN(tcp_info.tcpi_rtt, rtt_min);
rtt_max = MAX(tcp_info.tcpi_rtt, rtt_max);
rtt_count++;
}
}

/* save totals for next round */
s->prev = cur;
s->prev_ns = now;
Expand All @@ -416,18 +463,47 @@ static void print_io_stats(struct print_io_stats_info *s)
cur.tx_bytes -= prev.tx_bytes;
cur.rx_ops -= prev.rx_ops;
cur.rx_bytes -= prev.rx_bytes;
PRINT(s->cb, "t",
"%-10.3lf TX: %6ld ops, %10ld bytes, %8.1lf Mbps; RX: %6ld ops, %10ld bytes, %8.1f Mbps;",
(double)(now - s->start_ns) * 1e-9,
cur.tx_ops, cur.tx_bytes, cur.tx_bytes * 8 * 1e-6 /dt,
cur.rx_ops, cur.rx_bytes, cur.rx_bytes * 8 * 1e-6 / dt);

/* common stats to display */
elapsed_time = (double)(now - s->start_ns) * 1e-9;
tx_mbps = cur.tx_bytes * 8 * 1e-6 / dt;
rx_mbps = cur.rx_bytes * 8 * 1e-6 / dt;

/* display appropriate stats based on protocol */
if (s->protocol == SOCK_STREAM && rtt_count > 0) {
/* TCP with valid RTT data */
double avg_rtt_ms = (double)rtt_sum / rtt_count / 1000.0;
double avg_rttvar_ms = (double)rttvar_sum / rtt_count / 1000.0;
double min_rtt_ms = (double)rtt_min / 1000.0;
double max_rtt_ms = (double)rtt_max / 1000.0;
/* Assuming uniformly collectly variances, the stddev is simply
* the square root of the average of variances.
*/
double rtt_stddev_ms = sqrt(avg_rttvar_ms);

PRINT(s->cb, "t",
"%-10.3lf TX: %6ld ops, %10ld bytes, %8.1lf Mbps; RX: %6ld ops, %10ld bytes, %8.1f Mbps; RTT: %6.2f ms (σ=%5.2f ms, min=%5.2f, max=%5.2f)",
elapsed_time,
cur.tx_ops, cur.tx_bytes, tx_mbps,
cur.rx_ops, cur.rx_bytes, rx_mbps,
avg_rtt_ms, rtt_stddev_ms, min_rtt_ms, max_rtt_ms);
} else {
/* no RTT data or non-TCP protocol */
PRINT(s->cb, "t",
"%-10.3lf TX: %6ld ops, %10ld bytes, %8.1lf Mbps; RX: %6ld ops, %10ld bytes, %8.1f Mbps;",
elapsed_time,
cur.tx_ops, cur.tx_bytes, tx_mbps,
cur.rx_ops, cur.rx_bytes, rx_mbps);
}
}

void control_plane_wait_until_done(struct control_plane *cp, struct thread *t)
{
struct print_io_stats_info s = {
.cb = cp->cb, .t = t, .num_threads = cp->opts->num_threads,
.start_ns = clock_now(), .prev_ns = clock_now()};
.start_ns = clock_now(), .prev_ns = clock_now(),
.protocol = cp->fn ? cp->fn->fn_type : 0,
};
if (cp->opts->client) {
if (cp->opts->test_length > 0) {
signal(SIGALRM, sig_alarm_handler);
Expand Down