PostgreSQL ReadBuffer Source Code Walkthrough

Preface

This walkthrough is based on version 17.0 of PG.

(figure: flame graph of the ReadBuffer call stack under a very low cache hit rate)

First, look at the figure above: with an extremely low cache hit rate, the function call stack looks roughly like this flame graph. We will follow this call stack as we read.

Reading the Source

1. ReadBuffer_common

The ReadBuffer_common function accounts for almost the entire call subtree of ReadBuffer, so we start with it:

Below is the enum behind the ReadBufferMode mode parameter, which selects the kind of read to perform:

(figure: definition of the ReadBufferMode enum)

/*
 * ReadBuffer_common -- common logic for all ReadBuffer variants
 *
 * smgr is required, rel is optional unless using P_NEW.
 */
static pg_attribute_always_inline Buffer
ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
				  ForkNumber forkNum,
				  BlockNumber blockNum, ReadBufferMode mode,
				  BufferAccessStrategy strategy)
{
	ReadBuffersOperation operation;
	Buffer		buffer;
	int			flags;

	/*
	 * Backward compatibility path, most code should use ExtendBufferedRel()
	 * instead, as acquiring the extension lock inside ExtendBufferedRel()
	 * scales a lot better.
	 */
	if (unlikely(blockNum == P_NEW))
	{
		uint32		flags = EB_SKIP_EXTENSION_LOCK;

		/*
		 * Since no-one else can be looking at the page contents yet, there is
		 * no difference between an exclusive lock and a cleanup-strength
		 * lock.
		 */
		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
			flags |= EB_LOCK_FIRST;

		return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags);
	}

	if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK ||
				 mode == RBM_ZERO_AND_LOCK))
	{
		bool		found;

		buffer = PinBufferForBlock(rel, smgr, smgr_persistence,
								   forkNum, blockNum, strategy, &found);
		ZeroAndLockBuffer(buffer, mode, found);
		return buffer;
	}

	if (mode == RBM_ZERO_ON_ERROR)
		flags = READ_BUFFERS_ZERO_ON_ERROR;
	else
		flags = 0;
	operation.smgr = smgr;
	operation.rel = rel;
	operation.smgr_persistence = smgr_persistence;
	operation.forknum = forkNum;
	operation.strategy = strategy;
	if (StartReadBuffer(&operation,
						&buffer,
						blockNum,
						flags))
		WaitReadBuffers(&operation);

	return buffer;
}

For the ReadBuffer_common code above, the parameters mean the following:

Relation rel: a relation (table or index) in the database. It is a pointer to the relation descriptor, identifying the relation to read.

SMgrRelation smgr: the storage manager relation. It is a pointer to the storage-manager descriptor, identifying the relation at the smgr layer.

char smgr_persistence: the relation's persistence. It is a character value indicating whether the relation is permanent, temporary, or unlogged.

ForkNumber forkNum: which fork (file) of the relation to read.

BlockNumber blockNum: the block number to read. It is an unsigned integer selecting a specific block within the relation.

ReadBufferMode mode: the read mode. It is an enum selecting the behavior of the read, for example a normal read or zero-and-lock.

BufferAccessStrategy strategy: the buffer access strategy. It is a pointer to a strategy descriptor used when reading the buffer, e.g., for sequential scans or random access.
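
For orientation, the most common entry point, ReadBuffer(rel, blockNum), reaches this function through a thin wrapper chain. From memory of the PG 17 source (worth verifying against bufmgr.c), the outermost wrapper looks roughly like this, with ReadBufferExtended then calling ReadBuffer_common with the relation's smgr and a smgr_persistence of 0:

Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
	/* main fork, normal mode, default strategy */
	return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
}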

The ForkNumber enum is as follows (taken from https://zhmin.github.io/posts/postgresql-storage-interface/):

(figure: definition of the ForkNumber enum)

Let us first look at the two early-return branches:

/*
 * Backward compatibility path, most code should use ExtendBufferedRel()
 * instead, as acquiring the extension lock inside ExtendBufferedRel()
 * scales a lot better.
 */
if (unlikely(blockNum == P_NEW))
{
	uint32		flags = EB_SKIP_EXTENSION_LOCK;

	/*
	 * Since no-one else can be looking at the page contents yet, there is
	 * no difference between an exclusive lock and a cleanup-strength
	 * lock.
	 */
	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
		flags |= EB_LOCK_FIRST;

	return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags);
}

unlikely is a branch-prediction hint macro defined by PG:

(figure: definitions of the likely/unlikely macros)

unlikely means that blockNum == P_NEW evaluates to false in the vast majority of cases. As the comment says, this new-block path exists only for backward compatibility, and new code should call ExtendBufferedRel() directly; its job is to extend the relation by one buffer and return that new buffer.
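
For reference, a sketch of how such macros are commonly defined on GCC/Clang (PG's real definitions in c.h also include a fallback for compilers without __builtin_expect):

#if defined(__GNUC__) || defined(__clang__)
#define likely(x)	__builtin_expect((x) != 0, 1)	/* hint: condition is usually true */
#define unlikely(x)	__builtin_expect((x) != 0, 0)	/* hint: condition is usually false */
#else
#define likely(x)	((x) != 0)
#define unlikely(x)	((x) != 0)
#endif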

The next branch handles the two zero-and-lock modes; rather than reading the page from disk, it hands back a zero-filled, already-locked buffer:

if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK ||
			 mode == RBM_ZERO_AND_LOCK))
{
	bool		found;

	/* pin the buffer for the requested block (PinBufferForBlock) */
	buffer = PinBufferForBlock(rel, smgr, smgr_persistence,
							   forkNum, blockNum, strategy, &found);

	/* zero the buffer and lock it (ZeroAndLockBuffer) */
	ZeroAndLockBuffer(buffer, mode, found);

	/* return the prepared buffer */
	return buffer;
}

With the less important parts out of the way, here is the core:

operation.smgr = smgr;
operation.rel = rel;
operation.smgr_persistence = smgr_persistence;
operation.forknum = forkNum;
operation.strategy = strategy;
if (StartReadBuffer(&operation,
					&buffer,
					blockNum,
					flags))
	WaitReadBuffers(&operation);

The logic is simple: the operation descriptor is filled in, and if StartReadBuffer returns true (meaning an actual I/O was started), WaitReadBuffers is called to wait for that I/O.

2. StartReadBuffer

/*
 * Single block version of the StartReadBuffers().  This might save a few
 * instructions when called from another translation unit, because it is
 * specialized for nblocks == 1.
 */
bool
StartReadBuffer(ReadBuffersOperation *operation,
				Buffer *buffer,
				BlockNumber blocknum,
				int flags)
{
	int			nblocks = 1;
	bool		result;

	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
	Assert(nblocks == 1);		/* single block can't be short */

	return result;
}

As the code above shows, StartReadBuffer is the single-block variant; internally it calls StartReadBuffersImpl with nblocks == 1.

3. StartReadBuffersImpl

static pg_attribute_always_inline bool
StartReadBuffersImpl(ReadBuffersOperation *operation,
					 Buffer *buffers,
					 BlockNumber blockNum,
					 int *nblocks,
					 int flags)
{
	int			actual_nblocks = *nblocks;
	int			io_buffers_len = 0;

	Assert(*nblocks > 0);
	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);

	for (int i = 0; i < actual_nblocks; ++i)
	{
		bool		found;

		buffers[i] = PinBufferForBlock(operation->rel,
									   operation->smgr,
									   operation->smgr_persistence,
									   operation->forknum,
									   blockNum + i,
									   operation->strategy,
									   &found);

		if (found)
		{
			/*
			 * Terminate the read as soon as we hit a cached buffer.  It may
			 * be a single-buffer hit, or a hit that ends the readable range.
			 * We don't want to create more than one readable range, so stop
			 * here.
			 */
			actual_nblocks = i + 1;
			break;
		}
		else
		{
			/* Extend the readable range to cover this block. */
			io_buffers_len++;
		}
	}
	*nblocks = actual_nblocks;

	if (likely(io_buffers_len == 0))
		return false;

	/* Populate the information needed for the I/O. */
	operation->buffers = buffers;
	operation->blocknum = blockNum;
	operation->flags = flags;
	operation->nblocks = actual_nblocks;
	operation->io_buffers_len = io_buffers_len;

	if (flags & READ_BUFFERS_ISSUE_ADVICE)
	{
		/*
		 * In theory we should only do this if PinBufferForBlock() had to
		 * allocate new buffers above.  That way, if two calls to
		 * StartReadBuffers() were made for the same blocks before
		 * WaitReadBuffers(), only the first would issue the advice.  That
		 * would be a better simulation of true asynchronous I/O, which would
		 * only start the I/O once, but it isn't done here for simplicity.
		 * Note also that the following call might actually issue two advice
		 * calls if we cross a segment boundary; in a true asynchronous
		 * version we might choose to process only one real I/O at a time in
		 * that case.
		 */
		smgrprefetch(operation->smgr,
					 operation->forknum,
					 blockNum,
					 operation->io_buffers_len);
	}

	/* Indicate that WaitReadBuffers() should be called. */
	return true;
}

The parameters of StartReadBuffersImpl mean the following:

ReadBuffersOperation *operation: the read-operation descriptor, carrying everything the read needs.
Buffer *buffers: the array that receives the pinned buffers.
BlockNumber blockNum: the starting block number to read.
int *nblocks: the number of blocks to read; the function updates this value.
int flags: flags controlling the read behavior.

Starting from the given block number, it tries to pin up to *nblocks consecutive blocks. The heart of the function is PinBufferForBlock:

4. PinBufferForBlock

/*
 * Pin a buffer for a given block.  *foundPtr is set to true if the block was
 * already present, or false if more work is required to either read it in or
 * zero it.
 */
static pg_attribute_always_inline Buffer
PinBufferForBlock(Relation rel,
				  SMgrRelation smgr,
				  char smgr_persistence,
				  ForkNumber forkNum,
				  BlockNumber blockNum,
				  BufferAccessStrategy strategy,
				  bool *foundPtr)
{
	BufferDesc *bufHdr;
	IOContext	io_context;
	IOObject	io_object;
	char		persistence;

	Assert(blockNum != P_NEW);

	/*
	 * If there is no Relation it usually implies recovery and thus permanent,
	 * but we take an argument because CreateAndCopyRelationData can reach us
	 * with only an SMgrRelation for an unlogged relation that we don't want
	 * to flag with BM_PERMANENT.
	 */
	if (rel)
		persistence = rel->rd_rel->relpersistence;
	else if (smgr_persistence == 0)
		persistence = RELPERSISTENCE_PERMANENT;
	else
		persistence = smgr_persistence;

	if (persistence == RELPERSISTENCE_TEMP)
	{
		io_context = IOCONTEXT_NORMAL;
		io_object = IOOBJECT_TEMP_RELATION;
	}
	else
	{
		io_context = IOContextForStrategy(strategy);
		io_object = IOOBJECT_RELATION;
	}

	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
									   smgr->smgr_rlocator.locator.spcOid,
									   smgr->smgr_rlocator.locator.dbOid,
									   smgr->smgr_rlocator.locator.relNumber,
									   smgr->smgr_rlocator.backend);

	if (persistence == RELPERSISTENCE_TEMP)
	{
		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
		if (*foundPtr)
			pgBufferUsage.local_blks_hit++;
	}
	else
	{
		bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
							 strategy, foundPtr, io_context);
		if (*foundPtr)
			pgBufferUsage.shared_blks_hit++;
	}
	if (rel)
	{
		/*
		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
		 * WaitReadBuffers() (so, not for hits, and not for buffers that are
		 * zeroed instead), the per-relation stats always count them.
		 */
		pgstat_count_buffer_read(rel);
		if (*foundPtr)
			pgstat_count_buffer_hit(rel);
	}
	if (*foundPtr)
	{
		VacuumPageHit++;
		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
		if (VacuumCostActive)
			VacuumCostBalance += VacuumCostPageHit;

		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
										  smgr->smgr_rlocator.locator.spcOid,
										  smgr->smgr_rlocator.locator.dbOid,
										  smgr->smgr_rlocator.locator.relNumber,
										  smgr->smgr_rlocator.backend,
										  true);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}

PinBufferForBlock has three parts:

1. Determine the concrete persistence mode.
2. Set up the I/O context according to that persistence.
3. Check whether the target block is already buffered while allocating a buffer for it. We focus on BufferAlloc, since nearly the whole call stack in the flame graph sits under it.

(figure: flame graph of PinBufferForBlock, dominated by BufferAlloc)

5. BufferAlloc

BufferAlloc sets *foundPtr to indicate whether the target block is already loaded. Based on that value, the caller bumps the matching statistics, e.g., pgstat_count_buffer_hit increments the hit counter. Now we dig into BufferAlloc:


/*
* BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared
* buffer. If no buffer exists already, selects a replacement victim and
* evicts the old page, but does NOT read in new page.
*
* "strategy" can be a buffer replacement strategy object, or NULL for
* the default strategy. The selected buffer's usage_count is advanced when
* using the default strategy, but otherwise possibly not (see PinBuffer).
*
* The returned buffer is pinned and is already marked as holding the
* desired page. If it already did have the desired page, *foundPtr is
* set true. Otherwise, *foundPtr is set false.
*
* io_context is passed as an output parameter to avoid calling
* IOContextForStrategy() when there is a shared buffers hit and no IO
* statistics need be captured.
*
* No locks are held either at entry or exit.
*/

The comment above says: BufferAlloc handles the lookup of a shared buffer. If no buffer for the page exists yet, it uses the replacement strategy to pick a victim and evicts the old page, but it does NOT read the new page in here; that is, if the required data is not in memory, the I/O is not started at this point.

Now on to the code:

/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlarge(CurrentResourceOwner);
ReservePrivateRefCountEntry();

/* create a tag so we can lookup the buffer */
InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

/* determine its hash code and partition lock ID */
newHash = BufTableHashCode(&newTag);
newPartitionLock = BufMappingPartitionLock(newHash);

/* see if the block is in the buffer pool already */
LWLockAcquire(newPartitionLock, LW_SHARED);
existing_buf_id = BufTableLookup(&newTag, newHash);

The heart of this part is clearly existing_buf_id = BufTableLookup(&newTag, newHash);. It looks up the buffer table for the buffer corresponding to the given forkNum + blockNum, ensuring the same block is never cached twice. BufTableLookup is defined as follows:

(figure: definition of BufTableLookup)

As the name suggests, hash_search_with_hash_value searches the hash table using a precomputed hash value. Reading on:

(figure: implementation of hash_search_with_hash_value)

From the comments we can see that PG's hash map takes the classic separate-chaining approach to collision resolution. hash_initial_lookup locates the collision bucket directly and hands back its linked list; the list is then walked looking for the target, and on a match foundPtr is set to true. Reading hash_initial_lookup confirms this:

(figure: implementation of hash_initial_lookup)
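
To make the shape of that lookup concrete, here is a minimal, self-contained sketch of separate chaining (hypothetical types and names, not PG's actual dynahash):

#include <stdint.h>
#include <stddef.h>

#define NBUCKETS 1024

typedef struct Entry
{
	uint32_t	key;
	int			value;
	struct Entry *next;			/* collision chain */
} Entry;

static Entry *buckets[NBUCKETS];

/* walk the chain of the bucket selected by the hash value */
static Entry *
chain_lookup(uint32_t key, uint32_t hashvalue, int *foundPtr)
{
	Entry	   *e = buckets[hashvalue % NBUCKETS];	/* the "initial lookup" */

	for (; e != NULL; e = e->next)
	{
		if (e->key == key)
		{
			*foundPtr = 1;
			return e;
		}
	}
	*foundPtr = 0;
	return NULL;
}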

During the hash lookup we can see that BufTableLookup always uses SharedBufHash as its hash table:

(figure: definition of SharedBufHash)

SharedBufHash is a context-free, globally shared structure, and that should trigger our muscle memory: access to shared critical-section data can run into concurrency problems. With that question in mind, back to BufferAlloc:

(figure: BufferAlloc computing newHash and newPartitionLock)

(figure: acquiring the partition lock before the lookup)

As the figures show, the hash code is used as a coarse filter to pick newPartitionLock, so that only the partition whose entries could collide with ours is locked. This avoids the performance hit of an overly coarse lock. That completes the hash part.
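
A minimal sketch of the idea, splitting one logical lock into many by hash code (pthread mutexes stand in for PG's LWLock array; PG uses NUM_BUFFER_PARTITIONS = 128):

#include <pthread.h>
#include <stdint.h>

#define NUM_PARTITIONS 128		/* PG: NUM_BUFFER_PARTITIONS */

/* initialize each lock with pthread_mutex_init() before use */
static pthread_mutex_t partition_locks[NUM_PARTITIONS];

/* hash-based coarse filter: same hash -> same partition -> same lock */
static pthread_mutex_t *
partition_lock_for(uint32_t hashcode)
{
	return &partition_locks[hashcode % NUM_PARTITIONS];
}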

/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlarge(CurrentResourceOwner);
ReservePrivateRefCountEntry();

These two calls manage the backend's private memory; we will set them aside for now and focus on how the cache itself is handled:

if (existing_buf_id >= 0)		/* cache hit */
{
	BufferDesc *buf;
	bool		valid;

	/*
	 * Found it.  Now, pin the buffer so no one can steal it from the
	 * buffer pool, and check to see if the correct data has been loaded
	 * into the buffer.
	 */
	buf = GetBufferDescriptor(existing_buf_id);

	valid = PinBuffer(buf, strategy);

	/* Can release the mapping lock as soon as we've pinned it */
	LWLockRelease(newPartitionLock);

	*foundPtr = true;

	if (!valid)
	{
		/*
		 * We can only get here if (a) someone else is still reading in
		 * the page, (b) a previous read attempt failed, or (c) someone
		 * called StartReadBuffers() but not yet WaitReadBuffers().
		 */
		*foundPtr = false;
	}

	return buf;
}

This is the cache-hit path: existing_buf_id >= 0 means we found the buffer number holding the target block. The BufferDesc struct is defined as follows:

(figure: definition of the BufferDesc struct)

Overall, the cache-hit follow-up is a three-step dance:

1. Try to pin the buffer (PinBuffer).
2. Release newPartitionLock.
3. Check the result of the pin and set *foundPtr accordingly. (As the comment says, the pin returns false in only three cases: another process is still reading the page in; a previous read attempt failed; another process has completed StartReadBuffers but has not yet executed WaitReadBuffers.)

6. PinBuffer

Next we read PinBuffer and try to understand why those three cases do, and should, make valid false:

/*
* PinBuffer -- make buffer unavailable for replacement.
*
* For the default access strategy, the buffer's usage_count is incremented
* when we first pin it; for other strategies we just make sure the usage_count
* isn't zero. (The idea of the latter is that we don't want synchronized
* heap scans to inflate the count, but we need it to not be zero to discourage
* other backends from stealing buffers from our ring. As long as we cycle
* through the ring faster than the global clock-sweep cycles, buffers in
* our ring won't be chosen as victims for replacement by other backends.)
*
* This should be applied only to shared buffers, never local ones.
*
* Since buffers are pinned/unpinned very frequently, pin buffers without
* taking the buffer header lock; instead update the state variable in loop of
* CAS operations. Hopefully it's just a single CAS.
*
* Note that ResourceOwnerEnlarge() and ReservePrivateRefCountEntry()
* must have been done already.
*
* Returns true if buffer is BM_VALID, else false. This provision allows
* some callers to avoid an extra spinlock cycle.
*/

That is the comment on PinBuffer. PinBuffer relies on a struct named PrivateRefCountEntry.

Below is the official comment on PrivateRefCountEntry. In short, it implements backend-private reference counting for buffers.

The benefit of each backend privately managing a refcount is that a backend pinning the same buffer repeatedly does not have to touch the shared buffer state every time, while the pin/unpin semantics remain intact.

/*
* Backend-Private refcount management:
*
* Each buffer also has a private refcount that keeps track of the number of
* times the buffer is pinned in the current process. This is so that the
* shared refcount needs to be modified only once if a buffer is pinned more
* than once by an individual backend. It's also used to check that no buffers
* are still pinned at the end of transactions and when exiting.
*
*
* To avoid - as we used to - requiring an array with NBuffers entries to keep
* track of local buffers, we use a small sequentially searched array
* (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
* keep track of backend local pins.
*
* Until no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once, all
* refcounts are kept track of in the array; after that, new array entries
* displace old ones into the hash table. That way a frequently used entry
* can't get "stuck" in the hashtable while infrequent ones clog the array.
*
* Note that in most scenarios the number of pinned buffers will not exceed
* REFCOUNT_ARRAY_ENTRIES.
*
*
* To enter a buffer into the refcount tracking mechanism first reserve a free
* entry using ReservePrivateRefCountEntry() and then later, if necessary,
* fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
* memory allocations in NewPrivateRefCountEntry() which can be important
* because in some scenarios it's called with a spinlock held...
*/

For PrivateRefCountEntry, PG combines a short, sequentially searched array with a hash table to speed up lookups, which is exactly what these two passages in GetPrivateRefCountEntry do:

(figure: array search in GetPrivateRefCountEntry)

(figure: hash-table fallback in GetPrivateRefCountEntry)

The idea is to keep frequently used PrivateRefCountEntry items in the short PrivateRefCountArray, so fewer lookups pay the cost of walking the hash table's collision chains.
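
A toy sketch of this two-tier scheme; the names and the overflow helper are hypothetical, while PG's real code (with REFCOUNT_ARRAY_ENTRIES = 8) lives in bufmgr.c:

#define FAST_ENTRIES 8			/* PG: REFCOUNT_ARRAY_ENTRIES */

typedef struct PrivRef
{
	int			buffer;			/* 0 = unused slot */
	int			refcount;
} PrivRef;

static PrivRef fast_array[FAST_ENTRIES];

static PrivRef *overflow_hash_lookup(int buffer);	/* hypothetical overflow-table helper */

/* Look in the small array first; fall back to the hash table on a miss. */
static PrivRef *
get_private_ref(int buffer)
{
	for (int i = 0; i < FAST_ENTRIES; i++)
	{
		if (fast_array[i].buffer == buffer)
			return &fast_array[i];
	}
	/* not in the fast path: consult the overflow hash table instead */
	return overflow_hash_lookup(buffer);
}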

With the refcount machinery clear, back to PinBuffer. For the case ref == NULL, i.e., this backend has no local refcount entry yet for the corresponding shared buffer, PG handles it with the following code:

if (ref == NULL)
{
	uint32		buf_state;
	uint32		old_buf_state;

	ref = NewPrivateRefCountEntry(b);

	old_buf_state = pg_atomic_read_u32(&buf->state);
	for (;;)
	{
		if (old_buf_state & BM_LOCKED)
			old_buf_state = WaitBufHdrUnlocked(buf);

		buf_state = old_buf_state;

		/* increase refcount */
		buf_state += BUF_REFCOUNT_ONE;

		if (strategy == NULL)
		{
			/* Default case: increase usagecount unless already max. */
			if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
				buf_state += BUF_USAGECOUNT_ONE;
		}
		else
		{
			/*
			 * Ring buffers shouldn't evict others from pool.  Thus we
			 * don't make usagecount more than 1.
			 */
			if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
				buf_state += BUF_USAGECOUNT_ONE;
		}

		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
										   buf_state))
		{
			result = (buf_state & BM_VALID) != 0;

			/*
			 * Assume that we acquired a buffer pin for the purposes of
			 * Valgrind buffer client checks (even in !result case) to
			 * keep things simple.  Buffers that are unsafe to access are
			 * not generally guaranteed to be marked undefined or
			 * non-accessible in any case.
			 */
			VALGRIND_MAKE_MEM_DEFINED(BufHdrGetBlock(buf), BLCKSZ);
			break;
		}
	}
}

In the code above, PG uses atomic operations instead of heavyweight locking; the comment explains why: pin and unpin are extremely frequent, and controlling this concurrency with a lock would hurt performance badly (acquiring and releasing a lock costs far more than an atomic operation).

We therefore focus on the atomics here, mainly two operations:

1. pg_atomic_read_u32
2. pg_atomic_compare_exchange_u32

The core of pg_atomic_read_u32 is to mark the data access with the volatile keyword, as shown below:

(figure: implementation of pg_atomic_read_u32, using volatile)

volatile acts here as a compiler-level barrier: it stops the compiler from caching the value in a CPU register and forces every access to reload the pointed-to memory, which avoids stale reads when another process modifies that location.
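
A minimal sketch of the idiom (an assumption of how pg_atomic_read_u32 behaves on platforms where a plain aligned 32-bit load is atomic):

#include <stdint.h>

typedef struct pg_atomic_uint32_sketch
{
	volatile uint32_t value;
} pg_atomic_uint32_sketch;

/* Every call re-reads memory; the compiler may not cache the value. */
static inline uint32_t
atomic_read_u32_sketch(pg_atomic_uint32_sketch *ptr)
{
	return ptr->value;
}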

As the figure below shows, a standard read of a piece of data goes like this:

(figure: read path from memory through the cache hierarchy into a register)

The Read code loads the data at address 0x0001. Compiled, this becomes a single LD instruction (Load from memory into register): the data travels through the CPU's L1/L2/L3 caches and finally lands in a register.

Suppose more code follows that Read, and the CPU has enough registers that the value is never evicted from its register.

On the next Read, the machine's cheapest option is to reuse the register that already holds the value instead of going back to memory.

In a multithreaded environment outside the compiler's control, however, the register copy can disagree with memory: another thread may have changed the value in memory, but because our thread took the optimized path (caching the value in a register), it never sees the real memory contents.

Now the use of volatile in pg_atomic_read_u32 makes sense: what is being read is the state of a buffer in the shared buffer pool, a memory resource shared across processes.

After reading the shared state, the thread takes a local snapshot, old_buf_state, for the subsequent CAS.

The thread then enters a retry loop that updates buf_state via CAS. The interesting pieces here are WaitBufHdrUnlocked and pg_atomic_compare_exchange_u32, which we read in turn below.

for (;;)
{
	if (old_buf_state & BM_LOCKED)
		old_buf_state = WaitBufHdrUnlocked(buf);	/* if the buffer header is locked, wait for it to be released */

	buf_state = old_buf_state;

	/* increase refcount */
	buf_state += BUF_REFCOUNT_ONE;

	/* bump the usage count according to the access strategy */
	if (strategy == NULL)
	{
		/* Default case: increase usagecount unless already max. */
		if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
			buf_state += BUF_USAGECOUNT_ONE;
	}
	else
	{
		/*
		 * Ring buffers shouldn't evict others from the pool, so we never
		 * raise the usagecount above 1.
		 */
		if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
			buf_state += BUF_USAGECOUNT_ONE;
	}

	if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
									   buf_state))
	{
		result = (buf_state & BM_VALID) != 0;

		/*
		 * Assume that we acquired a buffer pin for the purposes of
		 * Valgrind buffer client checks (even in !result case) to
		 * keep things simple.  Buffers that are unsafe to access are
		 * not generally guaranteed to be marked undefined or
		 * non-accessible in any case.
		 */
		VALGRIND_MAKE_MEM_DEFINED(BufHdrGetBlock(buf), BLCKSZ);
		break;
	}
}

WaitBufHdrUnlocked waits for the header lock to be released and returns the post-unlock state; PG implements it as a spin wait with repeated reads:

static uint32
WaitBufHdrUnlocked(BufferDesc *buf)
{
	SpinDelayStatus delayStatus;
	uint32		buf_state;

	/* initialize the local spin-delay state */
	init_local_spin_delay(&delayStatus);

	/* read the buffer header state */
	buf_state = pg_atomic_read_u32(&buf->state);

	/* spin while the buffer header is locked */
	while (buf_state & BM_LOCKED)
	{
		/* perform the spin delay */
		perform_spin_delay(&delayStatus);
		/* re-read the buffer header state */
		buf_state = pg_atomic_read_u32(&buf->state);
	}

	/* finish the spin delay */
	finish_spin_delay(&delayStatus);

	/* return the final buffer header state */
	return buf_state;
}

pg_atomic_compare_exchange_u32 is PG's own CAS implementation:

(figure: implementation of pg_atomic_compare_exchange_u32)

Semantically, PG falls back to a lightweight spinlock here to emulate the CAS operation; a more precise reading would require SpinLockAcquire and friends, which we defer.
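
For intuition, here is the same pin loop written against C11 stdatomic, a sketch rather than PG's portability layer:

#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>

/* Retry until our refcount increment lands on an unchanged state word. */
static bool
pin_sketch(_Atomic uint32_t *state, uint32_t refcount_one, uint32_t valid_flag)
{
	uint32_t	old_state = atomic_load(state);

	for (;;)
	{
		uint32_t	new_state = old_state + refcount_one;

		/* on failure, old_state is reloaded with the current value */
		if (atomic_compare_exchange_weak(state, &old_state, new_state))
			return (new_state & valid_flag) != 0;
	}
}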

Having read the basic implementation, let's return to the business logic: why do those three cases lead to (and should lead to) valid being false? In other words, under what circumstances does the PinBuffer we just read produce result == false?

result = (buf_state & BM_VALID) != 0;
result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;

Both assignments to result compare buf_state bitwise: a bitwise AND tests whether buf_state carries the BM_VALID flag bit, and the outcome is stored in result.

(figure: buf_state flag-bit definitions)

The figure shows the flag-bit layout of buf_state: within the 32-bit uint32, bits 22 through 31 identify the buffer's operational state. So if the buffer we are trying to pin is not in the BM_VALID state, result is set to false.
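
From memory of PG's buf_internals.h (the exact constants are worth verifying), the 32-bit state word packs an 18-bit refcount, a 4-bit usage count, and 10 flag bits, with accessor macros along these lines:

/* sketch of the buf_state layout: refcount | usagecount | flags */
#define BUF_REFCOUNT_ONE		1
#define BUF_REFCOUNT_MASK		((1U << 18) - 1)	/* bits 0-17: shared pin count */
#define BUF_USAGECOUNT_SHIFT	18
#define BUF_USAGECOUNT_MASK		0x003C0000U			/* bits 18-21: clock-sweep usage count */
#define BUF_USAGECOUNT_ONE		(1U << BUF_USAGECOUNT_SHIFT)
#define BUF_FLAG_MASK			0xFFC00000U			/* bits 22-31: BM_* flags such as BM_VALID */

#define BUF_STATE_GET_REFCOUNT(state)	((state) & BUF_REFCOUNT_MASK)
#define BUF_STATE_GET_USAGECOUNT(state) \
	(((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)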

I found a more detailed version (taken from https://blog.csdn.net/qq_43899283/article/details/135995302):

(figure: annotated buf_state flag definitions)

So the question becomes: what can leave a buffer in a state other than BM_VALID? Recall the three cases from the official comment:

1. Another process is still reading the page in.

2. A previous read attempt failed.

3. Another process has completed StartReadBuffers but has not yet called WaitReadBuffers.

For case 1, note these are processes, not threads; the intent of the other process's read (read only or write?) is hard to establish through locking, so PG simply isolates the case.
For case 2, a failed earlier read means the block's contents are not ready, or must not be read yet.
For case 3, we have to read further to see how these two functions relate to buffer availability.

7. Revisiting BufferAlloc

Rereading this code, the roles of its parts are now clear:

(figure: the lookup-and-pin section of BufferAlloc)

1. Lock the hash partition, then look up in that partition whether the target block already has a buffer holding it.

2. If such a buffer is found, try to pin it, so that while this process is using it the buffer cannot be reclaimed by some replacement policy.

3. Check whether the block contents in the buffer are immediately usable; if not, perform the appropriate follow-up (WaitReadBuffers).

When the target block is already tracked by the cache, we don't know whether it is current, usable, or in use by another process, so the problems we face are chiefly those of concurrent access to critical-section resources.

When the target block is not cached yet, we must find a free slot in the buffer pool (an empty buffer) to place our block. Since the pool's capacity is fixed, two families of algorithms are necessarily involved:

1. A page-replacement algorithm (evicting blocks that hold data but are not currently in use, to make room for the target block).

2. Critical-resource management (the critical resources being mainly the hashtable and the buffer headers; essentially a standard concurrency problem).

Let's continue with the cache-miss path:

/* next comes the cache-miss path: */
/*
 * Didn't find it in the buffer pool.  We'll have to initialize a new
 * buffer.  Remember to unlock the mapping lock while doing the work.
 */
LWLockRelease(newPartitionLock);	/* remember the hashtable partition we locked to check whether the block was already loaded? Unlock it here, since a long stretch of code won't touch it */

/*
 * Acquire a victim buffer. Somebody else might try to do the same, we
 * don't hold any conflicting locks. If so we'll have to undo our work
 * later.
 */
victim_buffer = GetVictimBuffer(strategy, io_context);	/* use the eviction strategy to pick a victim buffer */
victim_buf_hdr = GetBufferDescriptor(victim_buffer - 1);

/*
 * Try to make a hashtable entry for the buffer under its new tag. If
 * somebody else inserted another buffer for the tag, we'll release the
 * victim buffer we acquired and use the already inserted one.
 */
LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);	/* take the partition lock exclusively and try to insert the new mapping */
existing_buf_id = BufTableInsert(&newTag, newHash, victim_buf_hdr->buf_id);
if (existing_buf_id >= 0)		/* while we had released the lock, a concurrent backend inserted a mapping for the same block */
{
	BufferDesc *existing_buf_hdr;
	bool		valid;

	/*
	 * Got a collision. Someone has already done what we were about to do.
	 * We'll just handle this as if it were found in the buffer pool in
	 * the first place.  First, give up the buffer we were planning to
	 * use.
	 *
	 * We could do this after releasing the partition lock, but then we'd
	 * have to call ResourceOwnerEnlarge() & ReservePrivateRefCountEntry()
	 * before acquiring the lock, for the rare case of such a collision.
	 */
	UnpinBuffer(victim_buf_hdr);	/* give up our prepared victim buffer: another process already did the insert, we don't need to do it again */

	/*
	 * The victim buffer we acquired previously is clean and unused, let
	 * it be found again quickly
	 */
	StrategyFreeBuffer(victim_buf_hdr);	/* GetVictimBuffer already emptied it, so it is immediately reusable; raise its priority in the freelist */

	/* remaining code should match code at top of routine */
	/* from here on, identical to the cache-hit path */
	existing_buf_hdr = GetBufferDescriptor(existing_buf_id);

	valid = PinBuffer(existing_buf_hdr, strategy);

	/* Can release the mapping lock as soon as we've pinned it */
	LWLockRelease(newPartitionLock);

	*foundPtr = true;

	if (!valid)
	{
		/*
		 * We can only get here if (a) someone else is still reading in
		 * the page, (b) a previous read attempt failed, or (c) someone
		 * called StartReadBuffers() but not yet WaitReadBuffers().
		 */
		*foundPtr = false;
	}

	return existing_buf_hdr;
}

/*
 * At this point we are the first process trying to load this block, so we
 * also update the victim buffer's header to describe the block it will hold.
 */
/*
 * Need to lock the buffer header too in order to change its tag.
 */
victim_buf_state = LockBufHdr(victim_buf_hdr);	/* lock the buffer header */

/* some sanity checks while we hold the buffer header lock */
Assert(BUF_STATE_GET_REFCOUNT(victim_buf_state) == 1);	/* the refcount must be 1 */
Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY | BM_IO_IN_PROGRESS)));	/* the victim must be a fresh, unused buffer */

victim_buf_hdr->tag = newTag;

/*
 * Make sure BM_PERMANENT is set for buffers that must be written at every
 * checkpoint.  Unlogged buffers only need to be written at shutdown
 * checkpoints, except for their "init" forks, which need to be treated
 * just like permanent relations.
 */
victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;	/* mark the tag valid, usage count 1 */
if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
	victim_buf_state |= BM_PERMANENT;

UnlockBufHdr(victim_buf_hdr, victim_buf_state);	/* write back the state word and unlock the buffer header */

LWLockRelease(newPartitionLock);	/* release the hashtable partition lock */

/*
 * Buffer contents are currently invalid.
 */
*foundPtr = false;	/* the buffer is freshly allocated, no data loaded yet, so return false */

return victim_buf_hdr;

Below is the design of the buffer state word:

(figure: buffer state word layout)

Having read BufferAlloc, look at the flame graph again: GetVictimBuffer dominates the elapsed time. Why?

(figure: flame graph with GetVictimBuffer dominating BufferAlloc)

With that question in mind, let's look at GetVictimBuffer's subtree:

(figure: flame graph of GetVictimBuffer's subtree)

InvalidateVictimBuffer and StrategyGetBuffer are the most expensive parts. StrategyGetBuffer obtains a buffer to evict according to the chosen strategy, while InvalidateVictimBuffer clears the corresponding buffer header and hashtable entry.

So we can conclude that GetVictimBuffer contains both the replacement policy that hunts for a free slot and the teardown of the old data; when the hit rate is low and the cache is small, these two dominate the cost.
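
PG's default replacement policy, implemented by StrategyGetBuffer, is a clock sweep over the buffer descriptors. Below is a toy sketch of that loop with plain fields instead of PG's atomics and freelist (and without PG's error-out when every buffer is pinned):

#define NBUFFERS 16384

typedef struct SketchBufDesc
{
	int			refcount;		/* pinned if > 0 */
	int			usage_count;	/* aged by the sweep; capped at 5 in PG */
} SketchBufDesc;

static SketchBufDesc sketch_pool[NBUFFERS];
static int	next_victim = 0;	/* the clock hand */

/* Advance the hand until an unpinned buffer with usage_count 0 turns up. */
static int
clock_sweep_sketch(void)
{
	for (;;)
	{
		SketchBufDesc *buf = &sketch_pool[next_victim];

		next_victim = (next_victim + 1) % NBUFFERS;

		if (buf->refcount == 0)
		{
			if (buf->usage_count == 0)
				return (int) (buf - sketch_pool);	/* found our victim */
			buf->usage_count--;	/* give it one more lap */
		}
	}
}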

That wraps up the basics of BufferAlloc; now we go back up a level to StartReadBuffersImpl.

8. Revisiting StartReadBuffersImpl

StartReadBuffersImpl is a batch-read function that can pin several consecutive blocks at once; the caller's *nblocks is updated to the length of the longest consecutively readable run. An example makes this concrete:

With actual_nblocks = 10, if block 5 is found (found is true, i.e., the block is already cached) but block 6 has not been loaded, the code proceeds as follows:

for (int i = 0; i < actual_nblocks; ++i)
{
	bool		found;

	buffers[i] = PinBufferForBlock(operation->rel,	/* pin a buffer for each block */
								   operation->smgr,
								   operation->smgr_persistence,
								   operation->forknum,
								   blockNum + i,
								   operation->strategy,
								   &found);

	if (found)
	{
		/*
		 * Terminate the read as soon as we hit a cached buffer.  It may be
		 * a single-buffer hit, or a hit past the end of the readable range.
		 * We don't want to create more than one readable range, so stop
		 * here.
		 */
		actual_nblocks = i + 1;
		break;
	}
	else
	{
		/* Extend the readable range to cover this block. */
		io_buffers_len++;
	}
}
*nblocks = actual_nblocks;		/* record the real length of the readable range */

if (likely(io_buffers_len == 0))
	return false;

/* Populate the information needed for the I/O. */
operation->buffers = buffers;
operation->blocknum = blockNum;
operation->flags = flags;
operation->nblocks = actual_nblocks;	/* length of the readable range */
operation->io_buffers_len = io_buffers_len;	/* length of the I/O */

if (flags & READ_BUFFERS_ISSUE_ADVICE)
{
	smgrprefetch(operation->smgr,	/* prefetch */
				 operation->forknum,
				 blockNum,
				 operation->io_buffers_len);
}

/* Indicate that WaitReadBuffers() should be called. */
return true;

The code iterates over actual_nblocks blocks, pinning each via PinBufferForBlock. When i is 4 (the fifth block), PinBufferForBlock finds an existing buffer (found is true).
Because of the hit, actual_nblocks is updated to 5 (i + 1) and the loop terminates, so block 6 and everything after it are never processed. *nblocks is then set to actual_nblocks, i.e., 5. If io_buffers_len is 0, the function returns false: no I/O is needed. Otherwise it fills in the I/O information and returns true, meaning WaitReadBuffers() must be called for the follow-up.

We will set aside for now why a batch read length is kept, and first read the other function with a heavy share of the profile, WaitReadBuffers:

(figure: flame graph of WaitReadBuffers)

9. WaitReadBuffers

With the local-buffer handling removed, here is the main skeleton of the function:

void
WaitReadBuffers(ReadBuffersOperation *operation)
{
	/* (declarations and local-buffer handling elided by the author) */

	/* Find the range of the physical read we need to perform. */
	nblocks = operation->io_buffers_len;
	for (int i = 0; i < nblocks; ++i)
	{
		int			io_buffers_len;
		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
		instr_time	io_start;
		BlockNumber io_first_block;

		if (!WaitReadBuffersCanStartIO(buffers[i], false))
		{
			continue;
		}

		/* We found a buffer that we need to read in. */
		io_buffers[0] = buffers[i];
		io_pages[0] = BufferGetBlock(buffers[i]);
		io_first_block = blocknum + i;
		io_buffers_len = 1;

		while ((i + 1) < nblocks &&
			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
		{
			/* Must be consecutive block numbers. */
			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
				   BufferGetBlockNumber(buffers[i]) + 1);

			io_buffers[io_buffers_len] = buffers[++i];
			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
		}

		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);

		/* Verify each block we read, and terminate the I/O. */
		for (int j = 0; j < io_buffers_len; ++j)
		{
			BufferDesc *bufHdr;
			Block		bufBlock;

			bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
			bufBlock = BufHdrGetBlock(bufHdr);

			/* check for garbage data */
			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
										PIV_LOG_WARNING | PIV_REPORT_STAT))
			{
				ereport(ERROR);
			}

			/* Set BM_VALID, terminate IO, and wake up any waiters */
			TerminateBufferIO(bufHdr, false, BM_VALID, true);
		}
	}
}

As we can see, the main helpers are WaitReadBuffersCanStartIO and TerminateBufferIO, plus the most expensive call of all, the actual I/O in smgrreadv.

The official comment at the WaitReadBuffersCanStartIO call site reads:

/*
* Skip this block if someone else has already completed it. If an
* I/O is already in progress in another backend, this will wait for
* the outcome: either done, or something went wrong and we will
* retry.
*/

Its main purpose is to prevent concurrent I/O on the same block. The implementation:

static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
{
	if (BufferIsLocal(buffer))
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);

		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
	}
	else
		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}

/*
 * StartBufferIO: begin I/O on this buffer
 * (Assumptions)
 *	My process is not executing an I/O
 *	The buffer is pinned
 *
 * In some scenarios multiple backends could attempt the same I/O operation
 * concurrently.  If someone else has already started I/O on this buffer, we
 * will block on the I/O condition variable until he's done.
 *
 * Input operations are only attempted on buffers that are not BM_VALID, and
 * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
 * always tell whether the work is already done.
 *
 * Returns true if we successfully marked the buffer as I/O busy, false if
 * someone else already did the work.
 *
 * If nowait is true, we don't wait for another backend to finish its I/O.  In
 * that case, false indicates either that the I/O was already finished, or is
 * still in progress.  This is useful for callers that want to perform the I/O
 * as part of a larger operation, without waiting for the answer or
 * distinguishing the reasons why not.
 */
static bool
StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
{
	uint32		buf_state;

	ResourceOwnerEnlarge(CurrentResourceOwner);

	for (;;)
	{
		buf_state = LockBufHdr(buf);

		if (!(buf_state & BM_IO_IN_PROGRESS))	/* no other process is doing I/O, leave the loop */
			break;
		UnlockBufHdr(buf, buf_state);
		if (nowait)				/* as the comment says, with nowait == true we return false right away */
			return false;		/* someone else is performing the I/O */
		WaitIO(buf);			/* someone else is performing the I/O, just wait for it to finish */
	}

	/*
	 * At this point nobody else has an I/O in flight on this buffer; check
	 * its state to decide whether an I/O is needed at all.  forInput means a
	 * disk-to-buffer read: before such a read the buffer cannot be BM_VALID,
	 * it only becomes BM_VALID after the read completes.
	 */
	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
	{
		/* someone else already completed the I/O, return false */
		UnlockBufHdr(buf, buf_state);
		return false;
	}

	buf_state |= BM_IO_IN_PROGRESS;		/* I/O neither done nor in progress: mark it in progress */
	UnlockBufHdr(buf, buf_state);

	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
								  BufferDescriptorGetBuffer(buf));

	return true;
}

WaitReadBuffers breaks down into a few parts:

1. Avoiding duplicate I/O.
This checks whether the buffer we are about to read is already being handled by another process; if no I/O is needed (WaitReadBuffersCanStartIO returns false), the block is simply skipped.

if (!WaitReadBuffersCanStartIO(buffers[i], false))
{
	continue;
}

2. Batched sequential I/O: read as many consecutive blocks as possible in one operation. It first initializes the first buffer and block to read, then tries to pull the adjacent blocks into memory in the same pass, reducing the number of I/O operations. The steps:

1. Initialize the first buffer and block that need reading.

2. Check whether the next block may start I/O, and assert that the blocks are consecutive.

3. Append the next buffer and page to the I/O buffer and page arrays.

4. Repeat until no further adjacent block can be absorbed.
/* We found a buffer that we need to read in. */
io_buffers[0] = buffers[i];
io_pages[0] = BufferGetBlock(buffers[i]);
io_first_block = blocknum + i;
io_buffers_len = 1;

/*
 * How many neighboring-on-disk blocks can we scatter-read into other
 * buffers at the same time?  In this case we don't wait if we see an I/O
 * already in progress; we simply stop looking: we've found the end of the
 * range.  We already hold BM_IO_IN_PROGRESS for the head block, so we
 * should get on with that I/O as soon as possible.  We'll come back to
 * this block again, above.
 */
while ((i + 1) < nblocks &&
	   WaitReadBuffersCanStartIO(buffers[i + 1], true))
{
	io_buffers[io_buffers_len] = buffers[++i];
	io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
}

3. Start the I/O, reading the run of consecutive pages from disk into the corresponding buffer pages.

At this point the purpose of the consecutive-read length becomes clear: sequential reads beat random reads, but under concurrency some blocks may already have been read by someone else, so detecting the maximal sequential run is worthwhile.

smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
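
At the file layer, smgrreadv boils down to a vectored read. Here is a minimal sketch of the same idea using POSIX preadv (illustrative only; PG's md.c/fd.c layer adds segment handling, retries, and error reporting):

#include <sys/uio.h>
#include <unistd.h>

#define BLCKSZ_SKETCH 8192

/* Read nblocks consecutive 8 kB blocks starting at first_block into pages[].
 * The caller must keep nblocks <= 16 (standing in for an I/O combine limit). */
static ssize_t
readv_blocks_sketch(int fd, unsigned first_block, void **pages, int nblocks)
{
	struct iovec iov[16];
	off_t		offset = (off_t) first_block * BLCKSZ_SKETCH;

	for (int i = 0; i < nblocks; i++)
	{
		iov[i].iov_base = pages[i];		/* one buffer page per iovec slot */
		iov[i].iov_len = BLCKSZ_SKETCH;
	}
	return preadv(fd, iov, nblocks, offset);	/* one syscall, many pages */
}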

4. Verify the contents of the blocks we just read, then clear the BM_IO_IN_PROGRESS state on each buffer header.

/* Verify each block we read, and terminate the I/O. */
for (int j = 0; j < io_buffers_len; ++j)
{
	BufferDesc *bufHdr;
	Block		bufBlock;

	bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
	bufBlock = BufHdrGetBlock(bufHdr);

	/* check for garbage data */
	if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
								PIV_LOG_WARNING | PIV_REPORT_STAT))
	{
		ereport(ERROR);
	}

	/* Set BM_VALID, terminate IO, and wake up any waiters */
	TerminateBufferIO(bufHdr, false, BM_VALID, true);
}

/*
 * TerminateBufferIO: release a buffer we were doing I/O on
 * (Assumptions)
 *	My process is executing I/O for the buffer
 *	The buffer's BM_IO_IN_PROGRESS bit is set
 *	The buffer is pinned
 *
 * If clear_dirty is true and BM_JUST_DIRTIED is not set, we clear the
 * buffer's BM_DIRTY flag.  This is appropriate when terminating a successful
 * write.  The check on BM_JUST_DIRTIED is necessary to avoid marking the
 * buffer clean if it was re-dirtied while we were writing.
 *
 * set_flag_bits gets ORed into the buffer's flags.  It must include
 * BM_IO_ERROR in a failure case.  For successful completion it could be 0,
 * or BM_VALID if we just finished reading in the page.
 *
 * If forget_owner is true, we release the buffer I/O from the current
 * resource owner.  (forget_owner=false is used when the resource owner
 * itself is being released.)
 */
static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
				  bool forget_owner)
{
	uint32		buf_state;

	/* lock the buffer header so we can inspect and modify its state */
	buf_state = LockBufHdr(buf);

	/* the buffer's BM_IO_IN_PROGRESS flag must be set */
	Assert(buf_state & BM_IO_IN_PROGRESS);

	/* clear the BM_IO_IN_PROGRESS and BM_IO_ERROR flags */
	buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);

	/* if clear_dirty and not BM_JUST_DIRTIED, also clear BM_DIRTY and BM_CHECKPOINT_NEEDED */
	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
		buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);

	/* set the requested flag bits */
	buf_state |= set_flag_bits;

	/* unlock the buffer header, writing back the new state */
	UnlockBufHdr(buf, buf_state);

	/* if forget_owner, release the buffer I/O from the current resource owner */
	if (forget_owner)
		ResourceOwnerForgetBufferIO(CurrentResourceOwner,
									BufferDescriptorGetBuffer(buf));

	/* broadcast the condition variable to wake processes waiting on this I/O */
	ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
}

Afterword

At this point we have more or less finished reading ReadBuffer. Only after reading this code do you really feel the difference between C and C++.

We have not yet examined in detail how PG's cache replacement algorithm is implemented, nor analyzed the purpose of each individual lock; that is left for when there is energy to dig deeper.

Here are some good blogs for further reading:

https://developer.aliyun.com/article/1277706
https://zhmin.github.io/posts/postgresql-buffer-pool/
https://blog.csdn.net/qq_43899283/article/details/135995302