Skip to content

Commit

Permalink
Rename KafkaAdmin to KafkaAdminClient
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwidman committed Nov 20, 2018
1 parent 45196e3 commit fcc800f
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 26 deletions.
5 changes: 0 additions & 5 deletions docs/apidoc/KafkaAdmin.rst

This file was deleted.

5 changes: 5 additions & 0 deletions docs/apidoc/KafkaAdminClient.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
KafkaAdminClient
===========

.. autoclass:: kafka.admin.KafkaAdminClient
:members:
2 changes: 1 addition & 1 deletion docs/apidoc/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ kafka-python API

KafkaConsumer
KafkaProducer
KafkaAdmin
KafkaAdminClient
KafkaClient
BrokerConnection
ClusterMetadata
4 changes: 2 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def emit(self, record):
logging.getLogger(__name__).addHandler(NullHandler())


from kafka.admin import KafkaAdmin
from kafka.admin import KafkaAdminClient
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
Expand Down Expand Up @@ -47,7 +47,7 @@ def __init__(self, *args, **kwargs):


__all__ = [
'KafkaAdmin',
'KafkaAdminClient',
'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
'SimpleClient', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner',
Expand Down
4 changes: 2 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.kafka import KafkaAdmin
from kafka.admin.client import KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.admin.new_partitions import NewPartitions

__all__ = [
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions'
]
32 changes: 16 additions & 16 deletions kafka/admin/kafka.py → kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
log = logging.getLogger(__name__)


class KafkaAdmin(object):
class KafkaAdminClient(object):
"""A class for administering the Kafka cluster.
Warning:
Expand All @@ -35,7 +35,7 @@ class KafkaAdmin(object):
nicer, more pythonic objects. Unfortunately, this will likely break
those interfaces.
The KafkaAdmin class will negotiate for the latest version of each message
The KafkaAdminClient class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
Kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.
Expand Down Expand Up @@ -174,7 +174,7 @@ class KafkaAdmin(object):
}

def __init__(self, **configs):
log.debug("Starting KafkaAdmin interface.")
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
Expand All @@ -200,18 +200,18 @@ def __init__(self, **configs):

self._closed = False
self._refresh_controller_id()
log.debug("KafkaAdmin interface started.")
log.debug("KafkaAdminClient started.")

def close(self):
"""Close the KafkaAdmin connection to the Kafka broker."""
"""Close the KafkaAdminClient connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
log.info("KafkaAdmin interface already closed.")
log.info("KafkaAdminClient already closed.")
return

self._metrics.close()
self._client.close()
self._closed = True
log.debug("KafkaAdmin interface has closed.")
log.debug("KafkaAdminClient is now closed.")

def _matching_api_version(self, operation):
"""Find the latest version of the protocol operation supported by both
Expand Down Expand Up @@ -252,7 +252,7 @@ def _refresh_controller_id(self):
controller_version = self._client.check_version(controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
.format(controller_version))
self._controller_id = controller_id
else:
Expand Down Expand Up @@ -391,7 +391,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
)
else:
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
"Support for CreateTopics v{} has not yet been added to KafkaAdminClient."
.format(version))
# TODO convert structs to a more pythonic interface
# TODO raise exceptions if errors
Expand All @@ -415,7 +415,7 @@ def delete_topics(self, topics, timeout_ms=None):
response = self._send_request_to_controller(request)
else:
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
"Support for DeleteTopics v{} has not yet been added to KafkaAdminClient."
.format(version))
return response

Expand Down Expand Up @@ -474,7 +474,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
)
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
.format(version))
return self._send_request_to_node(self._client.least_loaded_node(), request)

Expand Down Expand Up @@ -507,7 +507,7 @@ def alter_configs(self, config_resources):
)
else:
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
"Support for AlterConfigs v{} has not yet been added to KafkaAdminClient."
.format(version))
# TODO the Java client has the note:
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
Expand Down Expand Up @@ -553,7 +553,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
)
else:
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
"Support for CreatePartitions v{} has not yet been added to KafkaAdminClient."
.format(version))
return self._send_request_to_controller(request)

Expand Down Expand Up @@ -625,7 +625,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
group_descriptions.append(group_description)
else:
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
.format(version))
return group_descriptions

Expand Down Expand Up @@ -674,7 +674,7 @@ def list_consumer_groups(self, broker_ids=None):
consumer_groups.update(response.groups)
else:
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
"Support for ListGroups v{} has not yet been added to KafkaAdminClient."
.format(version))
return list(consumer_groups)

Expand Down Expand Up @@ -744,7 +744,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
else:
raise NotImplementedError(
"Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
"Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
.format(version))
return group_offsets_listing

Expand Down

1 comment on commit fcc800f

@jeffwidman
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I had forgotten to rename docs/apidoc/KafkaAdmin.rst to docs/apidoc/KafkaAdminClient.rst and since the commit was only 20 minutes old I just amended and force-pushed, as I thought it unlikely anyone else would have pulled the branch that recently...

Please sign in to comment.