Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on. 

Embedding ZeroMQ in the libev Event Loop

In a previous article on ZeroMQ we went over how ZeroMQ is triggered when you use the socket that ZeroMQ returns, in that article there was some discussion of embedding ZeroMQ into another event loop. Let's do that.

libev is an absolutely fantastic library that helps make it easy to write evented programs. Evented programs work by getting notified that an action has happened, and acting upon it. Unlike threaded where multiple pieces of work are being executed at the same time, in an evented system you move every item that could block to an event loop, that then calls back into user code with a notification to continue. If one event uses up more than its fair share of CPU time because it is busy doing a long calculation, every single other event that is waiting will never get notified.

Now, as previously discussed ZeroMQ is edge triggered, so embedding it into an event loop that is level triggered doesn't do us much good, because we will miss certain ZeroMQ notifications.

One way to solve this problem is by looping over ZeroMQ's event system until we get back a notification that it no longer has anything else for us to process, that would look something like this1:

int zevents = 0;
size_t zevents_len = sizeof(zevents);
zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

do {
    if (zevents & ZMQ_POLLIN) {
        // We can read from the ZeroMQ socket
    } else {
        break;
    }

    // Check to see if there is more to read ...
    zmq_socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);
} while (zevents & ZMQ_POLLIN);

if (zevents & ZMQ_POLLOUT) {
    // We can write to the ZeroMQ socket
}

// If neither of the above is true, then it was a false positive

However if we are receiving information from ZeroMQ remote endpoints faster than we can process them, we end up being stuck in that do ... while loop forever. If we have other events we want to process, that isn't entirely fair since they will never ever get called again. Especially in a server application where it may be servicing thousands of clients this is simply not acceptable.

libev

libev provides various different event notifications, to be able to get around edge triggered notifications, and still provide fair round-robin for all events we are going to have to build on top of multiple different events.

The events used will be:

  • ev::io: This one is pretty self explanatory, this is for getting notified about input output changes. This is the one we are going to use on the ZMQ_FD.
  • ev::prepare and ev::check: These two are generally used together, they can be used to change the event loop and or make modifications on the fly to events that have been registered with the event loop.
  • ev::idle: This is an event that gets fired whenever the event loop has nothing else to do, so no other events fired, this will fire.

Plan of attack

Since the prepare and check run before and after the loop, we are going to be using those to do most of the work. We use an io so that we can turn off the idle when we can actually wait for a result from ZeroMQ's file descriptor, otherwise we use idle so that we will always get called once every loop.

In the prepare watcher callback we do the following:

  1. Check to see what events ZeroMQ has for us, and check what events the user has requested.
  2. If the ZeroMQ has an event for us that we want, and the user has requested that event, we start the idle watcher.
  3. If ZeroMQ has no events, we start the io watcher.

In the check watcher callback we do the following:

  1. Stop both the io and idle watchers, they were only there to make sure that our check watcher was called.
  2. See what event ZeroMQ has for us, and check that against what the user wants. Depending on the event, call user defined function write() or user defined function read().
  3. If this was a spurious wake-up on the part of ZeroMQ we simply ignore it and let libev go on to other events.

We could make all of this work by simply using the prepare, check and idle watchers, but that would mean libev would be busy-waiting for something to happen on the ZeroMQ socket. The io watcher is required simply so in times of nothing happening libev in its library can call into the kernels event handling mechanism and go to sleep. We can't use just the io watcher due to the edge-triggered notification, because we'd miss all kinds of ZeroMQ messages. So all four watchers are required, and play crucial parts in making this work.

Let's get down to code

Below you will find example code, it is not complete. Do note that I am using some C++11isms, error checking code may not be complete/correct and in general I don't suggest you copy and paste this without reading and understanding what it does.

The zmq_event class is meant to be used as a base class, inherit from it, and create the write() and read() functions. These functions will be called when you are able to read from the ZeroMQ socket, or when you are able to write to the ZeroMQ socket. You are guaranteed to be able to read one whole ZeroMQ message, so if it is a multi-part message, do make sure to loop on ZMQ_SNDMORE as required.

Upon instantiation it will automatically start being notified about events, we start off with ev::READ. When your sub-class wants to write to ZeroMQ it should put the messages to be written into a list somewhere, and set ev::READ | ev::WRITE on watcher_io, by calling watcher_io.set(socket_fd, ev::READ | ev::WRITE). write() will then be called, write a single message to ZeroMQ, and if necessary when finished writing, unset ev::WRITE using watcher_io.set(socket_fd, ev::READ). If you are not finished writing, after writing that singular message you may return and write() will be called again the next loop iteration. This way if you have a lot of data to write you don't starve the other events from receiving their notifications.

