Skip to content

Commit

Permalink
Merge branch 'main' into post-3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jdegoes authored Sep 10, 2024
2 parents 03d6ed2 + 188ff2e commit 3fd21ef
Show file tree
Hide file tree
Showing 22 changed files with 383 additions and 265 deletions.
18 changes: 13 additions & 5 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.238.1/containers/java/.devcontainer/base.Dockerfile

# [Choice] Java version (use -bullseye variants on local arm64/Apple Silicon): 11, 17, 11-bullseye, 17-bullseye, 11-buster, 17-buster
ARG VARIANT="11"
FROM mcr.microsoft.com/vscode/devcontainers/java:0-${VARIANT}
ARG VARIANT="17"
FROM mcr.microsoft.com/vscode/devcontainers/java:${VARIANT}


RUN curl -s "https://get.sdkman.io" | bash

# Install Scala Lang
ARG SBT_VERSION="1.7.1"
ARG SBT_VERSION="1.10.1"
RUN \
curl -L "https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz" | tar zxf - -C /usr/share && \
cd /usr/share/sbt/bin && \
rm sbt.bat sbtn-x86_64-apple-darwin sbtn-x86_64-pc-linux sbtn-x86_64-pc-win32.exe && \
ln -s /usr/share/sbt/bin/sbt /usr/local/bin/sbt

ARG SCALA_VERSION="3.1.3"
ARG SCALA_VERSION="3.3.3"
RUN \
mkdir /setup-project && \
cd /setup-project && \
Expand All @@ -24,4 +23,13 @@ RUN \
sbt compile && \
rm -rf /setup-project

RUN \
mkdir /setup-wrk && \
sudo apt-get update -y && sudo apt-get install build-essential libssl-dev git -y && \
git clone https://github.com/wg/wrk.git wrk && \
cd wrk && \
make && \
cp wrk /usr/local/bin && \
rm -rf /setup-wrk

CMD ["sbt"]
4 changes: 2 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// Update the VARIANT arg to pick a Java version: 11, 17
// Append -bullseye or -buster to pin to an OS version.
// Use the -bullseye variants on local arm64/Apple Silicon.
"VARIANT": "11",
"SCALA_VERSION": "3.1.3"
"VARIANT": "17",
"SCALA_VERSION": "3.3.3"
}
},

Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ lazy val zioHttpExample = (project in file("zio-http-example"))
.settings(runSettings(Debug.Main))
.settings(libraryDependencies ++= Seq(`jwt-core`, `zio-schema-json`))
.settings(
run / fork := true,
run / javaOptions ++= Seq("-Xms4G", "-Xmx4G", "-XX:+UseG1GC"),
libraryDependencies ++= Seq(
`zio-config`,
`zio-config-magnolia`,
Expand Down Expand Up @@ -404,7 +406,7 @@ lazy val docs = project
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
libraryDependencies ++= Seq(
`jwt-core`,
"dev.zio" %% "zio-test" % ZioVersion,
"dev.zio" %% "zio-test" % ZioVersion,
`zio-config`,
`zio-config-magnolia`,
`zio-config-typesafe`,
Expand Down
8 changes: 4 additions & 4 deletions docs/reference/aop/handler_aspect.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,11 @@ object UserRepository {

```scala mdoc:silent
Routes(
Method.GET / "user" / int("userId") -> sessionMiddleware -> handler {
(userId: Int, session: Session, request: Request) =>
UserRepository.getUser(session.organizationId, userId)
Method.GET / "user" / int("userId") -> handler {
(userId: Int, request: Request) =>
withContext((session: Session) => UserRepository.getUser(session.organizationId, userId))
}
)
) @@ sessionMiddleware
```

The `HandlerAspect` companion object provides a number of helpful constructors for these middlewares.
Expand Down
18 changes: 9 additions & 9 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ object Dependencies {

val netty =
Seq(
"io.netty" % "netty-codec-http" % NettyVersion,
"io.netty" % "netty-handler-proxy" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-x86_64",
"io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-aarch_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion,
"io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-x86_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-aarch_64",
"com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided",
"io.netty" % "netty-codec-http" % NettyVersion,
"io.netty" % "netty-handler-proxy" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-x86_64",
"io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-aarch_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion,
"io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-x86_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-aarch_64",
"com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided",
)

val `netty-incubator` =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ object PlainTextBenchmarkServer extends ZIOAppDefault {

private val config = Server.Config.default
.port(8080)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
.maxThreads(8)

private val configLayer = ZLayer.succeed(config)
private val nettyConfigLayer = ZLayer.succeed(nettyConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ object SimpleEffectBenchmarkServer extends ZIOAppDefault {

private val config = Server.Config.default
.port(8080)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
.maxThreads(8)

private val configLayer = ZLayer.succeed(config)
private val nettyConfigLayer = ZLayer.succeed(nettyConfig)
Expand Down
129 changes: 95 additions & 34 deletions zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ final case class NettyConfig(
nThreads: Int,
shutdownQuietPeriodDuration: Duration,
shutdownTimeoutDuration: Duration,
bossGroup: NettyConfig.BossGroup,
) extends EventLoopGroups.Config { self =>

/**
* Configure Netty's boss event-loop group. This only applies to server
* applications and is ignored for the Client
*/
def bossGroup(cfg: NettyConfig.BossGroup): NettyConfig = self.copy(bossGroup = cfg)

def channelType(channelType: ChannelType): NettyConfig = self.copy(channelType = channelType)

/**
* Configure the server to use the leak detection level provided.
* Configure Netty to use the leak detection level provided.
*
* @see
* <a
Expand All @@ -44,50 +51,104 @@ final case class NettyConfig(
def leakDetection(level: LeakDetectionLevel): NettyConfig = self.copy(leakDetectionLevel = level)

/**
* Configure the server to use a maximum of nThreads to process requests.
* Configure Netty to use a maximum of `nThreads` for the worker event-loop
* group.
*/
def maxThreads(nThreads: Int): NettyConfig = self.copy(nThreads = nThreads)

val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS

val shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
val shutdownTimeOut: Long = shutdownTimeoutDuration.toMillis
def shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
def shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
def shutdownTimeOut: Long = shutdownTimeoutDuration.toMillis
}

object NettyConfig {
def config: Config[NettyConfig] =
(LeakDetectionLevel.config.nested("leak-detection-level").withDefault(NettyConfig.default.leakDetectionLevel) ++
Config
.string("channel-type")
.mapOrFail {
case "auto" => Right(ChannelType.AUTO)
case "nio" => Right(ChannelType.NIO)
case "epoll" => Right(ChannelType.EPOLL)
case "kqueue" => Right(ChannelType.KQUEUE)
case "uring" => Right(ChannelType.URING)
case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other"))
}
.withDefault(NettyConfig.default.channelType) ++
final case class BossGroup(
channelType: ChannelType,
nThreads: Int,
shutdownQuietPeriodDuration: Duration,
shutdownTimeOutDuration: Duration,
) extends EventLoopGroups.Config {
def shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
def shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
def shutdownTimeOut: Long = shutdownTimeOutDuration.toMillis
}

private def baseConfig: Config[EventLoopGroups.Config] =
(Config
.string("channel-type")
.mapOrFail {
case "auto" => Right(ChannelType.AUTO)
case "nio" => Right(ChannelType.NIO)
case "epoll" => Right(ChannelType.EPOLL)
case "kqueue" => Right(ChannelType.KQUEUE)
case "uring" => Right(ChannelType.URING)
case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other"))
}
.withDefault(NettyConfig.default.channelType) ++
Config.int("max-threads").withDefault(NettyConfig.default.nThreads) ++
Config.duration("shutdown-quiet-period").withDefault(NettyConfig.default.shutdownQuietPeriodDuration) ++
Config.duration("shutdown-timeout").withDefault(NettyConfig.default.shutdownTimeoutDuration)).map {
case (leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout) =>
NettyConfig(leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout)
case (channelT, maxThreads, quietPeriod, timeout) =>
new EventLoopGroups.Config {
override val channelType: ChannelType = channelT
override val nThreads: Int = maxThreads
override val shutdownQuietPeriod: Long = quietPeriod.toMillis
override val shutdownTimeOut: Long = timeout.toMillis
override val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
}
}

val default: NettyConfig = NettyConfig(
LeakDetectionLevel.SIMPLE,
ChannelType.AUTO,
0,
// Defaults taken from io.netty.util.concurrent.AbstractEventExecutor
Duration.fromSeconds(2),
Duration.fromSeconds(15),
)

val defaultWithFastShutdown: NettyConfig = default.copy(
shutdownQuietPeriodDuration = Duration.fromMillis(50),
shutdownTimeoutDuration = Duration.fromMillis(250),
)
def config: Config[NettyConfig] =
(LeakDetectionLevel.config.nested("leak-detection-level").withDefault(NettyConfig.default.leakDetectionLevel) ++
baseConfig.nested("worker-group").orElse(baseConfig) ++
baseConfig.nested("boss-group")).map { case (leakDetectionLevel, worker, boss) =>
def toDuration(n: Long, timeUnit: TimeUnit) = Duration.fromJava(java.time.Duration.of(n, timeUnit.toChronoUnit))
NettyConfig(
leakDetectionLevel,
worker.channelType,
worker.nThreads,
shutdownQuietPeriodDuration = toDuration(worker.shutdownQuietPeriod, worker.shutdownTimeUnit),
shutdownTimeoutDuration = toDuration(worker.shutdownTimeOut, worker.shutdownTimeUnit),
NettyConfig.BossGroup(
boss.channelType,
boss.nThreads,
shutdownQuietPeriodDuration = toDuration(boss.shutdownQuietPeriod, boss.shutdownTimeUnit),
shutdownTimeOutDuration = toDuration(boss.shutdownTimeOut, boss.shutdownTimeUnit),
),
)
}

val default: NettyConfig = {
val quietPeriod = Duration.fromSeconds(2)
val timeout = Duration.fromSeconds(15)
NettyConfig(
LeakDetectionLevel.SIMPLE,
ChannelType.AUTO,
java.lang.Runtime.getRuntime.availableProcessors(),
// Defaults taken from io.netty.util.concurrent.AbstractEventExecutor
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeoutDuration = timeout,
NettyConfig.BossGroup(
ChannelType.AUTO,
1,
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeOutDuration = timeout,
),
)
}

val defaultWithFastShutdown: NettyConfig = {
val quietPeriod = Duration.fromMillis(50)
val timeout = Duration.fromMillis(250)
default.copy(
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeoutDuration = timeout,
bossGroup = default.bossGroup.copy(
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeOutDuration = timeout,
),
)
}

sealed trait LeakDetectionLevel {
self =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ private[netty] object Conversions {
}

def statusToNetty(status: Status): HttpResponseStatus =
HttpResponseStatus.valueOf(status.code)
HttpResponseStatus.valueOf(status.code, status.reasonPhrase)

def statusFromNetty(status: HttpResponseStatus): Status =
Status.fromInt(status.code)
Status.fromInt(status.code) match {
case Status.Custom(code, _) => Status.Custom(code, status.reasonPhrase)
case status => status
}

def schemeToNetty(scheme: Scheme): Option[HttpScheme] = scheme match {
case Scheme.HTTP => Option(HttpScheme.HTTP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[zio] final case class NettyDriver(
channelFactory: ChannelFactory[ServerChannel],
channelInitializer: ChannelInitializer[Channel],
serverInboundHandler: ServerInboundHandler,
eventLoopGroup: EventLoopGroup,
eventLoopGroups: ServerEventLoopGroups,
serverConfig: Server.Config,
nettyConfig: NettyConfig,
) extends Driver { self =>
Expand All @@ -44,7 +44,7 @@ private[zio] final case class NettyDriver(
for {
chf <- ZIO.attempt {
new ServerBootstrap()
.group(eventLoopGroup)
.group(eventLoopGroups.boss, eventLoopGroups.worker)
.channelFactory(channelFactory)
.childHandler(channelInitializer)
.option[Integer](ChannelOption.SO_BACKLOG, serverConfig.soBacklog)
Expand Down Expand Up @@ -84,7 +84,7 @@ private[zio] final case class NettyDriver(
channelFactory <- ChannelFactories.Client.live.build
.provideSomeEnvironment[Scope](_ ++ ZEnvironment[ChannelType.Config](nettyConfig))
nettyRuntime <- NettyRuntime.live.build
} yield NettyClientDriver(channelFactory.get, eventLoopGroup, nettyRuntime.get)
} yield NettyClientDriver(channelFactory.get, eventLoopGroups.worker, nettyRuntime.get)

override def toString: String = s"NettyDriver($serverConfig)"
}
Expand All @@ -97,7 +97,7 @@ object NettyDriver {
RoutesRef
& ChannelFactory[ServerChannel]
& ChannelInitializer[Channel]
& EventLoopGroup
& ServerEventLoopGroups
& Server.Config
& NettyConfig
& ServerInboundHandler,
Expand All @@ -108,7 +108,7 @@ object NettyDriver {
app <- ZIO.service[RoutesRef]
cf <- ZIO.service[ChannelFactory[ServerChannel]]
cInit <- ZIO.service[ChannelInitializer[Channel]]
elg <- ZIO.service[EventLoopGroup]
elg <- ZIO.service[ServerEventLoopGroups]
sc <- ZIO.service[Server.Config]
nsc <- ZIO.service[NettyConfig]
sih <- ZIO.service[ServerInboundHandler]
Expand All @@ -117,14 +117,15 @@ object NettyDriver {
channelFactory = cf,
channelInitializer = cInit,
serverInboundHandler = sih,
eventLoopGroup = elg,
eventLoopGroups = elg,
serverConfig = sc,
nettyConfig = nsc,
)

val manual: ZLayer[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = {
val manual
: ZLayer[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = {
implicit val trace: Trace = Trace.empty
ZLayer.makeSome[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver](
ZLayer.makeSome[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver](
ZLayer(AppRef.empty),
ServerChannelInitializer.layer,
ServerInboundHandler.live,
Expand All @@ -135,7 +136,7 @@ object NettyDriver {
val customized: ZLayer[Server.Config & NettyConfig, Throwable, Driver] = {
val serverChannelFactory: ZLayer[NettyConfig, Nothing, ChannelFactory[ServerChannel]] =
ChannelFactories.Server.fromConfig
val eventLoopGroup: ZLayer[NettyConfig, Nothing, EventLoopGroup] = EventLoopGroups.live
val eventLoopGroup: ZLayer[NettyConfig, Nothing, ServerEventLoopGroups] = ServerEventLoopGroups.live

ZLayer.makeSome[Server.Config & NettyConfig, Driver](
eventLoopGroup,
Expand Down
Loading

0 comments on commit 3fd21ef

Please sign in to comment.