JMS学习笔记之Pub/Sub
publish-and-subscribe (pub/sub)
This messaging model is used when you need to broadcast an event or message to many message consumers.
Messages are published to a topic, as opposed to being sent to a queue as in the point-to-point model.
Use cases:
E.g., stock price updates, exception or error notifications, and change notifications for a particular data item in the database.
Interface for Topics
Screen Shot 2020-07-30 at 3.42.21 PM
Screen Shot 2020-12-28 at 11.08.54 PM
Lender
jndi
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal = system
java.naming.security.credentials = manager
connectionFactoryNames = TopicCF
topic.RateTopic = jms.RateTopic
code
/**
 * Lender side of the pub/sub example: reads rates from standard input and
 * publishes each one to a JMS topic as a {@code BytesMessage} holding a single double.
 *
 * <p>Usage: {@code java TLender factory topic}, where both arguments are JNDI names.
 */
public class TLender {

    private TopicConnectionFactory topicConnectionFactory;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private Topic topic;
    // Created once and reused for every message instead of being re-created
    // (and never closed) on each publish.
    private TopicPublisher topicPublisher;

    /**
     * Looks up the connection factory and topic through JNDI, opens a connection
     * and a non-transacted, auto-acknowledge session, and starts the connection.
     *
     * <p>If any setup step fails, the stack trace is printed and the JVM exits
     * with status 1, because the application cannot publish without a connection.
     *
     * @param topicCf   JNDI name of the topic connection factory
     * @param topicName JNDI name of the topic to publish to
     */
    public TLender(String topicCf, String topicName) {
        try {
            Context ctx = new InitialContext();
            this.topicConnectionFactory = (TopicConnectionFactory) ctx.lookup(topicCf);
            this.topicConnection = topicConnectionFactory.createTopicConnection();
            this.topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            this.topic = (Topic) ctx.lookup(topicName);
            this.topicPublisher = topicSession.createPublisher(this.topic);
            this.topicConnection.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Release a connection that was opened before the failure occurred.
            if (this.topicConnection != null) {
                try {
                    this.topicConnection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
            System.exit(1);
        }
    }

    /** Closes the topic connection and terminates the JVM with status 0. */
    private void exit() {
        try {
            this.topicConnection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    /**
     * Publishes one rate to the topic.
     *
     * @param rate the rate to write into the message body as a double
     */
    private void publishRate(double rate) {
        try {
            BytesMessage msg = this.topicSession.createBytesMessage();
            msg.writeDouble(rate);
            this.topicPublisher.publish(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point: validates the arguments, then publishes every rate typed on
     * standard input until an empty line (or end of input) is read.
     *
     * @param args {@code factory topic}
     */
    public static void main(String[] args) {
        String topicCf = null;
        String topicName = null;
        if (args.length == 2) {
            topicCf = args[0];
            topicName = args[1];
        } else {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println("java TLender factory topic");
            System.exit(0);
        }

        TLender tLender = new TLender(topicCf, topicName);
        try {
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            System.out.println("TLender Application Started");
            System.out.println("Press enter to quit application");
            System.out.println("Enter: Rate");
            System.out.println("\ne.g. 6.8");

            while (true) {
                System.out.println("> ");
                String rate = stdin.readLine();
                if (rate == null || rate.trim().length() <= 0) {
                    tLender.exit();
                }
                try {
                    tLender.publishRate(Double.parseDouble(rate));
                } catch (NumberFormatException nfe) {
                    // A typo must not end the session with the connection still open;
                    // report it and prompt again.
                    System.out.println("Invalid rate: " + rate);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Borrower
jndi
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal = system
java.naming.security.credentials = manager
connectionFactoryNames = TopicCF
topic.RateTopic = jms.RateTopic
code
public class TBorrower implements MessageListener { private TopicConnectionFactory topicConnectionFactory; private TopicConnection topicConnection; private TopicSession topicSession; private Topic topic; private double currentRate; public TBorrower (String topicCf, String topicName, String currentRate) { try { this .currentRate = Double.valueOf(currentRate); Context ctx = new InitialContext(); this .topicConnectionFactory = (TopicConnectionFactory) ctx.lookup(topicCf); this .topicConnection = topicConnectionFactory.createTopicConnection(); this .topicSession = topicConnection.createTopicSession(false , Session.AUTO_ACKNOWLEDGE); this .topic = (Topic) ctx.lookup(topicName); TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(this ); topicConnection.start(); System.out.println("Waiting for loan requests..." ); } catch (Exception e) { e.printStackTrace(); } } private void exit () { try { this .topicConnection.close(); } catch (JMSException e) { e.printStackTrace(); } System.exit(0 ); } @Override public void onMessage (Message message) { try { BytesMessage msg = (BytesMessage) message; double newRate = msg.readDouble(); if ((currentRate - newRate) >= 1.0 ) { System.out.println( "New rate = " + newRate + " - Consider refinancing loan" ); } else { System.out.println("New rate = " + newRate + " - Keep existing loan" ); } System.out.println("\nWaiting for rate updates..." ); } catch (Exception e) { e.printStackTrace(); } } public static void main (String[] args) { String topicCf = null ; String topicName = null ; String rate = null ; if (args.length == 3 ) { topicCf = args[0 ]; topicName = args[1 ]; rate = args[2 ]; } else { System.out.println("Invalid arguments. 
Should be: " ); System.out.println("java TBorrower factory topic rate" ); System.exit(0 ); } TBorrower tBorrower = new TBorrower(topicCf, topicName, rate); try { BufferedReader stdin = new BufferedReader (new InputStreamReader(System.in)); System.out.println("TBorrower application started" ); System.out.println("Press enter to quit application" ); stdin.readLine(); tBorrower.exit(); } catch (IOException ioe) { ioe.printStackTrace(); } } }
Testing
Copy the library JARs into the IDEA output folder (the one containing the compiled class files), then run the following from that folder:
java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TLender TopicCF RateTopic
java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TBorrower TopicCF RateTopic 1
java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. TBorrower TopicCF RateTopic 5
Durable Vs Non-Durable
Nondurable subscribers receive messages only when they are actively listening on that topic. Otherwise, the message is gone.
Durable subscribers, on the other hand, will receive all messages sent to that topic (depending on the message selectors applied to that subscriber), regardless of whether that subscriber is active or not.
This is the “store-and-forward” approach, similar to email: messages are kept for a durable subscriber while it is inactive.
How to create a durable subscriber:
// Creates a durable subscriber on the topic; the second argument ("Borrower1") is the subscription name.
TopicSubscriber subscriber = tSession.createDurableSubscriber(topic, "Borrower1" );