Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 122 additions & 61 deletions contrib/pax_storage/expected/cbdb_parallel.out

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/backend/cdb/cdbpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -3112,8 +3112,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
case JOIN_UNIQUE_INNER:
case JOIN_RIGHT:
case JOIN_FULL:
/* Join types are not supported in parallel yet. */
goto fail;
outer.ok_to_replicate = false;
inner.ok_to_replicate = false;
break;
case JOIN_DEDUP_SEMI:
if (!enable_parallel_dedup_semi_join)
goto fail;
Expand Down
28 changes: 21 additions & 7 deletions src/backend/cdb/cdbpathlocus.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ cdbpathlocus_equal(CdbPathLocus a, CdbPathLocus b)
list_length(a.distkey) != list_length(b.distkey))
return false;

/*
* CBDB_PARALLEL: What if both a and b are HashedOJ with parallel workers > 0 ?
* Are they equal in practice?
*/

if ((CdbPathLocus_IsHashed(a) || CdbPathLocus_IsHashedOJ(a)) &&
(CdbPathLocus_IsHashed(b) || CdbPathLocus_IsHashedOJ(b)))
return cdbpath_distkey_equal(a.distkey, b.distkey);
Expand Down Expand Up @@ -544,7 +549,7 @@ cdbpathlocus_from_subquery(struct PlannerInfo *root,
else
{
Assert(CdbPathLocus_IsHashedOJ(subpath->locus));
CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments);
CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments, subpath->locus.parallel_workers);
}
}
else
Expand Down Expand Up @@ -711,7 +716,7 @@ cdbpathlocus_pull_above_projection(struct PlannerInfo *root,
CdbPathLocus_MakeHashedWorkers(&newlocus, newdistkeys, numsegments, locus.parallel_workers);
}
else
CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments);
CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments, locus.parallel_workers);
return newlocus;
}
else
Expand Down Expand Up @@ -880,7 +885,7 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b)

newdistkeys = lappend(newdistkeys, newdistkey);
}
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments);
CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, 0 /* Both are 0 parallel here*/);
}
Assert(cdbpathlocus_is_valid(resultlocus));
return resultlocus;
Expand Down Expand Up @@ -1236,8 +1241,14 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
Assert(cdbpathlocus_is_valid(a));
Assert(cdbpathlocus_is_valid(b));

/* Do both input rels have same locus? */
if (cdbpathlocus_equal(a, b))
/*
* Do both input rels have same locus?
* CBDB_PARALLEL: for a FULL JOIN the result locus can differ even if
* both inputs have the same locus, because null-extended rows could
* end up on any segment after the join.
*/

if (jointype != JOIN_FULL && cdbpathlocus_equal(a, b))
return a;

/*
Expand Down Expand Up @@ -1412,8 +1423,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
* If inner is hashed workers, and outer is hashed. Join locus will be hashed.
* If outer is hashed workers, and inner is hashed. Join locus will be hashed workers.
* Seems we should just return outer locus anyway.
* Things changed since we have parallel full join now.
*/
if (parallel_aware)
if (parallel_aware && jointype != JOIN_FULL)
return a;

numsegments = CdbPathLocus_NumSegments(a);
Expand Down Expand Up @@ -1469,7 +1481,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo
newdistkeys = lappend(newdistkeys, newdistkey);
}

CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments);
Assert(CdbPathLocus_NumParallelWorkers(a) == CdbPathLocus_NumParallelWorkers(b));

CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, CdbPathLocus_NumParallelWorkers(a));
}
Assert(cdbpathlocus_is_valid(resultlocus));
return resultlocus;
Expand Down
181 changes: 175 additions & 6 deletions src/backend/executor/nodeHash.c
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable,
/* Store the hash value in the HashJoinTuple header. */
hashTuple->hashvalue = hashvalue;
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

/* Push it onto the front of the bucket's list */
ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
Expand Down Expand Up @@ -2388,6 +2389,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
hjstate->hj_CurTuple = NULL;
}

/*
 * ExecParallelPrepHashTableForUnmatched
 *		elect one process to perform the parallel unmatched-inner scan
 *
 * Decide if this process is allowed to run the unmatched scan. If so, the
 * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
 * Otherwise the batch is detached and false is returned.
 */
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;

/* We may only be called while this batch is in its probing phase. */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING);

/*
 * It would not be deadlock-free to wait on the batch barrier, because it
 * is in PHJ_BATCH_PROBING phase, and thus processes attached to it have
 * already emitted tuples. Therefore, we'll hold a wait-free election:
 * only one process can continue to the next phase, and all others detach
 * from this batch. They can still do any work on other batches, if there
 * are any.
 */
