Skip to content

Commit

Permalink
chore: no halting 🚀 (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite authored Nov 17, 2022
1 parent 02b8532 commit 3b4bbf3
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 23 deletions.
9 changes: 9 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,12 @@ type Contract @entity {
storeMessage: StoreContractMessage!
instantiateMessage: InstantiateContractMessage!
}

type UnprocessedEntity @entity {
id: ID!
error: String!
event: Event
message: Message
transaction: Transaction
block: Block
}
6 changes: 5 additions & 1 deletion src/mappings/authz/exec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {AuthzExecMsg} from "../types";
import {CosmosMessage} from "@subql/types-cosmos";

import {AuthzExec, AuthzExecMessage, Message} from "../../types";
import allModuleTypes from "../../cosmjs/proto";

export async function handleAuthzExec(msg: CosmosMessage<AuthzExecMsg>): Promise<void> {
await attemptHandling(msg, _handleAuthzExec, unprocessedEventHandler);
}

async function _handleAuthzExec(msg: CosmosMessage<AuthzExecMsg>): Promise<void> {
logger.info(`[handleAuthzExec] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleAuthzExec] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);

Expand Down
17 changes: 15 additions & 2 deletions src/mappings/bank/balanceChange.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {CosmosEvent} from "@subql/types-cosmos";
import {NativeBalanceChange, Transaction} from "../../types";
import {checkBalancesAccount, messageId} from "../utils";
import {
attemptHandling,
checkBalancesAccount,
messageId,
unprocessedEventHandler
} from "../utils";
import {parseCoins} from "../../cosmjs/utils";

export async function saveNativeBalanceEvent(id: string, address: string, amount: bigint, denom: string, event: CosmosEvent) {
Expand Down Expand Up @@ -28,6 +33,14 @@ async function saveNativeFeesEvent(event: CosmosEvent) {
}

export async function handleNativeBalanceDecrement(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeBalanceDecrement, unprocessedEventHandler);
}

export async function handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeBalanceDecrement, unprocessedEventHandler);
}

async function _handleNativeBalanceDecrement(event: CosmosEvent): Promise<void> {
logger.info(`[handleNativeBalanceDecrement] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleNativeBalanceDecrement] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleNativeBalanceDecrement] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down Expand Up @@ -65,7 +78,7 @@ export async function handleNativeBalanceDecrement(event: CosmosEvent): Promise<
await saveNativeFeesEvent(event);
}

export async function handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
async function _handleNativeBalanceIncrement(event: CosmosEvent): Promise<void> {
logger.info(`[handleNativeBalanceIncrement] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleNativeBalanceIncrement] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleNativeBalanceIncrement] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down
6 changes: 5 additions & 1 deletion src/mappings/bank/transfer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {NativeTransferMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {NativeTransfer} from "../../types";

export async function handleNativeTransfer(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleNativeTransfer, unprocessedEventHandler);
}

async function _handleNativeTransfer(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<NativeTransferMsg> = event.msg;
logger.info(`[handleNativeTransfer] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleNativeTransfer] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
16 changes: 14 additions & 2 deletions src/mappings/dist/rewards.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {DistDelegatorClaimMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {DistDelegatorClaim} from "../../types";
import {parseCoins} from "../../cosmjs/utils";

export async function handleDistDelegatorClaim(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleDistDelegatorClaim,
unprocessedEventHandler);
}

export async function handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleDelegatorWithdrawRewardEvent,
unprocessedEventHandler);
}

async function _handleDistDelegatorClaim(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<DistDelegatorClaimMsg> = event.msg;
logger.info(`[handleDistDelegatorClaim] (tx ${msg.tx.hash}): indexing DistDelegatorClaim ${messageId(msg)}`);
logger.debug(`[handleDistDelegatorClaim] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down Expand Up @@ -36,7 +48,7 @@ export async function handleDistDelegatorClaim(event: CosmosEvent): Promise<void
await claim.save();
}

export async function handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
async function _handleDelegatorWithdrawRewardEvent(event: CosmosEvent): Promise<void> {
logger.debug(`[handleDelegateWithdrawRewardEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleDelegateWithdrawRewardEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);

Expand Down
6 changes: 5 additions & 1 deletion src/mappings/gov/votes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {GovProposalVoteMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";
import {GovProposalVote, GovProposalVoteOption} from "../../types";

export async function handleGovProposalVote(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleGovProposalVote, unprocessedEventHandler);
}

async function _handleGovProposalVote(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<GovProposalVoteMsg> = event.msg;
logger.info(`[handleGovProposalVote] (tx ${msg.tx.hash}): indexing GovProposalVote ${messageId(msg)}`);
logger.debug(`[handleGovProposalVote] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
6 changes: 5 additions & 1 deletion src/mappings/ibc/transfer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import {CosmosEvent} from "@subql/types-cosmos";
import {IbcTransfer} from "../../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, unprocessedEventHandler} from "../utils";

export async function handleIBCTransfer(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleIBCTransfer, unprocessedEventHandler);
}

async function _handleIBCTransfer(event: CosmosEvent): Promise<void> {
const msg = event.msg;
logger.info(`[handleIBCTransfer] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleIBCTransfer] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down
44 changes: 40 additions & 4 deletions src/mappings/primitives.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
import {CosmosBlock, CosmosEvent, CosmosMessage, CosmosTransaction} from "@subql/types-cosmos";
import {Block, Event, Message, Transaction, TxStatus} from "../types";
import {messageId} from "./utils";
import {
attemptHandling,
messageId,
primitivesFromMsg,
primitivesFromTx,
trackUnprocessed,
unprocessedEventHandler
} from "./utils";
import {createHash} from "crypto";
import {toBech32} from "@cosmjs/encoding";

export async function handleBlock(block: CosmosBlock): Promise<void> {
await attemptHandling(block, _handleBlock, _handleBlockError);
}

export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
await attemptHandling(tx, _handleTransaction, _handleTransactionError);
}

export async function handleMessage(msg: CosmosMessage): Promise<void> {
await attemptHandling(msg, _handleMessage, _handleMessageError);
}

export async function handleEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleEvent, unprocessedEventHandler);
}

