PostgreSQL源码学习更新数据#0

database

以一条 update test set b = 'bcd' where a = 123; 的SQL语句为例,跟踪更新数据的代码逻辑。(PG版本为12.2;注意PostgreSQL中字符串常量要用单引号,双引号表示标识符)

更新数据主要的函数是heap_update。查看调用栈:

#0  heap_update (relation=0x7fbe842e9270, otid=0x7ffffcdf4eda, 

newtup=0x244f828, cid=0, crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0,

lockmode=0x7ffffcdf4dec) at heapam.c:2898

#1 0x00000000004d3c47 in heapam_tuple_update (relation=0x7fbe842e9270,

otid=0x7ffffcdf4eda, slot=0x244f278, cid=0, snapshot=0x2418fa0,

crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0, lockmode=0x7ffffcdf4dec,

update_indexes=0x7ffffcdf4deb) at heapam_handler.c:332

#2 0x00000000006dacfa in table_tuple_update (rel=0x7fbe842e9270,

otid=0x7ffffcdf4eda, slot=0x244f278, cid=0, snapshot=0x2418fa0,

crosscheck=0x0, wait=true, tmfd=0x7ffffcdf4df0, lockmode=0x7ffffcdf4dec,

update_indexes=0x7ffffcdf4deb)

at ../../../src/include/access/tableam.h:1275

#3 0x00000000006dcb0d in ExecUpdate (mtstate=0x244dd40,

tupleid=0x7ffffcdf4eda, oldtuple=0x0, slot=0x244f278, planSlot=0x244e5a0,

epqstate=0x244de38, estate=0x244d9c0, canSetTag=true)

at nodeModifyTable.c:1311

#4 0x00000000006ddfc4 in ExecModifyTable (pstate=0x244dd40)

at nodeModifyTable.c:2222

heap_update函数

//src/include/access/heapam.h

/*
 * heap_update - prototype of the heap-level tuple update routine.
 *
 * Summary of the contract as far as the body excerpt quoted in this
 * article shows it:
 *
 *   relation   - the table being updated.
 *   otid       - item pointer of the old tuple version (asserted valid).
 *   newtup     - the replacement tuple; its t_tableOid and header fields
 *                (xmin, cmin, xmax, infomask bits) are filled in here.
 *   cid        - command id; stored as cmin of the new tuple and, possibly
 *                after combo-cid adjustment, as cmax of the old tuple.
 *   crosscheck - optional snapshot; when it is not InvalidSnapshot and the
 *                old tuple is not visible under it, the result becomes
 *                TM_Updated.
 *   wait       - asserted to be true whenever the old tuple is found to be
 *                TM_BeingModified.
 *   tmfd       - on any result other than TM_Ok, receives the old tuple's
 *                ctid, update xmax and (for TM_SelfModified) cmax.
 *   lockmode   - output; set to LockTupleNoKeyExclusive when no key column
 *                is modified, otherwise LockTupleExclusive.
 *
 * Returns TM_Ok on success; otherwise one of TM_SelfModified, TM_Updated,
 * TM_Deleted or TM_BeingModified.  Reports an ERROR when called in parallel
 * mode or when the old tuple is invisible.
 */
extern TM_Result heap_update(Relation relation, ItemPointer otid,

HeapTuple newtup,

CommandId cid, Snapshot crosscheck, bool wait,

struct TM_FailureData *tmfd, LockTupleMode *lockmode);

//src/backend/access/heap/heapam.c

TransactionId xid = GetCurrentTransactionId();

Assert(ItemPointerIsValid(otid));

/* 禁止在并行操作中update */

if (IsInParallelMode())

ereport(ERROR,

(errcode(ERRCODE_INVALID_TRANSACTION_STATE),

errmsg("cannot update tuples during a parallel operation")));

/* 获取三类索引列的位图(全部索引列、键列、复制标识键列),供后面的各项检查使用 */

hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);

key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);

id_attrs = RelationGetIndexAttrBitmap(relation,

INDEX_ATTR_BITMAP_IDENTITY_KEY);

/* 获取旧元组的block,buffer,page */

block = ItemPointerGetBlockNumber(otid);

