Skip to content

Commit

Permalink
Merge pull request #2 from statful/fix-performance-problems
Browse files Browse the repository at this point in the history
Fix performance problems
  • Loading branch information
mistic authored Oct 6, 2016
2 parents 8e2e2dc + af087da commit 0fc7e98
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 83 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "1.0.1",
"description": "AWS Cloudwatch collector for Statful",
"engines": {
"node": ">=4.0.0",
"node": ">=4.6.0",
"npm": ">=3.8.0"
},
"scripts": {
Expand Down Expand Up @@ -53,7 +53,7 @@
"jsonschema": "^1.1.0",
"lodash": "^4.15.0",
"source-map-support": "^0.4.0",
"statful-client": "~4.1.0",
"statful-client": "~4.1.2",
"yargs": "^5.0.0"
},
"repository": {
Expand Down
1 change: 1 addition & 0 deletions src/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class Collector {
let now = new Date();
let nowMinusPastPeriod = new Date(new Date(now).setSeconds(now.getSeconds() - (this[_config].statfulCollectorAws.period - 1)));

// TODO - Verify this window time with AWS documentation
now.setMinutes(now.getMinutes() - 5);
nowMinusPastPeriod.setMinutes(nowMinusPastPeriod.getMinutes() - 5);

Expand Down
25 changes: 22 additions & 3 deletions src/metrics-list.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Promise from 'bluebird';
import each from 'async/each';
import eachSeries from 'async/eachSeries';
import eachOf from 'async/eachOf';
import doWhilst from 'async/doWhilst';
import AWS from 'aws-sdk';
Expand All @@ -16,12 +17,16 @@ const _metricsPerRegion = Symbol('metricsPerRegion');
const _parseValidDimensionsFromRequestData = Symbol('parseValidDimensionsFromRequestData');
const _parseValidMetricsWithDimensionsValidatorFromRequestsData = Symbol('parseValidMetricsWithDimensionsValidatorFromRequestsData');
const _processReceivedRequestsData = Symbol('processReceivedRequestsData');
const _requestsPerBatch = Symbol('requestsPerBatch');
const _timeoutPerBatch = Symbol('timeoutPerBatch');

class MetricsList {
constructor(config) {
this[_config] = config;
this[_metricsPerRegion] = null;
this[_dimensionsValidatorPerRegionAndMetric] = null;
this[_timeoutPerBatch] = 300;
this[_requestsPerBatch] = 50;
}

buildMetricsPerRegion() {
Expand All @@ -32,10 +37,21 @@ class MetricsList {

eachOf(whiteListConfig,
(metrics, region, eachOfCallback) => {
each(metrics,
let requestsCount = 0;

eachSeries(metrics,
(metric, eachCallback) => {
requestsPromises.push(this[_cloudWatchListMetrics](region, metric));
eachCallback(null);

if (requestsCount >= this[_requestsPerBatch]) {
setTimeout(()=>{
requestsCount = 0;
eachCallback(null);
}, this[_timeoutPerBatch]);
} else {
requestsCount++;
eachCallback(null);
}
},
() => {
eachOfCallback(null);
Expand Down Expand Up @@ -224,7 +240,9 @@ class MetricsList {
}

[_parseValidMetricsWithDimensionsValidatorFromRequestsData](requestsData, resolve) {
let auxMetricsPerRegion = {};
let auxMetricsPerRegion = {
totalMetrics: 0
};

each(requestsData,
(requestData, eachCallback) => {
Expand All @@ -250,6 +268,7 @@ class MetricsList {
},
() => {
auxMetricsPerRegion[metricRegion] = auxMetricsPerRegion[metricRegion].concat(validMetrics);
auxMetricsPerRegion.totalMetrics += validMetrics.length;
eachCallback(null);
}
);
Expand Down
208 changes: 132 additions & 76 deletions src/request.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import Promise from 'bluebird';
import series from 'async/series';
import each from 'async/each';
import eachSeries from 'async/eachSeries';
import eachOf from 'async/eachOf';
import AWS from 'aws-sdk';

const _buildAndSendDatapoints = Symbol('buildAndSendDatapoints');
const _calculateTimeoutPerBatch = Symbol('calculateTimeoutPerBatch');
const _cloudWatchClientPerRegion = Symbol('cloudWatchClientPerRegion');
const _cloudWatchGetMetricStatistics = Symbol('cloudWatchGetMetricStatistics');
const _config = Symbol('config');
const _endTime = Symbol('endTime');
const _getAWSMetricsData = Symbol('getAWSMetricsData');
const _metricsPerRegion = Symbol('metricsPerRegion');
const _period = Symbol('period');
const _receivedDataPerRegion = Symbol('receivedDataPerRegion');
const _requestsPerBatch = Symbol('requestsPerBatch');
const _sendAWSMetricsData = Symbol('sendAWSMetricsData');
const _startTime = Symbol('startTime');
const _statfulClient = Symbol('statfulClient');
const _statistics = Symbol('statistics');
const _timeoutPerBatch = Symbol('timeoutPerBatch');
const _totalNumberOfRequests = Symbol('totalNumberOfRequests');

class Request {
constructor(config, metricsPerRegion, startTime, endTime, statfulClient) {
Expand All @@ -27,6 +33,23 @@ class Request {
this[_metricsPerRegion] = metricsPerRegion;
this[_receivedDataPerRegion] = {};
this[_statfulClient] = statfulClient;
this[_totalNumberOfRequests] = this[_metricsPerRegion].totalMetrics;
this[_requestsPerBatch] = 50;
this[_timeoutPerBatch] = this[_calculateTimeoutPerBatch]();
// TODO: If timeout per batch was < 100 throw an exception and don't execute the request.
this[_cloudWatchClientPerRegion] = {};

eachOf(this[_metricsPerRegion],
(metrics, region, eachOfRegionsCallback) => {
this[_cloudWatchClientPerRegion][region] = new AWS.CloudWatch({
accessKeyId: this[_config].statfulCollectorAws.credentials.accessKeyId,
secretAccessKey: this[_config].statfulCollectorAws.credentials.secretAccessKey,
region: region
});
eachOfRegionsCallback(null);
},
() => {}
);
}

execute() {
Expand All @@ -49,85 +72,95 @@ class Request {
});
}

[_buildAndSendDatapoints](region, metric, eachMetricCallback) {
let metricName = metric.MetricName;
let metricNamespace = metric.Namespace.replace('/', '.');
let metricAggFreq = metric.Period;
let metricTags = {
region: region
};

metric.Dimensions.forEach((dimension) => {
metricTags[dimension.Name] = dimension.Value;
});
[_buildAndSendDatapoints](region, metric) {
return new Promise( (resolve) => {
setTimeout(()=> {
let metricName = metric.MetricName;
let metricNamespace = metric.Namespace.replace('/', '.');
let metricAggFreq = metric.Period;
let metricTags = {
region: region
};

metric.Dimensions.forEach((dimension) => {
metricTags[dimension.Name] = dimension.Value;
});

each(metric.Datapoints,
(dataPoint, eachDataPointCallback) => {
let metricTimestamp = Math.round(new Date(dataPoint.Timestamp).getTime() / 1000);

metricTags.Unit = dataPoint.Unit;

this[_config].statfulCollectorAws.statistics.forEach((statistic) => {
let metricValue = dataPoint[statistic];
let metricAgg = null;

switch (statistic) {
case 'SampleCount':
metricAgg = 'count';
break;
case 'Average':
metricAgg = 'avg';
break;
case 'Sum':
metricAgg = 'sum';
break;
case 'Minimum':
metricAgg = 'min';
break;
case 'Maximum':
metricAgg = 'max';
break;
each(metric.Datapoints,
(dataPoint, eachDataPointCallback) => {
let metricTimestamp = Math.round(new Date(dataPoint.Timestamp).getTime() / 1000);

metricTags.Unit = dataPoint.Unit;

this[_config].statfulCollectorAws.statistics.forEach((statistic) => {
let metricValue = dataPoint[statistic];
let metricAgg = null;

switch (statistic) {
case 'SampleCount':
metricAgg = 'count';
break;
case 'Average':
metricAgg = 'avg';
break;
case 'Sum':
metricAgg = 'sum';
break;
case 'Minimum':
metricAgg = 'min';
break;
case 'Maximum':
metricAgg = 'max';
break;
}

if (metricAgg) {
this[_statfulClient].aggregatedPut(metricName, metricValue, metricAgg, metricAggFreq, {
namespace: metricNamespace,
tags: metricTags,
timestamp: metricTimestamp
});
}
});
eachDataPointCallback(null);
},
() => {
resolve();
}
);
}, 0);
});
}

if (metricAgg) {
this[_statfulClient].aggregatedPut(metricName, metricValue, metricAgg, metricAggFreq, {namespace:metricNamespace, tags:metricTags, timestamp: metricTimestamp});
}
});
eachDataPointCallback(null);
},
() => {
eachMetricCallback(null);
}
);
[_calculateTimeoutPerBatch]() {
let timeout = (this[_requestsPerBatch] * 0.4 * this[_period] * 1000) / this[_totalNumberOfRequests];
return Math.round(timeout);
}

[_cloudWatchGetMetricStatistics](region, metric) {
return new Promise( (resolve) => {
let cloudWatch = new AWS.CloudWatch({
accessKeyId: this[_config].statfulCollectorAws.credentials.accessKeyId,
secretAccessKey: this[_config].statfulCollectorAws.credentials.secretAccessKey,
region: region
});
let reqParams = {
StartTime: this[_startTime],
EndTime: this[_endTime],
Period: this[_period],
Statistics: this[_statistics],
MetricName: metric.MetricName,
Namespace: metric.Namespace,
Dimensions: metric.Dimensions
};

cloudWatch.getMetricStatistics(reqParams, (err, data) => {
if (data) {
data.Period = this[_period];
data.Statistics = this[_statistics];
data.MetricName = metric.MetricName;
data.Namespace = metric.Namespace;
data.Dimensions = metric.Dimensions;
}
resolve({region:region, data:data});
});
setTimeout(()=>{
let reqParams = {
StartTime: this[_startTime],
EndTime: this[_endTime],
Period: this[_period],
Statistics: this[_statistics],
MetricName: metric.MetricName,
Namespace: metric.Namespace,
Dimensions: metric.Dimensions
};

this[_cloudWatchClientPerRegion][region].getMetricStatistics(reqParams, (err, data) => {
if (data) {
data.Period = this[_period];
data.Statistics = this[_statistics];
data.MetricName = metric.MetricName;
data.Namespace = metric.Namespace;
data.Dimensions = metric.Dimensions;
}
resolve({region:region, data:data});
});
}, 0);
});
}

Expand All @@ -137,10 +170,21 @@ class Request {

eachOf(this[_metricsPerRegion],
(metrics, region, eachOfRegionsCallback) => {
each(metrics,
let requestsCount = 0;

eachSeries(metrics,
(metric, eachMetricCallback) => {
requestsPromises.push(this[_cloudWatchGetMetricStatistics](region, metric));
eachMetricCallback(null);

if (requestsCount >= this[_requestsPerBatch]) {
setTimeout(()=>{
requestsCount = 0;
eachMetricCallback(null);
}, this[_timeoutPerBatch]);
} else {
requestsCount++;
eachMetricCallback(null);
}
},
() => {
eachOfRegionsCallback(null);
Expand Down Expand Up @@ -175,9 +219,21 @@ class Request {
return new Promise( (resolve) => {
eachOf(this[_receivedDataPerRegion],
(metrics, region, eachOfRegionsCallback) => {
each(metrics,
let requestsCount = 0;

eachSeries(metrics,
(metric, eachMetricCallback) => {
this[_buildAndSendDatapoints](region, metric, eachMetricCallback);
this[_buildAndSendDatapoints](region, metric);

if (requestsCount >= this[_requestsPerBatch]) {
setTimeout(()=>{
requestsCount = 0;
eachMetricCallback(null);
}, this[_timeoutPerBatch]);
} else {
requestsCount++;
eachMetricCallback(null);
}
},
() => {
eachOfRegionsCallback(null);
Expand Down
1 change: 1 addition & 0 deletions test/unit/metrics-list.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe('Metrics list module tests', () => {
metricsList.getMetricsPerRegion().then(
(metricsPerRegion) => {
let expectedResults = {
"totalMetrics": 5,
"us-west-2": [
{
"Namespace": "AWS/ELB",
Expand Down
Loading

0 comments on commit 0fc7e98

Please sign in to comment.