trouble with mkfifo and forks

J

Josh

# In a nutshell:
1) Create a pipe
2) Fork twice
3) Each for sends 10 strings over the pipe
4) See what comes out the other side

So, why isn't what I send coming out?

class WaitListTest < Test::Unit::TestCase
NUMBER_OF_ORDERS = 2
PIPE = "test/wl_pipe"

def setup
`rm #{PIPE}`
`mkfifo #{PIPE}`
end

def test_registering_last_race
term_count = 0
responses = []
pids = []

NUMBER_OF_ORDERS.times do |i|
pids << Process.fork do
sleep 1 + (0.1 * i)
10.times do |a|
File.open("#{RAILS_ROOT}/#{PIPE}", "w+") do |pipe|
sleep 0.1
pipe.puts "Fork #{i} - #{a}"
end
end
File.open("#{RAILS_ROOT}/#{PIPE}", "a") do |pipe|
sleep 0.1
pipe.puts "DONE"
end
end
end

while term_count < NUMBER_OF_ORDERS
File.open("#{RAILS_ROOT}/#{PIPE}", "r+") do |pipe|
val = pipe.gets
if val.match(/DONE/)
term_count += 1
else
responses << val
end
end
end
puts responses.sort
puts responses.count
end
end

# output
Started
Fork 0 - 0
Fork 0 - 1
Fork 0 - 2
Fork 0 - 3
Fork 0 - 4
Fork 0 - 5
Fork 0 - 7
Fork 0 - 9
Fork 1 - 0
Fork 1 - 1
Fork 1 - 2
Fork 1 - 3
Fork 1 - 4
Fork 1 - 5
Fork 1 - 7
Fork 1 - 8
Fork 1 - 9
17

Why isn't this 20?
 
J

Joel VanderWerf

# In a nutshell:
1) Create a pipe
2) Fork twice
3) Each for sends 10 strings over the pipe
4) See what comes out the other side

I'm probably naive about something, but fifos almost never work right
for what I want to do, and I usually end up using (unix domain) sockets
(which also makes it easier to move to tcp sockets if you need to
distribute across hosts).

Anyway, in your code, it seems that the repeated closing of the *read*
end of the pipe causes some data to be lost. If you can modify your read
loop to keep using the same IO object, it seems to work (see below).
Also, I found I needed to watch for #gets returning nil (though that
doesn't happen reliably).

Perhaps someone who knows more about pipes will "pipe up" on this thread.

class WaitListTest
NUMBER_OF_ORDERS = 2
PIPE = File.expand_path("~/tmp/wl_pipe")

def setup
`rm -f #{PIPE}`
`mkfifo #{PIPE}`
end

def test_registering_last_race
term_count = 0
responses = []
pids = []

NUMBER_OF_ORDERS.times do |i|
pids << Process.fork do
sleep 0.1 + (0.1 * i)
10.times do |a|
File.open(PIPE, "w") do |pipe|
sleep 0.1
pipe.puts "Fork #{i} - #{a}"
end
end
File.open(PIPE, "w") do |pipe|
sleep 0.1
pipe.puts "DONE"
end
end
end

pipe = File.open(PIPE, "r")
while term_count < NUMBER_OF_ORDERS
# File.open(PIPE, "r") do |pipe|
val = pipe.gets
case val
when /DONE/
term_count += 1
when nil
puts "pipe closed" # need this too
else
responses << val
end
# end
end

pids.each do |pid|
Process.waitpid(pid)
end

puts responses.sort
puts responses.count
end
end

wlt = WaitListTest.new
wlt.setup
wlt.test_registering_last_race
 
B

Brian Candler

In the receiver, you are constantly opening the pipe for read and then
closing it. I expect that if someone writes to the pipe while there's no
reader, the data is lost.

The program works for me if I swap the open and while around:

File.open("#{RAILS_ROOT}/#{PIPE}", "r") do |pipe|
while term_count < NUMBER_OF_ORDERS
val = pipe.gets
if val.match(/DONE/)
.. etc

But it is much better form to create an anonymous pipe in the parent
(rd, wr = IO.pipe) then fork. Close the rd end in the child and the wr
end in the parent.

Use Socket.pair if you want a bidirectional channel.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,774
Messages
2,569,599
Members
45,175
Latest member
Vinay Kumar_ Nevatia
Top