From d2e0887d98273cfb3ab615a0a9b18a686b5283bc Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sat, 25 Oct 2025 02:51:47 +0300 Subject: [PATCH 01/12] add schema selection on postgres config --- CONTRIBUTING.md | 2 ++ docker/.env.example | 1 + docker/docker-compose-queue-prebuilt.yml | 2 ++ docker/docker-compose.yml | 1 + docker/worker/.env.example | 1 + docker/worker/docker-compose.yml | 1 + packages/server/src/DataSource.ts | 15 ++++++++++++++- packages/server/src/commands/base.ts | 2 ++ .../middleware/passport/SessionPersistance.ts | 1 + 9 files changed, 25 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 194bf66235d..edec1007483 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -140,6 +140,8 @@ Flowise support different environment variables to configure your instance. You | DATABASE_USER | Database username (When DATABASE_TYPE is not sqlite) | String | | | DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | | | DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_SCHEMA | Database schema (When DATABASE_TYPE is postgres) | String | | + | DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false | | DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false | | SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` | diff --git a/docker/.env.example b/docker/.env.example index dab25248a80..47573ecc50d 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise # DATABASE_PORT=5432 # DATABASE_HOST="" # DATABASE_NAME=flowise +# DATABASE_SCHEMA: default, # DATABASE_USER=root # DATABASE_PASSWORD=mypassword # DATABASE_SSL=true diff --git a/docker/docker-compose-queue-prebuilt.yml b/docker/docker-compose-queue-prebuilt.yml index 0063eeb1fd9..2b7bec84288 100644 --- a/docker/docker-compose-queue-prebuilt.yml +++ b/docker/docker-compose-queue-prebuilt.yml @@ -28,6 +28,7 @@ services: - DATABASE_PORT=${DATABASE_PORT} - DATABASE_HOST=${DATABASE_HOST} - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_SCHEMA=${DATABASE_SCHEMA} - DATABASE_USER=${DATABASE_USER} - DATABASE_PASSWORD=${DATABASE_PASSWORD} - DATABASE_SSL=${DATABASE_SSL} @@ -172,6 +173,7 @@ services: - DATABASE_PORT=${DATABASE_PORT} - DATABASE_HOST=${DATABASE_HOST} - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_SCHEMA=${DATABASE_SCHEMA} - DATABASE_USER=${DATABASE_USER} - DATABASE_PASSWORD=${DATABASE_PASSWORD} - DATABASE_SSL=${DATABASE_SSL} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index f66d7106d2e..e098433495a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -13,6 +13,7 @@ services: - DATABASE_PORT=${DATABASE_PORT} - DATABASE_HOST=${DATABASE_HOST} - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_SCHEMA=${DATABASE_SCHEMA} - DATABASE_USER=${DATABASE_USER} - DATABASE_PASSWORD=${DATABASE_PASSWORD} - DATABASE_SSL=${DATABASE_SSL} diff --git a/docker/worker/.env.example b/docker/worker/.env.example index 769286dffee..4f479dddbce 100644 --- a/docker/worker/.env.example +++ b/docker/worker/.env.example @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise # DATABASE_PORT=5432 # DATABASE_HOST="" # DATABASE_NAME=flowise +# # DATABASE_SCHEMA: default, # DATABASE_USER=root # DATABASE_PASSWORD=mypassword # DATABASE_SSL=true diff --git a/docker/worker/docker-compose.yml b/docker/worker/docker-compose.yml index 952dc04cd8e..d237532a07f 100644 --- a/docker/worker/docker-compose.yml +++ b/docker/worker/docker-compose.yml @@ -13,6 +13,7 @@ services: - DATABASE_PORT=${DATABASE_PORT} - DATABASE_HOST=${DATABASE_HOST} - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_SCHEMA=${DATABASE_SCHEMA} - DATABASE_USER=${DATABASE_USER} - DATABASE_PASSWORD=${DATABASE_PASSWORD} - DATABASE_SSL=${DATABASE_SSL} diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index a2832c30d79..66b1aefe36b 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -63,6 +63,14 @@ export const init = async (): Promise => { }) break case 'postgres': + logger.info('Using Postgres as the database') + logger.info(`Database Host: ${process.env.DATABASE_HOST}`) + logger.info(`Database Port: ${process.env.DATABASE_PORT}`) + logger.info(`Database User: ${process.env.DATABASE_USER}`) + logger.info(`Database Name: ${process.env.DATABASE_NAME}`) + logger.info(`Database Schema: ${process.env.DATABASE_SCHEMA}`) + const dbSchema = process.env.DATABASE_SCHEMA; + const isCustomSchema = dbSchema && dbSchema !== 'public'; appDataSource = new DataSource({ type: 'postgres', host: process.env.DATABASE_HOST, @@ -70,13 +78,17 @@ export const init = async (): Promise => { username: process.env.DATABASE_USER, password: process.env.DATABASE_PASSWORD, database: process.env.DATABASE_NAME, + ...(isCustomSchema && { schema: dbSchema }), // Only set if custom schema ssl: getDatabaseSSLFromEnv(), synchronize: false, migrationsRun: false, entities: Object.values(entities), migrations: postgresMigrations, + migrationsTableName: 'migrations', extra: { - idleTimeoutMillis: 120000 + idleTimeoutMillis: 120000, + // Add this to set the search_path + ...(isCustomSchema && { options: `-c search_path=${dbSchema},public` }) }, logging: ['error', 'warn', 'info', 'log'], logger: 'advanced-console', @@ -105,6 +117,7 @@ export function getDataSource(): DataSource { if (appDataSource === undefined) { init() } + console.debug('Data Source Type:', appDataSource) return appDataSource } diff --git a/packages/server/src/commands/base.ts b/packages/server/src/commands/base.ts index 4795ec4b31e..ed25b752254 100644 --- a/packages/server/src/commands/base.ts +++ b/packages/server/src/commands/base.ts @@ -29,6 +29,7 @@ export abstract class BaseCommand extends Command { DATABASE_PORT: Flags.string(), DATABASE_HOST: Flags.string(), DATABASE_NAME: Flags.string(), + DATABASE_SCHEMA: Flags.string(), DATABASE_USER: Flags.string(), DATABASE_PASSWORD: Flags.string(), DATABASE_SSL: Flags.string(), @@ -159,6 +160,7 @@ export abstract class BaseCommand extends Command { if (flags.DATABASE_PORT) process.env.DATABASE_PORT = flags.DATABASE_PORT if (flags.DATABASE_HOST) process.env.DATABASE_HOST = flags.DATABASE_HOST if (flags.DATABASE_NAME) process.env.DATABASE_NAME = flags.DATABASE_NAME + if (flags.DATABASE_SCHEMA) process.env.DATABASE_SCHEMA = flags.DATABASE_SCHEMA if (flags.DATABASE_USER) process.env.DATABASE_USER = flags.DATABASE_USER if (flags.DATABASE_PASSWORD) process.env.DATABASE_PASSWORD = flags.DATABASE_PASSWORD if (flags.DATABASE_SSL) process.env.DATABASE_SSL = flags.DATABASE_SSL diff --git a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts index bd21dbae730..67e5ef36b51 100644 --- a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts +++ b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts @@ -68,6 +68,7 @@ export const initializeDBClientAndStore: any = () => { user: process.env.DATABASE_USER, password: process.env.DATABASE_PASSWORD, database: process.env.DATABASE_NAME, + schema: process.env.DATABASE_SCHEMA, ssl: getDatabaseSSLFromEnv() }) return new pgSession({ From eb9ee423177057bfd86111ecaf34c039a397d11a Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sat, 25 Oct 2025 02:55:19 +0300 Subject: [PATCH 02/12] chore: remove logs and fix comments --- packages/server/src/DataSource.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index 66b1aefe36b..f685af980ed 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -63,12 +63,6 @@ export const init = async (): Promise => { }) break case 'postgres': - logger.info('Using Postgres as the database') - logger.info(`Database Host: ${process.env.DATABASE_HOST}`) - logger.info(`Database Port: ${process.env.DATABASE_PORT}`) - logger.info(`Database User: ${process.env.DATABASE_USER}`) - logger.info(`Database Name: ${process.env.DATABASE_NAME}`) - logger.info(`Database Schema: ${process.env.DATABASE_SCHEMA}`) const dbSchema = process.env.DATABASE_SCHEMA; const isCustomSchema = dbSchema && dbSchema !== 'public'; appDataSource = new DataSource({ @@ -78,7 +72,7 @@ export const init = async (): Promise => { username: process.env.DATABASE_USER, password: process.env.DATABASE_PASSWORD, database: process.env.DATABASE_NAME, - ...(isCustomSchema && { schema: dbSchema }), // Only set if custom schema + ...(isCustomSchema && { schema: dbSchema }), // set custom schema if provided ssl: getDatabaseSSLFromEnv(), synchronize: false, migrationsRun: false, @@ -87,7 +81,7 @@ export const init = async (): Promise => { migrationsTableName: 'migrations', extra: { idleTimeoutMillis: 120000, - // Add this to set the search_path + // set the search_path for the migrations and queries to the desired schema ...(isCustomSchema && { options: `-c search_path=${dbSchema},public` }) }, logging: ['error', 'warn', 'info', 'log'], From 43c142d074582af92c95ae026db1332d93369108 Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sat, 25 Oct 2025 02:55:19 +0300 Subject: [PATCH 03/12] chore: remove logs and fix comments --- packages/server/src/DataSource.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index f685af980ed..0ae6a2ba61a 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -111,7 +111,6 @@ export function getDataSource(): DataSource { if (appDataSource === undefined) { init() } - console.debug('Data Source Type:', appDataSource) return appDataSource } From 7d0951226dfe56092c3744d41cf4f3c6428dccd0 Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sat, 25 Oct 2025 02:55:19 +0300 Subject: [PATCH 04/12] fix: md syntax error on CONTRIBUTING.md --- CONTRIBUTING.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index edec1007483..a6f5a2d0c16 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -141,7 +141,6 @@ Flowise support different environment variables to configure your instance. You | DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | | | DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | | | DATABASE_SCHEMA | Database schema (When DATABASE_TYPE is postgres) | String | | - | DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false | | DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false | | SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` | From b5f3e939f1bf487a9f8b8d27e8b9ef5437326ffa Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sat, 25 Oct 2025 19:58:02 +0300 Subject: [PATCH 05/12] fix: linting error --- packages/server/src/DataSource.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index 0ae6a2ba61a..b5d42a40118 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -63,8 +63,8 @@ export const init = async (): Promise => { }) break case 'postgres': - const dbSchema = process.env.DATABASE_SCHEMA; - const isCustomSchema = dbSchema && dbSchema !== 'public'; + const dbSchema = process.env.DATABASE_SCHEMA + const isCustomSchema = dbSchema && dbSchema !== 'public' appDataSource = new DataSource({ type: 'postgres', host: process.env.DATABASE_HOST, From 4725d67824647018cac544e474dec21940c3f2fd Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Tue, 28 Oct 2025 18:52:24 +0300 Subject: [PATCH 06/12] refactored the change and fixed minor issues --- .../PostgresRecordManager.ts | 63 +++++++++++++------ .../PostgresRecordManager/utils.ts | 3 + .../nodes/vectorstores/Postgres/Postgres.ts | 10 ++- .../vectorstores/Postgres/driver/Base.ts | 16 ++++- .../vectorstores/Postgres/driver/TypeORM.ts | 9 ++- .../nodes/vectorstores/Postgres/utils.ts | 4 +- .../vectorstores/Singlestore/Singlestore.ts | 2 + packages/server/src/DataSource.ts | 1 - .../middleware/passport/SessionPersistance.ts | 2 +- 9 files changed, 83 insertions(+), 27 deletions(-) diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index cf239522f5b..dfc30fa8de4 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -3,7 +3,7 @@ import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../ import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' import { DataSource } from 'typeorm' import { getHost, getSSL } from '../../vectorstores/Postgres/utils' -import { getDatabase, getPort, getTableName } from './utils' +import { getDatabase, getPort, getSchema, getTableName } from './utils' const serverCredentialsExists = !!process.env.POSTGRES_RECORDMANAGER_USER && !!process.env.POSTGRES_RECORDMANAGER_PASSWORD @@ -74,6 +74,14 @@ class PostgresRecordManager_RecordManager implements INode { additionalParams: true, optional: true }, + { + label: 'Schema Name', + name: 'schema', + type: 'string', + placeholder: getSchema(), + additionalParams: true, + optional: true + }, { label: 'Namespace', name: 'namespace', @@ -160,7 +168,8 @@ class PostgresRecordManager_RecordManager implements INode { ssl: getSSL(nodeData), username: user, password: password, - database: getDatabase(nodeData) + database: getDatabase(nodeData), + schema: getSchema(nodeData) } const args = { @@ -186,12 +195,14 @@ class PostgresRecordManager implements RecordManagerInterface { lc_namespace = ['langchain', 'recordmanagers', 'postgres'] config: PostgresRecordManagerOptions tableName: string + schema: string namespace: string constructor(namespace: string, config: PostgresRecordManagerOptions) { - const { tableName } = config + const { tableName, postgresConnectionOptions: { schema }} = config this.namespace = namespace this.tableName = tableName + this.schema = schema this.config = config } @@ -206,7 +217,17 @@ class PostgresRecordManager implements RecordManagerInterface { return tableName } + sanitizeSchema(schema: string): string { + // Trim and normalize case, turn whitespace into underscores + schema = schema.trim().toLowerCase().replace(/\s+/g, '_') + + // Validate using a regex (alphanumeric and underscores only) + if (!/^[a-zA-Z0-9_]+$/.test(schema)) { + throw new Error('Invalid table name') + } + return schema + } private async getDataSource(): Promise { const { postgresConnectionOptions } = this.config if (!postgresConnectionOptions) { @@ -225,10 +246,9 @@ class PostgresRecordManager implements RecordManagerInterface { try { const dataSource = await this.getDataSource() const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) - + const fullTableName = this.getFullTableName() await queryRunner.manager.query(` - CREATE TABLE IF NOT EXISTS "${tableName}" ( + CREATE TABLE IF NOT EXISTS ${fullTableName} ( uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(), key TEXT NOT NULL, namespace TEXT NOT NULL, @@ -236,10 +256,10 @@ class PostgresRecordManager implements RecordManagerInterface { group_id TEXT, UNIQUE (key, namespace) ); - CREATE INDEX IF NOT EXISTS updated_at_index ON "${tableName}" (updated_at); - CREATE INDEX IF NOT EXISTS key_index ON "${tableName}" (key); - CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace); - CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`) + CREATE INDEX IF NOT EXISTS updated_at_index ON ${fullTableName} (updated_at); + CREATE INDEX IF NOT EXISTS key_index ON ${fullTableName} (key); + CREATE INDEX IF NOT EXISTS namespace_index ON ${fullTableName} (namespace); + CREATE INDEX IF NOT EXISTS group_id_index ON ${fullTableName} (group_id);`) await queryRunner.release() } catch (e: any) { @@ -291,8 +311,7 @@ class PostgresRecordManager implements RecordManagerInterface { const dataSource = await this.getDataSource() const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) - + const fullTableName = this.getFullTableName() const updatedAt = await this.getTime() const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} @@ -310,7 +329,7 @@ class PostgresRecordManager implements RecordManagerInterface { const valuesPlaceholders = recordsToUpsert.map((_, j) => this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)).join(', ') - const query = `INSERT INTO "${tableName}" (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;` + const query = `INSERT INTO ${fullTableName} (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;` try { await queryRunner.manager.query(query, recordsToUpsert.flat()) await queryRunner.release() @@ -329,13 +348,12 @@ class PostgresRecordManager implements RecordManagerInterface { const dataSource = await this.getDataSource() const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) - + const fullTableName = this.getFullTableName() const startIndex = 2 const arrayPlaceholders = keys.map((_, i) => `$${i + startIndex}`).join(', ') const query = ` - SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join "${tableName}" on k=key and namespace = $1; + SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join ${fullTableName} on k=key and namespace = $1; ` try { const res = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) @@ -351,9 +369,9 @@ class PostgresRecordManager implements RecordManagerInterface { async listKeys(options?: ListKeyOptions): Promise { const { before, after, limit, groupIds } = options ?? {} - const tableName = this.sanitizeTableName(this.tableName) + const fullTableName = this.getFullTableName() - let query = `SELECT key FROM "${tableName}" WHERE namespace = $1` + let query = `SELECT key FROM ${fullTableName} WHERE namespace = $1` const values: (string | number | (string | null)[])[] = [this.namespace] let index = 2 @@ -405,10 +423,10 @@ class PostgresRecordManager implements RecordManagerInterface { const dataSource = await this.getDataSource() const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) + const fullTableName = this.getFullTableName() try { - const query = `DELETE FROM "${tableName}" WHERE namespace = $1 AND key = ANY($2);` + const query = `DELETE FROM ${fullTableName} WHERE namespace = $1 AND key = ANY($2);` await queryRunner.manager.query(query, [this.namespace, keys]) await queryRunner.release() } catch (error) { @@ -418,6 +436,11 @@ class PostgresRecordManager implements RecordManagerInterface { await dataSource.destroy() } } + getFullTableName(): string { + const tableName = this.sanitizeTableName(this.tableName) + const schema = this.sanitizeSchema(this.schema) + return schema ? `${schema}.${tableName}` : tableName + } } module.exports = { nodeClass: PostgresRecordManager_RecordManager } diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts index e3547fc89a9..8b0d7c45fff 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts @@ -7,6 +7,9 @@ export function getHost(nodeData?: INodeData) { export function getDatabase(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_RECORDMANAGER_DATABASE) } +export function getSchema(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.schema, process.env.DATABASE_SCHEMA, 'public') +} export function getPort(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.port, process.env.POSTGRES_RECORDMANAGER_PORT, '5432') diff --git a/packages/components/nodes/vectorstores/Postgres/Postgres.ts b/packages/components/nodes/vectorstores/Postgres/Postgres.ts index ce4a7ba22e0..6dff116f907 100644 --- a/packages/components/nodes/vectorstores/Postgres/Postgres.ts +++ b/packages/components/nodes/vectorstores/Postgres/Postgres.ts @@ -8,7 +8,7 @@ import { VectorStore } from '@langchain/core/vectorstores' import { VectorStoreDriver } from './driver/Base' import { TypeORMDriver } from './driver/TypeORM' // import { PGVectorDriver } from './driver/PGVector' -import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils' +import { getContentColumnName, getDatabase, getHost, getPort, getTableName, getSchema } from './utils' const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD @@ -119,6 +119,14 @@ class Postgres_VectorStores implements INode { additionalParams: true, optional: true }, + { + label: 'Schema', + name: 'schema', + type: 'string', + placeholder: getSchema(), + additionalParams: true, + optional: true + }, /*{ label: 'Driver', name: 'driver', diff --git a/packages/components/nodes/vectorstores/Postgres/driver/Base.ts b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts index f117a065cd2..37d87b4892d 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/Base.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts @@ -2,7 +2,7 @@ import { VectorStore } from '@langchain/core/vectorstores' import { getCredentialData, getCredentialParam, ICommonObject, INodeData } from '../../../../src' import { Document } from '@langchain/core/documents' import { Embeddings } from '@langchain/core/embeddings' -import { getDatabase, getHost, getPort, getSSL, getTableName } from '../utils' +import { getDatabase, getHost, getPort, getSchema, getSSL, getTableName } from '../utils' export abstract class VectorStoreDriver { constructor(protected nodeData: INodeData, protected options: ICommonObject) {} @@ -34,7 +34,9 @@ export abstract class VectorStoreDriver { getTableName() { return this.sanitizeTableName(getTableName(this.nodeData)) } - + getSchema(){ + return this.sanitizeSchema(getSchema(this.nodeData)) + } getEmbeddings() { return this.nodeData.inputs?.embeddings as Embeddings } @@ -50,7 +52,17 @@ export abstract class VectorStoreDriver { return tableName } + sanitizeSchema(schema: string): string { + // Trim and normalize case, turn whitespace into underscores + schema = schema.trim().toLowerCase().replace(/\s+/g, '_') + // Validate using a regex (alphanumeric and underscores only) + if (!/^[a-zA-Z0-9_]+$/.test(schema)) { + throw new Error('Invalid schema name') + } + + return schema + } async getCredentials() { const credentialData = await getCredentialData(this.nodeData.credential ?? '', this.options) const user = getCredentialParam('user', credentialData, this.nodeData, process.env.POSTGRES_VECTORSTORE_USER) diff --git a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts index 3a0c5ab005e..bcf4ae21f7b 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts @@ -13,6 +13,7 @@ export class TypeORMDriver extends VectorStoreDriver { if (!this._postgresConnectionOptions) { const { user, password } = await this.getCredentials() const additionalConfig = this.nodeData.inputs?.additionalConfig as string + const schema = this.getSchema() || 'public' // Get schema let additionalConfiguration = {} @@ -33,7 +34,13 @@ export class TypeORMDriver extends VectorStoreDriver { username: user, // Required by TypeORMVectorStore user: user, // Required by Pool in similaritySearchVectorWithScore password: password, - database: this.getDatabase() + // schema: this.getSchema(), + database: this.getDatabase(), + extra: { + ...(additionalConfiguration as any)?.extra, + // Force PostgreSQL to use your schema + options: `-c search_path=${schema},public` + } } as DataSourceOptions // Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app diff --git a/packages/components/nodes/vectorstores/Postgres/utils.ts b/packages/components/nodes/vectorstores/Postgres/utils.ts index 96d59f562e6..95b7c526c44 100644 --- a/packages/components/nodes/vectorstores/Postgres/utils.ts +++ b/packages/components/nodes/vectorstores/Postgres/utils.ts @@ -19,7 +19,9 @@ export function getSSL(nodeData?: INodeData) { export function getTableName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents') } - +export function getSchema(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.schema, process.env.DATABASE_SCHEMA, 'public') +} export function getContentColumnName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent') } diff --git a/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts b/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts index 896d409e939..ad6e6ee0796 100644 --- a/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts +++ b/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts @@ -131,6 +131,7 @@ class SingleStore_VectorStores implements INode { database: nodeData.inputs?.database as string }, ...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}), + ...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}), ...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}), ...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}), ...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {}) @@ -171,6 +172,7 @@ class SingleStore_VectorStores implements INode { database: nodeData.inputs?.database as string }, ...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}), + ...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}), ...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}), ...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}), ...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {}) diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index b5d42a40118..06996ee44e2 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -78,7 +78,6 @@ export const init = async (): Promise => { migrationsRun: false, entities: Object.values(entities), migrations: postgresMigrations, - migrationsTableName: 'migrations', extra: { idleTimeoutMillis: 120000, // set the search_path for the migrations and queries to the desired schema diff --git a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts index 67e5ef36b51..e9ea6d012d6 100644 --- a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts +++ b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts @@ -74,7 +74,7 @@ export const initializeDBClientAndStore: any = () => { return new pgSession({ pool: pgPool, // Connection pool tableName: 'login_sessions', - schemaName: 'public', + schemaName: process.env.DATABASE_SCHEMA || 'public', createTableIfMissing: true }) } From 7c0116e23b00a428d16fef9f2aeb3dd6c044c09b Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Wed, 29 Oct 2025 15:50:43 +0300 Subject: [PATCH 07/12] refactor for cleaner logic --- packages/server/src/DataSource.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/server/src/DataSource.ts b/packages/server/src/DataSource.ts index 06996ee44e2..a1fe9638005 100644 --- a/packages/server/src/DataSource.ts +++ b/packages/server/src/DataSource.ts @@ -63,8 +63,6 @@ export const init = async (): Promise => { }) break case 'postgres': - const dbSchema = process.env.DATABASE_SCHEMA - const isCustomSchema = dbSchema && dbSchema !== 'public' appDataSource = new DataSource({ type: 'postgres', host: process.env.DATABASE_HOST, @@ -72,7 +70,7 @@ export const init = async (): Promise => { username: process.env.DATABASE_USER, password: process.env.DATABASE_PASSWORD, database: process.env.DATABASE_NAME, - ...(isCustomSchema && { schema: dbSchema }), // set custom schema if provided + schema: process.env.DATABASE_SCHEMA || 'public', // set custom schema if provided ssl: getDatabaseSSLFromEnv(), synchronize: false, migrationsRun: false, @@ -81,7 +79,7 @@ export const init = async (): Promise => { extra: { idleTimeoutMillis: 120000, // set the search_path for the migrations and queries to the desired schema - ...(isCustomSchema && { options: `-c search_path=${dbSchema},public` }) + options: `-c search_path=${process.env.DATABASE_SCHEMA},public` }, logging: ['error', 'warn', 'info', 'log'], logger: 'advanced-console', From 22192778a3adfdc524b83638bbc2d00f9768a1f5 Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Wed, 29 Oct 2025 15:51:13 +0300 Subject: [PATCH 08/12] fix UI issues for vector store setup --- .../nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts | 3 +++ .../PostgresRecordManager/PostgresRecordManager.ts | 2 +- .../nodes/recordmanager/PostgresRecordManager/utils.ts | 2 +- .../components/nodes/vectorstores/Postgres/driver/TypeORM.ts | 2 +- packages/components/nodes/vectorstores/Postgres/utils.ts | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts b/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts index 59d6f720141..ca3ebab3fb1 100644 --- a/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts +++ b/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts @@ -5,6 +5,7 @@ import { DataSource } from 'typeorm' import { CheckpointTuple, SaverOptions, SerializerProtocol } from '../interface' import { IMessage, MemoryMethods } from '../../../../src/Interface' import { mapChatMessageToBaseMessage } from '../../../../src/utils' +import { getSchema } from '../../../vectorstores/Postgres/utils' export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods { protected isSetup: boolean @@ -17,6 +18,7 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods this.config = config const { threadId } = config this.threadId = threadId + this.tableName = getSchema() != 'public' ? `${getSchema()}.checkpoints` : 'checkpoints' } sanitizeTableName(tableName: string): string { @@ -154,6 +156,7 @@ CREATE TABLE IF NOT EXISTS ${tableName} ( const queryRunner = dataSource.createQueryRunner() const thread_id = config.configurable?.thread_id || this.threadId const tableName = this.sanitizeTableName(this.tableName) + let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = $1` const args = [thread_id] diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index dfc30fa8de4..5cd7fb4f67e 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -75,7 +75,7 @@ class PostgresRecordManager_RecordManager implements INode { optional: true }, { - label: 'Schema Name', + label: 'Schema', name: 'schema', type: 'string', placeholder: getSchema(), diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts index 8b0d7c45fff..211ce2e611e 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts @@ -8,7 +8,7 @@ export function getDatabase(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_RECORDMANAGER_DATABASE) } export function getSchema(nodeData?: INodeData) { - return defaultChain(nodeData?.inputs?.schema, process.env.DATABASE_SCHEMA, 'public') + return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_RECORDMANAGER_SCHEMA, 'public') } export function getPort(nodeData?: INodeData) { diff --git a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts index bcf4ae21f7b..44dfe9e4871 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts @@ -13,7 +13,7 @@ export class TypeORMDriver extends VectorStoreDriver { if (!this._postgresConnectionOptions) { const { user, password } = await this.getCredentials() const additionalConfig = this.nodeData.inputs?.additionalConfig as string - const schema = this.getSchema() || 'public' // Get schema + const schema = this.getSchema() let additionalConfiguration = {} diff --git a/packages/components/nodes/vectorstores/Postgres/utils.ts b/packages/components/nodes/vectorstores/Postgres/utils.ts index 95b7c526c44..6635ca49df6 100644 --- a/packages/components/nodes/vectorstores/Postgres/utils.ts +++ b/packages/components/nodes/vectorstores/Postgres/utils.ts @@ -20,7 +20,7 @@ export function getTableName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents') } export function getSchema(nodeData?: INodeData) { - return defaultChain(nodeData?.inputs?.schema, process.env.DATABASE_SCHEMA, 'public') + return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_VECTORSTORE_SCHEMA, 'public') } export function getContentColumnName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent') From 41faaff20ef5bb8a375d7d1375ad5c2685f5caf6 Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Wed, 29 Oct 2025 16:16:47 +0300 Subject: [PATCH 09/12] Clean up: remove changes unrelated to PR scope --- .../components/nodes/vectorstores/Singlestore/Singlestore.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts b/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts index ad6e6ee0796..896d409e939 100644 --- a/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts +++ b/packages/components/nodes/vectorstores/Singlestore/Singlestore.ts @@ -131,7 +131,6 @@ class SingleStore_VectorStores implements INode { database: nodeData.inputs?.database as string }, ...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}), - ...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}), ...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}), ...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}), ...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {}) @@ -172,7 +171,6 @@ class SingleStore_VectorStores implements INode { database: nodeData.inputs?.database as string }, ...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}), - ...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}), ...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}), ...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}), ...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {}) From c560aece3079ee94e244b27912a0d147346efc64 Mon Sep 17 00:00:00 2001 From: Abdulaziz Alwaalan Date: Sun, 2 Nov 2025 23:28:17 +0300 Subject: [PATCH 10/12] Fix linting errors --- .../PostgresRecordManager/PostgresRecordManager.ts | 5 ++++- .../components/nodes/vectorstores/Postgres/driver/Base.ts | 2 +- packages/components/nodes/vectorstores/Postgres/utils.ts | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index 5cd7fb4f67e..704ec40c436 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -199,7 +199,10 @@ class PostgresRecordManager implements RecordManagerInterface { namespace: string constructor(namespace: string, config: PostgresRecordManagerOptions) { - const { tableName, postgresConnectionOptions: { schema }} = config + const { + tableName, + postgresConnectionOptions: { schema } + } = config this.namespace = namespace this.tableName = tableName this.schema = schema diff --git a/packages/components/nodes/vectorstores/Postgres/driver/Base.ts b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts index 37d87b4892d..1051aba051d 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/Base.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts @@ -34,7 +34,7 @@ export abstract class VectorStoreDriver { getTableName() { return this.sanitizeTableName(getTableName(this.nodeData)) } - getSchema(){ + getSchema() { return this.sanitizeSchema(getSchema(this.nodeData)) } getEmbeddings() { diff --git a/packages/components/nodes/vectorstores/Postgres/utils.ts b/packages/components/nodes/vectorstores/Postgres/utils.ts index 6635ca49df6..0474afef1dc 100644 --- a/packages/components/nodes/vectorstores/Postgres/utils.ts +++ b/packages/components/nodes/vectorstores/Postgres/utils.ts @@ -20,7 +20,7 @@ export function getTableName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents') } export function getSchema(nodeData?: INodeData) { - return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_VECTORSTORE_SCHEMA, 'public') + return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_VECTORSTORE_SCHEMA, 'public') } export function getContentColumnName(nodeData?: INodeData) { return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent') From 7e9d3b1477bde5bf593d5596f3859d0f9d2097e1 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Wed, 5 Nov 2025 15:37:32 +0000 Subject: [PATCH 11/12] Update .env.example --- docker/worker/.env.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/worker/.env.example b/docker/worker/.env.example index 4f479dddbce..5b27a4a3c98 100644 --- a/docker/worker/.env.example +++ b/docker/worker/.env.example @@ -11,7 +11,7 @@ DATABASE_PATH=/root/.flowise # DATABASE_PORT=5432 # DATABASE_HOST="" # DATABASE_NAME=flowise -# # DATABASE_SCHEMA: default, +# DATABASE_SCHEMA=default # DATABASE_USER=root # DATABASE_PASSWORD=mypassword # DATABASE_SSL=true From 8076bd5afd62bed8079082a0e2c69c935520d0be Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Wed, 5 Nov 2025 15:37:57 +0000 Subject: [PATCH 12/12] Update .env.example --- docker/.env.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/.env.example b/docker/.env.example index 47573ecc50d..e403801370a 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -11,7 +11,7 @@ DATABASE_PATH=/root/.flowise # DATABASE_PORT=5432 # DATABASE_HOST="" # DATABASE_NAME=flowise -# DATABASE_SCHEMA: default, +# DATABASE_SCHEMA=default # DATABASE_USER=root # DATABASE_PASSWORD=mypassword # DATABASE_SSL=true