From 7a980d945122cd9a9ef2100894109a810b66ab2b Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Wed, 21 Feb 2024 15:04:43 +0000 Subject: [PATCH 1/6] feat: move the transaction inside each move function - move transaction into move function of itemService - publish websocket post move item after the transaction - publish websocket feedback after all the transactions --- src/services/item/controller.ts | 65 ++++++++++++++++++--------- src/services/item/service.ts | 78 ++++++++++++++------------------- 2 files changed, 78 insertions(+), 65 deletions(-) diff --git a/src/services/item/controller.ts b/src/services/item/controller.ts index 23e85ef7fa..97a12bf10d 100644 --- a/src/services/item/controller.ts +++ b/src/services/item/controller.ts @@ -271,30 +271,55 @@ const plugin: FastifyPluginAsync = async (fastify) => { body: { parentId }, log, } = request; - db.transaction(async (manager) => { - const repositories = buildRepositories(manager); - const items = await itemService.moveMany(member, repositories, ids, parentId); - await actionItemService.postManyMoveAction(request, reply, repositories, items); - if (member) { - websockets.publish( - memberItemsTopic, - member.id, - ItemOpFeedbackEvent('move', ids, { - data: Object.fromEntries(items.map((i) => [i.id, i])), - errors: [], - }), + + Promise.allSettled( + ids.map((itemId) => itemService.move(member, db, itemId, parentId)), + ).then(async (results) => { + const successfulItems = results + .filter((result) => result.status === 'fulfilled') + .map((result) => (result as PromiseFulfilledResult).value); + + const errors = results + .filter((result) => result.status === 'rejected') + .map((result) => (result as PromiseRejectedResult).reason); + + if (successfulItems.length) { + await actionItemService.postManyMoveAction( + request, + reply, + buildRepositories(), + successfulItems, ); + if (member) { + websockets.publish( + memberItemsTopic, + member.id, + ItemOpFeedbackEvent( + 'move', + successfulItems.map((i) => i.id), + { + data: Object.fromEntries(successfulItems.map((i) => [i.id, i])), + errors: [], + }, + ), + ); + } } - }).catch((e) => { - log.error(e); - if (member) { - websockets.publish( - memberItemsTopic, - member.id, - ItemOpFeedbackEvent('move', ids, { error: e }), - ); + + if (errors.length) { + if (member) { + errors.forEach((e) => { + log.error(e); + websockets.publish( + memberItemsTopic, + member.id, + ItemOpFeedbackEvent('move', ids, { error: e }), + ); + }); + } } }); + reply.status(StatusCodes.ACCEPTED); return ids; }, diff --git a/src/services/item/service.ts b/src/services/item/service.ts index f2b8a15398..353160e732 100644 --- a/src/services/item/service.ts +++ b/src/services/item/service.ts @@ -23,7 +23,7 @@ import { UnauthorizedMember, } from '../../utils/errors'; import HookManager from '../../utils/hook'; -import { Repositories } from '../../utils/repositories'; +import { Repositories, buildRepositories } from '../../utils/repositories'; import { filterOutItems, validatePermission, validatePermissionMany } from '../authorization'; import { Actor, Member } from '../member/entities/member'; import { mapById } from '../utils'; @@ -364,59 +364,47 @@ export class ItemService { } /////// -------- MOVE - async move(actor: Actor, repositories: Repositories, itemId: UUID, toItemId?: UUID) { + async move(actor: Actor, db, itemId: UUID, toItemId?: UUID) { if (!actor) { throw new UnauthorizedMember(actor); } - const { itemRepository } = repositories; - // TODO: check memberships - let parentItem; - if (toItemId) { - parentItem = await itemRepository.get(toItemId); - await validatePermission(repositories, PermissionLevel.Write, actor, parentItem); - } - const item = await itemRepository.get(itemId); - - await validatePermission(repositories, PermissionLevel.Admin, actor, item); + return db + .transaction(async (manager) => { + const repositories = buildRepositories(manager); + const { itemRepository } = repositories; - // check how "big the tree is" below the item - await itemRepository.checkNumberOfDescendants(item, MAX_DESCENDANTS_FOR_MOVE); - - if (parentItem) { - // check how deep (number of levels) the resulting tree will be - const levelsToFarthestChild = await itemRepository.getNumberOfLevelsToFarthestChild(item); - await itemRepository.checkHierarchyDepth(parentItem, levelsToFarthestChild); - } - - // post hook - // question: invoque on all items? - await this.hooks.runPreHooks('move', actor, repositories, { - source: item, - destinationParent: parentItem, - }); - - const result = await this._move(actor, repositories, item, parentItem); + // TODO: check memberships + let parentItem; + if (toItemId) { + parentItem = await itemRepository.get(toItemId); + await validatePermission(repositories, PermissionLevel.Write, actor, parentItem); + } + const item = await itemRepository.get(itemId); - await this.hooks.runPostHooks('move', actor, repositories, { - source: item, - sourceParentId: getParentFromPath(item.path), - destination: result, - }); + await validatePermission(repositories, PermissionLevel.Admin, actor, item); - return result; - } + // check how "big the tree is" below the item + await itemRepository.checkNumberOfDescendants(item, MAX_DESCENDANTS_FOR_MOVE); - // TODO: optimize - async moveMany(actor: Actor, repositories: Repositories, itemIds: string[], toItemId?: string) { - if (!actor) { - throw new UnauthorizedMember(actor); - } + if (parentItem) { + // check how deep (number of levels) the resulting tree will be + const levelsToFarthestChild = await itemRepository.getNumberOfLevelsToFarthestChild(item); + await itemRepository.checkHierarchyDepth(parentItem, levelsToFarthestChild); + } - const items = await Promise.all( - itemIds.map((id) => this.move(actor, repositories, id, toItemId)), - ); - return items; + const result = await this._move(actor, repositories, item, parentItem); + return { result, item }; + }) + .then(async ({ result, item }) => { + await this.hooks.runPostHooks('move', actor, buildRepositories(), { + source: item, + sourceParentId: getParentFromPath(item.path), + destination: result, + }); + + return result; + }); } /** From d8ce57b410422d5f677d4f8bc81fd781050a2a07 Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Thu, 22 Feb 2024 11:22:57 +0000 Subject: [PATCH 2/6] feat: move the transactions in the controller - each transaction execute one move - add ws services to publish websockets --- src/services/item/controller.ts | 77 +++++++++------------ src/services/item/service.ts | 60 ++++++++-------- src/services/item/utils.ts | 12 ++++ src/services/item/ws/hooks.ts | 54 --------------- src/services/item/ws/services.ts | 115 +++++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+), 130 deletions(-) create mode 100644 src/services/item/ws/services.ts diff --git a/src/services/item/controller.ts b/src/services/item/controller.ts index 97a12bf10d..763b94213e 100644 --- a/src/services/item/controller.ts +++ b/src/services/item/controller.ts @@ -2,7 +2,7 @@ import { StatusCodes } from 'http-status-codes'; import { FastifyPluginAsync } from 'fastify'; -import { IdParam, IdsParams, ParentIdParam, PermissionLevel } from '@graasp/sdk'; +import { IdParam, IdsParams, ParentIdParam, PermissionLevel, getParentFromPath } from '@graasp/sdk'; import { PaginationParams } from '../../types'; import { UnauthorizedMember } from '../../utils/errors'; @@ -26,6 +26,7 @@ import { import { ItemGeolocation } from './plugins/geolocation/ItemGeolocation'; import { ItemChildrenParams, ItemSearchParams } from './types'; import { ItemOpFeedbackEvent, memberItemsTopic } from './ws/events'; +import { publishAfterMoved, publishFeedbackAfterAllMoved } from './ws/services'; const plugin: FastifyPluginAsync = async (fastify) => { const { db, items, websockets } = fastify; @@ -271,52 +272,42 @@ const plugin: FastifyPluginAsync = async (fastify) => { body: { parentId }, log, } = request; - Promise.allSettled( - ids.map((itemId) => itemService.move(member, db, itemId, parentId)), - ).then(async (results) => { - const successfulItems = results - .filter((result) => result.status === 'fulfilled') - .map((result) => (result as PromiseFulfilledResult).value); + ids.map((itemId) => { + return db + .transaction(async (manager) => { + const repositories = buildRepositories(manager); + const { source, destination } = await itemService.move( + member, + repositories, + itemId, + parentId, + ); - const errors = results - .filter((result) => result.status === 'rejected') - .map((result) => (result as PromiseRejectedResult).reason); + await actionItemService.postManyMoveAction(request, reply, repositories, [ + destination, + ]); - if (successfulItems.length) { - await actionItemService.postManyMoveAction( - request, - reply, - buildRepositories(), - successfulItems, - ); - if (member) { - websockets.publish( - memberItemsTopic, - member.id, - ItemOpFeedbackEvent( - 'move', - successfulItems.map((i) => i.id), - { - data: Object.fromEntries(successfulItems.map((i) => [i.id, i])), - errors: [], - }, - ), - ); - } - } - - if (errors.length) { - if (member) { - errors.forEach((e) => { - log.error(e); - websockets.publish( - memberItemsTopic, - member.id, - ItemOpFeedbackEvent('move', ids, { error: e }), - ); + return { source, destination }; + }) + .then(async ({ source, destination }) => { + publishAfterMoved(websockets, { + source, + destination, + sourceParentId: getParentFromPath(source.path), + }); + return destination; }); - } + }), + ).then(async (results) => { + if (member) { + publishFeedbackAfterAllMoved({ + websockets, + log, + results, + itemIds: ids, + memberId: member.id, + }); } }); diff --git a/src/services/item/service.ts b/src/services/item/service.ts index 353160e732..40167af0de 100644 --- a/src/services/item/service.ts +++ b/src/services/item/service.ts @@ -23,7 +23,7 @@ import { UnauthorizedMember, } from '../../utils/errors'; import HookManager from '../../utils/hook'; -import { Repositories, buildRepositories } from '../../utils/repositories'; +import { Repositories } from '../../utils/repositories'; import { filterOutItems, validatePermission, validatePermissionMany } from '../authorization'; import { Actor, Member } from '../member/entities/member'; import { mapById } from '../utils'; @@ -364,47 +364,41 @@ export class ItemService { } /////// -------- MOVE - async move(actor: Actor, db, itemId: UUID, toItemId?: UUID) { + async move(actor: Actor, repositories: Repositories, itemId: UUID, toItemId?: UUID) { if (!actor) { throw new UnauthorizedMember(actor); } - return db - .transaction(async (manager) => { - const repositories = buildRepositories(manager); - const { itemRepository } = repositories; + const { itemRepository } = repositories; - // TODO: check memberships - let parentItem; - if (toItemId) { - parentItem = await itemRepository.get(toItemId); - await validatePermission(repositories, PermissionLevel.Write, actor, parentItem); - } - const item = await itemRepository.get(itemId); + // TODO: check memberships + let parentItem; + if (toItemId) { + parentItem = await itemRepository.get(toItemId); + await validatePermission(repositories, PermissionLevel.Write, actor, parentItem); + } + const item = await itemRepository.get(itemId); - await validatePermission(repositories, PermissionLevel.Admin, actor, item); + await validatePermission(repositories, PermissionLevel.Admin, actor, item); - // check how "big the tree is" below the item - await itemRepository.checkNumberOfDescendants(item, MAX_DESCENDANTS_FOR_MOVE); + // check how "big the tree is" below the item + await itemRepository.checkNumberOfDescendants(item, MAX_DESCENDANTS_FOR_MOVE); - if (parentItem) { - // check how deep (number of levels) the resulting tree will be - const levelsToFarthestChild = await itemRepository.getNumberOfLevelsToFarthestChild(item); - await itemRepository.checkHierarchyDepth(parentItem, levelsToFarthestChild); - } + if (parentItem) { + // check how deep (number of levels) the resulting tree will be + const levelsToFarthestChild = await itemRepository.getNumberOfLevelsToFarthestChild(item); + await itemRepository.checkHierarchyDepth(parentItem, levelsToFarthestChild); + } - const result = await this._move(actor, repositories, item, parentItem); - return { result, item }; - }) - .then(async ({ result, item }) => { - await this.hooks.runPostHooks('move', actor, buildRepositories(), { - source: item, - sourceParentId: getParentFromPath(item.path), - destination: result, - }); - - return result; - }); + const result = await this._move(actor, repositories, item, parentItem); + + await this.hooks.runPostHooks('move', actor, repositories, { + source: item, + sourceParentId: getParentFromPath(item.path), + destination: result, + }); + + return { destination: result, source: item }; } /** diff --git a/src/services/item/utils.ts b/src/services/item/utils.ts index d96a61d939..0f91df1ffb 100644 --- a/src/services/item/utils.ts +++ b/src/services/item/utils.ts @@ -89,3 +89,15 @@ export const readPdfContent = async (source: string | URL) => { return ''; } }; + +export const getPromiseResults = (results: PromiseSettledResult[]) => { + const success = results + .filter((result) => result.status === 'fulfilled') + .map((result) => (result as PromiseFulfilledResult).value); + + const failed = results + .filter((result) => result.status === 'rejected') + .map((result) => (result as PromiseRejectedResult).reason); + + return { success, failed }; +}; diff --git a/src/services/item/ws/hooks.ts b/src/services/item/ws/hooks.ts index 3f4686706c..0cb3d5a613 100644 --- a/src/services/item/ws/hooks.ts +++ b/src/services/item/ws/hooks.ts @@ -70,23 +70,6 @@ function registerItemTopic(websockets: WebsocketService, itemService: ItemServic websockets.publish(itemTopic, parentId, ChildItemEvent('create', item)); } }); - - // on move item, notify: - // - parent of old location of deleted child - // - parent of new location of new child - itemService.hooks.setPostHook( - 'move', - async (actor, repositories, { sourceParentId, source, destination }) => { - if (sourceParentId !== undefined) { - websockets.publish(itemTopic, sourceParentId, ChildItemEvent('delete', source)); - } - - const destParentId = getParentFromPath(destination.path); - if (destParentId) { - websockets.publish(itemTopic, destParentId, ChildItemEvent('create', destination)); - } - }, - ); } /** @@ -189,43 +172,6 @@ function registerMemberItemsTopic(websockets: WebsocketService, itemService: Ite websockets.publish(memberItemsTopic, item.creator.id, AccessibleItemsEvent('create', item)); } }); - - // on move item: - // - notify own items of creator of delete IF old location was root - // - notify own items of creator of create IF new location is root - itemService.hooks.setPostHook( - 'move', - async (actor, repositories, { source, destination, sourceParentId }) => { - if (sourceParentId === undefined && source.creator) { - // root item, notify creator - - // todo: remove own when we don't use own anymore - websockets.publish(memberItemsTopic, source.creator.id, OwnItemsEvent('delete', source)); - - websockets.publish( - memberItemsTopic, - source.creator.id, - AccessibleItemsEvent('delete', source), - ); - } - - const destParentId = getParentFromPath(destination.path); - if (destParentId === undefined && destination.creator) { - // root item, notify creator - // todo: remove own when we don't use own anymore - websockets.publish( - memberItemsTopic, - destination.creator.id, - OwnItemsEvent('create', destination), - ); - websockets.publish( - memberItemsTopic, - destination.creator.id, - AccessibleItemsEvent('create', destination), - ); - } - }, - ); } /** diff --git a/src/services/item/ws/services.ts b/src/services/item/ws/services.ts new file mode 100644 index 0000000000..011afcf6c2 --- /dev/null +++ b/src/services/item/ws/services.ts @@ -0,0 +1,115 @@ +import { FastifyBaseLogger } from 'fastify'; + +import { getParentFromPath } from '@graasp/sdk'; + +import { WebsocketService } from '../../websockets/ws-service'; +import { Item } from '../entities/Item'; +import { getPromiseResults } from '../utils'; +import { + AccessibleItemsEvent, + ChildItemEvent, + ItemOpFeedbackEvent, + OwnItemsEvent, + itemTopic, + memberItemsTopic, +} from './events'; + +// on move item: +// - notify own items of creator of delete IF old location was root +// - notify own items of creator of create IF new location is root +const publishMemberItemsTopic = ( + websockets: WebsocketService, + { source, destination, sourceParentId }, +) => { + if (sourceParentId === undefined && source.creator) { + // root item, notify creator + // todo: remove own when we don't use own anymore + websockets.publish(memberItemsTopic, source.creator.id, OwnItemsEvent('delete', source)); + websockets.publish(memberItemsTopic, source.creator.id, AccessibleItemsEvent('delete', source)); + } + + const destParentId = getParentFromPath(destination.path); + if (destParentId === undefined && destination.creator) { + // root item, notify creator + // todo: remove own when we don't use own anymore + websockets.publish( + memberItemsTopic, + destination.creator.id, + OwnItemsEvent('create', destination), + ); + websockets.publish( + memberItemsTopic, + destination.creator.id, + AccessibleItemsEvent('create', destination), + ); + } +}; + +// on move item, notify: +// - parent of old location of deleted child +// - parent of new location of new child +const publishItemTopic = ( + websockets: WebsocketService, + { source, destination, sourceParentId }, +) => { + if (sourceParentId !== undefined) { + websockets.publish(itemTopic, sourceParentId, ChildItemEvent('delete', source)); + } + + const destParentId = getParentFromPath(destination.path); + if (destParentId) { + websockets.publish(itemTopic, destParentId, ChildItemEvent('create', destination)); + } +}; + +export const publishAfterMoved = ( + websockets: WebsocketService, + { source, destination, sourceParentId }, +) => { + publishItemTopic(websockets, { source, destination, sourceParentId }); + publishMemberItemsTopic(websockets, { source, destination, sourceParentId }); +}; + +// TODO: update this to send only one ws ? like a warning or success or error ? +export const publishFeedbackAfterAllMoved = ({ + websockets, + results, + itemIds, + log, + memberId, +}: { + websockets: WebsocketService; + results: PromiseSettledResult[]; + itemIds: string[]; + log: FastifyBaseLogger; + memberId: string; +}) => { + const { success, failed } = getPromiseResults(results); + const successIds = success.map((i) => i.id); + const failedIds = itemIds.filter((id) => !successIds.includes(id)); + + if (success.length) { + websockets.publish( + memberItemsTopic, + memberId, + ItemOpFeedbackEvent( + 'move', + success.map((i) => i.id), + { + data: Object.fromEntries(success.map((i) => [i.id, i])), + errors: [], + }, + ), + ); + } + if (failed.length) { + failed.forEach((e) => { + log.error(e); + websockets.publish( + memberItemsTopic, + memberId, + ItemOpFeedbackEvent('move', failedIds, { error: e }), + ); + }); + } +}; From ca29cbbb08eb915e1bf7b3a2b9cdf2244102681d Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Fri, 23 Feb 2024 08:19:07 +0000 Subject: [PATCH 3/6] test: update move ws tests - check that the move is committed when ws is received --- src/services/item/test/ws.test.ts | 81 +++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/src/services/item/test/ws.test.ts b/src/services/item/test/ws.test.ts index de0e56b545..fdb2f7f4fe 100644 --- a/src/services/item/test/ws.test.ts +++ b/src/services/item/test/ws.test.ts @@ -7,6 +7,7 @@ import { HttpMethod, PermissionLevel, Websocket, + getParentFromPath, parseStringToDate, } from '@graasp/sdk'; @@ -433,11 +434,11 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - await waitForExpect(async () => { - expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( - newParentItem.path, - ); - }); + // When the websocket is not received, indicating that the transaction + // is still in progress, the item should contain the old path. + expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( + oldParentItem.path, + ); await waitForExpect(() => { const [childDelete] = itemUpdates; @@ -445,6 +446,12 @@ describe('Item websocket hooks', () => { ChildItemEvent('delete', parseStringToDate(childItem) as Item), ); }); + + // When the websocket is received, indicating that the transaction + // is done, the item should contain the new path. + expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( + newParentItem.path, + ); }); it('parent of new location receives child creation update', async () => { @@ -464,20 +471,25 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - await waitForExpect(async () => { - expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( - newParentItem.path, - ); - }); - - const moved = await ItemRepository.findOneBy({ id: childItem.id }); + // When the websocket is not received, indicating that the transaction + // is still in progress, the item should contain the old path. + expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( + oldParentItem.path, + ); - await waitForExpect(() => { + await waitForExpect(async () => { const [childCreate] = itemUpdates; + const moved = await ItemRepository.findOneBy({ id: childItem.id }); expect(childCreate).toMatchObject( ChildItemEvent('create', parseStringToDate(moved) as Item), ); }); + + // When the websocket is received, indicating that the transaction + // is done, the item should contain the new path. + expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( + newParentItem.path, + ); }); it('creator receives own items delete update if old location was root of creator', async () => { @@ -495,11 +507,13 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - await waitForExpect(async () => { - expect((await ItemRepository.findOneBy({ id: item.id }))?.path).toContain( - newParentItem.path, - ); - }); + // When the websocket is not received, indicating that the transaction + // is still in progress, the item's parent path should be root (undefined). + const itemBeforeUpdate = await ItemRepository.findOneBy({ id: item.id }); + if (!itemBeforeUpdate) { + fail('The item was not found in the repository.'); + } + expect(getParentFromPath(itemBeforeUpdate.path)).toBeUndefined(); await waitForExpect(() => { const [ownDelete, accessibleDelete] = itemUpdates; @@ -508,6 +522,14 @@ describe('Item websocket hooks', () => { AccessibleItemsEvent('delete', parseStringToDate(item) as Item), ); }); + + // When the websocket is received, indicating that the transaction + // is done, the item should contain the new parent path. + const itemAfterUpdate = await ItemRepository.findOneBy({ id: item.id }); + if (!itemAfterUpdate) { + fail('The item was not found in the repository.'); + } + expect(itemAfterUpdate.path).toContain(newParentItem.path); }); it('creator receives own items create update if new location is root of creator', async () => { @@ -526,18 +548,25 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - await waitForExpect(async () => { - expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).not.toContain( - oldParentItem.path, - ); - }); - - const moved = await ItemRepository.findOneBy({ id: childItem.id }); + // When the websocket is not received, indicating that the transaction + // is still in progress, the item should contain the old path. + expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( + oldParentItem.path, + ); - await waitForExpect(() => { + await waitForExpect(async () => { const [ownCreate] = itemUpdates; + const moved = await ItemRepository.findOneBy({ id: childItem.id }); expect(ownCreate).toMatchObject(OwnItemsEvent('create', parseStringToDate(moved) as Item)); }); + + // When the websocket is received, indicating that the transaction + // is done, the item's parent path should be root (undefined). + const itemAfterUpdate = await ItemRepository.findOneBy({ id: childItem.id }); + if (!itemAfterUpdate) { + fail('The item was not found in the repository.'); + } + expect(getParentFromPath(itemAfterUpdate.path)).toBeUndefined(); }); }); From 422cb820d4dbee0cbafbec1e115af207dc5aa9c9 Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Mon, 26 Feb 2024 12:31:25 +0000 Subject: [PATCH 4/6] feat: improve code from code review - implement ItemWebsocketsService class - implement SeriesPromise class to run Promises in series - use newly postMoveAction function --- src/services/action/repositories/action.ts | 14 +- src/services/action/services/action.ts | 37 ++++ src/services/item/controller.ts | 53 +++--- src/services/item/plugins/action/service.ts | 13 ++ src/services/item/test/ws.test.ts | 16 +- src/services/item/types.ts | 25 +++ src/services/item/utils.ts | 12 -- src/services/item/ws/events.ts | 10 +- src/services/item/ws/services.ts | 191 +++++++++++--------- 9 files changed, 231 insertions(+), 140 deletions(-) diff --git a/src/services/action/repositories/action.ts b/src/services/action/repositories/action.ts index 8d6746bde3..e3e720e453 100644 --- a/src/services/action/repositories/action.ts +++ b/src/services/action/repositories/action.ts @@ -10,8 +10,9 @@ import { aggregateExpressionNames, buildAggregateExpression } from '../utils/act */ export const ActionRepository = AppDataSource.getRepository(Action).extend({ /** - * Create given action and return it. - * @param action Action to create + * TODO: remove if it not used. + * Create given actions. + * @param action Actions to create */ async postMany(actions: Pick[]): Promise { // save action @@ -20,6 +21,15 @@ export const ActionRepository = AppDataSource.getRepository(Action).extend({ } }, + /** + * Create given action. + * @param action Action to create + */ + async post(action: Pick): Promise { + // save action + await this.insert(action); + }, + /** * Delete actions matching the given `memberId`. Return actions, or `null`, if delete has no effect. * @param memberId ID of the member whose actions are deleted diff --git a/src/services/action/services/action.ts b/src/services/action/services/action.ts index bb9b906d6c..3bc701fea4 100644 --- a/src/services/action/services/action.ts +++ b/src/services/action/services/action.ts @@ -23,6 +23,7 @@ export class ActionService { this.memberService = memberService; } + // TODO: remove if it not used async postMany( member: Actor, repositories: Repositories, @@ -59,4 +60,40 @@ export class ActionService { await repositories.actionRepository.postMany(completeActions); } + + async post( + member: Actor, + repositories: Repositories, + request: FastifyRequest, + action: Partial & Pick, + ): Promise { + const { headers } = request; + + // prevent saving if member disabled + const enableMemberSaving = member?.extra?.enableSaveActions ?? true; + if (!enableMemberSaving) { + return; + } + + // prevent saving if item disabled analytics + if (action.item?.settings?.enableSaveActions) { + return; + } + + const view = getView(headers); + // warning: addresses might contained spoofed ips + const addresses = forwarded(request.raw); + const ip = addresses.pop(); + + const geolocation = ip ? getGeolocationIp(ip) : null; + const completeAction = { + member, + geolocation: geolocation ?? undefined, + view, + extra: {}, + ...action, + }; + + await repositories.actionRepository.post(completeAction); + } } diff --git a/src/services/item/controller.ts b/src/services/item/controller.ts index 763b94213e..18b03eaeef 100644 --- a/src/services/item/controller.ts +++ b/src/services/item/controller.ts @@ -24,9 +24,9 @@ import { updateMany, } from './fluent-schema'; import { ItemGeolocation } from './plugins/geolocation/ItemGeolocation'; -import { ItemChildrenParams, ItemSearchParams } from './types'; +import { ItemChildrenParams, ItemSearchParams, SeriesPromise } from './types'; import { ItemOpFeedbackEvent, memberItemsTopic } from './ws/events'; -import { publishAfterMoved, publishFeedbackAfterAllMoved } from './ws/services'; +import { ItemWebsocketsService } from './ws/services'; const plugin: FastifyPluginAsync = async (fastify) => { const { db, items, websockets } = fastify; @@ -272,37 +272,38 @@ const plugin: FastifyPluginAsync = async (fastify) => { body: { parentId }, log, } = request; - Promise.allSettled( + const itemWsService = new ItemWebsocketsService(websockets); + + SeriesPromise.allSettled( ids.map((itemId) => { - return db - .transaction(async (manager) => { - const repositories = buildRepositories(manager); - const { source, destination } = await itemService.move( - member, - repositories, - itemId, - parentId, - ); + return async () => { + return db + .transaction(async (manager) => { + const repositories = buildRepositories(manager); + const { source, destination } = await itemService.move( + member, + repositories, + itemId, + parentId, + ); - await actionItemService.postManyMoveAction(request, reply, repositories, [ - destination, - ]); + await actionItemService.postMoveAction(request, repositories, destination); - return { source, destination }; - }) - .then(async ({ source, destination }) => { - publishAfterMoved(websockets, { - source, - destination, - sourceParentId: getParentFromPath(source.path), + return { source, destination }; + }) + .then(async ({ source, destination }) => { + itemWsService.publishTopicsForMove({ + source, + destination, + sourceParentId: getParentFromPath(source.path), + }); + return destination; }); - return destination; - }); + }; }), ).then(async (results) => { if (member) { - publishFeedbackAfterAllMoved({ - websockets, + itemWsService.publishFeedbacksForMove({ log, results, itemIds: ids, diff --git a/src/services/item/plugins/action/service.ts b/src/services/item/plugins/action/service.ts index c6a5537b33..9781127beb 100644 --- a/src/services/item/plugins/action/service.ts +++ b/src/services/item/plugins/action/service.ts @@ -280,6 +280,7 @@ export class ActionItemService { await this.actionService.postMany(member, repositories, request, actions); } + // TODO: remove it if it is never used async postManyMoveAction( request: FastifyRequest, reply: FastifyReply, @@ -297,6 +298,18 @@ export class ActionItemService { await this.actionService.postMany(member, repositories, request, actions); } + async postMoveAction(request: FastifyRequest, repositories: Repositories, item: Item) { + const { member } = request; + const action = { + item, + type: ItemActionType.Move, + // TODO: remove any + // eslint-disable-next-line @typescript-eslint/no-explicit-any + extra: { itemId: item.id, body: request.body as any }, + }; + await this.actionService.post(member, repositories, request, action); + } + async postManyCopyAction( request: FastifyRequest, reply: FastifyReply, diff --git a/src/services/item/test/ws.test.ts b/src/services/item/test/ws.test.ts index fdb2f7f4fe..f103b9c2d0 100644 --- a/src/services/item/test/ws.test.ts +++ b/src/services/item/test/ws.test.ts @@ -434,7 +434,7 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - // When the websocket is not received, indicating that the transaction + // When the websocket is not received, indicates that the transaction // is still in progress, the item should contain the old path. expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( oldParentItem.path, @@ -447,7 +447,7 @@ describe('Item websocket hooks', () => { ); }); - // When the websocket is received, indicating that the transaction + // When the websocket is received, indicates that the transaction // is done, the item should contain the new path. expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( newParentItem.path, @@ -471,7 +471,7 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - // When the websocket is not received, indicating that the transaction + // When the websocket is not received, indicates that the transaction // is still in progress, the item should contain the old path. expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( oldParentItem.path, @@ -485,7 +485,7 @@ describe('Item websocket hooks', () => { ); }); - // When the websocket is received, indicating that the transaction + // When the websocket is received, indicates that the transaction // is done, the item should contain the new path. expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( newParentItem.path, @@ -507,7 +507,7 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - // When the websocket is not received, indicating that the transaction + // When the websocket is not received, indicates that the transaction // is still in progress, the item's parent path should be root (undefined). const itemBeforeUpdate = await ItemRepository.findOneBy({ id: item.id }); if (!itemBeforeUpdate) { @@ -523,7 +523,7 @@ describe('Item websocket hooks', () => { ); }); - // When the websocket is received, indicating that the transaction + // When the websocket is received, indicates that the transaction // is done, the item should contain the new parent path. const itemAfterUpdate = await ItemRepository.findOneBy({ id: item.id }); if (!itemAfterUpdate) { @@ -548,7 +548,7 @@ describe('Item websocket hooks', () => { }); expect(response.statusCode).toBe(StatusCodes.ACCEPTED); - // When the websocket is not received, indicating that the transaction + // When the websocket is not received, indicates that the transaction // is still in progress, the item should contain the old path. expect((await ItemRepository.findOneBy({ id: childItem.id }))?.path).toContain( oldParentItem.path, @@ -560,7 +560,7 @@ describe('Item websocket hooks', () => { expect(ownCreate).toMatchObject(OwnItemsEvent('create', parseStringToDate(moved) as Item)); }); - // When the websocket is received, indicating that the transaction + // When the websocket is received, indicates that the transaction // is done, the item's parent path should be root (undefined). const itemAfterUpdate = await ItemRepository.findOneBy({ id: childItem.id }); if (!itemAfterUpdate) { diff --git a/src/services/item/types.ts b/src/services/item/types.ts index 10fb5e4961..149ce84e37 100644 --- a/src/services/item/types.ts +++ b/src/services/item/types.ts @@ -30,3 +30,28 @@ export type ItemChildrenParams = { ordered?: boolean; types?: UnionOfConst[]; }; + +export type SeriesPromiseResults = { + success: T[]; + failed: Error[]; +}; + +export class SeriesPromise { + public static async allSettled( + promises: (() => Promise)[], + ): Promise> { + const success: SeriesPromiseResults['success'] = []; + const failed: SeriesPromiseResults['failed'] = []; + + for (const promise of promises) { + try { + const result = await promise(); + success.push(result); + } catch (error) { + failed.push(error); + } + } + + return { success, failed }; + } +} diff --git a/src/services/item/utils.ts b/src/services/item/utils.ts index 0f91df1ffb..d96a61d939 100644 --- a/src/services/item/utils.ts +++ b/src/services/item/utils.ts @@ -89,15 +89,3 @@ export const readPdfContent = async (source: string | URL) => { return ''; } }; - -export const getPromiseResults = (results: PromiseSettledResult[]) => { - const success = results - .filter((result) => result.status === 'fulfilled') - .map((result) => (result as PromiseFulfilledResult).value); - - const failed = results - .filter((result) => result.status === 'rejected') - .map((result) => (result as PromiseRejectedResult).reason); - - return { success, failed }; -}; diff --git a/src/services/item/ws/events.ts b/src/services/item/ws/events.ts index f302082c6c..dd81c30b9c 100644 --- a/src/services/item/ws/events.ts +++ b/src/services/item/ws/events.ts @@ -133,7 +133,7 @@ export const SharedItemsEvent = (op: SharedItemsEvent['op'], item: Item): Shared /** * Events from asynchronous background operations on given items */ -interface ItemOpFeedbackEvent { +export interface ItemOpFeedbackEventInterface { kind: 'feedback'; op: 'update' | 'delete' | 'move' | 'copy' | 'export' | 'recycle' | 'restore' | 'validate'; resource: Item['id'][]; @@ -153,10 +153,10 @@ interface ItemOpFeedbackEvent { */ // eslint-disable-next-line @typescript-eslint/no-unused-vars export const ItemOpFeedbackEvent = ( - op: ItemOpFeedbackEvent['op'], - resource: ItemOpFeedbackEvent['resource'], - result: ItemOpFeedbackEvent['result'], -): ItemOpFeedbackEvent => ({ + op: ItemOpFeedbackEventInterface['op'], + resource: ItemOpFeedbackEventInterface['resource'], + result: ItemOpFeedbackEventInterface['result'], +): ItemOpFeedbackEventInterface => ({ kind: 'feedback', op, resource, diff --git a/src/services/item/ws/services.ts b/src/services/item/ws/services.ts index 011afcf6c2..fdbd624345 100644 --- a/src/services/item/ws/services.ts +++ b/src/services/item/ws/services.ts @@ -4,112 +4,129 @@ import { getParentFromPath } from '@graasp/sdk'; import { WebsocketService } from '../../websockets/ws-service'; import { Item } from '../entities/Item'; -import { getPromiseResults } from '../utils'; +import { SeriesPromiseResults } from '../types'; import { AccessibleItemsEvent, ChildItemEvent, ItemOpFeedbackEvent, + ItemOpFeedbackEventInterface, OwnItemsEvent, itemTopic, memberItemsTopic, } from './events'; -// on move item: -// - notify own items of creator of delete IF old location was root -// - notify own items of creator of create IF new location is root -const publishMemberItemsTopic = ( - websockets: WebsocketService, - { source, destination, sourceParentId }, -) => { - if (sourceParentId === undefined && source.creator) { - // root item, notify creator - // todo: remove own when we don't use own anymore - websockets.publish(memberItemsTopic, source.creator.id, OwnItemsEvent('delete', source)); - websockets.publish(memberItemsTopic, source.creator.id, AccessibleItemsEvent('delete', source)); - } - - const destParentId = getParentFromPath(destination.path); - if (destParentId === undefined && destination.creator) { - // root item, notify creator - // todo: remove own when we don't use own anymore - websockets.publish( - memberItemsTopic, - destination.creator.id, - OwnItemsEvent('create', destination), - ); - websockets.publish( - memberItemsTopic, - destination.creator.id, - AccessibleItemsEvent('create', destination), - ); - } +type MoveParams = { + source: Item; + destination: Item; + sourceParentId?: string; }; -// on move item, notify: -// - parent of old location of deleted child -// - parent of new location of new child -const publishItemTopic = ( - websockets: WebsocketService, - { source, destination, sourceParentId }, -) => { - if (sourceParentId !== undefined) { - websockets.publish(itemTopic, sourceParentId, ChildItemEvent('delete', source)); - } +export class ItemWebsocketsService { + private websockets; - const destParentId = getParentFromPath(destination.path); - if (destParentId) { - websockets.publish(itemTopic, destParentId, ChildItemEvent('create', destination)); + constructor(websockets: WebsocketService) { + this.websockets = websockets; } -}; - -export const publishAfterMoved = ( - websockets: WebsocketService, - { source, destination, sourceParentId }, -) => { - publishItemTopic(websockets, { source, destination, sourceParentId }); - publishMemberItemsTopic(websockets, { source, destination, sourceParentId }); -}; -// TODO: update this to send only one ws ? like a warning or success or error ? -export const publishFeedbackAfterAllMoved = ({ - websockets, - results, - itemIds, - log, - memberId, -}: { - websockets: WebsocketService; - results: PromiseSettledResult[]; - itemIds: string[]; - log: FastifyBaseLogger; - memberId: string; -}) => { - const { success, failed } = getPromiseResults(results); - const successIds = success.map((i) => i.id); - const failedIds = itemIds.filter((id) => !successIds.includes(id)); + // TODO: update this to send only one ws ? like a warning or success or error ? + private publishFeedback({ + results, + itemIds, + log, + memberId, + feedbackOp, + }: { + results: SeriesPromiseResults; + itemIds: string[]; + log: FastifyBaseLogger; + memberId: string; + feedbackOp: ItemOpFeedbackEventInterface['op']; + }) { + const { success, failed } = results; + const successIds = success.map((i) => i.id); + const failedIds = itemIds.filter((id) => !successIds.includes(id)); - if (success.length) { - websockets.publish( - memberItemsTopic, - memberId, - ItemOpFeedbackEvent( - 'move', - success.map((i) => i.id), - { + if (success.length) { + this.websockets.publish( + memberItemsTopic, + memberId, + ItemOpFeedbackEvent(feedbackOp, successIds, { data: Object.fromEntries(success.map((i) => [i.id, i])), errors: [], - }, - ), - ); + }), + ); + } + if (failed.length) { + failed.forEach((e) => { + log.error(e); + this.websockets.publish( + memberItemsTopic, + memberId, + ItemOpFeedbackEvent(feedbackOp, failedIds, { error: e }), + ); + }); + } } - if (failed.length) { - failed.forEach((e) => { - log.error(e); - websockets.publish( + + public publishTopicsForMove({ source, destination, sourceParentId }: MoveParams) { + const destParentId = getParentFromPath(destination.path); + + // on move item: + // - notify own items of creator of delete IF old location was root + // - notify own items of creator of create IF new location is root + if (sourceParentId === undefined && source.creator) { + // root item, notify creator + // todo: remove own when we don't use own anymore + this.websockets.publish(memberItemsTopic, source.creator.id, OwnItemsEvent('delete', source)); + this.websockets.publish( memberItemsTopic, - memberId, - ItemOpFeedbackEvent('move', failedIds, { error: e }), + source.creator.id, + AccessibleItemsEvent('delete', source), + ); + } + if (destParentId === undefined && destination.creator) { + // root item, notify creator + // todo: remove own when we don't use own anymore + this.websockets.publish( + memberItemsTopic, + destination.creator.id, + OwnItemsEvent('create', destination), + ); + this.websockets.publish( + memberItemsTopic, + destination.creator.id, + AccessibleItemsEvent('create', destination), ); + } + + // on move item, notify: + // - parent of old location of deleted child + // - parent of new location of new child + if (sourceParentId !== undefined) { + this.websockets.publish(itemTopic, sourceParentId, ChildItemEvent('delete', source)); + } + if (destParentId) { + this.websockets.publish(itemTopic, destParentId, ChildItemEvent('create', destination)); + } + } + + public publishFeedbacksForMove({ + results, + itemIds, + log, + memberId, + }: { + results: SeriesPromiseResults; + itemIds: string[]; + log: FastifyBaseLogger; + memberId: string; + }) { + this.publishFeedback({ + results, + itemIds, + log, + memberId, + feedbackOp: 'move', }); } -}; +} From bd20355610a1e3cf87905faeecc6a661a2c99a7f Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Wed, 28 Feb 2024 07:33:21 +0000 Subject: [PATCH 5/6] feat: remove error in ItemOpFeedbackEvent and use ResultOf exclusively --- src/services/item/controller.ts | 8 +-- src/services/item/plugins/action/index.ts | 4 +- .../item/plugins/action/test/ws.test.ts | 8 ++- src/services/item/plugins/recycled/index.ts | 6 +- .../item/plugins/recycled/test/ws.test.ts | 17 ++++-- src/services/item/plugins/validation/index.ts | 4 +- .../item/plugins/validation/test/ws.test.ts | 15 +++-- src/services/item/test/ws.test.ts | 25 ++++++-- src/services/item/ws/events.ts | 23 ++++--- src/services/item/ws/services.ts | 61 +++++++------------ 10 files changed, 95 insertions(+), 76 deletions(-) diff --git a/src/services/item/controller.ts b/src/services/item/controller.ts index 18b03eaeef..927a56b541 100644 --- a/src/services/item/controller.ts +++ b/src/services/item/controller.ts @@ -25,7 +25,7 @@ import { } from './fluent-schema'; import { ItemGeolocation } from './plugins/geolocation/ItemGeolocation'; import { ItemChildrenParams, ItemSearchParams, SeriesPromise } from './types'; -import { ItemOpFeedbackEvent, memberItemsTopic } from './ws/events'; +import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from './ws/events'; import { ItemWebsocketsService } from './ws/services'; const plugin: FastifyPluginAsync = async (fastify) => { @@ -212,7 +212,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('update', ids, { error: e }), + ItemOpFeedbackEvent('update', ids, ResultOfFactory.withError(e)), ); } }); @@ -253,7 +253,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('delete', ids, { error: e }), + ItemOpFeedbackEvent('delete', ids, ResultOfFactory.withError(e)), ); } }); @@ -352,7 +352,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('copy', ids, { error: e }), + ItemOpFeedbackEvent('copy', ids, ResultOfFactory.withError(e)), ); } }); diff --git a/src/services/item/plugins/action/index.ts b/src/services/item/plugins/action/index.ts index 7c11ce3c68..5d7bccdaa8 100644 --- a/src/services/item/plugins/action/index.ts +++ b/src/services/item/plugins/action/index.ts @@ -20,7 +20,7 @@ import { LocalFileConfiguration, S3FileConfiguration, } from '../../../file/interfaces/configuration'; -import { ItemOpFeedbackEvent, memberItemsTopic } from '../../ws/events'; +import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from '../../ws/events'; import { CannotPostAction } from './errors'; import { ActionRequestExportService } from './requestExport/service'; import { exportAction, getAggregateActions, getItemActions, postAction } from './schemas'; @@ -175,7 +175,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('export', [itemId], { error: e }), + ItemOpFeedbackEvent('export', [itemId], ResultOfFactory.withError(e)), ); } }); diff --git a/src/services/item/plugins/action/test/ws.test.ts b/src/services/item/plugins/action/test/ws.test.ts index ed02a2d1dd..d185d7f545 100644 --- a/src/services/item/plugins/action/test/ws.test.ts +++ b/src/services/item/plugins/action/test/ws.test.ts @@ -7,7 +7,7 @@ import { clearDatabase } from '../../../../../../test/app'; import { saveItemAndMembership } from '../../../../itemMembership/test/fixtures/memberships'; import { TestWsClient } from '../../../../websockets/test/test-websocket-client'; import { setupWsApp } from '../../../../websockets/test/ws-app'; -import { ItemOpFeedbackEvent, memberItemsTopic } from '../../../ws/events'; +import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from '../../../ws/events'; import { ActionRequestExportRepository } from '../requestExport/repository'; // mock datasource @@ -97,7 +97,11 @@ describe('asynchronous feedback', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('export', [item.id], { error: new Error('mock error') }), + ItemOpFeedbackEvent( + 'export', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); diff --git a/src/services/item/plugins/recycled/index.ts b/src/services/item/plugins/recycled/index.ts index 7a1aca6a33..c98dc05ede 100644 --- a/src/services/item/plugins/recycled/index.ts +++ b/src/services/item/plugins/recycled/index.ts @@ -5,7 +5,7 @@ import { FastifyPluginAsync } from 'fastify'; import { IdParam, IdsParams, MAX_TARGETS_FOR_READ_REQUEST } from '@graasp/sdk'; import { buildRepositories } from '../../../../utils/repositories'; -import { ItemOpFeedbackEvent, memberItemsTopic } from '../../ws/events'; +import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from '../../ws/events'; import schemas, { getRecycledItemDatas, recycleMany, restoreMany } from './schemas'; import { RecycledBinService } from './service'; import { recycleWsHooks } from './ws/hooks'; @@ -88,7 +88,7 @@ const plugin: FastifyPluginAsync = async (fastify, opti websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('recycle', ids, { error: e }), + ItemOpFeedbackEvent('recycle', ids, ResultOfFactory.withError(e)), ); } }); @@ -138,7 +138,7 @@ const plugin: FastifyPluginAsync = async (fastify, opti websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('restore', ids, { error: e }), + ItemOpFeedbackEvent('restore', ids, ResultOfFactory.withError(e)), ); } }); diff --git a/src/services/item/plugins/recycled/test/ws.test.ts b/src/services/item/plugins/recycled/test/ws.test.ts index b09222e8e5..14d65e6dd7 100644 --- a/src/services/item/plugins/recycled/test/ws.test.ts +++ b/src/services/item/plugins/recycled/test/ws.test.ts @@ -18,6 +18,7 @@ import { ItemEvent, ItemOpFeedbackEvent, OwnItemsEvent, + ResultOfFactory, SelfItemEvent, SharedItemsEvent, itemTopic, @@ -710,9 +711,11 @@ describe('Recycle websocket hooks', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('recycle', [item.id], { - error: new Error('mock error'), - }), + ItemOpFeedbackEvent( + 'recycle', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); @@ -771,9 +774,11 @@ describe('Recycle websocket hooks', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('restore', [item.id], { - error: new Error('mock error'), - }), + ItemOpFeedbackEvent( + 'restore', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); diff --git a/src/services/item/plugins/validation/index.ts b/src/services/item/plugins/validation/index.ts index d117a72bf2..43701c8694 100644 --- a/src/services/item/plugins/validation/index.ts +++ b/src/services/item/plugins/validation/index.ts @@ -4,7 +4,7 @@ import { FastifyPluginAsync } from 'fastify'; import { UnauthorizedMember } from '../../../../utils/errors'; import { buildRepositories } from '../../../../utils/repositories'; -import { ItemOpFeedbackEvent, memberItemsTopic } from '../../ws/events'; +import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from '../../ws/events'; import { itemValidation, itemValidationGroup } from './schemas'; import { ItemValidationService } from './service'; @@ -98,7 +98,7 @@ const plugin: FastifyPluginAsync = async (fastify websockets.publish( memberItemsTopic, member.id, - ItemOpFeedbackEvent('validate', [itemId], { error: e }), + ItemOpFeedbackEvent('validate', [itemId], ResultOfFactory.withError(e)), ); } }); diff --git a/src/services/item/plugins/validation/test/ws.test.ts b/src/services/item/plugins/validation/test/ws.test.ts index 139639e139..6a1fb235c8 100644 --- a/src/services/item/plugins/validation/test/ws.test.ts +++ b/src/services/item/plugins/validation/test/ws.test.ts @@ -8,7 +8,12 @@ import { ITEMS_ROUTE_PREFIX } from '../../../../../utils/config'; import { saveItemAndMembership } from '../../../../itemMembership/test/fixtures/memberships'; import { TestWsClient } from '../../../../websockets/test/test-websocket-client'; import { setupWsApp } from '../../../../websockets/test/ws-app'; -import { ItemEvent, ItemOpFeedbackEvent, memberItemsTopic } from '../../../ws/events'; +import { + ItemEvent, + ItemOpFeedbackEvent, + ResultOfFactory, + memberItemsTopic, +} from '../../../ws/events'; import { ItemValidationGroupRepository } from '../repositories/ItemValidationGroup'; import { saveItemValidation } from './utils'; @@ -82,9 +87,11 @@ describe('asynchronous feedback', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('validate', [item.id], { - error: new Error('mock error'), - }), + ItemOpFeedbackEvent( + 'validate', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); diff --git a/src/services/item/test/ws.test.ts b/src/services/item/test/ws.test.ts index f103b9c2d0..1894ee30cc 100644 --- a/src/services/item/test/ws.test.ts +++ b/src/services/item/test/ws.test.ts @@ -28,6 +28,7 @@ import { ItemEvent, ItemOpFeedbackEvent, OwnItemsEvent, + ResultOfFactory, SelfItemEvent, SharedItemsEvent, itemTopic, @@ -618,7 +619,11 @@ describe('Item websocket hooks', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('update', [item.id], { error: new Error('mock error') }), + ItemOpFeedbackEvent( + 'update', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); @@ -662,7 +667,11 @@ describe('Item websocket hooks', () => { await waitForExpect(() => { const [_ownUpdate, _accessibleUpdate, feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('delete', [item.id], { error: new Error('mock error') }), + ItemOpFeedbackEvent( + 'delete', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); @@ -714,7 +723,11 @@ describe('Item websocket hooks', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('move', [item.id], { error: new Error('mock error') }), + ItemOpFeedbackEvent( + 'move', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); @@ -764,7 +777,11 @@ describe('Item websocket hooks', () => { await waitForExpect(() => { const [feedbackUpdate] = memberUpdates; expect(feedbackUpdate).toMatchObject( - ItemOpFeedbackEvent('copy', [item.id], { error: new Error('mock error') }), + ItemOpFeedbackEvent( + 'copy', + [item.id], + ResultOfFactory.withError(new Error('mock error')), + ), ); }); }); diff --git a/src/services/item/ws/events.ts b/src/services/item/ws/events.ts index dd81c30b9c..e9fcf2f0db 100644 --- a/src/services/item/ws/events.ts +++ b/src/services/item/ws/events.ts @@ -130,6 +130,11 @@ export const SharedItemsEvent = (op: SharedItemsEvent['op'], item: Item): Shared item, }); +export const ResultOfFactory = { + withError: (e: Error) => ResultOfFactory.withErrors([e]), + withErrors: (errors: Error[]) => ({ data: {}, errors }), +}; + /** * Events from asynchronous background operations on given items */ @@ -137,11 +142,7 @@ export interface ItemOpFeedbackEventInterface { kind: 'feedback'; op: 'update' | 'delete' | 'move' | 'copy' | 'export' | 'recycle' | 'restore' | 'validate'; resource: Item['id'][]; - result: - | { - error: Error; - } - | ResultOf; + result: ResultOf; } /** @@ -160,8 +161,12 @@ export const ItemOpFeedbackEvent = ( kind: 'feedback', op, resource, - result: result['error'] - ? // monkey patch because somehow JSON.stringify(e: Error) will always result in {} - { error: { name: result['error'].name, message: result['error'].message } } - : result, + result: { + data: result.data, + // monkey patch because JSON.stringify(e: Error) will always result in {} + errors: result.errors.map((e) => ({ + name: e.name, + message: e.message, + })), + }, }); diff --git a/src/services/item/ws/services.ts b/src/services/item/ws/services.ts index fdbd624345..eb6bd2ebb8 100644 --- a/src/services/item/ws/services.ts +++ b/src/services/item/ws/services.ts @@ -21,50 +21,36 @@ type MoveParams = { sourceParentId?: string; }; +type PublishFeedbackParams = { + results: SeriesPromiseResults; + itemIds: string[]; + log: FastifyBaseLogger; + memberId: string; + feedbackOp: ItemOpFeedbackEventInterface['op']; +}; + export class ItemWebsocketsService { - private websockets; + private websockets: WebsocketService; constructor(websockets: WebsocketService) { this.websockets = websockets; } // TODO: update this to send only one ws ? like a warning or success or error ? - private publishFeedback({ - results, - itemIds, - log, - memberId, - feedbackOp, - }: { - results: SeriesPromiseResults; - itemIds: string[]; - log: FastifyBaseLogger; - memberId: string; - feedbackOp: ItemOpFeedbackEventInterface['op']; - }) { + private publishFeedback({ results, itemIds, log, memberId, feedbackOp }: PublishFeedbackParams) { const { success, failed } = results; - const successIds = success.map((i) => i.id); - const failedIds = itemIds.filter((id) => !successIds.includes(id)); - if (success.length) { - this.websockets.publish( - memberItemsTopic, - memberId, - ItemOpFeedbackEvent(feedbackOp, successIds, { - data: Object.fromEntries(success.map((i) => [i.id, i])), - errors: [], - }), - ); - } + this.websockets.publish( + memberItemsTopic, + memberId, + ItemOpFeedbackEvent(feedbackOp, itemIds, { + data: Object.fromEntries(success.map((i) => [i.id, i])), + errors: failed, + }), + ); + if (failed.length) { - failed.forEach((e) => { - log.error(e); - this.websockets.publish( - memberItemsTopic, - memberId, - ItemOpFeedbackEvent(feedbackOp, failedIds, { error: e }), - ); - }); + log.error(failed); } } @@ -115,12 +101,7 @@ export class ItemWebsocketsService { itemIds, log, memberId, - }: { - results: SeriesPromiseResults; - itemIds: string[]; - log: FastifyBaseLogger; - memberId: string; - }) { + }: Omit) { this.publishFeedback({ results, itemIds, From 0f10e2a72624b583288c3249020367ab99ccb24c Mon Sep 17 00:00:00 2001 From: Thibault Reidy Date: Fri, 1 Mar 2024 09:13:10 +0000 Subject: [PATCH 6/6] feat: update the promise runner to allow to run in parallel or in series the same way --- src/services/item/controller.ts | 8 ++--- src/services/item/types.ts | 51 ++++++++++++++++++++++++++++---- src/services/item/ws/services.ts | 4 +-- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/services/item/controller.ts b/src/services/item/controller.ts index 927a56b541..e680c55d92 100644 --- a/src/services/item/controller.ts +++ b/src/services/item/controller.ts @@ -24,7 +24,7 @@ import { updateMany, } from './fluent-schema'; import { ItemGeolocation } from './plugins/geolocation/ItemGeolocation'; -import { ItemChildrenParams, ItemSearchParams, SeriesPromise } from './types'; +import { ItemChildrenParams, ItemSearchParams, PromiseRunner } from './types'; import { ItemOpFeedbackEvent, ResultOfFactory, memberItemsTopic } from './ws/events'; import { ItemWebsocketsService } from './ws/services'; @@ -274,7 +274,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { } = request; const itemWsService = new ItemWebsocketsService(websockets); - SeriesPromise.allSettled( + PromiseRunner.inSeries( ids.map((itemId) => { return async () => { return db @@ -291,7 +291,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { return { source, destination }; }) - .then(async ({ source, destination }) => { + .then(({ source, destination }) => { itemWsService.publishTopicsForMove({ source, destination, @@ -301,7 +301,7 @@ const plugin: FastifyPluginAsync = async (fastify) => { }); }; }), - ).then(async (results) => { + ).then((results) => { if (member) { itemWsService.publishFeedbacksForMove({ log, diff --git a/src/services/item/types.ts b/src/services/item/types.ts index 149ce84e37..49e3fda4ae 100644 --- a/src/services/item/types.ts +++ b/src/services/item/types.ts @@ -31,17 +31,31 @@ export type ItemChildrenParams = { types?: UnionOfConst[]; }; -export type SeriesPromiseResults = { +export type PromiseRunnerResults = { success: T[]; failed: Error[]; }; -export class SeriesPromise { - public static async allSettled( +export class PromiseRunner { + private static transformPromiseResults = (results: PromiseSettledResult[]) => { + const success = results + .filter((result) => result.status === 'fulfilled') + .map((result) => (result as PromiseFulfilledResult).value); + + const failed = results + .filter((result) => result.status === 'rejected') + .map((result) => new Error((result as PromiseRejectedResult).reason)); + + return { success, failed }; + }; + + public static async inSeries( promises: (() => Promise)[], - ): Promise> { - const success: SeriesPromiseResults['success'] = []; - const failed: SeriesPromiseResults['failed'] = []; + ): Promise> { + const success: PromiseRunnerResults['success'] = []; + const failed: PromiseRunnerResults['failed'] = []; + + const startTime = performance.now(); for (const promise of promises) { try { @@ -52,6 +66,31 @@ export class SeriesPromise { } } + const endTime = performance.now(); + console.log( + `Series terminated after ${endTime - startTime} ms for ${promises.length} Promises`, + startTime, + endTime, + ); + + return { success, failed }; + } + + public static async allSettled( + promises: (() => Promise)[], + ): Promise> { + const startTime = performance.now(); + + const results = await Promise.allSettled(promises.map((promise) => promise())); + const { success, failed } = PromiseRunner.transformPromiseResults(results); + + const endTime = performance.now(); + console.log( + `Concurrently terminated after ${endTime - startTime} ms for ${promises.length} Promises`, + startTime, + endTime, + ); + return { success, failed }; } } diff --git a/src/services/item/ws/services.ts b/src/services/item/ws/services.ts index eb6bd2ebb8..32ba07813e 100644 --- a/src/services/item/ws/services.ts +++ b/src/services/item/ws/services.ts @@ -4,7 +4,7 @@ import { getParentFromPath } from '@graasp/sdk'; import { WebsocketService } from '../../websockets/ws-service'; import { Item } from '../entities/Item'; -import { SeriesPromiseResults } from '../types'; +import { PromiseRunnerResults } from '../types'; import { AccessibleItemsEvent, ChildItemEvent, @@ -22,7 +22,7 @@ type MoveParams = { }; type PublishFeedbackParams = { - results: SeriesPromiseResults; + results: PromiseRunnerResults; itemIds: string[]; log: FastifyBaseLogger; memberId: string;