diff --git a/config/default.js b/config/default.js index 7c08768..29ff00d 100644 --- a/config/default.js +++ b/config/default.js @@ -20,6 +20,7 @@ export default { slackCron: process.env.SLACK_CRON || "*/10 * * * *", redis: { host: process.env.REDIS_HOST || "localhost", + remote_host: process.env.REDIS_REMOTE_HOST || "redis://redis:6379", }, logLevel: "info", rootDomain: process.env.ROOT_DOMAIN || "nos.social", diff --git a/docker-compose.yml b/docker-compose.yml index b72edb6..32167f8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,3 @@ ---- -version: "3.8" - services: server: build: . @@ -10,6 +7,7 @@ services: - NODE_ENV=development - REDIS_HOST=redis - ROOT_DOMAIN=localhost + redis: image: redis:7.2.4 ports: diff --git a/scripts/add_name b/scripts/add_name index 260c8f1..a631cbd 100755 --- a/scripts/add_name +++ b/scripts/add_name @@ -6,6 +6,7 @@ usage() { echo " NPUB - The public key." echo " RELAY_URLS - One or more relay URLs, each as a separate argument." echo " Note: This script requires the 'pubhex' secret to be set in the NIP05_SEC environment variable." + echo " The base URL can be changed by setting the BASE_URL environment variable. Default is 'https://nos.social'." echo "Dependencies:" echo " nostrkeytool - A tool for NOSTR keys, installable via 'cargo install nostrkeytool' (https://crates.io/crates/nostrkeytool)." echo " nak - A tool required for authentication, installable via 'go install github.com/fiatjaf/nak@latest' (https://github.com/fiatjaf/nak)." @@ -23,17 +24,17 @@ fi NAME="$1" NPUB="$2" RELAYS="${@:3}" - +BASE_URL="${BASE_URL:-https://nos.social}" RELAYS_JSON_ARRAY=$(printf "%s\n" $RELAYS | jq -R . | jq -s .) -BASE64_DELETE_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='DELETE' -t u="https://nos.social/api/names/$NAME" --sec $NIP05_SEC | base64) +BASE64_DELETE_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='DELETE' -t u="$BASE_URL/api/names/$NAME" --sec "$NIP05_SEC" | base64) -HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "https://nos.social/api/names/$NAME" \ +HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE_URL/api/names/$NAME" \ -H "Content-Type: application/json" \ -H "Authorization: Nostr $BASE64_DELETE_AUTH_EVENT") echo "HTTP Status from delete: $HTTP_STATUS" -PUBKEY=$(nostrkeytool --npub2pubkey $NPUB) +PUBKEY=$(nostrkeytool --npub2pubkey "$NPUB") JSON_PAYLOAD=$(jq -n \ --arg name "$NAME" \ @@ -41,8 +42,9 @@ JSON_PAYLOAD=$(jq -n \ --argjson relays "$RELAYS_JSON_ARRAY" \ '{name: $name, data: {pubkey: $pubkey, relays: $relays}}') -BASE64_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='POST' -t u='https://nos.social/api/names' --sec $NIP05_SEC | base64) -curl -s https://nos.social/api/names \ +BASE64_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='POST' -t u="$BASE_URL/api/names" --sec "$NIP05_SEC" | base64) + +curl -s "$BASE_URL/api/names" \ -H "Content-Type: application/json" \ -H "Authorization: Nostr $BASE64_AUTH_EVENT" \ - -d "$JSON_PAYLOAD" + -d "$JSON_PAYLOAD" \ No newline at end of file diff --git a/src/app.js b/src/app.js index a7919a1..cfe0faf 100644 --- a/src/app.js +++ b/src/app.js @@ -4,7 +4,7 @@ import pinoHTTP from "pino-http"; import promClient from "prom-client"; import promBundle from "express-prom-bundle"; import cors from "cors"; -import getRedisClient from "./getRedisClient.js"; +import { getRedisClient } from "./getRedisClient.js"; import routes from "./routes.js"; import logger from "./logger.js"; import NameRecordRepository from "./nameRecordRepository.js"; diff --git a/src/getRedisClient.js b/src/getRedisClient.js index 0f22092..4e1370b 100644 --- a/src/getRedisClient.js +++ b/src/getRedisClient.js @@ -2,11 +2,11 @@ import config from "../config/index.js"; import logger from "./logger.js"; // istanbul ignore next -const redisImportPromise = process.env.NODE_ENV === "test" - ? import("ioredis-mock") - : import("ioredis"); +const redisImportPromise = + process.env.NODE_ENV === "test" ? import("ioredis-mock") : import("ioredis"); let redisClient; +let remoteRedisClient; async function initializeRedis() { try { @@ -25,11 +25,33 @@ async function initializeRedis() { } } -async function getRedisClient() { +async function initializeRemoteRedis() { + try { + const Redis = (await redisImportPromise).default; + remoteRedisClient = new Redis(config.redis.remote_host); + + remoteRedisClient.on("connect", () => + logger.info("Connected to Remote Redis") + ); + remoteRedisClient.on("error", (err) => + logger.error(err, "Remote Redis error") + ); + } catch (error) { + // istanbul ignore next + logger.error(error, "Error initializing Remote Redis client"); + } +} + +export async function getRedisClient() { if (!redisClient) { await initializeRedis(); } return redisClient; } -export default getRedisClient; \ No newline at end of file +export async function getRemoteRedisClient() { + if (!remoteRedisClient) { + await initializeRemoteRedis(); + } + return remoteRedisClient; +} diff --git a/src/nameRecordRepository.js b/src/nameRecordRepository.js index c35b235..4d2ca14 100644 --- a/src/nameRecordRepository.js +++ b/src/nameRecordRepository.js @@ -11,12 +11,12 @@ export default class NameRecordRepository { const luaScript = ` local pubkey = redis.call('GET', 'pubkey:' .. KEYS[1]) if not pubkey then return nil end - + local relays = redis.call('SMEMBERS', 'relays:' .. pubkey) local userAgent = redis.call('GET', 'user_agent:' .. pubkey) local clientIp = redis.call('GET', 'ip:' .. pubkey) local updatedAt = redis.call('GET', 'updated_at:' .. pubkey) - + return {pubkey, relays, userAgent, clientIp, updatedAt} `; @@ -87,6 +87,72 @@ export default class NameRecordRepository { return true; } + async deleteByPubkey(pubkey) { + const namesToDelete = []; + + // Use SCAN, avoid KEYS + const stream = this.redis.scanStream({ + match: "pubkey:*", + count: 1000, + }); + + let processingPromises = []; + + return new Promise((resolve, reject) => { + stream.on("data", (resultKeys) => { + stream.pause(); + + const pipeline = this.redis.pipeline(); + + resultKeys.forEach((key) => { + pipeline.get(key); + }); + + pipeline + .exec() + .then((results) => { + const processing = []; + + for (let i = 0; i < resultKeys.length; i++) { + const key = resultKeys[i]; + const [err, associatedPubkey] = results[i]; + + if (err) { + console.error(`Error getting value for key ${key}:`, err); + continue; + } + + if (associatedPubkey === pubkey) { + const name = key.split(":")[1]; + namesToDelete.push(name); + } + } + + stream.resume(); + }) + .catch((err) => { + stream.destroy(); + reject(err); + }); + }); + + stream.on("end", async () => { + try { + for (const name of namesToDelete) { + await this.deleteByName(name); + } + resolve(true); + } catch (err) { + reject(err); + } + }); + + stream.on("error", (err) => { + reject(err); + }); + }); + } + async fetchAndClearPendingNotifications() { const luaScript = ` local entries = redis.call('ZRANGE', 'pending_notifications', 0, -1) diff --git a/src/server.js b/src/server.js index ca11cd9..0b95bd3 100644 --- a/src/server.js +++ b/src/server.js @@ -1,23 +1,45 @@ import app from "./app.js"; import logger from "./logger.js"; import config from "../config/index.js"; +import { getRemoteRedisClient, getRedisClient } from "./getRedisClient.js"; +import VanishSubscriber from "./vanishSubscriber.js"; // Import the VanishSubscriber class -app.listen(config.port, () => { +const vanishRequestsRedisClient = await getRemoteRedisClient(); +const nip05RedisClient = await getRedisClient(); + +const server = app.listen(config.port, () => { logger.info(`Server is running on port ${config.port}`); }); -process.on("uncaughtException", (err) => { - logger.fatal(err, "Uncaught exception detected"); +const vanishSubscriber = new VanishSubscriber( + vanishRequestsRedisClient, + nip05RedisClient +); +vanishSubscriber.run(); + +async function gracefulShutdown() { + logger.info("Graceful shutdown initiated..."); + + vanishSubscriber.stop(); + + while (vanishSubscriber.isRunning) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + server.close(() => { - process.exit(1); + logger.info("Express server closed."); + process.exit(0); }); +} - setTimeout(() => { - process.abort(); - }, 1000).unref(); - process.exit(1); +process.on("uncaughtException", (err) => { + logger.fatal(err, "Uncaught exception detected"); + gracefulShutdown(); }); process.on("unhandledRejection", (reason, promise) => { logger.error(reason, "An unhandled promise rejection was detected"); }); + +process.on("SIGINT", gracefulShutdown); +process.on("SIGTERM", gracefulShutdown); diff --git a/src/vanishSubscriber.js b/src/vanishSubscriber.js new file mode 100644 index 0000000..61bd51c --- /dev/null +++ b/src/vanishSubscriber.js @@ -0,0 +1,125 @@ +import NameRecordRepository from "./nameRecordRepository.js"; + +const VANISH_STREAM_KEY = "vanish_requests"; +const LAST_PROCESSED_ID_KEY = "vanish_requests:nip05_service:last_id"; +const BLOCK_TIME_MS = 5000; // 5 seconds + +class VanishSubscriber { + constructor(vanishRequestsRedis, nip05Redis) { + // Right now we have a local redis instance for nip05 data and a remote one + // used by all our services. For the momen, the remote one is only used for + // the vanish stream. + // TODO: Refactor to migrate and use only one redis instance. + + const nameRecordRepository = new NameRecordRepository(nip05Redis); + + this.vanishRequestsRedis = vanishRequestsRedis; + this.nameRecordRepository = nameRecordRepository; + this.abortController = new AbortController(); + this.isRunning = false; + } + + async processPubkey(pubkey) { + console.log(`Deleting pubkey: ${pubkey}`); + await this.nameRecordRepository.deleteByPubkey(pubkey); + } + + async run() { + if (this.isRunning) return; // Prevent multiple runs + this.isRunning = true; + + let lastProcessedID; + + try { + lastProcessedID = + (await this.vanishRequestsRedis.get(LAST_PROCESSED_ID_KEY)) || "0-0"; + console.log(`Starting from last processed ID: ${lastProcessedID}`); + } catch (err) { + console.error("Error fetching last processed ID from Redis", err); + this.isRunning = false; + return; + } + + const abortSignal = this.abortController.signal; + + while (!abortSignal.aborted) { + try { + const streamEntries = await this.vanishRequestsRedis.xread( + "BLOCK", + BLOCK_TIME_MS, + "STREAMS", + VANISH_STREAM_KEY, + lastProcessedID + ); + + if (!streamEntries) { + continue; + } + + for (const [stream, messages] of streamEntries) { + for (const [messageID, messageData] of messages) { + const event = createObjectFromPairs(messageData); + + console.log(`Vanish requests event: ${JSON.stringify(event)} `); + const pubkey = event.pubkey; + + console.log( + `Processing message ID: ${messageID} with pubkey: ${pubkey}` + ); + + try { + await this.processPubkey(pubkey); + } catch (err) { + console.error(`Error processing pubkey: ${pubkey}`, err); + } + + try { + await this.vanishRequestsRedis.set( + LAST_PROCESSED_ID_KEY, + messageID + ); + lastProcessedID = messageID; + console.log(`Updated last processed ID to: ${lastProcessedID}`); + } catch (err) { + console.error( + `Error updating last processed ID: ${messageID}`, + err + ); + } + } + } + } catch (err) { + if (abortSignal.aborted) { + break; + } + console.error("Error reading from Redis stream", err); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + + console.log("Cancellation signal received. Exiting gracefully..."); + await this.vanishRequestsRedis.set(LAST_PROCESSED_ID_KEY, lastProcessedID); + console.log(`Final last processed ID saved: ${lastProcessedID}`); + + this.isRunning = false; + } + + stop() { + if (!this.isRunning) return; + this.abortController.abort(); + console.log( + "Abort signal sent. Waiting for current processing to finish..." + ); + } +} + +function createObjectFromPairs(messageData) { + return messageData.reduce((acc, value, index, arr) => { + if (index % 2 === 0) { + acc[value] = arr[index + 1]; + } + return acc; + }, {}); +} + +export default VanishSubscriber; diff --git a/test/app.test.js b/test/app.test.js index a0d11aa..5e57e6f 100644 --- a/test/app.test.js +++ b/test/app.test.js @@ -1,11 +1,10 @@ import request from "supertest"; -import getRedisClient from "../src/getRedisClient.js"; +import { getRedisClient } from "../src/getRedisClient.js"; import app from "../src/app.js"; import config from "../config/index.js"; import { getNip98AuthToken, createUserPayload } from "./testUtils.js"; import NameRecord from "../src/nameRecord.js"; import NameRecordRepository from "../src/nameRecordRepository.js"; -import { response } from "express"; const notSystemSecret = "73685b53bdf5ac16498f2dc6a9891d076039adbe7eebff88b7f7ac72963450e2"; @@ -497,4 +496,61 @@ describe("Nostr NIP 05 API tests", () => { ); expect(pendingCountAfter).toEqual(0); }); + + it("should delete all data associated with a given pubkey but not affect other pubkeys", async () => { + // Arrange + const repo = new NameRecordRepository(redisClient); + + // Create NameRecords with different pubkeys + const record1 = new NameRecord( + "user1", + "pubkey1", + ["wss://relay1.com"], + "clientIp1", + "userAgent1", + new Date().toISOString() + ); + + const record2 = new NameRecord( + "user2", + "pubkey2", + ["wss://relay2.com"], + "clientIp2", + "userAgent2", + new Date().toISOString() + ); + + const record3 = new NameRecord( + "user3", + "pubkey1", // Same pubkey as record1 + ["wss://relay3.com"], + "clientIp3", + "userAgent3", + new Date().toISOString() + ); + + // Save the records + await repo.save(record1); + await repo.save(record2); + await repo.save(record3); + + // Act + // Delete by pubkey1 + await repo.deleteByPubkey("pubkey1"); + + // Assert + // Verify that records with pubkey1 are deleted + const fetchedRecord1 = await repo.findByName("user1"); + const fetchedRecord3 = await repo.findByName("user3"); + + expect(fetchedRecord1).toBeNull(); + expect(fetchedRecord3).toBeNull(); + + // Verify that records with pubkey2 are still present + const fetchedRecord2 = await repo.findByName("user2"); + + expect(fetchedRecord2).not.toBeNull(); + expect(fetchedRecord2.name).toEqual("user2"); + expect(fetchedRecord2.pubkey).toEqual("pubkey2"); + }); }); diff --git a/test/auth.test.js b/test/auth.test.js index 1e6ee01..c732ee4 100644 --- a/test/auth.test.js +++ b/test/auth.test.js @@ -1,5 +1,5 @@ import request from "supertest"; -import getRedisClient from "../src/getRedisClient.js"; +import { getRedisClient } from "../src/getRedisClient.js"; import app from "../src/app.js"; import config from "../config"; import {