Threading Pool Event()


Graeme Matthew

Hi all

I just cannot seem to find any documentation that shows an example of using
the factory method Event() in threads. I have a thread pool and if there are
no jobs in a Queue I want them to wait for something to be inserted. When a
job is inserted I want to send an Event; the first thread that picks it up
runs with the job, and the rest wait for another insert Event.

I have been looking at some C and C++ implementations; some use a continuous
loop within a thread that polls the queue and, if a job is found, pops it from
the queue and runs with it.

The problem: when threads are dormant, is it not better to have the main thread
(i.e. the process) send a signal rather than have 10 threads polling? Surely
polling is CPU intensive??

Any help or references on Event() are much appreciated.

many thanks

Graeme
 

Aahz

I just cannot seem to find any documentation that shows an example of
using the factory method Event() in threads. I have a thread pool and
if there are no jobs in a Queue I want them to wait for something to
be inserted. When a job is inserted I want to send an Event, the first
thread that picks it up runs with the job the rest wait for another
insert Event.

Given that you're already using a Queue, there is no, repeat NO, reason
for using an Event. Just have your threads block on the Queue.
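A minimal sketch of what "just block on the Queue" looks like (the names and job payload here are illustrative, not from the thread; the module was named Queue in the Python 2 of this era, queue in Python 3):

```python
import queue      # named "Queue" in the Python 2 of this thread's era
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        job = q.get()              # blocks until something is put(); no polling
        results.append(job * job)  # stand-in for real processing
        q.task_done()              # tell the queue this job is finished

# daemon threads die with the main program, so no shutdown protocol is needed here
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for n in range(5):
    q.put(n)

q.join()  # blocks until every queued job has been task_done()'d
```

No Event, no explicit lock: the workers sleep inside q.get() until a job arrives.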
 

Graeme Matthew

Aahz

Thanks, I've actually been using your OSCON slides, which have helped a lot.
So is it technically correct that all worker threads in a thread pool are
either doing some work or polling the queue to find out if there is a job
to process?

eg: (pseudocodish :))

def run(self):
    while 1:
        self.lock.acquire()
        if QueueManager.HasJob:
            job = QueueManager.GetFreeJob()
            self.__processJob(job)
        self.lock.release()

def __processJob(self, job):
    blah blah blah .....


many thanks ......
 

Cliff Wells

Aahz

Thanks, I've actually been using your OSCON slides, which have helped a lot.
So is it technically correct that all worker threads in a thread pool are
either doing some work or polling the queue to find out if there is a job
to process?

They don't poll the queue; they block waiting on it until something is
available.
eg: (pseudocodish :))

def run(self):
    while 1:
        self.lock.acquire()
        if QueueManager.HasJob:
            job = QueueManager.GetFreeJob()
            self.__processJob(job)
        self.lock.release()

def __processJob(self, job):
    blah blah blah .....

More like:

def run():
    while 1:
        job = queue.get() # this blocks until something is queue.put()
        _processjob(job)


Acquiring the locks isn't necessary and is ill-advised. The Queue itself
can be thought of as a type of locking mechanism, so you raise the
possibility of a deadlock by nesting locks (for instance, if you also
put locks around the queue.put()).
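A quick illustration of that point (names are illustrative): Queue.put() and Queue.get() are already thread-safe, so even several concurrent producers need no outer lock:

```python
import queue
import threading

q = queue.Queue()

def producer(start):
    for n in range(start, start + 100):
        q.put(n)   # no surrounding lock: put() does its own locking internally

threads = [threading.Thread(target=producer, args=(i * 100,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# all 500 items arrived intact, with nothing lost or duplicated
items = sorted(q.get() for _ in range(q.qsize()))
```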

Regards,
 

Aahz

Thanks, I've actually been using your OSCON slides, which have helped a
lot. So is it technically correct that all worker threads in a thread
pool are either doing some work or polling the queue to find out if
there is a job to process?

The whole point is that it *doesn't* poll. It blocks on a lock internal
to the Queue object. That makes it extremely efficient.
eg: (pseudocodish :))

def run(self):
    while 1:
        self.lock.acquire()
        if QueueManager.HasJob:
            job = QueueManager.GetFreeJob()
            self.__processJob(job)
        self.lock.release()

def __processJob(self, job):
    blah blah blah .....

Nope, it's even simpler than that:

def run(self):
    done = False
    while not done:
        job = q.get()
        if job.done:
            done = True
        else:
            self.__processJob(job)

The Queue handles all the locks for you.
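Filled out into something runnable (the Job class, queue name, and payload are illustrative; the append here is just a stand-in for __processJob):

```python
import queue
import threading

class Job:
    def __init__(self, data=None, done=False):
        self.data = data
        self.done = done          # True marks a shutdown sentinel

q = queue.Queue()
processed = []

def run():
    done = False
    while not done:
        job = q.get()             # blocks; the Queue handles all the locking
        if job.done:
            done = True           # sentinel received: leave the loop
        else:
            processed.append(job.data.upper())  # stand-in for __processJob

workers = [threading.Thread(target=run) for _ in range(3)]
for t in workers:
    t.start()

for word in ["spam", "eggs", "ham"]:
    q.put(Job(word))
for _ in workers:
    q.put(Job(done=True))         # one "done" job per worker shuts them all down
for t in workers:
    t.join()
```

The "done" sentinel is what gives this loop the clean termination that the plain `while 1: q.get()` version lacks.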
 

Graeme Matthew

ok still a bit confused sorry .... first attempt at a thread pool

Am I correct in saying that each thread runs in a continuous loop?

Each thread calls queue.get. The get method will block and not return
anything until an item is placed into it. When an item is placed into it,
one of the threads will get assigned a job, i.e. the first one that happens
to be in use during the cycle?

The job is returned to the thread; it runs with the job, does all the
processing, then returns, calls queue.get again, and waits for a job to
become available?

When placing a job via Queue.put(), one must acquire a lock, place it, and
then release it?

Am I also correct in saying that the queue is actually doing the blocking
and controls which thread gets the job?

Lastly, sorry for all the questions, but surely the CPU usage is the same
when the queue is waiting for jobs as when the threads are polling, as
they're all in one process anyway?

thanks very much for your help

Graeme
Am I getting this or am I way off :)
 

Cliff Wells

ok still a bit confused sorry .... first attempt at a thread pool

Am I correct in saying that each thread runs in a continuous loop?
Yes.

Each thread calls queue.get. The get method will block and not return
anything until an item is placed into it. When an item is placed into
it, one of the threads will get assigned a job, i.e. the first one that
happens to be in use during the cycle?

I think you have the idea, but I'm not sure (your terminology escapes me
a bit toward the end). What happens is that each thread blocks waiting
for something to be placed on the queue. When something is put on the
queue, one of the available threads (i.e. one that is blocking on
queue.get() rather than processing a job) is woken and receives it. Only
a single thread will be woken for each item that is put on the queue. It
isn't knowable which thread that will be.
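That "exactly one thread is woken per item" behaviour can be observed directly (a small illustrative demo, not from the thread):

```python
import queue
import threading
import time

q = queue.Queue()
received = []

def worker():
    item = q.get()         # five threads block here together
    received.append(item)  # only whichever one was woken reaches this line

for _ in range(5):
    threading.Thread(target=worker, daemon=True).start()

q.put("job")               # wakes exactly one of the five blocked threads
time.sleep(0.5)            # give the woken thread time to run

# one thread got the item; the other four are still blocked in q.get()
```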
The job is returned to the thread, it runs with the job and does all
the processing then returns and calls queue.get again and waits for a
job to become available ?
Yes.

When placing a job via Queue.put(), one must acquire a lock, place it,
and then release it?

No. The locking semantics are internal to the Queue object. Do not
concern yourself with them. It simply works. Do not attempt to put locks
around it.
Am I also correct in saying that the queue is actually doing the blocking
and controls which thread gets the job?

In a sense (that is, for practical purposes). How it is done internally
by the interpreter isn't important.
Lastly, sorry for all the questions, but surely the CPU usage is the same
when the queue is waiting for jobs as when the threads are polling, as
they're all in one process anyway?

The threads are asleep while waiting. They don't consume any CPU. The
queue doesn't "wait" for jobs (that is, it doesn't loop, poll, or
otherwise consume any CPU time); when you call queue.put(), it is a
method call on an object, not a thread.
Am i getting this or am i way off :)

Just a little way off ;)

Regards,

Cliff
 

Peter Hansen

Graeme said:
ok so code like this is perfectly safe

def run(self):
    while 1:
        job = queue.get()
        self.__processjob(job)

It's almost like you aren't even seeing Aahz' replies. ;-)

The above is certainly safe, but can't be terminated easily.
Just use the loop Aahz showed, which is the above plus the
ability to terminate.

-Peter
 

Alan Kennedy

Graeme said:
here is my code, please criticise it to death i wont take offence
[snip]

#ON A MULTIPLE CPU MACHINE 2 SEPARATE SERVERS SHOULD BE RUN
#THE INCOMING REQUEST WILL NEED TO BE SWITCHED BETWEEN SERVERS
#THIS IS DUE TO THE GLOBAL INTERPRETER LOCK (GIL) IN PYTHON

If my understanding of your intentions is correct, then I think you're
expecting that multiple python threads within the same interpreter
will migrate to multiple processors, and thus execute in parallel,
i.e. simultaneously on multiple processors.

However, the GIL prevents precisely that: if you have one python
thread running on one processor, it will lock the GIL, thus causing
all other python threads on other processors to be suspended until the
GIL is free again.

One way to achieve true concurrency across multiple processors is
to release the GIL when you are processing, which you cannot do from
within python code. You have to write a C language extension to do the
work, that releases the GIL before doing its job and reacquires it
when finished.

If you really want true concurrency, and still want to write in pure
python, then one way to do it is run multiple python interpreters in
separate OS processes, which are bound to different processors. Which
means that

1. You can no longer use Queue.Queue objects to communicate between
servers, since your servers no longer share an address space.

2. You have to use some alternate comms mechanism between the two
servers, such as shared memory, named pipes or something like Pyro.

3. You can no longer pass the socket object across wrapped in a "job"
object, since the file descriptor tables of the two server processes
will be different.

Lastly, I have a question of my own: I've read statements along the
lines of "having the GIL in python provides certain guarantees that
make certain things possible". I can't remember specifics. I'd be most
grateful if someone could point me to reading material (apart from the
source, Luke :) which describes what these guarantees are, and what
they make possible.

TIA,
 

Cliff Wells

What if you want to make sure that there are exactly
'n' threads in the queue at a given moment?

Why not simply start 'n' threads and have them wait on the queue?
I had a problem of a similar kind, which I solved by
deriving a "Thread monitor" class from Queue. I initialized
the Queue with a given size (the number of threads in it
at a given moment, the number 'n' above). But I found that
this did not happen as accurately as I wanted. I solved
it by using a non-blocking queue and managing the exception,
using an Event and my own locks.

You keep saying "threads in the queue". Why are you putting the threads
in the queue? Usually the threads run independently and get their
*work* from the queue. Part of the reason for having a pool of threads
(aside from controlling the number of active threads) is to avoid the
overhead of starting new threads. By putting threads on the queue and
starting them when there's work to do you lose this benefit.
The code would look something like this ...
It uses a polling mechanism with sleeping.

Polling + sleeping seems bad for most applications. Polling when
there's nothing to do is a waste of CPU, sleeping when there's work to
be done adds latency. Combining the two is the worst of both worlds
(again qualifying the statement for the general case).

Looking at the code you posted, it isn't clear to me why you would take
this approach. It appears to me that your fundamental flaw is thinking
that the *threads* must be put on the queue rather than the data the
threads will act on and the problems you faced resulted from that
misunderstanding.

If you disagree, then perhaps you could give an example of an
application that would require this type of approach versus the
traditional producer/pool-of-consumers setup.

Regards,
 
