Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ public Batch()
AddRecords = new ConcurrentQueue<BasicParticipantCsvRecord>();
UpdateRecords = new ConcurrentQueue<BasicParticipantCsvRecord>();
DeleteRecords = new ConcurrentQueue<BasicParticipantCsvRecord>();
DemographicData = new ConcurrentQueue<ParticipantDemographic>();
}

public ConcurrentQueue<BasicParticipantCsvRecord> AddRecords { get; set; }
public ConcurrentQueue<BasicParticipantCsvRecord> UpdateRecords { get; set; }
public ConcurrentQueue<BasicParticipantCsvRecord> DeleteRecords { get; set; }
public ConcurrentQueue<ParticipantDemographic> DemographicData { get; set; }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ public class ProcessCaasFile : IProcessCaasFile
{
private readonly ILogger<ProcessCaasFile> _logger;
private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
private readonly ICallDurableDemographicFunc _callDurableDemographicFunc;
private readonly ICreateBasicParticipantData _createBasicParticipantData;
private readonly IAddBatchToQueue _addBatchToQueue;
private readonly IExceptionHandler _exceptionHandler;
private readonly IDataServiceClient<ParticipantDemographic> _participantDemographic;
private readonly IRecordsProcessedTracker _recordsProcessTracker;
private readonly IValidateDates _validateDates;
private readonly ReceiveCaasFileConfig _config;
private readonly string DemographicURI;


public ProcessCaasFile(
Expand All @@ -34,7 +32,6 @@ public ProcessCaasFile(
IDataServiceClient<ParticipantDemographic> participantDemographic,
IRecordsProcessedTracker recordsProcessedTracker,
IValidateDates validateDates,
ICallDurableDemographicFunc callDurableDemographicFunc,
IOptions<ReceiveCaasFileConfig> receiveCaasFileConfig
)
{
Expand All @@ -46,9 +43,7 @@ IOptions<ReceiveCaasFileConfig> receiveCaasFileConfig
_participantDemographic = participantDemographic;
_recordsProcessTracker = recordsProcessedTracker;
_validateDates = validateDates;
_callDurableDemographicFunc = callDurableDemographicFunc;
_config = receiveCaasFileConfig.Value;
DemographicURI = _config.DemographicURI;
}

/// <summary>
Expand Down Expand Up @@ -93,10 +88,7 @@ await Parallel.ForEachAsync(values, options, async (rec, cancellationToken) =>
await AddRecordToBatch(participant, currentBatch, name);
});

if (await _callDurableDemographicFunc.PostDemographicDataAsync(currentBatch.DemographicData.ToList(), DemographicURI, name))
{
await AddBatchToQueue(currentBatch, name);
}
await AddBatchToQueue(currentBatch, name);
}

/// <summary>
Expand All @@ -114,40 +106,26 @@ private async Task AddRecordToBatch(Participant participant, Batch currentBatch,
FileName = fileName,
Participant = participant
};
// take note: we don't need to add DemographicData to the queue for update because we loop through all updates in the UpdateParticipant method

// Upsert demographic record immediately (no batching)
await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName);

// Add to Service Bus queues based on record type
switch (participant.RecordType?.Trim())
{

case Actions.New:

currentBatch.AddRecords.Enqueue(basicParticipantCsvRecord);
if (await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName))
{
break;
}
currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic());
break;
case Actions.Amended:
if (!await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName))
{
currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic());
currentBatch.UpdateRecords.Enqueue(basicParticipantCsvRecord);
break;
}
currentBatch.UpdateRecords.Enqueue(basicParticipantCsvRecord);
break;
case Actions.Removed:
if (!await UpdateOldDemographicRecord(basicParticipantCsvRecord, fileName))
{
currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic());
}
currentBatch.DeleteRecords.Enqueue(basicParticipantCsvRecord);
break;
default:
await _exceptionHandler.CreateSchemaValidationException(basicParticipantCsvRecord, "RecordType was not set to an expected value");
break;
}

}

