Skip to content

Commit

Permalink
Add endpoint to trigger re-creating the index from the stream
Browse files Browse the repository at this point in the history
Sometimes it is necessary to re-create the entire index from the raw
data (for example, when solr schema changes happen). This is done by
reading the redis stream from the beginning (or from a given message).

The endpoint is only available from the provisioning process and not
from the api.
  • Loading branch information
eikek committed Sep 5, 2024
1 parent eb1181b commit 376b3c7
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package io.renku.search.events

import io.bullet.borer.*

opaque type MessageId = String

object MessageId:

def apply(id: String): MessageId = id

extension (self: MessageId) def value: String = self

given Decoder[MessageId] = Decoder.forString
given Encoder[MessageId] = Encoder.forString
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ trait RedisQueueClient[F[_]] {
messageId: MessageId
): F[Unit]

def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit]

def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
Expand Down Expand Up @@ -144,6 +149,21 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
}
}

override def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit] =
val key = formProcessedKey(clientId, queueNames)
createStringCommands.use { cmd =>
logger.debug(s"Delete last message-id for: $key") >>
cmd.del(key).flatMap(n => logger.debug(s"Deleted $n")).recoverWith { case ex =>
logger.warn(
s"Error deleting last message-id '$key'",
ex
)
}
}

override def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisBaseSuite:
.map(v => assert(v contains messageId))
yield ()

test("remove last seen message id"):
val clientId = RedisClientGenerators.clientIdGen.generateOne
val queue = NonEmptyList.of(RedisClientGenerators.queueNameGen.generateOne)
val messageId = RedisClientGenerators.messageIdGen.generateOne
for
client <- IO(redisClients().queueClient)
_ <- client.markProcessed(clientId, queue, messageId)
mid <- client.findLastProcessed(clientId, queue)
_ = assertEquals(mid, Some(messageId))

_ <- client.removeLastProcessed(clientId, queue)
_ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))
yield ()

test("can find out the total size of the given stream"):
val queue = RedisClientGenerators.queueNameGen.generateOne
val messages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ trait QueueClient[F[_]]:

def findLastProcessed(queueNames: NonEmptyList[QueueName]): F[Option[MessageId]]

def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit]

def getSize(queueName: QueueName): F[Long]

def getSize(queueName: QueueName, from: MessageId): F[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ private class QueueClientImpl[F[_]: Async](
): F[Option[MessageId]] =
redisQueueClient.findLastProcessed(clientId, queueNames).map(_.map(MessageId.apply))

override def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit] =
redisQueueClient.removeLastProcessed(clientId, queueNames)

