Skip to content

Commit

Permalink
core - refactor outputs add policy api/processs stats and tracing (cl…
Browse files Browse the repository at this point in the history
  • Loading branch information
kapilt authored Oct 4, 2018
1 parent 0beaa59 commit 48812eb
Show file tree
Hide file tree
Showing 47 changed files with 1,278 additions and 609 deletions.
27 changes: 2 additions & 25 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,41 +1,18 @@

install:
python3.6 -m virtualenv --python python3.6 .
python3 -m venv .
. bin/activate && pip install -r requirements-dev.txt
. bin/activate && pip install -e .
. bin/activate && pip install -r tools/c7n_mailer/requirements.txt
. bin/activate && pip install -r tools/c7n_azure/requirements.txt
. bin/activate && pip install -r tools/c7n_gcp/requirements.txt
. bin/activate && pip install -r tools/c7n_kube/requirements.txt

coverage:
rm -Rf .coverage
AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar C7N_VALIDATE=true nosetests -s -v --with-coverage --cover-html --cover-package=c7n --cover-html-dir=coverage --processes=-1 --cover-inclusive tests --process-timeout=64

test:
./bin/tox -e py27

test3:
./bin/tox -e py36

nose-tests:
AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar C7N_VALIDATE=true nosetests -s -v --processes=-1 --process-timeout=300 tests


azure-tests:

C7N_VALIDATE=true AZURE_ACCESS_TOKEN=fake_token AZURE_SUBSCRIPTION_ID=ea42f556-5106-4743-99b0-c129bfa71a47 ./bin/py.test --tb=native tools/c7n_azure

ttest:
AWS_DEFAULT_REGION=us-east-1 nosetests -s --with-timer --process-timeout=300 tests

depcache:
mkdir -p deps
python -m virtualenv --python python2.7 dep-download
dep-download/bin/pip install -d deps -r requirements.txt
tar cvf custodian-deps.tgz deps
rm -Rf dep-download
rm -Rf deps
./bin/tox -e py37

ftest:
C7N_FUNCTIONAL=yes AWS_DEFAULT_REGION=us-east-2 ./bin/py.test -m functional tests
Expand Down
22 changes: 20 additions & 2 deletions c7n/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,25 @@

log = logging.getLogger('custodian.cache')

CACHE_NOTIFY = False


def factory(config):

global CACHE_NOTIFY

if not config:
return NullCache(None)

if not config.cache or not config.cache_period:
log.debug("Disabling cache")
if not CACHE_NOTIFY:
log.debug("Disabling cache")
CACHE_NOTIFY = True
return NullCache(config)
elif config.cache == 'memory':
log.debug("Using in-memory cache")
if not CACHE_NOTIFY:
log.debug("Using in-memory cache")
CACHE_NOTIFY = True
return InMemoryCache()

return FileCacheManager(config)
Expand All @@ -53,6 +62,9 @@ def get(self, key):
def save(self, key, data):
pass

def size(self):
return 0


class InMemoryCache(object):
# Running in a temporary environment, so keep as a cache.
Expand All @@ -71,6 +83,9 @@ def get(self, key):
def save(self, key, data):
self.data[pickle.dumps(key)] = data

def size(self):
return sum(map(len, self.data.values()))


class FileCacheManager(object):

Expand Down Expand Up @@ -118,3 +133,6 @@ def save(self, key, data):
except Exception as e:
log.warning("Could not create directory: %s err: %s" % (
directory, e))

def size(self):
return os.path.exists(self.cache_path) and os.path.getsize(self.cache_path) or 0
6 changes: 6 additions & 0 deletions c7n/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ def setup_parser():
"-m", "--metrics-enabled",
default=None, nargs="?", const="aws",
help="Emit metrics to provider metrics")
run.add_argument(
"--trace",
dest="tracer",
help=argparse.SUPPRESS,
default=None, nargs="?", const="default")

return parser

Expand Down Expand Up @@ -326,6 +331,7 @@ def _setup_logger(options):
logging.getLogger('botocore').setLevel(external_log_level)
logging.getLogger('urllib3').setLevel(external_log_level)
logging.getLogger('s3transfer').setLevel(external_log_level)
logging.getLogger('urllib3').setLevel(logging.ERROR)


def main():
Expand Down
2 changes: 2 additions & 0 deletions c7n/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ def logs(options, policies):
sys.exit(1)

