diff --git a/src/Exception/MultipartDownloadException.php b/src/Exception/MultipartDownloadException.php new file mode 100644 index 0000000000..948ff493b7 --- /dev/null +++ b/src/Exception/MultipartDownloadException.php @@ -0,0 +1,64 @@ + 'downloading parts to']); + $msg .= ". The following parts had errors:\n"; + /** @var $error AwsException */ + foreach ($prev as $part => $error) { + $msg .= "- Part {$part}: " . $error->getMessage(). "\n"; + } + } elseif ($prev instanceof AwsException) { + switch ($prev->getCommand()->getName()) { + case 'GetObject': + case 'GetObject': + $action = 'initiating'; + break; + case 'GetObject': + $action = 'completing'; + break; + } + if (isset($action)) { + $msg = strtr($msg, ['performing' => $action]); + } + $msg .= ": {$prev->getMessage()}"; + } + + if (!$prev instanceof \Exception) { + $prev = null; + } + + parent::__construct($msg, 0, $prev); + $this->state = $state; + } + + /** + * Get the state of the transfer + * + * @return DownloadState + */ + public function getState() + { + return $this->state; + } +} + diff --git a/src/Multipart/AbstractDownloadManager.php b/src/Multipart/AbstractDownloadManager.php new file mode 100644 index 0000000000..97a7b0c2ef --- /dev/null +++ b/src/Multipart/AbstractDownloadManager.php @@ -0,0 +1,339 @@ + null, + 'state' => null, + 'concurrency' => self::DEFAULT_CONCURRENCY, + 'prepare_data_source' => null, + 'before_initiate' => null, + 'before_download' => null, + 'before_complete' => null, + 'exception_class' => 'Aws\Exception\MultipartDownloadException', + ]; + + /** @var Client Client used for the download. */ + protected $client; + + /** @var array Configuration used to perform the download. */ + protected $config; + + /** @var array Service-specific information about the download workflow. */ + protected $info; + + /** @var PromiseInterface Promise that represents the multipart download. */ + protected $promise; + + /** @var DownloadState State used to manage the download. 
*/ + protected $state; + + /** + * @param Client $client + * @param array $config + */ + public function __construct(Client $client, array $config = []) + { + $this->client = $client; + $this->info = $this->loadDownloadWorkflowInfo(); + $this->config = $config + self::$defaultConfig; + $this->state = $this->determineState(); + } + + /** + * Returns the current state of the download. + * + * @return DownloadState + */ + public function getState() + { + return $this->state; + } + + /** + * Download the source using multipart download operations. + * + * @return Result The result of the GetObject operations. + * @throws \LogicException if the download is already complete or aborted. + * @throws MultipartDownloadException if a download operation fails. + */ + public function download() + { + return $this->promise()->wait(); + } + + /** + * Download the source asynchronously using multipart download operations. + * + * @return PromiseInterface + */ + public function promise() + { + if ($this->promise) { + return $this->promise; + } + + return $this->promise = Promise\Coroutine::of(function () { + // Initiate the download. + if ($this->state->isCompleted()) { + throw new \LogicException('This multipart download has already ' + . 'been completed or aborted.' + ); + } + + if (!$this->state->isInitiated()) { + // Execute the prepare callback. 
+ if (is_callable($this->config["prepare_data_source"])) { + $this->config["prepare_data_source"](); + } + $type = $this->getDownloadType(); + $result = (yield $this->execCommand('initiate', $this->getInitiateParams($type))); + $this->determineSourceSize($result['ContentRange']); + if ($this->getState()->displayProgress) { + $this->state->setProgressThresholds($this->sourceSize); + } + $this->setStreamPositionArray(); + $this->state->setStatus(DownloadState::INITIATED); + if (isset($type['multipart'])){ + $this->handleResult(1, $result); + } else { + $this->handleResult($type['configParam'], $result); + } + } + // end download if PartNumber or Range is set, or object is only one part total. + if (isset($this->config['partnumber']) + or isset($this->config['range']) + or $result['PartsCount']==1){ + $this->state->setStatus(DownloadState::COMPLETED); + if ($this->getState()->displayProgress) { + echo end($this->state->progressBar); + } + } else { + // Create a command pool from a generator that yields DownloadPart + // commands for each download part. + $resultHandler = $this->getResultHandler($errors); + $commands = new CommandPool( + $this->client, + $this->getDownloadCommands($resultHandler), + [ + 'concurrency' => $this->config['concurrency'], + 'before' => $this->config['before_download'], + ] + ); + + // Execute the pool of commands concurrently, and process errors. + yield $commands->promise(); + if ($errors) { + throw new $this->config['exception_class']($this->state, $errors); + } + + // Complete the multipart download. + $this->state->setStatus(DownloadState::COMPLETED); + }})->otherwise($this->buildFailureCatch()); + } + + private function transformException($e) + { + // Throw errors from the operations as a specific Multipart error. 
+ if ($e instanceof AwsException) { + $e = new $this->config['exception_class']($this->state, $e); + } + throw $e; + } + + private function buildFailureCatch() + { + if (interface_exists("Throwable")) { + return function (\Throwable $e) { + return $this->transformException($e); + }; + } else { + return function (\Exception $e) { + return $this->transformException($e); + }; + } + } + + protected function getConfig() + { + return $this->config; + } + + /** + * Provides service-specific information about the multipart download + * workflow. + * + * This array of data should include the keys: 'command', 'id', and 'part_num'. + * + * @return array + */ + abstract protected function loadDownloadWorkflowInfo(); + + abstract protected function determineSourceSize($size); + + /** + * Determines the part size to use for download parts. + * + * Examines the provided partSize value and the source to determine the + * best possible part size. + * + * @throws \InvalidArgumentException if the part size is invalid. + * + * @return int + */ + abstract protected function determinePartSize(); + + /** + * Uses information from the Command and Result to determine which part was + * downloaded and mark it as downloaded in the download's state, and sends + * information to be written to the destination stream. + * + * @param CommandInterface $command + * @param ResultInterface $result + */ + abstract protected function handleResult( + CommandInterface $command, + ResultInterface $result + ); + + /** + * Gets the service-specific parameters used to initiate the download. + * + * @param array $configType Service-specific params for the operation. + * + * @return array + */ + abstract protected function getInitiateParams($configType); + + /** + * Gets the service-specific parameters used to complete the download + * from the config. 
+ * + * @return array + */ + abstract protected function getDownloadType(); + + /** + * Based on the config and service-specific workflow info, creates a + * `Promise` for a `DownloadState` object. + * + * @return PromiseInterface A `Promise` that resolves to a `DownloadState`. + */ + private function determineState() + { + // If the state was provided via config, then just use it. + if ($this->config['state'] instanceof DownloadState) { + return $this->config['state']; + } + + // Otherwise, construct a new state from the provided identifiers. + // TODO delete id logic + $required = $this->info['id']; + $id = []; + foreach ($required as $key => $param) { + if (!$this->config[$key]) { + throw new IAE('You must provide a value for "' . $key . '" in ' + . 'your config for the MultipartDownloader for ' + . $this->client->getApi()->getServiceFullName() . '.'); + } + $id[$param] = $this->config[$key]; + } + $state = new DownloadState($id); + $state->setPartSize($this->determinePartSize()); + + return $state; + } + + /** + * Executes a MUP command with all the parameters for the operation. + * + * @param string $operation Name of the operation. + * @param array $params Service-specific params for the operation. + * + * @return PromiseInterface + */ + protected function execCommand($operation, array $params) + { + // Create the command. + $command = $this->client->getCommand( + $this->info['command'][$operation], + $params + $this->state->getId() + ); + + // Execute the before callback. + if (is_callable($this->config["before_{$operation}"])) { + $this->config["before_{$operation}"]($command); + } + + // Execute the command asynchronously and return the promise. + return $this->client->executeAsync($command); + } + + /** + * Returns a middleware for processing responses of part download operations. + * + * - Adds an onFulfilled callback that calls the service-specific + * handleResult method on the Result of the operation. 
+ * - Adds an onRejected callback that adds the error to an array of errors. + * - Has a passedByRef $errors arg that the exceptions get added to. The + * caller should use that &$errors array to do error handling. + * + * @param array $errors Errors from download operations are added to this. + * + * @return callable + */ + protected function getResultHandler(&$errors = []) + { + return function (callable $handler) use (&$errors) { + return function ( + CommandInterface $command, + RequestInterface $request = null + ) use ($handler, &$errors) { + return $handler($command, $request)->then( + function (ResultInterface $result) use ($command) { + $this->handleResult($command, $result); + return $result; + }, + function (AwsException $e) use (&$errors) { + $errors[$e->getCommand()[$this->info['part_num']]] = $e; + return new Result(); + } + ); + }; + }; + } + + /** + * Creates a generator that yields part data for the download's source. + * + * Yields associative arrays of parameters that are ultimately merged in + * with others to form the complete parameters of a command. This can + * include the PartNumber or Range parameter. + * + * @param callable $resultHandler + * + * @return \Generator + */ + abstract protected function getDownloadCommands(callable $resultHandler); +} diff --git a/src/Multipart/AbstractDownloader.php b/src/Multipart/AbstractDownloader.php new file mode 100644 index 0000000000..9b74a2af7d --- /dev/null +++ b/src/Multipart/AbstractDownloader.php @@ -0,0 +1,114 @@ +config = $config; + parent::__construct($client, $config); + } + + protected function getDownloadCommands(callable $resultHandler) + { + // Determine if the source can be seeked. + for ($partNumber = 1; $this->isEof($this->partPosition); $partNumber++) { + // If we haven't already downloaded this part, yield a new part. 
+ if (!$this->state->hasPartBeenDownloaded($partNumber)) { + $partStartPosition = $this->partPosition; + if (!($data = $this->createPart($partStartPosition, $partNumber))) { + break; + } + $command = $this->client->getCommand( + $this->info['command']['download'], + $data + $this->state->getId() + ); + $command->getHandlerList()->appendSign($resultHandler, 'mup'); + $numberOfParts = ($this->getNumberOfParts($this->state->getPartSize())); + if (isset($numberOfParts) && $partNumber > $numberOfParts) { + throw new $this->config['exception_class']( + $this->state, + new AwsException( + "Maximum part number for this job exceeded, file has likely been corrupted." . + " Please restart this upload.", + $command + ) + ); + } + yield $command; + } + // Advance the source's offset if not already advanced. + $this->partPosition += $this->state->getPartSize(); + } + } + + /** + * Generates the parameters for a download part by analyzing a range of the + * source starting from the current offset up to the part size. + * + * @param numeric $partStartPosition + * @param int $partNumber + * + * @return array|null + */ + abstract protected function createPart($partStartPosition, $partNumber); + + /** + * Checks if the source is at EOF. + * + * @param numeric $position + * + * @return bool + */ + private function isEof($position) + { + return $position <= $this->sourceSize; + } + + /** + * Determines and sets size of the source. + * + * @param mixed $range + * + */ + protected function determineSourceSize($range) + { + $size = substr($range, strpos($range, "/") + 1); + $this->sourceSize = $size; + } + + /** + * Determines and sets number of parts. 
+ * + * @param numeric $partSize + * + * @return float|null + */ + protected function getNumberOfParts($partSize) + { + if ($this->sourceSize) { + return ceil($this->sourceSize/$partSize); + } + return null; + } +} \ No newline at end of file diff --git a/src/Multipart/AbstractUploader.php b/src/Multipart/AbstractUploader.php index 75e6794660..f0fd6e56e6 100644 --- a/src/Multipart/AbstractUploader.php +++ b/src/Multipart/AbstractUploader.php @@ -147,4 +147,4 @@ protected function getNumberOfParts($partSize) } return null; } -} +} \ No newline at end of file diff --git a/src/Multipart/DownloadState.php b/src/Multipart/DownloadState.php new file mode 100644 index 0000000000..d422bdc073 --- /dev/null +++ b/src/Multipart/DownloadState.php @@ -0,0 +1,208 @@ +id = $id; + } + + /** + * Get the download's ID, which is a tuple of parameters that can uniquely + * identify the download. + * + * @return array + */ + public function getId() + { + return $this->id; + } + + /** + * Set's the "download_id", or 3rd part of the download's ID. This typically + * only needs to be done after initiating a download. + * + * @param string $key The param key of the download_id. + * @param string $value The param value of the download_id. + */ +// public function setDownloadId($key, $value) +// { +// $this->id[$key] = $value; +// } + + /** + * Get the part size. + * + * @return int + */ + public function getPartSize() + { + return $this->partSize; + } + + /** + * Set the part size. + * + * @param $partSize int Size of download parts. + */ + public function setPartSize($partSize) + { + $this->partSize = $partSize; + } + + /** + * Sets the 1/8th thresholds array. $totalSize is only sent if + * 'track_download' is true. + * + * @param $totalSize numeric Size of object to download. + * + * @return array + */ + public function setProgressThresholds($totalSize) + { + if(!is_numeric($totalSize)) { + throw new \InvalidArgumentException( + 'The total size of the upload must be a number.' 
+ ); + } + + $this->progressThresholds[0] = 0; + for ($i=1;$i<=8;$i++) { + $this->progressThresholds []= round($totalSize*($i/8)); + } + return $this->progressThresholds; + } + + /** + * Prints progress of download. + * + * @param $totalUploaded numeric Size of download so far. + */ + public function getDisplayProgress($totalUploaded) + { + if (!is_numeric($totalUploaded)) { + throw new \InvalidArgumentException( + 'The size of the bytes being uploaded must be a number.' + ); + } + + if ($this->displayProgress) { + while (!empty($this->progressBar) + && $totalUploaded >= $this->progressThresholds[0]) { + echo array_shift($this->progressBar); + array_shift($this->progressThresholds); + } + } + } + + /** + * Marks a part as being downloaded. + * + * @param string $partNumber The part number. + * @param array $partData Data from the download operation that needs to be + * recalled during the complete operation. + */ + public function markPartAsDownloaded($partNumber, array $partData = []) + { + $this->downloadedParts[$partNumber] = $partData; + } + + /** + * Returns whether a part has been downloaded. + * + * @param int $partNumber The part number. + * + * @return bool + */ + public function hasPartBeenDownloaded($partNumber) + { + return isset($this->downloadedParts[$partNumber]); + } + + /** + * Returns a sorted list of all the downloaded parts. + * + * @return array + */ + public function getDownloadedParts() + { + ksort($this->downloadedParts); + return $this->downloadedParts; + } + + /** + * Set the status of the download. + * + * @param int $status Status is an integer code defined by the constants + * CREATED, INITIATED, and COMPLETED on this class. + */ + + public function setStatus($status) + { + $this->status = $status; + } + + /** + * Determines whether the download state is in the INITIATED status. 
+ * + * @return bool + */ + public function isInitiated() + { + return $this->status === self::INITIATED; + } + + /** + * Determines whether the download state is in the COMPLETED status. + * + * @return bool + */ + public function isCompleted() + { + return $this->status === self::COMPLETED; + } +} \ No newline at end of file diff --git a/src/Multipart/UploadState.php b/src/Multipart/UploadState.php index 4108c4f13b..94c1354aaf 100644 --- a/src/Multipart/UploadState.php +++ b/src/Multipart/UploadState.php @@ -13,6 +13,18 @@ class UploadState const INITIATED = 1; const COMPLETED = 2; + protected $progressBar = [ + "Transfer initiated...\n| | 0.0%\n", + "|== | 12.5%\n", + "|===== | 25.0%\n", + "|======= | 37.5%\n", + "|========== | 50.0%\n", + "|============ | 62.5%\n", + "|=============== | 75.0%\n", + "|================= | 87.5%\n", + "|====================| 100.0%\nTransfer complete!\n" + ]; + /** @var array Params used to identity the upload. */ private $id; @@ -25,6 +37,10 @@ class UploadState /** @var int Identifies the status the upload. */ private $status = self::CREATED; + private $progressThresholds = []; + +// private $displayUploadProgress; + /** * @param array $id Params used to identity the upload. 
*/ @@ -76,6 +92,35 @@ public function setPartSize($partSize) $this->partSize = $partSize; } + public function setProgressThresholds($totalSize) + { + if(!is_int($totalSize)) { + throw new \InvalidArgumentException('The total size of the upload must be an int.'); + } + + $this->progressThresholds[0] = 0; + for ($i=1;$i<=8;$i++) { + $this->progressThresholds []= round($totalSize*($i/8)); + } + $this->progressBar = array_combine($this->progressThresholds, $this->progressBar); + return $this->progressThresholds; + } + + public function displayProgress($totalUploaded) + { + if(!is_int($totalUploaded)) { + throw new \InvalidArgumentException('The size of the bytes being uploaded must be an int.'); + } + + while ($this->progressThresholds + && !empty($this->progressBar) + && $totalUploaded >= array_key_first($this->progressBar)) + { + echo $this->progressBar[array_key_first($this->progressBar)]; + unset($this->progressBar[array_key_first($this->progressBar)]); + } + } + /** * Marks a part as being uploaded. * @@ -108,7 +153,6 @@ public function hasPartBeenUploaded($partNumber) public function getUploadedParts() { ksort($this->uploadedParts); - return $this->uploadedParts; } @@ -118,6 +162,7 @@ public function getUploadedParts() * @param int $status Status is an integer code defined by the constants * CREATED, INITIATED, and COMPLETED on this class. 
*/ + public function setStatus($status) { $this->status = $status; @@ -142,4 +187,4 @@ public function isCompleted() { return $this->status === self::COMPLETED; } -} +} \ No newline at end of file diff --git a/src/S3/Exception/S3MultipartDownloadException.php b/src/S3/Exception/S3MultipartDownloadException.php new file mode 100644 index 0000000000..410a747a90 --- /dev/null +++ b/src/S3/Exception/S3MultipartDownloadException.php @@ -0,0 +1,85 @@ +collectPathInfo($error->getCommand()); + } elseif ($prev instanceof AwsException) { + $this->collectPathInfo($prev->getCommand()); + } + parent::__construct($state, $prev); + } + + /** + * Get the Bucket information of the transfer object + * + * @return string|null Returns null when 'Bucket' information + * is unavailable. + */ + public function getBucket() + { + return $this->bucket; + } + + /** + * Get the Key information of the transfer object + * + * @return string|null Returns null when 'Key' information + * is unavailable. + */ + public function getKey() + { + return $this->key; + } + + /** + * Get the source file name of the transfer object + * + * @return string|null Returns null when metadata of the stream + * wrapped in 'Body' parameter is unavailable. + */ + public function getSourceFileName() + { + return $this->filename; + } + + /** + * Collect file path information when accessible. 
(Bucket, Key) + * + * @param CommandInterface $cmd + */ + private function collectPathInfo(CommandInterface $cmd) + { + if (empty($this->bucket) && isset($cmd['Bucket'])) { + $this->bucket = $cmd['Bucket']; + } + if (empty($this->key) && isset($cmd['Key'])) { + $this->key = $cmd['Key']; + } + if (empty($this->filename) && isset($cmd['Body'])) { + $this->filename = $cmd['Body']->getMetadata('uri'); + } + } +} + diff --git a/src/S3/MultipartCopy.php b/src/S3/MultipartCopy.php index 5b26dea79e..c79a135e75 100644 --- a/src/S3/MultipartCopy.php +++ b/src/S3/MultipartCopy.php @@ -75,6 +75,12 @@ public function __construct( $client, array_change_key_case($config) + ['source_metadata' => null] ); + + if (isset($config['track_upload']) && $config['track_upload']) { + $this->getState()->setProgressThresholds( + $this->sourceMetadata["ContentLength"] + ); + } } /** diff --git a/src/S3/MultipartDownloader.php b/src/S3/MultipartDownloader.php new file mode 100644 index 0000000000..02445a3711 --- /dev/null +++ b/src/S3/MultipartDownloader.php @@ -0,0 +1,174 @@ +destStream = $this->createDestStream($dest); + parent::__construct($client, $dest, array_change_key_case($config) + [ + 'bucket' => null, + 'key' => null, + 'exception_class' => S3MultipartDownloadException::class, + ]); + + if (isset($config['checksum_validation_enabled']) + && !$config['checksum_validation_enabled']) { + $this->checksumMode = false; + } + if (isset($this->config['track_download']) && ($this->config['track_download'])) { + $this->getState()->displayProgress = true; + } + } + + protected function loadDownloadWorkflowInfo() + { + return [ + 'command' => [ + 'initiate' => 'GetObject', + 'download' => 'GetObject' + ], + 'id' => [ + 'bucket' => 'Bucket', + 'key' => 'Key', +// 'download_id' => 'DownloadId', + ], + 'part_num' => 'PartNumber', + ]; + } + + protected function createPart($partStartPosition, $number) + { + // Initialize the array of part data that will be returned. 
+ $data = []; + + // Apply custom params to DownloadPart data + $config = $this->getConfig(); + $params = $config['params'] ?? []; + foreach ($params as $k => $v) { + $data[$k] = $v; + } + + // Set Range or PartNumber params + if (isset($this->config['range']) or + isset($this->config['multipartdownloadtype']) + && $this->config['multipartdownloadtype'] == 'Range'){ + $partEndPosition = $partStartPosition+$this->state->getPartSize(); + $data['Range'] = 'bytes='.$partStartPosition.'-'.$partEndPosition; + } else { + $data['PartNumber'] = $number; + } + + if (isset($config['add_content_md5']) + && $config['add_content_md5'] === true + ) { + $data['AddContentMD5'] = true; + } + + return $data; + } + + protected function extractETag(ResultInterface $result) + { + return $result['ETag']; + } + + /** + * Sets streamPositionArray with information on beginning of each part, + * depending on config. + */ + public function setStreamPositionArray() + { + $parts = ceil($this->sourceSize/$this->state->getPartSize()); + $position = 0; + if (isset($this->config['range']) or + (isset($this->config['multipartdownloadtype']) && + $this->config['multipartdownloadtype'] == 'Range')) { + for ($i = 1; $i <= $parts; $i++) { + $this->streamPositionArray [$position] = $i; + $position += $this->state->getPartSize(); + } + } else { + for ($i = 1; $i <= $parts; $i++) { + $this->streamPositionArray [$i] = $position; + $position += $this->state->getPartSize(); + } + } + } + + /** + * Turns the provided destination into a writable stream and stores it. + * + * @param string $filePath Destination to turn into stream. 
+ * + * @return Psr7\LazyOpenStream + */ + protected function createDestStream($filePath) + { + return new Psr7\LazyOpenStream($filePath, 'w'); + } +} diff --git a/src/S3/MultipartDownloadingTrait.php b/src/S3/MultipartDownloadingTrait.php new file mode 100644 index 0000000000..ceb0a3f4a9 --- /dev/null +++ b/src/S3/MultipartDownloadingTrait.php @@ -0,0 +1,176 @@ + $bucket, + 'Key' => $key + ]); + + $info = $client->headObject([ + 'Bucket' => $bucket, + 'Key' => $key, +// 'ChecksumMode' => 'ENABLED' + ]); + + $totalSize = $info['ContentLength']; + $state->setPartSize(1048576); + $partSize = $state->getPartSize(); + $destStream = new Psr7\LazyOpenStream($dest, 'rw'); + + for ($byte = 0; $byte <= $totalSize; $byte+=$partSize) { + $stream = new Psr7\LimitStream($destStream, $partSize, $byte); + echo $stream->getSize() . "\n"; + echo $stream->tell() . "\n"; + // mark part as downloaded as you check + } + + $state->setStatus(DownloadState::INITIATED); + return $state; + } + + protected function handleResult($command, ResultInterface $result) + { + if ($this->checksumMode) { + if ($this->validateChecksum($result)){ + throw new \Exception('Checksum invalid.'); + } + } + + if (!($command instanceof CommandInterface)){ + // single part downloads - part and range + $partNumber = 1; + $position = 0; + } elseif (!(isset($command['PartNumber']))) { + // multipart downloads - range + $seek = substr($command['Range'], strpos($command['Range'], "=") + 1); + $seek = (int)(strtok($seek, '-')); + $partNumber = $this->streamPositionArray[$seek]; + $position = $seek; + } else { + // multipart downloads - part + $partNumber = $command['PartNumber']; + $position = $this->streamPositionArray[$command['PartNumber']]; + } + $this->getState()->markPartAsDownloaded($partNumber, [ + 'PartNumber' => $partNumber, + 'ETag' => $this->extractETag($result), + ]); + + $this->writeDestStream($position, $result['Body']); + if ($this->getState()->displayProgress) { + 
$this->downloadedBytes+=strlen($result['Body']); + $this->getState()->getDisplayProgress($this->downloadedBytes); + } + } + + private function validateChecksum($result) + { + if (isset($result['ChecksumValidated'])) { + $checksum = CalculatesChecksumTrait::getEncodedValue( + $result['ChecksumValidated'], $result['Body'] + ); + if ($checksum != $result['Checksum' . $result['ChecksumValidated']]) { + return true; + } + } elseif (!strpos($result['ETag'], '-') + && $result['ETag'] != md5($result['Body'])) { + return true; + } + } + + protected function writeDestStream($position, $body) + { + $this->destStream->seek($position); + $this->destStream->write($body->getContents()); + } + + abstract protected function extractETag(ResultInterface $result); + + protected function determinePartSize() + { + // Make sure the part size is set. + $partSize = $this->getConfig()['part_size'] ?: MultipartDownloader::PART_MIN_SIZE; + + // Ensure that the part size follows the rules: 5 MB <= size <= 5 GB. + if ($partSize < MultipartDownloader::PART_MIN_SIZE || $partSize > MultipartDownloader::PART_MAX_SIZE) { + throw new \InvalidArgumentException('The part size must be no less ' + . 'than 5 MB and no greater than 5 GB.'); + } + + return $partSize; + } + + protected function getInitiateParams($configType) + { + $config = $this->getConfig(); + $params = $config['params'] ?? 
[]; + + if (isset($config['acl'])) { + $params['ACL'] = $config['acl']; + } + + $params[$configType['config']] = $configType['configParam']; + + if (isset($this->checksumMode)) { + $params['ChecksumMode'] = 'ENABLED'; + } + + return $params; + } + + protected function getDownloadType() + { + $config = $this->getConfig(); + if (isset($config['partnumber'])) { + return ['config' => 'PartNumber', + 'configParam' => $config['partnumber']]; + } elseif (isset($config['range'])) { + return ['config' => 'Range', + 'configParam' => $config['range']]; + } elseif (isset($config['multipartdownloadtype']) && $config['multipartdownloadtype'] == 'Range') { + return ['config' => 'Range', + 'configParam' => 'bytes=0-'.MultipartDownloader::PART_MIN_SIZE, + 'multipart' => 'yes' + ]; + } else { + return ['config' => 'PartNumber', + 'configParam' => 1, + 'multipart' => 'yes']; + } + } + + /** + * @return DownloadState + */ + abstract protected function getState(); + + /** + * @return array + */ + abstract protected function getConfig(); +} diff --git a/src/S3/MultipartUploader.php b/src/S3/MultipartUploader.php index ae47d7e5fd..ab7426607e 100644 --- a/src/S3/MultipartUploader.php +++ b/src/S3/MultipartUploader.php @@ -70,6 +70,9 @@ public function __construct( 'key' => null, 'exception_class' => S3MultipartUploadException::class, ]); + if (isset($config['track_upload']) && $config['track_upload']) { + $this->getState()->setProgressThresholds($this->source->getSize()); + } } protected function loadUploadWorkflowInfo() diff --git a/src/S3/MultipartUploadingTrait.php b/src/S3/MultipartUploadingTrait.php index baccf58c51..9e1eedacc7 100644 --- a/src/S3/MultipartUploadingTrait.php +++ b/src/S3/MultipartUploadingTrait.php @@ -7,6 +7,8 @@ trait MultipartUploadingTrait { + private $uploadedBytes = 0; + /** * Creates an UploadState object for a multipart upload by querying the * service for the specified upload's information. 
@@ -55,6 +57,9 @@ protected function handleResult(CommandInterface $command, ResultInterface $resu 'PartNumber' => $command['PartNumber'], 'ETag' => $this->extractETag($result), ]); + + $this->uploadedBytes += $command["ContentLength"]; // running total; assumes ContentLength is set on every part command -- TODO confirm + $this->getState()->displayProgress($this->uploadedBytes); } abstract protected function extractETag(ResultInterface $result); diff --git a/src/S3/ObjectDownloader.php b/src/S3/ObjectDownloader.php new file mode 100644 index 0000000000..d91c337493 --- /dev/null +++ b/src/S3/ObjectDownloader.php @@ -0,0 +1,129 @@ + null, + 'concurrency' => 3, + 'mup_threshold' => self::DEFAULT_MULTIPART_THRESHOLD, + 'params' => [], + 'part_size' => null, + ]; + private $addContentMD5; + + /** + * @param S3ClientInterface $client The S3 Client used to execute + * the GetObject command. + * @param string $bucket Bucket holding the object, or + * an S3 access point ARN. + * @param string $key Key of the object. + * @param mixed $dest Where the object data is saved; + * passed unchanged as the 'SaveAs' + * parameter of GetObject. + * + * NOTE(review): $dest is presumably a file path or a + * stream, as accepted by 'SaveAs' -- confirm. No ACL or + * options array is accepted here, and the methods visible + * in this class never read the defaults declared above + * ('concurrency', 'mup_threshold', 'part_size', ...): + * promise() always issues a single GetObject command. + */ + public function __construct( + S3ClientInterface $client, + $bucket, + $key, + $dest + ) { + $this->client = $client; + $this->bucket = $bucket; + $this->key = $key; + $this->dest = $dest; + } + + /** + * @return PromiseInterface + */ + public function promise() + { + // Build one GetObject command that saves the object to the destination. + $command = $this->client->getCommand('GetObject', [ + 'Bucket' => $this->bucket, + 'Key' => $this->key, + 'SaveAs' => $this->dest + ]); + + return $this->client->executeAsync($command); + } + + public function download() // Blocking variant: waits on promise() and returns what it resolves to. + { + return $this->promise()->wait(); + } + + /** + * Determines if the body should be uploaded using PutObject or the + * Multipart Upload System. 
It also modifies the passed-in $body as needed + * to support the upload. + * NOTE(review): no method visible in this class calls this helper -- confirm it is still needed here. + * @param StreamInterface $body Stream representing the body. + * @param int $threshold Minimum bytes before using Multipart. + * + * @return bool + */ + private function requiresMultipart(StreamInterface &$body, $threshold) + { + // If body size known, compare to threshold to determine if Multipart. + if ($body->getSize() !== null) { + return $body->getSize() >= $threshold; + } + + /** + * Handle the situation where the body size is unknown. + * Read up to 5MB into a buffer to determine how to upload the body. + * @var StreamInterface $buffer + */ + $buffer = Psr7\Utils::streamFor(); + Psr7\Utils::copyToStream($body, $buffer, MultipartUploader::PART_MIN_SIZE); + + // If body < 5MB, use PutObject with the buffer. + if ($buffer->getSize() < MultipartUploader::PART_MIN_SIZE) { + $buffer->seek(0); + $body = $buffer; + return false; + } + + // If body >= 5 MB, then use multipart. + if ($body->isSeekable() && $body->getMetadata('uri') !== 'php://input') { + // If the body is seekable, just rewind the body. + $body->seek(0); + } else { + // If the body is non-seekable, rewind the buffer and stitch it and + // the partially read body together into one stream. This avoids + // unnecessary disk usage and does not require seeking on the + // original stream. + $buffer->seek(0); + $body = new Psr7\AppendStream([$buffer, $body]); + } + + return true; + } +} + diff --git a/src/S3/S3ClientInterface.php b/src/S3/S3ClientInterface.php index 261d7dd3c0..f5d84dcc8c 100644 --- a/src/S3/S3ClientInterface.php +++ b/src/S3/S3ClientInterface.php @@ -217,6 +217,18 @@ public function uploadAsync( array $options = [] ); + public function download( // NOTE(review): undocumented; presumably the blocking counterpart of downloadAsync() -- confirm + $bucket, + $key, + $dest + ); + + public function downloadAsync( // NOTE(review): undocumented; presumably returns a promise for the download -- confirm + $bucket, + $key, + $dest + ); + /** * Copy an object of any size to a different location. 
* diff --git a/src/S3/S3ClientTrait.php b/src/S3/S3ClientTrait.php index 5ec2f7d6a3..f404f52060 100644 --- a/src/S3/S3ClientTrait.php +++ b/src/S3/S3ClientTrait.php @@ -49,6 +49,25 @@ public function uploadAsync( ->promise(); } + public function download( // Blocking variant: waits on the promise returned by downloadAsync(). + $bucket, + $key, + $dest + ) { + return $this + ->downloadAsync($bucket, $key, $dest) + ->wait(); + } + + public function downloadAsync( // Builds an ObjectDownloader for this client and returns its promise. + $bucket, + $key, + $dest + ) { + return (new ObjectDownloader($this, $bucket, $key, $dest)) // the client is the constructor's required first argument + ->promise(); + } + /** * @see S3ClientInterface::copy() */ diff --git a/tests/Exception/MultipartDownloadExceptionTest.php b/tests/Exception/MultipartDownloadExceptionTest.php new file mode 100644 index 0000000000..899cef8df0 --- /dev/null +++ b/tests/Exception/MultipartDownloadExceptionTest.php @@ -0,0 +1,61 @@ +assertSame( + "An exception occurred while {$status} a multipart upload: $msg", + $exception->getMessage() + ); + $this->assertSame($state, $exception->getState()); + $this->assertSame($prev, $exception->getPrevious()); + } + + public function getTestCases() + { + return [ + ['GetObject', 'performing'] + ]; + } + + public function testCanCreateExceptionListingFailedParts() + { + $state = new DownloadState([]); + $failed = [ + 1 => new AwsException('Bad digest.', new Command('GetObject')), + 5 => new AwsException('Missing header.', new Command('GetObject')), + 8 => new AwsException('Needs more love.', new Command('GetObject')), + ]; + + $exception = new MultipartDownloadException($state, $failed); + + $expected = <<assertSame($expected, $exception->getMessage()); + } +} diff --git a/tests/Multipart/AbstractDownloaderTest.php b/tests/Multipart/AbstractDownloaderTest.php new file mode 100644 index 0000000000..ce058f3e60 --- /dev/null +++ b/tests/Multipart/AbstractDownloaderTest.php @@ -0,0 +1,224 @@ + 'foo', 'Key' => 'bar']); + $state->setPartSize(2); + $state->setStatus($status); + + return $this->getTestDownloader( + ['state' => $state], + $results + ); + } + + private function 
getTestDownloader( + array $config = [], + array $results = [] + ) { + $client = $this->getTestClient('s3', [ + 'validate' => false, + 'retries' => 0, + ]); + $this->addMockResults($client, $results); + + return new MultipartDownloader($client, 'php://temp', $config); + } + + public function testThrowsExceptionOnBadInitiateRequest() + { + $this->expectException(\Aws\S3\Exception\S3MultipartDownloadException::class); + $downloader = $this->getDownloaderWithState(DownloadState::CREATED, [ + new AwsException('Failed', new Command('Initiate')), + ]); + $downloader->download(); + } + + public function testThrowsExceptionIfStateIsCompleted() + { + // set exception to expect + // set state as completed + // make sure state is completed + // check that exception is thrown + $this->expectException(\LogicException::class); + $downloader = $this->getDownloaderWithState(DownloadState::COMPLETED); + $this->assertTrue($downloader->getState()->isCompleted()); + $downloader->download(); + } + + public function testSuccessfulCompleteReturnsResult() + { + // + $downloader = $this->getDownloaderWithState(DownloadState::CREATED, [ + new Result(['body' => Psr7\Utils::streamFor(str_repeat('.', 1 * 1048576))]) + ], 'php://temp'); + $this->assertSame(str_repeat('.', 1 * 1048576), $downloader->download()['body']); + $this->assertTrue($downloader->getState()->isCompleted()); + } + + public function testThrowsExceptionOnBadCompleteRequest() + { + $this->expectException(\Aws\S3\Exception\S3MultipartDownloadException::class); + $uploader = $this->getDownloaderWithState(DownloadState::CREATED, [ + new Result(), // Initiate + new Result(), // Upload + new AwsException('Failed', new Command('Complete')), + ], 'php://temp'); + $uploader->download(); + } + + public function testThrowsExceptionOnBadUploadRequest() + { + $uploader = $this->getDownloaderWithState(DownloadState::CREATED, [ + new Result(), // Initiate + new AwsException('Failed[1]', new Command('Upload', ['PartNumber' => 1])), + new 
Result(), // Upload + new Result(), // Upload + new AwsException('Failed[4]', new Command('Upload', ['PartNumber' => 4])), + new Result(), // Upload + ], Psr7\Utils::streamFor('abcdefghi')); + + try { + $uploader->download(); + $this->fail('No exception was thrown.'); + } catch (MultipartDownloadException $e) { + $message = $e->getMessage(); + $this->assertStringContainsString('Failed[1]', $message); + $this->assertStringContainsString('Failed[4]', $message); + $uploadedParts = $e->getState()->getDownloadedParts(); + $this->assertCount(3, $uploadedParts); + $this->assertArrayHasKey(2, $uploadedParts); + $this->assertArrayHasKey(3, $uploadedParts); + $this->assertArrayHasKey(5, $uploadedParts); + + // Test if can resume an upload. + $serializedState = serialize($e->getState()); + $state = unserialize($serializedState); + $secondChance = $this->getTestDownloader( + Psr7\Utils::streamFor('abcdefghi'), + ['state' => $state], + [ + new Result(), // Upload + new Result(), // Upload + new Result(['foo' => 'bar']), // Upload + ] + ); + $result = $secondChance->upload(); + $this->assertSame('bar', $result['foo']); + } + } + + public function testAsyncUpload() + { + $called = 0; + $fn = function () use (&$called) { + $called++; + }; + + $uploader = $this->getTestDownloader(Psr7\Utils::streamFor('abcde'), [ + 'bucket' => 'foo', + 'key' => 'bar', + 'prepare_data_source' => $fn, + 'before_initiate' => $fn, + 'before_upload' => $fn, + 'before_complete' => $fn, + ], [ + new Result(), // Initiate + new Result(), // Upload + new Result(), // Upload + new Result(), // Upload + new Result(['test' => 'foo']) // Complete + ]); + + $promise = $uploader->promise(); + $this->assertSame($promise, $uploader->promise()); + $this->assertInstanceOf('Aws\Result', $promise->wait()); + $this->assertSame(6, $called); + } + + public function testRequiresIdParams() + { + $this->expectException(\InvalidArgumentException::class); + $this->getTestDownloader(); + } + + /** + * @param bool $seekable + * 
@param DownloadState $state + * @param array $expectedBodies + * + * @dataProvider getPartGeneratorTestCases + */ + public function testCommandGeneratorYieldsExpectedUploadCommands( + $seekable, + DownloadState $state, + array $expectedBodies + ) { + $source = Psr7\Utils::streamFor(fopen(__DIR__ . '/source.txt', 'r')); + if (!$seekable) { + $source = new Psr7\NoSeekStream($source); + } + + $uploader = $this->getTestDownloader($source, ['state' => $state]); + $uploader->getState(); + $handler = function (callable $handler) { + return function ($c, $r) use ($handler) { + return $handler($c, $r); + }; + }; + + $actualBodies = []; + $getUploadCommands = (new \ReflectionObject($uploader)) + ->getMethod('getDownloadCommands'); + $getUploadCommands->setAccessible(true); + foreach ($getUploadCommands->invoke($uploader, $handler) as $cmd) { + $actualBodies[$cmd['PartNumber']] = $cmd['Body']->getContents(); + } + + $this->assertEquals($expectedBodies, $actualBodies); + } + + public function getPartGeneratorTestCases() + { + $expected = [ + 1 => 'AA', + 2 => 'BB', + 3 => 'CC', + 4 => 'DD', + 5 => 'EE', + 6 => 'F' , + ]; + $expectedSkip = $expected; + unset($expectedSkip[1], $expectedSkip[2], $expectedSkip[4]); + $state = new DownloadState([]); + $state->setPartSize(2); + $stateSkip = clone $state; + $stateSkip->markPartAsDownloaded(1); + $stateSkip->markPartAsDownloaded(2); + $stateSkip->markPartAsDownloaded(4); + return [ + [true, $state, $expected], + [false, $state, $expected], + [true, $stateSkip, $expectedSkip], + [false, $stateSkip, $expectedSkip], + ]; + } +} diff --git a/tests/Multipart/DownloadStateTest.php b/tests/Multipart/DownloadStateTest.php new file mode 100644 index 0000000000..c368633a86 --- /dev/null +++ b/tests/Multipart/DownloadStateTest.php @@ -0,0 +1,66 @@ +assertFalse($state->isInitiated()); + $this->assertFalse($state->isCompleted()); + + $state->setStatus(DownloadState::INITIATED); + $this->assertFalse($state->isCompleted()); + 
$this->assertTrue($state->isInitiated()); + + $state->setStatus(DownloadState::COMPLETED); + $this->assertFalse($state->isInitiated()); + $this->assertTrue($state->isCompleted()); + } + + public function testCanStorePartSize() + { + $state = new DownloadState([]); + $this->assertNull($state->getPartSize()); + $state->setPartSize(50000000); + $this->assertSame(50000000, $state->getPartSize()); + } + + public function testCanTrackDownloadedParts() + { + $state = new DownloadState([]); + $this->assertEmpty($state->getDownloadedParts()); + + $state->markPartAsDownloaded(1, ['foo' => 1]); + $state->markPartAsDownloaded(3, ['foo' => 3]); + $state->markPartAsDownloaded(2, ['foo' => 2]); + + $this->assertTrue($state->hasPartBeenDownloaded(2)); + $this->assertFalse($state->hasPartBeenDownloaded(5)); + + // Note: The parts should come out sorted. + $this->assertSame([1, 2, 3], array_keys($state->getDownloadedParts())); + } + + public function testSerializationWorks() + { + $state = new DownloadState([]); + $state->setPartSize(5); + $state->markPartAsDownloaded(1); + $state->setStatus($state::INITIATED); + $serializedState = serialize($state); + + /** @var DownloadState $newState */ + $newState = unserialize($serializedState); + $this->assertSame(5, $newState->getPartSize()); + $this->assertArrayHasKey(1, $state->getDownloadedParts()); + $this->assertTrue($newState->isInitiated()); + } +} diff --git a/tests/Multipart/TestDownloader.php b/tests/Multipart/TestDownloader.php new file mode 100644 index 0000000000..f0656f7569 --- /dev/null +++ b/tests/Multipart/TestDownloader.php @@ -0,0 +1,149 @@ +destStream = new Psr7\LazyOpenStream($source, 'w'); + parent::__construct($client, $source, $config + [ + 'bucket' => null, + 'key' => null, + 'exception_class' => S3MultipartDownloadException::class, + ]); + } + protected function loadDownloadWorkflowInfo() + { + return [ + 'command' => [ + 'initiate' => 'GetObject', + 'download' => 'GetObject' + ], + 'id' => [ + 'bucket' => 'Bucket', 
+ 'key' => 'Key', + 'download_id' => 'DownloadId', + ], + 'part_num' => 'PartNumber', + ]; + } + + protected function determinePartSize() + { + return $this->config['part_size'] ?: 2; + } + + protected function getInitiateParams($type) + { + return []; + } + + protected function createPart($seekable, $number) + { + if ($seekable) { + $body = Psr7\Utils::streamFor(fopen($this->source->getMetadata('uri'), 'r')); + $body = $this->limitPartStream($body); + } else { + $body = Psr7\Utils::streamFor($this->source->read($this->state->getPartSize())); + } + + // Do not create a part if the body size is zero. + if ($body->getSize() === 0) { + return false; + } + + return [ + 'PartNumber' => $number, + 'Body' => $body, + 'UploadId' => 'baz' + ]; + } + + protected function handleResult($command, ResultInterface $result) + { + if (!($command instanceof CommandInterface)){ + // single part downloads - part and range + $partNumber = 1; + $position = 0; + } elseif (!(isset($command['PartNumber']))) { + // multipart downloads - range + $seek = substr($command['Range'], strpos($command['Range'], "=") + 1); + $seek = (int)(strtok($seek, '-')); + $partNumber = $this->streamPositionArray[$seek]; + $position = $seek; + } else { + // multipart downloads - part + $partNumber = $command['PartNumber']; + $position = $this->streamPositionArray[$command['PartNumber']]; + } + + $this->getState()->markPartAsDownloaded($partNumber, [ + 'PartNumber' => $partNumber, + 'ETag' => $this->extractETag($result), + ]); + $this->writeDestStream($position, $result['Body']); + } + + protected function extractETag(ResultInterface $result) + { + return $result['ETag']; + } + + protected function writeDestStream($position, $body) + { + $this->destStream->seek($position); + if ($body) { + $this->destStream->write($body->getContents()); + } + } + + protected function getDownloadType() + { + $config = $this->getConfig(); + if (isset($config['partnumber'])) { + return ['config' => 'PartNumber', + 'configParam' => 
$config['partnumber']]; + } elseif (isset($config['range'])) { + return ['config' => 'Range', + 'configParam' => $config['range']]; + } elseif (isset($config['multipartdownloadtype']) && $config['multipartdownloadtype'] == 'Range') { + return ['config' => 'Range', + 'configParam' => 'bytes=0-'.MultipartDownloader::PART_MIN_SIZE, + 'multipart' => 'yes' + ]; + } else { + return ['config' => 'PartNumber', + 'configParam' => 1, + 'multipart' => 'yes']; + } + } + + public function setStreamPositionArray() + { + $parts = ceil($this->sourceSize/$this->state->getPartSize()); + $position = 0; + if (isset($this->config['range']) or + (isset($this->config['multipartdownloadtype']) && + $this->config['multipartdownloadtype'] == 'Range')) { + for ($i = 1; $i <= $parts; $i++) { + $this->streamPositionArray [$position] = $i; + $position += $this->state->getPartSize(); + } + } else { + for ($i = 1; $i <= $parts; $i++) { + $this->streamPositionArray [$i] = $position; + $position += $this->state->getPartSize(); + } + } + } + +} diff --git a/tests/Multipart/UploadStateTest.php b/tests/Multipart/UploadStateTest.php index 64ac615f30..27a359e854 100644 --- a/tests/Multipart/UploadStateTest.php +++ b/tests/Multipart/UploadStateTest.php @@ -70,4 +70,163 @@ public function testSerializationWorks() $this->assertTrue($newState->isInitiated()); $this->assertArrayHasKey('foo', $newState->getId()); } -} + + public function testEmptyUploadStateOutputWithConfigFalse() + { + $config['track_upload'] = false; + $state = new UploadState([], $config); + $state->setProgressThresholds(100); + $state->displayProgress(13); + $this->expectOutputString(''); + } + + /** + * @dataProvider getDisplayProgressCases + */ + public function testDisplayProgressPrintsProgress( + $totalSize, + $totalUploaded, + $progressBar + ) { + $config['track_upload'] = true; + $state = new UploadState([]); + $state->setProgressThresholds($totalSize); + $state->displayProgress($totalUploaded); + + 
$this->expectOutputString($progressBar); + } + + public function getDisplayProgressCases() + { + $progressBar = ["Transfer initiated...\n| | 0.0%\n", + "|== | 12.5%\n", + "|===== | 25.0%\n", + "|======= | 37.5%\n", + "|========== | 50.0%\n", + "|============ | 62.5%\n", + "|=============== | 75.0%\n", + "|================= | 87.5%\n", + "|====================| 100.0%\nTransfer complete!\n"]; + return [ + [100000, 0, $progressBar[0]], + [100000, 12499, $progressBar[0]], + [100000, 12500, "{$progressBar[0]}{$progressBar[1]}"], + [100000, 24999, "{$progressBar[0]}{$progressBar[1]}"], + [100000, 25000, "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}"], + [100000, 37499, "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}"], + [ + 100000, + 37500, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}" + ], + [ + 100000, + 49999, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}" + ], + [ + 100000, + 50000, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" + ], + [ + 100000, + 62499, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" + ], + [ + 100000, + 62500, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . + "{$progressBar[5]}" + ], + [ + 100000, + 74999, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . + "{$progressBar[5]}" + ], + [ + 100000, + 75000, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . + "{$progressBar[5]}{$progressBar[6]}" + ], + [ + 100000, + 87499, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . + "{$progressBar[5]}{$progressBar[6]}" + ], + [ + 100000, + 87500, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . 
+ "{$progressBar[5]}{$progressBar[6]}{$progressBar[7]}" + ], + [ + 100000, + 99999, + "{$progressBar[0]}{$progressBar[1]}{$progressBar[2]}{$progressBar[3]}{$progressBar[4]}" . + "{$progressBar[5]}{$progressBar[6]}{$progressBar[7]}" + ], + [100000, 100000, implode($progressBar)] + ]; + } + + /** + * @dataProvider getThresholdCases + */ + public function testUploadThresholds($totalSize) + { + $config['track_upload'] = true; + $state = new UploadState([]); + $threshold = $state->setProgressThresholds($totalSize); + + $this->assertIsArray($threshold); + $this->assertCount(9, $threshold); + } + + public function getThresholdCases() + { + return [ + [0], + [100000], + [100001] + ]; + } + + /** + * @dataProvider getInvalidIntCases + */ + public function testSetProgressThresholdsThrowsException($totalSize) + { + $state = new UploadState([]); + $this->expectExceptionMessage('The total size of the upload must be an int.'); + $this->expectException(\InvalidArgumentException::class); + + $state->setProgressThresholds($totalSize); + } + + /** + * @dataProvider getInvalidIntCases + */ + public function testDisplayProgressThrowsException($totalUploaded) + { + $state = new UploadState([]); + $this->expectExceptionMessage('The size of the bytes being uploaded must be an int.'); + $this->expectException(\InvalidArgumentException::class); + + $state->displayProgress($totalUploaded); + } + + public function getInvalidIntCases() + { + return [ + [''], + [null], + ['1234'], + ['aws'], + ]; + } +} \ No newline at end of file diff --git a/tests/S3/Exception/S3MultipartDownloadExceptionTest.php b/tests/S3/Exception/S3MultipartDownloadExceptionTest.php new file mode 100644 index 0000000000..d166b6ed08 --- /dev/null +++ b/tests/S3/Exception/S3MultipartDownloadExceptionTest.php @@ -0,0 +1,37 @@ + new AwsException('Bad digest.', new Command('GetObject', [ + 'Bucket' => 'foo', + 'Key' => 'bar' + ])), + 5 => new AwsException('Missing header.', new Command('GetObject', [ + 'Bucket' => 'foo', + 
'Key' => 'bar' + ])), + 8 => new AwsException('Needs more love.', new Command('GetObject')), + ]; + + $path = '/path/to/the/large/file/test.zip'; + $exception = new S3MultipartDownloadException($state, $failed); + $this->assertSame('foo', $exception->getBucket()); + $this->assertSame('bar', $exception->getKey()); +// $this->assertSame('php://temp', $exception->getSourceFileName()); + } +} diff --git a/tests/S3/MultipartDownloaderTest.php b/tests/S3/MultipartDownloaderTest.php new file mode 100644 index 0000000000..e24433bbc3 --- /dev/null +++ b/tests/S3/MultipartDownloaderTest.php @@ -0,0 +1,336 @@ +getTestClient('s3'); + $this->addMockResults($client, [ + new Result(['Body' => Psr7\Utils::streamFor(str_repeat('.', 10 * self::MB)), + 'ChecksumValidated' => 'CRC32', +// 'ChecksumCRC32' => +// CalculatesChecksumTrait::getEncodedValue('crc32', +// Psr7\Utils::streamFor(str_repeat('.', 10 * self::MB))) + 'ChecksumCRC32' => 'M6FqCg==' + ]) + ]); + + if ($error) { + if (method_exists($this, 'expectException')) { + $this->expectException($error); + } else { + $this->setExpectedException($error); + } + } + + $filename = tmpfile(); + $dest = stream_get_meta_data($filename)['uri']; + $downloader = new MultipartDownloader($client, $dest, $uploadOptions); + $result = $downloader->download(); + $output = file_get_contents($dest); + + $this->assertStringContainsString(str_repeat('.', 10 * self::MB), $output); + $this->assertTrue(filesize($dest) == 10*self::MB); + $this->assertTrue($downloader->getState()->isCompleted()); + } + + public function getTestCases() + { + $defaults = [ + 'bucket' => 'foo', + 'key' => 'bar' + ]; + + return [ + [ + ['acl' => 'private'] + $defaults + ], + [ + ['MultipartDownloadType' => 'Range'] + $defaults + ], + [ + ['MultipartDownloadType' => 'Parts'] + $defaults + ], + [ + ['PartNumber' => '1'] + $defaults + ], + [ + ['Range' => 'bytes=0-100'] + $defaults + ], + [ + ['checksum_validation_enabled' => false] + $defaults + ], + [ + 
['checksum_validation_enabled' => true] + $defaults + ] + ]; + } + + // continuing a prev download? + public function testCanLoadStateFromDownload() + { + $client = $this->getTestClient('s3'); + $this->addMockResults($client, [ + new Result(['ETag' => 'A', + 'ChecksumValidated' => 'CRC32', + 'ContentLength' => 3 * self::MB]) + ]); + + $size = 1 * self::MB; + $data = str_repeat('.', $size); + file_put_contents('php://memory', $data); + + $state = MultipartDownloader::getStateFromService($client, 'foo', 'bar', 'php://memory'); + $downloader = new MultipartDownloader($client, $dest, ['state' => $state]); + $downloader->download(); + + $this->assertTrue($downloader->getState()->isCompleted()); +// $this->assertSame(4 * self::MB, $downloader->getState()->getPartSize()); +// $this->assertSame($url, $result['ObjectURL']); + } + + public function testCanUseCaseInsensitiveConfigKeys() + { + $client = $this->getTestClient('s3'); + $putObjectMup = new MultipartDownloader($client, 'php://temp', [ + 'Bucket' => 'bucket', + 'Key' => 'key', + ]); + $classicMup = new MultipartDownloader($client, 'php://temp', [ + 'bucket' => 'bucket', + 'key' => 'key', + ]); + $configProp = (new \ReflectionClass(MultipartDownloader::class)) + ->getProperty('config'); + $configProp->setAccessible(true); + + $this->assertSame($configProp->getValue($classicMup), $configProp->getValue($putObjectMup)); + } + + /** @doesNotPerformAssertions */ + public function testMultipartSuccessStreams() + { + $size = 12 * self::MB; + $data = str_repeat('.', $size); + $filename = sys_get_temp_dir() . '/' . 
self::FILENAME; + file_put_contents($filename, $data); + + return [ + [ // Seekable stream, regular config + 'php://temp', + $size, + ], + [ // Non-seekable stream + 'php://temp', + $size, + ] + ]; + } + + /** + * @dataProvider testMultipartSuccessStreams + */ + public function testS3MultipartDownloadParams($dest, $size) + { + /** @var \Aws\S3\S3Client $client */ + $client = $this->getTestClient('s3'); + $client->getHandlerList()->appendSign( + Middleware::tap(function ($cmd, $req) { + $name = $cmd->getName(); + if ($name === 'GetObject') { + $this->assertTrue( + $req->hasHeader('Content-MD5') + ); + } + }) + ); + $uploadOptions = [ + 'bucket' => 'foo', + 'key' => 'bar', + 'add_content_md5' => true, +// 'params' => [ +// 'RequestPayer' => 'test', +// 'ContentLength' => $size +// ], +// 'before_initiate' => function($command) { +// $this->assertSame('test', $command['RequestPayer']); +// }, +// 'before_download' => function($command) use ($size) { +// $this->assertLessThan($size, $command['ContentLength']); +// $this->assertSame('test', $command['RequestPayer']); +// }, +// 'before_complete' => function($command) { +// $this->assertSame('test', $command['RequestPayer']); +// }, + 'checksum_validation_enabled' => false + ]; + $url = 'http://foo.s3.amazonaws.com/bar'; + + $this->addMockResults($client, [ + new Result(['PartNumber' => 1, 'ETag' => 'A', 'Body' => 'foobar', + 'ChecksumValidated' => 'CRC32', +// 'ChecksumCRC32' => CalculatesChecksumTrait::getEncodedValue('crc32', 'foobar') +]), + new Result(['PartNumber' => 2, 'ETag' => 'B', 'Body' => 'foobar2', + 'ChecksumValidated' => 'CRC32', +// 'ChecksumCRC32' => CalculatesChecksumTrait::getEncodedValue('crc32', 'foobar2') + ]) + ]); + $filename = tmpfile(); + $dest = stream_get_meta_data($filename)['uri']; + $uploader = new MultipartDownloader($client, $dest, $uploadOptions); + $result = $uploader->download(); + print_r($result); + $this->assertTrue($uploader->getState()->isCompleted()); + } + + public function 
getContentTypeSettingTests() + { + $size = 12 * self::MB; + $data = str_repeat('.', $size); + $filename = sys_get_temp_dir() . '/' . self::FILENAME; + file_put_contents($filename, $data); + + return [ + [ // Successful lookup from filename via stream + Psr7\Utils::streamFor(fopen($filename, 'r')), + [], + 'text/plain' + ], + [ // Unsuccessful lookup because of no file name + Psr7\Utils::streamFor($data), + [], + 'application/octet-stream' + ], + [ // Successful override of known type from filename + Psr7\Utils::streamFor(fopen($filename, 'r')), + ['ContentType' => 'TestType'], + 'TestType' + ], + [ // Successful override of unknown type + Psr7\Utils::streamFor($data), + ['ContentType' => 'TestType'], + 'TestType' + ] + ]; + } + + /** + * @dataProvider getContentTypeSettingTests + */ + public function testS3MultipartContentTypeSetting( + $stream, + $params, + $expectedContentType + ) { + /** @var \Aws\S3\S3Client $client */ + $client = $this->getTestClient('s3'); + $uploadOptions = [ + 'bucket' => 'foo', + 'key' => 'bar', + 'params' => $params, + 'before_initiate' => function($command) use ($expectedContentType) { + $this->assertEquals( + $expectedContentType, + $command['ContentType'] + ); + }, + ]; + $url = 'http://foo.s3.amazonaws.com/bar'; + + $this->addMockResults($client, [ + new Result(['UploadId' => 'baz']), + new Result(['ETag' => 'A']), + new Result(['ETag' => 'B']), + new Result(['ETag' => 'C']), + new Result(['Location' => $url]) + ]); + + $uploader = new MultipartDownloader($client, $stream, $uploadOptions); + $result = $uploader->download(); + + $this->assertTrue($uploader->getState()->isCompleted()); + $this->assertSame($url, $result['ObjectURL']); + } + + public function testAppliesAmbiguousSuccessParsing() + { + $this->expectExceptionMessage("An exception occurred while downloading parts to a multipart download"); + $this->expectException(\Aws\S3\Exception\S3MultipartDownloadException::class); + $counter = 0; + + $httpHandler = function ($request, 
array $options) use (&$counter) { + if ($counter < 1) { + $body = "baz"; + } else { + $body = "\n\n\n"; + } + $counter++; + + return Promise\Create::promiseFor( + new Psr7\Response(200, [], $body) + ); + }; + + $s3 = new S3Client([ + 'version' => 'latest', + 'region' => 'us-east-1', + 'http_handler' => $httpHandler + ]); + +// $data = str_repeat('.', 12 * 1048576); +// $source = Psr7\Utils::streamFor($data); + + $filename = tmpfile(); + $dest = stream_get_meta_data($filename)['uri']; + + $downloader = new MultipartDownloader( + $s3, + $dest, + [ + 'bucket' => 'test-bucket', + 'key' => 'test-key', + 'checksum_validation_enabled' => false + ] + ); + $downloader->download(); + } +} + diff --git a/tests/S3/MultipartUploaderTest.php b/tests/S3/MultipartUploaderTest.php index c5beefae92..d5a53a40be 100644 --- a/tests/S3/MultipartUploaderTest.php +++ b/tests/S3/MultipartUploaderTest.php @@ -316,4 +316,49 @@ public function testAppliesAmbiguousSuccessParsing() ); $uploader->upload(); } + + public function testFailedUploadPrintsPartialProgressBar() + { + $partialBar = [ "Transfer initiated...\n| | 0.0%\n", + "|== | 12.5%\n", + "|===== | 25.0%\n"]; + $this->expectOutputString("{$partialBar[0]}{$partialBar[1]}{$partialBar[2]}"); + + $this->expectExceptionMessage("An exception occurred while uploading parts to a multipart upload"); + $this->expectException(\Aws\S3\Exception\S3MultipartUploadException::class); + $counter = 0; + + $httpHandler = function ($request, array $options) use (&$counter) { + if ($counter < 4) { + $body = "baz"; + } else { + $body = "\n\n\n"; + } + $counter++; + + return Promise\Create::promiseFor( + new Psr7\Response(200, [], $body) + ); + }; + + $s3 = new S3Client([ + 'version' => 'latest', + 'region' => 'us-east-1', + 'http_handler' => $httpHandler + ]); + + $data = str_repeat('.', 50 * self::MB); + $source = Psr7\Utils::streamFor($data); + + $uploader = new MultipartUploader( + $s3, + $source, + [ + 'bucket' => 'test-bucket', + 'key' => 'test-key', + 
'track_upload' => true // boolean flag; the string 'true' only passed because any non-empty string is truthy + ] + ); + $uploader->upload(); + } }