diff --git a/elasticsearch/helpers/__init__.py b/elasticsearch/helpers/__init__.py index 848138bf0..174bdaecd 100644 --- a/elasticsearch/helpers/__init__.py +++ b/elasticsearch/helpers/__init__.py @@ -1,4 +1,5 @@ from .errors import BulkIndexError, ScanError from .actions import expand_action, streaming_bulk, bulk, parallel_bulk +from .actions import streaming_chunks, parallel_chunks from .actions import scan, reindex from .actions import _chunk_actions, _process_bulk_chunk diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 61469f538..54c4c41c1 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -1,3 +1,4 @@ +from functools import partial from operator import methodcaller import time @@ -53,14 +54,21 @@ def expand_action(data): return action, data.get("_source", data) -def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer): +def _chunk_actions( + actions, + chunk_size, + max_chunk_bytes, + serializer, + expand_action_callback=expand_action +): """ Split actions into chunks by number or size, serialize them into strings in the process. """ bulk_actions, bulk_data = [], [] size, action_count = 0, 0 - for action, data in actions: + for input_action in actions: + action, data = expand_action_callback(input_action) raw_data, raw_action = data, action action = serializer.dumps(action) # +1 to account for the trailing new line character @@ -81,9 +89,9 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer): bulk_actions.append(action) if data is not None: bulk_actions.append(data) - bulk_data.append((raw_action, raw_data)) + bulk_data.append((input_action, raw_action, raw_data)) else: - bulk_data.append((raw_action,)) + bulk_data.append((input_action, raw_action,)) size += cur_size action_count += 1 @@ -121,10 +129,12 @@ def _process_bulk_chunk( for data in bulk_data: # collect all the information about failed actions - op_type, action = data[0].copy().popitem() + op_type, action = data[1].copy().popitem() info = {"error": err_message, "status": e.status_code, "exception": e} + # include original input action + info["action"] = data[0] if op_type != "delete": - info["data"] = data[1] + info["data"] = data[2] info.update(action) exc_errors.append({op_type: info}) @@ -142,11 +152,13 @@ def _process_bulk_chunk( for data, (op_type, item) in zip( bulk_data, map(methodcaller("popitem"), resp["items"]) ): + # include original input action + item["action"] = data[0] ok = 200 <= item.get("status", 500) < 300 if not ok and raise_on_error: # include original document source - if len(data) > 1: - item["data"] = data[1] + if len(data) > 2: + item["data"] = data[2] errors.append({op_type: item}) if ok or not errors: @@ -173,7 +185,6 @@ def streaming_bulk( *args, **kwargs ): - """ Streaming bulk consumes actions from the iterable passed in and yields results per action. For non-streaming usecases use @@ -206,11 +217,64 @@ def streaming_bulk( :arg max_backoff: maximum number of seconds a retry will wait :arg yield_ok: if set to False will skip successful documents in the output """ - actions = map(expand_action_callback, actions) + chunker = partial( + _chunk_actions, + chunk_size=chunk_size, + max_chunk_bytes=max_chunk_bytes, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback + ) - for bulk_data, bulk_actions in _chunk_actions( - actions, chunk_size, max_chunk_bytes, client.transport.serializer + for item in streaming_chunks( + client, + actions, + chunker, + raise_on_error=raise_on_error, + raise_on_exception=raise_on_exception, + max_retries=max_retries, + initial_backoff=initial_backoff, + max_backoff=max_backoff, + yield_ok=yield_ok, + *args, + **kwargs ): + yield item + + +def streaming_chunks( + client, + actions, + chunker, + raise_on_error=True, + raise_on_exception=True, + max_retries=0, + initial_backoff=2, + max_backoff=600, + yield_ok=True, + *args, + **kwargs +): + """ + Implementation of the ``streaming_bulk`` helper, chunking actions using + given chunker function. + + :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use + :arg actions: iterable containing the actions to be executed + :arg chunker: function to chunk actions into separate ``bulk`` calls, + should yield tuples of raw data and serialized actions. + :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) + from the execution of the last chunk when some occur. By default we raise. + :arg raise_on_exception: if ``False`` then don't propagate exceptions from + call to ``bulk`` and just report the items that failed as failed. + :arg max_retries: maximum number of times a document will be retried when + ``429`` is received, set to 0 (default) for no retries on ``429`` + :arg initial_backoff: number of seconds we should wait before the first + retry. Any subsequent retries will be powers of ``initial_backoff * + 2**retry_number`` + :arg max_backoff: maximum number of seconds a retry will wait + :arg yield_ok: if set to False will skip successful documents in the output + """ + for bulk_data, bulk_actions in chunker(actions): for attempt in range(max_retries + 1): to_retry, to_retry_data = [], [] @@ -241,9 +305,9 @@ def streaming_bulk( and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects strings so we need to - # re-serialize the data + # re-serialize the expanded action and data to_retry.extend( - map(client.transport.serializer.dumps, data) + map(client.transport.serializer.dumps, data[1:]) ) to_retry_data.append(data) else: @@ -338,12 +402,55 @@ def parallel_bulk( :arg queue_size: size of the task queue between the main thread (producing chunks to send) and the processing threads. """ + chunker = partial( + _chunk_actions, + chunk_size=chunk_size, + max_chunk_bytes=max_chunk_bytes, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback + ) + + for item in parallel_chunks( + client, + actions, + chunker, + thread_count=thread_count, + queue_size=queue_size, + *args, + **kwargs + ): + yield item + + +def parallel_chunks( + client, + actions, + chunker, + thread_count=4, + queue_size=4, + *args, + **kwargs +): + """ + Implementation of the ``parallel_bulk`` helper, chunking actions using + given chunker function. + + :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use + :arg actions: iterator containing the actions + :arg chunker: function to chunk actions into separate ``bulk`` calls, + should yield tuples of raw data and serialized actions. + :arg thread_count: size of the threadpool to use for the bulk requests + :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) + from the execution of the last chunk when some occur. By default we raise. + :arg raise_on_exception: if ``False`` then don't propagate exceptions from + call to ``bulk`` and just report the items that failed as failed. + :arg queue_size: size of the task queue between the main thread (producing + chunks to send) and the processing threads. + """ # Avoid importing multiprocessing unless parallel_bulk is used # to avoid exceptions on restricted environments like App Engine from multiprocessing.pool import ThreadPool - actions = map(expand_action_callback, actions) - class BlockingPool(ThreadPool): def _setup_queues(self): super(BlockingPool, self)._setup_queues() @@ -361,9 +468,7 @@ def _setup_queues(self): client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs ) ), - _chunk_actions( - actions, chunk_size, max_chunk_bytes, client.transport.serializer - ), + chunker(actions) ): for item in result: yield item diff --git a/test_elasticsearch/test_helpers.py b/test_elasticsearch/test_helpers.py index b95792f79..b8ebc66f6 100644 --- a/test_elasticsearch/test_helpers.py +++ b/test_elasticsearch/test_helpers.py @@ -58,7 +58,7 @@ def test_chunk_sent_from_different_threads(self, _process_bulk_chunk): class TestChunkActions(TestCase): def setUp(self): super(TestChunkActions, self).setUp() - self.actions = [({"index": {}}, {"some": u"datá", "i": i}) for i in range(100)] + self.actions = [{"_op_type": "index", "some": u"datá", "i": i} for i in range(100)] def test_chunks_are_chopped_by_byte_size(self): self.assertEquals( diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index bb679e9e3..6cfe211c2 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -1,4 +1,4 @@ -from mock import patch +from mock import patch, MagicMock from elasticsearch import helpers, TransportError from elasticsearch.helpers import ScanError @@ -56,10 +56,13 @@ def test_all_errors_from_chunk_are_raised_on_failure(self): self.client.cluster.health(wait_for_status="yellow") try: - for ok, item in helpers.streaming_bulk( - self.client, [{"a": "b"}, {"a": "c"}], index="i", raise_on_error=True - ): + docs = [{"a": "b"}, {"a": "c"}] + for i, (ok, item) in enumerate(helpers.streaming_bulk( + self.client, docs, index="i", raise_on_error=True + )): self.assertTrue(ok) + op_type, info = item.popitem() + self.assertEquals(info["action"], docs[i]) except helpers.BulkIndexError as e: self.assertEquals(2, len(e.errors)) else: @@ -81,8 +84,10 @@ def test_different_op_types(self): "doc": {"answer": 42}, }, ] - for ok, item in helpers.streaming_bulk(self.client, docs): + for i, (ok, item) in enumerate(helpers.streaming_bulk(self.client, docs)): self.assertTrue(ok) + op_type, info = item.popitem() + self.assertEquals(info["action"], docs[i]) self.assertFalse(self.client.exists(index="i", id=45)) self.assertEquals({"answer": 42}, self.client.get(index="i", id=42)["_source"]) @@ -117,6 +122,7 @@ def test_transport_error_can_becaught(self): "_index": "i", "_type": "_doc", "_id": 45, + "action": docs[1], "data": {"f": "v"}, "error": "TransportError(599, 'Error!')", "status": 599, @@ -147,6 +153,7 @@ def test_rejected_documents_are_retried(self): ) self.assertEquals(3, len(results)) self.assertEquals([True, True, True], [r[0] for r in results]) + self.assertEquals(results[1][1]["index"]["action"], docs[1]) self.client.indices.refresh(index="i") res = self.client.search(index="i") self.assertEquals({"value": 3, "relation": "eq"}, res["hits"]["total"]) @@ -175,6 +182,7 @@ def test_rejected_documents_are_retried_at_most_max_retries_times(self): ) self.assertEquals(3, len(results)) self.assertEquals([False, True, True], [r[0] for r in results]) + self.assertEquals(results[0][1]["index"]["action"], docs[0]) self.client.indices.refresh(index="i") res = self.client.search(index="i") self.assertEquals({"value": 2, "relation": "eq"}, res["hits"]["total"]) @@ -203,6 +211,39 @@ def streaming_bulk(): self.assertEquals(4, failing_client._called) +class TestStreamingChunks(ElasticsearchTestCase): + def simple_chunker(self, actions): + for item in actions: + raw_action = { + "index": { + "_id": item["id"] + } + } + data = { + "x": item["x"] + } + action_lines = list(map( + self.client.transport.serializer.dumps, (raw_action, data) + )) + yield [(item, raw_action, data)], action_lines + + def test_actions_chunker(self): + actions = [{"id": 1, "x": "A"}, {"id": 2, "x": "B"}] + actions_gen = (action for action in actions) + + mock_chunker = MagicMock() + mock_chunker.side_effect = self.simple_chunker + + for i, (ok, item) in enumerate(helpers.streaming_chunks( + self.client, actions_gen, mock_chunker, index="test-index" + )): + self.assertTrue(ok) + self.assertEquals(item["index"]["_id"], str(actions[i]["id"])) + self.assertEquals(item["index"]["action"], actions[i]) + + mock_chunker.assert_called_once_with(actions_gen) + + class TestBulk(ElasticsearchTestCase): def test_bulk_works_with_single_item(self): docs = [{"answer": 42, "_id": 1}]