From 0221ffca1cc5ef93c0a88424252d430ab6a468ca Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Tue, 19 Nov 2024 19:53:10 +0100 Subject: [PATCH 1/5] feat: allow ZIO scope to be passed to ZKafkaConsumer to better control lifecycle Signed-off-by: Benjamin Voiturier --- .../shared/messaging/MessagingService.scala | 7 +++--- .../kafka/ZKafkaMessagingServiceImpl.scala | 25 ++++++------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala index 8c3a60e56e..2c926e2aac 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala @@ -1,12 +1,12 @@ package org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl} -import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Task, URIO, URLayer, ZIO, ZLayer} +import zio.{Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URLayer, ZIO, ZLayer, durationInt} import java.time.Instant trait MessagingService { def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] - def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] + def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] } object MessagingService { @@ -81,8 +81,9 @@ object MessagingService { def producerLayer[K: EnvironmentTag, V: EnvironmentTag](implicit kSerde: Serde[K], vSerde: Serde[V] - ): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for { + ): RLayer[Scope & MessagingService, Producer[K, V]] = ZLayer.fromZIO(for { messagingService <- ZIO.service[MessagingService] + _ <- ZIO.logInfo("Producer layer invoked!!") producer <- messagingService.makeProducer[K, V]() } yield producer) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala index 14e5ab0491..c9a48caa78 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala @@ -3,12 +3,8 @@ package org.hyperledger.identus.shared.messaging.kafka import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.header.Headers import org.hyperledger.identus.shared.messaging.* -import zio.{Duration, RIO, Task, URIO, URLayer, ZIO, ZLayer} -import zio.kafka.consumer.{ - Consumer as ZKConsumer, - ConsumerSettings as ZKConsumerSettings, - Subscription as ZKSubscription -} +import zio.{Duration, RIO, Scope, Task, URIO, URLayer, ZIO, ZLayer} +import zio.kafka.consumer.{Consumer as ZKConsumer, ConsumerSettings as ZKConsumerSettings, Subscription as ZKSubscription} import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval} import zio.kafka.producer.{Producer as ZKProducer, ProducerSettings as ZKProducerSettings} import zio.kafka.serde.{Deserializer as ZKDeserializer, Serializer as ZKSerializer} @@ -36,8 +32,10 @@ class ZKafkaMessagingServiceImpl( ) ) - override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] = - ZIO.succeed(new ZKafkaProducerImpl[K, V](bootstrapServers, kSerde, vSerde)) + override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] = + for { + zkProducer <- ZKProducer.make(ZKProducerSettings(bootstrapServers)) + } yield new ZKafkaProducerImpl[K, V](zkProducer, kSerde, vSerde) } object ZKafkaMessagingServiceImpl { @@ -109,13 +107,7 @@ class ZKafkaConsumerImpl[K, V]( .runDrain } -class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], vSerde: Serde[V]) - extends Producer[K, V] { - private val zkProducer = ZLayer.scoped( - ZKProducer.make( - ZKProducerSettings(bootstrapServers) - ) - ) +class ZKafkaProducerImpl[K, V](zkProducer: ZKProducer, kSerde: Serde[K], vSerde: Serde[V]) extends Producer[K, V] { private val zkKeySerializer = new ZKSerializer[Any, K] { override def serialize(topic: String, headers: Headers, value: K): RIO[Any, Array[Byte]] = @@ -128,10 +120,9 @@ class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], } override def produce(topic: String, key: K, value: V): Task[Unit] = - ZKProducer + zkProducer .produce(topic, key, value, zkKeySerializer, zkValueSerializer) .tap(metadata => ZIO.logInfo(s"Message produced: ${metadata.offset()}")) .map(_ => ()) - .provideSome(zkProducer) } From 4ff4ea5a76b61196d2f732410960d42debd69c7f Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Tue, 19 Nov 2024 19:53:26 +0100 Subject: [PATCH 2/5] test: fix unit tests Signed-off-by: Benjamin Voiturier --- .../oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala | 2 +- .../connect/core/service/ConnectionServiceImplSpec.scala | 1 + .../connect/core/service/ConnectionServiceNotifierSpec.scala | 3 ++- .../hyperledger/identus/messaging/MessagingServiceTest.scala | 5 +++-- .../pollux/core/service/CredentialServiceSpecHelper.scala | 2 +- .../pollux/core/service/PresentationServiceSpecHelper.scala | 2 +- 6 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala b/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala index d1a5e65584..e80051d700 100644 --- a/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala +++ b/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala @@ -54,7 +54,7 @@ object OIDCCredentialIssuerServiceSpec LinkSecretServiceImpl.layer, CredentialServiceImpl.layer, (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> - MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie, OIDCCredentialIssuerServiceImpl.layer ) diff --git a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala index 7067b55bf6..e999551dbf 100644 --- a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala +++ b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala @@ -316,6 +316,7 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { messaging.MessagingService.serviceLayer, messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId], ZLayer.succeed(WalletAccessContext(WalletId.random)), + zio.Scope.default ) } diff --git a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala index 185bd95b95..e2b03eb30c 100644 --- a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala +++ b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala @@ -155,7 +155,8 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault { ZLayer.succeed(WalletAccessContext(WalletId.random)), messaging.MessagingServiceConfig.inMemoryLayer, messaging.MessagingService.serviceLayer, - messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId] + messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId], + zio.Scope.default ) ) } diff --git a/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala b/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala index 54aea505f6..6fa9b28e9e 100644 --- a/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala +++ b/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala @@ -35,10 +35,11 @@ object MessagingServiceTest extends ZIOAppDefault { .fork _ <- ZIO.never } yield () - effect.provide( + effect.provideSome( messaging.MessagingServiceConfig.inMemoryLayer, messaging.MessagingService.serviceLayer, - ZLayer.succeed("Sample 'R' passed to handler") + ZLayer.succeed("Sample 'R' passed to handler"), + zio.Scope.default ) } diff --git a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala index c8c608959f..4d872c31b1 100644 --- a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala +++ b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala @@ -42,7 +42,7 @@ trait CredentialServiceSpecHelper { GenericSecretStorageInMemory.layer, LinkSecretServiceImpl.layer, (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> - MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie, CredentialServiceImpl.layer ) diff --git a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala index 1e59c4704a..8a644ae682 100644 --- a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala +++ b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala @@ -44,7 +44,7 @@ trait PresentationServiceSpecHelper { PresentationRepositoryInMemory.layer, CredentialRepositoryInMemory.layer, (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> - MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie, ) ++ defaultWalletLayer def createIssuer(did: String): Issuer = { From 597cf926332a85c7533890ff38240d89284f2307 Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Wed, 20 Nov 2024 12:07:02 +0100 Subject: [PATCH 3/5] chore: require ZIO scope when making a new ZKafkaConsumer Signed-off-by: Benjamin Voiturier --- .../shared/messaging/MessagingService.scala | 8 +-- .../kafka/InMemoryMessagingService.scala | 4 +- .../kafka/ZKafkaMessagingServiceImpl.scala | 68 ++++++++----------- 3 files changed, 35 insertions(+), 45 deletions(-) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala index 2c926e2aac..ffa8855136 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala @@ -5,7 +5,7 @@ import zio.{Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URL import java.time.Instant trait MessagingService { - def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] + def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] } @@ -22,7 +22,7 @@ object MessagingService { groupId: String, handler: Message[K, V] => RIO[HR, Unit], steps: Seq[RetryStep] - )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] = { + )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] = { for { messagingService <- ZIO.service[MessagingService] messageProducer <- ZIO.service[Producer[K, V]] @@ -67,7 +67,7 @@ object MessagingService { topicName: String, consumerCount: Int, handler: Message[K, V] => RIO[HR, Unit] - )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] = + )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] = consumeWithRetryStrategy(groupId, handler, Seq(RetryStep(topicName, consumerCount, 0.seconds, None))) val serviceLayer: URLayer[MessagingServiceConfig, MessagingService] = @@ -90,7 +90,7 @@ object MessagingService { def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V] - ): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for { + ): RLayer[Scope & MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for { messagingService <- ZIO.service[MessagingService] consumer <- messagingService.makeConsumer[K, V](groupId) } yield consumer) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala index 54d8c935c2..0d5fd3f4dc 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala @@ -19,11 +19,11 @@ class InMemoryMessagingService( ] ) extends MessagingService { - override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] = { + override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = { ZIO.succeed(new InMemoryConsumer[K, V](groupId, topicQueues, processedMessagesMap)) } - override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] = + override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] = ZIO.succeed(new InMemoryProducer[K, V](topicQueues, queueCapacity)) } diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala index c9a48caa78..93c06d82b5 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala @@ -4,7 +4,11 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.header.Headers import org.hyperledger.identus.shared.messaging.* import zio.{Duration, RIO, Scope, Task, URIO, URLayer, ZIO, ZLayer} -import zio.kafka.consumer.{Consumer as ZKConsumer, ConsumerSettings as ZKConsumerSettings, Subscription as ZKSubscription} +import zio.kafka.consumer.{ + Consumer as ZKConsumer, + ConsumerSettings as ZKConsumerSettings, + Subscription as ZKSubscription +} import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval} import zio.kafka.producer.{Producer as ZKProducer, ProducerSettings as ZKProducerSettings} import zio.kafka.serde.{Deserializer as ZKDeserializer, Serializer as ZKSerializer} @@ -17,19 +21,28 @@ class ZKafkaMessagingServiceImpl( pollTimeout: Duration, rebalanceSafeCommits: Boolean ) extends MessagingService { - override def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] = - ZIO.succeed( - new ZKafkaConsumerImpl[K, V]( - bootstrapServers, - groupId, - kSerde, - vSerde, - autoCreateTopics, - maxPollRecords, - maxPollInterval, - pollTimeout, - rebalanceSafeCommits + override def makeConsumer[K, V]( + groupId: String + )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = + for { + zkConsumer <- ZKConsumer.make( + ZKConsumerSettings(bootstrapServers) + .withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString) + .withGroupId(groupId) + // 'max.poll.records' default is 500. This is a Kafka property. + .withMaxPollRecords(maxPollRecords) + // 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property. + .withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record' + // 'pollTimeout' default is 50 millis. This is a ZIO Kafka property. + .withPollTimeout(pollTimeout) + .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) + .withRebalanceSafeCommits(rebalanceSafeCommits) + // .withMaxRebalanceDuration(30.seconds) ) + } yield new ZKafkaConsumerImpl[K, V]( + zkConsumer, + kSerde, + vSerde ) override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] = @@ -58,32 +71,10 @@ object ZKafkaMessagingServiceImpl { } class ZKafkaConsumerImpl[K, V]( - bootstrapServers: List[String], - groupId: String, + zkConsumer: ZKConsumer, kSerde: Serde[K], - vSerde: Serde[V], - autoCreateTopics: Boolean, - maxPollRecords: Int, - maxPollInterval: Duration, - pollTimeout: Duration, - rebalanceSafeCommits: Boolean + vSerde: Serde[V] ) extends Consumer[K, V] { - private val zkConsumer = ZLayer.scoped( - ZKConsumer.make( - ZKConsumerSettings(bootstrapServers) - .withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString) - .withGroupId(groupId) - // 'max.poll.records' default is 500. This is a Kafka property. - .withMaxPollRecords(maxPollRecords) - // 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property. - .withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record' - // 'pollTimeout' default is 50 millis. This is a ZIO Kafka property. - .withPollTimeout(pollTimeout) - .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) - .withRebalanceSafeCommits(rebalanceSafeCommits) - // .withMaxRebalanceDuration(30.seconds) - ) - ) private val zkKeyDeserializer = new ZKDeserializer[Any, K] { override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, K] = @@ -96,9 +87,8 @@ class ZKafkaConsumerImpl[K, V]( } override def consume[HR](topic: String, topics: String*)(handler: Message[K, V] => URIO[HR, Unit]): RIO[HR, Unit] = - ZKConsumer + zkConsumer .plainStream(ZKSubscription.topics(topic, topics*), zkKeyDeserializer, zkValueDeserializer) - .provideSomeLayer(zkConsumer) .mapZIO(record => handler(Message(record.key, record.value, record.offset.offset, record.timestamp)).as(record.offset) ) From eff93ab654771889ba160c8742f20b1b8786ab92 Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Wed, 20 Nov 2024 13:38:58 +0100 Subject: [PATCH 4/5] chore: downsize the Kafka consumers and retry topics configuration Signed-off-by: Benjamin Voiturier --- .../src/main/resources/application.conf | 16 +++++----- .../shared/docker-compose-with-kafka.yml | 32 ++++++++----------- .../src/test/resources/containers/agent.yml | 30 ++++++++--------- 3 files changed, 34 insertions(+), 44 deletions(-) diff --git a/cloud-agent/service/server/src/main/resources/application.conf b/cloud-agent/service/server/src/main/resources/application.conf index 7b5bebb01a..404408119d 100644 --- a/cloud-agent/service/server/src/main/resources/application.conf +++ b/cloud-agent/service/server/src/main/resources/application.conf @@ -250,34 +250,34 @@ agent { } messagingService { connectFlow { - consumerCount = 5 + consumerCount = 2 retryStrategy { - maxRetries = 4 + maxRetries = 2 initialDelay = 5.seconds maxDelay = 40.seconds } } issueFlow { - consumerCount = 5 + consumerCount = 2 retryStrategy { - maxRetries = 4 + maxRetries = 2 initialDelay = 5.seconds maxDelay = 40.seconds } } presentFlow { - consumerCount = 5 + consumerCount = 2 retryStrategy { - maxRetries = 4 + maxRetries = 2 initialDelay = 5.seconds maxDelay = 40.seconds } } didStateSync { - consumerCount = 5 + consumerCount = 1 } statusListSync { - consumerCount = 5 + consumerCount = 1 } inMemoryQueueCapacity = 1000 kafkaEnabled = false diff --git a/infrastructure/shared/docker-compose-with-kafka.yml b/infrastructure/shared/docker-compose-with-kafka.yml index 3e24ad6128..2d8bccd8cb 100644 --- a/infrastructure/shared/docker-compose-with-kafka.yml +++ b/infrastructure/shared/docker-compose-with-kafka.yml @@ -203,36 +203,30 @@ services: echo -e 'Creating kafka topics' # Connect - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 20 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 20 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 20 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 20 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 20 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1 # Issue - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-3 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-4 --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-DLQ --replication-factor 1 --partitions 1 # Present - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-3 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-4 --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-DLQ --replication-factor 1 --partitions 1 # DID Publication State Sync - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 4 # Status List Sync - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 4 tail -f /dev/null " diff --git a/tests/integration-tests/src/test/resources/containers/agent.yml b/tests/integration-tests/src/test/resources/containers/agent.yml index 74bf287ada..7405ce6b8f 100644 --- a/tests/integration-tests/src/test/resources/containers/agent.yml +++ b/tests/integration-tests/src/test/resources/containers/agent.yml @@ -126,34 +126,30 @@ services: echo -e 'Creating kafka topics' # Connect - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1 # Issue - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-3 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-4 --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-DLQ --replication-factor 1 --partitions 1 # Present - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-3 --replication-factor 1 --partitions 5 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-4 --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 4 kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-DLQ --replication-factor 1 --partitions 1 # DID Publication State Sync - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 4 # Status List Sync - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 5 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 4 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 4 tail -f /dev/null " From da843dbdce3f96be780916fafd94f7d7e39018dc Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Wed, 20 Nov 2024 13:54:43 +0100 Subject: [PATCH 5/5] chore: run scalafmt Signed-off-by: Benjamin Voiturier --- .../identus/shared/messaging/MessagingService.scala | 2 +- .../shared/messaging/kafka/InMemoryMessagingService.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala index ffa8855136..3ef20e3dd9 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala @@ -1,7 +1,7 @@ package org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl} -import zio.{Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URLayer, ZIO, ZLayer, durationInt} +import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URLayer, ZIO, ZLayer} import java.time.Instant trait MessagingService { diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala index 0d5fd3f4dc..8577ef072c 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala @@ -19,7 +19,9 @@ class InMemoryMessagingService( ] ) extends MessagingService { - override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = { + override def makeConsumer[K, V]( + groupId: String + )(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = { ZIO.succeed(new InMemoryConsumer[K, V](groupId, topicQueues, processedMessagesMap)) }