Skip to content

Commit

Permalink
Merge pull request #4717 from apache/nouveau-purge
Browse files Browse the repository at this point in the history
Add clustered purge support in nouveau
  • Loading branch information
rnewson authored Aug 6, 2023
2 parents 64faef7 + 259e641 commit 954b573
Show file tree
Hide file tree
Showing 18 changed files with 625 additions and 100 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ derived:
# Build nouveau
nouveau:
ifeq ($(with_nouveau), 1)
@cd nouveau && ./gradlew spotlessApply
@cd nouveau && ./gradlew build -x test
endif

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
import java.util.concurrent.ForkJoinPool;
import org.apache.couchdb.nouveau.core.IndexManager;
import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderExceptionMapper;
import org.apache.couchdb.nouveau.health.AnalyzeHealthCheck;
import org.apache.couchdb.nouveau.health.IndexHealthCheck;
import org.apache.couchdb.nouveau.lucene9.Lucene9Module;
Expand All @@ -41,8 +40,6 @@ public String getName() {

@Override
public void run(NouveauApplicationConfiguration configuration, Environment environment) throws Exception {
environment.jersey().register(new UpdatesOutOfOrderExceptionMapper());

// configure index manager
final IndexManager indexManager = new IndexManager();
indexManager.setCommitIntervalSeconds(configuration.getCommitIntervalSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,32 @@ public class DocumentDeleteRequest {
@Positive
private long seq;

private boolean purge;

public DocumentDeleteRequest() {
// Jackson deserialization
}

public DocumentDeleteRequest(long seq) {
public DocumentDeleteRequest(long seq, final boolean purge) {
if (seq < 1) {
throw new IllegalArgumentException("seq must be 1 or greater");
}
this.seq = seq;
this.purge = purge;
}

@JsonProperty
public long getSeq() {
return seq;
}

@JsonProperty
public boolean isPurge() {
return purge;
}

@Override
public String toString() {
return "DocumentDeleteRequest [seq=" + seq + "]";
return "DocumentDeleteRequest [seq=" + seq + ", purge=" + purge + "]";
}
}
28 changes: 18 additions & 10 deletions nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,50 @@
import jakarta.validation.constraints.PositiveOrZero;

@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class IndexInfo {
public final class IndexInfo {

@PositiveOrZero
private long updateSeq;
private final long updateSeq;

@PositiveOrZero
private int numDocs;
private final long purgeSeq;

@PositiveOrZero
private long diskSize;
private final int numDocs;

public IndexInfo() {}
@PositiveOrZero
private final long diskSize;

public IndexInfo(final long updateSeq, final int numDocs, final long diskSize) {
public IndexInfo(
@JsonProperty("update_seq") final long updateSeq,
@JsonProperty("purge_seq") final long purgeSeq,
@JsonProperty("num_docs") final int numDocs,
@JsonProperty("disk_size") final long diskSize) {
this.updateSeq = updateSeq;
this.purgeSeq = purgeSeq;
this.numDocs = numDocs;
this.diskSize = diskSize;
}

@JsonProperty
public int getNumDocs() {
return numDocs;
}

@JsonProperty
public long getDiskSize() {
return diskSize;
}

@JsonProperty
public long getUpdateSeq() {
return updateSeq;
}

public long getPurgeSeq() {
return purgeSeq;
}

@Override
public String toString() {
return "IndexInfo [updateSeq=" + updateSeq + ", numDocs=" + numDocs + ", diskSize=" + diskSize + "]";
return "IndexInfo [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + ", numDocs=" + numDocs + ", diskSize="
+ diskSize + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.couchdb.nouveau.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Positive;
import java.util.OptionalLong;

public final class IndexInfoRequest {

private OptionalLong updateSeq;

private OptionalLong purgeSeq;

public IndexInfoRequest(
@JsonProperty("update_seq") @Positive OptionalLong updateSeq,
@JsonProperty("purge_seq") @Positive OptionalLong purgeSeq) {
this.updateSeq = updateSeq;
this.purgeSeq = purgeSeq;
}

public OptionalLong getUpdateSeq() {
return updateSeq;
}

public OptionalLong getPurgeSeq() {
return purgeSeq;
}

@Override
public String toString() {
return "IndexInfoRequest [updateSeq=" + updateSeq + ", purgeSeq=" + purgeSeq + "]";
}
}
59 changes: 52 additions & 7 deletions nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.apache.couchdb.nouveau.core;

import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response.Status;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
Expand All @@ -36,13 +38,15 @@
public abstract class Index implements Closeable {

private long updateSeq;
private long purgeSeq;
private boolean deleteOnClose = false;
private long lastCommit = now();
private volatile boolean closed;
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);

protected Index(final long updateSeq) {
protected Index(final long updateSeq, final long purgeSeq) {
this.updateSeq = updateSeq;
this.purgeSeq = purgeSeq;
}

public final boolean tryAcquire() {
Expand Down Expand Up @@ -74,7 +78,7 @@ public final void release() {
public final IndexInfo info() throws IOException {
final int numDocs = doNumDocs();
final long diskSize = doDiskSize();
return new IndexInfo(updateSeq, numDocs, diskSize);
return new IndexInfo(updateSeq, purgeSeq, numDocs, diskSize);
}

protected abstract int doNumDocs() throws IOException;
Expand All @@ -90,9 +94,15 @@ public final synchronized void update(final String docId, final DocumentUpdateRe
protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException;

public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
assertUpdateSeqIsLower(request.getSeq());
doDelete(docId, request);
incrementUpdateSeq(request.getSeq());
if (request.isPurge()) {
assertPurgeSeqIsLower(request.getSeq());
doDelete(docId, request);
incrementPurgeSeq(request.getSeq());
} else {
assertUpdateSeqIsLower(request.getSeq());
doDelete(docId, request);
incrementUpdateSeq(request.getSeq());
}
}

protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;
Expand All @@ -105,10 +115,12 @@ public final SearchResults search(final SearchRequest request) throws IOExceptio

public final boolean commit() throws IOException {
final long updateSeq;
final long purgeSeq;
synchronized (this) {
updateSeq = this.updateSeq;
purgeSeq = this.purgeSeq;
}
final boolean result = doCommit(updateSeq);
final boolean result = doCommit(updateSeq, purgeSeq);
if (result) {
final long now = now();
synchronized (this) {
Expand All @@ -118,7 +130,27 @@ public final boolean commit() throws IOException {
return result;
}

protected abstract boolean doCommit(final long updateSeq) throws IOException;
protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException;

public final synchronized void setUpdateSeq(final long updateSeq) throws IOException {
if (updateSeq < this.updateSeq) {
throw new WebApplicationException(
"update_seq must be equal or greater than current update_seq", Status.BAD_REQUEST);
}
if (updateSeq > this.updateSeq) {
incrementUpdateSeq(updateSeq);
}
}

public final synchronized void setPurgeSeq(final long purgeSeq) throws IOException {
if (purgeSeq < this.purgeSeq) {
throw new WebApplicationException(
"purge_seq must be equal or greater than current purge_seq", Status.BAD_REQUEST);
}
if (purgeSeq > this.purgeSeq) {
incrementPurgeSeq(purgeSeq);
}
}

@Override
public final void close() throws IOException {
Expand Down Expand Up @@ -159,6 +191,19 @@ protected final void incrementUpdateSeq(final long updateSeq) throws IOException
this.updateSeq = updateSeq;
}

protected final void assertPurgeSeqIsLower(final long purgeSeq) throws UpdatesOutOfOrderException {
assert Thread.holdsLock(this);
if (!(purgeSeq > this.purgeSeq)) {
throw new UpdatesOutOfOrderException(this.purgeSeq, purgeSeq);
}
}

protected final void incrementPurgeSeq(final long purgeSeq) throws IOException {
assert Thread.holdsLock(this);
assertPurgeSeqIsLower(purgeSeq);
this.purgeSeq = purgeSeq;
}

public boolean needsCommit(final long duration, final TimeUnit unit) {
final long commitNeededSince = now() - unit.toNanos(duration);
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@

package org.apache.couchdb.nouveau.core;

import java.io.IOException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response.Status;

public final class UpdatesOutOfOrderException extends IOException {
public final class UpdatesOutOfOrderException extends WebApplicationException {

public UpdatesOutOfOrderException(final long currentSeq, final long attemptedSeq) {
super(String.format(
"Updates applied in the wrong order (current seq: %d, attempted seq: %d)", currentSeq, attemptedSeq));
super(
String.format(
"Updates applied in the wrong order (current seq: %d, attempted seq: %d)",
currentSeq, attemptedSeq),
Status.BAD_REQUEST);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -104,8 +103,9 @@ public Lucene9Index(
final Analyzer analyzer,
final IndexWriter writer,
final long updateSeq,
final long purgeSeq,
final SearcherManager searcherManager) {
super(updateSeq);
super(updateSeq, purgeSeq);
this.analyzer = Objects.requireNonNull(analyzer);
this.writer = Objects.requireNonNull(writer);
this.searcherManager = Objects.requireNonNull(searcherManager);
Expand Down Expand Up @@ -144,12 +144,12 @@ public void doDelete(final String docId, final DocumentDeleteRequest request) th
}

@Override
public boolean doCommit(final long updateSeq) throws IOException {
public boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException {
if (!writer.hasUncommittedChanges()) {
return false;
}
writer.setLiveCommitData(
Collections.singletonMap("update_seq", Long.toString(updateSeq)).entrySet());
writer.setLiveCommitData(Map.of("update_seq", Long.toString(updateSeq), "purge_seq", Long.toString(purgeSeq))
.entrySet());
writer.commit();
return true;
}
Expand Down
Loading

0 comments on commit 954b573

Please sign in to comment.