forked processes and testing

A

andrea crotti

I wrote a decorator that takes a function, run it in a forked process
and return the PID:

def on_forked_process(func):
from os import fork
"""Decorator that forks the process, runs the function and gives
back control to the main process
"""
def _on_forked_process(*args, **kwargs):
pid = fork()
if pid == 0:
func(*args, **kwargs)
sys.exit(0)
else:
return pid

return _on_forked_process

It seems in general to work but I'm not able to test it, for example this fails:

def test_on_forked_process(self):
@utils.on_forked_process
def _dummy_func():
pass

with self.assertRaises(SystemExit):
retpid = _dummy_func()
# pid of the son process should be always > 0
self.assertTrue(retpid > 0)


and I'm not sure why, nose doesn't like the Exit apparently even if
it's happening in an unrelated proces..

Any idea of how to make it testable or improve it?

In theory probably I will not use it for production because I should
use something smarter to control the various processes I need to run,
but for my integration tests it's quite useful, because then I can
kill the processes like

except KeyboardInterrupt:
from os import kill
from signal import SIGTERM
print("Killing sink and worker")
kill(sink_pid, SIGTERM)
kill(worker_pid, SIGTERM)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,755
Messages
2,569,537
Members
45,020
Latest member
GenesisGai

Latest Threads

Top