2015年1月18日 星期日

[OO 設計模式] Gossip@DesignPattern : 多執行緒模式 - Thread Pool 模式

Source From Here
Preface
在 Thread-Per-Message 模式 中,每次請求來到,就建立一個新的執行緒,用完就不再使用,然後執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統 效能的降低。可以的話,若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式 的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。

Thread Pool 模式
該如何重複使用執行緒?執行緒一旦離開 run() 方法,該執行緒任務就不能再重複使用,所以就是想辦法讓執行緒不離開 run() 方法,但不執行完 run () 方法,又如何能完成您交付給執行緒的任務?看似矛盾的需求,其實在 Worker Thread 模式 中就有示範過概念了,也就是在 run() 中設定無窮迴圈:
  1. class Worker implements Runnable {  
  2.     private RequestQueue queue;  
  3.     Worker(RequestQueue queue) {  
  4.         this.queue = queue;  
  5.     }  
  6.     public void run() {  
  7.         while(true) {  
  8.             queue.get().execute();  
  9.         }  
  10.     }  
  11. }  
Thread pool 模式的概念就是,需要使用執行緒時,在一個執行緒池中尋找可用的執行緒,如果找不到再建立新的,執行緒使用完畢後,留在池中重複使用。以下是個簡單的示範程式,可以看出如何建立可重複使用的執行緒,與執行緒池的基本概念:
- ThreadPool.groovy (1)
  1. package dp.mt.threadPool  
  2.   
  3. import groovy.transform.Synchronized  
  4.   
  5. public class Request{  
  6.     String from  
  7.       
  8.     public Request(){}  
  9.     public Request(String cn){from=cn}  
  10.       
  11.     public void execute(String tn)  
  12.     {  
  13.         System.out.printf("${tn} handle request from ${from}...\n")  
  14.         sleep(2000)  
  15.     }  
  16. }  
  17.   
  18. class WorkerThread extends Thread   
  19. {  
  20.     private Request request  
  21.     private boolean isContinue = true  
  22.     private String  name=""  
  23.       
  24.     public WorkerThread(String n){this.name = n}  
  25.       
  26.     boolean isIdle() {  
  27.         return request == null;  
  28.     }  
  29.       
  30.     void setRequest(Request request) {  
  31.         if(isIdle()) {  
  32.             synchronized(this)  
  33.             {  
  34.                 this.request = request;  
  35.                 notify();  
  36.             }  
  37.         }  
  38.     }  
  39.       
  40.     @Override  
  41.     public void run() {  
  42.         while(isContinue) {  
  43.             synchronized(this) {  
  44.                 try   
  45.                 {  
  46.                     wait()  
  47.                 }  
  48.                 catch(InterruptedException e) {  
  49.                     e.printStackTrace();  
  50.                 }  
  51.                 request.execute(name)  
  52.                 request = null  
  53.             }  
  54.         }  
  55.     }  
  56.       
  57.     void terminate()   
  58.     {  
  59.         isContinue = false;  
  60.         notify()  
  61.     }  
  62. }  
  63.   
  64. class WorkerThreadPool {  
  65.     private List workerThreads  
  66.     private tid=0  
  67.     private tlimit=10 /*At most there will be 10 threads being running currently*/  
  68.       
  69.     WorkerThreadPool() {  
  70.         workerThreads = new ArrayList();  
  71.     }  
  72.     synchronized void service(Request request) {  
  73.         boolean idleNotFound = true;  
  74.         while(true)  
  75.         {  
  76.             for(WorkerThread workerThread : workerThreads) {  
  77.                 if(workerThread.isIdle()) {  
  78.                     workerThread.setRequest(request);  
  79.                     idleNotFound = false;  
  80.                     break;  
  81.                 }  
  82.             }  
  83.             if(workerThreads.size()break  
  84.         }  
  85.         if(idleNotFound) {  
  86.             WorkerThread workerThread = createWorkerThread();  
  87.             workerThread.setRequest(request);  
  88.         }  
  89.     }  
  90.     synchronized void cleanIdle() {  
  91.         for(WorkerThread workerThread : workerThreads) {  
  92.             if(workerThread.isIdle()) {  
  93.                 workerThreads.remove(workerThread);  
  94.                 workerThread.terminate();  
  95.             }  
  96.         }  
  97.     }  
  98.     private WorkerThread createWorkerThread() {  
  99.         WorkerThread workerThread = new WorkerThread("T${tid++}");  
  100.         workerThread.start();  
  101.         workerThreads.add(workerThread);  
  102.         try {  
  103.             Thread.sleep(1000); // 給點時間進入 Runnable  
  104.         }  
  105.         catch(InterruptedException e) {  
  106.             e.printStackTrace();  
  107.         }  
  108.         return workerThread;  
  109.     }  
  110. }  
範例中的 WorkerThreadPool 是個簡單的實現,您可以採用更完善的池化技術,另一個重點則在於 WorkerThread 如何重用,執行緒一旦啟 動,就進入無窮迴圈並進入等待,如果有設定請求,則被通知執行請求,請求執行完畢,回到迴圈開頭又進入等待,如此循環不斷。以下則是一個使用 WorkerThreadPool 的示範:
- ThreadPool.groovy (2)
  1. class Service {  
  2.     private WorkerThreadPool pool = new WorkerThreadPool();  
  3.     void accept(Request request) {  
  4.         pool.service(request);  
  5.     }  
  6. }  
  7.   
  8. // 以下模擬客戶發出請求  
  9. class Client implements Runnable {  
  10.     private Service service  
  11.     private String name  
  12.       
  13.     Client(String name, Service service) {  
  14.         this.name = name  
  15.         this.service = service;  
  16.     }  
  17.     public void run() {  
  18.         int sn=0  
  19.           
  20.         while(true) {  
  21.             sn++  
  22.             Request request = new Request("${name}-${sn}");           
  23.             service.accept(request);  
  24.             try {  
  25.                 Thread.sleep((int) (Math.random() * 1000));  
  26.             }  
  27.             catch(InterruptedException e) {  
  28.                 e.printStackTrace();  
  29.             }  
  30.         }  
  31.     }     
  32. }  
  33.   
  34. static void main(args)  
  35. {  
  36.     Service service = new Service();  
  37.     for(int i = 0; i < 5; i++) {  
  38.         (new Thread(new Client("C${i}", service))).start();  
  39.     }  
  40. }  
執行結果:
T0 handle request from C1-1...
T1 handle request from C3-1...
T2 handle request from C2-1...
T3 handle request from C0-1...
T0 handle request from C4-1...
T1 handle request from C2-2...
...

使用 Python 來示範的話:
  1. import threading  
  2. import time  
  3. import random  
  4.   
  5. class WorkerThread(threading.Thread):  
  6.     def __init__(self):  
  7.         threading.Thread.__init__(self)  
  8.         self.condition = threading.Condition()  
  9.         self.isContinue = True  
  10.         self.request = None  
  11.           
  12.     def isIdle(self):  
  13.         return self.request == None  
  14.           
  15.     def setRequest(self, request):  
  16.         self.condition.acquire()  
  17.         if self.isIdle():  
  18.             self.request = request  
  19.         self.condition.notify()  
  20.         self.condition.release()  
  21.       
  22.     def run(self):  
  23.         while self.isContinue:  
  24.             self.condition.acquire()  
  25.             self.condition.wait()  
  26.             self.request()  
  27.             self.request = None  
  28.             self.condition.release()  
  29.       
  30.     def terminate(self):  
  31.         self.isContinue = False  
  32.         self.setRequest(lambda: None) # do nothing  
  33.   
  34. class WorkerThreadPool:  
  35.     def __init__(self):  
  36.         self.workerThreads = []  
  37.       
  38.     def service(self, request):  
  39.         idleNotFound = True  
  40.         for workerThread in self.workerThreads:  
  41.             if workerThread.isIdle():  
  42.                 workerThread.setRequest(request)  
  43.                 idleNotFound = False  
  44.                 break  
  45.         if idleNotFound:  
  46.             workerThread = self.createWorkerThread()  
  47.             workerThread.setRequest(request)  
  48.               
  49.     def cleanIdle(self):  
  50.         for workerThread in self.workerThreads:  
  51.             if workerThread.isIdle():  
  52.                 self.workerThreads.remove(workerThread)  
  53.                 workerThread.terminate()  
  54.   
  55.     def createWorkerThread(self):  
  56.         workerThread = WorkerThread()  
  57.         workerThread.start()  
  58.         self.workerThreads.append(workerThread)  
  59.         time.sleep(1)  
  60.         return workerThread  
  61.   
  62. class Service:  
  63.     def __init__(self):  
  64.         self.pool = WorkerThreadPool()  
  65.           
  66.     def accept(self, request):  
  67.         self.pool.service(request)  
  68.           
  69. class Client(threading.Thread):  
  70.     def __init__(self, service):  
  71.         threading.Thread.__init__(self)  
  72.         self.service = service  
  73.           
  74.     def run(self):  
  75.         while True:  
  76.             second = int(random.random() * 3) # 隨機模擬請求的執行時間  
  77.             request = lambda: print("執行客戶請求...XD"); time.sleep(second)  
  78.             self.service.accept(request)  
  79.             time.sleep(int(random.random() * 3))  
  80.               
  81. service = Service()  
  82. for i in range(5):  
  83.     Client(service).start()  
  84.           
  85. while True:  
  86.     try:  
  87.         time.sleep(1)  
  88.     except KeyboardInterrupt:  
  89.         exit()  


沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...