diff --git a/configure b/configure
index 8315dec23a8..342b97777e2 100755
--- a/configure
+++ b/configure
@@ -10600,7 +10600,7 @@ fi
 ## Header files
 ##
 
-for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/ipc.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
+for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/event.h sys/ipc.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
 do :
   as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
 ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
@@ -12892,7 +12892,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit kqueue mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index 11eb9c8acfc..48a25110459 100644
--- a/configure.in
+++ b/configure.in
@@ -1179,7 +1179,7 @@ AC_SUBST(UUID_LIBS)
 ## Header files
 ##
 
-AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/ipc.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
+AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/event.h sys/ipc.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
 
 # On BSD, test for net/if.h will fail unless sys/socket.h
 # is included first.
@@ -1429,7 +1429,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit kqueue mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 07b1364de8f..766ccfa6f82 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -39,6 +39,9 @@
 #ifdef HAVE_SYS_EPOLL_H
 #include <sys/epoll.h>
 #endif
+#ifdef HAVE_SYS_EVENT_H
+#include <sys/event.h>
+#endif
 #ifdef HAVE_POLL_H
 #include <poll.h>
 #endif
@@ -59,10 +62,12 @@
  * define somewhere before this block.
  */
 #if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
-	defined(WAIT_USE_WIN32)
+	defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
 /* don't overwrite manual choice */
 #elif defined(HAVE_SYS_EPOLL_H)
 #define WAIT_USE_EPOLL
+#elif defined(HAVE_KQUEUE)
+#define WAIT_USE_KQUEUE
 #elif defined(HAVE_POLL)
 #define WAIT_USE_POLL
 #elif WIN32
@@ -96,6 +101,10 @@ struct WaitEventSet
 	int			epoll_fd;
 	/* epoll_wait returns events in a user provided arrays, allocate once */
 	struct epoll_event *epoll_ret_events;
+#elif defined(WAIT_USE_KQUEUE)
+	int			kqueue_fd;
+	/* kevent returns events in a user-provided array, allocate once */
+	struct kevent *kqueue_ret_events;
 #elif defined(WAIT_USE_POLL)
 	/* poll expects events to be waited on every poll() call, prepare once */
 	struct pollfd *pollfds;
@@ -128,6 +137,8 @@ static void drainSelfPipe(void);
 
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
+#elif defined(WAIT_USE_KQUEUE)
+static void WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events);
 #elif defined(WAIT_USE_POLL)
 static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
 #elif defined(WAIT_USE_WIN32)
@@ -535,6 +546,8 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 
 #if defined(WAIT_USE_EPOLL)
 	sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+	sz += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
 	sz += MAXALIGN(sizeof(struct pollfd) * nevents);
 #elif defined(WAIT_USE_WIN32)
@@ -553,6 +566,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 #if defined(WAIT_USE_EPOLL)
 	set->epoll_ret_events = (struct epoll_event *) data;
 	data += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+	set->kqueue_ret_events = (struct kevent *) data;
+	data += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
 	set->pollfds = (struct pollfd *) data;
 	data += MAXALIGN(sizeof(struct pollfd) * nevents);
@@ -577,6 +593,10 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	if (fcntl(set->epoll_fd, F_SETFD, FD_CLOEXEC) == -1)
 		elog(ERROR, "fcntl(F_SETFD) failed on epoll descriptor: %m");
 #endif							/* EPOLL_CLOEXEC */
