From 12deda1446e6d6a87f0f858732b0089b577f4c4c Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 05:52:42 +0530 Subject: [PATCH 1/6] added prepared statements manager for standardising how we handle prepared statment for better code readability, higher performance sql injection proof queries to db --- src/dbstore/accounts.ts | 245 ++++++++++++------ src/dbstore/index.ts | 10 +- .../preparedStmtAccounts.ts | 125 +++++++++ .../preparedStmtManager.ts | 35 +++ 4 files changed, 334 insertions(+), 81 deletions(-) create mode 100644 src/dbstore/prepared-statements/preparedStmtAccounts.ts create mode 100644 src/dbstore/prepared-statements/preparedStmtManager.ts diff --git a/src/dbstore/accounts.ts b/src/dbstore/accounts.ts index 41fcd2c..af8b7e4 100644 --- a/src/dbstore/accounts.ts +++ b/src/dbstore/accounts.ts @@ -3,7 +3,7 @@ import { accountDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' - +import { getPreparedStmt } from './prepared-statements/preparedStmtAccounts'; /** Same as type AccountsCopy in the shardus core */ export type AccountsCopy = { @@ -20,38 +20,44 @@ type DbAccountCopy = AccountsCopy & { } export async function insertAccount(account: AccountsCopy): Promise { - try { + // Get the prepared statement for inserting an account + const stmt = getPreparedStmt('insertAccount'); - // Define the table columns based on schema - const columns = ['accountId', 'data', 'timestamp', 'hash', 'cycleNumber', 'isGlobal']; - - // Construct the SQL query with placeholders - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = `INSERT OR REPLACE INTO accounts (${columns.join(', ')}) VALUES ${placeholders}`; - - // Map the `account` object to match the columns - const values = columns.map((column) => - typeof account[column] === 'object' - ? SerializeToJsonString(account[column]) // Serialize objects to JSON - : account[column] - ); + // Map the `account` object to the required values for the prepared statement + const values = [ + account.accountId, + SerializeToJsonString(account.data), // Serialize `data` to JSON + account.timestamp, + account.hash, + account.cycleNumber ?? null, // Fallback to `null` if undefined + account.isGlobal, + ]; - // Execute the query directly (single-row insert) - await db.run(accountDatabase, sql, values); + // Execute the prepared statement + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + // Log success if verbose mode is enabled if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted Account', account.accountId); + Logger.mainLogger.debug('Successfully inserted Account', { accountId: account.accountId, values }); } } catch (err) { - Logger.mainLogger.error(err); - Logger.mainLogger.error( - 'Unable to insert Account or it is already stored in the database', - account.accountId - ); + // Log the error and rethrow it for upstream handling + Logger.mainLogger.error('Failed to insert account', { account, error: err }); + throw new Error(`Failed to insert account with ID ${account.accountId}`); } } + + export async function bulkInsertAccounts(accounts: AccountsCopy[]): Promise { try { @@ -86,103 +92,182 @@ export async function bulkInsertAccounts(accounts: AccountsCopy[]): Promise { try { - const sql = `UPDATE accounts SET cycleNumber = $cycleNumber, timestamp = $timestamp, data = $data, hash = $hash WHERE accountId = $accountId ` - await db.run(accountDatabase, sql, { - $cycleNumber: account.cycleNumber, - $timestamp: account.timestamp, - $data: SerializeToJsonString(account.data), - $hash: account.hash, - $accountId: account.accountId, - }) + // Get the prepared statement + const stmt = getPreparedStmt('updateAccount'); + + // Define the values to match the placeholders in the prepared statement + const values = [ + account.cycleNumber ?? null, // Default to null if undefined + account.timestamp, + SerializeToJsonString(account.data), + account.hash, + account.accountId, + ]; + + // Execute the prepared statement + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + + // Log success if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully updated Account', account.accountId) + Logger.mainLogger.debug('Successfully updated Account', { accountId: account.accountId, values }); } - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to update Account', account) + } catch (err) { + // Log the error and rethrow for upstream handling + Logger.mainLogger.error('Failed to update account', { account, error: err }); + throw new Error(`Failed to update account with ID ${account.accountId}`); } } export async function queryAccountByAccountId(accountId: string): Promise { try { - const sql = `SELECT * FROM accounts WHERE accountId=?` - const dbAccount = (await db.get(accountDatabase, sql, [accountId])) as DbAccountCopy - let account: AccountsCopy - if (dbAccount) account = { ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) } + // Get the prepared statement for querying by accountId + const stmt = getPreparedStmt('queryAccountByAccountId'); + + // Execute the prepared statement + const dbAccount = await new Promise((resolve, reject) => { + stmt.get([accountId], (err, row) => { + if (err) { + Logger.mainLogger.error('Error running queryAccountByAccountId statement', { accountId, error: err }); + reject(err); + } else { + resolve(row as DbAccountCopy|| null); // Resolve `null` if no row is found + } + }); + }); + + // Deserialize the `data` field if the account exists + const account: AccountsCopy | null = dbAccount + ? { ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) } + : null; + + // Log the result if verbose mode is enabled if (config.VERBOSE) { - Logger.mainLogger.debug('Account accountId', account) + Logger.mainLogger.debug('Queried Account by accountId', { accountId, account }); } - return account - } catch (e) { - Logger.mainLogger.error(e) - return null + + return account; + } catch (err) { + // Log the error and return null + Logger.mainLogger.error('Failed to query account by accountId', { accountId, error: err }); + return null; } } export async function queryLatestAccounts(count: number): Promise { - if (!Number.isInteger(count)) { - Logger.mainLogger.error('queryLatestAccounts - Invalid count value') - return null - } try { - const sql = `SELECT * FROM accounts ORDER BY cycleNumber DESC, timestamp DESC LIMIT ${ - count ? count : 100 - }` - const dbAccounts = (await db.all(accountDatabase, sql)) as DbAccountCopy[] - const accounts: AccountsCopy[] = [] + const effectiveCount = Number.isInteger(count) && count > 0 ? count : 100; + + // Retrieve the prepared statement + const stmt = getPreparedStmt('queryLatestAccounts'); + + // Execute the query with the effectiveCount + const dbAccounts = await new Promise((resolve, reject) => { + stmt.all([effectiveCount], (err, rows) => { + if (err) { + reject(err); + } else { + resolve(rows as DbAccountCopy[]); + } + }); + }); + + const accounts: AccountsCopy[] = []; if (dbAccounts.length > 0) { for (const dbAccount of dbAccounts) { - accounts.push({ ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) }) + accounts.push({ ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) }); } } + if (config.VERBOSE) { - Logger.mainLogger.debug('Account latest', accounts) + Logger.mainLogger.debug('Account latest', accounts); } - return accounts + + return accounts; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error('Error in queryLatestAccounts:', e); + return null; } } export async function queryAccounts(skip = 0, limit = 10000): Promise { - let dbAccounts: DbAccountCopy[] - const accounts: AccountsCopy[] = [] - if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryAccounts - Invalid skip or limit value') - return accounts + const accounts: AccountsCopy[] = []; + + // Validate skip and limit values + if (!Number.isInteger(skip) || !Number.isInteger(limit) || skip < 0 || limit <= 0) { + Logger.mainLogger.error('queryAccounts - Invalid skip or limit value'); + return accounts; } + try { - const sql = `SELECT * FROM accounts ORDER BY cycleNumber ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - dbAccounts = (await db.all(accountDatabase, sql)) as DbAccountCopy[] - if (dbAccounts.length > 0) { - for (const dbAccount of dbAccounts) { - accounts.push({ ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) }) - } + // Retrieve the prepared statement + const stmt = getPreparedStmt('queryAccounts'); + + // Execute the query with parameters + const dbAccounts = await new Promise((resolve, reject) => { + stmt.all([limit, skip], (err, rows) => { + if (err) { + reject(err); + } else { + resolve(rows as DbAccountCopy[]); + } + }); + }); + + // Deserialize data and push to accounts array + for (const dbAccount of dbAccounts) { + accounts.push({ ...dbAccount, data: DeSerializeFromJsonString(dbAccount.data) }); } } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error('Error in queryAccounts:', e); } + + // Log the result count if verbose logging is enabled if (config.VERBOSE) { - Logger.mainLogger.debug('Account accounts', accounts ? accounts.length : accounts, 'skip', skip) + Logger.mainLogger.debug('Account accounts', accounts.length, 'skip', skip, 'limit', limit); } - return accounts + + return accounts; } export async function queryAccountCount(): Promise { - let accounts + let accounts; + try { - const sql = `SELECT COUNT(*) FROM accounts` - accounts = await db.get(accountDatabase, sql, []) + // Retrieve the prepared statement + const stmt = getPreparedStmt('queryAccountCount'); + + // Execute the query and retrieve the result + accounts = await new Promise((resolve, reject) => { + stmt.get([], (err, row) => { + if (err) { + reject(err); + } else { + resolve(row); + } + }); + }); } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error('Error in queryAccountCount:', e); } + + // Log the raw result if verbose logging is enabled if (config.VERBOSE) { - Logger.mainLogger.debug('Account count', accounts) + Logger.mainLogger.debug('Account count', accounts); } - if (accounts) accounts = accounts['COUNT(*)'] - else accounts = 0 - return accounts + + // Extract the count from the result + if (accounts) accounts = accounts['COUNT(*)']; + else accounts = 0; + + return accounts; } export async function queryAccountCountBetweenCycles( diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index 52a8379..f9a5e14 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -2,7 +2,7 @@ import { Database } from 'sqlite3' import { Config } from '../Config' import { createDB, runCreate, close } from './sqlite3storage' import { createDirectories } from '../Utils' - +import { initializePreparedStatements, finalizePreparedStatements } from './prepared-statements/preparedStmtManager'; export let cycleDatabase: Database export let accountDatabase: Database export let transactionDatabase: Database @@ -10,6 +10,7 @@ export let receiptDatabase: Database export let originalTxDataDatabase: Database export let processedTxDatabase: Database + export const initializeDB = async (config: Config): Promise => { createDirectories(config.ARCHIVER_DB) accountDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.accountDB}`, 'Account') @@ -108,10 +109,17 @@ export const initializeDB = async (config: Config): Promise => { processedTxDatabase, 'CREATE INDEX if not exists `processedTxs_cycle_idx` ON `processedTxs` (`cycle`)' ) + + // Initialize prepared statements + initializePreparedStatements(accountDatabase); + + + } export const closeDatabase = async (): Promise => { const promises = [] + promises.push(finalizePreparedStatements()); promises.push(close(accountDatabase, 'Account')) promises.push(close(transactionDatabase, 'Transaction')) promises.push(close(cycleDatabase, 'Cycle')) diff --git a/src/dbstore/prepared-statements/preparedStmtAccounts.ts b/src/dbstore/prepared-statements/preparedStmtAccounts.ts new file mode 100644 index 0000000..6ab1e86 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtAccounts.ts @@ -0,0 +1,125 @@ +import * as sqlite3 from 'sqlite3'; +import * as Logger from '../../Logger'; +import { registerPreparedStatements} from './preparedStmtManager'; + + +let preparedStatements: Map = new Map(); + +export const initialize = (db: sqlite3.Database): void => { + preparedStatements.set( + 'insertAccount', + db.prepare( + `INSERT OR REPLACE INTO accounts + (accountId, data, timestamp, hash, cycleNumber, isGlobal) + VALUES (?, ?, ?, ?, ?, ?)` + ) + ); + + preparedStatements.set( + 'updateAccount', + db.prepare( + `UPDATE accounts + SET cycleNumber = ?, timestamp = ?, data = ?, hash = ? + WHERE accountId = ?` + ) + ); + + preparedStatements.set( + 'queryAccountByAccountId', + db.prepare(`SELECT * FROM accounts WHERE accountId = ?`) + ); + + preparedStatements.set( + 'queryLatestAccounts', + db.prepare( + `SELECT * FROM accounts + ORDER BY cycleNumber DESC, timestamp DESC + LIMIT ?` + ) + ); + + preparedStatements.set( + 'queryAccounts', + db.prepare( + `SELECT * FROM accounts + ORDER BY cycleNumber ASC, timestamp ASC + LIMIT ? OFFSET ?` + ) + ); + + preparedStatements.set( + 'queryAccountCount', + db.prepare(`SELECT COUNT(*) as count FROM accounts`) + ); + + preparedStatements.set( + 'queryAccountCountBetweenCycles', + db.prepare(`SELECT COUNT(*) FROM accounts WHERE cycleNumber BETWEEN ? AND ?`) + ); + + preparedStatements.set( + 'fetchAccountsByRangeWithOffset', + db.prepare( + `SELECT * FROM accounts + WHERE accountId BETWEEN ? AND ? + AND timestamp BETWEEN ? AND ? + ORDER BY timestamp ASC, accountId ASC + LIMIT ? OFFSET ?` + ) + ); + + preparedStatements.set( + 'fetchAccountsByRangeWithOffset', + db.prepare( + `SELECT * FROM accounts + WHERE accountId BETWEEN ? AND ? + AND timestamp BETWEEN ? AND ? + AND accountId >= ? + ORDER BY timestamp ASC, accountId ASC + LIMIT ?` + ) + ); + + preparedStatements.set( + 'fetchAccountsByList', + db.prepare( + `SELECT * FROM accounts + WHERE accountId IN (?)` + ) + ); + +}; + +export const getPreparedStmt = (name: string): sqlite3.Statement => { + const stmt = preparedStatements.get(name); + if (!stmt) { + throw new Error(`Prepared statement not found: ${name}`); + } + return stmt; +}; + +export const finalize = async (): Promise => { + const finalizePromises = []; + + preparedStatements.forEach((stmt, key) => { + if (stmt) { + finalizePromises.push( + new Promise((resolve, reject) => { + stmt.finalize((err) => { + if (err) { + Logger.mainLogger.error(`Error finalizing statement ${key}:`, err); + reject(err); + } else { + Logger.mainLogger.debug(`Successfully finalized statement ${key}`); + resolve(); + } + }); + }) + ); + } + }); + + await Promise.all(finalizePromises); +}; + +registerPreparedStatements(initialize, finalize); diff --git a/src/dbstore/prepared-statements/preparedStmtManager.ts b/src/dbstore/prepared-statements/preparedStmtManager.ts new file mode 100644 index 0000000..48c65bc --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtManager.ts @@ -0,0 +1,35 @@ +import * as sqlite3 from 'sqlite3'; +import * as Logger from '../../Logger'; + +type PreparedStatementInitializer = (db: sqlite3.Database) => void; +type PreparedStatementFinalizer = () => Promise; + +const initializers: PreparedStatementInitializer[] = []; +const finalizers: PreparedStatementFinalizer[] = []; + +/** + * Register an initializer and finalizer for a set of prepared statements. + */ +export const registerPreparedStatements = ( + initializer: PreparedStatementInitializer, + finalizer: PreparedStatementFinalizer +): void => { + initializers.push(initializer); + finalizers.push(finalizer); +}; + +/** + * Initialize all registered prepared statements. + */ +export const initializePreparedStatements = (db: sqlite3.Database): void => { + initializers.forEach((initialize) => initialize(db)); + Logger.mainLogger.info('All prepared statements initialized.'); +}; + +/** + * Finalize all registered prepared statements. + */ +export const finalizePreparedStatements = async (): Promise => { + await Promise.all(finalizers.map((finalize) => finalize())); + Logger.mainLogger.info('All prepared statements finalized.'); +}; From 24500e59bb4a34c6b850d84a18ca0fd95e8694df Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 07:13:24 +0530 Subject: [PATCH 2/6] made the prepared statement manager even better --- src/dbstore/accounts.ts | 2 +- src/dbstore/index.ts | 8 ++- .../preparedStmtAccounts.ts | 72 +++++-------------- .../preparedStmtManager.ts | 52 +++++++++----- 4 files changed, 57 insertions(+), 77 deletions(-) diff --git a/src/dbstore/accounts.ts b/src/dbstore/accounts.ts index af8b7e4..270f35b 100644 --- a/src/dbstore/accounts.ts +++ b/src/dbstore/accounts.ts @@ -3,7 +3,7 @@ import { accountDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' -import { getPreparedStmt } from './prepared-statements/preparedStmtAccounts'; +import { getPreparedStmt } from './prepared-statements/preparedStmtManager' /** Same as type AccountsCopy in the shardus core */ export type AccountsCopy = { diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index f9a5e14..bfee044 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -2,7 +2,9 @@ import { Database } from 'sqlite3' import { Config } from '../Config' import { createDB, runCreate, close } from './sqlite3storage' import { createDirectories } from '../Utils' -import { initializePreparedStatements, finalizePreparedStatements } from './prepared-statements/preparedStmtManager'; +import { finalizePreparedStatements } from './prepared-statements/preparedStmtManager' +import { initialize as initializeAccountPreparedStatements } from './prepared-statements/preparedStmtAccounts'; + export let cycleDatabase: Database export let accountDatabase: Database export let transactionDatabase: Database @@ -110,8 +112,8 @@ export const initializeDB = async (config: Config): Promise => { 'CREATE INDEX if not exists `processedTxs_cycle_idx` ON `processedTxs` (`cycle`)' ) - // Initialize prepared statements - initializePreparedStatements(accountDatabase); + // Initialize prepared statements for accounts + initializeAccountPreparedStatements(accountDatabase); diff --git a/src/dbstore/prepared-statements/preparedStmtAccounts.ts b/src/dbstore/prepared-statements/preparedStmtAccounts.ts index 6ab1e86..2256c7a 100644 --- a/src/dbstore/prepared-statements/preparedStmtAccounts.ts +++ b/src/dbstore/prepared-statements/preparedStmtAccounts.ts @@ -1,12 +1,11 @@ import * as sqlite3 from 'sqlite3'; -import * as Logger from '../../Logger'; -import { registerPreparedStatements} from './preparedStmtManager'; - - -let preparedStatements: Map = new Map(); +import { addPreparedStatement } from './preparedStmtManager'; +/** + * Initialize prepared statements for the `accounts` table. + */ export const initialize = (db: sqlite3.Database): void => { - preparedStatements.set( + addPreparedStatement( 'insertAccount', db.prepare( `INSERT OR REPLACE INTO accounts @@ -15,7 +14,7 @@ export const initialize = (db: sqlite3.Database): void => { ) ); - preparedStatements.set( + addPreparedStatement( 'updateAccount', db.prepare( `UPDATE accounts @@ -24,12 +23,12 @@ export const initialize = (db: sqlite3.Database): void => { ) ); - preparedStatements.set( + addPreparedStatement( 'queryAccountByAccountId', db.prepare(`SELECT * FROM accounts WHERE accountId = ?`) ); - preparedStatements.set( + addPreparedStatement( 'queryLatestAccounts', db.prepare( `SELECT * FROM accounts @@ -38,7 +37,7 @@ export const initialize = (db: sqlite3.Database): void => { ) ); - preparedStatements.set( + addPreparedStatement( 'queryAccounts', db.prepare( `SELECT * FROM accounts @@ -47,17 +46,17 @@ export const initialize = (db: sqlite3.Database): void => { ) ); - preparedStatements.set( + addPreparedStatement( 'queryAccountCount', db.prepare(`SELECT COUNT(*) as count FROM accounts`) ); - preparedStatements.set( + addPreparedStatement( 'queryAccountCountBetweenCycles', db.prepare(`SELECT COUNT(*) FROM accounts WHERE cycleNumber BETWEEN ? AND ?`) ); - - preparedStatements.set( + + addPreparedStatement( 'fetchAccountsByRangeWithOffset', db.prepare( `SELECT * FROM accounts @@ -68,8 +67,8 @@ export const initialize = (db: sqlite3.Database): void => { ) ); - preparedStatements.set( - 'fetchAccountsByRangeWithOffset', + addPreparedStatement( + 'fetchAccountsByRangeWithOffsetAlt', db.prepare( `SELECT * FROM accounts WHERE accountId BETWEEN ? AND ? @@ -79,47 +78,12 @@ export const initialize = (db: sqlite3.Database): void => { LIMIT ?` ) ); - - preparedStatements.set( + + addPreparedStatement( 'fetchAccountsByList', db.prepare( `SELECT * FROM accounts WHERE accountId IN (?)` ) ); - -}; - -export const getPreparedStmt = (name: string): sqlite3.Statement => { - const stmt = preparedStatements.get(name); - if (!stmt) { - throw new Error(`Prepared statement not found: ${name}`); - } - return stmt; -}; - -export const finalize = async (): Promise => { - const finalizePromises = []; - - preparedStatements.forEach((stmt, key) => { - if (stmt) { - finalizePromises.push( - new Promise((resolve, reject) => { - stmt.finalize((err) => { - if (err) { - Logger.mainLogger.error(`Error finalizing statement ${key}:`, err); - reject(err); - } else { - Logger.mainLogger.debug(`Successfully finalized statement ${key}`); - resolve(); - } - }); - }) - ); - } - }); - - await Promise.all(finalizePromises); -}; - -registerPreparedStatements(initialize, finalize); +}; \ No newline at end of file diff --git a/src/dbstore/prepared-statements/preparedStmtManager.ts b/src/dbstore/prepared-statements/preparedStmtManager.ts index 48c65bc..77d883c 100644 --- a/src/dbstore/prepared-statements/preparedStmtManager.ts +++ b/src/dbstore/prepared-statements/preparedStmtManager.ts @@ -1,35 +1,49 @@ import * as sqlite3 from 'sqlite3'; import * as Logger from '../../Logger'; -type PreparedStatementInitializer = (db: sqlite3.Database) => void; -type PreparedStatementFinalizer = () => Promise; - -const initializers: PreparedStatementInitializer[] = []; -const finalizers: PreparedStatementFinalizer[] = []; +// Centralized Map for all prepared statements +export const preparedStatementRegistry: Map = new Map(); /** - * Register an initializer and finalizer for a set of prepared statements. + * Add a prepared statement to the registry. */ -export const registerPreparedStatements = ( - initializer: PreparedStatementInitializer, - finalizer: PreparedStatementFinalizer -): void => { - initializers.push(initializer); - finalizers.push(finalizer); +export const addPreparedStatement = (name: string, statement: sqlite3.Statement): void => { + if (preparedStatementRegistry.has(name)) { + Logger.mainLogger.error(`Prepared statement with name "${name}" is already registered.`); + throw new Error(`Prepared statement with name "${name}" is already registered.`); + } + preparedStatementRegistry.set(name, statement); }; /** - * Initialize all registered prepared statements. + * Get a prepared statement from the registry. */ -export const initializePreparedStatements = (db: sqlite3.Database): void => { - initializers.forEach((initialize) => initialize(db)); - Logger.mainLogger.info('All prepared statements initialized.'); +export const getPreparedStmt = (name: string): sqlite3.Statement => { + const stmt = preparedStatementRegistry.get(name); + if (!stmt) { + Logger.mainLogger.error(`Prepared statement not found: ${name}`); + throw new Error(`Prepared statement not found: ${name}`); + } + return stmt; }; /** - * Finalize all registered prepared statements. + * Finalize all prepared statements in the registry. */ export const finalizePreparedStatements = async (): Promise => { - await Promise.all(finalizers.map((finalize) => finalize())); - Logger.mainLogger.info('All prepared statements finalized.'); + const finalizePromises = Array.from(preparedStatementRegistry.values()).map( + (stmt) => + new Promise((resolve, reject) => { + stmt.finalize((err) => { + if (err) { + Logger.mainLogger.error(`Error finalizing statement:`, err); + reject(err); + } else { + resolve(); + } + }); + }) + ); + await Promise.all(finalizePromises); + preparedStatementRegistry.clear(); }; From 09efac88400c6023b856254d57a666ae8335dff9 Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 08:57:39 +0530 Subject: [PATCH 3/6] added prepared statements for cycles database --- scripts/archiver_data_patcher.ts | 2 +- src/API.ts | 2 +- src/Data/AccountDataProvider.ts | 239 ++++++++++++------ src/dbstore/accounts.ts | 62 +++++ src/dbstore/cycles.ts | 180 +++++++------ src/dbstore/index.ts | 5 +- .../preparedStmtAccounts.ts | 17 +- .../prepared-statements/preparedStmtCycles.ts | 65 +++++ src/server.ts | 4 +- 9 files changed, 410 insertions(+), 166 deletions(-) create mode 100644 src/dbstore/prepared-statements/preparedStmtCycles.ts diff --git a/scripts/archiver_data_patcher.ts b/scripts/archiver_data_patcher.ts index 957c882..1006840 100644 --- a/scripts/archiver_data_patcher.ts +++ b/scripts/archiver_data_patcher.ts @@ -166,7 +166,7 @@ const runProgram = async (): Promise => { } } - const totalCycles = await CycleDB.queryCyleCount() + const totalCycles = await CycleDB.queryCycleCount() const totalAccounts = await AccountDB.queryAccountCount() const totalTransactions = await TransactionDB.queryTransactionCount() const totalReceipts = await ReceiptDB.queryReceiptCount() diff --git a/src/API.ts b/src/API.ts index 268c369..c1c5a44 100644 --- a/src/API.ts +++ b/src/API.ts @@ -890,7 +890,7 @@ export function registerRoutes(server: FastifyInstance => { - const wrappedAccounts: WrappedStateArray = [] - const wrappedAccounts2: WrappedStateArray = [] // We might not need to provide data to this - let lastUpdateNeeded = false - let highestTs = 0 - let delta = 0 - const { accountStart, accountEnd, tsStart, maxRecords, offset, accountOffset } = payload - const tsEnd = Date.now() - // Query from Shardeum->queryAccountsEntryByRanges3 fn - // const query = `SELECT * FROM accountsEntry - // WHERE (timestamp, accountId) >= (${tsStart}, "${accountOffset}") - // AND timestamp < ${tsEnd} - // AND accountId <= "${accountEnd}" AND accountId >= "${accountStart}" - // ORDER BY timestamp, accountId LIMIT ${maxRecords}` - - const safeSkip = Number.isInteger(offset) ? offset : 0 - const safeLimit = Number.isInteger(maxRecords) ? maxRecords : 100 - const sqlPrefix = `SELECT * FROM accounts WHERE ` - const queryString = `accountId BETWEEN ? AND ? AND timestamp BETWEEN ? AND ? ORDER BY timestamp ASC, accountId ASC LIMIT ${safeLimit}` - const offsetCondition = ` OFFSET ${safeSkip}` - let sql = sqlPrefix - let values = [] - if (accountOffset) { - sql += `accountId >= ? AND ` - values.push(accountOffset) - } - sql += queryString - values.push(accountStart, accountEnd, tsStart, tsEnd) - if (!accountOffset) sql += offsetCondition + const wrappedAccounts: WrappedStateArray = []; + const wrappedAccounts2: WrappedStateArray = []; + let lastUpdateNeeded = false; + let highestTs = 0; + let delta = 0; + const { accountStart, accountEnd, tsStart, maxRecords, offset, accountOffset } = payload; + const tsEnd = Date.now(); - let accounts = await Account.fetchAccountsBySqlQuery(sql, values) - for (const account of accounts) { - wrappedAccounts.push({ - accountId: account.accountId, - stateId: account.hash, - data: account.data, - timestamp: account.timestamp, - }) - } - if (wrappedAccounts.length === 0) { - lastUpdateNeeded = true - } else { - // see if our newest record is new enough - highestTs = 0 - for (const account of wrappedAccounts) { - if (account.timestamp > highestTs) { - highestTs = account.timestamp - } + const safeSkip = Number.isInteger(offset) ? offset : 0; + const safeLimit = Number.isInteger(maxRecords) ? maxRecords : 100; + + try { + let accounts: AccountsCopy[] = []; + if (accountOffset) { + // Fetch accounts with `accountOffset` + accounts = await fetchAccountsByRangeWithAccountOffset( + accountStart, + accountEnd, + tsStart, + tsEnd, + accountOffset, + safeLimit + ); + } else { + // Fetch accounts without `accountOffset` + accounts = await fetchAccountsByRangeWithoutAccountOffset( + accountStart, + accountEnd, + tsStart, + tsEnd, + safeLimit, + safeSkip + ); } - delta = tsEnd - highestTs - // Logger.mainLogger.debug('Account Data received', StringUtils.safeStringify(payload)) - // Logger.mainLogger.debug( - // 'delta ' + delta, - // 'tsEnd ' + tsEnd, - // 'highestTs ' + highestTs, - // delta < QUEUE_SIT_TIME * 2 - // ) - if (delta < QUEUE_SIT_TIME * 2) { - const tsStart2 = highestTs - const tsEnd2 = Date.now() - sql = sqlPrefix + queryString + offsetCondition - values = [accountStart, accountEnd, tsStart2, tsEnd2] - accounts = await Account.fetchAccountsBySqlQuery(sql, values) - for (const account of accounts) { - wrappedAccounts2.push({ - accountId: account.accountId, - stateId: account.hash, - data: account.data, - timestamp: account.timestamp, - }) + + for (const account of accounts) { + wrappedAccounts.push({ + accountId: account.accountId, + stateId: account.hash, + data: account.data, + timestamp: account.timestamp, + }); + } + + if (wrappedAccounts.length === 0) { + lastUpdateNeeded = true; + } else { + highestTs = Math.max(...wrappedAccounts.map((a) => a.timestamp)); + delta = tsEnd - highestTs; + + if (delta < QUEUE_SIT_TIME * 2) { + const tsStart2 = highestTs; + const tsEnd2 = Date.now(); + + const accounts2 = await fetchAccountsByRangeWithoutAccountOffset( + accountStart, + accountEnd, + tsStart2, + tsEnd2, + safeLimit, + safeSkip + ); + + for (const account of accounts2) { + wrappedAccounts2.push({ + accountId: account.accountId, + stateId: account.hash, + data: account.data, + timestamp: account.timestamp, + }); + } + lastUpdateNeeded = true; } - lastUpdateNeeded = true } + } catch (e) { + Logger.mainLogger.error(e); } - return { wrappedAccounts, lastUpdateNeeded, wrappedAccounts2, highestTs, delta } -} + + return { wrappedAccounts, lastUpdateNeeded, wrappedAccounts2, highestTs, delta }; +}; + + + +// /** +// * +// * This function is contructed to provide data in similar way as the `getAccountDataByRangeSmart` function in the validator +// * @param payload +// * @returns GetAccountDataByRangeSmart +// */ +// export const provideAccountDataRequest = async ( +// payload: AccountDataRequestSchema +// ): Promise => { +// const wrappedAccounts: WrappedStateArray = [] +// const wrappedAccounts2: WrappedStateArray = [] // We might not need to provide data to this +// let lastUpdateNeeded = false +// let highestTs = 0 +// let delta = 0 +// const { accountStart, accountEnd, tsStart, maxRecords, offset, accountOffset } = payload +// const tsEnd = Date.now() + +// const safeSkip = Number.isInteger(offset) ? offset : 0 +// const safeLimit = Number.isInteger(maxRecords) ? maxRecords : 100 +// const sqlPrefix = `SELECT * FROM accounts WHERE ` +// const queryString = `accountId BETWEEN ? AND ? AND timestamp BETWEEN ? AND ? ORDER BY timestamp ASC, accountId ASC LIMIT ${safeLimit}` +// const offsetCondition = ` OFFSET ${safeSkip}` +// let sql = sqlPrefix +// let values = [] +// if (accountOffset) { +// sql += `accountId >= ? AND ` +// values.push(accountOffset) +// } +// sql += queryString +// values.push(accountStart, accountEnd, tsStart, tsEnd) +// if (!accountOffset) sql += offsetCondition + +// let accounts = await Account.fetchAccountsBySqlQuery(sql, values) +// for (const account of accounts) { +// wrappedAccounts.push({ +// accountId: account.accountId, +// stateId: account.hash, +// data: account.data, +// timestamp: account.timestamp, +// }) +// } +// if (wrappedAccounts.length === 0) { +// lastUpdateNeeded = true +// } else { +// // see if our newest record is new enough +// highestTs = 0 +// for (const account of wrappedAccounts) { +// if (account.timestamp > highestTs) { +// highestTs = account.timestamp +// } +// } +// delta = tsEnd - highestTs +// // Logger.mainLogger.debug('Account Data received', StringUtils.safeStringify(payload)) +// // Logger.mainLogger.debug( +// // 'delta ' + delta, +// // 'tsEnd ' + tsEnd, +// // 'highestTs ' + highestTs, +// // delta < QUEUE_SIT_TIME * 2 +// // ) +// if (delta < QUEUE_SIT_TIME * 2) { +// const tsStart2 = highestTs +// const tsEnd2 = Date.now() +// sql = sqlPrefix + queryString + offsetCondition +// values = [accountStart, accountEnd, tsStart2, tsEnd2] +// accounts = await Account.fetchAccountsBySqlQuery(sql, values) +// for (const account of accounts) { +// wrappedAccounts2.push({ +// accountId: account.accountId, +// stateId: account.hash, +// data: account.data, +// timestamp: account.timestamp, +// }) +// } +// lastUpdateNeeded = true +// } +// } +// return { wrappedAccounts, lastUpdateNeeded, wrappedAccounts2, highestTs, delta } +// } + export const provideAccountDataByListRequest = async ( payload: AccountDataByListRequestSchema diff --git a/src/dbstore/accounts.ts b/src/dbstore/accounts.ts index 270f35b..bb4d26d 100644 --- a/src/dbstore/accounts.ts +++ b/src/dbstore/accounts.ts @@ -323,6 +323,68 @@ export async function queryAccountsBetweenCycles( return accounts } + +/** + * Fetch accounts using the `fetchAccountsByRangeWithAccountOffset` prepared statement. + */ +export async function fetchAccountsByRangeWithAccountOffset( + accountStart: string, + accountEnd: string, + tsStart: number, + tsEnd: number, + accountOffset: string, + limit: number +): Promise { + try { + const stmt = getPreparedStmt('fetchAccountsByRangeWithAccountOffset'); + const dbAccounts = await new Promise((resolve, reject) => { + stmt.all([accountStart, accountEnd, tsStart, tsEnd, accountOffset, limit], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbAccountCopy[]); + }); + }); + + return dbAccounts.map((dbAccount) => ({ + ...dbAccount, + data: DeSerializeFromJsonString(dbAccount.data), + })); + } catch (e) { + Logger.mainLogger.error(e); + return []; + } +} + +/** + * Fetch accounts using the `fetchAccountsByRangeWithoutAccountOffset` prepared statement. + */ +export async function fetchAccountsByRangeWithoutAccountOffset( + accountStart: string, + accountEnd: string, + tsStart: number, + tsEnd: number, + limit: number, + offset: number +): Promise { + try { + const stmt = getPreparedStmt('fetchAccountsByRangeWithoutAccountOffset'); + const dbAccounts = await new Promise((resolve, reject) => { + stmt.all([accountStart, accountEnd, tsStart, tsEnd, limit, offset], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbAccountCopy[]); + }); + }); + + return dbAccounts.map((dbAccount) => ({ + ...dbAccount, + data: DeSerializeFromJsonString(dbAccount.data), + })); + } catch (e) { + Logger.mainLogger.error(e); + return []; + } +} + + export async function fetchAccountsBySqlQuery(sql: string, value: string[]): Promise { const accounts: AccountsCopy[] = [] try { diff --git a/src/dbstore/cycles.ts b/src/dbstore/cycles.ts index 77ebfde..1cf6122 100644 --- a/src/dbstore/cycles.ts +++ b/src/dbstore/cycles.ts @@ -5,27 +5,23 @@ import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' import { Cycle, DbCycle } from './types' +import { getPreparedStmt } from './prepared-statements/preparedStmtManager' export async function insertCycle(cycle: Cycle): Promise { - try { - // Define the table columns based on schema - const columns = ['cycleMarker', 'counter', 'cycleRecord']; - - // Construct the SQL query with placeholders - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = `INSERT OR REPLACE INTO cycles (${columns.join(', ')}) VALUES ${placeholders}`; - - // Map the `cycle` object to match the columns - const values = columns.map((column) => - typeof cycle[column] === 'object' - ? SerializeToJsonString(cycle[column]) // Serialize objects to JSON - : cycle[column] - ); - - // Execute the query directly (single-row insert) - await db.run(cycleDatabase, sql, values); + const stmt = getPreparedStmt('insertCycle'); + const values = [ + cycle.cycleMarker, + cycle.counter, + cycle.cycleRecord && SerializeToJsonString(cycle.cycleRecord), + ]; + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); if (config.VERBOSE) { Logger.mainLogger.debug( @@ -44,6 +40,7 @@ export async function insertCycle(cycle: Cycle): Promise { } } + export async function bulkInsertCycles(cycles: Cycle[]): Promise { try { @@ -77,102 +74,133 @@ export async function bulkInsertCycles(cycles: Cycle[]): Promise { export async function updateCycle(marker: string, cycle: Cycle): Promise { try { - const sql = `UPDATE cycles SET counter = $counter, cycleRecord = $cycleRecord WHERE cycleMarker = $marker ` - await db.run(cycleDatabase, sql, { - $counter: cycle.counter, - $cycleRecord: cycle.cycleRecord && SerializeToJsonString(cycle.cycleRecord), - $marker: marker, - }) + const stmt = getPreparedStmt('updateCycle'); + const values = [ + cycle.counter, + SerializeToJsonString(cycle.cycleRecord), + marker, + ]; + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); + if (config.VERBOSE) { - Logger.mainLogger.debug('Updated cycle for counter', cycle.cycleRecord.counter, cycle.cycleMarker) + Logger.mainLogger.debug('Updated cycle for counter', cycle.counter, marker); } } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to update Cycle', cycle.cycleMarker) + Logger.mainLogger.error(e); + Logger.mainLogger.error('Unable to update Cycle', marker); } } -export async function queryCycleByMarker(marker: string): Promise { +export async function queryCycleByMarker(marker: string): Promise { try { - const sql = `SELECT * FROM cycles WHERE cycleMarker=? LIMIT 1` - const dbCycle = (await db.get(cycleDatabase, sql, [marker])) as DbCycle - let cycle: Cycle + const stmt = getPreparedStmt('queryCycleByMarker'); + const dbCycle = await new Promise((resolve, reject) => { + stmt.get([marker], (err, row) => { + if (err) reject(err); + else resolve(row as DbCycle); + }); + }); + if (dbCycle) { - cycle = { + const cycle: Cycle = { counter: dbCycle.counter, cycleRecord: DeSerializeFromJsonString(dbCycle.cycleRecord), cycleMarker: dbCycle.cycleMarker, + }; + + if (config.VERBOSE) { + Logger.mainLogger.debug('cycle marker', cycle); } + return cycle; } - if (config.VERBOSE) { - Logger.mainLogger.debug('cycle marker', cycle) - } - return cycle + return null; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } + export async function queryLatestCycleRecords(count: number): Promise { - if (!Number.isInteger(count)) { - Logger.mainLogger.error('queryLatestCycleRecords - Invalid count value') - return [] + if (!Number.isInteger(count) || count <= 0) { + Logger.mainLogger.error('queryLatestCycleRecords - Invalid count value'); + return []; } try { - const sql = `SELECT * FROM cycles ORDER BY counter DESC LIMIT ${count ? count : 100}` - const dbCycles = (await db.all(cycleDatabase, sql)) as DbCycle[] - const cycleRecords: P2P.CycleCreatorTypes.CycleData[] = [] - if (dbCycles.length > 0) { - for (const cycle of dbCycles) { - if (cycle.cycleRecord) cycleRecords.push(DeSerializeFromJsonString(cycle.cycleRecord)) - } - } + const stmt = getPreparedStmt('queryLatestCycleRecords'); + const dbCycles = await new Promise((resolve, reject) => { + stmt.all([count], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbCycle[]); + }); + }); + + const cycleRecords: P2P.CycleCreatorTypes.CycleData[] = dbCycles.map((cycle) => + DeSerializeFromJsonString(cycle.cycleRecord) + ); + if (config.VERBOSE) { - Logger.mainLogger.debug('cycle latest', cycleRecords) + Logger.mainLogger.debug('cycle latest', cycleRecords); } - return cycleRecords + return cycleRecords; } catch (e) { - Logger.mainLogger.error(e) - return [] + Logger.mainLogger.error(e); + return []; } } + export async function queryCycleRecordsBetween( start: number, end: number ): Promise { try { - const sql = `SELECT * FROM cycles WHERE counter BETWEEN ? AND ? ORDER BY counter ASC` - const dbCycles = (await db.all(cycleDatabase, sql, [start, end])) as DbCycle[] - const cycleRecords: P2P.CycleCreatorTypes.CycleData[] = [] - if (dbCycles.length > 0) { - for (const cycle of dbCycles) { - if (cycle.cycleRecord) cycleRecords.push(DeSerializeFromJsonString(cycle.cycleRecord)) - } - } + const stmt = getPreparedStmt('queryCycleRecordsBetween'); + const dbCycles = await new Promise((resolve, reject) => { + stmt.all([start, end], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbCycle[]); + }); + }); + + const cycleRecords: P2P.CycleCreatorTypes.CycleData[] = dbCycles.map((cycle) => + DeSerializeFromJsonString(cycle.cycleRecord) + ); + if (config.VERBOSE) { - Logger.mainLogger.debug('cycle between', cycleRecords) + Logger.mainLogger.debug('cycle between', cycleRecords); } - return cycleRecords + return cycleRecords; } catch (e) { - Logger.mainLogger.error(e) - return [] + Logger.mainLogger.error(e); + return []; } } -export async function queryCyleCount(): Promise { - let cycles +export async function queryCycleCount(): Promise { try { - const sql = `SELECT COUNT(*) FROM cycles` - cycles = await db.get(cycleDatabase, sql, []) + const stmt = getPreparedStmt('queryCycleCount'); + const result = await new Promise<{ count: number }>((resolve, reject) => { + stmt.get([], (err, row) => { + if (err) reject(err); + else resolve(row as { count: number }); + }); + }); + + const count = result?.count || 0; + + if (config.VERBOSE) { + Logger.mainLogger.debug('Cycle count', count); + } + + return count; } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error(e); + return 0; } - if (config.VERBOSE) { - Logger.mainLogger.debug('Cycle count', cycles) - } - if (cycles) cycles = cycles['COUNT(*)'] - else cycles = 0 - return cycles -} +} \ No newline at end of file diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index bfee044..0bfc093 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -4,6 +4,7 @@ import { createDB, runCreate, close } from './sqlite3storage' import { createDirectories } from '../Utils' import { finalizePreparedStatements } from './prepared-statements/preparedStmtManager' import { initialize as initializeAccountPreparedStatements } from './prepared-statements/preparedStmtAccounts'; +import { initialize as initializeCyclesPreparedStatements } from './prepared-statements/preparedStmtCycles'; export let cycleDatabase: Database export let accountDatabase: Database @@ -112,9 +113,9 @@ export const initializeDB = async (config: Config): Promise => { 'CREATE INDEX if not exists `processedTxs_cycle_idx` ON `processedTxs` (`cycle`)' ) - // Initialize prepared statements for accounts + // Initialize prepared statements initializeAccountPreparedStatements(accountDatabase); - + initializeCyclesPreparedStatements(cycleDatabase); } diff --git a/src/dbstore/prepared-statements/preparedStmtAccounts.ts b/src/dbstore/prepared-statements/preparedStmtAccounts.ts index 2256c7a..f273824 100644 --- a/src/dbstore/prepared-statements/preparedStmtAccounts.ts +++ b/src/dbstore/prepared-statements/preparedStmtAccounts.ts @@ -57,33 +57,26 @@ export const initialize = (db: sqlite3.Database): void => { ); addPreparedStatement( - 'fetchAccountsByRangeWithOffset', + 'fetchAccountsByRangeWithAccountOffset', db.prepare( `SELECT * FROM accounts WHERE accountId BETWEEN ? AND ? AND timestamp BETWEEN ? AND ? + AND accountId >= ? ORDER BY timestamp ASC, accountId ASC - LIMIT ? OFFSET ?` + LIMIT ?` ) ); addPreparedStatement( - 'fetchAccountsByRangeWithOffsetAlt', + 'fetchAccountsByRangeWithoutAccountOffset', db.prepare( `SELECT * FROM accounts WHERE accountId BETWEEN ? AND ? AND timestamp BETWEEN ? AND ? - AND accountId >= ? ORDER BY timestamp ASC, accountId ASC - LIMIT ?` + LIMIT ? OFFSET ?` ) ); - addPreparedStatement( - 'fetchAccountsByList', - db.prepare( - `SELECT * FROM accounts - WHERE accountId IN (?)` - ) - ); }; \ No newline at end of file diff --git a/src/dbstore/prepared-statements/preparedStmtCycles.ts b/src/dbstore/prepared-statements/preparedStmtCycles.ts new file mode 100644 index 0000000..84663b2 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtCycles.ts @@ -0,0 +1,65 @@ +import * as sqlite3 from 'sqlite3'; +import { addPreparedStatement } from './preparedStmtManager'; + +/** + * Initialize prepared statements for the `cycles` table. + */ +export const initialize = (db: sqlite3.Database): void => { + // Insert or replace a single cycle + addPreparedStatement( + 'insertCycle', + db.prepare( + `INSERT OR REPLACE INTO cycles (cycleMarker, counter, cycleRecord) + VALUES (?, ?, ?)` + ) + ); + + // Update a cycle by its marker + addPreparedStatement( + 'updateCycle', + db.prepare( + `UPDATE cycles + SET counter = ?, cycleRecord = ? + WHERE cycleMarker = ?` + ) + ); + + // Query a cycle by its marker + addPreparedStatement( + 'queryCycleByMarker', + db.prepare( + `SELECT * FROM cycles + WHERE cycleMarker = ? + LIMIT 1` + ) + ); + + // Query the latest cycle records + addPreparedStatement( + 'queryLatestCycleRecords', + db.prepare( + `SELECT * FROM cycles + ORDER BY counter DESC + LIMIT ?` + ) + ); + + // Query cycle records between two counters + addPreparedStatement( + 'queryCycleRecordsBetween', + db.prepare( + `SELECT * FROM cycles + WHERE counter BETWEEN ? AND ? + ORDER BY counter ASC` + ) + ); + + // Query the total count of cycles + addPreparedStatement( + 'queryCycleCount', + db.prepare( + `SELECT COUNT(*) as count + FROM cycles` + ) + ); +}; diff --git a/src/server.ts b/src/server.ts index 33837c1..bc93703 100644 --- a/src/server.ts +++ b/src/server.ts @@ -213,7 +213,7 @@ async function syncAndStartServer(): Promise { let lastStoredReceiptCount = await ReceiptDB.queryReceiptCount() // Retrieve the count of cycles currently stored in the database - let lastStoredCycleCount = await CycleDB.queryCyleCount() + let lastStoredCycleCount = await CycleDB.queryCycleCount() let lastStoredOriginalTxCount = await OriginalTxDB.queryOriginalTxDataCount() // Query the latest cycle record from the database let lastStoredCycleInfo = (await CycleDB.queryLatestCycleRecords(1))[0] @@ -400,7 +400,7 @@ async function syncAndStartServer(): Promise { // Query for the cycle and receipt counts lastStoredReceiptCount = await ReceiptDB.queryReceiptCount() lastStoredOriginalTxCount = await OriginalTxDB.queryOriginalTxDataCount() - lastStoredCycleCount = await CycleDB.queryCyleCount() + lastStoredCycleCount = await CycleDB.queryCycleCount() lastStoredCycleInfo = (await CycleDB.queryLatestCycleRecords(1))[0] // Check for any missing data and perform syncing if necessary From 5fe44240f04e2aaba73942dc267724fe98e23f51 Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 12:53:14 +0530 Subject: [PATCH 4/6] added prepared statements for receipts database --- .../preparedStmtReceipts.ts | 77 +++++ src/dbstore/receipts.ts | 284 ++++++++++-------- 2 files changed, 240 insertions(+), 121 deletions(-) create mode 100644 src/dbstore/prepared-statements/preparedStmtReceipts.ts diff --git a/src/dbstore/prepared-statements/preparedStmtReceipts.ts b/src/dbstore/prepared-statements/preparedStmtReceipts.ts new file mode 100644 index 0000000..4506cc3 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtReceipts.ts @@ -0,0 +1,77 @@ +import * as sqlite3 from 'sqlite3'; +import { addPreparedStatement } from './preparedStmtManager'; + +/** + * Initialize prepared statements for the `receipts` table. + */ +export const initialize = (db: sqlite3.Database): void => { + addPreparedStatement( + 'insertReceipt', + db.prepare( + `INSERT OR REPLACE INTO receipts + (receiptId, tx, cycle, applyTimestamp, timestamp, signedReceipt, afterStates, beforeStates, appReceiptData, executionShardKey, globalModification) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ) + ); + + + addPreparedStatement( + 'queryReceiptById', + db.prepare(`SELECT * FROM receipts WHERE receiptId=?`) + ); + + addPreparedStatement( + 'queryReceiptByIdAndTimestamp', + db.prepare(`SELECT * FROM receipts WHERE receiptId=? AND timestamp=?`) + ); + + + addPreparedStatement( + 'queryLatestReceipts', + db.prepare( + `SELECT * FROM receipts ORDER BY cycle DESC, timestamp DESC LIMIT ?` + ) + ); + + addPreparedStatement( + 'queryReceipts', + db.prepare( + `SELECT * FROM receipts ORDER BY cycle ASC, timestamp ASC LIMIT ? OFFSET ?` + ) + ); + + addPreparedStatement( + 'queryReceiptCount', + db.prepare( + `SELECT COUNT(*) as count FROM receipts` + ) + ); + + addPreparedStatement( + 'queryReceiptCountByCycles', + db.prepare( + `SELECT cycle, COUNT(*) FROM receipts + GROUP BY cycle + HAVING cycle BETWEEN ? AND ? + ORDER BY cycle ASC` + ) + ); + + addPreparedStatement( + 'queryReceiptCountBetweenCycles', + db.prepare( + `SELECT COUNT(*) FROM receipts + WHERE cycle BETWEEN ? AND ?` + ) + ); + + addPreparedStatement( + 'queryReceiptsBetweenCycles', + db.prepare( + `SELECT * FROM receipts + WHERE cycle BETWEEN ? AND ? + ORDER BY cycle ASC, timestamp ASC + LIMIT ? OFFSET ?` + ) + ); +}; diff --git a/src/dbstore/receipts.ts b/src/dbstore/receipts.ts index 2dedd8a..9b27ace 100644 --- a/src/dbstore/receipts.ts +++ b/src/dbstore/receipts.ts @@ -6,6 +6,7 @@ import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString , SerializeToJsonString} from '../utils/serialization' import { AccountsCopy } from '../dbstore/accounts' +import { getPreparedStmt } from './prepared-statements/preparedStmtManager'; // const superjson = require('superjson') export type Proposal = { @@ -109,34 +110,26 @@ type DbReceiptCount = ReceiptCount & { export async function insertReceipt(receipt: Receipt): Promise { try { - // Define the columns to match the database schema - const columns = [ - 'receiptId', - 'tx', - 'cycle', - 'applyTimestamp', - 'timestamp', - 'signedReceipt', - 'afterStates', - 'beforeStates', - 'appReceiptData', - 'executionShardKey', - 'globalModification', + const stmt = getPreparedStmt('insertReceipt'); + const values = [ + receipt.receiptId, + SerializeToJsonString(receipt.tx), + receipt.cycle, + receipt.applyTimestamp, + receipt.timestamp, + SerializeToJsonString(receipt.signedReceipt), + SerializeToJsonString(receipt.afterStates), + SerializeToJsonString(receipt.beforeStates), + SerializeToJsonString(receipt.appReceiptData), + receipt.executionShardKey, + receipt.globalModification, ]; - - // Create placeholders for the values - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = `INSERT OR REPLACE INTO receipts (${columns.join(', ')}) VALUES ${placeholders}`; - - // Map the receipt object to match the columns - const values = columns.map((column) => - typeof receipt[column] === 'object' - ? SerializeToJsonString(receipt[column]) // Serialize objects to JSON strings - : receipt[column] - ); - - // Execute the query directly - await db.run(receiptDatabase, sql, values); + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); if (config.VERBOSE) { Logger.mainLogger.debug('Successfully inserted Receipt', receipt.receiptId); @@ -144,12 +137,13 @@ export async function insertReceipt(receipt: Receipt): Promise { } catch (err) { Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert Receipt or it is already stored in the database', + 'Unable to insert Receipt in the Database', receipt.receiptId ); } } + export async function bulkInsertReceipts(receipts: Receipt[]): Promise { try { @@ -194,127 +188,162 @@ export async function bulkInsertReceipts(receipts: Receipt[]): Promise { } } - - - export async function queryReceiptByReceiptId(receiptId: string, timestamp = 0): Promise { try { - const sql = `SELECT * FROM receipts WHERE receiptId=?` + (timestamp ? ` AND timestamp=?` : '') - const value = timestamp ? [receiptId, timestamp] : [receiptId] - const receipt = (await db.get(receiptDatabase, sql, value)) as DbReceipt - if (receipt) deserializeDbReceipt(receipt) + const stmt = timestamp + ? getPreparedStmt('queryReceiptByIdAndTimestamp') + : getPreparedStmt('queryReceiptById'); + const values = timestamp ? [receiptId, timestamp] : [receiptId]; + const receipt = await new Promise((resolve, reject) => { + stmt.get(values, (err, row) => { + if (err) reject(err); + else resolve(row as DbReceipt); + }); + }); + + if (receipt) deserializeDbReceipt(receipt); + if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt receiptId', receipt) + Logger.mainLogger.debug('Receipt receiptId', receipt); } - return receipt + + return receipt; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } + export async function queryLatestReceipts(count: number): Promise { - if (!Number.isInteger(count)) { - Logger.mainLogger.error('queryLatestReceipts - Invalid count value') - return null + if (!Number.isInteger(count) || count <= 0) { + Logger.mainLogger.error('queryLatestReceipts - Invalid count value'); + return []; } try { - const sql = `SELECT * FROM receipts ORDER BY cycle DESC, timestamp DESC LIMIT ${count ? count : 100}` - const receipts = (await db.all(receiptDatabase, sql)) as DbReceipt[] + const stmt = getPreparedStmt('queryLatestReceipts'); + const receipts = await new Promise((resolve, reject) => { + stmt.all([count], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbReceipt[]); + }); + }); + if (receipts.length > 0) { - receipts.forEach((receipt: DbReceipt) => { - deserializeDbReceipt(receipt) - }) + receipts.forEach((receipt: DbReceipt) => deserializeDbReceipt(receipt)); } + if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt latest', receipts) + Logger.mainLogger.debug('Receipt latest', receipts); } - return receipts + + return receipts; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return []; } } + export async function queryReceipts(skip = 0, limit = 10000): Promise { - let receipts: Receipt[] = [] - if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryReceipts - Invalid skip or limit') - return receipts + if (!Number.isInteger(skip) || !Number.isInteger(limit) || skip < 0 || limit <= 0) { + Logger.mainLogger.error('queryReceipts - Invalid skip or limit'); + return []; } + try { - const sql = `SELECT * FROM receipts ORDER BY cycle ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - receipts = (await db.all(receiptDatabase, sql)) as DbReceipt[] + const stmt = getPreparedStmt('queryReceipts'); + const receipts = await new Promise((resolve, reject) => { + stmt.all([limit, skip], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbReceipt[]); + }); + }); + if (receipts.length > 0) { - receipts.forEach((receipt: DbReceipt) => { - deserializeDbReceipt(receipt) - }) + receipts.forEach((receipt: DbReceipt) => deserializeDbReceipt(receipt)); + } + + if (config.VERBOSE) { + Logger.mainLogger.debug('Receipt receipts', receipts.length, 'skip', skip); } + + return receipts; } catch (e) { - Logger.mainLogger.error(e) - } - if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt receipts', receipts ? receipts.length : receipts, 'skip', skip) + Logger.mainLogger.error(e); + return []; } - return receipts } + export async function queryReceiptCount(): Promise { - let receipts try { - const sql = `SELECT COUNT(*) FROM receipts` - receipts = await db.get(receiptDatabase, sql, []) + const stmt = getPreparedStmt('queryReceiptCount'); + const result = await new Promise<{ count: number }>((resolve, reject) => { + stmt.get([], (err, row) => { + if (err) reject(err); + else resolve(row as { count: number }); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Receipt count', result); + } + + return result?.count || 0; } catch (e) { - Logger.mainLogger.error(e) - } - if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt count', receipts) + Logger.mainLogger.error(e); + return 0; } - if (receipts) receipts = receipts['COUNT(*)'] - else receipts = 0 - return receipts } export async function queryReceiptCountByCycles(start: number, end: number): Promise { - let receiptsCount: ReceiptCount[] - let dbReceiptsCount: DbReceiptCount[] try { - const sql = `SELECT cycle, COUNT(*) FROM receipts GROUP BY cycle HAVING cycle BETWEEN ? AND ? ORDER BY cycle ASC` - dbReceiptsCount = (await db.all(receiptDatabase, sql, [start, end])) as DbReceiptCount[] + const stmt = getPreparedStmt('queryReceiptCountByCycles'); + const dbReceiptsCount = await new Promise((resolve, reject) => { + stmt.all([start, end], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbReceiptCount[]); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Receipt count by cycle', dbReceiptsCount); + } + + // Map the database rows into the `ReceiptCount` structure + return dbReceiptsCount.map((dbReceipt) => ({ + cycle: dbReceipt.cycle, + receiptCount: dbReceipt['COUNT(*)'], // Access the count field + })); } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error(e); + return []; } - if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt count by cycle', dbReceiptsCount) - } - if (dbReceiptsCount.length > 0) { - receiptsCount = dbReceiptsCount.map((dbReceipt) => { - return { - cycle: dbReceipt.cycle, - receiptCount: dbReceipt['COUNT(*)'], - } - }) - } - return receiptsCount } export async function queryReceiptCountBetweenCycles( startCycleNumber: number, endCycleNumber: number ): Promise { - let receipts try { - const sql = `SELECT COUNT(*) FROM receipts WHERE cycle BETWEEN ? AND ?` - receipts = await db.get(receiptDatabase, sql, [startCycleNumber, endCycleNumber]) + const stmt = getPreparedStmt('queryReceiptCountBetweenCycles'); + const result = await new Promise<{ count: number }>((resolve, reject) => { + stmt.get([startCycleNumber, endCycleNumber], (err, row) => { + if (err) reject(err); + else resolve(row as { count: number }); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Receipt count between cycles', result); + } + + return result ? result.count : 0; // Return the count value } catch (e) { - console.log(e) + Logger.mainLogger.error(e); + return 0; // Return 0 in case of an error } - if (config.VERBOSE) { - Logger.mainLogger.debug('Receipt count between cycles', receipts) - } - if (receipts) receipts = receipts['COUNT(*)'] - else receipts = 0 - return receipts } export async function queryReceiptsBetweenCycles( @@ -323,33 +352,46 @@ export async function queryReceiptsBetweenCycles( startCycleNumber: number, endCycleNumber: number ): Promise { - let receipts: Receipt[] = [] + const receipts: Receipt[] = []; + + // Validate input if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryReceiptsBetweenCycles - Invalid skip or limit') - return receipts + Logger.mainLogger.error('queryReceiptsBetweenCycles - Invalid skip or limit'); + return receipts; } + try { - const sql = `SELECT * FROM receipts WHERE cycle BETWEEN ? AND ? ORDER BY cycle ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - receipts = (await db.all(receiptDatabase, sql, [startCycleNumber, endCycleNumber])) as DbReceipt[] - if (receipts.length > 0) { - receipts.forEach((receipt: DbReceipt) => { - deserializeDbReceipt(receipt) - }) + const stmt = getPreparedStmt('queryReceiptsBetweenCycles'); + const dbReceipts = await new Promise((resolve, reject) => { + stmt.all([startCycleNumber, endCycleNumber, limit, skip], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbReceipt[]); + }); + }); + + if (dbReceipts.length > 0) { + dbReceipts.forEach((receipt: DbReceipt) => { + deserializeDbReceipt(receipt); + receipts.push(receipt as Receipt); + }); + } + + if (config.VERBOSE) { + Logger.mainLogger.debug( + 'Receipt receipts between cycles', + receipts.length, + 'skip', + skip + ); } } catch (e) { - console.log(e) + Logger.mainLogger.error(e); } - if (config.VERBOSE) { - Logger.mainLogger.debug( - 'Receipt receipts between cycles', - receipts ? receipts.length : receipts, - 'skip', - skip - ) - } - return receipts + + return receipts; } + function deserializeDbReceipt(receipt: DbReceipt): void { if (receipt.tx) receipt.tx = DeSerializeFromJsonString(receipt.tx) if (receipt.beforeStates) receipt.beforeStates = DeSerializeFromJsonString(receipt.beforeStates) From 51ac2712b8e59ba2d51703c1e805565d32382b33 Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 13:35:44 +0530 Subject: [PATCH 5/6] added prepared statements for originalTxsData database --- src/dbstore/originalTxsData.ts | 244 +++++++++++------- .../preparedStmtOriginalTxData.ts | 71 +++++ .../preparedStmtReceipts.ts | 22 +- src/dbstore/receipts.ts | 7 +- 4 files changed, 232 insertions(+), 112 deletions(-) create mode 100644 src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts diff --git a/src/dbstore/originalTxsData.ts b/src/dbstore/originalTxsData.ts index 9dd87da..d811e3d 100644 --- a/src/dbstore/originalTxsData.ts +++ b/src/dbstore/originalTxsData.ts @@ -4,6 +4,7 @@ import { originalTxDataDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' +import { getPreparedStmt } from './prepared-statements/preparedStmtManager' export interface OriginalTxData { txId: string @@ -28,25 +29,21 @@ type DbOriginalTxDataCount = OriginalTxDataCount & { } export async function insertOriginalTxData(originalTxData: OriginalTxData): Promise { - try { - - // Define the table columns based on schema - const columns = ['txId', 'timestamp', 'cycle', 'originalTxData']; - - // Construct the SQL query with placeholders - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = `INSERT OR REPLACE INTO originalTxsData (${columns.join(', ')}) VALUES ${placeholders}`; - - // Map the `originalTxData` object to match the columns - const values = columns.map((column) => - typeof originalTxData[column] === 'object' - ? SerializeToJsonString(originalTxData[column]) // Serialize objects to JSON - : originalTxData[column] - ); - - // Execute the query directly (single-row insert) - await db.run(originalTxDataDatabase, sql, values); + const stmt = getPreparedStmt('insertOriginalTxData'); + const values = [ + originalTxData.txId, + originalTxData.timestamp, + originalTxData.cycle, + SerializeToJsonString(originalTxData.originalTxData), + ]; + + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); if (config.VERBOSE) { Logger.mainLogger.debug('Successfully inserted OriginalTxData', originalTxData.txId); @@ -60,7 +57,6 @@ export async function insertOriginalTxData(originalTxData: OriginalTxData): Prom } } - export async function bulkInsertOriginalTxsData(originalTxsData: OriginalTxData[]): Promise { try { @@ -93,25 +89,34 @@ export async function bulkInsertOriginalTxsData(originalTxsData: OriginalTxData[ } } - - export async function queryOriginalTxDataCount(startCycle?: number, endCycle?: number): Promise { - let originalTxsData try { - let sql = `SELECT COUNT(*) FROM originalTxsData` - const values: number[] = [] - if (startCycle && endCycle) { - sql += ` WHERE cycle BETWEEN ? AND ?` - values.push(startCycle, endCycle) + let stmt; + const values: number[] = []; + + if (startCycle !== undefined && endCycle !== undefined) { + stmt = getPreparedStmt('queryOriginalTxDataCountByCycles'); + values.push(startCycle, endCycle); + } else { + stmt = getPreparedStmt('queryOriginalTxDataCount'); + } + + const result = await new Promise<{ 'COUNT(*)': number }>((resolve, reject) => { + stmt.get(values, (err, row) => { + if (err) reject(err); + else resolve(row as { 'COUNT(*)': number }); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('OriginalTxData count', result); } - originalTxsData = await db.get(originalTxDataDatabase, sql, values) + + return result ? result['COUNT(*)'] || 0 : 0; } catch (e) { - console.log(e) + Logger.mainLogger.error(e); + return 0; } - if (config.VERBOSE) { - Logger.mainLogger.debug('OriginalTxData count', originalTxsData) - } - return originalTxsData['COUNT(*)'] || 0 } export async function queryOriginalTxsData( @@ -120,108 +125,151 @@ export async function queryOriginalTxsData( startCycle?: number, endCycle?: number ): Promise { - let originalTxsData: DbOriginalTxData[] = [] + let originalTxsData: DbOriginalTxData[] = []; + if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryOriginalTxsData - Invalid skip or limit') - return originalTxsData + Logger.mainLogger.error('queryOriginalTxsData - Invalid skip or limit'); + return originalTxsData; } + try { - let sql = `SELECT * FROM originalTxsData` - const sqlSuffix = ` ORDER BY cycle ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - const values: number[] = [] - if (startCycle && endCycle) { - sql += ` WHERE cycle BETWEEN ? AND ?` - values.push(startCycle, endCycle) + let stmt; + const values: number[] = [limit, skip]; + + if (startCycle !== undefined && endCycle !== undefined) { + stmt = getPreparedStmt('queryOriginalTxsDataByCycles'); + values.unshift(startCycle, endCycle); // Add startCycle and endCycle to the values + } else { + stmt = getPreparedStmt('queryOriginalTxsData'); } - sql += sqlSuffix - originalTxsData = (await db.all(originalTxDataDatabase, sql, values)) as DbOriginalTxData[] + + originalTxsData = await new Promise((resolve, reject) => { + stmt.all(values, (err, rows) => { + if (err) reject(err); + else resolve(rows as DbOriginalTxData[]); + }); + }); + originalTxsData.forEach((originalTxData: DbOriginalTxData) => { - if (originalTxData.originalTxData) - originalTxData.originalTxData = DeSerializeFromJsonString(originalTxData.originalTxData) - // if (originalTxData.sign) originalTxData.sign = DeSerializeFromJsonString(originalTxData.sign) - }) + if (originalTxData.originalTxData) { + originalTxData.originalTxData = DeSerializeFromJsonString(originalTxData.originalTxData); + } + }); } catch (e) { - console.log(e) + Logger.mainLogger.error(e); } + if (config.VERBOSE) { - Logger.mainLogger.debug('OriginalTxData originalTxsData', originalTxsData) + Logger.mainLogger.debug('OriginalTxData originalTxsData', originalTxsData); } - return originalTxsData + + return originalTxsData; } -export async function queryOriginalTxDataByTxId(txId: string, timestamp = 0): Promise { +export async function queryOriginalTxDataByTxId( + txId: string, + timestamp = 0 +): Promise { try { - const sql = `SELECT * FROM originalTxsData WHERE txId=?` + (timestamp ? ` AND timestamp=?` : '') - const value = timestamp ? [txId, timestamp] : [txId] - const originalTxData = (await db.get(originalTxDataDatabase, sql, value)) as DbOriginalTxData + let stmt; + const values: (string | number)[] = [txId]; + + if (timestamp) { + stmt = getPreparedStmt('queryOriginalTxDataByTxIdWithTimestamp'); + values.push(timestamp); + } else { + stmt = getPreparedStmt('queryOriginalTxDataByTxId'); + } + + const originalTxData = await new Promise((resolve, reject) => { + stmt.get(values, (err, row) => { + if (err) reject(err); + else resolve(row as DbOriginalTxData | null); + }); + }); + if (originalTxData) { - if (originalTxData.originalTxData) - originalTxData.originalTxData = DeSerializeFromJsonString(originalTxData.originalTxData) - // if (originalTxData.sign) originalTxData.sign = DeSerializeFromJsonString(originalTxData.sign) + if (originalTxData.originalTxData) { + originalTxData.originalTxData = DeSerializeFromJsonString(originalTxData.originalTxData); + } } + if (config.VERBOSE) { - Logger.mainLogger.debug('OriginalTxData txId', originalTxData) + Logger.mainLogger.debug('OriginalTxData txId', originalTxData); } - return originalTxData as OriginalTxData + + return originalTxData as OriginalTxData | null; } catch (e) { - console.log(e) + Logger.mainLogger.error(e); + return null; } - return null } + export async function queryOriginalTxDataCountByCycles( start: number, end: number ): Promise { - const originalTxsDataCount: OriginalTxDataCount[] = [] - let dbOriginalTxsDataCount: DbOriginalTxDataCount[] = [] + const originalTxsDataCount: OriginalTxDataCount[] = []; try { - const sql = `SELECT cycle, COUNT(*) FROM originalTxsData GROUP BY cycle HAVING cycle BETWEEN ? AND ? ORDER BY cycle ASC` - dbOriginalTxsDataCount = (await db.all(originalTxDataDatabase, sql, [ - start, - end, - ])) as DbOriginalTxDataCount[] - } catch (e) { - Logger.mainLogger.error(e) - } - if (config.VERBOSE) { - Logger.mainLogger.debug('OriginalTxData count by cycle', dbOriginalTxsDataCount) - } - if (dbOriginalTxsDataCount.length > 0) { - for (let i = 0; i < dbOriginalTxsDataCount.length; i++) { - /* eslint-disable security/detect-object-injection */ - originalTxsDataCount.push({ - cycle: dbOriginalTxsDataCount[i].cycle, - originalTxDataCount: dbOriginalTxsDataCount[i]['COUNT(*)'], - }) - /* eslint-enable security/detect-object-injection */ + const stmt = getPreparedStmt('queryOriginalTxDataCountByCycles'); + + const dbOriginalTxsDataCount = await new Promise((resolve, reject) => { + stmt.all([start, end], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbOriginalTxDataCount[]); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('OriginalTxData count by cycle', dbOriginalTxsDataCount); + } + + if (dbOriginalTxsDataCount.length > 0) { + for (const dbRecord of dbOriginalTxsDataCount) { + originalTxsDataCount.push({ + cycle: dbRecord.cycle, + originalTxDataCount: dbRecord['COUNT(*)'], // Preserve original logic + }); + } } + } catch (e) { + Logger.mainLogger.error(e); } - return originalTxsDataCount + + return originalTxsDataCount; } -export async function queryLatestOriginalTxs(count: number): Promise { +export async function queryLatestOriginalTxs(count: number): Promise { if (!Number.isInteger(count)) { - Logger.mainLogger.error('queryLatestOriginalTxs - Invalid count value') - return null + Logger.mainLogger.error('queryLatestOriginalTxs - Invalid count value'); + return null; } + try { - const sql = `SELECT * FROM originalTxsData ORDER BY cycle DESC, timestamp DESC LIMIT ${ - count ? count : 100 - }` - const originalTxsData = (await db.all(originalTxDataDatabase, sql)) as DbOriginalTxData[] + const stmt = getPreparedStmt('queryLatestOriginalTxs'); + const originalTxsData = await new Promise((resolve, reject) => { + stmt.all([count], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbOriginalTxData[]); + }); + }); + if (originalTxsData.length > 0) { originalTxsData.forEach((tx: DbOriginalTxData) => { - if (tx.originalTxData) tx.originalTxData = DeSerializeFromJsonString(tx.originalTxData) - // if (tx.sign) tx.sign = DeSerializeFromJsonString(tx.sign) - }) + if (tx.originalTxData) { + tx.originalTxData = DeSerializeFromJsonString(tx.originalTxData); + } + }); } + if (config.VERBOSE) { - Logger.mainLogger.debug('Latest Original-Tx: ', originalTxsData) + Logger.mainLogger.debug('Latest Original-Tx: ', originalTxsData); } - return originalTxsData + + return originalTxsData; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } diff --git a/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts b/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts new file mode 100644 index 0000000..bc4ae98 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts @@ -0,0 +1,71 @@ +import * as sqlite3 from 'sqlite3'; +import { addPreparedStatement } from './preparedStmtManager'; + +/** + * Initialize prepared statements for the `receipts` table. + */ +export const initialize = (db: sqlite3.Database): void => { + + addPreparedStatement( + 'insertOriginalTxData', + db.prepare( + `INSERT OR REPLACE INTO originalTxsData + (txId, timestamp, cycle, originalTxData) + VALUES (?, ?, ?, ?)` + ) + ); + + addPreparedStatement( + 'queryOriginalTxDataCount', + db.prepare(`SELECT COUNT(*) FROM originalTxsData`) + ); + + addPreparedStatement( + 'queryOriginalTxDataCountByCycles', + db.prepare(`SELECT COUNT(*) FROM originalTxsData WHERE cycle BETWEEN ? AND ?`) + ); + + addPreparedStatement( + 'queryOriginalTxsData', + db.prepare( + `SELECT * FROM originalTxsData ORDER BY cycle ASC, timestamp ASC LIMIT ? OFFSET ?` + ) + ); + + addPreparedStatement( + 'queryOriginalTxsDataByCycles', + db.prepare( + `SELECT * FROM originalTxsData WHERE cycle BETWEEN ? AND ? ORDER BY cycle ASC, timestamp ASC LIMIT ? OFFSET ?` + ) + ); + + addPreparedStatement( + 'queryOriginalTxDataByTxId', + db.prepare(`SELECT * FROM originalTxsData WHERE txId = ?`) + ); + + addPreparedStatement( + 'queryOriginalTxDataByTxIdWithTimestamp', + db.prepare(`SELECT * FROM originalTxsData WHERE txId = ? AND timestamp = ?`) + ); + + addPreparedStatement( + 'queryOriginalTxDataCountByCycles', + db.prepare(` + SELECT cycle, COUNT(*) + FROM originalTxsData + GROUP BY cycle + HAVING cycle BETWEEN ? AND ? + ORDER BY cycle ASC + `) + ); + + addPreparedStatement( + 'queryLatestOriginalTxs', + db.prepare(` + SELECT * FROM originalTxsData + ORDER BY cycle DESC, timestamp DESC + LIMIT ? + `) + ); +}; diff --git a/src/dbstore/prepared-statements/preparedStmtReceipts.ts b/src/dbstore/prepared-statements/preparedStmtReceipts.ts index 4506cc3..f3cd210 100644 --- a/src/dbstore/prepared-statements/preparedStmtReceipts.ts +++ b/src/dbstore/prepared-statements/preparedStmtReceipts.ts @@ -9,8 +9,8 @@ export const initialize = (db: sqlite3.Database): void => { 'insertReceipt', db.prepare( `INSERT OR REPLACE INTO receipts - (receiptId, tx, cycle, applyTimestamp, timestamp, signedReceipt, afterStates, beforeStates, appReceiptData, executionShardKey, globalModification) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + (receiptId, tx, cycle, applyTimestamp, timestamp, signedReceipt, afterStates, beforeStates, appReceiptData, executionShardKey, globalModification) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) ); @@ -19,12 +19,12 @@ export const initialize = (db: sqlite3.Database): void => { 'queryReceiptById', db.prepare(`SELECT * FROM receipts WHERE receiptId=?`) ); - + addPreparedStatement( 'queryReceiptByIdAndTimestamp', db.prepare(`SELECT * FROM receipts WHERE receiptId=? AND timestamp=?`) ); - + addPreparedStatement( 'queryLatestReceipts', @@ -51,9 +51,9 @@ export const initialize = (db: sqlite3.Database): void => { 'queryReceiptCountByCycles', db.prepare( `SELECT cycle, COUNT(*) FROM receipts - GROUP BY cycle - HAVING cycle BETWEEN ? AND ? - ORDER BY cycle ASC` + GROUP BY cycle + HAVING cycle BETWEEN ? AND ? + ORDER BY cycle ASC` ) ); @@ -61,7 +61,7 @@ export const initialize = (db: sqlite3.Database): void => { 'queryReceiptCountBetweenCycles', db.prepare( `SELECT COUNT(*) FROM receipts - WHERE cycle BETWEEN ? AND ?` + WHERE cycle BETWEEN ? AND ?` ) ); @@ -69,9 +69,9 @@ export const initialize = (db: sqlite3.Database): void => { 'queryReceiptsBetweenCycles', db.prepare( `SELECT * FROM receipts - WHERE cycle BETWEEN ? AND ? - ORDER BY cycle ASC, timestamp ASC - LIMIT ? OFFSET ?` + WHERE cycle BETWEEN ? AND ? + ORDER BY cycle ASC, timestamp ASC + LIMIT ? OFFSET ?` ) ); }; diff --git a/src/dbstore/receipts.ts b/src/dbstore/receipts.ts index 9b27ace..9dfeb97 100644 --- a/src/dbstore/receipts.ts +++ b/src/dbstore/receipts.ts @@ -328,10 +328,10 @@ export async function queryReceiptCountBetweenCycles( ): Promise { try { const stmt = getPreparedStmt('queryReceiptCountBetweenCycles'); - const result = await new Promise<{ count: number }>((resolve, reject) => { + const result = await new Promise<{ 'COUNT(*)': number }>((resolve, reject) => { stmt.get([startCycleNumber, endCycleNumber], (err, row) => { if (err) reject(err); - else resolve(row as { count: number }); + else resolve(row as { 'COUNT(*)': number }); }); }); @@ -339,13 +339,14 @@ export async function queryReceiptCountBetweenCycles( Logger.mainLogger.debug('Receipt count between cycles', result); } - return result ? result.count : 0; // Return the count value + return result ? result['COUNT(*)'] : 0; // Access 'COUNT(*)' explicitly } catch (e) { Logger.mainLogger.error(e); return 0; // Return 0 in case of an error } } + export async function queryReceiptsBetweenCycles( skip = 0, limit = 10000, From 907f97eeafb174cc2c95b9e6b9584acee67b3b52 Mon Sep 17 00:00:00 2001 From: devendra-shardeum Date: Wed, 15 Jan 2025 20:19:59 +0530 Subject: [PATCH 6/6] added prepared statements in all the files --- src/API.ts | 5 +- src/dbstore/index.ts | 9 + src/dbstore/originalTxsData.ts | 2 +- .../preparedStmtAccounts.ts | 4 +- .../prepared-statements/preparedStmtCycles.ts | 4 +- .../preparedStmtOriginalTxData.ts | 6 +- .../preparedStmtProcessedTxs.ts | 36 +++ .../preparedStmtReceipts.ts | 4 +- .../preparedStmtTransactions.ts | 64 ++++ src/dbstore/processedTxs.ts | 128 +++++--- src/dbstore/receipts.ts | 2 +- src/dbstore/transactions.ts | 287 +++++++++++------- 12 files changed, 370 insertions(+), 181 deletions(-) create mode 100644 src/dbstore/prepared-statements/preparedStmtProcessedTxs.ts create mode 100644 src/dbstore/prepared-statements/preparedStmtTransactions.ts diff --git a/src/API.ts b/src/API.ts index c1c5a44..949a5b7 100644 --- a/src/API.ts +++ b/src/API.ts @@ -866,10 +866,7 @@ export function registerRoutes(server: FastifyInstance => { // Initialize prepared statements initializeAccountPreparedStatements(accountDatabase); initializeCyclesPreparedStatements(cycleDatabase); + initializeOriginalTxDataPreparedStatements(originalTxDataDatabase); + initializeProcessedTxsPreparedStatements(processedTxDatabase); + initializeReceiptsPreparedStatements(receiptDatabase); + initializeTransactionsPreparedStatements(transactionDatabase); } diff --git a/src/dbstore/originalTxsData.ts b/src/dbstore/originalTxsData.ts index d811e3d..ee654e2 100644 --- a/src/dbstore/originalTxsData.ts +++ b/src/dbstore/originalTxsData.ts @@ -95,7 +95,7 @@ export async function queryOriginalTxDataCount(startCycle?: number, endCycle?: n const values: number[] = []; if (startCycle !== undefined && endCycle !== undefined) { - stmt = getPreparedStmt('queryOriginalTxDataCountByCycles'); + stmt = getPreparedStmt('queryOriginalTxDataCountBetweenCycles'); values.push(startCycle, endCycle); } else { stmt = getPreparedStmt('queryOriginalTxDataCount'); diff --git a/src/dbstore/prepared-statements/preparedStmtAccounts.ts b/src/dbstore/prepared-statements/preparedStmtAccounts.ts index f273824..1fde3f0 100644 --- a/src/dbstore/prepared-statements/preparedStmtAccounts.ts +++ b/src/dbstore/prepared-statements/preparedStmtAccounts.ts @@ -1,9 +1,7 @@ import * as sqlite3 from 'sqlite3'; import { addPreparedStatement } from './preparedStmtManager'; -/** - * Initialize prepared statements for the `accounts` table. - */ + export const initialize = (db: sqlite3.Database): void => { addPreparedStatement( 'insertAccount', diff --git a/src/dbstore/prepared-statements/preparedStmtCycles.ts b/src/dbstore/prepared-statements/preparedStmtCycles.ts index 84663b2..0d6764a 100644 --- a/src/dbstore/prepared-statements/preparedStmtCycles.ts +++ b/src/dbstore/prepared-statements/preparedStmtCycles.ts @@ -1,9 +1,7 @@ import * as sqlite3 from 'sqlite3'; import { addPreparedStatement } from './preparedStmtManager'; -/** - * Initialize prepared statements for the `cycles` table. - */ + export const initialize = (db: sqlite3.Database): void => { // Insert or replace a single cycle addPreparedStatement( diff --git a/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts b/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts index bc4ae98..bfcd1f4 100644 --- a/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts +++ b/src/dbstore/prepared-statements/preparedStmtOriginalTxData.ts @@ -1,9 +1,7 @@ import * as sqlite3 from 'sqlite3'; import { addPreparedStatement } from './preparedStmtManager'; -/** - * Initialize prepared statements for the `receipts` table. - */ + export const initialize = (db: sqlite3.Database): void => { addPreparedStatement( @@ -21,7 +19,7 @@ export const initialize = (db: sqlite3.Database): void => { ); addPreparedStatement( - 'queryOriginalTxDataCountByCycles', + 'queryOriginalTxDataCountBetweenCycles', db.prepare(`SELECT COUNT(*) FROM originalTxsData WHERE cycle BETWEEN ? AND ?`) ); diff --git a/src/dbstore/prepared-statements/preparedStmtProcessedTxs.ts b/src/dbstore/prepared-statements/preparedStmtProcessedTxs.ts new file mode 100644 index 0000000..d2b9d91 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtProcessedTxs.ts @@ -0,0 +1,36 @@ +import * as sqlite3 from 'sqlite3'; +import { addPreparedStatement } from './preparedStmtManager'; + + +export const initialize = (db: sqlite3.Database): void => { + + + addPreparedStatement( + 'insertProcessedTx', + db.prepare(` + INSERT INTO processedTxs (txId, cycle, txTimestamp, applyTimestamp) + VALUES (?, ?, ?, ?) + ON CONFLICT (txId) DO UPDATE SET + cycle = excluded.cycle, + txTimestamp = excluded.txTimestamp, + applyTimestamp = excluded.applyTimestamp + `) + ); + + addPreparedStatement( + 'queryProcessedTxByTxId', + db.prepare(`SELECT * FROM processedTxs WHERE txId = ?`) + ); + + + addPreparedStatement( + 'queryProcessedTxsByCycleNumber', + db.prepare(`SELECT * FROM processedTxs WHERE cycle = ?`) + ); + + addPreparedStatement( + 'querySortedTxsBetweenCycleRange', + db.prepare(`SELECT txId FROM processedTxs WHERE cycle BETWEEN ? AND ?`) + ); + +}; diff --git a/src/dbstore/prepared-statements/preparedStmtReceipts.ts b/src/dbstore/prepared-statements/preparedStmtReceipts.ts index f3cd210..94c9b5a 100644 --- a/src/dbstore/prepared-statements/preparedStmtReceipts.ts +++ b/src/dbstore/prepared-statements/preparedStmtReceipts.ts @@ -1,9 +1,7 @@ import * as sqlite3 from 'sqlite3'; import { addPreparedStatement } from './preparedStmtManager'; -/** - * Initialize prepared statements for the `receipts` table. - */ + export const initialize = (db: sqlite3.Database): void => { addPreparedStatement( 'insertReceipt', diff --git a/src/dbstore/prepared-statements/preparedStmtTransactions.ts b/src/dbstore/prepared-statements/preparedStmtTransactions.ts new file mode 100644 index 0000000..2e6f1a2 --- /dev/null +++ b/src/dbstore/prepared-statements/preparedStmtTransactions.ts @@ -0,0 +1,64 @@ +import * as sqlite3 from 'sqlite3'; +import { addPreparedStatement } from './preparedStmtManager'; + + +export const initialize = (db: sqlite3.Database): void => { + + + addPreparedStatement( + 'insertTransaction', + db.prepare( + `INSERT OR REPLACE INTO transactions + (txId, appReceiptId, timestamp, cycleNumber, data, originalTxData) + VALUES (?, ?, ?, ?, ?, ?)` + ) + ); + + addPreparedStatement( + 'queryTransactionByTxId', + db.prepare(`SELECT * FROM transactions WHERE txId = ?`) + ); + + addPreparedStatement( + 'queryLatestTransactions', + db.prepare( + `SELECT * FROM transactions + ORDER BY cycleNumber DESC, timestamp DESC + LIMIT ?` + ) + ); + + addPreparedStatement( + 'queryTransactions', + db.prepare( + `SELECT * FROM transactions + ORDER BY cycleNumber ASC, timestamp ASC + LIMIT ? OFFSET ?` + ) + ); + + addPreparedStatement( + 'queryTransactionCount', + db.prepare( + `SELECT COUNT(*) FROM transactions` + ) + ); + + addPreparedStatement( + 'queryTransactionCountBetweenCycles', + db.prepare( + `SELECT COUNT(*) FROM transactions WHERE cycleNumber BETWEEN ? AND ?` + ) + ); + + addPreparedStatement( + 'queryTransactionsBetweenCycles', + db.prepare( + `SELECT * FROM transactions + WHERE cycleNumber BETWEEN ? AND ? + ORDER BY cycleNumber ASC, timestamp ASC + LIMIT ? OFFSET ?` + ) + ); + +}; diff --git a/src/dbstore/processedTxs.ts b/src/dbstore/processedTxs.ts index ee91348..aa010e9 100644 --- a/src/dbstore/processedTxs.ts +++ b/src/dbstore/processedTxs.ts @@ -2,6 +2,7 @@ import * as db from './sqlite3storage' import { processedTxDatabase } from './' import * as Logger from '../Logger' import { config } from '../Config' +import { getPreparedStmt } from './prepared-statements/preparedStmtManager' // const superjson = require('superjson') /** @@ -15,27 +16,21 @@ export interface ProcessedTransaction { } export async function insertProcessedTx(processedTx: ProcessedTransaction): Promise { - try { - - // Define the table columns based on schema - const columns = ['txId', 'cycle', 'txTimestamp', 'applyTimestamp']; - - // Construct the SQL query with placeholders - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = ` - INSERT INTO processedTxs (${columns.join(', ')}) VALUES ${placeholders} - ON CONFLICT (txId) DO UPDATE SET - cycle = excluded.cycle, - txTimestamp = excluded.txTimestamp, - applyTimestamp = excluded.applyTimestamp - `; - - // Map the `processedTx` object to match the columns - const values = columns.map((column) => processedTx[column]); - - // Execute the query directly (single-row insert/update) - await db.run(processedTxDatabase, sql, values); + const stmt = getPreparedStmt('insertProcessedTx'); + const values = [ + processedTx.txId, + processedTx.cycle, + processedTx.txTimestamp, + processedTx.applyTimestamp, + ]; + + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); if (config.VERBOSE) { Logger.mainLogger.debug('Successfully inserted ProcessedTransaction', processedTx.txId); @@ -50,7 +45,6 @@ export async function insertProcessedTx(processedTx: ProcessedTransaction): Prom } - export async function bulkInsertProcessedTxs(processedTxs: ProcessedTransaction[]): Promise { try { @@ -85,56 +79,92 @@ export async function bulkInsertProcessedTxs(processedTxs: ProcessedTransaction[ } } - - -export async function queryProcessedTxByTxId(txId: string): Promise { +export async function queryProcessedTxByTxId(txId: string): Promise { try { - const sql = `SELECT * FROM processedTxs WHERE txId=?` - const processedTx = (await db.get(processedTxDatabase, sql, [txId])) as ProcessedTransaction + // Get the prepared statement + const stmt = getPreparedStmt('queryProcessedTxByTxId'); + + // Execute the prepared statement + const processedTx = await new Promise((resolve, reject) => { + stmt.get([txId], (err, row) => { + if (err) reject(err); + else resolve(row as ProcessedTransaction | null); + }); + }); + + // Log if verbose mode is enabled if (config.VERBOSE) { - Logger.mainLogger.debug('ProcessedTransaction txId', processedTx) + Logger.mainLogger.debug('ProcessedTransaction txId', processedTx); } - return processedTx + + return processedTx; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } -export async function queryProcessedTxsByCycleNumber(cycleNumber: number): Promise { +export async function queryProcessedTxsByCycleNumber( + cycleNumber: number +): Promise { try { - const sql = `SELECT * FROM processedTxs WHERE cycle=?` - const processedTxs = (await db.all(processedTxDatabase, sql, [cycleNumber])) as ProcessedTransaction[] + // Get the prepared statement + const stmt = getPreparedStmt('queryProcessedTxsByCycleNumber'); + + // Execute the prepared statement + const processedTxs = await new Promise((resolve, reject) => { + stmt.all([cycleNumber], (err, rows) => { + if (err) reject(err); + else resolve(rows as ProcessedTransaction[]); + }); + }); + + // Log if verbose mode is enabled if (config.VERBOSE) { - Logger.mainLogger.debug(`ProcessedTransactions for cycle: ${cycleNumber} ${processedTxs.length}`) + Logger.mainLogger.debug( + `ProcessedTransactions for cycle: ${cycleNumber}, count: ${processedTxs.length}` + ); } - return processedTxs + + return processedTxs; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } export async function querySortedTxsBetweenCycleRange( startCycle: number, endCycle: number -): Promise { +): Promise { try { - const sql = `SELECT txId FROM processedTxs WHERE cycle BETWEEN ? AND ?` - const txIdsArray = (await db.all(processedTxDatabase, sql, [startCycle, endCycle])) as { txId: string }[] + // Get the prepared statement + const stmt = getPreparedStmt('querySortedTxsBetweenCycleRange'); + + // Execute the prepared statement + const txIdsArray = await new Promise<{ txId: string }[]>((resolve, reject) => { + stmt.all([startCycle, endCycle], (err, rows) => { + if (err) reject(err); + else resolve(rows as { txId: string }[]); + }); + }); + if (config.VERBOSE) { - Logger.mainLogger.debug(`txIds between ${startCycle} and ${endCycle} are ${txIdsArray ? txIdsArray.length : 0}`) + Logger.mainLogger.debug( + `txIds between ${startCycle} and ${endCycle} are ${txIdsArray ? txIdsArray.length : 0}` + ); } - if (!txIdsArray) { - return [] + if (!txIdsArray || txIdsArray.length === 0) { + return []; } - const txIds = txIdsArray.map((tx) => tx.txId) - txIds.sort() - return txIds + // Extract and sort transaction IDs + const txIds = txIdsArray.map((tx) => tx.txId); + txIds.sort(); + return txIds; } catch (e) { - Logger.mainLogger.error('error in querySortedTxsBetweenCycleRange: ', e) - return null + Logger.mainLogger.error('Error in querySortedTxsBetweenCycleRange:', e); + return null; } -} +} \ No newline at end of file diff --git a/src/dbstore/receipts.ts b/src/dbstore/receipts.ts index 9dfeb97..3e6d7d8 100644 --- a/src/dbstore/receipts.ts +++ b/src/dbstore/receipts.ts @@ -401,4 +401,4 @@ function deserializeDbReceipt(receipt: DbReceipt): void { if (receipt.signedReceipt) receipt.signedReceipt = DeSerializeFromJsonString(receipt.signedReceipt) // globalModification is stored as 0 or 1 in the database, convert it to boolean receipt.globalModification = (receipt.globalModification as unknown as number) === 1 -} +} \ No newline at end of file diff --git a/src/dbstore/transactions.ts b/src/dbstore/transactions.ts index 5e9b4af..74f8752 100644 --- a/src/dbstore/transactions.ts +++ b/src/dbstore/transactions.ts @@ -4,7 +4,7 @@ import { transactionDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' - +import { getPreparedStmt } from './prepared-statements/preparedStmtManager' /** * Transaction is for storing dapp receipt (eg. evm receipt in shardeum) * If there is no dapp receipt, we can skip storing in transactions table and use receipts table @@ -26,22 +26,26 @@ type DbTransaction = Transaction & { export async function insertTransaction(transaction: Transaction): Promise { try { - // Define the table columns based on schema - const columns = ['txId', 'appReceiptId', 'timestamp', 'cycleNumber', 'data', 'originalTxData']; - - // Construct the SQL query with placeholders - const placeholders = `(${columns.map(() => '?').join(', ')})`; - const sql = `INSERT OR REPLACE INTO transactions (${columns.join(', ')}) VALUES ${placeholders}`; + // Get the prepared statement + const stmt = getPreparedStmt('insertTransaction'); // Map the `transaction` object to match the columns - const values = columns.map((column) => - typeof transaction[column] === 'object' - ? SerializeToJsonString(transaction[column]) // Serialize objects to JSON - : transaction[column] - ); + const values = [ + transaction.txId, + transaction.appReceiptId, + transaction.timestamp, + transaction.cycleNumber, + SerializeToJsonString(transaction.data), + SerializeToJsonString(transaction.originalTxData), + ]; - // Execute the query directly - await db.run(transactionDatabase, sql, values); + // Execute the prepared statement + await new Promise((resolve, reject) => { + stmt.run(values, (err) => { + if (err) reject(err); + else resolve(); + }); + }); if (config.VERBOSE) { Logger.mainLogger.debug('Successfully inserted Transaction', transaction.txId); @@ -90,132 +94,178 @@ export async function bulkInsertTransactions(transactions: Transaction[]): Promi export async function queryTransactionByTxId(txId: string): Promise { try { - const sql = `SELECT * FROM transactions WHERE txId=?` - const transaction = (await db.get(transactionDatabase, sql, [txId])) as DbTransaction // TODO: confirm structure of object from db - if (transaction) { - if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data) - if (transaction.originalTxData) - transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData) - } - if (config.VERBOSE) { - Logger.mainLogger.debug('Transaction txId', transaction) - } - return transaction - } catch (e) { - Logger.mainLogger.error(e) - return null - } -} + // Get the prepared statement + const stmt = getPreparedStmt('queryTransactionByTxId'); -export async function queryTransactionByAccountId(accountId: string): Promise { - try { - const sql = `SELECT * FROM transactions WHERE accountId=?` - const transaction = (await db.get(transactionDatabase, sql, [accountId])) as DbTransaction // TODO: confirm structure of object from db + // Execute the prepared statement + const transaction = await new Promise((resolve, reject) => { + stmt.get([txId], (err, row) => { + if (err) reject(err); + else resolve(row as DbTransaction); + }); + }); + + // Deserialize JSON fields if the transaction exists if (transaction) { - if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data) - if (transaction.originalTxData) - transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData) + if (transaction.data) { + transaction.data = DeSerializeFromJsonString(transaction.data); + } + if (transaction.originalTxData) { + transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData); + } } + if (config.VERBOSE) { - Logger.mainLogger.debug('Transaction accountId', transaction) + Logger.mainLogger.debug('Transaction txId', transaction); } - return transaction + + return transaction; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } -export async function queryLatestTransactions(count: number): Promise { - if (!Number.isInteger(count)) { - Logger.mainLogger.error('queryLatestTransactions - Invalid count value') - return null + +export async function queryLatestTransactions(count: number): Promise { + if (!Number.isInteger(count) || count <= 0) { + Logger.mainLogger.error('queryLatestTransactions - Invalid count value'); + return null; } + try { - const sql = `SELECT * FROM transactions ORDER BY cycleNumber DESC, timestamp DESC LIMIT ${ - count ? count : 100 - }` - const transactions = (await db.all(transactionDatabase, sql)) as DbTransaction[] // TODO: confirm structure of object from db + // Get the prepared statement + const stmt = getPreparedStmt('queryLatestTransactions'); + + // Execute the prepared statement + const transactions = await new Promise((resolve, reject) => { + stmt.all([count], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbTransaction[]); + }); + }); + + // Deserialize JSON fields for each transaction if (transactions.length > 0) { transactions.forEach((transaction: DbTransaction) => { - if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data) - if (transaction.originalTxData) - transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData) - }) + if (transaction.data) { + transaction.data = DeSerializeFromJsonString(transaction.data); + } + if (transaction.originalTxData) { + transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData); + } + }); } + if (config.VERBOSE) { - Logger.mainLogger.debug('Transaction latest', transactions) + Logger.mainLogger.debug('Transaction latest', transactions); } - return transactions + + return transactions; } catch (e) { - Logger.mainLogger.error(e) - return null + Logger.mainLogger.error(e); + return null; } } -export async function queryTransactions(skip = 0, limit = 10000): Promise { - let transactions + +export async function queryTransactions(skip = 0, limit = 10000): Promise { if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryTransactions - Invalid skip or limit') - return null + Logger.mainLogger.error('queryTransactions - Invalid skip or limit'); + return null; } + try { - const sql = `SELECT * FROM transactions ORDER BY cycleNumber ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - transactions = (await db.all(transactionDatabase, sql)) as DbTransaction[] // TODO: confirm structure of object from db + // Get the prepared statement + const stmt = getPreparedStmt('queryTransactions'); + + // Execute the prepared statement + const transactions = await new Promise((resolve, reject) => { + stmt.all([limit, skip], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbTransaction[]); + }); + }); + + // Deserialize JSON fields for each transaction if (transactions.length > 0) { transactions.forEach((transaction: DbTransaction) => { - if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data) - if (transaction.originalTxData) - transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData) - }) + if (transaction.data) { + transaction.data = DeSerializeFromJsonString(transaction.data); + } + if (transaction.originalTxData) { + transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData); + } + }); } + + if (config.VERBOSE) { + Logger.mainLogger.debug( + 'Transaction transactions', + transactions ? transactions.length : transactions, + 'skip', + skip + ); + } + + return transactions; } catch (e) { - Logger.mainLogger.error(e) - } - if (config.VERBOSE) { - Logger.mainLogger.debug( - 'Transaction transactions', - transactions ? transactions.length : transactions, - 'skip', - skip - ) + Logger.mainLogger.error(e); + return null; } - return transactions } export async function queryTransactionCount(): Promise { - let transactions + let transactions; try { - const sql = `SELECT COUNT(*) FROM transactions` - transactions = await db.get(transactionDatabase, sql, []) + // Get the prepared statement + const stmt = getPreparedStmt('queryTransactionCount'); + + // Execute the prepared statement + transactions = await new Promise<{ 'COUNT(*)': number }>((resolve, reject) => { + stmt.get([], (err, row) => { + if (err) reject(err); + else resolve(row as { 'COUNT(*)': number }); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Transaction count', transactions); + } } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error(e); } - if (config.VERBOSE) { - Logger.mainLogger.debug('Transaction count', transactions) - } - if (transactions) transactions = transactions['COUNT(*)'] - else transactions = 0 - return transactions + + // Preserve the original logic for accessing 'COUNT(*)' + return transactions ? transactions['COUNT(*)'] : 0; } export async function queryTransactionCountBetweenCycles( startCycleNumber: number, endCycleNumber: number ): Promise { - let transactions try { - const sql = `SELECT COUNT(*) FROM transactions WHERE cycleNumber BETWEEN ? AND ?` - transactions = await db.get(transactionDatabase, sql, [startCycleNumber, endCycleNumber]) + // Get the prepared statement + const stmt = getPreparedStmt('queryTransactionCountBetweenCycles'); + + // Execute the prepared statement + const transactions = await new Promise<{ 'COUNT(*)': number }>((resolve, reject) => { + stmt.get([startCycleNumber, endCycleNumber], (err, row) => { + if (err) reject(err); + else resolve(row as { 'COUNT(*)': number }); + }); + }); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Transaction count between cycles', transactions); + } + + // Preserve original logic for accessing 'COUNT(*)' + return transactions ? transactions['COUNT(*)'] : 0; } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error(e); + return 0; // Return 0 in case of an error } - if (config.VERBOSE) { - Logger.mainLogger.debug('Transaction count between cycles', transactions) - } - if (transactions) transactions = transactions['COUNT(*)'] - else transactions = 0 - return transactions } export async function queryTransactionsBetweenCycles( @@ -224,34 +274,45 @@ export async function queryTransactionsBetweenCycles( startCycleNumber: number, endCycleNumber: number ): Promise { - let transactions + let transactions: Transaction[] = []; + if (!Number.isInteger(skip) || !Number.isInteger(limit)) { - Logger.mainLogger.error('queryTransactionsBetweenCycles - Invalid skip or limit value') - return null + Logger.mainLogger.error('queryTransactionsBetweenCycles - Invalid skip or limit value'); + return null; } + try { - const sql = `SELECT * FROM transactions WHERE cycleNumber BETWEEN ? AND ? ORDER BY cycleNumber ASC, timestamp ASC LIMIT ${limit} OFFSET ${skip}` - transactions = (await db.all(transactionDatabase, sql, [ - startCycleNumber, - endCycleNumber, - ])) as DbTransaction[] // TODO: confirm structure of object from db - if (transactions.length > 0) { - transactions.forEach((transaction: DbTransaction) => { - if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data) + // Get the prepared statement + const stmt = getPreparedStmt('queryTransactionsBetweenCycles'); + + // Execute the prepared statement + const dbTransactions = await new Promise((resolve, reject) => { + stmt.all([startCycleNumber, endCycleNumber, limit, skip], (err, rows) => { + if (err) reject(err); + else resolve(rows as DbTransaction[]); + }); + }); + + if (dbTransactions.length > 0) { + dbTransactions.forEach((transaction: DbTransaction) => { + if (transaction.data) transaction.data = DeSerializeFromJsonString(transaction.data); if (transaction.originalTxData) - transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData) - }) + transaction.originalTxData = DeSerializeFromJsonString(transaction.originalTxData); + }); + transactions = dbTransactions; } } catch (e) { - Logger.mainLogger.error(e) + Logger.mainLogger.error(e); } + if (config.VERBOSE) { Logger.mainLogger.debug( 'Transaction transactions between cycles', transactions ? transactions.length : transactions, 'skip', skip - ) + ); } - return transactions + + return transactions; }