Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vince juliano/mu from module 270 #447

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions servers/mu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions servers/mu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"express": "^4.18.2",
"heapdump": "^0.3.15",
"hyper-async": "^1.1.2",
"lru-cache": "^10.2.0",
"pg": "^8.11.3",
"pg-promise": "^11.5.4",
"ramda": "^0.29.1",
Expand Down
43 changes: 43 additions & 0 deletions servers/mu/src/domain/clients/in-memory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { LRUCache } from 'lru-cache'

/**
* @type {LRUCache}
*/
let internalCache
let internalSize
export function createLruCache ({ size }) {
if (internalCache) return internalCache
internalSize = size
internalCache = new LRUCache({
/**
* number of entries
*/
max: size,
/**
* max size of cache, as a scalar.
*
* In our case, characters (see sizeCalculation)
*/
maxSize: 1_000_000 * 5,
/**
* Simply stringify to get the bytes
*/
sizeCalculation: (v) => JSON.stringify(v).length,
allowStale: true
})
return internalCache
}

export function getByProcessWith ({ cache = internalCache }) {
return async (processId) => {
if (!internalSize) return
return cache.get(processId)
}
}

export function setByProcessWith ({ cache = internalCache }) {
return async (processId, processData) => {
if (!internalSize) return
return cache.set(processId, processData)
}
}
29 changes: 22 additions & 7 deletions servers/mu/src/domain/clients/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,32 @@ function writeDataItemWith ({ fetch, logger }) {
}
}

