Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating ESApi class methods to use ES client calls instead of temporary wrapper methods #3110

Merged
merged 19 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3467,7 +3467,7 @@ SOFTWARE
to the Commercial Software.

elasticsearch
8.17.0
8.17.1
Apache Software License

Apache License
Expand Down
96 changes: 19 additions & 77 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,66 +35,6 @@ async def connector_get(self, connector_id, include_deleted):
params={"include_deleted": include_deleted},
mattnowzari marked this conversation as resolved.
Show resolved Hide resolved
)

async def connector_check_in(self, connector_id):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_check_in",
headers={"accept": "application/json"},
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_validation",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"validation": validation_result},
)

async def connector_activate_filtering_draft(self, connector_id):
return await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_filtering/_activate",
headers={"accept": "application/json"},
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_claim",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"worker_hostname": worker_hostname,
**({"sync_cursor": sync_cursor} if sync_cursor else {}),
},
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self.client.perform_request(
"POST",
"/_connector/_sync_job",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
"id": connector_id,
"job_type": job_type,
"trigger_method": trigger_method,
},
)

async def connector_sync_job_update_stats(
self, sync_job_id, ingestion_stats, metadata
):
await self.client.perform_request(
"PUT",
f"/_connector/_sync_job/{sync_job_id}/_stats",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={
**ingestion_stats,
**({"metadata": metadata} if metadata else {}),
},
)


class ESApi(ESClient):
def __init__(self, elastic_config):
Expand All @@ -103,7 +43,7 @@ def __init__(self, elastic_config):

async def connector_check_in(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_check_in, connector_id)
partial(self.client.connector.check_in, connector_id=connector_id)
)

async def connector_get(self, connector_id, include_deleted=False):
Expand Down Expand Up @@ -149,34 +89,36 @@ async def connector_update_filtering_draft_validation(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_update_filtering_draft_validation,
connector_id,
validation_result,
self.client.connector.update_filtering_validation,
connector_id=connector_id,
validation=validation_result,
)
)

async def connector_activate_filtering_draft(self, connector_id):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_activate_filtering_draft, connector_id)
partial(
self.client.connector.update_active_filtering, connector_id=connector_id
)
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_claim,
sync_job_id,
worker_hostname,
sync_cursor,
self.client.connector.sync_job_claim,
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)
)

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_create,
connector_id,
job_type,
trigger_method,
self.client.connector.sync_job_post,
id=connector_id,
job_type=job_type,
trigger_method=trigger_method,
)
)

Expand All @@ -185,10 +127,10 @@ async def connector_sync_job_update_stats(
):
return await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_sync_job_update_stats,
sync_job_id,
ingestion_stats,
metadata,
self.client.connector.sync_job_update_stats,
connector_sync_job_id=sync_job_id,
body=ingestion_stats,
metadata=metadata,
)
)

Expand Down
2 changes: 1 addition & 1 deletion requirements/framework.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ aiofiles==23.2.1
aiomysql==0.1.1
httpx==0.27.0
httpx-ntlm==1.4.0
elasticsearch[async]==8.17.0
elasticsearch[async]==8.17.1
elastic-transport==8.15.1
pyyaml==6.0.1
cffi==1.16.0
Expand Down
165 changes: 164 additions & 1 deletion tests/es/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
from elasticsearch import ApiError, ConflictError

from connectors.es.index import DocumentNotFoundError, ESIndex
from connectors.es.index import DocumentNotFoundError, ESApi, ESIndex

