What are Senders Good For, Anyway?

Some colleagues of mine and I have been working for the past few years to give C++ a standard async programming model. That effort has resulted in a proposal, P2300, that has been design-approved for C++26. If you know me or if you follow me on social media, you know I’m embarrassingly excited about this work and its potential impact. I am aware, however, that not everybody shares my enthusiasm. I hear these things a lot these days:

Why would I want to use it?

Why do we need senders when C++ has coroutines?

It’s all just too complicated!

The P2300 crew have collectively done a terrible job of making this work accessible. At the heart of P2300 is a simple, elegant (IMHO) core that brings many benefits, but it’s hard to see that forest for all the trees.

So let’s make this concrete. In this post, I’ll show how to bring a crusty old C-style async API into the world of senders, and why you might want to do that.

C(lassic) async APIs

In a past life I did a lot of Win32 programming. Win32 has several async models to choose from, but the simplest is the good ol’ callback. Async IO APIs like ReadFileEx are shaped like this:

/// Old-style async C API with a callback
/// (like Win32's ReadFileEx)

struct overlapped {
  // ...OS internals here...
};

using overlapped_callback =
  void(int status, int bytes, overlapped* user);

int read_file(FILE*, char* buffer, int bytes,
              overlapped* user, overlapped_callback* cb);

The protocol is pretty simple: you call read_file passing in the usual arguments plus two extra: a pointer to an “overlapped” structure and a callback. The OS will use the overlapped struct for its own purposes, but the user can stuff data there too that the callback can later use. It looks like this:

struct my_overlapped : overlapped {
  // ... my extra data goes here ...
};

void my_callback(int status, int bytes, overlapped* data) {
  auto* my_data = static_cast<my_overlapped*>(data);

  // ...use the extra stuff we put in the `my_overlapped`
  //   object...

  delete my_data; // clean up
}

void enqueue_read(FILE* pfile, char* buff, int bytes) {
  // Allocate and initialize my_data...
  auto* my_data =
    new my_overlapped{{}, /* my data goes here */ };

  int status =
    read_file(pfile, buff, bytes, my_data, my_callback);
  // ...
}

What happens is this: read_file causes the OS to enqueue an IO operation, saving the overlapped and overlapped_callback pointers with it. When the IO completes, the OS invokes the callback, passing in the pointer to the overlapped struct. I’ve written code like this hundreds of times. You probably have too.

It’s simple. It works. Why make this more complicated, right?

C(onfusion of) async APIs

There’s nothing wrong with the callback API. What’s wrong is that every library that exposes asynchrony uses a slightly different callback API. If you want to chain two async operations from two different libraries, you’re going to need to write a bunch of glue code to map this async abstraction to that async abstraction. It’s the Tower of Babel problem.

Pieter Brueghel the Elder, Public domain, via Wikimedia Commons

“Look, they are one people, and they have all one language, and this is only the beginning of what they will do; nothing that they propose to do will now be impossible for them. Come, let us go down and confuse their language there, so that they will not understand one another’s speech.” — Genesis 11:6–7

So, when there are too many incompatible ways to do a thing, what do we do? We make another of course.

xkcd comic about how standards proliferate https://xkcd.com/927/

The way to escape this trap is for the C++ standard to endorse one async abstraction. Then all the libraries that expose asynchrony can map their abstractions to the standard one, and we’ll all be able to talk to each other. Babel solved.

So that is why the C++ Standardization Committee is interested in this problem. Practically speaking, it’s a problem that can only be solved by the standard.

C(omposable) async APIs

Which brings us to senders, the topic of P2300. There’s a lot I could say about them (they’re efficient! they’re structured! they compose!), but instead I’m going to show some code and let the code do the talking.

If we look at the read_file API above, we can identify some different parts:

  1. The allocation of any resources the async operation needs,
  2. The data that must live at a stable address for the duration of the operation (i.e., the overlapped structure),
  3. The initiation of the async operation that enqueues the async IO, and the handling of any initiation failure,
  4. The user-provided continuation that is executed after the async operation completes (i.e., the callback), and
  5. The reclamation of any resources allocated in step 1.

Senders have all these pieces too, but in a uniform shape that makes it possible to work with them generically. In fact, the only meaningful difference between senders and the C-style API is that instead of one callback, in senders there are three: one each for success, failure, and cancellation.

