diff --git a/src/Migration/MessageQueue/Handler/ResetChecksumHandler.php b/src/Migration/MessageQueue/Handler/ResetChecksumHandler.php index 76618154f..483af8053 100644 --- a/src/Migration/MessageQueue/Handler/ResetChecksumHandler.php +++ b/src/Migration/MessageQueue/Handler/ResetChecksumHandler.php @@ -9,8 +9,8 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\ParameterType; -use Shopware\Core\Framework\Context; use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository; +use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria; use Shopware\Core\Framework\Log\Package; use Shopware\Core\Framework\Uuid\Uuid; use SwagMigrationAssistant\Migration\DataSelection\DefaultEntities; @@ -48,92 +48,60 @@ public function __invoke(ResetChecksumMessage $message): void { $connectionId = $message->getConnectionId(); $totalMappings = $message->getTotalMappings(); - $progress = null; if ($totalMappings === null) { $totalMappings = $this->getTotalMappingsCount($connectionId); - - if ($message->getRunId() !== null && $totalMappings > 0) { - $progress = $this->updateProgress( - $message, - 0, - $totalMappings, - $message->getContext() - ); - } } $affectedRows = $this->resetChecksums($connectionId); + $newProcessedCount = $message->getProcessedMappings() + $affectedRows; - if ($affectedRows === 0) { - $this->handleCompletion($message, $progress); + $isCompleted = $affectedRows < self::BATCH_SIZE; - return; - } + if ($isCompleted) { + $this->handleCompletion($message); - $newProcessedCount = $message->getProcessedMappings() + $affectedRows; - - if ($message->getRunId() !== null) { - $progress = $this->updateProgress( - $message, - $newProcessedCount, - $totalMappings, - $message->getContext() - ); + if ($message->isPartOfAbort()) { + return; + } } - if ($affectedRows < self::BATCH_SIZE) { - $this->handleCompletion($message, $progress); - - return; + if ($message->getRunId() !== null && $totalMappings > 0) { + $this->updateProgress($message, $newProcessedCount, $totalMappings); } - $this->messageBus->dispatch(new ResetChecksumMessage( - $message->getConnectionId(), - $message->getContext(), - $message->getRunId(), - $message->getEntity(), - $totalMappings, - $newProcessedCount, - $message->isPartOfAbort() - )); + if (!$isCompleted) { + $this->messageBus->dispatch(new ResetChecksumMessage( + $message->getConnectionId(), + $message->getContext(), + $message->getRunId(), + $message->getEntity(), + $totalMappings, + $newProcessedCount, + $message->isPartOfAbort() + )); + } } - private function handleCompletion(ResetChecksumMessage $message, ?MigrationProgress $progress): void + private function handleCompletion(ResetChecksumMessage $message): void { $this->clearResettingChecksumsFlag(); + $runId = $message->getRunId(); - if (!$message->isPartOfAbort() || $message->getRunId() === null) { + if (!$message->isPartOfAbort() || $runId === null) { return; } - $runId = $message->getRunId(); - $context = $message->getContext(); - $this->runTransitionService->forceTransitionToRunStep( $runId, MigrationStep::CLEANUP ); - $finalProgress = new MigrationProgress( - 0, - 0, - $progress?->getDataSets() ?? new ProgressDataSetCollection(), - $message->getEntity() ?? DefaultEntities::RULE, - $progress?->getCurrentEntityProgress() ?? 0 - ); - $finalProgress->setIsAborted(true); - - $this->migrationRunRepo->upsert([ - [ - 'id' => $runId, - 'progress' => $finalProgress->jsonSerialize(), - ], - ], $context); + $this->updateProgress($message, 0, 0, true); $this->messageBus->dispatch(new MigrationProcessMessage( - $context, - $runId + $message->getContext(), + $runId, )); } @@ -168,22 +136,40 @@ private function getTotalMappingsCount(string $connectionId): int ->fetchOne(); } - private function updateProgress(ResetChecksumMessage $message, int $processed, int $total, Context $context): MigrationProgress + private function updateProgress(ResetChecksumMessage $message, int $processed, int $total, bool $isAborted = false): void { - $progress = new MigrationProgress( + $runId = $message->getRunId(); + + if ($runId === null) { + return; + } + + $run = $this->migrationRunRepo->search( + new Criteria([$runId]), + $message->getContext(), + )->getEntities()->first(); + + if ($run === null) { + return; + } + + $progress = $run->getProgress(); + $newProgress = new MigrationProgress( $processed, $total, - new ProgressDataSetCollection(), + $progress?->getDataSets() ?? new ProgressDataSetCollection(), $message->getEntity() ?? DefaultEntities::RULE, $processed ); - $this->migrationRunRepo->update([[ - 'id' => $message->getRunId(), - 'progress' => $progress->jsonSerialize(), - ]], $context); + if ($isAborted) { + $newProgress->setIsAborted(true); + } - return $progress; + $this->migrationRunRepo->update([[ + 'id' => $runId, + 'progress' => $newProgress->jsonSerialize(), + ]], $message->getContext()); } private function clearResettingChecksumsFlag(): void diff --git a/src/Migration/Run/RunService.php b/src/Migration/Run/RunService.php index d37a16610..54ca805a4 100644 --- a/src/Migration/Run/RunService.php +++ b/src/Migration/Run/RunService.php @@ -174,6 +174,7 @@ public function abortMigration(Context $context): void $this->runTransitionService->transitionToRunStep($runId, MigrationStep::ABORTING); + $this->bus->dispatch(new MigrationProcessMessage($context, $runId)); $this->fireTrackingInformation(self::TRACKING_EVENT_MIGRATION_ABORTED, $runId, $context); } diff --git a/tests/Migration/MessageQueue/Handler/ResetChecksumHandlerTest.php b/tests/Migration/MessageQueue/Handler/ResetChecksumHandlerTest.php index ead49d2f8..0ed2b958d 100644 --- a/tests/Migration/MessageQueue/Handler/ResetChecksumHandlerTest.php +++ b/tests/Migration/MessageQueue/Handler/ResetChecksumHandlerTest.php @@ -15,6 +15,8 @@ use PHPUnit\Framework\TestCase; use Shopware\Core\Framework\Context; use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository; +use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria; +use Shopware\Core\Framework\DataAbstractionLayer\Search\EntitySearchResult; use Shopware\Core\Framework\Log\Package; use Shopware\Core\Framework\Uuid\Uuid; use SwagMigrationAssistant\Migration\DataSelection\DefaultEntities; @@ -24,6 +26,8 @@ use SwagMigrationAssistant\Migration\Run\MigrationStep; use SwagMigrationAssistant\Migration\Run\RunTransitionServiceInterface; use SwagMigrationAssistant\Migration\Run\SwagMigrationRunCollection; +use SwagMigrationAssistant\Migration\Run\SwagMigrationRunDefinition; +use SwagMigrationAssistant\Migration\Run\SwagMigrationRunEntity; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; @@ -124,9 +128,10 @@ public function testInvokeWithSingleBatchWithRunId(): void $this->mockTotalCount(1); $this->mockResetChecksumsAndClearFlag(1); + $this->mockRunSearch($runId); $this->migrationRunRepo - ->expects(static::exactly(2)) + ->expects(static::once()) ->method('update') ->with(static::callback(function ($data) use ($runId) { return isset($data[0]['id']) @@ -155,9 +160,10 @@ public function testInvokeWithFullBatchDispatchesContinuation(): void $this->mockTotalCount(500); $this->mockResetChecksumsOnly(ResetChecksumHandler::BATCH_SIZE); + $this->mockRunSearch($runId); $this->migrationRunRepo - ->expects(static::exactly(2)) + ->expects(static::once()) ->method('update') ->with(static::callback(function ($data) use ($runId) { return $data[0]['id'] === $runId && isset($data[0]['progress']); @@ -215,6 +221,7 @@ public function testInvokePartOfAbortWithCompletion(): void ); $this->mockResetChecksumsAndClearFlag(0); + $this->mockRunSearch($runId); $this->runTransitionService ->expects(static::once()) @@ -223,7 +230,7 @@ public function testInvokePartOfAbortWithCompletion(): void $this->migrationRunRepo ->expects(static::once()) - ->method('upsert') + ->method('update') ->with(static::callback(function ($data) use ($runId) { $progress = $data[0]['progress']; @@ -260,9 +267,10 @@ public function testInvokePartOfAbortWithContinuation(): void $this->mockTotalCount(500); $this->mockResetChecksumsOnly(ResetChecksumHandler::BATCH_SIZE); + $this->mockRunSearch($runId); $this->migrationRunRepo - ->expects(static::exactly(2)) + ->expects(static::once()) ->method('update'); $this->messageBus @@ -293,9 +301,10 @@ public function testInvokeInitializesProgressOnFirstRun(): void $this->mockTotalCount(100); $this->mockResetChecksumsAndClearFlag(2); + $this->mockRunSearch($runId); $this->migrationRunRepo - ->expects(static::exactly(2)) + ->expects(static::once()) ->method('update') ->with(static::callback(function ($data) use ($runId) { return $data[0]['id'] === $runId && isset($data[0]['progress']); @@ -321,6 +330,7 @@ public function testInvokeContinuationWithTotalMappingsAlreadySet(): void ); $this->mockResetChecksumsAndClearFlag(2); + $this->mockRunSearch($runId); $this->migrationRunRepo ->expects(static::once()) @@ -352,9 +362,10 @@ public function testInvokeWithSpecificEntity(): void $this->mockTotalCount(5); $this->mockResetChecksumsAndClearFlag(1); + $this->mockRunSearch($runId); $this->migrationRunRepo - ->expects(static::exactly(2)) + ->expects(static::once()) ->method('update') ->with(static::callback(function ($data) use ($runId) { return $data[0]['id'] === $runId && isset($data[0]['progress']); @@ -435,4 +446,25 @@ private function mockResetChecksumsOnly(int $affectedRows): void ->with(static::stringContains('swag_migration_mapping')) ->willReturn($affectedRows); } + + private function mockRunSearch(string $runId): void + { + $run = new SwagMigrationRunEntity(); + $run->setId($runId); + + $collection = new SwagMigrationRunCollection([$run]); + + $searchResult = new EntitySearchResult( + SwagMigrationRunDefinition::ENTITY_NAME, + 1, + $collection, + null, + new Criteria(), + Context::createDefaultContext(), + ); + + $this->migrationRunRepo + ->method('search') + ->willReturn($searchResult); + } }