Shell pipeline in Ruby?

M

Michal Suchanek

Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

in Ruby?

Specifically, I would like to see the PID, return value and stderr of
each of these commands but I would like cmdB to read the stdout of
cmdA directly, on its stdin, and similarily for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (eg. it
can read /dev/null for all I care) but in general gluing arbitrary fd
to its stdin might be desirable in other cases.

Thanks

Michal
 
J

Jos Backus

[Note: parts of this message were removed to make it a legal post.]

Have you checked out Open3.pipeline?

Jos
 
7

7stud --

Michal Suchanek wrote in post #998275:
Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

How about:

puts `cmdA | cmdB | cmdC`

or

puts %x{cmdA | cmdB | cmdC}
 
M

Michal Suchanek

Have you checked out Open3.pipeline?

I have checked all the stuff I could find once but did not find a solution.

That's why I am asking, perhaps I overlooked something.

While the pipeline gives the status of all the involved processes and
connects them properly which is more than I ever got from Ruby it is
not obvious how to capture the output - the stdout of the last
process can be captured with pipeline_r but the stderrs get sinked
somewhere in the pipeline_run.

Also the in and out options used in the example are not documented afaict.

Thanks

Michal
 
M

Michal Suchanek

There is also popen4 which I believe also gives you the pid with std -
in/out/err

It provides stderr but not the pipeline which cannot be constructed in
pure ruby (unless a popen taking fd:s for stdio is implemented).

Thanks

Michal
 
M

Michal Suchanek

In the spirit of the recent call for better ruby documentation, I wrote

Unfortunately, this does not work.

First, these pipelines don't always work as one would expect so
testing is *required* before posting a "solution".

Second, afaict the reopen method only changes the IOs at Ruby level,
not the fd:s at C level so it's useless in this context.

Thanks

Michal
 
R

Robert Klemme

how do you write an equivalent of

$ cmdA | cmdB | cmdC

in Ruby?

Specifically, I would like to see the PID, return value and stderr of
each of these commands but I would like cmdB to read the stdout of
cmdA directly, on its stdin, and similarily for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (eg. it
can read /dev/null for all I care) but in general gluing arbitrary fd
to its stdin might be desirable in other cases.

$ ri Open3.pipeline_r Open3.pipeline_start

Cheers

robert
 
M

Michal Suchanek

$ ri Open3.pipeline_r Open3.pipeline_start
Again, like in Open3.pipeline the :err argument used in the example is
not documented.

From the description of the example it seems it captures the stderr of
all the commands together.

Are there more arguments like that?

What do they mean?

Also this is available in ruby 1.9 only.

Thanks

Michal
 
R

Roger Pack

Unfortunately, this does not work.

Try it now, fixed the bug.
Second, afaict the reopen method only changes the IOs at Ruby level,
not the fd:s at C level so it's useless in this context.

Try it out. This is, AFAIK, the "standard" way to pipeline processes,
and works in 1.8 even.

GL!
-r
 
R

Roger Pack

Roger Pack wrote in post #998434:
In the spirit of the recent call for better ruby documentation, I wrote
a
writeup:
http://en.wikibooks.org/w/index.php...ultiple_Processes&stable=0#Chaining_processes


Unfortunately it appears the wiki formatting is mangled in firefox, but
works well in chrome :)

Here's the complete code snippet:


pipe_me_in, pipe_peer_out = IO.pipe
pipe_peer_in, pipe_me_out = IO.pipe

fork do
STDIN.reopen(pipe_peer_in)
STDOUT.reopen(pipe_peer_out)
Kernel.exec("echo 33")
# this line is never executed because exec moves the process
end
pipe_peer_out.close # file handles have to all be closed in order for
the "read" method, below, to be able to know
# that it's done reading data, so it can return. See also
http://devver.wordpress.com/2009/10/22/beware-of-pipe-duplication-in-subprocesses/
pipe_me_in.read



There is a link in there to a blog describing ruby's shell class,
perhaps you did not see it?

-r
 
R

Robert Klemme

Again, like in Open3.pipeline the :err argument used in the example is
not documented.

Not true. From ri Open3.pipeline_r in 1.9.2p180:

