Skip to content
This repository has been archived by the owner on Aug 12, 2023. It is now read-only.

Commit

Permalink
Introduce job for indexing fill traders (#458)
Browse files Browse the repository at this point in the history
* Add new job type to constants

* Start creating job for indexing fill traders

* Finish test suite and fix a couple of bugs

* Index traders after fill creation
  • Loading branch information
cbovis authored Sep 19, 2020
1 parent 18a1854 commit 4abbb53
Show file tree
Hide file tree
Showing 7 changed files with 659 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ module.exports = {
INDEX_APP_FILL_ATTRIBUTONS: 'index-app-fill-attributions',
INDEX_FILL: 'index-fill',
INDEX_FILL_PROTOCOL_FEE: 'index-fill-protocol-fee',
INDEX_FILL_TRADERS: 'index-fill-traders',
INDEX_FILL_VALUE: 'index-fill-value',
INDEX_TRADED_TOKENS: 'index-traded-tokens',
},
Expand Down
113 changes: 113 additions & 0 deletions src/consumers/index-fill-traders/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
const _ = require('lodash');

const { JOB, QUEUE } = require('../../constants');
const { publishJob } = require('../../queues');
const elasticsearch = require('../../util/elasticsearch');
const getAddressMetadata = require('../../addresses/get-address-metadata');
const getTransactionByHash = require('../../transactions/get-transaction-by-hash');

const indexFillTraders = async (job, { logger }) => {
const delayJobProcessing = async () => {
await publishJob(QUEUE.INDEXING, JOB.INDEX_FILL_TRADERS, job.data, {
delay: 30000,
});
};

const {
fillDate,
fillId,
fillValue,
maker,
taker,
tradeCount,
transactionHash,
} = job.data;

const takerMetadata = await getAddressMetadata(taker);

if (takerMetadata === null || takerMetadata.isContract === undefined) {
logger.warn(`taker address type is unknown: ${taker}`);
await delayJobProcessing();

return;
}

let transaction;

if (takerMetadata.isContract) {
transaction = await getTransactionByHash(transactionHash);

if (transaction === null) {
logger.warn(`transaction has not been fetched: ${transactionHash}`);
await delayJobProcessing();

return;
}
}

const tradeValue =
fillValue === undefined ? undefined : fillValue * tradeCount;

const requestBody = [
JSON.stringify({
index: {
_id: `${fillId}_maker`,
},
}),
JSON.stringify({
address: maker,
fillId,
date: fillDate,
makerFillCount: 1,
makerFillValue: fillValue,
makerTradeCount: tradeCount,
makerTradeValue: tradeValue,
totalFillCount: 1,
totalFillValue: fillValue,
totalTradeCount: tradeCount,
totalTradeValue: tradeValue,
updatedAt: new Date().toISOString(),
}),
JSON.stringify({
index: {
_id: `${fillId}_taker`,
},
}),
JSON.stringify({
address: takerMetadata.isContract ? transaction.from : taker,
fillId,
date: fillDate,
takerFillCount: 1,
takerFillValue: fillValue,
takerTradeCount: tradeCount,
takerTradeValue: tradeValue,
totalFillCount: 1,
totalFillValue: fillValue,
totalTradeCount: tradeCount,
totalTradeValue: tradeValue,
updatedAt: new Date().toISOString(),
}),
].join('\n');

const result = await elasticsearch
.getClient()
.bulk({ body: `${requestBody}\n`, index: 'trader_fills' });

if (result.body.errors === true) {
const errorMessage = _.get(
result,
'body.items[0].index.error.reason',
`Failed to index trader_fills`,
);

throw new Error(errorMessage);
}

logger.info(`indexed fill traders: ${fillId}`);
};

module.exports = {
fn: indexFillTraders,
jobName: JOB.INDEX_FILL_TRADERS,
queueName: QUEUE.INDEXING,
};
Loading

0 comments on commit 4abbb53

Please sign in to comment.