Thread-safe priority queue?

Discussion in 'Ruby' started by Sean O'Halpin, Jul 9, 2008.

  1. Sean O'Halpin, Jul 9, 2008
    #1
    1. Advertising

  2. Sean O'Halpin

    Robert Dober Guest

    Sean I seem to fail to understand why that change should have any
    impact on Jo=EBl's work, can you elaborate please?

    Cheers
    Robert

    --=20
    http://ruby-smalltalk.blogspot.com/

    ---
    AALST (n.) One who changes his name to be further to the front
    D.Adams; The Meaning of LIFF
     
    Robert Dober, Jul 9, 2008
    #2
    1. Advertising

  3. Sean O'Halpin

    Trans Guest

    On Jul 9, 9:08=A0am, "Sean O'Halpin" <> wrote:
    > Hi,
    >
    > Does anyone know of a solid, thread-safe priority queue implementation in=

    Ruby?
    >
    > The only one I can find is Joel Vanderwerf's
    > (http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d...=

    )
    > which doesn't work with more recent versions of ruby (because Queue
    > implementation changed from Ruby to C).


    Hmm... I'm not sure if Facets implementation is thread safe. It's may
    be worth a look. If I recall correctly, Olivier Renaud was the last to
    work on it, so he may know more. If it isn't thread safe, btw, it
    would make a nice patch.

    T.
     
    Trans, Jul 9, 2008
    #3
  4. On Wed, Jul 9, 2008 at 6:34 PM, Robert Dober <> wrote=
    :
    > Sean I seem to fail to understand why that change should have any
    > impact on Jo=EBl's work, can you elaborate please?
    >
    > Cheers
    > Robert
    >


    I didn't explain that very well, did I? Joel's version inherits from
    Queue and directly references an instance variable (@waiting) which
    isn't there in the C version.
     
    Sean O'Halpin, Jul 9, 2008
    #4
  5. On Wed, Jul 9, 2008 at 6:50 PM, Trans <> wrote:
    >
    > Hmm... I'm not sure if Facets implementation is thread safe. It's may
    > be worth a look. If I recall correctly, Olivier Renaud was the last to
    > work on it, so he may know more. If it isn't thread safe, btw, it
    > would make a nice patch.


    Thanks for the pointer but the Facets version isn't thread-safe.
    Still searching... :)
     
    Sean O'Halpin, Jul 9, 2008
    #5
  6. Sean O'Halpin

    Roger Pack Guest

    Sean O'halpin wrote:
    > On Wed, Jul 9, 2008 at 6:50 PM, Trans <> wrote:
    >>
    >> Hmm... I'm not sure if Facets implementation is thread safe. It's may
    >> be worth a look. If I recall correctly, Olivier Renaud was the last to
    >> work on it, so he may know more. If it isn't thread safe, btw, it
    >> would make a nice patch.

    >
    > Thanks for the pointer but the Facets version isn't thread-safe.
    > Still searching... :)


    Could throw a mutex around the facets version.
    --
    Posted via http://www.ruby-forum.com/.
     
    Roger Pack, Jul 10, 2008
    #6
  7. Sean O'Halpin wrote:
    > Hi,
    >
    > Does anyone know of a solid, thread-safe priority queue implementation in Ruby?
    >
    > The only one I can find is Joel Vanderwerf's
    > (http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f)
    > which doesn't work with more recent versions of ruby (because Queue
    > implementation changed from Ruby to C).


    It's pretty easy to work around, I think. Try the following code. It's
    based on something I'm using in live code and it seems to pass the test
    referenced in the above link.

    Btw, it's great that RBTree is a gem now. Thanks to whoever did that.



    require 'thread'
    require 'rbtree'

    class PriorityQueue
    def size
    @tree.size
    end

    def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = Queue.new
    @mutex = Mutex.new
    end

    # Push +obj+ with priority equal to +pri+ if given or, otherwise,
    # the result of sending #queue_priority to +obj+. Objects are
    # dequeued in priority order, and first-in-first-out among objects
    # with equal priorities.
    def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
    if @que.num_waiting > 0
    @que << obj
    else
    @tree.store(pri, obj)
    end
    end
    end

    def pop(non_block=false)
    @mutex.synchronize do
    if (last=@tree.last)
    return @tree.delete(last[0]) # highest key, oldest first
    end

    if non_block
    raise ThreadError, "priority queue empty"
    end
    end
    @que.pop # wait
    end
    end


    --
    vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407
     
    Joel VanderWerf, Jul 10, 2008
    #7
  8. On Wed, Jul 9, 2008 at 11:17 PM, Joel VanderWerf
    <> wrote:
    >
    > Btw, it's great that RBTree is a gem now. Thanks to whoever did that.


    Ooh, seconded! Stand up and be thanked :)

    martin
     
    Martin DeMello, Jul 10, 2008
    #8
  9. Looks like a race condition in that...

    Joel VanderWerf wrote:
    > require 'thread'
    > require 'rbtree'
    >
    > class PriorityQueue
    > def size
    > @tree.size
    > end
    >
    > def initialize(*)
    > super
    > @tree = MultiRBTree.new
    > @que = Queue.new
    > @mutex = Mutex.new
    > end
    >
    > # Push +obj+ with priority equal to +pri+ if given or, otherwise,
    > # the result of sending #queue_priority to +obj+. Objects are
    > # dequeued in priority order, and first-in-first-out among objects
    > # with equal priorities.
    > def push(obj, pri = obj.queue_priority)
    > @mutex.synchronize do
    > if @que.num_waiting > 0
    > @que << obj
    > else
    > @tree.store(pri, obj)
    > end
    > end
    > end
    >
    > def pop(non_block=false)
    > @mutex.synchronize do
    > if (last=@tree.last)
    > return @tree.delete(last[0]) # highest key, oldest first
    > end
    >
    > if non_block
    > raise ThreadError, "priority queue empty"
    > end
    > end


    ### Race happens here: if someone else calls #push, then
    ### this thread will wait even though data is available.

    > @que.pop # wait
    > end
    > end


    Will try to fix....

    --
    vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407
     
    Joel VanderWerf, Jul 10, 2008
    #9
  10. Joel VanderWerf wrote:
    >
    > Looks like a race condition in that...


    Proposed fix, using a condition var... still needs some eyeballing and
    some tests:

    require 'thread'
    require 'rbtree'

    class PriorityQueue
    def size
    @tree.size
    end

    def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = [] # should never have more than one entry
    @num_waiting = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    end

    # Push +obj+ with priority equal to +pri+ if given or, otherwise,
    # the result of sending #queue_priority to +obj+. Objects are
    # dequeued in priority order, and first-in-first-out among objects
    # with equal priorities.
    def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
    if @num_waiting > 0
    @que << obj
    @cond.signal
    else
    @tree.store(pri, obj)
    end
    end
    end

    def pop(non_block=false)
    @mutex.synchronize do
    if (last=@tree.last)
    return @tree.delete(last[0]) # highest key, oldest first
    end

    if non_block
    raise ThreadError, "priority queue empty"
    end

    @num_waiting += 1
    @cond.wait(@mutex)
    @num_waiting -= 1
    @que.pop
    end
    end
    end



    --
    vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407
     
    Joel VanderWerf, Jul 10, 2008
    #10
  11. Joel VanderWerf wrote:
    > Joel VanderWerf wrote:
    >>
    >> Looks like a race condition in that...

    >
    > Proposed fix, using a condition var... still needs some eyeballing and
    > some tests:


    That was not quite right either (because cond.signal only wakes the
    waiter, and doesn't schedule it). The following seems to complete
    without deadlocks or starvation.

    require 'thread'
    require 'rbtree'

    class PriorityQueue
    def size
    @tree.size
    end

    def initialize(*)
    super
    @tree = MultiRBTree.new
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    end

    # Push +obj+ with priority equal to +pri+ if given or, otherwise,
    # the result of sending #queue_priority to +obj+. Objects are
    # dequeued in priority order, and first-in-first-out among objects
    # with equal priorities.
    def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
    @tree.store(pri, obj)
    @cond.signal
    end
    end

    def pop(non_block=false)
    @mutex.synchronize do
    if (last=@tree.last)
    return @tree.delete(last[0]) # highest key, oldest first
    end

    if non_block
    raise ThreadError, "priority queue empty"
    end

    loop do
    @cond.wait(@mutex)
    if (last=@tree.last)
    return @tree.delete(last[0])
    end
    end
    end
    end
    end


    if __FILE__ == $0

    Thread.abort_on_exception = true

    pq = PriorityQueue.new

    n_items_per_thread = 1000
    n_writers = 10
    n_readers = 10

    writers = (0...n_writers).map do |i_thr|
    Thread.new do
    n_items_per_thread.times do |i|
    pri = rand(10)
    pq.push([pri, i, i_thr], pri)
    Thread.pass if rand(5) == 0
    end
    end
    end

    sleep 0.1 until pq.size > 100 # a little head start populating the tree

    results = Array.new(n_readers, 0)

    readers = (0...n_readers).map do |i|
    Thread.new do
    loop do
    pq.pop
    results += 1
    end
    end
    end

    writers.each do |wr|
    wr.join
    end

    p results
    until pq.size == 0
    sleep 0.1
    p results
    end

    raise unless results.inject {|s,x|s+x} == n_items_per_thread * n_writers

    end


    --
    vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407
     
    Joel VanderWerf, Jul 10, 2008
    #11
  12. On Thu, Jul 10, 2008 at 8:03 PM, Joel VanderWerf
    <> wrote:
    > Joel VanderWerf wrote:
    >>
    >> Joel VanderWerf wrote:
    >>>
    >>> Looks like a race condition in that...

    >>
    >> Proposed fix, using a condition var... still needs some eyeballing and
    >> some tests:

    >
    > That was not quite right either (because cond.signal only wakes the waiter,
    > and doesn't schedule it). The following seems to complete without deadlocks
    > or starvation.
    >

    Thanks for taking the time to do this Joel. You can see why I was
    hoping it had already been done... ;)

    I resurrected the Queue code from an old copy of 1.8.4 I had lying
    around and went with that + your original version of the
    PriorityQueue. Still putting it through its paces. I'll give your new
    version a whirl too.

    @Roger - just putting a mutex around all the access methods isn't
    sufficient unfortunately (see Joel's code for evidence). I want the
    calling thread to block if there's nothing in the queue (like the
    standard lib Queue behaves). And once I start putting that machinery
    in, I might as well write the whole thing myself. I was hoping to
    avoid that (trying to be virtuous :)

    Regards,
    Sean
     
    Sean O'Halpin, Jul 12, 2008
    #12
  13. Sean O'Halpin wrote:
    > Thanks for taking the time to do this Joel. You can see why I was
    > hoping it had already been done... ;)


    And as you can see from my other thread, this code breaks on 1.8.6. :(

    --
    vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407
     
    Joel VanderWerf, Jul 14, 2008
    #13
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Russell Warren

    Is Queue.Queue.queue.clear() thread-safe?

    Russell Warren, Jun 22, 2006, in forum: Python
    Replies:
    4
    Views:
    722
    Russell Warren
    Jun 27, 2006
  2. Gabriel Rossetti
    Replies:
    0
    Views:
    1,394
    Gabriel Rossetti
    Aug 29, 2008
  3. Marcel Müller
    Replies:
    3
    Views:
    593
    Marcel Müller
    Apr 27, 2009
  4. Kris
    Replies:
    0
    Views:
    534
  5. Frank Meyer
    Replies:
    3
    Views:
    201
    Frank Meyer
    Aug 21, 2007
Loading...

Share This Page