Each cmd is a string or an array. If it is an array, the elements are passed
to Process.spawn.

cmd:
commandline command line string which
is passed to a shell
[env, commandline, opts] command line string which
is passed to a shell
[env, cmdname, arg1, ..., opts] command name and one or
more arguments (no shell)
[env, [cmdname, argv0], arg1, ..., opts] command name and
arguments including argv[0] (no shell)

Note that env and opts are optional, as Process.spawn.

Then

$ ri Process.spawn
From the description of the example it seems it captures the stderr of
all the commands together.

Are there more arguments like that?

What do they mean?

See above.
Also this is available in ruby 1.9 only.

Well?

Cheers

robert
 
M

Michal Suchanek

Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

in Ruby?

Specifically, I would like to see the PID, return value and stderr of
each of these commands but I would like cmdB to read the stdout of
cmdA directly, on its stdin, and similarily for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (eg. it
can read /dev/null for all I care) but in general gluing arbitrary fd
to its stdin might be desirable in other cases.

Thanks

Michal

OK, thanks for all the replies.

It seems this is not doable with ruby 1.8 but spawn and similar
methods in 1.9 grew the named arguments which include the file
descriptors.

Here is a working (although not polished) solution for ruby 1.9.

First a short testing script called cmd:

#!/usr/bin/ruby

prefix = ARGV[0] + ":"
append = ARGV[1]

while l = STDIN.gets do
STDOUT << l
STDERR << prefix << l
end

STDOUT.puts append

And the long script with pipes that calls the above:

#!/usr/bin/ruby1.9.1
# Debian calls ruby 1.9 ruby1.9.1

pipes = []
(1..4).each {pipes << IO.pipe}
errs = []
(1..3).each {errs << IO.pipe}

writer = pipes[0][1]
reader = pipes[3][0]
i = 1

# Careful about pipe direction, it only works one way.
pids = [
spawn("cmd", "a", "aa", :in => pipes[0][0], :eek:ut => pipes[1][1], :err
=> errs[0][1] ),
spawn("cmd", "b", "bb", :in => pipes[1][0], :eek:ut => pipes[2][1], :err
=> errs[1][1] ),
spawn("cmd", "c", "cc", :in => pipes[2][0], :eek:ut => pipes[3][1], :err
=> errs[2][1] ),
]

# We don't really want to see these pipes, perhaps some
Open3.pipeline_* method would create them out of sight
pipes.flatten.select{|p| p != writer && p != reader}.each{|p|p.close}\

maxlen = 128 # some arbitrary size for chunk of data read at once

outputs = errs.collect{|r,w|r}
outputs << reader
wsel = [writer]

while true do
ios = IO.select(outputs, wsel, nil, 1)
STDERR << '.'

string = "%4i\n" % i
if writer then
begin
result = writer.write_nonblock(string)
i = i + 1
if ( i > 1000 ) then
writer.close
writer = nil
wsel = nil
end
rescue IO::WaitWritable, Errno::EINTR
end
end

outputs.each{|io|
begin
result = io.read_nonblock(maxlen)
STDOUT << result
rescue IO::WaitReadable, Errno::EINTR, EOFError
if $!.is_a? EOFError then
STDERR.puts "EOF"
outputs.reject!{|o| o == io }
end
end
}

(0...3).each{|i| p = pids
if p and Process.waitpid(p, Process::WNOHANG) then
STDOUT.puts $?.inspect
pids = nil
end
}
break if !ios && pids.select{|pid| pid} == []
end
 
M

Michal Suchanek

Not true. =C2=A0From ri Open3.pipeline_r in 1.9.2p180:

Each cmd is a string or an array. If it is an array, the elements are pas= sed
to Process.spawn.

=C2=A0cmd:
=C2=A0 =C2=A0commandline =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0command line string=
which
is passed to a shell
=C2=A0 =C2=A0[env, commandline, opts] =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =
=C2=A0 =C2=A0 =C2=A0 command line string which
is passed to a shell
=C2=A0 =C2=A0[env, cmdname, arg1, ..., opts] =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0command name and one or
more arguments (no shell)
=C2=A0 =C2=A0[env, [cmdname, argv0], arg1, ..., opts] command name and
arguments including argv[0] (no shell)

