feat: require pgmq 1.5.1+ and remove compatibility layer #301
base: chore-update-supabase-versions
Conversation
🦋 Changeset detected

Latest commit: 3af7bb9

The changes in this PR will be included in the next version bump. This PR includes changesets to release 8 packages.
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

How to use the Graphite Merge Queue: add either label to this PR to merge it via the merge queue. You must have a Graphite account in order to use the merge queue. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub, as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite.
| Command | Status | Duration | Result |
|---|---|---|---|
| `nx affected -t lint typecheck test --parallel -...` | ❌ Failed | 7m 53s | View ↗ |
| `nx affected -t test:e2e --parallel --base=c35fa...` | ✅ Succeeded | 5m 28s | View ↗ |

☁️ Nx Cloud last updated this comment at 2025-11-12 23:20:26 UTC
pkgs/core/supabase/migrations/20251102201302_pgflow_upgrade_pgmq_1_5_1.sql
```ts
Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] }
Returns: {
  enqueued_at: string
  headers: Json
  message: Json
  msg_id: number
  read_ct: number
  vt: string
}[]
```
The `headers` field in the return type is typed as `Json` but should be `Json | null`, since the underlying JSONB column in PostgreSQL can be NULL. This type mismatch could cause runtime errors when TypeScript code receives null headers from messages that were sent without headers.
```ts
set_vt_batch: {
  Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] }
  Returns: {
    enqueued_at: string
    headers: Json | null // Should allow null
    message: Json
    msg_id: number
    read_ct: number
    vt: string
  }[]
}
```

This aligns with how `headers` is typed elsewhere in the codebase (line 784) and matches the actual database behavior, where `headers` can be NULL for messages sent without headers.
Suggested change:

```diff
  Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] }
  Returns: {
    enqueued_at: string
-   headers: Json
+   headers: Json | null
    message: Json
    msg_id: number
    read_ct: number
    vt: string
  }[]
```
Spotted by Graphite Agent
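Since `headers` can be NULL at the database level, consumers should narrow before use. Below is a minimal TypeScript sketch of the nullable shape and a safe accessor; the `MessageRecord` type and `headerValue` helper are illustrative stand-ins, not pgflow's actual generated definitions:

```typescript
// Simplified stand-in for the generated Json type; illustrative only.
type Json = string | number | boolean | null | { [key: string]: Json } | Json[];

// Illustrative message record shape with a nullable headers column.
interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: string;
  vt: string;
  message: Json;
  headers: Json | null; // NULL when the message was sent without headers
}

// Safe accessor: returns undefined unless headers is a JSON object containing the key.
function headerValue(record: MessageRecord, key: string): Json | undefined {
  const h = record.headers;
  if (h === null || typeof h !== "object" || Array.isArray(h)) return undefined;
  return (h as { [key: string]: Json })[key];
}

const withoutHeaders: MessageRecord = {
  msg_id: 1, read_ct: 0, enqueued_at: "", vt: "", message: {}, headers: null,
};
console.log(headerValue(withoutHeaders, "trace-id")); // undefined
```

Typing the field as `Json | null` forces callers through this kind of narrowing, which is exactly what the suggested change above enables at compile time.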
```sql
-- Drop "set_vt_batch" function
DROP FUNCTION "pgflow"."set_vt_batch";
-- Create "set_vt_batch" function
CREATE FUNCTION "pgflow"."set_vt_batch" ("queue_name" text, "msg_ids" bigint[], "vt_offsets" integer[]) RETURNS TABLE ("msg_id" bigint, "read_ct" integer, "enqueued_at" timestamptz, "vt" timestamptz, "message" jsonb, "headers" jsonb) LANGUAGE plpgsql AS $$
DECLARE
  qtable TEXT := pgmq.format_table_name(queue_name, 'q');
  sql TEXT;
BEGIN
  /* ---------- safety checks ---------------------------------------------------- */
  IF msg_ids IS NULL OR vt_offsets IS NULL OR array_length(msg_ids, 1) = 0 THEN
    RETURN; -- nothing to do, return empty set
  END IF;

  IF array_length(msg_ids, 1) IS DISTINCT FROM array_length(vt_offsets, 1) THEN
    RAISE EXCEPTION
      'msg_ids length (%) must equal vt_offsets length (%)',
      array_length(msg_ids, 1), array_length(vt_offsets, 1);
  END IF;

  /* ---------- dynamic statement ------------------------------------------------ */
  /* One UPDATE joins with the unnested arrays */
  sql := format(
    $FMT$
    WITH input (msg_id, vt_offset) AS (
      SELECT unnest($1)::bigint
           , unnest($2)::int
    )
    UPDATE pgmq.%I q
    SET vt = clock_timestamp() + make_interval(secs => input.vt_offset),
        read_ct = read_ct -- no change, but keeps RETURNING list aligned
    FROM input
    WHERE q.msg_id = input.msg_id
    RETURNING q.msg_id,
              q.read_ct,
              q.enqueued_at,
              q.vt,
              q.message,
              q.headers
    $FMT$,
    qtable
  );

  RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets;
END;
$$;
-- Drop "read_with_poll" function
DROP FUNCTION "pgflow"."read_with_poll";
```
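The safety checks in the function above (empty input is a no-op; the two arrays must pair one-to-one) can also be mirrored client-side before issuing the RPC, failing fast instead of round-tripping to the database. A minimal TypeScript sketch, where the `SetVtBatchArgs` type and `validateSetVtBatchArgs` helper are hypothetical and not part of pgflow:

```typescript
// Hypothetical argument shape mirroring pgflow.set_vt_batch(queue_name, msg_ids, vt_offsets).
interface SetVtBatchArgs {
  queue_name: string;
  msg_ids: number[];
  vt_offsets: number[];
}

// Mirrors the server-side safety checks: empty input is a no-op,
// and msg_ids/vt_offsets must have equal length.
function validateSetVtBatchArgs(args: SetVtBatchArgs): "empty" | "ok" {
  if (args.msg_ids.length === 0) return "empty"; // server returns an empty set
  if (args.msg_ids.length !== args.vt_offsets.length) {
    throw new Error(
      `msg_ids length (${args.msg_ids.length}) must equal vt_offsets length (${args.vt_offsets.length})`
    );
  }
  return "ok";
}

console.log(validateSetVtBatchArgs({ queue_name: "q", msg_ids: [1, 2], vt_offsets: [30, 60] })); // ok
```

This does not replace the server-side checks (which guard against any caller), but it turns a guaranteed database error into an immediate client-side one.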
Critical: Missing pgmq version compatibility check
The core migration is missing the compatibility check that exists in the playground migration. Without this check, the migration will succeed on pgmq < 1.5.0 but fail at runtime when set_vt_batch tries to return the headers column that doesn't exist in older pgmq versions.
The playground migration (lines 1-32) includes this check:
```sql
DO $$
DECLARE
  has_headers BOOLEAN;
BEGIN
  SELECT EXISTS (
    SELECT 1 FROM pg_type t
    JOIN pg_namespace n ON t.typnamespace = n.oid
    JOIN pg_attribute a ON a.attrelid = t.typrelid
    WHERE n.nspname = 'pgmq'
      AND t.typname = 'message_record'
      AND a.attname = 'headers'
      AND a.attnum > 0
      AND NOT a.attisdropped
  ) INTO has_headers;

  IF NOT has_headers THEN
    RAISE EXCEPTION 'INCOMPATIBLE PGMQ VERSION DETECTED...';
  END IF;
END $$;
```

This check must be added at the beginning of the core migration to prevent silent failures in production.
Suggested change (adds the compatibility check at the top of the migration):

```sql
-- Check for pgmq version compatibility
DO $$
DECLARE
  has_headers BOOLEAN;
BEGIN
  SELECT EXISTS (
    SELECT 1 FROM pg_type t
    JOIN pg_namespace n ON t.typnamespace = n.oid
    JOIN pg_attribute a ON a.attrelid = t.typrelid
    WHERE n.nspname = 'pgmq'
      AND t.typname = 'message_record'
      AND a.attname = 'headers'
      AND a.attnum > 0
      AND NOT a.attisdropped
  ) INTO has_headers;
  IF NOT has_headers THEN
    RAISE EXCEPTION 'INCOMPATIBLE PGMQ VERSION DETECTED: This migration requires pgmq 1.5.0 or later with headers support';
  END IF;
END $$;

-- Drop "set_vt_batch" function
DROP FUNCTION "pgflow"."set_vt_batch";
-- Create "set_vt_batch" function
CREATE FUNCTION "pgflow"."set_vt_batch" ("queue_name" text, "msg_ids" bigint[], "vt_offsets" integer[]) RETURNS TABLE ("msg_id" bigint, "read_ct" integer, "enqueued_at" timestamptz, "vt" timestamptz, "message" jsonb, "headers" jsonb) LANGUAGE plpgsql AS $$
DECLARE
  qtable TEXT := pgmq.format_table_name(queue_name, 'q');
  sql TEXT;
BEGIN
  /* ---------- safety checks ---------------------------------------------------- */
  IF msg_ids IS NULL OR vt_offsets IS NULL OR array_length(msg_ids, 1) = 0 THEN
    RETURN; -- nothing to do, return empty set
  END IF;
  IF array_length(msg_ids, 1) IS DISTINCT FROM array_length(vt_offsets, 1) THEN
    RAISE EXCEPTION
      'msg_ids length (%) must equal vt_offsets length (%)',
      array_length(msg_ids, 1), array_length(vt_offsets, 1);
  END IF;
  /* ---------- dynamic statement ------------------------------------------------ */
  /* One UPDATE joins with the unnested arrays */
  sql := format(
    $FMT$
    WITH input (msg_id, vt_offset) AS (
      SELECT unnest($1)::bigint
           , unnest($2)::int
    )
    UPDATE pgmq.%I q
    SET vt = clock_timestamp() + make_interval(secs => input.vt_offset),
        read_ct = read_ct -- no change, but keeps RETURNING list aligned
    FROM input
    WHERE q.msg_id = input.msg_id
    RETURNING q.msg_id,
              q.read_ct,
              q.enqueued_at,
              q.vt,
              q.message,
              q.headers
    $FMT$,
    qtable
  );
  RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets;
END;
$$;
-- Drop "read_with_poll" function
DROP FUNCTION "pgflow"."read_with_poll";
```
Spotted by Graphite Agent
🔍 Preview Deployment: Website

✅ Deployment successful!
🔗 Preview URL: https://pr-301.pgflow.pages.dev
…test references

- Deleted the create_realtime_partition SQL function and related comments
- Updated test scripts to no longer call create_realtime_partition
- Replaced pgflow.read_with_poll with pgmq.read_with_poll in client code
- Ensured tests focus on existing functionality without partition creation
- Minor adjustments to test setup for consistency and clarity

Summary

This PR removes the pgmq 1.4.x compatibility layer and requires pgmq 1.5.1 or higher. It eliminates deprecated functions, removes backported SQL code, and leverages new pgmq features, including message headers support.

Changes

PGMQ Version Requirement

- `@pgflow/core`, `@pgflow/edge-worker`

Removed Compatibility Layer

Backported Functions Removed

- `pgflow.read_with_poll` (68 lines) - Removed backport; now uses native `pgmq.read_with_poll` directly
- `pgflow.create_realtime_partition` - Removed deprecated partitioning function
- `supabase/tests/realtime/create_realtime_partition.test.sql` (83 lines)

Function Updates

- `pgflow.set_vt_batch` - Updated to return the `headers` column from pgmq 1.5.0+; changed from `SETOF` to `TABLE` with explicit column definitions: `msg_id`, `read_ct`, `enqueued_at`, `vt`, `message`, `headers`

Migration
File: `pkgs/core/supabase/migrations/20251102201302_pgflow_upgrade_pgmq_1_5_1.sql`

The migration includes:

- A compatibility check that `pgmq.message_record` has a `headers` column
- Recreation of `set_vt_batch` with the new signature

Code Updates

TypeScript

- `PgflowSqlClient.ts` - Changed queue polling to use `pgmq.read_with_poll` instead of `pgflow.read_with_poll`
- `Queue.ts` - Updated queue reading logic
- Added `headers` field to message record types

SQL Tests

Updated 11 test files to remove calls to deprecated functions:

- Removed `pgflow.create_realtime_partition()` calls from all realtime tests
- Added `supabase/tests/set_vt_batch/headers_handling.test.sql`

Documentation
Migration Management Skill

- Updated `.claude/skills/migration-management/SKILL.md` with additional troubleshooting guidance

Migration Guide

For Supabase Users

Recent Supabase versions include pgmq 1.5.0+ by default. Simply upgrade pgflow:

```shell
# Upgrade will apply the migration automatically
pnpm nx migrate @pgflow/core
```

The migration will verify pgmq compatibility and fail with a clear message if your pgmq version is too old.

For Self-Hosted Users

Before upgrading pgflow:

- If `extversion` < 1.5.0, upgrade pgmq first:
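Since the release requires pgmq 1.5.1+, self-hosted users may want to compare the installed `extversion` string against the minimum before upgrading. A minimal TypeScript sketch of such a comparison; the `isPgmqCompatible` helper is hypothetical and not part of pgflow:

```typescript
// Compare dotted version strings numerically, segment by segment.
// Returns -1, 0, or 1 as a is less than, equal to, or greater than b.
function compareVersions(a: string, b: string): number {
  const pa = a.split(".").map(Number);
  const pb = b.split(".").map(Number);
  const len = Math.max(pa.length, pb.length);
  for (let i = 0; i < len; i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0); // missing segments count as 0
    if (diff !== 0) return Math.sign(diff);
  }
  return 0;
}

// Hypothetical helper: true when the installed pgmq extversion meets the 1.5.1 minimum.
function isPgmqCompatible(extversion: string, minimum = "1.5.1"): boolean {
  return compareVersions(extversion, minimum) >= 0;
}

console.log(isPgmqCompatible("1.4.4")); // false: still on the old compatibility-layer era
console.log(isPgmqCompatible("1.5.1")); // true
```

Note that a plain string comparison would mis-order versions like "1.10.0" vs "1.5.0", which is why the segments are compared numerically.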
Verified Fixes
This PR verifies that upstream issues in Supabase Realtime have been resolved:
realtime.send()after database resets has been fixed in recent Supabase versionscreate_realtime_partitionworkaround function that manually created partitions before testspgflow.create_realtime_partitionand its test file confirms the upstream fix is stable and working as expectedTesting
Dependencies
Related Files
Changed:
Key files:
pkgs/core/supabase/migrations/20251102201302_pgflow_upgrade_pgmq_1_5_1.sqlpkgs/core/schemas/0110_function_set_vt_batch.sqlpkgs/core/src/PgflowSqlClient.tspkgs/edge-worker/src/queue/Queue.ts