feat: parallel funcs fix, e2e + stub unit tests
barjin committed Nov 14, 2022
1 parent 121c794 commit a486669
Showing 3 changed files with 149 additions and 113 deletions.
188 changes: 104 additions & 84 deletions packages/apify-extra/src/dataset.ts
@@ -6,27 +6,6 @@ import { APIFY_EXTRA_KV_RECORD_PREFIX, APIFY_EXTRA_LOG_PREFIX } from './const';

export type DatasetItem = Exclude<Parameters<OriginalDataset['pushData']>[0], any[]>;

export interface ParallelPersistedPushDataOptions {
/**
* Number of items to be pushed in one push call.
* Should not be higher than 1000 to ensure each push call finishes when migration happens.
*
* @default 1000
*/
uploadBatchSize?: number;
/**
* Number of push calls to be done in parallel. The Apify API should handle up to 30 parallel requests, but it is better to be careful.
*
* @default 10
*/
parallelPushes?: number;
/**
* You must provide an idempotency key if you want to call this function multiple times in the same run.
* The key should only contain letters and numbers.
*/
idempotencyKey?: string;
}

export async function waitForCompletion<T>(promises: (() => Promise<T>)[], maxConcurrency: number): Promise<void> {
async function worker() {
let job;
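The body of `waitForCompletion` is collapsed in this diff view. Judging from the visible fragment, it implements a simple worker pool; a minimal sketch consistent with that fragment (an assumption, not the actual committed body) might look like this:

```typescript
// Hypothetical reconstruction: each worker repeatedly shifts the next job
// factory off the shared array and awaits it. Array.prototype.shift() is
// synchronous, so no two workers ever pick up the same job.
export async function waitForCompletion<T>(promises: (() => Promise<T>)[], maxConcurrency: number): Promise<void> {
    async function worker() {
        let job;
        while ((job = promises.shift()) !== undefined) {
            await job();
        }
    }

    // Spawn at most `maxConcurrency` workers and wait until the queue is drained.
    await Promise.all([...Array(Math.min(maxConcurrency, promises.length)).keys()].map(() => worker()));
}
```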
@@ -61,103 +40,144 @@ export class ChunkTracker {

export class Dataset extends OriginalDataset {
/**
* Returns batches of items to be pushed to dataset.
* @param data Items to be pushed to dataset.
* @param batchSize Number of items to be pushed in one push call.
* @param batchCount Number of batches to be returned.
* @returns Array of batches of items to be pushed to dataset.
*/
private getBatches(data: DatasetItem | DatasetItem[], batchSize: number, batchCount: number) {
if (!Array.isArray(data)) {
return [[data]];
}
return [...Array(batchCount).keys()]
.map((x: number) => data.slice(x * batchSize, (x + 1) * batchSize))
.filter((x) => x.length !== 0);
}

/**
* Push data to the dataset in parallel while also resuming correctly after Actor migration.
* This should be used when you want to push a lot of items to the dataset at once.
* If you call this function multiple times in the same run,
* you must provide a different idempotencyKey option for each call.
* Stores an object or an array of objects to the dataset.
* The function returns a promise that resolves when the operation finishes.
* It has no result, but throws on invalid args or other errors.
*
* **IMPORTANT**: Make sure to use the `await` keyword when calling `pushDataParallel()`,
* otherwise the crawler process might finish before the data is stored!
*
* The size of the data is limited by the receiving API and therefore `pushDataParallel()` will only
* allow objects whose JSON representation is smaller than 9MB. When an array is passed,
* none of the included objects
* may be larger than 9MB, but the array itself may be of any size.
*
* This method parallelizes the pushData calls to the Apify API, which can handle up to 30 parallel requests.
* It also keeps track of the progress and can resume the push if the Actor is migrated.
* Unlike the `pushData` method, this method does not guarantee the order of items.
*
* @param data Object or array of objects containing data to be stored in the dataset.
* The objects must be serializable to JSON and the JSON representation of each object must be smaller than 9MB.
* @param [options] All `pushDataParallel()` parameters.
* @param [options.batchSize] Number of items to be pushed in one push call.
* Should not be higher than 1000 to ensure each push call finishes when migration happens.
* @param [options.parallelPushes] Number of push calls to be done in parallel. The Apify API should handle up to 30 parallel requests.
* @param [options.idempotencyKey] By providing different idempotency keys (any string), you can call this function multiple times in the same run.
*/
public async pushDataParallel(data: DatasetItem | DatasetItem[], options: ParallelPersistedPushDataOptions = {}) {
public async pushDataParallel(data: DatasetItem | DatasetItem[], options: {
batchSize?: number;
parallelPushes?: number;
idempotencyKey?: string;
} = {}) {
if (!Array.isArray(data)) {
return this.pushData(data);
}

const {
uploadBatchSize = 1000,
batchSize = 1000,
parallelPushes = 10,
idempotencyKey = '',
} = options;

let isMigrating = false;
Actor.on('migrating', () => { isMigrating = true; });
Actor.on('aborting', () => { isMigrating = true; });
if (parallelPushes > 30) {
log.warning(`${APIFY_EXTRA_LOG_PREFIX} Setting the parallelPushes option larger than 30 can lead to problems with the Apify Platform API.`);
}

const sanitizedIdempotencyKey = idempotencyKey.replace(/[^a-zA-Z0-9]/g, '-').slice(0, 30);
const chunkTrackerName = `${APIFY_EXTRA_KV_RECORD_PREFIX}-PUSH-${this.id}-${sanitizedIdempotencyKey}`;
const chunkTracker = new ChunkTracker(await Actor.getValue<Record<string, boolean>>(chunkTrackerName));

const kvRecordName = `${APIFY_EXTRA_KV_RECORD_PREFIX}STATE-PUSHED-COUNT${this.name}${sanitizedIdempotencyKey}`;
let pushedItemsCount = await Actor.getValue<number>(kvRecordName) ?? 0;

Actor.on('persistState', async () => {
await Actor.setValue(kvRecordName, pushedItemsCount);
});

const stepSize = uploadBatchSize * parallelPushes;

for (let i = pushedItemsCount; i < data.length; i += stepSize) {
if (isMigrating) {
log.info(`${APIFY_EXTRA_LOG_PREFIX}[pushDataParallel]: Stopping push because of migration`);
// hang indefinitely until migration is done
await new Promise(() => {});
}
let isMigrating = false;
const migrationCallback = async (migrating: boolean) => {
isMigrating = migrating ?? true;
await Actor.setValue(chunkTrackerName, chunkTracker.get());
};

const itemsToPush = data.slice(i, i + stepSize);
const batches = this.getBatches(itemsToPush, uploadBatchSize, parallelPushes);
const pushPromises = batches.map(this.pushData);
Actor.on('migrating', migrationCallback);
Actor.on('aborting', migrationCallback);
Actor.on('persistState', () => migrationCallback(false));

// We must update it before awaiting the promises because the push can take time
// and migration can cut us off, but the items will already be on their way to the dataset
pushedItemsCount += itemsToPush.length;
await Promise.all(pushPromises);
}
return waitForCompletion(
[...new Array(Math.ceil(data.length / batchSize))]
.map((_, i) => i) // materialize chunk indices before filtering, so skipping tracked chunks does not shift the offsets
.filter((i) => !chunkTracker.has(`${batchSize * i}`))
.map((i) => async () => {
if (isMigrating) {
log.info(`${APIFY_EXTRA_LOG_PREFIX}[pushDataParallel]: Stopping pushDataParallel because of migration`);
await new Promise(() => {});
}

chunkTracker.add(`${batchSize * i}`);
const currentSlice = data.slice(batchSize * i, batchSize * (i + 1));
if (currentSlice.length > 0) {
await this.pushData(currentSlice);
}
}),
parallelPushes);
};
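For illustration, a call of the reworked `pushDataParallel()` might look like the sketch below; the dataset name and item shape are made up, and the construction pattern mirrors the e2e test added in this commit:

```typescript
import { Actor, Configuration } from 'apify';
import { Dataset } from 'apify-extra';

// Hypothetical payload; any JSON-serializable items under 9MB each will do.
const items = [...new Array(5000)].map((_, i) => ({ index: i }));

await Actor.openDataset('my-results'); // make sure the dataset exists first
const dataset = new Dataset({
    id: 'my-results',
    client: Configuration.getStorageClient(),
});

// 1000-item batches, 10 pushes in flight at once (the documented defaults).
// A second call in the same run would need a different idempotencyKey.
await dataset.pushDataParallel(items, {
    batchSize: 1000,
    parallelPushes: 10,
    idempotencyKey: 'firstPush',
});
```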

/**
* Iterates over dataset items, passing every item to the provided `func` function.
* `func` is invoked with two arguments: `(item, index)`, where `index` is the zero-based index of the item in the dataset.
*
* If the `func` function returns a Promise, it is awaited.
* If it throws an error, the iteration is aborted and the `forEachParallel` function throws the error.
*
* **Example usage**
* ```typescript
* const dataset = await Dataset.open('my-results');
* await dataset.forEachParallel(async (item, index) => {
* console.log(`Item at ${index}: ${JSON.stringify(item)}`);
* });
* ```
*
* *Important note*: Unlike the `forEach` method, this method processes items in parallel and does not guarantee the order of items.
* It also does not wait for one invocation of `func` to finish before calling it for the next item.
*
* @param func A function that is called for every item in the dataset.
* @param [options] All `forEachParallel()` parameters.
* @param [options.parallelLoads] Maximum number of item batches to be processed in parallel.
* @param [options.batchSize] Maximum number of items to be processed in one batch.
* @param [options.persistState] If `true`, the processing state will be persisted between actor migrations and runs.
* @returns {Promise<void>}
*/
public async forEachParallel(
func: Parameters<OriginalDataset['forEach']>[0],
options: Parameters<OriginalDataset['forEach']>[1] & { parallelLoads?: number; batchSize?: number; persistState: boolean },
options: Parameters<OriginalDataset['forEach']>[1] & { parallelLoads?: number; batchSize?: number; persistState: boolean; idempotencyKey?: string },
) {
const { parallelLoads = 20, batchSize = 50000 } = options;
const { offset: globalOffset = 0, limit: globalLimit = 0 } = options;
const { parallelLoads = 20, batchSize = 50000, persistState, idempotencyKey = '' } = options;
const { offset: globalOffset = 0, limit: globalLimit = Infinity } = options;

const chunkTrackerName = `${APIFY_EXTRA_KV_RECORD_PREFIX}CHUNKS${this.name}`;
const chunkTracker = new ChunkTracker(await Actor.getValue<Record<string, boolean>>(chunkTrackerName));
const sanitizedIdempotencyKey = idempotencyKey.replace(/[^a-zA-Z0-9]/g, '-').slice(0, 30);
const chunkTrackerName = `${APIFY_EXTRA_KV_RECORD_PREFIX}FOREACH-${this.id}-${sanitizedIdempotencyKey}`;
const chunkTracker = new ChunkTracker(persistState ? await Actor.getValue<Record<string, boolean>>(chunkTrackerName) : undefined);

let isMigrating = false;
const migrationCallback = async () => {
isMigrating = true;
await Actor.setValue(chunkTrackerName, chunkTracker.get());

const migrationCallback = async (migrating: boolean) => {
isMigrating = migrating ?? true;
await Actor.setValue(chunkTrackerName, persistState ? chunkTracker.get() : null);
};

Actor.on('migrating', migrationCallback);
Actor.on('aborting', migrationCallback);

Actor.on('persistState', async () => {
await Actor.setValue(chunkTrackerName, chunkTracker.get());
});
Actor.on('persistState', () => migrationCallback(false));

const { itemCount } = await this.getInfo() ?? { itemCount: 0 };

return waitForCompletion(
[...new Array(Math.ceil((itemCount < globalLimit ? itemCount : globalLimit) / batchSize))]
[...new Array(Math.ceil((itemCount < globalLimit ? itemCount : globalLimit) / batchSize))] // every item represents one chunk
.map((_, i) => i) // materialize chunk indices before filtering, so skipping tracked chunks does not shift the offsets
.filter((i) => !chunkTracker.has(`${globalOffset + i * batchSize}`))
.map((i) => async () => {
if (isMigrating) await new Promise(() => {}); // blocks indefinitely - after a while, stops the entire execution
await this.forEach(func, { ...options, limit: batchSize, offset: globalOffset + batchSize * i });
if (isMigrating) {
log.info(`${APIFY_EXTRA_LOG_PREFIX}[forEachParallel]: Stopping forEachParallel because of migration`);
// hang indefinitely until migration is done
await new Promise(() => {});
}

const { items } = await this.getData({ limit: batchSize, offset: globalOffset + batchSize * i });

chunkTracker.add(`${globalOffset + batchSize * i}`);
await Promise.all(items.map((item, b) => func(item, globalOffset + batchSize * i + b)));
}),
parallelLoads);
};
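The `ChunkTracker` implementation is collapsed above, but its usage in both methods (`new ChunkTracker(restoredState)`, `has`, `add`, `get`) suggests it is essentially a persisted set of chunk-offset keys. A minimal sketch consistent with that usage, not necessarily the committed implementation:

```typescript
// Hypothetical sketch: a set of processed chunk offsets, stored as a plain
// object so it can round-trip through Actor.setValue()/Actor.getValue().
export class ChunkTracker {
    private readonly chunks: Record<string, boolean>;

    constructor(restoredState?: Record<string, boolean> | null) {
        this.chunks = restoredState ?? {};
    }

    has(chunkKey: string): boolean {
        return this.chunks[chunkKey] === true;
    }

    add(chunkKey: string): void {
        this.chunks[chunkKey] = true;
    }

    get(): Record<string, boolean> {
        return this.chunks;
    }
}
```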
45 changes: 16 additions & 29 deletions test/apify/extra-dataset.test.ts
@@ -1,9 +1,9 @@
import { ENV_VARS } from '@apify/consts';
import { Actor, Configuration, PlatformEventManager, log } from 'apify';
import { Actor, Configuration, Dataset as DatasetClassic, PlatformEventManager, log } from 'apify';
import { Server } from 'ws';
import { Dataset } from 'apify-extra';

describe('apify-extra dataset', () => {
describe('forEachParallel', () => {
let wss: Server = null;
const config = Configuration.getGlobalConfig();
const events = new PlatformEventManager(config);
@@ -20,43 +20,30 @@ describe('apify-extra dataset', () => {
wss.close(done);
});

test('forEachParallel', async () => {
// let wsClosed = false;
// const isWsConnected = new Promise((resolve) => {
// wss.on('connection', (ws, req) => {
// ws.on('close', () => {
// wsClosed = true;
// });
// resolve(ws);
// expect(req.url).toBe('/someRunId');
// const send = (obj: Dictionary) => ws.send(JSON.stringify(obj));

// setTimeout(() => send({ name: 'migrating' }), 10);
// });
// });

test('slicing should work', async () => {
const forEachSpy = jest.spyOn(Dataset.prototype, 'forEach').mockImplementation();
jest.spyOn(Dataset.prototype, 'getInfo').mockImplementation(async () => ({
itemCount: 20,
itemCount: 229, // a prime number
} as any));
jest.spyOn(Actor.prototype, 'getValue').mockImplementation(async () => ({}));

// await isWsConnected;

const dataset = new Dataset({
client: Configuration.getStorageClient(),
id: 'dataset-forEachParallel-test',
});

await dataset.forEachParallel(() => { log.debug('noop'); }, { persistState: true });
await dataset.forEachParallel(() => { log.debug('noop'); }, { persistState: true, batchSize: 7 });

// // Cleanup.
// await new Promise<void>((resolve) => {
// wss.close(async () => {
// await sleep(10); // Here must be short sleep to get following line to later tick
// expect(wsClosed).toBe(true);
// resolve();
// });
// });
expect(forEachSpy).toBeCalledTimes(Math.ceil(229 / 7));

[...Array(Math.ceil(229 / 7))].forEach((_, i) => {
expect(forEachSpy).toBeCalledWith(
expect.any(Function),
expect.objectContaining({
limit: 7,
offset: i * 7,
}));
});
forEachSpy.mockRestore();
}, 60e3);
});
29 changes: 29 additions & 0 deletions test/e2e/extra-dataset/test.mjs
@@ -0,0 +1,29 @@
import { getTestDir, initialize, expect } from '../tools.mjs';
import { Dataset } from 'apify-extra';
import { Actor, Configuration, log } from 'apify';

const testDir = getTestDir(import.meta.url);

// Temporarily replace process.exit with a no-op so nothing can terminate the test script early; the original is restored at the end.
const exit = process.exit;
process.exit = () => {};

await initialize(testDir);
await Actor.openDataset('dataset-forEachParallel-test');
const dataset = new Dataset({
id: 'dataset-forEachParallel-test',
client: Configuration.getStorageClient(),
});

const ITEM_COUNT = 229;
const target = [];

await dataset.pushDataParallel([...new Array(ITEM_COUNT)].map((_, i) => ({ index: i })), { batchSize: 10, parallelPushes: 4 });
await dataset.forEachParallel((x) => target.push(x), { persistState: true, batchSize: 10 });

expect((await dataset.getData()).total === ITEM_COUNT, 'all items pushed');
expect([...new Array(ITEM_COUNT)].every((_, i) => target.some(x => x.index === i)), 'all items processed');

expect(target.length === ITEM_COUNT, `forEach called ${target.length} times.`);

process.exit = exit;
process.exit(0);
