Skip to content

Commit

Permalink
Updating ESApi class methods to use ES client calls instead of tempor…
Browse files Browse the repository at this point in the history
…ary wrapper methods (#3110)

Co-authored-by: Elastic Machine <[email protected]>
(cherry picked from commit c08af2e)

# Conflicts:
#	connectors/es/index.py
  • Loading branch information
mattnowzari committed Jan 29, 2025
1 parent f263840 commit c4f5933
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 76 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3466,7 +3466,7 @@ SOFTWARE
to the Commercial Software.

elasticsearch
8.17.0
8.17.1
Apache Software License

Apache License
Expand Down
96 changes: 23 additions & 73 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,64 +27,12 @@ class TemporaryConnectorApiWrapper(ESClient):
def __init__(self, elastic_config):
super().__init__(elastic_config)

async def connector_check_in(self, connector_id):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_check_in",
headers={"accept": "application/json"},
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
async def connector_get(self, connector_id, include_deleted):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_validation",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"validation": validation_result},
)

async def connector_activate_filtering_draft(self, connector_id):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_activate",
"GET",
f"/_connector/{connector_id}",
headers={"accept": "application/json"},
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_claim",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"worker_hostname": worker_hostname,
**({"sync_cursor": sync_cursor} if sync_cursor else {}),
},
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self.client.perform_request(
"POST",
"/_connector/_sync_job",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"id": connector_id,
"job_type": job_type,
"trigger_method": trigger_method,
},
)

async def connector_sync_job_update_stats(
self, sync_job_id, ingestion_stats, metadata
):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_stats",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
**ingestion_stats,
**({"metadata": metadata} if metadata else {}),
},
params={"include_deleted": include_deleted},
)


Expand All @@ -95,7 +43,7 @@ def __init__(self, elastic_config):

async def connector_check_in(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_check_in, connector_id)
partial(self.client.connector.check_in, connector_id=connector_id)
)

async def connector_put(
Expand Down Expand Up @@ -135,34 +83,36 @@ async def connector_update_filtering_draft_validation(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_update_filtering_draft_validation,
connector_id,
validation_result,
self.client.connector.update_filtering_validation,
connector_id=connector_id,
validation=validation_result,
)
)

async def connector_activate_filtering_draft(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_activate_filtering_draft, connector_id)
partial(
self.client.connector.update_active_filtering, connector_id=connector_id
)
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_claim,
sync_job_id,
worker_hostname,
sync_cursor,
self.client.connector.sync_job_claim,
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_create,
connector_id,
job_type,
trigger_method,
self.client.connector.sync_job_post,
id=connector_id,
job_type=job_type,
trigger_method=trigger_method,
)
)

Expand All @@ -171,10 +121,10 @@ async def connector_sync_job_update_stats(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_update_stats,
sync_job_id,
ingestion_stats,
metadata,
self.client.connector.sync_job_update_stats,
connector_sync_job_id=sync_job_id,
body=ingestion_stats,
metadata=metadata,
)
)

Expand Down
2 changes: 1 addition & 1 deletion requirements/framework.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aiofiles==23.2.1
aiomysql==0.1.1
httpx==0.27.0
httpx-ntlm==1.4.0
elasticsearch[async]==8.17.0
elasticsearch[async]==8.17.1
elastic-transport==8.15.1
pyyaml==6.0.1
cffi==1.16.0
Expand Down
165 changes: 164 additions & 1 deletion tests/es/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
from elasticsearch import ApiError, ConflictError

from connectors.es.index import DocumentNotFoundError, ESIndex
from connectors.es.index import DocumentNotFoundError, ESApi, ESIndex

headers = {"X-Elastic-Product": "Elasticsearch"}
config = {
Expand Down Expand Up @@ -255,3 +255,166 @@ async def test_get_all_docs(mock_responses):
assert doc_count == total

await index.close()


@pytest.mark.asyncio
async def test_es_api_connector_check_in():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_check_in(connector_id)

es_api.client.connector.check_in.assert_called_once_with(connector_id=connector_id)


@pytest.mark.asyncio
async def test_es_api_connector_put():
connector_id = "id"
service_type = "service_type"
connector_name = "connector_name"
index_name = "index_name"
is_native = True

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_put(
connector_id, service_type, connector_name, index_name, is_native
)

es_api.client.connector.put.assert_called_once_with(
connector_id=connector_id,
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)


@pytest.mark.asyncio
async def test_es_api_connector_update_scheduling():
connector_id = "id"
scheduling = {"enabled": "true", "interval": "0 4 5 1 *"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_scheduling(connector_id, scheduling)

es_api.client.connector.update_scheduling.assert_called_once_with(
connector_id=connector_id, scheduling=scheduling
)


@pytest.mark.asyncio
async def test_es_api_connector_update_configuration():
connector_id = "id"
configuration = {"config_key": "config_value"}
values = {}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_configuration(connector_id, configuration, values)

es_api.client.connector.update_configuration.assert_called_once_with(
connector_id=connector_id, configuration=configuration, values=values
)


@pytest.mark.asyncio
async def test_es_api_connector_update_filtering_draft_validation():
connector_id = "id"
validation_result = {"validation": "result"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_filtering_draft_validation(
connector_id, validation_result
)

es_api.client.connector.update_filtering_validation.assert_called_once_with(
connector_id=connector_id, validation=validation_result
)


@pytest.mark.asyncio
async def test_es_api_connector_activate_filtering_draft():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_activate_filtering_draft(connector_id)

es_api.client.connector.update_active_filtering.assert_called_once_with(
connector_id=connector_id
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_create():
connector_id = "id"
job_type = "full"
trigger_method = "on_demand"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_create(connector_id, job_type, trigger_method)

es_api.client.connector.sync_job_post.assert_called_once_with(
id=connector_id, job_type=job_type, trigger_method=trigger_method
)


@pytest.mark.asyncio
async def test_es_api_connector_get():
connector_id = "id"
include_deleted = False

es_api = ESApi(elastic_config=config)
es_api._api_wrapper = AsyncMock()

await es_api.connector_get(connector_id, include_deleted)

es_api._api_wrapper.connector_get.assert_called_once_with(
connector_id, include_deleted
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_claim():
sync_job_id = "sync_job_id_test"
worker_hostname = "workerhostname"
sync_cursor = {"foo": "bar"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_claim(sync_job_id, worker_hostname, sync_cursor)

es_api.client.connector.sync_job_claim.assert_called_once_with(
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_update_stats():
sync_job_id = "sync_job_id_test"
ingestion_stats = {"ingestion": "stat"}
metadata = {"meta": "data"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_update_stats(sync_job_id, ingestion_stats, metadata)

es_api.client.connector.sync_job_update_stats.assert_called_once_with(
connector_sync_job_id=sync_job_id, body=ingestion_stats, metadata=metadata
)

0 comments on commit c4f5933

Please sign in to comment.