程式扎記: [ Java設計模式 ] 多線程設計模式 : Worker Thread Pattern (等到工作來, 來了就工作)

標籤

2010年10月3日 星期日

[ Java設計模式 ] 多線程設計模式 : Worker Thread Pattern (等到工作來, 來了就工作)


前言 :
這是某個工廠. 這裡工人要組裝塑膠玩具模型. 工作的委託人會將塑膠玩具模型盒子搬去工廠, 堆放在桌上. 工人必須逐個組裝收到的塑膠玩具模型. 工人首先會去拿桌上堆放的模型盒子, 並加以組裝. 做完工作的工人, 就去拿下一個盒子來組合. 如果盒子都拿完了, 就等待委託人再拿盒子過來.
這章要學習的是 Worker Thread PatternWorker Thread Pattern 中, 工人線程會依次抓一件工作來處理. 當沒有工作時, 工人線程會停下來等待新的工作過來. Worker Thread 也有人稱 Background Thread. 另外也有人把重點放在管理工人線程的地方, 稱為 Thread Pool.

Worker Thread 所有參予者 :
* Client (委託人) 參予者 :
Client 參予者會建立 Request 參予者. 傳給 Channel 參予者. 範例程序中 Client 參予者是 ClientThread 類.

* Channel (通道) 參予者 : 
Channel 參予者會從Client 參予者獲取 Request 參予者並傳給 Worker 參予者. 範例程序中, Channel 參予者是 Channel 類.

* Worker 參予者 :
Worker 參予者會從 Channel 參予者獲取 Request 參予者, 並執行這份工作. 當工作結束後, 再去拿下一個 Request參予者. 範例程序中 Worker 參予者是 WorkerThread 類.

* Request 參予者 :
Request 參予者用來表示工作. Request 參予者會存放執行這份工作所需要的數據, 範例程序中. Reqest 參予者是 Request 類.

* Worker Thread 示意圖 :


範例程式 :
範例程式操作如下 :
ClientThread 線程會對 Channel 類發出工作請求. 而Channel 類實例可以設定 WorkerThread 的數目來處理工作請求. 當工作來臨時, WorkerThread 會從Channel 抓取一件請求 Request 來處理, 處理完後就回到Channel 等待下一個工作.

* Main 類 :
用來建立一個擁有 5 個 WorkerThread 的Pool 的 Channel 物件與 ClientThread 物件 (Alice, Bob) 來共享這個 Channel 實例. 並在30 秒後停止Channel 的 WorkerThread, 代碼如下 :
  1. package dp.thread.ch8;  
  2.   
  3. public class Main {  
  4.     public static void main(String args[]) {  
  5.         int runningTime = 10// 單位秒  
  6.         Channel channel = new Channel(5);  
  7.         channel.startWorkers();  
  8.         new ClientThread("Alice", channel,10).start();  
  9.         new ClientThread("Bob",channel,10).start();  
  10.         try{  
  11.             Thread.sleep(runningTime * 1000);  
  12.             System.out.println("Terminate channel now...");  
  13.             channel.stopWorkers();  
  14.         }catch(InterruptedException e) {  
  15.             //e.printStackTrace();  
  16.         }  
  17.     }  
  18. }  
* ClientThread 類 :
ClientThread 類用來送出工作請求 (Request 類) , 代碼如下 :
  1. package dp.thread.ch8;  
  2.   
  3. import java.util.Random;  
  4.   
  5. public class ClientThread extends Thread{  
  6.     private final Channel channel;  
  7.     private static final Random random = new Random();  
  8.     private int workCount = 50;  
  9.       
  10.     public ClientThread(String name, Channel channel, int wc) {  
  11.         super(name);  
  12.         this.channel = channel;  
  13.         if(wc > 0) {  
  14.             workCount = wc;  
  15.         }  
  16.     }  
  17.   
  18.     public void run(){  
  19.         try {  
  20.             for(int i=0;i
  21.                 Request request = new Request(this.getName(), i);  
  22.                 channel.putRequest(request);  
  23.                 System.out.println("Put Request:"+request.toString());  
  24.                 Thread.sleep(random.nextInt(1000));  
  25.             }              
  26.             System.out.println("==========Requests are all put from "+this.getName()+"==========");  
  27.         } catch( InterruptedException e) {  
  28.             e.printStackTrace();  
  29.         }          
  30.     }  
  31. }  
