diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 5c6e56a5b2..c1fb4fd3d4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -30,6 +30,7 @@ #include "pqexpbuffer.h" #include "replication/walreceiver.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" @@ -82,6 +83,8 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const int nRetTypes, const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); +static char *add_backslash(const char *value); +static char *get_transmission_modes(void); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, @@ -128,9 +131,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, { WalReceiverConn *conn; PostgresPollingStatusType status; - const char *keys[5]; - const char *vals[5]; + const char *keys[6]; + const char *vals[6]; int i = 0; + char *options; /* * We use the expand_dbname parameter to process the connection string (or @@ -155,6 +159,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, { keys[++i] = "client_encoding"; vals[i] = GetDatabaseEncodingName(); + + options = get_transmission_modes(); + keys[++i] = "options"; + vals[i] = options; } keys[++i] = NULL; vals[i] = NULL; @@ -164,6 +172,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, conn = palloc0(sizeof(WalReceiverConn)); conn->streamConn = PQconnectStartParams(keys, vals, /* expand_dbname = */ true); + pfree(options); if (PQstatus(conn->streamConn) == CONNECTION_BAD) { *err = pchomp(PQerrorMessage(conn->streamConn)); @@ -1153,3 +1162,56 @@ stringlist_to_identifierstr(PGconn *conn, List *strings) return res.data; } + +static char * +add_backslash(const char *value) +{ + const char *p; + StringInfoData res; + + initStringInfo(&res); + + for (p = value; *p != '\0'; p++) + { + if (*p == ' ') + appendStringInfoChar(&res, '\\'); + + appendStringInfoChar(&res, *p); + } + + return res.data; +} + +static char * +get_transmission_modes(void) +{ + char *res; + char *date_style; + char *interval_style; + char *extra_float_digits; + + date_style = (char *) GetConfigOption("datestyle", true, true); + if (date_style == NULL) + date_style = "ISO, YMD"; + + interval_style = (char *) GetConfigOption("interval", true, true); + if (interval_style == NULL) + interval_style = "postgres"; + + extra_float_digits = (char *) GetConfigOption("extra_float_digits", true, true); + if (extra_float_digits == NULL) + extra_float_digits = "3"; + + date_style = add_backslash(date_style); + interval_style = add_backslash(interval_style); + extra_float_digits = add_backslash(extra_float_digits); + + res = psprintf("-c datestyle=%s -c intervalstyle=%s -c extra_float_digits=%s", + date_style, interval_style, extra_float_digits); + + pfree(date_style); + pfree(interval_style); + pfree(extra_float_digits); + + return res; +} diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index baa4a90771..d92ab60d86 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 5; +use Test::More tests => 7; # Bug #15114 @@ -224,3 +224,69 @@ $node_sub->safe_psql('postgres', "DROP TABLE tab1"); $node_pub->stop('fast'); $node_pub_sub->stop('fast'); $node_sub->stop('fast'); + +# Verify different datestyle between publisher and subscriber. +$node_publisher = PostgresNode->new('datestyle_publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + "datestyle = 'SQL, MDY'"); +$node_publisher->append_conf('postgresql.conf', + "extra_float_digits = '-4'"); +$node_publisher->start; + +$node_subscriber = PostgresNode->new('datestyle_subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "datestyle = 'SQL, DMY'"); +$node_subscriber->start; + +# Table for datestyle +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep(a date)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep(a date)"); + +# Table for extra_float_digits +$node_publisher->safe_psql('postgres', + "CREATE TABLE flt_rep(a real, d double precision)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE flt_rep(a real, d double precision)"); + +# Setup logical replication +my $node_publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tab_pub FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tab_sub CONNECTION '$node_publisher_connstr' PUBLICATION tab_pub"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep VALUES ('07-18-2021'), ('05-15-2018')"); + +$node_publisher->wait_for_catchup('tab_sub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep"); +is($result, qq(2), 'failed to replication date from different datestyle'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO flt_rep VALUES (1.2121323, 32.32321232132434)"); + +$node_publisher->wait_for_catchup('tab_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, d FROM flt_rep"); +is($result, qq(1.2121323|32.32321232132434), + 'failed to replication floating-point values'); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_publisher->safe_psql('postgres', 'DROP TABLE tab_rep'); +$node_subscriber->safe_psql('postgres', 'DROP TABLE tab_rep'); +$node_publisher->safe_psql('postgres', 'DROP TABLE flt_rep'); +$node_subscriber->safe_psql('postgres', 'DROP TABLE flt_rep'); + +# Drop subscription/publication as we don't need anymore +$node_subscriber->safe_psql('postgres', 'DROP SUBSCRIPTION tab_sub'); +$node_publisher->safe_psql('postgres', 'DROP PUBLICATION tab_pub'); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast');