Bump black and fix lint
pquentin committed Oct 30, 2023
1 parent 527b156 commit 3f4a94e
Showing 29 changed files with 50 additions and 74 deletions.
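
Nearly every hunk below is the same mechanical change: the bumped Black release enforces its "magic trailing comma", so any signature or call that is already split across multiple lines must end its final argument with a comma. The lint pass also removes stray blank lines that sat directly under a def, class, or with header, and drops an unused import sys, which is why the diff deletes more lines (74) than it adds (50). A minimal sketch of the formatting rule, using a hypothetical helper rather than code from this repository:

    # Before: older Black accepted a bare final argument in a
    # multi-line signature:
    #
    #     def bulk_helper(
    #         client,
    #         actions,
    #         *args,
    #         **kwargs
    #     ):
    #         ...
    #
    # After: the "magic trailing comma" must follow the last argument,
    # and it pins the signature to one argument per line.
    def bulk_helper(
        client,
        actions,
        *args,
        **kwargs,  # valid in Python 3; was a syntax error in Python 2
    ):
        """Hypothetical helper, shown only to illustrate the reformat."""
        return len(actions), len(kwargs)

The comma is purely syntactic in Python 3, so none of the hunks below change runtime behavior.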
10 changes: 3 additions & 7 deletions elasticsearch/_async/helpers.py
@@ -62,7 +62,7 @@ async def _process_bulk_chunk(
     raise_on_error=True,
     ignore_status=(),
     *args,
-    **kwargs
+    **kwargs,
 ):
     """
     Send a bulk request to elasticsearch and process the output.
@@ -132,9 +132,8 @@ async def async_streaming_bulk(
     yield_ok=True,
     ignore_status=(),
     *args,
-    **kwargs
+    **kwargs,
 ):
-
     """
     Streaming bulk consumes actions from the iterable passed in and yields
     results per action. For non-streaming usecases use
@@ -176,7 +175,6 @@ async def map_actions():
     async for bulk_data, bulk_actions in _chunk_actions(
         map_actions(), chunk_size, max_chunk_bytes, client.transport.serializer
     ):
-
         for attempt in range(max_retries + 1):
             to_retry, to_retry_data = [], []
             if attempt:
@@ -198,7 +196,6 @@ async def map_actions():
                     **kwargs,
                 ),
             ):
-
                 if not ok:
                     action, info = info.popitem()
                     # retry if retries enabled, we get 429, and we are not
@@ -292,7 +289,7 @@ async def async_scan(
     request_timeout=None,
     clear_scroll=True,
     scroll_kwargs=None,
-    **kwargs
+    **kwargs,
 ):
     """
     Simple abstraction on top of the
@@ -430,7 +427,6 @@ async def async_reindex(
     scan_kwargs={},
     bulk_kwargs={},
 ):
-
     """
     Reindex all documents from one index that satisfy a given query
     to another, potentially (if `target_client` is specified) on a different cluster.
8 changes: 4 additions & 4 deletions elasticsearch/_async/helpers.pyi
@@ -50,7 +50,7 @@ def _process_bulk_chunk(
     raise_on_error: bool = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> AsyncGenerator[Tuple[bool, Any], None]: ...
 def aiter(x: Union[Iterable[T], AsyncIterable[T]]) -> AsyncGenerator[T, None]: ...
 def azip(
@@ -70,15 +70,15 @@ def async_streaming_bulk(
     yield_ok: bool = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> AsyncGenerator[Tuple[bool, Any], None]: ...
 async def async_bulk(
     client: AsyncElasticsearch,
     actions: Union[Iterable[Any], AsyncIterable[Any]],
     stats_only: bool = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Tuple[int, Union[int, List[Any]]]: ...
 def async_scan(
     client: AsyncElasticsearch,
@@ -90,7 +90,7 @@ def async_scan(
     request_timeout: Optional[Union[float, int]] = ...,
     clear_scroll: bool = ...,
     scroll_kwargs: Optional[Mapping[str, Any]] = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> AsyncGenerator[int, None]: ...
 async def async_reindex(
     client: AsyncElasticsearch,
1 change: 0 additions & 1 deletion elasticsearch/_async/http_aiohttp.py
@@ -80,7 +80,6 @@ async def close(self):
 
 
 class AIOHttpConnection(AsyncConnection):
-
     HTTP_CLIENT_META = ("ai", _client_meta_version(aiohttp.__version__))
 
     def __init__(
4 changes: 1 addition & 3 deletions elasticsearch/_async/transport.py
@@ -67,7 +67,7 @@ def __init__(
     retry_on_timeout=False,
     send_get_body_as="GET",
     meta_header=True,
-    **kwargs
+    **kwargs,
 ):
     """
     :arg hosts: list of dictionaries, each containing keyword arguments to
@@ -166,7 +166,6 @@ async def _async_init(self):
 
     # ... and we can start sniffing in the background.
     if self.sniffing_task is None and self.sniff_on_start:
-
         # Create an asyncio.Event for future calls to block on
         # until the initial sniffing task completes.
         self._sniff_on_start_event = asyncio.Event()
@@ -467,7 +466,6 @@ async def _do_verify_elasticsearch(self, headers, timeout):
         # Ensure that there's only one async exec within this section
         # at a time to not emit unnecessary index API calls.
         async with self._verify_elasticsearch_lock:
-
             # Product check has already been completed while we were
             # waiting our turn, no need to do again.
             if self._verified_elasticsearch is not None:
2 changes: 1 addition & 1 deletion elasticsearch/_async/transport.pyi
@@ -64,7 +64,7 @@ class AsyncTransport(object):
     retry_on_timeout: bool = ...,
     send_get_body_as: str = ...,
     meta_header: bool = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> None: ...
 def add_connection(self, host: Any) -> None: ...
 def set_connections(self, hosts: Collection[Any]) -> None: ...
2 changes: 1 addition & 1 deletion elasticsearch/client/utils.pyi
@@ -52,7 +52,7 @@ def query_params(
     response_mimetypes: Optional[List[str]] = ...,
     body_params: Optional[List[str]] = ...,
     body_name: Optional[str] = ...,
-    body_required: Optional[bool] = ...
+    body_required: Optional[bool] = ...,
 ) -> Callable[[Callable[..., T]], Callable[..., T]]: ...
 def _bulk_body(
     serializer: Serializer, body: Union[str, bytes, Mapping[str, Any], Iterable[Any]]
7 changes: 3 additions & 4 deletions elasticsearch/compat.py
@@ -16,12 +16,13 @@
 # under the License.
 
 import asyncio
+from collections.abc import Mapping
+from queue import Queue
+from urllib.parse import quote, quote_plus, unquote, urlencode, urlparse
 
 string_types = str, bytes
-from urllib.parse import quote, quote_plus, unquote, urlencode, urlparse
 
 map = map
-from queue import Queue
 
 
 def to_str(x, encoding="ascii"):
@@ -36,8 +37,6 @@ def to_bytes(x, encoding="ascii"):
     return x
 
 
-from collections.abc import Mapping
-
 reraise_exceptions = (RecursionError, asyncio.CancelledError)
 
 try:
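
The compat.py hunks are the one structural cleanup in the commit: imports that sat between assignments are hoisted into a single block at the top of the module, the layout that import-order linting expects (flake8 reports late module-level imports as E402). Read together, the two hunks should leave the top of the module roughly as follows — a reconstruction from the hunks above, assuming no other statements sit between the shown lines:

    import asyncio
    from collections.abc import Mapping
    from queue import Queue
    from urllib.parse import quote, quote_plus, unquote, urlencode, urlparse

    # Python 2-era aliases, kept so the module's public names survive.
    string_types = str, bytes
    map = map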
3 changes: 1 addition & 2 deletions elasticsearch/connection/base.py
@@ -82,9 +82,8 @@ def __init__(
     api_key=None,
     opaque_id=None,
     meta_header=True,
-    **kwargs
+    **kwargs,
 ):
-
     if cloud_id:
         try:
             _, cloud_id = cloud_id.split(":")
2 changes: 1 addition & 1 deletion elasticsearch/connection/base.pyi
@@ -57,7 +57,7 @@ class Connection(object):
     api_key: Optional[Union[Tuple[str, str], List[str], str]] = ...,
     opaque_id: Optional[str] = ...,
     meta_header: bool = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> None: ...
 def __repr__(self) -> str: ...
 def __eq__(self, other: object) -> bool: ...
4 changes: 2 additions & 2 deletions elasticsearch/connection/http_requests.py
@@ -80,7 +80,7 @@ def __init__(
     cloud_id=None,
     api_key=None,
     opaque_id=None,
-    **kwargs
+    **kwargs,
 ):
     if not REQUESTS_AVAILABLE:
         raise ImproperlyConfigured(
@@ -101,7 +101,7 @@ def __init__(
         cloud_id=cloud_id,
         api_key=api_key,
         opaque_id=opaque_id,
-        **kwargs
+        **kwargs,
     )
 
     if not self.http_compress:
2 changes: 1 addition & 1 deletion elasticsearch/connection/http_requests.pyi
@@ -40,5 +40,5 @@ class RequestsHttpConnection(Connection):
     api_key: Optional[Any] = ...,
     opaque_id: Optional[str] = ...,
     meta_header: bool = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> None: ...
4 changes: 2 additions & 2 deletions elasticsearch/connection/http_urllib3.py
@@ -122,7 +122,7 @@ def __init__(
     cloud_id=None,
     api_key=None,
     opaque_id=None,
-    **kwargs
+    **kwargs,
 ):
     # Initialize headers before calling super().__init__().
     self.headers = urllib3.make_headers(keep_alive=True)
@@ -136,7 +136,7 @@ def __init__(
         cloud_id=cloud_id,
         api_key=api_key,
         opaque_id=opaque_id,
-        **kwargs
+        **kwargs,
     )
     if http_auth is not None:
         if isinstance(http_auth, (tuple, list)):
2 changes: 1 addition & 1 deletion elasticsearch/connection/http_urllib3.pyi
@@ -54,5 +54,5 @@ class Urllib3HttpConnection(Connection):
     api_key: Optional[Any] = ...,
     opaque_id: Optional[str] = ...,
     meta_header: bool = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> None: ...
2 changes: 1 addition & 1 deletion elasticsearch/connection_pool.py
@@ -118,7 +118,7 @@ def __init__(
     timeout_cutoff=5,
     selector_class=RoundRobinSelector,
     randomize_hosts=True,
-    **kwargs
+    **kwargs,
 ):
     """
     :arg connections: list of tuples containing the
2 changes: 1 addition & 1 deletion elasticsearch/connection_pool.pyi
@@ -51,7 +51,7 @@ class ConnectionPool(object):
     timeout_cutoff: int = ...,
     selector_class: Type[ConnectionSelector] = ...,
     randomize_hosts: bool = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> None: ...
 def mark_dead(self, connection: Connection, now: Optional[float] = ...) -> None: ...
 def mark_live(self, connection: Connection) -> None: ...
2 changes: 0 additions & 2 deletions elasticsearch/helpers/__init__.py
@@ -15,8 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import sys
-
 from .._async.helpers import async_bulk, async_reindex, async_scan, async_streaming_bulk
 from .actions import (
     _chunk_actions,
20 changes: 8 additions & 12 deletions elasticsearch/helpers/actions.py
@@ -225,7 +225,7 @@ def _process_bulk_chunk(
     raise_on_error=True,
     ignore_status=(),
     *args,
-    **kwargs
+    **kwargs,
 ):
     """
     Send a bulk request to elasticsearch and process the output.
@@ -278,9 +278,8 @@ def streaming_bulk(
     yield_ok=True,
     ignore_status=(),
     *args,
-    **kwargs
+    **kwargs,
 ):
-
     """
     Streaming bulk consumes actions from the iterable passed in and yields
     results per action. For non-streaming usecases use
@@ -319,7 +318,6 @@ def streaming_bulk(
     for bulk_data, bulk_actions in _chunk_actions(
         actions, chunk_size, max_chunk_bytes, client.transport.serializer
     ):
-
         for attempt in range(max_retries + 1):
             to_retry, to_retry_data = [], []
             if attempt:
@@ -336,10 +334,9 @@ def streaming_bulk(
                     raise_on_error,
                     ignore_status,
                     *args,
-                    **kwargs
+                    **kwargs,
                 ),
             ):
-
                 if not ok:
                     action, info = info.popitem()
                     # retry if retries enabled, we get 429, and we are not
@@ -431,7 +428,7 @@ def parallel_bulk(
     expand_action_callback=expand_action,
     ignore_status=(),
     *args,
-    **kwargs
+    **kwargs,
 ):
     """
     Parallel version of the bulk helper run in multiple threads at once.
@@ -477,7 +474,7 @@ def _setup_queues(self):
                     bulk_chunk[0],
                     ignore_status=ignore_status,
                     *args,
-                    **kwargs
+                    **kwargs,
                 )
             ),
             _chunk_actions(
@@ -502,7 +499,7 @@ def scan(
     request_timeout=None,
     clear_scroll=True,
     scroll_kwargs=None,
-    **kwargs
+    **kwargs,
 ):
     """
     Simple abstraction on top of the
@@ -624,7 +621,7 @@ def scan(
             scroll_id=scroll_id,
             ignore=(404,),
             params={"__elastic_client_meta": (("h", "s"),)},
-            **transport_kwargs
+            **transport_kwargs,
         )
 
 
@@ -640,7 +637,6 @@ def reindex(
     scan_kwargs={},
     bulk_kwargs={},
 ):
-
     """
     Reindex all documents from one index that satisfy a given query
     to another, potentially (if `target_client` is specified) on a different cluster.
@@ -713,5 +709,5 @@ def _change_doc_index(hits, index, op_type):
         target_client,
         _change_doc_index(docs, target_index, op_type),
         chunk_size=chunk_size,
-        **kwargs
+        **kwargs,
     )
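
None of the actions.py hunks change behavior: trailing commas and the deleted blank lines are invisible to callers of the helpers. For instance, a typical bulk call looks the same on both sides of this commit (a hypothetical usage example, not taken from the diff; assumes a cluster at localhost:9200):

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    client = Elasticsearch("http://localhost:9200")

    # A generator of actions, as the bulk helpers expect.
    docs = ({"_index": "test-index", "_source": {"value": i}} for i in range(100))

    # Returns (number of successes, list of errors) by default.
    successes, errors = bulk(client, docs, chunk_size=50)
    print(successes, errors)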
10 changes: 5 additions & 5 deletions elasticsearch/helpers/actions.pyi
@@ -47,7 +47,7 @@ def _process_bulk_chunk(
     raise_on_exception: bool = ...,
     raise_on_error: bool = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Generator[Tuple[bool, Any], None, None]: ...
 def streaming_bulk(
     client: Elasticsearch,
@@ -63,15 +63,15 @@ def streaming_bulk(
     yield_ok: bool = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Generator[Tuple[bool, Any], None, None]: ...
 def bulk(
     client: Elasticsearch,
     actions: Iterable[Any],
     stats_only: bool = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Tuple[int, Union[int, List[Any]]]: ...
 def parallel_bulk(
     client: Elasticsearch,
@@ -83,7 +83,7 @@ def parallel_bulk(
     expand_action_callback: Callable[[Any], Tuple[Dict[str, Any], Optional[Any]]] = ...,
     ignore_status: Optional[Union[int, Collection[int]]] = ...,
     *args: Any,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Generator[Tuple[bool, Any], None, None]: ...
 def scan(
     client: Elasticsearch,
@@ -95,7 +95,7 @@ def scan(
     request_timeout: Optional[Union[float, int]] = ...,
     clear_scroll: bool = ...,
     scroll_kwargs: Optional[Mapping[str, Any]] = ...,
-    **kwargs: Any
+    **kwargs: Any,
 ) -> Generator[Any, None, None]: ...
 def reindex(
     client: Elasticsearch,
(Diff hunks for the remaining 11 changed files are not shown.)