diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c new file mode 100644 index 246a3a9..bfc4fef *** a/contrib/postgres_fdw/postgres_fdw.c --- b/contrib/postgres_fdw/postgres_fdw.c *************** postgresAddForeignUpdateTargets(Query *p *** 1093,1099 **** */ /* Make a Var representing the desired value */ ! var = makeVar(parsetree->resultRelation, SelfItemPointerAttributeNumber, TIDOID, -1, --- 1093,1099 ---- */ /* Make a Var representing the desired value */ ! var = makeVar(parsetree->sourceRelation, SelfItemPointerAttributeNumber, TIDOID, -1, diff --git a/doc/src/sgml/ref/create_view.sgml b/doc/src/sgml/ref/create_view.sgml new file mode 100644 index e0fbe1e..888410f *** a/doc/src/sgml/ref/create_view.sgml --- b/doc/src/sgml/ref/create_view.sgml *************** CREATE VIEW vista AS SELECT text 'Hello *** 323,334 **** or set-returning functions. - - - - The view must not have the security_barrier property. - - --- 323,328 ---- diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c new file mode 100644 index 65f3b98..ffe0e20 *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** _copyRangeTblEntry(const RangeTblEntry * *** 2003,2008 **** --- 2003,2009 ---- COPY_SCALAR_FIELD(checkAsUser); COPY_BITMAPSET_FIELD(selectedCols); COPY_BITMAPSET_FIELD(modifiedCols); + COPY_NODE_FIELD(securityQuals); return newnode; } *************** _copyQuery(const Query *from) *** 2451,2456 **** --- 2452,2458 ---- COPY_SCALAR_FIELD(queryId); COPY_SCALAR_FIELD(canSetTag); COPY_NODE_FIELD(utilityStmt); + COPY_SCALAR_FIELD(sourceRelation); COPY_SCALAR_FIELD(resultRelation); COPY_SCALAR_FIELD(hasAggs); COPY_SCALAR_FIELD(hasWindowFuncs); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c new file mode 100644 index 4c9b05e..27331d5 *** a/src/backend/nodes/equalfuncs.c --- b/src/backend/nodes/equalfuncs.c *************** _equalQuery(const Query *a, const Query *** 846,851 **** --- 846,852 ---- /* we intentionally ignore queryId, since it might not be set */ COMPARE_SCALAR_FIELD(canSetTag); COMPARE_NODE_FIELD(utilityStmt); + COMPARE_SCALAR_FIELD(sourceRelation); COMPARE_SCALAR_FIELD(resultRelation); COMPARE_SCALAR_FIELD(hasAggs); COMPARE_SCALAR_FIELD(hasWindowFuncs); *************** _equalRangeTblEntry(const RangeTblEntry *** 2258,2263 **** --- 2259,2265 ---- COMPARE_SCALAR_FIELD(checkAsUser); COMPARE_BITMAPSET_FIELD(selectedCols); COMPARE_BITMAPSET_FIELD(modifiedCols); + COMPARE_NODE_FIELD(securityQuals); return true; } diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c new file mode 100644 index 908f397..38cced1 *** a/src/backend/nodes/nodeFuncs.c --- b/src/backend/nodes/nodeFuncs.c *************** range_table_walker(List *rtable, *** 2008,2013 **** --- 2008,2016 ---- return true; break; } + + if (walker(rte->securityQuals, context)) + return true; } return false; } *************** range_table_mutator(List *rtable, *** 2731,2736 **** --- 2734,2740 ---- MUTATE(newrte->values_lists, rte->values_lists, List *); break; } + MUTATE(newrte->securityQuals, rte->securityQuals, List *); newrt = lappend(newrt, newrte); } return newrt; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c new file mode 100644 index 817b149..092e7b9 *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c *************** _outQuery(StringInfo str, const Query *n *** 2239,2244 **** --- 2239,2245 ---- else appendStringInfoString(str, " :utilityStmt <>"); + WRITE_INT_FIELD(sourceRelation); WRITE_INT_FIELD(resultRelation); WRITE_BOOL_FIELD(hasAggs); WRITE_BOOL_FIELD(hasWindowFuncs); *************** _outRangeTblEntry(StringInfo str, const *** 2411,2416 **** --- 2412,2418 ---- WRITE_OID_FIELD(checkAsUser); WRITE_BITMAPSET_FIELD(selectedCols); WRITE_BITMAPSET_FIELD(modifiedCols); + WRITE_NODE_FIELD(securityQuals); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c new file mode 100644 index d325bb3..719d6ba *** a/src/backend/nodes/readfuncs.c --- b/src/backend/nodes/readfuncs.c *************** _readQuery(void) *** 198,203 **** --- 198,204 ---- local_node->queryId = 0; /* not saved in output format */ READ_BOOL_FIELD(canSetTag); READ_NODE_FIELD(utilityStmt); + READ_INT_FIELD(sourceRelation); READ_INT_FIELD(resultRelation); READ_BOOL_FIELD(hasAggs); READ_BOOL_FIELD(hasWindowFuncs); *************** _readRangeTblEntry(void) *** 1251,1256 **** --- 1252,1258 ---- READ_OID_FIELD(checkAsUser); READ_BITMAPSET_FIELD(selectedCols); READ_BITMAPSET_FIELD(modifiedCols); + READ_NODE_FIELD(securityQuals); READ_DONE(); } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c new file mode 100644 index d8aa35d..19031bf *** a/src/backend/optimizer/plan/planner.c --- b/src/backend/optimizer/plan/planner.c *************** inheritance_planner(PlannerInfo *root) *** 891,896 **** --- 891,898 ---- rte = copyObject(rte); subroot.parse->rtable = lappend(subroot.parse->rtable, rte); + if (subroot.parse->sourceRelation == rti) + subroot.parse->sourceRelation = newrti; } rti++; } diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c new file mode 100644 index fb67f9e..75a6c5d *** a/src/backend/optimizer/prep/preptlist.c --- b/src/backend/optimizer/prep/preptlist.c *************** *** 36,45 **** #include "utils/rel.h" ! static List *expand_targetlist(List *tlist, int command_type, ! Index result_relation, List *range_table); /* * preprocess_targetlist * Driver for preprocessing the parse tree targetlist. --- 36,74 ---- #include "utils/rel.h" ! static List *expand_targetlist(Query *parse, List *tlist, int command_type, ! Index source_relation, Index result_relation, ! List *range_table); + static AttrNumber + lookup_varattno(Query *parse, AttrNumber attno, Index rt_index, List *rtables) + { + RangeTblEntry *rte = rt_fetch(rt_index, rtables); + + if (rte->rtekind == RTE_SUBQUERY && rt_index == parse->sourceRelation) + { + ListCell *cell; + + foreach (cell, rte->subquery->targetList) + { + TargetEntry *tle = lfirst(cell); + Var *var; + + if (IsA(tle->expr, Const)) + continue; + + var = (Var *) tle->expr; + Assert(IsA(var, Var)); + + if (var->varattno == attno) + return tle->resno; + } + elog(ERROR, "attno %d not found in subquery targetlist", attno); + } + return attno; + } + /* * preprocess_targetlist * Driver for preprocessing the parse tree targetlist. *************** preprocess_targetlist(PlannerInfo *root, *** 73,80 **** * 10/94 */ if (command_type == CMD_INSERT || command_type == CMD_UPDATE) ! tlist = expand_targetlist(tlist, command_type, result_relation, range_table); /* * Add necessary junk columns for rowmarked rels. These values are needed --- 102,113 ---- * 10/94 */ if (command_type == CMD_INSERT || command_type == CMD_UPDATE) ! { ! Index source_relation = parse->sourceRelation > 0 ? ! parse->sourceRelation : result_relation; ! tlist = expand_targetlist(parse, tlist, command_type, source_relation, result_relation, range_table); + } /* * Add necessary junk columns for rowmarked rels. These values are needed *************** preprocess_targetlist(PlannerInfo *root, *** 96,102 **** { /* It's a regular table, so fetch its TID */ var = makeVar(rc->rti, ! SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, --- 129,136 ---- { /* It's a regular table, so fetch its TID */ var = makeVar(rc->rti, ! lookup_varattno(parse, SelfItemPointerAttributeNumber, ! rc->rti, range_table), TIDOID, -1, InvalidOid, *************** preprocess_targetlist(PlannerInfo *root, *** 112,118 **** if (rc->isParent) { var = makeVar(rc->rti, ! TableOidAttributeNumber, OIDOID, -1, InvalidOid, --- 146,153 ---- if (rc->isParent) { var = makeVar(rc->rti, ! lookup_varattno(parse, TableOidAttributeNumber, ! rc->rti, range_table), OIDOID, -1, InvalidOid, *************** preprocess_targetlist(PlannerInfo *root, *** 194,201 **** * non-junk attributes appear in proper field order. */ static List * ! expand_targetlist(List *tlist, int command_type, ! Index result_relation, List *range_table) { List *new_tlist = NIL; ListCell *tlist_item; --- 229,237 ---- * non-junk attributes appear in proper field order. */ static List * ! expand_targetlist(Query *parse, List *tlist, int command_type, ! Index source_relation, Index result_relation, ! List *range_table) { List *new_tlist = NIL; ListCell *tlist_item; *************** expand_targetlist(List *tlist, int comma *** 298,305 **** case CMD_UPDATE: if (!att_tup->attisdropped) { ! new_expr = (Node *) makeVar(result_relation, ! attrno, atttype, atttypmod, attcollation, --- 334,344 ---- case CMD_UPDATE: if (!att_tup->attisdropped) { ! new_expr = (Node *) makeVar(source_relation, ! lookup_varattno(parse, ! attrno, ! source_relation, ! range_table), atttype, atttypmod, attcollation, diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c new file mode 100644 index c52a374..660d62f *** a/src/backend/rewrite/rewriteHandler.c --- b/src/backend/rewrite/rewriteHandler.c *************** *** 14,19 **** --- 14,20 ---- #include "postgres.h" #include "access/sysattr.h" + #include "catalog/heap.h" #include "catalog/pg_type.h" #include "commands/trigger.h" #include "foreign/fdwapi.h" *************** static Query *fireRIRrules(Query *parset *** 63,68 **** --- 64,70 ---- bool forUpdatePushedDown); static bool view_has_instead_trigger(Relation view, CmdType event); static Bitmapset *adjust_view_column_set(Bitmapset *cols, List *targetlist); + static Query *rewriteSecurityQuals(Query *parsetree); /* *************** view_query_is_auto_updatable(Query *view *** 2048,2061 **** return gettext_noop("Views that return set-returning functions are not automatically updatable."); /* - * For now, we also don't support security-barrier views, because of the - * difficulty of keeping upper-level qual expressions away from - * lower-level data. This might get relaxed in the future. - */ - if (security_barrier) - return gettext_noop("Security-barrier views are not automatically updatable."); - - /* * The view query should select from a single base relation, which must be * a table or another view. */ --- 2050,2055 ---- *************** rewriteTargetView(Query *parsetree, Rela *** 2664,2669 **** --- 2658,2671 ---- view_targetlist); /* + * Move any security barrier quals from the view RTE onto the new target + * RTE. Any such quals should now apply to the new target RTE and will not + * reference the original view RTE in the rewritten query. + */ + new_rte->securityQuals = view_rte->securityQuals; + view_rte->securityQuals = NIL; + + /* * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk * TLE for the view to the end of the targetlist, which we no longer need. * Remove it to avoid unnecessary work when we process the targetlist. *************** rewriteTargetView(Query *parsetree, Rela *** 2743,2748 **** --- 2745,2754 ---- * only adjust their varnos to reference the new target (just the same as * we did with the view targetlist). * + * Note that there is special-case handling for the quals of a security + * barrier view, since they need to be kept separate from any user-supplied + * quals - see rewriteSecurityQuals. + * * For INSERT, the view's quals can be ignored in the main query. */ if (parsetree->commandType != CMD_INSERT && *************** rewriteTargetView(Query *parsetree, Rela *** 2751,2757 **** Node *viewqual = (Node *) copyObject(viewquery->jointree->quals); ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0); ! AddQual(parsetree, (Node *) viewqual); } /* --- 2757,2774 ---- Node *viewqual = (Node *) copyObject(viewquery->jointree->quals); ChangeVarNodes(viewqual, base_rt_index, new_rt_index, 0); ! ! if (RelationIsSecurityView(view)) ! { ! /* ! * Note: the parsetree has been mutated, so the new_rte pointer is ! * stale and needs to be re-computed. ! */ ! new_rte = rt_fetch(new_rt_index, parsetree->rtable); ! new_rte->securityQuals = lcons(viewqual, new_rte->securityQuals); ! } ! else ! AddQual(parsetree, (Node *) viewqual); } /* *************** rewriteTargetView(Query *parsetree, Rela *** 2812,2824 **** /* * Make sure that the query is marked correctly if the added * qual has sublinks. We can skip this check if the query is ! * already marked, or if the command is an UPDATE, in which ! * case the same qual will have already been added to the ! * query's WHERE clause, and AddQual will have already done ! * this check. */ if (!parsetree->hasSubLinks && ! parsetree->commandType != CMD_UPDATE) parsetree->hasSubLinks = checkExprHasSubLink(wco->qual); } } --- 2829,2842 ---- /* * Make sure that the query is marked correctly if the added * qual has sublinks. We can skip this check if the query is ! * already marked, or if the command is an UPDATE to a ! * non-security-barrier view, in which case the same qual will ! * have already been added to the query's WHERE clause, and ! * AddQual will have already done this check. */ if (!parsetree->hasSubLinks && ! (parsetree->commandType != CMD_UPDATE || ! RelationIsSecurityView(view))) parsetree->hasSubLinks = checkExprHasSubLink(wco->qual); } } *************** rewriteTargetView(Query *parsetree, Rela *** 2829,2834 **** --- 2847,3291 ---- /* + * rewriteSecurityQual - + * rewrite the specified security barrier qual on a query RTE, turning the + * RTE into a subquery. + */ + static Query * + rewriteSecurityQual(Query *parsetree, int rt_index, + bool is_result_relation, Node *qual) + { + RangeTblEntry *rte; + List *targetlist = NIL; + List *rte_targetlist = NIL; + List *colnames = NIL; + Relation relation; + AttrNumber attno; + Var *var; + TargetEntry *tle; + Query *subquery; + RangeTblEntry *subrte; + RangeTblRef *subrtr; + ListCell *cell; + + rte = rt_fetch(rt_index, parsetree->rtable); + + /* + * There are 2 possible cases: + * + * 1. A relation RTE, which we turn into a subquery RTE containing all + * referenced columns. + * + * 2. A subquery RTE (either from a prior call to this function or from an + * expanded view). In this case we build a new subquery on top of it to + * isolate this security barrier qual from any other quals. + */ + switch (rte->rtekind) + { + case RTE_RELATION: + /* + * Build the subquery targetlist from the non-dropped columns of + * the underlying table and an RTE targetlist to map varattnos in + * the main query onto the new subquery columns. + */ + relation = heap_open(rte->relid, NoLock); + + for (attno = 1; attno <= relation->rd_att->natts; attno++) + { + Form_pg_attribute att_tup; + char *attname; + + /* Ignore dropped attributes */ + att_tup = relation->rd_att->attrs[attno - 1]; + if (att_tup->attisdropped) + continue; + + /* target entry for new subquery targetlist */ + var = makeVar(1, + attno, + att_tup->atttypid, + att_tup->atttypmod, + att_tup->attcollation, + 0); + attname = NameStr(att_tup->attname); + + tle = makeTargetEntry((Expr *) var, + list_length(targetlist) + 1, + pstrdup(attname), + false); + targetlist = lappend(targetlist, tle); + + /* top-level target entry for rewriting the main query */ + var = copyObject(var); + var->varno = rt_index; + var->varattno = list_length(targetlist); + tle = makeTargetEntry((Expr *) var, + attno, + pstrdup(attname), + false); + rte_targetlist = lappend(rte_targetlist, tle); + + colnames = lappend(colnames, makeString(pstrdup(attname))); + } + + /* + * Then include any junk system attributes. These must come at + * the end of the targetlist. + */ + for (attno = FirstLowInvalidHeapAttributeNumber + 1; + attno <= InvalidAttrNumber; attno++) + { + Form_pg_attribute att_tup; + char *attname; + + if (attno == ObjectIdAttributeNumber && + !relation->rd_rel->relhasoids) + continue; + + if (attno == InvalidAttrNumber) + { + /* whole-row attribute */ + var = makeWholeRowVar(rte, 1, 0, false); + attname = "wholerow"; + } + else + { + /* system attribute */ + att_tup = SystemAttributeDefinition(attno, + relation->rd_rel->relhasoids); + + var = makeVar(1, + attno, + att_tup->atttypid, + att_tup->atttypmod, + att_tup->attcollation, + 0); + attname = NameStr(att_tup->attname); + } + + /* target entry for new subquery targetlist */ + tle = makeTargetEntry((Expr *) var, + list_length(targetlist) + 1, + pstrdup(attname), + false); + targetlist = lappend(targetlist, tle); + + /* top-level target entry for rewriting the main query */ + var = copyObject(var); + var->varno = rt_index; + var->varattno = list_length(targetlist); + tle = makeTargetEntry((Expr *) var, + attno, + pstrdup(attname), + false); + rte_targetlist = lappend(rte_targetlist, tle); + + colnames = lappend(colnames, makeString(pstrdup(attname))); + } + heap_close(relation, NoLock); + + /* + * Turn the main relation RTE into a security barrier subquery + * RTE, moving all permissions checks and rowMarks down into the + * subquery. + */ + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + subquery->targetList = targetlist; + + subrte = copyObject(rte); + subrte->inFromCl = true; + subrte->securityQuals = NIL; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + foreach(cell, parsetree->rowMarks) + { + RowMarkClause *rc = (RowMarkClause *) lfirst(cell); + if (rc->rti == rt_index) + { + parsetree->rowMarks = list_delete(parsetree->rowMarks, + rc); + rc->rti = 1; + rc->pushedDown = true; + subquery->rowMarks = list_make1(rc); + break; + } + } + + /* + * If this RTE was the result relation, then we need to lock the + * rows coming from it. Note that by the time we get here, this + * RTE will no longer be the result relation, so we have to rely + * on the flag passed in. + */ + if (is_result_relation) + applyLockingClause(subquery, 1, LCS_FORUPDATE, false, true); + + rte->rtekind = RTE_SUBQUERY; + rte->relid = InvalidOid; + rte->subquery = subquery; + rte->security_barrier = true; + rte->eref = makeAlias(rte->eref->aliasname, colnames); + rte->inh = false; /* must not be set for a subquery */ + + /* the permissions checks have now been moved down */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * Update any varattnos in the main query that refer to this RTE, + * using the entries from the RTE targetlist. + */ + parsetree = (Query *) + ReplaceVarsFromTargetList((Node *) parsetree, + rt_index, + 0, + rte, + rte_targetlist, + REPLACEVARS_REPORT_ERROR, + 0, + &parsetree->hasSubLinks); + break; + + case RTE_SUBQUERY: + /* + * Build a new subquery that includes all the same columns as the + * original subquery. + */ + foreach(cell, rte->subquery->targetList) + { + tle = (TargetEntry *) lfirst(cell); + var = makeVarFromTargetEntry(1, tle); + + tle = makeTargetEntry((Expr *) var, + list_length(targetlist) + 1, + pstrdup(tle->resname), + tle->resjunk); + targetlist = lappend(targetlist, tle); + } + + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + subquery->targetList = targetlist; + + subrte = makeNode(RangeTblEntry); + subrte->rtekind = RTE_SUBQUERY; + subrte->subquery = rte->subquery; + subrte->security_barrier = rte->security_barrier; + subrte->eref = copyObject(rte->eref); + subrte->inFromCl = true; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + rte->subquery = subquery; + rte->security_barrier = true; + + break; + + default: + elog(ERROR, "invalid range table entry for security barrier qual"); + } + + return parsetree; + } + + + /* + * rewriteSecurityQualsOnSubLink - + * Apply rewriteSecurityQuals() to each SubLink (subselect in expression) + * found in the given tree. + * + * NOTE: although this has the form of a walker, we cheat and modify the + * SubLink nodes in-place. This is safe because we are not descending into + * subqueries, so no parts of the tree we are modifying are being traversed. + * + * Each SubLink subselect is replaced with a possibly-rewritten subquery. + */ + static bool + rewriteSecurityQualsOnSubLink(Node *node, void *ctx) + { + if (node == NULL) + return false; + if (IsA(node, SubLink)) + { + SubLink *sub = (SubLink *) node; + + sub->subselect = (Node *) + rewriteSecurityQuals((Query *) sub->subselect); + /* Fall through to process lefthand args of SubLink */ + } + + /* + * Do NOT recurse into Query nodes, because rewriteSecurityQuals already + * processed all subqueries in the rtable and cteList. + */ + return expression_tree_walker(node, rewriteSecurityQualsOnSubLink, ctx); + } + + + /* + * rewriteSecurityQuals - + * rewrites any security barrier quals on RTEs in the query, turning them + * into subqueries to allow the planner to enforce them before any user + * quals where necessary. Currently such security barrier quals can only + * have come from automatically updatable security barrier views. + * + * We do this at the end of the rewriting process (after any SELECT rules have + * been applied) so that the new security barrier subqueries wrap any remaining + * views after they are expanded. + * + * Any given RTE may have multiple security barrier quals in a list, from which + * we create a set of nested subqueries to isolate each security barrier from + * the others, providing protection against malicious user security barriers. + * The first item in the list represents the innermost subquery. + */ + static Query * + rewriteSecurityQuals(Query *parsetree) + { + ListCell *l; + int rt_index; + + /* + * Process each RTE in the rtable list. Security barrier quals are + * initially only added to the result relation, but subsequent rules + * may change that, so they may be anywhere. + * + * Note that this is deliberately not a foreach loop, since the whole + * parsetree may be mutated each time through the loop. + */ + rt_index = 0; + while (rt_index < list_length(parsetree->rtable)) + { + RangeTblEntry *rte; + bool is_result_relation; + int qual_idx; + + ++rt_index; + rte = rt_fetch(rt_index, parsetree->rtable); + + if (rte->securityQuals == NIL) + continue; + + /* + * Ignore any RTEs that aren't used in the query (such RTEs may be + * present for permissions checks). + */ + if (rt_index != parsetree->resultRelation && + !rangeTableEntry_used((Node *) parsetree, rt_index, 0)) + continue; + + /* + * Recursively process any security barrier quals in subquery RTEs + * before processing any at this query level. + */ + if (rte->rtekind == RTE_SUBQUERY) + rte->subquery = rewriteSecurityQuals(rte->subquery); + + /* + * If this RTE is the target then we need to make a copy of it before + * expanding it. The unexpanded copy will become the new target, and + * the expanded RTE will be the source of rows to update/delete + * (cf. ApplyRetrieveRule). + */ + is_result_relation = rt_index == parsetree->resultRelation; + if (is_result_relation) + { + RangeTblEntry *newrte = copyObject(rte); + parsetree->rtable = lappend(parsetree->rtable, newrte); + parsetree->sourceRelation = parsetree->resultRelation; + parsetree->resultRelation = list_length(parsetree->rtable); + + /* + * Wipe out any copied security quals on the new target to prevent + * infinite recursion. + */ + newrte->securityQuals = NIL; + + /* + * There's no need to do permissions checks twice, so wipe out the + * permissions info for the original RTE (we prefer to keep the + * bits set on the result RTE). + */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * For the most part, Vars referencing the original relation + * should remain as as they are, meaning that they implicitly + * represent OLD values. But in the RETURNING list if any, we + * want such Vars to represent NEW values, so change them to + * reference the new RTE. + * + * Since ChangeVarNodes scribbles on the tree in-place, copy the + * RETURNING list first for safety. + */ + parsetree->returningList = copyObject(parsetree->returningList); + ChangeVarNodes((Node *) parsetree->returningList, rt_index, + parsetree->resultRelation, 0); + } + + /* + * Process each security qual in turn, starting with the innermost + * one and working outwards. + * + * Note that we can't use a foreach loop here because the whole + * parsetree may be mutated each time through the loop. For the same + * reason we must re-fetch the RTE each time. + */ + qual_idx = 0; + while (qual_idx < list_length(rte->securityQuals)) + { + Node *qual = (Node *) list_nth(rte->securityQuals, qual_idx); + + ChangeVarNodes(qual, rt_index, 1, 0); + parsetree = rewriteSecurityQual(parsetree, rt_index, + is_result_relation, qual); + + /* re-fetch the RTE in case it has been re-written */ + rte = rt_fetch(rt_index, parsetree->rtable); + + qual_idx++; + } + rte->securityQuals = NIL; + } + + /* Recurse into subqueries in WITH */ + foreach(l, parsetree->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(l); + + cte->ctequery = (Node *) + rewriteSecurityQuals((Query *) cte->ctequery); + } + + /* + * Recurse into sublink subqueries too, without descending into the rtable + * or the cteList, which we have already processed. + */ + if (parsetree->hasSubLinks) + query_tree_walker(parsetree, rewriteSecurityQualsOnSubLink, NULL, + QTW_IGNORE_RC_SUBQUERIES); + + return parsetree; + } + + + /* * RewriteQuery - * rewrites the query and apply the rules again on the queries rewritten * *************** QueryRewrite(Query *parsetree) *** 3218,3224 **** /* * Step 2 * ! * Apply all the RIR rules on each query * * This is also a handy place to mark each query with the original queryId */ --- 3675,3682 ---- /* * Step 2 * ! * Apply all the RIR rules on each query, and then expand any security ! * quals that apply to RTEs in each query. * * This is also a handy place to mark each query with the original queryId */ *************** QueryRewrite(Query *parsetree) *** 3228,3233 **** --- 3686,3692 ---- Query *query = (Query *) lfirst(l); query = fireRIRrules(query, NIL, false); + query = rewriteSecurityQuals(query); query->queryId = input_query_id; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h new file mode 100644 index e5235cb..5e89651 *** a/src/include/nodes/parsenodes.h --- b/src/include/nodes/parsenodes.h *************** typedef struct Query *** 110,115 **** --- 110,118 ---- Node *utilityStmt; /* non-null if this is DECLARE CURSOR or a * non-optimizable statement */ + int sourceRelation; /* rtable index of source relation for + * UPDATE/DELETE; 0 otherwise */ + int resultRelation; /* rtable index of target relation for * INSERT/UPDATE/DELETE; 0 for SELECT */ *************** typedef struct RangeTblEntry *** 801,806 **** --- 804,810 ---- Oid checkAsUser; /* if valid, check access as this role */ Bitmapset *selectedCols; /* columns needing SELECT permission */ Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */ + List *securityQuals; /* any security barrier quals to apply */ } RangeTblEntry; /* diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out new file mode 100644 index c725bba..b4d0af2 *** a/src/test/regress/expected/updatable_views.out --- b/src/test/regress/expected/updatable_views.out *************** CREATE VIEW rw_view14 AS SELECT ctid, a, *** 22,33 **** CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable ! CREATE VIEW ro_view18 WITH (security_barrier = true) ! AS SELECT * FROM base_tbl; -- Security barrier views not updatable ! CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; ! CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence ! CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables WHERE table_name LIKE E'r_\\_view%' --- 22,31 ---- CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable ! CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; ! CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence ! CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables WHERE table_name LIKE E'r_\\_view%' *************** SELECT table_name, is_insertable_into *** 44,50 **** ro_view19 | NO ro_view2 | NO ro_view20 | NO - ro_view21 | NO ro_view3 | NO ro_view4 | NO ro_view5 | NO --- 42,47 ---- *************** SELECT table_name, is_insertable_into *** 55,61 **** rw_view14 | YES rw_view15 | YES rw_view16 | YES ! (21 rows) SELECT table_name, is_updatable, is_insertable_into FROM information_schema.views --- 52,58 ---- rw_view14 | YES rw_view15 | YES rw_view16 | YES ! (20 rows) SELECT table_name, is_updatable, is_insertable_into FROM information_schema.views *************** SELECT table_name, is_updatable, is_inse *** 73,79 **** ro_view19 | NO | NO ro_view2 | NO | NO ro_view20 | NO | NO - ro_view21 | NO | NO ro_view3 | NO | NO ro_view4 | NO | NO ro_view5 | NO | NO --- 70,75 ---- *************** SELECT table_name, is_updatable, is_inse *** 84,90 **** rw_view14 | YES | YES rw_view15 | YES | YES rw_view16 | YES | YES ! (21 rows) SELECT table_name, column_name, is_updatable FROM information_schema.columns --- 80,86 ---- rw_view14 | YES | YES rw_view15 | YES | YES rw_view16 | YES | YES ! (20 rows) SELECT table_name, column_name, is_updatable FROM information_schema.columns *************** SELECT table_name, column_name, is_updat *** 103,125 **** ro_view17 | a | NO ro_view17 | b | NO ro_view18 | a | NO ! ro_view18 | b | NO ! ro_view19 | a | NO ro_view2 | a | NO ro_view2 | b | NO ! ro_view20 | sequence_name | NO ! ro_view20 | last_value | NO ! ro_view20 | start_value | NO ! ro_view20 | increment_by | NO ! ro_view20 | max_value | NO ! ro_view20 | min_value | NO ! ro_view20 | cache_value | NO ! ro_view20 | log_cnt | NO ! ro_view20 | is_cycled | NO ! ro_view20 | is_called | NO ! ro_view21 | a | NO ! ro_view21 | b | NO ! ro_view21 | g | NO ro_view3 | ?column? | NO ro_view4 | count | NO ro_view5 | a | NO --- 99,119 ---- ro_view17 | a | NO ro_view17 | b | NO ro_view18 | a | NO ! ro_view19 | sequence_name | NO ! ro_view19 | last_value | NO ! ro_view19 | start_value | NO ! ro_view19 | increment_by | NO ! ro_view19 | max_value | NO ! ro_view19 | min_value | NO ! ro_view19 | cache_value | NO ! ro_view19 | log_cnt | NO ! ro_view19 | is_cycled | NO ! ro_view19 | is_called | NO ro_view2 | a | NO ro_view2 | b | NO ! ro_view20 | a | NO ! ro_view20 | b | NO ! ro_view20 | g | NO ro_view3 | ?column? | NO ro_view4 | count | NO ro_view5 | a | NO *************** SELECT table_name, column_name, is_updat *** 140,146 **** rw_view16 | a | YES rw_view16 | b | YES rw_view16 | aa | YES ! (48 rows) -- Read-only views DELETE FROM ro_view1; --- 134,140 ---- rw_view16 | a | YES rw_view16 | b | YES rw_view16 | aa | YES ! (46 rows) -- Read-only views DELETE FROM ro_view1; *************** INSERT INTO ro_view17 VALUES (3, 'ROW 3' *** 268,291 **** ERROR: cannot insert into view "ro_view1" DETAIL: Views containing DISTINCT are not automatically updatable. HINT: To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule. ! INSERT INTO ro_view18 VALUES (3, 'ROW 3'); ! ERROR: cannot insert into view "ro_view18" ! DETAIL: Security-barrier views are not automatically updatable. ! HINT: To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule. ! DELETE FROM ro_view19; ! ERROR: cannot delete from view "ro_view19" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable deleting from the view, provide an INSTEAD OF DELETE trigger or an unconditional ON DELETE DO INSTEAD rule. ! UPDATE ro_view20 SET max_value=1000; ! ERROR: cannot update view "ro_view20" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. ! UPDATE ro_view21 SET b=upper(b); ! ERROR: cannot update view "ro_view21" DETAIL: Views that return set-returning functions are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. DROP TABLE base_tbl CASCADE; ! NOTICE: drop cascades to 17 other objects DETAIL: drop cascades to view ro_view1 drop cascades to view ro_view17 drop cascades to view ro_view2 --- 262,281 ---- ERROR: cannot insert into view "ro_view1" DETAIL: Views containing DISTINCT are not automatically updatable. HINT: To enable inserting into the view, provide an INSTEAD OF INSERT trigger or an unconditional ON INSERT DO INSTEAD rule. ! DELETE FROM ro_view18; ! ERROR: cannot delete from view "ro_view18" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable deleting from the view, provide an INSTEAD OF DELETE trigger or an unconditional ON DELETE DO INSTEAD rule. ! UPDATE ro_view19 SET max_value=1000; ! ERROR: cannot update view "ro_view19" DETAIL: Views that do not select from a single table or view are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. ! UPDATE ro_view20 SET b=upper(b); ! ERROR: cannot update view "ro_view20" DETAIL: Views that return set-returning functions are not automatically updatable. HINT: To enable updating the view, provide an INSTEAD OF UPDATE trigger or an unconditional ON UPDATE DO INSTEAD rule. DROP TABLE base_tbl CASCADE; ! NOTICE: drop cascades to 16 other objects DETAIL: drop cascades to view ro_view1 drop cascades to view ro_view17 drop cascades to view ro_view2 *************** drop cascades to view ro_view11 *** 299,311 **** drop cascades to view ro_view13 drop cascades to view rw_view15 drop cascades to view rw_view16 ! drop cascades to view ro_view18 ! drop cascades to view ro_view21 drop cascades to view ro_view4 drop cascades to view rw_view14 ! DROP VIEW ro_view10, ro_view12, ro_view19; DROP SEQUENCE seq CASCADE; ! NOTICE: drop cascades to view ro_view20 -- simple updatable view CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified'); INSERT INTO base_tbl SELECT i, 'Row ' || i FROM generate_series(-2, 2) g(i); --- 289,300 ---- drop cascades to view ro_view13 drop cascades to view rw_view15 drop cascades to view rw_view16 ! drop cascades to view ro_view20 drop cascades to view ro_view4 drop cascades to view rw_view14 ! DROP VIEW ro_view10, ro_view12, ro_view18; DROP SEQUENCE seq CASCADE; ! NOTICE: drop cascades to view ro_view19 -- simple updatable view CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified'); INSERT INTO base_tbl SELECT i, 'Row ' || i FROM generate_series(-2, 2) g(i); *************** INSERT INTO rw_view2 VALUES (2,3); -- ok *** 1739,1742 **** --- 1728,1927 ---- DROP TABLE base_tbl CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to view rw_view1 + drop cascades to view rw_view2 + -- security barrier view + CREATE TABLE base_tbl (person text, visibility text); + INSERT INTO base_tbl VALUES ('Tom', 'public'), + ('Dick', 'private'), + ('Harry', 'public'); + CREATE VIEW rw_view1 AS + SELECT person FROM base_tbl WHERE visibility = 'public'; + CREATE FUNCTION snoop(val text) + RETURNS boolean AS + $$ + BEGIN + RAISE NOTICE 'snooped value: %', val; + RETURN true; + END; + $$ + LANGUAGE plpgsql COST 0.000001; + SELECT * FROM rw_view1 WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Dick + NOTICE: snooped value: Harry + person + -------- + Tom + Harry + (2 rows) + + UPDATE rw_view1 SET person=person WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Dick + NOTICE: snooped value: Harry + DELETE FROM rw_view1 WHERE NOT snoop(person); + NOTICE: snooped value: Dick + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + ALTER VIEW rw_view1 SET (security_barrier = true); + SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view1'; + table_name | is_insertable_into + ------------+-------------------- + rw_view1 | YES + (1 row) + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | YES | YES + (1 row) + + SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view1' + ORDER BY ordinal_position; + table_name | column_name | is_updatable + ------------+-------------+-------------- + rw_view1 | person | YES + (1 row) + + SELECT * FROM rw_view1 WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + person + -------- + Tom + Harry + (2 rows) + + UPDATE rw_view1 SET person=person WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + DELETE FROM rw_view1 WHERE NOT snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person); + QUERY PLAN + ----------------------------------------------- + Subquery Scan on rw_view1 + Filter: snoop(rw_view1.person) + -> Seq Scan on base_tbl + Filter: (visibility = 'public'::text) + (4 rows) + + EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person); + QUERY PLAN + ----------------------------------------------------------- + Update on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.person) + -> LockRows + -> Seq Scan on base_tbl base_tbl_2 + Filter: (visibility = 'public'::text) + (6 rows) + + EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person); + QUERY PLAN + ----------------------------------------------------------- + Delete on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: (NOT snoop(base_tbl.person)) + -> LockRows + -> Seq Scan on base_tbl base_tbl_2 + Filter: (visibility = 'public'::text) + (6 rows) + + -- security barrier view on top of security barrier view + CREATE VIEW rw_view2 WITH (security_barrier = true) AS + SELECT * FROM rw_view1 WHERE snoop(person); + SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view2'; + table_name | is_insertable_into + ------------+-------------------- + rw_view2 | YES + (1 row) + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view2 | YES | YES + (1 row) + + SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view2' + ORDER BY ordinal_position; + table_name | column_name | is_updatable + ------------+-------------+-------------- + rw_view2 | person | YES + (1 row) + + SELECT * FROM rw_view2 WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + NOTICE: snooped value: Harry + person + -------- + Tom + Harry + (2 rows) + + UPDATE rw_view2 SET person=person WHERE snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + NOTICE: snooped value: Harry + DELETE FROM rw_view2 WHERE NOT snoop(person); + NOTICE: snooped value: Tom + NOTICE: snooped value: Tom + NOTICE: snooped value: Harry + NOTICE: snooped value: Harry + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person); + QUERY PLAN + ----------------------------------------------------- + Subquery Scan on rw_view2 + Filter: snoop(rw_view2.person) + -> Subquery Scan on rw_view1 + Filter: snoop(rw_view1.person) + -> Seq Scan on base_tbl + Filter: (visibility = 'public'::text) + (6 rows) + + EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person); + QUERY PLAN + ----------------------------------------------------------------- + Update on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.person) + -> Subquery Scan on base_tbl_2 + Filter: snoop(base_tbl_2.person) + -> LockRows + -> Seq Scan on base_tbl base_tbl_3 + Filter: (visibility = 'public'::text) + (8 rows) + + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person); + QUERY PLAN + ----------------------------------------------------------------- + Delete on base_tbl base_tbl_1 + -> Subquery Scan on base_tbl + Filter: (NOT snoop(base_tbl.person)) + -> Subquery Scan on base_tbl_2 + Filter: snoop(base_tbl_2.person) + -> LockRows + -> Seq Scan on base_tbl base_tbl_3 + Filter: (visibility = 'public'::text) + (8 rows) + + DROP TABLE base_tbl CASCADE; + NOTICE: drop cascades to 2 other objects + DETAIL: drop cascades to view rw_view1 drop cascades to view rw_view2 diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql new file mode 100644 index a77cf19..6b352c1 *** a/src/test/regress/sql/updatable_views.sql --- b/src/test/regress/sql/updatable_views.sql *************** CREATE VIEW rw_view14 AS SELECT ctid, a, *** 25,36 **** CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable ! CREATE VIEW ro_view18 WITH (security_barrier = true) ! AS SELECT * FROM base_tbl; -- Security barrier views not updatable ! CREATE VIEW ro_view19 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; ! CREATE VIEW ro_view20 AS SELECT * FROM seq; -- View based on a sequence ! CREATE VIEW ro_view21 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables --- 25,34 ---- CREATE VIEW rw_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function may be part of an updatable view CREATE VIEW rw_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column may be part of an updatable view CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable ! CREATE VIEW ro_view18 AS SELECT * FROM (VALUES(1)) AS tmp(a); -- VALUES in rangetable CREATE SEQUENCE seq; ! CREATE VIEW ro_view19 AS SELECT * FROM seq; -- View based on a sequence ! CREATE VIEW ro_view20 AS SELECT a, b, generate_series(1, a) g FROM base_tbl; -- SRF in targetlist not supported SELECT table_name, is_insertable_into FROM information_schema.tables *************** SELECT * FROM base_tbl; *** 87,99 **** DELETE FROM rw_view16 WHERE a=-3; -- should be OK -- Read-only views INSERT INTO ro_view17 VALUES (3, 'ROW 3'); ! INSERT INTO ro_view18 VALUES (3, 'ROW 3'); ! DELETE FROM ro_view19; ! UPDATE ro_view20 SET max_value=1000; ! UPDATE ro_view21 SET b=upper(b); DROP TABLE base_tbl CASCADE; ! DROP VIEW ro_view10, ro_view12, ro_view19; DROP SEQUENCE seq CASCADE; -- simple updatable view --- 85,96 ---- DELETE FROM rw_view16 WHERE a=-3; -- should be OK -- Read-only views INSERT INTO ro_view17 VALUES (3, 'ROW 3'); ! DELETE FROM ro_view18; ! UPDATE ro_view19 SET max_value=1000; ! UPDATE ro_view20 SET b=upper(b); DROP TABLE base_tbl CASCADE; ! DROP VIEW ro_view10, ro_view12, ro_view18; DROP SEQUENCE seq CASCADE; -- simple updatable view *************** CREATE VIEW rw_view2 AS *** 828,830 **** --- 825,902 ---- SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION; INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check) DROP TABLE base_tbl CASCADE; + + -- security barrier view + + CREATE TABLE base_tbl (person text, visibility text); + INSERT INTO base_tbl VALUES ('Tom', 'public'), + ('Dick', 'private'), + ('Harry', 'public'); + + CREATE VIEW rw_view1 AS + SELECT person FROM base_tbl WHERE visibility = 'public'; + + CREATE FUNCTION snoop(val text) + RETURNS boolean AS + $$ + BEGIN + RAISE NOTICE 'snooped value: %', val; + RETURN true; + END; + $$ + LANGUAGE plpgsql COST 0.000001; + + SELECT * FROM rw_view1 WHERE snoop(person); + UPDATE rw_view1 SET person=person WHERE snoop(person); + DELETE FROM rw_view1 WHERE NOT snoop(person); + + ALTER VIEW rw_view1 SET (security_barrier = true); + + SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view1'; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + + SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view1' + ORDER BY ordinal_position; + + SELECT * FROM rw_view1 WHERE snoop(person); + UPDATE rw_view1 SET person=person WHERE snoop(person); + DELETE FROM rw_view1 WHERE NOT snoop(person); + + EXPLAIN (costs off) SELECT * FROM rw_view1 WHERE snoop(person); + EXPLAIN (costs off) UPDATE rw_view1 SET person=person WHERE snoop(person); + EXPLAIN (costs off) DELETE FROM rw_view1 WHERE NOT snoop(person); + + -- security barrier view on top of security barrier view + + CREATE VIEW rw_view2 WITH (security_barrier = true) AS + SELECT * FROM rw_view1 WHERE snoop(person); + + SELECT table_name, is_insertable_into + FROM information_schema.tables + WHERE table_name = 'rw_view2'; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + + SELECT table_name, column_name, is_updatable + FROM information_schema.columns + WHERE table_name = 'rw_view2' + ORDER BY ordinal_position; + + SELECT * FROM rw_view2 WHERE snoop(person); + UPDATE rw_view2 SET person=person WHERE snoop(person); + DELETE FROM rw_view2 WHERE NOT snoop(person); + + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(person); + EXPLAIN (costs off) UPDATE rw_view2 SET person=person WHERE snoop(person); + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(person); + + DROP TABLE base_tbl CASCADE;