前言 :
這是某個工廠. 這裡工人要組裝塑膠玩具模型. 工作的委託人會將塑膠玩具模型盒子搬去工廠, 堆放在桌上. 工人必須逐個組裝收到的塑膠玩具模型. 工人首先會去拿桌上堆放的模型盒子, 並加以組裝. 做完工作的工人, 就去拿下一個盒子來組合. 如果盒子都拿完了, 就等待委託人再拿盒子過來.
這章要學習的是 Worker Thread Pattern. Worker Thread Pattern 中, 工人線程會依次抓一件工作來處理. 當沒有工作時, 工人線程會停下來等待新的工作過來. Worker Thread 也有人稱 Background Thread. 另外也有人把重點放在管理工人線程的地方, 稱為 Thread Pool.
Worker Thread 所有參予者 :
* Client (委託人) 參予者 :
* Channel (通道) 參予者 :
* Worker 參予者 :
* Request 參予者 :
* Worker Thread 示意圖 :
範例程式 :
範例程式操作如下 :
* Main 類 :
用來建立一個擁有 5 個 WorkerThread 的Pool 的 Channel 物件與 ClientThread 物件 (Alice, Bob) 來共享這個 Channel 實例. 並在30 秒後停止Channel 的 WorkerThread, 代碼如下 :
* ClientThread 類 :
ClientThread 類用來送出工作請求 (Request 類) , 代碼如下 :
* Request 類 :
用來表示工作請求, 該類別擁有 name 字段用來代表委託者 (ClientThread) 的名稱. 而 number 字段 則是該項工作請求的序號. 代碼如下 :
* Channel 類 :
Channel 類用來接受與傳送工作請求, 並產生保存 WorkerThread 的 Pool. 代碼如下 :
* WorkerThread 類 :
WorkerThread 用來"執行/處理工作" 的操作. 代碼如下 :
執行結果 :
Ps. 將Worker-0 ~ Worker4 的 finished requests 加起來應該等於20.
補充說明 :
@. 啟動線程是繁重的操作: 如果可以把自己的工作交給別人, 自己就可以去做其他事情. 線程也一樣, 這是 Thread-Per-Message Pattern的內容. 但是啟動線程是花費時間的工作, 所以 Worker Thread 將重複利用線程, 而資源再利用也是提升效能的考量之一.
@. 控制承載量: Work Thread 還有一個主題, 就是承載量 (capacity) 的控制. 提高 Worker 參予者的數量, 可以提高併發處理的工作量. 但如果準備的Worker 大部分時間都在等待時, 不但不會工作還會占用內存, 因此根據 Request 的量來動態調整 Worker的數量也是提升效能的方法之一.
@. Innovation 與 execution 的分離: 直覺來看, innovation 與 execution 通常是在一個動作裡完成. 但在 Worker Thread Pattern 與 Thread-Per-Message Pattern 卻將其分開, 即雖然工作已經被分配, 但是工作的執行可以按照我們的策略安排執行. 這樣的好處可以用以下幾點說明 :
這是某個工廠. 這裡工人要組裝塑膠玩具模型. 工作的委託人會將塑膠玩具模型盒子搬去工廠, 堆放在桌上. 工人必須逐個組裝收到的塑膠玩具模型. 工人首先會去拿桌上堆放的模型盒子, 並加以組裝. 做完工作的工人, 就去拿下一個盒子來組合. 如果盒子都拿完了, 就等待委託人再拿盒子過來.
這章要學習的是 Worker Thread Pattern. Worker Thread Pattern 中, 工人線程會依次抓一件工作來處理. 當沒有工作時, 工人線程會停下來等待新的工作過來. Worker Thread 也有人稱 Background Thread. 另外也有人把重點放在管理工人線程的地方, 稱為 Thread Pool.
Worker Thread 所有參予者 :
* Client (委託人) 參予者 :
* Channel (通道) 參予者 :
* Worker 參予者 :
* Request 參予者 :
* Worker Thread 示意圖 :
範例程式 :
範例程式操作如下 :
* Main 類 :
用來建立一個擁有 5 個 WorkerThread 的Pool 的 Channel 物件與 ClientThread 物件 (Alice, Bob) 來共享這個 Channel 實例. 並在30 秒後停止Channel 的 WorkerThread, 代碼如下 :
- package dp.thread.ch8;
- public class Main {
- public static void main(String args[]) {
- int runningTime = 10; // 單位秒
- Channel channel = new Channel(5);
- channel.startWorkers();
- new ClientThread("Alice", channel,10).start();
- new ClientThread("Bob",channel,10).start();
- try{
- Thread.sleep(runningTime * 1000);
- System.out.println("Terminate channel now...");
- channel.stopWorkers();
- }catch(InterruptedException e) {
- //e.printStackTrace();
- }
- }
- }
ClientThread 類用來送出工作請求 (Request 類) , 代碼如下 :
- package dp.thread.ch8;
- import java.util.Random;
- public class ClientThread extends Thread{
- private final Channel channel;
- private static final Random random = new Random();
- private int workCount = 50;
- public ClientThread(String name, Channel channel, int wc) {
- super(name);
- this.channel = channel;
- if(wc > 0) {
- workCount = wc;
- }
- }
- public void run(){
- try {
- for(int i=0;i
- Request request = new Request(this.getName(), i);
- channel.putRequest(request);
- System.out.println("Put Request:"+request.toString());
- Thread.sleep(random.nextInt(1000));
- }
- System.out.println("==========Requests are all put from "+this.getName()+"==========");
- } catch( InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
用來表示工作請求, 該類別擁有 name 字段用來代表委託者 (ClientThread) 的名稱. 而 number 字段 則是該項工作請求的序號. 代碼如下 :
- package dp.thread.ch8;
- import java.util.Random;
- public class Request {
- private final String name; //委託者
- private final int number; //請求編號
- private static final Random random = new Random();
- public Request(String name, int num){
- this.name = name;
- this.number = num;
- }
- public void execute(){
- System.out.println(Thread.currentThread().getName()+" executes "+this);
- try{
- Thread.sleep(random.nextInt(1000));
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public String toString(){
- return "[Request from "+name+ " No."+number+"]";
- }
- }
Channel 類用來接受與傳送工作請求, 並產生保存 WorkerThread 的 Pool. 代碼如下 :
- package dp.thread.ch8;
- public class Channel {
- private static final int MAX_REQUEST =1000;
- private final Request[] requestQueue;
- private int tail; //下一個 putRequest的地方
- private int head; //下一個 takeRequest的地方
- private int count; //Request的數量
- private boolean isStop = false;
- private final WorkerThread[] threadPool;
- public Channel(int threads) {
- this.requestQueue = new Request[MAX_REQUEST];
- this.head = 0;
- this.tail = 0;
- this.count = 0;
- threadPool = new WorkerThread[threads];
- for(int i=0;i
- threadPool[i] = new WorkerThread("Worker-"+i,this);
- }
- }
- public void startWorkers(){
- for(int i=0;i
- threadPool[i].start();
- System.out.println("Start Worker:"+threadPool[i].getName());
- }
- }
- public synchronized void stopWorkers() {
- isStop = true;
- int tcount = 0;
- for(int i=0;i
- System.out.println("Stop Worker:"+threadPool[i].getName());
- tcount += threadPool[i].stopWork();
- }
- System.out.println("**********Total completed "+tcount+"**********");
- notifyAll();
- }
- public synchronized boolean isRequestEmpty(){
- System.out.println("There are still "+count+" Request left...");
- if(count==0){
- return true;
- } else {
- return false;
- }
- }
- public synchronized void putRequest(Request request) {
- while(count > requestQueue.length) {
- try{
- wait();
- }catch(InterruptedException e) {
- // e.printStackTrace();
- }
- }
- requestQueue[tail] = request;
- tail = (tail + 1) % requestQueue.length;
- count++;
- notifyAll();
- }
- public synchronized Request takeRequest() {
- while (count <=0 && !isStop) {
- try{
- wait();
- } catch(InterruptedException e) {
- //e.printStackTrace();
- }
- }
- if(isStop) {
- return null;
- }
- Request request = requestQueue[head];
- head = (head + 1) % requestQueue.length;
- count--;
- notifyAll();
- return request;
- }
- }
WorkerThread 用來"執行/處理工作" 的操作. 代碼如下 :
- package dp.thread.ch8;
- public class WorkerThread extends Thread{
- private final Channel channel;
- private boolean isStop = false;
- private int count = 0;
- WorkerThread(String name, Channel c) {
- super(name);
- this.channel = c;
- }
- public void run(){
- while(!isStop) {
- Request request = channel.takeRequest();
- if(request!=null) {
- request.execute();
- count ++;
- }
- if(isStop) {
- return;
- }
- }
- }
- public int stopWork(){
- isStop = true;
- System.out.println(getName()+" finished "+count+ " requests and leave now!");
- interrupt();
- return count;
- }
- }
執行結果 :
Ps. 將Worker-0 ~ Worker4 的 finished requests 加起來應該等於20.
補充說明 :
@. 啟動線程是繁重的操作: 如果可以把自己的工作交給別人, 自己就可以去做其他事情. 線程也一樣, 這是 Thread-Per-Message Pattern的內容. 但是啟動線程是花費時間的工作, 所以 Worker Thread 將重複利用線程, 而資源再利用也是提升效能的考量之一.
@. 控制承載量: Work Thread 還有一個主題, 就是承載量 (capacity) 的控制. 提高 Worker 參予者的數量, 可以提高併發處理的工作量. 但如果準備的Worker 大部分時間都在等待時, 不但不會工作還會占用內存, 因此根據 Request 的量來動態調整 Worker的數量也是提升效能的方法之一.
@. Innovation 與 execution 的分離: 直覺來看, innovation 與 execution 通常是在一個動作裡完成. 但在 Worker Thread Pattern 與 Thread-Per-Message Pattern 卻將其分開, 即雖然工作已經被分配, 但是工作的執行可以按照我們的策略安排執行. 這樣的好處可以用以下幾點說明 :
This message was edited 5 times. Last update was at 11/01/2010 14:31:02
沒有留言:
張貼留言