* Request 類 :
用來表示工作請求, 該類別擁有 name 字段用來代表委託者 (ClientThread) 的名稱. 而 number 字段 則是該項工作請求的序號. 代碼如下 :
  1. package dp.thread.ch8;  
  2.   
  3. import java.util.Random;  
  4.   
  5. public class Request {  
  6.     private final String name;  //委託者  
  7.     private final int number;  //請求編號  
  8.     private static final Random random = new Random();  
  9.   
  10.     public Request(String name, int num){  
  11.         this.name = name;  
  12.         this.number = num;  
  13.     }  
  14.   
  15.     public void execute(){  
  16.         System.out.println(Thread.currentThread().getName()+" executes "+this);  
  17.         try{  
  18.             Thread.sleep(random.nextInt(1000));  
  19.         }catch (InterruptedException e) {  
  20.             e.printStackTrace();  
  21.         }  
  22.     }  
  23.   
  24.     public String toString(){  
  25.         return "[Request from "+name+ " No."+number+"]";  
  26.     }      
  27. }  
* Channel 類 :
Channel 類用來接受與傳送工作請求, 並產生保存 WorkerThread 的 Pool. 代碼如下 :
  1. package dp.thread.ch8;  
  2.   
  3. public class Channel {  
  4.     private static final int MAX_REQUEST =1000;  
  5.     private final Request[] requestQueue;  
  6.     private int tail;    //下一個 putRequest的地方  
  7.     private int head;    //下一個 takeRequest的地方  
  8.     private int count;   //Request的數量  
  9.     private boolean isStop = false;  
  10.   
  11.     private final WorkerThread[] threadPool;  
  12.   
  13.     public Channel(int threads) {  
  14.         this.requestQueue = new Request[MAX_REQUEST];  
  15.         this.head = 0;  
  16.         this.tail = 0;  
  17.         this.count = 0;  
  18.   
  19.         threadPool = new WorkerThread[threads];  
  20.         for(int i=0;i
  21.             threadPool[i] = new WorkerThread("Worker-"+i,this);  
  22.         }  
  23.     }  
  24.   
  25.     public void startWorkers(){  
  26.         for(int i=0;i
  27.             threadPool[i].start();  
  28.             System.out.println("Start Worker:"+threadPool[i].getName());  
  29.         }  
  30.     }  
  31.   
  32.     public synchronized void stopWorkers() {  
  33.         isStop = true;  
  34.         int tcount = 0;  
  35.         for(int i=0;i
  36.             System.out.println("Stop Worker:"+threadPool[i].getName());  
  37.             tcount += threadPool[i].stopWork();  
  38.         }  
  39.         System.out.println("**********Total completed "+tcount+"**********");  
  40.         notifyAll();  
  41.     }  
  42.   
  43.     public synchronized boolean isRequestEmpty(){  
  44.         System.out.println("There are still "+count+" Request left...");  
  45.         if(count==0){  
  46.             return true;  
  47.         } else {              
  48.             return false;  
  49.         }         
  50.     }  
  51.   
  52.     public synchronized void putRequest(Request request) {  
  53.         while(count > requestQueue.length) {  
  54.             try{  
  55.                 wait();  
  56.             }catch(InterruptedException e) {  
  57.                // e.printStackTrace();  
  58.             }  
  59.         }  
  60.         requestQueue[tail] = request;  
  61.         tail = (tail + 1) % requestQueue.length;  
  62.         count++;  
  63.         notifyAll();  
  64.     }  
  65.   
  66.     public synchronized Request takeRequest() {          
  67.         while (count <=0 && !isStop) {  
  68.             try{  
  69.                 wait();  
  70.             } catch(InterruptedException e) {  
  71.                 //e.printStackTrace();  
  72.             }  
  73.         }  
  74.         if(isStop) {  
  75.             return null;  
  76.         }  
  77.           
  78.         Request request = requestQueue[head];  
  79.         head = (head + 1) % requestQueue.length;  
  80.         count--;  
  81.         notifyAll();  
  82.         return request;  
  83.     }  
  84. }  
