Skip to content

Commit

Permalink
Consume all events from a single stream (#189)
Browse files Browse the repository at this point in the history
All events are now consumed from a single stream. The other streams are still active for the time being, until producers have been updated.
  • Loading branch information
eikek authored Aug 27, 2024
1 parent ed3f0d6 commit 17acb63
Show file tree
Hide file tree
Showing 87 changed files with 1,645 additions and 885 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ on:
pull_request:
branches:
- development

concurrency:
group: ci-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
ci-matrix:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
groupMemberAdded = "groupMemberAdded";
groupMemberUpdated = "groupMemberUpdated";
groupMemberRemoved = "groupMemberRemoved";
searchSync = "searchSync";
};

queueNameConfig = with nixpkgs.lib; mapAttrs' (key: qn: nameValuePair "RS_REDIS_QUEUE_${key}" qn) queueNames;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.common

import cats.Monoid

import scodec.bits.BitVector
import scodec.bits.ByteVector

trait ScodecInstances:

given Monoid[ByteVector] = new Monoid[ByteVector] {
def empty = ByteVector.empty
def combine(x: ByteVector, y: ByteVector) = x ++ y
}

given Monoid[BitVector] = new Monoid[BitVector] {
def empty = BitVector.empty
def combine(x: BitVector, y: BitVector) = x ++ y
}

