Skip to content

Commit

Permalink
feat(standalone-kafka): explicitly use netty grpc server server, and …
Browse files Browse the repository at this point in the history
…use loopback address for client and server (#690)
  • Loading branch information
osoykan authored Jan 27, 2025
1 parent 276fad7 commit dbfd9dd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.system.abstractions.*
import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl
import io.grpc.*
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.*
Expand All @@ -17,6 +18,7 @@ import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.*
import org.slf4j.*
import java.net.*
import java.util.*
import kotlin.reflect.KClass
import kotlin.time.*
Expand Down Expand Up @@ -385,8 +387,8 @@ class KafkaSystem(
private suspend fun startGrpcServer(): Server {
System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString())
return Try {
ServerBuilder
.forPort(context.options.bridgeGrpcServerPort)
NettyServerBuilder
.forAddress(InetSocketAddress(InetAddress.getLoopbackAddress(), context.options.bridgeGrpcServerPort))
.executor(StoveKafkaCoroutineScope.also { it.ensureActive() }.asExecutor)
.addService(StoveKafkaObserverGrpcServer(sink))
.handshakeTimeout(GRPC_TIMEOUT_IN_SECONDS, java.util.concurrent.TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
@file:Suppress("HttpUrlsUsage")

package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting

import com.squareup.wire.*
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import kotlinx.coroutines.CoroutineScope
import okhttp3.*
import java.net.InetAddress
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

Expand All @@ -23,7 +26,9 @@ object GrpcUtils {
fun createClient(onPort: String, scope: CoroutineScope): StoveKafkaObserverServiceClient = GrpcClient
.Builder()
.client(getClient(scope))
.baseUrl("http://0.0.0.0:$onPort".toHttpUrl())
.baseUrl(onLoopback(onPort))
.build()
.create<StoveKafkaObserverServiceClient>()

private fun onLoopback(port: String): GrpcHttpUrl = "http://${InetAddress.getLoopbackAddress().hostAddress}:$port".toHttpUrl()
}

0 comments on commit dbfd9dd

Please sign in to comment.