Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M002_WorkflowRevision_addVerion : DocumentMigration<WorkflowRevision>
{
public M002_WorkflowRevision_addVerion() : base("1.0.0") { }

public override void Up(BsonDocument document)
{
// empty, but this will make all objects re-saved with a version
}
public override void Down(BsonDocument document)
{
try
{
document.Remove("Version");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_WorkflowRevision_AddDataRetension : DocumentMigration<WorkflowRevision>
public class M003_WorkflowRevision_addDataRetension : DocumentMigration<WorkflowRevision>
{
public M004_WorkflowRevision_AddDataRetension() : base("1.0.1") { }
public M003_WorkflowRevision_addDataRetension() : base("1.0.1") { }

public override void Up(BsonDocument document)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;


namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_WorkflowRevision_addConditions : DocumentMigration<WorkflowRevision>
{
public M004_WorkflowRevision_addConditions() : base("1.0.2") { }

public override void Up(BsonDocument document)
{
var workflow = document["Workflow"].AsBsonDocument;
workflow.Add("Conditions", new BsonArray { });
}

public override void Down(BsonDocument document)
{
try
{
var workflow = document["Workflow"].AsBsonDocument;
workflow.Remove("Conditions");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
3 changes: 3 additions & 0 deletions src/WorkflowManager/Contracts/Models/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ public class Workflow
[JsonProperty(PropertyName = "dataRetentionDays")]
public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete

[JsonProperty(PropertyName = "conditions")]
public string[] Conditions { get; set; } = [];

}
}
2 changes: 1 addition & 1 deletion src/WorkflowManager/Contracts/Models/WorkflowRevision.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
{
[CollectionLocation("Workflows"), RuntimeVersion("1.0.1")]
[CollectionLocation("Workflows"), RuntimeVersion("1.0.2")]
public class WorkflowRevision : ISoftDeleteable, IDocument
{
[BsonId]
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,8 @@ public static partial class Log

[LoggerMessage(EventId = 210019, Level = LogLevel.Error, Message = "Task is missing required input artifacts {taskId} Artifacts {ArtifactsJson}")]
public static partial void TaskIsMissingRequiredInputArtifacts(this ILogger logger, string taskId, string ArtifactsJson);

[LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "no workflow to execute for the given workflow request.")]
public static partial void DidntToCreateWorkflowInstances(this ILogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ public async Task<bool> ProcessPayload(WorkflowRequestEvent message, Payload pay

var tasks = workflows.Select(workflow => CreateWorkflowInstanceAsync(message, workflow));
var newInstances = await Task.WhenAll(tasks).ConfigureAwait(false);

if (newInstances is null || newInstances.Length == 0 || newInstances[0] is null) // if null then it because it didnt meet the conditions needed to create a workflow instance
{
_logger.DidntToCreateWorkflowInstances();
return false;
}

workflowInstances.AddRange(newInstances);

var existingInstances = await _workflowInstanceRepository.GetByWorkflowsIdsAsync(workflowInstances.Select(w => w.WorkflowId).ToList());
Expand Down Expand Up @@ -1103,29 +1110,34 @@ private async Task<bool> ClinicalReviewTimeOutEvent(WorkflowInstance workflowIns
return true;
}

private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
private async Task<WorkflowInstance?> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
{
ArgumentNullException.ThrowIfNull(message, nameof(message));
ArgumentNullException.ThrowIfNull(workflow, nameof(workflow));
ArgumentNullException.ThrowIfNull(workflow.Workflow, nameof(workflow.Workflow));

var workflowInstanceId = Guid.NewGuid().ToString();
var workflowInstance = MakeInstance(message, workflow);

var workflowInstance = new WorkflowInstance()
// check if the conditionals allow the workflow to be created

if (workflow.Workflow.Conditions.Length != 0)
{
Id = workflowInstanceId,
WorkflowId = workflow.WorkflowId,
WorkflowName = workflow.Workflow.Name,
PayloadId = message.PayloadId.ToString(),
StartTime = DateTime.UtcNow,
Status = Status.Created,
AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle,
BucketId = message.Bucket,
InputMetaData = { } //Functionality to be added later
};
var conditionalMet = _conditionalParameterParser.TryParse(workflow.Workflow.Conditions, workflowInstance, out var resolvedConditional);
if (conditionalMet is false)
{
return null;
}
}

await CreateTaskExecutionForFirstTask(message, workflow, workflowInstance);

return workflowInstance;
}

private async Task CreateTaskExecutionForFirstTask(WorkflowRequestEvent message, WorkflowRevision workflow, WorkflowInstance workflowInstance)
{
var tasks = new List<TaskExecution>();
// part of this ticket just take the first task

if (workflow?.Workflow?.Tasks.Length > 0)
{
var firstTask = workflow.Workflow.Tasks.First();
Expand All @@ -1141,7 +1153,24 @@ private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequest
}

workflowInstance.Tasks = tasks;
}

private static WorkflowInstance MakeInstance(WorkflowRequestEvent message, WorkflowRevision workflow)
{
var workflowInstanceId = Guid.NewGuid().ToString();

var workflowInstance = new WorkflowInstance()
{
Id = workflowInstanceId,
WorkflowId = workflow.WorkflowId,
WorkflowName = workflow.Workflow.Name,
PayloadId = message.PayloadId.ToString(),
StartTime = DateTime.UtcNow,
Status = Status.Created,
AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle,
BucketId = message.Bucket,
InputMetaData = { } //Functionality to be added later
};
return workflowInstance;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class WorkflowExecuterServiceTests
private readonly IOptions<WorkflowManagerOptions> _configuration;
private readonly IOptions<StorageServiceConfiguration> _storageConfiguration;
private readonly Mock<ITaskExecutionStatsRepository> _taskExecutionStatsRepository;
private readonly Mock<IDicomService> _dicom = new Mock<IDicomService>();
private readonly int _timeoutForTypeTask = 999;
private readonly int _timeoutForDefault = 966;

Expand Down Expand Up @@ -98,11 +99,10 @@ public WorkflowExecuterServiceTests()

_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });

var dicom = new Mock<IDicomService>();
var logger = new Mock<ILogger<ConditionalParameterParser>>();

var conditionalParser = new ConditionalParameterParser(logger.Object,
dicom.Object,
_dicom.Object,
_workflowInstanceService.Object,
_payloadService.Object,
_workflowService.Object);
Expand Down Expand Up @@ -3868,7 +3868,115 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs()

#pragma warning restore CS8604 // Possible null reference argument.
}
}

[Fact]
public async Task ProcessPayload_With_Failing_Workflow_Conditional_Should_Not_Procced()
{
var workflowRequest = new WorkflowRequestEvent
{
Bucket = "testbucket",
DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" },
CorrelationId = Guid.NewGuid().ToString(),
Timestamp = DateTime.UtcNow
};

var workflows = new List<WorkflowRevision>
{
new() {
Id = Guid.NewGuid().ToString(),
WorkflowId = Guid.NewGuid().ToString(),
Revision = 1,
Workflow = new Workflow
{
Name = "Workflowname",
Description = "Workflowdesc",
Version = "1",
InformaticsGateway = new InformaticsGateway
{
AeTitle = "aetitle"
},
Tasks =
[
new TaskObject {
Id = Guid.NewGuid().ToString(),
Type = "type",
Description = "taskdesc"
}
],
Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"]
}
}
};

_workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny<List<string>>())).ReturnsAsync(workflows);
_workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny<string>(), It.IsAny<string>())).ReturnsAsync(workflows);
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]);
_workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny<List<WorkflowInstance>>())).ReturnsAsync(true);
_workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(new List<WorkflowInstance>());
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);

