PostgreSQL源码学习更新数据#3

database

本节介绍ExecUpdate函数。

ExecUpdate函数

static TupleTableSlot *

ExecUpdate(ModifyTableState *mtstate,

ItemPointer tupleid,

HeapTuple oldtuple,

TupleTableSlot *slot,

TupleTableSlot *planSlot,

EPQState *epqstate,

EState *estate,

bool canSetTag);

/* 必须在(隐式)事务中进行 */

if (IsBootstrapProcessingMode())

elog(ERROR, "cannot UPDATE during bootstrap");

ExecMaterializeSlot(slot);

/* 获取结果关系的信息 */

resultRelInfo = estate->es_result_relation_info;

resultRelationDesc = resultRelInfo->ri_RelationDesc;

/* BEFORE ROW UPDATE 触发器 */

if (resultRelInfo->ri_TrigDesc &&

resultRelInfo->ri_TrigDesc->trig_update_before_row)

{

if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,

tupleid, oldtuple, slot))

return NULL; /* "do nothing" */

}

/* INSTEAD OF ROW UPDATE 触发器 */

if (resultRelInfo->ri_TrigDesc &&

resultRelInfo->ri_TrigDesc->trig_update_instead_row)

{

if (!ExecIRUpdateTriggers(estate, resultRelInfo,

oldtuple, slot))

return NULL; /* "do nothing" */

}

/* 若是外部表 */

else if (resultRelInfo->ri_FdwRoutine)

{

/* 先计算生成列 */

if (resultRelationDesc->rd_att->constr &&

resultRelationDesc->rd_att->constr->has_generated_stored)

ExecComputeStoredGenerated(estate, slot);

/* update外部表,使用fdw */

slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate,

resultRelInfo,

slot,

planSlot);

if (slot == NULL) /* "do nothing" */

return NULL;

/* AFTER ROW触发器或RETURNING表达式有可能会用到tableoid,所以初始化一下tts_tableOid */

slot->tts_tableOid = RelationGetRelid(resultRelationDesc);

}

/* 一般的update流程 */

else

{

/* 约束表达式可能会用到tableoid,因此初始化一下tts_tableOid */

slot->tts_tableOid = RelationGetRelid(resultRelationDesc);

/* 计算生成列 */

if (resultRelationDesc->rd_att->constr &&

resultRelationDesc->rd_att->constr->has_generated_stored)

ExecComputeStoredGenerated(estate, slot);

/* goto跳跃点 */

lreplace:

/* 确保slot被物化(考虑到可能到来的EPQ等) */

ExecMaterializeSlot(slot);

/* 检查元组是否符合分区约束条件 */

partition_constraint_failed =

resultRelInfo->ri_PartitionCheck &&

!ExecPartitionCheck(resultRelInfo, slot, estate, false);

/* 当不符合分区约束条件时跳过WCO检查

*(因为在插入前的触发器可能还会更改元组,所以wco放到插入数据前检查) */

if (!partition_constraint_failed &&

resultRelInfo->ri_WithCheckOptions != NIL)

{

ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,

resultRelInfo, slot, estate);

}

/* 若不符合分区约束,尝试将行移动到正确的分区中 */

if (partition_constraint_failed)

{

PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;

/* 若是INSERT ON CONFLICT DO UPDATE导致的update,这意味着当前元组不符合分区约束

* 但是当前分区却存在有冲突的元组,这样的话报错 */

if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)

ereport(ERROR,

(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

errmsg("invalid ON UPDATE specification"),

errdetail("The result tuple would appear in a different partition than the original tuple.")));

/* 没有设置分区路由表明现在在叶分区上执行操作,因此出现了不符合分区约束那么就报错 */

if (proute == NULL)

ExecPartitionCheckEmitError(resultRelInfo, slot, estate);

/* 删除元组,但不进行RETURNING,insert操作的时候再返回 */

ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,

estate, false, false /* canSetTag */ ,

true /* changingPart */ , &tuple_deleted, &epqslot);

/* 若因为各种原因没能成功删除 */

if (!tuple_deleted)

{

/* epqslot通常为空,但是若删除时发现另一个事务同时更新了同一行,那么重新检查 */

if (TupIsNull(epqslot))

return NULL;

else

{

slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);

goto lreplace;

}

}

/* 保存update操作的转换捕获映射,因为在插入之后需要恢复回来 */

if (mtstate->mt_transition_capture)

saved_tcs_map = mtstate->mt_transition_capture->tcs_map;

/* 获取resultRel在mtstate->resultRelInfo中的位置,执行元组转换 */

map_index = resultRelInfo - mtstate->resultRelInfo;

Assert(map_index >= 0 && map_index < mtstate->mt_nplans);

