diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/cast_test_cases.json b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/cast_test_cases.json new file mode 100644 index 000000000..ab03a0853 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/cast_test_cases.json @@ -0,0 +1,796 @@ +{ + "cast_create": [ + { + "name": "TC_01 - Create cast: With valid source & target type of implicit_type.", + "is_positive_test": true, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_02 - Create cast: With valid source & target type of explict_type.", + "is_positive_test": true, + "inventory_data": {}, + "test_data": { + "castcontext": "EXPLICIT", + "encoding": "UTF8", + "name": "timestamp with time zone->bigint", + "srctyp": "timestamp with time zone", + "trgtyp": "bigint" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_03 - Create cast: With invalid source type.", + "is_positive_test": false, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money123", + "trgtyp": "bigint" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": "ERROR: type \"money123\" does not exist", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_04 - Create cast: With insufficient parameters missing target type.", + "is_positive_test": false, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": "Could not find the required parameter (trgtyp)", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_05 - Create cast: With valid data while server down", + "is_positive_test": false, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_06 - Create cast: With valid data while exception.", + "is_positive_test": false, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "Exception('Mocked Exception Message')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Exception Message", + "test_result_data": {} + }, + "sql_to_check_backend": "" + } + ], + "cast_create_get_functions": [ + { + "name": "TC_01 - From create cast dialogue, get available cast functions for valid source & target type", + "is_positive_test": true, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "integer->bigint", + "srctyp": "integer", + "trgtyp": "bigint" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_02 - From create cast dialogue, get available cast functions when server is down", + "is_positive_test": false, + "inventory_data": {}, + "test_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "integer->bigint", + "srctyp": "integer", + "trgtyp": "bigint" + }, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + } + ], + + + "cast_create_get_type": [ + { + "name": "TC_01 - From create cast dialogue get available cast types", + "is_positive_test": true, + "inventory_data": {}, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_02 - From create cast dialogue get available cast types while server is down", + "is_positive_test": false, + "inventory_data": {}, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + } + ], + + "cast_delete": [ + { + "name": "TC_01 - Delete existing cast using cast id", + "is_positive_test": true, + "inventory_data": { + "castcontext": "EXPLICIT", + "encoding": "UTF8", + "name": "timestamp with time zone->bigint", + "srctyp": "timestamp with time zone", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + } + }, + { + "name": "TC_02 - Delete non-existing cast using cast id", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": "", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_03 - Delete existing cast using cast id while server down", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + } + ], + "cast_delete_multiple": [ + { + "name": "TC_01 - Delete multiple existing casts using cast ids", + "is_positive_test": true, + "inventory_data": { + "castcontext": "EXPLICIT", + "encoding": "UTF8", + "name": "timestamp with time zone->bigint", + "srctyp": "timestamp with time zone", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_02 - Delete multiple existing casts using cast ids while server down", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + + }, + { + "name": "TC_03 - Delete multiple existing casts using cast ids while exception", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "Exception('Mocked Exception Message')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Exception Message", + "test_result_data": {} + }, + "sql_to_check_backend": "" + + } + ], + + "cast_get": [ + { + "name": "TC_01 - Get cast details: With existing cast id.", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_02 - Get casts list: With existing db id.", + "is_positive_test": true, + + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": true + }, + { + "name": "TC_03 - Get cast details: With non existing db id", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_04 - Get cast details: With existing cast id while server is down.", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_05 - Get casts list: With existing db id while server is down.", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": true + } + ], + + "cast_get_dependencies_dependants": [ + { + "name": "TC_01 - Get cast dependents with existing cast id", + "is_positive_test": true, + "inventory_data": {"castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint"}, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_dependant": true + }, + { + "name": "TC_02 - Get cast dependencies with existing cast id", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_dependant": true + } + ], + + "cast_get_node": [ + { + "name": "TC_01 - Get cast node details: With existing cast id.", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_02 - Get casts nodes list: With existing db id.", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": true + }, + { + "name": "TC_03 - Get cast node details: With non existing db id", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_04 - Get cast node details: With existing cast id while server is down.", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": false + }, + { + "name": "TC_05 - Get casts list: With existing db id while server is down.", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "", + "is_list": true + } + ], + + "cast_get_sql": [ + { + "name": "TC_01 - Get cast sql for existing cast id", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + } + }, + { + "name": "TC_02 - Get cast sql for non-existing cast id", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": " ", + "test_result_data": {} + } + }, + { + "name": "TC_03 - Get cast sql for existing cast id while server is down", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_04 - Get cast sql for existing cast id while exception", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "Exception('Mocked Exception Message')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Exception Message", + "test_result_data": {} + }, + "sql_to_check_backend": "" + } + ], + + + "cast_put": [ + { + "name": "TC_01 - Update existing cast with valid parameter Value ", + "is_positive_test": true, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": { + "description": "This is cast update comment" + }, + "expected_data": { + "status_code": 200, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_02 - Update existing cast with invalid parameter value ", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": { + "trgtyp": "bigint344" + }, + "mocking_required": false, + "mock_data": {}, + "expected_data": { + "status_code": 410, + "error_msg": null, + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_03 - Update existing cast with valid parameter value while server is down", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": true, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + }, + { + "name": "TC_04 - Update non-existing cast with valid parameter value", + "is_positive_test": false, + "inventory_data": { + "castcontext": "IMPLICIT", + "encoding": "UTF8", + "name": "money->bigint", + "srctyp": "money", + "trgtyp": "bigint" + }, + "test_data": {}, + "mocking_required": false, + "mock_data": { + "function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar", + "return_value": "(False,'Mocked Internal Server Error')" + }, + "expected_data": { + "status_code": 500, + "error_msg": "Mocked Internal Server Error", + "test_result_data": {} + }, + "sql_to_check_backend": "" + + } + ] +} diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create.py new file mode 100644 index 000000000..51339d667 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create.py @@ -0,0 +1,77 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from __future__ import print_function +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsCreateTestCase(BaseTestGenerator): + skip_on_database = ['gpdb'] + url = '/browser/cast/obj/' + + # Generates scenarios from cast_test_cases.json file + scenarios = cast_utils.generate_scenarios("cast_create") + + def setUp(self): + """ This function will get data required to create cast.""" + super(CastsCreateTestCase, self).runTest() + self.data = self.test_data + + def runTest(self): + """ This function will add cast under test database. """ + self.server_data = parent_node_dict["database"][-1] + self.server_id = self.server_data["server_id"] + self.db_id = self.server_data['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + response = cast_utils.api_create_cast(self) + cast_utils.assert_status_code(self, response) + cast_utils.assert_cast_created(self) + + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_create_cast(self) + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + else: + response = cast_utils.api_create_cast(self) + cast_utils.assert_status_code(self, response) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + if self.is_positive_test: + connection = utils.get_db_connection(self.server_data['db_name'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + cast_utils.drop_cast(connection, self.data["srctyp"], + self.data["trgtyp"]) + database_utils.disconnect_database(self, self.server_id, + self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_functions.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_functions.py new file mode 100644 index 000000000..6fa012b33 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_functions.py @@ -0,0 +1,62 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from __future__ import print_function +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils + +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsCreateGetFunctionsTestCase(BaseTestGenerator): + skip_on_database = ['gpdb'] + url = '/browser/cast/' + scenarios = cast_utils.generate_scenarios("cast_create_get_functions") + + def runTest(self): + """ This function will add cast under test database. """ + super(CastsCreateGetFunctionsTestCase, self).runTest() + self.data = self.test_data + self.server_data = parent_node_dict["database"][-1] + self.server_id = self.server_data["server_id"] + self.db_id = self.server_data['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + response = cast_utils.api_create_cast_get_functions(self) + cast_utils.assert_status_code(self, response) + + else: + if self.mocking_required: + return_value_object = eval(self.mock_data["return_value"]) + with patch(self.mock_data["function_name"], + side_effect=[return_value_object]): + response = cast_utils.api_create_cast_get_functions(self) + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + if self.is_positive_test: + database_utils.disconnect_database(self, self.server_id, + self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_type.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_type.py new file mode 100644 index 000000000..fe19c4de0 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_create_get_type.py @@ -0,0 +1,58 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from __future__ import print_function +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils + +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsCreateGettypeTestCase(BaseTestGenerator): + skip_on_database = ['gpdb'] + url = '/browser/cast/' + scenarios = cast_utils.generate_scenarios("cast_create_get_type") + + def runTest(self): + """ This function will add cast under test database. """ + super(CastsCreateGettypeTestCase, self).runTest() + self.server_data = parent_node_dict["database"][-1] + self.server_id = self.server_data["server_id"] + self.db_id = self.server_data['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + response = cast_utils.api_create_cast_get_type(self) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_create_cast_get_type(self) + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + + def tearDown(self): + """ This function disconnect the test database and drop added cast. """ + database_utils.disconnect_database(self, self.server_id, + self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py index d67273472..505821101 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete.py @@ -16,25 +16,30 @@ from regression import parent_node_dict from regression.python_test_utils import test_utils as utils from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class CastsDeleteTestCase(BaseTestGenerator): + url = '/browser/cast/obj/' """ This class will delete the cast node added under database node. """ skip_on_database = ['gpdb'] - scenarios = [ - # Fetching default URL for cast node. - ('Check Cast Node', dict(url='/browser/cast/obj/')) - ] + scenarios = cast_utils.generate_scenarios("cast_delete") def setUp(self): super(CastsDeleteTestCase, self).setUp() + self.inv_data = self.inventory_data self.default_db = self.server["db"] self.database_info = parent_node_dict['database'][-1] self.db_name = self.database_info['db_name'] self.server["db"] = self.db_name - self.source_type = 'money' - self.target_type = 'bigint' - self.cast_id = cast_utils.create_cast(self.server, self.source_type, - self.target_type) + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) def runTest(self): """ This function will delete added cast.""" @@ -52,18 +57,32 @@ class CastsDeleteTestCase(BaseTestGenerator): self.server['host'], self.server['port'], self.server['sslmode']) - response = cast_utils.verify_cast(connection, self.source_type, - self.target_type) - if len(response) == 0: + casts_exists = cast_utils.verify_cast(connection, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + if not casts_exists: raise Exception("Could not find cast.") - delete_response = self.tester.delete( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + str(self.db_id) + - '/' + str(self.cast_id), - follow_redirects=True) - self.assertEquals(delete_response.status_code, 200) + + if self.is_positive_test: + response = cast_utils.api_delete_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_delete_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) + + cast_utils.assert_error_message(self, response) + else: + response = cast_utils.api_delete_cast(self, 12398) + cast_utils.assert_status_code(self, response) def tearDown(self): + """ Actually Delete cast """ + if not self.is_positive_test: + cast_utils.api_delete_cast(self, self.cast_id) + """This function will disconnect test database.""" database_utils.disconnect_database(self, self.server_id, self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete_multiple.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete_multiple.py index 6f4146b5f..70abd043c 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete_multiple.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_delete_multiple.py @@ -17,25 +17,30 @@ from regression import parent_node_dict from regression.python_test_utils import test_utils as utils from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class CastsMultipleDeleteTestCase(BaseTestGenerator): """ This class will delete the cast node added under database node. """ skip_on_database = ['gpdb'] - scenarios = [ - # Fetching default URL for cast node. - ('Check Cast Node', dict(url='/browser/cast/obj/')) - ] + url = '/browser/cast/obj/' + scenarios = cast_utils.generate_scenarios("cast_delete_multiple") def setUp(self): super(CastsMultipleDeleteTestCase, self).setUp() + self.inv_data = self.inventory_data self.default_db = self.server["db"] self.database_info = parent_node_dict['database'][-1] self.db_name = self.database_info['db_name'] self.server["db"] = self.db_name - self.source_type = 'money' - self.target_type = 'bigint' - self.cast_id = cast_utils.create_cast(self.server, self.source_type, - self.target_type) + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) def runTest(self): """ This function will delete added cast.""" @@ -53,20 +58,32 @@ class CastsMultipleDeleteTestCase(BaseTestGenerator): self.server['host'], self.server['port'], self.server['sslmode']) - response = cast_utils.verify_cast(connection, self.source_type, - self.target_type) - if len(response) == 0: + casts_exists = cast_utils.verify_cast(connection, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + if not casts_exists: raise Exception("Could not find cast.") - delete_response = self.tester.delete( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + str(self.db_id) + - '/', - data=json.dumps({'ids': [self.cast_id]}), - content_type='html/json', - follow_redirects=True) - self.assertEquals(delete_response.status_code, 200) + + if self.is_positive_test: + response = cast_utils.api_delete_casts(self, [self.cast_id]) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_delete_casts(self, + [self.cast_id]) + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + else: + response = cast_utils.api_delete_casts(self, [self.cast_id]) + cast_utils.assert_status_code(self, response) def tearDown(self): + """ Actually Delete cast """ + if not self.is_positive_test: + cast_utils.api_delete_casts(self, [self.cast_id]) + """This function will disconnect test database.""" database_utils.disconnect_database(self, self.server_id, self.db_id) diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py index b168a7318..7331ca9ad 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get.py @@ -8,7 +8,6 @@ ########################################################################## from __future__ import print_function - from pgadmin.browser.server_groups.servers.databases.tests import \ utils as database_utils from pgadmin.utils.route import BaseTestGenerator @@ -16,26 +15,31 @@ from regression import parent_node_dict from regression.python_test_utils import test_utils as utils from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + class CastsGetTestCase(BaseTestGenerator): """ This class will fetch the cast node added under database node. """ skip_on_database = ['gpdb'] - scenarios = [ - # Fetching default URL for cast node. - ('Check Cast Node', dict(url='/browser/cast/obj/')) - ] + url = '/browser/cast/obj/' + scenarios = cast_utils.generate_scenarios("cast_get") def setUp(self): """ This function will create cast.""" super(CastsGetTestCase, self).setUp() + self.inv_data = self.inventory_data self.default_db = self.server["db"] self.database_info = parent_node_dict['database'][-1] self.db_name = self.database_info['db_name'] self.server["db"] = self.db_name - self.source_type = 'money' - self.target_type = 'bigint' - self.cast_id = cast_utils.create_cast(self.server, self.source_type, - self.target_type) + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) def runTest(self): """ This function will fetch added cast.""" @@ -47,12 +51,31 @@ class CastsGetTestCase(BaseTestGenerator): self.db_id) if not db_con["info"] == "Database connected.": raise Exception("Could not connect to database.") - response = self.tester.get( - self.url + str(utils.SERVER_GROUP) + '/' + str( - self.server_id) + '/' + - str(self.db_id) + '/' + str(self.cast_id), - content_type='html/json') - self.assertEquals(response.status_code, 200) + + if self.is_positive_test: + if self.is_list: + response = cast_utils.api_get_cast(self, "") + cast_utils.assert_status_code(self, response) + + else: + response = cast_utils.api_get_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + if self.is_list: + response = cast_utils.api_get_cast(self, "") + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + + else: + response = cast_utils.api_get_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) + else: + self.cast_id = 12893 + response = cast_utils.api_get_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) def tearDown(self): """This function disconnect the test database and drop added cast.""" @@ -62,8 +85,8 @@ class CastsGetTestCase(BaseTestGenerator): self.server['host'], self.server['port'], self.server['sslmode']) - cast_utils.drop_cast(connection, self.source_type, - self.target_type) + cast_utils.drop_cast(connection, self.inv_data["srctyp"], + self.inv_data["trgtyp"]) database_utils.disconnect_database(self, self.server_id, self.db_id) self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_dependencies_dependent.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_dependencies_dependent.py new file mode 100644 index 000000000..f814b727e --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_dependencies_dependent.py @@ -0,0 +1,80 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from __future__ import print_function + +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsGetDependentsAndDependencyTestCase(BaseTestGenerator): + """ This class will fetch the cast node added under database node. """ + skip_on_database = ['gpdb'] + url = '/browser/cast/' + scenarios = cast_utils.generate_scenarios( + "cast_get_dependencies_dependants") + + def setUp(self): + """ This function will create cast.""" + super(CastsGetDependentsAndDependencyTestCase, self).setUp() + self.inv_data = self.inventory_data + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][-1] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + + def runTest(self): + """ This function will fetch added cast.""" + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + if self.is_dependant: + response = cast_utils.api_get_cast_node_dependent(self) + cast_utils.assert_status_code(self, response) + else: + response = cast_utils.api_get_cast_node_dependencies(self) + cast_utils.assert_status_code(self, response) + + # TODO + # Check weather to add -ve tests or not + # as -ve scenarios NOT handled in code + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + cast_utils.drop_cast(connection, self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + database_utils.disconnect_database(self, self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_node.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_node.py new file mode 100644 index 000000000..843675423 --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_node.py @@ -0,0 +1,95 @@ +from __future__ import print_function +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils + +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsGetNodeTestCase(BaseTestGenerator): + """ This class will fetch the cast node added under database node. """ + skip_on_database = ['gpdb'] + url = '/browser/cast/nodes/' + scenarios = cast_utils.generate_scenarios("cast_get_node") + + def setUp(self): + """ This function will create cast.""" + super(CastsGetNodeTestCase, self).setUp() + self.inv_data = self.inventory_data + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][-1] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + + def runTest(self): + """ This function will fetch added cast.""" + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + if self.is_list: + response = cast_utils.api_get_cast_node(self, "") + cast_utils.assert_status_code(self, response) + else: + response = cast_utils.api_get_cast_node(self, self.cast_id) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + return_value_object = eval(self.mock_data["return_value"]) + with patch(self.mock_data["function_name"], + side_effect=[return_value_object]): + if self.is_list: + response = cast_utils.api_get_cast_node(self, "") + cast_utils.assert_status_code(self, response) + # act_res = response.status_code + # exp_res = self.expected_data["status_code"] + # self.assertEquals(act_res, exp_res) + + cast_utils.assert_error_message(self, response) + + # act_res = response.json["errormsg"] + # exp_res = self.expected_data["error_msg"] + # self.assertEquals(act_res, exp_res) + + else: + response = cast_utils.api_get_cast_node(self, + self.cast_id) + cast_utils.assert_status_code(self, response) + # act_res = response.status_code + # exp_res = self.expected_data["status_code"] + # self.assertEquals(act_res, exp_res) + else: + self.cast_id = 12893 + response = cast_utils.api_get_cast(self, self.cast_id) + cast_utils.assert_status_code(self, response) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + cast_utils.drop_cast(connection, self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + database_utils.disconnect_database(self, self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_sql.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_sql.py new file mode 100644 index 000000000..20c9d5dab --- /dev/null +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_get_sql.py @@ -0,0 +1,81 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2020, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from __future__ import print_function + +from pgadmin.browser.server_groups.servers.databases.tests import \ + utils as database_utils +from pgadmin.utils.route import BaseTestGenerator +from regression import parent_node_dict +from regression.python_test_utils import test_utils as utils +from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch + + +class CastsGetSqlTestCase(BaseTestGenerator): + """ This class will fetch the cast node added under database node. """ + skip_on_database = ['gpdb'] + url = '/browser/cast/' + scenarios = cast_utils.generate_scenarios("cast_get_sql") + + def setUp(self): + """ This function will create cast.""" + super(CastsGetSqlTestCase, self).setUp() + self.inv_data = self.inventory_data + self.default_db = self.server["db"] + self.database_info = parent_node_dict['database'][-1] + self.db_name = self.database_info['db_name'] + self.server["db"] = self.db_name + self.cast_id = cast_utils.create_cast(self.server, + self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + + def runTest(self): + """ This function will fetch added cast.""" + self.server_id = self.database_info["server_id"] + self.db_id = self.database_info['db_id'] + db_con = database_utils.connect_database(self, + utils.SERVER_GROUP, + self.server_id, + self.db_id) + if not db_con["info"] == "Database connected.": + raise Exception("Could not connect to database.") + + if self.is_positive_test: + response = cast_utils.api_get_cast_sql(self) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_get_cast_sql(self) + cast_utils.assert_status_code(self, response) + else: + self.cast_id = 34091 + response = cast_utils.api_get_cast_sql(self) + cast_utils.assert_status_code(self, response) + + def tearDown(self): + """This function disconnect the test database and drop added cast.""" + connection = utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + cast_utils.drop_cast(connection, self.inv_data["srctyp"], + self.inv_data["trgtyp"]) + database_utils.disconnect_database(self, self.server_id, + self.db_id) + self.server['db'] = self.default_db diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py index 402e288eb..499c7a2cf 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/test_cast_put.py @@ -17,25 +17,31 @@ from pgadmin.utils.route import BaseTestGenerator from regression import parent_node_dict from regression.python_test_utils import test_utils as utils from . import utils as cast_utils +import sys + +if sys.version_info < (3, 3): + from mock import patch +else: + from unittest.mock import patch class CastsPutTestCase(BaseTestGenerator): + url = '/browser/cast/obj/' """ This class will fetch the cast node added under database node. """ skip_on_database = ['gpdb'] - scenarios = [ - # Fetching default URL for cast node. - ('Check Cast Node', dict(url='/browser/cast/obj/')) - ] + scenarios = cast_utils.generate_scenarios("cast_put") def setUp(self): """ This function will create cast.""" super(CastsPutTestCase, self).setUp() + self.inv_data = self.inventory_data + self.data = self.test_data self.default_db = self.server["db"] self.database_info = parent_node_dict['database'][-1] self.db_name = self.database_info['db_name'] self.server["db"] = self.db_name - self.source_type = 'character' - self.target_type = 'cidr' + self.source_type = self.inv_data["srctyp"] + self.target_type = self.inv_data["trgtyp"] self.cast_id = cast_utils.create_cast(self.server, self.source_type, self.target_type) @@ -54,22 +60,27 @@ class CastsPutTestCase(BaseTestGenerator): self.server['db_password'], self.server['host'], self.server['port']) - response = cast_utils.verify_cast(connection, self.source_type, - self.target_type) - if len(response) == 0: + casts_exists = cast_utils.verify_cast(connection, self.source_type, + self.target_type) + if not casts_exists: raise Exception("Could not find cast.") - data = { - "description": "This is cast update comment", - "id": self.cast_id - } - put_response = self.tester.put( - self.url + str(utils.SERVER_GROUP) + '/' + - str(self.server_id) + '/' + str( - self.db_id) + - '/' + str(self.cast_id), - data=json.dumps(data), - follow_redirects=True) - self.assertEquals(put_response.status_code, 200) + + self.data["id"] = self.cast_id + if self.is_positive_test: + response = cast_utils.api_update_cast(self) + cast_utils.assert_status_code(self, response) + else: + if self.mocking_required: + with patch(self.mock_data["function_name"], + side_effect=[eval(self.mock_data["return_value"])]): + response = cast_utils.api_update_cast(self) + cast_utils.assert_status_code(self, response) + cast_utils.assert_error_message(self, response) + else: + if len(self.data) == 1: + self.cast_id = 109822 + response = cast_utils.api_update_cast(self) + cast_utils.assert_status_code(self, response) def tearDown(self): """This function disconnect the test database and drop added cast.""" diff --git a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py index 97b788850..f755a4c02 100644 --- a/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py +++ b/web/pgadmin/browser/server_groups/servers/databases/casts/tests/utils.py @@ -8,22 +8,167 @@ ########################################################################## from __future__ import print_function +from regression.python_test_utils.test_utils import * +import os +import json -import sys -import traceback +# def get_cast_data(): +# data = { +# "castcontext": "IMPLICIT", +# "encoding": "UTF8", +# "name": "money->bigint", +# "srctyp": "money", +# "trgtyp": "bigint", +# } +# return data -from regression.python_test_utils.test_utils import get_db_connection +CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) +BASE_URL = "/browser/cast/" +with open(CURRENT_PATH + "/cast_test_cases.json") as data_file: + test_cases = json.load(data_file) -def get_cast_data(): - data = { - "castcontext": "IMPLICIT", - "encoding": "UTF8", - "name": "money->bigint", - "srctyp": "money", - "trgtyp": "bigint", - } - return data +def generate_scenarios(key): + """ + This function generates the test case scenarios according to key given + to it, e.g. key=ADD, key=update etc. + :param key: for which operation generate the test scenario + :type key: str + :return: scenarios + :rtype: list + """ + scenarios = [] + for scenario in test_cases[key]: + test_name = scenario["name"] + scenario.pop("name") + tup = (test_name, dict(scenario)) + scenarios.append(tup) + return scenarios + + +def api_get_cast(self, cast_id): + return self.tester.get( + self.url + str(SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(cast_id), + content_type='html/json') + + +def api_create_cast(self): + return self.tester.post( + self.url + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str( + self.db_id) + '/', + data=json.dumps(self.data), + content_type='html/json') + + +def api_get_cast_node(self, cast_id): + return self.tester.get( + self.url + str(SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(cast_id), + content_type='html/json') + + +def api_create_cast_get_functions(self): + return self.tester.post( + self.url + 'get_functions/' + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str( + self.db_id) + '/', + data=json.dumps(self.data), + content_type='html/json') + + +def api_delete_cast(self, cast_id): + return self.tester.delete( + self.url + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str(self.db_id) + + '/' + str(cast_id), + follow_redirects=True) + + +def api_delete_casts(self, list_of_cast_ids): + return self.tester.delete( + self.url + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str(self.db_id) + '/', + data=json.dumps({'ids': list_of_cast_ids}), + content_type='html/json', + follow_redirects=True) + + +def api_create_cast_get_type(self): + return self.tester.get( + self.url + 'get_type/' + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str( + self.db_id) + '/', + content_type='html/json') + + +def api_get_cast_node_dependent(self): + return self.tester.get( + self.url + 'dependent/' + str(SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(self.cast_id), + content_type='html/json') + + +def api_get_cast_node_dependencies(self): + return self.tester.get( + self.url + 'dependency/' + str(SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(self.cast_id), + content_type='html/json') + + +def api_get_cast_sql(self): + return self.tester.get( + self.url + 'sql/' + str(SERVER_GROUP) + '/' + str( + self.server_id) + '/' + + str(self.db_id) + '/' + str(self.cast_id), + content_type='html/json') + + +def api_update_cast(self): + return self.tester.put( + self.url + str(SERVER_GROUP) + '/' + + str(self.server_id) + '/' + str( + self.db_id) + + '/' + str(self.cast_id), + data=json.dumps(self.data), + follow_redirects=True) + + +def get_database_connection(self): + return get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode']) + + +def assert_status_code(self, response): + act_res = response.status_code + exp_res = self.expected_data["status_code"] + print(act_res, exp_res) + return self.assertEquals(act_res, exp_res) + + +def assert_error_message(self, response): + act_res = response.json["errormsg"] + exp_res = self.expected_data["error_msg"] + print(act_res, exp_res) + return self.assertEquals(act_res, exp_res) + + +def assert_cast_created(self): + source_type = self.data["srctyp"] + target_type = self.data["trgtyp"] + con = get_database_connection(self) + act_res = verify_cast(con, source_type, target_type) + print(act_res, "True") + return self.assertTrue(act_res) def create_cast(server, source_type, target_type): @@ -72,6 +217,26 @@ def create_cast(server, source_type, target_type): def verify_cast(connection, source_type, target_type): """ This function will verify current cast.""" + try: + pg_cursor = connection.cursor() + pg_cursor.execute( + "SELECT * FROM pg_cast ca WHERE ca.castsource = " + "(SELECT t.oid FROM pg_type t " + "WHERE format_type(t.oid, NULL)='%s') " + "AND ca.casttarget = (SELECT t.oid FROM pg_type t WHERE " + "format_type(t.oid, NULL) = '%s')" % (source_type, target_type)) + casts = pg_cursor.fetchall() + connection.close() + if len(casts) > 0: + return True + else: + return False + except Exception: + traceback.print_exc(file=sys.stderr) + + +def get_cast(connection, source_type, target_type): + """ This function will return current cast details.""" try: pg_cursor = connection.cursor() pg_cursor.execute(