Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions apps/http-backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ console.log("📁 Loaded .env from:", envPath);
console.log("🔐 GOOGLE_REDIRECT_URI:", process.env.GOOGLE_REDIRECT_URI || "NOT SET");

import cookieParser from 'cookie-parser'
import { NodeRegistry } from "@repo/nodes/nodeClient";
import { ExecutionRegister, NodeRegistry } from "@repo/nodes/nodeClient";
import express from "express";
import { userRouter } from "./routes/userRoutes/userRoutes.js";
import cors from "cors"
import { sheetRouter } from "./routes/sheet.routes.js";
import { googleAuth } from "./routes/google_callback.js";
import { tokenScheduler } from "./scheduler/token-scheduler.js";
import { execRouter } from "./routes/userRoutes/executionRoutes.js";

const app = express()

Expand All @@ -33,13 +34,15 @@ app.use(cookieParser());

app.use("/user" , userRouter)
app.use('/node', sheetRouter)
app.use('/auth/google', googleAuth) // ← CHANGED THIS LINE!
app.use('/auth/google', googleAuth)
app.use('/execute', execRouter)

const PORT= 3002

async function startServer() {
await NodeRegistry.registerAll()
tokenScheduler.start();
ExecutionRegister.initialize()
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
})
Expand Down
61 changes: 61 additions & 0 deletions apps/http-backend/src/routes/userRoutes/executionRoutes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { Response, Request, Router } from "express";
import { AuthRequest, userMiddleware } from "./userMiddleware.js";
import { ExecuteNode, statusCodes } from "@repo/common/zod";
import { prismaClient } from "@repo/db";
import { ExecutionRegister } from '@repo/nodes'
export const execRouter: Router = Router()

