Skip to content

Commit

Permalink
feat: prometheus metrics (#62)
Browse files Browse the repository at this point in the history
* feat: http4s-metrics module

* feat: standard HTTP and JVM metrics

* feat: search-api to expose the /metrics API

* feat: search-provision to expose the /metrics API

* feat: metrics for monitoring Redis queues
  • Loading branch information
jachro authored Mar 19, 2024
1 parent 7f0acfa commit c0a94b2
Show file tree
Hide file tree
Showing 31 changed files with 758 additions and 55 deletions.
38 changes: 35 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,36 @@ lazy val httpClient = project
http4sBorer % "compile->compile;test->test"
)

lazy val http4sMetrics = project
.in(file("modules/http4s-metrics"))
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin, RevolverPlugin)
.withId("http4s-metrics")
.settings(commonSettings)
.settings(
name := "http4s-metrics",
description := "Prometheus metrics for http4s",
libraryDependencies ++=
Dependencies.http4sCore ++
Dependencies.http4sPrometheusMetrics
)

lazy val http4sCommons = project
.in(file("modules/http4s-commons"))
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin, RevolverPlugin)
.withId("http4s-commons")
.settings(commonSettings)
.settings(
name := "http4s-commons",
description := "Commons for http services",
libraryDependencies ++=
Dependencies.http4sCore ++
Dependencies.http4sDsl ++
Dependencies.http4sServer ++
Dependencies.tapirHttp4sServer
)