buffer = ReadBuffer(relation, block);

page = BufferGetPage(buffer);

interesting_attrs = NULL;

/* 若page已经满了,就不可以做HOT更新了 */

if (!PageIsFull(page))

{

interesting_attrs = bms_add_members(interesting_attrs, hot_attrs);

hot_attrs_checked = true;

}

interesting_attrs = bms_add_members(interesting_attrs, key_attrs);

interesting_attrs = bms_add_members(interesting_attrs, id_attrs);

/* 若整页可见,将对应的visibility map页pin在内存 */

if (PageIsAllVisible(page))

visibilitymap_pin(relation, block, &vmbuffer);

/* 锁定buffer */

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));

Assert(ItemIdIsNormal(lp));

/* 获取oldtup的信息 */

oldtup.t_tableOid = RelationGetRelid(relation);

oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);

oldtup.t_len = ItemIdGetLength(lp);

oldtup.t_self = *otid;

/* 补充newtup的信息 */

newtup->t_tableOid = RelationGetRelid(relation);

/* 确定出update操作要修改的列 */

modified_attrs = HeapDetermineModifiedColumns(relation, interesting_attrs,

&oldtup, newtup);

/* 若不更新任何“键”列,那么可以使用较弱的锁类型 */

if (!bms_overlap(modified_attrs, key_attrs))

{

*lockmode = LockTupleNoKeyExclusive;

mxact_status = MultiXactStatusNoKeyUpdate;

key_intact = true;

/* 确保该事务不会成为其它更旧的组合事务的成员 */

MultiXactIdSetOldestMember();

}

else

{

*lockmode = LockTupleExclusive;

mxact_status = MultiXactStatusUpdate;

key_intact = false;

}

/* goto跳跃点l2 */

l2:

checked_lockers = false;

locker_remains = false;

result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);

/* TM_BeingModified时wait应该为true */

Assert(result != TM_BeingModified || wait);

/* 旧元组已不可见 */

if (result == TM_Invisible)

{

UnlockReleaseBuffer(buffer);

ereport(ERROR,

(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

errmsg("attempted to update invisible tuple")));

}

/* 旧元组正在被修改 */

else if (result == TM_BeingModified && wait)

{

bool can_continue = false;

/* 解锁前拷贝一些信息出来 */

xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);

infomask = oldtup.t_data->t_infomask;

/* 若该元组的xmax是一个组合事务ID(MultiXactId),即有多个事务同时锁定/更新了该元组 */

if (infomask & HEAP_XMAX_IS_MULTI)

{

bool current_is_member = false;

/* 当前事务请求的锁是否与已持有的锁冲突 */

if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,

*lockmode, &current_is_member))

{

/* 解锁buffer */

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

/* 有需要的话获取元组锁 */

if (!current_is_member)

heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,

LockWaitBlock, &have_tuple_lock);

/* 等待组合事务,唤醒后重新锁定buffer */

MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,

relation, &oldtup.t_self, XLTW_Update,

&remain);

checked_lockers = true;

locker_remains = remain != 0;

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

/* 检查等待期间xmax是否变化或者元组被更新 */

if (xmax_infomask_changed(oldtup.t_data->t_infomask,

infomask) ||

!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),

xwait))

goto l2;

}

/* 需要判断是否可以继续执行update(万一组合事务的其它事务进行了更新并提交了) */

if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))

update_xact = HeapTupleGetUpdateXid(oldtup.t_data);

else

update_xact = InvalidTransactionId;

if (!TransactionIdIsValid(update_xact) ||

TransactionIdDidAbort(update_xact))

can_continue = true;

}

/* 不是组合事务,且xmax就是当前事务自己:不需要等待其它事务,这时我们也不需要用到元组锁 */

else if (TransactionIdIsCurrentTransactionId(xwait))

{

checked_lockers = true;

locker_remains = true;

can_continue = true;

}

/* 若存在key-share锁,但是我们不需要更改key列,那么不需要等待 */

else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact)

{

checked_lockers = true;

locker_remains = true;

can_continue = true;

}

/* 需要等待其它标准事务结束的情况 */

else

