From 113a9f9d3beba99f632e84f47c9c7cd1b010dbe1 Mon Sep 17 00:00:00 2001 From: Eric Friedrich Date: Wed, 11 Oct 2017 11:39:22 -0400 Subject: [PATCH 1/2] Add interval bandwidth reporting option --- include/options.h | 1 + src/getter.c | 37 ++++++++++++++++++++++++++++++------ src/options.c | 14 ++++++++++++-- src/worker.c | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 8 deletions(-) diff --git a/include/options.h b/include/options.h index 82722ce..4e19e3b 100644 --- a/include/options.h +++ b/include/options.h @@ -34,6 +34,7 @@ struct options { char *urls[MAX_URLS]; size_t urls_l; char *urls_loc; + double worker_report_interval; }; int initialise_options(struct options *opt, int argc, char **argv); diff --git a/src/getter.c b/src/getter.c index e32acb0..4e5d8d1 100644 --- a/src/getter.c +++ b/src/getter.c @@ -44,20 +44,27 @@ static size_t get_urls(struct worker *w, char **urls, char *urls_loc, int *total return urls_c; } -int get_once(struct worker *workers, char **urls_opt, size_t urls_l, char *urls_loc, int *requests) +int get_once(struct worker *workers, struct options* opt, int *requests) { struct worker *w; + ssize_t urls_l = opt->urls_l; + char *urls_loc = opt->urls_loc; int cururl = 0, total_bytes = 0, urls_alloc = 0, reqs = 0, err = 0, bytes, nfds, retval, len, i; - char **urls = urls_opt; + char **urls = opt->urls; fd_set rfds; char buf[PIPE_BUF+1] = {}, outbuf[PIPE_BUF+1] = {}; + struct timeval now_tv; + int dl_delta = 0; + double last_report = 0, now = 0, speed = 0, time_delta = 0; + unsigned int report_bytes_dl = 0, report_count = 0; + for(w = workers; w; w = w->next) { msg_write(w->pipe_w, "RESET", sizeof("RESET")); msg_read(w->pipe_r, buf, sizeof(buf)); w->status = STATUS_READY; } - if(urls_loc != NULL) { + if(opt->urls_loc != NULL) { urls = malloc(MAX_URLS * sizeof(urls)); if((len = get_urls(workers, urls, urls_loc, &total_bytes)) < 0) return len; urls_l = len; @@ -65,6 +72,9 @@ int get_once(struct worker *workers, char **urls_opt, size_t urls_l, char *urls_ reqs++; } + gettimeofday(&now_tv, NULL); + last_report = now_tv.tv_sec + now_tv.tv_usec / 1000000.0; + do { FD_ZERO(&rfds); nfds = -1; @@ -96,10 +106,25 @@ int get_once(struct worker *workers, char **urls_opt, size_t urls_l, char *urls_ if(sscanf(buf, "OK %d bytes", &bytes) == 1) { total_bytes += bytes; reqs++; + w->status = STATUS_READY; } else if(sscanf(buf, "ERR %d", &err) == 1) { fprintf(stderr, "cURL error: %s for URL '%s'.\n", curl_easy_strerror(err), w->url); - } - w->status = STATUS_READY; + w->status = STATUS_READY; + } else if(sscanf(buf, "REP %d", &dl_delta) == 1) { + report_bytes_dl += dl_delta; + gettimeofday(&now_tv, NULL); + now = now_tv.tv_sec + now_tv.tv_usec / 1000000.0; + time_delta = now - last_report; + if(time_delta >= opt->worker_report_interval) { + speed = report_bytes_dl / time_delta; + fprintf(stderr, "GETTER_INTERIM_RESULT[%u]=%f\r\n", report_count, speed); + fprintf(stderr, "GETTER_INTERVAL[%u]=%f\r\n", report_count, time_delta); + fprintf(stderr, "GETTER_ENDING[%u]=%f\r\n", report_count, now); + report_count++; + report_bytes_dl = 0; + last_report = now; + } + } } } } while(1); @@ -173,7 +198,7 @@ int get_loop(struct options *opt) gettimeofday(&start, NULL); } schedule_next(opt->interval, &start, &next); - bytes = get_once(workers, opt->urls, opt->urls_l, opt->urls_loc, &requests); + bytes = get_once(workers, opt, &requests); gettimeofday(&end, NULL); count++; total_count++; diff --git a/src/options.c b/src/options.c index 71e2e82..77b120f 100644 --- a/src/options.c +++ b/src/options.c @@ -29,6 +29,7 @@ int initialise_options(struct options *opt, int argc, char **argv) opt->urls_l = 0; memset(&opt->urls, 0, MAX_URLS * sizeof(&opt->urls)); opt->urls_loc = NULL; + opt->worker_report_interval = 0; if(parse_options(opt, argc, argv) < 0) return -2; @@ -51,7 +52,7 @@ void destroy_options(struct options *opt) static void usage(const char *name) { - fprintf(stderr, "Usage: %s [-46Dh] [-c ] [-d ] [-i ] [-l ] [-n ] [-o ] [-t ] [url_file]\n", name); + fprintf(stderr, "Usage: %s [-46Dh] [-c ] [-d ] [-i ] [-l ] [-n ] [-o ] [-t ] [-r ] [url_file]\n", name); } @@ -59,12 +60,13 @@ int parse_options(struct options *opt, int argc, char **argv) { int o; int val; + double val_f; FILE *output, *urlfile; char * line; size_t len = 0; ssize_t read; - while((o = getopt(argc, argv, "46Dhc:d:i:l:n:o:t:")) != -1) { + while((o = getopt(argc, argv, "46Dhc:d:i:l:n:o:t:r:")) != -1) { switch(o) { case '4': opt->ai_family = AF_INET; @@ -129,6 +131,14 @@ int parse_options(struct options *opt, int argc, char **argv) opt->output = output; } break; + case 'r': + val_f = atof(optarg); + if (val_f < 0) { + fprintf(stderr, "Invalid worker report interval value: %f\n", val_f); + return -1; + } + opt->worker_report_interval = val_f; + break; case 't': val = atoi(optarg); if(val < 0) { diff --git a/src/worker.c b/src/worker.c index 4f03de6..9599e60 100644 --- a/src/worker.c +++ b/src/worker.c @@ -32,6 +32,8 @@ struct worker_data { CURLcode res; int pipe_r; int pipe_w; + double worker_report_interval; + curl_off_t last_dlnow; }; @@ -53,6 +55,24 @@ static size_t memory_callback(void *contents, size_t size, size_t nmemb, void *u return realsize; } +static int prog_report(void *wd, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) { + struct worker_data *mywd = (struct worker_data *)wd; + char outbuf[PIPE_BUF+1] = {}; + ssize_t len; + curl_off_t delta; + + delta = dlnow - mywd->last_dlnow; + mywd->last_dlnow = dlnow; + len = sprintf(outbuf, "REP %"CURL_FORMAT_CURL_OFF_T, delta); + msg_write(mywd->pipe_w, outbuf, len); + + return 0; +} + +static int old_prog_report(void *wd, double dltotal, double dlnow, double ultotal, double ulnow) { + return prog_report(wd, (curl_off_t)dltotal, (curl_off_t)dlnow, (curl_off_t)ultotal, (curl_off_t)ulnow); +} + static int init_worker(struct worker_data *data) { int res; @@ -103,6 +123,31 @@ static int init_worker(struct worker_data *data) } + /* Enable periodic worker reporting if enabled on CLI */ + if (data->worker_report_interval > 0) { + fprintf(stderr, "Enable reporting on write pipe fd: %d\n", data->pipe_w); + if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSFUNCTION, old_prog_report)) != CURLE_OK) { + fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); + } + if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSDATA, data)) != CURLE_OK) { + fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); + } + +#if LIBCURL_VERSION_NUM >= 0x72000 + if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFOFUNCTION, prog_report)) != CURLE_OK) { + fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); + } + if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFODATA, data)) != CURLE_OK) { + fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); + } +#endif + if ((res = curl_easy_setopt(data->curl, CURLOPT_NOPROGRESS, 0L)) != CURLE_OK) { + fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); + } + data->last_dlnow = (curl_off_t)0; + } + + data->chunk.memory = NULL; data->chunk.size = 0; data->chunk.enabled = 0; @@ -141,6 +186,7 @@ static int destroy_worker(struct worker_data *data) static int reset_worker(struct worker_data *data) { + fprintf(stderr, "RESET WORKER\n"); destroy_worker(data); return init_worker(data); } @@ -179,6 +225,7 @@ static int run_worker(struct worker_data *data) data->chunk.enabled = 1; } else if(strncmp(buf, "URL ", 4) == 0) { p = buf + 4; + data->last_dlnow = (curl_off_t)0; } else { fprintf(stderr, "Unrecognised command '%s'!\n", buf); break; @@ -257,6 +304,7 @@ int start_worker(struct worker *w, struct options *opt) wd.timeout = opt->timeout; wd.dns_servers = opt->dns_servers; wd.ai_family = opt->ai_family; + wd.worker_report_interval = opt->worker_report_interval; close(fds_r[0]); close(fds_w[1]); sigaction(SIGINT, &sigign, NULL); From 6de9cfc25f3a0cd6850a361acb81e5266dcb4e2c Mon Sep 17 00:00:00 2001 From: Eric Friedrich Date: Thu, 7 Dec 2017 12:53:13 -0500 Subject: [PATCH 2/2] Updates to PR comments Signed-off-by: Eric Friedrich --- src/getter.c | 15 +++++++++------ src/worker.c | 16 +++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/getter.c b/src/getter.c index 4e5d8d1..7ef5753 100644 --- a/src/getter.c +++ b/src/getter.c @@ -56,7 +56,7 @@ int get_once(struct worker *workers, struct options* opt, int *requests) struct timeval now_tv; int dl_delta = 0; double last_report = 0, now = 0, speed = 0, time_delta = 0; - unsigned int report_bytes_dl = 0, report_count = 0; + unsigned int report_bytes_dl = 0; for(w = workers; w; w = w->next) { msg_write(w->pipe_w, "RESET", sizeof("RESET")); @@ -116,11 +116,14 @@ int get_once(struct worker *workers, struct options* opt, int *requests) now = now_tv.tv_sec + now_tv.tv_usec / 1000000.0; time_delta = now - last_report; if(time_delta >= opt->worker_report_interval) { - speed = report_bytes_dl / time_delta; - fprintf(stderr, "GETTER_INTERIM_RESULT[%u]=%f\r\n", report_count, speed); - fprintf(stderr, "GETTER_INTERVAL[%u]=%f\r\n", report_count, time_delta); - fprintf(stderr, "GETTER_ENDING[%u]=%f\r\n", report_count, now); - report_count++; + speed = 8 * report_bytes_dl / time_delta; //Report on bits not bytes + fprintf(stdout, "Worker %u: %f bps (%u bytes over %f seconds) ending at %f\r\n", + w->pipe_w, + speed, + report_bytes_dl, + time_delta, + now); + report_bytes_dl = 0; last_report = now; } diff --git a/src/worker.c b/src/worker.c index 9599e60..53acbda 100644 --- a/src/worker.c +++ b/src/worker.c @@ -125,26 +125,25 @@ static int init_worker(struct worker_data *data) /* Enable periodic worker reporting if enabled on CLI */ if (data->worker_report_interval > 0) { - fprintf(stderr, "Enable reporting on write pipe fd: %d\n", data->pipe_w); - if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSFUNCTION, old_prog_report)) != CURLE_OK) { +#if LIBCURL_VERSION_NUM >= 0x72000 + if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFOFUNCTION, prog_report)) != CURLE_OK) { fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); } - if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSDATA, data)) != CURLE_OK) { + if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFODATA, data)) != CURLE_OK) { fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); } - -#if LIBCURL_VERSION_NUM >= 0x72000 - if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFOFUNCTION, prog_report)) != CURLE_OK) { +#else + if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSFUNCTION, old_prog_report)) != CURLE_OK) { fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); } - if ((res = curl_easy_setopt(data->curl, CURLOPT_XFERINFODATA, data)) != CURLE_OK) { + if ((res = curl_easy_setopt(data->curl, CURLOPT_PROGRESSDATA, data)) != CURLE_OK) { fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); } #endif if ((res = curl_easy_setopt(data->curl, CURLOPT_NOPROGRESS, 0L)) != CURLE_OK) { fprintf(stderr, "cURL option error: %s\n", curl_easy_strerror(res)); } - data->last_dlnow = (curl_off_t)0; + data->last_dlnow = 0; } @@ -186,7 +185,6 @@ static int destroy_worker(struct worker_data *data) static int reset_worker(struct worker_data *data) { - fprintf(stderr, "RESET WORKER\n"); destroy_worker(data); return init_worker(data); }