Skip to content

Commit

Permalink
rename some functions in snow.py
Browse files: browse the repository at this point in the history
  • Loading branch information
Ryan Waldorf committed Feb 8, 2025
1 parent 0692ba2 commit d87c402
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions universql/plugins/snow.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -301,7 +301,7 @@

DUCKDB_SUPPORTED_FILE_TYPES = ['CSV', 'JSON', 'AVRO', 'PARQUET']

DISALLOWED_PARAMS_BY_FORMAT_COPY = {
DISALLOWED_PARAMS_BY_FORMAT = {
"JSON": {
"ignore_errors": ["ALWAYS_REMOVE"],
"nullstr": ["ALWAYS_REMOVE"],
Expand Down Expand Up @@ -362,10 +362,10 @@ def transform_ast(self, expression: Expression, target_executor: DuckDBExecutor)
return expression

files_list = self._find_files(expression)
processed_file_data_copy = self.get_file_info_copy(files_list, expression)
processed_file_data = self.get_file_info(files_list, expression)

credentials = self._get_credentials_for_copy(processed_file_data_copy, target_executor)
final_ast = self.transform_copy_into_insert_into_select(expression, processed_file_data_copy)
credentials = self._get_credentials_for(processed_file_data, target_executor)
final_ast = self.transform_copy_into_insert_into_select(expression, processed_file_data)
return final_ast

def transform_copy_into_insert_into_select(self, expression, copy_data):
Expand Down Expand Up @@ -503,7 +503,7 @@ def get_stage_name(self, file: sqlglot.exp.Table):
return full_string[1:i]
return full_string[1:i]

def get_file_info_copy(self, files, ast):
def get_file_info(self, files, ast):
copy_data = {
"files": {},
"file_parameters": {}
Expand All @@ -515,7 +515,7 @@ def get_file_info_copy(self, files, ast):
cursor = self.source_executor().catalog.cursor()

for file in files:
raw_duckdb_copy_params = self.get_duckdb_copy_params_copy(file, specified_copy_params, cursor, "SNOWFLAKE")
raw_duckdb_copy_params = self.get_duckdb_copy_params(file, specified_copy_params, cursor, "SNOWFLAKE")
raw_duckdb_copy_params["METADATA"] = raw_duckdb_copy_params["METADATA"] | file
copy_data["files"][file["object_name"]] = raw_duckdb_copy_params["METADATA"]
if copy_data["file_parameters"] == {}:
Expand Down Expand Up @@ -550,7 +550,7 @@ def _extract_copy_params(self, ast):
print(f"{param_name} expressions")
return params

def _get_credentials_for_copy(self, copy_data, target_executor):
def _get_credentials_for(self, copy_data, target_executor):
s3_access_key_id = copy_data.get("s3_access_key_id")
s3_secret_access_key = copy_data.get("s3_secret_access_key")
if s3_access_key_id is None or s3_secret_access_key is None:
Expand All @@ -572,7 +572,7 @@ def _iterate_through_expressions(self, expressions):
result[property_name] = property_value
return result

def get_duckdb_copy_params_copy(self, file, file_format_params, cursor, catalog_type):
def get_duckdb_copy_params(self, file, file_format_params, cursor, catalog_type):
"""
Retrieves and processes Snowflake file metadata.
Expand Down Expand Up @@ -624,11 +624,11 @@ def get_duckdb_copy_params_copy(self, file, file_format_params, cursor, catalog_
property_info["snowflake_property_value"] = property_value
copy_params[property_name] = property_info

duckdb_data = self.convert_to_duckdb_properties_copy(copy_params)
duckdb_data = self.convert_to_duckdb_properties(copy_params)

return duckdb_data

def convert_to_duckdb_properties_copy(self, copy_properties):
def convert_to_duckdb_properties(self, copy_properties):
"""
Maps Snowflake stage properties to DuckDB file reading options.
Expand All @@ -648,7 +648,7 @@ def convert_to_duckdb_properties_copy(self, copy_properties):
# pp(copy_properties)

for snowflake_property_name, snowflake_property_info in copy_properties.items():
converted_properties = self.convert_properties_copy(
converted_properties = self.convert_properties(
file_format, snowflake_property_name, snowflake_property_info)
duckdb_property_name, property_values = next(
iter(converted_properties.items()))
Expand All @@ -669,7 +669,7 @@ def convert_to_duckdb_properties_copy(self, copy_properties):
all_converted_properties["METADATA"] = metadata
return all_converted_properties

def convert_properties_copy(self, file_format, snowflake_property_name, snowflake_property_info):
def convert_properties(self, file_format, snowflake_property_name, snowflake_property_info):

no_match = {
"duckdb_property_name": None,
Expand All @@ -683,14 +683,14 @@ def convert_properties_copy(self, file_format, snowflake_property_name, snowflak
"duckdb_property_type": duckdb_property_type
} | snowflake_property_info | {"snowflake_property_name": snowflake_property_name}
if duckdb_property_name is not None:
value = self._format_value_for_duckdb_copy(
value = self._format_value_for_duckdb(
file_format, snowflake_property_name, properties)
properties["duckdb_property_value"] = value
else:
properties["duckdb_property_value"] = None
return {duckdb_property_name: properties}

def _format_value_for_duckdb_copy(self, file_format, snowflake_property_name, data):
def _format_value_for_duckdb(self, file_format, snowflake_property_name, data):
snowflake_type = data["snowflake_property_type"]
duckdb_type = data["duckdb_property_type"]
snowflake_value = data["snowflake_property_value"]
Expand Down Expand Up @@ -754,7 +754,7 @@ def convert_copy_params_to_read_datatype_params(self, params):
"""

converted_params = []
params = self.apply_param_post_processing_copy(params)
params = self.apply_param_post_processing(params)

for property_name, property_info in params.items():
if property_name == "dateformat" and property_info["duckdb_property_value"] == 'AUTO':
Expand All @@ -781,20 +781,20 @@ def convert_copy_params_to_read_datatype_params(self, params):
converted_params.append(converted_param)
return converted_params

def apply_param_post_processing_copy(self, params):
def apply_param_post_processing(self, params):
format = self.get_file_format(params)
params = self._remove_problematic_params_copy(params, format)
params = self._remove_problematic_params(params, format)
params = self._add_required_params(params, format)
params = self._add_empty_field_as_null_to_nullstr(params)
return params

# Look up the file format stored in `params`.
#
# params: mapping that must contain a "format" entry, which is itself a
#         mapping with a "duckdb_property_value" key. Both levels are
#         subscripted directly, so a missing key raises KeyError.
# Returns whatever is stored at params["format"]["duckdb_property_value"].
# NOTE(review): callers in this file call .upper() on the result, so it is
# presumably a string -- confirm against where `params` is built.
def get_file_format(self, params):
return params["format"]["duckdb_property_value"]

def _remove_problematic_params_copy(self, params, format):
def _remove_problematic_params(self, params, format):

disallowed_params = DISALLOWED_PARAMS_BY_FORMAT_COPY.get(
format.upper(), {}) | DISALLOWED_PARAMS_BY_FORMAT_COPY.get("ALL_FORMATS", {})
disallowed_params = DISALLOWED_PARAMS_BY_FORMAT.get(
format.upper(), {}) | DISALLOWED_PARAMS_BY_FORMAT.get("ALL_FORMATS", {})

for disallowed_param, disallowed_values in disallowed_params.items():
if disallowed_values[0] in ("ALWAYS_REMOVE"):
Expand Down

0 comments on commit d87c402

Please sign in to comment.