Skip to content

Commit

Permalink
[8.0] Add cleanups for filters, calendars, transforms, jobs
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <[email protected]>
  • Loading branch information
github-actions[bot] and sethmlarson authored Feb 3, 2022
1 parent 7391f3b commit 44fbea3
Showing 1 changed file with 55 additions and 1 deletion.
56 changes: 55 additions & 1 deletion test_elasticsearch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def wipe_cluster(client):
wipe_tasks(client)
wipe_node_shutdown_metadata(client)
wait_for_pending_datafeeds_and_jobs(client)
wipe_calendars(client)
wipe_filters(client)
wipe_transforms(client)

wait_for_cluster_state_updates_to_finish(client)
if close_after_wipe:
Expand Down Expand Up @@ -334,7 +337,7 @@ def wait_for_pending_tasks(client, filter, timeout=30):
break


def wait_for_pending_datafeeds_and_jobs(client, timeout=30):
def wait_for_pending_datafeeds_and_jobs(client: Elasticsearch, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
resp = client.ml.get_datafeeds(datafeed_id="*", allow_no_match=True)
Expand All @@ -345,6 +348,7 @@ def wait_for_pending_datafeeds_and_jobs(client, timeout=30):
datafeed_id=datafeed["datafeed_id"]
)

end_time = time.time() + timeout
while time.time() < end_time:
resp = client.ml.get_jobs(job_id="*", allow_no_match=True)
if resp["count"] == 0:
Expand All @@ -353,6 +357,56 @@ def wait_for_pending_datafeeds_and_jobs(client, timeout=30):
client.options(ignore_status=404).ml.close_job(job_id=job["job_id"])
client.options(ignore_status=404).ml.delete_job(job_id=job["job_id"])

end_time = time.time() + timeout
while time.time() < end_time:
resp = client.ml.get_data_frame_analytics(id="*")
if resp["count"] == 0:
break
for job in resp["data_frame_analytics"]:
client.options(ignore_status=404).ml.stop_data_frame_analytics(id=job["id"])
client.options(ignore_status=404).ml.delete_data_frame_analytics(
id=job["id"]
)


def wipe_filters(client: Elasticsearch, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
resp = client.ml.get_filters(filter_id="*")
if resp["count"] == 0:
break
for filter in resp["filters"]:
client.options(ignore_status=404).ml.delete_filter(
filter_id=filter["filter_id"]
)


def wipe_calendars(client: Elasticsearch, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
resp = client.ml.get_calendars(calendar_id="*")
if resp["count"] == 0:
break
for calendar in resp["calendars"]:
client.options(ignore_status=404).ml.delete_calendar(
calendar_id=calendar["calendar_id"]
)


def wipe_transforms(client: Elasticsearch, timeout=30):
end_time = time.time() + timeout
while time.time() < end_time:
resp = client.transform.get_transform(transform_id="*")
if resp["count"] == 0:
break
for trasnform in resp["transforms"]:
client.options(ignore_status=404).transform.stop_transform(
transform_id=trasnform["id"]
)
client.options(ignore_status=404).transform.delete_transform(
transform_id=trasnform["id"]
)


def wait_for_cluster_state_updates_to_finish(client, timeout=30):
end_time = time.time() + timeout
Expand Down

0 comments on commit 44fbea3

Please sign in to comment.