pipedstreams

  • Thread starter Tomasz Grobelny

Tomasz Grobelny

What's wrong with the following code? It usually works fine, but from time to
time readInt returns values that have never been written to the stream (which
results in an ArrayIndexOutOfBoundsException). On the other hand, if I have
only one consumer and one producer thread, everything works just fine. What
may be the problem? Should I synchronize access to those streams in some
other way?
--
Regards,
Tomasz Grobelny

import java.io.*;

class comm
{
        PipedInputStream in;
        PipedOutputStream out;
        PipedInputStream[] in_t;
        PipedOutputStream[] out_t;
        public comm(int num)
        {
                in=new PipedInputStream();
                out=new PipedOutputStream();
                try{in.connect(out);}
                catch (IOException e){System.err.println("An error occurred while connecting");}
                in_t=new PipedInputStream[num];
                out_t=new PipedOutputStream[num];
                for(int i=0;i<num;i++)
                {
                        in_t[i]=new PipedInputStream();
                        out_t[i]=new PipedOutputStream();
                        try{in_t[i].connect(out_t[i]);}
                        catch (IOException e){System.err.println("An error occurred while connecting ("+i+")");}
                }
        }
        public synchronized void send(int dest, int v)
        {
                DataOutputStream dos;
                if(dest==-1)
                {
                        synchronized(out)
                        {
                                dos=new DataOutputStream(out);
                                try{dos.writeInt(v);dos.flush();out.flush();}
                                catch (IOException e
                                {System.err.println("Error while writing (-1)");}
                        }
                }
                else
                {
                        synchronized(out_t[dest])
                        {
                                dos=new DataOutputStream(out_t[dest]);
                                try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
                                catch (IOException e
                                {System.err.println("Error while writing ("+dest+")");}
                        }
                }
        }
        public int receive(int from)
        {
                if(from==-1)
                {
                        synchronized(in)
                        {
                                try{return (new DataInputStream(in)).readInt();}
                                catch (IOException e
                                {System.err.println("Error while reading (-1)");return -1;}
                        }
                }
                else
                {
                        synchronized(in_t[from])
                        {
                                try{return (new DataInputStream(in_t[from])).readInt();}
                                catch (IOException e
                                {System.err.println("Error while reading ("+from+")");return -1;}
                        }
                }
        }
}

class mp_producer extends Thread
{
        comm global;
        int id;
        public mp_producer(comm g, int i)
        {
                global=g;
                id=i;
        }
        public void run()
        {
                comm local;
                for(int i=0;;i++)
                {
                        System.out.println("Producer "+id+" Waiting for free space...");
                        int cd=global.receive(-1);
                        System.out.println("Producer "+id+" Started producing for "+cd);
                        try{Thread.sleep((int)(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
                        System.out.println("Producer "+id+" Finished producing for "+cd+" item "+i);
                        global.send(cd, i);
                }
        }
}

class mp_consumer extends Thread
{
        comm global;
        int id;
        public mp_consumer(comm g, int i)
        {
                global=g;
                id=i;
        }
        public void run()
        {
                for(;;)
                {
                        global.send(-1, id); //announce ability to consume an item
                        System.out.println("Consumer "+id+" Waiting for item...");
                        int d=global.receive(id);
                        System.out.println("Consumer "+id+" Started consuming: "+d);
                        try{Thread.sleep((int)(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
                        System.out.println("Consumer "+id+" Finished consuming: "+d);
                }
        }
}

class mp
{
        public static void main(String[] args)
        {
                int max=10;
                comm globalcomm=new comm(max);
                for(int i=0;i<max;i++)
                {
                        mp_consumer cons=new mp_consumer(globalcomm, i);
                        mp_producer prod=new mp_producer(globalcomm, i);
                        cons.start();
                        prod.start();
                }
        }
}
 

Matt Humphrey

Tomasz Grobelny said:
What's wrong with the following code? It usually works fine, but from time to
time readInt returns values that have never been written to the stream (which
results in an ArrayIndexOutOfBoundsException). On the other hand, if I have
only one consumer and one producer thread, everything works just fine. What
may be the problem? Should I synchronize access to those streams in some
other way?

I haven't seen anything yet in this, although it would help greatly if you
could post indented code and for Pete's sake capitalize your class names.
Although there are only a few places where you index an array here, please say
which line the error is occurring on. It can't really be in the comm
constructor, but that still leaves send and receive, and I'm guessing you
mean the error is in send, because the producer calls that with an index read
from the stream.

I noticed that the catch at about line 39 has no closing parenthesis. This is
unusual for cut-and-paste and makes me think the code isn't real (was
re-typed and edited) or not really run. Are you sure you recompiled after
some other fix? The compiler may issue error messages but still leave the
old code behind. Finally, I don't know whether it's a good idea to be
creating and discarding so many DataInput/OutputStreams. I doubt they would
interfere with the stream handling, but why risk it when it's so much easier
to put the wrapped streams into your Comm object?

import java.io.*;

class comm
{
        PipedInputStream in;
        PipedOutputStream out;
        PipedInputStream[] in_t;
        PipedOutputStream[] out_t;
        public comm(int num)
        {
                in=new PipedInputStream();
                out=new PipedOutputStream();
                try{in.connect(out);}
                catch (IOException e){System.err.println("An error occurred while connecting");}
                in_t=new PipedInputStream[num];
                out_t=new PipedOutputStream[num];
                for(int i=0;i<num;i++)
                {
                        in_t[i]=new PipedInputStream();
                        out_t[i]=new PipedOutputStream();
                        try{in_t[i].connect(out_t[i]);}
                        catch (IOException e){System.err.println("An error occurred while connecting ("+i+")");}
                }
        }
        public synchronized void send(int dest, int v)
        {
                DataOutputStream dos;
                if(dest==-1)
                {
                        synchronized(out)
                        {
                                dos=new DataOutputStream(out);
                                try{dos.writeInt(v);dos.flush();out.flush();}
                                catch (IOException e
                                {System.err.println("Error while writing (-1)");}
                        }
                }
                else
                {
                        synchronized(out_t[dest])
                        {
                                dos=new DataOutputStream(out_t[dest]);
                                try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
                                catch (IOException e

^ Why no closing parenthesis here?

                                {System.err.println("Error while writing ("+dest+")");}
                        }
                }
        }
        public int receive(int from)
        {
                if(from==-1)
                {
                        synchronized(in)
                        {
                                try{return (new DataInputStream(in)).readInt();}
                                catch (IOException e
                                {System.err.println("Error while reading (-1)");return -1;}
                        }
                }
                else
                {
                        synchronized(in_t[from])
                        {
                                try{return (new DataInputStream(in_t[from])).readInt();}
                                catch (IOException e

^ and here for that matter?

                                {System.err.println("Error while reading ("+from+")");return -1;}
                        }
                }
        }
}

Cheers,
Matt Humphrey (e-mail address removed) http://www.iviz.com/
 

Tomasz Grobelny

Matt said:
I haven't seen anything yet in this, although it would help greatly if you
could post indented code and for Pete's sake capitalize your classnames.
Although there are only a few places where you index an array here, say
which line the error is occurring on. It can't really be in the comm
constructor, but that still leaves send and receive and I'm guessing you
mean the error is in send because the producer calls that with an index
read from the stream.

The file is here: http://grobelny.oswiecenia.net/AGH/tw/mp.java.1
It is slightly modified; at the very least I added flush calls on the streams.
The exception looks like this:

Exception in thread "Thread-11" java.lang.ArrayIndexOutOfBoundsException: 1536
        at comm.send(mp.java:39)
        at mp_producer.run(mp.java:87)

So something wrong must have been read from the PipedInputStream by the receive
method. Presumably two instances of DataOutputStream mix their bytes in the
pipe, but why, if access to the streams is synchronized?

Matt said:
Finally, I don't know whether it's a good idea to be
creating and discarding so many DataInput/OutputStreams. I doubt they
would interfere with the stream handling, but why risk it when it's so
much easier to put the wrapped streams into your Comm object.

It seems to have started working (you never know for sure), thank you very much.
The modified source is at
http://grobelny.oswiecenia.net/AGH/tw/mp.java.2
Is it likely that the flush methods don't work as expected (in mp.java.1), or
are my expectations wrong?

Matt said:
try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
catch (IOException e

^ Why no closing parenthesis here?

I don't really know; I'm not mad enough to type this code by hand instead of
copying it. Maybe I tried to manually unwrap this line... it doesn't matter now.
 

Chris Uppal

Tomasz said:
What's wrong with the following code? It usually works fine, but from time to
time readInt returns values that have never been written to the stream
(which results in an ArrayIndexOutOfBoundsException). On the other hand, if I
have only one consumer and one producer thread, everything works just
fine. What may be the problem? Should I synchronize access to those
streams in some other way?

This puzzled me greatly. As far as I can tell your synchronisation /should/ be
correct (although your Comm.send() method is synchronised unnecessarily).

The appended test program illustrates the problem rather more directly. The
program should never reach the Whoops!!! line at all, but it does -- and rather
quickly.

It's obvious that what's happening is that the bytes of the ints written into
"out" are getting scrambled together by the time they are read from "in". It's
easy to fix that by ensuring that the reads and writes are synchronised on the
/same/ object, but it's not obvious why that is necessary here.

It's either a rather serious bug in PipedInputStream, or an even worse bit of
nasty undocumented behaviour. It ends up not working the way that one ought to
be able to expect, given the published API and JavaDocs.

This is how I think we /should/ be able to reason:

Consider the Writers. Each atomicWrite() will hold a lock on the write end of
the pipe (the PipedOutputStream) while it writes 4 bytes (and flushes the
stream). Since all write()s are synchronised on the same object, the
writeInt()s will be atomic, so the bytes written to the underlying stream will
not be intermingled. So the bytes going into the pipe will be strictly arranged
in chunks of 4 bytes.

Now, a similar argument applies to the Readers. They all synchronise on the
same object while they are reading, so each one should read chunks of 4 bytes
atomically from the underlying pipe.

So it should work correctly. QED

So how can any intermingling occur ? It's because of undocumented nastiness in
PipedInputStream. Before getting into that, it's worth noting that if I change
the appended code so that it uses two different lock objects (still two locks,
but not using the PipedInputStream and PipedOutputStream instances as the read
and write lock objects), then it works perfectly, so this is definitely a
problem with the piped stream implementation, rather than the above logic.

It turns out that PipedInputStream.read() does a wait() if there is no data
available. Specifically it wait()s on /itself/. And that means that it
releases the lock which is so important to the above reasoning. And /that/
means that the code breaks :-(
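
The lock-release behaviour described above can be seen in isolation, without any piped streams. The following is only a minimal sketch: the class name WaitReleasesLock and its helper method are made up for illustration and are not taken from the thread or from the JDK. The first thread holds the monitor and only ever gives it up inside wait(), so the second thread getting into its own synchronized block shows that wait() released the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not from the thread): shows that Object.wait()
// releases the monitor, so another thread can enter synchronized(lock).
final class WaitReleasesLock {

    /** Returns true if a second thread entered synchronized(lock) while the first was in wait(). */
    static boolean secondThreadGotIn() {
        final Object lock = new Object();
        final CountDownLatch holding = new CountDownLatch(1);
        final CountDownLatch entered = new CountDownLatch(1);
        final boolean[] done = {false};   // only touched while holding 'lock'

        Thread first = new Thread(() -> {
            synchronized (lock) {
                holding.countDown();      // the monitor is now owned by this thread
                while (!done[0]) {
                    try {
                        lock.wait();      // gives the monitor up until notified
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        Thread second = new Thread(() -> {
            synchronized (lock) {         // reachable only while 'first' is inside wait()
                entered.countDown();
                done[0] = true;
                lock.notifyAll();
            }
        });

        first.start();
        try {
            holding.await();              // make sure 'first' owns the monitor first
            second.start();
            boolean ok = entered.await(5, TimeUnit.SECONDS);
            first.join();
            second.join();
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("second thread entered: " + secondThreadGotIn());
    }
}
```

In the piped-stream case the same thing happens to a reader that has consumed only part of an int: it waits for more data, the lock on the PipedInputStream is released, and another reader can take the following bytes.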

I believe that to be a bug in the implementation -- it's fairly naff code
anyway (it has potential failure modes if more than one thread writes to the
corresponding write stream -- which is also either a bug or a nasty
undocumented restriction).

One solution is to ensure that you use separate lock objects, distinct from the
piped streams themselves (either separate reading and writing locks for each
pipe, or one object used as the lock for both). I guess that the reason your
latest version has mysteriously started working is that you are now using
long-lived DataXxxxStreams, and are synchronising on those instead of the
underlying piped streams.
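
A minimal sketch of that first solution, assuming one pipe carrying ints: the class LockedIntPipe, its two lock fields and the countCorrupted() helper are hypothetical names introduced here, not code from the thread. The locks are private objects that the piped streams never wait() on, and the data streams are created once and reused. The helper keeps every thread alive until all I/O has finished, because the piped streams can report a broken pipe when the last reading or writing thread has died.

```java
import java.io.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper (not from the thread): one pipe, private lock objects.
final class LockedIntPipe {
    private final DataInputStream in;
    private final DataOutputStream out;
    // PipedInputStream never calls wait() on these, so they stay held
    // for the whole 4-byte read or write.
    private final Object readLock = new Object();
    private final Object writeLock = new Object();

    LockedIntPipe() throws IOException {
        PipedInputStream pin = new PipedInputStream();
        in = new DataInputStream(pin);
        out = new DataOutputStream(new PipedOutputStream(pin));
    }

    void send(int v) throws IOException {
        synchronized (writeLock) {
            out.writeInt(v);
            out.flush();
        }
    }

    int receive() throws IOException {
        synchronized (readLock) {
            return in.readInt();
        }
    }

    /** Runs 'pairs' writers and readers; returns how many values arrived outside 1..9. */
    static int countCorrupted(int pairs, int perThread) {
        final AtomicInteger bad = new AtomicInteger();
        final CountDownLatch ioDone = new CountDownLatch(2 * pairs);
        try {
            final LockedIntPipe pipe = new LockedIntPipe();
            Thread[] threads = new Thread[2 * pairs];
            for (int t = 0; t < threads.length; t++) {
                final boolean writer = (t % 2 == 0);
                threads[t] = new Thread(() -> {
                    try {
                        for (int k = 0; k < perThread; k++) {
                            if (writer) {
                                pipe.send(1 + k % 9);
                            } else {
                                int v = pipe.receive();
                                if (v < 1 || v > 9) bad.incrementAndGet();
                            }
                        }
                    } catch (IOException e) {
                        bad.incrementAndGet();
                    } finally {
                        ioDone.countDown();
                    }
                    // Stay alive until everyone has finished their I/O.
                    try { ioDone.await(); } catch (InterruptedException ignored) { }
                });
                threads[t].start();
            }
            for (Thread th : threads) th.join();
        } catch (IOException | InterruptedException e) {
            return -1;
        }
        return bad.get();
    }

    public static void main(String[] args) {
        System.out.println("corrupted values: " + countCorrupted(5, 1000));
    }
}
```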

A better solution, if you are using JDK 1.5, is to drop the buggy piped streams
altogether and use one of the BlockingQueue implementations from
java.util.concurrent.
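
As a rough illustration of the BlockingQueue route, here is a sketch that keeps the send(dest, v) / receive(from) shape of the original comm class, with -1 addressing the shared request channel. The class name QueueComm, the queue capacity of 64 and the demo() helper are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical replacement for the pipe-based comm class (names are illustrative).
final class QueueComm {
    // Shared queue on which consumers announce that they are free.
    private final BlockingQueue<Integer> requests = new ArrayBlockingQueue<>(64);
    // One queue per consumer for the produced items.
    private final List<BlockingQueue<Integer>> mailboxes = new ArrayList<>();

    QueueComm(int consumers) {
        for (int i = 0; i < consumers; i++) {
            mailboxes.add(new ArrayBlockingQueue<>(64));
        }
    }

    // dest == -1 addresses the shared request queue, as in the original send().
    void send(int dest, int v) throws InterruptedException {
        BlockingQueue<Integer> q = (dest == -1) ? requests : mailboxes.get(dest);
        q.put(v);      // blocks while the queue is full
    }

    int receive(int from) throws InterruptedException {
        BlockingQueue<Integer> q = (from == -1) ? requests : mailboxes.get(from);
        return q.take();   // blocks while the queue is empty
    }

    /** Single-threaded walk through one request/response exchange. */
    static int demo() {
        try {
            QueueComm comm = new QueueComm(4);
            comm.send(-1, 3);            // consumer 3 announces it is free
            int cd = comm.receive(-1);   // a producer picks up the request
            comm.send(cd, 42);           // ... and delivers item 42 to consumer 3
            return comm.receive(cd);     // consumer 3 takes its item
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each put() and take() moves a whole Integer, so there is no way for the halves of two values to be interleaved, and no external synchronisation is needed.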

-- chris

============= Test.java =============
import java.io.*;

public class Test
{
    private final PipedInputStream in;
    private final PipedOutputStream out;

    Test()
    {
        in = new PipedInputStream();
        out = new PipedOutputStream();
        try
        {
            in.connect(out);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(0);
        }
    }

    class Reader
    implements Runnable
    {
        public void
        run()
        {
            // read an endless stream of ints and check that each
            // is in range (0, 10)
            for (;;)
            {
                int i = read();
                if (i <= 0 || i >= 10)
                {
                    System.out.println("Whoops!!!");
                    System.exit(0);
                }
            }
        }
    }

    class Writer
    implements Runnable
    {
        public void
        run()
        {
            // write an endless stream of 1, 2, 3 ... 8, 9 ...
            int i = 1;
            for (;;)
            {
                write(i);
                if (++i >= 10)
                    i = 1;
            }
        }
    }

    int
    read()
    {
        try
        {
            return atomicRead();
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(0);
            return Integer.MIN_VALUE; // sigh...
        }
    }

    void
    write(int i)
    {
        try
        {
            atomicWrite(i);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(0);
        }
    }

    private int
    atomicRead()
    throws IOException
    {
        synchronized (in)
        {
            int i = new DataInputStream(in).readInt();
            System.out.println("<- " + i);
            return i;
        }
    }

    private void
    atomicWrite(int i)
    throws IOException
    {
        synchronized (out)
        {
            System.out.println("-> " + i);
            DataOutputStream dos = new DataOutputStream(out);
            dos.writeInt(i);
            dos.flush();
        }
    }

    private void
    go(int n)
    {
        for (int i = 0; i < n; i++)
        {
            new Thread(new Reader()).start();
            new Thread(new Writer()).start();
        }
    }

    public static void
    main(String[] args)
    {
        new Test().go(5);
    }
}
=================================
 

Thomas Weidenfeller

Chris said:
I believe that to be a bug in the implementation -- it's fairly naff code
anyway

The most shocking thing in that code is the line

* @author James Gosling

:)

Chris said:
(it has potential failure modes if more than one thread writes to the
corresponding write stream -- which is also either a bug or a nasty
undocumented restriction).

That one has been reported to Sun multiple times. The reports got closed in
typical Sun style, à la "not a documented or specified feature".
PipedReader/Writer has similar problems, which is not surprising, because the
code has largely been copied from PipedInput/OutputStream.

I don't understand why they don't synchronize on the buffer they use
internally. That would be the natural way from my point of view, since
this is where they hand over the data between the threads.

Chris said:
A better solution, if you are using JDK 1.5, is to drop the buggy piped streams
altogether and use one of the BlockingQueue implementations from
java.util.concurrent.

One could also give java.nio.channels.Pipe a try. But this requires dealing
with ByteBuffers for data transfer.
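
To give an idea of what dealing with ByteBuffers looks like, here is a small single-threaded sketch that pushes one int through a java.nio.channels.Pipe and reads it back. The class NioPipeDemo and its roundTrip() method are illustrative names only. The sketch does not address several threads sharing one pipe; a channel write or read may transfer fewer than 4 bytes, so concurrent users would still need a lock of their own around each 4-byte frame.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Hypothetical demo class: one int through a java.nio.channels.Pipe.
final class NioPipeDemo {

    static int roundTrip(int value) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {

                // Encode the int into a 4-byte buffer and write it out.
                ByteBuffer outBuf = ByteBuffer.allocate(4);
                outBuf.putInt(value);
                outBuf.flip();
                while (outBuf.hasRemaining()) {
                    sink.write(outBuf);          // may write fewer than 4 bytes per call
                }

                // Read until the 4-byte buffer is full, then decode.
                ByteBuffer inBuf = ByteBuffer.allocate(4);
                while (inBuf.hasRemaining()) {
                    if (source.read(inBuf) < 0) {
                        throw new EOFException("pipe closed early");
                    }
                }
                inBuf.flip();
                return inBuf.getInt();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(1536));
    }
}
```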

/Thomas
 

Chris Uppal

Thomas Weidenfeller wrote:

The most shocking thing in that code is the line

* @author James Gosling

:)

That one has been reported to Sun multiple times. Reports got closed in
typical Sun-style ala "not a documented or specified feature".

I can believe it. I get the impression that they are distinctly over-hasty
closing bugs.

Thomas Weidenfeller wrote:

I don't understand why they don't synchronize on the buffer they use
internally. That would be the natural way from my point of view, since
this is where they hand-over the data between the threads.

<nods/>

A cleaner implementation would use an explicit Pipe, or FIFOBuffer, object
shared between the two streams. That would naturally take over the
wait()/notify() stuff, and all would be well ;-)
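
A sketch of what such a shared buffer object might look like, reduced to ints rather than bytes: IntFifo and its demo() helper are hypothetical names, and this is not how the JDK's piped streams are written. All the wait()/notifyAll() calls are on the buffer itself, and each put() or take() moves one complete value while holding the buffer's monitor, so callers never need to lock anything themselves.

```java
// Hypothetical bounded FIFO of ints; the buffer owns all waiting and notifying.
final class IntFifo {
    private final int[] slots;
    private int head;    // index of the oldest element
    private int count;   // number of stored elements

    IntFifo(int capacity) {
        slots = new int[capacity];
    }

    synchronized void put(int v) throws InterruptedException {
        while (count == slots.length) {
            wait();                       // buffer full: wait for a take()
        }
        slots[(head + count) % slots.length] = v;
        count++;
        notifyAll();                      // wake any thread blocked in take()
    }

    synchronized int take() throws InterruptedException {
        while (count == 0) {
            wait();                       // buffer empty: wait for a put()
        }
        int v = slots[head];
        head = (head + 1) % slots.length;
        count--;
        notifyAll();                      // wake any thread blocked in put()
        return v;
    }

    /** Single-threaded check that values come out in the order they went in. */
    static String demo() {
        try {
            IntFifo fifo = new IntFifo(4);
            fifo.put(1);
            fifo.put(2);
            fifo.put(3);
            return fifo.take() + "," + fifo.take() + "," + fifo.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Waiting inside put() or take() is harmless here because no caller is part-way through a multi-step transfer when the monitor is released.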

-- chris
 
