Skip to content

Commit

Permalink
#6233 fix sailor ignores error from maester; update test (#191)
Browse files Browse the repository at this point in the history
* #6233 fix sailor ignores error from maester; update test

* #6233 bump object-storage-client version

* #6233 npm audit fix

* #6233 add better-npm-audit

* #6233 code style

* #6233 cleanup; improve log/error message

* #6233 run audit from package.json -> scripts

* #6233 update test

* #6233 code style

* #6233 use latest version for object-storage-client

* #6233 fix incorrect params passed to objectStorage methods

* #6233 update tests

* #6233 update version

* #6233 update changelog
  • Loading branch information
Allirey authored Jun 21, 2022
1 parent a29a0a9 commit 7bd6325
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 369 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs: # a collection of steps
- ./node_modules
- run:
name: "Running audit"
command: npm audit --production --audit-level=high
command: npm run audit
- run: # run tests
name: test
command: npm test
Expand Down
Empty file added .nsprc
Empty file.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.6.28 (June 21, 2022)

* Fix: "sailor-nodejs ignores errors from maester during lightweight message upload" [#6233](https://github.com/elasticio/elasticio/issues/6233)

## 2.6.27 (March 10, 2022)

* Added npm audit to CI and fixed all dependencies
Expand Down
80 changes: 39 additions & 41 deletions lib/sailor.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ class Sailor {
logger.info('Going to fetch message body.', { objectId });

try {
object = await this.objectStorage.getAsJSON(objectId, this.settings.OBJECT_STORAGE_TOKEN);
object = await this.objectStorage.getAsJSON(
objectId,
{ jwtPayloadOrToken: this.settings.OBJECT_STORAGE_TOKEN }
);
} catch (e) {
log.error(e);
throw new Error(`Failed to get message body with id=${objectId}`);
Expand All @@ -292,17 +295,12 @@ class Sailor {
return object;
}

async uploadMessageBody(bodyBuf) {
let id;

try {
const stream = () => Readable.from(bodyBuf);
id = await this.objectStorage.addAsStream(stream, this.settings.OBJECT_STORAGE_TOKEN);
} catch (e) {
log.error(e);
}

return id;
uploadMessageBody(bodyBuf) {
const stream = () => Readable.from(bodyBuf);
return this.objectStorage.addAsStream(
stream,
{ jwtPayloadOrToken: this.settings.OBJECT_STORAGE_TOKEN }
);
}

async runExec(module, payload, message, outgoingMessageHeaders, stepData, timeStart, logger) {
Expand Down Expand Up @@ -386,38 +384,38 @@ class Sailor {
OBJECT_STORAGE_SIZE_THRESHOLD: settings.OBJECT_STORAGE_SIZE_THRESHOLD
}
);
const [bodyId, ...passthroughIds] = await Promise.all([
that.uploadMessageBody(bodyBuf),
...passthroughBufs.map(async ({ stepId, body, id }) => {
const bodyId = id || await that.uploadMessageBody(body);
return { stepId, bodyId };
})
]);

if (bodyId) {
logger.info('Message body uploaded', { id: bodyId });
const { headers } = data;
data.body = {};
data.headers = {
...(headers || {}),
[OBJECT_ID_HEADER]: bodyId
};
} else {
logger.info('Message body not uploaded');

let bodyId;
let passthroughIds;
try {
[bodyId, ...passthroughIds] = await Promise.all([
that.uploadMessageBody(bodyBuf),
...passthroughBufs.map(async ({ stepId, body, id }) => {
const bodyId = id || await that.uploadMessageBody(body);
return { stepId, bodyId };
})
]);
} catch (e) {
logger.error(e, 'Error during message/passthrough body upload');
return onError(new Error('Lightweight message/passthrough body upload error'));
}

logger.info('Message body uploaded', { id: bodyId });
const { headers } = data;
data.body = {};
data.headers = {
...(headers || {}),
[OBJECT_ID_HEADER]: bodyId
};

for (const { stepId, bodyId } of passthroughIds) {
if (bodyId) {
logger.info('Passthrough Message body uploaded', { stepId, id: bodyId });
const { [stepId]: { headers } } = passthrough;
data.passthrough[stepId].body = {};
data.passthrough[stepId].headers = {
...(headers || {}),
[OBJECT_ID_HEADER]: bodyId
};
} else {
logger.info('Passtrough Message body not uploaded', { stepId, id: bodyId });
}
logger.info('Passthrough Message body uploaded', { stepId, id: bodyId });
const { [stepId]: { headers } } = passthrough;
data.passthrough[stepId].body = {};
data.passthrough[stepId].headers = {
...(headers || {}),
[OBJECT_ID_HEADER]: bodyId
};
}

} else {
Expand Down
51 changes: 9 additions & 42 deletions mocha_spec/unit/sailor.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ describe('Sailor', () => {
beforeEach(async () => {
const getObjectStub = sandbox.stub(sailor.objectStorage, 'getAsJSON');
bodyRequestStub = getObjectStub
.withArgs(bodyObjectId, settings.OBJECT_STORAGE_TOKEN)
.withArgs(bodyObjectId, { jwtPayloadOrToken: settings.OBJECT_STORAGE_TOKEN })
.resolves(body);
passthroughRequestStub = bodyRequestStub
.withArgs(passthroughObjectId)
Expand Down Expand Up @@ -1144,7 +1144,7 @@ describe('Sailor', () => {
beforeEach(async () => {
const getObjectStub = sandbox.stub(sailor.objectStorage, 'getAsJSON');
bodyRequestStub = getObjectStub
.withArgs(bodyObjectId, settings.OBJECT_STORAGE_TOKEN)
.withArgs(bodyObjectId, { jwtPayloadOrToken: settings.OBJECT_STORAGE_TOKEN })
.resolves(body);
passthroughRequestStub = bodyRequestStub.withArgs(passthroughObjectId).rejects(new Error());

Expand Down Expand Up @@ -1195,7 +1195,7 @@ describe('Sailor', () => {

passthroughRequestStub = sandbox
.stub(sailor.objectStorage, 'getAsJSON')
.withArgs(passthroughObjectId, settings.OBJECT_STORAGE_TOKEN)
.withArgs(passthroughObjectId, { jwtPayloadOrToken: settings.OBJECT_STORAGE_TOKEN })
.resolves(passThroughBody);

await sailor.connect();
Expand Down Expand Up @@ -1310,7 +1310,7 @@ describe('Sailor', () => {
});

sandbox.stub(sailor.objectStorage, 'getAsJSON')
.withArgs(passthroughObjectId, settings.OBJECT_STORAGE_TOKEN)
.withArgs(passthroughObjectId, { jwtPayloadOrToken: settings.OBJECT_STORAGE_TOKEN })
.resolves({ passthrough: 'body' });

await sailor.connect();
Expand Down Expand Up @@ -1391,46 +1391,13 @@ describe('Sailor', () => {
expect(sailor.apiClient.tasks.retrieveStep).to.have.been.calledOnce;
expect(fakeAMQPConnection.connect).to.have.been.calledOnce;
sinon.assert.calledTwice(addObjectStub);
sinon.assert.notCalled(fakeAMQPConnection.sendError);
sinon.assert.calledOnce(fakeAMQPConnection.sendData);
sinon.assert.calledWith(
fakeAMQPConnection.sendData,
{
body: { items: [1,2,3,4,5,6] },
headers: {},
passthrough: {
...payload.passthrough,
step_2: {
body: {},
headers: {
[Sailor.OBJECT_ID_HEADER]: passthroughObjectId //reuse already uploaded
}
},
step_1: {
headers: {},
body: { items: [1,2,3,4,5,6] }
}
}
},
expect(fakeAMQPConnection.sendError).to.have.been.calledOnce.and.calledWith(
sinon.match({
compId: '5559edd38968ec0736000456',
containerId: 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948',
end: sinon.match.number,
execId: 'some-exec-id',
function: 'data_trigger',
messageId: sinon.match.string,
parentMessageId: message.properties.headers.messageId,
start: sinon.match.number,
stepId: 'step_1',
taskId: '5559edd38968ec0736000003',
threadId: message.properties.headers.threadId,
userId: '5559edd38968ec0736000002',
workspaceId: '5559edd38968ec073600683'
})
message: 'Lightweight message/passthrough body upload error',
stack: sinon.match.string
}),
);

expect(fakeAMQPConnection.ack).to.have.been.calledOnce.and.calledWith(message);
const [{ headers, passthrough }] = fakeAMQPConnection.sendData.getCall(0).args;
});
});
});
Expand All @@ -1450,7 +1417,7 @@ describe('Sailor', () => {
});

sandbox.stub(sailor.objectStorage, 'getAsJSON')
.withArgs(passthroughObjectId, settings.OBJECT_STORAGE_TOKEN)
.withArgs(passthroughObjectId, { jwtPayloadOrToken: settings.OBJECT_STORAGE_TOKEN })
.resolves({ passthrough: 'body' });

await sailor.connect();
Expand Down
Loading

0 comments on commit 7bd6325

Please sign in to comment.