diff --git a/Makefile b/Makefile index 7c951d191..774191110 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # Makefile for htslib, a C library for high-throughput sequencing data formats. # -# Copyright (C) 2013-2024 Genome Research Ltd. +# Copyright (C) 2013-2025 Genome Research Ltd. # # Author: John Marshall # @@ -500,7 +500,6 @@ header.o header.pico: header.c config.h $(textutils_internal_h) $(header_h) $(ht hfile.o hfile.pico: hfile.c config.h $(htslib_hfile_h) $(hfile_internal_h) $(htslib_kstring_h) $(hts_internal_h) $(htslib_khash_h) hfile_gcs.o hfile_gcs.pico: hfile_gcs.c config.h $(htslib_hts_h) $(htslib_kstring_h) $(hfile_internal_h) hfile_libcurl.o hfile_libcurl.pico: hfile_libcurl.c config.h $(hfile_internal_h) $(htslib_hts_h) $(htslib_kstring_h) $(htslib_khash_h) -hfile_s3_write.o hfile_s3_write.pico: hfile_s3_write.c config.h $(hfile_internal_h) $(htslib_hts_h) $(htslib_kstring_h) $(htslib_khash_h) hfile_s3.o hfile_s3.pico: hfile_s3.c config.h $(hfile_internal_h) $(htslib_hts_h) $(htslib_kstring_h) $(hts_time_funcs_h) hts.o hts.pico: hts.c config.h os/lzma_stub.h $(htslib_hts_h) $(htslib_bgzf_h) $(cram_h) $(htslib_hfile_h) $(htslib_hts_endian_h) version.h config_vars.h $(hts_internal_h) $(hfile_internal_h) $(sam_internal_h) $(htslib_hts_expr_h) $(htslib_hts_os_h) $(htslib_khash_h) $(htslib_kseq_h) $(htslib_ksort_h) $(htslib_tbx_h) $(htscodecs_htscodecs_h) hts_expr.o hts_expr.pico: hts_expr.c config.h $(htslib_hts_expr_h) $(htslib_hts_log_h) $(textutils_internal_h) diff --git a/config.mk.in b/config.mk.in index 98acb01f2..38bafce3e 100644 --- a/config.mk.in +++ b/config.mk.in @@ -79,12 +79,10 @@ endif ifeq "s3-@s3@" "s3-enabled" plugin_OBJS += hfile_s3.o -plugin_OBJS += hfile_s3_write.o CRYPTO_LIBS = @CRYPTO_LIBS@ noplugin_LIBS += $(CRYPTO_LIBS) -hfile_s3$(PLUGIN_EXT): LIBS += $(CRYPTO_LIBS) -hfile_s3_write$(PLUGIN_EXT): LIBS += $(CRYPTO_LIBS) $(LIBCURL_LIBS) +hfile_s3$(PLUGIN_EXT): LIBS += $(CRYPTO_LIBS) $(LIBCURL_LIBS) endif ifeq "plugins-@enable_plugins@" "plugins-yes" @@ -101,7 +99,6 @@ plugin.o plugin.pico: ALL_CPPFLAGS += -DPLUGINPATH=\"$(pluginpath)\" hfile_gcs.o hfile_gcs.pico: version.h hfile_libcurl.o hfile_libcurl.pico: version.h hfile_s3.o hfile_s3.pico: version.h -hfile_s3_write.o hfile_s3_write.pico: version.h # Windows DLL plugins depend on the import library, built as a byproduct. $(plugin_OBJS:.o=.cygdll): cyghts-$(LIBHTS_SOVERSION).dll diff --git a/hfile.c b/hfile.c index 3b60bedda..044f0d4bc 100644 --- a/hfile.c +++ b/hfile.c @@ -1136,7 +1136,6 @@ static int load_hfile_plugins(void) #endif #ifdef ENABLE_S3 init_add_plugin(NULL, hfile_plugin_init_s3, "s3"); - init_add_plugin(NULL, hfile_plugin_init_s3_write, "s3w"); #endif #endif diff --git a/hfile_internal.h b/hfile_internal.h index 2e365ae7d..3c1fc9ab8 100644 --- a/hfile_internal.h +++ b/hfile_internal.h @@ -182,7 +182,6 @@ extern int hfile_plugin_init(struct hFILE_plugin *self); extern int hfile_plugin_init_gcs(struct hFILE_plugin *self); extern int hfile_plugin_init_libcurl(struct hFILE_plugin *self); extern int hfile_plugin_init_s3(struct hFILE_plugin *self); -extern int hfile_plugin_init_s3_write(struct hFILE_plugin *self); #endif // Callback to allow headers to be set in http connections. Currently used diff --git a/hfile_s3.c b/hfile_s3.c index c7c52e617..4fb32beef 100644 --- a/hfile_s3.c +++ b/hfile_s3.c @@ -1,6 +1,6 @@ /* hfile_s3.c -- Amazon S3 backend for low-level file streams. - Copyright (C) 2015-2017, 2019-2024 Genome Research Ltd. + Copyright (C) 2015-2017, 2019-2025 Genome Research Ltd. 
Author: John Marshall @@ -33,6 +33,7 @@ DEALINGS IN THE SOFTWARE. */ #include #include +#include #include "hfile_internal.h" #ifdef ENABLE_PLUGINS @@ -42,6 +43,8 @@ DEALINGS IN THE SOFTWARE. */ #include "htslib/kstring.h" #include "hts_time_funcs.h" +#include + typedef struct s3_auth_data { kstring_t id; kstring_t token; @@ -54,17 +57,49 @@ typedef struct s3_auth_data { enum {s3_auto, s3_virtual, s3_path} url_style; time_t creds_expiry_time; char *bucket; - kstring_t auth_hdr; time_t auth_time; char date[40]; char date_long[17]; char date_short[9]; kstring_t date_html; char mode; - char *headers[5]; - int refcount; + int is_v4; } s3_auth_data; +typedef struct { + hFILE base; + CURL *curl; + CURLcode ret; + s3_auth_data *au; + kstring_t buffer; + kstring_t url; + long verbose; + int write; + int part_size; // size for reading or writing + + kstring_t content_hash; + kstring_t authorisation; + kstring_t content; + kstring_t date; + kstring_t token; + kstring_t range; + + // write variables + kstring_t upload_id; + kstring_t completion_message; + int part_no; + int aborted; + size_t index; + int expand; + + // read variables + size_t last_read; // last read position (remote) + size_t last_read_buffer; // last read (local buffer) + int64_t file_size; // size of the file being read + int keep_going; + +} hFILE_s3; + #define AUTH_LIFETIME 60 // Regenerate auth headers if older than this #define CREDENTIAL_LIFETIME 60 // Seconds before expiry to reread credentials @@ -142,6 +177,7 @@ urldecode_kput(const char *s, int len, kstring_t *str) else kputc(s[i++], str); } + static void base64_kput(const unsigned char *data, size_t len, kstring_t *str) { static const char base64[] = @@ -166,6 +202,7 @@ static void base64_kput(const unsigned char *data, size_t len, kstring_t *str) kputsn("==", pad, str); } + static int is_dns_compliant(const char *s0, const char *slim, int is_https) { int has_nondigit = 0, len = 0; @@ -190,6 +227,7 @@ static int is_dns_compliant(const char *s0, const char *slim, int is_https) return has_nondigit && len >= 3 && len <= 63; } + static FILE *expand_tilde_open(const char *fname, const char *mode) { FILE *fp; @@ -211,6 +249,7 @@ static FILE *expand_tilde_open(const char *fname, const char *mode) return fp; } + static void parse_ini(const char *fname, const char *section, ...) { kstring_t line = { 0, 0, NULL }; @@ -252,6 +291,7 @@ static void parse_ini(const char *fname, const char *section, ...) 
free(line.s); } + static void parse_simple(const char *fname, kstring_t *id, kstring_t *secret) { kstring_t text = { 0, 0, NULL }; @@ -276,46 +316,8 @@ static void parse_simple(const char *fname, kstring_t *id, kstring_t *secret) free(text.s); } -static int copy_auth_headers(s3_auth_data *ad, char ***hdrs) { - char **hdr = &ad->headers[0]; - int idx = 0; - *hdrs = hdr; - - hdr[idx] = strdup(ad->date); - if (!hdr[idx]) return -1; - idx++; - - if (ad->token.l) { - kstring_t token_hdr = KS_INITIALIZE; - kputs("X-Amz-Security-Token: ", &token_hdr); - kputs(ad->token.s, &token_hdr); - if (token_hdr.s) { - hdr[idx++] = token_hdr.s; - } else { - goto fail; - } - } - - if (ad->auth_hdr.l) { - hdr[idx] = strdup(ad->auth_hdr.s); - if (!hdr[idx]) goto fail; - idx++; - } - - hdr[idx] = NULL; - return 0; - - fail: - for (--idx; idx >= 0; --idx) - free(hdr[idx]); - return -1; -} static void free_auth_data(s3_auth_data *ad) { - if (ad->refcount > 0) { - --ad->refcount; - return; - } free(ad->profile.s); free(ad->id.s); free(ad->token.s); @@ -325,7 +327,6 @@ static void free_auth_data(s3_auth_data *ad) { free(ad->user_query_string.s); free(ad->host.s); free(ad->bucket); - free(ad->auth_hdr.s); free(ad->date_html.s); free(ad); } @@ -391,64 +392,6 @@ static void refresh_auth_data(s3_auth_data *ad) { ks_free(&expiry_time); } -static int auth_header_callback(void *ctx, char ***hdrs) { - s3_auth_data *ad = (s3_auth_data *) ctx; - - time_t now = time(NULL); -#ifdef HAVE_GMTIME_R - struct tm tm_buffer; - struct tm *tm = gmtime_r(&now, &tm_buffer); -#else - struct tm *tm = gmtime(&now); -#endif - kstring_t message = { 0, 0, NULL }; - unsigned char digest[DIGEST_BUFSIZ]; - size_t digest_len; - - if (!hdrs) { // Closing connection - free_auth_data(ad); - return 0; - } - - if (ad->creds_expiry_time > 0 - && ad->creds_expiry_time - now < CREDENTIAL_LIFETIME) { - refresh_auth_data(ad); - } else if (now - ad->auth_time < AUTH_LIFETIME) { - // Last auth string should still be valid - *hdrs = NULL; - return 0; - } - - strftime(ad->date, sizeof(ad->date), "Date: %a, %d %b %Y %H:%M:%S GMT", tm); - if (!ad->id.l || !ad->secret.l) { - ad->auth_time = now; - return copy_auth_headers(ad, hdrs); - } - - if (ksprintf(&message, "%s\n\n\n%s\n%s%s%s%s", - ad->mode == 'r' ? "GET" : "PUT", ad->date + 6, - ad->token.l ? "x-amz-security-token:" : "", - ad->token.l ? ad->token.s : "", - ad->token.l ? 
"\n" : "", - ad->bucket) < 0) { - return -1; - } - - digest_len = s3_sign(digest, &ad->secret, &message); - ad->auth_hdr.l = 0; - if (ksprintf(&ad->auth_hdr, "Authorization: AWS %s:", ad->id.s) < 0) - goto fail; - base64_kput(digest, digest_len, &ad->auth_hdr); - - free(message.s); - ad->auth_time = now; - return copy_auth_headers(ad, hdrs); - - fail: - free(message.s); - return -1; -} - /* like a escape path but for query strings '=' and '&' are untouched */ static char *escape_query(const char *qs) { @@ -542,9 +485,10 @@ static int is_escaped(const char *str) { return escaped || !needs_escape; } -static int redirect_endpoint_callback(void *auth, long response, - kstring_t *header, kstring_t *url) { - s3_auth_data *ad = (s3_auth_data *)auth; + +static int redirect_endpoint(hFILE_s3 *fp, kstring_t *header) { + s3_auth_data *ad = fp->au; + kstring_t *url = &fp->url; char *new_region; char *end; int ret = -1; @@ -795,6 +739,7 @@ static s3_auth_data * setup_auth_data(const char *s3url, const char *mode, goto error; } memcpy(ad->bucket, url->s + url_path_pos, url->l - url_path_pos + 1); + ad->is_v4 = 1; } else { ad->bucket = malloc(url->l - url_path_pos + bucket_len + 2); @@ -805,6 +750,7 @@ static s3_auth_data * setup_auth_data(const char *s3url, const char *mode, memcpy(ad->bucket + 1, bucket, bucket_len); memcpy(ad->bucket + bucket_len + 1, url->s + url_path_pos, url->l - url_path_pos + 1); + ad->is_v4 = 0; } // write any query strings to its own place to use later @@ -823,29 +769,61 @@ static s3_auth_data * setup_auth_data(const char *s3url, const char *mode, return NULL; } -static hFILE * s3_rewrite(const char *s3url, const char *mode, va_list *argsp) -{ - kstring_t url = { 0, 0, NULL }; - s3_auth_data *ad = setup_auth_data(s3url, mode, 2, &url); - if (!ad) - return NULL; +static int v2_authorisation(hFILE_s3 *fp, char *request) { + s3_auth_data *ad = fp->au; + time_t now = time(NULL); + +#ifdef HAVE_GMTIME_R + struct tm tm_buffer; + struct tm *tm = gmtime_r(&now, &tm_buffer); +#else + struct tm *tm = gmtime(&now); +#endif - hFILE *fp = hopen(url.s, mode, "va_list", argsp, - "httphdr_callback", auth_header_callback, - "httphdr_callback_data", ad, - "redirect_callback", redirect_endpoint_callback, - "redirect_callback_data", ad, - NULL); - if (!fp) goto fail; + kstring_t message = KS_INITIALIZE; + unsigned char digest[DIGEST_BUFSIZ]; + size_t digest_len; - free(url.s); - return fp; + if (ad->creds_expiry_time > 0 + && ad->creds_expiry_time - now < CREDENTIAL_LIFETIME) { + refresh_auth_data(ad); + } + + // date format between v2 and v4 is different. + + strftime(ad->date, sizeof(ad->date), "Date: %a, %d %b %Y %H:%M:%S GMT", tm); + + kputs(ad->date, &fp->date); + + if (!ad->id.l || !ad->secret.l) { + ad->auth_time = now; + return 0; + } + + if (ksprintf(&message, "%s\n\n\n%s\n%s%s%s%s", + request, ad->date + 6, + ad->token.l ? "x-amz-security-token:" : "", + ad->token.l ? ad->token.s : "", + ad->token.l ? 
"\n" : "", + ad->bucket) < 0) { + return -1; + } + + digest_len = s3_sign(digest, &ad->secret, &message); + + if (ksprintf(&fp->authorisation, "Authorization: AWS %s:", ad->id.s) < 0) + goto fail; + + base64_kput(digest, digest_len, &fp->authorisation); + + free(message.s); + ad->auth_time = now; + return 0; fail: - free(url.s); - free_auth_data(ad); - return NULL; + free(message.s); + return -1; } /*************************************************************** @@ -865,18 +843,6 @@ static void hash_string(char *in, size_t length, char *out, size_t out_len) { } } -static void ksinit(kstring_t *s) { - s->l = 0; - s->m = 0; - s->s = NULL; -} - - -static void ksfree(kstring_t *s) { - free(s->s); - ksinit(s); -} - static int make_signature(s3_auth_data *ad, kstring_t *string_to_sign, char *signature_string, size_t sig_string_len) { unsigned char date_key[SHA256_DIGEST_BUFSIZE]; @@ -908,7 +874,7 @@ static int make_signature(s3_auth_data *ad, kstring_t *string_to_sign, char *sig snprintf(signature_string + j, sig_string_len - j, "%02x", signature[i]); } - ksfree(&secret_access_key); + ks_free(&secret_access_key); return 0; } @@ -924,6 +890,9 @@ static int make_authorisation(s3_auth_data *ad, char *http_request, char *conten char signature_string[HASH_LENGTH_SHA256]; int ret = -1; + if (!ad->id.l || !ad->secret.l) { + return 0; + } if (!ad->token.l) { kputs("host;x-amz-content-sha256;x-amz-date", &signed_headers); @@ -985,11 +954,11 @@ static int make_authorisation(s3_auth_data *ad, char *http_request, char *conten ret = 0; cleanup: - ksfree(&signed_headers); - ksfree(&canonical_headers); - ksfree(&canonical_request); - ksfree(&scope); - ksfree(&string_to_sign); + ks_free(&signed_headers); + ks_free(&canonical_headers); + ks_free(&canonical_request); + ks_free(&scope); + ks_free(&string_to_sign); return ret; } @@ -1082,24 +1051,17 @@ static int order_query_string(kstring_t *qs) { } -static int write_authorisation_callback(void *auth, char *request, kstring_t *content, char *cqs, - kstring_t *hash, kstring_t *auth_str, kstring_t *date, - kstring_t *token, int uqs) { - s3_auth_data *ad = (s3_auth_data *)auth; +static int v4_authorisation(hFILE_s3 *fp, char *request, kstring_t *content, char *cqs, int uqs) { + s3_auth_data *ad = fp->au; char content_hash[HASH_LENGTH_SHA256]; time_t now; - if (request == NULL) { - // signal to free auth data - free_auth_data(ad); - return 0; - } - now = time(NULL); if (update_time(ad, now)) { return -1; } + if (ad->creds_expiry_time > 0 && ad->creds_expiry_time - now < CREDENTIAL_LIFETIME) { refresh_auth_data(ad); @@ -1113,160 +1075,169 @@ static int write_authorisation_callback(void *auth, char *request, kstring_t *co } ad->canonical_query_string.l = 0; - kputs(cqs, &ad->canonical_query_string); - if (ad->canonical_query_string.l == 0) { - return -1; - } + if (cqs) { + kputs(cqs, &ad->canonical_query_string); - /* add a user provided query string, normally only useful on upload initiation */ - if (uqs) { - kputs("&", &ad->canonical_query_string); - kputs(ad->user_query_string.s, &ad->canonical_query_string); + /* add a user provided query string, normally only useful on upload initiation */ + if (uqs) { + kputs("&", &ad->canonical_query_string); + kputs(ad->user_query_string.s, &ad->canonical_query_string); - if (order_query_string(&ad->canonical_query_string)) { - return -1; + if (order_query_string(&ad->canonical_query_string)) { + return -1; + } } } - if (make_authorisation(ad, request, content_hash, auth_str)) { + if (make_authorisation(ad, request, 
content_hash, &fp->authorisation)) { return -1; } - kputs(ad->date_html.s, date); - kputsn(content_hash, HASH_LENGTH_SHA256, hash); + kputs(ad->date_html.s, &fp->date); + kputsn(content_hash, HASH_LENGTH_SHA256, &fp->content_hash); - if (date->l == 0 || hash->l == 0) { + if (fp->date.l == 0 || fp->content_hash.l == 0) { return -1; } if (ad->token.l) { - ksprintf(token, "x-amz-security-token: %s", ad->token.s); + ksprintf(&fp->token, "x-amz-security-token: %s", ad->token.s); } return 0; } +static int set_region(s3_auth_data *ad, kstring_t *region) { + ad->region.l = 0; + return kputsn(region->s, region->l, &ad->region) < 0; +} -static int v4_auth_header_callback(void *ctx, char ***hdrs) { - s3_auth_data *ad = (s3_auth_data *) ctx; - char content_hash[HASH_LENGTH_SHA256]; - kstring_t content = KS_INITIALIZE; - kstring_t authorisation = KS_INITIALIZE; - kstring_t token_hdr = KS_INITIALIZE; - char *date_html = NULL; - time_t now; - int idx; +// +// Writing and reading handling +// - if (!hdrs) { // Closing connection - free_auth_data(ad); - return 0; - } +// Some common code - now = time(NULL); +#define S3_MOVED_PERMANENTLY 301 +#define S3_BAD_REQUEST 400 - if (update_time(ad, now)) { - return -1; - } - if (ad->creds_expiry_time > 0 - && ad->creds_expiry_time - now < CREDENTIAL_LIFETIME) { - refresh_auth_data(ad); - } +static struct { + kstring_t useragent; + CURLSH *share; + pthread_mutex_t share_lock; +} curl = { { 0, 0, NULL }, NULL, PTHREAD_MUTEX_INITIALIZER }; - if (!ad->id.l || !ad->secret.l) { - return copy_auth_headers(ad, hdrs); - } +static void share_lock(CURL *handle, curl_lock_data data, + curl_lock_access access, void *userptr) { + pthread_mutex_lock(&curl.share_lock); +} - hash_string("", 0, content_hash, sizeof(content_hash)); // empty hash +static void share_unlock(CURL *handle, curl_lock_data data, void *userptr) { + pthread_mutex_unlock(&curl.share_lock); +} - ad->canonical_query_string.l = 0; - if (ad->user_query_string.l > 0) { - kputs(ad->user_query_string.s, &ad->canonical_query_string); +static void initialise_authorisation_values(hFILE_s3 *fp) { + ks_initialize(&fp->content_hash); + ks_initialize(&fp->authorisation); + ks_initialize(&fp->content); + ks_initialize(&fp->date); + ks_initialize(&fp->token); + ks_initialize(&fp->range); +} - if (order_query_string(&ad->canonical_query_string)) { - return -1; + +static void clear_authorisation_values(hFILE_s3 *fp) { + ks_clear(&fp->content_hash); + ks_clear(&fp->authorisation); + ks_clear(&fp->content); + ks_clear(&fp->date); + ks_clear(&fp->token); + ks_clear(&fp->range); +} + + +static void free_authorisation_values(hFILE_s3 *fp) { + ks_free(&fp->content_hash); + ks_free(&fp->authorisation); + ks_free(&fp->content); + ks_free(&fp->date); + ks_free(&fp->token); + ks_free(&fp->range); +} + +/* As the response text is case insensitive we need a version of strstr that + is also case insensitive. The response is small so no need to get too + complicated on the string search. 
+*/ +static char *stristr(char *haystack, char *needle) { + + while (*haystack) { + char *h = haystack; + char *n = needle; + + while (toupper(*h) == toupper(*n)) { + h++, n++; + if (!*h || !*n) break; } - } else { - kputs("", &ad->canonical_query_string); - } - if (make_authorisation(ad, "GET", content_hash, &authorisation)) { - return -1; + if (!*n) break; + + haystack++; } - ksprintf(&content, "x-amz-content-sha256: %s", content_hash); - date_html = strdup(ad->date_html.s); + if (!*haystack) return NULL; - if (ad->token.l > 0) { - kputs("X-Amz-Security-Token: ", &token_hdr); - kputs(ad->token.s, &token_hdr); - } + return haystack; +} - if (content.l == 0 || date_html == NULL) { - ksfree(&authorisation); - ksfree(&content); - ksfree(&token_hdr); - free(date_html); - return -1; + +static int get_entry(char *in, char *start_tag, char *end_tag, kstring_t *out) { + char *start; + char *end; + + if (!in) { + return EOF; } - *hdrs = &ad->headers[0]; - idx = 0; - ad->headers[idx++] = ks_release(&authorisation); - ad->headers[idx++] = date_html; - ad->headers[idx++] = ks_release(&content); - if (token_hdr.s) - ad->headers[idx++] = ks_release(&token_hdr); - ad->headers[idx++] = NULL; + start = stristr(in, start_tag); + if (!start) return EOF; - return 0; + start += strlen(start_tag); + end = stristr(start, end_tag); + + if (!end) return EOF; + + return kputsn(start, end - start, out); } -static int handle_400_response(hFILE *fp, s3_auth_data *ad) { - // v4 signatures in virtual hosted mode return 400 Bad Request if the - // wrong region is used to make the signature. The response is an xml - // document which includes the name of the correct region. This can - // be extracted and used to generate a corrected signature. - // As the xml is fairly simple, go with something "good enough" instead - // of trying to parse it properly. 
- char buffer[1024], *region, *reg_end; - ssize_t bytes; +static int report_s3_error(kstring_t *body, long resp_code) { + kstring_t entry = KS_INITIALIZE; - bytes = hread(fp, buffer, sizeof(buffer) - 1); - if (bytes < 0) { - return -1; - } - buffer[bytes] = '\0'; - region = strstr(buffer, ""); - if (region == NULL) { - return -1; - } - region += 8; - while (isspace((unsigned char) *region)) ++region; - reg_end = strchr(region, '<'); - if (reg_end == NULL || strncmp(reg_end + 1, "/Region>", 8) != 0) { + if (get_entry(body->s, "", "", &entry) == EOF) { return -1; } - while (reg_end > region && isspace((unsigned char) reg_end[-1])) --reg_end; - ad->region.l = 0; - kputsn(region, reg_end - region, &ad->region); - if (ad->region.l == 0) { + + fprintf(stderr, "hfile_s3: S3 error %ld: %s\n", resp_code, entry.s); + + ks_clear(&entry); + + if (get_entry(body->s, "", "", &entry) == EOF) { return -1; } - return 0; -} + if (entry.l) + fprintf(stderr, "%s\n", entry.s); -static int set_region(void *adv, kstring_t *region) { - s3_auth_data *ad = (s3_auth_data *) adv; + ks_free(&entry); - ad->region.l = 0; - return kputsn(region->s, region->l, &ad->region) < 0; + return 0; } + static int http_status_errno(int status) { if (status >= 500) @@ -1287,156 +1258,1213 @@ static int http_status_errno(int status) case 410: return ENOENT; default: return EINVAL; } + else if (status >= 300) + return EIO; else return 0; } -static hFILE *s3_open_v4(const char *s3url, const char *mode, va_list *argsp) { - kstring_t url = { 0, 0, NULL }; - s3_auth_data *ad = setup_auth_data(s3url, mode, 4, &url); - hFILE *fp = NULL; +static void initialise_local(hFILE_s3 *fp) { + ks_initialize(&fp->buffer); + ks_initialize(&fp->url); + ks_initialize(&fp->upload_id); // write only + ks_initialize(&fp->completion_message); // write only +} - if (ad == NULL) { - return NULL; + +static void cleanup_local(hFILE_s3 *fp) { + ks_free(&fp->buffer); + ks_free(&fp->url); + ks_free(&fp->upload_id); + ks_free(&fp->completion_message); + curl_easy_cleanup(fp->curl); + free_authorisation_values(fp); +} + + +static void cleanup(hFILE_s3 *fp) { + // free up authorisation data + free_auth_data(fp->au); + cleanup_local(fp); +} + +static size_t response_callback(void *contents, size_t size, size_t nmemb, void *userp) { + size_t realsize = size * nmemb; + kstring_t *resp = (kstring_t *)userp; + + if (kputsn((const char *)contents, realsize, resp) == EOF) { + return 0; } - if (ad->mode == 'r') { - long http_response = 0; - - fp = hopen(url.s, mode, "va_list", argsp, - "httphdr_callback", v4_auth_header_callback, - "httphdr_callback_data", ad, - "redirect_callback", redirect_endpoint_callback, - "redirect_callback_data", ad, - "http_response_ptr", &http_response, - "fail_on_error", 0, - NULL); - - if (fp == NULL) goto error; - - if (http_response == 307) { - // Follow additional redirect. 
- ad->refcount = 1; - hclose_abruptly(fp); - - url.l = 0; - ksprintf(&url, "https://%s%s", ad->host.s, ad->bucket); - - fp = hopen(url.s, mode, "va_list", argsp, - "httphdr_callback", v4_auth_header_callback, - "httphdr_callback_data", ad, - "redirect_callback", redirect_endpoint_callback, - "redirect_callback_data", ad, - "http_response_ptr", &http_response, - "fail_on_error", 0, - NULL); - } + return realsize; +} - if (http_response == 400) { - ad->refcount = 1; - if (handle_400_response(fp, ad) != 0) { - goto error; - } - hclose_abruptly(fp); - fp = hopen(url.s, mode, "va_list", argsp, - "httphdr_callback", v4_auth_header_callback, - "httphdr_callback_data", ad, - "redirect_callback", redirect_endpoint_callback, - "redirect_callback_data", ad, - NULL); - } else if (http_response > 400) { - ad->refcount = 1; - errno = http_status_errno(http_response); - goto error; - } - if (fp == NULL) goto error; +static int add_header(struct curl_slist **head, char *value) { + int err = 0; + struct curl_slist *tmp; + + if ((tmp = curl_slist_append(*head, value)) == NULL) { + err = 1; } else { - kstring_t final_url = KS_INITIALIZE; + *head = tmp; + } - // add the scheme marker - ksprintf(&final_url, "s3w+%s", url.s); + return err; +} - if(final_url.l == 0) goto error; - fp = hopen(final_url.s, mode, "va_list", argsp, - "s3_auth_callback", write_authorisation_callback, - "s3_auth_callback_data", ad, - "redirect_callback", redirect_endpoint_callback, - "set_region_callback", set_region, - NULL); - free(final_url.s); +static struct curl_slist *set_html_headers(hFILE_s3 *fp, kstring_t *auth, kstring_t *date, + kstring_t *content, kstring_t *token, kstring_t *range) { + struct curl_slist *headers = NULL; + int error = 0; - if (fp == NULL) goto error; - } + if (auth->l) + if ((error = add_header(&headers, auth->s))) + goto error; - free(url.s); + if ((error = add_header(&headers, date->s))) + goto error; - return fp; + if (content->l) + if ((error = add_header(&headers, content->s))) + goto error; - error: + if (range) + if ((error = add_header(&headers, range->s))) + goto error; - if (fp) hclose_abruptly(fp); - free(url.s); - free_auth_data(ad); + if (token->l) + if ((error = add_header(&headers, token->s))) + goto error; - return NULL; + curl_easy_setopt(fp->curl, CURLOPT_HTTPHEADER, headers); + +error: + + if (error) { + curl_slist_free_all(headers); + headers = NULL; + } + + return headers; } -static hFILE *s3_open(const char *url, const char *mode) -{ - hFILE *fp; +/* - kstring_t mode_colon = { 0, 0, NULL }; - kputs(mode, &mode_colon); - kputc(':', &mode_colon); +S3 Multipart Upload +------------------- - if (getenv("HTS_S3_V2") == NULL) { // Force the v2 signature code - fp = s3_open_v4(url, mode_colon.s, NULL); - } else { - fp = s3_rewrite(url, mode_colon.s, NULL); - } +There are several steps in the Mulitipart upload. - free(mode_colon.s); - return fp; -} +1) Initiate Upload +------------------ -static hFILE *s3_vopen(const char *url, const char *mode_colon, va_list args0) -{ - hFILE *fp; - // Need to use va_copy() as we can only take the address of an actual - // va_list object, not that of a parameter whose type may have decayed. - va_list args; - va_copy(args, args0); +Initiate the upload and get an upload ID. This ID is used in all other steps. 
- if (getenv("HTS_S3_V2") == NULL) { // Force the v2 signature code - fp = s3_open_v4(url, mode_colon, &args); - } else { - fp = s3_rewrite(url, mode_colon, &args); - } - va_end(args); - return fp; -} +2) Upload Part +-------------- -int PLUGIN_GLOBAL(hfile_plugin_init,_s3)(struct hFILE_plugin *self) -{ - static const struct hFILE_scheme_handler handler = - { s3_open, hfile_always_remote, "Amazon S3", 2000 + 50, s3_vopen - }; +Upload a part of the data. 5Mb minimum part size (except for the last part). +Each part is numbered and a successful upload returns an Etag header value that +needs to used for the completion step. -#ifdef ENABLE_PLUGINS - // Embed version string for examination via strings(1) or what(1) - static const char id[] = "@(#)hfile_s3 plugin (htslib)\t" HTS_VERSION_TEXT; - if (hts_verbose >= 9) - fprintf(stderr, "[M::hfile_s3.init] version %s\n", strchr(id, '\t')+1); -#endif +Step repeated till all data is uploaded. - self->name = "Amazon S3"; - hfile_add_scheme_handler("s3", &handler); - hfile_add_scheme_handler("s3+http", &handler); - hfile_add_scheme_handler("s3+https", &handler); - return 0; + +3) Completion +------------- + +Complete the upload by sending all the part numbers along with their associated +Etag values. + + +Optional - Abort +---------------- + +If something goes wrong this instructs the server to delete all the partial +uploads and abandon the upload process. +*/ + +/* + This is the writing code. +*/ + +#define MINIMUM_S3_WRITE_SIZE 5242880 + +// Lets the part memory size grow to about 1Gb giving a 2.5Tb max file size. +// Max. parts allowed by AWS is 10000, so use ceil(10000.0/9.0) +#define EXPAND_ON 1112 + + + +/* + The partially uploaded file will hang around unless the delete command is sent. +*/ +static int abort_upload(hFILE_s3 *fp) { + kstring_t url = KS_INITIALIZE; + kstring_t canonical_query_string = KS_INITIALIZE; + int ret = -1; + struct curl_slist *headers = NULL; + char http_request[] = "DELETE"; + CURLcode err; + + clear_authorisation_values(fp); + + if (ksprintf(&canonical_query_string, "uploadId=%s", fp->upload_id.s) < 0) { + goto out; + } + + if (v4_authorisation(fp, http_request, NULL, canonical_query_string.s, 0) != 0) { + goto out; + } + + if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { + goto out; + } + + if (ksprintf(&fp->content, "x-amz-content-sha256: %s", fp->content_hash.s) < 0) { + goto out; + } + + curl_easy_reset(fp->curl); + + err = curl_easy_setopt(fp->curl, CURLOPT_CUSTOMREQUEST, http_request); + err |= curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); + + if (err != CURLE_OK) + goto out; + + headers = set_html_headers(fp, &fp->authorisation, &fp->date, &fp->content, &fp->token, NULL); + + if (!headers) + goto out; + + fp->ret = curl_easy_perform(fp->curl); + + if (fp->ret == CURLE_OK) { + ret = 0; + } + + out: + ks_free(&url); + ks_free(&canonical_query_string); + curl_slist_free_all(headers); + + fp->aborted = 1; + cleanup(fp); + + return ret; +} + + +static int complete_upload(hFILE_s3 *fp, kstring_t *resp) { + kstring_t url = KS_INITIALIZE; + kstring_t canonical_query_string = KS_INITIALIZE; + int ret = -1; + struct curl_slist *headers = NULL; + char http_request[] = "POST"; + CURLcode err; + + clear_authorisation_values(fp); + + if (ksprintf(&canonical_query_string, "uploadId=%s", fp->upload_id.s) < 0) { + return -1; + } + + // finish off the completion 
reply + if (kputs("\n", &fp->completion_message) < 0) { + goto out; + } + + if (v4_authorisation(fp, http_request, &fp->completion_message, canonical_query_string.s, 0) != 0) { + goto out; + } + + if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { + goto out; + } + + if (ksprintf(&fp->content, "x-amz-content-sha256: %s", fp->content_hash.s) < 0) { + goto out; + } + + curl_easy_reset(fp->curl); + + err = curl_easy_setopt(fp->curl, CURLOPT_POST, 1L); + err |= curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDS, fp->completion_message.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDSIZE, (long) fp->completion_message.l); + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEFUNCTION, response_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEDATA, (void *)resp); + err |= curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); + + if (err != CURLE_OK) + goto out; + + headers = set_html_headers(fp, &fp->authorisation, &fp->date, &fp->content, &fp->token, NULL); + + if (!headers) + goto out; + + fp->ret = curl_easy_perform(fp->curl); + + if (fp->ret == CURLE_OK) { + ret = 0; + } + + out: + ks_free(&url); + ks_free(&canonical_query_string); + curl_slist_free_all(headers); + + return ret; } + + +static size_t upload_callback(void *ptr, size_t size, size_t nmemb, void *stream) { + size_t realsize = size * nmemb; + hFILE_s3 *fp = (hFILE_s3 *)stream; + size_t read_length; + + if (realsize > (fp->buffer.l - fp->index)) { + read_length = fp->buffer.l - fp->index; + } else { + read_length = realsize; + } + + memcpy(ptr, fp->buffer.s + fp->index, read_length); + fp->index += read_length; + + return read_length; +} + + +static int upload_part(hFILE_s3 *fp, kstring_t *resp) { + kstring_t url = KS_INITIALIZE; + kstring_t canonical_query_string = KS_INITIALIZE; + int ret = -1; + struct curl_slist *headers = NULL; + char http_request[] = "PUT"; + CURLcode err; + + clear_authorisation_values(fp); + + if (ksprintf(&canonical_query_string, "partNumber=%d&uploadId=%s", fp->part_no, fp->upload_id.s) < 0) { + return -1; + } + + if (v4_authorisation(fp, http_request, &fp->buffer, canonical_query_string.s, 0) != 0) { + goto out; + } + + if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { + goto out; + } + + fp->index = 0; + if (ksprintf(&fp->content, "x-amz-content-sha256: %s", fp->content_hash.s) < 0) { + goto out; + } + + curl_easy_reset(fp->curl); + + err = curl_easy_setopt(fp->curl, CURLOPT_UPLOAD, 1L); + err |= curl_easy_setopt(fp->curl, CURLOPT_READFUNCTION, upload_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_READDATA, fp); + err |= curl_easy_setopt(fp->curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)fp->buffer.l); + err |= curl_easy_setopt(fp->curl, CURLOPT_HEADERFUNCTION, response_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_HEADERDATA, (void *)resp); + err |= curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); + + if (err != CURLE_OK) + goto out; + + headers = set_html_headers(fp, &fp->authorisation, &fp->date, &fp->content, &fp->token, NULL); + + if (!headers) + goto out; + + fp->ret = curl_easy_perform(fp->curl); + + if (fp->ret == CURLE_OK) { + ret = 0; + } + + out: + ks_free(&url); + ks_free(&canonical_query_string); + curl_slist_free_all(headers); + + return ret; +} + + 
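For reference, the bodies that upload_part(), complete_upload() and get_upload_id() exchange follow the standard S3 multipart-upload XML: initiating the upload returns an UploadId element, every successful part PUT returns an ETag response header, and the final POST sends a CompleteMultipartUpload document listing each PartNumber/ETag pair alongside the uploadId query string. The standalone sketch below is an illustration only, not part of the patch: it assumes that standard message format, the part count and ETag strings are invented example data, and it needs to be linked against libhts for the kstring functions. It shows the shape of the body that fp->completion_message accumulates.

/* Illustration: build a CompleteMultipartUpload body from part numbers and
   ETags, mirroring what fp->completion_message collects part by part. */
#include <stdio.h>
#include "htslib/kstring.h"

int main(void) {
    /* ETag values as parsed out of each part-upload response header
       (quotes stripped) -- invented data for illustration */
    const char *etags[] = { "0123456789abcdef0123456789abcdef",
                            "fedcba9876543210fedcba9876543210" };
    int nparts = 2, i;
    kstring_t msg = KS_INITIALIZE;

    kputs("<CompleteMultipartUpload>\n", &msg);
    for (i = 0; i < nparts; i++) {
        /* one <Part> entry per uploaded chunk, in ascending part order */
        ksprintf(&msg, "\t<Part>\n\t\t<PartNumber>%d</PartNumber>\n"
                       "\t\t<ETag>%s</ETag>\n\t</Part>\n", i + 1, etags[i]);
    }
    kputs("</CompleteMultipartUpload>\n", &msg);

    if (msg.s)
        fputs(msg.s, stdout);  /* body POSTed with the uploadId=... query string */

    ks_free(&msg);
    return 0;
}

On success the server answers with a CompleteMultipartUploadResult document, which is what s3_write_close() checks for before reporting the upload as complete.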
+static ssize_t s3_write(hFILE *fpv, const void *bufferv, size_t nbytes) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + const char *buffer = (const char *)bufferv; + CURLcode cret; + + if (kputsn(buffer, nbytes, &fp->buffer) == EOF) { + return -1; + } + + if (fp->buffer.l > fp->part_size) { + // time to write out our data + kstring_t response = {0, 0, NULL}; + int ret; + + ret = upload_part(fp, &response); + + if (!ret) { + long response_code; + kstring_t etag = {0, 0, NULL}; + + cret = curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); + + if (cret != CURLE_OK || response_code > 200) { + errno = http_status_errno(response_code); + ret = -1; + } else { + if (get_entry(response.s, "Etag: \"", "\"", &etag) == EOF) { + fprintf(stderr, "hfile_s3: Failed to read Etag\n"); + ret = -1; + } else { + ksprintf(&fp->completion_message, "\t\n\t\t%d\n\t\t%s\n\t\n", + fp->part_no, etag.s); + + ks_free(&etag); + } + } + } + + ks_free(&response); + + if (ret) { + abort_upload(fp); + return -1; + } + + fp->part_no++; + fp->buffer.l = 0; + + if (fp->expand && (fp->part_no % EXPAND_ON == 0)) { + fp->part_size *= 2; + } + } + + return nbytes; +} + + +static int s3_write_close(hFILE *fpv) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + kstring_t response = {0, 0, NULL}; + int ret = 0; + CURLcode cret; + + if (!fp->aborted) { + + if (fp->buffer.l) { + // write the last part + + ret = upload_part(fp, &response); + + if (!ret) { + long response_code; + kstring_t etag = {0, 0, NULL}; + + cret = curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); + + if (cret != CURLE_OK || response_code > 200) { + errno = http_status_errno(response_code); + ret = -1; + } else { + if (get_entry(response.s, "ETag: \"", "\"", &etag) == EOF) { + ret = -1; + } else { + ksprintf(&fp->completion_message, "\t\n\t\t%d\n\t\t%s\n\t\n", + fp->part_no, etag.s); + + ks_free(&etag); + } + } + } + + ks_free(&response); + + if (ret) { + abort_upload(fp); + return -1; + } + + fp->part_no++; + } + + if (fp->part_no > 1) { + ret = complete_upload(fp, &response); + + if (!ret) { + if (strstr(response.s, "CompleteMultipartUploadResult") == NULL) { + ret = -1; + } + } + } else { + ret = -1; + } + + if (ret) { + abort_upload(fp); + } else { + cleanup(fp); + } + } + + ks_free(&response); + + return ret; +} + + +static int handle_bad_request(hFILE_s3 *fp, kstring_t *resp) { + kstring_t region = {0, 0, NULL}; + int ret = -1; + + if (get_entry(resp->s, "", "", ®ion) == EOF) { + return -1; + } + + ret = set_region(fp->au, ®ion); + + ks_free(®ion); + + return ret; +} + +static int initialise_upload(hFILE_s3 *fp, kstring_t *head, kstring_t *resp, int user_query) { + kstring_t url = KS_INITIALIZE; + int ret = -1; + struct curl_slist *headers = NULL; + char http_request[] = "POST"; + char delimiter = '?'; + CURLcode err; + + clear_authorisation_values(fp); + + if (user_query) { + delimiter = '&'; + } + + if (v4_authorisation(fp, http_request, NULL, "uploads=", user_query) != 0) { + goto out; + } + + if (ksprintf(&url, "%s%cuploads", fp->url.s, delimiter) < 0) { + goto out; + } + + if (ksprintf(&fp->content, "x-amz-content-sha256: %s", fp->content_hash.s) < 0) { + goto out; + } + + err = curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_POST, 1L); + err |= curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDS, ""); // send no data + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEFUNCTION, response_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEDATA, (void *)resp); + err |= curl_easy_setopt(fp->curl, 
CURLOPT_HEADERFUNCTION, response_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_HEADERDATA, (void *)head); + err |= curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); + + if (err != CURLE_OK) + goto out; + + headers = set_html_headers(fp, &fp->authorisation, &fp->date, &fp->content, &fp->token, NULL); + + if (!headers) + goto out; + + fp->ret = curl_easy_perform(fp->curl); + + if (fp->ret == CURLE_OK) { + ret = 0; + } + + out: + curl_slist_free_all(headers); + ks_free(&url); + + return ret; +} + + +static int get_upload_id(hFILE_s3 *fp, kstring_t *resp) { + int ret = 0; + + if (get_entry(resp->s, "", "", &fp->upload_id) == EOF) { + ret = -1; + } + + return ret; +} + + +/* + Now for the reading code +*/ + +#define READ_PART_SIZE 1048576 + +static size_t recv_callback(char *ptr, size_t size, size_t nmemb, void *fpv) { + hFILE_s3 *fp = (hFILE_s3 *) fpv; + size_t n = size * nmemb; + + if (n) { + if (kputsn(ptr, n, &fp->buffer) == EOF) { + fprintf(stderr, "hfile_s3: error: unable to allocate memory to read data.\n"); + return 0; + } + } + + return n; +} + + +static int s3_read_close(hFILE *fpv) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + + cleanup(fp); + + return 0; +} + + +static int get_part(hFILE_s3 *fp, kstring_t *resp) { + struct curl_slist *headers = NULL; + int ret = -1; + char http_request[] = "GET"; + char canonical_query_string = 0; + CURLcode err; + + ks_clear(&fp->buffer); // reset storage buffer + clear_authorisation_values(fp); + + if (fp->au->is_v4) { + if (v4_authorisation(fp, http_request, NULL, &canonical_query_string, 0) != 0) { + goto out; + } + + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: get_part: v4 auth done\n"); + + if (ksprintf(&fp->content, "x-amz-content-sha256: %s", fp->content_hash.s) < 0) { + goto out; + } + } else { + if (v2_authorisation(fp, http_request) != 0) { + goto out; + } + + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: get_part v2 auth done\n"); + } + + if (ksprintf(&fp->range, "Range: bytes=%zu-%zu", fp->last_read, fp->last_read + fp->part_size - 1) < 0) { + goto out; + } + + if (hts_verbose >= HTS_LOG_INFO) { + fprintf(stderr, "hfile_s3: get_part: range set %s\n", fp->range.s); + fprintf(stderr, "hfile_s3: url %s\n", fp->url.s); + } + + curl_easy_reset(fp->curl); + + err = curl_easy_setopt(fp->curl, CURLOPT_URL, fp->url.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEFUNCTION, recv_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_WRITEDATA, (void *)fp); + err |= curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); + err |= curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); + + if (resp) { + err |= curl_easy_setopt(fp->curl, CURLOPT_HEADERFUNCTION, response_callback); + err |= curl_easy_setopt(fp->curl, CURLOPT_HEADERDATA, (void *)resp); + } + + if (err != CURLE_OK) + goto out; + + headers = set_html_headers(fp, &fp->authorisation, &fp->date, &fp->content, &fp->token, &fp->range); + + if (!headers) + goto out; + + fp->ret = curl_easy_perform(fp->curl); + + if (fp->ret == CURLE_OK) { + ret = 0; + } + +out: + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: get_part: ret %d\n", ret); + curl_slist_free_all(headers); + + return ret; +} + + +static ssize_t s3_read(hFILE *fpv, void *bufferv, size_t nbytes) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + char *buffer = (char *)bufferv; + size_t read = 0; + + /* Transfer data from the fp->buffer to the calling buffer. 
+ If there is no data left in the fp->buffer, grab another chunk of + data from s3. + */ + while (fp->keep_going && read < nbytes) { + + if (fp->buffer.l && fp->last_read_buffer < fp->buffer.l) { + // copy data across + size_t to_copy; + size_t remaining = fp->buffer.l - fp->last_read_buffer; + size_t bytes_left = nbytes - read; + + if (hts_verbose > HTS_LOG_INFO) fprintf(stderr, "hfile_s3: read - remaining %zu read %zu bytes_left %zu, nbytes %zu\n", remaining, read, bytes_left, nbytes); + + if (bytes_left < remaining) { + to_copy = bytes_left; + } else { + to_copy = remaining; + } + + memcpy(buffer + read, fp->buffer.s + fp->last_read_buffer, to_copy); + read += to_copy; + fp->last_read_buffer += to_copy; + + if ((fp->buffer.l < fp->part_size) && (fp->last_read_buffer == fp->buffer.l)) { + fp->keep_going = 0; + } + } else { + int ret; + + ret = get_part(fp, NULL); + + if (!ret) { + long response_code; + CURLcode cret = curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); + + if (cret != CURLE_OK || response_code > 300) { + errno = http_status_errno(response_code); + ret = -1; + } + } + + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: read - read error %d\n", ret); + + if (ret < 0) + return ret; + + if (fp->buffer.l == 0) { + fp->keep_going = 0; + break; + } + + fp->last_read_buffer = 0; + fp->last_read = fp->last_read + fp->buffer.l; + } + } + + return read; +} + + +static off_t s3_seek(hFILE *fpv, off_t offset, int whence) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + off_t origin; + + if (fp->write) { + // lets not try and seek while writing + errno = ESPIPE; + return -1; + } + + // I am not sure we handle any seek other than one from the beginning + switch (whence) { + case SEEK_SET: + origin = 0; + break; + case SEEK_CUR: + // hseek() should convert this to SEEK_SET + errno = ENOSYS; + return -1; + case SEEK_END: + if (fp->file_size < 0) { + errno = ESPIPE; + return -1; + } + + origin = fp->file_size; + break; + default: + errno = EINVAL; + return -1; + } + + // Check 0 <= origin+offset < fp->file_size carefully, avoiding overflow + if ((offset < 0)? origin + offset < 0 + : (fp->file_size >= 0 && offset > fp->file_size - origin)) { + errno = EINVAL; + return -1; + } + + fp->keep_going = 1; + + size_t pos = origin + offset; // origin is really only useful if we can make the other modes work + + if (pos <= fp->last_read && pos > (fp->last_read - fp->buffer.l)) { + // within the current local buffer + fp->last_read_buffer = pos - (fp->last_read - fp->buffer.l); + } else { + fp->last_read = pos; + ks_clear(&fp->buffer); // resetting fp->buffer triggers a new remote read + } + + return fp->last_read; +} + + +/* + Unlike upload, download does not really need an initialisation. Here we use it to + get the size of the wanted files and as a test for redirects. +*/ +static int initialise_download(hFILE_s3 *fp, kstring_t *resp) { + + fp->last_read = 0; + ks_clear(resp); + + return get_part(fp, resp); +} + + +static int s3_close(hFILE *fpv) { + hFILE_s3 *fp = (hFILE_s3 *)fpv; + int ret; + + if (!fp->write) { + ret = s3_read_close(fpv); + } else { + ret = s3_write_close(fpv); + } + + return ret; +} + + +static const struct hFILE_backend s3_backend = { + s3_read, s3_write, s3_seek, NULL, s3_close +}; + +/* Read and write open here, need to be after the s3_backend declaration. 
*/ +static hFILE *s3_write_open(const char *url, s3_auth_data *auth) { + hFILE_s3 *fp; + kstring_t response = {0, 0, NULL}; + kstring_t header = {0, 0, NULL}; + int ret, has_user_query = 0; + char *query_start; + const char *env; + CURLcode cret; + long response_code; + + + fp = (hFILE_s3 *)hfile_init(sizeof(hFILE_s3), "w", 0); + + if (fp == NULL) { + return NULL; + } + + if ((fp->curl = curl_easy_init()) == NULL) { + errno = ENOMEM; + goto error; + } + + fp->au = auth; + + initialise_local(fp); + initialise_authorisation_values(fp); + fp->aborted = 0; + fp->part_size = MINIMUM_S3_WRITE_SIZE; + fp->expand = 1; + fp->write = 1; + + if ((env = getenv("HTS_S3_PART_SIZE")) != NULL) { + int part_size = atoi(env) * 1024 * 1024; + + if (part_size > fp->part_size) + fp->part_size = part_size; + + fp->expand = 0; + } + + if (hts_verbose >= 8) { + fp->verbose = 1L; + } else { + fp->verbose = 0L; + } + + kputs(url, &fp->url); + + if ((query_start = strchr(fp->url.s, '?'))) { + has_user_query = 1;; + } + + ret = initialise_upload(fp, &header, &response, has_user_query); + cret = curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); + + if (ret == 0) { + if (cret == CURLE_OK) { + if (response_code == S3_MOVED_PERMANENTLY) { + if (redirect_endpoint(fp, &header) == 0) { + ks_clear(&response); + ks_clear(&header); + + ret = initialise_upload(fp, &header, &response, has_user_query); + } + } else if (response_code == S3_BAD_REQUEST) { + if (handle_bad_request(fp, &response) == 0) { + ks_clear(&response); + ks_clear(&header); + + ret = initialise_upload(fp, &header, &response, has_user_query); + } + } + } else { + // unable to get a response code from curl + ret = -1; + } + } + + if (response_code >= 300) { + // something went wrong with the initialisation + + if (cret == CURLE_OK) { + if (hts_verbose >= HTS_LOG_INFO) { + if (report_s3_error(&response, response_code)) { + fprintf(stderr, "hfile_s3: warning, unable to report full S3 error status.\n"); + } + } + + errno = http_status_errno(response_code); + } + + ret = -1; + } + + if (ret) goto error; + + if (get_upload_id(fp, &response)) goto error; + + // start the completion message (a formatted list of parts) + if (kputs("\n", &fp->completion_message) == EOF) { + goto error; + } + + fp->part_no = 1; + + // user query string no longer a useful part of the URL + if (query_start) + *query_start = '\0'; + + fp->base.backend = &s3_backend; + ks_free(&response); + ks_free(&header); + + return &fp->base; + +error: + ks_free(&response); + ks_free(&header); + cleanup_local(fp); + free_authorisation_values(fp); + hfile_destroy((hFILE *)fp); + return NULL; +} + + +static hFILE *s3_read_open(const char *url, s3_auth_data *auth) { + hFILE_s3 *fp; + const char *env; + kstring_t response = {0, 0, NULL}; + kstring_t file_range = {0, 0, NULL}; + int ret; + CURLcode cret; + long response_code = 0; + + fp = (hFILE_s3 *)hfile_init(sizeof(hFILE_s3), "r", 0); + + if (fp == NULL) { + return NULL; + } + + if ((fp->curl = curl_easy_init()) == NULL) { + errno = ENOMEM; + goto error; + } + + fp->au = auth; + + initialise_local(fp); + initialise_authorisation_values(fp); + + fp->last_read = 0; // ranges start at 0 + fp->write = 0; + + if ((env = getenv("HTS_S3_READ_PART_SIZE")) != NULL) { + fp->part_size = atoi(env) * 1024 * 1024; + } else { + fp->part_size = READ_PART_SIZE; + } + + if (hts_verbose >= 8) { + fp->verbose = 1L; + } else { + fp->verbose = 0L; + } + + kputs(url, &fp->url); + + ret = initialise_download(fp, &response); + cret = curl_easy_getinfo(fp->curl, 
CURLINFO_RESPONSE_CODE, &response_code); + + if (ret == 0) { + if (cret == CURLE_OK) { + if (response_code == S3_MOVED_PERMANENTLY) { + ks_clear(&response); + + if (redirect_endpoint(fp, &response) == 0) { + ret = initialise_download(fp, &response); + } + } else if (response_code == S3_BAD_REQUEST) { + ks_clear(&response); + + if (handle_bad_request(fp, &fp->buffer) == 0) { + ret = initialise_download(fp, &response); + } + } + + // reget the response code (may not have changed) + cret = curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); + } else { + // unable to get a response code from curl + ret = -1; + } + } + + if (response_code >= 300) { + // something went wrong with the initialisation + + if (cret == CURLE_OK) { + if (hts_verbose >= HTS_LOG_INFO) { + if (report_s3_error(&fp->buffer, response_code)) { + fprintf(stderr, "hfile_s3: warning, unable to report full S3 error status.\n"); + } + } + + errno = http_status_errno(response_code); + } + + ret = -1; + } + + if (ret) goto error; + + if (get_entry(response.s, "content-range: bytes ", "\n", &file_range) == EOF) { + fprintf(stderr, "hfile_s3: warning: failed to read file size.\n"); + fp->file_size = -1; + } else { + char *s; + if ((s = strchr(file_range.s, '/'))) { + fp->file_size = strtoll(s + 1, NULL, 10); + } else { + fp->file_size = -1; + } + } + + fp->last_read_buffer = 0; + fp->last_read = fp->last_read + fp->buffer.l; + fp->base.backend = &s3_backend; + fp->keep_going = 1; + + ks_free(&response); + ks_free(&file_range); + return &fp->base; + + + error: + ks_free(&response); + ks_free(&file_range); + cleanup_local(fp); + free_authorisation_values(fp); + hfile_destroy((hFILE *)fp); + return NULL; +} + + +static hFILE *s3_open_v4(const char *s3url, const char *mode, va_list *argsp) { + kstring_t url = { 0, 0, NULL }; + + s3_auth_data *ad = setup_auth_data(s3url, mode, 4, &url); + hFILE *fp = NULL; + + if (ad == NULL) { + return NULL; + } + + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: s3_open_v4 url %s\n", url.s); + + if (*mode == 'r') { + fp = s3_read_open(url.s, ad); + } else { + fp = s3_write_open(url.s, ad); + } + + ks_free(&url); + if (!fp) + free_auth_data(ad); + + return fp; +} + + +static hFILE *s3_open_v2(const char *s3url, const char *mode, va_list *argsp) { + kstring_t url = { 0, 0, NULL }; + + s3_auth_data *ad = setup_auth_data(s3url, mode, 2, &url); + hFILE *fp = NULL; + + if (ad == NULL) { + return NULL; + } + + if (hts_verbose >= HTS_LOG_INFO) fprintf(stderr, "hfile_s3: s3_open_v2 url %s\n", url.s); + + if (*mode == 'r') { + fp = s3_read_open(url.s, ad); + } else { + fprintf(stderr, "hfile_s3: error - signature v2 not handled for writing.\n."); + } + + ks_free(&url); + if (!fp) + free_auth_data(ad); + + return fp; +} + + +static hFILE *hopen_s3(const char *url, const char *mode) +{ + hFILE *fp; + + if (getenv("HTS_S3_V2") == NULL) { // Force the v2 signature code + fp = s3_open_v4(url, mode, NULL); + } else { + fp = s3_open_v2(url, mode, NULL); + } + + return fp; +} + + +static hFILE *vhopen_s3(const char *url, const char *mode, va_list args0) +{ + hFILE *fp; + + // This should handle to vargs case. 
Not sure what vargs we want + // to handle + fp = hopen_s3(url, mode); + + return fp; +} + + +static void s3_exit(void) { + if (curl_share_cleanup(curl.share) == CURLSHE_OK) + curl.share = NULL; + + free(curl.useragent.s); + curl.useragent.l = curl.useragent.m = 0; curl.useragent.s = NULL; + curl_global_cleanup(); +} + + +int PLUGIN_GLOBAL(hfile_plugin_init,_s3)(struct hFILE_plugin *self) { + + static const struct hFILE_scheme_handler handler = + { hopen_s3, hfile_always_remote, "Amazon S3", + 2000 + 50, vhopen_s3 + }; + +#ifdef ENABLE_PLUGINS + // Embed version string for examination via strings(1) or what(1) + static const char id[] = + "@(#)hfile_s3 plugin (htslib)\t" HTS_VERSION_TEXT; + const char *version = strchr(id, '\t') + 1; + + if (hts_verbose >= 9) + fprintf(stderr, "[M::hfile_s3.init] version %s\n", + version); +#else + const char *version = hts_version(); +#endif + + const curl_version_info_data *info; + CURLcode err; + CURLSHcode errsh; + + err = curl_global_init(CURL_GLOBAL_ALL); + + if (err != CURLE_OK) { + // look at putting in an errno here + return -1; + } + + curl.share = curl_share_init(); + + if (curl.share == NULL) { + curl_global_cleanup(); + errno = EIO; + return -1; + } + + errsh = curl_share_setopt(curl.share, CURLSHOPT_LOCKFUNC, share_lock); + errsh |= curl_share_setopt(curl.share, CURLSHOPT_UNLOCKFUNC, share_unlock); + errsh |= curl_share_setopt(curl.share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); + + if (errsh != 0) { + curl_share_cleanup(curl.share); + curl_global_cleanup(); + errno = EIO; + return -1; + } + + info = curl_version_info(CURLVERSION_NOW); + ksprintf(&curl.useragent, "htslib/%s libcurl/%s", version, info->version); + + self->name = "Amazon S3"; + self->destroy = s3_exit; + + hfile_add_scheme_handler("s3", &handler); + hfile_add_scheme_handler("s3+http", &handler); + hfile_add_scheme_handler("s3+https", &handler); + + return 0; +} + diff --git a/hfile_s3_write.c b/hfile_s3_write.c deleted file mode 100644 index a501645ca..000000000 --- a/hfile_s3_write.c +++ /dev/null @@ -1,896 +0,0 @@ -/* - hfile_s3_write.c - Code to handle multipart uploading to S3. - - Copyright (C) 2019 Genome Research Ltd. - - Author: Andrew Whitwham - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE - - -S3 Multipart Upload -------------------- - -There are several steps in the Mulitipart upload. - - -1) Initiate Upload ------------------- - -Initiate the upload and get an upload ID. This ID is used in all other steps. - - -2) Upload Part --------------- - -Upload a part of the data. 
5Mb minimum part size (except for the last part). -Each part is numbered and a successful upload returns an Etag header value that -needs to used for the completion step. - -Step repeated till all data is uploaded. - - -3) Completion -------------- - -Complete the upload by sending all the part numbers along with their associated -Etag values. - - -Optional - Abort ----------------- - -If something goes wrong this instructs the server to delete all the partial -uploads and abandon the upload process. - - -Andrew Whitwham, January 2019 -*/ - -#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h -#include - -#include -#include -#include -#ifdef __MSYS__ -#include -#endif -#include -#include - -#include "hfile_internal.h" -#ifdef ENABLE_PLUGINS -#include "version.h" -#endif -#include "htslib/hts.h" -#include "htslib/kstring.h" -#include "htslib/khash.h" - -#include - -#define MINIMUM_S3_WRITE_SIZE 5242880 -#define S3_MOVED_PERMANENTLY 301 -#define S3_BAD_REQUEST 400 - -// Lets the part memory size grow to about 1Gb giving a 2.5Tb max file size. -// Max. parts allowed by AWS is 10000, so use ceil(10000.0/9.0) -#define EXPAND_ON 1112 - -static struct { - kstring_t useragent; - CURLSH *share; - pthread_mutex_t share_lock; -} curl = { { 0, 0, NULL }, NULL, PTHREAD_MUTEX_INITIALIZER }; - -static void share_lock(CURL *handle, curl_lock_data data, - curl_lock_access access, void *userptr) { - pthread_mutex_lock(&curl.share_lock); -} - -static void share_unlock(CURL *handle, curl_lock_data data, void *userptr) { - pthread_mutex_unlock(&curl.share_lock); -} - -typedef int (*s3_auth_callback) (void *auth_data, char *, kstring_t*, char*, kstring_t*, kstring_t*, kstring_t*, kstring_t*, int); - -typedef int (*set_region_callback) (void *auth_data, kstring_t *region); - -typedef struct { - s3_auth_callback callback; - redirect_callback redirect_callback; - set_region_callback set_region_callback; - void *callback_data; -} s3_authorisation; - -typedef struct { - hFILE base; - CURL *curl; - CURLcode ret; - s3_authorisation *au; - kstring_t buffer; - kstring_t url; - kstring_t upload_id; - kstring_t completion_message; - int part_no; - int aborted; - size_t index; - long verbose; - int part_size; - int expand; -} hFILE_s3_write; - - -static void ksinit(kstring_t *s) { - s->l = 0; - s->m = 0; - s->s = NULL; -} - - -static void ksfree(kstring_t *s) { - free(s->s); - ksinit(s); -} - - -static size_t response_callback(void *contents, size_t size, size_t nmemb, void *userp) { - size_t realsize = size * nmemb; - kstring_t *resp = (kstring_t *)userp; - - if (kputsn((const char *)contents, realsize, resp) == EOF) { - return 0; - } - - return realsize; -} - - -static int get_entry(char *in, char *start_tag, char *end_tag, kstring_t *out) { - char *start; - char *end; - - if (!in) { - return EOF; - } - - start = strstr(in, start_tag); - if (!start) return EOF; - - start += strlen(start_tag); - end = strstr(start, end_tag); - - if (!end) return EOF; - - return kputsn(start, end - start, out); -} - - -static void cleanup_local(hFILE_s3_write *fp) { - ksfree(&fp->buffer); - ksfree(&fp->url); - ksfree(&fp->upload_id); - ksfree(&fp->completion_message); - curl_easy_cleanup(fp->curl); - free(fp->au); - -} - - -static void cleanup(hFILE_s3_write *fp) { - // free up authorisation data - fp->au->callback(fp->au->callback_data, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0); - cleanup_local(fp); -} - - -static struct curl_slist *set_html_headers(hFILE_s3_write *fp, kstring_t *auth, kstring_t *date, 
kstring_t *content, kstring_t *token) { - struct curl_slist *headers = NULL; - - headers = curl_slist_append(headers, "Content-Type:"); // get rid of this - headers = curl_slist_append(headers, "Expect:"); // and this - headers = curl_slist_append(headers, auth->s); - headers = curl_slist_append(headers, date->s); - headers = curl_slist_append(headers, content->s); - - if (token->l) { - headers = curl_slist_append(headers, token->s); - } - - curl_easy_setopt(fp->curl, CURLOPT_HTTPHEADER, headers); - - return headers; -} - - -/* - The partially uploaded file will hang around unless the delete command is sent. -*/ -static int abort_upload(hFILE_s3_write *fp) { - kstring_t content_hash = {0, 0, NULL}; - kstring_t authorisation = {0, 0, NULL}; - kstring_t url = {0, 0, NULL}; - kstring_t content = {0, 0, NULL}; - kstring_t canonical_query_string = {0, 0, NULL}; - kstring_t date = {0, 0, NULL}; - kstring_t token = {0, 0, NULL}; - int ret = -1; - struct curl_slist *headers = NULL; - char http_request[] = "DELETE"; - - if (ksprintf(&canonical_query_string, "uploadId=%s", fp->upload_id.s) < 0) { - goto out; - } - - if (fp->au->callback(fp->au->callback_data, http_request, NULL, - canonical_query_string.s, &content_hash, - &authorisation, &date, &token, 0) != 0) { - goto out; - } - - if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { - goto out; - } - - if (ksprintf(&content, "x-amz-content-sha256: %s", content_hash.s) < 0) { - goto out; - } - - curl_easy_reset(fp->curl); - curl_easy_setopt(fp->curl, CURLOPT_CUSTOMREQUEST, http_request); - curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); - curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); - - curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); - - headers = set_html_headers(fp, &authorisation, &date, &content, &token); - fp->ret = curl_easy_perform(fp->curl); - - if (fp->ret == CURLE_OK) { - ret = 0; - } - - out: - ksfree(&authorisation); - ksfree(&content); - ksfree(&content_hash); - ksfree(&url); - ksfree(&date); - ksfree(&canonical_query_string); - ksfree(&token); - curl_slist_free_all(headers); - - fp->aborted = 1; - cleanup(fp); - - return ret; -} - - -static int complete_upload(hFILE_s3_write *fp, kstring_t *resp) { - kstring_t content_hash = {0, 0, NULL}; - kstring_t authorisation = {0, 0, NULL}; - kstring_t url = {0, 0, NULL}; - kstring_t content = {0, 0, NULL}; - kstring_t canonical_query_string = {0, 0, NULL}; - kstring_t date = {0, 0, NULL}; - kstring_t token = {0, 0, NULL}; - int ret = -1; - struct curl_slist *headers = NULL; - char http_request[] = "POST"; - - if (ksprintf(&canonical_query_string, "uploadId=%s", fp->upload_id.s) < 0) { - return -1; - } - - // finish off the completion reply - if (kputs("\n", &fp->completion_message) < 0) { - goto out; - } - - if (fp->au->callback(fp->au->callback_data, http_request, - &fp->completion_message, canonical_query_string.s, - &content_hash, &authorisation, &date, &token, 0) != 0) { - goto out; - } - - if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { - goto out; - } - - if (ksprintf(&content, "x-amz-content-sha256: %s", content_hash.s) < 0) { - goto out; - } - - curl_easy_reset(fp->curl); - curl_easy_setopt(fp->curl, CURLOPT_POST, 1L); - curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDS, fp->completion_message.s); - curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDSIZE, (long) fp->completion_message.l); - curl_easy_setopt(fp->curl, CURLOPT_WRITEFUNCTION, response_callback); - curl_easy_setopt(fp->curl, CURLOPT_WRITEDATA, (void *)resp); - 
curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); - curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); - - curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); - - headers = set_html_headers(fp, &authorisation, &date, &content, &token); - fp->ret = curl_easy_perform(fp->curl); - - if (fp->ret == CURLE_OK) { - ret = 0; - } - - out: - ksfree(&authorisation); - ksfree(&content); - ksfree(&content_hash); - ksfree(&url); - ksfree(&date); - ksfree(&token); - ksfree(&canonical_query_string); - curl_slist_free_all(headers); - - return ret; -} - - -static size_t upload_callback(void *ptr, size_t size, size_t nmemb, void *stream) { - size_t realsize = size * nmemb; - hFILE_s3_write *fp = (hFILE_s3_write *)stream; - size_t read_length; - - if (realsize > (fp->buffer.l - fp->index)) { - read_length = fp->buffer.l - fp->index; - } else { - read_length = realsize; - } - - memcpy(ptr, fp->buffer.s + fp->index, read_length); - fp->index += read_length; - - return read_length; -} - - -static int upload_part(hFILE_s3_write *fp, kstring_t *resp) { - kstring_t content_hash = {0, 0, NULL}; - kstring_t authorisation = {0, 0, NULL}; - kstring_t url = {0, 0, NULL}; - kstring_t content = {0, 0, NULL}; - kstring_t canonical_query_string = {0, 0, NULL}; - kstring_t date = {0, 0, NULL}; - kstring_t token = {0, 0, NULL}; - int ret = -1; - struct curl_slist *headers = NULL; - char http_request[] = "PUT"; - - if (ksprintf(&canonical_query_string, "partNumber=%d&uploadId=%s", fp->part_no, fp->upload_id.s) < 0) { - return -1; - } - - if (fp->au->callback(fp->au->callback_data, http_request, &fp->buffer, - canonical_query_string.s, &content_hash, - &authorisation, &date, &token, 0) != 0) { - goto out; - } - - if (ksprintf(&url, "%s?%s", fp->url.s, canonical_query_string.s) < 0) { - goto out; - } - - fp->index = 0; - if (ksprintf(&content, "x-amz-content-sha256: %s", content_hash.s) < 0) { - goto out; - } - - curl_easy_reset(fp->curl); - - curl_easy_setopt(fp->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(fp->curl, CURLOPT_READFUNCTION, upload_callback); - curl_easy_setopt(fp->curl, CURLOPT_READDATA, fp); - curl_easy_setopt(fp->curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)fp->buffer.l); - curl_easy_setopt(fp->curl, CURLOPT_HEADERFUNCTION, response_callback); - curl_easy_setopt(fp->curl, CURLOPT_HEADERDATA, (void *)resp); - curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); - curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); - - curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); - - headers = set_html_headers(fp, &authorisation, &date, &content, &token); - fp->ret = curl_easy_perform(fp->curl); - - if (fp->ret == CURLE_OK) { - ret = 0; - } - - out: - ksfree(&authorisation); - ksfree(&content); - ksfree(&content_hash); - ksfree(&url); - ksfree(&date); - ksfree(&token); - ksfree(&canonical_query_string); - curl_slist_free_all(headers); - - return ret; -} - - -static ssize_t s3_write(hFILE *fpv, const void *bufferv, size_t nbytes) { - hFILE_s3_write *fp = (hFILE_s3_write *)fpv; - const char *buffer = (const char *)bufferv; - - if (kputsn(buffer, nbytes, &fp->buffer) == EOF) { - return -1; - } - - if (fp->buffer.l > fp->part_size) { - // time to write out our data - kstring_t response = {0, 0, NULL}; - int ret; - - ret = upload_part(fp, &response); - - if (!ret) { - long response_code; - kstring_t etag = {0, 0, NULL}; - - curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); - - if (response_code > 200) { - ret = -1; - } else { - if (get_entry(response.s, "ETag: \"", "\"", &etag) 
== EOF) { - ret = -1; - } else { - ksprintf(&fp->completion_message, "\t\n\t\t%d\n\t\t%s\n\t\n", - fp->part_no, etag.s); - - ksfree(&etag); - } - } - } - - ksfree(&response); - - if (ret) { - abort_upload(fp); - return -1; - } - - fp->part_no++; - fp->buffer.l = 0; - - if (fp->expand && (fp->part_no % EXPAND_ON == 0)) { - fp->part_size *= 2; - } - } - - return nbytes; -} - - -static int s3_close(hFILE *fpv) { - hFILE_s3_write *fp = (hFILE_s3_write *)fpv; - kstring_t response = {0, 0, NULL}; - int ret = 0; - - if (!fp->aborted) { - - if (fp->buffer.l) { - // write the last part - - ret = upload_part(fp, &response); - - if (!ret) { - long response_code; - kstring_t etag = {0, 0, NULL}; - - curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); - - if (response_code > 200) { - ret = -1; - } else { - if (get_entry(response.s, "ETag: \"", "\"", &etag) == EOF) { - ret = -1; - } else { - ksprintf(&fp->completion_message, "\t\n\t\t%d\n\t\t%s\n\t\n", - fp->part_no, etag.s); - - ksfree(&etag); - } - } - } - - ksfree(&response); - - if (ret) { - abort_upload(fp); - return -1; - } - - fp->part_no++; - } - - if (fp->part_no > 1) { - ret = complete_upload(fp, &response); - - if (!ret) { - if (strstr(response.s, "CompleteMultipartUploadResult") == NULL) { - ret = -1; - } - } - } else { - ret = -1; - } - - if (ret) { - abort_upload(fp); - } else { - cleanup(fp); - } - } - - ksfree(&response); - - return ret; -} - - -static int redirect_endpoint(hFILE_s3_write *fp, kstring_t *head) { - int ret = -1; - - if (fp->au->redirect_callback) { - ret = fp->au->redirect_callback(fp->au->callback_data, 301, head, &fp->url); - } - - return ret; -} - -static int handle_bad_request(hFILE_s3_write *fp, kstring_t *resp) { - kstring_t region = {0, 0, NULL}; - int ret = -1; - - if (fp->au->set_region_callback) { - if (get_entry(resp->s, "", "", ®ion) == EOF) { - return -1; - } - - ret = fp->au->set_region_callback(fp->au->callback_data, ®ion); - - ksfree(®ion); - } - - return ret; -} - -static int initialise_upload(hFILE_s3_write *fp, kstring_t *head, kstring_t *resp, int user_query) { - kstring_t content_hash = {0, 0, NULL}; - kstring_t authorisation = {0, 0, NULL}; - kstring_t url = {0, 0, NULL}; - kstring_t content = {0, 0, NULL}; - kstring_t date = {0, 0, NULL}; - kstring_t token = {0, 0, NULL}; - int ret = -1; - struct curl_slist *headers = NULL; - char http_request[] = "POST"; - char delimiter = '?'; - - if (user_query) { - delimiter = '&'; - } - - if (fp->au->callback(fp->au->callback_data, http_request, NULL, "uploads=", - &content_hash, &authorisation, &date, &token, user_query) != 0) { - goto out; - } - - if (ksprintf(&url, "%s%cuploads", fp->url.s, delimiter) < 0) { - goto out; - } - - if (ksprintf(&content, "x-amz-content-sha256: %s", content_hash.s) < 0) { - goto out; - } - - curl_easy_setopt(fp->curl, CURLOPT_URL, url.s); - curl_easy_setopt(fp->curl, CURLOPT_POST, 1L); - curl_easy_setopt(fp->curl, CURLOPT_POSTFIELDS, ""); // send no data - curl_easy_setopt(fp->curl, CURLOPT_WRITEFUNCTION, response_callback); - curl_easy_setopt(fp->curl, CURLOPT_WRITEDATA, (void *)resp); - curl_easy_setopt(fp->curl, CURLOPT_HEADERFUNCTION, response_callback); - curl_easy_setopt(fp->curl, CURLOPT_HEADERDATA, (void *)head); - curl_easy_setopt(fp->curl, CURLOPT_USERAGENT, curl.useragent.s); - - curl_easy_setopt(fp->curl, CURLOPT_VERBOSE, fp->verbose); - - headers = set_html_headers(fp, &authorisation, &date, &content, &token); - fp->ret = curl_easy_perform(fp->curl); - - if (fp->ret == CURLE_OK) { - ret = 0; - } - - 
out: - ksfree(&authorisation); - ksfree(&content); - ksfree(&content_hash); - ksfree(&url); - ksfree(&date); - ksfree(&token); - curl_slist_free_all(headers); - - return ret; -} - - -static int get_upload_id(hFILE_s3_write *fp, kstring_t *resp) { - int ret = 0; - - ksinit(&fp->upload_id); - - if (get_entry(resp->s, "", "", &fp->upload_id) == EOF) { - ret = -1; - } - - return ret; -} - - -static const struct hFILE_backend s3_write_backend = { - NULL, s3_write, NULL, NULL, s3_close -}; - - -static hFILE *s3_write_open(const char *url, s3_authorisation *auth) { - hFILE_s3_write *fp; - kstring_t response = {0, 0, NULL}; - kstring_t header = {0, 0, NULL}; - int ret, has_user_query = 0; - char *query_start; - const char *env; - - - if (!auth || !auth->callback || !auth->callback_data) { - return NULL; - } - - fp = (hFILE_s3_write *)hfile_init(sizeof(hFILE_s3_write), "w", 0); - - if (fp == NULL) { - return NULL; - } - - if ((fp->curl = curl_easy_init()) == NULL) { - errno = ENOMEM; - goto error; - } - - if ((fp->au = calloc(1, sizeof(s3_authorisation))) == NULL) { - goto error; - } - - memcpy(fp->au, auth, sizeof(s3_authorisation)); - - ksinit(&fp->buffer); - ksinit(&fp->url); - ksinit(&fp->completion_message); - fp->aborted = 0; - - fp->part_size = MINIMUM_S3_WRITE_SIZE; - fp->expand = 1; - - if ((env = getenv("HTS_S3_PART_SIZE")) != NULL) { - int part_size = atoi(env) * 1024 * 1024; - - if (part_size > fp->part_size) - fp->part_size = part_size; - - fp->expand = 0; - } - - if (hts_verbose >= 8) { - fp->verbose = 1L; - } else { - fp->verbose = 0L; - } - - kputs(url + 4, &fp->url); - - if ((query_start = strchr(fp->url.s, '?'))) { - has_user_query = 1;; - } - - ret = initialise_upload(fp, &header, &response, has_user_query); - - if (ret == 0) { - long response_code; - - curl_easy_getinfo(fp->curl, CURLINFO_RESPONSE_CODE, &response_code); - - if (response_code == S3_MOVED_PERMANENTLY) { - if (redirect_endpoint(fp, &header) == 0) { - ksfree(&response); - ksfree(&header); - - ret = initialise_upload(fp, &header, &response, has_user_query); - } - } else if (response_code == S3_BAD_REQUEST) { - if (handle_bad_request(fp, &response) == 0) { - ksfree(&response); - ksfree(&header); - - ret = initialise_upload(fp, &header, &response, has_user_query); - } - } - - ksfree(&header); // no longer needed - } - - if (ret) goto error; - - if (get_upload_id(fp, &response)) goto error; - - // start the completion message (a formatted list of parts) - ksinit(&fp->completion_message); - - if (kputs("\n", &fp->completion_message) == EOF) { - goto error; - } - - fp->part_no = 1; - - // user query string no longer a useful part of the URL - if (query_start) - *query_start = '\0'; - - fp->base.backend = &s3_write_backend; - ksfree(&response); - - return &fp->base; - -error: - ksfree(&response); - cleanup_local(fp); - hfile_destroy((hFILE *)fp); - return NULL; -} - - -static hFILE *hopen_s3_write(const char *url, const char *mode) { - if (hts_verbose >= 1) { - fprintf(stderr, "[E::%s] s3w:// URLs should not be used directly; use s3:// instead.\n", __func__); - } - return NULL; -} - - -static int parse_va_list(s3_authorisation *auth, va_list args) { - const char *argtype; - - while ((argtype = va_arg(args, const char *)) != NULL) { - if (strcmp(argtype, "s3_auth_callback") == 0) { - auth->callback = va_arg(args, s3_auth_callback); - } else if (strcmp(argtype, "s3_auth_callback_data") == 0) { - auth->callback_data = va_arg(args, void *); - } else if (strcmp(argtype, "redirect_callback") == 0) { - auth->redirect_callback = 
va_arg(args, redirect_callback); - } else if (strcmp(argtype, "set_region_callback") == 0) { - auth->set_region_callback = va_arg(args, set_region_callback); - } else if (strcmp(argtype, "va_list") == 0) { - va_list *args2 = va_arg(args, va_list *); - - if (args2) { - if (parse_va_list(auth, *args2) < 0) return -1; - } - } else { - errno = EINVAL; - return -1; - } - } - - return 0; -} - - -static hFILE *vhopen_s3_write(const char *url, const char *mode, va_list args) { - hFILE *fp = NULL; - s3_authorisation auth = {NULL, NULL, NULL}; - - if (parse_va_list(&auth, args) == 0) { - fp = s3_write_open(url, &auth); - } - - return fp; -} - - -static void s3_write_exit(void) { - if (curl_share_cleanup(curl.share) == CURLSHE_OK) - curl.share = NULL; - - free(curl.useragent.s); - curl.useragent.l = curl.useragent.m = 0; curl.useragent.s = NULL; - curl_global_cleanup(); -} - - -int PLUGIN_GLOBAL(hfile_plugin_init,_s3_write)(struct hFILE_plugin *self) { - - static const struct hFILE_scheme_handler handler = - { hopen_s3_write, hfile_always_remote, "S3 Multipart Upload", - 2000 + 50, vhopen_s3_write - }; - -#ifdef ENABLE_PLUGINS - // Embed version string for examination via strings(1) or what(1) - static const char id[] = - "@(#)hfile_s3_write plugin (htslib)\t" HTS_VERSION_TEXT; - const char *version = strchr(id, '\t') + 1; - - if (hts_verbose >= 9) - fprintf(stderr, "[M::hfile_s3_write.init] version %s\n", - version); -#else - const char *version = hts_version(); -#endif - - const curl_version_info_data *info; - CURLcode err; - CURLSHcode errsh; - - err = curl_global_init(CURL_GLOBAL_ALL); - - if (err != CURLE_OK) { - // look at putting in an errno here - return -1; - } - - curl.share = curl_share_init(); - - if (curl.share == NULL) { - curl_global_cleanup(); - errno = EIO; - return -1; - } - - errsh = curl_share_setopt(curl.share, CURLSHOPT_LOCKFUNC, share_lock); - errsh |= curl_share_setopt(curl.share, CURLSHOPT_UNLOCKFUNC, share_unlock); - errsh |= curl_share_setopt(curl.share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); - - if (errsh != 0) { - curl_share_cleanup(curl.share); - curl_global_cleanup(); - errno = EIO; - return -1; - } - - info = curl_version_info(CURLVERSION_NOW); - ksprintf(&curl.useragent, "htslib/%s libcurl/%s", version, info->version); - - self->name = "S3 Multipart Upload"; - self->destroy = s3_write_exit; - - hfile_add_scheme_handler("s3w", &handler); - hfile_add_scheme_handler("s3w+http", &handler); - hfile_add_scheme_handler("s3w+https", &handler); - - return 0; -} diff --git a/htslib.mk b/htslib.mk index 57dffae29..3e131a430 100644 --- a/htslib.mk +++ b/htslib.mk @@ -97,7 +97,6 @@ HTSLIB_ALL = \ $(HTSSRCDIR)/hfile_gcs.c \ $(HTSSRCDIR)/hfile_libcurl.c \ $(HTSSRCDIR)/hfile_s3.c \ - $(HTSSRCDIR)/hfile_s3_write.c \ $(HTSSRCDIR)/hts.c \ $(HTSSRCDIR)/hts_expr.c \ $(HTSSRCDIR)/hts_internal.h \
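
For reference, the multipart flow that the deleted hfile_s3_write.c implemented (initiate the upload, PUT each buffered part while recording its ETag header, then POST the completion manifest listing part numbers and ETags) reduces to a short libcurl sequence. The sketch below covers only the per-part PUT; upload_one_part, part_buf, grab_etag and the pre-signed part_url/signed_hdrs arguments are hypothetical stand-ins for htslib's s3_auth_callback machinery and are not the code now carried by hfile_s3.c.

/* Illustrative sketch only: PUT one buffered part and capture its ETag.
 * part_url is assumed to already carry ?partNumber=N&uploadId=... and
 * signed_hdrs the signed request headers; both come from hypothetical
 * helpers standing in for htslib's signing callback. */
#include <curl/curl.h>
#include <stdio.h>
#include <string.h>

struct part_buf { const char *data; size_t len, off; };

static size_t part_read(char *ptr, size_t size, size_t nmemb, void *arg) {
    struct part_buf *b = arg;
    size_t n = size * nmemb;
    if (n > b->len - b->off) n = b->len - b->off;
    memcpy(ptr, b->data + b->off, n);
    b->off += n;
    return n;
}

static size_t grab_etag(char *line, size_t size, size_t nmemb, void *arg) {
    /* Header lines arrive one per call; keep the raw ETag value (still
     * quoted and CRLF-terminated, which is good enough for a sketch). */
    size_t n = size * nmemb;
    if (n > 6 && strncmp(line, "ETag:", 5) == 0)
        snprintf((char *)arg, 128, "%.*s", (int)(n - 6), line + 6);
    return n;
}

static int upload_one_part(CURL *curl, const char *part_url,
                           struct curl_slist *signed_hdrs,
                           struct part_buf *part, char *etag_out) {
    long code = 0;
    curl_easy_reset(curl);
    curl_easy_setopt(curl, CURLOPT_URL, part_url);
    curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);                 /* issue a PUT */
    curl_easy_setopt(curl, CURLOPT_READFUNCTION, part_read);
    curl_easy_setopt(curl, CURLOPT_READDATA, part);
    curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)part->len);
    curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, grab_etag);  /* record ETag */
    curl_easy_setopt(curl, CURLOPT_HEADERDATA, etag_out);       /* >= 128 bytes */
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, signed_hdrs);
    if (curl_easy_perform(curl) != CURLE_OK) return -1;
    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
    return code == 200 ? 0 : -1;
}

The ETag collected for each part is what the completion POST has to echo back; if any part fails, the upload is aborted with a DELETE on the same uploadId so the partial parts are discarded, as the removed abort_upload() did.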
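The shared-handle setup that survives in hfile_s3.c follows the same pattern both plugins used: a single CURLSH shares the DNS cache across easy handles, serialised by a pthread mutex. A condensed sketch of that pattern follows; make_dns_share is an illustrative name, not an htslib symbol.

/* Minimal sketch, assuming libcurl + pthreads: share the DNS cache across
 * easy handles, with a mutex guarding the shared state. */
#include <curl/curl.h>
#include <pthread.h>

static pthread_mutex_t share_mutex = PTHREAD_MUTEX_INITIALIZER;

static void lock_cb(CURL *h, curl_lock_data d, curl_lock_access a, void *u) {
    (void)h; (void)d; (void)a; (void)u;
    pthread_mutex_lock(&share_mutex);
}

static void unlock_cb(CURL *h, curl_lock_data d, void *u) {
    (void)h; (void)d; (void)u;
    pthread_mutex_unlock(&share_mutex);
}

static CURLSH *make_dns_share(void) {
    CURLSH *share = curl_share_init();
    if (!share) return NULL;
    if (curl_share_setopt(share, CURLSHOPT_LOCKFUNC, lock_cb) != CURLSHE_OK ||
        curl_share_setopt(share, CURLSHOPT_UNLOCKFUNC, unlock_cb) != CURLSHE_OK ||
        curl_share_setopt(share, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS) != CURLSHE_OK) {
        curl_share_cleanup(share);
        return NULL;
    }
    return share;
}

/* Each easy handle would then opt in with:
 *   curl_easy_setopt(handle, CURLOPT_SHARE, share);
 * and the share is released at plugin teardown with curl_share_cleanup(),
 * as the s3_exit() destructor above does. */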