Non-blocking communication between Ruby processes

Discussion in 'Ruby' started by Iñaki Baz Castillo, Jan 7, 2010.

  1. Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
    processes. I need the following:

    - When a worker processes an HTTP request it must notify some data to
    another independent Ruby process XXX (different from Unicorn).

    - This communication must be non-blocking, that is, the Unicorn worker
    process sends the notification and doesn't wait for a response from
    process XXX, so the Unicorn worker can immediately generate the HTTP
    response, send it back to the client, and become free to handle new
    HTTP requests.

    - The Ruby process XXX should use some kind of queue system to store
    notifications and handle them. In fact, it should take them
    periodically and send them via TCP (but not HTTP) to another server.


    Which is the best approach to design such communication? Perhaps using
    something like EventMachine for the XXX process and Unix/TCP socket
    communication between the Unicorn processes and the XXX process? Any
    other alternative or suggestion?

    Thanks a lot.

    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #1

  2. On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
    > Hi, I run Unicorn which is a Rack http server using N forked worker processes.
    > I need the following:
    >
    > - When a worker processes a HTTP request it must notify some data to other
    > independent Ruby process XXX (different than Unicorn).
    >
    > - This communication must be non-blocking, this is, the Unicorn worker process
    > sends the notification and doesn't wait for response from the process XXX, so
    > the Unicorn worker can, at the moment, generate the HTTP response and send
    > back to the client, getting free to handle new HTTP requests.
    >
    > - The ruby process XXX should use some kind of queue system to store
    > notifications and handle them. In fact, it should take them periodically and
    > send via TCP (but not HTTP) to other server.
    >
    >
    > Which is the best approach to design such communication? perhaps using
    > something as EventMachine for the XXX process and Unix/TCP socket
    > communication between Unicorn processes and XXX process? any other alternative
    > or suggestion?
    >
    > Thanks a lot.


    I would probably first try a simple setup: make process XXX publish a
    Queue via DRb on a well known port and have one or more threads fetching
    from the queue and processing data. If you fear resource exhaustion,
    you can make the queue size limited. E.g.:

    x.rb server
    c.rb client


    robert@fussel:~$ cat x.rb
    #!/usr/local/bin/ruby19

    require 'thread'
    require 'drb'

    QUEUE_SIZE = 1024
    THREAD_COUNT = 5
    URI="druby://localhost:8787"

    QUEUE = SizedQueue.new QUEUE_SIZE

    threads = (1..THREAD_COUNT).map do
      Thread.new do
        while msg = QUEUE.deq
          p msg
        end
      end
    end

    DRb.start_service(URI, QUEUE)
    DRb.thread.join

    robert@fussel:~$ cat c.rb
    #!/usr/local/bin/ruby19

    require 'drb/drb'
    require 'benchmark'

    SERVER_URI="druby://localhost:8787"

    QUEUE = DRbObject.new_with_uri(SERVER_URI)

    10.times do |i|
      puts Benchmark.times do
        QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
      end
    end
    robert@fussel:~$

    Of course you can as well use a named pipe for the communication. But
    then demarcation of message boundaries might be more difficult etc.
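    For example, one simple way to demarcate messages on a byte stream is
    newline-delimited framing, where each message is one line. A minimal
    sketch using an anonymous pipe (a named pipe behaves the same way):

    ```ruby
    # Newline-delimited framing over a pipe: one message per line.
    r, w = IO.pipe

    %w[alpha beta gamma].each { |msg| w.puts msg }
    w.close

    # The reader recovers message boundaries by splitting on newlines.
    messages = r.each_line.map(&:chomp)
    # messages => ["alpha", "beta", "gamma"]
    ```

    This only works if messages themselves cannot contain newlines;
    otherwise a length-prefix scheme is needed.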

    Kind regards

    robert


    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Jan 7, 2010
    #2

  3. On Thursday, January 7, 2010, Robert Klemme wrote:
    > On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
    > > [...]

    > I would probably first try a simple setup: make process XXX publish a
    > Queue via DRb on a well known port and have one or more threads fetching
    > from the queue and processing data. If you fear resource exhaustion,
    > you can make the queue size limited.
    >
    > [...]
    >
    > Of course you can as well use a named pipe for the communication. But
    > then demarcation of message boundaries might be more difficult etc.


    Really thanks a lot.
    Just a question: is DRb good enough for performance?


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #3
  4. On 01/07/2010 03:07 PM, Iñaki Baz Castillo wrote:
    > On Thursday, January 7, 2010, Robert Klemme wrote:
    >> On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
    >>> [...]

    >> I would probably first try a simple setup: make process XXX publish a
    >> Queue via DRb on a well known port and have one or more threads fetching
    >> from the queue and processing data. If you fear resource exhaustion,
    >> you can make the queue size limited.
    >>
    >> [...]

    >
    > Really thanks a lot.
    > Just a question: is DRb good enough for performance?


    I don't know about your requirements. Just try it out - you can start
    multiple clients and vary the number of threads and the queue size in
    the server at will. To me it seemed pretty fast. I did

    $ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

    and messages came really fast. Also note that each client prints timings
    so you can see how fast it is on your machine.

    If you need more performance then I'm sure you'll find a Ruby binding to
    any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
    start with the simple DRb based solution. It's easily done, you have
    everything you need and do not need to install extra software, not even
    gems.

    I just noticed there was a bug in my code: I used Benchmark.times, which
    prints timings of the current process. What I meant was
    Benchmark.measure. I have changed the code a bit so you can easily
    experiment with queue sizes, thread counts and message counts (see below).

    With this command line

    t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq
    1 $t`; do wait; done; cat cl-*

    I get pretty good timings of 7.6 ms / msg with unlimited queue size and
    the default thread count (5) for this unrealistic test in which the
    queue is hammered.

    Kind regards

    robert

    Modified code:

    robert@fussel:~$ cat x.rb
    #!/usr/local/bin/ruby19

    require 'thread'
    require 'drb'

    THREAD_COUNT = (ARGV.shift || 5).to_i
    QUEUE_SIZE = ARGV.shift

    printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE

    URI="druby://localhost:8787"

    Thread.abort_on_exception = true

    QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new
    # QUEUE.extend DRb::DRbUndumped

    threads = (1..THREAD_COUNT).map do |i|
      Thread.new i do |id|
        while msg = QUEUE.deq
          printf "thread %2d: %p\n", id, msg
        end
      end
    end

    DRb.start_service(URI, QUEUE)
    puts 'Started'
    DRb.thread.join
    puts 'Returned'
    threads.each {|th| th.join rescue nil}
    puts 'Done'

    robert@fussel:~$

    robert@fussel:~$ cat c.rb
    #!/usr/local/bin/ruby19

    require 'drb/drb'
    require 'benchmark'

    SERVER_URI="druby://localhost:8787"

    rep = (ARGV.shift || 20).to_i

    QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)

    QUEUE.enq "Started client"

    Benchmark.bm 20 do |b|
      b.report "client %4d" % $$ do
        rep.times do |i|
          QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
        end
      end
    end

    QUEUE.enq "Stopped client"

    robert@fussel:~$


    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Jan 7, 2010
    #4
  5. On Thursday, January 7, 2010, Robert Klemme wrote:
    > I don't know about your requirements. Just try it out - you can start
    > multiple clients and vary the number of threads and the queue size in
    > the server at will. To me it seemed pretty fast. I did
    >
    > $ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done
    >
    > and messages came really fast. Also note that each client prints timings
    > so you can see how fast it is on your machine.
    >
    > If you need more performance then I'm sure you'll find a Ruby binding to
    > any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
    > start with the simple DRb based solution. It's easily done, you have
    > everything you need and do not need to install extra software, not even
    > gems.


    Thanks a lot. I've tried some code similar to this one:
    http://www.idle-hacking.com/2007/11/iopipe-for-interprocess-communication/

    It uses a pipe file (of course there is no queue at all).

    Well, sending 100000 strings (with a loop), it takes 2-3 seconds to
    receive and print all the received data.
    However, using the DRb solution it just didn't finish (I had to
    interrupt the process after 30 seconds due to CPU usage).

    I'd like a simple solution. Using DRb could be nice. However, using a
    pipe file seems simpler and faster. The doubt I have now is about how
    safe a pipe is. Could it leak memory if some process dies or the reader
    process is not fast enough to handle the received data?





    > I just noticed there was a bug in my code: I used Benchmark.times, which
    > prints timings of the current process. What I meant was
    > Benchmark.measure. I have changed the code a bit so you can easily
    > experiment with queue sizes, thread counts and message counts (see below).
    >
    > With this command line
    >
    > t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq
    > 1 $t`; do wait; done; cat cl-*
    >
    > I get pretty good timings of 7.6 ms / msg with unlimited queue size and
    > the default thread count (5) for this unrealistic test in which the
    > queue is hammered.


    Really thanks a lot, I'll try it.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #5
  6. On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
    > The doubt I have now is about how safe a pipe is.
    > Could it leak memory if some process dies or the reader process is not
    > fast enough to handle the received data?


    Hmmm, I have a reader process and a writer process.
    The writer process writes into the pipe file.
    If I kill the reader process then the writer process keeps writing into
    the pipe and the data is stored (in the filesystem?).

    So there is the leaking problem... I must investigate it a bit more...

    Thanks a lot.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #6
  7. On 07.01.2010 18:58, Iñaki Baz Castillo wrote:

    > Hmmm, I have a reader process and a writer process.
    > The writer process writes into the pipe file.
    > If I kill the reader process then the writer process keeps writing into the
    > pipe and the data is stored (in the filesystem?).
    >
    > So there is the leaking problem... I must investigate it a bit more...


    pipe.write unless pipe.full?

    i.e. check if your pipe hits a set limit on disk, and generate an
    exception if the pipe_file reaches (or is close to reaching) the limit.

    You could then buffer the data to be written until an additional (or
    new) reading thread has started.

    --
    Phillip Gawlowski
    Phillip Gawlowski, Jan 7, 2010
    #7
  8. On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:
    > On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
    >> The doubt I have now is about how safe a pipe is.
    >> Could it leak memory if some process dies or the reader process is not
    >> fast enough to handle the received data?

    >
    > Hummm, I have a reader process and a writer process.


    I thought you have multiple writers. Didn't you mention multiple forked
    Rack handlers?

    > The writer process writes into the pipe file.
    > If I kill the reader process then the writer process keeps writing into the
    > pipe and the data is stored (in the filesystem?).
    >
    > So there is the leaking problem...


    Not exactly: the writer is blocked. You can try this out:

    robert@fussel:~$ mkfifo ff
    robert@fussel:~$ ls -lF ff
    prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
    robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
    ^Z
    [1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
    robert@fussel:~$ wc ff &
    [2] 14036
    robert@fussel:~$ %1
    ruby19 -e 'puts("+"*10_000)' > ff
    robert@fussel:~$ 1 1 10001 ff

    [2]+ Done wc ff
    robert@fussel:~$ jobs
    robert@fussel:~$

    At the point where I pressed Ctrl-Z the writer hung because the pipe was
    full. (The size of a pipe is usually the memory page size of the OS
    IIRC; this would be 4k in the case of 32-bit Linux.)

    > I must investigate it a bit more...


    I'd personally prefer to use the DRb approach because then you can
    actually send typed messages, i.e. whatever information you need. Also,
    it was fun to play around with those small test programs. ;-) And you
    can have the reader run on any machine in the network.

    Whatever you do, you have to decide how to go about the situation when
    the reader goes away - for whatever reason. You could write your
    messages to a file and use an approach like "tail -f" uses to read them.
    But this has the nasty effect of clobbering the file system, plus if
    the reader goes away the file might grow arbitrarily large. And you have
    locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
    preferable IMHO. Then you can still decide in the client what to do if
    you cannot get rid of the message.

    > Thanks a lot.


    You're welcome.

    Kind regards

    robert

    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Jan 7, 2010
    #8
  9. On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
    > On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
    > > The doubt I have now is about how safe a pipe is.
    > > Could it leak memory if some process dies or the reader process is not
    > > fast enough to handle the received data?

    >
    > Hmmm, I have a reader process and a writer process.
    > The writer process writes into the pipe file.
    > If I kill the reader process then the writer process keeps writing into
    > the pipe and the data is stored (in the filesystem?).
    >
    > So there is the leaking problem... I must investigate it a bit more...


    OK, the FIFO keeps working at OS level, so it can receive messages until
    some OS buffer capacity is filled. Then the writer process blocks when
    trying to "flush" the data.
    Fortunately it just blocks at Ruby thread level, so other threads can work.
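    That behaviour is easy to check with a quick sketch: a write that blocks
    on a full pipe only blocks the thread doing the write, while other
    threads in the same process keep running (shown here with an anonymous
    pipe, which behaves like a FIFO):

    ```ruby
    r, w = IO.pipe

    # This thread will block once the pipe's OS buffer is full,
    # since nobody is reading from the other end.
    writer = Thread.new do
      loop { w.write("x" * 4096) }
    end

    # Meanwhile, other threads in the same process keep running.
    other = Thread.new { 1 + 1 }
    result = other.value

    # Clean up the permanently blocked writer.
    writer.kill
    writer.join
    r.close
    w.close
    ```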

    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #9
  10. On Thursday, January 7, 2010, Robert Klemme wrote:
    > On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:
    > > On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
    > >> The doubt I have now is about how safe a pipe is.
    > >> Could it leak memory if some process dies or the reader process is not
    > >> fast enough to handle the received data?

    > >
    > > Hmmm, I have a reader process and a writer process.

    >
    > I thought you have multiple writers. Didn't you mention multiple forked
    > Rack handlers?


    Yes, that's true. Surely I'll get into problems when writing to the FIFO
    from various clients at the same time :)
    But for that I could create as many FIFOs as there are Rack workers...



    > > The writer process writes into the pipe file.
    > > If I kill the reader process then the writer process keeps writing into
    > > the pipe and the data is stored (in the filesystem?).
    > >
    > > So there is the leaking problem...

    >
    > Not exactly: the writer is blocked. You can try this out:
    >
    > [...]
    >
    > At the point where I pressed Ctrl-Z the writer hung because the pipe was
    > full. (The size of a pipe is usually the memory page size of the OS
    > IIRC; this would be 4k in the case of 32-bit Linux.)
    >
    > > I must investigate it a bit more...

    >
    > I'd personally prefer to use the DRb approach because then you can
    > actually send typed messages, i.e. whatever information you need. Also,
    > it was fun to play around with those small test programs. ;-) And you
    > can have the reader run on any machine in the network.
    >
    > Whatever you do, you have to decide how to go about the situation when
    > the reader goes away - for whatever reason.


    It's realtime info, so if the reader dies then it's not so important to
    recover that information when starting again. Well, it would be nice to
    recover it just for 5-10 minutes, but no more.



    > You could write your
    > messages to a file and use an approach like "tail -f" uses to read them.
    > But this has the nasty effect of clobbering the file system, plus if
    > the reader goes away the file might grow arbitrarily large. And you have
    > locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
    > preferable IMHO. Then you can still decide in the client what to do if
    > you cannot get rid of the message.


    Yes, I must think a bit about it :)

    Thanks a lot for your help.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #10
  11. On Thursday, January 7, 2010, Phillip Gawlowski wrote:
    > On 07.01.2010 18:58, Iñaki Baz Castillo wrote:
    > > Hmmm, I have a reader process and a writer process.
    > > The writer process writes into the pipe file.
    > > If I kill the reader process then the writer process keeps writing into
    > > the pipe and the data is stored (in the filesystem?).
    > >
    > > So there is the leaking problem... I must investigate it a bit more...

    >
    > pipe.write unless pipe.full?


    Unfortunately #full? is not a method of File :(
    Note that I'm using a FIFO file (created with "mkfifo file"), so it is not
    "stored" in the filesystem. Instead it's just communication between two
    processes at OS level via the OS's buffers.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #11
  12. On 07.01.2010 19:50, Iñaki Baz Castillo wrote:

    >> pipe.write unless pipe.full?

    >
    > Unfortunately #full? is not a method of File :(


    Well, yes, you'd have to implement the method (or something like it)
    yourself. ;)

    > Note that I'm using a FIFO file (created with "mkfifo file"), so it is not
    > "stored" in the filesystem. Instead it's just communication between two
    > processes at OS level via the OS's buffers.


    Yeah, I gathered that from your other posts. The general point, though,
    still applies: check the pipe's size, and if it grows too large, spin
    off a new reading thread.

    --
    Phillip Gawlowski
    Phillip Gawlowski, Jan 7, 2010
    #12
  13. On 01/07/2010 08:01 PM, Phillip Gawlowski wrote:
    > On 07.01.2010 19:50, Iñaki Baz Castillo wrote:
    >
    >>> pipe.write unless pipe.full?

    >> Unfortunately #full? is not a method of File :(

    >
    > Well, yes, you'd have to implement the method (or something like it)
    > yourself. ;)
    >
    >> Note that I'm using a FIFO file (created with "mkfifo file"), so it is not
    >> "stored" in the filesystem. Instead it's just communication between two
    >> processes at OS level via the OS's buffers.

    >
    > Yeah, I gathered that from your other posts. The general point, though,
    > still applies: check the pipe's size, and if it grows too large, spin
    > off a new reading thread.


    That's something different than you proposed initially, isn't it? This
    approach (increasing the number of readers if the pipe fills too fast)
    is better because it regulates read performance according to load.

    > pipe.write unless pipe.full?
    >
    > i.e. check if your pipe hits a set limit on disk, and generate an
    > exception if the pipe_file reaches (or is close to reaching) the limit.
    >
    > You could then buffer the data to be written until an additional (or
    > new) reading thread has started.


    IMHO this approach (local buffering if the pipe cannot be written to) is
    not really helping because the pipe *is* a buffer already. In other
    words, the same effect will happen - only later. The only argument in
    favor of additional buffering I can see is less lock contention: if
    every writer process has multiple threads that want to write to the
    buffer, they could instead write to a Queue internally and a single
    reader could read from that local queue and write to the global queue.
    That would reduce the number of writers that compete for locks on the
    global queue. Whether that is performant or not would need to be tested.

    Nevertheless I would start with a simple solution, monitor its
    performance and change the implementation if it does not scale well
    enough. Often simple solutions work surprisingly well... :)
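    The local-buffering idea above can be sketched roughly like this:
    writer threads enqueue into a process-local Queue, and a single
    forwarder thread is the only one talking to the shared queue (here the
    shared queue is simulated by a plain Array; in the real setup it would
    be the DRb-published queue):

    ```ruby
    require 'thread'

    LOCAL = Queue.new   # process-local, thread-safe queue
    forwarded = []      # stands in for the global (DRb) queue

    # A single forwarder drains the local queue, so only one thread
    # per process competes for the global queue's lock.
    forwarder = Thread.new do
      while msg = LOCAL.deq
        break if msg == :stop
        forwarded << msg
      end
    end

    # Many writer threads enqueue locally without touching the global queue.
    writers = 4.times.map do |i|
      Thread.new { 10.times { |j| LOCAL << "msg #{i}-#{j}" } }
    end

    writers.each(&:join)
    LOCAL << :stop
    forwarder.join
    # forwarded now holds all 40 messages
    ```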

    Kind regards

    robert

    --
    remember.guy do |as, often| as.you_can - without end
    http://blog.rubybestpractices.com/
    Robert Klemme, Jan 7, 2010
    #13
  14. On 07.01.2010 20:50, Robert Klemme wrote:
    > On 01/07/2010 08:01 PM, Phillip Gawlowski wrote:


    >
    > That's something different than you proposed initially, isn't it? This
    > approach (increasing the number of readers if the pipe fills too fast)
    > is better because it regulates read performance according to load.


    A little refined (in that I skipped the buffering), but it's still the
    same core: check the pipe, and spin off new threads as needed.

    > IMHO this approach (local buffering if the pipe cannot be written to) is
    > not really helping because the pipe *is* a buffer already. In other
    > words, the same effect will happen - only later. The only argument in
    > favor of additional buffering I can see is less lock contention: if
    > every writer process has multiple threads that want to write to the
    > buffer, they could instead write to a Queue internally and a single
    > reader could read from that local queue and write to the global queue.
    > That would reduce the number of writers that compete for locks on the
    > global queue. Whether that is performant or not would need to be tested.


    This might be a difference in interpretation: I see the pipe in this
    instance as a simple inter-process communication solution, not per se a
    buffer.

    Otherwise: You are right.

    Also, performance would have to be tested, and the constraints have
    to be known (Iñaki already mentioned that getting all data is less
    important to him, so buffering wouldn't be strictly necessary, either).

    > Nevertheless I would start with a simple solution, monitor its
    > performance and change the implementation if it does not scale well
    > enough. Often simple solutions work surprisingly well... :)


    Indeed. And it's easier to iterate from something simple, than to
    iterate from something complex, too. ;)

    --
    Phillip Gawlowski
    Phillip Gawlowski, Jan 7, 2010
    #14
  15. On Thursday, January 7, 2010, Robert Klemme wrote:
    > > Yeah, I gathered that from your other posts. The general point, though,
    > > still applies: check the pipe's size, and if it grows too large, spin
    > > off a new reading thread.

    >
    > That's something different than you proposed initially, isn't it? This
    > approach (increasing the number of readers if the pipe fills too fast)
    > is better because it regulates read performance according to load.


    Definitely I have no idea how to know the status of a FIFO (not an IO
    pipe but a FIFO file). The only thing that occurs when it's full (because
    no reader is getting the data) is that the writer's #flush operation gets
    blocked.
    I've found no way to determine how "full" a FIFO file is.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #15
  16. Iñaki Baz Castillo <> wrote:
    > On Thursday, January 7, 2010, Robert Klemme wrote:
    > > > Yeah, I gathered that from your other posts. The general point, though,
    > > > still applies: check the pipe's size, and if it grows too large, spin
    > > > off a new reading thread.

    > >
    > > That's something different than you proposed initially, isn't it? This
    > > approach (increasing the number of readers if the pipe fills too fast)
    > > is better because it regulates read performance according to load.

    >
    > Definitely I have no idea how to know the status of a FIFO (not an IO pipe
    > but a FIFO file). The only thing that occurs when it's full (because no
    > reader is getting the data) is that the writer's #flush operation gets
    > blocked. I've found no way to determine how "full" a FIFO file is.


    FIFOs are pipes; they just have a name on the filesystem.

    In any case, use IO#write_nonblock. Any writes you do will raise
    Errno::EAGAIN if your FIFO/pipe is full.

    See the pipe(7) manpage on a Linux machine, it provides a great overview
    of pipe semantics for blocking/non-blocking operations.
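    A minimal sketch of that suggestion, using an anonymous pipe from
    IO.pipe (which has the same blocking/non-blocking semantics as a FIFO):
    keep writing with IO#write_nonblock and handle the error raised when
    the OS buffer fills, instead of blocking:

    ```ruby
    r, w = IO.pipe   # same write semantics as a mkfifo FIFO

    chunks_written = 0
    begin
      loop do
        # write_nonblock raises instead of blocking once the pipe is full
        w.write_nonblock("x" * 4096)
        chunks_written += 1
      end
    rescue Errno::EAGAIN, IO::WaitWritable
      # Pipe buffer full: decide here whether to drop the message,
      # buffer it locally, or retry later.
    end
    ```

    On Ruby 1.9.2 and later the raised Errno::EAGAIN is also extended with
    IO::WaitWritable, so either rescue clause catches it.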

    --
    Eric Wong
    Eric Wong, Jan 7, 2010
    #16
  17. Iñaki Baz Castillo <> wrote:
    > Hi, I run Unicorn which is a Rack http server using N forked worker processes.
    > I need the following:
    >
    > - When a worker processes a HTTP request it must notify some data to other
    > independent Ruby process XXX (different than Unicorn).
    >
    > - This communication must be non-blocking, this is, the Unicorn worker process
    > sends the notification and doesn't wait for response from the process XXX, so
    > the Unicorn worker can, at the moment, generate the HTTP response and send
    > back to the client, getting free to handle new HTTP requests.


    If stressed enough, everything has to block/reject or run your systems
    out of memory/disk space :)

    > - The ruby process XXX should use some kind of queue system to store
    > notifications and handle them. In fact, it should take them periodically and
    > send via TCP (but not HTTP) to other server.
    >
    >
    > Which is the best approach to design such communication? perhaps using
    > something as EventMachine for the XXX process and Unix/TCP socket
    > communication between Unicorn processes and XXX process? any other alternative
    > or suggestion?


    If you only talk between processes on one machine (since you're trying
    FIFOs), you can also check out the "posix_mq" gem/library I started
    recently:

    http://bogomips.org/ruby_posix_mq/

    It's less portable than FIFOs but if you're running a modern GNU/Linux or
    FreeBSD, it should work. The default queue sizes on Linux are small:
    8192 bytes per message, and 10 messages in the queue. You'll need
    root to increase them.

    But then FIFOs are hard-coded to 65536 bytes total under Linux and a
    4096 byte PIPE_BUF (POSIX only requires a 512 byte PIPE_BUF).
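
    Those capacity limits can be observed directly by filling a pipe with
    non-blocking writes until the kernel refuses (a sketch; the exact number
    is platform-dependent):

    ```ruby
    # Sketch: keep writing non-blocking until the pipe would block,
    # revealing the kernel's pipe capacity (typically 65536 bytes on
    # Linux; other platforms differ).
    reader, writer = IO.pipe
    total = 0
    begin
      loop { total += writer.write_nonblock("x" * 4096) }
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # buffer is full; a blocking write would now stall the writer
    end
    puts "pipe capacity: #{total} bytes"
    ```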

    --
    Eric Wong
    Eric Wong, Jan 7, 2010
    #17
  18. On Thursday, 7 January 2010, Eric Wong wrote:
    > Iñaki Baz Castillo <> wrote:
    > > On Thursday, 7 January 2010, Robert Klemme wrote:
    > > > > Yeah, I gathered that from your other posts. The general point,
    > > > > though, still applies: check the pipe's size, and if it grows too
    > > > > large, spin off a new reading thread.
    > > >
    > > > That's something different than you proposed initially, isn't it? This
    > > > approach (increasing the number of readers if the pipe fills too fast)
    > > > is better because it regulates read performance according to load.
    > >
    > > Definitely I have no idea how to know the status of a FIFO (not an IO
    > > pipe but a FIFO file). The only thing that happens when it's full
    > > (because no reader is getting the data) is that the writer's #flush
    > > operation blocks. I've found no way to determine how "full" a FIFO
    > > file is.
    >
    > FIFOs are pipes; they just have a name on the filesystem.
    >
    > In any case, use IO#write_nonblock. Any writes you do will raise
    > Errno::EAGAIN if your FIFO/pipe is full.
    >
    > See the pipe(7) manpage on a Linux machine, it provides a great overview
    > of pipe semantics for blocking/non-blocking operations.


    Thanks, I'll do that.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #18
  19. On Thursday, 7 January 2010, Eric Wong wrote:
    > If you only talk between processes on one machine (since you're trying
    > FIFOs), you can also check out the "posix_mq" gem/library I started
    > recently:
    >
    > http://bogomips.org/ruby_posix_mq/


    Really interesting. Is it safe to have various processes (Unicorn workers)
    writing to a single posix_mq? Or will the data be "mixed"? Is there any way
    to perform an atomic write operation on this queue?

    Thanks.


    --
    Iñaki Baz Castillo <>
    Iñaki Baz Castillo, Jan 7, 2010
    #19
  20. Eric Wong Guest

    Iñaki Baz Castillo <> wrote:
    > El Jueves, 7 de Enero de 2010, Eric Wong escribió:
    > > If you only talk between processes on one machine (since you're trying
    > > FIFOs), you can also check out the "posix_mq" gem/library I started
    > > recently:
    > >
    > > http://bogomips.org/ruby_posix_mq/

    >
    > Really interesting. Is it safe to have various processes (Unicorn workers)
    > writing to a single posix_mq? Or will the data be "mixed"? Is there any way
    > to perform an atomic write operation on this queue?


    These queues are completely atomic at the message level and descriptors
    can be safely shared between processes/threads. SysV message queues
    weren't thread-safe, but POSIX ones are.
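
    The posix_mq gem may not be installed everywhere, but the same
    message-level atomicity can be seen with plain pipes too: POSIX guarantees
    that writes of at most PIPE_BUF bytes are atomic, so fixed-size messages
    from concurrent writers never interleave. A sketch (requires fork, so
    POSIX systems only; sizes are illustrative):

    ```ruby
    # Sketch: several forked writers share one pipe. Each write is 512
    # bytes (POSIX guarantees PIPE_BUF >= 512), so each message arrives
    # intact, never mixed with another writer's bytes.
    MSG_SIZE = 512
    reader, writer = IO.pipe

    pids = 3.times.map do |i|
      fork do
        reader.close
        10.times { writer.write(i.to_s * MSG_SIZE) } # each write is atomic
        exit!
      end
    end
    writer.close
    pids.each { |pid| Process.wait(pid) }

    messages = []
    while (msg = reader.read(MSG_SIZE))
      messages << msg
    end
    # each message is a single writer's byte repeated, never a mixture
    puts messages.all? { |m| m.chars.uniq.size == 1 } # => true
    ```

    With messages larger than PIPE_BUF that guarantee disappears, which is one
    reason a real message queue (posix_mq) is the safer choice for multiple
    Unicorn workers.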

    --
    Eric Wong
    Eric Wong, Jan 7, 2010
    #20
