Managing a fork pool to handle tasks

  • Thread starter Abdul-rahman Advany

Abdul-rahman Advany

Hey guys,

I am running some background processes in Ruby, and I would like to
use fork for them, but I can't figure out how to manage the child processes.

- I want to create a pool of child processes (forks) running in the
background, limited to a number I specify
- I want to create them from a parent task, but creation of new child
processes should block until one of the child processes in the pool is
done
- I want to limit the time a child process can run (it should quit
after x seconds)
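A minimal sketch of those three requirements using fork, assuming a Unix-like platform (fork is not available on Windows) and that each job can be written as a Ruby lambda. The method name `fork_pool` and its keyword arguments are hypothetical, not from any library. `jobs` can be any object that responds to `each`, for example an Enumerator that pulls work from a queue as it goes (an assumption about how memcache-fed jobs might be wrapped). The parent polls with non-blocking `Process.wait2` instead of blocking in `Process.wait`, because a parent stuck in a blocking wait could not wake up to kill a child that overruns its time limit.

```ruby
# Hypothetical helper: runs each job in its own forked child, keeps at most
# max_children alive, and SIGKILLs any child still running after `timeout` seconds.
def fork_pool(jobs, max_children:, timeout:, poll: 0.05)
  running  = {} # pid => deadline on the monotonic clock
  statuses = [] # Process::Status of every finished child

  reap = lambda do
    # Collect finished children without blocking.
    running.keys.each do |pid|
      result = Process.wait2(pid, Process::WNOHANG)
      next unless result
      running.delete(pid)
      statuses << result[1]
    end
    # Kill children past their deadline; they are collected on a later pass.
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    running.each do |pid, deadline|
      next unless now > deadline
      begin
        Process.kill(:KILL, pid)
      rescue Errno::ESRCH
        # Not expected while the child is still unreaped; ignored defensively.
      end
    end
  end

  jobs.each do |job|
    # Block (by polling) until the pool has a free slot.
    while running.size >= max_children
      reap.call
      sleep poll if running.size >= max_children
    end

    pid = fork do
      # exit! skips at_exit handlers inherited from the parent.
      begin
        job.call
        exit!(0)
      rescue Exception
        exit!(1)
      end
    end
    running[pid] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  end

  # Wait for the remaining children.
  until running.empty?
    reap.call
    sleep poll unless running.empty?
  end
  statuses
end

statuses = fork_pool([-> { sleep 0.1 }, -> { sleep 5 }, -> { raise "boom" }],
                     max_children: 2, timeout: 0.5)
statuses.each { |s| puts s.inspect }
```

Polling every 50 ms is a simple design choice: a slot is noticed as free within one poll interval, and a runaway child is killed at most one interval after its deadline.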

I tried to use threads, but managing a pool of threads doesn't work
(they get stuck while doing stuff and don't die). I tried the code below,
but the pool fills up and I keep waiting for new threads to become
available...

require 'timeout'

pool = ThreadPool.new(10) # up to 10 threads
pool.process do
  Timeout.timeout(4) do
    # fetch pages, parse stuff, etc.
  end
end
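A minimal thread-based sketch of a bounded pool with a per-job time limit, assuming jobs are lambdas. `run_pool` and its arguments are hypothetical names. Each job runs under `Timeout.timeout`, and the worker rescues `Timeout::Error` (and other `StandardError`s) so that one slow or failing job does not take a worker out of the pool; a worker thread dying on an unrescued error is one possible way a pool ends up "full" with nobody left to take jobs. `Timeout.timeout` works by raising an exception in the job's thread, so code blocked inside some C extensions may not be interrupted promptly.

```ruby
require 'timeout'

# Hypothetical helper: `size` worker threads share one queue of jobs; each job
# gets at most `seconds` to run, and errors are recorded instead of killing the worker.
def run_pool(jobs, size:, seconds:)
  queue = Queue.new
  jobs.each { |job| queue << job }
  size.times { queue << nil } # one stop marker per worker

  results = Queue.new
  workers = Array.new(size) do
    Thread.new do
      while (job = queue.pop)
        begin
          results << [:ok, Timeout.timeout(seconds) { job.call }]
        rescue Timeout::Error # must come first: it is a StandardError too
          results << [:timeout, nil]
        rescue StandardError => e
          results << [:error, e]
        end
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

results = run_pool([-> { 1 + 1 }, -> { sleep 2 }, -> { raise "bad" }], size: 2, seconds: 0.2)
p results.map(&:first).sort # => [:error, :ok, :timeout] (completion order varies, so sort)
```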
 

Abdul-rahman Advany

Roger said:
google ruby forkoff

I don't have a fixed queue (I use memcache to fill it and use other
processes to get values from memcache), so I don't think forkoff will
work. I also have a hard time understanding how it works: I only see one
call to fork... does that fork the thread?
 

Abdul-rahman Advany


Sorry, I didn't know that calling fork creates a child process (a copy
of the current process) rather than another thread.
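A minimal sketch of what a single `fork` call does, using only core Ruby (`fork`, `IO.pipe`, `Process.wait2`) on a Unix-like system. `fork` does not fork a thread: it creates a new child process that is a copy of the parent, and only the thread that called `fork` keeps running in the child. With a block, the block runs in the child and the parent gets the child's pid back. The pipe is just one illustrative way to send data back, since parent and child no longer share memory.

```ruby
reader, writer = IO.pipe

# In the parent, fork returns the child's pid; the block runs only in the child.
pid = fork do
  reader.close
  writer.puts "child #{Process.pid} started by #{Process.ppid}"
  writer.close
  exit!(0) # leave without running at_exit handlers copied from the parent
end

# Back in the parent: only the child writes, so close our copy of the write end.
writer.close
message = reader.gets
reader.close
_, status = Process.wait2(pid) # reap the child and get its exit status

puts message
puts "child exited ok: #{status.success?}"
```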
 

John Carter


Try my nifty MultiThread class. It creates a pool of N worker threads
(there's no point in creating many more than you have CPU cores to do
the work anyway).

require 'thread'
Thread.abort_on_exception = true

# Raised in the parent thread when one or more jobs have failed;
# #queue holds the exceptions caught in the worker threads.
class MultiFail < Exception
  attr_reader :queue

  def initialize( _queue)
    @queue = _queue
  end
end

class MultiThread
  private

  # Worker loop: take jobs until a nil stop marker arrives. An exception
  # ends this worker and is stored for the parent to raise later.
  def do_stuff
    job = @jobs.deq
    while job
      job.call(Thread.current[:index])
      job = @jobs.deq
    end
  rescue Exception => failure
    @failed << failure
  end

  public

  # Spawns a pool of _jobs worker threads
  def initialize( _jobs = 1)
    raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0
    @jobs = SizedQueue.new( 2 * _jobs) # run blocks once this many jobs are waiting
    @threads = Array.new(_jobs){|i| Thread.new{Thread.current[:index]=i;do_stuff}}
    @failed = Queue.new
  end

  # Run block in one of the threads
  def run(&block)
    raise MultiFail.new(@failed) if @failed.size > 0
    @jobs.enq( block)
  end

  # Wait until all threads are finished doing whatever they're doing.
  def join
    @threads.each{|t| @jobs.enq nil} # one stop marker per worker
    @threads.each{|t| t.join}
    raise MultiFail.new(@failed) if @failed.size > 0
  end
end

if $0 == __FILE__ then
  require 'test/unit'

  class TC_MultiThread < Test::Unit::TestCase
    def initialize(test)
      super(test)
      @c = 0
    end

    # Start a new output line after roughly 70 characters of trace output.
    def wrap(s)
      @c += s
      if @c > 70
        puts
        @c = 0
      end
    end

    # Trace a job starting on worker c, e.g. "3< ".
    def dot(c)
      s = sprintf( '%x< ',c)
      print s
      wrap s.size
    end

    # Trace a job finishing on worker c, e.g. ">3 ".
    def undot(c)
      s = sprintf( '>%x ',c )
      print s
      wrap s.size
    end

    # Run `loops` one-second jobs on `threads` workers, checking that at most
    # `threads` jobs are ever active at once and that every job runs.
    def try_for(loops,threads)
      puts "Trying [#{loops},#{threads}]"
      i = 0   # jobs currently active
      k = 0   # jobs completed
      max = 0 # highest concurrency seen
      mutex = Mutex.new
      multi_thread = MultiThread.new(threads)

      loops.times do |j|
        multi_thread.run do |t|
          dot(t)
          mutex.synchronize do
            i += 1
          end
          sleep 1
          mutex.synchronize do
            assert( i <= threads)
            k +=1
            max = i if i > max
          end
          mutex.synchronize do
            i -= 1
          end
          undot(t)
        end
      end
      multi_thread.join
      assert_equal(0, i)
      assert( ((threads <= 1) || (loops <= 1)) || max > 1)
      assert_equal( loops, k)
    end

    def test_multi
      assert_raises(RuntimeError){ try_for(0,0)}
      try_for(0,1)
      try_for(0,2)
      try_for(1,1)
      try_for(2,1)
      try_for(2,2)
      try_for(2,100)
      try_for(3,1)
      try_for(3,2)
      try_for(3,3)
      try_for(3,100)
      try_for(100,100)
    end

    def test_fail
      multi_thread = MultiThread.new(3)

      multi_thread.run do
        sleep 2
      end

      multi_thread.run do
        raise "This thread failed for test purposes"
      end

      # Give the failing job a moment to run; otherwise the next run may
      # check for failures before any has been recorded.
      sleep 0.5

      assert_raises( MultiFail) do
        multi_thread.run do
          sleep 2
        end
      end

      begin
        multi_thread.join
      rescue MultiFail => multi_fail
        assert_equal( RuntimeError, multi_fail.queue.pop.class)
      end
    end
  end

end




John Carter Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : (e-mail address removed)
New Zealand
 

John Carter

Your multithread class doesn't catch failures...
http://ruby-rails.pl/true-ruby-thread-pool

Contrariwise.

It does.

Of course it's a bit debatable what you want to do with a failure once
you have caught it.

Having an exception bubble up the call frames to the top level of a
generic pool worker thread is not very helpful.

Having all the tasks complete before you act on a failure is not what
I wanted either.

The gotcha is that two or more failures can happen before you start
handling them in the parent thread.

So what I do is catch failures and drop them in a list, which I check
before every run / join.

If there have been any failures, I raise them all, bundled together, in
the parent thread.

That may not be what you want, but it makes sense to me.
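A minimal sketch of that "collect, then raise them all in the parent" approach, independent of the MultiThread class above. `FailureCollector` and `BundledFailure` are hypothetical names; the idea mirrors `@failed` and `MultiFail`: workers push exceptions onto a thread-safe `Queue`, and the parent drains everything that has accumulated and raises it as one exception, so two failures that happen before the parent looks are both reported.

```ruby
# Hypothetical exception that carries every failure collected so far.
class BundledFailure < StandardError
  attr_reader :failures

  def initialize(failures)
    @failures = failures
    super("#{failures.size} job(s) failed: #{failures.map(&:message).join('; ')}")
  end
end

# Hypothetical collector: workers record errors, the parent checks before run/join.
class FailureCollector
  def initialize
    @failed = Queue.new # Queue is thread-safe, so workers can push concurrently
  end

  def record(error)
    @failed << error
  end

  # Drain everything recorded so far and raise it as one BundledFailure.
  def check!
    return if @failed.empty?

    failures = []
    failures << @failed.pop until @failed.empty?
    raise BundledFailure.new(failures)
  end
end

collector = FailureCollector.new
workers = (1..4).map do |n|
  Thread.new do
    raise ArgumentError, "job #{n} failed" if n.odd?
  rescue StandardError => e
    collector.record(e)
  end
end
workers.each(&:join)

begin
  collector.check!
rescue BundledFailure => e
  puts e.message # reports jobs 1 and 3, in whatever order they were recorded
end
```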


