From 60ac29febcf91f07be2b05b9ef20471dec389564 Mon Sep 17 00:00:00 2001 From: Bruno Camarneiro Date: Tue, 23 Oct 2018 16:43:04 +0100 Subject: [PATCH] feat(NA): added plugin mechanism; moved system stats metrics to its own file; doc(NA): added documentation; tests(NA): fixed unit tests; chore(NA): removed cpu usage metric; chore(NA): exported status plugin; --- README.md | 29 ++++++ lib/client.js | 221 ++++++++++------------------------------ plugins/system-stats.js | 217 +++++++++++++++++++++++++++++++++++++++ spec/common.spec.js | 84 ++++----------- statful.js | 1 + 5 files changed, 319 insertions(+), 233 deletions(-) create mode 100644 plugins/system-stats.js diff --git a/README.md b/README.md index f6d4001..87bc1d3 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,35 @@ Read the methods options reference bellow to get more information about the defa | **_tags_** (`object`) - Defines the tags of the metric. These tags are merged with the ones configured globally, including method defaults. | `{}` | `{}` | `{ unit: 'ms' }` | `{}` | **YES** | | **_timestamp_** (`string`) - Defines the timestamp of the metric. This timestamp is a **POSIX/Epoch** time in **seconds**. | `current timestamp` | `current timestamp` | `current timestamp` | `current timestamp` | **YES** | +## Plugins +It is possible to use plugin with the client. +```javascript + var SystemStatsPlugin = require('./plugins/system-stats.js'); + var statful = new Statful(config, log); + statful.use(new SystemStatsPlugin()); +``` +### System Stats Plugin +This plugin allows the client to send system-related metrics and/or enrich the user metrics with system tags. + +#### System Stats Plugin Configuration + +The custom options that can be set on config param are detailed below. + +| Option | Description | Type | Default | Required | +|:---|:---|:---|:---|:---| +| _bufferFlushLength_ | Defines the application global name. If specified sets a global tag `app=setValue`. | `metric` | true | **NO** | +| _timerEventLoop_ | Object to set methods options. | `metric` | true | **NO** | +| _processUptime_ | Uptime of the process in **miliseconds**. | `metric` | true | **NO** | +| _processMemoryUsage_ | Process memory usage in **bytes**. | `metric` | true | **NO** | +| _processMemoryUsagePerc_ | Process memory usage **percentage**. (compared to total OS memory) | `metric` | true | **NO** | +| _osUptime_ | OS uptime in **miliseconds**. | `metric` | true | **NO** | +| _osTotalMemory_ | OS total memory in **bytes**. | `metric` | true | **NO** | +| _osFreeMemory_ | OS free memory in **bytes**. | `metric` | true | **NO** | +| _tagHostname_ | Hostname. | `tag` | true | **NO** | +| _tagPlatform_ | Platform. | `tag` | true | **NO** | +| _tagArchitecture_ | Architecture. | `tag` | true | **NO** | +| _tagNodeVersion_ | NodeJS Version | `tag` | true | **NO** | + ## Authors [Mindera - Software Craft](https://github.com/Mindera) diff --git a/lib/client.js b/lib/client.js index 5a20c34..34fcf6a 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,7 +1,7 @@ 'use strict'; var dgram = require('dgram'); -var blocked = require('blocked'); + var merge = require('merge'); var configHelper = require('./config-helper'); var transport = require('./transport'); @@ -54,60 +54,6 @@ function putMetric (self, metricTypeConf, name, value, aggregation, aggregationF } } -/** - * Puts a system stats metric into the system stats buffer ready to be sent. - * - * @param self A self statful client. - * @param metricTypeConf A configuration for each metric type (counter, gauge, timer). Can be null if it a custom metric. - * @param name A metric name. - * @param value A metric value. - * @param aggregation The aggregation with which metric was aggregated. - * @param aggregationFreq The aggregation frequency with which metric was aggregated. - * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp. - */ -function putSystemStatsMetrics (self, name, value, parameters) { - var metricParams = parameters || {}; - var tags = metricParams.tags, - agg = metricParams.agg, - aggFreq = metricParams.aggFreq, - namespace = metricParams.namespace, - timestamp = metricParams.timestamp, - sampleRate = metricParams.sampleRate; - - putSystemStats(self, name, value, { - tags: tags, - agg: agg, - aggFreq: aggFreq, - namespace: namespace, - timestamp: timestamp, - sampleRate: sampleRate - }); -} - -function sendFlushStats (self) { - if (self.systemStats) { - var aggregations = ['avg', 'sum']; - if (self.aggregatedBuffer.bufferSize > 0 && self.transport === 'api') { - putSystemStatsMetrics(self, 'buffer.flush_length', self.aggregatedBuffer.bufferSize, { - agg: aggregations, - tags: { buffer_type: 'aggregated' } - }); - } - if (self.nonAggregatedBuffer.bufferSize > 0) { - putSystemStatsMetrics(self, 'buffer.flush_length', self.nonAggregatedBuffer.bufferSize, { - agg: aggregations, - tags: { buffer_type: 'non-aggregated' } - }); - } - if (self.systemStatsBuffer.bufferSize > 0) { - putSystemStatsMetrics(self, 'buffer.flush_length', self.systemStatsBuffer.bufferSize, { - agg: aggregations, - tags: { buffer_type: 'system-stats' } - }); - } - } -} - /** * Logs all the metrics to the logger * @@ -138,11 +84,21 @@ function logMetrics (self) { } self.logger.debug(stringToLogHeader + stringToLog); } - if (self.systemStatsBuffer.bufferSize > 0) { - self.logger.debug('Flushing metrics (system stats): ' + self.systemStatsBuffer.buffer); + if (pluginsBuffersSize(self.pluginBuffers) > 0) { + self.logger.debug('Flushing plugins metrics'); } } +function pluginsBuffersSize (buffers) { + var size = 0; + + for (var i in buffers) { + size += buffers[i].bufferSize; + } + + return size; +} + /** * Sends the non aggregated and system stats metrics using UDP transport * @@ -164,8 +120,8 @@ function sendMetricsByUdpTransport (self) { self.socket.send(buffer, 0, buffer.length, self.port, self.host); } - if (self.systemStatsBuffer.bufferSize > 0) { - buffer = new Buffer(self.systemStatsBuffer.buffer); + for (var i in self.pluginBuffers) { + buffer = new Buffer(self.pluginBuffers[i].buffer); self.socket.send(buffer, 0, buffer.length, self.port, self.host); } } @@ -233,22 +189,25 @@ function sendMetricsByApiTransport (self) { } } - if (self.systemStatsBuffer.bufferSize > 0) { - var nonAggregatedStatsOptions = transport.buildRequestOptions( - self.protocol, - self.host, - self.port, - self.basePath, - self.token, - self.timeout - ); - - self.logger.debug('Flushing to ' + nonAggregatedStatsOptions.url + ' system stats metrics'); - - if (self.compression) { - transport.sendCompressedMessage(nonAggregatedStatsOptions, self.systemStatsBuffer.buffer, self.logger); - } else { - transport.sendUncompressedMessage(nonAggregatedStatsOptions, self.systemStatsBuffer.buffer, self.logger); + for (var i in self.pluginBuffers) { + var element = self.pluginBuffers[i]; + if(element.bufferSize > 0) { + var nonAggregatedStatsOptions = transport.buildRequestOptions( + self.protocol, + self.host, + self.port, + self.basePath, + self.token, + self.timeout + ); + + self.logger.debug('Flushing to ' + nonAggregatedStatsOptions.url + ' system stats metrics'); + + if (self.compression) { + transport.sendCompressedMessage(nonAggregatedStatsOptions, element.buffer, self.logger); + } else { + transport.sendUncompressedMessage(nonAggregatedStatsOptions, element.buffer, self.logger); + } } } } @@ -259,9 +218,12 @@ function sendMetricsByApiTransport (self) { * @param self A self client instance. */ function flush (self) { - sendFlushStats(self); + for (var index = 0; index < self.plugins.length; index++) { + self.plugins[index].onFlush(self); + } + var metricsCounter = - self.aggregatedBuffer.bufferSize + self.nonAggregatedBuffer.bufferSize + self.systemStatsBuffer.bufferSize; + self.aggregatedBuffer.bufferSize + self.nonAggregatedBuffer.bufferSize + pluginsBuffersSize(self.pluginBuffers); if (metricsCounter > 0) { if (self.dryRun) { @@ -280,8 +242,11 @@ function flush (self) { self.aggregatedBuffer.bufferSize = 0; self.nonAggregatedBuffer.buffer = ''; self.nonAggregatedBuffer.bufferSize = 0; - self.systemStatsBuffer.buffer = ''; - self.systemStatsBuffer.bufferSize = 0; + + for (var i in self.pluginBuffers) { + self.pluginBuffers[i].buffer = ''; + self.pluginBuffers[i].bufferSize = 0; + } } } @@ -317,7 +282,7 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) { if ( self.aggregatedBuffer.bufferSize + self.nonAggregatedBuffer.bufferSize + - self.systemStatsBuffer.bufferSize >= + pluginsBuffersSize(self.pluginBuffers) >= self.flushSize ) { setTimeout(function () { @@ -329,27 +294,6 @@ function addToBuffer (self, metricLines, isMetricAggregated, agg, aggFreq) { } } -/** - * Adds raw metrics directly into the flush buffer. Use this method with caution. - * - * @param self A self client instance. - * @param metricLines The metrics, in valid line protocol, to push to the buffer. - */ -function addToStatsBuffer (self, metricLines) { - if (typeof metricLines !== 'undefined') { - var targetBuffer = self.systemStatsBuffer; - - if (targetBuffer.bufferSize > 0) { - targetBuffer.buffer += '\n'; - } - - targetBuffer.buffer += metricLines; - targetBuffer.bufferSize++; - } else { - self.logger.error('addToStatsBuffer: Invalid metric lines: ' + metricLines); - } -} - /** * Adds a new metric to the in-memory buffer. * @@ -409,60 +353,6 @@ function putRaw (self, metric, value, parameters, isMetricAggregated) { } } -/** - * Adds a new system stats metric to the in-memory system stats buffer. - * - * @param self A self client instance. - * @param metric Name metric such as 'response_time'. - * @param value. - * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp. - * - tags: Tags to associate this value with, for example {from: 'serviceA', to: 'serviceB', method: 'login'}. - * - agg: List of aggregations to be applied by Statful. Ex: ['avg', 'p90', 'min']. - * - aggFreq: Aggregation frequency in seconds. One of: 10, 30, 60 ,120, 180, 300. Default: 10. - * - namespace: Define the metric namespace. Default: application. - * - timestamp: Defines the metrics timestamp. Default: current timestamp. - */ -function putSystemStats (self, metric, value, parameters) { - var metricParams = parameters || {}; - - var tags = metricParams.tags, - agg = metricParams.agg, - aggFreq = metricParams.aggFreq, - namespace = metricParams.namespace, - timestamp = metricParams.timestamp, - sampleRate = parameters.sampleRate || self.sampleRate; - - // Vars to Put - var putNamespace = namespace || self.namespace; - var putAggFreq = aggFreq || 10; - var putTags = merge(self.app ? merge({ app: self.app }, tags) : tags, self.tags); - - var metricName = putNamespace + '.' + metric, - flushLine = metricName, - sampleRateNormalized = (sampleRate || 100) / 100; - - if (Math.random() <= sampleRateNormalized) { - flushLine = Object.keys(putTags).reduce(function (previousValue, tag) { - return previousValue + ',' + tag + '=' + putTags[tag]; - }, flushLine); - - flushLine += ' ' + value + ' ' + (timestamp || Math.round(new Date().getTime() / 1000)); - - if (agg) { - agg.push(putAggFreq); - flushLine += ' ' + agg.join(','); - - if (sampleRate && sampleRate < 100) { - flushLine += ' ' + sampleRate; - } - } - - addToStatsBuffer(self, [flushLine]); - } else { - self.logger.debug('Metric was discarded due to sample rate.'); - } -} - /** * Calls put metric with an aggregated metric. * @@ -532,7 +422,6 @@ var Client = function (config, logger) { this.namespace = config.namespace || 'application'; this.dryRun = config.dryRun; this.tags = config.tags || {}; - this.systemStats = config.systemStats !== undefined ? config.systemStats : true; this.sampleRate = config.sampleRate || 100; this.flushInterval = config.flushInterval || 3000; this.flushSize = config.flushSize || 1000; @@ -559,19 +448,8 @@ var Client = function (config, logger) { bufferSize: 0 }; - this.systemStatsBuffer = { - buffer: '', - bufferSize: 0 - }; - - if (this.systemStats) { - blocked(function (ms) { - putSystemStatsMetrics(self, 'timer.event_loop', ms, { - agg: ['avg', 'p90', 'count'], - tags: { unit: 'ms' } - }); - }); - } + this.plugins = []; + this.pluginBuffers = {}; setInterval( function (obj) { @@ -739,4 +617,9 @@ Client.prototype.timer = function (name, value, parameters) { putNonAggregatedMetric(this, this.default.timer, 'timer.' + name, value, parameters); }; +Client.prototype.use = function (plugin) { + plugin.onInit(this); + this.plugins.push(plugin); +}; + module.exports = Client; diff --git a/plugins/system-stats.js b/plugins/system-stats.js new file mode 100644 index 0000000..1f41877 --- /dev/null +++ b/plugins/system-stats.js @@ -0,0 +1,217 @@ +var os = require('os'); +var merge = require('merge'); +var blocked = require('blocked'); + +/** + * Puts a system stats metric into the system stats buffer ready to be sent. + * + * @param self A self statful client. + * @param metricTypeConf A configuration for each metric type (counter, gauge, timer). Can be null if it a custom metric. + * @param name A metric name. + * @param value A metric value. + * @param aggregation The aggregation with which metric was aggregated. + * @param aggregationFreq The aggregation frequency with which metric was aggregated. + * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp. + */ +function putSystemStatsMetrics (self, name, value, parameters) { + var metricParams = parameters || {}; + var tags = metricParams.tags, + agg = metricParams.agg, + aggFreq = metricParams.aggFreq, + namespace = metricParams.namespace, + timestamp = metricParams.timestamp, + sampleRate = metricParams.sampleRate; + + putSystemStats(self, name, value, { + tags: tags, + agg: agg, + aggFreq: aggFreq, + namespace: namespace, + timestamp: timestamp, + sampleRate: sampleRate + }); +} + +/** + * Adds a new system stats metric to the in-memory system stats buffer. + * + * @param self A self client instance. + * @param metric Name metric such as 'response_time'. + * @param value. + * @param parameters An object with metric para meters: tags, agg, aggFreq, namespace and timestamp. + * - tags: Tags to associate this value with, for example {from: 'serviceA', to: 'serviceB', method: 'login'}. + * - agg: List of aggregations to be applied by Statful. Ex: ['avg', 'p90', 'min']. + * - aggFreq: Aggregation frequency in seconds. One of: 10, 30, 60 ,120, 180, 300. Default: 10. + * - namespace: Define the metric namespace. Default: application. + * - timestamp: Defines the metrics timestamp. Default: current timestamp. + */ +function putSystemStats (self, metric, value, parameters) { + var metricParams = parameters || {}; + + var tags = metricParams.tags, + agg = metricParams.agg, + aggFreq = metricParams.aggFreq, + namespace = metricParams.namespace, + timestamp = metricParams.timestamp, + sampleRate = parameters.sampleRate || self.sampleRate; + + // Vars to Put + var putNamespace = namespace || self.namespace; + var putAggFreq = aggFreq || 10; + var putTags = merge(self.app ? merge({ app: self.app }, tags) : tags, self.tags); + + var metricName = putNamespace + '.' + metric, + flushLine = metricName, + sampleRateNormalized = (sampleRate || 100) / 100; + + if (Math.random() <= sampleRateNormalized) { + flushLine = Object.keys(putTags).reduce(function (previousValue, tag) { + return previousValue + ',' + tag + '=' + putTags[tag]; + }, flushLine); + + flushLine += ' ' + value + ' ' + (timestamp || Math.round(new Date().getTime() / 1000)); + + if (agg) { + agg.push(putAggFreq); + flushLine += ' ' + agg.join(','); + + if (sampleRate && sampleRate < 100) { + flushLine += ' ' + sampleRate; + } + } + + addToStatsBuffer(self, [flushLine]); + } else { + self.logger.debug('Metric was discarded due to sample rate.'); + } +} + +/** + * Adds raw metrics directly into the flush buffer. Use this method with caution. + * + * @param self A self client instance. + * @param metricLines The metrics, in valid line protocol, to push to the buffer. + */ +function addToStatsBuffer (self, metricLines) { + if (typeof metricLines !== 'undefined') { + var targetBuffer = self.pluginBuffers.systemStats; + + if (targetBuffer.bufferSize > 0) { + targetBuffer.buffer += '\n'; + } + + targetBuffer.buffer += metricLines; + targetBuffer.bufferSize++; + } else { + self.logger.error('addToStatsBuffer: Invalid metric lines: ' + metricLines); + } +} + +var SystemStatsPlugin = function (configs) { + if (!configs) { configs = {}; } + + // System Metrics + this.metrics = { + bufferFlushLength: configs.bufferFlushLength || true, + timerEventLoop: configs.timerEventLoop || true, + processUptime: configs.processUptime || false, + processMemoryUsage: configs.processMemoryUsage || false, + processMemoryUsagePerc: configs.processMemoryUsagePerc || false, + osCpuUsage: configs.osCpuUsage || false, + osUptime: configs.osUptime || false, + osTotalMemory: configs.osTotalMemory || false, + osFreeMemory: configs.osFreeMemory || false + }; + + // System Tags + this.tags = { + hostname: configs.tagHostname || false, + platform: configs.tagPlatform || false, + architecture: configs.tagArchitecture || false, + nodeVersion: configs.tagNodeVersion || false + }; +}; + +SystemStatsPlugin.prototype.onInit = function (client) { + client.pluginBuffers.systemStats = { + buffer: '', + bufferSize: 0 + }; + + var tags = {}; + + if (this.tags.hostname) { + tags.hostname = os.hostname(); + } + if (this.tags.platform) { + tags.platform = os.platform(); + } + if (this.tags.architecture) { + tags.architecture = os.arch(); + } + if (this.tags.nodeVersion) { + tags.node_version = process.version; + } + + client.tags = merge(client.tags, tags); + + if (this.metrics.timerEventLoop) { + blocked(function (ms) { + putSystemStatsMetrics(client, 'timer.event_loop', ms, { + agg: ['avg', 'p90', 'count'], + tags: { unit: 'ms' } + }); + }); + } +}; + +SystemStatsPlugin.prototype.onFlush = function (client) { + //process + var procMem = process.memoryUsage().rss, + osTotMem = os.totalmem(), + osFreeMem = os.freemem(), + procMemPerc = (procMem * 100) / osTotMem, + procUptimeMs = process.uptime() * 1000, + osUptimeMs = os.uptime() * 1000; + + if (this.metrics.processUptime) { + putSystemStatsMetrics(client, 'process.uptime', procUptimeMs, { + agg: ['avg', 'last'], + tags: { unit: 'ms' } + }); + } + + if (this.metrics.processMemoryUsage) { + putSystemStatsMetrics(client, 'process.memory.usage', procMem, { + agg: ['avg', 'last'], + tags: { unit: 'byte' } + }); + } + + if (this.metrics.processMemoryUsagePerc) { + putSystemStatsMetrics(client, 'process.memory.usage.perc', procMemPerc, { + agg: ['avg', 'last'] + }); + } + + if (this.metrics.osTotalMemory) { + putSystemStatsMetrics(client, 'os.memory.total', osTotMem, { + agg: ['avg', 'last'] + }); + } + + if (this.metrics.osFreeMemory) { + putSystemStatsMetrics(client, 'os.memory.free', osFreeMem, { + agg: ['avg', 'last'] + }); + } + + if (this.metrics.osUptime) { + putSystemStatsMetrics(client, 'os.uptime', osUptimeMs, { + agg: ['avg', 'last'], + tags: { unit: 'ms' } + }); + } +}; + +module.exports = SystemStatsPlugin; \ No newline at end of file diff --git a/spec/common.spec.js b/spec/common.spec.js index 6b3fb64..170da75 100644 --- a/spec/common.spec.js +++ b/spec/common.spec.js @@ -3,6 +3,22 @@ /*jshint -W003 */ var Client = require('../lib/client'); +var SystemStatsPlugin = require('../plugins/system-stats.js'); +var pluginConfig = { + bufferFlushLength: true, + timerEventLoop: false, + processUptime: false, + processMemoryUsage: false, + processMemoryUsagePerc: false, + osCpuUsage: false, + osUptime: false, + osTotalMemory: false, + osFreeMemory: false, + tagHostname: false, + tagPlatform: false, + tagArchitecture: false, + tagNodeVersion: false +} var udpServer = require('./tools/udp-server'); var httpsServer = require('./tools/https-server'); @@ -891,11 +907,10 @@ describe('When sending metrics', function () { done(); }); - it('should log metrics when dryRun is activated (systemStats)', function (done) { + it('should log metrics when dryRun is activated', function (done) { // Given var victim = new Client( { - systemStats: true, transport: 'api', api: apiConf, flushSize: 1, @@ -903,6 +918,7 @@ describe('When sending metrics', function () { }, logger ); + victim.use(new SystemStatsPlugin(pluginConfig)); sinon.spy(victim.logger, 'debug'); @@ -911,71 +927,11 @@ describe('When sending metrics', function () { // Then setTimeout(function () { - expect(victim.logger.debug.getCall(1).args[0]).to.match( - /^Flushing metrics \(system stats\): application\.buffer\.flush_length,buffer_type=non-aggregated 1 \d+ avg,sum,10\napplication\.buffer\.flush_length,buffer_type=system-stats 1 \d+ avg,sum,10/ + expect(victim.logger.debug.getCall(0).args[0]).to.match( + /^Flushing metrics \(non aggregated\): application\.my_metric/ ); - victim.logger.debug.restore(); done(); }); }); - - it('should send system stats metrics through UDP', function (done) { - // Given - udpServer.start(udpPort, '127.0.0.1', null, onResponse); - var victim = new Client( - { - systemStats: true, - transport: 'udp', - port: udpPort, - flushSize: 1, - flushInterval: 10000 - }, - logger - ); - // When - victim.put('my_metric', 1, null, false); - // Then - function onResponse (lines) { - //expect my_metric - if (lines.toString().indexOf('\n') === -1) { - expect(lines.toString()).to.match(/^application.my_metric 1 \d+$/); - } else { - //expect systemStats after that - expect(lines.toString()).to.match( - /^application\.buffer\.flush_length,buffer_type=non-aggregated 1 \d+ avg,sum,10\napplication.buffer.flush_length,buffer_type=system-stats 1 \d+ avg,sum,10,10/ - ); - udpServer.stop(); - done(); - } - } - }); - - it('should send system stats metrics through HTTPS', function (done) { - // Given - httpsServer.start(httpPort, '127.0.0.1', onResponse); - var victim = new Client( - { - systemStats: true, - transport: 'api', - api: apiConf, - flushSize: 1 - }, - logger - ); - // When - victim.put('my_metric', 1); - // Then - function onResponse (lines) { - if (lines.toString().indexOf('\n') === -1) { - expect(lines).to.match(/^application\.my_metric 1 \d+$/); - } else { - expect(lines).to.match( - /^application\.buffer\.flush_length,buffer_type=non-aggregated 1 \d+ avg,sum,10\napplication.buffer.flush_length,buffer_type=system-stats 1 \d+ avg,sum,10,10/ - ); - httpsServer.stop(); - done(); - } - } - }); }); diff --git a/statful.js b/statful.js index e1250b0..63a03fc 100644 --- a/statful.js +++ b/statful.js @@ -1 +1,2 @@ module.exports = require('./lib/client'); +module.exports.statsPlugin = require('./plugins/system-stats');