服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - PostgreSQL - Postgres中UPDATE更新语句源码分析

Postgres中UPDATE更新语句源码分析

2022-03-05 21:00chirpyli PostgreSQL

这篇文章主要给大家介绍了关于Postgres中UPDATE更新语句源码分析的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

PG中UPDATE源码分析

本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本。

整体流程分析

update dtea set id = 1;这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程。

SQL流程如下:

  • parser(语法解析,生成语法解析树UpdateStmt,检查是否有语法层面的错误)

  • analyze(语义分析, UpdateStmt转为查询树Query, 会查系统表检查有无语义方面的错误)

  • rewrite(规则重写, 根据规则rules重写查询树Query, 根据事先存储在系统表中的规则进行重写,没有的话不进行重写,另外加一句,视图的实现是根据规则系统实现的,也是在这里需要进行处理)

  • optimizer(优化器:逻辑优化、物理优化、生成执行计划, 由Query生成对应的执行计划PlannedStmt, 基于代价的优化器,由最佳路径Path生成最佳执行计划Plan)

  • executor(执行器,会有各种算子,依据执行计划进行处理,火山模型,一次一元组)

  • storage(存储引擎)。中间还有事务处理。事务处理部分的代码这里不再进行分析,免得将问题复杂化。存储引擎那部分也不进行分析,重点关注解析、优化、执行这三部分。

对应的代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
exec_simple_query(const char *query_string)
// ------- 解析器部分--------------
--> pg_parse_query(query_string);    //生成语法解析树
--> pg_analyze_and_rewrite(parsetree, query_string,NULL, 0, NULL);   // 生成查询树Query
    --> parse_analyze(parsetree, query_string, paramTypes, numParams,queryEnv); // 语义分析
    --> pg_rewrite_query(query);    // 规则重写
 
// --------优化器----------
--> pg_plan_queries()
 
//-------- 执行器----------
--> PortalStart(portal, NULL, 0, InvalidSnapshot);
--> PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);    // 执行器执行
--> PortalDrop(portal, false);

解析部分——生成语法解析树UpdateStmt

关键数据结构:UpdateStmtRangeVarResTarget:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* Update Statement  */
typedef struct UpdateStmt
{
 NodeTag  type;
 RangeVar   *relation;  /* relation to update */
 List    *targetList;  /* the target list (of ResTarget) */ // 对应语句中的set id = 0;信息在这里
 Node    *whereClause; /* qualifications */
 List    *fromClause;  /* optional from clause for more tables */
 List    *returningList; /* list of expressions to return */
 WithClause *withClause;  /* WITH clause */
} UpdateStmt;
 
// dtea 表
typedef struct RangeVar
{
 NodeTag  type;
 char    *catalogname; /* the catalog (databasenameor NULL */
 char    *schemaname;  /* the schema nameor NULL */
 char    *relname;  /* the relation/sequence name */
 bool  inh;   /* expand rel by inheritance? recursively act
         on children? */
 char  relpersistence; /* see RELPERSISTENCE_* in pg_class.h */
 Alias    *alias;   /* table alias & optional column aliases */
 int   location;  /* token location, or -1 if unknown */
} RangeVar;
 
// set id = 0;   经transformTargetList() -> transformTargetEntry,会转为TargetEntry
typedef struct ResTarget
{
 NodeTag  type;
 char    *name;   /* column name or NULL */     // id column
 List    *indirection; /* subscripts, field names, and '*'or NIL */
 Node    *val;   /* the value expression to compute or assign */  // = 1表达式节点存在这里
 int   location;  /* token location, or -1 if unknown */
} ResTarget;

用户输入的update语句update dtea set id = 1由字符串会转为可由数据库理解的内部数据结构语法解析树UpdateStmt。执行逻辑在pg_parse_query(query_string);中,需要理解flex与bison。

gram.y中Update语法的定义:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/*****************************************************************************
 *  QUERY:
 *    UpdateStmt (UPDATE)
 *****************************************************************************/
//结合这条语句分析 update dtea set id = 0;
UpdateStmt: opt_with_clause UPDATE relation_expr_opt_alias
   SET set_clause_list from_clause where_or_current_clause returning_clause
    {
     UpdateStmt *n = makeNode(UpdateStmt);
     n->relation = $3;
     n->targetList = $5;
     n->fromClause = $6;
     n->whereClause = $7;
     n->returningList = $8;
     n->withClause = $1;
     $$ = (Node *)n;
    }
  ;
 
set_clause_list:
   set_clause       { $$ = $1; }
   | set_clause_list ',' set_clause { $$ = list_concat($1,$3); }
  ;
// 对应的是 set id = 0
set_clause:   // id     =   0
   set_target '=' a_expr
    {
     $1->val = (Node *) $3;
     $$ = list_make1($1);
    }
   '(' set_target_list ')' '=' a_expr
    {
     int ncolumns = list_length($2);
     int i = 1;
     ListCell *col_cell;
 
     foreach(col_cell, $2) /* Create a MultiAssignRef source for each target */
     {
      ResTarget *res_col = (ResTarget *) lfirst(col_cell);
      MultiAssignRef *r = makeNode(MultiAssignRef);
 
      r->source = (Node *) $5;
      r->colno = i;
      r->ncolumns = ncolumns;
      res_col->val = (Node *) r;
      i++;
     }
 
     $$ = $2;
    }
  ;
 
set_target:
   ColId opt_indirection
    {
     $$ = makeNode(ResTarget);
     $$->name = $1;
     $$->indirection = check_indirection($2, yyscanner);
     $$->val = NULL; /* upper production sets this */
     $$->location = @1;
    }
  ;
 
set_target_list:
   set_target        { $$ = list_make1($1); }
   | set_target_list ',' set_target  { $$ = lappend($1,$3); }
  ;

解析部分——生成查询树Query

生成了UpdateStmt后, 会经由parse_analyze语义分析,生成查询树Query,以供后续优化器生成执行计划。主要代码在src/backent/parser/analyze.c

analyze.c : transform the raw parse tree into a query tree

?
1
2
3
4
5
6
parse_analyze()
--> transformTopLevelStmt(pstate, parseTree);
    --> transformOptionalSelectInto(pstate, parseTree->stmt);
        --> transformStmt(pstate, parseTree);
            // transforms an update statement
            --> transformUpdateStmt(pstate, (UpdateStmt *) parseTree);  // 实际由UpdateStmt转为Query的处理函数

