JMS学习笔记之Pub/Sub

JMS学习笔记之Pub/Sub

publish-and-subscribe (pub/sub)

This messaging model is used when you need to broadcast an event or message to many message consumers.

Messages are published to a topic, rather than sent to a queue as in the point-to-point model.

Use cases:

For example: stock price updates, exception or error notifications, and change notifications for a particular data item in a database.


Interface for Topics

Screen Shot 2020-07-30 at 3.42.21 PM

Screen Shot 2020-12-28 at 11.08.54 PM

Lender

jndi

# JNDI environment read by the no-argument `new InitialContext()` call in TLender.
# Initial context factory: ActiveMQ's JNDI implementation.
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
# URL of the broker the client connects to.
java.naming.provider.url = tcp://localhost:61616
# Credentials presented to the broker (stored in plain text here; presumably example values only).
java.naming.security.principal=system
java.naming.security.credentials=manager

# JNDI name under which the connection factory is bound; passed to TLender as the `factory` argument.
connectionFactoryNames = TopicCF
# Format: topic.<jndiName> = <physicalName>. Binds the JNDI name RateTopic to the broker topic jms.RateTopic.
topic.RateTopic = jms.RateTopic

code

/**
 * Publisher side of the pub/sub example: reads rates typed on standard input
 * and publishes each one to a JMS topic as a {@code BytesMessage} containing a
 * single {@code double}.
 *
 * <p>Usage: {@code java TLender factory topic}, where both arguments are JNDI names.
 */
public class TLender {

    private final TopicConnectionFactory topicConnectionFactory;
    private final TopicConnection topicConnection;
    private final TopicSession topicSession;
    private final Topic topic;
    // Created once and reused for every message instead of one publisher per publish call.
    private final TopicPublisher topicPublisher;

    /**
     * Looks up the connection factory and topic in JNDI, opens a connection and a
     * non-transacted, auto-acknowledge session, and starts the connection.
     *
     * @param topicCf   JNDI name of the {@code TopicConnectionFactory}
     * @param topicName JNDI name of the {@code Topic} to publish to
     * @throws IllegalStateException if the JNDI lookups or the JMS setup fail; any
     *         connection that was already opened is closed before the exception is thrown
     */
    public TLender(String topicCf, String topicName) {
        TopicConnection connection = null;
        try {
            Context ctx = new InitialContext();

            // Resolve both JNDI names before opening a connection, so a wrong
            // topic name cannot leave an open connection behind.
            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup(topicCf);
            Topic destination = (Topic) ctx.lookup(topicName);

            connection = factory.createTopicConnection();

            // create JMS session
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = session.createPublisher(destination);

            // start the connection
            connection.start();

            this.topicConnectionFactory = factory;
            this.topicConnection = connection;
            this.topicSession = session;
            this.topic = destination;
            this.topicPublisher = publisher;
        } catch (Exception e) {
            // Fail fast: a half-initialised lender would only produce
            // NullPointerExceptions on every later publish.
            closeQuietly(connection);
            throw new IllegalStateException(
                    "Unable to set up publisher for factory '" + topicCf
                            + "' and topic '" + topicName + "'", e);
        }
    }

    /** Closes the given connection if it is non-null, reporting (not propagating) close failures. */
    private static void closeQuietly(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Closes the JMS connection and terminates the JVM with status 0. */
    private void exit() {
        closeQuietly(this.topicConnection);
        System.exit(0);
    }

    /**
     * Publishes one rate to the topic.
     *
     * @param rate the rate to broadcast; written to the message body as a double
     */
    private void publishRate(double rate) {
        try {
            // create JMS message
            BytesMessage msg = this.topicSession.createBytesMessage();
            msg.writeDouble(rate);

            // publish through the shared publisher
            this.topicPublisher.publish(msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Command-line entry point. Reads one rate per line from standard input and
     * publishes it; an empty line or end of input closes the connection and exits.
     *
     * @param args {@code factory topic} (JNDI names)
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println("java TLender factory topic");
            // Non-zero status so scripts can detect the usage error.
            System.exit(1);
            return;
        }

        TLender tLender;
        try {
            tLender = new TLender(args[0], args[1]);
        } catch (IllegalStateException e) {
            e.printStackTrace();
            System.exit(1);
            return;
        }

        try {
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            System.out.println("TLender Application Started");
            System.out.println("Press enter to quit application");
            System.out.println("Enter: Rate");
            System.out.println("\ne.g. 6.8");

            while (true) {
                System.out.println("> ");
                String rate = stdin.readLine();

                if (rate == null || rate.trim().isEmpty()) {
                    tLender.exit();
                    return;
                }

                try {
                    tLender.publishRate(Double.parseDouble(rate.trim()));
                } catch (NumberFormatException e) {
                    // A typo must not end the session; report it and prompt again.
                    System.out.println("Invalid rate: " + rate);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Close the connection on unexpected failures too, then exit with an error status.
            closeQuietly(tLender.topicConnection);
            System.exit(1);
        }
    }
}

Borrower

jndi

# JNDI environment read by the no-argument `new InitialContext()` call in TBorrower.
# Initial context factory: ActiveMQ's JNDI implementation.
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
# URL of the broker the client connects to.
java.naming.provider.url = tcp://localhost:61616
# Credentials presented to the broker (stored in plain text here; presumably example values only).
java.naming.security.principal=system
java.naming.security.credentials=manager

# JNDI name under which the connection factory is bound; passed to TBorrower as the `factory` argument.
connectionFactoryNames = TopicCF
# Format: topic.<jndiName> = <physicalName>. Binds the JNDI name RateTopic to the broker topic jms.RateTopic.
topic.RateTopic = jms.RateTopic

code

/**
 * Subscriber side of the pub/sub example: listens on a JMS topic for rate
 * updates (a {@code BytesMessage} containing a single {@code double}) and
 * reports whether the new rate is at least one point below the borrower's
 * current rate.
 *
 * <p>Usage: {@code java TBorrower factory topic rate}.
 */
public class TBorrower implements MessageListener {

    private final TopicConnectionFactory topicConnectionFactory;
    private final TopicConnection topicConnection;
    private final TopicSession topicSession;
    private final Topic topic;

    private final double currentRate;

    /**
     * Parses the current rate, looks up the connection factory and topic in JNDI,
     * registers this object as the topic's message listener and starts the connection.
     *
     * @param topicCf     JNDI name of the {@code TopicConnectionFactory}
     * @param topicName   JNDI name of the {@code Topic} to subscribe to
     * @param currentRate the borrower's current rate, as a decimal string
     * @throws IllegalArgumentException if {@code currentRate} is null or not a number
     * @throws IllegalStateException    if the JNDI lookups or the JMS setup fail; any
     *         connection that was already opened is closed before the exception is thrown
     */
    public TBorrower(String topicCf, String topicName, String currentRate) {
        if (currentRate == null) {
            throw new IllegalArgumentException("Current rate must not be null");
        }
        // Validate the rate before touching JMS so bad input never opens a connection.
        try {
            this.currentRate = Double.parseDouble(currentRate);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid current rate: " + currentRate, e);
        }

        TopicConnection connection = null;
        try {
            Context ctx = new InitialContext();

            // Resolve both JNDI names before opening a connection, so a wrong
            // topic name cannot leave an open connection behind.
            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup(topicCf);
            Topic destination = (Topic) ctx.lookup(topicName);

            connection = factory.createTopicConnection();

            // create JMS session
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // create message listener
            TopicSubscriber topicSubscriber = session.createSubscriber(destination);
            topicSubscriber.setMessageListener(this);

            this.topicConnectionFactory = factory;
            this.topicConnection = connection;
            this.topicSession = session;
            this.topic = destination;

            // Start the connection last: delivery to onMessage only begins once
            // the listener and all fields are in place.
            connection.start();

            System.out.println("Waiting for rate updates...");
        } catch (Exception e) {
            closeQuietly(connection);
            throw new IllegalStateException(
                    "Unable to set up subscriber for factory '" + topicCf
                            + "' and topic '" + topicName + "'", e);
        }
    }

    /** Closes the given connection if it is non-null, reporting (not propagating) close failures. */
    private static void closeQuietly(TopicConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Closes the JMS connection and terminates the JVM with status 0. */
    private void exit() {
        closeQuietly(this.topicConnection);
        System.exit(0);
    }

    /**
     * Handles one rate update. Messages that are not a {@code BytesMessage} are
     * reported and skipped rather than cast blindly.
     *
     * @param message the message delivered by the JMS provider
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof BytesMessage)) {
            System.out.println("Ignoring unexpected message: " + message);
            return;
        }

        try {
            double newRate = ((BytesMessage) message).readDouble();

            // if rate is at least 1 point lower than current rate, consider refinancing
            if ((currentRate - newRate) >= 1.0) {
                System.out.println(
                        "New rate = " + newRate + " - Consider refinancing loan");
            } else {
                System.out.println("New rate = " + newRate + " - Keep existing loan");
            }

            System.out.println("\nWaiting for rate updates...");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Command-line entry point. Subscribes to the topic and runs until a line
     * (or end of input) is read from standard input.
     *
     * @param args {@code factory topic rate}
     */
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println("java TBorrower factory topic rate");
            // Non-zero status so scripts can detect the usage error.
            System.exit(1);
            return;
        }

        TBorrower tBorrower;
        try {
            tBorrower = new TBorrower(args[0], args[1], args[2]);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            System.exit(1);
            return;
        } catch (IllegalStateException e) {
            e.printStackTrace();
            System.exit(1);
            return;
        }

        try {
            // Run until enter is pressed
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            System.out.println("TBorrower application started");
            System.out.println("Press enter to quit application");
            stdin.readLine();
            tBorrower.exit();
        } catch (IOException ioe) {
            ioe.printStackTrace();
            // Close the connection on read failures too, then exit with an error status.
            closeQuietly(tBorrower.topicConnection);
            System.exit(1);
        }
    }
}

Testing

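  1. Copy the library JARs (javax.jms and activemq-all) into the IDEA output folder that contains the compiled .class files, and run the commands below from that folder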

  2. java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TLender TopicCF RateTopic

  3. java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TBorrower TopicCF RateTopic 1

  4. java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TBorrower TopicCF RateTopic 5


Durable Vs Non-Durable

Nondurable subscribers receive messages only when they are actively listening on that topic. Otherwise, the message is gone.

Durable subscribers, on the other hand, will receive all messages sent to that topic (depending on the message selectors applied to that subscriber), regardless of whether that subscriber is active or not.

Durable subscriptions follow a “store-and-forward” model, similar to email: messages are held for the subscriber until it reconnects.

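How to create a durable subscriber (the connection must have a client ID, e.g. set with `setClientID(...)` right after the connection is created, so the provider can recognise the subscriber when it reconnects):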

// Creates a durable subscription on `topic` under the subscription name "Borrower1".
// NOTE(review): `tSession` and `topic` are defined outside this snippet; presumably a TopicSession and the looked-up Topic.
TopicSubscriber subscriber = tSession.createDurableSubscriber(topic, "Borrower1");