J
jrpfinch
I have a script which is based on the following code. Unfortunately,
it only works on Python 2.3 and not 2.5 because there is no esema or
fsema attribute in the 2.5 Queue. I am hunting through the Queue.py
code now to try to figure out how to make it work in 2.5, but as I am
a beginner, I am having difficulty and would appreciate your help.
Many thanks
Jon
import os
import Queue
import threading
import time
import cPickle
class PickleQueue(Queue.Queue):
"""A multi-producer, multi-consumer, persistent queue."""
def __init__(self, filename, maxsize=0):
"""Initialize a persistent queue with a filename and maximum
size.
The filename is used as a persistent data store for the
queue.
If maxsize <= 0, the queue size is infinite.
"""
self.filename = filename
Queue.Queue.__init__(self, maxsize)
if self.queue:
self.esema.release()
if self._full():
self.fsema.acquire()
def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
self.readfile.close()
except IOError, err:
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = []
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = []
# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)
def __sync(self):
# Writes the queue to the pickle file.
self.writefile.seek(0)
cPickle.dump(self.queue, self.writefile, 1)
self.writefile.flush()
def _put(self, item):
# Implements Queue protocol _put for persistent queue.
self.queue.append(item)
self.__sync()
def _get(self):
# Implements Queue protocol _get for persistent queue.
item = self.queue[0]
del self.queue[0]
self.__sync()
return item
class counterThread(threading.Thread):
numberQueue = PickleQueue('/export/home/jrpf/data.pkl')
exitCounterQueue = Queue.Queue(1)
def run(self):
command = ''
i = 0
while 1:
self.numberQueue.put(i)
if i > 10:
print "i > 10 so attempting to exit"
wt.exit()
self.exit()
print i
try:
command = self.exitCounterQueue.get(block=False)
except Queue.Empty:
pass
if command == 'exit':
print "Counter thread exited"
break
i = i + 1
time.sleep(1)
def exit(self):
self.exitCounterQueue.put('exit')
def main():
ct = counterThread()
ct.setDaemon(True)
ct.start()
ct.join()
if __name__ == "__main__":
main()
it only works on Python 2.3 and not 2.5 because there is no esema or
fsema attribute in the 2.5 Queue. I am hunting through the Queue.py
code now to try to figure out how to make it work in 2.5, but as I am
a beginner, I am having difficulty and would appreciate your help.
Many thanks
Jon
import os
import Queue
import threading
import time
import cPickle
class PickleQueue(Queue.Queue):
"""A multi-producer, multi-consumer, persistent queue."""
def __init__(self, filename, maxsize=0):
"""Initialize a persistent queue with a filename and maximum
size.
The filename is used as a persistent data store for the
queue.
If maxsize <= 0, the queue size is infinite.
"""
self.filename = filename
Queue.Queue.__init__(self, maxsize)
if self.queue:
self.esema.release()
if self._full():
self.fsema.acquire()
def _init(self, maxsize):
# Implements Queue protocol _init for persistent queue.
# Sets up the pickle files.
self.maxsize = maxsize
try:
self.readfile = file(self.filename, 'r')
self.queue = cPickle.load(self.readfile)
self.readfile.close()
except IOError, err:
if err.errno == 2:
# File doesn't exist, continue ...
self.queue = []
else:
# Some other I/O problem, reraise error
raise err
except EOFError:
# File was null? Continue ...
self.queue = []
# Rewrite file, so it's created if it doesn't exist,
# and raises an exception now if we aren't allowed
self.writefile = file(self.filename, 'w')
cPickle.dump(self.queue, self.writefile, 1)
def __sync(self):
# Writes the queue to the pickle file.
self.writefile.seek(0)
cPickle.dump(self.queue, self.writefile, 1)
self.writefile.flush()
def _put(self, item):
# Implements Queue protocol _put for persistent queue.
self.queue.append(item)
self.__sync()
def _get(self):
# Implements Queue protocol _get for persistent queue.
item = self.queue[0]
del self.queue[0]
self.__sync()
return item
class counterThread(threading.Thread):
numberQueue = PickleQueue('/export/home/jrpf/data.pkl')
exitCounterQueue = Queue.Queue(1)
def run(self):
command = ''
i = 0
while 1:
self.numberQueue.put(i)
if i > 10:
print "i > 10 so attempting to exit"
wt.exit()
self.exit()
print i
try:
command = self.exitCounterQueue.get(block=False)
except Queue.Empty:
pass
if command == 'exit':
print "Counter thread exited"
break
i = i + 1
time.sleep(1)
def exit(self):
self.exitCounterQueue.put('exit')
def main():
ct = counterThread()
ct.setDaemon(True)
ct.start()
ct.join()
if __name__ == "__main__":
main()