diff --git a/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala index a9afbf26..50f69918 100644 --- a/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala +++ b/modules/http4s-metrics/src/main/scala/io/renku/search/metrics/CollectorRegistryBuilder.scala @@ -34,6 +34,9 @@ final case class CollectorRegistryBuilder[F[_]: Sync]( def add(c: Collector): CollectorRegistryBuilder[F] = copy(collectors = collectors + c) + def addAll(c: Iterable[Collector]): CollectorRegistryBuilder[F] = + copy(collectors = collectors ++ c) + def makeRegistry: Resource[F, CollectorRegistry] = val registry = new CollectorRegistry() (registerJVM(registry) >> registerCollectors(registry)) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index 021555f2..c99b359a 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -23,7 +23,7 @@ import cats.syntax.all.* import io.renku.logging.LoggingSetup import io.renku.search.http.HttpServer import io.renku.search.metrics.CollectorRegistryBuilder -import io.renku.search.provision.metrics.{MetricsCollectorsUpdater, RedisMetrics} +import io.renku.search.provision.metrics.* import io.renku.search.solr.schema.Migrations import io.renku.solr.client.migration.SchemaMigrator @@ -41,6 +41,7 @@ object Microservice extends IOApp: registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics .add(RedisMetrics.queueSizeGauge) .add(RedisMetrics.unprocessedGauge) + .addAll(SolrMetrics.allCollectors) metrics = metricsUpdaterTask(services) httpServer = httpServerTask(registryBuilder, services.config) tasks = services.messageHandlers.getAll + metrics + httpServer @@ -64,14 +65,15 @@ object Microservice extends IOApp: val io = if (updateInterval <= Duration.Zero) logger.info( - s"Metric update interval is ${updateInterval}, disable periodic metric task" + s"Metric update interval is $updateInterval, disable periodic metric task" ) else MetricsCollectorsUpdater[IO]( services.config.clientId, services.config.queuesConfig, updateInterval, - services.queueClient + services.queueClient, + services.solrClient ).run() "metrics updater" -> io diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGauge.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGauge.scala new file mode 100644 index 00000000..993e02e1 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGauge.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.prometheus.client.Gauge +import io.renku.search.metrics.Collector +import io.renku.search.model.EntityType +import io.renku.search.solr.documents.DocumentKind + +private class DocumentKindGauge(val entityType: EntityType) extends Collector: + + private val underlying = + Gauge + .build() + .name(s"solr_${entityType.name.toLowerCase}_by_kind") + .help(s"Total number of '$entityType' entities by kind") + .labelNames("kind") + .create() + + override val asJCollector: Gauge = underlying + + def set(l: DocumentKind, v: Double): Unit = + underlying.labels(l.name).set(v) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGaugeUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGaugeUpdater.scala new file mode 100644 index 00000000..da4fe239 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/DocumentKindGaugeUpdater.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.MonadThrow +import cats.syntax.all.* +import io.renku.search.solr.client.SearchSolrClient +import io.renku.search.solr.documents.DocumentKind +import io.renku.search.solr.query.SolrToken +import io.renku.search.solr.query.SolrToken.entityTypeIs +import io.renku.search.solr.schema.EntityDocumentSchema.Fields +import io.renku.solr.client.QueryData +import io.renku.solr.client.facet.{Facet, Facets} +import io.renku.solr.client.schema.FieldName + +private class DocumentKindGaugeUpdater[F[_]: MonadThrow]( + sc: SearchSolrClient[F], + gauge: DocumentKindGauge +): + + private val kindsField = FieldName("kinds") + private val queryData = + QueryData( + entityTypeIs(gauge.entityType).value, + filter = Seq.empty, + limit = 0, + offset = 0, + facet = Facets(Facet.Terms(kindsField, Fields.kind)) + ) + + def update(): F[Unit] = + sc.query[Null](queryData) + .flatMap { + _.facetResponse + .flatMap(_.buckets.get(kindsField)) + .map(_.buckets.toList) + .sequence + .flatten + .map(b => toDocumentKind(b.value).tupleRight(b.count.toDouble)) + .sequence + .map(_.foreach { case (k, c) => gauge.set(k, c) }) + } + + private def toDocumentKind(v: String): F[DocumentKind] = + MonadThrow[F] + .fromEither(DocumentKind.fromString(v).leftMap(new IllegalArgumentException(_))) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala index 8c6f6e4a..ab66da2b 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala @@ -18,52 +18,38 @@ package io.renku.search.provision.metrics +import cats.NonEmptyParallel import cats.effect.Async import cats.syntax.all.* import fs2.Stream import io.renku.queue.client.QueueClient import io.renku.redis.client.ClientId import io.renku.search.provision.QueuesConfig +import io.renku.search.solr.client.SearchSolrClient import scala.concurrent.duration.FiniteDuration object MetricsCollectorsUpdater: - - def apply[F[_]: Async]( + def apply[F[_]: Async: NonEmptyParallel]( clientId: ClientId, queuesConfig: QueuesConfig, updateInterval: FiniteDuration, - queueClient: Stream[F, QueueClient[F]] + queueClient: Stream[F, QueueClient[F]], + solrClient: SearchSolrClient[F] ): MetricsCollectorsUpdater[F] = new MetricsCollectorsUpdater[F]( - queueClient, - queuesConfig, - RedisMetrics.updaterFactories(clientId), - updateInterval + RedisMetricsCollectorsUpdater[F]( + clientId, + queuesConfig, + updateInterval, + queueClient + ), + SolrMetricsCollectorsUpdater[F](updateInterval, solrClient) ) -class MetricsCollectorsUpdater[F[_]: Async]( - queueClient: Stream[F, QueueClient[F]], - queuesConfig: QueuesConfig, - collectors: List[QueueClient[F] => CollectorUpdater[F]], - updateInterval: FiniteDuration +class MetricsCollectorsUpdater[F[_]: Async: NonEmptyParallel]( + rcu: RedisMetricsCollectorsUpdater[F], + scu: SolrMetricsCollectorsUpdater[F] ): - - private val allQueues = queuesConfig.all.toList - def run(): F[Unit] = - createUpdateStream.compile.drain - - private def createUpdateStream: Stream[F, Unit] = - val awake: Stream[F, Unit] = - Stream.awakeEvery[F](updateInterval).void - queueClient - .flatTap(_ => awake) - .evalMap(runUpdate) - - private def runUpdate(qc: QueueClient[F]) = - allQueues.traverse_ { q => - collectors - .map(_.apply(qc)) - .traverse_(_.update(q)) - } + (rcu.run(), scu.run()).parTupled.void diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetricsCollectorsUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetricsCollectorsUpdater.scala new file mode 100644 index 00000000..e735c76a --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/RedisMetricsCollectorsUpdater.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.effect.Async +import cats.syntax.all.* +import fs2.Stream +import io.renku.queue.client.QueueClient +import io.renku.redis.client.ClientId +import io.renku.search.provision.QueuesConfig + +import scala.concurrent.duration.FiniteDuration + +private object RedisMetricsCollectorsUpdater: + + def apply[F[_]: Async]( + clientId: ClientId, + queuesConfig: QueuesConfig, + updateInterval: FiniteDuration, + queueClient: Stream[F, QueueClient[F]] + ): RedisMetricsCollectorsUpdater[F] = + new RedisMetricsCollectorsUpdater[F]( + queueClient, + queuesConfig, + RedisMetrics.updaterFactories(clientId), + updateInterval + ) + +private class RedisMetricsCollectorsUpdater[F[_]: Async]( + queueClient: Stream[F, QueueClient[F]], + queuesConfig: QueuesConfig, + collectors: List[QueueClient[F] => CollectorUpdater[F]], + updateInterval: FiniteDuration +): + + private val allQueues = queuesConfig.all.toList + + def run(): F[Unit] = + createUpdateStream.compile.drain + + private def createUpdateStream: Stream[F, Unit] = + val awake: Stream[F, Unit] = + Stream.awakeEvery[F](updateInterval).void + queueClient + .flatTap(_ => awake) + .evalMap(runUpdate) + + private def runUpdate(qc: QueueClient[F]) = + allQueues.traverse_ { q => + collectors + .map(_.apply(qc)) + .traverse_(_.update(q)) + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetrics.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetrics.scala new file mode 100644 index 00000000..56e8d4f6 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetrics.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import io.renku.search.model.EntityType +import io.renku.search.model.EntityType.* + +object SolrMetrics: + + val allCollectors: Set[DocumentKindGauge] = + EntityType.values.toSet.map(DocumentKindGauge.apply) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetricsCollectorsUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetricsCollectorsUpdater.scala new file mode 100644 index 00000000..c5b12b63 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/SolrMetricsCollectorsUpdater.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.metrics + +import cats.effect.Async +import cats.syntax.all.* +import fs2.Stream +import io.renku.search.provision.metrics.SolrMetrics.* +import io.renku.search.solr.client.SearchSolrClient + +import scala.concurrent.duration.FiniteDuration + +private object SolrMetricsCollectorsUpdater: + + def apply[F[_]: Async]( + updateInterval: FiniteDuration, + sc: SearchSolrClient[F] + ): SolrMetricsCollectorsUpdater[F] = + new SolrMetricsCollectorsUpdater[F]( + updateInterval, + allCollectors.map(DocumentKindGaugeUpdater(sc, _)).toList + ) + +private class SolrMetricsCollectorsUpdater[F[_]: Async]( + updateInterval: FiniteDuration, + updaters: List[DocumentKindGaugeUpdater[F]] +): + + def run(): F[Unit] = + Stream + .awakeEvery[F](updateInterval) + .evalMap(_ => updateCollectors) + .compile + .drain + + private def updateCollectors = updaters.traverse_(_.update())