tupconv_map = tupconv_map_for_subplan(mtstate, map_index);

if (tupconv_map != NULL)

slot = execute_attr_map_slot(tupconv_map->attrMap,

slot,

mtstate->mt_root_tuple_slot);

/* 准备元组路由 */

Assert(mtstate->rootResultRelInfo != NULL);

slot = ExecPrepareTupleRouting(mtstate, estate, proute,

mtstate->rootResultRelInfo, slot);

/* 执行插入 */

ret_slot = ExecInsert(mtstate, slot, planSlot,

estate, canSetTag);

/* 还原一些临时更改 */

estate->es_result_relation_info = resultRelInfo;

if (mtstate->mt_transition_capture)

{

mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;

mtstate->mt_transition_capture->tcs_map = saved_tcs_map;

}

/* 向其它分区插入的update操作完成 */

return ret_slot;

}

/* 检查元组分区约束以外的其它约束条件 */

if (resultRelationDesc->rd_att->constr)

ExecConstraints(resultRelInfo, slot, estate);

/* 执行update操作 */

result = table_tuple_update(resultRelationDesc, tupleid, slot,

estate->es_output_cid,

estate->es_snapshot,

estate->es_crosscheck_snapshot,

true /* wait for commit */ ,

&tmfd, &lockmode, &update_indexes);

switch (result)

{

/* 目标元组已经被当前命令或当前事务的后续命令更新或删除了 */

case TM_SelfModified:

if (tmfd.cmax != estate->es_output_cid)

ereport(ERROR,

(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),

errmsg("tuple to be updated was already modified by an operation triggered by the current command"),

errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));

/* Else, already updated by self; nothing to do */

return NULL;

/* 正常更新 */

case TM_Ok:

break;

/* 目标元组已经被其它事务更新 */

case TM_Updated:

{

/* 检查事务隔离级别是否大于等于可重复读 */

if (IsolationUsesXactSnapshot())

ereport(ERROR,

(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),

errmsg("could not serialize access due to concurrent update")));

/* 获取或创建EPQ测试要用的slot,然后锁定最新版本的元组,并将其放入inputslot */

inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,

resultRelInfo->ri_RangeTableIndex);

result = table_tuple_lock(resultRelationDesc, tupleid,

estate->es_snapshot,

inputslot, estate->es_output_cid,

lockmode, LockWaitBlock,

TUPLE_LOCK_FLAG_FIND_LAST_VERSION,

&tmfd);

switch (result)

{

case TM_Ok:

Assert(tmfd.traversed);

/* 判断是否应该在读已提交的规则下处理已更新的元组,如果为否的话,返回NULL */

epqslot = EvalPlanQual(epqstate,

resultRelationDesc,

resultRelInfo->ri_RangeTableIndex,

inputslot);

if (TupIsNull(epqslot))

return NULL;

slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);

goto lreplace;

/* 元组已被删除 */

case TM_Deleted:

return NULL;

/* 若已经被自身这条命令更新,则跳过更新操作,否则报错 */

case TM_SelfModified:

if (tmfd.cmax != estate->es_output_cid)

ereport(ERROR,

(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),

errmsg("tuple to be updated was already modified by an operation triggered by the current command"),

errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));

return NULL;

/* 其它出错情况 */

default:

elog(ERROR, "unexpected table_tuple_lock status: %u",

result);

return NULL;

}

}

break;

/* 目标元组已被其它事务删除 */

case TM_Deleted:

if (IsolationUsesXactSnapshot())

ereport(ERROR,

(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),

errmsg("could not serialize access due to concurrent delete")));

return NULL;

/* 其它出错情况 */

default:

elog(ERROR, "unrecognized table_tuple_update status: %u",

result);

return NULL;

}

/* 有需要的话,为元组插入索引 */

if (resultRelInfo->ri_NumIndices > 0 && update_indexes)

recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL);

}

/* 有需要的话,统计处理的元组数目 */

if (canSetTag)

(estate->es_processed)++;

/* AFTER ROW UPDATE 触发器 */

ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,

recheckIndexes,

mtstate->operation == CMD_INSERT ?

mtstate->mt_oc_transition_capture :

mtstate->mt_transition_capture);

list_free(recheckIndexes);

/* 检查父视图中的所有WITH CHECK OPTION约束 */

if (resultRelInfo->ri_WithCheckOptions != NIL)

ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);

/* 处理RETURNING */

if (resultRelInfo->ri_projectReturning)

return ExecProcessReturning(resultRelInfo, slot, planSlot);

return NULL;

以上是 PostgreSQL源码学习更新数据#3 的全部内容, 来源链接: utcz.com/z/534161.html

回到顶部