lazy val redisClient = project
.in(file("modules/redis-client"))
.withId("redis-client")
Expand Down Expand Up @@ -271,6 +301,7 @@ lazy val configValues = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
http4sCommons % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test"
)
Expand Down Expand Up @@ -312,6 +343,8 @@ lazy val searchProvision = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
http4sCommons % "compile->compile;test->test",
http4sMetrics % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test",
configValues % "compile->compile;test->test"
Expand All @@ -326,14 +359,13 @@ lazy val searchApi = project
name := "search-api",
libraryDependencies ++=
Dependencies.ciris ++
Dependencies.http4sDsl ++
Dependencies.http4sServer ++
Dependencies.tapirHttp4sServer ++
Dependencies.tapirOpenAPI
)
.dependsOn(
commons % "compile->compile;test->test",
http4sBorer % "compile->compile;test->test",
http4sCommons % "compile->compile;test->test",
http4sMetrics % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test",
configValues % "compile->compile;test->test",
searchQueryDocs % "compile->compile;test->test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.{ConfigDecoder, ConfigError}
import com.comcast.ip4s.{Ipv4Address, Port}
import io.renku.redis.client.*
import org.http4s.Uri

Expand Down Expand Up @@ -50,3 +51,12 @@ trait ConfigDecoders:

given ConfigDecoder[String, QueueName] =
ConfigDecoder[String].map(s => QueueName(s))
given ConfigDecoder[String, ClientId] =
ConfigDecoder[String].map(s => ClientId(s))

given ConfigDecoder[String, Ipv4Address] =
ConfigDecoder[String]
.mapOption(Ipv4Address.getClass.getSimpleName)(Ipv4Address.fromString)
given ConfigDecoder[String, Port] =
ConfigDecoder[String]
.mapOption(Port.getClass.getSimpleName)(Port.fromString)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.*
import com.comcast.ip4s.{Ipv4Address, Port}
import io.renku.redis.client.*
import io.renku.search.http.HttpServerConfig
import io.renku.solr.client.{SolrConfig, SolrUser}
import org.http4s.Uri

Expand Down Expand Up @@ -53,6 +55,12 @@ object ConfigValues extends ConfigDecoders:
val retryOnErrorDelay: ConfigValue[Effect, FiniteDuration] =
renv("RETRY_ON_ERROR_DELAY").default("10 seconds").as[FiniteDuration]

val metricsUpdateInterval: ConfigValue[Effect, FiniteDuration] =
renv("METRICS_UPDATE_INTERVAL").default("15 seconds").as[FiniteDuration]

def clientId(default: ClientId): ConfigValue[Effect, ClientId] =
renv("CLIENT_ID").default(default.value).as[ClientId]

val solrConfig: ConfigValue[Effect, SolrConfig] = {
val url = renv("SOLR_URL").default("http://localhost:8983/solr").as[Uri]
val core = renv("SOLR_CORE").default("search-core-test")
Expand All @@ -69,3 +77,8 @@ object ConfigValues extends ConfigDecoders:
renv("SOLR_LOG_MESSAGE_BODIES").default("false").as[Boolean]
(url, core, maybeUser, defaultCommit, logMessageBodies).mapN(SolrConfig.apply)
}

def httpServerConfig(defaultPort: Port): ConfigValue[Effect, HttpServerConfig] =
val bindAddress = renv("HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address]
val port = renv("HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port]
(bindAddress, port).mapN(HttpServerConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
* limitations under the License.
*/

package io.renku.search.api
package io.renku.search.http

import cats.effect.{Async, Resource}
import com.comcast.ip4s.{Port, ipv4, port}
import fs2.io.net.Network
import org.http4s.HttpApp
import org.http4s.HttpRoutes
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.Server

object HttpServer:

val port: Port = port"8080"

def build[F[_]: Async: Network](app: HttpApp[F]): Resource[F, Server] =
def build[F[_]: Async: Network](
routes: HttpRoutes[F],
config: HttpServerConfig
): Resource[F, Server] =
EmberServerBuilder
.default[F]
.withHost(ipv4"0.0.0.0")
.withPort(port)
.withHttpApp(app)
.withHost(config.bindAddress)
.withPort(config.port)
.withHttpApp(routes.orNotFound)
.build
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.http

import com.comcast.ip4s.{Ipv4Address, Port}

final case class HttpServerConfig(bindAddress: Ipv4Address, port: Port)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package io.renku.search.api.routes
package io.renku.search.http.routes

import cats.effect.Async
import cats.syntax.all.*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.http.metrics

import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import io.renku.search.metrics.CollectorRegistryBuilder
import org.http4s.HttpRoutes
import org.http4s.metrics.prometheus.{Prometheus, PrometheusExportService}
import org.http4s.server.middleware.Metrics

final class MetricsRoutes[F[_]: Sync](registryBuilder: CollectorRegistryBuilder[F]):

def makeRoutes(measuredRoutes: HttpRoutes[F]): Resource[F, HttpRoutes[F]] =
for
registry <- registryBuilder.makeRegistry
metrics <- Prometheus.metricsOps[F](registry, prefix = "server")
yield PrometheusExportService(registry).routes <+> Metrics[F](metrics)(measuredRoutes)

def makeRoutes: Resource[F, HttpRoutes[F]] =
registryBuilder.makeRegistry.map(PrometheusExportService.apply).map(_.routes)
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.metrics

import io.prometheus.client.Collector as JCollector

trait Collector:
def asJCollector: JCollector
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.metrics

import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import io.prometheus.client.{CollectorRegistry, Collector as JCollector}
import org.http4s.metrics.prometheus.PrometheusExportService

final case class CollectorRegistryBuilder[F[_]: Sync](
collectors: Set[Collector],
standardJVMMetrics: Boolean
):

def withJVMMetrics: CollectorRegistryBuilder[F] =
copy(standardJVMMetrics = true)

def add(c: Collector): CollectorRegistryBuilder[F] =
copy(collectors = collectors + c)

def makeRegistry: Resource[F, CollectorRegistry] =
val registry = new CollectorRegistry()
(registerJVM(registry) >> registerCollectors(registry))
.as(registry)

private def registerCollectors(registry: CollectorRegistry): Resource[F, Unit] =
collectors.toList.map(registerCollector(registry)).sequence.void

private def registerCollector(registry: CollectorRegistry)(collector: Collector) =
val F = Sync[F]
val acq = F.blocking(collector.asJCollector.register[JCollector](registry))
Resource.make(acq)(c => F.blocking(registry.unregister(c)))

private def registerJVM(registry: CollectorRegistry) =
if standardJVMMetrics then PrometheusExportService.addDefaults(registry)
else Resource.pure[F, Unit](())

object CollectorRegistryBuilder:
def apply[F[_]: Sync]: CollectorRegistryBuilder[F] =
new CollectorRegistryBuilder[F](Set.empty, false)
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.effect.FutureLift.*
import dev.profunktor.redis4cats.effect.{FutureLift, Log}
import dev.profunktor.redis4cats.effects.ScriptOutputType
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.lettuce.core.api.StatefulRedisConnection
import scodec.bits.ByteVector
import scribe.Scribe

Expand All @@ -51,6 +54,10 @@ trait RedisQueueClient[F[_]] {
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]

def getSize(queueName: QueueName): F[Long]

def getSize(queueName: QueueName, from: MessageId): F[Long]
}

object RedisQueueClient:
Expand Down Expand Up @@ -131,11 +138,47 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
private def formProcessedKey(clientId: ClientId, queueName: QueueName) =
s"$queueName.$clientId"

override def getSize(queueName: QueueName): F[Long] =
val xlen: StatefulRedisConnection[String, ByteVector] => F[Long] =
_.async().xlen(queueName.name).futureLift.map(_.longValue())

makeLettuceStreamingConnection(StringBytesCodec.instance)
.use(xlen)

override def getSize(queueName: QueueName, from: MessageId): F[Long] =
val script =
"""local xrange = redis.call('XRANGE', KEYS[1], ARGV[1], ARGV[2])
|return #xrange - 1""".stripMargin
createStringCommands.use(
_.eval(
script,
ScriptOutputType.Integer,
List(queueName.name),
List(from.value, "+")
)
)

private def makeStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)

private def makeLettuceStreamingConnection[K, V](
codec: RedisCodec[K, V]
): Resource[F, StatefulRedisConnection[K, V]] = {
val acquire =
FutureLift[F].lift(
client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying)
)

client.underlying.connect
val release: StatefulRedisConnection[K, V] => F[Unit] = c =>
FutureLift[F].lift(c.closeAsync()) *>
Log[F].info(s"Releasing Streaming connection: ${client.uri.underlying}")

Resource.make(acquire)(release)
}

private def createStringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)
}
Loading

0 comments on commit c0a94b2

Please sign in to comment.