async function _handleBlock(block: CosmosBlock): Promise<void> {
logger.info(`[handleBlock] (block.header.height): indexing block ${block.block.header.height}`);

const {id, header: {chainId, height, time}} = block.block;
Expand All @@ -19,7 +42,7 @@ export async function handleBlock(block: CosmosBlock): Promise<void> {
await blockEntity.save();
}

export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
async function _handleTransaction(tx: CosmosTransaction): Promise<void> {
logger.info(`[handleTransaction] (block ${tx.block.block.header.height}): indexing transaction ${tx.idx + 1} / ${tx.block.txs.length}`);
logger.debug(`[handleTransaction] (tx.decodedTx): ${JSON.stringify(tx.decodedTx, null, 2)}`);
logger.debug(`[handleTransaction] (tx.tx.log): ${tx.tx.log}`);
Expand Down Expand Up @@ -67,7 +90,7 @@ export async function handleTransaction(tx: CosmosTransaction): Promise<void> {
await txEntity.save();
}

export async function handleMessage(msg: CosmosMessage): Promise<void> {
async function _handleMessage(msg: CosmosMessage): Promise<void> {
logger.info(`[handleMessage] (tx ${msg.tx.hash}): indexing message ${msg.idx + 1} / ${msg.tx.decodedTx.body.messages.length}`);
logger.debug(`[handleMessage] (msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
delete msg.msg?.decodedMsg?.wasmByteCode;
Expand All @@ -83,7 +106,7 @@ export async function handleMessage(msg: CosmosMessage): Promise<void> {
await msgEntity.save();
}

export async function handleEvent(event: CosmosEvent): Promise<void> {
async function _handleEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleEvent] (tx ${event.tx.hash}): indexing event ${event.idx + 1} / ${event.tx.tx.events.length}`);
logger.debug(`[handleEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand All @@ -105,3 +128,16 @@ export async function handleEvent(event: CosmosEvent): Promise<void> {

await eventEntity.save();
}

async function _handleBlockError(err: Error, _: CosmosBlock): Promise<void> {
// NB: we won't have persisted any related entities yet.
await trackUnprocessed(err, {});
}

async function _handleTransactionError(err: Error, tx: CosmosTransaction): Promise<void> {
await trackUnprocessed(err, primitivesFromTx(tx));
}

async function _handleMessageError(err: Error, msg: CosmosMessage): Promise<void> {
await trackUnprocessed(err, primitivesFromMsg(msg));
}
77 changes: 74 additions & 3 deletions src/mappings/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {Account, Interface} from "../types";
import {CosmosBlock, CosmosEvent, CosmosMessage, CosmosTransaction} from "@subql/types-cosmos";
import {Account, Interface, UnprocessedEntity} from "../types";
import { createHash } from "crypto";

// messageId returns the id of the message passed or
// that of the message which generated the event passed.
Expand Down Expand Up @@ -42,6 +43,76 @@ export function getJaccardResult(payload: object): Interface {
return prediction.getInterface(); // return best matched Interface to contract
}

export type Primitive = CosmosEvent | CosmosMessage | CosmosTransaction | CosmosBlock;

export interface Primitives {
event?: CosmosEvent;
msg?: CosmosMessage;
tx?: CosmosTransaction;
block?: CosmosBlock;
}

export async function attemptHandling(input: Primitive,
handlerFn: (primitive) => Promise<void>,
errorFn: (Error, Primitive) => void): Promise<void> {
try {
await handlerFn(input);
} catch (error) {
errorFn(error, input);
}
}

export async function unprocessedEventHandler(err: Error, event: CosmosEvent): Promise<void> {
await trackUnprocessed(err, primitivesFromEvent(event));
}

export function primitivesFromTx(tx: CosmosTransaction): Primitives {
return {block: tx.block, tx: tx};
}

export function primitivesFromMsg(msg: CosmosMessage): Primitives {
return {block: msg.block, tx: msg.tx};
}

export function primitivesFromEvent(event: CosmosEvent): Primitives {
return {block: event.block, tx: event.tx};
}

export async function trackUnprocessed(error: Error, primitives: Primitives): Promise<void> {
logger.warn(`[trackUnprocessable] (error.message): ${error.message}`);
// NB: failsafe try/catch
try {
const {event, msg, tx, block} = primitives;
const sha256 = createHash("sha256");
// NB: use error stack if no primitives available (i.e. block handler).
const hashInput = event ?
messageId(event) : msg ?
messageId(msg) : tx ?
tx.hash : block ?
block.block.id : error.stack;
sha256.write(hashInput);
sha256.end();
// NB: ID is base64 encoded representation of the sha256 of `raw`.
const id = sha256.read().toString("base64");
const eventId = event ? messageId(event) : undefined;
const _messageId = event ? messageId(event) : undefined;
const transactionId = tx ? tx.hash : undefined;

const unprocessedEntity = UnprocessedEntity.create({
id,
error: error.stack,
eventId,
messageId: _messageId,
transactionId,
blockId: block.block.id,
});
return await unprocessedEntity.save();
} catch {
logger.error("[trackUnprocessable] (ERROR): unable to persist unprocessable entity");
logger.error(`[trackUnprocessable] (ERROR | stack): ${error.stack}`);
}
}

class Structure {
static getInterface() {
return Interface.Uncertain;
Expand Down Expand Up @@ -94,4 +165,4 @@ class LegacyBridgeSwapStructure extends Structure {
static getInterface() {
return Interface.LegacyBridgeSwap;
}
}
}
10 changes: 9 additions & 1 deletion src/mappings/wasm/bridge.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {LegacyBridgeSwapMsg} from "../types";
import {messageId} from "../utils";
import {attemptHandling, messageId, primitivesFromEvent, trackUnprocessed} from "../utils";
import {LegacyBridgeSwap} from "../../types";

export async function handleLegacyBridgeSwap(event: CosmosEvent): Promise<void> {
await attemptHandling(event, _handleLegacyBridgeSwap, _handleLegacyBridgeSwapError);
}

async function _handleLegacyBridgeSwap(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<LegacyBridgeSwapMsg> = event.msg;
const id = messageId(msg);
logger.info(`[handleLegacyBridgeSwap] (tx ${msg.tx.hash}): indexing LegacyBridgeSwap ${id}`);
Expand Down Expand Up @@ -38,3 +42,7 @@ export async function handleLegacyBridgeSwap(event: CosmosEvent): Promise<void>

await legacySwap.save();
}

async function _handleLegacyBridgeSwapError(err: Error, event: CosmosEvent): Promise<void> {
await trackUnprocessed(err, primitivesFromEvent(event));
}
29 changes: 26 additions & 3 deletions src/mappings/wasm/contracts.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {CosmosEvent, CosmosMessage} from "@subql/types-cosmos";
import {ExecuteContractMsg} from "../types";
import {getJaccardResult, messageId} from "../utils";
import {
attemptHandling,
getJaccardResult,
messageId,
unprocessedEventHandler
} from "../utils";
import {
Contract,
ExecuteContractMessage,
Expand All @@ -9,6 +14,24 @@ import {
} from "../../types/";

export async function handleExecuteContractEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleExecuteContractEvent,
unprocessedEventHandler);
}

export async function handleContractStoreEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleContractStoreEvent,
unprocessedEventHandler);
}

export async function handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
await attemptHandling(event,
_handleContractInstantiateEvent,
unprocessedEventHandler);
}

async function _handleExecuteContractEvent(event: CosmosEvent): Promise<void> {
const msg: CosmosMessage<ExecuteContractMsg> = event.msg;
logger.info(`[handleExecuteContractMessage] (tx ${msg.tx.hash}): indexing ExecuteContractMessage ${messageId(msg)}`);
logger.debug(`[handleExecuteContractMessage] (event.msg.msg): ${JSON.stringify(msg.msg, null, 2)}`);
Expand Down Expand Up @@ -37,7 +60,7 @@ export async function handleExecuteContractEvent(event: CosmosEvent): Promise<vo
await msgEntity.save();
}

export async function handleContractStoreEvent(event: CosmosEvent): Promise<void> {
async function _handleContractStoreEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleContractStoreEvent] (tx ${event.msg.tx.hash}): indexing event ${messageId(event.msg)}`);
logger.debug(`[handleContractStoreEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleContractStoreEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down Expand Up @@ -67,7 +90,7 @@ export async function handleContractStoreEvent(event: CosmosEvent): Promise<void
await storeMsg.save();
}

export async function handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
async function _handleContractInstantiateEvent(event: CosmosEvent): Promise<void> {
logger.info(`[handleContractInstantiateEvent] (tx ${event.msg.tx.hash}): indexing event ${messageId(event.msg)}`);
logger.debug(`[handleContractInstantiateEvent] (event.event): ${JSON.stringify(event.event, null, 2)}`);
logger.debug(`[handleContractInstantiateEvent] (event.log): ${JSON.stringify(event.log, null, 2)}`);
Expand Down
Loading

0 comments on commit 3b4bbf3

Please sign in to comment.