* WorkerThread 類 :
WorkerThread 用來"執行/處理工作" 的操作. 代碼如下 :
  1. package dp.thread.ch8;  
  2.   
  3. public class WorkerThread extends Thread{  
  4.     private final Channel channel;  
  5.     private boolean isStop = false;  
  6.     private int count = 0;  
  7.   
  8.     WorkerThread(String name, Channel c) {  
  9.         super(name);  
  10.         this.channel = c;  
  11.     }  
  12.   
  13.     public void run(){  
  14.         while(!isStop) {  
  15.             Request request = channel.takeRequest();  
  16.             if(request!=null) {  
  17.                 request.execute();  
  18.                 count ++;  
  19.             }  
  20.             if(isStop) {  
  21.                 return;  
  22.             }  
  23.         }  
  24.           
  25.     }  
  26.   
  27.     public int stopWork(){  
  28.         isStop = true;  
  29.          System.out.println(getName()+" finished "+count+ " requests and leave now!");  
  30.         interrupt();  
  31.         return count;  
  32.     }  
  33. }  



執行結果 :
Start Worker:Worker-0
Start Worker:Worker-1
Start Worker:Worker-2
Start Worker:Worker-3
Start Worker:Worker-4
Put Request:[Request from Alice No.0]
Worker-3 executes [Request from Alice No.0]
...(中間省略)...
Worker-2 executes [Request from Alice No.9]
Put Request:[Request from Alice No.9]
==========Requests are all put from Alice==========
Put Request:[Request from Bob No.6]
Worker-0 executes [Request from Bob No.6]
Put Request:[Request from Bob No.7]
Worker-0 executes [Request from Bob No.7]
Put Request:[Request from Bob No.8]
Worker-0 executes [Request from Bob No.8]
Put Request:[Request from Bob No.9]
Worker-2 executes [Request from Bob No.9]
==========Requests are all put from Bob==========
Terminate channel now...
Stop Worker:Worker-0
Worker-0 finished 8 requests and leave now!
Stop Worker:Worker-1
Worker-1 finished 2 requests and leave now!
Stop Worker:Worker-2
Worker-2 finished 3 requests and leave now!
Stop Worker:Worker-3
Worker-3 finished 4 requests and leave now!
Stop Worker:Worker-4
Worker-4 finished 3 requests and leave now!
**********Total completed 20**********

Ps. 將Worker-0 ~ Worker4 的 finished requests 加起來應該等於20.

補充說明 :
@. 啟動線程是繁重的操作: 如果可以把自己的工作交給別人, 自己就可以去做其他事情. 線程也一樣, 這是 Thread-Per-Message Pattern的內容. 但是啟動線程是花費時間的工作, 所以 Worker Thread 將重複利用線程, 而資源再利用也是提升效能的考量之一.
@. 控制承載量: Work Thread 還有一個主題, 就是承載量 (capacity) 的控制. 提高 Worker 參予者的數量, 可以提高併發處理的工作量. 但如果準備的Worker 大部分時間都在等待時, 不但不會工作還會占用內存, 因此根據 Request 的量來動態調整 Worker的數量也是提升效能的方法之一.
@. Innovation 與 execution 的分離: 直覺來看, innovation 與 execution 通常是在一個動作裡完成. 但在 Worker Thread Pattern 與 Thread-Per-Message Pattern 卻將其分開, 即雖然工作已經被分配, 但是工作的執行可以按照我們的策略安排執行. 這樣的好處可以用以下幾點說明 :
提高響應性 : innovation 不會受到 execution 緩慢影響.
控制執行順率 : 工作可以依照設計順序執行.
可以取消與重複執行 : 可以參照Command 設計模式.
分散處理的第一步 : 可以多台主機參與執行
This message was edited 5 times. Last update was at 11/01/2010 14:31:02

沒有留言:

張貼留言

網誌存檔