From cd634a28027fde662bdd09c09d5c6f2fae98bd1e Mon Sep 17 00:00:00 2001 From: BUDUMURU SRINIVAS SAI SARAN TEJA Date: Sun, 1 Feb 2026 00:13:52 +0530 Subject: [PATCH] feat: implement execution registry and execution routes for node execution --- apps/http-backend/src/index.ts | 7 ++- .../src/routes/userRoutes/executionRoutes.ts | 61 +++++++++++++++++++ apps/worker/src/engine/executor.ts | 30 ++++++++- apps/worker/src/index.ts | 5 +- packages/common/src/index.ts | 5 ++ packages/nodes/src/index.ts | 2 + .../src/registry/Execution.config.types.ts | 16 +++++ .../nodes/src/registry/execution.registory.ts | 47 ++++++++++++++ 8 files changed, 167 insertions(+), 6 deletions(-) create mode 100644 apps/http-backend/src/routes/userRoutes/executionRoutes.ts create mode 100644 packages/nodes/src/registry/Execution.config.types.ts create mode 100644 packages/nodes/src/registry/execution.registory.ts diff --git a/apps/http-backend/src/index.ts b/apps/http-backend/src/index.ts index e518948..13e5fe7 100644 --- a/apps/http-backend/src/index.ts +++ b/apps/http-backend/src/index.ts @@ -12,13 +12,14 @@ console.log("📁 Loaded .env from:", envPath); console.log("🔐 GOOGLE_REDIRECT_URI:", process.env.GOOGLE_REDIRECT_URI || "NOT SET"); import cookieParser from 'cookie-parser' -import { NodeRegistry } from "@repo/nodes/nodeClient"; +import { ExecutionRegister, NodeRegistry } from "@repo/nodes/nodeClient"; import express from "express"; import { userRouter } from "./routes/userRoutes/userRoutes.js"; import cors from "cors" import { sheetRouter } from "./routes/sheet.routes.js"; import { googleAuth } from "./routes/google_callback.js"; import { tokenScheduler } from "./scheduler/token-scheduler.js"; +import { execRouter } from "./routes/userRoutes/executionRoutes.js"; const app = express() @@ -33,13 +34,15 @@ app.use(cookieParser()); app.use("/user" , userRouter) app.use('/node', sheetRouter) -app.use('/auth/google', googleAuth) // ← CHANGED THIS LINE! +app.use('/auth/google', googleAuth) +app.use('/execute', execRouter) const PORT= 3002 async function startServer() { await NodeRegistry.registerAll() tokenScheduler.start(); + ExecutionRegister.initialize() app.listen(PORT, () => { console.log(`Server running on port ${PORT}`); }) diff --git a/apps/http-backend/src/routes/userRoutes/executionRoutes.ts b/apps/http-backend/src/routes/userRoutes/executionRoutes.ts new file mode 100644 index 0000000..2cfd7ed --- /dev/null +++ b/apps/http-backend/src/routes/userRoutes/executionRoutes.ts @@ -0,0 +1,61 @@ +import { Response, Request, Router } from "express"; +import { AuthRequest, userMiddleware } from "./userMiddleware.js"; +import { ExecuteNode, statusCodes } from "@repo/common/zod"; +import { prismaClient } from "@repo/db"; +import { ExecutionRegister } from '@repo/nodes' +export const execRouter: Router = Router() + +execRouter.post('/node', userMiddleware, async(req: AuthRequest, res: Response)=>{ + try{ + if(!req.user){ + return res.status(statusCodes.BAD_REQUEST).json({ + message: "User is not logged in ", + }); + } + const data = req.body; + const dataSafe = ExecuteNode.safeParse(data) + if(!dataSafe.success){ + return res.status(statusCodes.BAD_REQUEST).json({ + message: "Invalid input" + }) + } + const nodeData = await prismaClient.node.findFirst({ + where: {id: dataSafe.data.NodeId}, + include: { + AvailableNode: true + } + }) + + if(nodeData){ + const type = nodeData.AvailableNode.type + const config = dataSafe.data.Config ? dataSafe.data.Config : nodeData.config // for test api data prefered fist then config in db + console.log(`config and type: ${JSON.stringify(config)} & ${type}`) + const context = { + userId: req.user.sub || "", + config: config + } + const executionResult = await ExecutionRegister.execute(type, context) + + console.log(`Execution result: ${executionResult}`) + + if(executionResult) + return res.status(statusCodes.ACCEPTED).json({ + message: `${nodeData.name} node execution done` , + data: executionResult + }) + + return res.status(statusCodes.FORBIDDEN).json({ + message: `${nodeData.name} node execution failed` + }) + } + return res.status(statusCodes.NOT_FOUND).json({ + message: `${dataSafe.data.NodeId} not found` + }) + + }catch(e){ + console.log("This is the error from executing node", e); + return res.status(statusCodes.INTERNAL_SERVER_ERROR).json({ + message: "Internal server Error from node execution ", + }); + } +}) \ No newline at end of file diff --git a/apps/worker/src/engine/executor.ts b/apps/worker/src/engine/executor.ts index bf4d8a7..767c9f9 100644 --- a/apps/worker/src/engine/executor.ts +++ b/apps/worker/src/engine/executor.ts @@ -1,5 +1,6 @@ import { prismaClient } from "@repo/db/client"; -import { register } from "./registory.js"; +// import { register } from "./registory.js"; +import { ExecutionRegister } from "@repo/nodes"; export async function executeWorkflow( workflowExecutionId: string ): Promise { @@ -41,6 +42,14 @@ export async function executeWorkflow( console.log(`Total nodes - ${nodes.length}`); for (const node of nodes) { console.log(`${node.name}, ${node.stage}, ${node.id}th - started Execution`); + const nodeExecution = await prismaClient.nodeExecution.create({ + data:{ + nodeId: node.id, + workflowExecId: workflowExecutionId, + status: "Start", + startedAt: new Date() + } + }) const nodeType = node.AvailableNode.type; // Create mutable copy of config const nodeConfig = { ...(node.config as Record) }; @@ -63,7 +72,7 @@ export async function executeWorkflow( console.log(`Executing with context: ${JSON.stringify(context)}`); console.log(`Executing with context: ${context.credId}`); - const execute = await register.execute(nodeType, context); + const execute = await ExecutionRegister.execute(nodeType, context); if (!execute.success) { await prismaClient.workflowExecution.update({ where: { id: workflowExecutionId }, @@ -73,8 +82,25 @@ export async function executeWorkflow( completedAt: new Date(), }, }); + + await prismaClient.nodeExecution.update({ + where: {id: nodeExecution.id}, + data:{ + status: "Failed", + error: execute.error, + completedAt: new Date() + } + }) return; } + await prismaClient.nodeExecution.update({ + where: {id: nodeExecution.id}, + data: { + completedAt: new Date(), + outputData: execute.output, + status: "Completed" + } + }) currentInputData = execute.output; console.log("output: ", JSON.stringify(execute)); } diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index b752a45..6c32cfb 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -1,6 +1,7 @@ import { Kafka } from "kafkajs"; import { executeWorkflow } from "./engine/executor.js"; -import { register } from "./engine/registory.js"; +import { ExecutionRegister } from "@repo/nodes"; +// import { register } from "./engine/registory.js"; const kafka = new Kafka({ // clientId: "Processing App", clientId: "BuildFlow-Worker", @@ -8,7 +9,7 @@ const kafka = new Kafka({ }); const TOPIC_NAME = "First-Client"; -register.initialize() +ExecutionRegister.initialize() async function main() { const consumer = kafka.consumer({ groupId: "test-group" }); diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index f59ab28..613b5d3 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -40,6 +40,11 @@ export const NodeSchema = z.object({ export const ExecuteWorkflow = z.object({ workflowId : z.string(), }) + +export const ExecuteNode = z.object({ + NodeId: z.string(), + Config: z.any().optional() +}) export const NodeUpdateSchema = z.object({ NodeId: z.string(), Config: z.any().optional(), diff --git a/packages/nodes/src/index.ts b/packages/nodes/src/index.ts index 20f9caf..f015f3a 100644 --- a/packages/nodes/src/index.ts +++ b/packages/nodes/src/index.ts @@ -1,6 +1,7 @@ // Central export for all major modules // export { default as NodeRegistry } from './registry/node-registry.js'; + export { GmailExecutor } from './gmail/gmail.executor.js'; @@ -10,6 +11,7 @@ export type { OAuthTokens } from './common/google-oauth-service.js' export { default as GoogleSheetsNodeExecutor } from './google-sheets/google-sheets.executor.js'; export { GoogleOAuthService } from './common/google-oauth-service.js'; export { default as NodeRegistry } from './registry/node-registry.js'; +export { ExecutionRegister } from './registry/execution.registory.js'; export { GoogleSheetsService } from './google-sheets/google-sheets.service.js'; console.log("Hello World From node / index.ts"); diff --git a/packages/nodes/src/registry/Execution.config.types.ts b/packages/nodes/src/registry/Execution.config.types.ts new file mode 100644 index 0000000..106f6aa --- /dev/null +++ b/packages/nodes/src/registry/Execution.config.types.ts @@ -0,0 +1,16 @@ +export interface ExecutionContext { + nodeId?: string; + userId: string; + credentialsId?: string; + config: Record; + inputData?: any; +} +export interface ExecutionResult { + success: boolean; + output?: any; + error?: string; + metadata?: Record; +} +export interface NodeExecutor { + execute(context: ExecutionContext): Promise; +} diff --git a/packages/nodes/src/registry/execution.registory.ts b/packages/nodes/src/registry/execution.registory.ts new file mode 100644 index 0000000..74e17a9 --- /dev/null +++ b/packages/nodes/src/registry/execution.registory.ts @@ -0,0 +1,47 @@ +import { ExecutionContext, ExecutionResult, NodeExecutor } from "../registry/Execution.config.types.js"; +import { GmailExecutor } from "@repo/nodes/nodeClient"; +import {GoogleSheetsNodeExecutor} from "@repo/nodes/nodeClient"; +class ExecutionRegistry { + private executors = new Map(); + + register(nodeType: string, executor: NodeExecutor) { + this.executors.set(nodeType, executor); + } + + async execute( + nodeType: string, + context: ExecutionContext + ): Promise { + const executor = this.executors.get(nodeType); + if (!executor) { + return { + success: false, + error: `No Executor found for ${nodeType}`, + }; + } + try { + const result = await executor.execute(context) + console.log("Execute result:", result); + + return result; + } catch (error: any) { + return { + success: false, + error: error, + }; + } + } + initialize() { + // TODO: Ensure GmailExecutor implements NodeExecutor and is compatible with ExecutionContext + // If needed, adapt/extract a compatible Executor for registration. + // For now, this cast suppresses the type error. Refactor as soon as possible! + + + //wehen visits this next time make sure chang gmail executor implements NodeExecutor + this.register("gmail", new GmailExecutor() as unknown as NodeExecutor); + this.register("google_sheet", new GoogleSheetsNodeExecutor() as unknown as NodeExecutor) + console.log(`The current Executors are ${this.executors.size}`); + } +} + +export const ExecutionRegister = new ExecutionRegistry();