Multithreaded class with queues

Antal Rutz

Hi!

I wrote a little class to make multithreading easier. It's based on one
of Aahz's threading example scripts. What it does:

It spawns a number of CollectorThreads and one ProcessThread. The
CollectorThreads listen on one queue (inputQueue), read from it, process
the data (with colfunc), and put the result onto the outputQueue. The
ProcessThread listens on the outputQueue, reads from it, and processes
each result (with prfunc). Then it ends. (More details in the attached file.)

It seems to work with test functions, but when I use a network-intensive
function (SNMP queries) it just gets slower with maxThreads set to more
than 1.

Any help?
Thanks.
See the attached class.
P.S. Maybe I'm misunderstanding something basic...

--


--arutz

#!/usr/local/bin/python

"""
Multithreaded class for the task: multiple collectors, one data processor.

Usage:

    collector = Collector(data, colfunc, prfunc, maxThreads)
    collector.run()

Internals:

Collector spawns the CollectorThreads and a ProcessThread and puts
the data onto the inputQueue. Each CollectorThread reads data from the
inputQueue, processes it through colfunc(), and puts the result onto
the outputQueue. The ProcessThread only listens on the outputQueue
(blocks on it) and feeds the data to prfunc().

Thread shutdown:
    collector threads: inputQueue.put(_Token(shutdown=True))
    process thread:    outputQueue.put(_Token(shutdown=True))
"""

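import threading
try:
    import Queue                # Python 2 module name
except ImportError:
    import queue as Queue       # Python 3 renamed the module to 'queue'
#from operator import truth as _truth

#def _xor(a,b):
#    return _truth(a) ^ _truth(b)

class _Token:
    """Message passed through the queues: either data or a shutdown flag."""
    def __init__(self, data=None, shutdown=None):
        #if not _xor(data, shutdown):
        #    raise "Tsk, tsk, need to set either URL or shutdown (not both)"
        self.data = data
        self.shutdown = shutdown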

class _CollectorThread(threading.Thread):
    """Worker thread blocking on inputQueue.
    The result goes to outputQueue after being processed by self.func.
    """

    def __init__(self, inQueue, outQueue, func):
        threading.Thread.__init__(self)
        self.inQ = inQueue
        self.outQ = outQueue
        self.func = func

    def run(self):
        while True:
            token = self.inQ.get()
            if token.shutdown is not None:
                break
            else:
                #collect data from the routers
                #print token.data
                result = self.func(token.data)
                self.outQ.put_nowait(_Token(data=result))

class _ProcessThread(threading.Thread):
    """'Reader-only' thread processing outputQueue."""

    def __init__(self, outQueue, func):
        threading.Thread.__init__(self)
        self.outQ = outQueue
        self.func = func

    def run(self):
        while True:
            token = self.outQ.get()
            if token.shutdown is not None:
                break
            else:
                #insert into db or do anything
                self.func(token.data)

class Collector:
    """Spawns the thread pool (worker threads and the process thread)
    and puts the data onto the inputQueue of the worker threads.
    Then shuts them down."""

    def __init__(self, data, colfunc, prfunc, maxThreads=5):
        """Parameters:
        - data: data for colfunc (any sequence)
        - colfunc: function to process inputQueue into outputQueue
        - prfunc: function to process outputQueue
        - maxThreads: number of collector threads to start
        """

        self.data = data
        self.inputQueue = Queue.Queue()
        self.outputQueue = Queue.Queue()
        self.threadPool = []

        #Start the worker threads
        for i in range(maxThreads):
            collector = _CollectorThread(self.inputQueue,
                                         self.outputQueue,
                                         colfunc)
            collector.start()
            self.threadPool.append(collector)

        #Start the db thread
        self.processthread = _ProcessThread(self.outputQueue, prfunc)
        self.processthread.start()

    def run(self):
        """Queue the data and shut down the threads."""
        self._queueData()
        self._shutdown()

    def _queueData(self):
        """Put data onto the inputQueue."""
        for d in self.data:
            self.inputQueue.put_nowait(_Token(data=d))

    def _shutdown(self):
        #One shutdown token per worker, queued behind the data
        for i in self.threadPool:
            self.inputQueue.put(_Token(shutdown=True))
        for thread in self.threadPool:
            thread.join()
        #All results are queued by now, so the process thread can stop
        self.outputQueue.put(_Token(shutdown=True))
        self.processthread.join()

if __name__ == '__main__':
    def myprint(s):
        print(s)    # the parenthesized form works on Python 2 and 3

    def hashdata(a):
        return a + ': OK'

    MAX_THREADS = 5

    data = ['1', '2', 'asd', 'qwe']
    collect = Collector(data=data, colfunc=hashdata, prfunc=myprint, maxThreads=MAX_THREADS)
    collect.run()
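
For readers on Python 3, where the module is named `queue`, the same collector/processor layout can be sketched roughly as below. This is not the attached class, only a compact variant under stated assumptions: `run_pipeline` and the `_SHUTDOWN` sentinel are names made up for this sketch, and a plain sentinel object stands in for `_Token(shutdown=True)`.

```python
import queue
import threading

_SHUTDOWN = object()  # hypothetical sentinel; plays the role of _Token(shutdown=True)


def run_pipeline(data, colfunc, prfunc, max_threads=5):
    """Run colfunc over data in worker threads; feed each result to prfunc."""
    in_q = queue.Queue()
    out_q = queue.Queue()

    def collect():
        # Worker: block on the input queue until the sentinel arrives.
        while True:
            item = in_q.get()
            if item is _SHUTDOWN:
                break
            out_q.put(colfunc(item))

    def process():
        # Single consumer: block on the output queue until the sentinel arrives.
        while True:
            result = out_q.get()
            if result is _SHUTDOWN:
                break
            prfunc(result)

    workers = [threading.Thread(target=collect) for _ in range(max_threads)]
    consumer = threading.Thread(target=process)
    for t in workers + [consumer]:
        t.start()

    for item in data:
        in_q.put(item)
    for _ in workers:
        in_q.put(_SHUTDOWN)   # one sentinel per worker, queued behind the data
    for t in workers:
        t.join()              # once workers exit, every result is on out_q
    out_q.put(_SHUTDOWN)
    consumer.join()


if __name__ == "__main__":
    collected = []
    run_pipeline(["1", "2", "asd", "qwe"], lambda a: a + ": OK", collected.append)
    print(sorted(collected))
```

Results reach `prfunc` in completion order, not input order, which is why the usage example sorts them before printing. The sketch does not handle exceptions raised inside `colfunc`; a worker that dies that way simply stops consuming.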
 
Aahz

> I wrote a little class to make multithreading easier. It's based on one
> of aahz's threading example scripts. What it does:

Thanks!

> It spawns up number of CollectorThreads and one ProcessThread. The
> CollectorThreads listen on one queue (inputqueue), read, process the
> data (with colfunc), put the result onto the outputqueue. The
> ProcessThread listens on the outputqueue, reads, processes (with
> prfunc). end. (more details in the attached file)
>
> it seems to work with test functions but when I use a network-intensive
> function (snmp-queries) it just gets slower with maxThreads set to more
> than 1.

Hrm. There's nothing obviously wrong. What happens if you use it as a
spider?
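
One way to narrow this down is to time a stand-in function that does nothing but block. `time.sleep` releases the GIL while it waits, so a run built on it should get faster as the thread count goes up. If it does and the SNMP function does not, the bottleneck is more likely inside the SNMP call than in the queue plumbing. A minimal sketch, assuming Python 3; `fake_query` and `timed_run` are hypothetical names, and `concurrent.futures.ThreadPoolExecutor` is swapped in for the posted class purely to keep the example short:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(host, delay=0.05):
    """Hypothetical stand-in for one SNMP query: just blocks for `delay` seconds."""
    time.sleep(delay)  # sleeping releases the GIL, like well-behaved blocking I/O
    return host + ": OK"


def timed_run(func, data, max_threads):
    """Return (elapsed_seconds, results) for func mapped over data."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        results = list(pool.map(func, data))  # results keep input order
    return time.perf_counter() - start, results


if __name__ == "__main__":
    hosts = ["router%d" % i for i in range(8)]  # made-up host names
    for n in (1, 2, 4):
        elapsed, _ = timed_run(fake_query, hosts, n)
        print("maxThreads=%d: %.2fs" % (n, elapsed))
```

Passing the real query function to `timed_run` in place of `fake_query` gives a direct comparison against the same thread counts.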
 
