/*
 * Build the index for an unlogged table
 */
void
hnswbuildempty(Relation index)
{
    IndexInfo  *indexInfo = BuildIndexInfo(index);
    HnswBuildState buildstate;
/*
 * The HNSW build happens in two phases:
 *
 * 1. In-memory phase
 *
 * In this first phase, the graph is held completely in memory. When the graph
 * is fully built, or we run out of memory reserved for the build (determined
 * by maintenance_work_mem), we materialize the graph to disk (see
 * FlushPages()), and switch to the on-disk phase.
 *
 * In a parallel build, a large contiguous chunk of shared memory is allocated
 * to hold the graph. Each worker process has its own HnswBuildState struct in
 * private memory, which contains information that doesn't change throughout
 * the build, and pointers to the shared structs in shared memory. The shared
 * memory area is mapped to a different address in each worker process, and
 * 'HnswBuildState.hnswarea' points to the beginning of the shared area in the
 * worker process's address space. All pointers used in the graph are
 * "relative pointers", stored as an offset from 'hnswarea'.
 *
 * Each element is protected by an LWLock. It must be held when reading or
 * modifying the element's neighbors or 'heaptids'.
 *
 * In a non-parallel build, the graph is held in backend-private memory. All
 * the elements are allocated in a dedicated memory context, 'graphCtx', and
 * the pointers used in the graph are regular pointers.
 *
 * 2. On-disk phase
 *
 * In the on-disk phase, the index is built by inserting each vector to the
 * index one by one, just like on INSERT. The only difference is that we don't
 * WAL-log the individual inserts. If the graph fit completely in memory and
 * was fully built in the in-memory phase, the on-disk phase is skipped.
 *
 * After we have finished building the graph, we perform one more scan through
 * the index and write all the pages to the WAL.
 */
    /* Flush pages */
    if (!buildstate->graph->flushed)
        FlushPages(buildstate);
}
/*
 * table_index_build_scan - scan the table to find tuples to be indexed
 *
 * This is called back from an access-method-specific index build procedure
 * after the AM has done whatever setup it needs. The parent table relation
 * is scanned to find tuples that should be entered into the index. Each
 * such tuple is passed to the AM's callback routine, which does the right
 * things to add it to the new index. After we return, the AM's index
 * build procedure does whatever cleanup it needs.
 *
 * The total count of live tuples is returned. This is for updating pg_class
 * statistics. (It's annoying not to be able to do that here, but we want to
 * merge that update with others; see index_update_stats.) Note that the
 * index AM itself must keep track of the number of index tuples; we don't do
 * so here because the AM might reject some of the tuples for its own reasons,
 * such as being unable to store NULLs.
 *
 * If 'progress', the PROGRESS_SCAN_BLOCKS_TOTAL counter is updated when
 * starting the scan, and PROGRESS_SCAN_BLOCKS_DONE is updated as we go along.
 *
 * A side effect is to set indexInfo->ii_BrokenHotChain to true if we detect
 * any potentially broken HOT chains. Currently, we set this if there are any
 * RECENTLY_DEAD or DELETE_IN_PROGRESS entries in a HOT chain, without trying
 * very hard to detect whether they're really incompatible with the chain tip.
 * This only really makes sense for heap AM, it might need to be generalized
 * for other AMs later.
 */
static inline double
table_index_build_scan(Relation table_rel,
                       Relation index_rel,
                       struct IndexInfo *index_info,
                       bool allow_sync,
                       bool progress,
                       IndexBuildCallback callback,
                       void *callback_state,
                       TableScanDesc scan)
{
    return table_rel->rd_tableam->index_build_range_scan(table_rel,
                                                         index_rel,
                                                         index_info,
                                                         allow_sync,
                                                         false,
                                                         progress,
                                                         0,
                                                         InvalidBlockNumber,
                                                         callback,
                                                         callback_state,
                                                         scan);
}
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
                                    ItemPointer tid,
                                    Datum *values,
                                    bool *isnull,
                                    bool tupleIsAlive,
                                    void *state);
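To make the division of labor concrete, here is a minimal sketch of how an AM's build routine plugs into this callback interface. ExampleBuildState, ExampleBuildCallback, and the elided insert step are illustrative names I'm assuming for the sketch, not pgvector's actual code.

#include "postgres.h"
#include "access/tableam.h"

/* hypothetical per-build state, passed through as 'callback_state' */
typedef struct ExampleBuildState
{
    double      indtuples;      /* tuples accepted into the index */
} ExampleBuildState;

static void
ExampleBuildCallback(Relation index, ItemPointer tid, Datum *values,
                     bool *isnull, bool tupleIsAlive, void *state)
{
    ExampleBuildState *buildstate = (ExampleBuildState *) state;

    /* the AM may reject tuples for its own reasons, e.g. NULLs */
    if (isnull[0])
        return;

    /* ... form an index entry from values[0] and remember 'tid' ... */
    buildstate->indtuples++;
}

/* Inside the AM's ambuild routine, the scan then drives the callback:
 *
 *   reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
 *                                      ExampleBuildCallback, buildstate, NULL);
 */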
    /*
     * In a parallel build, the HnswElement is allocated from the shared
     * memory area, so we need to coordinate with other processes.
     */
    LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE);
    /*
     * Check that we have enough memory available for the new element now that
     * we have the allocator lock, and flush pages if needed.
     */
    if (graph->memoryUsed >= graph->memoryTotal)
    {
        LWLockRelease(&graph->allocatorLock);
        if (!graph->flushed)
        {
            ereport(NOTICE,
                    (errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples),
                     errdetail("Building will take significantly more time."),
                     errhint("Increase maintenance_work_mem to speed up builds.")));
    /* Ok, we can proceed to allocate the element */
    element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml,
                              buildstate->maxLevel, allocator);
    valuePtr = HnswAlloc(allocator, valueSize);
    /*
     * We have now allocated the space needed for the element, so we don't
     * need the allocator lock anymore. Release it and initialize the rest of
     * the element.
     */
    LWLockRelease(&graph->allocatorLock);
    /* Copy the datum */
    memcpy(valuePtr, DatumGetPointer(value), valueSize);
    HnswPtrStore(base, element->value, valuePtr);
    /* Create a lock for the element */
    LWLockInitialize(&element->lock, hnsw_lock_tranche_id);
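LWLockInitialize needs a tranche id. As an aside, here is a minimal sketch of how an extension typically obtains and registers one; the names are illustrative, not pgvector's exact code.

#include "postgres.h"
#include "storage/lwlock.h"

static int  example_tranche_id;

static void
ExampleInitLockTranche(void)
{
    /* allocate a new tranche id ... */
    example_tranche_id = LWLockNewTrancheId();
    /* ... and associate a name with it, for monitoring views etc. */
    LWLockRegisterTranche(example_tranche_id, "example_hnsw");
}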
    /* 1. Read the target to be indexed from the buffer and make a local copy */
    /* Form index value */
    if (!HnswFormIndexValue(&value, values, isnull, buildstate->typeInfo, support))
        return false;
    /* Get datum size */
    valueSize = VARSIZE_ANY(DatumGetPointer(value));
    /*
     * Check that we have enough memory available for the new element now that
     * we have the allocator lock, and flush pages if needed.
     */
    if (graph->memoryUsed >= graph->memoryTotal)
    {
        return;
    }
    /* Ok, we can proceed to allocate the element */
    /* 2. Initialize the index entry for the target item */
    element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml,
                              buildstate->maxLevel, allocator);
    valuePtr = HnswAlloc(allocator, valueSize);
    /* Copy the datum */
    memcpy(valuePtr, DatumGetPointer(value), valueSize);
    HnswPtrStore(base, element->value, valuePtr);
1. Randomly pick the level at which the new element will be inserted (see the sketch after this list).

2. Allocate space for an HnswElement header from the memory area managed by allocator, take its address element, and fill the header with the element's metadata (the heap TID of the original vector, the element's level, its deleted flag, and so on).

3. In the same memory area, initialize, level by level, the arrays that will hold each layer's neighbor pointers.
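Step 1 follows the standard HNSW level rule: draw a uniform random number U in (0, 1) and take level = floor(-ln(U) * ml), so higher levels become exponentially rarer. A minimal sketch, assuming a RandomDouble() helper in place of pgvector's internal one:

#include <math.h>

/* hypothetical: returns a uniform random double in (0, 1) */
extern double RandomDouble(void);

static int
ExampleAssignLevel(double ml, int maxLevel)
{
    int     level = (int) (-log(RandomDouble()) * ml);

    /* cap the level so the graph cannot grow arbitrarily tall */
    return level > maxLevel ? maxLevel : level;
}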
    /* Wait if another process needs exclusive lock on entry lock */
    LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
    LWLockRelease(entryWaitLock);
    /* Get entry point */
    LWLockAcquire(entryLock, LW_SHARED);
    entryPoint = HnswPtrAccess(base, graph->entryPoint);
    /* Prevent concurrent inserts when likely updating entry point */
    if (entryPoint == NULL || element->level > entryPoint->level)
    {
        /* Release shared lock */
        LWLockRelease(entryLock);
        /* Tell other processes to wait and get exclusive lock */
        LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
        LWLockAcquire(entryLock, LW_EXCLUSIVE);
        LWLockRelease(entryWaitLock);
        /* Get latest entry point after lock is acquired */
        entryPoint = HnswPtrAccess(base, graph->entryPoint);
    }
    /* Find neighbors for element */
    HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m,
                             efConstruction, false);
    /* Update graph in memory */
    UpdateGraphInMemory(support, element, m, efConstruction, entryPoint,
                        buildstate);
/*
 * Code outside of lwlock.c should not manipulate the contents of this
 * structure directly, but we have to declare it here to allow LWLocks to be
 * incorporated into other data structures.
 */
typedef struct LWLock
{
    uint16      tranche;        /* tranche ID */
    pg_atomic_uint32 state;     /* state of exclusive/nonexclusive lockers */
    proclist_head waiters;      /* list of waiting PGPROCs */
#ifdef LOCK_DEBUG
    pg_atomic_uint32 nwaiters;  /* number of waiters */
    struct PGPROC *owner;       /* last exclusive owner of the lock */
#endif
} LWLock;
/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is. Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    /*
     * We can't wait if we haven't got a PGPROC. This should only occur
     * during bootstrap or shared memory initialization. Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));
    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");
    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock. This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();
    for (;;)
    {
        bool        mustwait;
        /*
         * Try to grab the lock the first time, we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              /* got the lock */
        }
        /* add to the queue */
        LWLockQueueSelf(lock, mode);
        /* we're now guaranteed to be woken up if necessary */
        mustwait = LWLockAttemptLock(lock, mode);
        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                /* Great! Got the lock. */
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    /* somebody else has the lock */
        }
    }
    pg_unreachable();
}
The lock-attempt function LWLockAttemptLock uses an unconditional loop plus CAS (compare-and-swap) to make each acquisition attempt atomic: if the CAS fails, the loop simply retries until an attempt succeeds. Accessing the lock state this way is very cheap (the advantage of CAS), but the approach has downsides: possible busy-waiting (repeatedly retrying in the loop burns CPU while another process holds the lock), lack of fairness (some processes may starve and never acquire the lock), and a dependency on hardware support for atomic instructions.
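To isolate the pattern from PostgreSQL's machinery, here is a self-contained sketch of the same retry-until-CAS-succeeds loop using C11 atomics; it models only an exclusive lock and is not PostgreSQL code.

#include <stdatomic.h>
#include <stdbool.h>

static atomic_uint lock_state;  /* 0 = free, 1 = exclusively held */

static bool
try_lock_exclusive(void)
{
    unsigned    expected = atomic_load(&lock_state);

    for (;;)
    {
        if (expected != 0)
            return false;       /* somebody else holds the lock */

        /*
         * On failure the CAS reloads 'expected' with the current value, so
         * the next iteration re-examines the fresh state, just like
         * LWLockAttemptLock re-reading lock->state.
         */
        if (atomic_compare_exchange_weak(&lock_state, &expected, 1))
            return true;        /* got the lock */
    }
}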
/*
 * Algorithm 1 from paper
 */
void
HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint,
                         Relation index, HnswSupport * support, int m,
                         int efConstruction, bool existing)
{
    List       *ep;
    List       *w;
    int         level = element->level;
    int         entryLevel;
    HnswQuery   q;
    HnswElement skipElement = existing ? element : NULL;
    bool        inMemory = index == NULL;
q.value = HnswGetValue(base, element);
    /* Precompute hash */
    if (inMemory)
        PrecomputeHash(base, element);
    /* No neighbors if no entry point */
    if (entryPoint == NULL)
        return;
    /* Get entry point and level */
    ep = list_make1(HnswEntryCandidate(base, entryPoint, &q, index, support, true));
    entryLevel = entryPoint->level;
    /* 1st phase: greedy search to insert level */
    for (int lc = entryLevel; lc >= level + 1; lc--)
    {
        w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true,
                            skipElement, NULL, NULL, true, NULL);
        ep = w;
    }
    if (level > entryLevel)
        level = entryLevel;
    /* Add one for existing element */
    if (existing)
        efConstruction++;
    /* 2nd phase */
    for (int lc = level; lc >= 0; lc--)
    {
        int         lm = HnswGetLayerM(m, lc);
        List       *neighbors;
        List       *lw = NIL;   /* local candidate list of neighbors */
        ListCell   *lc2;
w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL);
        /* Elements being deleted or skipped can help with search */
        /* but should be removed before selecting neighbors */
        if (!inMemory)
            lw = RemoveElements(base, lw, skipElement);
        /*
         * Candidates are sorted, but not deterministically. Could set
         * sortCandidates to true for in-memory builds to enable closer
         * caching, but there does not seem to be a difference in performance.
         */
        /* heuristically select the final neighbors from the local candidate list */
        neighbors = SelectNeighbors(base, lw, lm, support,
                                    &HnswGetNeighbors(base, element, lc)->closerSet,
                                    NULL, NULL, false);
        AddConnections(base, element, neighbors, lc);   /* create edges between element and its neighbors */
/*
 * Append a pointer to the list. A pointer to the modified list is
 * returned. Note that this function may or may not destructively
 * modify the list; callers should always use this function's return
 * value, rather than continuing to use the pointer passed as the
 * first argument.
 */
List *
lappend(List *list, void *datum)
{
    Assert(IsPointerList(list));
    if (list == NIL)
        list = new_list(T_List, 1);
    else
        new_tail_cell(list);
#ifdef USE_ASSERT_CHECKING
/*
 * Check that the specified List is valid (so far as we can tell).
 */
static void
check_list_invariants(const List *list)
{
    if (list == NIL)
        return;
/*
 * A pairing heap.
 *
 * You can use pairingheap_allocate() to create a new palloc'd heap, or embed
 * this in a larger struct, set ph_compare and ph_arg directly and initialize
 * ph_root to NULL.
 */
typedef struct pairingheap
{
    pairingheap_comparator ph_compare;  /* comparison function */
    void       *ph_arg;         /* opaque argument to ph_compare */
    pairingheap_node *ph_root;  /* current root of the heap */
} pairingheap;
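As the comment says, users embed a pairingheap_node in their own struct. A hedged usage sketch, modeled on how HnswSearchLayer keeps its candidate heaps (Item and CompareItems are illustrative names): PostgreSQL's pairing heap pops the node that compares greatest first, so to pop the smallest distance first the comparator must rank smaller distances higher.

#include "postgres.h"
#include "lib/pairingheap.h"

typedef struct Item
{
    double      distance;
    pairingheap_node node;      /* embedded heap link */
} Item;

/* rank smaller distances higher so the nearest item is popped first */
static int
CompareItems(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
    const Item *ia = pairingheap_const_container(Item, node, a);
    const Item *ib = pairingheap_const_container(Item, node, b);

    if (ia->distance < ib->distance)
        return 1;
    if (ia->distance > ib->distance)
        return -1;
    return 0;
}

/* usage:
 *   pairingheap *h = pairingheap_allocate(CompareItems, NULL);
 *   pairingheap_add(h, &item->node);
 *   Item *nearest = pairingheap_container(Item, node, pairingheap_remove_first(h));
 */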
/*
 * Algorithm 2 from paper
 */
List *
HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc,
                Relation index, HnswSupport * support, int m,
                bool inserting, HnswElement skipElement,
                visited_hash * v, pairingheap **discarded,
                bool initVisited, int64 *tuples)
{
    List       *w = NIL;
    pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
    pairingheap *W = pairingheap_allocate(CompareFurthestCandidates, NULL);
    int         wlen = 0;
    visited_hash vh;
    ListCell   *lc2;
    HnswNeighborArray *localNeighborhood = NULL;
    Size        neighborhoodSize = 0;
    int         lm = HnswGetLayerM(m, lc);
    HnswUnvisited *unvisited = palloc(lm * sizeof(HnswUnvisited));
    int         unvisitedLength;
    bool        inMemory = index == NULL;
    if (v == NULL)
    {
        v = &vh;
        initVisited = true;
    }
    if (initVisited)
    {
        InitVisited(base, v, inMemory, ef, m);
        if (discarded != NULL)
            *discarded = pairingheap_allocate(CompareNearestDiscardedCandidates, NULL);
    }
    /* Create local memory for neighborhood if needed */
    if (inMemory)
    {
        neighborhoodSize = HNSW_NEIGHBOR_ARRAY_SIZE(lm);
        localNeighborhood = palloc(neighborhoodSize);
    }
    /* Add entry points to v, C, and W */
    foreach(lc2, ep)
    {
        HnswSearchCandidate *sc = (HnswSearchCandidate *) lfirst(lc2);
        bool        found;
        if (initVisited)
        {
            AddToVisited(base, v, sc->element, inMemory, &found);
            /* OK to count elements instead of tuples */
            if (tuples != NULL)
                (*tuples)++;
        }
        /*
         * Do not count elements being deleted towards ef when vacuuming. It
         * would be ideal to do this for inserts as well, but this could
         * affect insert performance.
         */
        if (CountElement(skipElement, HnswPtrAccess(base, sc->element)))
            wlen++;
    }
            /* Avoid any allocations if not adding */
            eElement = NULL;
            HnswLoadElementImpl(blkno, offno, &eDistance, q, index, support,
                                inserting,
                                alwaysAdd || discarded != NULL ? NULL : &f->distance,
                                &eElement);
                if (eElement == NULL)
                    continue;
            }
            if (eElement == NULL || !(eDistance < f->distance || alwaysAdd))
            {
                if (discarded != NULL)
                {
                    /* Create a new candidate */
                    e = HnswInitSearchCandidate(base, eElement, eDistance);
                    pairingheap_add(*discarded, &e->w_node);
                }
                continue;
            }
            /* Make robust to issues */
            if (eElement->level < lc)
                continue;
            /* Create a new candidate */
            e = HnswInitSearchCandidate(base, eElement, eDistance);
            pairingheap_add(C, &e->c_node);
            pairingheap_add(W, &e->w_node);
            /*
             * Do not count elements being deleted towards ef when vacuuming.
             * It would be ideal to do this for inserts as well, but this
             * could affect insert performance.
             */
            if (CountElement(skipElement, eElement))
            {
                wlen++;
                /* No need to decrement wlen */
                if (wlen > ef)
                {
                    HnswSearchCandidate *d = HnswGetSearchCandidate(w_node, pairingheap_remove_first(W));
                    if (discarded != NULL)
                        pairingheap_add(*discarded, &d->w_node);
                }
            }
        }
    }
    /* Add each element of W to w */
    while (!pairingheap_is_empty(W))
    {
        HnswSearchCandidate *sc = HnswGetSearchCandidate(w_node, pairingheap_remove_first(W));
/*
 * HnswPtrDeclare declares a dual-purpose pointer union: 'type *ptr' is a
 * direct pointer, while 'relptrtype relptr' is an offset (relative pointer).
 */
#define HnswPtrDeclare(type, relptrtype, ptrtype) \
    relptr_declare(type, relptrtype); \
    typedef union { type *ptr; relptrtype relptr; } ptrtype;

/* Pointers that can be absolute or relative */
/* Use char for DatumPtr so works with Pointer */
HnswPtrDeclare(HnswElementData, HnswElementRelptr, HnswElementPtr);
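Dereferencing such a pointer then depends on whether a base address is in play. A minimal sketch of the assumed semantics (ExampleElementPtr and ExamplePtrAccess are illustrative stand-ins for the HnswPtr macros, not pgvector's definitions):

#include <stddef.h>

typedef struct HnswElementData HnswElementData;

typedef union
{
    HnswElementData *ptr;       /* direct pointer: serial, in-memory build */
    size_t      relptr;         /* offset from 'base': parallel build */
} ExampleElementPtr;

static inline HnswElementData *
ExamplePtrAccess(char *base, ExampleElementPtr p)
{
    if (base == NULL)
        return p.ptr;           /* backend-private memory, plain pointer */
    return (HnswElementData *) (base + p.relptr);   /* shared memory, offset */
}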
typedef struct HnswGraph
{
    /* Graph state */
    slock_t     lock;
    HnswElementPtr head;        /* head is a dual-purpose pointer */
    double      indtuples;
}
/*
 * Add to element list
 */
static void
AddElementInMemory(char *base, HnswGraph * graph, HnswElement element)
{
    SpinLockAcquire(&graph->lock);
    element->next = graph->head;
    /* point graph->head at the newly added element */
    HnswPtrStore(base, graph->head, element);
    SpinLockRelease(&graph->lock);
}
    /* Add candidates */
    /* 2.2: append the existing neighbors to the temporary list c, then append the new element newHc at the end */
    for (int i = 0; i < neighbors->length; i++)
        c = lappend(c, &neighbors->items[i]);
    c = lappend(c, &newHc);
    /* Should not happen */
    if (pruned == NULL)
        return;
    /* Find and replace the pruned element */
    for (int i = 0; i < neighbors->length; i++)
    {
        /* assumes at most one element is pruned here, so over-pruning cannot occur */
        if (HnswPtrEqual(base, neighbors->items[i].element, pruned->element))
        {
            neighbors->items[i] = newHc;
/*
 * Utility function: check whether the new HnswCandidate e satisfies the
 * edge-selection policy.
 *
 * Check if an element is closer to q than any element from R
 */
static bool
CheckElementCloser(char *base, HnswCandidate * e, List *r, HnswSupport * support)
{
    HnswElement eElement = HnswPtrAccess(base, e->element);
    Datum       eValue = HnswGetValue(base, eElement);
    ListCell   *lc2;
    /* Ensure order of candidates is deterministic for closer caching */
    if (sortCandidates)
    {
        if (base == NULL)
            list_sort(w, CompareCandidateDistances);    /* sort by distance in descending order */
        else
            list_sort(w, CompareCandidateDistancesOffset);
    }
    while (list_length(w) > 0 && list_length(r) < lm)
    {
        /* Assumes w is already ordered desc */
        /* take the candidate with the smallest distance */
        HnswCandidate *e = llast(w);
w = list_delete_last(w);
        /* Use previous state of r and wd to skip work when possible */
        if (mustCalculate)
            e->closer = CheckElementCloser(base, e, r, support);    /* check whether this candidate survives pruning */
        else if (list_length(added) > 0)
        {
            /* Keep Valgrind happy for in-memory, parallel builds */
            if (base != NULL)
                VALGRIND_MAKE_MEM_DEFINED(&e->closer, 1);
            /*
             * If the current candidate was closer, we only need to compare it
             * with the other candidates that we have added.
             */
            if (e->closer)
            {
                e->closer = CheckElementCloser(base, e, added, support);
                if (!e->closer)
                    removedAny = true;
            }
            else
            {
                /*
                 * If we have removed any candidates from closer, a candidate
                 * that was not closer earlier might now be.
                 */
                if (removedAny)
                {
                    e->closer = CheckElementCloser(base, e, r, support);
                    if (e->closer)
                        added = lappend(added, e);
                }
            }
        }
        else if (e == newCandidate)
        {
            e->closer = CheckElementCloser(base, e, r, support);
            if (e->closer)
                added = lappend(added, e);
        }
        /* Keep Valgrind happy for in-memory, parallel builds */
        if (base != NULL)
            VALGRIND_MAKE_MEM_DEFINED(&e->closer, 1);
        if (e->closer)
            r = lappend(r, e);
        else
            wd[wdlen++] = e;
    }
    /* Cached value can only be used in future if sorted deterministically */
    *closerSet = sortCandidates;
    /* Keep pruned connections */
    /* if pruning removed too many neighbors to reach lm, add some pruned ones back */
    while (wdoff < wdlen && list_length(r) < lm)
        r = lappend(r, wd[wdoff++]);
    /* Return pruned for update connections */
    if (pruned != NULL)
    {
        if (wdoff < wdlen)
            *pruned = wd[wdoff];
        else
            *pruned = linitial(w);  /* prune the remaining candidate with the greatest distance */
    }
            /*
             * If the current candidate was closer, we only need to compare it
             * with the other candidates that we have added.
             */
            if (e->closer)
            {
                /*
                 * 3. A neighbor that already passed selection once only needs
                 * to be compared against the newly added candidates; the
                 * comparisons against the earlier neighbors would all return
                 * true again.
                 */
                e->closer = CheckElementCloser(base, e, added, support);
                if (!e->closer)
                    removedAny = true;  /* 4. a previously selected neighbor was dropped, so later candidates may change; force recalculation */
            }
            else
            {
                /*
                 * If we have removed any candidates from closer, a candidate
                 * that was not closer earlier might now be.
                 */
                if (removedAny)
                {
                    /* 5. r holds every neighbor selected so far; e must be compared against all of them */
                    e->closer = CheckElementCloser(base, e, r, support);
                    if (e->closer)
                        added = lappend(added, e);
                }
            }
        }
        else if (e == newCandidate)
        {
            /*
             * 2. We reached newCandidate: if it is selected, every later
             * candidate may be affected, so record it in 'added' to signal
             * that later candidates must recalculate.
             */
            e->closer = CheckElementCloser(base, e, r, support);
            if (e->closer)
                added = lappend(added, e);
        }
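Stripped of the closer-caching machinery, the edge-selection policy itself is short: walk the candidates from nearest to furthest and keep one only if it is closer to q than to every neighbor already kept. A self-contained sketch under that reading (the Distance() helper and the ascending sort order are my assumptions, not pgvector code):

typedef struct Candidate
{
    double      distToQ;        /* precomputed distance to q */
    const double *vec;
} Candidate;

/* hypothetical distance helper */
extern double Distance(const double *a, const double *b);

/* cand[] sorted ascending by distToQ; keeps at most lm neighbors in out[] */
static int
ExampleSelectNeighbors(const Candidate *cand, int ncand, Candidate *out, int lm)
{
    int         nout = 0;

    for (int i = 0; i < ncand && nout < lm; i++)
    {
        bool        closer = true;

        for (int j = 0; j < nout; j++)
        {
            /* e survives only if closer to q than to each kept neighbor */
            if (Distance(cand[i].vec, out[j].vec) <= cand[i].distToQ)
            {
                closer = false;
                break;
            }
        }

        if (closer)
            out[nout++] = cand[i];
    }

    return nout;
}

This sketch omits the "keep pruned connections" backfill shown above, which tops r back up toward lm from the pruned list when too few candidates survive.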
        /*
         * If a single page could hold the element and its neighbors together
         * but the current page no longer has enough free space, allocate a
         * new page and swap the current buf and page for it.
         */
        /* Keep element and neighbors on the same page if possible */
        if (PageGetFreeSpace(page) < etupSize ||
            (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize))
            HnswBuildAppendPage(index, &buf, &page, forkNum);
        /* Add element */
        if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != element->offno)
            elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
        /* Add new page if needed */
        if (PageGetFreeSpace(page) < ntupSize)
            HnswBuildAppendPage(index, &buf, &page, forkNum);
        /* Add placeholder for neighbors */
        if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
            elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
    }
/*
 * Set element tuple, except for neighbor info
 */
void
HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element)
{
    Pointer     valuePtr = HnswPtrAccess(base, element->value);
    etup->type = HNSW_ELEMENT_TUPLE_TYPE;
    etup->level = element->level;
    etup->deleted = 0;
    etup->version = element->version;
    for (int i = 0; i < HNSW_HEAPTIDS; i++)
    {
        if (i < element->heaptidsLength)
            etup->heaptids[i] = element->heaptids[i];   /* copy the heap TID */
        else
            ItemPointerSetInvalid(&etup->heaptids[i]);  /* mark the slot invalid */
    }
    memcpy(&etup->data, valuePtr, VARSIZE_ANY(valuePtr));   /* copy the vector data */
}
As we can see, the heap-TID list of the tuple etup about to be stored on the page comes from element. So where does element's TID list come from? Looking back at the InsertTuple function, which we read earlier without noticing this detail, it sets the inserted element's heaptid:
    /* Ok, we can proceed to allocate the element */
    /* when the element is initialized, the heaptid passed down by the scan callback is stored in it */
    element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml,
                              buildstate->maxLevel, allocator);
    valuePtr = HnswAlloc(allocator, valueSize);
Having traced where the TID comes from, let's continue reading. In the code below, PageGetMaxOffsetNumber returns the offset number of the last line pointer on the current page, which is also the number of tuples on the page. The in-memory graph node element therefore gets its offset element->offno set to the position its line pointer will occupy, and its block number set directly to the current block's number.
/*
 * PageGetMaxOffsetNumber
 *		Returns the maximum offset number used by the given page.
 *		Since offset numbers are 1-based, this is also the number
 *		of items on the page.
 *
 * NOTE: if the page is not initialized (pd_lower == 0), we must
 * return zero to ensure sane behavior.
 */
static inline OffsetNumber
PageGetMaxOffsetNumber(Page page)
{
    PageHeader  pageheader = (PageHeader) page;
/*
 * ItemPointerSet
 *		Sets a disk item pointer to the specified block and offset.
 */
static inline void
ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
{
    Assert(PointerIsValid(pointer));
    BlockIdSet(&pointer->ip_blkid, blockNumber);
    pointer->ip_posid = offNum;
}
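Putting the two helpers together, the bookkeeping described above amounts to roughly the following. This is a hedged reconstruction, not a verbatim pgvector excerpt, and the neighborPage/neighbortid field names are inferred from the surrounding text:

    /* the element tuple will become the next item on the current page */
    element->blkno = BufferGetBlockNumber(buf);
    element->offno = OffsetNumberNext(PageGetMaxOffsetNumber(page));

    /* link the element tuple to where its neighbor tuple will be placed */
    ItemPointerSet(&etup->neighbortid, element->neighborPage, element->neighborOffno);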
At this point, all the tuple contents we intend to write are ready. Let's recap what has been done so far:
1. Prepared the contents of the element tuple etup by copying from the in-memory node: the header data (level, state flags), the vector data, and the heap-TID array.

2. Computed the space needed for each node's neighbor tuple ntup, and prepared the page to insert into: when etup and ntup together exceed the current page's free space but would still fit on a fresh page, a new page is appended automatically.

3. Updated the in-memory node element, chiefly with the element tuple's block number and offset and the neighbor array's block number and offset; at the same time, the neighbor-tuple TID stored in etup was filled in.
In short, we have now determined where the two tuples, the node currently being processed and its neighbor array, will be stored. The next step is to write them to disk:
    /* Add element */
    if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != element->offno)
        elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
    /* Add new page if needed */
    if (PageGetFreeSpace(page) < ntupSize)
        HnswBuildAppendPage(index, &buf, &page, forkNum);
    /* Add placeholder for neighbors */
    if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
        elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
The disk write itself is relatively simple, so we won't go through it in depth. One detail worth mentioning is the "Add new page if needed" logic, which handles allocating a separate page to store ntup when even a fresh page cannot hold etup and ntup at the same time.
    if (!PageIndexTupleOverwrite(page, element->neighborOffno, (Item) ntup, ntupSize))
        elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));