@@ -13,19 +13,19 @@ import {
1313 Tag ,
1414 UploadPartCommand ,
1515} from "@aws-sdk/client-s3" ;
16- import { AbortController } from "@smithy/abort-controller" ;
1716import {
1817 EndpointParameterInstructionsSupplier ,
1918 getEndpointFromInstructions ,
2019 toEndpointV1 ,
2120} from "@smithy/middleware-endpoint" ;
2221import { HttpRequest } from "@smithy/protocol-http" ;
2322import { extendedEncodeURIComponent } from "@smithy/smithy-client" ;
24- import type { AbortController as IAbortController , AbortSignal as IAbortSignal , Endpoint } from "@smithy/types" ;
23+ import type { Endpoint } from "@smithy/types" ;
2524import { EventEmitter } from "events" ;
2625
2726import { byteLength } from "./bytelength" ;
2827import { getChunk } from "./chunker" ;
28+ import { wireSignal } from "./signal" ;
2929import { BodyDataTypes , Options , Progress } from "./types" ;
3030
3131export interface RawDataPart {
@@ -59,8 +59,7 @@ export class Upload extends EventEmitter {
5959 private bytesUploadedSoFar : number ;
6060
6161 // used in the upload.
62- private abortController : IAbortController ;
63- private concurrentUploaders : Promise < void > [ ] = [ ] ;
62+ private abortController = new AbortController ( ) ;
6463 private createMultiPartPromise ?: Promise < CreateMultipartUploadCommandOutput > ;
6564 private abortMultipartUploadCommand : AbortMultipartUploadCommand | null = null ;
6665
@@ -93,7 +92,9 @@ export class Upload extends EventEmitter {
9392 // set progress defaults
9493 this . totalBytes = byteLength ( this . params . Body ) ;
9594 this . bytesUploadedSoFar = 0 ;
96- this . abortController = options . abortController ?? new AbortController ( ) ;
95+
96+ wireSignal ( this . abortController , options . abortSignal ) ;
97+ wireSignal ( this . abortController , options . abortController ?. signal ) ;
9798 }
9899
99100 async abort ( ) : Promise < void > {
@@ -111,7 +112,12 @@ export class Upload extends EventEmitter {
111112 ) ;
112113 }
113114 this . sent = true ;
114- return await Promise . race ( [ this . __doMultipartUpload ( ) , this . __abortTimeout ( this . abortController . signal ) ] ) ;
115+
116+ try {
117+ return await this . __doMultipartUpload ( ) ;
118+ } finally {
119+ this . abortController . abort ( ) ;
120+ }
115121 }
116122
117123 public on ( event : "httpUploadProgress" , listener : ( progress : Progress ) => void ) : this {
@@ -143,7 +149,12 @@ export class Upload extends EventEmitter {
143149 eventEmitter . on ( "xhr.upload.progress" , uploadEventListener ) ;
144150 }
145151
146- const resolved = await Promise . all ( [ this . client . send ( new PutObjectCommand ( params ) ) , clientConfig ?. endpoint ?.( ) ] ) ;
152+ const resolved = await Promise . all ( [
153+ this . client . send ( new PutObjectCommand ( params ) , {
154+ abortSignal : this . abortController . signal ,
155+ } ) ,
156+ clientConfig ?. endpoint ?.( ) ,
157+ ] ) ;
147158 const putResult = resolved [ 0 ] ;
148159 let endpoint : Endpoint | undefined = resolved [ 1 ] ;
149160
@@ -291,7 +302,10 @@ export class Upload extends EventEmitter {
291302 UploadId : this . uploadId ,
292303 Body : dataPart . data ,
293304 PartNumber : dataPart . partNumber ,
294- } )
305+ } ) ,
306+ {
307+ abortSignal : this . abortController . signal ,
308+ }
295309 ) ;
296310
297311 if ( eventEmitter !== null ) {
@@ -333,28 +347,27 @@ export class Upload extends EventEmitter {
333347
334348 private async __doMultipartUpload ( ) : Promise < CompleteMultipartUploadCommandOutput > {
335349 const dataFeeder = getChunk ( this . params . Body , this . partSize ) ;
336- const concurrentUploaderFailures : Error [ ] = [ ] ;
350+ const concurrentUploads : Promise < void > [ ] = [ ] ;
337351
338352 for ( let index = 0 ; index < this . queueSize ; index ++ ) {
339- const currentUpload = this . __doConcurrentUpload ( dataFeeder ) . catch ( ( err ) => {
340- concurrentUploaderFailures . push ( err ) ;
341- } ) ;
342- this . concurrentUploaders . push ( currentUpload ) ;
353+ const currentUpload = this . __doConcurrentUpload ( dataFeeder ) ;
354+ concurrentUploads . push ( currentUpload ) ;
343355 }
344356
345- await Promise . all ( this . concurrentUploaders ) ;
346- if ( concurrentUploaderFailures . length >= 1 ) {
357+ /**
358+ * Previously, each promise in concurrentUploads could potentially throw
359+ * and immediately return control to user code. However, we want to wait for
360+ * all uploaders to finish before calling AbortMultipartUpload to avoid
361+ * stranding uploaded parts.
362+ *
363+ * We throw only the first error to be consistent with prior behavior,
364+ * but may consider combining the errors into a report in the future.
365+ */
366+ const results = await Promise . allSettled ( concurrentUploads ) ;
367+ const firstFailure = results . find ( ( result ) => result . status === "rejected" ) ;
368+ if ( firstFailure ) {
347369 await this . markUploadAsAborted ( ) ;
348- /**
349- * Previously, each promise in concurrentUploaders could potentially throw
350- * and immediately return control to user code. However, we want to wait for
351- * all uploaders to finish before calling AbortMultipartUpload to avoid
352- * stranding uploaded parts.
353- *
354- * We throw only the first error to be consistent with prior behavior,
355- * but may consider combining the errors into a report in the future.
356- */
357- throw concurrentUploaderFailures [ 0 ] ;
370+ throw firstFailure . reason ;
358371 }
359372
360373 if ( this . abortController . signal . aborted ) {
@@ -417,16 +430,6 @@ export class Upload extends EventEmitter {
417430 }
418431 }
419432
420- private async __abortTimeout ( abortSignal : IAbortSignal ) : Promise < never > {
421- return new Promise ( ( resolve , reject ) => {
422- abortSignal . onabort = ( ) => {
423- const abortError = new Error ( "Upload aborted." ) ;
424- abortError . name = "AbortError" ;
425- reject ( abortError ) ;
426- } ;
427- } ) ;
428- }
429-
430433 private __validateInput ( ) : void {
431434 if ( ! this . params ) {
432435 throw new Error ( `InputError: Upload requires params to be passed to upload.` ) ;
0 commit comments