{

/* 解锁buffer,获取元组锁,等待,锁定buffer */

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,

LockWaitBlock, &have_tuple_lock);

XactLockTableWait(xwait, relation, &oldtup.t_self,

XLTW_Update);

checked_lockers = true;

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

/* 若等待期间xmax发生变化或元组被更新 */

if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||

!TransactionIdEquals(xwait,

HeapTupleHeaderGetRawXmax(oldtup.t_data)))

goto l2;

/* 否则,更新关于xmax中止或提交的标志位 */

UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);

if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)

can_continue = true;

}

/* 都没问题,可以进行update */

if (can_continue)

result = TM_Ok;

/* 旧元组被更新了 */

else if (!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid) ||

HeapTupleHeaderIndicatesMovedPartitions(oldtup.t_data))

result = TM_Updated;

/* 旧元组被删除了 */

else

result = TM_Deleted;

}

/* 对事务快照模式RI更新执行附加检查 */

if (crosscheck != InvalidSnapshot && result == TM_Ok)

{

if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))

{

result = TM_Updated;

Assert(!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));

}

}

/* 无法执行update,填充TM_FailureData然后return */

if (result != TM_Ok)

{

Assert(result == TM_SelfModified ||

result == TM_Updated ||

result == TM_Deleted ||

result == TM_BeingModified);

Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));

Assert(result != TM_Updated ||

!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));

tmfd->ctid = oldtup.t_data->t_ctid;

tmfd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);

if (result == TM_SelfModified)

tmfd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);

else

tmfd->cmax = InvalidCommandId;

UnlockReleaseBuffer(buffer);

if (have_tuple_lock)

UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);

if (vmbuffer != InvalidBuffer)

ReleaseBuffer(vmbuffer);

bms_free(hot_attrs);

bms_free(key_attrs);

bms_free(id_attrs);

bms_free(modified_attrs);

bms_free(interesting_attrs);

return result;

}

/* 若刚才没有pin visibility map,但是在等待锁期间page被设置了全可见,

* 那么需要先解锁buffer,将visibility map pin到内存,再重新锁定buffer,然后返回跳跃点l2重新检查 */

if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))

{

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

visibilitymap_pin(relation, block, &vmbuffer);

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

goto l2;

}

/* 计算新的xmax和infomask */

compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),

oldtup.t_data->t_infomask,

oldtup.t_data->t_infomask2,

xid, *lockmode, true,

&xmax_old_tuple, &infomask_old_tuple,

&infomask2_old_tuple);

/* 为新元组设置xmax */

if ((oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||

HEAP_LOCKED_UPGRADED(oldtup.t_data->t_infomask) ||

(checked_lockers && !locker_remains))

xmax_new_tuple = InvalidTransactionId;

else

xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup.t_data);

/* 设置新元组与xmax相关的infomask信息 */

if (!TransactionIdIsValid(xmax_new_tuple))

{

infomask_new_tuple = HEAP_XMAX_INVALID;

infomask2_new_tuple = 0;

}

else

{

if (oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI)

{

GetMultiXactIdHintBits(xmax_new_tuple, &infomask_new_tuple,

&infomask2_new_tuple);

}

else

{

infomask_new_tuple = HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_LOCK_ONLY;

infomask2_new_tuple = 0;

}

}

/* 填充新元组的头信息 */

newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);

newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);

HeapTupleHeaderSetXmin(newtup->t_data, xid);

HeapTupleHeaderSetCmin(newtup->t_data, cid);

newtup->t_data->t_infomask |= HEAP_UPDATED | infomask_new_tuple;

newtup->t_data->t_infomask2 |= infomask2_new_tuple;

HeapTupleHeaderSetXmax(newtup->t_data, xmax_new_tuple);

/* 有需要的话,使用combo cid */

HeapTupleHeaderAdjustCmax(oldtup.t_data, &cid, &iscombo);

/* 判断是否需要toast */

if (relation->rd_rel->relkind != RELKIND_RELATION &&

relation->rd_rel->relkind != RELKIND_MATVIEW)

{

/* toast表不应该递归的toast */

Assert(!HeapTupleHasExternal(&oldtup));

Assert(!HeapTupleHasExternal(newtup));

need_toast = false;

}

