PostgreSQL Source Code git master
spgdoinsert.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * spgdoinsert.c
4 * implementation of insert algorithm
5 *
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/spgist/spgdoinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "access/genam.h"
20#include "access/spgxlog.h"
21#include "access/xloginsert.h"
22#include "common/int.h"
23#include "common/pg_prng.h"
24#include "miscadmin.h"
25#include "storage/bufmgr.h"
26#include "utils/rel.h"
27
28
29/*
30 * SPPageDesc tracks all info about a page we are inserting into. In some
31 * situations it actually identifies a tuple, or even a specific node within
32 * an inner tuple. But any of the fields can be invalid. If the buffer
33 * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 * page pointer should be valid exactly when buffer is.
35 */
36typedef struct SPPageDesc
37{
38 BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 Page page; /* pointer to page buffer, or NULL */
41 OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 int node; /* node number within inner tuple, or -1 */
44
45
46/*
47 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 * is used to update the parent inner tuple's downlink after a move or
49 * split operation.
50 */
51void
53 BlockNumber blkno, OffsetNumber offset)
54{
55 int i;
56 SpGistNodeTuple node;
57
58 SGITITERATE(tup, i, node)
59 {
60 if (i == nodeN)
61 {
62 ItemPointerSet(&node->t_tid, blkno, offset);
63 return;
64 }
65 }
66
67 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 nodeN);
69}
70
71/*
72 * Form a new inner tuple containing one more node than the given one, with
73 * the specified label datum, inserted at offset "offset" in the node array.
74 * The new tuple's prefix is the same as the old one's.
75 *
76 * Note that the new node initially has an invalid downlink. We'll find a
77 * page to point it to later.
78 */
81{
82 SpGistNodeTuple node,
83 *nodes;
84 int i;
85
86 /* if offset is negative, insert at end */
87 if (offset < 0)
88 offset = tuple->nNodes;
89 else if (offset > tuple->nNodes)
90 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91
92 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93 SGITITERATE(tuple, i, node)
94 {
95 if (i < offset)
96 nodes[i] = node;
97 else
98 nodes[i + 1] = node;
99 }
100
101 nodes[offset] = spgFormNodeTuple(state, label, false);
102
104 (tuple->prefixSize > 0),
105 SGITDATUM(tuple, state),
106 tuple->nNodes + 1,
107 nodes);
108}
109
110/* qsort comparator for sorting OffsetNumbers */
111static int
112cmpOffsetNumbers(const void *a, const void *b)
113{
114 return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115}
116
117/*
118 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 *
120 * The first tuple in the given list is replaced with a dead tuple of type
121 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 * with dead tuples of type "reststate". If either firststate or reststate
123 * is REDIRECT, blkno/offnum specify where to link to.
124 *
125 * NB: this is used during WAL replay, so beware of trying to make it too
126 * smart. In particular, it shouldn't use "state" except for calling
127 * spgFormDeadTuple(). This is also used in a critical section, so no
128 * pallocs either!
129 */
130void
132 OffsetNumber *itemnos, int nitems,
133 int firststate, int reststate,
134 BlockNumber blkno, OffsetNumber offnum)
135{
136 OffsetNumber firstItem;
138 SpGistDeadTuple tuple = NULL;
139 int i;
140
141 if (nitems == 0)
142 return; /* nothing to do */
143
144 /*
145 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 * targets to be listed in sorted order, so we have to sort the itemnos
147 * array. (This also greatly simplifies the math for reinserting the
148 * replacement tuples.) However, we must not scribble on the caller's
149 * array, so we have to make a copy.
150 */
151 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 if (nitems > 1)
153 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155 PageIndexMultiDelete(page, sortednos, nitems);
156
157 firstItem = itemnos[0];
158
159 for (i = 0; i < nitems; i++)
160 {
161 OffsetNumber itemno = sortednos[i];
162 int tupstate;
163
164 tupstate = (itemno == firstItem) ? firststate : reststate;
165 if (tuple == NULL || tuple->tupstate != tupstate)
166 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168 if (PageAddItem(page, tuple, tuple->size, itemno, false, false) != itemno)
169 elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 tuple->size);
171
172 if (tupstate == SPGIST_REDIRECT)
173 SpGistPageGetOpaque(page)->nRedirection++;
174 else if (tupstate == SPGIST_PLACEHOLDER)
175 SpGistPageGetOpaque(page)->nPlaceholder++;
176 }
177}
178
179/*
180 * Update the parent inner tuple's downlink, and mark the parent buffer
181 * dirty (this must be the last change to the parent page in the current
182 * WAL action).
183 */
184static void
186 BlockNumber blkno, OffsetNumber offnum)
187{
188 SpGistInnerTuple innerTuple;
189
190 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191 PageGetItemId(parent->page, parent->offnum));
192
193 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194
195 MarkBufferDirty(parent->buffer);
196}
197
198/*
199 * Add a leaf tuple to a leaf page where there is known to be room for it
200 */
201static void
203 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204{
205 spgxlogAddLeaf xlrec;
206
207 xlrec.newPage = isNew;
208 xlrec.storesNulls = isNulls;
209
210 /* these will be filled below as needed */
214 xlrec.nodeI = 0;
215
217
218 if (current->offnum == InvalidOffsetNumber ||
219 SpGistBlockIsRoot(current->blkno))
220 {
221 /* Tuple is not part of a chain */
223 current->offnum = SpGistPageAddNewItem(state, current->page,
224 leafTuple, leafTuple->size,
225 NULL, false);
226
227 xlrec.offnumLeaf = current->offnum;
228
229 /* Must update parent's downlink if any */
230 if (parent->buffer != InvalidBuffer)
231 {
232 xlrec.offnumParent = parent->offnum;
233 xlrec.nodeI = parent->node;
234
235 saveNodeLink(index, parent, current->blkno, current->offnum);
236 }
237 }
238 else
239 {
240 /*
241 * Tuple must be inserted into existing chain. We mustn't change the
242 * chain's head address, but we don't need to chase the entire chain
243 * to put the tuple at the end; we can insert it second.
244 *
245 * Also, it's possible that the "chain" consists only of a DEAD tuple,
246 * in which case we should replace the DEAD tuple in-place.
247 */
248 SpGistLeafTuple head;
249 OffsetNumber offnum;
250
251 head = (SpGistLeafTuple) PageGetItem(current->page,
252 PageGetItemId(current->page, current->offnum));
253 if (head->tupstate == SPGIST_LIVE)
254 {
256 offnum = SpGistPageAddNewItem(state, current->page,
257 leafTuple, leafTuple->size,
258 NULL, false);
259
260 /*
261 * re-get head of list because it could have been moved on page,
262 * and set new second element
263 */
264 head = (SpGistLeafTuple) PageGetItem(current->page,
265 PageGetItemId(current->page, current->offnum));
266 SGLT_SET_NEXTOFFSET(head, offnum);
267
268 xlrec.offnumLeaf = offnum;
269 xlrec.offnumHeadLeaf = current->offnum;
270 }
271 else if (head->tupstate == SPGIST_DEAD)
272 {
274 PageIndexTupleDelete(current->page, current->offnum);
275 if (PageAddItem(current->page,
276 leafTuple, leafTuple->size,
277 current->offnum, false, false) != current->offnum)
278 elog(ERROR, "failed to add item of size %u to SPGiST index page",
279 leafTuple->size);
280
281 /* WAL replay distinguishes this case by equal offnums */
282 xlrec.offnumLeaf = current->offnum;
283 xlrec.offnumHeadLeaf = current->offnum;
284 }
285 else
286 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
287 }
288
289 MarkBufferDirty(current->buffer);
290
291 if (RelationNeedsWAL(index) && !state->isBuild)
292 {
293 XLogRecPtr recptr;
294 int flags;
295
297 XLogRegisterData(&xlrec, sizeof(xlrec));
298 XLogRegisterData(leafTuple, leafTuple->size);
299
300 flags = REGBUF_STANDARD;
301 if (xlrec.newPage)
302 flags |= REGBUF_WILL_INIT;
303 XLogRegisterBuffer(0, current->buffer, flags);
306
307 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
308
309 PageSetLSN(current->page, recptr);
310
311 /* update parent only if we actually changed it */
313 {
314 PageSetLSN(parent->page, recptr);
315 }
316 }
317
319}
320
321/*
322 * Count the number and total size of leaf tuples in the chain starting at
323 * current->offnum. Return number into *nToSplit and total size as function
324 * result.
325 *
326 * Klugy special case when considering the root page (i.e., root is a leaf
327 * page, but we're about to split for the first time): return fake large
328 * values to force spgdoinsert() to take the doPickSplit rather than
329 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
330 */
331static int
333 SPPageDesc *current, int *nToSplit)
334{
335 int i,
336 n = 0,
337 totalSize = 0;
338
339 if (SpGistBlockIsRoot(current->blkno))
340 {
341 /* return impossible values to force split */
342 *nToSplit = BLCKSZ;
343 return BLCKSZ;
344 }
345
346 i = current->offnum;
347 while (i != InvalidOffsetNumber)
348 {
350
352 i <= PageGetMaxOffsetNumber(current->page));
353 it = (SpGistLeafTuple) PageGetItem(current->page,
354 PageGetItemId(current->page, i));
355 if (it->tupstate == SPGIST_LIVE)
356 {
357 n++;
358 totalSize += it->size + sizeof(ItemIdData);
359 }
360 else if (it->tupstate == SPGIST_DEAD)
361 {
362 /* We could see a DEAD tuple as first/only chain item */
363 Assert(i == current->offnum);
365 /* Don't count it in result, because it won't go to other page */
366 }
367 else
368 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
369
371 }
372
373 *nToSplit = n;
374
375 return totalSize;
376}
377
378/*
379 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
380 * but the chain has to be moved because there's not enough room to add
381 * newLeafTuple to its page. We use this method when the chain contains
382 * very little data so a split would be inefficient. We are sure we can
383 * fit the chain plus newLeafTuple on one other page.
384 */
385static void
387 SPPageDesc *current, SPPageDesc *parent,
388 SpGistLeafTuple newLeafTuple, bool isNulls)
389{
390 int i,
391 nDelete,
392 nInsert,
393 size;
394 Buffer nbuf;
395 Page npage;
397 startOffset = InvalidOffsetNumber;
398 bool replaceDead = false;
399 OffsetNumber *toDelete;
400 OffsetNumber *toInsert;
401 BlockNumber nblkno;
402 spgxlogMoveLeafs xlrec;
403 char *leafdata,
404 *leafptr;
405
406 /* This doesn't work on root page */
407 Assert(parent->buffer != InvalidBuffer);
408 Assert(parent->buffer != current->buffer);
409
410 /* Locate the tuples to be moved, and count up the space needed */
411 i = PageGetMaxOffsetNumber(current->page);
412 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
413 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
414
415 size = newLeafTuple->size + sizeof(ItemIdData);
416
417 nDelete = 0;
418 i = current->offnum;
419 while (i != InvalidOffsetNumber)
420 {
422
424 i <= PageGetMaxOffsetNumber(current->page));
425 it = (SpGistLeafTuple) PageGetItem(current->page,
426 PageGetItemId(current->page, i));
427
428 if (it->tupstate == SPGIST_LIVE)
429 {
430 toDelete[nDelete] = i;
431 size += it->size + sizeof(ItemIdData);
432 nDelete++;
433 }
434 else if (it->tupstate == SPGIST_DEAD)
435 {
436 /* We could see a DEAD tuple as first/only chain item */
437 Assert(i == current->offnum);
439 /* We don't want to move it, so don't count it in size */
440 toDelete[nDelete] = i;
441 nDelete++;
442 replaceDead = true;
443 }
444 else
445 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
446
448 }
449
450 /* Find a leaf page that will hold them */
451 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
452 size, &xlrec.newPage);
453 npage = BufferGetPage(nbuf);
454 nblkno = BufferGetBlockNumber(nbuf);
455 Assert(nblkno != current->blkno);
456
457 leafdata = leafptr = palloc(size);
458
460
461 /* copy all the old tuples to new page, unless they're dead */
462 nInsert = 0;
463 if (!replaceDead)
464 {
465 for (i = 0; i < nDelete; i++)
466 {
468
469 it = (SpGistLeafTuple) PageGetItem(current->page,
470 PageGetItemId(current->page, toDelete[i]));
472
473 /*
474 * Update chain link (notice the chain order gets reversed, but we
475 * don't care). We're modifying the tuple on the source page
476 * here, but it's okay since we're about to delete it.
477 */
478 SGLT_SET_NEXTOFFSET(it, r);
479
480 r = SpGistPageAddNewItem(state, npage, it, it->size, &startOffset, false);
481
482 toInsert[nInsert] = r;
483 nInsert++;
484
485 /* save modified tuple into leafdata as well */
486 memcpy(leafptr, it, it->size);
487 leafptr += it->size;
488 }
489 }
490
491 /* add the new tuple as well */
492 SGLT_SET_NEXTOFFSET(newLeafTuple, r);
493 r = SpGistPageAddNewItem(state, npage, newLeafTuple, newLeafTuple->size, &startOffset, false);
494 toInsert[nInsert] = r;
495 nInsert++;
496 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
497 leafptr += newLeafTuple->size;
498
499 /*
500 * Now delete the old tuples, leaving a redirection pointer behind for the
501 * first one, unless we're doing an index build; in which case there can't
502 * be any concurrent scan so we need not provide a redirect.
503 */
504 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
507 nblkno, r);
508
509 /* Update parent's downlink and mark parent page dirty */
510 saveNodeLink(index, parent, nblkno, r);
511
512 /* Mark the leaf pages too */
513 MarkBufferDirty(current->buffer);
514 MarkBufferDirty(nbuf);
515
516 if (RelationNeedsWAL(index) && !state->isBuild)
517 {
518 XLogRecPtr recptr;
519
520 /* prepare WAL info */
521 STORE_STATE(state, xlrec.stateSrc);
522
523 xlrec.nMoves = nDelete;
524 xlrec.replaceDead = replaceDead;
525 xlrec.storesNulls = isNulls;
526
527 xlrec.offnumParent = parent->offnum;
528 xlrec.nodeI = parent->node;
529
532 XLogRegisterData(toDelete,
533 sizeof(OffsetNumber) * nDelete);
534 XLogRegisterData(toInsert,
535 sizeof(OffsetNumber) * nInsert);
536 XLogRegisterData(leafdata, leafptr - leafdata);
537
541
542 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
543
544 PageSetLSN(current->page, recptr);
545 PageSetLSN(npage, recptr);
546 PageSetLSN(parent->page, recptr);
547 }
548
550
551 /* Update local free-space cache and release new buffer */
554}
555
556/*
557 * Update previously-created redirection tuple with appropriate destination
558 *
559 * We use this when it's not convenient to know the destination first.
560 * The tuple should have been made with the "impossible" destination of
561 * the metapage.
562 */
563static void
565 BlockNumber blkno, OffsetNumber offnum)
566{
568
569 dt = (SpGistDeadTuple) PageGetItem(current->page,
570 PageGetItemId(current->page, position));
573 ItemPointerSet(&dt->pointer, blkno, offnum);
574}
575
576/*
577 * Test to see if the user-defined picksplit function failed to do its job,
578 * ie, it put all the leaf tuples into the same node.
579 * If so, randomly divide the tuples into several nodes (all with the same
580 * label) and return true to select allTheSame mode for this inner tuple.
581 *
582 * (This code is also used to forcibly select allTheSame mode for nulls.)
583 *
584 * If we know that the leaf tuples wouldn't all fit on one page, then we
585 * exclude the last tuple (which is the incoming new tuple that forced a split)
586 * from the check to see if more than one node is used. The reason for this
587 * is that if the existing tuples are put into only one chain, then even if
588 * we move them all to an empty page, there would still not be room for the
589 * new tuple, so we'd get into an infinite loop of picksplit attempts.
590 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
591 * be split across pages. (Exercise for the reader: figure out why this
592 * fixes the problem even when there is only one old tuple.)
593 */
594static bool
596 bool *includeNew)
597{
598 int theNode;
599 int limit;
600 int i;
601
602 /* For the moment, assume we can include the new leaf tuple */
603 *includeNew = true;
604
605 /* If there's only the new leaf tuple, don't select allTheSame mode */
606 if (in->nTuples <= 1)
607 return false;
608
609 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
610 limit = tooBig ? in->nTuples - 1 : in->nTuples;
611
612 /* Check to see if more than one node is populated */
613 theNode = out->mapTuplesToNodes[0];
614 for (i = 1; i < limit; i++)
615 {
616 if (out->mapTuplesToNodes[i] != theNode)
617 return false;
618 }
619
620 /* Nope, so override the picksplit function's decisions */
621
622 /* If the new tuple is in its own node, it can't be included in split */
623 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
624 *includeNew = false;
625
626 out->nNodes = 8; /* arbitrary number of child nodes */
627
628 /* Random assignment of tuples to nodes (note we include new tuple) */
629 for (i = 0; i < in->nTuples; i++)
630 out->mapTuplesToNodes[i] = i % out->nNodes;
631
632 /* The opclass may not use node labels, but if it does, duplicate 'em */
633 if (out->nodeLabels)
634 {
635 Datum theLabel = out->nodeLabels[theNode];
636
637 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
638 for (i = 0; i < out->nNodes; i++)
639 out->nodeLabels[i] = theLabel;
640 }
641
642 /* We don't touch the prefix or the leaf tuple datum assignments */
643
644 return true;
645}
646
647/*
648 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
649 * but the chain has to be split because there's not enough room to add
650 * newLeafTuple to its page.
651 *
652 * This function splits the leaf tuple set according to picksplit's rules,
653 * creating one or more new chains that are spread across the current page
654 * and an additional leaf page (we assume that two leaf pages will be
655 * sufficient). A new inner tuple is created, and the parent downlink
656 * pointer is updated to point to that inner tuple instead of the leaf chain.
657 *
658 * On exit, current contains the address of the new inner tuple.
659 *
660 * Returns true if we successfully inserted newLeafTuple during this function,
661 * false if caller still has to do it (meaning another picksplit operation is
662 * probably needed). Failure could occur if the picksplit result is fairly
663 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
664 * Because we force the picksplit result to be at least two chains, each
665 * cycle will get rid of at least one leaf tuple from the chain, so the loop
666 * will eventually terminate if lack of balance is the issue. If the tuple
667 * is too big, we assume that repeated picksplit operations will eventually
668 * make it small enough by repeated prefix-stripping. A broken opclass could
669 * make this an infinite loop, though, so spgdoinsert() checks that the
670 * leaf datums get smaller each time.
671 */
672static bool
674 SPPageDesc *current, SPPageDesc *parent,
675 SpGistLeafTuple newLeafTuple,
676 int level, bool isNulls, bool isNew)
677{
678 bool insertedNew = false;
680 spgPickSplitOut out;
681 FmgrInfo *procinfo;
682 bool includeNew;
683 int i,
684 max,
685 n;
686 SpGistInnerTuple innerTuple;
687 SpGistNodeTuple node,
688 *nodes;
689 Buffer newInnerBuffer,
690 newLeafBuffer;
691 uint8 *leafPageSelect;
692 int *leafSizes;
693 OffsetNumber *toDelete;
694 OffsetNumber *toInsert;
695 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
696 OffsetNumber startOffsets[2];
697 SpGistLeafTuple *oldLeafs;
698 SpGistLeafTuple *newLeafs;
699 Datum leafDatums[INDEX_MAX_KEYS];
700 bool leafIsnulls[INDEX_MAX_KEYS];
701 int spaceToDelete;
702 int currentFreeSpace;
703 int totalLeafSizes;
704 bool allTheSame;
705 spgxlogPickSplit xlrec;
706 char *leafdata,
707 *leafptr;
708 SPPageDesc saveCurrent;
709 int nToDelete,
710 nToInsert,
711 maxToInclude;
712
713 in.level = level;
714
715 /*
716 * Allocate per-leaf-tuple work arrays with max possible size
717 */
718 max = PageGetMaxOffsetNumber(current->page);
719 n = max + 1;
720 in.datums = (Datum *) palloc(sizeof(Datum) * n);
721 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
722 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723 oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
724 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726
727 STORE_STATE(state, xlrec.stateSrc);
728
729 /*
730 * Form list of leaf tuples which will be distributed as split result;
731 * also, count up the amount of space that will be freed from current.
732 * (Note that in the non-root case, we won't actually delete the old
733 * tuples, only replace them with redirects or placeholders.)
734 */
735 nToInsert = 0;
736 nToDelete = 0;
737 spaceToDelete = 0;
738 if (SpGistBlockIsRoot(current->blkno))
739 {
740 /*
741 * We are splitting the root (which up to now is also a leaf page).
742 * Its tuples are not linked, so scan sequentially to get them all. We
743 * ignore the original value of current->offnum.
744 */
745 for (i = FirstOffsetNumber; i <= max; i++)
746 {
748
749 it = (SpGistLeafTuple) PageGetItem(current->page,
750 PageGetItemId(current->page, i));
751 if (it->tupstate == SPGIST_LIVE)
752 {
753 in.datums[nToInsert] =
754 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
755 oldLeafs[nToInsert] = it;
756 nToInsert++;
757 toDelete[nToDelete] = i;
758 nToDelete++;
759 /* we will delete the tuple altogether, so count full space */
760 spaceToDelete += it->size + sizeof(ItemIdData);
761 }
762 else /* tuples on root should be live */
763 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
764 }
765 }
766 else
767 {
768 /* Normal case, just collect the leaf tuples in the chain */
769 i = current->offnum;
770 while (i != InvalidOffsetNumber)
771 {
773
774 Assert(i >= FirstOffsetNumber && i <= max);
775 it = (SpGistLeafTuple) PageGetItem(current->page,
776 PageGetItemId(current->page, i));
777 if (it->tupstate == SPGIST_LIVE)
778 {
779 in.datums[nToInsert] =
780 isNulls ? (Datum) 0 : SGLTDATUM(it, state);
781 oldLeafs[nToInsert] = it;
782 nToInsert++;
783 toDelete[nToDelete] = i;
784 nToDelete++;
785 /* we will not delete the tuple, only replace with dead */
786 Assert(it->size >= SGDTSIZE);
787 spaceToDelete += it->size - SGDTSIZE;
788 }
789 else if (it->tupstate == SPGIST_DEAD)
790 {
791 /* We could see a DEAD tuple as first/only chain item */
792 Assert(i == current->offnum);
794 toDelete[nToDelete] = i;
795 nToDelete++;
796 /* replacing it with redirect will save no space */
797 }
798 else
799 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
800
802 }
803 }
804 in.nTuples = nToInsert;
805
806 /*
807 * We may not actually insert new tuple because another picksplit may be
808 * necessary due to too large value, but we will try to allocate enough
809 * space to include it; and in any case it has to be included in the input
810 * for the picksplit function. So don't increment nToInsert yet.
811 */
812 in.datums[in.nTuples] =
813 isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
814 oldLeafs[in.nTuples] = newLeafTuple;
815 in.nTuples++;
816
817 memset(&out, 0, sizeof(out));
818
819 if (!isNulls)
820 {
821 /*
822 * Perform split using user-defined method.
823 */
825 FunctionCall2Coll(procinfo,
826 index->rd_indcollation[0],
827 PointerGetDatum(&in),
828 PointerGetDatum(&out));
829
830 /*
831 * Form new leaf tuples and count up the total space needed.
832 */
833 totalLeafSizes = 0;
834 for (i = 0; i < in.nTuples; i++)
835 {
836 if (state->leafTupDesc->natts > 1)
837 spgDeformLeafTuple(oldLeafs[i],
838 state->leafTupDesc,
839 leafDatums,
840 leafIsnulls,
841 isNulls);
842
843 leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
844 leafIsnulls[spgKeyColumn] = false;
845
846 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
847 leafDatums,
848 leafIsnulls);
849 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
850 }
851 }
852 else
853 {
854 /*
855 * Perform dummy split that puts all tuples into one node.
856 * checkAllTheSame will override this and force allTheSame mode.
857 */
858 out.hasPrefix = false;
859 out.nNodes = 1;
860 out.nodeLabels = NULL;
861 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
862
863 /*
864 * Form new leaf tuples and count up the total space needed.
865 */
866 totalLeafSizes = 0;
867 for (i = 0; i < in.nTuples; i++)
868 {
869 if (state->leafTupDesc->natts > 1)
870 spgDeformLeafTuple(oldLeafs[i],
871 state->leafTupDesc,
872 leafDatums,
873 leafIsnulls,
874 isNulls);
875
876 /*
877 * Nulls tree can contain only null key values.
878 */
879 leafDatums[spgKeyColumn] = (Datum) 0;
880 leafIsnulls[spgKeyColumn] = true;
881
882 newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
883 leafDatums,
884 leafIsnulls);
885 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
886 }
887 }
888
889 /*
890 * Check to see if the picksplit function failed to separate the values,
891 * ie, it put them all into the same child node. If so, select allTheSame
892 * mode and create a random split instead. See comments for
893 * checkAllTheSame as to why we need to know if the new leaf tuples could
894 * fit on one page.
895 */
896 allTheSame = checkAllTheSame(&in, &out,
897 totalLeafSizes > SPGIST_PAGE_CAPACITY,
898 &includeNew);
899
900 /*
901 * If checkAllTheSame decided we must exclude the new tuple, don't
902 * consider it any further.
903 */
904 if (includeNew)
905 maxToInclude = in.nTuples;
906 else
907 {
908 maxToInclude = in.nTuples - 1;
909 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
910 }
911
912 /*
913 * Allocate per-node work arrays. Since checkAllTheSame could replace
914 * out.nNodes with a value larger than the number of tuples on the input
915 * page, we can't allocate these arrays before here.
916 */
917 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
918 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
919
920 /*
921 * Form nodes of inner tuple and inner tuple itself
922 */
923 for (i = 0; i < out.nNodes; i++)
924 {
925 Datum label = (Datum) 0;
926 bool labelisnull = (out.nodeLabels == NULL);
927
928 if (!labelisnull)
929 label = out.nodeLabels[i];
930 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
931 }
932 innerTuple = spgFormInnerTuple(state,
933 out.hasPrefix, out.prefixDatum,
934 out.nNodes, nodes);
935 innerTuple->allTheSame = allTheSame;
936
937 /*
938 * Update nodes[] array to point into the newly formed innerTuple, so that
939 * we can adjust their downlinks below.
940 */
941 SGITITERATE(innerTuple, i, node)
942 {
943 nodes[i] = node;
944 }
945
946 /*
947 * Re-scan new leaf tuples and count up the space needed under each node.
948 */
949 for (i = 0; i < maxToInclude; i++)
950 {
951 n = out.mapTuplesToNodes[i];
952 if (n < 0 || n >= out.nNodes)
953 elog(ERROR, "inconsistent result of SPGiST picksplit function");
954 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
955 }
956
957 /*
958 * To perform the split, we must insert a new inner tuple, which can't go
959 * on a leaf page; and unless we are splitting the root page, we must then
960 * update the parent tuple's downlink to point to the inner tuple. If
961 * there is room, we'll put the new inner tuple on the same page as the
962 * parent tuple, otherwise we need another non-leaf buffer. But if the
963 * parent page is the root, we can't add the new inner tuple there,
964 * because the root page must have only one inner tuple.
965 */
966 xlrec.initInner = false;
967 if (parent->buffer != InvalidBuffer &&
968 !SpGistBlockIsRoot(parent->blkno) &&
969 (SpGistPageGetFreeSpace(parent->page, 1) >=
970 innerTuple->size + sizeof(ItemIdData)))
971 {
972 /* New inner tuple will fit on parent page */
973 newInnerBuffer = parent->buffer;
974 }
975 else if (parent->buffer != InvalidBuffer)
976 {
977 /* Send tuple to page with next triple parity (see README) */
978 newInnerBuffer = SpGistGetBuffer(index,
979 GBUF_INNER_PARITY(parent->blkno + 1) |
980 (isNulls ? GBUF_NULLS : 0),
981 innerTuple->size + sizeof(ItemIdData),
982 &xlrec.initInner);
983 }
984 else
985 {
986 /* Root page split ... inner tuple will go to root page */
987 newInnerBuffer = InvalidBuffer;
988 }
989
990 /*
991 * The new leaf tuples converted from the existing ones should require the
992 * same or less space, and therefore should all fit onto one page
993 * (although that's not necessarily the current page, since we can't
994 * delete the old tuples but only replace them with placeholders).
995 * However, the incoming new tuple might not also fit, in which case we
996 * might need another picksplit cycle to reduce it some more.
997 *
998 * If there's not room to put everything back onto the current page, then
999 * we decide on a per-node basis which tuples go to the new page. (We do
1000 * it like that because leaf tuple chains can't cross pages, so we must
1001 * place all leaf tuples belonging to the same parent node on the same
1002 * page.)
1003 *
1004 * If we are splitting the root page (turning it from a leaf page into an
1005 * inner page), then no leaf tuples can go back to the current page; they
1006 * must all go somewhere else.
1007 */
1008 if (!SpGistBlockIsRoot(current->blkno))
1009 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1010 else
1011 currentFreeSpace = 0; /* prevent assigning any tuples to current */
1012
1013 xlrec.initDest = false;
1014
1015 if (totalLeafSizes <= currentFreeSpace)
1016 {
1017 /* All the leaf tuples will fit on current page */
1018 newLeafBuffer = InvalidBuffer;
1019 /* mark new leaf tuple as included in insertions, if allowed */
1020 if (includeNew)
1021 {
1022 nToInsert++;
1023 insertedNew = true;
1024 }
1025 for (i = 0; i < nToInsert; i++)
1026 leafPageSelect[i] = 0; /* signifies current page */
1027 }
1028 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1029 {
1030 /*
1031 * We're trying to split up a long value by repeated suffixing, but
1032 * it's not going to fit yet. Don't bother allocating a second leaf
1033 * buffer that we won't be able to use.
1034 */
1035 newLeafBuffer = InvalidBuffer;
1036 Assert(includeNew);
1037 Assert(nToInsert == 0);
1038 }
1039 else
1040 {
1041 /* We will need another leaf page */
1042 uint8 *nodePageSelect;
1043 int curspace;
1044 int newspace;
1045
1046 newLeafBuffer = SpGistGetBuffer(index,
1047 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1048 Min(totalLeafSizes,
1050 &xlrec.initDest);
1051
1052 /*
1053 * Attempt to assign node groups to the two pages. We might fail to
1054 * do so, even if totalLeafSizes is less than the available space,
1055 * because we can't split a group across pages.
1056 */
1057 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1058
1059 curspace = currentFreeSpace;
1060 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1061 for (i = 0; i < out.nNodes; i++)
1062 {
1063 if (leafSizes[i] <= curspace)
1064 {
1065 nodePageSelect[i] = 0; /* signifies current page */
1066 curspace -= leafSizes[i];
1067 }
1068 else
1069 {
1070 nodePageSelect[i] = 1; /* signifies new leaf page */
1071 newspace -= leafSizes[i];
1072 }
1073 }
1074 if (curspace >= 0 && newspace >= 0)
1075 {
1076 /* Successful assignment, so we can include the new leaf tuple */
1077 if (includeNew)
1078 {
1079 nToInsert++;
1080 insertedNew = true;
1081 }
1082 }
1083 else if (includeNew)
1084 {
1085 /* We must exclude the new leaf tuple from the split */
1086 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1087
1088 leafSizes[nodeOfNewTuple] -=
1089 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1090
1091 /* Repeat the node assignment process --- should succeed now */
1092 curspace = currentFreeSpace;
1093 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1094 for (i = 0; i < out.nNodes; i++)
1095 {
1096 if (leafSizes[i] <= curspace)
1097 {
1098 nodePageSelect[i] = 0; /* signifies current page */
1099 curspace -= leafSizes[i];
1100 }
1101 else
1102 {
1103 nodePageSelect[i] = 1; /* signifies new leaf page */
1104 newspace -= leafSizes[i];
1105 }
1106 }
1107 if (curspace < 0 || newspace < 0)
1108 elog(ERROR, "failed to divide leaf tuple groups across pages");
1109 }
1110 else
1111 {
1112 /* oops, we already excluded new tuple ... should not get here */
1113 elog(ERROR, "failed to divide leaf tuple groups across pages");
1114 }
1115 /* Expand the per-node assignments to be shown per leaf tuple */
1116 for (i = 0; i < nToInsert; i++)
1117 {
1118 n = out.mapTuplesToNodes[i];
1119 leafPageSelect[i] = nodePageSelect[n];
1120 }
1121 }
1122
1123 /* Start preparing WAL record */
1124 xlrec.nDelete = 0;
1125 xlrec.initSrc = isNew;
1126 xlrec.storesNulls = isNulls;
1127 xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1128
1129 leafdata = leafptr = (char *) palloc(totalLeafSizes);
1130
1131 /* Here we begin making the changes to the target pages */
1133
1134 /*
1135 * Delete old leaf tuples from current buffer, except when we're splitting
1136 * the root; in that case there's no need because we'll re-init the page
1137 * below. We do this first to make room for reinserting new leaf tuples.
1138 */
1139 if (!SpGistBlockIsRoot(current->blkno))
1140 {
1141 /*
1142 * Init buffer instead of deleting individual tuples, but only if
1143 * there aren't any other live tuples and only during build; otherwise
1144 * we need to set a redirection tuple for concurrent scans.
1145 */
1146 if (state->isBuild &&
1147 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1148 PageGetMaxOffsetNumber(current->page))
1149 {
1150 SpGistInitBuffer(current->buffer,
1151 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1152 xlrec.initSrc = true;
1153 }
1154 else if (isNew)
1155 {
1156 /* don't expose the freshly init'd buffer as a backup block */
1157 Assert(nToDelete == 0);
1158 }
1159 else
1160 {
1161 xlrec.nDelete = nToDelete;
1162
1163 if (!state->isBuild)
1164 {
1165 /*
1166 * Need to create redirect tuple (it will point to new inner
1167 * tuple) but right now the new tuple's location is not known
1168 * yet. So, set the redirection pointer to "impossible" value
1169 * and remember its position to update tuple later.
1170 */
1171 if (nToDelete > 0)
1172 redirectTuplePos = toDelete[0];
1174 toDelete, nToDelete,
1179 }
1180 else
1181 {
1182 /*
1183 * During index build there is not concurrent searches, so we
1184 * don't need to create redirection tuple.
1185 */
1187 toDelete, nToDelete,
1192 }
1193 }
1194 }
1195
1196 /*
1197 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1198 * nodes.
1199 */
1200 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1201 for (i = 0; i < nToInsert; i++)
1202 {
1203 SpGistLeafTuple it = newLeafs[i];
1204 Buffer leafBuffer;
1205 BlockNumber leafBlock;
1206 OffsetNumber newoffset;
1207
1208 /* Which page is it going to? */
1209 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1210 leafBlock = BufferGetBlockNumber(leafBuffer);
1211
1212 /* Link tuple into correct chain for its node */
1213 n = out.mapTuplesToNodes[i];
1214
1215 if (ItemPointerIsValid(&nodes[n]->t_tid))
1216 {
1217 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1218 SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1219 }
1220 else
1222
1223 /* Insert it on page */
1224 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1225 it, it->size,
1226 &startOffsets[leafPageSelect[i]],
1227 false);
1228 toInsert[i] = newoffset;
1229
1230 /* ... and complete the chain linking */
1231 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1232
1233 /* Also copy leaf tuple into WAL data */
1234 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1235 leafptr += newLeafs[i]->size;
1236 }
1237
1238 /*
1239 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1240 * current->buffer will be marked below, after we're entirely done
1241 * modifying it.
1242 */
1243 if (newLeafBuffer != InvalidBuffer)
1244 {
1245 MarkBufferDirty(newLeafBuffer);
1246 }
1247
1248 /* Remember current buffer, since we're about to change "current" */
1249 saveCurrent = *current;
1250
1251 /*
1252 * Store the new innerTuple
1253 */
1254 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1255 {
1256 /*
1257 * new inner tuple goes to parent page
1258 */
1259 Assert(current->buffer != parent->buffer);
1260
1261 /* Repoint "current" at the new inner tuple */
1262 current->blkno = parent->blkno;
1263 current->buffer = parent->buffer;
1264 current->page = parent->page;
1265 xlrec.offnumInner = current->offnum =
1267 innerTuple, innerTuple->size,
1268 NULL, false);
1269
1270 /*
1271 * Update parent node link and mark parent page dirty
1272 */
1273 xlrec.innerIsParent = true;
1274 xlrec.offnumParent = parent->offnum;
1275 xlrec.nodeI = parent->node;
1276 saveNodeLink(index, parent, current->blkno, current->offnum);
1277
1278 /*
1279 * Update redirection link (in old current buffer)
1280 */
1281 if (redirectTuplePos != InvalidOffsetNumber)
1282 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1283 current->blkno, current->offnum);
1284
1285 /* Done modifying old current buffer, mark it dirty */
1286 MarkBufferDirty(saveCurrent.buffer);
1287 }
1288 else if (parent->buffer != InvalidBuffer)
1289 {
1290 /*
1291 * new inner tuple will be stored on a new page
1292 */
1293 Assert(newInnerBuffer != InvalidBuffer);
1294
1295 /* Repoint "current" at the new inner tuple */
1296 current->buffer = newInnerBuffer;
1297 current->blkno = BufferGetBlockNumber(current->buffer);
1298 current->page = BufferGetPage(current->buffer);
1299 xlrec.offnumInner = current->offnum =
1301 innerTuple, innerTuple->size,
1302 NULL, false);
1303
1304 /* Done modifying new current buffer, mark it dirty */
1305 MarkBufferDirty(current->buffer);
1306
1307 /*
1308 * Update parent node link and mark parent page dirty
1309 */
1310 xlrec.innerIsParent = (parent->buffer == current->buffer);
1311 xlrec.offnumParent = parent->offnum;
1312 xlrec.nodeI = parent->node;
1313 saveNodeLink(index, parent, current->blkno, current->offnum);
1314
1315 /*
1316 * Update redirection link (in old current buffer)
1317 */
1318 if (redirectTuplePos != InvalidOffsetNumber)
1319 setRedirectionTuple(&saveCurrent, redirectTuplePos,
1320 current->blkno, current->offnum);
1321
1322 /* Done modifying old current buffer, mark it dirty */
1323 MarkBufferDirty(saveCurrent.buffer);
1324 }
1325 else
1326 {
1327 /*
1328 * Splitting root page, which was a leaf but now becomes inner page
1329 * (and so "current" continues to point at it)
1330 */
1331 Assert(SpGistBlockIsRoot(current->blkno));
1332 Assert(redirectTuplePos == InvalidOffsetNumber);
1333
1334 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1335 xlrec.initInner = true;
1336 xlrec.innerIsParent = false;
1337
1338 xlrec.offnumInner = current->offnum =
1339 PageAddItem(current->page, innerTuple, innerTuple->size,
1340 InvalidOffsetNumber, false, false);
1341 if (current->offnum != FirstOffsetNumber)
1342 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1343 innerTuple->size);
1344
1345 /* No parent link to update, nor redirection to do */
1347 xlrec.nodeI = 0;
1348
1349 /* Done modifying new current buffer, mark it dirty */
1350 MarkBufferDirty(current->buffer);
1351
1352 /* saveCurrent doesn't represent a different buffer */
1353 saveCurrent.buffer = InvalidBuffer;
1354 }
1355
1356 if (RelationNeedsWAL(index) && !state->isBuild)
1357 {
1358 XLogRecPtr recptr;
1359 int flags;
1360
1362
1363 xlrec.nInsert = nToInsert;
1365
1366 XLogRegisterData(toDelete,
1367 sizeof(OffsetNumber) * xlrec.nDelete);
1368 XLogRegisterData(toInsert,
1369 sizeof(OffsetNumber) * xlrec.nInsert);
1370 XLogRegisterData(leafPageSelect,
1371 sizeof(uint8) * xlrec.nInsert);
1372 XLogRegisterData(innerTuple, innerTuple->size);
1373 XLogRegisterData(leafdata, leafptr - leafdata);
1374
1375 /* Old leaf page */
1376 if (BufferIsValid(saveCurrent.buffer))
1377 {
1378 flags = REGBUF_STANDARD;
1379 if (xlrec.initSrc)
1380 flags |= REGBUF_WILL_INIT;
1381 XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1382 }
1383
1384 /* New leaf page */
1385 if (BufferIsValid(newLeafBuffer))
1386 {
1387 flags = REGBUF_STANDARD;
1388 if (xlrec.initDest)
1389 flags |= REGBUF_WILL_INIT;
1390 XLogRegisterBuffer(1, newLeafBuffer, flags);
1391 }
1392
1393 /* Inner page */
1394 flags = REGBUF_STANDARD;
1395 if (xlrec.initInner)
1396 flags |= REGBUF_WILL_INIT;
1397 XLogRegisterBuffer(2, current->buffer, flags);
1398
1399 /* Parent page, if different from inner page */
1400 if (parent->buffer != InvalidBuffer)
1401 {
1402 if (parent->buffer != current->buffer)
1404 else
1405 Assert(xlrec.innerIsParent);
1406 }
1407
1408 /* Issue the WAL record */
1409 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1410
1411 /* Update page LSNs on all affected pages */
1412 if (newLeafBuffer != InvalidBuffer)
1413 {
1414 Page page = BufferGetPage(newLeafBuffer);
1415
1416 PageSetLSN(page, recptr);
1417 }
1418
1419 if (saveCurrent.buffer != InvalidBuffer)
1420 {
1421 Page page = BufferGetPage(saveCurrent.buffer);
1422
1423 PageSetLSN(page, recptr);
1424 }
1425
1426 PageSetLSN(current->page, recptr);
1427
1428 if (parent->buffer != InvalidBuffer)
1429 {
1430 PageSetLSN(parent->page, recptr);
1431 }
1432 }
1433
1435
1436 /* Update local free-space cache and unlock buffers */
1437 if (newLeafBuffer != InvalidBuffer)
1438 {
1439 SpGistSetLastUsedPage(index, newLeafBuffer);
1440 UnlockReleaseBuffer(newLeafBuffer);
1441 }
1442 if (saveCurrent.buffer != InvalidBuffer)
1443 {
1444 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1445 UnlockReleaseBuffer(saveCurrent.buffer);
1446 }
1447
1448 return insertedNew;
1449}
1450
1451/*
1452 * spgMatchNode action: descend to N'th child node of current inner tuple
1453 */
1454static void
1456 SpGistInnerTuple innerTuple,
1457 SPPageDesc *current, SPPageDesc *parent, int nodeN)
1458{
1459 int i;
1460 SpGistNodeTuple node;
1461
1462 /* Release previous parent buffer if any */
1463 if (parent->buffer != InvalidBuffer &&
1464 parent->buffer != current->buffer)
1465 {
1467 UnlockReleaseBuffer(parent->buffer);
1468 }
1469
1470 /* Repoint parent to specified node of current inner tuple */
1471 parent->blkno = current->blkno;
1472 parent->buffer = current->buffer;
1473 parent->page = current->page;
1474 parent->offnum = current->offnum;
1475 parent->node = nodeN;
1476
1477 /* Locate that node */
1478 SGITITERATE(innerTuple, i, node)
1479 {
1480 if (i == nodeN)
1481 break;
1482 }
1483
1484 if (i != nodeN)
1485 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1486 nodeN);
1487
1488 /* Point current to the downlink location, if any */
1489 if (ItemPointerIsValid(&node->t_tid))
1490 {
1491 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1492 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1493 }
1494 else
1495 {
1496 /* Downlink is empty, so we'll need to find a new page */
1497 current->blkno = InvalidBlockNumber;
1498 current->offnum = InvalidOffsetNumber;
1499 }
1500
1501 current->buffer = InvalidBuffer;
1502 current->page = NULL;
1503}
1504
1505/*
1506 * spgAddNode action: add a node to the inner tuple at current
1507 */
1508static void
1510 SpGistInnerTuple innerTuple,
1511 SPPageDesc *current, SPPageDesc *parent,
1512 int nodeN, Datum nodeLabel)
1513{
1514 SpGistInnerTuple newInnerTuple;
1515 spgxlogAddNode xlrec;
1516
1517 /* Should not be applied to nulls */
1518 Assert(!SpGistPageStoresNulls(current->page));
1519
1520 /* Construct new inner tuple with additional node */
1521 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1522
1523 /* Prepare WAL record */
1524 STORE_STATE(state, xlrec.stateSrc);
1525 xlrec.offnum = current->offnum;
1526
1527 /* we don't fill these unless we need to change the parent downlink */
1528 xlrec.parentBlk = -1;
1530 xlrec.nodeI = 0;
1531
1532 /* we don't fill these unless tuple has to be moved */
1534 xlrec.newPage = false;
1535
1536 if (PageGetExactFreeSpace(current->page) >=
1537 newInnerTuple->size - innerTuple->size)
1538 {
1539 /*
1540 * We can replace the inner tuple by new version in-place
1541 */
1543
1544 PageIndexTupleDelete(current->page, current->offnum);
1545 if (PageAddItem(current->page,
1546 newInnerTuple, newInnerTuple->size,
1547 current->offnum, false, false) != current->offnum)
1548 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1549 newInnerTuple->size);
1550
1551 MarkBufferDirty(current->buffer);
1552
1553 if (RelationNeedsWAL(index) && !state->isBuild)
1554 {
1555 XLogRecPtr recptr;
1556
1558 XLogRegisterData(&xlrec, sizeof(xlrec));
1559 XLogRegisterData(newInnerTuple, newInnerTuple->size);
1560
1562
1563 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1564
1565 PageSetLSN(current->page, recptr);
1566 }
1567
1569 }
1570 else
1571 {
1572 /*
1573 * move inner tuple to another page, and update parent
1574 */
1575 SpGistDeadTuple dt;
1576 SPPageDesc saveCurrent;
1577
1578 /*
1579 * It should not be possible to get here for the root page, since we
1580 * allow only one inner tuple on the root page, and spgFormInnerTuple
1581 * always checks that inner tuples don't exceed the size of a page.
1582 */
1583 if (SpGistBlockIsRoot(current->blkno))
1584 elog(ERROR, "cannot enlarge root tuple any more");
1585 Assert(parent->buffer != InvalidBuffer);
1586
1587 saveCurrent = *current;
1588
1589 xlrec.offnumParent = parent->offnum;
1590 xlrec.nodeI = parent->node;
1591
1592 /*
1593 * obtain new buffer with the same parity as current, since it will be
1594 * a child of same parent tuple
1595 */
1596 current->buffer = SpGistGetBuffer(index,
1597 GBUF_INNER_PARITY(current->blkno),
1598 newInnerTuple->size + sizeof(ItemIdData),
1599 &xlrec.newPage);
1600 current->blkno = BufferGetBlockNumber(current->buffer);
1601 current->page = BufferGetPage(current->buffer);
1602
1603 /*
1604 * Let's just make real sure new current isn't same as old. Right now
1605 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1606 * delete placeholder tuples before checking space, maybe it wouldn't
1607 * be impossible. The case would appear to work except that WAL
1608 * replay would be subtly wrong, so I think a mere assert isn't enough
1609 * here.
1610 */
1611 if (current->blkno == saveCurrent.blkno)
1612 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1613
1614 /*
1615 * New current and parent buffer will both be modified; but note that
1616 * parent buffer could be same as either new or old current.
1617 */
1618 if (parent->buffer == saveCurrent.buffer)
1619 xlrec.parentBlk = 0;
1620 else if (parent->buffer == current->buffer)
1621 xlrec.parentBlk = 1;
1622 else
1623 xlrec.parentBlk = 2;
1624
1626
1627 /* insert new ... */
1628 xlrec.offnumNew = current->offnum =
1630 newInnerTuple, newInnerTuple->size,
1631 NULL, false);
1632
1633 MarkBufferDirty(current->buffer);
1634
1635 /* update parent's downlink and mark parent page dirty */
1636 saveNodeLink(index, parent, current->blkno, current->offnum);
1637
1638 /*
1639 * Replace old tuple with a placeholder or redirection tuple. Unless
1640 * doing an index build, we have to insert a redirection tuple for
1641 * possible concurrent scans. We can't just delete it in any case,
1642 * because that could change the offsets of other tuples on the page,
1643 * breaking downlinks from their parents.
1644 */
1645 if (state->isBuild)
1648 else
1650 current->blkno, current->offnum);
1651
1652 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1653 if (PageAddItem(saveCurrent.page, dt, dt->size,
1654 saveCurrent.offnum,
1655 false, false) != saveCurrent.offnum)
1656 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1657 dt->size);
1658
1659 if (state->isBuild)
1660 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1661 else
1662 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1663
1664 MarkBufferDirty(saveCurrent.buffer);
1665
1666 if (RelationNeedsWAL(index) && !state->isBuild)
1667 {
1668 XLogRecPtr recptr;
1669 int flags;
1670
1672
1673 /* orig page */
1674 XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1675 /* new page */
1676 flags = REGBUF_STANDARD;
1677 if (xlrec.newPage)
1678 flags |= REGBUF_WILL_INIT;
1679 XLogRegisterBuffer(1, current->buffer, flags);
1680 /* parent page (if different from orig and new) */
1681 if (xlrec.parentBlk == 2)
1683
1684 XLogRegisterData(&xlrec, sizeof(xlrec));
1685 XLogRegisterData(newInnerTuple, newInnerTuple->size);
1686
1687 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1688
1689 /* we don't bother to check if any of these are redundant */
1690 PageSetLSN(current->page, recptr);
1691 PageSetLSN(parent->page, recptr);
1692 PageSetLSN(saveCurrent.page, recptr);
1693 }
1694
1696
1697 /* Release saveCurrent if it's not same as current or parent */
1698 if (saveCurrent.buffer != current->buffer &&
1699 saveCurrent.buffer != parent->buffer)
1700 {
1701 SpGistSetLastUsedPage(index, saveCurrent.buffer);
1702 UnlockReleaseBuffer(saveCurrent.buffer);
1703 }
1704 }
1705}
1706
1707/*
1708 * spgSplitNode action: split inner tuple at current into prefix and postfix
1709 */
1710static void
1712 SpGistInnerTuple innerTuple,
1713 SPPageDesc *current, spgChooseOut *out)
1714{
1715 SpGistInnerTuple prefixTuple,
1716 postfixTuple;
1717 SpGistNodeTuple node,
1718 *nodes;
1719 BlockNumber postfixBlkno;
1720 OffsetNumber postfixOffset;
1721 int i;
1722 spgxlogSplitTuple xlrec;
1723 Buffer newBuffer = InvalidBuffer;
1724
1725 /* Should not be applied to nulls */
1726 Assert(!SpGistPageStoresNulls(current->page));
1727
1728 /* Check opclass gave us sane values */
1729 if (out->result.splitTuple.prefixNNodes <= 0 ||
1731 elog(ERROR, "invalid number of prefix nodes: %d",
1733 if (out->result.splitTuple.childNodeN < 0 ||
1736 elog(ERROR, "invalid child node number: %d",
1738
1739 /*
1740 * Construct new prefix tuple with requested number of nodes. We'll fill
1741 * in the childNodeN'th node's downlink below.
1742 */
1743 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1745
1746 for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1747 {
1748 Datum label = (Datum) 0;
1749 bool labelisnull;
1750
1751 labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1752 if (!labelisnull)
1754 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1755 }
1756
1757 prefixTuple = spgFormInnerTuple(state,
1761 nodes);
1762
1763 /* it must fit in the space that innerTuple now occupies */
1764 if (prefixTuple->size > innerTuple->size)
1765 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1766
1767 /*
1768 * Construct new postfix tuple, containing all nodes of innerTuple with
1769 * same node datums, but with the prefix specified by the picksplit
1770 * function.
1771 */
1772 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1773 SGITITERATE(innerTuple, i, node)
1774 {
1775 nodes[i] = node;
1776 }
1777
1778 postfixTuple = spgFormInnerTuple(state,
1781 innerTuple->nNodes, nodes);
1782
1783 /* Postfix tuple is allTheSame if original tuple was */
1784 postfixTuple->allTheSame = innerTuple->allTheSame;
1785
1786 /* prep data for WAL record */
1787 xlrec.newPage = false;
1788
1789 /*
1790 * If we can't fit both tuples on the current page, get a new page for the
1791 * postfix tuple. In particular, can't split to the root page.
1792 *
1793 * For the space calculation, note that prefixTuple replaces innerTuple
1794 * but postfixTuple will be a new entry.
1795 */
1796 if (SpGistBlockIsRoot(current->blkno) ||
1797 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1798 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1799 {
1800 /*
1801 * Choose page with next triple parity, because postfix tuple is a
1802 * child of prefix one
1803 */
1804 newBuffer = SpGistGetBuffer(index,
1805 GBUF_INNER_PARITY(current->blkno + 1),
1806 postfixTuple->size + sizeof(ItemIdData),
1807 &xlrec.newPage);
1808 }
1809
1811
1812 /*
1813 * Replace old tuple by prefix tuple
1814 */
1815 PageIndexTupleDelete(current->page, current->offnum);
1816 xlrec.offnumPrefix = PageAddItem(current->page,
1817 prefixTuple, prefixTuple->size,
1818 current->offnum, false, false);
1819 if (xlrec.offnumPrefix != current->offnum)
1820 elog(ERROR, "failed to add item of size %u to SPGiST index page",
1821 prefixTuple->size);
1822
1823 /*
1824 * put postfix tuple into appropriate page
1825 */
1826 if (newBuffer == InvalidBuffer)
1827 {
1828 postfixBlkno = current->blkno;
1829 xlrec.offnumPostfix = postfixOffset =
1831 postfixTuple, postfixTuple->size,
1832 NULL, false);
1833 xlrec.postfixBlkSame = true;
1834 }
1835 else
1836 {
1837 postfixBlkno = BufferGetBlockNumber(newBuffer);
1838 xlrec.offnumPostfix = postfixOffset =
1840 postfixTuple, postfixTuple->size,
1841 NULL, false);
1842 MarkBufferDirty(newBuffer);
1843 xlrec.postfixBlkSame = false;
1844 }
1845
1846 /*
1847 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1848 * (We can't avoid this step by doing the above two steps in opposite
1849 * order, because there might not be enough space on the page to insert
1850 * the postfix tuple first.) We have to update the local copy of the
1851 * prefixTuple too, because that's what will be written to WAL.
1852 */
1854 postfixBlkno, postfixOffset);
1855 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1856 PageGetItemId(current->page, current->offnum));
1858 postfixBlkno, postfixOffset);
1859
1860 MarkBufferDirty(current->buffer);
1861
1862 if (RelationNeedsWAL(index) && !state->isBuild)
1863 {
1864 XLogRecPtr recptr;
1865
1867 XLogRegisterData(&xlrec, sizeof(xlrec));
1868 XLogRegisterData(prefixTuple, prefixTuple->size);
1869 XLogRegisterData(postfixTuple, postfixTuple->size);
1870
1872 if (newBuffer != InvalidBuffer)
1873 {
1874 int flags;
1875
1876 flags = REGBUF_STANDARD;
1877 if (xlrec.newPage)
1878 flags |= REGBUF_WILL_INIT;
1879 XLogRegisterBuffer(1, newBuffer, flags);
1880 }
1881
1882 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1883
1884 PageSetLSN(current->page, recptr);
1885
1886 if (newBuffer != InvalidBuffer)
1887 {
1888 PageSetLSN(BufferGetPage(newBuffer), recptr);
1889 }
1890 }
1891
1893
1894 /* Update local free-space cache and release buffer */
1895 if (newBuffer != InvalidBuffer)
1896 {
1897 SpGistSetLastUsedPage(index, newBuffer);
1898 UnlockReleaseBuffer(newBuffer);
1899 }
1900}
1901
1902/*
1903 * Insert one item into the index.
1904 *
1905 * Returns true on success, false if we failed to complete the insertion
1906 * (typically because of conflict with a concurrent insert). In the latter
1907 * case, caller should re-call spgdoinsert() with the same args.
1908 */
1909bool
1911 const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
1912{
1913 bool result = true;
1914 TupleDesc leafDescriptor = state->leafTupDesc;
1915 bool isnull = isnulls[spgKeyColumn];
1916 int level = 0;
1917 Datum leafDatums[INDEX_MAX_KEYS];
1918 int leafSize;
1919 int bestLeafSize;
1920 int numNoProgressCycles = 0;
1921 SPPageDesc current,
1922 parent;
1923 FmgrInfo *procinfo = NULL;
1924
1925 /*
1926 * Look up FmgrInfo of the user-defined choose function once, to save
1927 * cycles in the loop below.
1928 */
1929 if (!isnull)
1931
1932 /*
1933 * Prepare the leaf datum to insert.
1934 *
1935 * If an optional "compress" method is provided, then call it to form the
1936 * leaf key datum from the input datum. Otherwise, store the input datum
1937 * as is. Since we don't use index_form_tuple in this AM, we have to make
1938 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1939 * guarantee that. But we assume the "compress" method to return an
1940 * untoasted value.
1941 */
1942 if (!isnull)
1943 {
1945 {
1946 FmgrInfo *compressProcinfo = NULL;
1947
1948 compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1949 leafDatums[spgKeyColumn] =
1950 FunctionCall1Coll(compressProcinfo,
1951 index->rd_indcollation[spgKeyColumn],
1952 datums[spgKeyColumn]);
1953 }
1954 else
1955 {
1956 Assert(state->attLeafType.type == state->attType.type);
1957
1958 if (state->attType.attlen == -1)
1959 leafDatums[spgKeyColumn] =
1961 else
1962 leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1963 }
1964 }
1965 else
1966 leafDatums[spgKeyColumn] = (Datum) 0;
1967
1968 /* Likewise, ensure that any INCLUDE values are not toasted */
1969 for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1970 {
1971 if (!isnulls[i])
1972 {
1973 if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1974 leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1975 else
1976 leafDatums[i] = datums[i];
1977 }
1978 else
1979 leafDatums[i] = (Datum) 0;
1980 }
1981
1982 /*
1983 * Compute space needed for a leaf tuple containing the given data.
1984 */
1985 leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1986 /* Account for an item pointer, too */
1987 leafSize += sizeof(ItemIdData);
1988
1989 /*
1990 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1991 * suffixing, bail out now rather than doing a lot of useless work.
1992 */
1993 if (leafSize > SPGIST_PAGE_CAPACITY &&
1994 (isnull || !state->config.longValuesOK))
1995 ereport(ERROR,
1996 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1997 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1998 leafSize - sizeof(ItemIdData),
2001 errhint("Values larger than a buffer page cannot be indexed.")));
2002 bestLeafSize = leafSize;
2003
2004 /* Initialize "current" to the appropriate root page */
2005 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2006 current.buffer = InvalidBuffer;
2007 current.page = NULL;
2008 current.offnum = FirstOffsetNumber;
2009 current.node = -1;
2010
2011 /* "parent" is invalid for the moment */
2012 parent.blkno = InvalidBlockNumber;
2013 parent.buffer = InvalidBuffer;
2014 parent.page = NULL;
2015 parent.offnum = InvalidOffsetNumber;
2016 parent.node = -1;
2017
2018 /*
2019 * Before entering the loop, try to clear any pending interrupt condition.
2020 * If a query cancel is pending, we might as well accept it now not later;
2021 * while if a non-canceling condition is pending, servicing it here avoids
2022 * having to restart the insertion and redo all the work so far.
2023 */
2025
2026 for (;;)
2027 {
2028 bool isNew = false;
2029
2030 /*
2031 * Bail out if query cancel is pending. We must have this somewhere
2032 * in the loop since a broken opclass could produce an infinite
2033 * picksplit loop. However, because we'll be holding buffer lock(s)
2034 * after the first iteration, ProcessInterrupts() wouldn't be able to
2035 * throw a cancel error here. Hence, if we see that an interrupt is
2036 * pending, break out of the loop and deal with the situation below.
2037 * Set result = false because we must restart the insertion if the
2038 * interrupt isn't a query-cancel-or-die case.
2039 */
2041 {
2042 result = false;
2043 break;
2044 }
2045
2046 if (current.blkno == InvalidBlockNumber)
2047 {
2048 /*
2049 * Create a leaf page. If leafSize is too large to fit on a page,
2050 * we won't actually use the page yet, but it simplifies the API
2051 * for doPickSplit to always have a leaf page at hand; so just
2052 * quietly limit our request to a page size.
2053 */
2054 current.buffer =
2056 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2057 Min(leafSize, SPGIST_PAGE_CAPACITY),
2058 &isNew);
2059 current.blkno = BufferGetBlockNumber(current.buffer);
2060 }
2061 else if (parent.buffer == InvalidBuffer)
2062 {
2063 /* we hold no parent-page lock, so no deadlock is possible */
2064 current.buffer = ReadBuffer(index, current.blkno);
2066 }
2067 else if (current.blkno != parent.blkno)
2068 {
2069 /* descend to a new child page */
2070 current.buffer = ReadBuffer(index, current.blkno);
2071
2072 /*
2073 * Attempt to acquire lock on child page. We must beware of
2074 * deadlock against another insertion process descending from that
2075 * page to our parent page (see README). If we fail to get lock,
2076 * abandon the insertion and tell our caller to start over.
2077 *
2078 * XXX this could be improved, because failing to get lock on a
2079 * buffer is not proof of a deadlock situation; the lock might be
2080 * held by a reader, or even just background writer/checkpointer
2081 * process. Perhaps it'd be worth retrying after sleeping a bit?
2082 */
2083 if (!ConditionalLockBuffer(current.buffer))
2084 {
2085 ReleaseBuffer(current.buffer);
2087 return false;
2088 }
2089 }
2090 else
2091 {
2092 /* inner tuple can be stored on the same page as parent one */
2093 current.buffer = parent.buffer;
2094 }
2095 current.page = BufferGetPage(current.buffer);
2096
2097 /* should not arrive at a page of the wrong type */
2098 if (isnull ? !SpGistPageStoresNulls(current.page) :
2099 SpGistPageStoresNulls(current.page))
2100 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2101 current.blkno);
2102
2103 if (SpGistPageIsLeaf(current.page))
2104 {
2105 SpGistLeafTuple leafTuple;
2106 int nToSplit,
2107 sizeToSplit;
2108
2109 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2110 if (leafTuple->size + sizeof(ItemIdData) <=
2111 SpGistPageGetFreeSpace(current.page, 1))
2112 {
2113 /* it fits on page, so insert it and we're done */
2114 addLeafTuple(index, state, leafTuple,
2115 &current, &parent, isnull, isNew);
2116 break;
2117 }
2118 else if ((sizeToSplit =
2119 checkSplitConditions(index, state, &current,
2120 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2121 nToSplit < 64 &&
2122 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2123 {
2124 /*
2125 * the amount of data is pretty small, so just move the whole
2126 * chain to another leaf page rather than splitting it.
2127 */
2128 Assert(!isNew);
2129 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2130 break; /* we're done */
2131 }
2132 else
2133 {
2134 /* picksplit */
2135 if (doPickSplit(index, state, &current, &parent,
2136 leafTuple, level, isnull, isNew))
2137 break; /* doPickSplit installed new tuples */
2138
2139 /* leaf tuple will not be inserted yet */
2140 pfree(leafTuple);
2141
2142 /*
2143 * current now describes new inner tuple, go insert into it
2144 */
2145 Assert(!SpGistPageIsLeaf(current.page));
2146 goto process_inner_tuple;
2147 }
2148 }
2149 else /* non-leaf page */
2150 {
2151 /*
2152 * Apply the opclass choose function to figure out how to insert
2153 * the given datum into the current inner tuple.
2154 */
2155 SpGistInnerTuple innerTuple;
2156 spgChooseIn in;
2157 spgChooseOut out;
2158
2159 /*
2160 * spgAddNode and spgSplitTuple cases will loop back to here to
2161 * complete the insertion operation. Just in case the choose
2162 * function is broken and produces add or split requests
2163 * repeatedly, check for query cancel (see comments above).
2164 */
2165 process_inner_tuple:
2167 {
2168 result = false;
2169 break;
2170 }
2171
2172 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2173 PageGetItemId(current.page, current.offnum));
2174
2175 in.datum = datums[spgKeyColumn];
2176 in.leafDatum = leafDatums[spgKeyColumn];
2177 in.level = level;
2178 in.allTheSame = innerTuple->allTheSame;
2179 in.hasPrefix = (innerTuple->prefixSize > 0);
2180 in.prefixDatum = SGITDATUM(innerTuple, state);
2181 in.nNodes = innerTuple->nNodes;
2182 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2183
2184 memset(&out, 0, sizeof(out));
2185
2186 if (!isnull)
2187 {
2188 /* use user-defined choose method */
2189 FunctionCall2Coll(procinfo,
2190 index->rd_indcollation[0],
2191 PointerGetDatum(&in),
2192 PointerGetDatum(&out));
2193 }
2194 else
2195 {
2196 /* force "match" action (to insert to random subnode) */
2198 }
2199
2200 if (innerTuple->allTheSame)
2201 {
2202 /*
2203 * It's not allowed to do an AddNode at an allTheSame tuple.
2204 * Opclass must say "match", in which case we choose a random
2205 * one of the nodes to descend into, or "split".
2206 */
2207 if (out.resultType == spgAddNode)
2208 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2209 else if (out.resultType == spgMatchNode)
2210 out.result.matchNode.nodeN =
2212 0, innerTuple->nNodes - 1);
2213 }
2214
2215 switch (out.resultType)
2216 {
2217 case spgMatchNode:
2218 /* Descend to N'th child node */
2219 spgMatchNodeAction(index, state, innerTuple,
2220 &current, &parent,
2221 out.result.matchNode.nodeN);
2222 /* Adjust level as per opclass request */
2223 level += out.result.matchNode.levelAdd;
2224 /* Replace leafDatum and recompute leafSize */
2225 if (!isnull)
2226 {
2227 leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2228 leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2229 leafDatums, isnulls);
2230 leafSize += sizeof(ItemIdData);
2231 }
2232
2233 /*
2234 * Check new tuple size; fail if it can't fit, unless the
2235 * opclass says it can handle the situation by suffixing.
2236 *
2237 * However, the opclass can only shorten the leaf datum,
2238 * which may not be enough to ever make the tuple fit,
2239 * since INCLUDE columns might alone use more than a page.
2240 * Depending on the opclass' behavior, that could lead to
2241 * an infinite loop --- spgtextproc.c, for example, will
2242 * just repeatedly generate an empty-string leaf datum
2243 * once it runs out of data. Actual bugs in opclasses
2244 * might cause infinite looping, too. To detect such a
2245 * loop, check to see if we are making progress by
2246 * reducing the leafSize in each pass. This is a bit
2247 * tricky though. Because of alignment considerations,
2248 * the total tuple size might not decrease on every pass.
2249 * Also, there are edge cases where the choose method
2250 * might seem to not make progress for a cycle or two.
2251 * Somewhat arbitrarily, we allow up to 10 no-progress
2252 * iterations before failing. (This limit should be more
2253 * than MAXALIGN, to accommodate opclasses that trim one
2254 * byte from the leaf datum per pass.)
2255 */
2256 if (leafSize > SPGIST_PAGE_CAPACITY)
2257 {
2258 bool ok = false;
2259
2260 if (state->config.longValuesOK && !isnull)
2261 {
2262 if (leafSize < bestLeafSize)
2263 {
2264 ok = true;
2265 bestLeafSize = leafSize;
2266 numNoProgressCycles = 0;
2267 }
2268 else if (++numNoProgressCycles < 10)
2269 ok = true;
2270 }
2271 if (!ok)
2272 ereport(ERROR,
2273 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2274 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2275 leafSize - sizeof(ItemIdData),
2278 errhint("Values larger than a buffer page cannot be indexed.")));
2279 }
2280
2281 /*
2282 * Loop around and attempt to insert the new leafDatum at
2283 * "current" (which might reference an existing child
2284 * tuple, or might be invalid to force us to find a new
2285 * page for the tuple).
2286 */
2287 break;
2288 case spgAddNode:
2289 /* AddNode is not sensible if nodes don't have labels */
2290 if (in.nodeLabels == NULL)
2291 elog(ERROR, "cannot add a node to an inner tuple without node labels");
2292 /* Add node to inner tuple, per request */
2293 spgAddNodeAction(index, state, innerTuple,
2294 &current, &parent,
2295 out.result.addNode.nodeN,
2297
2298 /*
2299 * Retry insertion into the enlarged node. We assume that
2300 * we'll get a MatchNode result this time.
2301 */
2302 goto process_inner_tuple;
2303 break;
2304 case spgSplitTuple:
2305 /* Split inner tuple, per request */
2306 spgSplitNodeAction(index, state, innerTuple,
2307 &current, &out);
2308
2309 /* Retry insertion into the split node */
2310 goto process_inner_tuple;
2311 break;
2312 default:
2313 elog(ERROR, "unrecognized SPGiST choose result: %d",
2314 (int) out.resultType);
2315 break;
2316 }
2317 }
2318 } /* end loop */
2319
2320 /*
2321 * Release any buffers we're still holding. Beware of possibility that
2322 * current and parent reference same buffer.
2323 */
2324 if (current.buffer != InvalidBuffer)
2325 {
2327 UnlockReleaseBuffer(current.buffer);
2328 }
2329 if (parent.buffer != InvalidBuffer &&
2330 parent.buffer != current.buffer)
2331 {
2334 }
2335
2336 /*
2337 * We do not support being called while some outer function is holding a
2338 * buffer lock (or any other reason to postpone query cancels). If that
2339 * were the case, telling the caller to retry would create an infinite
2340 * loop.
2341 */
2343
2344 /*
2345 * Finally, check for interrupts again. If there was a query cancel,
2346 * ProcessInterrupts() will be able to throw the error here. If it was
2347 * some other kind of interrupt that can just be cleared, return false to
2348 * tell our caller to retry.
2349 */
2351
2352 return result;
2353}
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:4223
bool ConditionalLockBuffer(Buffer buffer)
Definition: bufmgr.c:5630
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:5366
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:5383
void MarkBufferDirty(Buffer buffer)
Definition: bufmgr.c:2943
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5604
Buffer ReadBuffer(Relation reln, BlockNumber blockNum)
Definition: bufmgr.c:745
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:425
#define BUFFER_LOCK_EXCLUSIVE
Definition: bufmgr.h:205
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:376
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
Definition: bufpage.c:1160
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
Definition: bufpage.c:1051
Size PageGetExactFreeSpace(const PageData *page)
Definition: bufpage.c:957
static void * PageGetItem(const PageData *page, const ItemIdData *itemId)
Definition: bufpage.h:353
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:243
static void PageSetLSN(Page page, XLogRecPtr lsn)
Definition: bufpage.h:390
PageData * Page
Definition: bufpage.h:81
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
Definition: bufpage.h:471
static OffsetNumber PageGetMaxOffsetNumber(const PageData *page)
Definition: bufpage.h:371
#define Min(x, y)
Definition: c.h:1008
uint8_t uint8
Definition: c.h:541
#define OidIsValid(objectId)
Definition: c.h:779
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1150
Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)
Definition: fmgr.c:1130
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
Assert(PointerIsAligned(start, uint64))
#define nitems(x)
Definition: indent.h:31
FmgrInfo * index_getprocinfo(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:917
RegProcedure index_getprocid(Relation irel, AttrNumber attnum, uint16 procnum)
Definition: indexam.c:883
static int pg_cmp_u16(uint16 a, uint16 b)
Definition: int.h:640
int b
Definition: isn.c:74
int a
Definition: isn.c:73
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
struct ItemIdData ItemIdData
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static bool ItemPointerIsValid(const ItemPointerData *pointer)
Definition: itemptr.h:83
#define MaxIndexTuplesPerPage
Definition: itup.h:181
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc0(Size size)
Definition: mcxt.c:1395
void * palloc(Size size)
Definition: mcxt.c:1365
#define INTERRUPTS_CAN_BE_PROCESSED()
Definition: miscadmin.h:130
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define INTERRUPTS_PENDING_CONDITION()
Definition: miscadmin.h:113
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
#define InvalidOffsetNumber
Definition: off.h:26
uint16 OffsetNumber
Definition: off.h:24
#define FirstOffsetNumber
Definition: off.h:27
static char * label
#define INDEX_MAX_KEYS
uint64 pg_prng_uint64_range(pg_prng_state *state, uint64 rmin, uint64 rmax)
Definition: pg_prng.c:144
pg_prng_state pg_global_prng_state
Definition: pg_prng.c:34
#define qsort(a, b, c, d)
Definition: port.h:479
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationNeedsWAL(relation)
Definition: rel.h:638
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel)
Definition: spgdoinsert.c:1509
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:564
static bool checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, bool *includeNew)
Definition: spgdoinsert.c:595
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out)
Definition: spgdoinsert.c:1711
static SpGistInnerTuple addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
Definition: spgdoinsert.c:80
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:131
bool spgdoinsert(Relation index, SpGistState *state, const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
Definition: spgdoinsert.c:1910
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN)
Definition: spgdoinsert.c:1455
static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, int level, bool isNulls, bool isNew)
Definition: spgdoinsert.c:673
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition: spgdoinsert.c:52
static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, SPPageDesc *parent, SpGistLeafTuple newLeafTuple, bool isNulls)
Definition: spgdoinsert.c:386
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum)
Definition: spgdoinsert.c:185
struct SPPageDesc SPPageDesc
static int checkSplitConditions(Relation index, SpGistState *state, SPPageDesc *current, int *nToSplit)
Definition: spgdoinsert.c:332
static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
Definition: spgdoinsert.c:202
static int cmpOffsetNumbers(const void *a, const void *b)
Definition: spgdoinsert.c:112
#define SPGIST_COMPRESS_PROC
Definition: spgist.h:28
#define SPGIST_CHOOSE_PROC
Definition: spgist.h:24
@ spgMatchNode
Definition: spgist.h:69
@ spgAddNode
Definition: spgist.h:70
@ spgSplitTuple
Definition: spgist.h:71
#define SPGIST_PICKSPLIT_PROC
Definition: spgist.h:25
#define SGLTDATUM(x, s)
#define SPGIST_NULL_BLKNO
SpGistDeadTupleData * SpGistDeadTuple
#define GBUF_NULLS
#define SPGIST_REDIRECT
SpGistInnerTupleData * SpGistInnerTuple
#define SPGIST_LIVE
#define SGDTSIZE
#define SpGistPageStoresNulls(page)
#define SGITDATUM(x, s)
#define SGLT_GET_NEXTOFFSET(spgLeafTuple)
#define SPGIST_PLACEHOLDER
#define SGITITERATE(x, i, nt)
#define GBUF_LEAF
#define spgFirstIncludeColumn
#define SPGIST_DEAD
#define SpGistPageIsLeaf(page)
#define SPGIST_METAPAGE_BLKNO
#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber)
#define SpGistBlockIsRoot(blkno)
struct SpGistLeafTupleData * SpGistLeafTuple
#define SPGIST_NULLS
#define STORE_STATE(s, d)
#define SPGIST_PAGE_CAPACITY
#define SGITMAXNNODES
#define SpGistPageGetOpaque(page)
#define GBUF_INNER_PARITY(x)
#define SPGIST_LEAF
#define spgKeyColumn
#define SPGIST_ROOT_BLKNO
#define SpGistPageGetFreeSpace(p, n)
OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page, const void *item, Size size, OffsetNumber *startOffset, bool errorOK)
Definition: spgutils.c:1203
Datum * spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
Definition: spgutils.c:1160
SpGistLeafTuple spgFormLeafTuple(SpGistState *state, const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:871
SpGistInnerTuple spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, int nNodes, SpGistNodeTuple *nodes)
Definition: spgutils.c:1002
void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, Datum *datums, bool *isnulls, bool keyColumnIsNull)
Definition: spgutils.c:1115
SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate, BlockNumber blkno, OffsetNumber offnum)
Definition: spgutils.c:1085
void SpGistInitBuffer(Buffer b, uint16 f)
Definition: spgutils.c:722
Buffer SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
Definition: spgutils.c:569
void SpGistSetLastUsedPage(Relation index, Buffer buffer)
Definition: spgutils.c:673
Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, const Datum *datums, const bool *isnulls)
Definition: spgutils.c:818
SpGistNodeTuple spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
Definition: spgutils.c:960
#define XLOG_SPGIST_SPLIT_TUPLE
Definition: spgxlog.h:25
#define SizeOfSpgxlogPickSplit
Definition: spgxlog.h:199
#define SizeOfSpgxlogMoveLeafs
Definition: spgxlog.h:91
#define XLOG_SPGIST_ADD_NODE
Definition: spgxlog.h:24
#define XLOG_SPGIST_ADD_LEAF
Definition: spgxlog.h:22
#define XLOG_SPGIST_MOVE_LEAFS
Definition: spgxlog.h:23
#define XLOG_SPGIST_PICKSPLIT
Definition: spgxlog.h:26
int16 attlen
Definition: tupdesc.h:71
Definition: fmgr.h:57
ItemPointerData t_tid
Definition: itup.h:37
Buffer buffer
Definition: spgdoinsert.c:39
OffsetNumber offnum
Definition: spgdoinsert.c:41
BlockNumber blkno
Definition: spgdoinsert.c:38
unsigned int tupstate
ItemPointerData pointer
unsigned int prefixSize
unsigned int allTheSame
unsigned int tupstate
Definition: type.h:96
Datum * nodeLabels
Definition: spgist.h:64
bool hasPrefix
Definition: spgist.h:61
Datum prefixDatum
Definition: spgist.h:62
int nNodes
Definition: spgist.h:63
Datum datum
Definition: spgist.h:55
int level
Definition: spgist.h:57
Datum leafDatum
Definition: spgist.h:56
bool allTheSame
Definition: spgist.h:60
bool postfixHasPrefix
Definition: spgist.h:101
int childNodeN
Definition: spgist.h:98
spgChooseResultType resultType
Definition: spgist.h:76
int levelAdd
Definition: spgist.h:82
struct spgChooseOut::@53::@54 matchNode
struct spgChooseOut::@53::@56 splitTuple
union spgChooseOut::@53 result
Datum nodeLabel
Definition: spgist.h:87
Datum * prefixNodeLabels
Definition: spgist.h:96
Datum postfixPrefixDatum
Definition: spgist.h:102
Datum restDatum
Definition: spgist.h:83
int prefixNNodes
Definition: spgist.h:95
int nodeN
Definition: spgist.h:81
struct spgChooseOut::@53::@55 addNode
Datum prefixPrefixDatum
Definition: spgist.h:94
bool prefixHasPrefix
Definition: spgist.h:93
Datum * datums
Definition: spgist.h:113
bool hasPrefix
Definition: spgist.h:119
int * mapTuplesToNodes
Definition: spgist.h:125
Datum * nodeLabels
Definition: spgist.h:123
Datum * leafTupleDatums
Definition: spgist.h:126
Datum prefixDatum
Definition: spgist.h:120
uint16 nodeI
Definition: spgxlog.h:54
bool newPage
Definition: spgxlog.h:48
OffsetNumber offnumLeaf
Definition: spgxlog.h:50
bool storesNulls
Definition: spgxlog.h:49
OffsetNumber offnumHeadLeaf
Definition: spgxlog.h:51
OffsetNumber offnumParent
Definition: spgxlog.h:53
OffsetNumber offnumNew
Definition: spgxlog.h:111
bool newPage
Definition: spgxlog.h:112
OffsetNumber offnumParent
Definition: spgxlog.h:126
OffsetNumber offnum
Definition: spgxlog.h:105
spgxlogState stateSrc
Definition: spgxlog.h:130
uint16 nodeI
Definition: spgxlog.h:128
int8 parentBlk
Definition: spgxlog.h:125
bool replaceDead
Definition: spgxlog.h:68
bool storesNulls
Definition: spgxlog.h:69
uint16 nMoves
Definition: spgxlog.h:66
uint16 nodeI
Definition: spgxlog.h:73
OffsetNumber offnumParent
Definition: spgxlog.h:72
spgxlogState stateSrc
Definition: spgxlog.h:75
uint16 nInsert
Definition: spgxlog.h:170
spgxlogState stateSrc
Definition: spgxlog.h:185
bool innerIsParent
Definition: spgxlog.h:181
uint16 nodeI
Definition: spgxlog.h:183
bool storesNulls
Definition: spgxlog.h:178
OffsetNumber offnumParent
Definition: spgxlog.h:182
OffsetNumber offnumInner
Definition: spgxlog.h:175
uint16 nDelete
Definition: spgxlog.h:169
bool isRootSplit
Definition: spgxlog.h:167
OffsetNumber offnumPostfix
Definition: spgxlog.h:147
OffsetNumber offnumPrefix
Definition: spgxlog.h:144
bool postfixBlkSame
Definition: spgxlog.h:149
Definition: regguts.h:323
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:175
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:368
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
Definition: xloginsert.c:245
void XLogBeginInsert(void)
Definition: xloginsert.c:152
#define REGBUF_STANDARD
Definition: xloginsert.h:35
#define REGBUF_WILL_INIT
Definition: xloginsert.h:34