How to run background processes (more than one worker) in parallel


Deepak Gole


Hi

My requirement is as follows:

1) I have around 200 feeds in the database that I need to parse (fetch
contents) *in parallel* at some interval, storing the feed items in the
database.

2) Right now I am using BackgrounDRb with 10 workers; each worker is assigned
a job to parse data from 20 feeds (e.g. the 1st worker fetches data from
feeds 1..20, the 2nd from feeds 21..40, etc.).

3) But BackgrounDRb is not reliable and it fails after some time. So I have
tried Starling & Workling, but those workers don't run in parallel.

(I need to run in parallel because the number of feeds will increase, say to
1000 feeds, so I can't process them sequentially.)

Please, I need help with the above problem.


Thanks
Deepak
 

saurabh purnaye


Hi Deepak,

You may go for Starfish <http://rufy.com/starfish/doc/> or
Skynet <http://skynet.rubyforge.org/>,
which are based on Google's MapReduce algorithm.
You can also find more information in the Railscasts episodes 127 to 129.
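If a full framework turns out to be overkill, the split/merge idea behind those libraries can be sketched with plain Ruby threads. This is only a rough sketch: `fetch_feed` is a placeholder for the real fetching/parsing code, and the chunk size of 20 mirrors the setup described in the question.

```ruby
require 'thread'

# Hypothetical stand-in for the real feed fetching/parsing work.
def fetch_feed(feed_id)
  "items for feed #{feed_id}"
end

feeds   = (1..200).to_a
results = Queue.new  # thread-safe container for the "reduce" step

# "Map": split the feed list into chunks of 20 and process them in parallel.
threads = feeds.each_slice(20).map do |chunk|
  Thread.new do
    chunk.each { |feed_id| results << [feed_id, fetch_feed(feed_id)] }
  end
end
threads.each(&:join)

# "Reduce": drain the queue and merge everything into one hash.
merged = {}
until results.empty?
  feed_id, items = results.pop
  merged[feed_id] = items
end
puts merged.size  # => 200
```

With MRI's global lock this only overlaps the I/O waits, not the parsing itself, but for feed fetching the I/O wait is usually the dominant cost anyway.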




--
Thanks and Regards
Saurabh Purnaye
+91-9922071155
skype: sorab_pune
yahoo & gtalk: saurabh.purnaye
msn: (e-mail address removed)
 

Saji N Hameed

Hi Deepak,

As others mentioned, an adaptation of Google's MapReduce technique
may be of use. To this end, you could use Ruby's Rinda. For my needs
I wrote a small script that puts work descriptions on a tuple space;
these are taken up by one or more workers in parallel.

If you write distinct messages that are recognized by workers, you
could probably achieve your parallelism in a few lines without extra
libraries, except perhaps for DRbFire.

I attach it here (I am a novice Ruby programmer, so the code may not
be optimal) - hope it helps.

saji

--queue code

require 'thread'
require 'sequel'
require 'rinda/tuplespace'
require 'drb'

ts = Rinda::TupleSpace.new
DRb.start_service("druby://:3130", ts)
puts "DRb server running at #{DRb.uri}"

dbname = "sqlite://testQ.db"
db = Sequel.connect(dbname)
pause = 15

loop do
  # each interval: submit the next queued job and collect one finished result
  th1 = Thread.new do
    job = db[:jobs].filter(:status => "queued").first
    if job  # nothing to submit when no jobs are queued
      submit = job.merge(:status => "submitted")
      ts.write [:q1, submit]
      db[:jobs].filter(job).update(submit)
    end
  end
  th2 = Thread.new do
    result = ts.take [:rq1, nil, nil]  # blocks until a worker reports back
    unless result[1] == nil
      p "processing images"
      p "finished image processing"
      p "update job status in database"
      db[:jobs].filter(result[1]).update(:status => "finished")
    end
  end
  th1.join
  th2.join
  sleep(pause)
end

# connect to database
# create tuplespace
# thread1
# - collect from database
# - put on tuple
# - update db

# thread2
# - check tuple
# - download data
# - update db

---worker code

require 'drb'
require 'rinda/rinda'

DRb.start_service
ro = DRbObject.new_with_uri('druby://localhost:3130')
ts = Rinda::TupleSpaceProxy.new(ro)

def make_mme(job)
  # "This will be passed to AFS Server: don't worry yet"
  p job
end

loop do
  job = ts.take([:q1, nil])   # block until a job tuple appears
  msg = make_mme(job[1])
  ts.write [:rq1, job[1], 0]  # write the job hash and a return status to the tuplespace
end

# worker takes job from tuple space (ts.take[:q1,..])
# executes job (make_mme)
# writes message on tuple space (ts.write[:rq1,..])

--
Saji N. Hameed

APEC Climate Center +82 51 668 7470
National Pension Corporation Busan Building 12F
Yeonsan 2-dong, Yeonje-gu, BUSAN 611705 (e-mail address removed)
KOREA
 

Robert Klemme

3) But BackgrounDRb is not reliable and it fails after some time.

Can you be more specific about what you really mean by this? How does it fail?
So I have tried Starling & Workling, but those workers don't run in parallel.

Maybe you did not use it in the proper way. From what I read on the web
site, doing work concurrently is exactly what S+W is about.

Cheers

robert
 

Jeff Moore

Deepak said:

Please, I need help with the above problem.

Here's my approach to a similar problem. It's still not as polished as I'd
like, but it may be useful.

The core is the PoolQM class (the CircularBuffer class exists to capture
a limited number of exceptions).

=begin

NAME

class CircularBuffer

DESCRIPTION

A lightweight but (hopefully) thread-safe version of the circular buffer.

Designed primarily for intentionally limited in-memory event/error
logging.

URI



INSTALL



HISTORY

0.1

SYNOPSIS

cb = CircularBuffer.new(50)  # create a new CircularBuffer that holds 50 nil elements
cb << 'fnord'                # append an element to the buffer
elements = cb.to_a           # return elements as an array, ordered from oldest to newest
cb.clear                     # force all entries to nil

CAVEATS

The CircularBuffer ignores nil elements and ignores attempts to append
them.

2DOs



By Djief

=end

require 'thread'

class CircularBuffer

  def initialize(max_size)
    @max_size = max_size
    @ra = Array.new(@max_size, nil)
    @head = 0
    @mutex = Mutex.new
  end

  private

  def inc(index)
    (index + 1) % @max_size
  end

  public

  # set all elements to nil
  #
  def clear
    @mutex.synchronize do
      @ra.fill(nil)
    end
  end

  # append a new element to the current 'end'
  #
  def <<(element)
    unless element.nil?
      @mutex.synchronize do
        @ra[@head] = element
        @head = inc(@head)
      end
    end
  end

  # report the buffer's capacity (PoolQM#exceptions_buffer_size relies on this)
  #
  def size
    @max_size
  end

  # return the entire buffer (except nil elements) as an array
  #
  def to_a
    index = @head
    result = []
    @mutex.synchronize do
      @max_size.times do
        result << @ra[index] unless @ra[index].nil?
        index = inc(index)
      end
    end
    result
  end

end

=begin

NAME

class PoolQM

DESCRIPTION

PoolQM extends an Array with MonitorMixin to create a queue with
an associated pool of worker threads that wait to process any requests
that are added to the queue.

A dispatcher thread watches continuously for enqueued requests and
signals idle worker threads (if any) to dequeue and process the
request(s). If no idle workers exist, the request remains in the
queue until one is available.

When a new instance of PoolQM is created, the number of worker
threads is established and the request-processing block is defined:

results = Queue.new
NUM_OF_WORKERS = 10
pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
  results << "Current request: #{request}"  # process request here
end

Note that any output you expect to collect from your worker threads
should be returned via some thread-safe mechanism or container
(Queue is a good default).

Enqueuing a request is all that is necessary to initiate its
processing:

pqm.enq("This is a test, this is only a test")

If a request causes an exception within the processing block, the
Exception is appended to a circular buffer whose contents can be
obtained as an array with the PoolQM#exceptions method.

If you're interested in logging exceptions, you'll have a bit more
work to do, but replacing the CircularBuffer with a Queue that has
its own worker to handle disk IO is probably a good bet.

Performance-wise this approach behaves more consistently than any I've
produced so far, i.e. it's both fast and performs with repeatable
uniformity.

No doubt, there's still room for improvement.


URI



INSTALL



HISTORY

0.1 - genesis
0.2 - documentation and clean-up

SYNOPSIS

require 'thread'

results = Queue.new  # thread-safe container for results! <<<<<<<<<< IMPORTANT

NUM_OF_WORKERS = 10

pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
  results << "Current request: #{request}"  # process request here
end

100.times do |index|
  pqm.enq("Request number #{index}")  # enqueue requests here
end

pqm.wait_until_idle  # wait for all requests to be processed

until results.empty?  # dump results
  p results.pop
end

pqm.exceptions.each do |exception|  # obtain exceptions array and dump it
  p exception
end

CAVEATS



2DOs



By Djief

=end


require 'monitor'

class PoolQM

  # default size for the exceptions CircularBuffer
  #
  DEFAULT_EXCEPTION_BUFFER_SIZE = 10

  # Create a new PoolQM with 'worker_count' worker threads to execute
  # the associated block
  #
  def initialize(worker_count = 1)
    raise 'block required: { |request| ... }' unless block_given?
    @worker_count = worker_count
    @request_q = []
    @request_q.extend(MonitorMixin)
    @request_ready = @request_q.new_cond
    @exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE)
    @worker_count.times do
      Thread.new do
        loop do
          request = nil
          @request_q.synchronize do
            @request_ready.wait
            request = @request_q.delete_at(0)
          end
          begin
            yield request
          rescue Exception => e
            @exceptions << e
          end
          Thread.pass
        end
      end
    end
    @dispatcher = Thread.new do
      loop do
        @request_q.synchronize do
          @request_ready.signal unless @request_q.empty? ||
                                       @request_ready.count_waiters == 0
        end
        Thread.pass
      end
    end
  end

  # enq the request data
  #
  def enq(request)
    @request_q.synchronize do
      @request_q << request
    end
  end

  # Wait until all the queued requests have been removed
  # from the request_q, then wait until all threads have
  # completed their processing and are idle
  #
  def wait_until_idle(wait_resolution = 0.3)
    q_empty = false
    until q_empty
      @request_q.synchronize do
        q_empty = @request_q.empty?
      end
      sleep(wait_resolution) unless q_empty
    end
    all_threads_idle = false
    until all_threads_idle
      @request_q.synchronize do
        all_threads_idle = (@request_ready.count_waiters == @worker_count)
      end
      sleep(wait_resolution) unless all_threads_idle
    end
  end

  # create a new exceptions buffer of new_size
  #
  def exceptions_buffer_size=(new_size)
    @exceptions = CircularBuffer.new(new_size)
  end

  # report the size of the current exceptions buffer
  #
  def exceptions_buffer_size
    @exceptions.size
  end

  # return the current exceptions buffer as an ordered Array
  #
  def exceptions
    @exceptions.to_a
  end

end

if __FILE__ == $0

  # the usual trivial example

  require 'thread'

  # >>>> thread-safe container for results <<<<
  #
  results = Queue.new

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    raise "Dummy Exception during #{request}" if rand(10) == 0  # simulate random exceptions
    results << "Current request: #{request}"  # process request here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")  # enqueue requests here
  end

  # wait for all requests to be processed
  pqm.wait_until_idle

  # dump results
  until results.empty?
    p results.pop
  end

  # obtain exceptions array and dump it
  pqm.exceptions.each do |exception|
    p exception
  end

end


Regards,

djief
 

Deepak Gole


Hello Robert

Can you be more specific about what you really mean by this? How does it fail?

For 1-2 days everything works well, but after some time the workers just
hang. I can see their process IDs are still active, but there is no output.
I examined the logs too, but didn't find anything in the log files; I don't
have a single clue about what went wrong.

Maybe you did not use it in the proper way. From what I read on the web
site, doing work concurrently is exactly what S+W is about.

To check this, I wrote two sample workers that each log some output, and
ran them from the console. When I examined the log files, I got the output
sequentially:

class MyWorker < Workling::Base
  def sample_one(options)
    5.times do |i|
      logger.info "====Hi from 1st worker==============="
    end
  end
end

class MySecondWorker < Workling::Base
  def sample_two(options)
    5.times do |i|
      logger.info "====Hi from 2nd worker==============="
    end
  end
end

I got the following output:

====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============


I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============


Thanks for your help
Deepak
 

Robert Klemme

On Sat, Dec 13, 2008 at 10:22 PM, Robert Klemme

For 1-2 days everything works well, but after some time the workers just hang.
I can see their process IDs are still active, but there is no output.
I examined the logs too, but didn't find anything in the log files; I don't
have a single clue about what went wrong.

Apparently. Since I don't know the code, I cannot really make sense of
what you report. It does seem weird, though, that you apparently keep
your workers active for several days. Do you actually keep them busy, or
do you just keep them around?
I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============

Well, there is no guarantee that messages are actually intermixed as you
expect - especially not with Ruby's green threads - if that's what
Workling is using.
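The effect is easy to reproduce with plain threads; the worker bodies below are just stand-ins for the Workling methods, with a Queue playing the role of the log file. Whether the lines interleave is entirely up to the scheduler - all you can count on is that every line is eventually written.

```ruby
require 'thread'

log = Queue.new  # thread-safe stand-in for the log file

workers = [1, 2].map do |n|
  Thread.new do
    5.times do
      log << "====Hi from worker #{n}==============="
      Thread.pass  # invite a context switch; the scheduler may ignore it
    end
  end
end
workers.each(&:join)

lines = []
lines << log.pop until log.empty?
# All 10 lines arrive, but their relative order is scheduler-dependent.
puts lines.size  # => 10
```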

Cheers

robert
 

ara.t.howard



use bj


http://codeforpeople.rubyforge.org/svn/bj/trunk/README


it was written for engine yard and is under heavy use there. the
focus is on simplicity and robustness.

a @ http://codeforpeople.com/
 

hemant

Do you have a backtrace of any kind? Can you post your worker code?
Which version of BackgrounDRb are you running?
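When a worker hangs silently with nothing in the logs, one way to get a clue is to install a signal handler in the worker's startup code that dumps every thread's backtrace. This is a sketch, assuming a Ruby version where Thread#backtrace is available (1.9+); on 1.8 you would only get the current thread via Kernel#caller.

```ruby
# Install this in the worker's startup code; then, when the process
# hangs, run `kill -USR1 <worker pid>` to see where each thread is stuck.
trap("USR1") do
  Thread.list.each do |t|
    $stderr.puts "Thread #{t.inspect}:"
    (t.backtrace || []).each { |line| $stderr.puts "  #{line}" }
  end
end
```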
 
