From 212a9e44b7618a8168b929b302c3ad4782028115 Mon Sep 17 00:00:00 2001 From: Iury Gregory Melo Ferreira Date: Fri, 21 Feb 2020 14:58:10 +0100 Subject: [PATCH 1/6] Fix stable/train CI This change fixes the stable/train CI for ngs - update the .gitreview - update tox.ini - Change ironic-grenade-dsvm-multinode-multitenant to non-voting. - Fix tempest for networking-generic-switch-tempest-dlm-python2. Change-Id: Ibb6735c6f03513630cd02e0cf273e0d0e5dc4d0a --- .gitreview | 1 + tox.ini | 8 ++++---- zuul.d/networking-generic-switch-jobs.yaml | 1 + zuul.d/project.yaml | 7 +++++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/.gitreview b/.gitreview index 3a78cb01..7e394384 100644 --- a/.gitreview +++ b/.gitreview @@ -2,3 +2,4 @@ host=review.opendev.org port=29418 project=openstack/networking-generic-switch.git +defaultbranch=stable/train diff --git a/tox.ini b/tox.ini index 68503f0c..947d44a0 100644 --- a/tox.ini +++ b/tox.ini @@ -13,7 +13,7 @@ setenv = VIRTUAL_ENV={envdir} LC_ALL=en_US.UTF-8 TESTS_DIR=./networking_generic_switch/tests/unit/ deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY @@ -36,7 +36,7 @@ setenv = PYTHONHASHSEED=0 sitepackages = False envdir = {toxworkdir}/venv deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/requirements.txt -r{toxinidir}/doc/requirements.txt commands = @@ -56,7 +56,7 @@ commands = [testenv:releasenotes] basepython = python3 deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/doc/requirements.txt commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html @@ -65,7 +65,7 @@ commands = basepython = python3 setenv = PYTHONHASHSEED=0 deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/test-requirements.txt -r{toxinidir}/doc/requirements.txt commands = {posargs} diff --git a/zuul.d/networking-generic-switch-jobs.yaml b/zuul.d/networking-generic-switch-jobs.yaml index eab79498..e68beca8 100644 --- a/zuul.d/networking-generic-switch-jobs.yaml +++ b/zuul.d/networking-generic-switch-jobs.yaml @@ -72,5 +72,6 @@ name: networking-generic-switch-tempest-dlm-python2 parent: networking-generic-switch-tempest-dlm-base vars: + tox_envlist: py27 devstack_localrc: USE_PYTHON3: False diff --git a/zuul.d/project.yaml b/zuul.d/project.yaml index 3cdd2c85..c7ead52e 100644 --- a/zuul.d/project.yaml +++ b/zuul.d/project.yaml @@ -13,7 +13,9 @@ irrelevant-files: - ^(test-|)requirements.txt$ - ^setup.cfg$ - - ironic-grenade-dsvm-multinode-multitenant + # NOTE(iurygregory): Non-voting due to instability. + - ironic-grenade-dsvm-multinode-multitenant: + voting: false - openstack-tox-lower-constraints gate: queue: networking-generic-switch @@ -24,5 +26,6 @@ irrelevant-files: - ^(test-|)requirements.txt$ - ^setup.cfg$ - - ironic-grenade-dsvm-multinode-multitenant + # Removing from gate due to instability. + # - ironic-grenade-dsvm-multinode-multitenant - openstack-tox-lower-constraints From 4dd2d4fc69dc28f82a8d1b33662c5dea2b84d9b5 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Mon, 27 Jul 2020 16:42:07 +0100 Subject: [PATCH 2/6] Add support for Cumulus 4.x NCLU driver This adds a new driver for Cumulus switches that support using NCLU. It actually uses the netmiko linux driver, given we are really just accessing a debian user space, and there is currently no cumulus specific driver in netmiko. It borrows quite heavily from the support in networking-ansible that uses these roles to configure cumulus switches: https://github.com/ansible-network/network-runner However, this driver adopts the more typical networking-generic-switch configuration operations, where existing port configuration is retained as the port cycles through various VLANs. In a similar way, we make disabling a port optional, and support users specifying a default VLAN, such as the ironic inspection network. Change-Id: I4d96ea3f3c3d55ff7d742b9e08aa75ce990eee9a --- .../devices/netmiko_devices/__init__.py | 11 +- .../devices/netmiko_devices/cumulus.py | 73 +++++++++++ .../tests/unit/netmiko/test_cumulus.py | 117 ++++++++++++++++++ ...cumulus-nclu-support-ddcffa604c3e1b18.yaml | 5 + setup.cfg | 1 + 5 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 networking_generic_switch/devices/netmiko_devices/cumulus.py create mode 100644 networking_generic_switch/tests/unit/netmiko/test_cumulus.py create mode 100644 releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index 57d8e2aa..cb1ad380 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -61,6 +61,8 @@ def wrapper(self, *args, **kwargs): class NetmikoSwitch(devices.GenericSwitchDevice): + NETMIKO_DEVICE_TYPE = None + ADD_NETWORK = None DELETE_NETWORK = None @@ -88,9 +90,12 @@ class NetmikoSwitch(devices.GenericSwitchDevice): def __init__(self, device_cfg): super(NetmikoSwitch, self).__init__(device_cfg) - device_type = self.config.get('device_type', '') - # use part that is after 'netmiko_' - device_type = device_type.partition('netmiko_')[2] + if self.NETMIKO_DEVICE_TYPE: + device_type = self.NETMIKO_DEVICE_TYPE + else: + device_type = self.config.get('device_type', '') + # use part that is after 'netmiko_' + device_type = device_type.partition('netmiko_')[2] if device_type not in netmiko.platforms: raise exc.GenericSwitchNetmikoNotSupported( device_type=device_type) diff --git a/networking_generic_switch/devices/netmiko_devices/cumulus.py b/networking_generic_switch/devices/netmiko_devices/cumulus.py new file mode 100644 index 00000000..f6a5b40f --- /dev/null +++ b/networking_generic_switch/devices/netmiko_devices/cumulus.py @@ -0,0 +1,73 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import re + +from networking_generic_switch.devices import netmiko_devices + + +class Cumulus(netmiko_devices.NetmikoSwitch): + """Built for Cumulus 4.x + + Note for this switch you want config like this, + where secret is the password needed for sudo su: + + [genericswitch:] + device_type = netmiko_cumulus + ip = + username = + password = + secret = + ngs_physical_networks = physnet1 + ngs_max_connections = 1 + ngs_port_default_vlan = 123 + ngs_disable_inactive_ports = False + """ + NETMIKO_DEVICE_TYPE = "linux" + + ADD_NETWORK = [ + 'net add vlan {segmentation_id}', + ] + + DELETE_NETWORK = [ + 'net del vlan {segmentation_id}', + ] + + PLUG_PORT_TO_NETWORK = [ + 'net add interface {port} bridge access {segmentation_id}', + ] + + DELETE_PORT = [ + 'net del interface {port} bridge access {segmentation_id}', + ] + + ENABLE_PORT = [ + 'net del interface {port} link down', + ] + + DISABLE_PORT = [ + 'net add interface {port} link down', + ] + + SAVE_CONFIGURATION = [ + 'net commit', + ] + + ERROR_MSG_PATTERNS = [ + # Its tempting to add this error message, but as only one + # bridge-access is allowed, we ignore that error for now: + # re.compile(r'configuration does not have "bridge-access') + re.compile(r'ERROR: Command not found.'), + re.compile(r'command not found'), + re.compile(r'is not a physical interface on this switch'), + ] diff --git a/networking_generic_switch/tests/unit/netmiko/test_cumulus.py b/networking_generic_switch/tests/unit/netmiko/test_cumulus.py new file mode 100644 index 00000000..73c1831f --- /dev/null +++ b/networking_generic_switch/tests/unit/netmiko/test_cumulus.py @@ -0,0 +1,117 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from networking_generic_switch.devices.netmiko_devices import cumulus +from networking_generic_switch import exceptions as exc +from networking_generic_switch.tests.unit.netmiko import test_netmiko_base + + +class TestNetmikoCumulus(test_netmiko_base.NetmikoSwitchTestBase): + + def _make_switch_device(self, extra_cfg={}): + device_cfg = { + 'device_type': 'netmiko_cumulus', + 'ngs_port_default_vlan': '123', + 'ngs_disable_inactive_ports': 'True', + } + device_cfg.update(extra_cfg) + return cumulus.Cumulus(device_cfg) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_add_network(self, mock_exec): + self.switch.add_network(3333, '0ae071f5-5be9-43e4-80ea-e41fefe85b21') + mock_exec.assert_called_with( + ['net add vlan 3333']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_network(self, mock_exec): + self.switch.del_network(3333, '0ae071f5-5be9-43e4-80ea-e41fefe85b21') + mock_exec.assert_called_with( + ['net del vlan 3333']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_plug_port_to_network(self, mock_exec): + self.switch.plug_port_to_network(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 link down', + 'net del interface 3333 bridge access 123', + 'net add interface 3333 bridge access 33']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device') + def test_plug_port_to_network_fails(self, mock_exec): + mock_exec.return_value = ( + 'ERROR: Command not found.\n\nasdf' + ) + self.assertRaises(exc.GenericSwitchNetmikoConfigError, + self.switch.plug_port_to_network, 3333, 33) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device') + def test_plug_port_to_network_fails_bad_port(self, mock_exec): + mock_exec.return_value = ( + 'ERROR: asd123 is not a physical interface on this switch.' + '\n\nasdf' + ) + self.assertRaises(exc.GenericSwitchNetmikoConfigError, + self.switch.plug_port_to_network, 3333, 33) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_plug_port_simple(self, mock_exec): + switch = self._make_switch_device({ + 'ngs_disable_inactive_ports': 'false', + 'ngs_port_default_vlan': '', + }) + switch.plug_port_to_network(3333, 33) + mock_exec.assert_called_with( + ['net add interface 3333 bridge access 33']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_port(self, mock_exec): + self.switch.delete_port(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 bridge access 33', + 'net add vlan 123', + 'net add interface 3333 bridge access 123', + 'net add interface 3333 link down']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_port_simple(self, mock_exec): + switch = self._make_switch_device({ + 'ngs_disable_inactive_ports': 'false', + 'ngs_port_default_vlan': '', + }) + switch.delete_port(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 bridge access 33']) + + def test_save(self): + mock_connect = mock.MagicMock() + mock_connect.save_config.side_effect = NotImplementedError + self.switch.save_configuration(mock_connect) + mock_connect.send_command.assert_called_with('net commit') diff --git a/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml b/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml new file mode 100644 index 00000000..5377610c --- /dev/null +++ b/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Adds a new device driver, ``netmiko_cumulus``, for managing cumulus + based switch devices via NCLU. diff --git a/setup.cfg b/setup.cfg index ac4b33f9..e5621dc3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ generic_switch.devices = netmiko_hp_comware = networking_generic_switch.devices.netmiko_devices.hpe:HpeComware netmiko_juniper = networking_generic_switch.devices.netmiko_devices.juniper:Juniper netmiko_mellanox_mlnxos = networking_generic_switch.devices.netmiko_devices.mellanox_mlnxos:MellanoxMlnxOS + netmiko_cumulus = networking_generic_switch.devices.netmiko_devices.cumulus:Cumulus tempest.test_plugins = ngs_tests = tempest_plugin.plugin:NGSTempestPlugin From e9996466fd28a878c5a1662175d9a0150b90101c Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Mon, 27 Jul 2020 17:59:10 +0100 Subject: [PATCH 3/6] WIP: Attempt to batch up cmds When you have around 60 baremetal nodes attached to a single switch, it takes a long time to execute all those commands. This gets worse when you limit the number of concurrent ssh connections. Here we look to batch up commands to send to the switch together, then wait for them all to complete before returning the result. It does this by using etcd keys as a queueing system. We pull off the queue using the version at which the keys were added, giving a FIFO style queue. Change-Id: I8c458bbc94df5630cfede5434bcdbe527988059c --- lower-constraints.txt | 1 + networking_generic_switch/batching.py | 335 ++++++++++++++++++ networking_generic_switch/config.py | 3 + .../devices/netmiko_devices/__init__.py | 32 +- .../tests/unit/test_batching.py | 191 ++++++++++ requirements.txt | 2 + 6 files changed, 557 insertions(+), 7 deletions(-) create mode 100644 networking_generic_switch/batching.py create mode 100644 networking_generic_switch/tests/unit/test_batching.py diff --git a/lower-constraints.txt b/lower-constraints.txt index 49f9e30d..4c99164c 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -19,6 +19,7 @@ deprecation==1.0 doc8==0.6.0 docutils==0.11 dogpile.cache==0.6.2 +etcd3gw===0.2.4 eventlet==0.18.2 extras==1.0.0 fasteners==0.7.0 diff --git a/networking_generic_switch/batching.py b/networking_generic_switch/batching.py new file mode 100644 index 00000000..04f97518 --- /dev/null +++ b/networking_generic_switch/batching.py @@ -0,0 +1,335 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import queue + +import etcd3gw +from etcd3gw import watch +import eventlet +from oslo_log import log as logging +from oslo_utils import netutils +from oslo_utils import uuidutils +import tenacity + +LOG = logging.getLogger(__name__) + + +class SwitchQueue(object): + INPUT_PREFIX = "/ngs/batch/%s/input/" + INPUT_ITEM_KEY = "/ngs/batch/%s/input/%s" + RESULT_ITEM_KEY = "/ngs/batch/%s/output/%s" + EXEC_LOCK = "/ngs/batch/%s/execute_lock" + + def __init__(self, switch_name, etcd_client): + self.switch_name = switch_name + self.client = etcd_client + + def add_batch_and_wait_for_result(self, cmds): + """Clients add batch, given key events. + + Each batch is given an uuid that is used to generate both + and input and result key in etcd. + + First we watch for any results, second we write the input + in a location that the caller of get_batches will be looking. + + No locks are required when calling this function to send work + to the workers, and start waiting for results. + + Returns a function that takes a timeout parameter to wait + for the default. + """ + + uuid = uuidutils.generate_uuid() + result_key = self.RESULT_ITEM_KEY % (self.switch_name, uuid) + input_key = self.INPUT_ITEM_KEY % (self.switch_name, uuid) + + # Start waiting on the key we expect to be created and + # start watching before writing input key to avoid racing + watcher, get_result = self._watch_for_result(result_key) + + batch = { + "uuid": uuid, + "input_key": input_key, + "result_key": result_key, + "cmds": cmds, + } + value = json.dumps(batch, sort_keys=True).encode("utf-8") + try: + # TODO(johngarbutt) add a lease so this times out? + success = self.client.create(input_key, value) + except Exception: + # Be sure to free watcher resources + watcher.stop() + raise + + # Be sure to free watcher resources on error + if not success: + watcher.stop() + raise Exception("failed to add batch to key: %s", input_key) + + LOG.debug("written input key %s", input_key) + return get_result + + def _watch_for_result(self, result_key): + # Logic based on implementation of client.watch_once() + event_queue = queue.Queue() + + def callback(event): + event_queue.put(event) + + watcher = watch.Watcher(self.client, result_key, callback) + + def wait_for_key(timeout): + try: + event = event_queue.get(timeout=timeout) + except queue.Empty: + raise Exception("timed out waiting for key: %s", result_key) + finally: + # NOTE(johngarbutt) this means we need the caller + # to always watch for the result, or call stop + # before starting to wait for the key + watcher.stop() + + LOG.debug("got event: %s", event) + # TODO(johngarbutt) check we have the create event and result? + result_dict = self._get_and_delete_result(result_key) + LOG.debug("got result: %s", result_dict) + return result_dict["result"] + + return watcher, wait_for_key + + def _get_and_delete_result(self, result_key): + # called when watch event says the result key should exist + raw_results = self.client.get(result_key) + if len(raw_results) != 1: + raise Exception("unable to find result: %s", result_key) + raw_value = raw_results[0] + result_dict = json.loads(raw_value.decode('utf-8')) + LOG.debug("fetched result for: ", result_key) + + # delete key now we have the result + delete_success = self.client.delete(result_key) + if not delete_success: + LOG.error("unable to delete result key: %s", + result_key) + LOG.debug("deleted result for: ", result_key) + return result_dict + + def _get_raw_batches(self): + input_prefix = self.INPUT_PREFIX % self.switch_name + # Sort order ensures FIFO style queue + raw_batches = self.client.get_prefix(input_prefix, + sort_order="ascend", + sort_target="create") + return raw_batches + + def get_batches(self): + """Return a list of the event dicts written in wait for result. + + This is called both with or without getting a lock to get the + latest list of work that has send to the per switch queue in + etcd. + """ + raw_batches = self._get_raw_batches() + LOG.debug("found %s batches", len(raw_batches)) + + batches = [] + for raw_value, metadata in raw_batches: + batch = json.loads(raw_value.decode('utf-8')) + batches.append(batch) + return batches + + def record_result(self, result, batches): + """Record the result from executing given batch list. + + We assume that a lock is held before getting a fresh list + of batches, executing them, and then calling this record + results function, before finally dropping the lock. + """ + LOG.debug("write results for %s batches", len(batches)) + + # Write results first, so watchers seen these quickly + for batch in batches: + batch["result"] = result + # TODO(johngarbutt) create this with a lease + # so auto delete if no one gets the result? + success = self.client.create( + batch['result_key'], + json.dumps(batch, sort_keys=True).encode('utf-8')) + if not success: + # TODO(johngarbutt) should we fail to delete the key at + # this point? + LOG.error("failed to report batch result for: %s", + batch) + + # delete input keys so the next worker to hold the lock + # knows not to execute these batches + for batch in batches: + input_key = batch["input_key"] + delete_success = self.client.delete(input_key) + if not delete_success: + LOG.error("unable to delete input key: %s", + input_key) + else: + LOG.debug("deleted input key: %s", input_key) + + def acquire_worker_lock(self, acquire_timeout=300, lock_ttl=120, + wait=None): + """Wait for lock needed to call record_result. + + This blocks until the work queue is empty of the switch lock is + acquired. If we timeout waiting for the lock we raise an exception. + """ + lock_name = self.EXEC_LOCK % self.switch_name + lock = self.client.lock(lock_name, lock_ttl) + + if wait is None: + wait = tenacity.wait_random(min=1, max=3) + + @tenacity.retry( + # Log a message after each failed attempt. + after=tenacity.after_log(LOG, logging.DEBUG), + # Retry if we haven't got the lock yet + retry=tenacity.retry_if_result(lambda x: x is False), + # Stop after timeout. + stop=tenacity.stop_after_delay(acquire_timeout), + # Wait between lock retries + wait=wait, + ) + def _acquire_lock_with_retry(): + lock_acquired = lock.acquire() + if lock_acquired: + return lock + + # Stop waiting for the lock if there is nothing to do + work = self._get_raw_batches() + if not work: + return None + + # Trigger a retry + return False + + return _acquire_lock_with_retry() + + +class SwitchBatch(object): + def __init__(self, switch_name, etcd_url=None, switch_queue=None): + if switch_queue is None: + parsed_url = netutils.urlsplit(etcd_url) + host = parsed_url.hostname + port = parsed_url.port + # TODO(johngarbutt): support certs + protocol = 'https' if parsed_url.scheme.endswith( + 'https') else 'http' + etcd_client = etcd3gw.client( + host=host, port=port, protocol=protocol, + timeout=30) + self.queue = SwitchQueue(switch_name, etcd_client) + else: + self.queue = switch_queue + self.switch_name = switch_name + + def do_batch(self, cmd_set, batch_fn, timeout=300): + """Batch up calls to this function to reduce overheads. + + We collect together the iterables in the cmd_set, and + execute them toegether in a single larger batch. + This reduces overhead, but does make it harder to track + down which of the cmds failed. + + :param cmd_set: an iterable of commands + :param batch_fn: function that takes an iterable of commands + :return: output string generated by the whole batch + """ + + # request that the cmd_set by executed + cmd_list = list(cmd_set) + wait_for_result = self.queue.add_batch_and_wait_for_result(cmd_list) + + def do_work(): + try: + self._execute_pending_batches( + batch_fn) + except Exception as e: + LOG.error("failed to run execute batch: %s", e, + exec_info=True) + raise + + self._spawn(do_work) + + # Wait for our result key + # as the result might be done before the above task starts + output = wait_for_result(timeout=timeout) + LOG.debug("Got batch result: %s", output) + return output + + @staticmethod + def _spawn(work_fn): + # TODO(johngarbutt) remove hard eventlet dependency + # in a similar way to etcd3gw + # Sleep to let possible other work to batch together + eventlet.sleep(0.1) + # Run all pending tasks, which might be a no op + # if pending tasks already ran + eventlet.spawn_n(work_fn) + + def _execute_pending_batches(self, batch_fn): + """Execute all batches currently registered. + + Typically called by every caller of add_batch. + Could be a noop if all batches are already executed. + """ + batches = self.queue.get_batches() + if not batches: + LOG.debug("Skipped execution for %s", self.switch_name) + return + LOG.debug("Getting lock to execute %d batches for %s", + len(batches), self.switch_name) + + lock = self.queue.acquire_worker_lock() + if lock is None: + # This means we stopped waiting as the work queue was empty + LOG.debug("Work list empty for %s", self.switch_name) + return + + # Check we got the lock + if not lock.is_acquired(): + raise Exception("unable to get lock for: %s", self.switch_name) + + # be sure to drop the lock when we are done + with lock: + LOG.debug("got lock for %s", self.switch_name) + + # Fetch fresh list now we have the lock + # and order the list so we execute in order added + batches = self.queue.get_batches() + if not batches: + LOG.debug("No batches to execute %s", self.switch_name) + return + + LOG.debug("Starting to execute %d batches", len(batches)) + all_cmds = [] + for batch in batches: + all_cmds += batch['cmds'] + + # Execute batch function with all the commands + result = batch_fn(all_cmds) + + # Tell request watchers the result and + # tell workers which batches have now been executed + self.queue.record_result(result, batches) + + LOG.debug("end of lock for %s", self.switch_name) diff --git a/networking_generic_switch/config.py b/networking_generic_switch/config.py index 1f5eef77..543d7483 100644 --- a/networking_generic_switch/config.py +++ b/networking_generic_switch/config.py @@ -26,6 +26,9 @@ default=60, help='Timeout in seconds after which an attempt to grab a lock ' 'is failed. Value of 0 is forever.'), + cfg.BoolOpt('batch_requests', default=False, + help='EXPERIMENTAL: option to batch up concurrent requests ' + 'to each switche. Only tested with Cumulus driver.') ] CONF.register_opts(coordination_opts, group='ngs_coordination') diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index cb1ad380..b569fbc9 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -24,6 +24,7 @@ import tenacity from tooz import coordination +from networking_generic_switch import batching from networking_generic_switch import devices from networking_generic_switch.devices import utils as device_utils from networking_generic_switch import exceptions as exc @@ -101,20 +102,29 @@ def __init__(self, device_cfg): device_type=device_type) self.config['device_type'] = device_type + self.lock_kwargs = { + 'locks_pool_size': int(self.ngs_config['ngs_max_connections']), + 'locks_prefix': self.config.get( + 'host', '') or self.config.get('ip', ''), + 'timeout': CONF.ngs_coordination.acquire_timeout} + self.locker = None - if CONF.ngs_coordination.backend_url: + self.batch_cmds = None + if CONF.ngs_coordination.batch_requests and \ + CONF.ngs_coordination.backend_url: + # NOTE: we skip the lock if we are batching requests + self.locker = None + switch_name = self.lock_kwargs['locks_prefix'] + self.batch_cmds = batching.SwitchBatch( + CONF.ngs_coordination.backend_url, + switch_name) + elif CONF.ngs_coordination.backend_url: self.locker = coordination.get_coordinator( CONF.ngs_coordination.backend_url, ('ngs-' + CONF.host).encode('ascii')) self.locker.start() atexit.register(self.locker.stop) - self.lock_kwargs = { - 'locks_pool_size': int(self.ngs_config['ngs_max_connections']), - 'locks_prefix': self.config.get( - 'host', '') or self.config.get('ip', ''), - 'timeout': CONF.ngs_coordination.acquire_timeout} - def _format_commands(self, commands, **kwargs): if not commands: return [] @@ -172,6 +182,14 @@ def _create_connection(): yield net_connect def send_commands_to_device(self, cmd_set): + # If configured, batch up requests to the switch + if self.batch_cmds is not None: + return self.batch_cmds.do_batch( + cmd_set, + self._send_commands_to_device) + return self._send_commands_to_device(cmd_set) + + def _send_commands_to_device(self, cmd_set): if not cmd_set: LOG.debug("Nothing to execute") return diff --git a/networking_generic_switch/tests/unit/test_batching.py b/networking_generic_switch/tests/unit/test_batching.py new file mode 100644 index 00000000..10213f8a --- /dev/null +++ b/networking_generic_switch/tests/unit/test_batching.py @@ -0,0 +1,191 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +import fixtures +from oslo_config import fixture as config_fixture +from oslo_utils import uuidutils +import tenacity + +from networking_generic_switch import batching + + +class SwitchQueueTest(fixtures.TestWithFixtures): + def setUp(self): + super(SwitchQueueTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.client = mock.Mock() + self.switch_name = "switch1" + self.queue = batching.SwitchQueue(self.switch_name, self.client) + + @mock.patch.object(uuidutils, "generate_uuid") + @mock.patch.object(batching.SwitchQueue, "_watch_for_result") + def test_add_batch_and_wait_for_result(self, mock_watch, mock_uuid): + mock_watch.return_value = ("watcher", "callback") + mock_uuid.return_value = "uuid" + + callback = self.queue.add_batch_and_wait_for_result(["cmd1", "cmd2"]) + + self.assertEqual("callback", callback) + mock_watch.assert_called_once_with('/ngs/batch/switch1/output/uuid') + self.client.create.assert_called_once_with( + '/ngs/batch/switch1/input/uuid', + b'{"cmds": ["cmd1", "cmd2"], ' + b'"input_key": "/ngs/batch/switch1/input/uuid", ' + b'"result_key": "/ngs/batch/switch1/output/uuid", ' + b'"uuid": "uuid"}' + ) + + def test_get_and_delete_result(self): + self.client.get.return_value = [b'{"foo": "bar"}'] + + result = self.queue._get_and_delete_result("result_key") + + self.assertEqual({"foo": "bar"}, result) + self.client.get.assert_called_once_with("result_key") + self.client.delete.assert_called_once_with("result_key") + + def test_get_batches(self): + self.client.get_prefix.return_value = [ + (b'{"foo": "bar"}', {}), + (b'{"foo1": "bar1"}', {}) + ] + + batches = self.queue.get_batches() + + self.assertEqual([ + {"foo": "bar"}, + {"foo1": "bar1"} + ], batches) + self.client.get_prefix.assert_called_once_with( + '/ngs/batch/switch1/input/', + sort_order='ascend', sort_target='create') + + def test_record_result(self): + batches = [ + {"result_key": "result1", "input_key": "input1"}, + {"result_key": "result2", "input_key": "input2"}, + ] + + self.queue.record_result("asdf", batches) + + self.assertEqual(2, self.client.create.call_count) + self.client.create.assert_has_calls([ + mock.call( + "result1", + b'{"input_key": "input1", "result": "asdf", ' + b'"result_key": "result1"}'), + mock.call( + "result2", + b'{"input_key": "input2", "result": "asdf", ' + b'"result_key": "result2"}'), + ]) + self.assertEqual(2, self.client.delete.call_count) + self.client.delete.assert_has_calls([ + mock.call("input1"), + mock.call("input2"), + ]) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_timeout(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + self.assertRaises( + tenacity.RetryError, + self.queue.acquire_worker_lock, + wait=wait, acquire_timeout=0.05) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_no_work(self, mock_get): + mock_get.side_effect = [["work"], None] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + wait=wait, acquire_timeout=0.05) + + self.assertIsNone(result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(2, lock.acquire.call_count) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_success(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.side_effect = [False, False, True] + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + wait=wait, acquire_timeout=0.05) + + self.assertEqual(lock, result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(3, lock.acquire.call_count) + + +class SwitchBatchTest(fixtures.TestWithFixtures): + def setUp(self): + super(SwitchBatchTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.queue = mock.Mock() + self.switch_name = "switch1" + self.batch = batching.SwitchBatch( + self.switch_name, switch_queue=self.queue) + + @mock.patch.object(batching.SwitchBatch, "_spawn") + def test_do_batch(self, mock_spawn): + callback = mock.MagicMock() + callback.return_value = "output" + self.queue.add_batch_and_wait_for_result.return_value = callback + result = self.batch.do_batch(["cmd1"], "fn") + + self.assertEqual("output", result) + self.assertEqual(1, mock_spawn.call_count) + self.queue.add_batch_and_wait_for_result.assert_called_once_with( + ["cmd1"]) + callback.assert_called_once_with(timeout=300) + + def test_do_execute_pending_batches_skip(self): + self.queue.get_batches.return_value = [] + batch = mock.MagicMock() + + result = self.batch._execute_pending_batches(batch) + + self.assertIsNone(result) + + def test_do_execute_pending_batches_success(self): + self.queue.get_batches.return_value = [ + {"cmds": ["cmd1", "cmd2"]}, + {"cmds": ["cmd3", "cmd4"]}, + ] + batch = mock.MagicMock() + lock = mock.MagicMock() + self.queue.acquire_worker_lock.return_value = lock + + self.batch._execute_pending_batches(batch) + + batch.assert_called_once_with(['cmd1', 'cmd2', 'cmd3', 'cmd4']) + self.queue.acquire_worker_lock.assert_called_once_with() + lock.__enter__.assert_called_once_with() + lock.__exit__.assert_called_once_with(None, None, None) diff --git a/requirements.txt b/requirements.txt index 0b369b97..9dc07e1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # The order of packages is significant, because pip processes them in the order # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +etcd3gw>=0.2.4 # Apache-2.0 +eventlet>=0.18.2 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 netmiko>=2.4.1 # MIT neutron>=13.0.0.0b1 # Apache-2.0 From 7f630f6bdbc8479daec6b83c6a7f7df18cc14466 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Tue, 28 Jul 2020 15:55:35 +0100 Subject: [PATCH 4/6] Fix up batching so config is per switch Change-Id: If2eeb49202b83d59197f1577982b05db1534b894 --- networking_generic_switch/config.py | 3 --- networking_generic_switch/devices/__init__.py | 6 ++++++ .../devices/netmiko_devices/__init__.py | 3 +-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/networking_generic_switch/config.py b/networking_generic_switch/config.py index 543d7483..1f5eef77 100644 --- a/networking_generic_switch/config.py +++ b/networking_generic_switch/config.py @@ -26,9 +26,6 @@ default=60, help='Timeout in seconds after which an attempt to grab a lock ' 'is failed. Value of 0 is forever.'), - cfg.BoolOpt('batch_requests', default=False, - help='EXPERIMENTAL: option to batch up concurrent requests ' - 'to each switche. Only tested with Cumulus driver.') ] CONF.register_opts(coordination_opts, group='ngs_coordination') diff --git a/networking_generic_switch/devices/__init__.py b/networking_generic_switch/devices/__init__.py index c2319513..daf479b9 100644 --- a/networking_generic_switch/devices/__init__.py +++ b/networking_generic_switch/devices/__init__.py @@ -38,6 +38,7 @@ {'name': 'ngs_switchport_mode', 'default': 'access'}, # If True, disable switch ports that are not in use. {'name': 'ngs_disable_inactive_ports', 'default': False}, + {'name': 'ngs_batch_requests', 'default': False}, ] @@ -105,6 +106,11 @@ def _disable_inactive_ports(self): return strutils.bool_from_string( self.ngs_config['ngs_disable_inactive_ports']) + def _batch_requests(self): + """Return whether to batch up requests to the switch.""" + return strutils.bool_from_string( + self.ngs_config['ngs_batch_requests']) + @abc.abstractmethod def add_network(self, segmentation_id, network_id): pass diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index b569fbc9..a6de60f1 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -110,8 +110,7 @@ def __init__(self, device_cfg): self.locker = None self.batch_cmds = None - if CONF.ngs_coordination.batch_requests and \ - CONF.ngs_coordination.backend_url: + if self._batch_requests() and CONF.ngs_coordination.backend_url: # NOTE: we skip the lock if we are batching requests self.locker = None switch_name = self.lock_kwargs['locks_prefix'] From b796d0800c8292c4e61daa73cf5bdd16b1c23ee9 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Wed, 29 Jul 2020 09:57:21 +0100 Subject: [PATCH 5/6] Add config debug Change-Id: I3e527f08d3f939a929fd06412d4e0e843891510d --- networking_generic_switch/batching.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/networking_generic_switch/batching.py b/networking_generic_switch/batching.py index 04f97518..18aaa548 100644 --- a/networking_generic_switch/batching.py +++ b/networking_generic_switch/batching.py @@ -229,11 +229,16 @@ class SwitchBatch(object): def __init__(self, switch_name, etcd_url=None, switch_queue=None): if switch_queue is None: parsed_url = netutils.urlsplit(etcd_url) + LOG.debug(parsed_url) + LOG.debug(etcd_url) host = parsed_url.hostname port = parsed_url.port # TODO(johngarbutt): support certs protocol = 'https' if parsed_url.scheme.endswith( 'https') else 'http' + kwargs = dict(host=host, port=port, + protocol=protocol, timeout=30) + LOG.debug(kwargs) etcd_client = etcd3gw.client( host=host, port=port, protocol=protocol, timeout=30) From af378da4f6daa5b31e91f73b689fc1716c458e88 Mon Sep 17 00:00:00 2001 From: John Garbutt Date: Wed, 29 Jul 2020 10:30:43 +0100 Subject: [PATCH 6/6] Fix switch batch config Change-Id: Ie1eef0337775ec4ad04ce0e83352b75fd987a12c --- networking_generic_switch/batching.py | 2 -- networking_generic_switch/devices/netmiko_devices/__init__.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/networking_generic_switch/batching.py b/networking_generic_switch/batching.py index 18aaa548..5e7486e8 100644 --- a/networking_generic_switch/batching.py +++ b/networking_generic_switch/batching.py @@ -229,8 +229,6 @@ class SwitchBatch(object): def __init__(self, switch_name, etcd_url=None, switch_queue=None): if switch_queue is None: parsed_url = netutils.urlsplit(etcd_url) - LOG.debug(parsed_url) - LOG.debug(etcd_url) host = parsed_url.hostname port = parsed_url.port # TODO(johngarbutt): support certs diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index a6de60f1..22fd5d45 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -114,9 +114,9 @@ def __init__(self, device_cfg): # NOTE: we skip the lock if we are batching requests self.locker = None switch_name = self.lock_kwargs['locks_prefix'] + LOG.debug("setting up request batching for: %s", switch_name) self.batch_cmds = batching.SwitchBatch( - CONF.ngs_coordination.backend_url, - switch_name) + switch_name, CONF.ngs_coordination.backend_url) elif CONF.ngs_coordination.backend_url: self.locker = coordination.get_coordinator( CONF.ngs_coordination.backend_url,