Skip to content

Commit

Permalink
NIFI-13546: Ensure on startup that Content Repo archive directories are created, if configured to do so, rather than creating them on demand. If IOException thrown when archiving file, delete it instead. Cleaned up some code duplication between remove(ResourceClaim) and archive(ResourceClaim) methods
Browse files Browse the repository at this point in the history
  • Loading branch information
markap14 committed Jul 12, 2024
1 parent b5b61d9 commit 2051564
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@
*/
package org.apache.nifi.controller.repository;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
Expand All @@ -28,7 +49,6 @@
import java.io.OutputStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
Expand Down Expand Up @@ -57,26 +77,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Is thread safe
Expand Down Expand Up @@ -145,10 +145,10 @@ public FileSystemRepository(final NiFiProperties nifiProperties) throws IOExcept
final long appendableClaimLengthCap = DataUnit.parseDataSize(APPENDABLE_CLAIM_LENGTH_CAP, DataUnit.B).longValue();
if (configuredAppendableClaimLength > appendableClaimLengthCap) {
LOG.warn("Configured property '{}' with value {} exceeds cap of {}. Setting value to {}",
NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE,
configuredAppendableClaimLength,
APPENDABLE_CLAIM_LENGTH_CAP,
APPENDABLE_CLAIM_LENGTH_CAP);
NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE,
configuredAppendableClaimLength,
APPENDABLE_CLAIM_LENGTH_CAP,
APPENDABLE_CLAIM_LENGTH_CAP);
this.maxAppendableClaimLength = appendableClaimLengthCap;
} else {
this.maxAppendableClaimLength = configuredAppendableClaimLength;
Expand All @@ -173,7 +173,7 @@ public FileSystemRepository(final NiFiProperties nifiProperties) throws IOExcept

if (maxArchiveSize == null) {
throw new RuntimeException("No value specified for property '"
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
}

if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
Expand Down Expand Up @@ -210,7 +210,7 @@ public FileSystemRepository(final NiFiProperties nifiProperties) throws IOExcept
final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02)));
minUsableContainerBytesForArchive.put(container.getKey(), maxArchiveBytes);
LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size",
containerName, maxArchiveBytes);
containerName, maxArchiveBytes);

final long backPressureBytes = (long) (container.getValue().toFile().getTotalSpace() * archiveBackPressureRatio);
final ContainerState containerState = new ContainerState(containerName, true, backPressureBytes, capacity);
Expand Down Expand Up @@ -304,8 +304,11 @@ private synchronized void initializeRepository() throws IOException {
realPath = Files.createDirectories(containerPath).toRealPath();
}

// Ensure that the directory exists for the section, including the archive directory, if configured to archive data.
for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
Files.createDirectories(realPath.resolve(String.valueOf(i)));
final Path sectionPath = realPath.resolve(String.valueOf(i));
final Path toCreate = archiveData ? sectionPath.resolve(ARCHIVE_DIR_NAME) : sectionPath;
Files.createDirectories(toCreate);
}

realPathMap.put(containerName, realPath);
Expand Down Expand Up @@ -727,33 +730,11 @@ public boolean remove(final ContentClaim claim) {
}

