How to properly implement worker processes


Dennis Jacobfeuerborn

Hi,
I'm trying to implement a system for periodically checking URLs and I've run into problems with some of the implementation details. The URLs are supposed to be checked continuously until the config for a URL is explicitly removed.

The plan is to spawn a worker process for each URL that sends the status of the last check to its parent, which keeps track of the state of all URLs. When a URL is no longer supposed to be checked, the parent process should shut down/kill the respective worker process.

What I've been going for so far is that the parent process creates a global queue that is passed to all children upon creation, which they use to send status messages to the parent. Then for each child a dedicated queue is created that the parent uses to issue commands to that child.
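
Roughly like this (a minimal sketch; worker() and urls are placeholders for the child's main function and the configured URL list):

import multiprocessing

status_queue = multiprocessing.Queue()          # shared: workers -> parent

workers = {}
for url in urls:
    command_queue = multiprocessing.Queue()     # dedicated: parent -> worker
    p = multiprocessing.Process(target=worker,
                                args=(command_queue, status_queue, url))
    p.start()
    workers[url] = (p, command_queue)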

The issue is that since the child processes spend some time in sleep(), when a command from the parent arrives they cannot respond immediately, which is rather undesirable. What I would rather like to do is have the parent simply kill the child instead, which is instantaneous and more reliable.

My problem is that according to the multiprocessing docs, if I kill the child while it is using the queue to send a status to the parent, the queue becomes corrupted, and since that queue is shared, that means the whole thing pretty much stops working.

How can I get around this problem and receive status updates from all children efficiently without a shared queue and with the ability to simply kill the child process when it's no longer needed?

Regards,
Dennis
 

Ian Kelly

The usual approach to killing worker processes safely is to send them
an "exit" command, which they should respond to by terminating
cleanly. Instead of using sleep(), have the workers do a blocking
get() on the queue with a timeout. This way they'll receive the
"exit" message immediately as desired, but they'll still wake up at
the desired intervals in order to do their work.
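
A rough sketch of such a worker loop (check_url() and the queue names are placeholders, not anything from the stdlib):

import queue   # a multiprocessing.Queue raises queue.Empty on timeout

def worker(command_queue, status_queue, url, interval=60.0):
    while True:
        status = check_url(url)          # whatever one check involves
        status_queue.put((url, status))
        try:
            # The timeout doubles as the sleep between checks.
            command = command_queue.get(timeout=interval)
            if command == "exit":
                break                    # terminate cleanly
        except queue.Empty:
            pass                         # no command; do the next check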
 

Dennis Jacobfeuerborn

I was thinking about something like that, but the issue is that this really only works when you don't do any actual blocking work. I may be able to get around the sleep(), but then I have to fetch the URL or do some other work that might block for a while, so the get() trick doesn't work.
Also, the child process might not be able to deal with such an exit command at all for one reason or another, so the only safe way to get rid of it is for the parent to kill it.

The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If that were the case then I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.

Regards,
Dennis
 

Dennis Lee Bieber

How much actual processing is done during the "check"?

Your description makes it sound like these are I/O bound operations
(combined with sleep()) -- and plain old threading tends to work fine
for I/O bound systems.

If you used threading, you could signal a thread to die by simply
setting an attribute on the thread object:

t[x].die = True

The thread would wrap a loop of the form:

while not self.die:
    # do URL check, producing `status`
    resque.put(status)             # shared result/status queue
    time.sleep(check_interval)     # sleep() needs an interval argument

Thereby doing away with many of your queues -- you'd only need the
result/status queue, and the thread would only exit at a clean point.

The next step up, depending on the overhead of spawning processes,
would be to still use control threads and a local result queue, but have
"do URL check" create the check process each time -- you could probably
use "proc.communicate()" to obtain the status via the process stdout
[and pass the URL via stdin]. The rest of the control thread remains the
same.
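
Something along these lines, where check_url.py is a hypothetical helper script that reads a URL on stdin and prints a status line:

import subprocess

def check_once(url):
    # One short-lived process per check; communicate() writes stdin,
    # reads stdout, and waits for the process to exit.
    proc = subprocess.Popen(["python", "check_url.py"],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    out, _ = proc.communicate(url.encode())
    return out.decode().strip()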

The third step: Still use controller threads, but the controller
thread would create a queue pair (to-process, from-process) on
initialization, and then spawn the process. You might even be able to
remove the time.sleep() from the thread level. Actually, checking the
docs, forget about the Queue... Use a Pipe:

# "from" is a Python keyword, so the connection ends need other names.
# Pipe() is duplex by default, so one end per side carries both directions.
self.conn, child_conn = multiprocessing.Pipe()
self.p = multiprocessing.Process(target=worker,
                                 args=(child_conn, URL))
self.p.start()
workerdead = False
while not self.die:
    try:
        status = self.conn.recv()  # blocks until data arrives
        resque.put(status)         # local Queue collecting results
    except EOFError:               # worker died unexpectedly
        workerdead = True
        break
if not workerdead:
    self.conn.send("SHUTDOWN")
    while True:
        status = self.conn.recv()
        if status == "SHUTTING DOWN": break
        resque.put(status)         # might have had a last cycle


The worker should poll rather than sleep.

def worker(conn, URL):    # the child's end of the duplex Pipe
    while True:
        # do URL check, producing `status`
        conn.send(status)
        if conn.poll(1.0):         # sleep until a command arrives or time out
            command = conn.recv()
            if command == "SHUTDOWN":
                conn.send("SHUTTING DOWN")
                break
 

Ian Kelly

I was thinking about something like that but the issue is that this really only works when you don't do any actual blocking work. I may be able to get around the sleep() but then I have to fetch the URL or do some other work that might block for a while so the get() trick doesn't work.

At a lower level, it is possible to poll on both the pipe and the
socket simultaneously. At this point though you might want to start
looking at an asynchronous or event-driven framework like twisted or
gevent.
Also, the child process might not be able to deal with such an exit command at all for one reason or another, so the only safe way to get rid of it is for the parent to kill it.

I think you mean that it is the most "reliable" way. In general, the
only "safe" way to cause a process to exit is the cooperative
approach, because it may otherwise leave external resources such as
file data in an unexpected state that could cause problems later.
The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If that were the case then I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.

Assuming that you're using a Unix system:

from select import select

while True:
    ready, _, _ = select(pipes, [], [], timeout)
    if not ready:
        pass  # timeout: no worker reported anything this interval
    else:
        for pipe in ready:
            message = pipe.recv()  # Connection objects use recv(), not get()
            # process message
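
For what it's worth, Python 3.3+ also provides multiprocessing.connection.wait(), which does the same job without raw select() and also works on Windows:

from multiprocessing.connection import wait

ready = wait(pipes, timeout=timeout)   # connections with data available
for pipe in ready:
    message = pipe.recv()
    # process message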
 

Dennis Jacobfeuerborn

At a lower level, it is possible to poll on both the pipe and the
socket simultaneously. At this point though you might want to start
looking at an asynchronous or event-driven framework like twisted or
gevent.

I was looking at twisted, and while the Agent class would allow me to make async requests, it doesn't seem to support setting a timeout or aborting a running request. That's really the important part, since the http request is really the only thing that might block for a while. If I can make the request asynchronously and abort it when I receive a QUIT command from the parent, then this would pretty much solve the issue.
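
One thing I might try, assuming the Deferred returned by Agent.request() is cancellable in the Twisted version at hand (the URL and timeout here are made up; newer Twisted releases want bytes for the method and URL):

from twisted.internet import reactor
from twisted.web.client import Agent

agent = Agent(reactor)
d = agent.request('GET', 'http://example.com/')

# Abort the request if it has not completed within 5 seconds.
timeout_call = reactor.callLater(5.0, d.cancel)

def finished(result):
    if timeout_call.active():
        timeout_call.cancel()    # completed in time; stop the timer
    return result

d.addBoth(finished)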
I think you mean that it is the most "reliable" way. In general, the
only "safe" way to cause a process to exit is the cooperative
approach, because it may otherwise leave external resources such as
file data in an unexpected state that could cause problems later.

True, but the child is doing nothing but making http requests and reporting the results to the parent, so killing the process shouldn't be too big a deal in this case. A segfault in an Apache worker process is very similar, in that it's an uncontrolled termination of the process, and that works out fine.
The better option would be to not use a shared queue for communication and instead use only dedicated pipes/queues for each child process, but there doesn't seem to be a way to wait for a message from multiple queues/pipes. If that were the case then I could simply kill the child and get rid of the respective pipes/queues without affecting the other processes or communication channels.

Assuming that you're using a Unix system:

from select import select

while True:
    ready, _, _ = select(pipes, [], [], timeout)
    if not ready:
        pass  # timeout: no worker reported anything this interval
    else:
        for pipe in ready:
            message = pipe.recv()
            # process message

That looks like a workable solution. When I decide to kill a worker process I can remove the pipe from the pipes list and discard it, since it's not shared.
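
The teardown for one worker would then be roughly this, with proc and conn being that worker's Process object and parent-side pipe end:

proc.terminate()       # kill the worker outright
pipes.remove(conn)     # stop select()ing on its pipe
conn.close()           # the other workers' pipes are untouched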

Regards,
Dennis
 
