Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - MetricsReporter for Producers #128

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package kinesis4cats.client.producer.metrics

import cats.effect.Async
import fs2.Chunk
import fs2.concurrent.Channel
import org.typelevel.log4cats.StructuredLogger
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.cloudwatch.model.Dimension
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse

import kinesis4cats.producer.metrics

class CloudwatchMetricsReporter[F[_]](
client: CloudWatchAsyncClient,
namespace: String,
encoders: metrics.MetricsReporter.LogEncoders,
override val logger: StructuredLogger[F],
override val config: metrics.MetricsReporter.Config[F],
override protected val channel: Channel[F, metrics.Metric]
)(
override protected val callback: PutMetricDataResponse => F[Unit]
)(implicit F: Async[F])
extends metrics.MetricsReporter[F, PutMetricDataResponse](encoders) {

override def _put(x: Chunk[metrics.Metric]): F[PutMetricDataResponse] = F
.fromCompletableFuture(
F.delay(
client.putMetricData(
PutMetricDataRequest
.builder()
.namespace(namespace)
.metricData(x.toList.map(CloudwatchMetricsReporter.asAws): _*)
.build()
)
)
)

}

object CloudwatchMetricsReporter {
private def asAwsDimension(dimension: metrics.Dimension): Dimension =
Dimension
.builder()
.name(dimension.name)
.value(dimension.value)
.build()

def asAws(metric: metrics.Metric): MetricDatum = MetricDatum
.builder()
.metricName(metric.name)
.timestamp(metric.timestamp)
.dimensions(metric.dimensions.map(asAwsDimension): _*)
.unit(metric.unit.value)
.value(metric.value)
.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ object FS2Producer {
* Size of underlying buffer of records
* @param putMaxChunk
* Max records to buffer before running a put request
* @param putMaxWait
* Max time to wait before running a put request
* @param putMaxRetries
* Number of retries for the underlying put request. None means infinite
* retries.
* @param putRetryInterval
* Delay between retries
* @param producerConfig
Expand All @@ -156,8 +151,6 @@ object FS2Producer {
queueSize: Int,
putMaxChunk: Int,
putMaxWait: FiniteDuration,
putMaxRetries: Option[Int],
putRetryInterval: FiniteDuration,
producerConfig: Producer.Config[F],
gracefulShutdownWait: FiniteDuration
)
Expand All @@ -169,8 +162,6 @@ object FS2Producer {
1000,
500,
100.millis,
Some(5),
0.seconds,
Producer.Config.default[F](streamNameOrArn),
30.seconds
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kinesis4cats.producer.metrics

import kinesis4cats.models.ShardId
import kinesis4cats.models.StreamArn

sealed abstract class Dimension(val name: String, val value: String)

object Dimension {
final case class Stream(arn: StreamArn)
extends Dimension("StreamARN", arn.streamArn)

final case class Shard(id: ShardId) extends Dimension("ShardID", id.shardId)

final case class ErrorCode(override val value: String)
extends Dimension("ErrorCode", value)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kinesis4cats.producer.metrics

sealed abstract class Granularity(val value: String)

object Granularity {
case object Global extends Granularity("GLOBAL")
case object Stream extends Granularity("STREAM")
case object Shard extends Granularity("SHARD")
}
21 changes: 21 additions & 0 deletions shared/src/main/scala/kinesis4cats/producer/metrics/Level.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kinesis4cats.producer.metrics

sealed abstract class Level(val value: String) {
def isDetailed: Boolean
def isSummary: Boolean
}

object Level {
case object None extends Level("NONE") {
override def isDetailed: Boolean = false
override def isSummary: Boolean = false
}
case object Summary extends Level("SUMMARY") {
override def isDetailed: Boolean = false
override def isSummary: Boolean = true
}
case object Detailed extends Level("DETAILED") {
override def isDetailed: Boolean = true
override def isSummary: Boolean = true
}
}
Loading