var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() });

_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny<Message>()), Times.Never());

Assert.False(result);
}

[Fact]
public async Task ProcessPayload_With_Passing_Workflow_Conditional_Should_Procced()
{
var workflowRequest = new WorkflowRequestEvent
{
Bucket = "testbucket",
DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" },
CorrelationId = Guid.NewGuid().ToString(),
Timestamp = DateTime.UtcNow
};

var workflows = new List<WorkflowRevision>
{
new() {
Id = Guid.NewGuid().ToString(),
WorkflowId = Guid.NewGuid().ToString(),
Revision = 1,
Workflow = new Workflow
{
Name = "Workflowname",
Description = "Workflowdesc",
Version = "1",
InformaticsGateway = new InformaticsGateway
{
AeTitle = "aetitle"
},
Tasks =
[
new TaskObject {
Id = Guid.NewGuid().ToString(),
Type = "type",
Description = "taskdesc"
}
],
Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"]
}
}
};

_dicom.Setup(w => w.GetAnyValueAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(() => "lordge");

_workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny<List<string>>())).ReturnsAsync(workflows);
_workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny<string>(), It.IsAny<string>())).ReturnsAsync(workflows);
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]);
_workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny<List<WorkflowInstance>>())).ReturnsAsync(true);
_workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(new List<WorkflowInstance>());
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);

var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() });

_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny<Message>()), Times.Once());

Assert.True(result);
}
}
#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type.
}