diff --git a/_emulator/.firebaserc b/_emulator/.firebaserc index 5d2802a3c..8019a3d99 100644 --- a/_emulator/.firebaserc +++ b/_emulator/.firebaserc @@ -1,5 +1,13 @@ { "projects": { "default": "demo-test" + }, + "targets": {}, + "etags": { + "dev-extensions-testing": { + "extensionInstances": { + "firestore-bigquery-export": "02acbd8b443b9635716d52d65758a78db1e51140191caecaaf60d932d314a62a" + } + } } } \ No newline at end of file diff --git a/firestore-bigquery-export/CHANGELOG.md b/firestore-bigquery-export/CHANGELOG.md index 06946ab46..6f684225b 100644 --- a/firestore-bigquery-export/CHANGELOG.md +++ b/firestore-bigquery-export/CHANGELOG.md @@ -1,3 +1,13 @@ +## Version 0.1.56 + +feat - improve sync strategy by immediately writing to BQ, and using cloud tasks only as a last resort + +refactor - improve observability/logging of events + +chore - remove legacy backfill code + +fix - improved usage of the types from change tracker package + ## Version 0.1.55 feat - log failed queued tasks diff --git a/firestore-bigquery-export/README.md b/firestore-bigquery-export/README.md index 932e71881..2807b8c1a 100644 --- a/firestore-bigquery-export/README.md +++ b/firestore-bigquery-export/README.md @@ -126,8 +126,6 @@ To install an extension, your project must be on the [Blaze (pay as you go) plan * Collection path: What is the path of the collection that you would like to export? You may use `{wildcard}` notation to match a subcollection of all documents in a collection (for example: `chatrooms/{chatid}/posts`). Parent Firestore Document IDs from `{wildcards}` can be returned in `path_params` as a JSON formatted string. -* Enable logging failed exports: If enabled, the extension will log event exports that failed to enqueue to Cloud Logging, to mitigate data loss. - * Enable Wildcard Column field with Parent Firestore Document IDs: If enabled, creates a column containing a JSON object of all wildcard ids from a documents path. * Dataset ID: What ID would you like to use for your BigQuery dataset? This extension will create the dataset, if it doesn't already exist. @@ -158,18 +156,16 @@ essential for the script to insert data into an already partitioned table.) * Exclude old data payloads: If enabled, table rows will never contain old data (document snapshot before the Firestore onDocumentUpdate event: `change.before.data()`). The reduction in data should be more performant, and avoid potential resource limitations. -* Use Collection Group query: Do you want to use a [collection group](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query for importing existing documents? You have to enable collectionGroup query if your import path contains subcollections. Warning: A collectionGroup query will target every collection in your Firestore project that matches the 'Existing documents collection'. For example, if you have 10,000 documents with a subcollection named: landmarks, this will query every document in 10,000 landmarks collections. - * Cloud KMS key name: Instead of Google managing the key encryption keys that protect your data, you control and manage key encryption keys in Cloud KMS. If this parameter is set, the extension will specify the KMS key name when creating the BQ table. See the PREINSTALL.md for more details. +* Maximum number of enqueue attempts: This parameter will set the maximum number of attempts to enqueue a document to cloud tasks for export to BigQuery. If the maximum number of attempts is reached, the failed export will be handled according to the `LOG_FAILED_EXPORTS` parameter. + **Cloud Functions:** * **fsexportbigquery:** Listens for document changes in your specified Cloud Firestore collection, then exports the changes into BigQuery. -* **fsimportexistingdocs:** Imports existing documents from the specified collection into BigQuery. Imported documents will have a special changelog with the operation of `IMPORT` and the timestamp of epoch. - * **syncBigQuery:** A task-triggered function that gets called on BigQuery sync * **initBigQuerySync:** Runs configuration for sycning with BigQuery diff --git a/firestore-bigquery-export/extension.yaml b/firestore-bigquery-export/extension.yaml index c8045fe97..e41c68043 100644 --- a/firestore-bigquery-export/extension.yaml +++ b/firestore-bigquery-export/extension.yaml @@ -13,7 +13,7 @@ # limitations under the License. name: firestore-bigquery-export -version: 0.1.55 +version: 0.1.56 specVersion: v1beta displayName: Stream Firestore to BigQuery @@ -48,6 +48,9 @@ roles: - role: datastore.user reason: Allows the extension to write updates to the database. + # - role: storage.objectAdmin + # reason: Allows the extension to create objects in the storage bucket. + resources: - name: fsexportbigquery type: firebaseextensions.v1beta.function @@ -60,19 +63,6 @@ resources: eventType: providers/cloud.firestore/eventTypes/document.write resource: projects/${param:PROJECT_ID}/databases/(default)/documents/${param:COLLECTION_PATH}/{documentId} - - name: fsimportexistingdocs - type: firebaseextensions.v1beta.function - description: - Imports existing documents from the specified collection into BigQuery. - Imported documents will have a special changelog with the operation of - `IMPORT` and the timestamp of epoch. - properties: - runtime: nodejs18 - taskQueueTrigger: - retryConfig: - maxAttempts: 15 - minBackoffSeconds: 60 - - name: syncBigQuery type: firebaseextensions.v1beta.function description: >- @@ -206,19 +196,6 @@ params: default: posts required: true - - param: LOG_FAILED_EXPORTS - label: Enable logging failed exports - description: >- - If enabled, the extension will log event exports that failed to enqueue to - Cloud Logging, to mitigate data loss. - type: select - options: - - label: Yes - value: yes - - label: No - value: no - required: true - - param: WILDCARD_IDS label: Enable Wildcard Column field with Parent Firestore Document IDs description: >- @@ -409,74 +386,6 @@ params: - label: No value: no - # - param: DO_BACKFILL - # label: Import existing Firestore documents into BigQuery? - # description: >- - # Do you want to import existing documents from your Firestore collection - # into BigQuery? These documents will have each have a special changelog - # with the operation of `IMPORT` and the timestamp of epoch. This ensures - # that any operation on an imported document supersedes the import record. - # type: select - # required: true - # default: no - # options: - # - label: Yes - # value: yes - # - label: No - # value: no - - # - param: IMPORT_COLLECTION_PATH - # label: Existing Documents Collection - # description: >- - # Specify the path of the Cloud Firestore Collection you would like to - # import from. This may or may not be the same Collection for which you plan - # to mirror changes. If you want to use a collectionGroup query, provide the - # collection name value here, and set 'Use Collection Group query' to true. - # You may use `{wildcard}` notation with an enabled collectionGroup query to - # match a subcollection of all documents in a collection (e.g., - # `chatrooms/{chatid}/posts`). - # type: string - # validationRegex: "^[^/]+(/[^/]+/[^/]+)*$" - # validationErrorMessage: - # Firestore collection paths must be an odd number of segments separated by - # slashes, e.g. "path/to/collection". - # example: posts - # required: false - - - param: USE_COLLECTION_GROUP_QUERY - label: Use Collection Group query - description: >- - Do you want to use a [collection - group](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) - query for importing existing documents? You have to enable collectionGroup - query if your import path contains subcollections. Warning: A - collectionGroup query will target every collection in your Firestore - project that matches the 'Existing documents collection'. For example, if - you have 10,000 documents with a subcollection named: landmarks, this will - query every document in 10,000 landmarks collections. - type: select - default: no - options: - - label: Yes - value: yes - - label: No - value: no - - # - param: DOCS_PER_BACKFILL - # label: Docs per backfill - # description: >- - # When importing existing documents, how many should be imported at once? - # The default value of 200 should be ok for most users. If you are using a - # transform function or have very large documents, you may need to set this - # to a lower number. If the lifecycle event function times out, lower this - # value. - # type: string - # example: 200 - # validationRegex: "^[1-9][0-9]*$" - # validationErrorMessage: Must be a postive integer. - # default: 200 - # required: true - - param: KMS_KEY_NAME label: Cloud KMS key name description: >- @@ -491,6 +400,38 @@ params: 'projects/PROJECT_NAME/locations/KEY_RING_LOCATION/keyRings/KEY_RING_ID/cryptoKeys/KEY_ID'. required: false + - param: MAX_ENQUEUE_ATTEMPTS + label: Maximum number of enqueue attempts + description: >- + This parameter will set the maximum number of attempts to enqueue a + document to cloud tasks for export to BigQuery. If the maximum number of + attempts is reached, the failed export will be handled according to the + `LOG_FAILED_EXPORTS` parameter. + type: string + validationRegex: ^(10|[1-9])$ + validationErrorMessage: Please select an integer between 1 and 10 + default: 3 + + # - param: BACKUP_TO_GCS + # label: Backup to GCS + # description: >- + # If enabled, failed BigQuery updates will be written to a GCS bucket. + # type: select + # options: + # - label: Yes + # value: yes + # - label: No + # value: no + # default: no + # required: true + + # - param: BACKUP_GCS_BUCKET + # label: Backup GCS Bucket Name + # description: >- + # This (optional) parameter will allow you to specify a GCS bucket for which + # failed BigQuery updates will be written to, if this feature is enabled. + # type: string + events: - type: firebase.extensions.firestore-counter.v1.onStart description: diff --git a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap index d5a48cdab..8d6803f5b 100644 --- a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap +++ b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap @@ -2,7 +2,10 @@ exports[`extension config config loaded from environment variables 1`] = ` Object { + "backupBucketName": "undefined.appspot.com", "backupCollectionId": undefined, + "backupDir": "_firestore-bigquery-export", + "backupToGCS": false, "bqProjectId": undefined, "clustering": Array [ "data", @@ -12,23 +15,20 @@ Object { "databaseId": "(default)", "datasetId": "my_dataset", "datasetLocation": undefined, - "doBackfill": false, - "docsPerBackfill": 200, "excludeOldData": false, "importCollectionPath": undefined, "initialized": false, "instanceId": undefined, "kmsKeyName": "test", "location": "us-central1", - "logFailedExportData": false, "maxDispatchesPerSecond": 10, + "maxEnqueueAttempts": 3, "tableId": "my_table", "timePartitioning": null, "timePartitioningField": undefined, "timePartitioningFieldType": undefined, "timePartitioningFirestoreField": undefined, "transformFunction": "", - "useCollectionGroupQuery": false, "useNewSnapshotQuerySyntax": false, "wildcardIds": false, } diff --git a/firestore-bigquery-export/functions/__tests__/e2e.test.ts b/firestore-bigquery-export/functions/__tests__/e2e.test.ts index 4b6ff773e..4773a3e25 100644 --- a/firestore-bigquery-export/functions/__tests__/e2e.test.ts +++ b/firestore-bigquery-export/functions/__tests__/e2e.test.ts @@ -2,9 +2,9 @@ import * as admin from "firebase-admin"; import { BigQuery } from "@google-cloud/bigquery"; /** Set defaults */ -const bqProjectId = "dev-extensions-testing"; -const datasetId = "firestore_export"; -const tableId = "bq_e2e_test_raw_changelog"; +const bqProjectId = process.env.BQ_PROJECT_ID || "dev-extensions-testing"; +const datasetId = process.env.DATASET_ID || "firestore_export"; +const tableId = process.env.TABLE_ID || "bq_e2e_test_raw_changelog"; /** Init resources */ admin.initializeApp({ projectId: bqProjectId }); @@ -34,7 +34,7 @@ describe("e2e", () => { /** Get the latest record from this table */ const [changeLogQuery] = await bq.createQueryJob({ - query: `SELECT * FROM \`${bqProjectId}.${datasetId}.${tableId}\` ORDER BY timestamp DESC \ LIMIT 1`, + query: `SELECT * FROM \`${bqProjectId}.${datasetId}.${tableId}\` ORDER BY timestamp DESC LIMIT 1`, }); const [rows] = await changeLogQuery.getQueryResults(); diff --git a/firestore-bigquery-export/functions/__tests__/functions.test.ts b/firestore-bigquery-export/functions/__tests__/functions.test.ts index 801ef71dc..d9aefb52e 100644 --- a/firestore-bigquery-export/functions/__tests__/functions.test.ts +++ b/firestore-bigquery-export/functions/__tests__/functions.test.ts @@ -37,6 +37,7 @@ jest.mock("firebase-admin/functions", () => ({ })); jest.mock("../src/logs", () => ({ + ...jest.requireActual("../src/logs"), start: jest.fn(() => logger.log("Started execution of extension with configuration", config) ), diff --git a/firestore-bigquery-export/functions/package-lock.json b/firestore-bigquery-export/functions/package-lock.json index 6879b257a..69656d49f 100644 --- a/firestore-bigquery-export/functions/package-lock.json +++ b/firestore-bigquery-export/functions/package-lock.json @@ -7,7 +7,7 @@ "name": "firestore-bigquery-export", "license": "Apache-2.0", "dependencies": { - "@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.37", + "@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.38", "@google-cloud/bigquery": "^7.6.0", "@types/chai": "^4.1.6", "@types/express-serve-static-core": "4.17.30", @@ -572,9 +572,9 @@ } }, "node_modules/@firebaseextensions/firestore-bigquery-change-tracker": { - "version": "1.1.37", - "resolved": "https://registry.npmjs.org/@firebaseextensions/firestore-bigquery-change-tracker/-/firestore-bigquery-change-tracker-1.1.37.tgz", - "integrity": "sha512-+pepcVgtXurbgLjHyDz/fWWNrThAa+UANY+1+kfBRr6V+AzS7wrtSyCRO5bfbO1L/0Tn3DHENJVCWBqeMcwTyw==", + "version": "1.1.38", + "resolved": "https://registry.npmjs.org/@firebaseextensions/firestore-bigquery-change-tracker/-/firestore-bigquery-change-tracker-1.1.38.tgz", + "integrity": "sha512-GPebB/JB3QyTph6/0mo3V9oUsD4C6wM1oTlhlKGOR/Km2fIPv022rvOVorfA4IgWRCRD8uQP1SpkDiKHJ4r5TQ==", "dependencies": { "@google-cloud/bigquery": "^7.6.0", "@google-cloud/resource-manager": "^5.1.0", diff --git a/firestore-bigquery-export/functions/package.json b/firestore-bigquery-export/functions/package.json index fca7e29df..5450224fd 100644 --- a/firestore-bigquery-export/functions/package.json +++ b/firestore-bigquery-export/functions/package.json @@ -13,10 +13,11 @@ "author": "Jan Wyszynski ", "license": "Apache-2.0", "dependencies": { - "@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.37", + "@firebaseextensions/firestore-bigquery-change-tracker": "^1.1.38", "@google-cloud/bigquery": "^7.6.0", "@types/chai": "^4.1.6", "@types/express-serve-static-core": "4.17.30", + "@types/jest": "29.5.0", "@types/node": "^20.4.4", "chai": "^4.2.0", "firebase-admin": "^12.0.0", @@ -24,20 +25,19 @@ "firebase-functions-test": "^0.3.3", "generate-schema": "^2.6.0", "inquirer": "^6.4.0", + "jest": "29.5.0", + "jest-config": "29.5.0", "lodash": "^4.17.14", "nyc": "^14.0.0", "rimraf": "^2.6.3", "sql-formatter": "^2.3.3", + "ts-jest": "29.1.2", "ts-node": "^9.0.0", - "typescript": "^4.8.4", - "@types/jest": "29.5.0", - "jest": "29.5.0", - "jest-config": "29.5.0", - "ts-jest": "29.1.2" + "typescript": "^4.8.4" }, "private": true, "devDependencies": { - "mocked-env": "^1.3.2", - "faker": "^5.1.0" + "faker": "^5.1.0", + "mocked-env": "^1.3.2" } } diff --git a/firestore-bigquery-export/functions/src/config.ts b/firestore-bigquery-export/functions/src/config.ts index 3adb5d05d..3ef09ebfa 100644 --- a/firestore-bigquery-export/functions/src/config.ts +++ b/firestore-bigquery-export/functions/src/config.ts @@ -32,13 +32,10 @@ export function clustering(clusters: string | undefined) { } export default { - logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes", bqProjectId: process.env.BIGQUERY_PROJECT_ID, databaseId: "(default)", collectionPath: process.env.COLLECTION_PATH, datasetId: process.env.DATASET_ID, - doBackfill: process.env.DO_BACKFILL === "yes", - docsPerBackfill: parseInt(process.env.DOCS_PER_BACKFILL) || 200, tableId: process.env.TABLE_ID, location: process.env.LOCATION, initialized: false, @@ -63,5 +60,12 @@ export default { process.env.MAX_DISPATCHES_PER_SECOND || "10" ), kmsKeyName: process.env.KMS_KEY_NAME, - useCollectionGroupQuery: process.env.USE_COLLECTION_GROUP_QUERY === "yes", + maxEnqueueAttempts: isNaN(parseInt(process.env.MAX_ENQUEUE_ATTEMPTS)) + ? 3 + : parseInt(process.env.MAX_ENQUEUE_ATTEMPTS), + // backup bucket defaults to default firebase cloud storage bucket + backupToGCS: process.env.BACKUP_TO_GCS === "yes" ? true : false, + backupBucketName: + process.env.BACKUP_GCS_BUCKET || `${process.env.PROJECT_ID}.appspot.com`, + backupDir: `_${process.env.INSTANCE_ID || "firestore-bigquery-export"}`, }; diff --git a/firestore-bigquery-export/functions/src/index.ts b/firestore-bigquery-export/functions/src/index.ts index 76f2399a8..13e1b577a 100644 --- a/firestore-bigquery-export/functions/src/index.ts +++ b/firestore-bigquery-export/functions/src/index.ts @@ -19,19 +19,17 @@ import * as functions from "firebase-functions"; import * as admin from "firebase-admin"; import { getExtensions } from "firebase-admin/extensions"; import { getFunctions } from "firebase-admin/functions"; -import { getFirestore } from "firebase-admin/firestore"; - import { ChangeType, FirestoreBigQueryEventHistoryTracker, - FirestoreEventHistoryTracker, + FirestoreDocumentChangeEvent, } from "@firebaseextensions/firestore-bigquery-change-tracker"; -import { getEventarc } from "firebase-admin/eventarc"; import * as logs from "./logs"; import * as events from "./events"; -import { getChangeType, getDocumentId, resolveWildcardIds } from "./util"; +import { getChangeType, getDocumentId } from "./util"; +// Configuration for the Firestore Event History Tracker. const eventTrackerConfig = { tableId: config.tableId, datasetId: config.datasetId, @@ -42,89 +40,147 @@ const eventTrackerConfig = { timePartitioningField: config.timePartitioningField, timePartitioningFieldType: config.timePartitioningFieldType, timePartitioningFirestoreField: config.timePartitioningFirestoreField, + // Database related configurations databaseId: config.databaseId, clustering: config.clustering, wildcardIds: config.wildcardIds, bqProjectId: config.bqProjectId, + // Optional configurations useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax, skipInit: true, kmsKeyName: config.kmsKeyName, }; +// Initialize the Firestore Event History Tracker with the given configuration. const eventTracker: FirestoreBigQueryEventHistoryTracker = new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig); +// Initialize logging. logs.init(); -/** Init app, if not already initialized */ +/** Initialize Firebase Admin SDK if not already initialized */ if (admin.apps.length === 0) { admin.initializeApp(); } +// Setup the event channel for EventArc. events.setupEventChannel(); +/** + * Cloud Function to handle enqueued tasks to synchronize Firestore changes to BigQuery. + */ export const syncBigQuery = functions.tasks .taskQueue() .onDispatch( async ({ context, changeType, documentId, data, oldData }, ctx) => { - const update = { - timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision. - operation: changeType, - documentName: context.resource.name, - documentId: documentId, - pathParams: config.wildcardIds ? context.params : null, - eventId: context.eventId, - data, - oldData, - }; - - /** Record the chnages in the change tracker */ - await eventTracker.record([{ ...update }]); - - /** Send an event Arc update , if configured */ - await events.recordSuccessEvent({ - subject: documentId, - data: { - ...update, - }, - }); + const documentName = context.resource.name; + const eventId = context.eventId; + const operation = changeType; + + logs.logEventAction( + "Firestore event received by onDispatch trigger", + documentName, + eventId, + operation + ); + + try { + // Use the shared function to write the event to BigQuery + await recordEventToBigQuery( + changeType, + documentId, + data, + oldData, + context + ); + + // Record a success event in EventArc, if configured + await events.recordSuccessEvent({ + subject: documentId, + data: { + timestamp: context.timestamp, + operation: changeType, + documentName: context.resource.name, + documentId, + pathParams: config.wildcardIds ? context.params : null, + eventId: context.eventId, + data, + oldData, + }, + }); + + // Log completion of the task. + logs.complete(); + } catch (err) { + // Log error and throw it to handle in the calling function. + logs.logFailedEventAction( + "Failed to write event to BigQuery from onDispatch handler", + documentName, + eventId, + operation, + err as Error + ); - logs.complete(); + throw err; + } } ); -export const fsexportbigquery = functions - .runWith({ failurePolicy: true }) - .firestore.database(config.databaseId) +/** + * Cloud Function triggered on Firestore document changes to export data to BigQuery. + */ +export const fsexportbigquery = functions.firestore + .database(config.databaseId) .document(config.collectionPath) .onWrite(async (change, context) => { + // Start logging the function execution. logs.start(); + + // Determine the type of change (CREATE, UPDATE, DELETE). const changeType = getChangeType(change); const documentId = getDocumentId(change); + // Check if the document is newly created or deleted. const isCreated = changeType === ChangeType.CREATE; const isDeleted = changeType === ChangeType.DELETE; + // Get the new data (after change) and old data (before change). const data = isDeleted ? undefined : change.after?.data(); const oldData = isCreated || config.excludeOldData ? undefined : change.before?.data(); - /** - * Serialize early before queueing in cloud task - * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum - */ + const documentName = context.resource.name; + const eventId = context.eventId; + const operation = changeType; + + logs.logEventAction( + "Firestore event received by onWrite trigger", + documentName, + eventId, + operation + ); + let serializedData: any; let serializedOldData: any; try { + // Serialize the data before processing. serializedData = eventTracker.serializeData(data); serializedOldData = eventTracker.serializeData(oldData); } catch (err) { - logs.error(false, "Failed to serialize data", err, null, null); + // Log serialization error and throw it. + logs.logFailedEventAction( + "Failed to serialize data", + documentName, + eventId, + operation, + err as Error + ); throw err; } try { + // Record the start event for the change in EventArc, if configured. await events.recordStartEvent({ documentId, changeType, @@ -133,163 +189,183 @@ export const fsexportbigquery = functions context: context.resource, }); } catch (err) { - logs.error(false, "Failed to record start event", err, null, null); + // Log the error if recording start event fails and throw it. + logs.error(false, "Failed to record start event", err); throw err; } try { - const queue = getFunctions().taskQueue( - `locations/${config.location}/functions/syncBigQuery`, - config.instanceId + // Write the change event to BigQuery. + await recordEventToBigQuery( + changeType, + documentId, + serializedData, + serializedOldData, + context ); - - await queue.enqueue({ + } catch (err) { + functions.logger.warn( + "Failed to write event to BigQuery Immediately. Will attempt to Enqueue to Cloud Tasks.", + err + ); + // Handle enqueue errors with retries and backup to GCS. + await attemptToEnqueue( + err, context, changeType, documentId, - data: serializedData, - oldData: serializedOldData, - }); - } catch (err) { - const event = { - timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision. - operation: changeType, - documentName: context.resource.name, - documentId: documentId, - pathParams: config.wildcardIds ? context.params : null, - eventId: context.eventId, - data: serializedData, - oldData: serializedOldData, - }; - - await events.recordErrorEvent(err as Error); - // Only log the error once here - if (!err.logged) { - logs.error( - config.logFailedExportData, - "Failed to enqueue task to syncBigQuery", - err, - event, - eventTrackerConfig - ); - } - return; + serializedData, + serializedOldData + ); } + // Log the successful completion of the function. logs.complete(); }); +/** + * Record the event to the Firestore Event History Tracker and BigQuery. + * + * @param changeType - The type of change (CREATE, UPDATE, DELETE). + * @param documentId - The ID of the Firestore document. + * @param serializedData - The serialized new data of the document. + * @param serializedOldData - The serialized old data of the document. + * @param context - The event context from Firestore. + */ +async function recordEventToBigQuery( + changeType: ChangeType, + documentId: string, + serializedData: any, + serializedOldData: any, + context: functions.EventContext +) { + const event: FirestoreDocumentChangeEvent = { + timestamp: context.timestamp, // Cloud Firestore commit timestamp + operation: changeType, // The type of operation performed + documentName: context.resource.name, // The document name + documentId, // The document ID + pathParams: (config.wildcardIds ? context.params : null) as + | FirestoreDocumentChangeEvent["pathParams"] + | null, // Path parameters, if any + eventId: context.eventId, // The event ID from Firestore + data: serializedData, // Serialized new data + oldData: serializedOldData, // Serialized old data + }; + + // Record the event in the Firestore Event History Tracker and BigQuery. + await eventTracker.record([event]); +} + +/** + * Handle errors when enqueueing tasks to sync BigQuery. + * + * @param err - The error object. + * @param context - The event context from Firestore. + * @param changeType - The type of change (CREATE, UPDATE, DELETE). + * @param documentId - The ID of the Firestore document. + * @param serializedData - The serialized new data of the document. + * @param serializedOldData - The serialized old data of the document. + */ +async function attemptToEnqueue( + err: Error, + context: functions.EventContext, + changeType: ChangeType, + documentId: string, + serializedData: any, + serializedOldData: any +) { + try { + const queue = getFunctions().taskQueue( + `locations/${config.location}/functions/syncBigQuery`, + config.instanceId + ); + + let attempts = 0; + const jitter = Math.random() * 100; // Adding jitter to avoid collision + + // Exponential backoff formula with a maximum of 5 + jitter seconds + const backoff = (attempt: number) => + Math.min(Math.pow(2, attempt) * 100, 5000) + jitter; + + while (attempts < config.maxEnqueueAttempts) { + if (attempts > 0) { + // Wait before retrying to enqueue the task. + await new Promise((resolve) => setTimeout(resolve, backoff(attempts))); + } + + attempts++; + try { + // Attempt to enqueue the task to the queue. + await queue.enqueue({ + context, + changeType, + documentId, + data: serializedData, + oldData: serializedOldData, + }); + break; // Break the loop if enqueuing is successful. + } catch (enqueueErr) { + // Throw the error if max attempts are reached. + if (attempts === config.maxEnqueueAttempts) { + throw enqueueErr; + } + } + } + } catch (enqueueErr) { + // Prepare the event object for error logging. + + // Record the error event. + await events.recordErrorEvent(enqueueErr as Error); + + const documentName = context.resource.name; + const eventId = context.eventId; + const operation = changeType; + + logs.logFailedEventAction( + "Failed to enqueue event to Cloud Tasks from onWrite handler", + documentName, + eventId, + operation, + enqueueErr as Error + ); + } +} + +/** + * Cloud Function to set up BigQuery sync by initializing the event tracker. + */ export const setupBigQuerySync = functions.tasks .taskQueue() .onDispatch(async () => { /** Setup runtime environment */ const runtime = getExtensions().runtime(); - /** Init the BigQuery sync */ + // Initialize the BigQuery sync. await eventTracker.initialize(); + // Update the processing state. await runtime.setProcessingState( "PROCESSING_COMPLETE", "Sync setup completed" ); }); +/** + * Cloud Function to initialize BigQuery sync. + */ export const initBigQuerySync = functions.tasks .taskQueue() .onDispatch(async () => { /** Setup runtime environment */ const runtime = getExtensions().runtime(); - /** Init the BigQuery sync */ + // Initialize the BigQuery sync. await eventTracker.initialize(); - /** Run Backfill */ - if (false) { - await getFunctions() - .taskQueue( - `locations/${config.location}/functions/fsimportexistingdocs`, - config.instanceId - ) - .enqueue({ offset: 0, docsCount: 0 }); - return; - } - + // Update the processing state. await runtime.setProcessingState( "PROCESSING_COMPLETE", "Sync setup completed" ); return; }); - -exports.fsimportexistingdocs = functions.tasks - .taskQueue() - .onDispatch(async (data, context) => { - const runtime = getExtensions().runtime(); - await runtime.setProcessingState( - "PROCESSING_COMPLETE", - "Completed. No existing documents imported into BigQuery." - ); - return; - - // if (!config.doBackfill || !config.importCollectionPath) { - // await runtime.setProcessingState( - // "PROCESSING_COMPLETE", - // "Completed. No existing documents imported into BigQuery." - // ); - // return; - // } - - // const offset = (data["offset"] as number) ?? 0; - // const docsCount = (data["docsCount"] as number) ?? 0; - - // const query = config.useCollectionGroupQuery - // ? getFirestore(config.databaseId).collectionGroup( - // config.importCollectionPath.split("/")[ - // config.importCollectionPath.split("/").length - 1 - // ] - // ) - // : getFirestore(config.databaseId).collection(config.importCollectionPath); - - // const snapshot = await query - // .offset(offset) - // .limit(config.docsPerBackfill) - // .get(); - - // const rows = snapshot.docs.map((d) => { - // return { - // timestamp: new Date().toISOString(), - // operation: ChangeType.IMPORT, - // documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`, - // documentId: d.id, - // eventId: "", - // pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path), - // data: eventTracker.serializeData(d.data()), - // }; - // }); - // try { - // await eventTracker.record(rows); - // } catch (err: any) { - // /** If configured, event tracker wil handle failed rows in a backup collection */ - // functions.logger.log(err); - // } - // if (rows.length == config.docsPerBackfill) { - // // There are more documents to import - enqueue another task to continue the backfill. - // const queue = getFunctions().taskQueue( - // `locations/${config.location}/functions/fsimportexistingdocs`, - // config.instanceId - // ); - // await queue.enqueue({ - // offset: offset + config.docsPerBackfill, - // docsCount: docsCount + rows.length, - // }); - // } else { - // // We are finished, set the processing state to report back how many docs were imported. - // runtime.setProcessingState( - // "PROCESSING_COMPLETE", - // `Successfully imported ${ - // docsCount + rows.length - // } documents into BigQuery` - // ); - // } - // await events.recordCompletionEvent({ context }); - }); diff --git a/firestore-bigquery-export/functions/src/logs.ts b/firestore-bigquery-export/functions/src/logs.ts index c312cecdf..5eb112f3e 100644 --- a/firestore-bigquery-export/functions/src/logs.ts +++ b/firestore-bigquery-export/functions/src/logs.ts @@ -15,6 +15,7 @@ */ import { logger } from "firebase-functions"; import config from "./config"; +import { ChangeType } from "@firebaseextensions/firestore-bigquery-change-tracker"; export const arrayFieldInvalid = (fieldName: string) => { logger.warn(`Array field '${fieldName}' does not contain an array, skipping`); @@ -153,20 +154,20 @@ export const error = ( includeEvent: boolean, message: string, err: Error, - event: any, - eventTrackerConfig: any + event?: any, // Made optional, as it is not always required + eventTrackerConfig?: any // Made optional, as it is not always required ) => { - if (includeEvent) { - logger.error(`Error when mirroring data to BigQuery: ${message}`, { - error: err, - event, - eventTrackerConfig, - }); - } else { - logger.error(`Error when mirroring data to BigQuery: ${message}`, { - error: err, - }); + const logDetails: Record = { error: err }; + + if (includeEvent && event) { + logDetails.event = event; + } + + if (includeEvent && eventTrackerConfig) { + logDetails.eventTrackerConfig = eventTrackerConfig; } + + logger.error(`Error when mirroring data to BigQuery: ${message}`, logDetails); }; export const init = () => { @@ -182,3 +183,31 @@ export const timestampMissingValue = (fieldName: string) => { `Missing value for timestamp field: ${fieldName}, using default timestamp instead.` ); }; + +export const logEventAction = ( + action: string, + document_name: string, + event_id: string, + operation: ChangeType +) => { + logger.info(action, { + document_name, + event_id, + operation, + }); +}; + +export const logFailedEventAction = ( + action: string, + document_name: string, + event_id: string, + operation: ChangeType, + error: Error +) => { + logger.error(action, { + document_name, + event_id, + operation, + error, + }); +}; diff --git a/firestore-bigquery-export/functions/src/util.ts b/firestore-bigquery-export/functions/src/util.ts index c93ab45f1..c128ee56f 100644 --- a/firestore-bigquery-export/functions/src/util.ts +++ b/firestore-bigquery-export/functions/src/util.ts @@ -19,6 +19,11 @@ import { Change } from "firebase-functions"; import { ChangeType } from "@firebaseextensions/firestore-bigquery-change-tracker"; +/** + * Get the change type (CREATE, UPDATE, DELETE) from the Firestore change. + * @param change Firestore document change object. + * @returns {ChangeType} The type of change. + */ export function getChangeType(change: Change): ChangeType { if (!change.after.exists) { return ChangeType.DELETE; @@ -29,6 +34,11 @@ export function getChangeType(change: Change): ChangeType { return ChangeType.UPDATE; } +/** + * Get the document ID from the Firestore change. + * @param change Firestore document change object. + * @returns {string} The document ID. + */ export function getDocumentId(change: Change): string { if (change.after.exists) { return change.after.id; diff --git a/firestore-bigquery-export/functions/stress_test/count.js b/firestore-bigquery-export/functions/stress_test/count.js new file mode 100644 index 000000000..6f9ea64d9 --- /dev/null +++ b/firestore-bigquery-export/functions/stress_test/count.js @@ -0,0 +1,33 @@ +const admin = require("firebase-admin"); + +// Initialize Firebase Admin with your credentials +// Make sure you've already set up your Firebase Admin SDK +admin.initializeApp({ + projectId: "vertex-testing-1efc3", +}); + +const firestore = admin.firestore(); + +async function countDocuments(collectionPath) { + try { + const collectionRef = firestore.collection(collectionPath); + + // Perform an aggregate query to count the documents + const snapshot = await collectionRef.count().get(); + + // Access the count from the snapshot + const docCount = snapshot.data().count; + + console.log( + `Number of documents in collection '${collectionPath}':`, + docCount + ); + return docCount; + } catch (error) { + console.error("Error counting documents:", error); + throw error; + } +} + +// Call the function and pass the collection path +countDocuments("posts_2"); diff --git a/firestore-bigquery-export/functions/stress_test/main.js b/firestore-bigquery-export/functions/stress_test/main.js new file mode 100644 index 000000000..cdecdff8e --- /dev/null +++ b/firestore-bigquery-export/functions/stress_test/main.js @@ -0,0 +1,186 @@ +const { Worker } = require("worker_threads"); +const { performance } = require("perf_hooks"); +const path = require("path"); +const admin = require("firebase-admin"); + +// Initialize Firebase Admin SDK +admin.initializeApp({ + projectId: "vertex-testing-1efc3", +}); + +// Get a reference to the Firestore service +const db = admin.firestore(); + +const totalDocs = 1000000; // Total number of documents to write +const maxThreads = 20; // Maximum number of worker threads +const batchSize = 500; // Documents per batch +const targetRate = 500; // Target docs per second +const rampUpDelay = 1000; // Delay between ramp-ups +const rampUps = 5; // Number of ramp-ups +const docsPerRampUp = Math.ceil(totalDocs / rampUps); // Documents per ramp-up + +// Calculate the delay needed to meet the target rate (in milliseconds) +const delayBetweenBatches = Math.max(1000 / (targetRate / batchSize), 0); // Delay between batches in ms + +// Hardcoded collection paths with the form: A/{aid}/B/{bid}/C/{cid}/D/{did}/E/{eid}/F/{fid}/G +const collectionPaths = [ + "A/aid1/B/bid1/C/cid1/D/did1/E/eid1/F/fid1/G", + "A/aid2/B/bid2/C/cid2/D/did2/E/eid2/F/fid2/G", + "A/aid3/B/bid3/C/cid3/D/did3/E/eid3/F/fid3/G", + "A/aid4/B/bid4/C/cid4/D/did4/E/eid4/F/fid4/G", + "A/aid5/B/bid5/C/cid5/D/did5/E/eid5/F/fid5/G", +]; + +// Start measuring total execution time +const totalStartTime = performance.now(); + +const workerJsPath = path.resolve(__dirname, "worker.js"); + +// Function to spawn worker threads for a specific ramp-up +const spawnWorkers = async ( + activeThreads, + startDoc, + docsPerRampUp, + collectionPath +) => { + console.log( + `Spawning ${activeThreads} worker(s) for collection ${collectionPath}...` + ); + let promises = []; + const docsPerThread = Math.ceil(docsPerRampUp / activeThreads); + + for (let i = 0; i < activeThreads; i++) { + const docsForThisThread = Math.min(docsPerThread, docsPerRampUp); + const start = startDoc + i * docsForThisThread; + const end = Math.min(start + docsForThisThread, startDoc + docsPerRampUp); + + promises.push( + new Promise((resolve, reject) => { + const worker = new Worker(workerJsPath, { + workerData: { + start, + end, + batchSize, + collectionPath, // Pass the collection path to the worker + delayBetweenBatches, // Pass the delay to the worker + }, + }); + + worker.on("message", (message) => { + console.log(`Worker ${i + 1}: ${message}`); + }); + + worker.on("error", (err) => { + console.error(`Worker ${i + 1} error: ${err}`); + reject(err); + }); + + worker.on("exit", (code) => { + if (code !== 0) { + reject(new Error(`Worker ${i + 1} stopped with exit code ${code}`)); + } else { + resolve(); + } + }); + }) + ); + } + + try { + await Promise.all(promises); + } catch (error) { + console.error("Error in worker threads: ", error); + throw error; + } +}; + +// Function to query Firestore for the total document count using count() aggregation +const getCollectionCounts = async () => { + let counts = {}; + + for (const collectionPath of collectionPaths) { + const collectionRef = db.collection(collectionPath); + const snapshot = await collectionRef.count().get(); // Use the count aggregation query + const count = snapshot.data().count; + counts[collectionPath] = count; + console.log(`Collection ${collectionPath} has ${count} documents.`); + } + + return counts; +}; + +// Function to calculate the difference between two count objects +const calculateCountDifference = (beforeCounts, afterCounts) => { + let totalDifference = 0; + + for (const collectionPath in beforeCounts) { + const beforeCount = beforeCounts[collectionPath] || 0; + const afterCount = afterCounts[collectionPath] || 0; + const difference = afterCount - beforeCount; + console.log(`Collection ${collectionPath} difference: ${difference}`); + totalDifference += difference; + } + + return totalDifference; +}; + +// Function to execute ramp-ups +const executeRampUps = async () => { + let activeThreads = 1; + let startDoc = 0; + + for (let i = 0; i < rampUps; i++) { + const collectionPath = collectionPaths[i % collectionPaths.length]; // Rotate through collections + await spawnWorkers(activeThreads, startDoc, docsPerRampUp, collectionPath); + startDoc += docsPerRampUp; + + if (activeThreads < maxThreads) { + activeThreads++; // Increase the number of threads for next ramp-up + } + + if (i < rampUps - 1) { + console.log( + `Ramping up to ${activeThreads} worker(s) in ${ + rampUpDelay / 1000 + } seconds...` + ); + await new Promise((resolve) => setTimeout(resolve, rampUpDelay)); + } + } +}; + +// Main execution flow +const main = async () => { + try { + // Count documents before writing + console.log("Counting documents before the operation..."); + const beforeCounts = await getCollectionCounts(); + + // Perform the writing operation + await executeRampUps(); + + // Count documents after writing + console.log("Counting documents after the operation..."); + const afterCounts = await getCollectionCounts(); + + // Calculate and log the difference + const totalDocsWritten = calculateCountDifference( + beforeCounts, + afterCounts + ); + console.log(`Total documents written: ${totalDocsWritten}`); + + const totalEndTime = performance.now(); + const totalDuration = (totalEndTime - totalStartTime) / 1000; // Convert to seconds + console.log( + `Successfully written ${totalDocsWritten} documents in ${totalDuration.toFixed( + 2 + )} seconds.` + ); + } catch (error) { + console.error("Error during execution: ", error); + } +}; + +// Run the main function +main(); diff --git a/firestore-bigquery-export/functions/stress_test/worker.js b/firestore-bigquery-export/functions/stress_test/worker.js new file mode 100644 index 000000000..3f53e985b --- /dev/null +++ b/firestore-bigquery-export/functions/stress_test/worker.js @@ -0,0 +1,87 @@ +const { parentPort, workerData } = require("worker_threads"); +const admin = require("firebase-admin"); +const { v4: uuidv4 } = require("uuid"); +const { performance } = require("perf_hooks"); + +// Initialize Firebase Admin SDK +admin.initializeApp({ + projectId: "vertex-testing-1efc3", +}); + +// Get a reference to the Firestore service +const db = admin.firestore(); + +// Generate a large random document closer to 1MB +const generateRandomDocument = () => { + // const largeString = "x".repeat(300000); // A string of 300,000 characters (~300 KB) + // const largeArray = new Array(5000).fill().map((_, i) => ({ + // index: i, + // value: `Value_${Math.random().toString(36).substring(7)}`, + // })); + + return { + id: uuidv4(), + name: `Name_${Math.random().toString(36).substring(7)}`, + age: Math.floor(Math.random() * 60) + 18, // Random age between 18 and 78 + email: `user_${Math.random().toString(36).substring(7)}@example.com`, + isActive: Math.random() > 0.5, // Random boolean value + createdAt: admin.firestore.Timestamp.now(), + // largeString, // Large string field + // largeArray, // Large array field + }; +}; + +// Delay function for rate control +const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +// Write a batch of documents to a specific collection in Firestore +const writeBatch = async ( + start, + end, + batchSize, + collectionPath, + delayBetweenBatches +) => { + let count = start; + while (count < end) { + const batchStartTime = performance.now(); + + let batch = db.batch(); + const remainingDocs = end - count; + const adjustedBatchSize = Math.min(batchSize, remainingDocs); // Adjust batch size if remaining docs < batchSize + + for (let i = 0; i < adjustedBatchSize && count < end; i++) { + let docRef = db.collection(collectionPath).doc(); + batch.set(docRef, generateRandomDocument()); + count++; + } + + await batch.commit(); + + const batchEndTime = performance.now(); + const batchDuration = (batchEndTime - batchStartTime) / 1000; // Convert to seconds + parentPort.postMessage( + `Batch of ${adjustedBatchSize} documents written in ${batchDuration.toFixed( + 2 + )} seconds to ${collectionPath}.` + ); + + // Introduce delay between batches to meet target rate + await delay(delayBetweenBatches); + } +}; + +// Start writing in batches +writeBatch( + workerData.start, + workerData.end, + workerData.batchSize, + workerData.collectionPath, + workerData.delayBetweenBatches // Pass the delay for rate control +) + .then(() => { + parentPort.postMessage("Completed writing documents."); + }) + .catch((error) => { + parentPort.postMessage(`Error writing documents: ${error}`); + });