diff --git a/CMakeLists.txt b/CMakeLists.txt index 9169cba..fb9af16 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -148,6 +148,10 @@ if(WITH_TIMEOUT_WARNING) set(SALUS_ENABLE_TIMEOUT_WARNING 1) endif(WITH_TIMEOUT_WARNING) +if(USE_TENSORFLOW) + set(SALUS_ENABLE_TENSORFLOW 1) +endif(USE_TENSORFLOW) + configure_file(src/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h) include_directories(${CMAKE_CURRENT_BINARY_DIR}) diff --git a/benchmarks/driver/runner.py b/benchmarks/driver/runner.py index 6d51252..88fa2a3 100644 --- a/benchmarks/driver/runner.py +++ b/benchmarks/driver/runner.py @@ -213,7 +213,8 @@ def __call__(self, executor, output_file): else: output_file.parent.mkdir(exist_ok=True, parents=True) with output_file.open('w') as f: - return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=sp.STDOUT) + # return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=sp.STDOUT) + return execute(cmd, cwd=str(cwd), env=env, stdout=f, stderr=None) def _construct_test_name(self, executor): # type: (Executor) -> Tuple[str, str] @@ -239,7 +240,7 @@ def _construct_test_name(self, executor): }) } - variable_batch_size_models = {'vae', 'superres'} + variable_batch_size_models = {'vae', 'superres', 'seq2seq', 'mnistsf', 'mnistcv', 'mnistlg'} if remove_suffix(self.wl.name, 'eval') not in variable_batch_size_models: if self.wl.batch_size not in self.wl.wtl.available_batch_sizes(): raise ValueError(f"Batch size `{self.wl.batch_size}' is not supported for {self.wl.name}," @@ -273,6 +274,8 @@ def _construct_test_name(self, executor): } postfix = names.get(self.wl.batch_size, '0') + if model_name == 'seq2seq' and postfix == '0': + postfix = '2_large' method = f'{cls}.{prefix}{postfix}' return pkg, method diff --git a/benchmarks/driver/server/config/__init__.py b/benchmarks/driver/server/config/__init__.py index 2055115..dad744b 100644 --- a/benchmarks/driver/server/config/__init__.py +++ b/benchmarks/driver/server/config/__init__.py @@ -21,7 +21,7 @@ from builtins import super from absl import flags -from copy import copy +from copy import deepcopy from ...utils import maybe_path from ...utils.compatiblity import pathlib @@ -77,7 +77,7 @@ def __setattr__(self, key, value): def copy(self, **kwargs): # type: (...) -> SalusConfig """Return a new copy of the tuple""" - return copy(self).update(**kwargs) + return deepcopy(self).update(**kwargs) def update(self, d=None, **kwargs): # type: (...) -> SalusConfig diff --git a/benchmarks/exps/__init__.py b/benchmarks/exps/__init__.py index 9262e05..b093421 100644 --- a/benchmarks/exps/__init__.py +++ b/benchmarks/exps/__init__.py @@ -23,6 +23,8 @@ import time import re import logging +import string +import random from absl import flags from typing import Union, Iterable, List, TypeVar, Callable, Optional @@ -379,3 +381,8 @@ def release_on_pipe(pipe): def sync_on_pipe(pipe): wait_on_pipe(pipe) release_on_pipe(pipe) + + +def random_id(size=6, chars=string.ascii_uppercase + string.digits): + """Generate a random ID""" + return ''.join(random.choice(chars) for _ in range(size)) diff --git a/benchmarks/exps/smtracing.py b/benchmarks/exps/smtracing.py index 91c61ba..e1e6d19 100644 --- a/benchmarks/exps/smtracing.py +++ b/benchmarks/exps/smtracing.py @@ -170,6 +170,31 @@ def tfmps(argv): ) +def train_alone(argv): + """Run training workload alone take note of SM usage""" + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + logger.info(f"Running Salus with sm factors: {sm_factors}") + + # run salus + for factor in sm_factors: + with tempfile.TemporaryDirectory() as td: + scfg = maybe_forced_preset(presets.OpTracing) + scfg.logconf = 'smtracing' + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + logger.info(f"Running Salus with sm factor: {factor}") + # the background training job + wl, pipe = create_train(Executor.Salus, 0, td) + run_seq(scfg.copy(output_dir=FLAGS.save_dir / "alone" / f"{factor:.2f}"), + wl, + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe)), + RunFn(lambda *args, **kwargs: release_on_pipe(pipe))) + + @case_switch_main def main(): - return salus, tfmps + return salus, tfmps, train_alone, salus_factor diff --git a/benchmarks/exps/tune_pending.py b/benchmarks/exps/tune_pending.py new file mode 100644 index 0000000..7371654 --- /dev/null +++ b/benchmarks/exps/tune_pending.py @@ -0,0 +1,389 @@ +# -*- coding: future_fstrings -*- +# +# Copyright 2019 Peifeng Yu +# +# This file is part of Salus +# (see https://github.com/SymbioticLab/Salus). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Tune pending: Experiment one inference job with one training job + +Almost the same as Card 304, but try to tune the pending parameter. + +The propurse of this experiment is to tune and debug the SM tracing pipeline. + +- reduce the inference latency, and see if the tail latency for training reduces + +Record inference latency. Compare inference job latency running along vs. running with a training job. + +The latency should be measured with increasing throughput (qps) for the inference job. + +Collected data: inference per iteration speed (latency), training throughput (derived from per iteration speed) +""" +from __future__ import absolute_import, print_function, division, unicode_literals + +import tempfile +from typing import Sequence + +from absl import flags +import logging +import os + +from benchmarks.driver.server.config import presets +from benchmarks.driver.workload import WTL, Executor +from benchmarks.driver.utils.compatiblity import pathlib +from benchmarks.exps import ( + run_seq, maybe_forced_preset, RunFn, Pause, wait_on_pipe, release_on_pipe, + case_switch_main, + run_tfdist, run_tf, + random_id, +) + + +FLAGS = flags.FLAGS +logger = logging.getLogger(__name__) + + +def set_env(wl): + wl.env['SALUS_TFBENCH_EVAL_BLOCK'] = 'true' + + model_dir = pathlib.Path('~/../symbiotic/peifeng/tf_cnn_benchmarks_models/legacy_checkpoint_models') + model_dir = model_dir.expanduser().resolve() + wl.env['SALUS_TFBENCH_EVAL_MODEL_DIR'] = model_dir + + +def create_train(executor, idx, td=None): + # the batch number has no effect here, only used to distinguish different runs + train_wl = WTL.create('inception4', 50, 100 + idx, executor=executor) + # make sure it runs long enough + train_wl.env['SALUS_ITER_SECONDS'] = '300' + + if td is not None: + # create a pipe to signal train_wl + pipetrain = str(pathlib.Path(td) / f'{train_wl.canonical_name}-{random_id()}-fifo') + os.mkfifo(pipetrain) + train_wl.env['SALUS_WAIT_FOR_SIGNAL'] = pipetrain + return train_wl, pipetrain + return train_wl + + +def create_infer(executor, bs, td=None): + wl = WTL.create('vgg11eval', bs, 300, executor=executor) + set_env(wl) + wl.env['SALUS_ITER_SECONDS'] = '150' + wl.extra_args += [ + # '--eval_interval_secs=0.02', + # '--eval_interval_random_factor=5' + ] + + if td is not None: + pipe = str(pathlib.Path(td) / f'{wl.canonical_name}-{random_id()}-fifo') + os.mkfifo(pipe) + wl.env['SALUS_WAIT_FOR_SIGNAL'] = pipe + return wl, pipe + + return wl + + +def alone_tf(_argv): + # run tf + # the foreground inference job + wl = create_infer(Executor.TF, 10) + wl.extra_args += ['--min_mem'] + run_tf(FLAGS.save_dir / "alone", wl) + + # the background training job + wl = create_train(Executor.TF, 0) + wl.extra_args += ['--min_mem'] + run_tf(FLAGS.save_dir / "alone", wl) + + +def alone(argv): + """Run each workload alone for reference""" + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + logger.info(f"Running Salus with sm factors: {sm_factors}") + + # run salus + for factor in sm_factors: + scfg = maybe_forced_preset(presets.MostEfficient) + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + logger.info(f"Running Salus with sm factor: {factor}") + wl = create_infer(Executor.Salus, 10) + run_seq(scfg.copy(output_dir=FLAGS.save_dir / "alone" / f"{factor:.2f}"), wl) + + # the background training job + wl = create_train(Executor.Salus, 0) + run_seq(scfg.copy(output_dir=FLAGS.save_dir / "alone" / f"{factor:.2f}"), wl) + + +def salus(argv): + # type: (Sequence[str]) -> None + base_cfg = maybe_forced_preset(presets.MostEfficient) + + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + for idx, factor in enumerate(sm_factors): + scfg = base_cfg.copy(output_dir=FLAGS.save_dir / "salus" / f"{factor:.2f}") + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + with tempfile.TemporaryDirectory() as td: + # create a background training job + train_wl, pipetrain = create_train(Executor.Salus, 0, td) + + # create the foreground inference job + wl, pipe = create_infer(Executor.Salus, 10, td) + + run_seq(scfg, + train_wl, # start the background job + wl, # start the foreground job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipetrain)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe)), + # start train job + RunFn(lambda *args, **kwargs: release_on_pipe(pipetrain)), + # wait 10 seconds + Pause(10), + # release inference job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def inverse_salus(argv): + # type: (Sequence[str]) -> None + """Inversed priority for training and inference""" + base_cfg = maybe_forced_preset(presets.MostEfficient) + + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + for idx, factor in enumerate(sm_factors): + scfg = base_cfg.copy(output_dir=FLAGS.save_dir / "inverse" / f"{factor:.2f}") + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + with tempfile.TemporaryDirectory() as td: + # create a background training job + train_wl, pipetrain = create_train(Executor.Salus, 0, td) + + # create the foreground inference job + wl, pipe = create_infer(Executor.Salus, 10, td) + wl.extra_args += [ + '--eval_sched_priority', '40' + ] + + run_seq(scfg, + train_wl, # start the background job + wl, # start the foreground job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipetrain)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe)), + # start train job + RunFn(lambda *args, **kwargs: release_on_pipe(pipetrain)), + # wait 10 seconds + Pause(10), + # release inference job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def same_pri_salus(argv): + # type: (Sequence[str]) -> None + """Inversed priority for training and inference""" + base_cfg = maybe_forced_preset(presets.MostEfficient) + + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + for idx, factor in enumerate(sm_factors): + scfg = base_cfg.copy(output_dir=FLAGS.save_dir / "same_pri" / f"{factor:.2f}") + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + with tempfile.TemporaryDirectory() as td: + # create a background training job + train_wl, pipetrain = create_train(Executor.Salus, 0, td) + + # create the foreground inference job + wl, pipe = create_infer(Executor.Salus, 10, td) + wl.extra_args += [ + '--eval_sched_priority', '20' + ] + + run_seq(scfg, + train_wl, # start the background job + wl, # start the foreground job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipetrain)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe)), + # start train job + RunFn(lambda *args, **kwargs: release_on_pipe(pipetrain)), + # wait 10 seconds + Pause(10), + # release inference job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def tfmps(argv): + # type: (Sequence[str]) -> None + batch_sizes = [int(v) for v in argv[1:]] + + if not batch_sizes: + batch_sizes = [1, 2, 4, 8] + + for idx, bs in enumerate(batch_sizes): + with tempfile.TemporaryDirectory() as td: + # create a background training job + train_wl, pipetrain = create_train(Executor.TF, idx, td) + train_wl.extra_args += ['--min_mem'] + + # create the foreground inference job + wl, pipe = create_infer(Executor.TF, bs, td) + wl.extra_args += ['--min_mem'] + + run_tf(FLAGS.save_dir / "tfmps", + train_wl, # start the background job + wl, # start the foreground job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipetrain)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe)), + # start train job + RunFn(lambda *args, **kwargs: release_on_pipe(pipetrain)), + # wait 10 seconds + Pause(10), + # release inference job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def twoinfer_tfmps(argv): + # type: (Sequence[str]) -> None + batch_sizes = [int(v) for v in argv] + + if not batch_sizes: + batch_sizes = [1, 2, 4, 8] + + for idx, bs in enumerate(batch_sizes): + with tempfile.TemporaryDirectory() as td: + # create the foreground inference job + wl1, pipe1 = create_infer(Executor.TF, bs, td) + wl1.extra_args += ['--min_mem'] + # create the foreground inference job + wl2, pipe2 = create_infer(Executor.TF, bs, td) + wl2.extra_args += ['--min_mem'] + + run_tf(FLAGS.save_dir / "twoinfer" / "tfmps", + wl1, # start the background job + wl2, # start the foreground job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe1)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe2)), + # start train job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe1)), + # release inference job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe2)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def twoinfer(argv): + # type: (Sequence[str]) -> None + base_cfg = maybe_forced_preset(presets.MostEfficient) + + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + for idx, factor in enumerate(sm_factors): + scfg = base_cfg.copy(output_dir=FLAGS.save_dir / "twoinfer" / "salus" / f"{factor:.2f}") + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + with tempfile.TemporaryDirectory() as td: + # create the foreground inference job + wl1, pipe1 = create_infer(Executor.Salus, 10, td) + + # create the foreground inference job + wl2, pipe2 = create_infer(Executor.Salus, 10, td) + + run_seq(scfg, + wl1, # start the first job + wl2, # start the second job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe1)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe2)), + # start 1st job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe1)), + # release 2nd job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe2)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +def twoinfer_pri(argv): + # type: (Sequence[str]) -> None + """Two inferences with difference priority""" + base_cfg = maybe_forced_preset(presets.MostEfficient) + + sm_factors = [float(v) for v in argv] + if not sm_factors: + sm_factors = [1.0, 1.5, 2.0, 2.5, 3.0] + + for idx, factor in enumerate(sm_factors): + scfg = base_cfg.copy(output_dir=FLAGS.save_dir / "twoinfer_pri" / "salus" / f"{factor:.2f}") + scfg.extra_args += [ + '--sm-factor', f'{factor:.2f}' + ] + with tempfile.TemporaryDirectory() as td: + # create the foreground inference job + wl1, pipe1 = create_infer(Executor.Salus, 10, td) + + # create the background inference job + wl2, pipe2 = create_infer(Executor.Salus, 10, td) + wl2.extra_args += [ + '--eval_sched_priority', '20' + ] + + run_seq(scfg, + wl1, # start the first job + wl2, # start the second job + # wait for both jobs to be ready + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe1)), + RunFn(lambda *args, **kwargs: wait_on_pipe(pipe2)), + # start 1st job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe1)), + # release 2nd job + RunFn(lambda *args, **kwargs: release_on_pipe(pipe2)), + # run_seq automatically join all jobs at the end of the sequence + ) + + +@case_switch_main +def main(): + return alone, salus, tfmps, twoinfer, twoinfer_tfmps, inverse_salus, same_pri_salus, twoinfer_pri diff --git a/scripts/parse_card250.py b/scripts/parse_card250.py index 3b33080..f347c47 100644 --- a/scripts/parse_card250.py +++ b/scripts/parse_card250.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -127,12 +127,11 @@ def plot_speeds(df, total_kws=None, **kwargs): ax.legend().remove() ax.set_xlabel('Time (s)') - ax.set_ylabel('Images per second') + ax.set_ylabel('Images\nper second') return ax -path = 'logs/nsdi19' -def prepare_paper(path): +def prepare_paper(path='logs/nsdi19'): path = Path(path) df = load_speeds(path/'card250'/'case1') @@ -166,6 +165,6 @@ def prepare_paper(path): total_kws={'marker': 'None', 'zorder': -1, 'linewidth': 1}) fig.tight_layout() - fig.set_size_inches(3.25, 2.35, forward=True) - fig.savefig('/tmp/workspace/card250.pdf', dpi=300) + fig.set_size_inches(3.25, 1.5, forward=True) + fig.savefig('/tmp/workspace/card250.pdf', dpi=300, bbox_inches='tight') plt.close() diff --git a/scripts/parse_card260.py b/scripts/parse_card260.py index 317f0bd..a466f66 100644 --- a/scripts/parse_card260.py +++ b/scripts/parse_card260.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -112,14 +112,14 @@ def plot_ratio(df, **kwargs): try: path except NameError: - path = '/tmp/workspace' + path = 'logs/nsdi19' -def prepare_paper(path): +def prepare_paper(path='logs/nsdi19'): with plt.style.context(['seaborn-paper', 'mypaper', 'gray']): df = load_data(path) fig, ax = plt.subplots() - fig.set_size_inches(3.25, 1.5, forward=True) + fig.set_size_inches(3.25, 1.2, forward=True) #plot_eval_pit_vs_speed(df, ax=ax) #ax.set_xlabel('Time (s)') @@ -132,4 +132,4 @@ def prepare_paper(path): fig.tight_layout() fig.savefig('/tmp/workspace/card260.pdf', dpi=300, bbox_inches='tight', pad_inches = .015) - plt.close() \ No newline at end of file + plt.close() diff --git a/scripts/parse_card271.py b/scripts/parse_card271.py index 03999be..1764e8b 100644 --- a/scripts/parse_card271.py +++ b/scripts/parse_card271.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -129,15 +129,14 @@ def plot_makespan(df, **kwargs): return ax -path = 'logs/nsdi19' -def prepare_paper(path): +def prepare_paper(path='logs/nsdi19'): path = Path(path) with plt.style.context(['seaborn-paper', 'mypaper', 'color3']): # fifo = ju.load_trace(path/'card266'/'salus'/'trace.csv') df = load_data(path/'card271') fig, ax = plt.subplots() - fig.set_size_inches(3.25, 1.85, forward=True) + fig.set_size_inches(3.25, 1.3, forward=True) # set col order df = df[['Network', 'Salus', 'TF']] @@ -150,4 +149,4 @@ def prepare_paper(path): fig.tight_layout() fig.savefig('/tmp/workspace/card271.pdf', dpi=300) plt.close() - return df \ No newline at end of file + return df diff --git a/scripts/parse_card272.py b/scripts/parse_card272.py index 523b9e0..1ded432 100644 --- a/scripts/parse_card272.py +++ b/scripts/parse_card272.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -116,7 +116,7 @@ def do_timelines(path): path = 'logs/nsdi19' -def prepare_paper(path): +def prepare_paper(path='logs/nsdi19'): path = Path(path) with plt.style.context(['seaborn-paper', 'mypaper', 'line12']): # also use color @@ -131,7 +131,7 @@ def prepare_paper(path): pack = load_data(path/'card272'/'case1'/'salus', 'case1.output') fig, ax = plt.subplots() - fig.set_size_inches(3.25, 1.85, forward=True) + fig.set_size_inches(3.25, 1.5, forward=True) jcts = pd.DataFrame({'FIFO': fifo.JCT, 'SRTF': srtf.JCT, 'PACK': pack.JCT, 'FAIR': fair.JCT}) plot_jcts(jcts, ax=ax, markevery=0.1, markersize=4, linewidth=1) @@ -141,4 +141,4 @@ def prepare_paper(path): fig.savefig('/tmp/workspace/card272-jct.pdf', dpi=300) plt.close() - return fifo, srtf, srtf_refine, fair, pack \ No newline at end of file + return fifo, srtf, srtf_refine, fair, pack diff --git a/scripts/parse_card274.py b/scripts/parse_card274.py index fa6584c..4b2fb60 100644 --- a/scripts/parse_card274.py +++ b/scripts/parse_card274.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,6 +35,7 @@ #import seaborn as sns import numpy as np import matplotlib.pyplot as plt +import matplotlib.patches as mpatches from matplotlib import cycler import plotutils as pu @@ -94,11 +95,17 @@ def do_srtf2(path): plot_offset=-st_sec ) ax.set_xlim([0, ed_sec-st_sec]) + + # add a legend + ax.legend(handles=[ + mpatches.Patch(color='#b6b6b6', label='Queuing'), + mpatches.Patch(color='black', label='Active') + ], bbox_to_anchor=(0.85, 0.03), loc='lower right') ax.set_ylabel('Job #') ax.yaxis.set_ticks([0, 1, 2, 3, 4, 5]) - fig.set_size_inches(4.875, 2, forward=True) + fig.set_size_inches(4.875, 1.5, forward=True) fig.savefig('/tmp/workspace/card274-srtf-compute.pdf', dpi=300, bbox_inches='tight', pad_inches = .015) plt.close() @@ -146,7 +153,7 @@ def do_srtf3(path): ax.legend().remove() #fig.tight_layout() - fig.set_size_inches(1.625, 2, forward=True) + fig.set_size_inches(1.625, 1.5, forward=True) fig.savefig('/tmp/workspace/card274-srtf-mem.pdf', dpi=300, bbox_inches='tight', pad_inches = .015) plt.close() diff --git a/scripts/parse_exp17.py b/scripts/parse_exp17.py index e050424..1b4d4bb 100644 --- a/scripts/parse_exp17.py +++ b/scripts/parse_exp17.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -73,20 +73,25 @@ def load_exp17(path): pits = pits.drop(['index', 'BatchSize', 'Network'], axis=1) pits = pits.rename(columns={'Model': 'Network'}).set_index('Network') +old_vae = pits.at['vae', 'Salus'] +pits.loc['vae', 'Salus'] = 1.2 +old_superres = pits.at['superres', 'Salus'] +pits.loc['superres', 'Salus'] = 1.2 + with plt.style.context(['seaborn-paper', 'mypaper', 'color3']): ax = pits.plot.bar(legend=None) pu.axhlines(1.0, ax=ax, color='k', linestyle='--', linewidth=1) - pu.bar_show_data(ax, pits.index.get_loc('superres'), pits.at['superres', 'Salus']) - pu.bar_show_data(ax, pits.index.get_loc('vae'), pits.at['vae', 'Salus']) + pu.bar_show_data(ax, pits.index.get_loc('superres'), 1.15, data_y=old_superres, fmt='{:.2f}') + pu.bar_show_data(ax, pits.index.get_loc('vae'), 1.13, data_y=old_vae, fmt='{:.2f}') - ax.set_ylim(0.9, 1.9) + ax.set_ylim(0.9, 1.15) ax.set_xlabel('Workloads') - ax.set_ylabel('Normalized Per Iteration\nTraining Time') + ax.set_ylabel('Normalized\nPer Iteration\nTraining Time') # ax.legend() ax.tick_params(axis='x', labelsize=7) - ax.figure.set_size_inches(3.25, 2.35, forward=True) + ax.figure.set_size_inches(3.25, 1.8, forward=True) ax.figure.tight_layout() - ax.figure.savefig('/tmp/workspace/exp17.pdf', dpi=300, bbox_inches='tight', pad_inches = .015) + ax.figure.savefig('/tmp/workspace/exp17.pdf', dpi=300, bbox_inches='tight', pad_inches=.015) plt.close() diff --git a/scripts/plotutils.py b/scripts/plotutils.py index cb4c45c..41da8e8 100644 --- a/scripts/plotutils.py +++ b/scripts/plotutils.py @@ -1,15 +1,15 @@ # # Copyright 2019 Peifeng Yu -# +# # This file is part of Salus # (see https://github.com/SymbioticLab/Salus). -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -435,7 +435,7 @@ def bar(df, width=0.8, ax=None, **kwargs): return ax -def bar_show_data(ax, x, y, fmt='{:.1f}', **kwargs): +def bar_show_data(ax, x, y, data_y=None, fmt='{:.1f}', **kwargs): kws = { 'xytext': [0, 7], 'textcoords': 'offset points', @@ -443,7 +443,9 @@ def bar_show_data(ax, x, y, fmt='{:.1f}', **kwargs): 'horizontalalignment': 'center', 'verticalalignment': 'top' } - ax.annotate(fmt.format(y), + if data_y is None: + data_y = y + ax.annotate(fmt.format(data_y), xy=[x, y], **{**kws, **kwargs}) diff --git a/src/config.h.in b/src/config.h.in index 16f5518..60bca31 100644 --- a/src/config.h.in +++ b/src/config.h.in @@ -12,6 +12,7 @@ #cmakedefine SALUS_ENABLE_EXCLUSIVE_ITER #cmakedefine SALUS_ENABLE_TIMEOUT_WARNING #cmakedefine SALUS_ENABLE_JSON_LOG +#cmakedefine SALUS_ENABLE_TENSORFLOW #define SALUS_BUILD_TYPE "@CMAKE_BUILD_TYPE@" diff --git a/src/main.cpp b/src/main.cpp index 03c966e..ab67d14 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,13 +17,18 @@ * limitations under the License. */ +#include "utils/macros.h" + +#ifdef SALUS_ENABLE_TENSORFLOW +#include "oplibraries/tensorflow/v3/smblocker.h" +#endif + #include "execution/executionengine.h" #include "resources/resources.h" #include "platform/logging.h" #include "platform/signals.h" #include "platform/profiler.h" #include "rpcserver/zmqserver.h" -#include "utils/macros.h" #include "utils/envutils.h" #include @@ -45,6 +50,7 @@ const static auto listen = "--listen"; const static auto maxHolWaiting = "--max-hol-waiting"; const static auto disableFairness = "--disable-fairness"; const static auto disableWorkConservative = "--disable-wc"; +const static auto smFactor = "--sm-factor"; const static auto scheduler = "--sched"; const static auto logConf = "--logconf"; @@ -77,6 +83,7 @@ Salus: Fine-Grained GPU Sharing for DNN. fairness is on. --max-hol-waiting= Maximum number of task allowed go before queue head in scheduling. [default: 50] + --sm-factor= Scale factor for # of SMs. [default: 1] -c , --logconf= Path to log configuration file. Note that settings in this file takes precedence over other command line arguments. @@ -214,6 +221,17 @@ void configureExecution(std::map &args) salus::ExecutionEngine::instance().setSchedulingParam({maxQueueHeadWaiting, !disableWorkConservative, sched}); } +void configureSMBlocker(std::map &args) +{ +#ifdef SALUS_ENABLE_TENSORFLOW + // docopt doesn't handle double number + // so we get as string and do conversion ourselves + auto scale = std::atof(value_or(args[flags::smFactor], "1.0"s).c_str()); + + salus::oplib::tensorflow::SMBlocker::setScaleFactorSM(scale); +#endif +} + void printConfiguration(std::map &) { LOG(INFO) << "Running build type: " << SALUS_BUILD_TYPE; @@ -237,6 +255,11 @@ void printConfiguration(std::map &) LOG(INFO) << " Policy: " << param.scheduler; LOG(INFO) << " MaxQueueHeadWaiting: " << param.maxHolWaiting; LOG(INFO) << " WorkConservative: " << (param.workConservative ? "on" : "off"); + +#ifdef SALUS_ENABLE_TENSORFLOW + LOG(INFO) << "GPU execution:"; + LOG(INFO) << " SM scale factor: " << salus::oplib::tensorflow::SMBlocker::scaleFactorSM(); +#endif } int main(int argc, char **argv) @@ -250,6 +273,8 @@ int main(int argc, char **argv) configureExecution(args); + configureSMBlocker(args); + printConfiguration(args); ScopedProfiling sp(value_or(args[flags::gperf], false)); diff --git a/src/oplibraries/tensorflow/v3/smblocker.cpp b/src/oplibraries/tensorflow/v3/smblocker.cpp index 7114701..f2eaf55 100644 --- a/src/oplibraries/tensorflow/v3/smblocker.cpp +++ b/src/oplibraries/tensorflow/v3/smblocker.cpp @@ -78,9 +78,11 @@ void salus_kernel_launch_callback(unsigned int gridDimX, unsigned int gridDimY, namespace salus::oplib::tensorflow { +double SMBlocker::m_scaleFactorSM = 0.0; + SMBlocker &SMBlocker::instance() { - static SMBlocker blocker; + static SMBlocker blocker(scaleFactorSM()); return blocker; } @@ -95,9 +97,9 @@ SMUsage SMBlocker::queryAvailableSM() }; } -SMBlocker::SMBlocker() - : m_maxUsage{queryAvailableSM()} - , m_freeBlocks(m_maxUsage.blockCount) +SMBlocker::SMBlocker(double factor) + : m_maxUsage{queryAvailableSM(), factor} + , m_freeBlocks(m_maxUsage.get().blockCount) { } @@ -159,7 +161,7 @@ void SMBlocker::wait(uint64_t graphId, int nodeId, int priority) LogSMTracing() << "Wait at SMBlocker: graph " << graphId << " node " << nodeId << " sm " << smUsage << " priority " << priority; m_freeBlocks.wait(smUsage, priority); - LogSMTracing() << "Passed at SMBlocker: graph " << graphId << " node " << nodeId + LogSMTracing() << "Took at SMBlocker: graph " << graphId << " node " << nodeId << " sm " << smUsage << " priority " << priority; } @@ -169,11 +171,13 @@ uint64_t SMBlocker::getUsageForKernel(uint64_t graphId, int nodeId) auto usage = sstl::getOrDefault(m_cache, {graphId, nodeId}, {}); - return std::min(usage.blockCount, m_maxUsage.blockCount); + return std::min(usage.blockCount, m_maxUsage.get().blockCount); } void SMBlocker::release(uint64_t numSms) { + LogSMTracing() << "Release at SMBlocker: graph " << 0 << " node " << 0 + << " sm " << numSms << " priority " << 0; m_freeBlocks.post(numSms); } diff --git a/src/oplibraries/tensorflow/v3/smblocker.h b/src/oplibraries/tensorflow/v3/smblocker.h index 89a54fb..714a871 100644 --- a/src/oplibraries/tensorflow/v3/smblocker.h +++ b/src/oplibraries/tensorflow/v3/smblocker.h @@ -57,6 +57,17 @@ class SMBlocker public: static SMBlocker &instance(); + static void setScaleFactorSM(double factor) + { + m_scaleFactorSM = factor; + } + + static double scaleFactorSM() + { + CHECK_NE(m_scaleFactorSM, 0.0) << "Must call SMBlocker::setScaleFactorSM before getting value"; + return m_scaleFactorSM; + } + /** * @brief Release this amount of numSms */ @@ -95,13 +106,38 @@ class SMBlocker static constexpr int MaxPriority = 100; private: + static double m_scaleFactorSM; static SMUsage queryAvailableSM(); - SMBlocker(); + explicit SMBlocker(double factor); uint64_t getUsageForKernel(uint64_t graphId, int nodeId); - const SMUsage m_maxUsage; + class MaxSMUsage + { + SMUsage usage; + double scale; + public: + explicit MaxSMUsage(SMUsage u, double scale = 1.0) + : usage(u) + , scale(scale) + {} + + SMUsage get() const { + return {usage.threadPerBlock, static_cast(usage.blockCount * scale)}; + } + double getScale() const { + return scale; + } + void set(SMUsage u) { + usage = u; + } + void setScale(double s) { + scale = s; + } + }; + + MaxSMUsage m_maxUsage; sstl::priority_semaphore m_freeBlocks; diff --git a/tests/test_tf/lib/seq2seq/ptb/ptb_word_lm.py b/tests/test_tf/lib/seq2seq/ptb/ptb_word_lm.py index bfe1ed6..6f9eb46 100644 --- a/tests/test_tf/lib/seq2seq/ptb/ptb_word_lm.py +++ b/tests/test_tf/lib/seq2seq/ptb/ptb_word_lm.py @@ -206,7 +206,7 @@ def run_epoch(self, session, eval_op=None, verbose=False): state = session.run(self.initial_state) eval_interval = os.environ.get('SALUS_TFBENCH_EVAL_INTERVAL', '0.1') - eval_rand_factor = os.environ.get('SALUS_TFBENCH_EVAL_RAND_FACTOR', '5') + eval_rand_factor = os.environ.get('SALUS_TFBENCH_EVAL_RAND_FACTOR', None) eval_block = os.environ.get('SALUS_TFBENCH_EVAL_BLOCK', 'true') if eval_block != 'true': @@ -242,10 +242,11 @@ def run_epoch(self, session, eval_op=None, verbose=False): print(fmt_str.format(datetime.now(), step, np.exp(costs / iters), local_speed, dur)) if self._train_op is None: - factor = 1 - if eval_rand_factor != "1": - factor = random.randint(1, int(eval_rand_factor)) - time.sleep(float(eval_interval) * factor) + if float(eval_interval) > 0: + factor = 1 + if eval_rand_factor is not None: + factor = random.randint(1, int(eval_rand_factor)) + time.sleep(float(eval_interval) * factor) return np.exp(costs / iters), speeds diff --git a/tests/test_tf/test_mnist_tf.py b/tests/test_tf/test_mnist_tf.py index bcdd15b..21b5496 100644 --- a/tests/test_tf/test_mnist_tf.py +++ b/tests/test_tf/test_mnist_tf.py @@ -16,6 +16,8 @@ def run_mnist_softmax(sess, batch_size=50): + batch_size = tfhelper.batch_size_from_env(batch_size) + print('Using batch_size {}'.format(batch_size)) x_image, y_, num_classes = fake_data(batch_size, None, height=28, width=28, depth=1, num_classes=10) y_ = tf.one_hot(y_, num_classes) x = tf.reshape(x_image, [-1, 784]) @@ -73,6 +75,8 @@ def conv2d(x, W): def max_pool_2x2(x): return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') + batch_size = tfhelper.batch_size_from_env(batch_size) + print('Using batch_size {}'.format(batch_size)) x_image, y_, num_classes = fake_data(batch_size, None, height=28, width=28, depth=1, num_classes=10) y_ = tf.one_hot(y_, num_classes) keep_prob = tf.placeholder(tf.float32) @@ -150,6 +154,8 @@ def conv2d(x, W): def max_pool_2x2(x): return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') + batch_size = tfhelper.batch_size_from_env(batch_size) + print('Using batch_size {}'.format(batch_size)) x_image, y_, num_classes = fake_data(batch_size, None, height=28, width=28, depth=1, num_classes=10) y_ = tf.one_hot(y_, num_classes) keep_prob = tf.placeholder(tf.float32) diff --git a/tests/test_tf/test_seq.py b/tests/test_tf/test_seq.py index 64edbbb..971d558 100644 --- a/tests/test_tf/test_seq.py +++ b/tests/test_tf/test_seq.py @@ -19,10 +19,12 @@ def run_seq_ptb(sess, config_name): eval_config = get_config(config_name) config.max_max_epoch = 1 config.max_epoch = 1 + config.batch_size = tfhelper.batch_size_from_env(config.batch_size) + print("Using batch size {}".format(config.batch_size)) eval_config.max_max_epoch = 1 eval_config.max_epoch = 1 - eval_config.batch_size = 1 + eval_config.batch_size = config.batch_size eval_config.num_steps = 1 train_input, valid_input, test_input = datasets.ptb_data(config, eval_config) @@ -65,10 +67,12 @@ def test_seq_ptb(sess, config_name): eval_config = get_config(config_name) config.max_max_epoch = 1 config.max_epoch = 1 + config.batch_size = tfhelper.batch_size_from_env(config.batch_size) + print("Using batch size {}".format(config.batch_size)) eval_config.max_max_epoch = 1 eval_config.max_epoch = 1 - eval_config.batch_size = 1 + eval_config.batch_size = config.batch_size eval_config.num_steps = 1 train_input, valid_input, test_input = datasets.ptb_data(config, eval_config) diff --git a/tests/test_tf/test_super_res.py b/tests/test_tf/test_super_res.py index fff5fa2..7dfd669 100644 --- a/tests/test_tf/test_super_res.py +++ b/tests/test_tf/test_super_res.py @@ -20,6 +20,7 @@ def run_superres(sess, input_data, batch_size=100, isEval=False): batch_size = tfhelper.batch_size_from_env(batch_size) + print("{}: Using batch size {}".format(datetime.now(), batch_size)) input_images, target_images = input_data(batch_size=batch_size)