Skip to content

Commit

Permalink
Fix emit batch strategy (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
A3a3e1 authored Apr 22, 2022
1 parent 7a0436c commit a19e8ba
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 4 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ module.exports = {
},
rules: {
'import/extensions': 'off',
"no-use-before-define": 0,
},
};
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.1.3 (April 22, 2022)
* Fix a bug when emit strategy 'Emit Batch' did not process correctly

## 3.1.2 (April 14, 2022)
* Update `component-commons-library` to read and upload attachments through `Maester`
* Update `elasticio-sailor-nodejs` to v2.6.27
Expand Down
4 changes: 2 additions & 2 deletions component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"title": "CSV",
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
"docsUrl": "https://github.com/elasticio/csv-component",
"version": "3.1.2",
"version": "3.1.3",
"actions": {
"read_action": {
"main": "./lib/actions/read.js",
Expand Down Expand Up @@ -85,4 +85,4 @@
"dynamicMetadata": true
}
}
}
}
12 changes: 10 additions & 2 deletions lib/actions/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ async function readCSV(msg, cfg) {
if (emitBehavior === 'fetchAll' || cfg.emitAll === true || cfg.emitAll === 'true') {
await this.emit('data', messages.newMessageWithBody({ result }))
} else if (emitBehavior === 'emitBatch') {
const chunks = sliceIntoChunks(result, body.batchSize);
const { batchSize } = body;
if (!isPositiveInteger(batchSize)) {
throw new Error("'batchSize' must be a positive integer!");
}
const chunks = sliceIntoChunks(result, batchSize);
// eslint-disable-next-line no-plusplus
for (let i = 0; i < chunks.length; i++) {
// eslint-disable-next-line no-await-in-loop
Expand Down Expand Up @@ -162,7 +166,7 @@ module.exports.getMetaModel = async function getMetaModel(cfg) {
out: {}
};

if (cfg.emitBehavior === 'emitBatch') {
if (cfg.emitAll === 'emitBatch') {
meta.in.properties.batchSize = {
title: 'Batch Size',
type: 'number',
Expand All @@ -171,3 +175,7 @@ module.exports.getMetaModel = async function getMetaModel(cfg) {
}
return meta;
}

function isPositiveInteger(input) {
return Number.isSafeInteger(input) && input > 0;
}
117 changes: 117 additions & 0 deletions spec/read.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,87 @@ describe('CSV Read component', async () => {
});
});

describe('emitAll: emitBatch', async () => {
it('should contain batchSize field', async () => {
cfg = {
emitAll: 'emitBatch',
};
const expectedMetadata = {
in: {
type: 'object',
properties: {
url: {
type: 'string',
required: true,
title: 'URL',
},
header: {
type: 'boolean',
required: false,
title: 'Contains headers',
},
delimiter: {
type: 'string',
required: false,
title: 'Delimiter',
},
dynamicTyping: {
type: 'boolean',
required: false,
title: 'Convert Data types',
},
batchSize: {
title: 'Batch Size',
type: 'number',
required: true,
},
},
},
out: {},
};
const metadata = await readCSV.getMetaModel(cfg);
expect(metadata).to.deep.equal(expectedMetadata);
});
});

describe('emitAll: fetchAll', async () => {
it('should not contain batchSize field', async () => {
cfg = {
emitAll: 'fetchAll',
};
const expectedMetadata = {
in: {
type: 'object',
properties: {
url: {
type: 'string',
required: true,
title: 'URL',
},
header: {
type: 'boolean',
required: false,
title: 'Contains headers',
},
delimiter: {
type: 'string',
required: false,
title: 'Delimiter',
},
dynamicTyping: {
type: 'boolean',
required: false,
title: 'Convert Data types',
},
},
},
out: {},
};
const metadata = await readCSV.getMetaModel(cfg);
expect(metadata).to.deep.equal(expectedMetadata);
});
});

it('One file', async () => {
msg.body = {
url: 'http://test.env.mock/formats.csv',
Expand Down Expand Up @@ -133,6 +214,42 @@ describe('CSV Read component', async () => {
.to.equal(2.71828); // Number
});

it('emitBatch: true, batchSize is negative', async () => {
msg.body = {
url: 'http://test.env.mock/formats.csv',
header: true,
dynamicTyping: true,
batchSize: -5,
};
cfg = {
emitAll: 'emitBatch',
};
context.emit = sinon.spy();
try {
await readCSV.process.call(context, msg, cfg);
} catch (err) {
expect(err.message).to.be.equal('\'batchSize\' must be a positive integer!');
}
});

it('emitBatch: true, batchSize is string', async () => {
msg.body = {
url: 'http://test.env.mock/formats.csv',
header: true,
dynamicTyping: true,
batchSize: 'asd',
};
cfg = {
emitAll: 'emitBatch',
};
context.emit = sinon.spy();
try {
await readCSV.process.call(context, msg, cfg);
} catch (err) {
expect(err.message).to.be.equal('\'batchSize\' must be a positive integer!');
}
});

it('emitAll: emitIndividually, header: false, dynamicTyping: false', async () => {
msg.body = {
url: 'http://test.env.mock/formats.csv',
Expand Down

0 comments on commit a19e8ba

Please sign in to comment.