=C2=A0Note that env and opts are optional, as Process.spawn.

Then

$ ri Process.spawn

It's not quite clear that Process.spawn description also applies to
options outside the commands and how it applies but maybe it makes
sense, they would be applied to the whole pipeline.

Thanks

Michal
 
M

Michal Suchanek

Roger Pack wrote in post #998434:
iple_Processes&stable=3D0#Chaining_processes


Unfortunately it appears the wiki formatting is mangled in firefox, but
works well in chrome :)

Here's the complete code snippet:


pipe_me_in, pipe_peer_out =3D IO.pipe
pipe_peer_in, pipe_me_out =3D IO.pipe

fork do
=C2=A0STDIN.reopen(pipe_peer_in)
=C2=A0STDOUT.reopen(pipe_peer_out)
=C2=A0Kernel.exec("echo 33")
=C2=A0# this line is never executed because exec moves the process
end
pipe_peer_out.close # file handles have to all be closed in order for
the "read" method, below, to be able to know
# that it's done reading data, so it can return. =C2=A0See also
http://devver.wordpress.com/2009/10/22/beware-of-pipe-duplication-in-subp= rocesses/
pipe_me_in.read

Changing the last line to

puts '"' + pipe_me_in.read.chomp + '"'

Really writes 33 quoted so IO.reopen works for setting up the file descript=
ors.

It is not clear from the docs if it would work or not.

It does work, though:

out =3D File.open("testfile","w")

STDOUT.reopen(out)

system("echo 33")

STDERR.puts '"' + File.read("testfile").chomp + '"'

=3D> "33"

I vaguely recall some issues with using STDIN and STDOUT for
redirection earlier but it might be due to using them incorrectly.
There is a link in there to a blog describing ruby's shell class,
perhaps you did not see it?

Yes, I missed the link labeled just [1]


Thanks

Michal
 
B

Brian Candler

Michal Suchanek wrote in post #998548:
And the long script with pipes that calls the above:

Since you are doing this on a real operating system, there's no need to
use ruby to copy the output of one process to the input of the next
process (unless you want to capture the communication between the
processes as well).

OTOH, making sure all the right FDs are closed in each child can be
tricky in 1.8, because there's no IO.close_on_exec= (although 1.9 has
this).

Here is an example which works in 1.8.7. Obviously it's possible to
refactor this to N children, although I expect you'll end up with
something like the 'pipeline' method mentioned before if you do.

--------------------------------
#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
a_to_b_rd, a_to_b_wr = IO.pipe
a_err_rd, a_err_wr = IO.pipe
# open: r2a_r, r2a_w, a2b_r, a2b_w, ae_r, ae_w
pid1 = fork do
ruby_to_a_wr.close
a_to_b_rd.close
a_err_rd.close
STDIN.reopen(ruby_to_a_rd)
STDOUT.reopen(a_to_b_wr)
STDERR.reopen(a_err_wr)
exec("cat; echo done cat 1>&2")
STDERR.puts "Whoops! #{$!}"
end
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close
# open: r2a_w, a2b_r, ae_r

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe
# open: r2a_w, a2b_r, ae_r, b2c_r, b2c_w, be_r, be_w
pid2 = fork do
ruby_to_a_wr.close
b_to_c_rd.close
a_err_rd.close
b_err_rd.close
STDIN.reopen(a_to_b_rd)
STDOUT.reopen(b_to_c_wr)
STDERR.reopen(b_err_wr)
exec("tr 'a-z' 'A-Z'; echo done tr 1>&2")
STDERR.puts "Whoops! #{$!}"
end
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close
# open: r2a_w, ae_r, b2c_r, be_r

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe
# open: r2a_w, ae_r, b2c_r, be_r, c2r_r, c2r_w, ce_r, ce_w
pid3 = fork do
ruby_to_a_wr.close
c_to_ruby_rd.close
a_err_rd.close
b_err_rd.close
c_err_rd.close
STDIN.reopen(b_to_c_rd)
STDOUT.reopen(c_to_ruby_wr)
STDERR.reopen(c_err_wr)
exec("sed 's/O/0/g'; echo done sed 1>&2")
STDERR.puts "Whoops! #{$!}"
end
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close
# open: r2a_w, ae_r, be_r, c2r_r, ce_r

