Preface:
ActiveMQ 最常為人知的是 Asynchronous message 的使用, 事實上它也可以用來實作 request/reply 的 synchronous client/server 架構. 而更之甚者, 它的好處是除了保留 request/reply 的原味外, 又增加了許多好處. 例如你可以在不停止系統下動態的增加 workload 以服務更多的 client. 底下是其實作的概念圖:
底下的範例代碼只說明使用的框架代碼, 實際的商業邏輯就因人而異.
Sample Code:
下面的代碼是來自 ActiveMQ In Action 第七章的 "Implementing request/reply with JMS" 節, 首先要來看的是 Broker 與 Worker 啟動的代碼. 這邊為了簡化 Broker 的啟動流程, 使用的是 Java Embedded 的 Broker; 而在使用 request/reply 的 message 傳輸時必須要有一個類似 TCP 的 session id, 這樣才知道哪個 reply 要配哪個 request, 而這個功能的實現是透過設定 Message 的JMSCorrelationID 屬性. 因為 Message 並不是直接送到 Worker 端, 所以 reply 的 destination 必須要透過 Client 設定 Message 的屬性 JMSReplyTo 得知. 有了這些概念, 就來看看 Broker+Worker 的代碼吧:
- Server.java
接著來看 Client 端的代碼, 在 Client 端一樣有 Consumer/Producer 的 agent, Producer 就是用來發送訊息到 "Request queue"; Consumer 是透過
session.createTemporaryQueue() 建立的 temple queue (名稱並不重要, 只是一個中介), 用來讓 Worker 有個地方可以送回 reply 的 Message. 另外 Client 再送出 request Message 前必須要做兩件事:
如此下面的 Client 代碼便不難了解了:
- Client.java
最後我將上面兩個代碼都封裝到
AMQInAction.jar, 可以在 這裡下載. 最後便是檢驗代碼的成果摟, 先要啟動 Broker+Worker:
接著是啟動 Client 發送 request Message, 並等待回復 Message:
其實這樣的功能簡單使用 Java Socket 的 programming 便可以達到, 但是除此之外它帶來的好處書上有提到:
ActiveMQ 最常為人知的是 Asynchronous message 的使用, 事實上它也可以用來實作 request/reply 的 synchronous client/server 架構. 而更之甚者, 它的好處是除了保留 request/reply 的原味外, 又增加了許多好處. 例如你可以在不停止系統下動態的增加 workload 以服務更多的 client. 底下是其實作的概念圖:
底下的範例代碼只說明使用的框架代碼, 實際的商業邏輯就因人而異.
Sample Code:
下面的代碼是來自 ActiveMQ In Action 第七章的 "Implementing request/reply with JMS" 節, 首先要來看的是 Broker 與 Worker 啟動的代碼. 這邊為了簡化 Broker 的啟動流程, 使用的是 Java Embedded 的 Broker; 而在使用 request/reply 的 message 傳輸時必須要有一個類似 TCP 的 session id, 這樣才知道哪個 reply 要配哪個 request, 而這個功能的實現是透過設定 Message 的JMSCorrelationID 屬性. 因為 Message 並不是直接送到 Worker 端, 所以 reply 的 destination 必須要透過 Client 設定 Message 的屬性 JMSReplyTo 得知. 有了這些概念, 就來看看 Broker+Worker 的代碼吧:
- Server.java
- package ch7;
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.broker.BrokerService;
- public class Server implements MessageListener {
- private String brokerUrl = "tcp://0.0.0.0:61616";
- private String requestQueue = "requests";
- private BrokerService broker;
- private Session session;
- private MessageProducer producer;
- private MessageConsumer consumer;
- public void start() throws Exception {
- createBroker();
- setupConsumer();
- }
- private void createBroker() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector(brokerUrl);
- broker.start();
- }
- private void setupConsumer() throws JMSException {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
- Connection connection;
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination adminQueue = session.createQueue(requestQueue);
- producer = session.createProducer(null);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- consumer = session.createConsumer(adminQueue);
- consumer.setMessageListener(this);
- }
- public void stop() throws Exception {
- producer.close();
- consumer.close();
- session.close();
- broker.stop();
- }
- public void onMessage(Message message) {
- try {
- TextMessage response = this.session.createTextMessage();
- if (message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message;
- String messageText = txtMsg.getText();
- response.setText(handleRequest(messageText));
- }
- response.setJMSCorrelationID(message.getJMSCorrelationID());
- producer.send(message.getJMSReplyTo(), response);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- public String handleRequest(String messageText) {
- return "Response to '" + messageText + "'";
- }
- public static void main(String[] args) throws Exception {
- Server server = new Server();
- server.start();
- System.out.println();
- System.out.println("Press any key to stop the server");
- System.out.println();
- System.in.read();
- server.stop();
- }
- }
如此下面的 Client 代碼便不難了解了:
- Client.java
- package ch7;
- import java.util.UUID;
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- public class Client implements MessageListener {
- private String brokerUrl = "tcp://0.0.0.0:61616";
- private String requestQueue = "requests";
- Connection connection;
- private Session session;
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Destination tempDest;
- public void start() throws JMSException {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- brokerUrl);
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination adminQueue = session.createQueue(requestQueue);
- producer = session.createProducer(adminQueue);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- tempDest = session.createTemporaryQueue();
- consumer = session.createConsumer(tempDest);
- consumer.setMessageListener(this);
- }
- public void stop() throws JMSException {
- producer.close();
- consumer.close();
- session.close();
- connection.close();
- }
- public void request(String request) throws JMSException {
- System.out.println("Requesting: " + request);
- TextMessage txtMessage = session.createTextMessage();
- txtMessage.setText(request);
- txtMessage.setJMSReplyTo(tempDest);
- String correlationId = UUID.randomUUID().toString();
- txtMessage.setJMSCorrelationID(correlationId);
- this.producer.send(txtMessage);
- }
- public void onMessage(Message message) {
- try {
- System.out.println("Received response for: "
- + ((TextMessage) message).getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws Exception {
- Client client = new Client();
- client.start();
- int i = 0;
- while (i++ < 10) {
- client.request("REQUEST-" + i);
- }
- Thread.sleep(3000); //wait for replies
- client.stop();
- }
- }
接著是啟動 Client 發送 request Message, 並等待回復 Message:
其實這樣的功能簡單使用 Java Socket 的 programming 便可以達到, 但是除此之外它帶來的好處書上有提到:
This message was edited 8 times. Last update was at 24/08/2013 10:27:06
沒有留言:
張貼留言