pgvector hnswbuild Source Code Walkthrough

Preface

pgvector is a vector-index extension for PostgreSQL; it requires PG version >= 13 at minimum. Although its performance is relatively modest, its contribution to the PG ecosystem is meaningful. Below is the official description from its GitHub page:

alt text

https://github.com/pgvector/pgvector

Here is a comprehensive flame graph of the function call stacks during an HNSW index build:

alt text

In this article we start from hnswbuild and, while reading and understanding its build flow, also study the related PostgreSQL source code.

Reading the Source

1. hnswbuild

hnswbuild and hnswbuildempty are the entry functions for index construction. The figure below shows the definition of hnswhandler:

alt text

hnswhandler is the index-implementation interface struct that pgvector registers with PG; it stores the index-related functions implemented by pgvector. IndexAmRoutine amroutine is the index AM interface struct type defined by PG. AM stands for Access Method, and it is the extensible layer that serves the index engine: the index engine implements an interface whose main job is to obtain TIDs from the AM and process them.

alt text

Reading on into IndexAmRoutine, we find that ambuild and ambuildempty are in fact function pointers. Here, function pointers play the role of polymorphism: the same interface, ambuild, dynamically dispatches to a different implementation for each index type (HNSW, GiST, and so on each have their own set of build functions).

alt text

alt text

As shown above, ambuild is a function pointer:

IndexBuildResult *: the return type; the function returns a pointer to an IndexBuildResult.

(*ambuild_function): the name of the function pointer, i.e. a pointer to a function named ambuild_function.

(Relation heapRelation, Relation indexRelation, struct IndexInfo *indexInfo): the parameter list, which contains three parameters:

    Relation heapRelation: the heap relation (the table being indexed).

    Relation indexRelation: the index relation.

    struct IndexInfo *indexInfo: a pointer to an IndexInfo struct.
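
To make the registration concrete, here is a minimal, hypothetical sketch of how an index AM handler fills an IndexAmRoutine with function pointers, modeled on the usual PG handler pattern (my_am_handler and the omitted fields are illustrative; this is not pgvector's actual hnswhandler source):

/* Hypothetical sketch of an AM handler, not pgvector's actual code */
#include "postgres.h"
#include "access/amapi.h"
#include "fmgr.h"

PG_FUNCTION_INFO_V1(my_am_handler);

Datum
my_am_handler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	/* polymorphism via function pointers: the index engine only ever sees IndexAmRoutine */
	amroutine->ambuild = hnswbuild;
	amroutine->ambuildempty = hnswbuildempty;
	/* ... aminsert, ambeginscan, amgettuple, etc. are assigned the same way ... */

	PG_RETURN_POINTER(amroutine);
}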

Following the call stack inward, we find that it is the index_build function in PG's index.c that actually invokes the concrete ambuild, which confirms our earlier guess about polymorphism:

alt text

At this point we can pause our reading of the index-engine internals and turn our focus back to pgvector. Because PG is written in C, it can only express this polymorphism with function pointers wired up from the lower layers. In some respects calling through a raw function pointer is certainly more efficient; in other respects it is a limitation of the language.

Recommended further reading:

PG index basics: https://www.mengqingzhong.com/2020/10/01/postgresql-index-introduction-1/

PG index interface description: http://postgres.cn/docs/9.4/index-functions.html

Let's continue with hnswbuild and hnswbuildempty:

/*
* Build the index for a logged table
*/
IndexBuildResult *
hnswbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
IndexBuildResult *result;
HnswBuildState buildstate;

BuildIndex(heap, index, indexInfo, &buildstate, MAIN_FORKNUM);

result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
result->heap_tuples = buildstate.reltuples;
result->index_tuples = buildstate.indtuples;

return result;
}

/*
* Build the index for an unlogged table
*/
void
hnswbuildempty(Relation index)
{
IndexInfo *indexInfo = BuildIndexInfo(index);
HnswBuildState buildstate;

BuildIndex(NULL, index, indexInfo, &buildstate, INIT_FORKNUM);
}

For logged vs. unlogged tables, see the documents below. In short, an unlogged table is a persistent table that sits between maximum write speed and full data safety:

https://developer.volcengine.com/articles/7103843662659846174

https://www.cnblogs.com/ilifeilong/p/11783565.html

From the flame graph, hnswbuild is what we care about most, so we dive into BuildIndex, the call subtree of hnswbuild.

2. BuildIndex

First, let's read pgvector's design for the HNSW build logic:

/*
* The HNSW build happens in two phases:
*
* 1. In-memory phase
*
* In this first phase, the graph is held completely in memory. When the graph
* is fully built, or we run out of memory reserved for the build (determined
* by maintenance_work_mem), we materialize the graph to disk (see
* FlushPages()), and switch to the on-disk phase.
*
* In a parallel build, a large contiguous chunk of shared memory is allocated
* to hold the graph. Each worker process has its own HnswBuildState struct in
* private memory, which contains information that doesn't change throughout
* the build, and pointers to the shared structs in shared memory. The shared
* memory area is mapped to a different address in each worker process, and
* 'HnswBuildState.hnswarea' points to the beginning of the shared area in the
* worker process's address space. All pointers used in the graph are
* "relative pointers", stored as an offset from 'hnswarea'.
*
* Each element is protected by an LWLock. It must be held when reading or
* modifying the element's neighbors or 'heaptids'.
*
* In a non-parallel build, the graph is held in backend-private memory. All
* the elements are allocated in a dedicated memory context, 'graphCtx', and
* the pointers used in the graph are regular pointers.
*
* 2. On-disk phase
*
* In the on-disk phase, the index is built by inserting each vector to the
* index one by one, just like on INSERT. The only difference is that we don't
* WAL-log the individual inserts. If the graph fit completely in memory and
* was fully built in the in-memory phase, the on-disk phase is skipped.
*
* After we have finished building the graph, we perform one more scan through
* the index and write all the pages to the WAL.
*/

The above is the header comment of hnswbuild.c. As it explains, the HNSW build is split into two phases, an in-memory phase and an on-disk phase:

If memory is large enough to hold the entire index, the build produces no I/O until the whole index is finished, and then everything is written out in one pass.

If memory is insufficient, the rest of the build degenerates into a series of inserts that are not WAL-logged.

Meanwhile, the in-memory storage of the graph differs between parallel and non-parallel builds:

For a parallel build, pgvector allocates one large contiguous chunk of shared memory to hold the graph, and each worker process keeps its own private HnswBuildState. HnswBuildState is defined as follows:

typedef struct HnswBuildState
{
/* Info */
Relation heap;
Relation index;
IndexInfo *indexInfo;
ForkNumber forkNum;
const HnswTypeInfo *typeInfo;

/* Settings */
int dimensions;
int m;
int efConstruction;

/* Statistics */
double indtuples;
double reltuples;

/* Support functions */
HnswSupport support;

/* Variables */
HnswGraph graphData;
HnswGraph *graph;
double ml;
int maxLevel;

/* Memory */
MemoryContext graphCtx;
MemoryContext tmpCtx;
HnswAllocator allocator;

/* Parallel builds */
HnswLeader *hnswleader;
HnswShared *hnswshared;
char *hnswarea;
} HnswBuildState;

The most important part of HnswBuildState is the pointers to the shared structs in shared memory. The shared memory area is mapped at a different address in each worker process, and HnswBuildState.hnswarea points to the beginning of the shared area in that worker's address space. In a parallel build, all pointers used in the graph are relative pointers, stored as offsets from hnswarea. Each index element is protected by an LWLock, which must be held when reading or modifying the element's neighbors or heaptids.

For a non-parallel build, the graph lives entirely in the backend's private memory: all elements are allocated in the dedicated memory context graphCtx, and all pointers are regular absolute pointers.
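
As a self-contained illustration of the relative-pointer idea (this is not pgvector's actual relptr machinery, just a sketch), storing an offset instead of an address lets every process read the same graph no matter where the shared segment is mapped in its address space:

#include <stddef.h>

/* Illustrative only: address a node by its offset from the start of the shared area */
typedef struct { size_t off; } RelPtr;	/* offset 0 is treated as NULL in this sketch */

static inline void
rel_store(char *base, RelPtr *rp, void *p)
{
	rp->off = (p == NULL) ? 0 : (size_t) ((char *) p - base);
}

static inline void *
rel_access(char *base, RelPtr rp)
{
	return (rp.off == 0) ? NULL : (void *) (base + rp.off);
}

pgvector's HnswPtrStore/HnswPtrAccess macros (which we meet later) follow the same idea, with the extra twist that a NULL base means "store the absolute pointer", which is how the non-parallel build avoids the overhead entirely.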

The figure below shows PG's memory model (from https://www.modb.pro/db/1738107982955765760):

alt text

Having covered the overall design, we continue with the build implementation. The core of BuildIndex is BuildGraph, shown below:

/*
* Build graph
*/
static void
BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
{
int parallel_workers = 0;

pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD);

/* Calculate parallel workers */
if (buildstate->heap != NULL)
parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index);

/* Attempt to launch parallel worker scan when required */
if (parallel_workers > 0)
HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);

/* Add tuples to graph */
if (buildstate->heap != NULL)
{
if (buildstate->hnswleader)
buildstate->reltuples = ParallelHeapScan(buildstate);
else
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, true, BuildCallback, (void *) buildstate, NULL);

buildstate->indtuples = buildstate->graph->indtuples;
}

/* Flush pages */
if (!buildstate->graph->flushed)
FlushPages(buildstate);

/* End parallel build */
if (buildstate->hnswleader)
HnswEndParallel(buildstate->hnswleader);
}

To keep the logic simple, we first read only the non-parallel build path; the function above, simplified, becomes:

/*
* Build graph
*/
static void
BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
{
int parallel_workers = 0;

pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD);

/* Add tuples to graph */
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, true, BuildCallback, (void *) buildstate, NULL);

buildstate->indtuples = buildstate->graph->indtuples;

/* Flush pages */
if (!buildstate->graph->flushed)
FlushPages(buildstate);
}

/*
* table_index_build_scan - scan the table to find tuples to be indexed
*
* This is called back from an access-method-specific index build procedure
* after the AM has done whatever setup it needs. The parent table relation
* is scanned to find tuples that should be entered into the index. Each
* such tuple is passed to the AM's callback routine, which does the right
* things to add it to the new index. After we return, the AM's index
* build procedure does whatever cleanup it needs.
*
* The total count of live tuples is returned. This is for updating pg_class
* statistics. (It's annoying not to be able to do that here, but we want to
* merge that update with others; see index_update_stats.) Note that the
* index AM itself must keep track of the number of index tuples; we don't do
* so here because the AM might reject some of the tuples for its own reasons,
* such as being unable to store NULLs.
*
* If 'progress', the PROGRESS_SCAN_BLOCKS_TOTAL counter is updated when
* starting the scan, and PROGRESS_SCAN_BLOCKS_DONE is updated as we go along.
*
* A side effect is to set indexInfo->ii_BrokenHotChain to true if we detect
* any potentially broken HOT chains. Currently, we set this if there are any
* RECENTLY_DEAD or DELETE_IN_PROGRESS entries in a HOT chain, without trying
* very hard to detect whether they're really incompatible with the chain tip.
* This only really makes sense for heap AM, it might need to be generalized
* for other AMs later.
*/
static inline double
table_index_build_scan(Relation table_rel,
Relation index_rel,
struct IndexInfo *index_info,
bool allow_sync,
bool progress,
IndexBuildCallback callback,
void *callback_state,
TableScanDesc scan)
{
return table_rel->rd_tableam->index_build_range_scan(table_rel,
index_rel,
index_info,
allow_sync,
false,
progress,
0,
InvalidBlockNumber,
callback,
callback_state,
scan);
}

/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
ItemPointer tid,
Datum *values,
bool *isnull,
bool tupleIsAlive,
void *state);

table_index_build_scan is the table-scan plan that PG designed specifically for index builds, and IndexBuildCallback is the callback function-pointer type of that scan. The callback (here, BuildCallback) is the function that actually builds the index, so the next thing to read is pgvector's BuildCallback.

3. BuildCallback

BuildCallback is, in Volcano-model terms, the smallest operator of index construction: it indexes a single row at a time. From the implementation below, its main job is to switch the memory context to the build context, insert the received tuple into the index, and finally update the progress counter of indexed tuples.

/*
* Callback for table_index_build_scan
*/
/**
* Callback for the table_index_build_scan scan plan
*/
static void
BuildCallback(Relation index, ItemPointer tid, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
{
HnswBuildState *buildstate = (HnswBuildState *) state;
HnswGraph *graph = buildstate->graph;
MemoryContext oldCtx;

/* Skip nulls */
if (isnull[0])
return;

/* Use memory context */
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);

/* Insert tuple */
if (InsertTuple(index, values, isnull, tid, buildstate))
{
/* Update progress */
SpinLockAcquire(&graph->lock);
pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE, ++graph->indtuples);
SpinLockRelease(&graph->lock);
}

/* Reset memory context */
MemoryContextSwitchTo(oldCtx);
MemoryContextReset(buildstate->tmpCtx);
}

The flame graph shows that InsertTuple is the core time-consuming function.

4. InsertTuple

Below is the concrete implementation of InsertTuple:

/*
* Insert tuple
*/
static bool
InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate)
{
HnswGraph *graph = buildstate->graph;
HnswElement element;
HnswAllocator *allocator = &buildstate->allocator;
HnswSupport *support = &buildstate->support;
Size valueSize;
Pointer valuePtr;
LWLock *flushLock = &graph->flushLock;
char *base = buildstate->hnswarea;
Datum value;

/* Form index value */
if (!HnswFormIndexValue(&value, values, isnull, buildstate->typeInfo, support))
return false;

/* Get datum size */
valueSize = VARSIZE_ANY(DatumGetPointer(value));

/* Ensure graph not flushed when inserting */
LWLockAcquire(flushLock, LW_SHARED);

/* Are we in the on-disk phase? */
if (graph->flushed)
{
LWLockRelease(flushLock);

return HnswInsertTupleOnDisk(index, support, value, heaptid, true);
}

/*
* In a parallel build, the HnswElement is allocated from the shared
* memory area, so we need to coordinate with other processes.
*/
LWLockAcquire(&graph->allocatorLock, LW_EXCLUSIVE);

/*
* Check that we have enough memory available for the new element now that
* we have the allocator lock, and flush pages if needed.
*/
if (graph->memoryUsed >= graph->memoryTotal)
{
LWLockRelease(&graph->allocatorLock);

LWLockRelease(flushLock);
LWLockAcquire(flushLock, LW_EXCLUSIVE);

if (!graph->flushed)
{
ereport(NOTICE,
(errmsg("hnsw graph no longer fits into maintenance_work_mem after " INT64_FORMAT " tuples", (int64) graph->indtuples),
errdetail("Building will take significantly more time."),
errhint("Increase maintenance_work_mem to speed up builds.")));

FlushPages(buildstate);
}

LWLockRelease(flushLock);

return HnswInsertTupleOnDisk(index, support, value, heaptid, true);
}

/* Ok, we can proceed to allocate the element */
element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator);
valuePtr = HnswAlloc(allocator, valueSize);

/*
* We have now allocated the space needed for the element, so we don't
* need the allocator lock anymore. Release it and initialize the rest of
* the element.
*/
LWLockRelease(&graph->allocatorLock);

/* Copy the datum */
memcpy(valuePtr, DatumGetPointer(value), valueSize);
HnswPtrStore(base, element->value, valuePtr);

/* Create a lock for the element */
LWLockInitialize(&element->lock, hnsw_lock_tranche_id);

/* Insert tuple */
InsertTupleInMemory(buildstate, element);

/* Release flush lock */
LWLockRelease(flushLock);

return true;
}

To simplify the reading, we first look at the implementation with no on-disk phase and no parallel build:

/*
* Insert tuple
*/
static bool
InsertTuple(Relation index, Datum *values, bool *isnull, ItemPointer heaptid, HnswBuildState * buildstate)
{
HnswGraph *graph = buildstate->graph;
HnswElement element;
HnswAllocator *allocator = &buildstate->allocator;
HnswSupport *support = &buildstate->support;
Size valueSize;
Pointer valuePtr;
LWLock *flushLock = &graph->flushLock;
char *base = buildstate->hnswarea;
Datum value;


// 1. Read the value to be indexed from the buffer and make a local copy
/* Form index value */
if (!HnswFormIndexValue(&value, values, isnull, buildstate->typeInfo, support))
return false;

/* Get datum size */
valueSize = VARSIZE_ANY(DatumGetPointer(value));

/*
 * Check that we have enough memory available for the new element
 * (in the full version, this is where pages are flushed and the build
 * switches to the on-disk phase).
 */
if (graph->memoryUsed >= graph->memoryTotal)
{
return false;
}

/* Ok, we can proceed to allocate the element */
// 2. Initialize the index element for the target item
element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator);
valuePtr = HnswAlloc(allocator, valueSize);

/* Copy the datum */
memcpy(valuePtr, DatumGetPointer(value), valueSize);
HnswPtrStore(base, element->value, valuePtr);

/* Insert tuple */
InsertTupleInMemory(buildstate, element); // the main time-consuming call

return true;
}

The definition of element is shown below:

alt text

Let's take a closer look at how a tuple flows from *values -> value -> valuePtr:

valueSize = VARSIZE_ANY(DatumGetPointer(value));
...
valuePtr = HnswAlloc(allocator, valueSize);
...
memcpy(valuePtr, DatumGetPointer(value), valueSize);
HnswPtrStore(base, element->value, valuePtr);

HnswFormIndexValue extracts the first Datum from values and applies whatever processing is needed (for example detoasting, validity checks, normalization); the processed Datum is stored in value. Then VARSIZE_ANY(DatumGetPointer(value)) obtains the size of value. HnswAlloc allocates valueSize bytes from the allocator and returns the pointer valuePtr, and memcpy(valuePtr, DatumGetPointer(value), valueSize) copies the contents of value into the memory valuePtr points to.
HnswPtrStore(base, element->value, valuePtr) stores valuePtr into element->value: if base is NULL it stores the absolute pointer, otherwise it stores the offset relative to base as a relative pointer.

HnswFormIndexValue does more than simply extract the value from values; it may also decompress, validate, and normalize it. These steps ensure the data is valid and well-formed before insertion. Its definition is shown below:

alt text

For background on Datum: https://blog.hidva.com/2019/12/08/pgtype/
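
For readers new to Datum: it is PG's generic value holder, a pointer-sized integer that either contains a small value directly or points at a variable-length (varlena) value such as a vector. A minimal sketch of the copy pattern used above, assuming the value is already detoasted (DatumGetPointer, PointerGetDatum, VARSIZE_ANY and palloc are real PG APIs; the function itself is only illustrative):

/* Sketch: copy a varlena Datum (e.g. a vector) into freshly allocated memory */
static Datum
copy_varlena_datum(Datum value)
{
	Pointer	src = DatumGetPointer(value);
	Size	len = VARSIZE_ANY(src);		/* total size, including the varlena header */
	Pointer	dst = palloc(len);

	memcpy(dst, src, len);
	return PointerGetDatum(dst);
}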

The functionality of HnswInitElement is summarized in the figure below:

alt text

In short, it does the following:

1. Randomly choose the level at which the element will be inserted (see the sketch after this list).

2. Allocate a block of HnswElement header size from the memory area managed by the allocator, take its address as element, and write the element header (the heap tid of the original vector, the element's level, whether it is deleted, and so on) into element.

3. In the same allocator-managed memory, initialize, level by level, the arrays that will hold each level's neighbor pointers.
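
For step 1, the level is drawn from the exponential distribution described in the HNSW paper: with ml = 1 / ln(m), the level is floor(-ln(u) * ml) for a uniform u in (0, 1]. A hedged sketch of the idea (pgvector's own helper differs in details such as the random source):

#include <math.h>
#include <stdlib.h>

/* Sketch: draw an HNSW insertion level; ml is typically 1.0 / log(m) */
static int
random_level(double ml)
{
	/* uniform in (0, 1]; the +1 avoids log(0) */
	double u = ((double) rand() + 1.0) / ((double) RAND_MAX + 1.0);

	return (int) floor(-log(u) * ml);
}

With m = 16, for example, roughly 94% of elements stay on level 0, and each additional level holds about 1/m of the previous one.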

Up to this point we have covered all of the preparation that happens before InsertTupleInMemory:

alt text

Next we move on to the most expensive function, InsertTupleInMemory.

5. InsertTupleInMemory

The body of InsertTupleInMemory

InsertTupleInMemory accounts for almost the entire call subtree of InsertTuple; it takes more than 95% of the total build time.

/*
* Insert tuple in memory
*/
static void
InsertTupleInMemory(HnswBuildState * buildstate, HnswElement element)
{
HnswGraph *graph = buildstate->graph;
HnswSupport *support = &buildstate->support;
HnswElement entryPoint;
LWLock *entryLock = &graph->entryLock;
LWLock *entryWaitLock = &graph->entryWaitLock;
int efConstruction = buildstate->efConstruction;
int m = buildstate->m;
char *base = buildstate->hnswarea;

/* Wait if another process needs exclusive lock on entry lock */
LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
LWLockRelease(entryWaitLock);

/* Get entry point */
LWLockAcquire(entryLock, LW_SHARED);
entryPoint = HnswPtrAccess(base, graph->entryPoint);

/* Prevent concurrent inserts when likely updating entry point */
if (entryPoint == NULL || element->level > entryPoint->level)
{
/* Release shared lock */
LWLockRelease(entryLock);

/* Tell other processes to wait and get exclusive lock */
LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
LWLockAcquire(entryLock, LW_EXCLUSIVE);
LWLockRelease(entryWaitLock);

/* Get latest entry point after lock is acquired */
entryPoint = HnswPtrAccess(base, graph->entryPoint);
}

/* Find neighbors for element */
HnswFindElementNeighbors(base, element, entryPoint, NULL, support, m, efConstruction, false);

/* Update graph in memory */
UpdateGraphInMemory(support, element, m, efConstruction, entryPoint, buildstate);

/* Release entry lock */
LWLockRelease(entryLock);
}

Before diving into the two sub-functions HnswFindElementNeighbors and UpdateGraphInMemory, it is worth studying the two-level locking in InsertTupleInMemory and thinking about why it is designed this way.

The fundamental problem the two-level locking solves is this: for the whole duration of inserting one vector element into the HNSW index, the entry point of the HNSW graph must not change.

With no locking at all, some insertions would change the entry point: the very first insertion into the index, or an insertion whose element has a higher level than the current entry point. If another concurrent inserting process happens to be reading the entry point at that moment, it could read a write that has not yet completed. Therefore, while the entry point is being modified, no other process may be reading it; that is the red part of the figure below:

alt text

As the code shows, modifying the entry point requires the exclusive lock LW_EXCLUSIVE. But deciding whether the entry point needs to be modified already requires reading it, and reading a shared resource (the entry point) requires a lock. The shared lock is the cheapest, so the first step is necessarily to take the shared lock LW_SHARED on entryLock; that is the red part of the figure below:

alt text

The problem can therefore be understood as a conditional lock-upgrade model: take a shared lock to read (the common case, so concurrency stays high), and decide from what was read whether the lock must be upgraded (to keep the rare case correct).

Because PG has no helper for upgrading an LWLock in place, the upgrade can only be done as release + re-acquire. This creates a short window in which no lock is held:

alt text

First, the unlocked window lets lock acquisitions happen out of order, which increases the number of processes that go through the upgrade path: processes that should never have triggered an upgrade end up triggering one.

Suppose process 1 takes the shared lock, sees that the entry point is NULL, decides it must be updated, and goes to acquire the exclusive lock. After process 1 releases its shared lock but before its exclusive acquisition succeeds, process 2 manages to take the shared lock. It also reads a NULL entry point and also enters the update path. But the update has already been triggered by process 1; process 2 should not have entered the update path, nor completed its insert under the exclusive lock. Because it grabbed the shared lock during the window in which entryLock was free, it ends up in the update path anyway.

Second, and this is the main problem: the unlocked window can starve processes waiting for the exclusive lock.

Recall that when a process fails to acquire a lock it joins the lock's wait queue and waits to be woken; but if it can acquire the lock immediately, it skips waiting and proceeds.

If, during the window, other processes successfully take the lock in shared mode, our process's retry of the exclusive acquisition fails immediately and it goes onto the wait queue. From then on, any other process can keep taking the shared lock successfully, ignoring the waiting exclusive locker. The updater cannot obtain the exclusive lock until every other process stops asking for the shared lock: starvation.

To solve the wasted upgrades and the starvation caused by the unlocked window on the shared resource entryPoint, the second-level lock entryWaitLock is introduced. entryWaitLock is taken only in exclusive mode. Its job is to order processes through its wait queue, guaranteeing that the current process is not starved and can obtain the exclusive lock within bounded time to perform the update.
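
Stripped of the HNSW details, the lock choreography in InsertTupleInMemory boils down to the pattern below (a distilled sketch of the code shown above, not a drop-in snippet; needs_entry_point_update is a hypothetical predicate standing in for the NULL/level check):

/* 1. Queue up behind any pending updater, so shared lockers cannot starve it */
LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
LWLockRelease(entryWaitLock);

/* 2. Common case: read the entry point under the cheap shared lock */
LWLockAcquire(entryLock, LW_SHARED);
entryPoint = HnswPtrAccess(base, graph->entryPoint);

if (needs_entry_point_update(entryPoint, element))
{
	/* 3. "Upgrade": release, block newcomers via the wait lock, re-acquire exclusively */
	LWLockRelease(entryLock);
	LWLockAcquire(entryWaitLock, LW_EXCLUSIVE);
	LWLockAcquire(entryLock, LW_EXCLUSIVE);
	LWLockRelease(entryWaitLock);

	/* 4. Re-read: another process may have changed it during the unlocked window */
	entryPoint = HnswPtrAccess(base, graph->entryPoint);
}

/* ... search, link, and possibly update the entry point under whichever mode we hold ... */
LWLockRelease(entryLock);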

Further reading: PG's implementation of LWLockAcquire

As we saw, the lock-upgrade code relies on PG's locking function LWLockAcquire. LWLockAcquire is PG's general-purpose acquisition function; let's read its implementation:

/*
* Code outside of lwlock.c should not manipulate the contents of this
* structure directly, but we have to declare it here to allow LWLocks to be
* incorporated into other data structures.
*/
typedef struct LWLock
{
uint16 tranche; /* tranche ID */
pg_atomic_uint32 state; /* state of exclusive/nonexclusive lockers */
proclist_head waiters; /* list of waiting PGPROCs */
#ifdef LOCK_DEBUG
pg_atomic_uint32 nwaiters; /* number of waiters */
struct PGPROC *owner; /* last exclusive owner of the lock */
#endif
} LWLock;

/*
* LWLockAcquire - acquire a lightweight lock in the specified mode
*
* If the lock is not available, sleep until it is. Returns true if the lock
* was available immediately, false if we had to sleep.
*
* Side effect: cancel/die interrupts are held off until lock release.
*/
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
PGPROC *proc = MyProc;
bool result = true;
int extraWaits = 0;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;

lwstats = get_lwlock_stats_entry(lock);
#endif

Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
/* Count lock acquisition attempts */
if (mode == LW_EXCLUSIVE)
lwstats->ex_acquire_count++;
else
lwstats->sh_acquire_count++;
#endif /* LWLOCK_STATS */

/*
* We can't wait if we haven't got a PGPROC. This should only occur
* during bootstrap or shared memory initialization. Put an Assert here
* to catch unsafe coding practices.
*/
Assert(!(proc == NULL && IsUnderPostmaster));

/* Ensure we will have room to remember the lock */
if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
elog(ERROR, "too many LWLocks taken");

/*
* Lock out cancel/die interrupts until we exit the code section protected
* by the LWLock. This ensures that interrupts will not interfere with
* manipulations of data structures in shared memory.
*/
HOLD_INTERRUPTS();

for (;;)
{
bool mustwait;

/*
* Try to grab the lock the first time, we're not in the waitqueue
* yet/anymore.
*/
mustwait = LWLockAttemptLock(lock, mode);

if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
break; /* got the lock */
}

/* add to the queue */
LWLockQueueSelf(lock, mode);

/* we're now guaranteed to be woken up if necessary */
mustwait = LWLockAttemptLock(lock, mode);

/* ok, grabbed the lock the second time round, need to undo queueing */
if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

LWLockDequeueSelf(lock);
break;
}

LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
lwstats->block_count++;
#endif

LWLockReportWaitStart(lock);
if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

for (;;)
{
PGSemaphoreLock(proc->sem);
if (proc->lwWaiting == LW_WS_NOT_WAITING)
break;
extraWaits++;
}

/* Retrying, allow LWLockRelease to release waiters again. */
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
{
/* not waiting anymore */
uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

Assert(nwaiters < MAX_BACKENDS);
}
#endif

if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
LWLockReportWaitEnd();

LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

/* Now loop back and try to acquire lock again. */
result = false;
}

if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

/* Add lock to list of locks held by this backend */
held_lwlocks[num_held_lwlocks].lock = lock;
held_lwlocks[num_held_lwlocks++].mode = mode;

/*
* Fix the process wait semaphore's count for any absorbed wakeups.
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(proc->sem);

return result;
}

Reading the source directly, we see that inside an unconditional loop LWLockAcquire tries to acquire the lock twice. After the first attempt fails, it adds itself to the wait queue and then tries to acquire the lock once more.

Only if that also fails does it enter an unconditional loop waiting on the semaphore, until the lwWaiting state field in this process's PGPROC becomes LW_WS_NOT_WAITING, meaning the process has been woken up and is no longer waiting:

for (;;)
{
PGSemaphoreLock(proc->sem);
if (proc->lwWaiting == LW_WS_NOT_WAITING)
break;
extraWaits++;
}

The two flow charts below both illustrate this process:

alt text

From https://mingjie.blog.csdn.net/?type=blog

alt text

From https://blog.csdn.net/qq_52668274/article/details/129170343

/*
* Internal function that tries to atomically acquire the lwlock in the passed
* in mode.
*
* This function will not block waiting for a lock to become free - that's the
* caller's job.
*
* Returns true if the lock isn't free and we need to wait.
*/
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
uint32 old_state;

Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);

/*
* Read once outside the loop, later iterations will get the newer value
* via compare & exchange.
*/
old_state = pg_atomic_read_u32(&lock->state);

/* loop until we've determined whether we could acquire the lock or not */
while (true)
{
uint32 desired_state;
bool lock_free;

desired_state = old_state;

if (mode == LW_EXCLUSIVE)
{
lock_free = (old_state & LW_LOCK_MASK) == 0;
if (lock_free)
desired_state += LW_VAL_EXCLUSIVE;
}
else
{
lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
if (lock_free)
desired_state += LW_VAL_SHARED;
}

if (pg_atomic_compare_exchange_u32(&lock->state,
&old_state, desired_state))
{
if (lock_free)
{
/* Great! Got the lock. */
#ifdef LOCK_DEBUG
if (mode == LW_EXCLUSIVE)
lock->owner = MyProc;
#endif
return false;
}
else
return true; /* somebody else has the lock */
}
}
pg_unreachable();
}

The lock-attempt function LWLockAttemptLock uses an unconditional loop plus CAS to make each attempt to update the lock state atomic. If the CAS fails it loops and tries again until an attempt succeeds. The cost of accessing the shared state this way is tiny (the strength of CAS); the drawbacks are possible busy-waiting (spinning in the loop while another process holds the lock burns CPU), lack of fairness (some processes may fail to obtain the lock for a long time, i.e. starvation), and dependence on hardware support.
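
The same CAS retry pattern, rewritten with C11 atomics instead of PG's pg_atomic wrappers so it stands alone (an illustrative sketch only; the real LWLock state word also encodes shared-holder counts, flags, and the wait list):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MY_EXCLUSIVE_BIT 0x80000000u

/* Sketch: one attempt to take an exclusive "lock" on a 32-bit state word.
 * Returns false on success, true if the caller must wait (mirroring LWLockAttemptLock). */
static bool
attempt_exclusive(_Atomic uint32_t *state)
{
	uint32_t old = atomic_load(state);

	for (;;)
	{
		bool		lock_free = (old == 0);
		uint32_t	desired = lock_free ? (old | MY_EXCLUSIVE_BIT) : old;

		/* On failure, 'old' is refreshed with the current value and we retry */
		if (atomic_compare_exchange_weak(state, &old, desired))
			return !lock_free;
	}
}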

Now, back to the question: why does LWLockAcquire attempt the lock twice in a row instead of waiting immediately? Because LWLock is PG's lightweight lock, each holder is expected to hold it only briefly, so a single lock sees frequent acquire/release cycles. While a process is adding itself to the wait queue, it is quite likely that some other process has already released the lock. If the process went to sleep right after the first failed attempt, it would be suspended waiting for a release notification; in the scenario just described, the process would block, the CPU would switch to another process, and then the process would immediately be woken and switched back in. That is a pointless pair of context switches and extra system overhead.

Joining the wait queue before the second attempt guarantees that, if the process does end up sleeping, it will receive its wakeup. If it instead slept without re-checking after enqueueing, it could miss a release that happened while it was joining the queue and sleep forever, effectively deadlocked.

Further reading, "Locks in PostgreSQL": http://www.postgres.cn/v2/news/viewone/1/241

Back to InsertTupleInMemory

Having covered the two-level lock design in InsertTupleInMemory, and the design ideas behind PG's lightweight-lock acquisition LWLockAcquire, we return to the main logic and read the remaining two key sub-functions of InsertTupleInMemory.

alt text

6. HnswFindElementNeighbors

HnswFindElementNeighbors is Algorithm 1 from the paper. It accounts for the bulk of the index build time, so we need to read it carefully.


/*
* Algorithm 1 from paper
*/
void
HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, HnswSupport * support, int m, int efConstruction, bool existing)
{
List *ep;
List *w;
int level = element->level;
int entryLevel;
HnswQuery q;
HnswElement skipElement = existing ? element : NULL;
bool inMemory = index == NULL;

q.value = HnswGetValue(base, element);

/* Precompute hash */
if (inMemory)
PrecomputeHash(base, element);

/* No neighbors if no entry point */
if (entryPoint == NULL)
return;

/* Get entry point and level */
ep = list_make1(HnswEntryCandidate(base, entryPoint, &q, index, support, true));
entryLevel = entryPoint->level;

/* 1st phase: greedy search to insert level */
for (int lc = entryLevel; lc >= level + 1; lc--)
{
w = HnswSearchLayer(base, &q, ep, 1, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL);
ep = w;
}

if (level > entryLevel)
level = entryLevel;

/* Add one for existing element */
if (existing)
efConstruction++;

/* 2nd phase */
for (int lc = level; lc >= 0; lc--)
{
int lm = HnswGetLayerM(m, lc);
List *neighbors;
List *lw = NIL; // local neighbor-candidate list
ListCell *lc2;

w = HnswSearchLayer(base, &q, ep, efConstruction, lc, index, support, m, true, skipElement, NULL, NULL, true, NULL);

/* Convert search candidates to candidates */
foreach(lc2, w)
{
HnswSearchCandidate *sc = lfirst(lc2);
HnswCandidate *hc = palloc(sizeof(HnswCandidate));

hc->element = sc->element;
hc->distance = sc->distance;

lw = lappend(lw, hc); // append each search result to the local candidate list
}

/* Elements being deleted or skipped can help with search */
/* but should be removed before selecting neighbors */
if (!inMemory)
lw = RemoveElements(base, lw, skipElement);

/*
* Candidates are sorted, but not deterministically. Could set
* sortCandidates to true for in-memory builds to enable closer
* caching, but there does not seem to be a difference in performance.
*/
neighbors = SelectNeighbors(base, lw, lm, support, &HnswGetNeighbors(base, element, lc)->closerSet, NULL, NULL, false); // heuristically select the final neighbors from the local candidate list

AddConnections(base, element, neighbors, lc); // create edges between element and its neighbors

ep = w;
}
}

As you can see, the function follows the original algorithm closely. We will not dwell on the algorithm itself here (the key steps are marked in the comments) and will instead focus on the places where pgvector adapts to the PG architecture.

List is PG's list abstraction; although it exposes a linked-list style API, under the hood it is implemented as a dynamically resized array of cells:

alt text

The function above uses List heavily; the operations involved are mainly list_make1 and lappend. Let's take a quick look at the basic operation lappend:

/*
* Append a pointer to the list. A pointer to the modified list is
* returned. Note that this function may or may not destructively
* modify the list; callers should always use this function's return
* value, rather than continuing to use the pointer passed as the
* first argument.
*/
List *
lappend(List *list, void *datum)
{
Assert(IsPointerList(list));

if (list == NIL)
list = new_list(T_List, 1);
else
new_tail_cell(list);

llast(list) = datum;
check_list_invariants(list);
return list;
}

#define llast(l) lfirst(list_last_cell(l))

#ifdef USE_ASSERT_CHECKING
/*
* Check that the specified List is valid (so far as we can tell).
*/
static void
check_list_invariants(const List *list)
{
if (list == NIL)
return;

Assert(list->length > 0);
Assert(list->length <= list->max_length);
Assert(list->elements != NULL);

Assert(list->type == T_List ||
list->type == T_IntList ||
list->type == T_OidList ||
list->type == T_XidList);
}
#else
#define check_list_invariants(l) ((void) 0)
#endif /* USE_ASSERT_CHECKING */

As you can see, lappend is essentially "append a new item at the end of the array", plus some growth and validation logic.
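
A tiny usage sketch of the List API as it is used in this file (list_make1, lappend, foreach, lfirst and list_free are the real PG functions; the candidate arguments are just illustrative):

/* Sketch: build and walk a List of HnswCandidate pointers */
static void
list_demo(HnswCandidate *first_candidate, HnswCandidate *second_candidate)
{
	List	   *candidates = NIL;
	ListCell   *lc;

	candidates = list_make1(first_candidate);				/* one-element list */
	candidates = lappend(candidates, second_candidate);	/* always use the return value */

	foreach(lc, candidates)
	{
		HnswCandidate *hc = (HnswCandidate *) lfirst(lc);

		/* ... use hc->element, hc->distance ... */
		(void) hc;
	}

	list_free(candidates);	/* frees the list cells, not the pointed-to candidates */
}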

Let's turn back to the core sub-functions HnswSearchLayer and SelectNeighbors, read their implementations, and see what pgvector does differently:

7. HnswSearchLayer

HnswSearchLayer is Algorithm 2 from the paper. pairingheap is the heap implemented by PG; like the priority queues used in hnswlib, which we read earlier, it supports a custom comparison function:

/*
* A pairing heap.
*
* You can use pairingheap_allocate() to create a new palloc'd heap, or embed
* this in a larger struct, set ph_compare and ph_arg directly and initialize
* ph_root to NULL.
*/
typedef struct pairingheap
{
pairingheap_comparator ph_compare; /* comparison function */
void *ph_arg; /* opaque argument to ph_compare */
pairingheap_node *ph_root; /* current root of the heap */
} pairingheap;
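
A brief sketch of how such a heap is typically used, modeled on the HnswSearchCandidate usage in the code below: the heap node is embedded in the element struct and the comparator recovers the container from the node (the struct and function names here are illustrative):

#include "lib/pairingheap.h"

/* Sketch: a heap of items ordered by distance, nearest item on top */
typedef struct MyItem
{
	double		distance;
	pairingheap_node heapnode;	/* embedded node, as HnswSearchCandidate embeds c_node/w_node */
} MyItem;

static int
compare_items(const pairingheap_node *a, const pairingheap_node *b, void *arg)
{
	const MyItem *ia = pairingheap_const_container(MyItem, heapnode, a);
	const MyItem *ib = pairingheap_const_container(MyItem, heapnode, b);

	/* return > 0 when a should come out of the heap first: smaller distance wins */
	if (ia->distance < ib->distance)
		return 1;
	if (ia->distance > ib->distance)
		return -1;
	return 0;
}

static MyItem *
pop_nearest(pairingheap *heap)
{
	return pairingheap_container(MyItem, heapnode, pairingheap_remove_first(heap));
}

/* usage: pairingheap *h = pairingheap_allocate(compare_items, NULL);
 *        pairingheap_add(h, &item->heapnode);  MyItem *top = pop_nearest(h); */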

Here is the concrete implementation of HnswSearchLayer:

/*
* Algorithm 2 from paper
*/
List *
HnswSearchLayer(char *base, HnswQuery * q, List *ep, int ef, int lc, Relation index, HnswSupport * support, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples)
{
List *w = NIL;
pairingheap *C = pairingheap_allocate(CompareNearestCandidates, NULL);
pairingheap *W = pairingheap_allocate(CompareFurthestCandidates, NULL);
int wlen = 0;
visited_hash vh;
ListCell *lc2;
HnswNeighborArray *localNeighborhood = NULL;
Size neighborhoodSize = 0;
int lm = HnswGetLayerM(m, lc);
HnswUnvisited *unvisited = palloc(lm * sizeof(HnswUnvisited));
int unvisitedLength;
bool inMemory = index == NULL;

if (v == NULL)
{
v = &vh;
initVisited = true;
}

if (initVisited)
{
InitVisited(base, v, inMemory, ef, m);

if (discarded != NULL)
*discarded = pairingheap_allocate(CompareNearestDiscardedCandidates, NULL);
}

/* Create local memory for neighborhood if needed */
if (inMemory)
{
neighborhoodSize = HNSW_NEIGHBOR_ARRAY_SIZE(lm);
localNeighborhood = palloc(neighborhoodSize);
}

/* Add entry points to v, C, and W */
foreach(lc2, ep)
{
HnswSearchCandidate *sc = (HnswSearchCandidate *) lfirst(lc2);
bool found;

if (initVisited)
{
AddToVisited(base, v, sc->element, inMemory, &found);

/* OK to count elements instead of tuples */
if (tuples != NULL)
(*tuples)++;
}

pairingheap_add(C, &sc->c_node);
pairingheap_add(W, &sc->w_node);

/*
* Do not count elements being deleted towards ef when vacuuming. It
* would be ideal to do this for inserts as well, but this could
* affect insert performance.
*/
if (CountElement(skipElement, HnswPtrAccess(base, sc->element)))
wlen++;
}

while (!pairingheap_is_empty(C))
{
HnswSearchCandidate *c = HnswGetSearchCandidate(c_node, pairingheap_remove_first(C));
HnswSearchCandidate *f = HnswGetSearchCandidate(w_node, pairingheap_first(W));
HnswElement cElement;

if (c->distance > f->distance)
break;

cElement = HnswPtrAccess(base, c->element);

if (inMemory)
HnswLoadUnvisitedFromMemory(base, cElement, unvisited, &unvisitedLength, v, lc, localNeighborhood, neighborhoodSize);
else
HnswLoadUnvisitedFromDisk(cElement, unvisited, &unvisitedLength, v, index, m, lm, lc);

/* OK to count elements instead of tuples */
if (tuples != NULL)
(*tuples) += unvisitedLength;

for (int i = 0; i < unvisitedLength; i++)
{
HnswElement eElement;
HnswSearchCandidate *e;
double eDistance;
bool alwaysAdd = wlen < ef;

f = HnswGetSearchCandidate(w_node, pairingheap_first(W));

// For each unvisited element, use one of two addressing modes depending on whether the index is fully in memory
if (inMemory)
{
eElement = unvisited[i].element;
eDistance = GetElementDistance(base, eElement, q, support);
}
else
{
ItemPointer indextid = &unvisited[i].indextid;
BlockNumber blkno = ItemPointerGetBlockNumber(indextid);
OffsetNumber offno = ItemPointerGetOffsetNumber(indextid);

/* Avoid any allocations if not adding */
eElement = NULL;
HnswLoadElementImpl(blkno, offno, &eDistance, q, index, support, inserting, alwaysAdd || discarded != NULL ? NULL : &f->distance, &eElement);

if (eElement == NULL)
continue;
}

if (eElement == NULL || !(eDistance < f->distance || alwaysAdd))
{
if (discarded != NULL)
{
/* Create a new candidate */
e = HnswInitSearchCandidate(base, eElement, eDistance);
pairingheap_add(*discarded, &e->w_node);
}

continue;
}

/* Make robust to issues */
if (eElement->level < lc)
continue;

/* Create a new candidate */
e = HnswInitSearchCandidate(base, eElement, eDistance);
pairingheap_add(C, &e->c_node);
pairingheap_add(W, &e->w_node);

/*
* Do not count elements being deleted towards ef when vacuuming.
* It would be ideal to do this for inserts as well, but this
* could affect insert performance.
*/
if (CountElement(skipElement, eElement))
{
wlen++;

/* No need to decrement wlen */
if (wlen > ef)
{
HnswSearchCandidate *d = HnswGetSearchCandidate(w_node, pairingheap_remove_first(W));

if (discarded != NULL)
pairingheap_add(*discarded, &d->w_node);
}
}
}
}

/* Add each element of W to w */
while (!pairingheap_is_empty(W))
{
HnswSearchCandidate *sc = HnswGetSearchCandidate(w_node, pairingheap_remove_first(W));

w = lappend(w, sc);
}

return w;
}

Unlike hnswlib, pgvector stores the visited set in a hash table, whereas hnswlib uses a full-size array of visit flags with dynamic marking. The core reason for the difference is that pgvector is not a standalone algorithm library but a sub-feature inside PG: a full-size array would indeed save the cost of hash lookups, but allocating and tearing it down per operation is unacceptable. Turning the question around, why doesn't hnswlib use a hash table for the visited set? Because in its target scenario it judges the cost of hashing to be far greater than direct array indexing, so for performance it chooses the full flag array over a hash table.
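
The tradeoff can be made concrete with a toy sketch (illustrative code, not either library's actual implementation): a full-size flag array gives hash-free O(1) probes but must be allocated or reset for every node on each search, while a hash set pays one hash per probe but only grows with the nodes actually visited.

#include <stdbool.h>
#include <stddef.h>

/* Strategy 1 (hnswlib-style idea): full-size flag array, cheap probes, O(N) setup */
static bool
flag_mark_visited(bool *seen, size_t node_id)
{
	bool		already = seen[node_id];

	seen[node_id] = true;
	return already;			/* true if the node had been visited before */
}

/* Strategy 2 (pgvector-style idea): open-addressing hash set keyed by node id;
 * assumes node_id != 0 and that the table (slots, power-of-two sized) never fills */
static bool
hash_mark_visited(size_t *slots, size_t mask, size_t node_id)
{
	size_t		h = (node_id * 2654435761u) & mask;	/* multiplicative hash */

	while (slots[h] != 0)
	{
		if (slots[h] == node_id)
			return true;	/* already visited */
		h = (h + 1) & mask;
	}
	slots[h] = node_id;
	return false;
}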

8. UpdateGraphInMemory

UpdateGraphInMemory updates the in-memory graph. The function has three jobs:

1. Insert the new element into the graph.

2. For each edge of the new element, try to create the reverse edge.

3. If the newly inserted element should replace the entry point, update the entry point.
/*
* Update graph in memory
*/
static void
UpdateGraphInMemory(HnswSupport * support, HnswElement element, int m, int efConstruction, HnswElement entryPoint, HnswBuildState * buildstate)
{
HnswGraph *graph = buildstate->graph;
char *base = buildstate->hnswarea;

/* Look for duplicate */
if (FindDuplicateInMemory(base, element))
return;

/* Add element */
AddElementInMemory(base, graph, element); // head insertion: link element in at graph->head (job 1)

/* Update neighbors */
UpdateNeighborsInMemory(base, support, element, m); // try to create the reverse edges (job 2)

/* Update entry point if needed (already have lock) */
if (entryPoint == NULL || element->level > entryPoint->level) // update the entry point (job 3)
HnswPtrStore(base, graph->entryPoint, element);
}

AddElementInMemory lets us see the whole in miniature. It is very simple: take the graph's global spinlock and insert the new element at the head of the list:

/*
* When memory is sufficient (non-parallel build), (base) == NULL is true and the macro reduces to (hp).ptr = (value), i.e. it stores the address directly rather than a relative pointer.
*/
#define HnswPtrStore(base,hp,value) ((base) == NULL ? (void) ((hp).ptr = (value)) : (void) relptr_store(base, (hp).relptr, value))

/*
* HnswPtrDeclare defines a dual-purpose pointer union: type *ptr is the direct pointer, relptrtype relptr is the relative offset.
*/
#define HnswPtrDeclare(type,relptrtype,ptrtype) relptr_declare(type, relptrtype); typedef union { type *ptr; relptrtype relptr; } ptrtype;
/* Pointers that can be absolute or relative */
/* Use char for DatumPtr so works with Pointer */
HnswPtrDeclare(HnswElementData, HnswElementRelptr, HnswElementPtr);

typedef struct HnswGraph
{
/* Graph state */
slock_t lock;
HnswElementPtr head; // head is a dual-purpose pointer
double indtuples;
/* ... (other fields omitted) ... */
} HnswGraph;

/*
* Add to element list
*/
static void
AddElementInMemory(char *base, HnswGraph * graph, HnswElement element)
{
SpinLockAcquire(&graph->lock);
element->next = graph->head;
HnswPtrStore(base, graph->head, element); // make graph->head point to element
SpinLockRelease(&graph->lock);
}

Reading this, we now know for certain that when memory suffices for the build, all elements of the index are chained together in a linked list. On one hand, a linked list can make use of memory fragments and avoids requiring one huge contiguous region; on the other hand, persisting the index later only requires walking the list from front to back.

UpdateNeighborsInMemory is the function that turns the edges into bidirectional ones:

/*
* Update neighbors: the connection-update part of Algorithm 1 in the paper
*/
static void
UpdateNeighborsInMemory(char *base, HnswSupport * support, HnswElement e, int m)
{
for (int lc = e->level; lc >= 0; lc--) // for every level where edges need to be created
{
int lm = HnswGetLayerM(m, lc);
Size neighborsSize = HNSW_NEIGHBOR_ARRAY_SIZE(lm);
HnswNeighborArray *neighbors = palloc(neighborsSize);

/* Copy neighbors to local memory */
LWLockAcquire(&e->lock, LW_SHARED); // concurrency: any element's neighbor array can be modified by reverse edges, so take a shared lock
memcpy(neighbors, HnswGetNeighbors(base, e, lc), neighborsSize); // copy e's neighbor array into the local buffer 'neighbors'
LWLockRelease(&e->lock);

for (int i = 0; i < neighbors->length; i++) // iterate over the neighbors of e (e is the newly inserted element)
{
HnswCandidate *hc = &neighbors->items[i];
HnswElement neighborElement = HnswPtrAccess(base, hc->element); // take one neighbor and update that neighbor's own neighbor list

/* Keep scan-build happy on Mac x86-64 */
Assert(neighborElement);

LWLockAcquire(&neighborElement->lock, LW_EXCLUSIVE); // take an exclusive lock: try to add this element to the neighbor's array, reselecting edges heuristically if it is full
HnswUpdateConnection(base, HnswGetNeighbors(base, neighborElement, lc), e, hc->distance, lm, NULL, NULL, support); // the distance between hc and e is already stored, no recomputation needed
LWLockRelease(&neighborElement->lock);
}
}
}

HnswUpdateConnection is the lower-level function that updates a reverse edge; let's look at its optimizations:

/*
* The neighbor array of an element; closerSet records whether the cached 'closer' information in the HnswCandidates is valid.
*/
struct HnswNeighborArray
{
int length;
bool closerSet;
HnswCandidate items[FLEXIBLE_ARRAY_MEMBER];
};

/*
* Candidate data structure, used both for sorting candidate queues and as entries of neighbor arrays.
*/
typedef struct HnswCandidate
{
HnswElementPtr element; // the element this candidate refers to
float distance; // distance from the referenced element to the element that owns this candidate list
bool closer;
} HnswCandidate;


/*
* Update connections
* neighbors is the neighbor list of one of the current element's neighbors
* newElement is the new element (the current element) being inserted into neighbors
* distance is the distance between newElement and the neighbor whose list is being updated
*
* Note that the neighbor element itself is not passed in: its neighbor list already stores each member together with its distance, so no extra computation is needed
*/
void
HnswUpdateConnection(char *base, HnswNeighborArray * neighbors, HnswElement newElement, float distance, int lm, int *updateIdx, Relation index, HnswSupport * support)
{
HnswCandidate newHc;

HnswPtrStore(base, newHc.element, newElement);
newHc.distance = distance;

if (neighbors->length < lm)
{
// 1. The neighbor list is not yet full (fewer than lm entries): just append

neighbors->items[neighbors->length++] = newHc;

/* Track update */
if (updateIdx != NULL)
*updateIdx = -2;
}
else
{
// 2. The list is full: reselect the edges with the heuristic

/* Shrink connections */

List *c = NIL;
HnswCandidate *pruned = NULL; // 2.1 pruned starts as NULL; it will point to the neighbor that gets pruned

/* Add candidates */
for (int i = 0; i < neighbors->length; i++)
c = lappend(c, &neighbors->items[i]);
c = lappend(c, &newHc); // 2.2 put all existing neighbors into the temporary list c, then append the new element

SelectNeighbors(base, c, lm, support, &neighbors->closerSet, &newHc, &pruned, true); // 2.3 run the heuristic selection; pruned receives the neighbor that was dropped

/* Should not happen */
if (pruned == NULL)
return;

/* Find and replace the pruned element */
for (int i = 0; i < neighbors->length; i++)
{
if (HnswPtrEqual(base, neighbors->items[i].element, pruned->element)) // at most one element is pruned here; more than that cannot be dropped
{
neighbors->items[i] = newHc;

/* Track update */
if (updateIdx != NULL)
*updateIdx = i;

break;
}
}
}
}

Having read the code above, we can see that pgvector keeps the distances computed during the earlier search and reuses them when updating the reverse edges, avoiding a large amount of recomputation. This is a design idea worth borrowing.

SelectNeighbors is the final heuristic edge-selection algorithm:

/*
* Shallow copy of a list: the cell array is copied directly
* Return a shallow copy of the specified list.
*/
List *
list_copy(const List *oldlist)
{
List *newlist;

if (oldlist == NIL)
return NIL;

newlist = new_list(oldlist->type, oldlist->length);
memcpy(newlist->elements, oldlist->elements,
newlist->length * sizeof(ListCell));

check_list_invariants(newlist);
return newlist;
}

/*
* Helper: decide whether the new HnswCandidate e passes the edge-selection rule
* Check if an element is closer to q than any element from R
*/
static bool
CheckElementCloser(char *base, HnswCandidate * e, List *r, HnswSupport * support)
{
HnswElement eElement = HnswPtrAccess(base, e->element);
Datum eValue = HnswGetValue(base, eElement);
ListCell *lc2;

foreach(lc2, r)
{
HnswCandidate *ri = lfirst(lc2);
HnswElement riElement = HnswPtrAccess(base, ri->element);
Datum riValue = HnswGetValue(base, riElement);
float distance = HnswGetDistance(eValue, riValue, support);

if (distance <= e->distance)
return false;
}

return true;
}

/*
* Algorithm 4 from paper
* c is the candidate neighbor list of the current element;
* lm is the maximum number of neighbors;
* newCandidate is, conceptually, the current (newly added) element
*/
static List *
SelectNeighbors(char *base, List *c, int lm, HnswSupport * support, bool *closerSet, HnswCandidate * newCandidate, HnswCandidate * *pruned, bool sortCandidates)
{
List *r = NIL;
List *w = list_copy(c); // shallow copy, so that deletions do not corrupt the original list c
HnswCandidate **wd;
int wdlen = 0;
int wdoff = 0;
bool mustCalculate = !(*closerSet);
List *added = NIL;
bool removedAny = false;

if (list_length(w) <= lm)
return w;

wd = palloc(sizeof(HnswCandidate *) * list_length(w)); // array of pruned candidates

/* Ensure order of candidates is deterministic for closer caching */
if (sortCandidates)
{
if (base == NULL)
list_sort(w, CompareCandidateDistances); // sort the list by distance in descending order
else
list_sort(w, CompareCandidateDistancesOffset);
}

while (list_length(w) > 0 && list_length(r) < lm)
{
/* Assumes w is already ordered desc */ // take the candidate with the smallest distance
HnswCandidate *e = llast(w);

w = list_delete_last(w);

/* Use previous state of r and wd to skip work when possible */
if (mustCalculate)
e->closer = CheckElementCloser(base, e, r, support); // check whether this candidate survives pruning
else if (list_length(added) > 0)
{
/* Keep Valgrind happy for in-memory, parallel builds */
if (base != NULL)
VALGRIND_MAKE_MEM_DEFINED(&e->closer, 1);

/*
* If the current candidate was closer, we only need to compare it
* with the other candidates that we have added.
*/
if (e->closer)
{
e->closer = CheckElementCloser(base, e, added, support);

if (!e->closer)
removedAny = true;
}
else
{
/*
* If we have removed any candidates from closer, a candidate
* that was not closer earlier might now be.
*/
if (removedAny)
{
e->closer = CheckElementCloser(base, e, r, support);
if (e->closer)
added = lappend(added, e);
}
}
}
else if (e == newCandidate)
{
e->closer = CheckElementCloser(base, e, r, support);
if (e->closer)
added = lappend(added, e);
}

/* Keep Valgrind happy for in-memory, parallel builds */
if (base != NULL)
VALGRIND_MAKE_MEM_DEFINED(&e->closer, 1);

if (e->closer)
r = lappend(r, e);
else
wd[wdlen++] = e;
}

/* Cached value can only be used in future if sorted deterministically */
*closerSet = sortCandidates;


/* Keep pruned connections */
while (wdoff < wdlen && list_length(r) < lm)
r = lappend(r, wd[wdoff++]); // if pruning left fewer than lm neighbors, bring some pruned ones back

/* Return pruned for update connections */
if (pruned != NULL)
{
if (wdoff < wdlen)
*pruned = wd[wdoff];
else
*pruned = linitial(w); // prune the remaining candidate with the largest distance
}

return r;
}

This code involves the variable mustCalculate, a pgvector computation optimization; when mustCalculate is true, the optimization is disabled. Its purpose is to cut down on repeated, useless distance computations and comparisons during neighbor selection.

Let's look at the optimized branch on its own:

List	   *added = NIL;

else if (list_length(added) > 0) // 1. added stays empty until the iteration reaches newCandidate, so this branch is not taken before that
{
/* Keep Valgrind happy for in-memory, parallel builds */
if (base != NULL)
VALGRIND_MAKE_MEM_DEFINED(&e->closer, 1);

/*
* If the current candidate was closer, we only need to compare it
* with the other candidates that we have added.
*/
if (e->closer)
{
e->closer = CheckElementCloser(base, e, added, support); // 3. a neighbor that was already selected before only needs to be compared against the newly added candidates; comparisons against the earlier neighbors are guaranteed to return true

if (!e->closer)
removedAny = true; // 4. if a previously selected neighbor is now dropped, later candidates may change, so force recomputation
}
else
{
/*
* If we have removed any candidates from closer, a candidate
* that was not closer earlier might now be.
*/
if (removedAny)
{
e->closer = CheckElementCloser(base, e, r, support); // 5. r is the full list of neighbors accepted so far; e must be compared against all of it
if (e->closer)
added = lappend(added, e);
}
}
}
else if (e == newCandidate) // 2. reached newCandidate; if it is selected, later neighbors may be affected, so record it in added to trigger recomputation for them
{
e->closer = CheckElementCloser(base, e, r, support);
if (e->closer)
added = lappend(added, e);
}

In the current version of pgvector, mustCalculate ends up true by default (closerSet stays false because in-memory builds call SelectNeighbors with sortCandidates = false), i.e. the optimization above is not enabled, because it showed no clear performance benefit:

alt text

if (mustCalculate)
e->closer = CheckElementCloser(base, e, r, support); // check whether this candidate survives pruning

With that, we have finished reading UpdateGraphInMemory. One optimization lesson to take away:

For every distance computation, think carefully about whether its result can be reused.

9. Back to BuildGraph -> FlushPages

We have now read all of the logic of BuildCallback, the callback of the index-build scan, for the non-parallel, memory-sufficient case, and worked through its concurrency design (even though it is moot without parallelism). Next we read the persistence part:

/*
* Build graph
*/
static void
BuildGraph(HnswBuildState * buildstate, ForkNumber forkNum)
{
int parallel_workers = 0;

pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE, PROGRESS_HNSW_PHASE_LOAD);

/* Calculate parallel workers */
if (buildstate->heap != NULL)
parallel_workers = ComputeParallelWorkers(buildstate->heap, buildstate->index);

/* Attempt to launch parallel worker scan when required */
if (parallel_workers > 0)
HnswBeginParallel(buildstate, buildstate->indexInfo->ii_Concurrent, parallel_workers);

/* Add tuples to graph */
if (buildstate->heap != NULL)
{
if (buildstate->hnswleader)
buildstate->reltuples = ParallelHeapScan(buildstate);
else
buildstate->reltuples = table_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo,
true, true, BuildCallback, (void *) buildstate, NULL);

buildstate->indtuples = buildstate->graph->indtuples;
}

/* Flush pages */
if (!buildstate->graph->flushed)
FlushPages(buildstate);

/* End parallel build */
if (buildstate->hnswleader)
HnswEndParallel(buildstate->hnswleader);
}

Before FlushPages is called, let's recap what BuildCallback has done:

1. Read HnswGraph *graph from buildstate as the entry point of the whole graph structure; graph holds a linked list of elements.

2. For each vector value and its tid, build an HnswElement following the HNSW algorithm and link its pointer into graph with head insertion; the edges are kept as lists. Both elements and edges live in memory.

buildstate is a bag of build parameters; it holds all the data for index construction, such as the index metadata (Settings) and the build results (Variables):


typedef struct HnswBuildState
{
/* Info */
Relation heap;
Relation index;
IndexInfo *indexInfo;
ForkNumber forkNum;
const HnswTypeInfo *typeInfo;

/* Settings */
int dimensions;
int m;
int efConstruction;

/* Statistics */
double indtuples;
double reltuples;

/* Support functions */
HnswSupport support;

/* Variables */
HnswGraph graphData;
HnswGraph *graph;
double ml;
int maxLevel;

/* Memory */
MemoryContext graphCtx;
MemoryContext tmpCtx;
HnswAllocator allocator;

/* Parallel builds */
HnswLeader *hnswleader;
HnswShared *hnswshared;
char *hnswarea;
} HnswBuildState;

The next task is to persist the index, which FlushPages does:

/*
* Flush pages
*/
static void
FlushPages(HnswBuildState * buildstate)
{
#ifdef HNSW_MEMORY
elog(INFO, "memory: %zu MB", buildstate->graph->memoryUsed / (1024 * 1024));
#endif

CreateMetaPage(buildstate);
CreateGraphPages(buildstate);
WriteNeighborTuples(buildstate);

buildstate->graph->flushed = true;
MemoryContextReset(buildstate->graphCtx);
}

As you can see, FlushPages calls sub-functions that create pages and write tuples; let's continue with those.

10. CreateMetaPage

CreateMetaPage is a very simple new page -> fill in the page -> save the page sequence. The metadata has a fixed size, so it is guaranteed to fit in a single page, and there is no multi-page concern here.

/*
* Create the metapage
*/
static void
CreateMetaPage(HnswBuildState * buildstate)
{
Relation index = buildstate->index;
ForkNumber forkNum = buildstate->forkNum;
Buffer buf;
Page page;
HnswMetaPage metap;

buf = HnswNewBuffer(index, forkNum);
page = BufferGetPage(buf);
HnswInitPage(buf, page);

/* Set metapage data */
metap = HnswPageGetMeta(page);
metap->magicNumber = HNSW_MAGIC_NUMBER;
metap->version = HNSW_VERSION;
metap->dimensions = buildstate->dimensions;
metap->m = buildstate->m;
metap->efConstruction = buildstate->efConstruction;
metap->entryBlkno = InvalidBlockNumber;
metap->entryOffno = InvalidOffsetNumber;
metap->entryLevel = -1;
metap->insertPage = InvalidBlockNumber;
((PageHeader) page)->pd_lower =
((char *) metap + sizeof(HnswMetaPageData)) - (char *) page;

MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}

11. CreateGraphPages

In CreateGraphPages we can see that pgvector stores the vector data and the edges next to each other: unless the vector is so large that an element tuple and its neighbor tuple cannot fit on one page together, the element and its neighbors are kept on the same page:

/*
* Create graph pages
*/
static void
CreateGraphPages(HnswBuildState * buildstate)
{
Relation index = buildstate->index;
ForkNumber forkNum = buildstate->forkNum;
Size maxSize;
HnswElementTuple etup;
HnswNeighborTuple ntup;
BlockNumber insertPage;
HnswElement entryPoint;
Buffer buf;
Page page;
HnswElementPtr iter = buildstate->graph->head;
char *base = buildstate->hnswarea;

/* Calculate sizes */
maxSize = HNSW_MAX_SIZE;

/* Allocate once */
etup = palloc0(HNSW_TUPLE_ALLOC_SIZE);
ntup = palloc0(HNSW_TUPLE_ALLOC_SIZE);

/* Prepare first page */
buf = HnswNewBuffer(index, forkNum);
page = BufferGetPage(buf);
HnswInitPage(buf, page);

while (!HnswPtrIsNull(base, iter))
{
HnswElement element = HnswPtrAccess(base, iter);
Size etupSize;
Size ntupSize;
Size combinedSize;
Pointer valuePtr = HnswPtrAccess(base, element->value);

/* Update iterator */
iter = element->next;

/* Zero memory for each element */
MemSet(etup, 0, HNSW_TUPLE_ALLOC_SIZE);

/* Calculate sizes */
etupSize = HNSW_ELEMENT_TUPLE_SIZE(VARSIZE_ANY(valuePtr)); // vector data length plus the other fields (level, deleted flag, ...); in principle this need not be recomputed inside the loop
ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, buildstate->m);
combinedSize = etupSize + ntupSize + sizeof(ItemIdData); // combinedSize is the total length of the element tuple and the neighbor tuple

/* Initial size check */
if (etupSize > HNSW_TUPLE_ALLOC_SIZE)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index tuple too large")));

HnswSetElementTuple(base, etup, element); // copy the contents of the in-memory element into the etupSize-byte etup

// If a fresh page could hold the element and its neighbors together but the current page cannot, allocate a new page and switch buf/page to it.
/* Keep element and neighbors on the same page if possible */
if (PageGetFreeSpace(page) < etupSize || (combinedSize <= maxSize && PageGetFreeSpace(page) < combinedSize))
HnswBuildAppendPage(index, &buf, &page, forkNum);

/* Calculate offsets */
element->blkno = BufferGetBlockNumber(buf);
element->offno = OffsetNumberNext(PageGetMaxOffsetNumber(page)); // next item-pointer slot on the current page
if (combinedSize <= maxSize) // element and neighbors fit on the same page
{
element->neighborPage = element->blkno;
element->neighborOffno = OffsetNumberNext(element->offno);
}
else
{
element->neighborPage = element->blkno + 1;
element->neighborOffno = FirstOffsetNumber;
}

ItemPointerSet(&etup->neighbortid, element->neighborPage, element->neighborOffno);

/* Add element */
if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != element->offno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));

/* Add new page if needed */
if (PageGetFreeSpace(page) < ntupSize)
HnswBuildAppendPage(index, &buf, &page, forkNum);

/* Add placeholder for neighbors */
if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));
}

insertPage = BufferGetBlockNumber(buf);

/* Commit */
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);

entryPoint = HnswPtrAccess(base, buildstate->graph->entryPoint);
HnswUpdateMetaPage(index, HNSW_UPDATE_ENTRY_ALWAYS, entryPoint, insertPage, forkNum, true);

pfree(etup);
pfree(ntup);
}

The figure below shows the PG page layout. Each tuple consists of an item pointer in the header area (ItemIdData, 32 bits: offset within the page, state, data length) and the actual data stored in the data area:

alt text

From: https://zhmin.github.io/posts/postgresql-buffer-page/
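
A heap TID (ItemPointerData) is exactly this (block number, offset number) pair. A small sketch of composing and decomposing one with the real PG helpers (the variable values are illustrative):

/* Sketch: a TID addresses "item #offno on page #blkno" */
ItemPointerData tid;
BlockNumber blkno = 42;		/* hypothetical page number */
OffsetNumber offno = 3;		/* hypothetical item-pointer slot within that page */

ItemPointerSet(&tid, blkno, offno);			/* compose, as done for etup->neighbortid below */
blkno = ItemPointerGetBlockNumber(&tid);	/* which page */
offno = ItemPointerGetOffsetNumber(&tid);	/* which item pointer within the page */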

How is an index entry linked back to the original table row? HnswSetElementTuple fills in heaptids; a heap tid is the pointer PG uses to uniquely identify a table row:

/*
* Set element tuple, except for neighbor info
*/
void
HnswSetElementTuple(char *base, HnswElementTuple etup, HnswElement element)
{
Pointer valuePtr = HnswPtrAccess(base, element->value);

etup->type = HNSW_ELEMENT_TUPLE_TYPE;
etup->level = element->level;
etup->deleted = 0;
etup->version = element->version;
for (int i = 0; i < HNSW_HEAPTIDS; i++)
{
if (i < element->heaptidsLength)
etup->heaptids[i] = element->heaptids[i]; // copy the TID
else
ItemPointerSetInvalid(&etup->heaptids[i]); // mark as invalid
}
memcpy(&etup->data, valuePtr, VARSIZE_ANY(valuePtr)); // copy the vector data
}

So the tid list of the tuple etup that is about to be stored in the page comes from element. Where does element's tid list come from? Looking back, the InsertTuple function we read earlier (without paying attention to this detail) sets the heaptid of the inserted element:

/* Ok, we can proceed to allocate the element */
element = HnswInitElement(base, heaptid, buildstate->m, buildstate->ml, buildstate->maxLevel, allocator); // when initializing element, the heaptid handed down by the scan is stored into it
valuePtr = HnswAlloc(allocator, valueSize);

......

/* Insert tuple */
InsertTupleInMemory(buildstate, element); // the main time-consuming function

Now that we know where the tids come from, let's read on. In the code below, PageGetMaxOffsetNumber returns the offset of the last item pointer on the current page, which is also the number of tuples on the page. So the in-memory graph element's offset element->offno is set to the slot its item pointer will occupy, and its block number is simply the current block's number.

For the neighbor tuple: if it does not have to go on a page of its own, its offset within the block is simply the item-pointer slot right after the element tuple's. If the vector data is so long that the two can never share a page, the neighbor tuple is assigned a new block and its offset is necessarily the first tuple position.

/*
* PageGetMaxOffsetNumber
* Returns the maximum offset number used by the given page.
* Since offset numbers are 1-based, this is also the number
* of items on the page.
*
* NOTE: if the page is not initialized (pd_lower == 0), we must
* return zero to ensure sane behavior.
*/
static inline OffsetNumber
PageGetMaxOffsetNumber(Page page)
{
PageHeader pageheader = (PageHeader) page;

if (pageheader->pd_lower <= SizeOfPageHeaderData)
return 0;
else
return (pageheader->pd_lower - SizeOfPageHeaderData) / sizeof(ItemIdData);
}

......

// Snippet from CreateGraphPages:

/* Calculate offsets */
element->blkno = BufferGetBlockNumber(buf);
element->offno = OffsetNumberNext(PageGetMaxOffsetNumber(page)); // next item-pointer slot on the current page
if (combinedSize <= maxSize) // element and neighbors fit on the same page
{
element->neighborPage = element->blkno;
element->neighborOffno = OffsetNumberNext(element->offno);
}
else
{
element->neighborPage = element->blkno + 1;
element->neighborOffno = FirstOffsetNumber;
}

ItemPointerSet sets etup->neighbortid to the combination of block number and offset:

/*
* ItemPointerSet
* Sets a disk item pointer to the specified block and offset.
*/
static inline void
ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
{
Assert(PointerIsValid(pointer));
BlockIdSet(&pointer->ip_blkid, blockNumber);
pointer->ip_posid = offNum;
}

At this point, the contents of the tuples to be written are all ready. Let's recap once more what has been done:

1. The element tuple etup has been filled in by copying from the in-memory element: the element header (level, state), the vector data, and the tid array.

2. The space needed for each element's neighbor tuple ntup has been computed, and the target page has been prepared: when the combined size of etup and ntup exceeds the free space on the current page but would still fit on a fresh page, a new page is appended automatically.

3. The in-memory element has been updated with the block number and offset of its element tuple and of its neighbor tuple, and etup's neighbortid has been set accordingly.

In short, we have decided where the element tuple and the neighbor tuple of the current element will be stored; the next step is to write them into the pages:

/* Add element */
if (PageAddItem(page, (Item) etup, etupSize, InvalidOffsetNumber, false, false) != element->offno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));

/* Add new page if needed */
if (PageGetFreeSpace(page) < ntupSize)
HnswBuildAppendPage(index, &buf, &page, forkNum);

/* Add placeholder for neighbors */
if (PageAddItem(page, (Item) ntup, ntupSize, InvalidOffsetNumber, false, false) != element->neighborOffno)
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));

The writing itself is relatively simple, so we will not go deeper. Worth mentioning is the "Add new page if needed" logic, which handles the case where even a fresh page cannot hold etup and ntup together, by allocating a page for ntup on its own.

PG database page layout: http://www.postgres.cn/docs/9.3/storage-page-layout.html
PostgreSQL page structure: https://zhmin.github.io/posts/postgresql-buffer-page/

12. WriteNeighborTuples

In CreateGraphPages, for every element in graph we stored its element tuple etup and also wrote out a tuple for its neighbor list ntup.

But that ntup is in fact still an empty placeholder. Why? Because until every element has been persisted, we cannot know each element's block number and offset, and on disk the neighbor list must store the tids of the neighbor elements' etups.

So we need one more pass over graph: for each in-memory element, look up the tids of the elements its neighbors arrays point to and write them into that element's ntup:

/*
* Set neighbor tuple
*/
void
HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m)
{
int idx = 0;

ntup->type = HNSW_NEIGHBOR_TUPLE_TYPE;

for (int lc = e->level; lc >= 0; lc--)
{
HnswNeighborArray *neighbors = HnswGetNeighbors(base, e, lc);
int lm = HnswGetLayerM(m, lc);

for (int i = 0; i < lm; i++)
{
ItemPointer indextid = &ntup->indextids[idx++];

if (i < neighbors->length)
{
HnswCandidate *hc = &neighbors->items[i];
HnswElement hce = HnswPtrAccess(base, hc->element);

ItemPointerSet(indextid, hce->blkno, hce->offno);
}
else
ItemPointerSetInvalid(indextid);
}
}

ntup->count = idx;
ntup->version = e->version;
}

/*
* Write neighbor tuples
*/
static void
WriteNeighborTuples(HnswBuildState * buildstate)
{
Relation index = buildstate->index;
ForkNumber forkNum = buildstate->forkNum;
int m = buildstate->m;
HnswElementPtr iter = buildstate->graph->head;
char *base = buildstate->hnswarea;
HnswNeighborTuple ntup;

/* Allocate once */
ntup = palloc0(HNSW_TUPLE_ALLOC_SIZE);

while (!HnswPtrIsNull(base, iter))
{
HnswElement element = HnswPtrAccess(base, iter);
Buffer buf;
Page page;
Size ntupSize = HNSW_NEIGHBOR_TUPLE_SIZE(element->level, m);

/* Update iterator */
iter = element->next;

/* Zero memory for each element */
MemSet(ntup, 0, HNSW_TUPLE_ALLOC_SIZE);

/* Can take a while, so ensure we can interrupt */
/* Needs to be called when no buffer locks are held */
CHECK_FOR_INTERRUPTS();

buf = ReadBufferExtended(index, forkNum, element->neighborPage, RBM_NORMAL, NULL);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);

HnswSetNeighborTuple(base, ntup, element, m);

if (!PageIndexTupleOverwrite(page, element->neighborOffno, (Item) ntup, ntupSize))
elog(ERROR, "failed to add index item to \"%s\"", RelationGetRelationName(index));

/* Commit */
MarkBufferDirty(buf);
UnlockReleaseBuffer(buf);
}

pfree(ntup);
}

Epilogue

At this point we have read the entire HNSW build path for the non-parallel, memory-sufficient case, and along the way touched on parts of the parallel design.

In the process we also read up on PostgreSQL's index architecture, lightweight locks, page structure, and more.

For me, this was the first time I read the source of a large project on my own, and also my first time reading C code. It made me appreciate the conveniences that C++ provides, the sheer size of the PG code base, and the power of the open-source community.

If I have the energy later, I will also read and write up the more complex build logic for the parallel and memory-constrained cases.