Ranges, Coroutines, and React: Early Musings on the Future of Async in C++

Disclaimer: these are my early thoughts. None of this is battle ready. You’ve been warned.

Hello, Coroutines!

At the recent C++ Committee meeting in Toronto, the Coroutines TS was forwarded to ISO for publication. That roughly means that the coroutine “feature branch” is finished, and is ready to be merged into trunk (standard C++) after a suitable vetting period (no less than a year). That puts it on target for C++20. What does that mean for idiomatic modern C++?

Lots, actually. With the resumable functions (aka, stackless coroutines) from the Coroutines TS, we can do away with callbacks, event loops, and future chaining (future.then()) in our asynchronous APIs. Instead, our APIs can return “awaitable” types. Programmers can then just use these APIs in a synchronous-looking style, spamming co_await in front of any async API call and returning an awaitable type.

This is a bit abstract, so this blog post makes it more concrete. It describes how the author wrapped the interface of libuv — a C library that provides the asynchronous I/O in Node.js — in awaitables. In libuv, all async APIs take a callback and loop on an internal event loop, invoking the callback when the operation completes. Wrapping the interfaces in awaitables makes for a much better experience, without the callbacks and the inversion of control they bring.

Below, for instance, is a function that (asynchronously) opens a file, reads from it, writes it to stdout, and closes it:

auto start_dump_file( const std::string& str )
  -> future_t<void>
{
  // We can use the same request object for
  // all file operations as they don't overlap.
  static_buf_t<1024> buffer;

  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(),
                                  &openreq,
                                  str.c_str(),
                                  O_RDONLY,
                                  0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(),
                                    &readreq,
                                    file,
                                    &buffer,
                                    1,
                                    -1);
      if (result <= 0)
        break;
      buffer.len = result;
      fs_t req;
      (void) co_await fs_write(uv_default_loop(),
                               &req,
                               1 /*stdout*/,
                               &buffer,
                               1,
                               -1);
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(),
                             &closereq,
                             file);
  }
}

You can see that this looks almost exactly like ordinary synchronous code, with two exceptions:

  1. Calls to asynchronous operations are preceded with co_await, and
  2. The function returns an awaitable type (future_t<void>).

Very nice. But this code snippet does too much in my opinion. Wouldn’t it be nice to have a reusable component for asynchronously reading a file, separate from the bit about writing it to stdout? What would that even look like?

Hello, Ranges!

Also at the recent C++ Committee meeting in Toronto, the Ranges TS was forwarded to ISO for publication. This is the first baby step toward a complete reimagining and reimplementation of the C++ standard library in which interfaces are specified in terms of ranges in addition to iterators.

Once we have “range” as an abstraction, we can build range adaptors and build pipelines that transform ranges of values in interesting ways. More than just a curiosity, this is a very functional style that lets you program without a lot of state manipulation. The fewer states your program can be in, the easier it is for you to reason about your code, and the fewer bugs you’ll have. (For more on that, you can see my 2015 CppCon talk about ranges; or just look at the source for a simple app that prints a formatted calendar to stdout, and note the lack of loops, conditionals, and overt state manipulation.)

For instance, if we have a range of characters, we might want to lazily convert each character to lowercase. Using the range-v3 library, you can do the following:

std::string hello("Hello, World!");
using namespace ranges;
auto lower = hello
           | view::transform([](char c){
               return (char)std::tolower(c);});

Now lower presents a view of hello where each character is run through the tolower transform on the fly.

Although the range adaptors haven’t been standardized yet, the Committee has already put its stamp of approval on the overall direction, including adaptors and pipelines. (See N4128 for the ranges position paper.) Someday, these components will all be standard, and the C++ community can encourage their use in idiomatic modern C++.

Ranges + Coroutines == ?

With coroutines, ranges become even more powerful. For one thing, the co_yield keyword makes it trivial to define your own (synchronous) ranges. Already with range-v3 you can use the following code to define a range of all the integers and apply a filter to them:

#include <iostream>
#include <range/v3/all.hpp>
#include <range/v3/experimental/utility/generator.hpp>

using namespace ranges;

// Define a range of all the unsigned shorts:
experimental::generator<unsigned short> ushorts()
{
  unsigned short u = 0;
  do { co_yield u; } while (++u);
}

