Multiprocessing / threading confusion

  • Thread starter Paul Pittlerson

Paul Pittlerson

I'm trying to understand data handling using multiprocessing and threading, haven't gotten very far without running into problems. This is my code:

#!/usr/bin/python

from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import current_process

from threading import Thread

import time

def gogo(qu):
    w = Worker(qu)
    w.start()

class Worker(Thread):
    def __init__(self, Que):
        super(Worker, self).__init__()

        self._pid = current_process().pid

        self.que = Que
        self.que.put('started worker %s' % self._pid)

    def run(self):
        self.que.put('%s ticked' % self._pid)

    def __del__(self):
        self.que.put('%s has exited' % self._pid)

class Debugger(Thread):
    def __init__(self, q):
        super(Debugger, self).__init__()

        self.q = q

    def run(self):
        while True:
            time.sleep(1)

            if not self.q.empty():
                print self.q.get()

            else:
                break

if __name__ == '__main__':

    debug_q = Queue()

    debug = Debugger(debug_q)
    debug.start()

    for i in range(5):
        d = Process(target=gogo, args=(debug_q,))
        d.start()

What I expect to happen is the Debugger object will receive one string at a time, and read it from the queue. But that's not what I see in the output: the "started worker" stuff seems to print for every process, but "ticked" and "exited" will show up in unpredictable ways. I'm guessing they overwrite each other and therefore will not always appear in the output.

So I'm looking for help in trying to make sense of this kind of stuff, I thought this was the basic functionality that Queue() would take care of unless there is some other problem in my code.
 
marduk

I'm trying to understand data handling using multiprocessing and
threading, haven't gotten very far without running into problems. This is
my code:

[snip] (Not sure why you are using multiprocessing and threading at the
same time.)

What I expect to happen is the Debugger object will receive one string at
a time, and read it from the queue. But that's not what I see in the
output: the "started worker" stuff seems to print for every process, but
"ticked" and "exited" will show up in unpredictable ways. I'm guessing
they overwrite each other and therefore will not always appear in the
output.

So I'm looking for help in trying to make sense of this kind of stuff, I
thought this was the basic functionality that Queue() would take care of
unless there is some other problem in my code.

My output is probably totally different from your output. I only get
the processes starting. Here's why: this stuff all runs
asynchronously. When you start the "Debugger" thread, I see you put a
sleep() in it, but that guarantees nothing. At least on my machine,
which is somewhat loaded ATM, by the time the Processes are started, the
Debugger thread has already finished (because of the check to see if the
queue was empty). Apparently it took longer than 1 second between the
time the Debugger was started and the time the first Process was started.
Likewise, what you are getting is probably a case where the queue is
momentarily empty by the time the debugger loop gets ahold of the queue
lock and checks to see if it's empty. Therefore the Debugger quits.
Also, because processes and threads run asynchronously, you cannot
guarantee the order in which the processes will get the opportunity to
put() into the queue.
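
A minimal sketch of that failure mode, written for Python 3 and using a plain queue.Queue in a single thread so the outcome is deterministic. The helper name poll_until_empty is hypothetical; it only mimics the "stop at the first empty check" logic of the Debugger loop.

```python
import queue


def poll_until_empty(q):
    """Mimic the original Debugger loop: quit at the first empty check."""
    received = []
    while True:
        if q.empty():
            break
        received.append(q.get())
    return received


q = queue.Queue()
early = poll_until_empty(q)   # the consumer runs before any producer
q.put('started worker 1')     # the producer shows up "late"
q.put('1 ticked')

print(early)       # [] because nothing had been queued yet
print(q.qsize())   # 2 because both messages are still waiting, unread
```

With real processes the same thing happens whenever the consumer gets scheduled before the producers, which is why the output differs from machine to machine.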

Also you can't (and shouldn't) depend on the time that __del__ gets
called. It can get called at any time, in any order and sometimes not
at all.*

Hope this helps.

*
http://docs.python.org/3/reference/datamodel.html?highlight=__del__#object.__del__
 
Chris Angelico

I'm trying to understand data handling using multiprocessing and threading, haven't gotten very far without running into problems. This is my code:


What I expect to happen is the Debugger object will receive one string at a time, and read it from the queue. But that's not what I see in the output: the "started worker" stuff seems to print for every process, but "ticked" and "exited" will show up in unpredictable ways. I'm guessing they overwrite each other and therefore will not always appear in the output.

So I'm looking for help in trying to make sense of this kind of stuff, I thought this was the basic functionality that Queue() would take care of unless there is some other problem in my code.

The first thing I notice is that your Debugger will quit as soon as
its one-secondly poll results in no data. This may or may not be a
problem for your code, but I'd classify it as code smell at best. Is
your goal here to make sure Debugger doesn't stop your process from
exiting? If so, a simpler and safer solution is to make it a daemon
thread.
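
A rough sketch of the daemon-thread idea, in Python 3 and with a thread-level queue.Queue instead of multiprocessing.Queue to keep it short. The function name drain and the list received are made up for illustration. The reader blocks on get() forever and never decides on its own when to stop; being a daemon, it simply does not keep the program alive.

```python
import queue
import threading


def drain(q, out):
    """Forward every message from q to out; never returns on its own."""
    while True:
        out.append(q.get())   # blocks until a message arrives
        q.task_done()         # lets q.join() know this message is handled


q = queue.Queue()
received = []

# daemon=True: the interpreter may exit while this thread is still blocked.
t = threading.Thread(target=drain, args=(q, received), daemon=True)
t.start()

for i in range(3):
    q.put('message %d' % i)

q.join()          # wait until every queued message has been processed
print(received)
```

For a queue shared between processes, multiprocessing.JoinableQueue offers the same task_done()/join() pair.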

The other thing I see here is your use of __del__ to print your exit
message. I don't know if Thread objects are involved in reference
loops, but if they are, __del__ (probably) won't be called immediately
on thread termination.

Your subprocesses are a little odd; they spin off another thread, then
halt the first thread. Why not simply do the work in the first thread?
You then treat the thread's __del__ method as the process's death,
which isn't strictly true, but probably close enough.

ChrisA
 
Paul Pittlerson

Also you can't (and shouldn't) depend on the time that __del__ gets
called. It can get called at any time, in any order and sometimes not
at all.*

Wow I did not know that! I was counting on that it reliably gets called when the object is destroyed.
 
Paul Pittlerson

The first thing I notice is that your Debugger will quit as soon as
its one-secondly poll results in no data. This may or may not be a
problem for your code, but I'd classify it as code smell at best. Is
your goal here to make sure Debugger doesn't stop your process from
exiting? If so, a simpler and safer solution is to make it a daemon
thread.

I didn't think it would be a problem, because unless the system is very
slow, the functions will finish in a fraction of a second, on my machine
it does not matter whether I have it set as 0.1 second or several seconds,
the output is still the same. It's not elegant, but the point was just to
exit the test when no more prints are to be made.

But how can I fix the actual bug I was asking about though? I want to
print ticked and exited for all the processes, just to acknowledge to
myself that the code is working.. so I can proceed to experiment with
more complexity! :D
 
Chris Angelico

Wow I did not know that! I was counting on that it reliably gets called when the object is destroyed.

Even that isn't technically reliable, though in CPython, objects will
usually be __del__'d promptly as long as they're not in reference
cycles. But the main problem here is that the destruction of the
object has nothing to do with the ending of the thread or process; the
object will hang around for as long as the caller might want it.
You'll want to put your "end of process" code at the bottom of run(),
I think, unless there's some other place for it.
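
A small sketch of the difference, in Python 3 and with a thread only (no processes) so it stays deterministic. The global list events is an assumption made for the demo. The exit message goes at the bottom of run(), and __del__ is shown not to fire while the caller still holds a reference.

```python
import threading

events = []


class Worker(threading.Thread):
    def run(self):
        try:
            events.append('ticked')
        finally:
            # Runs when the thread's work ends, even after an exception.
            events.append('exited')

    def __del__(self):
        # Runs only when the object is reclaimed, which may be much later.
        events.append('deleted')


w = Worker()
w.start()
w.join()

# The thread has finished, but `w` still refers to the object,
# so __del__ has not been called yet.
print(events)
```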

ChrisA
 
Piet van Oostrum

Paul Pittlerson said:
I didn't think it would be a problem, because unless the system is very
slow, the functions will finish in a fraction of a second, on my machine
it does not matter whether I have it set as 0.1 second or several seconds,
the output is still the same. It's not elegant, but the point was just to
exit the test when no more prints are to be made.

But how can I fix the actual bug I was asking about though? I want to
print ticked and exited for all the processes, just to acknowledge to
myself that the code is working.. so I can proceed to experiment with
more complexity! :D

On my system I get the output:

started worker 75501
75501 ticked
75501 has exited
started worker 75505
75505 ticked
75505 has exited
started worker 75504
started worker 75502
started worker 75503
75502 ticked
75502 has exited
75504 ticked
75504 has exited
75503 ticked
75503 has exited

So all the processes have their 'started', 'ticked' and 'exited' messages.
But as others have indicated, because your code is timing dependent,
that is just coincidental. Because multiprocessing/multithreading is
inherently non-deterministic, the order of the messages will be
unpredictable. But you should not make the exiting of the Debugger
non-deterministic, as you have it now. One way to fix this is to count the
number of processes that have exited, and wait until all are done. In
this case you could count the number of 'exited' messages that have
arrived.

    def run(self):
        nbr_process = 5
        while True:
            time.sleep(1)

            msg = self.q.get()
            print msg
            if 'exited' in msg:
                nbr_process -= 1
                if nbr_process == 0:
                    break

Of course the 5 should be given as a parameter.
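
One possible way to pass the count in, sketched for Python 3. The parameter name nbr_processes and the attributes remaining and seen are assumptions, and the demo feeds the queue by hand from a queue.Queue rather than from real processes. Since get() already blocks until a message arrives, the sleep() is left out here.

```python
import queue
from threading import Thread


class Debugger(Thread):
    def __init__(self, q, nbr_processes):
        super().__init__()
        self.q = q
        self.remaining = nbr_processes   # 'exited' messages still expected
        self.seen = []

    def run(self):
        while self.remaining > 0:
            msg = self.q.get()           # blocks; no empty() polling needed
            print(msg)
            self.seen.append(msg)
            if 'exited' in msg:
                self.remaining -= 1


q = queue.Queue()
debug = Debugger(q, nbr_processes=2)
debug.start()

# Stand-ins for two workers (the pids are made up).
for pid in (101, 102):
    q.put('%s ticked' % pid)
    q.put('%s has exited' % pid)

debug.join()   # returns once both 'exited' messages have been seen
```

The same class should work unchanged with a multiprocessing.Queue, since it only calls get().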

This still leaves you with the uncertainty of the __del__ being called.
Why not just put the message at the end of the Worker run code?

    def run(self):
        self.que.put('%s ticked' % self._pid)
        # do some work
        time.sleep(1)
        self.que.put('%s has exited' % self._pid)

However, in my experiments (both Python 2.7.5 and 3.3.2 on Mac OS X
10.6.8) it seems that there is a problem with a Thread inside a Process:
When the main thread of the Process is finished, the other Thread is
also terminated, as if it is a daemon thread. Although self.daemon ==
False!!

You can check this with the following Worker code:

    def run(self):
        for n in range(5):
            self.que.put('%s tick %d' % (self._pid, n))
            # do some work
            time.sleep(1)
        self.que.put('%s has exited' % self._pid)

It appears that not all ticks are delivered. On my system, only one tick
per thread, and then it disappears. I have no idea if this is a bug. I
certainly couldn't find it documented.

The solution to this is to put a join statement in gogo:

def gogo(qu):
    w = Worker(qu)
    w.start()
    w.join()
 
Piet van Oostrum

    def run(self):
        for n in range(5):
            self.que.put('%s tick %d' % (self._pid, n))
            # do some work
            time.sleep(1)
        self.que.put('%s has exited' % self._pid)

To prevent the 'exited' message from disappearing if there is an exception
in the thread, you should protect it with try ... finally:

    def run(self):
        try:
            for n in range(5):
                self.que.put('%s tick %d' % (self._pid, n))
                # do some work
                time.sleep(1)
        finally:
            self.que.put('%s has exited' % self._pid)

This doesn't help for the premature termination of the thread, as that
isn't an exception. But use the w.join() for that. Or you could put the
'exited' message after the w.join() command.
 
Paul Pittlerson

Ok here is the fixed and shortened version of my script:

#!/usr/bin/python

from multiprocessing import Process, Queue, current_process
from threading import Thread
from time import sleep

class Worker():
    def __init__(self, Que):
        self._pid = current_process().pid
        self.que = Que
        self.que.put('started worker %s' % self._pid)

        for n in range(5):
            self.que.put('%s tick %d' % (self._pid, n))
            # do some work
            sleep(0.01)

        self.que.put('%s has exited' % self._pid)

class Debugger(Thread):
    def __init__(self, q):
        super(Debugger, self).__init__()
        self.q = q

    def run(self):
        while True:

            sleep(0.1)

            if not self.q.empty():
                print self.q.get()

            else:
                break
#

if __name__ == '__main__':

    debug_q = Queue()
    debug = Debugger(debug_q)
    debug.start()

    for i in range(5):

        d = Process(target=Worker, args=(debug_q,))
        d.start()

This works great on Linux, but does not run on Windows (7). The behavior was: I
opened it by double-clicking, and a window appeared and disappeared (no
text). Then I opened it with IDLE and ran it there, where it worked a couple of
times. Then I reopened it with IDLE and this time it did not work at all. After
that the script did not run either through IDLE or when opened directly.

What may be the reason it works on Linux, but seems buggy on Windows?
 
Dave Angel

In Linux, the Process() class works very differently. One effect is
that it's probably much quicker than in Windows. But also, the
relationship between the original process and the extra 5 is different.

I wouldn't even try to debug anything else till I add join() calls for
both the extra thread and the 5 extra processes. You could just stick a
raw_input() at the end to fake it, but that's just a temporary hack.

Untested:


if __name__ == '__main__':

    debug_q = Queue()
    debug = Debugger(debug_q)
    debug.start()

    processes = []
    for i in range(5):

        d = Process(target=Worker, args=(debug_q,))
        processes.append(d)
        d.start()
    for proc in processes:
        proc.join()
    debug.join()


As for running it in various shells, you missed the only one worth
using: run it in the cmd shell. When you double-click, you can't
really be sure if that temporary cmd shell was really empty before it
vanished; it might just not have bothered to update its pixels on its
way out. And any IDE shell like IDLE has so many other glitches in it,
that bugs like these are as likely to be the IDE's fault as anything
else.
 
Piet van Oostrum

[...]
    def run(self):
        while True:

            sleep(0.1)

            if not self.q.empty():
                print self.q.get()

            else:
                break
[...]

This works great on Linux, but does not run on Windows (7). The behavior was: I
opened it by double-clicking, and a window appeared and disappeared (no
text). Then I opened it with IDLE and ran it there, where it worked a couple of
times. Then I reopened it with IDLE and this time it did not work at all. After
that the script did not run either through IDLE or when opened directly.

What may be the reason it works on Linux, but seems buggy on Windows?

That it works on Linux is just coincidence. Your script is still timing
dependent because the while loop in Debugger.run stops when the queue is
empty. As has been explained in other answers, the queue can become
empty whenever Debugger empties it faster than the other processes can
fill it. That is entirely dependent on the scheduling of the O.S., so you
have no control over it. You must use a safe way to stop, for example by
counting the exited messages.

Another way is to join all the processes in the main program, and after
that put a special END message to the queue, which causes Debug to stop:

class Debugger(Thread):
    ....
    def run(self):
        while True:
            sleep(0.1)
            msg = self.q.get()
            print(msg)
            if 'END' in msg:
                break

...main..
    processes = []
    for i in range(5):

        d = Process(target=Worker, args=(debug_q,))
        d.start()
        processes.append(d)

    for p in processes:
        p.join()
    debug_q.put('END')
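
Putting the pieces of this thread together, a complete version might look roughly like the sketch below. It is written for Python 3 and is not tested on Windows. The names worker, main, ticks, seen and the choice of None as the END sentinel are assumptions made for the example; None is used instead of the string 'END' so that a normal message can never be mistaken for the stop signal.

```python
from multiprocessing import Process, Queue, current_process
from threading import Thread

END = None  # sentinel (an assumption): workers only ever put strings


def worker(q, ticks=3):
    """Body of each child process: report start, ticks, and exit."""
    pid = current_process().pid
    q.put('started worker %s' % pid)
    try:
        for n in range(ticks):
            q.put('%s tick %d' % (pid, n))
    finally:
        q.put('%s has exited' % pid)   # sent even if the loop raises


class Debugger(Thread):
    """Print every message until the END sentinel arrives."""

    def __init__(self, q):
        super().__init__()
        self.q = q
        self.seen = []

    def run(self):
        while True:
            msg = self.q.get()   # blocks, so no sleep()/empty() polling
            if msg is END:
                break
            print(msg)
            self.seen.append(msg)


def main(nbr_processes=5):
    q = Queue()
    debug = Debugger(q)
    debug.start()                # the reader runs while the workers write
    processes = [Process(target=worker, args=(q,))
                 for _ in range(nbr_processes)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    q.put(END)                   # sent only after every worker has finished
    debug.join()
    return debug.seen
```

To try it with real processes, call main() from under an `if __name__ == '__main__':` guard; the guard matters on Windows, where each child process imports the main module again.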
 