ZeroMQ: poll() and wait for child process

At work, I've been hacking on a distributed system based on ZeroMQ. It's a nice library that hides away a lot of the fuss and bother of network programming, but still exposes enough detail that you have a lot of control over what's going on. However, the main mechanism for multiplexing I/O events is zmq_poll(), which only accepts 0MQ sockets or regular file descriptors. But if you're doing network I/O while running some work in child processes, you might want to block until either a socket is ready or a child process has terminated. How to do this with zmq_poll() is not immediately apparent.

As it turns out, there are several nice ways to solve this problem, in both C and Python.

Parent and child

First, here's the setup: a parent process with one child at a time, where the child can terminate at any time. I'll show the Python version, since it's less verbose than the equivalent C:

#!/usr/bin/python

# combine 0MQ poll with child process

from __future__ import print_function

import sys
import os
import signal
import time
import random
import errno

import zmq

def main():
    # open a 0MQ socket that nobody ever connects to: the point
    # is not to do network I/O, but to poll with a child process
    # running in the background
    context = zmq.Context()
    resp = context.socket(zmq.REP)
    resp.bind("tcp://*:5433")
    poller = zmq.Poller()
    poller.register(resp)

    # start a child process that runs in the background and terminates
    # after a little while
    spawn_child()

    # main loop: this is where we would do network I/O and -- if
    # only we could figure out how -- respond to the termination of
    # the child process
    while True:
        print('parent: poll(5000)')
        poller.poll(5000)
        print('parent: poll() returned')

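def spawn_child():
    # flush pending output so the child doesn't inherit (and repeat) it
    sys.stdout.flush()
    pid = os.fork()
    if pid > 0:           # in the parent
        return

    # in the child: reseed (otherwise, at least in Python 2, every child
    # inherits the parent's generator state and picks the same interval),
    # then sleep for a random interval (2 .. 9 sec)
    random.seed()
    stime = random.randint(2, 9)
    print('child: sleep(%d)' % stime)
    time.sleep(stime)
    print('child: exiting')
    sys.stdout.flush()    # os._exit() skips the normal flush of stdio buffers
    os._exit(0)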

main()

The child process here doesn't actually do anything (like execute a command), as that would distract from the purpose of the exercise.

This version illustrates the problem but makes no attempt to solve it. If you run the program, you can clearly see when the child starts and exits. But since the child almost certainly terminates while the parent is blocked in poll(), the parent never finds out. The parent could alternate calls to poll() with os.waitpid(), but then any network activity that arrives while the parent is blocked in waitpid() won't get a prompt response. One naive temptation is to alternate poll() with a non-blocking waitpid() (os.WNOHANG) and make the poll() timeout as short as possible: the shorter the timeout, the better the response time, but the program degenerates into a CPU-bound busy loop instead of a low-overhead event-driven program. Alternating poll() with waitpid() is not the answer. There has to be a better way.

The classic: SIGCHLD with an interrupted system call

The classic Unix answer is SIGCHLD, the signal that is delivered to a parent process when one of its children terminates. You normally don't have to worry about SIGCHLD, since it's one of the few Unix signals that are ignored by default. But you're free to install a signal handler for it so you can do stuff as soon as a child terminates.

The nifty thing about signals is that they interrupt system calls, and ZeroMQ's poll() boils down to system calls. If ZeroMQ were trying to be clever (too clever by half), it might catch the resulting EINTR errors and quietly retry the poll() system call. Good news: ZeroMQ does not try to be clever. It does the right thing and exposes the interrupted system call to application code. (Pro tip: don't try to be clever. Just expose system errors to your caller with as little manipulation as possible. Your callers will thank you in the end. Thank you, ZeroMQ!)

So here's how it looks. First, the signal handler:

child_terminated = False

def handle_signal(sig, frame):
    global child_terminated
    child_terminated = True

There's very little you can do safely inside a signal handler. In C, assigning a constant to a global flag is about the only guaranteed safe action. The reason is that just about anything else you do might allocate memory with malloc(), and the signal might arrive while another malloc() call is already running. You cannot assume that the memory allocation system is re-entrant, so you must avoid anything that might call malloc(). Python softens this a little: the interpreter's low-level C handler only records that the signal arrived, and your Python handler runs later in the main thread, between bytecode instructions. Even so, it can run in the middle of almost any Python code, so keeping it down to a single flag assignment is still the safest policy.

The other thing you don't want to do in a signal handler is anything that might block, like reading from a socket or even writing to a local disk file. “Local” disk files always turn out to be on failing disks or flaky NFS servers at just the wrong time.

Next, just inside main(), we install the signal handler:

def main():
    global child_terminated   # the main loop resets the flag, so declare it global
    signal.signal(signal.SIGCHLD, handle_signal)
    [...as before...]

As a first attempt, let's modify the main loop to check that child_terminated flag when poll() returns. After all, we expect poll() to block for 5000 ms or be interrupted by SIGCHLD, so we should get a quick reaction to the child process terminating:

while True:
    print('parent: poll(5000)')
    poller.poll(5000)
    if child_terminated:
        print('child terminated')
        child_terminated = False
        spawn_child()
    #print('parent: poll() returned')

Here's what happens with this version:

parent: poll(5000)
child: sleep(2)
child: exiting
Traceback (most recent call last):
  File "pollchild.py", line 58, in <module>
    main()
  File "pollchild.py", line 51, in main
    poller.poll(5000)
  File "/usr/lib64/python2.7/site-packages/zmq/sugar/poll.py", line 97, in poll
    return zmq_poll(list(self.sockets.items()), timeout=timeout)
  File "_poll.pyx", line 116, in zmq.core._poll.zmq_poll (zmq/core/_poll.c:1598)
  File "checkrc.pxd", line 21, in zmq.core.checkrc._check_rc (zmq/core/_poll.c:1965)
zmq.error.ZMQError: Interrupted system call

Oops! The interrupted system call surfaces in our code as an exception (a ZMQError whose errno is EINTR). Looks like we need to catch that error. Here is the revised main loop:

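while True:
    print('parent: poll(5000)')
    try:
        poller.poll(5000)
    except zmq.ZMQError as err:
        # EINTR just means a signal arrived; anything else is a real error
        if err.errno != errno.EINTR:
            raise
    if child_terminated:
        print('child terminated')
        child_terminated = False
        os.waitpid(-1, 0)     # reap the exited child so it doesn't linger as a zombie
        spawn_child()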

This one works just fine:

parent: poll(5000)
child: sleep(3)
child: exiting
child terminated

There's no visible delay between "child: exiting" and "child terminated". The parent responds immediately to child termination, just as it would if any network activity arrived on the ZeroMQ socket(s) that it's polling.

You may have noticed that I took advantage of the parent's newfound knowledge to do something new: start another child process. This guarantees that there is pretty much always one child running, except during the brief interval between one exiting and the next starting. There will definitely never be more than one child, which is why we can get away with just a single child_terminated flag. Real life is never that simple, of course.
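With more than one child, a single flag cannot tell you how many children exited: standard signals such as SIGCHLD are not queued, so two children that terminate close together may produce just one SIGCHLD. A common remedy is to treat the flag as "at least one child has exited" and then reap every terminated child in a non-blocking loop. Here is a minimal sketch in the same Python 2/3-compatible style; reap_children() is a hypothetical helper, not part of the program above:

```python
import errno
import os


def reap_children():
    """Reap every child process that has already terminated, without blocking.

    Returns a list of (pid, exit_code) pairs; exit_code is None if the
    child was killed by a signal instead of exiting normally.
    """
    reaped = []
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError as err:
            if err.errno == errno.ECHILD:   # no child processes at all
                break
            raise
        if pid == 0:                        # children exist, but none has exited yet
            break
        if os.WIFEXITED(status):
            reaped.append((pid, os.WEXITSTATUS(status)))
        else:
            reaped.append((pid, None))
    return reaped
```

In the main loop you would clear child_terminated first and then call reap_children(), so that a SIGCHLD arriving while you reap simply sets the flag again for the next iteration.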

The classic, in C

If you appreciate classic Unix tricks like SIGCHLD and interrupted system calls, then surely you will appreciate seeing the same thing again in C. Here it is:

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#include <zmq.h>

// volatile sig_atomic_t is what C guarantees you can safely set in a handler
volatile sig_atomic_t child_terminated = 0;

void handle_signal(int sig) {
    child_terminated = 1;
}

void spawn_child(void) {
    fflush(stdout);     // don't let the child inherit buffered output
    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        exit(1);
    }
    if (pid > 0) {
        return;
    }

    // sleep from 2 .. 9 sec
    srandom(getpid());
    int stime = (random() % 8) + 2;
    printf("child: sleep(%d) ...\n", stime);
    sleep(stime);
    printf("child: exiting\n");
    exit(0);
}

int main(void) {
    // install SIGCHLD handler
    struct sigaction action;
    action.sa_handler = handle_signal;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;
    if (sigaction(SIGCHLD, &action, NULL) < 0) {
        perror("error installing signal handler");
        return 1;
    }

    // setup a 0MQ socket waiting for incoming TCP connections
    void *context = zmq_ctx_new();
    void *resp = zmq_socket(context, ZMQ_REP);
    int rc = zmq_bind(resp, "tcp://*:4522");
    if (rc != 0) {
        perror("zmq_bind (tcp://*:4522)");
        return 1;
    }

    // build list of things that we want to poll on
    int nitems = 1;
    zmq_pollitem_t items[] = {{resp, -1, ZMQ_POLLIN, 0}};

    spawn_child();

    while (1) {
        printf("poll() for 5 s ...\n");
        if (zmq_poll(items, nitems, 5*1000) < 0) {
            if (errno != EINTR) {
                perror("zmq_poll");
                exit(1);
            }
        }
        if (child_terminated) {
            printf("child terminated\n");
            child_terminated = 0;
            // reap the exited child so it doesn't linger as a zombie
            while (waitpid(-1, NULL, WNOHANG) > 0)
                ;
            spawn_child();
        }
    }
}

A curious phenomenon: even though Python is typically much more concise than equivalent C code, that's not the case here: 70-odd lines of C versus 60-odd lines of Python, despite the need for explicit error checking in C. I find this often happens with low-level system programming. The closer you get to the OS, the smaller the benefit of using a high-level language.

The modern twist: signalfd() (Linux only)

The classic Unix signal API is a bit awkward to use (read the sigaction(2) man page if you don't believe me). Perhaps you would rather deal with one abstraction than two. File descriptors are the more general abstraction, and they're what poll() works with; likewise, zmq_poll() works with both 0MQ sockets and file descriptors.

It turns out that recent versions of Linux (2.6.22 and up) have a non-standard system call, signalfd(), that exposes a file-like interface to signal handling. Instead of installing a signal handler with sigaction(), you block the signal with sigprocmask() and create a signal file descriptor with signalfd(). The setup is roughly as awkward as installing a signal handler, but once you have that file descriptor, things are a little neater: the blocked signal never interrupts a system call, so there's no more worrying about EINTR.

There's one new header to include:

#include <sys/signalfd.h>

The implementation of spawn_child() doesn't change, so I'll skip over that. But pretty much everything in main() changed a bit, so here's the signalfd()-based version of main():

int main(void) {
    // build the list of signals that we're interested in (just SIGCHLD)
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGCHLD);

    // block SIGCHLD from being handled in the normal way
    // (otherwise, the signalfd does not work)
    if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1) {
        perror("sigprocmask");
        return 1;
    }

    // create the file descriptor that will be readable when
    // SIGCHLD happens, i.e. when a child process terminates
    int sigfd = signalfd(-1, &mask, 0);
    if (sigfd == -1) {
        perror("signalfd");
        return 1;
    }

    // setup a 0MQ socket waiting for incoming TCP connections
    void *context = zmq_ctx_new();
    void *resp = zmq_socket(context, ZMQ_REP);
    int rc = zmq_bind(resp, "tcp://*:4522");
    if (rc != 0) {
        perror("zmq_bind (tcp://*:4522)");
        return 1;
    }

    // build list of things that we want to poll on
    int nitems = 2;
    zmq_pollitem_t items[] = {
        {resp, -1, ZMQ_POLLIN, 0},
        {NULL, sigfd, ZMQ_POLLIN, 0},
    };

    struct signalfd_siginfo siginfo;

    spawn_child();

    while (1) {
        printf("poll() for 5 s ...\n");
        rc = zmq_poll(items, nitems, 5*1000);
        if (rc < 0) {
            perror("zmq_poll");
            exit(1);
        }
        if (items[0].revents & ZMQ_POLLIN) {
            // a real server would zmq_recv() the request and send a reply;
            // until it does, the socket stays readable
            printf("0MQ messages received\n");
        }
        if (items[1].revents & ZMQ_POLLIN) {
            printf("child process terminated\n");
            // consume the pending SIGCHLD, otherwise sigfd stays readable
            ssize_t nbytes = read(sigfd, &siginfo, sizeof siginfo);
            if (nbytes != (ssize_t) sizeof siginfo) {
                perror("read(sigfd)");
                return 1;
            }
            // reap the exited child so it doesn't linger as a zombie
            while (waitpid(-1, NULL, WNOHANG) > 0)
                ;
            spawn_child();
        }
    }
}

As you can see, the initial overhead to create one file descriptor that exposes one signal is a bit more than that of installing a traditional signal handler. The 0MQ stuff is the same, except that now we're passing a list of two items to zmq_poll(), and one of them is a system file descriptor rather than a 0MQ socket. That, of course, is the key change. Finally, interpreting the outcome of zmq_poll() is totally different. Errors are errors, period: we make no exceptions for EINTR. Instead, socket activity and SIGCHLD both appear as readable things: a 0MQ socket that can recv() a message, or a file descriptor that we can read() from. (Note that it's essential to actually call read() on the signalfd() file descriptor: until you do that, the signal remains pending, the file descriptor remains readable, and zmq_poll() returns immediately.)
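The "remains pending until consumed" behaviour is not specific to signalfd(); it is how blocked signals work in general, and you can observe it with nothing but the Python 3.3+ standard library on Linux. The sketch below is a stand-in, not part of the original programs, and the function name demo_pending_sigchld() is made up for illustration. It blocks SIGCHLD with signal.pthread_sigmask(), lets a child exit, checks signal.sigpending(), and then consumes the signal with signal.sigtimedwait(), which plays roughly the role that read() plays on a signalfd descriptor. Unlike a signalfd, sigtimedwait() cannot be handed to zmq_poll(), which is exactly why signalfd() is the interesting one here.

```python
import os
import signal


def demo_pending_sigchld():
    """Show that a blocked SIGCHLD stays pending until it is consumed.

    Returns (pending_before, pending_after, child_pid, sender_pid).
    """
    # block SIGCHLD, as the signalfd() versions do before creating the fd
    old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGCHLD})
    try:
        pid = os.fork()
        if pid == 0:
            os._exit(0)             # child: exit immediately

        # wait until the child has exited WITHOUT reaping it (WNOWAIT)
        os.waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT)
        pending_before = signal.SIGCHLD in signal.sigpending()

        # consume the pending signal synchronously (like read() on a signalfd)
        info = signal.sigtimedwait({signal.SIGCHLD}, 1.0)
        sender = info.si_pid if info is not None else None
        pending_after = signal.SIGCHLD in signal.sigpending()

        os.waitpid(pid, 0)          # finally reap the zombie
        return pending_before, pending_after, pid, sender
    finally:
        # restore the caller's original signal mask
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
```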

The disadvantage of this approach is portability: it only works with Linux 2.6.22 and later. More subtly, it only works with programming languages that expose signalfd().

signalfd() in Python

Unfortunately, current versions of Python (2.7.5 and 3.3.2 as I write this) do not expose signalfd() in the standard library. Luckily, Jean-Paul Calderone has written a wrapper, which you'll find on PyPI (https://pypi.python.org/pypi/python-signalfd). The documentation is a bit lacking and the API not quite complete, but I got it to work.

I installed it with

pip install --user python-signalfd

(You'll want to leave out the --user option if you're using a virtualenv.)

As with the C version, most of the changes are in main(). There's no more signal handler and no more child_terminated global variable.

Here's the code:

import signalfd       # from python-signalfd; goes with the other imports

def main():
    # list of signals that we're interested in (just SIGCHLD)
    mask = [signal.SIGCHLD]

    # block SIGCHLD from being handled in the normal way
    # (otherwise, the signalfd does not work)
    signalfd.sigprocmask(signalfd.SIG_BLOCK, mask)

    # create the file descriptor that will be readable when
    # SIGCHLD happens, i.e. when a child process terminates
    sigfd = signalfd.signalfd(-1, mask, 0)

    # setup a 0MQ socket waiting for incoming TCP connections
    context = zmq.Context()
    resp = context.socket(zmq.REP)
    resp.bind("tcp://*:5433")

    # things we want to poll() on
    poller = zmq.Poller()
    poller.register(resp)
    poller.register(sigfd)

    spawn_child()

    while True:
        print('parent: poll(5000)')
        ready = poller.poll(5000)
        for (thing, flags) in ready:
            if not (flags & zmq.POLLIN):
                continue
            if thing is resp:
                # a real server would recv() the request and send() a reply;
                # until it does, the socket stays readable
                print('0MQ messages received')
            elif thing == sigfd:      # sigfd is a plain int: use ==, not "is"
                print('child process terminated')

                # YUCK: 128 is a magic number, sizeof(struct signalfd_siginfo).
                # The kernel pads that struct to 128 bytes on every
                # architecture, but the signalfd module doesn't export it.
                data = os.read(sigfd, 128)
                assert len(data) == 128
                os.waitpid(-1, 0)     # reap the exited child (no zombies)
                spawn_child()

That magic number, 128 for sizeof(struct signalfd_siginfo), is what's missing from the signalfd module. Someone really should submit a patch for that. Even better would be code to read() the struct and unpack it into a Python namedtuple.
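Here is a minimal sketch of that namedtuple idea. It is not part of the signalfd module: the names SignalfdSiginfo, unpack_siginfo() and read_siginfo() are hypothetical. It decodes only the leading fields of struct signalfd_siginfo as laid out in the signalfd(2) man page (twelve 32-bit fields followed by four 64-bit fields, 80 bytes in all), ignores the rest of the 128-byte record, and assumes the host's native byte order:

```python
import collections
import os
import struct

# struct signalfd_siginfo is padded to exactly 128 bytes by the kernel
SIGINFO_SIZE = 128

# Leading fields per signalfd(2): 12 x 32-bit, then 4 x 64-bit (80 bytes).
# '=' selects native byte order with standard sizes and no alignment padding.
_SIGINFO_FORMAT = '=IiiIIiIIIIiiQQQQ'

SignalfdSiginfo = collections.namedtuple('SignalfdSiginfo', [
    'ssi_signo', 'ssi_errno', 'ssi_code', 'ssi_pid',
    'ssi_uid', 'ssi_fd', 'ssi_tid', 'ssi_band',
    'ssi_overrun', 'ssi_trapno', 'ssi_status', 'ssi_int',
    'ssi_ptr', 'ssi_utime', 'ssi_stime', 'ssi_addr',
])


def unpack_siginfo(data):
    """Decode one 128-byte signalfd record into a SignalfdSiginfo."""
    if len(data) != SIGINFO_SIZE:
        raise ValueError('expected %d bytes, got %d' % (SIGINFO_SIZE, len(data)))
    return SignalfdSiginfo._make(struct.unpack_from(_SIGINFO_FORMAT, data))


def read_siginfo(fd):
    """Read and decode one record from a signalfd file descriptor."""
    return unpack_siginfo(os.read(fd, SIGINFO_SIZE))
```

In the main loop above, the os.read() call and the assert could then become info = read_siginfo(sigfd), after which info.ssi_pid and info.ssi_status tell you which child exited and with what status.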

Author: Greg Ward
Published on: Dec 18, 2013, 9:30:06 AM - Modified on: Aug 30, 2014, 3:38:01 PM