3.4 Task continuations

Chaining coroutines

Coroutines typically run in parallel on separate threads, with the possibility of specifying which queue a coroutine should run on. As we have seen earlier, this allows for writing lock-free code. Such coroutines have no logical affiliation with other coroutines and run in isolation, each implementing some business logic of its own.

There comes a time, however, when it is necessary to execute business logic in conjunction with other business logic, or at least to conditionally execute some code if-and-only-if some dependent code has successfully completed. In a fully parallel world it is impossible to guarantee ordering, and this is where task continuations come in.

The task continuation API allows chaining of tasks with a strict ordering guarantee, meaning the next coroutine in the chain will only execute once the previous one completes successfully. A typical example is:

dispatcher.postFirst(firstFunc)->
    then(func2)->...->then(funcN)->
    onError(errHandler)->
    finally(cleanupFunc)->
    end();
  • postFirst() function is very similar to its standalone counterparts post() and postAsyncIo(). It also allows the user to specify a queue id and a priority on which to run the task chain. It is to be noted that once a queue id and priority have been specified in postFirst(), all subsequent continuations will run on the same queue and at the same priority level (i.e. these properties propagate along the chain). postFirst() represents the head of a continuation chain and must appear only once, at the beginning. Its syntax is identical to post().
  • then() function can be called zero or more times and immediately follows the postFirst() function.
  • onError() can be called at most once. This function must either follow postFirst() (in the absence of a then() function) or the last then() in the chain. This function only executes if one of the preceding chained continuation methods fails. A good analogy for this is the catch clause in a try-catch C++ statement. When an error happens (e.g. an exception is thrown in a then() function), all functions in the chain following the error are skipped and onError() is executed if present (see the sketch below).
  • finally() can be called at most once and must be the last function before end(). finally() will always be called even if an error occurs. Typically it can be used to run cleanup code or any logic which must absolutely execute.
  • end() function must terminate every continuation chain and must appear exactly once, at the end of the chain. It does not invoke any coroutines and takes no arguments. Its sole purpose is to signal to the scheduler that the chain is complete and that the coroutine specified in postFirst() is ready to run.

NOTE: Continuations are only available for coroutines and not async IO tasks.
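
As a minimal sketch of the error flow described above (the lambda bodies and promised types here are illustrative only), an exception thrown in a then() skips the remaining then() calls, triggers onError(), and still allows finally() to run:

dispatcher.postFirst([](CoroContextPtr<int> ctx)->int
{
    return ctx->set(1);                                //completes successfully
})->then([](CoroContextPtr<int> ctx)->int
{
    throw std::runtime_error("something went wrong");  //error in the chain
})->then([](CoroContextPtr<int> ctx)->int
{
    return ctx->set(3);                                //skipped: the previous then() threw
})->onError([](CoroContextPtr<int> ctx)->int
{
    std::cout << "Handling the error" << std::endl;    //runs because an error occurred
    return 0;
})->finally([](CoroContextPtr<int> ctx)->int
{
    std::cout << "Cleaning up" << std::endl;           //always runs
    return 0;
})->end();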

Nesting continuation chains

Nothing prevents the programmer from invoking a new execution chain from within a coroutine or another chain. Just like post() and postAsyncIo() can be called via the context object, so can postFirst():

dispatcher.postFirst([](CoroContextPtr<int> ctx)->int          //func 1
{
    std::cout << "Inside the first coroutine" << std::endl;

    //Post a new chain
    ctx->postFirst([](CoroContextPtr<std::string> ctx)->int    //func 1.1
    {
        std::cout << "Inside the first nested coroutine" << std::endl;
        return 0;
    })->finally([](CoroContextPtr<int> ctx)->int               //func 1.2
    {
        std::cout << "Inside the last nested coroutine" << std::endl;
        return 0;
    })->end();
    return 0;
})->then([](CoroContextPtr<double> ctx)->int                   //func 2
{
    std::cout << "Inside the second coroutine" << std::endl;
    return 0;
})->end();
  • In this example, func 1 will run before func 2. Func 1.1 will run before func 1.2.
  • Func 1.1 may run in parallel with func 1 or func 2. If we want func 1.1 and func 1.2 to run before func 1 completes, we can simply wait on the nested chain's future inside func 1 by calling ...->end()->wait(), as sketched below. Since each continuation method returns a ThreadContext object (or a CoroContext object when invoked from within a coroutine), end() gives access to func 1.2's future (see the following chapter).
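
For example, func 1 from the snippet above could wait on the nested chain before returning (a minimal sketch, with the nested lambda bodies trimmed):

dispatcher.postFirst([](CoroContextPtr<int> ctx)->int          //func 1
{
    //Post the nested chain and block until func 1.2 completes,
    //so both nested coroutines finish before func 1 returns
    ctx->postFirst([](CoroContextPtr<std::string> ctx)->int    //func 1.1
    {
        return 0;
    })->finally([](CoroContextPtr<int> ctx)->int               //func 1.2
    {
        return 0;
    })->end()->wait();
    return 0;
})->then([](CoroContextPtr<double> ctx)->int                   //func 2
{
    return 0;
})->end();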

Accessing continuation futures

All continuation functions postFirst(), then(), onError(), finally() and end() return a ThreadContext interface, or similarly a CoroContext interface if called from within a coroutine. The template type T of each ThreadContext (or CoroContext) depends on the type of future that particular coroutine returns. In order to access a particular future in the chain, the getAt(), getRefAt(), waitAt() and waitForAt() functions have been provided; they take the index of the specific future in the chain, starting at index 0 which represents the postFirst() future. waitAll() is also provided and blocks until every future in the chain is ready.

//First coroutine promises a std::string (index 0)
ThreadContext<int> tctx = dispatcher.postFirst(func1)->
    then(func2)->           //promises a double (index 1)
    then(func3)->           //promises a std::vector<int> (index 2)
    onError(errHandler)->   //promises a size_t (index 3)
    finally(cleanupFunc)->  //promises an int (index 4 or -1 aka last index)
    end();                  //end of chain. No promises.

//To access the futures:

//blocks on the 3rd promise
std::vector<int> v = tctx->getAt<std::vector<int>>(2);

//get a reference to the 2nd future. Does NOT block!
const double& d = tctx->getRefAt<double>(1);

//blocks for at most 2 seconds waiting for the 4th promise
tctx->waitForAt(3, std::chrono::milliseconds(2000));

//blocks until all futures are available for reading.
tctx->waitAll();

NOTE: Since continuation coroutines run sequentially, invoking getAt(N-1) after getAt(N) is guaranteed not to block, since by the time the Nth future becomes available the (N-1)th promise has already been set.
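
For instance, with the tctx chain from the example above:

//may block until the 3rd promise (index 2) is set
std::vector<int> vec = tctx->getAt<std::vector<int>>(2);

//guaranteed not to block, since the promise at index 1 was set before index 2
double dbl = tctx->getAt<double>(1);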

Propagating values from one continuation to the next (aka "past" futures)

One other advantage of using continuations is the ability to propagate the promised value not only to the calling task (via get(), getAt(), getRef(), getRefAt()) but also to the following continuation in the chain, so that it may take appropriate action based on the output of the previous stage. For this purpose, the getPrev() or getPrevRef() methods can be used as outlined below:

dispatcher.postFirst([](CoroContextPtr<std::string> ctx)->int
{
    ...
    return ctx->set("Answer to the question");
})->then([](CoroContextPtr<double> ctx)->int
{
    std::string s = ctx->getPrev<std::string>(); //Retrieve past future
    if (s == "Answer to the question")
    {
        return ctx->set(42.42);
    }
    else
    {
        return ctx->set(-1);
    }
})->end();

One thing to keep in mind is that getPrev() and getPrevRef() don't block like their get() and getRef() counterparts, because the corresponding promise has already been set by the previous continuation task.

Programmatic chaining of tasks

Sometimes it's necessary to serialize multiple identical tasks, for example a series of objects which are grouped together by certain business logic and need to be processed in a particular order (e.g. order of creation, order of arrival, etc.). Using continuations, it's quite easy to chain handlers together in a programmatic way:

std::queue<std::shared_ptr<object>> objects; //objects to be processed in order

auto handler = [](CoroContextPtr<int>, std::shared_ptr<object> obj)->int
{
    ... //Do some logic with 'obj'
    return 0;
};

auto tctx = dispatcher.postFirst(handler, objects.front()); //start the chain
objects.pop();

while (!objects.empty())
{
    tctx = tctx->then(handler, objects.front()); //chain the next handler and object
    objects.pop();
}

//End chain and run all handlers
tctx->end();

Programmatic serialization of tasks

If the tasks to chain are not necessarily known in advance and the application requires immediate processing of any incoming data, a simpler way of serializing tasks is shown below.

//note that access to this queue should be protected
std::queue<std::shared_ptr<object>> objects; //objects to be processed in order

//continuous processing
while (1)
{
    while (!objects.empty())
    {
        //run processing coroutine
        dispatcher.post(handler, objects.front())->wait(); //wait till finish
        objects.pop();
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); //wait a bit
}