Step 1: The Allocation

Our re-imagined read_file API will look like this:

read_file_sender
async_read_file(FILE* file, char* buffer, int size)
{
  return {file, buffer, size};
}

The only job of this function is to put the arguments into a sender-shaped object, which looks as follows (explanation after the break)[*]:

[*]: Or rather, it will look like this after P2855 is accepted.

namespace stdex = std::execution;

struct read_file_sender
{
  using sender_concept = stdex::sender_t;             // (1)

  using completion_signatures =                       // (2)
    stdex::completion_signatures<
      stdex::set_value_t( int, char* ),
      stdex::set_error_t( int ) >;

  auto connect( stdex::receiver auto rcvr )           // (3)
  {
    return read_file_operation{{}, {}, pfile, buffer,
                               size, std::move(rcvr)};
  }

  FILE* pfile;
  char* buffer;
  int size;
};

The job of a sender is to describe the asynchronous operation. (It is also a factory for the operation state, but that’s step 2.) On the line marked “(1)”, we declare this type to be a sender. On the line marked “(2)”, we declare the ways in which this asynchronous operation can complete. We do this using a list of function types. This:

stdex::set_value_t( int, char* )

… declares that this async operation may complete successfully by passing an int and a char* to the value callback. (Remember, there are three callbacks.) And this:

stdex::set_error_t( int )

… declares that this async operation may complete in error by passing an int to the error callback. (If this async operation were cancelable, it would declare that with stdex::set_stopped_t().)

Step 2: The Data

On the line marked “(3)” above, the connect member function accepts a “receiver” and returns an “operation state”. A receiver is an amalgamation of three callbacks: value, error, and stopped (canceled, more or less). The result of connecting a sender and a receiver is an operation state. The operation state, like the overlapped struct in the C API, is the data for the async operation. It must live at a stable address for the duration of the operation.

The connect function returns a read_file_operation object. The caller of connect assumes responsibility for ensuring that this object stays alive and doesn’t move until one of the callbacks is executed. The read_file_operation type looks like this (explanation after the break):

struct immovable {
  immovable() = default;
  immovable(immovable&&) = delete;
};

template <class Receiver>
struct read_file_operation : overlapped, immovable // (1)
{
  static void _callback(int status, int bytes,     // (2)
                        overlapped* data)
  {
    auto* op =
      static_cast<read_file_operation*>(data);     // (3)

    if (status == OK)
      stdex::set_value(std::move(op->rcvr),        // (4)
                       bytes, op->buffer);
    else
      stdex::set_error(std::move(op->rcvr),
                       status);
  }

  void start() noexcept                            // (5)
  {
    int status =
      read_file(pfile, buffer, size, this, &_callback);

    if (status != OK)
      stdex::set_error(std::move(rcvr), status);
  }

  FILE* pfile;
  char* buffer;
  int size;
  Receiver rcvr;
};

The operation state stores the arguments needed to initiate the async operation as well as the receiver (the three callbacks). Let’s break this down by line.

  • “(1)”: The operation state inherits from overlapped so we can pass a pointer to it into read_file. It also inherits from an immovable struct. Although not strictly necessary, this ensures we don’t move the operation state by accident.
  • “(2)”: We define the overlapped_callback that we will pass to read_file as a class static function.
  • “(3)”: In the callback, we down-cast the overlapped pointer back into a pointer to the read_file_operation object.
  • “(4)”: In the callback, we check the status to see if the operation completed successfully or not, and we call set_value or set_error on the receiver as appropriate.
  • “(5)”: In the start() function — which all operation states must have — we actually initiate the read operation. If the initiation fails, we pass the error to the receiver immediately since the callback will never execute.

Step 3: The Initiation

You’ll notice that when we call the sender-ified async_read_file function, we are just constructing a sender. No actual work is started. Then we call connect with a receiver and get back an operation state, but still no work has been started. We’ve just been lining up our ducks, making sure everything is in place at a stable address so we can initiate the work. Work isn’t initiated until .start() is called on the operation state. Only then do we make a call to the C-style read_file API, thereby enqueuing an IO operation.

All this hoop-jumping becomes important once we start building pipelines and task graphs of senders. Separating the launch of the work from the construction of the operation state lets us aggregate lots of operation states into one that contains all the data needed by the entire task graph, swiveling everything into place before any work gets started. That means we can launch lots of async work with complex dependencies with only a single dynamic allocation or, in some cases, no allocations at all.

