>From ac48fc2f5c5f0031494cfabb0bca46f0bbef47d2 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 19 Aug 2013 13:24:30 +0200 Subject: [PATCH 03/13] wal_decoding: Allow walsender's to connect to a specific database Currently the decision whether to connect to a database or not is made by checking whether the passed "dbname" parameter is "replication". Unfortunately this makes it impossible to connect a to a database named replication... Possibly it would be better to use a separate connection parameter like replication_dbname=xxx? This is useful for future walsender commands which need database interaction. --- doc/src/sgml/protocol.sgml | 5 +++- src/backend/postmaster/postmaster.c | 13 +++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 4 ++-- src/backend/replication/walsender.c | 27 ++++++++++++++++++---- src/backend/utils/init/postinit.c | 5 ++++ src/bin/pg_basebackup/pg_basebackup.c | 4 ++-- src/bin/pg_basebackup/pg_receivexlog.c | 4 ++-- src/bin/pg_basebackup/receivelog.c | 4 ++-- 8 files changed, 51 insertions(+), 15 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 0b2e60e..51b4435 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1304,7 +1304,10 @@ To initiate streaming replication, the frontend sends the replication parameter in the startup message. This tells the backend to go into walsender mode, wherein a small set of replication commands can be issued instead of SQL statements. Only the simple query protocol can be -used in walsender mode. +used in walsender mode. A dbname of replication will +start a walsender not connected to any database, specifying any other database +will connect to that. Connecting to a specific database is only required for +logical replication so far. The commands accepted in walsender mode are: diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 01d2618..7520e42 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1983,9 +1983,18 @@ retry1: if (strlen(port->user_name) >= NAMEDATALEN) port->user_name[NAMEDATALEN - 1] = '\0'; - /* Walsender is not related to a particular database */ - if (am_walsender) + /* + * Generic walsender, e.g. for streaming replication, is not connected to a + * particular database. But walsenders used for logical replication need to + * connect to a specific database. Unfortunately the initial choices for + * distinguishing normal connections from replication connections included + * dbname=replication being specified for the latter. + * We now assume that a database name + */ + if (am_walsender && strcmp(port->database_name, "replication") == 0) port->database_name[0] = '\0'; + else if (am_walsender) + elog(DEBUG1, "WAL sender attaching to database %s", port->database_name); /* * Done putting stuff in TopMemoryContext. diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6bc0aa1..ee0f1fe 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -130,7 +130,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) "the primary server: %s", PQerrorMessage(streamConn)))); } - if (PQnfields(res) != 3 || PQntuples(res) != 1) + if (PQnfields(res) != 4 || PQntuples(res) != 1) { int ntuples = PQntuples(res); int nfields = PQnfields(res); @@ -138,7 +138,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli) PQclear(res); ereport(ERROR, (errmsg("invalid response from primary server"), - errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.", + errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.", ntuples, nfields))); } primary_sysid = PQgetvalue(res, 0, 0); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eae6e59..f6463fc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -47,6 +47,7 @@ #include "access/transam.h" #include "access/xlog_internal.h" #include "catalog/pg_type.h" +#include "commands/dbcommands.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -243,10 +244,12 @@ IdentifySystem(void) char tli[11]; char xpos[MAXFNAMELEN]; XLogRecPtr logptr; + char* dbname = NULL; /* - * Reply with a result set with one row, three columns. First col is - * system ID, second is timeline ID, and third is current xlog location. + * Reply with a result set with one row, four columns. First col is system + * ID, second is timeline ID, third is current xlog location and the fourth + * contains the database name if we are connected to one. */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -265,9 +268,14 @@ IdentifySystem(void) snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); + if (MyDatabaseId != InvalidOid) + dbname = get_database_name(MyDatabaseId); + else + dbname = "(none)"; + /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 3, 2); /* 3 fields */ + pq_sendint(&buf, 4, 2); /* 4 fields */ /* first field */ pq_sendstring(&buf, "systemid"); /* col name */ @@ -295,17 +303,28 @@ IdentifySystem(void) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + /* fourth field */ + pq_sendstring(&buf, "dbname"); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_sendint(&buf, TEXTOID, 4); + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 3, 2); /* # of columns */ + pq_sendint(&buf, 4, 2); /* # of columns */ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); pq_sendint(&buf, strlen(tli), 4); /* col2 len */ pq_sendbytes(&buf, (char *) tli, strlen(tli)); pq_sendint(&buf, strlen(xpos), 4); /* col3 len */ pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); + pq_sendint(&buf, strlen(dbname), 4); /* col4 len */ + pq_sendbytes(&buf, (char *) dbname, strlen(dbname)); pq_endmessage(&buf); } diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 2c7f0f1..56c352c 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -725,7 +725,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, ereport(FATAL, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or replication role to start walsender"))); + } + if (am_walsender && + (in_dbname == NULL || in_dbname[0] == '\0') && + dboid == InvalidOid) + { /* process any options passed in the startup packet */ if (MyProcPort != NULL) process_startup_options(MyProcPort, am_superuser); diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a1e12a8..89e2376 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1361,11 +1361,11 @@ BaseBackup(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) != 4) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); disconnect_and_exit(1); } sysidentifier = pg_strdup(PQgetvalue(res, 0, 0)); diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 787a395..fe8aef6 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -252,11 +252,11 @@ StreamLog(void) progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) != 3) + if (PQntuples(res) != 1 || PQnfields(res) != 4) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); disconnect_and_exit(1); } servertli = atoi(PQgetvalue(res, 0, 1)); diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d56a4d7..22a5340 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -534,11 +534,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (PQnfields(res) != 3 || PQntuples(res) != 1) + if (PQnfields(res) != 4 || PQntuples(res) != 1) { fprintf(stderr, _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 3); + progname, PQntuples(res), PQnfields(res), 1, 4); PQclear(res); return false; } -- 1.8.3.251.g1462b67