Question regarding Queue object

Discussion in 'Python' started by Terry, Apr 27, 2008.

  1. Terry

    Terry Guest

    Hello!

    I'm trying to implement a message queue among threads using Queue. The
    message queue has two operations:
    PutMsg(id, msg) # this is simple, just combine the id and msg as one
    item and put it into the Queue.
    WaitMsg(ids, msg) # this is the hard part

    WaitMsg should return only messages with certain ids, but this is not
    possible with the Queue object, because Queue provides no method to
    peek into the queue and fetch only matching items.

    Now I'm using an ugly solution: fetch all the messages and put the
    unused ones back into the queue. But I want better performance. Is
    there any alternative out there?

    This is my current solution:

        def _get_with_ids(self, wait, timeout, ids):
            to = timeout
            msg = None
            saved = []
            while True:
                start = time.clock()
                msg = self.q.get(wait, to)
                if msg and msg['id'] in ids:
                    break
                # not the expected message, save it
                saved.append(msg)
                to = to - (time.clock() - start)
                if to <= 0:
                    break
            # put the saved messages back into the queue
            for m in saved:
                self.q.put(m, True)
            return msg

    br, Terry
    Terry, Apr 27, 2008
    #1

  2. Terry

    Terry Guest

    On Apr 27, 6:27 pm, Terry <> wrote:
    > [original post and code quoted in full -- snipped]


    I just found that Queue is written in Python, so maybe I can subclass it.
    Terry, Apr 27, 2008
    #2

  3. Terry

    David Guest

    > WaitMsg should return only messages with certain ids, but this is not
    > possible with the Queue object, because Queue provides no method to
    > peek into the queue and fetch only matching items.
    >
    > Now I'm using an ugly solution: fetch all the messages and put the
    > unused ones back into the queue. But I want better performance. Is
    > there any alternative out there?
    >


    You could try a defaultdict containing queues, one queue per message ID.

    Or you could implement your own thread-safe LookAheadQueue class.
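
    A rough, untested sketch of the first idea (the class and method names
    here are just placeholders), using the standard Queue module:

        import Queue
        import threading
        from collections import defaultdict

        class PerIdQueues(object):
            """One Queue per message id, created on demand."""

            def __init__(self):
                self._lock = threading.Lock()
                self._queues = defaultdict(Queue.Queue)

            def put_msg(self, id, msg):
                self._lock.acquire()
                try:
                    q = self._queues[id]   # defaultdict creates it on first use
                finally:
                    self._lock.release()
                q.put(msg)

            def wait_msg(self, id, timeout=None):
                # raises Queue.Empty on timeout; note that this blocks on a
                # single id, not on a set of ids
                self._lock.acquire()
                try:
                    q = self._queues[id]
                finally:
                    self._lock.release()
                return q.get(True, timeout)

    The catch is that a consumer can only block on one id at a time, which
    is where a WaitMsg(ids, ...) that accepts several ids gets awkward.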

    David
    David, Apr 27, 2008
    #3
  4. Terry

    David Guest

    (re-cc-ing the list)

    On Sun, Apr 27, 2008 at 4:40 PM, Terry Yin <> wrote:
    > A defaultdict is not an option because there will be a lot of message
    > IDs (and the number keeps increasing). I will implement a
    > LookAheadQueue class by subclassing the Queue class.
    >
    > Thanks for your kind advice.
    >
    > BTW, I have been in old-fashioned telecommunications R&D for years,
    > where messages and state machines are heavily used in software
    > development. This makes me automatically resort to messages between
    > task-specific processes/threads when designing any software, even in
    > Python. I'm wondering whether this is still the right choice, or
    > whether it's no longer a modern way to design software.
    >


    There are a lot of ways you could go about it; those two were the first
    that came to mind.

    Another idea would be to have multiple queues, one per thread or per
    message type "group". The producer thread pushes into the appropriate
    queues (through an intelligent PutMsg function), and the consumer
    threads pull from the queues they're interested in and ignore the
    others.
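
    A rough, untested sketch of that routing idea (MsgRouter, subscribe and
    put_msg are just placeholder names):

        import Queue

        class MsgRouter(object):
            """Fan messages out to per-consumer queues keyed by message id."""

            def __init__(self):
                # id -> list of queues; register all consumers before the
                # producer starts, or protect this dict with a lock
                self._subscribers = {}

            def subscribe(self, ids):
                # each consumer thread registers the ids it cares about and
                # gets back a private queue to block on
                q = Queue.Queue()
                for id in ids:
                    self._subscribers.setdefault(id, []).append(q)
                return q

            def put_msg(self, id, msg):
                # deliver only to queues that asked for this id, so consumers
                # never have to skip or re-queue messages
                for q in self._subscribers.get(id, []):
                    q.put({'id': id, 'msg': msg})

    A consumer just calls q = router.subscribe(['a', 'b']) once and then
    loops on q.get(), always getting a message it can handle.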

    If your apps are heavily threaded you might take a look at Stackless
    Python: http://www.stackless.com/

    David.
    David, Apr 27, 2008
    #4
  5. Terry

    Guest

    I've never used it myself but you may find candygram interesting;
    http://candygram.sourceforge.net, which AFAIK implements Erlang-style
    message queues in Python.
    , Apr 28, 2008
    #5
  6. Terry

    Terry Guest

    On Apr 28, 5:30 pm, Nick Craig-Wood <> wrote:
    > David <> wrote:
    > > Another idea would be to have multiple queues, one per thread or per
    > > message type "group". The producer thread pushes into the appropriate
    > > queues (through an intelligent PutMsg function), and the consumer
    > > threads pull from the queues they're interested in and ignore the
    > > others.

    >
    > Unfortunately a thread can only wait on one Queue at once (without
    > polling). So really the only efficient solution is one Queue per
    > thread.
    >
    > Make an intelligent PutMsg function which knows which Queue (or
    > Queues) each message needs to be put in and all the threads will have
    > to do is Queue.get() and be sure they've got a message they can deal
    > with.
    >
    > --
    > Nick Craig-Wood <> -- http://www.craig-wood.com/nick



    I do have one Queue per thread. The problem is that the thread cannot
    peek into the Queue and select messages with certain IDs first.
    Terry, Apr 29, 2008
    #6
  7. Terry

    Terry Guest

    On Apr 28, 10:48 pm, "" <> wrote:
    > I've never used it myself but you may find candygram interesting;
    > http://candygram.sourceforge.net, which AFAIK implements Erlang-style
    > message queues in Python.


    Thank you. I will look at candygram and Stackless. I believe my
    solution lies in one of them.
    Terry, Apr 29, 2008
    #7
  8. Terry

    Guest

    On 27 Apr, 12:27, Terry <> wrote:
    > [original post and code quoted in full -- snipped]


    Why put them back in the queue?
    You could keep a defaultdict with the id as key and a list of
    unprocessed messages with that id as the value.
    Your _get_with_ids function could first look into the unprocessed
    messages for items with the wanted ids, and only then read from the
    queue, putting any unwanted item into the dictionary for later
    processing.
    This should improve performance, with a little extra complexity in the
    method code (but it's way simpler than implementing your own
    priority-based queue).
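
    For what it's worth, a rough, untested sketch of that change to your
    _get_with_ids (self.pending is the new defaultdict; it assumes a numeric
    timeout and a single consumer thread per queue, so the dict needs no
    extra locking):

        import time
        import Queue
        from collections import defaultdict

        # in __init__:  self.q = Queue.Queue()
        #               self.pending = defaultdict(list)  # id -> saved messages

        def _get_with_ids(self, wait, timeout, ids):
            # 1. serve anything we already pulled off the queue earlier
            for id in ids:
                if self.pending[id]:
                    return self.pending[id].pop(0)
            # 2. otherwise read from the queue, parking unwanted messages in
            #    self.pending instead of pushing them back onto the queue
            deadline = time.time() + timeout
            while True:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None
                try:
                    msg = self.q.get(wait, remaining)
                except Queue.Empty:
                    return None
                if msg['id'] in ids:
                    return msg
                self.pending[msg['id']].append(msg)

    This also keeps the per-id arrival order, unlike putting skipped
    messages back at the end of the queue.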

    Ciao
    -----
    FB
    , Apr 29, 2008
    #8
  9. Terry

    Terry Guest

    On Apr 29, 3:01 pm, Dennis Lee Bieber <> wrote:
    > On Sun, 27 Apr 2008 03:27:59 -0700 (PDT), Terry <>
    > declaimed the following in comp.lang.python:
    >
    > > [original post quoted -- snipped]
    >
    > Create your own queue class -- including locking objects.
    >
    > Implement the queue itself (I've not looked at how Queue.Queue is
    > really done) as a priority queue (that is, a simple list ordered by your
    > ID -- new items are inserted after all existing items with the same or
    > lower ID number).
    >
    > Surround list manipulations with a lock based on a Condition.
    >
    > Now, the trick -- the .get(ID) sequence being something like (this
    > is pseudo-code):
    >
    >     while True:
    >         self.condition.acquire()
    >         scan self.qlist for first entry with ID
    >         if found:
    >             remove entry from self.qlist
    >             self.condition.release()
    >             return entry
    >         self.condition.wait()
    >
    > -=-=-=-=- the .put(ID, data) looks like
    >
    >     self.condition.acquire()
    >     scan self.qlist for position to insert (ID, data)
    >     self.condition.notifyAll()
    >     self.condition.release()
    >
    > -=-=-=-=-
    >
    > Essentially, if the first pass over the list does not find an entry
    > to return, it waits for a notify to occur... and notification will only
    > occur when some other thread puts new data into the list.
    > --
    > Wulfraed Dennis Lee Bieber KD6MOG
    >
    > HTTP://wlfraed.home.netcom.com/
    > (Bestiaria Support Staff: )
    > HTTP://www.bestiaria.com/


    Yes, I now have a similar solution in my code. But after reading about
    Stackless Python, I'm wondering whether I can move to Stackless, which
    might improve the performance of my threads. Because I'm trying to
    simulate some real-world behavior (trading), I believe there will be a
    lot of threads in my program in the future.
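
    For reference, a runnable version of what Dennis describes could look
    roughly like this (untested sketch: the class name is a placeholder and
    it keeps the list in plain arrival order instead of ordered by id):

        import time
        import threading

        class IdQueue(object):
            """List guarded by a Condition; get() blocks until a wanted id arrives."""

            def __init__(self):
                self.cond = threading.Condition()
                self.qlist = []                  # (id, data) pairs in arrival order

            def put(self, id, data):
                self.cond.acquire()
                try:
                    self.qlist.append((id, data))
                    self.cond.notifyAll()        # wake all waiters to re-check their ids
                finally:
                    self.cond.release()

            def get(self, ids, timeout=None):
                deadline = None if timeout is None else time.time() + timeout
                self.cond.acquire()
                try:
                    while True:
                        for i, (id, data) in enumerate(self.qlist):
                            if id in ids:
                                del self.qlist[i]
                                return (id, data)
                        if deadline is None:
                            self.cond.wait()
                        else:
                            remaining = deadline - time.time()
                            if remaining <= 0:
                                return None      # timed out with no matching message
                            self.cond.wait(remaining)
                finally:
                    self.cond.release()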
    Terry, Apr 29, 2008
    #9
  10. Terry

    Terry Guest

    On Apr 29, 4:32 pm, wrote:
    > [original post and code quoted in full -- snipped]
    >
    > Why put them back in the queue?
    > You could keep a defaultdict with the id as key and a list of
    > unprocessed messages with that id as the value.
    > Your _get_with_ids function could first look into the unprocessed
    > messages for items with the wanted ids, and only then read from the
    > queue, putting any unwanted item into the dictionary for later
    > processing.
    > This should improve performance, with a little extra complexity in the
    > method code (but it's way simpler than implementing your own
    > priority-based queue).
    >
    > Ciao
    > -----
    > FB


    Yes, this will improve the performance. And I can see there's a problem
    in my current implementation: the order of the messages may change if I
    put the saved messages back at the end of the queue. This may cause some
    confusion later, though I don't want to depend too much on message
    order.

    And you reminded me of one thing -- I need to implement 'priority' for
    messages, so that the message with the highest priority tends to be
    fetched first. OMG, this is going to be much more complicated than I
    expected.
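
    For the priority part, a heap of (priority, counter, message) tuples is
    probably the simplest building block. A rough, untested sketch (it
    ignores the id filtering and is not thread-safe on its own, so it would
    live under the same lock as the rest of the queue):

        import heapq

        class PriorityStore(object):
            """Lowest priority number comes out first; FIFO within equal priorities."""

            def __init__(self):
                self._heap = []
                self._count = 0     # tie-breaker so equal priorities stay FIFO

            def push(self, priority, msg):
                self._count += 1
                heapq.heappush(self._heap, (priority, self._count, msg))

            def pop(self):
                priority, count, msg = heapq.heappop(self._heap)
                return msg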

    Thanks for your suggestion. I hope this will also work when I move to
    Stackless.
    Terry, Apr 29, 2008
    #10
  11. Terry

    Terry Guest

    On Apr 29, 5:30 pm, Nick Craig-Wood <> wrote:
    > Terry <> wrote:
    > > [earlier quoted messages snipped]
    > > I do have one Queue per thread. The problem is that the thread
    > > cannot peek into the Queue and select messages with certain IDs
    > > first.

    >
    > My point is: don't put messages that the thread doesn't need into the
    > queue in the first place, i.e. move that logic into PutMsg.
    >
    > --
    > Nick Craig-Wood <> -- http://www.craig-wood.com/nick


    Well, I'm simulating the real world. It's like this: you wouldn't drop
    or process a task once you have already started your lunch; you just
    save it and process it later, when you finish your lunch.
    Of course, the task sender could send the task again and again if he
    gets no ack from you. But that's just one possible situation in the
    real world, and not an efficient one.
    Terry, Apr 29, 2008
    #11