policy = policies.pop()
# initialize policy execution context for access to outputs
policy.ctx.initialize()

for e in policy.get_logs(options.start, options.end):
print("%s: %s" % (
Expand Down
1 change: 1 addition & 0 deletions c7n/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def empty(cls, **kw):
'assume_role': None,
'external_id': None,
'log_group': None,
'tracer': 'default',
'metrics_enabled': False,
'output_dir': '',
'cache_period': 0,
Expand Down
8 changes: 8 additions & 0 deletions c7n/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, region, profile=None, assume_role=None, external_id=None):
if 'C7N_SESSION_SUFFIX' in os.environ:
self.session_name = "%s@%s" % (
self.session_name, os.environ['C7N_SESSION_SUFFIX'])
self._subscribers = []

def _set_policy_name(self, name):
self.user_agent_name = ("CloudCustodian(%s)" % name).strip()
Expand All @@ -56,8 +57,15 @@ def __call__(self, assume=True, region=None):

session._session.user_agent_name = self.user_agent_name
session._session.user_agent_version = version

for s in self._subscribers:
s(session)

return session

def set_subscribers(self, subscribers):
self._subscribers = subscribers


def assumed_session(role_arn, session_name, session=None, region=None, external_id=None):
"""STS Role assume a boto3.Session
Expand Down
121 changes: 88 additions & 33 deletions c7n/ctx.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2015-2017 Capital One Services, LLC
# Copyright 2015-2018 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,10 +14,20 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import time
import uuid
import os

from c7n.output import FSOutput, MetricsOutput, CloudWatchLogOutput
from c7n.utils import reset_session_cache

from c7n.output import (
api_stats_outputs,
blob_outputs,
log_outputs,
metrics_outputs,
sys_stats_outputs,
tracer_outputs)

from c7n.utils import reset_session_cache, dumps
from c7n.version import version


class ExecutionContext(object):
Expand All @@ -27,47 +37,92 @@ def __init__(self, session_factory, policy, options):
self.policy = policy
self.options = options
self.session_factory = session_factory
self.cloudwatch_logs = None

# Runtime initialized during policy execution
# We treat policies as a fly weight pre-execution.
self.start_time = None
self.execution_id = None
self.output = None
self.api_stats = None
self.sys_stats = None

# A few tests patch on metrics flush
self.metrics = metrics_outputs.select(self.options.metrics_enabled, self)

metrics_enabled = getattr(options, 'metrics_enabled', None)
factory = MetricsOutput.select(metrics_enabled)
self.metrics = factory(self)
# Tracer is wired into core filtering code / which is getting
# invoked sans execution context entry in tests
self.tracer = tracer_outputs.select(self.options.tracer, self)

output_dir = getattr(options, 'output_dir', '')
if output_dir:
factory = FSOutput.select(output_dir)
self.output_path = factory.join(output_dir, policy.name)
self.output = factory(self)
else:
self.output_path = self.output = None
def initialize(self):
self.output = blob_outputs.select(self.options.output_dir, self)
self.logs = log_outputs.select(self.options.log_group, self)

if options.log_group:
self.cloudwatch_logs = CloudWatchLogOutput(self)
# Look for customizations, but fallback to default
for api_stats_type in (self.policy.provider_name, 'default'):
if api_stats_type in api_stats_outputs:
self.api_stats = api_stats_outputs.select(api_stats_type, self)
break
for sys_stats_type in ('psutil', 'default'):
if sys_stats_type in sys_stats_outputs:
self.sys_stats = sys_stats_outputs.select(sys_stats_type, self)
break

self.start_time = time.time()
self.execution_id = str(uuid.uuid4())

@property
def log_dir(self):
if self.output:
return self.output.root_dir
return self.output.root_dir

def __enter__(self):
if self.output:
self.output.__enter__()
if self.cloudwatch_logs:
self.cloudwatch_logs.__enter__()
self.start_time = time.time()
self.initialize()
self.session_factory.policy_name = self.policy.name
self.sys_stats.__enter__()
self.output.__enter__()
self.logs.__enter__()
self.api_stats.__enter__()
self.tracer.__enter__()
return self

def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
self.metrics.flush()
# Clear policy execution thread local session cache if running in tests.
# IMPORTANT: multi-account execution (c7n-org and others) need to manually reset this.
# Why: Not doing this means we get excessive memory usage from client
# reconstruction.
if exc_type is not None and self.metrics:
self.metrics.put_metric('PolicyException', 1, "Count")
self.policy._write_file(
'metadata.json', dumps(self.get_metadata(), indent=2))
self.api_stats.__exit__(exc_type, exc_value, exc_traceback)

with self.tracer.subsegment('output'):
self.metrics.flush()
self.logs.__exit__(exc_type, exc_value, exc_traceback)
self.output.__exit__(exc_type, exc_value, exc_traceback)

self.tracer.__exit__()

self.session_factory.policy_name = None
# IMPORTANT: multi-account execution (c7n-org and others) need
# to manually reset this. Why: Not doing this means we get
# excessive memory usage from client reconstruction for dynamic-gen
# sdks.
if os.environ.get('C7N_TEST_RUN'):
reset_session_cache()
if self.cloudwatch_logs:
self.cloudwatch_logs.__exit__(exc_type, exc_value, exc_traceback)
self.cloudwatch_logs = None
if self.output:
self.output.__exit__(exc_type, exc_value, exc_traceback)

def get_metadata(self, include=('sys-stats', 'api-stats', 'metrics')):
t = time.time()
md = {
'policy': self.policy.data,
'version': version,
'execution': {
'id': self.execution_id,
'start': self.start_time,
'end_time': t,
'duration': t - self.start_time},
'config': dict(self.options)
}

if 'sys-stats' in include and self.sys_stats:
md['sys-stats'] = self.sys_stats.get_metadata()
if 'api-stats' in include and self.api_stats:
md['api-stats'] = self.api_stats.get_metadata()
if 'metrics' in include and self.metrics:
md['metrics'] = self.metrics.get_metadata()
return md
4 changes: 4 additions & 0 deletions c7n/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class CustodianError(Exception):
"""


class InvalidOutputConfig(CustodianError):
"""Invalid configuration for an output"""


class PolicySyntaxError(CustodianError):
"""Policy Syntax Error
"""
Expand Down
4 changes: 3 additions & 1 deletion c7n/filters/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class MetricsFilter(Filter):
of calculated statistics as in the case of a stopped ec2 instance,
nor for resources to new to have existed the entire
period. ie. being stopped for an ec2 instance wouldn't lower the
average cpu utilization, nor would
average cpu utilization.
Note the default statistic for metrics is Average.
"""

schema = type_schema(
Expand Down
14 changes: 7 additions & 7 deletions c7n/filters/offhours.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@
import logging
from os.path import join

from dateutil import zoneinfo
from dateutil import zoneinfo, tz as tzutil

from c7n.exceptions import PolicyValidationError
from c7n.filters import Filter
Expand Down Expand Up @@ -351,16 +351,16 @@ def validate(self):

def process(self, resources, event=None):
resources = super(Time, self).process(resources)
if self.parse_errors and self.manager and self.manager.log_dir:
if self.parse_errors and self.manager and self.manager.ctx.log_dir:
self.log.warning("parse errors %d", len(self.parse_errors))
with open(join(
self.manager.log_dir, 'parse_errors.json'), 'w') as fh:
self.manager.ctx.log_dir, 'parse_errors.json'), 'w') as fh:
dumps(self.parse_errors, fh=fh)
self.parse_errors = []
if self.opted_out and self.manager and self.manager.log_dir:
if self.opted_out and self.manager and self.manager.ctx.log_dir:
self.log.debug("disabled count %d", len(self.opted_out))
with open(join(
self.manager.log_dir, 'opted_out.json'), 'w') as fh:
self.manager.ctx.log_dir, 'opted_out.json'), 'w') as fh:
dumps(self.opted_out, fh=fh)
self.opted_out = []
return resources
Expand Down Expand Up @@ -466,8 +466,8 @@ def get_tag_value(self, i):
def get_tz(cls, tz):
found = cls.TZ_ALIASES.get(tz)
if found:
return zoneinfo.gettz(found)
return zoneinfo.gettz(tz.title())
return tzutil.gettz(found)
return tzutil.gettz(tz.title())

def get_default_schedule(self):
raise NotImplementedError("use subclass")
Expand Down
Loading

0 comments on commit 48812eb

Please sign in to comment.