Skip to content

Commit

Permalink
Avoid double buffering direct IO index input slices with BufferedInde…
Browse files Browse the repository at this point in the history
…xInput (#14103)

This commit avoids double buffering direct IO index input slices with BufferedIndexInput.

Currently, BufferedIndexInput is used for slicing because it handles the initial offset and length, but this adds an extra layer of buffering: the buffer in BufferedIndexInput on top of the buffer in the direct IO index input. This change reworks the direct IO index input so that it handles an offset and length itself, and can therefore serve as its own implementation for slices.

Existing tests covered this, but I found a case where a clone of a slice was not covered. I added a small change to the base directory test case to cover it.
  • Loading branch information
ChrisHegarty committed Jan 25, 2025
1 parent 059d8fe commit bd12e07
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -298,9 +299,10 @@ private static final class DirectIOIndexInput extends IndexInput {
private final ByteBuffer buffer;
private final FileChannel channel;
private final int blockSize;

private final long offset;
private final long length;
private final boolean isClosable; // clones and slices are not closable
private boolean isOpen;
private boolean isClone;
private long filePos;

/**
Expand All @@ -313,31 +315,32 @@ private static final class DirectIOIndexInput extends IndexInput {
*/
public DirectIOIndexInput(Path path, int blockSize, int bufferSize) throws IOException {
super("DirectIOIndexInput(path=\"" + path + "\")");
this.blockSize = blockSize;

this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption());
this.blockSize = blockSize;
this.buffer = allocateBuffer(bufferSize, blockSize);

isOpen = true;
isClone = false;
filePos = -bufferSize;
buffer.limit(0);
this.isOpen = true;
this.isClosable = true;
this.length = channel.size();
this.offset = 0L;
this.filePos = -bufferSize;
this.buffer.limit(0);
}

// for clone
private DirectIOIndexInput(DirectIOIndexInput other) throws IOException {
super(other.toString());
this.channel = other.channel;
this.blockSize = other.blockSize;

// for clone/slice
private DirectIOIndexInput(
String description, DirectIOIndexInput other, long offset, long length) throws IOException {
super(description);
Objects.checkFromIndexSize(offset, length, other.channel.size());
final int bufferSize = other.buffer.capacity();
this.buffer = allocateBuffer(bufferSize, blockSize);

isOpen = true;
isClone = true;
filePos = -bufferSize;
this.buffer = allocateBuffer(bufferSize, other.blockSize);
this.blockSize = other.blockSize;
this.channel = other.channel;
this.isOpen = true;
this.isClosable = false;
this.length = length;
this.offset = offset;
this.filePos = -bufferSize;
buffer.limit(0);
seek(other.getFilePointer());
}

private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) {
Expand All @@ -348,44 +351,46 @@ private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) {

@Override
public void close() throws IOException {
if (isOpen && !isClone) {
if (isOpen && isClosable) {
channel.close();
isOpen = false;
}
}

@Override
public long getFilePointer() {
long filePointer = filePos + buffer.position();
long filePointer = filePos + buffer.position() - offset;

// opening the input and immediately calling getFilePointer without calling readX (and thus
// refill) first,
// will result in negative value equal to bufferSize being returned,
// due to the initialization method filePos = -bufferSize used in constructor.
assert filePointer == -buffer.capacity() || filePointer >= 0
assert filePointer == -buffer.capacity() - offset || filePointer >= 0
: "filePointer should either be initial value equal to negative buffer capacity, or larger than or equal to 0";
return Math.max(filePointer, 0);
}

@Override
public void seek(long pos) throws IOException {
if (pos != getFilePointer()) {
final long alignedPos = pos - (pos % blockSize);
filePos = alignedPos - buffer.capacity();

final int delta = (int) (pos - alignedPos);
refill(delta);
buffer.position(delta);
seekInternal(pos);
}
assert pos == getFilePointer();
}

/**
 * Moves this input to the slice-relative position {@code pos} without first comparing it to the
 * current file pointer.
 *
 * <p>The position is translated to an absolute position by adding this input's {@code offset},
 * then rounded down to a multiple of {@code blockSize}. The buffer is refilled and its position
 * is set to the remainder within the block.
 *
 * @param pos position relative to the start of this input
 * @throws IOException if refilling the buffer fails
 */
private void seekInternal(long pos) throws IOException {
  final long absolute = offset + pos;
  // Distance from the preceding block boundary; smaller in magnitude than blockSize, so it fits
  // in an int.
  final int intoBlock = (int) (absolute % blockSize);
  // filePos is parked one buffer capacity before the aligned position.
  // NOTE(review): presumably refill advances filePos by the buffer capacity before reading;
  // confirm against refill.
  filePos = (absolute - intoBlock) - buffer.capacity();

  refill(intoBlock);
  buffer.position(intoBlock);
}

@Override
public long length() {
try {
return channel.size();
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
return length;
}

@Override
Expand Down Expand Up @@ -429,7 +434,7 @@ private void refill(int bytesToRead) throws IOException {

// BaseDirectoryTestCase#testSeekPastEOF test for consecutive read past EOF,
// hence throwing EOFException early to maintain buffer state (position in particular)
if (filePos > channel.size() || (channel.size() - filePos < bytesToRead)) {
if (filePos > offset + length || ((offset + length) - filePos < bytesToRead)) {
throw new EOFException("read past EOF: " + this);
}

Expand Down Expand Up @@ -523,16 +528,23 @@ public void readLongs(long[] dst, int offset, int len) throws IOException {
@Override
public DirectIOIndexInput clone() {
try {
return new DirectIOIndexInput(this);
var clone = new DirectIOIndexInput("clone:" + this, this, offset, length);
clone.seekInternal(getFilePointer());
return clone;
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) {
// TODO: is this the right thing to do?
return BufferedIndexInput.wrap(sliceDescription, this, offset, length);
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if ((length | offset) < 0 || length > this.length - offset) {
throw new IllegalArgumentException(
"slice() " + sliceDescription + " out of bounds: " + this);
}
var slice = new DirectIOIndexInput(sliceDescription, this, this.offset + offset, length);
slice.seekInternal(0L);
return slice;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,9 @@ public void testSliceOfSlice() throws Exception {
slice1.seek(TestUtil.nextLong(random(), 0, slice1.length()));
for (int j = 0; j < slice1.length(); j += 16) {
IndexInput slice2 = slice1.slice("slice2", j, num - i - j);
if (random().nextBoolean()) {
slice2 = slice2.clone(); // clone shouldn't impact slice data
}
assertEquals(0, slice2.getFilePointer());
assertEquals(num - i - j, slice2.length());
byte[] data = new byte[num];
Expand Down

0 comments on commit bd12e07

Please sign in to comment.