Skip to content

Commit

Permalink
dcache-bulk,dcache-qos: repair mass cancellation issues
Browse files Browse the repository at this point in the history
Motivation:

The latest modifications (9.2 and master) created
a regression in how bulk request cancellation is
handled.  The symptoms present themselves under
two guises:

- cancellation stalls and requires a second
  cancellation to complete; this is not
  always successful in cancelling all the
  requests in question.
- cancellation en masse provokes a large
  number of timeout failures from QoS
  because of message congestion and
  response time.

Modifications:

The second problem seems to be solved fairly
easily by increasing a few default message
timeouts from 1 to 5 minutes (both bulk
and qos).  These are reset by this patch.

The first problem has been addressed as follows:

1.  `checkForRequestCancellation()` has been added
    in a number positions (some accidentally eliminated
    from earlier versions) allowing ongoing operations
    to be short-circuited.  This includes any
    writing of state to the database.

2.  The update of the database entries to CANCELLED is now
    done before (rather than after) the attempt to cancel
    in-flight activities; since we now short-circuit writing
    of state after completion on cancelled targets.
    Discovered targets which have not yet been stored
    should be ignored.

In addition to these changes, this patch also includes:

- A refactoring of the handling of RuntimeExceptions
  which makes for clearer code and which does not
  throw or catch Throwable.
- Correction of an error in `handleCompletion` where `cancel`
  is called on the ContainerTask instead the container
  itself.
- A minor refactoring of the `checkForRequestCancellation`
  method.
- Elimination of redundant iterator remove() in one
  cancellation method.

Result:

Mass cancellation of all running requests now
finishes correctly.  (Note that there may be
a tail of operations still being cancelled
by the `QoSEngine` thereafter, but everything
eventually settles out.)  Both non-recursive
and recursive requests were tested.

Target: master
Request: 9.2
Patch: https://rb.dcache.org/r/14169
Requires-notes: yes (Fixes cancellation bugs in Bulk)
Acked-by: Tigran
  • Loading branch information
alrossi authored and mksahakyan committed Nov 20, 2023
1 parent 4bed61a commit 3a255bb
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import javax.ws.rs.HEAD;
import org.dcache.auth.attributes.Restriction;
import org.dcache.cells.AbstractMessageCallback;
import org.dcache.cells.CellStub;
Expand Down Expand Up @@ -252,8 +253,8 @@ abstract class ContainerTask implements Runnable {
public void run() {
try {
doInner();
} catch (Throwable e) {
handleException(e);
} catch (RuntimeException e) {
uncaughtException(Thread.currentThread(), e);
}
}

Expand Down Expand Up @@ -324,7 +325,7 @@ void remove() {
}
}

abstract void doInner() throws Throwable;
abstract void doInner();
}

