commit a5af30148e560a83bfb845c98ecae4951eb60c34 Author: Kaiting Chen Date: Mon Jun 6 17:33:57 2022 -0400 Add feature diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 2de0ebacec..e2da2165be 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -378,7 +378,10 @@ static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, int16 *attnums, Oid *atttypids, Oid *opclasses); static Oid transformFkeyCheckAttrs(Relation pkrel, - int numattrs, int16 *attnums, + int numattrs, int16 *attnums, Oid *pktypoid, + Oid *opclasses); +static Oid transformFkeyCheckIndex(Relation pkrel, const char *pk_indexname, + int numattrs, int16 *attnums, Oid *pktypoid, Oid *opclasses); static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId, @@ -9151,9 +9154,20 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, numpks = transformColumnNameList(RelationGetRelid(pkrel), fkconstraint->pk_attrs, pkattnum, pktypoid); - /* Look for an index matching the column list */ - indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum, - opclasses); + + /* + * Look for an index matching the column list. If an explicit index name + * is specified in the constraint then use that index. + */ + if (fkconstraint->pk_indexname == NULL) + indexOid = transformFkeyCheckAttrs(pkrel, + numpks, pkattnum, pktypoid, + opclasses); + else + indexOid = transformFkeyCheckIndex(pkrel, + fkconstraint->pk_indexname, + numpks, pkattnum, pktypoid, + opclasses); } /* @@ -11291,7 +11305,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, */ static Oid transformFkeyCheckAttrs(Relation pkrel, - int numattrs, int16 *attnums, + int numattrs, int16 *attnums, Oid *pktypoid, Oid *opclasses) /* output parameter */ { Oid indexoid = InvalidOid; @@ -11405,6 +11419,85 @@ transformFkeyCheckAttrs(Relation pkrel, break; } + /* + * If the search doesn't find an index with the same key columns as the + * referenced columns, then as a fallback, (re)test the primary key against + * a subset of the referenced columns. + */ + if (!found && OidIsValid(indexoid = RelationGetPrimaryKeyIndex(pkrel))) + { + HeapTuple indexTuple; + Form_pg_index indexStruct; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + + /* + * A primary key index must be unique, valid, expressionless, and + * predicateless. + */ + Assert(indexStruct->indisunique); + Assert(indexStruct->indisvalid); + Assert(heap_attisnull(indexTuple, Anum_pg_index_indexprs, NULL)); + Assert(heap_attisnull(indexTuple, Anum_pg_index_indpred, NULL)); + + if (indexStruct->indnkeyatts < numattrs && indexStruct->indimmediate) + { + Datum indclassDatum; + bool isnull; + oidvector *indclass; + + indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + found = true; + for (j = 0; j < indexStruct->indnkeyatts; j++) + { + for (i = 0; i < numattrs; i++) + { + if (attnums[i] == indexStruct->indkey.values[j]) + goto next_indkey; + } + + found = false; + break; + + next_indkey: + opclasses[i] = indclass->values[j]; + } + } + + if (found) + { + HeapTuple classTuple; + Oid amid; + + classTuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(classTuple)) + elog(ERROR, "cache lookup failed for relation %u", indexoid); + amid = ((Form_pg_class) GETSTRUCT(classTuple))->relam; + ReleaseSysCache(classTuple); + + /* + * Use the default opclass for the column type with the same access + * method as the index for each referenced column that isn't an index + * column. + */ + for (i = 0; i < numattrs; i++) + { + if (opclasses[i] != InvalidOid) + continue; + opclasses[i] = ResolveOpClass(NIL, pktypoid[i], "btree", amid); + } + } + + ReleaseSysCache(indexTuple); + } + if (!found) { if (found_deferrable) @@ -11424,6 +11517,186 @@ transformFkeyCheckAttrs(Relation pkrel, return indexoid; } +/* + * transformFkeyCheckIndex - + * + * Make sure that the specified index is usable with this foreign key + * constraint. Return the OID of the index supporting the constraint, as well + * as the opclasses associated with the referenced columns. + */ +static Oid +transformFkeyCheckIndex(Relation pkrel, const char *pk_indexname, + int numattrs, int16 *attnums, Oid *pktypoid, + Oid *opclasses) /* output parameter */ +{ + Oid indexoid = InvalidOid, + amid; + HeapTuple classTuple, + indexTuple; + Form_pg_class classStruct; + Form_pg_index indexStruct; + int i, + j; + Datum indclassDatum; + bool isnull; + oidvector *indclass; + + /* + * Reject duplicate appearances of columns in the referenced-columns list. + * Such a case is forbidden by the SQL standard, and even if we thought it + * useful to allow it, there would be ambiguity about how to match the + * list to unique indexes (in particular, it'd be unclear which index + * opclass goes with which FK column). + */ + for (i = 0; i < numattrs; i++) + { + for (j = i + 1; j < numattrs; j++) + { + if (attnums[i] == attnums[j]) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("foreign key referenced-columns list must not contain duplicates"))); + } + } + + /* Look for the index in the same schema as the table */ + classTuple = SearchSysCache2(RELNAMENSP, PointerGetDatum(pk_indexname), + ObjectIdGetDatum(RelationGetNamespace(pkrel))); + if (!HeapTupleIsValid(classTuple)) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("index \"%s\" for table \"%s\" does not exist", + pk_indexname, RelationGetRelationName(pkrel))); + classStruct = (Form_pg_class) GETSTRUCT(classTuple); + + if (classStruct->relkind != RELKIND_INDEX && + classStruct->relkind != RELKIND_PARTITIONED_INDEX) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index", pk_indexname)); + + indexoid = classStruct->oid; + amid = classStruct->relam; + ReleaseSysCache(classTuple); + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + + /* + * The index must belong to the primary key relation. It can't have more key + * columns than are referenced. It must be unique and not a partial index; + * forget it if there are any expressions, too. Invalid indexes are out as + * well. + */ + if (indexStruct->indrelid != RelationGetRelid(pkrel)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" does not belong to table \"%s\"", + pk_indexname, RelationGetRelationName(pkrel))); + + /* TODO */ + if (indexStruct->indnkeyatts > numattrs) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a usable index with given keys for referenced table \"%s\"", + pk_indexname, RelationGetRelationName(pkrel)), + errdetail("Cannot create a foreign key constraint using an " + "index unless the index's key columns are a subset " + "of the referenced columns")); + + if (!indexStruct->indisvalid) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" is not valid", pk_indexname)); + + if (!indexStruct->indisunique) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a unique index", + pk_indexname), + errdetail("Cannot create a foreign key constraint using such an index.")); + + if (!heap_attisnull(indexTuple, Anum_pg_index_indexprs, NULL)) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("index \"%s\" contains expressions", + pk_indexname), + errdetail("Cannot create a foreign key constraint using such an index.")); + + if (!heap_attisnull(indexTuple, Anum_pg_index_indpred, NULL)) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a partial index", + pk_indexname), + errdetail("Cannot create a foreign key constraint using such an index.")); + + /* + * Also, refuse to use a deferrable unique/primary key. This is per SQL + * spec, and there would be a lot of interesting semantic problems if we + * tried to allow it. + */ + if (!indexStruct->indimmediate) + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a deferrable index", + pk_indexname), + errdetail("Cannot create a foreign key constraint using such an index.")); + + /* Must get indclass the hard way */ + indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + /* + * The given attnum list may match the index columns in any order. Check for + * a match, and extract the appropriate opclasses while we're at it. + * + * We know that attnums[] is duplicate-free per the test at the start of + * this function, and we checked above that the number of index columns + * is no more than the number of referenced columns, so if we find a match + * for an attnums[] entry then we must have a unique match in some order. + */ + for (j = 0; j < indexStruct->indnkeyatts; j++) + { + for (i = 0; i < numattrs; i++) + { + if (attnums[i] == indexStruct->indkey.values[j]) + goto next_indkey; + } + + /* TODO */ + ereport(ERROR, + errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a usable index with given keys for referenced table \"%s\"", + pk_indexname, RelationGetRelationName(pkrel)), + errdetail("Cannot create a foreign key constraint using an " + "index unless the index's key columns are a subset " + "of the referenced columns")); + + next_indkey: + opclasses[i] = indclass->values[j]; + } + + /* + * Use the default opclass for the column type with the same access method + * as the index for each referenced column that isn't an index column. + */ + for (i = 0; i < numattrs; i++) + { + if (opclasses[i] != InvalidOid) + continue; + + /* Currently no index AMs other than btree support unique indexes */ + opclasses[i] = ResolveOpClass(NIL, pktypoid[i], "btree", amid); + } + + ReleaseSysCache(indexTuple); + return indexoid; +} + /* * findFkeyCast - * diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 51d630fa89..31ca1d29c2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3601,6 +3601,7 @@ _copyConstraint(const Constraint *from) COPY_NODE_FIELD(pktable); COPY_NODE_FIELD(fk_attrs); COPY_NODE_FIELD(pk_attrs); + COPY_STRING_FIELD(pk_indexname); COPY_SCALAR_FIELD(fk_matchtype); COPY_SCALAR_FIELD(fk_upd_action); COPY_SCALAR_FIELD(fk_del_action); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index e747e1667d..30131e1056 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -3089,6 +3089,7 @@ _equalConstraint(const Constraint *a, const Constraint *b) COMPARE_NODE_FIELD(pktable); COMPARE_NODE_FIELD(fk_attrs); COMPARE_NODE_FIELD(pk_attrs); + COMPARE_STRING_FIELD(pk_indexname); COMPARE_SCALAR_FIELD(fk_matchtype); COMPARE_SCALAR_FIELD(fk_upd_action); COMPARE_SCALAR_FIELD(fk_del_action); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ce12915592..59166f0ffa 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -3927,6 +3927,7 @@ _outConstraint(StringInfo str, const Constraint *node) WRITE_NODE_FIELD(pktable); WRITE_NODE_FIELD(fk_attrs); WRITE_NODE_FIELD(pk_attrs); + WRITE_STRING_FIELD(pk_indexname); WRITE_CHAR_FIELD(fk_matchtype); WRITE_CHAR_FIELD(fk_upd_action); WRITE_CHAR_FIELD(fk_del_action); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 969c9c158f..0222d6f20e 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -595,7 +595,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type TableConstraint TableLikeClause %type TableLikeOptionList TableLikeOption -%type column_compression opt_column_compression +%type column_compression opt_column_compression key_index %type ColQualList %type ColConstraint ColConstraintElem ConstraintAttr %type key_match @@ -3997,7 +3997,7 @@ ColConstraintElem: $$ = (Node *) n; } - | REFERENCES qualified_name opt_column_list key_match key_actions + | REFERENCES qualified_name opt_column_list key_index key_match key_actions { Constraint *n = makeNode(Constraint); @@ -4006,10 +4006,11 @@ ColConstraintElem: n->pktable = $2; n->fk_attrs = NIL; n->pk_attrs = $3; - n->fk_matchtype = $4; - n->fk_upd_action = ($5)->updateAction->action; - n->fk_del_action = ($5)->deleteAction->action; - n->fk_del_set_cols = ($5)->deleteAction->cols; + n->pk_indexname = $4; + n->fk_matchtype = $5; + n->fk_upd_action = ($6)->updateAction->action; + n->fk_del_action = ($6)->deleteAction->action; + n->fk_del_set_cols = ($6)->deleteAction->cols; n->skip_validation = false; n->initially_valid = true; $$ = (Node *) n; @@ -4229,7 +4230,7 @@ ConstraintElem: $$ = (Node *) n; } | FOREIGN KEY '(' columnList ')' REFERENCES qualified_name - opt_column_list key_match key_actions ConstraintAttributeSpec + opt_column_list key_index key_match key_actions ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); @@ -4238,11 +4239,12 @@ ConstraintElem: n->pktable = $7; n->fk_attrs = $4; n->pk_attrs = $8; - n->fk_matchtype = $9; - n->fk_upd_action = ($10)->updateAction->action; - n->fk_del_action = ($10)->deleteAction->action; - n->fk_del_set_cols = ($10)->deleteAction->cols; - processCASbits($11, @11, "FOREIGN KEY", + n->pk_indexname = $9; + n->fk_matchtype = $10; + n->fk_upd_action = ($11)->updateAction->action; + n->fk_del_action = ($11)->deleteAction->action; + n->fk_del_set_cols = ($11)->deleteAction->cols; + processCASbits($12, @12, "FOREIGN KEY", &n->deferrable, &n->initdeferred, &n->skip_validation, NULL, yyscanner); @@ -4275,6 +4277,16 @@ opt_c_include: INCLUDE '(' columnList ')' { $$ = $3; } | /* EMPTY */ { $$ = NIL; } ; +key_index: USING INDEX name + { + $$ = $3; + } + | /* EMPTY */ + { + $$ = NULL; + } + ; + key_match: MATCH FULL { $$ = FKCONSTR_MATCH_FULL; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index c3937a60fd..12abea444c 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -2231,6 +2231,9 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, case CONSTRAINT_FOREIGN: { Datum val; + Oid indexId; + HeapTuple indtup; + int nKeys; bool isnull; const char *string; @@ -2244,7 +2247,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, elog(ERROR, "null conkey for constraint %u", constraintId); - decompile_column_index_array(val, conForm->conrelid, &buf); + nKeys = decompile_column_index_array(val, conForm->conrelid, &buf); /* add foreign relation name */ appendStringInfo(&buf, ") REFERENCES %s(", @@ -2258,10 +2261,37 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, elog(ERROR, "null confkey for constraint %u", constraintId); - decompile_column_index_array(val, conForm->confrelid, &buf); + nKeys = decompile_column_index_array(val, conForm->confrelid, &buf); appendStringInfoChar(&buf, ')'); + indexId = conForm->conindid; + + /* Add USING INDEX clause */ + indtup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId)); + if (!HeapTupleIsValid(indtup)) + elog(ERROR, "cache lookup failed for index %u", indexId); + + val = SysCacheGetAttr(INDEXRELID, indtup, + Anum_pg_index_indisprimary, &isnull); + if (isnull) + elog(ERROR, "null indisprimary for index %u", indexId); + + if (!DatumGetBool(val)) + { + val = SysCacheGetAttr(INDEXRELID, indtup, + Anum_pg_index_indnkeyatts, &isnull); + if (isnull) + elog(ERROR, "null indnkeyatts for index %u", indexId); + + if (DatumGetInt32(val) < nKeys) + appendStringInfo(&buf, " USING INDEX %s", + generate_relation_name(conForm->conindid, + NIL)); + } + + ReleaseSysCache(indtup); + /* Add match type */ switch (conForm->confmatchtype) { diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 73f635b455..0b18c4b8de 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2646,6 +2646,7 @@ typedef struct Constraint RangeVar *pktable; /* Primary key table */ List *fk_attrs; /* Attributes of foreign key */ List *pk_attrs; /* Corresponding attrs in PK table */ + char *pk_indexname; /* Primary key index name, or NULL if unspecified */ char fk_matchtype; /* FULL, PARTIAL, SIMPLE */ char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */