From e7ecae8544f3ea7968c7e46b10a9e58279380005 Mon Sep 17 00:00:00 2001 From: Tony <43834836+lofoyet@users.noreply.github.com> Date: Tue, 21 Nov 2023 09:26:46 -0800 Subject: [PATCH] Solve Allocation of Resources Without Limits or Throttling in org.xerial.snappy:snappy-java (#887) * bump org.xerial.snappy:snappy-java * upgrade reactive mongo and fs2-kafka version for vulnerability in org.xerial.snappy:snappy-java --- build.sbt | 8 ++++---- .../com/iheart/thomas/kafka/MessageProcessor.scala | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 7a0e5449..1b7b4ddd 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ val gh = GitHubSettings( ) lazy val rootSettings = buildSettings ++ publishSettings ++ commonSettings -val reactiveMongoVer = "1.0.10" +val reactiveMongoVer = "1.1.0" val typeLevelOrg = "org.typelevel" // format: off lazy val libs = @@ -23,7 +23,7 @@ lazy val libs = .addJava(name ="commons-math3", version = "3.6.1", org ="org.apache.commons") .addJVM(name = "decline", version = "2.2.0", org = "com.monovore") .addJVM(name = "embedded-kafka", version = "3.2.3", org = "io.github.embeddedkafka") - .addJVM(name = "fs2-kafka", version = "2.4.0", org = "com.github.fd4s") + .addJVM(name = "fs2-kafka", version = "3.2.0", org = "com.github.fd4s") .add( name = "fs2", version = "3.3.0") .add( name = "cats-effect", version = "3.3.13") .addJVM(name = "henkan-convert", version = "0.6.5", org ="com.kailuowang") @@ -35,8 +35,8 @@ lazy val libs = .add( name = "play-json", version = "2.9.4", org = "com.typesafe.play") .addJVM(name = "play-json-derived-codecs", version = "10.1.0", org = "org.julienrf") .addJVM(name = "rainier", version = "0.3.5", org ="com.stripe", "rainier-core", "rainier-cats") - .addJVM(name = "reactivemongo", version = reactiveMongoVer, org = "org.reactivemongo", "reactivemongo", "reactivemongo-bson-api", "reactivemongo-iteratees" ) - .addJVM(name = "reactivemongo-play-json-compat", version = reactiveMongoVer + "-play27", org = "org.reactivemongo") + .addJVM(name = "reactivemongo", version = reactiveMongoVer + "-RC11", org = "org.reactivemongo", "reactivemongo", "reactivemongo-bson-api", "reactivemongo-iteratees" ) + .addJVM(name = "reactivemongo-play-json-compat", version = reactiveMongoVer + "-play29-RC11", org = "org.reactivemongo") .addJVM(name = "scala-java8-compat", version = "1.0.2", org = "org.scala-lang.modules") .addJVM(name = "scala-collection-compat", version = "2.8.0", org = "org.scala-lang.modules") .add( name = "scalacheck-1-14", version = "3.2.2.0",org = "org.scalatestplus") diff --git a/kafka/src/main/scala/com/iheart/thomas/kafka/MessageProcessor.scala b/kafka/src/main/scala/com/iheart/thomas/kafka/MessageProcessor.scala index af2a1c3b..6158f4b9 100644 --- a/kafka/src/main/scala/com/iheart/thomas/kafka/MessageProcessor.scala +++ b/kafka/src/main/scala/com/iheart/thomas/kafka/MessageProcessor.scala @@ -3,16 +3,16 @@ package kafka import com.iheart.thomas.FeatureName import com.iheart.thomas.analysis.KPIName - import com.iheart.thomas.analysis.ConversionEvent +import cats.effect.kernel.Resource import fs2.Pipe -import fs2.kafka.RecordDeserializer +import fs2.kafka.Deserializer trait MessageProcessor[F[_]] { type RawMessage type PreprocessedMessage - implicit def deserializer: RecordDeserializer[F, RawMessage] + implicit def deserializer: Resource[F, Deserializer[F, RawMessage]] def preprocessor: Pipe[F, RawMessage, PreprocessedMessage] def toConversionEvent( featureName: FeatureName, @@ -28,7 +28,7 @@ object MessageProcessor { FeatureName, KPIName ) => F[Pipe[F, Message, (ArmName, ConversionEvent)]] - )(implicit ev: RecordDeserializer[F, Message] + )(implicit ev: Resource[F, Deserializer[F, Message]] ): MessageProcessor[F] { type RawMessage = Message; type PreprocessedMessage = Message @@ -37,7 +37,7 @@ object MessageProcessor { type RawMessage = Message type PreprocessedMessage = Message - implicit def deserializer: RecordDeserializer[F, Message] = ev + implicit def deserializer: Resource[F, Deserializer[F, RawMessage]] = ev def preprocessor: Pipe[F, Message, Message] = identity def toConversionEvent(