class DirListTask extends ContainerTask {
Expand All @@ -343,10 +344,12 @@ class DirListTask extends ContainerTask {
permitHolder.setTaskSemaphore(dirListSemaphore);
}

void doInner() throws Throwable {
void doInner() {
try {
checkForRequestCancellation();
DirectoryStream stream = getDirectoryListing(path);
for (DirectoryEntry entry : stream) {
checkForRequestCancellation();
LOGGER.debug("{} - DirListTask, directory {}, entry {}", ruid, path,
entry.getName());
FsPath childPath = path.child(entry.getName());
Expand Down Expand Up @@ -384,6 +387,7 @@ void doInner() throws Throwable {
}
}

checkForRequestCancellation();
switch (targetType) {
case BOTH:
case DIR:
Expand All @@ -402,6 +406,18 @@ void doInner() throws Throwable {
SKIPPED, null));
}
}
} catch (InterruptedException e) {
/*
* Cancelled. Do nothing.
*/
permitHolder.releaseIfHoldingPermit();
} catch (BulkServiceException | CacheException e) {
/*
* Fail fast
*/
containerState = ContainerState.STOP;
jobTarget.setErrorObject(e);
update();
} finally {
remove();
}
Expand Down Expand Up @@ -457,27 +473,42 @@ void cancel() {
}

@Override
void doInner() throws Throwable {
switch (state) {
case RESOLVE_PATH:
resolvePath();
break;
case FETCH_ATTRIBUTES:
fetchAttributes();
break;
case HANDLE_DIR_TARGET:
performActivity();
break;
case HANDLE_TARGET:
default:
switch (depth) {
case NONE:
performActivity();
break;
default:
handleTarget();
}
break;
void doInner() {
try {
checkForRequestCancellation();
switch (state) {
case RESOLVE_PATH:
resolvePath();
break;
case FETCH_ATTRIBUTES:
fetchAttributes();
break;
case HANDLE_DIR_TARGET:
performActivity();
break;
case HANDLE_TARGET:
default:
switch (depth) {
case NONE:
performActivity();
break;
default:
handleTarget();
}
break;
}
} catch (InterruptedException e) {
/*
* Cancellation. Do nothing.
*/
permitHolder.releaseIfHoldingPermit();
} catch (RuntimeException e) {
target.setErrorObject(e);
if (activityFuture == null) {
activityFuture = Futures.immediateFailedFuture(Throwables.getRootCause(e));
}
handleCompletion();
uncaughtException(Thread.currentThread(), e);
}
}

Expand Down Expand Up @@ -631,6 +662,7 @@ private void handleCompletion() {

State state = RUNNING;
try {
checkForRequestCancellation();
activity.handleCompletion(target, activityFuture);
state = target.getState();

Expand All @@ -644,10 +676,15 @@ private void handleCompletion() {
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid,
target.getId(), target.getPath(), target.getAttributes(), e.toString());
} catch (InterruptedException e) {
/*
* Cancelled. Do nothing.
*/
return;
}

if (state == FAILED && request.isCancelOnFailure()) {
cancel();
if (state == FAILED && request.isCancelOnFailure() && !jobTarget.isTerminated()) {
BulkRequestContainerJob.this.cancel();
} else {
remove();
}
Expand All @@ -674,25 +711,28 @@ private void storeOrUpdate(Throwable error) {
return;
}

target.setState(error == null ? RUNNING : FAILED);
target.setErrorObject(error);
if (jobTarget.isTerminated()) {
error = new InterruptedException();
}

if (error == null) {
target.setState(RUNNING);
} else {
target.setErrorObject(error);
}

try {
/*
* If this is an insert (id == null), the target id will be updated to what is
* returned from the database.
*/
checkForRequestCancellation();
targetStore.storeOrUpdate(target);
LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId());
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.",
ruid,
target.getId(), target.getPath(), target.getAttributes(), e.toString());
error = e;
} catch (InterruptedException e) {
remove();
return;
}

if (error != null) {
Expand Down Expand Up @@ -762,11 +802,14 @@ public void cancel() {
*/
containerState = ContainerState.STOP;
update(CANCELLED);
targetStore.cancelAll(rid);

LOGGER.debug("{} - cancel, running {}.", ruid, running.size());

int count = 0;

/*
* Drain running tasks. Calling task cancel removes the task from the map.
* Drain running tasks.
*/
while (true) {
ContainerTask task;
Expand All @@ -778,12 +821,12 @@ public void cancel() {
task = running.values().iterator().next();
}

task.cancel();

LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo);
task.cancel(); // removes the task
++count;
}

targetStore.cancelAll(rid);
LOGGER.trace("{} - cancel: {} tasks cancelled; running size: {}.", ruid, count,
running.size());

signalStateChange();
}
Expand All @@ -794,8 +837,7 @@ public void cancel(long targetId) {
ContainerTask task = i.next();
if (task instanceof TargetTask
&& targetId == ((TargetTask) task).target.getId()) {
task.cancel();
i.remove();
task.cancel(); // removes the task
break;
}
}
Expand Down Expand Up @@ -963,19 +1005,10 @@ public void update(State state) {
}

private void checkForRequestCancellation() throws InterruptedException {
if (containerState == ContainerState.STOP) {
throw new InterruptedException();
}

if (isRunThreadInterrupted()) {
if (containerState == ContainerState.STOP || isRunThreadInterrupted()
|| jobTarget.isTerminated()) {
throw new InterruptedException();
}

synchronized (running) {
if (jobTarget.isTerminated()) {
throw new InterruptedException();
}
}
}

private void checkTransitionToDirs() {
Expand Down Expand Up @@ -1038,6 +1071,7 @@ private void processDirTargets() {
*/
for (DirTarget dirTarget : sorted) {
try {
checkForRequestCancellation();
new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path,
Optional.of(dirTarget.attributes), CREATED, null),
TaskState.HANDLE_DIR_TARGET).performSync();
Expand All @@ -1064,6 +1098,7 @@ private void processFileTargets() {

for (BulkRequestTarget target : requestTargets) {
try {
checkForRequestCancellation();
new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
} catch (InterruptedException e) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void setErrorObject(Object error) {
errorType = errorObject.getClass().getCanonicalName();
errorMessage = errorObject.getMessage();

setState(State.FAILED);
setState(errorObject instanceof InterruptedException ? State.CANCELLED : State.FAILED);
}
}

Expand Down
4 changes: 3 additions & 1 deletion skel/share/defaults/bulk.properties
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ bulk.service.poolmanager.timeout=1
bulk.service.qos=${dcache.service.qos}

# ---- How long to wait for a response from qos.
# The default timeout is slightly longer in order to support cancellations
# of many requests simultaneously, especially if these requests each have many targets.
#
bulk.service.qos.timeout=1
bulk.service.qos.timeout=5
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.qos.timeout.unit=MINUTES

# ---- How long to wait for a response from the HA leader.
Expand Down
8 changes: 6 additions & 2 deletions skel/share/defaults/qos.properties
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ qos.service.pinmanager.timeout=1
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pinmanager.timeout.unit=MINUTES

# ---- Endpoint (cell) settings for contacting pnfs manager.
# Activity in the engine and verifier can be intense so a longer default timeout
# may at times be required.
#
qos.service.pnfsmanager=${dcache.service.pnfsmanager}
qos.service.pnfsmanager.timeout=1
qos.service.pnfsmanager.timeout=5
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pnfsmanager.timeout.unit=MINUTES

# ---- Endpoint (cell) settings for contacting pools (destination is dynamic).
Expand All @@ -134,9 +136,11 @@ qos.service.transition.timeout=1
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.transition.timeout.unit=MINUTES

# ---- Main external entry point for qos.
# The verifier requirements requests can accumulate against the engine so a longer
# default timeout is indicated.
#
qos.service.requirements=${dcache.service.qos}
qos.service.requirements.timeout=1
qos.service.requirements.timeout=5
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.requirements.timeout.unit=MINUTES

# ---- Internal endpoints consumed only by other qos services.
Expand Down

0 comments on commit 3a255bb

Please sign in to comment.