diff --git a/warehouse/ingest-core/pom.xml b/warehouse/ingest-core/pom.xml index 90fb606c9be..014454ea370 100644 --- a/warehouse/ingest-core/pom.xml +++ b/warehouse/ingest-core/pom.xml @@ -212,6 +212,21 @@ javassist test + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java new file mode 100644 index 00000000000..e9cb340aba2 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java @@ -0,0 +1,208 @@ +package datawave.ingest.data.config; + +import static java.lang.Thread.NORM_PRIORITY; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class CachedFieldConfigHelper implements FieldConfigHelper { + private final static Logger log = LoggerFactory.getLogger(CachedFieldConfigHelper.class); + + private final static float DEFAULT_LRU_LF = 0.75f; + private final static int DEFAULT_DEBUG_STATE_SECS = 30; + + private final FieldConfigHelper underlyingHelper; + private final LruCache resultCache; + private final boolean debugLimitsEnabled; + private final int limit; + private final Set debugFieldUnique; + private final ScheduledExecutorService debugStateExecutor; + private final AtomicLong debugFieldComputes; + + enum AttributeType { + INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEX_ONLY_FIELD + } + + public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) { + this(helper, limit, false); + } + + public CachedFieldConfigHelper(FieldConfigHelper helper, int limit, boolean debugLimitEnabled) { + if (limit < 1) { + throw new IllegalArgumentException("Limit must be a positive integer"); + } + this.underlyingHelper = helper; + this.resultCache = new LruCache<>(limit); + this.limit = limit; + this.debugLimitsEnabled = debugLimitEnabled; + this.debugFieldUnique = new HashSet<>(); + this.debugFieldComputes = new AtomicLong(); + + if (debugLimitEnabled) { + this.debugStateExecutor = Executors.newSingleThreadScheduledExecutor( + // @formatter:off + new ThreadFactoryBuilder() + .setPriority(NORM_PRIORITY) + .setDaemon(true) + .setNameFormat("CachedFieldConfigHelper.DebugState") + .build() + // formatter:off + ); + this.debugStateExecutor.scheduleAtFixedRate(this::debugLogState, DEFAULT_DEBUG_STATE_SECS, DEFAULT_DEBUG_STATE_SECS, SECONDS); + } else { + this.debugStateExecutor = null; + } + } + + @Override + public boolean isStoredField(String fieldName) { + return getFieldResult(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField); + } + + @Override + public boolean isIndexedField(String fieldName) { + return getFieldResult(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField); + } + + @Override + public boolean isIndexOnlyField(String fieldName) { + return getFieldResult(AttributeType.INDEX_ONLY_FIELD, fieldName, underlyingHelper::isIndexOnlyField); + } + + @Override + public boolean isReverseIndexedField(String fieldName) { + return getFieldResult(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField); + } + + @Override + public boolean isTokenizedField(String fieldName) { + return getFieldResult(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField); + } + + @Override + public boolean isReverseTokenizedField(String fieldName) { + return getFieldResult(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField); + } + + @VisibleForTesting + boolean getFieldResult(AttributeType attributeType, String fieldName, Predicate fn) { + CachedEntry ce = !debugLimitsEnabled ? + resultCache.computeIfAbsent(fieldName, CachedEntry::new) : + resultCache.computeIfAbsent(fieldName, this::debugCachedEntryCreation); + return ce.get(attributeType).getResultOrEvaluate(fn); + } + + @VisibleForTesting + boolean hasLimitExceeded() { + return resultCache.hasLimitExceeded(); + } + + private CachedEntry debugCachedEntryCreation(String fieldName) { + debugFieldComputes.incrementAndGet(); + debugFieldUnique.add(fieldName); + return new CachedEntry(fieldName); + } + + private void debugLogState() { + if (resultCache.hasLimitExceeded()) { + log.info("Field cache LRU limit exceeded [limit={}, debug={}, size={}, uniq={}]", + limit, debugFieldComputes.get(), debugFieldUnique.size(), debugLimitsEnabled); + } + } + + private static class LruCache extends LinkedHashMap { + private final int maxSize; + private volatile boolean limitExceeded; + + LruCache(int maxSize) { + super((int)(maxSize / DEFAULT_LRU_LF) + 1, DEFAULT_LRU_LF, true); + this.maxSize = maxSize; + } + + boolean hasLimitExceeded() { + // thread-safe + return limitExceeded; + } + + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean localLimitExceeded = size() > maxSize; + if (localLimitExceeded) { + limitExceeded = true; + } + return localLimitExceeded; + } + } + + private static class CachedEntry { + private final String fieldName; + private final MemoizedResult indexed; + private final MemoizedResult reverseIndexed; + private final MemoizedResult stored; + private final MemoizedResult indexedOnly; + private final MemoizedResult tokenized; + private final MemoizedResult reverseTokenized; + + private CachedEntry(String fieldName) { + this.fieldName = fieldName; + this.indexed = new MemoizedResult(); + this.reverseIndexed = new MemoizedResult(); + this.stored = new MemoizedResult(); + this.indexedOnly = new MemoizedResult(); + this.tokenized = new MemoizedResult(); + this.reverseTokenized = new MemoizedResult(); + } + + private MemoizedResult get(AttributeType attributeType) { + MemoizedResult result; + switch (attributeType) { + case INDEX_ONLY_FIELD: + result = indexedOnly; + break; + case INDEXED_FIELD: + result = indexed; + break; + case REVERSE_INDEXED_FIELD: + result = reverseIndexed; + break; + case TOKENIZED_FIELD: + result = tokenized; + break; + case REVERSE_TOKENIZED_FIELD: + result = reverseTokenized; + break; + case STORED_FIELD: + result = stored; + break; + default: + throw new IllegalArgumentException("Undefined attribute type: " + attributeType); + } + return result; + } + + private class MemoizedResult { + private boolean resultEvaluated; + private boolean result; + + private boolean getResultOrEvaluate(Predicate evaluateFn) { + if (!resultEvaluated) { + result = evaluateFn.test(fieldName); + resultEvaluated = true; + } + return result; + } + } + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java index c3d28d3a2d8..df2a069df05 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java @@ -33,6 +33,7 @@ import datawave.ingest.config.IngestConfigurationFactory; import datawave.ingest.data.Type; import datawave.ingest.data.TypeRegistry; +import datawave.ingest.data.config.CachedFieldConfigHelper; import datawave.ingest.data.config.DataTypeHelperImpl; import datawave.ingest.data.config.FieldConfigHelper; import datawave.ingest.data.config.MarkingsHelper; @@ -138,6 +139,9 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy"; public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file"; + public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled"; + public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit"; + public static final String FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG = ".data.category.field.config.cache.limit.debug"; private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class); @@ -255,10 +259,19 @@ public void setup(Configuration config) { // Load the field helper, which takes precedence over the individual field configurations final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE); if (fieldConfigFile != null) { - if (log.isDebugEnabled()) { - log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE); + final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, false); + final boolean fieldConfigCacheLimitDebug = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT_DEBUG, false); + final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, 100); + log.info("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE); + log.info("Field config cache enabled: " + fieldConfigCacheEnabled); + if (fieldConfigCacheEnabled) { + log.info("Field config cache limit: " + fieldConfigCacheLimit); + log.info("Field config cache limit debug: " + fieldConfigCacheLimitDebug); + } + fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this); + if (fieldConfigCacheEnabled) { + fieldConfigHelper = new CachedFieldConfigHelper(fieldConfigHelper, fieldConfigCacheLimit, fieldConfigCacheLimitDebug); } - this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this); } // Process the indexed fields diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java new file mode 100644 index 00000000000..e040edccd95 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachedFieldConfigHelperTest.java @@ -0,0 +1,117 @@ +package datawave.ingest.data.config; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class CachedFieldConfigHelperTest { + @Test + public void testCachingBehaviorWillCallBaseMethods() { + String fieldName = "test"; + FieldConfigHelper mockHelper = mock(FieldConfigHelper.class); + FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1); + + cachedHelper.isIndexOnlyField(fieldName); + verify(mockHelper).isIndexOnlyField(eq(fieldName)); + + cachedHelper.isIndexedField(fieldName); + verify(mockHelper).isIndexedField(eq(fieldName)); + + cachedHelper.isTokenizedField(fieldName); + verify(mockHelper).isTokenizedField(eq(fieldName)); + + cachedHelper.isStoredField(fieldName); + verify(mockHelper).isStoredField(eq(fieldName)); + + cachedHelper.isReverseIndexedField(fieldName); + verify(mockHelper).isReverseIndexedField(eq(fieldName)); + + cachedHelper.isReverseTokenizedField(fieldName); + verify(mockHelper).isReverseTokenizedField(eq(fieldName)); + } + + @ParameterizedTest + @ValueSource(ints = {-1, 0}) + public void testConstructorWithNonPositiveLimitWillThrow(int limit) { + assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit)); + } + + @SuppressWarnings("ClassEscapesDefinedScope") + @ParameterizedTest + @EnumSource(CachedFieldConfigHelper.AttributeType.class) + public void testAttributeTypesDoNotThrow(CachedFieldConfigHelper.AttributeType attributeType) { + String fieldName = "test"; + FieldConfigHelper mockHelper = mock(FieldConfigHelper.class); + CachedFieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1); + cachedHelper.getFieldResult(attributeType, fieldName, (f) -> true); + } + + @Test + public void testCachingLimitsBetweenFieldsAndAttributeTypes() { + AtomicLong storedCounter = new AtomicLong(); + AtomicLong indexCounter = new AtomicLong(); + FieldConfigHelper innerHelper = mock(FieldConfigHelper.class); + CachedFieldConfigHelper helper = new CachedFieldConfigHelper(innerHelper, 2, true); + + when(innerHelper.isStoredField(any())).then((a) -> { + storedCounter.incrementAndGet(); + return true; + }); + + when(innerHelper.isIndexedField(any())).then((a) -> { + indexCounter.incrementAndGet(); + return true; + }); + + // following ensures that: + // 1. fields are computed, where appropriate per attribute-type + // 2. limit allows cache results to return + // 3. limit blocks results to return if exceeded + // 4. limit functions across attribute-types + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField); + assertEquals(1, storedCounter.get(), "field1 should compute result (new field)"); + assertFalse(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", innerHelper::isStoredField); + assertEquals(1, storedCounter.get(), "field1 repeated (existing field)"); + assertFalse(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField); + assertEquals(2, storedCounter.get(), "field2 should compute result (new field)"); + assertFalse(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField); + assertEquals(2, storedCounter.get(), "field2 repeated (existing)"); + assertFalse(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", innerHelper::isIndexedField); + assertEquals(1, indexCounter.get(), "field1 should compute result (new attribute)"); + assertFalse(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField); + assertEquals(3, storedCounter.get(), "field3 exceeded limit (new field)"); + assertTrue(helper.hasLimitExceeded()); + + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", innerHelper::isStoredField); + assertEquals(3, storedCounter.get(), "field3 exceeded limit (existing field)"); + + // LRU map should evict field #2 + // we access field #1 above which has more accesses over field #2 + helper.getFieldResult(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", innerHelper::isStoredField); + assertEquals(4, storedCounter.get(), "field1 exceeded limit (new field/eviction)"); + } +}