Skip to content

Commit 84ee385

Browse files
authored
Merge pull request #932 from Project-MONAI/AI-358
Ai 358
2 parents 460e86a + 708ceb2 commit 84ee385

File tree

5 files changed

+56
-58
lines changed

5 files changed

+56
-58
lines changed

src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
183183
}
184184
else
185185
{
186-
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
186+
item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList();
187187
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
188188
await _artifactReceivedItemsCollection
189189
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ await _artifactsRepository
251251
return true;
252252
}
253253

254-
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
254+
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId)
255255
{
256256
var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList();
257257
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
@@ -263,22 +263,36 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message
263263

264264
var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList();
265265

266+
var addedNew = false;
266267
var validArtifacts = new Dictionary<string, string>();
267268
foreach (var artifact in messageArtifactsInStorage)
268269
{
269-
var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
270+
var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
270271
if (match is not null && validArtifacts.ContainsKey(match!.Name) is false)
271272
{
272273
validArtifacts.Add(match.Name, $"{artifact.Path}");
274+
273275
}
274276
}
275277

276278
var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId);
277279

278-
currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes
279280

280-
_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
281-
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
281+
foreach (var artifact in validArtifacts)
282+
{
283+
if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false)
284+
{
285+
// adding the actual paths here, the parent function does the saving of the changes
286+
currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value);
287+
addedNew = true;
288+
}
289+
}
290+
291+
if (currentTask is not null && addedNew)
292+
{
293+
_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
294+
await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask);
295+
}
282296
}
283297

284298
private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
@@ -511,9 +525,13 @@ public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, strin
511525
return false;
512526
}
513527

514-
if (string.Compare(task.TaskType, ValidationConstants.ExportTaskType, true) == 0)
528+
switch (task.TaskType)
515529
{
516-
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
530+
case TaskTypeConstants.DicomExportTask:
531+
case TaskTypeConstants.HL7ExportTask:
532+
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
533+
default:
534+
break;
517535
}
518536
}
519537

@@ -612,7 +630,12 @@ private async Task<bool> ExternalAppRequest(ExternalAppRequestEvent externalAppR
612630
return true;
613631
}
614632

615-
private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List<string>? plugins = null)
633+
private async Task HandleDicomExportAsync(
634+
WorkflowRevision workflow,
635+
WorkflowInstance workflowInstance,
636+
TaskExecution task,
637+
string correlationId,
638+
List<string>? plugins = null)
616639
{
617640
plugins ??= new List<string>();
618641
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
@@ -629,15 +652,20 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
629652
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
630653
}
631654

632-
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
655+
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(
656+
WorkflowRevision workflow,
657+
WorkflowInstance workflowInstance,
658+
TaskExecution task,
659+
string correlationId,
660+
bool enforceDcmOnly = true)
633661
{
634662
var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray();
635663
if (exportList is null || !exportList.Any())
636664
{
637665
exportList = null;
638666
}
639667

640-
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId);
668+
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly);
641669

642670
if (artifactValues.IsNullOrEmpty())
643671
{
@@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
646674
return (exportList, artifactValues);
647675
}
648676

649-
private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId)
677+
private async Task<string[]> GetArtifactValues(
678+
WorkflowRevision workflow, WorkflowInstance workflowInstance,
679+
TaskExecution task,
680+
string[]? exportList,
681+
string correlationId,
682+
bool enforceDcmOnly = true)
650683
{
651684
var artifactValues = GetDicomExports(workflow, task, exportList);
652685

@@ -660,7 +693,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
660693
artifact,
661694
true);
662695

663-
var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList();
696+
var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList();
664697

