Ending data exchange through multiprocessing pipe


Michal Chruszcz

Hi,

I am adding support for parallel processing to an existing program
which fetches some data and then performs some computation with
results saved to a database. Everything went just fine until I wanted
to gather all of the results from the subprocesses.

First idea which came to my mind was using a queue. I've got many
producers (all of the workers) and one consumer. Seems quite simple,
but it isn't, at least for me. I presumed that each worker would put()
its results on the queue and finally close() it, while the parent
process would get() them as long as there is an active subprocess. So
I did this:
>>> from multiprocessing import Process, Queue, active_children
>>> def f(q):
...     q.put(1)
...     q.close()
...
>>> queue = Queue()
>>> Process(target=f, args=(queue,)).start()
>>> while active_children():
...     print queue.get()
...
1

This (of course?) hangs after the first iteration of the loop. Delaying
the second iteration with a sleep() call fixes the problem, since by
then the result of active_children() has been refreshed, but that's not
a real solution. One could say to iterate exactly as many times as
there are subprocesses, but let's presume that information isn't
available.
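
(For reference: the result lands on the queue a moment before the
worker actually exits, so on the second pass active_children() still
reports the child as alive and get() blocks on an empty queue; the
sleep() merely hides that race. Keeping the Process handle avoids the
loop entirely; a minimal sketch, assuming a single worker like f above:)

    from multiprocessing import Process, Queue

    def f(q):
        q.put(1)
        q.close()

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        print queue.get()    # blocks until the worker has put its result
        p.join()             # reap the worker instead of polling active_children()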

Due to my failure with queues I decided to give pipes a try, and again
I found behavior which is at least surprising and not obvious. When I
use a pipe within a single process everything works perfectly:

>>> from multiprocessing import Pipe
>>> parent, child = Pipe()
>>> child.send(1)
>>> parent.recv()
1
>>> child.close()
>>> parent.recv()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
EOFError

The problems appear in communication between processes using pipes,
though.

>>> from multiprocessing import Pipe, Process
>>> def f(child):
...     child.send(1)
...     child.close()
...
>>> parent, child = Pipe()
>>> Process(target=f, args=(child,)).start()
>>> parent.recv()
1
>>> parent.recv()

... and it hangs. I have no idea how to fix this, nor even of a
workaround that would solve my problem.
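
(A note for the record: the hang seems to come from the parent still
holding its own copy of the child end after passing it to the
subprocess, so the pipe never reports end-of-file even though the
worker closed its copy. Closing the parent's copy right after start()
appears to restore the EOFError behavior; a minimal sketch along those
lines:)

    from multiprocessing import Pipe, Process

    def f(conn):
        conn.send(1)
        conn.close()

    if __name__ == '__main__':
        parent, child = Pipe()
        p = Process(target=f, args=(child,))
        p.start()
        child.close()            # drop the parent's copy of the child end
        try:
            while True:
                print parent.recv()
        except EOFError:         # raised once every handle to the child end is closed
            pass
        p.join()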

Most probably I'm missing something in the philosophy of
multiprocessing, but I couldn't find anything covering such a
situation. I'd appreciate any kind of hint on this topic, as it has
become a riddle I just have to solve. :)

Best regards,
Michal Chruszcz
 

MRAB

Michal said:
Hi,

I am adding support for parallel processing to an existing program
which fetches some data and then performs some computation with
results saved to a database. Everything went just fine until I wanted
to gather all of the results from the subprocesses.

First idea which came to my mind was using a queue. I've got many
producers (all of the workers) and one consumer. Seems quite simple,
but it isn't, at least for me. I presumed that each worker would put()
its results on the queue and finally close() it, while the parent
process would get() them as long as there is an active subprocess. So
I did this:

[Queue example]

This (of course?) hangs after the first iteration of the loop. Delaying
the second iteration with a sleep() call fixes the problem, since by
then the result of active_children() has been refreshed, but that's not
a real solution. One could say to iterate exactly as many times as
there are subprocesses, but let's presume that information isn't
available.

Due to my failure with queues I decided to give pipes a try, and again
I found behavior which is at least surprising and not obvious. When I
use a pipe within a single process everything works perfectly:

[Pipe example]

The problems appear in communication between processes using pipes,
though.

[Pipe example with a subprocess]

... and it hangs. I have no idea how to fix this, nor even of a
workaround that would solve my problem.

Most probably I'm missing something in the philosophy of
multiprocessing, but I couldn't find anything covering such a
situation. I'd appreciate any kind of hint on this topic, as it has
become a riddle I just have to solve. :)
You could do this:

from multiprocessing.queues import Empty

queue = Queue()
Process(target=f, args=(queue,)).start()
while active_children():
    try:
        print queue.get(timeout=1)
    except Empty:
        pass
 

Michal Chruszcz

MRAB said:
You could do this:

     from multiprocessing.queues import Empty

     queue = Queue()
     Process(target=f, args=(queue,)).start()
     while active_children():
         try:
             print queue.get(timeout=1)
         except Empty:
             pass

This one isn't generic. When I have tasks that all finish within 0.1
seconds, the above adds up to a 10x overhead. On the other hand, if I
know the results will only be available after 10 hours, there's no
point in checking every second.

Best regards,
Michal Chruszcz
 

Michal Chruszcz

Well, if the protocol for a worker is:
     <someloop>:
          <calculate>
          queue.put(result)
     queue.put(<worker_end_sentinel>)
     queue.close()

Then you can keep count of how many have finished in the consumer.
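
(Spelled out, the quoted protocol might look roughly like this; the
SENTINEL value and the worker body are stand-ins:)

    from multiprocessing import Process, Queue

    SENTINEL = 'DONE'                    # placeholder end-of-worker marker

    def worker(q):
        for result in (1, 2, 3):         # stand-in for the real calculation loop
            q.put(result)
        q.put(SENTINEL)                  # tell the consumer this worker has finished
        q.close()

    if __name__ == '__main__':
        queue = Queue()
        workers = [Process(target=worker, args=(queue,)) for _ in range(4)]
        for p in workers:
            p.start()
        finished = 0
        while finished < len(workers):   # count sentinels instead of polling
            item = queue.get()           # blocks until something arrives
            if item == SENTINEL:
                finished += 1
            else:
                print item
        for p in workers:
            p.join()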

Yes, I could, but I don't like the idea of using a sentinel if I still
need to close the queue. I mean, if I mark the queue as closed, or
close a connection through a pipe, why do I still have to "mark" it
closed using a sentinel? From my point of view it's a duplication.
Thus I dare say the multiprocessing module misses something quite
important.

Probably it is possible to retain the pipe's state using a
multiprocessing manager, thus avoiding the duplicated state exchange,
but I haven't tried it yet.

Best regards,
Michal Chruszcz
 

Jesse Noller

Yes, I could, but I don't like the idea of using a sentinel if I still
need to close the queue. I mean, if I mark the queue as closed, or
close a connection through a pipe, why do I still have to "mark" it
closed using a sentinel? From my point of view it's a duplication.
Thus I dare say the multiprocessing module misses something quite
important.

Probably it is possible to retain the pipe's state using a
multiprocessing manager, thus avoiding the duplicated state exchange,
but I haven't tried it yet.

Best regards,
Michal Chruszcz

Using a sentinel or looping on the get/Empty pattern are both valid
and correct suggestions.

If you think it's a bug, or you want a new feature, post it,
preferably with a patch, to bugs.python.org. Add me to the nosy list,
or assign it to me if you can.

Jesse
 

MRAB

Michal said:
This one isn't generic. When I have tasks that all finish within 0.1
seconds, the above adds up to a 10x overhead. On the other hand, if I
know the results will only be available after 10 hours, there's no
point in checking every second.
If there is a result in the queue then it will return immediately; if
not, then it will wait for up to 1 second for a result to arrive, and
if none arrives in that time it will raise an Empty exception. Raising
only 1 exception per second consumes a negligible amount of processing
power (I wouldn't worry about it).
 

Paul Boddie

Michal said:
I am adding support for parallel processing to an existing program
which fetches some data and then performs some computation with
results saved to a database. Everything went just fine until I wanted
to gather all of the results from the subprocesses.

[Queue example]

I have to say that I'm not familiar with the multiprocessing API, but
for this kind of thing, there needs to be some reasonably complicated
stuff happening in the background to test for readable conditions on
the underlying pipes or sockets. In the pprocess module [1], I had to
implement a poll-based framework (probably quite similar to Twisted
and asyncore) to avoid deadlocks and other undesirable conditions.

[Pipe example]

Again, it's really awkward to monitor pipes between processes and to
have them "go away" when closed. Indeed, I found that you don't really
want them to disappear before everyone has finished reading from them,
but Linux (at least) tends to break pipes quite readily. I got round
this problem by having acknowledgements in pprocess, but it felt like
a hack.
Most probably I'm missing something in the philosophy of
multiprocessing, but I couldn't find anything covering such a
situation. I'd appreciate any kind of hint on this topic, as it has
become a riddle I just have to solve. :)

The multiprocessing module appears to offer map-based conveniences
(Pool objects) where you indicate that you want the same callable
executed multiple times and the results to be returned, so perhaps
this is really what you want. In pprocess, there's a certain amount of
flexibility exposed in the API, so that you can choose to use a map-
like function, or you can open everything up and use the
communications primitives directly (which would appear to be similar
to the queue-oriented programming mentioned in the multiprocessing
documentation).
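
A rough sketch of that map-style usage, for comparison; the callable
is a stand-in, and imap_unordered is used so results are printed as
the workers finish:

    from multiprocessing import Pool

    def compute(item):
        return item * item               # stand-in for the real computation

    if __name__ == '__main__':
        pool = Pool()                    # one worker process per CPU by default
        for result in pool.imap_unordered(compute, range(10)):
            print result                 # results arrive as the workers finish
        pool.close()
        pool.join()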

One thing that pprocess exposes (and which must be there in some form
in the multiprocessing module) is the notion of an "exchange" which is
something that monitors a number of communications channels between
processes and is able to detect and act upon readable channels in an
efficient way. If it's not the Pool class in multiprocessing that
supports such things, I'd take a look for the component which does
support them, if I were you, because this seems to be the
functionality you need.
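
(For what it's worth, something in the spirit of such an exchange can
be sketched on top of multiprocessing's own pipes by select()ing on
the parent ends of the connections; this assumes a POSIX system, where
Connection objects expose fileno():)

    import select
    from multiprocessing import Pipe, Process

    def worker(conn, value):
        conn.send(value * value)         # stand-in for the real result
        conn.close()

    if __name__ == '__main__':
        readers = []
        procs = []
        for value in range(4):
            parent, child = Pipe()
            p = Process(target=worker, args=(child, value))
            p.start()
            child.close()                # keep only the worker's copy of this end
            readers.append(parent)
            procs.append(p)
        while readers:
            # wait until at least one pipe is readable (data or EOF)
            ready, _, _ = select.select(readers, [], [])
            for conn in ready:
                try:
                    print conn.recv()
                except EOFError:         # the worker closed its end; stop watching it
                    readers.remove(conn)
                    conn.close()
        for p in procs:
            p.join()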

Paul

[1] http://www.boddie.org.uk/python/pprocess.html
 
