
Commit 476a40f

david-rowley authored and Commitfest Bot committed
fixup! v10 parallel tid range scan
1 parent fd5759a commit 476a40f

File tree

11 files changed: +150 −155 lines changed

doc/src/sgml/parallel.sgml

Lines changed: 9 additions & 0 deletions
@@ -299,6 +299,15 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
      within each worker process.
     </para>
    </listitem>
+   <listitem>
+    <para>
+     In a <emphasis>parallel tid range scan</emphasis>, the range of blocks
+     will be subdivided into smaller ranges which are shared among the
+     cooperating processes.  Each worker process will complete the scanning
+     of its given range of blocks before requesting an additional range of
+     blocks.
+    </para>
+   </listitem>
   </itemizedlist>
 
   Other scan types, such as scans of non-btree indexes, may support
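
The division of work described in the new paragraph is implemented by the two tableam.c entry points this commit modifies. As a rough illustration (not code from the commit, and compilable only inside the PostgreSQL tree), a cooperating process drives the scan roughly like the hypothetical helper below; the per-block processing is elided, and the signatures are those of the patched tableam.c:

    /*
     * Hypothetical helper sketching the doc paragraph above: each worker
     * scans one chunk of blocks to completion, then asks the shared
     * allocator for another.  Descriptor setup and block processing are
     * omitted.
     */
    static void
    worker_scan_blocks(Relation rel,
                       ParallelBlockTableScanWorker pbscanwork,
                       ParallelBlockTableScanDesc pbscan,
                       BlockNumber startblock,
                       BlockNumber numblocks)
    {
        BlockNumber blkno;

        /* Every worker calls this; the shared fields are only set once. */
        table_block_parallelscan_startblock_init(rel, pbscanwork, pbscan,
                                                 startblock, numblocks);

        /* Returns blocks chunk by chunk until the scan's limit is reached. */
        while ((blkno = table_block_parallelscan_nextpage(rel, pbscanwork,
                                                          pbscan)) != InvalidBlockNumber)
        {
            /* ... read and process block blkno ... */
        }
    }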

src/backend/access/heap/heapam.c

Lines changed: 3 additions & 11 deletions
@@ -258,7 +258,9 @@ heap_scan_stream_read_next_parallel(ReadStream *stream,
         /* parallel scan */
         table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
                                                  scan->rs_parallelworkerdata,
-                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel,
+                                                 scan->rs_startblock,
+                                                 scan->rs_numblocks);
 
         /* may return InvalidBlockNumber if there are no more blocks */
         scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
@@ -490,16 +492,6 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
 
     scan->rs_startblock = startBlk;
     scan->rs_numblocks = numBlks;
-
-    /* set the limits in the ParallelBlockTableScanDesc, when present as leader */
-    if (scan->rs_base.rs_parallel != NULL && !IsParallelWorker())
-    {
-        ParallelBlockTableScanDesc bpscan;
-
-        bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
-        bpscan->phs_startblock = startBlk;
-        bpscan->phs_numblock = numBlks;
-    }
 }
 
 /*
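
With the leader-only branch gone, the limits recorded by heap_setscanlimits() now reach the shared descriptor through the read-stream callback shown in the first hunk, whichever backend gets there first. A condensed sketch of that path, using a hypothetical helper name and omitting everything else the callback does:

    /*
     * Sketch (not part of the commit): heap_setscanlimits() now records
     * the limits in the local scan descriptor only; the shared
     * phs_startblock/phs_numblock are filled in later, under the mutex,
     * by whichever backend first requests blocks.
     */
    static void
    limits_reach_shared_state(HeapScanDesc scan,
                              BlockNumber startBlk, BlockNumber numBlks)
    {
        /* records the limits locally, in leader and workers alike */
        heap_setscanlimits(&scan->rs_base, startBlk, numBlks);

        /* later, from heap_scan_stream_read_next_parallel(): */
        table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
                                                 scan->rs_parallelworkerdata,
                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel,
                                                 scan->rs_startblock,
                                                 scan->rs_numblocks);
    }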

src/backend/access/table/tableam.c

Lines changed: 80 additions & 54 deletions
@@ -189,8 +189,8 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 }
 
 TableScanDesc
-table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan,
-                                  ItemPointerData *mintid, ItemPointerData *maxtid)
+table_beginscan_parallel_tidrange(Relation relation,
+                                  ParallelTableScanDesc pscan)
 {
     Snapshot    snapshot;
     uint32      flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
@@ -216,11 +216,6 @@ table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan
 
     sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
                                              pscan, flags);
-
-    /* Set the TID range if needed */
-    if (mintid && maxtid)
-        relation->rd_tableam->scan_set_tidrange(sscan, mintid, maxtid);
-
     return sscan;
 }
 
@@ -453,57 +448,59 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  *
  * Determine where the parallel seq scan should start.  This function may be
  * called many times, once by each parallel worker.  We must be careful only
- * to set the startblock once.
+ * to set the phs_startblock and phs_numblock fields once.
+ *
+ * Callers may optionally specify a non-InvalidBlockNumber value for
+ * 'startblock' to force the scan to start at the given page.  Likewise,
+ * 'numblocks' can be specified as a non-InvalidBlockNumber to limit the
+ * number of blocks to scan to that many blocks.
  */
 void
 table_block_parallelscan_startblock_init(Relation rel,
                                          ParallelBlockTableScanWorker pbscanwork,
-                                         ParallelBlockTableScanDesc pbscan)
+                                         ParallelBlockTableScanDesc pbscan,
+                                         BlockNumber startblock,
+                                         BlockNumber numblocks)
 {
     BlockNumber sync_startpage = InvalidBlockNumber;
+    BlockNumber scan_nblocks;
 
     /* Reset the state we use for controlling allocation size. */
     memset(pbscanwork, 0, sizeof(*pbscanwork));
 
     StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
                      "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
 
-    /*
-     * We determine the chunk size based on the size of the relation. First we
-     * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
-     * take the next highest power of 2 number of the chunk size. This means
-     * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
-     * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
-     */
-    pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
-                                                       PARALLEL_SEQSCAN_NCHUNKS, 1));
-
-    /*
-     * Ensure we don't go over the maximum chunk size with larger tables. This
-     * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
-     * tables.  Too large a chunk size has been shown to be detrimental to
-     * synchronous scan performance.
-     */
-    pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
-                                      PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
-
 retry:
     /* Grab the spinlock. */
     SpinLockAcquire(&pbscan->phs_mutex);
 
     /*
-     * If the scan's startblock has not yet been initialized, we must do so
-     * now.  If this is not a synchronized scan, we just start at block 0, but
-     * if it is a synchronized scan, we must get the starting position from
-     * the synchronized scan machinery.  We can't hold the spinlock while
-     * doing that, though, so release the spinlock, get the information we
-     * need, and retry.  If nobody else has initialized the scan in the
-     * meantime, we'll fill in the value we fetched on the second time
-     * through.
+     * When the caller specified a limit on the number of blocks to scan, set
+     * that in the ParallelBlockTableScanDesc, if it's not been done by
+     * another worker already.
+     */
+    if (numblocks != InvalidBlockNumber &&
+        pbscan->phs_numblock == InvalidBlockNumber)
+    {
+        pbscan->phs_numblock = numblocks;
+    }
+
+    /*
+     * If the scan's phs_startblock has not yet been initialized, we must do
+     * so now.  If a startblock was specified, start there, otherwise if this
+     * is not a synchronized scan, we just start at block 0, but if it is a
+     * synchronized scan, we must get the starting position from the
+     * synchronized scan machinery.  We can't hold the spinlock while doing
+     * that, though, so release the spinlock, get the information we need, and
+     * retry.  If nobody else has initialized the scan in the meantime, we'll
+     * fill in the value we fetched on the second time through.
      */
     if (pbscan->phs_startblock == InvalidBlockNumber)
     {
-        if (!pbscan->base.phs_syncscan)
+        if (startblock != InvalidBlockNumber)
+            pbscan->phs_startblock = startblock;
+        else if (!pbscan->base.phs_syncscan)
             pbscan->phs_startblock = 0;
         else if (sync_startpage != InvalidBlockNumber)
             pbscan->phs_startblock = sync_startpage;
@@ -515,6 +512,34 @@ table_block_parallelscan_startblock_init(Relation rel,
         }
     }
     SpinLockRelease(&pbscan->phs_mutex);
+
+    /*
+     * Figure out how many blocks we're going to scan; either all of them, or
+     * just phs_numblock's worth, if a limit has been imposed.
+     */
+    if (pbscan->phs_numblock == InvalidBlockNumber)
+        scan_nblocks = pbscan->phs_nblocks;
+    else
+        scan_nblocks = pbscan->phs_numblock;
+
+    /*
+     * We determine the chunk size based on scan_nblocks.  First we split
+     * scan_nblocks into PARALLEL_SEQSCAN_NCHUNKS chunks then we calculate the
+     * next highest power of 2 number of the result.  This means we split the
+     * blocks we're scanning into somewhere between PARALLEL_SEQSCAN_NCHUNKS
+     * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
+     */
+    pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(scan_nblocks /
+                                                       PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+    /*
+     * Ensure we don't go over the maximum chunk size with larger tables. This
+     * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
+     * tables.  Too large a chunk size has been shown to be detrimental to
+     * synchronous scan performance.
+     */
+    pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+                                      PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 }
 
 /*
@@ -530,6 +555,7 @@ table_block_parallelscan_nextpage(Relation rel,
                                   ParallelBlockTableScanWorker pbscanwork,
                                   ParallelBlockTableScanDesc pbscan)
 {
+    BlockNumber scan_nblocks;
     BlockNumber page;
     uint64      nallocated;
 
@@ -550,7 +576,7 @@ table_block_parallelscan_nextpage(Relation rel,
      *
      * Here we name these ranges of blocks "chunks".  The initial size of
      * these chunks is determined in table_block_parallelscan_startblock_init
-     * based on the size of the relation.  Towards the end of the scan, we
+     * based on the number of blocks to scan.  Towards the end of the scan, we
      * start making reductions in the size of the chunks in order to attempt
      * to divide the remaining work over all the workers as evenly as
      * possible.
@@ -567,17 +593,23 @@ table_block_parallelscan_nextpage(Relation rel,
      * phs_nallocated counter will exceed rs_nblocks, because workers will
      * still increment the value, when they try to allocate the next block but
     * all blocks have been allocated already.  The counter must be 64 bits
-     * wide because of that, to avoid wrapping around when rs_nblocks is close
-     * to 2^32.
+     * wide because of that, to avoid wrapping around when scan_nblocks is
+     * close to 2^32.
      *
      * The actual block to return is calculated by adding the counter to the
-     * starting block number, modulo nblocks.
+     * starting block number, modulo phs_nblocks.
      */
 
+    /* First, figure out how many blocks we're planning on scanning */
+    if (pbscan->phs_numblock == InvalidBlockNumber)
+        scan_nblocks = pbscan->phs_nblocks;
+    else
+        scan_nblocks = pbscan->phs_numblock;
+
     /*
-     * First check if we have any remaining blocks in a previous chunk for
-     * this worker.  We must consume all of the blocks from that before we
-     * allocate a new chunk to the worker.
+     * Now check if we have any remaining blocks in a previous chunk for this
+     * worker.  We must consume all of the blocks from that before we allocate
+     * a new chunk to the worker.
      */
     if (pbscanwork->phsw_chunk_remaining > 0)
     {
@@ -599,7 +631,7 @@ table_block_parallelscan_nextpage(Relation rel,
      * chunk size set to 1.
      */
     if (pbscanwork->phsw_chunk_size > 1 &&
-        pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
+        pbscanwork->phsw_nallocated > scan_nblocks -
         (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
         pbscanwork->phsw_chunk_size >>= 1;
 
@@ -614,15 +646,9 @@ table_block_parallelscan_nextpage(Relation rel,
         pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
     }
 
-    /*
-     * Check if we've allocated every block in the relation, or if we've
-     * reached the limit imposed by pbscan->phs_numblock (if set).
-     */
-    if (nallocated >= pbscan->phs_nblocks)
-        page = InvalidBlockNumber;  /* all blocks have been allocated */
-    else if (pbscan->phs_numblock != InvalidBlockNumber &&
-             nallocated >= pbscan->phs_numblock)
-        page = InvalidBlockNumber;  /* upper scan limit reached */
+    /* Check if we've run out of blocks to scan */
+    if (nallocated >= scan_nblocks)
+        page = InvalidBlockNumber;  /* all blocks have been allocated */
     else
         page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
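
The chunk-size arithmetic the relocated comments describe can be sanity-checked outside the server. A standalone sketch, assuming the constants currently defined in tableam.c (PARALLEL_SEQSCAN_NCHUNKS = 2048, PARALLEL_SEQSCAN_MAX_CHUNK_SIZE = 8192) and with next_power_of_2 standing in for pg_nextpower2_32:

    #include <stdint.h>
    #include <stdio.h>

    #define PARALLEL_SEQSCAN_NCHUNKS        2048
    #define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192

    /* stand-in for pg_nextpower2_32: smallest power of 2 >= num (num > 0) */
    static uint32_t
    next_power_of_2(uint32_t num)
    {
        uint32_t result = 1;

        while (result < num)
            result <<= 1;
        return result;
    }

    int
    main(void)
    {
        /* e.g. a TID range scan limited to 1,000,000 blocks (~7.6 GB of heap) */
        uint32_t scan_nblocks = 1000000;
        uint32_t nchunk = scan_nblocks / PARALLEL_SEQSCAN_NCHUNKS;
        uint32_t chunk_size = next_power_of_2(nchunk > 1 ? nchunk : 1);

        if (chunk_size > PARALLEL_SEQSCAN_MAX_CHUNK_SIZE)
            chunk_size = PARALLEL_SEQSCAN_MAX_CHUNK_SIZE;

        /* 1000000 / 2048 = 488, rounded up to 512; the 8192 cap is not hit */
        printf("chunk size: %u blocks\n", chunk_size);
        return 0;
    }

With a one-million-block limit the scan is carved into roughly 1953 chunks of 512 blocks, which falls between PARALLEL_SEQSCAN_NCHUNKS / 2 and PARALLEL_SEQSCAN_NCHUNKS, as the comment promises.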

src/backend/executor/execParallel.c

Lines changed: 3 additions & 3 deletions
@@ -40,8 +40,8 @@
 #include "executor/nodeSeqscan.h"
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
-#include "executor/tqueue.h"
 #include "executor/nodeTidrangescan.h"
+#include "executor/tqueue.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
@@ -502,7 +502,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
         case T_TidRangeScanState:
             if (planstate->plan->parallel_aware)
                 ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
-                                  d->pcxt);
+                                              d->pcxt);
             break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
@@ -1008,7 +1008,7 @@ ExecParallelReInitializeDSM(PlanState *planstate,
         case T_TidRangeScanState:
             if (planstate->plan->parallel_aware)
                 ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
-                                    pcxt);
+                                                pcxt);
             break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)

src/backend/executor/nodeTidrangescan.c

Lines changed: 13 additions & 41 deletions
@@ -250,13 +250,9 @@ TidRangeNext(TidRangeScanState *node)
     }
     else
     {
-        /* rescan with the updated TID range only in non-parallel mode */
-        if (scandesc->rs_parallel == NULL)
-        {
-            /* rescan with the updated TID range */
-            table_rescan_tidrange(scandesc, &node->trss_mintid,
-                                  &node->trss_maxtid);
-        }
+        /* rescan with the updated TID range */
+        table_rescan_tidrange(scandesc, &node->trss_mintid,
+                              &node->trss_maxtid);
     }
 
     node->trss_inScan = true;
@@ -419,6 +415,7 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
      */
     return tidrangestate;
 }
+
 /* ----------------------------------------------------------------
  *                      Parallel Scan Support
  * ----------------------------------------------------------------
@@ -446,7 +443,7 @@ ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)
 /* ----------------------------------------------------------------
  *      ExecTidRangeScanInitializeDSM
  *
- *      Set up a parallel TID scan descriptor.
+ *      Set up a parallel TID range scan descriptor.
  * ----------------------------------------------------------------
  */
 void
@@ -460,19 +457,9 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
                                   pscan,
                                   estate->es_snapshot);
     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
-
-    /*
-     * Initialize parallel scan descriptor with given TID range if it can be
-     * evaluated successfully.
-     */
-    if (TidRangeEval(node))
-        node->ss.ss_currentScanDesc =
-            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
-                                              &node->trss_mintid, &node->trss_maxtid);
-    else
-        node->ss.ss_currentScanDesc =
-            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
-                                              NULL, NULL);
+    node->ss.ss_currentScanDesc =
+        table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
+                                          pscan);
 }
 
 /* ----------------------------------------------------------------
@@ -483,21 +470,12 @@ ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
  */
 void
 ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
-                    ParallelContext *pcxt)
+                                ParallelContext *pcxt)
 {
     ParallelTableScanDesc pscan;
 
     pscan = node->ss.ss_currentScanDesc->rs_parallel;
     table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
-
-    /* Set the new TID range if it can be evaluated successfully */
-    if (TidRangeEval(node))
-        node->ss.ss_currentRelation->rd_tableam->scan_set_tidrange(
-            node->ss.ss_currentScanDesc, &node->trss_mintid,
-            &node->trss_maxtid);
-    else
-        node->ss.ss_currentRelation->rd_tableam->scan_set_tidrange(
-            node->ss.ss_currentScanDesc, NULL, NULL);
 }
 
 /* ----------------------------------------------------------------
@@ -508,18 +486,12 @@ ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
  */
 void
 ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
-                    ParallelWorkerContext *pwcxt)
+                                 ParallelWorkerContext *pwcxt)
 {
     ParallelTableScanDesc pscan;
 
     pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
-
-    if (TidRangeEval(node))
-        node->ss.ss_currentScanDesc =
-            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
-                                              &node->trss_mintid, &node->trss_maxtid);
-    else
-        node->ss.ss_currentScanDesc =
-            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
-                                              NULL, NULL);
+    node->ss.ss_currentScanDesc =
+        table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
+                                          pscan);
 }
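
The net effect of the removals above is that the leader's DSM setup and each worker's attach now share one code path, and the TID range is evaluated and applied lazily. A condensed sketch of the resulting control flow, stitched together from the patched functions rather than quoted verbatim:

    /*
     * Condensed sketch: DSM init (leader) and worker init both reduce to
     * the same two steps, and neither evaluates the TID range any more.
     */
    pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
    node->ss.ss_currentScanDesc =
        table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);

    /*
     * The range is applied in TidRangeNext(), which now calls
     * table_rescan_tidrange() for serial and parallel scans alike:
     */
    if (!node->trss_inScan)
    {
        if (!TidRangeEval(node))    /* fills trss_mintid/trss_maxtid */
            return NULL;            /* empty or unsatisfiable range */

        table_rescan_tidrange(scandesc, &node->trss_mintid,
                              &node->trss_maxtid);
        node->trss_inScan = true;
    }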
