UponAcquiring synchro. class

C

castironpi

from __future__ import with_statement
'''
3) upon_acquiring( lockA, lockB )( function, *ar, **kwar )

upon_acquiring spawns new thread upon acquiring locks A and B. Locks
may be specified in any order, as none is acquired until all are free.

The options to spawn a new thread upon call, lock, and not release
until "it's its turn"; just block until then; and vary honoring order
are open.

6) @with_self
Prepends the function object to itself to the parameter list

'''
'''
a lockA lockB
b lockA lockB
c lockA lockB
d lockA lockC
e lockB lockD
f lockE lockF

assert a< b
assert b< c
assert c< d
assert c< e
assert f in.
'''
'''
A (A,B)1 (A,C)2 (A,B)3
B (A,B)1 (A,B)3
C (A,C)2 (C,D,E)4
D (C,D,E)4
E (C,D,E)4
a.lock: A, a.state: Free, a.waiters: [ X1(a,b), X2(a,c), X3(a,b) ]
b.lock: B, b.state: Free, b.waiters: [ X1(a,b), X3(a,b) ]
c.lock: C, c.state: Free, X3.waiters: [ X2(a,c), X4(c,d,e) ]
d.lock: D, d.state: Free, X3.waiters: [ X4(c,d,e) ]
e.lock: E, e.state: Free, X3.waiters: [ X4(c,d,e) ]

acq a,b
x1= a,b
a.waiters+= x1
b.waiters+= x1
#same as
if a.state is free and b.state is free:
a.state= taken
b.state= taken
a.waiters-= x1
b.waiters-= x1
a.lock.release() #acq?
b.lock.release() #acq?
x1.lock.release()
acq a,c
x2= a,c
a.waiters+= x2
c.waiters+= x2
if a.state is free and c.state is free:
a.state= taken
c.state= taken
a.waiters-= x2
c.waiters-= x2
a.lock.release() #acq?
c.lock.release() #acq?
x2.lock.release()
acq a,b
x3= a,b
a.waiters+= x3
b.waiters+= x3
acq c,d,e
x4= c,d,e
c.waiters+= x4
d.waiters+= x4
e.waiters+= x4
'''

from thread import start_new_thread
from threading import Lock, Thread, Event
import time
from functools import partial

class LockDeps:
def __init__( self, lock, state, waiters ):
self.lock, self.state, self.waiters= \
lock, state, waiters
class LockSet:
#ok to use them elsewhere, just gets in line.
def __init__( self, *locks ):
self._locks= locks
self._lock= Lock()
self._lock.acquire()
self._remains= set( locks )
self._doneevt= Event()
self.th= None
self.retval= None
def release( self, *locks ):
for lock in locks:
lock.release()
self._remains.remove( lock )
def releaseall( self ):
for lock in self._remains:
lock.release()
self._remains.clear()

class UponAcquiring:
def __init__( self ):
self._deps= {}
self._oplock= Lock()
def acq( self, *locks ):
lckset= LockSet( *locks )
return partial( self._enqueue, lckset )
def _enqueue( self, lckset, fun, *ar, **kwar ):
with self._oplock:
for lock in lckset._locks:
dep= self._deps.get( lock )
if None is dep:
dep= LockDeps( lock, False, [] )
self._deps[ lock ]= dep
dep.waiters.append( lckset )
th= Thread( target= self._functhd,
args= ( lckset, fun )+ ar,
kwargs= kwar )
lckset.th= th
th.start()
self._analyze( lckset )
return lckset
def _functhd( self, lckset, fun, *ar, **kwar ):
try:
with lckset._lock:
lckset.retval=\
fun( lckset, *ar, **kwar )
lckset._doneevt.set()
finally:
with self._oplock:
lckset.releaseall()
for lock in lckset._locks:
self._deps[ lock ].state= False
self._analyze( lckset )
def _analyze( self, lckset ):
with self._oplock:
for lock in lckset._locks:
dep= self._deps[ lock ]
if dep.state: continue
for lckset in dep.waiters:
assert lock in lckset._locks
for lock2 in lckset._locks:
if self._deps[ lock2 ].state:
break
else:
for lock2 in lckset._locks:
dep2= self._deps[ lock2 ]
dep2.state= True
assert dep2.waiters.count(
lckset )== 1
dep2.waiters.remove(
lckset )
lock2.acquire()
lckset._lock.release()
break

results= []
ver= results.index
lcksets= set()
import random
from sys import stdout
def callback( locks, i ):
stdout.write( 'cb%i '% i )
time.sleep( random.uniform( 0, .01 ) )
results.append( i )
if random.choice( [ False, True ] ):
locks.releaseall()
#if random.random()< 0.1:
# raise Exception()

while 1:
class Case1:
lockA, lockB, lockC= Lock(), Lock(), Lock()
lockD, lockE, lockF= Lock(), Lock(), Lock()
a= ( lockA, lockB )
b= ( lockA, lockB )
c= ( lockA, lockB )
d= ( lockA, lockC )
e= ( lockB, lockD )
f= ( lockE, lockF )

ua= UponAcquiring()
for i, x in enumerate( [ a, b, c, d, e, f ] ):
lcksets.add( ua.acq( *x )( callback, i ) )

for lckset in lcksets:
lckset.th.join()
stdout.write( repr( results ) )
stdout.write( '\n' )
stdout.flush()
assert ver( 0 )< ver( 1 )
assert ver( 1 )< ver( 2 )
assert ver( 2 )< ver( 3 )
assert ver( 2 )< ver( 4 )
assert len( set( results ) )== len( results )
'''permissible orders e.g.:
[0, 5, 1, 2, 3, 4]
[5, 0, 1, 2, 3, 4]
[0, 5, 1, 2, 4, 3]
[5, 0, 1, 2, 4, 3]
'''
del results[:]
lcksets.clear()
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,770
Messages
2,569,584
Members
45,075
Latest member
MakersCBDBloodSupport

Latest Threads

Top