Thread.new do
ruby_to_a_wr.puts "Here is some data"
ruby_to_a_wr.puts "And some more"
ruby_to_a_wr.close
end

Thread.new do
while line = a_err_rd.gets
puts "A err: #{line}"
end
a_err_rd.close
end

Thread.new do
while line = b_err_rd.gets
puts "B err: #{line}"
end
b_err_rd.close
end

Thread.new do
while line = c_err_rd.gets
puts "C err: #{line}"
end
c_err_rd.close
end

while line = c_to_ruby_rd.gets
puts line
end
c_to_ruby_rd.close
--------------------------------

This can be simplified a bit if you don't want three separate error
streams; they can all share the same one. i.e. each child would have

err_rd.close
STDERR.reopen(err_wr)

Regards,

Brian.
 
B

Brian Candler

Here is a ruby 1.9.2 version with close_on_exec=. I leave it as an
exercise to refactor this to an arbitrary pipeline of N commands - it
would return one input pipe, one output pipe, N pids and N error pipes.

Regards,

Brian.


#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
ruby_to_a_wr.close_on_exec = true # no children should have this

a_to_b_rd, a_to_b_wr = IO.pipe

a_err_rd, a_err_wr = IO.pipe
a_err_rd.close_on_exec = true # no children should have this

pid1 = fork do
a_to_b_rd.close
STDIN.reopen(ruby_to_a_rd)
STDOUT.reopen(a_to_b_wr)
STDERR.reopen(a_err_wr)
exec("cat; echo done cat 1>&2")
STDERR.puts "Whoops! #{$!}"
end
# we don't want these fds, nor any of the further children
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe
b_err_rd.close_on_exec = true # no children should have this

pid2 = fork do
b_to_c_rd.close
STDIN.reopen(a_to_b_rd)
STDOUT.reopen(b_to_c_wr)
STDERR.reopen(b_err_wr)
exec("tr 'a-z' 'A-Z'; echo done tr 1>&2")
STDERR.puts "Whoops! #{$!}"
end
# we don't want these fds, nor any of the further children
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe
c_err_rd.close_on_exec = true # no children should have this
pid3 = fork do
c_to_ruby_rd.close
STDIN.reopen(b_to_c_rd)
STDOUT.reopen(c_to_ruby_wr)
STDERR.reopen(c_err_wr)
exec("sed 's/O/0/g'; echo done sed 1>&2")
STDERR.puts "Whoops! #{$!}"
end
# we don't want these fds, nor any of the further children
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close

Thread.new do
ruby_to_a_wr.puts "Here is some data"
ruby_to_a_wr.puts "And some more"
ruby_to_a_wr.close
end

Thread.new do
while line = a_err_rd.gets
puts "A err: #{line}"
end
a_err_rd.close
end

Thread.new do
while line = b_err_rd.gets
puts "B err: #{line}"
end
b_err_rd.close
end

Thread.new do
while line = c_err_rd.gets
puts "C err: #{line}"
end
c_err_rd.close
end

while line = c_to_ruby_rd.gets
puts line
end
c_to_ruby_rd.close
 
M

Michal Suchanek

Michal Suchanek wrote in post #998548:

Since you are doing this on a real operating system, there's no need to
use ruby to copy the output of one process to the input of the next

I don't use Ruby for that.

And I used a select loop because I had issues with threads in the past.

However, your example works perfectly for me, even when updated to put
more data through each of the pipes to make sure they don't end up all
in a buffer and the threads have to be actually switched for the
pipeline to work.

I think there might still be issues if the produced data was without
line endings because none of the gets would finish but that can be
solved by using read_nonblock or somesuch instead.

Thanks

Michal

-----------
#Thread.abort_on_exception = true # for debugging

