Sunday, July 28, 2019

Channels

I implemented Go-style channels and select in C++. The need doesn't arise often, but when it does, it's frustrating not to be able to multiplex operations easily.

But Boost already implements channels! And why not just use asio? And if you care so much about multiplexing operations on logical threads of execution, why not just use Go instead of C++? What, you're going to use kernel threads for this? Won't that use too many resources? Aren't you concerned about the cost of context switching? What about the C10k problem? Besides, you probably don't even need channels. You should just do things another way. Why are you reinventing the wheel? Don't you know anything?

Okay, okay. Your concerns are valid, but things are going to be fine. Computers are fun.

Motivation

Back when I was writing a C++ wrapper library for POSIX message queues, I was frustrated by how difficult it was to portably consume a message queue while also being able to stop on demand. The simplest consumer I could imagine, "consume messages from this queue until I tell you to stop," requires the use of UNIX signals in general, since POSIX message queues are not necessarily files, and so cannot portably be used in IO multiplexing facilities like poll. Sure, you could send a special message to the queue, "okay, stop now," but that works only if you are the only consumer. You wouldn't want your "stop" message to go to some other consumer.

Fortunately, on Linux it is the case that POSIX message queues are files, and so I can use poll to block on the condition that either a message arrives on the queue or somebody pokes me to tell me to stop. I could make a pipe on which the consumer would "poll" for reads, so that when I wanted to tell the consumer "stop," I'd just write to the pipe. The consumer would then handle that event by ceasing its queue consuming activities.
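A minimal sketch of that consumer-side wait, assuming C++11 and a Linux-like system where the queue descriptor can be handed to poll. The names waitForMessageOrStop, requestStop, and Wakeup are made up for illustration and are not part of the library:

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <unistd.h>

// Hypothetical result type: which descriptor woke the consumer.
enum class Wakeup { Message, Stop, Error };

// Block until `queueFd` or `stopFd` is readable. On Linux `queueFd` could be
// an `mqd_t`; that is an assumption that does not hold on every POSIX system.
inline Wakeup waitForMessageOrStop(int queueFd, int stopFd) {
    assert(queueFd >= 0 && stopFd >= 0);
    pollfd fds[2] = {};
    fds[0].fd = queueFd;
    fds[0].events = POLLIN;
    fds[1].fd = stopFd;
    fds[1].events = POLLIN;

    int rc;
    do {
        rc = ::poll(fds, 2, -1);  // -1: wait without a timeout
    } while (rc < 0 && errno == EINTR);
    if (rc < 0) return Wakeup::Error;

    if (fds[1].revents & POLLIN) return Wakeup::Stop;  // "stop" wins ties
    if (fds[0].revents & POLLIN) return Wakeup::Message;
    return Wakeup::Error;  // POLLERR, POLLHUP, or POLLNVAL
}

// Poke the consumer by writing one byte to the write end of the stop pipe.
// The value of the byte is irrelevant.
inline bool requestStop(int stopWriteFd) {
    const char byte = 0;
    return ::write(stopWriteFd, &byte, 1) == 1;
}
```

The consumer would call waitForMessageOrStop in a loop, receiving from the queue on Wakeup::Message and leaving the loop on Wakeup::Stop.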

What do I write to the pipe? Anything really. What if I wanted to communicate more than just "stop," though? Maybe there are other commands I'd like to send to the queue-consuming thread. I could invent a protocol of messages to encode onto the pipe, and then the queue-consuming thread would parse them on the other end. That would be silly, though, since the consumer is in the same address space as the "stopper." Instead, it would be better to coordinate the copying/moving of a "command" object from one location to another, using the pipe only to wake a possibly sleeping thread.
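As a rough illustration of "move the object through memory, use the pipe only as a doorbell," here is a sketch assuming C++11 (std::mutex) rather than the C++98 the library targets. CommandMailbox and its members are hypothetical names, and commands are plain strings to keep it short:

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <string>
#include <unistd.h>
#include <utility>

// Hypothetical mailbox: commands travel through memory; the pipe only wakes
// a consumer that may be sleeping in poll().
class CommandMailbox {
    std::mutex mutex_;
    std::deque<std::string> commands_;
    int pipeFds_[2];  // [0] is the read end, [1] is the write end

  public:
    CommandMailbox() {
        const int rc = ::pipe(pipeFds_);
        assert(rc == 0);
        (void)rc;
    }
    ~CommandMailbox() {
        ::close(pipeFds_[0]);
        ::close(pipeFds_[1]);
    }
    CommandMailbox(const CommandMailbox&) = delete;
    CommandMailbox& operator=(const CommandMailbox&) = delete;

    // Descriptor for the consumer to pass to poll() with POLLIN.
    int wakeFd() const { return pipeFds_[0]; }

    // Called by any producer thread. Producers take turns on the mutex.
    void post(std::string command) {
        std::lock_guard<std::mutex> lock(mutex_);
        commands_.push_back(std::move(command));
        const char wake = 1;
        const ssize_t n = ::write(pipeFds_[1], &wake, 1);  // one byte per command
        (void)n;
    }

    // Called by the consumer after poll() reports `wakeFd()` readable.
    std::string take() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(!commands_.empty());
        char wake;
        const ssize_t n = ::read(pipeFds_[0], &wake, 1);  // consume the doorbell
        (void)n;
        std::string command = std::move(commands_.front());
        commands_.pop_front();
        return command;
    }
};
```

Writing to the pipe while holding the mutex keeps the byte count and the queue length in step, at the cost that a full pipe would stall the producer; a real implementation would have to decide how to handle that.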

Now what if I had more than one thread that wanted to send a command to the consumer? Well, they would contend for some mutex and thus each would have to wait its turn.

I could even add a more contrived requirement that a thread be able to send such a command to one of multiple consumers, whichever is available first. Regardless, the abstraction that is coming into focus from this combination of polling files and copying objects is a channel. Let the mutex, the pipe, and poll all be implementation details of a mechanism for exchanging a value with another thread. Further, I want to be able to perform one out of possibly many exchanges, with only one occurring between any two threads at a given time.

select(...)

In Go, the facility for doing exactly one send/receive operation from a set of possible channels is called select. I like that name, so let's use it.

The thing is, we're not concerned solely with sending and receiving on channels. In the motivating example, above, one of the operations is to receive from a POSIX message queue. Or, possibly I want to read/write on a FIFO, or wait for some timeout to expire, or accept on a socket. We need a more general notion of select than Go provides.

Also, as a library writer in C++, I can't change the language itself. What should the C++ analog of Go's select statement look like? My favorite idea, from this project, is to use a switch statement:

switch (select(file.read(&buffer), chan1.send(thing), chan2.recv(&message))) {
    case 0:  // successfully read data from `file`
    case 1:  // successfully sent `thing` to `chan1`
    case 2:  // successfully received `message` from `chan2`
    default: // an error occurred
}

For the naughty-minded among you: no, you can't use preprocessor macros to make something more like Go's select statement. Not without lambda expressions and additional overhead, anyway.

Concurrent ML

Go is not the only language with channels. It is likely the most popular, and the reason why so many other languages are now adding similar facilities of their own (e.g. Clojure).

I enjoy Scheme. One of its variants with which I have the most experience, Racket, has a select-like facility, called sync, that works with all kinds of things, not just channels. The "things" it works with are deemed "events," and evidently there's a whole calculus, called "Concurrent ML," for composing events and synchronizing threads of execution with them (see this, this, and this).

I did not implement Concurrent ML in C++. It's a little beyond my grasp. What I did take from Concurrent ML, though, is the idea that my synchronization primitive, select, will operate on events, not on channels.

Events

What is an event? To me, it's a state machine. Under the hood, a thread will be blocking in a call to poll, but the events determine which files will be monitored by poll.

Let an IoEvent be a notion of the sort of thing poll can monitor (like the pollfd structure, but including timeouts and not depending on any system headers), together with a special value indicating "I'm done." Then I call an event any object that supports the following three operations:

That's it! Then a sketch of how select works is straightforward:

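def select(events):
    # Ask each event what poll should watch on its behalf.
    ioEvents = [event.file() for event in events]
    while True:
        poll(ioEvents)  # blocks until at least one ioEvent is ready
        for i, (ioEvent, event) in enumerate(zip(ioEvents, events)):
            if ioEvent.ready:
                # The event either finishes or asks to wait on something new.
                ioEvent = ioEvents[i] = event.fulfill(ioEvent)
                if ioEvent.fulfilled:
                    # Exactly one winner: cancel everybody else.
                    for loserIoEvent, loser in zip(ioEvents, events):
                        if loser is not event:
                            loser.cancel(loserIoEvent)
                    return event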
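The same loop can be written in C++. This is only a sketch under several assumptions: the three operations are taken to be the file, fulfill, and cancel calls used in the pseudocode, they are expressed as virtual functions for brevity, IoEvent is reduced to a descriptor plus a flag (no timeouts), and the function is named selectIndex to stay clear of POSIX select. ReadByte is a made-up example event. Handling of POLLHUP and POLLERR is omitted:

```cpp
#include <cassert>
#include <cstddef>
#include <poll.h>
#include <unistd.h>
#include <vector>

// Hypothetical stand-in for IoEvent: what to poll for, or "I'm done."
struct IoEvent {
    int fd;          // descriptor to monitor
    short events;    // POLLIN or POLLOUT
    bool fulfilled;  // the special "I'm done" value
};

// The three operations, written as an abstract class for illustration.
struct Event {
    virtual ~Event() {}
    virtual IoEvent file() = 0;                  // what should poll() watch?
    virtual IoEvent fulfill(IoEvent ready) = 0;  // it fired: finish or keep waiting
    virtual void cancel(IoEvent current) = 0;    // some other event won
};

// Returns the index of the fulfilled event, or -1 if poll() fails.
inline int selectIndex(const std::vector<Event*>& events) {
    assert(!events.empty());
    std::vector<IoEvent> ioEvents;
    for (Event* event : events) ioEvents.push_back(event->file());

    for (;;) {
        std::vector<pollfd> fds(ioEvents.size());
        for (std::size_t i = 0; i < fds.size(); ++i) {
            fds[i].fd = ioEvents[i].fd;
            fds[i].events = ioEvents[i].events;
            fds[i].revents = 0;
        }
        if (::poll(fds.data(), fds.size(), -1) < 0) return -1;

        for (std::size_t i = 0; i < fds.size(); ++i) {
            if (!(fds[i].revents & ioEvents[i].events)) continue;  // not ready
            ioEvents[i] = events[i]->fulfill(ioEvents[i]);
            if (!ioEvents[i].fulfilled) continue;  // wants to wait some more
            for (std::size_t j = 0; j < events.size(); ++j) {
                if (j != i) events[j]->cancel(ioEvents[j]);
            }
            return static_cast<int>(i);
        }
    }
}

// Example event: read one byte from a descriptor.
struct ReadByte : Event {
    int fd_;
    char* out_;
    bool cancelled_;
    ReadByte(int fd, char* out) : fd_(fd), out_(out), cancelled_(false) {}
    IoEvent file() override { return IoEvent{fd_, POLLIN, false}; }
    IoEvent fulfill(IoEvent ready) override {
        ready.fulfilled = (::read(fd_, out_, 1) == 1);
        return ready;
    }
    void cancel(IoEvent) override { cancelled_ = true; }
};
```

Returning an index rather than the event itself is what lets the result feed a switch statement.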

The trick, then, is to express sending to or receiving from a channel as one such event.

The Original Channel Design

I don't know how to implement channel send events and receive events using the framework described above. I thought that I did, but there's an essential piece missing that, I think, makes selecting on channel events impossible.

Here was my original design. A channel contains a mutex, a queue of senders, and a queue of receivers. Each sender or receiver has two pipes: one for communicating with that sender/receiver, and another for responding to whoever was writing to the sender/receiver.

The event member functions for a sender or receiver could then look like this:

I thought that this was a good protocol, and it mostly works. The fatal flaw takes the form of a deadlock.

Suppose you have two threads, thread1 and thread2, selecting on two channels, chan1 and chan2. The following situation can produce a deadlock some minority of the time.

On thread1:

switch (select(chan1.send(value), chan2.recv(&destination))) {
    // ...
}

On thread2:

switch (select(chan2.send(value), chan1.recv(&destination))) {
    // ...
}

That is, thread1 is sending to chan1 and receiving from chan2, while thread2 is doing the opposite: sending to chan2 and receiving from chan1.

What causes the deadlock is that blocking read in IoEvent fulfill(IoEvent). Here's one possible interleaving that causes a deadlock.

thread1                     thread2
sit in chan1                sit in chan2
say HI on chan2             say HI on chan1
got HI on chan1             got HI on chan2
block for reply on chan1    block for reply on chan2

For comparison, what happens more often is the following:

thread1 thread2
sit in chan1 sit in chan2
say HI on chan2 say HI on chan1
got HI on chan1
block for reply on chan1 got READY on chan1
transfer value over chan1
say DONE on chan1
got DONE on chan1

Here there's no deadlock; instead, chan1 "won the race." How can I avoid the deadlocking case?

No amount of protocol tweaking is enough to fix this problem. In order to have the "exactly one event is fulfilled" guarantee, a send/receive event must perform a blocking read at some point, and doing so could cause a deadlock when select involves more than one channel.

The New Channel Design

Deadlocked and forlorn, I looked to Go's implementation of select for inspiration. This description of Go channels, by Dmitry Vyukov, was especially helpful. In particular, he notes the following (emphasis mine):

There is another tricky aspect. We add select as waiter to several channels, but we do not want several sync channel operations to complete communication with the select (for sync channels unblocking completes successful communication). In order to prevent this, select-related entries in waiter queues contain a pointer to a select-global state word. Before unblocking such waiters other goroutines try to CAS(statep, nil, sg), which gives them the right to unblock/communicate with the waiter. If the CAS fails, goroutines ignore the waiter (it's being signaled by somebody else).

That's what I was missing! In my original design, a thread interacting with another thread over a channel had no notion of the other events happening in either thread's select call. A thread must bring along with it a piece of (as Dmitry put it) "select-global state," effectively allowing different events in the same call to select to interact with each other.
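To make the quoted mechanism concrete, here is a small sketch of a "select-global state word" claimed with compare-and-swap, assuming C++11's std::atomic. SelectState and tryClaim are hypothetical names, and an int key stands in for Go's pointer. It illustrates the idea from the quote, not this library's design:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical select-global state word: 0 means "nobody has fulfilled this
// select yet"; any other value is the key of the winning event.
struct SelectState {
    std::atomic<int> winner{0};
};

// Try to claim the right to complete `eventKey` (must be nonzero) on the
// waiter's behalf. Only the first caller succeeds; later callers should skip
// this waiter and try someone else.
inline bool tryClaim(SelectState& state, int eventKey) {
    assert(eventKey != 0);
    int expected = 0;
    return state.winner.compare_exchange_strong(expected, eventKey);
}
```

Every event registered by one select call would share the same SelectState, so two channels racing to complete that select cannot both succeed.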

While it's encouraging that there is a way to overcome the deadlock described above, doing so spoils the simplicity of the original select implementation.

On the other hand, it simplifies the protocol described in the previous section (HI, READY, DONE, etc.) since now a mutex will be used for coordinating one side of the communication between two threads, rather than an additional pipe.

EventContext

Associated with each call to select will be an instance of the following struct:

// `SelectorFulfillment` is a means by which an event in one `select`
// invocation can check or set the fulfillment of an event in a different
// `select` invocation.
struct SelectorFulfillment {
    // Note that, by convention, `&mutex` (the address of the `mutex`) will be
    // used to determine the locking order among two or more
    // `SelectorFulfillment::mutex`.
    Mutex mutex;

    enum State {
        FULFILLABLE,   // not fulfilled, and fulfillment is allowed
        FULFILLED,     // has already been fulfilled
        UNFULFILLABLE  // not fulfilled, but fulfillment is not allowed
    };

    State state;

    // key of the fulfilled event; valid only if `state == FULFILLED`
    EventKey fulfilledEventKey;
};

Channel send/receive events are then each given an EventContext by select, where the EventContext contains the EventKey of that event, and a smart pointer to the select's SelectorFulfillment. EventContext looks like this:

struct EventContext {
    SharedPtr<SelectorFulfillment> fulfillment;
    // key of the event to which this `EventContext` was originally given
    EventKey eventKey;
};

An event can be given its EventContext as an argument to the one call to IoEvent file(), so now the event concept looks like this:

Non-channel events, such as file reads/writes, can simply ignore the additional const EventContext& argument.

Now, to make this new scheme work, there are three things that need to happen.

  1. select keeps its SelectorFulfillment::mutex locked at all times except when it's blocked by ::poll. Effectively, we're implementing a condition variable, but one that plays nice with file IO multiplexing.
    fulfillment.mutex.unlock();
    const int rc = ::poll(/*...*/);
    fulfillment.mutex.lock();
    
  2. When a channel send/receive event wants to "visit" another thread, it does so by locking the other thread's SelectorFulfillment. Naively, this can cause another deadlock, where now instead of blocking each other on reading a pipe, threads could block each other acquiring a lock on each other's mutexes. The trick to avoiding this is to always lock the mutexes in the same order. In particular, this means that if a thread's mutex comes after the mutex of the thread it is trying to visit, it must first unlock its mutex, then acquire a lock on the other mutex, and then re-lock its mutex. The initial unlocking of the thread's mutex prevents the deadlock.

    Once a visiting thread has acquired the two locks, it examines the state field of the other thread's SelectorFulfillment. If the state is FULFILLABLE, then the thread performs the transfer, marks the state FULFILLED, notes the EventKey of the other thread (so that select knows who was fulfilled when that thread next awakens), and writes DONE to the other thread's pipe. If the state is not FULFILLABLE, then unlock that thread's mutex and try somebody else.

  3. select checks its SelectorFulfillment::state after each call to poll, and after each call to any event's file or fulfill member function. It could be that during one of those calls, the event fulfilled an event on another thread, or it could be that the event momentarily relinquished the lock on its mutex and was fulfilled by another thread. Either way, select's work is done. It can see which event was fulfilled by reading the SelectorFulfillment::fulfilledEventKey field, and proceed with cleanup.
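The lock ordering in step 2 can be sketched as follows. This assumes C++11, with std::mutex and int in place of the Mutex and EventKey types, and the names Fulfillment, lockOther, and visit are made up. The value transfer and the write of DONE to the pipe are elided. Two details are assumptions on top of the text: the visitor re-checks its own state after retaking its mutex, and it marks its own side FULFILLED too:

```cpp
#include <cassert>
#include <functional>
#include <mutex>

// Simplified stand-in for SelectorFulfillment.
struct Fulfillment {
    std::mutex mutex;
    enum State { FULFILLABLE, FULFILLED, UNFULFILLABLE };
    State state = FULFILLABLE;
    int fulfilledEventKey = 0;
};

// Precondition: the calling thread holds `mine.mutex`.
// Postcondition: it holds both mutexes, acquired in address order.
inline void lockOther(Fulfillment& mine, Fulfillment& other) {
    assert(&mine != &other);
    if (std::less<std::mutex*>()(&mine.mutex, &other.mutex)) {
        other.mutex.lock();   // ours sorts first: already in order
    } else {
        mine.mutex.unlock();  // ours sorts second: back off, then retake
        other.mutex.lock();
        mine.mutex.lock();
    }
}

// Visit another select. Returns true if `otherKey` was fulfilled there.
// On return the caller still holds `mine.mutex` and has released `other.mutex`.
inline bool visit(Fulfillment& mine, int myKey,
                  Fulfillment& other, int otherKey) {
    lockOther(mine, other);
    // While our mutex was released, another thread may have fulfilled us.
    const bool canFulfill = mine.state == Fulfillment::FULFILLABLE &&
                            other.state == Fulfillment::FULFILLABLE;
    if (canFulfill) {
        other.state = Fulfillment::FULFILLED;
        other.fulfilledEventKey = otherKey;
        mine.state = Fulfillment::FULFILLED;
        mine.fulfilledEventKey = myKey;
    }
    other.mutex.unlock();
    return canFulfill;
}
```

std::less is used for the comparison because it gives a total order over pointers, which a plain < on unrelated addresses does not guarantee.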

Once I implemented these changes, the deadlock described in the previous section went away.

selectOnDestroy

For any of you still reading this (good job!), there were other morsels of C++ design that I encountered while working on this project.

For example, I want a channel's send and recv member functions to return an event object suitable for use as an argument to select:

switch (select(chan1.send(something), chan2.recv(&somethingElse))) {
    case 0: // ...
    case 1: // ...
    default: // ...
}

That's fine, but what if I want to perform a channel operation on its own, e.g.

chan1.send(something);

or

std::string message;
chan2.recv(&message);

How do I make sure that such calls actually do something? One option is to have separate member functions instead:

chan1.doSend(something);

std::string message;
chan2.doRecv(&message);

That looks terrible.

At least with recv we could overload the member function to have a no-argument version that just returns the received value:

std::string message = chan2.recv();

This wouldn't work for send, though.

The equivalent code using the existing send and recv would be:

select(chan1.send(something));

std::string message;
select(chan2.recv(&message));

That also looks terrible.

If only send/receive events could somehow know whether they were part of a select invocation. If they could, then they could have the policy "if my destructor is being called and I was never copied into a select call, then call select with myself as the argument."

This way, code like this would still work:

select(chan1.send(something));  // Used in `select`; don't block in destructor

but so would this:

chan1.send(something);  // Not used in `select`; call `select` in destructor

For those of you currently thinking "that is a terrible idea," I agree with you. Returning an object whose destructor then performs an operation is not the same thing as performing an operation before returning.

Also, aren't we supposed to avoid blocking in destructors? I mean, look at what std::thread does. What about stack unwinding? Fortunately, a destructor can detect whether there are currently any exceptions in flight. It wouldn't surprise me if use of that function is frowned upon, though.

Terrible idea or not, at least for the intended use case, the "history-aware destructor" gets the job done. My main concern would be that returned temporaries are not destroyed until the "end of the full statement" in which they were created, which would mean that if you create multiple send/receive events as part of one complicated expression, the actual sends and receives will all happen "at the semicolon," rather than at their call sites. I just don't see this being a problem, though, because there are only two reasons why a send or recv would be part of a larger statement:

  1. You're using them in select. Fine, that's their intended use.
  2. You're using their return values as input to some expression other than a call to select. Like what? The overloads in question don't return meaningful values, so in what situation would you compose them into a non-select expression?

So, the "history-aware destructor" solution is viable. How do we implement it?

Let's ignore C++11's move semantics for now, and restrict ourselves to copies. The signature of the copy constructor looks like this:

Object(const Object& other);

const Object, so we can't modify the other object. Then how are we supposed to mark it as "don't block in your destructor"? We'll have to use mutable:

class Object {
    mutable bool selectOnDestroy = true;

  public:
    Object(const Object& other)
    : selectOnDestroy(other.selectOnDestroy) {
        other.selectOnDestroy = false;
    }

    ~Object() {
        if (selectOnDestroy && !std::uncaught_exceptions()) {
            select(*this);
        }
    }

    // ...
};

This breaks the idea of what it means to copy something. Better would be to make Object a move-only type, and modify other.selectOnDestroy in the move constructor. However, I want my library to support C++98, and so I'd need this hack anyway.

Now, how does an Object detect that it is being used in a call to select? We could set selectOnDestroy = false in the file member function, but it's possible that file will never get called if another event's file causes the select to be fulfilled. What's needed is an additional member function in the event concept:

void touch() noexcept;

touch is guaranteed to be called exactly once on each event before file is called on anybody. This way, each event gets an opportunity to mark itself selectOnDestroy = false:

void Object::touch() noexcept {
    selectOnDestroy = false;
}

With these changes, we support both usage styles for send and recv:

// block until we can send
chan1.send(something);

std::string message;
// block until we can receive
chan2.recv(&message);

// block until we can either send or receive, but not both
switch (select(chan1.send(somethingElse), chan2.recv(&message))) {
    case 0: // ...
    case 1: // ...
    default: // ...
}
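For comparison, the move-only variant mentioned earlier might look like the sketch below if C++11 were allowed. SendEvent and selectCalls are hypothetical: the destructor only counts where the real library would block in select, and the in-flight exception check is left out to keep the example short:

```cpp
#include <cassert>
#include <utility>

// Stand-in for the library's select: it only counts invocations.
inline int& selectCalls() {
    static int calls = 0;
    return calls;
}

class SendEvent {
    bool selectOnDestroy_ = true;

  public:
    SendEvent() = default;
    SendEvent(const SendEvent&) = delete;
    SendEvent& operator=(const SendEvent&) = delete;

    // Moving transfers the obligation; the moved-from object is disarmed.
    // No `mutable` is needed because `other` is not const here.
    SendEvent(SendEvent&& other) noexcept
    : selectOnDestroy_(other.selectOnDestroy_) {
        assert(this != &other);
        other.selectOnDestroy_ = false;
    }

    // Called by select on every event before any event's file().
    void touch() noexcept { selectOnDestroy_ = false; }

    ~SendEvent() {
        if (selectOnDestroy_) {
            ++selectCalls();  // the real library would call select(*this) here
        }
    }
};
```

A lone temporary triggers one select in its destructor, a moved-from object triggers none, and an event that select has touched triggers none.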

Error Handling

I haven't mentioned how error handling works in this channels library. Does select throw exceptions? Does it return special values indicating errors? How does a client of select know when an error occurs, and which kind?

My first idea was just to have select throw an exception when an error occurs. The trouble with this is that then if a client wants to handle the error immediately, they have to indent the entire select/switch construct in a try block:

try {
    switch (select(...)) {
        case 0: // ...
        case 1: // ...
    }
}
catch (...) {
    // ...
}

This wouldn't bother me if it weren't for the fact that one of the strengths of the select/switch combination is that the "handler" for each case is right there in the switch statement. Indenting the switch in order to catch exceptions means indenting all of the "handler" code as well.

This problem goes away if the client allows the exception to propagate out of the scope in which select was called, which is probably the common case, and the benefit of exceptions generally. However, I still consider the try block too high a price to pay.

As an alternative, select can return negative values for errors, and associated with each error code there can be a descriptive (though non-specific) error message. For example:

switch (const int rc = select(...)) {
    case 0: // ...
    case 1: // ...
    case 2: // ...
    default:
        std::cerr << "error in select(): " << errorMessage(rc) << "\n";
}

That looks okay. But what if the client wants an exception to be thrown? For that, we can replace the errorMessage(int) function with a SelectError(int) constructor:

switch (const int rc = select(...)) {
    case 0: // ...
    case 1: // ...
    case 2: // ...
    default:
        throw SelectError(rc);
}

This way, the extra code needed to use exceptions is just one statement.
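One way the two pieces could fit together is sketched here. The error codes and their messages are invented for the example, since the library's actual codes are not listed in this post:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical error codes, all negative so they never collide with the
// index of an event.
enum SelectErrorCode { POLL_FAILED = -1, TRANSFER_FAILED = -2 };

// Descriptive, though non-specific, message for an error code.
inline const char* errorMessage(int rc) {
    assert(rc < 0);
    switch (rc) {
        case POLL_FAILED:     return "poll failed";
        case TRANSFER_FAILED: return "copying a value across a channel failed";
        default:              return "unknown error";
    }
}

// Exception constructed from a return code, for clients who prefer to throw.
class SelectError : public std::runtime_error {
    int code_;

  public:
    explicit SelectError(int rc)
    : std::runtime_error(errorMessage(rc)), code_(rc) {}

    int code() const { return code_; }
};
```

Deriving from std::runtime_error means what() returns the same text as errorMessage, so callers that log and callers that throw see identical wording.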

So far so good, but there is still something missing. My original idea of using exceptions throughout had the added benefit that the thrower of the exception can include runtime-specific information in the exception. For example, if copying/moving a value across a channel throws an exception, that exact exception could be propagated to the caller of select. Or, if the error that occurred was at the system level, such as in the pthreads library, then the relevant errno value could be included in the thrown exception. This is not possible if all you have to work with is the category of error (one of the negative return values of select).

Is there a way to combine the "throw an exception only if you want" behavior above with the "preserve information known only at the site of the error" property of using exceptions throughout?

The only way I thought to reconcile them is by using a thread-local exception object. When an error occurs within a call to select, an exception is thrown, but then rather than letting the exception escape, select instead catches it and copies it to thread-local storage. This way, clients of select can do the following:

switch (select(...)) {
    case 0: // ...
    case 1: // ...
    case 2: // ...
    default:
        throw lastError();
}

Maybe you don't like the idea of using thread-local storage. It feels like a global variable. It feels like a hack. It feels dirty.

Hey, it works.
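A sketch of the thread-local approach, assuming C++11's thread_local and std::exception_ptr (a C++98 build would need pthread keys or similar). The names lastErrorSlot, guardedSelect, and rethrowLastError are hypothetical, and rethrowing replaces the throw lastError() spelling so that the saved exception keeps its original type:

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

// One slot per thread; std::exception_ptr preserves the exception's type.
inline std::exception_ptr& lastErrorSlot() {
    thread_local std::exception_ptr slot;
    return slot;
}

// Hypothetical wrapper: run `operation` and return its result, or save
// whatever it threw and return -1 instead of letting the exception escape.
template <typename Operation>
int guardedSelect(Operation operation) {
    try {
        return operation();
    } catch (...) {
        lastErrorSlot() = std::current_exception();
        return -1;
    }
}

// Rethrow the saved exception. Only valid after a failed call on this thread.
[[noreturn]] inline void rethrowLastError() {
    assert(lastErrorSlot() != nullptr);
    std::rethrow_exception(lastErrorSlot());
}
```

In the default branch of the switch, a client would then call rethrowLastError() where the example above writes throw lastError().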

There's one more alternative that I considered. Instead of returning an integer, what if select returned some object implicitly convertible to an integer, but that also contained error information?

switch (Selection rc = select(...)) {
    case 0: // ...
    case 1: // ...
    case 2: // ...
    default:
        throw rc.exception();
}

Now there's no need for thread-local storage, because the exception object that clients might want to throw can be stored in the Selection object returned by select. To be honest, I still prefer the thread-local version, but I might implement this variant as well, for naysayers.
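A possible shape for such a return type, again assuming C++11. This Selection is a guess at the idea rather than the library's class, and it offers rethrow() where the example above writes throw rc.exception():

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

// Hypothetical result of select: usable in a switch, but it also carries the
// exception when something went wrong.
class Selection {
    int index_;
    std::exception_ptr error_;

  public:
    explicit Selection(int index) : index_(index) {}
    explicit Selection(std::exception_ptr error) : index_(-1), error_(error) {}

    // Implicit conversion is what lets `switch (Selection rc = ...)` compile.
    operator int() const { return index_; }

    // Rethrow the stored exception. Only valid when the value is negative.
    [[noreturn]] void rethrow() const {
        assert(error_ != nullptr);
        std::rethrow_exception(error_);
    }
};
```

Because a switch condition accepts a class type with a single non-explicit conversion to an integer type, the call site keeps the same shape as the plain int version.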

Supporting C++98 Sucks Without Boost

I set out with the requirement that this channels library work with C++98 in addition to more recent versions of the language. One reason is simply the joy of what I'll call "constraint driven design." Another reason is that there are droves of programmers out there still chained to dead-end platforms and profitable balls of mud. I highly doubt that any of those programmers are about to start using my channels library in their legacy code, but they could if they wanted to.

One easy way to support C++98 without losing your mind is to use Boost, the grandfather of all C++ libraries. Boost is both at the cutting edge of what can be done with the language, and provides portable C++98 versions of various now-standard facilities.

Boost is also big. That's not a viable excuse for my not using it, but requiring clients of my library to have boost installed does contradict the goal of providing a minimal, portable (except for Windows), self-contained library.

An alternative to boost that I considered is BDE, Bloomberg's C++ library. It's about half the size of boost, and certainly implements all of the facilities I'd need for the channels library, but BDE is not nearly as widely used as boost, uses its own version of parts of the standard library, and does not seem to be maintained.

Without boost or something like it, I'm on my own to use POSIX for whatever I need. At first I thought that this wouldn't be a big deal, but it ended up consuming most of my development time.

Since you asked, here is the list of could-have-just-used-C++11 features that I ended up implementing:

I could have avoided implementing those twelve components if only I had required C++11 or boost. All together my implementations amount to an additional 1173 lines of source. That sounds like a lot, but considering that it allows the library to support C++98 without depending on a large external library, I think that it's justified.

More

That's enough of that. If your curiosity is piqued, then you can get started playing with C++ channels and see how you like it.