Skip to content

Commit

Permalink
fix(scheduler-utils): wrap all side effects in schemas. Store ttl in …
Browse files Browse the repository at this point in the history
…byOwner cache
  • Loading branch information
TillaTheHun0 committed Apr 16, 2024
1 parent f486e20 commit 03bf97c
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 74 deletions.
16 changes: 15 additions & 1 deletion scheduler-utils/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion scheduler-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
},
"dependencies": {
"lru-cache": "^10.2.0",
"ramda": "^0.29.1"
"ramda": "^0.29.1",
"zod": "^3.22.4"
},
"devDependencies": {
"esbuild": "^0.20.1",
Expand Down
2 changes: 1 addition & 1 deletion scheduler-utils/src/client/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ export function loadSchedulerWith ({ fetch, GRAPHQL_URL }) {
.then(([url, ttl]) => {
if (!url) throw new InvalidSchedulerLocationError('No "Url" tag found on Scheduler-Location')
if (!ttl) throw new InvalidSchedulerLocationError('No "Time-To-Live" tag found on Scheduler-Location')
return { url, ttl, owner: walletAddress }
return { url, ttl, address: walletAddress }
})
}
55 changes: 33 additions & 22 deletions scheduler-utils/src/client/gateway.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, test } from 'node:test'
import * as assert from 'node:assert'

import { loadProcessSchedulerSchema, loadSchedulerSchema } from '../dal.js'
import { InvalidSchedulerLocationError, SchedulerTagNotFoundError } from '../err.js'
import { loadProcessSchedulerWith, loadSchedulerWith } from './gateway.js'

Expand Down Expand Up @@ -61,16 +62,18 @@ describe('gateway', () => {
}
}

const loadProcessScheduler = loadProcessSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
const loadProcessScheduler = loadProcessSchedulerSchema.implement(
loadProcessSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
)

await loadProcessScheduler(PROCESS)
.then((res) => {
assert.equal(res.url, 'https://foo.bar')
assert.equal(res.ttl, `${TWO_DAYS}`)
assert.equal(res.owner, SCHEDULER)
assert.equal(res.address, SCHEDULER)
})
})

Expand All @@ -94,10 +97,12 @@ describe('gateway', () => {
}
}

const loadProcessScheduler = loadProcessSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
const loadProcessScheduler = loadProcessSchedulerSchema.implement(
loadProcessSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
)

await loadProcessScheduler(PROCESS)
.catch((err) => assert.ok(err instanceof SchedulerTagNotFoundError))
Expand Down Expand Up @@ -129,16 +134,18 @@ describe('gateway', () => {
}
}

const loadScheduler = loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
const loadScheduler = loadSchedulerSchema.implement(
loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
)

await loadScheduler(SCHEDULER)
.then((res) => {
assert.equal(res.url, 'https://foo.bar')
assert.equal(res.ttl, `${TWO_DAYS}`)
assert.equal(res.owner, SCHEDULER)
assert.equal(res.address, SCHEDULER)
})
})

Expand Down Expand Up @@ -166,10 +173,12 @@ describe('gateway', () => {
}
}

const loadScheduler = loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
const loadScheduler = loadSchedulerSchema.implement(
loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
)

