Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions packages/global/common/error/code/evaluation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ export enum EvaluationErrEnum {
evalInvalidStatus = 'evaluationInvalidStatus',
evalInvalidStateTransition = 'evaluationInvalidStateTransition',
evalOnlyRunningCanStop = 'evaluationOnlyRunningCanStop',
evalOnlyFailedCanRetry = 'evaluationOnlyFailedCanRetry',
evalItemNoErrorToRetry = 'evaluationItemNoErrorToRetry',
evalTargetOutputRequired = 'evaluationTargetOutputRequired',
evalEvaluatorOutputRequired = 'evaluationEvaluatorOutputRequired',
evalDatasetLoadFailed = 'evaluationDatasetLoadFailed',
evalTargetConfigInvalid = 'evaluationTargetConfigInvalid',
evalEvaluatorsConfigInvalid = 'evaluationEvaluatorsConfigInvalid',
Expand All @@ -42,7 +39,11 @@ export enum EvaluationErrEnum {
evalDuplicateDatasetName = 'evaluationDuplicateDatasetName',
evalNoDataInCollections = 'evaluationNoDataInCollections',
evalUpdateFailed = 'evaluationUpdateFailed',
evalLockAcquisitionFailed = 'evaluationLockAcquisitionFailed',
// Task execution errors
evalTaskSystemError = 'evaluationTaskSystemError',
evalManuallyStopped = 'evaluationManuallyStopped',
evalEvaluatorExecutionErrors = 'evaluationEvaluatorExecutionErrors',
evalTargetExecutionError = 'evaluationTargetExecutionError',

// Metric related errors
evalMetricNotFound = 'evaluationMetricNotFound',
Expand Down Expand Up @@ -250,21 +251,13 @@ const evaluationErrList = [
statusText: EvaluationErrEnum.evalOnlyRunningCanStop,
message: i18nT('evaluation:only_running_can_stop')
},
{
statusText: EvaluationErrEnum.evalOnlyFailedCanRetry,
message: i18nT('evaluation:only_failed_can_retry')
},
{
statusText: EvaluationErrEnum.evalItemNoErrorToRetry,
message: i18nT('evaluation:item_no_error_to_retry')
},
{
statusText: EvaluationErrEnum.evalTargetOutputRequired,
message: i18nT('evaluation:target_output_required')
},
{
statusText: EvaluationErrEnum.evalEvaluatorOutputRequired,
message: i18nT('evaluation:evaluator_output_required')
statusText: EvaluationErrEnum.evalTargetExecutionError,
message: i18nT('evaluation:target_execution_error')
},
{
statusText: EvaluationErrEnum.evalDatasetLoadFailed,
Expand Down Expand Up @@ -298,10 +291,6 @@ const evaluationErrList = [
statusText: EvaluationErrEnum.evalUpdateFailed,
message: i18nT('evaluation:update_failed')
},
{
statusText: EvaluationErrEnum.evalLockAcquisitionFailed,
message: i18nT('evaluation:lock_acquisition_failed')
},
// Metric related errors
{
statusText: EvaluationErrEnum.evalMetricNotFound,
Expand Down Expand Up @@ -638,6 +627,20 @@ const evaluationErrList = [
{
statusText: EvaluationErrEnum.evalDescriptionInvalidType,
message: i18nT('evaluation:description_invalid_type')
},

// Task execution errors
{
statusText: EvaluationErrEnum.evalTaskSystemError,
message: i18nT('evaluation:task_system_error')
},
{
statusText: EvaluationErrEnum.evalManuallyStopped,
message: i18nT('evaluation:manually_stopped')
},
{
statusText: EvaluationErrEnum.evalEvaluatorExecutionErrors,
message: i18nT('evaluation:evaluator_execution_errors')
}
];

Expand Down
2 changes: 1 addition & 1 deletion packages/global/core/evaluation/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export type StopEvaluationResponse = MessageResponse;
// Get Evaluation Stats
export type StatsEvaluationRequest = EvalIdQuery;
export type EvaluationStatsResponse = EvaluationStatistics & {
failed: number;
belowThreshold: number;
};

// Export Evaluation Items
Expand Down
30 changes: 30 additions & 0 deletions packages/global/core/evaluation/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,35 @@ export const ValidationResultUtils = {
throw new Error('Cannot create error from valid validation result');
}
return new Error(ValidationResultUtils.formatErrors(result));
},

/**
 * Build an Error whose message is the code of the first validation error,
 * so that callers can pass it to i18n translation.
 *
 * Nothing is logged here. The full error list, the formatted message and the
 * summary message are attached to the returned error under
 * `validationDebugInfo` so the caller can log them.
 *
 * @param result - A validation result that is expected to be invalid.
 * @returns An Error carrying the first error code as its message, or a
 *          generic "unknown error" Error when the error list is empty.
 * @throws Error when `result.isValid` is true.
 */
toTranslatableError(result: ValidationResult): Error {
  if (result.isValid) {
    throw new Error('Cannot create error from valid validation result');
  }

  // The debug payload is assembled before the empty-list check, so the
  // formatting helpers run even when there is no first error to report.
  const validationDebugInfo = {
    errors: result.errors,
    formattedMessage: ValidationResultUtils.formatErrors(result),
    summary: ValidationResultUtils.getSummaryMessage(result)
  };

  const leadingError = result.errors[0];
  if (!leadingError) {
    return new Error('Validation failed with unknown error');
  }

  // Only the code goes into the message; details ride along as a property.
  return Object.assign(new Error(leadingError.code), { validationDebugInfo });
}
};
2 changes: 1 addition & 1 deletion packages/service/core/evaluation/dataset/dataQualityMq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export const getEvalDatasetDataQualityWorker = (
QueueNames.evalDatasetDataQuality,
processor,
{
maxStalledCount: 3,
maxStalledCount: global.systemEnv?.evalConfig?.maxStalledCount || 3,
removeOnFail: {
count: 1000 // Keep last 1000 failed jobs
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const getEvalDatasetDataSynthesizeWorker = (
processor: Processor<EvalDatasetDataSynthesizeData>
) => {
return getWorker<EvalDatasetDataSynthesizeData>(QueueNames.evalDatasetDataSynthesize, processor, {
maxStalledCount: 3,
maxStalledCount: global.systemEnv?.evalConfig?.maxStalledCount || 3,
removeOnFail: {
age: 30 * 60 * 60 * 24 // 30 day
},
Expand Down
209 changes: 208 additions & 1 deletion packages/service/core/evaluation/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import { addLog } from '../../common/system/log';
import { setCron } from '../../common/system/cron';
import { initEvalDatasetDataQualityWorker } from './dataset/dataQualityProcessor';
import { initEvalDatasetDataSynthesizeWorker } from './dataset/dataSynthesizeProcessor';
import { initEvalTaskWorker, initEvalTaskItemWorker } from './task/processor';
import { initEvaluationSummaryWorker } from './summary/worker';

// Import all queues for cleanup
import { evaluationTaskQueue, evaluationItemQueue } from './task/mq';
import { evalDatasetDataQualityQueue } from './dataset/dataQualityMq';
import { evalDatasetDataSynthesizeQueue } from './dataset/dataSynthesizeMq';
import { getEvaluationSummaryQueue } from './summary/queue';

// Import MongoDB models for existence checks
import { MongoEvaluation, MongoEvalItem } from './task/schema';
import { MongoEvalDatasetData } from './dataset/evalDatasetDataSchema';
import { MongoEvalDatasetCollection } from './dataset/evalDatasetCollectionSchema';

// Initialize evaluation workers

export const initEvaluationWorkers = () => {
Expand All @@ -15,6 +27,201 @@ export const initEvaluationWorkers = () => {
initEvalDatasetDataQualityWorker();
initEvalDatasetDataSynthesizeWorker();

// 初始化评估总结Worker
initEvaluationSummaryWorker();

// Setup periodic orphaned jobs cleanup
setupOrphanedJobsCleanup();
};

/**
 * Register the recurring orphaned-jobs cleanup with the cron helper and log
 * that the schedule is in place.
 */
const setupOrphanedJobsCleanup = () => {
  // Cron expression: every 30 minutes.
  const everyThirtyMinutes = '*/30 * * * *';

  const runCleanup = async () => {
    await cleanupOrphanedJobs();
  };

  setCron(everyThirtyMinutes, runCleanup);

  addLog.info('[Evaluation] Orphaned jobs cleanup scheduled (every 30 minutes)');
};

/**
 * Comprehensive cleanup for orphaned jobs across all evaluation queues.
 *
 * A job is treated as orphaned when the MongoDB document(s) referenced by
 * its payload no longer exist. Each orphan is handed to `cleanupJob`, which
 * removes it unless it is currently active; active orphans are only counted
 * and reported.
 *
 * @returns Totals for the run (`totalJobs`, `cleanedJobs`,
 *          `skippedActiveJobs`, plus a per-queue `breakdown` of how many
 *          jobs were inspected), or `null` when the run fails as a whole.
 *          Failures on an individual job are logged and do not abort the run.
 */
export const cleanupOrphanedJobs = async () => {
  try {
    addLog.debug('[Evaluation] Starting comprehensive orphaned jobs cleanup');

    const summaryQueue = getEvaluationSummaryQueue();

    // Every queue is inspected across the same set of job states.
    const jobStates = ['active', 'waiting', 'delayed', 'completed', 'failed'] as const;

    // Get all jobs from all evaluation queues
    const [taskJobs, itemJobs, dataQualityJobs, dataSynthesizeJobs, summaryJobs] =
      await Promise.all([
        evaluationTaskQueue.getJobs([...jobStates], 0, 1000),
        evaluationItemQueue.getJobs([...jobStates], 0, 2000),
        evalDatasetDataQualityQueue.getJobs([...jobStates], 0, 1000),
        evalDatasetDataSynthesizeQueue.getJobs([...jobStates], 0, 1000),
        summaryQueue.getJobs([...jobStates], 0, 500)
      ]);

    // Array elements are awaited in order, so the queues are swept one after
    // another rather than concurrently.
    const tallies = [
      // 1. Task jobs: orphaned when the evaluation is gone.
      await sweepOrphanedJobs(taskJobs, 'task', 'task', async ({ evalId }) => ({
        orphaned: !(await MongoEvaluation.exists({ _id: evalId })),
        context: { evalId }
      })),

      // 2. Item jobs: orphaned when the evaluation or the item is gone.
      await sweepOrphanedJobs(itemJobs, 'item', 'item', async ({ evalId, evalItemId }) => {
        const [evaluation, evalItem] = await Promise.all([
          MongoEvaluation.exists({ _id: evalId }),
          MongoEvalItem.exists({ _id: evalItemId })
        ]);
        return { orphaned: !evaluation || !evalItem, context: { evalId, evalItemId } };
      }),

      // 3. Data quality jobs: orphaned when the dataset data row is gone.
      await sweepOrphanedJobs(
        dataQualityJobs,
        'dataQuality',
        'data quality',
        async ({ dataId }) => ({
          orphaned: !(await MongoEvalDatasetData.exists({ _id: dataId })),
          context: { dataId }
        })
      ),

      // 4. Data synthesize jobs: orphaned when the data row or its collection is gone.
      await sweepOrphanedJobs(
        dataSynthesizeJobs,
        'dataSynthesize',
        'data synthesize',
        async ({ dataId, evalDatasetCollectionId }) => {
          const [dataExists, collectionExists] = await Promise.all([
            MongoEvalDatasetData.exists({ _id: dataId }),
            MongoEvalDatasetCollection.exists({ _id: evalDatasetCollectionId })
          ]);
          return {
            orphaned: !dataExists || !collectionExists,
            context: { dataId, evalDatasetCollectionId }
          };
        }
      ),

      // 5. Summary jobs: orphaned when the evaluation is gone.
      await sweepOrphanedJobs(summaryJobs, 'summary', 'summary', async ({ evalId }) => ({
        orphaned: !(await MongoEvaluation.exists({ _id: evalId })),
        context: { evalId }
      }))
    ];

    const result = {
      totalJobs:
        taskJobs.length +
        itemJobs.length +
        dataQualityJobs.length +
        dataSynthesizeJobs.length +
        summaryJobs.length,
      cleanedJobs: tallies.reduce((sum, tally) => sum + tally.cleaned, 0),
      skippedActiveJobs: tallies.reduce((sum, tally) => sum + tally.skippedActive, 0),
      breakdown: {
        taskJobs: taskJobs.length,
        itemJobs: itemJobs.length,
        dataQualityJobs: dataQualityJobs.length,
        dataSynthesizeJobs: dataSynthesizeJobs.length,
        summaryJobs: summaryJobs.length
      }
    };

    addLog.info('[Evaluation] Comprehensive orphaned jobs cleanup completed', result);
    return result;
  } catch (error) {
    addLog.error('[Evaluation] Comprehensive orphaned jobs cleanup failed', { error });
    return null;
  }
};

/**
 * Walk one queue's jobs sequentially and clean up those found to be orphaned.
 *
 * @param jobs - Jobs fetched from a single queue.
 * @param jobType - Identifier forwarded to `cleanupJob` for its log messages.
 * @param label - Human-readable queue name used in the per-job failure warning.
 * @param inspect - Decides from the job payload whether the job is orphaned
 *                  and supplies the identifiers to include in log context.
 * @returns How many jobs were removed and how many active orphans were skipped.
 */
async function sweepOrphanedJobs<TData>(
  jobs: ReadonlyArray<{ id?: string; data: TData }>,
  jobType: string,
  label: string,
  inspect: (data: TData) => Promise<{ orphaned: boolean; context: Record<string, unknown> }>
): Promise<{ cleaned: number; skippedActive: number }> {
  const tally = { cleaned: 0, skippedActive: 0 };

  for (const job of jobs) {
    try {
      const { orphaned, context } = await inspect(job.data);
      if (!orphaned) continue;

      const outcome = await cleanupJob(job, jobType, context);
      if (outcome.cleaned) tally.cleaned++;
      if (outcome.skippedActive) tally.skippedActive++;
    } catch (error) {
      // A failure on one job must not stop the sweep of the remaining jobs.
      addLog.warn(`[Evaluation] Failed to cleanup ${label} job`, { jobId: job.id, error });
    }
  }

  return tally;
}

/**
 * Clean up a single orphaned job.
 *
 * The job's current state is read first. A job in the `active` state is left
 * in place and reported with a warning; a job in any other state is removed.
 *
 * @param job - Queue job exposing `id`, `getState()` and `remove()`.
 * @param jobType - Short queue identifier interpolated into the log messages.
 * @param context - Extra identifiers merged into the log payload.
 * @returns Flags telling the caller whether the job was removed or skipped
 *          because it was active.
 */
async function cleanupJob(job: any, jobType: string, context: Record<string, any>) {
  const state = await job.getState();

  if (state !== 'active') {
    // Non-active jobs can be safely removed
    await job.remove();
    addLog.debug(`[Evaluation] Removed orphaned ${jobType} job`, {
      jobId: job.id,
      state,
      ...context
    });
    return { cleaned: true, skippedActive: false };
  }

  // Active job cannot be removed, log warning
  addLog.warn(`[Evaluation] Found orphaned active ${jobType} job (cannot remove)`, {
    jobId: job.id,
    state,
    ...context
  });
  return { cleaned: false, skippedActive: true };
}
2 changes: 2 additions & 0 deletions packages/service/core/evaluation/task/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export const createEvaluationError = (
forceRetry?: boolean
): Error => {
const errorStr = error?.message || error?.code || String(error);

// Use getErrText for all error types for consistent translation
const errorMessage = getErrText(error);

// Build detailed error context
Expand Down
Loading
Loading