Hi all,
I'm struggling with the following problem:
I need to disconnect a web service invocation from the addition of
some content to a search engine, so that the web service will always
return in reasonable time.
Basically, what I need is a fast, persistent, multi-process-safe queue.
The arrangement I had was a simple XML-RPC service running on the web
server which the various web server threads POST the relevant search
engine updates to. These updates were added to a persistent queue
built on top of Queue.Queue and cPickle, and there was a separate
thread in the XML-RPC server actually adding the updates to the search
engine.
However, the CPU consumption of the XML-RPC server seems to grow
pretty much linearly with the length of the persistent queue. Isn't
Queue.put O(1)?
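For what it's worth, here's a quick standalone measurement (plain pickle and collections.deque rather than my classes) suggesting where the time goes: serialising the whole queue costs time and bytes roughly linear in its length, so anything that re-pickles the queue on every put makes put O(n):

```python
import pickle
import timeit
from collections import deque

# Re-pickling an entire deque costs time and bytes roughly
# proportional to the number of items it holds.
for n in (1000, 10000, 100000):
    q = deque(range(n))
    size = len(pickle.dumps(q, 2))
    secs = timeit.timeit(lambda: pickle.dumps(q, 2), number=10) / 10
    print("n=%6d  pickle size=%8d bytes  time=%.5f s" % (n, size, secs))
```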
I include the code for the persistent queue below - it's a modified
version of this code: http://mail.python.org/pipermail/python-list/2002-December/177394.html
I suppose the addition of cPickle to the mix must hurt Queue.put()...
Any recommendations for alternative persistent queues?
Thanks,
James
import Queue
import cPickle
import logging

log = logging.getLogger(__name__)


class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue."""

    def __init__(self, filename, maxsize=0):
        """Initialize a persistent queue with a filename and maximum
        size.

        The filename is used as a persistent data store for the
        queue.  If maxsize <= 0, the queue size is infinite.
        """
        self.filename = filename
        Queue.Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Implements Queue protocol _init for persistent queue.
        # Sets up the pickle files.
        self.maxsize = maxsize
        try:
            self.readfile = file(self.filename, 'r')
            self.queue = cPickle.load(self.readfile)
            self.readfile.close()
        except IOError, err:
            if err.errno == 2:
                # File doesn't exist, continue ...
                self.queue = Queue.deque()
            else:
                # Some other I/O problem, reraise error
                raise err
        except EOFError:
            # File was null?  Continue ...
            self.queue = Queue.deque()
        # Rewrite file, so it's created if it doesn't exist,
        # and raises an exception now if we aren't allowed
        self.writefile = file(self.filename, 'w')
        cPickle.dump(self.queue, self.writefile, 1)
        log.info("(PickleQueue._init) created queue")

    def __sync(self):
        # Writes the queue to the pickle file.
        self.writefile.seek(0)
        cPickle.dump(self.queue, self.writefile, 1)
        self.writefile.flush()

    def _put(self, item):
        # Implements Queue protocol _put for persistent queue.
        self.queue.append(item)
        self.__sync()

    def _get(self):
        # Implements Queue protocol _get for persistent queue.
        item = self.queue.popleft()
        self.__sync()
        return item
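One alternative I've been sketching is an append-only journal: each put pickles just the new item onto the end of the file, so put is O(1) in queue length instead of re-dumping everything. This is a rough sketch only (modern pickle/collections names, not my classes above); note that get() here doesn't persist removals, so you'd still need periodic compaction or a consumed-count record on top of it:

```python
import collections
import os
import pickle


class JournalQueue(object):
    """Append-only persistent queue sketch: put() appends one pickled
    item to a journal file, so its cost doesn't grow with queue length."""

    def __init__(self, filename):
        self.filename = filename
        self.queue = collections.deque()
        # Replay the journal on startup to rebuild in-memory state.
        if os.path.exists(filename):
            with open(filename, 'rb') as f:
                while True:
                    try:
                        self.queue.append(pickle.load(f))
                    except EOFError:
                        break
        self.journal = open(filename, 'ab')

    def put(self, item):
        # O(1): pickle only the new item, appended to the journal.
        self.queue.append(item)
        pickle.dump(item, self.journal, pickle.HIGHEST_PROTOCOL)
        self.journal.flush()

    def get(self):
        # In-memory only; removals are NOT journalled in this sketch,
        # so a restart replays consumed items too.  Compaction or a
        # consumed-count record would be needed for real use.
        return self.queue.popleft()
```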