From df1e99b7cb2c3acf02af5db4cd4c05829cad38ca Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 31 Mar 2023 11:01:51 +1300 Subject: [PATCH 1/5] Parallel Hash Full Join. Full and right outer joins were not supported in the initial implementation of Parallel Hash Join because of deadlock hazards (see discussion). Therefore FULL JOIN inhibited parallelism, as the other join strategies can't do that in parallel either. Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on the inner side of one batch's hash table. For now, sidestep the deadlock problem by terminating parallelism there. The last process to arrive at that phase emits the unmatched tuples, while others detach and are free to go and work on other batches, if there are any, but otherwise they finish the join early. That unfairness is considered acceptable for now, because it's better than no parallelism at all. The build and probe phases are run in parallel, and the new scan-for-unmatched phase, while serial, is usually applied to the smaller of the two relations and is either limited by some multiple of work_mem, or it's too big and is partitioned into batches and then the situation is improved by batch-level parallelism. 
Author: Melanie Plageman Author: Thomas Munro Reviewed-by: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com --- src/backend/executor/nodeHash.c | 179 +++++++++++++++++++++++- src/backend/executor/nodeHashjoin.c | 102 +++++++++----- src/backend/optimizer/path/joinpath.c | 14 +- src/include/executor/hashjoin.h | 6 +- src/include/executor/nodeHash.h | 3 + src/test/regress/expected/join_hash.out | 65 ++++++++- src/test/regress/sql/join_hash.sql | 27 +++- 7 files changed, 339 insertions(+), 57 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 62d3c2da790..39db08164e0 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2388,6 +2388,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) hjstate->hj_CurTuple = NULL; } +/* + * Decide if this process is allowed to run the unmatched scan. If so, the + * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned. + * Otherwise the batch is detached and false is returned. + */ +bool +ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE); + + /* + * It would not be deadlock-free to wait on the batch barrier, because it + * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have + * already emitted tuples. Therefore, we'll hold a wait-free election: + * only one process can continue to the next phase, and all others detach + * from this batch. They can still go any work on other batches, if there + * are any. + */ + if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier)) + { + /* This process considers the batch to be done. 
*/ + hashtable->batches[hashtable->curbatch].done = true; + + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* + * Track largest batch we've seen, which would normally happen in + * ExecHashTableDetachBatch(). + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + hashtable->curbatch = -1; + return false; + } + + /* Now we are alone with this batch. */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); + Assert(BarrierParticipants(&batch->batch_barrier) == 1); + + /* + * Has another process decided to give up early and command all processes + * to skip the unmatched scan? + */ + if (batch->skip_unmatched) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + return false; + } + + /* Now prepare the process local state, just as for non-parallel join. */ + ExecPrepHashTableForUnmatched(hjstate); + + return true; +} + /* * ExecScanHashTableForUnmatched * scan the hash table for unmatched inner tuples @@ -2462,6 +2525,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return false; } +/* + * ExecParallelScanHashTableForUnmatched + * scan the hash table for unmatched inner tuples, in parallel join + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + + for (;;) + { + /* + * hj_CurTuple is the address of the tuple last returned from the + * current bucket, or NULL if it's time to start scanning a new + * bucket. 
+ */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo++); + else + break; /* finished all buckets */ + + while (hashTuple != NULL) + { + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. + */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + + /* + * no more unmatched tuples + */ + return false; +} + /* * ExecHashTableReset * @@ -3793,6 +3922,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) accessor->shared = shared; accessor->preallocated = 0; accessor->done = false; + accessor->outer_eof = false; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), hashtable->hjstate->worker_id, @@ -3838,25 +3968,62 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) { int curbatch = hashtable->curbatch; ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + bool attached = true; /* Make sure any temporary files are closed. */ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); - /* Detach from the batch we were last working on. */ + /* After attaching we always get at least to PHJ_BATCH_PROBE. 
*/ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE || + BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); + + /* + * If we're abandoning the PHJ_BATCH_PROBE phase early without having + * reached the end of it, it means the plan doesn't want any more + * tuples, and it is happy to abandon any tuples buffered in this + * process's subplans. For correctness, we can't allow any process to + * execute the PHJ_BATCH_SCAN phase, because we will never have the + * complete set of match bits. Therefore we skip emitting unmatched + * tuples in all backends (if this is a full/right join), as if those + * tuples were all due to be emitted by this process and it has + * abandoned them too. + */ /* * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware) * If phs_lasj_has_null is true, that means we have found null when building hash table, * there were no batches to detach. */ - if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier)) + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + !hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */ + !hashtable->batches[curbatch].outer_eof) + { + /* + * This flag may be written to by multiple backends during + * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN + * phase so requires no extra locking. + */ + batch->skip_unmatched = true; + } + + /* + * Even if we aren't doing a full/right outer join, we'll step through + * the PHJ_BATCH_SCAN phase just to maintain the invariant that + * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free. 
+ */ + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */) + attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); + if (attached && BarrierArriveAndDetach(&batch->batch_barrier)) { /* - * Technically we shouldn't access the barrier because we're no - * longer attached, but since there is no way it's moving after - * this point it seems safe to make the following assertion. + * We are no longer attached to the batch barrier, but we're the + * process that was chosen to free resources and it's safe to + * assert the current phase. The ParallelHashJoinBatch can't go + * away underneath us while we are attached to the build barrier, + * making this access safe. */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE); /* Free shared chunks and buckets. */ while (DsaPointerIsValid(batch->chunks)) diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9ec70f16e31..7f58cafb75d 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -80,11 +80,12 @@ * aren't enough to go around. 
For each batch there is a separate barrier * with the following phases: * - * PHJ_BATCH_ELECTING -- initial state - * PHJ_BATCH_ALLOCATING -- one allocates buckets - * PHJ_BATCH_LOADING -- all load the hash table from disk - * PHJ_BATCH_PROBING -- all probe - * PHJ_BATCH_DONE -- end + * PHJ_BATCH_ELECT -- initial state + * PHJ_BATCH_ALLOCATE* -- one allocates buckets + * PHJ_BATCH_LOAD -- all load the hash table from disk + * PHJ_BATCH_PROBE -- all probe + * PHJ_BATCH_SCAN* -- one does full/right unmatched scan + * PHJ_BATCH_FREE* -- one frees memory * * Batch 0 is a special case, because it starts out in phase * PHJ_BATCH_PROBING; populating batch 0's hash table is done during @@ -97,11 +98,17 @@ * * To avoid deadlocks, we never wait for any barrier unless it is known that * all other backends attached to it are actively executing the node or have - * already arrived. Practically, that means that we never return a tuple - * while attached to a barrier, unless the barrier has reached its final - * state. In the slightly special case of the per-batch barrier, we return - * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use - * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * finished. Practically, that means that we never emit a tuple while attached + * to a barrier, unless the barrier has reached a phase that means that no + * process will wait on it again. We emit tuples while attached to the build + * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase + * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN + * respectively without waiting, using BarrierArriveAndDetach() and + * BarrierArriveAndDetachExceptLast() respectively. The last to detach + * receives a different return value so that it knows that it's safe to + * clean up. 
Any straggler process that attaches after that phase is reached + will see that it's too late to participate or access the relevant shared + memory objects. + * *------------------------------------------------------------------------- */ @@ -493,8 +500,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (HJ_FILL_INNER(node)) { /* set up to scan for unmatched inner tuples */ - ExecPrepHashTableForUnmatched(node); - node->hj_JoinState = HJ_FILL_INNER_TUPLES; + if (parallel) + { + /* + * Only one process is currently allowed to handle + * each batch's unmatched tuples, in a parallel + * join. + */ + if (ExecParallelPrepHashTableForUnmatched(node)) + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + else + node->hj_JoinState = HJ_NEED_NEW_BATCH; + } + else + { + ExecPrepHashTableForUnmatched(node); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + } } else node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -605,25 +627,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) { node->hj_MatchedOuter = true; - if (parallel) - { - /* - * Full/right outer joins are currently not supported - * for parallel joins, so we don't need to set the - * match bit. Experiments show that it's worth - * avoiding the shared memory traffic on large - * systems. - */ - Assert(!HJ_FILL_INNER(node)); - } - else - { - /* - * This is really only needed if HJ_FILL_INNER(node), - * but we'll avoid the branch and just set it always. - */ + + /* + * This is really only needed if HJ_FILL_INNER(node), but + * we'll avoid the branch and just set it always. + */ + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); - } /* In an antijoin, we never return a matched tuple */ if (node->js.jointype == JOIN_ANTI || @@ -682,7 +692,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * so any unmatched inner tuples in the hashtable have to be * emitted before we continue to the next batch. 
*/ - if (!ExecScanHashTableForUnmatched(node, econtext)) + if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) + : ExecScanHashTableForUnmatched(node, econtext))) { /* no more unmatched tuples */ node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -1241,6 +1252,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, } /* End of this batch */ + hashtable->batches[curbatch].outer_eof = true; + return NULL; } @@ -1521,15 +1534,34 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * hash table stays alive until everyone's finished * probing it, but no participant is allowed to wait at * this barrier again (or else a deadlock could occur). - * All attached participants must eventually call - * BarrierArriveAndDetach() so that the final phase - * PHJ_BATCH_DONE can be reached. + * All attached participants must eventually detach from + * the barrier and one worker must advance the phase so + * that the final phase is reached. */ ExecParallelHashTableSetCurrentBatch(hashtable, batchno); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + case PHJ_BATCH_SCAN: + + /* + * In principle, we could help scan for unmatched tuples, + * since that phase is already underway (the thing we + * can't do under current deadlock-avoidance rules is wait + * for others to arrive at PHJ_BATCH_SCAN, because + * PHJ_BATCH_PROBE emits tuples, but in this case we just + * got here without waiting). That is not yet done. For + * now, we just detach and go around again. We have to + * use ExecHashTableDetachBatch() because there's a small + * chance we'll be the last to detach, and then we're + * responsible for freeing memory. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + hashtable->batches[batchno].done = true; + ExecHashTableDetachBatch(hashtable); + break; - case PHJ_BATCH_DONE: + case PHJ_BATCH_FREE: /* * Already done. 
Detach and go around again (if any diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index ec50c66104a..bff31340128 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -2315,15 +2315,9 @@ hash_inner_and_outer(PlannerInfo *root, * able to properly guarantee uniqueness. Similarly, we can't handle * JOIN_FULL and JOIN_RIGHT, because they can produce false null * extended rows. Also, the resulting path must not be parameterized. - * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel - * Hash, since in that case we're back to a single hash table with a - * single set of match bits for each batch, but that will require - * figuring out a deadlock-free way to wait for the probe to finish. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { @@ -2360,9 +2354,13 @@ hash_inner_and_outer(PlannerInfo *root, * total inner path will also be parallel-safe, but if not, we'll * have to search for the cheapest safe, unparameterized inner * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative - * inner path. + * inner path. If full or right join, we can't use parallelism + * (building the hash table in each backend) because no one + * process has all the match bits. 
*/ - if (cheapest_total_inner->parallel_safe) + if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT) + cheapest_safe_inner = NULL; + else if (cheapest_total_inner->parallel_safe) cheapest_safe_inner = cheapest_total_inner; else if (save_jointype != JOIN_UNIQUE_INNER) cheapest_safe_inner = diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index b1fbaacf5e9..b240d0ae555 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -195,6 +195,7 @@ typedef struct ParallelHashJoinBatch size_t ntuples; /* number of tuples loaded */ size_t old_ntuples; /* number of tuples before repartitioning */ bool space_exhausted; + bool skip_unmatched; /* whether to abandon unmatched scan */ /* * Variable-sized SharedTuplestore objects follow this struct in memory. @@ -239,7 +240,7 @@ typedef struct ParallelHashJoinBatchAccessor size_t estimated_size; /* size of partition on disk */ size_t old_ntuples; /* how many tuples before repartitioning? */ bool at_least_one_chunk; /* has this backend allocated a chunk? */ - + bool outer_eof; /* has this process hit end of batch? */ bool done; /* flag to remember that a batch is done */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; @@ -305,7 +306,8 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_LOADING 2 #define PHJ_BATCH_PROBING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_SCAN 4 +#define PHJ_BATCH_FREE 5 /* The phases of batch growth while hashing, for grow_batches_barrier. 
*/ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 993de4519b5..36549376ef9 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -64,9 +64,12 @@ extern bool ExecScanHashBucket(HashState *hashState, HashJoinState *hjstate, extern bool ExecParallelScanHashBucket(HashState *hashState, HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); +extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); extern void ExecHashTableReset(HashState *hashState, HashJoinTable hashtable); +extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, uint64 operatorMemKB, diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 5171a7d9cf3..250704efbd7 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -315,6 +315,13 @@ $$); t | f (1 row) +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); + count +------- + 20000 +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least a @@ -816,8 +823,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full 
outer join simple s using (id); @@ -841,7 +849,32 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- An full outer join where every record is not matched. +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 20000 +(1 row) + +rollback to settings; +-- A full outer join where every record is not matched. -- non-parallel savepoint settings; set local max_parallel_workers_per_gather = 0; @@ -869,8 +902,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); @@ -895,6 +929,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 120000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> 
Parallel Hash Full Join + Hash Cond: ((0 - s.id) = r.id) + -> Parallel Seq Scan on simple s + -> Parallel Hash + -> Parallel Seq Scan on simple r +(9 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +------- + 40000 +(1 row) + rollback to settings; -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 325068e9d23..01961d1ce6e 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -191,6 +191,8 @@ select original > 1 as initially_multibatch, final > original as increased_batch $$ select count(*) from simple r join simple s using (id); $$); +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); rollback to settings; -- The "bad" case: during execution we need to increase number of @@ -438,15 +440,24 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); select count(*) from simple r full outer join simple s using (id); rollback to settings; --- An full outer join where every record is not matched. +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); +select count(*) from simple r full outer join simple s using (id); +rollback to settings; + +-- A full outer join where every record is not matched. 
-- non-parallel savepoint settings; @@ -456,14 +467,24 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +rollback to settings; + + -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into -- the hash table) From 41b22c20cc347511cea95e879725081f16c189fd Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 14 Apr 2023 10:52:58 +1200 Subject: [PATCH 2/5] Fix PHJ match bit initialization. Hash join tuples reuse the HOT status bit to indicate match status during hash join execution. Correct reuse requires clearing the bit in all tuples. Serial hash join and parallel multi-batch hash join do so upon inserting the tuple into the hashtable. Single batch parallel hash join and batch 0 of unexpected multi-batch hash joins forgot to do this. It hadn't come up before because hashtable tuple match bits are only used for right and full outer joins and parallel ROJ and FOJ were unsupported. 11c2d6fdf5 introduced support for parallel ROJ/FOJ but neglected to ensure the match bits were reset. 
Author: Melanie Plageman Reported-by: Richard Guo Discussion: https://postgr.es/m/flat/CAMbWs48Nde1Mv%3DBJv6_vXmRKHMuHZm2Q_g4F6Z3_pn%2B3EV6BGQ%40mail.gmail.com --- src/backend/executor/nodeHash.c | 1 + src/test/regress/expected/join_hash.out | 37 +++++++++++++++++++++++++ src/test/regress/sql/join_hash.sql | 27 ++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 39db08164e0..1a305de21e6 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2004,6 +2004,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable, /* Store the hash value in the HashJoinTuple header. */ hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 250704efbd7..b1f780ff7b8 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -1031,6 +1031,43 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b (7 rows) rollback to settings; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +SAVEPOINT settings; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +CREATE TABLE hjtest_matchbits_t2(id int); +INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. 
If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. +UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +ROLLBACK TO settings; rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 01961d1ce6e..0858e14040f 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -539,6 +539,33 @@ rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +SAVEPOINT settings; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +CREATE TABLE hjtest_matchbits_t2(id int); +INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. 
+UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; +ROLLBACK TO settings; + +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's From f923ea16a0d04744c6a5a64c75ecc7b7d301de08 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Mon, 12 Jun 2023 12:19:46 +0900 Subject: [PATCH 3/5] Fix instability in regression test for Parallel Hash Full Join As reported by buildfarm member conchuela, one of the regression tests added by 558c9d7 is having some ordering issues. This commit adds an ORDER BY clause to make the output more stable for the problematic query. Fix suggested by Tom Lane. The plan of the query updated still uses a parallel hash full join. Author: Melanie Plageman Discussion: https://postgr.es/m/623596.1684541098@sss.pgh.pa.us --- src/test/regress/expected/join_hash.out | 3 ++- src/test/regress/sql/join_hash.sql | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index b1f780ff7b8..28f4558ec04 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -1047,7 +1047,8 @@ INSERT INTO hjtest_matchbits_t2 VALUES (2); -- Update should create a HOT tuple. If this status bit isn't cleared, we won't -- correctly emit the NULL-extended unmatching tuple in full hash join. 
UPDATE hjtest_matchbits_t2 set id = 2; -SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; id | id ----+---- 1 | diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 0858e14040f..0115489a6b9 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -555,7 +555,8 @@ INSERT INTO hjtest_matchbits_t2 VALUES (2); -- Update should create a HOT tuple. If this status bit isn't cleared, we won't -- correctly emit the NULL-extended unmatching tuple in full hash join. UPDATE hjtest_matchbits_t2 set id = 2; -SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; -- Test serial full hash join. -- Resetting parallel_setup_cost should force a serial plan. -- Just to be safe, however, set enable_parallel_hash to off, as parallel full From 83266a62a275885b514182572156712977e2415d Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Tue, 24 Mar 2026 13:00:49 +0800 Subject: [PATCH 4/5] Parallel Hash Full Join and Right Join PostgreSQL originally excluded FULL and RIGHT outer joins from parallel hash join because of deadlock hazards in the per-batch barrier protocol. PG 16 resolved this by introducing a dedicated PHJ_BATCH_SCAN phase: one elected worker emits unmatched inner-side rows after probing, while the others detach and move on. In CBDB, distributed execution adds a second dimension: after a full outer join the unmatched NULL-filled rows may come from any segment, so the result carries a HashedOJ locus rather than a plain Hashed locus. This change teaches the parallel planner about that: - FULL JOIN and RIGHT JOIN are now valid parallel join types in the distributed planner.
Previously they were unconditionally rejected, forcing serial execution across all segments. - The HashedOJ locus produced by a parallel full join now carries parallel_workers, so operators above the join (aggregates, further joins) can remain parallel. - A crash that could occur when a parallel LASJ_NOTIN (NOT IN) join encountered NULL inner keys is fixed. The worker would exit early but the batch barrier, which was never attached to, would be touched on shutdown causing an assertion failure. Example plans (3 segments, parallel_workers=2): -- FULL JOIN: result locus is HashedOJ with Parallel Workers: 2 EXPLAIN(costs off, locus) SELECT count(*) FROM t1 FULL JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedOJ Parallel Workers: 2 -> Parallel Hash Full Join Locus: HashedOJ Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers -- RIGHT JOIN: when t1 is larger the planner hashes the smaller t2 -- and probes with t1; result locus HashedWorkers EXPLAIN(costs off, locus) SELECT count(*) FROM t1 RIGHT JOIN t2 USING (id); Finalize Aggregate Locus: Entry -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate Locus: HashedWorkers Parallel Workers: 2 -> Parallel Hash Right Join Locus: HashedWorkers Parallel Workers: 2 Hash Cond: (t1.id = t2.id) -> Parallel Seq Scan on t1 Locus: HashedWorkers -> Parallel Hash -> Parallel Seq Scan on t2 Locus: HashedWorkers Performance (3 segments x 2 parallel workers, 6M rows each, 50% overlap): FULL JOIN parallel: 4040 ms serial: 6347 ms speedup: 1.57x RIGHT JOIN parallel: 3039 ms serial: 5568 ms speedup: 1.83x --- src/backend/cdb/cdbpath.c | 5 +++-- src/backend/cdb/cdbpathlocus.c | 28 +++++++++++++++++++++------- src/backend/executor/nodeHash.c | 19 ++++++++++--------- src/backend/executor/nodeHashjoin.c | 6 +++--- src/include/cdb/cdbpathlocus.h | 4 
++-- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 9e3697a3b03..e9d7dac9895 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -3112,8 +3112,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, case JOIN_UNIQUE_INNER: case JOIN_RIGHT: case JOIN_FULL: - /* Join types are not supported in parallel yet. */ - goto fail; + outer.ok_to_replicate = false; + inner.ok_to_replicate = false; + break; case JOIN_DEDUP_SEMI: if (!enable_parallel_dedup_semi_join) goto fail; diff --git a/src/backend/cdb/cdbpathlocus.c b/src/backend/cdb/cdbpathlocus.c index 29930085429..dddae1aa64c 100644 --- a/src/backend/cdb/cdbpathlocus.c +++ b/src/backend/cdb/cdbpathlocus.c @@ -119,6 +119,11 @@ cdbpathlocus_equal(CdbPathLocus a, CdbPathLocus b) list_length(a.distkey) != list_length(b.distkey)) return false; + /* + * CBDB_PARALLEL: What if both a and b are HashedOJ with parallel workers > 0 ? + * Are they equal in practice? 
+ */ + if ((CdbPathLocus_IsHashed(a) || CdbPathLocus_IsHashedOJ(a)) && (CdbPathLocus_IsHashed(b) || CdbPathLocus_IsHashedOJ(b))) return cdbpath_distkey_equal(a.distkey, b.distkey); @@ -544,7 +549,7 @@ cdbpathlocus_from_subquery(struct PlannerInfo *root, else { Assert(CdbPathLocus_IsHashedOJ(subpath->locus)); - CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments, subpath->locus.parallel_workers); } } else @@ -711,7 +716,7 @@ cdbpathlocus_pull_above_projection(struct PlannerInfo *root, CdbPathLocus_MakeHashedWorkers(&newlocus, newdistkeys, numsegments, locus.parallel_workers); } else - CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments, locus.parallel_workers); return newlocus; } else @@ -880,7 +885,7 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b) newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, 0 /* Both are 0 parallel here*/); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; @@ -1236,8 +1241,14 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo Assert(cdbpathlocus_is_valid(a)); Assert(cdbpathlocus_is_valid(b)); - /* Do both input rels have same locus? */ - if (cdbpathlocus_equal(a, b)) + /* + * Do both input rels have the same locus? + * CBDB_PARALLEL: for FULL JOIN, the result locus can differ even if + * both inputs have the same locus, because the NULL-extended rows + * could end up on any segment after the join. + */ + + if (jointype != JOIN_FULL && cdbpathlocus_equal(a, b)) return a; /* @@ -1412,8 +1423,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo * If inner is hashed workers, and outer is hashed. Join locus will be hashed. * If outer is hashed workers, and inner is hashed. Join locus will be hashed workers.
* Seems we should just return outer locus anyway. + * Things changed since we have parallel full join now. */ - if (parallel_aware) + if (parallel_aware && jointype != JOIN_FULL) return a; numsegments = CdbPathLocus_NumSegments(a); @@ -1469,7 +1481,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + Assert(CdbPathLocus_NumParallelWorkers(a) == CdbPathLocus_NumParallelWorkers(b)); + + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, CdbPathLocus_NumParallelWorkers(a)); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 1a305de21e6..c084f7e7c78 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2401,11 +2401,11 @@ ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) int curbatch = hashtable->curbatch; ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE); + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING); /* * It would not be deadlock-free to wait on the batch barrier, because it - * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have + * is in PHJ_BATCH_PROBING phase, and thus processes attached to it have * already emitted tuples. Therefore, we'll hold a wait-free election: * only one process can continue to the next phase, and all others detach * from this batch. They can still go any work on other batches, if there @@ -3975,12 +3975,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); - /* After attaching we always get at least to PHJ_BATCH_PROBE. 
*/ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE || + /* After attaching we always get at least to PHJ_BATCH_PROBING. */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING || BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); /* - * If we're abandoning the PHJ_BATCH_PROBE phase early without having + * If we're abandoning the PHJ_BATCH_PROBING phase early without having * reached the end of it, it means the plan doesn't want any more * tuples, and it is happy to abandon any tuples buffered in this * process's subplans. For correctness, we can't allow any process to @@ -3995,13 +3995,13 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) * If phs_lasj_has_null is true, that means we have found null when building hash table, * there were no batches to detach. */ - if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && !hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */ !hashtable->batches[curbatch].outer_eof) { /* * This flag may be written to by multiple backends during - * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN + * PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN * phase so requires no extra locking. */ batch->skip_unmatched = true; @@ -4012,10 +4012,11 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) * the PHJ_BATCH_SCAN phase just to maintain the invariant that * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free. 
*/ - if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */) attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); - if (attached && BarrierArriveAndDetach(&batch->batch_barrier)) + if (attached && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */ && + BarrierArriveAndDetach(&batch->batch_barrier)) { /* * We are not longer attached to the batch barrier, but we're the diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 7f58cafb75d..a28e6a14cdb 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -83,7 +83,7 @@ * PHJ_BATCH_ELECT -- initial state * PHJ_BATCH_ALLOCATE* -- one allocates buckets * PHJ_BATCH_LOAD -- all load the hash table from disk - * PHJ_BATCH_PROBE -- all probe + * PHJ_BATCH_PROBING -- all probe * PHJ_BATCH_SCAN* -- one does full/right unmatched scan * PHJ_BATCH_FREE* -- one frees memory * @@ -102,7 +102,7 @@ * to a barrier, unless the barrier has reached a phase that means that no * process will wait on it again. We emit tuples while attached to the build * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase - * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN + * PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN * respectively without waiting, using BarrierArriveAndDetach() and * BarrierArriveAndDetachExceptLast() respectively. 
The last to detach * receives a different return value so that it knows that it's safe to @@ -1549,7 +1549,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * since that phase is already underway (the thing we * can't do under current deadlock-avoidance rules is wait * for others to arrive at PHJ_BATCH_SCAN, because - * PHJ_BATCH_PROBE emits tuples, but in this case we just + * PHJ_BATCH_PROBING emits tuples, but in this case we just * got here without waiting). That is not yet done. For * now, we just detach and go around again. We have to * use ExecHashTableDetachBatch() because there's a small diff --git a/src/include/cdb/cdbpathlocus.h b/src/include/cdb/cdbpathlocus.h index 0f71ba55dfb..9f5a8227e68 100644 --- a/src/include/cdb/cdbpathlocus.h +++ b/src/include/cdb/cdbpathlocus.h @@ -292,13 +292,13 @@ typedef struct CdbPathLocus _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) -#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_) \ +#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_, parallel_workers_) \ do { \ CdbPathLocus *_locus = (plocus); \ _locus->locustype = CdbLocusType_HashedOJ; \ _locus->numsegments = (numsegments_); \ _locus->distkey = (distkey_); \ - _locus->parallel_workers = 0; \ + _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) #define CdbPathLocus_MakeHashedWorkers(plocus, distkey_, numsegments_, parallel_workers_) \ From a073d9a4f4abe178500a99f3833fa29a9b21c668 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Tue, 24 Mar 2026 16:33:27 +0800 Subject: [PATCH 5/5] tests: add Parallel Hash Full/Right Join regression cases cbdb_parallel.sql: add a new test block covering: - Parallel Hash Full Join (HashedWorkers FULL JOIN HashedWorkers produces HashedOJ with parallel_workers=2) - Parallel Hash Right Join (pj_t1 is 3x larger than pj_t2, so the planner hashes the smaller pj_t2 and probes with pj_t1; result 
locus HashedWorkers) - Correctness checks: count(*) matches serial execution - Locus propagation: HashedOJ(parallel) followed by INNER JOIN produces HashedOJ; followed by FULL JOIN produces HashedOJ join_hash.sql/out: CBDB-specific adaptations for the upstream parallel full join test -- disable parallel mode for tests that require serial plans, fix SAVEPOINT inside a parallel worker context, and update expected output to match CBDB plan shapes. --- .../pax_storage/expected/cbdb_parallel.out | 183 ++++++--- src/test/regress/expected/cbdb_parallel.out | 370 +++++++++++++----- src/test/regress/expected/join_hash.out | 83 ++-- .../regress/expected/join_hash_optimizer.out | 204 +++++++--- src/test/regress/sql/cbdb_parallel.sql | 50 +++ src/test/regress/sql/join_hash.sql | 6 + 6 files changed, 668 insertions(+), 228 deletions(-) diff --git a/contrib/pax_storage/expected/cbdb_parallel.out b/contrib/pax_storage/expected/cbdb_parallel.out index db583090026..ec6ceba7e3c 100644 --- a/contrib/pax_storage/expected/cbdb_parallel.out +++ b/contrib/pax_storage/expected/cbdb_parallel.out @@ -41,13 +41,29 @@ set gp_appendonly_insert_files = 4; begin; set local enable_parallel = on; create table test_131_ao1(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao2(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table test_131_ao3(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao4(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco1(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco2(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco3(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table test_131_aoco4(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. select relname, reloptions from pg_catalog.pg_class where relname like 'test_131_ao%'; relname | reloptions ----------------+---------------------- @@ -155,8 +171,14 @@ explain(locus, costs off) select count(*) from test_131_aoco3, test_131_aoco4 wh abort; create table ao1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table ao2(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table aocs1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. begin; -- encourage use of parallel plans set local min_parallel_table_scan_size = 0; @@ -367,6 +389,8 @@ abort; begin; set local max_parallel_workers_per_gather = 2; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. 
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table rt1(a int, b int) with(parallel_workers=2) distributed replicated; create table rt2(a int, b int) distributed replicated; create table rt3(a int, b int) distributed replicated; @@ -599,6 +623,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -606,8 +632,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 6 | 7 | 5 | 6 | 6 | 7 7 | 8 | 6 | 7 | 7 | 8 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -650,13 +674,6 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -669,6 +686,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 9 | 10 | 8 | 9 | 9 | 10 1 | 2 | 1 | 1 | 1 | 2 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -702,6 +726,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 
4 | 4 | 5 @@ -712,8 +738,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -779,6 +803,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; (19 rows) create table t2(a int, b int) with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table rt4(a int, b int) with(parallel_workers=2) distributed replicated; insert into t2 select i, i+1 from generate_series(1, 10) i; insert into rt4 select i, i+1 from generate_series(1, 10000) i; @@ -788,16 +814,16 @@ set local enable_parallel = off; select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 (10 rows) set local enable_parallel = on; @@ -828,19 +854,21 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 + 2 | 1 | 1 (10 rows) create table t3(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
insert into t3 select i, i+1 from generate_series(1, 9000) i; analyze t3; set local enable_parallel = off; @@ -919,10 +947,10 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 2 | 3 | 3 4 | 5 | 5 + 3 | 4 | 4 5 | 6 | 6 (5 rows) @@ -931,11 +959,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 3 | 4 | 4 - 1 | 2 | 2 - 4 | 5 | 5 5 | 6 | 6 + 4 | 5 | 5 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -976,11 +1004,11 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- - 5 | 6 | 6 1 | 2 | 2 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 + 5 | 6 | 6 (5 rows) -- non parallel results @@ -1028,14 +1056,14 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1045,9 +1073,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1059,7 +1087,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -1071,17 +1103,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1091,7 +1123,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i%10, i from generate_series(1, 5) i; insert into t1 values (100000); insert into t2 select i%10, i from generate_series(1, 100000) i; @@ -1100,34 +1136,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1323,12 +1359,18 @@ begin; create table rt1(a int, b int) distributed replicated; create table rt2(a int, b int) with (parallel_workers = 0) distributed replicated; create table 
t1(a int, b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with (parallel_workers = 0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10000) i; insert into t2 select i, i+1 from generate_series(1, 10000) i; insert into rt1 select i, i+1 from generate_series(1, 10000) i; insert into rt2 select i, i+1 from generate_series(1, 10000) i; CREATE TABLE sq1 AS SELECT a, b FROM t1 WHERE gp_segment_id = 0; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
set local optimizer=off; set local enable_parallel=on; set local min_parallel_table_scan_size to 0; @@ -1385,7 +1427,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1409,7 +1451,7 @@ explain (locus, costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1482,6 +1524,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1549,6 +1593,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1768,6 +1814,8 @@ set local optimizer = off; set local enable_parallel = on; -- ao table create table ao (a INT, b INT) using ao_row; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. 
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into ao select i as a, i as b from generate_series(1, 100) AS i; alter table ao set (parallel_workers = 2); explain(costs off) select count(*) from ao; @@ -1789,6 +1837,8 @@ select count(*) from ao; alter table ao reset (parallel_workers); -- aocs table create table aocs (a INT, b INT) using ao_column; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into aocs select i as a, i as b from generate_series(1, 100) AS i; alter table aocs set (parallel_workers = 2); explain(costs off) select count(*) from aocs; @@ -1862,9 +1912,14 @@ select * from abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -1939,7 +1994,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -2329,6 +2388,8 @@ abort; -- prepare, execute locus is null begin; create table t1(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
analyze t1; prepare t1_count(integer) as select count(*) from t1; explain(locus, costs off) execute t1_count(1); diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 35e90eebfa1..af975de50f4 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -112,8 +112,8 @@ set local enable_parallel_dedup_semi_reverse_join = on; set local enable_parallel_dedup_semi_join = on; explain (costs off) select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b); - QUERY PLAN ------------------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -1032,6 +1032,15 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 6 | 6 | 6 | 7 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 10 | 10 | 10 | 11 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1042,15 +1051,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 6 | 6 | 6 | 7 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -1093,13 +1093,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | 
a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -1110,8 +1105,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 8 | 9 | 7 | 8 | 8 | 9 8 | 9 | 8 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -1145,6 +1145,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1155,8 +1157,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -1201,14 +1201,11 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1219,6 +1216,9 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 
7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 (19 rows) create table t2(a int, b int) with(parallel_workers=0); @@ -1271,12 +1271,12 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 6 | 5 | 5 7 | 6 | 6 10 | 9 | 9 @@ -1362,9 +1362,9 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1374,9 +1374,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1419,9 +1419,9 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 2 | 3 | 3 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 (5 rows) @@ -1431,11 +1431,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 - 5 | 6 | 6 + 2 | 3 | 3 (5 rows) abort; @@ -1471,13 +1471,13 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 + 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 @@ -1488,11 +1488,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 1 | 2 | 2 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -1510,28 
+1510,28 @@ analyze t1; analyze rt1; set local enable_parallel = on; explain(locus, costs off) select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; - QUERY PLAN ------------------------------------------------------- - Parallel Hash Left Join + QUERY PLAN +------------------------------------------------------------ + Parallel Hash Right Join Locus: Entry - Hash Cond: ((count(*)) = rt1.a) - -> Finalize Aggregate + Hash Cond: (rt1.a = (count(*))) + -> Gather Motion 2:1 (slice1; segments: 2) Locus: Entry - -> Gather Motion 6:1 (slice1; segments: 6) - Locus: Entry - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t1 - Locus: HashedWorkers - Parallel Workers: 2 + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 2 -> Parallel Hash Locus: Entry - -> Gather Motion 2:1 (slice2; segments: 2) + -> Finalize Aggregate Locus: Entry - -> Parallel Seq Scan on rt1 - Locus: SegmentGeneralWorkers - Parallel Workers: 2 + -> Gather Motion 6:1 (slice2; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t1 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (21 rows) @@ -1661,17 +1661,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> 
Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1690,34 +1690,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1975,7 +1975,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -1999,7 +1999,7 @@ explain (locus, 
costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -2296,8 +2296,8 @@ analyze t1; analyze t2; analyze t3_null; explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2); - QUERY PLAN ------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -2317,8 +2317,8 @@ select sum(t1.c1) from t1 where c1 not in (select c2 from t2); (1 row) explain(costs off) select * from t1 where c1 not in (select c2 from t3_null); - QUERY PLAN ------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Gather Motion 6:1 (slice1; segments: 6) -> Parallel Hash Left Anti Semi (Not-In) Join Hash Cond: (t1.c1 = t3_null.c2) @@ -2457,8 +2457,11 @@ abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -2972,7 +2975,7 @@ create table t2_anti(a 
int, b int) with(parallel_workers=2) distributed by (b); insert into t2_anti values(generate_series(5, 10)); explain(costs off, verbose) select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_anti.a where t2_anti.a is null; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) Output: t1_anti.a, t1_anti.b @@ -3068,8 +3071,8 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant ---+--- 3 | 4 | - 1 | 2 | + 1 | (4 rows) abort; @@ -3098,7 +3101,7 @@ insert into t_distinct_0 select * from t_distinct_0; analyze t_distinct_0; explain(costs off) select distinct a from t_distinct_0; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) -> HashAggregate @@ -3232,8 +3235,6 @@ select distinct a, b from t_distinct_0; drop table if exists t_distinct_1; NOTICE: table "t_distinct_1" does not exist, skipping create table t_distinct_1(a int, b int) using ao_column; -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t_distinct_1 select * from t_distinct_0; analyze t_distinct_1; set enable_parallel = off; @@ -3520,10 +3521,7 @@ WHERE e.salary > ( -- Test https://github.com/apache/cloudberry/issues/1376 -- create table t1(a int, b int); -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table t2 (like t1); -NOTICE: table doesn't have 'DISTRIBUTED BY' clause, defaulting to distribution columns from LIKE table set gp_cte_sharing = on; explain(locus, costs off) with x as (select a, count(*) as b from t1 group by a union all @@ -3571,8 +3569,184 @@ explain(locus, costs off) with x as reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + Parallel Workers: 2 + -> Parallel Hash Full Join + Locus: HashedOJ + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; 
+select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash Right Join + Locus: HashedWorkers + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Hash Full Join + 
Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Replicated + -> Broadcast Motion 3:3 (slice2; segments: 3) + Locus: Replicated + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(25 rows) + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + QUERY PLAN +-------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Locus: Hashed + Hash Key: COALESCE(pj_t1.id, pj_t2.id) + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(26 rows) + +abort; -- start_ignore drop schema test_parallel cascade; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table t_distinct_0 +drop cascades to table t_distinct_1 +drop cascades to table departments +drop cascades to table employees +drop cascades to table t1 +drop cascades to table t2 -- end_ignore reset gp_appendonly_insert_files; reset force_parallel_mode; diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 28f4558ec04..e5f74c18d28 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; 
set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -58,12 +61,16 @@ $$; -- estimated size. create table simple as select generate_series(1, 60000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table simple set (parallel_workers = 2); analyze simple; -- Make a relation whose size we will under-estimate. We want stats -- to say 1000 rows, but actually there are 20,000 rows. create table bigger_than_it_looks as select generate_series(1, 60000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table bigger_than_it_looks set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table bigger_than_it_looks set (parallel_workers = 2); @@ -73,6 +80,8 @@ update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks'; -- kind of skew that breaks our batching scheme. We want stats to say -- 2 rows, but actually there are 20,000 rows with the same key. 
create table extremely_skewed (id int, t text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table extremely_skewed set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table extremely_skewed set (parallel_workers = 2); @@ -85,6 +94,8 @@ update pg_class where relname = 'extremely_skewed'; -- Make a relation with a couple of enormous tuples. create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table wide set (parallel_workers = 2); ANALYZE wide; -- The "optimal" case: the hash table fits in memory; we plan for 1 @@ -319,7 +330,7 @@ $$); select count(*) from simple r full outer join simple s using (id); count ------- - 20000 + 60000 (1 row) rollback to settings; @@ -574,9 +585,13 @@ rollback to settings; -- Exercise rescans. We'll turn off parallel_leader_participation so -- that we can check that instrumentation comes back correctly. create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
analyze join_foo; alter table join_foo set (parallel_workers = 0); create table join_bar as select generate_series(1, 20000) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze join_bar; alter table join_bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious @@ -854,23 +869,23 @@ savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); - QUERY PLAN -------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------- Finalize Aggregate - -> Gather - Workers Planned: 2 + -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate - -> Parallel Hash Full Join + -> Hash Full Join Hash Cond: (r.id = s.id) - -> Parallel Seq Scan on simple r - -> Parallel Hash - -> Parallel Seq Scan on simple s + -> Seq Scan on simple r + -> Hash + -> Seq Scan on simple s + Optimizer: Postgres query optimizer (9 rows) select count(*) from simple r full outer join simple s using (id); count ------- - 20000 + 60000 (1 row) rollback to settings; @@ -935,23 +950,25 @@ savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); - QUERY PLAN -------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Finalize Aggregate - -> Gather - Workers Planned: 2 + -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate - -> Parallel Hash Full Join + -> Hash Full Join Hash Cond: ((0 - s.id) = r.id) - -> Parallel Seq Scan on simple s - -> Parallel 
Hash - -> Parallel Seq Scan on simple r -(9 rows) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + -> Hash + -> Seq Scan on simple r + Optimizer: Postgres query optimizer +(11 rows) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); - count -------- - 40000 + count +-------- + 120000 (1 row) rollback to settings; @@ -1013,7 +1030,11 @@ rollback to settings; savepoint settings; set max_parallel_workers_per_gather = 0; create table join_hash_t_small(a int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table join_hash_t_big(b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into join_hash_t_small select i%100 from generate_series(0, 3000)i; insert into join_hash_t_big select i%100000 from generate_series(1, 100000)i ; analyze join_hash_t_small; @@ -1031,17 +1052,25 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b (7 rows) rollback to settings; +rollback; -- Hash join reuses the HOT status bit to indicate match status. This can only -- be guaranteed to produce correct results if all the hash join tuple match -- bits are reset before reuse. This is done upon loading them into the -- hashtable. +begin; SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. 
+SET enable_parallel = off; SET enable_parallel_hash = on; SET min_parallel_table_scan_size = 0; SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. INSERT INTO hjtest_matchbits_t1 VALUES (1); INSERT INTO hjtest_matchbits_t2 VALUES (2); -- Update should create a HOT tuple. If this status bit isn't cleared, we won't @@ -1064,8 +1093,8 @@ SET enable_parallel_hash = off; SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; id | id ----+---- - 1 | | 2 + 1 | (2 rows) ROLLBACK TO settings; @@ -1085,7 +1114,11 @@ BEGIN; SET LOCAL enable_sort = OFF; -- avoid mergejoins SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order CREATE TABLE hjtest_1 (a text, b int, id int, c bool); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE hjtest_2 (a bool, id int, b text, c int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50 @@ -1142,8 +1175,8 @@ WHERE SubPlan 2 -> Result Output: (hjtest_1.b * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 @@ -1206,8 +1239,8 @@ WHERE SubPlan 3 -> Result Output: (hjtest_2.c * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 diff --git a/src/test/regress/expected/join_hash_optimizer.out b/src/test/regress/expected/join_hash_optimizer.out index 053d0ef4898..1835bfa4f31 100644 --- a/src/test/regress/expected/join_hash_optimizer.out +++ b/src/test/regress/expected/join_hash_optimizer.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. 
In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -115,7 +118,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -156,7 +159,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -197,7 +200,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -241,7 +244,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -283,7 +286,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -325,7 +328,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -344,6 +347,13 @@ $$); t | f (1 row) +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least 
a @@ -356,8 +366,8 @@ set local work_mem = '128kB'; set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_mem explain (costs off) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -367,8 +377,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); count @@ -395,8 +405,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -406,8 +416,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -434,8 +444,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> 
Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -445,8 +455,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -490,7 +500,7 @@ HINT: For non-partitioned tables, run analyze (). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -534,7 +544,7 @@ HINT: For non-partitioned tables, run analyze (). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -578,7 +588,7 @@ HINT: For non-partitioned tables, run analyze (). 
For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -643,8 +653,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -662,7 +672,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -701,8 +711,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -720,7 +730,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -760,8 +770,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN 
------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -779,7 +789,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -818,8 +828,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -837,7 +847,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -891,8 +901,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -920,7 +931,36 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- An full outer join where every record is not matched. 
+-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = s.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: s.id + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + +rollback to settings; +-- A full outer join where every record is not matched. -- non-parallel savepoint settings; set local max_parallel_workers_per_gather = 0; @@ -950,7 +990,37 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = (0 - s.id)) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +-------- + 120000 +(1 
row) + +rollback to settings; +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -1012,7 +1082,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: wide_1.id -> Seq Scan on wide wide_1 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select length(max(s.t)) @@ -1060,11 +1130,57 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b -> Seq Scan on join_hash_t_big -> Hash -> Seq Scan on join_hash_t_small - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (7 rows) rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +begin; +SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. +UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + | 2 + 1 | +(2 rows) + +ROLLBACK TO settings; +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's -- need to reference Hash's outer plan (which is below HashJoin's @@ -1154,9 +1270,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM hjtest_1, hjtest_2 @@ -1231,9 +1347,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM 
hjtest_2, hjtest_1 diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index f9d01dd8a00..08e7aa198f9 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -1149,6 +1149,56 @@ reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); + +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; + +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; + +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 full join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from 
pj_t1 right join pj_t2 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + +abort; + -- start_ignore drop schema test_parallel cascade; -- end_ignore diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 0115489a6b9..2978e155ecd 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -13,6 +13,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or @@ -543,7 +546,10 @@ rollback; -- be guaranteed to produce correct results if all the hash join tuple match -- bits are reset before reuse. This is done upon loading them into the -- hashtable. +begin; SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; SET enable_parallel_hash = on; SET min_parallel_table_scan_size = 0; SET parallel_setup_cost = 0;