else

need_toast = (HeapTupleHasExternal(&oldtup) ||

HeapTupleHasExternal(newtup) ||

newtup->t_len > TOAST_TUPLE_THRESHOLD);

pagefree = PageGetHeapFreeSpace(page);

newtupsize = MAXALIGN(newtup->t_len);

/* 需要toast或者当前page存储空间不够了 */

if (need_toast || newtupsize > pagefree)

{

bool cleared_all_frozen = false;

/* 在需要toast或跨页的情况下,我们要暂时解锁buffer;解锁之前先把旧元组标记为已锁定,防止解锁期间被其它事务修改 */

compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),

oldtup.t_data->t_infomask,

oldtup.t_data->t_infomask2,

xid, *lockmode, false,

&xmax_lock_old_tuple, &infomask_lock_old_tuple,

&infomask2_lock_old_tuple);

Assert(HEAP_XMAX_IS_LOCKED_ONLY(infomask_lock_old_tuple));

START_CRIT_SECTION();

/* 清除过时的可见性标志 */

oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);

oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;

HeapTupleClearHotUpdated(&oldtup);

/* 存储关于事务更新元组的信息 */

Assert(TransactionIdIsValid(xmax_lock_old_tuple));

HeapTupleHeaderSetXmax(oldtup.t_data, xmax_lock_old_tuple);

oldtup.t_data->t_infomask |= infomask_lock_old_tuple;

oldtup.t_data->t_infomask2 |= infomask2_lock_old_tuple;

HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);

/* 暂时让它看起来没有更新,但是我们已经锁定了它 */

oldtup.t_data->t_ctid = oldtup.t_self;

/* 若页面为全可见,则清除visibility map中对应的all-frozen标志位 */

if (PageIsAllVisible(BufferGetPage(buffer)) &&

visibilitymap_clear(relation, block, vmbuffer,

VISIBILITYMAP_ALL_FROZEN))

cleared_all_frozen = true;

/* 标记buffer为脏 */

MarkBufferDirty(buffer);

/* 虽然整个过程还没完,但是一会要解锁buffer,

* 所以当前修改的部分仍然需要记录wal日志 */

if (RelationNeedsWAL(relation))

{

xl_heap_lock xlrec;

XLogRecPtr recptr;

XLogBeginInsert();

XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup.t_self);

xlrec.locking_xid = xmax_lock_old_tuple;

xlrec.infobits_set = compute_infobits(oldtup.t_data->t_infomask,

oldtup.t_data->t_infomask2);

xlrec.flags =

cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;

XLogRegisterData((char *) &xlrec, SizeOfHeapLock);

recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);

PageSetLSN(page, recptr);

}

END_CRIT_SECTION();

/* 解锁buffer */

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

/* 有需要的话执行toast的操作 */

if (need_toast)

{

heaptup = toast_insert_or_update(relation, newtup, &oldtup, 0);

newtupsize = MAXALIGN(heaptup->t_len);

}

else

heaptup = newtup;

if (newtupsize > pagefree)

{

newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,

buffer, 0, NULL,

&vmbuffer_new, &vmbuffer);

}

else

{

/* 重新锁定buffer后需要重新检查一次空间还够用不 */

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

pagefree = PageGetHeapFreeSpace(page);

if (newtupsize > pagefree)

{

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,

buffer, 0, NULL,

&vmbuffer_new, &vmbuffer);

}

else

{

newbuf = buffer;

}

}

}

/* 不需要toast且当前page存的下 */

else

{

newbuf = buffer;

heaptup = newtup;

}

/* 检查序列化冲突 */

CheckForSerializableConflictIn(relation, &oldtup, buffer);

/* 若新旧元组放到同一个page,可以考虑使用HOT */

if (newbuf == buffer)

{

if (hot_attrs_checked && !bms_overlap(modified_attrs, hot_attrs))

use_hot_update = true;

}

/* 否则新元组放到了另一个page:给旧page打上“页已满”的标记(PageSetFull),提示该页值得被修剪整理 */

else

{

PageSetFull(page);

}

/* 提取旧元组的复制标识(replica identity)键,后面写WAL日志时会用到 */