+#elif defined(WAIT_USE_KQUEUE)
+	set->kqueue_fd = kqueue();
+	if (set->kqueue_fd < 0)
+		elog(ERROR, "kqueue failed: %m");
 #elif defined(WAIT_USE_WIN32)
 
 	/*
@@ -609,6 +629,8 @@ FreeWaitEventSet(WaitEventSet *set)
 {
 #if defined(WAIT_USE_EPOLL)
 	close(set->epoll_fd);
+#elif defined(WAIT_USE_KQUEUE)
+	close(set->kqueue_fd);
 #elif defined(WAIT_USE_WIN32)
 	WaitEvent  *cur_event;
 
@@ -716,6 +738,8 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	/* perform wait primitive specific initialization, if needed */
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
+#elif defined(WAIT_USE_KQUEUE)
+	WaitEventAdjustKqueue(set, event, 0);
 #elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -735,10 +759,16 @@ void
 ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 {
 	WaitEvent  *event;
+#if defined(WAIT_USE_KQUEUE)
+	int			old_events;
+#endif
 
 	Assert(pos < set->nevents);
 
 	event = &set->events[pos];
+#if defined(WAIT_USE_KQUEUE)
+	old_events = event->events;
+#endif
 
 	/*
 	 * If neither the event mask nor the associated latch changes, return
@@ -772,6 +802,8 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
+#elif defined(WAIT_USE_KQUEUE)
+	WaitEventAdjustKqueue(set, event, old_events);
 #elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -862,6 +894,139 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 }
 #endif
 
+#if defined(WAIT_USE_KQUEUE)
+
+/*
+ * Fill in *k_ev to apply 'action' (EV_ADD or EV_DELETE) to 'filter' on
+ * event->fd, with udata pointing back at the WaitEvent.  EV_CLEAR is
+ * deliberately not set, so readiness is reported for as long as it lasts
+ * (level-triggered) rather than only once per state change.
+ */
+static inline void
+WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
+						 WaitEvent *event)
+{
+	k_ev->ident = event->fd;
+	k_ev->filter = filter;
+	k_ev->flags = action;
+	k_ev->fflags = 0;
+	k_ev->data = 0;
+
+	/*
+	 * On most BSD family systems, udata is of type void * so we could simply
+	 * assign event to it without casting, or use the EV_SET macro instead of
+	 * filling in the struct manually.  Unfortunately, NetBSD and possibly
+	 * others have it as intptr_t, so here we wallpaper over that difference
+	 * with an unsightly lvalue cast.
+	 */
+	*((WaitEvent **)(&k_ev->udata)) = event;
+}
+
+/*
+ * Fill in *k_ev to request an EVFILT_PROC/NOTE_EXIT notification for the
+ * postmaster process, with udata pointing back at the WaitEvent.
+ */
+static inline void
+WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
+{
+	/* For now postmaster death can only be added, not removed. */
+	k_ev->ident = PostmasterPid;
+	k_ev->filter = EVFILT_PROC;
+	k_ev->flags = EV_ADD;
+	k_ev->fflags = NOTE_EXIT;
+	k_ev->data = 0;
+	*((WaitEvent **)(&k_ev->udata)) = event;
+}
+
+/*
+ * Bring the kqueue registrations for 'event' in line with event->events.
+ *
+ * old_events is the previous event mask, used to compute what has changed;
+ * pass 0 when the event is being added for the first time.
+ */
+static void
+WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
+{
+	int			rc;
+	struct kevent k_ev[2];
+	int			count = 0;
+	bool		new_filt_read = false;
+	bool		old_filt_read = false;
+	bool		new_filt_write = false;
+	bool		old_filt_write = false;
+
+	if (old_events == event->events)
+		return;
+
+	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
+	Assert(event->events == WL_LATCH_SET ||
+		   event->events == WL_POSTMASTER_DEATH ||
+		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+
+	if (event->events == WL_POSTMASTER_DEATH)
+	{
+		/*
+		 * Unlike all the other implementations, we detect postmaster death
+		 * using process notification instead of waiting on the postmaster
+		 * alive pipe.
+		 */
+		WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
+	}
+	else
+	{
+		/*
+		 * We need to compute the adds and deletes required to get from the
+		 * old event mask to the new event mask, since kevent treats readable
+		 * and writable as separate events.
+		 */
+		if (old_events == WL_LATCH_SET ||
+			(old_events & WL_SOCKET_READABLE))
+			old_filt_read = true;
+		if (event->events == WL_LATCH_SET ||
+			(event->events & WL_SOCKET_READABLE))
+			new_filt_read = true;
+		if (old_events & WL_SOCKET_WRITEABLE)
+			old_filt_write = true;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			new_filt_write = true;
+		if (old_filt_read && !new_filt_read)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
+									 event);
+		else if (!old_filt_read && new_filt_read)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
+									 event);
+		if (old_filt_write && !new_filt_write)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
+									 event);
+		else if (!old_filt_write && new_filt_write)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
+									 event);
+	}
+
+	Assert(count > 0);
+	Assert(count <= 2);
+
+	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
+
+	if (rc < 0 && event->events == WL_POSTMASTER_DEATH && errno == ESRCH)
+	{
+		/*
+		 * The postmaster is already dead.  Defer reporting this to the caller
+		 * until wait time, for compatibility with the other implementations.
+		 * To do that we will now add the regular alive pipe.
+		 */
+		WaitEventAdjustKqueueAdd(&k_ev[0], EVFILT_READ, EV_ADD, event);
+		rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
+	}
+
+	if (rc < 0)
+		ereport(ERROR,
+				(errcode_for_socket_access(),
+				 errmsg("kevent() failed: %m")));
+}
+
+#endif
+
 #if defined(WAIT_USE_WIN32)
 static void
 WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
@@ -1147,6 +1312,142 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	return returned_events;
 }
 
