[ANN] forkoff - parallel processing for ruby enumerables

ara howard

NAME

forkoff

SYNOPSIS

brain-dead simple parallel processing for ruby

URI

http://rubyforge.org/projects/codeforpeople

INSTALL

gem install forkoff

DESCRIPTION

forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes which is, by default, 8.

SAMPLES

<========< samples/a.rb >========>

~ > cat samples/a.rb

#
# forkoff makes it trivial to do parallel processing with ruby, the following
# prints out each word in a separate process
#

require 'forkoff'

%w( hey you ).forkoff!{|word| puts "#{ word } from #{ Process.pid }"}

~ > ruby samples/a.rb

hey from 3239
you from 3240


<========< samples/b.rb >========>

~ > cat samples/b.rb

#
# for example, this takes only 1 second or so to complete
#

require 'forkoff'

a = Time.now.to_f

results =
  (0..7).forkoff do |i|
    sleep 1
    i ** 2
  end

b = Time.now.to_f

elapsed = b - a

puts "elapsed: #{ elapsed }"
puts "results: #{ results.inspect }"

~ > ruby samples/b.rb

elapsed: 1.07044386863708
results: [0, 1, 4, 9, 16, 25, 36, 49]


<========< samples/c.rb >========>

~ > cat samples/c.rb

#
# forkoff does *NOT* spawn processes in batches, waiting for each batch to
# complete. rather, it keeps a certain number of processes busy until all
# results have been gathered. in other words the following will ensure that 2
# processes are running at all times, until the list is complete. note that
# the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
#

require 'forkoff'

pid = Process.pid

a = Time.now.to_f

pstrees =
  %w( a b c d ).forkoff! :processes => 2 do |letter|
    sleep 1
    { letter => ` pstree -l 2 #{ pid } ` }
  end


b = Time.now.to_f

puts
puts "pid: #{ pid }"
puts "elapsed: #{ b - a }"
puts

require 'yaml'

pstrees.each do |pstree|
y pstree
end

~ > ruby samples/c.rb


pid: 3254
elapsed: 2.12998485565186

---
a: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb

---
b: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb

---
c: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward (ruby)
\-+- 03262 ahoward ruby -Ilib samples/c.rb

---
d: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward ruby -Ilib samples/c.rb
\-+- 03262 ahoward ruby -Ilib samples/c.rb





a @ http://codeforpeople.com/
 
Roger Pack

NAME

forkoff

Nice. Great idea.
# forkoff does *NOT* spawn processes in batches, waiting for each batch to
# complete. rather, it keeps a certain number of processes busy until all
# results have been gathered. in other words the following will ensure that 2
# processes are running at all times, until the list is complete. note that
# the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
#

I assume then that at most 2 processes are forked, and each keeps
working?
 
Phillip Gawlowski


ara howard wrote:
|
| NAME
|
| forkoff
|
| SYNOPSIS
|
| brain-dead simple parallel processing for ruby
|
| URI
|
| http://rubyforge.org/projects/codeforpeople
|
| INSTALL
|
| gem install forkoff
|
| DESCRIPTION
|
| forkoff works for any enumerable object, iterating a code block to run in a
| child process and collecting the results. forkoff can limit the number of
| child processes which is, by default, 8.

So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :p

SCNR

- --
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

You thought I was taking your woman away from you. You're jealous.
You tried to kill me with your bare hands. Would a Kelvan do that?
Would he have to? You're reacting with the emotions of a human.
You are human.
~ -- Kirk, "By Any Other Name," stardate 4657.5
 
Phillip Gawlowski


ara.t.howard wrote:
|
| On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:
|>
|> So, the tool that captures run away processes and terminates them will
|> be called 'sodoff', I wager? :p
|
| oh yeah, that's good - taken!

I want credit. Dollars aren't worth a dime. :p

- --
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

"You speak truth," said Themistocles; "I should never have been famous
if I had been of Seriphus"
~ -- Plutarch (46-120 AD)
~ -- Life of Themistocles
 
Piyush Ranjan


I think this is a great idea!
Kudos

 
Martin DeMello

DESCRIPTION

forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes which is, by default, 8.

Very neat indeed!

martin
 
ara.t.howard

Since it's using Kernel#fork(), does this mean it is using OS threads?

yes. forkoff has a number of consumer *green* threads used to manage
an array of queues containing the elements destined to be passed to a
forked process/native thread for execution of the block. the code is
very short, give it a read.
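
For readers who want a picture of the pattern described above, here is a minimal sketch, not forkoff's actual source. It assumes a hypothetical helper named parallel_map: a fixed number of worker threads pull items from a Queue, each item is handled in a freshly forked child, and the result comes back to the parent over a pipe via Marshal. The fork_lock mutex is an extra precaution added for this sketch.

```ruby
# Hypothetical helper (not forkoff's source): map a block over `items`,
# running each call in a forked child, with at most `processes` children
# alive at once.
def parallel_map(items, processes = 8, &block)
  jobs = Queue.new
  items.each_with_index { |item, index| jobs << [item, index] }

  results   = Array.new(items.size)
  fork_lock = Mutex.new   # so no child inherits another worker's open write end

  workers = Array.new(processes) do
    Thread.new do
      loop do
        begin
          item, index = jobs.pop(true)   # non-blocking; raises ThreadError when empty
        rescue ThreadError
          break
        end

        reader = pid = nil
        fork_lock.synchronize do
          reader, writer = IO.pipe
          pid = Process.fork do
            reader.close
            writer.write(Marshal.dump(block.call(item)))
            writer.close
            exit!(0)                     # skip at_exit hooks inherited from the parent
          end
          writer.close
        end

        results[index] = Marshal.load(reader.read)   # read to EOF, then reap the child
        reader.close
        Process.wait(pid)
      end
    end
  end

  workers.each(&:join)
  results
end

squares = parallel_map((0..7).to_a, 2) { |i| i ** 2 }
p squares   # => [0, 1, 4, 9, 16, 25, 36, 49]
```

Because each worker takes the next item as soon as its previous child finishes, nothing waits for a whole batch to complete, which is the behaviour sample c.rb describes.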

cheers.

a @ http://codeforpeople.com/
 
Erik Veenstra

I've once implemented Enumerable#fork myself. It doesn't use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

gegroet,
Erik V. - http://www.erikveen.dds.nl/

----------------------------------------------------------------

Here's my code:

----------------------------------------------------------------

module Enumerable
  def fork(max_number_of_threads=nil, &block)
    thread_limiter = EV::ThreadLimiter.new(max_number_of_threads)

    collect do |x|
      thread_limiter.fork do
        Thread.current.abort_on_exception = true

        r, w = IO.pipe

        if pid = Process.fork
          w.close
          data = r.read       # read to EOF first, so a large result cannot block the child
          r.close
          Process.wait(pid)
          Marshal.load(data)
        else
          r.close
          Marshal.dump(block.call(x), w)
          w.close
          exit
        end
      end
    end.collect do |t|
      t.value
    end
  end
end

----------------------------------------------------------------

module EV
  class ThreadLimiter
    def initialize(max_number_of_threads)
      @number_of_threads = 0
      @max_number_of_threads = max_number_of_threads

      yield(self) if block_given?
    end

    def fork(*args, &block)
      Thread.pass while @max_number_of_threads and
                        @max_number_of_threads > 0 and
                        @number_of_threads >= @max_number_of_threads

      # If this method is called from several threads, then
      # @number_of_threads might get bigger than @max_number_of_threads.
      # This usually a) isn't the case and b) doesn't really matter (to me...).
      # I'm willing to accept this "risk", because a) Thread.exclusive is
      # much, much faster than Mutex#synchronize and b) we can't run into
      # deadlocks.

      Thread.exclusive{@number_of_threads += 1}

      Thread.fork do
        begin
          res = block.call(*args)
        ensure
          Thread.exclusive{@number_of_threads -= 1}
        end

        res
      end
    end
  end
end

----------------------------------------------------------------

Here's a benchmark:

require "benchmark"

Benchmark.bm(15) do |bm|
rc = nil
r2 = nil
r4 = nil
rx = nil

data = 1..10
test = lambda{|x| 1_000_000.times{7+8}; [x, Process.pid]}

bm.report(" collect "){rc = data.collect(&test)}
bm.report(" 2 processes"){r2 = data.fork(2, &test)}
bm.report(" 4 processes"){r4 = data.fork(4, &test)}
bm.report("inf processes"){rx = data.fork(-1, &test)}

p rc
p r2
p r4
p rx
end

It produces these results on a dual core machine:

                     user     system      total        real
  collect        4.530000   0.000000   4.530000 (  4.527982)
  2 processes    0.030000   0.050000   3.170000 (  1.733209)
  4 processes    0.160000   0.370000   3.610000 (  1.927826)
inf processes    0.000000   0.000000   3.080000 (  1.691932)
[[1, 18732], [2, 18732], [3, 18732], [4, 18732], [5, 18732], [6, 18732], [7, 18732], [8, 18732], [9, 18732], [10, 18732]]
[[1, 18733], [2, 18734], [3, 18735], [4, 18736], [5, 18737], [6, 18738], [7, 18739], [8, 18740], [9, 18741], [10, 18742]]
[[1, 18743], [2, 18744], [3, 18745], [4, 18746], [5, 18747], [6, 18748], [7, 18749], [8, 18750], [9, 18751], [10, 18752]]
[[1, 18753], [2, 18754], [3, 18755], [4, 18756], [5, 18757], [6, 18758], [7, 18759], [8, 18760], [9, 18761], [10, 18762]]

----------------------------------------------------------------
 
Erik Veenstra

Just a word of warning: The construction "Thread.new(i){|i|" is
useless, by definition. Just like "i=i" is useless too.

If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): It's better to use
"Thread.new(i1){|i2|" instead.

gegroet,
Erik V. - http://www.erikveen.dds.nl/

----------------------------------------------------------------

a  = (1..10).to_a
a1 = a.map{|i| Thread.new { sleep 0.01 ; i }}.map{|t| t.value}
a2 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value}   # Will do.
i  = nil
a3 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value}   # Won't do!
a4 = a.map{|i1| Thread.new(i1){|i2|sleep 0.01 ; i2}}.map{|t| t.value}

p a1 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a2 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a3 # ==> [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
p a4 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

----------------------------------------------------------------
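
A note for readers on newer interpreters: from Ruby 1.9 on, block parameters are always local to the block, so the a3 case above behaves like a2 there. The underlying hazard, a thread closing over a variable that the loop keeps reassigning, can still be shown with a `for` loop, which does not create a new scope. The sketch below is only an illustration; the names gate, shared and copied are made up for it, and the mutex exists purely to hold every thread back until the loop has finished.

```ruby
gate   = Mutex.new
shared = []
copied = []

gate.lock                         # hold all threads until the loop is done
for i in 1..3
  shared << Thread.new    { gate.synchronize { i } }       # closes over the loop's single i
  copied << Thread.new(i) { |n| gate.synchronize { n } }   # gets its own copy of the value
end
gate.unlock

shared_values = shared.map { |t| t.value }
copied_values = copied.map { |t| t.value }

p shared_values   # => [3, 3, 3]
p copied_values   # => [1, 2, 3]
```

Passing the value through Thread.new's argument list, under a different name, is the form that stays correct in both situations.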
 
ara.t.howard

I've once implemented Enumerable#fork myself. It doesn't use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

this is precisely how forkoff manages the number of processes, only
it's via Queues and a fixed number of threads consuming from those
queues

the latest impl from svn:

http://p.ramaze.net/1141

regards.

a @ http://codeforpeople.com/
 
ara.t.howard

If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): It's better to use
"Thread.new(i1){|i2|" instead.

indeed - left over from a previous iteration.

a @ http://codeforpeople.com/
 
Fredrik

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.

/Fredrik
 
ara.t.howard

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.


the next release supports either a hash (options) or a numeric argument
- so either will work - may release tonight...
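
One way such a dual-form argument could be handled is sketched below. This is an assumption about the shape of the idea, not the code that shipped: the helper name normalize_forkoff_args and the constant DEFAULT_PROCESSES are hypothetical, and only the :processes key and the default of 8 come from the announcement.

```ruby
DEFAULT_PROCESSES = 8   # default taken from the announcement

# Hypothetical helper: accept nothing, an options hash, or a bare number,
# and always hand back an options hash with a :processes entry.
def normalize_forkoff_args(arg = nil)
  case arg
  when nil     then { :processes => DEFAULT_PROCESSES }
  when Hash    then { :processes => DEFAULT_PROCESSES }.merge(arg)
  when Numeric then { :processes => Integer(arg) }
  else raise ArgumentError, "expected a Hash or a number, got #{arg.inspect}"
  end
end

p normalize_forkoff_args                    # => {:processes=>8} on Ruby < 3.4
p normalize_forkoff_args(2)
p normalize_forkoff_args(:processes => 2)
```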

cheers.

a @ http://codeforpeople.com/
 
ara.t.howard

My example code:

results = []
run = 0
runs = 2 ** fsm_inputs.size
(0...runs).forkoff do |vector|
  results[vector] = by_input_sets vector
  # snip some run-based stats
  run += 1
end

This, obviously, doesn't work (i.e., results is an empty array at the
end and run is 0 in every iteration). I can get the results by making
the block return the by_input_sets call's result, but I still lose the
run-based stats.

It seems a singleton-based approach would work (I'd create a singleton
object outside of the loop and have the results array and run counter
be its properties), but maybe there is an easier way?

to do this you'll want to combine forkoff with my slave lib, which
sets up an object which is fronted by drb, and which can indeed be a
singleton - note that this object is, itself, running in a child
process, but you can ignore this for the most part. a simple example:

cfp:~ > cat a.rb
require 'rubygems'
require 'slave'
require 'forkoff'

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
  process_global[i] = i ** 2
end

process_global.each do |k,v|
  p k => v
end


cfp:~ > ruby a.rb
{0=>0}
{1=>1}
{2=>4}
{3=>9}
{4=>16}


even with these abstractions you have to consider deeply what's
happening with threads/processes etc - but yes, it's definitely
possible with little code.
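
The reason the original loop ends with an empty results array can be seen without forkoff or slave at all. The sketch below uses only Process.fork, IO.pipe and Marshal from the standard library: the child works on its own copy of the parent's memory, so its changes are lost unless they are sent back explicitly. The variable names are illustrative only.

```ruby
results = []

reader, writer = IO.pipe
pid = Process.fork do
  reader.close
  results << 42                         # changes only the child's copy
  writer.write(Marshal.dump(results))   # the only way back to the parent
  writer.close
  exit!(0)
end

writer.close
from_child = Marshal.load(reader.read)
reader.close
Process.wait(pid)

p results      # => []
p from_child   # => [42]
```

This is why returning the value from the block works (forkoff collects it), while side effects such as `run += 1` need a shared object that lives outside the forked children.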

cheers.

a @ http://codeforpeople.com/
 
Abdul-rahman Advany

Checking out the source I only see pid = fork (this is a call to
Thread.new isn't it?), I don't see that real fork (kernel) is used... or
am I wrong?
 
Abdul-rahman Advany

Abdul-rahman Advany said:
Checking out the source I only see pid = fork (this is a call to
Thread.new isn't it?), I don't see that real fork (kernel) is used... or
am I wrong?

Sorry, just figured out calling fork makes the thread a child process...
just need to find out how to set a timeout on the thread... any
suggestions?
 
ara.t.howard

Sorry, just figured out calling fork makes the thread a child
process...
just need to find out how to set a timeout on the thread... any
suggestions?

require 'timeout'

begin
  Timeout.timeout(seconds){ thread.join }
rescue Timeout::Error
  thread.kill rescue nil
end
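
Killing the thread does not by itself stop a child process that the thread forked. If the goal is to put a time limit on the child, the same Timeout idea can be applied to Process.wait instead. The following is a sketch under that assumption; wait_or_kill is a hypothetical helper name, and the TERM trap in the demo child is only there so it exits immediately.

```ruby
require 'timeout'

# Hypothetical helper: wait up to `seconds` for child `pid`. If it is still
# running after that, send TERM and reap it.
# Returns the child's Process::Status, or :timed_out.
def wait_or_kill(pid, seconds)
  Timeout.timeout(seconds) do
    _, status = Process.wait2(pid)
    status
  end
rescue Timeout::Error
  begin
    Process.kill('TERM', pid)
    Process.wait(pid)
  rescue Errno::ESRCH, Errno::ECHILD
    # the child was already gone or already reaped
  end
  :timed_out
end

slow = Process.fork do
  Signal.trap('TERM') { exit!(1) }   # exit at once, skipping inherited at_exit hooks
  sleep 5
end
fast = Process.fork { exit!(0) }

slow_result = wait_or_kill(slow, 0.2)
fast_result = wait_or_kill(fast, 5)

p slow_result            # => :timed_out
p fast_result.success?   # => true
```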

a @ http://codeforpeople.com/
 
