*** a/src/bin/pg_dump/common.c --- b/src/bin/pg_dump/common.c *************** *** 15,21 **** */ #include "pg_backup_archiver.h" #include "pg_backup_utils.h" ! #include #include "catalog/pg_class.h" --- 15,21 ---- */ #include "pg_backup_archiver.h" #include "pg_backup_utils.h" ! #include "parallel_utils.h" #include #include "catalog/pg_class.h" *** a/src/bin/pg_dump/compress_io.c --- b/src/bin/pg_dump/compress_io.c *************** *** 184,190 **** WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen) { /* Are we aborting? */ ! checkAborting(AH); switch (cs->comprAlg) { --- 184,190 ---- const void *data, size_t dLen) { /* Are we aborting? */ ! checkAborting(); switch (cs->comprAlg) { *************** *** 351,357 **** ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF) while ((cnt = readF(AH, &buf, &buflen))) { /* Are we aborting? */ ! checkAborting(AH); zp->next_in = (void *) buf; zp->avail_in = cnt; --- 351,357 ---- while ((cnt = readF(AH, &buf, &buflen))) { /* Are we aborting? */ ! checkAborting(); zp->next_in = (void *) buf; zp->avail_in = cnt; *************** *** 414,420 **** ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF) while ((cnt = readF(AH, &buf, &buflen))) { /* Are we aborting? */ ! checkAborting(AH); ahwrite(buf, 1, cnt, AH); } --- 414,420 ---- while ((cnt = readF(AH, &buf, &buflen))) { /* Are we aborting? */ ! checkAborting(); ahwrite(buf, 1, cnt, AH); } *** a/src/bin/pg_dump/parallel.c --- b/src/bin/pg_dump/parallel.c *************** *** 20,25 **** --- 20,26 ---- #include "pg_backup_utils.h" #include "parallel.h" + #include "parallel_utils.h" #ifndef WIN32 #include *************** *** 35,43 **** /* file-scope variables */ #ifdef WIN32 static unsigned int tMasterThreadId = 0; - static HANDLE termEvent = INVALID_HANDLE_VALUE; - static int pgpipe(int handles[2]); - static int piperead(int s, char *buf, int len); /* * Structure to hold info passed by _beginthreadex() to the function it calls --- 36,41 ---- *************** *** 53,228 **** typedef struct } WorkerInfo; #define pipewrite(a,b,c) send(a,b,c,0) - #else - /* - * aborting is only ever used in the master, the workers are fine with just - * wantAbort. - */ - static bool aborting = false; - static volatile sig_atomic_t wantAbort = 0; - #define pgpipe(a) pipe(a) - #define piperead(a,b,c) read(a,b,c) - #define pipewrite(a,b,c) write(a,b,c) #endif - typedef struct ShutdownInformation - { - ParallelState *pstate; - Archive *AHX; - } ShutdownInformation; - - static ShutdownInformation shutdown_info; - static const char *modulename = gettext_noop("parallel archiver"); - static ParallelSlot *GetMyPSlot(ParallelState *pstate); - static void - parallel_msg_master(ParallelSlot *slot, const char *modulename, - const char *fmt, va_list ap) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0))); static void archive_close_connection(int code, void *arg); - static void ShutdownWorkersHard(ParallelState *pstate); - static void WaitForTerminatingWorkers(ParallelState *pstate); - #ifndef WIN32 - static void sigTermHandler(int signum); - #endif static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, RestoreOptions *ropt); - static bool HasEveryWorkerTerminated(ParallelState *pstate); static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te); static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); - static char *getMessageFromMaster(int pipefd[2]); - static void sendMessageToMaster(int pipefd[2], const char *str); - static int select_loop(int maxFd, fd_set *workerset); - static char *getMessageFromWorker(ParallelState *pstate, - bool do_wait, int *worker); - static void sendMessageToWorker(ParallelState *pstate, - int worker, const char *str); - static char *readMessageFromPipe(int fd); #define messageStartsWith(msg, prefix) \ (strncmp(msg, prefix, strlen(prefix)) == 0) #define messageEquals(msg, pattern) \ (strcmp(msg, pattern) == 0) - #ifdef WIN32 - static void shutdown_parallel_dump_utils(int code, void *unused); - bool parallel_init_done = false; - static DWORD tls_index; - DWORD mainThreadId; - #endif - - - #ifdef WIN32 - static void - shutdown_parallel_dump_utils(int code, void *unused) - { - /* Call the cleanup function only from the main thread */ - if (mainThreadId == GetCurrentThreadId()) - WSACleanup(); - } - #endif - - void - init_parallel_dump_utils(void) - { - #ifdef WIN32 - if (!parallel_init_done) - { - WSADATA wsaData; - int err; - - tls_index = TlsAlloc(); - mainThreadId = GetCurrentThreadId(); - err = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (err != 0) - { - fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err); - exit_nicely(1); - } - on_exit_nicely(shutdown_parallel_dump_utils, NULL); - parallel_init_done = true; - } - #endif - } - - static ParallelSlot * - GetMyPSlot(ParallelState *pstate) - { - int i; - - for (i = 0; i < pstate->numWorkers; i++) - #ifdef WIN32 - if (pstate->parallelSlot[i].threadId == GetCurrentThreadId()) - #else - if (pstate->parallelSlot[i].pid == getpid()) - #endif - return &(pstate->parallelSlot[i]); - - return NULL; - } - - /* - * Fail and die, with a message to stderr. Parameters as for write_msg. - * - * This is defined in parallel.c, because in parallel mode, things are more - * complicated. If the worker process does exit_horribly(), we forward its - * last words to the master process. The master process then does - * exit_horribly() with this error message itself and prints it normally. - * After printing the message, exit_horribly() on the master will shut down - * the remaining worker processes. - */ - void - exit_horribly(const char *modulename, const char *fmt,...) - { - va_list ap; - ParallelState *pstate = shutdown_info.pstate; - ParallelSlot *slot; - - va_start(ap, fmt); - - if (pstate == NULL) - { - /* Not in parallel mode, just write to stderr */ - vwrite_msg(modulename, fmt, ap); - } - else - { - slot = GetMyPSlot(pstate); - - if (!slot) - /* We're the parent, just write the message out */ - vwrite_msg(modulename, fmt, ap); - else - /* If we're a worker process, send the msg to the master process */ - parallel_msg_master(slot, modulename, fmt, ap); - } - - va_end(ap); - - exit_nicely(1); - } - - /* Sends the error message from the worker to the master process */ - static void - parallel_msg_master(ParallelSlot *slot, const char *modulename, - const char *fmt, va_list ap) - { - char buf[512]; - int pipefd[2]; - - pipefd[PIPE_READ] = slot->pipeRevRead; - pipefd[PIPE_WRITE] = slot->pipeRevWrite; - - strcpy(buf, "ERROR "); - vsnprintf(buf + strlen("ERROR "), - sizeof(buf) - strlen("ERROR "), fmt, ap); - - sendMessageToMaster(pipefd, buf); - } /* * A thread-local version of getLocalPQExpBuffer(). --- 51,75 ---- *************** *** 280,286 **** getThreadLocalPQExpBuffer(void) void on_exit_close_archive(Archive *AHX) { ! shutdown_info.AHX = AHX; on_exit_nicely(archive_close_connection, &shutdown_info); } --- 127,133 ---- void on_exit_close_archive(Archive *AHX) { ! shutdown_info.handle = (void*)AHX; on_exit_nicely(archive_close_connection, &shutdown_info); } *************** *** 306,312 **** archive_close_connection(int code, void *arg) * connection (only open during parallel dump but not restore) and * shut down the remaining workers. */ ! DisconnectDatabase(si->AHX); #ifndef WIN32 /* --- 153,159 ---- * connection (only open during parallel dump but not restore) and * shut down the remaining workers. */ ! DisconnectDatabase((Archive*)si->handle); #ifndef WIN32 /* *************** *** 318,436 **** archive_close_connection(int code, void *arg) #endif ShutdownWorkersHard(si->pstate); } ! else if (slot->args->AH) ! DisconnectDatabase(&(slot->args->AH->public)); } ! else if (si->AHX) ! DisconnectDatabase(si->AHX); } /* - * If we have one worker that terminates for some reason, we'd like the other - * threads to terminate as well (and not finish with their 70 GB table dump - * first...). Now in UNIX we can just kill these processes, and let the signal - * handler set wantAbort to 1. In Windows we set a termEvent and this serves - * as the signal for everyone to terminate. - */ - void - checkAborting(ArchiveHandle *AH) - { - #ifdef WIN32 - if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0) - #else - if (wantAbort) - #endif - exit_horribly(modulename, "worker is terminating\n"); - } - - /* - * Shut down any remaining workers, this has an implicit do_wait == true. - * - * The fastest way we can make the workers terminate gracefully is when - * they are listening for new commands and we just tell them to terminate. - */ - static void - ShutdownWorkersHard(ParallelState *pstate) - { - #ifndef WIN32 - int i; - - signal(SIGPIPE, SIG_IGN); - - /* - * Close our write end of the sockets so that the workers know they can - * exit. - */ - for (i = 0; i < pstate->numWorkers; i++) - closesocket(pstate->parallelSlot[i].pipeWrite); - - for (i = 0; i < pstate->numWorkers; i++) - kill(pstate->parallelSlot[i].pid, SIGTERM); - #else - /* The workers monitor this event via checkAborting(). */ - SetEvent(termEvent); - #endif - - WaitForTerminatingWorkers(pstate); - } - - /* - * Wait for the termination of the processes using the OS-specific method. - */ - static void - WaitForTerminatingWorkers(ParallelState *pstate) - { - while (!HasEveryWorkerTerminated(pstate)) - { - ParallelSlot *slot = NULL; - int j; - - #ifndef WIN32 - int status; - pid_t pid = wait(&status); - - for (j = 0; j < pstate->numWorkers; j++) - if (pstate->parallelSlot[j].pid == pid) - slot = &(pstate->parallelSlot[j]); - #else - uintptr_t hThread; - DWORD ret; - uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers); - int nrun = 0; - - for (j = 0; j < pstate->numWorkers; j++) - if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) - { - lpHandles[nrun] = pstate->parallelSlot[j].hThread; - nrun++; - } - ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE); - Assert(ret != WAIT_FAILED); - hThread = lpHandles[ret - WAIT_OBJECT_0]; - - for (j = 0; j < pstate->numWorkers; j++) - if (pstate->parallelSlot[j].hThread == hThread) - slot = &(pstate->parallelSlot[j]); - - free(lpHandles); - #endif - Assert(slot); - - slot->workerStatus = WRKR_TERMINATED; - } - Assert(HasEveryWorkerTerminated(pstate)); - } - - #ifndef WIN32 - /* Signal handling (UNIX only) */ - static void - sigTermHandler(int signum) - { - wantAbort = 1; - } - #endif - - /* * This function is called by both UNIX and Windows variants to set up a * worker process. */ --- 165,178 ---- #endif ShutdownWorkersHard(si->pstate); } ! else if (((ParallelArgs*)slot->args)->AH) ! DisconnectDatabase(&(((ParallelArgs*)slot->args)->AH->public)); } ! else if ((ArchiveHandle*)si->handle) ! DisconnectDatabase((Archive*)si->handle); } /* * This function is called by both UNIX and Windows variants to set up a * worker process. */ *************** *** 537,544 **** ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) pstate->parallelSlot[i].workerStatus = WRKR_IDLE; pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); ! pstate->parallelSlot[i].args->AH = NULL; ! pstate->parallelSlot[i].args->te = NULL; #ifdef WIN32 /* Allocate a new structure for every worker */ wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); --- 279,286 ---- pstate->parallelSlot[i].workerStatus = WRKR_IDLE; pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); ! ((ParallelArgs*)pstate->parallelSlot[i].args)->AH = NULL; ! ((ParallelArgs*)pstate->parallelSlot[i].args)->te = NULL; #ifdef WIN32 /* Allocate a new structure for every worker */ wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); *************** *** 581,587 **** ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) * and also clones the database connection (for parallel dump) * which both seem kinda helpful. */ ! pstate->parallelSlot[i].args->AH = CloneArchive(AH); /* close read end of Worker -> Master */ closesocket(pipeWM[PIPE_READ]); --- 323,329 ---- * and also clones the database connection (for parallel dump) * which both seem kinda helpful. */ ! ((ParallelArgs*)pstate->parallelSlot[i].args)->AH = CloneArchive(AH); /* close read end of Worker -> Master */ closesocket(pipeWM[PIPE_READ]); *************** *** 598,604 **** ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt) closesocket(pstate->parallelSlot[j].pipeWrite); } ! SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt); exit(0); } --- 340,346 ---- closesocket(pstate->parallelSlot[j].pipeWrite); } ! SetupWorker(((ParallelArgs*)pstate->parallelSlot[i].args)->AH, pipefd, i, ropt); exit(0); } *************** *** 738,786 **** DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, sendMessageToWorker(pstate, worker, arg); pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; ! pstate->parallelSlot[worker].args->te = te; ! } ! ! /* ! * Find the first free parallel slot (if any). ! */ ! int ! GetIdleWorker(ParallelState *pstate) ! { ! int i; ! ! for (i = 0; i < pstate->numWorkers; i++) ! if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) ! return i; ! return NO_SLOT; ! } ! ! /* ! * Return true iff every worker process is in the WRKR_TERMINATED state. ! */ ! static bool ! HasEveryWorkerTerminated(ParallelState *pstate) ! { ! int i; ! ! for (i = 0; i < pstate->numWorkers; i++) ! if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) ! return false; ! return true; ! } ! ! /* ! * Return true iff every worker is in the WRKR_IDLE state. ! */ ! bool ! IsEveryWorkerIdle(ParallelState *pstate) ! { ! int i; ! ! for (i = 0; i < pstate->numWorkers; i++) ! if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) ! return false; ! return true; } /* --- 480,486 ---- sendMessageToWorker(pstate, worker, arg); pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; ! ((ParallelArgs*)pstate->parallelSlot[worker].args)->te = te; } /* *************** *** 966,972 **** ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) TocEntry *te; pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; ! te = pstate->parallelSlot[worker].args->te; if (messageStartsWith(msg, "OK RESTORE ")) { statusString = msg + strlen("OK RESTORE "); --- 666,672 ---- TocEntry *te; pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; ! te = ((ParallelArgs*)pstate->parallelSlot[worker].args)->te; if (messageStartsWith(msg, "OK RESTORE ")) { statusString = msg + strlen("OK RESTORE "); *************** *** 1001,1031 **** ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) /* * This function is executed in the master process. * - * This function is used to get the return value of a terminated worker - * process. If a process has terminated, its status is stored in *status and - * the id of the worker is returned. - */ - int - ReapWorkerStatus(ParallelState *pstate, int *status) - { - int i; - - for (i = 0; i < pstate->numWorkers; i++) - { - if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED) - { - *status = pstate->parallelSlot[i].status; - pstate->parallelSlot[i].status = 0; - pstate->parallelSlot[i].workerStatus = WRKR_IDLE; - return i; - } - } - return NO_SLOT; - } - - /* - * This function is executed in the master process. - * * It looks for an idle worker process and only returns if there is one. */ void --- 701,706 ---- *************** *** 1089,1417 **** EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) } } - /* - * This function is executed in the worker process. - * - * It returns the next message on the communication channel, blocking until it - * becomes available. - */ - static char * - getMessageFromMaster(int pipefd[2]) - { - return readMessageFromPipe(pipefd[PIPE_READ]); - } - - /* - * This function is executed in the worker process. - * - * It sends a message to the master on the communication channel. - */ - static void - sendMessageToMaster(int pipefd[2], const char *str) - { - int len = strlen(str) + 1; - - if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) - exit_horribly(modulename, - "could not write to the communication channel: %s\n", - strerror(errno)); - } - - /* - * A select loop that repeats calling select until a descriptor in the read - * set becomes readable. On Windows we have to check for the termination event - * from time to time, on Unix we can just block forever. - */ - static int - select_loop(int maxFd, fd_set *workerset) - { - int i; - fd_set saveSet = *workerset; - - #ifdef WIN32 - /* should always be the master */ - Assert(tMasterThreadId == GetCurrentThreadId()); - - for (;;) - { - /* - * sleep a quarter of a second before checking if we should terminate. - */ - struct timeval tv = {0, 250000}; - - *workerset = saveSet; - i = select(maxFd + 1, workerset, NULL, NULL, &tv); - - if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) - continue; - if (i) - break; - } - #else /* UNIX */ - - for (;;) - { - *workerset = saveSet; - i = select(maxFd + 1, workerset, NULL, NULL, NULL); - - /* - * If we Ctrl-C the master process , it's likely that we interrupt - * select() here. The signal handler will set wantAbort == true and - * the shutdown journey starts from here. Note that we'll come back - * here later when we tell all workers to terminate and read their - * responses. But then we have aborting set to true. - */ - if (wantAbort && !aborting) - exit_horribly(modulename, "terminated by user\n"); - - if (i < 0 && errno == EINTR) - continue; - break; - } - #endif - - return i; - } - - - /* - * This function is executed in the master process. - * - * It returns the next message from the worker on the communication channel, - * optionally blocking (do_wait) until it becomes available. - * - * The id of the worker is returned in *worker. - */ - static char * - getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) - { - int i; - fd_set workerset; - int maxFd = -1; - struct timeval nowait = {0, 0}; - - FD_ZERO(&workerset); - - for (i = 0; i < pstate->numWorkers; i++) - { - if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) - continue; - FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); - /* actually WIN32 ignores the first parameter to select()... */ - if (pstate->parallelSlot[i].pipeRead > maxFd) - maxFd = pstate->parallelSlot[i].pipeRead; - } - - if (do_wait) - { - i = select_loop(maxFd, &workerset); - Assert(i != 0); - } - else - { - if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) - return NULL; - } - - if (i < 0) - exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno)); - - for (i = 0; i < pstate->numWorkers; i++) - { - char *msg; - - if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) - continue; - - msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); - *worker = i; - return msg; - } - Assert(false); - return NULL; - } - - /* - * This function is executed in the master process. - * - * It sends a message to a certain worker on the communication channel. - */ - static void - sendMessageToWorker(ParallelState *pstate, int worker, const char *str) - { - int len = strlen(str) + 1; - - if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) - { - /* - * If we're already aborting anyway, don't care if we succeed or not. - * The child might have gone already. - */ - #ifndef WIN32 - if (!aborting) - #endif - exit_horribly(modulename, - "could not write to the communication channel: %s\n", - strerror(errno)); - } - } - - /* - * The underlying function to read a message from the communication channel - * (fd) with optional blocking (do_wait). - */ - static char * - readMessageFromPipe(int fd) - { - char *msg; - int msgsize, - bufsize; - int ret; - - /* - * The problem here is that we need to deal with several possibilites: we - * could receive only a partial message or several messages at once. The - * caller expects us to return exactly one message however. - * - * We could either read in as much as we can and keep track of what we - * delivered back to the caller or we just read byte by byte. Once we see - * (char) 0, we know that it's the message's end. This would be quite - * inefficient for more data but since we are reading only on the command - * channel, the performance loss does not seem worth the trouble of - * keeping internal states for different file descriptors. - */ - bufsize = 64; /* could be any number */ - msg = (char *) pg_malloc(bufsize); - - msgsize = 0; - for (;;) - { - Assert(msgsize <= bufsize); - ret = piperead(fd, msg + msgsize, 1); - - /* worker has closed the connection or another error happened */ - if (ret <= 0) - break; - - Assert(ret == 1); - - if (msg[msgsize] == '\0') - return msg; - - msgsize++; - if (msgsize == bufsize) - { - /* could be any number */ - bufsize += 16; - msg = (char *) realloc(msg, bufsize); - } - } - - /* - * Worker has closed the connection, make sure to clean up before return - * since we are not returning msg (but did allocate it). - */ - free(msg); - - return NULL; - } - - #ifdef WIN32 - /* - * This is a replacement version of pipe for Win32 which allows returned - * handles to be used in select(). Note that read/write calls must be replaced - * with recv/send. "handles" have to be integers so we check for errors then - * cast to integers. - */ - static int - pgpipe(int handles[2]) - { - pgsocket s, tmp_sock; - struct sockaddr_in serv_addr; - int len = sizeof(serv_addr); - - /* We have to use the Unix socket invalid file descriptor value here. */ - handles[0] = handles[1] = -1; - - /* - * setup listen socket - */ - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) - { - write_msg(modulename, "pgpipe: could not create socket: error code %d\n", - WSAGetLastError()); - return -1; - } - - memset((void *) &serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(0); - serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) - { - write_msg(modulename, "pgpipe: could not bind: error code %d\n", - WSAGetLastError()); - closesocket(s); - return -1; - } - if (listen(s, 1) == SOCKET_ERROR) - { - write_msg(modulename, "pgpipe: could not listen: error code %d\n", - WSAGetLastError()); - closesocket(s); - return -1; - } - if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR) - { - write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n", - WSAGetLastError()); - closesocket(s); - return -1; - } - - /* - * setup pipe handles - */ - if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) - { - write_msg(modulename, "pgpipe: could not create second socket: error code %d\n", - WSAGetLastError()); - closesocket(s); - return -1; - } - handles[1] = (int) tmp_sock; - - if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) - { - write_msg(modulename, "pgpipe: could not connect socket: error code %d\n", - WSAGetLastError()); - closesocket(s); - return -1; - } - if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET) - { - write_msg(modulename, "pgpipe: could not accept connection: error code %d\n", - WSAGetLastError()); - closesocket(handles[1]); - handles[1] = -1; - closesocket(s); - return -1; - } - handles[0] = (int) tmp_sock; - - closesocket(s); - return 0; - } - - static int - piperead(int s, char *buf, int len) - { - int ret = recv(s, buf, len, 0); - - if (ret < 0 && WSAGetLastError() == WSAECONNRESET) - /* EOF on the pipe! (win32 socket based implementation) */ - ret = 0; - return ret; - } - - #endif --- 764,766 ---- *** a/src/bin/pg_dump/parallel.h --- b/src/bin/pg_dump/parallel.h *************** *** 20,37 **** #define PG_DUMP_PARALLEL_H #include "pg_backup_db.h" struct _archiveHandle; struct _tocEntry; - typedef enum - { - WRKR_TERMINATED = 0, - WRKR_IDLE, - WRKR_WORKING, - WRKR_FINISHED - } T_WorkerStatus; - /* Arguments needed for a worker process */ typedef struct ParallelArgs { --- 20,30 ---- #define PG_DUMP_PARALLEL_H #include "pg_backup_db.h" + #include "parallel_utils.h" struct _archiveHandle; struct _tocEntry; /* Arguments needed for a worker process */ typedef struct ParallelArgs { *************** *** 39,81 **** typedef struct ParallelArgs struct _tocEntry *te; } ParallelArgs; - /* State for each parallel activity slot */ - typedef struct ParallelSlot - { - ParallelArgs *args; - T_WorkerStatus workerStatus; - int status; - int pipeRead; - int pipeWrite; - int pipeRevRead; - int pipeRevWrite; - #ifdef WIN32 - uintptr_t hThread; - unsigned int threadId; - #else - pid_t pid; - #endif - } ParallelSlot; - - #define NO_SLOT (-1) - - typedef struct ParallelState - { - int numWorkers; - ParallelSlot *parallelSlot; - } ParallelState; - - #ifdef WIN32 - extern bool parallel_init_done; - extern DWORD mainThreadId; - #endif - - extern void init_parallel_dump_utils(void); - - extern int GetIdleWorker(ParallelState *pstate); - extern bool IsEveryWorkerIdle(ParallelState *pstate); extern void ListenToWorkers(struct _archiveHandle * AH, ParallelState *pstate, bool do_wait); - extern int ReapWorkerStatus(ParallelState *pstate, int *status); extern void EnsureIdleWorker(struct _archiveHandle * AH, ParallelState *pstate); extern void EnsureWorkersFinished(struct _archiveHandle * AH, ParallelState *pstate); --- 32,38 ---- *************** *** 86,95 **** extern void DispatchJobForTocEntry(struct _archiveHandle * AH, struct _tocEntry * te, T_Action act); extern void ParallelBackupEnd(struct _archiveHandle * AH, ParallelState *pstate); - extern void checkAborting(struct _archiveHandle * AH); - - extern void - exit_horribly(const char *modulename, const char *fmt,...) - __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn)); - #endif /* PG_DUMP_PARALLEL_H */ --- 43,46 ---- *** a/src/bin/pg_dump/pg_backup_archiver.c --- b/src/bin/pg_dump/pg_backup_archiver.c *************** *** 3825,3834 **** get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, if (pref_non_data) { int count = 0; ! for (k = 0; k < pstate->numWorkers; k++) ! if (pstate->parallelSlot[k].args->te != NULL && ! pstate->parallelSlot[k].args->te->section == SECTION_DATA) count++; if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) pref_non_data = false; --- 3825,3834 ---- if (pref_non_data) { int count = 0; ! for (k = 0; k < pstate->numWorkers; k++) ! if (((ParallelArgs*)pstate->parallelSlot[k].args)->te != NULL && ! ((ParallelArgs*)pstate->parallelSlot[k].args)->te->section == SECTION_DATA) count++; if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) pref_non_data = false; *************** *** 3852,3858 **** get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) continue; ! running_te = pstate->parallelSlot[i].args->te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) --- 3852,3858 ---- if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) continue; ! running_te = ((ParallelArgs*)pstate->parallelSlot[i].args)->te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) *************** *** 3926,3932 **** mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, { TocEntry *te = NULL; ! te = pstate->parallelSlot[worker].args->te; if (te == NULL) exit_horribly(modulename, "could not find slot of finished worker\n"); --- 3926,3932 ---- { TocEntry *te = NULL; ! te = ((ParallelArgs*)pstate->parallelSlot[worker].args)->te; if (te == NULL) exit_horribly(modulename, "could not find slot of finished worker\n"); *** a/src/bin/pg_dump/pg_backup_directory.c --- b/src/bin/pg_dump/pg_backup_directory.c *************** *** 35,42 **** #include "compress_io.h" #include "pg_backup_utils.h" #include "parallel.h" - #include #include --- 35,42 ---- #include "compress_io.h" #include "pg_backup_utils.h" + #include "parallel_utils.h" #include "parallel.h" #include #include *************** *** 356,362 **** _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) lclContext *ctx = (lclContext *) AH->formatData; /* Are we aborting? */ ! checkAborting(AH); if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen) WRITE_ERROR_EXIT; --- 356,362 ---- lclContext *ctx = (lclContext *) AH->formatData; /* Are we aborting? */ ! checkAborting(); if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen) WRITE_ERROR_EXIT; *************** *** 524,530 **** _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) lclContext *ctx = (lclContext *) AH->formatData; /* Are we aborting? */ ! checkAborting(AH); if (cfwrite(buf, len, ctx->dataFH) != len) WRITE_ERROR_EXIT; --- 524,530 ---- lclContext *ctx = (lclContext *) AH->formatData; /* Are we aborting? */ ! checkAborting(); if (cfwrite(buf, len, ctx->dataFH) != len) WRITE_ERROR_EXIT; *** a/src/bin/pg_dump/pg_backup_utils.c --- b/src/bin/pg_dump/pg_backup_utils.c *************** *** 15,33 **** #include "pg_backup_utils.h" #include "parallel.h" ! /* Globals exported by this file */ const char *progname = NULL; - #define MAX_ON_EXIT_NICELY 20 - - static struct - { - on_exit_nicely_callback function; - void *arg; - } on_exit_nicely_list[MAX_ON_EXIT_NICELY]; - - static int on_exit_nicely_index; /* * Parse a --section=foo command line argument. --- 15,24 ---- #include "pg_backup_utils.h" #include "parallel.h" ! #include "parallel_utils.h" /* Globals exported by this file */ const char *progname = NULL; /* * Parse a --section=foo command line argument. *************** *** 60,126 **** set_dump_section(const char *arg, int *dumpSections) } - /* - * Write a printf-style message to stderr. - * - * The program name is prepended, if "progname" has been set. - * Also, if modulename isn't NULL, that's included too. - * Note that we'll try to translate the modulename and the fmt string. - */ - void - write_msg(const char *modulename, const char *fmt,...) - { - va_list ap; - - va_start(ap, fmt); - vwrite_msg(modulename, fmt, ap); - va_end(ap); - } - - /* - * As write_msg, but pass a va_list not variable arguments. - */ - void - vwrite_msg(const char *modulename, const char *fmt, va_list ap) - { - if (progname) - { - if (modulename) - fprintf(stderr, "%s: [%s] ", progname, _(modulename)); - else - fprintf(stderr, "%s: ", progname); - } - vfprintf(stderr, _(fmt), ap); - } - - /* Register a callback to be run when exit_nicely is invoked. */ - void - on_exit_nicely(on_exit_nicely_callback function, void *arg) - { - if (on_exit_nicely_index >= MAX_ON_EXIT_NICELY) - exit_horribly(NULL, "out of on_exit_nicely slots\n"); - on_exit_nicely_list[on_exit_nicely_index].function = function; - on_exit_nicely_list[on_exit_nicely_index].arg = arg; - on_exit_nicely_index++; - } - - /* - * Run accumulated on_exit_nicely callbacks in reverse order and then exit - * quietly. This needs to be thread-safe. - */ - void - exit_nicely(int code) - { - int i; - - for (i = on_exit_nicely_index - 1; i >= 0; i--) - (*on_exit_nicely_list[i].function) (code, - on_exit_nicely_list[i].arg); - - #ifdef WIN32 - if (parallel_init_done && GetCurrentThreadId() != mainThreadId) - ExitThread(code); - #endif - - exit(code); - } --- 51,53 ---- *** a/src/bin/pg_dump/pg_backup_utils.h --- b/src/bin/pg_dump/pg_backup_utils.h *************** *** 23,40 **** typedef enum /* bits returned by set_dump_section */ DUMP_UNSECTIONED = 0xff } DumpSections; - typedef void (*on_exit_nicely_callback) (int code, void *arg); - - extern const char *progname; - extern void set_dump_section(const char *arg, int *dumpSections); - extern void - write_msg(const char *modulename, const char *fmt,...) - __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); - extern void - vwrite_msg(const char *modulename, const char *fmt, va_list ap) - __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0))); - extern void on_exit_nicely(on_exit_nicely_callback function, void *arg); - extern void exit_nicely(int code) __attribute__((noreturn)); - #endif /* PG_BACKUP_UTILS_H */ --- 23,27 ---- *** /dev/null --- b/src/include/parallel_utils.h *************** *** 0 **** --- 1,176 ---- + /*------------------------------------------------------------------------- + * + * parallel_utils.h + * Header for src/port/ parallel execution functions. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/port.h + * + *------------------------------------------------------------------------- + */ + #ifndef PARALLEL_UTILS_H + #define PARALLEL_UTILS_H + + #include + #include + #include + + /* + * WIN32 doesn't allow descriptors returned by pipe() to be used in select(), + * so for that platform we use socket() instead of pipe(). + * There is some inconsistency here because sometimes we require pg*, like + * pgpipe, but in other cases we define rename to pgrename just on Win32. + */ + #ifndef WIN32 + /* + * The function prototypes are not supplied because every C file + * includes this file. + */ + #define pgpipe(a) pipe(a) + #define piperead(a,b,c) read(a,b,c) + #define pipewrite(a,b,c) write(a,b,c) + #else + extern int pgpipe(int handles[2]); + extern int piperead(int s, char *buf, int len); + #define pipewrite(a,b,c) send(a,b,c,0) + #endif + + #ifdef WIN32 + #define PG_SIGNAL_COUNT 32 + #define kill(pid,sig) pgkill(pid,sig) + extern int pgkill(int pid, int sig); + #endif + + #ifdef WIN32 + extern DWORD mainThreadId; + extern bool parallel_init_done; + extern HANDLE termEvent; + + #endif + + + typedef enum + { + WRKR_TERMINATED = 0, + WRKR_IDLE, + WRKR_WORKING, + WRKR_FINISHED + } T_WorkerStatus; + + #define PIPE_READ 0 + #define PIPE_WRITE 1 + + #define NO_SLOT (-1) + + /* State for each parallel activity slot */ + typedef struct ParallelSlot + { + void *args; + T_WorkerStatus workerStatus; + int status; + int pipeRead; + int pipeWrite; + int pipeRevRead; + int pipeRevWrite; + #ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; + #else + pid_t pid; + #endif + } ParallelSlot; + + typedef struct ParallelState + { + int numWorkers; + ParallelSlot *parallelSlot; + } ParallelState; + + typedef struct ShutdownInformation + { + ParallelState *pstate; + void *handle; + } ShutdownInformation; + + extern ShutdownInformation shutdown_info; + + #define MAX_ON_EXIT_NICELY 20 + + typedef void (*on_exit_nicely_callback) (int code, void *arg); + + typedef struct on_exit_nicely_stru + { + on_exit_nicely_callback function; + void *arg; + }on_exit_nicely_stru; + + extern on_exit_nicely_stru on_exit_nicely_list[MAX_ON_EXIT_NICELY]; + + extern int on_exit_nicely_index; + + + #ifdef WIN32 + extern bool parallel_init_done; + extern DWORD tls_index; + + #else + extern bool aborting; + + #endif + + extern const char *progname; + + extern char * + readMessageFromPipe(int fd); + + void + parallel_msg_master(ParallelSlot *slot, const char *modulename, + const char *fmt, va_list ap) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0))); + + extern void ShutdownWorkersHard(ParallelState *pstate); + extern void WaitForTerminatingWorkers(ParallelState *pstate); + + extern bool HasEveryWorkerTerminated(ParallelState *pstate); + + extern char *getMessageFromMaster(int pipefd[2]); + extern void sendMessageToMaster(int pipefd[2], const char *str); + extern int select_loop(int maxFd, fd_set *workerset); + extern char *getMessageFromWorker(ParallelState *pstate, + bool do_wait, int *worker); + extern void sendMessageToWorker(ParallelState *pstate, + int worker, const char *str); + extern int ReapWorkerStatus(ParallelState *pstate, int *status); + + extern ParallelSlot * + GetMyPSlot(ParallelState *pstate); + + extern void init_parallel_dump_utils(void); + extern int GetIdleWorker(ParallelState *pstate); + extern bool IsEveryWorkerIdle(ParallelState *pstate); + extern void checkAborting(); + + extern void + write_msg(const char *modulename, const char *fmt,...) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); + extern void + vwrite_msg(const char *modulename, const char *fmt, va_list ap) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0))); + extern void on_exit_nicely(on_exit_nicely_callback function, void *arg); + extern void exit_nicely(int code) __attribute__((noreturn)); + + extern void + exit_horribly(const char *modulename, const char *fmt,...) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn)); + + #ifndef WIN32 + extern void sigTermHandler(int signum); + #endif + + #ifdef WIN32 + extern void shutdown_parallel_dump_utils(int code, void *unused); + #endif + + #endif /* PARALLEL_UTILS_H */ *** a/src/port/Makefile --- b/src/port/Makefile *************** *** 33,39 **** LIBS += $(PTHREAD_LIBS) OBJS = $(LIBOBJS) chklocale.o dirmod.o erand48.o fls.o inet_net_ntop.o \ noblock.o path.o pgcheckdir.o pg_crc.o pgmkdirp.o pgsleep.o \ pgstrcasecmp.o pqsignal.o \ ! qsort.o qsort_arg.o quotes.o sprompt.o tar.o thread.o # foo_srv.o and foo.o are both built from foo.c, but only foo.o has -DFRONTEND OBJS_SRV = $(OBJS:%.o=%_srv.o) --- 33,39 ---- OBJS = $(LIBOBJS) chklocale.o dirmod.o erand48.o fls.o inet_net_ntop.o \ noblock.o path.o pgcheckdir.o pg_crc.o pgmkdirp.o pgsleep.o \ pgstrcasecmp.o pqsignal.o \ ! qsort.o qsort_arg.o quotes.o sprompt.o tar.o thread.o parallel_utils.o # foo_srv.o and foo.o are both built from foo.c, but only foo.o has -DFRONTEND OBJS_SRV = $(OBJS:%.o=%_srv.o) *** /dev/null --- b/src/port/parallel_utils.c *************** *** 0 **** --- 1,737 ---- + /*------------------------------------------------------------------------- + * + * parallel_utils.c + * kill() + * + * Copyright (c) 1996-2014, PostgreSQL Global Development Group + * + * Utility function for supporing parallel execution in client tools + * + * IDENTIFICATION + * src/port/parallel.c + * + *------------------------------------------------------------------------- + */ + + #include "postgres.h" + #include "parallel_utils.h" + #include "common/fe_memutils.h" + + #ifndef WIN32 + #include + #include + #include "signal.h" + #include + #include + + #endif + + + #ifdef WIN32 + DWORD mainThreadId; + bool parallel_init_done = false; + HANDLE termEvent = INVALID_HANDLE_VALUE; + DWORD tls_index; + #else + bool aborting = false; + static volatile sig_atomic_t wantAbort = 0; + + #endif + + static const char *modulename = gettext_noop("parallel utils"); + + ShutdownInformation shutdown_info; + int on_exit_nicely_index; + on_exit_nicely_stru on_exit_nicely_list[MAX_ON_EXIT_NICELY]; + + + + /* + * The underlying function to read a message from the communication channel + * (fd) with optional blocking (do_wait). + */ + char * + readMessageFromPipe(int fd) + { + char *msg; + int msgsize, + bufsize; + int ret; + + /* + * The problem here is that we need to deal with several possibilites: we + * could receive only a partial message or several messages at once. The + * caller expects us to return exactly one message however. + * + * We could either read in as much as we can and keep track of what we + * delivered back to the caller or we just read byte by byte. Once we see + * (char) 0, we know that it's the message's end. This would be quite + * inefficient for more data but since we are reading only on the command + * channel, the performance loss does not seem worth the trouble of + * keeping internal states for different file descriptors. + */ + bufsize = 64; /* could be any number */ + msg = (char *)malloc(bufsize); + if (!msg) + { + fprintf(stderr, _("out of memory\n")); + exit(1); + } + + + msgsize = 0; + for (;;) + { + Assert(msgsize <= bufsize); + ret = piperead(fd, msg + msgsize, 1); + + /* worker has closed the connection or another error happened */ + if (ret <= 0) + return NULL; + + Assert(ret == 1); + + if (msg[msgsize] == '\0') + return msg; + + msgsize++; + if (msgsize == bufsize) + { + /* could be any number */ + bufsize += 16; + msg = (char *) realloc(msg, bufsize); + } + } + } + + #ifdef WIN32 + /* + * This is a replacement version of pipe for Win32 which allows returned + * handles to be used in select(). Note that read/write calls must be replaced + * with recv/send. "handles" have to be integers so we check for errors then + * cast to integers. + */ + int + pgpipe(int handles[2]) + { + pgsocket s, tmp_sock; + struct sockaddr_in serv_addr; + int len = sizeof(serv_addr); + + /* We have to use the Unix socket invalid file descriptor value here. */ + handles[0] = handles[1] = -1; + + /* + * setup listen socket + */ + if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) + { + fprintf(stderr, _("pgpipe: could not create socket: error code %d\n"), + WSAGetLastError()); + + return -1; + } + + memset((void *) &serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(0); + serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + fprintf(stderr, _("pgpipe: could not bind: error code %d\n"), + WSAGetLastError()); + closesocket(s); + return -1; + } + if (listen(s, 1) == SOCKET_ERROR) + { + fprintf(stderr, _("pgpipe: could not listen: error code %d\n"), + WSAGetLastError()); + closesocket(s); + return -1; + } + if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR) + { + fprintf(stderr, _("pgpipe: getsockname() failed: error code %d\n"), + WSAGetLastError()); + closesocket(s); + return -1; + } + + /* + * setup pipe handles + */ + if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) + { + fprintf(stderr, _("pgpipe: could not create second socket: error code %d\n"), + WSAGetLastError()); + closesocket(s); + return -1; + } + handles[1] = (int) tmp_sock; + + if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + fprintf(stderr, _("pgpipe: could not connect socket: error code %d\n"), + WSAGetLastError()); + closesocket(s); + return -1; + } + if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET) + { + fprintf(stderr, _("pgpipe: could not accept connection: error code %d\n"), + WSAGetLastError()); + closesocket(handles[1]); + handles[1] = -1; + closesocket(s); + return -1; + } + handles[0] = (int) tmp_sock; + + closesocket(s); + return 0; + } + + int + piperead(int s, char *buf, int len) + { + int ret = recv(s, buf, len, 0); + + if (ret < 0 && WSAGetLastError() == WSAECONNRESET) + /* EOF on the pipe! (win32 socket based implementation) */ + ret = 0; + return ret; + } + + #endif + + + #ifdef WIN32 + void + shutdown_parallel_dump_utils(int code, void *unused) + { + /* Call the cleanup function only from the main thread */ + if (mainThreadId == GetCurrentThreadId()) + WSACleanup(); + } + #endif + + void + init_parallel_dump_utils(void) + { + #ifdef WIN32 + if (!parallel_init_done) + { + WSADATA wsaData; + int err; + + tls_index = TlsAlloc(); + mainThreadId = GetCurrentThreadId(); + err = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (err != 0) + { + fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err); + exit_nicely(1); + } + on_exit_nicely(shutdown_parallel_dump_utils, NULL); + parallel_init_done = true; + } + #endif + } + + ParallelSlot * + GetMyPSlot(ParallelState *pstate) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + #ifdef WIN32 + if (pstate->parallelSlot[i].threadId == GetCurrentThreadId()) + #else + if (pstate->parallelSlot[i].pid == getpid()) + #endif + return &(pstate->parallelSlot[i]); + + return NULL; + } + + + /* + * Fail and die, with a message to stderr. Parameters as for write_msg. + * + * This is defined in parallel.c, because in parallel mode, things are more + * complicated. If the worker process does exit_horribly(), we forward its + * last words to the master process. The master process then does + * exit_horribly() with this error message itself and prints it normally. + * After printing the message, exit_horribly() on the master will shut down + * the remaining worker processes. + */ + void + exit_horribly(const char *modulename, const char *fmt,...) + { + va_list ap; + ParallelState *pstate = shutdown_info.pstate; + ParallelSlot *slot; + + va_start(ap, fmt); + + if (pstate == NULL) + { + /* Not in parallel mode, just write to stderr */ + vwrite_msg(modulename, fmt, ap); + } + else + { + slot = GetMyPSlot(pstate); + + if (!slot) + /* We're the parent, just write the message out */ + vwrite_msg(modulename, fmt, ap); + else + /* If we're a worker process, send the msg to the master process */ + parallel_msg_master(slot, modulename, fmt, ap); + } + + va_end(ap); + + exit_nicely(1); + } + + /* Sends the error message from the worker to the master process */ + void + parallel_msg_master(ParallelSlot *slot, const char *modulename, + const char *fmt, va_list ap) + { + char buf[512]; + int pipefd[2]; + + pipefd[PIPE_READ] = slot->pipeRevRead; + pipefd[PIPE_WRITE] = slot->pipeRevWrite; + + strcpy(buf, "ERROR "); + vsnprintf(buf + strlen("ERROR "), + sizeof(buf) - strlen("ERROR "), fmt, ap); + + sendMessageToMaster(pipefd, buf); + } + + /* + * Shut down any remaining workers, this has an implicit do_wait == true. + * + * The fastest way we can make the workers terminate gracefully is when + * they are listening for new commands and we just tell them to terminate. + */ + void + ShutdownWorkersHard(ParallelState *pstate) + { + int i; + + #ifndef WIN32 + signal(SIGPIPE, SIG_IGN); + + /* + * Close our write end of the sockets so that the workers know they can + * exit. + */ + for (i = 0; i < pstate->numWorkers; i++) + closesocket(pstate->parallelSlot[i].pipeWrite); + + for (i = 0; i < pstate->numWorkers; i++) + kill(pstate->parallelSlot[i].pid, SIGTERM); + #else + /* + * Close our write end of the sockets so that the workers know they can + * exit. + */ + for (i = 0; i < pstate->numWorkers; i++) + closesocket(pstate->parallelSlot[i].pipeWrite); + + /* The workers monitor this event via checkAborting(). */ + SetEvent(termEvent); + #endif + + WaitForTerminatingWorkers(pstate); + } + + + /* + * Wait for the termination of the processes using the OS-specific method. + */ + void + WaitForTerminatingWorkers(ParallelState *pstate) + { + while (!HasEveryWorkerTerminated(pstate)) + { + ParallelSlot *slot = NULL; + int j; + + #ifndef WIN32 + int status; + pid_t pid = wait(&status); + + for (j = 0; j < pstate->numWorkers; j++) + if (pstate->parallelSlot[j].pid == pid) + slot = &(pstate->parallelSlot[j]); + #else + uintptr_t hThread; + DWORD ret; + uintptr_t *lpHandles; + int nrun = 0; + + lpHandles = malloc(sizeof(HANDLE) * pstate->numWorkers); + if (!lpHandles) + { + fprintf(stderr, _("out of memory\n")); + exit(1); + } + + for (j = 0; j < pstate->numWorkers; j++) + if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) + { + lpHandles[nrun] = pstate->parallelSlot[j].hThread; + nrun++; + } + ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE); + Assert(ret != WAIT_FAILED); + hThread = lpHandles[ret - WAIT_OBJECT_0]; + + for (j = 0; j < pstate->numWorkers; j++) + if (pstate->parallelSlot[j].hThread == hThread) + slot = &(pstate->parallelSlot[j]); + + free(lpHandles); + #endif + Assert(slot); + + slot->workerStatus = WRKR_TERMINATED; + } + Assert(HasEveryWorkerTerminated(pstate)); + } + + #ifndef WIN32 + /* Signal handling (UNIX only) */ + void + sigTermHandler(int signum) + { + wantAbort = 1; + } + #endif + + /* + * Find the first free parallel slot (if any). + */ + int + GetIdleWorker(ParallelState *pstate) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) + return i; + return NO_SLOT; + } + + /* + * Return true iff every worker process is in the WRKR_TERMINATED state. + */ + bool + HasEveryWorkerTerminated(ParallelState *pstate) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) + return false; + return true; + } + + /* + * Return true iff every worker is in the WRKR_IDLE state. + */ + bool + IsEveryWorkerIdle(ParallelState *pstate) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) + return false; + return true; + } + + /* + * This function is executed in the worker process. + * + * It returns the next message on the communication channel, blocking until it + * becomes available. + */ + char * + getMessageFromMaster(int pipefd[2]) + { + return readMessageFromPipe(pipefd[PIPE_READ]); + } + + /* + * This function is executed in the worker process. + * + * It sends a message to the master on the communication channel. + */ + void + sendMessageToMaster(int pipefd[2], const char *str) + { + int len = strlen(str) + 1; + + if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) + exit_horribly(modulename, + "could not write to the communication channel: %s\n", + strerror(errno)); + } + + /* + * A select loop that repeats calling select until a descriptor in the read + * set becomes readable. On Windows we have to check for the termination event + * from time to time, on Unix we can just block forever. + */ + int + select_loop(int maxFd, fd_set *workerset) + { + int i; + fd_set saveSet = *workerset; + + #ifdef WIN32 + /* should always be the master */ + Assert(tMasterThreadId == GetCurrentThreadId()); + + for (;;) + { + /* + * sleep a quarter of a second before checking if we should terminate. + */ + struct timeval tv = {0, 250000}; + + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, &tv); + + if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) + continue; + if (i) + break; + } + #else /* UNIX */ + + for (;;) + { + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, NULL); + + /* + * If we Ctrl-C the master process , it's likely that we interrupt + * select() here. The signal handler will set wantAbort == true and + * the shutdown journey starts from here. Note that we'll come back + * here later when we tell all workers to terminate and read their + * responses. But then we have aborting set to true. + */ + if (wantAbort && !aborting) + exit_horribly(modulename, "terminated by user\n"); + + if (i < 0 && errno == EINTR) + continue; + break; + } + #endif + + return i; + } + + + /* + * This function is executed in the master process. + * + * It returns the next message from the worker on the communication channel, + * optionally blocking (do_wait) until it becomes available. + * + * The id of the worker is returned in *worker. + */ + char * + getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) + { + int i; + fd_set workerset; + int maxFd = -1; + struct timeval nowait = {0, 0}; + + FD_ZERO(&workerset); + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) + continue; + FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); + /* actually WIN32 ignores the first parameter to select()... */ + if (pstate->parallelSlot[i].pipeRead > maxFd) + maxFd = pstate->parallelSlot[i].pipeRead; + } + + if (do_wait) + { + i = select_loop(maxFd, &workerset); + Assert(i != 0); + } + else + { + if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) + return NULL; + } + + if (i < 0) + exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno)); + + for (i = 0; i < pstate->numWorkers; i++) + { + char *msg; + + if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) + continue; + + msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); + *worker = i; + return msg; + } + Assert(false); + return NULL; + } + + /* + * This function is executed in the master process. + * + * It sends a message to a certain worker on the communication channel. + */ + void + sendMessageToWorker(ParallelState *pstate, int worker, const char *str) + { + int len = strlen(str) + 1; + + if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) + { + /* + * If we're already aborting anyway, don't care if we succeed or not. + * The child might have gone already. + */ + #ifndef WIN32 + if (!aborting) + #endif + exit_horribly(modulename, + "could not write to the communication channel: %s\n", + strerror(errno)); + } + } + + /* + * This function is executed in the master process. + * + * This function is used to get the return value of a terminated worker + * process. If a process has terminated, its status is stored in *status and + * the id of the worker is returned. + */ + int + ReapWorkerStatus(ParallelState *pstate, int *status) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED) + { + *status = pstate->parallelSlot[i].status; + pstate->parallelSlot[i].status = 0; + pstate->parallelSlot[i].workerStatus = WRKR_IDLE; + return i; + } + } + return NO_SLOT; + } + + /* + * If we have one worker that terminates for some reason, we'd like the other + * threads to terminate as well (and not finish with their 70 GB table dump + * first...). Now in UNIX we can just kill these processes, and let the signal + * handler set wantAbort to 1. In Windows we set a termEvent and this serves + * as the signal for everyone to terminate. + */ + void + checkAborting() + { + #ifdef WIN32 + if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0) + #else + if (wantAbort) + #endif + exit_horribly(modulename, "worker is terminating\n"); + } + + + /* + * Write a printf-style message to stderr. + * + * The program name is prepended, if "progname" has been set. + * Also, if modulename isn't NULL, that's included too. + * Note that we'll try to translate the modulename and the fmt string. + */ + void + write_msg(const char *modulename, const char *fmt,...) + { + va_list ap; + + va_start(ap, fmt); + vwrite_msg(modulename, fmt, ap); + va_end(ap); + } + + /* + * As write_msg, but pass a va_list not variable arguments. + */ + void + vwrite_msg(const char *modulename, const char *fmt, va_list ap) + { + if (progname) + { + if (modulename) + fprintf(stderr, "%s: [%s] ", progname, _(modulename)); + else + fprintf(stderr, "%s: ", progname); + } + vfprintf(stderr, _(fmt), ap); + } + + /* Register a callback to be run when exit_nicely is invoked. */ + void + on_exit_nicely(on_exit_nicely_callback function, void *arg) + { + if (on_exit_nicely_index >= MAX_ON_EXIT_NICELY) + exit_horribly(NULL, "out of on_exit_nicely slots\n"); + on_exit_nicely_list[on_exit_nicely_index].function = function; + on_exit_nicely_list[on_exit_nicely_index].arg = arg; + on_exit_nicely_index++; + } + + /* + * Run accumulated on_exit_nicely callbacks in reverse order and then exit + * quietly. This needs to be thread-safe. + */ + void + exit_nicely(int code) + { + int i; + + for (i = on_exit_nicely_index - 1; i >= 0; i--) + (*on_exit_nicely_list[i].function) (code, + on_exit_nicely_list[i].arg); + + #ifdef WIN32 + if (parallel_init_done && GetCurrentThreadId() != mainThreadId) + ExitThread(code); + #endif + + exit(code); + } + *** a/src/tools/msvc/Mkvcbuild.pm --- b/src/tools/msvc/Mkvcbuild.pm *************** *** 71,77 **** sub mkvcbuild pgcheckdir.c pg_crc.c pgmkdirp.c pgsleep.c pgstrcasecmp.c pqsignal.c mkdtemp.c qsort.c qsort_arg.c quotes.c system.c sprompt.c tar.c thread.c getopt.c getopt_long.c dirent.c ! win32env.c win32error.c win32setlocale.c); push(@pgportfiles, 'rint.c') if ($vsVersion < '12.00'); --- 71,77 ---- pgcheckdir.c pg_crc.c pgmkdirp.c pgsleep.c pgstrcasecmp.c pqsignal.c mkdtemp.c qsort.c qsort_arg.c quotes.c system.c sprompt.c tar.c thread.c getopt.c getopt_long.c dirent.c ! win32env.c win32error.c win32setlocale.c parallel_utils.c); push(@pgportfiles, 'rint.c') if ($vsVersion < '12.00');