JMS学习笔记之P2P

JMS学习笔记之P2P

Point-to-Point

The producer is called a sender and the consumer is called a receiver.

  • msg exchanged via virtual channel - queue
  • each msg only delievered to only one receiver
  • msgs are ordered
  • no coupling of the producers to the consumers

multiple consumers using same queue:

How messages sent to a queue are distributed to the queue’s consumers depends on the policies of the JMS provider. Some JMS providers use load-balancing


2 types of p2p:

fire and forget : logging, kick-off report, trigger event on another process

async request/reply (blocking waiting for reply on reply queue)


point-to-point guarantees that only one consumer will process a given message.

Another advantage is that the point-to-point model provides a QueueBrowser that allows the JMS client to take a snapshot of the queue to see messages waiting to be consumed


QBorrower and QLender Application - request/reply

Screen Shot 2020-07-17 at 4.55.31 PM

Key:

  • Borrower: setJMSReplyTo 为borrower的queue name,发送完消息,使用filter (correlationID)接受消息。
  • Lender: 监听并接收消息,接收消息后发送回复,setJMSCorrelationID为收到的msgID,发送的queue名字为接收消息的header中的replyTo

Borrower

jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager

connectionFactoryNames = QueueCF
queue.LoanRequestQ = jms.LoanRequestQ
queue.LoanResponseQ = jms.LoanResponseQ

code

public class QBorrower {

private QueueConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue requestQueue;
private Queue responseQueue;


public QBorrower(String queueCf, String requestQueue, String responseQueue) {

try {
Context ctx = new InitialContext();
this.queueConnectionFactory = (QueueConnectionFactory) ctx.lookup(queueCf);
this.queueConnection = queueConnectionFactory.createQueueConnection();

// create JMS session
this.queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

// look up request/response queue
this.requestQueue = (Queue) ctx.lookup(requestQueue);
this.responseQueue = (Queue) ctx.lookup(responseQueue);

// start the connection
this.queueConnection.start();

} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}

private void exit() {
try {
this.queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
System.exit(0);

}

private void sendLoanRequest(double salary, double loanAmt) {
try {
MapMessage msg = this.queueSession.createMapMessage();
msg.setDouble("Salary", salary);
msg.setDouble("LoanAmount", loanAmt);
msg.setJMSReplyTo(this.responseQueue);

// create queue sender and send the message
QueueSender queueSender = queueSession.createSender(requestQueue);
queueSender.send(msg);

// wait to see if the loan request was accepted or rejected
String filter = "JMSCorrelationID ='" + msg.getJMSMessageID() + "'";
QueueReceiver queueReceiver = queueSession.createReceiver(responseQueue, filter);
TextMessage receiveMsg = (TextMessage) queueReceiver.receive(30 * 1000);
if (receiveMsg == null) {
System.out.println("QLender not responding");
} else {
System.out.println("Loan request was " + receiveMsg.getText());
}

} catch (JMSException e) {
e.printStackTrace();
System.exit(1);
}
}

public static void main(String[] args) throws Exception {

String queueCf = null;
String requestQ = null;
String responseQ = null;
if (args.length == 3) {
queueCf = args[0];
requestQ = args[1];
responseQ = args[2];
} else {
System.out.println("Invalid arguments, should be:");
System.out.println("java QBorrower factory requestQ responseQ");
System.exit(0);
}

QBorrower borrower = new QBorrower(queueCf, requestQ, responseQ);

BufferedReader stdin = new BufferedReader
(new InputStreamReader(System.in));
System.out.println ("QBorrower Application Started");
System.out.println ("Press enter to quit application");
System.out.println ("Enter: Salary, Loan_Amount");
System.out.println("\ne.g. 50000, 120000");

while (true) {
System.out.println("> ");
String loadRequest = stdin.readLine();
if (loadRequest == null || loadRequest.trim().length() <= 0) {
borrower.exit();
}

StringTokenizer st = new StringTokenizer(loadRequest, ",");
double salary = Double.valueOf(st.nextToken().trim()).doubleValue();
double loanAmt = Double.valueOf(st.nextToken().trim()).doubleValue();

borrower.sendLoanRequest(salary, loanAmt);
}
}

}

Lender

jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager

connectionFactoryNames = QueueCF
queue.LoanRequestQ = jms.LoanRequestQ
queue.LoanResponseQ = jms.LoanResponseQ

code

public class QLender implements MessageListener {

private QueueConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue requestQueue;

public QLender(String queueCF, String requestQueue) {

try {
Context ctx = new InitialContext();
this.queueConnectionFactory = (QueueConnectionFactory) ctx.lookup(queueCF);
this.queueConnection = queueConnectionFactory.createQueueConnection();

// create JMS session
this.queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

// lookup request queue
this.requestQueue = (Queue) ctx.lookup(requestQueue);

// start the connection
this.queueConnection.start();

QueueReceiver queueReceiver = queueSession.createReceiver(this.requestQueue);
queueReceiver.setMessageListener(this);

System.out.println("\nWaiting for loan requests...\n");

} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void onMessage(Message message) {

try {

MapMessage msg = (MapMessage) message;
double salary = msg.getDouble("Salary");
double loanAmt = msg.getDouble("LoanAmount");

// determine whether to accept the load request
boolean accepted = (salary / loanAmt) > .25;
System.out.println("Percentage = " + (salary / loanAmt) + ", loan is " + (accepted ? "Accept!" : "Declined"));

// send result back to borrower
TextMessage responseMsg = queueSession.createTextMessage();
responseMsg.setText(accepted ? "Accept!" : "Declined");
responseMsg.setJMSCorrelationID(msg.getJMSMessageID());

// create sender and send message
QueueSender sender = queueSession.createSender((Queue) msg.getJMSReplyTo());
sender.send(responseMsg);

System.out.println("\nWaiting for loan requests...\n");


} catch (Exception e) {
e.printStackTrace();
}
}

private void exit() {
try {
queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}

System.exit(0);
}

public static void main(String[] args) {
String queueCf = null;
String requestQ = null;
if (args.length == 2) {
queueCf = args[0];
requestQ = args[1];
} else {
System.out.println("Invalid argument, should be:");
System.out.println("java QLender factory request_queue");
System.exit(1);
}

QLender qLender = new QLender(queueCf, requestQ);

// run util enter is pressed
try {
BufferedReader stdin = new BufferedReader
(new InputStreamReader(System.in));
System.out.println ("QLender application started");
System.out.println ("Press enter to quit application");
stdin.readLine();
qLender.exit();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Testing:

  1. cp libs to idea out folder (class file)
  2. java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. QBorrower QueueCF LoanRequestQ LoanResponseQ
  3. java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. QLender QueueCF LoanRequestQ

Message Correlation

Message correlation is required when using the request/reply model of point-to-point messaging where the queue is being shared by multiple producers and consumers

QueueReceiver using filter (correlationID equals to original msgID)

String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);

Dynamic Versus Administered Queues

Dynamic queues are queues that are created through the application source code using a vendor-specific API. Administered queues are queues that are defined in the JMS provider configuration files or administration tools.

Load Balancing Using Multiple Receivers

The JMS specification states that this capability must be implemented by a JMS provider, although it does not define the rules for how the messages are distributed among consumers.

Start another QLender·: java -cp javax.jms_1.1.1.jar:activemq-all-5.15.5.jar:. QLender QueueCF LoanRequestQ

Examining a Queue

Messages obtained from a QueueBrowser are copies of messages contained in the queue and are not considered to be consumed—they are merely for browsing.

It is also important to note that the QueueBrowser is not guaranteed to have a definitive list of messages in the queue: The contents of the queue may change between the time the browser is created and the time you examine its contents.


UseCase

Point-to-point is used when you want one receiver to process any given message once and only once - point-to-point guarantees that only one consumer will process a given message.

Use multple consumer to balance the load.