+#elif defined(WAIT_USE_KQUEUE)
+
+/*
+ * Wait using FreeBSD kqueue(2)/kevent(2).  Also available on other BSD-family
+ * systems including macOS.
+ *
+ * This is the preferable wait method for systems that have it, as several
+ * readiness notifications are delivered, without having to iterate through
+ * all of set->events.
+ *
+ * For now this mirrors the epoll code, but in future it could modify the fd
+ * set in the same call to kevent as it uses for waiting instead of doing that
+ * with separate system calls.
+ */
+static int
+WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
+					  WaitEvent *occurred_events, int nevents)
+{
+	int			returned_events = 0;
+	int			rc;
+	WaitEvent  *cur_event;
+	struct kevent *cur_kqueue_event;
+	struct timespec timeout;
+	struct timespec *timeout_p;
+
+	if (cur_timeout < 0)
+		timeout_p = NULL;
+	else
+	{
+		timeout.tv_sec = cur_timeout / 1000;
+		timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
+		timeout_p = &timeout;
+	}
+
+	/* Sleep */
+	rc = kevent(set->kqueue_fd, NULL, 0,
+				set->kqueue_ret_events, nevents,
+				timeout_p);
+
+	/* Check return code */
+	if (rc < 0)
+	{
+		/* EINTR is okay, otherwise complain */
+		if (errno != EINTR)
+		{
+			waiting = false;
+			ereport(ERROR,
+					(errcode_for_socket_access(),
+					 errmsg("kevent() failed while trying to wait: %m")));
+		}
+		return 0;
+	}
+	else if (rc == 0)
+	{
+		/* timeout exceeded */
+		return -1;
+	}
+
+	/*
+	 * At least one event occurred, iterate over the returned kqueue events
+	 * until they're either all processed, or we've returned all the events
+	 * the caller desired.
+	 */
+	for (cur_kqueue_event = set->kqueue_ret_events;
+		 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
+		 returned_events < nevents;
+		 cur_kqueue_event++)
+	{
+		/* kevent's data pointer is set to the associated WaitEvent */
+		cur_event = (WaitEvent *) cur_kqueue_event->udata;
+
+		occurred_events->pos = cur_event->pos;
+		occurred_events->user_data = cur_event->user_data;
+		occurred_events->events = 0;
+
+		if (cur_event->events == WL_LATCH_SET &&
+			cur_kqueue_event->filter == EVFILT_READ)
+		{
+			/* There's data in the self-pipe, clear it. */
+			drainSelfPipe();
+
+			if (set->latch->is_set)
+			{
+				occurred_events->fd = PGINVALID_SOCKET;
+				occurred_events->events = WL_LATCH_SET;
+				occurred_events++;
+				returned_events++;
+			}
+		}
+		else if (cur_event->events == WL_POSTMASTER_DEATH &&
+				 ((cur_kqueue_event->filter == EVFILT_PROC &&
+				   (cur_kqueue_event->fflags & NOTE_EXIT) != 0) ||
+				  cur_kqueue_event->filter == EVFILT_READ))
+		{
+			/*
+			 * Postmaster death notification usually triggers filter
+			 * EVFILT_PROC.  But if the postmaster process was already dead
+			 * when we tried to add it to the kqueue, then
+			 * WaitEventAdjustKqueue adds the alive pipe instead, which
+			 * triggers EVFILT_READ here when we try to wait.
+			 */
+			occurred_events->fd = PGINVALID_SOCKET;
+			occurred_events->events = WL_POSTMASTER_DEATH;
+			occurred_events++;
+			returned_events++;
+		}
+		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		{
+			Assert(cur_event->fd >= 0);
+
+			if ((cur_event->events & WL_SOCKET_READABLE) &&
+				(cur_kqueue_event->filter == EVFILT_READ))
+			{
+				/* readable, or EOF */
+				occurred_events->events |= WL_SOCKET_READABLE;
+			}
+
+			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+				(cur_kqueue_event->filter == EVFILT_WRITE))
+			{
+				/* writable, or EOF */
+				occurred_events->events |= WL_SOCKET_WRITEABLE;
+			}
+
+			if (occurred_events->events != 0)
+			{
+				occurred_events->fd = cur_event->fd;
+				occurred_events++;
+				returned_events++;
+			}
+		}
+	}
+
+	return returned_events;
+}
+
 #elif defined(WAIT_USE_POLL)
 
 /*
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 7a05c7e5b85..cf492a29e81 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -297,6 +297,9 @@
 /* Define to 1 if you have isinf(). */
 #undef HAVE_ISINF
 
+/* Define to 1 if you have the `kqueue' function. */
+#undef HAVE_KQUEUE
+
 /* Define to 1 if you have the <langinfo.h> header file. */
 #undef HAVE_LANGINFO_H
 
@@ -554,6 +557,9 @@
 /* Define to 1 if you have the <sys/epoll.h> header file. */
 #undef HAVE_SYS_EPOLL_H
 
+/* Define to 1 if you have the <sys/event.h> header file. */
+#undef HAVE_SYS_EVENT_H
+
 /* Define to 1 if you have the <sys/ipc.h> header file. */
 #undef HAVE_SYS_IPC_H
 