From d021d237271724a40f630e401540ec9c308438ca Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Mon, 29 Jan 2024 10:39:46 +0100 Subject: [PATCH] feat: Microservice for search-provision service --- .../renku/search/provision/Microservice.scala | 64 +++++++++++++++++++ .../client/migration/SchemaMigrator.scala | 12 +++- 2 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala new file mode 100644 index 00000000..178d4d75 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import cats.effect.{ExitCode, IO, IOApp, Temporal} +import io.renku.queue.client.QueueName +import io.renku.redis.client.RedisUrl +import io.renku.search.solr.schema.Migrations +import io.renku.solr.client.SolrConfig +import io.renku.solr.client.migration.SchemaMigrator +import org.http4s.Uri +import org.http4s.implicits.* +import scribe.Scribe +import scribe.cats.* + +import scala.concurrent.duration.* + +object Microservice extends IOApp: + + private val queueName = QueueName("events") + private val redisUrl = RedisUrl("redis://localhost:6379") + private val solrConfig = SolrConfig( + baseUrl = uri"http://localhost:8983" / "solr", + core = "search-core-test", + commitWithin = Some(Duration.Zero), + logMessageBodies = true + ) + private val retryOnErrorDelay = 2 seconds + + override def run(args: List[String]): IO[ExitCode] = + (runSolrMigrations >> startProvisioning) + .as(ExitCode.Success) + + private def startProvisioning: IO[Unit] = + SearchProvisioner[IO](queueName, redisUrl, solrConfig) + .use(_.provisionSolr) + .handleErrorWith { err => + Scribe[IO].error("Starting provisioning failure, retrying", err) >> + Temporal[IO].delayBy(startProvisioning, retryOnErrorDelay) + } + + private def runSolrMigrations: IO[Unit] = + SchemaMigrator[IO](solrConfig) + .use(_.migrate(Migrations.all)) + .handleErrorWith { err => + Scribe[IO].error("Running solr migrations failure, retrying", err) >> + Temporal[IO].delayBy(runSolrMigrations, retryOnErrorDelay) + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 2b56a19a..99d9e628 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -18,10 +18,12 @@ package io.renku.solr.client.migration -import cats.effect.Sync +import cats.effect.kernel.Resource +import cats.effect.{Async, Sync} import cats.syntax.all.* +import fs2.io.net.Network import io.renku.solr.client.schema.{Field, FieldName, SchemaCommand, TypeName} -import io.renku.solr.client.{QueryString, SolrClient} +import io.renku.solr.client.{QueryString, SolrClient, SolrConfig} trait SchemaMigrator[F[_]] { @@ -31,8 +33,14 @@ trait SchemaMigrator[F[_]] { } object SchemaMigrator: + def apply[F[_]: Sync](client: SolrClient[F]): SchemaMigrator[F] = Impl[F](client) + def apply[F[_]: Async: Network]( + solrConfig: SolrConfig + ): Resource[F, SchemaMigrator[F]] = + SolrClient[F](solrConfig).map(apply[F]) + private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] { private[this] val logger = scribe.cats.effect[F] private[this] val versionDocId = "VERSION_ID_EB779C6B-1D96-47CB-B304-BECF15E4A607"