Queue.Queue-like class without the busy-wait


Paul L. Du Bois

Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.

Because I know someone will ask: no, the busy-waiting hasn't been a
problem in my app. I'm just interested in reading the code.

p
 

Peter Hansen

Paul said:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.

Because I know someone will ask: no, the busy-waiting hasn't been a
problem in my app. I'm just interested in reading the code.

I don't believe the term "busy-wait" applies here.
Busy-waiting is (in my experience) where you run in
tight loops without releasing the CPU, consuming all
CPU time available.

I'm fairly certain that while Queue (and other things
that wait in Python) does have a loop, you'll find that
it's not a busy-waiting loop and you definitely don't
get 100% CPU usage during the wait. It goes to sleep
for increasingly large periods of time while waiting,
if the code in threading._Condition is any indication.
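
For reference, the timed-wait loop in threading._Condition looks
roughly like this (paraphrased from memory of the 2.x source, so
treat it as a sketch rather than a verbatim copy):

import time

def timed_wait(waiter, timeout):
    # 'waiter' is a lock currently held by another thread.  Poll it
    # without blocking, sleeping between attempts for exponentially
    # growing intervals capped at 50 ms (and at the time remaining).
    endtime = time.time() + timeout
    delay = 0.0005                       # first sleep is about 1 ms
    while True:
        if waiter.acquire(0):            # non-blocking attempt
            return True
        remaining = endtime - time.time()
        if remaining <= 0:
            return False                 # timed out
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)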

-Peter
 

Paul L. Du Bois

Peter said:
I don't believe the term "busy-wait" applies here.
[Explanation]

Yes, well, you're right. I was thinking of calling it
"slacker-waiting" but didn't want to come off too cute.

p
 

Antoon Pardon

On 2005-03-24, Paul L. Du Bois said:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses os-specific APIs (eg
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.

I started once, using the Timer class in the threading module to
break the lock. However, the Timer class uses the same kind of
sleep-polling loop to delay the execution and allow an intermediate
cancel as the loop that is used in Queue.Queue, so that was no
gain.

I have played with the idea since then, but haven't worked anything
out.
 

Paul Rubin

Antoon Pardon said:
I started once, using the Timer class in the threading module to
break the lock. However, the Timer class uses the same kind of
sleep-polling loop to delay the execution and allow an intermediate
cancel as the loop that is used in Queue.Queue, so that was no
gain.

I've never checked this code but it wouldn't have occurred to me that
Queue uses any kind of timeout loop. Can't it work the obvious way
with a semaphore?
 

Antoon Pardon

On 2005-03-25, Paul Rubin said:
I've never checked this code but it wouldn't have occurred to me that
Queue uses any kind of timeout loop. Can't it work the obvious way
with a semaphore?

And how is this semaphore going to be released if the timeout is
reached?
 

Paul Rubin

Antoon Pardon said:
And how is this semaphore going to be released if the timeout is
reached?

I meant a semaphore to synchronize the queue when adding or removing
objects. Timeout would be handled with sigalarm or select.
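
Something like this untested sketch (the class is my own illustration,
not the real Queue module): it blocks without any polling, but note
that there is no way to give get() a timeout, which is where sigalarm
or select would have to come in.

import collections
import threading

class SemaphoreQueue:
    def __init__(self):
        self._items = collections.deque()
        self._lock = threading.Lock()             # protects the deque
        self._available = threading.Semaphore(0)  # counts queued items

    def put(self, item):
        self._lock.acquire()
        try:
            self._items.append(item)
        finally:
            self._lock.release()
        self._available.release()                 # wake one waiting get()

    def get(self):
        self._available.acquire()                 # blocks, but no timeout
        self._lock.acquire()
        try:
            return self._items.popleft()
        finally:
            self._lock.release()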
 

Antoon Pardon

On 2005-03-25, Paul Rubin said:
I meant a semaphore to synchronize the queue when adding or removing
objects.

Last I looked there was a lock used for that.

The loop is only for when you can't remove or add an element immediately
and there is a timeout.
Timeout would be handled with sigalarm or select.

How is select going to help? IMO you can't put a Queue in a select call.
And it is doubtful whether working with sigalarm will do the trick.

First of all, there is the problem that the signal module in Python is
very limited. IIRC all signals are routed to the main thread, so
breaking a lock by signalling the thread is impossible in Python.

You may provide your own signal module, but that may not be enough.
The last time I experimented with pthreads in C, locks weren't broken
by signalling the thread. That might be a bug, but I wouldn't know,
since I'm not familiar with the pthread specification.
 

Paul Rubin

Antoon Pardon said:
Last I looked there was a lock used for that.

OK, that amounts to the same thing.
The loop is only for when you can't remove or add an element immediately
and there is a timeout.

How is select going to help? IMO you can't put a Queue in a select call.
And it is doubtful whether working with sigalarm will do the trick.

You could open a socket to your own loopback port and then select on
it, or something like that. The select call takes a timeout parameter.
First of all, there is the problem that the signal module in Python is
very limited. IIRC all signals are routed to the main thread, so
breaking a lock by signalling the thread is impossible in Python.

A signal handler in the main thread could release a lock that the
thread is waiting on.
 

Antoon Pardon

On 2005-03-25, Paul Rubin said:
OK, that amounts to the same thing.


You could open a socket to your own loopback port and then select on
it, or something like that. The select call takes a timeout parameter.

Well maybe you could use an os.pipe as a timeout lock then. When the lock is
instantiated you put one byte in it. Acquiring the lock is implemented by
reading one byte, releasing the lock is implemented by writing a byte.
Acquiring the lock with a timeout would make use of select.

It would require careful coding, since you want to prevent the thread
from blocking when select returns indicating the pipe can be read, but
another thread has already consumed the byte between the select and
the actual read.
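
Something along these lines (an untested sketch, the names are
invented for illustration); the non-blocking read is there to handle
exactly that race:

import errno
import fcntl
import os
from select import select

class PipeLock:
    def __init__(self):
        self._rfd, self._wfd = os.pipe()
        # Make the read end non-blocking, so losing the race between
        # select() and read() gives EAGAIN instead of blocking forever.
        flags = fcntl.fcntl(self._rfd, fcntl.F_GETFL)
        fcntl.fcntl(self._rfd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        os.write(self._wfd, '-')           # one byte in the pipe: unlocked

    def acquire(self, timeout=None):
        while True:
            rd, wr, er = select([self._rfd], [], [], timeout)
            if not rd:
                return False               # timed out
            try:
                os.read(self._rfd, 1)      # got the byte: lock is ours
                return True
            except OSError, e:
                if e.errno != errno.EAGAIN:
                    raise
                # Another thread consumed the byte between select()
                # and read(); go around again (recomputing the
                # remaining timeout is left out of this sketch).

    def release(self):
        os.write(self._wfd, '-')           # put the byte back
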
A signal handler in the main thread could release a lock that the
thread is waiting on.

This wouldn't work. A thread would have no way of knowing why the
lock was released: whether it was released by the thread holding it
or by the signal handler, both cases look the same to the thread
acquiring the lock.

There is also the problem that you can't direct which thread will
acquire the lock. Suppose you have two threads waiting on a
lock, one plain, one with a timeout. Your signal handler
kicks in and releases the lock. There is a good chance
the first thread will now acquire the lock and the thread
that used a timeout will remain blocked.
 

Paul Rubin

Antoon Pardon said:
Well maybe you could use an os.pipe as a timeout lock then. When the lock is
instantiated you put one byte in it. Acquiring the lock is implemented by
reading one byte, releasing the lock is implemented by writing a byte.
Acquiring the lock with a timeout would make use of select.

Hmm, if I understand what you're saying, you'd need a separate pipe
for every lock. Interesting idea but I think it would burn too many
file descriptors if you used a lot of them. My mentioning select also
is showing my age. That was the way of doing short sleeps before
usleep became widespread.
This wouldn't work. A thread would have no way of knowing why the
lock was released: whether it was released by the thread holding it
or by the signal handler, both cases look the same to the thread
acquiring the lock.

Yes, you'd need a separate lock for each blocked thread. There would
be a list of locks waiting for timeouts and the sigalarm handler would
release any for which a wakeup was due. You could use a priority
queue to maintain the timeout list, so that adding or servicing a
timeout would be O(log n).
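
Very roughly, the bookkeeping could look like this (an untested sketch
with invented names; signal.alarm only has whole-second resolution,
the heap would need protecting against the handler interrupting
add_timeout, and entries whose locks get released normally should
really be removed, so a serious version wants C-level help):

import heapq
import signal
import time

_timeouts = []                        # heap of (deadline, lock) pairs

def add_timeout(lock, timeout):
    # Called just before a thread blocks on its per-thread lock.
    heapq.heappush(_timeouts, (time.time() + timeout, lock))
    _rearm()

def _on_alarm(signum, frame):
    # Runs in the main thread: release every lock whose deadline passed.
    now = time.time()
    while _timeouts and _timeouts[0][0] <= now:
        deadline, lock = heapq.heappop(_timeouts)
        if lock.locked():
            lock.release()            # wakes the thread blocked on it
    _rearm()

def _rearm():
    if _timeouts:
        delay = max(1, int(_timeouts[0][0] - time.time()))
        signal.alarm(delay)

signal.signal(signal.SIGALRM, _on_alarm)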

The current implementation appears to wake up every few msec (50 msec
maximum if the thread stays blocked a long time) and check if the
timeout has expired. If you have 1000 threads doing that, it can
burn a fair amount of cycles.
 

Antoon Pardon

On 2005-03-25, Paul Rubin said:
Hmm, if I understand what you're saying, you'd need a separate pipe
for every lock. Interesting idea but I think it would burn too many
file descriptors if you used a lot of them. My mentioning select also
is showing my age. That was the way of doing short sleeps before
usleep became widespread.


Yes, you'd need a separate lock for each blocked thread. There would
be a list of locks waiting for timeouts and the sigalarm handler would
release any for which a wakeup was due. You could use a priority
queue to maintain the timeout list, so that adding or servicing a
timeout would be O(log n).

Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism. I didn't
like using a signal handler because you never know what other
modules might have a use for signals and how they might interfere.
Try it out and let me know what you think.


Not thoroughly tested:

------------------------------- tlock.py --------------------------------------

import threading
import os

import time

from heapq import heappush, heappop
from weakref import ref

from select import select

heapmutex = threading.Lock()
heap = []
heappipe = os.pipe()
sentinel = 365.25 * 50 * 24 * 3600 # 50 years
heappush(heap, (time.time() + sentinel, None, None, None)) # sentinel entry so heap[0] always exists

class _Plock:

    def __init__(self):
        self.mutex = threading.Lock()
        self.broken = False

    def acquire(self):
        self.mutex.acquire()

    def release(self):
        self.mutex.release()

class TimeOut(Exception):
    pass


class Tlock:

    def __init__(self):
        self.mutex = threading.Lock()
        self.locktable = [_Plock()]

    def acquire(self, timeout=None):
        self.mutex.acquire()
        newlock = _Plock()
        newlock.acquire()
        self.locktable.append(newlock)
        prevlock = self.locktable[-2]
        if len(self.locktable) > 2 and timeout is not None:
            heapmutex.acquire()
            heappush(heap, (time.time() + timeout, ref(prevlock), self.mutex, self.locktable))
            os.write(heappipe[1], '-')
            heapmutex.release()
        self.mutex.release()
        prevlock.acquire()
        if prevlock.broken:
            raise TimeOut, "lock timed out"

    def release(self):
        self.mutex.acquire()
        self.locktable[0].release()
        del self.locktable[0]
        self.locktable[0].release()
        self.mutex.release()


def lock_breaker():

    heapfd = heappipe[0]
    while True:
        heapmutex.acquire()
        waketime, pl, mutex, table = heap[0]
        timeout = waketime - time.time()
        while timeout <= 0.0:
            lck = pl()
            if lck is not None:
                mutex.acquire()
                try:
                    try:
                        i = table.index(lck, 1)
                        del table
                        lck.broken = True
                        lck.release()
                        # lck.release()
                    except ValueError:
                        pass
                finally:
                    mutex.release()
            heappop(heap)
            waketime, pl, mutex, table = heap[0]
            timeout = waketime - time.time()
        heapmutex.release()
        rdlst, wrlst, erlst = select([heapfd],[],[],timeout)
        if rdlst:
            os.read(rdlst[0],1)

breaker = threading.Thread(target = lock_breaker)
breaker.setDaemon(True)
breaker.start()

if __name__ == "__main__":

    from time import sleep
    from random import randint

    T = Tlock()

    rg = 5

    def thrd(Id):

        for Nr in xrange(20):
            try:
                print "Trying %d (loop %d)" % (Id, Nr)
                T.acquire(randint(0,rg))
                print "Entering %d (loop %d)" % (Id, Nr)
                sleep(randint(0,rg))
                print "Leaving %d (loop %d)" % (Id, Nr)
                T.release()
            except TimeOut, ErrId:
                print "Failed %d (loop %d)" % (Id, Nr)
            sleep(randint(0,rg))


    for i in xrange(rg):
        th = threading.Thread(target=thrd, args=(i,))
        th.start()
 

Paul Rubin

Antoon Pardon said:
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism.

Thanks, I'll look at it. Why don't you use usleep instead of a pipe?
I decided over the weekend that using a separate thread with usleep is
the simplest thing to do in pure Python. If you want to use sigalarm,
that should be put in a low-level C sigalarm handler so it gets
taken care of separately from whatever the Python interpreter does with
the signal. Locks should also be low-level primitives.

I don't know how this stuff maps onto PyPy but I sincerely hope the
looping stuff goes away. Having a series of processing steps
connected by queues is a perfectly good way to organize a program, but
if there's even just 20 steps, then waiting for all those 50 msec
wakeups adds a whole second to the processing latency if the system
has been sleeping for a while. The latency really only needs to be in
the microseconds on a system with efficient enough threading.
 

Antoon Pardon

On 2005-03-29, Paul Rubin said:
Thanks, I'll look at it. Why don't you use usleep instead of a pipe?

Because with the pipe the "sleep" can be indeterminate.

The select makes the thread sleep until either of the following
happens.

1) A timeout, which means one of the locks has to be broken

2) A byte was received. This means a thread tried to acquire
   a lock and an entry was inserted in the heap, so the timeout
   may need to be recalculated. (Acquiring a lock sends a
   byte over the pipe.)
I decided over the weekend that using a separate thread with usleep is
the simplest thing to do in pure Python.

I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.
 

Antoon Pardon

On 2005-03-29, Antoon Pardon said:
Because with the pipe the "sleep" can be indeterminate.

The select makes the thread sleep until either of the following
happens.

1) A timeout, which means one of the locks has to be broken

2) A byte was received. This means a thread tried to acquire
   a lock and an entry was inserted in the heap, so the timeout
   may need to be recalculated. (Acquiring a lock sends a
   byte over the pipe.)


I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.

And here is a small patch for it. It corrects the acquiring and
releasing of the heapmutex.

--- tlock.py 2005-03-29 14:25:09.000000000 +0200
+++ src/python/tlock.py 2005-03-29 14:25:43.000000000 +0200
@@ -67,6 +67,7 @@
         heapmutex.acquire()
         waketime, pl, mutex, table = heap[0]
         timeout = waketime - time.time()
+        heapmutex.release()
         while timeout <= 0.0:
             lck = pl()
             if lck is not None:
@@ -82,10 +83,11 @@
                     pass
                 finally:
                     mutex.release()
+            heapmutex.acquire()
             heappop(heap)
             waketime, pl, mutex, table = heap[0]
             timeout = waketime - time.time()
-        heapmutex.release()
+            heapmutex.release()
         rdlst, wrlst, erlst = select([heapfd],[],[],timeout)
         if rdlst:
             os.read(rdlst[0],1)
@@ -107,17 +109,17 @@
 
         for Nr in xrange(20):
             try:
-                print "Trying %d (loop %d)" % (Id, Nr)
+                print "Trying %2d (loop %2d)" % (Id, Nr)
                 T.acquire(randint(0,rg))
-                print "Entering %d (loop %d)" % (Id, Nr)
+                print "Entering %2d (loop %2d)" % (Id, Nr)
                 sleep(randint(0,rg))
-                print "Leaving %d (loop %d)" % (Id, Nr)
+                print "Leaving %2d (loop %2d)" % (Id, Nr)
                 T.release()
             except TimeOut, ErrId:
-                print "Failed %d (loop %d)" % (Id, Nr)
+                print "Failed %2d (loop %2d)" % (Id, Nr)
             sleep(randint(0,rg))
 
 
-    for i in xrange(rg):
+    for i in xrange(5 * rg):
         th = threading.Thread(target=thrd, args=(i,))
         th.start()
 

Paul Rubin

Antoon Pardon said:
I'm not going to call my solution simple, but it wastes very few
cycles. If no thread is blocked on a lock, the select will just
block until that changes. No need for some kind of polling loop.

I think I understand. My original idea was to use a heapq to be able
to know exactly when the next pending timeout is due, and usleep for
long enough to wake up at just the right time. Then you service the
timeout, pop the heap to find when the next timeout after that is, and
usleep again. No polling loops and no pipe. But it could be that
someone inserts a new timeout while you're sleeping, that's due before
you're scheduled to wake up. Your pipe scheme takes care of that,
since any thread can write to the pipe and wake up the blocked thread
at any time.

Really, the culprit here is the weak signalling scheme in Python.
There needs to be a way to send signals to threads, or raise
asynchronous exceptions in them. There's been some discussion in
sourceforge about that, but the issues involved are complex.

I think the best bet for the short term is handling it at the C level,
with sigalarm. Another way is to have chained sigalarm handlers in
the main thread.
 

Antoon Pardon

On 2005-03-29, Paul Rubin said:
I think I understand. My original idea was to use a heapq to be able
to know exactly when the next pending timeout is due, and usleep for
long enough to wake up at just the right time. Then you service the
timeout, pop the heap to find when the next timeout after that is, and
usleep again. No polling loops and no pipe. But it could be that
someone inserts a new timeout while you're sleeping, that's due before
you're scheduled to wake up. Your pipe scheme takes care of that,
since any thread can write to the pipe and wake up the blocked thread
at any time.

Right, that is the idea.
Really, the culprit here is the weak signalling scheme in Python.
There needs to be a way to send signals to threads, or raise
asynchronous exceptions in them. There's been some discussion in
sourceforge about that, but the issues involved are complex.

Well, I have raised this issue before and as far as I understand,
the big problem seems to be the various kinds of behaviour you
can get depending on what platform you are working on, so writing
a module so that Python programs behave the same on various
platforms seems a hell of a job.

So I decided not to pester the Python people for this kind
of functionality, although I would very much like to have
it.

I have been playing with the C API and have some kind of
class that allows one thread to raise an exception in
another, but that wouldn't be a solution here, since the
raised exception will not manifest itself while the
thread is in a C function.
I think the best bet for the short term is handling it at the C level,
with sigalarm. Another way is to have chained sigalarm handlers in
the main thread.

Possible, but I don't have the time to investigate that
possibility now.
 

Paul Rubin

Antoon Pardon said:
Well, I have raised this issue before and as far as I understand, the
big problem seems to be the various kinds of behaviour you can get
depending on what platform you are working on, so writing a module so
that Python programs behave the same on various platforms seems a
hell of a job.

I think there are some issues that are even more fundamentally
confusing. It's hard to figure out exactly what behavior is
desirable, let alone how to implement it.
So I decided not to pester the Python people for this kind of
functionality, although I would very much like to have it.

Yes, I hope something happens sometime, but I don't have any immediate
concrete suggestions. I'm not enough of a concurrency whiz to know how
other languages handle it.
I have been playing with the C API and have some kind of class that
allows one thread to raise an exception in another, but that wouldn't
be a solution here, since the raised exception will not manifest
itself while the thread is in a C function.

Yes, that sounds hairy if done in a general way.
Possible, but I don't have the time to investigate that possibility now.

Actually there's a simple and obvious approach: Linux and Windows both
already implement semaphore objects with timeouts (see "man semop"
under Linux). Other modern Unixes probably also have them. So I'd
think it would be straightforward to just make a C module that wraps
these semaphores with the C API.
 

Antoon Pardon

On 2005-03-30, Paul Rubin said:
Actually there's a simple and obvious approach: Linux and Windows both
already implement semaphore objects with timeouts (see "man semop"
under Linux). Other modern Unixes probably also have them. So I'd
think it would be straightforward to just make a C module that wraps
these semaphores with the C API.

I'm not sure that this would be an acceptable approach. I did a man
semop and it indicates this is part of System V IPC. This makes me
fear that semaphores will use file descriptors or other resources
that are only available in a limited amount. Not useful if you are
talking about thousands of threads.
 

Paul Rubin

Antoon Pardon said:
I'm not sure that this would be an acceptable approach. I did a man
semop and it indicates this is part of System V IPC. This makes me
fear that semaphores will use file descriptors or other resources
that are only available in a limited amount. Not useful if you are
talking about thousands of threads.

That would be terrible, if semaphores are as heavy as file descriptors.
I'd like to hope the OS's are better designed than that.

So, if we have to do this at user level, the two best choices seem to
be either your pipe method, or an asynchronous sigalarm handler that
goes and releases any timed out locks. The sigalarm method is
conceptually cleaner, but it involves hacking C code, so it's not so
easy. Plus I think using sigalarm results in more total system calls
if there's lots of timeouts. I do believe that the current scheme
with the slow-motion spinlocks is pretty revolting and that any of the
alternatives we've discussed are better.

I wonder what the PyPy folks are doing. It would be great if they
have some general treatment of asynchronous exceptions.
 