function fetchSequencerProcessWith ({ logger }) {
return async (processId, suUrl) => {
logger(`${suUrl}/processes/${processId}`)
function fetchSchedulerProcessWith ({ fetch, logger, setByProcess, getByProcess }) {
return (processId, suUrl) => {
return getByProcess(processId)
.then(cached => {
if (cached) {
logger(`cached process found ${processId}`)
return cached
}

return fetch(`${suUrl}/processes/${processId}`)
.then(res => res.json())
.then(res => res || {})
logger(`${suUrl}/processes/${processId}`)

return fetch(`${suUrl}/processes/${processId}`)
.then(res => res.json())
.then(res => {
if (res) {
return setByProcess(processId, res).then(() => res)
}
})
})
.catch((e) => {
logger(`error fetching process ${e}`)
})
}
}

export default {
writeDataItemWith,
fetchSequencerProcessWith
fetchSchedulerProcessWith
}
8 changes: 7 additions & 1 deletion servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import schedulerClient from './clients/scheduler.js'
import signerClient from './clients/signer.js'
import uploaderClient from './clients/uploader.js'
import osClient from './clients/os.js'
import * as InMemoryClient from './clients/in-memory.js'

import dataStoreClient from './lib/datastore.js'
import { processMsgWith, crankMsgsWith, processSpawnWith, monitorProcessWith, stopMonitorProcessWith, sendDataItemWith, traceMsgsWith } from './lib/main.js'
Expand Down Expand Up @@ -43,20 +44,25 @@ export const createApis = (ctx) => {
const logger = ctx.logger
const fetch = ctx.fetch

const { locate, raw } = schedulerUtilsConnect({ cacheSize: 100, GATEWAY_URL: ctx.GATEWAY_URL, followRedirects: true })
const { locate, raw } = schedulerUtilsConnect({ cacheSize: 500, GATEWAY_URL: ctx.GATEWAY_URL, followRedirects: true })

/**
* hate side effects like this, see TODO in ./dbInstance.js
*/
createDbClient({ MU_DATABASE_URL })

const cache = InMemoryClient.createLruCache({ size: 500 })
const getByProcess = InMemoryClient.getByProcessWith({ cache })
const setByProcess = InMemoryClient.setByProcessWith({ cache })

const processMsgLogger = logger.child('processMsg')
const processMsg = processMsgWith({
selectNode: cuClient.selectNodeWith({ CU_URL, logger: processMsgLogger }),
createDataItem,
locateScheduler: raw,
locateProcess: locate,
writeDataItem: schedulerClient.writeDataItemWith({ fetch, logger: processMsgLogger }),
fetchSchedulerProcess: schedulerClient.fetchSchedulerProcessWith({ getByProcess, setByProcess, fetch, logger: processMsgLogger }),
buildAndSign: signerClient.buildAndSignWith({ MU_WALLET, logger: processMsgLogger }),
fetchResult: cuClient.resultWith({ fetch, CU_URL, logger: processMsgLogger }),
saveMsg: dataStoreClient.saveMsgWith({ dbInstance, logger: processMsgLogger }),
Expand Down
9 changes: 5 additions & 4 deletions servers/mu/src/domain/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ export function processMsgWith ({
findLatestSpawns,
deleteMsg,
logger,
writeDataItemArweave
writeDataItemArweave,
fetchSchedulerProcess
}) {
const buildTx = buildTxWith({ buildAndSign, logger })
const buildTx = buildTxWith({ buildAndSign, logger, locateProcess, fetchSchedulerProcess })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const writeMessage = writeMessageTxWith({ writeDataItem, locateProcess, logger, writeDataItemArweave })
const fetchAndSaveResult = fetchAndSaveResultWith({ fetchResult, saveMsg, saveSpawn, findLatestMsgs, findLatestSpawns, logger })
Expand Down Expand Up @@ -237,7 +238,7 @@ export function crankMsgsWith ({
export function monitorProcessWith ({
logger,
createDataItem,
startProcessMonitor,
startProcessMonitor
}) {
const parseDataItem = parseDataItemWith({ createDataItem, logger })
const start = startWith({ logger, startProcessMonitor })
Expand All @@ -252,7 +253,7 @@ export function monitorProcessWith ({
export function stopMonitorProcessWith ({
logger,
createDataItem,
stopProcessMonitor,
stopProcessMonitor
}) {
const parseDataItem = parseDataItemWith({ createDataItem, logger })
const stop = stopWith({ logger, stopProcessMonitor })
Expand Down
26 changes: 19 additions & 7 deletions servers/mu/src/domain/lib/processDataItem/build-tx.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,42 @@ const ctxSchema = z.object({
})
}).passthrough()

export function buildTxWith ({ buildAndSign, logger }) {
export function buildTxWith (env) {
let { buildAndSign, logger, locateProcess, fetchSchedulerProcess } = env

locateProcess = fromPromise(locateProcess)
fetchSchedulerProcess = fromPromise(fetchSchedulerProcess)
buildAndSign = fromPromise(buildAndSign)

return (ctx) => {
return of(ctx)
.map(tap(() => ctx.tracer.trace('Building and signing message from outbox')))
.chain(fromPromise(() => {
.chain(() => locateProcess(ctx.cachedMsg.processId))
.chain((res) => fetchSchedulerProcess(ctx.cachedMsg.processId, res.url))
.map((res) => {
const tagsIn = [
...ctx.cachedMsg.msg.Tags?.filter((tag) => {
return !['Data-Protocol', 'Type', 'Variant', 'From-Process'].includes(tag.name)
return !['Data-Protocol', 'Type', 'Variant', 'From-Process', 'From-Module'].includes(tag.name)
}) ?? [],
{ name: 'Data-Protocol', value: 'ao' },
{ name: 'Type', value: 'Message' },
{ name: 'Variant', value: 'ao.TN.1' },
{ name: 'From-Process', value: ctx.cachedMsg.processId }
{ name: 'From-Process', value: ctx.cachedMsg.processId },
{ name: 'From-Module', value: res.tags.find((t) => t.name === 'Module')?.value ?? '' }
]
if (ctx.cachedMsg.initialTxId) {
tagsIn.push({ name: 'Pushed-For', value: ctx.cachedMsg.initialTxId })
}
return buildAndSign({
return tagsIn
})
.chain(
(tags) => buildAndSign({
processId: ctx.cachedMsg.msg.Target,
tags: tagsIn,
tags,
anchor: ctx.cachedMsg.msg.Anchor,
data: ctx.cachedMsg.msg.Data
})
}))
)
.map(assoc('tx', __, ctx))
.map(ctxSchema.parse)
.map(logger.tap('Added tx to ctx'))
Expand Down
14 changes: 11 additions & 3 deletions servers/mu/src/domain/lib/processDataItem/build-tx.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,26 @@ async function buildAndSign ({ processId, tags, anchor }) {

return {
id: 'id-1',
// the real function doesnt return tags as data
// doing this here for testing
data: Buffer.alloc(0),
processId
}
}

async function locateProcess () {
return { url: 'sched-url' }
}

async function fetchSchedulerProcess () {
return { tags: [{ name: 'Module', value: 'mod-1' }] }
}

describe('buildTx', () => {
test('build and sign a tx from a cached msg', async () => {
const buildTx = buildTxWith({
buildAndSign,
logger
logger,
locateProcess,
fetchSchedulerProcess
})

const result = await buildTx({
Expand Down