具体的我们看一下transformUpdateStmt函数实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/* transformUpdateStmt -  transforms an update statement  */
static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) {
 Query    *qry = makeNode(Query);
 ParseNamespaceItem *nsitem;
 Node    *qual;
 
 qry->commandType = CMD_UPDATE;
 pstate->p_is_insert = false;
 
 /* process the WITH clause independently of all else */
 if (stmt->withClause) {
  qry->hasRecursive = stmt->withClause->recursive;
  qry->cteList = transformWithClause(pstate, stmt->withClause);
  qry->hasModifyingCTE = pstate->p_hasModifyingCTE;
 }
 
 qry->resultRelation = setTargetTable(pstate, stmt->relation, stmt->relation->inh, true, ACL_UPDATE);
 nsitem = pstate->p_target_nsitem;
 
 /* subqueries in FROM cannot access the result relation */
 nsitem->p_lateral_only = true;
 nsitem->p_lateral_ok = false;
 
 /* the FROM clause is non-standard SQL syntax. We used to be able to do this with REPLACE in POSTQUEL so we keep the feature.*/
 transformFromClause(pstate, stmt->fromClause);
 
 /* remaining clauses can reference the result relation normally */
 nsitem->p_lateral_only = false;
 nsitem->p_lateral_ok = true;
 
 qual = transformWhereClause(pstate, stmt->whereClause,EXPR_KIND_WHERE, "WHERE");
 qry->returningList = transformReturningList(pstate, stmt->returningList);
 
 /* Now we are done with SELECT-like processing, and can get on with
  * transforming the target list to match the UPDATE target columns.*/
 qry->targetList = transformUpdateTargetList(pstate, stmt->targetList);  // 处理SQL语句中的 set id =1 
 
 qry->rtable = pstate->p_rtable;
 qry->jointree = makeFromExpr(pstate->p_joinlist, qual);
 qry->hasTargetSRFs = pstate->p_hasTargetSRFs;
 qry->hasSubLinks = pstate->p_hasSubLinks;
 
 assign_query_collations(pstate, qry);
 
 return qry;
}

这里面要重点关注一下transformTargetList,会将抽象语法树中的ResTarget转为查询器的TargetEntry

?
1
2
3
4
5
6
7
8
9
10
11
typedef struct TargetEntry
{
 Expr  xpr;
 Expr    *expr;   /* expression to evaluate */
 AttrNumber resno;   /* attribute number (see notes above) */
 char    *resname;  /* name of the column (could be NULL) */
 Index  ressortgroupref; /* nonzero if referenced by a sort/group clause */
 Oid   resorigtbl;  /* OID of column's source table */
 AttrNumber resorigcol;  /* column's number in source table */
 bool  resjunk;  /* set to true to eliminate the attribute from final target list */
} TargetEntry;

对于其内部处理可参考源码src/backend/parser中的相关处理,这里不再细述。需要重点阅读一下README,PG源码中所有的README都是非常好的资料,一定要认真读。

优化器——生成执行计划

这块的内容很多,主要的逻辑是先进行逻辑优化,比如子查询、子链接、常量表达式、选择下推等等的处理,因为我们要分析的这条语句十分简单,所以逻辑优化的这部分都没有涉及到。物理优化,涉及到选择率,代价估计,索引扫描还是顺序扫描,选择那种连接方式,应用动态规划呢还是基因算法,选择nestloop-join、merge-join还是hash-join等。因为我们这个表没有建索引,更新单表也不涉及到多表连接,所以物理优化这块涉及的也不多。路径生成,生成最佳路径,再由最佳路径生成执行计划。

在路径生成这块,最基础的是对表的扫描方式,比如顺序扫描、索引扫描,再往上是连接方式,采用那种连接方式,再往上是比如排序、Limit等路径......,由底向上生成路径。我们要分析的语句很简单,没有其他处理,就顺序扫描再更新就可以了。

这里先不考虑并行执行计划。我们先看一下其执行计划结果:

?
1
2
3
4
5
6
postgres@postgres=# explain update dtea set id = 0;
                          QUERY PLAN                          
--------------------------------------------------------------
 Update on dtea  (cost=0.00..19.00 rows=900 width=68)
   ->  Seq Scan on dtea  (cost=0.00..19.00 rows=900 width=68)
(2 rows)

下面我们分析一下其执行计划的生成流程:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 由查询树Query--> Path --> Plan (PlannedStmt)
pg_plan_queries()
--> pg_plan_query()
    --> planner()
        --> standard_planner(Query *parse, const char *query_string, int cursorOptions,ParamListInfo boundParams)
            // 由Query---> PlannerInfo
            --> subquery_planner(glob, parse, NULL,false, tuple_fraction);  // 涉及到很多逻辑优化的内容,很多不列出
                --> pull_up_sublinks(root);
                --> pull_up_subqueries(root);   // 这里只列出几个重要的逻辑优化内容,其他的不再列出......
                // 如果是update/delete分区表继承表则走inheritance_planner(),其他情况走grouping_planner()
                --> inheritance_planner()   // update/delete分区表继承表的情况
                    --> grouping_planner()
                --> grouping_planner() // 非分区表、继承表的情况
                    --> preprocess_targetlist(root); // update虽然只更新一列,但是插入一条新元组的时候,需要知道其他列信息.
                        --> rewriteTargetListUD(parse, target_rte, target_relation);
                        --> expand_targetlist()
                    --> query_planner(root, standard_qp_callback, &qp_extra);   // 重要
                        --> add_base_rels_to_query()
                        --> deconstruct_jointree(root);
                        --> add_other_rels_to_query(root); // 展开分区表到PlannerInfo中的相关字段中 
                            --> expand_inherited_rtentry()  
        --> expand_planner_arrays(root, num_live_parts);
                        --> make_one_rel(root, joinlist);   
                            --> set_base_rel_sizes(root); 
                                --> set_rel_size();
         --> set_append_rel_size(root, rel, rti, rte); // 如果是分区表或者继承走这里,否则走下面
          --> set_rel_size(root, childrel, childRTindex, childRTE); // 处理子分区表
           --> set_plain_rel_size(root, rel, rte);
                                    --> set_plain_rel_size()   // 如果不是分区表或者继承
                                        --> set_baserel_size_estimates()
                            --> set_base_rel_pathlists(root);
        --> set_rel_pathlist(root, rel, rti, root->simple_rte_array[rti]);
         --> set_append_rel_pathlist(root, rel, rti, rte); // 生成各分区表的访问路径
                            --> make_rel_from_joinlist(root, joinlist);// 动态规划还是基因规划
        --> standard_join_search() // 动态规划
        --> geqo() // 基因规划与动态规划二选一
                    --> apply_scanjoin_target_to_paths()
                    --> create_modifytable_path()
            // 由PlannerInfo---> RelOptInfo 
            --> fetch_upper_rel(root, UPPERREL_FINAL, NULL);
            // 由RelOptInfo---> Path
            --> get_cheapest_fractional_path(final_rel, tuple_fraction);
            // 由 PlannerInfo+Path  ---> Plan
            --> create_plan(root, best_path);
            // 后续处理,由Plan ---> PlannedStmt

