generators shared among threads

jess.austin

hi,

This seems like a difficult question to answer through testing, so I'm
hoping that someone will just know... Suppose I have the following
generator, g:

def f():
    i = 0
    while True:
        yield i
        i += 1
g = f()

If I pass g around to various threads and I want them to always be
yielded a unique value, will I have a race condition? That is, is it
possible that the cpython interpreter would interrupt one thread after
the increment and before the yield, and then resume another thread to
yield the first thread's value, or increment the stored i, or both,
before resuming the first thread? If so, would I get different
behavior if I just set g like:

g=itertools.count()

If both of these idioms will give me a race condition, how might I go
about preventing such? I thought about using threading.Lock, but I'm
sure that I don't want to put a lock around the yield statement.

thanks,
Jess
 
Alex Martelli

> hi,
>
> This seems like a difficult question to answer through testing, so I'm
> hoping that someone will just know... Suppose I have the following
> generator, g:
>
> def f():
>     i = 0
>     while True:
>         yield i
>         i += 1
> g = f()
>
> If I pass g around to various threads and I want them to always be
> yielded a unique value, will I have a race condition? That is, is it

Yes, you will.
> before resuming the first thread? If so, would I get different
> behavior if I just set g like:
>
> g = itertools.count()

I believe that in the current implementation you'd get "lucky", but
there is no guarantee that such luck would persist across even a minor
bugfix in the implementation. Don't do it.
> If both of these idioms will give me a race condition, how might I go
> about preventing such? I thought about using threading.Lock, but I'm
> sure that I don't want to put a lock around the yield statement.

Queue.Queue is often the best way to organize cooperation among threads.
Make a Queue.Queue with a reasonably small maximum size, a single
dedicated thread that puts successive items of itertools.count onto it
(implicitly blocking and waiting when the queue gets full), and any
other thread can call get on the queue and obtain a unique item
(implicitly waiting a little bit if the queue ever gets empty, until the
dedicated thread wakes up and fills the queue again). [[Alternatively you
could subclass Queue and override the hook-method _get, which always
gets called in a properly locked and thus serialized condition; but that
may be considered a reasonably advanced task, since such subclassing
isn't documented in the reference library, only in Queue's sources]].
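
A minimal sketch of that arrangement (illustrative only, not from the
original post; the names unique_ids and producer are made up, and
Python 2.x is assumed):

import itertools
import threading
import Queue

unique_ids = Queue.Queue(maxsize=10)    # reasonably small maximum size

def producer():
    # Dedicated thread: put() blocks whenever the queue is full.
    for i in itertools.count():
        unique_ids.put(i)

t = threading.Thread(target=producer)
t.setDaemon(True)
t.start()

# Any other thread obtains a unique value with unique_ids.get(),
# blocking briefly if the queue happens to be empty.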


Alex
 
Alan Kennedy

[jess.austin]
> def f():
>     i = 0
>     while True:
>         yield i
>         i += 1
> g = f()
>
> If I pass g around to various threads and I want them to always be
> yielded a unique value, will I have a race condition?

Yes.

Generators can be shared between threads, but they cannot be resumed
from two threads at the same time.

You should wrap it in a lock to ensure that only one thread at a time
can resume the generator.
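
For example, a minimal sketch of such a wrapper (illustrative only; the
names gen_lock and next_value are made up, and the Python 2.x gen.next()
protocol is assumed):

import threading

gen_lock = threading.Lock()

def next_value(gen):
    # Only one thread at a time may resume the shared generator.
    gen_lock.acquire()
    try:
        return gen.next()
    finally:
        gen_lock.release()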

Read this thread from a couple of years back about the same topic.

Suggested generator to add to threading module.
http://groups.google.com/group/comp...afa913fe4df/a2ede21f7dd78f34#a2ede21f7dd78f34

Also contained in that thread is an implementation of Queue.Queue which
supplies values from a generator, and which does not require a separate
thread to generate values.

HTH,
 
jess.austin

Thanks for the great advice, Alex. Here is a subclass that seems to
work:

from Queue import Queue
from itertools import count

class reentrantQueue(Queue):
    def _init(self, maxsize):
        self.maxsize = 0
        self.queue = []    # so we don't have to override put()
        self.counter = count()
    def _empty(self):
        return False
    def _get(self):
        return self.counter.next()
    def next(self):
        return self.get()
    def __iter__(self):
        return self
 
Alex Martelli

> Thanks for the great advice, Alex. Here is a subclass that seems to
> work:

You're welcome!
> from Queue import Queue
> from itertools import count
>
> class reentrantQueue(Queue):
>     def _init(self, maxsize):
>         self.maxsize = 0
>         self.queue = []    # so we don't have to override put()
>         self.counter = count()
>     def _empty(self):
>         return False
>     def _get(self):
>         return self.counter.next()
>     def next(self):
>         return self.get()
>     def __iter__(self):
>         return self

You may also want to override _put to raise an exception, just to avoid
accidental misuse, though I agree it's marginal. Also, I'd use maxsize
(if provided and >0) as the upper bound for the counting; not sure that's
necessary but it seems pretty natural to raise StopIteration (rather
than returning the counter's value) if the counter reaches that
"maxsize". Last, I'm not sure I'd think of this as a reentrantQueue, so
much as a ReentrantCounter;-).
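
A sketch combining those suggestions (illustrative only, not code from
the thread; Python 2.x is assumed, as is the choice to raise
StopIteration once maxsize values have been handed out):

from Queue import Queue
from itertools import count

class ReentrantCounter(Queue):
    def _init(self, maxsize):
        self.maxsize = maxsize    # kept so the base class's _full() still works
        self.queue = []           # so we don't have to override put()
        self.counter = count()
    def _empty(self):
        return False
    def _put(self, item):
        # Guard against accidental misuse.
        raise TypeError("Can't put to a ReentrantCounter")
    def _get(self):
        # Called by Queue.get() with the queue's own locking in effect.
        return self.counter.next()
    def next(self):
        value = self.get()
        if self.maxsize > 0 and value >= self.maxsize:
            raise StopIteration
        return value
    def __iter__(self):
        return self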


Alex
 
jess.austin

Alex said:
> Last, I'm not sure I'd think of this as a reentrantQueue, so
> much as a ReentrantCounter;-).

Of course! It must have been late when I named this class... I think
I'll go change the name in my code right now.
 
Paul Rubin

> I believe that in the current implementation you'd get "lucky", but
> there is no guarantee that such luck would persist across even a minor
> bugfix in the implementation. Don't do it.

I remember being told that xrange(sys.maxint) was thread-safe, but of
course I wouldn't want to depend on that across Python versions either.
> Queue.Queue is often the best way to organize cooperation among threads.
> Make a Queue.Queue with a reasonably small maximum size, a single
> dedicated thread that puts successive items of itertools.count onto it
> (implicitly blocking and waiting when the queue gets full),

This should be pretty simple to implement and not much can go wrong
with it, but it means a separate thread for each such generator, and
waiting for thread switches every time the queue goes empty. A more
traditional approach would be to use a lock in the generator,

def f():
    lock = threading.Lock()
    i = 0
    while True:
        lock.acquire()
        yield i
        i += 1
        lock.release()

but it's easy to make mistakes when implementing things like that
(I'm not even totally confident that the above is correct).

Hmm (untested, like above):

class Synchronized:
    def __init__(self, generator):
        self.gen = generator
        self.lock = threading.Lock()
    def next(self):
        self.lock.acquire()
        try:
            yield self.gen.next()
        finally:
            self.lock.release()

synchronized_counter = Synchronized(itertools.count())

That isn't a general solution but can be convenient (if I didn't mess
it up). Maybe there's a more general recipe somewhere.
 
jess.austin

Paul said:
> def f():
>     lock = threading.Lock()
>     i = 0
>     while True:
>         lock.acquire()
>         yield i
>         i += 1
>         lock.release()
>
> but it's easy to make mistakes when implementing things like that
> (I'm not even totally confident that the above is correct).

The main problem with this is that the yield leaves the lock locked.
If any other thread wants to read the generator it will block. Your
class Synchronized fixes this with the "finally" hack (please note that
from me this is NOT a pejorative). I wonder... is that future-proof?
It seems that something related to this might change with 2.5? My
notes from GvR's keynote don't seem to include this. Someone that
knows more than I do about the intersection between "yield" and
"finally" would have to speak to that.
 
Paul Rubin

> The main problem with this is that the yield leaves the lock locked.
> If any other thread wants to read the generator it will block.

Ouch, good catch. Do you see a good fix other than try/finally?
Is there a classical way to do it with coroutines and semaphores?
 
Alex Martelli

Paul Rubin said:
> Ouch, good catch. Do you see a good fix other than try/finally?
> Is there a classical way to do it with coroutines and semaphores?

Jesse's solution from the other half of this thread, generalized:

import Queue

class ReentrantIterator(Queue.Queue):
    def _init(self, iterator):
        self.iterator = iterator
    def _empty(self):
        return False
    def _get(self):
        return self.iterator.next()
    def _put(self, *ignore):
        raise TypeError, "Can't put to a ReentrantIterator"
    def next(self):
        return self.get()
    def __iter__(self):
        return self

Now, x=ReentrantIterator(itertools.count()) should have all the
properties we want, I think. The locking is thanks to Queue.Queue and
its sweet implementation of the Template Method design pattern.
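
For illustration only (the worker function and thread counts below are
made up, not from the post), a few threads sharing one instance might
look like this:

import itertools
import threading

unique = ReentrantIterator(itertools.count())

def worker():
    # Each next() call goes through Queue.get(), which serializes access,
    # so every thread should see distinct values.
    for _ in range(5):
        print threading.currentThread().getName(), unique.next()

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()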


Alex
 
Kent Johnson

Paul said:
> Hmm (untested, like above):
>
> class Synchronized:
>     def __init__(self, generator):
>         self.gen = generator
>         self.lock = threading.Lock()
>     def next(self):
>         self.lock.acquire()
>         try:
>             yield self.gen.next()
>         finally:
>             self.lock.release()
>
> synchronized_counter = Synchronized(itertools.count())
>
> That isn't a general solution but can be convenient (if I didn't mess
> it up). Maybe there's a more general recipe somewhere.

This code is not allowed in Python 2.4. From PEP 255:

Restriction: A yield statement is not allowed in the try clause of a
try/finally construct. The difficulty is that there's no guarantee
the generator will ever be resumed, hence no guarantee that the finally
block will ever get executed; that's too much a violation of finally's
purpose to bear.

Even if this was allowed, ISTM the semantics might be the same as your
previous attempt - I would expect the finally to be executed after the
yield returns, meaning the lock would be held during the yield.

Python 2.5 will allow this (see PEP 342) but from the examples it seems
the finally won't execute until the yield returns.

Kent
 
Just

Kent Johnson said:
> This code is not allowed in Python 2.4. From PEP 255:
> [ snip ]

The code also doesn't make sense: .next() should *return* a value, not
yield one. Substituting "return" for "yield" might just work for the
code above.
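
For illustration (not code from the thread), the earlier wrapper with
that substitution applied; since next() is now an ordinary method rather
than a generator, the try/finally restriction no longer applies:

import threading

class Synchronized:
    def __init__(self, generator):
        self.gen = generator
        self.lock = threading.Lock()
    def next(self):
        self.lock.acquire()
        try:
            # return, not yield: hand back one value per call, with only
            # one thread at a time inside the wrapped iterator
            return self.gen.next()
        finally:
            self.lock.release()
    def __iter__(self):
        return self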

Just
 
Paul Rubin

> Now, x=ReentrantIterator(itertools.count()) should have all the
> properties we want, I think. The locking is thanks to Queue.Queue and
> its sweet implementation of the Template Method design pattern.

That is very cool, and generally useful enough that maybe it should be
dropped into itertools. I see that Queue.get's implementation is
quite intricate (it uses three separate locks to handle some
additional cases like timeouts and non-blocking gets) but I'm not up
to trying to grok it right now. Thanks!
 
Bryan Olson

> The main problem with this is that the yield leaves the lock locked.
> If any other thread wants to read the generator it will block.

I don't think so. The second thread will start right after the
yield, and release the lock before acquiring it.

Here's a demo:


import threading
import time

def start_daemon(closure):
    t = threading.Thread(target=closure)
    t.setDaemon(True)
    t.start()

def f():
    lock = threading.Lock()
    i = 0
    while True:
        lock.acquire()
        yield i
        i += 1
        lock.release()


fgen = f()

def count3():
    for _ in range(3):
        print '---', fgen.next()
        time.sleep(10)

start_daemon(count3)
time.sleep(1.0)
print "+++", fgen.next()
 
jess.austin

I just noticed, if you don't define maxsize in _init(), you need to
override _full() as well:

def _full(self):
    return False

cheers,
Jess
 
jess.austin

Bryan,

You'll get the same result without the lock. I'm not sure what this
indicates. It may show that the contention on the lock and the race
condition on i aren't always problems. It may show that generators, at
least in CPython 2.4, provide thread safety for free. It does seem to
disprove my statement that, "the yield leaves the lock locked".

More than that, I don't know. When threading is involved, different
runs of the same code can yield different results. Can we be sure that
each thread starts where the last one left off? Why wouldn't a thread
just start where it had left off before? Of course, this case would
have the potential for problems that Alex talked about earlier. Why
would a generator object be any more reentrant than a function object?
Because it has a gi_frame attribute? Would generators be thread-safe
only in CPython?

I started the discussion with simpler versions of these same questions.
I'm convinced that using Queue is safe, but now I'm not convinced that
just using a generator is not safe.

cheers,
Jess
 
Bryan Olson

> You'll get the same result without the lock. I'm not sure what this
> indicates. It may show that the contention on the lock and the race
> condition on i aren't always problems. It may show that generators, at
> least in CPython 2.4, provide thread safety for free. It does seem to
> disprove my statement that, "the yield leaves the lock locked".
>
> More than that, I don't know. When threading is involved, different
> runs of the same code can yield different results. Can we be sure that
> each thread starts where the last one left off? Why wouldn't a thread
> just start where it had left off before? Of course, this case would
> have the potential for problems that Alex talked about earlier. Why
> would a generator object be any more reentrant than a function object?
> Because it has a gi_frame attribute? Would generators be thread-safe
> only in CPython?

I have not found definitive answers in the Python doc. Both
generators and threads keep their own line-of-control, and
how they interact is not clear.
 
Paul Rubin

Bryan Olson said:
> I have not found definitive answers in the Python doc. Both
> generators and threads keep their own line-of-control, and
> how they interact is not clear.

It looks to me like you can't have two threads in the same generator:

import threading, time

def make_gen():
    lock = threading.Lock()
    count = 0
    delay = [0,1]
    while True:
        lock.acquire()

        # sleep 1 sec on first iteration, then 0 seconds on next iteration
        time.sleep(delay.pop())
        count += 1
        yield count

        lock.release()

def run():
    print gen.next()

gen = make_gen()

# start a thread that will lock the generator for 1 sec
threading.Thread(target=run).start()

# make sure first thread has a chance to get started
time.sleep(0.2)

# start second thread while the generator is locked
threading.Thread(target=run).start()

raises ValueError:

Python 2.3.4 (#1, Feb 2 2005, 12:11:53)
[GCC 3.4.2 20041017 (Red Hat 3.4.2-6.fc3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Traceback (most recent call last):
  File "/usr/lib/python2.3/threading.py", line 436, in __bootstrap
    self.run()
  File "/usr/lib/python2.3/threading.py", line 416, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/tmp/python-28906xpZ", line 18, in run
ValueError: generator already executing

1
 
Tim Peters

[Paul Rubin]
> It looks to me like you can't have two threads in the same generator:

You can't even have one thread in a generator-iterator get away with
activating the generator-iterator while it's already active. That's
an example in PEP 255:

"""
Restriction: A generator cannot be resumed while it is actively
running:
... i = me.next()
... yield i Traceback (most recent call last):
...
File "<string>", line 2, in g
ValueError: generator already executing
"""

Same thing if more than one thread tries to do that, but perhaps
harder to see then.

To make some intuitive sense of those, note that a generator-iterator
reuses a single stack frame across resumptions. There is only one
such frame per generator-iterator, hence only (among other things)
one "program counter" per generator-iterator. It should be obvious
that multiple threads can't be allowed to muck with _the_ frame's
program counter simultaneously, and the example above is actually
subtler on that count (because nothing in non-resumable functions
prepares you for the fact that generators make it possible for a _single_
thread to _try_ to "be in two places at the same time" inside a single
invocation of a function -- although any attempt to try to do that
with a single thread is necessarily convoluted, like the example
above, the implementation still has to prevent it).
 
