Threads in JMS listeners?


iksrazal

Could someone please post a working example of using threads and JMS,
of either a queue or topic listener? I searched Google a lot, and have
a book with vague examples, neither of which has got me very far.

Thanks,
iksrazal
 
Sudsy

iksrazal said:
Could someone please post a working example of using threads and JMS,
of either a queue or topic listener? I searched Google a lot, and have
a book with vague examples, neither of which has got me very far.

If you're using message-driven beans then the question is moot. Your
server should manage the bean pool and create or delete instances as
required. Why reinvent the wheel?
 
iksrazal

Sudsy said:
If you're using message-driven beans then the question is moot. Your
server should manage the bean pool and create or delete instances as
required. Why reinvent the wheel?

Nope, not using EJB. Anyway, you can't use the synchronized keyword in
bean classes.

What I'm trying to do is thread the listener of a pure JMS
implementation, which is legal but tricky. As I understand it, each
thread must have its own TopicSession/QueueSession. And I'd
like to avoid reinventing the wheel, if possible, by following some
code from someone who knows the topic better than I do.

Thanks,
iksrazal
 
iksrazal

To answer my own question, and to help anyone who's trying to do the
same thing, this is how it turned out:

package com.infoseg.mr.reciever;

import javax.jms.*;
import java.io.*;
import java.util.*;
import java.net.*;
import javax.naming.*;

import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;

public class TopicObserver
{
public static TopicConnection connection;
private static String topicName = "topic1";
private static PooledExecutor executor;

//Inner class
static public class TopicListener implements MessageListener,
Runnable
{
private int threadCount = 0;
private int index = 0;
private Topic topic;
private SecureWSInvoker si = SecureWSInvoker.getInstance();

public TopicListener(int _tc, Topic _t)
{
this.threadCount = _tc;
this.topic = _t;
}

public void onMessage(Message message)
{
try
{
System.out.println("["+this.threadCount+"]Listener: ACCEPTED message");
TextMessage textMessage = (TextMessage)message;
System.out.println("received: " + textMessage.getText());
//Worker thread from thread pool calls web service
executor.execute(new WSClientInvoker (this.index,
textMessage.getText()));
//String resposta = this.si.getData(textMessage.getText());
//System.out.println(" received from web service: " + resposta);
}
catch (Exception e)
{
e.printStackTrace();
}
}

public void run()
{
try
{
synchronized(this)
{
this.index++;
}
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSubscriber topicSubscriber =
session.createSubscriber(this.topic);
topicSubscriber.setMessageListener(this);

while (true)
{
Thread.sleep(500); // keep the thread alive so that the
// session doesn't get garbage-collected
}
}
catch( Exception e )
{
e.printStackTrace();
}
}
}

//Inner class
static public class WSClientInvoker implements Runnable
{
private int id = 0;
private String message;
private SecureWSInvoker si = SecureWSInvoker.getInstance();

public WSClientInvoker (int _tc, String _msg)
{
this.id = _tc;
this.message = _msg;
}

public void run()
{
System.out.println(this.id + "@" +
Thread.currentThread().getName() + " {" );
try
{
String resposta = this.si.getData(this.message);
System.out.println(this.id + "@" +
Thread.currentThread().getName() + " received from web service: " +
resposta);
}
catch( Exception e )
{
e.printStackTrace();
}
System.out.println("} " + this.id + "@" +
Thread.currentThread().getName());
}
}

public static void main (String args[])
{
RecieverInit init = new RecieverInit();

String url = "rmi://localhost:1099/JndiServer";
Properties props = new Properties();

props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.rmi.RmiJndiInitialContextFactory");
props.put(Context.PROVIDER_URL, url);

System.out.println ("Running TopicListener...");

try
{
executor = new PooledExecutor(5);
//setup JMS...
Context context = new InitialContext(props);
Topic topic = (Topic) context.lookup(topicName);

TopicConnectionFactory factory = (TopicConnectionFactory)
context.lookup("JmsTopicConnectionFactory");

connection = factory.createTopicConnection();
connection.start();
// must have separate threads - with a separate session - for
// every listener
(new Thread(new TopicListener(0, topic))).start();
}
catch ( Exception e )
{
e.printStackTrace();
}
}
}
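
For anyone reading this on Java 8 or later: the util.concurrent package used above was the basis for the standard java.util.concurrent, so the same hand-off (the listener returns quickly, a pool thread does the slow call) can be sketched without the extra jar. The sketch below is only an illustration under assumptions, not the code from the post: it leaves JMS out entirely so that it runs on its own, and the names HandOffSketch, Listener, slowCall and process are hypothetical stand-ins for the MessageListener, the web service call and the setup code. It also waits on a CountDownLatch rather than looping on Thread.sleep.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandOffSketch {

    // Hypothetical stand-in for a JMS MessageListener (no JMS classes used).
    static class Listener {
        private final ExecutorService workers;
        private final List<String> results;
        private final CountDownLatch done;

        Listener(ExecutorService workers, List<String> results, CountDownLatch done) {
            this.workers = workers;
            this.results = results;
            this.done = done;
        }

        // Plays the role of onMessage(): it only queues the work and returns,
        // so the delivering thread is never blocked by the slow call.
        void onMessage(String text) {
            workers.execute(() -> {
                try {
                    results.add(slowCall(text));
                } finally {
                    done.countDown(); // signal that this message is finished
                }
            });
        }
    }

    // Hypothetical placeholder for the web service call in the post.
    static String slowCall(String text) {
        return "reply to " + text;
    }

    // Feeds the messages to the listener and waits for all workers to finish.
    static List<String> process(List<String> messages) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(5);
        List<String> results = new CopyOnWriteArrayList<>(); // safe for concurrent adds
        CountDownLatch done = new CountDownLatch(messages.size());
        Listener listener = new Listener(workers, results, done);

        for (String m : messages) {
            listener.onMessage(m);
        }

        // Block until every task has counted down, instead of a sleep loop.
        boolean finished = done.await(10, TimeUnit.SECONDS);
        workers.shutdown();
        if (!finished) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> replies = process(Arrays.asList("a", "b", "c"));
        System.out.println(replies.size() + " replies");
    }
}
```

In a real listener the latch would more likely be counted down from a shutdown hook or an admin command, so the main thread stays parked until the program is asked to stop. The replies come back in no particular order, since the pool threads run concurrently.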
 