665698
if (dcmFiles?.IsNullOrEmpty() is false)
666699
{
@@ -681,7 +714,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
681714

682715
private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
683716
{
684-
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
717+
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false);
685718

686719
if (exportList is null || artifactValues is null)
687720
{

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
using Monai.Deploy.Messaging.Events;
18+
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
1819
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
1920
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO;
2021
#pragma warning disable CS8602 // Dereference of a possibly null reference.
@@ -209,7 +210,7 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName)
209210
ExecutionId = Guid.NewGuid().ToString(),
210211
TaskId = "7d7c8b83-6628-413c-9912-a89314e5e2d5",
211212
OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/",
212-
TaskType = "Export",
213+
TaskType = TaskTypeConstants.DicomExportTask,
213214
Status = TaskExecutionStatus.Dispatched
214215
}
215216
}

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
using Monai.Deploy.Messaging.Common;
18+
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
1819
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
1920
using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact;
2021
// ReSharper disable ArrangeObjectCreationWhenTypeEvident
@@ -2174,7 +2175,7 @@ public static class WorkflowRevisionsTestData
21742175
new TaskObject
21752176
{
21762177
Id = "export_task_1",
2177-
Type = "Export",
2178+
Type = TaskTypeConstants.DicomExportTask,
21782179
Description = "Export Workflow 1 Task 2",
21792180
ExportDestinations = new ExportDestination[]
21802181
{
@@ -2235,7 +2236,7 @@ public static class WorkflowRevisionsTestData
22352236
new TaskObject
22362237
{
22372238
Id = "export_task_1",
2238-
Type = "Export",
2239+
Type = TaskTypeConstants.DicomExportTask,
22392240
Description = "Export Workflow 1 Task 2",
22402241
ExportDestinations = new ExportDestination[]
22412242
{
@@ -2296,7 +2297,7 @@ public static class WorkflowRevisionsTestData
22962297
new TaskObject
22972298
{
22982299
Id = "export_task_1",
2299-
Type = "Export",
2300+
Type = TaskTypeConstants.DicomExportTask,
23002301
Description = "Export Workflow 1 Task 2",
23012302
ExportDestinations = new ExportDestination[]
23022303
{
@@ -2358,7 +2359,7 @@ public static class WorkflowRevisionsTestData
23582359
new TaskObject
23592360
{
23602361
Id = "export_task_1",
2361-
Type = "Export",
2362+
Type = TaskTypeConstants.DicomExportTask,
23622363
Description = "Export Workflow 1 Task 2",
23632364
ExportDestinations = new ExportDestination[]
23642365
{
@@ -2375,7 +2376,7 @@ public static class WorkflowRevisionsTestData
23752376
new TaskObject
23762377
{
23772378
Id = "export_task_2",
2378-
Type = "Export",
2379+
Type = TaskTypeConstants.DicomExportTask,
23792380
Description = "Export Workflow 1 Task 3",
23802381
ExportDestinations = new ExportDestination[]
23812382
{
@@ -2437,7 +2438,7 @@ public static class WorkflowRevisionsTestData
24372438
new TaskObject
24382439
{
24392440
Id = "export_task_1",
2440-
Type = "Export",
2441+
Type = TaskTypeConstants.DicomExportTask,
24412442
Description = "Export Workflow 1 Task 2",
24422443
ExportDestinations = new ExportDestination[]
24432444
{

tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()
31843184

31853185
Assert.True(result);
31863186
}
3187-
[Fact]
3188-
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
3189-
{
3190-
var artifactPath = "some path here";
3191-
//incoming artifacts
3192-
var message = new ArtifactsReceivedEvent
3193-
{
3194-
WorkflowInstanceId = "123", TaskId = "456",
3195-
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } }
3196-
};
3197-
var workflowInstance = new WorkflowInstance
3198-
{
3199-
WorkflowId = "789", Tasks = new List<TaskExecution>()
3200-
{ new TaskExecution() { TaskId = "456" } }
3201-
};
3202-
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
3203-
.ReturnsAsync(workflowInstance);
3204-
//expected artifacts
3205-
var templateArtifacts = new OutputArtifact[]
3206-
{
3207-
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
3208-
};
3209-
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
3210-
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
3211-
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
3212-
.ReturnsAsync(workflowTemplate);
3213-
3214-
_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
3215-
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });
32163187

3217-
//previously received artifacts
3218-
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
3219-
.ReturnsAsync((List<ArtifactReceivedItems>?)null);
3220-
3221-
var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);
3222-
3223-
_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
3224-
}
32253188
[Fact]
32263189
public async Task ProcessPayload_WithExportTask_NoExportsFails()
32273190
{

0 commit comments

Comments
 (0)