Calling Queue experts

J

jrpfinch

I have a script which is based on the following code. Unfortunately,
it only works on Python 2.3 and not 2.5 because there is no esema or
fsema attribute in the 2.5 Queue. I am hunting through the Queue.py
code now to try to figure out how to make it work in 2.5, but as I am
a beginner, I am having difficulty and would appreciate your help.

Many thanks

Jon

import os
import Queue
import threading
import time
import cPickle

class PickleQueue(Queue.Queue):
"""A multi-producer, multi-consumer, persistent queue."""
# The queue contents are kept in the list self.queue and are re-pickled
# to self.filename after every _put and _get (see __sync below).


def __init__(self, filename, maxsize=0):
"""Initialize a persistent queue with a filename and maximum
size.


The filename is used as a persistent data store for the
queue.
If maxsize <= 0, the queue size is infinite.
"""
# filename is stored before the base constructor runs because _init()
# reads self.filename; the base constructor is expected to call _init(),
# since self.queue is already populated on the next line.
self.filename = filename
Queue.Queue.__init__(self, maxsize)
# Adjust the base class's state for items that were loaded from disk.
# Per the accompanying post, esema and fsema exist on Python 2.3's Queue
# but not on 2.5's, so these two calls fail with AttributeError on 2.5.
# NOTE(review): presumably esema/fsema are the "empty"/"full" semaphores
# of the 2.3 implementation -- confirm against the 2.3 Queue.py.
if self.queue:
self.esema.release()
if self._full():
self.fsema.acquire()


def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
# Try to restore a previously pickled queue from self.filename.
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
# Only reached when the load succeeded; on an exception the handle
# is left open.
self.readfile.close()
except IOError, err:
# errno 2 is ENOENT (no such file or directory).
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = []
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = []


# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
# The write handle is kept open on the instance and reused by __sync.
# NOTE(review): the third argument selects pickle protocol 1, a binary
# format, while both files are opened in text mode.
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)


def __sync(self):
# Writes the queue to the pickle file.
# Rewinds to offset 0 and overwrites in place; the file is not
# truncated, so bytes of an earlier, longer pickle can remain after
# the new one.
self.writefile.seek(0)
cPickle.dump(self.queue, self.writefile, 1)
self.writefile.flush()


def _put(self, item):
# Implements Queue protocol _put for persistent queue.
# Appends at the tail, then persists the whole queue.
self.queue.append(item)
self.__sync()


def _get(self):
# Implements Queue protocol _get for persistent queue.
# Removes the oldest item (index 0), persists the shortened queue,
# then returns the removed item.
item = self.queue[0]
del self.queue[0]
self.__sync()
return item

class counterThread(threading.Thread):
    """Thread that pushes an increasing counter into a persistent queue.

    Once per second the current counter value is put on ``numberQueue``.
    The loop ends when an ``'exit'`` command is found on
    ``exitCounterQueue``; the thread requests that itself as soon as the
    counter has gone past 10.
    """

    # Both queues are class attributes: they are created when the class
    # body executes and are shared by every counterThread instance.
    numberQueue = PickleQueue('/export/home/jrpf/data.pkl')
    # Holds at most one pending 'exit' command.
    exitCounterQueue = Queue.Queue(1)

    def run(self):
        """Count upwards, persisting each value, until told to exit."""
        command = ''
        i = 0
        while 1:
            self.numberQueue.put(i)
            if i > 10:
                print("i > 10 so attempting to exit")
                # Only this thread is asked to stop.  An earlier revision
                # also called wt.exit(), but no `wt` exists in this
                # script, so that line raised NameError before the exit
                # request could be queued.
                self.exit()

            print(i)
            # Poll for an exit command without blocking the counter.
            try:
                command = self.exitCounterQueue.get(block=False)
            except Queue.Empty:
                pass
            if command == 'exit':
                print("Counter thread exited")
                break
            i = i + 1
            time.sleep(1)

    def exit(self):
        """Ask the run() loop to stop at its next check.

        Never blocks: exitCounterQueue holds a single item and run() is
        its only consumer, so a blocking put issued from run() while a
        request is already pending would wait forever.
        """
        try:
            self.exitCounterQueue.put('exit', block=False)
        except Queue.Full:
            # An exit request is already pending; nothing more to do.
            pass

