is there enough information?

Dennis Lee Bieber

I want to waken a specific thread that's waiting on a S.O., synchro
object, based on an index.
Where is the thread? The idea of threads is that they have
independent "threads of control" -- but you seem to be passing some
object to a pair of functions that are supposed to take turns processing
the object. Are each of those functions running as a thread -- on some
non-thread data object?

There are no synchronization primitives visible anywhere in your
sample code (with does not do synchronization or parallelism).
From the docs:
Semaphore acquire: "The implementation may pick one at random."
If multiple threads are /blocked/ on a semaphore, yes... When the
semaphore is freed, any one of the blocked threads will be next. But I
don't see multiple threads in your sample code.
Thus semaphores will not do, outside of an array of them. Did I miss
something?
Passing the object around on queues, perhaps? One queue (and step
handler) per step.

def handler(step, inq, outq):
    while True:
        obj = inq.get()
        if obj is None:
            outq.put(obj)       # pass the end-of-work signal down the line
            break
        obj.step[step]()
        outq.put(obj)

hthrd1 = threading.Thread(target=handler, args=(1, stepQ1, stepQ2)) #pseudocode
hthrd2 = threading.Thread(target=handler, args=(2, stepQ2, stepQ3))
....

o1 = myProcessObject()
o2 = myProcessObject()

stepQ1.put(o1)
stepQ1.put(o2)

hthrd1.start()
hthrd2.start()

stepQ1.put(None)

while True:
    res = resultQ.get()
    if res is None: break
    do stuff with results

A: I want a specific thread.
B: Semaphores pick one at random.
||| I don't want semaphores.
--
Wulfraed Dennis Lee Bieber KD6MOG
(e-mail address removed) (e-mail address removed)
HTTP://wlfraed.home.netcom.com/
(Bestiaria Support Staff: (e-mail address removed))
HTTP://www.bestiaria.com/
 
Dennis Lee Bieber

The relevant snippet is:

def thloop( thd ):
    while thd.cont:
        with thd.step[1]:
            if not thd.cont: break
            print( 'step 1', end= ' ' )
            thd.ret= thd.cmd+ 1
        with thd.step[3]:
            print( 'step 3' )
            thd.ret= None
        thd.step.complete()

def op100( thd ):
    with thd.step[0]:
        print( 'step 0', end= ' ' )
        thd.cmd= 100
    with thd.step[2]:
        print( 'step 2', end= ' ' )
        ret1= thd.ret
    assert ret1== 101


Show us the code for "thd" -- we have no idea of where thd.cont is
set... And how is "thd.step[x]" -- a list object, suddenly converted to
some other component object with a "thd.step.complete"?

The major thing I get out of this is that your "threads", which
should have a complete process sequence /within/ them, are being passed
around to a pair of functions which are trying to control the inner
workings of the threads. Where are op100 and thloop invoked? In separate
threads? Then why is the parameter called "thd".

Which is, it seems, totally backwards... Also... to my knowledge,
the "with" construct (I'm still on Python 2.4 and don't have "with") is
NOT the same thing as a Java "Synchronized" object. The common use of
"with" is with objects, like files, that are opened at the start of the
block, and need to be closed on block exit, regardless of how the block
is exited.

with open(fid) as something:
    do stuff that may raise an exception

is roughly

something = open(fid)
try:
    do stuff that may raise an exception
finally:
    something.close()
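To make the equivalence concrete, here is a small sketch (not code from the thread). The Resource class is a hypothetical stand-in for a file-like object; it only records whether it was closed. Both functions raise inside the block, and both leave the resource closed.

```python
class Resource:
    """Hypothetical resource that only remembers whether it was closed."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.closed = True
        return False  # do not swallow the exception


def use_with():
    res = Resource()
    try:
        with res:
            raise ValueError("boom")
    except ValueError:
        pass
    return res.closed


def use_try_finally():
    res = Resource()
    try:
        try:
            raise ValueError("boom")
        finally:
            res.closed = True  # same cleanup, written by hand
    except ValueError:
        pass
    return res.closed
```

The "with" form simply moves the cleanup into the object's __exit__ method, so callers cannot forget it.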


Old comp.sci. theory => google "communicating sequential processes"
 
Jeff Schwab

I want to awaken -that- -specific- -thread-.

Semaphore acquire: "The implementation may pick one at random."
Lock acquire: "which one of the waiting threads proceeds is not
defined"
Event set: "All threads waiting for it to become true are awakened"
Condition notify: "This method wakes up one of the threads waiting"
Condition notifyAll: "Wake up all threads waiting on this condition"

I will contend it's impossible without a specialization. Certainly an
array of semaphores could fit the bill; is that what you were
suggesting?

A collection of semaphores could well be the right solution. Actually,
I'm getting the feeling that you want something like a Java Futures:

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Future.html

Does that sound promising?
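
One way to read "a collection of semaphores" is one signalling object per thread, selected by index. The sketch below is an illustration only, not code from the thread: it uses threading.Event instead of semaphores, and the names wake_events, woken and worker are made up for the example.

```python
import threading

NUM_WORKERS = 3
wake_events = [threading.Event() for _ in range(NUM_WORKERS)]
woken = []                      # order in which the workers actually ran
woken_lock = threading.Lock()


def worker(index):
    wake_events[index].wait()   # blocks until this worker's own event is set
    with woken_lock:
        woken.append(index)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()

# Wake specific workers in a chosen order; joining each one before waking
# the next keeps the order deterministic.
for index in (2, 0, 1):
    wake_events[index].set()
    threads[index].join()
```

Because each thread waits on its own object, there is never more than one waiter to choose from, so the "may pick one at random" wording in the docs does not come into play.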
 
Gabriel Genellina

I'm not quite sure a semaphore is exactly the synchronization object
I'm looking for, but I'm a little new to concurrency myself.

The easiest way to implement a producer-consumer model in Python is to use
a Queue.Queue object. It already implements the necessary synchronization
mechanisms. The producer(s) put items in the Queue; the consumer(s) get
items from it.
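
A minimal sketch of that model, for illustration only. It is written for Python 3, where the module is named queue rather than Queue; the None sentinel and the squaring "work" are assumptions made up for the example.

```python
import queue
import threading

work_q = queue.Queue()
results = []


def producer(n):
    for i in range(n):
        work_q.put(i)
    work_q.put(None)            # sentinel: nothing more to come


def consumer():
    while True:
        item = work_q.get()     # blocks until an item is available
        if item is None:
            break
        results.append(item * item)


p = threading.Thread(target=producer, args=(5,))
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
```

Neither function touches a lock directly; the queue does all the blocking and waking internally.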
 
castironpi

The easiest way to implement a producer-consumer model in Python is to use
a Queue.Queue object. It already implements the necessary synchronization
mechanisms. The producer(s) put items in the Queue; the consumer(s) get
items from it.

Doesn't Queue.Queue operate by side effect?
 
castironpi

A collection of semaphores could well be the right solution.  Actually,
I'm getting the feeling that you want something like a Java Futures:

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Future.html

Does that sound promising?

Extremely promising! You could even have a subclass of FutureTask
enqueue itself to ExecutorService executor. You could have different
models of execution too, such as initializing with a priority,
scheduling conditions, etc.

The exact implementation is a little suspicious: does ExecutorService
run in its own thread, and callback FutureTasks in order, or in some
order? Python can do the same thing too.
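
For readers on current Python 3: the standard library has since gained a concurrent.futures module with a similar shape. This did not exist when the thread was written, so take the sketch below as a later illustration rather than what anyone here was using. The add_one function is made up and mirrors the cmd + 1 step from the snippet.

```python
from concurrent.futures import ThreadPoolExecutor


def add_one(cmd):
    return cmd + 1


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(add_one, 100)  # schedule the call on a pool thread
    result = future.result()                # block until the value is ready
```

The caller never names the worker thread; it only holds the future and waits on that.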
 
castironpi

The relevant snippet is:
def thloop( thd ):
    while thd.cont:
        with thd.step[1]:
            if not thd.cont: break
            print( 'step 1', end= ' ' )
            thd.ret= thd.cmd+ 1
        with thd.step[3]:
            print( 'step 3' )
            thd.ret= None
        thd.step.complete()
def op100( thd ):
    with thd.step[0]:
        print( 'step 0', end= ' ' )
        thd.cmd= 100
    with thd.step[2]:
        print( 'step 2', end= ' ' )
        ret1= thd.ret
    assert ret1== 101

        Show us the code for "thd" -- we have no idea of where thd.cont is
set... And how is "thd.step[x]" -- a list object, suddenly converted to
some other component object with a "thd.step.complete"?

        The major thing I get out of this is that your "threads", which
should have a complete process sequence /within/ them, are being passed
around to a pair of functions which are trying to control the inner
workings of the threads. Where are op100 and thloop invoked? In separate
threads? Then why is the parameter called "thd".

        Which is, it seems, totally backwards... Also... to my knowledge,
the "with" construct (I'm still on Python 2.4 and don't have "with") is
NOT the same thing as a Java "Synchronized" object. The common use of
"with" is with objects, like files, that are opened at the start of the
block, and need to be closed on block exit, regardless of how the block
is exited.

with open(fid) as something:
        do stuff that may raise an exception

is roughly

something = open(fid)
try:
        do stuff that may raise an exception
finally:
        something.close()

        Old comp.sci. theory => google "communicating sequential processes"

OOo-- good questions-- rare treat.

I invented the idiom of passing a thread object to an execution
routine: sort of a midpoint between subclassing Thread and
threads.start_new_thread. Come to think of it, if you rewrite:

def thloop( self ):
    while self.cont:
        with self.step[1]:
            if not self.cont: break
            print( 'step 1', end= ' ' )
            self.ret= self.cmd+ 1
        with self.step[3]:
            print( 'step 3' )
            self.ret= None
        self.step.complete()

def op100( self ):
    with self.step[0]:
        print( 'step 0', end= ' ' )
        self.cmd= 100
    with self.step[2]:
        print( 'step 2', end= ' ' )
        ret1= self.ret
    assert ret1== 101

The Step class overloads the __getitem__ operator to enable sequence-
step specification. It also has a 'finalize'/'quit'/'open' method
(the latter of which is a somewhat counterintuitive name) to signal
that the thread is terminated, and any threads waiting for step[n] can
look the other way.

You raise a good point too:
should have a complete process sequence /within/ them, are being

Yes. Can you join these and respawn? Which makes more sense in the
situation?

I am exploring some possibilities for concurrency that Python
"availableizes," Lewis and Clark-style. Are you the frontier?
<grunts, gestures toward self> ME Tarzan.

In another language, you could write:

syncrostep.stepacq( 1 )
self.ret= self.cmd+ 1
syncrostep.release( 1 )

and

steplock= syncrostep.stepacq( 1 )
self.ret= self.cmd+ 1
steplock.release()
But I
don't see multiple threads in your sample code.

There are.

The SyncroStep class abstracts an array of semaphores with 'value' =
1, in other words, locks.

At the very least, it's a cool abstraction, even if thdA.haltjoin()/
thdB.haltandjoin()/ store.ret= store.cmd+ 1/ restart( thdA )/
restart( thdB ) or further is always better, and even if I forget what
threads were good for anyway.
 
Dennis Lee Bieber

I invented the idiom of passing a thread object to an execution
routine: sort of a midpoint between subclassing Thread and
threads.start_new_thread. Come to think of it, if you rewrite:
Well, I always used the threading module, and have never subclassed
threading.Thread -- I just pass in the execution "function" as the
target parameter... See the super long post I recently created.
The Step class overloads the __getitem__ operator to enable sequence-

What "Step" class? I don't recall ever seeing it, so I have no idea
what it does, or may do incorrectly.
Yes. Can you join these and respawn? Which makes more sense in the
situation?
I don't think so... But so far it still looks like your threads
aren't -- should be objects containing sequencing locks, and the
functions you've listed should be the threads instead. Or both are
totally misformed.
I am exploring some possibilities for concurrency that Python
"availableizes," Lewis and Clark-style. Are you the frontier?
<grunts, gestures toward self> ME Tarzan.
Tarzan would seem to have a long way to go before showing up in
public as Lord Greystoke.
In another language, you could write:

syncrostep.stepacq( 1 )
self.ret= self.cmd+ 1
syncrostep.release( 1 )
Looks like a pair of semaphore or lock statements surrounding some
action... Note that I see nothing in that middle ground that warrants a
critical section lock -- self.cmd is a read-only parameter, and self.ret
doesn't do anything where a mutex is going to save it.

Now, if you had a pair of threads doing:

mutex.acquire()
shared_object = shared_object + 1
mutex.release()

in parallel with

mutex.acquire()
shared_object = shared_object - 1
mutex.release()

you'd have a valid case. Without the locks it is possible for the first
thread to read shared_object, get pre-empted, the second thread reads
the same value, then subtracts one from it, stores it back into
shared_object, THEN the first thread resumes control, adds one to the value
it already read (NOT the new value that the second thread stored) and
stores the result into shared_object, so the decrement is lost.
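
A runnable version of that increment/decrement case, as a sketch. It is written for Python 3 and uses "with mutex:" in place of explicit acquire()/release() calls; the adjust function and the iteration count are made up for the example.

```python
import threading

counter = 0
mutex = threading.Lock()


def adjust(delta, times):
    global counter
    for _ in range(times):
        with mutex:                     # read-modify-write is one critical section
            counter = counter + delta


inc = threading.Thread(target=adjust, args=(+1, 100000))
dec = threading.Thread(target=adjust, args=(-1, 100000))
inc.start()
dec.start()
inc.join()
dec.join()
```

With the lock in place the increments and decrements always cancel out. Without it, the interleaving described above can lose updates, although how often that shows up depends on the interpreter version and timing.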
There are.

The SyncroStep class abstracts an array of semaphores with 'value' =
1, in other words, locks.
Another class you haven't shown us.
restart( thdB ) or further is always better, and even if I forget what
threads were good for anyway.

Threads, in Python, are good for parallel processing of items that
tend to be I/O bound -- that is, stuff that blocks on lots of I/O calls
allowing other threads to execute until they block too. Due to the GIL
in the common Python implementation, threading is not useful for
number-crunching (CPU bound) processing.
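
A small sketch of the I/O-bound case, for illustration. It uses time.sleep() as a stand-in for a blocking I/O call (an assumption made for the example; sleep releases the GIL in CPython much as a blocking read does), and the fake_io name is made up.

```python
import threading
import time


def fake_io(delay):
    time.sleep(delay)       # stand-in for a blocking I/O call


start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
```

Run one after another, the five waits would take about a second; in threads they overlap, so elapsed should come out close to a single wait. A loop doing arithmetic instead of sleeping would not see that benefit under the GIL.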

Now, there is a very vocal group that recommend Twisted style
asynchronous call-backs for everything in the world... But I think that
group tends to forget that Windows I/O is incompatible with the
low-level select() call often used to do parallel I/O -- leaving it only
useful for the network socket I/O, but not local file I/O processing.
 
Dennis Lee Bieber

Warning -- long post follows


Which is, it seems, totally backwards... Also... to my knowledge,
the "with" construct (I'm still on Python 2.4 and don't have "with") is
NOT the same thing as a Java "Synchronized" object. The common use of
"with" is with objects, like files, that are opened at the start of the
block, and need to be closed on block exit, regardless of how the block
is exited.
Okay... I do need to retract this part... Looking at the PEP, the
"with" statement /can/ be made to act as a synchronization method IF
supplied with an object that performs locking/unlocking as part of the
"context manager" methods.

But I still don't see any such in your usage. If it is there, it is
not in the code you show us.
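
For reference, this is what the locking form looks like in Python 2.5 and later, where threading.Lock supports the context manager methods. A sketch only; the states list is made up to show when the lock is held.

```python
import threading

lock = threading.Lock()
states = []

with lock:                          # acquire() on entry
    states.append(lock.locked())    # held inside the block
states.append(lock.locked())        # release() ran on exit

try:
    with lock:
        raise RuntimeError("failure inside the critical section")
except RuntimeError:
    pass
states.append(lock.locked())        # released even though the block raised
```

So "with" does no synchronization by itself; it only guarantees the paired enter/exit calls on whatever object it is given.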

Note that such synchronization of a critical section is meant
only to ensure that one processing thread can execute the code (or
access some data) in the critical section at a time. It is NOT meant to
provide sequential control between threads. Sequential processing needs
multiple signalling methods. You have four, I believe, critical sections
(though they aren't critical in terms of being a shared access item --
nothing is shared between your threads) -- hence you would need four
separate locking objects.
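
To illustrate the difference between mutual exclusion and sequencing, here is a sketch that forces steps 0 to 3 to run in order across two threads, using one threading.Event per step. This is not the Step class from the original snippet, which has not been shown; the step_done, log and shared names are assumptions made up for the example.

```python
import threading

step_done = [threading.Event() for _ in range(4)]   # one signal per step
log = []
shared = {}


def op100():
    log.append("step 0")
    shared["cmd"] = 100
    step_done[0].set()

    step_done[1].wait()             # do not start step 2 until step 1 is done
    log.append("step 2")
    shared["result"] = shared["ret"]
    step_done[2].set()


def thloop():
    step_done[0].wait()             # do not start step 1 until step 0 is done
    log.append("step 1")
    shared["ret"] = shared["cmd"] + 1
    step_done[1].set()

    step_done[2].wait()
    log.append("step 3")
    shared["ret"] = None
    step_done[3].set()


a = threading.Thread(target=thloop)
b = threading.Thread(target=op100)
a.start()
b.start()
a.join()
b.join()
```

Each step signals its own completion, and the next step waits on that signal, which is what a single lock cannot express.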

-=-=-=-=-=-=-

I wasted time at work on implementing two schemes. (beware of line
wraps in the code)

----- staged.py
import random
import time
import threading
import Queue
import sys

msgQ = Queue.Queue()

class StagedObject(object):
    def __init__(self, ID=None):
        if ID:
            self.id = "userID_%s" % ID
        else:
            self.id = "systemID_%s" % id(self)
        self.accumulator = 0
        msgQ.put( "%s initialized\n" % self.id)
    def step1(self):
        slp = random.randint(1, 10) / 2.0
        msgQ.put( "%s in step 1, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 1, leaving\n" % self.id)
    def step2(self):
        slp = random.randint(1, 20) / 2.0
        msgQ.put( "%s in step 2, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 2, leaving\n" % self.id)
    def step3(self):
        slp = random.randint(1, 5) / 2.0
        msgQ.put( "%s in step 3, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 3, leaving\n" % self.id)
    def step4(self):
        slp = random.randint(4, 20) / 2.0
        msgQ.put( "%s in step 4, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 4, leaving\n" % self.id)


stepQs = [ Queue.Queue(),   #step 1 input
           Queue.Queue(),   #step 2
           Queue.Queue(),   #step 3
           Queue.Queue(),   #step 4
           Queue.Queue() ]  #results

def messageProcess():
    while True:
        msg = msgQ.get()
        if not msg: break
        sys.stdout.write(msg)
        sys.stdout.flush()

#start the message processor
msgProc = threading.Thread(target=messageProcess)
msgProc.start()

def processStep(step, qin, qout):
    while True:
        obj = qin.get()
        if obj:
            getattr(obj, "step%s" % step)()     #call the step process
        qout.put(obj)       #pass it (or the None signal) to the next processor
        if not obj: break   #receiving None is signal to exit


processThreads = []
for i in range(4):
    #create the threads that process only one step of the object;
    #step i+1 reads from queue i and writes to queue i+1
    processThreads.append(
        threading.Thread(target=processStep,
                         args=(i+1, stepQs[i], stepQs[i+1]),
                         name="StepThread%s" % (i + 1) ) )

#create a few staged objects and insert them onto the first queue
stepQs[0].put(StagedObject())
stepQs[0].put(StagedObject(ID="I have a Name"))
stepQs[0].put(StagedObject(ID=42))
stepQs[0].put(StagedObject())

#start the processing threads
for t in processThreads:
    t.start()

stepQs[0].put(None) #end of processing

while True:
    obj = stepQs[4].get()
    if obj:
        msgQ.put("Processed %s with result %s\n" % (obj.id,
                                                      obj.accumulator))
    if not obj: break

msgQ.put("Completed\n")
msgQ.put(None)
msgProc.join()

-----
A four-stage "assembly line". Only one object may be on any given
stage at a time, and all objects must proceed through the four stages in
sequence. The time spent on a stage may vary, but should not delay
objects that are /in/ other stages. Stages are connected by queues which
enforce a first-in/first-out order on the assembled objects (first
object into stage 1 will be the first object out of stage 4), although
it is possible for more than one object to be in the queue between
stages.

Each stage is a thread. Each object does its own "assembly" when
the stage tells it to perform the step that stage controls. The
interface thereby is that each object provides a stepn() method that can
be invoked by the "line". The contents of the object may be completely
different (emulated here via the random interval of sleep()).

Four objects are initialized and passed to the "line". The
signal that all objects have been processed is passing None along the
queues. The main thread collects final objects from the last queue.

An additional messaging thread and queue are used to ensure
sequential output of status messages.

This scheme can handle any number of four stage objects, as
there are four stage specific threads, and five queues the objects pass
through.
pythonw -u "staged2.py"
systemID_10436144 initialized
userID_I have a Name initialized
userID_42 initialized
systemID_10436592 initialized
systemID_10436144 in step 1, sleeping 3.5
systemID_10436144 in step 1, leaving
userID_I have a Name in step 1, sleeping 4.0
systemID_10436144 in step 2, sleeping 2.0
systemID_10436144 in step 2, leaving
systemID_10436144 in step 3, sleeping 1.0
systemID_10436144 in step 3, leaving
systemID_10436144 in step 4, sleeping 6.0
userID_I have a Name in step 1, leaving
userID_42 in step 1, sleeping 4.5
userID_I have a Name in step 2, sleeping 1.5
userID_I have a Name in step 2, leaving
userID_I have a Name in step 3, sleeping 2.5
userID_I have a Name in step 3, leaving
userID_42 in step 1, leaving
systemID_10436592 in step 1, sleeping 4.0
userID_42 in step 2, sleeping 7.5
systemID_10436144 in step 4, leaving
userID_I have a Name in step 4, sleeping 4.5
Processed systemID_10436144 with result 12.5
systemID_10436592 in step 1, leaving
userID_I have a Name in step 4, leaving
Processed userID_I have a Name with result 12.5
userID_42 in step 2, leaving
systemID_10436592 in step 2, sleeping 7.5
userID_42 in step 3, sleeping 1.5
userID_42 in step 3, leaving
userID_42 in step 4, sleeping 10.0
systemID_10436592 in step 2, leaving
systemID_10436592 in step 3, sleeping 1.0
systemID_10436592 in step 3, leaving
userID_42 in step 4, leaving
Processed userID_42 with result 23.5
systemID_10436592 in step 4, sleeping 10.0
systemID_10436592 in step 4, leaving
Processed systemID_10436592 with result 22.5
Completed
Exit code: 0

-=-=-=-=-=-=-=-
----- twostage.py
import random
import time
import threading
import Queue
import sys

msgQ = Queue.Queue()

class StagedObject(object):
    def __init__(self, ID=None):
        if ID:
            self.id = "userID_%s" % ID
        else:
            self.id = "systemID_%s" % id(self)
        self.accumulator = 0
        self.stage1 = threading.Lock()
        self.stage2 = threading.Lock()
        self.stage3 = threading.Lock()
        self.stage4 = threading.Lock()
        self.complete = threading.Lock()
        self.stage1.acquire()   #initially lock all
        self.stage2.acquire()
        self.stage3.acquire()
        self.stage4.acquire()
        self.complete.acquire()
        msgQ.put( "%s initialized\n" % self.id)
    def step1(self):
        slp = random.randint(1, 10) / 2.0
        msgQ.put( "%s in step 1, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 1, leaving\n" % self.id)
        self.stage2.release()
    def step2(self):
        slp = random.randint(1, 20) / 2.0
        msgQ.put( "%s in step 2, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 2, leaving\n" % self.id)
        self.stage3.release()
    def step3(self):
        slp = random.randint(1, 5) / 2.0
        msgQ.put( "%s in step 3, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 3, leaving\n" % self.id)
        self.stage4.release()
    def step4(self):
        slp = random.randint(4, 20) / 2.0
        msgQ.put( "%s in step 4, sleeping %s\n" % (self.id, slp))
        time.sleep(slp)
        self.accumulator += slp
        msgQ.put( "%s in step 4, completed\n" % self.id)
        self.complete.release()

def messageProcess():
    while True:
        msg = msgQ.get()
        if not msg: break
        sys.stdout.write(msg)
        sys.stdout.flush()

def processOddSteps(obj):
    obj.stage1.acquire()
    obj.step1()
    obj.stage1.release()
    obj.stage3.acquire()
    obj.step3()
    obj.stage3.release()

def processEvenSteps(obj):
    obj.stage2.acquire()
    obj.step2()
    obj.stage2.release()
    obj.stage4.acquire()
    obj.step4()
    obj.stage4.release()

#start the message processor
msgProc = threading.Thread(target=messageProcess)
msgProc.start()

#create a few staged objects
sos = [ StagedObject(),
        StagedObject(ID="I have a Name"),
        StagedObject(ID=42),
        StagedObject() ]

processThreads = []
for so in sos:
    #create the threads that process each odd/even for each object
    processThreads.append(
        threading.Thread(target=processOddSteps,
                         args=(so,) ) )
    processThreads.append(
        threading.Thread(target=processEvenSteps,
                         args=(so,) ) )

#start the processing threads
for t in processThreads:
    t.start()

#since all objects initialize as fully locked, the threads should be
#blocked
#release stage 1 locks
for so in sos:
    so.stage1.release()

#collect results -- note, some threads may finish sooner, but this is in
#object order

for so in sos:
    so.complete.acquire()
    msgQ.put( "Processed %s with result %s\n" % (so.id, so.accumulator))
    so.complete.release()

msgQ.put("Completed\n")
msgQ.put(None)

msgProc.join()

-----
Python 2.4 variant, using explicit lock.acquire()/lock.release()
calls in place of "with lock" statements.

In this one, two threads are created for EACH of the objects to
be "assembled". One thread handles odd numbered steps, the other handles
even number steps. BUT, the steps still must be performed sequentially.
This is done by having each object contain a lock for each step
(including a lock for completion). The locks are initially acquired,
blocking all processing by the associated threads. When processing is to
begin, the first stage lock of each object is released. This allows the
odd-step thread (for that object) to acquire it, and stage 1 processing
to be invoked. The key facet here is that the processing for each stage
MUST release the lock of the next stage in sequence.

A message processing thread and queue also are used.

In this system, there are two threads for each object, and five
locks per object. For the four objects in the sample, this means 8
threads, and 20 locks are in play. Each object is sequential in steps
which are split between two threads, but multiple objects can be in the
same step (just in a different thread).
pythonw -u "twostage21.py"
systemID_10423760 initialized
userID_I have a Name initialized
userID_42 initialized
systemID_10423856 initialized
userID_I have a Name in step 1, sleeping 3.5
userID_42 in step 1, sleeping 4.5
systemID_10423856 in step 1, sleeping 3.5
systemID_10423760 in step 1, sleeping 1.0
systemID_10423760 in step 1, leaving
systemID_10423760 in step 2, sleeping 9.5
systemID_10423856 in step 1, leaving
systemID_10423856 in step 2, sleeping 4.5
userID_I have a Name in step 1, leaving
userID_I have a Name in step 2, sleeping 1.5
userID_42 in step 1, leaving
userID_42 in step 2, sleeping 4.5
userID_I have a Name in step 2, leaving
userID_I have a Name in step 3, sleeping 2.0
userID_I have a Name in step 3, leaving
userID_I have a Name in step 4, sleeping 6.0
systemID_10423856 in step 2, leaving
systemID_10423856 in step 3, sleeping 2.5
userID_42 in step 2, leaving
userID_42 in step 3, sleeping 1.0
userID_42 in step 3, leaving
userID_42 in step 4, sleeping 5.0
systemID_10423760 in step 2, leaving
systemID_10423856 in step 3, leaving
systemID_10423760 in step 3, sleeping 2.5
systemID_10423856 in step 4, sleeping 4.0
userID_I have a Name in step 4, completed
systemID_10423760 in step 3, leaving
systemID_10423760 in step 4, sleeping 2.5
systemID_10423856 in step 4, completed
userID_42 in step 4, completed
systemID_10423760 in step 4, completed
Processed systemID_10423760 with result 15.5
Processed userID_I have a Name with result 13.0
Processed userID_42 with result 15.0
Processed systemID_10423856 with result 14.5
Completed
Exit code: 0

-=-=-=-=-=-

Do either of these show some signs of what you are attempting... And
if not, please supply a complete, minimal, set of code that reproduces
your problem and can actually be executed...

Personally, I still don't understand WHY one would treat four phases
of a sequential process as one critical section invoked from four
different concurrent areas yet needing to be serialized.



 
Paul McGuire

Doesn't Queue.Queue operate by side effect?

Are you sure you aren't a Turing testbot? What does that question
even *mean*?
Wait! Don't bother answering, I'm already bored with this thread.

I agree with Gabriel. Instead of randomly sprinkling locks about
through different threads' code, use a synchronized queue like the one
in Queue.Queue.
See example code at http://docs.python.org/lib/QueueObjects.html.

-- Paul
 
Gabriel Genellina

Doesn't Queue.Queue operate by side effect?

What side effect? Producer thread(s) put items in the queue; consumer
thread(s) get items from it; the queue just ensures the proper
synchronization between them.

It's like a McDonald's: three guys (the producers) are taking orders from
the customers, and place those orders in a queue; in the kitchen, two guys
(the consumers) check the queue and prepare the requested sandwiches (it
doesn't matter which one gets which order). The finished sandwiches are put
onto another queue (the kitchen guys are acting as producers now) and
another guy (he may or may not be one of the first three; he's acting as a
consumer) takes the sandwich, puts it on a tray with other stuff and
delivers it to the customer.
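
The same picture as a runnable sketch, for illustration only. It is written for Python 3 (module queue instead of Queue); the function names, the order strings and the use of None as a "closing time" signal are all made up for the example.

```python
import queue
import threading

orders = queue.Queue()      # registers -> kitchen
finished = queue.Queue()    # kitchen -> counter
delivered = []


def take_orders(register, count):
    for n in range(count):
        orders.put("order-%d-%d" % (register, n))


def cook():
    while True:
        order = orders.get()
        if order is None:           # closing time for this cook
            break
        finished.put(order + " (cooked)")


def deliver():
    while True:
        item = finished.get()
        if item is None:
            break
        delivered.append(item)


takers = [threading.Thread(target=take_orders, args=(r, 2)) for r in range(3)]
cooks = [threading.Thread(target=cook) for _ in range(2)]
runner = threading.Thread(target=deliver)
for t in takers + cooks + [runner]:
    t.start()

for t in takers:
    t.join()
for _ in cooks:
    orders.put(None)                # one closing signal per cook
for t in cooks:
    t.join()
finished.put(None)
runner.join()
```

Which cook handles which order is not determined, and the delivery order can vary between runs, but every order comes out the other end exactly once.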
 
Dennis Lee Bieber

It's like a McDonald's: three guys (the producers) are taking orders from
the customers, and place those orders in a queue; in the kitchen, two guys
(the consumers) check the queue and prepare the requested sandwiches (it
doesn't matter which one gets which order). The finished sandwiches are put
onto another queue (the kitchen guys are acting as producers now) and
another guy (he may or may not be one of the first three; he's acting as a
consumer) takes the sandwich, puts it on a tray with other stuff and
delivers it to the customer.

Has McDonald's changed that much... Last time I was in one, the
kitchen staff just took trays of finished patties, stuck them on a bun
with the stock ingredients, wrapped them, and put them on a
first-in/first-out rack from which the register staff pulled them as
needed... (I've not bought at a McD since I got sick off of one back in
1970 or so)

Now, Burger King... That's different -- they were always assembled
based upon the most recent register order...

Speak not of Wendy's -- they moved into town in my college days...
The "hot and juicy" was commonly taken to mean: patty dipped in pan
drippings, then nuked in microwave... And any CompSci person could
figure out that the "256 different ways" meant one had access to a tray
of 8 condiments, and had any combination of on or off for each... 8
condiments, let's see: ketchup, mayo, mustard, pickle, onion, lettuce,
tomato, cheese?
 
Gabriel Genellina

On Wed, 27 Feb 2008 06:25:25 -0200, Dennis Lee Bieber wrote:
Has McDonald's changed that much... Last time I was in one, the
kitchen staff just took trays of finished patties, stuck them on a bun
with the stock ingredients, wrapped them, and put them on a
first-in/first-out rack from which the register staff pulled them as
needed...

I think they work that way at peak hours - but e.g. at 3AM they just
prepare what is requested.
(I've not bought at a McD since I got sick off of one back in
1970 or so)

That happens from time to time, unfortunately, and not only with McD.
http://www.mcspotlight.org/media/press/mcds/reuters121001.html
Now, Burger King... That's different -- they were always assembled
based upon the most recent register order...

I like BK hamburgers... but I shouldn't eat them, as the doctor told me :(
Speak not of Wendy's -- they moved into town in my college days...
The "hot and juicy" was commonly taken to mean: patty dipped in pan
drippings, then nuked in microwave... And any CompSci person could
figure out that the "256 different ways" meant one had access to a tray
of 8 condiments, and had any combination of on or off for each... 8
condiments, let's see: ketchup, mayo, mustard, pickle, onion, lettuce,
tomato, cheese?

I went once to Wendy's, and couldn't finish my sandwich. It was
ho-rri-ble. It was not a surprise when they closed all their restaurants
in Argentina and left the country, after being here for less than 4 years.
 
castironpi

Warning -- long post follows



Okay... I do need to retract this part... Looking at the PEP, the
"with" statement /can/ be made to act as a synchronization method IF
supplied with an object that performs locking/unlocking as part of the
"context manager" methods.

But I still don't see any such in your usage. If it is there, it is
not in the code you show us.

Note that such synchronization of a critical section is meant
only to ensure that one processing thread can execute the code (or
access some data) in the critical section at a time. It is NOT meant to
provide sequential control between threads. Sequential processing needs
multiple signalling methods. You have four, I believe, critical sections
(though they aren't critical in terms of being a shared access item --
nothing is shared between your threads) -- hence you would need four
separate locking objects.

This is very odd. Yours -seems- (*subjective) backwards to -me-. I
get the sense that yours and mine are orthogonal: you approach the
task "horizontally" where I approach it "vertically", for some rather
post-cognitive metric of approach, perhaps like breadth-first vs.
depth-first. But I digress.

I realize that concrete examples are very useful in a thorough
analysis of the differences between our code. Mine isn't very
concrete, but it might shed some light, keeping in mind that even if
it spotlights a flaw or weakness in either my or your solution, it's
not (repeat, NOT) a conclusive case for or against either. Don't get
defensive!

The application is actually a test bed for still another kind of lock
I'm whittling, ironically enough. The acq( thd, lck ) function sends
a message to thread thd to try to acquire lck. It's important that
lck be acquired from thd specifically, as (i) there are get_ident()
calls like in threading.RLock, and as (ii) thd should actually block.
acq, however, does not: it fires the call, waits for a completion
event -or- timeout, and returns the return value if there is one, or a
special one if thd blocked. That way I can check to make sure threads
are blocking on the right calls to the derived acquire method, and
returning the right values if they're not.

Rough overview:

#Case 1
[snip]
assert acq( thread3, lock3 ) is Blocks
assert acq( thread2, lock2 ) is Fails
assert acq( thread4, lock4 ) is Fails
assert acq( thread1, lock1 ) is Acquires
[snip]

The results vary because of other things that happened to the threads
and locks earlier in the case. So I define acq like this:

def acq( thd, lck ):
    thd.test_lock_to_acquire= lck
    thd.set_cmd_assigned_signal.set()
    thd.set_cmd_completed.wait( 1 )
    ret= thd.cmd_return_value
    thd.set_return_value_read.set()
    return ret

The thread is running a loop, so that I can make multiple test calls
to the same thread, to simulate what happens if a thread acquires one
lock first, then another lock later, but, to repeat, I want the thread
that's governing the case to keep moving, whereas the thread that
makes the real acquire() call does actually block.

With acq, I set up an instruction-acknowledgement sequence, to get one
thread to do something that I don't determine ahead of time.
(Instruction and acknowledgement may not be the right words.) The
thread runs in a function, thread_loop, that takes the thd parameter,
a probably misnamed container object instance that merely contains the
synchro events, a continue flag, the lock to try to acquire or None,
and the result of trying to acquire it or None.

def thread_loop( thd ):
    while thd.cont:
        thd.set_cmd_assigned_signal.wait()
        thd.cmd_return_value= Vacant
        ret= thd.test_lock_to_acquire.acquire()
        thd.cmd_return_value= ret
        thd.set_cmd_completed.set()
        thd.set_return_value_read.wait()

It first waits to be informed that it has an instruction pending.
Then it preps cmd_return_value, just in case the lock.acquire()
doesn't return, which it then attempts. Whenever it returns, it sets
cmd_return_value to the returned value, informs acq() that it has
returned and waits for acq() to inform it that it has successfully
copied the value. It may be that I don't have any use for the last
step, the corner case being if the case tries to use it twice in a
row.
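
For readers who want something runnable, the three-Event handshake described above might look like the following sketch. The Probe class, the BLOCKS sentinel, and the shortened attribute names are assumptions made for illustration; the clear() calls are the "clear instructions not shown" in the post, placed where they seemed to fit.

```python
import threading

BLOCKS = object()   # hypothetical sentinel: the worker never reported back

class Probe:
    """Hypothetical container for the three events and the command slots."""
    def __init__(self):
        self.assigned = threading.Event()
        self.completed = threading.Event()
        self.value_read = threading.Event()
        self.lock_to_acquire = None
        self.return_value = BLOCKS

def thread_loop(p):
    while True:
        p.assigned.wait()                 # wait for an instruction
        p.assigned.clear()
        p.return_value = p.lock_to_acquire.acquire()   # may block indefinitely
        p.completed.set()                 # report that acquire() returned
        p.value_read.wait()               # wait until the value was copied
        p.value_read.clear()

def acq(p, lck, timeout=1.0):
    p.lock_to_acquire = lck
    p.return_value = BLOCKS               # stays BLOCKS if the worker never returns
    p.assigned.set()
    p.completed.wait(timeout)             # completion event -or- timeout
    ret = p.return_value
    p.completed.clear()
    p.value_read.set()
    return ret

p = Probe()
threading.Thread(target=thread_loop, args=(p,), daemon=True).start()

free, held = threading.Lock(), threading.Lock()
held.acquire()                            # the worker cannot get this one

r_free = acq(p, free)                     # worker acquires and reports True
r_held = acq(p, held, timeout=0.2)        # worker blocks, acq times out
print(r_free is True, r_held is BLOCKS)   # True True
```

After the second call the worker is still stuck inside acquire(), which is the corner case mentioned above: the probe cannot safely be reused until that lock is released.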

To avoid the confusion of the trio of Event objects,
set_cmd_assigned_signal, set_cmd_completed, and set_return_value_read,
and the clear instructions not shown, I abstract it away, replacing
them with a single instance of a class (which I haven't shown), but
which does the same thing.

def acq( thd, lck ):
    with thd.steps[0]:
        thd.test_lock_to_acquire= lck
    with thd.steps[2]:
        ret= thd.cmd_return_value
    return ret

steps[0] is a method in disguise. Unless it's never been called
before with index 0, it waits until steps.reset() is called. steps[1]
waits until the steps[0] context exits. steps[2] waits until the
steps[1] context exits (even if steps[1] hasn't entered yet!) and so
on. Later on, steps[0] enters again, and waits, again, for
steps.reset().

You may dispute that a class such as the one of which 'steps' is an
instance exists. That was my very first question. Specify 'acq'
only, and does a Steps class have enough information, from only the
details given and no more, to release each of its individual locks all
and only in the order of the indices? Regardless, here is how
thread_loop shapes up. Once again, 'thd' is a container, that now I'm
thinking is -probably- misnamed. Tell my publisher. ;)

def thread_loop( thd ):
    while thd.cont:
        thd.cmd_return_value= Vacant
        with thd.steps[1]:
            ret= thd.test_lock_to_acquire.acquire()
            thd.cmd_return_value= ret
        with thd.steps[3]:
            pass
        thd.steps.reset()

Here we find the missing steps[1] from earlier, which runs acquire on
the lock that acq assigned in steps[0]. If it returns, it records the
return value, and then steps[3] blocks for steps[2] to complete. This
fires a blank, but makes sure that steps.reset() isn't called before
steps[2] is done. It can go there if you want.
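
One way the Steps behaviour described above could be realised is sketched below, using a single Condition instead of one Lock per index. This is not the 44-line class from the post, which had not been shown at this point; the names Steps, _StepGate, and _current are made up for the sketch.

```python
import threading

class _StepGate:
    """Hypothetical helper returned by steps[n]; usable in a with statement."""
    def __init__(self, owner, index):
        self._owner, self._index = owner, index
    def __enter__(self):
        self._owner._enter(self._index)
        return self
    def __exit__(self, exc_type, exc, tb):
        self._owner._exit()
        return False                      # do not swallow exceptions

class Steps:
    def __init__(self):
        self._cond = threading.Condition()
        self._current = 0                 # index of the step allowed to run
    def __getitem__(self, index):
        return _StepGate(self, index)
    def _enter(self, index):
        with self._cond:
            self._cond.wait_for(lambda: self._current == index)
    def _exit(self):
        with self._cond:
            self._current += 1            # hand over to the next index
            self._cond.notify_all()
    def reset(self):
        with self._cond:
            self._current = 0             # lets steps[0] enter again
            self._cond.notify_all()

steps = Steps()
log = []

def worker():
    with steps[1]:
        log.append(1)
    with steps[3]:
        log.append(3)
    steps.reset()

t = threading.Thread(target=worker)
t.start()
with steps[0]:
    log.append(0)
with steps[2]:
    log.append(2)
t.join()
print(log)                                # [0, 1, 2, 3]
```

Here the step bodies run outside the Condition's lock; only entry and exit are serialised, which is enough to force the index order.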

The implementation specifics of the timeout, whether you want it in
steps[1] or steps[2], I intentionally left open. Either way, it might
take more than the brackets operator, such as:

with thd.steps.timingout( step= 1, timeout= 1 ):

or

with thd.steps[1].timingout( 1 ):

, which is actually possible. steps[n] may return a newly-created
object instance with only __enter__ and __exit__ methods, which just
reroute back to the steps instance, so it wouldn't be hard to add a
timingout method to handle this. Either way, I find this
(*subjective) a fabulous illustration of the power of the context
manager, especially with the extra timingout option, and it's a snap
to write. For the record, Step was only 44 lines long! The return
from __getitem__ was particularly cute:

return WithObj(
    partial( self.ienter, index ),
    partial( self.iexit, index ) )


Any questions?
 
M

Micah Cowan

Gabriel said:
I went once to Wendy's, and couldn't finish my sandwich. It was
ho-rri-ble. It was not a surprise when they closed all their restaurants
in Argentina and left the country, after being here for less than 4 years.

You have _got_ to be kidding me. Where do you guys live? In California
(at least, Silicon Valley and Sacramento area), Wendy's and In-N-Out are
the only "fast food" chains that sell anything that tastes like real
hamburgers. Way, _way_ better than the cardboard stuff at McD's and
BK. Though Carl's Jr's $6 burgers are okay. And Carl's has _terrific_
ice cream shakes, whereas Wendy's has... "frosted dairy desserts". :p

Great. Now I'm hungry.
 
D

Dennis Lee Bieber

I realize that concrete examples are very useful in a thorough
analysis of the differences between our code. Mine isn't very
concrete, but it might shed some light, keeping in mind that even if

Which may be part of the problem I'm having... I've not seen a
complete example -- even if it is NOT behaving as you'd like. All I keep
seeing is snippets of stand-alone functions, and mentions of lots of
classes that have never been shown in their entirety.
The application is actually a test bed for still another kind of lock

A good book on concurrency might be a recommendation... Not sure
what is currently on the market -- too many books have dropped theory to
emphasize some existing OS capability.

Consider: Ada, which has a language defined concept for concurrency
(tasks and protected objects) does NOT have a defined semaphore or mutex
type; if one wants to program using semaphores or mutexes, one needs to
create such using protected objects. Synchronization in Ada is done via
the "rendezvous" -- one task "calls" the synchronization entry point,
blocking there until the other task reaches code that "accepts" the
"call", this second task then executes some code in the accept block,
and when it exits the accept block, the first task is unblocked (with
access to any changed parameters that had been passed in the "call").

The closest Python equivalent to a rendezvous requires each task
(thread) to create a Queue object. A rendezvous would then look like:

[calling task]
....
called_task.entryQ.put((myReturnQ, input, args, ...))
return_values = myReturnQ.get()

[called task]
....
input_arg = entryQ.get()
(returnQ, args) = (input_arg[0], input_arg[1:])
do stuff with args
returnQ.put((return, values, ...))

This neglects that Ada has selective accepts and entry calls --
wherein the first responding of a list of entries will activate.
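
A runnable version of the queue-based rendezvous outlined above could look like this sketch. The None shutdown marker and the summing "accept body" are assumptions added only so the example terminates and produces a value.

```python
import queue
import threading

def called_task(entry_q):
    while True:
        request = entry_q.get()           # block until some task "calls"
        if request is None:               # hypothetical shutdown marker
            break
        return_q, args = request[0], request[1:]
        return_q.put(sum(args))           # the "accept" body, then unblock caller

entry_q = queue.Queue()
server = threading.Thread(target=called_task, args=(entry_q,))
server.start()

# calling task
my_return_q = queue.Queue()
entry_q.put((my_return_q, 1, 2, 3))
result = my_return_q.get()                # blocks until the called task replies

entry_q.put(None)
server.join()
print(result)                             # 6
```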


I'm whittling, ironically enough. The acq( thd, lck ) function sends
a message to thread thd to try to acquire lck. It's important that
lck be acquired from thd specifically, as (i) there are get_ident()
calls like in threading.RLock, and as (ii) thd should actually block.
acq, however, does not: it fires the call, waits for a completion
event -or- timeout, and returns the return value if there is one, or a
special one if thd blocked. That way I can check to make sure threads

That is going to be very difficult... The only way to know if "thd"
(which still does not seem to fit the definition of a "thread" in any
threading model I know of -- unless I see all the code of it, and even
that may not answer this uncertainty) has become blocked is that it does
NOT return any status. The only way a status can be returned is if it
either acquires the lock, or times out rather than waiting indefinitely
for the lock to become available.
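
A small sketch of that point, with hypothetical names: the observer cannot see "blocked" directly, it can only notice that no status arrived within a timeout. Event.wait(timeout) returns False in that case.

```python
import threading

lock = threading.Lock()
lock.acquire()                        # held here, so the worker has to block
done = threading.Event()
outcome = []

def worker():
    outcome.append(lock.acquire())    # blocks until the lock is released
    done.set()                        # status is only reported after acquire()

t = threading.Thread(target=worker, daemon=True)
t.start()

reported = done.wait(0.2)             # no status within 0.2 s
print(reported)                       # False

lock.release()                        # now the worker can proceed
reported_later = done.wait(2.0)
print(reported_later, outcome)        # True [True]
```

A False result is only an inference that the worker is blocked; a slow worker would look the same.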

def acq( thd, lck ):
    thd.test_lock_to_acquire= lck
    thd.set_cmd_assigned_signal.set()
    thd.set_cmd_completed.wait( 1 )
    ret= thd.cmd_return_value
    thd.set_return_value_read.set()
    return ret

Note that, if "thd" is an object that is truly running as a thread,
then all the above operations are being executed in the context of your
control and NOT in that of the thread.
The thread is running a loop, so that I can make multiple test calls
to the same thread, to simulate what happens if a thread acquires one

And again, everything I read tells me you are NOT running anything
Python would consider a "thread" -- you can not "call" methods that are
supposed to run as part of the thread itself from another thread. You
CAN call methods of a thread object that, themselves, set attributes of
the object that the thread itself will respond to...

[untested, written cold]
import time
import threading

class Something(object):
    def __init__(self):
        self.run = True
        self.cycles = 0
        # self.mutex = threading.Lock()
        self.thd = threading.Thread(target=self.theThread)
        self.thd.start()
    def theThread(self):
        while True:
            # self.mutex.acquire() #protect access to .run
            if self.run:
                # self.mutex.release()
                time.sleep(1.5)
                self.cycles += 1
            else:
                # self.mutex.release()
                break
    def shutdown(self):
        # self.mutex.acquire()
        self.run = False
        # self.mutex.release()
        self.thd.join()
        return self.cycles

aThing = Something()
#aThing is NOT a thread... aThing.thd IS running as a separate thread,
#and the only code that the thread is executing is the loop inside
#the function theThread

time.sleep(10) #let it run for a while

result = aThing.shutdown()
#shutdown does not run in the context of the "thd" thread... It is
#running as part of the main code -- in parallel with the thread.
#in this demonstration, it is safe to modify the .run attribute as
#the real thread is only reading it. If the real thread were updating
#it, a lock would need to be used by both theThread code and
#shutdown code so only one user can be modifying it at a time
# The commented out code represents this (which is why the loop
#in theThread looks so ugly)

With acq, I set up an instruction-acknowledgement sequence, to get one
thread to do something that I don't determine ahead of time.
(Instruction and acknowledgement may not be the right words.) The
thread runs in a function, thread_loop, that takes the thd parameter,
a probably misnamed container object instance that merely contains the
synchro events, a continue flag, the lock to try to acquire or None,
and the result of trying to acquire it or None.

<sigh> If anything is a real thread, in common threading terms, it
would be your "thread_loop", while thd is just some object being used as
data. But you've never shown where any threading.Thread(...) instances
(or even the lower-level thread module) are created so I can't tell.

def thread_loop( thd ):
    while thd.cont:
        thd.set_cmd_assigned_signal.wait()
        thd.cmd_return_value= Vacant
        ret= thd.test_lock_to_acquire.acquire()
        thd.cmd_return_value= ret
        thd.set_cmd_completed.set()
        thd.set_return_value_read.wait()

I still haven't seen any code for all those methods of "thd" and
that is where all the magic seems to lie.
It first waits to be informed that it has an instruction pending.

"It"? Which, "thread_loop" or "thd".
Then it preps cmd_return_value, just in case the lock.acquire()
doesn't return, which it then attempts. Whenever it returns, it sets

Meaningless, from what I can tell...

.acquire() MUST return at some point in time before ANY subsequent
statement in that loop will execute... If it never returns, you have a
deadlock. If it blocks for some period of time, you have a normal block
with eventual acquisition of the lock. If it doesn't block at all you
have immediate acquisition of the lock. So...
thd.cmd_return_value= Vacant
ret= thd.test_lock_to_acquire.acquire()
thd.cmd_return_value= ret

condenses down to one line:

thd.cmd_return_value = thd.test_lock_to_acquire.acquire()
To avoid the confusion of the trio of Event objects,
set_cmd_assigned_signal, set_cmd_completed, and set_return_value_read,
and the clear instructions not shown, I abstract it away, replacing
them with a single instance of a class (which I haven't shown), but
which does the same thing.
Another magic class that no one else has ever seen.

def acq( thd, lck ):
    with thd.steps[0]:
        thd.test_lock_to_acquire= lck
    with thd.steps[2]:
        ret= thd.cmd_return_value
    return ret

Without seeing the code of thd, or that of the list of steps, this
looks like nothing more than using a series of locks merely to protect the
assignment of single values -- and I see nothing that needs locking for
that purpose alone. The only thing this appears to need is a set of
Events, one per step... Or better -- if you have real threads so one
thread can set it, a Condition object...

def acq(thing, lck):
    thing.condition.acquire()   #acquire the condition lock
    while thing.step != 0:
        thing.condition.wait()  #release lock, wait for notify
                                #it reacquires when notified
    thing.test_lock_to_acquire = lck
    while thing.step != 2:
        thing.condition.wait()
    thing.condition.release()
    return thing.cmd_return_value

then in the other thread you invoke something like:

thing.set_step(new_step)

where set_step consists of:

def set_step(self, ns):
    self.step = ns
    self.condition.acquire()    #get the condition lock
    self.condition.notify()     #set the notify for a waiting thread
                                # or .notifyAll()
    self.condition.release()    #release from here, allow waiting
                                #thread to acquire/proceed
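
Put together as a complete two-thread round trip, the Condition approach might be sketched as follows. The Thing class, wait_for_step, and the worker function are hypothetical glue; the sketch also changes the step while holding the condition's lock, which is the conventional pattern rather than something the code above requires.

```python
import threading

class Thing:
    def __init__(self):
        self.condition = threading.Condition()
        self.step = 0
        self.test_lock_to_acquire = None
        self.cmd_return_value = None

    def set_step(self, new_step):
        with self.condition:              # change state while holding the lock
            self.step = new_step
            self.condition.notify_all()

    def wait_for_step(self, wanted):
        with self.condition:
            while self.step != wanted:    # re-check after every wake-up
                self.condition.wait()

def acq(thing, lck):
    thing.wait_for_step(0)
    thing.test_lock_to_acquire = lck
    thing.set_step(1)                     # hand over to the worker
    thing.wait_for_step(2)
    return thing.cmd_return_value

def worker(thing):
    thing.wait_for_step(1)
    thing.cmd_return_value = thing.test_lock_to_acquire.acquire()
    thing.set_step(2)                     # hand back to acq

thing = Thing()
t = threading.Thread(target=worker, args=(thing,))
t.start()
result = acq(thing, threading.Lock())
t.join()
print(result)                             # True
```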

If you STILL need to protect access to the variables, you need a
bigger context manager (since I don't have 2.5 this is all based on
documentation) thusly

def acq(thing, lck):
    with step0(thing):
        thing.test_lock_to_acquire = lck
    with step2(thing):
        ret = thing.cmd_return_value
    return ret

where you have in thing (note -- it looks like context managers don't
take arguments, so I don't know how to really fake code this)

class StepControl(object):
    def __init__(self, stp):
        self.stp = stp
    def __enter__(self, thng):
        thng.condition.acquire()
        while thng.step != self.stp:
            thng.condition.wait()
        thng.condition.release()
        thng.criticalsectionlock.acquire()
    def __exit__(self, thng):
        thng.criticalsectionlock.release()

thing.step0 = StepControl(0)
thing.step2 = StepControl(2)
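
On the question of arguments: __enter__ and __exit__ themselves receive no user arguments (__exit__ gets the three exception values), but whatever builds the manager can take them. A minimal sketch using contextlib.contextmanager, where at_step and this cut-down Thing are hypothetical names:

```python
import threading
from contextlib import contextmanager

class Thing:
    def __init__(self):
        self.condition = threading.Condition()
        self.step = 0

@contextmanager
def at_step(thing, wanted):
    with thing.condition:                 # entry: wait for our turn
        while thing.step != wanted:
            thing.condition.wait()
    try:
        yield thing                       # the body of the with block runs here
    finally:
        with thing.condition:             # exit: pass control to the next step
            thing.step += 1
            thing.condition.notify_all()

thing = Thing()
seen = []

def other():
    with at_step(thing, 1):
        seen.append("one")

t = threading.Thread(target=other)
t.start()
with at_step(thing, 0):
    seen.append("zero")
with at_step(thing, 2):
    seen.append("two")
t.join()
print(seen)                               # ['zero', 'one', 'two']
```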

castironpi said:
Any questions?

In the words of pretty much any poker player...

"It is time to show your cards or fold"




There is just too much hidden in unrevealed magic classes and unseen
code to make any determination.

While you are admitting that "thd" is probably /not/ a thread
itself, you haven't shown enough to determine what IS a thread in your
scheme; and you likely need two threads (main code thread, and something
started from a threading.Thread instance) to do your interleaved steps.

After all, it IS your problem, yet I've written and supplied over 10
times the lines of actual (working and hypothetical) code in attempting
to understand what you are trying to accomplish.
 
C

castironpi

"It is time to show your cards or fold"

Here. Run it. Download Python 3.0a2.

from thread import start_new_thread as launch
from threading import Lock
import time
from functools import partial

class WithObj:
    def __init__( self, enter, exit ):
        self.__enter__, self.__exit__= enter, exit

class Step:
    def __init__( self ):
        self._index= 0
        self._locks= {}
        self._opened= False
        self._oplock= Lock()
    def __getitem__( self, index ):
        with self._oplock:
            lock= self._locks.get( index, None )
            if None is lock:
                lock= self._locks[ index ]= Lock()
                if index!= self._index:
                    assert lock.acquire( False )
            return WithObj(
                partial( self.ienter, index ),
                partial( self.iexit, index ) )
    def ienter( self, index ):
        with self._oplock:
            if self._opened:
                return self
            lock= self._locks.get( index )
        assert lock.acquire()
        with self._oplock:
            return self
    def iexit( self, index, *a ):
        with self._oplock:
            self._index+= 1
            lock= self._locks.get( self._index )
            if None is not lock:
                lock.acquire( False )
                lock.release()
    def complete( self ):
        with self._oplock:
            self._index= 0
            lock= self._locks.get( 0 )
            if None is not lock:
                lock.acquire( False )
                lock.release()
    def open( self ):
        with self._oplock:
            self._opened= True
            for lock in self._locks.values():
                lock.acquire( False )
                lock.release()

class CustThread:
    count= 0
    def __init__( self ):
        CustThread.count+= 1
        self.id= CustThread.count
        self.step= Step()
        self.cont= True
        self.cmd= None
        self.ret= None
    def __repr__( self ):
        return '<CustThread %i>'% self.id

def thloop( thd ):
    while thd.cont:
        with thd.step[1]:
            if not thd.cont: break
            print( 'step 1', end= ' ' )
            thd.ret= thd.cmd+ 1
        with thd.step[3]:
            print( 'step 3' )
            thd.ret= None
        thd.step.complete()

def op100( thd ):
    with thd.step[0]:
        print( 'step 0', end= ' ' )
        thd.cmd= 100
    with thd.step[2]:
        print( 'step 2', end= ' ' )
        ret1= thd.ret
    assert ret1== 101

def main( func ):
    if __name__== '__main__':
        func()

@main
def fmain():
    class Case:
        def __init__( self ):
            self.th1= CustThread()

    while 1:
        print( '===============================' )
        class Case1:
            case= Case()
            launch( thloop, ( case.th1, ) )
            for _ in range( 10 ):
                case.th1.cmd= None
                case.th1.ret= None
                op100( case.th1 )
            case.th1.cont= False
            case.th1.step.open()
            print( 'case complete' )

    while 1: time.sleep( 1 )

What results do you get on your computer?
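
A note for anyone trying this on a current interpreter rather than 3.0a2: as far as I know, recent CPython releases look up __enter__ and __exit__ on the object's type, so a WithObj that stores them as instance attributes is rejected by the with statement, and the thread module is spelled _thread in Python 3. The sketch below is one possible adaptation of WithObj that keeps the same constructor; ienter and iexit here are stand-in functions for the demo, not the Step methods.

```python
from functools import partial

class WithObj:
    """Context manager that forwards to two callables given at construction."""
    def __init__(self, enter, exit):
        self._enter, self._exit = enter, exit
    def __enter__(self):                  # defined on the class, so with finds it
        return self._enter()
    def __exit__(self, exc_type, exc, tb):
        return self._exit(exc_type, exc, tb)

events = []

def ienter(index):                        # stand-in for Step.ienter
    events.append(("enter", index))

def iexit(index, *exc_info):              # stand-in for Step.iexit
    events.append(("exit", index))
    return False                          # do not swallow exceptions

with WithObj(partial(ienter, 1), partial(iexit, 1)):
    events.append(("body", 1))

print(events)   # [('enter', 1), ('body', 1), ('exit', 1)]
```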
 
G

Gabriel Genellina

You have _got_ to be kidding me. Where do you guys live? In California
(at least, Silicon Valley and Sacramento area), Wendy's and In-N-Out are
the only "fast food" chains that sell anything that tastes like real
hamburgers. Way, _way_ better than the cardboard stuff at McD's and
BK. Though Carl's Jr's $6 burgers are okay. And Carl's has _terrific_
ice cream shakes, whereas Wendy's has... "frosted dairy desserts". :p

Perhaps Wendy's tried to impose here the "Californian taste" on
hamburgers - bad move... For example, those pieces of dry and burnt meat
that Americans call "barbecue" have nothing to do with Argentinian
"asado", and nobody here would eat such "barbecue" if given a choice. On
the other hand, McDonalds and Burger King have been here for a long time
now, and it seems they learned how to please the local consumers (McD even
has some certified kosher stores).
Great. Now I'm hungry.

Me too!
 
D

Dennis Lee Bieber

Here. Run it. Download Python 3.0a2.
I'm unlikely to download an alpha release when I haven't even
upgraded to 2.5 (maybe the next three day weekend I'll have time to
track down new versions of all the third party modules I have installed
and then download 2.5)

But I'll meet you half-way

castironpi said:
What results do you get on your computer?

Since "with" is a convenience being used to "hide" explicit entry
and exit code, I've hacked in said explicit calls...

-=-=-=-=-=-=-
import threading
##import time

####class WithObj:
#### def __init__( self, enter, exit ):
#### self.__enter__, self.__exit__= enter, exit
##
####class Step:
#### def __init__( self ):
#### self._index= 0
#### self._locks= {}
#### self._opened= False
#### self._oplock= Lock()
#### def __getitem__( self, index ):
#### with self._oplock:
#### lock= self._locks.get( index, None )
#### if None is lock:
#### lock= self._locks[ index ]= Lock()
#### if index!= self._index:
#### assert lock.acquire( False )
#### return WithObj(
#### partial( self.ienter, index ),
#### partial( self.iexit, index ) )
#### def ienter( self, index ):
#### with self._oplock:
#### if self._opened:
#### return self
#### lock= self._locks.get( index )
#### assert lock.acquire()
#### with self._oplock:
#### return self
#### def iexit( self, index, *a ):
#### with self._oplock:
#### self._index+= 1
#### lock= self._locks.get( self._index )
#### if None is not lock:
#### lock.acquire( False )
#### lock.release()
#### def complete( self ):
#### with self._oplock:
#### self._index= 0
#### lock= self._locks.get( 0 )
#### if None is not lock:
#### lock.acquire( False )
#### lock.release()
#### def open( self ):
#### with self._oplock:
#### self._opened= True
#### for lock in self._locks.values():
#### lock.acquire( False )
#### lock.release()

class StepLock(object): #cannot inherit from threading.Condition
                        #as Condition is a function returning an
                        #instance of _Condition
    def __init__(self, lock=None):
        self.condition = threading.Condition(lock)
        self.reset()
    def reset(self, step=0): #can be used as a "set"
        self._step = step
##        print "\tRESET: step %s" % self._step,
    def nextstep(self, increment=1): #used for delta changes
        self._step += increment
##        print "\tNEXTSTEP: step %s" % self._step,
    def acquire(self, *args):
##        tid = int(time.clock() * 100000)
##        print "\tACQUIRE %s..." % tid,
        self.condition.acquire(*args)
##        print "\t%s taken" % tid,
    def wait(self, timeout=None):
##        tid = int(time.clock() * 100000)
##        print "\tWAITING %s..." % tid,
        self.condition.wait(timeout)
##        print "\t%s done" % tid,
    def acquirestep(self, step):
        self.acquire()
##        print "\tWANT %s, at %s" % (step, self._step),
        while self._step != step:
            self.notify() #not at our step, so notify some other
            self.wait()   #waiting thread that it can check
    def notify(self):
##        print "\tNOTIFY",
        self.condition.notify()
    def notifyAll(self):
##        print "\tNOTIFYALL",
        self.condition.notifyAll()
    def release(self):
##        print "\tRELEASE",
        self.condition.release()
    def releasestep(self):
##        print "\tRELEASESTEP",
        self.nextstep()
        self.notify()
        self.release()

class Worker(object):
    count= 0
    def __init__( self ):
        Worker.count+= 1
        self.id= Worker.count
####        self.step= Step()
        self.slock = StepLock()
        self.running = True
        self.cmd = None
        self.ret = None
    def __repr__( self ):
        return '<Worker %s>' % self.id

def threadbody( worker ):
##    print "\tworker entered",
    while worker.running:
####        with worker.step[1]:
####            if not worker.running: break
####            print( 'step 1', end= ' ' )
####            worker.ret= worker.cmd+ 1
        worker.slock.acquirestep(1)
        print "\n\n%s step 1\n" % worker
        worker.ret = worker.cmd + 1
        worker.slock.releasestep()
####        with worker.step[3]:
####            print( 'step 3' )
####            worker.ret= None
        worker.slock.acquirestep(3)
        print "\n\n%s step 3\n" % worker
        worker.ret = None
####        worker.step.complete()
        worker.slock.reset()
        worker.slock.release() #note -- not a notify

def op100( worker ):
####    with worker.step[0]:
####        print( 'step 0', end= ' ' )
####        worker.cmd= 100
##    print "\top100 entered",
    worker.slock.acquirestep(0)
    print "\n\n%s step 0\n" % worker
    worker.cmd = 100
    worker.slock.releasestep()
####    with worker.step[2]:
####        print( 'step 2', end= ' ' )
####        ret1= worker.ret
    worker.slock.acquirestep(2)
    print "\n\n%s step 2\n" % worker
    result = worker.ret
    worker.slock.releasestep()
    print "\n\nOP100 RESULT: %s\n" % result
    assert result == 101

####def main( func ):
#### if __name__== '__main__':
#### func()
####
### you defined a decorator just to hide a two-line bit of code?

####@main
def fmain():
####    class Case:
####        def __init__( self ):
####            self.th1= CustThread()
### A class that just adds one more level of attribute
### indirection?

#    while True:
    # For this test I don't want an infinite loop
####        print( '===============================' )
    worktask = Worker()
####        class Case1:
### I smell Java burning... A class containing no methods, just
### a static body?
####            case= Case()
####            launch( thloop, ( worktask, ) )
### better not to mix features of threading and thread (especially as
### threading itself imports thread)
    thrd = threading.Thread(target = threadbody,
                            args = (worktask,) )
    thrd.setDaemon(True) #kill the thread if main body dies
    thrd.start()
    for _ in range( 5 ): #only five cycles for proof
        print "=" * 40
        worktask.cmd = None
        worktask.ret = None
        worktask.slock.acquire()
        worktask.slock.notify()  #signal that prepwork is done
        worktask.slock.release() #let other threads check
        op100( worktask )
    worktask.running= False
##    thrd.join() #daemon thread should die when we exit
                #otherwise deadlock as thread is blocked
                #on .acquirestep(0)
####    worktask.step.open()
    print( '\n\ncase complete' )

####    while True: time.sleep( 1 )
### the previous "while" loop would never exit, so a second
### "while" loop doing 1 second sleeps after the first will
### never be executed (and would never exit itself)

if __name__ == "__main__":
    fmain()
-=-=-=-=-=-=-=-=-

Output, clean, looks like:

-=-=-=-=-=-
========================================


<Worker 1> step 0



<Worker 1> step 1



<Worker 1> step 2



OP100 RESULT: 101

========================================


<Worker 1> step 3



<Worker 1> step 0



<Worker 1> step 1



<Worker 1> step 2



OP100 RESULT: 101

========================================


<Worker 1> step 3



<Worker 1> step 0



<Worker 1> step 1



<Worker 1> step 2



OP100 RESULT: 101

========================================


<Worker 1> step 3



<Worker 1> step 0



<Worker 1> step 1



<Worker 1> step 2



OP100 RESULT: 101

========================================


<Worker 1> step 3



<Worker 1> step 0



<Worker 1> step 1



<Worker 1> step 2



OP100 RESULT: 101



case complete


<Worker 1> step 3

-=-=-=-=-=-
Lots of blank lines as I had forced new-lines to put the core output
stand-alone when running with the commented out wolf-fences (debug
output) as shown next.

===============================
RESET: step 0 ACQUIRE 0... 0 taken NOTIFY RELEASE op100
entered ACQUIRE 5... worker entered ACQUIRE 8... 8 taken WANT
1, at 0 NOTIFY WAITING 12... 5 taken WANT 0, at 0

<Worker 1> step 0

RELEASESTEP NEXTSTEP: step 1 NOTIFY RELEASE ACQUIRE
17... 17 taken WANT 2, at 1 NOTIFY WAITING 21... 12 done

<Worker 1> step 1

RELEASESTEP NEXTSTEP: step 2 NOTIFY RELEASE ACQUIRE
26... 21 done

<Worker 1> step 2

RELEASESTEP NEXTSTEP: step 3 NOTIFY RELEASE

OP100 RESULT: 101

ACQUIRE 31... 26 taken WANT 3, at 3

<Worker 1> step 3

RESET: step 0 RELEASE ACQUIRE 35... 31 taken NOTIFY
RELEASE op100 entered ACQUIRE 38... 35 taken WANT 1, at 0
NOTIFY WAITING 40... 38 taken WANT 0, at 0

<Worker 1> step 0

RELEASESTEP NEXTSTEP: step 1 NOTIFY RELEASE ACQUIRE
45... 45 taken WANT 2, at 1 NOTIFY WAITING 48... 40 done

<Worker 1> step 1

RELEASESTEP NEXTSTEP: step 2 NOTIFY RELEASE ACQUIRE
55... 48 done

<Worker 1> step 2

RELEASESTEP NEXTSTEP: step 3 NOTIFY RELEASE

OP100 RESULT: 101

ACQUIRE 60... 55 taken WANT 3, at 3

<Worker 1> step 3

RESET: step 0 RELEASE ACQUIRE 63... 60 taken NOTIFY
RELEASE op100 entered ACQUIRE 66... 63 taken WANT 1, at 0
NOTIFY WAITING 68... 66 taken WANT 0, at 0

<Worker 1> step 0

RELEASESTEP NEXTSTEP: step 1 NOTIFY RELEASE ACQUIRE
73... 68 done

<Worker 1> step 1

RELEASESTEP NEXTSTEP: step 2 NOTIFY RELEASE ACQUIRE
78... 73 taken WANT 2, at 2

<Worker 1> step 2

RELEASESTEP NEXTSTEP: step 3 NOTIFY RELEASE

OP100 RESULT: 101

ACQUIRE 82... 78 taken WANT 3, at 3

<Worker 1> step 3

RESET: step 0 RELEASE ACQUIRE 86... 82 taken NOTIFY
RELEASE op100 entered ACQUIRE 89... 86 taken WANT 1, at 0
NOTIFY WAITING 91... 89 taken WANT 0, at 0

<Worker 1> step 0

RELEASESTEP NEXTSTEP: step 1 NOTIFY RELEASE ACQUIRE
97... 91 done

<Worker 1> step 1

RELEASESTEP NEXTSTEP: step 2 NOTIFY RELEASE ACQUIRE
101... 97 taken WANT 2, at 2

<Worker 1> step 2

RELEASESTEP NEXTSTEP: step 3 NOTIFY RELEASE

OP100 RESULT: 101

ACQUIRE 105... 101 taken WANT 3, at 3

<Worker 1> step 3

RESET: step 0 RELEASE ACQUIRE 108... 105 taken NOTIFY
RELEASE op100 entered ACQUIRE 111... 108 taken WANT 1, at 0
NOTIFY WAITING 113... 111 taken WANT 0, at 0

<Worker 1> step 0

RELEASESTEP NEXTSTEP: step 1 NOTIFY RELEASE ACQUIRE
119... 113 done

<Worker 1> step 1

RELEASESTEP NEXTSTEP: step 2 NOTIFY RELEASE ACQUIRE
123... 119 taken WANT 2, at 2

<Worker 1> step 2

RELEASESTEP NEXTSTEP: step 3 NOTIFY RELEASE

OP100 RESULT: 101



case complete
123 taken WANT 3, at 3

<Worker 1> step 3

RESET: step 0 RELEASE
-=-=-=-=-=-=-

The WAITING/done and ACQUIRE/taken outputs show the clock time
(multiplied by 100000 to get nice integers) so you can see which acquire
or wait actually succeeded. As with:

RESET: step 0 ACQUIRE 0... 0 taken NOTIFY RELEASE op100
entered ACQUIRE 5... worker entered ACQUIRE 8... 8 taken WANT
1, at 0 NOTIFY WAITING 12... 5 taken WANT 0, at 0

<Worker 1> step 0

The RESET is the one executed as part of initializing the StepLock
member of Worker. Even though thrd.start() is invoked, it is likely the
thread does not gain control at that point. That would mean the ACQUIRE
0 is the one in the range(5) loop, along with the NOTIFY and RELEASE,
followed by calling op100. ACQUIRE 5 is a result of the acquirestep(0).
NOW the thread actually gained control (worker entered) and ACQUIRE 8 8
are part of its acquirestep(1). The thread actually gets the condition
lock immediately, tests the current step level against the step it wants
and finds a mismatch. The thread then signals for any other thread
waiting on the condition via NOTIFY, and then waits for another thread
to trigger a notify on it (WAIT 12). Control returns to op100 and the
acquirestep(0) ACQUIRE 5 is answered with 5 taken; the step level
matches the desired step, and the code for step 0 is executed...
 
C

castironpi

Here.  Run it.  Download Python 3.0a2.

        I'm unlikely to download an alpha release when I haven't even
upgraded to 2.5 (maybe the next three day weekend I'll have time to
track down new versions of all the third party modules I have installed
and then download 2.5)

        But I'll meet you half-way
[snip]
        The RESET is the one executed as part of initializing the StepLock
member of Worker. Even though thrd.start() is invoked, it is likely the
thread does not gain control at that point. That would mean the ACQUIRE
0 is the one in the range(5) loop, along with the NOTIFY and RELEASE,
followed by calling op100. ACQUIRE 5 is a result of the acquirestep(0).
NOW the thread actually gained control (worker entered) and ACQUIRE 8 8
are part of its acquirestep(1). The thread actually gets the condition
lock immediately, tests the current step level against the step it wants
and finds a mismatch. The thread then signals for any other thread
waiting on the condition via NOTIFY, and then waits for another thread
to trigger a notify on it (WAIT 12). Control returns to op100 and the
acquirestep(0) ACQUIRE 5 is answered with 5 taken; the step level
matches the desired step, and the code for step 0 is executed...

Pretty cool.

First, thanks for taking the time to -edit- my code, along with
getting it running! I appreciate your effort. And I'm happy you like
it enough to put it in.

It runs on my machine in 2.5. First change was to run it
indefinitely, and it's doing so. Still not conclusive evidence that
it's free of deadlocks, nor is the fact that I don't see any by
inspection. ...But it's running.

Second, thanks for picking better names for the identifiers. Mine
always suck.

Third. StepLock.acquirestep can use notifyAll: wake all of them up,
and all but one go back to sleep. However, the current implementation
wakes them up in the order wait was called, so you do save some time here
(even though the docs say otherwise). The while self._step!= step is
suspicious, but the alternative is a -sequence- of locks, the penalty
is only large for high step lengths, and it's not exactly polling-- so
maybe that's just the purist in me.
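
A sketch of the notifyAll variant mentioned in the third point, written against the current threading API (notify_all, wait_for). The names are hypothetical and this is not StepLock itself: every waiter wakes, re-checks its own predicate, and all but one go back to sleep.

```python
import threading

cond = threading.Condition()
step = 0
order = []

def run_at(my_step):
    global step
    with cond:
        # wait_for re-evaluates the predicate after every wake-up
        cond.wait_for(lambda: step == my_step)
        order.append(my_step)
        step += 1
        cond.notify_all()                 # wake everyone; only one will proceed

# start the threads deliberately out of order
threads = [threading.Thread(target=run_at, args=(n,)) for n in (3, 1, 2, 0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(order)                              # [0, 1, 2, 3]
```

This does not depend on the order in which waiters are woken, so it stays correct even where that order differs from the order wait was called.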

Fourth: wait is a private member, unless you're enabling the
functionality of waiting for the next change in step by another
thread. notifyAll isn't used anywhere. condition is also private, I
believe.

Fifth: My webreader gives me this information: "Note: The author of
this message requested that it not be archived. This message will be
removed from Groups in 6 days (Mar 7, 12:55 am)." Curious if you
intentionally have it. It could be nice to retrieve this-- same with
Stage.

Sixth: If I comment out fmain:worktask.slock.release() and
threadbody:worker.slock.release() there is no effect. It's bizarre.
Is it a requirement to use them?

Seventh: What is the advantage of using a Condition over an Event?
 
