diff --git a/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py new file mode 100644 index 0000000..0826dec --- /dev/null +++ b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py @@ -0,0 +1,103 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2018, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from pgadmin.utils.route import BaseTestGenerator +from pgadmin.browser.server_groups.servers.databases.tests import utils as \ + database_utils +from regression import parent_node_dict +from regression.python_test_utils import test_utils +import json + + +class TestEncodingCharset(BaseTestGenerator): + """ + This class validates character support in pgAdmin4 for + different PostgresDB encodings + """ + scenarios = [ + ( + 'With Encoding UTF8', + dict( + db_encoding='UTF8', + lc_collate='C', + test_str='A' + )), + ( + 'With Encoding WIN1252', + dict( + db_encoding='WIN1252', + lc_collate='C', + test_str='A' + )), + ( + 'With Encoding EUC_CN', + dict( + db_encoding='EUC_CN', + lc_collate='C', + test_str='A' + )), + ( + 'With Encoding SQL_ASCII', + dict( + db_encoding='SQL_ASCII', + lc_collate='C', + test_str='\\255' + )), + ] + + def setUp(self): + self.encode_db_name = 'encoding_' + self.db_encoding + self.encode_sid = self.server_information['server_id'] + self.encode_did = test_utils.create_database( + self.server, self.encode_db_name, + (self.db_encoding, self.lc_collate)) + + def runTest(self): + + db_con = database_utils.connect_database(self, + test_utils.SERVER_GROUP, + self.encode_sid, + self.encode_did) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to the database.") + + # Initialize query tool + url = '/datagrid/initialize/query_tool/{0}/{1}/{2}'.format( + test_utils.SERVER_GROUP, self.encode_sid, self.encode_did) + response = self.tester.post(url) + self.assertEquals(response.status_code, 200) + + response_data = json.loads(response.data.decode('utf-8')) + self.trans_id = response_data['data']['gridTransId'] + + # Check character + url = "/sqleditor/query_tool/start/{0}".format(self.trans_id) + sql = "select E'{0}';".format(self.test_str) + response = self.tester.post(url, data=json.dumps({"sql": sql}), + content_type='html/json') + self.assertEquals(response.status_code, 200) + url = '/sqleditor/poll/{0}'.format(self.trans_id) + response = self.tester.get(url) + self.assertEquals(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertEquals(response_data['data']['rows_fetched_to'], 1) + + database_utils.disconnect_database(self, self.encode_sid, + self.encode_did) + + def tearDown(self): + main_conn = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) + test_utils.drop_database(main_conn, self.encode_db_name) diff --git a/web/pgadmin/utils/driver/psycopg2/connection.py b/web/pgadmin/utils/driver/psycopg2/connection.py index cfd161a..e2538d1 100644 --- a/web/pgadmin/utils/driver/psycopg2/connection.py +++ b/web/pgadmin/utils/driver/psycopg2/connection.py @@ -33,7 +33,7 @@ from pgadmin.utils import get_complete_file_path from ..abstract import BaseDriver, BaseConnection from .cursor import DictCursor from .typecast import register_global_typecasters, \ - register_string_typecasters, register_binary_typecasters, \ + register_binary_typecasters, \ register_array_to_string_typecasters, ALL_JSON_TYPES @@ -387,8 +387,6 @@ class Connection(BaseConnection): else: self.conn.autocommit = True - register_string_typecasters(self.conn) - if self.array_to_string: register_array_to_string_typecasters(self.conn) @@ -397,10 +395,19 @@ class Connection(BaseConnection): if self.use_binary_placeholder: register_binary_typecasters(self.conn) - status = _execute(cur, "SET DateStyle=ISO;" - "SET client_min_messages=notice;" - "SET bytea_output=escape;" - "SET client_encoding='UNICODE';") + if self.conn.encoding not in ('SQL_ASCII', 'SQLASCII', + 'MULE_INTERNAL', 'MULEINTERNAL'): + status = _execute(cur, "SET DateStyle=ISO;" + "SET client_min_messages=notice;" + "SET bytea_output=escape;" + "SET client_encoding='UNICODE';") + + encodings[self.conn.encoding] = 'utf-8' + else: + status = _execute(cur, "SET DateStyle=ISO;" + "SET client_min_messages=notice;" + "SET bytea_output=escape;") + encodings[self.conn.encoding] = 'raw_unicode_escape' if status is not None: self.conn.close() diff --git a/web/pgadmin/utils/driver/psycopg2/typecast.py b/web/pgadmin/utils/driver/psycopg2/typecast.py index f136604..a8f6c38 100644 --- a/web/pgadmin/utils/driver/psycopg2/typecast.py +++ b/web/pgadmin/utils/driver/psycopg2/typecast.py @@ -163,49 +163,6 @@ def register_global_typecasters(): psycopg2.extensions.register_type(pg_array_types_to_array_of_string_type) -def register_string_typecasters(connection): - if connection.encoding != 'UTF8': - # In python3 when database encoding is other than utf-8 and client - # encoding is set to UNICODE then we need to map data from database - # encoding to utf-8. - # This is required because when client encoding is set to UNICODE then - # psycopg assumes database encoding utf-8 and not the actual encoding. - # Not sure whether it's bug or feature in psycopg for python3. - if sys.version_info >= (3,): - def return_as_unicode(value, cursor): - if value is None: - return None - # Treat value as byte sequence of database encoding and then - # decode it as utf-8 to get correct unicode value. - return bytes( - value, encodings[cursor.connection.encoding] - ).decode('utf-8') - - unicode_type = psycopg2.extensions.new_type( - # "char", name, text, character, character varying - (19, 18, 25, 1042, 1043, 0), - 'UNICODE', return_as_unicode) - else: - def return_as_unicode(value, cursor): - if value is None: - return None - # Decode it as utf-8 to get correct unicode value. - return value.decode('utf-8') - - unicode_type = psycopg2.extensions.new_type( - # "char", name, text, character, character varying - (19, 18, 25, 1042, 1043, 0), - 'UNICODE', return_as_unicode) - - unicode_array_type = psycopg2.extensions.new_array_type( - # "char"[], name[], text[], character[], character varying[] - (1002, 1003, 1009, 1014, 1015, 0 - ), 'UNICODEARRAY', unicode_type) - - psycopg2.extensions.register_type(unicode_type) - psycopg2.extensions.register_type(unicode_array_type) - - def register_binary_typecasters(connection): psycopg2.extensions.register_type( psycopg2.extensions.new_type( diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 3e517b6..464a09e 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -116,7 +116,7 @@ def clear_node_info_dict(): del node_info_dict[node][:] -def create_database(server, db_name): +def create_database(server, db_name, encoding=None): """This function used to create database and returns the database id""" try: connection = get_db_connection( @@ -130,8 +130,14 @@ def create_database(server, db_name): old_isolation_level = connection.isolation_level connection.set_isolation_level(0) pg_cursor = connection.cursor() - pg_cursor.execute( - '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + if encoding is None: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) + else: + pg_cursor.execute( + '''CREATE DATABASE "%s" TEMPLATE template0 + ENCODING='%s' LC_COLLATE='%s' LC_CTYPE='%s' ''' % + (db_name, encoding[0], encoding[1], encoding[1])) connection.set_isolation_level(old_isolation_level) connection.commit()