From 93e981ddd66af08b027de3916f65042d6e38c92a Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 18 Jan 2021 10:07:31 +1300 Subject: [PATCH v8 3/3] B --- src/bin/pgbench/pgbench.c | 69 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index e639f4980d..6287e28229 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -114,18 +114,29 @@ typedef struct socket_set */ #ifdef WIN32 +#define PTHREAD_BARRIER_SERIAL_THREAD (-1) + /* Use native win32 threads on Windows */ typedef struct win32_pthread *pthread_t; typedef int pthread_attr_t; +typedef SYNCHRONIZATION_BARRIER pthread_barrier_t; static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); static int pthread_join(pthread_t th, void **thread_return); + +static int pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads); +static int pthread_barrier_wait(pthread_barrier_t *barrier); +static int pthread_barrier_destroy(pthread_barrier_t *barrier); #elif defined(ENABLE_THREAD_SAFETY) /* Use platform-dependent pthread capability */ -#include +#include "port/pg_pthread.h" #else /* No threads implementation, use none (-j 1) */ #define pthread_t void * +#define pthread_barrier_t void * +#define pthread_barrier_init(a, b, c) /* ignore */ +#define pthread_barrier_wait(a) /* ignore */ +#define pthread_barrier_destroy(a) /* ignore */ #endif @@ -311,6 +322,9 @@ typedef struct RandomState /* Various random sequences are initialized from this one. */ static RandomState base_random_sequence; +/* Synchronization barrier for start and connection */ +static pthread_barrier_t barrier; + /* * Connection state machine states. */ @@ -454,8 +468,8 @@ typedef struct /* per thread collected stats in microseconds */ pg_time_usec_t create_time; /* thread creation time */ - pg_time_usec_t started_time; /* thread is running */ - pg_time_usec_t bench_start; /* thread is benchmarking */ + pg_time_usec_t started_time; /* thread is running after start barrier */ + pg_time_usec_t bench_start; /* thread is benchmarking after connection barrier */ pg_time_usec_t conn_duration; /* cumulated connection and deconnection delays */ StatsData stats; @@ -6125,6 +6139,8 @@ main(int argc, char **argv) if (duration > 0) setalarm(duration); + pthread_barrier_init(&barrier, NULL, nthreads); + #ifdef ENABLE_THREAD_SAFETY /* start all threads but thread 0 which is executed directly later */ for (i = 1; i < nthreads; i++) @@ -6196,6 +6212,8 @@ main(int argc, char **argv) printResults(&stats, pg_time_now() - bench_start, conn_total_duration, bench_start - start_time, latency_late); + pthread_barrier_destroy(&barrier); + if (exit_code != 0) pg_log_fatal("Run was aborted; the above results are incomplete."); @@ -6242,6 +6260,8 @@ threadRun(void *arg) state[i].state = CSTATE_CHOOSE_SCRIPT; /* READY */ + pthread_barrier_wait(&barrier); + thread_start = pg_time_now(); thread->started_time = thread_start; last_report = thread_start; @@ -6254,7 +6274,18 @@ threadRun(void *arg) for (int i = 0; i < nstate; i++) { if ((state[i].con = doConnect()) == NULL) + { + /* + * On connection failure, we meet the barrier here in place of + * GO before proceeding to the "done" path which will cleanup, + * so as to avoid locking the process. + * + * It is unclear whether it is worth doing anything rather than + * coldly exiting with an error message. + */ + pthread_barrier_wait(&barrier); goto done; + } } /* compute connection delay */ @@ -6266,6 +6297,8 @@ threadRun(void *arg) thread->conn_duration = 0; } + /* GO */ + pthread_barrier_wait(&barrier); start = pg_time_now(); thread->bench_start = start; @@ -6771,4 +6804,34 @@ pthread_join(pthread_t th, void **thread_return) return 0; } +static int +pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads) +{ + /* no spinning: threads are not expected to arrive at the barrier together */ + bool ok = InitializeSynchronizationBarrier(barrier, nthreads, 0); + return 0; +} + +static int +pthread_barrier_wait(pthread_barrier_t *barrier) +{ + bool last = EnterSynchronizationBarrier(barrier, SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY); + return last ? PTHREAD_BARRIER_SERIAL_THREAD : 0; +} + +static int +pthread_barrier_destroy(pthread_barrier_t *barrier) +{ + /* + * The following is coldly ignored because it requires Windows 8 + * or Windows Server 2012, which is a little too much. + * + * Also, there is a SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE flag + * but it probably requires the same versions. + * + * (void) DeleteSynchronizationBarrier(barrier); + */ + return 0; +} + #endif /* WIN32 */ -- 2.24.3 (Apple Git-128)