diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 9167294..7e73562 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -349,9 +349,9 @@ checkAborting(ArchiveHandle *AH) static void ShutdownWorkersHard(ParallelState *pstate) { -#ifndef WIN32 int i; +#ifndef WIN32 signal(SIGPIPE, SIG_IGN); /* @@ -366,6 +366,14 @@ ShutdownWorkersHard(ParallelState *pstate) #else /* The workers monitor this event via checkAborting(). */ SetEvent(termEvent); + + /* + * Disable send and receive on the given socket. Closing sockets is not + * reliable here because piperead()'s WIN32 emulated here does a blocking + * call of recv(), and this will stop the read done on them. + */ + for (i = 0; i < pstate->numWorkers; i++) + shutdown(pstate->parallelSlot[i].pipeWrite, SD_BOTH); #endif WaitForTerminatingWorkers(pstate); @@ -406,9 +414,19 @@ WaitForTerminatingWorkers(ParallelState *pstate) hThread = lpHandles[ret - WAIT_OBJECT_0]; for (j = 0; j < pstate->numWorkers; j++) + { if (pstate->parallelSlot[j].hThread == hThread) + { slot = &(pstate->parallelSlot[j]); + /* + * _endthreadex does not close the handle by itself, hence + * do it here. + */ + CloseHandle((HANDLE) hThread); + } + } + free(lpHandles); #endif Assert(slot); diff --git a/src/bin/pg_dump/pg_backup_utils.c b/src/bin/pg_dump/pg_backup_utils.c index 0aa13cd..0ef7e5e 100644 --- a/src/bin/pg_dump/pg_backup_utils.c +++ b/src/bin/pg_dump/pg_backup_utils.c @@ -119,7 +119,7 @@ exit_nicely(int code) #ifdef WIN32 if (parallel_init_done && GetCurrentThreadId() != mainThreadId) - ExitThread(code); + _endthreadex(code); #endif exit(code);