Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: share Kafka producers among threads #1460

Merged
merged 7 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cloud-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -250,34 +250,34 @@ agent {
}
messagingService {
connectFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
issueFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
presentFlow {
consumerCount = 5
consumerCount = 2
retryStrategy {
maxRetries = 4
maxRetries = 2
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
didStateSync {
consumerCount = 5
consumerCount = 1
}
statusListSync {
consumerCount = 5
consumerCount = 1
}
inMemoryQueueCapacity = 1000
kafkaEnabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object OIDCCredentialIssuerServiceSpec
LinkSecretServiceImpl.layer,
CredentialServiceImpl.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
OIDCCredentialIssuerServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault {
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
ZLayer.succeed(WalletAccessContext(WalletId.random)),
zio.Scope.default
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault {
ZLayer.succeed(WalletAccessContext(WalletId.random)),
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId]
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
zio.Scope.default
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ object MessagingServiceTest extends ZIOAppDefault {
.fork
_ <- ZIO.never
} yield ()
effect.provide(
effect.provideSome(
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
ZLayer.succeed("Sample 'R' passed to handler")
ZLayer.succeed("Sample 'R' passed to handler"),
zio.Scope.default
)
}

Expand Down
32 changes: 13 additions & 19 deletions infrastructure/shared/docker-compose-with-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,36 +203,30 @@ services:
echo -e 'Creating kafka topics'

# Connect
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 20
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1

# Issue
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-3 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-4 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic issue-DLQ --replication-factor 1 --partitions 1

# Present
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-3 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-4 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-1 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-retry-2 --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic present-DLQ --replication-factor 1 --partitions 1

# DID Publication State Sync
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-did-state-DLQ --replication-factor 1 --partitions 4

# Status List Sync
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list --replication-factor 1 --partitions 4
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic sync-status-list-DLQ --replication-factor 1 --partitions 4

tail -f /dev/null
"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait CredentialServiceSpecHelper {
GenericSecretStorageInMemory.layer,
LinkSecretServiceImpl.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
CredentialServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait PresentationServiceSpecHelper {
PresentationRepositoryInMemory.layer,
CredentialRepositoryInMemory.layer,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(zio.Scope.default >>> MessagingService.producerLayer[UUID, WalletIdAndRecordId])).orDie,
) ++ defaultWalletLayer

def createIssuer(did: String): Issuer = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.hyperledger.identus.shared.messaging

import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl}
import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Task, URIO, URLayer, ZIO, ZLayer}
import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Scope, Task, URIO, URLayer, ZIO, ZLayer}

import java.time.Instant
trait MessagingService {
def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]]
def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]]
def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]]
def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]]
}

object MessagingService {
Expand All @@ -22,7 +22,7 @@ object MessagingService {
groupId: String,
handler: Message[K, V] => RIO[HR, Unit],
steps: Seq[RetryStep]
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] = {
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] = {
for {
messagingService <- ZIO.service[MessagingService]
messageProducer <- ZIO.service[Producer[K, V]]
Expand Down Expand Up @@ -67,7 +67,7 @@ object MessagingService {
topicName: String,
consumerCount: Int,
handler: Message[K, V] => RIO[HR, Unit]
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] =
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & Scope & MessagingService, Unit] =
consumeWithRetryStrategy(groupId, handler, Seq(RetryStep(topicName, consumerCount, 0.seconds, None)))

val serviceLayer: URLayer[MessagingServiceConfig, MessagingService] =
Expand All @@ -81,15 +81,16 @@ object MessagingService {
def producerLayer[K: EnvironmentTag, V: EnvironmentTag](implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for {
): RLayer[Scope & MessagingService, Producer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
_ <- ZIO.logInfo("Producer layer invoked!!")
producer <- messagingService.makeProducer[K, V]()
} yield producer)

def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for {
): RLayer[Scope & MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
consumer <- messagingService.makeConsumer[K, V](groupId)
} yield consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ class InMemoryMessagingService(
]
) extends MessagingService {

override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] = {
override def makeConsumer[K, V](
groupId: String
)(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] = {
ZIO.succeed(new InMemoryConsumer[K, V](groupId, topicQueues, processedMessagesMap))
}

override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] =
override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] =
ZIO.succeed(new InMemoryProducer[K, V](topicQueues, queueCapacity))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.hyperledger.identus.shared.messaging.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.header.Headers
import org.hyperledger.identus.shared.messaging.*
import zio.{Duration, RIO, Task, URIO, URLayer, ZIO, ZLayer}
import zio.{Duration, RIO, Scope, Task, URIO, URLayer, ZIO, ZLayer}
import zio.kafka.consumer.{
Consumer as ZKConsumer,
ConsumerSettings as ZKConsumerSettings,
Expand All @@ -21,23 +21,34 @@ class ZKafkaMessagingServiceImpl(
pollTimeout: Duration,
rebalanceSafeCommits: Boolean
) extends MessagingService {
override def makeConsumer[K, V](groupId: String)(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] =
ZIO.succeed(
new ZKafkaConsumerImpl[K, V](
bootstrapServers,
groupId,
kSerde,
vSerde,
autoCreateTopics,
maxPollRecords,
maxPollInterval,
pollTimeout,
rebalanceSafeCommits
override def makeConsumer[K, V](
groupId: String
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Consumer[K, V]] =
for {
zkConsumer <- ZKConsumer.make(
ZKConsumerSettings(bootstrapServers)
.withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString)
.withGroupId(groupId)
// 'max.poll.records' default is 500. This is a Kafka property.
.withMaxPollRecords(maxPollRecords)
// 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property.
.withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record'
// 'pollTimeout' default is 50 millis. This is a ZIO Kafka property.
.withPollTimeout(pollTimeout)
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
.withRebalanceSafeCommits(rebalanceSafeCommits)
// .withMaxRebalanceDuration(30.seconds)
)
} yield new ZKafkaConsumerImpl[K, V](
zkConsumer,
kSerde,
vSerde
)

override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] =
ZIO.succeed(new ZKafkaProducerImpl[K, V](bootstrapServers, kSerde, vSerde))
override def makeProducer[K, V]()(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[Scope, Producer[K, V]] =
for {
zkProducer <- ZKProducer.make(ZKProducerSettings(bootstrapServers))
} yield new ZKafkaProducerImpl[K, V](zkProducer, kSerde, vSerde)
}

object ZKafkaMessagingServiceImpl {
Expand All @@ -60,32 +71,10 @@ object ZKafkaMessagingServiceImpl {
}

class ZKafkaConsumerImpl[K, V](
bootstrapServers: List[String],
groupId: String,
zkConsumer: ZKConsumer,
kSerde: Serde[K],
vSerde: Serde[V],
autoCreateTopics: Boolean,
maxPollRecords: Int,
maxPollInterval: Duration,
pollTimeout: Duration,
rebalanceSafeCommits: Boolean
vSerde: Serde[V]
) extends Consumer[K, V] {
private val zkConsumer = ZLayer.scoped(
ZKConsumer.make(
ZKConsumerSettings(bootstrapServers)
.withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreateTopics.toString)
.withGroupId(groupId)
// 'max.poll.records' default is 500. This is a Kafka property.
.withMaxPollRecords(maxPollRecords)
// 'max.poll.interval.ms' default is 5 minutes. This is a Kafka property.
.withMaxPollInterval(maxPollInterval) // Should be max.poll.records x 'max processing time per record'
// 'pollTimeout' default is 50 millis. This is a ZIO Kafka property.
.withPollTimeout(pollTimeout)
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
.withRebalanceSafeCommits(rebalanceSafeCommits)
// .withMaxRebalanceDuration(30.seconds)
)
)

private val zkKeyDeserializer = new ZKDeserializer[Any, K] {
override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, K] =
Expand All @@ -98,9 +87,8 @@ class ZKafkaConsumerImpl[K, V](
}

override def consume[HR](topic: String, topics: String*)(handler: Message[K, V] => URIO[HR, Unit]): RIO[HR, Unit] =
ZKConsumer
zkConsumer
.plainStream(ZKSubscription.topics(topic, topics*), zkKeyDeserializer, zkValueDeserializer)
.provideSomeLayer(zkConsumer)
.mapZIO(record =>
handler(Message(record.key, record.value, record.offset.offset, record.timestamp)).as(record.offset)
)
Expand All @@ -109,13 +97,7 @@ class ZKafkaConsumerImpl[K, V](
.runDrain
}

class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], vSerde: Serde[V])
extends Producer[K, V] {
private val zkProducer = ZLayer.scoped(
ZKProducer.make(
ZKProducerSettings(bootstrapServers)
)
)
class ZKafkaProducerImpl[K, V](zkProducer: ZKProducer, kSerde: Serde[K], vSerde: Serde[V]) extends Producer[K, V] {

private val zkKeySerializer = new ZKSerializer[Any, K] {
override def serialize(topic: String, headers: Headers, value: K): RIO[Any, Array[Byte]] =
Expand All @@ -128,10 +110,9 @@ class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K],
}

override def produce(topic: String, key: K, value: V): Task[Unit] =
ZKProducer
zkProducer
.produce(topic, key, value, zkKeySerializer, zkValueSerializer)
.tap(metadata => ZIO.logInfo(s"Message produced: ${metadata.offset()}"))
.map(_ => ())
.provideSome(zkProducer)

}
Loading
Loading