Friday, August 23, 2013

[ InAction Note ] Ch7. ActiveMQ App - Implementing request/reply with JMS (Note)

Preface:
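ActiveMQ is best known for asynchronous messaging, but it can also be used to implement a synchronous, request/reply style client/server architecture. Beyond preserving the familiar request/reply semantics, this approach brings extra benefits. For example, you can add workers at runtime to serve more clients without stopping the system. Conceptually, the client sends a request to a request queue on the broker, a worker consumes it from that queue, and the reply travels back to the client through a temporary queue that the client created.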


The sample code below shows only the framework code; the actual business logic will vary by application.

Sample Code:
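The code below comes from the "Implementing request/reply with JMS" section in Chapter 7 of ActiveMQ in Action. We start with the code that launches the broker and the worker. To keep broker startup simple, it uses an embedded Java broker. Request/reply messaging needs a session-like identifier so that each reply can be matched to its request; this is done by setting the message's JMSCorrelationID header. And because the client's message does not go directly to the worker, the worker learns where to send the reply from the JMSReplyTo header that the client sets on the message. With these concepts in mind, here is the broker + worker code: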
- Server.java
    package ch7;

    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;

    /**
     * Starts an embedded broker and acts as the worker: consumes requests from
     * the "requests" queue and replies to each message's JMSReplyTo destination.
     */
    public class Server implements MessageListener {
        private String brokerUrl = "tcp://0.0.0.0:61616";
        private String requestQueue = "requests";

        private BrokerService broker;
        private Connection connection;
        private Session session;
        private MessageProducer producer;
        private MessageConsumer consumer;

        public void start() throws Exception {
            createBroker();
            setupConsumer();
        }

        // Embedded, non-persistent broker without JMX, listening on brokerUrl.
        private void createBroker() throws Exception {
            broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(brokerUrl);
            broker.start();
        }

        private void setupConsumer() throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

            // Keep the connection in a field so that stop() can close it.
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination adminQueue = session.createQueue(requestQueue);

            // No fixed destination: each reply goes to the request's JMSReplyTo.
            producer = session.createProducer(null);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            consumer = session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        }

        public void stop() throws Exception {
            producer.close();
            consumer.close();
            session.close();
            connection.close();
            broker.stop();
        }

        public void onMessage(Message message) {
            try {
                Destination replyTo = message.getJMSReplyTo();
                if (replyTo == null) {
                    // Without a reply destination there is nowhere to send the response.
                    System.err.println("Ignoring request without JMSReplyTo");
                    return;
                }

                TextMessage response = this.session.createTextMessage();
                if (message instanceof TextMessage) {
                    TextMessage txtMsg = (TextMessage) message;
                    String messageText = txtMsg.getText();
                    response.setText(handleRequest(messageText));
                }

                // Copy the correlation ID so the client can match reply to request.
                response.setJMSCorrelationID(message.getJMSCorrelationID());

                producer.send(replyTo, response);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        // Placeholder for the real business logic.
        public String handleRequest(String messageText) {
            return "Response to '" + messageText + "'";
        }

        public static void main(String[] args) throws Exception {
            Server server = new Server();
            server.start();

            System.out.println();
            System.out.println("Press any key to stop the server");
            System.out.println();

            System.in.read();

            server.stop();
        }
    }
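The reply routing in Server.onMessage can be tried out without a broker. The following is a minimal in-memory sketch, not JMS code: the `Msg` class and the `BlockingQueue` standing in for the temporary queue are hypothetical stand-ins introduced only for illustration, and the null check on the reply destination is an assumption of this sketch. It shows the two steps the worker performs: copy the correlation ID from the request, then send the response to the request's reply-to destination.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReplyRoutingSketch {

    /** Hypothetical stand-in for a JMS message: a body plus the two headers the pattern relies on. */
    static final class Msg {
        final String text;
        final String correlationId;        // plays the role of JMSCorrelationID
        final BlockingQueue<Msg> replyTo;  // plays the role of JMSReplyTo

        Msg(String text, String correlationId, BlockingQueue<Msg> replyTo) {
            this.text = text;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    /** Same placeholder business logic as Server.handleRequest. */
    static String handleRequest(String text) {
        return "Response to '" + text + "'";
    }

    /** Mirrors the worker's onMessage: build the reply, copy the correlation ID, send to replyTo. */
    static void onMessage(Msg request) {
        if (request.replyTo == null) {
            return; // nowhere to send a reply
        }
        Msg response = new Msg(handleRequest(request.text), request.correlationId, null);
        request.replyTo.add(response);
    }

    public static void main(String[] args) {
        // The client side would create this queue and put it in the request's reply-to header.
        BlockingQueue<Msg> tempQueue = new LinkedBlockingQueue<>();
        onMessage(new Msg("REQUEST-1", "id-1", tempQueue));

        Msg reply = tempQueue.poll();
        System.out.println(reply.correlationId + " -> " + reply.text);
        // prints: id-1 -> Response to 'REQUEST-1'
    }
}
```

Because the worker never hard-codes a reply destination, any number of clients can share the same request queue while each one receives replies on its own queue.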
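Next comes the client code. The client also has a producer and a consumer: the producer sends messages to the request queue, while the consumer listens on a temporary queue created with session.createTemporaryQueue() (its name does not matter; it is only an intermediary), which gives the worker a place to send the reply message. Before sending a request message, the client must do two things:
- Set JMSCorrelationID, so that it can tell which request a received reply belongs to.
- Set JMSReplyTo, so that the worker knows where to send the reply.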

With that, the client code below should be easy to follow:
- Client.java
    package ch7;

    import java.util.UUID;

    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * Sends requests to the "requests" queue and receives the replies
     * asynchronously on a temporary queue.
     */
    public class Client implements MessageListener {
        private String brokerUrl = "tcp://0.0.0.0:61616";
        private String requestQueue = "requests";

        private Connection connection;
        private Session session;
        private MessageProducer producer;
        private MessageConsumer consumer;

        // Temporary queue on which the worker's replies arrive.
        private Destination tempDest;

        public void start() throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination adminQueue = session.createQueue(requestQueue);

            producer = session.createProducer(adminQueue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            tempDest = session.createTemporaryQueue();
            consumer = session.createConsumer(tempDest);

            consumer.setMessageListener(this);
        }

        public void stop() throws JMSException {
            producer.close();
            consumer.close();
            session.close();
            connection.close();
        }

        public void request(String request) throws JMSException {
            System.out.println("Requesting: " + request);
            TextMessage txtMessage = session.createTextMessage();
            txtMessage.setText(request);

            // Tell the worker where to send the reply.
            txtMessage.setJMSReplyTo(tempDest);

            // Unique ID that the worker copies into the reply.
            String correlationId = UUID.randomUUID().toString();
            txtMessage.setJMSCorrelationID(correlationId);
            this.producer.send(txtMessage);
        }

        public void onMessage(Message message) {
            try {
                System.out.println("Received response for: "
                        + ((TextMessage) message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.start();
            int i = 0;
            while (i++ < 10) {
                client.request("REQUEST-" + i);
            }
            Thread.sleep(3000); // wait for replies
            client.stop();
        }
    }
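The client listing sets a JMSCorrelationID on every request, but its onMessage only prints the reply text and never looks at the ID. One possible way to actually match replies to requests is to keep a table of pending requests keyed by correlation ID. The sketch below is an assumption about how that could be done, not code from the book; the `PendingReplies` class and its `register`/`complete` methods are hypothetical names, and it uses only the Java standard library so it runs without a broker.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingReplies {
    // correlation ID -> future that is completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call just before sending a request whose JMSCorrelationID is correlationId. */
    public CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Call from the reply listener with the reply's correlation ID and text.
     * Returns false when no request is waiting for that ID.
     */
    public boolean complete(String correlationId, String replyText) {
        if (correlationId == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(replyText);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        String id1 = UUID.randomUUID().toString();
        String id2 = UUID.randomUUID().toString();
        CompletableFuture<String> first = replies.register(id1);
        CompletableFuture<String> second = replies.register(id2);

        // Replies may arrive in any order; the ID decides which caller gets which reply.
        replies.complete(id2, "Response to 'REQUEST-2'");
        replies.complete(id1, "Response to 'REQUEST-1'");

        System.out.println(first.join());   // prints: Response to 'REQUEST-1'
        System.out.println(second.join());  // prints: Response to 'REQUEST-2'
    }
}
```

In the JMS client, request() would call register(correlationId) before producer.send(...), and onMessage would call complete(message.getJMSCorrelationID(), text). A caller that needs synchronous behavior can then block on the returned future, ideally with a timeout.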
Finally, I packaged both classes into AMQInAction.jar, which can be downloaded here. Now to verify the result, first start the broker + worker:
# java -cp AMQInAction.jar ch7.Server
Press any key to stop the server

Then start the client, which sends the request messages and waits for the replies:
# java -cp AMQInAction.jar ch7.Client
Requesting: REQUEST-1
Requesting: REQUEST-2
Requesting: REQUEST-3
Requesting: REQUEST-4
...
Requesting: REQUEST-10
Received response for: Response to 'REQUEST-2'
Received response for: Response to 'REQUEST-3'
Received response for: Response to 'REQUEST-4'
Received response for: Response to 'REQUEST-5'
...

The same functionality could be achieved with plain Java socket programming, but the book points out the additional benefits this approach brings:
Using the request/reply pattern, envision that there are thousands of requests entering the broker every second from many clients, all distributed across many hosts. In a production system, more than just a single broker instance would be used for the purposes of redundancy, failover, and load balancing. These brokers would also be distributed across many hosts. The only way to handle this many requests would be to use many workers. Producers can always send messages much faster than a consumer can receive and process them, so lots of workers would be needed, all of them spread out across many hosts as well. The advantage of using many workers is that each one can go up and down at will, and the overall system itself isn’t affected. The producers and workers would continue to process messages, and even if one of them crashed, it wouldn’t affect the system. This is exactly how many large-scale systems can handle such a tremendous load—through the use of asynchronous messaging like that demonstrated by the request/reply pattern.
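The idea of many workers sharing one request queue can be illustrated with a small in-memory sketch. This is not ActiveMQ code: a `LinkedBlockingQueue` stands in for the request queue, threads stand in for workers on separate hosts, and the `CompetingWorkers` class, its `run` method, and the stop marker are hypothetical names chosen for this example. Each request is taken by exactly one worker, so changing the number of workers changes throughput without changing the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CompetingWorkers {
    // Marker that tells a worker to shut down; one is enqueued per worker.
    private static final String STOP = "__STOP__";

    /** Processes requestCount requests with workerCount workers and returns all replies. */
    static List<String> run(int workerCount, int requestCount) throws InterruptedException {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        Queue<String> replies = new ConcurrentLinkedQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String request = requests.take(); // each request goes to one worker only
                        if (request.equals(STOP)) {
                            return;
                        }
                        replies.add("Response to '" + request + "'");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        // The producer does not know or care how many workers exist.
        for (int i = 1; i <= requestCount; i++) {
            requests.put("REQUEST-" + i);
        }
        for (int w = 0; w < workerCount; w++) {
            requests.put(STOP);
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return new ArrayList<>(replies);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> replies = run(3, 10);
        System.out.println(replies.size() + " replies from 3 workers");
        // prints: 10 replies from 3 workers
    }
}
```

The order of the replies is not deterministic, which is one more reason a real client needs the correlation ID to pair each reply with its request.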

This message was edited 8 times. Last update was at 24/08/2013 10:27:06