int main()
{
  // Filter all the even unsigned shorts:
  auto evens = ushorts()
             | view::filter([](auto i) {
                   return (i % 2) == 0; });

  // Write the evens to cout:
  copy( evens, ostream_iterator<>(std::cout, "\n") );
}

Put the above code in a .cpp file, compile with a recent clang and -fcoroutines-ts -std=gnu++1z, and away you go. Congrats, you’re using coroutines and ranges together. This is a trivial example, but you get the idea.

Asynchronous Ranges

That’s great and all, but it’s not asynchronous, so who cares? If it were asynchronous, what would that look like? Moving to the first element of the range would be an awaitable operation, and then moving to every subsequent element would also be awaitable.

In the ranges world, moving to the first element of a range R is spelled “auto it = begin(R)”, and moving to subsequent elements is spelled “++it”. So for an asynchronous range, those two operations should be awaitable. In other words, given an asynchronous range R, we should be able to do:

// Consume a range asynchronously
for( auto it = co_await begin(R);
     it != end(R);
     co_await ++it )
{
  auto && e = *it;
  do_something( e );
}

In fact, the Coroutines TS anticipates this and has an asynchronous range-based for loop for just this abstraction. The above code can be rewritten:

// Same as above:
for co_await ( auto&& e : R )
{
  do_something( e );
}

Now we have two different but closely related abstractions: Range and AsynchronousRange. In the first, begin returns something that models an Iterator. In the second, begin returns an Awaitable of an AsynchronousIterator. What does that buy us?

Asynchronous Range Adaptors

Once we have an abstraction, we can program against that abstraction. Today we have a view::transform that knows how to operate on synchronous ranges. It can be extended to also work with asynchronous ranges. So can all the other range adaptors: filter, join, chunk, group_by, interleave, transpose, etc, etc. So it will be possible to build a pipeline of operations, and apply the pipeline to a synchronous range to get a (lazy) synchronous transformation, and apply the same exact pipeline to an asynchronous range to get a non-blocking asynchronous transformation. The benefits are:

  • The same functional style can be used for synchronous and asynchronous code, reusing the same components and the same idioms.
  • Asynchronous code, when expressed with ranges and transformations, can be made largely stateless, as can be done today with synchronous range-based code. This leads to programs with fewer states and hence fewer state-related bugs.
  • Range-based code composes very well and encourages a decomposition of problems into orthogonal pieces which are easily testable in isolation. (E.g., a view::filter component can be used with any input range, synchronous or asynchronous, and can be easily tested in isolation of any particular range.)

Another way to look at this is that synchronous ranges are an example of a pull-based interface: the user extracts elements from the range and processes them one at a time. Asynchronous ranges, on the other hand, represent more of a push-based model: things happen when data shows up, whenever that may be. This is akin to the reactive style of programming.

By using ranges and coroutines together, we unify push and pull based idioms into a consistent, functional style of programming. And that’s going to be important, I think.

Back to LibUV

Earlier, we wondered about a reusable libuv component that used its asynchronous operations to read a file. Now we know what such a component could look like: an asynchronous range. Let’s start with an asynchronous range of characters. (Here I’m glossing over the fact that libuv deals with UTF-8, not ASCII. I’m also ignoring errors, which is another can of worms.)

auto async_file( const std::string& str )
  -> async_generator<char>
{
  // We can use the same request object for
  // all file operations as they don't overlap.
  static_buf_t<1024> buffer;

  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(),
                                  &openreq,
                                  str.c_str(),
                                  O_RDONLY,
                                  0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(),
                                    &readreq,
                                    file,
                                    &buffer,
                                    1,
                                    -1);
      if (result <= 0)
        break;
      // Yield the characters one at a time.
      for ( int i = 0; i < result; ++i )
      {
        co_yield buffer.buffer[i];
      }
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(),
                             &closereq,
                             file);
  }
}

The async_file function above asynchronously reads a block of text from the file and then co_yields the individual characters one at a time. The result is an asynchronous range of characters: async_generator<char>. (For an implementation of async_generator, look in Lewis Baker’s cppcoro library.)

