From 22c6f17be2412cc4f9e89bcec283f4b71998514e Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Tue, 26 Mar 2024 13:49:49 +0100 Subject: [PATCH] feat: redis connection to be restarted every configured interval (#74) --- .../io/renku/search/config/ConfigValues.scala | 18 +++++--- .../io/renku/redis/client/RedisConfig.scala | 5 ++- .../renku/redis/client/util/RedisSpec.scala | 5 ++- .../io/renku/queue/client/QueueClient.scala | 10 +++++ .../renku/search/cli/perftests/Enqueuer.scala | 18 ++++++++ .../cli/perftests/PerfTestsConfig.scala | 8 +++- .../search/cli/perftests/QueueDelivery.scala | 18 ++++++++ .../renku/search/provision/Microservice.scala | 1 + .../io/renku/search/provision/Services.scala | 8 ++-- .../provision/handler/MessageReader.scala | 43 +++++++++---------- .../provision/handler/PipelineSteps.scala | 12 +++--- .../provision/handler/PushToRedis.scala | 38 ++++++++-------- .../metrics/MetricsCollectorsUpdater.scala | 15 +++---- .../search/provision/ProvisioningSuite.scala | 15 +++---- .../search/solr/client/SearchSolrClient.scala | 1 - 15 files changed, 138 insertions(+), 77 deletions(-) diff --git a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala index 58f5f2ef..40ec645e 100644 --- a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala +++ b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala @@ -26,7 +26,7 @@ import io.renku.search.http.HttpServerConfig import io.renku.solr.client.{SolrConfig, SolrUser} import org.http4s.Uri -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.* object ConfigValues extends ConfigDecoders: @@ -45,8 +45,11 @@ object ConfigValues extends ConfigDecoders: val maybeDB = renv("REDIS_DB").as[RedisDB].option val maybePass = renv("REDIS_PASSWORD").as[RedisPassword].option val maybeMasterSet = renv("REDIS_MASTER_SET").as[RedisMasterSet].option + val connectionRefresh = + renv("REDIS_CONNECTION_REFRESH_INTERVAL").as[FiniteDuration].default(30 minutes) - (host, port, sentinel, maybeDB, maybePass, maybeMasterSet).mapN(RedisConfig.apply) + (host, port, sentinel, maybeDB, maybePass, maybeMasterSet, connectionRefresh) + .mapN(RedisConfig.apply) } def eventQueue(eventType: String): ConfigValue[Effect, QueueName] = @@ -78,7 +81,12 @@ object ConfigValues extends ConfigDecoders: (url, core, maybeUser, defaultCommit, logMessageBodies).mapN(SolrConfig.apply) } - def httpServerConfig(prefix: String, defaultPort: Port): ConfigValue[Effect, HttpServerConfig] = - val bindAddress = renv(s"${prefix}_HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address] - val port = renv(s"${prefix}_HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port] + def httpServerConfig( + prefix: String, + defaultPort: Port + ): ConfigValue[Effect, HttpServerConfig] = + val bindAddress = + renv(s"${prefix}_HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address] + val port = + renv(s"${prefix}_HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port] (bindAddress, port).mapN(HttpServerConfig.apply) diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisConfig.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisConfig.scala index 3787e39e..ce140aac 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisConfig.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisConfig.scala @@ -21,13 +21,16 @@ package io.renku.redis.client import dev.profunktor.redis4cats.connection.RedisURI import io.lettuce.core.RedisURI as JRedisURI +import scala.concurrent.duration.FiniteDuration + final case class RedisConfig( host: RedisHost, port: RedisPort, sentinel: Boolean = false, maybeDB: Option[RedisDB] = None, maybePassword: Option[RedisPassword] = None, - maybeMasterSet: Option[RedisMasterSet] = None + maybeMasterSet: Option[RedisMasterSet] = None, + connectionRefreshInterval: FiniteDuration ): lazy val asRedisUri: RedisURI = val builder = JRedisURI.Builder.redis(host.value, port.value) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index a7d9cb08..e8a2ef7f 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -29,6 +29,8 @@ import io.lettuce.core.RedisConnectionException import io.renku.redis.client.* import io.renku.servers.RedisServer +import scala.concurrent.duration.* + trait RedisSpec: self: munit.Suite => @@ -60,7 +62,8 @@ trait RedisSpec: override lazy val redisConfig: RedisConfig = RedisConfig( RedisHost(server.host), - RedisPort(server.port) + RedisPort(server.port), + connectionRefreshInterval = 10 minutes ) override def beforeAll(): Unit = diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index fcd0d631..ba0cdfd3 100644 --- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -50,5 +50,15 @@ trait QueueClient[F[_]]: def getSize(queueName: QueueName, from: MessageId): F[Long] object QueueClient: + + // Be aware that it was observed that the client can lose the connection to Redis. + // Because of that consider using the QueueClient.stream + // that auto-refreshes (recreates) the connection every connectionRefreshInterval. def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] = RedisQueueClient.make[F](redisConfig).map(new QueueClientImpl[F](_)) + + def stream[F[_]: Async](redisConfig: RedisConfig): Stream[F, QueueClient[F]] = + val s = Stream + .resource[F, QueueClient[F]](make(redisConfig)) + .interruptAfter(redisConfig.connectionRefreshInterval) + s ++ stream(redisConfig) diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/Enqueuer.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/Enqueuer.scala index 4bd9e372..ed06e118 100644 --- a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/Enqueuer.scala +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/Enqueuer.scala @@ -1,3 +1,21 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.renku.search.cli.perftests import cats.effect.{Async, Ref, Resource} diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/PerfTestsConfig.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/PerfTestsConfig.scala index 999d15c0..d5422bb9 100644 --- a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/PerfTestsConfig.scala +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/PerfTestsConfig.scala @@ -23,6 +23,8 @@ import com.monovore.decline.Opts import io.renku.redis.client.* import org.http4s.Uri +import scala.concurrent.duration.* + final case class PerfTestsConfig( itemsToGenerate: Int, providers: List[Provider], @@ -79,9 +81,13 @@ private object RedisConfigOpts: .option[String]("redis-masterset", "Redis masterset") .map(RedisMasterSet.apply) .orNone + private val refreshInterval: Opts[FiniteDuration] = + Opts + .option[FiniteDuration]("redis-connection-refresh", "Redis connection refresh") + .withDefault(1 hour) val configOpts: Opts[RedisConfig] = - (host, port, sentinel, db.map(Option(_)), password, masterSet) + (host, port, sentinel, db.map(Option(_)), password, masterSet, refreshInterval) .mapN(RedisConfig.apply) sealed private trait Provider diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/QueueDelivery.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/QueueDelivery.scala index 749c8f8d..51b6e1d9 100644 --- a/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/QueueDelivery.scala +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/perftests/QueueDelivery.scala @@ -1,3 +1,21 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.renku.search.cli.perftests import cats.Show diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index 8f1e872a..021555f2 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -26,6 +26,7 @@ import io.renku.search.metrics.CollectorRegistryBuilder import io.renku.search.provision.metrics.{MetricsCollectorsUpdater, RedisMetrics} import io.renku.search.solr.schema.Migrations import io.renku.solr.client.migration.SchemaMigrator + import scala.concurrent.duration.Duration object Microservice extends IOApp: diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala index 99ef27c6..ecaa941d 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala @@ -19,6 +19,7 @@ package io.renku.search.provision import cats.effect.kernel.{Async, Resource} +import fs2.Stream import fs2.io.net.Network import io.renku.queue.client.QueueClient import io.renku.search.provision.handler.PipelineSteps @@ -27,7 +28,7 @@ import io.renku.search.solr.client.SearchSolrClient final case class Services[F[_]]( config: SearchProvisionConfig, solrClient: SearchSolrClient[F], - queueClient: Resource[F, QueueClient[F]], + queueClient: Stream[F, QueueClient[F]], messageHandlers: MessageHandlers[F] ) @@ -38,9 +39,8 @@ object Services: cfg <- Resource.eval(SearchProvisionConfig.config.load[F]) solr <- SearchSolrClient.make[F](cfg.solrConfig) - // The redis client must be initialized on each operation to - // be able to connect to the cluster - redis = QueueClient.make[F](cfg.redisConfig) + // The redis client is refreshed every now and then so it's provided as a stream + redis = QueueClient.stream[F](cfg.redisConfig) steps = PipelineSteps[F]( solr, diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/MessageReader.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/MessageReader.scala index 0b9175f9..fe3140ac 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/MessageReader.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/MessageReader.scala @@ -18,21 +18,17 @@ package io.renku.search.provision.handler -import cats.effect.Sync +import cats.Show +import cats.effect.Async import cats.syntax.all.* -import fs2.Stream - -import io.renku.queue.client.{QueueClient, QueueMessage} -import io.renku.redis.client.{ClientId, QueueName} +import fs2.{Chunk, Stream} +import io.renku.queue.client.{QueueClient, QueueMessage, RequestId} +import io.renku.redis.client.{ClientId, MessageId, QueueName} import io.renku.search.provision.QueueMessageDecoder -import scala.concurrent.duration.FiniteDuration -import fs2.Chunk -import cats.effect.{Async, Resource} -import cats.Show -import io.renku.queue.client.RequestId -import io.renku.redis.client.MessageId import scribe.Scribe +import scala.concurrent.duration.FiniteDuration + trait MessageReader[F[_]]: def read[A](using QueueMessageDecoder[F, A], @@ -51,7 +47,7 @@ trait MessageReader[F[_]]: object MessageReader: final case class Message[A](raw: QueueMessage, decoded: Seq[A]): - val id = raw.id + val id: MessageId = raw.id val requestId: RequestId = RequestId(raw.header.requestId) def map[B](f: A => B): Message[B] = Message(raw, decoded.map(f)) def stream[F[_]]: Stream[F, A] = Stream.emits(decoded).covary[F] @@ -59,18 +55,21 @@ object MessageReader: /** MessageReader that dequeues messages attempt to decode it. If decoding fails, the * message is marked as processed and the next message is read. */ - def apply[F[_]: Sync]( - queueClient: Resource[F, QueueClient[F]], + def apply[F[_]: Async]( + queueClient: Stream[F, QueueClient[F]], queue: QueueName, clientId: ClientId, chunkSize: Int ): MessageReader[F] = new MessageReader[F]: - val logger = scribe.cats.effect[F] + val logger: Scribe[F] = scribe.cats.effect[F] - def read[A](using QueueMessageDecoder[F, A], Show[A]): Stream[F, Message[A]] = + override def read[A](using + QueueMessageDecoder[F, A], + Show[A] + ): Stream[F, Message[A]] = for { - client <- Stream.resource(queueClient) + client <- queueClient last <- Stream.eval(client.findLastProcessed(clientId, queue)) qmsg <- client.acquireEventsStream(queue, chunkSize, last) dec <- Stream @@ -91,17 +90,17 @@ object MessageReader: _ <- Stream.eval(logInfo(dec)) } yield dec - def markProcessed(id: MessageId): F[Unit] = - queueClient.use(_.markProcessed(clientId, queue, id)) + override def markProcessed(id: MessageId): F[Unit] = + queueClient.evalMap(_.markProcessed(clientId, queue, id)).take(1).compile.drain - def markProcessedError(err: Throwable, id: MessageId)(using + override def markProcessedError(err: Throwable, id: MessageId)(using logger: Scribe[F] ): F[Unit] = markProcessed(id) >> - logger.error(s"Processing messageId: ${id} for '${queue}' failed", err) + logger.error(s"Processing messageId: $id for '$queue' failed", err) private def logInfo[A: Show](m: Message[A]): F[Unit] = lazy val values = m.decoded.mkString_(", ") logger.info( - s"""Received message queue: ${queue.name}, id: ${m.id}, source: ${m.raw.header.source}, type: ${m.raw.header.`type`} for: ${values}""" + s"""Received message queue: ${queue.name}, id: ${m.id}, source: ${m.raw.header.source}, type: ${m.raw.header.`type`} for: $values""" ) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala index 6ad4cda6..36b61e90 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala @@ -18,12 +18,12 @@ package io.renku.search.provision.handler -import cats.effect.{Resource, Sync} -import io.renku.search.solr.client.SearchSolrClient +import cats.effect.Async +import fs2.Stream import io.renku.queue.client.QueueClient -import io.renku.redis.client.ClientId -import io.renku.redis.client.QueueName +import io.renku.redis.client.{ClientId, QueueName} import io.renku.search.provision.QueuesConfig +import io.renku.search.solr.client.SearchSolrClient trait PipelineSteps[F[_]]: def reader: MessageReader[F] @@ -35,9 +35,9 @@ trait PipelineSteps[F[_]]: def userUtils: UserUtils[F] object PipelineSteps: - def apply[F[_]: Sync]( + def apply[F[_]: Async]( solrClient: SearchSolrClient[F], - queueClient: Resource[F, QueueClient[F]], + queueClient: Stream[F, QueueClient[F]], queueConfig: QueuesConfig, inChunkSize: Int, clientId: ClientId diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PushToRedis.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PushToRedis.scala index d7692a8a..de55649b 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PushToRedis.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PushToRedis.scala @@ -19,16 +19,15 @@ package io.renku.search.provision.handler import cats.Show -import cats.effect.{Resource, Sync} +import cats.effect.Sync import cats.syntax.all.* -import fs2.Pipe - +import fs2.{Pipe, Stream} import io.renku.avro.codec.encoders.all.given import io.renku.events.v1.ProjectAuthorizationRemoved import io.renku.queue.client.* -import io.renku.redis.client.ClientId -import io.renku.redis.client.MessageId +import io.renku.redis.client.{ClientId, MessageId} import io.renku.search.provision.QueuesConfig +import scribe.Scribe trait PushToRedis[F[_]]: def pushAuthorizationRemoved( @@ -40,31 +39,32 @@ trait PushToRedis[F[_]]: object PushToRedis: def apply[F[_]: Sync]( - queueClient: Resource[F, QueueClient[F]], + queueClient: Stream[F, QueueClient[F]], clientId: ClientId, queueConfig: QueuesConfig ): PushToRedis[F] = new PushToRedis[F] { - val logger = scribe.cats.effect[F] + val logger: Scribe[F] = scribe.cats.effect[F] + def pushAuthorizationRemoved( requestId: RequestId )(using Show[ProjectAuthorizationRemoved] ): Pipe[F, ProjectAuthorizationRemoved, MessageId] = - _.evalMap(payload => - createHeader(requestId).flatMap { header => - logger.debug(show"Pushing $payload to redis") >> - queueClient.use( - _.enqueue( - queueConfig.projectAuthorizationRemoved, - header, - payload - ) - ) - } + _.evalMap(payload => createHeader(requestId).tupleRight(payload)) + .evalTap { case (_, payload) => logger.debug(show"Pushing $payload to redis") } + .flatMap(enqueue) + + private def enqueue(header: MessageHeader, payload: ProjectAuthorizationRemoved) = + queueClient.evalMap( + _.enqueue( + queueConfig.projectAuthorizationRemoved, + header, + payload + ) ) - def createHeader(requestId: RequestId): F[MessageHeader] = + private def createHeader(requestId: RequestId): F[MessageHeader] = MessageHeader[F]( MessageSource(clientId.value), ProjectAuthorizationRemoved.SCHEMA$, diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala index fb85ca58..8c6f6e4a 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/metrics/MetricsCollectorsUpdater.scala @@ -18,7 +18,7 @@ package io.renku.search.provision.metrics -import cats.effect.{Async, Resource} +import cats.effect.Async import cats.syntax.all.* import fs2.Stream import io.renku.queue.client.QueueClient @@ -33,17 +33,17 @@ object MetricsCollectorsUpdater: clientId: ClientId, queuesConfig: QueuesConfig, updateInterval: FiniteDuration, - qcResource: Resource[F, QueueClient[F]] + queueClient: Stream[F, QueueClient[F]] ): MetricsCollectorsUpdater[F] = new MetricsCollectorsUpdater[F]( - qcResource, + queueClient, queuesConfig, RedisMetrics.updaterFactories(clientId), updateInterval ) class MetricsCollectorsUpdater[F[_]: Async]( - qcResource: Resource[F, QueueClient[F]], + queueClient: Stream[F, QueueClient[F]], queuesConfig: QueuesConfig, collectors: List[QueueClient[F] => CollectorUpdater[F]], updateInterval: FiniteDuration @@ -52,15 +52,14 @@ class MetricsCollectorsUpdater[F[_]: Async]( private val allQueues = queuesConfig.all.toList def run(): F[Unit] = - val queueClient: Stream[F, QueueClient[F]] = - Stream.resource(qcResource) + createUpdateStream.compile.drain + + private def createUpdateStream: Stream[F, Unit] = val awake: Stream[F, Unit] = Stream.awakeEvery[F](updateInterval).void queueClient .flatTap(_ => awake) .evalMap(runUpdate) - .compile - .drain private def runUpdate(qc: QueueClient[F]) = allQueues.traverse_ { q => diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala index a24331ba..c9f68e49 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala @@ -20,18 +20,15 @@ package io.renku.search.provision import cats.effect.{IO, Resource} import cats.syntax.all.* - -import io.renku.queue.client.QueueClient -import io.renku.queue.client.QueueSpec -import io.renku.redis.client.ClientId -import io.renku.redis.client.QueueName +import fs2.Stream +import io.renku.queue.client.{QueueClient, QueueSpec} +import io.renku.redis.client.{ClientId, QueueName} import io.renku.search.LoggingConfigure import io.renku.search.model.Id import io.renku.search.provision.handler.PipelineSteps import io.renku.search.provision.project.ProjectSyntax import io.renku.search.provision.user.UserSyntax -import io.renku.search.solr.client.SearchSolrClient -import io.renku.search.solr.client.SearchSolrSpec +import io.renku.search.solr.client.{SearchSolrClient, SearchSolrSpec} import io.renku.search.solr.documents.* import munit.CatsEffectSuite @@ -63,9 +60,9 @@ trait ProvisioningSuite val steps = PipelineSteps[IO]( solrClient, - Resource.pure(queueClient), + Stream[IO, QueueClient[IO]](queueClient), queueConfig, - 1, + inChunkSize = 1, clientId ) val handlers = MessageHandlers[IO](steps, queueConfig) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala index c0fe2800..41b53929 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala @@ -22,7 +22,6 @@ import cats.data.NonEmptyList import cats.effect.{Async, Resource} import fs2.Stream import fs2.io.net.Network - import io.bullet.borer.{Decoder, Encoder} import io.renku.search.model.Id import io.renku.search.query.Query