forked processes and testing

  1. I wrote a decorator that takes a function, runs it in a forked process,
    and returns the PID:

    def on_forked_process(func):
        """Decorator that forks the process, runs the function and gives
        back control to the main process
        """
        from os import fork

        def _on_forked_process(*args, **kwargs):
            pid = fork()
            if pid == 0:
                # child process: run the wrapped function
                func(*args, **kwargs)
            return pid

        return _on_forked_process

    It seems to work in general, but I'm not able to test it. For example, this fails:

    def test_on_forked_process(self):
        @on_forked_process
        def _dummy_func():
            pass

        with self.assertRaises(SystemExit):
            retpid = _dummy_func()
            # pid of the child process should always be > 0
            self.assertTrue(retpid > 0)

    and I'm not sure why. Apparently nose doesn't like the exit, even though
    it happens in an unrelated process.

    Any idea of how to make it testable or improve it?
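
    One possible direction, as a sketch only: after fork() both processes
    keep running the same code, so anything the child raises (SystemExit
    included) is never seen by assertRaises in the parent, and unless the
    child exits explicitly it carries on inside a copy of the test runner.
    The variant below assumes it is acceptable for the child to leave
    through os._exit() once the function has run, so it never returns into
    the caller. The helper wait_for_exit and the exit codes 0 and 1 are
    hypothetical additions for illustration, not part of the decorator above.

```python
import functools
import os


def on_forked_process(func):
    """Run func in a forked child; the parent gets the child's PID back."""
    @functools.wraps(func)
    def _on_forked_process(*args, **kwargs):
        pid = os.fork()
        if pid == 0:
            # Child: run the function, then leave right away with os._exit()
            # so control never returns into the caller (e.g. the test runner).
            status = 1  # assumed convention: 1 means func raised
            try:
                func(*args, **kwargs)
                status = 0
            finally:
                os._exit(status)
        # Parent: hand back the child's PID.
        return pid
    return _on_forked_process


def wait_for_exit(pid):
    """Hypothetical helper: block until the child ends, return its exit code.

    Returns None if the child was ended by a signal instead of exiting.
    """
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    return None
```

    With this shape a test can drop assertRaises(SystemExit) and instead
    assert that the returned PID is > 0 and that wait_for_exit(pid) gives 0.
    Note that os._exit() skips the usual cleanup (atexit handlers, flushing
    of stdio buffers), which may or may not matter for the function being run.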

    In theory I probably won't use it in production, because I should
    use something smarter to control the various processes I need to run,
    but it's quite useful for my integration tests, because then I can
    kill the processes like this:

    except KeyboardInterrupt:
        from os import kill
        from signal import SIGTERM
        print("Killing sink and worker")
        kill(sink_pid, SIGTERM)
        kill(worker_pid, SIGTERM)
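
    For completeness, a self-contained sketch of that cleanup pattern. The
    names spawn and terminate_all are hypothetical, and the sink and worker
    are replaced by stand-ins that only sleep. It also assumes the children
    should be reaped with os.waitpid() after the SIGTERM, so they do not
    linger as zombies, and that cleanup should run in a finally block so it
    happens on a normal finish as well as on Ctrl-C.

```python
import os
import signal
import time


def spawn(func):
    """Hypothetical helper: fork, run func in the child, return the child's PID."""
    pid = os.fork()
    if pid == 0:
        try:
            func()
        finally:
            os._exit(0)  # the child never returns into the caller
    return pid


def terminate_all(pids):
    """Send SIGTERM to every PID, then reap each one; return {pid: wait status}."""
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
    statuses = {}
    for pid in pids:
        _, statuses[pid] = os.waitpid(pid, 0)
    return statuses


def main():
    # Stand-ins for the real sink and worker processes.
    sink_pid = spawn(lambda: time.sleep(60))
    worker_pid = spawn(lambda: time.sleep(60))
    try:
        time.sleep(0.5)  # the integration test body would go here
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        print("Killing sink and worker")
        terminate_all([sink_pid, worker_pid])


if __name__ == "__main__":
    main()
```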
    andrea crotti, Sep 12, 2012
