Threads over same socket/streams instances.

udupi_mail

I have a situation where I will be writing a standalone multithreaded
server (Server-1), which also acts as a delegate client to another
server application (Server-2, written by a third party) that can only
communicate through sockets.

Multiple clients (web services, EJBs; I'll call these Clients-1) call
Server-1, which is listening on a predefined port. On every client
request, Server-1 will spawn a new thread to handle the client socket.

Now each of these threads in turn would need to connect to Server-2.
Server-2 will be listening on pre-defined ports. Depending on the
configuration there may be 3-6 ports it is listening on.

The restriction is that these threads (created in Server-1) cannot
open their own client socket instances and I/O streams to connect to
Server-2.
The sockets and streams should be established at startup, which
depends on the config, and all threads would have to use the same
stream references for I/O.

So to be precise, is there a way in Java for multiple threads to
read/write on the same socket instance's streams?
In this case, will my read/write methods have to be synchronized? Can I
implement asynchronous communication between Server-1 and Server-2?
...It would have been nice if there was a possibility of using JMS.

How do I handle a situation where client-thread1 had read data
belonging to client-thread2? Is there some callback mechanism?
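
To make the cross-talk concern concrete, here is a minimal sketch, not a definitive answer. It assumes Server-2 speaks a strictly synchronous, line-based request/reply protocol (an assumption; nothing above says so) and a modern JDK (8 or later). The lock covers the whole round trip, so one thread cannot read the reply meant for another. The names `SharedExchange`, `SharedExchangeDemo` and the in-memory echo peer are hypothetical stand-ins for the real socket streams.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

/** One shared connection; the lock spans write + read, so replies cannot be crossed. */
class SharedExchange {
    private final BufferedWriter out;
    private final BufferedReader in;

    SharedExchange(InputStream in, OutputStream out) {
        this.in = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        this.out = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
    }

    /** Sends one request line and waits for its reply line (assumed protocol). */
    synchronized String exchange(String request) throws IOException {
        out.write(request);
        out.write('\n');
        out.flush();
        String reply = in.readLine();
        if (reply == null) throw new EOFException("peer closed the connection");
        return reply;
    }
}

class SharedExchangeDemo {
    /** Runs several worker threads over one SharedExchange backed by an in-memory echo peer. */
    static List<String> run(int threads) throws Exception {
        PipedOutputStream toServer = new PipedOutputStream();
        PipedInputStream serverIn = new PipedInputStream(toServer);
        PipedOutputStream fromServer = new PipedOutputStream();
        PipedInputStream clientIn = new PipedInputStream(fromServer);

        // Hypothetical stand-in for Server-2: answers each line with "echo:<line>".
        Thread echo = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(
                     new InputStreamReader(serverIn, StandardCharsets.UTF_8));
                 BufferedWriter w = new BufferedWriter(
                     new OutputStreamWriter(fromServer, StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    w.write("echo:" + line + "\n");
                    w.flush();
                }
            } catch (IOException ignored) {
                // pipe closed or writer gone; the demo is over
            }
        });
        echo.setDaemon(true);
        echo.start();

        SharedExchange shared = new SharedExchange(clientIn, toServer);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final String request = "req-" + i;
            futures.add(pool.submit(() -> shared.exchange(request)));
        }
        List<String> replies = new ArrayList<>();
        for (Future<String> f : futures) replies.add(f.get());
        pool.shutdown();
        return replies;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(4));
    }
}
```

The trade-off is that only one request is in flight per connection at a time. If the protocol carried some kind of request identifier, a single reader thread could dispatch replies instead, but that depends on what Server-2 actually sends.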


Any feedback will be very very helpful. Sorry for being so wordy.
- Guru.
 
iksrazal

(e-mail address removed) wrote in message
How do I handle a situation where client-thread1 had read data
belonging to client-thread2? Is there some callback mechanism?


Any feedback will be very very helpful. Sorry for being so wordy.
- Guru.

Have you looked at the NIO classes? Last year, I wrote a scalable
client socket architecture using a thread pool and socket objects
placed into a cache, protected by semaphores. The thread pool and
semaphores are now part of Java 1.5, but are also available as backports.
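
A rough sketch of that idea, not the original code: a fixed set of pre-opened connections sits in a cache, and a `java.util.concurrent.Semaphore` limits how many threads can hold one at a time. The names `ConnectionCache` and `ConnectionCacheDemo` are made up for illustration, and plain strings stand in for real socket objects. It assumes a modern JDK (8 or later).

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** A fixed set of pre-opened connections; a Semaphore limits how many are checked out. */
class ConnectionCache<T> {
    private final Semaphore permits;
    private final Queue<T> idle = new ConcurrentLinkedQueue<>();

    ConnectionCache(Collection<T> connections) {
        idle.addAll(connections);
        permits = new Semaphore(connections.size(), true); // fair: waiters served in FIFO order
    }

    /** Blocks until a connection is free, then hands it to the caller. */
    T acquire() throws InterruptedException {
        permits.acquire();
        return idle.poll(); // never null: a permit is only released after an offer
    }

    /** Returns a connection to the cache and wakes one waiting thread, if any. */
    void release(T connection) {
        idle.offer(connection);
        permits.release();
    }
}

class ConnectionCacheDemo {
    /** Returns {tasks completed, maximum connections in use at the same time}. */
    static int[] run(int connections, int tasks) throws Exception {
        List<String> conns = new ArrayList<>();
        for (int i = 0; i < connections; i++) conns.add("conn-" + i); // placeholders for sockets
        ConnectionCache<String> cache = new ConnectionCache<>(conns);

        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger maxInUse = new AtomicInteger();
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    String conn = cache.acquire();
                    try {
                        int now = inUse.incrementAndGet();
                        maxInUse.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // pretend to talk to Server-2 over conn
                        done.incrementAndGet();
                    } finally {
                        inUse.decrementAndGet();
                        cache.release(conn);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] { done.get(), maxInUse.get() };
    }

    public static void main(String[] args) throws Exception {
        int[] result = run(3, 8);
        System.out.println("completed=" + result[0] + " maxInUse=" + result[1]);
    }
}
```

With 3 connections and 8 worker tasks, every task eventually gets a turn, but no more than 3 hold a connection at once, which matches the "3-6 ports" constraint in the question.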

Concerning 'callbacks' - with NIO you can register events.
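
As a hedged illustration of what "registering events" looks like: a channel is put in non-blocking mode and registered with a `Selector` for `OP_READ`, and the selector reports when data is ready. To keep the sketch self-contained it uses a `java.nio.channels.Pipe` as a stand-in; a `SocketChannel` connected to Server-2 would be registered the same way. The class name `SelectorSketch` is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectorSketch {
    /** Writes text into a pipe and returns what the selector-driven read picks up. */
    static String roundTrip(String text) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {

            // Register interest in "readable" events on the receiving end.
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            sink.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));

            StringBuilder received = new StringBuilder();
            // Wait up to 2 seconds for at least one registered channel to become ready.
            if (selector.select(2000) > 0) {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // the selected-key set must be cleared by hand
                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(256);
                        ((ReadableByteChannel) key.channel()).read(buf);
                        buf.flip();
                        received.append(StandardCharsets.UTF_8.decode(buf));
                    }
                }
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

In a real server the `select` call would sit in a loop on one thread, handing ready channels to worker threads.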

Here are two links that may help:

http://www.onjava.com/pub/a/onjava/2002/10/02/javanio.html
http://www.krisnmike.com/mike/nio.html

iksrazal
http://www.braziloutsource.com/
 
iksrazal

In this case, will my read/write methods have to be synchronized? Can I
implement asynchronous communication between Server-1 and Server-2?
...It would have been nice if there was a possibility of using JMS.

How do I handle a situation where client-thread1 had read data
belonging to client-thread2? Is there some callback mechanism?


Any feedback will be very very helpful. Sorry for being so wordy.
- Guru.

A bit wordy, yes. I missed the JMS part.

You can also use threads with JMS, but you must have separate
threads, each with a separate session, for every listener. The
O'Reilly JMS book explains that pretty well.

Here's a simple observer that receives messages and fires off
web services, similar to firing off sockets.

For simultaneous threads, just add

(new Thread(new TopicListener(1, topic))).start();
(new Thread(new TopicListener(2, topic))).start();

etc...

package com.infoseg.mr.reciever;

import javax.jms.*;
import java.io.*;
import java.util.*;
import java.net.*;
import javax.naming.*;

import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;

public class TopicObserver
{
    public static TopicConnection connection;
    private static String topicName = "topic1";
    private static PooledExecutor executor;

    // Inner class: one listener per thread, each with its own session
    static public class TopicListener implements MessageListener, Runnable
    {
        private int threadCount = 0;
        private int index = 0;
        private Topic topic;
        private SecureWSInvoker si = SecureWSInvoker.getInstance();

        public TopicListener(int _tc, Topic _t)
        {
            this.threadCount = _tc;
            this.topic = _t;
        }

        // Called by the JMS provider for every message on the topic
        public void onMessage(Message message)
        {
            try
            {
                System.out.println("[" + this.threadCount + "]Listener: ACCEPTED message");
                TextMessage textMessage = (TextMessage) message;
                System.out.println("received: " + textMessage.getText());
                // Worker thread from thread pool calls web service
                executor.execute(new WSClientInvoker(this.index, textMessage.getText()));
                //String resposta = this.si.getData(textMessage.getText());
                //System.out.println(" received from web service: " + resposta);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        public void run()
        {
            try
            {
                synchronized (this)
                {
                    this.index++;
                }
                TopicSession session = connection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                TopicSubscriber topicSubscriber = session.createSubscriber(this.topic);
                topicSubscriber.setMessageListener(this);

                while (true)
                {
                    // keep the thread alive so that the
                    // session doesn't get garbage-collected
                    Thread.sleep(500);
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }

    // Inner class: task run by the thread pool to call the web service
    static public class WSClientInvoker implements Runnable
    {
        private int id = 0;
        private String message;
        private SecureWSInvoker si = SecureWSInvoker.getInstance();

        public WSClientInvoker(int _tc, String _msg)
        {
            this.id = _tc;
            this.message = _msg;
        }

        public void run()
        {
            System.out.println(this.id + "@" + Thread.currentThread().getName() + " {");
            try
            {
                System.out.println(this.id + "@" + Thread.currentThread().getName()
                        + "\n\nwaiting for response from web service...");
                String resposta = this.si.getData(this.message);
                System.out.println(this.id + "@" + Thread.currentThread().getName()
                        + " received from web service: " + resposta);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            System.out.println("} " + this.id + "@" + Thread.currentThread().getName());
        }
    }

    public static void main(String args[])
    {
        RecieverInit init = new RecieverInit();

        String url = "rmi://localhost:1099/JndiServer";
        Properties props = new Properties();

        props.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.rmi.RmiJndiInitialContextFactory");
        props.put(Context.PROVIDER_URL, url);

        System.out.println("Running TopicListener...");

        try
        {
            executor = new PooledExecutor(5);
            // setup JMS...
            Context context = new InitialContext(props);
            Topic topic = (Topic) context.lookup(topicName);

            TopicConnectionFactory factory = (TopicConnectionFactory)
                    context.lookup("JmsTopicConnectionFactory");

            connection = factory.createTopicConnection();
            connection.start();
            // must have separate threads - with a separate session - for
            // every listener
            (new Thread(new TopicListener(0, topic))).start();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

iksrazal
http://www.braziloutsource.com/
 
