Merge pull request #202 from SwissDataScienceCenter/reprovision-events
Reprovision SOLR index from data services
eikek authored Sep 18, 2024
2 parents d256a8b + 211ee48 commit a9fd1a5
Showing 35 changed files with 1,402 additions and 138 deletions.
80 changes: 80 additions & 0 deletions docs/adr/0001-reprovision-from-data-service.md
@@ -0,0 +1,80 @@
# Reprovision From Data-Service

## Context and Problem Statement

The SOLR index can get out of sync for many reasons: wrong events are
sent, redis streams get corrupted, search has bugs, etc. In addition,
some SOLR schema changes require a re-index of every document. This
leaves us with two needs:

1. Run a re-index directly from the search service that ingests every
document again. This can be run on demand or as a result of
specific migrations that require re-indexing.
2. In case the redis stream is corrupted, all data must be sent again
from data services. This indicates a new "start" in the stream and
consumers should discard all previous events.

## Considered Options

### Switching multiple streams

In this scenario, data services would create a new stream, recreate
all events from the database and push them into the stream. This
happens concurrently with any "live" events that are happening at the
time. Once done, a different "control message" is sent to tell all
consumers to switch to a new stream.

Pros:
- it makes it very easy to delete the entire old stream and free up
space on redis
- if something fails when re-creating the events, data services could
just start over as no one has consumed anything yet
- a re-index from the search side would be simple as it only requires
re-reading the stream from its beginning

Cons:
- it is a more complex protocol: consumers must be told to switch and
could miss the message (due to bugs), etc.
- it requires changing the configuration of services from statically
known stream names to dynamic ones
- consumers learn about the new stream only when the "control" message
is sent and can start consuming only then, which would lead to a
longer period of degraded search

### Using the existing stream

Since all events are pushed to a single stream, data services send a
`ReprovisionStarted` message to indicate that they are going to
re-send data. The search service can process this message by clearing
the index and keeping the message id.

Pros:
- Doesn't require consumers to change their redis configuration.
- Simple protocol by introducing another message
- Consumers can start immediately with processing new messages

Cons:
- Deleting obsolete messages from the redis stream is possible, but
the data structure is not optimized for that
- A re-index from the search side requires storing the redis message
id of the `ReprovisionStarted` message, and thus more bookkeeping is
required
- Consumers could still read "bad data" if they read from before the
`ReprovisionStarted` message, as these messages are still visible
(until finally deleted)
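
The consumer side of this option can be sketched as follows. This is only a minimal illustration under assumptions: `Event`, `IndexState` and `handle` are hypothetical names, the index is modelled as an in-memory map, and the message ids are made-up values rather than real redis ids.

```scala
// Hypothetical, simplified model; these names are not the project's API.
enum Event:
  case ReprovisioningStarted(messageId: String)
  case DocumentUpsert(messageId: String, docId: String, body: String)

// The "index" is just a map here; the real one would be SOLR.
final case class IndexState(
    docs: Map[String, String],
    reprovisionStartId: Option[String]
)

object IndexState:
  val empty: IndexState = IndexState(Map.empty, None)

def handle(state: IndexState, event: Event): IndexState =
  event match
    case Event.ReprovisioningStarted(id) =>
      // Clear the index and remember where the new "start" of the stream is.
      IndexState(Map.empty, Some(id))
    case Event.DocumentUpsert(_, docId, body) =>
      state.copy(docs = state.docs.updated(docId, body))
```

Folding a list of events with `handle` drops every document indexed before the start message and keeps its id, which is the bookkeeping a later re-index from the search side would rely on.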

### Additional

We also discussed marking the events coming from reprovisioning so
they are discoverable by consumers. One idea would be to use entirely
different event types, or to add some other property to the events.

## Decision Outcome

For now, we opted for the _using the existing stream_ option. It is
the easiest and quickest to implement, and most of the code would
also be needed for the other option. The other option can still be
revisited in the future.

The idea of marking reprovisioning events has been dropped, as it was
deemed neither necessary nor useful.
4 changes: 4 additions & 0 deletions flake.nix
@@ -19,6 +19,7 @@
./nix/services.nix
{
virtualisation.memorySize = 2048;
networking.hostName = "rsdev-vm";
}
];
};
@@ -27,6 +28,9 @@
system = flake-utils.lib.system.x86_64-linux;
modules = [
./nix/services.nix
{
networking.hostName = "rsdev-cnt";
}
];
};
};
@@ -26,7 +26,13 @@ object MessageId:

def apply(id: String): MessageId = id

extension (self: MessageId) def value: String = self
extension (self: MessageId)
def value: String = self

private def order = new Ordered[MessageId] {
override def compare(that: MessageId): Int = self.compareTo(that)
}
export order.*

given Decoder[MessageId] = Decoder.forString
given Encoder[MessageId] = Encoder.forString
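
The `order` member in this hunk delegates to `String.compareTo`, which compares ids lexicographically. Redis stream ids have the form `<millisecondsTime>-<sequenceNumber>`, so a lexicographic comparison can disagree with the numeric one when the parts have different digit counts. Whether that matters depends on the ids actually seen. The following is only a sketch of a numeric comparison; `StreamId` and `parse` are hypothetical helpers that are not part of this commit, and `Long` is assumed to be wide enough for the values involved.

```scala
// Hypothetical helper, not part of the commit: parses "<ms>-<seq>" ids.
final case class StreamId(millis: Long, seq: Long)

object StreamId:
  def parse(s: String): Option[StreamId] =
    s.split('-') match
      case Array(ms, sq) =>
        for
          m <- ms.toLongOption
          q <- sq.toLongOption
        yield StreamId(m, q)
      case _ => None

  // Order by timestamp first, then by sequence number.
  given Ordering[StreamId] =
    Ordering.by((id: StreamId) => (id.millis, id.seq))
```

With this ordering, `1700000000000-9` sorts before `1700000000000-10`, whereas comparing the raw strings puts them the other way round.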
@@ -35,6 +35,8 @@ enum MsgType(val name: String):
case GroupMemberAdded extends MsgType("memberGroup.added")
case GroupMemberUpdated extends MsgType("memberGroup.updated")
case GroupMemberRemoved extends MsgType("memberGroup.removed")
case ReprovisioningStarted extends MsgType("reprovisioning.started")
case ReprovisioningFinished extends MsgType("reprovisioning.finished")

object MsgType:
def fromString(s: String): Either[String, MsgType] =
@@ -0,0 +1,63 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.events

import cats.Show
import cats.data.NonEmptyList

import io.renku.avro.codec.AvroEncoder
import io.renku.avro.codec.all.given
import io.renku.events.v2
import io.renku.search.model.*
import org.apache.avro.Schema

sealed trait ReprovisioningFinished extends RenkuEventPayload:
val msgType = MsgType.ReprovisioningFinished
def withId(id: Id): ReprovisioningFinished
val schema: Schema = v2.ReprovisioningFinished.SCHEMA$
val version: NonEmptyList[SchemaVersion] = NonEmptyList.of(SchemaVersion.V2)
def fold[A](f: v2.ReprovisioningFinished => A): A

object ReprovisioningFinished:
def apply(id: Id): ReprovisioningFinished = V2(v2.ReprovisioningFinished(id.value))

final case class V2(event: v2.ReprovisioningFinished) extends ReprovisioningFinished:
val id: Id = Id(event.id)
def withId(id: Id): V2 = V2(event.copy(id = id.value))
def fold[A](f: v2.ReprovisioningFinished => A): A = f(event)

given AvroEncoder[ReprovisioningFinished] =
val v2e = AvroEncoder[v2.ReprovisioningFinished]
AvroEncoder.basic { v =>
v.fold(v2e.encode(v.schema))
}

given EventMessageDecoder[ReprovisioningFinished] =
EventMessageDecoder.instance { qm =>
qm.header.schemaVersion match
case SchemaVersion.V1 =>
Left(DecodeFailure.VersionNotSupported(qm.id, qm.header))

case SchemaVersion.V2 =>
val schema = v2.ReprovisioningFinished.SCHEMA$
qm.toMessage[v2.ReprovisioningFinished](schema)
.map(_.map(ReprovisioningFinished.V2.apply))
}

given Show[ReprovisioningFinished] = Show.show(_.fold(_.toString))
@@ -0,0 +1,63 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.events

import cats.Show
import cats.data.NonEmptyList

import io.renku.avro.codec.AvroEncoder
import io.renku.avro.codec.all.given
import io.renku.events.v2
import io.renku.search.model.*
import org.apache.avro.Schema

sealed trait ReprovisioningStarted extends RenkuEventPayload:
val msgType = MsgType.ReprovisioningStarted
def withId(id: Id): ReprovisioningStarted
val schema: Schema = v2.ReprovisioningStarted.SCHEMA$
val version: NonEmptyList[SchemaVersion] = NonEmptyList.of(SchemaVersion.V2)
def fold[A](f: v2.ReprovisioningStarted => A): A

object ReprovisioningStarted:
def apply(id: Id): ReprovisioningStarted = V2(v2.ReprovisioningStarted(id.value))

final case class V2(event: v2.ReprovisioningStarted) extends ReprovisioningStarted:
val id: Id = Id(event.id)
def withId(id: Id): V2 = V2(event.copy(id = id.value))
def fold[A](f: v2.ReprovisioningStarted => A): A = f(event)

given AvroEncoder[ReprovisioningStarted] =
val v2e = AvroEncoder[v2.ReprovisioningStarted]
AvroEncoder.basic { v =>
v.fold(v2e.encode(v.schema))
}

given EventMessageDecoder[ReprovisioningStarted] =
EventMessageDecoder.instance { qm =>
qm.header.schemaVersion match
case SchemaVersion.V1 =>
Left(DecodeFailure.VersionNotSupported(qm.id, qm.header))

case SchemaVersion.V2 =>
val schema = v2.ReprovisioningStarted.SCHEMA$
qm.toMessage[v2.ReprovisioningStarted](schema)
.map(_.map(ReprovisioningStarted.V2.apply))
}

given Show[ReprovisioningStarted] = Show.show(_.fold(_.toString))
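
Both new payload types decode only schema version 2 and reject version 1. Stripped of the Avro details, the pattern might look like the sketch below. All names here are simplified stand-ins, and the `rawId` parameter is an assumption that replaces the real Avro payload.

```scala
// Simplified stand-ins for the project's types; names are illustrative only.
enum SchemaVersion:
  case V1, V2

enum DecodeFailure:
  case VersionNotSupported(version: SchemaVersion)

final case class Started(id: String)

// Dispatch on the header's schema version: V1 is rejected, V2 is decoded.
def decode(
    version: SchemaVersion,
    rawId: String
): Either[DecodeFailure, Started] =
  version match
    case SchemaVersion.V1 =>
      Left(DecodeFailure.VersionNotSupported(version))
    case SchemaVersion.V2 =>
      Right(Started(rawId))
```

Returning `Either` keeps an unsupported version a regular value that the caller can log or skip, rather than an exception.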
@@ -26,7 +26,8 @@ type SyncEventMessage = EventMessage[ProjectCreated] | EventMessage[ProjectUpdat
EventMessage[UserAdded] | EventMessage[UserUpdated] | EventMessage[UserRemoved] |
EventMessage[GroupAdded] | EventMessage[GroupUpdated] | EventMessage[GroupRemoved] |
EventMessage[GroupMemberAdded] | EventMessage[GroupMemberUpdated] |
EventMessage[GroupMemberRemoved]
EventMessage[GroupMemberRemoved] | EventMessage[ReprovisioningStarted] |
EventMessage[ReprovisioningFinished]

object SyncEventMessage:

@@ -63,6 +64,10 @@ object SyncEventMessage:
mt.decoder.decode(qm)
case mt: MsgType.GroupMemberRemoved.type =>
mt.decoder.decode(qm)
case mt: MsgType.ReprovisioningStarted.type =>
mt.decoder.decode(qm)
case mt: MsgType.ReprovisioningFinished.type =>
mt.decoder.decode(qm)

// maps each MsgType to its corresponding payload type by providing a
// cast method paired with the decoder that results in that payload
@@ -188,4 +193,20 @@ object SyncEventMessage:
@targetName("decoderGroupMemberRemoved")
def decoder: EventMessageDecoder[GroupMemberRemoved] =
EventMessageDecoder[GroupMemberRemoved]

extension (self: MsgType.ReprovisioningStarted.type)
@targetName("castReprovisioningStarted")
def cast(x: SyncEventMessage): EventMessage[ReprovisioningStarted] =
x.asInstanceOf[EventMessage[ReprovisioningStarted]]
@targetName("decoderReprovisioningStarted")
def decoder: EventMessageDecoder[ReprovisioningStarted] =
EventMessageDecoder[ReprovisioningStarted]

extension (self: MsgType.ReprovisioningFinished.type)
@targetName("castReprovisioningFinished")
def cast(x: SyncEventMessage): EventMessage[ReprovisioningFinished] =
x.asInstanceOf[EventMessage[ReprovisioningFinished]]
@targetName("decoderReprovisioningFinished")
def decoder: EventMessageDecoder[ReprovisioningFinished] =
EventMessageDecoder[ReprovisioningFinished]
}
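
The `cast`/`decoder` pairs above attach same-named extension methods to the singleton type of each enum case. A reduced sketch of that technique, using hypothetical stand-ins (`Kind`, `StartedPayload`, `FinishedPayload`, `AnyPayload`, `describe`) instead of the project's types, could look like this:

```scala
import scala.annotation.targetName

// Hypothetical stand-ins for MsgType and the payload types.
enum Kind:
  case Started, Finished

final case class StartedPayload(id: String)
final case class FinishedPayload(id: String)

// Union of all payloads, analogous to SyncEventMessage.
type AnyPayload = StartedPayload | FinishedPayload

// One `cast` per enum case. The singleton receiver type selects the overload;
// @targetName gives each overload a distinct name after erasure.
extension (self: Kind.Started.type)
  @targetName("castStarted")
  def cast(p: AnyPayload): StartedPayload = p.asInstanceOf[StartedPayload]

extension (self: Kind.Finished.type)
  @targetName("castFinished")
  def cast(p: AnyPayload): FinishedPayload = p.asInstanceOf[FinishedPayload]

def describe(kind: Kind, p: AnyPayload): String =
  kind match
    case k: Kind.Started.type  => s"started:${k.cast(p).id}"
    case k: Kind.Finished.type => s"finished:${k.cast(p).id}"
```

The cast itself is unchecked, so the approach relies on the caller pairing each case with the matching payload, as the pattern match does here.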
@@ -383,6 +383,16 @@ object EventsGenerators:
def groupUpdatedGen(prefix: String): Gen[GroupUpdated] =
v2GroupUpdatedGen(prefix).map(GroupUpdated.V2.apply)

def reprovisionStarted(
id: Gen[Id] = stringGen(5).map(Id.apply)
): Gen[ReprovisioningStarted] =
id.map(ReprovisioningStarted.apply)

def reprovisionFinished(
id: Gen[Id] = stringGen(5).map(Id.apply)
): Gen[ReprovisioningFinished] =
id.map(ReprovisioningFinished.apply)

def stringGen(max: Int): Gen[String] =
Gen
.chooseNum(3, max)
@@ -122,7 +122,7 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)

lazy val logInfo: ((XReadMessage[?, ?], Option[RedisMessage])) => F[Unit] = {
case (m, None) =>
Log[F].info(
logger.warn(
s"Message '${m.id}' skipped as it has no '${MessageBodyKeys.headers}' or '${MessageBodyKeys.payload}'"
)
case _ => ().pure[F]
@@ -216,7 +216,6 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying)
)

client.underlying.connect
val release: StatefulRedisConnection[K, V] => F[Unit] = c =>
FutureLift[F].lift(c.closeAsync()) *>
Log[F].info(s"Releasing Streaming connection: ${client.uri.underlying}")
@@ -49,3 +49,6 @@ trait RedisBaseSuite
yield RedisClients(config, lc, cmds, qc)

val redisClients = ResourceSuiteLocalFixture("all-redis-clients", redisClientsR)

val redisClearAll: IO[Unit] =
IO(redisClients()).flatMap(_.commands.flushAll)
@@ -0,0 +1,46 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli

import cats.effect.*

import com.monovore.decline.Opts
import io.renku.search.cli.reprovision.*

object ReprovisionCmd:

enum SubCmdOpts:
case Start(opts: StartCmd.Options)
case Finish(opts: FinishCmd.Options)

private val startOpts: Opts[StartCmd.Options] =
Opts.subcommand("start", "Send reprovisioning-started message")(StartCmd.opts)

private val finishOpts: Opts[FinishCmd.Options] =
Opts.subcommand("finish", "Send reprovisioning-finished message")(FinishCmd.opts)

val opts: Opts[SubCmdOpts] =
startOpts
.map(SubCmdOpts.Start.apply)
.orElse(finishOpts.map(SubCmdOpts.Finish.apply))

def apply(opts: SubCmdOpts): IO[ExitCode] =
opts match
case SubCmdOpts.Start(c) => StartCmd(c)
case SubCmdOpts.Finish(c) => FinishCmd(c)
@@ -44,4 +44,7 @@ object SearchCli

case SubCommands.User(opts) =>
UserCmd(opts)

case SubCommands.Reprovision(opts) =>
ReprovisionCmd(opts)
}