Skip to content

Commit

Permalink
fix(cu): omit Memory not buffer on result. prevent Cron message on or…
Browse files Browse the repository at this point in the history
…igin block and timestamp
  • Loading branch information
TillaTheHun0 committed Dec 19, 2023
1 parent 5fd26d2 commit f3063fd
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
2 changes: 1 addition & 1 deletion servers/cu/src/domain/client/pouchdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export function findLatestEvaluationWith ({ pouchDb }) {
.chain((doc) => doc ? Resolved(doc) : Rejected(undefined))
/**
* Also retrieve the state buffer, persisted as an attachment
* and set it on the output.buffer field to match the expected output shape
* and set it on the output.Memory field to match the expected output shape
*/
.chain(fromPromise(async (doc) => {
const buffer = await pouchDb.getAttachment(doc._id, 'memory.txt')
Expand Down
16 changes: 15 additions & 1 deletion servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ export const CRON_TAG_REGEX = /^Cron-Tag-(.+)$/
* matches the provided cron
*/
export function isBlockOnCron ({ height, originHeight, cron }) {
/**
* Don't count the origin height as a match
*/
if (height === originHeight) return false

return (height - originHeight) % cron.value === 0
}

Expand All @@ -126,10 +131,15 @@ export function isTimestampOnCron ({ timestamp, originTimestamp, cron }) {
*/
timestamp = Math.floor(timestamp / 1000)
originTimestamp = Math.floor(originTimestamp / 1000)
/**
* don't count the origin timestamp as a match
*/
if (timestamp === originTimestamp) return false
return (timestamp - originTimestamp) % cron.value === 0
}

export function cronMessagesBetweenWith ({
logger,
processId,
owner: processOwner,
tags: processTags,
Expand Down Expand Up @@ -175,7 +185,9 @@ export function cronMessagesBetweenWith ({
*/
for (let i = 0; i < blockBased.length; i++) {
const cron = blockBased[i]

if (isBlockOnCron({ height: curBlock.height, originHeight: originBlock.height, cron })) {
logger('Generating Block based Cron Message for cron "%s" at block "%s"', `${i}-${cron.interval}`, curBlock.height)
yield {
cron: `${i}-${cron.interval}`,
message: {
Expand Down Expand Up @@ -211,6 +223,7 @@ export function cronMessagesBetweenWith ({
const cron = timeBased[i]

if (isTimestampOnCron({ timestamp: curTimestamp, originTimestamp: originBlock.timestamp, cron })) {
logger('Generating Time based Cron Message for cron "%s" at timestamp "%s"', `${i}-${cron.interval}`, curTimestamp)
yield {
cron: `${i}-${cron.interval}`,
message: {
Expand Down Expand Up @@ -305,7 +318,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
logger.tap('Failed to parse crons:'),
ifElse(
length,
logger.tap('Crons found. Generating cron messages accoding to Crons'),
logger.tap('Crons found. Generating cron messages according to Crons'),
logger.tap('No crons found. No cron messages to generate')
)
)
Expand Down Expand Up @@ -356,6 +369,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* that are between those boundaries
*/
genCronMessages: cronMessagesBetweenWith({
logger,
processId: ctx.id,
owner: ctx.owner,
tags: ctx.tags,
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ describe('loadMessages', () => {
*/
]

const cronMessagesBetween = cronMessagesBetweenWith({ processId, owner, originBlock, crons, blocksMeta })
const cronMessagesBetween = cronMessagesBetweenWith({ logger: () => {}, processId, owner, originBlock, crons, blocksMeta })

const genCronMessages = cronMessagesBetween(
// left
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/domain/readResult.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export function readResultWith (env) {
return of({ processId, messageTxId })
.chain(loadMessageMeta)
.chain(res => readState({ processId: res.processId, to: res.timestamp }))
.map(omit(['buffer']))
.map(omit(['Memory']))
.map(
env.logger.tap(
'readResult result for message with txId %s to process %s',
Expand Down

0 comments on commit f3063fd

Please sign in to comment.