zmq_event.h

#include <string>

#include <ev++.h>
#include <zmq.hpp>

class zmq_event {
    public:
        zmq_event(zmq::context_t& context, int type, const std::string& connect);
        virtual ~zmq_event();

    protected:
        // This gets fired before the event loop, to prepare
        void before(ev::prepare& prep, int revents);

        // This is fired after the event loop, but before any other type of events
        void after(ev::check& check, int revents);

        // We need to have a no-op function available for those events that we
        // want to add to the list, but should never fire an actual event
        template <typename T>
            inline void noop(T& w, int revents) {};

        // Function we are going to call to write to the ZeroMQ socket
        virtual void write() = 0;

        // Function we are going to call to read from the ZeroMQ socket
        virtual void read() = 0;

        // Some helper function, one to start notifications
        void start_notify();

        // And one to stop notifications.
        void stop_notify();

        // Our event types
        ev::io      watcher_io;
        ev::prepare watcher_prepare;
        ev::check   watcher_check;
        ev::idle    watcher_idle;

        // Our ZeroMQ socket
        zmq::socket_t socket;
        int           socket_fd = -1;
};

zmq_event.cc

#include <stdexcept>

#include "zmq_event.h"

zmq_event::zmq_event(zmq::context_t& context, int type, const std::string& connect) : socket(context, type) {
    // Get the file descriptor for the socket
    size_t fd_len = sizeof(_socket_fd);
    socket.getsockopt(ZMQ_FD, &socket_fd, &fd_len);

    // Actually connect to the ZeroMQ endpoint, could replace this with a bind as well ...
    socket.bind(connect.c_str());

    // Set up all of our watchers

    // Have our IO watcher check for READ on the ZeroMQ socket
    watcher_io.set(socket_fd, ev::READ);

    // This watcher has a no-op callback
    watcher_io.set<zmq_event, &zmq_event::noop>(this);

    // Set up our prepare watcher to call the before() function
    watcher_prepare.set<zmq_event, &zmq_event::before>(this);

    // Set up the check watcher to call the after() function
    watcher_check.set<zmq_event, &zmq_event::after>(this);

    // Set up our idle watcher, once again a no-op
    watcher_idle.set<zmq_event, &zmq_event::noop>(this);

    // Tell libev to start notifying us!
    start_notify();
}

zmq_event::~zmq_event() {}

zmq_event::before(ev::prepare&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Get any events that may be waiting
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);

    // Lucky for us, getting the events available doesn't invalidate the
    // events, so that calling this in `before()` and in `after()` will
    // give us the same results.
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check what events exists, and check it against what event we want. We
    // "abuse" our watcher_io.events for this information.
    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        watcher_idle.start();
        return;
    }

    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        watcher_idle.start();
        return;
    }

    // No events ready to be processed, we'll just go watch some io
    watcher_io.start();
}

zmq_event::after(ev::check&, int revents) {
    if (EV_ERROR & revents) {
        throw std::runtime_error("libev error");
    }

    // Stop both the idle and the io watcher, no point in calling the no-op callback
    // One of them will be reactived by before() on the next loop
    watcher_idle.stop();
    watcher_io.stop();

    // Get the events
    uint32_t zevents = 0;
    size_t zevents_len = sizeof(zevents);
    socket.getsockopt(ZMQ_EVENTS, &zevents, &zevents_len);

    // Check the events and call the users read/write function
    if ((zevents & ZMQ_POLLIN) && (watcher_io.events & ev::READ)) {
        this->read();
    }

    if ((zevents & ZMQ_POLLOUT) && (watcher_io.events & ev::WRITE)) {
        this->write();
    }
}

zmq_event::start_notify() {
    watcher_check.start();
    watcher_prepare.start();
}

zmq_event::stop_notify() {
    watcher_check.stop();
    watcher_prepare.stop();
}

Other event loops

libev is but just one of many event loops that exist out there, hopefully this shows how it is possible to embed ZeroMQ into an event loop, thereby making it easier to embed ZeroMQ into any other event loops.


  1. This snippet was from my older article regarding ZeroMQ edge triggered notifications. I would highly suggest reading that article for more information and even more background on what is going on.