Now that we have an asynchronous range of characters representing the file, we can apply transformations to it. For instance, we could convert all the characters to lowercase:

// Create an asynchronous range of characters read
// from a file and lower-cased:
auto async_lower = async_file("some_input.txt")
                 | view::transform([](char c){
                     return (char)std::tolower(c);});

That’s the same transformation we applied above to a std::string synchronously, but here it’s used asynchronously. Such an asynchronous range can then be passed through further transforms, asynchronously written out, or passed to an asynchronous std:: algorithm (because we’ll need those, too!).

One More Thing

I hear you saying, “Processing a file one character at a time like this would be too slow! I want to operate on chunks.” The above async_file function is still doing too much. It should be an asynchronous range of chunks. Let’s try again:

auto async_file_chunk( const std::string& str )
  -> async_generator<static_buf_t<1024>&>
{
  // We can use the same request object for
  // all file operations as they don't overlap.
  static_buf_t<1024> buffer;

  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(),
                                  &openreq,
                                  str.c_str(),
                                  O_RDONLY,
                                  0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(),
                                    &readreq,
                                    file,
                                    &buffer,
                                    1,
                                    -1);
      if (result <= 0)
        break;
      // Just yield the buffer.
      buffer.len = result;
      co_yield buffer;
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(),
                             &closereq,
                             file);
  }
}

Now if I want to, I can asynchronously read a block and asynchronously write the block, as the original code was doing, but while keeping those components separate, as they should be.

For some uses, a flattened view would be more convenient. No problem. That’s what the adaptors are for. If static_buf_t is a (synchronous) range of characters, we already have the tools we need:

// Create an asynchronous range of characters read from a
// chunked file and lower-cased:
auto async_lower = async_file_chunk("some_input.txt")
                 | view::join
                 | view::transform([](char c){
                     return (char)std::tolower(c);});

Notice the addition of view::join. Its job is to take a range of ranges and flatten it. Let’s see what joining an asynchronous range might look like:

template <class AsyncRange>
auto async_join( AsyncRange&& rng )
  -> async_generator<range_value_t<
       async_range_value_t<AsyncRange>>>
{
  for co_await ( auto&& chunk : rng )
  {
    for ( auto&& e : chunk )
      co_yield e;
  }
}

We (asynchronously) loop over the outer range, then (synchronously) loop over the inner ranges, and co_yield each value. Pretty easy. From there, it’s just a matter of rigging up operator| to async_join to make joining work in pipelines. (A fully generic view::join will be more complicated than that since both the inner and outer ranges can be either synchronous or asynchronous, but this suffices for now.)

Summary

With ranges and coroutines together, we can unify the push and pull programming idioms, bringing ordinary C++ and reactive C++ closer together. The C++ Standard Library is already evolving in this direction, and I’m working to make that happen both on the Committee and internally at Facebook.

There are LOTS of open questions. How well does this perform at runtime? Does this scale? Is it flexible enough to handle lots of interesting use cases? How do we handle errors in the middle of an asynchronous pipeline? What about splits and joins in the async call graph? Can this handle streaming interfaces? And so on. I’ll be looking into all this, but at least for now I have a promising direction, and that’s fun.


