
future.apply::future_lapply is not a blocking call in certain situations #84

Closed
vsrdharca opened this issue May 9, 2021 · 7 comments

@vsrdharca

@HenrikBengtsson

Hi Henrik - This is an old discussion but I have a quick question along the same lines. As you indicated, future.apply::future_lapply, like the older future::future_lapply implementation, is a blocking call.

But I have noticed the following with a cluster of servers controlled by a main server using the future::plan strategy cluster (earlySignal appears to be FALSE by default). When a job spread across the individual servers in the cluster is initiated with a future.apply::future_lapply call, it starts out blocking as expected. But when one of the servers in the cluster is terminated (reclaimed by the cloud provider) and hence the worker dies unexpectedly, future.apply::future_lapply returns. The individual jobs on the other servers are still running, but because future_lapply has returned, the downstream script starts processing when it should wait until the entire job spanning the cluster of servers is complete.
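
Roughly, the plan is set up like this; a simplified sketch where the hostnames and the job function are stand-ins (my actual Docker-based setup is described in a later comment):

library(future.apply)
plan(cluster, workers = c("vm1", "vm2", "vm3"))  # placeholder hostnames
results <- future_lapply(job_inputs, run_job)    # run_job stands in for my function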

base R - 4.0.1 (main server and all workers)
future.apply - 1.6.0
future - 1.17.0

Is this expected?

Much thanks,

@HenrikBengtsson
Collaborator

But when one of the servers in the cluster is terminated (called away by the cloud provider) and hence the worker dies unexpectedly, future.apply::future_lapply returns.

It should not return, it should result in an error (of class FutureError) being produced and thrown. Are you sure it's actually returning?
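
To double-check, you can make the FutureError explicit by catching it. A minimal sketch, where X, FUN, and the worker hostnames stand in for your real input, function, and servers:

library(future.apply)
plan(cluster, workers = c("vm1", "vm2"))  ## placeholder hostnames
y <- tryCatch({
  future_lapply(X, FUN)
}, FutureError = function(e) {
  ## Reached on orchestration failures, e.g. a worker that died unexpectedly
  message("FutureError caught: ", conditionMessage(e))
  stop(e)
})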

The individual jobs on the other servers are still running ...

Yes, this is expected because we (still) don't have a mechanism for "terminating"/"suspending" futures.

This is a feature request that requires several other things to be in place before it can be addressed, cf. futureverse/future#93, futureverse/future#213, futureverse/parallelly#33, futureverse/future.batchtools#27.

@vsrdharca
Author

vsrdharca commented May 10, 2021

Hi @HenrikBengtsson

Thanks for the quick response. I just kicked off a job that spans 30 instances. future_lapply returned to a console prompt even though none of the 30 instances had been pre-empted. When some of the instances were later pre-empted, I didn't get an exception/error either. I see console output from the job, so it's still running.

@HenrikBengtsson
Collaborator

HenrikBengtsson commented May 10, 2021

Hmm... I think there's some miscommunication or misunderstanding going on. I don't see how it would be possible for future_lapply() to return without producing an error when one of the parallel workers terminates.

It sounds like you're running HPC jobs or something and you're calling R from the terminal. Here is an illustration of what happens when you do that, without and with errors:

$ Rscript --vanilla -e "1+2"
[1] 3

$ echo "exit code=$?"
exit code=0
$ Rscript --vanilla -e "stop('boom')"
Error: boom
Execution halted

$ echo "exit code=$?"
exit code=1

If you use future_lapply() with multisession workers, you'll get the same behavior:

$ Rscript --vanilla -e "library(future.apply); plan(multisession); future_lapply(1:2, function(x) { x })"
Loading required package: future
[[1]]
[1] 1

[[2]]
[1] 2

$ echo "exit code=$?"
exit code=0
$ Rscript --vanilla -e "library(future.apply); plan(multisession); future_lapply(1:2, function(x) { if (x == 2) stop('boom'); Sys.sleep(5); x })"
Loading required package: future
Error in ...future.FUN(...future.X_jj, ...) : boom
Calls: future_lapply ... resolve.list -> signalConditionsASAP -> signalConditions
Execution halted

$ echo "exit code=$?"
exit code=1

And in the extreme case when one of the parallel workers crashes completely, which is emulated with a quit() call to one of the workers, you'll still get an error in the main R session:

$ Rscript --vanilla -e "library(future.apply); plan(multisession, workers=2); future_lapply(1:2, function(x) { if (x == 2) quit(); Sys.sleep(5); x })"
Loading required package: future
Error in unserialize(node$con) :
  Failed to retrieve the value of MultisessionFuture (future_lapply-2) from
  cluster RichSOCKnode #2 (PID 11217 on localhost 'localhost'). The reason
  reported was 'error reading from connection'. Post-mortem diagnostic: No
  process exists with this PID, i.e. the localhost worker is no longer alive.
Calls: future_lapply ... resolved -> resolved.ClusterFuture -> receiveMessageFromWorker
Execution halted

$ echo "exit code=$?"
exit code=1

This is with up-to-date future, future.apply, and parallelly, but this has always been the behavior. If you get something different from the above, there's something really weird going on.
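
To rule out version differences, you can check what you have installed, e.g.:

packageVersion("future")
packageVersion("future.apply")
packageVersion("parallelly")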

@vsrdharca
Author

Hi @HenrikBengtsson

You are right, it is an HPC cluster. Each instance is running a Docker container with the R environment and packages. I spin up the HPC cluster in Google Cloud from a controlling server, deploy the Docker container containing my R environment to each individual server, and open a pipe to Rscript inside the Docker container from the controlling server. I then execute my function, which does its computation inside each of the containers. When one of the worker servers in the cluster is called away, I don't get the FutureError on the controlling server; and sometimes, even when all of the worker servers are up and the function is still running inside the containers, the console prompt returns on the controlling server that invoked future.apply::future_lapply.

Below is the code snippet creating the plan and opening up a pipe to the Rscript prompt inside the Docker container on the worker servers.

Is it an issue with how I am setting up the plan?

# Create cluster (placeholders in angle brackets)
plan(cluster, workers = as.cluster(
  VM_List,
  docker_image = <name>,
  rscript = <command that launches the container and invokes Rscript inside it>
))
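
For comparison, here is a minimal sketch of the kind of Docker-based worker launch I am approximating, modeled on the parallelly documentation (the rocker/r-parallel image is just an example, not my actual image):

library(future)
library(parallelly)

## One worker whose Rscript runs inside a Docker container on localhost
cl <- makeClusterPSOCK(
  1L,
  rscript = c("docker", "run", "--net=host", "rocker/r-parallel", "Rscript")
)
plan(cluster, workers = cl)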

@HenrikBengtsson
Collaborator

Sorry, there's not much for me to go by here. I can only reiterate that I don't see how future_lapply() could possibly return when a worker terminates without returning results. y <- future_lapply(X, FUN, ...) either (i) succeeds and returns y with length(y) == length(X), (ii) throws a regular error in case one of the FUN(X[[ii]]) calls produces an error, or (iii) throws a FutureError in case there's a major orchestration failure, e.g. a worker crash, power failure, or network problem. There are also lots of sanity checks around these. The first two, (i) and (ii), are just how regular lapply() works. There is no outcome of a call to future_lapply() other than (i), (ii), or (iii).

If it's true that (i) happens despite worker crashes, which I really don't see how it could happen, I suggest that you look at the results y and see what you've got. At a minimum you should see length(y) == length(X).
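
For example, a quick sanity check along those lines (X and FUN as in your real code):

y <- future_lapply(X, FUN)
stopifnot(length(y) == length(X))  ## must hold if (i) really happened
str(y, max.level = 1)              ## inspect what the elements contain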

I also suggest that you work off my minimal examples above and see if those work with your "plan". If they don't, try to simplify your setup as far as possible, e.g. only two workers, or possibly even a single worker.
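
For instance, a stripped-down version of your setup, without Docker and with just two local workers, could look like this:

library(future.apply)
plan(cluster, workers = c("localhost", "localhost"))  ## two local workers
y <- future_lapply(1:4, function(x) { Sys.sleep(1); Sys.getpid() })
length(y)  ## should be 4, i.e. the length of the input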

@vsrdharca
Author

Hi @HenrikBengtsson - Thank you for the info. I'll try to run a smaller and more controlled example to test. Much thanks.

@HenrikBengtsson
Collaborator

Closing. Please reopen if you come up with a reproducible example.
