Skip to content

Commit

Permalink
feat: force cleanup at boot with SYNC_FORCE_REMOVE=true (#956)
Browse files Browse the repository at this point in the history
#### Migration notes

None

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added optional `SYNC_FORCE_REMOVE` configuration variable for typegate
synchronization.
	- Introduced ability to forcefully remove cached typegraphs at boot.
- Added a new method to retrieve all history entries from the Redis
replicated map.
- Introduced a new function to return a greeting based on a provided
name.
- Added a synchronization feature test suite for validating cleanup
logic.

- **Documentation**
- Updated documentation to reflect new synchronization configuration
option.

- **Improvements**
- Enhanced the `Typegate` class with a method to facilitate bulk removal
of typegraphs during initialization.
- Made the `replicatedMap` parameter publicly accessible in the
`ReplicatedRegister` class constructor.
- Updated configuration retrieval to include the new `forceRemove`
property.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Yohe-Am <[email protected]>
  • Loading branch information
michael-0acf4 and Yohe-Am committed Jan 15, 2025
1 parent 5bb03ef commit 2ac0fdc
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Synchronization variable names start with `SYNC_`.
| SYNC\__S3_SECRET_KEY (\_Required_) | Access key secret for the S3 store credentials; |
| SYNC\__S3_PATH_STYLE (\_Optional_) | `true` or `false`, force path style if `true`. |
| SYNC\__S3_BUCKET (\_Required_) | The bucket to be used for the system (dedicated). |
| SYNC\__FORCE_REMOVE (\_Optional_) | `true` or `false`, Undeploy cached typegraphs at boot |

## Synchronized mode features

Expand Down
1 change: 1 addition & 0 deletions src/typegate/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function transformSyncConfig(raw: SyncConfig): SyncConfigX {
redis,
s3,
s3Bucket: raw.s3_bucket,
forceRemove: raw.force_remove
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/typegate/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export const syncConfigSchema = z.object({
s3_access_key: refineEnvVar("SYNC_S3_ACCESS_KEY"),
s3_secret_key: refineEnvVar("SYNC_S3_SECRET_KEY"),
s3_path_style: zBooleanString.default(false),
force_remove: zBooleanString.default(false),
});
export type SyncConfig = z.infer<typeof syncConfigSchema>;
export type SyncConfigX = {
redis: RedisConnectOptions;
s3: S3ClientConfig;
s3Bucket: string;
forceRemove?: boolean
};

export type TypegateConfig = {
Expand Down
1 change: 0 additions & 1 deletion src/typegate/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ try {
base: defaultTypegateConfigBase,
});
const typegate = await Typegate.init(config);

await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);

const server = Deno.serve(
Expand Down
28 changes: 22 additions & 6 deletions src/typegate/src/sync/replicated_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
this.redisObs.close();
}

async getAllHistory() {
const { key, redis } = this;
const all = await redis.hgetall(key);
const history = [];
for (let i = 0; i < all.length; i += 2) {
history.push({
name: all[i],
payload: all[i+1]
});
}

return history;
}

async historySync(): Promise<XIdInput> {
const { key, redis, deserializer } = this;
const { redis, deserializer } = this;

// get last received message before loading history
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
const lastId = lastMessage ? lastMessage.xid : 0;
logger.debug("last message loaded: {}", lastId);

const all = await redis.hgetall(key);
const all = await this.getAllHistory();
logger.debug("history load start: {} elements", all.length);
for (let i = 0; i < all.length; i += 2) {
const name = all[i];
const payload = all[i + 1];

for (const { name, payload } of all) {
logger.info(`reloaded addition: ${name}`);
ensure(
!this.memory.has(name),
() => `typegraph ${name} should not exists in memory at first sync`,
);
this.memory.set(name, await deserializer(payload, true));

const engine = await deserializer(payload, true);
this.memory.set(name, engine);
}
logger.debug("history load end");

return lastId;
}

Expand Down
35 changes: 33 additions & 2 deletions src/typegate/src/typegate/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import type { ArtifactStore } from "./artifacts/mod.ts";
// TODO move from tests (MET-497)
import { MemoryRegister } from "./memory_register.ts";
import { NoLimiter } from "./no_limiter.ts";
import { TypegraphStore } from "../sync/typegraph.ts";
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
import { createLocalArtifactStore } from "./artifacts/local.ts";
import { createSharedArtifactStore } from "./artifacts/shared.ts";
import { AsyncDisposableStack } from "dispose";
Expand Down Expand Up @@ -141,15 +141,30 @@ export class Typegate implements AsyncDisposable {
stack.move(),
);

const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
const register = await ReplicatedRegister.init(
typegate,
syncConfig.redis,
TypegraphStore.init(syncConfig, cryptoKeys),
typegraphStore
);
typegate.disposables.use(register);

(typegate as { register: Register }).register = register;


if (config.sync?.forceRemove) {
logger.warn("Force removal at boot enabled");
const history = await register.replicatedMap.getAllHistory();
for (const { name, payload } of history) {
try {
await typegate.forceRemove(name, payload, typegraphStore);
} catch (e) {
logger.error(`Failed to force remove typegraph "${name}": ${e}`);
Sentry.captureException(e);
}
}
}

const lastSync = await register.historySync().catch((err) => {
logger.error(err);
throw new Error(
Expand Down Expand Up @@ -403,6 +418,22 @@ export class Typegate implements AsyncDisposable {
await this.artifactStore.runArtifactGC();
}

async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
logger.warn(`Dropping "${name}": started`);
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
const [tg] = await typegraphStore.download(
typegraphId,
);
const artifacts = new Set(
Object.values(tg.meta.artifacts).map((m) => m.hash),
);

await this.register.remove(name);
await this.artifactStore.updateRefCounts(new Set(), artifacts);
await this.artifactStore.runArtifactGC();
logger.warn(`Dropping "${name}": done`);
}

async initQueryEngine(
tgDS: TypeGraphDS,
secretManager: SecretManager,
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
return new ReplicatedRegister(replicatedMap);
}

constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
super();
}

Expand Down
1 change: 1 addition & 0 deletions tests/e2e/published/published_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const syncConfig = transformSyncConfig({
s3_secret_key: syncEnvs.SYNC_S3_SECRET_KEY,
s3_bucket: syncEnvs.SYNC_S3_BUCKET,
s3_path_style: true,
force_remove: false
});

// put here typegraphs that are to be excluded
Expand Down
6 changes: 6 additions & 0 deletions tests/sync/scripts/hello.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

export function hello({ name }: { name: string }) {
return `Hello ${name}`;
}
21 changes: 21 additions & 0 deletions tests/sync/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
# SPDX-License-Identifier: MPL-2.0

from typegraph import t, typegraph, Policy, Graph
from typegraph.runtimes.deno import DenoRuntime


@typegraph()
def sync(g: Graph):
deno = DenoRuntime()
public = Policy.public()

g.expose(
hello=deno.import_(
t.struct({"name": t.string()}),
t.string(),
name="hello",
module="scripts/hello.ts",
secrets=["ULTRA_SECRET"],
).with_policy(public)
)
1 change: 1 addition & 0 deletions tests/sync/sync_config_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Deno.test("test sync config", async (t) => {
Deno.env.set("SYNC_S3_BUCKET", "bucket");

assertEquals(getSyncConfig(), {
forceRemove: false,
redis: {
hostname: "localhost",
port: "6379",
Expand Down
114 changes: 114 additions & 0 deletions tests/sync/sync_force_remove_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { gql, Meta } from "test-utils/mod.ts";
import { connect } from "redis";
import { S3Client } from "aws-sdk/client-s3";
import { createBucket, listObjects, tryDeleteBucket } from "test-utils/s3.ts";
import { assertEquals } from "@std/assert";
import { clearSyncData, setupSync } from "test-utils/hooks.ts";
import { Typegate } from "@metatype/typegate/typegate/mod.ts";
import {
defaultTypegateConfigBase,
getTypegateConfig,
SyncConfig,
} from "@metatype/typegate/config.ts";

const redisKey = "typegraph";
const redisEventKey = "typegraph_event";

async function cleanUp(config: typeof syncConfig) {
using redis = await connect(config.redis);
await redis.del(redisKey);
await redis.del(redisEventKey);

const s3 = new S3Client(config.s3);
await tryDeleteBucket(s3, config.s3Bucket);
await createBucket(s3, config.s3Bucket);
s3.destroy();
await redis.quit();
}

const syncConfig = {
redis: {
hostname: "localhost",
port: 6379,
password: "password",
db: 1,
},
s3: {
endpoint: "http://localhost:9000",
region: "local",
credentials: {
accessKeyId: "minio",
secretAccessKey: "password",
},
forcePathStyle: true,
},
s3Bucket: "metatype-deno-runtime-sync-test",
};

async function spawnGate(syncConfig: SyncConfig) {
const config = getTypegateConfig({
base: {
...defaultTypegateConfigBase,
},
});

return await Typegate.init({
...config,
sync: syncConfig,
});
}

Meta.test(
{
name: "Force cleanup at boot on sync mode",
syncConfig,
async setup() {
await clearSyncData(syncConfig);
await setupSync(syncConfig);
},
async teardown() {
await cleanUp(syncConfig);
},
},
async (t) => {
await t.should(
"cleanup if forceRemove is true",
async () => {
const _engine = await t.engine("sync/sync.py", {
secrets: {
ULTRA_SECRET:
"if_you_can_read_me_on_an_ERROR_there_is_a_bug",
},
});

const s3 = new S3Client(syncConfig.s3);
const initialObjects = await listObjects(s3, syncConfig.s3Bucket);
assertEquals(initialObjects?.length, 3);

const gateNoRemove = await spawnGate(syncConfig);
const namesNoRemove = gateNoRemove.register.list().map(({ name }) =>
name
);

const gateAfterRemove = await spawnGate({
...syncConfig,
forceRemove: true,
});
const namesAfterRemove = gateAfterRemove.register.list().map((
{ name },
) => name);

t.addCleanup(async () => {
await gateNoRemove[Symbol.asyncDispose]();
await gateAfterRemove[Symbol.asyncDispose]();
});

assertEquals(namesNoRemove, ["sync"]);
assertEquals(namesAfterRemove, []); // !
},
);
},
);

0 comments on commit 2ac0fdc

Please sign in to comment.