-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtest.py
executable file
·121 lines (95 loc) · 5.21 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
import unittest
import json
import mysql_to_bigquery_schema_converter as converter
TEST_DATA_PATH = "./test_data"
TYPES_MAP_PATH = TEST_DATA_PATH + "/type_mappings_map.json"
FIELD_MAP_PATH = TEST_DATA_PATH + "/field_mappings_map.json"
INVALID_TYPES_MAP_PATH = TEST_DATA_PATH + "/invalid_type_mappings_map.json"
INVALID_FIELDS_MAP_PATH = TEST_DATA_PATH + "/invalid_field_mappings_map.json"
class TestConverter(unittest.TestCase):
    """Tests for mysql_to_bigquery_schema_converter.convert().

    Happy-path tests convert a fixture ``<case>.sql`` schema from
    TEST_DATA_PATH and compare the result against the expected BigQuery
    schema stored in the sibling ``<case>.json``. Failure-path tests check
    that invalid inputs raise ValueError with the exact expected message.
    """

    # Converted schemas can be long; never truncate assertion diffs.
    maxDiff = None

    def _assert_conversion_matches(self, case_name, type_map=None,
                                   field_map=None, virtual=False):
        """Convert ``<case_name>.sql`` and assert it equals ``<case_name>.json``.

        Args:
            case_name: Fixture base name inside TEST_DATA_PATH.
            type_map: Optional path to a custom type-mappings JSON file
                (the '-t' / '--extra-type-mappings' CLI option).
            field_map: Optional path to a custom field-mappings JSON file
                (the '-f' / '--extra-field-mappings' CLI option).
            virtual: When True, forwarded as converter.convert()'s fourth
                positional argument (virtual-column handling).
        """
        sql_path = TEST_DATA_PATH + "/" + case_name + ".sql"
        json_path = TEST_DATA_PATH + "/" + case_name + ".json"
        with open(json_path) as json_file:
            expected = json.load(json_file)
        if virtual:
            _, actual = converter.convert(sql_path, type_map, field_map, True)
        else:
            _, actual = converter.convert(sql_path, type_map, field_map)
        self.assertEqual(expected, actual,
                         msg=f"\nFAILED AT FOLLOWING SCHEMA: {sql_path}")

    def test_output_correctly_generated_default(self):
        """
        Test if converted mysql schema matches with the BigQuery json one.
        """
        self._assert_conversion_matches("default_case")

    def test_output_correctly_generated_virtual(self):
        """
        Test if converted mysql schema matches with the BigQuery json one
        when virtual-column handling is enabled.
        """
        self._assert_conversion_matches("virtual_case", virtual=True)

    def test_output_type_mappings_case(self):
        """
        Test if the converted schema is correct when providing a custom data type mapping.
        The custom map is provided via CLI with the '-t', '--extra-type-mappings' option.
        """
        self._assert_conversion_matches("type_mappings_case",
                                        type_map=TYPES_MAP_PATH)

    def test_output_field_mappings_case(self):
        """
        Test if the converted schema is correct when providing a custom field type mapping.
        The custom map is provided via CLI with the '-f', '--extra-field-mappings' option.
        """
        self._assert_conversion_matches("field_mappings_case",
                                        field_map=FIELD_MAP_PATH)

    def test_invalid_sql_provided(self):
        """
        Test if the converter fails when provided with an .sql file missing the CREATE TABLE statement.
        """
        invalid_sql_case = TEST_DATA_PATH + "/invalid_sql_case.sql"
        with self.assertRaises(ValueError) as ctx:
            # The return value is unreachable: convert() must raise here.
            converter.convert(invalid_sql_case, None, None)
        expected = f"File {TEST_DATA_PATH}/invalid_sql_case.sql does not contain a CREATE TABLE STATEMENT"
        self.assertEqual(str(ctx.exception), expected)

    def test_invalid_type_mappings_provided(self):
        """
        Test if the converter fails when provided with an invalid data type in the custom type map.
        It refers to the map passed with '--extra-type-mappings'.
        """
        type_mappings_case = TEST_DATA_PATH + "/type_mappings_case.sql"
        with self.assertRaises(ValueError) as ctx:
            # The return value is unreachable: convert() must raise here.
            converter.convert(type_mappings_case, INVALID_TYPES_MAP_PATH, None)
        expected = "The provided data types are not valid in BigQuery: \n[{'type': 'sTRING', 'name': 'created_at', 'mode': 'REQUIRED'}]\n"
        self.assertEqual(str(ctx.exception), expected)

    def test_invalid_field_mappings_provided(self):
        """
        Test if the converter fails when provided with an invalid data type in the custom field map.
        It refers to the map passed with '--extra-field-mappings'.
        """
        field_mappings_case = TEST_DATA_PATH + "/field_mappings_case.sql"
        with self.assertRaises(ValueError) as ctx:
            # The return value is unreachable: convert() must raise here.
            converter.convert(field_mappings_case, None, INVALID_FIELDS_MAP_PATH)
        expected = "The provided data types are not valid in BigQuery: \n[{'type': 'sTRING', 'name': 'created_at', 'mode': 'REQUIRED'}]\n"
        self.assertEqual(str(ctx.exception), expected)
if __name__ == "__main__":
    # Discover and run every test in this module when executed directly.
    unittest.main()