From 38aa6c165fc3a12daebf50a50863b3ab48bc4594 Mon Sep 17 00:00:00 2001 From: Vince Juliano Date: Thu, 8 Feb 2024 15:48:40 -0500 Subject: [PATCH 1/4] feat(mu): pull From-Module tag from process data #270 --- scheduler-utils/src/client/in-memory.js | 22 ++-------- servers/mu/package-lock.json | 1 + servers/mu/package.json | 1 + servers/mu/src/domain/clients/in-memory.js | 43 +++++++++++++++++++ servers/mu/src/domain/clients/scheduler.js | 4 +- servers/mu/src/domain/index.js | 1 + servers/mu/src/domain/lib/main.js | 9 ++-- .../domain/lib/processDataItem/build-tx.js | 26 ++++++++--- .../lib/processDataItem/build-tx.test.js | 14 ++++-- 9 files changed, 87 insertions(+), 34 deletions(-) create mode 100644 servers/mu/src/domain/clients/in-memory.js diff --git a/scheduler-utils/src/client/in-memory.js b/scheduler-utils/src/client/in-memory.js index fe3a8131e..55e724969 100644 --- a/scheduler-utils/src/client/in-memory.js +++ b/scheduler-utils/src/client/in-memory.js @@ -29,29 +29,15 @@ export function createLruCache ({ size }) { } export function getByProcessWith ({ cache = internalCache }) { - return async (process) => { + return async (processId) => { if (!internalSize) return - return cache.get(process) + return cache.get(processId) } } export function setByProcessWith ({ cache = internalCache }) { - return async (process, { url, address }, ttl) => { + return async (processId, processData, ttl) => { if (!internalSize) return - return cache.set(process, { url, address }, { ttl }) - } -} - -export function getByOwnerWith ({ cache = internalCache }) { - return async (owner) => { - if (!internalSize) return - return cache.get(owner) - } -} - -export function setByOwnerWith ({ cache = internalCache }) { - return async (owner, url, ttl) => { - if (!internalSize) return - return cache.set(owner, { url, address: owner }, { ttl }) + return cache.set(processId, processData, { ttl }) } } diff --git a/servers/mu/package-lock.json b/servers/mu/package-lock.json index 91e83b25e..3beb52d92 100644 --- a/servers/mu/package-lock.json +++ b/servers/mu/package-lock.json @@ -18,6 +18,7 @@ "express": "^4.18.2", "heapdump": "^0.3.15", "hyper-async": "^1.1.2", + "lru-cache": "^10.2.0", "pg": "^8.11.3", "pg-promise": "^11.5.4", "ramda": "^0.29.1", diff --git a/servers/mu/package.json b/servers/mu/package.json index 0c9d105ba..9dccc23f7 100644 --- a/servers/mu/package.json +++ b/servers/mu/package.json @@ -31,6 +31,7 @@ "express": "^4.18.2", "heapdump": "^0.3.15", "hyper-async": "^1.1.2", + "lru-cache": "^10.2.0", "pg": "^8.11.3", "pg-promise": "^11.5.4", "ramda": "^0.29.1", diff --git a/servers/mu/src/domain/clients/in-memory.js b/servers/mu/src/domain/clients/in-memory.js new file mode 100644 index 000000000..55e724969 --- /dev/null +++ b/servers/mu/src/domain/clients/in-memory.js @@ -0,0 +1,43 @@ +import { LRUCache } from 'lru-cache' + +/** + * @type {LRUCache} + */ +let internalCache +let internalSize +export function createLruCache ({ size }) { + if (internalCache) return internalCache + internalSize = size + internalCache = new LRUCache({ + /** + * number of entries + */ + max: size, + /** + * max size of cache, as a scalar. + * + * In our case, characters (see sizeCalculation) + */ + maxSize: 1_000_000 * 5, + /** + * Simply stringify to get the bytes + */ + sizeCalculation: (v) => JSON.stringify(v).length, + allowStale: true + }) + return internalCache +} + +export function getByProcessWith ({ cache = internalCache }) { + return async (processId) => { + if (!internalSize) return + return cache.get(processId) + } +} + +export function setByProcessWith ({ cache = internalCache }) { + return async (processId, processData, ttl) => { + if (!internalSize) return + return cache.set(processId, processData, { ttl }) + } +} diff --git a/servers/mu/src/domain/clients/scheduler.js b/servers/mu/src/domain/clients/scheduler.js index 7f363a111..81760cdc6 100644 --- a/servers/mu/src/domain/clients/scheduler.js +++ b/servers/mu/src/domain/clients/scheduler.js @@ -34,7 +34,7 @@ function writeDataItemWith ({ fetch, logger }) { } } -function fetchSequencerProcessWith ({ logger }) { +function fetchSchedulerProcessWith ({ fetch, logger }) { return async (processId, suUrl) => { logger(`${suUrl}/processes/${processId}`) @@ -46,5 +46,5 @@ function fetchSequencerProcessWith ({ logger }) { export default { writeDataItemWith, - fetchSequencerProcessWith + fetchSchedulerProcessWith } diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js index a2dfdd236..c6ecceec5 100644 --- a/servers/mu/src/domain/index.js +++ b/servers/mu/src/domain/index.js @@ -57,6 +57,7 @@ export const createApis = (ctx) => { locateScheduler: raw, locateProcess: locate, writeDataItem: schedulerClient.writeDataItemWith({ fetch, logger: processMsgLogger }), + fetchSchedulerProcess: schedulerClient.fetchSchedulerProcessWith({ fetch, logger: processMsgLogger }), buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }), fetchResult: cuClient.resultWith({ fetch, CU_URL, logger: processMsgLogger }), saveMsg: dataStoreClient.saveMsgWith({ dbInstance, logger: processMsgLogger }), diff --git a/servers/mu/src/domain/lib/main.js b/servers/mu/src/domain/lib/main.js index ba51dc1c4..7410d395d 100644 --- a/servers/mu/src/domain/lib/main.js +++ b/servers/mu/src/domain/lib/main.js @@ -160,9 +160,10 @@ export function processMsgWith ({ findLatestSpawns, deleteMsg, logger, - writeDataItemArweave + writeDataItemArweave, + fetchSchedulerProcess }) { - const buildTx = buildTxWith({ buildAndSign, logger }) + const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess }) const getCuAddress = getCuAddressWith({ selectNode, logger }) const writeMessage = writeMessageTxWith({ writeDataItem, locateProcess, logger, writeDataItemArweave }) const fetchAndSaveResult = fetchAndSaveResultWith({ fetchResult, saveMsg, saveSpawn, findLatestMsgs, findLatestSpawns, logger }) @@ -237,7 +238,7 @@ export function crankMsgsWith ({ export function monitorProcessWith ({ logger, createDataItem, - startProcessMonitor, + startProcessMonitor }) { const parseDataItem = parseDataItemWith({ createDataItem, logger }) const start = startWith({ logger, startProcessMonitor }) @@ -252,7 +253,7 @@ export function monitorProcessWith ({ export function stopMonitorProcessWith ({ logger, createDataItem, - stopProcessMonitor, + stopProcessMonitor }) { const parseDataItem = parseDataItemWith({ createDataItem, logger }) const stop = stopWith({ logger, stopProcessMonitor }) diff --git a/servers/mu/src/domain/lib/processDataItem/build-tx.js b/servers/mu/src/domain/lib/processDataItem/build-tx.js index 5e6fdff04..222217f43 100644 --- a/servers/mu/src/domain/lib/processDataItem/build-tx.js +++ b/servers/mu/src/domain/lib/processDataItem/build-tx.js @@ -10,30 +10,42 @@ const ctxSchema = z.object({ }) }).passthrough() -export function buildTxWith ({ buildAndSign, logger }) { +export function buildTxWith (env) { + let { buildAndSign, logger, locateProcess, fetchSchedulerProcess } = env + + locateProcess = fromPromise(locateProcess) + fetchSchedulerProcess = fromPromise(fetchSchedulerProcess) + buildAndSign = fromPromise(buildAndSign) + return (ctx) => { return of(ctx) .map(tap(() => ctx.tracer.trace('Building and signing message from outbox'))) - .chain(fromPromise(() => { + .chain(() => locateProcess(ctx.cachedMsg.processId)) + .chain((res) => fetchSchedulerProcess(ctx.cachedMsg.processId, res.url)) + .map((res) => { const tagsIn = [ ...ctx.cachedMsg.msg.Tags?.filter((tag) => { - return !['Data-Protocol', 'Type', 'Variant', 'From-Process'].includes(tag.name) + return !['Data-Protocol', 'Type', 'Variant', 'From-Process', 'From-Module'].includes(tag.name) }) ?? [], { name: 'Data-Protocol', value: 'ao' }, { name: 'Type', value: 'Message' }, { name: 'Variant', value: 'ao.TN.1' }, - { name: 'From-Process', value: ctx.cachedMsg.processId } + { name: 'From-Process', value: ctx.cachedMsg.processId }, + { name: 'From-Module', value: res.tags.find((t) => t.name === 'Module')?.value ?? '' } ] if (ctx.cachedMsg.initialTxId) { tagsIn.push({ name: 'Pushed-For', value: ctx.cachedMsg.initialTxId }) } - return buildAndSign({ + return tagsIn + }) + .chain( + (tags) => buildAndSign({ processId: ctx.cachedMsg.msg.Target, - tags: tagsIn, + tags, anchor: ctx.cachedMsg.msg.Anchor, data: ctx.cachedMsg.msg.Data }) - })) + ) .map(assoc('tx', __, ctx)) .map(ctxSchema.parse) .map(logger.tap('Added tx to ctx')) diff --git a/servers/mu/src/domain/lib/processDataItem/build-tx.test.js b/servers/mu/src/domain/lib/processDataItem/build-tx.test.js index be052eee8..cb376e58e 100644 --- a/servers/mu/src/domain/lib/processDataItem/build-tx.test.js +++ b/servers/mu/src/domain/lib/processDataItem/build-tx.test.js @@ -21,18 +21,26 @@ async function buildAndSign ({ processId, tags, anchor }) { return { id: 'id-1', - // the real function doesnt return tags as data - // doing this here for testing data: Buffer.alloc(0), processId } } +async function locateProcess () { + return { url: 'sched-url' } +} + +async function fetchSchedulerProcess () { + return { tags: [{ name: 'Module', value: 'mod-1' }] } +} + describe('buildTx', () => { test('build and sign a tx from a cached msg', async () => { const buildTx = buildTxWith({ buildAndSign, - logger + logger, + locateProcess, + fetchSchedulerProcess }) const result = await buildTx({ From cd9bebce3d6471f90a4bc51c24b1e203efc3db85 Mon Sep 17 00:00:00 2001 From: Vince Juliano Date: Fri, 9 Feb 2024 11:49:52 -0500 Subject: [PATCH 2/4] feat(mu): add lru cache to scheduler fetch process --- servers/mu/src/domain/clients/scheduler.js | 27 +++++++++++++++++----- servers/mu/src/domain/index.js | 9 ++++++-- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/servers/mu/src/domain/clients/scheduler.js b/servers/mu/src/domain/clients/scheduler.js index 81760cdc6..163f4159c 100644 --- a/servers/mu/src/domain/clients/scheduler.js +++ b/servers/mu/src/domain/clients/scheduler.js @@ -34,13 +34,28 @@ function writeDataItemWith ({ fetch, logger }) { } } -function fetchSchedulerProcessWith ({ fetch, logger }) { - return async (processId, suUrl) => { - logger(`${suUrl}/processes/${processId}`) +function fetchSchedulerProcessWith ({ fetch, logger, setByProcess, getByProcess }) { + return (processId, suUrl) => { + return getByProcess(processId) + .then(cached => { + if (cached) { + logger(`cached process found ${processId}`) + return cached + } - return fetch(`${suUrl}/processes/${processId}`) - .then(res => res.json()) - .then(res => res || {}) + logger(`${suUrl}/processes/${processId}`) + + return fetch(`${suUrl}/processes/${processId}`) + .then(res => res.json()) + .then(res => { + if (res) { + return setByProcess(processId, res).then(() => res) + } + }) + }) + .catch((e) => { + logger(`error fetching process ${e}`) + }) } } diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js index c6ecceec5..fa731a616 100644 --- a/servers/mu/src/domain/index.js +++ b/servers/mu/src/domain/index.js @@ -8,6 +8,7 @@ import schedulerClient from './clients/scheduler.js' import signerClient from './clients/signer.js' import uploaderClient from './clients/uploader.js' import osClient from './clients/os.js' +import * as InMemoryClient from './clients/in-memory.js' import dataStoreClient from './lib/datastore.js' import { processMsgWith, crankMsgsWith, processSpawnWith, monitorProcessWith, stopMonitorProcessWith, sendDataItemWith, traceMsgsWith } from './lib/main.js' @@ -43,13 +44,17 @@ export const createApis = (ctx) => { const logger = ctx.logger const fetch = ctx.fetch - const { locate, raw } = schedulerUtilsConnect({ cacheSize: 100, GATEWAY_URL: ctx.GATEWAY_URL, followRedirects: true }) + const { locate, raw } = schedulerUtilsConnect({ cacheSize: 500, GATEWAY_URL: ctx.GATEWAY_URL, followRedirects: true }) /** * hate side effects like this, see TODO in ./dbInstance.js */ createDbClient({ MU_DATABASE_URL }) + const cache = InMemoryClient.createLruCache({ size: 500 }) + const getByProcess = InMemoryClient.getByProcessWith({ cache }) + const setByProcess = InMemoryClient.setByProcessWith({ cache }) + const processMsgLogger = logger.child('processMsg') const processMsg = processMsgWith({ selectNode: cuClient.selectNodeWith({ CU_URL, logger: processMsgLogger }), @@ -57,7 +62,7 @@ export const createApis = (ctx) => { locateScheduler: raw, locateProcess: locate, writeDataItem: schedulerClient.writeDataItemWith({ fetch, logger: processMsgLogger }), - fetchSchedulerProcess: schedulerClient.fetchSchedulerProcessWith({ fetch, logger: processMsgLogger }), + fetchSchedulerProcess: schedulerClient.fetchSchedulerProcessWith({ getByProcess, setByProcess, fetch, logger: processMsgLogger }), buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }), fetchResult: cuClient.resultWith({ fetch, CU_URL, logger: processMsgLogger }), saveMsg: dataStoreClient.saveMsgWith({ dbInstance, logger: processMsgLogger }), From 71015272a681fc995bd48ca94283a295914451d6 Mon Sep 17 00:00:00 2001 From: Vince Juliano Date: Fri, 9 Feb 2024 12:01:56 -0500 Subject: [PATCH 3/4] fix(mu): remove ttl from lru cache #270 --- scheduler-utils/src/client/in-memory.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler-utils/src/client/in-memory.js b/scheduler-utils/src/client/in-memory.js index 55e724969..84047bfe0 100644 --- a/scheduler-utils/src/client/in-memory.js +++ b/scheduler-utils/src/client/in-memory.js @@ -36,8 +36,8 @@ export function getByProcessWith ({ cache = internalCache }) { } export function setByProcessWith ({ cache = internalCache }) { - return async (processId, processData, ttl) => { + return async (processId, processData) => { if (!internalSize) return - return cache.set(processId, processData, { ttl }) + return cache.set(processId, processData) } } From 6d1c1d9051ee91810dca8e6770caf1e2f9d7199a Mon Sep 17 00:00:00 2001 From: Vince Juliano Date: Fri, 9 Feb 2024 12:09:05 -0500 Subject: [PATCH 4/4] fix(scheduler-utils): fix accidental change in scheduler utils --- scheduler-utils/src/client/in-memory.js | 22 ++++++++++++++++++---- servers/mu/src/domain/clients/in-memory.js | 4 ++-- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/scheduler-utils/src/client/in-memory.js b/scheduler-utils/src/client/in-memory.js index 84047bfe0..fe3a8131e 100644 --- a/scheduler-utils/src/client/in-memory.js +++ b/scheduler-utils/src/client/in-memory.js @@ -29,15 +29,29 @@ export function createLruCache ({ size }) { } export function getByProcessWith ({ cache = internalCache }) { - return async (processId) => { + return async (process) => { if (!internalSize) return - return cache.get(processId) + return cache.get(process) } } export function setByProcessWith ({ cache = internalCache }) { - return async (processId, processData) => { + return async (process, { url, address }, ttl) => { if (!internalSize) return - return cache.set(processId, processData) + return cache.set(process, { url, address }, { ttl }) + } +} + +export function getByOwnerWith ({ cache = internalCache }) { + return async (owner) => { + if (!internalSize) return + return cache.get(owner) + } +} + +export function setByOwnerWith ({ cache = internalCache }) { + return async (owner, url, ttl) => { + if (!internalSize) return + return cache.set(owner, { url, address: owner }, { ttl }) } } diff --git a/servers/mu/src/domain/clients/in-memory.js b/servers/mu/src/domain/clients/in-memory.js index 55e724969..84047bfe0 100644 --- a/servers/mu/src/domain/clients/in-memory.js +++ b/servers/mu/src/domain/clients/in-memory.js @@ -36,8 +36,8 @@ export function getByProcessWith ({ cache = internalCache }) { } export function setByProcessWith ({ cache = internalCache }) { - return async (processId, processData, ttl) => { + return async (processId, processData) => { if (!internalSize) return - return cache.set(processId, processData, { ttl }) + return cache.set(processId, processData) } }