/*
 * ReadBuffer_common -- common logic for all ReadBuffer variants
 *
 * smgr is required, rel is optional unless using P_NEW.
 */
static pg_attribute_always_inline Buffer
ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
                  ForkNumber forkNum, BlockNumber blockNum,
                  ReadBufferMode mode, BufferAccessStrategy strategy)
{
    ReadBuffersOperation operation;
    Buffer      buffer;
    int         flags;
    /*
     * Backward compatibility path, most code should use ExtendBufferedRel()
     * instead, as acquiring the extension lock inside ExtendBufferedRel()
     * scales a lot better.
     */
    if (unlikely(blockNum == P_NEW))
    {
        uint32      flags = EB_SKIP_EXTENSION_LOCK;
        /*
         * Since no-one else can be looking at the page contents yet, there is
         * no difference between an exclusive lock and a cleanup-strength
         * lock.
         */
        if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
            flags |= EB_LOCK_FIRST;
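For reference, a caller that previously did ReadBuffer(rel, P_NEW) is expected to extend the relation directly with ExtendBufferedRel(). A minimal, hedged sketch of such a call (the EB_LOCK_FIRST flag plays the role of RBM_ZERO_AND_LOCK above):

    /* Sketch: extending a relation with the newer API instead of P_NEW. */
    Buffer      newbuf;

    newbuf = ExtendBufferedRel(BMR_REL(rel), MAIN_FORKNUM,
                               NULL,            /* default strategy */
                               EB_LOCK_FIRST);  /* new page comes back
                                                 * exclusively locked */
    /* ... initialize the page and mark the buffer dirty ... */
    UnlockReleaseBuffer(newbuf);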
/*
 * Single block version of the StartReadBuffers().  This might save a few
 * instructions when called from another translation unit, because it is
 * specialized for nblocks == 1.
 */
bool
StartReadBuffer(ReadBuffersOperation *operation,
                Buffer *buffer,
                BlockNumber blocknum,
                int flags)
{
    int         nblocks = 1;
    bool        result;
    result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
    Assert(nblocks == 1);       /* single block can't be short */

    return result;
}
ReadBuffersOperation *operation: the operation struct describing this read; it carries all the state the read needs.
Buffer *buffers: the array that receives the pinned buffers.
BlockNumber blockNum: the first block number to read.
int *nblocks: the number of blocks to read; the function updates this value (the range may be shortened).
int flags: flags controlling how the read is performed.
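Seen from the caller's side, the two functions pair up the way ReadBuffer_common uses them: start the (possibly already cached) read, and only wait if actual I/O turned out to be necessary. A hedged sketch, assuming the operation's relation/smgr, fork and strategy fields have been filled in beforehand and blockNum is the block the caller wants:

    ReadBuffersOperation operation;
    Buffer      buffer;

    /* rel/smgr, forknum and strategy fields of "operation" set up earlier */
    if (StartReadBuffer(&operation, &buffer, blockNum, 0 /* no flags */ ))
        WaitReadBuffers(&operation);    /* a miss: perform the physical read */

    /* on a hit, StartReadBuffer() returns false and the buffer is ready */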
/*
 * Pin a buffer for a given block.  *foundPtr is set to true if the block was
 * already present, or false if more work is required to either read it in or
 * zero it.
 */
static pg_attribute_always_inline Buffer
PinBufferForBlock(Relation rel,
                  SMgrRelation smgr,
                  char smgr_persistence,
                  ForkNumber forkNum,
                  BlockNumber blockNum,
                  BufferAccessStrategy strategy,
                  bool *foundPtr)
{
    BufferDesc *bufHdr;
    IOContext   io_context;
    IOObject    io_object;
    char        persistence;
Assert(blockNum != P_NEW);
    /*
     * If there is no Relation it usually implies recovery and thus permanent,
     * but we take an argument because CreateAndCopyRelationData can reach us
     * with only an SMgrRelation for an unlogged relation that we don't want
     * to flag with BM_PERMANENT.
     */
    if (rel)
        persistence = rel->rd_rel->relpersistence;
    else if (smgr_persistence == 0)
        persistence = RELPERSISTENCE_PERMANENT;
    else
        persistence = smgr_persistence;
    if (persistence == RELPERSISTENCE_TEMP)
    {
        bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
        if (*foundPtr)
            pgBufferUsage.local_blks_hit++;
    }
    else
    {
        bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
                             strategy, foundPtr, io_context);
        if (*foundPtr)
            pgBufferUsage.shared_blks_hit++;
    }
    if (rel)
    {
        /*
         * While pgBufferUsage's "read" counter isn't bumped unless we reach
         * WaitReadBuffers() (so, not for hits, and not for buffers that are
         * zeroed instead), the per-relation stats always count them.
         */
        pgstat_count_buffer_read(rel);
        if (*foundPtr)
            pgstat_count_buffer_hit(rel);
    }
    if (*foundPtr)
    {
        VacuumPageHit++;
        pgstat_count_io_op(io_object, io_context, IOOP_HIT);
        if (VacuumCostActive)
            VacuumCostBalance += VacuumCostPageHit;
/*
 * BufferAlloc -- subroutine for PinBufferForBlock.  Handles lookup of a
 * shared buffer.  If no buffer exists already, selects a replacement victim
 * and evicts the old page, but does NOT read in new page.
 *
 * "strategy" can be a buffer replacement strategy object, or NULL for
 * the default strategy.  The selected buffer's usage_count is advanced when
 * using the default strategy, but otherwise possibly not (see PinBuffer).
 *
 * The returned buffer is pinned and is already marked as holding the
 * desired page.  If it already did have the desired page, *foundPtr is
 * set true.  Otherwise, *foundPtr is set false.
 *
 * io_context is passed as an output parameter to avoid calling
 * IOContextForStrategy() when there is a shared buffers hit and no IO
 * statistics need be captured.
 *
 * No locks are held either at entry or exit.
 */
    /*
     * Found it.  Now, pin the buffer so no one can steal it from the
     * buffer pool, and check to see if the correct data has been loaded
     * into the buffer.
     */
    buf = GetBufferDescriptor(existing_buf_id);
valid = PinBuffer(buf, strategy);
    /* Can release the mapping lock as soon as we've pinned it */
    LWLockRelease(newPartitionLock);
*foundPtr = true;
    if (!valid)
    {
        /*
         * We can only get here if (a) someone else is still reading in
         * the page, (b) a previous read attempt failed, or (c) someone
         * called StartReadBuffers() but not yet WaitReadBuffers().
         */
        *foundPtr = false;
    }
/*
 * PinBuffer -- make buffer unavailable for replacement.
 *
 * For the default access strategy, the buffer's usage_count is incremented
 * when we first pin it; for other strategies we just make sure the usage_count
 * isn't zero.  (The idea of the latter is that we don't want synchronized
 * heap scans to inflate the count, but we need it to not be zero to discourage
 * other backends from stealing buffers from our ring.  As long as we cycle
 * through the ring faster than the global clock-sweep cycles, buffers in
 * our ring won't be chosen as victims for replacement by other backends.)
 *
 * This should be applied only to shared buffers, never local ones.
 *
 * Since buffers are pinned/unpinned very frequently, pin buffers without
 * taking the buffer header lock; instead update the state variable in loop of
 * CAS operations.  Hopefully it's just a single CAS.
 *
 * Note that ResourceOwnerEnlarge() and ReservePrivateRefCountEntry()
 * must have been done already.
 *
 * Returns true if buffer is BM_VALID, else false.  This provision allows
 * some callers to avoid an extra spinlock cycle.
 */
/*
 * Backend-Private refcount management:
 *
 * Each buffer also has a private refcount that keeps track of the number of
 * times the buffer is pinned in the current process.  This is so that the
 * shared refcount needs to be modified only once if a buffer is pinned more
 * than once by an individual backend.  It's also used to check that no buffers
 * are still pinned at the end of transactions and when exiting.
 *
 * To avoid - as we used to - requiring an array with NBuffers entries to keep
 * track of local buffers, we use a small sequentially searched array
 * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
 * keep track of backend local pins.
 *
 * Until no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once, all
 * refcounts are kept track of in the array; after that, new array entries
 * displace old ones into the hash table.  That way a frequently used entry
 * can't get "stuck" in the hashtable while infrequent ones clog the array.
 *
 * Note that in most scenarios the number of pinned buffers will not exceed
 * REFCOUNT_ARRAY_ENTRIES.
 *
 * To enter a buffer into the refcount tracking mechanism first reserve a free
 * entry using ReservePrivateRefCountEntry() and then later, if necessary,
 * fill it with NewPrivateRefCountEntry().  That split lets us avoid doing
 * memory allocations in NewPrivateRefCountEntry() which can be important
 * because in some scenarios it's called with a spinlock held...
 */
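The lookup pattern this comment describes, reduced to a standalone sketch. The names lookup_private_ref, my_ref_array and MY_REFCOUNT_ARRAY_ENTRIES are invented for illustration; the real logic lives in GetPrivateRefCountEntry(), backed by PrivateRefCountArray and PrivateRefCountHash:

#define MY_REFCOUNT_ARRAY_ENTRIES 8     /* small: a couple of cache lines */

typedef struct MyPrivateRefCountEntry
{
    Buffer      buffer;
    int32       refcount;
} MyPrivateRefCountEntry;

static MyPrivateRefCountEntry my_ref_array[MY_REFCOUNT_ARRAY_ENTRIES];

/*
 * Return this backend's refcount entry for "buffer", or NULL if the backend
 * holds no pin on it.
 */
static MyPrivateRefCountEntry *
lookup_private_ref(Buffer buffer)
{
    /* 1. Linear scan of the small array: cheap in the common case. */
    for (int i = 0; i < MY_REFCOUNT_ARRAY_ENTRIES; i++)
    {
        if (my_ref_array[i].buffer == buffer)
            return &my_ref_array[i];
    }

    /*
     * 2. Overflow case: more buffers are pinned than fit in the array, so
     * the entry (if any) was displaced into a hash table keyed by Buffer
     * (omitted in this sketch).
     */
    return NULL;
}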
    if (strategy == NULL)
    {
        /* Default case: increase usagecount unless already max. */
        if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
            buf_state += BUF_USAGECOUNT_ONE;
    }
    else
    {
        /*
         * Ring buffers shouldn't evict others from pool.  Thus we
         * don't make usagecount more than 1.
         */
        if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
            buf_state += BUF_USAGECOUNT_ONE;
    }
    if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
                                       buf_state))
    {
        result = (buf_state & BM_VALID) != 0;

        /*
         * Assume that we acquired a buffer pin for the purposes of
         * Valgrind buffer client checks (even in !result case) to
         * keep things simple.  Buffers that are unsafe to access are
         * not generally guaranteed to be marked undefined or
         * non-accessible in any case.
         */
        VALGRIND_MAKE_MEM_DEFINED(BufHdrGetBlock(buf), BLCKSZ);
        break;
    }
}
WaitBufHdrUnlocked waits for the buffer header lock to be released and returns the state word read after the unlock; PostgreSQL implements this with a spin wait combined with repeated reads of the state:
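Sketched out, that loop has the following shape (this mirrors the structure of WaitBufHdrUnlocked in bufmgr.c, using the spin-delay helpers from s_lock.h):

static uint32
WaitBufHdrUnlocked(BufferDesc *buf)
{
    SpinDelayStatus delayStatus;
    uint32      buf_state;

    init_local_spin_delay(&delayStatus);

    buf_state = pg_atomic_read_u32(&buf->state);

    while (buf_state & BM_LOCKED)
    {
        /* back off a little, then re-read the state word */
        perform_spin_delay(&delayStatus);
        buf_state = pg_atomic_read_u32(&buf->state);
    }

    finish_spin_delay(&delayStatus);

    return buf_state;
}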
Next comes the cache-miss path:

    /*
     * Didn't find it in the buffer pool.  We'll have to initialize a new
     * buffer.  Remember to unlock the mapping lock while doing the work.
     */
    LWLockRelease(newPartitionLock);    // release the mapping-table partition
                                        // lock we took above to look up the
                                        // block; the next long stretch of code
                                        // doesn't need it
    /*
     * Acquire a victim buffer.  Somebody else might try to do the same, we
     * don't hold any conflicting locks.  If so we'll have to undo our work
     * later.
     */
    victim_buffer = GetVictimBuffer(strategy, io_context);     // run the eviction
                                                               // strategy to pick a
                                                               // victim buffer
    victim_buf_hdr = GetBufferDescriptor(victim_buffer - 1);
    /*
     * Try to make a hashtable entry for the buffer under its new tag.  If
     * somebody else inserted another buffer for the tag, we'll release the
     * victim buffer we acquired and use the already inserted one.
     */
    LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);  // take the hash partition lock
                                                    // exclusively and try to insert
                                                    // the new entry
    existing_buf_id = BufTableInsert(&newTag, newHash, victim_buf_hdr->buf_id);
    if (existing_buf_id >= 0)   // while we weren't holding the lock, a concurrent
                                // backend already inserted an entry for this tag
    {
        BufferDesc *existing_buf_hdr;
        bool        valid;
        /*
         * Got a collision.  Someone has already done what we were about to
         * do.  We'll just handle this as if it were found in the buffer pool
         * in the first place.  First, give up the buffer we were planning to
         * use.
         *
         * We could do this after releasing the partition lock, but then we'd
         * have to call ResourceOwnerEnlarge() & ReservePrivateRefCountEntry()
         * before acquiring the lock, for the rare case of such a collision.
         */
        UnpinBuffer(victim_buf_hdr);    // drop the victim buffer we were about to
                                        // reuse; another backend has already done
                                        // the insertion, so we don't need to
        /*
         * The victim buffer we acquired previously is clean and unused, let
         * it be found again quickly
         */
        StrategyFreeBuffer(victim_buf_hdr);     // GetVictimBuffer already cleared the
                                                // buffer's contents, so it can be used
                                                // directly; putting it back on the
                                                // freelist lets it be found quickly
        /* remaining code should match code at top of routine */
        // from here on, the code is the same as the cache-hit path above
        existing_buf_hdr = GetBufferDescriptor(existing_buf_id);
valid = PinBuffer(existing_buf_hdr, strategy);
        /* Can release the mapping lock as soon as we've pinned it */
        LWLockRelease(newPartitionLock);
*foundPtr = true;
        if (!valid)
        {
            /*
             * We can only get here if (a) someone else is still reading in
             * the page, (b) a previous read attempt failed, or (c) someone
             * called StartReadBuffers() but not yet WaitReadBuffers().
             */
            *foundPtr = false;
        }
        return existing_buf_hdr;
    }
    // At this point we are the first backend trying to load this block, so we
    // must also update the victim buffer's header to describe the block it holds.

    /*
     * Need to lock the buffer header too in order to change its tag.
     */
    victim_buf_state = LockBufHdr(victim_buf_hdr);  // lock the buffer header
    /* some sanity checks while we hold the buffer header lock */
    Assert(BUF_STATE_GET_REFCOUNT(victim_buf_state) == 1);  // our pin should be the
                                                            // only reference
    Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY |
                                 BM_IO_IN_PROGRESS)));      // the victim must be a
                                                            // fresh, unused buffer
victim_buf_hdr->tag = newTag;
    /*
     * Make sure BM_PERMANENT is set for buffers that must be written at every
     * checkpoint.  Unlogged buffers only need to be written at shutdown
     * checkpoints, except for their "init" forks, which need to be treated
     * just like permanent relations.
     */
    victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;  // mark the tag valid and
                                                            // start usage count at one
    if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
        victim_buf_state |= BM_PERMANENT;
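From here the function only has to publish the new header state and hand the buffer back to PinBufferForBlock; the tail of BufferAlloc looks roughly like this (a sketch, abbreviated):

    UnlockBufHdr(victim_buf_hdr, victim_buf_state);

    LWLockRelease(newPartitionLock);

    /*
     * Buffer contents are currently invalid: the caller still has to read
     * the page in, so report a miss.
     */
    *foundPtr = false;

    return victim_buf_hdr;
}

With the buffer pinned and tagged, the actual read happens later in WaitReadBuffers(), which the next excerpt shows.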
    /* Find the range of the physical read we need to perform. */
    nblocks = operation->io_buffers_len;

    for (int i = 0; i < nblocks; ++i)
    {
        int         io_buffers_len;
        Buffer      io_buffers[MAX_IO_COMBINE_LIMIT];
        void       *io_pages[MAX_IO_COMBINE_LIMIT];
        instr_time  io_start;
        BlockNumber io_first_block;
        /*
         * Skip this block if someone else has already completed it.  If an
         * I/O is already in progress in another backend, this will wait for
         * the outcome: either done, or something went wrong and we will
         * retry.
         */
        if (!WaitReadBuffersCanStartIO(buffers[i], false))
        {
            continue;
        }
        /* We found a buffer that we need to read in. */
        io_buffers[0] = buffers[i];
        io_pages[0] = BufferGetBlock(buffers[i]);
        io_first_block = blocknum + i;
        io_buffers_len = 1;
        while ((i + 1) < nblocks &&
               WaitReadBuffersCanStartIO(buffers[i + 1], true))
        {
            /* Must be consecutive block numbers. */
            Assert(BufferGetBlockNumber(buffers[i + 1]) ==
                   BufferGetBlockNumber(buffers[i]) + 1);
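Inside that loop, each additional consecutive block is appended to the combined request, and once no more neighbours qualify the whole range is read with one vectored call. A hedged sketch of that remainder, with statistics, tracing and per-page verification omitted:

            /* Append the neighbouring block to the combined request. */
            io_buffers[io_buffers_len] = buffers[++i];
            io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
        }

        /*
         * One vectored read covers the whole consecutive range; each page is
         * then verified and its I/O terminated (omitted here).
         */
        smgrreadv(operation->smgr, operation->forknum, io_first_block,
                  io_pages, io_buffers_len);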