
[Feature Request] Add batching and concurrency options #98

Open
dmeehan1968 opened this issue Jan 17, 2025 · 7 comments
Labels
enhancement New feature or request

Comments

@dmeehan1968

Many of the AWS clients support sending multiple events within a single request, such as SQS and EventBridge, but there are typically quota limits on how many. Additionally, the client is usually configured with a maximum number of concurrent requests, typically defined by the HTTP agent (often 50).

I have often found myself repeating the same process: taking some iterable of events, chunking it according to the service's per-request limit, and then handling the requests serially or with some degree of concurrency.
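The chunking step described above can be sketched with a small generic helper (a sketch for illustration; `chunk` is a hypothetical name, not part of effect-aws or the AWS SDK):

```typescript
// Split an iterable into arrays of at most `size` items, e.g. to respect
// a service's per-request entry limit before sending each batch.
function chunk<T>(items: Iterable<T>, size: number): T[][] {
  const result: T[][] = [];
  let current: T[] = [];
  for (const item of items) {
    current.push(item);
    if (current.length === size) {
      result.push(current);
      current = [];
    }
  }
  if (current.length > 0) result.push(current);
  return result;
}
```

Each chunk can then be submitted as one request, serially or with bounded concurrency.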

It would be useful if the generated Effect clients had additional options to control this, though I appreciate this might then affect response handling (both success and error cases). I have tended to rely on errors being returned to the invoker so that it takes advantage of the inherent retries and DLQ handling where configured, rather than attempting to implement retry logic at the client level.

I'm not sure if it's possible for the code gen to know what the service limits are, and which commands they would apply to. There are additional service limits, such as payload size, to consider as well, although calculating those is, I think, service specific and perhaps much harder to implement generically.

For example:

```ts
EventBridge.putEvents(
  { entries },
  {}, // handler options
  { batchSize: 10, concurrency: 10 }
)
```
@floydspace
Owner

floydspace commented Jan 17, 2025

that is a good suggestion, I had this in mind to implement for dynamodb service (for batchGetItem and batchWriteItem)

but I suggest either merging the options into the second argument:

```ts
EventBridge.putEvents(
  { entries },
  { batchSize: 10, concurrency: 10 } // handler options
)
```

or adding some higher-order function, similar to withRequestBatching and withConcurrency from Effect

> I'm not sure if it's possible for the code gen to know what the service limits are, and which commands they would apply to.

that is most probably the case, but I'm not against customizing clients; it is already the case for client-s3, where I added presigned URL overloads as a custom addition:

```ts
/**
 * @see {@link GetObjectCommand}
 */
getObject(
  args: GetObjectCommandInput,
  options?: { readonly presigned?: false } & HttpHandlerOptions,
): Effect.Effect<
  GetObjectCommandOutput,
  SdkError | InvalidObjectStateError | NoSuchKeyError
>;
getObject(
  args: GetObjectCommandInput,
  options?: { readonly presigned: true } & RequestPresigningArguments,
): Effect.Effect<string, SdkError | S3ServiceError>;
```

@floydspace floydspace added the enhancement New feature or request label Jan 17, 2025
@dmeehan1968
Author

dmeehan1968 commented Jan 17, 2025

I took a look at the Request/RequestResolver interfaces in the Batching docs.

Here I was taking the approach that requests could be made for single entries, and the resolver would submit the batch in a single putEvents call. The idea is that the caller only has to iterate over the input entries once, and the batching logic works on those single entries. This might be an approach for all commands that accept arrays of entries, but there may be some complication over whether the target resource (SQS queue etc.) is part of the entry or the request. In my initial use case of EventBridge, the EventBus is an entry detail, so it is easy to handle.

However, I don't seem to be able to get the correct return type, as Request.complete/completeEffect have void returns:

```ts
class PutEventError {
  readonly _tag = 'PutEventError'
}

interface PutEvent extends Request.Request<PutEventsResultEntry, PutEventError> {
  readonly _tag: 'PutEvent'
  readonly entry: PutEventsRequestEntry
}

const PutEvent = Request.tagged<PutEvent>('PutEvent')

const PutEventResolver = RequestResolver.makeBatched((requests: ReadonlyArray<PutEvent>) =>
  EventBridge.putEvents({ Entries: requests?.map(request => request.entry) }).pipe(
    Effect.map(response => response.Entries ?? []),
    Effect.andThen(results =>
      Effect.forEach(requests, (request, index) =>
        Request.completeEffect(request, Effect.succeed(results[index]!))))
  )
)
```

gives:

```
TS2345: Argument of type
  '(requests: ReadonlyArray<PutEvent>) => Effect.Effect<void[], InternalError | SdkError, EventBridgeService>'
is not assignable to parameter of type
  '(requests: [PutEvent, ...PutEvent[]]) => Effect<void, never, EventBridgeService>'
  Type 'Effect<void[], InternalError | SdkError, EventBridgeService>'
  is not assignable to type 'Effect<void, never, EventBridgeService>'
    Type 'void[]' is not assignable to type 'void'
```

@dmeehan1968
Author

Updated the code in the previous comment to include the request types.

@floydspace
Owner

floydspace commented Jan 17, 2025

you complete your request when it succeeds, but you also need to complete it in case of errors.
According to the TS error, the expected output error is never, which means you need to Effect.orDie

However, I see you actually use the PutEventError error in your request interface, so you need to remap from InternalError | SdkError to PutEventError:

```ts
...
  Effect.catchAll((error) =>
    Effect.forEach(requests, (request) =>
      Request.completeEffect(request, Effect.fail(new PutEventError()))),
  ),
```

@dmeehan1968
Author

Thanks, somehow I wasn't thinking that was important to the outcome!

Here is an outline of Batching code for an EventBridge PutEvent (singular):

```ts
class PutEventError {
  readonly _tag = 'PutEventError'
}

interface PutEventRequest extends Request.Request<PutEventsResultEntry, PutEventError> {
  readonly _tag: 'PutEventRequest'
  readonly entry: PutEventsRequestEntry
}

const PutEventRequest = Request.tagged<PutEventRequest>('PutEventRequest')

const PutEventResolver = RequestResolver.makeBatched((requests: ReadonlyArray<PutEventRequest>) =>
  EventBridge.putEvents({ Entries: requests.map(request => request.entry) }).pipe(
    Effect.map(response => response.Entries ?? []),
    Effect.andThen(results =>
      Effect.forEach(requests, (request, index) =>
        Request.completeEffect(request, Effect.succeed(results[index]!)))
    ),
    Effect.catchAll((_error) =>
      Effect.forEach(requests, request =>
        Request.completeEffect(request, Effect.fail(new PutEventError()))),
    ),
  )
).pipe(
  RequestResolver.batchN(10),
  RequestResolver.contextFromServices(EventBridge)
)

const PutEvent = (entry: PutEventsRequestEntry) => Effect.request(PutEventRequest({ entry }), PutEventResolver)
```

And here is example usage, with my previous chunking code commented out:

```ts
const decodeServiceHandler = (event: unknown, _context: Context) => pipe(
  Schema.decodeUnknown(EventBridgeEventWithUplink)(event),
  Effect.andThen(uplink => decodeUplink(uplink.detail)),
  Effect.andThen(Effect.forEach(({ type, ...payload }) => Effect.gen(function* () {
    const config = yield* DecoderConfig
    return Common.PutEventsRequestEntry.make({
      Source: config.source,
      DetailType: type ?? config.detailType,
      Detail: JSON.stringify(payload),
      EventBusName: config.eventBusArn,
    })
  }))),
  Effect.andThen(entries => Effect.forEach(entries, PutEvent, { batching: true })),
  // Effect.andThen(payloads => Chunk.fromIterable(payloads).pipe(Chunk.chunksOf(10))),
  // Effect.andThen(Effect.forEach(entries => EventBridge.putEvents({ Entries: Chunk.toArray(entries) }), { concurrency: 10 })),
  Effect.as(undefined),
)
```

Observations:

  1. Even when specifying { concurrency: 2 } on the forEach with PutEvent, the requests appear to run serially, and I don't see an option on RequestResolver or makeBatched to run them in parallel (if desired, which I would imagine would be the default case).
  2. It's not clear if batching allows for a batching window (I can't see one), for example, to make a downstream request when the upstream producer is slow. I can imagine the case of an SQS queue consumer (using client-side polling in Lambda, not an event trigger) where you might get fewer than the batch-size requests within a specified window, and want to pass the accumulated events downstream whilst continuing to wait. That might be beyond the scope; I don't have an immediate need for concurrency on top of batching, but it seems like a useful optimisation.

Creating the Request/RequestResolver is quite a lot of boilerplate on top of the existing effect-aws code, and given that it might be hard to determine which services/commands could benefit from batching, it might be easier to create a utility, perhaps as a static on the service, that allows the developer to make the Request query and associated code on demand.

Something like:

```ts
const PutEvent = EventBridge.makeBatching(PutEventsCommand, {
  max: 10,
  window: Duration.milliseconds(100),
  concurrency: 10
})
```

I also think that, if possible, the query effect (PutEvent in this case) should enable batching by default, so perhaps this encapsulates the forEach and becomes PutEvents (plural):

```ts
const PutEvents = (entries: ReadonlyArray<PutEventsRequestEntry>) => Effect.forEach(entries, PutEvent, { batching: true })
```

Maybe that's an option to the utility function, as to whether it presents a singular or plural interface.

As previously suggested, batching could be added as an option to the existing service methods, but I think I like the idea of keeping those as an 'effect'ual version of the SDK, and supporting batching on an ad hoc basis.

I think something would need to be done about accumulating results, or maybe it's sufficient to just return an array of the regular command output.
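On accumulating results: if each batch returns a regular command output, a simple merge of the per-batch entry arrays may be enough. A hypothetical sketch (`mergeEntries` and the minimal result shape are assumptions for illustration, not effect-aws API):

```typescript
// Minimal shape of a PutEvents-style response, for illustration only.
interface BatchResult<T> {
  Entries?: T[];
  FailedEntryCount?: number;
}

// Flatten per-batch results back into one aggregate response, preserving
// entry order and summing the failed-entry counts.
function mergeEntries<T>(results: BatchResult<T>[]): BatchResult<T> {
  return {
    Entries: results.flatMap((r) => r.Entries ?? []),
    FailedEntryCount: results.reduce((n, r) => n + (r.FailedEntryCount ?? 0), 0),
  };
}
```

The caller then sees a single response, regardless of how many underlying requests were made.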

@dmeehan1968
Author

Thinking out loud here - I went back to see how Batching compares to the chunking alternative.

I define the features as:

  • Respecting Service Limits - Automatic splitting of batch requests to respect service limits
  • Concurrency Control - Allowing for concurrent batch requests to optimise throughput
  • Distributed producers - Support for producers from different fibers (concurrent processing of upstream events)
  • Optimal aggregation - Automatic aggregation of items into the maximum allowed items per batch request, whilst allowing for early flush in the event of a slow upstream
| Feature | Batching | Chunking |
| --- | --- | --- |
| Respecting service limits | Yes | Yes |
| Concurrency control | No | Yes |
| Distributed producers | Yes | No |
| Optimal aggregation | Yes | No |
| Batching window | No | No |

For completeness, here is an implementation of PutEvents that uses Chunks to meet the service limit whilst allowing concurrency:

```ts
const ChunkedPutEvents = ({ batchSize, concurrency }: { batchSize: number, concurrency: Concurrency }) =>
  (entries: ReadonlyArray<PutEventsRequestEntry>) => {
    const chunks = Chunk.chunksOf(batchSize)(Chunk.fromIterable(entries))
    return Effect.forEach(chunks, chunk => EventBridge.putEvents({ Entries: Chunk.toArray(chunk) }), { concurrency })
  }
```

I'm not familiar enough with Effect or the Batching API to fully understand whether it's capable of providing the concurrency support. I don't see a way to configure concurrency, so I'm assuming there is none. I am also assuming that it will aggregate separate requests until a batch is full, but without the possibility of a batching window to flush downstream when the upstream is slow.

I'm somewhat familiar with Mailbox, which might provide the necessary support. It seems possible to have an API similar to makeBatched that creates the mailbox and a consumer (the 'request resolver'), and then passes the mailbox to a request wrapper so that it can insert items into the mailbox (the request effect).

The mailbox can be converted to a Stream with a Stream.groupedWithin to honor the batch size and window. I think it would then need fibers to be created for each batch and a way to collect the results, whilst respecting the concurrency configuration.
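The flush-on-size-or-window semantics described above (what Stream.groupedWithin provides) can be illustrated with a plain accumulator, independent of Effect. This is a sketch only; `WindowedBatcher` is a hypothetical name, and the caller supplies timestamps, so real code would wire this up to a clock and timers:

```typescript
// Accumulates items and flushes a batch when either the size limit is hit
// or the batching window has elapsed since the first buffered item.
class WindowedBatcher<T> {
  private buffer: T[] = [];
  private firstAt = 0;

  constructor(private maxSize: number, private windowMs: number) {}

  // Returns a full batch to flush, or null if we should keep accumulating.
  offer(item: T, nowMs: number): T[] | null {
    if (this.buffer.length === 0) this.firstAt = nowMs;
    this.buffer.push(item);
    if (this.buffer.length >= this.maxSize || nowMs - this.firstAt >= this.windowMs) {
      const batch = this.buffer;
      this.buffer = [];
      return batch;
    }
    return null;
  }

  // Flush whatever is left, e.g. when the upstream completes.
  drain(): T[] {
    const batch = this.buffer;
    this.buffer = [];
    return batch;
  }
}
```

Each flushed batch would then be handed to a fiber (up to the concurrency limit) to make the actual service call.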

@dmeehan1968
Author

dmeehan1968 commented Jan 25, 2025

I worked up batching for DynamoDB Get, using BatchGetCommand under the hood. This is less straightforward as requests need to be grouped by table and then the responses matched up to the request. This is far from complete:

  • Doesn't handle UnprocessedKeys, which can occur when the request/response exceeds the service limits
  • Should ensure that all requests are completed in the batch, even if the AWS response doesn't contain a match (e.g. not found)
  • Not clear on how to customise the batching size
  • Not clear on how to make batch requests concurrent
  • Not clear on how to get batching to flush a partial batch in the case of slow upstream requests
  • Probably lots of edge cases

The main takeaway here is that there is plenty of nuance in how individual AWS clients handle batching, and that might make adding broad support for batching to effect-aws considerably more effort.

```ts
import { KeysAndAttributes } from "@aws-sdk/client-dynamodb"
import { BatchGetCommandInput, NativeAttributeValue } from "@aws-sdk/lib-dynamodb"
import { DynamoDBDocument } from "@effect-aws/lib-dynamodb"
import { Data, Effect, Equal, Request, RequestResolver } from "effect"

type Item = Record<string, NativeAttributeValue>
type KeyAndAttributes = Omit<KeysAndAttributes, "Keys"> & {
  Key: Item
}

export class GetItemError extends Data.TaggedError('GetItemError')<{
  Key: Item
  cause: Error
}> {
}

interface GetItemRequest extends Request.Request<Item, GetItemError>, KeyAndAttributes {
  readonly _tag: 'GetItemRequest'
  readonly TableArn: string
}

export const GetItemRequest = Request.tagged<GetItemRequest>('GetItemRequest')

const GetItemResolver = RequestResolver.makeBatched((requests: ReadonlyArray<GetItemRequest>) => Effect.gen(function* () {

  // Group the incoming requests by table, since BatchGet keys requests by table name/ARN.
  const groupRequestsByTable = () => {
    type TableArn = string
    const requestsPerTableMap = new Map<TableArn, Set<GetItemRequest>>()
    for (const request of requests) {
      const setOfRequests = requestsPerTableMap.get(request.TableArn) ?? new Set()
      setOfRequests.add(request)
      requestsPerTableMap.set(request.TableArn, setOfRequests)
    }
    return requestsPerTableMap
  }

  const makeBatchGetItemCommand = (requestsPerTableMap: ReturnType<typeof groupRequestsByTable>) => {
    const input: BatchGetCommandInput = { RequestItems: {} }
    for (const [tableArn, tableRequests] of requestsPerTableMap) {
      input.RequestItems ??= {}
      input.RequestItems[tableArn] ??= { Keys: [] }
      for (const request of tableRequests) {
        input.RequestItems[tableArn].Keys!.push(request.Key)
      }
    }
    return input
  }

  const requestsPerTableMap = groupRequestsByTable()
  const input = makeBatchGetItemCommand(requestsPerTableMap)

  // Match a returned item back to a request by comparing all key attributes.
  const itemMatchesKey = (item: Item, key: Item) => {
    for (const [k, v] of Object.entries(key)) {
      if (!Equal.equals(item[k], v)) {
        return false
      }
    }
    return true
  }

  yield* DynamoDBDocument.batchGet(input).pipe(
    Effect.andThen(response =>
      Effect.forEach(Object.entries(response.Responses ?? {}), ([tableArn, items]) =>
        Effect.forEach(items, item =>
          Effect.filter(requests, request => Effect.succeed(request.TableArn === tableArn && itemMatchesKey(item, request.Key))).pipe(
            Effect.andThen(matched => Effect.forEach(matched, request =>
              Request.completeEffect(request, Effect.succeed(item)))),
          ),
        ),
      ),
    ),
    Effect.catchAll(cause =>
      Effect.forEach(requests, request =>
        Request.completeEffect(request, new GetItemError({
          cause,
          Key: request.Key,
        })),
      ),
    ),
  )

})).pipe(
  RequestResolver.batchN(100),
  RequestResolver.contextFromServices(DynamoDBDocument),
)

export const GetItem = (request: GetItemRequest) => Effect.request(request, GetItemResolver)
```
