Skip to content

Commit

Permalink
Merge pull request iipc#4 from nlevitt/hc43
Browse files Browse the repository at this point in the history
changes needed for move to httpcomponents
  • Loading branch information
kngenie committed Jan 14, 2014
2 parents 7ff3d4d + 7c6c673 commit 8e33ab7
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.archive</groupId>
<artifactId>ia-web-commons</artifactId>
<version>1.1.0</version>
<version>1.1.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ia-web-commons</name>
Expand Down
88 changes: 77 additions & 11 deletions src/main/java/org/archive/io/RecordingInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename)
}

public void open(InputStream wrappedStream) throws IOException {
logger.fine(Thread.currentThread().getName() + " opening " +
wrappedStream + ", " + Thread.currentThread().getName());
if (logger.isLoggable(Level.FINE)) {
logger.fine("wrapping " + wrappedStream + " in thread "
+ Thread.currentThread().getName());
}
if(isOpen()) {
// error; should not be opening/wrapping in an unclosed
// stream remains open
Expand Down Expand Up @@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException {

/**
 * Closes the wrapped input stream and the recording output stream, and
 * drops the reference to the wrapped stream.
 *
 * <p>Both streams are closed through {@code IOUtils.closeQuietly}, so the
 * visible body does not itself propagate close failures; the
 * {@code throws} clause is kept for interface compatibility.
 *
 * @throws IOException declared by the signature
 */
public void close() throws IOException {
    // Log the close exactly once; guard avoids building the message
    // when FINE logging is disabled.
    if (logger.isLoggable(Level.FINE)) {
        logger.fine("closing " + this.in + " in thread "
                + Thread.currentThread().getName());
    }
    IOUtils.closeQuietly(this.in);
    // Clear the reference after closing the wrapped stream.
    this.in = null;
    IOUtils.closeQuietly(this.recordingOutputStream);
}

Expand All @@ -159,20 +161,77 @@ public long readFully() throws IOException {
return this.recordingOutputStream.getSize();
}

/**
 * Reads from this stream until {@code contentLength} bytes of message body
 * have been counted, or until end of stream, whichever comes first.
 *
 * <p>The running total starts at the recording stream's current position
 * minus its message-body-begin mark. Bytes are read into
 * {@code drainBuffer} and discarded here.
 *
 * <p>If the stream is not open, this method returns without reading.
 *
 * @param contentLength number of body bytes to read up to; if zero or
 *        negative, reading continues until end of stream
 * @throws IOException if a read fails, or if
 *         {@code recordingOutputStream.checkLimits()} reports a violated
 *         limit after a socket timeout
 * @throws InterruptedException if the current thread's interrupt flag is
 *         found set after a read or after a socket timeout
 */
public void readToEndOfContent(long contentLength)
        throws IOException, InterruptedException {
    // Check we're open before proceeding.
    if (!isOpen()) {
        // TODO: should this be a noisier exception-raising error?
        return;
    }

    final boolean untilEof = contentLength <= 0;
    long totalBytes = recordingOutputStream.position
            - recordingOutputStream.getMessageBodyBegin();
    while (untilEof || totalBytes < contentLength) {
        try {
            // read no more than soft max
            long maxToRead = untilEof
                    ? drainBuffer.length
                    : Math.min(drainBuffer.length, contentLength - totalBytes);
            // nor more than hard max
            maxToRead = Math.min(maxToRead,
                    recordingOutputStream.getRemainingLength());
            // but always at least 1: per the original author's note this
            // is meant to let the hard-max exception be triggered when
            // nothing remains -- NOTE(review): confirm against
            // RecordingOutputStream's limit checks.
            maxToRead = Math.max(maxToRead, 1);

            // maxToRead is bounded above by drainBuffer.length, so the
            // narrowing cast is safe.
            long bytesRead = read(drainBuffer, 0, (int) maxToRead);
            if (bytesRead == -1) {
                break;
            }
            totalBytes += bytesRead;

            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
        } catch (SocketTimeoutException e) {
            // A socket timeout is just a transient problem, meaning
            // nothing was available in the configured timeout period,
            // but something else might become available later.
            // Take this opportunity to check the overall
            // timeout (below). One reason for this timeout is
            // servers that keep up the connection, 'keep-alive', even
            // though we asked them to not keep the connection open.
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "socket timeout", e);
            }
            // check for interrupt
            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
            // check for overall timeout
            recordingOutputStream.checkLimits();
        } catch (NullPointerException e) {
            // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
            // A crawl was showing NPE's in this part of the code but can
            // not reproduce. Rethrow with diagnostics, keeping the original
            // exception as the cause so its stack trace is not lost.
            NullPointerException diagnostic = new NullPointerException(
                    "Stream " + this.in + ", " + e.getMessage() + " "
                    + Thread.currentThread().getName());
            diagnostic.initCause(e);
            throw diagnostic;
        }
        // Other IOExceptions (including SocketException) propagate to the
        // caller unchanged; no explicit catch-and-rethrow is needed.
    }
}

/**
* Read all of a stream (Or read until we timeout or have read to the max).
* @param softMaxLength Maximum length to read; if zero or < 0, then no
* limit. If met, return normally.
* @param hardMaxLength Maximum length to read; if zero or < 0, then no
* limit. If exceeded, throw RecorderLengthExceededException
* @param timeout Timeout in milliseconds for total read; if zero or
* negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
* RecorderTimeoutException
* @param maxBytesPerMs How many bytes per millisecond.
* @throws IOException failed read.
* @throws RecorderLengthExceededException
* @throws RecorderTimeoutException
* @throws InterruptedException
* @deprecated
*/
public void readFullyOrUntil(long softMaxLength)
throws IOException, RecorderLengthExceededException,
Expand Down Expand Up @@ -349,6 +408,13 @@ public int getRecordedBufferLength() {
return recordingOutputStream.getBufferLength();
}

/**
 * Delegates to the underlying recording output stream; see
 * {@link RecordingOutputStream#chopAtMessageBodyBegin()} for details.
 */
public void chopAtMessageBodyBegin() {
    this.recordingOutputStream.chopAtMessageBodyBegin();
}

public void clearForReuse() throws IOException {
recordingOutputStream.clearForReuse();
}
Expand Down
61 changes: 59 additions & 2 deletions src/main/java/org/archive/io/RecordingOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class RecordingOutputStream extends OutputStream {
private byte[] buffer;

/** current virtual position in the recording */
private long position;
long position;

/** flag to disable recording */
private boolean recording;
Expand Down Expand Up @@ -132,6 +132,29 @@ public class RecordingOutputStream extends OutputStream {
*/
protected long messageBodyBeginMark;

/**
* While messageBodyBeginMark is not set, the last two bytes seen.
*
* <p>
* This class does automatic detection of http message body begin (i.e. end
* of http headers). Unfortunately httpcomponents did not want to add
* functionality to help us with this, see
* https://issues.apache.org/jira/browse/HTTPCORE-325
*
* <p>
* It works like this: while messageBodyBeginMark is not set, we remember
* the last two bytes seen, and look at each byte we write. If the
* lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is
* "\n\n" then we call markMessageBodyBegin() at the position after
* currentByte.
*
* <p>
* An assumption here is that protocols other than http don't have headers,
* and for those protocols the user of this class will call
* markMessageBodyBegin() at position 0 before writing anything.
*/
protected int[] lastTwoBytes = new int[] {-1, -1};

/**
* Stream to record.
*/
Expand Down Expand Up @@ -204,6 +227,20 @@ public void write(int b) throws IOException {
if (this.out != null) {
this.out.write(b);
}

// see comment on int[] lastTwoBytes
if (messageBodyBeginMark < 0l) {
// looking for "\n\n" or "\n\r\n"
if (b == '\n'
&& (lastTwoBytes[1] == '\n'
|| (lastTwoBytes[0] == '\n' && lastTwoBytes[1] == '\r'))) {
markMessageBodyBegin();
} else {
lastTwoBytes[0] = lastTwoBytes[1];
lastTwoBytes[1] = b;
}
}

checkLimits();
}

Expand All @@ -220,6 +257,14 @@ public void write(byte[] b, int off, int len) throws IOException {
off += consumeRange;
len -= consumeRange;
}

// see comment on int[] lastTwoBytes
while (messageBodyBeginMark < 0 && len > 0) {
write(b[off]);
off++;
len--;
}

if(recording) {
record(b, off, len);
}
Expand Down Expand Up @@ -251,7 +296,7 @@ protected void checkLimits() throws RecorderIOException {
throw new RecorderTimeoutException();
}
// need to throttle reading to hit max configured rate?
if(position/duration > maxRateBytesPerMs) {
if(position/duration >= maxRateBytesPerMs) {
long desiredDuration = position / maxRateBytesPerMs;
try {
Thread.sleep(desiredDuration-duration);
Expand Down Expand Up @@ -557,6 +602,18 @@ public long getRemainingLength() {
return maxLength - position;
}

/**
 * Discards everything recorded after the start of the content body by
 * resetting both {@code size} and {@code position} to the message-body
 * begin mark. Does nothing when that mark is not set (negative).
 *
 * <p>This is needed to support FetchHTTP's shouldFetchBody setting. See
 * also the docs on {@link #lastTwoBytes}.
 */
public void chopAtMessageBodyBegin() {
    if (messageBodyBeginMark < 0) {
        // mark not set: nothing to chop
        return;
    }
    final long bodyStart = messageBodyBeginMark;
    this.size = bodyStart;
    this.position = bodyStart;
}

public void clearForReuse() throws IOException {
this.out = null;
this.position = 0;
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/org/archive/io/ReplayInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,15 @@ public int read(byte[] b, int off, int len) throws IOException {
}

/**
 * Copies the remaining bytes of this stream to the given output stream.
 *
 * @param os destination for the bytes read from this stream
 * @throws IOException if reading or writing fails
 */
public void readFullyTo(OutputStream os) throws IOException {
    // Use this stream as the source for the static copy helper.
    final InputStream source = this;
    readFullyTo(source, os);
}

/**
 * Copies every remaining byte of {@code in} to {@code os} using a
 * 4096-byte buffer. Neither stream is flushed or closed by this method.
 *
 * @param in source stream, read until it reports end of stream
 * @param os destination stream
 * @throws IOException if reading from {@code in} or writing to
 *         {@code os} fails
 */
public static void readFullyTo(InputStream in, OutputStream os) throws IOException {
    byte[] buf = new byte[4096];
    // Read from the supplied stream, not from an enclosing instance:
    // this method is static.
    int c = in.read(buf);
    while (c != -1) {
        os.write(buf, 0, c);
        c = in.read(buf);
    }
}

Expand All @@ -218,12 +222,7 @@ public void readHeaderTo(OutputStream os) throws IOException {
*/
public void readContentTo(OutputStream os) throws IOException {
    // Position at the start of the response body, then copy everything
    // that remains via the shared copy routine instead of repeating the
    // read/write loop here.
    setToResponseBodyStart();
    readFullyTo(os);
}

/**
Expand Down

0 comments on commit 8e33ab7

Please sign in to comment.