diff --git a/build.sbt b/build.sbt index 2f9123fd..f27b43f8 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ releaseVersionBump := sbtrelease.Version.Bump.Minor releaseIgnoreUntrackedFiles := true releaseTagName := (ThisBuild / version).value -addCommandAlias("ci", "; lint; test; publishLocal") +addCommandAlias("ci", "; lint; dbTests; publishLocal") addCommandAlias( "lint", "; scalafmtSbtCheck; scalafmtCheckAll;" // Compile/scalafix --check; Test/scalafix --check @@ -37,6 +37,7 @@ addCommandAlias("fix", "; scalafmtSbt; scalafmtAll") // ; Compile/scalafix; Test lazy val root = project .in(file(".")) .withId("renku-search") + .enablePlugins(DbTestPlugin) .settings( publish / skip := true, publishTo := Some( @@ -45,8 +46,11 @@ lazy val root = project ) .aggregate( commons, + httpClient, messages, redisClient, + solrClient, + searchSolrClient, searchProvision ) @@ -60,9 +64,57 @@ lazy val commons = project Dependencies.catsEffect ++ Dependencies.fs2Core ++ Dependencies.scodecBits ++ - Dependencies.scribe + Dependencies.scribe, + Test / sourceGenerators += Def.task { + val sourceDir = + (LocalRootProject / baseDirectory).value / "project" + val sources = Seq( + sourceDir / "RedisServer.scala", + sourceDir / "SolrServer.scala" + ) // IO.listFiles(sourceDir) + val targetDir = (Test / sourceManaged).value / "servers" + IO.createDirectory(targetDir) + + val targets = sources.map(s => targetDir / s.name) + IO.copy(sources.zip(targets)) + targets + }.taskValue + ) + .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin) + +lazy val http4sBorer = project + .in(file("modules/http4s-borer")) + .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin) + .withId("http4s-borer") + .settings(commonSettings) + .settings( + name := "http4s-borer", + description := "Use borer codecs with http4s", + libraryDependencies ++= + Dependencies.borer ++ + Dependencies.http4sCore ++ + Dependencies.fs2Core ) + +lazy val httpClient = project + .in(file("modules/http-client")) + .withId("http-client") .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin) + .settings(commonSettings) + .settings( + name := "http-client", + description := "Utilities for the http client in http4s", + libraryDependencies ++= + Dependencies.http4sClient ++ + Dependencies.fs2Core ++ + Dependencies.scribe + ) + .dependsOn( + http4sBorer % "compile->compile;test->test" + ) lazy val redisClient = project .in(file("modules/redis-client")) @@ -70,8 +122,6 @@ lazy val redisClient = project .settings(commonSettings) .settings( name := "redis-client", - Test / testOptions += Tests.Setup(RedisServer.start), - Test / testOptions += Tests.Cleanup(RedisServer.stop), libraryDependencies ++= Dependencies.catsCore ++ Dependencies.catsEffect ++ @@ -79,12 +129,50 @@ lazy val redisClient = project Dependencies.redis4CatsStreams ) .enablePlugins(AutomateHeaderPlugin) + .dependsOn( + commons % "test->test" + ) + +lazy val solrClient = project + .in(file("modules/solr-client")) + .withId("solr-client") + .enablePlugins(AvroCodeGen, AutomateHeaderPlugin) + .settings(commonSettings) + .settings( + name := "solr-client", + libraryDependencies ++= + Dependencies.catsCore ++ + Dependencies.catsEffect ++ + Dependencies.http4sClient + ) + .dependsOn( + httpClient % "compile->compile;test->test", + commons % "test->test" + ) + +lazy val searchSolrClient = project + .in(file("modules/search-solr-client")) + .withId("search-solr-client") + .enablePlugins(AvroCodeGen, AutomateHeaderPlugin) + .settings(commonSettings) + .settings( 
+ name := "search-solr-client", + libraryDependencies ++= + Dependencies.catsCore ++ + Dependencies.catsEffect + ) + .dependsOn( + avroCodec % "compile->compile;test->test", + solrClient % "compile->compile;test->test", + commons % "test->test" + ) lazy val avroCodec = project .in(file("modules/avro-codec")) + .disablePlugins(DbTestPlugin) .settings(commonSettings) .settings( - name := "avro-codecs", + name := "avro-codec", libraryDependencies ++= Dependencies.avro ++ Dependencies.scodecBits @@ -94,40 +182,27 @@ lazy val messages = project .in(file("modules/messages")) .settings(commonSettings) .settings( - name := "messages", - libraryDependencies ++= Dependencies.avro, - Compile / avroScalaCustomTypes := { - avrohugger.format.SpecificRecord.defaultTypes.copy( - record = avrohugger.types.ScalaCaseClassWithSchema - ) - }, - Compile / avroScalaSpecificCustomTypes := { - avrohugger.format.SpecificRecord.defaultTypes.copy( - record = avrohugger.types.ScalaCaseClassWithSchema - ) - }, - Compile / sourceGenerators += (Compile / avroScalaGenerate).taskValue + name := "messages" ) .dependsOn( commons % "compile->compile;test->test", avroCodec % "compile->compile;test->test" ) - .enablePlugins(AutomateHeaderPlugin) + .enablePlugins(AvroCodeGen, AutomateHeaderPlugin) + .disablePlugins(DbTestPlugin) lazy val searchProvision = project .in(file("modules/search-provision")) .withId("search-provision") .settings(commonSettings) .settings( - name := "search-provision", - Test / testOptions += Tests.Setup(RedisServer.start), - Test / testOptions += Tests.Cleanup(RedisServer.stop) + name := "search-provision" ) .dependsOn( commons % "compile->compile;test->test", messages % "compile->compile;test->test", - avroCodec % "compile->compile;test->test", - redisClient % "compile->compile;test->test" + redisClient % "compile->compile;test->test", + searchSolrClient % "compile->compile;test->test" ) .enablePlugins(AutomateHeaderPlugin) diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/all.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/all.scala new file mode 100644 index 00000000..d5595053 --- /dev/null +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/all.scala @@ -0,0 +1,3 @@ +package io.renku.avro.codec + +object all extends encoders.all with decoders.all diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/CollectionDecoders.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/CollectionDecoders.scala index d6cf2cd4..7cd9a39f 100644 --- a/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/CollectionDecoders.scala +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/CollectionDecoders.scala @@ -18,13 +18,16 @@ package io.renku.avro.codec.decoders -import io.renku.avro.codec.{AvroDecoder, AvroCodecException} +import io.renku.avro.codec.{AvroCodecException, AvroDecoder} import org.apache.avro.Schema import scala.jdk.CollectionConverters.* import scala.reflect.ClassTag trait CollectionDecoders: + given [T: ClassTag](using decoder: AvroDecoder[T]): AvroDecoder[Array[T]] = + CollectionDecoders.ArrayDecoder[T](decoder) + given [T](using dec: AvroDecoder[T]): AvroDecoder[List[T]] = CollectionDecoders.iterableDecoder[T, List](dec, _.toList) @@ -34,8 +37,12 @@ trait CollectionDecoders: given [T](using dec: AvroDecoder[T]): AvroDecoder[Set[T]] = CollectionDecoders.iterableDecoder[T, Set](dec, _.toSet) + given [T](using dec: AvroDecoder[T]): AvroDecoder[Map[String, T]] = + CollectionDecoders.MapDecoder[T](dec) + 
object CollectionDecoders: - class ArrayDecoder[T: ClassTag](decoder: AvroDecoder[T]) extends AvroDecoder[Array[T]]: + private class ArrayDecoder[T: ClassTag](decoder: AvroDecoder[T]) + extends AvroDecoder[Array[T]]: def decode(schema: Schema): Any => Array[T] = { require( schema.getType == Schema.Type.ARRAY, @@ -51,6 +58,16 @@ object CollectionDecoders: } } + private class MapDecoder[T](decoder: AvroDecoder[T]) + extends AvroDecoder[Map[String, T]]: + override def decode(schema: Schema): Any => Map[String, T] = { + require(schema.getType == Schema.Type.MAP) + val decode = decoder.decode(schema.getValueType) + { case map: java.util.Map[_, _] => + map.asScala.toMap.map { case (k, v) => k.toString -> decode(v) } + } + } + def iterableDecoder[T, C[X] <: Iterable[X]]( decoder: AvroDecoder[T], build: Iterable[T] => C[T] diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/all.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/all.scala index 92937b5e..a8655e1b 100644 --- a/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/all.scala +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/decoders/all.scala @@ -18,7 +18,7 @@ package io.renku.avro.codec.decoders -object all +trait all extends PrimitiveDecoders with StringDecoders with BigDecimalDecoders @@ -29,3 +29,5 @@ object all with JavaEnumDecoders with EitherDecoders with RecordDecoders + +object all extends all diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/CollectionEncoders.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/CollectionEncoders.scala index 9909ec8a..222c66d9 100644 --- a/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/CollectionEncoders.scala +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/CollectionEncoders.scala @@ -28,7 +28,7 @@ trait CollectionEncoders { private def iterableEncoder[T, C[X] <: Iterable[X]]( encoder: AvroEncoder[T] ): AvroEncoder[C[T]] = (schema: Schema) => { - require(schema.getType == Schema.Type.ARRAY) + require(schema.getType == Schema.Type.ARRAY, s"Expected array schema, got: $schema") val elementEncoder = encoder.encode(schema.getElementType) { t => t.map(elementEncoder.apply).toList.asJava } } @@ -48,4 +48,18 @@ trait CollectionEncoders { given [T](using encoder: AvroEncoder[T]): AvroEncoder[Vector[T]] = iterableEncoder( encoder ) + given [T](using encoder: AvroEncoder[T]): AvroEncoder[Map[String, T]] = + CollectionEncoders.MapEncoder[T](encoder) } + +object CollectionEncoders: + private class MapEncoder[T](encoder: AvroEncoder[T]) + extends AvroEncoder[Map[String, T]]: + override def encode(schema: Schema): Map[String, T] => Any = { + val encodeT = encoder.encode(schema.getValueType) + { value => + val map = new java.util.HashMap[String, Any] + value.foreach { case (k, v) => map.put(k, encodeT.apply(v)) } + map + } + } diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/all.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/all.scala index 4db87c24..a5810637 100644 --- a/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/all.scala +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/encoders/all.scala @@ -18,7 +18,7 @@ package io.renku.avro.codec.encoders -object all +trait all extends PrimitiveEncoders with StringEncoders with BigDecimalEncoders @@ -29,3 +29,5 @@ object all with ByteArrayEncoders with JavaEnumEncoders with RecordEncoders + +object all extends all diff --git 
a/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonDecoder.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonDecoder.scala new file mode 100644 index 00000000..e65222ee --- /dev/null +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonDecoder.scala @@ -0,0 +1,26 @@ +package io.renku.avro.codec.json + +import io.renku.avro.codec.{AvroDecoder, AvroReader} +import org.apache.avro.Schema +import scodec.bits.ByteVector + +import scala.util.Try + +trait AvroJsonDecoder[A]: + def decode(json: ByteVector): Either[String, A] + final def map[B](f: A => B): AvroJsonDecoder[B] = + AvroJsonDecoder(this.decode.andThen(_.map(f))) + final def emap[B](f: A => Either[String, B]): AvroJsonDecoder[B] = + AvroJsonDecoder(this.decode.andThen(_.flatMap(f))) + +object AvroJsonDecoder: + def apply[A](using d: AvroJsonDecoder[A]): AvroJsonDecoder[A] = d + + def apply[A](f: ByteVector => Either[String, A]): AvroJsonDecoder[A] = + (json: ByteVector) => f(json) + + def create[A: AvroDecoder](schema: Schema): AvroJsonDecoder[A] = { json => + Try(AvroReader(schema).readJson[A](json)).toEither.left + .map(_.getMessage) + .flatMap(_.headOption.toRight(s"Empty json")) + } diff --git a/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonEncoder.scala b/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonEncoder.scala new file mode 100644 index 00000000..1e9a72b4 --- /dev/null +++ b/modules/avro-codec/src/main/scala/io/renku/avro/codec/json/AvroJsonEncoder.scala @@ -0,0 +1,32 @@ +package io.renku.avro.codec.json + +import io.renku.avro.codec.{AvroEncoder, AvroWriter} +import io.renku.avro.codec.encoders.all.given +import org.apache.avro.{Schema, SchemaBuilder} +import scodec.bits.ByteVector + +trait AvroJsonEncoder[A]: + def encode(value: A): ByteVector + final def contramap[B](f: B => A): AvroJsonEncoder[B] = + AvroJsonEncoder(f.andThen(this.encode)) + +object AvroJsonEncoder: + def apply[A](using e: AvroJsonEncoder[A]): AvroJsonEncoder[A] = e + + def apply[A](f: A => ByteVector): AvroJsonEncoder[A] = + (a: A) => f(a) + + def create[A: AvroEncoder](schema: Schema): AvroJsonEncoder[A] = + a => AvroWriter(schema).writeJson(Seq(a)) + + given AvroJsonEncoder[String] = + create[String](SchemaBuilder.builder().stringType()) + + given AvroJsonEncoder[Long] = + create[Long](SchemaBuilder.builder().longType()) + + given AvroJsonEncoder[Int] = + create[Int](SchemaBuilder.builder().intType()) + + given AvroJsonEncoder[Double] = + create[Double](SchemaBuilder.builder().doubleType()) diff --git a/modules/avro-codec/src/test/scala/io/renku/avro/codec/json/JsonCodecTest.scala b/modules/avro-codec/src/test/scala/io/renku/avro/codec/json/JsonCodecTest.scala new file mode 100644 index 00000000..e9f60039 --- /dev/null +++ b/modules/avro-codec/src/test/scala/io/renku/avro/codec/json/JsonCodecTest.scala @@ -0,0 +1,43 @@ +package io.renku.avro.codec.json + +import io.renku.avro.codec.* +import io.renku.avro.codec.all.given +import munit.FunSuite +import org.apache.avro.{Schema, SchemaBuilder} +import scala.collection.immutable.Map + +class JsonCodecTest extends FunSuite { + + test("encode and decode json") { + val person = + JsonCodecTest.Person("hugo", 42, Map("date" -> "1982", "children" -> "0")) + val json = AvroJsonEncoder[JsonCodecTest.Person].encode(person) + val decoded = AvroJsonDecoder[JsonCodecTest.Person].decode(json) + assertEquals(decoded, Right(person)) + } +} + +object JsonCodecTest: + + case class Person(name: String, age: Int, props: 
Map[String, String]) + derives AvroEncoder, + AvroDecoder + object Person: + val schema: Schema = SchemaBuilder + .record("Person") + .fields() + .name("name") + .`type`("string") + .noDefault() + .name("age") + .`type`("int") + .noDefault() + .name("props") + .`type`( + SchemaBuilder.map().values(SchemaBuilder.builder().`type`("string")) + ) + .noDefault() + .endRecord() + + given AvroJsonEncoder[Person] = AvroJsonEncoder.create(schema) + given AvroJsonDecoder[Person] = AvroJsonDecoder.create(schema) diff --git a/modules/http-client/src/main/scala/io/renku/search/http/ClientBuilder.scala b/modules/http-client/src/main/scala/io/renku/search/http/ClientBuilder.scala new file mode 100644 index 00000000..355108c6 --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/ClientBuilder.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http + +import scala.concurrent.duration.Duration + +import cats.effect.kernel.{Async, Resource} +import fs2.io.net.Network + +import org.http4s.Status +import org.http4s.client.Client +import org.http4s.ember.client.EmberClientBuilder +import scribe.Scribe + +final class ClientBuilder[F[_]: Async]( + delegate: EmberClientBuilder[F], + middlewares: List[Client[F] => Client[F]] +) { + def withLogger(l: Scribe[F]): ClientBuilder[F] = + forward(_.withLogger(LoggerProxy(l))) + + def withTimeout(timeout: Duration): ClientBuilder[F] = + forward(_.withTimeout(timeout).withIdleConnectionTime(timeout * 1.5)) + + def withDefaultRetry(cfg: RetryConfig): ClientBuilder[F] = + forward(_.withRetryPolicy(RetryStrategy.default[F].policy(cfg))) + + def withDefaultRetry(cfg: Option[RetryConfig]): ClientBuilder[F] = + cfg match + case Some(c) => withDefaultRetry(c) + case None => this + + def withRetry( + retriableStatuses: Set[Status], + cfg: RetryConfig + ): ClientBuilder[F] = + forward(_.withRetryPolicy(RetryStrategy(retriableStatuses).policy(cfg))) + + def withLogging(logBody: Boolean, logger: Scribe[F]): ClientBuilder[F] = { + val mw = org.http4s.client.middleware.Logger[F]( + logHeaders = true, + logBody = logBody, + logAction = Some(logger.debug(_)) + ) + new ClientBuilder[F](delegate.withLogger(LoggerProxy(logger)), mw :: middlewares) + } + + private def forward( + f: EmberClientBuilder[F] => EmberClientBuilder[F] + ): ClientBuilder[F] = + new ClientBuilder[F](f(delegate), middlewares) + + def build: Resource[F, Client[F]] = + delegate.build.map(c0 => middlewares.foldRight(c0)(_ apply _)) + +} + +object ClientBuilder: + def apply[F[_]: Async](b: EmberClientBuilder[F]): ClientBuilder[F] = + new ClientBuilder[F](b, Nil) + + def default[F[_]: Async: Network]: ClientBuilder[F] = + apply(EmberClientBuilder.default[F]) + + extension [F[_]: Async](self: EmberClientBuilder[F]) + def lift: ClientBuilder[F] = new 
ClientBuilder[F](self, Nil) diff --git a/modules/http-client/src/main/scala/io/renku/search/http/HttpClientDsl.scala b/modules/http-client/src/main/scala/io/renku/search/http/HttpClientDsl.scala new file mode 100644 index 00000000..97037721 --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/HttpClientDsl.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http + +import org.http4s.client.dsl.Http4sClientDsl +import org.http4s.headers.Authorization +import org.http4s.{AuthScheme, BasicCredentials, Request} + +trait HttpClientDsl[F[_]] extends Http4sClientDsl[F] { + + implicit final class MoreRequestDsl(req: Request[F]) { + def withBasicAuth(cred: Option[BasicCredentials]): Request[F] = + cred.map(c => req.putHeaders(Authorization(c))).getOrElse(req) + + def withBearerToken(token: Option[String]): Request[F] = + token match + case Some(t) => + req.putHeaders( + Authorization(org.http4s.Credentials.Token(AuthScheme.Bearer, t)) + ) + case None => req + } +} diff --git a/modules/http-client/src/main/scala/io/renku/search/http/LoggerProxy.scala b/modules/http-client/src/main/scala/io/renku/search/http/LoggerProxy.scala new file mode 100644 index 00000000..29405482 --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/LoggerProxy.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
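The `HttpClientDsl` helpers just above take `Option`s so optional credentials can be threaded through without branching at each call site; a minimal sketch (endpoint and token are illustrative):

```scala
import cats.effect.IO
import io.renku.search.http.HttpClientDsl
import org.http4s.implicits.*
import org.http4s.{Method, Request}

object ExampleRequests extends HttpClientDsl[IO]:
  // no Authorization header is added for the None case
  val req: Request[IO] =
    Request[IO](Method.GET, uri"http://localhost:8983/api/collections")
      .withBearerToken(Some("secret-token"))
      .withBasicAuth(None)
```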
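Stepping back to the avro-codec changes earlier in this diff: `AvroJsonEncoder`/`AvroJsonDecoder` come with `contramap`/`emap`, so codecs for small wrapper types can be built from the primitive ones. A sketch with an illustrative `ProjectId`; it assumes `AvroReader` accepts a bare string schema on the read side the way `AvroWriter` does for the primitive encoder givens:

```scala
// Minimal sketch: wrapper-type codecs via contramap/emap. ProjectId is illustrative.
import io.renku.avro.codec.all.given
import io.renku.avro.codec.json.{AvroJsonDecoder, AvroJsonEncoder}
import org.apache.avro.SchemaBuilder

final case class ProjectId(value: String)

object ProjectId:
  given AvroJsonEncoder[ProjectId] =
    AvroJsonEncoder[String].contramap(_.value)

  given AvroJsonDecoder[ProjectId] =
    AvroJsonDecoder
      .create[String](SchemaBuilder.builder().stringType())
      .emap(s => if s.nonEmpty then Right(ProjectId(s)) else Left("empty project id"))
```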
+ */ + +package io.renku.search.http + +import scribe.Scribe + +final class LoggerProxy[F[_]](logger: Scribe[F]) + extends org.typelevel.log4cats.Logger[F] { + override def error(message: => String): F[Unit] = logger.error(message) + + override def warn(message: => String): F[Unit] = logger.warn(message) + + override def info(message: => String): F[Unit] = logger.info(message) + + override def debug(message: => String): F[Unit] = logger.debug(message) + + override def trace(message: => String): F[Unit] = logger.trace(message) + + override def error(t: Throwable)(message: => String): F[Unit] = logger.error(message, t) + + override def warn(t: Throwable)(message: => String): F[Unit] = logger.warn(message, t) + + override def info(t: Throwable)(message: => String): F[Unit] = logger.info(message, t) + + override def debug(t: Throwable)(message: => String): F[Unit] = logger.debug(message, t) + + override def trace(t: Throwable)(message: => String): F[Unit] = logger.trace(message, t) +} + +object LoggerProxy: + def apply[F[_]](delegate: Scribe[F]): org.typelevel.log4cats.Logger[F] = + new LoggerProxy[F](delegate) diff --git a/modules/http-client/src/main/scala/io/renku/search/http/ResponseLogging.scala b/modules/http-client/src/main/scala/io/renku/search/http/ResponseLogging.scala new file mode 100644 index 00000000..31e9d9b7 --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/ResponseLogging.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.http + +import cats.effect.Sync +import cats.syntax.all.* +import org.http4s.client.UnexpectedStatus +import org.http4s.{Request, Response} +import scribe.mdc.MDC +import scribe.{Level, Scribe} + +enum ResponseLogging(level: Option[Level]): + case Error extends ResponseLogging(Level.Error.some) + case Warn extends ResponseLogging(Level.Warn.some) + case Info extends ResponseLogging(Level.Info.some) + case Debug extends ResponseLogging(Level.Debug.some) + case Ignore extends ResponseLogging(None) + + def apply[F[_]: Sync](logger: Scribe[F], req: Request[F])(using + sourcecode.Pkg, + sourcecode.FileName, + sourcecode.Name, + sourcecode.Line + ): Response[F] => F[Throwable] = + level match + case Some(level) => ResponseLogging.log(logger, level, req) + case None => r => UnexpectedStatus(r.status, req.method, req.uri).pure[F] + +object ResponseLogging: + def log[F[_]: Sync]( + logger: Scribe[F], + level: Level, + req: Request[F] + )(using + mdc: MDC, + pkg: sourcecode.Pkg, + fileName: sourcecode.FileName, + name: sourcecode.Name, + line: sourcecode.Line + ): Response[F] => F[Throwable] = + resp => + resp.bodyText.compile.string.flatMap { body => + logger + .log(level, mdc, s"Unexpected status ${resp.status}: $body") + .as(UnexpectedStatus(resp.status, req.method, req.uri)) + } diff --git a/modules/http-client/src/main/scala/io/renku/search/http/RetryConfig.scala b/modules/http-client/src/main/scala/io/renku/search/http/RetryConfig.scala new file mode 100644 index 00000000..950a19b6 --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/RetryConfig.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http + +import scala.concurrent.duration.* + +final case class RetryConfig(maxWait: Duration, maxRetries: Int) + +object RetryConfig: + val default: RetryConfig = RetryConfig(50.seconds, 4) diff --git a/modules/http-client/src/main/scala/io/renku/search/http/RetryStrategy.scala b/modules/http-client/src/main/scala/io/renku/search/http/RetryStrategy.scala new file mode 100644 index 00000000..bb766ffe --- /dev/null +++ b/modules/http-client/src/main/scala/io/renku/search/http/RetryStrategy.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
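`ResponseLogging` above slots into http4s' `expectOr`, logging the body of an unexpected response at the chosen level before raising `UnexpectedStatus`. A minimal sketch (target URL and `String` result type are illustrative):

```scala
import cats.effect.IO
import io.renku.search.http.ResponseLogging
import org.http4s.Request
import org.http4s.client.Client
import org.http4s.implicits.*

def ping(client: Client[IO], logger: scribe.Scribe[IO]): IO[String] =
  val req = Request[IO](uri = uri"http://localhost:8983/solr/admin/ping")
  // non-2xx responses are logged at WARN and turned into UnexpectedStatus
  client.expectOr[String](req)(ResponseLogging.Warn(logger, req))
```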
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http + +import org.http4s.client.middleware.RetryPolicy +import org.http4s.headers.`Idempotency-Key` +import org.http4s.{Request, Response, Status} + +final class RetryStrategy[F[_]](retriableStatuses: Set[Status]) { + + def policy(cfg: RetryConfig): RetryPolicy[F] = + RetryPolicy( + backoff = RetryPolicy.exponentialBackoff(cfg.maxWait, cfg.maxRetries), + retriable = isRetryRequest + ) + + private def isRetryRequest( + req: Request[F], + result: Either[Throwable, Response[F]] + ): Boolean = + (req.method.isIdempotent || req.headers.contains[`Idempotency-Key`]) && + RetryPolicy.isErrorOrStatus(result, retriableStatuses) +} + +object RetryStrategy: + def apply[F[_]](retriableStatuses: Set[Status]): RetryStrategy[F] = + new RetryStrategy[F](retriableStatuses) + + def default[F[_]]: RetryStrategy[F] = + apply(RetryPolicy.RetriableStatuses) + + def defaultPolicy[F[_]](cfg: RetryConfig): RetryPolicy[F] = default[F].policy(cfg) diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerDecodeFailure.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerDecodeFailure.scala new file mode 100644 index 00000000..4a6618a7 --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerDecodeFailure.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
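Putting the http-client pieces together: `ClientBuilder` wraps `EmberClientBuilder` and `build` applies the collected middlewares. A sketch that takes the `Scribe[IO]` as a parameter, since obtaining one is outside this diff:

```scala
import scala.concurrent.duration.*

import cats.effect.{IO, Resource}
import io.renku.search.http.{ClientBuilder, RetryConfig}
import org.http4s.client.Client

// timeout, retry and request logging configured in one place; the logger is passed in by the caller
def buildClient(logger: scribe.Scribe[IO]): Resource[IO, Client[IO]] =
  ClientBuilder
    .default[IO]
    .withDefaultRetry(RetryConfig.default)
    .withLogging(logBody = false, logger)
    .withTimeout(30.seconds)
    .build
```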
+ */ + +package io.renku.search.http.borer + +import io.bullet.borer.derivation.MapBasedCodecs.* +import io.bullet.borer.{Borer, Encoder} +import org.http4s._ + +final case class BorerDecodeFailure(respString: String, error: Borer.Error[?]) + extends DecodeFailure { + + override val message: String = s"${error.getMessage}: $respString" + + override val cause: Option[Throwable] = Option(error.getCause) + + def toHttpResponse[F[_]](httpVersion: HttpVersion): Response[F] = + Response(status = Status.BadRequest).withEntity(this) +} + +object BorerDecodeFailure: + given Encoder[Borer.Error[?]] = Encoder.forString.contramap(_.getMessage) + given Encoder[BorerDecodeFailure] = deriveEncoder + given [F[_]]: EntityEncoder[F, BorerDecodeFailure] = + BorerEntities.encodeEntityJson[F, BorerDecodeFailure] diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntities.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntities.scala new file mode 100644 index 00000000..0e16fcce --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntities.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.http.borer + +import cats.data.EitherT +import cats.effect.* +import cats.syntax.all.* +import fs2.Chunk + +import io.bullet.borer.* +import org.http4s.* +import org.http4s.headers.* + +object BorerEntities: + def decodeEntityJson[F[_]: Async, A: Decoder]: EntityDecoder[F, A] = + EntityDecoder.decodeBy(MediaType.application.json)(decodeJson) + + def decodeEntityCbor[F[_]: Async, A: Decoder]: EntityDecoder[F, A] = + EntityDecoder.decodeBy(MediaType.application.cbor)(decodeCbor) + + def decodeJson[F[_]: Async, A: Decoder](media: Media[F]): DecodeResult[F, A] = + EitherT(StreamProvider(media.body).flatMap { implicit input => + for { + res <- Async[F].delay(Json.decode(input).to[A].valueEither) + } yield res.left.map(BorerDecodeFailure("", _)) + }) + + def decodeCbor[F[_]: Async, A: Decoder](media: Media[F]): DecodeResult[F, A] = + EitherT(StreamProvider(media.body).flatMap { implicit input => + for { + res <- Async[F].delay(Cbor.decode(input).to[A].valueEither) + } yield res.left.map(BorerDecodeFailure("", _)) + }) + + def encodeEntityJson[F[_], A: Encoder]: EntityEncoder[F, A] = + EntityEncoder.simple(`Content-Type`(MediaType.application.json))(a => + Chunk.array(Json.encode(a).toByteArray) + ) + + def encodeEntityCbor[F[_], A: Encoder]: EntityEncoder[F, A] = + EntityEncoder.simple(`Content-Type`(MediaType.application.cbor))(a => + Chunk.array(Cbor.encode(a).toByteArray) + ) diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityCborCodec.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityCborCodec.scala new file mode 100644 index 00000000..9efd0c3b --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityCborCodec.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http.borer + +import cats.effect.Async +import io.bullet.borer.{Decoder, Encoder} +import org.http4s.{EntityDecoder, EntityEncoder} + +trait BorerEntityCborCodec: + given [F[_]: Async, A: Decoder]: EntityDecoder[F, A] = + BorerEntities.decodeEntityCbor[F, A] + + given [F[_], A: Encoder]: EntityEncoder[F, A] = + BorerEntities.encodeEntityCbor[F, A] + +object BorerEntityCborCodec extends BorerEntityCborCodec diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityJsonCodec.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityJsonCodec.scala new file mode 100644 index 00000000..decf8927 --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/BorerEntityJsonCodec.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). 
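On the server side, `BorerEntities.encodeEntityJson` above gives http4s an `EntityEncoder` straight from a Borer `Encoder`. A sketch assuming http4s-dsl is available (it is not a dependency declared in this diff); `ServiceStatus` is illustrative:

```scala
import cats.effect.IO
import io.bullet.borer.Encoder
import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder
import io.renku.search.http.borer.BorerEntities
import org.http4s.dsl.io.*
import org.http4s.{EntityEncoder, HttpRoutes}

final case class ServiceStatus(name: String, healthy: Boolean)
object ServiceStatus:
  given Encoder[ServiceStatus] = deriveEncoder
  given EntityEncoder[IO, ServiceStatus] =
    BorerEntities.encodeEntityJson[IO, ServiceStatus]

// responses are rendered as Borer-encoded JSON with Content-Type application/json
val statusRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] { case GET -> Root / "status" =>
  Ok(ServiceStatus("search", healthy = true))
}
```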
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http.borer + +import cats.effect.Async +import io.bullet.borer.{Decoder, Encoder} +import org.http4s.{EntityDecoder, EntityEncoder} + +trait BorerEntityJsonCodec: + given [F[_]: Async, A: Decoder]: EntityDecoder[F, A] = + BorerEntities.decodeEntityJson[F, A] + + given [F[_], A: Encoder]: EntityEncoder[F, A] = + BorerEntities.encodeEntityJson[F, A] + +object BorerEntityJsonCodec extends BorerEntityJsonCodec diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/Http4sJsonCodec.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/Http4sJsonCodec.scala new file mode 100644 index 00000000..84815087 --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/Http4sJsonCodec.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.http.borer + +import io.bullet.borer.* +import org.http4s.* + +trait Http4sJsonCodec: + given Encoder[Uri] = Encoder.forString.contramap(_.renderString) + +object Http4sJsonCodec extends Http4sJsonCodec diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamDecode.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamDecode.scala new file mode 100644 index 00000000..6d1eb8e2 --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamDecode.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
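For clients, importing the givens from `BorerEntityJsonCodec` means only a Borer `Decoder` is needed for the payload type. A sketch with an illustrative `Result` type and URL:

```scala
import cats.effect.IO
import io.bullet.borer.Decoder
import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder
import io.renku.search.http.borer.BorerEntityJsonCodec.given
import org.http4s.Request
import org.http4s.client.Client
import org.http4s.implicits.*

final case class Result(id: String, name: String)
object Result:
  given Decoder[Result] = deriveDecoder

// the EntityDecoder[IO, Result] is supplied by the BorerEntityJsonCodec givens
def fetchResult(client: Client[IO]): IO[Result] =
  client.expect[Result](Request[IO](uri = uri"http://localhost:8080/api/result"))
```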
+ */ + +package io.renku.search.http.borer + +import fs2.* + +import _root_.io.bullet.borer.* + +/** Caveat: the buffer-size must be large enough to decode at least one A */ +object StreamDecode { + + def decodeJson[F[_]: RaiseThrowable, A: Decoder]( + in: Stream[F, Byte], + bufferSize: Int = 64 * 1024 + ): Stream[F, A] = + decode0[F, A](ba => Json.decode(ba).withPrefixOnly.to[A])(in, bufferSize) + + private def decode0[F[_]: RaiseThrowable, A]( + decode: Input[Array[Byte]] => DecodingSetup.Sealed[A] + )( + in: Stream[F, Byte], + bufferSize: Int + ): Stream[F, A] = + in.dropLastIf(_ == 10) + .repeatPull(_.unconsN(bufferSize, allowFewer = true).flatMap { + case Some((hd, tl)) => + decodeCont[A](decode)(hd) match { + case Right((v, remain)) => + Pull.output(Chunk.from(v)).as(Some(Stream.chunk(remain) ++ tl)) + case Left(ex) => + Pull.raiseError(ex) + } + case None => + Pull.pure(None) + }) + + private[this] val curlyClose = Chunk('}'.toByte) + private def decodeCont[A]( + decode: Input[Array[Byte]] => DecodingSetup.Sealed[A] + )(input: Chunk[Byte]): Either[Throwable, (Vector[A], Chunk[Byte])] = { + @annotation.tailrec + def go( + in: Input[Array[Byte]], + pos: Int, + result: Vector[A] + ): Either[Throwable, (Vector[A], Chunk[Byte])] = + decode(in).valueAndInputEither match + case Right((v, rest)) => + val rb = rest.asInstanceOf[Input[Array[Byte]]] + val nextPos = rb.cursor.toInt + go(rb.unread(1), nextPos - 1, result :+ v) + + case Left(ex) => + if (result.isEmpty) Left(ex) + else { + // the position seems sometimes off by 1, standing on the last char closing the json object + val hack = { + val next = input.drop(pos) + if (next == curlyClose) Chunk.empty + else next + } + Right(result -> hack) + } + + go(Input.fromByteArray(input.toArray), 0, Vector.empty) + } +} diff --git a/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamProvider.scala b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamProvider.scala new file mode 100644 index 00000000..ff281f4d --- /dev/null +++ b/modules/http4s-borer/src/main/scala/io/renku/search/http/borer/StreamProvider.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
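`StreamDecode.decodeJson` above repeatedly decodes prefixes of fixed-size chunks, so a byte stream of back-to-back JSON objects needs no separators. A small sketch with an illustrative `Item` type:

```scala
import cats.effect.IO
import fs2.Stream
import io.bullet.borer.Decoder
import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder
import io.renku.search.http.borer.StreamDecode

final case class Item(id: Int)
object Item:
  given Decoder[Item] = deriveDecoder

val bytes: Stream[IO, Byte] =
  Stream.emits("""{"id":1}{"id":2}{"id":3}""".getBytes.toList).covary[IO]

// decoded one object at a time; the default 64 KiB buffer must fit at least one Item.
// items.compile.toList should yield List(Item(1), Item(2), Item(3))
val items: Stream[IO, Item] = StreamDecode.decodeJson[IO, Item](bytes)
```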
+ */ + +package io.renku.search.http.borer + +import cats.effect.* +import cats.syntax.all.* +import fs2.Stream + +import io.bullet.borer.* +import scodec.bits.ByteVector + +object StreamProvider: + + def apply[F[_]: Sync]( + in: Stream[F, Byte] + ): F[Input[Array[Byte]]] = + in.compile.to(ByteVector).map(bv => Input.fromByteArray(bv.toArray)) diff --git a/modules/messages/src/main/avro/messages.avdl b/modules/messages/src/main/avro/messages.avdl index 89b5b11d..2cfb19b7 100644 --- a/modules/messages/src/main/avro/messages.avdl +++ b/modules/messages/src/main/avro/messages.avdl @@ -6,6 +6,7 @@ protocol Messages { /* An example record for a "project-created-event" */ record ProjectCreated { + string id; string name; string description; string? owner; @@ -14,6 +15,7 @@ protocol Messages { /* A project got updated */ record ProjectUpdated { + string id; string name; string @aliases(["oldDescription"]) description; Shapes icon; @@ -22,6 +24,7 @@ protocol Messages { } record ProjectDeleted { + string id; string name; timestamp_ms deletedAt; } @@ -30,4 +33,11 @@ protocol Messages { /* record ProjectMsg { union { ProjectCreated, ProjectUpdated, ProjectDeleted } message; } */ + + /* An example record for a Project Entity returned from the Search API */ + record ProjectEntity { + string id; + string name; + string description; + } } \ No newline at end of file diff --git a/modules/messages/src/test/scala/io/renku/messages/SerializeDeserializeTest.scala b/modules/messages/src/test/scala/io/renku/messages/SerializeDeserializeTest.scala index 7c2b9e57..aabca478 100644 --- a/modules/messages/src/test/scala/io/renku/messages/SerializeDeserializeTest.scala +++ b/modules/messages/src/test/scala/io/renku/messages/SerializeDeserializeTest.scala @@ -25,11 +25,13 @@ import munit.FunSuite import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.UUID class SerializeDeserializeTest extends FunSuite { test("serialize and deserialize ProjectCreated") { val data = ProjectCreated( + UUID.randomUUID().toString, "my-project", "a description for it", None, @@ -45,6 +47,7 @@ class SerializeDeserializeTest extends FunSuite { test("serialize and deserialize ProjectUpdated") { val data1 = ProjectUpdated( + UUID.randomUUID().toString, "my-project", "a description for it", Shapes.CIRCLE, diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 160e22a4..a63d32f9 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -18,8 +18,11 @@ package io.renku.queue.client +import cats.effect.{Async, Resource} import fs2.Stream +import io.renku.redis.client.{RedisQueueClient, RedisUrl} import scodec.bits.ByteVector +import scribe.Scribe trait QueueClient[F[_]] { @@ -39,3 +42,7 @@ trait QueueClient[F[_]] { def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] } + +object QueueClient: + def apply[F[_]: Async: Scribe](redisUrl: RedisUrl): Resource[F, QueueClient[F]] = + RedisQueueClient[F](redisUrl) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisServer.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisServer.scala deleted file mode 100644 index f5b8d541..00000000 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisServer.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 
2024 Swiss Data Science Center (SDSC) - * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and - * Eidgenössische Technische Hochschule Zürich (ETHZ). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.renku.redis.client.util - -import cats.syntax.all._ - -import java.util.concurrent.atomic.AtomicBoolean -import scala.sys.process._ -import scala.util.Try - -object RedisServer extends RedisServer("graph", port = 6379) - -class RedisServer(module: String, port: Int) { - - val url: String = s"redis://localhost:$port" - - // When using a local Redis for development, use this env variable - // to not start a Redis server via docker for the tests - private val skipServer: Boolean = sys.env.contains("NO_REDIS") - - private val containerName = s"$module-test-redis" - private val image = "redis:7.2.4-alpine" - private val startCmd = s"""|docker run --rm - |--name $containerName - |-p $port:6379 - |-d $image""".stripMargin - private val isRunningCmd = s"docker container ls --filter 'name=$containerName'" - private val stopCmd = s"docker stop -t5 $containerName" - private val readyCmd = "redis-cli -h 127.0.0.1 -p 6379 PING" - private val isReadyCmd = s"docker exec $containerName sh -c '$readyCmd'" - private val wasRunning = new AtomicBoolean(false) - - def start(): Unit = synchronized { - if (skipServer) println("Not starting Redis via docker") - else if (checkRunning) () - else { - println(s"Starting Redis container for '$module' from '$image' image") - startContainer() - var rc = 1 - while (rc != 0) { - Thread.sleep(500) - rc = isReadyCmd.! - if (rc == 0) println(s"Redis container for '$module' started on port $port") - } - } - } - - private def checkRunning: Boolean = { - val out = isRunningCmd.lazyLines.toList - val isRunning = out.exists(_ contains containerName) - wasRunning.set(isRunning) - isRunning - } - - private def startContainer(): Unit = { - val retryOnContainerFailedToRun: Throwable => Unit = { - case ex if ex.getMessage contains "Nonzero exit value: 125" => - Thread.sleep(500); start() - case ex => throw ex - } - Try(startCmd.!!).fold(retryOnContainerFailedToRun, _ => ()) - } - - def stop(): Unit = - if (!skipServer && !wasRunning.get()) { - println(s"Stopping Redis container for '$module'") - stopCmd.!! - () - } - - def forceStop(): Unit = - if (!skipServer) { - println(s"Stopping Redis container for '$module'") - stopCmd.!! 
- () - } -} diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index 57c73913..3d49065a 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -28,6 +28,7 @@ import dev.profunktor.redis4cats.{Redis, RedisCommands} import io.lettuce.core.RedisConnectionException import io.renku.queue.client.QueueClient import io.renku.redis.client.RedisQueueClient +import io.renku.servers.RedisServer trait RedisSpec: self: munit.Suite => @@ -66,5 +67,5 @@ trait RedisSpec: : RedisClient => Resource[IO, RedisCommands[IO, String, String]] = Redis[IO].fromClient(_, RedisCodec.Utf8) - override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] = + override def munitFixtures: Seq[Fixture[_]] = List(withRedisClient) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala new file mode 100644 index 00000000..845c6fca --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.provision + +import cats.effect.{Async, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.io.net.Network +import io.renku.avro.codec.AvroReader +import io.renku.avro.codec.decoders.all.given +import io.renku.messages.ProjectCreated +import io.renku.queue.client.{Message, QueueClient, QueueName} +import io.renku.redis.client.RedisUrl +import io.renku.search.solr.client.SearchSolrClient +import io.renku.search.solr.documents.Project +import io.renku.solr.client.SolrConfig +import scribe.Scribe + +trait SearchProvisioner[F[_]]: + def provisionSolr: F[Unit] + +object SearchProvisioner: + def apply[F[_]: Async: Network: Scribe]( + queueName: QueueName, + redisUrl: RedisUrl, + solrConfig: SolrConfig + ): Resource[F, SearchProvisioner[F]] = + QueueClient[F](redisUrl) + .flatMap(qc => SearchSolrClient[F](solrConfig).tupleLeft(qc)) + .map { case (qc, sc) => new SearchProvisionerImpl[F](queueName, qc, sc) } + +private class SearchProvisionerImpl[F[_]: Async]( + queueName: QueueName, + queueClient: QueueClient[F], + solrClient: SearchSolrClient[F] +) extends SearchProvisioner[F]: + + override def provisionSolr: F[Unit] = + queueClient + .acquireEventsStream(queueName, chunkSize = 1, maybeOffset = None) + .map(decodeEvent) + .flatMap(decoded => Stream.emits[F, ProjectCreated](decoded)) + .evalMap(pushToSolr) + .compile + .drain + + private val avro = AvroReader(ProjectCreated.SCHEMA$) + + private def decodeEvent(message: Message): Seq[ProjectCreated] = + avro.read[ProjectCreated](message.payload) + + private def pushToSolr(pc: ProjectCreated): F[Unit] = + solrClient + .insertProject( + Project(id = pc.id, name = pc.name, description = pc.description) + ) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala deleted file mode 100644 index c9e3adf7..00000000 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2024 Swiss Data Science Center (SDSC) - * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and - * Eidgenössische Technische Hochschule Zürich (ETHZ). - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
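`SearchProvisioner.apply` above composes the Redis queue client and the search Solr client into a single resource, so an entry point only has to `use` it. A sketch that takes `QueueName`, `RedisUrl` and `SolrConfig` as parameters, since their construction is not shown in this diff:

```scala
import cats.effect.IO
import io.renku.queue.client.QueueName
import io.renku.redis.client.RedisUrl
import io.renku.search.provision.SearchProvisioner
import io.renku.solr.client.SolrConfig

// runs until the underlying Redis stream terminates (or the surrounding fiber is canceled)
def runProvisioning(queue: QueueName, redisUrl: RedisUrl, solrConfig: SolrConfig)(using
    logger: scribe.Scribe[IO]
): IO[Unit] =
  SearchProvisioner[IO](queue, redisUrl, solrConfig).use(_.provisionSolr)
```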
- */ - -package io.renku.search.provision - -import cats.effect.{Clock, IO} -import fs2.* -import fs2.concurrent.SignallingRef -import io.renku.avro.codec.AvroIO -import io.renku.messages.ProjectCreated -import io.renku.avro.codec.encoders.all.given -import io.renku.avro.codec.decoders.all.given -import io.renku.redis.client.RedisClientGenerators -import io.renku.redis.client.RedisClientGenerators.* -import io.renku.redis.client.util.RedisSpec -import munit.CatsEffectSuite - -import java.time.temporal.ChronoUnit - -class SearchProvisionSpec extends CatsEffectSuite with RedisSpec: - - val avro = AvroIO(ProjectCreated.SCHEMA$) - - test("can enqueue and dequeue events"): - withRedisClient.asQueueClient().use { client => - val queue = RedisClientGenerators.queueNameGen.generateOne - for - dequeued <- SignallingRef.of[IO, List[ProjectCreated]](Nil) - - now <- Clock[IO].realTimeInstant.map(_.truncatedTo(ChronoUnit.MILLIS)) - - message1 = ProjectCreated("my project", "my description", Some("myself"), now) - _ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) - - streamingProcFiber <- client - .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) - .evalTap(m => IO.println(avro.read[ProjectCreated](m.payload))) - .evalMap(event => - dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _) - ) - .compile - .drain - .start - _ <- dequeued.waitUntil(_ == List(message1)) - - message2 = message1.copy(name = "my other project") - _ <- client.enqueue(queue, avro.write(Seq(message2))) - _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) - - _ <- streamingProcFiber.cancel - yield () - } diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala new file mode 100644 index 00000000..d559eb97 --- /dev/null +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.provision + +import cats.effect.{Clock, IO, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.concurrent.SignallingRef +import io.renku.avro.codec.AvroIO +import io.renku.avro.codec.encoders.all.given +import io.renku.messages.ProjectCreated +import io.renku.redis.client.RedisClientGenerators +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.util.RedisSpec +import io.renku.search.solr.client.SearchSolrSpec +import io.renku.search.solr.documents.Project +import munit.CatsEffectSuite + +import java.time.temporal.ChronoUnit +import scala.concurrent.duration.* + +class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSolrSpec: + + private val avro = AvroIO(ProjectCreated.SCHEMA$) + + test("can fetch events and send them to Solr"): + val queue = RedisClientGenerators.queueNameGen.generateOne + + (withRedisClient.asQueueClient() >>= withSearchSolrClient().tupleLeft) + .use { case (queueClient, solrClient) => + val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) + for + solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) + + provisioningFiber <- provisioner.provisionSolr.start + + message1 <- generateProjectCreated("project", "description", Some("myself")) + _ <- queueClient.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.findAllProjects) + .flatMap(Stream.emits(_)) + .evalTap(IO.println) + .evalMap(d => solrDocs.update(_ + d)) + .compile + .drain + .start + + _ <- solrDocs.waitUntil(_ contains toSolrDocument(message1)) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + yield () + } + + private def generateProjectCreated( + name: String, + description: String, + owner: Option[String] + ): IO[ProjectCreated] = + for + now <- Clock[IO].realTimeInstant.map(_.truncatedTo(ChronoUnit.MILLIS)) + uuid <- IO.randomUUID + yield ProjectCreated(uuid.toString, name, description, owner, now) + + private def toSolrDocument(created: ProjectCreated): Project = + Project(created.id, created.name, created.description) + + override def munitFixtures: Seq[Fixture[_]] = + List(withRedisClient, withSearchSolrClient) diff --git a/modules/search-solr-client/README.md b/modules/search-solr-client/README.md new file mode 100644 index 00000000..e63b74cf --- /dev/null +++ b/modules/search-solr-client/README.md @@ -0,0 +1,3 @@ +# search-solr-client + +This module brings algebras for renku-search and solr. diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala new file mode 100644 index 00000000..2e849c64 --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.solr.client + +import cats.effect.{Async, Resource} +import fs2.io.net.Network +import io.renku.search.solr.documents.Project +import io.renku.solr.client.{SolrClient, SolrConfig} + +trait SearchSolrClient[F[_]]: + + def insertProject(project: Project): F[Unit] + + def findAllProjects: F[List[Project]] + +object SearchSolrClient: + def apply[F[_]: Async: Network]( + solrConfig: SolrConfig + ): Resource[F, SearchSolrClient[F]] = + SolrClient[F](solrConfig).map(new SearchSolrClientImpl[F](_)) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala new file mode 100644 index 00000000..987ef0dc --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.solr.client + +import cats.effect.Async +import cats.syntax.all.* +import io.renku.search.solr.documents.Project +import io.renku.search.solr.schema.EntityDocumentSchema +import io.renku.solr.client.{QueryString, SolrClient} + +class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) + extends SearchSolrClient[F]: + + override def insertProject(project: Project): F[Unit] = + solrClient.insert(Seq(project)).void + + override def findAllProjects: F[List[Project]] = + solrClient + .query[Project]( + QueryString(s"${EntityDocumentSchema.Fields.entityType}:${Project.entityType}") + ) + .map(_.responseBody.docs.toList) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Project.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Project.scala new file mode 100644 index 00000000..26981c35 --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Project.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.solr.documents + +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.bullet.borer.{Decoder, Encoder} +import io.renku.solr.client.EncoderSupport.deriveWithDiscriminator + +final case class Project(id: String, name: String, description: String) + +object Project: + val entityType: String = "Project" + + given Encoder[Project] = deriveWithDiscriminator + given Decoder[Project] = deriveDecoder diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Discriminator.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Discriminator.scala new file mode 100644 index 00000000..921add64 --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Discriminator.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.solr.schema + +opaque type Discriminator = String +object Discriminator: + def apply(name: String): Discriminator = name + + extension (self: Discriminator) def name: String = self + + val project: Discriminator = "project" diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/EntityDocumentSchema.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/EntityDocumentSchema.scala new file mode 100644 index 00000000..0c6e1aff --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/EntityDocumentSchema.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.solr.schema + +import io.renku.solr.client.schema.* + +object EntityDocumentSchema: + + object Fields: + val id: FieldName = FieldName("id") + val entityType: FieldName = FieldName("_type") + val name: FieldName = FieldName("name") + val description: FieldName = FieldName("description") + + val initialEntityDocumentAdd: Seq[SchemaCommand] = Seq( + SchemaCommand.Add(FieldType.str(TypeName("entityType"))), + SchemaCommand.Add(FieldType.str(TypeName("name"))), + SchemaCommand.Add(FieldType.text(TypeName("description"), Analyzer.classic)), + SchemaCommand.Add(Field(Fields.entityType, TypeName("entityType"))), + SchemaCommand.Add(Field(Fields.name, TypeName("name"))), + SchemaCommand.Add(Field(Fields.description, TypeName("description"))) + ) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Migrations.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Migrations.scala new file mode 100644 index 00000000..71d0984e --- /dev/null +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/schema/Migrations.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.solr.schema + +import io.renku.solr.client.migration.SchemaMigration + +object Migrations { + + val all: Seq[SchemaMigration] = Seq( + SchemaMigration(version = 1L, EntityDocumentSchema.initialEntityDocumentAdd) + ) +} diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientGenerators.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientGenerators.scala new file mode 100644 index 00000000..04828969 --- /dev/null +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientGenerators.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.renku.search.solr.client
+
+import io.renku.search.solr.documents.Project
+import org.scalacheck.Gen
+
+object SearchSolrClientGenerators:
+
+  def projectDocumentGen(name: String, desc: String): Gen[Project] =
+    Gen.uuid.map(uuid =>
+      Project(uuid.toString, name, desc)
+    )
+
+  extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala
new file mode 100644
index 00000000..88318eb8
--- /dev/null
+++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.renku.search.solr.client
+
+import cats.effect.IO
+import io.renku.search.solr.client.SearchSolrClientGenerators.*
+import munit.CatsEffectSuite
+
+class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec:
+
+  test("be able to insert and fetch a project document"):
+    withSearchSolrClient().use { client =>
+      val project =
+        projectDocumentGen("solr-project", "solr project description").generateOne
+      for {
+        _ <- client.insertProject(project)
+        r <- client.findAllProjects
+        _ = assert(r contains project)
+      } yield ()
+    }
diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSpec.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSpec.scala
new file mode 100644
index 00000000..9d59f23a
--- /dev/null
+++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.renku.search.solr.client + +import cats.effect.{IO, Resource} +import io.renku.search.solr.schema.Migrations +import io.renku.solr.client.SolrClient +import io.renku.solr.client.migration.SchemaMigrator +import io.renku.solr.client.util.SolrSpec + +trait SearchSolrSpec extends SolrSpec: + self: munit.Suite => + + val withSearchSolrClient: Fixture[Resource[IO, SearchSolrClient[IO]]] = + new Fixture[Resource[IO, SearchSolrClient[IO]]]("search-solr"): + + def apply(): Resource[IO, SearchSolrClient[IO]] = + SolrClient[IO](solrConfig.copy(core = server.searchCoreName)) + .evalTap(SchemaMigrator[IO](_).migrate(Migrations.all).attempt.void) + .map(new SearchSolrClientImpl[IO](_)) + + override def beforeAll(): Unit = + server.start() + + override def afterAll(): Unit = + server.stop() + + override def munitFixtures: Seq[Fixture[_]] = + List(withSearchSolrClient) diff --git a/modules/solr-client/README.md b/modules/solr-client/README.md new file mode 100644 index 00000000..f5b52466 --- /dev/null +++ b/modules/solr-client/README.md @@ -0,0 +1,3 @@ +# solr-client + +This module brings tooling to work with Solr. diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/DeleteRequest.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/DeleteRequest.scala new file mode 100644 index 00000000..0ecd8522 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/DeleteRequest.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder +import io.bullet.borer.{Encoder, Writer} + +final private[client] case class DeleteRequest(query: String) + +private[client] object DeleteRequest: + given Encoder[DeleteRequest] = { + val e: Encoder[DeleteRequest] = deriveEncoder[DeleteRequest] + new Encoder[DeleteRequest]: + override def write(w: Writer, value: DeleteRequest) = + w.writeMap(Map("delete" -> value))(Encoder[String], e) + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/EncoderSupport.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/EncoderSupport.scala new file mode 100644 index 00000000..832a599d --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/EncoderSupport.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import io.bullet.borer.{Encoder, Writer} +import scala.deriving.* +import scala.compiletime.* + +object EncoderSupport { + + inline def deriveWithDiscriminator[A <: Product](using + Mirror.ProductOf[A] + ): Encoder[A] = + Macros.createEncoder[String, A]("_type") + + private object Macros { + + final inline def createEncoder[K: Encoder, T <: Product](discriminatorName: K)(using + m: Mirror.ProductOf[T] + ): Encoder[T] = + val names = summonLabels[m.MirroredElemLabels] + val encoders = summonEncoder[m.MirroredElemTypes] + + new Encoder[T]: + def write(w: Writer, value: T): Writer = + val kind = value.asInstanceOf[Product].productPrefix + val values = value.asInstanceOf[Product].productIterator.toList + w.writeMapOpen(names.size + 1) + w.writeMapMember(discriminatorName, kind) + names.zip(values).zip(encoders).foreach { case ((k, v), e) => + w.writeMapMember(k, v)(Encoder[String], e.asInstanceOf[Encoder[Any]]) + } + w.writeMapClose() + + inline def summonEncoder[A <: Tuple]: List[Encoder[_]] = + inline erasedValue[A] match + case _: EmptyTuple => Nil + case _: (t *: ts) => summonInline[Encoder[t]] :: summonEncoder[ts] + + inline def summonLabels[A <: Tuple]: List[String] = + inline erasedValue[A] match + case _: EmptyTuple => Nil + case _: (t *: ts) => constValue[t].toString :: summonLabels[ts] + } + +} diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/InsertResponse.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/InsertResponse.scala new file mode 100644 index 00000000..88f46e50 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/InsertResponse.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder + +final case class InsertResponse(responseHeader: ResponseHeader) + +object InsertResponse: + given Decoder[InsertResponse] = deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala new file mode 100644 index 00000000..73cb1ae7 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import io.bullet.borer.Encoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder +import io.renku.solr.client.schema.FieldName + +final case class QueryData( + query: String, + filter: Seq[String], + limit: Int, + offset: Int, + fields: Seq[FieldName], + params: Map[String, String] +): + def nextPage: QueryData = + copy(offset = offset + limit) + + def withHighLight(fields: List[FieldName], pre: String, post: String): QueryData = + copy(params = + params ++ Map( + "hl" -> "on", + "hl.requireFieldMatch" -> "true", + "hl.fl" -> fields.map(_.name).mkString(","), + "hl.simple.pre" -> pre, + "hl.simple.post" -> post + ) + ) + +object QueryData: + given Encoder[QueryData] = deriveEncoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala new file mode 100644 index 00000000..e72b70d6 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.bullet.borer.derivation.key + +final case class QueryResponse[A]( + responseHeader: ResponseHeader, + @key("response") responseBody: ResponseBody[A] +) + +object QueryResponse: + given [A](using Decoder[A]): Decoder[QueryResponse[A]] = + deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryString.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryString.scala new file mode 100644 index 00000000..b4109cbd --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryString.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +final case class QueryString(q: String, limit: Int, offset: Int) + +object QueryString: + def apply(q: String): QueryString = QueryString(q, 50, 0) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseBody.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseBody.scala new file mode 100644 index 00000000..79e99a18 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseBody.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder + +final case class ResponseBody[A]( + numFound: Long, + start: Long, + numFoundExact: Boolean, + docs: Seq[A] +) + +object ResponseBody: + given [A](using Decoder[A]): Decoder[ResponseBody[A]] = deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseHeader.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseHeader.scala new file mode 100644 index 00000000..2ad6225c --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/ResponseHeader.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.bullet.borer.derivation.key + +final case class ResponseHeader( + status: Int, + @key("QTime") queryTime: Long, + params: Map[String, String] = Map() +) + +object ResponseHeader: + given Decoder[ResponseHeader] = deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala new file mode 100644 index 00000000..33a7faea --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import cats.effect.{Async, Resource} +import fs2.io.net.Network +import io.bullet.borer.{Decoder, Encoder} +import io.renku.search.http.{ClientBuilder, ResponseLogging, RetryConfig} +import io.renku.solr.client.schema.SchemaCommand +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.ember.client.EmberClientBuilder.default + +trait SolrClient[F[_]]: + def modifySchema( + cmds: Seq[SchemaCommand], + onErrorLog: ResponseLogging = ResponseLogging.Error + ): F[Unit] + + def query[A: Decoder](q: QueryString): F[QueryResponse[A]] + + def delete(q: QueryString): F[Unit] + + def insert[A: Encoder](docs: Seq[A]): F[InsertResponse] + +object SolrClient: + def apply[F[_]: Async: Network](config: SolrConfig): Resource[F, SolrClient[F]] = + ClientBuilder(EmberClientBuilder.default[F]) + .withDefaultRetry(RetryConfig.default) + .withLogging(logBody = config.logMessageBodies, scribe.cats.effect[F]) + .build + .map(new SolrClientImpl[F](config, _)) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala new file mode 100644 index 00000000..d0bac305 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import cats.effect.Async +import cats.syntax.all.* +import io.bullet.borer.{Decoder, Encoder} +import io.renku.search.http.borer.BorerEntityJsonCodec +import io.renku.search.http.{HttpClientDsl, ResponseLogging} +import io.renku.solr.client.schema.{SchemaCommand, SchemaJsonCodec} +import org.http4s.client.Client +import org.http4s.{Method, Uri} + +import scala.concurrent.duration.Duration + +private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client[F]) + extends SolrClient[F] + with HttpClientDsl[F] + with SchemaJsonCodec + with BorerEntityJsonCodec + with SolrEntityCodec: + private[this] val logger = scribe.cats.effect[F] + private[this] val solrUrl: Uri = config.baseUrl / config.core + + def modifySchema(cmds: Seq[SchemaCommand], onErrorLog: ResponseLogging): F[Unit] = + val req = Method.POST(cmds, (solrUrl / "schema").withQueryParam("commit", "true")) + underlying.expectOr[String](req)(onErrorLog(logger, req)).void + + def query[A: Decoder](q: QueryString): F[QueryResponse[A]] = + val req = Method.POST( + io.renku.solr.client.QueryData(q.q, Nil, q.limit, q.offset, Nil, Map.empty), + solrUrl / "query" + ) + underlying + .expectOr[QueryResponse[A]](req)(ResponseLogging.Error(logger, req)) + .flatTap(r => logger.trace(s"Query response: $r")) + + def delete(q: QueryString): F[Unit] = + val req = Method.POST(DeleteRequest(q.q), makeUpdateUrl) + underlying + .expectOr[InsertResponse](req)(ResponseLogging.Error(logger, req)) + .flatTap(r => logger.trace(s"Solr delete response: $r")) + .void + + def insert[A: Encoder](docs: Seq[A]): F[InsertResponse] = + val req = Method.POST(docs, makeUpdateUrl) + underlying + .expectOr[InsertResponse](req)(ResponseLogging.Error(logger, req)) + .flatTap(r => logger.trace(s"Solr inserted response: $r")) + + private def makeUpdateUrl = { + val base = solrUrl / "update" + config.commitWithin match + case Some(d) if d == Duration.Zero => base.withQueryParam("commit", "true") + case Some(d) => base.withQueryParam("commitWithin", d.toMillis) + case None => base + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrConfig.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrConfig.scala new file mode 100644 index 00000000..02e48169 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrConfig.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import org.http4s.Uri + +import scala.concurrent.duration.FiniteDuration + +final case class SolrConfig( + baseUrl: Uri, + core: String, + commitWithin: Option[FiniteDuration], + logMessageBodies: Boolean +) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrEntityCodec.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrEntityCodec.scala new file mode 100644 index 00000000..3fc77b7b --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrEntityCodec.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import cats.effect.Concurrent +import org.http4s.EntityDecoder + +trait SolrEntityCodec { + given [F[_]: Concurrent]: EntityDecoder[F, String] = + EntityDecoder.text +} + +object SolrEntityCodec extends SolrEntityCodec diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala new file mode 100644 index 00000000..06cac3d5 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.migration + +import io.renku.solr.client.schema.SchemaCommand + +final case class SchemaMigration( + version: Long, + commands: Seq[SchemaCommand] +) + +object SchemaMigration: + def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration = + SchemaMigration(version, cmd +: more) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala new file mode 100644 index 00000000..2b56a19a --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.migration + +import cats.effect.Sync +import cats.syntax.all.* +import io.renku.solr.client.schema.{Field, FieldName, SchemaCommand, TypeName} +import io.renku.solr.client.{QueryString, SolrClient} + +trait SchemaMigrator[F[_]] { + + def currentVersion: F[Option[Long]] + + def migrate(migrations: Seq[SchemaMigration]): F[Unit] +} + +object SchemaMigrator: + def apply[F[_]: Sync](client: SolrClient[F]): SchemaMigrator[F] = Impl[F](client) + + private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] { + private[this] val logger = scribe.cats.effect[F] + private[this] val versionDocId = "VERSION_ID_EB779C6B-1D96-47CB-B304-BECF15E4A607" + private[this] val versionTypeName: TypeName = TypeName("plong") + + override def currentVersion: F[Option[Long]] = + client + .query[VersionDocument](QueryString(s"id:$versionDocId")) + .map(_.responseBody.docs.headOption.map(_.currentSchemaVersion)) + + override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for { + current <- currentVersion + _ <- current.fold(initVersionDocument)(_ => ().pure[F]) + remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version)) + _ <- remain.traverse_(m => + client.modifySchema(m.commands) >> upsertVersion(m.version) + ) + } yield () + + private def initVersionDocument: F[Unit] = + logger.info("Initialize schema migration version document") >> + client.modifySchema( + Seq( + SchemaCommand.Add( + Field( + FieldName("currentSchemaVersion"), + versionTypeName + ) + ) + ) + ) + + private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n) + + private def upsertVersion(n: Long) = + logger.info(s"Set schema migration version to $n") >> + client.insert(Seq(version(n))) + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/VersionDocument.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/VersionDocument.scala new file mode 100644 index 00000000..10f5c48b --- /dev/null +++ 
b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/VersionDocument.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.migration + +import io.bullet.borer.{Decoder, Encoder} +import io.bullet.borer.derivation.MapBasedCodecs.{deriveDecoder, deriveEncoder} + +final private[client] case class VersionDocument(id: String, currentSchemaVersion: Long) + +private[client] object VersionDocument: + given Encoder[VersionDocument] = deriveEncoder + given Decoder[VersionDocument] = deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Analyzer.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Analyzer.scala new file mode 100644 index 00000000..eec6edc0 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Analyzer.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +// see https://solr.apache.org/guide/solr/latest/indexing-guide/analyzers.html + +final case class Analyzer( + tokenizer: Tokenizer, + `type`: Analyzer.AnalyzerType = Analyzer.AnalyzerType.None, + filter: Seq[Filter] = Nil +) + +object Analyzer: + enum AnalyzerType: + case Index + case Multiterm + case Query + case None + + def index(tokenizer: Tokenizer, filters: Filter*): Analyzer = + Analyzer(tokenizer, AnalyzerType.Index, filters) + + val classic: Analyzer = Analyzer(Tokenizer.classic, filter = List(Filter.classic)) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/CopyFieldRule.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/CopyFieldRule.scala new file mode 100644 index 00000000..367a7201 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/CopyFieldRule.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +final case class CopyFieldRule( + source: FieldName, + dest: FieldName, + maxChars: Option[Int] = None +) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/DynamicFieldRule.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/DynamicFieldRule.scala new file mode 100644 index 00000000..007362a2 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/DynamicFieldRule.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +final case class DynamicFieldRule( + name: FieldName, + `type`: TypeName, + required: Boolean = false, + indexed: Boolean = true, + stored: Boolean = true, + multiValued: Boolean = false, + uninvertible: Boolean = false, + docValues: Boolean = false +) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Field.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Field.scala new file mode 100644 index 00000000..105b166e --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Field.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.schema + +import io.bullet.borer.Encoder +import io.bullet.borer.derivation.key +import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder + +final case class Field( + name: FieldName, + @key("type") typeName: TypeName, + required: Boolean, + indexed: Boolean, + stored: Boolean, + multiValued: Boolean, + uninvertible: Boolean, + docValues: Boolean +) + +object Field: + def apply(name: FieldName, typeName: TypeName): Field = + Field( + name = name, + typeName = typeName, + required = false, + indexed = true, + stored = true, + multiValued = false, + uninvertible = true, + docValues = false + ) + + given Encoder[Field] = deriveEncoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala new file mode 100644 index 00000000..6cb8f02a --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +import io.bullet.borer.Encoder + +opaque type FieldName = String +object FieldName: + def apply(name: String): FieldName = name + + extension (self: FieldName) def name: String = self + + given Encoder[FieldName] = Encoder.forString diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldType.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldType.scala new file mode 100644 index 00000000..3ea13eec --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldType.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.schema + +final case class FieldType( + name: TypeName, + `class`: FieldTypeClass, + analyzer: Option[Analyzer] = None, + required: Boolean = false, + indexed: Boolean = true, + stored: Boolean = true, + multiValued: Boolean = false, + uninvertible: Boolean = false, + docValues: Boolean = false, + sortMissingLast: Boolean = true +) + +object FieldType: + + def text(name: TypeName, analyzer: Analyzer): FieldType = + FieldType(name, FieldTypeClass.Defaults.textField, analyzer = Some(analyzer)) + + def str(name: TypeName): FieldType = + FieldType(name, FieldTypeClass.Defaults.strField) + + def int(name: TypeName): FieldType = + FieldType(name, FieldTypeClass.Defaults.intPointField) + + def long(name: TypeName): FieldType = + FieldType(name, FieldTypeClass.Defaults.longPointField) + + def double(name: TypeName): FieldType = + FieldType(name, FieldTypeClass.Defaults.doublePointField) + + def dateTime(name: TypeName): FieldType = + FieldType(name, FieldTypeClass.Defaults.dateRangeField) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldTypeClass.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldTypeClass.scala new file mode 100644 index 00000000..62b12dac --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldTypeClass.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +import io.bullet.borer.Encoder + +opaque type FieldTypeClass = String + +object FieldTypeClass: + def apply(name: String): FieldTypeClass = name + + extension (self: FieldTypeClass) def name: String = self + + object Defaults: + val intPointField: FieldTypeClass = "IntPointField" + val longPointField: FieldTypeClass = "LongPointField" + val floatPointField: FieldTypeClass = "FloatPointField" + val doublePointField: FieldTypeClass = "DoublePointField" + val textField: FieldTypeClass = "TextField" + val strField: FieldTypeClass = "StrField" + val uuidField: FieldTypeClass = "UUIDField" + val rankField: FieldTypeClass = "RankField" + val dateRangeField: FieldTypeClass = "DateRangeField" + val boolField: FieldTypeClass = "BoolField" + val binaryField: FieldTypeClass = "BinaryField" + + given Encoder[FieldTypeClass] = Encoder.forString diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Filter.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Filter.scala new file mode 100644 index 00000000..741f137e --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Filter.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +// see https://solr.apache.org/guide/solr/latest/indexing-guide/filters.html + +final case class Filter(name: String) + +object Filter: + val lowercase: Filter = Filter("lowercase") + val stop: Filter = Filter("stop") + val englishPorter: Filter = Filter("englishPorter") + val classic: Filter = Filter("classic") + val daitchMokotoffSoundex: Filter = Filter("daitchMokotoffSoundex") + val doubleMetaphone: Filter = Filter("doubleMetaphone") diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaCommand.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaCommand.scala new file mode 100644 index 00000000..f82d10ca --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaCommand.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +import io.renku.solr.client.schema.SchemaCommand.DeleteDynamicField + +enum SchemaCommand: + case Add(element: SchemaCommand.Element) + case DeleteField(name: FieldName) + case DeleteType(name: TypeName) + case DeleteDynamicField(name: FieldName) + + def commandName: String = this match + case Add(_: Field) => "add-field" + case Add(_: FieldType) => "add-field-type" + case Add(_: DynamicFieldRule) => "add-dynamic-field" + case Add(_: CopyFieldRule) => "add-copy-field" + case _: DeleteField => "delete-field" + case _: DeleteType => "delete-field-type" + case _: DeleteDynamicField => "delete-dynamic-field" + +object SchemaCommand: + type Element = FieldType | Field | DynamicFieldRule | CopyFieldRule diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaJsonCodec.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaJsonCodec.scala new file mode 100644 index 00000000..ba524ee3 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/SchemaJsonCodec.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +import io.bullet.borer.{Encoder, Writer} +import io.bullet.borer.NullOptions.* +import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder +import io.renku.solr.client.schema.SchemaCommand.Element + +trait SchemaJsonCodec { + + given Encoder[Tokenizer] = deriveEncoder + given Encoder[Filter] = deriveEncoder + given Encoder[Analyzer.AnalyzerType] = + Encoder.forString.contramap(_.productPrefix.toLowerCase) + given Encoder[Analyzer] = deriveEncoder + given Encoder[FieldType] = deriveEncoder + given Encoder[DynamicFieldRule] = deriveEncoder + given Encoder[CopyFieldRule] = deriveEncoder + + given (using + e1: Encoder[Field], + e2: Encoder[FieldType], + e3: Encoder[DynamicFieldRule], + e4: Encoder[CopyFieldRule] + ): Encoder[SchemaCommand.Element] = + (w: Writer, value: Element) => + value match + case v: Field => e1.write(w, v) + case v: FieldType => e2.write(w, v) + case v: DynamicFieldRule => e3.write(w, v) + case v: CopyFieldRule => e4.write(w, v) + + private def commandPayloadEncoder(using + e: Encoder[SchemaCommand.Element] + ): Encoder[SchemaCommand] = + new Encoder[SchemaCommand]: + override def write(w: Writer, value: SchemaCommand) = + value match + case SchemaCommand.Add(v) => + e.write(w, v) + case SchemaCommand.DeleteType(tn) => + w.writeMap(Map("name" -> tn)) + case SchemaCommand.DeleteField(fn) => + w.writeMap(Map("name" -> fn)) + case SchemaCommand.DeleteDynamicField(fn) => + w.writeMap(Map("name" -> fn)) + + given Encoder[Seq[SchemaCommand]] = + new Encoder[Seq[SchemaCommand]]: + override def write(w: Writer, value: Seq[SchemaCommand]) = + w.writeMapOpen(value.size) + value.foreach { v => + w.writeMapMember(v.commandName, v)( + Encoder[String], + commandPayloadEncoder + ) + } + w.writeMapClose() + + given Encoder[SchemaCommand] = + Encoder[Seq[SchemaCommand]].contramap(Seq(_)) +} + +object SchemaJsonCodec extends SchemaJsonCodec diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Tokenizer.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Tokenizer.scala new file mode 100644 index 00000000..8dd801a6 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/Tokenizer.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +final case class Tokenizer(name: String) + +object Tokenizer: + val standard: Tokenizer = Tokenizer("standard") + val whitespace: Tokenizer = Tokenizer("whitespace") + val classic: Tokenizer = Tokenizer("classic") diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/TypeName.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/TypeName.scala new file mode 100644 index 00000000..0b51ade0 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/TypeName.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.schema + +import io.bullet.borer.Encoder + +opaque type TypeName = String + +object TypeName: + def apply(name: String): TypeName = name + + extension (self: TypeName) def name: String = self + + given Encoder[TypeName] = Encoder.forString diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/JsonEncodingTest.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/JsonEncodingTest.scala new file mode 100644 index 00000000..fea78507 --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/JsonEncodingTest.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.bullet.borer.{Decoder, Encoder, Json, Writer} +import io.renku.solr.client.JsonEncodingTest.Room +import munit.FunSuite + +class JsonEncodingTest extends FunSuite { + + test("test with discriminator"): + val r = Room("meeting room", 59) + val json = Json.encode(r).toUtf8String + val rr = Json.decode(json.getBytes).to[Room].value + assertEquals(json, """{"_type":"Room","name":"meeting room","seats":59}""") + assertEquals(rr, r) +} + +object JsonEncodingTest: + case class Room(name: String, seats: Int) + object Room: + given Decoder[Room] = deriveDecoder + given Encoder[Room] = EncoderSupport.deriveWithDiscriminator[Room] diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientGenerator.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientGenerator.scala new file mode 100644 index 00000000..6cce503a --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientGenerator.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import org.scalacheck.Gen + +object SolrClientGenerator: + + extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne) diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala new file mode 100644 index 00000000..dee7bcac --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client + +import cats.effect.IO +import io.bullet.borer.{Decoder, Encoder} +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.renku.solr.client.SolrClientSpec.Room +import io.renku.solr.client.schema.* +import io.renku.solr.client.util.{SolrSpec, SolrTruncate} +import munit.CatsEffectSuite + +class SolrClientSpec extends CatsEffectSuite with SolrSpec with SolrTruncate: + + test("use schema for inserting and querying"): + val cmds = Seq( + SchemaCommand.Add(FieldType.text(TypeName("roomText"), Analyzer.classic)), + SchemaCommand.Add(FieldType.int(TypeName("roomInt"))), + SchemaCommand.Add(Field(FieldName("roomName"), TypeName("roomText"))), + SchemaCommand.Add(Field(FieldName("roomDescription"), TypeName("roomText"))), + SchemaCommand.Add(Field(FieldName("roomSeats"), TypeName("roomInt"))) + ) + withSolrClient().use { client => + for { + _ <- client.modifySchema(cmds) + _ <- client + .insert[Room](Seq(Room("meeting room", "room for meetings", 56))) + r <- client.query[Room](QueryString("_type:Room")) + _ <- IO.println(r) + } yield () + } + +object SolrClientSpec: + case class Room(roomName: String, roomDescription: String, roomSeats: Int) + object Room: + given Decoder[Room] = deriveDecoder + given Encoder[Room] = EncoderSupport.deriveWithDiscriminator[Room] diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala new file mode 100644 index 00000000..bf96c3fb --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.migration + +import cats.effect.IO +import io.renku.solr.client.SolrClient +import io.renku.solr.client.schema.* +import io.renku.solr.client.schema.SchemaCommand.Add +import io.renku.solr.client.util.{SolrSpec, SolrTruncate} +import munit.CatsEffectSuite + +class SolrMigratorSpec extends CatsEffectSuite with SolrSpec with SolrTruncate: + private val logger = scribe.cats.io + private val migrations = Seq( + SchemaMigration(-5, Add(FieldType.text(TypeName("testText"), Analyzer.classic))), + SchemaMigration(-4, Add(FieldType.int(TypeName("testInt")))), + SchemaMigration(-3, Add(Field(FieldName("testName"), TypeName("testText")))), + SchemaMigration(-2, Add(Field(FieldName("testDescription"), TypeName("testText")))), + SchemaMigration(-1, Add(Field(FieldName("testSeats"), TypeName("testInt")))) + ) + + private def truncate(client: SolrClient[IO]): IO[Unit] = + truncateAll(client)( + Seq( + FieldName("currentSchemaVersion"), + FieldName("testName"), + FieldName("testDescription"), + FieldName("testSeats") + ), + Seq(TypeName("testText"), TypeName("testInt")) + ) + + test("run sample migrations"): + withSolrClient().use { client => + val migrator = SchemaMigrator[IO](client) + for { + _ <- truncate(client) + _ <- migrator.migrate(migrations) + c <- migrator.currentVersion + _ = assertEquals(c, Some(-1L)) + } yield () + } + + test("run migrations"): + withSolrClient().use { client => + val migrator = SchemaMigrator(client) + val first = migrations.take(2) + for { + _ <- truncate(client) + _ <- migrator.migrate(first) + v0 <- migrator.currentVersion + _ = assertEquals(v0, Some(-4L)) + + _ <- migrator.migrate(migrations) + v1 <- migrator.currentVersion + _ = assertEquals(v1, Some(-1L)) + } yield () + } diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/schema/BorerJsonCodecTest.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/schema/BorerJsonCodecTest.scala new file mode 100644 index 00000000..81038feb --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/schema/BorerJsonCodecTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.schema + +import io.bullet.borer.Json +import io.renku.solr.client.schema.SchemaCommand.DeleteType +import munit.FunSuite + +class BorerJsonCodecTest extends FunSuite with SchemaJsonCodec { + + test("encode schema command: delete type"): + val v = DeleteType(TypeName("integer")) + assertEquals( + Json.encode(v).toUtf8String, + """{"delete-field-type":{"name":"integer"}}""" + ) + + test("encode schema command: add"): + val v = SchemaCommand.Add(Field(FieldName("description"), TypeName("integer"))) + assertEquals( + Json.encode(v).toUtf8String, + """{"add-field":{"name":"description","type":"integer","required":false,"indexed":true,"stored":true,"multiValued":false,"uninvertible":true,"docValues":false}}""" + ) + + test("encode multiple schema commands into a single object"): + val vs = Seq( + DeleteType(TypeName("integer")), + DeleteType(TypeName("float")), + SchemaCommand.Add(Field(FieldName("description"), TypeName("text"))) + ) + assertEquals( + Json.encode(vs).toUtf8String, + """{"delete-field-type":{"name":"integer"},"delete-field-type":{"name":"float"},"add-field":{"name":"description","type":"text","required":false,"indexed":true,"stored":true,"multiValued":false,"uninvertible":true,"docValues":false}}""".stripMargin + ) +} diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrSpec.scala new file mode 100644 index 00000000..06249b87 --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrSpec.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.util + +import cats.effect.* +import io.renku.servers.SolrServer +import io.renku.solr.client.{SolrClient, SolrConfig} +import org.http4s.Uri + +import scala.concurrent.duration.Duration + +trait SolrSpec: + self: munit.Suite => + + protected lazy val server: SolrServer = SolrServer + protected lazy val solrConfig: SolrConfig = SolrConfig( + Uri.unsafeFromString(server.url) / "solr", + server.genericCoreName, + commitWithin = Some(Duration.Zero), + logMessageBodies = true + ) + + val withSolrClient: Fixture[Resource[IO, SolrClient[IO]]] = + new Fixture[Resource[IO, SolrClient[IO]]]("solr"): + + def apply(): Resource[IO, SolrClient[IO]] = + SolrClient[IO](solrConfig) + + override def beforeAll(): Unit = + server.start() + + override def afterAll(): Unit = + server.stop() + + override def munitFixtures: Seq[Fixture[_]] = + List(withSolrClient) diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrTruncate.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrTruncate.scala new file mode 100644 index 00000000..cd59a4e6 --- /dev/null +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/util/SolrTruncate.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.solr.client.util + +import cats.effect.IO +import cats.syntax.all.* +import io.renku.search.http.ResponseLogging +import io.renku.solr.client.schema.{FieldName, SchemaCommand, TypeName} +import io.renku.solr.client.{QueryString, SolrClient} + +trait SolrTruncate { + + def truncateAll( + client: SolrClient[IO] + )(fields: Seq[FieldName], types: Seq[TypeName]): IO[Unit] = + for { + _ <- client.delete(QueryString("*:*")) + _ <- fields + .map(SchemaCommand.DeleteField.apply) + .traverse_(modifyIgnoreError(client)) + _ <- types + .map(SchemaCommand.DeleteType.apply) + .traverse_(modifyIgnoreError(client)) + } yield () + + private def modifyIgnoreError(client: SolrClient[IO])(c: SchemaCommand) = + client + .modifySchema(Seq(c), ResponseLogging.Ignore) + .attempt + .void +} diff --git a/project/AvroCodeGen.scala b/project/AvroCodeGen.scala new file mode 100644 index 00000000..35d8b874 --- /dev/null +++ b/project/AvroCodeGen.scala @@ -0,0 +1,23 @@ +import sbt.* +import sbt.Keys.{libraryDependencies, sourceGenerators} +import sbtavrohugger.SbtAvrohugger +import sbtavrohugger.SbtAvrohugger.autoImport.* + +object AvroCodeGen extends AutoPlugin { + override def requires = SbtAvrohugger + + override def projectSettings = Seq( + libraryDependencies ++= Dependencies.avro, + Compile / avroScalaCustomTypes := { + avrohugger.format.SpecificRecord.defaultTypes.copy( + record = avrohugger.types.ScalaCaseClassWithSchema + ) + }, + Compile / avroScalaSpecificCustomTypes := { + avrohugger.format.SpecificRecord.defaultTypes.copy( + record = avrohugger.types.ScalaCaseClassWithSchema + ) + }, + Compile / sourceGenerators += (Compile / avroScalaGenerate).taskValue + ) +} diff --git a/project/DbTestPlugin.scala b/project/DbTestPlugin.scala new file mode 100644 index 00000000..3ea9421d --- /dev/null +++ b/project/DbTestPlugin.scala @@ -0,0 +1,48 @@ +import sbt._ +import sbt.Keys._ +import _root_.io.renku.servers._ + +object DbTestPlugin extends AutoPlugin { + + object autoImport { + val dbTests = taskKey[Unit]("Run the tests with databases turned on") + } + + import autoImport._ + + // AllRequirements makes it enabled on all sub projects by default + // It is possible to use `.disablePlugins(DbTestPlugin)` to disable + // it + override def trigger = PluginTrigger.AllRequirements + + override def projectSettings: Seq[Def.Setting[_]] = Seq( + Test / dbTests := { + Def + .sequential( + Def.task { + val logger = streams.value.log + logger.info("Starting REDIS server") + RedisServer.start() + logger.info("Starting SOLR server") + SolrServer.start() + logger.info("Running tests") + }, + (Test / test).all(ScopeFilter(inAggregates(ThisProject))), + Def.task { + val logger = streams.value.log + logger.info("Stopping SOLR server") + SolrServer.forceStop() + logger.info("Stopping REDIS server") + RedisServer.forceStop() + } + ) + .value + }, + // We need to disable running the `dbTests` on all aggregates, + // otherwise it would try starting/stopping servers again and + // again. The `all(ScopeFilter(inAggregates(ThisProject)))` makes + // sure that tests run on all aggregates anyways- but + // starting/stopping servers only once. 
+ dbTests / aggregate := false + ) +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dcd0efa6..fbcf68c1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,10 +6,12 @@ object Dependencies { object V { val avro = "1.11.1" val avro4s = "5.0.9" + val borer = "1.13.0" val catsCore = "2.10.0" val catsEffect = "3.5.3" val catsEffectMunit = "1.0.7" val fs2 = "3.9.3" + val http4s = "0.23.25" val redis4Cats = "1.5.2" val scalacheckEffectMunit = "1.0.4" val scodec = "2.2.2" @@ -17,6 +19,13 @@ object Dependencies { val scribe = "3.13.0" } + val borer = Seq( + "io.bullet" %% "borer-core" % V.borer, + "io.bullet" %% "borer-derivation" % V.borer, + "io.bullet" %% "borer-compat-cats" % V.borer, + "io.bullet" %% "borer-compat-scodec" % V.borer + ) + val scodec = Seq( "org.scodec" %% "scodec-core" % V.scodec ) @@ -54,6 +63,13 @@ object Dependencies { "co.fs2" %% "fs2-core" % V.fs2 ) + val http4sCore = Seq( + "org.http4s" %% "http4s-core" % V.http4s + ) + val http4sClient = Seq( + "org.http4s" %% "http4s-ember-client" % V.http4s + ) + val scribe = Seq( "com.outr" %% "scribe" % V.scribe, "com.outr" %% "scribe-slf4j2" % V.scribe, diff --git a/project/RedisServer.scala b/project/RedisServer.scala index dd22014f..19042ea5 100644 --- a/project/RedisServer.scala +++ b/project/RedisServer.scala @@ -15,28 +15,78 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package io.renku.servers -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicBoolean +import scala.sys.process.* import scala.util.Try -object RedisServer { +object RedisServer extends RedisServer("graph", port = 6379) - private val startRequests = new AtomicInteger(0) +@annotation.nowarn() +class RedisServer(module: String, port: Int) { - def start: ClassLoader => Unit = { cl => - if (startRequests.getAndIncrement() == 0) call("start")(cl) + val url: String = s"redis://localhost:$port" + + // When using a local Redis for development, use this env variable + // to not start a Redis server via docker for the tests + private val skipServer: Boolean = sys.env.contains("NO_REDIS") + + private val containerName = s"$module-test-redis" + private val image = "redis:7.2.4-alpine" + private val startCmd = s"""|docker run --rm + |--name $containerName + |-p $port:6379 + |-d $image""".stripMargin + private val isRunningCmd = + Seq("docker", "container", "ls", "--filter", s"name=$containerName") + private val stopCmd = s"docker stop -t5 $containerName" + private val readyCmd = "redis-cli -h 127.0.0.1 -p 6379 PING" + private val isReadyCmd = + Seq("docker", "exec", containerName, "sh", "-c", readyCmd) + private val wasStartedHere = new AtomicBoolean(false) + + def start(): Unit = synchronized { + if (skipServer) println("Not starting Redis via docker") + else if (checkRunning) () + else { + println(s"Starting Redis container for '$module' from '$image' image") + startContainer() + var rc = 1 + while (rc != 0) { + Thread.sleep(500) + rc = Process(isReadyCmd).! 
+ if (rc == 0) println(s"Redis container for '$module' started on port $port") + else println(s"IsReadyCmd returned $rc") + } + } } - def stop: ClassLoader => Unit = { cl => - if (startRequests.decrementAndGet() == 0) - Try(call("forceStop")(cl)) - .recover { case err => err.printStackTrace() } + private def checkRunning: Boolean = { + val out = isRunningCmd.lineStream_!.take(200).toList + out.exists(_ contains containerName) } - private def call(methodName: String): ClassLoader => Unit = classLoader => { - val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$") - val method = clazz.getMethod(methodName) - val instance = clazz.getField("MODULE$").get(null) - method.invoke(instance) + private def startContainer(): Unit = { + val retryOnContainerFailedToRun: Throwable => Unit = { + case ex if ex.getMessage contains "Nonzero exit value: 125" => + Thread.sleep(500); start() + case ex => throw ex + } + Try(startCmd.!!).fold(retryOnContainerFailedToRun, _ => wasStartedHere.set(true)) } + + def stop(): Unit = + if (!skipServer && wasStartedHere.get()) { + println(s"Stopping Redis container for '$module'") + stopCmd.!! + () + } + + def forceStop(): Unit = + if (!skipServer) { + println(s"Stopping Redis container for '$module'") + stopCmd.!! + () + } } diff --git a/project/SolrServer.scala b/project/SolrServer.scala new file mode 100644 index 00000000..05727524 --- /dev/null +++ b/project/SolrServer.scala @@ -0,0 +1,112 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.renku.servers
+
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.sys.process.*
+import scala.util.Try
+
+object SolrServer extends SolrServer("graph", port = 8983)
+
+@annotation.nowarn()
+class SolrServer(module: String, port: Int) {
+
+  val url: String = s"http://localhost:$port"
+
+  // When using a local Solr for development, use this env variable
+  // to not start a Solr server via docker for the tests
+  private val skipServer: Boolean = sys.env.contains("NO_SOLR")
+
+  private val containerName = s"$module-test-solr"
+  private val image = "solr:9.4.1-slim"
+  val genericCoreName = "core-test"
+  val searchCoreName = "search-core-test"
+  private val cores = Set(genericCoreName, searchCoreName)
+  private val startCmd = s"""|docker run --rm
+                             |--name $containerName
+                             |-p $port:8983
+                             |-d $image""".stripMargin
+  private val isRunningCmd =
+    Seq("docker", "container", "ls", "--filter", s"name=$containerName")
+  private val stopCmd = s"docker stop -t5 $containerName"
+  private def readyCmd(core: String) =
+    s"curl http://localhost:8983/solr/$core/select?q=*:* --no-progress-meter --fail 1> /dev/null"
+  private def isReadyCmd(core: String) =
+    Seq("docker", "exec", containerName, "sh", "-c", readyCmd(core))
+  private def createCore(core: String) = s"precreate-core $core"
+  private def createCoreCmd(core: String) =
+    Seq("docker", "exec", containerName, "sh", "-c", createCore(core))
+  private val wasStartedHere = new AtomicBoolean(false)
+
+  def start(): Unit =
+    if (skipServer) println("Not starting Solr via docker")
+    else if (checkRunning) ()
+    else {
+      println(s"Starting Solr container for '$module' from '$image' image")
+      startContainer()
+      waitForCoresToBeReady()
+    }
+
+  private def waitForCoresToBeReady(): Unit = {
+    var rc = 1
+    while (rc != 0) {
+      Thread.sleep(500)
+      rc = checkCoresReady
+      if (rc == 0) println(s"Solr container for '$module' ready on port $port")
+    }
+  }
+
+  private def checkCoresReady =
+    cores.foldLeft(0)((rc, core) => if (rc == 0) isReadyCmd(core).! else rc)
+
+  private def checkRunning: Boolean = {
+    val out = isRunningCmd.lineStream_!.take(20).toList
+    val isRunning = out.exists(_ contains containerName)
+    if (isRunning) waitForCoresToBeReady()
+    isRunning
+  }
+
+  private def startContainer(): Unit = {
+    val retryOnContainerFailedToRun: Throwable => Unit = {
+      case ex if ex.getMessage contains "Nonzero exit value: 125" =>
+        Thread.sleep(500); start()
+      case ex => throw ex
+    }
+    Try(startCmd.!!).fold(retryOnContainerFailedToRun, _ => wasStartedHere.set(true))
+    val rcs = cores.map(c => c -> createCoreCmd(c).!)
+    println(
+      s"Created solr cores: ${rcs.map { case (core, rc) => s"'$core' ($rc)" }.mkString(", ")}"
+    )
+  }
+
+  // like RedisServer.stop: only stop the container when docker was used and this process started it
+  def stop(): Unit =
+    if (skipServer || !wasStartedHere.get()) ()
+    else {
+      println(s"Stopping Solr container for '$module'")
+      stopCmd.!!
+      ()
+    }
+
+  def forceStop(): Unit =
+    if (!skipServer) {
+      println(s"Stopping Solr container for '$module'")
+      stopCmd.!!
+      ()
    }
+}
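
Editor's note on the schema model: the Filter and Tokenizer constants introduced in this patch only name Solr's built-in analysis components (see the filters guide linked from Filter.scala); they become useful once composed into an analyzer for a text field type. The sketch below shows that composition. The exact constructor of Analyzer is defined elsewhere in this PR, so the tokenizer/filters parameter names here are assumptions for illustration only; FieldType.text, Field, FieldName, TypeName and SchemaCommand.Add are used exactly as in SolrClientSpec, and the field names are made up.

import io.renku.solr.client.schema.*

// Sketch only: Analyzer's real constructor lives in Analyzer.scala (not part of
// this excerpt); the parameter names below are assumed, not the actual signature.
val descriptionAnalyzer = Analyzer(
  tokenizer = Tokenizer.standard,
  filters = Seq(Filter.lowercase, Filter.stop, Filter.englishPorter)
)

// Register a text field type using that analyzer plus a field of that type,
// mirroring the commands built in SolrClientSpec.
val schemaChanges: Seq[SchemaCommand] = Seq(
  SchemaCommand.Add(FieldType.text(TypeName("descriptionText"), descriptionAnalyzer)),
  SchemaCommand.Add(Field(FieldName("description"), TypeName("descriptionText")))
)

// These commands would then be applied with SolrClient#modifySchema, as the tests do.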
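
Editor's note on where the SchemaJsonCodec output goes: Solr's Schema API accepts this single-object command payload via an HTTP POST to a core's /schema endpoint. The project's SolrClient.modifySchema (defined elsewhere in this PR) presumably wraps that call; the sketch below only illustrates the raw request, reusing the ember client already listed in Dependencies and the core-test core that SolrServer pre-creates. Field and type names are invented, and Encoder[Field] is assumed to be provided by the schema model's companion objects, as BorerJsonCodecTest relies on.

import cats.effect.{IO, IOApp}
import io.bullet.borer.Json
import io.renku.solr.client.schema.*
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.headers.`Content-Type`
import org.http4s.{MediaType, Method, Request, Uri}

// Illustration only: not the project's SolrClient, just the raw Schema API call.
object SchemaApiSketch extends IOApp.Simple with SchemaJsonCodec:

  private val commands = Seq(
    SchemaCommand.Add(FieldType.text(TypeName("nameText"), Analyzer.classic)),
    SchemaCommand.Add(Field(FieldName("name"), TypeName("nameText")))
  )

  def run: IO[Unit] =
    EmberClientBuilder.default[IO].build.use { client =>
      // Encode the commands into the single JSON object shown in BorerJsonCodecTest
      // and POST it to the Schema API of the core-test core.
      val req = Request[IO](
        Method.POST,
        Uri.unsafeFromString("http://localhost:8983/solr/core-test/schema")
      ).withEntity(Json.encode(commands).toByteArray)
        .withContentType(`Content-Type`(MediaType.application.json))
      client.expect[String](req).flatMap(IO.println)
    }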