Skip to content

Commit

Permalink
[8.x] Updating ESApi class methods to use ES client calls instead of …
Browse files Browse the repository at this point in the history
…temporary wrapper methods (#3110) (#3159)
  • Loading branch information
mattnowzari authored Jan 30, 2025
1 parent f263840 commit c802e8a
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 78 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3466,7 +3466,7 @@ SOFTWARE
to the Commercial Software.

elasticsearch
8.17.0
8.17.1
Apache Software License

Apache License
Expand Down
104 changes: 30 additions & 74 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,64 +27,12 @@ class TemporaryConnectorApiWrapper(ESClient):
def __init__(self, elastic_config):
super().__init__(elastic_config)

async def connector_check_in(self, connector_id):
async def connector_get(self, connector_id, include_deleted):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_check_in",
"GET",
f"/_connector/{connector_id}",
headers={"accept": "application/json"},
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_validation",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"validation": validation_result},
)

async def connector_activate_filtering_draft(self, connector_id):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_activate",
headers={"accept": "application/json"},
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_claim",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"worker_hostname": worker_hostname,
**({"sync_cursor": sync_cursor} if sync_cursor else {}),
},
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self.client.perform_request(
"POST",
"/_connector/_sync_job",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"id": connector_id,
"job_type": job_type,
"trigger_method": trigger_method,
},
)

async def connector_sync_job_update_stats(
self, sync_job_id, ingestion_stats, metadata
):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_stats",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
**ingestion_stats,
**({"metadata": metadata} if metadata else {}),
},
params={"include_deleted": include_deleted},
)


Expand All @@ -95,11 +43,16 @@ def __init__(self, elastic_config):

async def connector_check_in(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_check_in, connector_id)
partial(self.client.connector.check_in, connector_id=connector_id)
)

async def connector_get(self, connector_id, include_deleted=False):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_get, connector_id, include_deleted)
)

async def connector_put(
self, connector_id, service_type, connector_name, index_name
self, connector_id, service_type, connector_name, index_name, is_native
):
return await self._retrier.execute_with_retry(
partial(
Expand All @@ -108,6 +61,7 @@ async def connector_put(
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)
)

Expand Down Expand Up @@ -135,34 +89,36 @@ async def connector_update_filtering_draft_validation(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_update_filtering_draft_validation,
connector_id,
validation_result,
self.client.connector.update_filtering_validation,
connector_id=connector_id,
validation=validation_result,
)
)

async def connector_activate_filtering_draft(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_activate_filtering_draft, connector_id)
partial(
self.client.connector.update_active_filtering, connector_id=connector_id
)
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_claim,
sync_job_id,
worker_hostname,
sync_cursor,
self.client.connector.sync_job_claim,
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_create,
connector_id,
job_type,
trigger_method,
self.client.connector.sync_job_post,
id=connector_id,
job_type=job_type,
trigger_method=trigger_method,
)
)

Expand All @@ -171,10 +127,10 @@ async def connector_sync_job_update_stats(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_update_stats,
sync_job_id,
ingestion_stats,
metadata,
self.client.connector.sync_job_update_stats,
connector_sync_job_id=sync_job_id,
body=ingestion_stats,
metadata=metadata,
)
)

Expand Down
8 changes: 7 additions & 1 deletion connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,19 @@ async def heartbeat(self, doc_id):
await self.update(doc_id=doc_id, doc={"last_seen": iso_utc()})

async def connector_put(
self, connector_id, service_type, connector_name=None, index_name=None
self,
connector_id,
service_type,
connector_name=None,
index_name=None,
is_native=False,
):
await self.api.connector_put(
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
index_name=index_name,
is_native=is_native,
)

async def connector_update_scheduling(
Expand Down
2 changes: 1 addition & 1 deletion requirements/framework.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aiofiles==23.2.1
aiomysql==0.1.1
httpx==0.27.0
httpx-ntlm==1.4.0
elasticsearch[async]==8.17.0
elasticsearch[async]==8.17.1
elastic-transport==8.15.1
pyyaml==6.0.1
cffi==1.16.0
Expand Down
165 changes: 164 additions & 1 deletion tests/es/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
from elasticsearch import ApiError, ConflictError

from connectors.es.index import DocumentNotFoundError, ESIndex
from connectors.es.index import DocumentNotFoundError, ESApi, ESIndex

headers = {"X-Elastic-Product": "Elasticsearch"}
config = {
Expand Down Expand Up @@ -255,3 +255,166 @@ async def test_get_all_docs(mock_responses):
assert doc_count == total

await index.close()


@pytest.mark.asyncio
async def test_es_api_connector_check_in():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_check_in(connector_id)

es_api.client.connector.check_in.assert_called_once_with(connector_id=connector_id)


@pytest.mark.asyncio
async def test_es_api_connector_put():
connector_id = "id"
service_type = "service_type"
connector_name = "connector_name"
index_name = "index_name"
is_native = True

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_put(
connector_id, service_type, connector_name, index_name, is_native
)

es_api.client.connector.put.assert_called_once_with(
connector_id=connector_id,
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)


@pytest.mark.asyncio
async def test_es_api_connector_update_scheduling():
connector_id = "id"
scheduling = {"enabled": "true", "interval": "0 4 5 1 *"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_scheduling(connector_id, scheduling)

es_api.client.connector.update_scheduling.assert_called_once_with(
connector_id=connector_id, scheduling=scheduling
)


@pytest.mark.asyncio
async def test_es_api_connector_update_configuration():
connector_id = "id"
configuration = {"config_key": "config_value"}
values = {}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_configuration(connector_id, configuration, values)

es_api.client.connector.update_configuration.assert_called_once_with(
connector_id=connector_id, configuration=configuration, values=values
)


@pytest.mark.asyncio
async def test_es_api_connector_update_filtering_draft_validation():
connector_id = "id"
validation_result = {"validation": "result"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_filtering_draft_validation(
connector_id, validation_result
)

es_api.client.connector.update_filtering_validation.assert_called_once_with(
connector_id=connector_id, validation=validation_result
)


@pytest.mark.asyncio
async def test_es_api_connector_activate_filtering_draft():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_activate_filtering_draft(connector_id)

es_api.client.connector.update_active_filtering.assert_called_once_with(
connector_id=connector_id
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_create():
connector_id = "id"
job_type = "full"
trigger_method = "on_demand"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_create(connector_id, job_type, trigger_method)

es_api.client.connector.sync_job_post.assert_called_once_with(
id=connector_id, job_type=job_type, trigger_method=trigger_method
)


@pytest.mark.asyncio
async def test_es_api_connector_get():
connector_id = "id"
include_deleted = False

es_api = ESApi(elastic_config=config)
es_api._api_wrapper = AsyncMock()

await es_api.connector_get(connector_id, include_deleted)

es_api._api_wrapper.connector_get.assert_called_once_with(
connector_id, include_deleted
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_claim():
sync_job_id = "sync_job_id_test"
worker_hostname = "workerhostname"
sync_cursor = {"foo": "bar"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_claim(sync_job_id, worker_hostname, sync_cursor)

es_api.client.connector.sync_job_claim.assert_called_once_with(
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_update_stats():
sync_job_id = "sync_job_id_test"
ingestion_stats = {"ingestion": "stat"}
metadata = {"meta": "data"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_update_stats(sync_job_id, ingestion_stats, metadata)

es_api.client.connector.sync_job_update_stats.assert_called_once_with(
connector_sync_job_id=sync_job_id, body=ingestion_stats, metadata=metadata
)

0 comments on commit c802e8a

Please sign in to comment.