Skip to content

Commit 7dee4e4

Browse files
authored
Merge pull request #901 from Project-MONAI/AI-230
added code to register outputs from ArtifactRecieved
2 parents bf68b15 + 2323464 commit 7dee4e4

File tree

4 files changed

+65
-0
lines changed

4 files changed

+65
-0
lines changed

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ public async Task<bool> ProcessArtifactReceivedAsync(ArtifactsReceivedEvent mess
210210
return false;
211211
}
212212

213+
await ProcessArtifactReceivedOutputs(message, workflowInstance, taskTemplate, taskId);
214+
213215
var previouslyReceivedArtifactsFromRepo = await _artifactsRepository.GetAllAsync(workflowInstanceId, taskId).ConfigureAwait(false);
214216
if (previouslyReceivedArtifactsFromRepo is null || previouslyReceivedArtifactsFromRepo.Count == 0)
215217
{
@@ -246,6 +248,21 @@ await _artifactsRepository
246248
return true;
247249
}
248250

251+
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
252+
{
253+
254+
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary<string, bool>();
255+
if (artifactsInStorage.Any())
256+
{
257+
var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList();
258+
259+
var validArtifacts = new Dictionary<string, string>();
260+
messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path));
261+
262+
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
263+
}
264+
}
265+
249266
private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
250267
string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate)
251268
{

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support;
2222
using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.Support;
2323
using MongoDB.Driver;
24+
using NUnit.Framework;
2425
using Polly;
2526
using Polly.Retry;
2627
using TechTalk.SpecFlow.Infrastructure;
@@ -77,6 +78,7 @@ public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clini
7778

7879
_outputHelper.WriteLine("Seeding minio with workflow input artifacts");
7980
await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId);
81+
await MinioDataSeeding.SeedArtifactRecieviedArtifact(workflowInstance.PayloadId);
8082

8183
_outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}");
8284
await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance);
@@ -120,7 +122,9 @@ public void ThenICanSeeXArtifactReceivedItemIsCreated(int count)
120122
{
121123
throw new Exception("Failing Test");
122124
}
125+
var wfitest = MongoClient.GetWorkflowInstanceById(artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId);
123126
Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi);
127+
Assert.AreEqual(wfitest.Tasks[1].OutputArtifacts.First().Value, "path"); // this was passed in the message
124128
}
125129
}
126130
});

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ public async Task SeedWorkflowInputArtifacts(string payloadId, string? folderNam
6464
OutputHelper.WriteLine($"Objects seeded");
6565
}
6666

67+
public async Task SeedArtifactRecieviedArtifact(string payloadId)
68+
{
69+
var localPath = Path.Combine(GetDirectory() ?? "", "DICOMs", "full_patient_metadata", "dcm");
70+
71+
await MinioClient.AddFileToStorage(localPath, $"path");
72+
}
73+
6774
public async Task SeedTaskOutputArtifacts(string payloadId, string workflowInstanceId, string executionId, string? folderName = null)
6875
{
6976
string localPath;

tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3161,6 +3161,43 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()
31613161

31623162
Assert.True(result);
31633163
}
3164+
[Fact]
3165+
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
3166+
{
3167+
var artifactPath = "some path here";
3168+
//incoming artifacts
3169+
var message = new ArtifactsReceivedEvent
3170+
{
3171+
WorkflowInstanceId = "123", TaskId = "456",
3172+
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } }
3173+
};
3174+
var workflowInstance = new WorkflowInstance
3175+
{
3176+
WorkflowId = "789", Tasks = new List<TaskExecution>()
3177+
{ new TaskExecution() { TaskId = "not456" } }
3178+
};
3179+
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
3180+
.ReturnsAsync(workflowInstance);
3181+
//expected artifacts
3182+
var templateArtifacts = new OutputArtifact[]
3183+
{
3184+
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
3185+
};
3186+
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
3187+
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
3188+
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
3189+
.ReturnsAsync(workflowTemplate);
3190+
3191+
_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>())).ReturnsAsync(new Dictionary<string, bool> { { artifactPath, true } });
3192+
3193+
//previously received artifacts
3194+
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
3195+
.ReturnsAsync((List<ArtifactReceivedItems>?)null);
3196+
3197+
var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);
3198+
3199+
_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
3200+
}
31643201
}
31653202
#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type.
31663203
}

0 commit comments

Comments
 (0)