Now I have to fess up. I fibbed a little when I said that the caller of connect needs to keep the operation state alive at a stable address until one of the callbacks is executed. That only becomes true once .start() has been called on it. It is perfectly acceptable to connect a sender to a receiver and then drop the operation state on the floor as long as .start() hasn’t been called yet. But with .start() you’re committed. .start() launches the rockets. There’s no calling them back.

OK, we’ve constructed the operation state, and we’ve called .start() on it. Now the proverbial ball is in the operating system’s court.

Step 4: The Continuation

The operating system does its IO magic. Time passes. When the IO operation is finished, it will invoke the _callback function with a status code, a pointer to the overlapped struct (our read_file_operation) and, if successful, the number of bytes read. The _callback passes the completion information to the receiver that was connect-ed to the sender, and the circle is complete.

But wait, what about “Step 5: Deallocation”? We never really allocated anything in the first place! The connect function returned the operation state by value. It’s up to the caller of connect, whoever that is, to keep it alive. They may do that by putting it on the heap, in which case they are responsible for cleaning it up. Or, if this async operation is part of a task graph, they may do that by aggregating the operation state into a larger one.

Step 6: Profit!

At this point you may be wondering what the point of all this is. Senders and receivers, operation states with fiddly lifetime requirements, connect, start, three different callbacks — who wants to manage all of that? The C API was way simpler. It’s true! So why am I so unreasonably excited about all of this?

The caller of async_read_file doesn’t need to care about any of that.

The end user, the caller of async_read_file, is not going to be mucking about with receivers and operation states. They are going to be awaiting senders in coroutines. Look! The following code uses a coroutine task type from the stdexec library.

exec::task< std::string > process_file( FILE* pfile )
{
  std::string str;
  str.resize( MAX_BUFFER );

  auto [bytes, buff] =
    co_await async_read_file(pfile, str.data(), str.size());

  str.resize( bytes );
  co_return str;
}

What wizardry is this? We wrote a sender, not an awaitable, right? But this is working code! See for yourself: https://godbolt.org/z/1rjsxWxh7.

This is where we get to reap the benefits of programming to a standard async model. Generic code, whether from the standard library or from third party libraries, will work with our senders. In the case above, the task<> type from the stdexec library knows how to await anything that looks like a sender. If you have a sender, you can co_await it without doing any extra work.

P2300 comes with a small collection of generic async algorithms for common async patterns — things like chaining (then), dynamically picking the next task (let_value), grouping senders into one (when_all), and blocking until a sender is complete (sync_wait). It’s a paltry set to be sure, but it will grow with future standards. And as third party libraries begin to adopt this model, more and more async code will work together. Some day very soon you’ll be able to initiate some file IO, read from the network, wait on a timer, listen for a cancellation from the user, wait for them all to complete, and then transfer execution to a thread pool to do more work — whew! — even if each sender and the thread pool came from different libraries.

Senders, FTW!

So, back to those pernicious questions folks keep asking me.

Why would I want to use it?

You want to use senders because then you can stitch your async operations together with other operations from other libraries using generic algorithms from still other libraries. And so you can co_await your async operations in coroutines without having to write an additional line of code.

Why do we need senders when C++ has coroutines?

I hope you realize by now that this isn’t an either/or. Senders are part of the coroutine story. If your library exposes asynchrony, then returning a sender is a great choice: your users can await the sender in a coroutine if they like, or they can avoid the coroutine frame allocation and use the sender with a generic algorithm like then() or when_all(). The lack of allocations makes senders an especially good choice for embedded developers.

It’s all just too complicated!

I’ll grant that implementing senders is more involved than using ordinary C-style callbacks. But consuming senders is as easy as typing co_await, or as simple as passing one to an algorithm like sync_wait(). Opting in to senders is opting into an ecosystem of reusable code that will grow over time.

THAT is why I’m excited about senders.

(And be honest, was wrapping read_file in a sender really all that hard, after all?)


