Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPI Queue/QE #1572

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
import net.openhft.chronicle.queue.impl.table.ReadonlyTableStore;
import net.openhft.chronicle.queue.impl.table.SingleTableBuilder;
import net.openhft.chronicle.queue.internal.domestic.QueueOffsetSpec;
import net.openhft.chronicle.queue.providers.EnterpriseQueueFactories;
import net.openhft.chronicle.threads.*;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import javax.crypto.spec.SecretKeySpec;
import java.io.File;
import java.lang.reflect.Constructor;
import java.nio.file.Path;
import java.time.LocalTime;
import java.time.ZoneId;
Expand All @@ -64,7 +64,6 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl
public static final long SMALL_BLOCK_SIZE = OS.isWindows() ? OS.SAFE_PAGE_SIZE : OS.pageSize(); // the smallest safe block size on Windows 8+

public static final long DEFAULT_SPARSE_CAPACITY = 512L << 30;
private static final Constructor<?> ENTERPRISE_QUEUE_CONSTRUCTOR;
private static final WireStoreFactory storeFactory = SingleChronicleQueueBuilder::createStore;
private static final Supplier<TimingPauser> TIMING_PAUSER_SUPPLIER = DefaultPauserSupplier.INSTANCE;

Expand All @@ -74,15 +73,6 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl
CLASS_ALIASES.addAlias(SCQRoll.class, "SCQSRoll");
CLASS_ALIASES.addAlias(SCQIndexing.class, "SCQSIndexing");
CLASS_ALIASES.addAlias(SingleChronicleQueueStore.class, "SCQStore");

Constructor<?> co;
try {
co = ((Class<?>) Class.forName("software.chronicle.enterprise.queue.EnterpriseSingleChronicleQueue")).getDeclaredConstructors()[0];
Jvm.setAccessible(co);
} catch (Exception e) {
co = null;
}
ENTERPRISE_QUEUE_CONSTRUCTOR = co;
}

private BufferMode writeBufferMode = BufferMode.None;
Expand Down Expand Up @@ -224,7 +214,7 @@ static SingleChronicleQueueStore createStore(@NotNull RollingChronicleQueue queu
}

public static boolean areEnterpriseFeaturesAvailable() {
return ENTERPRISE_QUEUE_CONSTRUCTOR != null;
return EnterpriseQueueFactories.get().areEnterpriseFeaturesAvailable();
}

private static RollCycle loadDefaultRollCycle() {
Expand Down Expand Up @@ -284,10 +274,10 @@ public SingleChronicleQueue build() {

// It is important to check enterprise features after preBuild()
// Enterprise-only config options can be loaded from the metadata
if (checkEnterpriseFeaturesRequested())
chronicleQueue = buildEnterprise();
else
chronicleQueue = new SingleChronicleQueue(this);
if (checkEnterpriseFeaturesRequested() && !areEnterpriseFeaturesAvailable()) {
throw new IllegalStateException("Enterprise features requested but Chronicle Queue Enterprise is not in the class path!");
}
chronicleQueue = EnterpriseQueueFactories.get().newInstance(this);

postBuild(chronicleQueue);

Expand Down Expand Up @@ -324,23 +314,11 @@ private boolean checkEnterpriseFeaturesRequested() {
}

public static boolean onlyAvailableInEnterprise(final String feature) {
if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
if (!areEnterpriseFeaturesAvailable())
Jvm.warn().on(SingleChronicleQueueBuilder.class, feature + " is only supported in Chronicle Queue Enterprise. If you would like to use this feature, please contact [email protected] for more information.");
return true;
}

@NotNull
private SingleChronicleQueue buildEnterprise() {
if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
throw new IllegalStateException("Enterprise features requested but Chronicle Queue Enterprise is not in the class path!");

try {
return (SingleChronicleQueue) ENTERPRISE_QUEUE_CONSTRUCTOR.newInstance(this);
} catch (Exception e) {
throw new IllegalStateException("Couldn't create an instance of Enterprise queue", e);
}
}

public SingleChronicleQueueBuilder aesEncryption(@Nullable byte[] keyBytes) {
if (keyBytes == null) {
codingSuppliers(null, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.queue.providers.QueueFactory;

public class SingleChronicleQueueFactory implements QueueFactory {

@Override
public boolean areEnterpriseFeaturesAvailable() {
return false;
}

@Override
public SingleChronicleQueue newInstance(SingleChronicleQueueBuilder queueBuilder) {
return new SingleChronicleQueue(queueBuilder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package net.openhft.chronicle.queue.providers;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueFactory;
import org.jetbrains.annotations.NotNull;

import java.util.ServiceLoader;

public class EnterpriseQueueFactories {

private static final String PREFERRED_FACTORY_CLASS_NAME = Jvm.getProperty("net.openhft.chronicle.queue.providers.EnterpriseQueueWrapper",
"software.chronicle.enterprise.queue.EnterpriseQueueFactory");

private static QueueFactory queueFactory;

/**
* Get the {@link QueueFactory}
*
* @return the active queue wrapper
*/
@NotNull
public static QueueFactory get() {
if (queueFactory == null) {
final ServiceLoader<QueueFactory> load = ServiceLoader.load(QueueFactory.class);
for (QueueFactory factory : load) {
// last one in wins, unless we encounter the "preferred" one
queueFactory = factory;
if (PREFERRED_FACTORY_CLASS_NAME.equals(factory.getClass().getName())) {
break;
}
}
if (queueFactory == null) {
Jvm.error().on(EnterpriseQueueFactories.class, "There's no queue wrapper factory configured, this shouldn't happen. ");
queueFactory = new SingleChronicleQueueFactory();
}
}
return queueFactory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package net.openhft.chronicle.queue.providers;

import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

public interface QueueFactory {

// SPI for SingleChronicleQueueBuilder
boolean areEnterpriseFeaturesAvailable();

SingleChronicleQueue newInstance(SingleChronicleQueueBuilder queueBuilder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
net.openhft.chronicle.queue.impl.single.SingleChronicleQueueFactory