Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/bugfix/CLDSRV-573-prom-client' i…
Browse files Browse the repository at this point in the history
…nto w/8.6/bugfix/CLDSRV-573-prom-client
  • Loading branch information
BourgoisMickael committed Nov 6, 2024
2 parents 3d0ebd1 + cbaa5d6 commit c460ccc
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 0 deletions.
87 changes: 87 additions & 0 deletions lib/check_unicode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#! /usr/bin/env node
/* eslint-disable no-console */

// https://en.wikipedia.org/wiki/Plane_(Unicode)
// Test all unicode planes from 0 to 0x10FFFF

const AWS = require('aws-sdk');
const fs = require('fs');
const timers = require('timers/promises');

Check failure on line 9 in lib/check_unicode.js

View workflow job for this annotation

GitHub Actions / linting-coverage

'timers' is assigned a value but never used
const async = require('async');
const s3 = new AWS.S3({
endpoint: process.env.AWS_ENDPOINT_URL,
region: process.env.AWS_REGION,
});

// Target bucket and key layout for the unicode probe objects.
const bucketName = 'mick-encoding';
const folder = 'unicode';
// Every test object key has the shape `${prefix}-u<hex>-<char>-end`.
const prefix = `${folder}/code`;

/**
 * Plane 0 (Basic Multilingual Plane) scan window and result buckets.
 * `success` / `failure` map the 4-digit hex code point to `true`
 * depending on whether the PUT was accepted.
 */
const plane0 = {
    start: 0x0000,
    end: 0x0010, //0xFFFF, -- full BMP end; truncated to 0x0010 for quick runs
    success: {},
    failure: {},
};

// Upper bound on concurrently pending putObject calls.
const MAX_PROMISE = 3000;

/**
 * PUT one object per code point in [plane0.start, plane0.end) to probe
 * which characters the S3 endpoint accepts in object keys.
 *
 * Runs at most MAX_PROMISE uploads concurrently via async.parallelLimit.
 * Each outcome is recorded in plane0.success / plane0.failure (keyed by
 * the 4-digit hex code point), and the whole result object is dumped to
 * plane0.json at the end.
 *
 * @returns {Promise<Array>} per-code-point putObject responses or errors
 */
async function run() {
    const result = await async.parallelLimit(
        [...Array(plane0.end - plane0.start).keys()]
            .map(index => async () => {
                const charCode = plane0.start + index;
                // fromCodePoint also handles astral planes (> 0xFFFF),
                // unlike fromCharCode which only covers the BMP; both
                // agree on the BMP range scanned today.
                const char = String.fromCodePoint(charCode);
                const hex = charCode.toString(16).padStart(4, '0');
                const key = `${prefix}-u${hex}-${char}-end`;
                console.log(`PUT [${charCode}]`, hex, 'encoded', encodeURIComponent(char));
                console.log('');

                try {
                    const res = await s3.putObject({ Bucket: bucketName, Key: key }).promise();
                    console.log('Success', charCode, hex, res);
                    plane0.success[hex] = true;
                    return res;
                } catch (err) {
                    console.log('Failure', charCode, hex, err);
                    plane0.failure[hex] = true;
                    return err;
                }
            }),
        MAX_PROMISE);

    console.log(result);

    fs.writeFileSync('plane0.json', JSON.stringify(plane0, null, 4));
}

run().then(console.log).catch(console.error);
185 changes: 185 additions & 0 deletions lib/tst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
const http = require('http');
const cluster = require('cluster');
const arsenal = require('arsenal');

const logger = require('./utilities/logger');

const client = require('prom-client');
const timers = require('timers/promises');
const collectDefaultMetrics = client.collectDefaultMetrics;
const aggregatorRegistry = new client.AggregatorRegistry();

/**
 * Minimal HTTP server wrapper used to exercise prom-client metric
 * aggregation across cluster workers.
 *
 * Installs signal and uncaughtException handlers at construction time so
 * the worker process shuts down cleanly.
 */
class S3Server {
    /**
     * @param {object} worker - the cluster worker owning this server
     *                          (undefined when run outside a cluster)
     */
    constructor(worker) {
        this.worker = worker;
        this.servers = [];
        http.globalAgent.keepAlive = true;

        process.on('SIGINT', this.cleanUp.bind(this));
        process.on('SIGHUP', this.cleanUp.bind(this));
        process.on('SIGQUIT', this.cleanUp.bind(this));
        process.on('SIGTERM', this.cleanUp.bind(this));
        // Broken pipes are expected when peers disconnect; ignore them.
        process.on('SIGPIPE', () => {});
        process.on('uncaughtException', err => {
            console.error('caught error', {
                error: err.message,
                stack: err.stack,
                workerId: this.worker ? this.worker.id : undefined,
                workerPid: this.worker ? this.worker.process.pid : undefined,
            });
            this.caughtExceptionShutdown();
        });
        this.started = false;
    }

    /**
     * Log the request URL and answer with an empty 200 response.
     * BUG FIX: the original called res.end(200); a number is not a valid
     * body and throws ERR_INVALID_ARG_TYPE in Node. Send the status code
     * via writeHead and end with no body instead.
     */
    routeRequest(req, res) {
        console.log(req.url);
        res.writeHead(200);
        res.end();
    }

