How to run background processes (more than one worker) in parallel

Discussion in 'Ruby' started by Deepak Gole, Dec 12, 2008.

  1. Deepak Gole

    Deepak Gole Guest


    Hi

    My requirement is as follows:

    1) I have around 200 feeds in the database that I need to parse (fetch
    contents) *in parallel* at some interval and store the feed items in the
    database.

    2) Right now I am using backgroundrb with 10 workers; each worker is
    assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    data from feeds 1..20, the 2nd from feeds 21..40, etc.).

    3) But backgroundrb is not reliable and it fails after some time. So I
    have tried Starling & Workling, but those workers don't run in parallel.

    (I need to run them in parallel because the number of feeds will grow,
    say to 1000 feeds, so I can't process them sequentially.)

    Please, I need help with the above problem.


    Thanks
    Deepak
    Deepak Gole, Dec 12, 2008
    #1

  2. Chris Lowis

    Chris Lowis Guest

    Chris Lowis, Dec 12, 2008
    #2

  3. saurabh purnaye

    saurabh purnaye Guest

    Hi Deepak,

    You could go for Starfish <http://rufy.com/starfish/doc/> or
    Skynet <http://skynet.rubyforge.org/>, which are based on Google's
    MapReduce algorithm. You can also find more information in Railscasts
    episodes 127 to 129.
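
    If you just want to see the shape of the pattern before committing to a
    framework, a plain thread pool over a work queue already gives you
    parallel fetching. Here is a minimal sketch using only Ruby's standard
    library (fetch_feed and the example URLs are placeholders for your real
    fetch-and-store code):

    require 'thread'

    # placeholders: in your app the URLs come from the database and
    # fetch_feed does the real fetch/parse/store work
    feed_urls = (1..200).map { |i| "http://example.com/feed/#{i}" }

    def fetch_feed(url)
      puts "fetched #{url}"  # stand-in for HTTP fetch + parse + DB insert
    end

    queue = Queue.new  # thread-safe work queue
    feed_urls.each { |url| queue << url }

    # 10 workers pull URLs off the shared queue until it is drained
    workers = Array.new(10) do
      Thread.new do
        loop do
          begin
            url = queue.pop(true)  # non-blocking pop; raises ThreadError when empty
          rescue ThreadError
            break                  # queue drained, this worker is done
          end
          fetch_feed(url)
        end
      end
    end

    workers.each { |t| t.join }  # wait for all workers to finish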

    On Fri, Dec 12, 2008 at 7:28 PM, Deepak Gole <> wrote:

    > Hi
    >
    > My requirement is as follows:
    >
    > 1) I have around 200 feeds in the database that I need to parse (fetch
    > contents) *in parallel* at some interval and store the feed items in the
    > database.
    >
    > 2) Right now I am using backgroundrb with 10 workers; each worker is
    > assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    > data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >
    > 3) But backgroundrb is not reliable and it fails after some time. So I have
    > tried Starling & Workling, but those workers don't run in parallel.
    >
    > (I need to run them in parallel because the number of feeds will grow,
    > say to 1000 feeds, so I can't process them sequentially.)
    >
    > Please, I need help with the above problem.
    >
    >
    > Thanks
    > Deepak
    >




    --
    Thanks and Regards
    Saurabh Purnaye
    +91-9922071155
    skype: sorab_pune
    yahoo & gtalk: saurabh.purnaye
    msn:
    saurabh purnaye, Dec 12, 2008
    #3
  4. Hi Deepak,

    As others mentioned, an adaptation of Google's MapReduce technique
    may be of use. To this end, you could use Ruby's Rinda, an
    implementation of the Linda coordination model. For my needs I wrote
    a small script that puts work descriptions on a tuple space; these
    are taken up by one or more workers in parallel.

    If you write distinct messages that are recognized by the workers, you
    could probably achieve your parallelism in a few lines without extra
    libraries, except perhaps for DRbFire.

    I attach it here (I am a novice Ruby programmer, the code may not
    be optimal) - hope it helps.

    saji

    --queue code

    require 'thread'
    require 'sequel'
    require 'rinda/tuplespace'
    require 'drb'

    ts = Rinda::TupleSpace.new
    DRb.start_service("druby://:3130", ts)
    puts "DRb server running at #{DRb.uri}"

    dbname = "sqlite://testQ.db"
    db = Sequel.connect(dbname)
    pause = 15

    loop do
      # producer: move one queued job from the database onto the tuple space
      th1 = Thread.new do
        job = db[:jobs].filter(:status => "queued").first
        if job
          submit = job.merge(:status => "submitted")
          ts.write [:q1, submit]
          db[:jobs].filter(job).update(submit)
        end
      end
      # collector: take a finished-work tuple and update the database
      th2 = Thread.new do
        result = ts.take [:rq1, nil, nil]
        unless result[1] == nil
          p "processing images"
          p "finished image processing"
          p "update job status in database"
          db[:jobs].filter(result[1]).update(:status => "finished")
        end
      end
      sleep(pause)
    end

    # connect to database
    # create tuplespace
    # thread1
    # - collect from database
    # - put on tuple
    # - update db

    # thread2
    # - check tuple
    # - download data
    # - update db

    --worker code

    require 'drb'
    require 'rinda/rinda'

    DRb.start_service
    ro = DRbObject.new_with_uri('druby://localhost:3130')
    ts = Rinda::TupleSpaceProxy.new(ro)

    def make_mme(job)
      "This will be passed to AFS Server: don't worry yet"
      p job
    end

    job = ts.take([:q1, nil])   # take one job from the tuple space
    msg = make_mme(job[1])      # execute the job
    ts.write [:rq1, job, 0]     # write return status to the tuple space

    DRb.thread.join

    # worker takes job from tuple space (ts.take[:q1,..])
    # executes job (make_mme)
    # writes message on tuple space (ts.write[:rq1,..])
    * Deepak Gole <> [2008-12-12 22:58:58 +0900]:

    > Hi
    >
    > My requirement is as follows:
    >
    > 1) I have around 200 feeds in the database that I need to parse (fetch
    > contents) *in parallel* at some interval and store the feed items in the
    > database.
    >
    > 2) Right now I am using backgroundrb with 10 workers; each worker is
    > assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    > data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >
    > 3) But backgroundrb is not reliable and it fails after some time. So I have
    > tried Starling & Workling, but those workers don't run in parallel.
    >
    > (I need to run them in parallel because the number of feeds will grow,
    > say to 1000 feeds, so I can't process them sequentially.)
    >
    > Please, I need help with the above problem.
    >
    >
    > Thanks
    > Deepak


    --
    Saji N. Hameed

    APEC Climate Center +82 51 668 7470
    National Pension Corporation Busan Building 12F
    Yeonsan 2-dong, Yeonje-gu, BUSAN 611705
    KOREA
    Saji N Hameed, Dec 13, 2008
    #4
  5. On 12.12.2008 14:58, Deepak Gole wrote:

    > My requirement is as follows:
    >
    > 1) I have around 200 feeds in the database that I need to parse (fetch
    > contents) *in parallel* at some interval and store the feed items in the
    > database.
    >
    > 2) Right now I am using backgroundrb with 10 workers; each worker is
    > assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    > data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >
    > 3) But backgroundrb is not reliable and it fails after some time.


    Can you be more specific about what you mean by this? How does it fail?

    > So I have
    > tried Starling & Workling, but those workers don't run in parallel.


    Maybe you did not use it in the proper way. From what I read on the web
    site, doing work concurrently is exactly what S+W is about.
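
    For what it's worth, Starling itself is just a queue server speaking the
    memcached protocol, so the parallelism comes from running several
    consumer processes against the same queue. Roughly like this sketch
    (from memory - the queue name is made up, and 22122 is Starling's
    default port as far as I recall; verify against your setup):

    require 'memcache'

    starling = MemCache.new('localhost:22122')

    # producer side: push feed ids onto a queue
    (1..200).each { |feed_id| starling.set('feeds_to_fetch', feed_id) }

    # consumer side: run this part as N separate processes; each get pops
    # one item off the queue, so the processes share the work
    loop do
      feed_id = starling.get('feeds_to_fetch')
      break if feed_id.nil?
      puts "worker #{Process.pid} fetching feed #{feed_id}"
    end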

    Cheers

    robert
    Robert Klemme, Dec 13, 2008
    #5
  6. Deepak Gole

    Jeff Moore Guest

    Re: How to run background processes (more than one worker) in parallel

    Deepak Gole wrote:
    > Hi
    >
    > Please, I need help with the above problem.
    >
    >
    > Thanks
    > Deepak


    Here's my approach to a similar problem. Still not as polished as I'd
    like, but it may be useful.

    The core is the PoolQM class (the CircularBuffer class exists to catch
    a limited number of exceptions).

    =begin

    NAME

    class CircularBuffer

    DESCRIPTION

    A lightweight but (hopefully) thread-safe version of the circular buffer.

    Designed primarily for intentionally limited in-memory event/error
    logging.

    URI



    INSTALL



    HISTORY

    0.1

    SYNOPSIS

    cb = CircularBuffer.new(50)  # create a new CircularBuffer holding 50 nil elements
    cb << 'fnord'                # append an element to the buffer
    elements = cb.to_a           # return elements as an array, ordered oldest to newest
    cb.clear                     # force all entries to nil

    CAVEATS

    The CircularBuffer ignores nil elements and attempts to append them

    2DOs



    By Djief

    =end

    require 'thread'

    class CircularBuffer

      def initialize(max_size)
        @max_size = max_size
        @ra = Array.new(@max_size, nil)
        @head = 0
        @mutex = Mutex.new
      end

      private

      def inc(index)
        (index + 1) % @max_size
      end

      public

      # the capacity of the buffer (needed by PoolQM#exceptions_buffer_size)
      #
      def size
        @max_size
      end

      # set all elements to nil
      #
      def clear
        @mutex.synchronize do
          @ra.fill(nil)
        end
      end

      # append a new element at the current 'end'
      #
      def <<(element)
        unless element.nil?
          @mutex.synchronize do
            @ra[@head] = element
            @head = inc(@head)
          end
        end
      end

      # return the entire buffer (except nil elements)
      # as an array
      #
      def to_a
        result = []
        @mutex.synchronize do
          index = @head
          @max_size.times do
            result << @ra[index] unless @ra[index].nil?
            index = inc(index)
          end
        end
        result
      end

    end

    =begin

    NAME

    class PoolQM

    DESCRIPTION

    PoolQM extends an Array with MonitorMixin to create a queue with
    an associated pool of worker threads that wait to process any
    requests added to the queue.

    A dispatcher thread watches continuously for enqueued requests and
    signals idle worker threads (if any) to dequeue and process the
    request(s). If no idle workers exist, the request remains in the
    queue until one is available.

    When a new instance of PoolQM is created, the number of worker
    threads is established and the request-processing block is defined:

    results = Queue.new
    NUM_OF_WORKERS = 10
    pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
      results << "Current request: #{request}"  # process request here
    end

    Note that any output you expect to collect from your worker threads
    should be returned via some thread-safe mechanism or container
    (Queue is a good default).

    Enqueuing a request is all that is necessary to initiate its
    processing:

    pqm.enq("This is a test, this is only a test")

    If a request causes an exception within the processing block, the
    Exception is appended to a circular buffer whose contents can be
    obtained as an array with the PoolQM#exceptions method.

    If you're interested in logging exceptions, you'll have a bit more
    work to do, but replacing the CircularBuffer with a Queue that has
    its own worker to handle disk IO is probably a good bet (a sketch
    of that idea appears after the example at the end of this post).

    Performance-wise this approach behaves more consistently than any
    I've produced so far, i.e. it's both fast and performs with
    repeatable uniformity.

    No doubt, there's still room for improvement.


    URI



    INSTALL



    HISTORY

    0.1 - genesis
    0.2 - documentation and clean-up

    SYNOPSIS

    require 'thread'

    results = Queue.new  # thread-safe container for results! <<<<<<<<<< IMPORTANT

    NUM_OF_WORKERS = 10

    pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
      results << "Current request: #{request}"  # process request here
    end

    100.times do |index|
      pqm.enq("Request number #{index}")  # enqueue requests here
    end

    pqm.wait_until_idle  # wait for all requests to be processed

    until results.empty? do  # dump results
      p results.pop
    end

    pqm.exceptions.each do |exception|  # obtain exceptions array and dump it
      p exception
    end

    CAVEATS



    2DOs



    By Djief

    =end


    require 'monitor'

    class PoolQM

      # default size for the exceptions CircularBuffer
      #
      DEFAULT_EXCEPTION_BUFFER_SIZE = 10

      # Create a new PoolQM with 'worker_count' worker threads to execute
      # the associated block
      #
      def initialize(worker_count = 1)
        raise 'block required: { |request| ... }' unless block_given?
        @worker_count = worker_count
        @request_q = []
        @request_q.extend(MonitorMixin)
        @request_ready = @request_q.new_cond
        @exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE)
        @worker_count.times do
          Thread.new do
            loop do
              request = nil
              @request_q.synchronize do
                @request_ready.wait
                request = @request_q.delete_at(0)
              end
              begin
                yield request
              rescue Exception => e
                @exceptions << e
              end
              Thread.pass
            end
          end
        end
        @dispatcher = Thread.new do
          loop do
            @request_q.synchronize do
              @request_ready.signal unless @request_q.empty? ||
                                           @request_ready.count_waiters == 0
            end
            Thread.pass
          end
        end
      end

      # enqueue the request data
      #
      def enq(request)
        @request_q.synchronize do
          @request_q << request
        end
      end

      # Wait until all the queued requests have been removed
      # from the request_q, then wait until all threads have
      # completed their processing and are idle
      #
      def wait_until_idle(wait_resolution = 0.3)
        q_empty = false
        until q_empty
          @request_q.synchronize do
            q_empty = @request_q.empty?
          end
          sleep(wait_resolution) unless q_empty
        end
        all_threads_idle = false
        until all_threads_idle
          @request_q.synchronize do
            all_threads_idle = @request_ready.count_waiters == @worker_count
          end
          sleep(wait_resolution) unless all_threads_idle
        end
      end

      # create a new exceptions buffer of new_size
      #
      def exceptions_buffer_size=(new_size)
        @exceptions = CircularBuffer.new(new_size)
      end

      # report the size of the current exceptions buffer
      #
      def exceptions_buffer_size
        @exceptions.size
      end

      # return the current exceptions buffer as an ordered Array
      #
      def exceptions
        @exceptions.to_a
      end

    end

    if __FILE__ == $0

      # the usual trivial example

      require 'thread'

      # >>>> thread-safe container for results <<<<
      #
      results = Queue.new

      NUM_OF_WORKERS = 10

      pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
        raise "Dummy Exception during #{request}" if rand(10) == 0  # simulate random exceptions
        results << "Current request: #{request}"  # process request here
      end

      100.times do |index|
        pqm.enq("Request number #{index}")  # enqueue requests here
      end

      # wait for all requests to be processed
      pqm.wait_until_idle

      # dump results
      until results.empty? do
        p results.pop
      end

      # obtain exceptions array and dump it
      pqm.exceptions.each do |exception|
        p exception
      end

    end
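
    As for the exception-logging idea mentioned in the notes above, here is
    a minimal sketch: a Queue plus one dedicated worker thread that owns
    the disk IO. An untested outline - the file name and log format are
    placeholder assumptions:

    require 'thread'

    exception_q = Queue.new

    # one dedicated thread owns the log file, so the worker threads never
    # block on disk IO ('exceptions.log' is just a placeholder name)
    log_worker = Thread.new do
      File.open('exceptions.log', 'a') do |f|
        loop do
          e = exception_q.pop            # blocks until something arrives
          break if e == :shutdown        # sentinel tells the logger to stop
          f.puts "#{Time.now}: #{e.class}: #{e.message}"
          f.flush
        end
      end
    end

    # PoolQM's rescue clause would then do exception_q << e instead of
    # appending to the CircularBuffer; for example:
    exception_q << RuntimeError.new("something broke")

    exception_q << :shutdown
    log_worker.join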


    Regards,

    djief
    --
    Posted via http://www.ruby-forum.com/.
    Jeff Moore, Dec 13, 2008
    #6
  7. Deepak Gole

    Deepak Gole Guest


    Hello Robert



    On Sat, Dec 13, 2008 at 10:22 PM, Robert Klemme
    <> wrote:

    > On 12.12.2008 14:58, Deepak Gole wrote:
    >
    >> My requirement is as follows:
    >>
    >> 1) I have around 200 feeds in the database that I need to parse (fetch
    >> contents) *in parallel* at some interval and store the feed items in the
    >> database.
    >>
    >> 2) Right now I am using backgroundrb with 10 workers; each worker is
    >> assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    >> data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >>
    >> 3) But backgroundrb is not reliable and it fails after some time.
    >>

    >
    > Can you be more specific about what you mean by this? How does it fail?
    >


    >>> Well, when I start the backgroundrb processes, everything works well

    for the next day or two. But after some time the workers just hang; I can
    see their process IDs are still active, but there is no output. I examined
    the logs as well but didn't find anything in the log files. I don't have a
    single clue about what went wrong.

    >
    >
    > So I have
    >> tried Starling & Workling, but those workers don't run in parallel.
    >>

    >
    > Maybe you did not use it in the proper way. From what I read on the web
    > site, doing work concurrently is exactly what S+W is about.
    >
    >>> I have created 2 Workling workers; each worker has one method which just

    logs some output, and then I ran them from the console. When I examined the
    log files, I saw the output sequentially:

    class MyWorker < Workling::Base
      def sample_one(options)
        5.times do |i|
          logger.info "====Hi from 1st worker==============="
        end
      end
    end

    class MySecondWorker < Workling::Base
      def sample_two(options)
        5.times do |i|
          logger.info "====Hi from 2nd worker==============="
        end
      end
    end

    I got the following output:

    ====Hi from 1st worker===============
    ====Hi from 1st worker===============
    ====Hi from 1st worker===============
    ====Hi from 1st worker===============
    ====Hi from 1st worker===============
    ====Hi from 2nd worker===============
    ====Hi from 2nd worker===============
    ====Hi from 2nd worker===============
    ====Hi from 2nd worker===============
    ====Hi from 2nd worker===============


    I was expecting something like this:

    ====Hi from 1st worker===============
    ====Hi from 2nd worker===============
    ====Hi from 1st worker===============
    ====Hi from 2nd worker===============

    > ........



    Thanks for your help
    Deepak
    Deepak Gole, Dec 15, 2008
    #7
  8. On 15.12.2008 14:28, Deepak Gole wrote:

    > On Sat, Dec 13, 2008 at 10:22 PM, Robert Klemme
    > <>wrote:


    >> Can you be more specific about what you mean by this? How does it fail?

    >
    >>>> Well, when I start the backgroundrb processes, everything works well

    > for the next day or two. But after some time the workers just hang; I can
    > see their process IDs are still active, but there is no output. I examined
    > the logs as well but didn't find anything in the log files. I don't have a
    > single clue about what went wrong.


    Apparently. Since I don't know the code, I cannot really make sense of
    what you report. It does seem weird, though, that you apparently keep
    your workers active for several days. Do you actually keep them busy, or
    do you just keep them around?

    >> So I have
    >>> tried Starling & Workling, but those workers don't run in parallel.
    >>>

    >> Maybe you did not use it in the proper way. From what I read on the web
    >> site, doing work concurrently is exactly what S+W is about.
    >>
    >>>> I have created 2 Workling workers; each worker has one method which just

    > logs some output, and then I ran them from the console. When I examined the
    > log files, I saw the output sequentially:
    >
    > class MyWorker < Workling::Base
    >   def sample_one(options)
    >     5.times do |i|
    >       logger.info "====Hi from 1st worker==============="
    >     end
    >   end
    > end
    >
    > class MySecondWorker < Workling::Base
    >   def sample_two(options)
    >     5.times do |i|
    >       logger.info "====Hi from 2nd worker==============="
    >     end
    >   end
    > end
    >
    > I got the following output:
    >
    > ====Hi from 1st worker===============
    > ====Hi from 1st worker===============
    > ====Hi from 1st worker===============
    > ====Hi from 1st worker===============
    > ====Hi from 1st worker===============
    > ====Hi from 2nd worker===============
    > ====Hi from 2nd worker===============
    > ====Hi from 2nd worker===============
    > ====Hi from 2nd worker===============
    > ====Hi from 2nd worker===============
    >
    >
    > I was expecting something like this:
    >
    > ====Hi from 1st worker===============
    > ====Hi from 2nd worker===============
    > ====Hi from 1st worker===============
    > ====Hi from 2nd worker===============


    Well, there is no guarantee that messages are actually intermixed as you
    expect - especially not with Ruby's green threads - if that's what
    Workling is using.
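
    A quick way to convince yourself of this with plain Ruby threads (just a
    sketch, nothing Workling-specific; the interleaving you get depends on
    the interpreter and its scheduler):

    require 'thread'

    log = Queue.new  # thread-safe container for the messages

    t1 = Thread.new { 5.times { log << "====Hi from 1st worker===============" } }
    t2 = Thread.new { 5.times { log << "====Hi from 2nd worker===============" } }
    [t1, t2].each { |t| t.join }

    puts log.pop until log.empty?

    # On MRI 1.8's green threads, a tight loop like this often runs to
    # completion before the scheduler switches threads, so you will
    # frequently see all of the 1st worker's lines before any of the 2nd's -
    # the work is concurrent, but the output need not be interleaved.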

    Cheers

    robert
    Robert Klemme, Dec 15, 2008
    #8
  9. Deepak Gole

    ara.t.howard Guest

    On Dec 12, 2008, at 6:58 AM, Deepak Gole wrote:

    > Hi
    >
    > My requirement is as follows:
    >
    > 1) I have around 200 feeds in the database that I need to parse (fetch
    > contents) *in parallel* at some interval and store the feed items in
    > the database.
    >
    > 2) Right now I am using backgroundrb with 10 workers; each worker is
    > assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    > data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >
    > 3) But backgroundrb is not reliable and it fails after some time. So
    > I have tried Starling & Workling, but those workers don't run in
    > parallel.
    >
    > (I need to run them in parallel because the number of feeds will grow,
    > say to 1000 feeds, so I can't process them sequentially.)
    >
    > Please, I need help with the above problem.
    >
    >
    > Thanks
    > Deepak



    use bj


    http://codeforpeople.rubyforge.org/svn/bj/trunk/README


    it was written for engine yard and is under heavy use there. the
    focus is on simplicity and robustness.
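
    for the feed case that might look roughly like this - a sketch from
    memory, so check the README above for the real interface; the
    fetch_feeds.rb runner script is made up:

    # each submission becomes an independent background job, so several
    # submissions can be worked in parallel (fetch_feeds.rb is hypothetical)
    Bj.submit "./script/runner ./lib/fetch_feeds.rb 1..20"
    Bj.submit "./script/runner ./lib/fetch_feeds.rb 21..40"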

    a @ http://codeforpeople.com/
    --
    we can deny everything, except that we have the possibility of being
    better. simply reflect on that.
    h.h. the 14th dalai lama
    ara.t.howard, Dec 16, 2008
    #9
  10. Deepak Gole

    hemant Guest

    On Tue, Dec 16, 2008 at 10:15 PM, ara.t.howard <> wrote:
    >
    > On Dec 12, 2008, at 6:58 AM, Deepak Gole wrote:
    >
    >> Hi
    >>
    >> My requirement is as follows:
    >>
    >> 1) I have around 200 feeds in the database that I need to parse (fetch
    >> contents) *in parallel* at some interval and store the feed items in the
    >> database.
    >>
    >> 2) Right now I am using backgroundrb with 10 workers; each worker is
    >> assigned a job to parse data from 20 feeds (e.g. the 1st worker fetches
    >> data from feeds 1..20, the 2nd from feeds 21..40, etc.).
    >>
    >> 3) But backgroundrb is not reliable and it fails after some time. So I
    >> have tried Starling & Workling, but those workers don't run in parallel.
    >>
    >> (I need to run them in parallel because the number of feeds will grow,
    >> say to 1000 feeds, so I can't process them sequentially.)


    Do you have a backtrace of any kind? Can you post your worker code?
    Which version of BackgrounDRb are you running?
    hemant, Dec 17, 2008
    #10