核心数据结构:PlannedStmt、PlannerInfo、RelOptInfo(存储访问路径及其代价)、Path

Path:所有的路径都继承自Path,所以这个比较重要。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
typedef struct Path
{
 NodeTag  type;
 NodeTag  pathtype;  /* tag identifying scan/join method */
 
 RelOptInfo *parent;   /* the relation this path can build */
 PathTarget *pathtarget;  /* list of Vars/Exprs, cost, width */
 
 ParamPathInfo *param_info; /* parameterization info, or NULL if none */
 
 bool  parallel_aware; /* engage parallel-aware logic? */
 bool  parallel_safe; /* OK to use as part of parallel plan? */
 int   parallel_workers; /* desired # of workers; 0 = not parallel */
 
 /* estimated size/costs for path (see costsize.c for more info) */
 double  rows;   /* estimated number of result tuples */
 Cost  startup_cost; /* cost expended before fetching any tuples */
 Cost  total_cost;  /* total cost (assuming all tuples fetched) */
 
 List    *pathkeys;  /* sort ordering of path's output */
 /* pathkeys is a List of PathKey nodes; see above */
} Path;
 
/* ModifyTablePath represents performing INSERT/UPDATE/DELETE modifications
 * We represent most things that will be in the ModifyTable plan node
 * literally, except we have child Path(s) not Plan(s).  But analysis of the
 * OnConflictExpr is deferred to createplan.c, as is collection of FDW data. */
typedef struct ModifyTablePath
{
 Path  path;   // 可以看到ModifyTablePath继承自Path
 CmdType  operation;  /* INSERTUPDATEor DELETE */
 bool  canSetTag;  /* do we set the command tag/es_processed? */
 Index  nominalRelation; /* Parent RT index for use of EXPLAIN */
 Index  rootRelation; /* Root RT index, if target is partitioned */
 bool  partColsUpdated; /* some part key in hierarchy updated */
 List    *resultRelations; /* integer list of RT indexes */
 List    *subpaths;  /* Path(s) producing source data */
 List    *subroots;  /* per-target-table PlannerInfos */
 List    *withCheckOptionLists; /* per-target-table WCO lists */
 List    *returningLists; /* per-target-table RETURNING tlists */
 List    *rowMarks;  /* PlanRowMarks (non-locking only) */
 OnConflictExpr *onconflict; /* ON CONFLICT clause, or NULL */
 int   epqParam;  /* ID of Param for EvalPlanQual re-eval */
} ModifyTablePath;

生成update执行路径,最终都是要生成ModifyTablePath,本例中路径生成过程:Path-->ProjectionPath-->ModifyTablePath,也就是先顺序扫描表,再修改表。后面由路径生成执行计划。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/* create_modifytable_path
 *   Creates a pathnode that represents performing INSERT/UPDATE/DELETE mods
 *
 'rel' is the parent relation associated with the result
 'resultRelations' is an integer list of actual RT indexes of target rel(s)
 'subpaths' is a list of Path(s) producing source data (one per rel)
 'subroots' is a list of PlannerInfo structs (one per rel)*/
ModifyTablePath *create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
      CmdType operation, bool canSetTag,
      Index nominalRelation, Index rootRelation,
      bool partColsUpdated,
      List *resultRelations, List *subpaths,
      List *subroots,
      List *withCheckOptionLists, List *returningLists,
      List *rowMarks, OnConflictExpr *onconflict,
      int epqParam)
{
 ModifyTablePath *pathnode = makeNode(ModifyTablePath);
 double  total_size;
 ListCell   *lc;
 
 Assert(list_length(resultRelations) == list_length(subpaths));
 Assert(list_length(resultRelations) == list_length(subroots));
 Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists));
 Assert(returningLists == NIL || list_length(resultRelations) == list_length(returningLists));
 
 pathnode->path.pathtype = T_ModifyTable;
 pathnode->path.parent = rel;
 
 pathnode->path.pathtarget = rel->reltarget; /* pathtarget is not interesting, just make it minimally valid */
 /* For now, assume we are above any joins, so no parameterization */
 pathnode->path.param_info = NULL;
 pathnode->path.parallel_aware = false;
 pathnode->path.parallel_safe = false;
 pathnode->path.parallel_workers = 0;
 pathnode->path.pathkeys = NIL;
 
 /** Compute cost & rowcount as sum of subpath costs & rowcounts.
  *
  * Currently, we don't charge anything extra for the actual table
  * modification work, nor for the WITH CHECK OPTIONS or RETURNING
  * expressions if any.  It would only be window dressing, since
  * ModifyTable is always a top-level node and there is no way for the
  * costs to change any higher-level planning choices.  But we might want
  * to make it look better sometime.*/
 pathnode->path.startup_cost = 0;
 pathnode->path.total_cost = 0;
 pathnode->path.rows = 0;
 total_size = 0;
 foreach(lc, subpaths)
 {
  Path    *subpath = (Path *) lfirst(lc);
 
  if (lc == list_head(subpaths)) /* first node? */
   pathnode->path.startup_cost = subpath->startup_cost;
  pathnode->path.total_cost += subpath->total_cost;
  pathnode->path.rows += subpath->rows;
  total_size += subpath->pathtarget->width * subpath->rows;
 }
 
 /* Set width to the average width of the subpath outputs.  XXX this is
  * totally wrong: we should report zero if no RETURNING, else an average
  * of the RETURNING tlist widths.  But it's what happened historically,
  and improving it is a task for another day.*/
 if (pathnode->path.rows > 0)
  total_size /= pathnode->path.rows;
 pathnode->path.pathtarget->width = rint(total_size);
 
 pathnode->operation = operation;
 pathnode->canSetTag = canSetTag;
 pathnode->nominalRelation = nominalRelation;
 pathnode->rootRelation = rootRelation;
 pathnode->partColsUpdated = partColsUpdated;
 pathnode->resultRelations = resultRelations;
 pathnode->subpaths = subpaths;
 pathnode->subroots = subroots;
 pathnode->withCheckOptionLists = withCheckOptionLists;
 pathnode->returningLists = returningLists;
 pathnode->rowMarks = rowMarks;
 pathnode->onconflict = onconflict;
 pathnode->epqParam = epqParam;
 
 return pathnode;
}

