Don't re-aggregate on retries (#264)
etspaceman authored Dec 12, 2023
1 parent 92fc48d commit 5a51212
Showing 4 changed files with 119 additions and 51 deletions.
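For context on the fix: when a put fails and the producer retries, the records it re-submits have already been aggregated by the first attempt, so running them through aggregation again would wrap the payloads a second time. This commit threads a retrying flag from the producer's retry loop into the batcher so that aggregation only runs on the first attempt. Below is a minimal, self-contained sketch of that guard using simplified stand-in types (not the real kinesis4cats classes), just to illustrate the behavior the diff introduces:

// Illustrative sketch only: simplified stand-in types, not the kinesis4cats API.
object RetryAggregationSketch {
  final case class Batch(payloads: List[String], aggregated: Boolean)

  // Stand-in for aggregation: wraps all payloads into a single record.
  private def aggregate(payloads: List[String]): Batch =
    Batch(List(payloads.mkString("|")), aggregated = true)

  // Mirrors the guard added in Batcher.batch: aggregation is skipped when the
  // batcher runs as part of a retry, because the retried payloads were already
  // aggregated by the first attempt.
  def batch(payloads: List[String], aggregateEnabled: Boolean, retrying: Boolean): Batch =
    if (aggregateEnabled && !retrying) aggregate(payloads)
    else Batch(payloads, aggregated = false)

  def main(args: Array[String]): Unit = {
    val first = batch(List("a", "b"), aggregateEnabled = true, retrying = false)
    // Suppose the put of `first` fails; the retry re-submits first.payloads.
    val retried = batch(first.payloads, aggregateEnabled = true, retrying = true)
    println(first)   // aggregated once: Batch(List(a|b),true)
    println(retried) // left as-is on retry: Batch(List(a|b),false)
  }
}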
22 changes: 14 additions & 8 deletions shared/src/main/scala/kinesis4cats/producer/Producer.scala
@@ -65,7 +65,9 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (

def config: Producer.Config[F]

private lazy val batcher: Batcher = new Batcher(config.batcherConfig)
private[kinesis4cats] lazy val batcher: Batcher = new Batcher(
config.batcherConfig
)

/** Underlying implementation for putting a batch request to Kinesis
*
@@ -107,7 +109,8 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
): Option[NonEmptyList[Producer.FailedRecord]]

private def _put(
records: NonEmptyList[Record]
records: NonEmptyList[Record],
retrying: Boolean
): F[Producer.Result[PutRes]] = {
val ctx = LogContext()

@@ -133,7 +136,7 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
else F.unit
} yield Record.WithShard.fromOption(rec, shardRes.toOption)
)
batched = batcher.batch(withShards)
batched = batcher.batch(withShards, retrying)
res <-
batched.batches
.flatTraverse(batch =>
@@ -174,7 +177,7 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
val ctx = LogContext()

for {
ref <- Ref.of(Producer.RetryState[PutRes](records, None))
ref <- Ref.of(Producer.RetryState[PutRes](records, None, false))
finalRes <- retryingOnFailuresAndAllErrors(
config.retryPolicy,
(x: Producer.Result[PutRes]) =>
@@ -201,15 +204,16 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
failed.toList // Only use failed from most recent result
)
)
}
},
true
)
)
} yield (),
(e: Throwable, details: RetryDetails) =>
logger.warn(ctx.addEncoded("retryDetails", details).context, e)(
"Exception when putting records, retrying"
)
)(ref.get.flatMap(x => _put(x.inputRecords)))
)(ref.get.flatMap(x => _put(x.inputRecords, x.retrying)))
_ <-
if (finalRes.hasFailed) {
if (config.raiseOnFailures) {
@@ -234,7 +238,8 @@ abstract class Producer[F[_], PutReq, PutRes] private[kinesis4cats] (
(
Producer.RetryState(
current.inputRecords,
Some(result)
Some(result),
current.retrying
),
result
)
@@ -247,7 +252,8 @@ object Producer {

final private case class RetryState[A](
inputRecords: NonEmptyList[Record],
res: Option[Result[A]]
res: Option[Result[A]],
retrying: Boolean
)

private[kinesis4cats] final case class Result[A](
13 changes: 10 additions & 3 deletions shared/src/main/scala/kinesis4cats/producer/batching/Batcher.scala
@@ -35,11 +35,15 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) {
* @param records
* [[cats.data.NonEmptyList NonEmptyList]] of
* [[kinesis4cats.producer.Record.WithShard records]]
* @param retrying
* Determines if the batcher is being executed during a retry. This helps
* to avoid re-aggregation of data during retries
* @return
* [[kinesis4cats.producer.batching.Batcher.Result Batcher.Result]]
*/
def batch(
records: NonEmptyList[Record.WithShard]
records: NonEmptyList[Record.WithShard],
retrying: Boolean
): Batcher.Result = {
val errors = records.collect {
case record
@@ -66,7 +70,10 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) {

val batchRes: List[Batch] = NonEmptyList
.fromList(valid)
.flatMap(x => if (config.aggregate) _aggregateAndBatch(x) else _batch(x))
.flatMap(x =>
if (config.aggregate && !retrying) _aggregateAndBatch(x)
else _batch(x)
)
.map(_.toList)
.getOrElse(Nil)

@@ -85,7 +92,7 @@ private[kinesis4cats] final class Batcher(config: Batcher.Config) {
* @return
* List of batches
*/
private def _aggregateAndBatch(
private[kinesis4cats] def _aggregateAndBatch(
records: NonEmptyList[Record.WithShard]
): Option[NonEmptyList[Batch]] = {
val aggregated =
123 changes: 89 additions & 34 deletions shared/src/test/scala/kinesis4cats/producer/ProducerSpec.scala
@@ -33,52 +33,107 @@ import kinesis4cats.models.ShardId
import kinesis4cats.models.StreamNameOrArn

class ProducerSpec extends munit.CatsEffectSuite {
def fixture: SyncIO[FunFixture[MockProducer]] = ResourceFunFixture(
MockProducer()
)
def fixture(aggregate: Boolean): SyncIO[FunFixture[MockProducer]] =
ResourceFunFixture(
MockProducer(aggregate)
)

fixture(false).test("It should retry methods and eventually produce") {
producer =>
val record1 = Record(Array.fill(50)(1), "1")
val record2 = Record(Array.fill(50)(1), "2")
val record3 = Record(Array.fill(50)(1), "3")
val record4 = Record(Array.fill(50)(1), "4")
val record5 = Record(Array.fill(50)(1), "5")

val data = NonEmptyList.of(
record1,
record2,
record3,
record4,
record5
)

val response1 = MockPutResponse(
NonEmptyList.one(record1),
List(record2, record3, record4, record5)
)

val response2 = MockPutResponse(
NonEmptyList.one(record2),
List(record3, record4, record5)
)

val response3 = MockPutResponse(
NonEmptyList.one(record3),
List(record4, record5)
)

fixture.test("It should retry methods and eventually produce") { producer =>
val record1 = Record(Array.fill(50)(1), "1")
val record2 = Record(Array.fill(50)(1), "2")
val record3 = Record(Array.fill(50)(1), "3")
val record4 = Record(Array.fill(50)(1), "4")
val record5 = Record(Array.fill(50)(1), "5")
val response4 = MockPutResponse(
NonEmptyList.one(record4),
List(record5)
)

val response5 = MockPutResponse(
NonEmptyList.one(record5),
List.empty
)

val expected: Producer.Result[MockPutResponse] = Producer.Result(
List(response1, response2, response3, response4, response5),
Nil,
Nil
)

producer.put(data).map { res =>
assert(res === expected, s"res: $res\nexp: $expected")
}
}

fixture(true).test(
"It should retry methods and eventually produce when aggregated"
) { producer =>
val record1 = Record("record 1".getBytes(), "1")
val record2 = Record("record 2".getBytes(), "2")

val data = NonEmptyList.of(
record1,
record2,
record3,
record4,
record5
record2
)

val aggregatedRecord1 = producer.batcher
._aggregateAndBatch(
NonEmptyList.one(Record.WithShard(record1, ShardId("1")))
)
.getOrElse(fail("Could not get aggregated batch"))
.head
.shardBatches
.head
._2
.records
val aggregatedRecord2 = producer.batcher
._aggregateAndBatch(
NonEmptyList.one(Record.WithShard(record2, ShardId("1")))
)
.getOrElse(fail("Could not get aggregated batch"))
.head
.shardBatches
.head
._2
.records

val response1 = MockPutResponse(
NonEmptyList.one(record1),
List(record2, record3, record4, record5)
aggregatedRecord1,
aggregatedRecord2.toList
)

val response2 = MockPutResponse(
NonEmptyList.one(record2),
List(record3, record4, record5)
)

val response3 = MockPutResponse(
NonEmptyList.one(record3),
List(record4, record5)
)

val response4 = MockPutResponse(
NonEmptyList.one(record4),
List(record5)
)

val response5 = MockPutResponse(
NonEmptyList.one(record5),
aggregatedRecord2,
List.empty
)

val expected: Producer.Result[MockPutResponse] = Producer.Result(
List(response1, response2, response3, response4, response5),
List(response1, response2),
Nil,
Nil
)
@@ -134,7 +189,7 @@ class MockProducer(
}

object MockProducer {
def apply(): Resource[IO, MockProducer] = for {
def apply(aggregate: Boolean): Resource[IO, MockProducer] = for {
logger <- Resource.pure(NoOpLogger[IO])
shardMapCache <- ShardMapCache.Builder
.default[IO](
@@ -164,7 +219,7 @@ object MockProducer {
logger,
shardMapCache,
defaultConfig.copy(batcherConfig =
defaultConfig.batcherConfig.copy(aggregate = false)
defaultConfig.batcherConfig.copy(aggregate = aggregate)
),
Producer.LogEncoders.show
)
12 changes: 6 additions & 6 deletions shared/src/test/scala/kinesis4cats/producer/batching/BatcherSpec.scala
@@ -59,7 +59,7 @@ class BatcherSpec extends munit.CatsEffectSuite {
)
)

val res = batcher.batch(records)
val res = batcher.batch(records, false)
assert(res.isSuccessful)
assert(res === expected, s"res: $res\nexp: $expected")
}
@@ -101,7 +101,7 @@ class BatcherSpec extends munit.CatsEffectSuite {
)
)

val res = batcher.batch(records.map(_._2))
val res = batcher.batch(records.map(_._2), false)
assert(res.isSuccessful)
assert(res === expected, s"res: $res\nexp: $expected")
}
@@ -137,7 +137,7 @@ class BatcherSpec extends munit.CatsEffectSuite {
)
)

val res = batcher.batch(records)
val res = batcher.batch(records, false)
assert(res.isSuccessful)
assert(res === expected, s"res: $res\nexp: $expected")
}
@@ -179,7 +179,7 @@ class BatcherSpec extends munit.CatsEffectSuite {
)
)

val res = batcher.batch(records.map(_._2))
val res = batcher.batch(records.map(_._2), false)
assert(res.isSuccessful)
assert(res === expected, s"res: $res\nexp: $expected")
}
@@ -197,7 +197,7 @@ class BatcherSpec extends munit.CatsEffectSuite {
records.map(x => Producer.InvalidRecord.RecordTooLarge(x.record)).toList,
Nil
)
val res = batcher.batch(records)
val res = batcher.batch(records, false)
assert(res.hasInvalid)
assert(res === expected, s"res: $res\nexp: $expected")
}
@@ -242,7 +242,7 @@ }
}
)

val res = batcher.batch(badRecords ::: goodRecords.map(_._2))
val res = batcher.batch(badRecords ::: goodRecords.map(_._2), false)
assert(res.isPartiallySuccessful)
assert(res === expected, s"res: $res\nexp: $expected")
}
