PostgreSQL源码学习更新数据#3
本节介绍ExecUpdate函数。
ExecUpdate函数
static TupleTableSlot *ExecUpdate(ModifyTableState *mtstate,
ItemPointer tupleid,
HeapTuple oldtuple,
TupleTableSlot *slot,
TupleTableSlot *planSlot,
EPQState *epqstate,
EState *estate,
bool canSetTag);
/* 必须在(隐式)事务中进行 */if (IsBootstrapProcessingMode())
elog(ERROR, "cannot UPDATE during bootstrap");
ExecMaterializeSlot(slot);
/* 获取结果关系的信息 */
resultRelInfo = estate->es_result_relation_info;
resultRelationDesc = resultRelInfo->ri_RelationDesc;
/* BEFORE ROW UPDATE 触发器 */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
tupleid, oldtuple, slot))
return NULL; /* "do nothing" */
}
/* INSTEAD OF ROW UPDATE 触发器 */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_instead_row)
{
if (!ExecIRUpdateTriggers(estate, resultRelInfo,
oldtuple, slot))
return NULL; /* "do nothing" */
}
/* 若是外部表 */
else if (resultRelInfo->ri_FdwRoutine)
{
/* 先计算生成列 */
if (resultRelationDesc->rd_att->constr &&
resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate, slot);
/* update外部表,使用fdw */
slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate,
resultRelInfo,
slot,
planSlot);
if (slot == NULL) /* "do nothing" */
return NULL;
/* AFTER ROW触发器或RETURNING表达式有可能会用到tableoid,所以初始化一下tts_tableOid */
slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
}
/* 一般的update流程 */
else
{
/* 约束表达式可能会用到tableoid,因此初始化一下tts_tableOid */
slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
/* 计算生成列 */
if (resultRelationDesc->rd_att->constr &&
resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate, slot);
/* goto跳跃点 */
lreplace:
/* 确保slot被物化(考虑到可能到来的EPQ等) */
ExecMaterializeSlot(slot);
/* 检查元组是否符合分区约束条件 */
partition_constraint_failed =
resultRelInfo->ri_PartitionCheck &&
!ExecPartitionCheck(resultRelInfo, slot, estate, false);
/* 当不符合分区约束条件时跳过WCO检查
*(因为在插入前的触发器可能还会更改元组,所以wco放到插入数据前检查) */
if (!partition_constraint_failed &&
resultRelInfo->ri_WithCheckOptions != NIL)
{
ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,
resultRelInfo, slot, estate);
}
/* 若不符合分区约束,尝试将行移动到正确的分区中 */
if (partition_constraint_failed)
{
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
/* 若是INSERT ON CONFLICT DO UPDATE导致的update,这意味着当前元组不符合分区约束
* 但是当前分区却存在有冲突的元组,这样的话报错 */
if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid ON UPDATE specification"),
errdetail("The result tuple would appear in a different partition than the original tuple.")));
/* 没有设置分区路由表明现在在叶分区上执行操作,因此出现了不符合分区约束那么就报错 */
if (proute == NULL)
ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
/* 删除元组,但不进行RETURNING,insert操作的时候再返回 */
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
estate, false, false /* canSetTag */ ,
true /* changingPart */ , &tuple_deleted, &epqslot);
/* 若因为各种原因没能成功删除 */
if (!tuple_deleted)
{
/* epqslot通常为空,但是若删除时发现另一个事务同时更新了同一行,那么重新检查 */
if (TupIsNull(epqslot))
return NULL;
else
{
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
goto lreplace;
}
}
/* 保存update操作的转换捕获映射,因为在插入之后需要恢复回来 */
if (mtstate->mt_transition_capture)
saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
/* 获取resultRel在mtstate->resultRelInfo中的位置,执行元组转换 */
map_index = resultRelInfo - mtstate->resultRelInfo;
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
if (tupconv_map != NULL)
slot = execute_attr_map_slot(tupconv_map->attrMap,
slot,
mtstate->mt_root_tuple_slot);
/* 准备元组路由 */
Assert(mtstate->rootResultRelInfo != NULL);
slot = ExecPrepareTupleRouting(mtstate, estate, proute,
mtstate->rootResultRelInfo, slot);
/* 执行插入 */
ret_slot = ExecInsert(mtstate, slot, planSlot,
estate, canSetTag);
/* 还原一些临时更改 */
estate->es_result_relation_info = resultRelInfo;
if (mtstate->mt_transition_capture)
{
mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
mtstate->mt_transition_capture->tcs_map = saved_tcs_map;
}
/* 向其它分区插入的update操作完成 */
return ret_slot;
}
/* 检查元组分区约束以外的其它约束条件 */
if (resultRelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* 执行update操作 */
result = table_tuple_update(resultRelationDesc, tupleid, slot,
estate->es_output_cid,
estate->es_snapshot,
estate->es_crosscheck_snapshot,
true /* wait for commit */ ,
&tmfd, &lockmode, &update_indexes);
switch (result)
{
/* 目标元组已经被当前命令或当前事务的后续命令更新或删除了 */
case TM_SelfModified:
if (tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
/* Else, already updated by self; nothing to do */
return NULL;
/* 正常更新 */
case TM_Ok:
break;
/* 目标元组已经被其它事务更新 */
case TM_Updated:
{
/* 检查事务隔离级别是否大于等于可重复读 */
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
/* 获取或创建EPQ测试要用的slot,然后锁定最新版本的元组,并将其放入inputslot */
inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,
resultRelInfo->ri_RangeTableIndex);
result = table_tuple_lock(resultRelationDesc, tupleid,
estate->es_snapshot,
inputslot, estate->es_output_cid,
lockmode, LockWaitBlock,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
&tmfd);
switch (result)
{
case TM_Ok:
Assert(tmfd.traversed);
/* 判断是否应该在读已提交的规则下处理已更新的元组,如果为否的话,返回NULL */
epqslot = EvalPlanQual(epqstate,
resultRelationDesc,
resultRelInfo->ri_RangeTableIndex,
inputslot);
if (TupIsNull(epqslot))
return NULL;
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
goto lreplace;
/* 元组已被删除 */
case TM_Deleted:
return NULL;
/* 若已经被自身这条命令更新,则跳过更新操作,否则报错 */
case TM_SelfModified:
if (tmfd.cmax != estate->es_output_cid)
ereport(ERROR,
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
return NULL;
/* 其它出错情况 */
default:
elog(ERROR, "unexpected table_tuple_lock status: %u",
result);
return NULL;
}
}
break;
/* 目标元组已被其它事务删除 */
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent delete")));
return NULL;
/* 其它出错情况 */
default:
elog(ERROR, "unrecognized table_tuple_update status: %u",
result);
return NULL;
}
/* 有需要的话,为元组插入索引 */
if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL);
}
/* 有需要的话,统计处理的元组数目 */
if (canSetTag)
(estate->es_processed)++;
/* AFTER ROW UPDATE 触发器 */
ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,
recheckIndexes,
mtstate->operation == CMD_INSERT ?
mtstate->mt_oc_transition_capture :
mtstate->mt_transition_capture);
list_free(recheckIndexes);
/* 检查父视图中的所有WITH CHECK OPTION约束 */
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
/* 处理RETURNING */
if (resultRelInfo->ri_projectReturning)
return ExecProcessReturning(resultRelInfo, slot, planSlot);
return NULL;
以上是 PostgreSQL源码学习更新数据#3 的全部内容, 来源链接: utcz.com/z/534161.html