S
Suresh
Hi,
I am trying to write a small program using the NIO class that would read
from a socket some data, process it and then send the result back. For this
I need to register the channel for OP_READ first, when I have processed the
data and have some results I would set it to OP_READ | OP_WRITE. When I dont
have anything more to write to the channel, I set it back to OP_READ.
This seems to work fine on a linux and Windows system. But it doesnt work on
the sun system. I get a WRITE notification only once in the begning. But
never again. Am I going something wrong here ? Is there a better way to do
this ?
I have attached the code I used. (The code is very curde one, but helps show
the problem.)
The system that I used for testing is:
$ uname -X
System = SunOS
Node = suntest
Release = 5.10
KernelID = Generic
Machine = i86pc
BusType = <unknown>
Serial = <unknown>
Users = <unknown>
OEM# = 0
Origin# = 1
NumCPU = 2
$ java -version
java version "1.5.0_02"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_02-b09)
Java HotSpot(TM) Client VM (build 1.5.0_02-b09, mixed mode)
Thanks for your time.
<CODE>
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.channels.*;
import java.nio.*;
import java.nio.channels.spi.*;
public class SelectorTestServer {
/** Single selector for accepts, reads */
private Selector readSelector = null;
/** ServerSocketChannel which listens for client connections*/
private ServerSocketChannel ssch = null;
/** The thread that waits for ready Channels - accept / read*/
private SelectorThread readThread = null;
/** Input queue **/
private Vector inData=new Vector();
/** Output queue **/
private Vector outData=new Vector();
private ByteBuffer inBuff;
private ByteBuffer outBuff;
private Object readLock=new Object();
/** Thread which runs the Selector */
private class SelectorThread extends Thread {
public SelectorThread() {
super("SelectorThread");
}
public void run () {
boolean running = true;
try{
// block until a Channel is ready for I/O
while (1==1) {
System.out.println("Going to select:
"+readSelector.keys().size());
while(readSelector.select() > 0){
System.out.println("In buffer: "+inData.size()+" Out
buffer: "+outData.size());
System.out.println("Read Thread while loop");
Set readyKeys = readSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isValid() && sk.isAcceptable()){
// new client connection
ServerSocketChannel nextReady = (ServerSocketChannel)
sk.channel();
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
WriteThread writeThread=new
WriteThread(channel);
writeThread.start();
}
if (sk.isValid() && sk.isReadable()){
System.out.println("****** readable");
SocketChannel
channel=(SocketChannel)sk.channel();
inBuff=ByteBuffer.allocate(1024);
int len=channel.read(inBuff);
if (len == -1 || len == 0) {
// Socket closed
channel.close();
}else{
inData.add(inBuff);
System.out.println("Read "+len+"
bytes");
synchronized(readLock){
readLock.notify();
}
}
}
if (sk.isValid() && sk.isWritable()) {
System.out.println("****** writable");
SocketChannel
channel=(SocketChannel)sk.channel();
if (outData.size() > 0) {
outBuff=(ByteBuffer)outData.remove(0);
outBuff.flip();
int len=channel.write(outBuff);
System.out.println("Sent ... "+len);
}
if (outData.size() == 0) {
// Nothing to send anymore ... so,
only reading
channel.register(readSelector,SelectionKey.OP_READ);
}
}
}
}
}
}
catch (Exception ex){
System.out.println("Exception in selector loop: ");
ex.printStackTrace();
running = false;
}
System.out.println("Selector thread terminated");
System.exit(0);
} //end run()
} //end class
/** Thread which runs the Selector */
private class WriteThread extends Thread {
private SocketChannel channel;
public WriteThread(SocketChannel chnl) {
super("WriteThread");
channel=chnl;
}
public void run () {
boolean running = true;
try{
while (running) {
System.out.println("Write thread: In buffer:
"+inData.size()+" Out buffer: "+outData.size());
if (inData.size() > 0) {
System.out.println("Write thread ... ");
ByteBuffer tmp=(ByteBuffer)inData.remove(0);
outData.add(tmp);
System.out.print("[");
readSelector.wakeup();
System.out.print(".");
channel.register(readSelector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
System.out.println("]");
}
System.out.println("Write Thread: Waiting for data ...");
synchronized(readLock){
readLock.wait();
}
System.out.println("Write Thread: got data ...
"+inData.size());
}
}
catch (Exception ex){
System.out.println("Write Thread: Exception in write selector loop:
");
ex.printStackTrace();
running = false;
}
} //end run()
} //end class
/**
Sets up the selectors and starts listening
*/
protected void startListening () {
try{
// create the selector and the ServerSocket
readSelector = SelectorProvider.provider().openSelector();
ssch = ServerSocketChannel.open();
ssch.configureBlocking(false);
InetSocketAddress isa = new
InetSocketAddress(InetAddress.getLocalHost(), 2000);
ssch.socket().bind(isa);
ssch.register(readSelector, SelectionKey.OP_ACCEPT);
}
catch (Exception e){
System.out.println("Error starting listening");
e.printStackTrace();
}
this.readThread = new SelectorThread();
this.readThread.setDaemon(true);
this.readThread.start();
}
// Test the working
public static void main (String argv[]){
SelectorTestServer s = new SelectorTestServer();
s.startListening();
try{
Thread.currentThread().sleep(500000);
}
catch (Exception e){
System.err.println(e.toString());
}
}
}
</CODE>
I am trying to write a small program using the NIO class that would read
from a socket some data, process it and then send the result back. For this
I need to register the channel for OP_READ first, when I have processed the
data and have some results I would set it to OP_READ | OP_WRITE. When I dont
have anything more to write to the channel, I set it back to OP_READ.
This seems to work fine on a linux and Windows system. But it doesnt work on
the sun system. I get a WRITE notification only once in the begning. But
never again. Am I going something wrong here ? Is there a better way to do
this ?
I have attached the code I used. (The code is very curde one, but helps show
the problem.)
The system that I used for testing is:
$ uname -X
System = SunOS
Node = suntest
Release = 5.10
KernelID = Generic
Machine = i86pc
BusType = <unknown>
Serial = <unknown>
Users = <unknown>
OEM# = 0
Origin# = 1
NumCPU = 2
$ java -version
java version "1.5.0_02"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_02-b09)
Java HotSpot(TM) Client VM (build 1.5.0_02-b09, mixed mode)
Thanks for your time.
<CODE>
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.channels.*;
import java.nio.*;
import java.nio.channels.spi.*;
public class SelectorTestServer {
/** Single selector for accepts, reads */
private Selector readSelector = null;
/** ServerSocketChannel which listens for client connections*/
private ServerSocketChannel ssch = null;
/** The thread that waits for ready Channels - accept / read*/
private SelectorThread readThread = null;
/** Input queue **/
private Vector inData=new Vector();
/** Output queue **/
private Vector outData=new Vector();
private ByteBuffer inBuff;
private ByteBuffer outBuff;
private Object readLock=new Object();
/** Thread which runs the Selector */
private class SelectorThread extends Thread {
public SelectorThread() {
super("SelectorThread");
}
public void run () {
boolean running = true;
try{
// block until a Channel is ready for I/O
while (1==1) {
System.out.println("Going to select:
"+readSelector.keys().size());
while(readSelector.select() > 0){
System.out.println("In buffer: "+inData.size()+" Out
buffer: "+outData.size());
System.out.println("Read Thread while loop");
Set readyKeys = readSelector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()){
SelectionKey sk = (SelectionKey) i.next();
i.remove();
if (sk.isValid() && sk.isAcceptable()){
// new client connection
ServerSocketChannel nextReady = (ServerSocketChannel)
sk.channel();
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
channel.register(readSelector, SelectionKey.OP_READ);
WriteThread writeThread=new
WriteThread(channel);
writeThread.start();
}
if (sk.isValid() && sk.isReadable()){
System.out.println("****** readable");
SocketChannel
channel=(SocketChannel)sk.channel();
inBuff=ByteBuffer.allocate(1024);
int len=channel.read(inBuff);
if (len == -1 || len == 0) {
// Socket closed
channel.close();
}else{
inData.add(inBuff);
System.out.println("Read "+len+"
bytes");
synchronized(readLock){
readLock.notify();
}
}
}
if (sk.isValid() && sk.isWritable()) {
System.out.println("****** writable");
SocketChannel
channel=(SocketChannel)sk.channel();
if (outData.size() > 0) {
outBuff=(ByteBuffer)outData.remove(0);
outBuff.flip();
int len=channel.write(outBuff);
System.out.println("Sent ... "+len);
}
if (outData.size() == 0) {
// Nothing to send anymore ... so,
only reading
channel.register(readSelector,SelectionKey.OP_READ);
}
}
}
}
}
}
catch (Exception ex){
System.out.println("Exception in selector loop: ");
ex.printStackTrace();
running = false;
}
System.out.println("Selector thread terminated");
System.exit(0);
} //end run()
} //end class
/** Thread which runs the Selector */
private class WriteThread extends Thread {
private SocketChannel channel;
public WriteThread(SocketChannel chnl) {
super("WriteThread");
channel=chnl;
}
public void run () {
boolean running = true;
try{
while (running) {
System.out.println("Write thread: In buffer:
"+inData.size()+" Out buffer: "+outData.size());
if (inData.size() > 0) {
System.out.println("Write thread ... ");
ByteBuffer tmp=(ByteBuffer)inData.remove(0);
outData.add(tmp);
System.out.print("[");
readSelector.wakeup();
System.out.print(".");
channel.register(readSelector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
System.out.println("]");
}
System.out.println("Write Thread: Waiting for data ...");
synchronized(readLock){
readLock.wait();
}
System.out.println("Write Thread: got data ...
"+inData.size());
}
}
catch (Exception ex){
System.out.println("Write Thread: Exception in write selector loop:
");
ex.printStackTrace();
running = false;
}
} //end run()
} //end class
/**
Sets up the selectors and starts listening
*/
protected void startListening () {
try{
// create the selector and the ServerSocket
readSelector = SelectorProvider.provider().openSelector();
ssch = ServerSocketChannel.open();
ssch.configureBlocking(false);
InetSocketAddress isa = new
InetSocketAddress(InetAddress.getLocalHost(), 2000);
ssch.socket().bind(isa);
ssch.register(readSelector, SelectionKey.OP_ACCEPT);
}
catch (Exception e){
System.out.println("Error starting listening");
e.printStackTrace();
}
this.readThread = new SelectorThread();
this.readThread.setDaemon(true);
this.readThread.start();
}
// Test the working
public static void main (String argv[]){
SelectorTestServer s = new SelectorTestServer();
s.startListening();
try{
Thread.currentThread().sleep(500000);
}
catch (Exception e){
System.err.println(e.toString());
}
}
}
</CODE>