ALTER TABLE ADD/DROP INHERITS - Mailing list pgsql-patches
From | Greg Stark |
---|---|
Subject | ALTER TABLE ADD/DROP INHERITS |
Date | |
Msg-id | 87r720omsr.fsf@stark.xeocode.com Whole thread Raw |
Responses |
Re: ALTER TABLE ADD/DROP INHERITS
|
List | pgsql-patches |
As described on -hackers this is my work so far adding ADD/DROP INHERITS. It implements the controversial "ALTER TABLE <table> ADD/DROP INHERITS <parent>" syntax that requires making INHERITS a reserved keyword. I haven't seen a clear consensus yet on what the best syntax to use here would be. Also, it doesn't handle default column values yet. Other than that I think it's complete. There are a number of things I'm not completely certain I'm on the right track with though so it can certainly use some more eyeballs on it. Index: src/backend/commands/tablecmds.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/tablecmds.c,v retrieving revision 1.184 diff -u -p -c -r1.184 tablecmds.c cvs diff: conflicting specifications of output style *** src/backend/commands/tablecmds.c 10 May 2006 23:18:39 -0000 1.184 --- src/backend/commands/tablecmds.c 7 Jun 2006 18:09:56 -0000 *************** typedef struct NewColumnValue *** 159,166 **** --- 159,168 ---- static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); + static void MergeAttributesIntoExisting(Relation rel, Relation relation); static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); static void StoreCatalogInheritance(Oid relationId, List *supers); + static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation catalogRelation); static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); static bool needs_toast_table(Relation rel); *************** static void ATPrepSetTableSpace(AlteredT *** 246,251 **** --- 248,255 ---- static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace); static void ATExecEnableDisableTrigger(Relation rel, char *trigname, bool enable, bool skip_system); + static void 
ATExecAddInherits(Relation rel, RangeVar *parent); + static void ATExecDropInherits(Relation rel, RangeVar *parent); static void copy_relation_data(Relation rel, SMgrRelation dst); static void update_ri_trigger_args(Oid relid, const char *oldname, *************** static void *** 1156,1165 **** StoreCatalogInheritance(Oid relationId, List *supers) { Relation relation; - TupleDesc desc; int16 seqNumber; ListCell *entry; - HeapTuple tuple; /* * sanity checks --- 1160,1167 ---- *************** StoreCatalogInheritance(Oid relationId, *** 1179,1194 **** * anymore, there's no need to look for indirect ancestors.) */ relation = heap_open(InheritsRelationId, RowExclusiveLock); - desc = RelationGetDescr(relation); seqNumber = 1; foreach(entry, supers) { ! Oid parentOid = lfirst_oid(entry); Datum datum[Natts_pg_inherits]; char nullarr[Natts_pg_inherits]; ObjectAddress childobject, parentobject; datum[0] = ObjectIdGetDatum(relationId); /* inhrel */ datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ --- 1181,1206 ---- * anymore, there's no need to look for indirect ancestors.) */ relation = heap_open(InheritsRelationId, RowExclusiveLock); seqNumber = 1; foreach(entry, supers) { ! StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation); ! seqNumber += 1; ! } ! ! heap_close(relation, RowExclusiveLock); ! } ! ! static void ! StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation relation) ! { Datum datum[Natts_pg_inherits]; char nullarr[Natts_pg_inherits]; ObjectAddress childobject, parentobject; + HeapTuple tuple; + TupleDesc desc = RelationGetDescr(relation); datum[0] = ObjectIdGetDatum(relationId); /* inhrel */ datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ *************** StoreCatalogInheritance(Oid relationId, *** 1222,1234 **** * Mark the parent as having subclasses. 
*/ setRelhassubclassInRelation(parentOid, true); - seqNumber += 1; - } - heap_close(relation, RowExclusiveLock); } /* * Look for an existing schema entry with the given name. * --- 1234,1246 ---- * Mark the parent as having subclasses. */ setRelhassubclassInRelation(parentOid, true); + } + + /* * Look for an existing schema entry with the given name. * *************** ATPrepCmd(List **wqueue, Relation rel, A *** 2053,2063 **** --- 2065,2078 ---- case AT_DisableTrig: /* DISABLE TRIGGER variants */ case AT_DisableTrigAll: case AT_DisableTrigUser: + case AT_AddInherits: + case AT_DropInherits: ATSimplePermissions(rel, false); /* These commands never recurse */ /* No command-specific prep needed */ pass = AT_PASS_MISC; break; + default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); *************** ATExecCmd(AlteredTableInfo *tab, Relatio *** 2233,2238 **** --- 2248,2259 ---- case AT_DisableTrigUser: /* DISABLE TRIGGER USER */ ATExecEnableDisableTrigger(rel, NULL, false, true); break; + case AT_DropInherits: + ATExecDropInherits(rel, cmd->parent); + break; + case AT_AddInherits: + ATExecAddInherits(rel, cmd->parent); + break; default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); *************** ATExecEnableDisableTrigger(Relation rel, *** 5880,5885 **** --- 5901,6198 ---- EnableDisableTrigger(rel, trigname, enable, skip_system); } + static void + ATExecAddInherits(Relation rel, RangeVar *parent) + { + Relation relation, catalogRelation; + SysScanDesc scan; + ScanKeyData key; + HeapTuple inheritsTuple; + int4 inhseqno = 0; + ListCell *child; + List *children; + + relation = heap_openrv(parent, AccessShareLock); /* XXX is this enough locking? 
*/ + if (relation->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("inherited relation \"%s\" is not a table", + parent->relname))); + + + /* Permanent rels cannot inherit from temporary ones */ + if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation))) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot inherit from temporary relation \"%s\"", + parent->relname))); + + if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + RelationGetRelationName(relation)); + + /* If parent has OIDs then all children must have OIDs */ + if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs", + RelationGetRelationName(rel), parent->relname))); + + /* + * Reject duplications in the list of parents. -- this is the same check as + * when creating a table, but maybe we should check for the parent anywhere + * higher in the inheritance structure? 
+ */ + catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key); + while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) + { + Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple); + if (inh->inhparent == RelationGetRelid(relation)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("inherited relation \"%s\" duplicated", + parent->relname))); + if (inh->inhseqno > inhseqno) + inhseqno = inh->inhseqno; + } + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + + /* Get children because we have to manually recurse and also because we + * have to check for recursive inheritance graphs */ + + /* this routine is actually in the planner */ + children = find_all_inheritors(RelationGetRelid(rel)); + + if (list_member_oid(children, RelationGetRelid(relation))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("Circular inheritance structure found"))); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + childrel = relation_open(childrelid, AccessExclusiveLock); + MergeAttributesIntoExisting(childrel, relation); + relation_close(childrel, NoLock); + } + + catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); + StoreCatalogInheritance1(RelationGetRelid(rel), RelationGetRelid(relation), inhseqno+1, catalogRelation); + heap_close(catalogRelation, RowExclusiveLock); + + heap_close(relation, AccessShareLock); + } + + + static void + MergeAttributesIntoExisting(Relation rel, Relation relation) + { + Relation attrdesc; + AttrNumber parent_attno, child_attno; + TupleDesc tupleDesc; + TupleConstr *constr; + HeapTuple tuple; + + child_attno = RelationGetNumberOfAttributes(rel); + + tupleDesc = RelationGetDescr(relation); + 
constr = tupleDesc->constr; + + for (parent_attno = 1; parent_attno <= tupleDesc->natts; + parent_attno++) + { + Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1]; + char *attributeName = NameStr(attribute->attname); + + /* Ignore dropped columns in the parent. */ + if (attribute->attisdropped) + continue; + + /* Does it conflict with an existing column? */ + attrdesc = heap_open(AttributeRelationId, RowExclusiveLock); + + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName); + if (HeapTupleIsValid(tuple)) { + /* + * Yes, try to merge the two column definitions. They must + * have the same type and typmod. + */ + Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); + ereport(NOTICE, + (errmsg("merging column \"%s\" with inherited definition", + attributeName))); + if (attribute->atttypid != childatt->atttypid || + attribute->atttypmod != childatt->atttypmod || + (attribute->attnotnull && !childatt->attnotnull)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different type for column \"%s\"", + RelationGetRelationName(rel), NameStr(attribute->attname)))); + + childatt->attinhcount++; + simple_heap_update(attrdesc, &tuple->t_self, tuple); + CatalogUpdateIndexes(attrdesc, tuple); /* XXX strength reduce openindexes to outside loop? 
*/ + heap_freetuple(tuple); + + /* XXX defaults */ + + } else { + /* + * No, create a new inherited column + */ + + FormData_pg_attribute attributeD; + HeapTuple attributeTuple = heap_addheader(Natts_pg_attribute, + false, + ATTRIBUTE_TUPLE_SIZE, + (void *) &attributeD); + Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple); + + if (attribute->attnotnull) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("Cannot add new inherited NOT NULL column \"%s\"", + NameStr(attribute->attname)))); + + childatt->attrelid = RelationGetRelid(rel); + namecpy(&childatt->attname, &attribute->attname); + childatt->atttypid = attribute->atttypid; + childatt->attstattarget = -1; + childatt->attlen = attribute->attlen; + childatt->attcacheoff = -1; + childatt->atttypmod = attribute->atttypmod; + childatt->attnum = ++child_attno; + childatt->attbyval = attribute->attbyval; + childatt->attndims = attribute->attndims; + childatt->attstorage = attribute->attstorage; + childatt->attalign = attribute->attalign; + childatt->attnotnull = false; + childatt->atthasdef = false; /* XXX */ + childatt->attisdropped = false; + childatt->attislocal = false; + childatt->attinhcount = attribute->attinhcount+1; + + simple_heap_insert(attrdesc, attributeTuple); + CatalogUpdateIndexes(attrdesc, attributeTuple); + heap_freetuple(attributeTuple); + + /* XXX Defaults */ + + } + heap_close(attrdesc, RowExclusiveLock); + } + + } + + static void + ATExecDropInherits(Relation rel, RangeVar *parent) + { + + + Relation catalogRelation; + SysScanDesc scan; + ScanKeyData key; + HeapTuple inheritsTuple, attributeTuple; + Oid inhparent; + Oid dropparent; + int found = 0; + + /* Get the OID of parent -- if no schema is specified use the regular + * search path and only drop the one table that's found. We could try to be + * clever and look at each parent and see if it matches but that would be + * inconsistent with other operations I think. 
*/ + + Assert(rel); + Assert(parent); + + dropparent = RangeVarGetRelid(parent, false); + + /* Search through the direct parents of rel looking for dropparent oid */ + + catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key); + while (!found && HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) + { + inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; + if (inhparent == dropparent) { + simple_heap_delete(catalogRelation, &inheritsTuple->t_self); + found = 1; + } + } + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + + + if (!found) { + /* would it be better to look up the actual schema of dropparent and + * make the error message explicitly name the qualified name it's + * trying to drop ?*/ + if (parent->schemaname) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"", + parent->schemaname, parent->relname, RelationGetRelationName(rel)))); + else + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("relation \"%s\" is not a parent of relation \"%s\"", + parent->relname, RelationGetRelationName(rel)))); + } + + /* Search through columns looking for matching columns from parent table */ + + catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, true, SnapshotNow, 1, &key); + while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) { + Form_pg_attribute att = ((Form_pg_attribute)GETSTRUCT(attributeTuple)); + /* Not an inherited column at all + * (do NOT use islocal for this test--it can be 
true for inherited columns) + */ + if (att->attinhcount == 0) + continue; + if (att->attisdropped) /* XXX Is this right? */ + continue; + if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname))) { + /* Decrement inhcount and possibly set islocal to 1 */ + HeapTuple copyTuple = heap_copytuple(attributeTuple); + Form_pg_attribute copy_att = ((Form_pg_attribute)GETSTRUCT(copyTuple)); + + copy_att->attinhcount--; + if (copy_att->attinhcount == 0) + copy_att->attislocal = 1; + + simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple); + /* XXX "Avoid using it for multiple tuples, since opening the + * indexes and building the index info structures is moderately + * expensive." Perhaps this can be moved outside the loop or else + * at least the CatalogOpenIndexes/CatalogCloseIndexes moved + * outside the loop but when I try that it seg faults?!*/ + CatalogUpdateIndexes(catalogRelation, copyTuple); + heap_freetuple(copyTuple); + } + } + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + } + + + /* * ALTER TABLE CREATE TOAST TABLE * Index: src/backend/nodes/copyfuncs.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v retrieving revision 1.335 diff -u -p -c -r1.335 copyfuncs.c cvs diff: conflicting specifications of output style *** src/backend/nodes/copyfuncs.c 30 Apr 2006 18:30:38 -0000 1.335 --- src/backend/nodes/copyfuncs.c 7 Jun 2006 18:09:56 -0000 *************** _copyAlterTableCmd(AlterTableCmd *from) *** 1799,1804 **** --- 1799,1805 ---- COPY_SCALAR_FIELD(subtype); COPY_STRING_FIELD(name); COPY_NODE_FIELD(def); + COPY_NODE_FIELD(parent); COPY_NODE_FIELD(transform); COPY_SCALAR_FIELD(behavior); Index: src/backend/parser/gram.y =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v retrieving revision 2.545 diff -u -p -c -r2.545 gram.y cvs diff: conflicting 
specifications of output style *** src/backend/parser/gram.y 27 May 2006 17:38:45 -0000 2.545 --- src/backend/parser/gram.y 7 Jun 2006 18:09:57 -0000 *************** alter_table_cmd: *** 1514,1519 **** --- 1514,1535 ---- n->subtype = AT_DisableTrigUser; $$ = (Node *)n; } + /* ALTER TABLE <name> ADD INHERITS <parent> */ + | ADD_P INHERITS qualified_name + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_AddInherits; + n->parent = $3; + $$ = (Node *)n; + } + /* ALTER TABLE <name> DROP INHERITS <parent> */ + | DROP INHERITS qualified_name + { + AlterTableCmd *n = makeNode(AlterTableCmd); + n->subtype = AT_DropInherits; + n->parent = $3; + $$ = (Node *)n; + } | alter_rel_cmd { $$ = $1; *************** unreserved_keyword: *** 8422,8428 **** | INCREMENT | INDEX | INHERIT - | INHERITS | INPUT_P | INSENSITIVE | INSERT --- 8438,8443 ---- *************** func_name_keyword: *** 8640,8645 **** --- 8655,8661 ---- */ reserved_keyword: ALL + | INHERITS | ANALYSE | ANALYZE | AND Index: src/include/nodes/parsenodes.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/nodes/parsenodes.h,v retrieving revision 1.310 diff -u -p -c -r1.310 parsenodes.h cvs diff: conflicting specifications of output style *** src/include/nodes/parsenodes.h 30 Apr 2006 18:30:40 -0000 1.310 --- src/include/nodes/parsenodes.h 7 Jun 2006 18:10:00 -0000 *************** typedef enum AlterTableType *** 874,880 **** AT_EnableTrigAll, /* ENABLE TRIGGER ALL */ AT_DisableTrigAll, /* DISABLE TRIGGER ALL */ AT_EnableTrigUser, /* ENABLE TRIGGER USER */ ! AT_DisableTrigUser /* DISABLE TRIGGER USER */ } AlterTableType; typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ --- 874,882 ---- AT_EnableTrigAll, /* ENABLE TRIGGER ALL */ AT_DisableTrigAll, /* DISABLE TRIGGER ALL */ AT_EnableTrigUser, /* ENABLE TRIGGER USER */ ! AT_DisableTrigUser, /* DISABLE TRIGGER USER */ ! AT_AddInherits, /* ADD INHERITS parent */ ! 
AT_DropInherits /* DROP INHERITS parent */ } AlterTableType; typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ *************** typedef struct AlterTableCmd /* one subc *** 883,888 **** --- 885,891 ---- AlterTableType subtype; /* Type of table alteration to apply */ char *name; /* column, constraint, or trigger to act on, * or new owner or tablespace */ + RangeVar *parent; /* Parent table for add/drop inherits */ Node *def; /* definition of new column, column type, * index, or constraint */ Node *transform; /* transformation expr for ALTER TYPE */ -- greg
pgsql-patches by date: