Skip to content

Commit

Permalink
feat(logs): paginate through logs after backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
philnash committed Jul 16, 2021
1 parent 928dbcb commit ab2e4b6
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 48 deletions.
94 changes: 69 additions & 25 deletions packages/serverless-api/src/api/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,39 @@ import { LogApiResource, LogList, Sid, LogFilters } from '../types';
import { TwilioServerlessApiClient } from '../client';
import { getPaginatedResource } from './utils/pagination';
import { ClientApiError } from '../utils/error';
import { OptionsOfJSONResponseBody } from 'got';

const log = debug('twilio-serverless-api:logs');

function urlWithFilters(
environmentSid: Sid,
serviceSid: Sid,
filters: LogFilters = {}
): string {
const pageSize = filters.pageSize || 50;
const { functionSid, startDate, endDate, pageToken } = filters;
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
if (typeof functionSid !== 'undefined') {
url += `&FunctionSid=${functionSid}`;
}
if (typeof startDate !== 'undefined') {
url += `&StartDate=${
startDate instanceof Date ? startDate.toISOString() : startDate
}`;
}
if (typeof endDate !== 'undefined') {
url += `&EndDate=${
endDate instanceof Date ? endDate.toISOString() : endDate
}`;
}
if (typeof pageToken !== 'undefined') {
url += `&PageToken=${pageToken}`;
}
return url;
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve a list of all logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
Expand All @@ -24,7 +52,7 @@ export async function listLogResources(
try {
return getPaginatedResource<LogList, LogApiResource>(
client,
`Services/${serviceSid}/Environments/${environmentSid}/Logs`
urlWithFilters(environmentSid, serviceSid)
);
} catch (err) {
log('%O', new ClientApiError(err));
Expand All @@ -33,7 +61,7 @@ export async function listLogResources(
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve one page of a list of logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
Expand All @@ -46,28 +74,10 @@ export async function listOnePageLogResources(
client: TwilioServerlessApiClient,
filters: LogFilters
): Promise<LogApiResource[]> {
const pageSize = filters.pageSize || 50;
const { functionSid, startDate, endDate, pageToken } = filters;
const url = urlWithFilters(environmentSid, serviceSid, filters);
try {
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
if (typeof functionSid !== 'undefined') {
url += `&FunctionSid=${functionSid}`;
}
if (typeof startDate !== 'undefined') {
url += `&StartDate=${
startDate instanceof Date ? startDate.toISOString() : startDate
}`;
}
if (typeof endDate !== 'undefined') {
url += `&EndDate=${
endDate instanceof Date ? endDate.toISOString() : endDate
}`;
}
if (typeof pageToken !== 'undefined') {
url += `&PageToken=${pageToken}`;
}
const resp = await client.request('get', url);
const content = (resp.body as unknown) as LogList;
const content = resp.body as unknown as LogList;
return content.logs as LogApiResource[];
} catch (err) {
log('%O', new ClientApiError(err));
Expand All @@ -76,7 +86,41 @@ export async function listOnePageLogResources(
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve a paginated list of logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
* @param {TwilioServerlessApiClient} client API client
* @param {LogFilters} filters filters to apply to the request
* @param {string} nextPageUrl if you have a next page url, use it
* @returns {Promise<LogList>}
*/
export async function listPaginatedLogs(
environmentSid: Sid,
serviceSid: Sid,
client: TwilioServerlessApiClient,
filters: LogFilters = {},
nextPageUrl?: string
): Promise<LogList> {
try {
const opts: OptionsOfJSONResponseBody = { responseType: 'json' };
let url = nextPageUrl;
if (typeof url === 'undefined') {
url = urlWithFilters(environmentSid, serviceSid, filters);
}
if (url.startsWith('http')) {
opts.prefixUrl = '';
}
const resp = await client.request('get', url, opts);
return resp.body as unknown as LogList;
} catch (err) {
log('%O', new ClientApiError(err));
throw err;
}
}

/**
* Calls the API to retrieve a single log resource
*
* @param {Sid} logSid SID of log to retrieve
* @param {Sid} environmentSid environment in which to get logs
Expand All @@ -95,7 +139,7 @@ export async function getLog(
'get',
`Services/${serviceSid}/Environments/${environmentSid}/Logs/${logSid}`
);
return (resp.body as unknown) as LogApiResource;
return resp.body as unknown as LogApiResource;
} catch (err) {
log('%O', new ClientApiError(err));
throw err;
Expand Down
92 changes: 69 additions & 23 deletions packages/serverless-api/src/streams/logs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Readable } from 'stream';
import { listOnePageLogResources } from '../api/logs';
import { listPaginatedLogs } from '../api/logs';
import { TwilioServerlessApiClient } from '../client';
import { Sid } from '../types';
import { LogsConfig } from '../types/logs';
Expand All @@ -22,6 +22,7 @@ export class LogsStream extends Readable {
private _interval: NodeJS.Timeout | undefined;
private _viewedSids: Set<Sid>;
private _viewedLogs: Array<{ sid: Sid; dateCreated: Date }>;
private _paginating: boolean;

constructor(
private environmentSid: Sid,
Expand All @@ -39,6 +40,7 @@ export class LogsStream extends Readable {
config.maxPollingFrequency || defaultMaxPollingFrequency;
this._pollsWithoutResults = 0;
this._pollingCacheSize = config.logCacheSize || defaultLogCacheSize;
this._paginating = false;
}

set pollingFrequency(frequency: number) {
Expand All @@ -53,7 +55,12 @@ export class LogsStream extends Readable {

async _poll() {
try {
const logs = await listOnePageLogResources(
if (this._paginating) {
// We are going back through older logs that have been missed between
// polls, so don't start a new poll of the latest logs yet.
return;
}
let logPage = await listPaginatedLogs(
this.environmentSid,
this.serviceSid,
this.client,
Expand All @@ -62,32 +69,39 @@ export class LogsStream extends Readable {
pageSize: this.config.limit,
}
);
const unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid));
let logs = logPage.logs;
let unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid));
if (this._viewedSids.size > 0) {
// if we have seen some logs, we need to check if more than one page of
// logs are new.
while (
unviewedLogs.length === logs.length &&
logPage.meta.next_page_url
) {
// all of the logs are new, so we should get the next page
this._paginating = true;
logPage = await listPaginatedLogs(
this.environmentSid,
this.serviceSid,
this.client,
{},
logPage.meta.next_page_url
);
unviewedLogs = unviewedLogs.concat(
logPage.logs.filter((log) => !this._viewedSids.has(log.sid))
);
logs = logs.concat(logPage.logs);
}
}
if (unviewedLogs.length > 0) {
this._pollsWithoutResults = 0;
this.pollingFrequency = this._initialPollingFrequency;
log(
`New log received. Now polling once every ${this._pollingFrequency} milliseconds.`
);
// We got new logs, make sure we are polling at the base frequency
this._resetPollingFrequency();
unviewedLogs.reverse().forEach((log) => {
this.push(log);
});
} else {
if (this._pollsWithoutResults < pollsBeforeBackOff) {
this._pollsWithoutResults++;
} else {
if (this._pollingFrequency < this._maxPollingFrequency) {
log(
`No new logs for ${
this._pollsWithoutResults * this._pollingFrequency
} milliseconds. Now polling once every ${
this._pollingFrequency * 2
} milliseconds.`
);
this.pollingFrequency = this._pollingFrequency * 2;
this._pollsWithoutResults = 0;
}
}
// No new logs this time, so maybe back off polling
this._backOffPolling();
}

// The logs endpoint is not reliably returning logs in the same order
Expand Down Expand Up @@ -122,9 +136,13 @@ export class LogsStream extends Readable {
// Finally we create a set of just SIDs to compare against.
this._viewedSids = new Set(this._viewedLogs.map((log) => log.sid));

// If this is not tailing the logs, stop the stream.
if (!this.config.tail) {
this.push(null);
}
// If we were paginating through older resources, we can now allow the
// next poll when it is triggered.
this._paginating = false;
} catch (err) {
this.destroy(err);
}
Expand All @@ -148,4 +166,32 @@ export class LogsStream extends Readable {
this._interval = undefined;
}
}

private _resetPollingFrequency() {
this._pollsWithoutResults = 0;
if (this.pollingFrequency !== this._initialPollingFrequency) {
this.pollingFrequency = this._initialPollingFrequency;
log(
`New log received. Now polling once every ${this._pollingFrequency} milliseconds.`
);
}
}

private _backOffPolling() {
if (this._pollsWithoutResults < pollsBeforeBackOff) {
this._pollsWithoutResults++;
} else {
if (this._pollingFrequency < this._maxPollingFrequency) {
log(
`No new logs for ${
this._pollsWithoutResults * this._pollingFrequency
} milliseconds. Now polling once every ${
this._pollingFrequency * 2
} milliseconds.`
);
this.pollingFrequency = this._pollingFrequency * 2;
this._pollsWithoutResults = 0;
}
}
}
}

0 comments on commit ab2e4b6

Please sign in to comment.