I want to use multi-threading + JRuby to achieve better performance.
My problem has a single input file that contains millions of lines. Each
line is the input to a time-consuming computation and generates one
output result.
What I have in mind is:
1. The main thread prepares N worker threads and puts them to sleep.
2. The main thread opens the input file and passes the file object to each
   worker via thread[:file].
3. The main thread sends a "GO" command to all the worker threads.
Did you ever hear of blocking queues?
4. Each worker thread runs the following loop:
   4.1 Lock the mutex associated with the file object, read a line, and
       then unlock. If EOF is encountered, exit the loop.
   4.2 Do the very time-consuming computation.
   4.3 Put the result in Thread.current[:result].
   4.4 Signal the main thread to pick up the result.
   4.5 Go to sleep (waiting to be woken up by the main thread).
5. The main thread runs the following loop:
   5.1 Sleep until woken up by one of the worker threads.
   5.2 Check all the worker threads' [:result] values, extract them, and
       do some aggregation.
   5.3 For those threads whose result has been picked up, send a signal to
       let them proceed with the next line.
6. Once all the worker threads finish, the main thread outputs the
   aggregated result.
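To illustrate the blocking queue remark above: a minimal sketch of how Queue#pop already provides the "sleep until there is work" behaviour that steps 4 and 5 build by hand. The helper name consume_until_nil and the use of nil as an end-of-input marker are assumptions of this sketch, not part of the original plan.

```ruby
require 'thread' # harmless on current Rubies, needed on older ones

# Hypothetical helper: starts a consumer thread that collects items
# from the queue until it sees nil (assumed end-of-input marker).
def consume_until_nil(queue)
  Thread.new do
    seen = []
    # Queue#pop blocks while the queue is empty, so the thread sleeps
    # and is woken automatically when an item arrives.
    while (item = queue.pop)
      seen << item
    end
    seen
  end
end

queue = Queue.new
consumer = consume_until_nil(queue)
%w[line1 line2 line3].each { |line| queue << line }
queue << nil # tell the consumer that no more input will come
p consumer.value
```

No mutex, signal, or explicit sleep state is needed; the queue does the locking and the waking.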
This sounds like a very typical application of the farmer-worker pattern. You
create two queues, one for tasks and one for results. Then you start
a thread which fetches results from the result queue and processes
them. Then you start a number of threads which read from the tasks
queue, process tasks and place results in the result queue. Finally
you use Thread#value to join on the result processor.
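# untested
require 'thread'

WORKERS = 5

# Bounded queues keep the reader from running far ahead of the workers.
tasks   = SizedQueue.new WORKERS * 10
results = SizedQueue.new WORKERS * 10

# Result processor: aggregates until every worker has reported completion.
agg = Thread.new do
  state = Hash.new 0
  th = WORKERS
  while th > 0
    x = results.deq
    if Thread === x
      th -= 1 # a worker signals termination by enqueuing itself
    else
      # aggregate
      state[x] += 1
    end
  end
  state
end

# Workers: the tasks queue object itself serves as the end-of-input marker.
# process is a placeholder for the expensive per-line computation.
workers = WORKERS.times.map do
  Thread.new do
    until (task = tasks.deq).equal?(tasks)
      results.enq(process(task))
    end
    results.enq(Thread.current)
  end
end

# Feed the tasks (assuming the input file is named on the command line),
# then send one end marker per worker so that every worker terminates.
ARGF.each_line { |line| tasks.enq line }
WORKERS.times { tasks.enq tasks }

p agg.value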
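
A self-contained variant of the sketch above that can be run directly. It is only an illustration under a few assumptions: process is a hypothetical stand-in that returns the length of the line, the input is an in-memory array instead of a file, the method name farm is made up, and a private marker object replaces the queue and Thread objects as end signals.

```ruby
require 'thread' # harmless on current Rubies, needed on older ones

# Hypothetical stand-in for the expensive per-line computation.
def process(line)
  line.strip.length
end

# Runs the two-queue pattern over any enumerable of lines and returns
# a Hash that counts how often each result occurred.
def farm(lines, worker_count = 4)
  tasks   = SizedQueue.new(worker_count * 10)
  results = SizedQueue.new(worker_count * 10)
  done    = Object.new # unique end marker, never a valid task or result

  # Result processor: stops once every worker has sent the marker.
  aggregator = Thread.new do
    state = Hash.new(0)
    remaining = worker_count
    while remaining > 0
      x = results.deq
      if x.equal?(done)
        remaining -= 1
      else
        state[x] += 1
      end
    end
    state
  end

  workers = Array.new(worker_count) do
    Thread.new do
      until (task = tasks.deq).equal?(done)
        results.enq(process(task))
      end
      results.enq(done) # report that this worker is finished
    end
  end

  # Producer: enqueue all lines, then one marker per worker.
  lines.each { |line| tasks.enq(line) }
  worker_count.times { tasks.enq(done) }

  workers.each(&:join)
  aggregator.value
end

p farm(%w[a bb cc ddd])
```

For the real problem the array would be replaced by File.foreach on the input file, which reads lazily, so the bounded tasks queue keeps memory use flat even with millions of lines.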
Kind regards
robert
-- 
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/