if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
{
/* This process considers the batch to be done. */
hashtable->batches[hashtable->curbatch].done = true;

/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

/*
 * Track largest batch we've seen, which would normally happen in
 * ExecHashTableDetachBatch().
 */
hashtable->spacePeak =
Max(hashtable->spacePeak,
batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
hashtable->curbatch = -1;
return false;
}

/* Now we are alone with this batch. */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
Assert(BarrierParticipants(&batch->batch_barrier) == 1);

/*
 * Has another process decided to give up early and command all processes
 * to skip the unmatched scan?
 */
if (batch->skip_unmatched)
{
hashtable->batches[hashtable->curbatch].done = true;
ExecHashTableDetachBatch(hashtable);
return false;
}

/* Now prepare the process local state, just as for non-parallel join. */
ExecPrepHashTableForUnmatched(hjstate);

return true;
}

/*
* ExecScanHashTableForUnmatched
* scan the hash table for unmatched inner tuples
Expand Down Expand Up @@ -2462,6 +2526,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return false;
}

/*
 * ExecParallelScanHashTableForUnmatched
 *		scan the hash table for unmatched inner tuples, in parallel join
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 *
 * This is a resumable scan: hj_CurBucketNo and hj_CurTuple record the
 * position reached so far, so each call continues where the previous one
 * returned true.  Returns false once every bucket has been visited.
 */
bool
ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;

for (;;)
{
/*
 * hj_CurTuple is the address of the tuple last returned from the
 * current bucket, or NULL if it's time to start scanning a new
 * bucket.
 */
if (hashTuple != NULL)
hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
hashTuple = ExecParallelHashFirstTuple(hashtable,
hjstate->hj_CurBucketNo++);
else
break; /* finished all buckets */

/* Walk the remainder of this bucket's chain. */
while (hashTuple != NULL)
{
/* Skip tuples whose match bit was set during the probe phase. */
if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
{
TupleTableSlot *inntuple;

/* insert hashtable's tuple into exec slot */
inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
hjstate->hj_HashTupleSlot,
false); /* do not pfree */
econtext->ecxt_innertuple = inntuple;

/*
 * Reset temp memory each time; although this function doesn't
 * do any qual eval, the caller will, so let's keep it
 * parallel to ExecScanHashBucket.
 */
ResetExprContext(econtext);

/* Remember our position so the next call resumes after it. */
hjstate->hj_CurTuple = hashTuple;
return true;
}

hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
}

/* allow this loop to be cancellable */
CHECK_FOR_INTERRUPTS();
}

/*
 * no more unmatched tuples
 */
return false;
}

/*
* ExecHashTableReset
*
Expand Down Expand Up @@ -3793,6 +3923,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
accessor->shared = shared;
accessor->preallocated = 0;
accessor->done = false;
accessor->outer_eof = false;
accessor->inner_tuples =
sts_attach(ParallelHashJoinBatchInner(shared),
hashtable->hjstate->worker_id,
Expand Down Expand Up @@ -3838,25 +3969,63 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
{
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
bool attached = true;

/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

/* Detach from the batch we were last working on. */
/* After attaching we always get at least to PHJ_BATCH_PROBING. */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING ||
BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);

/*
* If we're abandoning the PHJ_BATCH_PROBING phase early without having
* reached the end of it, it means the plan doesn't want any more
* tuples, and it is happy to abandon any tuples buffered in this
* process's subplans. For correctness, we can't allow any process to
* execute the PHJ_BATCH_SCAN phase, because we will never have the
* complete set of match bits. Therefore we skip emitting unmatched
* tuples in all backends (if this is a full/right join), as if those
* tuples were all due to be emitted by this process and it has
* abandoned them too.
*/
/*
* CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware)
* If phs_lasj_has_null is true, we found a NULL while building the hash
* table, so there are no batches to detach.
*/
if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier))
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING &&
!hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */
!hashtable->batches[curbatch].outer_eof)
{
/*
* This flag may be written to by multiple backends during
* PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN
* phase so requires no extra locking.
*/
batch->skip_unmatched = true;
}

/*
* Even if we aren't doing a full/right outer join, we'll step through
* the PHJ_BATCH_SCAN phase just to maintain the invariant that
* freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
*/
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING &&
!hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */)
attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
if (attached && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */ &&
BarrierArriveAndDetach(&batch->batch_barrier))
{
/*
* Technically we shouldn't access the barrier because we're no
* longer attached, but since there is no way it's moving after
* this point it seems safe to make the following assertion.
* We are no longer attached to the batch barrier, but we're the
* process that was chosen to free resources and it's safe to
* assert the current phase. The ParallelHashJoinBatch can't go
* away underneath us while we are attached to the build barrier,
* making this access safe.
*/
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
Expand Down
Loading
Loading