execRouter.post('/node', userMiddleware, async(req: AuthRequest, res: Response)=>{
try{
if(!req.user){
return res.status(statusCodes.BAD_REQUEST).json({
message: "User is not logged in ",
});
}
const data = req.body;
const dataSafe = ExecuteNode.safeParse(data)
if(!dataSafe.success){
return res.status(statusCodes.BAD_REQUEST).json({
message: "Invalid input"
})
}
const nodeData = await prismaClient.node.findFirst({
where: {id: dataSafe.data.NodeId},
include: {
AvailableNode: true
}
})

if(nodeData){
const type = nodeData.AvailableNode.type
const config = dataSafe.data.Config ? dataSafe.data.Config : nodeData.config // for test api data prefered fist then config in db
console.log(`config and type: ${JSON.stringify(config)} & ${type}`)
const context = {
userId: req.user.sub || "",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Empty string fallback for userId may cause issues.

If req.user.sub is undefined, using an empty string as userId could cause downstream failures (e.g., credential lookups). Consider returning an error instead.

🛡️ Suggested defensive handling
+            if (!req.user.sub) {
+                return res.status(statusCodes.BAD_REQUEST).json({
+                    message: "User ID not found in token"
+                });
+            }
             const context = {
-                userId: req.user.sub || "",
+                userId: req.user.sub,
                 config: config   
             }
🤖 Prompt for AI Agents
In `@apps/http-backend/src/routes/userRoutes/executionRoutes.ts` at line 34, The
current fallback to an empty string for userId (userId: req.user.sub || "") is
unsafe; update the route handler in executionRoutes to validate req.user?.sub
and return a 400/401 error if missing instead of using "" so downstream
credential lookups won't receive an invalid id. Locate the handler constructing
the payload with userId and add a guard that checks req.user and req.user.sub,
sending an explicit error response (with a clear message) when absent; only
proceed to call the downstream logic using the validated userId value.

config: config
}
Comment on lines +33 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing credentialsId in execution context.

The context object doesn't include credentialsId, which is needed by executors like GmailExecutor and GoogleSheetsNodeExecutor for authentication. Without it, node execution will fail with authorization errors.

🐛 Proposed fix - include credentials
             const context = {
                 userId: req.user.sub || "",
-                config: config   
+                config: config,
+                credentialsId: nodeData.CredentialsID
             }

Note: Verify that nodeData includes CredentialsID by updating the Prisma query to include the credentials relation if needed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const context = {
userId: req.user.sub || "",
config: config
}
const context = {
userId: req.user.sub || "",
config: config,
credentialsId: nodeData.CredentialsID
}
🤖 Prompt for AI Agents
In `@apps/http-backend/src/routes/userRoutes/executionRoutes.ts` around lines 33 -
36, The execution context object currently created in executionRoutes (variable
context with userId: req.user.sub and config) must also include credentialsId so
executors like GmailExecutor and GoogleSheetsNodeExecutor can authenticate;
update the context to add credentialsId sourced from the node data (e.g.,
nodeData.CredentialsID or nodeData.credentials.id) and ensure the Prisma query
that fetches nodeData includes the credentials relation so CredentialsID is
available to use in context.

const executionResult = await ExecutionRegister.execute(type, context)

console.log(`Execution result: ${executionResult}`)

if(executionResult)
return res.status(statusCodes.ACCEPTED).json({
message: `${nodeData.name} node execution done` ,
data: executionResult
})

return res.status(statusCodes.FORBIDDEN).json({
message: `${nodeData.name} node execution failed`
})
Comment on lines +41 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Logic error: Response always returns 202 regardless of execution success.

The condition if(executionResult) checks for truthiness of the object, which is always true since ExecutionRegister.execute() returns an object. The subsequent FORBIDDEN response (lines 47-49) is unreachable. You should check executionResult.success instead.

🐛 Proposed fix
-            if(executionResult)
+            if(executionResult.success)
                 return res.status(statusCodes.ACCEPTED).json({
                 message: `${nodeData.name} node execution done` ,
                 data: executionResult       
             })

             return res.status(statusCodes.FORBIDDEN).json({
-                message: `${nodeData.name} node execution failed`
+                message: `${nodeData.name} node execution failed`,
+                error: executionResult.error
             })
🤖 Prompt for AI Agents
In `@apps/http-backend/src/routes/userRoutes/executionRoutes.ts` around lines 41 -
49, The current truthy check on executionResult causes the ACCEPTED branch to
always run; change the conditional to inspect the actual success flag returned
by ExecutionRegister.execute() by replacing if(executionResult) with if
(executionResult && executionResult.success === true) (or simply if
(executionResult?.success)), so the res.status(statusCodes.ACCEPTED) path runs
only on success and the res.status(statusCodes.FORBIDDEN) path is reachable when
executionResult.success is falsy; keep the existing response bodies that
reference nodeData.name and executionResult.data.

}
return res.status(statusCodes.NOT_FOUND).json({
message: `${dataSafe.data.NodeId} not found`
})

}catch(e){
console.log("This is the error from executing node", e);
return res.status(statusCodes.INTERNAL_SERVER_ERROR).json({
message: "Internal server Error from node execution ",
});
}
})
30 changes: 28 additions & 2 deletions apps/worker/src/engine/executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { prismaClient } from "@repo/db/client";
import { register } from "./registory.js";
// import { register } from "./registory.js";
import { ExecutionRegister } from "@repo/nodes";
export async function executeWorkflow(
workflowExecutionId: string
): Promise<void> {
Expand Down Expand Up @@ -41,6 +42,14 @@ export async function executeWorkflow(
console.log(`Total nodes - ${nodes.length}`);
for (const node of nodes) {
console.log(`${node.name}, ${node.stage}, ${node.id}th - started Execution`);
const nodeExecution = await prismaClient.nodeExecution.create({
data:{
nodeId: node.id,
workflowExecId: workflowExecutionId,
status: "Start",
startedAt: new Date()
}
})
const nodeType = node.AvailableNode.type;
// Create mutable copy of config
const nodeConfig = { ...(node.config as Record<string, any>) };
Expand All @@ -63,7 +72,7 @@ export async function executeWorkflow(
console.log(`Executing with context: ${JSON.stringify(context)}`);
console.log(`Executing with context: ${context.credId}`);

const execute = await register.execute(nodeType, context);
const execute = await ExecutionRegister.execute(nodeType, context);
if (!execute.success) {
await prismaClient.workflowExecution.update({
where: { id: workflowExecutionId },
Expand All @@ -73,8 +82,25 @@ export async function executeWorkflow(
completedAt: new Date(),
},
});

await prismaClient.nodeExecution.update({
where: {id: nodeExecution.id},
data:{
status: "Failed",
error: execute.error,
completedAt: new Date()
}
})
return;
}
await prismaClient.nodeExecution.update({
where: {id: nodeExecution.id},
data: {
completedAt: new Date(),
outputData: execute.output,
status: "Completed"
}
})
currentInputData = execute.output;
console.log("output: ", JSON.stringify(execute));
}
Expand Down
5 changes: 3 additions & 2 deletions apps/worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Kafka } from "kafkajs";
import { executeWorkflow } from "./engine/executor.js";
import { register } from "./engine/registory.js";
import { ExecutionRegister } from "@repo/nodes";
// import { register } from "./engine/registory.js";
const kafka = new Kafka({
// clientId: "Processing App",
clientId: "BuildFlow-Worker",
brokers: ["localhost:9092"],
});
const TOPIC_NAME = "First-Client";

register.initialize()
ExecutionRegister.initialize()

async function main() {
const consumer = kafka.consumer({ groupId: "test-group" });
Expand Down
5 changes: 5 additions & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ export const NodeSchema = z.object({
export const ExecuteWorkflow = z.object({
workflowId : z.string(),
})

export const ExecuteNode = z.object({
NodeId: z.string(),
Config: z.any().optional()
})
export const NodeUpdateSchema = z.object({
NodeId: z.string(),
Config: z.any().optional(),
Expand Down
2 changes: 2 additions & 0 deletions packages/nodes/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Central export for all major modules
// export { default as NodeRegistry } from './registry/node-registry.js';


export { GmailExecutor } from './gmail/gmail.executor.js';


Expand All @@ -10,6 +11,7 @@ export type { OAuthTokens } from './common/google-oauth-service.js'
export { default as GoogleSheetsNodeExecutor } from './google-sheets/google-sheets.executor.js';
export { GoogleOAuthService } from './common/google-oauth-service.js';
export { default as NodeRegistry } from './registry/node-registry.js';
export { ExecutionRegister } from './registry/execution.registory.js';
export { GoogleSheetsService } from './google-sheets/google-sheets.service.js';

console.log("Hello World From node / index.ts");
16 changes: 16 additions & 0 deletions packages/nodes/src/registry/Execution.config.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export interface ExecutionContext {
nodeId?: string;
userId: string;
credentialsId?: string;
config: Record<string, any>;
inputData?: any;
}
Comment on lines +1 to +7
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for credId and credentialsId usage patterns in executor files
echo "=== Checking credId usage ==="
rg -n "credId" --type ts

echo ""
echo "=== Checking credentialsId usage ==="
rg -n "credentialsId" --type ts

Repository: Dev-Pross/BuildFlow

Length of output: 3481


Fix field naming mismatch between ExecutionContext interface and executor implementations.

ExecutionContext defines credentialsId, but GmailExecutor and GoogleSheetsNodeExecutor access context.credId. The worker engine (apps/worker/src/engine/executor.ts:67) also sets the context field as credId, not credentialsId. This mismatch will cause credentials to be undefined at runtime. Rename credentialsId to credId in the interface to match the actual usage.

🤖 Prompt for AI Agents
In `@packages/nodes/src/registry/Execution.config.types.ts` around lines 1 - 7,
The ExecutionContext interface uses the wrong field name for credentials: change
the property name from credentialsId to credId so it matches usage across the
codebase (e.g., GmailExecutor, GoogleSheetsNodeExecutor) and the worker engine
that sets context.credId (see executor implementations and
apps/worker/src/engine/executor.ts), ensuring all references are consistent and
types updated accordingly.

export interface ExecutionResult {
success: boolean;
output?: any;
error?: string;
metadata?: Record<any, any>;
}
export interface NodeExecutor {
execute(context: ExecutionContext): Promise<ExecutionResult>;
}
47 changes: 47 additions & 0 deletions packages/nodes/src/registry/execution.registory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { ExecutionContext, ExecutionResult, NodeExecutor } from "../registry/Execution.config.types.js";
import { GmailExecutor } from "@repo/nodes/nodeClient";
import {GoogleSheetsNodeExecutor} from "@repo/nodes/nodeClient";
class ExecutionRegistry {
private executors = new Map<string, NodeExecutor>();

register(nodeType: string, executor: NodeExecutor) {
this.executors.set(nodeType, executor);
}

async execute(
nodeType: string,
context: ExecutionContext
): Promise<ExecutionResult> {
const executor = this.executors.get(nodeType);
if (!executor) {
return {
success: false,
error: `No Executor found for ${nodeType}`,
};
}
try {
const result = await executor.execute(context)
console.log("Execute result:", result);

return result;
} catch (error: any) {
return {
success: false,
error: error,
};
}
Comment on lines +27 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Error object assigned to string field.

ExecutionResult.error is typed as string, but the caught error object is assigned directly. This can result in [object Object] or unexpected serialization.

🐛 Proposed fix
     } catch (error: any) {
       return {
         success: false,
-        error: error,
+        error: error instanceof Error ? error.message : String(error),
       };
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (error: any) {
return {
success: false,
error: error,
};
}
} catch (error: any) {
return {
success: false,
error: error instanceof Error ? error.message : String(error),
};
}
🤖 Prompt for AI Agents
In `@packages/nodes/src/registry/execution.registory.ts` around lines 27 - 32, The
catch block returns the raw caught value into ExecutionResult.error which is
typed as string; change the catch handling in the catch of the function in
execution.registory.ts to normalize the error to a string (e.g. use error
instanceof Error ? error.message : typeof error === 'string' ? error :
JSON.stringify(error)) and return that string in the error field so
ExecutionResult.error remains a string; ensure you handle potential
JSON.stringify failures by falling back to String(error).

}
initialize() {
// TODO: Ensure GmailExecutor implements NodeExecutor and is compatible with ExecutionContext
// If needed, adapt/extract a compatible Executor for registration.
// For now, this cast suppresses the type error. Refactor as soon as possible!


//wehen visits this next time make sure chang gmail executor implements NodeExecutor
this.register("gmail", new GmailExecutor() as unknown as NodeExecutor);
this.register("google_sheet", new GoogleSheetsNodeExecutor() as unknown as NodeExecutor)
console.log(`The current Executors are ${this.executors.size}`);
}
Comment on lines +34 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Address the type incompatibility TODOs before merging.

The as unknown as NodeExecutor casts suppress type errors but hide real interface mismatches (e.g., credId vs credentialsId, different ExecutionContext types). This can cause runtime failures. Consider creating adapter functions or updating the executor interfaces to properly implement NodeExecutor.

Would you like me to help design an adapter pattern to properly bridge the executor implementations with the NodeExecutor interface?

🤖 Prompt for AI Agents
In `@packages/nodes/src/registry/execution.registory.ts` around lines 34 - 44, The
code registers GmailExecutor and GoogleSheetsNodeExecutor using unsafe casts (as
unknown as NodeExecutor), hiding real interface mismatches; replace these casts
by adapting the executors to the NodeExecutor contract: implement an adapter or
wrapper class/function (e.g., GmailNodeExecutorAdapter,
GoogleSheetsNodeExecutorAdapter) that implements NodeExecutor and internally
translates the executor methods/fields (map credId -> credentialsId, convert
their ExecutionContext shape to the NodeExecutor ExecutionContext, and delegate
calls to GmailExecutor/GoogleSheetsNodeExecutor), then call
this.register("gmail", new GmailNodeExecutorAdapter(new GmailExecutor())) and
this.register("google_sheet", new GoogleSheetsNodeExecutorAdapter(new
GoogleSheetsNodeExecutor())); ensure the adapter types satisfy NodeExecutor and
remove the casts so the TypeScript compiler enforces correctness.

}

export const ExecutionRegister = new ExecutionRegistry();