15 Replies to “What are Senders Good For, Anyway?”

  1. There is a stupendously important part of “why do we need senders when C++ has coroutines?”. Every call chain leading to a coroutine has a top-level function that isn’t a coroutine. You have your coroutine’s asynchronous result; how do you send that result to your UI?

    With senders, the answer is the same for coroutines (or rather coroutines that yield a sender-aware task) as it is for senders. You bind a continuation to your coroutine task and launch that algorithm-transformed task on an async_scope. Like here:
    https://git.qt.io/vivoutil/libunifex-with-qt/-/blob/main/http_example_stdexec/httpdownloader.cpp?ref_type=heads#L109-113

    • It has “channels”. The set_value completion of a sender is a channel, the set_error completion is a channel. They work across execution contexts, including across threads, and carry no synchronization overhead. If you need to send a bag of data over the channel, pass a std::vector of elements that don’t share anything by value into your value completion, boom, it sends a bag of data over a channel, does it with maximal efficiency wrt. how much data needs to be transferred, and needs no synchronization.

      The framework has much more than just channels, but it does also have channels. It establishes a protocol for how to use those channels, among many other things, so it ties those channels into higher-level abstractions that are generic and composable, instead of providing only the channels and letting you figure out how to tie them together in a composable manner.

      • Is the sender interface only for a one-off value?
        I wish to deliver one value at a time. For example, how do I model mouse events in this framework?
        Also, do you know if it has FRP algorithms like filter, switchMap, zip, etc., which work on several senders?

  2. Seeing three callbacks being mentioned, it reminds me of the story of Rust’s futures. In their first iteration they included some explicit error handling. This got removed because people realized that not all async tasks might result in an error and because Result is sufficient as “return type” for cases when operations can fail. It’s a separation of concerns issue. Does the async facility need to be concerned about failure cases or might an expected be sufficient?

    • A sender can be no-fail, which is like declaring a function noexcept. That means that the receiver connected to it doesn’t need any error handling at all. That receiver wouldn’t need an error callback.

      A receiver only needs handlers for the sender’s completions.

  3. Is there support for async execution that doesn’t involve coroutines or execution threads (eg, simple callbacks) ?

    Separately, I’m seeing the benefits in composability, but there does appear to be an added cost in copying/storage of all the arguments only used by the operation’s start() implementation. Depending on what those arguments are, that cost can be significant, or add complexity to ensuring their lifetime spans the life of the operation (or at least until start() is called, which appears intentionally indeterminate).

    • Is there support for async execution that doesn’t involve coroutines or
      execution threads (eg, simple callbacks) ?

      Sure, you don’t have to use coroutines to use senders. And senders can “multi-task” on a single thread, handy for embedded systems that only have a single thread.

      There is a cost to currying the arguments into senders, it’s true. That said, compilers are surprisingly good at optimizing the overhead away.

      And unless you’re writing sender algorithms or wrapping async APIs, you’re not going to be dealing with receivers or operation states. The lifetime issues will be handled by the sender algorithms you call.

  4. It looks as though this models a one-shot operation, which completes as soon as the sender has called the receiver once.

    I’ve been working on building stream-based pub/sub with coroutines, where a sender can send any number of data values before completing — a typical example is a wrapper on a TCP socket, which sends the receiver strings of bytes as they arrive ending in an EOF or error.

    Is it possible to represent this with the std senders API?

  5. Personally, I won’t be using this API for the same reason I won’t be using coroutines. What we are getting is a high-level abstraction compromise of some problem the committee deemed is the most common. What we were supposed to get are the building blocks to make our own abstractions, like the one you have here on this page. The goal to unify the APIs so they are composable is a noble one, but you won’t reach it if nobody is using the standard abstraction.

    I have been following your work for years now and have great respect for it but I think you are trying to solve too much in one step.

  6. Thanks for this great blog post. It really helped me to grasp what a sender is.
    A few Qs:
    Can senders send more than 1 value?
    Can I model a “mouse event sender” that sends mouse x/y, button press, etc?
    Can I think of senders as RX operators? Will we have RX algos, such as zip, switch_map, take_*, etc?
    Can I model fan-in/Fan-out graphs?
    Can I model a control flow loop with senders?
    Say I have an algorithm that optimizes a problem’s solution, and I wish to run it as long as it doesn’t converge (like a while loop in regular code). Can I do this with senders?

  7. Hi Eric,

    In this blog post you write that «They may do that by putting it [the operation state, ed.] on the heap», however P2300R9 §5.2 states that «An operation state is neither movable nor copyable».

    I’m a bit at a loss: how can a non movable nor copyable object returned by connect() by value be put on the heap?
