Skip to content

Commit

Permalink
NIFI-13796: When writing to a content claim and its size + offset exceeds the max appendable size, do not write to that content claim again
Browse files Browse the repository at this point in the history
  • Loading branch information
markap14 committed Sep 23, 2024
1 parent 150cba8 commit d46e0b7
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions";
public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs";
public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB";
public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "50 KB";
public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
Expand Down Expand Up @@ -542,7 +543,9 @@ private FlowController(
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager);

parameterContextManager = new StandardParameterContextManager();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
final long maxAppendableBytes = getMaxAppendableBytes();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository,
counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableBytes);
assetManager = createAssetManager(nifiProperties);

this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
Expand Down Expand Up @@ -1044,8 +1047,9 @@ public void initializeFlow(final QueueProvider queueProvider) throws IOException
flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1);

// Begin expiring FlowFiles that are old
final long maxAppendableClaimBytes = getMaxAppendableBytes();
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableClaimBytes);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);

// now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the
Expand Down Expand Up @@ -1096,6 +1100,12 @@ public void run() {
}
}

/**
 * Resolves the configured maximum appendable content claim size (e.g. "50 KB")
 * into a byte count, for use when constructing repository contexts.
 *
 * @return the maximum number of bytes that may be appended to a single content claim
 */
private long getMaxAppendableBytes() {
    final String maxAppendableClaimSize = nifiProperties.getMaxAppendableClaimSize();
    // Parse the human-readable size string into bytes; the redundant intermediate local was removed.
    return DataUnit.parseDataSize(maxAppendableClaimSize, DataUnit.B).longValue();
}

private void notifyComponentsConfigurationRestored() {
for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
final Processor processor = procNode.getProcessor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@

public class StandardRepositoryContext extends AbstractRepositoryContext implements RepositoryContext {

private final long maxAppendableClaimBytes;

public StandardRepositoryContext(final Connectable connectable, final AtomicLong connectionIndex, final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final ProvenanceEventRepository provenanceRepository,
final StateManager stateManager) {
final StateManager stateManager, final long maxAppendableClaimBytes) {
super(connectable, connectionIndex, contentRepository, flowFileRepository, flowFileEventRepository, counterRepository, provenanceRepository, stateManager);
this.maxAppendableClaimBytes = maxAppendableClaimBytes;
}

@Override
public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) {
return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker);
return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker, maxAppendableClaimBytes, 8192);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.nifi.controller.repository.claim;

import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;

Expand All @@ -35,15 +35,13 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
private final Map<ResourceClaim, MappedOutputStream> streamMap = new ConcurrentHashMap<>();
private final Queue<ContentClaim> queue = new LinkedList<>();
private final PerformanceTracker performanceTracker;
private final long maxAppendableClaimBytes;
private final int bufferSize;

/**
 * Creates a write cache over the given content repository.
 *
 * @param contentRepo the repository that backs the cached claims
 * @param performanceTracker tracker used to record I/O performance metrics
 * @param maxAppendableClaimBytes maximum number of bytes (offset + length) a claim may
 *        hold and still be returned to the reuse queue for further appends
 * @param bufferSize size in bytes of the buffer used when writing to a claim
 */
public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final long maxAppendableClaimBytes, final int bufferSize) {
    this.contentRepo = contentRepo;
    this.performanceTracker = performanceTracker;
    this.maxAppendableClaimBytes = maxAppendableClaimBytes;
    this.bufferSize = bufferSize;
}

Expand Down Expand Up @@ -154,7 +152,10 @@ public void close() {
scc.setLength(0L);
}

