Re: parallel pg_restore - WIP patch - Mailing list pgsql-hackers
| Field | Value |
|---|---|
| From | Andrew Dunstan |
| Subject | Re: parallel pg_restore - WIP patch |
| Date | |
| Msg-id | 48E16712.8020301@dunslane.net (Whole thread, Raw) |
| In response to | Re: parallel pg_restore - WIP patch (Stefan Kaltenbrunner <stefan@kaltenbrunner.cc>) |
| Responses | Re: parallel pg_restore - WIP patch; Re: parallel pg_restore - WIP patch |
| List | pgsql-hackers |
Stefan Kaltenbrunner wrote: > Tom Lane wrote: >> Andrew Dunstan <andrew@dunslane.net> writes: >>> Tom Lane wrote: >>>> Um, FKs could conflict with each other too, so that by itself isn't >>>> gonna fix anything. >> >>> Good point. Looks like we'll need to make a list of "can't run in >>> parallel with" items as well as strict dependencies. >> >> Yeah, I was just thinking about that. The current archive format >> doesn't really carry enough information for this. I think there >> are two basic solutions we could adopt: >> >> * Extend the archive format to provide some indication that "restoring >> this object requires exclusive access to these dependencies". >> >> * Hardwire knowledge into pg_restore that certain types of objects >> require exclusive access to their dependencies. >> >> The former seems more flexible, as well as more in tune with the basic >> design assumption that pg_restore shouldn't have a lot of knowledge >> about individual archive object types. But it would mean that you >> couldn't use parallel restore with any pre-8.4 dumps. In the long run >> that's no big deal, but in the short run it's annoying. > > hmm not sure how much of a problem that really is - we usually > recommend to use the pg_dump version of the target database anyway. > > > > We don't really need a huge amount of hardwiring as it turns out. Here is a version of the patch that tries to do what's needed in this area. 
cheers andrew Index: pg_backup.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v retrieving revision 1.47 diff -c -r1.47 pg_backup.h *** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47 --- pg_backup.h 29 Sep 2008 23:34:57 -0000 *************** *** 123,128 **** --- 123,130 ---- int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; + bool truncate_before_load; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; *************** *** 165,170 **** --- 167,173 ---- extern void CloseArchive(Archive *AH); extern void RestoreArchive(Archive *AH, RestoreOptions *ropt); + extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt); /* Open an existing archive */ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); Index: pg_backup_archiver.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v retrieving revision 1.158 diff -c -r1.158 pg_backup_archiver.c *** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158 --- pg_backup_archiver.c 29 Sep 2008 23:34:58 -0000 *************** *** 27,38 **** --- 27,51 ---- #include <unistd.h> + #include <sys/types.h> + #include <sys/wait.h> + + #ifdef WIN32 #include <io.h> #endif #include "libpq/libpq-fs.h" + typedef struct _parallel_slot + { + pid_t pid; + TocEntry *te; + DumpId dumpId; + DumpId tdeps[2]; + } ParallelSlot; + + #define NO_SLOT (-1) const char *progname; *************** *** 70,76 **** --- 83,100 ---- static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + static bool work_is_being_done(ParallelSlot *slot, int n_slots); + static int get_next_slot(ParallelSlot *slots, int n_slots); + 
static TocEntry *get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots); + static void prestore(ArchiveHandle *AH, TocEntry *te); + static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots); + static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel); + static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te); + static void _fix_dependency_counts(ArchiveHandle *AH); + static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te); + + static ArchiveHandle *GAH; /* * Wrapper functions. *************** *** 125,137 **** /* Public */ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; --- 149,633 ---- /* Public */ void + RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt) + { + + ArchiveHandle *AH = (ArchiveHandle *) AHX; + ParallelSlot *slots; + int next_slot; + TocEntry *next_work_item = NULL; + int work_status; + pid_t ret_child; + int n_slots = ropt->number_of_threads; + TocEntry *te; + teReqs reqs; + + + AH->debugLevel = 99; + /* some routines that use ahlog() don't get passed AH */ + GAH = AH; + + ahlog(AH,1,"entering RestoreARchiveParallel\n"); + + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots); + AH->ropt = ropt; + + /* + if (ropt->create) + die_horribly(AH,modulename, + "parallel restore is incompatible with --create\n"); + */ + + + if (ropt->dropSchema) + die_horribly(AH,modulename, + "parallel restore is incompatible with --clean\n"); + + if (!ropt->useDB) + die_horribly(AH,modulename, + "parallel restore requires direct database connection\n"); + + + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + 
reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, + "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } + #endif + + ahlog(AH, 1, "connecting to database for restore\n"); + if (AH->version < K_VERS_1_3) + die_horribly(AH, modulename, + "direct database connections are not supported in pre-1.3 archives\n"); + + /* XXX Should get this from the archive */ + AHX->minRemoteVersion = 070100; + AHX->maxRemoteVersion = 999999; + + /* correct dependency counts in case we're doing a partial restore */ + if (ropt->idWanted == NULL) + InitDummyWantedList(AHX,ropt); + _fix_dependency_counts(AH); + + /* + * Since we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors + */ + AH->noTocComments = 1; + + /* Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel and it will actually + * run faster in a single connection because we avoid all the connection + * and setup overhead, including the 0.5s sleep below. + */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + while((next_work_item = get_next_work_item(AH,NULL,0)) != NULL) + { + /* XXX need to improve this test in case there is no table data */ + /* need to test for indexes, FKs, PK, Unique, etc */ + if(strcmp(next_work_item->desc,"TABLE DATA") == 0) + break; + (void) _restore_one_te(AH, next_work_item, ropt, false); + + next_work_item->prestored = true; + + _reduce_dependencies(AH,next_work_item); + } + + + /* + * now close parent connection in prep for parallel step. 
+ */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* blow away any preserved state from the previous connection */ + + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = strdup(""); + if (AH->currUser) + free(AH->currUser); + AH->currUser = strdup(""); + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; + AH->currWithOids = -1; + + /* main parent loop */ + + ahlog(AH,1,"entering main loop\n"); + + while (((next_work_item = get_next_work_item(AH,slots,n_slots)) != NULL) || + (work_is_being_done(slots,n_slots))) + { + if (next_work_item != NULL && + ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT)) + { + /* there is work still to do and a worker slot available */ + + pid_t child; + + next_work_item->prestored = true; + + child = fork(); + if (child == 0) + { + prestore(AH,next_work_item); + /* should not happen ... we expect prestore to exit */ + exit(1); + } + else if (child > 0) + { + slots[next_slot].pid = child; + slots[next_slot].te = next_work_item; + slots[next_slot].dumpId = next_work_item->dumpId; + slots[next_slot].tdeps[0] = next_work_item->tdeps[0]; + slots[next_slot].tdeps[1] = next_work_item->tdeps[1]; + } + else + { + /* XXX fork error - handle it! 
*/ + } + continue; /* in case the slots are not yet full */ + } + /* if we get here there must be work being done */ + ret_child = wait(&work_status); + + if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0) + { + mark_work_done(AH, ret_child, slots, n_slots); + } + else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == ret_child) + _inhibit_data_for_failed_table(AH, slots[i].te); + break; + } + mark_work_done(AH, ret_child, slots, n_slots); + } + else + { + /* XXX something went wrong - deal with it */ + } + } + + /* + * now process the ACLs - no need to do this in parallel + */ + + /* reconnect from parent */ + ConnectDatabase(AHX, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Scan TOC to output ownership commands and ACLs + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, true); + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "setting owner and privileges for %s %s\n", + te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, true); + } + } + + /* clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + } + + static bool + work_is_being_done(ParallelSlot *slot, int n_slots) + { + ahlog(GAH,1,"is work being done?\n"); + while(n_slots--) + { + if (slot->pid > 0) + return true; + slot++; + } + ahlog(GAH,1,"work is not being done\n"); + return false; + } + + static int + get_next_slot(ParallelSlot *slots, int n_slots) + { + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == 0) + { + ahlog(GAH,1,"available slots is %d\n",i); + return i; + } + } + ahlog(GAH,1,"No slot available\n"); + return NO_SLOT; + } + + static TocEntry* + get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots) + { + TocEntry *te; + teReqs reqs; + 
int i; + + /* just search from the top of the queue until we find an available item. + * Note that the queue isn't reordered in the current implementation. If + * we ever do reorder it, then certain code that processes entries from the + * current item to the end of the queue will probably need to be + * re-examined. + */ + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->prestored && te->depCount < 1) + { + /* make sure it's not an ACL */ + reqs = _tocEntryRequired (te, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0) + continue; + + /* check against parallel slots for incompatible table locks */ + for (i=0; i < n_slots; i++) + { + if ((slots[i].tdeps[0] != 0 && + (te->tdeps[0] == slots[i].tdeps[0] || te->tdeps[1] == slots[i].tdeps[0])) || + (slots[i].tdeps[1] != 0 && + (te->tdeps[0] == slots[i].tdeps[1] || te->tdeps[1] == slots[i].tdeps[1]))) + { + if (strcmp(te->desc,"CONSTRAINT") == 0 || + strcmp(te->desc,"FK CONSTRAINT") == 0 || + strcmp(te->desc,"CHECK CONSTRAINT") == 0 || + strcmp(te->desc,"TRIGGER") == 0 || + strcmp(slots[i].te->desc,"CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 || + strcmp(slots[i].te->desc,"TRIGGER") == 0) + { + /* If either the thing that is running will have an + * AccessExclusive lock on the table, or this item + * would acquire such a lock, the item can't run yet. 
+ */ + continue; + } + + } + } + + ahlog(AH,1,"next item is %d\n",te->dumpId); + return te; + } + } + ahlog(AH,1,"No item ready\n"); + return NULL; + } + + static void + prestore(ArchiveHandle *AH, TocEntry *te) + { + RestoreOptions *ropt = AH->ropt; + int retval; + + /* close and reopen the archive so we have a private copy that doesn't + * stomp on anyone else's file pointer + */ + + (AH->ReopenPtr)(AH); + + ConnectDatabase((Archive *)AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + /* + * Establish important parameter values right away. + */ + _doSetFixedOutputState(AH); + + retval = _restore_one_te(AH, te, ropt, true); + + PQfinish(AH->connection); + exit(retval); + + } + + static void + mark_work_done(ArchiveHandle *AH, pid_t worker, + ParallelSlot *slots, int n_slots) + { + + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].pid == worker) + { + te = slots[i].te; + slots[i].pid = 0; + slots[i].te = NULL; + slots[i].dumpId = 0; + slots[i].tdeps[0] = 0; + slots[i].tdeps[1] = 0; + + break; + } + } + + /* Assert (te != NULL); */ + + _reduce_dependencies(AH,te); + + + } + + + /* + * Make sure the head of each dependency chain is a live item + * + * Once this is established the property will be maintained by + * _reduce_dependencies called as items are done. + */ + static void + _fix_dependency_counts(ArchiveHandle *AH) + { + TocEntry * te; + RestoreOptions * ropt = AH->ropt; + bool *RealDumpIds, *TableDumpIds; + DumpId d; + int i; + + + RealDumpIds = calloc(AH->maxDumpId, sizeof(bool)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + RealDumpIds[te->dumpId-1] = true; + if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + /* + * It is possible that the dependencies list items that are + * not in the archive at all. Reduce the depcounts so those get + * ignored. 
+ */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + for (i = 0; i < te->nDeps; i++) + if (!RealDumpIds[te->dependencies[i]-1]) + te->depCount--; + + TableDumpIds = calloc(AH->maxDumpId,sizeof(bool)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + if (strcmp(te->desc,"TABLE") == 0) + TableDumpIds[te->dumpId-1] = true; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + for (i = 0; i < te->nDeps; i++) + { + d = te->dependencies[i]; + if (TableDumpIds[d-1]) + { + if (te->tdeps[0] == d || te->tdeps[1] == d) + continue; + + if (te->tdeps[0] == 0) + te->tdeps[0] = d; + else if (te->tdeps[1] == 0) + te->tdeps[1] = d; + else + die_horribly(AH,modulename, + "item %d has a dependency on more than two tables", te->dumpId); + } + } + } + + static void + _reduce_dependencies(ArchiveHandle * AH, TocEntry *te) + { + DumpId item = te->dumpId; + RestoreOptions * ropt = AH->ropt; + int i; + + for (te = te->next; te != AH->toc; te = te->next) + { + if (te->nDeps == 0) + continue; + + for (i = 0; i < te->nDeps; i++) + if (te->dependencies[i] == item) + te->depCount = te->depCount - 1; + + /* If this is a table data item we are making available, + * make the table's dependencies depend on this item instead of + * the table definition, so they + * don't get scheduled until the data is loaded. + * Have to do this now before the main loop gets to anything + * further down the list. + */ + if (te->depCount == 0 && strcmp(te->desc,"TABLEDATA") == 0) + { + TocEntry *tes; + int j; + for (tes = te->next; tes != AH->toc; tes = tes->next) + for (j = 0; j < tes->nDeps; j++) + if (tes->dependencies[j] == item) + tes->dependencies[j] = te->dumpId; + } + + /* + * If this item won't in fact be done, and is now at + * 0 dependency count, we pretend it's been done and + * reduce the dependency counts of all the things that + * depend on it, by a recursive call + */ + if (te->depCount == 0 && ! 
ropt->idWanted[te->dumpId -1]) + _reduce_dependencies(AH,te); + } + + } + + + /* Public */ + void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; TocEntry *te; teReqs reqs; OutputContext sav; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; *************** *** 171,176 **** --- 667,686 ---- AH->noTocComments = 1; } + #ifndef HAVE_LIBZ + + /* make sure we won't need (de)compression we haven't got */ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in thisinstallation)\n"); + } + } + #endif + /* * Work out if we have an implied data-only restore. This can happen if * the dump was data only or if the user has used a toc list to exclude *************** *** 270,409 **** */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! AH->currentTE = te; ! ! /* Work out what, if anything, we want from this entry */ ! reqs = _tocEntryRequired(te, ropt, false); ! ! /* Dump any relevant dump warnings to stderr */ ! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) ! { ! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->defn); ! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) ! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); ! } ! ! defnDumped = false; ! ! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ ! { ! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); ! ! _printTocEntry(AH, te, ropt, false, false); ! defnDumped = true; ! ! /* ! * If we could not create a table and --no-data-for-failed-tables ! * was given, ignore the corresponding TABLE DATA ! */ ! if (ropt->noDataForFailedTables && ! AH->lastErrorTE == te && ! 
strcmp(te->desc, "TABLE") == 0) ! { ! TocEntry *tes; ! ! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", ! te->tag); ! ! for (tes = te->next; tes != AH->toc; tes = tes->next) ! { ! if (strcmp(tes->desc, "TABLE DATA") == 0 && ! strcmp(tes->tag, te->tag) == 0 && ! strcmp(tes->namespace ? tes->namespace : "", ! te->namespace ? te->namespace : "") == 0) ! { ! /* mark it unwanted */ ! ropt->idWanted[tes->dumpId - 1] = false; ! break; ! } ! } ! } ! ! /* If we created a DB, connect to it... */ ! if (strcmp(te->desc, "DATABASE") == 0) ! { ! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); ! _reconnectToDB(AH, te->tag); ! } ! } ! ! /* ! * If we have a data component, then process it ! */ ! if ((reqs & REQ_DATA) != 0) ! { ! /* ! * hadDumper will be set if there is genuine data component for ! * this node. Otherwise, we need to check the defn field for ! * statements that need to be executed in data-only restores. ! */ ! if (te->hadDumper) ! { ! /* ! * If we can output the data, then restore it. ! */ ! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) ! { ! #ifndef HAVE_LIBZ ! if (AH->compression != 0) ! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supportedin this installation)\n"); ! #endif ! ! _printTocEntry(AH, te, ropt, true, false); ! ! if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) ! { ! ahlog(AH, 1, "restoring %s\n", te->desc); ! ! _selectOutputSchema(AH, "pg_catalog"); ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! } ! else ! { ! _disableTriggersIfNecessary(AH, te, ropt); ! ! /* Select owner and schema as necessary */ ! _becomeOwner(AH, te); ! _selectOutputSchema(AH, te->namespace); ! ! ahlog(AH, 1, "restoring data for table \"%s\"\n", ! te->tag); ! ! /* ! * If we have a copy statement, use it. As of V1.3, ! * these are separate to allow easy import from ! * withing a database connection. Pre 1.3 archives can ! 
* not use DB connections and are sent to output only. ! * ! * For V1.3+, the table data MUST have a copy ! * statement so that we can go into appropriate mode ! * with libpq. ! */ ! if (te->copyStmt && strlen(te->copyStmt) > 0) ! { ! ahprintf(AH, "%s", te->copyStmt); ! AH->writingCopyData = true; ! } ! ! (*AH->PrintTocDataPtr) (AH, te, ropt); ! ! AH->writingCopyData = false; ! ! _enableTriggersIfNecessary(AH, te, ropt); ! } ! } ! } ! else if (!defnDumped) ! { ! /* If we haven't already dumped the defn part, do so now */ ! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); ! _printTocEntry(AH, te, ropt, false, false); ! } ! } ! } /* end loop over TOC entries */ /* * Scan TOC again to output ownership commands and ACLs --- 780,787 ---- */ for (te = AH->toc->next; te != AH->toc; te = te->next) { ! (void) _restore_one_te(AH, te, ropt, false); ! } /* * Scan TOC again to output ownership commands and ACLs *************** *** 451,456 **** --- 829,1009 ---- } } + static int + _restore_one_te(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) + { + teReqs reqs; + bool defnDumped; + int retval = 0; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + /* + * If we could not create a table and --no-data-for-failed-tables + * was given, ignore the 
corresponding TABLE DATA + * + * For the parallel case this must be done in the parent, so we just + * set a return value. + */ + if (ropt->noDataForFailedTables && + AH->lastErrorTE == te && + strcmp(te->desc, "TABLE") == 0) + { + if (is_parallel) + retval = 1; + else + _inhibit_data_for_failed_table(AH,te); + } + + /* If we created a DB, connect to it... */ + /* won't happen in parallel restore */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + if (ropt->truncate_before_load) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + + ahprintf(AH, "TRUNCATE TABLE %s;\n\n", + fmtId(te->tag)); } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. 
+ * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + if (ropt->truncate_before_load) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); + } + + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; + } + + static void + _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te) + { + TocEntry *tes; + RestoreOptions *ropt = AH->ropt; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = te->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted */ + ropt->idWanted[tes->dumpId - 1] = false; + + _reduce_dependencies(AH, tes); + break; + } + } + } + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, *************** *** 614,619 **** --- 1167,1173 ---- TocEntry *te = AH->toc->next; OutputContext sav; char *fmtName; + bool *TableDumpIds; if (ropt->filename) sav = SetOutput(AH, ropt->filename, 0 /* no compression */ ); *************** *** 650,662 **** ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId, te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? 
te->namespace : "-", te->tag, te->owner); te = te->next; } --- 1204,1235 ---- ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); + TableDumpIds = calloc(AH->maxDumpId,sizeof(bool)); + while(te!= AH->toc) + { + if (strcmp(te->desc,"TABLE") == 0) + TableDumpIds[te->dumpId-1] = true; + te = te->next; + } + te = AH->toc->next; + while (te != AH->toc) { if (_tocEntryRequired(te, ropt, true) != 0) ! { ! int i; ! ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps); ! for (i=0 ;i<te->nDeps; i++) ! ahprintf(AH, "%d ",te->dependencies[i]); ! ahprintf(AH, "] { "); ! for (i=0 ;i<te->nDeps; i++) ! if (TableDumpIds[te->dependencies[i]-1]) ! ahprintf(AH, "%d ",te->dependencies[i]); ! ahprintf(AH,"} %u %u %s %s %s %s\n", te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); + } te = te->next; } *************** *** 1948,1965 **** --- 2521,2541 ---- deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx); te->dependencies = deps; te->nDeps = depIdx; + te->depCount = depIdx; } else { free(deps); te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } } else { te->dependencies = NULL; te->nDeps = 0; + te->depCount = 0; } if (AH->ReadExtraTocPtr) Index: pg_backup_archiver.h =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v retrieving revision 1.76 diff -c -r1.76 pg_backup_archiver.h *** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76 --- pg_backup_archiver.h 29 Sep 2008 23:34:58 -0000 *************** *** 99,104 **** --- 99,105 ---- struct _restoreList; typedef void (*ClosePtr) (struct _archiveHandle * AH); + typedef void (*ReopenPtr) (struct _archiveHandle * AH); typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te); *************** *** 212,217 **** --- 213,219 ---- WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive 
*/ ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */ ClosePtr ClosePtr; /* Close the archive */ + ReopenPtr ReopenPtr; /* Reopen the archive */ WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data * associated with the current archive * format */ *************** *** 231,236 **** --- 233,239 ---- char *archdbname; /* DB name *read* from archive */ bool requirePassword; PGconn *connection; + char *cachepw; int connectToDB; /* Flag to indicate if direct DB connection is * required */ bool writingCopyData; /* True when we are sending COPY data */ *************** *** 284,289 **** --- 287,293 ---- DumpId dumpId; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ + bool prestored; /* keep track of parallel restore */ char *tag; /* index tag */ char *namespace; /* null or empty string if not in a schema */ char *tablespace; /* null if not in a tablespace; empty string *************** *** 296,301 **** --- 300,307 ---- char *copyStmt; DumpId *dependencies; /* dumpIds of objects this one depends on */ int nDeps; /* number of dependencies */ + int depCount; /* adjustable tally of dependencies */ + int tdeps[2]; DataDumperPtr dataDumper; /* Routine to dump data for object */ void *dataDumperArg; /* Arg for above routine */ Index: pg_backup_custom.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v retrieving revision 1.40 diff -c -r1.40 pg_backup_custom.c *** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40 --- pg_backup_custom.c 29 Sep 2008 23:34:58 -0000 *************** *** 40,45 **** --- 40,46 ---- static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); + static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void 
_WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); *************** *** 120,125 **** --- 121,127 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; *************** *** 835,840 **** --- 837,879 ---- AH->FH = NULL; } + static void + _ReopenArchive(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + { + die_horribly(AH,modulename,"Can only reopen input archives"); + } + else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0) + { + die_horribly(AH,modulename,"Cannot reopen stdin"); + } + + tpos = ftello(AH->FH); + + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (ctx->hasSeek) + { + fseeko(AH->FH, tpos, SEEK_SET); + } + else + { + die_horribly(AH,modulename,"cannot reopen non-seekable file"); + } + + } + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- Index: pg_backup_db.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.80 diff -c -r1.80 pg_backup_db.c *** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80 --- pg_backup_db.c 29 Sep 2008 23:34:58 -0000 *************** *** 138,148 **** ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! 
if (AH->requirePassword) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); } do --- 138,153 ---- ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); ! if (AH->requirePassword && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); + AH->requirePassword = true; + } + else if (AH->requirePassword) + { + password = AH->cachepw; } do *************** *** 174,180 **** } } while (new_pass); ! if (password) free(password); /* check for version mismatch */ --- 179,185 ---- } } while (new_pass); ! if (password != AH->cachepw) free(password); /* check for version mismatch */ *************** *** 206,220 **** if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } else AH->requirePassword = false; /* * Start the connection. Loop until we have a password if requested by --- 211,231 ---- if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); ! if (reqPwd && AH->cachepw == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->requirePassword = true; } + else if (reqPwd) + { + password = AH->cachepw; + } else + { AH->requirePassword = false; + } /* * Start the connection. Loop until we have a password if requested by *************** *** 241,247 **** } while (new_pass); if (password) ! free(password); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) --- 252,258 ---- } while (new_pass); if (password) ! 
AH->cachepw = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) Index: pg_backup_files.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v retrieving revision 1.34 diff -c -r1.34 pg_backup_files.c *** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34 --- pg_backup_files.c 29 Sep 2008 23:34:58 -0000 *************** *** 87,92 **** --- 87,93 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_backup_tar.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v retrieving revision 1.62 diff -c -r1.62 pg_backup_tar.c *** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62 --- pg_backup_tar.c 29 Sep 2008 23:34:58 -0000 *************** *** 143,148 **** --- 143,149 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; Index: pg_restore.c =================================================================== RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v retrieving revision 1.88 diff -c -r1.88 pg_restore.c *** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88 --- pg_restore.c 29 Sep 2008 23:34:58 -0000 *************** *** 78,83 **** --- 78,84 ---- static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; static int use_setsessauth = 0; + static int truncate_before_load = 0; struct option cmdopts[] = { {"clean", 0, NULL, 'c'}, *************** *** 92,97 **** --- 93,99 ---- {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + 
{"multi-thread",1,NULL,'m'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, *************** *** 114,119 **** --- 116,122 ---- {"disable-triggers", no_argument, &disable_triggers, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, + {"truncate-before-load", no_argument, &truncate_before_load, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {NULL, 0, NULL, 0} *************** *** 139,145 **** } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) --- 142,148 ---- } } ! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) *************** *** 182,187 **** --- 185,194 ---- opts->tocFile = strdup(optarg); break; + case 'm': + opts->number_of_threads = atoi(optarg); /* XXX fix error checking */ + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; *************** *** 262,268 **** break; case 0: ! /* This covers the long options equivalent to -X xxx. */ break; case '1': /* Restore data in a single transaction */ --- 269,278 ---- break; case 0: ! /* ! * This covers the long options without a short equivalent, ! * including those equivalent to -X xxx. ! 
*/ break; case '1': /* Restore data in a single transaction */ *************** *** 299,304 **** --- 309,329 ---- opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; + opts->truncate_before_load = truncate_before_load; + + if (opts->single_txn) + { + if (opts->number_of_threads > 1) + { + write_msg(NULL, "single transaction not compatible with multi-threading"); + exit(1); + } + else if (opts->truncate_before_load) + { + write_msg(NULL, "single transaction not compatible with truncate-before-load"); + exit(1); + } + } if (opts->formatName) { *************** *** 330,335 **** --- 355,362 ---- AH = OpenArchive(inputFileSpec, opts->format); + /* XXX looks like we'll have to do sanity checks in the parallel archiver */ + /* Let the archiver know how noisy to be */ AH->verbose = opts->verbose; *************** *** 351,356 **** --- 378,385 ---- if (opts->tocSummary) PrintTOCSummary(AH, opts); + else if (opts->number_of_threads > 1) + RestoreArchiveParallel(AH, opts); else RestoreArchive(AH, opts);
pgsql-hackers by date: