Cost of Queue.put

James

Hi all,
I'm struggling with the following problem:

I need to decouple a web service invocation from the addition of
some content to a search engine, so that the web service will always
return in reasonable time.

Basically, what I need is a multi-process safe persistent quick queue.
The arrangement I had was a simple XML-RPC service running on the web
server which the various web server threads POST the relevant search
engine updates to. These updates were added to a persistent queue
built on top of Queue.Queue and cPickle, and there was a separate
thread in the XML-RPC server actually adding the updates to the search
engine.

However, the CPU consumption of the XML-RPC server seems to grow
pretty much linearly with the length of the persistent queue. Isn't
Queue.put O(1)?

I include the code for the persistent queue below - it's a modified
version of this code: http://mail.python.org/pipermail/python-list/2002-December/177394.html

I suppose the addition of cPickle to the mix must hurt Queue.put()...
Any recommendations for alternative persistent queues?

Thanks,
James


import errno
import logging
import Queue
import cPickle
from collections import deque

log = logging.getLogger(__name__)


class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue."""

    def __init__(self, filename, maxsize=0):
        """Initialize a persistent queue with a filename and maximum size.

        The filename is used as a persistent data store for the queue.
        If maxsize <= 0, the queue size is infinite.
        """
        self.filename = filename
        Queue.Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Implements the Queue protocol's _init for a persistent queue:
        # load any previously pickled queue from disk.
        self.maxsize = maxsize
        try:
            readfile = open(self.filename, 'rb')
            self.queue = cPickle.load(readfile)
            readfile.close()
        except IOError, err:
            if err.errno == errno.ENOENT:
                # File doesn't exist; start with an empty queue.
                self.queue = deque()
            else:
                # Some other I/O problem; reraise.
                raise
        except EOFError:
            # File was empty; start with an empty queue.
            self.queue = deque()

        # Rewrite the file, so it's created if it doesn't exist,
        # and so a permissions problem raises an exception now.
        self.writefile = open(self.filename, 'wb')
        cPickle.dump(self.queue, self.writefile, 1)
        log.info("(PickleQueue._init) created queue")

    def __sync(self):
        # Writes the entire queue to the pickle file.
        self.writefile.seek(0)
        cPickle.dump(self.queue, self.writefile, 1)
        self.writefile.truncate()
        self.writefile.flush()

    def _put(self, item):
        # Implements the Queue protocol's _put for a persistent queue.
        self.queue.append(item)
        self.__sync()

    def _get(self):
        # Implements the Queue protocol's _get for a persistent queue.
        item = self.queue.popleft()
        self.__sync()
        return item
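[Editor's note: the linear CPU growth follows directly from __sync above. Queue.put itself is O(1), but __sync re-pickles the entire deque on every put, so each put costs O(n) in the queue length and N puts cost O(N^2) in total. A minimal modern-Python sketch of the effect, using pickle in place of cPickle (cPickle has the same asymptotic behaviour); the sizes are only illustrative:]

```python
import pickle
from collections import deque

# __sync serializes the whole deque each time, so the work done
# per put is proportional to the number of items already queued.
small = len(pickle.dumps(deque(range(100)), 1))
large = len(pickle.dumps(deque(range(10000)), 1))
print(small, large)  # serialized size grows linearly with queue length
```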
 
Gabriel Genellina

> Basically, what I need is a multi-process safe persistent quick queue.
> The arrangement I had was a simple XML-RPC service running on the web
> server which the various web server threads POST the relevant search
> engine updates to. These updates were added to a persistent queue
> built on top of Queue.Queue and cPickle, and there was a separate
> thread in the XML-RPC server actually adding the updates to the search
> engine.
>
> However, the CPU consumption of the XML-RPC server seems to grow
> pretty much linearly with the length of the persistent queue. Isn't
> Queue.put O(1)?

Maybe, but cPickle.dump is very likely O(n) and dominates: your __sync
re-pickles the whole queue on every put. Why don't you use a database?
Just insert a record with each put and delete one with each get.
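[Editor's note: a minimal sketch of that suggestion in modern Python, using the standard-library sqlite3 module. The class and table names are illustrative, not from the thread; ":memory:" is used only to keep the example self-contained, and a real deployment would pass a file path so the queue persists:]

```python
import sqlite3

class SQLiteQueue(object):
    """A simple persistent FIFO queue backed by SQLite.
    Each put is a single INSERT and each get a SELECT plus DELETE,
    so the cost per operation no longer grows with queue length."""

    def __init__(self, filename):
        self.conn = sqlite3.connect(filename)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS queue "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, item TEXT)")
        self.conn.commit()

    def put(self, item):
        # One INSERT per put, instead of rewriting the whole queue.
        self.conn.execute("INSERT INTO queue (item) VALUES (?)", (item,))
        self.conn.commit()

    def get(self):
        # Pop the oldest row; returns None if the queue is empty.
        cur = self.conn.execute(
            "SELECT id, item FROM queue ORDER BY id LIMIT 1")
        row = cur.fetchone()
        if row is None:
            return None
        self.conn.execute("DELETE FROM queue WHERE id = ?", (row[0],))
        self.conn.commit()
        return row[1]

q = SQLiteQueue(":memory:")
q.put("update-1")
q.put("update-2")
print(q.get())  # → update-1
```

Because each operation touches one record, CPU use stays flat as the backlog grows, and SQLite's file locking also gives the multi-process safety asked for above.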
 
