Skip to content

Commit

Permalink
feat: the initial provisioning route (#7)
Browse files Browse the repository at this point in the history
* Add SOLR client library allowing basic queries, updates and schema changes
* Allow solr payloads encoded by borer; adding discriminator fields for case classes
* Add search module using solr library to search and insert projects
* Combine solr and redis in search-provision module implementing the whole route: consuming redis messages, avro decoding and pushing to slr
* Adds test setup that starts single instances of solr and redis for all tests to share

---------

Co-authored-by: Eike Kettner <[email protected]>
  • Loading branch information
jachro and eikek authored Jan 26, 2024
1 parent 322f215 commit e503089
Show file tree
Hide file tree
Showing 80 changed files with 3,030 additions and 204 deletions.
123 changes: 99 additions & 24 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ releaseVersionBump := sbtrelease.Version.Bump.Minor
releaseIgnoreUntrackedFiles := true
releaseTagName := (ThisBuild / version).value

addCommandAlias("ci", "; lint; test; publishLocal")
addCommandAlias("ci", "; lint; dbTests; publishLocal")
addCommandAlias(
"lint",
"; scalafmtSbtCheck; scalafmtCheckAll;" // Compile/scalafix --check; Test/scalafix --check
Expand All @@ -37,6 +37,7 @@ addCommandAlias("fix", "; scalafmtSbt; scalafmtAll") // ; Compile/scalafix; Test
lazy val root = project
.in(file("."))
.withId("renku-search")
.enablePlugins(DbTestPlugin)
.settings(
publish / skip := true,
publishTo := Some(
Expand All @@ -45,8 +46,11 @@ lazy val root = project
)
.aggregate(
commons,
httpClient,
messages,
redisClient,
solrClient,
searchSolrClient,
searchProvision
)

Expand All @@ -60,31 +64,115 @@ lazy val commons = project
Dependencies.catsEffect ++
Dependencies.fs2Core ++
Dependencies.scodecBits ++
Dependencies.scribe
Dependencies.scribe,
Test / sourceGenerators += Def.task {
val sourceDir =
(LocalRootProject / baseDirectory).value / "project"
val sources = Seq(
sourceDir / "RedisServer.scala",
sourceDir / "SolrServer.scala"
) // IO.listFiles(sourceDir)
val targetDir = (Test / sourceManaged).value / "servers"
IO.createDirectory(targetDir)

val targets = sources.map(s => targetDir / s.name)
IO.copy(sources.zip(targets))
targets
}.taskValue
)
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin)

lazy val http4sBorer = project
.in(file("modules/http4s-borer"))
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin)
.withId("http4s-borer")
.settings(commonSettings)
.settings(
name := "http4s-borer",
description := "Use borer codecs with http4s",
libraryDependencies ++=
Dependencies.borer ++
Dependencies.http4sCore ++
Dependencies.fs2Core
)

lazy val httpClient = project
.in(file("modules/http-client"))
.withId("http-client")
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin)
.settings(commonSettings)
.settings(
name := "http-client",
description := "Utilities for the http client in http4s",
libraryDependencies ++=
Dependencies.http4sClient ++
Dependencies.fs2Core ++
Dependencies.scribe
)
.dependsOn(
http4sBorer % "compile->compile;test->test"
)