private boolean remove(final ResourceClaim claim) {
if (claim == null) {
return false;
}

// If the claim is still in use, we won't remove it.
if (claim.isInUse()) {
if (!cleanupResources(claim)) {
return false;
}

Path path = null;
try {
path = getPath(claim);
} catch (final ContentNotFoundException ignored) {
}

// Ensure that we have no writable claim streams for this resource claim
final ByteCountingOutputStream bcos = writableClaimStreams.remove(claim);
LOG.debug("Removed Stream {} for {} from writableClaimStreams because Resource Claim was removed", bcos, claim);

if (bcos != null) {
try {
bcos.close();
} catch (final IOException e) {
LOG.warn("Failed to close Output Stream for {} due to {}", claim, e);
}
}

final Path path = getPath(claim);
if (path != null) {
final File file = path.toFile();
if (!file.delete() && file.exists()) {
Expand Down Expand Up @@ -1131,6 +1112,25 @@ boolean archive(final ResourceClaim claim) throws IOException {
return false;
}

if (!cleanupResources(claim)) {
return false;
}

final Path curPath = getPath(claim);
if (curPath == null) {
return false;
}

final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
return archived;
}

private boolean cleanupResources(final ResourceClaim claim) {
if (claim == null) {
return false;
}

if (claim.isInUse()) {
return false;
}
Expand All @@ -1140,7 +1140,7 @@ boolean archive(final ResourceClaim claim) throws IOException {
// claimant count is removed without writing to the claim (or more specifically, without closing the
// OutputStream that is returned when calling write() ).
final OutputStream out = writableClaimStreams.remove(claim);
LOG.debug("Removed {} for {} from writableClaimStreams because Resource Claim was archived", out, claim);
LOG.debug("Removed {} for {} from writableClaimStreams because Resource Claim was archived or removed", out, claim);

if (out != null) {
try {
Expand All @@ -1150,14 +1150,7 @@ boolean archive(final ResourceClaim claim) throws IOException {
}
}

final Path curPath = getPath(claim);
if (curPath == null) {
return false;
}

final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
return archived;
return true;
}

protected int getOpenStreamCount() {
Expand All @@ -1183,26 +1176,8 @@ protected boolean archive(final Path curPath) throws IOException {
return false;
}

try {
Files.move(curPath, archivePath);
return true;
} catch (final NoSuchFileException nsfee) {
// If the current path exists, try to create archive path and do the move again.
// Otherwise, either the content was removed or has already been archived. Either way,
// there's nothing that can be done.
if (Files.exists(curPath)) {
// The archive directory doesn't exist. Create now and try operation again.
// We do it this way, rather than ensuring that the directory exists ahead of time because
// it will be rare for the directory not to exist and we would prefer to have the overhead
// of the Exception being thrown in these cases, rather than have the overhead of checking
// for the existence of the directory continually.
Files.createDirectories(archivePath.getParent());
Files.move(curPath, archivePath);
return true;
}

return false;
}
Files.move(curPath, archivePath);
return true;
}

private long getLastModTime(final File file) {
Expand Down Expand Up @@ -1466,7 +1441,12 @@ public void run() {
successCount++;
}
} catch (final Exception e) {
LOG.warn("Failed to archive {}", claim, e);
final boolean removed = remove(claim);
if (removed) {
LOG.warn("Failed to archive {} but was able to cleanup resources; removed file instead of archiving.", claim);
} else {
LOG.warn("Failed to archive {} and unable to remove file.", claim, e);
}
}
} else if (remove(claim)) {
successCount++;
Expand Down Expand Up @@ -1929,4 +1909,5 @@ public synchronized ContentClaim newContentClaim() throws IOException {
return scc;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@
*/
package org.apache.nifi.controller.repository;

import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
Expand All @@ -39,23 +57,6 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -179,7 +180,6 @@ public void testArchivedClaimRemovedDueToAge() throws IOException, InterruptedEx
// Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization.
for (int i = 0; i < 3; i++) {
final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile();
assertTrue(archiveDir.mkdirs());
final File archivedFile = new File(archiveDir, "1234");

try (final OutputStream fos = new FileOutputStream(archivedFile)) {
Expand Down Expand Up @@ -209,7 +209,6 @@ public void testArchivedClaimRemovedDueToDiskUsage() throws IOException, Interru
// Perform a few iterations to ensure that it works not just the first time, since there is a lot of logic on initialization.
for (int i = 0; i < 3; i++) {
final File archiveDir = containerPath.resolve(String.valueOf(i)).resolve("archive").toFile();
assertTrue(archiveDir.mkdirs());
final File archivedFile = new File(archiveDir, "1234");

try (final OutputStream fos = new FileOutputStream(archivedFile)) {
Expand Down

0 comments on commit 2051564

Please sign in to comment.