queue.offer(claim);
// Add the claim back to the queue if it is still writable
if ((scc.getOffset() + scc.getLength()) < maxAppendableClaimBytes) {
queue.offer(claim);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,25 @@ public class RepositoryContextFactory {
private final CounterRepository counterRepo;
private final ProvenanceRepository provenanceRepo;
private final StateManagerProvider stateManagerProvider;
private final long maxAppendableClaimBytes;

/**
 * Creates a factory that builds {@link RepositoryContext} instances sharing the
 * given repositories and state-manager provider.
 *
 * @param maxAppendableClaimBytes maximum size, in bytes, beyond which a content claim
 *        is no longer appendable; propagated to every created context
 */
public RepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
                                final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository,
                                final ProvenanceRepository provenanceRepository, final StateManagerProvider stateManagerProvider,
                                final long maxAppendableClaimBytes) {

    this.contentRepo = contentRepository;
    this.flowFileRepo = flowFileRepository;
    this.flowFileEventRepo = flowFileEventRepository;
    this.counterRepo = counterRepository;
    this.provenanceRepo = provenanceRepository;
    this.stateManagerProvider = stateManagerProvider;
    this.maxAppendableClaimBytes = maxAppendableClaimBytes;
}

/**
 * Builds a new {@link RepositoryContext} for the given component, resolving the
 * component's state manager by its identifier.
 *
 * @param connectable the component the context is created for
 * @param connectionIndex shared counter used by the context for connection indexing
 * @return a context wired to this factory's repositories and max appendable claim size
 */
public RepositoryContext newProcessContext(final Connectable connectable, final AtomicLong connectionIndex) {
    final StateManager stateManager = stateManagerProvider.getStateManager(connectable.getIdentifier());
    return new StandardRepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, stateManager, maxAppendableClaimBytes);
}

public ContentRepository getContentRepository() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public Set<Connection> answer(final InvocationOnMock invocation) throws Throwabl
stateManager = new MockStateManager(connectable);
stateManager.setIgnoreAnnotations(true);

context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager);
context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository,
counterRepository, provenanceRepo, stateManager, 50_000L);
session = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
}

Expand Down Expand Up @@ -3134,7 +3135,8 @@ public StandardProcessSession createSessionForRetry(final Connectable connectabl
flowFileEventRepository,
counterRepository,
provenanceRepo,
stateManager);
stateManager,
50_000L);
return new StandardProcessSession(context, () -> false, new NopPerformanceTracker());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,22 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class TestContentClaimWriteCache {
public class TestStandardContentClaimWriteCache {

private FileSystemRepository repository = null;
private StandardResourceClaimManager claimManager = null;
private final File rootFile = new File("target/testContentClaimWriteCache");
private NiFiProperties nifiProperties;

@BeforeEach
public void setup() throws IOException {
nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());
NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());
if (rootFile.exists()) {
DiskUtils.deleteRecursively(rootFile);
}
Expand All @@ -64,7 +65,7 @@ public void shutdown() throws IOException {

@Test
public void testFlushWriteCorrectData() throws IOException {
final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 4);
final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4);

final ContentClaim claim1 = cache.getContentClaim();
assertNotNull(claim1);
Expand Down Expand Up @@ -97,4 +98,48 @@ public void testFlushWriteCorrectData() throws IOException {
assertArrayEquals("good-dayhello".getBytes(), buff2);
}

@Test
public void testWriteLargeRollsOverToNewFileOnNext() throws IOException {
// Cache is configured with a 50,000-byte max appendable claim size and a 4-byte buffer.
final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 50_000L, 4);

final ContentClaim claim1 = cache.getContentClaim();
assertNotNull(claim1);

// Small write: claim stays well under the 50 KB limit, so it should be re-queued for reuse.
try (final OutputStream out = cache.write(claim1)) {
assertNotNull(out);
out.write("hello".getBytes());
out.write("good-bye".getBytes());

cache.flush();
}

// Because claim1 was still appendable, the next claim shares the same Resource Claim.
final ContentClaim claim2 = cache.getContentClaim();
assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim());

try (final OutputStream out = cache.write(claim2)) {
assertNotNull(out);
out.write("greeting".getBytes());
}

final ContentClaim claim3 = cache.getContentClaim();
assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());

// Write 1 MB to the claim. This should result in the next Content Claim having a different Resource Claim.
try (final OutputStream out = cache.write(claim3)) {
assertNotNull(out);
final byte[] buffer = new byte[1024 * 1024];
final Random random = new Random();
random.nextBytes(buffer);
out.write(buffer);
}

// claim1..claim3 all reference the same Resource Claim, so its claimant count is 3.
assertEquals(3, claimManager.getClaimantCount(claim1.getResourceClaim()));

// The 1 MB write pushed the claim past the 50 KB limit, so the cache must hand out
// a claim backed by a brand-new Resource Claim instead of reusing the old one.
final ContentClaim claim4 = cache.getContentClaim();
assertNotNull(claim4);
assertNotEquals(claim1.getResourceClaim(), claim4.getResourceClaim());

assertEquals(1, claimManager.getClaimantCount(claim4.getResourceClaim()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
Expand All @@ -97,6 +96,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;

public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
Expand Down

0 comments on commit d46e0b7

Please sign in to comment.