private async Task AddBatchToQueue(Batch currentBatch, string name)
Expand Down Expand Up @@ -175,34 +153,22 @@ private async Task<bool> UpdateOldDemographicRecord(BasicParticipantCsvRecord ba
throw new FormatException("Unable to parse NHS Number");
}

var participant = await _participantDemographic.GetSingleByFilter(x => x.NhsNumber == nhsNumber);

if (participant == null)
{
_logger.LogWarning("The participant could not be found, when trying to update old Participant");
return false;
}

basicParticipantCsvRecord.Participant.RecordInsertDateTime = participant.RecordInsertDateTime?.ToString("yyyy-MM-dd HH:mm:ss");
var participantForUpdate = basicParticipantCsvRecord.Participant.ToParticipantDemographic();

participantForUpdate.RecordUpdateDateTime = DateTime.UtcNow;
participantForUpdate.ParticipantId = participant.ParticipantId;

var participantForUpsert = basicParticipantCsvRecord.Participant.ToParticipantDemographic();
participantForUpsert.RecordUpdateDateTime = DateTime.UtcNow;

var updated = await _participantDemographic.Update(participantForUpdate);
if (updated)
var upserted = await _participantDemographic.Upsert(participantForUpsert);
if (upserted)
{
_logger.LogInformation("updating old Demographic record was successful");
return updated;
_logger.LogInformation("Upsert of Demographic record was successful");
return true;
}

_logger.LogError("updating old Demographic record was not successful");
throw new InvalidOperationException("updating old Demographic record was not successful");
_logger.LogError("Upsert of Demographic record was not successful");
throw new InvalidOperationException("Upsert of Demographic record was not successful");
}
catch (Exception ex)
{
var errorDescription = $"Update participant function failed.\nMessage: {ex.Message}\nStack Trace: {ex.StackTrace}";
var errorDescription = $"Upsert participant function failed.\nMessage: {ex.Message}\nStack Trace: {ex.StackTrace}";
_logger.LogError(ex, errorDescription);
await CreateError(basicParticipantCsvRecord.Participant, name, errorDescription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
services.AddScoped<IProcessCaasFile, ProcessCaasFile>(); //Do not change the lifetime of this.
services.AddSingleton<ICreateResponse, CreateResponse>();
services.AddScoped<ICheckDemographic, CheckDemographic>();
services.AddScoped<ICallDurableDemographicFunc, CallDurableDemographicFunc>();
services.AddScoped<ICreateBasicParticipantData, CreateBasicParticipantData>();
services.AddScoped<IAddBatchToQueue, AddBatchToQueue>();
services.AddScoped<IRecordsProcessedTracker, RecordsProcessedTracker>(); //Do not change the lifetime of this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@ public async Task<bool> Update(TEntity entity)
return true;
}

/// <summary>
/// Sends the entity to the data service's dedicated "upsert" endpoint; the service
/// decides whether the record is inserted or updated.
/// </summary>
/// <param name="entity">The entity to upsert.</param>
/// <returns>True when the service responds 200 OK; false when serialization fails or the service returns any other status.</returns>
public async Task<bool> Upsert(TEntity entity)
{
    var jsonString = JsonSerializer.Serialize<TEntity>(entity);

    // Defensive guard: an empty body would be rejected by the service anyway,
    // so fail fast with a log entry instead of a round trip.
    if (string.IsNullOrEmpty(jsonString))
    {
        _logger.LogWarning("Unable to serialize upsert request body for entity of type {EntityType}", typeof(TEntity).FullName);
        return false;
    }

    var upsertUrl = UrlBuilder(_baseUrl, "upsert");
    var result = await _httpClientFunction.SendPost(upsertUrl, jsonString);

    return result.StatusCode == HttpStatusCode.OK;
}

private async Task<string> GetJsonStringByFilter(Expression<Func<TEntity, bool>> predicate, bool returnOneRecord = false)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public Task<bool> Update(TEntity entity)
throw new NotImplementedException();
}

