Skip to content

Commit

Permalink
remove redunant cleanup_cache function and behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Waldorf committed Feb 5, 2025
1 parent 03f9c11 commit 056b438
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 38 deletions.
18 changes: 0 additions & 18 deletions universql/plugins/snow.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,24 +921,6 @@ def _get_cache_snapshot(self, monitored_dirs):
pass
return files

def cleanup_cache(self, monitored_dirs):
"""
Removes files in the monitored directories.
Args:
before_snapshot: Set of file paths that existed before COPY
monitored_dirs: List of directory paths to clean up
Note:
Logs warnings for files it fails to remove but continues execution
"""
files_in_cache = self._get_cache_snapshot(monitored_dirs)
for file_path in files_in_cache:
try:
os.remove(file_path)
except (FileNotFoundError, PermissionError):
print(f"Could not remove cached file: {file_path}")

def _load_file_format(self, file_format):
file_format_queries = {
"JSON": ["INSTALL json;", "LOAD json;"],
Expand Down
33 changes: 13 additions & 20 deletions universql/protocol/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def perform_query(self, alternative_executor: Executor, raw_query, ast=None) ->
locations = self.get_table_paths_from_catalog(alternative_executor.catalog, tables_list)
with sentry_sdk.start_span(op=op_name, name="Execute query"):
current_ast = ast
clear_files_after_execution = None
for transform in self.transforms:
try:
current_ast = transform.transform_sql(current_ast, alternative_executor)
Expand All @@ -189,26 +188,20 @@ def perform_query(self, alternative_executor: Executor, raw_query, ast=None) ->
message = f"Unable to perform transformation {transform.__class__}"
logger.error(message, exc_info=e)
raise QueryError(f"{message}: {str(e)}")
max_query_attempts = 1
if isinstance(ast, Copy):
stage_plugin = next(transform for transform in self.transforms
if transform.__class__.__name__ == "SnowflakeStageUniversqlPlugin")
before_query_snapshot = stage_plugin.get_snapshot_of_copy_file_directories(ast)
clear_files_after_execution = True
query_attempts = 0
try:
while query_attempts < 1:
try:
new_locations = alternative_executor.execute(current_ast, self.catalog_executor, locations)
break
except Exception as e:
print(f"There was an issue executing this query: {e}. Trying again.")
query_attempts += 1
if query_attempts == 3:
raise e # Re-raise the last exception if all attempts fail
time.sleep(query_attempts * 1.0/2)
finally:
if clear_files_after_execution == True and stage_plugin is not None and before_query_snapshot is not None:
stage_plugin.cleanup_cache(before_query_snapshot)
max_query_attempts = 3
query_attempt = 0
while query_attempt < max_query_attempts:
try:
new_locations = alternative_executor.execute(current_ast, self.catalog_executor, locations)
break
except Exception as e:
print(f"There was an issue executing this query: {e}. Trying again.")
query_attempt += 1
if query_attempt >= max_attempts:
raise e # Re-raise the last exception if all attempts fail
time.sleep(query_attempt * 1.0/2)
if new_locations is not None:
with sentry_sdk.start_span(op=op_name, name="Register new locations"):
self.catalog.register_locations(new_locations)
Expand Down

0 comments on commit 056b438

Please sign in to comment.