def main():
    """Run one daemonised counterThread and wait for it to finish."""
    worker = counterThread()
    # Daemon flag has to be set before the thread is started.
    worker.setDaemon(True)
    worker.start()
    worker.join()


if __name__ == "__main__":
    main()
 
J

jrpfinch

Got it. New PickleQueue class should be as follows:

import Queue
import cPickle
import errno
import sys
from collections import deque

class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue.

    The items live in a ``collections.deque`` that is re-pickled to
    ``filename`` after every put and get, so a new PickleQueue opened on
    the same file starts with whatever the previous one still held.

    NOTE: the file is read back with cPickle.load(), which can execute
    arbitrary code; only point this class at files you trust.
    """

    def __init__(self, filename, maxsize=0):
        """Initialize a persistent queue with a filename and maximum size.

        The filename is used as a persistent data store for the queue.
        If maxsize <= 0, the queue size is infinite.

        Raises IOError if the file cannot be read or (re)created.
        """
        # _init() is invoked by Queue.Queue.__init__ and needs the
        # filename, so it has to be stored first.
        self.filename = filename
        Queue.Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Implements Queue protocol _init for persistent queue.
        # Loads any saved items, then opens the file for rewriting.
        self.maxsize = maxsize
        try:
            # Binary mode: pickle protocol 1 is a binary format.
            readfile = open(self.filename, 'rb')
        except IOError:
            # Fetched via sys.exc_info() rather than "except IOError, err"
            # so the clause parses on old and new interpreters alike.
            err = sys.exc_info()[1]
            if err.errno != errno.ENOENT:
                # Some other I/O problem; a bare raise keeps the
                # traceback of the original error.
                raise
            # File doesn't exist, continue ...
            self.queue = deque()
        else:
            try:
                try:
                    # deque() also converts a plain list saved by the
                    # older, list-based version of this class.
                    self.queue = deque(cPickle.load(readfile))
                except EOFError:
                    # File was empty, continue ...
                    self.queue = deque()
            finally:
                # Close the handle whether or not the load succeeded.
                readfile.close()

        # (Re)create the file now, so it exists from here on and a
        # permissions problem is raised immediately.  Opening with 'wb'
        # truncates it, so the loaded items are written straight back
        # and flushed to avoid losing them if the process dies early.
        self.writefile = open(self.filename, 'wb')
        self.__sync()

    def __sync(self):
        # Writes the queue to the pickle file.
        self.writefile.seek(0)
        cPickle.dump(self.queue, self.writefile, 1)
        self.writefile.flush()
        # Drop leftovers of a previous, longer pickle.
        self.writefile.truncate()

    def _put(self, item):
        # Implements Queue protocol _put for persistent queue.
        self.queue.append(item)
        self.__sync()

    def _get(self):
        # Implements Queue protocol _get for persistent queue.
        item = self.queue.popleft()
        self.__sync()
        return item
 
S

skip

jrpfinch> # Some other I/O problem, reraise error
jrpfinch> raise err

I'd just execute a bare raise (without err). That way the caller gets the
stack trace of the actual IOError.

Skip
 
G

Gabriel Genellina

Got it. New PickleQueue class should be as follows:

Only a comment:
def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
# Try to restore a previously pickled queue from self.filename.
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
# Only reached when the load succeeded.
self.readfile.close()
except IOError, err:
# errno 2 is ENOENT (no such file or directory).
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = Queue.deque()
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = Queue.deque()


# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
# Opening with 'w' truncates the file; the queue that was just loaded is
# written straight back through the handle kept in self.writefile.
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)

self.readfile may be left open in case of error, I'd use a try/finally.
And since it isn't used anywhere, I'd just use a local variable instead of
an instance attribute.
And the final write is not necessary when you have just read it - and
alters the "last-modified-time" (that may not be relevant for you, of
course, but as a general tool it may confuse other users).
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only takes a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
474,037
Messages
2,570,371
Members
47,014
Latest member
TashaMorei

Latest Threads

Top