// Not supported by this client; kept to satisfy the IDataServiceClient contract.
public Task<bool> Upsert(TEntity entity) => throw new NotImplementedException();

private Expression<Func<TEntity, bool>> CreateGetByKeyExpression(string filter)
{
var entityParameter = Expression.Parameter(typeof(TEntity));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ public interface IDataServiceClient<TEntity>
/// <param name="entity">the object that is being updated</param>
/// <returns>a boolean representing if the record was updated successfully</returns>
Task<bool> Update(TEntity entity);
/// <summary>
/// Upserts (inserts or updates) a single record via the data service's upsert endpoint
/// </summary>
/// <param name="entity">the object to be upserted</param>
/// <returns>a boolean representing if the record was upserted successfully</returns>
Task<bool> Upsert(TEntity entity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public async Task<TEntity> Update(TEntity entity, Expression<Func<TEntity, bool>
{
return null;
}

_context.Update(entity);
rowsEffected = await _context.SaveChangesAsync();

Expand All @@ -113,12 +114,13 @@ public async Task<TEntity> Update(TEntity entity, Expression<Func<TEntity, bool>
await transaction.RollbackAsync();
return existingEntity;
}
else if (rowsEffected > 1)

if (rowsEffected > 1)
{
await transaction.RollbackAsync();
return null;

}

await transaction.CommitAsync();
return entity;

Expand All @@ -130,21 +132,140 @@ public async Task<TEntity> Update(TEntity entity, Expression<Func<TEntity, bool>
_logger.LogWarning("Entity to be updated not found");
return null;
}
else if(rowsEffected == 0 && dbEntity != null)

if(rowsEffected == 0 && dbEntity != null)
{
_logger.LogError("Records where found to be updated but the update failed");
throw new MultipleRecordsFoundException("Records where found to be updated but the update failed");
}
else if(rowsEffected > 1)

if(rowsEffected > 1)
{
_logger.LogError("Multiple Records were updated by PUT request, Changes have been Rolled-back");
throw new MultipleRecordsFoundException("Multiple Records were updated by PUT request, Changes have been Rolled-back");
}

return dbEntity!;
}

/// <summary>
/// Upserts (inserts or updates) a single entity inside a database transaction,
/// retried as a unit via the configured execution strategy. An existing record
/// is located with <paramref name="predicate"/>; if none is found the entity is
/// inserted, otherwise it is updated in place.
/// </summary>
/// <param name="entity">The entity to upsert.</param>
/// <param name="predicate">The predicate used to locate an existing record.</param>
/// <returns>True only when the transaction committed with exactly one row affected.</returns>
/// <exception cref="InvalidOperationException">Wraps any failure during the transaction; the transaction is rolled back first.</exception>
public async Task<bool> Upsert(TEntity entity, Expression<Func<TEntity, bool>> predicate)
{
    // Report success only on commit. Deriving the result from rowsAffected alone
    // would wrongly return true when >1 rows were touched and we rolled back.
    var committed = false;
    var strategy = _context.Database.CreateExecutionStrategy();

    await strategy.ExecuteAsync(
        async () =>
        {
            using var transaction = await _context.Database.BeginTransactionAsync();

            try
            {
                // AsNoTracking: we only need the stored values (keys, insert timestamp),
                // not a tracked instance that would conflict with the incoming entity.
                var existingEntity = await _context.Set<TEntity>().AsNoTracking().SingleOrDefaultAsync(predicate);

                if (existingEntity == null)
                {
                    await InsertNewEntity(entity);
                }
                else
                {
                    await UpdateExistingEntity(entity, existingEntity);
                }

                var rowsAffected = await _context.SaveChangesAsync();

                // Exactly one row must have been affected; anything else is rolled back.
                if (!ValidateRowsAffected(rowsAffected))
                {
                    await transaction.RollbackAsync();
                    return;
                }

                await transaction.CommitAsync();
                committed = true;
                _logger.LogInformation("Upsert operation completed successfully. Rows affected: {RowsAffected}", rowsAffected);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during upsert operation. Rolling back transaction.");
                await transaction.RollbackAsync();
                throw new InvalidOperationException("Upsert operation failed. See inner exception for details.", ex);
            }
        }
    );

    return committed;
}

// Stages the entity for insertion in the change tracker; the caller is
// responsible for calling SaveChangesAsync and managing the transaction.
private async Task InsertNewEntity(TEntity entity)
{
    await _context.AddAsync(entity);
    _logger.LogInformation("Inserting new entity in Upsert operation");
}

/// <summary>
/// Prepares <paramref name="entity"/> as an update of <paramref name="existingEntity"/>:
/// copies the stored primary-key values and original insert timestamp onto the
/// incoming entity, then marks it modified in the change tracker. The caller
/// performs SaveChangesAsync.
/// </summary>
private Task UpdateExistingEntity(TEntity entity, TEntity existingEntity)
{
    _logger.LogInformation("Updating existing entity in Upsert operation");

    PreservePrimaryKeys(entity, existingEntity);
    PreserveRecordInsertDateTime(entity, existingEntity);

    _context.Update(entity);

    // No awaitable work here; return a completed Task instead of an
    // async state machine, keeping the Task signature for the caller.
    return Task.CompletedTask;
}

// Copies the primary-key values from the stored entity onto the incoming one,
// so the update targets the existing row rather than creating a new identity.
private void PreservePrimaryKeys(TEntity entity, TEntity existingEntity)
{
    var keyProperties = _context.Model.FindEntityType(typeof(TEntity))?.FindPrimaryKey()?.Properties;

    if (keyProperties == null)
    {
        return;
    }

    foreach (var keyProperty in keyProperties)
    {
        var clrProperty = typeof(TEntity).GetProperty(keyProperty.Name);
        if (clrProperty == null)
        {
            continue;
        }

        var existingKey = clrProperty.GetValue(existingEntity);
        clrProperty.SetValue(entity, existingKey);
        _logger.LogDebug("Preserved primary key {KeyName}: {KeyValue}", keyProperty.Name, existingKey);
    }
}

// Keeps the original RecordInsertDateTime from the stored entity; only the
// update timestamp should change on an upsert. No-op when the entity type has
// no such property or the stored value is null.
private void PreserveRecordInsertDateTime(TEntity entity, TEntity existingEntity)
{
    var insertDateProperty = typeof(TEntity).GetProperty("RecordInsertDateTime");

    var existingInsertDate = insertDateProperty?.GetValue(existingEntity);
    if (existingInsertDate == null)
    {
        return;
    }

    insertDateProperty!.SetValue(entity, existingInsertDate);
    _logger.LogDebug("Preserved RecordInsertDateTime: {RecordInsertDateTime}", existingInsertDate);
}

// Accepts only the expected single-row outcome: 0 rows means nothing happened,
// >1 rows means the upsert touched more than it should have.
private bool ValidateRowsAffected(int rowsAffected)
{
    switch (rowsAffected)
    {
        case 0:
            _logger.LogWarning("Upsert resulted in 0 rows affected");
            return false;
        case > 1:
            _logger.LogError("Multiple records ({RowsAffected}) were affected during upsert operation. Rolling back transaction.", rowsAffected);
            return false;
        default:
            return true;
    }
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,11 @@ public interface IDataServiceAccessor<TEntity>
Task<bool> InsertMany(IEnumerable<TEntity> entities);
Task<bool> Remove(Expression<Func<TEntity, bool>> predicate);
Task<TEntity> Update(TEntity entity, Expression<Func<TEntity, bool>> predicate);
/// <summary>
/// Upserts (inserts or updates) a single entity within a single database transaction
/// </summary>
/// <param name="entity">The entity to upsert</param>
/// <param name="predicate">The predicate used to locate an existing record</param>
/// <returns>True if successful, false otherwise</returns>
Task<bool> Upsert(TEntity entity, Expression<Func<TEntity, bool>> predicate);
}
Loading
Loading