Op 2005-03-25 said:
Hmm, if I understand what you're saying, you'd need a separate pipe
for every lock. Interesting idea but I think it would burn too many
file descriptors if you used a lot of them. My mentioning select also
is showing my age. That was the way of doing short sleeps before
usleep became widespread.
Yes, you'd need a separate lock for each blocked thread. There would
be a list of locks waiting for timeouts and the sigalarm handler would
release any for which a wakeup was due. You could use a priority
queue to maintain the timeout list, so that adding or servicing a
timeout would be O(log n).
Well, have a look at what I have written over the weekend. It uses
a separate thread with one pipe as a wakeup mechanism. I didn't
like using a signal handler because you never know what other
modules might use signals for and how they might interfere.
Try it out and let me know what you think.
Not thoroughly tested:
------------------------------- tlock.py --------------------------------------
import threading
import os
import time
from heapq import heappush, heappop
from weakref import ref
from select import select
# Shared state of the timeout machinery.
sentinel = 365.25 * 50 * 24 * 3600  # 50 years, expressed in seconds
# Taken around accesses to ``heap`` by the code that queues and services timeouts.
heapmutex = threading.Lock()
# (read fd, write fd); a byte written to the write end interrupts the
# select() call that waits on the read end.
heappipe = os.pipe()
# Priority queue of (wake time, weakref to lock, table mutex, lock table).
# It starts with one far-future placeholder entry so that heap[0] exists;
# a one-element list already satisfies the heap invariant.
heap = [(time.time() + sentinel, None, None, None)]
class _Plock:
    """A plain lock paired with a ``broken`` marker."""

    def __init__(self):
        # Becomes True when the wait on this lock was ended by a timeout
        # instead of by a normal release.
        self.broken = False
        self.mutex = threading.Lock()

    def acquire(self):
        """Block until the underlying lock has been obtained."""
        self.mutex.acquire()

    def release(self):
        """Release the underlying lock."""
        self.mutex.release()
class TimeOut(Exception):
    """Raised when a lock could not be obtained before its timeout expired."""
class Tlock:
    """Mutual-exclusion lock whose ``acquire`` can give up after a timeout.

    The lock is a chain of ``_Plock`` links kept in ``locktable``.  Every
    caller of ``acquire`` appends a fresh, already-locked link and then
    waits on the link that was last in the chain before its own.
    ``release`` drops the head link and unlocks the next one, which wakes
    the oldest waiter.  ``mutex`` protects ``locktable``.
    """

    def __init__(self):
        self.mutex = threading.Lock()
        # The initial link is unlocked, so the first acquire() succeeds at once.
        self.locktable = [_Plock()]

    def acquire(self, timeout=None):
        """Obtain the lock, waiting at most ``timeout`` seconds.

        ``timeout=None`` waits indefinitely.  Raises ``TimeOut`` if the wait
        was broken because the timeout expired.
        """
        newlock = _Plock()
        newlock.acquire()
        with self.mutex:
            self.locktable.append(newlock)
            prevlock = self.locktable[-2]
            # More than two links means somebody currently holds the lock.
            contended = len(self.locktable) > 2
        if contended and timeout is not None:
            # Queue the timeout only after self.mutex has been dropped: the
            # breaker thread takes heapmutex first and this mutex second, so
            # nesting them the other way round here could deadlock.
            with heapmutex:
                heappush(heap, (time.time() + timeout, ref(prevlock),
                                self.mutex, self.locktable))
                # Wake the breaker thread so it recomputes its sleep time.
                os.write(heappipe[1], b'-')
        prevlock.acquire()
        if prevlock.broken:
            raise TimeOut("lock timed out")

    def release(self):
        """Release the lock and hand it to the oldest waiter, if any.

        ``self.mutex`` is released even if unlocking a link raises, e.g.
        when the lock is not currently held.
        """
        with self.mutex:
            self.locktable[0].release()
            del self.locktable[0]
            # Unlocking the new head lets the thread waiting on it proceed.
            self.locktable[0].release()
def _pop_expired():
    """Pop every due heap entry.

    Returns ``(entries, remaining)`` where ``remaining`` is the number of
    seconds until the next entry is due, or None if the heap is empty.
    """
    expired = []
    with heapmutex:
        while heap:
            remaining = heap[0][0] - time.time()
            if remaining > 0.0:
                return expired, remaining
            expired.append(heappop(heap))
    return expired, None


def _break_lock(pl, mutex, table):
    """Break the lock behind weak reference ``pl`` if it is still waited on."""
    # pl is None for the placeholder entry; pl() is None once the lock is gone.
    lck = pl() if pl is not None else None
    if lck is None:
        return
    with mutex:
        try:
            # Index 0 is the link of the current holder; only links further
            # down the chain still have a thread blocked on them.
            i = table.index(lck, 1)
        except ValueError:
            return
        # Remove the link itself from the chain ("del table" would merely
        # unbind the local name and leave the chain stuck on this link).
        del table[i]
        lck.broken = True
        lck.release()


def lock_breaker():
    """Thread body: break locks whose timeout has expired, forever.

    Sleeps in select() until the earliest queued timeout is due or a byte
    arrives on the wake-up pipe, which signals that a new timeout was queued.
    """
    heapfd = heappipe[0]
    while True:
        expired, timeout = _pop_expired()
        # heapmutex is no longer held here: taking a table mutex while
        # holding it could deadlock against a thread that holds the table
        # mutex and is waiting for heapmutex.
        for _waketime, pl, mutex, table in expired:
            _break_lock(pl, mutex, table)
        # timeout is None only when the heap is empty; select() then blocks
        # until the pipe becomes readable.
        rdlst, wrlst, erlst = select([heapfd], [], [], timeout)
        if rdlst:
            os.read(rdlst[0], 1)
# Start the timeout service in the background.  It is marked as a daemon
# thread so that it does not keep the interpreter alive on its own.
breaker = threading.Thread(target=lock_breaker)
breaker.setDaemon(True)
breaker.start()
if __name__ == "__main__":
    # Demo: several threads compete for one Tlock using random timeouts
    # and random hold times.
    from time import sleep
    from random import randint

    T = Tlock()
    rg = 5

    def thrd(Id):
        """Worker: try 20 times to take T, hold it briefly, then let go."""
        for Nr in range(20):
            print("Trying %d (loop %d)" % (Id, Nr))
            try:
                T.acquire(randint(0, rg))
            except TimeOut:
                print("Failed %d (loop %d)" % (Id, Nr))
            else:
                try:
                    print("Entering %d (loop %d)" % (Id, Nr))
                    sleep(randint(0, rg))
                    print("Leaving %d (loop %d)" % (Id, Nr))
                finally:
                    # Always give the lock back, even if the body fails.
                    T.release()
            sleep(randint(0, rg))

    for i in range(rg):
        th = threading.Thread(target=thrd, args=(i,))
        th.start()