collecting results in threading app


Gerardo Herzig

Hi all. Newbie at threads over here. I'm missing some point here, but can't
figure out which one.

This little piece of code executes a 'select count(*)' over every table
in a database, one thread per table:
<code>
class TableCounter(threading.Thread):
    def __init__(self, conn, table):
        self.connection = connection.Connection(host=conn.host,
            port=conn.port, user=conn.user, password='', base=conn.base)
        threading.Thread.__init__(self)
        self.table = table

    def run(self):
        result = self.connection.doQuery("select count(*) from %s" %
            self.table, [])[0][0]
        print result
        return result


class DataChecker(metadata.Database):

    def countAll(self):
        for table in self.tables:
            t = TableCounter(self.connection, table.name)
            t.start()
        return
</code>

It works fine, in the sense that every run() method prints the correct
value.
But... I would like to store the result of t.start() in, say, a list. The
thing is, t.start() returns None, so... what am I missing here?
Is the design wrong?

thanks!

Gerardo
 

George Sakkis

It works fine, in the sense that every run() method prints the correct
value.
But... I would like to store the result of t.start() in, say, a list. The
thing is, t.start() returns None, so... what am I missing here?
Is the design wrong?

The simplest way is to just store it as an attribute on the
TableCounter instance:

def run(self):
    self.result = self.connection.doQuery(...)

Another alternative is to add it to a Queue. You can't use a plain list
unless you protect it with a lock to prevent concurrent append()s, but
that's what Queues do for you anyway [1].

Regardless of where the results are stored, a second issue which you
don't address here is: how do you know that a given result, or all
results, are done? Again there are several alternatives, but Python
2.5 adds two convenient Queue methods for this, task_done() and
join(). Check out the example at the bottom of the Queue doc page [2]
to see how it works.

HTH,
George


[1] http://docs.python.org/lib/module-Queue.html
[2] http://docs.python.org/lib/QueueObjects.html
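The task_done()/join() pattern George points to might be sketched like this. This is a minimal, self-contained illustration, not the poster's actual code: it uses the modern Python name `queue` (the module was called `Queue` in Python 2), and `fake_count` is a hypothetical stand-in for the real doQuery call:

```python
import threading
import queue  # named Queue in Python 2

tables = queue.Queue()
results = queue.Queue()  # thread-safe; no explicit lock needed

def fake_count(table):
    # stand-in for the real "select count(*)" query
    return len(table)

def worker():
    while True:
        table = tables.get()
        results.put((table, fake_count(table)))
        tables.task_done()   # tell the queue this item is fully processed

for _ in range(2):           # a small pool of worker threads
    threading.Thread(target=worker, daemon=True).start()

for name in ["users", "orders", "items"]:
    tables.put(name)

tables.join()                # blocks until every task_done() has been called
counts = dict(results.get() for _ in range(3))
print(counts)
```

Because the workers are daemon threads, they simply die with the main thread once `tables.join()` has confirmed all work is finished.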
 

John Nagle

Gerardo said:
It works fine, in the sense that every run() method prints the correct
value.
But... I would like to store the result of t.start() in, say, a list. The
thing is, t.start() returns None, so... what am I missing here?
Is the design wrong?

1. What interface to MySQL are you using? That's not MySQLdb.
2. If SELECT COUNT(*) is slow, check your table definitions.
   For MyISAM, it's a fixed-time operation, and even for InnoDB,
   it shouldn't take that long if you have an INDEX.
3. Threads don't return "results" as such; they're not functions.


As for the code, you need something like this:

class TableCounter(threading.Thread):
    def __init__(self, conn, table):
        self.result = None
        ...

    def run(self):
        self.result = self.connection.doQuery("select count(*) from %s" %
            self.table, [])[0][0]


def countAll(self):
    mythreads = []                      # list of TableCounter objects
    # Start all threads
    for table in self.tables:
        t = TableCounter(self.connection, table.name)
        mythreads.append(t)             # list of counter threads
        t.start()
    # Wait for all threads to finish
    totalcount = 0
    for mythread in mythreads:          # for all threads
        mythread.join()                 # wait for thread to finish
        totalcount += mythread.result   # add to result
    print "Total size of all tables is:", totalcount
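John's store-the-result-as-an-attribute pattern can be tried standalone. Here is a minimal runnable sketch under stated assumptions: it uses modern Python print syntax, and `fake_count` is a hypothetical stand-in for the real database query:

```python
import threading

def fake_count(table):
    # stand-in for the real "select count(*)" query
    return {"users": 3, "orders": 5, "items": 7}[table]

class TableCounter(threading.Thread):
    def __init__(self, table):
        threading.Thread.__init__(self)
        self.table = table
        self.result = None          # filled in by run()

    def run(self):
        self.result = fake_count(self.table)

threads = [TableCounter(t) for t in ["users", "orders", "items"]]
for t in threads:
    t.start()
totalcount = 0
for t in threads:
    t.join()                        # wait, then read the stored result
    totalcount += t.result
print("Total size of all tables is:", totalcount)  # 15
```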



John Nagle
 

Gerardo Herzig

Thanks John, that certainly works. Following George's suggestion, I
will take a look at the Queue module.
One question about

for mythread in mythreads: # for all threads
    mythread.join()        # wait for thread to finish


Will that code wait for the first count(*) to finish and then continue
to the next count(*)? Because if that is so, it would be some kind of
'use threads, but execute one at a time'.
I mean, if mythreads[0] is a very long one, all the others will be
waiting... right?
Is there an approach in which I can 'sum' after *any* thread finishes?

Could a Queue help me there?
Thanks!

Gerardo
 

John Nagle

for mythread in mythreads: # for all threads
    mythread.join()        # wait for thread to finish

Will that code wait for the first count(*) to finish and then continue
to the next count(*)? Because if that is so, it would be some kind of
'use threads, but execute one at a time'.

No, all the threads are started in the first loop, and can run
their MySQL queries concurrently. Once all threads have been
started, the second loop (above) waits for all of them to finish.
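The concurrency John describes can be observed directly with sleeps standing in for slow queries. This is a hedged sketch, not part of the thread's original code; timings are approximate:

```python
import threading, time

def worker(delay, out, i):
    time.sleep(delay)    # stands in for a slow count(*) query
    out[i] = delay

delays = [0.3, 0.1, 0.2]
out = [None] * 3
threads = [threading.Thread(target=worker, args=(d, out, i))
           for i, d in enumerate(delays)]
start = time.time()
for t in threads:
    t.start()            # all three start before any join()
for t in threads:
    t.join()
elapsed = time.time() - start
# all three sleeps overlap, so total time is close to the longest
# delay (0.3s) rather than the sum (0.6s)
print(out, round(elapsed, 1))
```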

John Nagle
 

George Sakkis

Will that code wait for the first count(*) to finish and then continue
to the next count(*)? Because if that is so, it would be some kind of
'use threads, but execute one at a time'.
I mean, if mythreads[0] is a very long one, all the others will be
waiting... right?

No, all will be executed in parallel; only the main thread will be
waiting for the first thread to finish. So if only the first job is
long, as soon as it finishes and join()s, all the others will already
have finished and their join() will be instantaneous.
Is there an approach in which I can 'sum' after *any* thread finishes?

Could a Queue help me there?

Yes, you can push each result onto a queue and have the main thread
wait in a loop, doing a queue.get() each time. After each get() you can
do whatever you like with the results so far (partial sum, update a
progress bar, etc.).
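That get()-in-a-loop pattern might look like this minimal sketch. It uses the modern module name `queue` (`Queue` in Python 2), and `count_table` is a hypothetical stand-in for the real query:

```python
import threading
import queue  # named Queue in Python 2

result_q = queue.Queue()

def count_table(table):
    # stand-in for the real "select count(*)" query
    return {"a": 10, "b": 20, "c": 30}[table]

def worker(table):
    result_q.put(count_table(table))

tables = ["a", "b", "c"]
for t in tables:
    threading.Thread(target=worker, args=(t,)).start()

# the main thread consumes results in completion order, updating the
# running total as soon as *any* thread finishes
total = 0
for _ in tables:
    total += result_q.get()  # blocks until the next result arrives
    print("running total:", total)

print("final:", total)  # 60
```

The running totals arrive in whatever order the threads happen to finish, but the final sum is the same either way.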

<shameless-plug>
You can take a look at papyros [1], a small package I wrote for hiding
the details behind a simple Pythonic API. Using papyros, your example
would look something like this:


import sys
from papyros import Job
from papyros.multithreaded import MultiThreadedMaster

# a papyros.Job subclass for each type of task you want to run concurrently
class CountJob(Job):
    def __call__(self, connection, table_name):
        return connection.doQuery("select count(*) from %s" %
            table_name, [])[0][0]


class DataChecker(metadata.Database):
    def countAll(self):
        sum_count = 0
        # create a pool of 4 threads
        master = MultiThreadedMaster(4)
        # issue all the jobs
        for table in self.tables:
            master.addJob(CountJob(self.connection, table.name))
        # get each processed job as soon as it finishes
        for job in iter(master.popProcessedJob, None):
            # the job arguments are available as job.args
            table_name = job.args[1]
            try:  # try to get the result
                count = job.result
            except Exception, ex:
                # some exception was raised when executing this job
                print '* Exception raised for table %s: %s' % (
                    table_name, ex)
            else:
                # job finished successfully
                sum_count += count
                print 'Table %s: count=%d (running total=%d)' % (
                    table_name, count, sum_count)
        return sum_count


As you can see, any exception raised in a thread is stored and
reraised on the main thread when you attempt to get the result. You
can also specify a timeout in popProcessedJob() so that the main
thread doesn't wait forever in case a job hangs.

Last but not least, the same API is implemented both for threads and
processes (using Pyro) so it's not restricted by the GIL in case the
jobs are CPU-intensive.
</shameless-plug>

George
 

George Sakkis


<shameless-plug>
You can take a look at papyros [1],

I forgot the link: http://pypi.python.org/pypi/papyros/

George
 
