Skip to content

Commit

Permalink
feat: metrics for solr state (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Mar 28, 2024
1 parent 33431df commit 404938e
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ final case class CollectorRegistryBuilder[F[_]: Sync](
def add(c: Collector): CollectorRegistryBuilder[F] =
copy(collectors = collectors + c)

def addAll(c: Iterable[Collector]): CollectorRegistryBuilder[F] =
copy(collectors = collectors ++ c)

def makeRegistry: Resource[F, CollectorRegistry] =
val registry = new CollectorRegistry()
(registerJVM(registry) >> registerCollectors(registry))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.syntax.all.*
import io.renku.logging.LoggingSetup
import io.renku.search.http.HttpServer
import io.renku.search.metrics.CollectorRegistryBuilder
import io.renku.search.provision.metrics.{MetricsCollectorsUpdater, RedisMetrics}
import io.renku.search.provision.metrics.*
import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.migration.SchemaMigrator

Expand All @@ -41,6 +41,7 @@ object Microservice extends IOApp:
registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics
.add(RedisMetrics.queueSizeGauge)
.add(RedisMetrics.unprocessedGauge)
.addAll(SolrMetrics.allCollectors)
metrics = metricsUpdaterTask(services)
httpServer = httpServerTask(registryBuilder, services.config)
tasks = services.messageHandlers.getAll + metrics + httpServer
Expand All @@ -64,14 +65,15 @@ object Microservice extends IOApp:
val io =
if (updateInterval <= Duration.Zero)
logger.info(
s"Metric update interval is ${updateInterval}, disable periodic metric task"
s"Metric update interval is $updateInterval, disable periodic metric task"
)
else
MetricsCollectorsUpdater[IO](
services.config.clientId,
services.config.queuesConfig,
updateInterval,
services.queueClient
services.queueClient,
services.solrClient
).run()
"metrics updater" -> io

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.metrics

import io.prometheus.client.Gauge
import io.renku.search.metrics.Collector
import io.renku.search.model.EntityType
import io.renku.search.solr.documents.DocumentKind

private class DocumentKindGauge(val entityType: EntityType) extends Collector:

private val underlying =
Gauge
.build()
.name(s"solr_${entityType.name.toLowerCase}_by_kind")
.help(s"Total number of '$entityType' entities by kind")
.labelNames("kind")
.create()

override val asJCollector: Gauge = underlying

def set(l: DocumentKind, v: Double): Unit =
underlying.labels(l.name).set(v)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.metrics

import cats.MonadThrow
import cats.syntax.all.*
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.DocumentKind
import io.renku.search.solr.query.SolrToken
import io.renku.search.solr.query.SolrToken.entityTypeIs
import io.renku.search.solr.schema.EntityDocumentSchema.Fields
import io.renku.solr.client.QueryData
import io.renku.solr.client.facet.{Facet, Facets}
import io.renku.solr.client.schema.FieldName

private class DocumentKindGaugeUpdater[F[_]: MonadThrow](
sc: SearchSolrClient[F],
gauge: DocumentKindGauge
):

private val kindsField = FieldName("kinds")
private val queryData =
QueryData(
entityTypeIs(gauge.entityType).value,
filter = Seq.empty,
limit = 0,
offset = 0,
facet = Facets(Facet.Terms(kindsField, Fields.kind))
)

def update(): F[Unit] =
sc.query[Null](queryData)
.flatMap {
_.facetResponse
.flatMap(_.buckets.get(kindsField))
.map(_.buckets.toList)
.sequence
.flatten
.map(b => toDocumentKind(b.value).tupleRight(b.count.toDouble))
.sequence
.map(_.foreach { case (k, c) => gauge.set(k, c) })
}

private def toDocumentKind(v: String): F[DocumentKind] =
MonadThrow[F]
.fromEither(DocumentKind.fromString(v).leftMap(new IllegalArgumentException(_)))
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,38 @@

package io.renku.search.provision.metrics

import cats.NonEmptyParallel
import cats.effect.Async
import cats.syntax.all.*
import fs2.Stream
import io.renku.queue.client.QueueClient
import io.renku.redis.client.ClientId
import io.renku.search.provision.QueuesConfig
import io.renku.search.solr.client.SearchSolrClient

import scala.concurrent.duration.FiniteDuration

object MetricsCollectorsUpdater:

def apply[F[_]: Async](
def apply[F[_]: Async: NonEmptyParallel](
clientId: ClientId,
queuesConfig: QueuesConfig,
updateInterval: FiniteDuration,
queueClient: Stream[F, QueueClient[F]]
queueClient: Stream[F, QueueClient[F]],
solrClient: SearchSolrClient[F]
): MetricsCollectorsUpdater[F] =
new MetricsCollectorsUpdater[F](
queueClient,
queuesConfig,
RedisMetrics.updaterFactories(clientId),
updateInterval
RedisMetricsCollectorsUpdater[F](
clientId,
queuesConfig,
updateInterval,
queueClient
),
SolrMetricsCollectorsUpdater[F](updateInterval, solrClient)
)

class MetricsCollectorsUpdater[F[_]: Async](
queueClient: Stream[F, QueueClient[F]],
queuesConfig: QueuesConfig,
collectors: List[QueueClient[F] => CollectorUpdater[F]],
updateInterval: FiniteDuration
class MetricsCollectorsUpdater[F[_]: Async: NonEmptyParallel](
rcu: RedisMetricsCollectorsUpdater[F],
scu: SolrMetricsCollectorsUpdater[F]
):

private val allQueues = queuesConfig.all.toList

def run(): F[Unit] =
createUpdateStream.compile.drain

private def createUpdateStream: Stream[F, Unit] =
val awake: Stream[F, Unit] =
Stream.awakeEvery[F](updateInterval).void
queueClient
.flatTap(_ => awake)
.evalMap(runUpdate)

private def runUpdate(qc: QueueClient[F]) =
allQueues.traverse_ { q =>
collectors
.map(_.apply(qc))
.traverse_(_.update(q))
}
(rcu.run(), scu.run()).parTupled.void
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.metrics

import cats.effect.Async
import cats.syntax.all.*
import fs2.Stream
import io.renku.queue.client.QueueClient
import io.renku.redis.client.ClientId
import io.renku.search.provision.QueuesConfig

import scala.concurrent.duration.FiniteDuration

private object RedisMetricsCollectorsUpdater:

def apply[F[_]: Async](
clientId: ClientId,
queuesConfig: QueuesConfig,
updateInterval: FiniteDuration,
queueClient: Stream[F, QueueClient[F]]
): RedisMetricsCollectorsUpdater[F] =
new RedisMetricsCollectorsUpdater[F](
queueClient,
queuesConfig,
RedisMetrics.updaterFactories(clientId),
updateInterval
)

private class RedisMetricsCollectorsUpdater[F[_]: Async](
queueClient: Stream[F, QueueClient[F]],
queuesConfig: QueuesConfig,
collectors: List[QueueClient[F] => CollectorUpdater[F]],
updateInterval: FiniteDuration
):

private val allQueues = queuesConfig.all.toList

def run(): F[Unit] =
createUpdateStream.compile.drain

private def createUpdateStream: Stream[F, Unit] =
val awake: Stream[F, Unit] =
Stream.awakeEvery[F](updateInterval).void
queueClient
.flatTap(_ => awake)
.evalMap(runUpdate)

private def runUpdate(qc: QueueClient[F]) =
allQueues.traverse_ { q =>
collectors
.map(_.apply(qc))
.traverse_(_.update(q))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.metrics

import io.renku.search.model.EntityType
import io.renku.search.model.EntityType.*

object SolrMetrics:

val allCollectors: Set[DocumentKindGauge] =
EntityType.values.toSet.map(DocumentKindGauge.apply)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.metrics

import cats.effect.Async
import cats.syntax.all.*
import fs2.Stream
import io.renku.search.provision.metrics.SolrMetrics.*
import io.renku.search.solr.client.SearchSolrClient

import scala.concurrent.duration.FiniteDuration

private object SolrMetricsCollectorsUpdater:

def apply[F[_]: Async](
updateInterval: FiniteDuration,
sc: SearchSolrClient[F]
): SolrMetricsCollectorsUpdater[F] =
new SolrMetricsCollectorsUpdater[F](
updateInterval,
allCollectors.map(DocumentKindGaugeUpdater(sc, _)).toList
)

private class SolrMetricsCollectorsUpdater[F[_]: Async](
updateInterval: FiniteDuration,
updaters: List[DocumentKindGaugeUpdater[F]]
):

def run(): F[Unit] =
Stream
.awakeEvery[F](updateInterval)
.evalMap(_ => updateCollectors)
.compile
.drain

private def updateCollectors = updaters.traverse_(_.update())

0 comments on commit 404938e

Please sign in to comment.