From aa42d4fcd8d11fe24ac394a2838131e1db64c604 Mon Sep 17 00:00:00 2001
From: jennyhliu
Date: Thu, 30 Oct 2025 19:12:11 -0400
Subject: [PATCH 1/4] CUMULUS-4153: Add functionality and endpoint to demote a granule

---
 packages/api/lib/granules.js                  |   2 +-
 packages/api/lib/schemas.js                   |   4 +
 .../api/src/lib/granule-demote-promote.ts     | 168 ++++++++++++++++++
 packages/db/src/lib/granule.ts                |  11 +-
 packages/db/src/translate/collections.ts      |   2 +
 packages/db/src/types/collection.ts           |   1 +
 packages/types/api/collections.d.ts           |   1 +
 7 files changed, 178 insertions(+), 11 deletions(-)
 create mode 100644 packages/api/src/lib/granule-demote-promote.ts

diff --git a/packages/api/lib/granules.js b/packages/api/lib/granules.js
index 49f59cee9e3..5b3c27b3080 100644
--- a/packages/api/lib/granules.js
+++ b/packages/api/lib/granules.js
@@ -72,7 +72,7 @@ const getExecutionProcessingTimeInfo = ({
 };
 
 /**
- * Move granule 'file' S3 Objects and update Postgres/CMR metadata with new locations
+ * Move granule 'file' S3 Objects and update Postgres with new locations
  *
  * @param {Object} params - params object
  * @param {Object} params.apiGranule - API 'granule' object to move
diff --git a/packages/api/lib/schemas.js b/packages/api/lib/schemas.js
index 73a7ffcfd61..82226f0c476 100644
--- a/packages/api/lib/schemas.js
+++ b/packages/api/lib/schemas.js
@@ -150,6 +150,10 @@ module.exports.collection = {
         + 'validation and extraction regexes against',
       type: 'string',
     },
+    hiddenFileBucket: {
+      description: 'Name of the S3 bucket used to store granule files in the hidden archive. Defaults to the primary archive bucket if not specified.',
+      type: 'string',
+    },
     ignoreFilesConfigForDiscovery: {
       title: 'Ignore Files Configuration During Discovery',
       description: "When true, ignore this collection's files config list for"
diff --git a/packages/api/src/lib/granule-demote-promote.ts b/packages/api/src/lib/granule-demote-promote.ts
new file mode 100644
index 00000000000..c9fb94ae954
--- /dev/null
+++ b/packages/api/src/lib/granule-demote-promote.ts
@@ -0,0 +1,168 @@
+/**
+ * @module granule-demote-promote
+ *
+ * Implements `demoteGranule` and `promoteGranule` functions that:
+ * - Move granule files between visible/hidden locations.
+ * - Update file records and granule group states in the database.
+ * - Publish to SNS.
+ * - Interact with CMR + */ + +import { Knex } from 'knex'; +import Logger from '@cumulus/logger'; +import { RecordDoesNotExist } from '@cumulus/errors'; +import { moveObject } from '@cumulus/aws-client/S3'; +import { + CollectionPgModel, + FilePgModel, + GranuleGroupsPgModel, + PdrPgModel, + ProviderPgModel, + getUniqueGranuleByGranuleId, + GranulePgModel, + translatePostgresCollectionToApiCollection, + translatePostgresGranuleToApiGranule, +} from '@cumulus/db'; +const unpublishGranule = require('../../lib/granule-remove-from-cmr'); +const { publishGranuleDeleteSnsMessage } = require('../../lib/publishSnsMessageUtils'); + +const log = new Logger({ sender: 'granule-demote-promote' }); + +/** + * Demote granule: + * - Remove from CMR + * - Move granule files to hidden location + * - Update DB file records + * - Update granule_group state to 'H' + * - Publish SNS event + */ +export const demoteGranule = async (params: { + knex: Knex, + granuleId: string, + granulePgModel?: GranulePgModel, + collectionPgModel?: CollectionPgModel, + filePgModel?: FilePgModel, + granuleGroupsModel?: GranuleGroupsPgModel, + pdrPgModel?: PdrPgModel, + providerPgModel?: ProviderPgModel, +}) => { + const { + knex, + granuleId, + granulePgModel = new GranulePgModel(), + collectionPgModel = new CollectionPgModel(), + filePgModel = new FilePgModel(), + granuleGroupsModel = new GranuleGroupsPgModel(), + pdrPgModel = new PdrPgModel(), + providerPgModel = new ProviderPgModel(), + } = params; + + log.info(`Demoting granule ${granuleId}`); + + let pgGranule; + let pgCollection; + + try { + pgGranule = await getUniqueGranuleByGranuleId(knex, granuleId, granulePgModel); + pgCollection = await collectionPgModel.get( + knex, { cumulus_id: pgGranule.collection_cumulus_id } + ); + } catch (error) { + if (error instanceof RecordDoesNotExist) { + return; + } + throw error; + } + + const apiCollection = translatePostgresCollectionToApiCollection(pgCollection); + const pgGranuleCumulusId = pgGranule.cumulus_id; + + // 1. Remove from CMR + await unpublishGranule({ pgGranuleRecord: pgGranule, pgCollection, knex }); + + const existingGroup = await granuleGroupsModel.search( + knex, + { granule_cumulus_id: pgGranuleCumulusId } + ); + + const pgFiles = await filePgModel.search( + knex, + { granule_cumulus_id: pgGranuleCumulusId } + ); + + const pgFilesWithNewLocation = pgFiles.map((pgFile) => { + const updatedPgFile: any = structuredClone(pgFile); + updatedPgFile.newBucket = apiCollection.hiddenFileBucket || pgFile.bucket; + updatedPgFile.newKey = `${granuleId}/${pgFile.key}`; + return updatedPgFile; + }); + + // TODO move files partial recovery + let trx; + try { + // 2. 
Move files to hidden bucket and location
+    // If there is a hidden bucket, move all the files to this bucket with prefix granuleId/
+    // If not configured, move each file to its own bucket with prefix granuleId/
+    //TODO limit parallel move files
+    await Promise.all(pgFilesWithNewLocation.map(async (file) => {
+      const moved = await moveObject({
+        sourceBucket: file.bucket,
+        sourceKey: file.key,
+        destinationBucket: file.newBucket,
+        destinationKey: file.newKey,
+        copyTags: true,
+      });
+      log.info(`Moved ${file.bucket}/${file.key} -> ${file.newBucket}/${file.newKey}`);
+      return moved;
+    }));
+
+    // update granule files in db
+    const pgFileRecordsForUpdate = pgFilesWithNewLocation.map((file) => ({
+      cumulus_id: file.cumulus_id,
+      granule_cumulus_id: file.granule_cumulus_id,
+      bucket: file.newBucket,
+      key: file.newKey,
+    }));
+
+    trx = await knex.transaction();
+    await filePgModel.upsert(trx, pgFileRecordsForUpdate);
+
+    // 3. Update granule_group state to 'H'
+    // TODO existing or not already exist, both should work
+    const granuleGroup = {
+      group_id: existingGroup?.[0]?.group_id,
+      granule_cumulus_id: pgGranuleCumulusId,
+      state: 'H',
+    };
+    await granuleGroupsModel.upsert(trx, granuleGroup);
+
+    await trx.commit();
+  } catch (error) {
+    if (trx) await trx.rollback();
+    await Promise.all(pgFilesWithNewLocation.map((file) => {
+      log.info(`Recover ${file.newBucket}/${file.newKey} -> ${file.bucket}/${file.key}`);
+      return moveObject({
+        sourceBucket: file.newBucket,
+        sourceKey: file.newKey,
+        destinationBucket: file.bucket,
+        destinationKey: file.key,
+        copyTags: true,
+      });
+    }));
+    log.error(`Failed to demote granule ${granuleId}: ${(error as Error).message}`);
+    throw error;
+  }
+
+  // 4. Publish SNS topic
+  const granuleToPublishToSns = await translatePostgresGranuleToApiGranule({
+    granulePgRecord: pgGranule,
+    knexOrTransaction: knex,
+    collectionPgModel,
+    filePgModel,
+    pdrPgModel,
+    providerPgModel,
+  });
+  await publishGranuleDeleteSnsMessage(granuleToPublishToSns);
+
+  log.info(`Granule ${granuleId} demoted successfully`);
+};
diff --git a/packages/db/src/lib/granule.ts b/packages/db/src/lib/granule.ts
index 22015e53f07..89c83eb5a16 100644
--- a/packages/db/src/lib/granule.ts
+++ b/packages/db/src/lib/granule.ts
@@ -15,7 +15,6 @@ import { GranulesExecutionsPgModel } from '../models/granules-executions';
 import { PostgresGranule, PostgresGranuleRecord } from '../types/granule';
 import { GranuleWithProviderAndCollectionInfo } from '../types/query';
 import { UpdatedAtRange } from '../types/record';
-const { deprecate } = require('@cumulus/common/util');
 
 const { TableNames } = require('../tables');
 
@@ -133,19 +132,11 @@ export const getUniqueGranuleByGranuleId = async (
   granuleId: string,
   granulePgModel = new GranulePgModel()
 ): Promise<PostgresGranuleRecord> => {
-  deprecate(
-    '@cumulus/db/getUniqueGranuleByGranuleId',
-    'RDS-Phase-3',
-    '@cumulus/db/getGranuleByUniqueColumns'
-  );
-
-  const logger = new Logger({ sender: '@cumulus/api/granules' });
-
   const PgGranuleRecords = await granulePgModel.search(knexOrTransaction, {
     granule_id: granuleId,
   });
   if (PgGranuleRecords.length > 1) {
-    logger.warn(`Granule ID ${granuleId} is not unique across collections, cannot make an update action based on granule Id alone`);
+    log.warn(`Granule ID ${granuleId} is not unique across collections, cannot make an update action based on granule Id alone`);
    throw new Error(`Failed to write ${granuleId} due to granuleId duplication on postgres granule record`);
   }
   if (PgGranuleRecords.length === 0) {
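Note: the commit subject mentions an endpoint, but no endpoint is added anywhere in this series. A minimal sketch of how a handler could invoke `demoteGranule`, assuming the Express-style handlers used elsewhere in `packages/api` (the handler name, route wiring, and import path are illustrative, not part of the patch):

```ts
// Hypothetical wiring only: no endpoint is added in this patch series.
import { Request, Response } from 'express';
import { getKnexClient } from '@cumulus/db';
import { demoteGranule } from '../lib/granule-demote-promote';

// Demote the granule named in the route, then acknowledge.
async function demote(req: Request, res: Response): Promise<Response> {
  const knex = await getKnexClient({ env: process.env });
  await demoteGranule({ knex, granuleId: req.params.granuleId });
  return res.send({ message: `Granule ${req.params.granuleId} demoted` });
}
```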
diff --git a/packages/db/src/translate/collections.ts b/packages/db/src/translate/collections.ts
index 8c0ffeea1e7..13d16d237fd 100644
--- a/packages/db/src/translate/collections.ts
+++ b/packages/db/src/translate/collections.ts
@@ -22,6 +22,7 @@ export const translatePostgresCollectionToApiCollection = (
     files: collectionRecord.files,
     reportToEms: collectionRecord.report_to_ems,
     sampleFileName: collectionRecord.sample_file_name,
+    hiddenFileBucket: collectionRecord.hidden_file_bucket,
     ignoreFilesConfigForDiscovery: collectionRecord.ignore_files_config_for_discovery,
     meta: collectionRecord.meta,
     tags: collectionRecord.tags,
@@ -48,6 +49,7 @@ export const translateApiCollectionToPostgresCollection = (
     files: (JSON.stringify(record.files)),
     report_to_ems: record.reportToEms,
     sample_file_name: record.sampleFileName,
+    hidden_file_bucket: record.hiddenFileBucket,
     ignore_files_config_for_discovery: record.ignoreFilesConfigForDiscovery,
     meta: record.meta,
     // have to stringify on an array of values
diff --git a/packages/db/src/types/collection.ts b/packages/db/src/types/collection.ts
index d67e3575f17..29d4c0ce5d5 100644
--- a/packages/db/src/types/collection.ts
+++ b/packages/db/src/types/collection.ts
@@ -11,6 +11,7 @@ export interface PostgresCollection {
   report_to_ems?: boolean,
   sample_file_name: string,
   url_path?: string,
+  hidden_file_bucket?: string,
   ignore_files_config_for_discovery?: boolean,
   meta?: object,
   tags?: string,
diff --git a/packages/types/api/collections.d.ts b/packages/types/api/collections.d.ts
index bf5975d2ec1..3be9f9c73da 100644
--- a/packages/types/api/collections.d.ts
+++ b/packages/types/api/collections.d.ts
@@ -19,6 +19,7 @@ export interface PartialCollectionRecord {
   files?: CollectionFile[],
   granuleId?: string,
   granuleIdExtraction?: string,
+  hiddenFileBucket?: string,
   ignoreFilesConfigForDiscovery?: boolean,
   name?: string,
   process?: string,
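With patch 1 in place, a collection can opt in to the hidden archive. An illustrative record using the new optional field (all values are made up; MOD09GQ is the stock Cumulus example collection):

```ts
// Illustrative only: a collection fragment using the new optional field.
const exampleCollection = {
  name: 'MOD09GQ',
  version: '006',
  granuleId: '^MOD09GQ\\..*$',
  granuleIdExtraction: '(MOD09GQ\\..*)(\\.hdf|\\.cmr|_ndvi\\.jpg)',
  sampleFileName: 'MOD09GQ.A2017025.h21v00.006.2017034065104.hdf',
  // New in this patch: demoted granule files are moved here under a
  // granuleId/ prefix; omit it to keep files in their current buckets.
  hiddenFileBucket: 'my-hidden-archive-bucket',
};
```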
From 5a83b0fb8a1bbcfe5a2167fcf658750fa5d217b8 Mon Sep 17 00:00:00 2001
From: jennyhliu
Date: Fri, 31 Oct 2025 12:44:23 -0400
Subject: [PATCH 2/4] Refactor demote granule; add hidden_file_bucket column

---
 .../api/src/lib/granule-demote-promote.ts     | 214 ++++++++++--------
 ...2012_collections_add_hidden_file_bucket.ts |  19 ++
 2 files changed, 143 insertions(+), 90 deletions(-)
 create mode 100644 packages/db/src/migrations/20251031122012_collections_add_hidden_file_bucket.ts

diff --git a/packages/api/src/lib/granule-demote-promote.ts b/packages/api/src/lib/granule-demote-promote.ts
index c9fb94ae954..50d5ebd6eaf 100644
--- a/packages/api/src/lib/granule-demote-promote.ts
+++ b/packages/api/src/lib/granule-demote-promote.ts
@@ -23,20 +23,110 @@ import {
   translatePostgresCollectionToApiCollection,
   translatePostgresGranuleToApiGranule,
 } from '@cumulus/db';
+import { publishGranuleUpdateSnsMessage } from '../../lib/publishSnsMessageUtils';
+
 const unpublishGranule = require('../../lib/granule-remove-from-cmr');
-const { publishGranuleDeleteSnsMessage } = require('../../lib/publishSnsMessageUtils');
 
 const log = new Logger({ sender: 'granule-demote-promote' });
 
 /**
- * Demote granule:
- * - Remove from CMR
- * - Move granule files to hidden location
- * - Update DB file records
- * - Update granule_group state to 'H'
- * - Publish SNS event
+ * Generate new file locations for hidden bucket storage.
  */
+const buildHiddenFileLocations = (pgFiles: any[], apiCollection: any, granuleId: string) =>
+  pgFiles.map((pgFile) => ({
+    ...pgFile,
+    newBucket: apiCollection.hiddenFileBucket || pgFile.bucket,
+    newKey: `${granuleId}/${pgFile.key}`,
+  }));
+
+// TODO: recover gracefully when only some files were moved to the destination
+// TODO: limit the number of concurrent move operations
+/**
+ * Move a set of files to their new S3 locations.
+ */
+const moveGranuleFiles = async (files: any[]) => {
+  await Promise.all(files.map(async (file) => {
+    await moveObject({
+      sourceBucket: file.bucket,
+      sourceKey: file.key,
+      destinationBucket: file.newBucket,
+      destinationKey: file.newKey,
+      copyTags: true,
+    });
+    log.info(`Moved ${file.bucket}/${file.key} → ${file.newBucket}/${file.newKey}`);
+  }));
+};
+
+/**
+ * Roll back file moves if something fails during the demotion process.
+ */
+const rollbackFileMoves = async (files: any[]) => {
+  await Promise.all(files.map(async (file) => {
+    log.info(`Rolling back ${file.newBucket}/${file.newKey} → ${file.bucket}/${file.key}`);
+    await moveObject({
+      sourceBucket: file.newBucket,
+      sourceKey: file.newKey,
+      destinationBucket: file.bucket,
+      destinationKey: file.key,
+      copyTags: true,
+    });
+  }));
+};
+
+/**
+ * Update file records and granule group state in a transaction.
+ */
-export const demoteGranule = async (params: {
+export const updateDatabaseRecords = async ({
+  knex,
+  filePgModel,
+  granuleGroupsModel,
+  files,
+  granuleCumulusId,
+  existingGroup,
+}: {
+  knex: Knex;
+  filePgModel: FilePgModel;
+  granuleGroupsModel: GranuleGroupsPgModel;
+  files: any[];
+  granuleCumulusId: number;
+  existingGroup?: any;
+}): Promise<void> => {
+  const trx = await knex.transaction();
+
+  try {
+    const updatedFileRecords = files.map((file) => ({
+      cumulus_id: file.cumulus_id,
+      granule_cumulus_id: file.granule_cumulus_id,
+      bucket: file.newBucket,
+      key: file.newKey,
+    }));
+
+    await filePgModel.upsert(trx, updatedFileRecords);
+
+    // TODO verify this works whether or not a group record already exists
+    await granuleGroupsModel.upsert(trx, {
+      group_id: existingGroup?.[0]?.group_id,
+      granule_cumulus_id: granuleCumulusId,
+      state: 'H',
+    });
+
+    await trx.commit();
+  } catch (error) {
+    await trx.rollback();
+    throw error;
+  }
+};
+
+export const demoteGranule = async ({
+  knex,
+  granuleId,
+  granulePgModel = new GranulePgModel(),
+  collectionPgModel = new CollectionPgModel(),
+  filePgModel = new FilePgModel(),
+  granuleGroupsModel = new GranuleGroupsPgModel(),
+  pdrPgModel = new PdrPgModel(),
+  providerPgModel = new ProviderPgModel(),
+}: {
   knex: Knex,
   granuleId: string,
   granulePgModel?: GranulePgModel,
   collectionPgModel?: CollectionPgModel,
   filePgModel?: FilePgModel,
   granuleGroupsModel?: GranuleGroupsPgModel,
   pdrPgModel?: PdrPgModel,
   providerPgModel?: ProviderPgModel,
 }) => {
-  const {
-    knex,
-    granuleId,
-    granulePgModel = new GranulePgModel(),
-    collectionPgModel = new CollectionPgModel(),
-    filePgModel = new FilePgModel(),
-    granuleGroupsModel = new GranuleGroupsPgModel(),
-    pdrPgModel = new PdrPgModel(),
-    providerPgModel = new ProviderPgModel(),
-  } = params;
-
   log.info(`Demoting granule ${granuleId}`);
 
   let pgGranule;
   let pgCollection;
 
   try {
     pgGranule = await getUniqueGranuleByGranuleId(knex, granuleId, granulePgModel);
     pgCollection = await collectionPgModel.get(
-      knex, { cumulus_id: pgGranule.collection_cumulus_id }
+      knex,
+      { cumulus_id: pgGranule.collection_cumulus_id }
     );
   } catch (error) {
     if (error instanceof RecordDoesNotExist) {
+      log.warn(`Granule ${granuleId} does not exist, skipping
demotion`); return; } throw error; } const apiCollection = translatePostgresCollectionToApiCollection(pgCollection); - const pgGranuleCumulusId = pgGranule.cumulus_id; + const granuleCumulusId = pgGranule.cumulus_id; - // 1. Remove from CMR + // 1: Remove from CMR await unpublishGranule({ pgGranuleRecord: pgGranule, pgCollection, knex }); - const existingGroup = await granuleGroupsModel.search( - knex, - { granule_cumulus_id: pgGranuleCumulusId } - ); - - const pgFiles = await filePgModel.search( - knex, - { granule_cumulus_id: pgGranuleCumulusId } - ); - - const pgFilesWithNewLocation = pgFiles.map((pgFile) => { - const updatedPgFile: any = structuredClone(pgFile); - updatedPgFile.newBucket = apiCollection.hiddenFileBucket || pgFile.bucket; - updatedPgFile.newKey = `${granuleId}/${pgFile.key}`; - return updatedPgFile; + // 2: Prepare new file locations + const pgFiles = await filePgModel.search(knex, { granule_cumulus_id: granuleCumulusId }); + const filesWithUpdatedLocations = buildHiddenFileLocations(pgFiles, apiCollection, granuleId); + + const existingGroup = await granuleGroupsModel.search(knex, { + granule_cumulus_id: granuleCumulusId, }); - // TODO move files partial recovery - let trx; + // 3: Move files and update database try { - // 2. Move files to hidden bucket and location - // If there is a hidden bucket, move all the files to this bucket with prefix granuleId/ - // If not configured, move each file to its own bucket with prefix granuleId/ - //TODO limit parallel move files - await Promise.all(pgFilesWithNewLocation.map(async (file) => { - const moved = await moveObject({ - sourceBucket: file.bucket, - sourceKey: file.key, - destinationBucket: file.newBucket, - destinationKey: file.newKey, - copyTags: true, - }); - log.info(`Moved ${file.bucket}/${file.key} -> ${file.newBucket}/${file.newKey}`); - return moved; - })); - - // update granule files in db - const pgFileRecordsForUpdate = pgFilesWithNewLocation.map((file) => ({ - cumulus_id: file.cumulus_id, - granule_cumulus_id: file.granule_cumulus_id, - bucket: file.newBucket, - key: file.newKey, - })); - - trx = await knex.transaction(); - await filePgModel.upsert(trx, pgFileRecordsForUpdate); - - // 3. Update granule_group state to 'H' - // TODO existing or not already exist, both should work - const granuleGroup = { - group_id: existingGroup?.[0]?.group_id, - granule_cumulus_id: pgGranuleCumulusId, - state: 'H', - }; - await granuleGroupsModel.upsert(trx, granuleGroup); - - await trx.commit(); + await moveGranuleFiles(filesWithUpdatedLocations); + await updateDatabaseRecords({ + knex, + filePgModel, + granuleGroupsModel, + files: filesWithUpdatedLocations, + granuleCumulusId, + existingGroup, + }); } catch (error) { - if (trx) await trx.rollback(); - await Promise.all(pgFilesWithNewLocation.map((file) => { - log.info(`Recover ${file.newBucket}/${file.newKey} -> ${file.bucket}/${file.key}`); - return moveObject({ - sourceBucket: file.newBucket, - sourceKey: file.newKey, - destinationBucket: file.bucket, - destinationKey: file.key, - copyTags: true, - }); - })); + await rollbackFileMoves(filesWithUpdatedLocations); log.error(`Failed to demote granule ${granuleId}: ${(error as Error).message}`); throw error; } - // 4. 
Publish SNS topic
-  const granuleToPublishToSns = await translatePostgresGranuleToApiGranule({
+  // 4: Publish SNS event
+  const apiGranule = await translatePostgresGranuleToApiGranule({
     granulePgRecord: pgGranule,
     knexOrTransaction: knex,
     collectionPgModel,
     filePgModel,
     pdrPgModel,
     providerPgModel,
   });
-  await publishGranuleDeleteSnsMessage(granuleToPublishToSns);
+
+  await publishGranuleUpdateSnsMessage(apiGranule);
 
   log.info(`Granule ${granuleId} demoted successfully`);
 };
diff --git a/packages/db/src/migrations/20251031122012_collections_add_hidden_file_bucket.ts b/packages/db/src/migrations/20251031122012_collections_add_hidden_file_bucket.ts
new file mode 100644
index 00000000000..bbf674b319c
--- /dev/null
+++ b/packages/db/src/migrations/20251031122012_collections_add_hidden_file_bucket.ts
@@ -0,0 +1,19 @@
+import { Knex } from 'knex';
+
+export const up = async (knex: Knex): Promise<void> => {
+  if (!await knex.schema.hasColumn('collections', 'hidden_file_bucket')) {
+    await knex.schema.table('collections', (table) => {
+      table
+        .text('hidden_file_bucket')
+        .comment('Bucket for hidden granule files');
+    });
+  }
+};
+
+export const down = async (knex: Knex): Promise<void> => {
+  if (await knex.schema.hasColumn('collections', 'hidden_file_bucket')) {
+    await knex.schema.table('collections', (table) => {
+      table.dropColumn('hidden_file_bucket');
+    });
+  }
+};
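The `hasColumn` guards make both directions of this migration idempotent. A quick way to exercise it locally, sketched with knex's programmatic API (the connection settings are placeholders; Cumulus normally applies migrations during deployment rather than by hand):

```ts
// Sketch only: apply pending migrations and confirm the new column exists.
import { knex as createKnex } from 'knex';

const main = async (): Promise<void> => {
  const db = createKnex({ client: 'pg', connection: process.env.DATABASE_URL });
  await db.migrate.latest(); // picks up 20251031122012_collections_add_hidden_file_bucket
  const present = await db.schema.hasColumn('collections', 'hidden_file_bucket');
  console.log(`hidden_file_bucket present: ${String(present)}`);
  await db.destroy();
};

main().catch(console.error);
```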
From 7083df6d675e039091a6bd8addd19e8e461e6d84 Mon Sep 17 00:00:00 2001
From: jennyhliu
Date: Fri, 31 Oct 2025 13:42:16 -0400
Subject: [PATCH 3/4] Fix import

---
 packages/api/src/lib/granule-demote-promote.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/api/src/lib/granule-demote-promote.ts b/packages/api/src/lib/granule-demote-promote.ts
index 50d5ebd6eaf..17ded37b250 100644
--- a/packages/api/src/lib/granule-demote-promote.ts
+++ b/packages/api/src/lib/granule-demote-promote.ts
@@ -23,7 +23,7 @@ import {
   translatePostgresCollectionToApiCollection,
   translatePostgresGranuleToApiGranule,
 } from '@cumulus/db';
-import { publishGranuleUpdateSnsMessage } from '../../lib/publishSnsMessageUtils';
+const { publishGranuleUpdateSnsMessage } = require('../../lib/publishSnsMessageUtils');
 
 const unpublishGranule = require('../../lib/granule-remove-from-cmr');
 

From 37fdb443c9bfa1b09eac9fcb8b0552d92cd0b639 Mon Sep 17 00:00:00 2001
From: jennyhliu
Date: Tue, 4 Nov 2025 18:59:10 -0500
Subject: [PATCH 4/4] Add fileModel.updateFilesById; work on unit tests

---
 packages/api/lib/schemas.js                   |   2 +-
 .../api/src/lib/granule-demote-promote.ts     |   6 +-
 packages/api/tests/lib/test-granule-demote.js | 493 ++++++++++++++++++
 packages/db/src/models/file.ts                |  38 ++
 4 files changed, 535 insertions(+), 4 deletions(-)
 create mode 100644 packages/api/tests/lib/test-granule-demote.js

diff --git a/packages/api/lib/schemas.js b/packages/api/lib/schemas.js
index 82226f0c476..0016974649b 100644
--- a/packages/api/lib/schemas.js
+++ b/packages/api/lib/schemas.js
@@ -151,7 +151,7 @@ module.exports.collection = {
       type: 'string',
     },
     hiddenFileBucket: {
-      description: 'Name of the S3 bucket used to store granule files in the hidden archive. Defaults to the primary archive bucket if not specified.',
+      description: 'Name of the S3 bucket used to store granule files in the hidden archive. Defaults to each file\'s current bucket if not specified.',
       type: 'string',
     },
     ignoreFilesConfigForDiscovery: {
diff --git a/packages/api/src/lib/granule-demote-promote.ts b/packages/api/src/lib/granule-demote-promote.ts
index 17ded37b250..3c8b00822ed 100644
--- a/packages/api/src/lib/granule-demote-promote.ts
+++ b/packages/api/src/lib/granule-demote-promote.ts
@@ -24,8 +24,7 @@ import {
   translatePostgresGranuleToApiGranule,
 } from '@cumulus/db';
 const { publishGranuleUpdateSnsMessage } = require('../../lib/publishSnsMessageUtils');
-
-const unpublishGranule = require('../../lib/granule-remove-from-cmr');
+const { unpublishGranule } = require('../../lib/granule-remove-from-cmr');
 
 const log = new Logger({ sender: 'granule-demote-promote' });
 
@@ -101,7 +100,7 @@ export const updateDatabaseRecords = async ({
       key: file.newKey,
     }));
 
-    await filePgModel.upsert(trx, updatedFileRecords);
+    await filePgModel.updateFilesById(trx, updatedFileRecords);
 
     // TODO verify this works whether or not a group record already exists
     await granuleGroupsModel.upsert(trx, {
@@ -112,6 +111,7 @@ export const updateDatabaseRecords = async ({
 
     await trx.commit();
   } catch (error) {
+    log.error(error);
     await trx.rollback();
     throw error;
   }
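The rollback behavior this series relies on assumes `moveObject` is a copy followed by a delete, so a failed delete can leave a file in both locations; the tests below simulate exactly that. A condensed sketch of the compensating-action pattern (the `FileMove` type and function name are illustrative, not the patch's exact code):

```ts
import { moveObject } from '@cumulus/aws-client/S3';

type FileMove = { bucket: string, key: string, newBucket: string, newKey: string };

// Move everything forward; on any failure, try to move every file back
// before rethrowing the original error.
const moveAllOrRollback = async (files: FileMove[]): Promise<void> => {
  try {
    await Promise.all(files.map((f) => moveObject({
      sourceBucket: f.bucket,
      sourceKey: f.key,
      destinationBucket: f.newBucket,
      destinationKey: f.newKey,
      copyTags: true,
    })));
  } catch (error) {
    // Best-effort rollback: files whose forward move never completed will
    // fail to move back; swallow those errors and surface the original.
    await Promise.all(files.map((f) => moveObject({
      sourceBucket: f.newBucket,
      sourceKey: f.newKey,
      destinationBucket: f.bucket,
      destinationKey: f.key,
      copyTags: true,
    }).catch(() => undefined)));
    throw error;
  }
};
```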
diff --git a/packages/api/tests/lib/test-granule-demote.js b/packages/api/tests/lib/test-granule-demote.js
new file mode 100644
index 00000000000..ae9e781e21d
--- /dev/null
+++ b/packages/api/tests/lib/test-granule-demote.js
@@ -0,0 +1,493 @@
+'use strict';
+
+const test = require('ava');
+const sinon = require('sinon');
+const proxyquire = require('proxyquire');
+const cryptoRandomString = require('crypto-random-string');
+const { randomId, randomString } = require('@cumulus/common/test-utils');
+const s3Client = require('@cumulus/aws-client/S3');
+const { sns, sqs } = require('@cumulus/aws-client/services');
+const { SubscribeCommand, DeleteTopicCommand } = require('@aws-sdk/client-sns');
+const { createSnsTopic } = require('@cumulus/aws-client/SNS');
+const {
+  createBucket,
+  deleteS3Buckets,
+  s3PutObject,
+  s3ObjectExists,
+} = require('@cumulus/aws-client/S3');
+const {
+  CollectionPgModel,
+  FilePgModel,
+  GranulePgModel,
+  GranuleGroupsPgModel,
+  generateLocalTestDb,
+  destroyLocalTestDb,
+  localStackConnectionEnv,
+  migrationDir,
+  fakeGranuleRecordFactory,
+  translateApiCollectionToPostgresCollection,
+} = require('@cumulus/db');
+
+const {
+  fakeCollectionFactory,
+} = require('../../lib/testUtils');
+const { updateDatabaseRecords } = require('../../src/lib/granule-demote-promote');
+
+// Create stubs
+const unpublishGranuleStub = sinon.stub().resolves();
+const publishGranuleUpdateSnsMessageStub = sinon.stub().resolves();
+
+// Import module under test with stubs injected
+const { demoteGranule } = proxyquire('../../src/lib/granule-demote-promote', {
+  '../../lib/granule-remove-from-cmr': { unpublishGranule: unpublishGranuleStub },
+  '../../lib/publishSnsMessageUtils': { publishGranuleUpdateSnsMessage: publishGranuleUpdateSnsMessageStub },
+});
+
+const testDbName = `granules_${cryptoRandomString({ length: 10 })}`;
+let granulePgModel;
+let filePgModel;
+let granuleGroupsModel;
+
+process.env.stackName = randomId('stackname');
+process.env.system_bucket = randomId('bucket');
+process.env.TOKEN_SECRET = randomId('secret');
+
+test.before(async (t) => {
+  process.env = {
+    ...process.env,
+    ...localStackConnectionEnv,
+    PG_DATABASE: testDbName,
+  };
+
+  // Create fake buckets
+  const visibleBucket = randomId('visible');
+  const hiddenBucket = randomId('hidden');
+  await createBucket(visibleBucket);
+  await createBucket(hiddenBucket);
+
+  t.context.visibleBucket = visibleBucket;
+  t.context.hiddenBucket = hiddenBucket;
+
+  const { knex, knexAdmin } = await generateLocalTestDb(testDbName, migrationDir);
+  t.context.knex = knex;
+  t.context.knexAdmin = knexAdmin;
+
+  granulePgModel = new GranulePgModel();
+  filePgModel = new FilePgModel();
+  granuleGroupsModel = new GranuleGroupsPgModel();
+
+  // Create collection
+  t.context.testCollection = fakeCollectionFactory({
+    name: 'fakeCollection',
+    version: 'v1',
+    duplicateHandling: 'error',
+    hiddenFileBucket: hiddenBucket,
+  });
+
+  const collectionPgModel = new CollectionPgModel();
+  const [pgCollection] = await collectionPgModel.create(
+    knex,
+    translateApiCollectionToPostgresCollection(t.context.testCollection)
+  );
+  t.context.collectionCumulusId = pgCollection.cumulus_id;
+  t.context.collectionPgModel = collectionPgModel;
+});
+
+test.beforeEach(async (t) => {
+  const topicName = randomString();
+  const { TopicArn } = await createSnsTopic(topicName);
+  process.env.granule_sns_topic_arn = TopicArn;
+  t.context.TopicArn = TopicArn;
+
+  const QueueName = randomString();
+  const { QueueUrl } = await sqs().createQueue({ QueueName });
+  t.context.QueueUrl = QueueUrl;
+  const getQueueAttributesResponse = await sqs().getQueueAttributes({
+    QueueUrl,
+    AttributeNames: ['QueueArn'],
+  });
+  const QueueArn = getQueueAttributesResponse.Attributes.QueueArn;
+
+  const { SubscriptionArn } = await sns().send(
+    new SubscribeCommand({ TopicArn, Protocol: 'sqs', Endpoint: QueueArn })
+  );
+  t.context.SubscriptionArn = SubscriptionArn;
+});
+
+test.afterEach.always(async (t) => {
+  // Restore any stubbed s3Client methods so later tests can stub them again,
+  // then clear call history on the long-lived proxyquire stubs.
+  sinon.restore();
+  unpublishGranuleStub.resetHistory();
+  publishGranuleUpdateSnsMessageStub.resetHistory();
+  const { QueueUrl, TopicArn } = t.context;
+  await sqs().deleteQueue({ QueueUrl });
+  await sns().send(new DeleteTopicCommand({ TopicArn }));
+});
+
+test.after.always(async (t) => {
+  await destroyLocalTestDb({
+    knex: t.context.knex,
+    knexAdmin: t.context.knexAdmin,
+    testDbName,
+  });
+
+  await deleteS3Buckets([t.context.visibleBucket, t.context.hiddenBucket]);
+});
+
+test.serial.only('demoteGranule() moves files to hidden bucket and updates DB records', async (t) => {
+  const { knex, collectionCumulusId, visibleBucket, hiddenBucket } = t.context;
+
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+    status: 'completed',
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const fileKey = `${granuleId}/file.txt`;
+  await s3PutObject({
+    Bucket: visibleBucket,
+    Key: fileKey,
+    Body: 'test-content',
+  });
+
+  // TODO granule is published initially
+  // TODO multiple files
+  const [createdFile] = await filePgModel.create(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: visibleBucket,
+    key: fileKey,
+    file_name: 'file.txt',
+  });
+
+  t.true(await s3ObjectExists({ Bucket: visibleBucket, Key: fileKey }));
+
+  await demoteGranule({
+    knex,
+    granuleId,
+    granulePgModel,
+    filePgModel,
+    granuleGroupsModel,
+  });
+
+  const hiddenKey = `${granuleId}/${fileKey}`;
+  t.false(await s3ObjectExists({ Bucket: visibleBucket, Key: fileKey }));
+  t.true(await s3ObjectExists({ Bucket: hiddenBucket, Key: hiddenKey }));
+
+  const updatedFile = await filePgModel.get(knex, { cumulus_id: createdFile.cumulus_id });
+  t.is(updatedFile.bucket, hiddenBucket);
+  t.is(updatedFile.key, hiddenKey);
+
+  const groupRecord = await granuleGroupsModel.search(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+  });
+  t.truthy(groupRecord[0]);
+  t.is(groupRecord[0].state, 'H');
+
+  t.true(unpublishGranuleStub.calledOnce);
+  const args = unpublishGranuleStub.firstCall.args[0];
+  t.is(args.knex, knex);
+  t.true(publishGranuleUpdateSnsMessageStub.calledOnce);
+});
+
+test.serial.only('demoteGranule() rolls back file moves if one file move fails', async (t) => {
+  const { knex, collectionCumulusId, visibleBucket, hiddenBucket } = t.context;
+
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+    status: 'completed',
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const fileKeys = [`${granuleId}/file1.txt`, `${granuleId}/file2.txt`];
+  await Promise.all(fileKeys.map((key) => s3PutObject({
+    Bucket: visibleBucket,
+    Key: key,
+    Body: 'test-content',
+  })));
+
+  const filesToCreate = fileKeys.map((key) => ({
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: visibleBucket,
+    key,
+    file_name: key.split('/').pop(),
+  }));
+  const createdFiles = await filePgModel.create(knex, filesToCreate, '*');
+
+  sinon.stub(s3Client, 'moveObject').callsFake(({ sourceKey }) => {
+    if (sourceKey.endsWith('file2.txt')) throw new Error('Simulated move failure');
+    return true;
+  });
+
+  const error = await t.throwsAsync(() =>
+    demoteGranule({
+      knex,
+      granuleId,
+      granulePgModel,
+      filePgModel,
+      granuleGroupsModel,
+    }));
+
+  t.true(error.message.includes('Simulated move failure'));
+
+  for (const key of fileKeys) {
+    // eslint-disable-next-line no-await-in-loop
+    t.true(await s3ObjectExists({ Bucket: visibleBucket, Key: key }));
+    // eslint-disable-next-line no-await-in-loop
+    t.false(await s3ObjectExists({ Bucket: hiddenBucket, Key: `${granuleId}/${key}` }));
+  }
+
+  for (const createdFile of createdFiles) {
+    // eslint-disable-next-line no-await-in-loop
+    const dbFile = await filePgModel.get(knex, { cumulus_id: createdFile.cumulus_id });
+    t.is(dbFile.bucket, visibleBucket);
+    t.is(dbFile.key, createdFile.key);
+  }
+
+  t.true(unpublishGranuleStub.calledOnce);
+  const args = unpublishGranuleStub.firstCall.args[0];
+  t.is(args.knex, knex);
+  t.true(publishGranuleUpdateSnsMessageStub.notCalled);
+});
+
+test.serial('demoteGranule() rolls back if file copy succeeds but delete fails', async (t) => {
+  const { knex, collectionCumulusId, visibleBucket, hiddenBucket } = t.context;
+
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+    status: 'completed',
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const fileKey = `${granuleId}/file.txt`;
+  await s3PutObject({ Bucket: visibleBucket, Key: fileKey, Body: 'test-content' });
+
+  const [pgFile] = await filePgModel.create(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: visibleBucket,
+    key: fileKey,
+    file_name: 'file.txt',
+  });
+
+  sinon.stub(s3Client, 'moveObject').callsFake(async (params) => {
+    if (params.destinationBucket === hiddenBucket) {
+      await s3PutObject({
+        Bucket: params.destinationBucket,
+        Key: params.destinationKey,
+        Body: 'copied-content',
+      });
+    }
+    throw new Error('Simulated S3 delete failure');
+  });
+
+  const error = await t.throwsAsync(() =>
+    demoteGranule({
+      knex,
+      granuleId,
+      granulePgModel,
+      filePgModel,
+      granuleGroupsModel,
+    }));
+
+  t.true(error.message.includes('Simulated S3 delete failure'));
+  t.true(await s3ObjectExists({ Bucket: visibleBucket, Key: fileKey }));
+
+  const hiddenKey = `${granuleId}/${fileKey}`;
+  t.false(await s3ObjectExists({ Bucket: hiddenBucket, Key: hiddenKey }));
+
+  const fileRecord = await filePgModel.get(knex, { cumulus_id: pgFile.cumulus_id });
+  t.is(fileRecord.bucket, visibleBucket);
+  t.is(fileRecord.key, fileKey);
+});
+
+test.serial('demoteGranule() should not exceed concurrency limit when moving many files (placeholder)', async (t) => {
+  const { knex, collectionCumulusId, visibleBucket, hiddenBucket } = t.context;
+
+  // Create granule with multiple files
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+    status: 'completed',
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const totalFiles = 10;
+  const keys = Array.from({ length: totalFiles }, (_, i) => `${granuleId}/file${i}.txt`);
+  await Promise.all(keys.map((key) => s3PutObject({
+    Bucket: visibleBucket,
+    Key: key,
+    Body: 'test-content',
+  })));
+
+  for (const key of keys) {
+    // eslint-disable-next-line no-await-in-loop
+    await filePgModel.create(knex, {
+      granule_cumulus_id: pgGranule.cumulus_id,
+      bucket: visibleBucket,
+      key,
+      file_name: key.split('/').pop(),
+    });
+  }
+
+  // Track concurrent move operations
+  let concurrent = 0;
+  let maxConcurrent = 0;
+
+  // Swap out moveObject directly and restore it below; deleting the
+  // require cache entry would not undo a mutation of the exports object.
+  const originalMoveObject = s3Client.moveObject;
+  s3Client.moveObject = sinon.stub().callsFake(async () => {
+    concurrent += 1;
+    maxConcurrent = Math.max(maxConcurrent, concurrent);
+    await new Promise((r) => setTimeout(r, 20)); // simulate work
+    concurrent -= 1;
+  });
+
+  await demoteGranule({
+    knex,
+    granuleId,
+    granulePgModel,
+    filePgModel,
+    granuleGroupsModel,
+  });
+
+  // For now, concurrency may equal total files; a future limit test will assert a threshold
+  t.true(maxConcurrent >= 1);
+
+  // Clean up stub
+  s3Client.moveObject = originalMoveObject;
+});
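The placeholder test above tracks concurrency but does not yet bound it. One way the "limit the number of concurrent move operations" TODO could be addressed is sketched below, assuming the p-map package (the limit of 10 is an arbitrary placeholder):

```ts
import pMap from 'p-map';
import { moveObject } from '@cumulus/aws-client/S3';

type FileMove = { bucket: string, key: string, newBucket: string, newKey: string };

// Cap move concurrency instead of using an unbounded Promise.all.
const moveGranuleFilesLimited = async (files: FileMove[]): Promise<void> => {
  await pMap(
    files,
    (f) => moveObject({
      sourceBucket: f.bucket,
      sourceKey: f.key,
      destinationBucket: f.newBucket,
      destinationKey: f.newKey,
      copyTags: true,
    }),
    { concurrency: 10 }
  );
};
```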
+test.serial('updateDatabaseRecords() creates granule group if none exists', async (t) => {
+  const { knex, collectionCumulusId } = t.context;
+
+  // Create a granule and file
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const pgFile = {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: 'fake-bucket',
+    key: 'file.txt',
+    file_name: 'file.txt',
+  };
+  const [createdFile] = await filePgModel.create(knex, pgFile);
+
+  // Verify no group exists yet
+  const existingGroup = await granuleGroupsModel.search(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+  });
+  t.is(existingGroup.length, 0);
+
+  // Call updateDatabaseRecords (should create new group)
+  await updateDatabaseRecords({
+    knex,
+    filePgModel,
+    granuleGroupsModel,
+    files: [{
+      ...createdFile,
+      newBucket: 'new-bucket',
+      newKey: 'new-key.txt',
+    }],
+    granuleCumulusId: pgGranule.cumulus_id,
+  });
+
+  // Verify granule group created
+  const newGroup = await granuleGroupsModel.search(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+  });
+  t.is(newGroup.length, 1);
+  t.is(newGroup[0].state, 'H');
+
+  // Verify file record updated
+  const updatedFile = await filePgModel.get(knex, { cumulus_id: createdFile.cumulus_id });
+  t.is(updatedFile.bucket, 'new-bucket');
+  t.is(updatedFile.key, 'new-key.txt');
+});
+
+test.serial('updateDatabaseRecords() updates existing granule group if it already exists', async (t) => {
+  const { knex, collectionCumulusId } = t.context;
+
+  // Create a granule and file
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const pgFile = {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: 'fake-bucket',
+    key: 'file.txt',
+    file_name: 'file.txt',
+  };
+  const [createdFile] = await filePgModel.create(knex, pgFile);
+
+  // Create an existing granule group (state = 'V')
+  const [existingGroup] = await granuleGroupsModel.create(knex, {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    state: 'V',
+  });
+
+  // Call updateDatabaseRecords with updated file
+  await updateDatabaseRecords({
+    knex,
+    filePgModel,
+    granuleGroupsModel,
+    files: [{
+      ...createdFile,
+      newBucket: 'updated-bucket',
+      newKey: 'updated-key.txt',
+    }],
+    granuleCumulusId: pgGranule.cumulus_id,
+    existingGroup: [existingGroup],
+  });
+
+  // Verify granule group was updated (state changed to 'H')
+  const updatedGroup = await granuleGroupsModel.get(knex, { cumulus_id: existingGroup.cumulus_id });
+  t.is(updatedGroup.state, 'H');
+
+  // Verify file record updated too
+  const updatedFile = await filePgModel.get(knex, { cumulus_id: createdFile.cumulus_id });
+  t.is(updatedFile.bucket, 'updated-bucket');
+  t.is(updatedFile.key, 'updated-key.txt');
+});
+
+test.serial('updateDatabaseRecords() rolls back if database transaction fails', async (t) => {
+  const { knex, collectionCumulusId } = t.context;
+
+  // Create granule + file
+  const granuleId = randomId('granule');
+  const fakePgGranule = fakeGranuleRecordFactory({
+    granule_id: granuleId,
+    collection_cumulus_id: collectionCumulusId,
+  });
+  const [pgGranule] = await granulePgModel.create(knex, fakePgGranule);
+
+  const pgFile = {
+    granule_cumulus_id: pgGranule.cumulus_id,
+    bucket: 'source-bucket',
+    key: 'file.txt',
+    file_name: 'file.txt',
+  };
+  const [createdFile] = await filePgModel.create(knex, pgFile);
+
+  // Stub FilePgModel.updateFilesById() to simulate DB failure
+  // (updateDatabaseRecords calls updateFilesById, not upsert, as of this patch)
+  const filePgModelStub = new FilePgModel();
+  sinon.stub(filePgModelStub, 'updateFilesById').throws(new Error('Simulated DB error'));
+
+  await t.throwsAsync(
+    () => updateDatabaseRecords({
+      knex,
+      filePgModel: filePgModelStub,
+      granuleGroupsModel: new GranuleGroupsPgModel(),
+      files: [{
+        ...createdFile,
+        newBucket: 'updated-bucket',
+        newKey: 'updated-key.txt',
+      }],
+      granuleCumulusId: pgGranule.cumulus_id,
+    }),
+    { message: /Simulated DB error/ }
+  );
+
+  // DB should remain unchanged
+  const fileAfter = await filePgModel.get(knex, { cumulus_id: createdFile.cumulus_id });
+  t.is(fileAfter.bucket, 'source-bucket');
+  t.is(fileAfter.key, 'file.txt');
+});
diff --git a/packages/db/src/models/file.ts b/packages/db/src/models/file.ts
index 275f3fe4e9f..0ec5b8b836d 100644
--- a/packages/db/src/models/file.ts
+++ b/packages/db/src/models/file.ts
@@ -27,6 +27,44 @@ class FilePgModel extends BasePgModel {
       .returning('*');
   }
 
+  /**
+   * Updates one or multiple files by cumulus_id
+   */
+  async updateFilesById(
+    knexOrTrx: Knex | Knex.Transaction,
+    input: Partial<PostgresFileRecord> | Partial<PostgresFileRecord>[]
+  ): Promise<PostgresFileRecord[]> {
+    const files = Array.isArray(input) ?
input : [input]; + + if (files.length === 0) return Promise.resolve([]); + + const results: PostgresFileRecord[] = []; + + for (const file of files) { + const { cumulus_id: cumulusId, ...updates } = file; + + if (!cumulusId) { + throw new Error('cumulus_id is required to update a file'); + } + + if (Object.keys(updates).length > 0) { + // eslint-disable-next-line no-await-in-loop + const [updated] = await knexOrTrx(this.tableName) + .where({ cumulus_id: cumulusId }) + .update(updates) + .returning('*'); + + if (!updated) { + throw new Error(`File not found with cumulus_id=${cumulusId}`); + } + + results.push(updated); + } + } + + return results; + } + /** * Retrieves all files for all granules given */
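A usage sketch for the new helper (identifiers and values below are illustrative): updates are keyed by `cumulus_id` and can run on a caller-supplied transaction, so a failure rolls back every row, which is exactly how `updateDatabaseRecords` uses it:

```ts
import { Knex } from 'knex';
import { FilePgModel } from '@cumulus/db';

// Relocate a file record inside a transaction; if the update throws,
// knex rolls the whole transaction back automatically.
const relocateFiles = async (knex: Knex): Promise<void> => {
  const filePgModel = new FilePgModel();
  await knex.transaction(async (trx) => {
    await filePgModel.updateFilesById(trx, [
      { cumulus_id: 42, bucket: 'hidden-bucket', key: 'granuleId/file.txt' },
    ]);
  });
};
```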