Thread-safe priority queue?

Trans

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d...)
which doesn't work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

Hmm... I'm not sure if the Facets implementation is thread-safe. It may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread-safe, btw, it
would make a nice patch.

T.
 
Sean O'Halpin

Sean, I seem to fail to understand why that change should have any
impact on Joël's work, can you elaborate please?

Cheers
Robert

I didn't explain that very well, did I? Joel's version inherits from
Queue and directly references an instance variable (@waiting) which
isn't there in the C version.
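
A quick way to see the problem, as a rough sketch: assuming a Ruby where Queue is implemented in C, the queue object carries no Ruby-level instance variables, so a subclass that reads @waiting just gets nil. PeekingQueue and waiting_list are hypothetical names made up for this illustration, not anything from Joel's code.

```ruby
require 'thread'

# Hypothetical subclass that, like the old PriorityQueue, reaches into
# the parent's internals.
class PeekingQueue < Queue
  def waiting_list
    @waiting # nil when Queue keeps its state in C rather than in ivars
  end
end

q = PeekingQueue.new
p q.instance_variables # no @waiting on a C-implemented Queue
p q.waiting_list
```

Any code that calls methods on that value (pushing a thread onto it, say) then fails on nil, which is presumably how the breakage shows up.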
 
Sean O'Halpin

Hmm... I'm not sure if the Facets implementation is thread-safe. It may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread-safe, btw, it
would make a nice patch.

Thanks for the pointer but the Facets version isn't thread-safe.
Still searching... :)
 
Joel VanderWerf

Sean said:
Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f)
which doesn't work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

It's pretty easy to work around, I think. Try the following code. It's
based on something I'm using in live code and it seems to pass the test
referenced in the above link.

Btw, it's great that RBTree is a gem now. Thanks to whoever did that.



require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = Queue.new
    @mutex = Mutex.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @que.num_waiting > 0
        @que << obj
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last = @tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end
    end
    @que.pop # wait
  end
end
 
Joel VanderWerf

Looks like a race condition in that...

Joel said:
require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = Queue.new
    @mutex = Mutex.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @que.num_waiting > 0
        @que << obj
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last = @tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end
    end

    ### Race happens here: if someone else calls #push, then
    ### this thread will wait even though data is available.
    @que.pop # wait
  end
end

Will try to fix....
 
Joel VanderWerf

Joel said:
Looks like a race condition in that...

Proposed fix, using a condition var... still needs some eyeballing and
some tests:

require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = [] # should never have more than one entry
    @num_waiting = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @num_waiting > 0
        @que << obj
        @cond.signal
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last = @tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end

      @num_waiting += 1
      @cond.wait(@mutex)
      @num_waiting -= 1
      @que.pop
    end
  end
end
 
Joel VanderWerf

Joel said:
Proposed fix, using a condition var... still needs some eyeballing and
some tests:

That was not quite right either (because cond.signal only wakes the
waiter, and doesn't schedule it). The following seems to complete
without deadlocks or starvation.

require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      @tree.store(pri, obj)
      @cond.signal
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last = @tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end

      # Re-check the tree after every wakeup: another reader may have
      # taken the item before this thread got the mutex back.
      loop do
        @cond.wait(@mutex)
        if (last = @tree.last)
          return @tree.delete(last[0])
        end
      end
    end
  end
end


if __FILE__ == $0

  Thread.abort_on_exception = true

  pq = PriorityQueue.new

  n_items_per_thread = 1000
  n_writers = 10
  n_readers = 10

  writers = (0...n_writers).map do |i_thr|
    Thread.new do
      n_items_per_thread.times do |i|
        pri = rand(10)
        pq.push([pri, i, i_thr], pri)
        Thread.pass if rand(5) == 0
      end
    end
  end

  sleep 0.1 until pq.size > 100 # a little head start populating the tree

  results = Array.new(n_readers, 0)

  readers = (0...n_readers).map do |i|
    Thread.new do
      loop do
        pq.pop
        results[i] += 1 # per-reader count of popped items
      end
    end
  end

  writers.each do |wr|
    wr.join
  end

  p results
  until pq.size == 0
    sleep 0.1
    p results
  end

  raise unless results.inject {|s,x|s+x} == n_items_per_thread * n_writers

end
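
For anyone who wants to try the wait-in-a-loop idea without installing rbtree, here is a minimal sketch along the same lines. It is not Joel's code: SimplePriorityQueue and its Hash-of-arrays storage are assumptions made for this illustration, and finding the highest priority is a linear scan over the distinct priorities rather than a tree lookup.

```ruby
require 'thread'

# Sketch only: each priority maps to a FIFO array in a Hash, so no
# rbtree gem is needed. Not tuned for many distinct priorities.
class SimplePriorityQueue
  def initialize
    @buckets = Hash.new { |h, k| h[k] = [] }
    @size = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def size
    @mutex.synchronize { @size }
  end

  def push(obj, pri)
    @mutex.synchronize do
      @buckets[pri] << obj
      @size += 1
      @cond.signal
    end
    self
  end

  def pop(non_block = false)
    @mutex.synchronize do
      # Re-check after every wakeup: another thread may have taken the
      # item between the signal and this thread reacquiring the mutex.
      while @size == 0
        raise ThreadError, "priority queue empty" if non_block
        @cond.wait(@mutex)
      end
      pri = @buckets.keys.max           # highest priority first
      obj = @buckets[pri].shift         # oldest first within a priority
      @buckets.delete(pri) if @buckets[pri].empty?
      @size -= 1
      obj
    end
  end
end

pq = SimplePriorityQueue.new
consumer = Thread.new { pq.pop } # blocks until something is pushed
pq.push(:low, 1)
first = consumer.value
pq.push(:a, 5); pq.push(:b, 9); pq.push(:c, 5)
rest = [pq.pop, pq.pop, pq.pop]
```

The point of the while loop is the same as in the version above: a signal only says the queue was non-empty at some moment, so the waiter has to look again once it actually holds the mutex.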
 
Sean O'Halpin

Joel said:
That was not quite right either (because cond.signal only wakes the waiter,
and doesn't schedule it). The following seems to complete without deadlocks
or starvation.

Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done... ;)

I resurrected the Queue code from an old copy of 1.8.4 I had lying
around and went with that + your original version of the
PriorityQueue. Still putting it through its paces. I'll give your new
version a whirl too.

@Roger - just putting a mutex around all the access methods isn't
sufficient unfortunately (see Joel's code for evidence). I want the
calling thread to block if there's nothing in the queue (like the
standard lib Queue behaves). And once I start putting that machinery
in, I might as well write the whole thing myself. I was hoping to
avoid that (trying to be virtuous :)

Regards,
Sean
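
For reference, the standard-library behaviour Sean wants to match can be sketched like this. Only the stock Queue API is used; the variable names are just for illustration.

```ruby
require 'thread'

q = Queue.new

# A non-blocking pop on an empty queue raises ThreadError.
begin
  q.pop(true)
  raised = false
rescue ThreadError
  raised = true
end

# A blocking pop parks the calling thread until a push arrives.
consumer = Thread.new { q.pop }
sleep 0.05 until q.num_waiting == 1 # wait until the consumer is parked
q.push(:job)
got = consumer.value
```

A mutex around each method gives you safe access to the data, but not this parking and waking of readers, which is why the condition variable shows up in Joel's later versions.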
 
Joel VanderWerf

Sean said:
Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done... ;)

And as you can see from my other thread, this code breaks on 1.8.6. :(
 
