T
Tomasz Grobelny
What's wrong with the following code? Usually works fine but from time to
time readInt returns values that has never been written to the stream (that
results in ArrayIndexOutOfBoundsException). On the other hand if I have
only one consumer and one producer thread everything works just fine. What
may be the problem? Should I synchronize access to that streams in some
other way?
--
Regards,
Tomasz Grobelny
import java.io.*;
class comm
{
PipedInputStream in;
PipedOutputStream out;
PipedInputStream[] in_t;
PipedOutputStream[] out_t;
public comm(int num)
{
in=new PipedInputStream();
out=new PipedOutputStream();
try{in.connect(out);}
catch (IOException e){System.err.println("An error occured
while
connecting");}
in_t=new PipedInputStream[num];
out_t=new PipedOutputStream[num];
for(int i=0;i<num;i++)
{
in_t=new PipedInputStream();
out_t=new PipedOutputStream();
try{in_t.connect(out_t);}
catch (IOException e){System.err.println("An error
occured while
connecting ("+i+")");}
}
}
public synchronized void send(int dest, int v)
{
DataOutputStream dos;
if(dest==-1)
{
synchronized(out)
{
dos=new DataOutputStream(out);
try{dos.writeInt(v);dos.flush();out.flush();}
catch (IOException e
{System.err.println("Error while writing (-1)");}
}
}
else
{
synchronized(out_t[dest])
{
dos=new DataOutputStream(out_t[dest]);
try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
catch (IOException e
{System.err.println("Error while writing
("+dest+")");}
}
}
}
public int receive(int from)
{
if(from==-1)
{
synchronized(in)
{
try{return (new
DataInputStream(in)).readInt();}
catch (IOException e
{System.err.println("Error while reading
(-1)");return -1;}
}
}
else
{
synchronized(in_t[from])
{
try{return (new
DataInputStream(in_t[from])).readInt();}
catch (IOException e
{System.err.println("Error while reading
("+from+")");return -1;}
}
}
}
}
class mp_producer extends Thread
{
comm global;
int id;
public mp_producer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
comm local;
for(int i=0;;i++)
{
System.out.println("Producer "+id+" Waiting for free
space...");
int cd=global.receive(-1);
System.out.println("Producer "+id+" Started
producing for "+cd);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Producer "+id+" Finished
producing for "+cd+"
item "+i);
global.send(cd, i);
}
}
}
class mp_consumer extends Thread
{
comm global;
int id;
public mp_consumer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
for(;
{
global.send(-1, id); //announce ability to consume
an item
System.out.println("Consumer "+id+" Waiting for
item...");
int d=global.receive(id);
System.out.println("Consumer "+id+" Started
consuming: "+d);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Consumer "+id+" Finished
consuming: "+d);
}
}
}
class mp
{
public static void main(String[] args)
{
int max=10;
comm globalcomm=new comm(max);
for(int i=0;i<max;i++)
{
mp_consumer cons=new mp_consumer(globalcomm, i);
mp_producer prod=new mp_producer(globalcomm, i);
cons.start();
prod.start();
}
}
}
time readInt returns values that has never been written to the stream (that
results in ArrayIndexOutOfBoundsException). On the other hand if I have
only one consumer and one producer thread everything works just fine. What
may be the problem? Should I synchronize access to that streams in some
other way?
--
Regards,
Tomasz Grobelny
import java.io.*;
class comm
{
PipedInputStream in;
PipedOutputStream out;
PipedInputStream[] in_t;
PipedOutputStream[] out_t;
public comm(int num)
{
in=new PipedInputStream();
out=new PipedOutputStream();
try{in.connect(out);}
catch (IOException e){System.err.println("An error occured
while
connecting");}
in_t=new PipedInputStream[num];
out_t=new PipedOutputStream[num];
for(int i=0;i<num;i++)
{
in_t=new PipedInputStream();
out_t=new PipedOutputStream();
try{in_t.connect(out_t);}
catch (IOException e){System.err.println("An error
occured while
connecting ("+i+")");}
}
}
public synchronized void send(int dest, int v)
{
DataOutputStream dos;
if(dest==-1)
{
synchronized(out)
{
dos=new DataOutputStream(out);
try{dos.writeInt(v);dos.flush();out.flush();}
catch (IOException e
{System.err.println("Error while writing (-1)");}
}
}
else
{
synchronized(out_t[dest])
{
dos=new DataOutputStream(out_t[dest]);
try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
catch (IOException e
{System.err.println("Error while writing
("+dest+")");}
}
}
}
public int receive(int from)
{
if(from==-1)
{
synchronized(in)
{
try{return (new
DataInputStream(in)).readInt();}
catch (IOException e
{System.err.println("Error while reading
(-1)");return -1;}
}
}
else
{
synchronized(in_t[from])
{
try{return (new
DataInputStream(in_t[from])).readInt();}
catch (IOException e
{System.err.println("Error while reading
("+from+")");return -1;}
}
}
}
}
class mp_producer extends Thread
{
comm global;
int id;
public mp_producer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
comm local;
for(int i=0;;i++)
{
System.out.println("Producer "+id+" Waiting for free
space...");
int cd=global.receive(-1);
System.out.println("Producer "+id+" Started
producing for "+cd);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Producer "+id+" Finished
producing for "+cd+"
item "+i);
global.send(cd, i);
}
}
}
class mp_consumer extends Thread
{
comm global;
int id;
public mp_consumer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
for(;
{
global.send(-1, id); //announce ability to consume
an item
System.out.println("Consumer "+id+" Waiting for
item...");
int d=global.receive(id);
System.out.println("Consumer "+id+" Started
consuming: "+d);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Consumer "+id+" Finished
consuming: "+d);
}
}
}
class mp
{
public static void main(String[] args)
{
int max=10;
comm globalcomm=new comm(max);
for(int i=0;i<max;i++)
{
mp_consumer cons=new mp_consumer(globalcomm, i);
mp_producer prod=new mp_producer(globalcomm, i);
cons.start();
prod.start();
}
}
}