Calling Queue experts

Discussion in 'Python' started by jrpfinch, Mar 26, 2007.

  1. jrpfinch

    jrpfinch Guest

    I have a script which is based on the following code. Unfortunately,
    it only works on Python 2.3 and not 2.5 because there is no esema or
    fsema attribute in the 2.5 Queue. I am hunting through the Queue.py
    code now to try to figure out how to make it work in 2.5, but as I am
    a beginner, I am having difficulty and would appreciate your help.

    Many thanks

    Jon

    import os
    import Queue
    import threading
    import time
    import cPickle

    class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue."""


    def __init__(self, filename, maxsize=0):
    """Initialize a persistent queue with a filename and maximum
    size.


    The filename is used as a persistent data store for the
    queue.
    If maxsize <= 0, the queue size is infinite.
    """
    self.filename = filename
    Queue.Queue.__init__(self, maxsize)
    if self.queue:
    self.esema.release()
    if self._full():
    self.fsema.acquire()


    def _init(self, maxsize):
    # Implements Queue protocol _init for persistent queue.
    # Sets up the pickle files.
    self.maxsize = maxsize
    try:
    self.readfile = file(self.filename, 'r')
    self.queue = cPickle.load(self.readfile)
    self.readfile.close()
    except IOError, err:
    if err.errno == 2:
    # File doesn't exist, continue ...
    self.queue = []
    else:
    # Some other I/O problem, reraise error
    raise err
    except EOFError:
    # File was null? Continue ...
    self.queue = []


    # Rewrite file, so it's created if it doesn't exist,
    # and raises an exception now if we aren't allowed
    self.writefile = file(self.filename, 'w')
    cPickle.dump(self.queue, self.writefile, 1)


    def __sync(self):
    # Writes the queue to the pickle file.
    self.writefile.seek(0)
    cPickle.dump(self.queue, self.writefile, 1)
    self.writefile.flush()


    def _put(self, item):
    # Implements Queue protocol _put for persistent queue.
    self.queue.append(item)
    self.__sync()


    def _get(self):
    # Implements Queue protocol _get for persistent queue.
    item = self.queue[0]
    del self.queue[0]
    self.__sync()
    return item

    class counterThread(threading.Thread):
    numberQueue = PickleQueue('/export/home/jrpf/data.pkl')
    exitCounterQueue = Queue.Queue(1)

    def run(self):
    command = ''
    i = 0
    while 1:
    self.numberQueue.put(i)
    if i > 10:
    print "i > 10 so attempting to exit"
    wt.exit()
    self.exit()


    print i
    try:
    command = self.exitCounterQueue.get(block=False)
    except Queue.Empty:
    pass
    if command == 'exit':
    print "Counter thread exited"
    break
    i = i + 1
    time.sleep(1)

    def exit(self):
    self.exitCounterQueue.put('exit')

    def main():
        """Start a counterThread as a daemon and wait for it to finish."""
        counter = counterThread()
        counter.setDaemon(True)
        counter.start()
        counter.join()

    if __name__ == "__main__":
        main()
     
    jrpfinch, Mar 26, 2007
    #1
    1. Advertising

  2. jrpfinch

    jrpfinch Guest

    Got it. New PickleQueue class should be as follows:

    import Queue
    import cPickle

    class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue."""


    def __init__(self, filename, maxsize=0):
    """Initialize a persistent queue with a filename and maximum
    size.


    The filename is used as a persistent data store for the
    queue.
    If maxsize <= 0, the queue size is infinite.
    """
    self.filename = filename
    Queue.Queue.__init__(self, maxsize)
    print self.queue

    def _init(self, maxsize):
    # Implements Queue protocol _init for persistent queue.
    # Sets up the pickle files.
    self.maxsize = maxsize
    try:
    self.readfile = file(self.filename, 'r')
    self.queue = cPickle.load(self.readfile)
    self.readfile.close()
    except IOError, err:
    if err.errno == 2:
    # File doesn't exist, continue ...
    self.queue = Queue.deque()
    else:
    # Some other I/O problem, reraise error
    raise err
    except EOFError:
    # File was null? Continue ...
    self.queue = Queue.deque()


    # Rewrite file, so it's created if it doesn't exist,
    # and raises an exception now if we aren't allowed
    self.writefile = file(self.filename, 'w')
    cPickle.dump(self.queue, self.writefile, 1)


    def __sync(self):
    # Writes the queue to the pickle file.
    self.writefile.seek(0)
    cPickle.dump(self.queue, self.writefile, 1)
    self.writefile.flush()


    def _put(self, item):
    # Implements Queue protocol _put for persistent queue.
    self.queue.append(item)
    self.__sync()


    def _get(self):
    # Implements Queue protocol _get for persistent queue.
    item = self.queue.popleft()
    self.__sync()
    return item
     
    jrpfinch, Mar 26, 2007
    #2
    1. Advertising

  3. jrpfinch

    Guest

    jrpfinch> # Some other I/O problem, reraise error
    jrpfinch> raise err

    I'd just execute a bare raise (without err). That way the caller gets the
    stack trace of the actual IOError.

    Skip
     
    , Mar 26, 2007
    #3
  4. En Mon, 26 Mar 2007 07:29:32 -0300, jrpfinch <> escribió:

    > Got it. New PickleQueue class should be as follows:


    Only a comment:

    > def _init(self, maxsize):
    > # Implements Queue protocol _init for persistent queue.
    > # Sets up the pickle files.
    > self.maxsize = maxsize
    > try:
    > self.readfile = file(self.filename, 'r')
    > self.queue = cPickle.load(self.readfile)
    > self.readfile.close()
    > except IOError, err:
    > if err.errno == 2:
    > # File doesn't exist, continue ...
    > self.queue = Queue.deque()
    > else:
    > # Some other I/O problem, reraise error
    > raise err
    > except EOFError:
    > # File was null? Continue ...
    > self.queue = Queue.deque()
    >
    >
    > # Rewrite file, so it's created if it doesn't exist,
    > # and raises an exception now if we aren't allowed
    > self.writefile = file(self.filename, 'w')
    > cPickle.dump(self.queue, self.writefile, 1)


    self.readfile may be left open in case of error; I'd use a try/finally.
    And since it isn't used anywhere else, I'd just use a local variable instead
    of an instance attribute.
    And the final write is not necessary when you have just read it - and it
    alters the "last-modified-time" (that may not be relevant for you, of
    course, but as a general tool it may confuse other users).

    --
    Gabriel Genellina
     
    Gabriel Genellina, Mar 26, 2007
    #4
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Paul L. Du Bois

    Queue.Queue-like class without the busy-wait

    Paul L. Du Bois, Mar 24, 2005, in forum: Python
    Replies:
    29
    Views:
    1,080
    Antoon Pardon
    Apr 4, 2005
  2. Russell Warren

    Is Queue.Queue.queue.clear() thread-safe?

    Russell Warren, Jun 22, 2006, in forum: Python
    Replies:
    4
    Views:
    702
    Russell Warren
    Jun 27, 2006
  3. Kceiw
    Replies:
    3
    Views:
    1,017
    Jim Langston
    Mar 14, 2006
  4. Gabriel Rossetti
    Replies:
    3
    Views:
    578
    Jerry Hill
    Apr 25, 2008
  5. Kris
    Replies:
    0
    Views:
    505
Loading...

Share This Page