ruby_to_a_rd, ruby_to_a_wr = IO.pipe
a_to_b_rd, a_to_b_wr = IO.pipe
a_err_rd, a_err_wr = IO.pipe
# open: r2a_r, r2a_w, a2b_r, a2b_w, ae_r, ae_w
pid1 = fork do
ruby_to_a_wr.close
a_to_b_rd.close
a_err_rd.close
STDIN.reopen(ruby_to_a_rd)
STDOUT.reopen(a_to_b_wr)
STDERR.reopen(a_err_wr)
exec('while read x ; do echo "$x" ; echo "$x" >&2 ; done')
STDERR.puts "Whoops! #{$!}"
end
ruby_to_a_rd.close
a_to_b_wr.close
a_err_wr.close
# open: r2a_w, a2b_r, ae_r

b_to_c_rd, b_to_c_wr = IO.pipe
b_err_rd, b_err_wr = IO.pipe
# open: r2a_w, a2b_r, ae_r, b2c_r, b2c_w, be_r, be_w
pid2 = fork do
ruby_to_a_wr.close
b_to_c_rd.close
a_err_rd.close
b_err_rd.close
STDIN.reopen(a_to_b_rd)
STDOUT.reopen(b_to_c_wr)
STDERR.reopen(b_err_wr)
exec('while read x ; do x=`echo "$x" | tr a-z A-Z` ; echo "$x" ;
echo "$x" >&2 ; done')
STDERR.puts "Whoops! #{$!}"
end
a_to_b_rd.close
b_to_c_wr.close
b_err_wr.close
# open: r2a_w, ae_r, b2c_r, be_r

c_to_ruby_rd, c_to_ruby_wr = IO.pipe
c_err_rd, c_err_wr = IO.pipe
# open: r2a_w, ae_r, b2c_r, be_r, c2r_r, c2r_w, ce_r, ce_w
pid3 = fork do
ruby_to_a_wr.close
c_to_ruby_rd.close
a_err_rd.close
b_err_rd.close
c_err_rd.close
STDIN.reopen(b_to_c_rd)
STDOUT.reopen(c_to_ruby_wr)
STDERR.reopen(c_err_wr)
exec('while read x ; do x=`echo "$x" | sed -e s/O/0/g` ; echo "$x"
; echo "$x" >&2 ; done')
STDERR.puts "Whoops! #{$!}"
end
b_to_c_rd.close
c_to_ruby_wr.close
c_err_wr.close
# open: r2a_w, ae_r, be_r, c2r_r, ce_r

Thread.new do
ruby_to_a_wr.puts "Here is some data"
(1..1000).each{|i|
ruby_to_a_wr.puts "#{i} And some more"
}
ruby_to_a_wr.close
end

Thread.new do
while line = a_err_rd.gets
puts "A err: #{line}"
end
a_err_rd.close
end

Thread.new do
while line = b_err_rd.gets
puts "B err: #{line}"
end
b_err_rd.close
end

Thread.new do
while line = c_err_rd.gets
puts "C err: #{line}"
end
c_err_rd.close
end

while line = c_to_ruby_rd.gets
puts line
end
c_to_ruby_rd.close
 
M

Michal Suchanek

Hello,

how do you write an equivalent of

$ cmdA | cmdB | cmdC

in Ruby?

Specifically, I would like to see the PID, return value and stderr of
each of these commands but I would like cmdB to read the stdout of
cmdA directly, on its stdin, and similarily for cmdC and cmdB.

Here I am not interested in feeding cmdA some particular input (eg. it
can read /dev/null for all I care) but in general gluing arbitrary fd
to its stdin might be desirable in other cases.

Thanks for all the replies.

To sum up there are multiple methods to achieve this.

- fork - works on ruby 1.8 but not on Windows
- Open3 - very nice interface with pipeline_start and such which
does much of the setup behind the scenes but from what I read on the
Net does not work on Windows, again
- Open3-win32 and Open4 exist to address this issue but they may not
have the same interface.
- Process.spawn - works only in ruby 1.9 but should be portable to
Windows should you need it. Basically equivalent to Open3 except you
see all the gory details. Not sure why Open3 does not use this on
Windows. Maybe Open3 was in 1.8 already when spawn did not allow for
redirects or something.
- Shell - somewhat not well known and not well documented Ruby
library. You will reimplement this if you use one of the above ;-)

For reading and writing numerous pipes you can use either a Ruby
thread per pipe or a select loop, either works.

Thanks

Michal
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,773
Messages
2,569,594
Members
45,123
Latest member
Layne6498
Top