diff --git a/build.sbt b/build.sbt index 7c76fa57..def21169 100644 --- a/build.sbt +++ b/build.sbt @@ -145,6 +145,36 @@ lazy val httpClient = project http4sBorer % "compile->compile;test->test" ) +lazy val http4sMetrics = project + .in(file("modules/http4s-metrics")) + .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin, RevolverPlugin) + .withId("http4s-metrics") + .settings(commonSettings) + .settings( + name := "http4s-metrics", + description := "Prometheus metrics for http4s", + libraryDependencies ++= + Dependencies.http4sCore ++ + Dependencies.http4sPrometheusMetrics + ) + +lazy val http4sCommons = project + .in(file("modules/http4s-commons")) + .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin, RevolverPlugin) + .withId("http4s-commons") + .settings(commonSettings) + .settings( + name := "http4s-commons", + description := "Commons for http services", + libraryDependencies ++= + Dependencies.http4sCore ++ + Dependencies.http4sDsl ++ + Dependencies.http4sServer ++ + Dependencies.tapirHttp4sServer + ) + lazy val redisClient = project .in(file("modules/redis-client")) .withId("redis-client") @@ -271,6 +301,7 @@ lazy val configValues = project .dependsOn( commons % "compile->compile;test->test", events % "compile->compile;test->test", + http4sCommons % "compile->compile;test->test", renkuRedisClient % "compile->compile;test->test", searchSolrClient % "compile->compile;test->test" ) @@ -312,6 +343,8 @@ lazy val searchProvision = project .dependsOn( commons % "compile->compile;test->test", events % "compile->compile;test->test", + http4sCommons % "compile->compile;test->test", + http4sMetrics % "compile->compile;test->test", renkuRedisClient % "compile->compile;test->test", searchSolrClient % "compile->compile;test->test", configValues % "compile->compile;test->test" @@ -326,14 +359,13 @@ lazy val searchApi = project name := "search-api", libraryDependencies ++= Dependencies.ciris ++ - Dependencies.http4sDsl ++ - Dependencies.http4sServer ++ - Dependencies.tapirHttp4sServer ++ Dependencies.tapirOpenAPI ) .dependsOn( commons % "compile->compile;test->test", http4sBorer % "compile->compile;test->test", + http4sCommons % "compile->compile;test->test", + http4sMetrics % "compile->compile;test->test", searchSolrClient % "compile->compile;test->test", configValues % "compile->compile;test->test", searchQueryDocs % "compile->compile;test->test", diff --git a/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala b/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala index d84b406b..89c63f6d 100644 --- a/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala +++ b/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala @@ -20,6 +20,7 @@ package io.renku.search.config import cats.syntax.all.* import ciris.{ConfigDecoder, ConfigError} +import com.comcast.ip4s.{Ipv4Address, Port} import io.renku.redis.client.* import org.http4s.Uri @@ -50,3 +51,12 @@ trait ConfigDecoders: given ConfigDecoder[String, QueueName] = ConfigDecoder[String].map(s => QueueName(s)) + given ConfigDecoder[String, ClientId] = + ConfigDecoder[String].map(s => ClientId(s)) + + given ConfigDecoder[String, Ipv4Address] = + ConfigDecoder[String] + .mapOption(Ipv4Address.getClass.getSimpleName)(Ipv4Address.fromString) + given ConfigDecoder[String, Port] = + ConfigDecoder[String] + .mapOption(Port.getClass.getSimpleName)(Port.fromString) diff --git a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala index 59ebd843..3d64280b 100644 --- a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala +++ b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala @@ -20,7 +20,9 @@ package io.renku.search.config import cats.syntax.all.* import ciris.* +import com.comcast.ip4s.{Ipv4Address, Port} import io.renku.redis.client.* +import io.renku.search.http.HttpServerConfig import io.renku.solr.client.{SolrConfig, SolrUser} import org.http4s.Uri @@ -53,6 +55,12 @@ object ConfigValues extends ConfigDecoders: val retryOnErrorDelay: ConfigValue[Effect, FiniteDuration] = renv("RETRY_ON_ERROR_DELAY").default("10 seconds").as[FiniteDuration] + val metricsUpdateInterval: ConfigValue[Effect, FiniteDuration] = + renv("METRICS_UPDATE_INTERVAL").default("15 seconds").as[FiniteDuration] + + def clientId(default: ClientId): ConfigValue[Effect, ClientId] = + renv("CLIENT_ID").default(default.value).as[ClientId] + val solrConfig: ConfigValue[Effect, SolrConfig] = { val url = renv("SOLR_URL").default("http://localhost:8983/solr").as[Uri] val core = renv("SOLR_CORE").default("search-core-test") @@ -69,3 +77,8 @@ object ConfigValues extends ConfigDecoders: renv("SOLR_LOG_MESSAGE_BODIES").default("false").as[Boolean] (url, core, maybeUser, defaultCommit, logMessageBodies).mapN(SolrConfig.apply) } + + def httpServerConfig(defaultPort: Port): ConfigValue[Effect, HttpServerConfig] = + val bindAddress = renv("HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address] + val port = renv("HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port] + (bindAddress, port).mapN(HttpServerConfig.apply) diff --git a/modules/search-api/src/main/scala/io/renku/search/api/HttpServer.scala b/modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServer.scala similarity index 77% rename from modules/search-api/src/main/scala/io/renku/search/api/HttpServer.scala rename to modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServer.scala index 486ec551..19c26ed1 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/HttpServer.scala +++ b/modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServer.scala @@ -16,23 +16,23 @@ * limitations under the License. */ -package io.renku.search.api +package io.renku.search.http import cats.effect.{Async, Resource} -import com.comcast.ip4s.{Port, ipv4, port} import fs2.io.net.Network -import org.http4s.HttpApp +import org.http4s.HttpRoutes import org.http4s.ember.server.EmberServerBuilder import org.http4s.server.Server object HttpServer: - val port: Port = port"8080" - - def build[F[_]: Async: Network](app: HttpApp[F]): Resource[F, Server] = + def build[F[_]: Async: Network]( + routes: HttpRoutes[F], + config: HttpServerConfig + ): Resource[F, Server] = EmberServerBuilder .default[F] - .withHost(ipv4"0.0.0.0") - .withPort(port) - .withHttpApp(app) + .withHost(config.bindAddress) + .withPort(config.port) + .withHttpApp(routes.orNotFound) .build diff --git a/modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServerConfig.scala b/modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServerConfig.scala new file mode 100644 index 00000000..e7637624 --- /dev/null +++ b/modules/http4s-commons/src/main/scala/io/renku/search/http/HttpServerConfig.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http + +import com.comcast.ip4s.{Ipv4Address, Port} + +final case class HttpServerConfig(bindAddress: Ipv4Address, port: Port) diff --git a/modules/search-api/src/main/scala/io/renku/search/api/routes/OperationRoutes.scala b/modules/http4s-commons/src/main/scala/io/renku/search/http/routes/OperationRoutes.scala similarity index 97% rename from modules/search-api/src/main/scala/io/renku/search/api/routes/OperationRoutes.scala rename to modules/http4s-commons/src/main/scala/io/renku/search/http/routes/OperationRoutes.scala index e8338e9f..88b1a3e0 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/routes/OperationRoutes.scala +++ b/modules/http4s-commons/src/main/scala/io/renku/search/http/routes/OperationRoutes.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.search.api.routes +package io.renku.search.http.routes import cats.effect.Async import cats.syntax.all.* diff --git a/modules/http4s-metrics/src/main/scala/io/renku/search/http/metrics/MetricsRoutes.scala b/modules/http4s-metrics/src/main/scala/io/renku/search/http/metrics/MetricsRoutes.scala new file mode 100644 index 00000000..d55cd319 --- /dev/null +++ b/modules/http4s-metrics/src/main/scala/io/renku/search/http/metrics/MetricsRoutes.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http.metrics + +import cats.effect.{Resource, Sync} +import cats.syntax.all.* +import io.renku.search.metrics.CollectorRegistryBuilder +import org.http4s.HttpRoutes +import org.http4s.metrics.prometheus.{Prometheus, PrometheusExportService} +import org.http4s.server.middleware.Metrics + +final class MetricsRoutes[F[_]: Sync](registryBuilder: CollectorRegistryBuilder[F]): + + def makeRoutes(measuredRoutes: HttpRoutes[F]): Resource[F, HttpRoutes[F]] = + for + registry <- registryBuilder.makeRegistry + metrics <- Prometheus.metricsOps[F](registry, prefix = "server") + yield PrometheusExportService(registry).routes <+> Metrics[F](metrics)(measuredRoutes) + + def makeRoutes: Resource[F, HttpRoutes[F]] = + registryBuilder.makeRegistry.map(PrometheusExportService.apply).map(_.routes) diff --git a/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/Collector.scala b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/Collector.scala new file mode 100644 index 00000000..06418db0 --- /dev/null +++ b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/Collector.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.metrics + +import io.prometheus.client.Collector as JCollector + +trait Collector: + def asJCollector: JCollector diff --git a/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala new file mode 100644 index 00000000..a9afbf26 --- /dev/null +++ b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.metrics + +import cats.effect.{Resource, Sync} +import cats.syntax.all.* +import io.prometheus.client.{CollectorRegistry, Collector as JCollector} +import org.http4s.metrics.prometheus.PrometheusExportService + +final case class CollectorRegistryBuilder[F[_]: Sync]( + collectors: Set[Collector], + standardJVMMetrics: Boolean +): + + def withJVMMetrics: CollectorRegistryBuilder[F] = + copy(standardJVMMetrics = true) + + def add(c: Collector): CollectorRegistryBuilder[F] = + copy(collectors = collectors + c) + + def makeRegistry: Resource[F, CollectorRegistry] = + val registry = new CollectorRegistry() + (registerJVM(registry) >> registerCollectors(registry)) + .as(registry) + + private def registerCollectors(registry: CollectorRegistry): Resource[F, Unit] = + collectors.toList.map(registerCollector(registry)).sequence.void + + private def registerCollector(registry: CollectorRegistry)(collector: Collector) = + val F = Sync[F] + val acq = F.blocking(collector.asJCollector.register[JCollector](registry)) + Resource.make(acq)(c => F.blocking(registry.unregister(c))) + + private def registerJVM(registry: CollectorRegistry) = + if standardJVMMetrics then PrometheusExportService.addDefaults(registry) + else Resource.pure[F, Unit](()) + +object CollectorRegistryBuilder: + def apply[F[_]: Sync]: CollectorRegistryBuilder[F] = + new CollectorRegistryBuilder[F](Set.empty, false) diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 010bed79..7d8adeea 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -22,11 +22,14 @@ import cats.effect.{Async, Resource} import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.data.RedisCodec -import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.effect.FutureLift.* +import dev.profunktor.redis4cats.effect.{FutureLift, Log} +import dev.profunktor.redis4cats.effects.ScriptOutputType import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage} import dev.profunktor.redis4cats.streams.{RedisStream, Streaming} import dev.profunktor.redis4cats.{Redis, RedisCommands} import fs2.Stream +import io.lettuce.core.api.StatefulRedisConnection import scodec.bits.ByteVector import scribe.Scribe @@ -51,6 +54,10 @@ trait RedisQueueClient[F[_]] { ): F[Unit] def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] + + def getSize(queueName: QueueName): F[Long] + + def getSize(queueName: QueueName, from: MessageId): F[Long] } object RedisQueueClient: @@ -131,11 +138,47 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient) private def formProcessedKey(clientId: ClientId, queueName: QueueName) = s"$queueName.$clientId" + override def getSize(queueName: QueueName): F[Long] = + val xlen: StatefulRedisConnection[String, ByteVector] => F[Long] = + _.async().xlen(queueName.name).futureLift.map(_.longValue()) + + makeLettuceStreamingConnection(StringBytesCodec.instance) + .use(xlen) + + override def getSize(queueName: QueueName, from: MessageId): F[Long] = + val script = + """local xrange = redis.call('XRANGE', KEYS[1], ARGV[1], ARGV[2]) + |return #xrange - 1""".stripMargin + createStringCommands.use( + _.eval( + script, + ScriptOutputType.Integer, + List(queueName.name), + List(from.value, "+") + ) + ) + private def makeStreamingConnection : Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] = RedisStream .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) + private def makeLettuceStreamingConnection[K, V]( + codec: RedisCodec[K, V] + ): Resource[F, StatefulRedisConnection[K, V]] = { + val acquire = + FutureLift[F].lift( + client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying) + ) + + client.underlying.connect + val release: StatefulRedisConnection[K, V] => F[Unit] = c => + FutureLift[F].lift(c.closeAsync()) *> + Log[F].info(s"Releasing Streaming connection: ${client.uri.underlying}") + + Resource.make(acquire)(release) + } + private def createStringCommands: Resource[F, RedisCommands[F, String, String]] = Redis[F].fromClient(client, RedisCodec.Utf8) } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index a0aa56e1..15668c30 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -25,10 +25,13 @@ import dev.profunktor.redis4cats.streams.data.XAddMessage import dev.profunktor.redis4cats.streams.{RedisStream, Streaming} import fs2.* import fs2.concurrent.SignallingRef -import io.renku.search.GeneratorSyntax.* import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec +import io.renku.search.GeneratorSyntax.* import munit.CatsEffectSuite +import org.scalacheck.Gen +import org.scalacheck.Gen.alphaChar +import org.scalacheck.cats.implicits.* import scodec.bits.ByteVector class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: @@ -129,6 +132,36 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: yield () } + test("can find out the total size of the given stream"): + withRedisClient.asRedisQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + val messages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30) + for + _ <- messages.traverse_ { case (h, p) => + client.enqueue(queue, toByteVector(h), toByteVector(p)) + } + _ <- client.getSize(queue).map(s => assert(s == messages.size)) + yield () + } + + test("can find out a size of the given stream from the given MessageId"): + withRedisClient.asRedisQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + val olderMessages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30) + val (msgH, msgP) = (stringGen, stringGen).mapN(_ -> _).generateOne + val newerMessages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30) + for + _ <- olderMessages.traverse_ { case (h, p) => + client.enqueue(queue, toByteVector(h), toByteVector(p)) + } + messageId <- client.enqueue(queue, toByteVector(msgH), toByteVector(msgP)) + _ <- newerMessages.traverse_ { case (h, p) => + client.enqueue(queue, toByteVector(h), toByteVector(p)) + } + _ <- client.getSize(queue, messageId).map(s => assert(s == newerMessages.size)) + yield () + } + private def toByteVector(v: String): ByteVector = ByteVector.encodeUtf8(v).fold(throw _, identity) @@ -152,6 +185,11 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: .toList .map(_.head) + private lazy val stringGen: Gen[String] = + Gen + .chooseNum(3, 10) + .flatMap(Gen.stringOfN(_, alphaChar)) + private def makeStreamingConnection( client: RedisClient ): Stream[IO, Streaming[[A] =>> Stream[IO, A], String, ByteVector]] = diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 458e93c2..fcd0d631 100644 --- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -45,6 +45,10 @@ trait QueueClient[F[_]]: def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] + def getSize(queueName: QueueName): F[Long] + + def getSize(queueName: QueueName, from: MessageId): F[Long] + object QueueClient: def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] = RedisQueueClient.make[F](redisConfig).map(new QueueClientImpl[F](_)) diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala index 5c401d0f..bc477930 100644 --- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala @@ -82,3 +82,9 @@ private class QueueClientImpl[F[_]: Async](redisQueueClient: RedisQueueClient[F] queueName: QueueName ): F[Option[MessageId]] = redisQueueClient.findLastProcessed(clientId, queueName) + + override def getSize(queueName: QueueName): F[Long] = + redisQueueClient.getSize(queueName) + + override def getSize(queueName: QueueName, from: MessageId): F[Long] = + redisQueueClient.getSize(queueName, from) diff --git a/modules/search-api/src/main/scala/io/renku/search/api/Microservice.scala b/modules/search-api/src/main/scala/io/renku/search/api/Microservice.scala index b6b19c67..6f1f6b59 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/Microservice.scala +++ b/modules/search-api/src/main/scala/io/renku/search/api/Microservice.scala @@ -20,6 +20,7 @@ package io.renku.search.api import cats.effect.{ExitCode, IO, IOApp} import io.renku.logging.LoggingSetup +import io.renku.search.http.HttpServer object Microservice extends IOApp: @@ -29,7 +30,7 @@ object Microservice extends IOApp: for { config <- loadConfig _ <- IO(LoggingSetup.doConfigure(config.verbosity)) - _ <- HttpApplication[IO](config.solrConfig) - .flatMap(HttpServer.build) + _ <- Routes[IO](config.solrConfig) + .flatMap(HttpServer.build(_, config.httpServerConfig)) .use(_ => IO.never) } yield ExitCode.Success diff --git a/modules/search-api/src/main/scala/io/renku/search/api/Routes.scala b/modules/search-api/src/main/scala/io/renku/search/api/Routes.scala new file mode 100644 index 00000000..e4b47885 --- /dev/null +++ b/modules/search-api/src/main/scala/io/renku/search/api/Routes.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.api + +import cats.effect.{Async, Resource} +import cats.syntax.all.* +import fs2.io.net.Network +import io.renku.search.api.routes.* +import io.renku.search.http.metrics.MetricsRoutes +import io.renku.search.http.routes.OperationRoutes +import io.renku.search.metrics.CollectorRegistryBuilder +import io.renku.solr.client.SolrConfig +import org.http4s.HttpRoutes +import org.http4s.server.Router + +private object Routes: + + def apply[F[_]: Async: Network](solrConfig: SolrConfig): Resource[F, HttpRoutes[F]] = + for + search <- SearchApi[F](solrConfig).map(api => SearchRoutes[F](api)) + metricsRoutes <- MetricsRoutes[F](CollectorRegistryBuilder[F].withJVMMetrics) + .makeRoutes(search.routes) + yield new Routes[F](search, metricsRoutes).routes + +final private class Routes[F[_]: Async]( + searchRoutes: SearchRoutes[F], + metricsRoutes: HttpRoutes[F] +): + + private val prefix = "/search" + + private val openapi = + OpenApiRoute[F](s"/api$prefix", "Renku Search API", searchRoutes.endpoints) + + private lazy val searchAndOperationRoutes = + Router[F]( + prefix -> (openapi.routes <+> searchRoutes.routes), + "/" -> OperationRoutes[F] + ) + + lazy val routes: HttpRoutes[F] = + searchAndOperationRoutes <+> metricsRoutes diff --git a/modules/search-api/src/main/scala/io/renku/search/api/SearchApiConfig.scala b/modules/search-api/src/main/scala/io/renku/search/api/SearchApiConfig.scala index 0d607a12..8a9b2992 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/SearchApiConfig.scala +++ b/modules/search-api/src/main/scala/io/renku/search/api/SearchApiConfig.scala @@ -20,14 +20,21 @@ package io.renku.search.api import cats.syntax.all.* import ciris.{ConfigValue, Effect} +import com.comcast.ip4s.port import io.renku.search.config.ConfigValues +import io.renku.search.http.HttpServerConfig import io.renku.solr.client.SolrConfig final case class SearchApiConfig( solrConfig: SolrConfig, + httpServerConfig: HttpServerConfig, verbosity: Int ) object SearchApiConfig: val config: ConfigValue[Effect, SearchApiConfig] = - (ConfigValues.solrConfig, ConfigValues.logLevel).mapN(SearchApiConfig.apply) + ( + ConfigValues.solrConfig, + ConfigValues.httpServerConfig(port"8080"), + ConfigValues.logLevel + ).mapN(SearchApiConfig.apply) diff --git a/modules/search-api/src/main/scala/io/renku/search/api/routes/OpenApiRoute.scala b/modules/search-api/src/main/scala/io/renku/search/api/routes/OpenApiRoute.scala index 18111be2..d31f225b 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/routes/OpenApiRoute.scala +++ b/modules/search-api/src/main/scala/io/renku/search/api/routes/OpenApiRoute.scala @@ -21,15 +21,15 @@ package io.renku.search.api.routes import cats.effect.Async import cats.syntax.all.* import io.circe.syntax.given +import io.renku.search.BuildInfo +import org.http4s.HttpRoutes import sttp.apispec.openapi.Server import sttp.apispec.openapi.circe.given import sttp.tapir.* import sttp.tapir.docs.openapi.OpenAPIDocsInterpreter import sttp.tapir.server.ServerEndpoint -import sttp.tapir.server.http4s.Http4sServerInterpreter -import sttp.tapir.server.http4s.Http4sServerOptions +import sttp.tapir.server.http4s.{Http4sServerInterpreter, Http4sServerOptions} import sttp.tapir.server.interceptor.cors.CORSInterceptor -import io.renku.search.BuildInfo final class OpenApiRoute[F[_]: Async]( prefix: String, @@ -52,7 +52,7 @@ final class OpenApiRoute[F[_]: Async]( .description("OpenAPI docs") .serverLogic(_ => docs.asJson.spaces2.asRight.pure[F]) - def routes = + val routes: HttpRoutes[F] = Http4sServerInterpreter[F]( Http4sServerOptions.customiseInterceptors .corsInterceptor(CORSInterceptor.default) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index 79bb3d5c..403faa14 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -20,12 +20,15 @@ package io.renku.search.provision import cats.effect.* import cats.syntax.all.* - import io.renku.logging.LoggingSetup +import io.renku.search.http.HttpServer +import io.renku.search.metrics.CollectorRegistryBuilder +import io.renku.search.provision.metrics.{MetricsCollectorsUpdater, RedisMetrics} import io.renku.search.solr.schema.Migrations import io.renku.solr.client.migration.SchemaMigrator object Microservice extends IOApp: + private val logger = scribe.cats.io override def run(args: List[String]): IO[ExitCode] = @@ -33,14 +36,37 @@ object Microservice extends IOApp: for { _ <- IO(LoggingSetup.doConfigure(services.config.verbosity)) _ <- runSolrMigrations(services.config) + registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics + .add(RedisMetrics.queueSizeGauge) + .add(RedisMetrics.unprocessedGauge) + metrics = metricsUpdaterTask(services) + httpServer = httpServerTask(registryBuilder, services.config) + tasks = services.messageHandlers.getAll + metrics + httpServer pm <- BackgroundProcessManage[IO](services.config.retryOnErrorDelay) - tasks = services.messageHandlers.getAll.toList - _ <- tasks.traverse_(pm.register.tupled) + _ <- tasks.toList.traverse_(pm.register.tupled) _ <- pm.startAll } yield ExitCode.Success } - def runSolrMigrations(cfg: SearchProvisionConfig): IO[Unit] = + private def httpServerTask( + registryBuilder: CollectorRegistryBuilder[IO], + config: SearchProvisionConfig + ) = + val io = Routes[IO](registryBuilder) + .flatMap(HttpServer.build(_, config.httpServerConfig)) + .use(_ => IO.never) + "http server" -> io + + private def metricsUpdaterTask(services: Services[IO]) = + val io = MetricsCollectorsUpdater[IO]( + services.config.clientId, + services.config.queuesConfig, + services.config.metricsUpdateInterval, + services.queueClient + ).run() + "metrics updater" -> io + + private def runSolrMigrations(cfg: SearchProvisionConfig): IO[Unit] = SchemaMigrator[IO](cfg.solrConfig) .use(_.migrate(Migrations.all)) .handleErrorWith { err => diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala index 043ae26f..10575692 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala @@ -33,7 +33,18 @@ final case class QueuesConfig( userAdded: QueueName, userUpdated: QueueName, userRemoved: QueueName -) +): + lazy val all: Set[QueueName] = Set( + projectCreated, + projectUpdated, + projectRemoved, + projectAuthorizationAdded, + projectAuthorizationUpdated, + projectAuthorizationRemoved, + userAdded, + userUpdated, + userRemoved + ) object QueuesConfig: val config: ConfigValue[Effect, QueuesConfig] = diff --git a/modules/search-api/src/main/scala/io/renku/search/api/HttpApplication.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala similarity index 57% rename from modules/search-api/src/main/scala/io/renku/search/api/HttpApplication.scala rename to modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala index 628731c8..8401e91c 100644 --- a/modules/search-api/src/main/scala/io/renku/search/api/HttpApplication.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala @@ -16,33 +16,31 @@ * limitations under the License. */ -package io.renku.search.api +package io.renku.search.provision import cats.effect.{Async, Resource} import cats.syntax.all.* import fs2.io.net.Network -import io.renku.solr.client.SolrConfig -import org.http4s.dsl.Http4sDsl +import io.renku.search.http.metrics.MetricsRoutes +import io.renku.search.http.routes.OperationRoutes +import io.renku.search.metrics.CollectorRegistryBuilder +import org.http4s.HttpRoutes import org.http4s.server.Router -import org.http4s.{HttpApp, HttpRoutes, Response} -import io.renku.search.api.routes.* -object HttpApplication: - def apply[F[_]: Async: Network]( - solrConfig: SolrConfig - ): Resource[F, HttpApp[F]] = - SearchApi[F](solrConfig).map(new HttpApplication[F](_).router) - -final class HttpApplication[F[_]: Async](searchApi: SearchApi[F]) extends Http4sDsl[F]: +private object Routes: - private val prefix = "/search" + def apply[F[_]: Async: Network]( + registryBuilder: CollectorRegistryBuilder[F] + ): Resource[F, HttpRoutes[F]] = + MetricsRoutes[F](registryBuilder).makeRoutes + .map(new Routes[F](_).routes) - private val search = SearchRoutes[F](searchApi) - private val openapi = - OpenApiRoute[F](s"/api$prefix", "Renku Search API", search.endpoints) +final private class Routes[F[_]: Async](metricsRoutes: HttpRoutes[F]): - lazy val router: HttpApp[F] = + private lazy val operationRoutes = Router[F]( - prefix -> (openapi.routes <+> search.routes), "/" -> OperationRoutes[F] - ).orNotFound + ) + + lazy val routes: HttpRoutes[F] = + operationRoutes <+> metricsRoutes diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala index fb68944a..fbc0b798 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala @@ -20,8 +20,10 @@ package io.renku.search.provision import cats.syntax.all.* import ciris.{ConfigValue, Effect} -import io.renku.redis.client.RedisConfig +import com.comcast.ip4s.port +import io.renku.redis.client.{ClientId, RedisConfig} import io.renku.search.config.ConfigValues +import io.renku.search.http.HttpServerConfig import io.renku.solr.client.SolrConfig import scala.concurrent.duration.FiniteDuration @@ -30,8 +32,11 @@ final case class SearchProvisionConfig( redisConfig: RedisConfig, solrConfig: SolrConfig, retryOnErrorDelay: FiniteDuration, + metricsUpdateInterval: FiniteDuration, verbosity: Int, - queuesConfig: QueuesConfig + queuesConfig: QueuesConfig, + httpServerConfig: HttpServerConfig, + clientId: ClientId ) object SearchProvisionConfig: @@ -41,6 +46,9 @@ object SearchProvisionConfig: ConfigValues.redisConfig, ConfigValues.solrConfig, ConfigValues.retryOnErrorDelay, + ConfigValues.metricsUpdateInterval, ConfigValues.logLevel, - QueuesConfig.config + QueuesConfig.config, + ConfigValues.httpServerConfig(defaultPort = port"8081"), + ConfigValues.clientId(ClientId("search-provisioner")) ).mapN(SearchProvisionConfig.apply) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala index e94e593b..99ef27c6 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala @@ -18,12 +18,9 @@ package io.renku.search.provision -import cats.effect.kernel.Async -import cats.effect.kernel.Resource +import cats.effect.kernel.{Async, Resource} import fs2.io.net.Network - import io.renku.queue.client.QueueClient -import io.renku.redis.client.ClientId import io.renku.search.provision.handler.PipelineSteps import io.renku.search.solr.client.SearchSolrClient @@ -35,7 +32,7 @@ final case class Services[F[_]]( ) object Services: - val clientId: ClientId = ClientId("search-provisioner") + def make[F[_]: Async: Network]: Resource[F, Services[F]] = for { cfg <- Resource.eval(SearchProvisionConfig.config.load[F]) @@ -45,6 +42,12 @@ object Services: // be able to connect to the cluster redis = QueueClient.make[F](cfg.redisConfig) - steps = PipelineSteps[F](solr, redis, cfg.queuesConfig, 1, clientId) + steps = PipelineSteps[F]( + solr, + redis, + cfg.queuesConfig, + inChunkSize = 1, + cfg.clientId + ) handlers = MessageHandlers[F](steps, cfg.queuesConfig) } yield Services(cfg, solr, redis, handlers) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/CollectorUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/CollectorUpdater.scala new file mode 100644 index 00000000..b6679926 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/CollectorUpdater.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.renku.redis.client.QueueName + +private trait CollectorUpdater[F[_]]: + def update(queueName: QueueName): F[Unit] diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala new file mode 100644 index 00000000..fb85ca58 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.effect.{Async, Resource} +import cats.syntax.all.* +import fs2.Stream +import io.renku.queue.client.QueueClient +import io.renku.redis.client.ClientId +import io.renku.search.provision.QueuesConfig + +import scala.concurrent.duration.FiniteDuration + +object MetricsCollectorsUpdater: + + def apply[F[_]: Async]( + clientId: ClientId, + queuesConfig: QueuesConfig, + updateInterval: FiniteDuration, + qcResource: Resource[F, QueueClient[F]] + ): MetricsCollectorsUpdater[F] = + new MetricsCollectorsUpdater[F]( + qcResource, + queuesConfig, + RedisMetrics.updaterFactories(clientId), + updateInterval + ) + +class MetricsCollectorsUpdater[F[_]: Async]( + qcResource: Resource[F, QueueClient[F]], + queuesConfig: QueuesConfig, + collectors: List[QueueClient[F] => CollectorUpdater[F]], + updateInterval: FiniteDuration +): + + private val allQueues = queuesConfig.all.toList + + def run(): F[Unit] = + val queueClient: Stream[F, QueueClient[F]] = + Stream.resource(qcResource) + val awake: Stream[F, Unit] = + Stream.awakeEvery[F](updateInterval).void + queueClient + .flatTap(_ => awake) + .evalMap(runUpdate) + .compile + .drain + + private def runUpdate(qc: QueueClient[F]) = + allQueues.traverse_ { q => + collectors + .map(_.apply(qc)) + .traverse_(_.update(q)) + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueGauge.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueGauge.scala new file mode 100644 index 00000000..73791a2c --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueGauge.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.renku.redis.client.QueueName +import io.renku.search.metrics.Collector + +private trait QueueGauge extends Collector: + def set(q: QueueName, v: Double): Unit diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGauge.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGauge.scala new file mode 100644 index 00000000..e9e789d1 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGauge.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.prometheus.client.Gauge +import io.renku.redis.client.QueueName + +private class QueueSizeGauge extends QueueGauge: + + private[this] val underlying = + Gauge + .build() + .name("redis_stream_size") + .help("Total number of items in a stream") + .labelNames("queue_name") + .create() + + override val asJCollector: Gauge = underlying + + override def set(q: QueueName, v: Double): Unit = + underlying.labels(q.name).set(v) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGaugeUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGaugeUpdater.scala new file mode 100644 index 00000000..9bf953ea --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/QueueSizeGaugeUpdater.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.Monad +import cats.syntax.all.* +import io.renku.queue.client.QueueClient +import io.renku.redis.client.QueueName + +private class QueueSizeGaugeUpdater[F[_]: Monad]( + qc: QueueClient[F], + gauge: QueueSizeGauge +) extends CollectorUpdater[F]: + + override def update(queueName: QueueName): F[Unit] = + qc.getSize(queueName).map(s => gauge.set(queueName, s.toDouble)) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetrics.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetrics.scala new file mode 100644 index 00000000..0cc9ece9 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetrics.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.Monad +import io.renku.queue.client.QueueClient +import io.renku.redis.client.ClientId + +object RedisMetrics: + + val queueSizeGauge: QueueSizeGauge = QueueSizeGauge() + val unprocessedGauge: UnprocessedCountGauge = UnprocessedCountGauge() + + def updaterFactories[F[_]: Monad]( + clientId: ClientId + ): List[QueueClient[F] => CollectorUpdater[F]] = + List( + new QueueSizeGaugeUpdater[F](_, queueSizeGauge), + new UnprocessedCountGaugeUpdater[F](clientId, _, unprocessedGauge) + ) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGauge.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGauge.scala new file mode 100644 index 00000000..b76cace6 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGauge.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.prometheus.client.Gauge +import io.renku.redis.client.QueueName + +private class UnprocessedCountGauge extends QueueGauge: + + private[this] val underlying = + Gauge + .build() + .name("redis_stream_unprocessed") + .help("Total number of items left for processing in a stream") + .labelNames("queue_name") + .create() + + override val asJCollector: Gauge = underlying + + override def set(q: QueueName, v: Double): Unit = + underlying.labels(q.name).set(v) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGaugeUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGaugeUpdater.scala new file mode 100644 index 00000000..175dd82a --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/UnprocessedCountGaugeUpdater.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.Monad +import cats.syntax.all.* +import io.renku.queue.client.QueueClient +import io.renku.redis.client.{ClientId, QueueName} + +private class UnprocessedCountGaugeUpdater[F[_]: Monad]( + clientId: ClientId, + rc: QueueClient[F], + gauge: UnprocessedCountGauge +) extends CollectorUpdater[F]: + + override def update(queueName: QueueName): F[Unit] = + rc + .findLastProcessed(clientId, queueName) + .flatMap { + case None => rc.getSize(queueName) + case Some(lm) => rc.getSize(queueName, lm) + } + .map(s => gauge.set(queueName, s.toDouble)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dda32c1f..76c24803 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,6 +16,7 @@ object Dependencies { val ducktape = "0.1.11" val fs2 = "3.10.0" val http4s = "0.23.26" + val http4sPrometheusMetrics = "0.24.6" val redis4Cats = "1.6.0" val scalacheckEffectMunit = "1.0.4" val scodec = "2.2.2" @@ -102,6 +103,10 @@ object Dependencies { val http4sServer = Seq( "org.http4s" %% "http4s-ember-server" % V.http4s ) + val http4sPrometheusMetrics = Seq( + "org.http4s" %% "http4s-server" % V.http4s, + "org.http4s" %% "http4s-prometheus-metrics" % V.http4sPrometheusMetrics + ) val scribe = Seq( "com.outr" %% "scribe" % V.scribe,