From 48812eb98072d2c0b8149f5ada65bb17568f005e Mon Sep 17 00:00:00 2001
From: Kapil Thangavelu
Date: Thu, 4 Oct 2018 14:37:51 -0400
Subject: [PATCH] core - refactor outputs add policy api/process stats and tracing (#2844)

---
 Makefile | 27 +-
 c7n/cache.py | 22 +-
 c7n/cli.py | 6 +
 c7n/commands.py | 2 +
 c7n/config.py | 1 +
 c7n/credentials.py | 8 +
 c7n/ctx.py | 121 +++--
 c7n/exceptions.py | 4 +
 c7n/filters/metrics.py | 4 +-
 c7n/filters/offhours.py | 14 +-
 c7n/manager.py | 22 +-
 c7n/mu.py | 80 ++-
 c7n/output.py | 464 +++++++++++-------
 c7n/policy.py | 8 +-
 c7n/query.py | 10 +-
 c7n/registry.py | 10 +-
 c7n/reports/csvout.py | 4 +-
 c7n/resources/asg.py | 6 +-
 c7n/resources/aws.py | 306 +++++++++++-
 c7n/resources/awslambda.py | 89 ++--
 c7n/resources/dynamodb.py | 88 ++--
 c7n/resources/s3.py | 10 +-
 c7n/tags.py | 12 +-
 c7n/testing.py | 42 +-
 requirements-dev.txt | 2 +
 .../monitoring.PutMetricData_1.json | 16 +
 .../xray.PutTraceSegments_1.json | 18 +
 .../dax.ListTags_1.json | 19 +-
 .../dax.ListTags_1.json | 2 +-
 tests/test_asg.py | 4 +-
 tests/test_aws.py | 70 +++
 tests/test_cache.py | 37 +-
 tests/test_manager.py | 24 +-
 tests/test_metric_tz.py | 46 --
 tests/test_offhours.py | 94 ++--
 tests/test_output.py | 45 +-
 tests/test_policy.py | 24 +-
 tests/test_rds.py | 2 +-
 tests/test_redshift.py | 4 +-
 tests/test_registry.py | 62 +++
 tests/test_s3.py | 5 +-
 tools/c7n_azure/c7n_azure/output.py | 10 +-
 .../c7n_azure/resources/access_control.py | 6 +-
 tools/c7n_azure/tests/test_output.py | 8 +-
 tools/c7n_gcp/c7n_gcp/output.py | 21 +-
 tools/c7n_gcp/tests/test_output_gcp.py | 4 +-
 tools/c7n_org/c7n_org/cli.py | 4 +-
 47 files changed, 1278 insertions(+), 609 deletions(-)
 create mode 100644 tests/data/placebo/output-aws-metrics/monitoring.PutMetricData_1.json
 create mode 100644 tests/data/placebo/output-xray-trace/xray.PutTraceSegments_1.json
 create mode 100644 tests/test_aws.py
 delete mode 100644 tests/test_metric_tz.py
 create mode 100644 tests/test_registry.py

diff --git a/Makefile b/Makefile
index aba15e0b6b3..4478d21a97c 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 install:
-	python3.6 -m virtualenv --python python3.6 .
+	python3 -m venv .
	. bin/activate && pip install -r requirements-dev.txt
	. bin/activate && pip install -e .
	. bin/activate && pip install -r tools/c7n_mailer/requirements.txt
@@ -8,34 +8,11 @@ install:
	. bin/activate && pip install -r tools/c7n_gcp/requirements.txt
	. 
bin/activate && pip install -r tools/c7n_kube/requirements.txt -coverage: - rm -Rf .coverage - AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar C7N_VALIDATE=true nosetests -s -v --with-coverage --cover-html --cover-package=c7n --cover-html-dir=coverage --processes=-1 --cover-inclusive tests --process-timeout=64 - test: ./bin/tox -e py27 test3: - ./bin/tox -e py36 - -nose-tests: - AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar C7N_VALIDATE=true nosetests -s -v --processes=-1 --process-timeout=300 tests - - -azure-tests: - - C7N_VALIDATE=true AZURE_ACCESS_TOKEN=fake_token AZURE_SUBSCRIPTION_ID=ea42f556-5106-4743-99b0-c129bfa71a47 ./bin/py.test --tb=native tools/c7n_azure - -ttest: - AWS_DEFAULT_REGION=us-east-1 nosetests -s --with-timer --process-timeout=300 tests - -depcache: - mkdir -p deps - python -m virtualenv --python python2.7 dep-download - dep-download/bin/pip install -d deps -r requirements.txt - tar cvf custodian-deps.tgz deps - rm -Rf dep-download - rm -Rf deps + ./bin/tox -e py37 ftest: C7N_FUNCTIONAL=yes AWS_DEFAULT_REGION=us-east-2 ./bin/py.test -m functional tests diff --git a/c7n/cache.py b/c7n/cache.py index f39274dcb6f..fd202cfb8cf 100644 --- a/c7n/cache.py +++ b/c7n/cache.py @@ -24,16 +24,25 @@ log = logging.getLogger('custodian.cache') +CACHE_NOTIFY = False + def factory(config): + + global CACHE_NOTIFY + if not config: return NullCache(None) if not config.cache or not config.cache_period: - log.debug("Disabling cache") + if not CACHE_NOTIFY: + log.debug("Disabling cache") + CACHE_NOTIFY = True return NullCache(config) elif config.cache == 'memory': - log.debug("Using in-memory cache") + if not CACHE_NOTIFY: + log.debug("Using in-memory cache") + CACHE_NOTIFY = True return InMemoryCache() return FileCacheManager(config) @@ -53,6 +62,9 @@ def get(self, key): def save(self, key, data): pass + def size(self): + return 0 + class InMemoryCache(object): # Running in a temporary environment, so keep as a cache. 
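The new size() methods give every cache backend a uniform footprint hook; the
psutil-based stats collector added later in this patch reads it through
policy.get_cache().size(). A minimal sketch of the contract, assuming the
in-memory backend (units are backend-specific: cached item counts here, file
size in bytes for FileCacheManager, and a constant 0 for NullCache):

    from c7n import cache
    from c7n.config import Config

    # cache='memory' plus a non-zero period selects InMemoryCache
    conf = Config.empty(cache='memory', cache_period=5)
    c = cache.factory(conf)
    c.save(('aws', 'ec2'), [{'InstanceId': 'i-1'}, {'InstanceId': 'i-2'}])
    print(c.size())  # 2 -- sums the lengths of the cached collections
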
@@ -71,6 +83,9 @@ def get(self, key): def save(self, key, data): self.data[pickle.dumps(key)] = data + def size(self): + return sum(map(len, self.data.values())) + class FileCacheManager(object): @@ -118,3 +133,6 @@ def save(self, key, data): except Exception as e: log.warning("Could not create directory: %s err: %s" % ( directory, e)) + + def size(self): + return os.path.exists(self.cache_path) and os.path.getsize(self.cache_path) or 0 diff --git a/c7n/cli.py b/c7n/cli.py index b7837482db0..9c05a27afdb 100644 --- a/c7n/cli.py +++ b/c7n/cli.py @@ -293,6 +293,11 @@ def setup_parser(): "-m", "--metrics-enabled", default=None, nargs="?", const="aws", help="Emit metrics to provider metrics") + run.add_argument( + "--trace", + dest="tracer", + help=argparse.SUPPRESS, + default=None, nargs="?", const="default") return parser @@ -326,6 +331,7 @@ def _setup_logger(options): logging.getLogger('botocore').setLevel(external_log_level) logging.getLogger('urllib3').setLevel(external_log_level) logging.getLogger('s3transfer').setLevel(external_log_level) + logging.getLogger('urllib3').setLevel(logging.ERROR) def main(): diff --git a/c7n/commands.py b/c7n/commands.py index 3eeafd9edbc..49101098d26 100644 --- a/c7n/commands.py +++ b/c7n/commands.py @@ -266,6 +266,8 @@ def logs(options, policies): sys.exit(1) policy = policies.pop() + # initialize policy execution context for access to outputs + policy.ctx.initialize() for e in policy.get_logs(options.start, options.end): print("%s: %s" % ( diff --git a/c7n/config.py b/c7n/config.py index ea2ac31e344..45792f5532d 100644 --- a/c7n/config.py +++ b/c7n/config.py @@ -40,6 +40,7 @@ def empty(cls, **kw): 'assume_role': None, 'external_id': None, 'log_group': None, + 'tracer': 'default', 'metrics_enabled': False, 'output_dir': '', 'cache_period': 0, diff --git a/c7n/credentials.py b/c7n/credentials.py index 69d10a8e1d9..e0e6aafd506 100644 --- a/c7n/credentials.py +++ b/c7n/credentials.py @@ -38,6 +38,7 @@ def __init__(self, region, profile=None, assume_role=None, external_id=None): if 'C7N_SESSION_SUFFIX' in os.environ: self.session_name = "%s@%s" % ( self.session_name, os.environ['C7N_SESSION_SUFFIX']) + self._subscribers = [] def _set_policy_name(self, name): self.user_agent_name = ("CloudCustodian(%s)" % name).strip() @@ -56,8 +57,15 @@ def __call__(self, assume=True, region=None): session._session.user_agent_name = self.user_agent_name session._session.user_agent_version = version + + for s in self._subscribers: + s(session) + return session + def set_subscribers(self, subscribers): + self._subscribers = subscribers + def assumed_session(role_arn, session_name, session=None, region=None, external_id=None): """STS Role assume a boto3.Session diff --git a/c7n/ctx.py b/c7n/ctx.py index 0a9ba4f7fb8..a5d07333e1d 100644 --- a/c7n/ctx.py +++ b/c7n/ctx.py @@ -1,4 +1,4 @@ -# Copyright 2015-2017 Capital One Services, LLC +# Copyright 2015-2018 Capital One Services, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
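The set_subscribers hook above is the seam that lets the AWS provider's new
ApiStats output (added below in c7n/resources/aws.py) observe every boto3
session the factory hands out. A condensed sketch of the pattern, with a
hypothetical count_calls subscriber standing in for ApiStats:

    from collections import Counter
    from c7n.credentials import SessionFactory

    api_calls = Counter()

    def count_calls(session):
        # A subscriber receives each boto3 session as it is constructed
        # and can attach botocore event handlers to it.
        def record(http_response, parsed, model, **kwargs):
            api_calls['%s.%s' % (
                model.service_model.endpoint_prefix, model.name)] += 1
        session.events.register(
            'after-call.*.*', record, unique_id='count-calls')

    factory = SessionFactory('us-east-1')
    factory.set_subscribers((count_calls,))
    # any client call made through a factory-built session is now tallied
    factory().client('sts').get_caller_identity()
    print(dict(api_calls))  # {'sts.GetCallerIdentity': 1}
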
@@ -14,10 +14,20 @@
 from __future__ import absolute_import, division, print_function, unicode_literals
 
 import time
+import uuid
 import os
 
-from c7n.output import FSOutput, MetricsOutput, CloudWatchLogOutput
-from c7n.utils import reset_session_cache
+
+from c7n.output import (
+    api_stats_outputs,
+    blob_outputs,
+    log_outputs,
+    metrics_outputs,
+    sys_stats_outputs,
+    tracer_outputs)
+
+from c7n.utils import reset_session_cache, dumps
+from c7n.version import version
 
 
 class ExecutionContext(object):
@@ -27,47 +37,92 @@ def __init__(self, session_factory, policy, options):
         self.policy = policy
         self.options = options
         self.session_factory = session_factory
-        self.cloudwatch_logs = None
+
+        # Runtime initialized during policy execution
+        # We treat policies as a flyweight pre-execution.
         self.start_time = None
+        self.execution_id = None
+        self.output = None
+        self.api_stats = None
+        self.sys_stats = None
+
+        # A few tests patch the metrics flush
+        self.metrics = metrics_outputs.select(self.options.metrics_enabled, self)
 
-        metrics_enabled = getattr(options, 'metrics_enabled', None)
-        factory = MetricsOutput.select(metrics_enabled)
-        self.metrics = factory(self)
+        # Tracer is wired into core filtering code, which gets invoked
+        # without an execution context entry in tests
+        self.tracer = tracer_outputs.select(self.options.tracer, self)
 
-        output_dir = getattr(options, 'output_dir', '')
-        if output_dir:
-            factory = FSOutput.select(output_dir)
-            self.output_path = factory.join(output_dir, policy.name)
-            self.output = factory(self)
-        else:
-            self.output_path = self.output = None
+    def initialize(self):
+        self.output = blob_outputs.select(self.options.output_dir, self)
+        self.logs = log_outputs.select(self.options.log_group, self)
 
-        if options.log_group:
-            self.cloudwatch_logs = CloudWatchLogOutput(self)
+        # Look for customizations, but fall back to the default
+        for api_stats_type in (self.policy.provider_name, 'default'):
+            if api_stats_type in api_stats_outputs:
+                self.api_stats = api_stats_outputs.select(api_stats_type, self)
+                break
+        for sys_stats_type in ('psutil', 'default'):
+            if sys_stats_type in sys_stats_outputs:
+                self.sys_stats = sys_stats_outputs.select(sys_stats_type, self)
+                break
+
+        self.start_time = time.time()
+        self.execution_id = str(uuid.uuid4())
 
     @property
     def log_dir(self):
-        if self.output:
-            return self.output.root_dir
+        return self.output.root_dir
 
     def __enter__(self):
-        if self.output:
-            self.output.__enter__()
-        if self.cloudwatch_logs:
-            self.cloudwatch_logs.__enter__()
-        self.start_time = time.time()
+        self.initialize()
+        self.session_factory.policy_name = self.policy.name
+        self.sys_stats.__enter__()
+        self.output.__enter__()
+        self.logs.__enter__()
+        self.api_stats.__enter__()
+        self.tracer.__enter__()
         return self
 
     def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
-        self.metrics.flush()
-        # Clear policy execution thread local session cache if running in tests.
-        # IMPORTANT: multi-account execution (c7n-org and others) need to manually reset this.
-        # Why: Not doing this means we get excessive memory usage from client
-        # reconstruction.
+ if exc_type is not None and self.metrics: + self.metrics.put_metric('PolicyException', 1, "Count") + self.policy._write_file( + 'metadata.json', dumps(self.get_metadata(), indent=2)) + self.api_stats.__exit__(exc_type, exc_value, exc_traceback) + + with self.tracer.subsegment('output'): + self.metrics.flush() + self.logs.__exit__(exc_type, exc_value, exc_traceback) + self.output.__exit__(exc_type, exc_value, exc_traceback) + + self.tracer.__exit__() + + self.session_factory.policy_name = None + # IMPORTANT: multi-account execution (c7n-org and others) need + # to manually reset this. Why: Not doing this means we get + # excessive memory usage from client reconstruction for dynamic-gen + # sdks. if os.environ.get('C7N_TEST_RUN'): reset_session_cache() - if self.cloudwatch_logs: - self.cloudwatch_logs.__exit__(exc_type, exc_value, exc_traceback) - self.cloudwatch_logs = None - if self.output: - self.output.__exit__(exc_type, exc_value, exc_traceback) + + def get_metadata(self, include=('sys-stats', 'api-stats', 'metrics')): + t = time.time() + md = { + 'policy': self.policy.data, + 'version': version, + 'execution': { + 'id': self.execution_id, + 'start': self.start_time, + 'end_time': t, + 'duration': t - self.start_time}, + 'config': dict(self.options) + } + + if 'sys-stats' in include and self.sys_stats: + md['sys-stats'] = self.sys_stats.get_metadata() + if 'api-stats' in include and self.api_stats: + md['api-stats'] = self.api_stats.get_metadata() + if 'metrics' in include and self.metrics: + md['metrics'] = self.metrics.get_metadata() + return md diff --git a/c7n/exceptions.py b/c7n/exceptions.py index 8a687c08ea0..fee1fb388a6 100644 --- a/c7n/exceptions.py +++ b/c7n/exceptions.py @@ -24,6 +24,10 @@ class CustodianError(Exception): """ +class InvalidOutputConfig(CustodianError): + """Invalid configuration for an output""" + + class PolicySyntaxError(CustodianError): """Policy Syntax Error """ diff --git a/c7n/filters/metrics.py b/c7n/filters/metrics.py index db80df0b1fb..e753fea3a12 100644 --- a/c7n/filters/metrics.py +++ b/c7n/filters/metrics.py @@ -50,7 +50,9 @@ class MetricsFilter(Filter): of calculated statistics as in the case of a stopped ec2 instance, nor for resources to new to have existed the entire period. ie. being stopped for an ec2 instance wouldn't lower the - average cpu utilization, nor would + average cpu utilization. + + Note the default statistic for metrics is Average. 
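+
+    To use a different statistic, set ``statistics`` on the filter, for
+    example (an illustrative snippet; thresholds are arbitrary):
+
+    .. code-block:: yaml
+
+        filters:
+          - type: metrics
+            name: CPUUtilization
+            days: 4
+            value: 30
+            op: less-than
+            statistics: Maximum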
""" schema = type_schema( diff --git a/c7n/filters/offhours.py b/c7n/filters/offhours.py index 284a40dfaf4..e05c6084e77 100644 --- a/c7n/filters/offhours.py +++ b/c7n/filters/offhours.py @@ -240,7 +240,7 @@ import logging from os.path import join -from dateutil import zoneinfo +from dateutil import zoneinfo, tz as tzutil from c7n.exceptions import PolicyValidationError from c7n.filters import Filter @@ -351,16 +351,16 @@ def validate(self): def process(self, resources, event=None): resources = super(Time, self).process(resources) - if self.parse_errors and self.manager and self.manager.log_dir: + if self.parse_errors and self.manager and self.manager.ctx.log_dir: self.log.warning("parse errors %d", len(self.parse_errors)) with open(join( - self.manager.log_dir, 'parse_errors.json'), 'w') as fh: + self.manager.ctx.log_dir, 'parse_errors.json'), 'w') as fh: dumps(self.parse_errors, fh=fh) self.parse_errors = [] - if self.opted_out and self.manager and self.manager.log_dir: + if self.opted_out and self.manager and self.manager.ctx.log_dir: self.log.debug("disabled count %d", len(self.opted_out)) with open(join( - self.manager.log_dir, 'opted_out.json'), 'w') as fh: + self.manager.ctx.log_dir, 'opted_out.json'), 'w') as fh: dumps(self.opted_out, fh=fh) self.opted_out = [] return resources @@ -466,8 +466,8 @@ def get_tag_value(self, i): def get_tz(cls, tz): found = cls.TZ_ALIASES.get(tz) if found: - return zoneinfo.gettz(found) - return zoneinfo.gettz(tz.title()) + return tzutil.gettz(found) + return tzutil.gettz(tz.title()) def get_default_schedule(self): raise NotImplementedError("use subclass") diff --git a/c7n/manager.py b/c7n/manager.py index 04787ee3f91..1e58ca26479 100644 --- a/c7n/manager.py +++ b/c7n/manager.py @@ -17,6 +17,7 @@ from c7n import cache from c7n.executor import ThreadPoolExecutor +from c7n.provider import clouds from c7n.registry import PluginRegistry try: from c7n.resources.aws import AWS @@ -39,7 +40,6 @@ def __init__(self, ctx, data): self.session_factory = ctx.session_factory self.config = ctx.options self.data = data - self.log_dir = ctx.log_dir self._cache = cache.factory(self.ctx.options) self.log = logging.getLogger('custodian.resources.%s' % ( self.__class__.__name__.lower())) @@ -70,13 +70,24 @@ def resources(self): raise NotImplementedError("") def get_resource_manager(self, resource_type, data=None): - klass = resources.get(resource_type) + """get a resource manager or a given resource type. + + assumes the query is for the same underlying cloud provider. + """ + if '.' 
in resource_type: + provider_name, resource_type = resource_type.split('.', 1) + else: + provider_name = self.ctx.policy.provider_name + + provider_resources = clouds[provider_name].resources + klass = provider_resources.get(resource_type) if klass is None: raise ValueError(resource_type) + # if we're already querying via config carry it forward if not data and self.source_type == 'config' and getattr( klass.get_model(), 'config_type', None): - return klass(self.ctx, {'source': self.config_type}) + return klass(self.ctx, {'source': self.source_type}) return klass(self.ctx, data or {}) def filter_resources(self, resources, event=None): @@ -88,7 +99,10 @@ def filter_resources(self, resources, event=None): if not resources: break rcount = len(resources) - resources = f.process(resources, event) + + with self.ctx.tracer.subsegment("filter:%s" % f.type): + resources = f.process(resources, event) + if event and event.get('debug', False): self.log.debug( "applied filter %s %d->%d", f, rcount, len(resources)) diff --git a/c7n/mu.py b/c7n/mu.py index 43ea86a2be6..4d7ebfcadc6 100644 --- a/c7n/mu.py +++ b/c7n/mu.py @@ -298,9 +298,8 @@ def remove(self, func, alias=None): log.info("Removing lambda function %s", func.name) try: self.client.delete_function(FunctionName=func.name) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - raise + except self.client.exceptions.ResourceNotFoundException: + pass def metrics(self, funcs, start, end, period=5 * 60): @@ -335,18 +334,16 @@ def logs(self, func, start, end): try: logs.describe_log_groups( logGroupNamePrefix=group_name) - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceNotFoundException': - return - raise + except logs.exceptions.ResourceNotFoundException: + pass + try: log_streams = logs.describe_log_streams( logGroupName=group_name, orderBy="LastEventTime", limit=3, descending=True) - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceNotFoundException': - return - raise + except logs.exceptions.ResourceNotFoundException: + return + start = _timestamp_from_string(start) end = _timestamp_from_string(end) for s in reversed(log_streams['logStreams']): @@ -354,8 +351,7 @@ def logs(self, func, start, end): logGroupName=group_name, logStreamName=s['logStreamName'], startTime=start, - endTime=end, - ) + endTime=end) for e in result['events']: yield e @@ -965,17 +961,17 @@ def add(self, func): else: response = {'RuleArn': rule['Arn']} + client = self.session.client('lambda') try: - self.session.client('lambda').add_permission( + client.add_permission( FunctionName=func.name, StatementId=func.name, SourceArn=response['RuleArn'], Action='lambda:InvokeFunction', Principal='events.amazonaws.com') log.debug('Added lambda invoke cwe rule permission') - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceConflictException': - raise + except client.exceptions.ResourceConflictException: + pass # Add Targets found = False @@ -1091,9 +1087,8 @@ def add(self, func): params['SourceArn'] = 'arn:aws:s3:::%' % self.bucket['Name'] try: lambda_client.add_permission(**params) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceConflictException': - raise + except lambda_client.exceptions.ResourceConflictException: + pass notifies.setdefault('LambdaFunctionConfigurations', []).append(n_params) s3.put_bucket_notification_configuration( @@ -1113,9 +1108,8 @@ def remove(self, func): FunctionName=func['FunctionName'], StatementId=self.bucket['Name']) 
log.debug("Removed lambda permission result: %s" % response) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - raise + except lambda_client.exceptions.ResourceNotFoundException: + pass notifies['LambdaFunctionConfigurations'].remove(found) s3.put_bucket_notification_configuration( @@ -1152,9 +1146,8 @@ def add(self, func): log.debug("Added lambda ipo nvoke log group permission") # iam eventual consistency and propagation time.sleep(self.iam_delay) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceConflictException': - raise + except lambda_client.exceptions.ResourceConflictException: + pass # Consistent put semantics / ie no op if extant self.client.put_subscription_filter( logGroupName=group['logGroupName'], @@ -1170,18 +1163,16 @@ def remove(self, func): FunctionName=func.name, StatementId=group['logGroupName'][1:].replace('/', '-')) log.debug("Removed lambda permission result: %s" % response) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - raise + except lambda_client.exceptions.ResourceNotFoundException: + pass try: response = self.client.delete_subscription_filter( logGroupName=group['logGroupName'], filterName=func.name) log.debug("Removed subscription filter from: %s", group['logGroupName']) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - raise + except lambda_client.exceptions.ResourceNotFoundException: + pass class SQSSubscription(object): @@ -1278,9 +1269,8 @@ def add(self, func): log.debug("Added permission for sns to invoke lambda") # iam eventual consistency and propagation time.sleep(self.iam_delay) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceConflictException': - raise + except lambda_client.exceptions.ResourceConflictException: + pass # Subscribe the lambda to the topic, idempotent sns_client = session.client('sns') @@ -1308,6 +1298,7 @@ def remove(self, func): class Done(Exception): pass + try: for page in paginator.paginate(TopicArn=topic_arn): for subscription in page['Subscriptions']: @@ -1318,10 +1309,8 @@ class Done(Exception): SubscriptionArn=subscription['SubscriptionArn']) log.debug("Unsubscribed %s from %s" % (func.name, topic_name)) - except ClientError as e: - code = e.response['Error']['Code'] - if code != 'ResourceNotFoundException': - raise + except sns_client.exceptions.NotFoundException: + pass raise Done # break out of both for loops except Done: pass @@ -1452,16 +1441,16 @@ def add(self, func): elif rule: log.debug("Config rule up to date") return + client = self.session.client('lambda') try: - self.session.client('lambda').add_permission( + client.add_permission( FunctionName=func.name, StatementId=func.name, SourceAccount=func.arn.split(':')[4], Action='lambda:InvokeFunction', Principal='config.amazonaws.com') - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceConflictException': - raise + except client.exceptions.ResourceConflictException: + pass log.debug("Adding config rule for %s" % func.name) return self.client.put_config_rule(ConfigRule=params) @@ -1474,6 +1463,5 @@ def remove(self, func): try: self.client.delete_config_rule( ConfigRuleName=func.name) - except ClientError as e: - if e.response['Error']['Code'] != 'ResourceNotFoundException': - raise + except self.client.exceptions.NoSuchConfigRuleException: + pass diff --git a/c7n/output.py b/c7n/output.py index 11b76fb7318..48c2132fd66 100644 --- a/c7n/output.py +++ b/c7n/output.py @@ -12,74 
+12,248 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Outputs metrics, logs, structured records across -a variety of sources. +Outputs metrics, logs, stats, traces, and structured records across +a variety of sinks. See docs/usage/outputs.rst """ from __future__ import absolute_import, division, print_function, unicode_literals -import datetime +import contextlib +from datetime import datetime +import json import gzip import logging +import os import shutil -import tempfile +import time +import uuid -import os +from c7n.exceptions import InvalidOutputConfig from c7n.registry import PluginRegistry -from c7n.log import CloudWatchLogHandler -from c7n.utils import chunks, local_session, parse_s3, get_retry -DEFAULT_NAMESPACE = "CloudMaid" +try: + import psutil + HAVE_PSUTIL = True +except ImportError: + HAVE_PSUTIL = False log = logging.getLogger('custodian.output') -metrics_outputs = PluginRegistry('c7n.blob-outputs') -blob_outputs = PluginRegistry('c7n.blob-outputs') +# TODO remove +DEFAULT_NAMESPACE = "CloudMaid" -@metrics_outputs.register('aws') -class MetricsOutput(object): - """Send metrics data to cloudwatch - """ +class OutputRegistry(PluginRegistry): - permissions = ("cloudWatch:PutMetricData",) + default_protocol = None - retry = staticmethod(get_retry(('Throttling',))) + def select(self, selector, ctx): + if not selector: + return self['default'](ctx, {'url': selector}) + if self.default_protocol and '://' not in selector: + selector = "{}://{}".format( + self.default_protocol, selector) + for k in self.keys(): + if selector.startswith(k): + return self[k](ctx, {'url': selector}) + raise InvalidOutputConfig("Invalid %s: %s" % ( + self.plugin_type, + selector)) + + +class BlobOutputRegistry(OutputRegistry): + + default_protocol = "file" - BUFFER_SIZE = 20 - @staticmethod - def select(metrics_selector): - if not metrics_selector: - return NullMetricsOutput +class LogOutputRegistry(OutputRegistry): + + default_protocol = "aws" + + +class MetricsRegistry(OutputRegistry): + + def select(self, selector, ctx): # Compatibility for boolean configuration - if isinstance(metrics_selector, bool): - metrics_selector = 'aws' - for k in metrics_outputs.keys(): - if k.startswith(metrics_selector): - return metrics_outputs[k] - raise ValueError("invalid metrics option %r" % metrics_selector) - - def __init__(self, ctx, namespace=DEFAULT_NAMESPACE): + if isinstance(selector, bool) and selector: + selector = 'aws' + return super(MetricsRegistry, self).select(selector, ctx) + + +api_stats_outputs = OutputRegistry('c7n.output.api_stats') +blob_outputs = BlobOutputRegistry('c7n.output.blob') +log_outputs = LogOutputRegistry('c7n.output.logs') +metrics_outputs = MetricsRegistry('c7n.output.metrics') +tracer_outputs = OutputRegistry('c7n.output.tracer') +sys_stats_outputs = OutputRegistry('c7n.output.sys_stats') + + +@tracer_outputs.register('default') +class NullTracer(object): + """Tracing provides for detailed analytics of a policy execution. + + Uses native cloud provider integration (xray, stack driver trace). + """ + def __init__(self, ctx, config=None): self.ctx = ctx - self.namespace = namespace - self.buf = [] + self.config = config or {} - def get_timestamp(self): + @contextlib.contextmanager + def subsegment(self, name): + """Create a named subsegment as a context manager """ - Now, if C7N_METRICS_TZ is set to TRUE, UTC timestamp will be used. - For backwards compatibility, if it is not set, UTC will be the default. 
-        To disable this and use the system's time zone, C7N_METRICS_TZ shoule be set to FALSE.
+        yield self
+
+    def __enter__(self):
+        """Enter main segment for policy execution.
         """
-        if os.getenv("C7N_METRICS_TZ", 'TRUE').upper() in ('TRUE', ''):
-            return datetime.datetime.utcnow()
-        else:
-            return datetime.datetime.now()
 
+    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
+        """Exit main segment for policy execution.
+        """
+
+
+class DeltaStats(object):
+    """Capture stats (dictionary of string->integer) as a stack.
+
+    Popping the stack automatically creates a delta of the last
+    stack element to the current stats.
+    """
+    def __init__(self, ctx, config=None):
+        self.ctx = ctx
+        self.config = config or {}
+        self.snapshot_stack = []
+
+    def push_snapshot(self):
+        self.snapshot_stack.append(self.get_snapshot())
+
+    def pop_snapshot(self):
+        return self.delta(
+            self.snapshot_stack.pop(), self.get_snapshot())
+
+    def get_snapshot(self):
+        return {}
+
+    def delta(self, before, after):
+        delta = {}
+        for k in before:
+            val = after[k] - before[k]
+            if val:
+                delta[k] = val
+        return delta
+
+
+@sys_stats_outputs.register('default')
+@api_stats_outputs.register('default')
+class NullStats(object):
+    """Execution statistics/metrics collection.
+
+    Encompasses concrete implementations over system stats (memory, cpu, cache size)
+    and api calls.
+
+    The api supports stack nested snapshots, with delta consumption to support
+    tracing metadata annotation across nested subsegments.
+    """
+
+    def __init__(self, ctx, config=None):
+        self.ctx = ctx
+        self.config = config or {}
+
+    def push_snapshot(self):
+        """Take a snapshot of the system stats and append to the stack."""
+
+    def pop_snapshot(self):
+        """Remove a snapshot from the stack and return a delta of the current stats to it.
+        """
+        return {}
+
+    def get_metadata(self):
+        """Return a delta of the current stats to the last snapshot, without popping.
+        """
+        return {}
+
+    def __enter__(self):
+        """Push a snapshot
+        """
+
+    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
+        """Pop a snapshot
+        """
+
+
+@sys_stats_outputs.register('psutil', condition=HAVE_PSUTIL)
+class SystemStats(DeltaStats):
+    """Collect process statistics via psutil as deltas over policy execution.
+    """
+    def __init__(self, ctx, config=None):
+        super(SystemStats, self).__init__(ctx, config)
+        self.process = psutil.Process(os.getpid())
+
+    def __enter__(self):
+        self.push_snapshot()
+
+    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
+        self.pop_snapshot()
+
+    def get_metadata(self):
+        if self.snapshot_stack:
+            return self.delta(self.snapshot_stack[-1], self.get_snapshot())
+        return self.get_snapshot()
+
+    def get_snapshot(self):
+        snapshot = {
+            'num_threads': self.process.num_threads(),
+            'num_fds': self.process.num_fds(),
+            'snapshot_time': time.time(),
+            'cache_size': self.ctx.policy.get_cache().size()
+        }
+
+        with self.process.oneshot():
+            # simpler would be json.dumps(self.process.as_dict()), but
+            # that complicates delta diffing between snapshots.
+            cpu_time = self.process.cpu_times()
+            snapshot['cpu_user'] = cpu_time.user
+            snapshot['cpu_system'] = cpu_time.system
+            (snapshot['num_ctx_switches_voluntary'],
+                snapshot['num_ctx_switches_involuntary']) = self.process.num_ctx_switches()
+            # io counters ( not available on osx)
+            if getattr(self.process, 'io_counters', None):
+                io = self.process.io_counters()
+                for counter in (
+                        'read_count', 'write_count',
+                        'write_bytes', 'read_bytes'):
+                    snapshot[counter] = getattr(io, counter)
+            # memory counters
+            mem = self.process.memory_info()
+            for counter in (
+                    'rss', 'vms', 'shared', 'text', 'data', 'lib',
+                    'pfaults', 'pageins'):
+                v = getattr(mem, counter, None)
+                if v is not None:
+                    snapshot[counter] = v
+        return snapshot
+
+
+class Metrics(object):
+
+    permissions = ()
+    namespace = DEFAULT_NAMESPACE
+    BUFFER_SIZE = 20
+
+    def __init__(self, ctx, config=None):
+        self.ctx = ctx
+        self.config = config
+        self.buf = []
+
+    def _format_metric(self, key, value, unit, dimensions):
+        raise NotImplementedError("subclass responsibility")
+
+    def _put_metrics(self, ns, metrics):
+        raise NotImplementedError("subclass responsibility")
 
     def flush(self):
         if self.buf:
@@ -91,15 +265,36 @@ def put_metric(self, key, value, unit, buffer=True, **dimensions):
             self.buf.append(point)
             if buffer:
                 # Max metrics in a single request
-                if len(self.buf) == 20:
+                if len(self.buf) >= self.BUFFER_SIZE:
                     self.flush()
         else:
             self.flush()
 
+    def get_metadata(self):
+        return list(self.buf)
+
+
+@metrics_outputs.register('default')
+class LogMetrics(Metrics):
+    """Default metrics collection.
+
+    Logs metrics; the default handler should send them to stderr.
+    """
+    def _put_metrics(self, ns, metrics):
+        for m in metrics:
+            if m['MetricName'] not in ('ActionTime', 'ResourceTime'):
+                log.debug(self.render_metric(m))
+
+    def render_metric(self, m):
+        label = "metric:%s %s:%s" % (m['MetricName'], m['Unit'], m['Value'])
+        for d in m['Dimensions']:
+            label += " %s:%s" % (d['Name'].lower(), d['Value'].lower())
+        return label
+
     def _format_metric(self, key, value, unit, dimensions):
         d = {
             "MetricName": key,
-            "Timestamp": self.get_timestamp(),
+            "Timestamp": datetime.now(),
             "Value": value,
             "Unit": unit}
         d["Dimensions"] = [
@@ -109,40 +304,23 @@ def _format_metric(self, key, value, unit, dimensions):
             d['Dimensions'].append({"Name": k, "Value": v})
         return d
 
-    def _put_metrics(self, ns, metrics):
-        watch = local_session(self.ctx.session_factory).client('cloudwatch')
-        for metric_values in chunks(metrics, self.BUFFER_SIZE):
-            return self.retry(
-                watch.put_metric_data, Namespace=ns, MetricData=metrics)
-
-
-class NullMetricsOutput(MetricsOutput):
-
-    permissions = ()
-
-    def __init__(self, ctx, namespace=DEFAULT_NAMESPACE):
-        super(NullMetricsOutput, self).__init__(ctx, namespace)
-        self.data = []
-
-    def _put_metrics(self, ns, metrics):
-        self.data.append({'Namespace': ns, 'MetricData': metrics})
-        for m in metrics:
-            if m['MetricName'] not in ('ActionTime', 'ResourceTime'):
-                log.debug(self.format_metric(m))
-
-    def format_metric(self, m):
-        label = "metric:%s %s:%s" % (m['MetricName'], m['Unit'], m['Value'])
-        for d in m['Dimensions']:
-            label += " %s:%s" % (d['Name'].lower(), d['Value'].lower())
-        return label
+    def get_metadata(self):
+        res = []
+        for k in self.buf:
+            k = dict(k)
+            k.pop('Dimensions', None)
+            res.append(k)
+        return res
 
 
 class LogOutput(object):
 
     log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 
-    def __init__(self, ctx):
+    def __init__(self, ctx, config=None):
         self.ctx = ctx
+        self.config = config or {}
+        
self.handler = None def get_handler(self): raise NotImplementedError() @@ -171,45 +349,41 @@ def leave_log(self): self.handler.close() -class CloudWatchLogOutput(LogOutput): - - log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s' +@log_outputs.register('default') +class LogFile(LogOutput): def get_handler(self): - return CloudWatchLogHandler( - log_group=self.ctx.options.log_group, - log_stream=self.ctx.policy.name, - session_factory=lambda x=None: self.ctx.session_factory( - assume=False)) + return logging.FileHandler( + os.path.join( + self.ctx.log_dir, 'custodian-run.log')) - def __repr__(self): - return "<%s to group:%s stream:%s>" % ( - self.__class__.__name__, - self.ctx.options.log_group, - self.ctx.policy.name) +@blob_outputs.register('file') +@blob_outputs.register('default') +class DirectoryOutput(object): -class FSOutput(LogOutput): + permissions = () - @staticmethod - def select(path): - for k in blob_outputs.keys(): - if path.startswith('%s://' % k): - return blob_outputs[k] - # Fall back local disk - return blob_outputs['file'] + def __init__(self, ctx, config): + self.ctx = ctx + self.config = config - @staticmethod - def join(*parts): - return os.path.join(*parts) + output_path = self.get_output_path(config['url']) + if output_path.startswith('file://'): + output_path = output_path[len('file://'):] - def __init__(self, ctx): - super(FSOutput, self).__init__(ctx) - self.root_dir = self.ctx.output_path or tempfile.mkdtemp() + self.root_dir = output_path + if self.root_dir and not os.path.exists(self.root_dir): + os.makedirs(self.root_dir) - def get_handler(self): - return logging.FileHandler( - os.path.join(self.root_dir, 'custodian-run.log')) + def __enter__(self): + return + + def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): + return + + def __repr__(self): + return "<%s to dir:%s>" % (self.__class__.__name__, self.root_dir) def compress(self): # Compress files individually so thats easy to walk them, without @@ -222,81 +396,29 @@ def compress(self): shutil.copyfileobj(sfh, zfh, length=2**15) os.remove(fp) + def get_output_path(self, output_url): + if '{' not in output_url: + return os.path.join(output_url, self.ctx.policy.name) + return output_url.format(**self.get_output_vars()) -@blob_outputs.register('file') -class DirectoryOutput(FSOutput): - - permissions = () - - def __init__(self, ctx): - super(DirectoryOutput, self).__init__(ctx) - if self.root_dir.startswith('file://'): - self.root_dir = self.root_dir[len('file://'):] - if self.ctx.output_path is not None: - if not os.path.exists(self.root_dir): - os.makedirs(self.root_dir) - - def __repr__(self): - return "<%s to dir:%s>" % (self.__class__.__name__, self.root_dir) - - -@blob_outputs.register('s3') -class S3Output(FSOutput): - """ - Usage: - - .. 
code-block:: python - - with S3Output(session_factory, 's3://bucket/prefix'): - log.info('xyz') # -> log messages sent to custodian-run.log.gz - - """ - - permissions = ('S3:PutObject',) + def get_output_vars(self): + data = { + 'account_id': self.ctx.options.account_id, + 'policy': self.ctx.policy.name, + 'now': datetime.utcnow(), + 'uuid': str(uuid.uuid4())} + return data - def __init__(self, ctx): - super(S3Output, self).__init__(ctx) - self.date_path = datetime.datetime.now().strftime('%Y/%m/%d/%H') - self.s3_path, self.bucket, self.key_prefix = parse_s3( - self.ctx.output_path) - self.root_dir = tempfile.mkdtemp() - self.transfer = None + def get_resource_set(self): + record_path = os.path.join(self.root_dir, 'resources.json') - def __repr__(self): - return "<%s to bucket:%s prefix:%s>" % ( - self.__class__.__name__, - self.bucket, - "%s/%s" % (self.key_prefix, self.date_path)) + if not os.path.exists(record_path): + return [] - @staticmethod - def join(*parts): - return "/".join([s.strip('/') for s in parts]) + mdate = datetime.fromtimestamp( + os.stat(record_path).st_ctime) - def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): - from boto3.s3.transfer import S3Transfer, TransferConfig - if exc_type is not None: - log.exception("Error while executing policy") - log.debug("Uploading policy logs") - self.leave_log() - self.compress() - self.transfer = S3Transfer( - self.ctx.session_factory(assume=False).client('s3'), - config=TransferConfig(use_threads=False)) - self.upload() - shutil.rmtree(self.root_dir) - log.debug("Policy Logs uploaded") - - def upload(self): - for root, dirs, files in os.walk(self.root_dir): - for f in files: - key = "%s/%s%s" % ( - self.key_prefix, - self.date_path, - "%s/%s" % ( - root[len(self.root_dir):], f)) - key = key.strip('/') - self.transfer.upload_file( - os.path.join(root, f), self.bucket, key, - extra_args={ - 'ACL': 'bucket-owner-full-control', - 'ServerSideEncryption': 'AES256'}) + with open(record_path) as fh: + records = json.load(fh) + [r.__setitem__('CustodianDate', mdate) for r in records] + return records diff --git a/c7n/policy.py b/c7n/policy.py index c98cb54ce29..f45a660a335 100644 --- a/c7n/policy.py +++ b/c7n/policy.py @@ -22,7 +22,6 @@ import os import time - import jmespath import six @@ -270,7 +269,8 @@ def run(self, *args, **kw): at = time.time() for a in self.policy.resource_manager.actions: s = time.time() - results = a.process(resources) + with self.policy.ctx.tracer.subsegment('action:%s' % a.type): + results = a.process(resources) self.policy.log.info( "policy: %s action: %s" " resources: %d" @@ -743,6 +743,9 @@ def max_resources_percent(self): def tags(self): return self.data.get('tags', ()) + def get_cache(self): + return self.resource_manager._cache + def get_execution_mode(self): exec_mode_type = self.data.get('mode', {'type': 'pull'}).get('type') exec_mode = execution[exec_mode_type] @@ -861,7 +864,6 @@ def get_permissions(self): def __call__(self): """Run policy in default mode""" mode = self.get_execution_mode() - self.session_factory.policy_name = self.name if self.options.dryrun: resources = PullMode(self).run() elif isinstance(mode, ServerlessExecutionMode): diff --git a/c7n/query.py b/c7n/query.py index 68a3a6999a1..6c669856ef7 100644 --- a/c7n/query.py +++ b/c7n/query.py @@ -417,10 +417,16 @@ def resources(self, query=None): if query is None: query = {} - resources = self.augment(self.source.resources(query)) + with self.ctx.tracer.subsegment('resource-fetch'): + resources = 
self.source.resources(query) + with self.ctx.tracer.subsegment('resource-augment'): + resources = self.augment(resources) + resource_count = len(resources) self._cache.save(key, resources) - resources = self.filter_resources(resources) + + with self.ctx.tracer.subsegment('filter'): + resources = self.filter_resources(resources) # Check if we're out of a policies execution limits. if self.data == self.ctx.policy.data: diff --git a/c7n/registry.py b/c7n/registry.py index bb781742f5f..7ffe324b983 100644 --- a/c7n/registry.py +++ b/c7n/registry.py @@ -59,7 +59,10 @@ def subscribe(self, event, func): raise ValueError('Invalid event') self._subscribers[event].append(func) - def register(self, name, klass=None): + def register(self, name, klass=None, condition=True, + condition_message="Missing dependency for {}"): + if not condition and klass: + return klass # invoked as function if klass: klass.type = name @@ -69,6 +72,8 @@ def register(self, name, klass=None): # invoked as class decorator def _register_class(klass): + if not condition: + return klass self._factories[name] = klass klass.type = name self.notify(self.EVENT_REGISTER, klass) @@ -83,6 +88,9 @@ def notify(self, event, key=None): for subscriber in self._subscribers[event]: subscriber(self, key) + def __contains__(self, key): + return key in self._factories + def __getitem__(self, name): return self.get(name) diff --git a/c7n/reports/csvout.py b/c7n/reports/csvout.py index 5c85d6fac41..4e98cdd6e17 100644 --- a/c7n/reports/csvout.py +++ b/c7n/reports/csvout.py @@ -77,6 +77,8 @@ def report(policies, start_date, options, output_fh, raw_output_fh=None): records = [] for policy in policies: + # initialize policy execution context for output access + policy.ctx.initialize() if policy.ctx.output.type == 's3': policy_records = record_set( policy.session_factory, @@ -84,7 +86,7 @@ def report(policies, start_date, options, output_fh, raw_output_fh=None): policy.ctx.output.key_prefix, start_date) else: - policy_records = fs_record_set(policy.ctx.output_path, policy.name) + policy_records = fs_record_set(policy.ctx.log_dir, policy.name) log.debug("Found %d records for region %s", len(policy_records), policy.options.region) diff --git a/c7n/resources/asg.py b/c7n/resources/asg.py index abbe54ec86c..6499d367d72 100644 --- a/c7n/resources/asg.py +++ b/c7n/resources/asg.py @@ -19,7 +19,7 @@ from concurrent.futures import as_completed from datetime import datetime, timedelta -from dateutil import zoneinfo +from dateutil import tz as tzutil from dateutil.parser import parse import logging @@ -1410,7 +1410,7 @@ class MarkForOp(Tag): 'AutoScaleGroup does not meet org policy: {op}@{action_date}') def validate(self): - self.tz = zoneinfo.gettz( + self.tz = tzutil.gettz( Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) if not self.tz: raise PolicyValidationError( @@ -1418,7 +1418,7 @@ def validate(self): return self def process(self, asgs): - self.tz = zoneinfo.gettz( + self.tz = tzutil.gettz( Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) msg_tmpl = self.data.get('message', self.default_template) diff --git a/c7n/resources/aws.py b/c7n/resources/aws.py index 7bdab62c33a..2a44ef4b041 100644 --- a/c7n/resources/aws.py +++ b/c7n/resources/aws.py @@ -14,23 +14,62 @@ from c7n.provider import clouds +from collections import Counter +import contextlib import copy +import datetime import itertools import logging +import os +import shutil import sys +import tempfile +import time +import traceback import boto3 from c7n.credentials import SessionFactory +from 
c7n.config import Bag
+from c7n.log import CloudWatchLogHandler
+
+# Import output registries aws provider extends.
+from c7n.output import (
+    api_stats_outputs,
+    blob_outputs,
+    log_outputs,
+    metrics_outputs,
+    tracer_outputs
+)
+
+# Output base implementations we extend.
+from c7n.output import (
+    Metrics,
+    DeltaStats,
+    DirectoryOutput,
+    LogOutput,
+)
+
 from c7n.registry import PluginRegistry
-from c7n import utils
+from c7n import credentials, utils
 
 log = logging.getLogger('custodian.aws')
 
+try:
+    from aws_xray_sdk.core import xray_recorder, patch
+    from aws_xray_sdk.core.context import Context
+    from aws_xray_sdk.core.sampling.local.sampler import LocalSampler
+    HAVE_XRAY = True
+except ImportError:
+    HAVE_XRAY = False
+    class Context: pass          # NOQA
+
 _profile_session = None
 
+DEFAULT_NAMESPACE = "CloudMaid"
+
+
 def get_profile_session(options):
     global _profile_session
     if _profile_session:
@@ -78,6 +117,271 @@ def _default_account_id(options):
         options.account_id = None
 
 
+@metrics_outputs.register('aws')
+class MetricsOutput(Metrics):
+    """Send metrics data to cloudwatch
+    """
+
+    permissions = ("cloudWatch:PutMetricData",)
+    retry = staticmethod(utils.get_retry(('Throttling',)))
+
+    def __init__(self, ctx, config=None):
+        super(MetricsOutput, self).__init__(ctx, config)
+        self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
+
+    def _format_metric(self, key, value, unit, dimensions):
+        d = {
+            "MetricName": key,
+            "Timestamp": datetime.datetime.utcnow(),
+            "Value": value,
+            "Unit": unit}
+        d["Dimensions"] = [
+            {"Name": "Policy", "Value": self.ctx.policy.name},
+            {"Name": "ResType", "Value": self.ctx.policy.resource_type}]
+        for k, v in dimensions.items():
+            d['Dimensions'].append({"Name": k, "Value": v})
+        return d
+
+    def _put_metrics(self, ns, metrics):
+        watch = utils.local_session(self.ctx.session_factory).client('cloudwatch')
+        return self.retry(
+            watch.put_metric_data, Namespace=ns, MetricData=metrics)
+
+
+@log_outputs.register('aws')
+class CloudWatchLogOutput(LogOutput):
+
+    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+
+    def get_handler(self):
+        return CloudWatchLogHandler(
+            log_group=self.ctx.options.log_group,
+            log_stream=self.ctx.policy.name,
+            session_factory=lambda x=None: self.ctx.session_factory(
+                assume=False))
+
+    def __repr__(self):
+        return "<%s to group:%s stream:%s>" % (
+            self.__class__.__name__,
+            self.ctx.options.log_group,
+            self.ctx.policy.name)
+
+
+class XrayEmitter(object):
+
+    def __init__(self):
+        self.buf = []
+        self.client = None
+
+    def send_entity(self, entity):
+        self.buf.append(entity)
+        if len(self.buf) > 49:
+            self.flush()
+
+    def flush(self):
+        buf = self.buf
+        self.buf = []
+        for segment_set in utils.chunks(buf, 50):
+            self.client.put_trace_segments(
+                TraceSegmentDocuments=[
+                    s.serialize() for s in segment_set])
+
+
+class XrayContext(Context):
+
+    def __init__(self, *args, **kw):
+        super(XrayContext, self).__init__(*args, **kw)
+        self.sampler = LocalSampler()
+        # We want process global semantics as policy execution
+        # can span threads.
+        self._local = Bag()
+        self._current_subsegment = None
+
+    def handle_context_missing(self):
+        """Custodian has a few api calls out of band of policy execution.
+
+        - Resolving account alias.
+        - Cloudwatch Log group/stream discovery/creation (when using -l on cli)
+
+        Also we want folks to be able to opt in to xray via configuration,
+        so default to disabling context-missing output.
+ """ + + +@tracer_outputs.register('xray', condition=HAVE_XRAY) +class XrayTracer(object): + + emitter = XrayEmitter() + + in_lambda = 'LAMBDA_TASK_ROOT' in os.environ + use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ + service_name = 'custodian' + + context = XrayContext() + if HAVE_XRAY: + xray_recorder.configure( + emitter=use_daemon is False and emitter or None, + context=context, + sampling=True, + context_missing='LOG_ERROR' + ) + patch(['boto3', 'requests']) + logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR) + + def __init__(self, ctx, config): + self.ctx = ctx + self.config = config or {} + self.client = None + self.metadata = {} + + @contextlib.contextmanager + def subsegment(self, name): + segment = xray_recorder.begin_subsegment(name) + try: + yield segment + except Exception as e: + stack = traceback.extract_stack(limit=xray_recorder.max_trace_back) + segment.add_exception(e, stack) + raise + finally: + xray_recorder.end_subsegment(time.time()) + + def __enter__(self): + if self.client is None: + self.client = self.ctx.session_factory(assume=False).client('xray') + + self.emitter.client = self.client + + if self.in_lambda: + self.segment = xray_recorder.begin_subsegment(self.service_name) + else: + self.segment = xray_recorder.begin_segment( + self.service_name, sampling=True) + + p = self.ctx.policy + xray_recorder.put_annotation('policy', p.name) + xray_recorder.put_annotation('resource', p.resource_type) + if self.ctx.options.account_id: + xray_recorder.put_annotation('account', self.ctx.options.account_id) + + def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): + metadata = self.ctx.get_metadata(('api-stats',)) + metadata.update(self.metadata) + xray_recorder.put_metadata('custodian', metadata) + if self.in_lambda: + xray_recorder.end_subsegment() + return + xray_recorder.end_segment() + if not self.use_daemon: + self.emitter.flush() + self.metadata.clear() + + +@api_stats_outputs.register('aws') +class ApiStats(DeltaStats): + + def __init__(self, ctx, config=None): + super(ApiStats, self).__init__(ctx, config) + self.api_calls = Counter() + + def get_snapshot(self): + return dict(self.api_calls) + + def get_metadata(self): + return self.get_snapshot() + + def __enter__(self): + if isinstance(self.ctx.session_factory, credentials.SessionFactory): + self.ctx.session_factory.set_subscribers((self,)) + self.push_snapshot() + + def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): + if isinstance(self.ctx.session_factory, credentials.SessionFactory): + self.ctx.session_factory.set_subscribers(()) + self.ctx.metrics.put_metric( + "ApiCalls", sum(self.api_calls.values()), "Count") + self.ctx.policy._write_file( + 'api-stats.json', utils.dumps(dict(self.api_calls))) + self.pop_snapshot() + + def __call__(self, s): + s.events.register( + 'after-call.*.*', self._record, unique_id='c7n-api-stats') + + def _record(self, http_response, parsed, model, **kwargs): + self.api_calls["%s.%s" % ( + model.service_model.endpoint_prefix, + model.name)] += 1 + + +@blob_outputs.register('s3') +class S3Output(DirectoryOutput): + """ + Usage: + + .. 
code-block:: python + + with S3Output(session_factory, 's3://bucket/prefix'): + log.info('xyz') # -> log messages sent to custodian-run.log.gz + + """ + + permissions = ('S3:PutObject',) + + def __init__(self, ctx, config): + self.ctx = ctx + self.config = config + self.output_path = self.get_output_path(self.config['url']) + self.s3_path, self.bucket, self.key_prefix = utils.parse_s3( + self.output_path) + self.root_dir = tempfile.mkdtemp() + self.transfer = None + + def __repr__(self): + return "<%s to bucket:%s prefix:%s>" % ( + self.__class__.__name__, + self.bucket, + self.key_prefix) + + def get_output_path(self, output_url): + if '{' not in output_url: + date_path = datetime.datetime.now().strftime('%Y/%m/%d/%H') + return self.join( + output_url, self.ctx.policy.name, date_path) + return output_url.format(**self.get_output_vars()) + + @staticmethod + def join(*parts): + return "/".join([s.strip('/') for s in parts]) + + def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None): + from boto3.s3.transfer import S3Transfer + if exc_type is not None: + log.exception("Error while executing policy") + log.debug("Uploading policy logs") + self.leave_log() + self.compress() + self.transfer = S3Transfer( + self.ctx.session_factory(assume=False).client('s3')) + self.upload() + shutil.rmtree(self.root_dir) + log.debug("Policy Logs uploaded") + + def upload(self): + for root, dirs, files in os.walk(self.root_dir): + for f in files: + key = "%s%s" % ( + self.key_prefix, + "%s/%s" % ( + root[len(self.root_dir):], f)) + key = key.strip('/') + self.transfer.upload_file( + os.path.join(root, f), self.bucket, key, + extra_args={ + 'ACL': 'bucket-owner-full-control', + 'ServerSideEncryption': 'AES256'}) + + @clouds.register('aws') class AWS(object): diff --git a/c7n/resources/awslambda.py b/c7n/resources/awslambda.py index b2e7100ebb0..cfe1e1d272b 100644 --- a/c7n/resources/awslambda.py +++ b/c7n/resources/awslambda.py @@ -19,6 +19,7 @@ import six from botocore.exceptions import ClientError +from concurrent.futures import as_completed from c7n.actions import ActionRegistry, BaseAction, RemovePolicyBase from c7n.filters import CrossAccountAccessFilter, FilterRegistry, ValueFilter @@ -33,6 +34,8 @@ actions = ActionRegistry('lambda.actions') filters.register('marked-for-op', TagActionFilter) +ErrAccessDenied = "AccessDeniedException" + @resources.register('lambda') class AWSLambda(query.QueryResourceManager): @@ -145,7 +148,7 @@ def _augment(r): client.get_function, FunctionName=r['FunctionArn']) r[self.annotation_key].pop('ResponseMetadata') except ClientError as e: - if e.response['Error']['Code'] == 'AccessDeniedException': + if e.response['Error']['Code'] == ErrAccessDenied: self.log.warning( "Access denied getting lambda:%s", r['FunctionName']) @@ -157,6 +160,42 @@ def _augment(r): return super(ReservedConcurrency, self).process(resources, event) +def get_lambda_policies(client, executor_factory, resources, log): + + def _augment(r): + try: + r['c7n:Policy'] = client.get_policy( + FunctionName=r['FunctionName'])['Policy'] + except client.exceptions.ResourceNotFoundException: + return None + except ClientError as e: + if e.response['Error']['Code'] == 'AccessDeniedException': + log.warning( + "Access denied getting policy lambda:%s", + r['FunctionName']) + return r + + results = [] + futures = {} + + with executor_factory(max_workers=3) as w: + for r in resources: + if 'c7n:Policy' in r: + results.append(r) + continue + futures[w.submit(_augment, r)] = r + + for f in 
as_completed(futures):
+        if f.exception():
+            r = futures[f]
+            log.warning("Error getting policy for:%s err:%s",
+                        r['FunctionName'], f.exception())
+            continue
+        results.append(f.result())
+
+    return filter(None, results)
+
+
 @filters.register('event-source')
 class LambdaEventSource(ValueFilter):
     # this uses iam policy, it should probably use
@@ -167,28 +206,12 @@ class LambdaEventSource(ValueFilter):
     permissions = ('lambda:GetPolicy',)
 
     def process(self, resources, event=None):
-        def _augment(r):
-            if 'c7n:Policy' in r:
-                return
-            client = local_session(
-                self.manager.session_factory).client('lambda')
-            try:
-                r['c7n:Policy'] = client.get_policy(
-                    FunctionName=r['FunctionName'])['Policy']
-                return r
-            except ClientError as e:
-                if e.response['Error']['Code'] == 'AccessDeniedException':
-                    self.log.warning(
                        "Access denied getting policy lambda:%s",
-                        r['FunctionName'])
-                raise
-
+        client = local_session(self.manager.session_factory).client('lambda')
         self.log.debug("fetching policy for %d lambdas" % len(resources))
+        resources = get_lambda_policies(
+            client, self.executor_factory, resources, self.log)
         self.data['key'] = self.annotation_key
-
-        with self.executor_factory(max_workers=3) as w:
-            resources = list(filter(None, w.map(_augment, resources)))
-            return super(LambdaEventSource, self).process(resources, event)
+        return super(LambdaEventSource, self).process(resources, event)
 
     def __call__(self, r):
         if 'c7n:Policy' not in r:
@@ -205,9 +228,6 @@ def __call__(self, r):
         return self.match(r)
 
 
-ErrAccessDenied = "AccessDeniedException"
-
-
 @filters.register('cross-account')
 class LambdaCrossAccountAccessFilter(CrossAccountAccessFilter):
     """Filters lambda functions with cross-account permissions
@@ -236,25 +256,10 @@ class LambdaCrossAccountAccessFilter(CrossAccountAccessFilter):
     policy_attribute = 'c7n:Policy'
 
     def process(self, resources, event=None):
-
-        client = local_session(
-            self.manager.session_factory).client('lambda')
-
-        def _augment(r):
-            try:
-                r['c7n:Policy'] = client.get_policy(
-                    FunctionName=r['FunctionName'])['Policy']
-                return r
-            except ClientError as e:
-                if e.response['Error']['Code'] == ErrAccessDenied:
-                    self.log.warning(
-                        "Access denied getting policy lambda:%s",
-                        r['FunctionName'])
-
+        client = local_session(self.manager.session_factory).client('lambda')
         self.log.debug("fetching policy for %d lambdas" % len(resources))
-        with self.executor_factory(max_workers=3) as w:
-            resources = list(filter(None, w.map(_augment, resources)))
-
+        resources = get_lambda_policies(
+            client, self.executor_factory, resources, self.log)
         return super(LambdaCrossAccountAccessFilter, self).process(
             resources, event)
 
diff --git a/c7n/resources/dynamodb.py b/c7n/resources/dynamodb.py
index 09931b072af..59aaad3d379 100644
--- a/c7n/resources/dynamodb.py
+++ b/c7n/resources/dynamodb.py
@@ -229,8 +229,7 @@ class DeleteTable(BaseAction, StatusFilter):
     schema = type_schema('delete')
     permissions = ("dynamodb:DeleteTable",)
 
-    def delete_table(self, table_set):
-        client = local_session(self.manager.session_factory).client('dynamodb')
+    def delete_table(self, client, table_set):
         for t in table_set:
             client.delete_table(TableName=t['TableName'])
 
@@ -241,10 +240,11 @@ def process(self, resources):
             return
 
         futures = []
+        client = local_session(self.manager.session_factory).client('dynamodb')
 
         with self.executor_factory(max_workers=2) as w:
             for table_set in chunks(resources, 20):
-                futures.append(w.submit(self.delete_table, table_set))
+                futures.append(w.submit(self.delete_table, client, table_set))
 
             for f 
in as_completed(futures): if f.exception(): self.log.error( @@ -504,20 +504,15 @@ def process_tags(r): try: tags = retry( client.list_tags, ResourceName=r['ClusterArn'])['Tags'] - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidARNFault', - 'InvalidClusterStateFault'): - log.warning('Exception collecting tags for %s: \n%s' % ( - r['ClusterName'], e)) - else: - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidARNFault, + client.exceptions.InvalidClusterStateFault): + return None r['Tags'] = tags return r with executor_factory(max_workers=2) as w: - return list(w.map(process_tags, tables)) + return filter(None, list(w.map(process_tags, tables))) @DynamoDbAccelerator.filter_registry.register('security-group') @@ -551,15 +546,10 @@ def process_resource_set(self, resources, tags): for r in resources: try: client.tag_resource(ResourceName=r[self.id_key], Tags=tags) - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidClusterStateFault', - 'InvalidARNFault'): - self.log.warning('Exception tagging %s: \n%s' % ( - r['ClusterName'], e)) - continue - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidARNFault, + client.exceptions.InvalidClusterStateFault) as e: + self.log.warning('Exception tagging %s: \n%s', r['ClusterName'], e) @DynamoDbAccelerator.action_registry.register('remove-tag') @@ -587,16 +577,11 @@ def process_resource_set(self, resources, tag_keys): try: client.untag_resource( ResourceName=r['ClusterArn'], TagKeys=tag_keys) - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidARNFault', - 'InvalidClusterStateFault', - 'TagNotFoundFault'): - self.log.warning('Exception removing tags on %s: \n%s' % ( - r['ClusterName'], e)) - continue - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidARNFault, + client.exceptions.TagNotFoundFault, + client.exceptions.InvalidClusterStateFault) as e: + self.log.warning('Exception removing tags on %s: \n%s', r['ClusterName'], e) @DynamoDbAccelerator.action_registry.register('mark-for-op') @@ -627,15 +612,10 @@ def process_resource_set(self, resources, tags): for r in resources: try: client.tag_resource(ResourceName=r[self.id_key], Tags=tags) - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidARNFault', - 'InvalidClusterStateFault'): - self.log.warning( - 'Exception marking %s: \n%s' % (r['ClusterName'], e)) - continue - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidARNFault, + client.exceptions.InvalidClusterStateFault) as e: + self.log.warning('Exception marking %s: \n%s', r['ClusterName'], e) @DynamoDbAccelerator.action_registry.register('delete') @@ -662,14 +642,10 @@ def process(self, resources): for r in resources: try: client.delete_cluster(ClusterName=r['ClusterName']) - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidClusterStateFault'): - self.log.warning('Exception marking %s: \n%s' % ( - r['ClusterName'], e)) - continue - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidARNFault, + client.exceptions.InvalidClusterStateFault) as e: + self.log.warning('Exception marking %s: \n%s', r['ClusterName'], e) @DynamoDbAccelerator.action_registry.register('update-cluster') @@ -711,15 +687,11 @@ def process(self, resources): params['ClusterName'] = 
r['ClusterName'] try: client.update_cluster(**params) - except ClientError as e: - if e.response['Error']['Code'] in ( - 'ClusterNotFoundFault', - 'InvalidClusterStateFault'): - self.log.warning( - 'Exception updating dax cluster %s: \n%s' % ( - r['ClusterName'], e)) - continue - raise + except (client.exceptions.ClusterNotFoundFault, + client.exceptions.InvalidClusterStateFault) as e: + self.log.warning( + 'Exception updating dax cluster %s: \n%s', + r['ClusterName'], e) @DynamoDbAccelerator.action_registry.register('modify-security-groups') diff --git a/c7n/resources/s3.py b/c7n/resources/s3.py index d7805784881..37ff40eb55f 100644 --- a/c7n/resources/s3.py +++ b/c7n/resources/s3.py @@ -100,10 +100,6 @@ class resource_type(object): filter_registry = filters action_registry = actions - def __init__(self, ctx, data): - super(S3, self).__init__(ctx, data) - self.log_dir = ctx.log_dir - def get_source(self, source_type): if source_type == 'describe': return DescribeS3(self) @@ -1601,10 +1597,10 @@ def _process_with_futures(self, helper, buckets, max_workers=3): return results def write_denied_buckets_file(self): - if self.denied_buckets and self.manager.log_dir: + if self.denied_buckets and self.manager.ctx.log_dir: with open( os.path.join( - self.manager.log_dir, 'denied.json'), 'w') as fh: + self.manager.ctx.log_dir, 'denied.json'), 'w') as fh: json.dump(list(self.denied_buckets), fh, indent=2) self.denied_buckets = set() @@ -1622,7 +1618,7 @@ def process_bucket(self, b): p = s3.get_paginator( self.get_bucket_op(b, 'iterator')).paginate(Bucket=b['Name']) - with BucketScanLog(self.manager.log_dir, b['Name']) as key_log: + with BucketScanLog(self.manager.ctx.log_dir, b['Name']) as key_log: with self.executor_factory(max_workers=10) as w: try: return self._process_bucket(b, p, key_log, w) diff --git a/c7n/tags.py b/c7n/tags.py index 04d7ad17c6c..a6d688467b6 100644 --- a/c7n/tags.py +++ b/c7n/tags.py @@ -24,7 +24,7 @@ from concurrent.futures import as_completed from datetime import datetime, timedelta -from dateutil import zoneinfo +from dateutil import tz as tzutil from dateutil.parser import parse import itertools @@ -274,7 +274,7 @@ def validate(self): raise PolicyValidationError( "Invalid marked-for-op op:%s in %s" % (op, self.manager.data)) - tz = zoneinfo.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) + tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) if not tz: raise PolicyValidationError( "Invalid timezone specified '%s' in %s" % ( @@ -286,7 +286,7 @@ def __call__(self, i): op = self.data.get('op', 'stop') skew = self.data.get('skew', 0) skew_hours = self.data.get('skew_hours', 0) - tz = zoneinfo.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) + tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) v = None for n in i.get('Tags', ()): @@ -604,7 +604,7 @@ def validate(self): "mark-for-op specifies invalid op:%s in %s" % ( op, self.manager.data)) - self.tz = zoneinfo.gettz( + self.tz = tzutil.gettz( Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) if not self.tz: raise PolicyValidationError( @@ -626,7 +626,7 @@ def generate_timestamp(self, days, hours): return action_date_string def process(self, resources): - self.tz = zoneinfo.gettz( + self.tz = tzutil.gettz( Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) self.id_key = self.manager.get_model().id @@ -875,7 +875,7 @@ class UniversalTagDelayedAction(TagDelayedAction): permissions = ('resourcegroupstaggingapi:TagResources',) def process(self, resources): - self.tz = zoneinfo.gettz( + self.tz 
= tzutil.gettz( Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) self.id_key = self.manager.get_model().id diff --git a/c7n/testing.py b/c7n/testing.py index 33a965483f9..718d62b919f 100644 --- a/c7n/testing.py +++ b/c7n/testing.py @@ -14,6 +14,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals import json +import datetime import io import logging import os @@ -21,6 +22,8 @@ import tempfile import unittest + +import mock import six import yaml @@ -83,8 +86,8 @@ def get_context(self, config=None, session_factory=None, policy=None): self.context_output_dir = self.get_temp_dir() config = Config.empty(output_dir=self.context_output_dir) ctx = ExecutionContext( - session_factory, policy or Bag({"name": "test-policy"}), config - ) + session_factory, policy or Bag({ + "name": "test-policy", "provider_name": "aws"}), config) return ctx def load_policy( @@ -197,3 +200,38 @@ def write(self, b): if not isinstance(b, six.text_type): b = b.decode("utf8") return super(TextTestIO, self).write(b) + + +# Per http://blog.xelnor.net/python-mocking-datetime/ +# naive implementation has issues with pypy + +real_datetime_class = datetime.datetime + + +def mock_datetime_now(tgt, dt): + + class DatetimeSubclassMeta(type): + + @classmethod + def __instancecheck__(mcs, obj): + return isinstance(obj, real_datetime_class) + + class BaseMockedDatetime(real_datetime_class): + target = tgt + + @classmethod + def now(cls, tz=None): + return cls.target.replace(tzinfo=tz) + + @classmethod + def utcnow(cls): + return cls.target + + # Python2 & Python3 compatible metaclass + + MockedDatetime = DatetimeSubclassMeta( + b"datetime" if str is bytes else "datetime", # hack Python2/3 port + (BaseMockedDatetime,), + {}, + ) + return mock.patch.object(dt, "datetime", MockedDatetime) diff --git a/requirements-dev.txt b/requirements-dev.txt index 8b9ac1e8a9a..d0ef135d9df 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,6 +15,8 @@ pytest>=3.2.1 pytest-xdist>=1.2.0 pytest-cov tox +psutil +aws_xray_sdk # Local package required for Sphinx docs -e tools/c7n_sphinxext diff --git a/tests/data/placebo/output-aws-metrics/monitoring.PutMetricData_1.json b/tests/data/placebo/output-aws-metrics/monitoring.PutMetricData_1.json new file mode 100644 index 00000000000..b194bfa7232 --- /dev/null +++ b/tests/data/placebo/output-aws-metrics/monitoring.PutMetricData_1.json @@ -0,0 +1,16 @@ +{ + "status_code": 200, + "data": { + "ResponseMetadata": { + "RequestId": "a35e5750-c6b1-11e8-ab29-b7f794e29cd1", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amzn-requestid": "a35e5750-c6b1-11e8-ab29-b7f794e29cd1", + "content-type": "text/xml", + "content-length": "212", + "date": "Wed, 03 Oct 2018 02:11:28 GMT" + }, + "RetryAttempts": 0 + } + } +} \ No newline at end of file diff --git a/tests/data/placebo/output-xray-trace/xray.PutTraceSegments_1.json b/tests/data/placebo/output-xray-trace/xray.PutTraceSegments_1.json new file mode 100644 index 00000000000..f4e876052d8 --- /dev/null +++ b/tests/data/placebo/output-xray-trace/xray.PutTraceSegments_1.json @@ -0,0 +1,18 @@ +{ + "status_code": 200, + "data": { + "ResponseMetadata": { + "RequestId": "e5fd9656-c6b2-11e8-a512-43815a788243", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Wed, 03 Oct 2018 02:20:30 GMT", + "content-type": "application/json", + "content-length": "31", + "connection": "keep-alive", + "x-amzn-requestid": "e5fd9656-c6b2-11e8-a512-43815a788243" + }, + "RetryAttempts": 0 + }, + "UnprocessedTraceSegments": [] + } +} \ No newline 
at end of file diff --git a/tests/data/placebo/test_dax_subnet_group_filter/dax.ListTags_1.json b/tests/data/placebo/test_dax_subnet_group_filter/dax.ListTags_1.json index 7445889c46b..b8f67578600 100644 --- a/tests/data/placebo/test_dax_subnet_group_filter/dax.ListTags_1.json +++ b/tests/data/placebo/test_dax_subnet_group_filter/dax.ListTags_1.json @@ -1,20 +1,17 @@ { - "status_code": 400, + "status_code": 200, "data": { "ResponseMetadata": { "RetryAttempts": 0, - "HTTPStatusCode": 400, - "RequestId": "d929ba21-5dbe-11e8-999e-454883316b27", + "HTTPStatusCode": 200, + "RequestId": "7bdf0b6b-5782-11e8-8d38-2b6f8936bddb", "HTTPHeaders": { - "x-amzn-requestid": "d929ba21-5dbe-11e8-999e-454883316b27", - "date": "Tue, 22 May 2018 12:51:29 GMT", - "content-length": "93", + "x-amzn-requestid": "7bdf0b6b-5782-11e8-8d38-2b6f8936bddb", + "date": "Mon, 14 May 2018 14:24:16 GMT", + "content-length": "11", "content-type": "application/x-amz-json-1.1" } }, - "Error": { - "Message": "c7n-test is either not present or not available", - "Code": "ClusterNotFoundFault" - } + "Tags": [] } -} \ No newline at end of file +} diff --git a/tests/data/placebo/test_dax_update_security_groups/dax.ListTags_1.json b/tests/data/placebo/test_dax_update_security_groups/dax.ListTags_1.json index 8a8ae6f6f70..b8f67578600 100644 --- a/tests/data/placebo/test_dax_update_security_groups/dax.ListTags_1.json +++ b/tests/data/placebo/test_dax_update_security_groups/dax.ListTags_1.json @@ -14,4 +14,4 @@ }, "Tags": [] } -} \ No newline at end of file +} diff --git a/tests/test_asg.py b/tests/test_asg.py index 92b5df9657e..08eb839b895 100644 --- a/tests/test_asg.py +++ b/tests/test_asg.py @@ -127,7 +127,7 @@ def test_get_bad_snapshot_malformed(self): } e = ClientError(error_response, operation_name) snap = NotEncryptedFilter.get_bad_snapshot(e) - self.assertEquals(snap, "snap-malformedsnap") + self.assertEqual(snap, "snap-malformedsnap") def test_get_bad_snapshot_notfound(self): operation_name = "DescribeSnapshots" @@ -139,7 +139,7 @@ def test_get_bad_snapshot_notfound(self): } e = ClientError(error_response, operation_name) snap = NotEncryptedFilter.get_bad_snapshot(e) - self.assertEquals(snap, "snap-notfound") + self.assertEqual(snap, "snap-notfound") def test_asg_image_age_filter(self): factory = self.replay_flight_data("test_asg_image_age_filter") diff --git a/tests/test_aws.py b/tests/test_aws.py new file mode 100644 index 00000000000..a4eb0051a98 --- /dev/null +++ b/tests/test_aws.py @@ -0,0 +1,70 @@ + +import json + +from mock import Mock + +from c7n.config import Bag +from c7n.resources import aws +from c7n import output + +from .common import BaseTest + + +class TraceDoc(Bag): + + def serialize(self): + return json.dumps(dict(self)) + + +class OutputXrayTracerTest(BaseTest): + + def test_emitter(self): + emitter = aws.XrayEmitter() + emitter.client = m = Mock() + doc = TraceDoc({'good': 'morning'}) + emitter.send_entity(doc) + emitter.flush() + m.put_trace_segments.assert_called_with( + TraceSegmentDocuments=[doc.serialize()]) + + +class UtilTest(BaseTest): + + def test_default_account_id_assume(self): + config = Bag(assume_role='arn:aws:iam::644160558196:role/custodian-mu') + aws._default_account_id(config) + self.assertEqual(config.account_id, '644160558196') + + +class TracerTest(BaseTest): + + def test_tracer(self): + session_factory = self.replay_flight_data('output-xray-trace') + policy = Bag(name='test', resource_type='ec2') + ctx = Bag( + policy=policy, + session_factory=session_factory, + 
options=Bag(account_id='644160558196')) + ctx.get_metadata = lambda *args: {} + config = Bag() + tracer = aws.XrayTracer(ctx, config) + + with tracer: + try: + with tracer.subsegment('testing') as w: + raise ValueError() + except ValueError: + pass + self.assertNotEqual(w.cause, {}) + + +class OutputMetricsTest(BaseTest): + + def test_metrics(self): + session_factory = self.replay_flight_data('output-aws-metrics') + policy = Bag(name='test', resource_type='ec2') + ctx = Bag(session_factory=session_factory, policy=policy) + sink = output.metrics_outputs.select('aws', ctx) + self.assertTrue(isinstance(sink, aws.MetricsOutput)) + sink.put_metric('ResourceCount', 101, 'Count') + sink.flush() diff --git a/tests/test_cache.py b/tests/test_cache.py index 9c925c31885..74247cd04fb 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -14,7 +14,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals from unittest import TestCase -from c7n import cache +from c7n import cache, config from argparse import Namespace from six.moves import cPickle as pickle import tempfile @@ -31,6 +31,25 @@ def test_factory(self): self.assertIsInstance(cache.factory(test_config), cache.NullCache) +class MemCacheTest(TestCase): + + def test_mem_factory(self): + self.assertEqual( + cache.factory(config.Bag(cache='memory', cache_period=5)).__class__, + cache.InMemoryCache) + + def test_get_set(self): + mem_cache = cache.InMemoryCache() + mem_cache.save({'region': 'us-east-1'}, {'hello': 'world'}) + self.assertEqual(mem_cache.size(), 1) + self.assertEqual(mem_cache.load(), True) + + mem_cache = cache.InMemoryCache() + self.assertEqual( + mem_cache.get({'region': 'us-east-1'}), + {'hello': 'world'}) + + class FileCacheManagerTest(TestCase): def setUp(self): @@ -68,8 +87,8 @@ def test_get(self): self.test_cache.data = pickle.loads(test_pickle) # assert - self.assertEquals(self.test_cache.get(self.test_key), self.test_value) - self.assertEquals(self.test_cache.get(self.bad_key), None) + self.assertEqual(self.test_cache.get(self.test_key), self.test_value) + self.assertEqual(self.test_cache.get(self.bad_key), None) def test_load(self): t = tempfile.NamedTemporaryFile(suffix=".cache") @@ -98,9 +117,9 @@ def test_save_exists(self, mock_dumps, mock_dump, mock_exists, mock_mkdir): self.assertTrue(mock_dump.called) # mkdir should NOT be called, but pickles should - self.assertEquals(mock_mkdir.call_count, 0) - self.assertEquals(mock_dump.call_count, 1) - self.assertEquals(mock_dumps.call_count, 1) + self.assertEqual(mock_mkdir.call_count, 0) + self.assertEqual(mock_dump.call_count, 1) + self.assertEqual(mock_dumps.call_count, 1) @mock.patch.object(cache.os, "makedirs") @mock.patch.object(cache.os.path, "exists") @@ -125,6 +144,6 @@ def test_save_doesnt_exists(self, mock_dumps, mock_dump, mock_exists, mock_mkdir self.assertTrue(mock_dump.called) # all 3 should be called once - self.assertEquals(mock_mkdir.call_count, 1) - self.assertEquals(mock_dump.call_count, 1) - self.assertEquals(mock_dumps.call_count, 1) + self.assertEqual(mock_mkdir.call_count, 1) + self.assertEqual(mock_dump.call_count, 1) + self.assertEqual(mock_dumps.call_count, 1) diff --git a/tests/test_manager.py b/tests/test_manager.py index 24e01624356..7882bb3b620 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -23,10 +23,32 @@ class TestEC2Manager(BaseTest): def get_manager(self, data, config=None, session_factory=None): ctx = ExecutionContext( - session_factory, Bag({"name": "test-policy"}), config or 
Config.empty() + session_factory, Bag( + {"name": "test-policy", 'provider_name': 'aws'}), config or Config.empty() ) return EC2(ctx, data) + def test_get_resource_manager(self): + p = self.load_policy( + {'resource': 'ec2', + 'name': 'instances'}) + self.assertEqual(p.resource_manager.get_resource_manager( + 'aws.lambda').type, 'lambda') + self.assertEqual(p.resource_manager.source_type, 'describe') + self.assertRaises( + ValueError, + p.resource_manager.get_resource_manager, + 'gcp.lambda') + + def test_source_propagate(self): + p = self.load_policy( + {'resource': 'ec2', + 'source': 'config', + 'name': 'instances'}) + manager = p.resource_manager.get_resource_manager( + 'aws.security-group') + self.assertEqual(manager.source_type, 'config') + def test_manager(self): ec2_mgr = self.get_manager( { diff --git a/tests/test_metric_tz.py b/tests/test_metric_tz.py deleted file mode 100644 index 2678158e804..00000000000 --- a/tests/test_metric_tz.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2017 Capital One Services, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from __future__ import absolute_import, division, print_function, unicode_literals - -import unittest - -from .common import BaseTest - -import datetime -from c7n.output import MetricsOutput - - -class metrics_log_test(BaseTest): - - def test_output_tz(self): - - output = MetricsOutput(None) - self.change_environment() - now = datetime.datetime.utcnow() - output_time = output.get_timestamp() - - self.assertEqual(now.hour, output_time.hour) - self.assertEqual(now.minute, output_time.minute) - - self.change_environment(C7N_METRICS_TZ="False") - now_utc = datetime.datetime.now() - - output_time = output.get_timestamp() - - self.assertEqual(now_utc.hour, output_time.hour) - self.assertEqual(now_utc.minute, output_time.minute) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_offhours.py b/tests/test_offhours.py index a743c7fe960..7ca25ba2060 100644 --- a/tests/test_offhours.py +++ b/tests/test_offhours.py @@ -17,49 +17,13 @@ import json import os -from dateutil import zoneinfo - -from mock import mock +from dateutil import tz as tzutil from .common import BaseTest, instance from c7n.exceptions import PolicyValidationError from c7n.filters.offhours import OffHour, OnHour, ScheduleParser, Time - - -# Per http://blog.xelnor.net/python-mocking-datetime/ -# naive implementation has issues with pypy - -real_datetime_class = datetime.datetime - - -def mock_datetime_now(tgt, dt): - - class DatetimeSubclassMeta(type): - - @classmethod - def __instancecheck__(mcs, obj): - return isinstance(obj, real_datetime_class) - - class BaseMockedDatetime(real_datetime_class): - target = tgt - - @classmethod - def now(cls, tz=None): - return cls.target.replace(tzinfo=tz) - - @classmethod - def utcnow(cls): - return cls.target - - # Python2 & Python3 compatible metaclass - - MockedDatetime = DatetimeSubclassMeta( - b"datetime" if str is bytes else "datetime", # hack Python2/3 port - (BaseMockedDatetime,), - {}, - ) - return 
mock.patch.object(dt, "datetime", MockedDatetime) +from c7n.testing import mock_datetime_now class OffHoursFilterTest(BaseTest): @@ -67,7 +31,7 @@ class OffHoursFilterTest(BaseTest): def test_offhours_records(self): session_factory = self.replay_flight_data("test_offhours_records") - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=8, day=14, hour=19, minute=00) with mock_datetime_now(t, datetime): @@ -146,7 +110,7 @@ def test_process(self): day=1, hour=19, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) with mock_datetime_now(t, datetime): self.assertEqual( @@ -162,7 +126,7 @@ def test_opt_out_behavior(self): day=1, hour=19, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) f = OffHour({"opt-out": True}) @@ -191,7 +155,7 @@ def test_opt_in_behavior(self): day=1, hour=19, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) f = OffHour({}) @@ -206,7 +170,7 @@ def test_opt_in_behavior(self): day=1, hour=7, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) f = OnHour({}) @@ -223,7 +187,7 @@ def xtest_time_match_stops_after_skew(self): day=1, hour=hour, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) f = OnHour({"skew": 1}) @@ -236,7 +200,7 @@ def xtest_time_match_stops_after_skew(self): self.assertEqual(results, [True, True, False, False]) def test_resource_schedule_error(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2015, month=12, day=1, hour=19, minute=5) f = OffHour({}) f.process_resource_schedule = lambda: False @@ -326,7 +290,7 @@ def test_offhour_weekend_support(self): self.assertEqual(results, [True, False, False, True]) def test_current_time_test(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2015, month=12, day=1, hour=19, minute=5) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) @@ -335,13 +299,13 @@ def test_current_time_test(self): self.assertEqual(p, "tz=est") tz = f.get_tz("est") self.assertTrue( - str(tz) in ("tzfile('US/Eastern')", "tzfile('America/New_York')") - ) + 'America/New_York' in str(tz) or + 'US/Eastern' in str(tz)) self.assertEqual(datetime.datetime.now(tz), t) self.assertEqual(t.hour, 19) def test_offhours_real_world_values(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2015, month=12, day=1, hour=19, minute=5) with mock_datetime_now(t, datetime): results = [ @@ -375,7 +339,7 @@ def test_offhours(self): day=1, hour=19, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) @@ -388,7 +352,7 @@ def test_onhour(self): day=1, hour=7, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) @@ -400,7 
+364,7 @@ def test_cant_parse_tz(self): self.assertEqual(OffHour({})(i), False) def test_custom_offhours(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=19, minute=00) results = [] @@ -427,7 +391,7 @@ def test_custom_offhours(self): self.assertEqual(results, [True, False]) def test_custom_onhours(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) results = [] @@ -454,7 +418,7 @@ def test_custom_onhours(self): self.assertEqual(results, [True, False]) def test_arizona_tz(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) with mock_datetime_now(t, datetime): i = instance( @@ -472,7 +436,7 @@ def test_arizona_tz(self): self.assertEqual(OnHour({})(i), False) def test_custom_bad_tz(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) with mock_datetime_now(t, datetime): i = instance( @@ -490,7 +454,7 @@ def test_custom_bad_tz(self): self.assertEqual(OnHour({})(i), False) def test_custom_bad_hours(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=19, minute=00) # default error handling is to exclude the resource @@ -521,17 +485,17 @@ def test_custom_bad_hours(self): self.assertEqual(OffHour({})(i), False) def test_tz_only(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) self.assertEqual(OnHour({})(i), True) def test_tz_long_form_resolve(self): - pacific = zoneinfo.gettz("America/Los_Angeles") - nzt = zoneinfo.gettz("Pacific/Auckland") - gmt = zoneinfo.gettz("Etc/GMT") - easter_island = zoneinfo.gettz("Chile/EasterIsland") + pacific = tzutil.gettz("America/Los_Angeles") + nzt = tzutil.gettz("Pacific/Auckland") + gmt = tzutil.gettz("Etc/GMT") + easter_island = tzutil.gettz("Chile/EasterIsland") self.assertEqual( OnHour({}).get_tz('america/los_angeles'), pacific) @@ -549,14 +513,14 @@ def test_tz_long_form_resolve(self): easter_island) def test_empty_tag(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": ""}]) self.assertEqual(OnHour({})(i), True) def test_on_tag(self): - t = datetime.datetime.now(zoneinfo.gettz("America/New_York")) + t = datetime.datetime.now(tzutil.gettz("America/New_York")) t = t.replace(year=2016, month=5, day=26, hour=7, minute=00) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "on"}]) @@ -676,7 +640,7 @@ def test_offhours_skip(self): day=1, hour=19, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) with mock_datetime_now(t, datetime): i = 
instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) @@ -694,7 +658,7 @@ def test_onhour_skip(self): day=1, hour=7, minute=5, - tzinfo=zoneinfo.gettz("America/New_York"), + tzinfo=tzutil.gettz("America/New_York"), ) with mock_datetime_now(t, datetime): i = instance(Tags=[{"Key": "maid_offhours", "Value": "tz=est"}]) diff --git a/tests/test_output.py b/tests/test_output.py index f613806244d..2257c73f4dc 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -13,15 +13,19 @@ # limitations under the License. from __future__ import absolute_import, division, print_function, unicode_literals +import datetime import gzip import logging import mock -import unittest import shutil import os +from dateutil.parser import parse as date_parse + from c7n.ctx import ExecutionContext -from c7n.output import S3Output, DirectoryOutput, MetricsOutput +from c7n.output import DirectoryOutput, LogFile, metrics_outputs +from c7n.resources.aws import S3Output, MetricsOutput +from c7n.testing import mock_datetime_now, TestUtils from .common import Bag, BaseTest, TestConfig as Config @@ -29,7 +33,8 @@ class MetricsTest(BaseTest): def test_boolean_config_compatibility(self): - self.assertEqual(MetricsOutput.select(True), MetricsOutput) + self.assertTrue( + isinstance(metrics_outputs.select(True, {}), MetricsOutput)) class DirOutputTest(BaseTest): @@ -37,7 +42,11 @@ class DirOutputTest(BaseTest): def get_dir_output(self, location): work_dir = self.change_cwd() return work_dir, DirectoryOutput( - ExecutionContext(None, Bag(name="xyz"), Config.empty(output_dir=location)) + ExecutionContext( + None, + Bag(name="xyz", provider_name="ostack"), + Config.empty(output_dir=location)), + {'url': location}, ) def test_dir_output(self): @@ -46,7 +55,7 @@ def test_dir_output(self): self.assertTrue(os.path.isdir(os.path.join(work_dir, "myoutput"))) -class S3OutputTest(unittest.TestCase): +class S3OutputTest(TestUtils): def test_path_join(self): @@ -57,27 +66,29 @@ def test_path_join(self): self.assertEqual(S3Output.join("s3://xyz/xyz/", "/bar/"), "s3://xyz/xyz/bar") def get_s3_output(self): + output_dir = "s3://cloud-custodian/policies" output = S3Output( ExecutionContext( None, - Bag(name="xyz"), - Config.empty(output_dir="s3://cloud-custodian/policies"), - ) - ) + Bag(name="xyz", provider_name="ostack"), + Config.empty(output_dir=output_dir)), + {'url': output_dir}) + self.addCleanup(shutil.rmtree, output.root_dir) return output def test_s3_output(self): output = self.get_s3_output() - self.assertEquals(output.type, "s3") + self.assertEqual(output.type, "s3") # Make sure __repr__ is defined name = str(output) self.assertIn("bucket:cloud-custodian", name) def test_join_leave_log(self): - output = self.get_s3_output() + temp_dir = self.get_temp_dir() + output = LogFile(Bag(log_dir=temp_dir), {}) output.join_log() l = logging.getLogger("custodian.s3") # NOQA @@ -93,7 +104,7 @@ def test_join_leave_log(self): # Reset logging.manager back to nose configured value l.manager.disable = v - with open(os.path.join(output.root_dir, "custodian-run.log")) as fh: + with open(os.path.join(temp_dir, "custodian-run.log")) as fh: content = fh.read().strip() self.assertTrue(content.endswith("hello world")) @@ -116,8 +127,10 @@ def test_compress(self): self.assertEqual(fh.read(), b"abc") def test_upload(self): - output = self.get_s3_output() - self.assertEqual(output.key_prefix, "/policies/xyz") + + with mock_datetime_now(date_parse('2018/09/01 13:00'), datetime): + output = self.get_s3_output() + self.assertEqual(output.key_prefix, 
"/policies/xyz/2018/09/01/13") with open(os.path.join(output.root_dir, "foo.txt"), "w") as fh: fh.write("abc") @@ -130,7 +143,7 @@ def test_upload(self): m.assert_called_with( fh.name, "cloud-custodian", - "policies/xyz/%s/foo.txt" % output.date_path, + "%s/foo.txt" % output.key_prefix.lstrip('/'), extra_args={"ACL": "bucket-owner-full-control", "ServerSideEncryption": "AES256"}, ) @@ -148,6 +161,6 @@ def test_sans_prefix(self): m.assert_called_with( fh.name, "cloud-custodian", - "policies/xyz/%s/foo.txt" % output.date_path, + "%s/foo.txt" % output.key_prefix.lstrip('/'), extra_args={"ACL": "bucket-owner-full-control", "ServerSideEncryption": "AES256"}, ) diff --git a/tests/test_policy.py b/tests/test_policy.py index 81f7559e5ea..1244d27b160 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -137,7 +137,7 @@ def test_resource_shadow_source_augment(self): if not getattr(v.resource_type, "config_type", None): continue - p = Bag({"name": "permcheck", "resource": k}) + p = Bag({"name": "permcheck", "resource": k, 'provider_name': 'aws'}) ctx = self.get_context(config=cfg, policy=p) mgr = v(ctx, p) @@ -171,8 +171,7 @@ def test_resource_permissions(self): cfg = Config.empty() for k, v in manager.resources.items(): - p = Bag({"name": "permcheck", "resource": k}) - + p = Bag({"name": "permcheck", "resource": k, 'provider_name': 'aws'}) ctx = self.get_context(config=cfg, policy=p) mgr = v(ctx, p) @@ -522,6 +521,7 @@ def test_policy_resource_limits_count(self): self.load_policy, policy, config=config, + validate=True, session_factory=session_factory ) @@ -754,7 +754,7 @@ def test_is_runnable_mismatch_region(self): config={'region': 'us-west-2', 'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), False) + self.assertEqual(pull_mode.is_runnable(), False) def test_is_runnable_dates(self): p = self.load_policy( @@ -765,7 +765,7 @@ def test_is_runnable_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), True) + self.assertEqual(pull_mode.is_runnable(), True) tomorrow_date = str(datetime.date(datetime.utcnow()) + timedelta(days=1)) p = self.load_policy( @@ -776,7 +776,7 @@ def test_is_runnable_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), False) + self.assertEqual(pull_mode.is_runnable(), False) p = self.load_policy( {'name': 'good-end-date', @@ -786,7 +786,7 @@ def test_is_runnable_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), True) + self.assertEqual(pull_mode.is_runnable(), True) p = self.load_policy( {'name': 'bad-end-date', @@ -796,7 +796,7 @@ def test_is_runnable_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), False) + self.assertEqual(pull_mode.is_runnable(), False) p = self.load_policy( {'name': 'bad-start-end-date', @@ -807,7 +807,7 @@ def test_is_runnable_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), False) + self.assertEqual(pull_mode.is_runnable(), False) def test_is_runnable_parse_dates(self): p = self.load_policy( @@ -818,7 +818,7 @@ def test_is_runnable_parse_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - 
self.assertEquals(pull_mode.is_runnable(), True) + self.assertEqual(pull_mode.is_runnable(), True) p = self.load_policy( {'name': 'parse-date-policy', @@ -828,7 +828,7 @@ def test_is_runnable_parse_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), True) + self.assertEqual(pull_mode.is_runnable(), True) p = self.load_policy( {'name': 'parse-date-policy', @@ -838,7 +838,7 @@ def test_is_runnable_parse_dates(self): config={'validate': True}, session_factory=None) pull_mode = policy.PullMode(p) - self.assertEquals(pull_mode.is_runnable(), True) + self.assertEqual(pull_mode.is_runnable(), True) class GuardModeTest(BaseTest): diff --git a/tests/test_rds.py b/tests/test_rds.py index 129919f1739..0af55d74cca 100644 --- a/tests/test_rds.py +++ b/tests/test_rds.py @@ -707,7 +707,7 @@ def test_rds_db_subnetgroup_unused(self): resources = policy.run() - self.assertEquals(len(resources), 1, "Resources should be unused") + self.assertEqual(len(resources), 1, "Resources should be unused") class RDSSnapshotTest(BaseTest): diff --git a/tests/test_redshift.py b/tests/test_redshift.py index fae307a30cb..4c62718fb18 100644 --- a/tests/test_redshift.py +++ b/tests/test_redshift.py @@ -241,10 +241,10 @@ def test_redshift_vpc_routing(self): ClusterIdentifier=resources[0]["ClusterIdentifier"] ) cluster = response["Clusters"][0] - self.assertEquals( + self.assertEqual( cluster["ClusterIdentifier"], resources[0]["ClusterIdentifier"] ) - self.assertEquals(cluster["ClusterStatus"], "modifying") + self.assertEqual(cluster["ClusterStatus"], "modifying") self.assertTrue(cluster["PendingModifiedValues"]["EnhancedVpcRouting"]) def test_redshift_public_access(self): diff --git a/tests/test_registry.py b/tests/test_registry.py new file mode 100644 index 00000000000..131374724d8 --- /dev/null +++ b/tests/test_registry.py @@ -0,0 +1,62 @@ +# Copyright 2018 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import absolute_import, division, print_function, unicode_literals
+
+import unittest
+
+from c7n.registry import PluginRegistry
+
+
+class RegistryTest(unittest.TestCase):
+
+    def test_unregister(self):
+
+        registry = PluginRegistry('dummy')
+        registry.register('dust', klass=lambda: 1)
+        self.assertEqual(list(registry.keys()), ['dust'])
+        registry.unregister('dust')
+
+    def test_event_subscriber(self):
+
+        observed = []
+
+        def observer(*args):
+            observed.append(args)
+
+        registry = PluginRegistry('dummy')
+        registry.subscribe(PluginRegistry.EVENT_REGISTER, observer)
+
+        @registry.register('water')
+        class _plugin_impl:
+            pass
+
+        self.assertEqual(observed[0], (registry, _plugin_impl))
+        self.assertEqual(list(registry.keys()), ['water'])
+        self.assertRaises(ValueError, registry.subscribe, 'foo', observer)
+
+    def test_condition(self):
+
+        registry = PluginRegistry('dummy')
+
+        @registry.register('mud', condition=False)
+        class _plugin_impl:
+            pass
+
+        self.assertEqual(list(registry.keys()), [])
+
+        def _plugin_impl_func():
+            pass
+
+        registry.register('concrete', _plugin_impl_func, condition=False)
+        self.assertEqual(list(registry.keys()), [])
diff --git a/tests/test_s3.py b/tests/test_s3.py
index 91d79d15e11..644f36802e8 100644
--- a/tests/test_s3.py
+++ b/tests/test_s3.py
@@ -612,6 +612,7 @@ def test_delete_bucket_with_failure(self):
         self.patch(s3.S3, "executor_factory", MainThreadExecutor)
         self.patch(s3.DeleteBucket, "executor_factory", MainThreadExecutor)
         self.patch(s3, "S3_AUGMENT_TABLE", [])
+
         session_factory = self.replay_flight_data("test_s3_delete_bucket_with_failure")
         session = session_factory()
         client = session.client("s3")
@@ -650,7 +651,7 @@ def test_delete_bucket_with_failure(self):
         self.assertIn(bname, buckets)

         # Make sure file got written
-        denied_file = os.path.join(p.resource_manager.log_dir, "denied.json")
+        denied_file = os.path.join(p.ctx.log_dir, "denied.json")
         self.assertIn(bname, open(denied_file).read())

         # # Now delete it for real
@@ -2541,7 +2542,7 @@ def test_encrypt_keys_aes256_sufficient(self):
         result = client.head_object(Bucket=bname, Key="testing-abc")
         self.assertTrue(result["ServerSideEncryption"] == "aws:kms")
         data = json.load(
-            open(os.path.join(p.ctx.output_path, "action-encryptextantkeys"))
+            open(os.path.join(p.ctx.output.root_dir, "action-encryptextantkeys"))
         )
         self.assertEqual([{"Count": 2, "Remediated": 0, "Bucket": bname}], data)
diff --git a/tools/c7n_azure/c7n_azure/output.py b/tools/c7n_azure/c7n_azure/output.py
index 7e0fc94c36b..8c9a14bb428 100644
--- a/tools/c7n_azure/c7n_azure/output.py
+++ b/tools/c7n_azure/c7n_azure/output.py
@@ -25,11 +25,11 @@
 from c7n_azure.storage_utils import StorageUtilities

 from c7n.utils import local_session
-from c7n.output import FSOutput, blob_outputs
+from c7n.output import DirectoryOutput, blob_outputs


 @blob_outputs.register('azure')
-class AzureStorageOutput(FSOutput):
+class AzureStorageOutput(DirectoryOutput):
     """
     Usage:

@@ -40,13 +40,13 @@ class AzureStorageOutput(FSOutput):

     """

-    def __init__(self, ctx):
-        super(AzureStorageOutput, self).__init__(ctx)
+    def __init__(self, ctx, config=None):
+        super(AzureStorageOutput, self).__init__(ctx, config)
         self.log = logging.getLogger('custodian.output')
         self.date_path = datetime.datetime.now().strftime('%Y/%m/%d/%H')
         self.root_dir = tempfile.mkdtemp()
         self.blob_service, self.container, self.file_prefix = \
-            self.get_blob_client_wrapper(self.ctx.output_path, ctx)
+            self.get_blob_client_wrapper(self.config['url'], ctx)

     def __exit__(self, exc_type=None, 
exc_value=None, exc_traceback=None): if exc_type is not None: diff --git a/tools/c7n_azure/c7n_azure/resources/access_control.py b/tools/c7n_azure/c7n_azure/resources/access_control.py index 8bff1c9af84..52ac57c7457 100644 --- a/tools/c7n_azure/c7n_azure/resources/access_control.py +++ b/tools/c7n_azure/c7n_azure/resources/access_control.py @@ -24,8 +24,6 @@ from c7n_azure.utils import GraphHelper from c7n.actions import BaseAction -from c7n.config import Config -from c7n.ctx import ExecutionContext from c7n.filters import Filter from c7n.filters import FilterValidationError from c7n.filters import ValueFilter @@ -181,9 +179,7 @@ def __init__(self, data, manager=None): resource_type.rsplit('.', 1)[-1]) def get_related(self, resources): - ctx = ExecutionContext(local_session(Session), self.data, Config.empty()) - manager = self.factory(ctx, self.data) - related = manager.source.get_resources(None) + related = self.manager.get_resource_manager(self.factory.type).resources() if self.data.get('op'): return [r['id'] for r in related if self.match(r)] else: diff --git a/tools/c7n_azure/tests/test_output.py b/tools/c7n_azure/tests/test_output.py index 9e116345fa2..8bdd8d1ac61 100644 --- a/tools/c7n_azure/tests/test_output.py +++ b/tools/c7n_azure/tests/test_output.py @@ -29,12 +29,14 @@ def setUp(self): super(OutputTest, self).setUp() def get_azure_output(self): + output_dir = "azure://mystorage.blob.core.windows.net/logs" output = AzureStorageOutput( ExecutionContext( None, - Bag(name="xyz"), - Config.empty(output_dir="azure://mystorage.blob.core.windows.net/logs"), - ) + Bag(name="xyz", provider_name='azure'), + Config.empty(output_dir=output_dir) + ), + {'url': output_dir}, ) self.addCleanup(shutil.rmtree, output.root_dir) diff --git a/tools/c7n_gcp/c7n_gcp/output.py b/tools/c7n_gcp/c7n_gcp/output.py index c0fcbee2b8d..8c8b9a4b791 100644 --- a/tools/c7n_gcp/c7n_gcp/output.py +++ b/tools/c7n_gcp/c7n_gcp/output.py @@ -12,15 +12,15 @@ from c7n.output import ( blob_outputs, - FSOutput, metrics_outputs, - MetricsOutput, + DirectoryOutput, + Metrics, LogOutput) from c7n.utils import local_session @metrics_outputs.register('gcp') -class StackDriverMetrics(MetricsOutput): +class StackDriverMetrics(Metrics): METRICS_PREFIX = 'custom.googleapis.com/custodian/policy' @@ -59,9 +59,10 @@ class StackDriverMetrics(MetricsOutput): log = logging.getLogger('c7n_gcp.metrics') - def __init__(self, ctx): - super(StackDriverMetrics, self).__init__(ctx) - self.project_id = local_session(self.ctx.session_factory).get_default_project() + def __init__(self, ctx, config=None): + super(StackDriverMetrics, self).__init__(ctx, config) + self.project_id = local_session( + self.ctx.session_factory).get_default_project() def initialize(self): """One time initialization of metrics descriptors. @@ -93,7 +94,7 @@ def _format_metric(self, key, value, unit, dimensions): # Resource is a Google controlled vocabulary with artificial # limitations on resource type there's not much useful we can # utilize. 
-        now = self.get_timestamp()
+        now = datetime.datetime.utcnow()
         metrics_series = {
             'metric': {
                 'type': 'custom.googleapis.com/custodian/policy/%s' % key.lower(),
@@ -158,10 +159,10 @@ def leave_log(self):


 @blob_outputs.register('gs')
-class GCPStorageOutput(FSOutput):
+class GCPStorageOutput(DirectoryOutput):

-    def __init__(self, ctx):
-        super(GCPStorageOutput, self).__init__(ctx)
+    def __init__(self, ctx, config=None):
+        super(GCPStorageOutput, self).__init__(ctx, config)
         self.date_path = datetime.datetime.now().strftime('%Y/%m/%d/%H')
         self.gs_path, self.bucket, self.key_prefix = parse_gs(
             self.ctx.output_path)
diff --git a/tools/c7n_gcp/tests/test_output_gcp.py b/tools/c7n_gcp/tests/test_output_gcp.py
index 83b5e23fe18..178338c67b1 100644
--- a/tools/c7n_gcp/tests/test_output_gcp.py
+++ b/tools/c7n_gcp/tests/test_output_gcp.py
@@ -16,7 +16,7 @@
 import time

 from c7n.config import Bag
-from c7n.output import MetricsOutput
+from c7n.output import metrics_outputs

 from c7n_gcp.output import StackDriverMetrics
 from gcp_common import BaseTest
@@ -26,7 +26,7 @@ class MetricsOutputTest(BaseTest):

     def test_metrics_selector(self):
         self.assertEqual(
-            MetricsOutput.select('gcp'),
+            metrics_outputs.get('gcp'),
             StackDriverMetrics)

     def test_metrics_output(self):
diff --git a/tools/c7n_org/c7n_org/cli.py b/tools/c7n_org/c7n_org/cli.py
index 6e98b8cc210..cbe39328c89 100644
--- a/tools/c7n_org/c7n_org/cli.py
+++ b/tools/c7n_org/c7n_org/cli.py
@@ -238,10 +238,12 @@ def report_account(account, region, policies_config, output_path, debug):
     policies = PolicyCollection.from_data(policies_config, config)
     records = []
     for p in policies:
+        # initialize policy execution context for output access
+        p.ctx.initialize()
         log.debug(
             "Report policy:%s account:%s region:%s path:%s",
             p.name, account['name'], region, output_path)
-        policy_records = fs_record_set(p.ctx.output_path, p.name)
+        policy_records = fs_record_set(p.ctx.log_dir, p.name)
         for r in policy_records:
             r['policy'] = p.name
             r['region'] = p.options.region
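
The DAX hunks above replace ClientError code matching with botocore's modeled
exceptions, which every boto3 client generates under client.exceptions. A
minimal sketch of the pattern outside Custodian; the region and ARN are
placeholders:

    import boto3

    client = boto3.client('dax', region_name='us-east-1')
    try:
        tags = client.list_tags(
            ResourceName='arn:aws:dax:us-east-1:123456789012:cache/example')['Tags']
    except (client.exceptions.ClusterNotFoundFault,
            client.exceptions.InvalidARNFault,
            client.exceptions.InvalidClusterStateFault):
        # The cluster is gone or transitioning; treat it as having no tags.
        tags = []

Any other ClientError still propagates, which is what the removed
string-matching blocks were approximating with their trailing raise.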
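The mock_datetime_now helper moves from tests/test_offhours.py into
c7n.testing so any suite can freeze time. A short usage sketch of the
promoted helper; the target timestamp is arbitrary:

    import datetime

    from c7n.testing import mock_datetime_now

    target = datetime.datetime(2018, 9, 1, 13, 0)

    with mock_datetime_now(target, datetime):
        # The patched class answers both constructors with the frozen target.
        assert datetime.datetime.utcnow() == target
        assert datetime.datetime.now() == target

The second argument is the module whose datetime attribute gets patched, so
each test freezes its own import while isinstance checks against the real
datetime.datetime keep passing via the metaclass shown in c7n/testing.py.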
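PluginRegistry gains unregister, registration events, and conditional
registration, exercised by the new tests/test_registry.py. A condensed sketch
of the event hook, following those tests:

    from c7n.registry import PluginRegistry

    registry = PluginRegistry('example')
    observed = []

    # Subscribers are invoked with (registry, plugin) on each registration.
    registry.subscribe(
        PluginRegistry.EVENT_REGISTER,
        lambda reg, plugin: observed.append(plugin))

    @registry.register('sample')
    class Sample(object):
        pass

    assert observed == [Sample]
    assert list(registry.keys()) == ['sample']

Per the tests, subscribing to an unknown event name raises ValueError, and
register(..., condition=False) is a no-op.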
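Metrics sinks are now resolved through the metrics_outputs registry rather
than a MetricsOutput.select classmethod. A sketch of selecting and flushing
the AWS sink, modeled on tests/test_aws.py; the inline boto3 session factory
and live CloudWatch credentials are assumptions here, not part of the patch:

    import boto3

    from c7n.config import Bag
    from c7n import output
    from c7n.resources import aws  # noqa: importing registers the 'aws' sink

    ctx = Bag(
        session_factory=lambda: boto3.Session(),
        policy=Bag(name='test', resource_type='ec2'))

    sink = output.metrics_outputs.select('aws', ctx)  # aws.MetricsOutput
    sink.put_metric('ResourceCount', 101, 'Count')
    sink.flush()  # emits a PutMetricData call to CloudWatch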