Managing a fork pool to handle tasks

Discussion in 'Ruby' started by Abdul-rahman Advany, Apr 29, 2008.

  1. Hey guys,

    I am running some background processes in Ruby, and I would like to use
    forks, but I can't figure out how to manage them.

    - I want to create a pool of child processes (forks) running in the
    background, limited to a number I specify
    - They are created from a parent task, but creating a new child process
    should block until one of the child processes in the pool is done
    - I want to limit the time a child process can run (so it should quit
    after x seconds)

    I tried to use Threads, but somehow managing a pool of threads doesn't
    work (they get stuck while doing stuff and don't die). I tried to use
    the code below, but somehow the pool fills up and I keep waiting for
    new threads to become available...

    require 'timeout'

    # ThreadPool comes from a third-party library (not named in this post)
    pool = ThreadPool.new(10) # up to 10 threads
    pool.process do
      Timeout.timeout(4) do
        # fetch pages, parse stuff, etc...
      end
    end
    --
    Posted via http://www.ruby-forum.com/.
     
    Abdul-rahman Advany, Apr 29, 2008
    #1

  2. Roger Pack Guest

    Search Google for "ruby forkoff".
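
    For readers who want to see the idea without a gem, the three
    requirements from the first post (a bounded pool, blocking until a slot
    frees up, a per-child time limit) can be sketched with plain fork and
    Process.wait2. This is not how forkoff works internally; the helper name
    run_in_fork_pool, its keyword arguments and the result symbols are all
    made up for illustration, and the parent enforces the time limit by
    sending SIGKILL to overdue children.

```ruby
# Hypothetical helper, not part of forkoff or any other library.
# Runs each job in its own forked child, at most max_children at a time,
# and kills any child that runs longer than time_limit seconds.
def run_in_fork_pool(jobs, max_children:, time_limit:)
  running = {} # pid => [job index, deadline]
  results = {} # job index => :ok, :failed or :killed

  # Kill overdue children, then reap one finished child if there is one.
  reap = lambda do
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    running.each do |pid, (_index, deadline)|
      Process.kill('KILL', pid) if now > deadline
    end

    pid, status = Process.wait2(-1, Process::WNOHANG)
    if pid.nil?
      sleep 0.05 # nothing has exited yet; poll again shortly
    elsif (entry = running.delete(pid))
      results[entry.first] =
        if status.signaled?
          :killed
        elsif status.success?
          :ok
        else
          :failed
        end
    end
  end

  jobs.each_with_index do |job, index|
    # Block here until the pool has a free slot.
    reap.call while running.size >= max_children

    pid = fork do
      ok = begin
        job.call
        true
      rescue StandardError
        false
      end
      exit!(ok ? 0 : 1) # exit! skips at_exit handlers inherited from the parent
    end
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + time_limit
    running[pid] = [index, deadline]
  end

  reap.call until running.empty?
  results
end

# Example jobs: one quick, one raising, one that overruns the limit, one trivial.
jobs = [
  -> { sleep 0.1 },
  -> { raise 'boom' },
  -> { sleep 10 },
  -> { :done }
]
results = run_in_fork_pool(jobs, max_children: 2, time_limit: 1)
p results
```

    Polling with WNOHANG keeps the parent free to check deadlines; a queue
    that is filled from elsewhere (memcache, for instance) would replace the
    jobs array in the loop.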

     
    Roger Pack, Apr 29, 2008
    #2

  3. Roger Pack wrote:
    > google ruby forkoff


    I don't have a fixed queue (I use memcache to fill it, and other
    processes get values from memcache), so I don't think forkoff will
    work. I also have a hard time understanding how it works: I only see
    one call to fork. Does that fork the thread?
     
    Abdul-rahman Advany, Apr 29, 2008
    #3
  4. Abdul-rahman Advany wrote:


    Sorry, I didn't know that calling fork in a thread makes that thread
    continue as a child process.
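
    A small sketch of what that means in practice: fork creates a new
    process, and in the child only the thread that called fork keeps
    running, while other threads are not carried over. The variable names
    (sibling, forker, report) are made up for this example.

```ruby
sibling = Thread.new { sleep } # a second thread that just idles
reader, writer = IO.pipe       # lets the child report back to the parent

# Call fork from inside a thread; in the parent, fork returns the child's pid.
forker = Thread.new do
  fork do
    reader.close
    writer.puts "pid=#{Process.pid} threads=#{Thread.list.size} " \
                "sibling_alive=#{sibling.alive?}"
    writer.close
    exit!(0) # leave the child without running inherited at_exit handlers
  end
end

child_pid = forker.value # the thread's return value is the child's pid
writer.close             # the parent keeps only the read end open
report = reader.read     # read until the child closes its write end
Process.wait(child_pid)
sibling.kill

puts "parent pid=#{Process.pid}"
puts "child says: #{report}"
```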
     
    Abdul-rahman Advany, Apr 29, 2008
    #4
  5. John Carter Guest

    On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

    > I tried to use Threads, but somehow managing a pool of threads
    > doesn't work (they get stuck while doing stuff and don't die). I tried
    > to use the code below, but somehow the pool fills up and I keep waiting
    > for new threads to become available...
    >
    > pool = ThreadPool.new(10) # up to 10 threads
    > pool.process do
    >   timeout(4) do
    >     fetch pages, parse stuff, enc...
    >   end
    > end


    Try my nifty MultiThread class. It creates a pool of N worker threads
    (there is no point in creating many more threads than you have CPU
    cores to do the work anyway).

    require 'thread'
    Thread.abort_on_exception = true

    # Raised in the parent thread; carries the queue of worker failures.
    class MultiFail < Exception
      attr_reader :queue

      def initialize(_queue)
        @queue = _queue
      end
    end

    class MultiThread
      private

      # Worker loop: run jobs until a nil sentinel arrives. Any exception
      # ends this worker and is recorded in @failed.
      def do_stuff
        job = @jobs.deq
        while job
          job.call(Thread.current[:index])
          job = @jobs.deq
        end
      rescue Exception => failure
        @failed << failure
      end

      public

      # Spawns a pool of _jobs worker threads
      def initialize(_jobs = 1)
        raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0
        @jobs = SizedQueue.new(2 * _jobs)
        @failed = Queue.new # created before the workers that push to it
        @threads = Array.new(_jobs) do |i|
          Thread.new { Thread.current[:index] = i; do_stuff }
        end
      end

      # Run block in one of the threads
      def run(&block)
        raise MultiFail.new(@failed) if @failed.size > 0
        @jobs.enq(block)
      end

      # Wait until all threads are finished doing whatever they're doing.
      def join
        @threads.each { |t| @jobs.enq nil } # one nil sentinel per worker
        @threads.each { |t| t.join }
        raise MultiFail.new(@failed) if @failed.size > 0
      end
    end

    if $0 == __FILE__ then
      require 'test/unit'

      class TC_MultiThread < Test::Unit::TestCase
        def initialize(test)
          super(test)
          @c = 0
        end

        # Break the progress output into lines of roughly 70 characters
        def wrap(s)
          @c += s
          if @c > 70
            puts
            @c = 0
          end
        end

        # Print a marker when worker c starts a job
        def dot(c)
          s = sprintf('%x< ', c)
          print s
          wrap s.size
        end

        # Print a marker when worker c finishes a job
        def undot(c)
          s = sprintf('>%x ', c)
          print s
          wrap s.size
        end

        def try_for(loops, threads)
          puts "Trying [#{loops},#{threads}]"
          i = 0   # jobs currently running
          k = 0   # jobs completed
          max = 0 # highest concurrency observed
          mutex = Mutex.new
          multi_thread = MultiThread.new(threads)

          loops.times do |j|
            multi_thread.run do |t|
              dot(t)
              mutex.synchronize do
                i += 1
              end
              sleep 1
              mutex.synchronize do
                assert(i <= threads)
                k += 1
                max = i if i > max
              end
              mutex.synchronize do
                i -= 1
              end
              undot(t)
            end
          end
          multi_thread.join
          assert_equal(0, i)
          assert(((threads <= 1) || (loops <= 1)) || max > 1)
          assert_equal(loops, k)
        end

        def test_multi
          assert_raises(RuntimeError) { try_for(0, 0) }
          try_for(0, 1)
          try_for(0, 2)
          try_for(1, 1)
          try_for(2, 1)
          try_for(2, 2)
          try_for(2, 100)
          try_for(3, 1)
          try_for(3, 2)
          try_for(3, 3)
          try_for(3, 100)
          try_for(100, 100)
        end

        def test_fail
          multi_thread = MultiThread.new(3)

          multi_thread.run do
            sleep 2
          end

          multi_thread.run do
            raise "This thread failed for test purposes"
          end

          # Give the failing job time to be recorded before the next run
          sleep 0.5

          assert_raises(MultiFail) do
            multi_thread.run do
              sleep 2
            end
          end

          begin
            multi_thread.join
          rescue MultiFail => multi_fail
            assert_equal(RuntimeError, multi_fail.queue.pop.class)
          end
        end
      end

    end




    John Carter Phone : (64)(3) 358 6639
    Tait Electronics Fax : (64)(3) 359 4632
    PO Box 1645 Christchurch Email :
    New Zealand
     
    John Carter, Apr 29, 2008
    #5
  6. Abdul-rahman Advany, Apr 29, 2008
    #6
  7. John Carter Guest

    On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

    > Your multithread class doesn't catch failures...
    > http://ruby-rails.pl/true-ruby-thread-pool


    Contrariwise.

    It does.

    Of course, it's a bit debatable what you want to do with a failure once
    you have caught it.

    Having an exception bubble up the call frames to the top level of a
    generic pool worker thread is not very helpful.

    Having all the tasks complete before you act on a failure is not what
    I wanted either.

    The gotcha is that two or more failures can happen before you start
    handling them in the parent thread.

    So what I do is catch failures and drop them in a queue, which I check
    before every run / join.

    If there have been any failures, I raise them all in a bundle (a
    MultiFail carrying that queue) in the parent thread.

    That may not be what you want, but it makes sense to me.
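
    A condensed sketch of that approach, for illustration only: it is not
    the MultiThread class from the earlier post, and the names TinyPool and
    FailureBundle are invented here. Unlike MultiThread, a worker in this
    sketch keeps running after one of its jobs fails. The gate queue exists
    only to make both failures happen before the parent looks at them.

```ruby
# Hypothetical bundle exception: carries every failure collected so far.
class FailureBundle < StandardError
  attr_reader :failures

  def initialize(failures)
    super("#{failures.size} job(s) failed")
    @failures = failures
  end
end

# Hypothetical minimal pool: workers record failures instead of raising.
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @failed = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop) # a nil sentinel ends the worker
          begin
            job.call
          rescue StandardError => e
            @failed << e
          end
        end
      end
    end
  end

  # Check for earlier failures, then queue the block for a worker.
  def run(&block)
    raise_failures
    @jobs << block
  end

  # Stop all workers, wait for them, then report anything that failed.
  def join
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
    raise_failures
  end

  private

  # Drain the failure queue and raise everything in one bundle.
  def raise_failures
    return if @failed.empty?

    failures = []
    failures << @failed.pop until @failed.empty?
    raise FailureBundle.new(failures)
  end
end

gate = Queue.new
pool = TinyPool.new(2)
pool.run { gate.pop; raise ArgumentError, 'first' }
pool.run { gate.pop; raise IOError, 'second' }
2.times { gate << :go } # let both jobs fail before the parent checks

bundle = begin
  pool.join
  nil
rescue FailureBundle => e
  e
end
bundle.failures.each { |failure| puts "#{failure.class}: #{failure.message}" }
```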



     
    John Carter, Apr 30, 2008
    #7