Re: CLUSTER patch - Mailing list pgsql-patches
From | Bruce Momjian |
---|---|
Subject | Re: CLUSTER patch |
Date | |
Msg-id | 200207141721.g6EHL0S09573@candle.pha.pa.us Whole thread Raw |
In response to | Re: CLUSTER patch (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses |
Re: CLUSTER patch
|
List | pgsql-patches |
Tom Lane wrote: > Bruce Momjian <pgman@candle.pha.pa.us> writes: > > + CommandCounterIncrement(); > > + > > + // bjm > > + // RelationIdInvalidateRelationCacheByRelationId(r1); > > + // RelationIdInvalidateRelationCacheByRelationId(r2); > > + > > + RelationClearRelation(RelationIdGetRelation(r1), true); > > + // RelationClearRelation(RelationIdGetRelation(r2), true); > > + > > + CommandCounterIncrement(); > > Surely the above is neither necessary nor appropriate. The relcache > should automatically rebuild its entries for these relations at New patch attached. Something like this is required or heap_drop/index_drop will fail because it can't find the relation cache entries for the relation. Maybe the trick is to properly invalidate the relation caches when pg_class is updated. This is particularly significant for thisxactonly relations. I don't think we need to handle oid changes in pg_class, but relfilenode changes are not updated, causing problems down the road. The attached patch actually does work with the following warnings: test=> cluster i on test; WARNING: trying to delete a reldesc (rd_node) that does not exist. WARNING: trying to delete a reldesc (rd_node) that does not exist. CLUSTER My guess is that the proper fix is elsewhere. I looked through relcache.c and didn't see any case where a relfilenode could be invalidated _without_ removing the old entry first, but of course, the old entry has a different value from the new one. My guess is that the work is to be done there somewhere. My cache forget calls work because it forces new cache entries to match pg_class, and that way the other commands can succeed. > CommandCounterIncrement. In any case I do not care for exporting > internal relcache routines to make CLUSTER work ... I need to get cluster working before I can worry about style. -- Bruce Momjian | http://candle.pha.pa.us pgman@candle.pha.pa.us | (610) 853-3000 + If your life is a hard drive, | 830 Blythe Avenue + Christ can be your backup. | Drexel Hill, Pennsylvania 19026 Index: src/backend/commands/cluster.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.83 diff -c -r1.83 cluster.c *** src/backend/commands/cluster.c 12 Jul 2002 18:43:15 -0000 1.83 --- src/backend/commands/cluster.c 14 Jul 2002 17:03:37 -0000 *************** *** 15,21 **** * * * IDENTIFICATION ! * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.83 2002/07/12 18:43:15 tgl Exp $ * *------------------------------------------------------------------------- */ --- 15,21 ---- * * * IDENTIFICATION ! * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.82 2002/06/20 20:29:26 momjian Exp $ * *------------------------------------------------------------------------- */ *************** *** 24,74 **** #include "access/genam.h" #include "access/heapam.h" - #include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" static Oid copy_heap(Oid OIDOldHeap, const char *NewName); - static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap, - const char *NewIndexName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); /* * cluster * * STILL TO DO: ! * Create a list of all the other indexes on this relation. Because ! * the cluster will wreck all the tids, I'll need to destroy bogus ! * indexes. The user will have to re-create them. Not nice, but ! * I'm not a nice guy. The alternative is to try some kind of post ! * destroy re-build. This may be possible. I'll check out what the ! * index create functiond want in the way of paramaters. On the other ! * hand, re-creating n indexes may blow out the space. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, ! OIDNewHeap, ! OIDNewIndex; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; ! char NewIndexName[NAMEDATALEN]; ObjectAddress object; /* ! * We grab exclusive access to the target rel and index for the * duration of the transaction. */ OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); --- 24,99 ---- #include "access/genam.h" #include "access/heapam.h" #include "catalog/heap.h" + #include "catalog/dependency.h" #include "catalog/index.h" + #include "catalog/indexing.h" + #include "catalog/catname.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" + #include "utils/fmgroids.h" #include "utils/lsyscache.h" + #include "utils/relcache.h" #include "utils/syscache.h" + /* + * We need one of these structs for each index in the relation to be + * clustered. It's basically the data needed by index_create() so + * we can recreate the indexes after destroying the old heap. + */ + typedef struct + { + char *indexName; + IndexInfo *indexInfo; + Oid accessMethodOID; + Oid *classOID; + Oid indexOID; + bool isPrimary; + } IndexAttrs; static Oid copy_heap(Oid OIDOldHeap, const char *NewName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); + List *get_indexattr_list (Oid OIDOldHeap); + void recreate_indexattr(Oid OIDOldHeap, List *indexes); + void swap_relfilenodes(Oid r1, Oid r2); + + void RelationFlushRelation(Relation relation); /* * cluster * * STILL TO DO: ! * Keep foreign keys, permissions and inheritance of the clustered table. ! * ! * We need to look at making use of the ability to write a new version of a ! * table (or index) under a new relfilenode value, without changing the ! * table's OID. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, ! OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; ! List *indexes; ObjectAddress object; + /* The games with filenodes may not be rollbackable, so + * disallow running this inside a transaction block. + * This may be a false assumption though. + */ + if (IsTransactionBlock()) + elog (ERROR, "CLUSTER: may not be called inside a transaction block"); + /* ! * We grab exclusive access to the target relation for the * duration of the transaction. */ OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); *************** *** 96,143 **** heap_close(OldHeap, NoLock); index_close(OldIndex); ! /* ! * Create the new heap with a temporary name. ! */ ! snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); /* We do not need CommandCounterIncrement() because copy_heap did it. */ ! /* ! * Copy the heap data into the new table in the desired order. ! */ rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible. */ CommandCounterIncrement(); - /* Create new index over the tuples of the new heap. */ - snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex); ! OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName); ! CommandCounterIncrement(); ! /* Destroy old heap (along with its index) and rename new. */ object.classId = RelOid_pg_class; ! object.objectId = OIDOldHeap; object.objectSubId = 0; /* XXX better to use DROP_CASCADE here? */ performDeletion(&object, DROP_RESTRICT); - /* performDeletion does CommandCounterIncrement at end */ - - renamerel(OIDNewHeap, oldrelation->relname); - - /* This one might be unnecessary, but let's be safe. */ - CommandCounterIncrement(); - - renamerel(OIDNewIndex, oldindexname); } static Oid copy_heap(Oid OIDOldHeap, const char *NewName) { --- 121,164 ---- heap_close(OldHeap, NoLock); index_close(OldIndex); ! /* Save the information of all indexes on the relation. */ ! indexes = get_indexattr_list(OIDOldHeap); + /* Create the new heap with a temporary name. */ + snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); /* We do not need CommandCounterIncrement() because copy_heap did it. */ ! /* Copy the heap data into the new table in the desired order. */ rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible. */ CommandCounterIncrement(); ! /* Swap the relfilenodes of the old and new heaps. */ ! swap_relfilenodes(OIDNewHeap, OIDOldHeap); ! /* At this point, Old points to the new file, and new points to old */ ! ! /* Recreate the indexes on the relation. We do not need ! * CommandCounterIncrement() because recreate_indexattr does it. ! */ ! recreate_indexattr(OIDOldHeap, indexes); ! /* Destroy the new heap, carrying the old filenode along. */ object.classId = RelOid_pg_class; ! object.objectId = OIDNewHeap; object.objectSubId = 0; /* XXX better to use DROP_CASCADE here? */ performDeletion(&object, DROP_RESTRICT); /* performDeletion does CommandCounterIncrement at end */ } + /* Create and initialize the new heap + */ static Oid copy_heap(Oid OIDOldHeap, const char *NewName) { *************** *** 181,223 **** return OIDNewHeap; } ! static Oid ! copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName) ! { ! Oid OIDNewIndex; ! Relation OldIndex, ! NewHeap; ! IndexInfo *indexInfo; ! ! NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock); ! OldIndex = index_open(OIDOldIndex); ! ! /* ! * Create a new index like the old one. To do this I get the info ! * from pg_index, and add a new index with a temporary name (that will ! * be changed later). ! */ ! indexInfo = BuildIndexInfo(OldIndex->rd_index); ! ! OIDNewIndex = index_create(OIDNewHeap, ! NewIndexName, ! indexInfo, ! OldIndex->rd_rel->relam, ! OldIndex->rd_index->indclass, ! OldIndex->rd_index->indisprimary, ! false, /* XXX losing constraint status */ ! allowSystemTableMods); ! ! setRelhasindex(OIDNewHeap, true, ! OldIndex->rd_index->indisprimary, InvalidOid); ! ! index_close(OldIndex); ! heap_close(NewHeap, NoLock); ! ! return OIDNewIndex; ! } ! ! static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { --- 202,209 ---- return OIDNewHeap; } ! /* Load the data into the new heap, clustered. ! */ static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { *************** *** 260,263 **** --- 246,418 ---- index_close(LocalOldIndex); heap_close(LocalOldHeap, NoLock); heap_close(LocalNewHeap, NoLock); + } + + /* Get the necessary info about the indexes in the relation and + * return a List of IndexAttrs. + */ + List * + get_indexattr_list (Oid OIDOldHeap) + { + ScanKeyData entry; + HeapScanDesc scan; + Relation indexRelation; + HeapTuple indexTuple; + List *indexes = NIL; + IndexAttrs *attrs; + HeapTuple tuple; + Form_pg_index index; + + /* Grab the index tuples by looking into RelationRelationName + * by the OID of the old heap. + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(OIDOldHeap)); + scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); + while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + index = (Form_pg_index) GETSTRUCT(indexTuple); + + attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs)); + attrs->indexInfo = BuildIndexInfo(index); + attrs->isPrimary = index->indisprimary; + attrs->indexOID = index->indexrelid; + + /* The opclasses are copied verbatim from the original indexes. + */ + attrs->classOID = (Oid *)palloc(sizeof(Oid) * + attrs->indexInfo->ii_NumIndexAttrs); + memcpy(attrs->classOID, index->indclass, + sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); + + /* Name and access method of each index come from + * RelationRelationName. + */ + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(attrs->indexOID), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID); + attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)); + attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam; + ReleaseSysCache(tuple); + + /* Cons the gathered data into the list. We do not care about + * ordering, and this is more efficient than append. + */ + indexes=lcons((void *)attrs, indexes); + } + heap_endscan(scan); + heap_close(indexRelation, NoLock); + return indexes; + } + + /* Create new indexes and swap the filenodes with old indexes. Then drop + * the new index (carrying the old heap along). + */ + void + recreate_indexattr(Oid OIDOldHeap, List *indexes) + { + IndexAttrs *attrs; + List *elem; + Oid newIndexOID; + char newIndexName[NAMEDATALEN]; + + foreach (elem, indexes) + { + attrs=(IndexAttrs *) lfirst(elem); + + /* Create the new index under a temporary name */ + snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID); + newIndexOID = index_create(OIDOldHeap, newIndexName, + attrs->indexInfo, attrs->accessMethodOID, + attrs->classOID, attrs->isPrimary, + false, /* XXX losing constraint status */ + allowSystemTableMods); + CommandCounterIncrement(); + + /* Swap the filenodes. */ + swap_relfilenodes(newIndexOID, attrs->indexOID); + setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid); + + /* I'm not sure this one is needed, but let's be safe. */ + CommandCounterIncrement(); + + /* Drop the new index, carrying the old filenode along. */ + index_drop(newIndexOID); + CommandCounterIncrement(); + + pfree(attrs->classOID); + pfree(attrs); + } + freeList(indexes); + } + + /* Swap the relfilenodes for two given relations. + */ + void + swap_relfilenodes(Oid r1, Oid r2) + { + /* I can probably keep RelationRelationName open in the main + * function and pass the Relation around so I don't have to open + * it avery time. + */ + Relation relRelation, + rel1, rel2, + irels[Num_pg_class_indices]; + HeapTuple reltup[2]; + Oid tempRFNode; + + rel1 = RelationIdGetRelation(r1); + rel2 = RelationIdGetRelation(r2); + RelationFlushRelation(rel1); + RelationFlushRelation(rel2); + RelationDecrementReferenceCount(rel1); + RelationDecrementReferenceCount(rel2); + + CommandCounterIncrement(); + /* We need both RelationRelationName tuples. */ + relRelation = heap_openr(RelationRelationName, RowExclusiveLock); + + reltup[0] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r1), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[0])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1); + reltup[1] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r2), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[1])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2); + + /* Swap the filenodes */ + tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode = + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode; + + /* Update the RelationRelationName tuples */ + simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]); + simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]); + + /* Keep system catalogs current */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]); + CatalogCloseIndices(Num_pg_class_indices, irels); + + heap_close(relRelation, RowExclusiveLock); + heap_freetuple(reltup[0]); + heap_freetuple(reltup[1]); + + CommandCounterIncrement(); + + rel1 = RelationIdGetRelation(r1); + rel2 = RelationIdGetRelation(r2); + RelationFlushRelation(rel1); + RelationFlushRelation(rel2); + RelationDecrementReferenceCount(rel1); + RelationDecrementReferenceCount(rel2); + CommandCounterIncrement(); } Index: src/backend/utils/cache/relcache.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v retrieving revision 1.166 diff -c -r1.166 relcache.c *** src/backend/utils/cache/relcache.c 12 Jul 2002 18:43:18 -0000 1.166 --- src/backend/utils/cache/relcache.c 14 Jul 2002 17:03:46 -0000 *************** *** 238,249 **** (void *)&(RELATION->rd_id), \ HASH_REMOVE, NULL); \ if (idhentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \ (void *)&(RELATION->rd_node), \ HASH_REMOVE, NULL); \ if (nodentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ if (IsSystemNamespace(RelationGetNamespace(RELATION))) \ { \ char *relname = RelationGetRelationName(RELATION); \ --- 238,249 ---- (void *)&(RELATION->rd_id), \ HASH_REMOVE, NULL); \ if (idhentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (rd_id) that does not exist."); \ nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \ (void *)&(RELATION->rd_node), \ HASH_REMOVE, NULL); \ if (nodentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (rd_node) that does not exist."); \ if (IsSystemNamespace(RelationGetNamespace(RELATION))) \ { \ char *relname = RelationGetRelationName(RELATION); \ *************** *** 252,258 **** relname, \ HASH_REMOVE, NULL); \ if (namehentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ } \ } while(0) --- 252,258 ---- relname, \ HASH_REMOVE, NULL); \ if (namehentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (relname) that does not exist."); \ } \ } while(0) *************** *** 281,287 **** #ifdef ENABLE_REINDEX_NAILED_RELATIONS static void RelationReloadClassinfo(Relation relation); #endif /* ENABLE_REINDEX_NAILED_RELATIONS */ ! static void RelationFlushRelation(Relation relation); static Relation RelationSysNameCacheGetRelation(const char *relationName); static bool load_relcache_init_file(void); static void write_relcache_init_file(void); --- 281,287 ---- #ifdef ENABLE_REINDEX_NAILED_RELATIONS static void RelationReloadClassinfo(Relation relation); #endif /* ENABLE_REINDEX_NAILED_RELATIONS */ ! //static void RelationFlushRelation(Relation relation); static Relation RelationSysNameCacheGetRelation(const char *relationName); static bool load_relcache_init_file(void); static void write_relcache_init_file(void); *************** *** 683,689 **** */ rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock); rewrite_tupdesc = RelationGetDescr(rewrite_desc); ! rewrite_scan = systable_beginscan(rewrite_desc, RewriteRelRulenameIndex, true, SnapshotNow, 1, &key); --- 683,689 ---- */ rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock); rewrite_tupdesc = RelationGetDescr(rewrite_desc); ! rewrite_scan = systable_beginscan(rewrite_desc, RewriteRelRulenameIndex, true, SnapshotNow, 1, &key); *************** *** 1651,1657 **** * (one with refcount > 0). However, this routine just does whichever * it's told to do; callers must determine which they want. */ ! static void RelationClearRelation(Relation relation, bool rebuildIt) { MemoryContext oldcxt; --- 1651,1658 ---- * (one with refcount > 0). However, this routine just does whichever * it's told to do; callers must determine which they want. */ ! //bjm ! void RelationClearRelation(Relation relation, bool rebuildIt) { MemoryContext oldcxt; *************** *** 1803,1809 **** * * Rebuild the relation if it is open (refcount > 0), else blow it away. */ ! static void RelationFlushRelation(Relation relation) { bool rebuildIt; --- 1804,1810 ---- * * Rebuild the relation if it is open (refcount > 0), else blow it away. */ ! void RelationFlushRelation(Relation relation) { bool rebuildIt;
pgsql-patches by date: