Skip to content

Commit

Permalink
Merge branch 'development'
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Jan 19, 2024
2 parents df2b09f + 2920208 commit c3f3198
Show file tree
Hide file tree
Showing 34 changed files with 1,712 additions and 12 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: CI
on:
pull_request:
branches:
- development
jobs:
ci-matrix:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java: [ '[email protected]' ]
steps:
- uses: actions/[email protected]
with:
fetch-depth: 100
- uses: olafurpg/setup-scala@v14
with:
java-version: ${{ matrix.java }}
# - name: Coursier cache
# uses: coursier/cache-action@v6
- name: sbt ci ${{ github.ref }}
run: sbt -mem 2048 ci
# - name: Log in to Docker Hub
# uses: docker/login-action@v2
# with:
# username: ${{ secrets.RENKU_DOCKER_USERNAME }}
# password: ${{ secrets.RENKU_DOCKER_PASSWORD }}
# - name: sbt docker:publishLocal
# run: sbt -mem 2048 cli/Docker/publishLocal
ci:
runs-on: ubuntu-latest
needs: [ci-matrix]
steps:
- name: Aggregate of lint, and all tests
run: echo "ci passed"
75 changes: 68 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,42 @@ releaseVersionBump := sbtrelease.Version.Bump.Minor
releaseIgnoreUntrackedFiles := true
releaseTagName := (ThisBuild / version).value

addCommandAlias("ci", "; lint; test; publishLocal")
addCommandAlias(
"lint",
"; scalafmtSbtCheck; scalafmtCheckAll;" // Compile/scalafix --check; Test/scalafix --check
)
addCommandAlias("fix", "; scalafmtSbt; scalafmtAll") // ; Compile/scalafix; Test/scalafix

lazy val root = project
.in(file("."))
.withId("renku-search")
.settings(
publish / skip := true,
publishTo := Some(Resolver.file("Unused transient repository", file("target/unusedrepo")))
publishTo := Some(
Resolver.file("Unused transient repository", file("target/unusedrepo"))
)
)
.aggregate(
commons,
messages,
redisClient
)

lazy val commons = project
.in(file("modules/commons"))
.settings(commonSettings)
.settings(
name := "commons",
libraryDependencies ++=
Dependencies.catsCore ++
Dependencies.catsEffect ++
Dependencies.fs2Core ++
Dependencies.scodecBits ++
Dependencies.scribe
)
.enablePlugins(AutomateHeaderPlugin)

lazy val redisClient = project
.in(file("modules/redis-client"))
.withId("redis-client")
Expand All @@ -54,10 +79,46 @@ lazy val redisClient = project
)
.enablePlugins(AutomateHeaderPlugin)

lazy val avroCodec = project
.in(file("modules/avro-codec"))
.settings(commonSettings)
.settings(
name := "avro-codecs",
libraryDependencies ++=
Dependencies.avro ++
Dependencies.scodecBits
)

lazy val messages = project
.in(file("modules/messages"))
.settings(commonSettings)
.settings(
name := "messages",
libraryDependencies ++= Dependencies.avro,
Compile / avroScalaCustomTypes := {
avrohugger.format.SpecificRecord.defaultTypes.copy(
record = avrohugger.types.ScalaCaseClassWithSchema
)
},
Compile / avroScalaSpecificCustomTypes := {
avrohugger.format.SpecificRecord.defaultTypes.copy(
record = avrohugger.types.ScalaCaseClassWithSchema
)
},
Compile / sourceGenerators += (Compile / avroScalaGenerate).taskValue
)
.dependsOn(
commons % "compile->compile;test->test",
avroCodec % "compile->compile;test->test"
)
.enablePlugins(AutomateHeaderPlugin)

lazy val commonSettings = Seq(
organization := "io.renku",
publish / skip := true,
publishTo := Some(Resolver.file("Unused transient repository", file("target/unusedrepo"))),
publishTo := Some(
Resolver.file("Unused transient repository", file("target/unusedrepo"))
),
Compile / packageDoc / publishArtifact := false,
Compile / packageSrc / publishArtifact := false,
// format: off
Expand All @@ -78,17 +139,17 @@ lazy val commonSettings = Seq(
),
Compile / console / scalacOptions := (Compile / scalacOptions).value.filterNot(_ == "-Xfatal-warnings"),
Test / console / scalacOptions := (Compile / console / scalacOptions).value,
libraryDependencies ++= (
Dependencies.scribe
),
libraryDependencies ++= (
Dependencies.catsEffectMunit ++
Dependencies.scalacheckEffectMunit
Dependencies.scalacheckEffectMunit ++
Dependencies.scribe
).map(_ % Test),
// Format: on
organizationName := "Swiss Data Science Center (SDSC)",
startYear := Some(java.time.LocalDate.now().getYear),
licenses += ("Apache-2.0", new URI("https://www.apache.org/licenses/LICENSE-2.0.txt").toURL),
licenses += ("Apache-2.0", new URI(
"https://www.apache.org/licenses/LICENSE-2.0.txt"
).toURL),
headerLicense := Some(
HeaderLicense.Custom(
s"""|Copyright ${java.time.LocalDate.now().getYear} Swiss Data Science Center (SDSC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.avro.codec

sealed abstract class AvroCodecException(msg: String) extends RuntimeException(msg)

object AvroCodecException:
def encode(msg: String): AvroEncodeError = AvroEncodeError(msg)
def decode(msg: String): AvroDecodeError = AvroDecodeError(msg)

final class AvroEncodeError(msg: String) extends AvroCodecException(msg)

final class AvroDecodeError(msg: String) extends AvroCodecException(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.avro.codec

import io.renku.avro.codec.decoders.RecordDecoders
import org.apache.avro.Schema

import scala.deriving.Mirror

trait AvroDecoder[T] { self =>

def decode(schema: Schema): Any => T

final def map[U](f: T => U): AvroDecoder[U] =
AvroDecoder.curried[U](schema => in => f(self.decode(schema).apply(in)))
}

object AvroDecoder:
def apply[T](f: (Schema, Any) => T): AvroDecoder[T] = (schema: Schema) => f(schema, _)
def curried[T](f: Schema => Any => T): AvroDecoder[T] = (schema: Schema) => f(schema)
def basic[T](f: Any => T): AvroDecoder[T] = apply[T]((_, in) => f(in))

def apply[T](using dec: AvroDecoder[T]): AvroDecoder[T] = dec

inline def derived[A <: Product](using
inline A: Mirror.ProductOf[A]
): AvroDecoder[A] =
RecordDecoders.derived[A]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.avro.codec

import io.renku.avro.codec.encoders.RecordEncoders
import org.apache.avro.Schema

import scala.deriving.Mirror

trait AvroEncoder[T] { self =>

def encode(schema: Schema): T => Any

final def contramap[U](f: U => T): AvroEncoder[U] =
AvroEncoder.curried[U](schema => u => self.encode(schema).apply(f(u)))

}

object AvroEncoder:
def apply[T](f: (Schema, T) => Any): AvroEncoder[T] = (schema: Schema) => f(schema, _)
def curried[T](f: Schema => T => Any): AvroEncoder[T] = (schema: Schema) => f(schema)
def basic[T](f: T => Any): AvroEncoder[T] = (_: Schema) => t => f(t)
def id[T]: AvroEncoder[T] = AvroEncoder.basic(identity)

def apply[T](using enc: AvroEncoder[T]): AvroEncoder[T] = enc

final inline def derived[A <: Product](using
inline A: Mirror.ProductOf[A]
): AvroEncoder[A] =
RecordEncoders.derived[A]
28 changes: 28 additions & 0 deletions modules/avro-codec/src/main/scala/io/renku/avro/codec/AvroIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.renku.avro.codec
import org.apache.avro.Schema
import scodec.bits.ByteVector

trait AvroIO extends AvroWriter with AvroReader

object AvroIO:
def apply(schema: Schema): AvroIO =
new AvroIO:
private[this] val reader = AvroReader(schema)
private[this] val writer = AvroWriter(schema)

override def write[A: AvroEncoder](values: Seq[A]): ByteVector =
writer.write(values)

override def writeJson[A: AvroEncoder](values: Seq[A]): ByteVector =
writer.writeJson(values)

override def writeContainer[A: AvroEncoder](values: Seq[A]): ByteVector =
writer.writeContainer(values)

override def read[T: AvroDecoder](input: ByteVector): Seq[T] = reader.read(input)

override def readJson[T: AvroDecoder](input: ByteVector): Seq[T] =
reader.readJson(input)

override def readContainer[T: AvroDecoder](input: ByteVector): Seq[T] =
reader.readContainer(input)
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.renku.avro.codec

import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{
BinaryDecoder,
DatumReader,
Decoder,
DecoderFactory,
JsonDecoder
}
import scodec.bits.ByteVector

import java.io.EOFException

trait AvroReader:
def read[T: AvroDecoder](input: ByteVector): Seq[T]
def readJson[T: AvroDecoder](input: ByteVector): Seq[T]
def readContainer[T: AvroDecoder](input: ByteVector): Seq[T]

object AvroReader:
def apply(schema: Schema): AvroReader = new Impl(schema)

private class Impl(schema: Schema) extends AvroReader:
private[this] val reader = new GenericDatumReader[Any](schema)

extension (self: DatumReader[Any])
def readOpt[A: AvroDecoder](decoder: Decoder): Option[A] =
try Option(self.read(null, decoder)).map(AvroDecoder[A].decode(schema))
catch {
case _: EOFException => None
}

override def read[T: AvroDecoder](
input: ByteVector
): Seq[T] = {
val in = ByteVectorInput(input)
val decoder = DecoderFactory.get().binaryDecoder(in, null)
read0(decoder)
}

def readJson[T: AvroDecoder](input: ByteVector): Seq[T] = {
val in = ByteVectorInput(input)
val decoder = DecoderFactory.get().jsonDecoder(schema, in)
read0(decoder)
}

def read0[T: AvroDecoder](
decoder: BinaryDecoder | JsonDecoder
): Seq[T] =
@annotation.tailrec
def go(r: GenericDatumReader[Any], result: List[T]): Seq[T] =
if (isEnd(decoder)) result.reverse
else
r.readOpt(decoder) match
case None => result.reverse
case Some(el) => go(r, el :: result)

go(reader, Nil)

private def isEnd(d: JsonDecoder | BinaryDecoder): Boolean = d match
case jd: JsonDecoder => false
case bd: BinaryDecoder => bd.isEnd

def readContainer[T: AvroDecoder](input: ByteVector): Seq[T] =
val sin = ByteVectorInput(input)

@annotation.tailrec
def go(r: DataFileReader[Any], result: List[T]): Seq[T] =
if (r.hasNext) {
val data = r.next()
val decoded = AvroDecoder[T].decode(schema)(data)
go(r, decoded :: result)
} else result.reverse

go(new DataFileReader[Any](sin, reader), Nil)
Loading

0 comments on commit c3f3198

Please sign in to comment.