diff --git a/docs/kcl/getting-started.md b/docs/kcl/getting-started.md index 52d855e1..bbb90283 100644 --- a/docs/kcl/getting-started.md +++ b/docs/kcl/getting-started.md @@ -16,6 +16,7 @@ import cats.syntax.all._ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.kcl._ import kinesis4cats.kcl.logging.instances.show._ @@ -36,7 +37,7 @@ object MyApp extends ResourceApp.Forever { kinesisClient, dynamoClient, cloudWatchClient, - "my-stream", + new SingleStreamTracker("my-stream"), "my-app-name" )((records: List[CommittableRecord[IO]]) => records.traverse_(r => IO.println(r.data.asString)) @@ -88,7 +89,7 @@ object MyApp extends ResourceApp.Forever { kinesisClient, Map(streamArn1 -> position, streamArn2 -> position) ).toResource - consumer <- KCLConsumer.configsBuilderMultiStream[IO]( + consumer <- KCLConsumer.configsBuilder[IO]( kinesisClient.client, dynamoClient, cloudWatchClient, @@ -113,6 +114,7 @@ import cats.effect._ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.kcl.fs2.KCLConsumerFS2 import kinesis4cats.kcl.logging.instances.show._ @@ -133,7 +135,7 @@ object MyApp extends ResourceApp.Forever { kinesisClient, dynamoClient, cloudWatchClient, - "my-stream", + new SingleStreamTracker("my-stream"), "my-app-name" )() _ <- consumer diff --git a/docs/kcl/http4s.md b/docs/kcl/http4s.md index e80b2a27..56c22892 100644 --- a/docs/kcl/http4s.md +++ b/docs/kcl/http4s.md @@ -20,6 +20,7 @@ import com.comcast.ip4s._ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.kcl._ import kinesis4cats.kcl.http4s.KCLService @@ -41,7 +42,7 @@ object MyApp extends ResourceApp.Forever { kinesisClient, dynamoClient, cloudWatchClient, - "my-stream", + new SingleStreamTracker("my-stream"), "my-app-name" )((records: List[CommittableRecord[IO]]) => records.traverse_(r => IO.println(r.data.asString)) diff --git a/docs/kcl/localstack.md b/docs/kcl/localstack.md index 78f18426..99484f97 100644 --- a/docs/kcl/localstack.md +++ b/docs/kcl/localstack.md @@ -13,6 +13,7 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl-localstack" % ```scala mdoc:compile-only import cats.effect.IO import cats.syntax.all._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.kcl._ import kinesis4cats.kcl.logging.instances.show._ @@ -24,22 +25,32 @@ val processRecords = (records: List[CommittableRecord[IO]]) => // Runs a KCLConsumer as a Resource. Resource contains a Deferred value, //which completes when the consumer has begun to process records. -LocalstackKCLConsumer.kclConsumer[IO]("my-stream", "my-app-name")(processRecords) +LocalstackKCLConsumer.kclConsumer[IO]( + new SingleStreamTracker("my-stream"), + "my-app-name" +)(processRecords) // Runs a KCLConsumer as a Resource. Resource contains 2 things: // - A Deferred value, which completes when the consumer has begun to process records. // - A results Queue, which contains records received by the consumer -LocalstackKCLConsumer.kclConsumerWithResults[IO]("my-stream", "my-app-name")(processRecords) +LocalstackKCLConsumer.kclConsumerWithResults[IO]( + new SingleStreamTracker("my-stream"), + "my-app-name" +)(processRecords) ``` ## Usage - FS2 ```scala mdoc:compile-only import cats.effect.IO +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.kcl.logging.instances.show._ import kinesis4cats.kcl.fs2.localstack.LocalstackKCLConsumerFS2 // Runs a KCLConsumerFS2 as a Resource, which contains FS2 Streaming methods. -LocalstackKCLConsumerFS2.kclConsumer[IO]("my-stream", "my-app-name") +LocalstackKCLConsumerFS2.kclConsumer[IO]( + new SingleStreamTracker("my-stream"), + "my-app-name" +) ``` diff --git a/integration-tests/src/main/scalajvm/kinesis4cats/kcl/http4s/TestKCLService.scala b/integration-tests/src/main/scalajvm/kinesis4cats/kcl/http4s/TestKCLService.scala index 39e30b45..f4f7ff6a 100644 --- a/integration-tests/src/main/scalajvm/kinesis4cats/kcl/http4s/TestKCLService.scala +++ b/integration-tests/src/main/scalajvm/kinesis4cats/kcl/http4s/TestKCLService.scala @@ -19,6 +19,7 @@ package http4s import cats.effect.{IO, Resource, ResourceApp} import com.comcast.ip4s._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.ciris.CirisReader @@ -30,7 +31,7 @@ object TestKCLService extends ResourceApp.Forever { override def run(args: List[String]): Resource[IO, Unit] = for { streamName <- CirisReader.read[String](List("test", "stream")).resource[IO] configAndResults <- LocalstackKCLConsumer.kclConfigWithResults[IO]( - streamName, + new SingleStreamTracker(streamName), s"test-kcl-service-spec-${Utils.randomUUIDString}" )((_: List[CommittableRecord[IO]]) => IO.unit) consumer = new KCLConsumer[IO](configAndResults.kclConfig) diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala index b02b9d8a..b8ca3415 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerNoShardMapSpec.scala @@ -21,6 +21,8 @@ import cats.effect.syntax.all._ import cats.syntax.all._ import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.client.KinesisClient @@ -77,7 +79,12 @@ class KinesisProducerNoShardMapSpec for { _ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerSpec.scala index 36282f2c..6f661108 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/KinesisProducerSpec.scala @@ -20,6 +20,8 @@ import cats.effect._ import cats.effect.syntax.all._ import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.client.localstack.LocalstackKinesisClient @@ -59,7 +61,12 @@ class KinesisProducerSpec for { _ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala index cd5589b8..d22971b7 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/client/producer/fs2/FS2KinesisProducerSpec.scala @@ -20,6 +20,8 @@ import cats.effect._ import cats.effect.syntax.all._ import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.client.localstack.LocalstackKinesisClient @@ -59,7 +61,12 @@ class KinesisFS2ProducerSpec for { _ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/KCLConsumerSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/KCLConsumerSpec.scala index e7188073..8e9b8cd0 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/KCLConsumerSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/KCLConsumerSpec.scala @@ -28,6 +28,8 @@ import io.circe.syntax._ import org.scalacheck.Arbitrary import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.kinesis.model.PutRecordRequest +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.client.KinesisClient @@ -92,7 +94,12 @@ object KCLConsumerSpec { ): Resource[IO, Resources[IO]] = for { client <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) } yield Resources( diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/KCLConsumerFS2Spec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/KCLConsumerFS2Spec.scala index 1071f884..500ef38c 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/KCLConsumerFS2Spec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/KCLConsumerFS2Spec.scala @@ -29,6 +29,8 @@ import io.circe.syntax._ import org.scalacheck.Arbitrary import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.kinesis.model.PutRecordRequest +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.Utils import kinesis4cats.client.KinesisClient @@ -89,7 +91,12 @@ object KCLConsumerFS2Spec { ): Resource[IO, Resources[IO]] = for { client <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount) consumer <- LocalstackKCLConsumerFS2.kclConsumer[IO]( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName ) streamAndDeferred <- consumer.streamWithDeferredListener() diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/multistream/KCLConsumerFS2MultiSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/multistream/KCLConsumerFS2MultiSpec.scala index 5674c9ca..5d00d405 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/multistream/KCLConsumerFS2MultiSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/fs2/multistream/KCLConsumerFS2MultiSpec.scala @@ -136,7 +136,7 @@ object KCLConsumerFS2MultiSpec { Map(streamArn1 -> position, streamArn2 -> position) ) .toResource - consumer <- LocalstackKCLConsumerFS2.kclMultiConsumer[IO]( + consumer <- LocalstackKCLConsumerFS2.kclConsumer[IO]( tracker, appName ) diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/multistream/KCLConsumerMultiSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/multistream/KCLConsumerMultiSpec.scala index 4b24bcc3..34391620 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/kcl/multistream/KCLConsumerMultiSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/kcl/multistream/KCLConsumerMultiSpec.scala @@ -133,7 +133,7 @@ object KCLConsumerMultiSpec { Map(streamArn1 -> position, streamArn2 -> position) ) .toResource - deferredWithResults <- LocalstackKCLConsumer.kclMultiConsumerWithResults( + deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( tracker, appName )((_: List[CommittableRecord[IO]]) => IO.unit) diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala index 56516e68..7206baa5 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerNoShardMapSpec.scala @@ -24,6 +24,8 @@ import com.amazonaws.kinesis.PutRecordsOutput import org.http4s.blaze.client.BlazeClientBuilder import org.typelevel.log4cats.slf4j.Slf4jLogger import smithy4s.aws.kernel.AwsRegion +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.SSL import kinesis4cats.Utils @@ -103,7 +105,12 @@ class KinesisProducerNoShardMapSpec loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly) ) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerSpec.scala index 23ff1a28..6197121e 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/KinesisProducerSpec.scala @@ -23,6 +23,8 @@ import com.amazonaws.kinesis.PutRecordsOutput import org.http4s.blaze.client.BlazeClientBuilder import org.typelevel.log4cats.slf4j.Slf4jLogger import smithy4s.aws.kernel.AwsRegion +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.SSL import kinesis4cats.Utils @@ -87,7 +89,12 @@ class KinesisProducerSpec loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly) ) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala index cf3b93aa..ca5ddf83 100644 --- a/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala +++ b/integration-tests/src/test/scalajvm/kinesis4cats/smithy4s/client/producer/fs2/FS2KinesisProducerSpec.scala @@ -23,6 +23,8 @@ import com.amazonaws.kinesis.PutRecordsOutput import org.http4s.blaze.client.BlazeClientBuilder import org.typelevel.log4cats.slf4j.Slf4jLogger import smithy4s.aws.kernel.AwsRegion +import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.SingleStreamTracker import kinesis4cats.SSL import kinesis4cats.Utils @@ -87,7 +89,12 @@ class FS2KinesisProducerSpec loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly) ) deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults( - streamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.TRIM_HORIZON + ) + ), appName )((_: List[CommittableRecord[IO]]) => IO.unit) _ <- deferredWithResults.deferred.get.toResource diff --git a/kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala b/kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala index 757415bf..9faf55ff 100644 --- a/kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala +++ b/kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala @@ -35,6 +35,7 @@ import software.amazon.kinesis.leases._ import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback import software.amazon.kinesis.lifecycle._ import software.amazon.kinesis.metrics._ +import software.amazon.kinesis.processor.SingleStreamTracker import software.amazon.kinesis.retrieval.fanout.FanOutConfig import software.amazon.kinesis.retrieval.polling.PollingConfig import software.amazon.kinesis.retrieval.{AggregatorUtil, RetrievalConfig} @@ -1255,10 +1256,16 @@ object KCLCiris { List("kcl", "retrieval", "max", "list", "shards", "retry", "attempts"), prefix ) - } yield new RetrievalConfig(kinesisClient, streamName, appName) + } yield new RetrievalConfig( + kinesisClient, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(streamName), + position + ), + appName + ) .retrievalSpecificConfig(retrievalConfig) .retrievalFactory(retrievalConfig.retrievalFactory()) - .initialPositionInStreamExtended(position) .maybeTransform(listShardsBackoffTime)( _.listShardsBackoffTimeInMillis(_) ) diff --git a/kcl-ciris/src/test/scala/kinesis4cats/kcl/ciris/KCLCirisSpec.scala b/kcl-ciris/src/test/scala/kinesis4cats/kcl/ciris/KCLCirisSpec.scala index 60215a9e..71111789 100644 --- a/kcl-ciris/src/test/scala/kinesis4cats/kcl/ciris/KCLCirisSpec.scala +++ b/kcl-ciris/src/test/scala/kinesis4cats/kcl/ciris/KCLCirisSpec.scala @@ -26,6 +26,7 @@ import software.amazon.kinesis.coordinator.CoordinatorConfig import software.amazon.kinesis.leases.LeaseManagementConfig import software.amazon.kinesis.lifecycle.LifecycleConfig import software.amazon.kinesis.metrics.{MetricsConfig, MetricsLevel} +import software.amazon.kinesis.processor.SingleStreamTracker import software.amazon.kinesis.retrieval.RetrievalConfig import software.amazon.kinesis.retrieval.fanout.FanOutConfig import software.amazon.kinesis.retrieval.polling.PollingConfig @@ -255,7 +256,12 @@ class KCLCirisSpec extends munit.CatsEffectSuite { .load[IO](kinesisClient, prefix = Some("polling.prop")) pollingExpected = new RetrievalConfig( kinesisClient, - BuildInfo.pollingKclStreamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(BuildInfo.pollingKclStreamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.valueOf(BuildInfo.pollingKclInitialPosition) + ) + ), BuildInfo.pollingKclAppName ) .retrievalSpecificConfig( @@ -290,11 +296,6 @@ class KCLCirisSpec extends munit.CatsEffectSuite { .asJava ) ) - .initialPositionInStreamExtended( - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.valueOf(BuildInfo.pollingKclInitialPosition) - ) - ) .safeTransform( BuildInfo.pollingKclRetrievalListShardsBackoffTime.asMillisUnsafe )( @@ -308,7 +309,12 @@ class KCLCirisSpec extends munit.CatsEffectSuite { fanoutExpected = new RetrievalConfig( kinesisClient, - BuildInfo.fanoutKclStreamName, + new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(BuildInfo.fanoutKclStreamName), + InitialPositionInStreamExtended.newInitialPosition( + InitialPositionInStream.valueOf(BuildInfo.fanoutKclInitialPosition) + ) + ), BuildInfo.fanoutKclAppName ) .retrievalSpecificConfig( @@ -340,11 +346,6 @@ class KCLCirisSpec extends munit.CatsEffectSuite { BuildInfo.fanoutKclRetrievalFanoutRetryBackoff.asMillisUnsafe )(_.retryBackoffMillis(_)) ) - .initialPositionInStreamExtended( - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.valueOf(BuildInfo.fanoutKclInitialPosition) - ) - ) .safeTransform( BuildInfo.fanoutKclRetrievalListShardsBackoffTime.asMillisUnsafe )( diff --git a/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/eq.scala b/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/eq.scala index ab179040..63d59fb9 100644 --- a/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/eq.scala +++ b/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/eq.scala @@ -125,10 +125,8 @@ object eq { } implicit val retrievalConfigEq: Eq[RetrievalConfig] = (x, y) => - x.appStreamTracker() == y.appStreamTracker() && + x.streamTracker() == y.streamTracker() && x.applicationName() === y.applicationName() && - x.initialPositionInStreamExtended() === y - .initialPositionInStreamExtended() && x.listShardsBackoffTimeInMillis() === y.listShardsBackoffTimeInMillis() && x.maxListShardsRetryAttempts() === y.maxListShardsRetryAttempts() && x.retrievalSpecificConfig() === y.retrievalSpecificConfig() diff --git a/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/show.scala b/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/show.scala index bddb6594..5c3af2ea 100644 --- a/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/show.scala +++ b/kcl-ciris/src/test/scala/kinesis4cats/kcl/instances/show.scala @@ -27,6 +27,8 @@ import software.amazon.kinesis.leases.LeaseManagementConfig import software.amazon.kinesis.lifecycle.LifecycleConfig import software.amazon.kinesis.metrics.MetricsConfig import software.amazon.kinesis.processor.MultiStreamTracker +import software.amazon.kinesis.processor.SingleStreamTracker +import software.amazon.kinesis.processor.StreamTracker import software.amazon.kinesis.retrieval._ import software.amazon.kinesis.retrieval.fanout.FanOutConfig import software.amazon.kinesis.retrieval.polling.PollingConfig @@ -182,17 +184,25 @@ object show { implicit val mutliStreamTrackerShow: Show[MultiStreamTracker] = x => ShowBuilder("MultiStreamTracker") - .add("", x.streamConfigList().asScala.toList) + .add("streamConfigList", x.streamConfigList().asScala.toList) + .add("isMultiStream", x.isMultiStream()) .build + implicit val singleStreamTrackerShow: Show[SingleStreamTracker] = x => + ShowBuilder("SingleStreamTracker") + .add("streamConfigList", x.streamConfigList().asScala.toList) + .add("isMultiStream", x.isMultiStream()) + .build + + implicit val streamTrackerShow: Show[StreamTracker] = { + case x: MultiStreamTracker => mutliStreamTrackerShow.show(x) + case x: SingleStreamTracker => singleStreamTrackerShow.show(x) + } + implicit val retrievalConfigShow: Show[RetrievalConfig] = x => ShowBuilder("RetrievalConfig") - .add("appStreamTracker", x.appStreamTracker()) + .add("streamTracker", x.streamTracker()) .add("applicationName", x.applicationName()) - .add( - "initialPositionInStreamExtended", - x.initialPositionInStreamExtended().toString() - ) .add("listShardsBackoffTimeInMillis", x.listShardsBackoffTimeInMillis()) .add("maxListShardsRetryAttempts", x.maxListShardsRetryAttempts()) .add("retrievalSpecificConfig", x.retrievalSpecificConfig()) diff --git a/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala b/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala index a66dab48..3a3ac31b 100644 --- a/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala +++ b/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala @@ -22,11 +22,10 @@ import cats.Parallel import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.effect.{Async, Resource} -import software.amazon.kinesis.common._ +import software.amazon.kinesis.processor.StreamTracker import kinesis4cats.Utils import kinesis4cats.kcl.localstack.LocalstackKCLConsumer -import kinesis4cats.kcl.multistream.MultiStreamTracker import kinesis4cats.localstack.LocalstackConfig /** Helpers for constructing and leveraging the KCL with Localstack via FS2. @@ -39,7 +38,7 @@ object LocalstackKCLConsumerFS2 { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -59,10 +58,9 @@ object LocalstackKCLConsumerFS2 { */ def kclConfig[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig )(implicit F: Async[F], @@ -71,10 +69,9 @@ object LocalstackKCLConsumerFS2 { queue <- Queue.bounded[F, CommittableRecord[F]](100).toResource underlying <- LocalstackKCLConsumer.kclConfig( config, - streamName, + streamTracker, appName, workerId, - position, processConfig )(KCLConsumerFS2.callback(queue)) } yield KCLConsumerFS2 @@ -84,50 +81,7 @@ object LocalstackKCLConsumerFS2 { * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] that * is compliant with Localstack. * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] - */ - def kclMultiConfig[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig - )(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumerFS2.Config[F]] = for { - queue <- Queue.bounded[F, CommittableRecord[F]](100).toResource - underlying <- LocalstackKCLConsumer.kclMultiConfig( - config, - tracker, - appName, - workerId, - processConfig - )(KCLConsumerFS2.callback(queue)) - } yield KCLConsumerFS2 - .Config[F](underlying, queue, KCLConsumerFS2.FS2Config.default) - - /** Creates a - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] that - * is compliant with Localstack. - * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -150,14 +104,10 @@ object LocalstackKCLConsumerFS2 { * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] */ def kclConfig[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumerFS2.defaultProcessConfig )(implicit @@ -167,53 +117,7 @@ object LocalstackKCLConsumerFS2 { config <- LocalstackConfig.resource(prefix) result <- kclConfig( config, - streamName, - appName, - workerId, - position, - processConfig - ) - } yield result - - /** Creates a - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] that - * is compliant with Localstack. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default is a random UUID Default is - * TRIM_HORIZON - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` with autoCommit set to false - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] - */ - def kclMultiConfig[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumerFS2.defaultProcessConfig - )(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumerFS2.Config[F]] = for { - config <- LocalstackConfig.resource(prefix) - result <- kclMultiConfig( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -228,7 +132,7 @@ object LocalstackKCLConsumerFS2 { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -249,10 +153,9 @@ object LocalstackKCLConsumerFS2 { */ def kclConsumer[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig )(implicit F: Async[F], @@ -261,54 +164,7 @@ object LocalstackKCLConsumerFS2 { ): Resource[F, KCLConsumerFS2[F]] = kclConfig( config, - streamName, - appName, - workerId, - position, - processConfig - ).map( - new KCLConsumerFS2[F](_) - ) - - /** Runs a [[kinesis4cats.kcl.fs2.KCLConsumerFS2 KCLConsumerFS2]] that is - * compliant with Localstack. Also exposes a - * [[cats.effect.Deferred Deferred]] that will complete when the consumer has - * started processing records. Useful for allowing tests time for the - * consumer to start before processing the stream. - * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2]] in a - * [[cats.effect.Resource Resource]] - */ - def kclMultiConsumer[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig - )(implicit - F: Async[F], - P: Parallel[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumerFS2[F]] = - kclMultiConfig( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -322,7 +178,7 @@ object LocalstackKCLConsumerFS2 { * started processing records. Useful for allowing tests time for the * consumer to start before processing the stream. * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -346,14 +202,10 @@ object LocalstackKCLConsumerFS2 { * [[cats.effect.Resource Resource]] */ def kclConsumer[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumerFS2.defaultProcessConfig )(implicit @@ -364,56 +216,7 @@ object LocalstackKCLConsumerFS2 { config <- LocalstackConfig.resource(prefix) result <- kclConsumer( config, - streamName, - appName, - workerId, - position, - processConfig - ) - } yield result - - /** Runs a [[kinesis4cats.kcl.fs2.KCLConsumerFS2 KCLConsumerFS2]] that is - * compliant with Localstack. Also exposes a - * [[cats.effect.Deferred Deferred]] that will complete when the consumer has - * started processing records. Useful for allowing tests time for the - * consumer to start before processing the stream. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default to a random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` with autoCommit set to false - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2 KCLConsumerFS2]] in a - * [[cats.effect.Resource Resource]] - */ - def kclMultiConsumer[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumerFS2.defaultProcessConfig - )(implicit - F: Async[F], - P: Parallel[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumerFS2[F]] = for { - config <- LocalstackConfig.resource(prefix) - result <- kclMultiConsumer( - config, - tracker, + streamTracker, appName, workerId, processConfig diff --git a/kcl-localstack/src/main/scala/kinesis4cats/kcl/localstack/LocalstackKCLConsumer.scala b/kcl-localstack/src/main/scala/kinesis4cats/kcl/localstack/LocalstackKCLConsumer.scala index 6695ac48..c9354284 100644 --- a/kcl-localstack/src/main/scala/kinesis4cats/kcl/localstack/LocalstackKCLConsumer.scala +++ b/kcl-localstack/src/main/scala/kinesis4cats/kcl/localstack/LocalstackKCLConsumer.scala @@ -22,16 +22,15 @@ import cats.effect.syntax.all._ import cats.effect.{Async, Deferred, Resource} import cats.syntax.all._ import software.amazon.kinesis.checkpoint.CheckpointConfig -import software.amazon.kinesis.common._ import software.amazon.kinesis.coordinator.CoordinatorConfig import software.amazon.kinesis.leases.LeaseManagementConfig import software.amazon.kinesis.lifecycle.LifecycleConfig import software.amazon.kinesis.metrics.MetricsConfig +import software.amazon.kinesis.processor.StreamTracker import software.amazon.kinesis.retrieval.RetrievalConfig import software.amazon.kinesis.retrieval.polling.PollingConfig import kinesis4cats.Utils -import kinesis4cats.kcl.multistream.MultiStreamTracker import kinesis4cats.localstack.LocalstackConfig import kinesis4cats.localstack.aws.v2.AwsClients @@ -54,7 +53,7 @@ object LocalstackKCLConsumer { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -76,10 +75,9 @@ object LocalstackKCLConsumer { */ def kclConfig[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig )(cb: List[CommittableRecord[F]] => F[Unit])(implicit F: Async[F], @@ -88,77 +86,32 @@ object LocalstackKCLConsumer { kinesisClient <- AwsClients.kinesisClientResource(config) cloudwatchClient <- AwsClients.cloudwatchClientResource(config) dynamoClient <- AwsClients.dynamoClientResource(config) - retrievalConfig = new PollingConfig(streamName, kinesisClient) - result <- KCLConsumer.Config.create[F]( - new CheckpointConfig(), - new CoordinatorConfig(appName).parentShardPollIntervalMillis(1000L), - new LeaseManagementConfig( - appName, - dynamoClient, - kinesisClient, - workerId - ).initialPositionInStream(position) - .shardSyncIntervalMillis(1000L), - new LifecycleConfig(), - new MetricsConfig(cloudwatchClient, appName), - new RetrievalConfig(kinesisClient, streamName, appName) - .retrievalSpecificConfig(retrievalConfig) - .retrievalFactory(retrievalConfig.retrievalFactory()) - .initialPositionInStreamExtended(position), - processConfig = processConfig - )(cb) - } yield result - - /** Creates a [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that - * is compliant with Localstack. Meant to be used with multi-stream - * consumers. - * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] - */ - def kclMultiConfig[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumer.Config[F]] = for { - kinesisClient <- AwsClients.kinesisClientResource(config) - cloudwatchClient <- AwsClients.cloudwatchClientResource(config) - dynamoClient <- AwsClients.dynamoClientResource(config) - retrievalConfig = new PollingConfig(kinesisClient) + initialLeaseManagementConfig = new LeaseManagementConfig( + appName, + dynamoClient, + kinesisClient, + workerId + ).shardSyncIntervalMillis(1000L) + retrievalConfig = + if (streamTracker.isMultiStream()) new PollingConfig(kinesisClient) + else + new PollingConfig( + streamTracker.streamConfigList.get(0).streamIdentifier.streamName, + kinesisClient + ) result <- KCLConsumer.Config.create[F]( new CheckpointConfig(), new CoordinatorConfig(appName).parentShardPollIntervalMillis(1000L), - new LeaseManagementConfig( - appName, - dynamoClient, - kinesisClient, - workerId - ).shardSyncIntervalMillis(1000L), + if (streamTracker.isMultiStream()) initialLeaseManagementConfig + else + initialLeaseManagementConfig.initialPositionInStream( + streamTracker.streamConfigList + .get(0) + .initialPositionInStreamExtended() + ), new LifecycleConfig(), new MetricsConfig(cloudwatchClient, appName), - new RetrievalConfig(kinesisClient, tracker, appName) + new RetrievalConfig(kinesisClient, streamTracker, appName) .retrievalSpecificConfig(retrievalConfig) .retrievalFactory(retrievalConfig.retrievalFactory()), processConfig = processConfig @@ -168,7 +121,7 @@ object LocalstackKCLConsumer { /** Creates a [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that * is compliant with Localstack. * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -193,14 +146,10 @@ object LocalstackKCLConsumer { * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] */ def kclConfig[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumer.ProcessConfig.default )(cb: List[CommittableRecord[F]] => F[Unit])(implicit @@ -210,53 +159,7 @@ object LocalstackKCLConsumer { config <- LocalstackConfig.resource(prefix) result <- kclConfig( config, - streamName, - appName, - workerId, - position, - processConfig - )(cb) - } yield result - - /** Creates a [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that - * is compliant with Localstack. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default is a random UUID - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] - */ - def kclMultiConfig[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumer.ProcessConfig.default - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumer.Config[F]] = for { - config <- LocalstackConfig.resource(prefix) - result <- kclMultiConfig( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -270,7 +173,7 @@ object LocalstackKCLConsumer { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -295,10 +198,9 @@ object LocalstackKCLConsumer { */ def kclConfigWithResults[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig, resultsQueueSize: Int )(cb: List[CommittableRecord[F]] => F[Unit])(implicit @@ -310,62 +212,7 @@ object LocalstackKCLConsumer { .toResource kclConf <- kclConfig( config, - streamName, - appName, - workerId, - position, - processConfig - )((recs: List[CommittableRecord[F]]) => - resultsQueue.tryOfferN(recs) >> cb(recs) - ) - } yield ConfigWithResults(kclConf, resultsQueue) - - /** Creates a [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that - * is compliant with Localstack. Also creates a results - * [[cats.effect.std.Queue queue]] for the consumer to stick results into. - * Helpful when confirming data that has been produced to a stream. Intended - * to be used for testing a multi-stream consumer. - * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param resultsQueueSize - * Bounded size of the [[cats.effect.std.Queue Queue]] - * @param cb - * User-defined callback function for processing records. This will run - * after the records are enqueued into the results queue - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.localstack.LocalstackKCLConsumer.ConfigWithResults ConfigWithResults]] - */ - def kclMultiConfigWithResults[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig, - resultsQueueSize: Int - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, ConfigWithResults[F]] = for { - resultsQueue <- Queue - .bounded[F, CommittableRecord[F]](resultsQueueSize) - .toResource - kclConf <- kclMultiConfig( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -379,7 +226,7 @@ object LocalstackKCLConsumer { * [[cats.effect.std.Queue queue]] for the consumer to stick results into. * Helpful when confirming data that has been produced to a stream. * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -406,14 +253,10 @@ object LocalstackKCLConsumer { * [[kinesis4cats.kcl.localstack.LocalstackKCLConsumer.ConfigWithResults ConfigWithResults]] */ def kclConfigWithResults[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumer.ProcessConfig.default, resultsQueueSize: Int = 50 @@ -424,60 +267,7 @@ object LocalstackKCLConsumer { config <- LocalstackConfig.resource(prefix) result <- kclConfigWithResults( config, - streamName, - appName, - workerId, - position, - processConfig, - resultsQueueSize - )(cb) - } yield result - - /** Creates a [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that - * is compliant with Localstack. Also creates a results - * [[cats.effect.std.Queue queue]] for the consumer to stick results into. - * Helpful when confirming data that has been produced to a stream. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default to random UUID - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` - * @param resultsQueueSize - * Bounded size of the [[cats.effect.std.Queue Queue]]. Default to 50. - * @param cb - * User-defined callback function for processing records. This will run - * after the records are enqueued into the results queue - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.localstack.LocalstackKCLConsumer.ConfigWithResults ConfigWithResults]] - */ - def kclMultiConfigWithResults[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumer.ProcessConfig.default, - resultsQueueSize: Int = 50 - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, ConfigWithResults[F]] = for { - config <- LocalstackConfig.resource(prefix) - result <- kclMultiConfigWithResults( - config, - tracker, + streamTracker, appName, workerId, processConfig, @@ -493,7 +283,7 @@ object LocalstackKCLConsumer { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -517,10 +307,9 @@ object LocalstackKCLConsumer { */ def kclConsumer[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig )(cb: List[CommittableRecord[F]] => F[Unit])(implicit F: Async[F], @@ -528,57 +317,7 @@ object LocalstackKCLConsumer { ): Resource[F, Deferred[F, Unit]] = for { config <- kclConfig( config, - streamName, - appName, - workerId, - position, - processConfig - )(cb) - consumer = new KCLConsumer(config) - deferred <- consumer.runWithDeferredListener() - } yield deferred - - /** Runs a [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] that is compliant with - * Localstack. Also exposes a [[cats.effect.Deferred Deferred]] that will - * complete when the consumer has started processing records. Useful for - * allowing tests time for the consumer to start before processing the - * stream. - * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[cats.effect.Deferred Deferred]] in a - * [[cats.effect.Resource Resource]], which completes when the consumer has - * started processing records - */ - def kclMultiConsumer[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, Deferred[F, Unit]] = for { - config <- kclMultiConfig( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -593,7 +332,7 @@ object LocalstackKCLConsumer { * allowing tests time for the consumer to start before processing the * stream. * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -620,14 +359,10 @@ object LocalstackKCLConsumer { * started processing records */ def kclConsumer[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumer.ProcessConfig.default )(cb: List[CommittableRecord[F]] => F[Unit])(implicit @@ -637,58 +372,7 @@ object LocalstackKCLConsumer { config <- LocalstackConfig.resource(prefix) result <- kclConsumer( config, - streamName, - appName, - workerId, - position, - processConfig - )(cb) - } yield result - - /** Runs a [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] that is compliant with - * Localstack. Also exposes a [[cats.effect.Deferred Deferred]] that will - * complete when the consumer has started processing records. Useful for - * allowing tests time for the consumer to start before processing the - * stream. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default to a random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[cats.effect.Deferred Deferred]] in a - * [[cats.effect.Resource Resource]], which completes when the consumer has - * started processing records - */ - def kclMultiConsumer[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumer.ProcessConfig.default - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, Deferred[F, Unit]] = for { - config <- LocalstackConfig.resource(prefix) - result <- kclMultiConsumer( - config, - tracker, + streamTracker, appName, workerId, processConfig @@ -704,7 +388,7 @@ object LocalstackKCLConsumer { * * @param config * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -730,10 +414,9 @@ object LocalstackKCLConsumer { */ def kclConsumerWithResults[F[_]]( config: LocalstackConfig, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String, - position: InitialPositionInStreamExtended, processConfig: KCLConsumer.ProcessConfig, resultsQueueSize: Int )(cb: List[CommittableRecord[F]] => F[Unit])(implicit @@ -742,10 +425,9 @@ object LocalstackKCLConsumer { ): Resource[F, DeferredWithResults[F]] = for { configWithResults <- kclConfigWithResults( config, - streamName, + streamTracker, appName, workerId, - position, processConfig, resultsQueueSize )(cb) @@ -760,61 +442,7 @@ object LocalstackKCLConsumer { * for allowing tests time for the consumer to start before processing the * stream, and testing those records that have been received. * - * @param config - * [[kinesis4cats.localstack.LocalstackConfig LocalstackConfig]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param workerId - * Unique identifier for the worker. Typically a UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param resultsQueueSize - * Bounded size of the [[cats.effect.std.Queue Queue]]. - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.localstack.LocalstackKCLConsumer.DeferredWithResults DeferredWithResults]] - * in a [[cats.effect.Resource Resource]], which completes when the - * consumer has started processing records - */ - def kclMultiConsumerWithResults[F[_]]( - config: LocalstackConfig, - tracker: MultiStreamTracker, - appName: String, - workerId: String, - processConfig: KCLConsumer.ProcessConfig, - resultsQueueSize: Int - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, DeferredWithResults[F]] = for { - configWithResults <- kclMultiConfigWithResults( - config, - tracker, - appName, - workerId, - processConfig, - resultsQueueSize - )(cb) - consumer = new KCLConsumer(configWithResults.kclConfig) - deferred <- consumer.runWithDeferredListener() - } yield DeferredWithResults(deferred, configWithResults.resultsQueue) - - /** Runs a [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] that is compliant with - * Localstack. Exposes a [[cats.effect.Deferred Deferred]] that will complete - * when the consumer has started processing records, as well as a - * [[cats.effect.std.Queue Queue]] for tracking the received records. Useful - * for allowing tests time for the consumer to start before processing the - * stream, and testing those records that have been received. - * - * @param streamName + * @param streamTracker * Name of stream to consume * @param appName * Application name for the consumer. Used for the dynamodb table name as @@ -843,14 +471,10 @@ object LocalstackKCLConsumer { * consumer has started processing records */ def kclConsumerWithResults[F[_]]( - streamName: String, + streamTracker: StreamTracker, appName: String, prefix: Option[String] = None, workerId: String = Utils.randomUUIDString, - position: InitialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition( - InitialPositionInStream.TRIM_HORIZON - ), processConfig: KCLConsumer.ProcessConfig = KCLConsumer.ProcessConfig.default, resultsQueueSize: Int = 50 @@ -859,65 +483,7 @@ object LocalstackKCLConsumer { LE: RecordProcessor.LogEncoders ): Resource[F, DeferredWithResults[F]] = for { configWithResults <- kclConfigWithResults( - streamName, - appName, - prefix, - workerId, - position, - processConfig, - resultsQueueSize - )(cb) - consumer = new KCLConsumer(configWithResults.kclConfig) - deferred <- consumer.runWithDeferredListener() - } yield DeferredWithResults(deferred, configWithResults.resultsQueue) - - /** Runs a [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] that is compliant with - * Localstack. Exposes a [[cats.effect.Deferred Deferred]] that will complete - * when the consumer has started processing records, as well as a - * [[cats.effect.std.Queue Queue]] for tracking the received records. Useful - * for allowing tests time for the consumer to start before processing the - * stream, and testing those records that have been received. - * - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker]] - * @param appName - * Application name for the consumer. Used for the dynamodb table name as - * well as the metrics namespace. - * @param prefix - * Optional prefix for parsing configuration. Default to None - * @param workerId - * Unique identifier for the worker. Default to a random UUID Default to - * TRIM_HORIZON. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * Default is `ProcessConfig.default` - * @param resultsQueueSize - * Bounded size of the [[cats.effect.std.Queue Queue]]. Default to 50. - * @param cb - * User-defined callback function for processing records - * @param F - * [[cats.effect.Async Async]] - * @param LE - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * @return - * [[kinesis4cats.kcl.localstack.LocalstackKCLConsumer.DeferredWithResults DeferredWithResults]] - * in a [[cats.effect.Resource Resource]], which completes when the - * consumer has started processing records - */ - def kclMultiConsumerWithResults[F[_]]( - tracker: MultiStreamTracker, - appName: String, - prefix: Option[String] = None, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumer.ProcessConfig.default, - resultsQueueSize: Int = 50 - )(cb: List[CommittableRecord[F]] => F[Unit])(implicit - F: Async[F], - LE: RecordProcessor.LogEncoders - ): Resource[F, DeferredWithResults[F]] = for { - configWithResults <- kclMultiConfigWithResults( - tracker, + streamTracker, appName, prefix, workerId, diff --git a/kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala b/kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala index d6538a7b..146ca3b6 100644 --- a/kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala +++ b/kcl/src/main/scala/kinesis4cats/kcl/KCLConsumer.scala @@ -31,11 +31,11 @@ import software.amazon.kinesis.leases.LeaseManagementConfig import software.amazon.kinesis.lifecycle.LifecycleConfig import software.amazon.kinesis.metrics.MetricsConfig import software.amazon.kinesis.processor.ProcessorConfig +import software.amazon.kinesis.processor.StreamTracker import software.amazon.kinesis.retrieval.RetrievalConfig import kinesis4cats.Utils import kinesis4cats.kcl.WorkerListeners._ -import kinesis4cats.kcl.multistream.MultiStreamTracker import kinesis4cats.syntax.id._ /** Wrapper offering for the @@ -188,8 +188,10 @@ object KCLConsumer { * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] * @param cloudWatchClient * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param streamName - * Name of the Kinesis stream to consume from + * @param streamTracker + * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java StreamTracker]] + * to use, which defines the name of the stream(s) and the initial position + * within them * @param appName * Name of the application. Usually also the dynamo table name for * checkpoints @@ -220,7 +222,7 @@ object KCLConsumer { kinesisClient: KinesisAsyncClient, dynamoClient: DynamoDbAsyncClient, cloudWatchClient: CloudWatchAsyncClient, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String = Utils.randomUUIDString, processConfig: ProcessConfig = ProcessConfig.default @@ -236,78 +238,7 @@ object KCLConsumer { kinesisClient, dynamoClient, cloudWatchClient, - streamName, - appName, - workerId, - processConfig - )(cb)(tfn) - .map(new KCLConsumer[F](_)) - - /** Constructor for the [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] that - * leverages the - * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java ConfigsBuilder]] - * from the KCL. This is a simpler entry-point for creating the - * configuration, and provides a transform function to add any custom - * configuration that was not covered by the default. This constructor - * specifically leverages the - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] to - * allow for consumption from multiple streams. - * - * @param kinesisClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] - * @param dynamoClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] - * @param cloudWatchClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Name of the application. Usually also the dynamo table name for - * checkpoints - * @param workerId - * Unique identifier for a single instance of this consumer. Default is a - * random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param cb - * Function to process - * [[kinesis4cats.kcl.CommittableRecord CommittableRecords]] received from - * Kinesis - * @param tfn - * Function to update the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]]. Useful for - * overriding defaults. - * @param F - * [[cats.effect.Async Async]] instance - * @param encoders - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * for encoding structured logs - * @return - * [[cats.effect.Resource Resource]] containing the - * [[kinesis4cats.kcl.KCLConsumer KCLConsumer]] - * @return - */ - def configsBuilderMultiStream[F[_]]( - kinesisClient: KinesisAsyncClient, - dynamoClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - tracker: MultiStreamTracker, - appName: String, - workerId: String = Utils.randomUUIDString, - processConfig: ProcessConfig = ProcessConfig.default - )( - cb: List[CommittableRecord[F]] => F[Unit] - )( - tfn: Config[F] => Config[F] = (x: Config[F]) => x - )(implicit - F: Async[F], - encoders: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumer[F]] = Config - .configsBuilderMultiStream( - kinesisClient, - dynamoClient, - cloudWatchClient, - tracker, + streamTracker, appName, workerId, processConfig @@ -460,8 +391,10 @@ object KCLConsumer { * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] * @param cloudWatchClient * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param streamName - * Name of the Kinesis stream to consume from + * @param streamTracker + * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java StreamTracker]] + * to use, which defines the name of the stream(s) and the initial + * position within them * @param appName * Name of the application. Usually also the dynamo table name for * checkpoints @@ -492,7 +425,7 @@ object KCLConsumer { kinesisClient: KinesisAsyncClient, dynamoClient: DynamoDbAsyncClient, cloudWatchClient: CloudWatchAsyncClient, - streamName: String, + streamTracker: StreamTracker, appName: String, workerId: String = Utils.randomUUIDString, processConfig: ProcessConfig = ProcessConfig.default @@ -511,99 +444,7 @@ object KCLConsumer { processConfig.raiseOnError )(cb) confBuilder = new ConfigsBuilder( - streamName, - appName, - kinesisClient, - dynamoClient, - cloudWatchClient, - workerId, - processorFactory - ) - } yield tfn( - Config( - confBuilder.checkpointConfig(), - confBuilder.coordinatorConfig(), - confBuilder.leaseManagementConfig(), - confBuilder.lifecycleConfig(), - confBuilder.metricsConfig(), - confBuilder.processorConfig(), - confBuilder.retrievalConfig(), - deferredException, - processConfig.raiseOnError - ) - ) - - /** Constructor for the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] that - * leverages the - * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java ConfigsBuilder]] - * from the KCL. This is a simpler entry-point for creating the - * configuration, and provides a transform function to add any custom - * configuration that was not covered by the default. This constructor - * specifically leverages the - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * to allow for consumption from multiple streams. - * - * @param kinesisClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] - * @param dynamoClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] - * @param cloudWatchClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * containing the streams and positions for consumption - * @param appName - * Name of the application. Usually also the dynamo table name for - * checkpoints - * @param workerId - * Unique identifier for a single instance of this consumer. Default is a - * random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param cb - * Function to process - * [[kinesis4cats.kcl.CommittableRecord CommittableRecords]] received - * from Kinesis - * @param tfn - * Function to update the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]]. Useful for - * overriding defaults. - * @param F - * [[cats.effect.Async Async]] instance - * @param encoders - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * for encoding structured logs - * @return - * [[cats.effect.Resource Resource]] containing the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]] - * @return - */ - def configsBuilderMultiStream[F[_]]( - kinesisClient: KinesisAsyncClient, - dynamoClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - tracker: MultiStreamTracker, - appName: String, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = - KCLConsumer.ProcessConfig.default - )( - cb: List[CommittableRecord[F]] => F[Unit] - )( - tfn: Config[F] => Config[F] = (x: Config[F]) => x - )(implicit - F: Async[F], - encoders: RecordProcessor.LogEncoders - ): Resource[F, Config[F]] = for { - deferredException <- Resource.eval(Deferred[F, Throwable]) - processorFactory <- RecordProcessor.Factory[F]( - processConfig.recordProcessorConfig, - deferredException, - processConfig.raiseOnError - )(cb) - confBuilder = new ConfigsBuilder( - tracker, + streamTracker, appName, kinesisClient, dynamoClient, diff --git a/kcl/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala b/kcl/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala index 7020dcb5..1c4a0c09 100644 --- a/kcl/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala +++ b/kcl/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala @@ -35,13 +35,13 @@ import software.amazon.kinesis.coordinator._ import software.amazon.kinesis.leases.LeaseManagementConfig import software.amazon.kinesis.lifecycle.LifecycleConfig import software.amazon.kinesis.metrics.MetricsConfig +import software.amazon.kinesis.processor.StreamTracker import software.amazon.kinesis.retrieval.RetrievalConfig import kinesis4cats.Utils import kinesis4cats.compat.retry.RetryPolicies._ import kinesis4cats.compat.retry._ import kinesis4cats.kcl.WorkerListeners._ -import kinesis4cats.kcl.multistream.MultiStreamTracker import kinesis4cats.kcl.{CommittableRecord, KCLConsumer, RecordProcessor} /** Wrapper offering for the @@ -271,8 +271,10 @@ object KCLConsumerFS2 { * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] * @param cloudWatchClient * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param streamName - * Name of the Kinesis stream to consume from + * @param streamTracker + * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java StreamTracker]] + * to use, which defines the name of the stream(s) and the initial position + * within them * @param appName * Name of the application. Usually also the dynamo table name for * checkpoints @@ -307,7 +309,7 @@ object KCLConsumerFS2 { kinesisClient: KinesisAsyncClient, dynamoClient: DynamoDbAsyncClient, cloudWatchClient: CloudWatchAsyncClient, - streamName: String, + streamTracker: StreamTracker, appName: String, fs2Config: FS2Config = FS2Config.default, workerId: String = Utils.randomUUIDString, @@ -326,86 +328,7 @@ object KCLConsumerFS2 { kinesisClient, dynamoClient, cloudWatchClient, - streamName, - appName, - fs2Config, - workerId, - processConfig - )(tfn) - .map(new KCLConsumerFS2[F](_)) - - /** Constructor for the [[kinesis4cats.kcl.fs2.KCLConsumerFS2 KCLConsumerFS2]] - * that leverages the - * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java ConfigsBuilder]] - * from the KCL. This is a simpler entry-point for creating the - * configuration, and provides a transform function to add any custom - * configuration that was not covered by the default. This constructor - * specifically leverages the - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] to - * allow for consumption from multiple streams. - * - * @param kinesisClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] - * @param dynamoClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] - * @param cloudWatchClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Name of the application. Usually also the dynamo table name for - * checkpoints - * @param queueSize - * Size of the underlying queue for the FS2 stream. If the queue fills up, - * backpressure on the processors will occur. Default 100 - * @param commitMaxChunk - * Max records to be received in the commitRecords [[fs2.Pipe Pipe]] before - * a commit is run. Default is 1000 - * @param commitMaxWait - * Max duration to wait in commitRecords [[fs2.Pipe Pipe]] before a commit - * is run. Default is 10 seconds - * @param workerId - * Unique identifier for a single instance of this consumer. Default is a - * random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param tfn - * Function to update the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]]. Useful for - * overriding defaults. - * @param F - * [[cats.effect.Async Async]] instance - * @param encoders - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * for encoding structured logs - * @return - * [[cats.effect.Resource Resource]] containing the - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] - */ - def configsBuilderMultiStream[F[_]]( - kinesisClient: KinesisAsyncClient, - dynamoClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - tracker: MultiStreamTracker, - appName: String, - fs2Config: FS2Config = FS2Config.default, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig - )( - tfn: kinesis4cats.kcl.KCLConsumer.Config[ - F - ] => kinesis4cats.kcl.KCLConsumer.Config[F] = - (x: kinesis4cats.kcl.KCLConsumer.Config[F]) => x - )(implicit - F: Async[F], - P: Parallel[F], - encoders: RecordProcessor.LogEncoders - ): Resource[F, KCLConsumerFS2[F]] = Config - .configsBuilderMultiStream( - kinesisClient, - dynamoClient, - cloudWatchClient, - tracker, + streamTracker, appName, fs2Config, workerId, @@ -521,8 +444,10 @@ object KCLConsumerFS2 { * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] * @param cloudWatchClient * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param streamName - * Name of the Kinesis stream to consume from + * @param streamTracker + * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/StreamTracker.java StreamTracker]] + * to use, which defines the name of the stream(s) and the initial + * position within them * @param appName * Name of the application. Usually also the dynamo table name for * checkpoints @@ -548,7 +473,7 @@ object KCLConsumerFS2 { kinesisClient: KinesisAsyncClient, dynamoClient: DynamoDbAsyncClient, cloudWatchClient: CloudWatchAsyncClient, - streamName: String, + streamTracker: StreamTracker, appName: String, fs2Config: KCLConsumerFS2.FS2Config = KCLConsumerFS2.FS2Config.default, workerId: String = Utils.randomUUIDString, @@ -570,92 +495,7 @@ object KCLConsumerFS2 { kinesisClient, dynamoClient, cloudWatchClient, - streamName, - appName, - workerId, - processConfig - )(callback(queue))(tfn) - } yield Config(underlying, queue, fs2Config) - - /** Constructor for the - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] - * that leverages the - * [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java ConfigsBuilder]] - * from the KCL. This is a simpler entry-point for creating the - * configuration, and provides a transform function to add any custom - * configuration that was not covered by the default. This constructor - * specifically leverages the - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * to allow for consumption from multiple streams. - * - * @param kinesisClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisAsyncClient.html KinesisAsyncClient]] - * @param dynamoClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/DynamoDbAsyncClient.html DynamoDbAsyncClient]] - * @param cloudWatchClient - * [[https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/CloudWatchClient.html CloudWatchClient]] - * @param tracker - * [[kinesis4cats.kcl.multistream.MultiStreamTracker MultiStreamTracker]] - * @param appName - * Name of the application. Usually also the dynamo table name for - * checkpoints - * @param queueSize - * Size of the underlying queue for the FS2 stream. If the queue fills - * up, backpressure on the processors will occur. Default 100 - * @param commitMaxChunk - * Max records to be received in the commitRecords [[fs2.Pipe Pipe]] - * before a commit is run. Default is 1000 - * @param commitMaxWait - * Max duration to wait in commitRecords [[fs2.Pipe Pipe]] before a - * commit is run. Default is 10 seconds - * @param commitMaxRetries - * Max number of retries for a commit operation - * @param commitRetryInterval - * @param workerId - * Unique identifier for a single instance of this consumer. Default is a - * random UUID. - * @param processConfig - * [[kinesis4cats.kcl.KCLConsumer.ProcessConfig KCLConsumer.ProcessConfig]] - * @param tfn - * Function to update the - * [[kinesis4cats.kcl.KCLConsumer.Config KCLConsumer.Config]]. Useful for - * overriding defaults. - * @param F - * [[cats.effect.Async Async]] instance - * @param encoders - * [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]] - * for encoding structured logs - * @return - * [[cats.effect.Resource Resource]] containing the - * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] - */ - def configsBuilderMultiStream[F[_]]( - kinesisClient: KinesisAsyncClient, - dynamoClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - tracker: MultiStreamTracker, - appName: String, - fs2Config: FS2Config, - workerId: String = Utils.randomUUIDString, - processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig - )( - tfn: kinesis4cats.kcl.KCLConsumer.Config[ - F - ] => kinesis4cats.kcl.KCLConsumer.Config[F] = - (x: kinesis4cats.kcl.KCLConsumer.Config[F]) => x - )(implicit - F: Async[F], - encoders: RecordProcessor.LogEncoders - ): Resource[F, Config[F]] = for { - queue <- Queue - .bounded[F, CommittableRecord[F]](fs2Config.queueSize) - .toResource - underlying <- kinesis4cats.kcl.KCLConsumer.Config - .configsBuilderMultiStream( - kinesisClient, - dynamoClient, - cloudWatchClient, - tracker, + streamTracker, appName, workerId, processConfig diff --git a/project/LibraryDependencies.scala b/project/LibraryDependencies.scala index bbed7126..09aaf436 100644 --- a/project/LibraryDependencies.scala +++ b/project/LibraryDependencies.scala @@ -51,7 +51,7 @@ object LibraryDependencies { object V2 { val awssdkVersion = - "2.19.2" // Should be the same as the version in the KCL + "2.20.8" // Should be the same as the version in the KCL val kinesis = "software.amazon.awssdk" % "kinesis" % awssdkVersion val dynamo = "software.amazon.awssdk" % "dynamodb" % awssdkVersion val cloudwatch = "software.amazon.awssdk" % "cloudwatch" % awssdkVersion @@ -65,7 +65,7 @@ object LibraryDependencies { } val kpl = "com.amazonaws" % "amazon-kinesis-producer" % "0.15.5" - val kcl = "software.amazon.kinesis" % "amazon-kinesis-client" % "2.4.5" + val kcl = "software.amazon.kinesis" % "amazon-kinesis-client" % "2.4.8" } object Cats {