Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Flowise support different environment variables to configure your instance. You
| DATABASE_USER | Database username (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_SCHEMA | Database schema (When DATABASE_TYPE is postgres) | String | |
| DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false |
| DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false |
| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` |
Expand Down
1 change: 1 addition & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise
# DATABASE_PORT=5432
# DATABASE_HOST=""
# DATABASE_NAME=flowise
# DATABASE_SCHEMA=default
# DATABASE_USER=root
# DATABASE_PASSWORD=mypassword
# DATABASE_SSL=true
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-compose-queue-prebuilt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down Expand Up @@ -172,6 +173,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
1 change: 1 addition & 0 deletions docker/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise
# DATABASE_PORT=5432
# DATABASE_HOST=""
# DATABASE_NAME=flowise
# DATABASE_SCHEMA=default
# DATABASE_USER=root
# DATABASE_PASSWORD=mypassword
# DATABASE_SSL=true
Expand Down
1 change: 1 addition & 0 deletions docker/worker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { DataSource } from 'typeorm'
import { CheckpointTuple, SaverOptions, SerializerProtocol } from '../interface'
import { IMessage, MemoryMethods } from '../../../../src/Interface'
import { mapChatMessageToBaseMessage } from '../../../../src/utils'
import { getSchema } from '../../../vectorstores/Postgres/utils'

export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods {
protected isSetup: boolean
Expand All @@ -17,6 +18,7 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods
this.config = config
const { threadId } = config
this.threadId = threadId
this.tableName = getSchema() != 'public' ? `${getSchema()}.checkpoints` : 'checkpoints'
}

sanitizeTableName(tableName: string): string {
Expand Down Expand Up @@ -154,6 +156,7 @@ CREATE TABLE IF NOT EXISTS ${tableName} (
const queryRunner = dataSource.createQueryRunner()
const thread_id = config.configurable?.thread_id || this.threadId
const tableName = this.sanitizeTableName(this.tableName)

let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = $1`
const args = [thread_id]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../
import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base'
import { DataSource } from 'typeorm'
import { getHost, getSSL } from '../../vectorstores/Postgres/utils'
import { getDatabase, getPort, getTableName } from './utils'
import { getDatabase, getPort, getSchema, getTableName } from './utils'

const serverCredentialsExists = !!process.env.POSTGRES_RECORDMANAGER_USER && !!process.env.POSTGRES_RECORDMANAGER_PASSWORD

Expand Down Expand Up @@ -74,6 +74,14 @@ class PostgresRecordManager_RecordManager implements INode {
additionalParams: true,
optional: true
},
{
label: 'Schema',
name: 'schema',
type: 'string',
placeholder: getSchema(),
additionalParams: true,
optional: true
},
{
label: 'Namespace',
name: 'namespace',
Expand Down Expand Up @@ -160,7 +168,8 @@ class PostgresRecordManager_RecordManager implements INode {
ssl: getSSL(nodeData),
username: user,
password: password,
database: getDatabase(nodeData)
database: getDatabase(nodeData),
schema: getSchema(nodeData)
}

const args = {
Expand All @@ -186,12 +195,17 @@ class PostgresRecordManager implements RecordManagerInterface {
lc_namespace = ['langchain', 'recordmanagers', 'postgres']
config: PostgresRecordManagerOptions
tableName: string
schema: string
namespace: string

constructor(namespace: string, config: PostgresRecordManagerOptions) {
const { tableName } = config
const {
tableName,
postgresConnectionOptions: { schema }
} = config
this.namespace = namespace
this.tableName = tableName
this.schema = schema
this.config = config
}

Expand All @@ -206,7 +220,17 @@ class PostgresRecordManager implements RecordManagerInterface {

return tableName
}
sanitizeSchema(schema: string): string {
// Trim and normalize case, turn whitespace into underscores
schema = schema.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(schema)) {
throw new Error('Invalid table name')
}

return schema
}
private async getDataSource(): Promise<DataSource> {
const { postgresConnectionOptions } = this.config
if (!postgresConnectionOptions) {
Expand All @@ -225,21 +249,20 @@ class PostgresRecordManager implements RecordManagerInterface {
try {
const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS "${tableName}" (
CREATE TABLE IF NOT EXISTS ${fullTableName} (
uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
namespace TEXT NOT NULL,
updated_at Double PRECISION NOT NULL,
group_id TEXT,
UNIQUE (key, namespace)
);
CREATE INDEX IF NOT EXISTS updated_at_index ON "${tableName}" (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON "${tableName}" (key);
CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
CREATE INDEX IF NOT EXISTS updated_at_index ON ${fullTableName} (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON ${fullTableName} (key);
CREATE INDEX IF NOT EXISTS namespace_index ON ${fullTableName} (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON ${fullTableName} (group_id);`)

await queryRunner.release()
} catch (e: any) {
Expand Down Expand Up @@ -291,8 +314,7 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
const updatedAt = await this.getTime()
const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {}

Expand All @@ -310,7 +332,7 @@ class PostgresRecordManager implements RecordManagerInterface {

const valuesPlaceholders = recordsToUpsert.map((_, j) => this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)).join(', ')

const query = `INSERT INTO "${tableName}" (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`
const query = `INSERT INTO ${fullTableName} (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`
try {
await queryRunner.manager.query(query, recordsToUpsert.flat())
await queryRunner.release()
Expand All @@ -329,13 +351,12 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
const startIndex = 2
const arrayPlaceholders = keys.map((_, i) => `$${i + startIndex}`).join(', ')

const query = `
SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join "${tableName}" on k=key and namespace = $1;
SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join ${fullTableName} on k=key and namespace = $1;
`
try {
const res = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()])
Expand All @@ -351,9 +372,9 @@ class PostgresRecordManager implements RecordManagerInterface {

async listKeys(options?: ListKeyOptions): Promise<string[]> {
const { before, after, limit, groupIds } = options ?? {}
const tableName = this.sanitizeTableName(this.tableName)
const fullTableName = this.getFullTableName()

let query = `SELECT key FROM "${tableName}" WHERE namespace = $1`
let query = `SELECT key FROM ${fullTableName} WHERE namespace = $1`
const values: (string | number | (string | null)[])[] = [this.namespace]

let index = 2
Expand Down Expand Up @@ -405,10 +426,10 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
const fullTableName = this.getFullTableName()

try {
const query = `DELETE FROM "${tableName}" WHERE namespace = $1 AND key = ANY($2);`
const query = `DELETE FROM ${fullTableName} WHERE namespace = $1 AND key = ANY($2);`
await queryRunner.manager.query(query, [this.namespace, keys])
await queryRunner.release()
} catch (error) {
Expand All @@ -418,6 +439,11 @@ class PostgresRecordManager implements RecordManagerInterface {
await dataSource.destroy()
}
}
getFullTableName(): string {
const tableName = this.sanitizeTableName(this.tableName)
const schema = this.sanitizeSchema(this.schema)
return schema ? `${schema}.${tableName}` : tableName
}
}

module.exports = { nodeClass: PostgresRecordManager_RecordManager }
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ export function getHost(nodeData?: INodeData) {
export function getDatabase(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_RECORDMANAGER_DATABASE)
}
export function getSchema(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_RECORDMANAGER_SCHEMA, 'public')
}

export function getPort(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.port, process.env.POSTGRES_RECORDMANAGER_PORT, '5432')
Expand Down
10 changes: 9 additions & 1 deletion packages/components/nodes/vectorstores/Postgres/Postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { VectorStore } from '@langchain/core/vectorstores'
import { VectorStoreDriver } from './driver/Base'
import { TypeORMDriver } from './driver/TypeORM'
// import { PGVectorDriver } from './driver/PGVector'
import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils'
import { getContentColumnName, getDatabase, getHost, getPort, getTableName, getSchema } from './utils'

const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD

Expand Down Expand Up @@ -119,6 +119,14 @@ class Postgres_VectorStores implements INode {
additionalParams: true,
optional: true
},
{
label: 'Schema',
name: 'schema',
type: 'string',
placeholder: getSchema(),
additionalParams: true,
optional: true
},
/*{
label: 'Driver',
name: 'driver',
Expand Down
16 changes: 14 additions & 2 deletions packages/components/nodes/vectorstores/Postgres/driver/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { VectorStore } from '@langchain/core/vectorstores'
import { getCredentialData, getCredentialParam, ICommonObject, INodeData } from '../../../../src'
import { Document } from '@langchain/core/documents'
import { Embeddings } from '@langchain/core/embeddings'
import { getDatabase, getHost, getPort, getSSL, getTableName } from '../utils'
import { getDatabase, getHost, getPort, getSchema, getSSL, getTableName } from '../utils'

export abstract class VectorStoreDriver {
constructor(protected nodeData: INodeData, protected options: ICommonObject) {}
Expand Down Expand Up @@ -34,7 +34,9 @@ export abstract class VectorStoreDriver {
getTableName() {
return this.sanitizeTableName(getTableName(this.nodeData))
}

getSchema() {
return this.sanitizeSchema(getSchema(this.nodeData))
}
getEmbeddings() {
return this.nodeData.inputs?.embeddings as Embeddings
}
Expand All @@ -50,7 +52,17 @@ export abstract class VectorStoreDriver {

return tableName
}
sanitizeSchema(schema: string): string {
// Trim and normalize case, turn whitespace into underscores
schema = schema.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(schema)) {
throw new Error('Invalid schema name')
}

return schema
}
async getCredentials() {
const credentialData = await getCredentialData(this.nodeData.credential ?? '', this.options)
const user = getCredentialParam('user', credentialData, this.nodeData, process.env.POSTGRES_VECTORSTORE_USER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class TypeORMDriver extends VectorStoreDriver {
if (!this._postgresConnectionOptions) {
const { user, password } = await this.getCredentials()
const additionalConfig = this.nodeData.inputs?.additionalConfig as string
const schema = this.getSchema()

let additionalConfiguration = {}

Expand All @@ -33,7 +34,13 @@ export class TypeORMDriver extends VectorStoreDriver {
username: user, // Required by TypeORMVectorStore
user: user, // Required by Pool in similaritySearchVectorWithScore
password: password,
database: this.getDatabase()
// schema: this.getSchema(),
database: this.getDatabase(),
extra: {
...(additionalConfiguration as any)?.extra,
// Force PostgreSQL to use your schema
options: `-c search_path=${schema},public`
}
} as DataSourceOptions

// Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app
Expand Down
4 changes: 3 additions & 1 deletion packages/components/nodes/vectorstores/Postgres/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export function getSSL(nodeData?: INodeData) {
export function getTableName(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents')
}

export function getSchema(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_VECTORSTORE_SCHEMA, 'public')
}
export function getContentColumnName(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent')
}
5 changes: 4 additions & 1 deletion packages/server/src/DataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ export const init = async (): Promise<void> => {
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
schema: process.env.DATABASE_SCHEMA || 'public', // set custom schema if provided
ssl: getDatabaseSSLFromEnv(),
synchronize: false,
migrationsRun: false,
entities: Object.values(entities),
migrations: postgresMigrations,
extra: {
idleTimeoutMillis: 120000
idleTimeoutMillis: 120000,
// set the search_path for the migrations and queries to the desired schema
options: `-c search_path=${process.env.DATABASE_SCHEMA},public`
Comment on lines +80 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential issue: Undefined schema in search_path.

If DATABASE_SCHEMA is not set, the template literal on line 82 will produce "undefined,public" instead of just "public". This could cause PostgreSQL to search for a schema named "undefined".

Apply this diff to fix the issue:

-                options: `-c search_path=${process.env.DATABASE_SCHEMA},public`
+                options: `-c search_path=${process.env.DATABASE_SCHEMA || 'public'},public`
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
idleTimeoutMillis: 120000,
// set the search_path for the migrations and queries to the desired schema
options: `-c search_path=${process.env.DATABASE_SCHEMA},public`
idleTimeoutMillis: 120000,
// set the search_path for the migrations and queries to the desired schema
options: `-c search_path=${process.env.DATABASE_SCHEMA || 'public'},public`
🤖 Prompt for AI Agents
In packages/server/src/DataSource.ts around lines 80 to 82, the options string
sets search_path using process.env.DATABASE_SCHEMA which can yield
"undefined,public" if the env var is missing; change the construction so it
falls back to only "public" or omits the custom schema when DATABASE_SCHEMA is
unset — e.g. compute a schema value = process.env.DATABASE_SCHEMA ?
`${process.env.DATABASE_SCHEMA},public` : 'public' and use that for options
(ensuring no literal "undefined" appears).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is valid find, please address

},
logging: ['error', 'warn', 'info', 'log'],
logger: 'advanced-console',
Expand Down
Loading