diff --git a/.gitreview b/.gitreview index 3a78cb01..7e394384 100644 --- a/.gitreview +++ b/.gitreview @@ -2,3 +2,4 @@ host=review.opendev.org port=29418 project=openstack/networking-generic-switch.git +defaultbranch=stable/train diff --git a/lower-constraints.txt b/lower-constraints.txt index 49f9e30d..4c99164c 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -19,6 +19,7 @@ deprecation==1.0 doc8==0.6.0 docutils==0.11 dogpile.cache==0.6.2 +etcd3gw===0.2.4 eventlet==0.18.2 extras==1.0.0 fasteners==0.7.0 diff --git a/networking_generic_switch/batching.py b/networking_generic_switch/batching.py new file mode 100644 index 00000000..5e7486e8 --- /dev/null +++ b/networking_generic_switch/batching.py @@ -0,0 +1,338 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import queue + +import etcd3gw +from etcd3gw import watch +import eventlet +from oslo_log import log as logging +from oslo_utils import netutils +from oslo_utils import uuidutils +import tenacity + +LOG = logging.getLogger(__name__) + + +class SwitchQueue(object): + INPUT_PREFIX = "/ngs/batch/%s/input/" + INPUT_ITEM_KEY = "/ngs/batch/%s/input/%s" + RESULT_ITEM_KEY = "/ngs/batch/%s/output/%s" + EXEC_LOCK = "/ngs/batch/%s/execute_lock" + + def __init__(self, switch_name, etcd_client): + self.switch_name = switch_name + self.client = etcd_client + + def add_batch_and_wait_for_result(self, cmds): + """Clients add batch, given key events. 
+ + Each batch is given a uuid that is used to generate both + an input and result key in etcd. + + First we watch for any results, second we write the input + in a location that the caller of get_batches will be looking. + + No locks are required when calling this function to send work + to the workers, and start waiting for results. + + Returns a function that takes a timeout parameter to wait + for the result. + """ + + uuid = uuidutils.generate_uuid() + result_key = self.RESULT_ITEM_KEY % (self.switch_name, uuid) + input_key = self.INPUT_ITEM_KEY % (self.switch_name, uuid) + + # Start waiting on the key we expect to be created and + # start watching before writing input key to avoid racing + watcher, get_result = self._watch_for_result(result_key) + + batch = { + "uuid": uuid, + "input_key": input_key, + "result_key": result_key, + "cmds": cmds, + } + value = json.dumps(batch, sort_keys=True).encode("utf-8") + try: + # TODO(johngarbutt) add a lease so this times out? + success = self.client.create(input_key, value) + except Exception: + # Be sure to free watcher resources + watcher.stop() + raise + + # Be sure to free watcher resources on error + if not success: + watcher.stop() + raise Exception("failed to add batch to key: %s" % input_key) + + LOG.debug("written input key %s", input_key) + return get_result + + def _watch_for_result(self, result_key): + # Logic based on implementation of client.watch_once() + event_queue = queue.Queue() + + def callback(event): + event_queue.put(event) + + watcher = watch.Watcher(self.client, result_key, callback) + + def wait_for_key(timeout): + try: + event = event_queue.get(timeout=timeout) + except queue.Empty: + raise Exception("timed out waiting for key: %s" % result_key) + finally: + # NOTE(johngarbutt) this means we need the caller + # to always watch for the result, or call stop + # before starting to wait for the key + watcher.stop() + + LOG.debug("got event: %s", event) + # TODO(johngarbutt) check we have the
create event and result? + result_dict = self._get_and_delete_result(result_key) + LOG.debug("got result: %s", result_dict) + return result_dict["result"] + + return watcher, wait_for_key + + def _get_and_delete_result(self, result_key): + # called when watch event says the result key should exist + raw_results = self.client.get(result_key) + if len(raw_results) != 1: + raise Exception("unable to find result: %s" % result_key) + raw_value = raw_results[0] + result_dict = json.loads(raw_value.decode('utf-8')) + LOG.debug("fetched result for: %s", result_key) + + # delete key now we have the result + delete_success = self.client.delete(result_key) + if not delete_success: + LOG.error("unable to delete result key: %s", + result_key) + LOG.debug("deleted result for: %s", result_key) + return result_dict + + def _get_raw_batches(self): + input_prefix = self.INPUT_PREFIX % self.switch_name + # Sort order ensures FIFO style queue + raw_batches = self.client.get_prefix(input_prefix, + sort_order="ascend", + sort_target="create") + return raw_batches + + def get_batches(self): + """Return a list of the event dicts written in wait for result. + + This is called both with or without getting a lock to get the + latest list of work that has been sent to the per switch queue in + etcd. + """ + raw_batches = self._get_raw_batches() + LOG.debug("found %s batches", len(raw_batches)) + + batches = [] + for raw_value, metadata in raw_batches: + batch = json.loads(raw_value.decode('utf-8')) + batches.append(batch) + return batches + + def record_result(self, result, batches): + """Record the result from executing given batch list. + + We assume that a lock is held before getting a fresh list + of batches, executing them, and then calling this record + results function, before finally dropping the lock.
+ """ + LOG.debug("write results for %s batches", len(batches)) + + # Write results first, so watchers see these quickly + for batch in batches: + batch["result"] = result + # TODO(johngarbutt) create this with a lease + # so auto delete if no one gets the result? + success = self.client.create( + batch['result_key'], + json.dumps(batch, sort_keys=True).encode('utf-8')) + if not success: + # TODO(johngarbutt) should we fail to delete the key at + # this point? + LOG.error("failed to report batch result for: %s", + batch) + + # delete input keys so the next worker to hold the lock + # knows not to execute these batches + for batch in batches: + input_key = batch["input_key"] + delete_success = self.client.delete(input_key) + if not delete_success: + LOG.error("unable to delete input key: %s", + input_key) + else: + LOG.debug("deleted input key: %s", input_key) + + def acquire_worker_lock(self, acquire_timeout=300, lock_ttl=120, + wait=None): + """Wait for lock needed to call record_result. + + This blocks until the work queue is empty or the switch lock is + acquired. If we timeout waiting for the lock we raise an exception. + """ + lock_name = self.EXEC_LOCK % self.switch_name + lock = self.client.lock(lock_name, lock_ttl) + + if wait is None: + wait = tenacity.wait_random(min=1, max=3) + + @tenacity.retry( + # Log a message after each failed attempt. + after=tenacity.after_log(LOG, logging.DEBUG), + # Retry if we haven't got the lock yet + retry=tenacity.retry_if_result(lambda x: x is False), + # Stop after timeout.
+ stop=tenacity.stop_after_delay(acquire_timeout), + # Wait between lock retries + wait=wait, + ) + def _acquire_lock_with_retry(): + lock_acquired = lock.acquire() + if lock_acquired: + return lock + + # Stop waiting for the lock if there is nothing to do + work = self._get_raw_batches() + if not work: + return None + + # Trigger a retry + return False + + return _acquire_lock_with_retry() + + +class SwitchBatch(object): + def __init__(self, switch_name, etcd_url=None, switch_queue=None): + if switch_queue is None: + parsed_url = netutils.urlsplit(etcd_url) + host = parsed_url.hostname + port = parsed_url.port + # TODO(johngarbutt): support certs + protocol = 'https' if parsed_url.scheme.endswith( + 'https') else 'http' + kwargs = dict(host=host, port=port, + protocol=protocol, timeout=30) + LOG.debug(kwargs) + etcd_client = etcd3gw.client( + host=host, port=port, protocol=protocol, + timeout=30) + self.queue = SwitchQueue(switch_name, etcd_client) + else: + self.queue = switch_queue + self.switch_name = switch_name + + def do_batch(self, cmd_set, batch_fn, timeout=300): + """Batch up calls to this function to reduce overheads. + + We collect together the iterables in the cmd_set, and + execute them toegether in a single larger batch. + This reduces overhead, but does make it harder to track + down which of the cmds failed. 
+ + :param cmd_set: an iterable of commands + :param batch_fn: function that takes an iterable of commands + :return: output string generated by the whole batch + """ + + # request that the cmd_set be executed + cmd_list = list(cmd_set) + wait_for_result = self.queue.add_batch_and_wait_for_result(cmd_list) + + def do_work(): + try: + self._execute_pending_batches( + batch_fn) + except Exception as e: + LOG.error("failed to run execute batch: %s", e, + exc_info=True) + raise + + self._spawn(do_work) + + # Wait for our result key + # as the result might be done before the above task starts + output = wait_for_result(timeout=timeout) + LOG.debug("Got batch result: %s", output) + return output + + @staticmethod + def _spawn(work_fn): + # TODO(johngarbutt) remove hard eventlet dependency + # in a similar way to etcd3gw + # Sleep to let possible other work to batch together + eventlet.sleep(0.1) + # Run all pending tasks, which might be a no op + # if pending tasks already ran + eventlet.spawn_n(work_fn) + + def _execute_pending_batches(self, batch_fn): + """Execute all batches currently registered. + + Typically called by every caller of add_batch. + Could be a noop if all batches are already executed.
+ """ + batches = self.queue.get_batches() + if not batches: + LOG.debug("Skipped execution for %s", self.switch_name) + return + LOG.debug("Getting lock to execute %d batches for %s", + len(batches), self.switch_name) + + lock = self.queue.acquire_worker_lock() + if lock is None: + # This means we stopped waiting as the work queue was empty + LOG.debug("Work list empty for %s", self.switch_name) + return + + # Check we got the lock + if not lock.is_acquired(): + raise Exception("unable to get lock for: %s" % self.switch_name) + + # be sure to drop the lock when we are done + with lock: + LOG.debug("got lock for %s", self.switch_name) + + # Fetch fresh list now we have the lock + # and order the list so we execute in order added + batches = self.queue.get_batches() + if not batches: + LOG.debug("No batches to execute %s", self.switch_name) + return + + LOG.debug("Starting to execute %d batches", len(batches)) + all_cmds = [] + for batch in batches: + all_cmds += batch['cmds'] + + # Execute batch function with all the commands + result = batch_fn(all_cmds) + + # Tell request watchers the result and + # tell workers which batches have now been executed + self.queue.record_result(result, batches) + + LOG.debug("end of lock for %s", self.switch_name) diff --git a/networking_generic_switch/devices/__init__.py b/networking_generic_switch/devices/__init__.py index c2319513..daf479b9 100644 --- a/networking_generic_switch/devices/__init__.py +++ b/networking_generic_switch/devices/__init__.py @@ -38,6 +38,7 @@ {'name': 'ngs_switchport_mode', 'default': 'access'}, # If True, disable switch ports that are not in use.
{'name': 'ngs_disable_inactive_ports', 'default': False}, + {'name': 'ngs_batch_requests', 'default': False}, ] @@ -105,6 +106,11 @@ def _disable_inactive_ports(self): return strutils.bool_from_string( self.ngs_config['ngs_disable_inactive_ports']) + def _batch_requests(self): + """Return whether to batch up requests to the switch.""" + return strutils.bool_from_string( + self.ngs_config['ngs_batch_requests']) + @abc.abstractmethod def add_network(self, segmentation_id, network_id): pass diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index 57d8e2aa..22fd5d45 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -24,6 +24,7 @@ import tenacity from tooz import coordination +from networking_generic_switch import batching from networking_generic_switch import devices from networking_generic_switch.devices import utils as device_utils from networking_generic_switch import exceptions as exc @@ -61,6 +62,8 @@ def wrapper(self, *args, **kwargs): class NetmikoSwitch(devices.GenericSwitchDevice): + NETMIKO_DEVICE_TYPE = None + ADD_NETWORK = None DELETE_NETWORK = None @@ -88,28 +91,39 @@ class NetmikoSwitch(devices.GenericSwitchDevice): def __init__(self, device_cfg): super(NetmikoSwitch, self).__init__(device_cfg) - device_type = self.config.get('device_type', '') - # use part that is after 'netmiko_' - device_type = device_type.partition('netmiko_')[2] + if self.NETMIKO_DEVICE_TYPE: + device_type = self.NETMIKO_DEVICE_TYPE + else: + device_type = self.config.get('device_type', '') + # use part that is after 'netmiko_' + device_type = device_type.partition('netmiko_')[2] if device_type not in netmiko.platforms: raise exc.GenericSwitchNetmikoNotSupported( device_type=device_type) self.config['device_type'] = device_type + self.lock_kwargs = { + 'locks_pool_size': 
int(self.ngs_config['ngs_max_connections']), + 'locks_prefix': self.config.get( + 'host', '') or self.config.get('ip', ''), + 'timeout': CONF.ngs_coordination.acquire_timeout} + self.locker = None - if CONF.ngs_coordination.backend_url: + self.batch_cmds = None + if self._batch_requests() and CONF.ngs_coordination.backend_url: + # NOTE: we skip the lock if we are batching requests + self.locker = None + switch_name = self.lock_kwargs['locks_prefix'] + LOG.debug("setting up request batching for: %s", switch_name) + self.batch_cmds = batching.SwitchBatch( + switch_name, CONF.ngs_coordination.backend_url) + elif CONF.ngs_coordination.backend_url: self.locker = coordination.get_coordinator( CONF.ngs_coordination.backend_url, ('ngs-' + CONF.host).encode('ascii')) self.locker.start() atexit.register(self.locker.stop) - self.lock_kwargs = { - 'locks_pool_size': int(self.ngs_config['ngs_max_connections']), - 'locks_prefix': self.config.get( - 'host', '') or self.config.get('ip', ''), - 'timeout': CONF.ngs_coordination.acquire_timeout} - def _format_commands(self, commands, **kwargs): if not commands: return [] @@ -167,6 +181,14 @@ def _create_connection(): yield net_connect def send_commands_to_device(self, cmd_set): + # If configured, batch up requests to the switch + if self.batch_cmds is not None: + return self.batch_cmds.do_batch( + cmd_set, + self._send_commands_to_device) + return self._send_commands_to_device(cmd_set) + + def _send_commands_to_device(self, cmd_set): if not cmd_set: LOG.debug("Nothing to execute") return diff --git a/networking_generic_switch/devices/netmiko_devices/cumulus.py b/networking_generic_switch/devices/netmiko_devices/cumulus.py new file mode 100644 index 00000000..f6a5b40f --- /dev/null +++ b/networking_generic_switch/devices/netmiko_devices/cumulus.py @@ -0,0 +1,73 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import re + +from networking_generic_switch.devices import netmiko_devices + + +class Cumulus(netmiko_devices.NetmikoSwitch): + """Built for Cumulus 4.x + + Note for this switch you want config like this, + where secret is the password needed for sudo su: + + [genericswitch:] + device_type = netmiko_cumulus + ip = + username = + password = + secret = + ngs_physical_networks = physnet1 + ngs_max_connections = 1 + ngs_port_default_vlan = 123 + ngs_disable_inactive_ports = False + """ + NETMIKO_DEVICE_TYPE = "linux" + + ADD_NETWORK = [ + 'net add vlan {segmentation_id}', + ] + + DELETE_NETWORK = [ + 'net del vlan {segmentation_id}', + ] + + PLUG_PORT_TO_NETWORK = [ + 'net add interface {port} bridge access {segmentation_id}', + ] + + DELETE_PORT = [ + 'net del interface {port} bridge access {segmentation_id}', + ] + + ENABLE_PORT = [ + 'net del interface {port} link down', + ] + + DISABLE_PORT = [ + 'net add interface {port} link down', + ] + + SAVE_CONFIGURATION = [ + 'net commit', + ] + + ERROR_MSG_PATTERNS = [ + # Its tempting to add this error message, but as only one + # bridge-access is allowed, we ignore that error for now: + # re.compile(r'configuration does not have "bridge-access') + re.compile(r'ERROR: Command not found.'), + re.compile(r'command not found'), + re.compile(r'is not a physical interface on this switch'), + ] diff --git a/networking_generic_switch/tests/unit/netmiko/test_cumulus.py b/networking_generic_switch/tests/unit/netmiko/test_cumulus.py new file mode 100644 index 00000000..73c1831f --- /dev/null +++ 
b/networking_generic_switch/tests/unit/netmiko/test_cumulus.py @@ -0,0 +1,117 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from networking_generic_switch.devices.netmiko_devices import cumulus +from networking_generic_switch import exceptions as exc +from networking_generic_switch.tests.unit.netmiko import test_netmiko_base + + +class TestNetmikoCumulus(test_netmiko_base.NetmikoSwitchTestBase): + + def _make_switch_device(self, extra_cfg={}): + device_cfg = { + 'device_type': 'netmiko_cumulus', + 'ngs_port_default_vlan': '123', + 'ngs_disable_inactive_ports': 'True', + } + device_cfg.update(extra_cfg) + return cumulus.Cumulus(device_cfg) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_add_network(self, mock_exec): + self.switch.add_network(3333, '0ae071f5-5be9-43e4-80ea-e41fefe85b21') + mock_exec.assert_called_with( + ['net add vlan 3333']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_network(self, mock_exec): + self.switch.del_network(3333, '0ae071f5-5be9-43e4-80ea-e41fefe85b21') + mock_exec.assert_called_with( + ['net del vlan 3333']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' 
+ 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_plug_port_to_network(self, mock_exec): + self.switch.plug_port_to_network(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 link down', + 'net del interface 3333 bridge access 123', + 'net add interface 3333 bridge access 33']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device') + def test_plug_port_to_network_fails(self, mock_exec): + mock_exec.return_value = ( + 'ERROR: Command not found.\n\nasdf' + ) + self.assertRaises(exc.GenericSwitchNetmikoConfigError, + self.switch.plug_port_to_network, 3333, 33) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device') + def test_plug_port_to_network_fails_bad_port(self, mock_exec): + mock_exec.return_value = ( + 'ERROR: asd123 is not a physical interface on this switch.' + '\n\nasdf' + ) + self.assertRaises(exc.GenericSwitchNetmikoConfigError, + self.switch.plug_port_to_network, 3333, 33) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_plug_port_simple(self, mock_exec): + switch = self._make_switch_device({ + 'ngs_disable_inactive_ports': 'false', + 'ngs_port_default_vlan': '', + }) + switch.plug_port_to_network(3333, 33) + mock_exec.assert_called_with( + ['net add interface 3333 bridge access 33']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' + 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_port(self, mock_exec): + self.switch.delete_port(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 bridge access 33', + 'net add vlan 123', + 'net add interface 3333 bridge access 123', + 'net add interface 3333 link down']) + + @mock.patch('networking_generic_switch.devices.netmiko_devices.' 
+ 'NetmikoSwitch.send_commands_to_device', + return_value="") + def test_delete_port_simple(self, mock_exec): + switch = self._make_switch_device({ + 'ngs_disable_inactive_ports': 'false', + 'ngs_port_default_vlan': '', + }) + switch.delete_port(3333, 33) + mock_exec.assert_called_with( + ['net del interface 3333 bridge access 33']) + + def test_save(self): + mock_connect = mock.MagicMock() + mock_connect.save_config.side_effect = NotImplementedError + self.switch.save_configuration(mock_connect) + mock_connect.send_command.assert_called_with('net commit') diff --git a/networking_generic_switch/tests/unit/test_batching.py b/networking_generic_switch/tests/unit/test_batching.py new file mode 100644 index 00000000..10213f8a --- /dev/null +++ b/networking_generic_switch/tests/unit/test_batching.py @@ -0,0 +1,191 @@ +# Copyright 2020 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import mock + +import fixtures +from oslo_config import fixture as config_fixture +from oslo_utils import uuidutils +import tenacity + +from networking_generic_switch import batching + + +class SwitchQueueTest(fixtures.TestWithFixtures): + def setUp(self): + super(SwitchQueueTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.client = mock.Mock() + self.switch_name = "switch1" + self.queue = batching.SwitchQueue(self.switch_name, self.client) + + @mock.patch.object(uuidutils, "generate_uuid") + @mock.patch.object(batching.SwitchQueue, "_watch_for_result") + def test_add_batch_and_wait_for_result(self, mock_watch, mock_uuid): + mock_watch.return_value = ("watcher", "callback") + mock_uuid.return_value = "uuid" + + callback = self.queue.add_batch_and_wait_for_result(["cmd1", "cmd2"]) + + self.assertEqual("callback", callback) + mock_watch.assert_called_once_with('/ngs/batch/switch1/output/uuid') + self.client.create.assert_called_once_with( + '/ngs/batch/switch1/input/uuid', + b'{"cmds": ["cmd1", "cmd2"], ' + b'"input_key": "/ngs/batch/switch1/input/uuid", ' + b'"result_key": "/ngs/batch/switch1/output/uuid", ' + b'"uuid": "uuid"}' + ) + + def test_get_and_delete_result(self): + self.client.get.return_value = [b'{"foo": "bar"}'] + + result = self.queue._get_and_delete_result("result_key") + + self.assertEqual({"foo": "bar"}, result) + self.client.get.assert_called_once_with("result_key") + self.client.delete.assert_called_once_with("result_key") + + def test_get_batches(self): + self.client.get_prefix.return_value = [ + (b'{"foo": "bar"}', {}), + (b'{"foo1": "bar1"}', {}) + ] + + batches = self.queue.get_batches() + + self.assertEqual([ + {"foo": "bar"}, + {"foo1": "bar1"} + ], batches) + self.client.get_prefix.assert_called_once_with( + '/ngs/batch/switch1/input/', + sort_order='ascend', sort_target='create') + + def test_record_result(self): + batches = [ + {"result_key": "result1", "input_key": "input1"}, + {"result_key": 
"result2", "input_key": "input2"}, + ] + + self.queue.record_result("asdf", batches) + + self.assertEqual(2, self.client.create.call_count) + self.client.create.assert_has_calls([ + mock.call( + "result1", + b'{"input_key": "input1", "result": "asdf", ' + b'"result_key": "result1"}'), + mock.call( + "result2", + b'{"input_key": "input2", "result": "asdf", ' + b'"result_key": "result2"}'), + ]) + self.assertEqual(2, self.client.delete.call_count) + self.client.delete.assert_has_calls([ + mock.call("input1"), + mock.call("input2"), + ]) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_timeout(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + self.assertRaises( + tenacity.RetryError, + self.queue.acquire_worker_lock, + wait=wait, acquire_timeout=0.05) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_no_work(self, mock_get): + mock_get.side_effect = [["work"], None] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + wait=wait, acquire_timeout=0.05) + + self.assertIsNone(result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(2, lock.acquire.call_count) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_success(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.side_effect = [False, False, True] + self.client.lock.return_value = lock + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + wait=wait, acquire_timeout=0.05) + + self.assertEqual(lock, result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(3, lock.acquire.call_count) + + +class SwitchBatchTest(fixtures.TestWithFixtures): + 
def setUp(self): + super(SwitchBatchTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.queue = mock.Mock() + self.switch_name = "switch1" + self.batch = batching.SwitchBatch( + self.switch_name, switch_queue=self.queue) + + @mock.patch.object(batching.SwitchBatch, "_spawn") + def test_do_batch(self, mock_spawn): + callback = mock.MagicMock() + callback.return_value = "output" + self.queue.add_batch_and_wait_for_result.return_value = callback + result = self.batch.do_batch(["cmd1"], "fn") + + self.assertEqual("output", result) + self.assertEqual(1, mock_spawn.call_count) + self.queue.add_batch_and_wait_for_result.assert_called_once_with( + ["cmd1"]) + callback.assert_called_once_with(timeout=300) + + def test_do_execute_pending_batches_skip(self): + self.queue.get_batches.return_value = [] + batch = mock.MagicMock() + + result = self.batch._execute_pending_batches(batch) + + self.assertIsNone(result) + + def test_do_execute_pending_batches_success(self): + self.queue.get_batches.return_value = [ + {"cmds": ["cmd1", "cmd2"]}, + {"cmds": ["cmd3", "cmd4"]}, + ] + batch = mock.MagicMock() + lock = mock.MagicMock() + self.queue.acquire_worker_lock.return_value = lock + + self.batch._execute_pending_batches(batch) + + batch.assert_called_once_with(['cmd1', 'cmd2', 'cmd3', 'cmd4']) + self.queue.acquire_worker_lock.assert_called_once_with() + lock.__enter__.assert_called_once_with() + lock.__exit__.assert_called_once_with(None, None, None) diff --git a/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml b/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml new file mode 100644 index 00000000..5377610c --- /dev/null +++ b/releasenotes/notes/add-cumulus-nclu-support-ddcffa604c3e1b18.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Adds a new device driver, ``netmiko_cumulus``, for managing cumulus + based switch devices via NCLU. 
diff --git a/requirements.txt b/requirements.txt index 0b369b97..9dc07e1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # The order of packages is significant, because pip processes them in the order # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +etcd3gw>=0.2.4 # Apache-2.0 +eventlet>=0.18.2 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 netmiko>=2.4.1 # MIT neutron>=13.0.0.0b1 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index ac4b33f9..e5621dc3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ generic_switch.devices = netmiko_hp_comware = networking_generic_switch.devices.netmiko_devices.hpe:HpeComware netmiko_juniper = networking_generic_switch.devices.netmiko_devices.juniper:Juniper netmiko_mellanox_mlnxos = networking_generic_switch.devices.netmiko_devices.mellanox_mlnxos:MellanoxMlnxOS + netmiko_cumulus = networking_generic_switch.devices.netmiko_devices.cumulus:Cumulus tempest.test_plugins = ngs_tests = tempest_plugin.plugin:NGSTempestPlugin diff --git a/tox.ini b/tox.ini index 68503f0c..947d44a0 100644 --- a/tox.ini +++ b/tox.ini @@ -13,7 +13,7 @@ setenv = VIRTUAL_ENV={envdir} LC_ALL=en_US.UTF-8 TESTS_DIR=./networking_generic_switch/tests/unit/ deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY @@ -36,7 +36,7 @@ setenv = PYTHONHASHSEED=0 sitepackages = False envdir = {toxworkdir}/venv deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/requirements.txt 
-r{toxinidir}/doc/requirements.txt commands = @@ -56,7 +56,7 @@ commands = [testenv:releasenotes] basepython = python3 deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/doc/requirements.txt commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html @@ -65,7 +65,7 @@ commands = basepython = python3 setenv = PYTHONHASHSEED=0 deps = - -c{env:UPPER_CONSTRAINTS_FILE:https://opendev.org/openstack/requirements/raw/branch/master/upper-constraints.txt} + -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/train} -r{toxinidir}/test-requirements.txt -r{toxinidir}/doc/requirements.txt commands = {posargs} diff --git a/zuul.d/networking-generic-switch-jobs.yaml b/zuul.d/networking-generic-switch-jobs.yaml index eab79498..e68beca8 100644 --- a/zuul.d/networking-generic-switch-jobs.yaml +++ b/zuul.d/networking-generic-switch-jobs.yaml @@ -72,5 +72,6 @@ name: networking-generic-switch-tempest-dlm-python2 parent: networking-generic-switch-tempest-dlm-base vars: + tox_envlist: py27 devstack_localrc: USE_PYTHON3: False diff --git a/zuul.d/project.yaml b/zuul.d/project.yaml index 3cdd2c85..c7ead52e 100644 --- a/zuul.d/project.yaml +++ b/zuul.d/project.yaml @@ -13,7 +13,9 @@ irrelevant-files: - ^(test-|)requirements.txt$ - ^setup.cfg$ - - ironic-grenade-dsvm-multinode-multitenant + # NOTE(iurygregory): Non-voting due to instability. + - ironic-grenade-dsvm-multinode-multitenant: + voting: false - openstack-tox-lower-constraints gate: queue: networking-generic-switch @@ -24,5 +26,6 @@ irrelevant-files: - ^(test-|)requirements.txt$ - ^setup.cfg$ - - ironic-grenade-dsvm-multinode-multitenant + # Removing from gate due to instability. + # - ironic-grenade-dsvm-multinode-multitenant - openstack-tox-lower-constraints