现在我们生成了最优的update路径,需要由路径生成执行计划:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
Plan *create_plan(PlannerInfo *root, Path *best_path)
{
 Plan    *plan;
 Assert(root->plan_params == NIL); /* plan_params should not be in use in current query level */
 
 /* Initialize this module's workspace in PlannerInfo */
 root->curOuterRels = NULL;
 root->curOuterParams = NIL;
 
 /* Recursively process the path tree, demanding the correct tlist result */
 plan = create_plan_recurse(root, best_path, CP_EXACT_TLIST); // 实际实现是在这里
 
 /** Make sure the topmost plan node's targetlist exposes the original
  column names and other decorative info.  Targetlists generated within
  * the planner don't bother with that stuff, but we must have it on the
  * top-level tlist seen at execution time.  However, ModifyTable plan
  * nodes don't have a tlist matching the querytree targetlist.*/
 if (!IsA(plan, ModifyTable))
  apply_tlist_labeling(plan->targetlist, root->processed_tlist);
 
 /** Attach any initPlans created in this query level to the topmost plan
  * node.  (In principle the initplans could go in any plan node at or
  * above where they're referenced, but there seems no reason to put them
  * any lower than the topmost node for the query level.  Also, see
  * comments for SS_finalize_plan before you try to change this.)*/
 SS_attach_initplans(root, plan);
 
 /* Check we successfully assigned all NestLoopParams to plan nodes */
 if (root->curOuterParams != NIL)
  elog(ERROR, "failed to assign all NestLoopParams to plan nodes");
 
 /** Reset plan_params to ensure param IDs used for nestloop params are not re-used later*/
 root->plan_params = NIL;
 
 return plan;
}
 
// 由最佳路径生成最佳执行计划
static ModifyTable *create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path)
{
 ModifyTable *plan;
 List    *subplans = NIL;
 ListCell   *subpaths,
      *subroots;
 
 /* Build the plan for each input path */
 forboth(subpaths, best_path->subpaths, subroots, best_path->subroots)
 {
  Path    *subpath = (Path *) lfirst(subpaths);
  PlannerInfo *subroot = (PlannerInfo *) lfirst(subroots);
  Plan    *subplan;
 
  /* In an inherited UPDATE/DELETE, reference the per-child modified
   * subroot while creating Plans from Paths for the child rel.  This is
   * a kluge, but otherwise it's too hard to ensure that Plan creation
   * functions (particularly in FDWs) don't depend on the contents of
   * "root" matching what they saw at Path creation time.  The main
   * downside is that creation functions for Plans that might appear
   * below a ModifyTable cannot expect to modify the contents of "root"
   * and have it "stick" for subsequent processing such as setrefs.c.
   * That'not great, but it seems better than the alternative.*/
  subplan = create_plan_recurse(subroot, subpath, CP_EXACT_TLIST);
 
  /* Transfer resname/resjunk labeling, too, to keep executor happy */
  apply_tlist_labeling(subplan->targetlist, subroot->processed_tlist);
 
  subplans = lappend(subplans, subplan);
 }
 
 plan = make_modifytable(root,best_path->operation,best_path->canSetTag,
      best_path->nominalRelation,best_path->rootRelation,
      best_path->partColsUpdated,best_path->resultRelations,
      subplans,best_path->subroots,best_path->withCheckOptionLists,
      best_path->returningLists,best_path->rowMarks,
      best_path->onconflict,best_path->epqParam);
 
 copy_generic_path_info(&plan->plan, &best_path->path);
 
 return plan;
}

最终的执行计划是ModifyTable:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* ----------------
 *  ModifyTable node -
 *  Apply rows produced by subplan(s) to result table(s),
 *  by inserting, updating, or deleting.
 *
 * If the originally named target table is a partitioned table, both
 * nominalRelation and rootRelation contain the RT index of the partition
 * root, which is not otherwise mentioned in the plan.  Otherwise rootRelation
 is zero.  However, nominalRelation will always be setas it's the rel that
 * EXPLAIN should claim is the INSERT/UPDATE/DELETE target.
 *
 * Note that rowMarks and epqParam are presumed to be valid for all the
 * subplan(s); they can't contain any info that varies across subplans.
 ----------------*/
typedef struct ModifyTable
{
 Plan  plan;
 CmdType  operation;  /* INSERTUPDATEor DELETE */
 bool  canSetTag;  /* do we set the command tag/es_processed? */
 Index  nominalRelation; /* Parent RT index for use of EXPLAIN */
 Index  rootRelation; /* Root RT index, if target is partitioned */
 bool  partColsUpdated; /* some part key in hierarchy updated */
 List    *resultRelations; /* integer list of RT indexes */
 int   resultRelIndex; /* index of first resultRel in plan's list */
 int   rootResultRelIndex; /* index of the partitioned table root */
 List    *plans;   /* plan(s) producing source data */
 List    *withCheckOptionLists; /* per-target-table WCO lists */
 List    *returningLists; /* per-target-table RETURNING tlists */
 List    *fdwPrivLists; /* per-target-table FDW private data lists */
 Bitmapset  *fdwDirectModifyPlans; /* indices of FDW DM plans */
 List    *rowMarks;  /* PlanRowMarks (non-locking only) */
 int   epqParam;  /* ID of Param for EvalPlanQual re-eval */
 OnConflictAction onConflictAction; /* ON CONFLICT action */
 List    *arbiterIndexes; /* List of ON CONFLICT arbiter index OIDs  */
 List    *onConflictSet; /* SET for INSERT ON CONFLICT DO UPDATE */
 Node    *onConflictWhere; /* WHERE for ON CONFLICT UPDATE */
 Index  exclRelRTI;  /* RTI of the EXCLUDED pseudo relation */
 List    *exclRelTlist; /* tlist of the EXCLUDED pseudo relation */
} ModifyTable;

执行器

