Patch to allow multiple table locks in "Unison" - Mailing list pgsql-patches
From | Neil Padgett |
---|---|
Subject | Patch to allow multiple table locks in "Unison" |
Date | |
Msg-id | 3B5C70A3.ED2DCB70@redhat.com Whole thread Raw |
Responses |
Re: Patch to allow multiple table locks in "Unison"
|
List | pgsql-patches |
Below find a patch to allow the syntax: LOCK TABLE tab1, tab2, tab3 [...] as outlined in the TODO item Commands::Allow LOCK TABLE tab1, tab2, tab3 so all tables locked in unison. The behaviour is as follows: Behaviour of locking a single table is unchanged. For multiple tables, the tables are locked in series. The request blocks until a lock on all tables is obtained, each in turn. The tables are locked in OID order always -- this should prevent deadlocking. (See the comments for details.) The single table lock case is handled separately to save making the sort call, etc. under what will likely remain the most common case. The only possible downside to this system, is that starvation is possible, but to no greater an extent than with explicitly serial locking (i.e. LOCK a; LOCK b;). However, unlike with users using multiple requests, deadlock is prevented if everyone uses the new syntax to obtain multiple locks. I think I've hit all the necessary places to change things -- however, this being my first patch to PostgreSQL, I'd especially like some feedback on whether I've missed something, or if it is the custom to do something differently in a way different from what I've done. Neil Red Hat Database Team -- Neil Padgett Red Hat Canada Ltd. E-Mail: npadgett@redhat.com 2323 Yonge Street, Suite #300, Toronto, ON M4P 2C9 Index: src/backend/commands/command.c =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/commands/command.c,v retrieving revision 1.136 diff -c -p -r1.136 command.c *** src/backend/commands/command.c 2001/07/16 05:06:57 1.136 --- src/backend/commands/command.c 2001/07/23 16:24:33 *************** needs_toast_table(Relation rel) *** 1985,1991 **** --- 1985,2008 ---- return (tuple_length > TOAST_TUPLE_THRESHOLD); } + /* + * + * Helper for Lock table + * Used when sorting relations -- see LockTableCommand() + * + */ + static int + qsort_compareRelOnOID(const void *a, const void *b) + { + Oid Oid_a, Oid_b; + + Oid_a = RelationGetRelid(*((Relation *) a)); + Oid_b = RelationGetRelid(*((Relation *) b)); + + return (Oid_a < Oid_b) ? -1 : (Oid_a > Oid_b) ? 1 : 0; + } + /* * * LOCK TABLE *************** needs_toast_table(Relation rel) *** 1994,2019 **** void LockTableCommand(LockStmt *lockstmt) { ! Relation rel; ! int aclresult; ! rel = heap_openr(lockstmt->relname, NoLock); ! if (rel->rd_rel->relkind != RELKIND_RELATION) ! elog(ERROR, "LOCK TABLE: %s is not a table", lockstmt->relname); ! if (lockstmt->mode == AccessShareLock) ! aclresult = pg_aclcheck(lockstmt->relname, GetUserId(), ACL_SELECT); ! else ! aclresult = pg_aclcheck(lockstmt->relname, GetUserId(), ! ACL_UPDATE | ACL_DELETE); ! if (aclresult != ACLCHECK_OK) ! elog(ERROR, "LOCK TABLE: permission denied"); ! LockRelation(rel, lockstmt->mode); ! heap_close(rel, NoLock); /* close rel, keep lock */ } --- 2011,2123 ---- void LockTableCommand(LockStmt *lockstmt) { ! int relCnt; ! ! relCnt = length(lockstmt -> rellist); ! ! /* Handle a single relation lock specially to avoid overhead on likely the ! most common case */ ! ! if(relCnt == 1) ! { ! ! /* Locking a single table */ ! ! Relation rel; ! int aclresult; ! char *relname; ! ! relname = strVal(lfirst(lockstmt->rellist)); ! ! freeList(lockstmt->rellist); ! ! rel = heap_openr(relname, NoLock); ! ! if (rel->rd_rel->relkind != RELKIND_RELATION) ! elog(ERROR, "LOCK TABLE: %s is not a table", relname); ! ! if (lockstmt->mode == AccessShareLock) ! aclresult = pg_aclcheck(relname, GetUserId(), ! ACL_SELECT); ! else ! aclresult = pg_aclcheck(relname, GetUserId(), ! ACL_UPDATE | ACL_DELETE); ! ! if (aclresult != ACLCHECK_OK) ! elog(ERROR, "LOCK TABLE: permission denied"); ! ! LockRelation(rel, lockstmt->mode); ! ! pfree(relname); ! ! heap_close(rel, NoLock); /* close rel, keep lock */ ! } ! else ! { ! List *p; ! Relation *RelationArray; ! Relation *pRel; ! ! /* Locking multiple tables */ ! ! /* Create an array of relations */ ! ! RelationArray = palloc(relCnt * sizeof(Relation)); ! pRel = RelationArray; ! ! /* Iterate over the list and populate the relation array */ ! ! foreach(p, lockstmt->rellist) ! { ! char* relname = strVal(lfirst(p)); ! int aclresult; ! ! *pRel = heap_openr(relname, NoLock); ! ! if ((*pRel)->rd_rel->relkind != RELKIND_RELATION) ! elog(ERROR, "LOCK TABLE: %s is not a table", ! relname); ! ! if (lockstmt->mode == AccessShareLock) ! aclresult = pg_aclcheck(relname, GetUserId(), ! ACL_SELECT); ! else ! aclresult = pg_aclcheck(relname, GetUserId(), ! ACL_UPDATE | ACL_DELETE); ! ! if (aclresult != ACLCHECK_OK) ! elog(ERROR, "LOCK TABLE: permission denied"); ! ! pRel++; ! pfree(relname); ! } ! ! /* Now, sort the list on OID. OIDs come in very handy here, since ! they provide a unique unchanging ordering for the relations. ! Further, new relations always have a higher OID. Hence, they ! are perfect for our purpose here -- if we always lock relations ! in OID order, we can avoid deadlock. */ ! qsort((void *) RelationArray, (size_t) relCnt, ! sizeof(Relation), qsort_compareRelOnOID); ! /* Now, lock all the relations, closing each after it is locked ! (Keeping the locks) ! */ ! for(pRel = RelationArray; ! pRel < RelationArray + relCnt; ! pRel++) ! { ! LockRelation(*pRel, lockstmt->mode); ! heap_close(*pRel, NoLock); ! } ! /* Free the relation array */ ! pfree(RelationArray); ! } } Index: src/backend/nodes/copyfuncs.c =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v retrieving revision 1.148 diff -c -p -r1.148 copyfuncs.c *** src/backend/nodes/copyfuncs.c 2001/07/16 19:07:37 1.148 --- src/backend/nodes/copyfuncs.c 2001/07/23 16:24:33 *************** _copyLockStmt(LockStmt *from) *** 2425,2432 **** { LockStmt *newnode = makeNode(LockStmt); ! if (from->relname) ! newnode->relname = pstrdup(from->relname); newnode->mode = from->mode; return newnode; --- 2425,2432 ---- { LockStmt *newnode = makeNode(LockStmt); ! Node_Copy(from, newnode, rellist); ! newnode->mode = from->mode; return newnode; Index: src/backend/nodes/equalfuncs.c =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/equalfuncs.c,v retrieving revision 1.96 diff -c -p -r1.96 equalfuncs.c *** src/backend/nodes/equalfuncs.c 2001/07/16 19:07:38 1.96 --- src/backend/nodes/equalfuncs.c 2001/07/23 16:24:33 *************** _equalDropUserStmt(DropUserStmt *a, Drop *** 1283,1289 **** static bool _equalLockStmt(LockStmt *a, LockStmt *b) { ! if (!equalstr(a->relname, b->relname)) return false; if (a->mode != b->mode) return false; --- 1283,1289 ---- static bool _equalLockStmt(LockStmt *a, LockStmt *b) { ! if (!equal(a->rellist, b->rellist)) return false; if (a->mode != b->mode) return false; Index: src/backend/parser/gram.y =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/parser/gram.y,v retrieving revision 2.238 diff -c -p -r2.238 gram.y *** src/backend/parser/gram.y 2001/07/16 19:07:40 2.238 --- src/backend/parser/gram.y 2001/07/23 16:24:36 *************** DeleteStmt: DELETE FROM relation_expr w *** 3280,3290 **** } ; ! LockStmt: LOCK_P opt_table relation_name opt_lock { LockStmt *n = makeNode(LockStmt); ! n->relname = $3; n->mode = $4; $$ = (Node *)n; } --- 3280,3290 ---- } ; ! LockStmt: LOCK_P opt_table relation_name_list opt_lock { LockStmt *n = makeNode(LockStmt); ! n->rellist = $3; n->mode = $4; $$ = (Node *)n; } Index: src/include/nodes/parsenodes.h =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/include/nodes/parsenodes.h,v retrieving revision 1.136 diff -c -p -r1.136 parsenodes.h *** src/include/nodes/parsenodes.h 2001/07/16 19:07:40 1.136 --- src/include/nodes/parsenodes.h 2001/07/23 16:24:37 *************** typedef struct VariableResetStmt *** 760,766 **** typedef struct LockStmt { NodeTag type; ! char *relname; /* relation to lock */ int mode; /* lock mode */ } LockStmt; --- 760,766 ---- typedef struct LockStmt { NodeTag type; ! List *rellist; /* relations to lock */ int mode; /* lock mode */ } LockStmt; Index: src/interfaces/ecpg/preproc/preproc.y =================================================================== RCS file: /home/projects/pgsql/cvsroot/pgsql/src/interfaces/ecpg/preproc/preproc.y,v retrieving revision 1.146 diff -c -p -r1.146 preproc.y *** src/interfaces/ecpg/preproc/preproc.y 2001/07/16 05:07:00 1.146 --- src/interfaces/ecpg/preproc/preproc.y 2001/07/23 16:24:40 *************** DeleteStmt: DELETE FROM relation_expr w *** 2421,2427 **** } ; ! LockStmt: LOCK_P opt_table relation_name opt_lock { $$ = cat_str(4, make_str("lock"), $2, $3, $4); } --- 2421,2427 ---- } ; ! LockStmt: LOCK_P opt_table relation_name_list opt_lock { $$ = cat_str(4, make_str("lock"), $2, $3, $4); }
pgsql-patches by date: