Skip to content

Commit

Permalink
Experimental support for Netty IO_URING incubator
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist committed Oct 11, 2024
1 parent eabeae0 commit b04da7f
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 7 deletions.
9 changes: 9 additions & 0 deletions artemis-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-kqueue</artifactId>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,11 @@ public interface ActiveMQClientLogger {

@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);

@LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailability(Throwable e);

@LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailabilitynoClass();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.incubator.channel.uring.IOUring;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;
import org.slf4j.Logger;
Expand All @@ -45,6 +46,18 @@ public static final boolean isEpollAvailable() {
}
}

public static final boolean isIoUringAvailable() {
try {
return Env.isLinuxOs() && IOUring.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
return false;
}
}

public static final boolean isKQueueAvailable() {
try {
return Env.isMacOs() && KQueue.isAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -98,6 +100,9 @@
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.ResourceLeakDetector;
Expand All @@ -109,6 +114,7 @@
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
Expand All @@ -122,6 +128,7 @@
import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.IPV6Util;
Expand All @@ -137,6 +144,7 @@ public class NettyConnector extends AbstractConnector {
public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
public static String IOURING_CONNECTOR_TYPE = "IO_URING";

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down Expand Up @@ -295,6 +303,8 @@ public class NettyConnector extends AbstractConnector {

private boolean useKQueue;

private boolean useIoUring;

private int remotingThreads;

private boolean useGlobalWorkerPool;
Expand Down Expand Up @@ -404,6 +414,7 @@ public NettyConnector(final Map<String, Object> configuration,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
Expand Down Expand Up @@ -528,41 +539,61 @@ public synchronized void start() {
return;
}

boolean defaultRemotingThreads = remotingThreads == -1;

if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}

String connectorType;

if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;

if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new IOUringEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new IOUringEventLoopGroup(remotingThreads);
}

connectorType = IOURING_CONNECTOR_TYPE;
channelClazz = IOUringSocketChannel.class;

logger.debug("Connector {} using native io_uring", this);
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new EpollEventLoopGroup(remotingThreads);
}

connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;

logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new KQueueEventLoopGroup(remotingThreads);
}

connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;

logger.debug("Connector {} using native kqueue", this);
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(remotingThreads);
}

connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;

logger.debug("Connector {} using nio", this);
}
// if we are a servlet wrap the socketChannelFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class TransportConstants {

public static final String USE_EPOLL_PROP_NAME = "useEpoll";

public static final String USE_IOURING_PROP_NAME = "useIoUring";

public static final String USE_KQUEUE_PROP_NAME = "useKQueue";

@Deprecated
Expand Down Expand Up @@ -213,6 +215,8 @@ public class TransportConstants {

public static final boolean DEFAULT_USE_KQUEUE = true;

public static final boolean DEFAULT_USE_IOURING = false;

public static final boolean DEFAULT_USE_INVM = false;

public static final boolean DEFAULT_USE_SERVLET = false;
Expand Down Expand Up @@ -409,6 +413,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
Expand Down Expand Up @@ -484,6 +489,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
Expand Down
13 changes: 13 additions & 0 deletions artemis-pom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,19 @@
<classifier>${netty-transport-native-kqueue-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
Expand Down Expand Up @@ -112,6 +114,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
public static final String IOURING_ACCEPTOR_TYPE = "EXPERIMENTAL_IO_URING";

static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
Expand Down Expand Up @@ -148,6 +151,8 @@ public class NettyAcceptor extends AbstractAcceptor {

private final boolean useKQueue;

private final boolean useIoUring;

private final ProtocolHandler protocolHandler;

private final String host;
Expand Down Expand Up @@ -276,6 +281,7 @@ public NettyAcceptor(final String name,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
Expand Down Expand Up @@ -425,12 +431,23 @@ public synchronized void start() throws Exception {
eventLoopGroup = new DefaultEventLoopGroup();
} else {

boolean defaultRemotingThreads = remotingThreads == -1;

if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}

if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;

channelClazz = IOUringServerSocketChannel.class;
eventLoopGroup = new IOUringEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = IOURING_ACCEPTOR_TYPE;

logger.debug("Acceptor using native io_uring");
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
channelClazz = EpollServerSocketChannel.class;
eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = EPOLL_ACCEPTOR_TYPE;
Expand All @@ -446,6 +463,7 @@ public synchronized void start() throws Exception {
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = NIO_ACCEPTOR_TYPE;

logger.debug("Acceptor using nio");
}
}
Expand Down
23 changes: 20 additions & 3 deletions docs/user-manual/configuring-transports.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ These Native transports add features specific to a particular platform, generate

Both Clients and Server can benefit from this.

Current Supported Platforms.
Currently supported platforms:

* Linux running 64bit JVM
* MacOS running 64bit JVM

Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected.
Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected.

If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.

==== Linux Native Transport

Expand All @@ -263,6 +263,23 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is
Setting this to `false` will force the use of Java NIO instead of epoll.
Default is `true`


Additionally, Apache ActiveMQ Artemis offers `experimental` support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring.

The following properties are specific to this native transport:

useIoUring::
enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected.
Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO.
Default is `false`

[WARNING]
====
[#io_uring-warning]
IO_URING support is `experimental` at this point. Using it _could_ introduce unwanted side effects or unpredicted behavior.
It's currently not recommended for production or any otherwise critical use.
====

==== MacOS Native Transport

On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue.
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
<mockito.version>5.14.1</mockito.version>
<jctools.version>4.0.5</jctools.version>
<netty.version>4.1.114.Final</netty.version>
<netty.incubator.io_uring.version>0.0.25.Final</netty.incubator.io_uring.version>
<hdrhistogram.version>2.2.2</hdrhistogram.version>
<curator.version>5.7.0</curator.version>
<zookeeper.version>3.9.2</zookeeper.version>
Expand Down Expand Up @@ -261,6 +262,7 @@

<netty-transport-native-epoll-classifier>linux-x86_64</netty-transport-native-epoll-classifier>
<netty-transport-native-kqueue-classifier>osx-x86_64</netty-transport-native-kqueue-classifier>
<netty-transport-native-io_uring-classifier>linux-x86_64</netty-transport-native-io_uring-classifier>

<fast-tests>false</fast-tests>

Expand Down

0 comments on commit b04da7f

Please sign in to comment.