Threaded IO trouble

  • Thread starter Michal Suchanek
  • Start date
M

Michal Suchanek

Hello

I do not understand the internals of Ruby IO but what I get here looks
really awful.

I am interfacing an application that processes lines of text and
buffers part of the output until EOF (or EOF tag). I thought using an
additional thread would be the right thing to do in this case: one
thread writes stuff, and as the application starts to output stuff
another thread wakes up to read it.

There are two problems here:

1) only reading inside the separate thread is possible, writing causes
stdin to be read into the program

2) reading inside the thread produces deadlocks

Attaching the files required to reproduce both failures.

=================== mock.rb ===================
buf=[]
while l=gets
buf << l
puts buf.shift if buf.length >3
end
=================== garbage ===================
ilks nvd
sdh v
df bhl
dj fbo
fj wr;oug
rbfg
=================== testt.rb ===================
def unidentified l
false
end

def try_analyze *words
return [] if words.length < 1
analyzer = IO.popen( 'ruby -w mock.rb', IO::RDWR | IO::SYNC )
res = []
t = Thread.new( (IO::for_fd analyzer.fileno) ,res){|fd,ary|
while l = fd.gets do
break if l =~ %r|^</csts>$|
if l =~ /^$/
STDERR << "try_analyze: Empty line in output! \n"
next
else
ary.push l
end
end
fd.close rescue nil # hopefully prevents zombie hordes
}
words.each{|w|
STDERR.puts "try_analyze: #{w}"
analyzer.puts "<f>"+w
}
analyzer.puts "</csts>"
analyzer.close_write
t.join
res.map{|w|
if unidentified w then
nil
else if w !~ /^<f/
STDERR << "try_analyze: JUNK: #{w} \n"
nil
else
w
end end
}
end

1.upto(10000){|_|
try_analyze *%w(a b c d e f g h i)
}
=================== testt2.rb ===================
def unidentified l
false
end

def try_analyze *words
return [] if words.length < 1
analyzer = IO.popen 'ruby -w mock.rb'
res = []

t = Thread.new( (IO::for_fd analyzer.fileno) ,words){|fd,ary|
fd.sync = true
ary.each{|w|
STDERR.puts "try_analyze: #{w}"
fd.puts "<f>"+w
}
fd.puts "</csts>"
analyzer.close_write
}
analyzer.sync = true
while l = analyzer.gets do
break if l =~ %r|^</csts>$|
if l =~ /^$/
STDERR << "try_analyze: Empty line in output! \n"
next
else
res.push l
end
end
fd.close rescue nil # hopefully prevents zombie hordes
res.map{|w|
if unidentified w then
nil
else if w !~ /^<f/
STDERR << "try_analyze: JUNK: #{w} \n"
nil
else
w
end end
}
end

1.upto(10){|_|
try_analyze *%w(a b c d e f g h i)
}
======================================

Use:

$ cat garbage | ruby -w testt2.rb
testt2.rb:47: warning: `*' interpreted as argument prefix
try_analyze: a
try_analyze: JUNK: ilks nvd

try_analyze: JUNK: sdh v

try_analyze: JUNK: df bhl

try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a

$ cat garbage | ruby -w testt.rb

[lots of output snipped]

deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'
from testt.rb:25:in `each'
from testt.rb:25:in `try_analyze'
from testt.rb:45
from testt.rb:44:in `upto'
from testt.rb:44


Thanks

Michal
 
M

Michal Suchanek

Hello

I do not understand the internals of Ruby IO but what I get here looks
really awful.

I am interfacing an application that processes lines of text and
buffers part of the output until EOF (or EOF tag). I thought using an
additional thread would be the right thing to do in this case: one
thread writes stuff, and as the application starts to output stuff
another thread wakes up to read it.

There are two problems here:

1) only reading inside the separate thread is possible, writing causes
stdin to be read into the program

2) reading inside the thread produces deadlocks

Attaching the files required to reproduce both failures.

=================== mock.rb ===================
buf=[]
while l=gets
buf << l
puts buf.shift if buf.length >3
end
Actually the mockup misses a line here:

while l = buf.shift do puts l end

However, the program reproduces the errors anyway.
=================== garbage ===================
ilks nvd
sdh v
df bhl
dj fbo
fj wr;oug
rbfg
=================== testt.rb ===================
def unidentified l
false
end