lazy val redisClient = project
.in(file("modules/redis-client"))
.withId("redis-client")
.settings(commonSettings)
.settings(
name := "redis-client",
Test / testOptions += Tests.Setup(RedisServer.start),
Test / testOptions += Tests.Cleanup(RedisServer.stop),
libraryDependencies ++=
Dependencies.catsCore ++
Dependencies.catsEffect ++
Dependencies.redis4Cats ++
Dependencies.redis4CatsStreams
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(
commons % "test->test"
)

lazy val solrClient = project
.in(file("modules/solr-client"))
.withId("solr-client")
.enablePlugins(AvroCodeGen, AutomateHeaderPlugin)
.settings(commonSettings)
.settings(
name := "solr-client",
libraryDependencies ++=
Dependencies.catsCore ++
Dependencies.catsEffect ++
Dependencies.http4sClient
)
.dependsOn(
httpClient % "compile->compile;test->test",
commons % "test->test"
)

lazy val searchSolrClient = project
.in(file("modules/search-solr-client"))
.withId("search-solr-client")
.enablePlugins(AvroCodeGen, AutomateHeaderPlugin)
.settings(commonSettings)
.settings(
name := "search-solr-client",
libraryDependencies ++=
Dependencies.catsCore ++
Dependencies.catsEffect
)
.dependsOn(
avroCodec % "compile->compile;test->test",
solrClient % "compile->compile;test->test",
commons % "test->test"
)

lazy val avroCodec = project
.in(file("modules/avro-codec"))
.disablePlugins(DbTestPlugin)
.settings(commonSettings)
.settings(
name := "avro-codecs",
name := "avro-codec",
libraryDependencies ++=
Dependencies.avro ++
Dependencies.scodecBits
Expand All @@ -94,40 +182,27 @@ lazy val messages = project
.in(file("modules/messages"))
.settings(commonSettings)
.settings(
name := "messages",
libraryDependencies ++= Dependencies.avro,
Compile / avroScalaCustomTypes := {
avrohugger.format.SpecificRecord.defaultTypes.copy(
record = avrohugger.types.ScalaCaseClassWithSchema
)
},
Compile / avroScalaSpecificCustomTypes := {
avrohugger.format.SpecificRecord.defaultTypes.copy(
record = avrohugger.types.ScalaCaseClassWithSchema
)
},
Compile / sourceGenerators += (Compile / avroScalaGenerate).taskValue
name := "messages"
)
.dependsOn(
commons % "compile->compile;test->test",
avroCodec % "compile->compile;test->test"
)
.enablePlugins(AutomateHeaderPlugin)
.enablePlugins(AvroCodeGen, AutomateHeaderPlugin)
.disablePlugins(DbTestPlugin)

lazy val searchProvision = project
.in(file("modules/search-provision"))
.withId("search-provision")
.settings(commonSettings)
.settings(
name := "search-provision",
Test / testOptions += Tests.Setup(RedisServer.start),
Test / testOptions += Tests.Cleanup(RedisServer.stop)
name := "search-provision"
)
.dependsOn(
commons % "compile->compile;test->test",
messages % "compile->compile;test->test",
avroCodec % "compile->compile;test->test",
redisClient % "compile->compile;test->test"
redisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test"
)
.enablePlugins(AutomateHeaderPlugin)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package io.renku.avro.codec

object all extends encoders.all with decoders.all
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package io.renku.avro.codec.decoders

import io.renku.avro.codec.{AvroDecoder, AvroCodecException}
import io.renku.avro.codec.{AvroCodecException, AvroDecoder}
import org.apache.avro.Schema

import scala.jdk.CollectionConverters.*
import scala.reflect.ClassTag

trait CollectionDecoders:
given [T: ClassTag](using decoder: AvroDecoder[T]): AvroDecoder[Array[T]] =
CollectionDecoders.ArrayDecoder[T](decoder)

given [T](using dec: AvroDecoder[T]): AvroDecoder[List[T]] =
CollectionDecoders.iterableDecoder[T, List](dec, _.toList)

Expand All @@ -34,8 +37,12 @@ trait CollectionDecoders:
given [T](using dec: AvroDecoder[T]): AvroDecoder[Set[T]] =
CollectionDecoders.iterableDecoder[T, Set](dec, _.toSet)

given [T](using dec: AvroDecoder[T]): AvroDecoder[Map[String, T]] =
CollectionDecoders.MapDecoder[T](dec)

object CollectionDecoders:
class ArrayDecoder[T: ClassTag](decoder: AvroDecoder[T]) extends AvroDecoder[Array[T]]:
private class ArrayDecoder[T: ClassTag](decoder: AvroDecoder[T])
extends AvroDecoder[Array[T]]:
def decode(schema: Schema): Any => Array[T] = {
require(
schema.getType == Schema.Type.ARRAY,
Expand All @@ -51,6 +58,16 @@ object CollectionDecoders:
}
}

private class MapDecoder[T](decoder: AvroDecoder[T])
extends AvroDecoder[Map[String, T]]:
override def decode(schema: Schema): Any => Map[String, T] = {
require(schema.getType == Schema.Type.MAP)
val decode = decoder.decode(schema.getValueType)
{ case map: java.util.Map[_, _] =>
map.asScala.toMap.map { case (k, v) => k.toString -> decode(v) }
}
}

def iterableDecoder[T, C[X] <: Iterable[X]](
decoder: AvroDecoder[T],
build: Iterable[T] => C[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package io.renku.avro.codec.decoders

object all
trait all
extends PrimitiveDecoders
with StringDecoders
with BigDecimalDecoders
Expand All @@ -29,3 +29,5 @@ object all
with JavaEnumDecoders
with EitherDecoders
with RecordDecoders

object all extends all
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait CollectionEncoders {
private def iterableEncoder[T, C[X] <: Iterable[X]](
encoder: AvroEncoder[T]
): AvroEncoder[C[T]] = (schema: Schema) => {
require(schema.getType == Schema.Type.ARRAY)
require(schema.getType == Schema.Type.ARRAY, s"Expected array schema, got: $schema")
val elementEncoder = encoder.encode(schema.getElementType)
{ t => t.map(elementEncoder.apply).toList.asJava }
}
Expand All @@ -48,4 +48,18 @@ trait CollectionEncoders {
given [T](using encoder: AvroEncoder[T]): AvroEncoder[Vector[T]] = iterableEncoder(
encoder
)
given [T](using encoder: AvroEncoder[T]): AvroEncoder[Map[String, T]] =
CollectionEncoders.MapEncoder[T](encoder)
}

object CollectionEncoders:
private class MapEncoder[T](encoder: AvroEncoder[T])
extends AvroEncoder[Map[String, T]]:
override def encode(schema: Schema): Map[String, T] => Any = {
val encodeT = encoder.encode(schema.getValueType)
{ value =>
val map = new java.util.HashMap[String, Any]
value.foreach { case (k, v) => map.put(k, encodeT.apply(v)) }
map
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package io.renku.avro.codec.encoders

object all
trait all
extends PrimitiveEncoders
with StringEncoders
with BigDecimalEncoders
Expand All @@ -29,3 +29,5 @@ object all
with ByteArrayEncoders
with JavaEnumEncoders
with RecordEncoders

object all extends all
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.renku.avro.codec.json

import io.renku.avro.codec.{AvroDecoder, AvroReader}
import org.apache.avro.Schema
import scodec.bits.ByteVector

import scala.util.Try

trait AvroJsonDecoder[A]:
def decode(json: ByteVector): Either[String, A]
final def map[B](f: A => B): AvroJsonDecoder[B] =
AvroJsonDecoder(this.decode.andThen(_.map(f)))
final def emap[B](f: A => Either[String, B]): AvroJsonDecoder[B] =
AvroJsonDecoder(this.decode.andThen(_.flatMap(f)))

object AvroJsonDecoder:
def apply[A](using d: AvroJsonDecoder[A]): AvroJsonDecoder[A] = d

def apply[A](f: ByteVector => Either[String, A]): AvroJsonDecoder[A] =
(json: ByteVector) => f(json)

def create[A: AvroDecoder](schema: Schema): AvroJsonDecoder[A] = { json =>
Try(AvroReader(schema).readJson[A](json)).toEither.left
.map(_.getMessage)
.flatMap(_.headOption.toRight(s"Empty json"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.renku.avro.codec.json

import io.renku.avro.codec.{AvroEncoder, AvroWriter}
import io.renku.avro.codec.encoders.all.given
import org.apache.avro.{Schema, SchemaBuilder}
import scodec.bits.ByteVector

trait AvroJsonEncoder[A]:
def encode(value: A): ByteVector
final def contramap[B](f: B => A): AvroJsonEncoder[B] =
AvroJsonEncoder(f.andThen(this.encode))

object AvroJsonEncoder:
def apply[A](using e: AvroJsonEncoder[A]): AvroJsonEncoder[A] = e

def apply[A](f: A => ByteVector): AvroJsonEncoder[A] =
(a: A) => f(a)

def create[A: AvroEncoder](schema: Schema): AvroJsonEncoder[A] =
a => AvroWriter(schema).writeJson(Seq(a))

given AvroJsonEncoder[String] =
create[String](SchemaBuilder.builder().stringType())

given AvroJsonEncoder[Long] =
create[Long](SchemaBuilder.builder().longType())

given AvroJsonEncoder[Int] =
create[Int](SchemaBuilder.builder().intType())

given AvroJsonEncoder[Double] =
create[Double](SchemaBuilder.builder().doubleType())
Loading

0 comments on commit e503089

Please sign in to comment.