headers = {"X-Elastic-Product": "Elasticsearch"}
config = {
Expand Down Expand Up @@ -255,3 +255,166 @@ async def test_get_all_docs(mock_responses):
assert doc_count == total

await index.close()


Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we mock the calls to the API, call the API, and then check that the mock was called with the params we specified. It does not feel this tests anything other than the signature of the method. Is this typical in our tests?

Copy link
Contributor Author

@mattnowzari mattnowzari Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a few tests that behave this way (here is an example of one), and I figured that because ESApi extends* the ES client, we could simply have coverage to make sure we are calling those classes correctly.

I can totally go back to the drawing board with these if we wanted to make them do more, though!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do test like this sometimes, alternative is testing with mocking HTTP requests: https://github.com/elastic/connectors/blob/main/tests/test_sink.py#L95

Copy link

@erikcurrin-elastic erikcurrin-elastic Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the later we can mock, the better. Otherwise, we are testing things which are not likely to reveal a bug. The questions is why do we write the test? It should be to do sanity checking and to protect ourselves against an incompatible change, not just for the sake of tests. I defer to the team's judgement on the level of testing needed/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with your thoughts, Erik. These tests are rather thin, much like ESApi itself.

Given that ESApi acts like a partial wrapper around the ES client dependency, I would say any deeper testing would be akin to testing the client itself which, from connectors' perspective, is an import that presumably was tested already.

I don't think these unit tests are hurting anything overall, and they do follow precedent in how other tests are written in connectors. I am guaranteed to revisit this part of the code base when 9.0 releases, so I personally would advocate for these diffs as they sit.

The unit tests also technically don't exist yet, so I could be convinced we don't need them at all - we've come this far without them 🤣

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. If something changed in the underlying call and this test fails, it is worthy to keep. Your call

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Matt, these tests follow the pattern found in this repo and ensure that we pass arguments correctly to the client from our wrapper methods, which add the retry logic.

For the end-to-end scenario, we have a suite of functional tests that spin up an ES cluster and use these client calls to communicate with ES.

So, this code will ultimately be tested during the functional test step anyway.

@pytest.mark.asyncio
async def test_es_api_connector_check_in():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_check_in(connector_id)

es_api.client.connector.check_in.assert_called_once_with(connector_id=connector_id)


@pytest.mark.asyncio
async def test_es_api_connector_put():
connector_id = "id"
service_type = "service_type"
connector_name = "connector_name"
index_name = "index_name"
is_native = True

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_put(
connector_id, service_type, connector_name, index_name, is_native
)

es_api.client.connector.put.assert_called_once_with(
connector_id=connector_id,
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)


@pytest.mark.asyncio
async def test_es_api_connector_update_scheduling():
connector_id = "id"
scheduling = {"enabled": "true", "interval": "0 4 5 1 *"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_scheduling(connector_id, scheduling)

es_api.client.connector.update_scheduling.assert_called_once_with(
connector_id=connector_id, scheduling=scheduling
)


@pytest.mark.asyncio
async def test_es_api_connector_update_configuration():
connector_id = "id"
configuration = {"config_key": "config_value"}
values = {}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_configuration(connector_id, configuration, values)

es_api.client.connector.update_configuration.assert_called_once_with(
connector_id=connector_id, configuration=configuration, values=values
)


@pytest.mark.asyncio
async def test_es_api_connector_update_filtering_draft_validation():
connector_id = "id"
validation_result = {"validation": "result"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_update_filtering_draft_validation(
connector_id, validation_result
)

es_api.client.connector.update_filtering_validation.assert_called_once_with(
connector_id=connector_id, validation=validation_result
)


@pytest.mark.asyncio
async def test_es_api_connector_activate_filtering_draft():
connector_id = "id"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_activate_filtering_draft(connector_id)

es_api.client.connector.update_active_filtering.assert_called_once_with(
connector_id=connector_id
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_create():
connector_id = "id"
job_type = "full"
trigger_method = "on_demand"

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_create(connector_id, job_type, trigger_method)

es_api.client.connector.sync_job_post.assert_called_once_with(
id=connector_id, job_type=job_type, trigger_method=trigger_method
)


@pytest.mark.asyncio
async def test_es_api_connector_get():
connector_id = "id"
include_deleted = False

es_api = ESApi(elastic_config=config)
es_api._api_wrapper = AsyncMock()

await es_api.connector_get(connector_id, include_deleted)

es_api._api_wrapper.connector_get.assert_called_once_with(
connector_id, include_deleted
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_claim():
sync_job_id = "sync_job_id_test"
worker_hostname = "workerhostname"
sync_cursor = {"foo": "bar"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_claim(sync_job_id, worker_hostname, sync_cursor)

es_api.client.connector.sync_job_claim.assert_called_once_with(
connector_sync_job_id=sync_job_id,
worker_hostname=worker_hostname,
sync_cursor=sync_cursor,
)


@pytest.mark.asyncio
async def test_es_api_connector_sync_job_update_stats():
sync_job_id = "sync_job_id_test"
ingestion_stats = {"ingestion": "stat"}
metadata = {"meta": "data"}

es_api = ESApi(elastic_config=config)
es_api.client = AsyncMock()

await es_api.connector_sync_job_update_stats(sync_job_id, ingestion_stats, metadata)

es_api.client.connector.sync_job_update_stats.assert_called_once_with(
connector_sync_job_id=sync_job_id, body=ingestion_stats, metadata=metadata
)