diff --git a/.gitignore b/.gitignore
index dcaaf205..24be4a11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
+__pycache__
 examples/
 *.tgz
 charts/
 .idea/
-*.code-workspace
\ No newline at end of file
+*.code-workspace
diff --git a/scripts/Dockerfile b/scripts/Dockerfile
new file mode 100644
index 00000000..d29166ff
--- /dev/null
+++ b/scripts/Dockerfile
@@ -0,0 +1,13 @@
+FROM python:3
+
+WORKDIR /app
+
+COPY helpers.py /app/
+COPY convert.py /app/
+COPY mappings.py /app/
+
+RUN pip install pyyaml
+
+ENTRYPOINT ["python3", "convert.py"]
+
+CMD ["-e", "values.yaml"]
\ No newline at end of file
diff --git a/scripts/README.md b/scripts/README.md
new file mode 100644
index 00000000..8fd1eaba
--- /dev/null
+++ b/scripts/README.md
@@ -0,0 +1,43 @@
+# Anchore Engine to Enterprise Helm Chart Value File Converter
+
+This script converts an Anchore Engine values file to the values file format expected by the Anchore Enterprise Helm chart.
+
+## Prerequisites
+
+- Docker: Make sure you have Docker installed on your machine.
+
+## Usage
+
+1. **Build or Pull the Docker Image**:
+
+   To build the image yourself, run the following command from the `scripts` directory:
+
+   ```bash
+   docker build -t script-container .
+   ```
+
+   Alternatively, a prebuilt image is available at `docker.io/anchore/enterprise-helm-migrator:latest`
+
+2. **Run the Docker Container**:
+
+   Run the Docker container with the following command, changing the file name as needed:
+
+   ```bash
+   export VALUES_FILE_NAME=my-values-file.yaml
+   docker run -v ${PWD}:/tmp -v ${PWD}/${VALUES_FILE_NAME}:/app/${VALUES_FILE_NAME} docker.io/anchore/enterprise-helm-migrator:latest -e /app/${VALUES_FILE_NAME} -d /tmp/output
+   ```
+
+   This command mounts a local volume for the output files, mounts the input file into the container, and passes the input file path to the converter using the `-e` flag.
+
+3. **Retrieve Output**:
+
+   After running the Docker container, the converted Helm chart values file will be available in the `${PWD}/output` directory on your local machine.
+
+## Important Note
+
+Please ensure that you have reviewed and understood the content of the input file before running the conversion. This script is specifically tailored to convert Anchore Engine values files to the format expected by the Anchore Enterprise Helm chart.
+
+## Disclaimer
+
+This script is provided as-is and is intended to help reduce the friction of migrating from the anchore-engine chart to the enterprise chart. It is your responsibility to ensure that any modifications or usage of the script align with your requirements and best practices.
+
+For any issues or suggestions related to the script or Docker image, feel free to create an issue or pull request in this repository.
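+
+## Running Without Docker
+
+As a rough sketch (assuming a local Python 3 with PyYAML installed), the converter can also be run directly from the `scripts` directory; the `-e` and `-d` flags are the same ones passed to the container above:
+
+```bash
+pip install pyyaml
+python3 convert.py -e my-values-file.yaml -d output
+```
+
+The converted file is written to the directory given by `-d`, alongside a `logs` subdirectory recording keys that were renamed, turned into environment variables, or dropped.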
diff --git a/scripts/convert.py b/scripts/convert.py
new file mode 100644
index 00000000..f9f4a5a3
--- /dev/null
+++ b/scripts/convert.py
@@ -0,0 +1,25 @@
+import sys
+sys.dont_write_bytecode = True
+
+import argparse
+from helpers import convert_values_file
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Ingests a values file, changes the keys based on a declared map, then writes out a new values file")
+    parser.add_argument(
+        "-e", "--engine-file",
+        type=str,
+        help="Path to the original values file being ingested",
+        default=""
+    )
+    parser.add_argument(
+        "-d", "--results-dir",
+        type=str,
+        help="Directory to put the resulting files in",
+        default="enterprise-values"
+    )
+
+    args = parser.parse_args()
+    engine_file = args.engine_file
+    results_dir = args.results_dir
+    convert_values_file(file=engine_file, results_dir=results_dir)
\ No newline at end of file
diff --git a/scripts/helpers.py b/scripts/helpers.py
new file mode 100644
index 00000000..6ec992dc
--- /dev/null
+++ b/scripts/helpers.py
@@ -0,0 +1,269 @@
+import copy
+import os
+import pathlib
+import shutil
+import yaml
+
+from mappings import (
+    KEYS_WITHOUT_CHANGES,
+    KUBERNETES_KEYS,
+    TOP_LEVEL_MAPPING,
+    FULL_CHANGE_KEY_MAPPING, LEVEL_TWO_CHANGE_KEY_MAPPING, LEVEL_THREE_CHANGE_KEY_MAPPING,
+    DEPENDENCY_CHARTS,
+    ENTERPRISE_ENV_VAR_MAPPING, FEEDS_ENV_VAR_MAPPING,
+    DEPRECATED_KEYS, CHECK_LAST,
+    POST_PROCESSING
+)
+
+def represent_block_scalar(dumper, data):
+    style = "|" if "\n" in data else '"'
+    return dumper.represent_scalar("tag:yaml.org,2002:str", data, style=style)
+
+def convert_values_file(file, results_dir):
+    file_name = os.path.basename(file)
+    prep_dir(path=results_dir, clean=True)
+
+    with open(file, 'r') as content:
+        parsed_data = yaml.safe_load(content)
+
+    dot_string_dict = dict_keys_to_dot_string(parsed_data)
+    write_to_file(data=str("\n".join(f"{key} = {val}" for key, val in dot_string_dict.items())), output_file=os.path.join(results_dir, "dotstring.txt"), write_mode="w")
+
+    enterprise_chart_values_dict, enterprise_chart_env_var_dict = replace_keys_with_mappings(dot_string_dict, results_dir)
+
+    for key, val in enterprise_chart_env_var_dict.items():
+        if isinstance(val, list):
+            enterprise_chart_values_dict[key] = enterprise_chart_values_dict[key] + val
+        elif isinstance(val, dict):
+            enterprise_chart_values_dict[key] = enterprise_chart_values_dict.get(key, {})
+            enterprise_chart_values_dict[key]["extraEnv"] = enterprise_chart_values_dict[key].get("extraEnv", [])
+            enterprise_chart_values_dict[key]["extraEnv"] = enterprise_chart_values_dict[key]["extraEnv"] + val.get("extraEnv", [])
+
+    yaml.add_representer(str, represent_block_scalar)
+    yaml_data = yaml.dump(enterprise_chart_values_dict, default_flow_style=False)
+    file_name = f"enterprise.{file_name}"
+    write_to_file(data=yaml_data, output_file=os.path.join(results_dir, file_name), write_mode="w")
+
+def write_to_file(data, output_file, write_mode='w'):
+    file_parent_dir = pathlib.Path(output_file).parent
+    prep_dir(file_parent_dir)
+    with open(f"{output_file}", write_mode) as file:
+        file.write(data)
+    return f"{output_file}"
+
+def prep_dir(path, clean=False):
+    if clean:
+        if pathlib.Path(path).is_dir():
+            shutil.rmtree(path)
+    if not pathlib.Path(path).is_dir():
+        pathlib.Path(path).mkdir(parents=True, exist_ok=True)
+    return path
+
+# returns a dictionary where the keys are the dot string representation of the original nested keys
+# and the values are the original values
+def dict_keys_to_dot_string(dictionary, prefix=''):
+    result = {}
+    for key, value in dictionary.items():
+        full_key = f'{prefix}.{key}' if prefix else key
+        if isinstance(value, dict) and bool(value):
+            sub_dict = dict_keys_to_dot_string(value, full_key)
+            result.update(sub_dict)
+        else:
+            result[full_key] = value
+    return result
+
+# returns the resulting dictionary that will be used to create the new values file
+def replace_keys_with_mappings(dot_string_dict, results_dir):
+    result = {}
+    env_var_results = {}
+    keys_without_changes = KEYS_WITHOUT_CHANGES
+    top_level_mapping = TOP_LEVEL_MAPPING
+    kubernetes_keys = KUBERNETES_KEYS
+    full_change_key_mapping = FULL_CHANGE_KEY_MAPPING
+
+    level_two_change_key_mapping = LEVEL_TWO_CHANGE_KEY_MAPPING
+    level_three_change_key_mapping = LEVEL_THREE_CHANGE_KEY_MAPPING
+
+    enterprise_env_var_mapping = ENTERPRISE_ENV_VAR_MAPPING
+    feeds_env_var_mapping = FEEDS_ENV_VAR_MAPPING
+    deprecated_keys = DEPRECATED_KEYS
+    dependency_charts_keys = DEPENDENCY_CHARTS
+    check_last = CHECK_LAST
+    post_processing = POST_PROCESSING
+
+    env_var_mapping = {**enterprise_env_var_mapping, **feeds_env_var_mapping}
+    logs_dir = f"{results_dir}/logs"
+    for dotstring_key, val in dot_string_dict.items():
+        keys = dotstring_key.split('.')
+
+        if deprecated_keys.get(dotstring_key):
+            log_file_name = "warning.log"
+            write_to_file(f"{dotstring_key}: no longer used\n", os.path.join(logs_dir, log_file_name), "a")
+            continue
+
+        # serviceName.annotations
+        if len(keys) > 1 and keys[1] in ['annotations', 'labels', 'nodeSelector', 'affinity', 'deploymentAnnotations']:
+            if val != {}:
+                val = {
+                    '.'.join(keys[2:]): val
+                }
+                keys = keys[:2]
+        # serviceName.service.annotations
+        elif len(keys) > 2 and keys[2] in ['annotations', 'labels']:
+            if val != {}:
+                val = {
+                    '.'.join(keys[3:]): val
+                }
+                keys = keys[:3]
+
+        update_result = False
+        errored = True
+
+        if dotstring_key in post_processing:
+            pp_val = post_processing.get(dotstring_key)
+            action = pp_val.get("action")
+            if action == "split_value":
+                delimiter = pp_val.get("split_on")
+                new_vals = val.split(delimiter)
+                new_keys = pp_val.get("new_keys")
+                combined_dict = dict(zip(new_keys, new_vals))
+                for new_key, new_val in combined_dict.items():
+                    dict_key = create_dict_entry(new_key, new_val)
+                    result = merge_dicts(result, dict_key)
+                continue
+            elif action == "merge":
+                merge_keys = pp_val.get("merge_keys")
+                merged_val = []
+                for merge_key in merge_keys:
+                    merged_val.append(dot_string_dict.get(merge_key))
+                merged_val = ":".join(merged_val)
+
+                dotstring_key = pp_val.get("new_key")
+                dict_key = create_dict_entry(dotstring_key, merged_val)
+                result = merge_dicts(result, dict_key)
+                continue
+            elif action == "duplicate":
+                new_keys = pp_val.get("new_keys")
+                for dotstring_key in new_keys:
+                    dict_key = create_dict_entry(dotstring_key, copy.deepcopy(val))
+                    result = merge_dicts(result, dict_key)
+                continue
+            elif action == "key_addition":
+                new_keys = pp_val.get("new_keys")
+                for new_key in new_keys:
+                    key = new_key[0]
+                    value = new_key[1]
+                    if value == "default":
+                        value = val
+                    dict_key = create_dict_entry(key, value)
+                    result = merge_dicts(result, dict_key)
+                continue
+
+        if not update_result:
+            if full_change_key_mapping.get(dotstring_key):
+                dotstring_key = full_change_key_mapping.get(dotstring_key)
+                update_result = True
+            elif len(keys) > 1:
+                level_three_replacement = False
+                if len(keys) > 2:
+                    level_three_replacement = level_three_change_key_mapping.get(f"{keys[0]}.{keys[1]}.{keys[2]}", False)
+                level_two_replacement = level_two_change_key_mapping.get(f"{keys[0]}.{keys[1]}", False)
+                top_level_key = top_level_mapping.get(f"{keys[0]}", False)
+
+                if level_three_replacement:
+                    # replace the first three keys of the original
+                    dotstring_key = create_new_dotstring(keys=keys, dotstring=level_three_replacement, level=3)
+                    update_result = True
+                # if it's not a level 3 replacement, check if it's a level 2 replacement
+                elif level_two_replacement:
+                    dotstring_key = create_new_dotstring(keys=keys, dotstring=level_two_replacement, level=2)
+                    update_result = True
+                elif top_level_key and (f"{keys[1]}" in kubernetes_keys):
+                    keys[0] = top_level_key
+                    dotstring_key = ".".join(keys)
+                    update_result = True
+
+        if not update_result:
+            if env_var_mapping.get(dotstring_key):
+                extra_environment_variable = env_var_mapping.get(dotstring_key)
+
+                environment_variable_name = extra_environment_variable.split(".")[-1]
+                service_name = ""
+                if len(extra_environment_variable.split(".")) > 1:
+                    service_name = extra_environment_variable.split(".")[0]
+
+                message = f"{dotstring_key} is now an environment variable: {environment_variable_name}"
+                log_file_name = "alert.log"
+                write_to_file(f"{message}\n", os.path.join(logs_dir, log_file_name), "a")
+
+                env_dict = {"name": environment_variable_name, "value": val}
+
+                if service_name != "":
+                    env_var_results[service_name] = env_var_results.get(service_name, {})
+                    if env_var_results[service_name].get("extraEnv"):
+                        env_var_results[service_name]["extraEnv"].append(env_dict)
+                    else:
+                        env_var_results[service_name]["extraEnv"] = [env_dict]
+                else:
+                    env_var_results["extraEnv"] = env_var_results.get("extraEnv", [])
+                    env_var_results["extraEnv"].append(env_dict)
+                continue
+
+            elif f"{keys[0]}" in keys_without_changes:
+                log_file_name = "info.log"
+                write_to_file(f"{dotstring_key}: being carried over directly because there should be no changes\n", os.path.join(logs_dir, log_file_name), "a")
+                update_result = True
+            elif dependency_charts_keys.get(f"{keys[0]}"):
+                new_dep_key = dependency_charts_keys.get(f"{keys[0]}")
+                log_file_name = "dependency-chart-alert.log"
+                write_to_file(f"{dotstring_key}: {keys[0]} changed to {new_dep_key} but inner keys should be checked.\n", os.path.join(logs_dir, log_file_name), "a")
+                keys[0] = new_dep_key
+                dotstring_key = ".".join(keys)
+                update_result = True
+            elif f"{keys[0]}" in check_last:
+                keys.pop(0)
+                dotstring_key = ".".join(keys)
+                update_result = True
+
+        if update_result:
+            dict_key = create_dict_entry(dotstring_key, val)
+            result = merge_dicts(result, dict_key)
+        elif errored:
+            if dotstring_key.split('.')[0] in deprecated_keys:
+                message = f"{dotstring_key}: not found. likely deprecated.\n"
+            else:
+                message = f"{dotstring_key}: not found.\n"
+            log_file_name = "error.log"
+            write_to_file(message, os.path.join(logs_dir, log_file_name), "a")
+    return result, env_var_results
+
+def create_new_dotstring(keys: list, dotstring: str, level: int) -> str:
+    new_keys = dotstring.split(".")
+    new_keys.extend(keys[level:])
+    dotstring_key = ".".join(new_keys)
+    return dotstring_key
+
+def create_dict_entry(dotstring, value):
+    result = {}
+    current_dict = result
+    keys = dotstring.split('.')
+
+    for index, key in enumerate(keys):
+        if index == len(keys) - 1:
+            current_dict[key] = value
+        else:
+            # creates the key with an empty map as a value because there's more to come
+            current_dict[key] = {}
+            current_dict = current_dict[key]
+    return result
+
+def merge_dicts(dict1, dict2):
+    merged_dict = dict1.copy()
+
+    for key, value in dict2.items():
+        if key in merged_dict and isinstance(merged_dict[key], dict) and isinstance(value, dict):
+            merged_dict[key] = merge_dicts(merged_dict[key], value)
+        else:
+            merged_dict[key] = value
+
+    return merged_dict
\ No newline at end of file
diff --git a/scripts/mappings.py b/scripts/mappings.py
new file mode 100644
index 00000000..90d329d1
--- /dev/null
+++ b/scripts/mappings.py
@@ -0,0 +1,328 @@
+# If the first-level key is one of these, carry the key over unchanged
+KEYS_WITHOUT_CHANGES = {
+    "cloudsql",
+    "ingress",
+    "fullnameOverride"
+}
+
+# Checked last: if nothing else matched and the key starts with one of these, drop the prefix, e.g. anchoreGlobal.something -> something
+CHECK_LAST = {
+    "anchoreEnterpriseGlobal",
+    "anchoreGlobal"
+}
+
+# If the first-level key is a dependency chart and nothing in the mappings matched, rename it and log to file so the inner keys can be reviewed
+DEPENDENCY_CHARTS = {
+    "anchore-feeds-db": "feeds-db",
+    "anchore-feeds-gem-db": "gem-db",
+    "anchore-ui-redis": "ui-redis",
+    "postgresql": "postgresql",
+    "ui-redis": "ui-redis"
+}
+
+# if second key is in this list, replace first key with the value from TOP_LEVEL_MAPPING
+KUBERNETES_KEYS = {
+    "affinity",
+    "annotations",
+    "deploymentAnnotations",
+    "extraEnv",
+    "labels",
+    "nodeSelector",
+    "replicaCount",
+    "resources",
+    "service",
+    "tolerations",
+    "serviceAccountName"
+}
+TOP_LEVEL_MAPPING = {
+    "anchore-feeds-db": "feeds.feeds-db",
+    "anchore-feeds-gem-db": "feeds.gem-db",
+    "anchore-ui-redis": "ui-redis",
+    "anchoreAnalyzer": "analyzer",
+    "anchoreApi": "api",
+    "anchoreCatalog": "catalog",
+    "anchoreEnterpriseEngineUpgradeJob": "upgradeJob",
+    "anchoreEnterpriseFeeds": "feeds",
+    "anchoreEnterpriseFeedsUpgradeJob": "feeds.feedsUpgradeJob",
+    "anchoreEnterpriseNotifications": "notifications",
+    "anchoreEnterpriseRbac": "rbacManager",
+    "anchoreEnterpriseReports": "reports",
+    "anchoreEnterpriseUi": "ui",
+    "anchorePolicyEngine": "policyEngine",
+    "anchoreSimpleQueue": "simpleQueue",
+    "ingress": "ingress"
+}
+
+LEVEL_TWO_CHANGE_KEY_MAPPING = {
+    "anchore-feeds-db.externalEndpoint": "feeds.feeds-db.externalEndpoint",
+    "anchoreEnterpriseUi.customLinks": "anchoreConfig.ui.custom_links",
+    "anchoreEnterpriseUi.enableAddRepositories": "anchoreConfig.ui.enable_add_repositories",
+    "anchoreEnterpriseFeeds.url": "feeds.url",
+    ########################################################################
+    ################ TEST configfile, set malware stuff ####################
+    ########################################################################
+    "anchoreAnalyzer.configFile": "anchoreConfig.analyzer.configFile",
+    "anchoreApi.external": "anchoreConfig.apiext.external",
+    "anchoreCatalog.analysis_archive": 
"anchoreConfig.catalog.analysis_archive", + "anchoreCatalog.cycleTimers": "anchoreConfig.catalog.cycle_timers", + "anchoreCatalog.events": "anchoreConfig.catalog.event_log", + "anchoreCatalog.object_store": "anchoreConfig.catalog.object_store", + "anchoreEnterpriseEngineUpgradeJob.enabled": "upgradeJob.enabled", + "anchoreEnterpriseFeeds.cycleTimers": "feeds.anchoreConfig.feeds.cycle_timers", + "anchoreEnterpriseFeeds.dbConfig": "feeds.anchoreConfig.dbConfig", + "anchoreEnterpriseFeeds.debianExtraReleases": "feeds.anchoreConfig.feeds.drivers.debian.releases", + + "anchoreEnterpriseFeeds.gemDriverEnabled": "feeds.anchoreConfig.feeds.drivers.gem.enabled", + "anchoreEnterpriseFeeds.githubDriverEnabled": "feeds.anchoreConfig.feeds.drivers.github.enabled", + "anchoreEnterpriseFeeds.githubDriverToken": "feeds.anchoreConfig.feeds.drivers.github.token", + + "anchoreEnterpriseFeeds.msrcWhitelist": "feeds.anchoreConfig.feeds.drivers.msrc.whitelist", + "anchoreEnterpriseFeeds.msrcDriverEnabled": "feeds.anchoreConfig.feeds.drivers.msrc.enabled", + + "anchoreEnterpriseFeeds.npmDriverEnabled": "feeds.anchoreConfig.feeds.drivers.npm.enabled", + + + "anchoreEnterpriseFeeds.persistence": "feeds.persistence", + "anchoreEnterpriseFeeds.ubuntuExtraReleases": "feeds.anchoreConfig.feeds.drivers.ubuntu.releases", + + "anchoreEnterpriseFeedsUpgradeJob.enabled": "feeds.feedsUpgradeJob.enabled", + "anchoreEnterpriseNotifications.cycleTimers": "anchoreConfig.notifications.cycle_timers", + "anchoreEnterpriseReports.cycleTimers": "anchoreConfig.reports_worker.cycle_timers", + "anchoreEnterpriseUi.appDBConfig": "anchoreConfig.ui.appdb_config", + "anchoreEnterpriseUi.authenticationLock": "anchoreConfig.ui.authentication_lock", + "anchoreEnterpriseUi.existingSecretName": "ui.existingSecretName", + "anchoreEnterpriseUi.image": "ui.image", + "anchoreEnterpriseUi.imagePullPolicy": "ui.imagePullPolicy", + "anchoreEnterpriseUi.ldapsRootCaCertName": "ui.ldapsRootCaCertName", + "anchoreGlobal.dbConfig": "anchoreConfig.database", + "anchoreGlobal.internalServicesSsl": "anchoreConfig.internalServicesSSL", + "anchoreGlobal.policyBundles": "anchoreConfig.policyBundles", + "anchoreGlobal.webhooks": "anchoreConfig.webhooks", + "anchorePolicyEngine.cycleTimers": "anchoreConfig.policy_engine.cycle_timers", + "anchorePolicyEngine.overrideFeedsToUpstream": "anchoreConfig.policy_engine.overrideFeedsToUpstream", + + "postgresql.externalEndpoint": "postgresql.externalEndpoint", + "postgresql.persistence": "postgresql.primary.persistence", + "postgresql.extraEnv": "postgresql.primary.extraEnvVars", + "anchore-feeds-db.extraEnv": "feeds.feeds-db.primary.extraEnvVars", + "anchore-feeds-gem-db.extraEnv": "feeds.gem-db.primary.extraEnvVars", + + "anchore-feeds-gem-db.persistence": "feeds.gem-db.primary.persistence", + "anchore-feeds-db.persistence": "feeds.feeds-db.primary.persistence", + + "anchoreEnterpriseRbac.managerResources": "rbacManager.resources", + "anchoreEnterpriseRbac.authResources": "rbacAuth.resources", +} + +LEVEL_THREE_CHANGE_KEY_MAPPING = { + "anchore-feeds-db.persistence.resourcePolicy": "feeds.feeds-db.primary.persistence.resourcePolicy", + "anchore-feeds-db.persistence.size": "feeds.feeds-db.primary.persistence.size", + "anchoreAnalyzer.cycleTimers.image_analyzer": "anchoreConfig.analyzer.cycle_timers.image_analyzer", + "anchoreGlobal.saml.secret": "anchoreConfig.keys.secret", +} + +# We need to go all the way down to the value. 
Replace the whole original key +FULL_CHANGE_KEY_MAPPING = { + "postgresql.enabled": "postgresql.chartEnabled", + "postgresql.postgresDatabase": "postgresql.auth.database", + "postgresql.postgresPassword": "postgresql.auth.password", + "postgresql.postgresUser": "postgresql.auth.username", + "postgresql.postgresPort": "postgresql.primary.service.ports.postgresql", + "postgresql.imageTag": "postgresql.imageTag", + + "anchore-feeds-db.imageTag": "feeds.feeds-db.image.tag", + "anchore-feeds-gem-db.imageTag": "feeds.gem-db.image.tag", + "anchore-feeds-db.enabled": "feeds.feeds-db.chartEnabled", + + "anchore-feeds-db.postgresDatabase": "feeds.feeds-db.auth.database", + "anchore-feeds-db.postgresPassword": "feeds.feeds-db.auth.password", + "anchore-feeds-db.postgresPort": "feeds.feeds-db.primary.service.ports.postgresql", + "anchore-feeds-db.postgresUser": "feeds.feeds-db.auth.username", + + "anchore-feeds-gem-db.enabled": "feeds.gem-db.chartEnabled", + "anchore-feeds-gem-db.externalEndpoint": "feeds.gem-db.externalEndpoint", + + + "anchore-feeds-gem-db.postgresDatabase": "feeds.gem-db.auth.database", + "anchore-feeds-gem-db.postgresPassword": "feeds.gem-db.auth.password", + "anchore-feeds-gem-db.postgresPort": "feeds.gem-db.primary.service.ports.postgresql", + "anchore-feeds-gem-db.postgresUser": "feeds.gem-db.auth.username", + + + "anchoreAnalyzer.concurrentTasksPerWorker": "anchoreConfig.analyzer.max_threads", + "anchoreAnalyzer.containerPort": "analyzer.service.port", + "anchoreAnalyzer.enableHints": "anchoreConfig.analyzer.enable_hints", + + "anchoreAnalyzer.layerCacheMaxGigabytes": "anchoreConfig.analyzer.layer_cache_max_gigabytes", + "anchoreApi.external.use_tls": "anchoreConfig.apiext.external.useTLS", + "anchoreCatalog.downAnalyzerTaskRequeue": "anchoreConfig.catalog.down_analyzer_task_requeue", + "anchoreCatalog.runtimeInventory.imageTTLDays": "anchoreConfig.catalog.runtime_inventory.image_ttl_days", + "anchoreEnterpriseFeeds.enabled": "feeds.chartEnabled", + "anchoreEnterpriseFeeds.nvdDriverApiKey": "feeds.anchoreConfig.feeds.drivers.nvdv2.api_key", + "anchoreEnterpriseNotifications.uiUrl": "anchoreConfig.notifications.ui_url", + + "anchoreEnterpriseRbac.service.managerPort": "rbacManager.service.port", + "anchoreEnterpriseRbac.service.type": "rbacManager.service.type", + + + "anchoreEnterpriseReports.dataEgressWindow": "anchoreConfig.reports_worker.data_egress_window", + "anchoreEnterpriseReports.dataLoadMaxWorkers": "anchoreConfig.reports_worker.data_load_max_workers", + "anchoreEnterpriseReports.dataRefreshMaxWorkers": "anchoreConfig.reports_worker.data_refresh_max_workers", + "anchoreEnterpriseReports.enableDataEgress": "anchoreConfig.reports_worker.enable_data_egress", + "anchoreEnterpriseReports.enableDataIngress": "anchoreConfig.reports_worker.enable_data_ingress", + "anchoreEnterpriseReports.enableGraphiql": "anchoreConfig.reports.enable_graphiql", + "anchoreEnterpriseReports.service.apiPort": "reports.service.port", + "anchoreEnterpriseUi.enableProxy": "anchoreConfig.ui.enable_proxy", + "anchoreEnterpriseUi.enableSharedLogin": "anchoreConfig.ui.enable_shared_login", + "anchoreEnterpriseUi.enableSsl": "anchoreConfig.ui.enable_ssl", + "anchoreEnterpriseUi.enrichInventoryView": "anchoreConfig.ui.enrich_inventory_view", + "anchoreEnterpriseUi.forceWebsocket": "anchoreConfig.ui.force_websocket", + "anchoreEnterpriseUi.logLevel": "anchoreConfig.ui.log_level", + "anchoreEnterpriseUi.dbUser": "ui.dbUser", + "anchoreEnterpriseUi.dbPass": "ui.dbPass", + "anchoreEnterpriseUi.redisHost": 
"anchoreConfig.ui.redis_host", + "anchoreEnterpriseUi.redisFlushdb": "anchoreConfig.ui.redis_flushdb", + "anchoreGlobal.dbConfig.connectionPoolMaxOverflow": "anchoreConfig.database.db_pool_max_overflow", + "anchoreGlobal.dbConfig.connectionPoolSize": "anchoreConfig.database.db_pool_size", + "anchoreGlobal.dbConfig.sslRootCertName": "anchoreConfig.database.sslRootCertFileName", + "anchoreGlobal.defaultAdminEmail": "anchoreConfig.default_admin_email", + "anchoreGlobal.defaultAdminPassword": "anchoreConfig.default_admin_password", + "anchoreGlobal.enableMetrics": "anchoreConfig.metrics.enabled", + "anchoreGlobal.hashedPasswords": "anchoreConfig.user_authentication.hashed_passwords", + "anchoreGlobal.internalServicesSsl.certSecretCertName": "anchoreConfig.internalServicesSSL.certSecretCertFileName", + "anchoreGlobal.internalServicesSsl.certSecretKeyName": "anchoreConfig.internalServicesSSL.certSecretKeyFileName", + "anchoreGlobal.logLevel": "anchoreConfig.log_level", + "anchoreGlobal.metricsAuthDisabled": "anchoreConfig.metrics.auth_disabled", + "anchoreGlobal.oauthEnabled": "anchoreConfig.user_authentication.oauth.enabled", + "anchoreGlobal.oauthRefreshTokenExpirationSeconds": "anchoreConfig.user_authentication.oauth.refresh_token_expiration_seconds", + "anchoreGlobal.oauthTokenExpirationSeconds": "anchoreConfig.user_authentication.oauth.default_token_expiration_seconds", + "anchoreGlobal.saml.privateKeyName": "anchoreConfig.keys.privateKeyFileName", + "anchoreGlobal.saml.publicKeyName": "anchoreConfig.keys.publicKeyFileName", + "anchoreGlobal.serviceDir": "anchoreConfig.service_dir", + "anchoreGlobal.ssoRequireExistingUsers": "anchoreConfig.user_authentication.sso_require_existing_users", + "cloudsql.image.pullPolicy": "cloudsql.imagePullPolicy", + "inject_secrets_via_env": "injectSecretsViaEnv", + + + "ui-redis.enabled": "ui-redis.chartEnabled", + "anchoreGlobal.allowECRUseIAMRole": "anchoreConfig.allow_awsecr_iam_auto", +} + +#### ENGINE TO ENTERPRISE FOR KEYS THAT ARE NOW ENV VARS #### +ENTERPRISE_ENV_VAR_MAPPING = { + "anchoreAnalyzer.maxRequestThreads": "analyzer.ANCHORE_MAX_REQUEST_THREADS", + "anchoreAnalyzer.enableOwnedPackageFiltering": "analyzer.ANCHORE_OWNED_PACKAGE_FILTERING_ENABLED", + "anchoreApi.maxRequestThreads": "api.ANCHORE_MAX_REQUEST_THREADS", + "anchoreCatalog.maxRequestThreads": "catalog.ANCHORE_MAX_REQUEST_THREADS", + "anchoreCatalog.imageGCMaxWorkerThreads": "catalog.ANCHORE_CATALOG_IMAGE_GC_WORKERS", + + "anchoreEnterpriseNotifications.maxRequestThreads": "notifications.ANCHORE_MAX_REQUEST_THREADS", + "anchoreEnterpriseRbac.maxRequestThreads": "rbacAuth.ANCHORE_MAX_REQUEST_THREADS", + "anchoreEnterpriseReports.maxRequestThreads": "reports.ANCHORE_MAX_REQUEST_THREADS", + + "anchoreGlobal.clientConnectTimeout": "ANCHORE_GLOBAL_CLIENT_CONNECT_TIMEOUT", + "anchoreGlobal.clientReadTimeout": "ANCHORE_GLOBAL_CLIENT_READ_TIMEOUT", + "anchoreGlobal.maxCompressedImageSizeMB": "ANCHORE_MAX_COMPRESSED_IMAGE_SIZE_MB", + "anchoreGlobal.serverRequestTimeout": "ANCHORE_GLOBAL_SERVER_REQUEST_TIMEOUT_SEC", + "anchoreGlobal.syncGithub": "ANCHORE_FEEDS_GITHUB_ENABLED", + "anchoreGlobal.syncPackages": "ANCHORE_FEEDS_PACKAGES_ENABLED", + "anchoreGlobal.syncVulnerabilites": "ANCHORE_FEEDS_VULNERABILITIES_ENABLED", + "anchoreGlobal.syncNvd": "ANCHORE_FEEDS_DRIVER_NVDV2_ENABLED", + "anchoreGlobal.imageAnalyzeTimeoutSeconds": "ANCHORE_IMAGE_ANALYZE_TIMEOUT_SECONDS", + + "anchorePolicyEngine.cacheTTL": "policyEngine.ANCHORE_POLICY_EVAL_CACHE_TTL_SECONDS", + 
"anchorePolicyEngine.enablePackageDbLoad": "policyEngine.ANCHORE_POLICY_ENGINE_ENABLE_PACKAGE_DB_LOAD", + "anchorePolicyEngine.maxRequestThreads": "policyEngine.ANCHORE_MAX_REQUEST_THREADS", + "anchoreSimpleQueue.maxRequestThreads": "simpleQueue.ANCHORE_MAX_REQUEST_THREADS", + +} + +#### ENGINE TO FEEDS KEYS THAT ARE NOW ENV VARS #### +FEEDS_ENV_VAR_MAPPING = { + + "anchoreEnterpriseFeeds.alpineDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_ALPINE_ENABLED", + "anchoreEnterpriseFeeds.amazonDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_AMAZON_ENABLED", + "anchoreEnterpriseFeeds.anchoreMatchExclusionsEnabled": "feeds.ANCHORE_FEEDS_DRIVER_MATCH_EXCLUSIONS", + "anchoreEnterpriseFeeds.apiOnly": "feeds.ANCHORE_FEEDS_API_ONLY", + "anchoreEnterpriseFeeds.chainguardDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_CHAINGUARD_ENABLED", + "anchoreEnterpriseFeeds.debianDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_DEBIAN_ENABLED", + "anchoreEnterpriseFeeds.grypeDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_GRYPEDB_ENABLED", + "anchoreEnterpriseFeeds.grypedbPersistProviderWorkspaces": "feeds.ANCHORE_FEEDS_GRYPEDB_PERSIST_WORKSPACE", + "anchoreEnterpriseFeeds.grypedbPreloadEnabled": "feeds.ANCHORE_FEEDS_GRYPEDB_PRELOAD_ENABLED", + "anchoreEnterpriseFeeds.grypedbPreloadWorkspaceArchivePath": "feeds.ANCHORE_FEEDS_GRYPEDB_PRELOAD_PATH", + "anchoreEnterpriseFeeds.grypedbRestoreProviderWorkspaces": "feeds.ANCHORE_FEEDS_GRYPEDB_RESTORE_WORKSPACE", + "anchoreEnterpriseFeeds.maxRequestThreads": "feeds.ANCHORE_MAX_REQUEST_THREADS", + + "anchoreEnterpriseFeeds.olDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_OL_ENABLED", + "anchoreEnterpriseFeeds.rhelDriverConcurrency": "feeds.ANCHORE_FEEDS_DRIVER_RHEL_CONCURRENCY", + "anchoreEnterpriseFeeds.rhelDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_RHEL_ENBALED", + "anchoreEnterpriseFeeds.slesDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_SLES_ENABLED", + "anchoreEnterpriseFeeds.ubuntuDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_UBUNTU_ENABLED", + "anchoreEnterpriseFeeds.ubuntuDriverGitBranch": "feeds.ANCHORE_FEEDS_DRIVER_UBUNTU_BRANCH", + "anchoreEnterpriseFeeds.ubuntuDriverGitUrl": "feeds.ANCHORE_FEEDS_DRIVER_UBUNTU_URL", + "anchoreEnterpriseFeeds.wolfiDriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_WOLFI_ENABLED", + "anchoreEnterpriseFeeds.nvdv2DriverEnabled": "feeds.ANCHORE_FEEDS_DRIVER_NVDV2_ENABLED", +} + +#### VALUES THAT ARE NO LONGER PART OF THE NEW CHART #### +DEPRECATED_KEYS = { + + "anchoreEngineUpgradeJob": "deprecated", + + "anchoreEnterpriseFeeds.nvdDriverEnabled": "deprecated", + "anchoreEnterpriseFeeds.useNvdDriverApiKey": "deprecated", + + "anchoreEnterpriseGlobal.enabled": "deprecated", + "anchoreEnterpriseNotifications.enabled": "deprecated", + "anchoreEnterpriseRbac.enabled": "deprecated", + "anchoreEnterpriseRbac.service.authPort": "8089", + "anchoreEnterpriseReports.enabled": "deprecated", + "anchoreEnterpriseUi.enabled": "deprecated", + "anchoreGlobal.feedsConnectionTimeout": "3", + "anchoreGlobal.feedsReadTimeout": "60", + "anchoreGlobal.image": "deprecated", + + "anchoreGlobal.imagePullPolicy": "deprecated", + "anchoreGlobal.imagePullSecretName": "deprecated", + "anchoreGlobal.syncGrypeDB": "true", + "anchoreGlobal.webhooksEnabled": "deprecated", + "postgresql.persistence.resourcePolicy": "deprecated", + "anchoreGlobal.saml.useExistingSecret": "deprecated", + "anchoreEnterpriseReports.service.workerPort": "deprecated", +} + +POST_PROCESSING = { + "postgresql.image": { + "action": "split_value", + "split_on": ":", + "new_keys": ("postgresql.image.repository", 
"postgresql.image.tag") + }, + "anchore-feeds-db.image": { + "action": "split_value", + "split_on": ":", + "new_keys": ("feeds.feeds-db.image.repository", "feeds.feeds-db.image.tag") + }, + "anchore-feeds-gem-db.image": { + "action": "split_value", + "split_on": ":", + "new_keys": ("feeds.gem-db.image.repository", "feeds.gem-db.image.tag") + }, + "cloudsql.image.repository": { + "action": "merge", + "merge_keys": ("cloudsql.image.repository", "cloudsql.image.tag"), + "new_key": "cloudsql.image" + }, + "cloudsql.image.tag": { + "action": "merge", + "merge_keys": ("cloudsql.image.repository", "cloudsql.image.tag"), + "new_key": "cloudsql.image" + }, + "anchoreEnterpriseRbac.extraEnv": { + "action": "duplicate", + "new_keys": ["rbacAuth.extraEnv", "rbacManager.extraEnv"] + }, + "anchoreEnterpriseFeeds.existingSecretName": { + "action": "key_addition", + "new_keys": [("feeds.existingSecretName", "default"), ("feeds.useExistingSecrets", True)] + } +} diff --git a/scripts/tests/configs/test_convert_values_file.yaml b/scripts/tests/configs/test_convert_values_file.yaml new file mode 100644 index 00000000..8d2d86e3 --- /dev/null +++ b/scripts/tests/configs/test_convert_values_file.yaml @@ -0,0 +1,15 @@ +anchoreEnterpriseGlobal: + enabled: true + +anchoreGlobal: + useExistingSecrets: true + existingSecretName: global-existing-secrets + +anchoreEnterpriseUi: + existingSecretName: ui-existing-secrets + +anchoreEnterpriseFeeds: + existingSecretName: feeds-existing-secrets + +anchoreApi: + maxRequestThreads: 9876543210 \ No newline at end of file diff --git a/scripts/tests/configs/test_convert_values_file_result.yaml b/scripts/tests/configs/test_convert_values_file_result.yaml new file mode 100644 index 00000000..18fa8a86 --- /dev/null +++ b/scripts/tests/configs/test_convert_values_file_result.yaml @@ -0,0 +1,11 @@ +"existingSecretName": "global-existing-secrets" +"feeds": + "existingSecretName": "feeds-existing-secrets" + "useExistingSecrets": true +"ui": + "existingSecretName": "ui-existing-secrets" +"useExistingSecrets": true +"api": + "extraEnv": + - "name": "ANCHORE_MAX_REQUEST_THREADS" + "value": 9876543210 \ No newline at end of file diff --git a/scripts/tests/test_anchoreAnalyzer_value_mapping.py b/scripts/tests/test_anchoreAnalyzer_value_mapping.py new file mode 100644 index 00000000..5e112c70 --- /dev/null +++ b/scripts/tests/test_anchoreAnalyzer_value_mapping.py @@ -0,0 +1,345 @@ +import os +import shutil +import unittest +from helpers import ( + replace_keys_with_mappings, +) + +class TestReplaceKeysWithMappingsAnalyzer(unittest.TestCase): + def setUp(self): + self.results_dir = "test_results_dir" + + def tearDown(self): + if os.path.exists(self.results_dir): + shutil.rmtree(self.results_dir) + + def test_anchoreAnalyzer_replicaCount_value(self): + dot_string_dict = { + "anchoreAnalyzer.replicaCount": 2, + } + expected_result = { + 'analyzer': { + 'replicaCount': 2 + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_containerPort_value(self): + dot_string_dict = { + "anchoreAnalyzer.containerPort": 8084, + } + expected_result = { + 'analyzer': { + 'service': { + 'port': 8084 + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_extraEnv_value(self): + dot_string_dict = { + "anchoreAnalyzer.extraEnv": [ + { + "name": "foo", + "value": "bar" + } + ] + } + expected_result = { + 
'analyzer': { + 'extraEnv': [ + { + 'name': 'foo', + 'value': 'bar' + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_serviceAccountName_value(self): + dot_string_dict = { + "anchoreAnalyzer.serviceAccountName": "foo", + } + expected_result = { + 'analyzer': { + 'serviceAccountName': 'foo' + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_resources_value(self): + dot_string_dict = { + "anchoreAnalyzer.resources.limits.cpu": 1, + "anchoreAnalyzer.resources.limits.memory": "4G", + "anchoreAnalyzer.resources.requests.cpu": 1, + "anchoreAnalyzer.resources.requests.memory": "1G", + } + expected_result = { + 'analyzer': { + 'resources': { + 'limits': { + 'cpu': 1, + 'memory': '4G' + }, + 'requests': { + 'cpu': 1, + 'memory': '1G' + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_labels_value(self): + dot_string_dict = { + "anchoreAnalyzer.labels.name": "foo", + "anchoreAnalyzer.labels.value": "bar", + "anchoreAnalyzer.labels.kubernetes.io/description": "baz", + } + expected_result = { + 'analyzer': { + 'labels': + { + 'name': 'foo', + 'value': 'bar', + 'kubernetes.io/description': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_annotations_value(self): + dot_string_dict = { + "anchoreAnalyzer.annotations.name": "foo", + "anchoreAnalyzer.annotations.value": "bar", + "anchoreAnalyzer.annotations.kubernetes.io/description": "baz", + } + expected_result = { + 'analyzer': { + 'annotations': + { + 'name': 'foo', + 'value': 'bar', + 'kubernetes.io/description': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreanalyzer_deploymentAnnotations_value(self): + dot_string_dict = { + "anchoreAnalyzer.deploymentAnnotations.name": "foo", + "anchoreAnalyzer.deploymentAnnotations.value": "bar", + } + expected_result = { + 'analyzer': { + 'deploymentAnnotations': { + 'name': 'foo', + 'value': 'bar' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_nodeSelector_value(self): + dot_string_dict = { + "anchoreAnalyzer.nodeSelector.name": "foo", + "anchoreAnalyzer.nodeSelector.value": "bar", + + } + expected_result = { + 'analyzer': { + 'nodeSelector': + { + 'name': 'foo', + 'value': 'bar' + } + + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_tolerations_value(self): + dot_string_dict = { + "anchoreAnalyzer.tolerations": [ + { + "name": "foo", + "value": "bar" + } + ] + } + expected_result = { + 'analyzer': { + 'tolerations': [ + { + 'name': 'foo', + 'value': 'bar' + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_affinity_value(self): + dot_string_dict = { + "anchoreAnalyzer.affinity.name": "foo", + "anchoreAnalyzer.affinity.value": "bar", + } + expected_result = { + 'analyzer': { + 'affinity': { + 'name': 'foo', + 'value': 'bar' + } + } 
+ } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_cycleTimers_image_analyzer_value(self): + dot_string_dict = { + "anchoreAnalyzer.cycleTimers.image_analyzer": 1, + } + expected_result = { + 'anchoreConfig': { + 'analyzer': { + 'cycle_timers': { + 'image_analyzer': 1 + } + } + } + + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_concurrentTasksPerWorker_value(self): + dot_string_dict = { + "anchoreAnalyzer.concurrentTasksPerWorker": 1, + } + expected_result = { + 'anchoreConfig': { + 'analyzer': { + 'max_threads': 1 + } + } + + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_layerCacheMaxGigabytes_value(self): + dot_string_dict = { + "anchoreAnalyzer.layerCacheMaxGigabytes": 1, + } + expected_result = { + 'anchoreConfig': { + 'analyzer': { + 'layer_cache_max_gigabytes': 1 + } + } + + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_enableHints_value(self): + dot_string_dict = { + "anchoreAnalyzer.enableHints": False, + } + expected_result = { + 'anchoreConfig': { + 'analyzer': { + 'enable_hints': False + } + } + + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreAnalyzer_configFile_value(self): + dot_string_dict = { + "anchoreAnalyzer.configFile.retrieve_files.file_list": [ + "/etc/passwd" + ], + "anchoreAnalyzer.configFile.secret_search.match_params": [ + "MAXFILESIZE=10000", + "STOREONMATCH=n" + ], + "anchoreAnalyzer.configFile.secret_search.regexp_match": [ + "AWS_ACCESS_KEY=(?i).*aws_access_key_id( *=+ *).*(?/", + } + + expected_result = { + 'anchoreConfig': { + 'webhooks': { + 'ssl_verify': False, + 'url': 'http://somehost:9090//', + 'webhook_pass': 'my-webhook-pass', + 'webhook_user': 'my-webhook-user' + } + } + } + + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreGlobal_policyBundles_values(self): + dot_string_dict = { + 'anchoreGlobal.policyBundles.custom_policy_bundle1.json': '{\n "id": "custom1",\n "version": "1_0",\n "name": "My custom bundle",\n "comment": "My system\'s custom bundle",\n "whitelisted_images": [],\n "blacklisted_images": [],\n "mappings": [],\n "whitelists": [],\n "policies": []\n}\n' + } + + expected_result = { + 'anchoreConfig': { + 'policyBundles': { + 'custom_policy_bundle1': { + 'json': '{\n "id": "custom1",\n "version": "1_0",\n "name": "My custom bundle",\n "comment": "My system\'s custom bundle",\n "whitelisted_images": [],\n "blacklisted_images": [],\n "mappings": [],\n "whitelists": [],\n "policies": []\n}\n' + } + } + } + } + + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreGlobal_probes_values(self): + dot_string_dict = { + "anchoreGlobal.probes.liveness.initialDelaySeconds": 120, + "anchoreGlobal.probes.liveness.timeoutSeconds": 10, + "anchoreGlobal.probes.liveness.periodSeconds": 10, + "anchoreGlobal.probes.liveness.failureThreshold": 6, + "anchoreGlobal.probes.liveness.successThreshold": 1, + "anchoreGlobal.probes.readiness.timeoutSeconds": 10, + 
"anchoreGlobal.probes.readiness.periodSeconds": 10, + "anchoreGlobal.probes.readiness.failureThreshold": 3, + "anchoreGlobal.probes.readiness.successThreshold": 1, + } + + expected_result = { + 'probes': { + 'liveness': { + 'failureThreshold': 6, + 'initialDelaySeconds': 120, + 'periodSeconds': 10, + 'successThreshold': 1, + 'timeoutSeconds': 10 + }, + 'readiness': { + 'failureThreshold': 3, + 'periodSeconds': 10, + 'successThreshold': 1, + 'timeoutSeconds': 10 + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + # inject_secrets_via_env: false + def test_anchoreGlobal_inject_secrets_via_env_value(self): + dot_string_dict = { + "inject_secrets_via_env": True, + } + expected_result = { + 'injectSecretsViaEnv': True + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + # def test_replace_keys_with_mappings_env_var(self): + + # dot_string_dict = {"anchoreApi.maxRequestThreads": 999} + # expected_result = { + # 'api': + # {'extraEnv': [ + # {'name': 'ANCHORE_MAX_REQUEST_THREADS', 'value': 999} + # ]} + # } + # result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + # self.assertEqual(result[1], expected_result) + + # def test_replace_keys_with_mappings(self): + + # dot_string_dict = {"anchore-feeds-db.persistence.size": 100} + # expected_result = { + # "feeds": { + # "feeds-db": { + # "primary": { + # "persistence": { + # "size": 100 + # } + # } + # } + # } + # } + # result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + # self.assertEqual(result[0], expected_result) + + # now an environment variable + def test_anchoreGlobal_serverRequestTimeout_value(self): + dot_string_dict = { + "anchoreGlobal.serverRequestTimeout": 300, + } + expected_result = {} + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + expected_env_result = { + 'extraEnv': + [ + { + 'name': 'ANCHORE_GLOBAL_SERVER_REQUEST_TIMEOUT_SEC', + 'value': 300 + } + ] + } + self.assertEqual(result[1], expected_env_result) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/scripts/tests/test_anchorePolicyEngine_value_mapping.py b/scripts/tests/test_anchorePolicyEngine_value_mapping.py new file mode 100644 index 00000000..904166f2 --- /dev/null +++ b/scripts/tests/test_anchorePolicyEngine_value_mapping.py @@ -0,0 +1,299 @@ +import os +import shutil +import unittest +from helpers import ( + replace_keys_with_mappings, +) + +class TestReplaceKeysWithMappingsPolicyEngine(unittest.TestCase): + def setUp(self): + self.results_dir = "test_results_dir" + + def tearDown(self): + if os.path.exists(self.results_dir): + shutil.rmtree(self.results_dir) + + def test_anchorePolicyEngine_replicaCount_value(self): + dot_string_dict = { + "anchorePolicyEngine.replicaCount": 2, + } + expected_result = { + 'policyEngine': { + 'replicaCount': 2 + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + + def test_anchorePolicyEngine_resources_value(self): + dot_string_dict = { + "anchorePolicyEngine.resources.limits.cpu": 1, + "anchorePolicyEngine.resources.limits.memory": "4G", + "anchorePolicyEngine.resources.requests.cpu": 1, + "anchorePolicyEngine.resources.requests.memory": "1G" + } + expected_result = { + 'policyEngine': { + 'resources': { + 'limits': { + 'cpu': 1, + 'memory': '4G' + 
}, + 'requests': { + 'cpu': 1, + 'memory': '1G' + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_labels_value(self): + dot_string_dict = { + "anchorePolicyEngine.labels.foobar": "baz", + "anchorePolicyEngine.labels.with.a.dot.foobar": "baz" + } + expected_result = { + 'policyEngine': { + 'labels': + { + 'foobar': 'baz', + 'with.a.dot.foobar': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_annotations_value(self): + dot_string_dict = { + "anchorePolicyEngine.annotations.foobar": "baz", + "anchorePolicyEngine.annotations.with.a.dot.foobar": "baz" + } + expected_result = { + 'policyEngine': { + 'annotations': + { + 'foobar': 'baz', + 'with.a.dot.foobar': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_deploymentAnnotations_value(self): + dot_string_dict = { + "anchorePolicyEngine.deploymentAnnotations.foobar": "baz", + "anchorePolicyEngine.deploymentAnnotations.with.a.dot.foobar": "baz" + } + expected_result = { + 'policyEngine': { + 'deploymentAnnotations': { + 'foobar': 'baz', + 'with.a.dot.foobar': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_nodeSelector_value(self): + dot_string_dict = { + "anchorePolicyEngine.nodeSelector.name": "foo", + "anchorePolicyEngine.nodeSelector.with.a.dot.name": "bar" + } + expected_result = { + 'policyEngine': { + 'nodeSelector': { + 'name': 'foo', + 'with.a.dot.name': 'bar' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_tolerations_value(self): + dot_string_dict = { + "anchorePolicyEngine.tolerations": [ + { + "key": "key", + "operator": "Equal", + "value": "value", + "effect": "NoSchedule" + } + ] + } + expected_result = { + 'policyEngine': { + 'tolerations': [ + { + 'key': 'key', + 'operator': 'Equal', + 'value': 'value', + 'effect': 'NoSchedule' + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_affinity_value(self): + dot_string_dict = { + "anchorePolicyEngine.affinity.name": "foo", + "anchorePolicyEngine.affinity.with.a.dot.name": "bar" + } + expected_result = { + 'policyEngine': { + 'affinity': + { + 'name': 'foo', + 'with.a.dot.name': 'bar' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_extraEnv_value(self): + dot_string_dict = { + "anchorePolicyEngine.extraEnv": [ + { + "name": "foo", + "value": "bar" + } + ] + } + expected_result = { + 'policyEngine': { + 'extraEnv': [ + { + "name": "foo", + "value": "bar" + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_serviceAccountName_value(self): + dot_string_dict = { + "anchorePolicyEngine.serviceAccountName": "Null" + } + expected_result = { + 'policyEngine': { + 'serviceAccountName': "Null" + } + } + result = replace_keys_with_mappings(dot_string_dict, 
self.results_dir) + self.assertEqual(result[0], expected_result) + + + def test_anchorePolicyEngine_service_value(self): + dot_string_dict = { + "anchorePolicyEngine.service.name": "Null", + "anchorePolicyEngine.service.type": "ClusterIP", + "anchorePolicyEngine.service.port": 8087, + "anchorePolicyEngine.service.annotations.foo": "bar", + "anchorePolicyEngine.service.annotations.with.a.dot": "qux", + "anchorePolicyEngine.service.labels.foobar": "baz", + "anchorePolicyEngine.service.labels.with.a.dot": "qux", + } + + expected_result = { + 'policyEngine': { + 'service': { + "name": "Null", + "type": "ClusterIP", + "port": 8087, + "annotations": { + "foo": "bar", + "with.a.dot": "qux" + }, + "labels": { + "foobar": "baz", + "with.a.dot": "qux" + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_cycleTimers_value(self): + dot_string_dict = { + "anchorePolicyEngine.cycleTimers.feed_sync": 14400, + "anchorePolicyEngine.cycleTimers.feed_sync_checker": 3600, + "anchorePolicyEngine.cycleTimers.grypedb_sync": 60, + } + + expected_result = { + 'anchoreConfig': { + 'policy_engine': { + 'cycle_timers': { + "feed_sync": 14400, + "feed_sync_checker": 3600, + "grypedb_sync": 60, + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchorePolicyEngine_overrideFeedsToUpstream_value(self): + dot_string_dict = { + "anchorePolicyEngine.overrideFeedsToUpstream": True + } + + expected_result = { + 'anchoreConfig': { + 'policy_engine': { + 'overrideFeedsToUpstream': True + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + # Values that become environment variables for Anchore Policy Engine + def test_anchorePolicyEngine_cacheTTL_value(self): + dot_string_dict = { + "anchorePolicyEngine.cacheTTL": 3600, + } + + expected_result = { + 'policyEngine': { + 'extraEnv': [ + { + 'name': 'ANCHORE_POLICY_EVAL_CACHE_TTL_SECONDS', + 'value': 3600 + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[1], expected_result) + + def test_anchorePolicyEngine_enablePackageDbLoad_value(self): + dot_string_dict = { + "anchorePolicyEngine.enablePackageDbLoad": True, + } + + expected_result = { + 'policyEngine': { + 'extraEnv': [ + { + 'name': 'ANCHORE_POLICY_ENGINE_ENABLE_PACKAGE_DB_LOAD', + 'value': True + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[1], expected_result) \ No newline at end of file diff --git a/scripts/tests/test_anchoreSimpleQueue_value_mapping.py b/scripts/tests/test_anchoreSimpleQueue_value_mapping.py new file mode 100644 index 00000000..1f9312bf --- /dev/null +++ b/scripts/tests/test_anchoreSimpleQueue_value_mapping.py @@ -0,0 +1,236 @@ +import os +import shutil +import unittest +from helpers import ( + replace_keys_with_mappings, +) + +class TestReplaceKeysWithMappingsSimpleQueue(unittest.TestCase): + def setUp(self): + self.results_dir = "test_results_dir" + + def tearDown(self): + if os.path.exists(self.results_dir): + shutil.rmtree(self.results_dir) + + def test_anchoreSimpleQueue_replicaCount_value(self): + dot_string_dict = { + "anchoreSimpleQueue.replicaCount": 2, + } + expected_result = { + 'simpleQueue': { + 'replicaCount': 2 + } + } + result = 
replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + + def test_anchoreSimpleQueue_resources_value(self): + dot_string_dict = { + "anchoreSimpleQueue.resources.limits.cpu": 1, + "anchoreSimpleQueue.resources.limits.memory": "4G", + "anchoreSimpleQueue.resources.requests.cpu": 1, + "anchoreSimpleQueue.resources.requests.memory": "1G" + } + expected_result = { + 'simpleQueue': { + 'resources': { + 'limits': { + 'cpu': 1, + 'memory': '4G' + }, + 'requests': { + 'cpu': 1, + 'memory': '1G' + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + + def test_anchoreSimpleQueue_labels_value(self): + dot_string_dict = { + "anchoreSimpleQueue.labels.myLabel": "myValue", + "anchoreSimpleQueue.labels.myOtherLabel": "myOtherValue", + "anchoreSimpleQueue.labels.anotherLabel.with.a.dot": "qux" + } + expected_result = { + 'simpleQueue': { + 'labels': + { + 'myLabel': 'myValue', + 'myOtherLabel': 'myOtherValue', + 'anotherLabel.with.a.dot': 'qux' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_annotations_value(self): + dot_string_dict = { + "anchoreSimpleQueue.annotations.foo": "bar", + "anchoreSimpleQueue.annotations.bar": "baz", + "anchoreSimpleQueue.annotations.anotherLabel.with.a.dot": "qux" + } + expected_result = { + 'simpleQueue': { + 'annotations': + { + 'foo': 'bar', + 'bar': 'baz', + 'anotherLabel.with.a.dot': 'qux' + } + + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_deploymentAnnotations_value(self): + dot_string_dict = { + "anchoreSimpleQueue.deploymentAnnotations.foo": "bar", + "anchoreSimpleQueue.deploymentAnnotations.bar": "baz", + "anchoreSimpleQueue.deploymentAnnotations.anotherLabel.with.a.dot": "qux" + } + expected_result = { + 'simpleQueue': { + 'deploymentAnnotations': + { + 'foo': 'bar', + 'bar': 'baz', + 'anotherLabel.with.a.dot': 'qux' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_nodeSelector_value(self): + dot_string_dict = { + "anchoreSimpleQueue.nodeSelector.name": "foo", + "anchoreSimpleQueue.nodeSelector.value": "bar", + "anchoreSimpleQueue.nodeSelector.anotherLabel.with.a.dot": "baz" + } + expected_result = { + 'simpleQueue': { + 'nodeSelector': + { + 'name': 'foo', + 'value': 'bar', + 'anotherLabel.with.a.dot': 'baz' + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_tolerations_value(self): + dot_string_dict = { + "anchoreSimpleQueue.tolerations": [ + { + "name": "foo", + "value": "bar" + } + ] + } + expected_result = { + 'simpleQueue': { + 'tolerations': [ + { + 'name': 'foo', + 'value': 'bar' + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_affinity_value(self): + dot_string_dict = { + "anchoreSimpleQueue.affinity.name": "foo", + "anchoreSimpleQueue.affinity.value": "bar", + "anchoreSimpleQueue.affinity.anotherLabel.with.a.dot": "baz" + } + expected_result = { + 'simpleQueue': { + 'affinity':{ + 'name': 'foo', + 'value': 'bar', + 'anotherLabel.with.a.dot': 'baz' 
+ } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_extraEnv_value(self): + dot_string_dict = { + "anchoreSimpleQueue.extraEnv": [ + { + "name": "foo", + "value": "bar" + } + ] + } + expected_result = { + 'simpleQueue': { + 'extraEnv': [ + { + "name": "foo", + "value": "bar" + } + ] + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + def test_anchoreSimpleQueue_serviceAccountName_value(self): + dot_string_dict = { + "anchoreSimpleQueue.serviceAccountName": "Null" + } + expected_result = { + 'simpleQueue': { + 'serviceAccountName': "Null" + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) + + + def test_anchoreSimpleQueue_service_value(self): + dot_string_dict = { + "anchoreSimpleQueue.service.name": "Null", + "anchoreSimpleQueue.service.type": "ClusterIP", + "anchoreSimpleQueue.service.port": 8082, + "anchoreSimpleQueue.service.annotations.foo": "bar", + "anchoreSimpleQueue.service.annotations.baz": "qux", + "anchoreSimpleQueue.service.annotations.with.a.dot": "quux", + "anchoreSimpleQueue.service.labels.foobar": "baz", + "anchoreSimpleQueue.service.labels.with.a.dot": "qux" + } + expected_result = { + 'simpleQueue': { + 'service': { + "name": "Null", + "type": "ClusterIP", + "port": 8082, + "annotations": { + "foo": "bar", + "baz": "qux", + "with.a.dot": "quux" + }, + "labels": { + "foobar": "baz", + "with.a.dot": "qux" + } + } + } + } + result = replace_keys_with_mappings(dot_string_dict, self.results_dir) + self.assertEqual(result[0], expected_result) \ No newline at end of file diff --git a/scripts/tests/test_helpers.py b/scripts/tests/test_helpers.py new file mode 100644 index 00000000..1aded896 --- /dev/null +++ b/scripts/tests/test_helpers.py @@ -0,0 +1,313 @@ +# test_helpers.py +import os +import shutil +import unittest +import yaml +from helpers import ( + create_new_dotstring, + write_to_file, + prep_dir, + dict_keys_to_dot_string, + merge_dicts, + replace_keys_with_mappings, + create_dict_entry, + convert_values_file +) + +# write_to_file(data, file_name): writes data to file_name, returns file_name +class TestWriteToFile(unittest.TestCase): + def setUp(self): + self.test_filename = 'test_file.txt' + + def tearDown(self): + if os.path.exists(self.test_filename): + os.remove(self.test_filename) + + def test_write_to_file(self): + data = 'Hello, world!' 
diff --git a/scripts/tests/test_helpers.py b/scripts/tests/test_helpers.py
new file mode 100644
index 00000000..1aded896
--- /dev/null
+++ b/scripts/tests/test_helpers.py
@@ -0,0 +1,313 @@
+# test_helpers.py
+import os
+import shutil
+import unittest
+import yaml
+from helpers import (
+    create_new_dotstring,
+    write_to_file,
+    prep_dir,
+    dict_keys_to_dot_string,
+    merge_dicts,
+    replace_keys_with_mappings,
+    create_dict_entry,
+    convert_values_file
+)
+
+# write_to_file(data, output_file, write_mode='w'): writes data to output_file, returns output_file
+class TestWriteToFile(unittest.TestCase):
+    def setUp(self):
+        self.test_filename = 'test_file.txt'
+
+    def tearDown(self):
+        if os.path.exists(self.test_filename):
+            os.remove(self.test_filename)
+
+    def test_write_to_file(self):
+        data = 'Hello, world!'
+        file_name = write_to_file(data, self.test_filename)
+
+        self.assertTrue(os.path.exists(self.test_filename))
+        self.assertEqual(file_name, self.test_filename)
+
+        with open(self.test_filename, 'r') as file:
+            written_data = file.read()
+
+        self.assertEqual(written_data, data)
+
+# prep_dir(path, clean=False): creates path if it doesn't exist (clearing it first when clean=True), returns path
+class TestPrepDir(unittest.TestCase):
+    def empty_dir(self, directory_path):
+        # if listdir returns an empty list, the directory is empty, return True
+        return not os.listdir(directory_path)
+
+    def setUp(self):
+        self.prep_dir_name = 'prep_dir_name'
+        if os.path.exists(self.prep_dir_name):
+            shutil.rmtree(self.prep_dir_name)
+
+    def tearDown(self):
+        if os.path.exists(self.prep_dir_name):
+            shutil.rmtree(self.prep_dir_name)
+
+    def test_prep_dir_with_clean(self):
+        # create the self.prep_dir_name directory with some stuff in it to confirm it's cleared out
+        os.makedirs(self.prep_dir_name)
+        file_path = os.path.join(self.prep_dir_name, "test_file.txt")
+
+        # Create and close an empty file
+        with open(file_path, 'w'):
+            pass
+
+        self.assertFalse(self.empty_dir(self.prep_dir_name))
+
+        # clean=True deletes the whole directory, then recreates it
+        prep_dir_path = prep_dir(self.prep_dir_name, clean=True)
+        self.assertTrue(os.path.exists(self.prep_dir_name))
+        self.assertTrue(self.empty_dir(self.prep_dir_name))
+        self.assertEqual(prep_dir_path, self.prep_dir_name)
+
+    def test_prep_dir_without_clean(self):
+        # create the self.prep_dir_name directory with some stuff in it to confirm it's not cleared out
+        os.makedirs(self.prep_dir_name)
+        file_path = os.path.join(self.prep_dir_name, "test_file.txt")
+
+        # Create and close an empty file
+        with open(file_path, 'w'):
+            pass
+
+        self.assertFalse(self.empty_dir(self.prep_dir_name))
+
+        # clean=False just creates the directory if it doesn't exist
+        prep_dir_path = prep_dir(self.prep_dir_name, clean=False)
+        self.assertTrue(os.path.exists(self.prep_dir_name))
+        self.assertEqual(prep_dir_path, self.prep_dir_name)
+        self.assertFalse(self.empty_dir(self.prep_dir_name))
+
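Worth noting alongside these two classes: `write_to_file` calls `prep_dir` on the output file's parent before writing, so nested output paths need no prior setup. A short usage sketch (the file names are illustrative):

```python
from helpers import write_to_file

# The parent directories out/nested/ are created on demand before the write.
write_to_file("converted: true\n", "out/nested/values.yaml")
```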
+# dict_keys_to_dot_string(dictionary, prefix=''): recursively converts dictionary keys to dot string representation
+# returns a dictionary where the keys are the dot string representation of the old keys and
+# the values are the original values
+class TestDictKeysToDotString(unittest.TestCase):
+    def test_dict_keys_to_dotstring(self):
+        my_dict = {
+            "key1": "value1",
+            "key2": "value2",
+            "key3": {
+                "key31": "value31",
+                "key32": "value32",
+                "key33": {
+                    "key331": "value331",
+                    "key332": "value332",
+                    "key333": ["value3331", "value3332"]
+                }
+            },
+            "key4": ["value41", "value42"],
+            "key5": 5,
+            "key6": False
+        }
+
+        result = dict_keys_to_dot_string(my_dict)
+
+        self.assertIn("key1", result)
+        self.assertEqual(result["key1"], "value1")
+        self.assertTrue(isinstance(result["key1"], str))
+
+        self.assertIn("key2", result)
+        self.assertEqual(result["key2"], "value2")
+        self.assertTrue(isinstance(result["key2"], str))
+
+        self.assertIn("key3.key31", result)
+        self.assertEqual(result["key3.key31"], "value31")
+        self.assertTrue(isinstance(result["key3.key31"], str))
+
+        self.assertIn("key3.key32", result)
+        self.assertEqual(result["key3.key32"], "value32")
+        self.assertTrue(isinstance(result["key3.key32"], str))
+
+        self.assertIn("key3.key33.key331", result)
+        self.assertEqual(result["key3.key33.key331"], "value331")
+        self.assertTrue(isinstance(result["key3.key33.key331"], str))
+
+        self.assertIn("key3.key33.key332", result)
+        self.assertEqual(result["key3.key33.key332"], "value332")
+        self.assertTrue(isinstance(result["key3.key33.key332"], str))
+
+        self.assertIn("key3.key33.key333", result)
+        self.assertEqual(result["key3.key33.key333"], ["value3331", "value3332"])
+        self.assertTrue(isinstance(result["key3.key33.key333"], list))
+
+        self.assertIn("key4", result)
+        self.assertEqual(result["key4"], ["value41", "value42"])
+        self.assertTrue(isinstance(result["key4"], list))
+
+        self.assertIn("key5", result)
+        self.assertEqual(result["key5"], 5)
+        self.assertTrue(isinstance(result["key5"], int))
+
+        self.assertIn("key6", result)
+        self.assertEqual(result["key6"], False)
+        self.assertTrue(isinstance(result["key6"], bool))
+
+        self.assertTrue(isinstance(result, dict))
+
+# merge_dicts(dict1, dict2): merges dictionaries, returns merged dictionary; values from dict2 win on conflicts
+class TestMergeDicts(unittest.TestCase):
+    def test_merge_dicts(self):
+        dict1 = {
+            "key1": "value1",
+            "nested_keys": {
+                "uncommon": "uncommon_value",
+                "common": "dict1_common_value"
+            },
+            "common_key": "dict1_common_value"
+        }
+
+        dict2 = {
+            "key2": "value2",
+            "nested_keys": {
+                "common": "dict2_common_value"
+            },
+            "common_key": "dict2_common_value"
+        }
+
+        expected_dict = {
+            "key1": "value1",
+            "key2": "value2",
+            "nested_keys": {
+                "uncommon": "uncommon_value",
+                "common": "dict2_common_value"
+            },
+            "common_key": "dict2_common_value"
+        }
+
+        merge_dicts_result = merge_dicts(dict1, dict2)
+
+        self.assertEqual(merge_dicts_result, expected_dict)
+
+# create_new_dotstring(keys: list, dotstring: str, level: int) -> str
+# takes the original key as a list of segments, a dot string representation of the new key, and the level at which the replacement occurs
+# replaces the first `level` segments of the original key with the new dot string and joins the remaining segments onto it
+# returns the new key as a dot string
+class TestCreateNewDotString(unittest.TestCase):
+    def test_create_new_dotstring_level_1(self):
+        keys = ["key1", "key2", "key3"]
+        dotstring = "key4"
+        level = 1
+
+        expected_result = "key4.key2.key3"
+
+        result = create_new_dotstring(keys, dotstring, level)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_new_dotstring_level_2(self):
+        keys = ["key1", "key2", "key3"]
+        dotstring = "key4"
+        level = 2
+
+        expected_result = "key4.key3"
+
+        result = create_new_dotstring(keys, dotstring, level)
+
+        self.assertEqual(result, expected_result)
+
+    def test_create_new_dotstring_level_3(self):
+        keys = ["key1", "key2", "key3"]
+        dotstring = "key4"
+        level = 3
+
+        expected_result = "key4"
+
+        result = create_new_dotstring(keys, dotstring, level)
+
+        self.assertEqual(result, expected_result)
+
+# create_dict_entry(dotstring, value)
+# takes a dot string and a value, returns a nested dictionary whose keys are built from the dot string segments
+class TestCreateDictEntry(unittest.TestCase):
+    def test_create_dict_entry(self):
+        dotstring = "key1.key2.key3"
+        value = "value"
+
+        expected_result = {
+            "key1": {
+                "key2": {
+                    "key3": "value"
+                }
+            }
+        }
+
+        result = create_dict_entry(dotstring, value)
+
+        self.assertEqual(result, expected_result)
+
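The three level tests pin down `create_new_dotstring` tightly enough to express its contract in a couple of lines; a behavior-equivalent sketch (not the helper's actual source):

```python
def create_new_dotstring_sketch(keys, dotstring, level):
    # Drop the first `level` segments of the original key and graft the
    # remainder onto the new prefix.
    return ".".join([dotstring] + keys[level:])

assert create_new_dotstring_sketch(["key1", "key2", "key3"], "key4", 2) == "key4.key3"
```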
+# convert_values_file(file, results_dir)
+class TestConvertValuesFile(unittest.TestCase):
+    def setUp(self):
+        original_test_config_file = 'tests/configs/test_convert_values_file.yaml'
+        self.expected_result_file = 'tests/configs/test_convert_values_file_result.yaml'
+        self.temp_test_config_file = 'test_values.yaml'
+        self.test_results_dir = 'test_results_dir'
+        shutil.copy(original_test_config_file, self.temp_test_config_file)
+
+    def tearDown(self):
+        if os.path.exists(self.temp_test_config_file):
+            os.remove(self.temp_test_config_file)
+        if os.path.exists(self.test_results_dir):
+            shutil.rmtree(self.test_results_dir)
+
+    def test_convert_values_file(self):
+        convert_values_file(self.temp_test_config_file, self.test_results_dir)
+        self.assertTrue(os.path.exists(self.test_results_dir))
+        self.assertTrue(os.path.exists(os.path.join(self.test_results_dir, 'enterprise.test_values.yaml')))
+        self.assertTrue(os.path.exists(os.path.join(self.test_results_dir, 'dotstring.txt')))
+
+        with open(os.path.join(self.test_results_dir, 'enterprise.test_values.yaml'), 'r') as content:
+            converted = yaml.safe_load(content)
+
+        with open(self.expected_result_file, 'r') as expected_content:
+            expected_result = yaml.safe_load(expected_content)
+
+        self.assertEqual(converted, expected_result)
+
+# replace_keys_with_mappings(dot_string_dict, results_dir):
+# returns a tuple of dictionaries rebuilt from the dot string keys: the converted chart values, then the extraEnv entries
+class TestReplaceKeysWithMappings(unittest.TestCase):
+    def setUp(self):
+        self.results_dir = "test_results_dir"
+
+    def tearDown(self):
+        if os.path.exists(self.results_dir):
+            shutil.rmtree(self.results_dir)
+
+    def test_replace_keys_with_mappings(self):
+        dot_string_dict = {"anchore-feeds-db.persistence.size": 100}
+        expected_result = {
+            "feeds": {
+                "feeds-db": {
+                    "primary": {
+                        "persistence": {
+                            "size": 100
+                        }
+                    }
+                }
+            }
+        }
+        result = replace_keys_with_mappings(dot_string_dict, self.results_dir)
+        self.assertEqual(result[0], expected_result)
+
+    def test_replace_keys_with_mappings_env_var(self):
+        dot_string_dict = {"anchoreApi.maxRequestThreads": 999}
+        expected_result = {
+            'api': {
+                'extraEnv': [
+                    {'name': 'ANCHORE_MAX_REQUEST_THREADS', 'value': 999}
+                ]
+            }
+        }
+        result = replace_keys_with_mappings(dot_string_dict, self.results_dir)
+        self.assertEqual(result[1], expected_result)
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
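The last test shows the second return value in action: engine keys with no direct chart equivalent become `extraEnv` entries under the owning component. A sketch of that branch, with an illustrative one-entry lookup standing in for the real `ENTERPRISE_ENV_VAR_MAPPING` table in `mappings.py`:

```python
# Hypothetical subset of the real mapping table.
ENV_VAR_NAMES = {"anchoreApi.maxRequestThreads": ("api", "ANCHORE_MAX_REQUEST_THREADS")}

def to_extra_env(dot_key, value):
    component, env_name = ENV_VAR_NAMES[dot_key]
    return {component: {"extraEnv": [{"name": env_name, "value": value}]}}

# {'api': {'extraEnv': [{'name': 'ANCHORE_MAX_REQUEST_THREADS', 'value': 999}]}}
to_extra_env("anchoreApi.maxRequestThreads", 999)
```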