def try_analyze *words
return [] if words.length < 1
analyzer = IO.popen( 'ruby -w mock.rb', IO::RDWR | IO::SYNC )
res = []
t = Thread.new( (IO::for_fd analyzer.fileno) ,res){|fd,ary|
while l = fd.gets do
break if l =~ %r|^</csts>$|
if l =~ /^$/
STDERR << "try_analyze: Empty line in output! \n"
next
else
ary.push l
end
end
fd.close rescue nil # hopefully prevents zombie hordes
}
words.each{|w|
STDERR.puts "try_analyze: #{w}"
analyzer.puts "<f>"+w
}
analyzer.puts "</csts>"
analyzer.close_write
t.join
res.map{|w|
if unidentified w then
nil
else if w !~ /^<f/
STDERR << "try_analyze: JUNK: #{w} \n"
nil
else
w
end end
}
end

1.upto(10000){|_|
try_analyze *%w(a b c d e f g h i)
}
=================== testt2.rb ===================
def unidentified l
false
end

def try_analyze *words
return [] if words.length < 1
analyzer = IO.popen 'ruby -w mock.rb'
res = []

t = Thread.new( (IO::for_fd analyzer.fileno) ,words){|fd,ary|
fd.sync = true
ary.each{|w|
STDERR.puts "try_analyze: #{w}"
fd.puts "<f>"+w
}
fd.puts "</csts>"
analyzer.close_write
}
analyzer.sync = true
while l = analyzer.gets do
break if l =~ %r|^</csts>$|
if l =~ /^$/
STDERR << "try_analyze: Empty line in output! \n"
next
else
res.push l
end
end
fd.close rescue nil # hopefully prevents zombie hordes
res.map{|w|
if unidentified w then
nil
else if w !~ /^<f/
STDERR << "try_analyze: JUNK: #{w} \n"
nil
else
w
end end
}
end

1.upto(10){|_|
try_analyze *%w(a b c d e f g h i)
}
======================================

Use:

$ cat garbage | ruby -w testt2.rb
testt2.rb:47: warning: `*' interpreted as argument prefix
try_analyze: a
try_analyze: JUNK: ilks nvd

try_analyze: JUNK: sdh v

try_analyze: JUNK: df bhl

try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a

$ cat garbage | ruby -w testt.rb

[lots of output snipped]

deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'
from testt.rb:25:in `each'
from testt.rb:25:in `try_analyze'
from testt.rb:45
from testt.rb:44:in `upto'
from testt.rb:44


Thanks

Michal
 
R

Robert Klemme

2008/8/6 Michal Suchanek said:
Hello

I do not understand the internals of Ruby IO but what I get here looks
really awful.

I am interfacing an application that processes lines of text and
buffers part of the output until EOF (or EOF tag). I thought using an
additional thread would be the right thing to do in this case: one
thread writes stuff, and as the application starts to output stuff
another thread wakes up to read it.

There are two problems here:

1) only reading inside the separate thread is possible, writing causes
stdin to be read into the program

2) reading inside the thread produces deadlocks

Attaching the files required to reproduce both failures.

=================== mock.rb ===================
buf=[]
while l=gets
buf << l
puts buf.shift if buf.length >3
end
Actually the mockup misses a line here:

while l = buf.shift do puts l end

And it misses another line at the beginning:

$defout.sync = true

:)

With that change for me this works as expected:

10:04:42 Temp$ ./ttest.rb
1218096292.04: main : 0
1218096293.07: main : 1
1218096297.08: main : 2
1218096298.08: main : 3
1218096298.08: reader : 0
1218096303.08: main : 4
1218096303.08: reader : 1
1218096306.09: reader : 2
1218096306.09: reader : 3
1218096306.09: reader : 4
10:05:06 Temp$ cat ttest.rb
#!/bin/env ruby

def log(m)
t = Thread.current
printf "%10.2f: %-10s: %s\n",
Time.now,
t[:label] || t.inspect,
m
end

Thread.current[:label] = "main"

IO.popen "./mock.rb", "r+" do |io|
io.sync = true

t = Thread.new do
Thread.current[:label] = "reader"
io.each do |line|
log line.chomp
end
end

5.times do |i|
log i
io.puts i
sleep(1 + rand(5))
end

io.close_write
t.join
end
10:05:08 Temp$

Kind regards

robert
 
M

Michal Suchanek

And it misses another line at the beginning:

$defout.sync = true

:)

