From b0ceb4c404b95b1037e31578037be791206c3d1a Mon Sep 17 00:00:00 2001 From: Jacob Burroughs Date: Tue, 12 Dec 2023 16:28:03 -0600 Subject: [PATCH v3 1/5] Add IO stream abstraction This is a shared abstraction between the frontend and backend that is designed to draw a clean line between the compoments of postgres that want to consume "regular" protocol bytes and any layers that may want to transform them. In this patch, encyryption is simply moved to use this abstraction rather than callers directly using secure_read/secure_write (which depending on context might or might not actually be secure). This is a pre-work change intended to make compression integrate into frontend and backend comms layers more cleanly. This may be a first step towards facilitating greater sharing of SSL/GSS api code between the frontend and backend down the road, but for now it the stream processors themselves are completely independent. --- src/backend/libpq/be-secure-gssapi.c | 63 ++-- src/backend/libpq/be-secure-openssl.c | 75 +++-- src/backend/libpq/be-secure.c | 216 ------------ src/backend/libpq/pqcomm.c | 245 +++++++++++++- src/backend/postmaster/postmaster.c | 5 + src/common/Makefile | 1 + src/common/io_stream.c | 148 ++++++++ src/common/meson.build | 1 + src/include/common/io_stream.h | 131 ++++++++ src/include/libpq/libpq-be.h | 23 +- src/include/libpq/libpq.h | 6 +- src/interfaces/libpq/fe-connect.c | 343 ++++++++++++++++++- src/interfaces/libpq/fe-misc.c | 31 +- src/interfaces/libpq/fe-secure-gssapi.c | 64 ++-- src/interfaces/libpq/fe-secure-openssl.c | 68 +++- src/interfaces/libpq/fe-secure.c | 411 ----------------------- src/interfaces/libpq/libpq-int.h | 42 +-- 17 files changed, 1069 insertions(+), 804 deletions(-) create mode 100644 src/common/io_stream.c create mode 100644 src/include/common/io_stream.h diff --git a/src/backend/libpq/be-secure-gssapi.c b/src/backend/libpq/be-secure-gssapi.c index 8ed2f65347..597ddbae6e 100644 --- a/src/backend/libpq/be-secure-gssapi.c +++ b/src/backend/libpq/be-secure-gssapi.c @@ -74,6 +74,12 @@ static int PqGSSResultNext; /* Next index to read a byte from static uint32 PqGSSMaxPktSize; /* Maximum size we can encrypt and fit the * results into our output buffer */ +static ssize_t be_gssapi_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only); +static int be_gssapi_write(IoStreamLayer * self, Port *port, void const *ptr, size_t len, size_t *bytes_written); +IoStreamProcessor be_gssapi_processor = { + .read = (io_stream_read_func) be_gssapi_read, + .write = (io_stream_write_func) be_gssapi_write +}; /* * Attempt to write len bytes of data from ptr to a GSSAPI-encrypted connection. @@ -91,8 +97,8 @@ static uint32 PqGSSMaxPktSize; /* Maximum size we can encrypt and fit the * recursion. Instead, use elog(COMMERROR) to log extra info about the * failure if necessary, and then return an errno indicating connection loss. */ -ssize_t -be_gssapi_write(Port *port, void *ptr, size_t len) +static int +be_gssapi_write(IoStreamLayer * self, Port *port, void const *ptr, size_t len, size_t *bytes_written) { OM_uint32 major, minor; @@ -102,6 +108,8 @@ be_gssapi_write(Port *port, void *ptr, size_t len) size_t bytes_encrypted; gss_ctx_id_t gctx = port->gss->ctx; + *bytes_written = 0; + /* * When we get a retryable failure, we must not tell the caller we have * successfully transmitted everything, else it won't retry. For @@ -148,20 +156,21 @@ be_gssapi_write(Port *port, void *ptr, size_t len) */ if (PqGSSSendLength) { - ssize_t ret; + int retval; + size_t count; ssize_t amount = PqGSSSendLength - PqGSSSendNext; - ret = secure_raw_write(port, PqGSSSendBuffer + PqGSSSendNext, amount); - if (ret <= 0) - return ret; + retval = io_stream_next_write(self, PqGSSSendBuffer + PqGSSSendNext, amount, &count); + if (retval < 0 || count == 0) + return retval; /* * Check if this was a partial write, and if so, move forward that * far in our buffer and try again. */ - if (ret < amount) + if (count < amount) { - PqGSSSendNext += ret; + PqGSSSendNext += count; continue; } @@ -242,7 +251,8 @@ be_gssapi_write(Port *port, void *ptr, size_t len) /* We're reporting all the data as sent, so reset PqGSSSendConsumed. */ PqGSSSendConsumed = 0; - return bytes_encrypted; + *bytes_written = bytes_encrypted; + return 0; } /* @@ -258,8 +268,8 @@ be_gssapi_write(Port *port, void *ptr, size_t len) * We treat fatal errors the same as in be_gssapi_write(), even though the * argument about infinite recursion doesn't apply here. */ -ssize_t -be_gssapi_read(Port *port, void *ptr, size_t len) +static ssize_t +be_gssapi_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only) { OM_uint32 major, minor; @@ -269,6 +279,9 @@ be_gssapi_read(Port *port, void *ptr, size_t len) size_t bytes_returned = 0; gss_ctx_id_t gctx = port->gss->ctx; + if (buffered_only) + return false; + /* * The plan here is to read one incoming encrypted packet into * PqGSSRecvBuffer, decrypt it into PqGSSResultBuffer, and then dole out @@ -325,10 +338,10 @@ be_gssapi_read(Port *port, void *ptr, size_t len) /* Collect the length if we haven't already */ if (PqGSSRecvLength < sizeof(uint32)) { - ret = secure_raw_read(port, PqGSSRecvBuffer + PqGSSRecvLength, - sizeof(uint32) - PqGSSRecvLength); + ret = io_stream_next_read(self, PqGSSRecvBuffer + PqGSSRecvLength, + sizeof(uint32) - PqGSSRecvLength, false); - /* If ret <= 0, secure_raw_read already set the correct errno */ + /* If ret <= 0, io_stream_next_read already set the correct errno */ if (ret <= 0) return ret; @@ -359,8 +372,8 @@ be_gssapi_read(Port *port, void *ptr, size_t len) * Read as much of the packet as we are able to on this call into * wherever we left off from the last time we were called. */ - ret = secure_raw_read(port, PqGSSRecvBuffer + PqGSSRecvLength, - input.length - (PqGSSRecvLength - sizeof(uint32))); + ret = io_stream_next_read(self, PqGSSRecvBuffer + PqGSSRecvLength, + input.length - (PqGSSRecvLength - sizeof(uint32)), false); /* If ret <= 0, secure_raw_read already set the correct errno */ if (ret <= 0) return ret; @@ -413,7 +426,8 @@ be_gssapi_read(Port *port, void *ptr, size_t len) /* * Read the specified number of bytes off the wire, waiting using - * WaitLatchOrSocket if we would block. + * WaitLatchOrSocket if we would block. Only used during connecition setup + * before GSS is added to the io_stream. * * Results are read into PqGSSRecvBuffer. * @@ -430,7 +444,7 @@ read_or_wait(Port *port, ssize_t len) */ while (PqGSSRecvLength < len) { - ret = secure_raw_read(port, PqGSSRecvBuffer + PqGSSRecvLength, len - PqGSSRecvLength); + ret = io_stream_read(port->io_stream, PqGSSRecvBuffer + PqGSSRecvLength, len - PqGSSRecvLength, false); /* * If we got back an error and it wasn't just @@ -465,7 +479,7 @@ read_or_wait(Port *port, ssize_t len) */ if (ret == 0) { - ret = secure_raw_read(port, PqGSSRecvBuffer + PqGSSRecvLength, len - PqGSSRecvLength); + ret = io_stream_read(port->io_stream, PqGSSRecvBuffer + PqGSSRecvLength, len - PqGSSRecvLength, false); if (ret == 0) return -1; } @@ -648,8 +662,10 @@ secure_open_gssapi(Port *port) while (PqGSSSendNext < PqGSSSendLength) { - ret = secure_raw_write(port, PqGSSSendBuffer + PqGSSSendNext, - PqGSSSendLength - PqGSSSendNext); + size_t count; + + ret = io_stream_write(port->io_stream, PqGSSSendBuffer + PqGSSSendNext, + PqGSSSendLength - PqGSSSendNext, &count); /* * If we got back an error and it wasn't just @@ -663,7 +679,7 @@ secure_open_gssapi(Port *port) } /* Wait and retry if we couldn't write yet */ - if (ret <= 0) + if (ret < 0 || count == 0) { WaitLatchOrSocket(MyLatch, WL_SOCKET_WRITEABLE | WL_EXIT_ON_PM_DEATH, @@ -671,7 +687,7 @@ secure_open_gssapi(Port *port) continue; } - PqGSSSendNext += ret; + PqGSSSendNext += count; } /* Done sending the packet, reset our buffer */ @@ -701,6 +717,7 @@ secure_open_gssapi(Port *port) pg_GSS_error(_("GSSAPI size check error"), major, minor); return -1; } + io_stream_add_layer(port->io_stream, &be_gssapi_processor, port); port->gss->enc = true; diff --git a/src/backend/libpq/be-secure-openssl.c b/src/backend/libpq/be-secure-openssl.c index 22e3dc5a81..5c67fd46aa 100644 --- a/src/backend/libpq/be-secure-openssl.c +++ b/src/backend/libpq/be-secure-openssl.c @@ -59,7 +59,7 @@ openssl_tls_init_hook_typ openssl_tls_init_hook = default_openssl_tls_init; static int my_sock_read(BIO *h, char *buf, int size); static int my_sock_write(BIO *h, const char *buf, int size); static BIO_METHOD *my_BIO_s_socket(void); -static int my_SSL_set_fd(Port *port, int fd); +static int my_SSL_set_fd(SSL *ssl, int fd, IoStreamLayer * layer); static DH *load_dh_file(char *filename, bool isServerStart); static DH *load_dh_buffer(const char *buffer, size_t len); @@ -71,6 +71,16 @@ static bool initialize_dh(SSL_CTX *context, bool isServerStart); static bool initialize_ecdh(SSL_CTX *context, bool isServerStart); static const char *SSLerrmessage(unsigned long ecode); +static ssize_t be_tls_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only); +static int be_tls_write(IoStreamLayer * self, Port *port, void const *ptr, size_t len, size_t *bytes_written); +static void be_tls_close(Port *port); + +IoStreamProcessor be_tls_processor = { + .read = (io_stream_read_func) be_tls_read, + .write = (io_stream_write_func) be_tls_write, + .destroy = (io_stream_destroy_func) be_tls_close +}; + static char *X509_NAME_to_cstring(X509_NAME *name); static SSL_CTX *SSL_context = NULL; @@ -440,7 +450,8 @@ be_tls_open_server(Port *port) SSLerrmessage(ERR_get_error())))); return -1; } - if (!my_SSL_set_fd(port, port->sock)) + if (!my_SSL_set_fd(port->ssl, port->sock, + io_stream_add_layer(port->io_stream, &be_tls_processor, port))) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -674,7 +685,7 @@ aloop: return 0; } -void +static void be_tls_close(Port *port) { if (port->ssl) @@ -704,14 +715,27 @@ be_tls_close(Port *port) } } -ssize_t -be_tls_read(Port *port, void *ptr, size_t len, int *waitfor) +static ssize_t +be_tls_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only) { ssize_t n; int err; unsigned long ecode; + port->waitfor = 0; errno = 0; + + if (buffered_only) + { + /* + * SSL_pending bytes are guaranteed to be available and readable + * without blocking + */ + len = Min(len, SSL_pending(port->ssl)); + if (len == 0) + return 0; + } + ERR_clear_error(); n = SSL_read(port->ssl, ptr, len); err = SSL_get_error(port->ssl, n); @@ -722,12 +746,12 @@ be_tls_read(Port *port, void *ptr, size_t len, int *waitfor) /* a-ok */ break; case SSL_ERROR_WANT_READ: - *waitfor = WL_SOCKET_READABLE; + port->waitfor = WL_SOCKET_READABLE; errno = EWOULDBLOCK; n = -1; break; case SSL_ERROR_WANT_WRITE: - *waitfor = WL_SOCKET_WRITEABLE; + port->waitfor = WL_SOCKET_WRITEABLE; errno = EWOULDBLOCK; n = -1; break; @@ -763,13 +787,14 @@ be_tls_read(Port *port, void *ptr, size_t len, int *waitfor) return n; } -ssize_t -be_tls_write(Port *port, void *ptr, size_t len, int *waitfor) +static int +be_tls_write(IoStreamLayer * self, Port *port, void const *ptr, size_t len, size_t *bytes_written) { ssize_t n; int err; unsigned long ecode; + port->waitfor = 0; errno = 0; ERR_clear_error(); n = SSL_write(port->ssl, ptr, len); @@ -781,12 +806,12 @@ be_tls_write(Port *port, void *ptr, size_t len, int *waitfor) /* a-ok */ break; case SSL_ERROR_WANT_READ: - *waitfor = WL_SOCKET_READABLE; + port->waitfor = WL_SOCKET_READABLE; errno = EWOULDBLOCK; n = -1; break; case SSL_ERROR_WANT_WRITE: - *waitfor = WL_SOCKET_WRITEABLE; + port->waitfor = WL_SOCKET_WRITEABLE; errno = EWOULDBLOCK; n = -1; break; @@ -830,7 +855,16 @@ be_tls_write(Port *port, void *ptr, size_t len, int *waitfor) break; } - return n; + if (n >= 0) + { + *bytes_written = n; + return 0; + } + else + { + *bytes_written = 0; + return n; + } } /* ------------------------------------------------------------ */ @@ -858,7 +892,7 @@ my_sock_read(BIO *h, char *buf, int size) if (buf != NULL) { - res = secure_raw_read(((Port *) BIO_get_app_data(h)), buf, size); + res = io_stream_next_read(BIO_get_app_data(h), buf, size, false); BIO_clear_retry_flags(h); if (res <= 0) { @@ -876,11 +910,12 @@ my_sock_read(BIO *h, char *buf, int size) static int my_sock_write(BIO *h, const char *buf, int size) { - int res = 0; + int res; + size_t bytes_written; - res = secure_raw_write(((Port *) BIO_get_app_data(h)), buf, size); + res = io_stream_next_write(BIO_get_app_data(h), buf, size, &bytes_written); BIO_clear_retry_flags(h); - if (res <= 0) + if (res < 0 || bytes_written == 0) { /* If we were interrupted, tell caller to retry */ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) @@ -889,7 +924,7 @@ my_sock_write(BIO *h, const char *buf, int size) } } - return res; + return res ? res : bytes_written; } static BIO_METHOD * @@ -935,7 +970,7 @@ my_BIO_s_socket(void) /* This should exactly match OpenSSL's SSL_set_fd except for using my BIO */ static int -my_SSL_set_fd(Port *port, int fd) +my_SSL_set_fd(SSL *ssl, int fd, IoStreamLayer * layer) { int ret = 0; BIO *bio; @@ -954,10 +989,10 @@ my_SSL_set_fd(Port *port, int fd) SSLerr(SSL_F_SSL_SET_FD, ERR_R_BUF_LIB); goto err; } - BIO_set_app_data(bio, port); + BIO_set_app_data(bio, layer); BIO_set_fd(bio, fd, BIO_NOCLOSE); - SSL_set_bio(port->ssl, bio, bio); + SSL_set_bio(ssl, bio, bio); ret = 1; err: return ret; diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index a0f7084018..caee5b0556 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -125,219 +125,3 @@ secure_open_server(Port *port) return r; } - -/* - * Close secure session. - */ -void -secure_close(Port *port) -{ -#ifdef USE_SSL - if (port->ssl_in_use) - be_tls_close(port); -#endif -} - -/* - * Read data from a secure connection. - */ -ssize_t -secure_read(Port *port, void *ptr, size_t len) -{ - ssize_t n; - int waitfor; - - /* Deal with any already-pending interrupt condition. */ - ProcessClientReadInterrupt(false); - -retry: -#ifdef USE_SSL - waitfor = 0; - if (port->ssl_in_use) - { - n = be_tls_read(port, ptr, len, &waitfor); - } - else -#endif -#ifdef ENABLE_GSS - if (port->gss && port->gss->enc) - { - n = be_gssapi_read(port, ptr, len); - waitfor = WL_SOCKET_READABLE; - } - else -#endif - { - n = secure_raw_read(port, ptr, len); - waitfor = WL_SOCKET_READABLE; - } - - /* In blocking mode, wait until the socket is ready */ - if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) - { - WaitEvent event; - - Assert(waitfor); - - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); - - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, - WAIT_EVENT_CLIENT_READ); - - /* - * If the postmaster has died, it's not safe to continue running, - * because it is the postmaster's job to kill us if some other backend - * exits uncleanly. Moreover, we won't run very well in this state; - * helper processes like walwriter and the bgwriter will exit, so - * performance may be poor. Finally, if we don't exit, pg_ctl will be - * unable to restart the postmaster without manual intervention, so no - * new connections can be accepted. Exiting clears the deck for a - * postmaster restart. - * - * (Note that we only make this check when we would otherwise sleep on - * our latch. We might still continue running for a while if the - * postmaster is killed in mid-query, or even through multiple queries - * if we never have to wait for read. We don't want to burn too many - * cycles checking for this very rare condition, and this should cause - * us to exit quickly in most cases.) - */ - if (event.events & WL_POSTMASTER_DEATH) - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating connection due to unexpected postmaster exit"))); - - /* Handle interrupt. */ - if (event.events & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessClientReadInterrupt(true); - - /* - * We'll retry the read. Most likely it will return immediately - * because there's still no data available, and we'll wait for the - * socket to become ready again. - */ - } - goto retry; - } - - /* - * Process interrupts that happened during a successful (or non-blocking, - * or hard-failed) read. - */ - ProcessClientReadInterrupt(false); - - return n; -} - -ssize_t -secure_raw_read(Port *port, void *ptr, size_t len) -{ - ssize_t n; - - /* - * Try to read from the socket without blocking. If it succeeds we're - * done, otherwise we'll wait for the socket using the latch mechanism. - */ -#ifdef WIN32 - pgwin32_noblock = true; -#endif - n = recv(port->sock, ptr, len, 0); -#ifdef WIN32 - pgwin32_noblock = false; -#endif - - return n; -} - - -/* - * Write data to a secure connection. - */ -ssize_t -secure_write(Port *port, void *ptr, size_t len) -{ - ssize_t n; - int waitfor; - - /* Deal with any already-pending interrupt condition. */ - ProcessClientWriteInterrupt(false); - -retry: - waitfor = 0; -#ifdef USE_SSL - if (port->ssl_in_use) - { - n = be_tls_write(port, ptr, len, &waitfor); - } - else -#endif -#ifdef ENABLE_GSS - if (port->gss && port->gss->enc) - { - n = be_gssapi_write(port, ptr, len); - waitfor = WL_SOCKET_WRITEABLE; - } - else -#endif - { - n = secure_raw_write(port, ptr, len); - waitfor = WL_SOCKET_WRITEABLE; - } - - if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) - { - WaitEvent event; - - Assert(waitfor); - - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); - - WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, - WAIT_EVENT_CLIENT_WRITE); - - /* See comments in secure_read. */ - if (event.events & WL_POSTMASTER_DEATH) - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating connection due to unexpected postmaster exit"))); - - /* Handle interrupt. */ - if (event.events & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessClientWriteInterrupt(true); - - /* - * We'll retry the write. Most likely it will return immediately - * because there's still no buffer space available, and we'll wait - * for the socket to become ready again. - */ - } - goto retry; - } - - /* - * Process interrupts that happened during a successful (or non-blocking, - * or hard-failed) write. - */ - ProcessClientWriteInterrupt(false); - - return n; -} - -ssize_t -secure_raw_write(Port *port, const void *ptr, size_t len) -{ - ssize_t n; - -#ifdef WIN32 - pgwin32_noblock = true; -#endif - n = send(port->sock, ptr, len, 0); -#ifdef WIN32 - pgwin32_noblock = false; -#endif - - return n; -} diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 0084a9bf13..030686cc3b 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -31,6 +31,7 @@ * setup/teardown: * StreamServerPort - Open postmaster's server port * StreamConnection - Create new connection with client + * StreamSetupIo - Configures IO stream on connection * StreamClose - Close a client/backend connection * TouchSocketFiles - Protect socket files against /tmp cleaners * pq_init - initialize libpq at backend startup @@ -74,12 +75,15 @@ #endif #include "common/ip.h" +#include "common/io_stream.h" #include "libpq/libpq.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" +#include "tcop/tcopprot.h" #include "utils/guc_hooks.h" #include "utils/memutils.h" +#include "utils/wait_event.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -146,6 +150,15 @@ static int socket_putmessage(char msgtype, const char *s, size_t len); static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static ssize_t socket_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only); +static int socket_write(IoStreamLayer * self, Port *port, void const *ptr, size_t len, size_t *bytes_written); +static ssize_t io_read_with_wait(Port *port, void *ptr, size_t len); +static int io_write_with_wait(Port *port, char const *ptr, size_t len, size_t *bytes_written); + +IoStreamProcessor socket_processor = { + .read = (io_stream_read_func) socket_read, + .write = (io_stream_write_func) socket_write +}; static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); @@ -277,7 +290,8 @@ socket_close(int code, Datum arg) * Cleanly shut down SSL layer. Nowhere else does a postmaster child * call this, so this is safe when interrupting BackendInitialize(). */ - secure_close(MyProcPort); + io_stream_destroy(MyProcPort->io_stream); + MyProcPort->io_stream = NULL; /* * Formerly we did an explicit close() here, but it seems better to @@ -817,6 +831,30 @@ StreamConnection(pgsocket server_fd, Port *port) return STATUS_OK; } +/* This must be called after the child process is launched or the data structures + * do not comre across correctly + */ +int +StreamSetupIo(Port *port) +{ + if (port->io_stream) + { + ereport(LOG, + (errmsg("%s() failed: io_stream already configured", "io_stream_create"))); + return STATUS_ERROR; + } + port->io_stream = io_stream_create(); + if (!port->io_stream) + { + ereport(LOG, + (errmsg("%s() failed: %m", "io_stream_create"))); + return STATUS_ERROR; + } + io_stream_add_layer(port->io_stream, &socket_processor, port); + + return STATUS_OK; +} + /* * StreamClose -- close a client/backend connection * @@ -905,6 +943,193 @@ socket_set_nonblocking(bool nonblocking) MyProcPort->noblock = nonblocking; } +static ssize_t +socket_read(IoStreamLayer * self, Port *port, void *ptr, size_t len, bool buffered_only) +{ + ssize_t n; + + if (buffered_only) + return 0; + + /* + * Try to read from the socket without blocking. If it succeeds we're + * done, otherwise we'll wait for the socket using the latch mechanism. + */ +#ifdef WIN32 + pgwin32_noblock = true; +#endif + n = recv(port->sock, ptr, len, 0); +#ifdef WIN32 + pgwin32_noblock = false; +#endif + + return n; +} + +static int +socket_write(IoStreamLayer * self, Port *port, const void *ptr, size_t len, size_t *bytes_written) +{ + ssize_t n; + +#ifdef WIN32 + pgwin32_noblock = true; +#endif + n = send(port->sock, ptr, len, 0); +#ifdef WIN32 + pgwin32_noblock = false; +#endif + + if (n >= 0) + { + *bytes_written = n; + return 0; + } + else + { + *bytes_written = 0; + return n; + } +} + +/* + * Read protocol-level data, processed through any intermediate streams like TLS + */ +static ssize_t +io_read_with_wait(Port *port, void *ptr, size_t len) +{ + ssize_t n; + + /* Deal with any already-pending interrupt condition. */ + ProcessClientReadInterrupt(false); + +retry: + port->waitfor = WL_SOCKET_READABLE; + n = io_stream_read(port->io_stream, ptr, len, false); + + /* In blocking mode, wait until the socket is ready */ + if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + WaitEvent event; + + Assert(port->waitfor); + + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, port->waitfor, NULL); + + WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WAIT_EVENT_CLIENT_READ); + + /* + * If the postmaster has died, it's not safe to continue running, + * because it is the postmaster's job to kill us if some other backend + * exits uncleanly. Moreover, we won't run very well in this state; + * helper processes like walwriter and the bgwriter will exit, so + * performance may be poor. Finally, if we don't exit, pg_ctl will be + * unable to restart the postmaster without manual intervention, so no + * new connections can be accepted. Exiting clears the deck for a + * postmaster restart. + * + * (Note that we only make this check when we would otherwise sleep on + * our latch. We might still continue running for a while if the + * postmaster is killed in mid-query, or even through multiple queries + * if we never have to wait for read. We don't want to burn too many + * cycles checking for this very rare condition, and this should cause + * us to exit quickly in most cases.) + */ + if (event.events & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"))); + + /* Handle interrupt. */ + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessClientReadInterrupt(true); + + /* + * We'll retry the read. Most likely it will return immediately + * because there's still no data available, and we'll wait for the + * socket to become ready again. + */ + } + goto retry; + } + + /* + * Process interrupts that happened during a successful (or non-blocking, + * or hard-failed) read. + */ + ProcessClientReadInterrupt(false); + + return n; +} + + +/* + * Write protocol-level data to be processed through any intermediate streams like TLS + */ +static int +io_write_with_wait(Port *port, char const *ptr, size_t len, size_t *bytes_processed) +{ + int rc; + size_t count = 0; + + *bytes_processed = 0; + + /* Deal with any already-pending interrupt condition. */ + ProcessClientWriteInterrupt(false); + +retry: + port->waitfor = WL_SOCKET_WRITEABLE; + + /* + * on retry it is possible some of the input will have already been + * processed, so make sure we offset our retries + */ + rc = io_stream_write(port->io_stream, ptr + *bytes_processed, len - *bytes_processed, &count); + *bytes_processed += count; + + if (rc < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + WaitEvent event; + + Assert(port->waitfor); + + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, port->waitfor, NULL); + + WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, + WAIT_EVENT_CLIENT_WRITE); + + /* See comments in secure_read. */ + if (event.events & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"))); + + /* Handle interrupt. */ + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessClientWriteInterrupt(true); + + /* + * We'll retry the write. Most likely it will return immediately + * because there's still no buffer space available, and we'll wait + * for the socket to become ready again. + */ + } + goto retry; + } + + /* + * Process interrupts that happened during a successful (or non-blocking, + * or hard-failed) write. + */ + ProcessClientWriteInterrupt(false); + + return rc; +} + /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * @@ -938,8 +1163,8 @@ pq_recvbuf(void) errno = 0; - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + r = io_read_with_wait(MyProcPort, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { @@ -1035,7 +1260,7 @@ pq_getbyte_if_available(unsigned char *c) errno = 0; - r = secure_read(MyProcPort, c, 1); + r = io_read_with_wait(MyProcPort, c, 1); if (r < 0) { /* @@ -1351,11 +1576,15 @@ internal_flush(void) while (bufptr < bufend) { - int r; + int rc; + size_t bytes_sent; + size_t available = bufend - bufptr; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + rc = io_write_with_wait(MyProcPort, bufptr, available, &bytes_sent); + bufptr += bytes_sent; + PqSendStart += bytes_sent; - if (r <= 0) + if (rc < 0 || (bytes_sent == 0 && available)) { if (errno == EINTR) continue; /* Ok if we were interrupted */ @@ -1400,8 +1629,6 @@ internal_flush(void) } last_reported_send_errno = 0; /* reset after any successful send */ - bufptr += r; - PqSendStart += r; } PqSendStart = PqSendPointer = 0; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index fb04e4dde3..edf28e36f1 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -4232,6 +4232,11 @@ BackendInitialize(Port *port) /* Save port etc. for ps status */ MyProcPort = port; + if (StreamSetupIo(port)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unable to configure backend I/O"))); + /* Tell fd.c about the long-lived FD associated with the port */ ReserveExternalFD(); diff --git a/src/common/Makefile b/src/common/Makefile index 2ba5069dca..a80f4b39a6 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -59,6 +59,7 @@ OBJS_COMMON = \ file_perm.o \ file_utils.o \ hashfn.o \ + io_stream.o \ ip.o \ jsonapi.o \ keywords.o \ diff --git a/src/common/io_stream.c b/src/common/io_stream.c new file mode 100644 index 0000000000..b15aca326d --- /dev/null +++ b/src/common/io_stream.c @@ -0,0 +1,148 @@ +/*------------------------------------------------------------------------- + * + * io_stream.c + * functions related to managing layers of streaming IO. + * In general the base layer will work with raw sockets, and then + * additional layers will add features such as encryption and + * compression. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/common/io_stream.c + * + *------------------------------------------------------------------------- + */ + +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif + +#include + +#ifndef FRONTEND +#include "utils/memutils.h" +#define ALLOC(size) MemoryContextAlloc(TopMemoryContext, size) +#define FREE(size) pfree(size) +#else +#define ALLOC(size) malloc(size) +#define FREE(size) free(size) +#endif + +struct IoStreamLayer +{ + IoStreamProcessor *processor; + void *context; + IoStreamLayer *next; +}; + +struct IoStream +{ + IoStreamLayer *layer; +}; + +IoStream * +io_stream_create(void) +{ + IoStream *ret = ALLOC(sizeof(IoStream)); + + ret->layer = NULL; + return ret; +} + +void +io_stream_destroy(IoStream * arg) +{ + IoStreamLayer *layer; + + if (arg == NULL) + return; + + layer = arg->layer; + while (layer != NULL) + { + IoStreamLayer *next = layer->next; + + if (layer->processor->destroy != NULL) + layer->processor->destroy(layer->context); + FREE(layer); + layer = next; + } + FREE(arg); +} + +IoStreamLayer * +io_stream_add_layer(IoStream * stream, IoStreamProcessor * processor, void *context) +{ + IoStreamLayer *layer = ALLOC(sizeof(IoStreamLayer)); + + layer->processor = processor; + layer->context = context; + layer->next = stream->layer; + stream->layer = layer; + return layer; +} + +ssize_t +io_stream_read(IoStream * stream, void *data, size_t size, bool buffered_only) +{ + if (stream->layer == NULL) + return -1; + + return stream->layer->processor->read(stream->layer, stream->layer->context, data, size, buffered_only); +} + +int +io_stream_write(IoStream * stream, void const *data, size_t size, size_t *bytes_written) +{ + if (stream->layer == NULL) + return -1; + + return stream->layer->processor->write(stream->layer, stream->layer->context, data, size, bytes_written); +} + +bool +io_stream_buffered_read_data(IoStream * stream) +{ + IoStreamLayer *layer; + + for (layer = stream->layer; layer != NULL; layer = layer->next) + { + if (layer->processor->buffered_read_data != NULL && layer->processor->buffered_read_data(layer->context)) + return true; + } + return false; +} + +bool +io_stream_buffered_write_data(IoStream * stream) +{ + IoStreamLayer *layer; + + for (layer = stream->layer; layer != NULL; layer = layer->next) + { + if (layer->processor->buffered_write_data != NULL && layer->processor->buffered_write_data(layer->context)) + return true; + } + return false; +} + +ssize_t +io_stream_next_read(IoStreamLayer * layer, void *data, size_t size, bool buffered_only) +{ + if (layer->next == NULL) + return -1; + + return layer->next->processor->read(layer->next, layer->next->context, data, size, buffered_only); +} + +int +io_stream_next_write(IoStreamLayer * layer, void const *data, size_t size, size_t *bytes_written) +{ + if (layer->next == NULL) + return -1; + + return layer->next->processor->write(layer->next, layer->next->context, data, size, bytes_written); +} diff --git a/src/common/meson.build b/src/common/meson.build index 12fd43e87f..e25fa44133 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -13,6 +13,7 @@ common_sources = files( 'file_perm.c', 'file_utils.c', 'hashfn.c', + 'io_stream.c', 'ip.c', 'jsonapi.c', 'keywords.c', diff --git a/src/include/common/io_stream.h b/src/include/common/io_stream.h new file mode 100644 index 0000000000..9af88aa9ea --- /dev/null +++ b/src/include/common/io_stream.h @@ -0,0 +1,131 @@ +/*------------------------------------------------------------------------- + * + * io_stream.c + * defintions for managing layers of streaming IO. + * In general the base layer will work with raw sockets, and then + * additional layers will add features such as encryption and + * compression. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/common/io_stream.h + * + *------------------------------------------------------------------------- + */ +#ifndef IO_STREAM_H +#define IO_STREAM_H + +/* Opaque structs should only be interacted with through corresponding functions*/ +typedef struct IoStreamLayer IoStreamLayer; +typedef struct IoStream IoStream; + +typedef ssize_t (*io_stream_read_func) (IoStreamLayer * self, void *context, void *data, size_t size, bool buffered_only); +typedef int (*io_stream_write_func) (IoStreamLayer * self, void *context, void const *data, size_t size, size_t *bytes_written); +typedef bool (*io_stream_predicate) (void *context); +typedef void (*io_stream_destroy_func) (void *context); + +typedef struct IoStreamProcessor +{ + /* + * Required Should call io_stream_next_read with self either directly or + * indirectly to recieve bytes from the underlying layer of the stream + */ + io_stream_read_func read; + + /* + * Required Should call io_stream_next_write with self either directly or + * indirectly to write bytes to the underlying layer of the stream + */ + io_stream_write_func write; + + /* + * Optional Return true if this layer is holding buffered readable data + */ + io_stream_predicate buffered_read_data; + + /* + * Optional Return true if this layer is holding buffered writable data + */ + io_stream_predicate buffered_write_data; + + /* + * Optional will be called as part of io_stream_destroy when cleaning up + * the stream + */ + io_stream_destroy_func destroy; +} IoStreamProcessor; + +/* + * Allocate a new IoStream and return the address to the caller. IoStreams should always be destroyed with + * the corresponding io_stream_destroy function + */ +extern IoStream * io_stream_create(void); +/* + * Adds new processors to the IO stream + * + * processorDescription contains collection of function pointers for this layer + * context (optional) pointer with context to be used in reader/writer + * + * Returns the newly created processor + * */ +extern IoStreamLayer * io_stream_add_layer(IoStream * stream, IoStreamProcessor * processorDescription, void *context); +/* + * Destroy an IoStream, freeing all associated memory + */ +extern void io_stream_destroy(IoStream * stream); + +/* + * Read data from the stream + * + * Reads at most size bytes into the buffer pointed to by data, and returns + * the number of bytes read. If buffered_only is true, then only data that + * was stored in an in-process buffer will be returned and this function will + * not block. In that case, a return value of 0 simply means there was no + * buffered data available and does not mean the stream has reached EOF. + */ +extern ssize_t io_stream_read(IoStream * stream, void *data, size_t size, bool buffered_only); + +/* + * Write data to the stream + * + * Writes at most size bytes from the buffer pointed to by data, and returns + * true on success, false on failure, with the specific error in errno. The + * count of bytes written from data will be stored in bytes_written and may + * be non-zero even on failure + */ +extern int io_stream_write(IoStream * stream, void const *data, size_t size, size_t *bytes_written); + +/* + * Check if there is data buffered in memory waiting to be read (e.g. if + * compression is in use and more uncompressed data was read than fit into + * the provided buffer) + */ +extern bool io_stream_buffered_read_data(IoStream * stream); + +/* + * Check if there is data buffered in memory waiting to be written to the underlying backing store + */ +extern bool io_stream_buffered_write_data(IoStream * stream); + +/* + * Read data from the next layer of the stream + * (to be used by io_stream_read_func) + * + * Reads at most size bytes into the buffer pointed to by data, and returns + * the number of bytes read + */ +extern ssize_t io_stream_next_read(IoStreamLayer * layer, void *data, size_t size, bool buffered_only); + +/* + * Write data to the next layer of the stream + * (to be used by io_stream_write_func) + * + * Writes at most size bytes from the buffer pointed to by data, and returns + * true on success, false on failure, with the specific error in errno. The + * count of bytes written from data will be stored in bytes_written and may + * be non-zero even on failure + */ +extern int io_stream_next_write(IoStreamLayer * layer, void const *data, size_t size, size_t *bytes_written); + +#endif /* //IO_STREAM_H */ diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index c57ed12fb6..87ba7f5ea0 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -53,6 +53,7 @@ typedef struct #endif #endif /* ENABLE_SSPI */ +#include "common/io_stream.h" #include "datatype/timestamp.h" #include "libpq/hba.h" #include "libpq/pqcomm.h" @@ -145,8 +146,11 @@ typedef struct ClientConnectionInfo typedef struct Port { + IoStream *io_stream; pgsocket sock; /* File descriptor */ bool noblock; /* is the socket in non-blocking mode? */ + int waitfor; /* Events to wait on the socket for after + * attempted read/write */ ProtocolVersion proto; /* FE/BE protocol version */ SockAddr laddr; /* local addr (postmaster) */ SockAddr raddr; /* remote addr (client) */ @@ -274,21 +278,6 @@ extern void be_tls_destroy(void); */ extern int be_tls_open_server(Port *port); -/* - * Close SSL connection. - */ -extern void be_tls_close(Port *port); - -/* - * Read data from a secure connection. - */ -extern ssize_t be_tls_read(Port *port, void *ptr, size_t len, int *waitfor); - -/* - * Write data to a secure connection. - */ -extern ssize_t be_tls_write(Port *port, void *ptr, size_t len, int *waitfor); - /* * Return information about the SSL connection. */ @@ -324,10 +313,6 @@ extern bool be_gssapi_get_auth(Port *port); extern bool be_gssapi_get_enc(Port *port); extern const char *be_gssapi_get_princ(Port *port); extern bool be_gssapi_get_delegation(Port *port); - -/* Read and write to a GSSAPI-encrypted connection. */ -extern ssize_t be_gssapi_read(Port *port, void *ptr, size_t len); -extern ssize_t be_gssapi_write(Port *port, void *ptr, size_t len); #endif /* ENABLE_GSS */ extern PGDLLIMPORT ProtocolVersion FrontendProtocol; diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index a6104d8cd0..3612280146 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -68,6 +68,7 @@ extern int StreamServerPort(int family, const char *hostName, unsigned short portNumber, const char *unixSocketDir, pgsocket ListenSocket[], int *NumListenSockets, int MaxListen); extern int StreamConnection(pgsocket server_fd, Port *port); +extern int StreamSetupIo(Port *port); extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); @@ -104,11 +105,6 @@ extern int secure_initialize(bool isServerStart); extern bool secure_loaded_verify_locations(void); extern void secure_destroy(void); extern int secure_open_server(Port *port); -extern void secure_close(Port *port); -extern ssize_t secure_read(Port *port, void *ptr, size_t len); -extern ssize_t secure_write(Port *port, void *ptr, size_t len); -extern ssize_t secure_raw_read(Port *port, void *ptr, size_t len); -extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len); /* * prototypes for functions in be-secure-gssapi.c diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index bf83a9b569..a0f12e62af 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -136,6 +136,55 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options, #define DefaultGSSMode "disable" #endif +/* + * Macros to handle disabling and then restoring the state of SIGPIPE handling. + * On Windows, these are all no-ops since there's no SIGPIPEs. + */ + +#ifndef WIN32 + +#define SIGPIPE_MASKED(conn) ((conn)->sigpipe_so || (conn)->sigpipe_flag) + +struct sigpipe_info +{ + sigset_t oldsigmask; + bool sigpipe_pending; + bool got_epipe; +}; + +#define DECLARE_SIGPIPE_INFO(spinfo) struct sigpipe_info spinfo + +#define DISABLE_SIGPIPE(conn, spinfo, failaction) \ +do { \ +(spinfo).got_epipe = false; \ +if (!SIGPIPE_MASKED(conn)) \ +{ \ +if (pq_block_sigpipe(&(spinfo).oldsigmask, \ +&(spinfo).sigpipe_pending) < 0) \ +failaction; \ +} \ +} while (0) + +#define REMEMBER_EPIPE(spinfo, cond) \ +do { \ +if (cond) \ +(spinfo).got_epipe = true; \ +} while (0) + +#define RESTORE_SIGPIPE(conn, spinfo) \ +do { \ +if (!SIGPIPE_MASKED(conn)) \ +pq_reset_sigpipe(&(spinfo).oldsigmask, (spinfo).sigpipe_pending, \ +(spinfo).got_epipe); \ +} while (0) +#else /* WIN32 */ + +#define DECLARE_SIGPIPE_INFO(spinfo) +#define DISABLE_SIGPIPE(conn, spinfo, failaction) +#define REMEMBER_EPIPE(spinfo, cond) +#define RESTORE_SIGPIPE(conn, spinfo) +#endif /* WIN32 */ + /* ---------- * Definition of the conninfo parameters and their fallback resources. * @@ -445,6 +494,12 @@ static bool sslVerifyProtocolVersion(const char *version); static bool sslVerifyProtocolRange(const char *min, const char *max); static bool parse_int_param(const char *value, int *result, PGconn *conn, const char *context); +static ssize_t socket_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only); +static int socket_write(IoStreamLayer * self, PGconn *conn, void const *ptr, size_t len, size_t *bytes_written); +IoStreamProcessor socket_processor = { + .read = (io_stream_read_func) socket_read, + .write = (io_stream_write_func) socket_write +}; /* global variable because fe-auth.c needs to access it */ @@ -466,9 +521,8 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; void pqDropConnection(PGconn *conn, bool flushInput) { - /* Drop any SSL state */ - pqsecure_close(conn); - + io_stream_destroy(conn->io_stream); + conn->io_stream = NULL; /* Close the socket itself */ if (conn->sock != PGINVALID_SOCKET) closesocket(conn->sock); @@ -2879,6 +2933,9 @@ keep_going: /* We will come back to here until there is sock_type |= SOCK_NONBLOCK; #endif conn->sock = socket(addr_cur->family, sock_type, 0); + conn->io_stream = io_stream_create(); + io_stream_add_layer(conn->io_stream, &socket_processor, conn); + if (conn->sock == PGINVALID_SOCKET) { int errorno = SOCK_ERRNO; @@ -7829,3 +7886,283 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev; } + +static ssize_t +socket_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only) +{ + ssize_t n; + int result_errno = 0; + char sebuf[PG_STRERROR_R_BUFLEN]; + + SOCK_ERRNO_SET(0); + + if (buffered_only) + return 0; + + n = recv(conn->sock, ptr, len, 0); + + if (n < 0) + { + result_errno = SOCK_ERRNO; + + /* Set error message if appropriate */ + switch (result_errno) + { +#ifdef EAGAIN + case EAGAIN: +#endif +#if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) + case EWOULDBLOCK: +#endif + case EINTR: + /* no error message, caller is expected to retry */ + break; + + case EPIPE: + case ECONNRESET: + libpq_append_conn_error(conn, "server closed the connection unexpectedly\n" + "\tThis probably means the server terminated abnormally\n" + "\tbefore or while processing the request."); + break; + + case 0: + /* If errno didn't get set, treat it as regular EOF */ + n = 0; + break; + + default: + libpq_append_conn_error(conn, "could not receive data from server: %s", + SOCK_STRERROR(result_errno, + sebuf, sizeof(sebuf))); + break; + } + } + + /* ensure we return the intended errno to caller */ + SOCK_ERRNO_SET(result_errno); + + return n; +} + +/* + * Socket-level implementation of data writing. + * + * This is used directly for an unencrypted connection. For encrypted + * connections, this is wrapped by higher layers through IO stream + * + * This function reports failure (i.e., returns a negative result) only + * for retryable errors such as EINTR. Looping for such cases is to be + * handled at some outer level, maybe all the way up to the application. + * For hard failures, we set conn->write_failed and store an error message + * in conn->write_err_msg, but then claim to have written the data anyway. + * This is because we don't want to report write failures so long as there + * is a possibility of reading from the server and getting an error message + * that could explain why the connection dropped. Many TCP stacks have + * race conditions such that a write failure may or may not be reported + * before all incoming data has been read. + * + * Note that this error behavior happens below the SSL management level when + * we are using SSL. That's because at least some versions of OpenSSL are + * too quick to report a write failure when there's still a possibility to + * get a more useful error from the server. + */ +static int +socket_write(IoStreamLayer * self, PGconn *conn, void const *ptr, size_t len, size_t *bytes_written) +{ + ssize_t n; + int flags = 0; + int result_errno = 0; + char msgbuf[1024]; + char sebuf[PG_STRERROR_R_BUFLEN]; + + DECLARE_SIGPIPE_INFO(spinfo); + + /* + * If we already had a write failure, we will never again try to send data + * on that connection. Even if the kernel would let us, we've probably + * lost message boundary sync with the server. conn->write_failed + * therefore persists until the connection is reset, and we just discard + * all data presented to be written. + */ + if (conn->write_failed) + return len; + +#ifdef MSG_NOSIGNAL + if (conn->sigpipe_flag) + flags |= MSG_NOSIGNAL; + +retry_masked: +#endif /* MSG_NOSIGNAL */ + + DISABLE_SIGPIPE(conn, spinfo, return -1); + + n = send(conn->sock, ptr, len, flags); + + if (n < 0) + { + *bytes_written = 0; + result_errno = SOCK_ERRNO; + + /* + * If we see an EINVAL, it may be because MSG_NOSIGNAL isn't available + * on this machine. So, clear sigpipe_flag so we don't try the flag + * again, and retry the send(). + */ +#ifdef MSG_NOSIGNAL + if (flags != 0 && result_errno == EINVAL) + { + conn->sigpipe_flag = false; + flags = 0; + goto retry_masked; + } +#endif /* MSG_NOSIGNAL */ + + /* Set error message if appropriate */ + switch (result_errno) + { +#ifdef EAGAIN + case EAGAIN: +#endif +#if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) + case EWOULDBLOCK: +#endif + case EINTR: + /* no error message, caller is expected to retry */ + break; + + case EPIPE: + /* Set flag for EPIPE */ + REMEMBER_EPIPE(spinfo, true); + + /* FALL THRU */ + + case ECONNRESET: + conn->write_failed = true; + /* Store error message in conn->write_err_msg, if possible */ + /* (strdup failure is OK, we'll cope later) */ + snprintf(msgbuf, sizeof(msgbuf), + libpq_gettext("server closed the connection unexpectedly\n" + "\tThis probably means the server terminated abnormally\n" + "\tbefore or while processing the request.")); + /* keep newline out of translated string */ + strlcat(msgbuf, "\n", sizeof(msgbuf)); + conn->write_err_msg = strdup(msgbuf); + /* Now claim the write succeeded */ + n = len; + break; + + default: + conn->write_failed = true; + /* Store error message in conn->write_err_msg, if possible */ + /* (strdup failure is OK, we'll cope later) */ + snprintf(msgbuf, sizeof(msgbuf), + libpq_gettext("could not send data to server: %s"), + SOCK_STRERROR(result_errno, + sebuf, sizeof(sebuf))); + /* keep newline out of translated string */ + strlcat(msgbuf, "\n", sizeof(msgbuf)); + conn->write_err_msg = strdup(msgbuf); + /* Now claim the write succeeded */ + n = len; + break; + } + } + else + { + *bytes_written = n; + n = 0; + } + + RESTORE_SIGPIPE(conn, spinfo); + + /* ensure we return the intended errno to caller */ + SOCK_ERRNO_SET(result_errno); + + return n; +} + +#if !defined(WIN32) + +/* + * Block SIGPIPE for this thread. This prevents send()/write() from exiting + * the application. + */ +int +pq_block_sigpipe(sigset_t *osigset, bool *sigpipe_pending) +{ + sigset_t sigpipe_sigset; + sigset_t sigset; + + sigemptyset(&sigpipe_sigset); + sigaddset(&sigpipe_sigset, SIGPIPE); + + /* Block SIGPIPE and save previous mask for later reset */ + SOCK_ERRNO_SET(pthread_sigmask(SIG_BLOCK, &sigpipe_sigset, osigset)); + if (SOCK_ERRNO) + return -1; + + /* We can have a pending SIGPIPE only if it was blocked before */ + if (sigismember(osigset, SIGPIPE)) + { + /* Is there a pending SIGPIPE? */ + if (sigpending(&sigset) != 0) + return -1; + + if (sigismember(&sigset, SIGPIPE)) + *sigpipe_pending = true; + else + *sigpipe_pending = false; + } + else + *sigpipe_pending = false; + + return 0; +} + +/* + * Discard any pending SIGPIPE and reset the signal mask. + * + * Note: we are effectively assuming here that the C library doesn't queue + * up multiple SIGPIPE events. If it did, then we'd accidentally leave + * ours in the queue when an event was already pending and we got another. + * As long as it doesn't queue multiple events, we're OK because the caller + * can't tell the difference. + * + * The caller should say got_epipe = false if it is certain that it + * didn't get an EPIPE error; in that case we'll skip the clear operation + * and things are definitely OK, queuing or no. If it got one or might have + * gotten one, pass got_epipe = true. + * + * We do not want this to change errno, since if it did that could lose + * the error code from a preceding send(). We essentially assume that if + * we were able to do pq_block_sigpipe(), this can't fail. + */ +void +pq_reset_sigpipe(sigset_t *osigset, bool sigpipe_pending, bool got_epipe) +{ + int save_errno = SOCK_ERRNO; + int signo; + sigset_t sigset; + + /* Clear SIGPIPE only if none was pending */ + if (got_epipe && !sigpipe_pending) + { + if (sigpending(&sigset) == 0 && + sigismember(&sigset, SIGPIPE)) + { + sigset_t sigpipe_sigset; + + sigemptyset(&sigpipe_sigset); + sigaddset(&sigpipe_sigset, SIGPIPE); + + sigwait(&sigpipe_sigset, &signo); + } + } + + /* Restore saved block mask */ + pthread_sigmask(SIG_SETMASK, osigset, NULL); + + SOCK_ERRNO_SET(save_errno); +} + +#endif /* !WIN32 */ diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 660cdec93c..6772f2876d 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -614,8 +614,8 @@ pqReadData(PGconn *conn) /* OK, try to read some data */ retry3: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = io_stream_read(conn->io_stream, conn->inBuffer + conn->inEnd, + conn->inBufSize - conn->inEnd, false); if (nread < 0) { switch (SOCK_ERRNO) @@ -709,8 +709,8 @@ retry3: * arrived. */ retry4: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = io_stream_read(conn->io_stream, conn->inBuffer + conn->inEnd, + conn->inBufSize - conn->inEnd, false); if (nread < 0) { switch (SOCK_ERRNO) @@ -824,10 +824,11 @@ pqSendSome(PGconn *conn, int len) /* while there's still data to send */ while (len > 0) { - int sent; + size_t sent; + int rc; #ifndef WIN32 - sent = pqsecure_write(conn, ptr, len); + rc = io_stream_write(conn->io_stream, ptr, len, &sent); #else /* @@ -835,10 +836,13 @@ pqSendSome(PGconn *conn, int len) * failure-point appears to be different in different versions of * Windows, but 64k should always be safe. */ - sent = pqsecure_write(conn, ptr, Min(len, 65536)); + rc = io_stream_write(conn->io_stream, ptr, Min(len, 65536), &sent); #endif + ptr += sent; + len -= sent; + remaining -= sent; - if (sent < 0) + if (rc < 0) { /* Anything except EAGAIN/EWOULDBLOCK/EINTR is trouble */ switch (SOCK_ERRNO) @@ -878,12 +882,6 @@ pqSendSome(PGconn *conn, int len) return -1; } } - else - { - ptr += sent; - len -= sent; - remaining -= sent; - } if (len > 0) { @@ -1048,14 +1046,11 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return -1; } -#ifdef USE_SSL - /* Check for SSL library buffering read bytes */ - if (forRead && conn->ssl_in_use && pgtls_read_pending(conn)) + if (forRead && io_stream_buffered_read_data(conn->io_stream)) { /* short-circuit the select */ return 1; } -#endif /* We will retry as long as we get EINTR */ do diff --git a/src/interfaces/libpq/fe-secure-gssapi.c b/src/interfaces/libpq/fe-secure-gssapi.c index ea8b0020d2..85cec6ced3 100644 --- a/src/interfaces/libpq/fe-secure-gssapi.c +++ b/src/interfaces/libpq/fe-secure-gssapi.c @@ -69,6 +69,13 @@ #define PqGSSResultNext (conn->gss_ResultNext) #define PqGSSMaxPktSize (conn->gss_MaxPktSize) +static ssize_t pg_GSS_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only); +static int pg_GSS_write(IoStreamLayer * self, PGconn *conn, const void *ptr, size_t len, size_t *bytes_written); + +IoStreamProcessor pg_GSS_processor = { + .read = (io_stream_read_func) pg_GSS_read, + .write = (io_stream_write_func) pg_GSS_write +}; /* * Attempt to write len bytes of data from ptr to a GSSAPI-encrypted connection. @@ -82,8 +89,8 @@ * For retryable errors, caller should call again (passing the same or more * data) once the socket is ready. */ -ssize_t -pg_GSS_write(PGconn *conn, const void *ptr, size_t len) +static int +pg_GSS_write(IoStreamLayer * self, PGconn *conn, const void *ptr, size_t len, size_t *bytes_written) { OM_uint32 major, minor; @@ -94,6 +101,8 @@ pg_GSS_write(PGconn *conn, const void *ptr, size_t len) size_t bytes_encrypted; gss_ctx_id_t gctx = conn->gctx; + *bytes_written = 0; + /* * When we get a retryable failure, we must not tell the caller we have * successfully transmitted everything, else it won't retry. For @@ -124,7 +133,7 @@ pg_GSS_write(PGconn *conn, const void *ptr, size_t len) /* * Loop through encrypting data and sending it out until it's all done or - * pqsecure_raw_write() complains (which would likely mean that the socket + * io_stream_next_write() complains (which would likely mean that the socket * is non-blocking and the requested send() would block, or there was some * kind of actual error). */ @@ -141,20 +150,21 @@ pg_GSS_write(PGconn *conn, const void *ptr, size_t len) */ if (PqGSSSendLength) { - ssize_t retval; + int retval; + size_t count; ssize_t amount = PqGSSSendLength - PqGSSSendNext; - retval = pqsecure_raw_write(conn, PqGSSSendBuffer + PqGSSSendNext, amount); - if (retval <= 0) + retval = io_stream_next_write(self, PqGSSSendBuffer + PqGSSSendNext, amount, &count); + if (retval < 0 || count == 0) return retval; /* * Check if this was a partial write, and if so, move forward that * far in our buffer and try again. */ - if (retval < amount) + if (count < amount) { - PqGSSSendNext += retval; + PqGSSSendNext += count; continue; } @@ -235,7 +245,8 @@ pg_GSS_write(PGconn *conn, const void *ptr, size_t len) /* We're reporting all the data as sent, so reset PqGSSSendConsumed. */ PqGSSSendConsumed = 0; - ret = bytes_encrypted; + ret = 0; + *bytes_written = bytes_encrypted; cleanup: /* Release GSSAPI buffer storage, if we didn't already */ @@ -255,8 +266,8 @@ cleanup: * error, a message is added to conn->errorMessage. For retryable errors, * caller should call again once the socket is ready. */ -ssize_t -pg_GSS_read(PGconn *conn, void *ptr, size_t len) +static ssize_t +pg_GSS_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only) { OM_uint32 major, minor; @@ -266,6 +277,9 @@ pg_GSS_read(PGconn *conn, void *ptr, size_t len) size_t bytes_returned = 0; gss_ctx_id_t gctx = conn->gctx; + if (buffered_only) + return 0; + /* * The plan here is to read one incoming encrypted packet into * PqGSSRecvBuffer, decrypt it into PqGSSResultBuffer, and then dole out @@ -322,10 +336,10 @@ pg_GSS_read(PGconn *conn, void *ptr, size_t len) /* Collect the length if we haven't already */ if (PqGSSRecvLength < sizeof(uint32)) { - ret = pqsecure_raw_read(conn, PqGSSRecvBuffer + PqGSSRecvLength, - sizeof(uint32) - PqGSSRecvLength); + ret = io_stream_next_read(self, PqGSSRecvBuffer + PqGSSRecvLength, + sizeof(uint32) - PqGSSRecvLength, false); - /* If ret <= 0, pqsecure_raw_read already set the correct errno */ + /* If ret <= 0, io_stream_next_read already set the correct errno */ if (ret <= 0) return ret; @@ -355,9 +369,9 @@ pg_GSS_read(PGconn *conn, void *ptr, size_t len) * Read as much of the packet as we are able to on this call into * wherever we left off from the last time we were called. */ - ret = pqsecure_raw_read(conn, PqGSSRecvBuffer + PqGSSRecvLength, - input.length - (PqGSSRecvLength - sizeof(uint32))); - /* If ret <= 0, pqsecure_raw_read already set the correct errno */ + ret = io_stream_next_read(self, PqGSSRecvBuffer + PqGSSRecvLength, + input.length - (PqGSSRecvLength - sizeof(uint32)), false); + /* If ret <= 0, io_stream_next_read already set the correct errno */ if (ret <= 0) return ret; @@ -418,16 +432,17 @@ cleanup: } /* - * Simple wrapper for reading from pqsecure_raw_read. + * Simple wrapper for reading from io_stream_read. Only used during connecition setup + * before GSS is added to the io_stream. * - * This takes the same arguments as pqsecure_raw_read, plus an output parameter + * This takes the same arguments as io_stream_read, plus an output parameter * to return the number of bytes read. This handles if blocking would occur and * if we detect EOF on the connection. */ static PostgresPollingStatusType gss_read(PGconn *conn, void *recv_buffer, size_t length, ssize_t *ret) { - *ret = pqsecure_raw_read(conn, recv_buffer, length); + *ret = io_stream_read(conn->io_stream, recv_buffer, length, false); if (*ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) @@ -447,7 +462,7 @@ gss_read(PGconn *conn, void *recv_buffer, size_t length, ssize_t *ret) if (!result) return PGRES_POLLING_READING; - *ret = pqsecure_raw_read(conn, recv_buffer, length); + *ret = io_stream_read(conn->io_stream, recv_buffer, length, false); if (*ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) @@ -506,8 +521,9 @@ pqsecure_open_gss(PGconn *conn) if (PqGSSSendLength) { ssize_t amount = PqGSSSendLength - PqGSSSendNext; + size_t count; - ret = pqsecure_raw_write(conn, PqGSSSendBuffer + PqGSSSendNext, amount); + ret = io_stream_write(conn->io_stream, PqGSSSendBuffer + PqGSSSendNext, amount, &count); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) @@ -516,7 +532,7 @@ pqsecure_open_gss(PGconn *conn) return PGRES_POLLING_FAILED; } - if (ret < amount) + if (count < amount) { PqGSSSendNext += ret; return PGRES_POLLING_WRITING; @@ -662,6 +678,8 @@ pqsecure_open_gss(PGconn *conn) */ conn->gssenc = true; conn->gssapi_used = true; + io_stream_add_layer(conn->io_stream, &pg_GSS_processor, conn); + /* Clean up */ gss_release_cred(&minor, &conn->gcred); diff --git a/src/interfaces/libpq/fe-secure-openssl.c b/src/interfaces/libpq/fe-secure-openssl.c index 2b221e7d15..b9706cb151 100644 --- a/src/interfaces/libpq/fe-secure-openssl.c +++ b/src/interfaces/libpq/fe-secure-openssl.c @@ -81,8 +81,17 @@ static int PQssl_passwd_cb(char *buf, int size, int rwflag, void *userdata); static int my_sock_read(BIO *h, char *buf, int size); static int my_sock_write(BIO *h, const char *buf, int size); static BIO_METHOD *my_BIO_s_socket(void); -static int my_SSL_set_fd(PGconn *conn, int fd); - +static int my_SSL_set_fd(SSL *ssl, int fd, IoStreamLayer * layer); +static ssize_t pgtls_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only); +static int pgtls_write(IoStreamLayer * self, PGconn *conn, void const *ptr, size_t len, size_t *bytes_written); +static bool pgtls_read_pending(PGconn *conn); +static void pgtls_close(PGconn *conn); +IoStreamProcessor pgtls_processor = { + .read = (io_stream_read_func) pgtls_read, + .write = (io_stream_write_func) pgtls_write, + .buffered_read_data = (io_stream_predicate) pgtls_read_pending, + .destroy = (io_stream_destroy_func) pgtls_close +}; static bool pq_init_ssl_lib = true; static bool pq_init_crypto_lib = true; @@ -141,8 +150,8 @@ pgtls_open_client(PGconn *conn) return open_client_SSL(conn); } -ssize_t -pgtls_read(PGconn *conn, void *ptr, size_t len) +static ssize_t +pgtls_read(IoStreamLayer * self, PGconn *conn, void *ptr, size_t len, bool buffered_only) { ssize_t n; int result_errno = 0; @@ -150,8 +159,20 @@ pgtls_read(PGconn *conn, void *ptr, size_t len) int err; unsigned long ecode; + if (buffered_only) + { + /* + * SSL_pending bytes are guaranteed to be available and readable + * without blocking + */ + len = Min(len, SSL_pending(conn->ssl)); + if (len == 0) + return 0; + } + rloop: + /* * Prepare to call SSL_get_error() by clearing thread's OpenSSL error * queue. In general, the current thread's error queue must be empty @@ -257,14 +278,14 @@ rloop: return n; } -bool +static bool pgtls_read_pending(PGconn *conn) { return SSL_pending(conn->ssl) > 0; } -ssize_t -pgtls_write(PGconn *conn, const void *ptr, size_t len) +static int +pgtls_write(IoStreamLayer * self, PGconn *conn, const void *ptr, size_t len, size_t *bytes_written) { ssize_t n; int result_errno = 0; @@ -360,7 +381,17 @@ pgtls_write(PGconn *conn, const void *ptr, size_t len) /* ensure we return the intended errno to caller */ SOCK_ERRNO_SET(result_errno); - return n; + + if (n >= 0) + { + *bytes_written = n; + return 0; + } + else + { + *bytes_written = 0; + return n; + } } char * @@ -1211,7 +1242,8 @@ initialize_SSL(PGconn *conn) */ if (!(conn->ssl = SSL_new(SSL_context)) || !SSL_set_app_data(conn->ssl, conn) || - !my_SSL_set_fd(conn, conn->sock)) + !my_SSL_set_fd(conn->ssl, conn->sock, + io_stream_add_layer(conn->io_stream, &pgtls_processor, conn))) { char *err = SSLerrmessage(ERR_get_error()); @@ -1613,7 +1645,7 @@ open_client_SSL(PGconn *conn) return PGRES_POLLING_OK; } -void +static void pgtls_close(PGconn *conn) { bool destroy_needed = false; @@ -1815,7 +1847,7 @@ PQsslAttribute(PGconn *conn, const char *attribute_name) /* * Private substitute BIO: this does the sending and receiving using - * pqsecure_raw_write() and pqsecure_raw_read() instead, to allow those + * io_stream_next_write() and io_stream_next_read() instead, to allow those * functions to disable SIGPIPE and give better error messages on I/O errors. * * These functions are closely modelled on the standard socket BIO in OpenSSL; @@ -1830,7 +1862,7 @@ my_sock_read(BIO *h, char *buf, int size) { int res; - res = pqsecure_raw_read((PGconn *) BIO_get_app_data(h), buf, size); + res = io_stream_next_read(BIO_get_app_data(h), buf, size, false); BIO_clear_retry_flags(h); if (res < 0) { @@ -1859,8 +1891,9 @@ static int my_sock_write(BIO *h, const char *buf, int size) { int res; + size_t count; - res = pqsecure_raw_write((PGconn *) BIO_get_app_data(h), buf, size); + res = io_stream_next_write(BIO_get_app_data(h), buf, size, &count); BIO_clear_retry_flags(h); if (res < 0) { @@ -1882,7 +1915,7 @@ my_sock_write(BIO *h, const char *buf, int size) } } - return res; + return res == 0 ? count : res; } static BIO_METHOD * @@ -1952,7 +1985,7 @@ err: /* This should exactly match OpenSSL's SSL_set_fd except for using my BIO */ static int -my_SSL_set_fd(PGconn *conn, int fd) +my_SSL_set_fd(SSL *ssl, int fd, IoStreamLayer * layer) { int ret = 0; BIO *bio; @@ -1965,15 +1998,16 @@ my_SSL_set_fd(PGconn *conn, int fd) goto err; } bio = BIO_new(bio_method); + if (bio == NULL) { SSLerr(SSL_F_SSL_SET_FD, ERR_R_BUF_LIB); goto err; } - BIO_set_app_data(bio, conn); + BIO_set_app_data(bio, layer); - SSL_set_bio(conn->ssl, bio, bio); BIO_set_fd(bio, fd, BIO_NOCLOSE); + SSL_set_bio(ssl, bio, bio); ret = 1; err: return ret; diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c index b2430362a9..4b34de0220 100644 --- a/src/interfaces/libpq/fe-secure.c +++ b/src/interfaces/libpq/fe-secure.c @@ -45,55 +45,6 @@ #include "libpq-fe.h" #include "libpq-int.h" -/* - * Macros to handle disabling and then restoring the state of SIGPIPE handling. - * On Windows, these are all no-ops since there's no SIGPIPEs. - */ - -#ifndef WIN32 - -#define SIGPIPE_MASKED(conn) ((conn)->sigpipe_so || (conn)->sigpipe_flag) - -struct sigpipe_info -{ - sigset_t oldsigmask; - bool sigpipe_pending; - bool got_epipe; -}; - -#define DECLARE_SIGPIPE_INFO(spinfo) struct sigpipe_info spinfo - -#define DISABLE_SIGPIPE(conn, spinfo, failaction) \ - do { \ - (spinfo).got_epipe = false; \ - if (!SIGPIPE_MASKED(conn)) \ - { \ - if (pq_block_sigpipe(&(spinfo).oldsigmask, \ - &(spinfo).sigpipe_pending) < 0) \ - failaction; \ - } \ - } while (0) - -#define REMEMBER_EPIPE(spinfo, cond) \ - do { \ - if (cond) \ - (spinfo).got_epipe = true; \ - } while (0) - -#define RESTORE_SIGPIPE(conn, spinfo) \ - do { \ - if (!SIGPIPE_MASKED(conn)) \ - pq_reset_sigpipe(&(spinfo).oldsigmask, (spinfo).sigpipe_pending, \ - (spinfo).got_epipe); \ - } while (0) -#else /* WIN32 */ - -#define DECLARE_SIGPIPE_INFO(spinfo) -#define DISABLE_SIGPIPE(conn, spinfo, failaction) -#define REMEMBER_EPIPE(spinfo, cond) -#define RESTORE_SIGPIPE(conn, spinfo) -#endif /* WIN32 */ - /* ------------------------------------------------------------ */ /* Procedures common to all secure sessions */ /* ------------------------------------------------------------ */ @@ -160,281 +111,6 @@ pqsecure_open_client(PGconn *conn) #endif } -/* - * Close secure session. - */ -void -pqsecure_close(PGconn *conn) -{ -#ifdef USE_SSL - pgtls_close(conn); -#endif -} - -/* - * Read data from a secure connection. - * - * On failure, this function is responsible for appending a suitable message - * to conn->errorMessage. The caller must still inspect errno, but only - * to determine whether to continue/retry after error. - */ -ssize_t -pqsecure_read(PGconn *conn, void *ptr, size_t len) -{ - ssize_t n; - -#ifdef USE_SSL - if (conn->ssl_in_use) - { - n = pgtls_read(conn, ptr, len); - } - else -#endif -#ifdef ENABLE_GSS - if (conn->gssenc) - { - n = pg_GSS_read(conn, ptr, len); - } - else -#endif - { - n = pqsecure_raw_read(conn, ptr, len); - } - - return n; -} - -ssize_t -pqsecure_raw_read(PGconn *conn, void *ptr, size_t len) -{ - ssize_t n; - int result_errno = 0; - char sebuf[PG_STRERROR_R_BUFLEN]; - - SOCK_ERRNO_SET(0); - - n = recv(conn->sock, ptr, len, 0); - - if (n < 0) - { - result_errno = SOCK_ERRNO; - - /* Set error message if appropriate */ - switch (result_errno) - { -#ifdef EAGAIN - case EAGAIN: -#endif -#if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) - case EWOULDBLOCK: -#endif - case EINTR: - /* no error message, caller is expected to retry */ - break; - - case EPIPE: - case ECONNRESET: - libpq_append_conn_error(conn, "server closed the connection unexpectedly\n" - "\tThis probably means the server terminated abnormally\n" - "\tbefore or while processing the request."); - break; - - case 0: - /* If errno didn't get set, treat it as regular EOF */ - n = 0; - break; - - default: - libpq_append_conn_error(conn, "could not receive data from server: %s", - SOCK_STRERROR(result_errno, - sebuf, sizeof(sebuf))); - break; - } - } - - /* ensure we return the intended errno to caller */ - SOCK_ERRNO_SET(result_errno); - - return n; -} - -/* - * Write data to a secure connection. - * - * Returns the number of bytes written, or a negative value (with errno - * set) upon failure. The write count could be less than requested. - * - * Note that socket-level hard failures are masked from the caller, - * instead setting conn->write_failed and storing an error message - * in conn->write_err_msg; see pqsecure_raw_write. This allows us to - * postpone reporting of write failures until we're sure no error - * message is available from the server. - * - * However, errors detected in the SSL or GSS management level are reported - * via a negative result, with message appended to conn->errorMessage. - * It's frequently unclear whether such errors should be considered read or - * write errors, so we don't attempt to postpone reporting them. - * - * The caller must still inspect errno upon failure, but only to determine - * whether to continue/retry; a message has been saved someplace in any case. - */ -ssize_t -pqsecure_write(PGconn *conn, const void *ptr, size_t len) -{ - ssize_t n; - -#ifdef USE_SSL - if (conn->ssl_in_use) - { - n = pgtls_write(conn, ptr, len); - } - else -#endif -#ifdef ENABLE_GSS - if (conn->gssenc) - { - n = pg_GSS_write(conn, ptr, len); - } - else -#endif - { - n = pqsecure_raw_write(conn, ptr, len); - } - - return n; -} - -/* - * Low-level implementation of pqsecure_write. - * - * This is used directly for an unencrypted connection. For encrypted - * connections, this does the physical I/O on behalf of pgtls_write or - * pg_GSS_write. - * - * This function reports failure (i.e., returns a negative result) only - * for retryable errors such as EINTR. Looping for such cases is to be - * handled at some outer level, maybe all the way up to the application. - * For hard failures, we set conn->write_failed and store an error message - * in conn->write_err_msg, but then claim to have written the data anyway. - * This is because we don't want to report write failures so long as there - * is a possibility of reading from the server and getting an error message - * that could explain why the connection dropped. Many TCP stacks have - * race conditions such that a write failure may or may not be reported - * before all incoming data has been read. - * - * Note that this error behavior happens below the SSL management level when - * we are using SSL. That's because at least some versions of OpenSSL are - * too quick to report a write failure when there's still a possibility to - * get a more useful error from the server. - */ -ssize_t -pqsecure_raw_write(PGconn *conn, const void *ptr, size_t len) -{ - ssize_t n; - int flags = 0; - int result_errno = 0; - char msgbuf[1024]; - char sebuf[PG_STRERROR_R_BUFLEN]; - - DECLARE_SIGPIPE_INFO(spinfo); - - /* - * If we already had a write failure, we will never again try to send data - * on that connection. Even if the kernel would let us, we've probably - * lost message boundary sync with the server. conn->write_failed - * therefore persists until the connection is reset, and we just discard - * all data presented to be written. - */ - if (conn->write_failed) - return len; - -#ifdef MSG_NOSIGNAL - if (conn->sigpipe_flag) - flags |= MSG_NOSIGNAL; - -retry_masked: -#endif /* MSG_NOSIGNAL */ - - DISABLE_SIGPIPE(conn, spinfo, return -1); - - n = send(conn->sock, ptr, len, flags); - - if (n < 0) - { - result_errno = SOCK_ERRNO; - - /* - * If we see an EINVAL, it may be because MSG_NOSIGNAL isn't available - * on this machine. So, clear sigpipe_flag so we don't try the flag - * again, and retry the send(). - */ -#ifdef MSG_NOSIGNAL - if (flags != 0 && result_errno == EINVAL) - { - conn->sigpipe_flag = false; - flags = 0; - goto retry_masked; - } -#endif /* MSG_NOSIGNAL */ - - /* Set error message if appropriate */ - switch (result_errno) - { -#ifdef EAGAIN - case EAGAIN: -#endif -#if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN)) - case EWOULDBLOCK: -#endif - case EINTR: - /* no error message, caller is expected to retry */ - break; - - case EPIPE: - /* Set flag for EPIPE */ - REMEMBER_EPIPE(spinfo, true); - - /* FALL THRU */ - - case ECONNRESET: - conn->write_failed = true; - /* Store error message in conn->write_err_msg, if possible */ - /* (strdup failure is OK, we'll cope later) */ - snprintf(msgbuf, sizeof(msgbuf), - libpq_gettext("server closed the connection unexpectedly\n" - "\tThis probably means the server terminated abnormally\n" - "\tbefore or while processing the request.")); - /* keep newline out of translated string */ - strlcat(msgbuf, "\n", sizeof(msgbuf)); - conn->write_err_msg = strdup(msgbuf); - /* Now claim the write succeeded */ - n = len; - break; - - default: - conn->write_failed = true; - /* Store error message in conn->write_err_msg, if possible */ - /* (strdup failure is OK, we'll cope later) */ - snprintf(msgbuf, sizeof(msgbuf), - libpq_gettext("could not send data to server: %s"), - SOCK_STRERROR(result_errno, - sebuf, sizeof(sebuf))); - /* keep newline out of translated string */ - strlcat(msgbuf, "\n", sizeof(msgbuf)); - conn->write_err_msg = strdup(msgbuf); - /* Now claim the write succeeded */ - n = len; - break; - } - } - - RESTORE_SIGPIPE(conn, spinfo); - - /* ensure we return the intended errno to caller */ - SOCK_ERRNO_SET(result_errno); - - return n; -} /* Dummy versions of SSL info functions, when built without SSL support */ #ifndef USE_SSL @@ -507,90 +183,3 @@ PQgssEncInUse(PGconn *conn) } #endif /* ENABLE_GSS */ - - -#if !defined(WIN32) - -/* - * Block SIGPIPE for this thread. This prevents send()/write() from exiting - * the application. - */ -int -pq_block_sigpipe(sigset_t *osigset, bool *sigpipe_pending) -{ - sigset_t sigpipe_sigset; - sigset_t sigset; - - sigemptyset(&sigpipe_sigset); - sigaddset(&sigpipe_sigset, SIGPIPE); - - /* Block SIGPIPE and save previous mask for later reset */ - SOCK_ERRNO_SET(pthread_sigmask(SIG_BLOCK, &sigpipe_sigset, osigset)); - if (SOCK_ERRNO) - return -1; - - /* We can have a pending SIGPIPE only if it was blocked before */ - if (sigismember(osigset, SIGPIPE)) - { - /* Is there a pending SIGPIPE? */ - if (sigpending(&sigset) != 0) - return -1; - - if (sigismember(&sigset, SIGPIPE)) - *sigpipe_pending = true; - else - *sigpipe_pending = false; - } - else - *sigpipe_pending = false; - - return 0; -} - -/* - * Discard any pending SIGPIPE and reset the signal mask. - * - * Note: we are effectively assuming here that the C library doesn't queue - * up multiple SIGPIPE events. If it did, then we'd accidentally leave - * ours in the queue when an event was already pending and we got another. - * As long as it doesn't queue multiple events, we're OK because the caller - * can't tell the difference. - * - * The caller should say got_epipe = false if it is certain that it - * didn't get an EPIPE error; in that case we'll skip the clear operation - * and things are definitely OK, queuing or no. If it got one or might have - * gotten one, pass got_epipe = true. - * - * We do not want this to change errno, since if it did that could lose - * the error code from a preceding send(). We essentially assume that if - * we were able to do pq_block_sigpipe(), this can't fail. - */ -void -pq_reset_sigpipe(sigset_t *osigset, bool sigpipe_pending, bool got_epipe) -{ - int save_errno = SOCK_ERRNO; - int signo; - sigset_t sigset; - - /* Clear SIGPIPE only if none was pending */ - if (got_epipe && !sigpipe_pending) - { - if (sigpending(&sigset) == 0 && - sigismember(&sigset, SIGPIPE)) - { - sigset_t sigpipe_sigset; - - sigemptyset(&sigpipe_sigset); - sigaddset(&sigpipe_sigset, SIGPIPE); - - sigwait(&sigpipe_sigset, &signo); - } - } - - /* Restore saved block mask */ - pthread_sigmask(SIG_SETMASK, osigset, NULL); - - SOCK_ERRNO_SET(save_errno); -} - -#endif /* !WIN32 */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 7888199b0d..1314663213 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -81,6 +81,7 @@ typedef struct #endif #endif /* USE_OPENSSL */ +#include "common/io_stream.h" #include "common/pg_prng.h" /* @@ -456,6 +457,7 @@ struct pg_conn PGcmdQueueEntry *cmd_queue_recycle; /* Connection data */ + IoStream *io_stream; pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ SockAddr laddr; /* Local address */ @@ -753,11 +755,6 @@ extern int pqWriteReady(PGconn *conn); extern int pqsecure_initialize(PGconn *, bool, bool); extern PostgresPollingStatusType pqsecure_open_client(PGconn *); -extern void pqsecure_close(PGconn *); -extern ssize_t pqsecure_read(PGconn *, void *ptr, size_t len); -extern ssize_t pqsecure_write(PGconn *, const void *ptr, size_t len); -extern ssize_t pqsecure_raw_read(PGconn *, void *ptr, size_t len); -extern ssize_t pqsecure_raw_write(PGconn *, const void *ptr, size_t len); #if !defined(WIN32) extern int pq_block_sigpipe(sigset_t *osigset, bool *sigpipe_pending); @@ -793,34 +790,6 @@ extern int pgtls_init(PGconn *conn, bool do_ssl, bool do_crypto); */ extern PostgresPollingStatusType pgtls_open_client(PGconn *conn); -/* - * Close SSL connection. - */ -extern void pgtls_close(PGconn *conn); - -/* - * Read data from a secure connection. - * - * On failure, this function is responsible for appending a suitable message - * to conn->errorMessage. The caller must still inspect errno, but only - * to determine whether to continue/retry after error. - */ -extern ssize_t pgtls_read(PGconn *conn, void *ptr, size_t len); - -/* - * Is there unread data waiting in the SSL read buffer? - */ -extern bool pgtls_read_pending(PGconn *conn); - -/* - * Write data to a secure connection. - * - * On failure, this function is responsible for appending a suitable message - * to conn->errorMessage. The caller must still inspect errno, but only - * to determine whether to continue/retry after error. - */ -extern ssize_t pgtls_write(PGconn *conn, const void *ptr, size_t len); - /* * Get the hash of the server certificate, for SCRAM channel binding type * tls-server-end-point. @@ -851,13 +820,6 @@ extern int pgtls_verify_peer_name_matches_certificate_guts(PGconn *conn, * Establish a GSSAPI-encrypted connection. */ extern PostgresPollingStatusType pqsecure_open_gss(PGconn *conn); - -/* - * Read and write functions for GSSAPI-encrypted connections, with internal - * buffering to handle nonblocking sockets. - */ -extern ssize_t pg_GSS_write(PGconn *conn, const void *ptr, size_t len); -extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len); #endif /* === in fe-trace.c === */ -- 2.42.0