Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-12079. Improve container merkle tree write handling #7694

Open
wants to merge 5 commits into
base: HDDS-10239-container-reconciliation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void stop() {
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
*/
public void writeContainerDataTree(ContainerData data, ContainerMerkleTree tree) throws IOException {
public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter tree) throws IOException {
long containerID = data.getContainerID();
Lock writeLock = getLock(containerID);
writeLock.lock();
Expand Down Expand Up @@ -216,7 +216,7 @@ private void compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo th
// TODO: HDDS-11765 - Handle missed block deletions from the deleted block ids.
if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
!peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
thisBlockMerkleTree.getBlockChecksum() != peerBlockMerkleTree.getBlockChecksum()) {
thisBlockMerkleTree.getDataChecksum() != peerBlockMerkleTree.getDataChecksum()) {
compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, report);
}
thisIdx++;
Expand Down Expand Up @@ -267,7 +267,7 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
// thisTree = Unhealthy, peerTree = Healthy -> Add to corrupt chunk.
// thisTree = Healthy, peerTree = Unhealthy -> Do nothing, as thisTree is healthy.
// thisTree = Unhealthy, peerTree = Unhealthy -> Do nothing, as both are corrupt.
if (thisChunkMerkleTree.getChunkChecksum() != peerChunkMerkleTree.getChunkChecksum() &&
if (thisChunkMerkleTree.getDataChecksum() != peerChunkMerkleTree.getDataChecksum() &&
!thisChunkMerkleTree.getIsHealthy() && peerChunkMerkleTree.getIsHealthy()) {
report.addCorruptChunk(peerBlockMerkleTree.getBlockID(), peerChunkMerkleTree);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.TreeMap;

/**
* This class represents a Merkle tree that provides one checksum for all data within a container.
* This class constructs a Merkle tree that provides one checksum for all data within a container.
*
* As the leaves of the tree, a checksum for each chunk is computed by taking a checksum of all checksums within that
* chunk. Each chunk checksum in a block is further checksummed together to generate the block level checksum. Finally,
Expand All @@ -37,17 +37,17 @@
*
* This class can be used to construct a consistent and completely filled {@link ContainerProtos.ContainerMerkleTree}
* object. It allows building a container merkle tree from scratch by incrementally adding chunks.
* The final checksums at higher levels of the tree are not calculated until
* {@link ContainerMerkleTree#toProto} is called.
* The final checksums above the leaf levels of the tree are not calculated until
* {@link ContainerMerkleTreeWriter#toProto} is called.
*/
public class ContainerMerkleTree {
public class ContainerMerkleTreeWriter {

private final SortedMap<Long, BlockMerkleTree> id2Block;
private final SortedMap<Long, BlockMerkleTreeWriter> id2Block;

/**
* Constructs an empty Container merkle tree object.
*/
public ContainerMerkleTree() {
public ContainerMerkleTreeWriter() {
id2Block = new TreeMap<>();
}

Expand All @@ -59,7 +59,7 @@ public ContainerMerkleTree() {
* @param chunks A list of chunks to add to this block. The chunks will be sorted internally by their offset.
*/
public void addChunks(long blockID, Collection<ContainerProtos.ChunkInfo> chunks) {
id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(chunks);
id2Block.computeIfAbsent(blockID, BlockMerkleTreeWriter::new).addChunks(chunks);
}

/**
Expand All @@ -74,11 +74,11 @@ public ContainerProtos.ContainerMerkleTree toProto() {
ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
ByteBuffer containerChecksumBuffer = ByteBuffer.allocate(Long.BYTES * id2Block.size());

for (BlockMerkleTree blockTree: id2Block.values()) {
for (BlockMerkleTreeWriter blockTree: id2Block.values()) {
ContainerProtos.BlockMerkleTree blockTreeProto = blockTree.toProto();
containerTreeBuilder.addBlockMerkleTree(blockTreeProto);
// Add the block's checksum to the buffer that will be used to calculate the container checksum.
containerChecksumBuffer.putLong(blockTreeProto.getBlockChecksum());
containerChecksumBuffer.putLong(blockTreeProto.getDataChecksum());
}
containerChecksumBuffer.flip();
checksumImpl.update(containerChecksumBuffer);
Expand All @@ -89,15 +89,15 @@ public ContainerProtos.ContainerMerkleTree toProto() {
}

/**
* Represents a merkle tree for a single block within a container.
* Constructs a merkle tree for a single block within a container.
*/
private static class BlockMerkleTree {
private static class BlockMerkleTreeWriter {
// Map of each offset within the block to its chunk info.
// Chunk order in the checksum is determined by their offset.
private final SortedMap<Long, ChunkMerkleTree> offset2Chunk;
private final SortedMap<Long, ChunkMerkleTreeWriter> offset2Chunk;
private final long blockID;

BlockMerkleTree(long blockID) {
BlockMerkleTreeWriter(long blockID) {
this.blockID = blockID;
this.offset2Chunk = new TreeMap<>();
}
Expand All @@ -110,7 +110,7 @@ private static class BlockMerkleTree {
*/
public void addChunks(Collection<ContainerProtos.ChunkInfo> chunks) {
for (ContainerProtos.ChunkInfo chunk: chunks) {
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk));
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTreeWriter(chunk));
}
}

Expand All @@ -125,37 +125,46 @@ public ContainerProtos.BlockMerkleTree toProto() {
ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
ByteBuffer blockChecksumBuffer = ByteBuffer.allocate(Long.BYTES * offset2Chunk.size());

for (ChunkMerkleTree chunkTree: offset2Chunk.values()) {
for (ChunkMerkleTreeWriter chunkTree: offset2Chunk.values()) {
// Ordering of checksums within a chunk is assumed to be in the order they are written.
// This assumption is already built in to the code that reads and writes the values (see
// ChunkInputStream#validateChunk for an example on the client read path).
// There is no other value we can use to sort these checksums, so we assume the stored proto has them in the
// correct order.
ContainerProtos.ChunkMerkleTree chunkTreeProto = chunkTree.toProto();
blockTreeBuilder.addChunkMerkleTree(chunkTreeProto);
blockChecksumBuffer.putLong(chunkTreeProto.getChunkChecksum());
blockChecksumBuffer.putLong(chunkTreeProto.getDataChecksum());
}
blockChecksumBuffer.flip();
checksumImpl.update(blockChecksumBuffer);

return blockTreeBuilder
.setBlockID(blockID)
.setBlockChecksum(checksumImpl.getValue())
.setDataChecksum(checksumImpl.getValue())
.build();
}
}

/**
* Represents a merkle tree for a single chunk within a container.
* Constructs a merkle tree for a single chunk within a container.
 * Each chunk contains multiple checksums, one per "bytesPerChecksum" interval of its data.
 * This class computes one aggregate checksum for the whole chunk from those per-interval checksums.
*/
private static class ChunkMerkleTree {
private ContainerProtos.ChunkInfo chunk;
private boolean isHealthy = true;

ChunkMerkleTree(ContainerProtos.ChunkInfo chunk) {
this.chunk = chunk;
private static class ChunkMerkleTreeWriter {
private final long length;
private final long offset;
private final boolean isHealthy;
private final long dataChecksum;

ChunkMerkleTreeWriter(ContainerProtos.ChunkInfo chunk) {
length = chunk.getLen();
offset = chunk.getOffset();
isHealthy = true;
ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
for (ByteString checksum: chunk.getChecksumData().getChecksumsList()) {
checksumImpl.update(checksum.asReadOnlyByteBuffer());
}
this.dataChecksum = checksumImpl.getValue();
}

/**
Expand All @@ -165,16 +174,11 @@ private static class ChunkMerkleTree {
* @return A complete protobuf representation of this chunk as a leaf in the container merkle tree.
*/
public ContainerProtos.ChunkMerkleTree toProto() {
ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
for (ByteString checksum: chunk.getChecksumData().getChecksumsList()) {
checksumImpl.update(checksum.asReadOnlyByteBuffer());
}

return ContainerProtos.ChunkMerkleTree.newBuilder()
.setOffset(chunk.getOffset())
.setLength(chunk.getLen())
.setOffset(offset)
.setLength(length)
.setIsHealthy(isHealthy)
.setChunkChecksum(checksumImpl.getValue())
.setDataChecksum(dataChecksum)
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
Expand Down Expand Up @@ -193,7 +193,7 @@ public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler cancel
LOG.debug("Running data checks for container {}", containerID);
try {
// TODO HDDS-10374 this tree will get updated with the container's contents as it is scanned.
ContainerMerkleTree dataTree = new ContainerMerkleTree();
ContainerMerkleTreeWriter dataTree = new ContainerMerkleTreeWriter();
List<ContainerScanError> dataErrors = scanData(dataTree, throttler, canceler);
if (containerIsDeleted()) {
return DataScanResult.deleted();
Expand All @@ -209,8 +209,8 @@ public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler cancel
}
}

private List<ContainerScanError> scanData(ContainerMerkleTree currentTree, DataTransferThrottler throttler,
Canceler canceler) {
private List<ContainerScanError> scanData(ContainerMerkleTreeWriter currentTree, DataTransferThrottler throttler,
Canceler canceler) {
Preconditions.checkState(containerDataFromDisk != null,
"invoke loadContainerData prior to calling this function");

Expand Down Expand Up @@ -341,7 +341,7 @@ private boolean blockInDBWithLock(DBHandle db, BlockData block)
}

private List<ContainerScanError> scanBlock(DBHandle db, File dbFile, BlockData block,
DataTransferThrottler throttler, Canceler canceler, ContainerMerkleTree currentTree) {
DataTransferThrottler throttler, Canceler canceler, ContainerMerkleTreeWriter currentTree) {
ContainerLayoutVersion layout = containerDataFromDisk.getLayoutVersion();

List<ContainerScanError> blockErrors = new ArrayList<>();
Expand Down Expand Up @@ -418,7 +418,7 @@ private List<ContainerScanError> scanBlock(DBHandle db, File dbFile, BlockData b
@SuppressWarnings("checkstyle:ParameterNumber")
private static List<ContainerScanError> verifyChecksum(BlockData block,
ContainerProtos.ChunkInfo chunk, File chunkFile, ContainerLayoutVersion layout, ByteBuffer buffer,
ContainerMerkleTree currentTree, DataTransferThrottler throttler, Canceler canceler) {
ContainerMerkleTreeWriter currentTree, DataTransferThrottler throttler, Canceler canceler) {

List<ContainerScanError> scanErrors = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
Expand Down Expand Up @@ -584,7 +584,7 @@ private void createContainerMerkleTree(Container container) {

try {
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
ContainerMerkleTree merkleTree = new ContainerMerkleTree();
ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
BlockIterator<BlockData> blockIterator = dbHandle.getStore().
getBlockIterator(containerData.getContainerID())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.container.ozoneimpl;

import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;

import java.util.Collections;
import java.util.List;
Expand All @@ -31,12 +31,12 @@
*/
public final class DataScanResult extends MetadataScanResult {

private final ContainerMerkleTree dataTree;
private final ContainerMerkleTreeWriter dataTree;
// Only deleted results can be interned. Healthy results will still have different trees.
private static final DataScanResult DELETED = new DataScanResult(Collections.emptyList(),
new ContainerMerkleTree(), true);
new ContainerMerkleTreeWriter(), true);

private DataScanResult(List<ContainerScanError> errors, ContainerMerkleTree dataTree, boolean deleted) {
private DataScanResult(List<ContainerScanError> errors, ContainerMerkleTreeWriter dataTree, boolean deleted) {
super(errors, deleted);
this.dataTree = dataTree;
}
Expand All @@ -47,7 +47,7 @@ private DataScanResult(List<ContainerScanError> errors, ContainerMerkleTree data
*/
public static DataScanResult unhealthyMetadata(MetadataScanResult result) {
Preconditions.checkArgument(!result.isHealthy());
return new DataScanResult(result.getErrors(), new ContainerMerkleTree(), false);
return new DataScanResult(result.getErrors(), new ContainerMerkleTreeWriter(), false);
}

/**
Expand All @@ -60,11 +60,11 @@ public static DataScanResult deleted() {
/**
* Constructs a data scan result whose health will be determined based on the presence of errors.
*/
public static DataScanResult fromErrors(List<ContainerScanError> errors, ContainerMerkleTree dataTree) {
public static DataScanResult fromErrors(List<ContainerScanError> errors, ContainerMerkleTreeWriter dataTree) {
return new DataScanResult(errors, dataTree, false);
}

public ContainerMerkleTree getDataTree() {
public ContainerMerkleTreeWriter getDataTree() {
return dataTree;
}
}
Loading
Loading