diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 99e130c..2eab82b 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation, IndexInfo *indexInfo, Snapshot snapshot, v_i_state *state); -static Oid IndexGetRelation(Oid indexId); static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid); static void SetReindexProcessing(Oid heapOid, Oid indexOid); static void ResetReindexProcessing(void); @@ -2763,7 +2762,7 @@ validate_index_heapscan(Relation heapRelation, * IndexGetRelation: given an index's relation OID, get the OID of the * relation it is an index on. Uses the system cache. */ -static Oid +Oid IndexGetRelation(Oid indexId) { HeapTuple tuple; diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 6d4d4b1..c74168c 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -219,10 +219,14 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS); * otherwise raise an error. * * If nowait = true, throw an error if we'd have to wait for a lock. + * + * Callback allows caller to check permissions or acquire additional locks + * prior to grabbing the relation lock. */ Oid -RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, - bool nowait) +RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, + bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback) { uint64 inval_count; Oid relId; @@ -336,6 +340,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, } /* + * Invoke caller-supplied callback, if any. + * + * This callback is a good place to check permissions: we haven't taken + * the table lock yet (and it's really best to check permissions before + * locking anything!), but we've gotten far enough to know what OID we + * think we should lock. Of course, concurrent DDL might things while + * we're waiting for the lock, but in that case the callback will be + * invoked again for the new OID. + */ + if (callback) + callback(relation, relId, oldRelId); + + /* * Lock relation. This will also accept any pending invalidation * messages. If we got back InvalidOid, indicating not found, then * there's nothing to lock, but we accept invalidation messages diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 69aa5bf..174a194 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static char *ChooseIndexNameAddition(List *colnames); - +static void RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId); +static void RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId); /* * CheckIndexCompatible @@ -1725,33 +1728,47 @@ void ReindexIndex(RangeVar *indexRelation) { Oid indOid; + + /* lock level used here should match index lock reindex_index() */ + indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock, + false, false, + RangeVarCallbackForReindexIndex); + + reindex_index(indOid, false); +} + +/* + * Check permissions on table before acquiring relation lock; also lock + * the heap before the RangeVarGetRelid() takes the index lock, to avoid + * deadlocks. + */ +static void +RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId) +{ HeapTuple tuple; - /* - * XXX: This is not safe in the presence of concurrent DDL. We should - * take AccessExclusiveLock here, but that would violate the rule that - * indexes should only be locked after their parent tables. For now, - * we live with it. - */ - indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId)); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", indOid); + elog(ERROR, "cache lookup failed for relation %u", relId); if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not an index", - indexRelation->relname))); + relation->relname))); /* Check permissions */ - if (!pg_class_ownercheck(indOid, GetUserId())) + if (!pg_class_ownercheck(relId, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - indexRelation->relname); + relation->relname); - ReleaseSysCache(tuple); + /* lock levels here should match reindex_index() heap lock */ + if (OidIsValid(oldRelId)) + UnlockRelationOid(IndexGetRelation(oldRelId), ShareLock); + LockRelationOid(IndexGetRelation(relId), ShareLock); - reindex_index(indOid, false); + ReleaseSysCache(tuple); } /* @@ -1762,13 +1779,29 @@ void ReindexTable(RangeVar *relation) { Oid heapOid; - HeapTuple tuple; /* The lock level used here should match reindex_relation(). */ - heapOid = RangeVarGetRelid(relation, ShareLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); + heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false, + RangeVarCallbackForReindexTable); + + if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) + ereport(NOTICE, + (errmsg("table \"%s\" has no indexes", + relation->relname))); +} + +/* + * Check permissions on table before acquiring relation lock. + */ +static void +RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId) +{ + HeapTuple tuple; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId)); if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for relation %u", heapOid); + elog(ERROR, "cache lookup failed for relation %u", relId); if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION && ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE) @@ -1778,16 +1811,11 @@ ReindexTable(RangeVar *relation) relation->relname))); /* Check permissions */ - if (!pg_class_ownercheck(heapOid, GetUserId())) + if (!pg_class_ownercheck(relId, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); ReleaseSysCache(tuple); - - if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) - ereport(NOTICE, - (errmsg("table \"%s\" has no indexes", - relation->relname))); } /* diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 8b78b05..5b09b9f 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags); extern bool ReindexIsProcessingHeap(Oid heapOid); extern bool ReindexIsProcessingIndex(Oid indexOid); +extern Oid IndexGetRelation(Oid indexId); #endif /* INDEX_H */ diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 904c6fd..97f8e72 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -47,9 +47,15 @@ typedef struct OverrideSearchPath bool addTemp; /* implicitly prepend temp schema? */ } OverrideSearchPath; +typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId, + Oid oldRelId); -extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, - bool missing_ok, bool nowait); +#define RangeVarGetRelid(relation, lockmode, missing_ok, nowait) \ + RangeVarGetRelidExtended(relation, lockmode, missing_ok, nowait, NULL) + +extern Oid RangeVarGetRelidExtended(const RangeVar *relation, + LOCKMODE lockmode, bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);