根据上面的执行计划,去执行。主要是各种算子的实现,其中要理解执行器的运行原理,主要是火山模型,一次一元组。我们看一下其调用过程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CreatePortal(""truetrue);
PortalDefineQuery(portal,NULL,query_string,commandTag,plantree_list,NULL);
PortalStart(portal, NULL, 0, InvalidSnapshot);
PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);
--> PortalRunMulti()
 --> ProcessQuery()
  --> ExecutorStart(queryDesc, 0);
   --> standard_ExecutorStart()
    --> estate = CreateExecutorState(); // 创建EState
    --> estate->es_output_cid = GetCurrentCommandId(true); // 获得cid,后面更新的时候要用
    --> InitPlan(queryDesc, eflags);
     --> ExecInitNode(plan, estate, eflags);  
      --> ExecInitModifyTable() // 初始化ModifyTableState
  --> ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
   --> standard_ExecutorRun()
    --> ExecutePlan()
     --> ExecProcNode(planstate); // 一次一元组 火山模型
      --> node->ExecProcNode(node);
       --> ExecProcNodeFirst(PlanState *node)
        --> node->ExecProcNode(node);
         --> ExecModifyTable(PlanState *pstate)
          --> ExecUpdate()
           --> table_tuple_update(Relation rel, ......)
            --> rel->rd_tableam->tuple_update()
             --> heapam_tuple_update(Relation relation, ......)
              --> heap_update(relation, otid, tuple, cid, ......)
 
  --> ExecutorFinish(queryDesc);
  --> ExecutorEnd(queryDesc);
PortalDrop(portal, false);

关键数据结构:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ModifyTableState information
typedef struct ModifyTableState
{
 PlanState ps;    /* its first field is NodeTag */
 CmdType  operation;  /* INSERTUPDATEor DELETE */
 bool  canSetTag;  /* do we set the command tag/es_processed? */
 bool  mt_done;  /* are we done? */
 PlanState **mt_plans;  /* subplans (one per target rel) */
 int   mt_nplans;  /* number of plans in the array */
 int   mt_whichplan; /* which one is being executed (0..n-1) */
 TupleTableSlot **mt_scans; /* input tuple corresponding to underlying
         * plans */
 ResultRelInfo *resultRelInfo; /* per-subplan target relations */
 ResultRelInfo *rootResultRelInfo; /* root target relation (partitioned
           table root) */
 List   **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */
 EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */
 bool  fireBSTriggers; /* do we need to fire stmt triggers? */
 
 /* Slot for storing tuples in the root partitioned table's rowtype during
  * an UPDATE of a partitioned table. */
 TupleTableSlot *mt_root_tuple_slot;
 
 struct PartitionTupleRouting *mt_partition_tuple_routing; /* Tuple-routing support info */
 
 struct TransitionCaptureState *mt_transition_capture; /* controls transition table population for specified operation */
 
 /* controls transition table population for INSERT...ON CONFLICT UPDATE */
 struct TransitionCaptureState *mt_oc_transition_capture;
 
 /* Per plan map for tuple conversion from child to root */
 TupleConversionMap **mt_per_subplan_tupconv_maps;
} ModifyTableState;

核心执行算子实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/* ----------------------------------------------------------------
 *    ExecModifyTable
 *
 *  Perform table modifications as required, and return RETURNING results
 *  if needed.
 ---------------------------------------------------------------- */
static TupleTableSlot *ExecModifyTable(PlanState *pstate)
{
 ModifyTableState *node = castNode(ModifyTableState, pstate);
 PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
 EState    *estate = node->ps.state;
 CmdType  operation = node->operation;
 ResultRelInfo *saved_resultRelInfo;
 ResultRelInfo *resultRelInfo;
 PlanState  *subplanstate;
 JunkFilter *junkfilter;
 TupleTableSlot *slot;
 TupleTableSlot *planSlot;
 ItemPointer tupleid;
 ItemPointerData tuple_ctid;
 HeapTupleData oldtupdata;
 HeapTuple oldtuple;
 
 CHECK_FOR_INTERRUPTS();
 
 /* This should NOT get called during EvalPlanQual; we should have passed a
  * subplan tree to EvalPlanQual, instead.  Use a runtime test not just
  * Assert because this condition is easy to miss in testing. */
 if (estate->es_epq_active != NULL)
  elog(ERROR, "ModifyTable should not be called during EvalPlanQual");
 
 /* If we've already completed processing, don't try to do more.  We need
  * this test because ExecPostprocessPlan might call us an extra timeand
  * our subplan's nodes aren't necessarily robust against being called
  * extra times.*/
 if (node->mt_done)
  return NULL;
 
 /* On first call, fire BEFORE STATEMENT triggers before proceeding.*/
 if (node->fireBSTriggers)
 {
  fireBSTriggers(node);
  node->fireBSTriggers = false;
 }
 
 /* Preload local variables */
 resultRelInfo = node->resultRelInfo + node->mt_whichplan;
 subplanstate = node->mt_plans[node->mt_whichplan];
 junkfilter = resultRelInfo->ri_junkFilter;
 
 /* es_result_relation_info must point to the currently active result relation while we are within this ModifyTable node.  
  * Even though ModifyTable nodes can't be nested statically, they can be nested
  * dynamically (since our subplan could include a reference to a modifying
  * CTE).  So we have to save and restore the caller's value.*/
 saved_resultRelInfo = estate->es_result_relation_info;
 estate->es_result_relation_info = resultRelInfo;
 
 /* Fetch rows from subplan(s), and execute the required table modification for each row.*/
 for (;;)
 {
  /* Reset the per-output-tuple exprcontext.  This is needed because
   * triggers expect to use that context as workspace.  It's a bit ugly
   * to do this below the top level of the plan, however.  We might need to rethink this later.*/
  ResetPerTupleExprContext(estate);
 
  /* Reset per-tuple memory context used for processing on conflict and
   * returning clauses, to free any expression evaluation storage allocated in the previous cycle. */
  if (pstate->ps_ExprContext)
   ResetExprContext(pstate->ps_ExprContext);
 
  planSlot = ExecProcNode(subplanstate);
  if (TupIsNull(planSlot))
  {
   /* advance to next subplan if any */
   node->mt_whichplan++; // 分区表的update,每个分区分布对应一个subplan,当执行完一个分区再执行下一个分区
   if (node->mt_whichplan < node->mt_nplans)
   {
    resultRelInfo++;
    subplanstate = node->mt_plans[node->mt_whichplan];
    junkfilter = resultRelInfo->ri_junkFilter;
    estate->es_result_relation_info = resultRelInfo;
    EvalPlanQualSetPlan(&node->mt_epqstate, subplanstate->plan, node->mt_arowmarks[node->mt_whichplan]);
    /* Prepare to convert transition tuples from this child. */
    if (node->mt_transition_capture != NULL) {
     node->mt_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan);
    }
    if (node->mt_oc_transition_capture != NULL) {
     node->mt_oc_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan);
    }
    continue;
   }
   else
    break;
  }
 
  /* Ensure input tuple is the right format for the target relation.*/
  if (node->mt_scans[node->mt_whichplan]->tts_ops != planSlot->tts_ops) {
   ExecCopySlot(node->mt_scans[node->mt_whichplan], planSlot);
   planSlot = node->mt_scans[node->mt_whichplan];
  }
 
  /* If resultRelInfo->ri_usesFdwDirectModify is true, all we need to do here is compute the RETURNING expressions.*/
  if (resultRelInfo->ri_usesFdwDirectModify)
  {
   Assert(resultRelInfo->ri_projectReturning);
   slot = ExecProcessReturning(resultRelInfo->ri_projectReturning, RelationGetRelid(resultRelInfo->ri_RelationDesc), NULL, planSlot);
 
   estate->es_result_relation_info = saved_resultRelInfo;
   return slot;
  }
 
  EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
  slot = planSlot;
 
  tupleid = NULL;
  oldtuple = NULL;
  if (junkfilter != NULL)
  {
   /* extract the 'ctid' or 'wholerow' junk attribute.*/
   if (operation == CMD_UPDATE || operation == CMD_DELETE)
   {
    char  relkind;
    Datum  datum;
    bool  isNull;
 
    relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
    if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW)
    {
     datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
     /* shouldn't ever get a null result... */
     if (isNull)
      elog(ERROR, "ctid is NULL");
 
     tupleid = (ItemPointer) DatumGetPointer(datum);
     tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
     tupleid = &tuple_ctid;
    }
    /* Use the wholerow attribute, when available, to reconstruct the old relation tuple.*/
    else if (AttributeNumberIsValid(junkfilter->jf_junkAttNo))
    {
     datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
     /* shouldn't ever get a null result... */
     if (isNull)
      elog(ERROR, "wholerow is NULL");
 
     oldtupdata.t_data = DatumGetHeapTupleHeader(datum);
     oldtupdata.t_len = HeapTupleHeaderGetDatumLength(oldtupdata.t_data);
     ItemPointerSetInvalid(&(oldtupdata.t_self));
     /* Historically, view triggers see invalid t_tableOid. */
     oldtupdata.t_tableOid = (relkind == RELKIND_VIEW) ? InvalidOid : RelationGetRelid(resultRelInfo->ri_RelationDesc);
     oldtuple = &oldtupdata;
    }
    else
     Assert(relkind == RELKIND_FOREIGN_TABLE);
   }
 
   /* apply the junkfilter if needed. */
   if (operation != CMD_DELETE)
    slot = ExecFilterJunk(junkfilter, slot);
  }
 
  switch (operation)
  {
   case CMD_INSERT:
    if (proute)    /* Prepare for tuple routing if needed. */
     slot = ExecPrepareTupleRouting(node, estate, proute, resultRelInfo, slot);
    slot = ExecInsert(node, slot, planSlot, NULL, estate->es_result_relation_info, estate, node->canSetTag);
    if (proute)    /* Revert ExecPrepareTupleRouting's state change. */
     estate->es_result_relation_info = resultRelInfo;
    break;
   case CMD_UPDATE:
    slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot,
          &node->mt_epqstate, estate, node->canSetTag);
    break;
   case CMD_DELETE:
    slot = ExecDelete(node, tupleid, oldtuple, planSlot,
          &node->mt_epqstate, estate,
          true, node->canSetTag, false /* changingPart */ , NULL, NULL);
    break;
   default:
    elog(ERROR, "unknown operation");
    break;
  }
 
  /* If we got a RETURNING result, return it to caller.  We'll continue the work on next call.*/
  if (slot) {
   estate->es_result_relation_info = saved_resultRelInfo;
   return slot;
  }
 }
 
 estate->es_result_relation_info = saved_resultRelInfo; /* Restore es_result_relation_info before exiting */
 fireASTriggers(node); /* We're done, but fire AFTER STATEMENT triggers before exiting.*/
 
 node->mt_done = true;
 
 return NULL;
}