Fixing the mockup is not the right way, I need the ruby script working
with the application which is represented by the mockup. Also doing 5
repetitions is insufficient to test for the problem. It fails only in
few % of cases.

Thanks

Michal
 
R

Robert Klemme

2008/8/7 Michal Suchanek said:
Fixing the mockup is not the right way,

Maybe I am missing your point but the buffering in the mock client is
certainly delaying the complete process. Also, I did not only add
that line to mock.rb but provided a different client implementation.
Did you actually try it out?
I need the ruby script working
with the application which is represented by the mockup. Also doing 5
repetitions is insufficient to test for the problem. It fails only in
few % of cases.

Well, you can imagine that I did not want to post 1,000,000 lines
here. Do the errors you report show up with my code when increasing
the number of lines?

I usually start with a simple bit to verify the basic functionality is
ok and build from there. I do not understand, why you go to all this
fd duplicating stuff. Also, I find your processor quite complex and
it seems that for the error you are seeing a simpler program will be
sufficient.

Btw, what does "writing causes stdin to be read into the program" exactly mean?

Cheers

robert
 
M

Michal Suchanek

Maybe I am missing your point but the buffering in the mock client is
certainly delaying the complete process. Also, I did not only add
that line to mock.rb but provided a different client implementation.
Did you actually try it out?

Sorry, I did not try that yet.
Well, you can imagine that I did not want to post 1,000,000 lines
here. Do the errors you report show up with my code when increasing
the number of lines?

I usually start with a simple bit to verify the basic functionality is
ok and build from there. I do not understand, why you go to all this
fd duplicating stuff. Also, I find your processor quite complex and
it seems that for the error you are seeing a simpler program will be
sufficient.

I came from the other end: I have a script that used to work so I
tried to rip minimal piece of the script so that if a modification to
the example makes it work I would be able to apply the modification
back to the original script.
Btw, what does "writing causes stdin to be read into the program" exactly mean?

Did you try testt2?

That prints out the stuff that you feed it on stdin for some reason I
don't understand at all.

Thanks

Michal
 
M

Michal Suchanek

Sorry, I did not try that yet.

Well, I did not quite understand how your client maps to my client,
and it is indeed completely different.

While your client does only one popen my client does numerous popens.
This is necessary to work around the buffering: the pipe is closed
every time so that the application knows it's time to print out the
rest of the output.

Your client is indeed very slow with the mockup and might fail
completely with the application I use.

I haven't explored the option of using an EOF marker instead of
closing the pipe too much. Maybe that would be the option that works
around limitations in both my application and Ruby.

As for duplication the IO using a fd: I was unsure it is safe to use a
single IO structure in two threads.

Thanks

Michal
 
R

Robert Klemme

Well, I did not quite understand how your client maps to my client,
and it is indeed completely different.

While your client does only one popen my client does numerous popens.
This is necessary to work around the buffering: the pipe is closed
every time so that the application knows it's time to print out the
rest of the output.

Well, that obviously depends on the application you are using for
filtering. If it will send out the last output only on termination
there is no other way than to create new processes over and over again.
Of course, this is not really efficient - especially for short lived
processes - no matter what programming language you use.
Your client is indeed very slow with the mockup and might fail
completely with the application I use.

I am not sure what you mean by "slow". I hope you noticed the sleep in
there.
I haven't explored the option of using an EOF marker instead of
closing the pipe too much. Maybe that would be the option that works
around limitations in both my application and Ruby.

So the application that replaces mock.rb is under your control? I was
assuming that it is not and you need to work with it as is. In that
case you should certainly introduce some kind of signaling for the end
of one chunk of data (if that is needed).
As for duplication the IO using a fd: I was unsure it is safe to use a
single IO structure in two threads.

It is because you are using it in one direction in each thread only.

Cheers

robert
 
M

Michal Suchanek

Well, that obviously depends on the application you are using for
filtering. If it will send out the last output only on termination there is
no other way than to create new processes over and over again. Of course,
this is not really efficient - especially for short lived processes - no
matter what programming language you use.

I am under the impression it is so. Using single instance of the
application did not work so far. However, there are at least two
markers that could be possibly used for input termination so it might
be worth experimenting with that a bit.

Still even if it is possible to run a single instance ruby should be
able to run multiple instances, too.
I am not sure what you mean by "slow". I hope you noticed the sleep in
there.
Oh, yeah, no ehmm..


So the application that replaces mock.rb is under your control? I was
assuming that it is not and you need to work with it as is. In that case
you should certainly introduce some kind of signaling for the end of one
chunk of data (if that is needed).

