Non-blocking communication between Ruby processes

  • Thread starter Iñaki Baz Castillo

Iñaki Baz Castillo

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
I need the following:

- When a worker processes an HTTP request it must notify some data to another,
independent Ruby process XXX (different from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker process
sends the notification and doesn't wait for a response from process XXX, so
the Unicorn worker can immediately generate the HTTP response, send it back
to the client, and become free to handle new HTTP requests.

- The Ruby process XXX should use some kind of queue system to store the
notifications and handle them. In fact, it should take them periodically and
send them via TCP (but not HTTP) to another server.


What is the best approach to design such communication? Perhaps something
like EventMachine for the XXX process, with Unix/TCP socket communication
between the Unicorn processes and the XXX process? Any other alternative
or suggestion?

Thanks a lot.

--
Iñaki Baz Castillo <[email protected]>
 

Robert Klemme

> Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
> I need the following:
>
> - When a worker processes an HTTP request it must notify some data to another,
> independent Ruby process XXX (different from Unicorn).
>
> - This communication must be non-blocking, that is, the Unicorn worker process
> sends the notification and doesn't wait for a response from process XXX, so
> the Unicorn worker can immediately generate the HTTP response, send it back
> to the client, and become free to handle new HTTP requests.
>
> - The Ruby process XXX should use some kind of queue system to store the
> notifications and handle them. In fact, it should take them periodically and
> send them via TCP (but not HTTP) to another server.
>
> What is the best approach to design such communication? Perhaps something
> like EventMachine for the XXX process, with Unix/TCP socket communication
> between the Unicorn processes and the XXX process? Any other alternative
> or suggestion?
>
> Thanks a lot.

I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetching
from the queue and processing the data. If you fear resource exhaustion,
you can make the queue size limited. E.g.:

x.rb server
c.rb client


robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
  Thread.new do
    while msg = QUEUE.deq
      p msg
    end
  end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
  puts Benchmark.times do
    QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
  end
end
robert@fussel:~$

Of course, you could just as well use a named pipe for the communication. But
then demarcating message boundaries might be more difficult, etc.

Kind regards

robert
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Robert Klemme wrote:

> I would probably first try a simple setup: make process XXX publish a
> Queue via DRb on a well-known port and have one or more threads fetching
> from the queue and processing the data. If you fear resource exhaustion,
> you can make the queue size limited. [...]
>
> Of course, you could just as well use a named pipe for the communication.
> But then demarcating message boundaries might be more difficult, etc.

Really, thanks a lot.
Just one question: is DRb good enough performance-wise?


--
Iñaki Baz Castillo <[email protected]>
 

Robert Klemme

> Really, thanks a lot.
> Just one question: is DRb good enough performance-wise?

I don't know about your requirements. Just try it out - you can start
multiple clients and vary the number of threads and the queue size in
the server at will. To me it seemed pretty fast. I did

$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

and the messages came really fast. Also note that each client prints timings,
so you can see how fast it is on your machine.

If you need more performance then I'm sure you'll find a Ruby binding to
any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
start with the simple DRb-based solution. It's easily done, you have
everything you need, and you do not need to install extra software, not
even gems.

I just noticed there was a bug in my code: I used Benchmark.times, which
prints the timings of the current process. What I meant was
Benchmark.measure. I have changed the code a bit so you can easily
experiment with queue sizes, thread counts and message counts (see below).

With this command line

t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq
1 $t`; do wait; done; cat cl-*

I get pretty good timings of 7.6 ms/msg with an unlimited queue size and
the default thread count (5) for this unrealistic test in which the queue
is hammered.

Kind regards

robert

Modified code:

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

THREAD_COUNT = (ARGV.shift || 5).to_i
QUEUE_SIZE = ARGV.shift

printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE

URI="druby://localhost:8787"

Thread.abort_on_exception = true

QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new
# QUEUE.extend DRb::DRbUndumped

threads = (1..THREAD_COUNT).map do |i|
  Thread.new i do |id|
    while msg = QUEUE.deq
      printf "thread %2d: %p\n", id, msg
    end
  end
end

DRb.start_service(URI, QUEUE)
puts 'Started'
DRb.thread.join
puts 'Returned'
threads.each {|th| th.join rescue nil}
puts 'Done'

robert@fussel:~$

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

rep = (ARGV.shift || 20).to_i

QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)

QUEUE.enq "Started client"

Benchmark.bm 20 do |b|
  b.report "client %4d" % $$ do
    rep.times do |i|
      QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
    end
  end
end

QUEUE.enq "Stopped client"

robert@fussel:~$
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Robert Klemme wrote:

> I don't know about your requirements. Just try it out - you can start
> multiple clients and vary the number of threads and the queue size in
> the server at will. To me it seemed pretty fast. [...]
>
> If you need more performance then I'm sure you'll find a Ruby binding to
> any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
> start with the simple DRb-based solution. It's easily done, you have
> everything you need, and you do not need to install extra software, not
> even gems.

Thanks a lot. I've tried code similar to this one:
http://www.idle-hacking.com/2007/11/iopipe-for-interprocess-communication/

It uses a pipe file (of course there is no queue at all).

Well, sending 100000 strings (in a loop), it takes 2-3 seconds to receive and
print all the received data.
However, using the DRb solution it just didn't finish (I had to interrupt the
process after 30 seconds due to CPU usage).

I'd like a simple solution. Using DRb could be nice. However, using a pipe
file seems simpler and faster. The doubt I have now is about how safe a pipe
is. Could it leak memory if some process dies or the reader process is not
fast enough to handle the received data?




> I just noticed there was a bug in my code: I used Benchmark.times, which
> prints the timings of the current process. What I meant was
> Benchmark.measure. I have changed the code a bit so you can easily
> experiment with queue sizes, thread counts and message counts (see below).
>
> With this command line
>
> t=10; for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i" & done; for i in `seq
> 1 $t`; do wait; done; cat cl-*
>
> I get pretty good timings of 7.6 ms/msg with an unlimited queue size and
> the default thread count (5) for this unrealistic test in which the queue
> is hammered.
=20
Really thanks a lot, I'll try it.



--
Iñaki Baz Castillo <[email protected]>
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:

> The doubt I have now is about how safe a pipe is.
> Could it leak memory if some process dies or the reader process is not
> fast enough to handle the received data?

Hmmm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into the
pipe and the data is stored (in the filesystem?).

So there is the leaking problem... I must investigate it a bit more...

Thanks a lot.


--
Iñaki Baz Castillo <[email protected]>
 

Phillip Gawlowski

> Hmmm, I have a reader process and a writer process.
> The writer process writes into the pipe file.
> If I kill the reader process then the writer process keeps writing into the
> pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem... I must investigate it a bit more...

pipe.write unless pipe.full?

i.e. check whether your pipe has hit a set limit, and raise an
exception if the pipe file reaches (or is close to reaching) that limit.

You could then buffer the data to be written until an additional (or
new) reading thread has started.
 

Robert Klemme

On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:

> Hmmm, I have a reader process and a writer process.

I thought you had multiple writers. Didn't you mention multiple forked
Rack handlers?

> The writer process writes into the pipe file.
> If I kill the reader process then the writer process keeps writing into the
> pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem...

Not exactly: the writer is blocked. You can try this out:

robert@fussel:~$ mkfifo ff
robert@fussel:~$ ls -lF ff
prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
^Z
[1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ wc ff &
[2] 14036
robert@fussel:~$ %1
ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ 1 1 10001 ff

[2]+ Done wc ff
robert@fussel:~$ jobs
robert@fussel:~$

At the point where I pressed Ctrl-Z the writer hung because the pipe was
full. (The size of a pipe is usually the memory page size of the OS,
IIRC; this would be 4k in the case of 32-bit Linux.)
> I must investigate it a bit more...

I'd personally prefer to use the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. ;-) And you
can have the reader run on any machine in the network.

Whatever you do, you have to decide how to handle the situation when
the reader goes away - for whatever reason. You could write your
messages to a file and use an approach like "tail -f" uses to read them.
But this has the nasty effect of clobbering the file system, plus if
the reader goes away the file might grow arbitrarily large. And you have
locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
preferable IMHO. Then you can still decide in the client what to do if
you cannot get rid of the message.

> Thanks a lot.

You're welcome.

Kind regards

robert
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:

> Hmmm, I have a reader process and a writer process.
> The writer process writes into the pipe file.
> If I kill the reader process then the writer process keeps writing into
> the pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem... I must investigate it a bit more...

OK, the FIFO keeps working at the OS level, so it can receive messages until
some OS buffer capacity is filled. Then the writer process blocks when trying
to "flush" the data.
Fortunately it just blocks at the Ruby thread level, so other threads can
keep working.

--
Iñaki Baz Castillo <[email protected]>
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Robert Klemme wrote:

> I thought you had multiple writers. Didn't you mention multiple forked
> Rack handlers?

Yes, that's true. Surely I'll run into problems when writing to the FIFO from
various clients at the same time :)
But for that I could create as many FIFOs as there are Rack workers...


> > The writer process writes into the pipe file.
> > If I kill the reader process then the writer process keeps writing into
> > the pipe and the data is stored (in the filesystem?).
> >
> > So there is the leaking problem...
>
> Not exactly: the writer is blocked. You can try this out:
> [...]
> At the point where I pressed Ctrl-Z the writer hung because the pipe was
> full. (The size of a pipe is usually the memory page size of the OS,
> IIRC; this would be 4k in the case of 32-bit Linux.)
>
> I'd personally prefer to use the DRb approach because then you can
> actually send typed messages, i.e. whatever information you need. Also,
> it was fun to play around with those small test programs. ;-) And you
> can have the reader run on any machine in the network.
>
> Whatever you do, you have to decide how to handle the situation when
> the reader goes away - for whatever reason.

It's realtime info, so if the reader dies it's not so important to recover
that information when starting again. Well, it would be nice to recover it
for just 5-10 minutes, but no more.


> You could write your messages to a file and use an approach like "tail -f"
> uses to read them. But this has the nasty effect of clobbering the file
> system, plus if the reader goes away the file might grow arbitrarily large.
> And you have locking issues. Using an in-memory pipe (e.g. mkfifo or via
> DRb) is preferable IMHO. Then you can still decide in the client what to do
> if you cannot get rid of the message.

Yes, I must think a bit about it :)

Thanks a lot for your help.


--
Iñaki Baz Castillo <[email protected]>
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Phillip Gawlowski wrote:

> pipe.write unless pipe.full?

Unfortunately #full? is not a method of File :(
Note that I'm using a FIFO file (created with "mkfifo file"), so it is not
"stored" in the filesystem. Instead it's just communication between two
processes at the OS level via the OS's buffers.


--
Iñaki Baz Castillo <[email protected]>
 

Phillip Gawlowski

> Unfortunately #full? is not a method of File :(

Well, yes, you'd have to implement the method (or something like it)
yourself. ;)

> Note that I'm using a FIFO file (created with "mkfifo file"), so it is not
> "stored" in the filesystem. Instead it's just communication between two
> processes at the OS level via the OS's buffers.

Yeah, I gathered that from your other posts. The general point, though,
still applies: check the pipe's size, and if it grows too large, spin
off a new reading thread.
 

Robert Klemme

> Well, yes, you'd have to implement the method (or something like it)
> yourself. ;)
>
> Yeah, I gathered that from your other posts. The general point, though,
> still applies: check the pipe's size, and if it grows too large, spin
> off a new reading thread.

That's something different from what you proposed initially, isn't it? This
approach (increasing the number of readers if the pipe fills too fast)
is better because it regulates read performance according to load.

> pipe.write unless pipe.full?
>
> i.e. check whether your pipe has hit a set limit, and raise an
> exception if the pipe file reaches (or is close to reaching) that limit.
>
> You could then buffer the data to be written until an additional (or
> new) reading thread has started.

IMHO this approach (local buffering if the pipe cannot be written to) is
not really helping because the pipe *is* a buffer already. In other
words, the same effect will happen - only later. The only argument in
favor of additional buffering I can see is less lock contention: if
every writer process has multiple threads that want to write to the
buffer, they could instead write to a Queue internally and a single
reader could read from that local queue and write to the global queue.
That would reduce the number of writers that compete for locks on the
global queue. Whether that is performant or not would need to be tested.

Nevertheless I would start with a simple solution, monitor its
performance and change the implementation if it does not scale well
enough. Often simple solutions work surprisingly well... :)

Kind regards

robert
 

Phillip Gawlowski

On 01/07/2010 08:01 PM, Robert Klemme wrote:

> That's something different from what you proposed initially, isn't it? This
> approach (increasing the number of readers if the pipe fills too fast)
> is better because it regulates read performance according to load.

A little refined (in that I skipped the buffering), but it's still the
same core: check the pipe, and spin off new threads as needed.
> IMHO this approach (local buffering if the pipe cannot be written to) is
> not really helping because the pipe *is* a buffer already. In other
> words, the same effect will happen - only later. The only argument in
> favor of additional buffering I can see is less lock contention: if
> every writer process has multiple threads that want to write to the
> buffer, they could instead write to a Queue internally and a single
> reader could read from that local queue and write to the global queue.
> That would reduce the number of writers that compete for locks on the
> global queue. Whether that is performant or not would need to be tested.

This might be a difference in interpretation: I see the pipe in this
instance as a simple inter-process communication solution, not per se a
buffer.

Otherwise: You are right.

Also in that performance would have to be tested, and the constraints have
to be known (Iñaki already mentioned that getting all the data is less
important to him, so buffering wouldn't be strictly necessary, either).
> Nevertheless I would start with a simple solution, monitor its
> performance and change the implementation if it does not scale well
> enough. Often simple solutions work surprisingly well... :)

Indeed. And it's easier to iterate from something simple, than to
iterate from something complex, too. ;)
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Robert Klemme wrote:

> That's something different from what you proposed initially, isn't it? This
> approach (increasing the number of readers if the pipe fills too fast)
> is better because it regulates read performance according to load.

Definitely, I have no idea how to check the status of a FIFO (not an IO pipe
but a FIFO file). The only thing that happens when it's full (because no
reader is getting the data) is that the writer's #flush operation blocks.
I've found no way to determine how "full" a FIFO file is.


--
Iñaki Baz Castillo <[email protected]>
 

Eric Wong

Iñaki Baz Castillo said:

> Definitely, I have no idea how to check the status of a FIFO (not an IO
> pipe but a FIFO file). The only thing that happens when it's full (because
> no reader is getting the data) is that the writer's #flush operation
> blocks. I've found no way to determine how "full" a FIFO file is.

FIFOs are pipes; they just have a name on the filesystem.

In any case, use IO#write_nonblock. Any writes you do will raise
Errno::EAGAIN if your FIFO/pipe is full.

See the pipe(7) manpage on a Linux machine, it provides a great overview
of pipe semantics for blocking/non-blocking operations.
 

Eric Wong

Iñaki Baz Castillo said:

> Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
> processes. I need the following:
>
> - When a worker processes an HTTP request it must notify some data to
> another, independent Ruby process XXX (different from Unicorn).
>
> - This communication must be non-blocking, that is, the Unicorn worker
> process sends the notification and doesn't wait for a response from process
> XXX, so the Unicorn worker can immediately generate the HTTP response, send
> it back to the client, and become free to handle new HTTP requests.

If stressed enough, everything has to block/reject or run your systems
out of memory/disk space :)
> - The Ruby process XXX should use some kind of queue system to store the
> notifications and handle them. In fact, it should take them periodically
> and send them via TCP (but not HTTP) to another server.
>
> What is the best approach to design such communication? Perhaps something
> like EventMachine for the XXX process, with Unix/TCP socket communication
> between the Unicorn processes and the XXX process? Any other alternative
> or suggestion?

If you only talk between processes on one machine (since you're trying
FIFOs), you can also check out the "posix_mq" gem/library I started
recently:

http://bogomips.org/ruby_posix_mq/

It's less portable than FIFOs but if you're running a modern GNU/Linux or
FreeBSD, it should work. The default queue sizes on Linux are small:
8192 bytes per message, and 10 messages in the queue. You'll need
root to increase them.

But then FIFOs are hard-coded to 65536 bytes total under Linux and a
4096 byte PIPE_BUF (POSIX only requires a 512 byte PIPE_BUF).
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Eric Wong wrote:

> FIFOs are pipes; they just have a name on the filesystem.
>
> In any case, use IO#write_nonblock. Any writes you do will raise
> Errno::EAGAIN if your FIFO/pipe is full.
>
> See the pipe(7) manpage on a Linux machine, it provides a great overview
> of pipe semantics for blocking/non-blocking operations.

Thanks, I'll do that.


--
Iñaki Baz Castillo <[email protected]>
 

Iñaki Baz Castillo

On Thursday, 7 January 2010, Eric Wong wrote:

> If you only talk between processes on one machine (since you're trying
> FIFOs), you can also check out the "posix_mq" gem/library I started
> recently:
>
> http://bogomips.org/ruby_posix_mq/

Really interesting. Is it safe to have various processes (Unicorn workers)
writing to a single posix_mq? Or will the data get "mixed"? Is there any way
to perform an "atomic" write operation on this queue?

Thanks.


--
Iñaki Baz Castillo <[email protected]>
 

Eric Wong

Iñaki Baz Castillo said:

> Really interesting. Is it safe to have various processes (Unicorn workers)
> writing to a single posix_mq? Or will the data get "mixed"? Is there any
> way to perform an "atomic" write operation on this queue?

These queues are completely atomic at the message level and descriptors
can be safely shared between processes/threads. SysV message queues
weren't thread-safe, but POSIX ones are.
 
