threading - race condition?

skunkwerk

i'm getting the wrong output for the 'title' attributes for this
data. the queue holds a data structure (item name, position, and list
to store results in). each thread takes in an item name and queries a
database for various attributes. from the debug statements the item
names are being retrieved correctly, but the attributes returned are
those of other items in the queue - not its own item. however, the
model variable is not a global variable... so i'm not sure what's
wrong.

i've declared a bunch of worker threads (100) and a queue into which
new requests are inserted, like so:

queue = Queue.Queue(0)
WORKERS = 100
for i in range(WORKERS):
    thread = SDBThread(queue)
    thread.setDaemon(True)
    thread.start()

the thread:

class SimpleDBThread(threading.Thread):
    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)
    def run(self):
        while 1:
            item = self.__queue.get()
            if item != None:
                model = domain.get_item(item[0])
                logger.debug('sdbthread item:' + item[0])
                title = model['title']
                scraped = model['scraped']
                logger.debug("sdbthread title:" + title)

any suggestions?
thanks
 
castironpi

skunkwerk said:
<snip>

I'll base this on terminology: if a model is in a brain (throughout
the known universe), and a dollar's a dollar, it may not pay to build
a computer out of brains.

If man arises as a tool-carrier, we will carry tools, not people.
Don't use Python to make people; make money, and not too much. Pick a
wage and you might get somewhere.
 
skunkwerk

castironpi said:
I'll base this on terminology: if a model is in a brain (throughout
the known universe), and a dollar's a dollar, it may not pay to build
a computer out of brains.

If man arises as a tool-carrier, we will carry tools, not people.
Don't use Python to make people; make money, and not too much. Pick a
wage and you might get somewhere.

excuse me?
 
Gabriel Genellina

skunkwerk said:
<snip>

(Please forgive our local pet)

is "item" a list? Perhaps the *same* list as other requests? Be careful
when you put mutable objects in a queue.
And what about domain.get_item? is it a thread safe operation?
You said the model is not a global - but is it a *different* object for
each request?
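
For illustration, the pitfall Gabriel is hinting at (a made-up sketch, not
skunkwerk's actual code; the item names are invented):

import Queue

q = Queue.Queue()

# Risky: one shared list, mutated between puts -- every consumer ends up
# seeing whatever the list holds when it finally looks, not the values it
# held at put() time.
request = ['', 0, []]
for pos, name in enumerate(['item-a', 'item-b']):
    request[0] = name
    request[1] = pos
    q.put(request)              # the same list object is queued twice

# Safe: build a fresh list per request, so each queue entry is independent.
for pos, name in enumerate(['item-a', 'item-b']):
    q.put([name, pos, []])      # a new list object on every iteration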
 
John Nagle

skunkwerk said:
<snip>

Hm. We don't have enough code here to see what's wrong.
For one thing, we're not seeing how items get put on the queue. The
trouble might be at the "put" end.

Make sure that "model", "item", "title", and "scraped" are not globals.
Remember, any assignment to them in a global context makes them a global.

You should never get "None" from the queue unless you put a "None"
on the queue. "get()" blocks until there's work to do.

John Nagle
 
skunkwerk

John Nagle said:
<snip>

thanks John, Gabriel,
here's the 'put' side of the requests:

def prepSDBSearch(results):
    modelList = [0]
    counter = 1
    for result in results:
        data = [result.item, counter, modelList]
        queue.put(data)
        counter += 1
    while modelList[0] < len(results):
        print 'waiting...'  #wait for them to come home
    modelList.pop(0)  #now remove '0'
    return modelList

responses to your follow ups:
1) 'item' in the threads is a list that corresponds to the 'data'
list in the above function. it's not global, and the initial values
seem ok, but i'm not sure if every time i pass in data to the queue it
passes in the same memory address or declares a new 'data' list (which
I guess is what I want)
2) john, i don't think any of the variables you mentioned are
global. the 'none' check was just for extra safety.
3) the first item in the modelList is a counter that keeps track of
the number of threads for this call that have completed - is there any
better way of doing this?

thanks again
 
skunkwerk

        Coming in late...

        <snip>



        Note: a double leading __ means "name mangling" -- typically only
needed when doing multiple layers of inheritance where different parents
have similarly named items that need to be kept independent; a single _ is
the convention for "don't touch me unless you know what you are doing"
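
For instance (a quick, hypothetical illustration of the mangling):

import threading, Queue

class SimpleDBThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue         # stored as self._SimpleDBThread__queue

t = SimpleDBThread(Queue.Queue())
print t._SimpleDBThread__queue       # the mangled name is what exists
# print t.__queue                    # AttributeError outside the class
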
           threading.Thread.__init__ ( self )
   def run ( self ):
           while 1:
                   item = self.__queue.get()
                   if item!=None:
                           model = domain.get_item(item[0])
                           logger.debug('sdbthread item:'+item[0])
                           title = model['title']
                           scraped = model['scraped']
                           logger.debug("sdbthread title:"+title)
any suggestions?
thanks

        skunkwerk said:
thanks John, Gabriel,
  here's the 'put' side of the requests:
def prepSDBSearch(results):
   modelList = [0]
   counter=1
   for result in results:
           data = [result.item, counter, modelList]
           queue.put(data)
           counter+=1
   while modelList[0] < len(results):
           print 'waiting...'#wait for them to come home
   modelList.pop(0)#now remove '0'
   return modelList

        My suggestion, if you really want diagnostic help -- follow the
common recommendation of posting the minimal /runnable (if erroneous)/
code... If "domain.get_item()" is some sort of RDBM access, you might
fake it using a pre-loaded dictionary -- anything that allows it to
return something when given the key value.
responses to your follow ups:
1) 'item' in the threads is a list that corresponds to the 'data'
list in the above function.  it's not global, and the initial values
seem ok, but i'm not sure if every time i pass in data to the queue it
passes in the same memory address or declares a new 'data' list (which
I guess is what I want)

        Rather confusing usage... In your "put" you have a list whose first
element is "result.item", but then in the work thread, you refer to the
entire list as "item"
3)  the first item in the modelList is a counter that keeps track of
the number of threads for this call that have completed - is there any
better way of doing this?

        Where? None of your posted code shows either "counter" or modelList
being used by the threads.

        And yes, if you have threads trying to update a shared mutable, you
have a race condition.

        You also have a problem if you are using "counter" to define where
in modelList a thread is supposed to store its results -- as you can not
access an element that doesn't already exist...

a = [0]
a[3] = 1        #failure, need to create elements 1, 2, 3 first

        Now, if position is irrelevant, and a thread just appends its
results to modelList, then you don't need some counter, all you need is
to check the length of modelList against the count expected.

        Overall -- even though you are passing things via the queue, the
contents being passed via the queue are being treated as if they were
global entities (you could make modelList a global, remove it from the
queue entries, and have the same net access)...

        IOWs, you have too much coupling between the threads and the feed
routine...

        As for me... I'd be using a second queue for return values....

WORKERTHREADS = 100
feed = Queue.Queue()
results = Queue.Queue()

def worker():
        while True:
                (ID, item) = feed.get()     #I leave the queues globals
                                            #since they perform locking
                                            #internally
                model = domain.get_item(item)
                results.put( (ID, model["title"], model["scraped"]) )

for i in range(WORKERTHREADS):
        aThread = threading.Thread(target=worker)
                #overkill to subclass as there is now no specialized init
                #and if I really wanted to make the queues non-global
                #I'd pass them as arguments:
                #       threading.Thread(target=worker, args=(feed, results))
                #where worker is now
                #       def worker(feed, results):
        aThread.setDaemon(True)
        aThread.start()

...

def prepSearch(searches):
        modelList = []
        counter = 0
        for searchItem in searches:
                feed.put( (counter, searchItem) )
                counter += 1
                modelList.append(None)  #extend list one element per search
        while counter:
                (ID, title, scraped) = results.get()
                modelList[ID] = (title, scraped)
                counter -= 1
        return modelList

        The only place counter and modelList are modified is within
prepSearch. I'm passing counter out and back to use as an ID value in case
the final results are supposed to be in order -- that way, if one thread
finishes before another, the items can still be placed into the list where
they'd have been sequentially.
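
For example, a hypothetical call (the item names are invented, and the
worker threads above are assumed to be running):

models = prepSearch(['item-a', 'item-b', 'item-c'])
# models[0] is the (title, scraped) pair for 'item-a', models[1] for
# 'item-b', and so on -- in request order, regardless of which worker
# finished first.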

        I can only hope that "domain.get_item" is an activity that is I/O
bound AND that it supports parallel accesses... Otherwise the above
worker threads seem to be adding a lot of overhead for queue I/O and
threading swaps for what is otherwise a rather linear process.

        Perhaps your posts don't reveal enough... Maybe you have multiple
main threads that are posting to the worker feed queue (and you were
using separate mutables for storing the results). In this situation, I'd
remove the results queue from being a global entity, create one queue
per main processing thread, and pass the queue as one of the parameters.
This way, a worker can return data to any source thread by using the
supplied queue for the return...

Modify prepSearch with:

        myQueue = Queue.Queue()
...
        feed.put( (counter, searchItem, myQueue) )
...
        (ID, title, scraped) = myQueue.get()

Modify worker with:

        (ID, item, retqueue) = feed.get()
...
        retqueue.put( (ID, model["title"], model["scraped"]) )
--
        Wulfraed        Dennis Lee Bieber               KD6MOG
        (e-mail address removed)               (e-mail address removed)
                HTTP://wlfraed.home.netcom.com/
        (Bestiaria Support Staff:               (e-mail address removed))
                HTTP://www.bestiaria.com/

wow. thanks for the help.
i seem to have fixed my problem though - it turns out
domain.get_item was not thread-safe, as it was using the underlying
httplib. the solution was to create a new connection to the database
for each thread (which is ok, as the database is meant to be queried in
a massively parallel fashion). the missing part of the code included
a part where i inserted the results at the given position into the
list.

the only issue i have now is that it takes a long time for 100 threads
to initialize that connection (>5 minutes) - and as i'm doing this on
a webserver, any time i update the code i have to restart all those
threads, which i'm doing right now in a for loop. is there any way I
can keep the thread stuff separate from the rest of the code for this
file, yet allow access? it wouldn't help having a .pyc or using
Psyco, correct, as the time is being spent at runtime? something
along the lines of 'start a new thread every minute until you get to
100' without blocking the execution of the rest of the code in that
file? or maybe, any time i need to do a search, start a new thread if
the number of threads is <100?

thanks again
 
Gabriel Genellina

the only issue i have now is that it takes a long time for 100 threads
to initialize that connection (>5 minutes) - and as i'm doing this on
a webserver any time i update the code i have to restart all those
threads, which i'm doing right now in a for loop. is there any way I
can keep the thread stuff separate from the rest of the code for this
file, yet allow access?

Like using a separate thread to create the other 100?
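
Something along these lines, roughly (a sketch reusing the SimpleDBThread
class and queue from the original post):

def start_workers(queue, count=100):
    # Runs in a background thread, so the slow per-thread connection
    # setup doesn't block the rest of the module/server startup.
    for i in range(count):
        t = SimpleDBThread(queue)
        t.setDaemon(True)
        t.start()

starter = threading.Thread(target=start_workers, args=(queue,))
starter.setDaemon(True)
starter.start()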
 
Rhamphoryncus

skunkwerk said:
wow. thanks for the help.
i seem to have fixed my problem though - it turns out
domain.get_item was not thread safe as it was using the underlying
httplib. the solution was to create a new connection to the database
for each thread (which is ok as the database is meant to be queried in
a massively paralell fashion). the missing part of the code included
a part where i inserted the results at the given position into the
list.

the only issue i have now is that it takes a long time for 100 threads
to initialize that connection (>5 minutes) - and as i'm doing this on
a webserver any time i update the code i have to restart all those
threads, which i'm doing right now in a for loop. is there any way I
can keep the thread stuff separate from the rest of the code for this
file, yet allow access? It wouldn't help having a .pyc or using
psycho, correct, as the time is being spent in the runtime? something
along the lines of 'start a new thread every minute until you get to a
100' without blocking the execution of the rest of the code in that
file? or maybe any time i need to do a search, start a new thread if
the #threads is <100?

$ python2.5 -m timeit -s 'import threading' 't = threading.Thread();
t.start(); t.join()'
10000 loops, best of 3: 131 usec per loop

Clearly it is not the threads themselves, but something else which is
expensive.

It's not clear why you need threads at all. Unless you've got a lot
of cpus/cores running that DBMS, or it's got fairly high latency (and
no way to pipeline), attacking it with more threads isn't gonna give
significant speedups.
 
skunkwerk

Dennis Lee Bieber said:

Is this running as part of the server process, or as a client
accessing the server?

Alternative question: Have you tried measuring the performance using
/fewer/ threads... 25 or less? I believe I'd mentioned prior that you
seem to have a lot of overhead code for what may be a short query.

If the .get_item() code is doing a full sequence of: connect to
database; format&submit query; fetch results; disconnect from
database... I'd recommend putting the connect/disconnect outside of the
thread while loop (though you may then need to put sentinel values into
the feed queue -- one per thread -- so they can cleanly exit and
disconnect rather than relying on daemonization for exit).

thread:
    dbcon = ...
    while True:
        query = Q.get()
        if query == SENTINEL: break
        result = get_item(dbcon, query)
        ...
    dbcon.close()
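
The matching shutdown would then look something like this (a sketch;
NUMBER_OF_THREADS stands for however many workers were started):

# one sentinel per worker, so each thread wakes up, breaks out of its
# loop, and closes its connection
for i in range(NUMBER_OF_THREADS):
    Q.put(SENTINEL)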

Third alternative: Find some way to combine the database queries.
Rather than 100 threads each doing a single lookup (from your code, it
appears that only 1 result is expected per search term), run 10 threads
each looking up 10 items at once...

thread:
    dbcon = ...
    terms = []
    terminate = False
    while not terminate:
        while len(terms) < 10:
            query = Q.get_nowait()
            if not query: break
            if query == SENTINEL:
                terminate = True
                break
            terms.append(query)
        results = get_item(dbcon, terms)
        terms = []
        #however you are returning items; match the query term to the
        #key item in the list of returned data?
    dbcon.close()

where the final select statement looks something like:

SQL = """select key, title, scraped from ***
where key in ( %s )""" % ", ".join("?" for x in terms)
#assumes database adapter uses ? for placeholder
dbcur.execute(SQL, terms)
--
Wulfraed Dennis Lee Bieber KD6MOG
(e-mail address removed) (e-mail address removed)
HTTP://wlfraed.home.netcom.com/
(Bestiaria Support Staff: (e-mail address removed))
HTTP://www.bestiaria.com/

thanks again Dennis,
i chose 100 threads so i could do 10 simultaneous searches (where
each search contains 10 terms - using 10 threads). the .get_item()
code is not doing the database connection - rather, the connection is
established during the initialization of each thread. so basically once a
thread starts, the database connection is persistent and .get_item
queries are very fast. this is running as a server process (using
django).

cheers
 
skunkwerk

Rhamphoryncus said:
$ python2.5 -m timeit -s 'import threading' 't = threading.Thread();
t.start(); t.join()'
10000 loops, best of 3: 131 usec per loop

Clearly it is not the threads themselves, but something else which is
expensive.

It's not clear why you need threads at all. Unless you've got a lot
of cpus/cores running that DBMS, or it's got fairly high latency (and
no way to pipeline), attacking it with more threads isn't gonna give
significant speedups.

correct. the threads themselves are not taking up the time, but the
initialization of each thread (which includes making a new connection
to the database) - typically this is ~3 seconds. The database is
Amazon's SimpleDB, which is meant to support massively parallel
queries. once the connection has been made, queries are very fast.

thanks
 
skunkwerk

Like using a separate thread to create the other 100?

thanks Gabriel,
i think that could do it - let me try it out. don't know why i
didn't think of it earlier.
 
Rhamphoryncus

skunkwerk said:
correct. the threads themselves are not taking up the time, but the
initialization of each thread (which includes making a new connection
to the database) - typically this is ~3 seconds. The database is
amazon's simpleDB, which is meant to support massively parallel
queries. once the connection has been made, queries are very fast.

So it shouldn't take much more than 3 seconds to create all 100
threads. It certainly shouldn't take 5 minutes. However, 3 seconds *
100 = 5 minutes, so it sounds like the connection process is getting
serialized somehow.

Maybe you're doing the connection in the thread's __init__? Note that
__init__ is called when the thread *object* is created, by the main
thread, and not when you *start* the thread.
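
In other words, something like this (a sketch; connect() stands in for
whatever builds the per-thread SimpleDB connection):

class SimpleDBThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue     # cheap: runs in the main thread

    def run(self):
        conn = connect()         # expensive: now runs in the child thread,
                                 # so 100 connections are set up in
                                 # parallel rather than one after another
        while True:
            item = self.__queue.get()
            # ... query conn for item, as before ...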

I find the threading.Thread API to be vastly over complicated. It's
much simpler to wrap it like this:

def start_thread(func, *args, **kwargs):
    t = threading.Thread(target=func, args=args, kwargs=kwargs)
    t.start()
    return t

Then you can pass a function to start_thread and it will run in the
new child thread.
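
For example (the worker function and its arguments here are hypothetical,
just to show the call):

def worker(feed, results):
    # open the per-thread connection here, then loop pulling work items
    # off feed and pushing (ID, title, scraped) tuples onto results
    pass

t = start_thread(worker, feed, results)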
 