old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,

bms_overlap(modified_attrs, id_attrs),

&old_key_copied);

START_CRIT_SECTION();

/* 如果这个事务提交,旧元组很快就会变为dead状态。在这里给page设置

* 一个标志,当xid低于OldestXmin时,此page会成为被修剪的候选页 */

PageSetPrunable(page, xid);

/* 使用HOT的流程 */

if (use_hot_update)

{

HeapTupleSetHotUpdated(&oldtup);

HeapTupleSetHeapOnly(heaptup);

HeapTupleSetHeapOnly(newtup);

}

/* 不使用HOT */

else

{

HeapTupleClearHotUpdated(&oldtup);

HeapTupleClearHeapOnly(heaptup);

HeapTupleClearHeapOnly(newtup);

}

/* 插入新元组,此函数在heap_insert中见过 */

RelationPutHeapTuple(relation, newbuf, heaptup, false);

/* 清除过时的可见性标志(可能是我们刚刚在上面设置的) */

oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);

oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;

/* 存储事务更新元组相关的信息 */

Assert(TransactionIdIsValid(xmax_old_tuple));

HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);

oldtup.t_data->t_infomask |= infomask_old_tuple;

oldtup.t_data->t_infomask2 |= infomask2_old_tuple;

HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);

/* 在旧元组的t_ctid中记录新元组的地址 */

oldtup.t_data->t_ctid = heaptup->t_self;

/* 清除PD_ALL_VISIBLE标志,重置visibility map的标志位 */

if (PageIsAllVisible(BufferGetPage(buffer)))

{

all_visible_cleared = true;

PageClearAllVisible(BufferGetPage(buffer));

visibilitymap_clear(relation, BufferGetBlockNumber(buffer),

vmbuffer, VISIBILITYMAP_VALID_BITS);

}

if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))

{

all_visible_cleared_new = true;

PageClearAllVisible(BufferGetPage(newbuf));

visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),

vmbuffer_new, VISIBILITYMAP_VALID_BITS);

}

/* 标记脏块 */

if (newbuf != buffer)

MarkBufferDirty(newbuf);

MarkBufferDirty(buffer);

/* xlog相关流程 */

if (RelationNeedsWAL(relation))

{

/* 若是系统表元组,需要发送combocids来正确解码,

* 因此记录一条XLOG_HEAP2_NEW_CID记录 */

if (RelationIsAccessibleInLogicalDecoding(relation))

{

log_heap_new_cid(relation, &oldtup);

log_heap_new_cid(relation, heaptup);

}

recptr = log_heap_update(relation, buffer,

newbuf, &oldtup, heaptup,

old_key_tuple,

all_visible_cleared,

all_visible_cleared_new);

if (newbuf != buffer)

{

PageSetLSN(BufferGetPage(newbuf), recptr);

}

PageSetLSN(BufferGetPage(buffer), recptr);

}

END_CRIT_SECTION();

/* 解锁buffer */

if (newbuf != buffer)

LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

/* Cache同步 */

CacheInvalidateHeapTuple(relation, &oldtup, heaptup);

/* 释放buffer */

if (newbuf != buffer)

ReleaseBuffer(newbuf);

ReleaseBuffer(buffer);

if (BufferIsValid(vmbuffer_new))

ReleaseBuffer(vmbuffer_new);

if (BufferIsValid(vmbuffer))

ReleaseBuffer(vmbuffer);

/* 释放元组锁 */

if (have_tuple_lock)

UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);

/* 记录统计信息 */

pgstat_count_heap_update(relation, use_hot_update);

/* 释放拷贝的副本 */

if (heaptup != newtup)

{

newtup->t_self = heaptup->t_self;

heap_freetuple(heaptup);

}

if (old_key_tuple != NULL && old_key_copied)

heap_freetuple(old_key_tuple);

bms_free(hot_attrs);

bms_free(key_attrs);

bms_free(id_attrs);

bms_free(modified_attrs);

bms_free(interesting_attrs);

return TM_Ok;

以上是 PostgreSQL源码学习更新数据#0 的全部内容, 来源链接: utcz.com/z/534094.html

回到顶部