A
Antoon Pardon
The queue and condition class allow threads to wait only a limited
time. However this currently is implemented by a polling loop.
Now for those who like to avoid polling I have here a Tlock
class that allows for a timeout but doesn't use polling.
All comments are welcome.
----------------------- Tlock.py -------------------------------
import threading
class TimeOut(Exception):
pass
class Tlock:
def __init__(self):
self.mutex = threading.Lock()
self.lktab = [threading.Lock()]
def acquire(self, timeout=None):
class SC:
pass
def break_lock(sc, mutex, tab):
mutex.acquire()
try:
try:
i = tab.index(sc.pl, 1)
del tab
sc.broken = True
sc.pl.release()
sc.pl.release()
except ValueError:
pass
finally:
mutex.release()
self.mutex.acquire()
sc=SC()
sc.ll = threading.Lock()
sc.ll.acquire()
self.lktab.append(sc.ll)
sc.pl = self.lktab[-2]
sc.broken = False
if len(self.lktab) > 2 and timeout is not None:
tm = threading.Timer(timeout, break_lock,
args=[sc, self.mutex, self.lktab])
tm.start()
else:
tm = None
self.mutex.release()
sc.pl.acquire()
if sc.broken:
raise TimeOut, "lock timed out"
else:
if tm is not None:
tm.cancel()
def release(self):
self.mutex.acquire()
self.lktab[0].release()
del self.lktab[0]
self.lktab[0].release()
self.mutex.release()
if __name__ == "__main__":
from time import sleep
from random import randint
T = Tlock()
def thrd(Id):
for _ in xrange(100):
try:
print "Trying %d" % Id
T.acquire(randint(0,6))
print "Entering %d" % Id
sleep(randint(0,6))
print "Leaving %d" % Id
T.release()
except TimeOut, ErrId:
print "Failed %d" % Id
sleep(randint(0,6))
for i in xrange(5):
th = threading.Thread(target=thrd, args=(i,))
th.start()
time. However this currently is implemented by a polling loop.
Now for those who like to avoid polling I have here a Tlock
class that allows for a timeout but doesn't use polling.
All comments are welcome.
----------------------- Tlock.py -------------------------------
import threading
class TimeOut(Exception):
pass
class Tlock:
def __init__(self):
self.mutex = threading.Lock()
self.lktab = [threading.Lock()]
def acquire(self, timeout=None):
class SC:
pass
def break_lock(sc, mutex, tab):
mutex.acquire()
try:
try:
i = tab.index(sc.pl, 1)
del tab
sc.broken = True
sc.pl.release()
sc.pl.release()
except ValueError:
pass
finally:
mutex.release()
self.mutex.acquire()
sc=SC()
sc.ll = threading.Lock()
sc.ll.acquire()
self.lktab.append(sc.ll)
sc.pl = self.lktab[-2]
sc.broken = False
if len(self.lktab) > 2 and timeout is not None:
tm = threading.Timer(timeout, break_lock,
args=[sc, self.mutex, self.lktab])
tm.start()
else:
tm = None
self.mutex.release()
sc.pl.acquire()
if sc.broken:
raise TimeOut, "lock timed out"
else:
if tm is not None:
tm.cancel()
def release(self):
self.mutex.acquire()
self.lktab[0].release()
del self.lktab[0]
self.lktab[0].release()
self.mutex.release()
if __name__ == "__main__":
from time import sleep
from random import randint
T = Tlock()
def thrd(Id):
for _ in xrange(100):
try:
print "Trying %d" % Id
T.acquire(randint(0,6))
print "Entering %d" % Id
sleep(randint(0,6))
print "Leaving %d" % Id
T.release()
except TimeOut, ErrId:
print "Failed %d" % Id
sleep(randint(0,6))
for i in xrange(5):
th = threading.Thread(target=thrd, args=(i,))
th.start()