Unfortunately it is not. If it was so I would use a saner interface in
the first place.
It is because you are using it in one direction in each thread only.

Thanks

Michal
 
R

Roger Pack

deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'


I recently got something similar waiting for threads:
/lib/ruby_useful_here.rb:682:in `join': Thread(0x346fc): deadlock
(fatal)

In error, at least to my reckoning.
Answer for now: avoid threads and just used evented sockets, like Rev or
EventMachine.

-R
 
M

Michal Suchanek

So it turns out the application I am using has some provisions for
running in terminal. If I run just the application from a shell it
behaves much like cat - it processes lines as I type them except it
buffers one line. If I wrap it in iconvs or cats it buffers much more,
and the only way to get output is to close the input pipe.

So I would really like to get ruby to use pipes properly.

I can reproduce deadlocks even when I replace the application with cat:

$ cat testt.rb
def try_analyze *words
analyzer = IO.popen( 'cat', IO::RDWR | IO::SYNC )
res = []

t = Thread.new( (IO::for_fd analyzer.fileno) ,res){|fd,ary|
while l = fd.gets do
STDERR.putc 8
ary.push l
end
fd.close rescue nil # hopefully prevents zombie hordes
}

words.each{|w|
STDERR.putc '.'[0]
analyzer.puts w
}

analyzer.close_write rescue nil
t.join
end

1.upto(10000){|_|
try_analyze *%w(a b c d e f g h i)
}

$ ruby -w testt.rb
testt.rb:23: warning: `*' interpreted as argument prefix
....cat: stdin: Bad file descriptor
deadlock 0x5154: sleep:F(4) - testt.rb:6
deadlock 0x318f8: sleep:S (main) - testt.rb:15
testt.rb:15:in `write': Thread(0x318f8): deadlock (fatal)
from testt.rb:15:in `puts'
from testt.rb:15:in `try_analyze'
from testt.rb:13:in `each'
from testt.rb:13:in `try_analyze'
from testt.rb:23
from testt.rb:22:in `upto'
from testt.rb:22

The other way around the code looks much simpler but it really locks up:

$ cat testt2.rb
def try_analyze *words
res=[]
analyzer = IO.popen 'cat', IO::RDWR + IO::SYNC

t = Thread.new(analyzer, words){|fd,ary|
ary.each{|w|
STDERR.putc '.'[0]
fd.puts w
}
fd.close_write
}

while l = analyzer.gets do
STDERR.putc 8
res.push l
end
fd.close rescue nil # hopefully prevents zombie hordes
res
end

1.upto(10000){|_|
try_analyze *%w(a b c d e f g h i)
}

$ ruby -w testt2.rb
testt2.rb:22: warning: `*' interpreted as argument prefix
^Ctestt2.rb:3:in `popen': Interrupt from testt2.rb:3:in `try_analyze'
from testt2.rb:22
from testt2.rb:21:in `upto'
from testt2.rb:21


Any suggestions how to make pipes work in ruby are welcome.

Thanks

Michal
 
R

Roger Pack

[lots of output snipped]
deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'
from testt.rb:25:in `each'
from testt.rb:25:in `try_analyze'
from testt.rb:45
from testt.rb:44:in `upto'
from testt.rb:44

It's probably a bug in Ruby. Most recently I've gotten deadlock
messages just joining on other threads [which do terminate correctly],
which thing is truly bizarre to me. I don't seem to remember it
happening in older patch versions of Ruby. Tough to isolate.

Thoughts?
-=R
 
M

Michal Suchanek

[lots of output snipped]

deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'
from testt.rb:25:in `each'
from testt.rb:25:in `try_analyze'
from testt.rb:45
from testt.rb:44:in `upto'
from testt.rb:44


It's probably a bug in Ruby. Most recently I've gotten deadlock
messages just joining on other threads [which do terminate correctly],
which thing is truly bizarre to me. I don't seem to remember it
happening in older patch versions of Ruby. Tough to isolate.

These tests do have the same results in 1.8.1 on OS X.

However, I the script in which I saw the error first used to run on
Linux in older versions of Ruby. Still it might be that the error
occurs less often with older versions, and I think I do at most
hundreds of calls in one script run, not thousands.

Thanks

Michal
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,770
Messages
2,569,584
Members
45,075
Latest member
MakersCBDBloodSupport

Latest Threads

Top