我们看一下具体执行Update的实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
```c++
/* ----------------------------------------------------------------
 *  ExecUpdate
 *
 *  note: we can't run UPDATE queries with transactions off because UPDATEs are actually INSERTs and our
 *  scan will mistakenly loop forever, updating the tuple it just inserted..  This should be fixed but until it
 *  is, we don't want to get stuck in an infinite loop which corrupts your database..
 *
 *  When updating a table, tupleid identifies the tuple to update and oldtuple is NULL.  
 *
 *  Returns RETURNING result if any, otherwise NULL.
 ----------------------------------------------------------------*/
static TupleTableSlot *
ExecUpdate(ModifyTableState *mtstate,
     ItemPointer tupleid,
     HeapTuple oldtuple,
     TupleTableSlot *slot,
     TupleTableSlot *planSlot,
     EPQState *epqstate,
     EState *estate,
     bool canSetTag)
{
 ResultRelInfo *resultRelInfo;
 Relation resultRelationDesc;
 TM_Result result;
 TM_FailureData tmfd;
 List    *recheckIndexes = NIL;
 TupleConversionMap *saved_tcs_map = NULL;
 
 /* abort the operation if not running transactions*/
 if (IsBootstrapProcessingMode())
  elog(ERROR, "cannot UPDATE during bootstrap");
 
 ExecMaterializeSlot(slot);
 
 /* get information on the (current) result relation*/
 resultRelInfo = estate->es_result_relation_info;
 resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
 /* BEFORE ROW UPDATE Triggers */
 if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row)
 {
  if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, slot))
   return NULL;  /* "do nothing" */
 }
 
 /* INSTEAD OF ROW UPDATE Triggers */
 if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_instead_row)
 {
  if (!ExecIRUpdateTriggers(estate, resultRelInfo, oldtuple, slot))
   return NULL;  /* "do nothing" */
 }
 else if (resultRelInfo->ri_FdwRoutine)
 {
  /* Compute stored generated columns*/
  if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored)
   ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
 
  /* update in foreign table: let the FDW do it*/
  slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate, resultRelInfo, slot, planSlot);
 
  if (slot == NULL)  /* "do nothing" */
   return NULL;
 
  /* AFTER ROW Triggers or RETURNING expressions might reference the
   * tableoid column, so (re-)initialize tts_tableOid before evaluating them. */
  slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
 }
 else
 {
  LockTupleMode lockmode;
  bool  partition_constraint_failed;
  bool  update_indexes;
 
  /* Constraints might reference the tableoid column, so (re-)initialize
   * tts_tableOid before evaluating them.*/
  slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
 
  /* Compute stored generated columns*/
  if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored)
   ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
 
  /*
   Check any RLS UPDATE WITH CHECK policies
   *
   * If we generate a new candidate tuple after EvalPlanQual testing, we
   * must loop back here and recheck any RLS policies and constraints.
   * (We don't need to redo triggers, however.  If there are any BEFORE
   * triggers then trigger.c will have done table_tuple_lock to lock the
   * correct tuple, so there'no need to do them again.) */
lreplace:;
 
  /* ensure slot is independent, consider e.g. EPQ */
  ExecMaterializeSlot(slot);
 
  /* If partition constraint fails, this row might get moved to another
   * partition, in which case we should check the RLS CHECK policy just
   * before inserting into the new partition, rather than doing it here.
   * This is because a trigger on that partition might again change the
   * row.  So skip the WCO checks if the partition constraint fails. */
  partition_constraint_failed = resultRelInfo->ri_PartitionCheck && !ExecPartitionCheck(resultRelInfo, slot, estate, false);
 
  if (!partition_constraint_failed && resultRelInfo->ri_WithCheckOptions != NIL)
  {
   /* ExecWithCheckOptions() will skip any WCOs which are not of the kind we are looking for at this point. */
   ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK, resultRelInfo, slot, estate);
  }
 
  /* If a partition check failed, try to move the row into the right partition.*/
  if (partition_constraint_failed)
  {
   bool  tuple_deleted;
   TupleTableSlot *ret_slot;
   TupleTableSlot *orig_slot = slot;
   TupleTableSlot *epqslot = NULL;
   PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
   int   map_index;
   TupleConversionMap *tupconv_map;
 
   /* Disallow an INSERT ON CONFLICT DO UPDATE that causes the
    * original row to migrate to a different partition.  Maybe this
    * can be implemented some day, but it seems a fringe feature with
    * little redeeming value.*/
   if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)
    ereport(ERROR,
      (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
       errmsg("invalid ON UPDATE specification"),
       errdetail("The result tuple would appear in a different partition than the original tuple.")));
 
   /* When an UPDATE is run on a leaf partition, we will not have
    * partition tuple routing set up. In that case, fail with
    * partition constraint violation error.*/
   if (proute == NULL)
    ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
 
   /* Row movement, part 1.  Delete the tuple, but skip RETURNING
    * processing. We want to return rows from INSERT.*/
   ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate, falsefalse /* canSetTag */ , true /* changingPart */ , &tuple_deleted, &epqslot);
 
   /* For some reason if DELETE didn't happen (e.g. trigger prevented
    * it, or it was already deleted by self, or it was concurrently
    * deleted by another transaction), then we should skip the insert
    * as well; otherwise, an UPDATE could cause an increase in the
    * total number of rows across all partitions, which is clearly wrong.
    *
    * For a normal UPDATE, the case where the tuple has been the
    * subject of a concurrent UPDATE or DELETE would be handled by
    * the EvalPlanQual machinery, but for an UPDATE that we've
    * translated into DELETE from this partition and an INSERT into
    some other partition, that's not available, because CTID chains
    * can't span relation boundaries.  We mimic the semantics to a
    * limited extent by skipping the INSERT if the DELETE fails to
    * find a tuple. This ensures that two concurrent attempts to
    UPDATE the same tuple at the same time can't turn one tuple
    * into two, and that an UPDATE of a just-deleted tuple can't resurrect it.*/
   if (!tuple_deleted)
   {
    /*
     * epqslot will be typically NULL.  But when ExecDelete()
     * finds that another transaction has concurrently updated the
     * same row, it re-fetches the row, skips the deleteand
     * epqslot is set to the re-fetched tuple slot. In that case,
     * we need to do all the checks again.
     */
    if (TupIsNull(epqslot))
     return NULL;
    else
    {
     slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
     goto lreplace;
    }
   }
 
   /* Updates set the transition capture map only when a new subplan
    is chosen.  But for inserts, it is set for each row. So after
    INSERT, we need to revert back to the map created for UPDATE;
    * otherwise the next UPDATE will incorrectly use the one created
    for INSERT.  So first save the one created for UPDATE. */
   if (mtstate->mt_transition_capture)
    saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
 
   /* resultRelInfo is one of the per-subplan resultRelInfos.  So we
    * should convert the tuple into root's tuple descriptor, since
    * ExecInsert() starts the search from root.  The tuple conversion
    * map list is in the order of mtstate->resultRelInfo[], so to
    * retrieve the one for this resultRel, we need to know the
    * position of the resultRel in mtstate->resultRelInfo[]. */
   map_index = resultRelInfo - mtstate->resultRelInfo;
   Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
   tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
   if (tupconv_map != NULL)
    slot = execute_attr_map_slot(tupconv_map->attrMap, slot, mtstate->mt_root_tuple_slot);
 
   /* Prepare for tuple routing, making it look like we're inserting into the root. */
   Assert(mtstate->rootResultRelInfo != NULL);
   slot = ExecPrepareTupleRouting(mtstate, estate, proute, mtstate->rootResultRelInfo, slot);
 
   ret_slot = ExecInsert(mtstate, slot, planSlot,
          orig_slot, resultRelInfo,
          estate, canSetTag);
 
   /* Revert ExecPrepareTupleRouting's node change. */
   estate->es_result_relation_info = resultRelInfo;
   if (mtstate->mt_transition_capture)
   {
    mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
    mtstate->mt_transition_capture->tcs_map = saved_tcs_map;
   }
 
   return ret_slot;
  }
 
  /* Check the constraints of the tuple.  We've already checked the
   * partition constraint above; however, we must still ensure the tuple
   * passes all other constraints, so we will call ExecConstraints() and
   * have it validate all remaining checks.*/
  if (resultRelationDesc->rd_att->constr)
   ExecConstraints(resultRelInfo, slot, estate);
 
  /* replace the heap tuple
   *
   * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check
   * that the row to be updated is visible to that snapshot, and throw a
   * can't-serialize error if not. This is a special-case behavior
   * needed for referential integrity updates in transaction-snapshot mode transactions. */
  result = table_tuple_update(resultRelationDesc, tupleid, slot, estate->es_output_cid,
         estate->es_snapshot, estate->es_crosscheck_snapshot, true /* wait for commit */ ,&tmfd, &lockmode, &update_indexes);
 
  switch (result)
  {
   case TM_SelfModified:
 
    /* The target tuple was already updated or deleted by the
     current command, or by a later command in the current
     transaction.  The former case is possible in join UPDATE
     where multiple tuples join to the same target tuple. This
     is pretty questionable, but Postgres has always allowed it:
     * we just execute the first update action and ignore
     * additional update attempts.
     *
     * The latter case arises if the tuple is modified by a
     * command in a BEFORE triggeror perhaps by a command in a
     * volatile function used in the query.  In such situations we
     * should not ignore the update, but it is equally unsafe to
     * proceed.  We don't want to discard the original UPDATE
     * while keeping the triggered actions based on it; and we
     * have no principled way to merge this update with the
     * previous ones.  So throwing an error is the only safe
     * course.
     *
     * If a trigger actually intends this type of interaction, it
     * can re-execute the UPDATE (assuming it can figure out how)
     * and then return NULL to cancel the outer update.*/
    if (tmfd.cmax != estate->es_output_cid)
     ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
        errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
        errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
 
    /* Else, already updated by self; nothing to do */
    return NULL;
 
   case TM_Ok:
    break;
 
   case TM_Updated:
    {
     TupleTableSlot *inputslot;
     TupleTableSlot *epqslot;
 
     if (IsolationUsesXactSnapshot())
      ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent update")));
 
     /* Already know that we're going to need to do EPQ, so fetch tuple directly into the right slot. */
     inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,resultRelInfo->ri_RangeTableIndex);
 
     result = table_tuple_lock(resultRelationDesc, tupleid, estate->es_snapshot,inputslot, estate->es_output_cid, lockmode, LockWaitBlock, TUPLE_LOCK_FLAG_FIND_LAST_VERSION,&tmfd);
 
     switch (result)
     {
      case TM_Ok:
       Assert(tmfd.traversed);
       epqslot = EvalPlanQual(epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex, inputslot);
       if (TupIsNull(epqslot))
        /* Tuple not passing quals anymore, exiting... */
        return NULL;
 
       slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
       goto lreplace;
 
      case TM_Deleted:
       /* tuple already deleted; nothing to do */
       return NULL;
 
      case TM_SelfModified:
 
       /*
        * This can be reached when following an update chain from a tuple updated by another session,
        * reaching a tuple that was already updated in this transaction. If previously modified by
        * this command, ignore the redundant update, otherwise error out.
        *
        * See also TM_SelfModified response to table_tuple_update() above.*/
       if (tmfd.cmax != estate->es_output_cid)
        ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
           errmsg("tuple to be updated was already modified by an operation triggered by the current command"),errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
       return NULL;
 
      default:
       /* see table_tuple_lock call in ExecDelete() */
       elog(ERROR, "unexpected table_tuple_lock status: %u", result);
       return NULL;
     }
    }
 
    break;
 
   case TM_Deleted:
    if (IsolationUsesXactSnapshot())
     ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent delete")));
    /* tuple already deleted; nothing to do */
    return NULL;
 
   default:
    elog(ERROR, "unrecognized table_tuple_update status: %u",
      result);
    return NULL;
  }
 
  /* insert index entries for tuple if necessary */
  if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
   recheckIndexes = ExecInsertIndexTuples(slot, estate, falseNULL, NIL);
 }
 
 if (canSetTag)
  (estate->es_processed)++;
 
 /* AFTER ROW UPDATE Triggers */
 ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,recheckIndexes,mtstate->operation == CMD_INSERT ?mtstate->mt_oc_transition_capture : mtstate->mt_transition_capture);
 
 list_free(recheckIndexes);
 
 /* Check any WITH CHECK OPTION constraints from parent views.  We are
  * required to do this after testing all constraints and uniqueness
  * violations per the SQL spec, so we do it after actually updating the
  * record in the heap and all indexes.
  *
  * ExecWithCheckOptions() will skip any WCOs which are not of the kind we
  * are looking for at this point. */
 if (resultRelInfo->ri_WithCheckOptions != NIL)
  ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
 
 if (resultRelInfo->ri_projectReturning) /* Process RETURNING if present */
  return ExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelationDesc),slot, planSlot);
 
 return NULL;
}

再往下就是涉及到存储引擎的部分了,我们重点看一下其对外的接口输入参数。重点是这4个参数:

  • relation - table to be modified (caller must hold suitable lock) (要更新的那个表)

  • otid - TID of old tuple to be replaced (要更新的元组ID,对应的是老的元组,更新后相当于是插入一条新元组,老元组的tid值要更新为新的tid值)

  • slot - newly constructed tuple data to store (新元组的值)

  • cid - update command ID (used for visibility test, and stored into cmax/cmin if successful) (cid值,事务相关) 执行器层面的更新算子是建立在存储引擎提供的底层table_tuple_update接口之上的。是我们编写ExecUpdate以及ExecModifyTable的基础。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
 Update a tuple.
 
 * Input parameters:
 * relation - table to be modified (caller must hold suitable lock)
 * otid - TID of old tuple to be replaced
 * slot - newly constructed tuple data to store
 * cid - update command ID (used for visibility test, and stored into cmax/cmin if successful)
 * crosscheck - if not InvalidSnapshot, also check old tuple against this
 * wait - true if should wait for any conflicting update to commit/abort
 
 Output parameters:
 * tmfd - filled in failure cases (see below)
 * lockmode - filled with lock mode acquired on tuple
 *  update_indexes - in success cases this is set to true if new index entries are required for this tuple
 *
 * Normal, successful return value is TM_Ok, which means we did actually update it. */
static inline TM_Result
table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, 
       Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, bool *update_indexes)
{
 return rel->rd_tableam->tuple_update(rel, otid, slot, cid, 
           snapshot, crosscheck, wait, tmfd, lockmode, update_indexes);
}

事务

这一块主要是要理解PG中update语句并不是原地更新元组,而是插入一条新元组。因为PG实现MVCC与Mysql,Oracle的实现方式有所不同,并不是通过undo日志实现的,相当于把undo日志记录到了原有的表中,并不是单独存放在一个地方。具体的不再细述,内容太多了,以后再分析事务部分。

好了,内容很多,分析源码的时候,涉及到的知识点以及逻辑是非常多的,我们最好每次分析只抓一个主干,不然每个都分析,最后就会比较乱。就先分析到这里吧。

总结

到此这篇关于Postgres中UPDATE更新语句源码分析的文章就介绍到这了,更多相关Postgres中UPDATE源码内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://mp.weixin.qq.com/s/e3yiVWQp9ho6ZUD6RgwvbA

延伸 · 阅读

精彩推荐