await loadScheduler(SCHEDULER)
.catch((err) => {
Expand Down Expand Up @@ -202,10 +211,12 @@ describe('gateway', () => {
}
}

const loadScheduler = loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
const loadScheduler = loadSchedulerSchema.implement(
loadSchedulerWith({
GRAPHQL_URL,
fetch: mockFetch
})
)

await loadScheduler(SCHEDULER)
.catch((err) => {
Expand Down
2 changes: 1 addition & 1 deletion scheduler-utils/src/client/in-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ export function getByOwnerWith ({ cache }) {
export function setByOwnerWith ({ cache }) {
return async (owner, url, ttl) => {
if (!cache.max) return
return cache.set(owner, { url, address: owner }, { ttl })
return cache.set(owner, { url, address: owner, ttl }, { ttl })
}
}
53 changes: 29 additions & 24 deletions scheduler-utils/src/client/in-memory.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, test, beforeEach } from 'node:test'
import { describe, test } from 'node:test'
import * as assert from 'node:assert'

import { getByOwnerSchema, getByProcessSchema, setByOwnerSchema, setByProcessSchema } from '../dal.js'
import { createLruCache, getByProcessWith, getByOwnerWith, setByProcessWith, setByOwnerWith } from './in-memory.js'

const PROCESS = 'zc24Wpv_i6NNCEdxeKt7dcNrqL5w0hrShtSCcFGGL24'
Expand All @@ -10,67 +11,71 @@ const TEN_MS = 10
const SIZE = 10

describe('in-memory', () => {
const cache = createLruCache({ size: SIZE })

beforeEach(() => {
cache.clear()
})

describe('getByProcessWith', () => {
test('returns the url if in cache', async () => {
const getByProcess = getByProcessWith({ cache })
test('returns the cached entry', async () => {
const cache = createLruCache({ size: SIZE })
const getByProcess = getByProcessSchema.implement(getByProcessWith({ cache }))
assert.equal(await getByProcess(PROCESS), undefined)
cache.set(PROCESS, { url: DOMAIN, address: SCHEDULER })
assert.deepStrictEqual(await getByProcess(PROCESS), { url: DOMAIN, address: SCHEDULER })
})

test('returns undefined if cache size is set to 0', async () => {
const getByProcess = getByProcessWith({ cache: createLruCache({ size: 0 }) })
const cache = createLruCache({ size: 0 })
const getByProcess = getByProcessSchema.implement(getByProcessWith({ cache }))
assert.equal(await getByProcess(PROCESS), undefined)
cache.set(PROCESS, { url: DOMAIN, address: SCHEDULER })
assert.deepStrictEqual(await getByProcess(PROCESS), undefined)
})
})

describe('getByOwnerWith', () => {
test('returns the url if in cache', async () => {
const getByOwner = getByOwnerWith({ cache })
test('returns the cached entry', async () => {
const cache = createLruCache({ size: SIZE })
const getByOwner = getByOwnerSchema.implement(
getByOwnerWith({ cache })
)
assert.equal(await getByOwner(SCHEDULER), undefined)
cache.set(SCHEDULER, { url: DOMAIN, address: SCHEDULER })
assert.deepStrictEqual(await getByOwner(SCHEDULER), { url: DOMAIN, address: SCHEDULER })
cache.set(SCHEDULER, { url: DOMAIN, address: SCHEDULER, ttl: 10 })
assert.deepStrictEqual(await getByOwner(SCHEDULER), { url: DOMAIN, address: SCHEDULER, ttl: 10 })
})

test('returns undefined if cache size is set to 0', async () => {
const getByOwner = getByOwnerWith({ cache: createLruCache({ size: 0 }) })
const cache = createLruCache({ size: 0 })
const getByOwner = getByOwnerSchema.implement(getByOwnerWith({ cache }))
assert.equal(await getByOwner(SCHEDULER), undefined)
cache.set(SCHEDULER, { url: DOMAIN, address: SCHEDULER })
cache.set(SCHEDULER, { url: DOMAIN, address: SCHEDULER, ttl: 10 })
assert.deepStrictEqual(await getByOwner(SCHEDULER), undefined)
})
})

describe('setByProcessWith', () => {
test('sets the value in cache', async () => {
const setByProcess = setByProcessWith({ cache })
await setByProcess(PROCESS, DOMAIN, TEN_MS)
assert.ok(cache.has(PROCESS))
const cache = createLruCache({ size: SIZE })
const setByProcess = setByProcessSchema.implement(setByProcessWith({ cache }))
await setByProcess(PROCESS, { url: DOMAIN, address: SCHEDULER }, TEN_MS)
assert.deepStrictEqual(cache.get(PROCESS), { url: DOMAIN, address: SCHEDULER })
})

test('does nothing if cache size is set to 0', async () => {
const setByProcess = setByProcessWith({ cache: createLruCache({ size: 0 }) })
await setByProcess(PROCESS, DOMAIN, TEN_MS)
const cache = createLruCache({ size: 0 })
const setByProcess = setByProcessSchema.implement(setByProcessWith({ cache }))
await setByProcess(PROCESS, { url: DOMAIN, address: SCHEDULER }, TEN_MS)
assert.ok(!cache.has(PROCESS))
})
})

describe('setByOwnerWith', () => {
test('sets the value in cache', async () => {
const setByOwner = setByOwnerWith({ cache })
const cache = createLruCache({ size: SIZE })
const setByOwner = setByOwnerSchema.implement(setByOwnerWith({ cache }))
await setByOwner(SCHEDULER, DOMAIN, TEN_MS)
assert.ok(cache.has(SCHEDULER))
assert.deepStrictEqual(cache.get(SCHEDULER), { url: DOMAIN, address: SCHEDULER, ttl: TEN_MS })
})

test('does nothing if cache size is set to 0', async () => {
const setByOwner = setByOwnerWith({ cache: createLruCache({ size: 0 }) })
const cache = createLruCache({ size: 0 })
const setByOwner = setByOwnerSchema.implement(setByOwnerWith({ cache }))
await setByOwner(SCHEDULER, DOMAIN, TEN_MS)
assert.ok(!cache.has(SCHEDULER))
})
Expand Down
9 changes: 7 additions & 2 deletions scheduler-utils/src/client/scheduler.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { test } from 'node:test'
import assert from 'node:assert'
import { checkForRedirectWith } from './scheduler.js'
import { checkForRedirectSchema } from '../dal.js'

const mockFetch = (url, options) => {
if (options.method === 'GET' && !url.includes('no-redirect')) {
Expand All @@ -22,13 +23,17 @@ const mockFetch = (url, options) => {
}

test('checkForRedirectWith should return new location on redirect', async (_t) => {
const checkForRedirect = checkForRedirectWith({ fetch: mockFetch })
const checkForRedirect = checkForRedirectSchema.implement(
checkForRedirectWith({ fetch: mockFetch })
)
const result = await checkForRedirect('http://example.com/redirect', 'test-process')
assert.strictEqual(result, 'http://newlocation.com', 'The function should return the new location URL on redirect')
})

test('checkForRedirectWith should return original URL if no redirect', async (_t) => {
const checkForRedirect = checkForRedirectWith({ fetch: mockFetch })
const checkForRedirect = checkForRedirectSchema.implement(
checkForRedirectWith({ fetch: mockFetch })
)
const result = await checkForRedirect('http://example.com/no-redirect', 'test-process')
assert.strictEqual(result, 'http://example.com/no-redirect', 'The function should return the original URL if there is no redirect')
})
30 changes: 30 additions & 0 deletions scheduler-utils/src/dal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { z } from 'zod'

const processCacheEntry = z.object({ url: z.string(), address: z.string() })
const scheduler = z.object({ url: z.string(), address: z.string(), ttl: z.coerce.number() })

export const checkForRedirectSchema = z.function()
.args(z.string(), z.string())
.returns(z.promise(z.string()))

export const getByProcessSchema = z.function()
.args(z.string())
.returns(z.promise(processCacheEntry.nullish()))

export const setByProcessSchema = z.function()
.args(z.string(), processCacheEntry, z.number())
.returns(z.promise(z.any()))

export const getByOwnerSchema = z.function()
.args(z.string())
.returns(z.promise(scheduler.nullish()))

export const setByOwnerSchema = z.function()
.args(z.string(), z.string(), z.number())
.returns(z.promise(z.any()))

export const loadSchedulerSchema = z.function()
.args(z.string())
.returns(z.promise(scheduler))

export const loadProcessSchedulerSchema = loadSchedulerSchema
22 changes: 16 additions & 6 deletions scheduler-utils/src/locate.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { checkForRedirectSchema, getByOwnerSchema, getByProcessSchema, loadProcessSchedulerSchema, loadSchedulerSchema, setByOwnerSchema, setByProcessSchema } from './dal.js'

export function locateWith ({ loadProcessScheduler, loadScheduler, cache, followRedirects, checkForRedirect }) {
loadProcessScheduler = loadProcessSchedulerSchema.implement(loadProcessScheduler)
loadScheduler = loadSchedulerSchema.implement(loadScheduler)
checkForRedirect = checkForRedirectSchema.implement(checkForRedirect)
const getByProcess = getByProcessSchema.implement(cache.getByProcess)
const getByOwner = getByOwnerSchema.implement(cache.getByOwner)
const setByProcess = setByProcessSchema.implement(cache.setByProcess)
const setByOwner = setByOwnerSchema.implement(cache.setByOwner)

/**
* Locate the scheduler for the given process.
*
Expand All @@ -11,7 +21,7 @@ export function locateWith ({ loadProcessScheduler, loadScheduler, cache, follow
* @returns {Promise<{ url: string, address: string }>} - an object whose url field is the Scheduler Location
*/
return (process, schedulerHint) =>
cache.getByProcess(process)
getByProcess(process)
.then(async (cached) => {
if (cached) return cached

Expand All @@ -23,11 +33,11 @@ export function locateWith ({ loadProcessScheduler, loadScheduler, cache, follow
* query the Scheduler-Location record directly
*/
if (schedulerHint) {
const byOwner = await cache.getByOwner(schedulerHint)
if (byOwner) return (byOwner)
const byOwner = await getByOwner(schedulerHint)
if (byOwner) return byOwner

return loadScheduler(schedulerHint).then((scheduler) => {
cache.setByOwner(scheduler.owner, scheduler.url, scheduler.ttl)
setByOwner(scheduler.address, scheduler.url, scheduler.ttl)
return scheduler
})
}
Expand All @@ -43,8 +53,8 @@ export function locateWith ({ loadProcessScheduler, loadScheduler, cache, follow
*/
if (followRedirects) finalUrl = await checkForRedirect(scheduler.url, process)

const byProcess = { url: finalUrl, address: scheduler.owner }
await cache.setByProcess(process, byProcess, scheduler.ttl)
const byProcess = { url: finalUrl, address: scheduler.address }
await setByProcess(process, byProcess, scheduler.ttl)
return byProcess
})
})
Expand Down
Loading

0 comments on commit 03bf97c

Please sign in to comment.