diff --git a/web/pgadmin/browser/server_groups/servers/__init__.py b/web/pgadmin/browser/server_groups/servers/__init__.py index 4271199..0b6c6ca 100644 --- a/web/pgadmin/browser/server_groups/servers/__init__.py +++ b/web/pgadmin/browser/server_groups/servers/__init__.py @@ -818,7 +818,7 @@ class ServerNode(PGChildNodeView): 'servers/password.html', server_label=server.name, username=server.username, - errmsg=e.message if e.message else str(e), + errmsg=getattr(e, 'message', str(e)), _=gettext ) ) diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py index e3a9e96..e93a82b 100644 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ b/web/pgadmin/utils/driver/psycopg2/__init__.py @@ -131,6 +131,9 @@ class Connection(BaseConnection): * _release() - Release the connection object of psycopg2 + * _reconnect() + - Attempt to reconnect to the database + * _wait(conn) - This method is used to wait for asynchronous connection. This is a blocking call. @@ -181,6 +184,8 @@ class Connection(BaseConnection): self.row_count = 0 self.__notices = None self.password = None + self.wasConnected = False + self.reconnecting = False super(Connection, self).__init__() @@ -233,7 +238,8 @@ class Connection(BaseConnection): encpass = self.password or getattr(mgr, 'password', None) # Reset the existing connection password - self.password = None + if self.reconnecting is not False: + self.password = None if encpass: # Fetch Logged in User Details. @@ -301,8 +307,44 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} return False, msg self.conn = pg_conn - self.__backend_pid = pg_conn.get_backend_pid() + self.wasConnected = True + try: + status, msg = self._initialize(conn_id, **kwargs) + except Exception as e: + current_app.logger.exception(e) + self.conn = None + if not self.reconnecting: + self.wasConnected = False + raise e + + if status: + mgr._update_password(encpass) + else: + if not self.reconnecting: + self.wasConnected = False + + return status, msg + + def _initialize(self, conn_id, **kwargs): self.execution_aborted = False + self.__backend_pid = self.conn.get_backend_pid() + + setattr(g, "{0}#{1}".format( + self.manager.sid, + self.conn_id.encode('utf-8') + ), None) + + status, cur = self.__cursor() + formatted_exception_msg = self._formatted_exception_msg + mgr = self.manager + + def _execute(cur, query, params=None): + try: + self.__internal_blocking_execute(cur, query, params) + except psycopg2.Error as pe: + cur.close() + return formatted_exception_msg(pe, False) + return None # autocommit flag does not work with asynchronous connections. # By default asynchronous connection runs in autocommit mode. @@ -313,22 +355,22 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} self.conn.autocommit = True register_date_typecasters(self.conn) - status, res = self.execute_scalar(""" + status = _execute(cur, """ SET DateStyle=ISO; SET client_min_messages=notice; SET bytea_output=escape; SET client_encoding='UNICODE';""") - if not status: + if status is not None: self.conn.close() self.conn = None - return False, res + return False, status if mgr.role: - status, res = self.execute_scalar(u"SET ROLE TO %s", [mgr.role]) + status = _execute(cur, u"SET ROLE TO %s", [mgr.role]) - if not status: + if status is not None: self.conn.close() self.conn = None current_app.logger.error(""" @@ -337,35 +379,38 @@ Connect to the database server (#{server_id}) for connection ({conn_id}), but - """.format( server_id=self.manager.sid, conn_id=conn_id, - msg=res + msg=status ) ) return False, \ _("Failed to setup the role with error message:\n{0}").format( - res + status ) if mgr.ver is None: - status, res = self.execute_scalar("SELECT version()") + status = _execute(cur, "SELECT version()") - if status: - mgr.ver = res - mgr.sversion = pg_conn.server_version - else: + if status is not None: self.conn.close() self.conn = None + self.wasConneted = False current_app.logger.error(""" Failed to fetch the version information on the established connection to the database server (#{server_id}) for '{conn_id}' with below error message: {msg} """.format( server_id=self.manager.sid, conn_id=conn_id, - msg=res + msg=status ) ) - return False, res + return False, status + + if cur.rowcount > 0: + row = cur.fetchmany(1)[0] + mgr.ver = row['version'] + mgr.sversion = self.conn.server_version - status, res = self.execute_dict(""" + status = _execute(cur, """ SELECT db.oid as did, db.datname, db.datallowconn, pg_encoding_to_char(db.encoding) AS serverencoding, has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid @@ -373,16 +418,17 @@ FROM pg_database db WHERE db.datname = current_database()""") - if status: + if status is None: mgr.db_info = mgr.db_info or dict() - f_row = res['rows'][0] - mgr.db_info[f_row['did']] = f_row.copy() + if cur.rowcount > 0: + res = cur.fetchmany(1)[0] + mgr.db_info[res['did']] = res.copy() - # We do not have database oid for the maintenance database. - if len(mgr.db_info) == 1: - mgr.did = f_row['did'] + # We do not have database oid for the maintenance database. + if len(mgr.db_info) == 1: + mgr.did = res['did'] - status, res = self.execute_dict(""" + status = _execute(cur, """ SELECT oid as id, rolname as name, rolsuper as is_superuser, rolcreaterole as can_create_role, rolcreatedb as can_create_db @@ -391,28 +437,34 @@ FROM WHERE rolname = current_user""") - if status: + if status is None: mgr.user_info = dict() - f_row = res['rows'][0] - mgr.user_info = f_row.copy() + if cur.rowcount > 0: + mgr.user_info = cur.fetchmany(1)[0] if 'password' in kwargs: mgr.password = kwargs['password'] + server_types = None if 'server_types' in kwargs and isinstance(kwargs['server_types'], list): - for st in kwargs['server_types']: - if st.instanceOf(mgr.ver): - mgr.server_type = st.stype - mgr.server_cls = st - break + server_types = mgr.server_types = kwargs['server_types'] + + if server_types is None: + from pgadmin.browser.server_groups.servers.types import ServerType + server_types = ServerType.types() + + for st in server_types: + if st.instanceOf(mgr.ver): + mgr.server_type = st.stype + mgr.server_cls = st + break - mgr._update_password(encpass) mgr.update_session() return True, None def __cursor(self, server_cursor=False): - if not self.conn: + if self.wasConnected is False: raise ConnectionLost( self.manager.sid, self.db, @@ -439,14 +491,20 @@ Connection to database server (#{server_id}) for the connection - '{conn_id}' ha ) ) - if self.auto_reconnect: - status, errmsg = self.connect() + if self.auto_reconnect and not self.reconnecting: + self.reconnecting = True + try: + status, errmsg = self.connect() + except Exception as e: + current_app.logger.exception(e) + finally: + self.reconnecting = False if not status: errmsg = gettext( - """ -Attempt to reconnect failed with the error: -{0}""".format(errmsg) + "Attempt to reconnect failed with the error:\n{0}".format( + errmsg + ) ) if not status: @@ -464,6 +522,7 @@ Attempt to reconnect failed with the error: else: cur = self.conn.cursor(cursor_factory=DictCursor) except psycopg2.Error as pe: + current_app.logger.exception(pe) errmsg = gettext(""" Failed to create cursor for psycopg2 connection with error message for the \ server#{1}:{2}: @@ -472,7 +531,7 @@ server#{1}:{2}: self.conn.close() self.conn = None - if self.auto_reconnect: + if self.auto_reconnect and not self.reconnecting: current_app.logger.debug(""" Attempting to reconnect to the database server (#{server_id}) for the connection - '{conn_id}'. """.format( @@ -480,7 +539,11 @@ Attempting to reconnect to the database server (#{server_id}) for the connection conn_id=self.conn_id ) ) - status, cur = self.connect() + self.reconnecting = True + try: + status, cur = self.connect() + finally: + self.reconnecting = False if not status: msg = gettext( u""" @@ -537,7 +600,7 @@ Attempt to reconnect it failed with the error: cur.close() errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error( - u"Failed to execute query ((with server cursor) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( + u"failed to execute query ((with server cursor) for the server #{server_id} - {conn_id} (query-id: {query_id}):\nerror message:{errmsg}".format( server_id=self.manager.sid, conn_id=self.conn_id, query=query, @@ -606,6 +669,10 @@ Attempt to reconnect it failed with the error: except psycopg2.Error as pe: cur.close() if not self.connected(): + if self.auto_reconnect and not self.reconnecting: + return self.__attempt_execution_reconnect( + self.execute_dict, query, params, formatted_exception_msg + ) raise ConnectionLost( self.manager.sid, self.db, @@ -713,6 +780,10 @@ Failed to execute query (execute_async) for the server #{server_id} - {conn_id} except psycopg2.Error as pe: cur.close() if not self.connected(): + if self.auto_reconnect and not self.reconnecting: + return self.__attempt_execution_reconnect( + self.execute_void, query, params, formatted_exception_msg + ) raise ConnectionLost( self.manager.sid, self.db, @@ -736,6 +807,35 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} return True, None + def __attempt_execution_reconnect(self, fn, *args, **kwargs): + self.reconnecting = True + setattr(g, "{0}#{1}".format( + self.manager.sid, + self.conn_id.encode('utf-8') + ), None) + try: + status, msg = self.connect() + if status: + status, res = fn(*args, **kwargs) + self.reconnecting = False + return status, res + except Exception as e: + current_app.logger.exception(e) + self.reconnecting = False + + current_app.warning( + "Failed to reconnect the database server (#{server_id})".format( + server_id=self.manager.sid, + conn_id=self.conn_id + ) + ) + self.reconnecting = False + raise ConnectionLost( + self.manager.sid, + self.db, + None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:] + ) + def execute_2darray(self, query, params=None, formatted_exception_msg=False): status, cur = self.__cursor() self.row_count = 0 @@ -758,11 +858,11 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} except psycopg2.Error as pe: cur.close() if not self.connected(): - raise ConnectionLost( - self.manager.sid, - self.db, - None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:] - ) + if self.wasConnected and self.auto_reconnect and \ + not self.reconnecting: + return self.__attempt_execution_reconnect( + self.execute_2darray, query, params, formatted_exception_msg + ) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error( u"Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( @@ -809,6 +909,11 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} except psycopg2.Error as pe: cur.close() if not self.connected(): + if self.auto_reconnect and not self.reconnecting: + return self.__attempt_execution_reconnect( + self.execute_dict, query, params, + formatted_exception_msg + ) raise ConnectionLost( self.manager.sid, self.db, @@ -900,10 +1005,12 @@ Failed to reset the connection to the server due to following error: return self.execute_scalar('SELECT 1') def _release(self): - if self.conn: - self.conn.close() - self.conn = None + if self.wasConneted: + if self.conn: + self.conn.close() + self.conn = None self.password = None + self.wasConnected = False def _wait(self, conn): """ @@ -1227,6 +1334,7 @@ class ServerManager(object): self.ssl_mode = server.ssl_mode self.pinged = datetime.datetime.now() self.db_info = dict() + self.server_types = None for con in self.connections: self.connections[con]._release() @@ -1430,7 +1538,7 @@ WHERE db.oid = {0}""".format(did)) self.password = passwd for conn_id in self.connections: conn = self.connections[conn_id] - if conn.conn is not None: + if conn.conn is not None or conn.wasConnected is True: conn.password = passwd def update_session(self):