From c987bd3453f6660387946c116109175a0ba3231a Mon Sep 17 00:00:00 2001 From: Simon Herrmann Date: Tue, 23 Dec 2025 01:00:01 +0100 Subject: [PATCH 1/3] add function drops --- database/dbJob.go | 8 ++++++++ database/dbMaster.go | 8 ++++++++ database/dbWorker.go | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/database/dbJob.go b/database/dbJob.go index 32256d0..3f7c1c1 100644 --- a/database/dbJob.go +++ b/database/dbJob.go @@ -134,6 +134,14 @@ func (r JobDBHandler) DropTables() error { return helper.NewError("job_archive table", err) } + for _, functionName := range loadSql.JobFunctions { + dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) + _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) + if err != nil { + return helper.NewError("drop function "+functionName, err) + } + } + r.db.Logger.Info("Dropped table job") return nil diff --git a/database/dbMaster.go b/database/dbMaster.go index 8a62b2a..73fbf63 100644 --- a/database/dbMaster.go +++ b/database/dbMaster.go @@ -96,6 +96,14 @@ func (r MasterDBHandler) DropTable() error { return helper.NewError("drop", err) } + for _, functionName := range loadSql.MasterFunctions { + dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) + _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) + if err != nil { + return helper.NewError("drop function "+functionName, err) + } + } + r.db.Logger.Info("Dropped table master") return nil diff --git a/database/dbWorker.go b/database/dbWorker.go index aaab0f9..7fee342 100644 --- a/database/dbWorker.go +++ b/database/dbWorker.go @@ -110,6 +110,14 @@ func (r WorkerDBHandler) DropTable() error { return helper.NewError("worker table", err) } + for _, functionName := range loadSql.WorkerFunctions { + dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) + _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) + if err != nil { + return helper.NewError("drop function "+functionName, err) + } + } + r.db.Logger.Info("Dropped table worker") return nil From 336ebd2e55411d12cfc6cfae46298d7c716958a9 Mon Sep 17 00:00:00 2001 From: Simon Herrmann Date: Tue, 23 Dec 2025 01:28:21 +0100 Subject: [PATCH 2/3] fix function drop tests --- database/dbJob.go | 27 +++++++++++------------- database/dbMaster.go | 23 +++++++++------------ database/dbWorker.go | 25 ++++++++++------------ helper/database.go | 49 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 42 deletions(-) diff --git a/database/dbJob.go b/database/dbJob.go index 3f7c1c1..b3eed75 100644 --- a/database/dbJob.go +++ b/database/dbJob.go @@ -60,6 +60,18 @@ func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool, encrypti jobDbHandler.EncryptionKey = encryptionKey[0] } + if withTableDrop { + err := dbConnection.DropFunctionsFromPublicSchema(loadSql.JobFunctions) + if err != nil { + return nil, helper.NewError("drop job functions", err) + } + + err = jobDbHandler.DropTables() + if err != nil { + return nil, helper.NewError("drop tables", err) + } + } + err := loadSql.LoadJobSql(jobDbHandler.db.Instance, false) if err != nil { return nil, helper.NewError("load job sql", err) @@ -70,13 +82,6 @@ func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool, encrypti return nil, helper.NewError("load notify sql", err) } - if withTableDrop { - err := jobDbHandler.DropTables() - if err != nil { - return nil, helper.NewError("drop tables", err) - } - } - err = jobDbHandler.CreateTable() if err != nil { return nil, helper.NewError("create table", err) @@ -134,14 +139,6 @@ func (r JobDBHandler) DropTables() error { return helper.NewError("job_archive table", err) } - for _, functionName := range loadSql.JobFunctions { - dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) - _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) - if err != nil { - return helper.NewError("drop function "+functionName, err) - } - } - r.db.Logger.Info("Dropped table job") return nil diff --git a/database/dbMaster.go b/database/dbMaster.go index 73fbf63..3631ddd 100644 --- a/database/dbMaster.go +++ b/database/dbMaster.go @@ -37,18 +37,23 @@ func NewMasterDBHandler(dbConnection *helper.Database, withTableDrop bool) (*Mas db: dbConnection, } - err := loadSql.LoadMasterSql(masterDbHandler.db.Instance, false) - if err != nil { - return nil, helper.NewError("load master sql", err) - } - if withTableDrop { + err := dbConnection.DropFunctionsFromPublicSchema(loadSql.MasterFunctions) + if err != nil { + return nil, helper.NewError("drop master functions", err) + } + err = masterDbHandler.DropTable() if err != nil { return nil, helper.NewError("drop master table", err) } } + err := loadSql.LoadMasterSql(masterDbHandler.db.Instance, false) + if err != nil { + return nil, helper.NewError("load master sql", err) + } + err = masterDbHandler.CreateTable() if err != nil { return nil, helper.NewError("create master table", err) @@ -96,14 +101,6 @@ func (r MasterDBHandler) DropTable() error { return helper.NewError("drop", err) } - for _, functionName := range loadSql.MasterFunctions { - dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) - _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) - if err != nil { - return helper.NewError("drop function "+functionName, err) - } - } - r.db.Logger.Info("Dropped table master") return nil diff --git a/database/dbWorker.go b/database/dbWorker.go index 7fee342..ba55855 100644 --- a/database/dbWorker.go +++ b/database/dbWorker.go @@ -47,18 +47,23 @@ func NewWorkerDBHandler(dbConnection *helper.Database, withTableDrop bool) (*Wor db: dbConnection, } - err := loadSql.LoadWorkerSql(dbConnection.Instance, withTableDrop) - if err != nil { - return nil, helper.NewError("load worker sql", err) - } - if withTableDrop { - err := workerDbHandler.DropTable() + err := dbConnection.DropFunctionsFromPublicSchema(loadSql.WorkerFunctions) + if err != nil { + return nil, helper.NewError("drop worker functions", err) + } + + err = workerDbHandler.DropTable() if err != nil { return nil, helper.NewError("drop worker table", err) } } + err := loadSql.LoadWorkerSql(dbConnection.Instance, withTableDrop) + if err != nil { + return nil, helper.NewError("load worker sql", err) + } + err = workerDbHandler.CreateTable() if err != nil { return nil, helper.NewError("create worker table", err) @@ -110,14 +115,6 @@ func (r WorkerDBHandler) DropTable() error { return helper.NewError("worker table", err) } - for _, functionName := range loadSql.WorkerFunctions { - dropFunctionQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, functionName) - _, err = r.db.Instance.ExecContext(ctx, dropFunctionQuery) - if err != nil { - return helper.NewError("drop function "+functionName, err) - } - } - r.db.Logger.Info("Dropped table worker") return nil diff --git a/helper/database.go b/helper/database.go index 96575a6..06ef059 100644 --- a/helper/database.go +++ b/helper/database.go @@ -316,3 +316,52 @@ func (d *Database) Close() error { log.Printf("Disconnected from database: %v", d.Instance) return d.Instance.Close() } + +// DropFunctionsFromPublicSchema drops user-defined functions from the public schema. +// It queries pg_proc to find all overloaded versions of the specified functions, +// filters to only public schema functions, excludes extension-owned functions, +// and drops each by its full signature. This is SQL injection safe as it uses +// parameterized queries for function name lookup. +func (d *Database) DropFunctionsFromPublicSchema(functionNames []string) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for _, functionName := range functionNames { + // Query for all overloaded versions, filtering to public schema and excluding extensions + rows, queryErr := d.Instance.QueryContext(ctx, ` + SELECT pg_proc.oid::regprocedure::text + FROM pg_proc + JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid + WHERE pg_proc.proname = $1 + AND pg_namespace.nspname = 'public' + AND NOT EXISTS ( + SELECT 1 FROM pg_depend + WHERE objid = pg_proc.oid AND deptype = 'e' + ) + `, functionName) + if queryErr != nil { + return NewError("query function overloads "+functionName, queryErr) + } + defer rows.Close() + + var signatures []string + for rows.Next() { + var signature string + if scanErr := rows.Scan(&signature); scanErr != nil { + return NewError("scan function signature", scanErr) + } + signatures = append(signatures, signature) + } + + // Drop each overloaded function by its full signature + for _, signature := range signatures { + dropQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, signature) + _, err := d.Instance.ExecContext(ctx, dropQuery) + if err != nil { + return NewError("drop function "+signature, err) + } + } + } + + return nil +} From d1a3ce1c4dfd7f458c632f946516ad63775774b6 Mon Sep 17 00:00:00 2001 From: Simon Herrmann Date: Tue, 23 Dec 2025 02:45:00 +0100 Subject: [PATCH 3/3] make tests more robust --- queuerJob_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/queuerJob_test.go b/queuerJob_test.go index 0c5bf7a..ea2ab43 100644 --- a/queuerJob_test.go +++ b/queuerJob_test.go @@ -133,7 +133,7 @@ func TestAddJobRunning(t *testing.T) { t.Run("Successfully runs a job with schedule options once", func(t *testing.T) { options := &model.Options{ Schedule: &model.Schedule{ - Start: time.Now().Add(1 * time.Second), + Start: time.Now().Add(2 * time.Second), MaxCount: 1, Interval: 15 * time.Second, }, @@ -144,8 +144,8 @@ func TestAddJobRunning(t *testing.T) { queuedJob, err := testQueuer.GetJob(job.RID) require.NoError(t, err, "GetJob should not return an error") - require.NotNil(t, queuedJob, "GetJob should return the job that is currently running") - assert.Equal(t, model.JobStatusScheduled, queuedJob.Status, "Job should be in Running status") + require.NotNil(t, queuedJob, "GetJob should return the scheduled job") + assert.Equal(t, model.JobStatusScheduled, queuedJob.Status, "Job should be in Scheduled status") job = testQueuer.WaitForJobFinished(job.RID, 5*time.Second) assert.NotNil(t, job, "WaitForJobFinished should return the finished job")