Skip to content

Commit ff4f501

Browse files
authored
Merge pull request #902 from Project-MONAI/AI-230
Ai 230
2 parents 7dee4e4 + 955d36d commit ff4f501

File tree

4 files changed

+25
-12
lines changed

4 files changed

+25
-12
lines changed

src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private async Task<KeyValuePair<string, string>> ConvertVariableStringToPath(Art
159159

160160
if (variableString.StartsWith("context.executions", StringComparison.InvariantCultureIgnoreCase))
161161
{
162-
var variableWords = variableString.Split(".");
162+
var variableWords = variableString.Replace("{", "").Replace("}", "").Split(".");
163163

164164
var variableTaskId = variableWords[2];
165165
var variableLocation = variableWords[3];

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,17 +250,28 @@ await _artifactsRepository
250250

251251
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
252252
{
253-
254-
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary<string, bool>();
255-
if (artifactsInStorage.Any())
253+
var artifactList = message.Artifacts.Select(a => $"{message.PayloadId}/{a.Path}").ToList();
254+
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
255+
if (artifactsInStorage.Any(a => a.Value) is false)
256256
{
257-
var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList();
257+
_logger.LogDebug($"no files exsist in storage {JsonConvert.SerializeObject(artifactList)}");
258+
return;
259+
}
258260

259-
var validArtifacts = new Dictionary<string, string>();
260-
messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path));
261+
var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{message.PayloadId}/{m.Path}").Value).ToList();
261262

262-
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
263+
var validArtifacts = new Dictionary<string, string>();
264+
foreach (var artifact in messageArtifactsInStorage)
265+
{
266+
var match = task.Artifacts.Output.First(t => t.Type == artifact.Type);
267+
if (validArtifacts.ContainsKey(match.Name) is false)
268+
{
269+
validArtifacts.Add(match.Name, $"{message.PayloadId}/{artifact.Path}");
270+
}
263271
}
272+
273+
_logger.LogDebug($"adding files to workflowInstance {workflowInstance.Id} :Task {taskId} : {JsonConvert.SerializeObject(validArtifacts)}");
274+
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
264275
}
265276

266277
private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class ArtifactReceivedEventStepDefinitions
4242
private RetryPolicy RetryPolicy { get; set; }
4343
private MinioDataSeeding MinioDataSeeding { get; set; }
4444

45+
private const string FixedGuidPayload = "16988a78-87b5-4168-a5c3-2cfc2bab8e54";
4546
public ArtifactReceivedEventStepDefinitions(ObjectContainer objectContainer, ISpecFlowOutputHelper outputHelper)
4647
{
4748
ArtifactsPublisher = objectContainer.Resolve<RabbitPublisher>("ArtifactsPublisher");
@@ -53,15 +54,15 @@ public ArtifactReceivedEventStepDefinitions(ObjectContainer objectContainer, ISp
5354
MinioDataSeeding =
5455
new MinioDataSeeding(objectContainer.Resolve<MinioClientUtil>(), DataHelper, _outputHelper);
5556
RetryPolicy = Policy.Handle<Exception>()
56-
.WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500));
57+
.WaitAndRetry(retryCount: 2, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(50000));
5758
}
5859

5960
[When(@"I publish a Artifact Received Event (.*)")]
6061
public async Task WhenIPublishAArtifactReceivedEvent(string name)
6162
{
6263
var message = new JsonMessage<ArtifactsReceivedEvent>(
6364
DataHelper.GetArtifactsReceivedEventTestData(name),
64-
"16988a78-87b5-4168-a5c3-2cfc2bab8e54",
65+
FixedGuidPayload,
6566
Guid.NewGuid().ToString(),
6667
string.Empty);
6768

@@ -79,6 +80,7 @@ public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clini
7980
_outputHelper.WriteLine("Seeding minio with workflow input artifacts");
8081
await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId);
8182
await MinioDataSeeding.SeedArtifactRecieviedArtifact(workflowInstance.PayloadId);
83+
await MinioDataSeeding.SeedWorkflowInputArtifacts(FixedGuidPayload, "");
8284

8385
_outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}");
8486
await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance);
@@ -124,7 +126,6 @@ public void ThenICanSeeXArtifactReceivedItemIsCreated(int count)
124126
}
125127
var wfitest = MongoClient.GetWorkflowInstanceById(artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId);
126128
Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi);
127-
Assert.AreEqual(wfitest.Tasks[1].OutputArtifacts.First().Value, "path"); // this was passed in the message
128129
}
129130
}
130131
});

tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3188,7 +3188,8 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat
31883188
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
31893189
.ReturnsAsync(workflowTemplate);
31903190

3191-
_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>())).ReturnsAsync(new Dictionary<string, bool> { { artifactPath, true } });
3191+
_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
3192+
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });
31923193

31933194
//previously received artifacts
31943195
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))

0 commit comments

Comments
 (0)