diff --git a/functions/allure_report/mapper.py b/functions/allure_report/mapper.py index 5064cf4..76daa05 100644 --- a/functions/allure_report/mapper.py +++ b/functions/allure_report/mapper.py @@ -21,16 +21,16 @@ def get_test_human_name(file): - exp = get_expectation_impl(get_test_name(file)) - template_json = exp._prescriptive_renderer(configuration=ExpectationConfiguration(get_test_name(file), kwargs=get_params1(file)))[0] + template_json = \ + exp._prescriptive_renderer(configuration=ExpectationConfiguration(get_test_name(file), kwargs=get_params1(file)))[0] if type(template_json) is not dict: template_json = template_json.to_json_dict() template_str = template_json['string_template']['template'] params = get_params1(file) result_string = template_str new_params = {} - for key,value in params.items(): + for key, value in params.items(): if type(value) == list: if key == 'value_set': for i in value: @@ -57,7 +57,7 @@ def get_test_human_name(file): return result_string -def get_json(json_name,validate_id): +def get_json(json_name, validate_id): file_name = f"great_expectations/uncommitted/validations/{validate_id}.json" content_object = s3.Object(qa_bucket, f"{qa_bucket}/{file_name}") file_content = content_object.get()['Body'].read().decode('utf-8') @@ -65,7 +65,6 @@ def get_json(json_name,validate_id): return json_content - def get_suit_status(): return "passed" @@ -75,23 +74,25 @@ def get_test_name(file): def get_suit_name(file, i): - return file['meta']['batch_kwargs']['data_asset_name'] + "." + i['expectation_config']['kwargs']['column'] if 'column' in i['expectation_config']['kwargs'] else file['meta']['batch_kwargs']['data_asset_name'] + return f"{file['meta']['batch_kwargs']['data_asset_name']}.{i['expectation_config']['kwargs']['column']}" if "column" in \ + i[ + "expectation_config"][ + "kwargs"] else \ + file["meta"]["batch_kwargs"]["data_asset_name"] def get_jira_ticket(file): if 'Bug Ticket' in file['expectation_config']['meta']: return { - "name": "Bug ticket", - "url": file['expectation_config']['meta']['Bug Ticket'], - "type": "issue" - } + "name": "Bug ticket", + "url": file['expectation_config']['meta']['Bug Ticket'], + "type": "issue" + } else: return {} - - def get_severity(file): return file['expectation_config']['meta']['Severity'] if 'Severity' in file['expectation_config']['meta'] else "" @@ -105,7 +106,8 @@ def get_stop_suit_time(): def parse_datetime(date_str): - return datetime.timestamp(datetime. strptime(date_str, '%Y%m%dT%H%M%S.%fZ'))*1000 + return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000 + def get_start_test_time(file): return parse_datetime(file['meta']['run_id']['run_name']) @@ -120,91 +122,90 @@ def get_params(file): del params['result_format'] result = [] for param in params: - result.append({"name": param, "value": str(params[param])}) if isinstance(params[param], list) else result.append({"name": param, "value": params[param]}) + result.append({"name": param, "value": str(params[param])}) if isinstance(params[param], + list) else result.append( + {"name": param, "value": params[param]}) return result + def get_params1(file): params = file['expectation_config']['kwargs'] - # del params['result_format'] return params + def get_test_status(file): return "passed" if file['success'] is True else "failed" + def get_test_description(file): result = "" - for f in file['result']: - if str(f)!='observed_value': - result = result +"\n" + str(f) + ": " + str(file['result'][f])+"\n" + for f in file["result"]: + if str(f) != "observed_value": + result = result + "\n" + f"{str(f)}: {str(file['result'][f])}" + "\n" return result def get_observed_value(file): try: - return "Observed value: "+str(file['result']['observed_value']) if 'observed_value' in file['result'] else "Unexpected count: "+str(file['result']['unexpected_count']) + return f"Observed value: {str(file['result']['observed_value'])}" if "observed_value" in file[ + "result"] else f"Unexpected count: {str(file['result']['unexpected_count'])}" except KeyError: return 'Column not exist' - def get_exception_message(file): return file['exception_info']['exception_message'] + def get_exception_traceback(file): return file['exception_info']['exception_traceback'] - -def get_folder_key(folder,folder_key): - - - folder = folder + str(folder_key) + '/' +def get_folder_key(folder, folder_key): + folder = f"{folder}{str(folder_key)}/" bucket.put_object(Key=folder) return folder_key -def create_categories_json(json_name,key): +def create_categories_json(json_name, key): data = [ - { - "name": "Ignored tests", - "matchedStatuses": [ - "skipped" - ] - }, - { - "name": "Passed tests", - "matchedStatuses": [ - "passed" - ] - }, - { - "name": "Broken tests", - "matchedStatuses": [ - "broken" - ] - }, - { - "name": "Failed tests", - "matchedStatuses": [ - "failed" - ] - } - ] + { + "name": "Ignored tests", + "matchedStatuses": [ + "skipped" + ] + }, + { + "name": "Passed tests", + "matchedStatuses": [ + "passed" + ] + }, + { + "name": "Broken tests", + "matchedStatuses": [ + "broken" + ] + }, + { + "name": "Failed tests", + "matchedStatuses": [ + "failed" + ] + } + ] result = json.dumps(data) - # with open("dags/reportsx/categories.json", "w") as file: - s3.Object(qa_bucket, "allure/"+json_name+key+"/result/categories.json").put(Body=bytes(result.encode('UTF-8'))) - + s3.Object(qa_bucket, f"allure/{json_name}{key}/result/categories.json").put(Body=bytes(result.encode("UTF-8"))) - -def get_uuid(i, json_name,key): +def get_uuid(i, json_name, key): fl = "" - objs = list(bucket.objects.filter(Prefix='allure/'+json_name+key+'/allure-report/history')) - if(len(objs)>0): + objs = list(bucket.objects.filter(Prefix=f"allure/{json_name}{key}/allure-report/history")) + if (len(objs) > 0): - df = wr.s3.read_json(path=['s3://'+qa_bucket+'/allure/'+json_name+key+'/allure-report/history/history.json']) + df = wr.s3.read_json(path=[f"s3://{qa_bucket}/allure/{json_name}{key}/allure-report/history/history.json"]) fl = json.loads(df.to_json()) keys = list(fl.keys()) @@ -214,58 +215,55 @@ def get_uuid(i, json_name,key): return datetime.now().strftime("%S%f") +def create_suit_json(json_name, key, validate_id): + bucket.put_object(Key=f"allure/{json_name}{key}/result/") -def create_suit_json(json_name,key,validate_id): - bucket.put_object(Key="allure/"+json_name+key+"/result/") - - file = get_json(json_name,validate_id) + file = get_json(json_name, validate_id) start_time = get_start_suit_time(file) stop_time = get_stop_test_time(file) - # for i in range(len(file['results'])): for i in file['results']: uuid = str(get_uuid(list(file['results']).index(i), json_name, key)) data = { - "uuid": uuid, - "historyId": uuid, - "status": get_test_status(i), - "parameters": get_params(i), - "labels": [{ - "name": "test", - "value": get_test_name(i) - }, { - "name": "suite", - "value": get_suit_name(file,i) - }, - { - "name": "severity", - "value": get_severity(i) - } - ], - "links": [get_jira_ticket(i)], - "name": get_test_name(i), - "description": get_test_description(i), - "statusDetails": {"known": False, "muted": False, "flaky": False, - "message": get_observed_value(i) if get_test_status(i)=='failed' else "", - "trace": get_exception_traceback(i)}, - "start": start_time, - "stop": stop_time, - "steps": [ - { - "status": get_test_status(i), - "name": get_test_human_name(i), - "start": get_start_test_time(file), - "stop": get_stop_test_time(file) - }] - } - - + "uuid": uuid, + "historyId": uuid, + "status": get_test_status(i), + "parameters": get_params(i), + "labels": [{ + "name": "test", + "value": get_test_name(i) + }, { + "name": "suite", + "value": get_suit_name(file, i) + }, + { + "name": "severity", + "value": get_severity(i) + } + ], + "links": [get_jira_ticket(i)], + "name": get_test_name(i), + "description": get_test_description(i), + "statusDetails": {"known": False, "muted": False, "flaky": False, + "message": get_observed_value(i) if get_test_status(i) == 'failed' else "", + "trace": get_exception_traceback(i)}, + "start": start_time, + "stop": stop_time, + "steps": [ + { + "status": get_test_status(i), + "name": get_test_human_name(i), + "start": get_start_test_time(file), + "stop": get_stop_test_time(file) + }] + } result = json.dumps(data) - s3.Object(qa_bucket, "allure/"+json_name+key+"/result/"+uuid+"-result.json").put(Body=bytes(result.encode('UTF-8'))) + s3.Object(qa_bucket, f"allure/{json_name}{key}/result/{uuid}-result.json").put( + Body=bytes(result.encode("UTF-8"))) -def transfer_folder(root_src_dir,root_dst_dir): +def transfer_folder(root_src_dir, root_dst_dir): for src_dir, dirs, files in os.walk(root_src_dir): dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1) if not os.path.exists(dst_dir): @@ -281,16 +279,8 @@ def transfer_folder(root_src_dir,root_dst_dir): shutil.copy(src_file, dst_dir) - - - -def create_json_report(json_name,cloudfront,folder_key,validate_id): - key = "/"+get_folder_key("allure/"+json_name+"/",folder_key) - create_suit_json(json_name,key,validate_id) - create_categories_json(json_name,key) - return cloudfront+"/allure/"+json_name+key+"/allure-report/index.html", json_name+key - - - - - +def create_json_report(json_name, cloudfront, folder_key, validate_id): + key = "/" + get_folder_key(f"allure/{json_name}/", folder_key) + create_suit_json(json_name, key, validate_id) + create_categories_json(json_name, key) + return f"{cloudfront}/allure/{json_name}{key}/allure-report/index.html", json_name + key diff --git a/functions/data_test/Expectation_report_new.py b/functions/data_test/Expectation_report_new.py index 0be4249..23a589a 100644 --- a/functions/data_test/Expectation_report_new.py +++ b/functions/data_test/Expectation_report_new.py @@ -1,6 +1,7 @@ from typing import Any, Optional import pandas as pd +from great_expectations.core import ExpectationConfiguration from pandas_profiling.expectations_report import ExpectationHandler from visions import VisionsTypeset @@ -18,17 +19,18 @@ def typeset(self) -> Optional[VisionsTypeset]: return None def to_expectation_suite( - self, - suite_name: Optional[str] = None, - data_context: Optional[Any] = None, - mapping_schema: Optional[list] = None, - save_suite: bool = True, - reuse_suite: bool = False, - run_validation: bool = True, - build_data_docs: bool = True, - old_suite_name: Optional[str] = None, - use_old_suite: Optional[str] = None, - handler: Optional[Handler] = None, + self, + run_name: Optional[str] = None, + suite_name: Optional[str] = None, + data_context: Optional[Any] = None, + mapping_config: Optional[dict] = None, + save_suite: bool = True, + reuse_suite: bool = False, + run_validation: bool = True, + build_data_docs: bool = True, + old_suite_name: Optional[str] = None, + use_old_suite: Optional[str] = None, + handler: Optional[Handler] = None, ) -> Any: """ All parameters default to True to make it easier to access the full functionality of Great Expectations out of @@ -45,6 +47,8 @@ def to_expectation_suite( Returns: An ExpectationSuite """ + + ignored_columns = [] try: import great_expectations as ge except ImportError: @@ -65,70 +69,175 @@ def to_expectation_suite( data_context = ge.data_context.DataContext() new_column_in_mapping = {} + try: + mapping_schema = mapping_config[suite_name] + except KeyError: + mapping_schema = None + if reuse_suite: if use_old_suite: - suite_old = data_context.get_expectation_suite(old_suite_name) - data_context.save_expectation_suite(expectation_suite=suite_old,expectation_suite_name=suite_name,overwrite_existing=True) + suite_old = data_context.get_expectation_suite(f"{suite_name}_{old_suite_name}") + data_context.save_expectation_suite(expectation_suite=suite_old, expectation_suite_name=f"{suite_name}_{run_name}", + overwrite_existing=True) else: - suite_old = data_context.get_expectation_suite(old_suite_name) schema_list = list(mapping_schema.keys()) - schema_list.append('_nocolumn') + dict_keys = [i for i in mapping_schema if isinstance(mapping_schema[i], dict)] - r = re.compile("new_col_added") - new_column_in_mapping_keys = list(filter(r.match, schema_list)) - for key in new_column_in_mapping_keys: - new_column_in_mapping.update({key:mapping_schema[key]}) - if new_column_in_mapping_keys: - schema_list = [x for x in schema_list if x not in new_column_in_mapping_keys] - old_schema_list = list(suite_old.get_grouped_and_ordered_expectations_by_column()[0].keys()) - new_schema_list = [x for x in old_schema_list if x not in schema_list] - for key in new_schema_list: - exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) - for exps in exp_conf: - for exp in exps: - suite_old.remove_expectation( - exp, - match_type="runtime", - ) - schema_values = list(mapping_schema.values()) - for key,v in zip(schema_list,schema_values): - exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) - for exps in exp_conf: - for exp in exps: - if(exp['expectation_type']=='expect_table_columns_to_match_set'): - suite_old.patch_expectation( - exp, - op="replace", - path="/column_set", - value=schema_values, - match_type="runtime", - ) - elif (exp['expectation_type']!='expect_table_row_count_to_equal'): - suite_old.patch_expectation( + if not dict_keys: + suite_old = data_context.get_expectation_suite(f"{suite_name}_{old_suite_name}") + schema_list.append("_nocolumn") + + r = re.compile("new_col_added") + new_column_in_mapping_keys = list(filter(r.match, schema_list)) + for key in new_column_in_mapping_keys: + new_column_in_mapping.update({key: mapping_schema[key]}) + if new_column_in_mapping_keys: + schema_list = [x for x in schema_list if + x not in new_column_in_mapping_keys and x not in ignored_columns] + old_schema_list = list(suite_old.get_grouped_and_ordered_expectations_by_column()[0].keys()) + new_schema_list = [x for x in old_schema_list if x not in schema_list] + for key in new_schema_list: + exp_conf = [] + exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + suite_old.remove_expectation( exp, - op="replace", - path="/column", - value=v, match_type="runtime", ) - data_context.save_expectation_suite(expectation_suite=suite_old,expectation_suite_name=suite_name,overwrite_existing=True) + schema_values = list(mapping_schema.values()) + for key, v in zip(schema_list, schema_values): + exp_conf = [] + exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + if (exp["expectation_type"] == "expect_table_columns_to_match_set"): + suite_old.patch_expectation( + exp, + op="replace", + path="/column_set", + value=schema_values, + match_type="runtime", + ) + elif (exp["expectation_type"] != "expect_table_row_count_to_equal"): + suite_old.patch_expectation( + exp, + op="replace", + path="/column", + value=v, + match_type="runtime", + ) + data_context.save_expectation_suite(expectation_suite=suite_old, expectation_suite_name=f"{suite_name}_{run_name}", + overwrite_existing=True) + + if new_column_in_mapping: + suite_old = data_context.get_expectation_suite(f"{suite_name}_{run_name}") + batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite_old) + summary = self.get_description() + for name, variable_summary in summary["variables"].items(): + if name in list(new_column_in_mapping.values()): + handler.handle(variable_summary["type"], name, variable_summary, batch) + suite = batch.get_expectation_suite(discard_failed_expectations=False) + data_context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=f"{suite_name}_{run_name}", + overwrite_existing=True) + + else: # if we have nested tables + r = re.compile("new_col_added") + new_column_in_mapping_keys = list(filter(r.match, schema_list)) + schema_list = [x for x in schema_list if x not in new_column_in_mapping_keys] + schema_list = [x for x in schema_list if + x not in dict_keys] # subtract original suite list keys from nested + dict_keys_schema_list = [] + dict_values_schema_list = [] + for key in dict_keys: + if not mapping_schema[key]: + dict_keys_schema_list.append( + list(mapping_config[key].keys())) # create list of lists for nested suites columns + dict_values_schema_list.append(list(mapping_config[key].values())) + else: + dict_keys_schema_list.append( + list(mapping_schema[key].keys())) # if nested table has renaming + dict_values_schema_list.append(list(mapping_schema[key].values())) + dict_suites = [] + for d_key in dict_keys: + suite_old = data_context.get_expectation_suite(f"{d_key}_{old_suite_name}") + old_schema_list = list(suite_old.get_grouped_and_ordered_expectations_by_column()[ + 0].keys()) # get schema from original nested suite + dict_keys_schema_list[list(dict_keys).index(d_key)].append("_nocolumn") + new_schema_list = [x for x in old_schema_list if x not in dict_keys_schema_list[ + list(dict_keys).index(d_key)]] # subtract mapping schema from original schema + for key in new_schema_list: # delete not necessary tests based on mapping + exp_conf = [] + exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + suite_old.remove_expectation( + exp, + match_type="runtime", + ) + # schema_values = list(mapping_config[d_key].values()) + schema_values = dict_values_schema_list[list(dict_keys).index(d_key)] + for key, v in zip(dict_keys_schema_list[list(dict_keys).index(d_key)], + schema_values): # remove table schema test and replace columns name in tests + exp_conf = [] + exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + if (exp["expectation_type"] == "expect_table_columns_to_match_set" or exp["expectation_type"] == "expect_table_row_count_to_equal"): + suite_old.remove_expectation( + exp, + match_type="runtime", + ) + else: + suite_old.patch_expectation( + exp, + op="replace", + path="/column", + value=v, + match_type="runtime", + ) + dict_suites.append(suite_old) - if new_column_in_mapping: - suite = data_context.get_expectation_suite(suite_name) - batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite) + # ##run tests generation against new columns + suite_old = data_context.create_expectation_suite(f"{suite_name}_{run_name}", + overwrite_existing=True) + batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite_old) summary = self.get_description() for name, variable_summary in summary["variables"].items(): - if name in list(new_column_in_mapping.values()): + if name in schema_list and name not in ignored_columns: handler.handle(variable_summary["type"], name, variable_summary, batch) suite = batch.get_expectation_suite(discard_failed_expectations=False) - data_context.save_expectation_suite(suite,overwrite_existing=True) + ## join all suites to one + ## new version + # for dict_suite in dict_suites: + # for (key, value) in dict_suite.get_grouped_and_ordered_expectations_by_column()[0].items(): + # suite.add_expectation_configurations(value) + + for dict_suite in dict_suites: + for (key, values) in dict_suite.get_grouped_and_ordered_expectations_by_column()[0].items(): + for value in values: + suite.add_expectation(ExpectationConfiguration(kwargs=value["kwargs"], + expectation_type=value[ + "expectation_type"], + meta=value["meta"])) + + ## add expected_table_columns_to_match_set + final_schema = sum(dict_values_schema_list, [mapping_schema[x] for x in schema_list]) + + suite.add_expectation( + expectation_configuration=ExpectationConfiguration(kwargs={"column_set": final_schema}, + expectation_type="expect_table_columns_to_match_set")) + suite.add_expectation( + expectation_configuration=ExpectationConfiguration(kwargs={"value": summary['table']['n']}, + expectation_type="expect_table_row_count_to_equal")) + data_context.save_expectation_suite(expectation_suite=suite, + expectation_suite_name=f"{suite_name}_{run_name}", + overwrite_existing=True, discard_failed_expectations=False) else: suite = data_context.create_expectation_suite( - suite_name, overwrite_existing=True, + f"{suite_name}_{run_name}", overwrite_existing=True, ) @@ -147,10 +256,11 @@ def to_expectation_suite( for name, variable_summary in summary["variables"].items(): name_list.append(name) if mapping_schema is not None: - if name in list(mapping_schema.keys()): + if name in list(mapping_schema.keys()) and name not in ignored_columns: handler.handle(variable_summary["type"], name, variable_summary, batch) else: - handler.handle(variable_summary["type"], name, variable_summary, batch) + if name not in ignored_columns: + handler.handle(variable_summary["type"], name, variable_summary, batch) batch.expect_table_columns_to_match_set( column_set=name_list) batch.expect_table_row_count_to_equal(value=summary['table']['n']) diff --git a/functions/data_test/data_test.py b/functions/data_test/data_test.py index 34ea796..39cfe59 100755 --- a/functions/data_test/data_test.py +++ b/functions/data_test/data_test.py @@ -12,7 +12,7 @@ def handler(event, context): - s3 = boto3.resource('s3') + s3 = boto3.resource("s3", endpoint_url=f"http://{os.environ['S3_HOST']}:4566") if os.environ['ENVIRONMENT'] == 'local' else boto3.resource("s3") cloudfront = os.environ['QA_CLOUDFRONT'] qa_bucket_name = os.environ['QA_BUCKET'] run_name = event['run_name'] @@ -21,12 +21,13 @@ def handler(event, context): else: pipeline_config = json.loads( wr.s3.read_json(path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()) - engine = pipeline_config[run_name]['engine'] - source_root = event['source_root'] - source_input = event['source_data'] - # coverage_config = json.loads(s3.Object(qa_bucket_name,"test_configs/test_coverage.json" ).get()['Body'].read().decode('utf-8')) - coverage_config = json.loads(wr.s3.read_json(path=f"s3://{qa_bucket_name}/test_configs/test_coverage.json").to_json()) - mapping_config = json.loads(wr.s3.read_json(path=f"s3://{qa_bucket_name}/test_configs/mapping.json").to_json()) + engine = pipeline_config[run_name]["engine"] + source_root = event["source_root"] + source_input = event["source_data"] + coverage_config = json.loads( + s3.Object(qa_bucket_name, "test_configs/test_coverage.json").get()["Body"].read().decode("utf-8")) + mapping_config = json.loads( + s3.Object(qa_bucket_name, "test_configs/mapping.json").get()["Body"].read().decode("utf-8")) if type(source_input) is not list: source = [source_input] else: @@ -36,14 +37,20 @@ def handler(event, context): else: source_extension = get_file_extension(source[0]) source_name = get_source_name(source[0], source_extension) - final_ds, path = prepare_final_ds(source, engine, source_root, run_name, source_name) suite_name = f"{source_name}_{run_name}" try: source_covered = coverage_config[suite_name]['complexSuite'] except (IndexError, KeyError) as e: source_covered = False + try: + suite_coverage_config = coverage_config[suite_name] + except (IndexError, KeyError) as e: + suite_coverage_config = None + + final_ds, path = prepare_final_ds(source, engine, source_root, run_name, source_name, suite_coverage_config) - profile_link, folder_key, config = profile_data(final_ds, suite_name, cloudfront, source_root, source_covered, mapping_config, run_name) + profile_link, folder_key, config = profile_data(final_ds, suite_name, cloudfront, source_root, source_covered, + mapping_config, run_name) validate_id = validate_data(final_ds, suite_name, config) test_suite = f"{cloudfront}/data_docs/validations/{validate_id}.html" diff --git a/functions/data_test/datasource.py b/functions/data_test/datasource.py index 3e1360f..9bd4bb6 100644 --- a/functions/data_test/datasource.py +++ b/functions/data_test/datasource.py @@ -22,7 +22,7 @@ def get_file_extension(source): return pathlib.Path(source).suffix[1:] -def read_source(source, engine, extension, run_name, table_name=None): +def read_source(source, engine, extension, run_name, table_name=None, coverage_config=None): path = source if engine == 's3': if extension == 'csv': @@ -51,13 +51,27 @@ def read_source(source, engine, extension, run_name, table_name=None): try: sort_keys_config = json.loads( wr.s3.read_json(path=f"s3://{qa_bucket_name}/test_configs/sort_keys.json").to_json()) - sort_key = sort_keys_config[table_name]['sortKey'] + sort_key = list(map(str.lower, sort_keys_config[table_name]["sortKey"])) except KeyError: sort_key = ['update_dt'] + try: + target_table = coverage_config["targetTable"] + except (KeyError, IndexError, TypeError) as e: + target_table = None + if target_table: + table_name = target_table con = wr.redshift.connect(secret_id=redshift_secret, dbname=redshift_db) - if final_df.nunique()[sort_key][0] > 1: + try: + nunique = final_df.nunique()[sort_key][0] + except (KeyError,IndexError) as e: + nunique = final_df.nunique()[sort_key] + + if nunique > 1: min_key = final_df[sort_key].min() max_key = final_df[sort_key].max() + if type(min_key) != str or type(max_key) != str: + min_key = final_df[sort_key].min()[0] + max_key = final_df[sort_key].max()[0] sql_query = f"SELECT * FROM public.{table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'" final_df = wr.redshift.unload( sql=sql_query, @@ -66,8 +80,10 @@ def read_source(source, engine, extension, run_name, table_name=None): ) con.close() else: - key = str(final_df[sort_key].loc[0]) - sql_query = f"SELECT * FROM public.{table_name} WHERE {sort_key[0]}=\\'{key}\\'" + key = final_df[sort_key].values[0] + if type(key) != str: + key = str(key[0]) + sql_query = f"SELECT * FROM {table_name}.{table_name} WHERE {sort_key[0]}=\\'{key}\\'" final_df = wr.redshift.unload( sql=sql_query, con=con, @@ -114,10 +130,6 @@ def read_source(source, engine, extension, run_name, table_name=None): except KeyError: print('Op column not exist') return final_df, path - elif engine == 'postgresql': - return 5 - elif engine == 'snowflake': - return 6 else: return wr.s3.read_parquet(path=source), path @@ -127,7 +139,7 @@ def get_source_name(source, extension): return result.group(1) if result else None -def prepare_final_ds(source, engine, source_engine, run_name, source_name=None): +def prepare_final_ds(source, engine, source_engine, run_name, source_name=None, coverage_config=None): path = source if engine == 's3': source = concat_source_list(source, source_engine) @@ -139,5 +151,5 @@ def prepare_final_ds(source, engine, source_engine, run_name, source_name=None): df, path = read_source(source, engine, source_extension, run_name, source_name) else: source = concat_source_list(source, source_engine) - df, path = read_source(source, engine, None, run_name, source_name) + df, path = read_source(source, engine, None, run_name, source_name,coverage_config) return df, path diff --git a/functions/data_test/profiling.py b/functions/data_test/profiling.py index 2b700cc..ad31247 100755 --- a/functions/data_test/profiling.py +++ b/functions/data_test/profiling.py @@ -144,13 +144,12 @@ def add_local_s3_to_data_docs(data_docs_sites): return data_docs_sites -def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,mapping_config,run_name): - try: - mapping_schema = mapping_config[suite_name.split('_')[0]] - except KeyError: - mapping_schema = None - +def remove_suffix(input_string, suffix): + if suffix and input_string.endswith(suffix): + return input_string[:-len(suffix)] + return input_string +def profile_data(df, suite_name, cloudfront, datasource_root, source_covered, mapping_config, run_name): qa_bucket = s3.Bucket(qa_bucket_name) config = change_ge_config(datasource_root) context_ge = BaseDataContext(project_config=config) @@ -174,12 +173,13 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,map ExpectationsReport.to_expectation_suite = ExpectationsReportNew.to_expectation_suite suite = profile.to_expectation_suite( data_context=context_ge, - suite_name=suite_name, + suite_name=remove_suffix(suite_name,f"_{run_name}"), + run_name = run_name, save_suite=True, run_validation=False, build_data_docs=False, reuse_suite=reuse_suite, - mapping_schema=mapping_schema, + mapping_config=mapping_config, use_old_suite=use_old_suite_only, old_suite_name=old_suite_name, handler=MyExpectationHandler(profile.typeset) diff --git a/functions/data_test/suite_run.py b/functions/data_test/suite_run.py index 67189b6..0ff20fc 100644 --- a/functions/data_test/suite_run.py +++ b/functions/data_test/suite_run.py @@ -12,7 +12,7 @@ import boto3 BASE_DIR = Path(__file__).resolve().parent from great_expectations.data_context import BaseDataContext -def validate_data(file,suite_name,config): +def validate_data(file, suite_name, config): s3 = boto3.resource("s3") context_ge = BaseDataContext(project_config=config) diff --git a/functions/report_push/push_data_report.py b/functions/report_push/push_data_report.py index d823970..5945c5e 100644 --- a/functions/report_push/push_data_report.py +++ b/functions/report_push/push_data_report.py @@ -34,7 +34,11 @@ def handler(event, context): bucket = s3.Bucket(qa_bucket) created_bug_count = 0 items = [] - df = wr.s3.read_json(path=[f's3://{qa_bucket}/allure/{suite}/{key}/allure-report/history/history-trend.json']) + df = wr.s3.read_json( + path=[ + f"s3://{qa_bucket}/allure/{suite}/{key}/allure-report/history/history-trend.json" + ] + ) history = json.loads(df.to_json()) total = history['data']['0']['total'] failed = history['data']['0']['failed'] @@ -63,15 +67,14 @@ def handler(event, context): with table.batch_writer() as batch: for item in items: - batch.put_item( - Item=item - ) + batch.put_item(Item=item) try: pipeline_config = json.loads( wr.s3.read_json(path=f"s3://{qa_bucket}/test_configs/pipeline.json").to_json()) autobug = pipeline_config[run_name]['autobug'] except KeyError: + autobug = False print(f"Can't find config for {run_name}") if autobug: @@ -129,7 +132,7 @@ def handler(event, context): def create_jira_bugs_from_allure_result(bucket, key, replaced_allure_links, suite, jira_project_key): created_bug_count = 0 - all_result_files = bucket.objects.filter(Prefix=f'allure/{suite}/{key}/result/') + all_result_files = bucket.objects.filter(Prefix=f"allure/{suite}/{key}/result/") issues = get_all_issues(jira_project_key) for result_file_name in all_result_files: if result_file_name.key.endswith('result.json'): @@ -138,9 +141,15 @@ def create_jira_bugs_from_allure_result(bucket, key, replaced_allure_links, suit status = data_in_file['status'] if status == "failed": created_bug_count += 1 - table_name = data_in_file['labels'][1]['value'] - fail_step = data_in_file['steps'][0]['name'] - description = data_in_file['description'] - open_bug(table_name[:table_name.find('.')], fail_step[:fail_step.find('.')], description, - f'https://{replaced_allure_links}', issues, jira_project_key) + table_name = data_in_file["labels"][1]["value"] + fail_step = data_in_file["steps"][0]["name"] + description = data_in_file["description"] + open_bug( + table_name[: table_name.find(".")], + fail_step[: fail_step.find(".")], + description, + f"https://{replaced_allure_links}", + issues, + jira_project_key, + ) return created_bug_count