Skip to content

Commit

Permalink
Update amazon-kinesis-client to 2.4.8 (#88)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Meisel <[email protected]>
  • Loading branch information
scala-steward and etspaceman authored Mar 24, 2023
1 parent 0e598b4 commit 1980ad2
Show file tree
Hide file tree
Showing 23 changed files with 214 additions and 1,077 deletions.
8 changes: 5 additions & 3 deletions docs/kcl/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.syntax.all._
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl._
import kinesis4cats.kcl.logging.instances.show._
Expand All @@ -36,7 +37,7 @@ object MyApp extends ResourceApp.Forever {
kinesisClient,
dynamoClient,
cloudWatchClient,
"my-stream",
new SingleStreamTracker("my-stream"),
"my-app-name"
)((records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
Expand Down Expand Up @@ -88,7 +89,7 @@ object MyApp extends ResourceApp.Forever {
kinesisClient,
Map(streamArn1 -> position, streamArn2 -> position)
).toResource
consumer <- KCLConsumer.configsBuilderMultiStream[IO](
consumer <- KCLConsumer.configsBuilder[IO](
kinesisClient.client,
dynamoClient,
cloudWatchClient,
Expand All @@ -113,6 +114,7 @@ import cats.effect._
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl.fs2.KCLConsumerFS2
import kinesis4cats.kcl.logging.instances.show._
Expand All @@ -133,7 +135,7 @@ object MyApp extends ResourceApp.Forever {
kinesisClient,
dynamoClient,
cloudWatchClient,
"my-stream",
new SingleStreamTracker("my-stream"),
"my-app-name"
)()
_ <- consumer
Expand Down
3 changes: 2 additions & 1 deletion docs/kcl/http4s.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.comcast.ip4s._
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl._
import kinesis4cats.kcl.http4s.KCLService
Expand All @@ -41,7 +42,7 @@ object MyApp extends ResourceApp.Forever {
kinesisClient,
dynamoClient,
cloudWatchClient,
"my-stream",
new SingleStreamTracker("my-stream"),
"my-app-name"
)((records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
Expand Down
17 changes: 14 additions & 3 deletions docs/kcl/localstack.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl-localstack" %
```scala mdoc:compile-only
import cats.effect.IO
import cats.syntax.all._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl._
import kinesis4cats.kcl.logging.instances.show._
Expand All @@ -24,22 +25,32 @@ val processRecords = (records: List[CommittableRecord[IO]]) =>

// Runs a KCLConsumer as a Resource. Resource contains a Deferred value,
//which completes when the consumer has begun to process records.
LocalstackKCLConsumer.kclConsumer[IO]("my-stream", "my-app-name")(processRecords)
LocalstackKCLConsumer.kclConsumer[IO](
new SingleStreamTracker("my-stream"),
"my-app-name"
)(processRecords)

// Runs a KCLConsumer as a Resource. Resource contains 2 things:
// - A Deferred value, which completes when the consumer has begun to process records.
// - A results Queue, which contains records received by the consumer
LocalstackKCLConsumer.kclConsumerWithResults[IO]("my-stream", "my-app-name")(processRecords)
LocalstackKCLConsumer.kclConsumerWithResults[IO](
new SingleStreamTracker("my-stream"),
"my-app-name"
)(processRecords)
```

## Usage - FS2

```scala mdoc:compile-only
import cats.effect.IO
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl.logging.instances.show._
import kinesis4cats.kcl.fs2.localstack.LocalstackKCLConsumerFS2

// Runs a KCLConsumerFS2 as a Resource, which contains FS2 Streaming methods.
LocalstackKCLConsumerFS2.kclConsumer[IO]("my-stream", "my-app-name")
LocalstackKCLConsumerFS2.kclConsumer[IO](
new SingleStreamTracker("my-stream"),
"my-app-name"
)
```
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package http4s

import cats.effect.{IO, Resource, ResourceApp}
import com.comcast.ip4s._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.ciris.CirisReader
Expand All @@ -30,7 +31,7 @@ object TestKCLService extends ResourceApp.Forever {
override def run(args: List[String]): Resource[IO, Unit] = for {
streamName <- CirisReader.read[String](List("test", "stream")).resource[IO]
configAndResults <- LocalstackKCLConsumer.kclConfigWithResults[IO](
streamName,
new SingleStreamTracker(streamName),
s"test-kcl-service-spec-${Utils.randomUUIDString}"
)((_: List[CommittableRecord[IO]]) => IO.unit)
consumer = new KCLConsumer[IO](configAndResults.kclConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import cats.effect.syntax.all._
import cats.syntax.all._
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.client.KinesisClient
Expand Down Expand Up @@ -77,7 +79,12 @@ class KinesisProducerNoShardMapSpec
for {
_ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import cats.effect._
import cats.effect.syntax.all._
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.client.localstack.LocalstackKinesisClient
Expand Down Expand Up @@ -59,7 +61,12 @@ class KinesisProducerSpec
for {
_ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import cats.effect._
import cats.effect.syntax.all._
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.client.localstack.LocalstackKinesisClient
Expand Down Expand Up @@ -59,7 +61,12 @@ class KinesisFS2ProducerSpec
for {
_ <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import io.circe.syntax._
import org.scalacheck.Arbitrary
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.client.KinesisClient
Expand Down Expand Up @@ -92,7 +94,12 @@ object KCLConsumerSpec {
): Resource[IO, Resources[IO]] = for {
client <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
} yield Resources(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import io.circe.syntax._
import org.scalacheck.Arbitrary
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.Utils
import kinesis4cats.client.KinesisClient
Expand Down Expand Up @@ -89,7 +91,12 @@ object KCLConsumerFS2Spec {
): Resource[IO, Resources[IO]] = for {
client <- LocalstackKinesisClient.streamResource[IO](streamName, shardCount)
consumer <- LocalstackKCLConsumerFS2.kclConsumer[IO](
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)
streamAndDeferred <- consumer.streamWithDeferredListener()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ object KCLConsumerFS2MultiSpec {
Map(streamArn1 -> position, streamArn2 -> position)
)
.toResource
consumer <- LocalstackKCLConsumerFS2.kclMultiConsumer[IO](
consumer <- LocalstackKCLConsumerFS2.kclConsumer[IO](
tracker,
appName
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ object KCLConsumerMultiSpec {
Map(streamArn1 -> position, streamArn2 -> position)
)
.toResource
deferredWithResults <- LocalstackKCLConsumer.kclMultiConsumerWithResults(
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
tracker,
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.amazonaws.kinesis.PutRecordsOutput
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws.kernel.AwsRegion
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.SSL
import kinesis4cats.Utils
Expand Down Expand Up @@ -103,7 +105,12 @@ class KinesisProducerNoShardMapSpec
loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly)
)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import com.amazonaws.kinesis.PutRecordsOutput
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws.kernel.AwsRegion
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.SSL
import kinesis4cats.Utils
Expand Down Expand Up @@ -87,7 +89,12 @@ class KinesisProducerSpec
loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly)
)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import com.amazonaws.kinesis.PutRecordsOutput
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws.kernel.AwsRegion
import software.amazon.kinesis.common._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.SSL
import kinesis4cats.Utils
Expand Down Expand Up @@ -87,7 +89,12 @@ class FS2KinesisProducerSpec
loggerF = (f: Async[IO]) => Slf4jLogger.create[IO](f, implicitly)
)
deferredWithResults <- LocalstackKCLConsumer.kclConsumerWithResults(
streamName,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.TRIM_HORIZON
)
),
appName
)((_: List[CommittableRecord[IO]]) => IO.unit)
_ <- deferredWithResults.deferred.get.toResource
Expand Down
11 changes: 9 additions & 2 deletions kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import software.amazon.kinesis.leases._
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback
import software.amazon.kinesis.lifecycle._
import software.amazon.kinesis.metrics._
import software.amazon.kinesis.processor.SingleStreamTracker
import software.amazon.kinesis.retrieval.fanout.FanOutConfig
import software.amazon.kinesis.retrieval.polling.PollingConfig
import software.amazon.kinesis.retrieval.{AggregatorUtil, RetrievalConfig}
Expand Down Expand Up @@ -1255,10 +1256,16 @@ object KCLCiris {
List("kcl", "retrieval", "max", "list", "shards", "retry", "attempts"),
prefix
)
} yield new RetrievalConfig(kinesisClient, streamName, appName)
} yield new RetrievalConfig(
kinesisClient,
new SingleStreamTracker(
StreamIdentifier.singleStreamInstance(streamName),
position
),
appName
)
.retrievalSpecificConfig(retrievalConfig)
.retrievalFactory(retrievalConfig.retrievalFactory())
.initialPositionInStreamExtended(position)
.maybeTransform(listShardsBackoffTime)(
_.listShardsBackoffTimeInMillis(_)
)
Expand Down
Loading

0 comments on commit 1980ad2

Please sign in to comment.