Multi Threading

Discussion in 'Ruby' started by Sriram Varahan, Apr 17, 2009.

  1. Hello,

    #*******************STARTCODE

    start_time = Time.now
    $count = 0
    class Queue
      def initialize *s # splat operator allows variable length argument list
        @mutex = Mutex.new
        @queue = []
        s.each { |e| @queue.push e }
      end

      def enq v
        @queue.push v
      end

      def deq
        @mutex.synchronize { item = @queue.shift }
      end

      def empty?
        @mutex.synchronize { @queue.length == 0 }
      end

      def count
        @mutex.synchronize do
          $count += 1
        end
      end
    end


    #*****Test

    queue = Queue.new
    500.times do |a|
      queue.enq a
    end
    threads = []


    # Create 5 threads which fetch values from the Q.
    5.times do
      threads << Thread.new {
        until queue.empty?
          queue.count
          puts "Thread ID: #{Thread.current}.Job started"
          puts "#{queue.deq}"
          #sleep 0.0001
          puts "Thread ID: #{Thread.current}.Job complete"
        end
      }
    end


    threads.each { |t| t.join }
    puts "Count"
    puts $count
    puts "timeTaken:"
    puts Time.now - start_time

    # *************CODE ENDS******************


    I have five threads which fetch values from a queue. The above code
    works perfectly well in case of a single thread. But the issue arises
    when there are more threads.

    In case of 5 threads the number of times the block is executed is 503
    where it should have been 500.

    I think I know the reason:
    The "deq" and "empty?" methods are each synchronized, but not together.
    So when the final item is about to be removed from the queue, other
    threads can call empty? before @queue.length becomes 0 and enter the
    loop anyway.

    Hence the difference in count.

    If the sleep is uncommented, the count comes out right.

    Any suggestion on how to get this working without the sleep?

    Thanks.
    --
    Posted via http://www.ruby-forum.com/.
    Sriram Varahan, Apr 17, 2009
    #1

  2. Sriram Varahan wrote:

    > Any suggestion on how to get this working without the sleep?


    You should also synchronize the enqueue operation (Queue#enq). By the way,
    there is an existing Queue class in the standard library that is already
    thread-safe:

    require 'thread'
    q = Queue.new
    q.push 1
    x = q.pop
    q.pop # => would block the thread until a new element is available

    q2 = SizedQueue.new(10) # bounded queue; push blocks while it holds 10 items
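
For the case in the question (a queue filled up front and drained by several workers), one possible sketch with the library class uses the non-blocking form `pop(true)`, which raises ThreadError instead of blocking when the queue is empty. The emptiness check and the removal then happen in a single step. The helper name `drain` and its `workers` parameter are made up for this illustration and are not part of the library.

```ruby
require 'thread' # harmless on current Rubies, required on older ones

# Hypothetical helper: drains +queue+ with +workers+ threads and returns
# the total number of items that were taken off the queue.
def drain(queue, workers = 5)
  processed = 0
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      loop do
        begin
          queue.pop(true) # non-blocking: raises ThreadError when empty
        rescue ThreadError
          break # nothing left, this worker is done
        end
        # the job would be handled here; the sketch only counts it
        lock.synchronize { processed += 1 }
      end
    end
  end

  threads.each(&:join)
  processed
end

queue = Queue.new
500.times { |i| queue.push i }
count = drain(queue)
puts count
```

This only works as a termination condition because nothing is pushed after the workers start; with a producer still running, an empty queue would not mean the work is finished.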

    Regards,

    Michael
    Michael Neumann, Apr 17, 2009
    #2

  3. 2009/4/17 Sriram Varahan:
    > Any suggestion on how to get this working without the sleep?


    There are several options. You could use MonitorMixin instead of Mutex
    and extend the queue object with it in initialize:

    require 'monitor'

    def initialize *s
      # @mutex == self so you do not need to change sync code
      @mutex = extend MonitorMixin
      @queue = s.dup
    end

    Then you can do external synchronization, e.g.

    queue.synchronize do
      if queue.empty?
        # finish
      else
        elm = queue.deq
      end
    end
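
The two fragments above could be combined roughly as follows. This is only a sketch: the class name `MonitoredQueue` is hypothetical (chosen so it does not clash with the built-in Queue), and the counter stands in for real work. It relies on the monitor being reentrant, so the nested synchronize calls inside `empty?` and `deq` do not deadlock when the caller already holds the lock.

```ruby
require 'monitor'

# Hypothetical class name; the question's class was simply called Queue.
class MonitoredQueue
  def initialize(*items)
    # extend returns self, so @mutex == self and the existing
    # @mutex.synchronize calls keep working unchanged
    @mutex = extend MonitorMixin
    @queue = items.dup
  end

  def enq(value)
    @mutex.synchronize { @queue.push value }
  end

  def deq
    @mutex.synchronize { @queue.shift }
  end

  def empty?
    @mutex.synchronize { @queue.empty? }
  end
end

queue = MonitoredQueue.new(*(0...500))
count = 0

threads = Array.new(5) do
  Thread.new do
    loop do
      job = nil
      done = queue.synchronize do
        # check and removal happen under one lock acquisition
        if queue.empty?
          true
        else
          job = queue.deq
          count += 1
          false
        end
      end
      break if done
      # job would be processed here, outside the lock
    end
  end
end

threads.each(&:join)
puts count
```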

    Much better though is this approach

    require 'thread'

    # use library class
    queue = Queue.new

    # _first_ start threads
    # does not really matter but if filling
    # the queue takes time work can
    # begin immediately
    threads = (1..5).map do
      label = Thread.current.to_s.freeze

      Thread.new queue do |q|
        until ( job = q.deq ) == :terminate
          puts "Thread ID: #{label}.Job started"
          puts job
          puts "Thread ID: #{label}.Job complete"
        end
      end
    end

    # fill queues
    500.times do |a|
      queue.enq a
    end

    # "close" queues
    threads.size.times { queue.enq :terminate }

    # wait for termination
    threads.each do |th|
      th.join
    end

    Cheers

    robert

    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Apr 17, 2009
    #3
  4. On Fri, Apr 17, 2009 at 7:55 AM, Robert Klemme wrote:
    >
    > Much better though is this approach


    Minor nitpick - these lines should be reversed:

    > label = Thread.current.to_s.freeze
    > Thread.new queue do |q|


    i.e.

    > Thread.new queue do |q|
    >   label = Thread.current.to_s.freeze
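
With the two lines in that order, each worker builds its label from its own Thread.current rather than from the thread that creates the workers. A minimal runnable sketch of the whole loop under that assumption follows; the `results` queue is not in the original code and is added here only so the outcome can be inspected.

```ruby
require 'thread'

queue = Queue.new
results = Queue.new # hypothetical: collects [label, job] pairs for inspection

threads = Array.new(5) do
  Thread.new(queue) do |q|
    # evaluated inside the new thread, so it names the worker itself
    label = Thread.current.to_s.freeze

    # deq blocks until an item is available; :terminate ends the worker
    until (job = q.deq) == :terminate
      results.enq [label, job]
    end
  end
end

# fill the queue, then send one :terminate per worker
500.times { |a| queue.enq a }
threads.size.times { queue.enq :terminate }

threads.each(&:join)
puts results.size
```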


    Regards,
    Sean
    Sean O'Halpin, Apr 17, 2009
    #4
  5. Hey Robert,

    That was an amazing solution! Thanks a million :)


    Thank you Michael and Sean for your time :)
    --
    Posted via http://www.ruby-forum.com/.
    Sriram Varahan, Apr 17, 2009
    #5
  6. 2009/4/17 Sean O'Halpin:

    > Minor nitpick - these lines should be reversed:
    >
    >> label = Thread.current.to_s.freeze
    >> Thread.new queue do |q|
    >
    > i.e.
    >
    >> Thread.new queue do |q|
    >>   label = Thread.current.to_s.freeze


    Oh yes, absolutely! Apparently I moved the line too high. Thanks for
    catching that gotcha, Sean!

    Kind regards

    robert

    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Apr 17, 2009
    #6