From 6c5e0830635d0bbcce868b0d0180008e77272263 Mon Sep 17 00:00:00 2001 From: Jacob Burroughs Date: Fri, 15 Dec 2023 12:21:08 -0600 Subject: [PATCH v1 2/5] Add protocol-layer compression to libpq Adds libpq_compression and libpq_fe_compression GUCs to coordinate algorithms between frontend and backend. Adds CompressedMessage message to send compressed data and select a particular negotiated protocol respectively. Supported compression algorithms are zlib (gzip), lz4, and zstd. --- contrib/postgres_fdw/connection.c | 26 +- doc/src/sgml/config.sgml | 26 + doc/src/sgml/libpq.sgml | 50 + doc/src/sgml/protocol.sgml | 55 + meson.build | 4 +- src/backend/libpq/Makefile | 1 + src/backend/libpq/compression.c | 127 +++ src/backend/libpq/meson.build | 1 + src/backend/libpq/pqcomm.c | 86 +- src/backend/postmaster/postmaster.c | 22 +- .../libpqwalreceiver/libpqwalreceiver.c | 15 +- src/backend/utils/misc/guc_tables.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 3 + src/bin/pgbench/pgbench.c | 17 +- src/bin/psql/command.c | 18 + src/common/Makefile | 4 +- src/common/compression.c | 145 ++- src/common/meson.build | 2 + src/common/z_stream.c | 995 ++++++++++++++++ src/common/zpq_stream.c | 1004 +++++++++++++++++ src/include/common/compression.h | 8 +- src/include/common/z_stream.h | 98 ++ src/include/common/zpq_stream.h | 91 ++ src/include/libpq/compression.h | 30 + src/include/libpq/libpq-be.h | 3 + src/include/libpq/libpq.h | 1 + src/include/libpq/protocol.h | 2 +- src/include/utils/guc_hooks.h | 2 + src/interfaces/libpq/exports.txt | 3 + src/interfaces/libpq/fe-connect.c | 123 +- src/interfaces/libpq/fe-exec.c | 15 + src/interfaces/libpq/fe-misc.c | 47 +- src/interfaces/libpq/fe-protocol3.c | 92 +- src/interfaces/libpq/libpq-fe.h | 4 + src/interfaces/libpq/libpq-int.h | 18 +- src/tools/msvc/Mkvcbuild.pm | 3 +- 36 files changed, 3016 insertions(+), 137 deletions(-) create mode 100644 src/backend/libpq/compression.c create mode 100644 src/common/z_stream.c create mode 
100644 src/common/zpq_stream.c create mode 100644 src/include/common/z_stream.h create mode 100644 src/include/common/zpq_stream.h create mode 100644 src/include/libpq/compression.h diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 5800c6a9fb..82c7b17b58 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -872,11 +872,14 @@ pgfdw_get_result(PGconn *conn, const char *query) pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult"); /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, pgfdw_we_get_result); + if (PQreadPending(conn)) + wc = WL_SOCKET_READABLE; + else + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_EXIT_ON_PM_DEATH, + PQsocket(conn), + -1L, pgfdw_we_get_result); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -1580,11 +1583,14 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult"); /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - cur_timeout, pgfdw_we_cleanup_result); + if (PQreadPending(conn)) + wc = WL_SOCKET_READABLE; + else + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + PQsocket(conn), + cur_timeout, pgfdw_we_cleanup_result); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 44cada2b40..506a1a1945 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1066,6 +1066,32 @@ include_dir 'conf.d' + + libpq_compression (string) + + libpq_compression configuration parameter + + + + + This parameter controls the available client-server traffic compression methods. 
+ It allows rejecting compression requests even if compression is supported by the server (for example, due to security, or CPU consumption). + The default is off, which means that no compression methods are allowed. + The value on means all supported compression methods are allowed. + For more precise control, a list of the allowed compression methods can be specified. + For example, to allow only lz4 and gzip, set the setting to lz4;gzip. + The server will choose the first algorithm from the list also supported by a given client. + none is also allowed when specifying a list, and if selected means that the server + will send messages uncompressed, but may still receive compressed messages if the list includes them. + This is most useful if e.g. you want to enable compression for client-to-server traffic + but not server-to-client traffic, using e.g. none;gzip;lz4 as the + parameter. Also, compression level can be specified for each method, e.g. lz4:1;gzip:2 + setting will set the compression level for lz4 to 1 and gzip + to 2. The default compression level for each algorithm is chosen by the underlying library. + + + + diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index ed88ac001a..b5b9ecc2d6 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1354,6 +1354,56 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname + + compression + + + Request compression of libpq traffic. The client sends a request with + a list of compression algorithms. Compression can be requested by a client + by including the "compression" option in its connection string. + This can either be a boolean value to enable or disable compression + ("true"/"false", + "on"/"off", + "yes"/"no", + "1"/"0"), + or an explicit list of semicolon-separated compression algorithms + which can optionally include compression level ("gzip;lz4:2"). + The default value is "off". 
+ If compression is enabled but an algorithm is not explicitly specified, + the client library sends its full list of supported algorithms. + The server sends its list of supported parameters in the libpq_compression + parameter. + Both the client and server will prefer the first algorithm in their list that is supported by + the other side. + "none" is also allowed in the list, and if selected means that the client + (or server) will send messages uncompressed, but depending on the other values it still may + receive compressed messages. + This is most useful if e.g. you want to enable compression for server-to-client traffic + but not client-to-server traffic, using e.g. "none;gzip;lz4" as the + compression parameter. + + + After receiving a startup packet with _pq_.libpq_compression set, the + server can send CompressedData messages referencing any of the specified algorithms, for + server-to-client traffic compression. + After receiving a parameter status message with libpq_compression set, + the client can send CompressedData messages referencing any of the specified algorithms for + client-to-server traffic compression. (Note that if the client has not requested the + _pq_.libpq_compression protocol extension in the startup packet, the + server may reject all CompressedData messages even if libpq_compression + is non-empty.) + + + Support for compression algorithms must be enabled when the server is compiled. + Currently, three algorithms are supported: gzip (default), lz4 (if Postgres was + configured with --with-lz4 option), and zstd (if Postgres was configured with + --with-zstd option). In all cases, streaming mode is used. + Please note that using compression together with SSL may expose extra vulnerabilities: + CRIME + + + + client_encoding diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index af3f016f74..700e125b3c 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -92,6 +92,15 @@ such as COPY. 
+ + The protocol supports compressing data to reduce traffic and speed up client-server interaction. + Compression is especially useful for importing/exporting data to/from the database using the COPY command + and for replication (both physical and logical). Compression can also reduce the server's response time + for queries returning a large amount of data (for example, JSON, BLOBs, text, ...). + Currently, three algorithms are supported: DEFLATE (if PostgreSQL was built with zlib support), + LZ4 (if PostgreSQL was built with lz4 support), and ZStandard (if PostgreSQL was built with zstd support). + + Messaging Overview @@ -4115,6 +4124,52 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + CompressedData (F & B) + + + + + + Byte1('z') + + + Identifies the message as compressed data. + + + + + Int32 + + + Length of message contents in bytes, including self. + + + + + Int8 + + + Selected compression algorithm, as specified in the pg_compress_algorithm enum. + + + + + + Byten + + + + Compressed message data. 
+ + + + + + + + + CopyData (F & B) diff --git a/meson.build b/meson.build index 52c2a37c41..99e3691589 100644 --- a/meson.build +++ b/meson.build @@ -2794,14 +2794,14 @@ frontend_common_code = declare_dependency( compile_args: ['-DFRONTEND'], include_directories: [postgres_inc], sources: generated_headers, - dependencies: [os_deps, zlib, zstd], + dependencies: [os_deps, lz4, zlib, zstd], ) backend_common_code = declare_dependency( compile_args: ['-DBUILDING_DLL'], include_directories: [postgres_inc], sources: generated_headers, - dependencies: [os_deps, zlib, zstd], + dependencies: [os_deps, lz4, zlib, zstd], ) subdir('src/common') diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile index 6d385fd6a4..f4b09c09c3 100644 --- a/src/backend/libpq/Makefile +++ b/src/backend/libpq/Makefile @@ -21,6 +21,7 @@ OBJS = \ be-fsstubs.o \ be-secure-common.o \ be-secure.o \ + compression.o \ crypt.o \ hba.o \ ifaddr.o \ diff --git a/src/backend/libpq/compression.c b/src/backend/libpq/compression.c new file mode 100644 index 0000000000..bb17a23231 --- /dev/null +++ b/src/backend/libpq/compression.c @@ -0,0 +1,127 @@ +/*------------------------------------------------------------------------- +* + * compression.c + * Functions and variables to support backend configuration of libpq + * + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * src/backend/libpq/compression.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "miscadmin.h" +#include "libpq/libpq-be.h" +#include "libpq/compression.h" +#include "utils/guc_hooks.h" + +/* GUC variable containing the allowed compression algorithms list (separated by semicolon) */ +char *libpq_compress_algorithms = "off"; +pg_compress_specification libpq_compressors[COMPRESSION_ALGORITHM_COUNT]; +size_t libpq_n_compressors = 0; + +bool +check_libpq_compression(char **newval, void **extra, GucSource source) +{ + pg_compress_specification 
compressors[COMPRESSION_ALGORITHM_COUNT]; + size_t n_compressors; + char *serialized_compressors; + + if (zpq_parse_compression_setting(*newval, compressors, &n_compressors) == -1) + { + GUC_check_errdetail("Cannot parse the libpq_compression setting."); + return false; + } + + if (n_compressors > 0) + { + guc_free(*newval); + serialized_compressors = zpq_serialize_compressors(compressors, n_compressors); + *newval = guc_strdup(ERROR, serialized_compressors); + pfree(serialized_compressors); + } + else + { + guc_free(*newval); + *newval = guc_strdup(ERROR, ""); + } + return true; +} + +void +assign_libpq_compression(const char *newval, void *extra) +{ + if (strlen(newval) == 0) + { + libpq_n_compressors = 0; + return; + } + zpq_parse_compression_setting(newval, libpq_compressors, &libpq_n_compressors); +} + +void +configure_libpq_compression(Port *port, const char *newval) +{ + pg_compress_specification fe_compressors[COMPRESSION_ALGORITHM_COUNT]; + size_t n_fe_compressors; + + Assert(!port->zpq_stream); + + if (libpq_n_compressors == 0) + { + return; + } + + /* Init compression */ + port->zpq_stream = zpq_create(libpq_compressors, libpq_n_compressors, MyProcPort->io_stream); + if (!port->zpq_stream) + { + ereport(FATAL, + errcode(ERRCODE_INTERNAL_ERROR), + errmsg("failed to initialize the compression stream")); + } + + if (strlen(newval) == 0) + { + return; + } + + if (!zpq_deserialize_compressors(newval, fe_compressors, &n_fe_compressors)) + { + ereport(FATAL, + errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Cannot parse the _pq_.libpq_compression setting.")); + } + + if (MyProcPort && MyProcPort->zpq_stream) + { + pg_compress_algorithm algorithms[COMPRESSION_ALGORITHM_COUNT]; + size_t n_algorithms = 0; + + if (n_fe_compressors == 0) + { + return; + } + + /* + * Intersect client and server compressors to determine the final list + * of the supported compressors. O(N^2) is negligible because of a + * small number of the compression methods. 
+ */ + for (size_t i = 0; i < libpq_n_compressors; i++) + { + for (size_t j = 0; j < n_fe_compressors; j++) + { + if (libpq_compressors[i].algorithm == fe_compressors[j].algorithm) + { + algorithms[n_algorithms] = libpq_compressors[i].algorithm; + n_algorithms += 1; + break; + } + } + } + + zpq_enable_compression(MyProcPort->zpq_stream, algorithms, n_algorithms); + } +} diff --git a/src/backend/libpq/meson.build b/src/backend/libpq/meson.build index 74a226c2bd..83a21a7566 100644 --- a/src/backend/libpq/meson.build +++ b/src/backend/libpq/meson.build @@ -7,6 +7,7 @@ backend_sources += files( 'be-fsstubs.c', 'be-secure-common.c', 'be-secure.c', + 'compression.c', 'crypt.c', 'hba.c', 'ifaddr.c', diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 030686cc3b..60363ec1e6 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -61,6 +61,7 @@ #include #include #include +#include #include #include #include @@ -76,7 +77,9 @@ #include "common/ip.h" #include "common/io_stream.h" +#include "common/zpq_stream.h" #include "libpq/libpq.h" +#include "libpq/pqformat.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" @@ -84,6 +87,7 @@ #include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/wait_event.h" +#include "utils/builtins.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -1133,11 +1137,12 @@ retry: /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * returns 0 if OK, EOF if trouble + * nowait parameter toggles non-blocking mode. 
+ * returns number of read bytes, EOF if trouble * -------------------------------- */ static int -pq_recvbuf(void) +pq_recvbuf(bool nowait) { if (PqRecvPointer > 0) { @@ -1153,8 +1158,8 @@ pq_recvbuf(void) PqRecvLength = PqRecvPointer = 0; } - /* Ensure that we're in blocking mode */ - socket_set_nonblocking(false); + /* Ensure that we're in the appropriate mode */ + socket_set_nonblocking(nowait); /* Can fill buffer from PqRecvLength and upwards */ for (;;) @@ -1168,9 +1173,23 @@ pq_recvbuf(void) if (r < 0) { + if (r == ZS_DECOMPRESS_ERROR) + { + char const *msg = zpq_decompress_error(MyProcPort->zpq_stream); + + if (msg == NULL) + msg = "end of stream"; + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to decompress data: %s", msg))); + return EOF; + } if (errno == EINTR) continue; /* Ok if interrupted */ + if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK)) + return 0; + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -1194,7 +1213,7 @@ pq_recvbuf(void) } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; - return 0; + return r; } } @@ -1209,7 +1228,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1228,7 +1248,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1249,49 +1270,12 @@ pq_getbyte_if_available(unsigned char *c) Assert(PqCommReadingMsg); - if (PqRecvPointer < PqRecvLength) + if (PqRecvPointer < PqRecvLength || (r = 
pq_recvbuf(true)) > 0) { *c = PqRecvBuffer[PqRecvPointer++]; return 1; } - /* Put the socket into non-blocking mode */ - socket_set_nonblocking(true); - - errno = 0; - - r = io_read_with_wait(MyProcPort, c, 1); - if (r < 0) - { - /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). Report - * other errors. - */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client would - * cause recursion to here, leading to stack overflow and core - * dump! This message must go *only* to the postmaster log. - * - * If errno is zero, assume it's EOF and let the caller complain. - */ - if (errno != 0) - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ - r = EOF; - } - return r; } @@ -1312,7 +1296,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1346,7 +1331,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1574,7 +1560,7 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend) + while (bufptr < bufend || io_stream_buffered_write_data(MyProcPort->io_stream) != 0) { int rc; size_t bytes_sent; @@ -1647,7 +1633,7 @@ socket_flush_if_writable(void) int res; /* Quick exit if nothing to do */ - if (PqSendPointer == 
PqSendStart) + if ((PqSendPointer == PqSendStart) && (io_stream_buffered_write_data(MyProcPort->io_stream) == 0)) return 0; /* No-op if reentrant call */ @@ -1670,7 +1656,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (PqSendStart < PqSendPointer || (io_stream_buffered_write_data(MyProcPort->io_stream) != 0)); } /* -------------------------------- diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 14122de017..be440a1e52 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -99,6 +99,7 @@ #include "common/string.h" #include "lib/ilist.h" #include "libpq/auth.h" +#include "libpq/compression.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqsignal.h" @@ -2200,12 +2201,15 @@ retry1: valptr), errhint("Valid values are: \"false\", 0, \"true\", 1, \"database\"."))); } + else if (strcmp(nameptr, "_pq_.libpq_compression") == 0) + { + configure_libpq_compression(port, valptr); + } else if (strncmp(nameptr, "_pq_.", 5) == 0) { /* * Any option beginning with _pq_. is reserved for use as a - * protocol-level option, but at present no such options are - * defined. + * protocol-level option. */ unrecognized_protocol_options = lappend(unrecognized_protocol_options, pstrdup(nameptr)); @@ -4380,7 +4384,21 @@ BackendInitialize(Port *port) * already did any appropriate error reporting. 
*/ if (status != STATUS_OK) + { + /* + * Drain the socket receive buffer before exiting to allow for a + * clean (non-rst) shutdown, which ensures the client can read + * any error messages they may have in their local receive buffer + */ + char buffer[32]; + if(port->sock != PGINVALID_SOCKET) + { + pg_set_noblock(port->sock); + while(recv(port->sock, buffer, 32, 0) > 0) {} + closesocket(port->sock); + } proc_exit(0); + } /* * Now that we have the user and database name, we can set the process diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 4152d1ddbc..e0e887b06e 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -727,12 +727,15 @@ libpqrcv_PQgetResult(PGconn *streamConn) * since we'll get interrupted by signals and can handle any * interrupts here. */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + if (PQreadPending(streamConn)) + rc = WL_SOCKET_READABLE; + else + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | + WL_LATCH_SET, + PQsocket(streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); /* Interrupted? 
*/ if (rc & WL_LATCH_SET) diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f7c9882f7c..6b805e9a7f 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -46,6 +46,7 @@ #include "common/scram-common.h" #include "jit/jit.h" #include "libpq/auth.h" +#include "libpq/compression.h" #include "libpq/libpq.h" #include "libpq/scram.h" #include "nodes/queryjumble.h" @@ -4505,6 +4506,17 @@ struct config_string ConfigureNamesString[] = NULL, NULL, NULL }, + { + {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER, + gettext_noop("Sets the list of allowed libpq compression algorithms."), + NULL, + GUC_REPORT + }, + &libpq_compress_algorithms, + "off", + check_libpq_compression, assign_libpq_compression, NULL + }, + { {"application_name", PGC_USERSET, LOGGING_WHAT, gettext_noop("Sets the application name to be reported in statistics and logs."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index cf9f283cfe..00d67cc6f6 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -74,6 +74,9 @@ # (change requires restart) #bonjour_name = '' # defaults to the computer name # (change requires restart) +#libpq_compression = off # on to allow all supported compression + # methods; off to disable all compression; + # semicolon separated list of algorithms to allow some # - TCP settings - # see "man tcp" for details diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 2e1650d0ad..09d43cf960 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -7433,6 +7433,9 @@ threadRun(void *arg) int nsocks; /* number of sockets to be waited for */ pg_time_usec_t min_usec; pg_time_usec_t now = 0; /* set this only if needed */ + bool buffered_rx = false; /* true if some of the clients has + * data left in SSL/ZPQ read + * buffers */ /* * identify which client sockets 
should be checked for input, and @@ -7468,6 +7471,9 @@ threadRun(void *arg) */ int sock = PQsocket(st->con); + /* check if conn has buffered SSL / ZPQ read data */ + buffered_rx = buffered_rx || PQreadPending(st->con); + if (sock < 0) { pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); @@ -7511,7 +7517,7 @@ threadRun(void *arg) { if (nsocks > 0) { - rc = wait_on_socket_set(sockets, min_usec); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec); } else /* nothing active, simple sleep */ { @@ -7520,7 +7526,7 @@ threadRun(void *arg) } else /* no explicit delay, wait without timeout */ { - rc = wait_on_socket_set(sockets, 0); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0); } if (rc < 0) @@ -7560,8 +7566,11 @@ threadRun(void *arg) pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); goto done; } - - if (!socket_has_input(sockets, sock, nsocks++)) + if (PQreadPending(st->con)) + { + nsocks++; + } + else if (!socket_has_input(sockets, sock, nsocks++)) continue; } else if (st->state == CSTATE_FINISHED || diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 82cc091568..4ed6fb5d21 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -172,6 +172,7 @@ static int count_lines_in_buf(PQExpBuffer buf); static void print_with_linenumbers(FILE *output, char *lines, bool is_func); static void minimal_error_message(PGresult *res); +static void printCompressionInfo(void); static void printSSLInfo(void); static void printGSSInfo(void); static bool printPsetInfo(const char *param, printQueryOpt *popt); @@ -676,6 +677,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch) printf(_("You are connected to database \"%s\" as user \"%s\" on host \"%s\" at port \"%s\".\n"), db, PQuser(pset.db), host, PQport(pset.db)); } + printCompressionInfo(); printSSLInfo(); printGSSInfo(); } @@ -3821,6 +3823,22 @@ connection_warnings(bool in_startup) } } +/* + * printCompressionInfo + * + * Print information about used 
compressor/decompressor + */ +static void +printCompressionInfo(void) +{ + char *algorithms = PQcompression(pset.db); + + if (algorithms != NULL) + { + printf(_("Compression: server: %s, client: %s\n"), PQserverCompression(pset.db), algorithms); + pfree(algorithms); + } +} /* * printSSLInfo diff --git a/src/common/Makefile b/src/common/Makefile index a5cd11c6df..7917ecbe5e 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -83,7 +83,9 @@ OBJS_COMMON = \ unicode_norm.o \ username.o \ wait_error.o \ - wchar.o + wchar.o \ + z_stream.o \ + zpq_stream.o ifeq ($(with_ssl),openssl) OBJS_COMMON += \ diff --git a/src/common/compression.c b/src/common/compression.c index ee937623f0..c2b00aaf11 100644 --- a/src/common/compression.c +++ b/src/common/compression.c @@ -36,6 +36,16 @@ #include "common/compression.h" +#ifndef FRONTEND +#define ALLOC palloc +#define STRDUP pstrdup +#define FREE pfree +#else +#define ALLOC malloc +#define STRDUP strdup +#define FREE free +#endif + static int expect_integer_value(char *keyword, char *value, pg_compress_specification *result); static bool expect_boolean_value(char *keyword, char *value, @@ -113,7 +123,7 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio /* Initial setup of result object. */ result->algorithm = algorithm; result->options = 0; - result->parse_error = NULL; + result->has_error = false; /* * Assign a default level depending on the compression method. 
This may @@ -128,27 +138,27 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio #ifdef USE_LZ4 result->level = 0; /* fast compression mode */ #else - result->parse_error = - psprintf(_("this build does not support compression with %s"), - "LZ4"); + result->has_error = true; + snprintf(result->parse_error, 255, _("this build does not support compression with %s"), + "LZ4"); #endif break; case PG_COMPRESSION_ZSTD: #ifdef USE_ZSTD result->level = ZSTD_CLEVEL_DEFAULT; #else - result->parse_error = - psprintf(_("this build does not support compression with %s"), - "ZSTD"); + result->has_error = true; + snprintf(result->parse_error, 255, _("this build does not support compression with %s"), + "ZSTD"); #endif break; case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ result->level = Z_DEFAULT_COMPRESSION; #else - result->parse_error = - psprintf(_("this build does not support compression with %s"), - "gzip"); + result->has_error = true; + snprintf(result->parse_error, 255, _("this build does not support compression with %s"), + "gzip"); #endif break; } @@ -201,20 +211,20 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio /* Reject empty keyword. */ if (kwlen == 0) { - result->parse_error = - pstrdup(_("found empty string where a compression option was expected")); + result->has_error = true; + strlcpy(result->parse_error, _("found empty string where a compression option was expected"), 255); break; } /* Extract keyword and value as separate C strings. 
*/ - keyword = palloc(kwlen + 1); + keyword = ALLOC(kwlen + 1); memcpy(keyword, kwstart, kwlen); keyword[kwlen] = '\0'; if (!has_value) value = NULL; else { - value = palloc(vlen + 1); + value = ALLOC(vlen + 1); memcpy(value, vstart, vlen); value[vlen] = '\0'; } @@ -240,13 +250,15 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio result->options |= PG_COMPRESSION_OPTION_LONG_DISTANCE; } else - result->parse_error = - psprintf(_("unrecognized compression option: \"%s\""), keyword); + { + result->has_error = true; + snprintf(result->parse_error, 255, _("unrecognized compression option: \"%s\""), keyword); + } /* Release memory, just to be tidy. */ - pfree(keyword); + FREE(keyword); if (value != NULL) - pfree(value); + FREE(value); /* * If we got an error or have reached the end of the string, stop. @@ -256,7 +268,7 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio * keyword cannot have been the end of the string, but the end of the * value might have been. */ - if (result->parse_error != NULL || + if (result->has_error || (vend == NULL ? *kwend == '\0' : *vend == '\0')) break; @@ -268,7 +280,7 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio /* * Parse 'value' as an integer and return the result. * - * If parsing fails, set result->parse_error to an appropriate message + * If parsing fails, set result->has_error and write an appropriate message to result->parse_error * and return -1. 
*/ static int @@ -279,18 +291,17 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu if (value == NULL) { - result->parse_error = - psprintf(_("compression option \"%s\" requires a value"), - keyword); + result->has_error = true; + snprintf(result->parse_error, 255, _("compression option \"%s\" requires a value"), keyword); return -1; } ivalue = strtol(value, &ivalue_endp, 10); if (ivalue_endp == value || *ivalue_endp != '\0') { - result->parse_error = - psprintf(_("value for compression option \"%s\" must be an integer"), - keyword); + result->has_error = true; + snprintf(result->parse_error, 255, _("value for compression option \"%s\" must be an integer"), + keyword); return -1; } return ivalue; @@ -299,8 +310,8 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu /* * Parse 'value' as a boolean and return the result. * - * If parsing fails, set result->parse_error to an appropriate message - * and return -1. The caller must check result->parse_error to determine if + * If parsing fails, set result->has_error and write an appropriate message to result->parse_error + * and return -1. The caller must check result->has_error to determine if * the call was successful. * * Valid values are: yes, no, on, off, 1, 0. @@ -327,9 +338,10 @@ expect_boolean_value(char *keyword, char *value, pg_compress_specification *resu if (pg_strcasecmp(value, "0") == 0) return false; - result->parse_error = - psprintf(_("value for compression option \"%s\" must be a Boolean value"), - keyword); + + result->has_error = true; + snprintf(result->parse_error, 255, _("value for compression option \"%s\" must be a Boolean value"), + keyword); return false; } @@ -348,7 +360,7 @@ validate_compress_specification(pg_compress_specification *spec) int default_level = 0; /* If it didn't even parse OK, it's definitely no good. 
*/ - if (spec->parse_error != NULL) + if (spec->has_error) return spec->parse_error; /* @@ -376,16 +388,22 @@ validate_compress_specification(pg_compress_specification *spec) break; case PG_COMPRESSION_NONE: if (spec->level != 0) - return psprintf(_("compression algorithm \"%s\" does not accept a compression level"), - get_compress_algorithm_name(spec->algorithm)); + { + snprintf(spec->parse_error, 255, _("compression algorithm \"%s\" does not accept a compression level"), + get_compress_algorithm_name(spec->algorithm)); + return spec->parse_error; + } break; } if ((spec->level < min_level || spec->level > max_level) && spec->level != default_level) - return psprintf(_("compression algorithm \"%s\" expects a compression level between %d and %d (default at %d)"), - get_compress_algorithm_name(spec->algorithm), - min_level, max_level, default_level); + { + snprintf(spec->parse_error, 255, _("compression algorithm \"%s\" expects a compression level between %d and %d (default at %d)"), + get_compress_algorithm_name(spec->algorithm), + min_level, max_level, default_level); + return spec->parse_error; + } /* * Of the compression algorithms that we currently support, only zstd @@ -394,8 +412,9 @@ validate_compress_specification(pg_compress_specification *spec) if ((spec->options & PG_COMPRESSION_OPTION_WORKERS) != 0 && (spec->algorithm != PG_COMPRESSION_ZSTD)) { - return psprintf(_("compression algorithm \"%s\" does not accept a worker count"), - get_compress_algorithm_name(spec->algorithm)); + snprintf(spec->parse_error, 255, _("compression algorithm \"%s\" does not accept a worker count"), + get_compress_algorithm_name(spec->algorithm)); + return spec->parse_error; } /* @@ -405,13 +424,45 @@ validate_compress_specification(pg_compress_specification *spec) if ((spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0 && (spec->algorithm != PG_COMPRESSION_ZSTD)) { - return psprintf(_("compression algorithm \"%s\" does not support long-distance mode"), - 
get_compress_algorithm_name(spec->algorithm)); + snprintf(spec->parse_error, 255, _("compression algorithm \"%s\" does not support long-distance mode"), + get_compress_algorithm_name(spec->algorithm)); + return spec->parse_error; } return NULL; } +bool +supported_compression_algorithm(pg_compress_algorithm algorithm) +{ + switch (algorithm) + { + case PG_COMPRESSION_NONE: + return true; + case PG_COMPRESSION_GZIP: +#ifdef HAVE_LIBZ + return true; +#else + return false; +#endif + case PG_COMPRESSION_LZ4: +#ifdef USE_LZ4 + return true; +#else + return false; +#endif + case PG_COMPRESSION_ZSTD: +#ifdef USE_ZSTD + return true; +#else + return false; +#endif + /* no default, to provoke compiler warnings if values are added */ + } + Assert(false); + return false; /* placate compiler */ +} + #ifdef FRONTEND /* @@ -440,13 +491,13 @@ parse_compress_options(const char *option, char **algorithm, char **detail) { if (result == 0) { - *algorithm = pstrdup("none"); + *algorithm = STRDUP("none"); *detail = NULL; } else { - *algorithm = pstrdup("gzip"); - *detail = pstrdup(option); + *algorithm = STRDUP("gzip"); + *detail = STRDUP(option); } return; } @@ -458,19 +509,19 @@ parse_compress_options(const char *option, char **algorithm, char **detail) sep = strchr(option, ':'); if (sep == NULL) { - *algorithm = pstrdup(option); + *algorithm = STRDUP(option); *detail = NULL; } else { char *alg; - alg = palloc((sep - option) + 1); + alg = ALLOC((sep - option) + 1); memcpy(alg, option, sep - option); alg[sep - option] = '\0'; *algorithm = alg; - *detail = pstrdup(sep + 1); + *detail = STRDUP(sep + 1); } } #endif /* FRONTEND */ diff --git a/src/common/meson.build b/src/common/meson.build index 3e819108a1..35a70da8a5 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -36,6 +36,8 @@ common_sources = files( 'username.c', 'wait_error.c', 'wchar.c', + 'z_stream.c', + 'zpq_stream.c' ) if ssl.found() diff --git a/src/common/z_stream.c b/src/common/z_stream.c new file mode 100644 
index 0000000000..eb34733702 --- /dev/null +++ b/src/common/z_stream.c @@ -0,0 +1,995 @@ +/*------------------------------------------------------------------------- + * + * z_stream.c + * Functions implementing streaming compression algorithms + * + * Copyright (c) 2018-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/common/z_stream.c + * + *------------------------------------------------------------------------- + */ +#include "c.h" +#include "pg_config.h" +#include "common/z_stream.h" +#include "utils/elog.h" + +typedef struct +{ + /* + * Id of compression algorithm. + */ + pg_compress_algorithm algorithm; + + /* + * Create new compression stream. level: compression level + */ + void *(*create_compressor) (int level); + + /* + * Create new decompression stream. + */ + void *(*create_decompressor) (); + + /* + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during decompression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_STREAM_END if encountered end of compressed data stream. + * + * ZS_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. + */ + int (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Returns true if there is some data left in internal decompression + * buffers + */ + bool (*decompress_buffered_data) (void *ds); + + /* + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. 
Number of + * non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_COMPRESS_ERROR if encountered an error during compression attempt. + */ + int (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Returns true if there is some data left in internal compression buffers + */ + bool (*compress_buffered_data) (void *ds); + + /* + * Free compression stream created by create_compressor function. + */ + void (*free_compressor) (void *cs); + + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor) (void *ds); + + /* + * Get compressor error message. + */ + char const *(*compress_error) (void *cs); + + /* + * Get decompressor error message. + */ + char const *(*decompress_error) (void *ds); + + int (*end_compression) (void *cs, void *dst, size_t dst_size, size_t *dst_processed); +} ZAlgorithm; + +struct ZStream +{ + ZAlgorithm const *algorithm; + void *stream; +}; + +#ifndef FRONTEND +#include "utils/palloc.h" +#include "utils/memutils.h" +#define ALLOC(size) MemoryContextAlloc(TopMemoryContext, size) +#define FREE(size) pfree(size) +#else +#define ALLOC(size) malloc(size) +#define FREE(size) free(size) +#endif + +#if HAVE_LIBZSTD + +#include +#include + +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. 
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ + + +typedef struct ZS_ZSTD_CStream +{ + ZSTD_CStream *stream; + bool has_buffered_data; + char const *error; /* error message */ +} ZS_ZSTD_CStream; + +typedef struct ZS_ZSTD_DStream +{ + ZSTD_DStream *stream; + bool has_buffered_data; + char const *error; /* error message */ +} ZS_ZSTD_DStream; + +static void * +zstd_create_compressor(int level) +{ + size_t rc; + ZS_ZSTD_CStream *c_stream = (ZS_ZSTD_CStream *) ALLOC(sizeof(ZS_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + c_stream->has_buffered_data = false; + rc = ZSTD_initCStream(c_stream->stream, level); + if (ZSTD_isError(rc)) + { + ZSTD_freeCStream(c_stream->stream); + FREE(c_stream); + return NULL; + } +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); +#endif + c_stream->error = NULL; + return c_stream; +} + +static void * +zstd_create_decompressor() +{ + size_t rc; + ZS_ZSTD_DStream *d_stream = (ZS_ZSTD_DStream *) ALLOC(sizeof(ZS_ZSTD_DStream)); + + d_stream->stream = ZSTD_createDStream(); + d_stream->has_buffered_data = false; + rc = ZSTD_initDStream(d_stream->stream); + if (ZSTD_isError(rc)) + { + ZSTD_freeDStream(d_stream->stream); + FREE(d_stream); + return NULL; + } +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); +#endif + d_stream->error = NULL; + return d_stream; +} + +static int +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + size_t rc; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + rc = 
ZSTD_decompressStream(ds->stream, &out, &in); + + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZS_DECOMPRESS_ERROR; + } + + if (rc == 0) + { + return ZS_STREAM_END; + } + + /* + * if `output.pos == output.size`, there might be some data left within + * internal buffers + */ + ds->has_buffered_data = out.pos == out.size; + + return ZS_OK; +} + +static bool +zstd_decompress_buffered_data(void *d_stream) +{ + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; + + return ds->has_buffered_data; +} + +static int +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZS_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + cs->has_buffered_data = tx_not_flushed > 0; + } + else + { + cs->has_buffered_data = false; + } + + return ZS_OK; +} + +static bool +zstd_compress_buffered_data(void *c_stream) +{ + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; + + return cs->has_buffered_data; +} + +static int +zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + size_t tx_not_flushed; + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; + ZSTD_outBuffer output; + + output.dst = dst; + output.pos = 0; + output.size = dst_size; + + do + { + tx_not_flushed = 
ZSTD_endStream(cs->stream, &output); + } while ((tx_not_flushed > 0) && (output.pos < output.size)); + + *dst_processed = output.pos; + + cs->has_buffered_data = tx_not_flushed > 0; + return ZS_OK; +} + +static void +zstd_free_compressor(void *c_stream) +{ + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; + + if (cs != NULL) + { + ZSTD_freeCStream(cs->stream); + FREE(cs); + } +} + +static void +zstd_free_decompressor(void *d_stream) +{ + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + FREE(ds); + } +} + +static char const * +zstd_compress_error(void *c_stream) +{ + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; + + return cs->error; +} + +static char const * +zstd_decompress_error(void *d_stream) +{ + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; + + return ds->error; +} + +static ZAlgorithm const zstd_algorithm = { + .algorithm = PG_COMPRESSION_ZSTD, + .create_compressor = zstd_create_compressor, + .create_decompressor = zstd_create_decompressor, + .decompress = zstd_decompress, + .decompress_buffered_data = zstd_decompress_buffered_data, + .compress = zstd_compress, + .compress_buffered_data = zstd_compress_buffered_data, + .free_compressor = zstd_free_compressor, + .free_decompressor = zstd_free_decompressor, + .compress_error = zstd_compress_error, + .decompress_error = zstd_decompress_error, + .end_compression = zstd_end +}; + +#endif + +#if HAVE_LIBZ + +#include +#include + + +static void * +zlib_create_compressor(int level) +{ + int rc; + z_stream *c_stream = (z_stream *) ALLOC(sizeof(z_stream)); + + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); + if (rc != Z_OK) + { + FREE(c_stream); + return NULL; + } + return c_stream; +} + +static void * +zlib_create_decompressor() +{ + int rc; + z_stream *d_stream = (z_stream *) ALLOC(sizeof(z_stream)); + + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + 
FREE(d_stream); + return NULL; + } + return d_stream; +} + +static int +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *ds = (z_stream *) d_stream; + int rc; + + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) + { + return ZS_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZS_DECOMPRESS_ERROR; + } + + return ZS_OK; +} + +static bool +zlib_decompress_buffered_data(void *d_stream) +{ + z_stream *ds = (z_stream *) d_stream; + unsigned deflate_pending = 0; + + return deflatePending(ds, &deflate_pending, Z_NULL) > 0; +} + +static int +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc PG_USED_FOR_ASSERTS_ONLY; + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + return ZS_OK; +} + +static bool +zlib_compress_buffered_data(void *c_stream) +{ + return false; +} + +static int +zlib_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = NULL; + cs->avail_in = 0; + + rc = deflate(cs, Z_STREAM_END); + Assert(rc == Z_OK || rc == Z_STREAM_END); + *dst_processed = dst_size - cs->avail_out; + if (rc == Z_STREAM_END) + { + return ZS_OK; + } + + return rc; +} + +static void +zlib_free_compressor(void *c_stream) +{ + z_stream *cs = (z_stream *) 
c_stream; + + if (cs != NULL) + { + deflateEnd(cs); + FREE(cs); + } +} + +static void +zlib_free_decompressor(void *d_stream) +{ + z_stream *ds = (z_stream *) d_stream; + + if (ds != NULL) + { + inflateEnd(ds); + FREE(ds); + } +} + +static char const * +zlib_error(void *stream) +{ + z_stream *zs = (z_stream *) stream; + + return zs->msg; +} + +/* as with elsewhere in postgres, gzip really means zlib */ +static ZAlgorithm const zlib_algorithm = { + .algorithm = PG_COMPRESSION_GZIP, + .create_compressor = zlib_create_compressor, + .create_decompressor = zlib_create_decompressor, + .decompress = zlib_decompress, + .decompress_buffered_data = zlib_decompress_buffered_data, + .compress = zlib_compress, + .compress_buffered_data = zlib_compress_buffered_data, + .free_compressor = zlib_free_compressor, + .free_decompressor = zlib_free_decompressor, + .compress_error = zlib_error, + .decompress_error = zlib_error, + .end_compression = zlib_end +}; + +#endif + +#if USE_LZ4 +#include + +#define MESSAGE_MAX_BYTES 64 * 1024 +#define RING_BUFFER_BYTES (LZ4_DECODER_RING_BUFFER_SIZE(MESSAGE_MAX_BYTES)) + +typedef struct ZS_LZ4_CStream +{ + LZ4_stream_t *stream; + int level; + size_t buf_pos; + char *last_error; + char buf[RING_BUFFER_BYTES]; +} ZS_LZ4_CStream; + +typedef struct ZS_LZ4_DStream +{ + LZ4_streamDecode_t *stream; + size_t buf_pos; + size_t read_pos; + char *last_error; + char buf[RING_BUFFER_BYTES]; +} ZS_LZ4_DStream; + +static void * +lz4_create_compressor(int level) +{ + ZS_LZ4_CStream *c_stream = (ZS_LZ4_CStream *) ALLOC(sizeof(ZS_LZ4_CStream)); + + if (c_stream == NULL) + { + return NULL; + } + c_stream->stream = LZ4_createStream(); + c_stream->level = level; + c_stream->buf_pos = 0; + if (c_stream->stream == NULL) + { + FREE(c_stream); + return NULL; + } + return c_stream; +} + +static void * +lz4_create_decompressor() +{ + ZS_LZ4_DStream *d_stream = (ZS_LZ4_DStream *) ALLOC(sizeof(ZS_LZ4_DStream)); + + if (d_stream == NULL) + { + return NULL; + } + + 
d_stream->stream = LZ4_createStreamDecode(); + d_stream->buf_pos = 0; + d_stream->read_pos = 0; + if (d_stream->stream == NULL) + { + FREE(d_stream); + return NULL; + } + + return d_stream; +} +char last_error[256]; + +static int +lz4_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZS_LZ4_DStream *ds = (ZS_LZ4_DStream *) d_stream; + size_t copyable; + char *decPtr; + int decBytes; + + if (ds->read_pos < ds->buf_pos) + { + decPtr = &ds->buf[ds->read_pos]; + copyable = Min(ds->buf_pos - ds->read_pos, dst_size); + memcpy(dst, decPtr, copyable); /* read msg length */ + *dst_processed = copyable; + *src_processed = 0; + ds->read_pos += copyable; + + if (ds->read_pos == ds->buf_pos && RING_BUFFER_BYTES - ds->buf_pos < MESSAGE_MAX_BYTES) + { + ds->buf_pos = 0; + ds->read_pos = 0; + } + return ZS_OK; + } + decPtr = &ds->buf[ds->buf_pos]; + + decBytes = LZ4_decompress_safe_continue(ds->stream, src, decPtr, (int) src_size, RING_BUFFER_BYTES - ds->buf_pos); + if (decBytes < 0) + { + sprintf(last_error, "LZ4 decompression failed (src_size %ld, dst_size %ld, error: %d)", src_size, RING_BUFFER_BYTES - ds->buf_pos, decBytes); + ds->last_error = last_error; +#ifndef FRONTEND + elog(ERROR, "%s", ds->last_error); +#else + return ZS_DECOMPRESS_ERROR; +#endif + } + + copyable = Min(decBytes, dst_size); + memcpy(dst, decPtr, copyable); + + *dst_processed = copyable; + *src_processed = src_size; + + ds->buf_pos += decBytes; + ds->read_pos += copyable; + /* only reset the ring buffer after the internal buffer is drained */ + if (copyable == decBytes && RING_BUFFER_BYTES - ds->buf_pos < MESSAGE_MAX_BYTES) + { + ds->buf_pos = 0; + ds->read_pos = 0; + } + + return ZS_OK; +} + +static bool +lz4_decompress_buffered_data(void *d_stream) +{ + ZS_LZ4_DStream *ds = (ZS_LZ4_DStream *) d_stream; + + return ds->read_pos < ds->buf_pos; +} + +static int +lz4_compress(void *c_stream, void const *src, size_t 
src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZS_LZ4_CStream *cs = (ZS_LZ4_CStream *) c_stream; + int cmpBytes; + + src_size = Min(MESSAGE_MAX_BYTES, src_size); + + memcpy((char *) (cs->buf) + cs->buf_pos, src, src_size); /* write msg length */ + + if (dst_size < LZ4_compressBound(src_size)) + { + cs->last_error = "LZ4 compression failed: buffer not big enough"; +#ifndef FRONTEND + elog(ERROR, "%s", cs->last_error); +#else + return ZS_COMPRESS_ERROR; +#endif + } + + cmpBytes = LZ4_compress_fast_continue(cs->stream, (char *) (cs->buf) + cs->buf_pos, dst, (int) src_size, (int) dst_size, cs->level); + + if (cmpBytes < 0 || cmpBytes > MESSAGE_MAX_BYTES) + { + cs->last_error = "LZ4 compression failed"; +#ifndef FRONTEND + elog(ERROR, "%s", cs->last_error); +#else + return ZS_COMPRESS_ERROR; +#endif + } + + *dst_processed = cmpBytes; + *src_processed = src_size; + + cs->buf_pos += src_size; + if (cs->buf_pos >= RING_BUFFER_BYTES - MESSAGE_MAX_BYTES) + { + cs->buf_pos = 0; + } + return ZS_OK; +} + +static bool +lz4_compress_buffered_data(void *d_stream) +{ + return false; +} + + +static int +lz4_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + *dst_processed = 0; + return ZS_OK; +} + +static void +lz4_free_compressor(void *c_stream) +{ + ZS_LZ4_CStream *cs = (ZS_LZ4_CStream *) c_stream; + + if (cs != NULL) + { + if (cs->stream != NULL) + { + LZ4_freeStream(cs->stream); + } + FREE(cs); + } +} + +static void +lz4_free_decompressor(void *d_stream) +{ + ZS_LZ4_DStream *ds = (ZS_LZ4_DStream *) d_stream; + + if (ds != NULL) + { + if (ds->stream != NULL) + { + LZ4_freeStreamDecode(ds->stream); + } + FREE(ds); + } +} + +static char const * +lz4_compress_error(void *stream) +{ + ZS_LZ4_CStream *cs = (ZS_LZ4_CStream *) stream; + + /* lz4 doesn't have any explicit API to get the error names */ + return cs->last_error; +} + +static char const * +lz4_decompress_error(void *stream) +{ + ZS_LZ4_DStream *ds = 
(ZS_LZ4_DStream *) stream; + + /* lz4 doesn't have any explicit API to get the error names */ + return ds->last_error; +} + +static ZAlgorithm const lz4_algorithm = { + .algorithm = PG_COMPRESSION_LZ4, + .create_compressor = lz4_create_compressor, + .create_decompressor = lz4_create_decompressor, + .decompress = lz4_decompress, + .decompress_buffered_data = lz4_decompress_buffered_data, + .compress = lz4_compress, + .compress_buffered_data = lz4_compress_buffered_data, + .free_compressor = lz4_free_compressor, + .free_decompressor = lz4_free_decompressor, + .compress_error = lz4_compress_error, + .decompress_error = lz4_decompress_error, + .end_compression = lz4_end +}; + +#endif + +static const ZAlgorithm * +zs_find_algorithm(pg_compress_algorithm algorithm) +{ +#if HAVE_LIBZ + if (algorithm == PG_COMPRESSION_GZIP) + { + return &zlib_algorithm; + } +#endif +#if USE_LZ4 + if (algorithm == PG_COMPRESSION_LZ4) + { + return &lz4_algorithm; + } +#endif +#if HAVE_LIBZSTD + if (algorithm == PG_COMPRESSION_ZSTD) + { + return &zstd_algorithm; + } +#endif + return NULL; +} + +static int +zs_init_compressor(ZStream * zs, pg_compress_specification *spec) +{ + const ZAlgorithm *algorithm = zs_find_algorithm(spec->algorithm); + + if (algorithm == NULL) + { + return -1; + } + zs->algorithm = algorithm; + zs->stream = zs->algorithm->create_compressor(spec->level); + if (zs->stream == NULL) + { + return -1; + } + return 0; +} + +static int +zs_init_decompressor(ZStream * zs, pg_compress_specification *spec) +{ + const ZAlgorithm *algorithm = zs_find_algorithm(spec->algorithm); + + if (algorithm == NULL) + { + return -1; + } + zs->algorithm = algorithm; + zs->stream = zs->algorithm->create_decompressor(); + if (zs->stream == NULL) + { + return -1; + } + return 0; +} + +ZStream * +zs_create_compressor(pg_compress_specification *spec) +{ + ZStream *zs = (ZStream *) ALLOC(sizeof(ZStream)); + + if (zs_init_compressor(zs, spec)) + { + FREE(zs); + return NULL; + } + + return zs; +} + 
+ZStream * +zs_create_decompressor(pg_compress_specification *spec) +{ + ZStream *zs = (ZStream *) ALLOC(sizeof(ZStream)); + + if (zs_init_decompressor(zs, spec)) + { + FREE(zs); + return NULL; + } + + return zs; +} + +int +zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + int rc; + + *src_processed = 0; + *dst_processed = 0; + + rc = zs->algorithm->decompress(zs->stream, + src, src_size, src_processed, + dst, dst_size, dst_processed); + + if (rc == ZS_OK || rc == ZS_INCOMPLETE_SRC || rc == ZS_STREAM_END) + { + return rc; + } + + return ZS_DECOMPRESS_ERROR; +} + +int +zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + int rc; + + *processed = 0; + *dst_processed = 0; + + rc = zs->algorithm->compress(zs->stream, + buf, size, processed, + dst, dst_size, dst_processed); + + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +void +zs_compressor_free(ZStream * zs) +{ + if (zs == NULL) + { + return; + } + + if (zs->stream) + { + zs->algorithm->free_compressor(zs->stream); + } + + FREE(zs); +} + +void +zs_decompressor_free(ZStream * zs) +{ + if (zs == NULL) + { + return; + } + + if (zs->stream) + { + zs->algorithm->free_decompressor(zs->stream); + } + + FREE(zs); +} + +int +zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) +{ + int rc; + + *dst_processed = 0; + + rc = zs->algorithm->end_compression(zs->stream, dst, dst_size, dst_processed); + + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +char const * +zs_compress_error(ZStream * zs) +{ + return zs->algorithm->compress_error(zs->stream); +} + +char const * +zs_decompress_error(ZStream * zs) +{ + return zs->algorithm->decompress_error(zs->stream); +} + +bool +zs_compress_buffered(ZStream * zs) +{ + return zs ? 
zs->algorithm->compress_buffered_data(zs->stream) : false; +} + +bool +zs_decompress_buffered(ZStream * zs) +{ + return zs ? zs->algorithm->decompress_buffered_data(zs->stream) : false; +} + +pg_compress_algorithm +zs_algorithm(ZStream * zs) +{ + return zs ? zs->algorithm->algorithm : PG_COMPRESSION_NONE; +} diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c new file mode 100644 index 0000000000..222a776933 --- /dev/null +++ b/src/common/zpq_stream.c @@ -0,0 +1,1004 @@ +/*------------------------------------------------------------------------- + * + * zpq_stream.c + * IO stream layer applying ZStream compression to libpq + * + * Copyright (c) 2018-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/common/zpq_stream.c + * + *------------------------------------------------------------------------- + */ +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif +#include +#include + +#include "common/zpq_stream.h" + +#include +#include + +#include "pg_config.h" +#include "port/pg_bswap.h" + +/* log warnings on backend */ +#ifndef FRONTEND +#define pg_log_warning(...) elog(WARNING, __VA_ARGS__) +#else +#define pg_log_warning(...) (void)0 +#endif + +#ifndef FRONTEND +#include "utils/memutils.h" +#define ALLOC(size) MemoryContextAlloc(TopMemoryContext, size) +#define STRDUP(str) pstrdup(str) +#define FREE(size) pfree(size) +#else +#define ALLOC(size) malloc(size) +#define STRDUP(str) strdup(str) +#define FREE(size) free(size) +#endif + +/* ZpqBuffer size, in bytes */ +#define ZPQ_BUFFER_SIZE 8192000 + +#define ZPQ_COMPRESS_THRESHOLD 60 + +/* startup messages have no type field and therefore have a null first byte */ +#define MESSAGE_TYPE_OFFSET(msg_type) (msg_type == '\0' ? 
0 : 1) + +typedef struct ZpqBuffer ZpqBuffer; + + +/* ZpqBuffer used as RX/TX buffer in ZpqStream */ +struct ZpqBuffer +{ + char buf[ZPQ_BUFFER_SIZE]; + size_t size; /* current size of buf */ + size_t pos; /* current position in buf, in range [0, size] */ +}; + +/* + * Write up to "src_size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *src_processed. + */ +static int zpq_write(IoStreamLayer * layer, ZpqStream * zpq, void const *src, size_t src_size, size_t *bytes_written); + +/* + * Read up to "dst_size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ +static ssize_t zpq_read(IoStreamLayer * layer, ZpqStream * zpq, void *dst, size_t dst_size, bool buffered_only); + +/* + * Return true if non-flushed data left in internal rx decompression buffer. + */ +static bool zpq_buffered_rx(ZpqStream * zpq); + +/* + * Return true if non-flushed data left in internal tx compression buffer. + */ +static bool zpq_buffered_tx(ZpqStream * zpq); + +/* + * Free stream created by zs_create function. 
+ */ +static void zpq_free(ZpqStream * zpq); + +IoStreamProcessor zpq_processor = { + .read = (io_stream_read_func) zpq_read, + .write = (io_stream_write_func) zpq_write, + .buffered_read_data = (io_stream_predicate) zpq_buffered_rx, + .buffered_write_data = (io_stream_predicate) zpq_buffered_tx, + .destroy = (io_stream_destroy_func) zpq_free +}; + +static inline void +zpq_buf_init(ZpqBuffer * zb) +{ + zb->size = 0; + zb->pos = 0; +} + +static inline size_t +zpq_buf_left(ZpqBuffer * zb) +{ + Assert(zb->buf); + return ZPQ_BUFFER_SIZE - zb->size; +} + +static inline size_t +zpq_buf_unread(ZpqBuffer * zb) +{ + return zb->size - zb->pos; +} + +static inline char * +zpq_buf_size(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->size; +} + +static inline char * +zpq_buf_pos(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->pos; +} + +static inline void +zpq_buf_size_advance(ZpqBuffer * zb, size_t value) +{ + zb->size += value; +} + +static inline void +zpq_buf_pos_advance(ZpqBuffer * zb, size_t value) +{ + zb->pos += value; +} + +static inline void +zpq_buf_reuse(ZpqBuffer * zb) +{ + size_t unread = zpq_buf_unread(zb); + + if (unread > 5) /* can read message header, don't do anything */ + return; + if (unread == 0) + { + zb->size = 0; + zb->pos = 0; + return; + } + memmove(zb->buf, zb->buf + zb->pos, unread); + zb->size = unread; + zb->pos = 0; +} + +struct ZpqStream +{ + ZStream *c_stream; /* underlying compression stream */ + ZStream *d_stream; /* underlying decompression stream */ + + bool is_compressing; /* current compression state */ + + bool is_decompressing; /* current decompression state */ + bool reading_compressed_header; /* compression header processing + * incomplete */ + size_t rx_msg_bytes_left; /* number of bytes left to process without + * changing the decompression state */ + size_t tx_msg_bytes_left; /* number of bytes left to process without + * changing the compression state */ + + ZpqBuffer rx_in; /* buffer for unprocessed data read from next 
+ * stream layer */ + ZpqBuffer tx_in; /* buffer for unprocessed data consumed by + * zpq_write */ + ZpqBuffer tx_out; /* buffer for processed data waiting for send + * to next stream layer */ + + pg_compress_specification *compressors; /* compressors array holds the + * available compressors to use + * for compression/decompression */ + size_t n_compressors; /* size of the compressors array */ + pg_compress_algorithm compress_algs[COMPRESSION_ALGORITHM_COUNT]; /* array of compression + * algorithms supported + * by the receiver of + * the stream */ + pg_compress_algorithm compress_alg; /* active compression algorithm */ + pg_compress_algorithm decompress_alg; /* active decompression algorithm */ +}; + +/* + * Choose the algorithm to use for the message of msg_type with msg_len. + * Returns a pg_compress_algorithm with a registered compressor, or PG_COMPRESSION_NONE if no compressor is appropriate + */ +static inline pg_compress_algorithm +zpq_choose_algorithm(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + /* + * in theory we could choose the algorithm based on the message type + * and/or more complex heuristics, but at least for now we will just use + * the first available algorithm (which defaults to none if compression + * has not yet been enabled) for message types that would most obviously + * benefit from compression + */ + if (msg_len >= ZPQ_COMPRESS_THRESHOLD && (msg_type == PqMsg_CopyData || msg_type == PqMsg_DataRow || msg_type == PqMsg_Query)) + { + return zpq->compress_algs[0]; + } + return PG_COMPRESSION_NONE; +} + +static inline bool +zpq_should_compress(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + return zpq_choose_algorithm(zpq, msg_type, msg_len) != PG_COMPRESSION_NONE; +} + +static inline pg_compress_specification * +zpq_find_compressor(ZpqStream * zpq, pg_compress_algorithm algorithm) +{ + int i; + + for (i = 0; i < zpq->n_compressors; i++) + { + if (algorithm == zpq->compressors[i].algorithm) + return &zpq->compressors[i]; + } + 
return NULL; +} + +static inline bool +zpq_is_compressed_msg(char msg_type) +{ + return msg_type == PqMsg_CompressedMessage; +} + +ZpqStream * +zpq_create(pg_compress_specification *compressors, size_t n_compressors, IoStream * stream) +{ + ZpqStream *zpq; + + /* zpqStream needs at least one compressor */ + if (n_compressors == 0 || compressors == NULL) + { + return NULL; + } + zpq = (ZpqStream *) ALLOC(sizeof(ZpqStream)); + /* almost all fields should default to 0/NULL/PG_COMPRESSION_NONE */ + memset(zpq, 0, sizeof(ZpqStream)); + zpq->compressors = compressors; + zpq->n_compressors = n_compressors; + zpq_buf_init(&zpq->tx_in); + zpq_buf_init(&zpq->rx_in); + zpq_buf_init(&zpq->tx_out); + + io_stream_add_layer(stream, &zpq_processor, zpq); + + return zpq; +} + +void +zpq_enable_compression(ZpqStream * zpq, pg_compress_algorithm *algorithms, size_t n_algorithms) +{ + Assert(n_algorithms <= COMPRESSION_ALGORITHM_COUNT); + + for (int i = 0; i < COMPRESSION_ALGORITHM_COUNT; i++) + { + if (i < n_algorithms) + { + zpq->compress_algs[i] = algorithms[i]; + } + else + { + zpq->compress_algs[i] = PG_COMPRESSION_NONE; + } + } +} + +/* Compress up to src_size bytes from *src into CompressedData and write it to the tx buffer. + * Returns ZS_OK on success, ZS_COMPRESS_ERROR if encountered a compression error. 
*/ +static inline int +zpq_write_compressed_message(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + size_t compressed_len; + ssize_t rc; + uint32 size; + uint8 algorithm; + + /* check if have enough space */ + if (zpq_buf_left(&zpq->tx_out) <= 6) + { + /* too little space for CompressedData, abort */ + *src_processed = 0; + return ZS_OK; + } + + compressed_len = 0; + rc = zs_write(zpq->c_stream, src, src_size, src_processed, + zpq_buf_size(&zpq->tx_out) + 6, zpq_buf_left(&zpq->tx_out) - 6, &compressed_len); + + if (compressed_len > 0) + { + /* write CompressedData type */ + *zpq_buf_size(&zpq->tx_out) = PqMsg_CompressedMessage; + size = pg_hton32(compressed_len + 5); + + memcpy(zpq_buf_size(&zpq->tx_out) + 1, &size, sizeof(uint32)); /* write msg length */ + compressed_len += 6; /* append header length to compressed data + * length */ + algorithm = zpq->compress_alg; + memcpy(zpq_buf_size(&zpq->tx_out) + 5, &algorithm, sizeof(uint8)); /* write msg algorithm */ + } + + zpq_buf_size_advance(&zpq->tx_out, compressed_len); + return rc; +} + +/* Copy the data directly from *src to the tx buffer */ +static void +zpq_write_uncompressed(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + src_size = Min(zpq_buf_left(&zpq->tx_out), src_size); + memcpy(zpq_buf_size(&zpq->tx_out), src, src_size); + + zpq_buf_size_advance(&zpq->tx_out, src_size); + *src_processed = src_size; +} + +/* Determine if should compress the next message and change the current compression state */ +static int +zpq_toggle_compression(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + pg_compress_algorithm new_compress_alg = zpq_choose_algorithm(zpq, msg_type, msg_len); + pg_compress_specification *spec; + bool should_compress = new_compress_alg != PG_COMPRESSION_NONE; + + /* + * negative new_compress_idx indicates that we should not compress this + * message + */ + if (should_compress) + { + /* + * if the new compressor does not match the current 
one, change out + * the underlying z_stream + */ + if (zpq->compress_alg != new_compress_alg) + { + zs_compressor_free(zpq->c_stream); + + /* + * forget the freed stream right away: if one of the steps below + * fails, zpq_free() must not free it a second time + */ + zpq->c_stream = NULL; + spec = zpq_find_compressor(zpq, new_compress_alg); + if (spec == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->c_stream = zs_create_compressor(spec); + if (zpq->c_stream == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->compress_alg = new_compress_alg; + } + } + + zpq->is_compressing = should_compress; + zpq->tx_msg_bytes_left = msg_len + MESSAGE_TYPE_OFFSET(msg_type); + return 0; +} + +/* + * Internal write function. Reads the data from *src buffer, + * determines the postgres messages type and length. + * If message matches the compression criteria, it wraps the message into + * CompressedData. Otherwise, leaves the message unchanged. + * If *src data ends with incomplete message header, this function is not + * going to read this message header. + * Returns 0 on success or error code + * Number of bytes written is stored in *processed. + */ +static int +zpq_write_internal(ZpqStream * zpq, void const *src, size_t src_size, size_t *processed) +{ + size_t src_pos = 0; + ssize_t rc; + + do + { + /* + * try to read ahead the next message types and increase + * tx_msg_bytes_left, if possible + */ + while (zpq->tx_msg_bytes_left > 0 && src_size - src_pos >= zpq->tx_msg_bytes_left + 5) + { + char msg_type = *((char *) src + src_pos + zpq->tx_msg_bytes_left); + uint32 msg_len; + + memcpy(&msg_len, (char *) src + src_pos + zpq->tx_msg_bytes_left + MESSAGE_TYPE_OFFSET(msg_type), 4); + msg_len = pg_ntoh32(msg_len); + if (zpq_should_compress(zpq, msg_type, msg_len) != zpq->is_compressing) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + zpq->tx_msg_bytes_left += msg_len + MESSAGE_TYPE_OFFSET(msg_type); + } + + /* + * Write CompressedData if currently is compressing or have some + * buffered data left in underlying compression stream + */ + if (zs_compress_buffered(zpq->c_stream) || 
(zpq->is_compressing && zpq->tx_msg_bytes_left > 0)) + { + size_t buf_processed = 0; + size_t to_compress = Min(zpq->tx_msg_bytes_left, src_size - src_pos); + + rc = zpq_write_compressed_message(zpq, (char *) src + src_pos, to_compress, &buf_processed); + src_pos += buf_processed; + zpq->tx_msg_bytes_left -= buf_processed; + + if (rc != ZS_OK) + { + *processed = src_pos; + return rc; + } + } + + /* + * If not going to compress the data from *src, just write it + * uncompressed. + */ + else if (zpq->tx_msg_bytes_left > 0) + { /* determine next message type */ + size_t copy_len = Min(src_size - src_pos, zpq->tx_msg_bytes_left); + size_t copy_processed = 0; + + zpq_write_uncompressed(zpq, (char *) src + src_pos, copy_len, &copy_processed); + src_pos += copy_processed; + zpq->tx_msg_bytes_left -= copy_processed; + } + + /* + * Reached the compression toggle point, fetch next message header to + * determine compression state. + */ + else + { + char msg_type; + uint32 msg_len; + + if (src_size - src_pos < 5) + { + /* + * must return here because we can't continue without full + * message header + */ + *processed = src_pos; + return 0; + } + + msg_type = *((char *) src + src_pos); + memcpy(&msg_len, (char *) src + src_pos + MESSAGE_TYPE_OFFSET(msg_type), 4); + msg_len = pg_ntoh32(msg_len); + rc = zpq_toggle_compression(zpq, msg_type, msg_len); + if (rc) + { + *processed = src_pos; + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (src_pos < src_size && zpq_buf_left(&zpq->tx_out) > 6); + + *processed = src_pos; + return 0; +} + +int +zpq_write(IoStreamLayer * self, ZpqStream * zpq, void const *src, size_t src_size, size_t *bytes_written) +{ + size_t src_pos = 0; + ssize_t rc; + + /* try to process as much data as possible before calling the tx_func */ + while (zpq_buf_left(&zpq->tx_out) > 6) + { + size_t copy_len = Min(zpq_buf_left(&zpq->tx_in), src_size - src_pos); + size_t processed; + + 
memcpy(zpq_buf_size(&zpq->tx_in), (char *) src + src_pos, copy_len); + zpq_buf_size_advance(&zpq->tx_in, copy_len); + src_pos += copy_len; + + if (zpq_buf_unread(&zpq->tx_in) == 0 && !zs_compress_buffered(zpq->c_stream)) + { + break; + } + + processed = 0; + + rc = zpq_write_internal(zpq, zpq_buf_pos(&zpq->tx_in), zpq_buf_unread(&zpq->tx_in), &processed); + zpq_buf_pos_advance(&zpq->tx_in, processed); + zpq_buf_reuse(&zpq->tx_in); + if (rc < 0) + { + *bytes_written = src_pos; + return rc; + } + if(processed == 0) + { + break; + } + } + *bytes_written = src_pos; + + /* + * call the tx_func if have any bytes to send + */ + while (zpq_buf_unread(&zpq->tx_out)) + { + size_t count; + + rc = io_stream_next_write(self, zpq_buf_pos(&zpq->tx_out), zpq_buf_unread(&zpq->tx_out), &count); + if (!rc && count > 0) + { + zpq_buf_pos_advance(&zpq->tx_out, count); + } + else + { + zpq_buf_reuse(&zpq->tx_out); + return rc; + } + } + + zpq_buf_reuse(&zpq->tx_out); + return 0; +} + +/* Decompress bytes from RX buffer and write up to dst_len of uncompressed data to *dst. + * Returns: + * ZS_OK on success, + * ZS_STREAM_END if reached end of compressed chunk + * ZS_DECOMPRESS_ERROR if encountered a decompression error */ +static inline ssize_t +zpq_read_compressed_message(ZpqStream * zpq, char *dst, size_t dst_len, size_t *dst_processed) +{ + size_t rx_processed = 0; + ssize_t rc; + size_t read_len = Min(zpq->rx_msg_bytes_left, zpq_buf_unread(&zpq->rx_in)); + + Assert(read_len == zpq->rx_msg_bytes_left); + rc = zs_read(zpq->d_stream, zpq_buf_pos(&zpq->rx_in), read_len, &rx_processed, + dst, dst_len, dst_processed); + + zpq_buf_pos_advance(&zpq->rx_in, rx_processed); + zpq->rx_msg_bytes_left -= rx_processed; + return rc; +} + +/* Copy up to dst_len bytes from rx buffer to *dst. + * Returns amount of bytes copied. 
*/ +static inline size_t +zpq_read_uncompressed(ZpqStream * zpq, char *dst, size_t dst_len) +{ + size_t copy_len; + + Assert(zpq_buf_unread(&zpq->rx_in) > 0); + copy_len = Min(zpq->rx_msg_bytes_left, Min(zpq_buf_unread(&zpq->rx_in), dst_len)); + + memcpy(dst, zpq_buf_pos(&zpq->rx_in), copy_len); + + zpq_buf_pos_advance(&zpq->rx_in, copy_len); + zpq->rx_msg_bytes_left -= copy_len; + return copy_len; +} + +/* Determine if should decompress the next message and + * change the current decompression state */ +static inline void +zpq_toggle_decompression(ZpqStream * zpq) +{ + uint32 msg_len; + char msg_type = *zpq_buf_pos(&zpq->rx_in); + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + MESSAGE_TYPE_OFFSET(msg_type), 4); + msg_len = pg_ntoh32(msg_len); + + zpq->is_decompressing = zpq_is_compressed_msg(msg_type); + zpq->rx_msg_bytes_left = msg_len + MESSAGE_TYPE_OFFSET(msg_type); + + if (zpq->is_decompressing) + { + /* compressed message header is no longer needed, just skip it */ + zpq_buf_pos_advance(&zpq->rx_in, 5); + zpq->rx_msg_bytes_left -= 5; + zpq->reading_compressed_header = true; + } +} + +/* + * Consume the one-byte algorithm id that follows a compressed message header + * and switch the decompressor if the id differs from the current one. + * Returns 0 on success or when the byte is not buffered yet (in which case + * reading_compressed_header stays set), ZPQ_FATAL_ERROR if no matching + * compressor is found or the decompressor cannot be created. + */ +static inline ssize_t +zpq_process_switch(ZpqStream * zpq) +{ + pg_compress_algorithm algorithm; + pg_compress_specification *spec; + + if (zpq_buf_unread(&zpq->rx_in) < 1) + { + return 0; + } + + algorithm = *zpq_buf_pos(&zpq->rx_in); + + zpq_buf_pos_advance(&zpq->rx_in, 1); + zpq->reading_compressed_header = false; + zpq->rx_msg_bytes_left -= 1; + + /* + * if the new decompressor does not match the current one, change out the + * underlying z_stream + */ + if (algorithm != zpq->decompress_alg) + { + zs_decompressor_free(zpq->d_stream); + + /* + * forget the freed stream right away: if one of the steps below + * fails, zpq_free() must not free it a second time + */ + zpq->d_stream = NULL; + spec = zpq_find_compressor(zpq, algorithm); + if (spec == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->d_stream = zs_create_decompressor(spec); + if (zpq->d_stream == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->decompress_alg = algorithm; + } + + return 0; +} + +ssize_t +zpq_read(IoStreamLayer * self, ZpqStream * zpq, void *dst, size_t 
dst_size, bool buffered_only) +{ + size_t dst_pos = 0; + size_t dst_processed = 0; + ssize_t rc; + + /* Read until some data fetched */ + while (dst_pos == 0) + { + zpq_buf_reuse(&zpq->rx_in); + + if (!zpq_buffered_rx(zpq) || (zpq->is_decompressing && zpq_buf_unread(&zpq->rx_in) < zpq->rx_msg_bytes_left)) + { + rc = io_stream_next_read(self, zpq_buf_size(&zpq->rx_in), zpq_buf_left(&zpq->rx_in), buffered_only); + if (rc > 0) /* read fetches some data */ + { + zpq_buf_size_advance(&zpq->rx_in, rc); + } + else if (rc == 0) + { + /* got no more data; return what we have */ + return dst_pos; + } + else /* read failed */ + { + return rc; + } + } + + /* + * try to read ahead the next message types and increase + * rx_msg_bytes_left, if possible (ONLY UNCOMPRESSED MESSAGES) + */ + while (!zpq->is_decompressing && zpq->rx_msg_bytes_left > 0 && (zpq_buf_unread(&zpq->rx_in) >= zpq->rx_msg_bytes_left + 5)) + { + char msg_type; + uint32 msg_len; + + msg_type = *(zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left); + if (zpq_is_compressed_msg(msg_type)) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left + MESSAGE_TYPE_OFFSET(msg_type), 4); + zpq->rx_msg_bytes_left += pg_ntoh32(msg_len) + MESSAGE_TYPE_OFFSET(msg_type); + } + + + /* + * If we are in the middle of reading a message, keep reading it until + * we reach the end at which point we need to check if we should + * toggle compression + */ + if (zpq->rx_msg_bytes_left > 0 || zs_decompress_buffered(zpq->d_stream)) + { + dst_processed = 0; + if (zpq->is_decompressing || zs_decompress_buffered(zpq->d_stream)) + { + if (zpq->reading_compressed_header) + { + /* + * a failed algorithm switch leaves no usable + * decompressor, so report it instead of reading on + */ + rc = zpq_process_switch(zpq); + if (rc != 0) + { + return rc; + } + } + if (!zs_decompress_buffered(zpq->d_stream) && zpq_buf_unread(&zpq->rx_in) < zpq->rx_msg_bytes_left) + { + /* + * prefer to read only the fully compressed messages or + * read if some data is buffered + */ + continue; + } + rc = 
zpq_read_compressed_message(zpq, dst, dst_size - dst_pos, &dst_processed); + dst_pos += dst_processed; + if (rc == ZS_STREAM_END) + { + continue; + } + if (rc != ZS_OK) + { + return rc; + } + } + else + dst_pos += zpq_read_uncompressed(zpq, dst, dst_size - dst_pos); + } + else if (zpq_buf_unread(&zpq->rx_in) >= 5) + zpq_toggle_decompression(zpq); + } + return dst_pos; +} + +bool +zpq_buffered_rx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->rx_in) >= 5 || (zpq_buf_unread(&zpq->rx_in) > 0 && zpq->rx_msg_bytes_left > 0) || + zs_decompress_buffered(zpq->d_stream) : 0; +} + +bool +zpq_buffered_tx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->tx_in) >= 5 || (zpq_buf_unread(&zpq->tx_in) > 0 && zpq->tx_msg_bytes_left > 0) || zpq_buf_unread(&zpq->tx_out) > 0 || + zs_compress_buffered(zpq->c_stream) : 0; +} + +void +zpq_free(ZpqStream * zpq) +{ + if (zpq) + { + if (zpq->c_stream) + { + zs_compressor_free(zpq->c_stream); + } + if (zpq->d_stream) + { + zs_decompressor_free(zpq->d_stream); + } + FREE(zpq); + } +} + +char const * +zpq_compress_error(ZpqStream * zpq) +{ + return zs_compress_error(zpq->c_stream); +} + +char const * +zpq_decompress_error(ZpqStream * zpq) +{ + return zs_decompress_error(zpq->d_stream); +} + +pg_compress_algorithm +zpq_compress_algorithm(ZpqStream * zpq) +{ + return zs_algorithm(zpq->c_stream); +} + +pg_compress_algorithm +zpq_decompress_algorithm(ZpqStream * zpq) +{ + return zs_algorithm(zpq->d_stream); +} + +char * +zpq_algorithms(ZpqStream * zpq) +{ + return zpq_serialize_compressors(zpq->compressors, zpq->n_compressors); +} + +int +zpq_parse_compression_setting(const char *val, pg_compress_specification *compressors, size_t *n_compressors) +{ + int i; + + *n_compressors = 0; + memset(compressors, 0, sizeof(pg_compress_specification) * COMPRESSION_ALGORITHM_COUNT); + + if (pg_strcasecmp(val, "true") == 0 || + pg_strcasecmp(val, "yes") == 0 || + pg_strcasecmp(val, "on") == 0 || + pg_strcasecmp(val, "1") == 0) + { + int j = 0; + 
+ /* + * return all available compressors, processing NONE (0) as the last + * algorithm rather than the first + */ + for (i = 1; i <= COMPRESSION_ALGORITHM_COUNT; i++) + { + if (supported_compression_algorithm(i % COMPRESSION_ALGORITHM_COUNT)) + *n_compressors += 1; + } + + for (i = 1; i <= COMPRESSION_ALGORITHM_COUNT; i++) + { + if (supported_compression_algorithm(i % COMPRESSION_ALGORITHM_COUNT)) + { + parse_compress_specification(i % COMPRESSION_ALGORITHM_COUNT, NULL, &compressors[j]); + j += 1; + } + } + return 1; + } + + if (*val == 0 || + pg_strcasecmp(val, "false") == 0 || + pg_strcasecmp(val, "no") == 0 || + pg_strcasecmp(val, "off") == 0 || + pg_strcasecmp(val, "0") == 0) + { + /* Compression is disabled */ + return 0; + } + + return zpq_deserialize_compressors(val, compressors, n_compressors) ? 1 : -1; +} + +bool +zpq_deserialize_compressors(char const *c_string, pg_compress_specification *compressors, size_t *n_compressors) +{ + int selected_alg_mask = 0; /* bitmask of already selected + * algorithms to avoid duplicates in + * compressors */ + char *c_string_dup = STRDUP(c_string); /* following parsing can + * modify the string */ + char *p = c_string_dup; + + *n_compressors = 0; + memset(compressors, 0, sizeof(pg_compress_specification) * COMPRESSION_ALGORITHM_COUNT); + + /* nothing can be parsed if STRDUP could not provide a working copy */ + if (c_string_dup == NULL) + return false; + + while (*p != '\0') + { + char *sep = strchr(p, ';'); + char *col; + char *error_detail; + pg_compress_algorithm algorithm; + pg_compress_specification *spec = &compressors[*n_compressors]; + + if (sep != NULL) + *sep = '\0'; + + col = strchr(p, ':'); + if (col != NULL) + { + *col = '\0'; + } + if (!parse_compress_algorithm(p, &algorithm)) + { + pg_log_warning("invalid compression algorithm %s", p); + goto error; + } + + if (supported_compression_algorithm(algorithm)) + { + /* + * duplicates are not allowed; reject them before touching *spec, + * because once all COMPRESSION_ALGORITHM_COUNT algorithms have + * been selected spec points one past the end of compressors + */ + if (selected_alg_mask & (1 << algorithm)) + { + pg_log_warning("duplicate algorithm %s in compressors string %s", get_compress_algorithm_name(algorithm), c_string); + goto error; + } + + parse_compress_specification(algorithm, col == NULL ? 
NULL : col + 1, spec); + error_detail = validate_compress_specification(spec); + if (error_detail) + { + pg_log_warning("invalid compression specification: %s", error_detail); + goto error; + } + if (spec->options & PG_COMPRESSION_OPTION_WORKERS || spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) + { + pg_log_warning("streaming compression does not support workers or long distance options"); + goto error; + } + + *n_compressors += 1; + selected_alg_mask |= 1 << algorithm; + } + else + { + /* + * Intentionally do not return an error, as we must support + * clients/servers parsing algorithms they don't support mixed + * with ones they do support + */ + pg_log_warning("this build does not support compression with %s", + get_compress_algorithm_name(algorithm)); + } + + if (sep) + p = sep + 1; + else + break; + } + + /* + * Always add NONE to the list at the end to allow one-directional + * compression, but only if there are other algorithms available. 
If there + * are no active compressors available, we should return n_compressors = 0 + * to allow other layers to skip adding the compression processing layers + */ + if (*n_compressors > 0 && !(selected_alg_mask & (1 << PG_COMPRESSION_NONE))) + { + pg_compress_specification *spec = &compressors[*n_compressors]; + + spec->algorithm = PG_COMPRESSION_NONE; + *n_compressors += 1; + } + + FREE(c_string_dup); + return true; + +error: + FREE(c_string_dup); + *n_compressors = 0; + return false; +} + +/* + * Because deserialize rejects all options, this method only needs to concern + * itself with only serializing name and level + */ +char * +zpq_serialize_compressors(pg_compress_specification const *compressors, size_t n_compressors) +{ + char *res; + char *p; + size_t i; + size_t total_len = 0; + + if (n_compressors == 0) + { + return NULL; + } + + for (i = 0; i < n_compressors; i++) + { + size_t level_len; + + /* determine the length of the compression level string */ + level_len = compressors[i].level == 0 ? 1 : (int) floor(log10(abs(compressors[i].level))) + 1; + if (compressors[i].level < 0) + { + level_len += 1; /* add the leading "-" */ + } + + /* + * single entry looks like "alg_name:compression_level," so +2 is for + * ":" and ";" symbols (or trailing null) + */ + total_len += strlen(get_compress_algorithm_name(compressors[i].algorithm)) + level_len + 2; + } + + res = p = ALLOC(total_len); + + for (i = 0; i < n_compressors; i++) + { + p += sprintf(p, "%s:%d", get_compress_algorithm_name(compressors[i].algorithm), compressors[i].level); + if (i < n_compressors - 1) + *p++ = ';'; + } + return res; +} diff --git a/src/include/common/compression.h b/src/include/common/compression.h index c94ace6e8a..9c7d68a036 100644 --- a/src/include/common/compression.h +++ b/src/include/common/compression.h @@ -17,6 +17,7 @@ /* * These values are stored in disk, for example in files generated by pg_dump. * Create the necessary backwards compatibility layers if their order changes. 
+ * Make sure to keep COMPRESSION_ALGORITHM_COUNT in sync if adding new values. */ typedef enum pg_compress_algorithm { @@ -26,6 +27,8 @@ typedef enum pg_compress_algorithm PG_COMPRESSION_ZSTD, } pg_compress_algorithm; +#define COMPRESSION_ALGORITHM_COUNT (PG_COMPRESSION_ZSTD + 1) + #define PG_COMPRESSION_OPTION_WORKERS (1 << 0) #define PG_COMPRESSION_OPTION_LONG_DISTANCE (1 << 1) @@ -36,7 +39,9 @@ typedef struct pg_compress_specification int level; int workers; bool long_distance; - char *parse_error; /* NULL if parsing was OK, else message */ + bool has_error; + char parse_error[255]; /* Populated with error message if + * has_error is true */ } pg_compress_specification; extern void parse_compress_options(const char *option, char **algorithm, @@ -49,5 +54,6 @@ extern void parse_compress_specification(pg_compress_algorithm algorithm, pg_compress_specification *result); extern char *validate_compress_specification(pg_compress_specification *); +extern bool supported_compression_algorithm(pg_compress_algorithm algorithm); #endif diff --git a/src/include/common/z_stream.h b/src/include/common/z_stream.h new file mode 100644 index 0000000000..e456d509e1 --- /dev/null +++ b/src/include/common/z_stream.h @@ -0,0 +1,98 @@ +/*------------------------------------------------------------------------- + * + * z_stream.h + * Streaming compression algorithms + * + * Copyright (c) 2018-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/common/z_stream.h + * + *------------------------------------------------------------------------- + */ +#ifndef Z_STREAM_H +#define Z_STREAM_H + +#include + +#include "compression.h" + +#define ZS_OK (0) +#define ZS_IO_ERROR (-1) +#define ZS_DECOMPRESS_ERROR (-2) +#define ZS_COMPRESS_ERROR (-3) +#define ZS_STREAM_END (-4) +#define ZS_INCOMPLETE_SRC (-5) /* cannot decompress unless full src message + * is fetched */ + +struct ZStream; +typedef struct ZStream ZStream; + +#endif + +/* + * Create compression stream for sending 
compressed data. + * spec: specifications of chosen decompression algorithm + */ +extern ZStream * zs_create_compressor(pg_compress_specification *spec); + +/* + * Create decompression stream for reading compressed data. + * spec: specifications of chosen decompression algorithm + */ +extern ZStream * zs_create_decompressor(pg_compress_specification *spec); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns ZS_OK on success or error code. + * Stores bytes read from src in src_processed, bytes written to dst in dst_process. + */ +extern int zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Returns ZS_OK on success or error code. + * Stores bytes read from buf in processed, bytes written to dst in dst_process + */ +extern int zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Returns if there is buffered raw data remaining in the stream to compress + */ +extern bool zs_compress_buffered(ZStream * zs); + +/* + * Returns if there is buffered decompressed data remaining in the stream to read + */ +extern bool zs_decompress_buffered(ZStream * zs); + +/* + * Get decompressor error message. + */ +extern char const *zs_decompress_error(ZStream * zs); + +/* + * Get compressor error message. + */ +extern char const *zs_compress_error(ZStream * zs); + +/* + * End the compression stream. + */ +extern int zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Free stream created by zs_create_compressor function. + */ +extern void zs_compressor_free(ZStream * zs); + +/* + * Free stream created by zs_create_decompressor function. + */ +extern void zs_decompressor_free(ZStream * zs); + +/* + * Get the descriptor of chosen algorithm. 
+ */ +extern pg_compress_algorithm zs_algorithm(ZStream * zs); diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h new file mode 100644 index 0000000000..15570f2363 --- /dev/null +++ b/src/include/common/zpq_stream.h @@ -0,0 +1,91 @@ +/*------------------------------------------------------------------------- + * + * zpq_stream.h + * IO stream layer applying ZStream compression to libpq + * + * Copyright (c) 2018-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/common/zpq_stream.h + * + *------------------------------------------------------------------------- + */ +#include "io_stream.h" +#include "z_stream.h" + +#ifndef ZPQ_STREAM_H +#define ZPQ_STREAM_H + +#define ZPQ_FATAL_ERROR (-7) +struct ZpqStream; +typedef struct ZpqStream ZpqStream; + +#endif + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * compressors: array of compression specifications + * n_compressors: number of elements in compressors + * stream: IO Stream to layer on top of + * The returned ZpqStream can only be destroyed by destroying the IoStream with io_stream_destroy. + */ +extern ZpqStream * zpq_create(pg_compress_specification *compressors, size_t n_compressors, IoStream * stream); + +/* + * Start compressing applicable outgoing data once the connection is sufficiently set up + */ +extern void zpq_enable_compression(ZpqStream * zpq, pg_compress_algorithm *algorithms, size_t n_algorithms); + +/* + * Get decompressor error message. + */ +extern char const *zpq_decompress_error(ZpqStream * zpq); + +/* + * Get compressor error message. + */ +extern char const *zpq_compress_error(ZpqStream * zpq); + +/* + * Get the current compression algorithm. + */ +extern pg_compress_algorithm zpq_compress_algorithm(ZpqStream * zpq); + +/* + * Get the current decompression algorithm. 
+ */ +extern pg_compress_algorithm zpq_decompress_algorithm(ZpqStream * zpq); + +/* + * Parse the compression setting. + * Compressors must be an array of length COMPRESSION_ALGORITHM_COUNT + * Returns: + * - 1 if the compression setting is valid + * - 0 if the compression setting is valid but disabled + * - -1 if the compression setting is invalid + * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors. + * If no supported compressors recognized or if compression is disabled, then n_compressors is set to 0. + */ +extern int + zpq_parse_compression_setting(const char *val, pg_compress_specification *compressors, size_t *n_compressors); + +/* Serialize the compressors array to string so it can be transmitted to the other side during the compression startup. + * For example, for array of two compressors (zstd, level 1), (zlib, level 2) resulting string would look like "zstd:1;zlib:2". + * Returns the resulting string. + */ +extern char + *zpq_serialize_compressors(pg_compress_specification const *compressors, size_t n_compressors); + +/* Deserialize the compressors string received during the compression setup to a compressors array. + * Compressors must be an array of length COMPRESSION_ALGORITHM_COUNT + * Returns: + * - true if the compressors string is successfully parsed + * - false otherwise + * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors. + * If no supported compressors are recognized or c_string is empty, then n_compressors is set to 0. 
+ */ +bool + zpq_deserialize_compressors(char const *c_string, pg_compress_specification *compressors, size_t *n_compressors); + +/* Returns the currently enabled compression algorithms using zpq_serialize_compressors */ +char *zpq_algorithms(ZpqStream * zpq); diff --git a/src/include/libpq/compression.h b/src/include/libpq/compression.h new file mode 100644 index 0000000000..927ef42751 --- /dev/null +++ b/src/include/libpq/compression.h @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * + * compression.h + * Interface to libpq/compression.c + * + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * src/include/libpq/compression.h + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQ_COMPRESSION_H +#define LIBPQ_COMPRESSION_H + +#include "postgres.h" +#include "libpq-be.h" +#include "common/compression.h" + +extern PGDLLIMPORT char *libpq_compress_algorithms; +extern PGDLLIMPORT pg_compress_specification libpq_compressors[COMPRESSION_ALGORITHM_COUNT]; +extern PGDLLIMPORT size_t libpq_n_compressors; + +/* + * Enables compression processing on the given port. 
+ * val is the value of the _pq_.libpq_compression startup packet parameter + */ +extern void configure_libpq_compression(Port *port, const char *val); + +#endif /* LIBPQ_COMPRESSION_H */ diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 87ba7f5ea0..ec3b69351c 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -53,6 +53,8 @@ typedef struct #endif #endif /* ENABLE_SSPI */ +#include + #include "common/io_stream.h" #include "datatype/timestamp.h" #include "libpq/hba.h" @@ -161,6 +163,7 @@ typedef struct Port int remote_hostname_errcode; /* see above */ char *remote_port; /* text rep of remote port */ CAC_state canAcceptConnections; /* postmaster connection status */ + ZpqStream *zpq_stream; /* streaming compression state */ /* * Information that needs to be saved from the startup packet and passed diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 3612280146..01cc3911fa 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -73,6 +73,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern void pq_startmsgread(void); extern void pq_endmsgread(void); diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h index cc46f4b586..d9eaebbf7e 100644 --- a/src/include/libpq/protocol.h +++ b/src/include/libpq/protocol.h @@ -63,7 +63,7 @@ #define PqMsg_CopyDone 'c' #define PqMsg_CopyData 'd' - +#define PqMsg_CompressedMessage 'z' /* These are the authentication request codes sent by the backend. 
*/ diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 3d74483f44..ee34fbb20e 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -63,6 +63,8 @@ extern bool check_effective_io_concurrency(int *newval, void **extra, GucSource source); extern bool check_huge_page_size(int *newval, void **extra, GucSource source); extern const char *show_in_hot_standby(void); +extern bool check_libpq_compression(char **newval, void **extra, GucSource source); +extern void assign_libpq_compression(const char *newval, void *extra); extern bool check_locale_messages(char **newval, void **extra, GucSource source); extern void assign_locale_messages(const char *newval, void *extra); extern bool check_locale_monetary(char **newval, void **extra, GucSource source); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 850734ac96..392a6815e3 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -191,3 +191,6 @@ PQclosePrepared 188 PQclosePortal 189 PQsendClosePrepared 190 PQsendClosePortal 191 +PQcompression 192 +PQreadPending 193 +PQserverCompression 194 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index a0f12e62af..e9830bc6c7 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -25,6 +25,7 @@ #include "common/ip.h" #include "common/link-canary.h" #include "common/scram-common.h" +#include "common/zpq_stream.h" #include "common/string.h" #include "fe-auth.h" #include "libpq-fe.h" @@ -398,6 +399,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Replication", "D", 5, offsetof(struct pg_conn, replication)}, + {"compression", "PGCOMPRESSION", "off", NULL, + "Libpq-compression", "", 16, + offsetof(struct pg_conn, compression)}, + {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */ @@ 
-523,6 +528,7 @@ pqDropConnection(PGconn *conn, bool flushInput) { io_stream_destroy(conn->io_stream); conn->io_stream = NULL; + conn->zpqStream = NULL; /* Close the socket itself */ if (conn->sock != PGINVALID_SOCKET) closesocket(conn->sock); @@ -1713,6 +1719,34 @@ connectOptions2(PGconn *conn) goto oom_error; } + /* + * validate compression option + */ + if (conn->compression && conn->compression[0]) + { + pg_compress_specification compressors[COMPRESSION_ALGORITHM_COUNT]; + size_t n_compressors; + int rc = zpq_parse_compression_setting(conn->compression, compressors, &n_compressors); + + if (rc == -1) + { + conn->status = CONNECTION_BAD; + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("invalid %s value: \"%s\"\n"), + "compression", conn->compression); + return false; + } + + if (rc == 1 && n_compressors == 0) + { + conn->status = CONNECTION_BAD; + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("no supported algorithms found, %s value: \"%s\"\n"), + "compression", conn->compression); + return false; + } + } + /* * validate target_session_attrs option, and set target_server_type */ @@ -3347,6 +3381,20 @@ keep_going: /* We will come back to here until there is goto error_return; } + /* + * By this point we have set up any encryption layers needed, + * so we can inject compression processing if requested. + */ + if (!conn->zpqStream && conn->n_compressors > 0) + { + conn->zpqStream = zpq_create(conn->compressors, conn->n_compressors, conn->io_stream); + if (!conn->zpqStream) + { + libpq_append_conn_error(conn, "failed to set up compression stream"); + goto error_return; + } + } + /* * Send the startup packet. 
* @@ -3834,14 +3882,22 @@ keep_going: /* We will come back to here until there is } else if (beresp == PqMsg_NegotiateProtocolVersion) { - if (pqGetNegotiateProtocolVersion3(conn)) + if ((res = pqProcessNegotiateProtocolVersion3(conn)) < 0) { libpq_append_conn_error(conn, "received invalid protocol negotiation message"); goto error_return; } /* OK, we read the message; mark data consumed */ conn->inStart = conn->inCursor; - goto error_return; + + if (res == 0) + { + goto error_return; + } + else + { + goto keep_going; + } } /* It is an authentication request. */ @@ -4270,6 +4326,50 @@ error_return: return PGRES_POLLING_FAILED; } +/* + * Based on server GUC libpq_compression, establishes a zpq_stream to compress + * protocol traffic. Should only be called once per connection lifecycle, as the + * zpq_stream cannot be reconfigured without restarting the connection + */ +void +pqConfigureCompression(PGconn *conn, const char *compressors_str) +{ + pg_compress_specification be_compressors[COMPRESSION_ALGORITHM_COUNT]; + size_t n_be_compressors; + + zpq_deserialize_compressors(compressors_str, be_compressors, &n_be_compressors); + if (conn->zpqStream) + { + pg_compress_algorithm algorithms[COMPRESSION_ALGORITHM_COUNT]; + size_t n_algorithms = 0; + + if (n_be_compressors == 0) + { + return; + } + + /* + * Intersect client and server compressors to determine the final list + * of the supported compressors. O(N^2) is negligible because of a + * small number of the compression methods. 
+ */ + for (size_t i = 0; i < conn->n_compressors; i++) + { + for (size_t j = 0; j < n_be_compressors; j++) + { + if (conn->compressors[i].algorithm == be_compressors[j].algorithm) + { + algorithms[n_algorithms] = conn->compressors[i].algorithm; + n_algorithms += 1; + break; + } + } + } + + zpq_enable_compression(conn->zpqStream, algorithms, n_algorithms); + } +} + /* * internal_ping @@ -4480,6 +4580,7 @@ freePGconn(PGconn *conn) free(conn->fbappname); free(conn->dbName); free(conn->replication); + free(conn->compression); free(conn->pguser); if (conn->pgpass) { @@ -7163,6 +7264,24 @@ PQuser(const PGconn *conn) return conn->pguser; } +char * +PQcompression(const PGconn *conn) +{ + if (!conn || !conn->zpqStream) + return NULL; + + return zpq_algorithms(conn->zpqStream); +} + +char * +PQserverCompression(const PGconn *conn) +{ + if (!conn) + return NULL; + + return conn->server_compression; +} + char * PQpass(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b9511df2c2..b6158d29c2 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1185,6 +1185,15 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) { conn->scram_sha_256_iterations = atoi(value); } + else if (strcmp(name, "libpq_compression") == 0) + { + if (conn->server_compression) + { + free(conn->server_compression); + } + conn->server_compression = strdup(value); + pqConfigureCompression(conn, value); + } } @@ -3962,6 +3971,12 @@ pqPipelineFlush(PGconn *conn) return 0; } +int +PQreadPending(PGconn *conn) +{ + return pqReadPending(conn); +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 6772f2876d..9aeeb123b7 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -51,6 +51,8 @@ #include "pg_config_paths.h" #include "port/pg_bswap.h" +#include + static int pqPutMsgBytes(const void *buf, size_t 
len, PGconn *conn); static int pqSendSome(PGconn *conn, int len); static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, @@ -618,6 +620,14 @@ retry3: conn->inBufSize - conn->inEnd, false); if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -713,6 +723,14 @@ retry4: conn->inBufSize - conn->inEnd, false); if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -822,7 +840,7 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0) + while (len > 0 || io_stream_buffered_write_data(conn->io_stream)) { size_t sent; int rc; @@ -883,7 +901,7 @@ pqSendSome(PGconn *conn, int len) } } - if (len > 0) + if (len > 0 || rc < 0 || io_stream_buffered_write_data(conn->io_stream)) { /* * We didn't send it all, wait till we can send more. @@ -951,7 +969,7 @@ pqSendSome(PGconn *conn, int len) int pqFlush(PGconn *conn) { - if (conn->outCount > 0) + if (conn->outCount > 0 || io_stream_buffered_write_data(conn->io_stream)) { if (conn->Pfdebug) fflush(conn->Pfdebug); @@ -976,6 +994,8 @@ pqFlush(PGconn *conn) int pqWait(int forRead, int forWrite, PGconn *conn) { + if (forRead && conn->inCursor < conn->inEnd) + return 0; return pqWaitTimed(forRead, forWrite, conn, (time_t) -1); } @@ -1030,8 +1050,9 @@ pqWriteReady(PGconn *conn) * or both. Returns >0 if one or more conditions are met, 0 if it timed * out, -1 if an error occurred. * - * If SSL is in use, the SSL buffer is checked prior to checking the socket - * for read data directly. + * If protocol layers are using buffering, the buffers are checked prior + * to checking the socket for read data directly. 
+ * */ static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) @@ -1068,6 +1089,22 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return result; } +/* + * Check if there is some data pending in stream buffers. + * Returns -1 on failure, 0 if no, 1 if yes. + */ +int +pqReadPending(PGconn *conn) +{ + if (!conn) + return -1; + + if (io_stream_buffered_read_data(conn->io_stream)) + /* short-circuit the select */ + return 1; + + return 0; +} /* * Check a file descriptor for read and/or write data, possibly waiting. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 8c4ec079ca..9d4642cfb4 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -74,6 +74,20 @@ pqParseInput3(PGconn *conn) */ for (;;) { + /* + * Read any available buffered data + * io_stream may be null when draining the final messages from an aborted connection + */ + if (conn->io_stream && pqReadPending(conn) && (conn->inBufSize - conn->inEnd > 0)) + { + int rc = io_stream_read(conn->io_stream, conn->inBuffer + conn->inEnd, conn->inBufSize - conn->inEnd, true); + + if (rc > 0) + { + conn->inEnd += rc; + } + } + /* * Try to read a message. First get the type code and length. Return * if not enough data. @@ -1404,16 +1418,18 @@ reportErrorPosition(PQExpBuffer msg, const char *query, int loc, int encoding) /* * Attempt to read a NegotiateProtocolVersion message. * Entry: 'v' message type and length have already been consumed. - * Exit: returns 0 if successfully consumed message. + * Exit: returns 0 if successfully consumed message and should exit, + * returns 1 if successfully consumed message and should continue with warning, * returns EOF if not enough data. 
*/ int -pqGetNegotiateProtocolVersion3(PGconn *conn) +pqProcessNegotiateProtocolVersion3(PGconn *conn) { int tmp; ProtocolVersion their_version; int num; PQExpBufferData buf; + int retval = 1; if (pqGetInt(&tmp, 4, conn) != 0) return EOF; @@ -1436,9 +1452,12 @@ pqGetNegotiateProtocolVersion3(PGconn *conn) } if (their_version < conn->pversion) + { libpq_append_conn_error(conn, "protocol version not supported by server: client uses %u.%u, server supports up to %u.%u", PG_PROTOCOL_MAJOR(conn->pversion), PG_PROTOCOL_MINOR(conn->pversion), PG_PROTOCOL_MAJOR(their_version), PG_PROTOCOL_MINOR(their_version)); + retval = 0; + } if (num > 0) { appendPQExpBuffer(&conn->errorMessage, @@ -1446,14 +1465,26 @@ pqGetNegotiateProtocolVersion3(PGconn *conn) "protocol extensions not supported by server: %s", num), buf.data); appendPQExpBufferChar(&conn->errorMessage, '\n'); + + /* + * We can continue to use the connection if the server doesn't support + * compression + */ + if (num > 1 || (strcmp(buf.data, "_pq_.libpq_compression") != 0)) + { + retval = 0; + } } /* neither -- server shouldn't have sent it */ if (!(their_version < conn->pversion) && !(num > 0)) + { libpq_append_conn_error(conn, "invalid %s message", "NegotiateProtocolVersion"); + retval = 0; + } termPQExpBuffer(&buf); - return 0; + return retval; } @@ -1764,7 +1795,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) if (msgLength == 0) { /* Don't block if async read requested */ - if (async) + if (async && !pqReadPending(conn)) return 0; /* Need to load more data */ if (pqWait(true, false, conn) || @@ -2246,6 +2277,46 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, return startpacket; } +/* + * Build semicolon-separated list of compression algorithms requested by client. + * It can be either explicitly specified by user in connection string, or + * include all algorithms supported by client library. 
+ * This function returns true if the compression string is successfully parsed and + * stores a comma-separated list of algorithms in *client_compressors. + * If compression is disabled, then NULL is assigned to *client_compressors. + * Also it creates an array of compressor descriptors, each element of which corresponds to + * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn + * and is used during handshake when a compression acknowledgment response is received from the server. + */ +static bool +build_compressors_list(PGconn *conn, char **client_compressors, bool build_descriptors) +{ + pg_compress_specification compressors[COMPRESSION_ALGORITHM_COUNT]; + size_t n_compressors; + + if (zpq_parse_compression_setting(conn->compression, compressors, &n_compressors) < 0) + { + return false; + } + + *client_compressors = NULL; + if (build_descriptors) + { + memcpy(conn->compressors, compressors, sizeof(compressors)); + conn->n_compressors = n_compressors; + } + + if (n_compressors == 0) + { + /* no compressors available, return */ + return true; + } + + *client_compressors = zpq_serialize_compressors(compressors, n_compressors); + + return true; +} + /* * Build a startup packet given a filled-in PGconn structure. 
* @@ -2292,6 +2363,19 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->compression && conn->compression[0]) + { + char *client_compression_algorithms; + + if (build_compressors_list((PGconn *) conn, &client_compression_algorithms, packet == NULL)) + { + if (client_compression_algorithms) + { + ADD_STARTUP_OPTION("_pq_.libpq_compression", client_compression_algorithms); + free(client_compression_algorithms); + } + } + } if (conn->send_appname) { /* Use appname if present, otherwise use fallback */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 97762d56f5..fb7d8a9471 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -342,6 +342,8 @@ extern char *PQhostaddr(const PGconn *conn); extern char *PQport(const PGconn *conn); extern char *PQtty(const PGconn *conn); extern char *PQoptions(const PGconn *conn); +extern char *PQcompression(const PGconn *conn); +extern char *PQserverCompression(const PGconn *conn); extern ConnStatusType PQstatus(const PGconn *conn); extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn); extern const char *PQparameterStatus(const PGconn *conn, @@ -501,6 +503,8 @@ extern PGPing PQpingParams(const char *const *keywords, /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); +extern int PQreadPending(PGconn *conn); + /* * "Fast path" interface --- not really recommended for application * use diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 1314663213..4ebd12e420 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -40,6 +40,7 @@ /* include stuff common to fe and be */ #include "libpq/pqcomm.h" +#include "common/zpq_stream.h" /* include stuff found in fe only */ #include "fe-auth-sasl.h" 
#include "pqexpbuffer.h" @@ -406,6 +407,16 @@ struct pg_conn char *gssdelegation; /* Try to delegate GSS credentials? (0 or 1) */ char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + pg_compress_specification compressors[COMPRESSION_ALGORITHM_COUNT]; /* descriptors of + * compression + * algorithms chosen by + * client */ + unsigned n_compressors; /* size of compressors array */ + char *server_compression; /* compression settings set by server */ + char *target_session_attrs; /* desired session properties */ char *require_auth; /* name of the expected auth method */ char *load_balance_hosts; /* load balance over hosts */ @@ -621,6 +632,9 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* Compression stream */ + ZpqStream *zpqStream; }; /* PGcancel stores all data necessary to cancel a connection. 
A copy of this @@ -680,6 +694,7 @@ extern void pqDropConnection(PGconn *conn, bool flushInput); extern int pqPacketSend(PGconn *conn, char pack_type, const void *buf, size_t buf_len); extern bool pqGetHomeDirectory(char *buf, int bufsize); +extern void pqConfigureCompression(PGconn *conn, const char *compressors); extern pgthreadlock_t pg_g_threadlock; @@ -712,7 +727,7 @@ extern void pqParseInput3(PGconn *conn); extern int pqGetErrorNotice3(PGconn *conn, bool isError); extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res, PGVerbosity verbosity, PGContextVisibility show_context); -extern int pqGetNegotiateProtocolVersion3(PGconn *conn); +extern int pqProcessNegotiateProtocolVersion3(PGconn *conn); extern int pqGetCopyData3(PGconn *conn, char **buffer, int async); extern int pqGetline3(PGconn *conn, char *s, int maxlen); extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize); @@ -750,6 +765,7 @@ extern int pqWaitTimed(int forRead, int forWrite, PGconn *conn, time_t finish_time); extern int pqReadReady(PGconn *conn); extern int pqWriteReady(PGconn *conn); +extern int pqReadPending(PGconn *conn); /* === in fe-secure.c === */ diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index f2013f014a..e419dd3db8 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -141,7 +141,8 @@ sub mkvcbuild keywords.c kwlookup.c link-canary.c md5_common.c percentrepl.c pg_get_line.c pg_lzcompress.c pg_prng.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c - unicode_category.c unicode_norm.c username.c wait_error.c wchar.c); + unicode_category.c unicode_norm.c username.c wait_error.c wchar.c + z_stream.c zpq_stream.c); if ($solution->{options}->{openssl}) { -- 2.42.0