object ScodecInstances extends ScodecInstances
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ final case class QueuesConfig(
groupRemoved: QueueName,
groupMemberAdded: QueueName,
groupMemberUpdated: QueueName,
groupMemberRemoved: QueueName
groupMemberRemoved: QueueName,
searchSync: QueueName
):
lazy val all: Set[QueueName] = Set(
projectCreated,
Expand All @@ -54,7 +55,8 @@ final case class QueuesConfig(
groupRemoved,
groupMemberAdded,
groupMemberUpdated,
groupMemberRemoved
groupMemberRemoved,
searchSync
)

object QueuesConfig:
Expand All @@ -74,5 +76,6 @@ object QueuesConfig:
ConfigValues.eventQueue("groupRemoved"),
ConfigValues.eventQueue("groupMemberAdded"),
ConfigValues.eventQueue("groupMemberUpdated"),
ConfigValues.eventQueue("groupMemberRemoved")
ConfigValues.eventQueue("groupMemberRemoved"),
ConfigValues.eventQueue("searchSync")
).mapN(QueuesConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final case class EventMessage[P](
private lazy val payloadWriter = AvroWriter(payloadSchema)

def toAvro(using AvroEncoder[P]): EventMessage.AvroPayload =
val h = header.toAvro(payload.getClass.getName)
val h = header.toAvro
val b = header.dataContentType match
case DataContentType.Binary => payloadWriter.write(payload)
case DataContentType.Json => payloadWriter.writeJson(payload)
Expand All @@ -51,12 +51,14 @@ final case class EventMessage[P](

object EventMessage:
def create[F[_]: Sync, A <: RenkuEventPayload](
id: MessageId,
src: MessageSource,
ct: DataContentType,
reqId: RequestId,
payload: A
): F[EventMessage[A]] =
(MessageId.random[F], MessageHeader.create(src, ct, payload.version.head, reqId))
.mapN((id, h) => EventMessage(id, h, payload.schema, Seq(payload)))
MessageHeader
.create(src, payload.msgType, ct, payload.version.head, reqId)
.map(h => EventMessage(id, h, payload.schema, Seq(payload)))

final case class AvroPayload(header: ByteVector, payload: ByteVector)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ trait EventMessageDecoder[T]:
def decode(qm: QueueMessage): Either[DecodeFailure, EventMessage[T]]

object EventMessageDecoder:
def apply[T](using emd: EventMessageDecoder[T]): EventMessageDecoder[T] = emd
def instance[T](
f: QueueMessage => Either[DecodeFailure, EventMessage[T]]
): EventMessageDecoder[T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sealed trait GroupAdded extends RenkuEventPayload:
NonEmptyList.of(fold(_ => SchemaVersion.V2))
def schema: Schema =
fold(_ => v2.GroupAdded.SCHEMA$)
val msgType = MsgType.GroupAdded

object GroupAdded:
def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sealed trait GroupMemberAdded extends RenkuEventPayload:
def schema: Schema = v2.GroupMemberAdded.SCHEMA$
def userId: Id = fold(a => Id(a.userId))
def role: MemberRole
val msgType: MsgType = MsgType.GroupMemberAdded

object GroupMemberAdded:
def apply(groupId: Id, userId: Id, role: MemberRole): GroupMemberAdded =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sealed trait GroupMemberRemoved extends RenkuEventPayload:
def version: NonEmptyList[SchemaVersion] = NonEmptyList.of(SchemaVersion.V2)
def schema: Schema = v2.GroupMemberRemoved.SCHEMA$
def userId: Id = fold(a => Id(a.userId))
val msgType: MsgType = MsgType.GroupMemberRemoved

object GroupMemberRemoved:
def apply(groupId: Id, userId: Id): GroupMemberRemoved =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sealed trait GroupMemberUpdated extends RenkuEventPayload:
def schema: Schema = v2.GroupMemberUpdated.SCHEMA$
def userId: Id = fold(a => Id(a.userId))
def role: MemberRole
val msgType: MsgType = MsgType.GroupMemberUpdated

object GroupMemberUpdated:
def apply(groupId: Id, userId: Id, role: MemberRole): GroupMemberUpdated =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sealed trait GroupRemoved extends RenkuEventPayload:
NonEmptyList.of(fold(_ => SchemaVersion.V2))
def schema: Schema =
fold(_ => v2.GroupRemoved.SCHEMA$)
val msgType: MsgType = MsgType.GroupRemoved

object GroupRemoved:
def apply(groupId: Id): GroupRemoved =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ sealed trait GroupUpdated extends RenkuEventPayload:
lazy val schema: Schema =
fold(_ => v2.GroupUpdated.SCHEMA$)
def namespace: Namespace
val msgType: MsgType = MsgType.GroupUpdated

object GroupUpdated:
def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scodec.bits.ByteVector

final case class MessageHeader(
source: MessageSource,
msgType: MsgType,
dataContentType: DataContentType,
schemaVersion: SchemaVersion,
time: Timestamp,
Expand All @@ -38,11 +39,11 @@ final case class MessageHeader(
def withContentType(dt: DataContentType): MessageHeader = copy(dataContentType = dt)
def withSchemaVersion(v: SchemaVersion): MessageHeader = copy(schemaVersion = v)

def toAvro(payloadType: String): ByteVector =
def toAvro: ByteVector =
val h =
Header(
source.value,
payloadType,
msgType.name,
dataContentType.mimeType,
schemaVersion.name,
time.toInstant,
Expand All @@ -55,11 +56,12 @@ final case class MessageHeader(
object MessageHeader:
def create[F[_]: Sync](
src: MessageSource,
msgType: MsgType,
ct: DataContentType,
sv: SchemaVersion,
reqId: RequestId
): F[MessageHeader] =
Timestamp.now[F].map(ts => MessageHeader(src, ct, sv, ts, reqId))
Timestamp.now[F].map(ts => MessageHeader(src, msgType, ct, sv, ts, reqId))

private def readJsonOrBinary(
bv: ByteVector
Expand Down Expand Up @@ -107,8 +109,11 @@ object MessageHeader:
.leftMap(err =>
DecodeFailure.FieldReadError("schemaVersion", h.schemaVersion, err)
)
mst <- MsgType
.fromString(h.`type`)
.leftMap(err => DecodeFailure.FieldReadError("type", h.`type`, err))
src = MessageSource(h.source)
ts = Timestamp(h.time)
rid = RequestId(h.requestId)
yield MessageHeader(src, ctReal, v, ts, rid)
yield MessageHeader(src, mst, ctReal, v, ts, rid)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,10 @@

package io.renku.search.events

import java.util.UUID

import cats.effect.Sync
import cats.syntax.all.*

opaque type MessageId = String

object MessageId:

def apply(id: String): MessageId = id

def random[F[_]: Sync]: F[MessageId] =
Sync[F].delay(UUID.randomUUID().toString()).map(MessageId(_))

extension (self: MessageId) def value: String = self
43 changes: 43 additions & 0 deletions modules/events/src/main/scala/io/renku/search/events/MsgType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.events

/** This represents expected values of the `type` property in the message header. */
enum MsgType(val name: String):
case ProjectCreated extends MsgType("project.created")
case ProjectUpdated extends MsgType("project.updated")
case ProjectRemoved extends MsgType("project.removed")
case ProjectMemberAdded extends MsgType("projectAuth.added")
case ProjectMemberUpdated extends MsgType("projectAuth.updated")
case ProjectMemberRemoved extends MsgType("projectAuth.removed")
case UserAdded extends MsgType("user.added")
case UserUpdated extends MsgType("user.updated")
case UserRemoved extends MsgType("user.removed")
case GroupAdded extends MsgType("group.added")
case GroupUpdated extends MsgType("group.updated")
case GroupRemoved extends MsgType("group.removed")
case GroupMemberAdded extends MsgType("memberGroup.added")
case GroupMemberUpdated extends MsgType("memberGroup.updated")
case GroupMemberRemoved extends MsgType("memberGroup.removed")

object MsgType:
def fromString(s: String): Either[String, MsgType] =
MsgType.values
.find(e => e.name.equalsIgnoreCase(s) || e.productPrefix.equalsIgnoreCase(s))
.toRight(s"Invalid msg type: $s")
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sealed trait ProjectCreated extends RenkuEventPayload:
NonEmptyList.of(fold(_ => SchemaVersion.V1, _ => SchemaVersion.V2))
def schema: Schema =
fold(_ => v1.ProjectCreated.SCHEMA$, _ => v2.ProjectCreated.SCHEMA$)
val msgType = MsgType.ProjectCreated

object ProjectCreated:
def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sealed trait ProjectMemberAdded extends RenkuEventPayload:
fold(_ => v1.ProjectAuthorizationAdded.SCHEMA$, _ => v2.ProjectMemberAdded.SCHEMA$)
def userId: Id = fold(a => Id(a.userId), b => Id(b.userId))
def role: MemberRole
val msgType: MsgType = MsgType.ProjectMemberAdded

object ProjectMemberAdded:
def apply(projectId: Id, userId: Id, role: MemberRole): ProjectMemberAdded =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sealed trait ProjectMemberRemoved extends RenkuEventPayload:
_ => v2.ProjectMemberRemoved.SCHEMA$
)
def userId: Id = fold(a => Id(a.userId), b => Id(b.userId))
val msgType: MsgType = MsgType.ProjectMemberRemoved

object ProjectMemberRemoved:
def apply(projectId: Id, userId: Id): ProjectMemberRemoved =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ sealed trait ProjectMemberUpdated extends RenkuEventPayload:
)
def userId: Id = fold(a => Id(a.userId), b => Id(b.userId))
def role: MemberRole
val msgType: MsgType = MsgType.ProjectMemberUpdated

object ProjectMemberUpdated:
def apply(projectId: Id, userId: Id, role: MemberRole): ProjectMemberUpdated =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.avro.Schema
final case class ProjectRemoved(id: Id) extends RenkuEventPayload:
val version: NonEmptyList[SchemaVersion] = SchemaVersion.all
val schema: Schema = v2.ProjectRemoved.SCHEMA$
val msgType: MsgType = MsgType.ProjectRemoved

object ProjectRemoved:
given Show[ProjectRemoved] = Show.fromToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sealed trait ProjectUpdated extends RenkuEventPayload:
NonEmptyList.of(fold(_ => SchemaVersion.V1, _ => SchemaVersion.V2))
def schema: Schema =
fold(_ => v1.ProjectUpdated.SCHEMA$, _ => v2.ProjectUpdated.SCHEMA$)
val msgType: MsgType = MsgType.ProjectUpdated

object ProjectUpdated:
def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ trait RenkuEventPayload:
def id: Id
def version: NonEmptyList[SchemaVersion]
def schema: Schema
def msgType: MsgType
Loading

0 comments on commit 17acb63

Please sign in to comment.