Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid double buffering direct IO index input slices with BufferedIndexInput #14103

Merged
merged 8 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -298,9 +299,10 @@ private static final class DirectIOIndexInput extends IndexInput {
private final ByteBuffer buffer;
private final FileChannel channel;
private final int blockSize;

private final long offset;
private final long length;
private final boolean isClosable; // clones and slices are not closable
private boolean isOpen;
private boolean isClone;
private long filePos;

/**
Expand All @@ -313,31 +315,32 @@ private static final class DirectIOIndexInput extends IndexInput {
*/
public DirectIOIndexInput(Path path, int blockSize, int bufferSize) throws IOException {
super("DirectIOIndexInput(path=\"" + path + "\")");
this.blockSize = blockSize;

this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption());
this.blockSize = blockSize;
this.buffer = allocateBuffer(bufferSize, blockSize);

isOpen = true;
isClone = false;
filePos = -bufferSize;
buffer.limit(0);
this.isOpen = true;
this.isClosable = true;
this.length = channel.size();
this.offset = 0L;
this.filePos = -bufferSize;
this.buffer.limit(0);
}

// for clone
private DirectIOIndexInput(DirectIOIndexInput other) throws IOException {
super(other.toString());
this.channel = other.channel;
this.blockSize = other.blockSize;

// for clone/slice
private DirectIOIndexInput(
String description, DirectIOIndexInput other, long offset, long length) throws IOException {
super(description);
Objects.checkFromIndexSize(offset, length, other.channel.size());
final int bufferSize = other.buffer.capacity();
this.buffer = allocateBuffer(bufferSize, blockSize);

isOpen = true;
isClone = true;
filePos = -bufferSize;
this.buffer = allocateBuffer(bufferSize, other.blockSize);
this.blockSize = other.blockSize;
this.channel = other.channel;
this.isOpen = true;
this.isClosable = false;
this.length = length;
this.offset = offset;
this.filePos = -bufferSize;
buffer.limit(0);
seek(other.getFilePointer());
}

private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) {
Expand All @@ -348,44 +351,46 @@ private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) {

@Override
public void close() throws IOException {
if (isOpen && !isClone) {
if (isOpen && isClosable) {
channel.close();
isOpen = false;
}
}

@Override
public long getFilePointer() {
long filePointer = filePos + buffer.position();
long filePointer = filePos + buffer.position() - offset;

// opening the input and immediately calling getFilePointer without calling readX (and thus
// refill) first,
// will result in negative value equal to bufferSize being returned,
// due to the initialization method filePos = -bufferSize used in constructor.
assert filePointer == -buffer.capacity() || filePointer >= 0
assert filePointer == -buffer.capacity() - offset || filePointer >= 0
: "filePointer should either be initial value equal to negative buffer capacity, or larger than or equal to 0";
return Math.max(filePointer, 0);
}

@Override
public void seek(long pos) throws IOException {
if (pos != getFilePointer()) {
final long alignedPos = pos - (pos % blockSize);
filePos = alignedPos - buffer.capacity();

final int delta = (int) (pos - alignedPos);
refill(delta);
buffer.position(delta);
seekInternal(pos);
}
assert pos == getFilePointer();
}

private void seekInternal(long pos) throws IOException {
final long absPos = pos + offset;
final long alignedPos = absPos - (absPos % blockSize);
filePos = alignedPos - buffer.capacity();

final int delta = (int) (absPos - alignedPos);
refill(delta);
buffer.position(delta);
}

@Override
public long length() {
try {
return channel.size();
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
return length;
}

@Override
Expand Down Expand Up @@ -429,7 +434,7 @@ private void refill(int bytesToRead) throws IOException {

// BaseDirectoryTestCase#testSeekPastEOF test for consecutive read past EOF,
// hence throwing EOFException early to maintain buffer state (position in particular)
if (filePos > channel.size() || (channel.size() - filePos < bytesToRead)) {
if (filePos > offset + length || ((offset + length) - filePos < bytesToRead)) {
throw new EOFException("read past EOF: " + this);
}

Expand Down Expand Up @@ -523,16 +528,23 @@ public void readLongs(long[] dst, int offset, int len) throws IOException {
@Override
public DirectIOIndexInput clone() {
try {
return new DirectIOIndexInput(this);
var clone = new DirectIOIndexInput("clone:" + this, this, offset, length);
clone.seekInternal(getFilePointer());
return clone;
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) {
// TODO: is this the right thing to do?
ChrisHegarty marked this conversation as resolved.
Show resolved Hide resolved
return BufferedIndexInput.wrap(sliceDescription, this, offset, length);
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if ((length | offset) < 0 || length > this.length - offset) {
throw new IllegalArgumentException(
"slice() " + sliceDescription + " out of bounds: " + this);
}
var slice = new DirectIOIndexInput(sliceDescription, this, this.offset + offset, length);
slice.seekInternal(0L);
return slice;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,9 @@ public void testSliceOfSlice() throws Exception {
slice1.seek(TestUtil.nextLong(random(), 0, slice1.length()));
for (int j = 0; j < slice1.length(); j += 16) {
IndexInput slice2 = slice1.slice("slice2", j, num - i - j);
if (random().nextBoolean()) {
slice2 = slice2.clone(); // clone shouldn't impact slice data
}
assertEquals(0, slice2.getFilePointer());
assertEquals(num - i - j, slice2.length());
byte[] data = new byte[num];
Expand Down
Loading