    /** Start the load-test HTTP server on port 18000. */
    initiateStartup() {
        const server = http.createServer();
        server.on('connection', socket => {
            socket.on('error', err => console.info('request rejected',
                { error: err }));
        });
        server.on('request', (req, res) => {
            console.log('handle req worker', cluster.worker?.id, cluster.worker?.process?.pid);
            // Deliberate CPU burn to simulate load while metrics are scraped.
            let a = Number.MAX_SAFE_INTEGER;
            for (let i = 0; i < 1_000_000_000; i++) {
                a--;
                a++;
            }
            res.end();
        });
        server.on('listening', () => {
            console.info('server started', {
                pid: process.pid,
                serverPort: 18000,
            });
        });
        server.listen(18000);
        this.servers.push(server);
    }

    /** Close every listening server, then exit the process cleanly. */
    cleanUp() {
        console.info('server shutting down');
        Promise.all(this.servers.map(server =>
            new Promise(resolve => server.close(resolve))
        )).then(() => process.exit(0));
    }

    caughtExceptionShutdown() {
        console.error('shutdown of worker due to exception', {
            workerId: this.worker ? this.worker.id : undefined,
            workerPid: this.worker ? this.worker.process.pid : undefined,
        });
        // Will close all servers, cause disconnect event on master and kill
        // worker process with 'SIGTERM'.
        this.worker.kill();
    }
}

/**
 * Serve prom-client metrics aggregated from every cluster worker.
 *
 * BUG FIX: the original `try { metrics }` was a no-op expression, so the
 * metrics were never fetched, and `res.end(undefined)` sent no body even
 * though Content-Length was computed from `metrics` — clients would hang
 * or error on the length mismatch. Fetch the aggregated metrics and send
 * them as the response body; on failure answer 500 with the error text.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
async function _routeAdminRequest(req, res) {
    console.log('received admin request');

    let metrics = '';
    let code = 200;
    try {
        // Aggregate metrics from all workers through prom-client's
        // AggregatorRegistry (IPC round-trip to each worker).
        metrics = await aggregatorRegistry.clusterMetrics();
    } catch (err) {
        console.log('METRIC ERR', err);
        code = 500;
        metrics = err.toString();
    }
    const contentLen = Buffer.byteLength(metrics, 'utf8');
    res.writeHead(code, {
        'Content-Length': contentLen,
        'Content-Type': aggregatorRegistry.contentType,
    });
    res.end(metrics);
}

/**
 * Start the admin/metrics HTTP server on port 18001, routing every
 * request through _routeAdminRequest.
 */
function setupMetricsServer() {
    const metricsServer = new arsenal.network.http.server(18001, logger);
    metricsServer.onRequest(_routeAdminRequest);

    console.info('starting metrics server on port 18001');
    metricsServer.start();
}

/**
 * Entry point.
 *
 * Primary process: forks `clusters` workers, respawns dead ones (checked
 * every second), and runs the metrics aggregation server. Worker
 * processes: run the load-test HTTP server and register prom-client
 * metrics for the primary to aggregate.
 */
function main() {
    const clusters = 10;
    if (cluster.isMaster) {
        // Make sure all workers use the same report token
        process.env.REPORT_TOKEN = 'hello';

        for (let n = 0; n < clusters; n++) {
            const worker = cluster.fork();
            console.info('new worker forked', {
                workerId: worker.id,
                workerPid: worker.process.pid,
            });
        }
        // Keep the pool topped up at `clusters` live workers.
        setInterval(() => {
            const len = Object.keys(cluster.workers).length;
            if (len < clusters) {
                for (let i = len; i < clusters; i++) {
                    const newWorker = cluster.fork();
                    console.info('new worker forked', {
                        workerId: newWorker.id,
                        workerPid: newWorker.process.pid,
                    });
                }
            }
        }, 1000);
        cluster.on('disconnect', worker => {
            console.error('worker disconnected. making sure exits', {
                workerId: worker.id,
                workerPid: worker.process.pid,
            });
            setTimeout(() => {
                if (!worker.isDead() && !worker.exitedAfterDisconnect) {
                    console.error('worker not exiting. killing it', {
                        workerId: worker.id,
                        // BUG FIX: cluster.Worker has no `pid` property;
                        // the PID lives on worker.process.pid (as every
                        // other log site here already does).
                        workerPid: worker.process.pid,
                    });
                    worker.process.kill('SIGKILL');
                }
            }, 2000);
        });
        cluster.on('exit', worker => {
            console.error('worker exited.', {
                workerId: worker.id,
                workerPid: worker.process.pid,
            });
        });
        // setup metrics server in master, prom-client will not aggregate metrics
        // from workers by default, use prom-client's aggregate registry to collect
        // from all workers!
        setupMetricsServer();
    } else {
        const server = new S3Server(cluster.worker);
        server.initiateStartup();
        // collect default metrics from the workers (will be aggregated in master)
        collectDefaultMetrics();
        new client.Gauge({
            name: 'zworker',
            help: 'zworker',
            labelNames: ['id'],
        }).labels({ id: cluster.worker?.id }).inc();
        const m = new client.Gauge({
            name: 'zzzzzzz',
            help: 'aaaaa',
            // Value is recomputed on each scrape, overriding the inc() calls.
            async collect() {
                this.set(5);
            },
        });
        m.inc();
        m.inc();
    }
}

main();

0 comments on commit c460ccc

Please sign in to comment.