PostgreSQL源码学习更新数据#0
以一条update test set b = "bcd" where a = 123;的SQL语句为例,跟踪更新数据的代码逻辑。(PG版本为12.2)
删除数据主要的函数是heap_update。查看调用栈:
#0 heap_update (relation=0x7fbe842e9270, otid=0x7ffffcdf4eda, newtup=0x244f828, cid=0, crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0,
lockmode=0x7ffffcdf4dec) at heapam.c:2898
#1 0x00000000004d3c47 in heapam_tuple_update (relation=0x7fbe842e9270,
otid=0x7ffffcdf4eda, slot=0x244f278, cid=0, snapshot=0x2418fa0,
crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0, lockmode=0x7ffffcdf4dec,
update_indexes=0x7ffffcdf4deb) at heapam_handler.c:332
#2 0x00000000006dacfa in table_tuple_update (rel=0x7fbe842e9270,
otid=0x7ffffcdf4eda, slot=0x244f278, cid=0, snapshot=0x2418fa0,
crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0, lockmode=0x7ffffcdf4dec,
update_indexes=0x7ffffcdf4deb)
at ../../../src/include/access/tableam.h:1275
#3 0x00000000006dcb0d in ExecUpdate (mtstate=0x244dd40,
tupleid=0x7ffffcdf4eda, oldtuple=0x0, slot=0x244f278, planSlot=0x244e5a0,
epqstate=0x244de38, estate=0x244d9c0, canSetTag=true)
at nodeModifyTable.c:1311
#4 0x00000000006ddfc4 in ExecModifyTable (pstate=0x244dd40)
at nodeModifyTable.c:2222
heap_update函数
//src/include/access/heapam.hextern TM_Result heap_update(Relation relation, ItemPointer otid,
HeapTuple newtup,
CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, LockTupleMode *lockmode);
//src/backend/access/heap/heapam.cTransactionId xid = GetCurrentTransactionId();
Assert(ItemPointerIsValid(otid));
/* 禁止在并行操作中update */
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot update tuples during a parallel operation")));
/* 为各种操作检查获取属性列表 */
hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
id_attrs = RelationGetIndexAttrBitmap(relation,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* 获取旧元组的block,buffer,page */
block = ItemPointerGetBlockNumber(otid);
buffer = ReadBuffer(relation, block);
page = BufferGetPage(buffer);
interesting_attrs = NULL;
/* 若page已经满了,就不可以做HOT更新了 */
if (!PageIsFull(page))
{
interesting_attrs = bms_add_members(interesting_attrs, hot_attrs);
hot_attrs_checked = true;
}
interesting_attrs = bms_add_members(interesting_attrs, key_attrs);
interesting_attrs = bms_add_members(interesting_attrs, id_attrs);
/* 若整页可见,将visible map pin在内存 */
if (PageIsAllVisible(page))
visibilitymap_pin(relation, block, &vmbuffer);
/* 锁定buffer */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
Assert(ItemIdIsNormal(lp));
/* 获取oldtup的信息 */
oldtup.t_tableOid = RelationGetRelid(relation);
oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
oldtup.t_len = ItemIdGetLength(lp);
oldtup.t_self = *otid;
/* 补充newtup的信息 */
newtup->t_tableOid = RelationGetRelid(relation);
/* 确定出update操作要修改的列 */
modified_attrs = HeapDetermineModifiedColumns(relation, interesting_attrs,
&oldtup, newtup);
/* 若不更新任何“键”列,那么可以使用较弱的锁类型 */
if (!bms_overlap(modified_attrs, key_attrs))
{
*lockmode = LockTupleNoKeyExclusive;
mxact_status = MultiXactStatusNoKeyUpdate;
key_intact = true;
/* 确保该事务不会成为其它更旧的组合事务的成员 */
MultiXactIdSetOldestMember();
}
else
{
*lockmode = LockTupleExclusive;
mxact_status = MultiXactStatusUpdate;
key_intact = false;
}
/* goto跳跃点l2 */
l2:
checked_lockers = false;
locker_remains = false;
result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);
/* TM_BeingModified时wait应该为true */
Assert(result != TM_BeingModified || wait);
/* 旧元组已不可见 */
if (result == TM_Invisible)
{
UnlockReleaseBuffer(buffer);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("attempted to update invisible tuple")));
}
/* 旧元组正在被修改 */
else if (result == TM_BeingModified && wait)
{
bool can_continue = false;
/* 解锁前拷贝一些信息出来 */
xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
infomask = oldtup.t_data->t_infomask;
/* 若该元组上已有组合事务在共享锁 */
if (infomask & HEAP_XMAX_IS_MULTI)
{
bool current_is_member = false;
/* 当前事务请求的锁是否与已持有的锁冲突 */
if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
*lockmode, ¤t_is_member))
{
/* 解锁buffer */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* 有需要的话获取元组锁 */
if (!current_is_member)
heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
LockWaitBlock, &have_tuple_lock);
/* 等待组合事务,唤醒后重新锁定buffer */
MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
relation, &oldtup.t_self, XLTW_Update,
&remain);
checked_lockers = true;
locker_remains = remain != 0;
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* 检查等待期间xmax是否变化或者元组被更新 */
if (xmax_infomask_changed(oldtup.t_data->t_infomask,
infomask) ||
!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
xwait))
goto l2;
}
/* 需要判断是否可以继续执行update(万一组合事务的其它事务进行了更新并提交了) */
if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))
update_xact = HeapTupleGetUpdateXid(oldtup.t_data);
else
update_xact = InvalidTransactionId;
if (!TransactionIdIsValid(update_xact) ||
TransactionIdDidAbort(update_xact))
can_continue = true;
}
/* 不是组合事务,不需要等待其它事务的情况,这时我们也不需要用到元组锁了 */
else if (TransactionIdIsCurrentTransactionId(xwait))
{
checked_lockers = true;
locker_remains = true;
can_continue = true;
}
/* 若存在key-share锁,但是我们不需要更改key列,那么不需要等待 */
else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact)
{
checked_lockers = true;
locker_remains = true;
can_continue = true;
}
/* 需要等待其它标准事务结束的情况 */
else
{
/* 解锁buffer,获取元组锁,等待,锁定buffer */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
LockWaitBlock, &have_tuple_lock);
XactLockTableWait(xwait, relation, &oldtup.t_self,
XLTW_Update);
checked_lockers = true;
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* 若等待期间xmax发生变化或元组被更新 */
if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||
!TransactionIdEquals(xwait,
HeapTupleHeaderGetRawXmax(oldtup.t_data)))
goto l2;
/* 否则,更新关于xmax中止或提交的标志位 */
UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
can_continue = true;
}
/* 都没问题,可以进行update */
if (can_continue)
result = TM_Ok;
/* 旧元组被更新了 */
else if (!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid) ||
HeapTupleHeaderIndicatesMovedPartitions(oldtup.t_data))
result = TM_Updated;
/* 旧元组被删除了 */
else
result = TM_Deleted;
}
/* 对事务快照模式RI更新执行附加检查 */
if (crosscheck != InvalidSnapshot && result == TM_Ok)
{
if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
{
result = TM_Updated;
Assert(!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
}
}
/* 无法执行update,填充TM_FailureData然后return */
if (result != TM_Ok)
{
Assert(result == TM_SelfModified ||
result == TM_Updated ||
result == TM_Deleted ||
result == TM_BeingModified);
Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
Assert(result != TM_Updated ||
!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
tmfd->ctid = oldtup.t_data->t_ctid;
tmfd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
if (result == TM_SelfModified)
tmfd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
else
tmfd->cmax = InvalidCommandId;
UnlockReleaseBuffer(buffer);
if (have_tuple_lock)
UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
if (vmbuffer != InvalidBuffer)
ReleaseBuffer(vmbuffer);
bms_free(hot_attrs);
bms_free(key_attrs);
bms_free(id_attrs);
bms_free(modified_attrs);
bms_free(interesting_attrs);
return result;
}
/* 若刚才没有pin visible map,但是在等待锁期间page被设置了全可见,
* 那么需要重新锁定buffer并将visible map pin到内存,并返回跳跃点继续执行 */
if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
visibilitymap_pin(relation, block, &vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
goto l2;
}
/* 计算新的xmax和infomask */
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
oldtup.t_data->t_infomask,
oldtup.t_data->t_infomask2,
xid, *lockmode, true,
&xmax_old_tuple, &infomask_old_tuple,
&infomask2_old_tuple);
/* 为新元组设置xmax */
if ((oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
HEAP_LOCKED_UPGRADED(oldtup.t_data->t_infomask) ||
(checked_lockers && !locker_remains))
xmax_new_tuple = InvalidTransactionId;
else
xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup.t_data);
/* 设置新元组与xmax相关的infomask信息 */
if (!TransactionIdIsValid(xmax_new_tuple))
{
infomask_new_tuple = HEAP_XMAX_INVALID;
infomask2_new_tuple = 0;
}
else
{
if (oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI)
{
GetMultiXactIdHintBits(xmax_new_tuple, &infomask_new_tuple,
&infomask2_new_tuple);
}
else
{
infomask_new_tuple = HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_LOCK_ONLY;
infomask2_new_tuple = 0;
}
}
/* 填充新元组的头信息 */
newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
HeapTupleHeaderSetXmin(newtup->t_data, xid);
HeapTupleHeaderSetCmin(newtup->t_data, cid);
newtup->t_data->t_infomask |= HEAP_UPDATED | infomask_new_tuple;
newtup->t_data->t_infomask2 |= infomask2_new_tuple;
HeapTupleHeaderSetXmax(newtup->t_data, xmax_new_tuple);
/* 有需要的话,使用combo cid */
HeapTupleHeaderAdjustCmax(oldtup.t_data, &cid, &iscombo);
/* 判断是否需要toast */
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_MATVIEW)
{
/* toast表不应该递归的toast */
Assert(!HeapTupleHasExternal(&oldtup));
Assert(!HeapTupleHasExternal(newtup));
need_toast = false;
}
else
need_toast = (HeapTupleHasExternal(&oldtup) ||
HeapTupleHasExternal(newtup) ||
newtup->t_len > TOAST_TUPLE_THRESHOLD);
pagefree = PageGetHeapFreeSpace(page);
newtupsize = MAXALIGN(newtup->t_len);
/* 需要toast或者当前page存储空间不够了 */
if (need_toast || newtupsize > pagefree)
{
bool cleared_all_frozen = false;
/* 在此toast或跨页的情况下,我们需要解锁页面时锁定元组,防止被其它事务修改 */
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
oldtup.t_data->t_infomask,
oldtup.t_data->t_infomask2,
xid, *lockmode, false,
&xmax_lock_old_tuple, &infomask_lock_old_tuple,
&infomask2_lock_old_tuple);
Assert(HEAP_XMAX_IS_LOCKED_ONLY(infomask_lock_old_tuple));
START_CRIT_SECTION();
/* 清除过时的可见性标志 */
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
HeapTupleClearHotUpdated(&oldtup);
/* 存储关于事务更新元组的信息 */
Assert(TransactionIdIsValid(xmax_lock_old_tuple));
HeapTupleHeaderSetXmax(oldtup.t_data, xmax_lock_old_tuple);
oldtup.t_data->t_infomask |= infomask_lock_old_tuple;
oldtup.t_data->t_infomask2 |= infomask2_lock_old_tuple;
HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
/* 暂时让它看起来没有更新,但是我们已经锁定了它 */
oldtup.t_data->t_ctid = oldtup.t_self;
/* 有需要的话清除visible map中的所有冻结位 */
if (PageIsAllVisible(BufferGetPage(buffer)) &&
visibilitymap_clear(relation, block, vmbuffer,
VISIBILITYMAP_ALL_FROZEN))
cleared_all_frozen = true;
/* 标记buffer为脏 */
MarkBufferDirty(buffer);
/* 虽然整个过程还没完,但是一会要解锁buffer,
* 所以当前修改的部分仍然需要记录wal日志 */
if (RelationNeedsWAL(relation))
{
xl_heap_lock xlrec;
XLogRecPtr recptr;
XLogBeginInsert();
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup.t_self);
xlrec.locking_xid = xmax_lock_old_tuple;
xlrec.infobits_set = compute_infobits(oldtup.t_data->t_infomask,
oldtup.t_data->t_infomask2);
xlrec.flags =
cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
/* 解锁buffer */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* 有需要的话执行toast的操作 */
if (need_toast)
{
heaptup = toast_insert_or_update(relation, newtup, &oldtup, 0);
newtupsize = MAXALIGN(heaptup->t_len);
}
else
heaptup = newtup;
if (newtupsize > pagefree)
{
newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
buffer, 0, NULL,
&vmbuffer_new, &vmbuffer);
}
else
{
/* 重新锁定buffer后需要重新检查一次空间还够用不 */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pagefree = PageGetHeapFreeSpace(page)
if (newtupsize > pagefree)
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
buffer, 0, NULL,
&vmbuffer_new, &vmbuffer);
}
else
{
newbuf = buffer;
}
}
}
/* 不需要toast且当前page存的下 */
else
{
newbuf = buffer;
heaptup = newtup;
}
/* 检查序列化冲突 */
CheckForSerializableConflictIn(relation, &oldtup, buffer);
/* 若新旧元组放到同一个page,可以考虑使用HOT */
if (newbuf == buffer)
{
if (hot_attrs_checked && !bms_overlap(modified_attrs, hot_attrs))
use_hot_update = true;
}
/* 否则新元组存在了另一个page,那么旧元组可以被清理掉 */
else
{
PageSetFull(page);
}
/* 获得元组标识副本 */
old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
bms_overlap(modified_attrs, id_attrs),
&old_key_copied);
START_CRIT_SECTION();
/* 如果这个事务提交,旧元组很快就会变为dead状态。在这里给page设置
* 一个标志,当xid低于OldestXmin时,此page会成为被修剪的候选页 */
PageSetPrunable(page, xid);
/* 使用HOT的流程 */
if (use_hot_update)
{
HeapTupleSetHotUpdated(&oldtup);
HeapTupleSetHeapOnly(heaptup);
HeapTupleSetHeapOnly(newtup);
}
/* 不使用HOT */
else
{
HeapTupleClearHotUpdated(&oldtup);
HeapTupleClearHeapOnly(heaptup);
HeapTupleClearHeapOnly(newtup);
}
/* 插入新元组,此函数在heap_insert中见过 */
RelationPutHeapTuple(relation, newbuf, heaptup, false);
/* 清除过时的可见性标志(可能是我们刚刚在上面设置的) */
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
/* 存储事务更新元组相关的信息 */
Assert(TransactionIdIsValid(xmax_old_tuple));
HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);
oldtup.t_data->t_infomask |= infomask_old_tuple;
oldtup.t_data->t_infomask2 |= infomask2_old_tuple;
HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
/* 在旧元组的t_ctid中记录新元组的地址 */
oldtup.t_data->t_ctid = heaptup->t_self;
/* 清除PD_ALL_VISIVLE标志,重置visible map的标志位 */
if (PageIsAllVisible(BufferGetPage(buffer)))
{
all_visible_cleared = true;
PageClearAllVisible(BufferGetPage(buffer));
visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
vmbuffer, VISIBILITYMAP_VALID_BITS);
}
if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
{
all_visible_cleared_new = true;
PageClearAllVisible(BufferGetPage(newbuf));
visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
vmbuffer_new, VISIBILITYMAP_VALID_BITS);
}
/* 标记脏块 */
if (newbuf != buffer)
MarkBufferDirty(newbuf);
MarkBufferDirty(buffer);
/* xlog相关流程 */
if (RelationNeedsWAL(relation))
{
/* 若是系统表元组,需要发送combocids来正确解码,
* 因此记录一条XLOG_HEAP2_NEW_CID记录 */
if (RelationIsAccessibleInLogicalDecoding(relation))
{
log_heap_new_cid(relation, &oldtup);
log_heap_new_cid(relation, heaptup);
}
recptr = log_heap_update(relation, buffer,
newbuf, &oldtup, heaptup,
old_key_tuple,
all_visible_cleared,
all_visible_cleared_new);
if (newbuf != buffer)
{
PageSetLSN(BufferGetPage(newbuf), recptr);
}
PageSetLSN(BufferGetPage(buffer), recptr);
}
END_CRIT_SECTION();
/* 解锁buffer */
if (newbuf != buffer)
LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Cache同步 */
CacheInvalidateHeapTuple(relation, &oldtup, heaptup);
/* 释放buffer */
if (newbuf != buffer)
ReleaseBuffer(newbuf);
ReleaseBuffer(buffer);
if (BufferIsValid(vmbuffer_new))
ReleaseBuffer(vmbuffer_new);
if (BufferIsValid(vmbuffer))
ReleaseBuffer(vmbuffer);
/* 释放元组锁 */
if (have_tuple_lock)
UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
/* 记录统计信息 */
pgstat_count_heap_update(relation, use_hot_update);
/* 释放拷贝的副本 */
if (heaptup != newtup)
{
newtup->t_self = heaptup->t_self;
heap_freetuple(heaptup);
}
if (old_key_tuple != NULL && old_key_copied)
heap_freetuple(old_key_tuple);
bms_free(hot_attrs);
bms_free(key_attrs);
bms_free(id_attrs);
bms_free(modified_attrs);
bms_free(interesting_attrs);
return TM_Ok;
以上是 PostgreSQL源码学习更新数据#0 的全部内容, 来源链接: utcz.com/z/534094.html