Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 53 additions & 67 deletions src/Migration/MessageQueue/Handler/ResetChecksumHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Shopware\Core\Framework\Context;
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository;
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
use Shopware\Core\Framework\Log\Package;
use Shopware\Core\Framework\Uuid\Uuid;
use SwagMigrationAssistant\Migration\DataSelection\DefaultEntities;
Expand Down Expand Up @@ -48,92 +48,60 @@ public function __invoke(ResetChecksumMessage $message): void
{
$connectionId = $message->getConnectionId();
$totalMappings = $message->getTotalMappings();
$progress = null;

if ($totalMappings === null) {
$totalMappings = $this->getTotalMappingsCount($connectionId);

if ($message->getRunId() !== null && $totalMappings > 0) {
$progress = $this->updateProgress(
$message,
0,
$totalMappings,
$message->getContext()
);
}
}

$affectedRows = $this->resetChecksums($connectionId);
$newProcessedCount = $message->getProcessedMappings() + $affectedRows;

if ($affectedRows === 0) {
$this->handleCompletion($message, $progress);
$isCompleted = $affectedRows < self::BATCH_SIZE;

return;
}
if ($isCompleted) {
$this->handleCompletion($message);

$newProcessedCount = $message->getProcessedMappings() + $affectedRows;

if ($message->getRunId() !== null) {
$progress = $this->updateProgress(
$message,
$newProcessedCount,
$totalMappings,
$message->getContext()
);
if ($message->isPartOfAbort()) {
return;
}
}

if ($affectedRows < self::BATCH_SIZE) {
$this->handleCompletion($message, $progress);

return;
if ($message->getRunId() !== null && $totalMappings > 0) {
$this->updateProgress($message, $newProcessedCount, $totalMappings);
}

$this->messageBus->dispatch(new ResetChecksumMessage(
$message->getConnectionId(),
$message->getContext(),
$message->getRunId(),
$message->getEntity(),
$totalMappings,
$newProcessedCount,
$message->isPartOfAbort()
));
if (!$isCompleted) {
$this->messageBus->dispatch(new ResetChecksumMessage(
$message->getConnectionId(),
$message->getContext(),
$message->getRunId(),
$message->getEntity(),
$totalMappings,
$newProcessedCount,
$message->isPartOfAbort()
));
}
}

private function handleCompletion(ResetChecksumMessage $message, ?MigrationProgress $progress): void
private function handleCompletion(ResetChecksumMessage $message): void
{
$this->clearResettingChecksumsFlag();
$runId = $message->getRunId();

if (!$message->isPartOfAbort() || $message->getRunId() === null) {
if (!$message->isPartOfAbort() || $runId === null) {
return;
}

$runId = $message->getRunId();
$context = $message->getContext();

$this->runTransitionService->forceTransitionToRunStep(
$runId,
MigrationStep::CLEANUP
);

$finalProgress = new MigrationProgress(
0,
0,
$progress?->getDataSets() ?? new ProgressDataSetCollection(),
$message->getEntity() ?? DefaultEntities::RULE,
$progress?->getCurrentEntityProgress() ?? 0
);
$finalProgress->setIsAborted(true);

$this->migrationRunRepo->upsert([
[
'id' => $runId,
'progress' => $finalProgress->jsonSerialize(),
],
], $context);
$this->updateProgress($message, 0, 0, true);

$this->messageBus->dispatch(new MigrationProcessMessage(
$context,
$runId
$message->getContext(),
$runId,
));
}

Expand Down Expand Up @@ -168,22 +136,40 @@ private function getTotalMappingsCount(string $connectionId): int
->fetchOne();
}

private function updateProgress(ResetChecksumMessage $message, int $processed, int $total, Context $context): MigrationProgress
private function updateProgress(ResetChecksumMessage $message, int $processed, int $total, bool $isAborted = false): void
{
$progress = new MigrationProgress(
$runId = $message->getRunId();

if ($runId === null) {
return;
}

$run = $this->migrationRunRepo->search(
new Criteria([$runId]),
$message->getContext(),
)->getEntities()->first();

if ($run === null) {
return;
}

$progress = $run->getProgress();
$newProgress = new MigrationProgress(
$processed,
$total,
new ProgressDataSetCollection(),
$progress?->getDataSets() ?? new ProgressDataSetCollection(),
$message->getEntity() ?? DefaultEntities::RULE,
$processed
);

$this->migrationRunRepo->update([[
'id' => $message->getRunId(),
'progress' => $progress->jsonSerialize(),
]], $context);
if ($isAborted) {
$newProgress->setIsAborted(true);
}

return $progress;
$this->migrationRunRepo->update([[
'id' => $runId,
'progress' => $newProgress->jsonSerialize(),
]], $message->getContext());
}

private function clearResettingChecksumsFlag(): void
Expand Down
1 change: 1 addition & 0 deletions src/Migration/Run/RunService.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public function abortMigration(Context $context): void

$this->runTransitionService->transitionToRunStep($runId, MigrationStep::ABORTING);

$this->bus->dispatch(new MigrationProcessMessage($context, $runId));
$this->fireTrackingInformation(self::TRACKING_EVENT_MIGRATION_ABORTED, $runId, $context);
}

Expand Down
44 changes: 38 additions & 6 deletions tests/Migration/MessageQueue/Handler/ResetChecksumHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use PHPUnit\Framework\TestCase;
use Shopware\Core\Framework\Context;
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository;
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
use Shopware\Core\Framework\DataAbstractionLayer\Search\EntitySearchResult;
use Shopware\Core\Framework\Log\Package;
use Shopware\Core\Framework\Uuid\Uuid;
use SwagMigrationAssistant\Migration\DataSelection\DefaultEntities;
Expand All @@ -24,6 +26,8 @@
use SwagMigrationAssistant\Migration\Run\MigrationStep;
use SwagMigrationAssistant\Migration\Run\RunTransitionServiceInterface;
use SwagMigrationAssistant\Migration\Run\SwagMigrationRunCollection;
use SwagMigrationAssistant\Migration\Run\SwagMigrationRunDefinition;
use SwagMigrationAssistant\Migration\Run\SwagMigrationRunEntity;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

Expand Down Expand Up @@ -124,9 +128,10 @@ public function testInvokeWithSingleBatchWithRunId(): void

$this->mockTotalCount(1);
$this->mockResetChecksumsAndClearFlag(1);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::exactly(2))
->expects(static::once())
->method('update')
->with(static::callback(function ($data) use ($runId) {
return isset($data[0]['id'])
Expand Down Expand Up @@ -155,9 +160,10 @@ public function testInvokeWithFullBatchDispatchesContinuation(): void

$this->mockTotalCount(500);
$this->mockResetChecksumsOnly(ResetChecksumHandler::BATCH_SIZE);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::exactly(2))
->expects(static::once())
->method('update')
->with(static::callback(function ($data) use ($runId) {
return $data[0]['id'] === $runId && isset($data[0]['progress']);
Expand Down Expand Up @@ -215,6 +221,7 @@ public function testInvokePartOfAbortWithCompletion(): void
);

$this->mockResetChecksumsAndClearFlag(0);
$this->mockRunSearch($runId);

$this->runTransitionService
->expects(static::once())
Expand All @@ -223,7 +230,7 @@ public function testInvokePartOfAbortWithCompletion(): void

$this->migrationRunRepo
->expects(static::once())
->method('upsert')
->method('update')
->with(static::callback(function ($data) use ($runId) {
$progress = $data[0]['progress'];

Expand Down Expand Up @@ -260,9 +267,10 @@ public function testInvokePartOfAbortWithContinuation(): void

$this->mockTotalCount(500);
$this->mockResetChecksumsOnly(ResetChecksumHandler::BATCH_SIZE);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::exactly(2))
->expects(static::once())
->method('update');

$this->messageBus
Expand Down Expand Up @@ -293,9 +301,10 @@ public function testInvokeInitializesProgressOnFirstRun(): void

$this->mockTotalCount(100);
$this->mockResetChecksumsAndClearFlag(2);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::exactly(2))
->expects(static::once())
->method('update')
->with(static::callback(function ($data) use ($runId) {
return $data[0]['id'] === $runId && isset($data[0]['progress']);
Expand All @@ -321,6 +330,7 @@ public function testInvokeContinuationWithTotalMappingsAlreadySet(): void
);

$this->mockResetChecksumsAndClearFlag(2);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::once())
Expand Down Expand Up @@ -352,9 +362,10 @@ public function testInvokeWithSpecificEntity(): void

$this->mockTotalCount(5);
$this->mockResetChecksumsAndClearFlag(1);
$this->mockRunSearch($runId);

$this->migrationRunRepo
->expects(static::exactly(2))
->expects(static::once())
->method('update')
->with(static::callback(function ($data) use ($runId) {
return $data[0]['id'] === $runId && isset($data[0]['progress']);
Expand Down Expand Up @@ -435,4 +446,25 @@ private function mockResetChecksumsOnly(int $affectedRows): void
->with(static::stringContains('swag_migration_mapping'))
->willReturn($affectedRows);
}

private function mockRunSearch(string $runId): void
{
$run = new SwagMigrationRunEntity();
$run->setId($runId);

$collection = new SwagMigrationRunCollection([$run]);

$searchResult = new EntitySearchResult(
SwagMigrationRunDefinition::ENTITY_NAME,
1,
$collection,
null,
new Criteria(),
Context::createDefaultContext(),
);

$this->migrationRunRepo
->method('search')
->willReturn($searchResult);
}
}
Loading