14 Replies to “Ranges, Coroutines, and React: Early Musings on the Future of Async in C++”

    • I will need to learn a great deal about the various flavors of React, but basically yes, this amounts to adding reactive extensions to my ranges library. Eventually, and assuming this works out as I hope, it will mean proposing it to the standard.

      • Taking into account that there are a lot of reactive-like libraries for C++, it would be great, if it would be possible to make pipelines work for them. And not only reactive or range libraries, e.g. transform and filter make sense for other stuff as well.

  1. for co_await is nice for inverting async push to pull. It is also a natural way to implement the http://www.reactive-streams.org/ protocol to provide back-pressure on async sources.

    overly sequenced

    the challenge in using for co_await to compose async sources and build async algorithms is that it overly sequences async values.

    for co_await (auto users : http.get(<users>)) {
      for (auto user : users) {
        co_yield make_tuple(user, co_await http.get(user.avatar));
      }
    }

    what does this do?
    – get a chunk of users
    – get one avatar
    – yield that avatar + user
    – wait for resumption
    – repeat sequentially
    meanwhile additional chunks of users are arriving, stuck in the queue..

    algorithms

    nested for co_await is concat. co_await cannot be used to build merge, take_until, combine_latest, with_latest_from, race, zip, etc…

    usage

    Having complex implementations of these algorithms would be fine, but even with these implemented, common patterns in usage are not supported.

    for co_await (auto d : data.get()) {
      co_await resume_on(window1);
      window1.listbox.push_back(d);
    }
    for co_await (auto s : sensor.get()) {
      co_await resume_on(window2);
      window2.graph.push_back(s);
    }

    the desire is to have multiple outstanding async sources running for the lifetime of the app. This is both common and incompatible with co_await.

    NOTE: there are things that can be done with abstraction, such as putting each for co_await into a function that returns a future and then when_all()-ing all the returned futures. But the syntax and semantics for errors etc. suffer (composition suffers when each for co_await returns a future that then needs to be coordinated…)

    resources

    there is a lot of work that has gone into designing and building the async algorithm set in the Rx community. In C++ there is rxcpp and recently rs

    hope

    If all the algorithms can be built and the usability improved for many app scoped for co_await and nested for co_await to be running simultaneously, then extending the range-v3 library to support async is going to be all that I have dreamed of while building rxcpp.

    • Thanks for your feedback! You’ve gotten farther down this road than I have. If we had good semantics for splits and joins in async pipelines, do you think that would help with the problem of over-sequencing?

      • Yes, the algorithms for splits and joins are the table stakes. I listed some of them earlier.

        library & language

        The result, since co_await only expresses total serialization, is that the only way to express interleaving and coordination is in a library.

        Using a library for interleaving and coordinating multiple sequences that are described using co_await will have challenges at the seams where they intersect.

        I developed an axiom many years ago.

        that which is written async can trivially be made sync. that which is written sync is nearly impossible to make async.

        I coined this when working with the Win32 api, but in the last 2 years I have been sad to see that it applies to REST apis as well.

        a service that was written using GET apis is nearly impossible to stream data from and a service that is written using pub/sub is trivial to ‘get’ data from.

        perhaps this will also apply to co_await

        a library or app that is written using co_await directly is nearly impossible to make interleaved while an app using an async algorithm library is trivial to make sequential.

        push

        I have written transducer, co_await and other experiments to explore the best ways to compose async and I am always drawn back to push-model libraries on the Rx pattern.

        An all-library push model is consistent for sequences and interleaves of sequences. it also presents similar complexity when implementing different operators, merge and concat can both be understood easily in a push library.

        Thanks!

        Naming: in an async world join is not a great name for a nested for/for co_await. join is already used for thread in C++ and in existing async libraries concat is used for join to contrast it with merge (which interleaves instead of sequences the values).

        • The range adaptors in range-v3 are all single-pass; that is, they are linear pipelines that visit each element once. Many of the std:: algorithms are also single-pass. For such linear pipelines, co_await is fine. Executing several such asynchronous pipelines in parallel and combining their results will require a library solution. I agree with you there.

          EDIT: When considering what such a library solution would look like, I am drawn toward Sean Parent’s copyable future which can have multiple continuations attached to it: http://sean-parent.stlab.cc/2017/07/10/future-ruminations.html

          As for join vs. concat, in range-v3 concat takes N arguments that are ranges and glues them together. join takes one argument that is a range of ranges, and flattens them. They are both useful operations. I have been happy with the naming, but I can see your point.

  2. Generators also make it much easier — and efficient — to implement most iterators, particularly sophisticated adaptors.
    The problem is that they only give input iterators.

    • I can believe the “easy” part, and I bet sometimes it’s efficient, but there is still the heap allocation that may or may not get elided depending on your compiler and your code flow. You are likely to get more reliable performance without coroutines.

      I could be spreading FUD, so take this with a grain of salt.

  3. In OCaml, Jane Street’s Core.Async package provides a nice idiom for reading/writing files asynchronously. For example,

    open Core
    open Async
    
    (* Not compiled, not tested. *)
    
    (** [cp src dst] asynchronously copies the contents of the file [src]
         to the file named [dst].*)
    let cp 
      (src : string) 
      (dst : string) : unit Deferred.t =
      Async.Writer.with_file dst 
      ~f:(
         fun out ->
           Async.Reader.with_file src 
           ~f:(
             fun in_ ->
               Async.Reader.contents in_ >>= fun s ->
               Async.Writer.write out s
           )
      )
    

    I believe @Juan Alday mentioned to me once that Asio has something like this?

  4. Actually I’m writing a book on programming with coroutines. However, I should point out that coroutines have nothing to do with asynchronous behaviour; on the contrary, they are an entirely synchronous, single-threaded sequential programming construct. My whole language is based on a mix of functions (as in purely functional languages) and coroutines. What C++ actually got was what I call yielding generators, or iterators in my system. They are coroutines, but only a very limited variety of them.

    The system then “bolts on” asynchronous I/O; its use is transparent to the end user. In the real world, we already have a coroutine, namely the operating system. So you already know how to do a read: you say read. End of story. To write, you say write. End of story. Your coroutine blocks until the I/O is complete. Your pthread, however, does not. Other coroutines can run.

    The primitives of a coroutine system are spawn, read, write, channel creation, and run. C++ co_await is very weak. What a pity not to provide an actual coroutine system. The problem is co_await only reads a single implicit output channel, which is very useful but not general.

    This is work in progress and definitely not ready for publication but you can read the working draft:

    https://github.com/felix-lang/felix/blob/master/coprogramming.pdf

    Felix is a C++ code generator, each coroutine has its own stack to save state on, the stack frames are heap allocated for this reason. Stack switching is very fast. Coroutines aren’t compatible with a single machine stack functional world view.

  5. Eric, very nice post, just a small comment regarding your characterization of the TS process, which may come across as a tiny bit too optimistic, and a small disclaimer may go a great length to setting appropriate expectations:

    A published TS does not mean that “a C++ feature is finished”. It’s more like “a prototype for a new proposal is finished”. A published TS is a great thing, but any of the following can in principle happen to it: a) adoption as-is. b) adoption with major modifications or of selected parts only. c) retirement without further progress. d) remain active but unmerged indefinitely. Of course we’re all hoping to be near (a), but I think it’s important to be clear that the TS is a vehicle for gathering experience and feedback and neither a fait accompli nor a guaranteed part of the next standard.

  6. FYI Eric in case you want something more native C++ than a libuv wrapper for any future blog posts on this topic, AFIO v2 (https://ned14.github.io/afio/index.html) supports coroutines out of the box. So off the top of my head:

    extern afio::io_service &service();
    
    auto async_file_chunk( const std::string& str )
      -> async_generator<static_buf_t<1024>&>
    {
      namespace afio = AFIO_V2_NAMESPACE;
    
    // We can use the same request object for
      // all file operations as they don't overlap.
      static_buf_t<1024> buffer;
    
    afio::async_file_handle fh = afio::async_file(service(),
        {}, str).value();
      for(size_t offset = 0;; offset += 1024) {
        auto read = (co_await co_read(fh,
          {{{buffer, sizeof(buffer)}}, offset})).value();
        buffer.len = read[0].len;
        if(buffer.len == 0)
          break;
        co_yield buffer;
      }
    }
    

    There are many compelling advantages to using AFIO over libuv, not least that it is enormously more efficient. Plus, the rest of AFIO is Ranges TS ready, you can adapt a file into a view and do stuff to it e.g.

    namespace afio = AFIO_V2_NAMESPACE;
    
    // Make me a 1 trillion element sparsely allocated integer array!
    afio::mapped_file_handle mfh = afio::mapped_temp_inode().value();
    
    // On an extents based filing system, doesn't actually allocate any physical
    // storage but does map approximately 4Tb of all bits zero data into memory
    mfh.truncate(1000000000000ULL*sizeof(int));
    
    // Create a typed view of the one trillion integers
    afio::algorithm::mapped_view<int> one_trillion_int_array(mfh);
    
    // Write and read as you see fit, if you exceed physical RAM it'll be paged out
    one_trillion_int_array[0] = 5;
    one_trillion_int_array[999999999999ULL] = 6;
    

    Which I think is pretty cool, even if I do say so myself.