override def getSize(queueName: QueueName): F[Long] =
redisQueueClient.getSize(queueName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object Microservice extends IOApp:
.add(RedisMetrics.unprocessedGauge)
.addAll(SolrMetrics.allCollectors)
metrics = metricsUpdaterTask(services)
httpServer = httpServerTask(registryBuilder, services.config)
httpServer = httpServerTask(registryBuilder, services)
tasks = services.messageHandlers.getAll + metrics + httpServer
pm = services.backgroundManage
_ <- tasks.toList.traverse_(pm.register.tupled)
Expand All @@ -56,10 +56,10 @@ object Microservice extends IOApp:

private def httpServerTask(
registryBuilder: CollectorRegistryBuilder[IO],
config: SearchProvisionConfig
services: Services[IO]
): (TaskName, IO[Unit]) =
val io = Routes[IO](registryBuilder)
.flatMap(HttpServer.build(_, config.httpServerConfig))
val io = Routes[IO](registryBuilder, services)
.flatMap(HttpServer.build(_, services.config.httpServerConfig))
.use(_ => IO.never)
TaskName.fromString("http server") -> io

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,35 @@ import io.renku.search.http.metrics.MetricsRoutes
import io.renku.search.http.routes.OperationRoutes
import io.renku.search.metrics.CollectorRegistryBuilder
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.server.Router

private object Routes:

def apply[F[_]: Async: Network](
registryBuilder: CollectorRegistryBuilder[F]
registryBuilder: CollectorRegistryBuilder[F],
services: Services[F]
): Resource[F, HttpRoutes[F]] =
MetricsRoutes[F](registryBuilder).makeRoutes
.map(new Routes[F](_).routes)
.map(new Routes[F](_, services).routes)

final private class Routes[F[_]: Async](metricsRoutes: HttpRoutes[F]):
final private class Routes[F[_]: Async](
metricsRoutes: HttpRoutes[F],
services: Services[F]
) extends Http4sDsl[F]:

private lazy val operationRoutes =
Router[F](
"/reindex" -> reIndexRoutes,
"/" -> OperationRoutes[F]
)

lazy val routes: HttpRoutes[F] =
operationRoutes <+> metricsRoutes

def reIndexRoutes: HttpRoutes[F] = HttpRoutes.of { case POST -> Root =>
services.reIndex.startReIndex(None).flatMap {
case true => NoContent()
case false => UnprocessableEntity()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import fs2.io.net.Network

import io.renku.queue.client.QueueClient
import io.renku.search.provision.handler.PipelineSteps
import io.renku.search.provision.reindex.ReIndexService
import io.renku.search.solr.client.SearchSolrClient

final case class Services[F[_]](
config: SearchProvisionConfig,
solrClient: SearchSolrClient[F],
queueClient: Stream[F, QueueClient[F]],
messageHandlers: MessageHandlers[F],
backgroundManage: BackgroundProcessManage[F]
backgroundManage: BackgroundProcessManage[F],
reIndex: ReIndexService[F]
)

object Services:
Expand All @@ -52,4 +54,6 @@ object Services:
handlers = MessageHandlers[F](steps, cfg.queuesConfig)

bm <- BackgroundProcessManage[F](cfg.retryOnErrorDelay)
} yield Services(cfg, solr, redis, handlers, bm)

ris = ReIndexService[F](bm, redis, solr, cfg.queuesConfig)
} yield Services(cfg, solr, redis, handlers, bm, ris)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.reindex

import java.time.Instant

import cats.Functor
import cats.effect.*
import cats.syntax.all.*

import io.bullet.borer.Decoder
import io.bullet.borer.Encoder
import io.bullet.borer.derivation.{MapBasedCodecs, key}
import io.renku.json.codecs.all.given
import io.renku.search.events.MessageId
import io.renku.search.model.Id
import io.renku.solr.client.DocVersion

final private case class ReIndexDocument(
id: Id,
created: Instant,
messageId: Option[MessageId],
@key("_version_") version: DocVersion
)

private object ReIndexDocument:
private val docId: Id = Id("reindex_31baded5-9fc2-4935-9b07-80f7a3ecb13f")

def createNew[F[_]: Clock: Functor](messageId: Option[MessageId]): F[ReIndexDocument] =
Clock[F].realTimeInstant.map { now =>
ReIndexDocument(docId, now, messageId, DocVersion.NotExists)
}

given Encoder[ReIndexDocument] = MapBasedCodecs.deriveEncoder
given Decoder[ReIndexDocument] = MapBasedCodecs.deriveDecoder
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision.reindex

import cats.data.NonEmptyList
import cats.effect.*
import cats.syntax.all.*
import fs2.Stream

import io.renku.queue.client.QueueClient
import io.renku.search.config.QueuesConfig
import io.renku.search.events.MessageId
import io.renku.search.provision.BackgroundProcessManage
import io.renku.search.provision.MessageHandlers.MessageHandlerKey
import io.renku.search.solr.client.SearchSolrClient

trait ReIndexService[F[_]]:
def startReIndex(startMessage: Option[MessageId]): F[Boolean]

object ReIndexService:

def apply[F[_]: Clock: Sync](
bpm: BackgroundProcessManage[F],
redisClient: Stream[F, QueueClient[F]],
solrClient: SearchSolrClient[F],
queueCfg: QueuesConfig
): ReIndexService[F] =
new ReIndexService[F] {
private val queueName = NonEmptyList.of(queueCfg.dataServiceAllEvents)
private val logger = scribe.cats.effect[F]

def startReIndex(startMessage: Option[MessageId]): F[Boolean] =
for
syncDoc <- ReIndexDocument.createNew[F](startMessage)
upsertResp <- solrClient.upsert(Seq(syncDoc))
_ <- logger.debug(s"Insert reindex sync document: $upsertResp")
res <-
if (upsertResp.isFailure)
logger.debug(s"Re-Index called while already in progress").as(false)
else dropIndexAndRestart(syncDoc, startMessage)
yield res

private def dropIndexAndRestart(
syncDoc: ReIndexDocument,
startMessage: Option[MessageId]
) =
for
_ <- logger.info(
s"Starting re-indexing all data, since message ${syncDoc.messageId}"
)
_ <- bpm.cancelProcesses(MessageHandlerKey.isInstance)
_ <- logger.info("Background processes stopped")
_ <- startMessage match
case Some(msgId) =>
logger.info("Set last seen message id to $msgId for $queueName") >>
redisClient
.evalMap(_.markProcessed(queueName, msgId))
.take(1)
.compile
.drain
case None =>
// a special case until the other queues are removed and
// only `data_service.all_events` is left. we remove the
// last seen message-ids from all queue names, in
// contrast for setting a message-id which can only be
// done for one queue (and doesn't make sense to
// implement for all)
logger.info(s"Remove last processed message id for ${queueCfg.all}") >>
redisClient
.evalMap(rc =>
queueCfg.all.toList.traverse_(q =>
rc.removeLastProcessed(NonEmptyList.of(q))
)
)
.take(1)
.compile
.drain
_ <- logger.info("Delete SOLR index")
_ <- solrClient.deletePublicData
_ <- logger.info("Start background processes")
_ <- bpm.background(MessageHandlerKey.isInstance)
_ <- solrClient.deleteIds(NonEmptyList.of(syncDoc.id))
yield true
}
Loading

0 comments on commit 376b3c7

Please sign in to comment.