Source From Here
Preface
在 Thread-Per-Message 模式 中,每次請求來到,就建立一個新的執行緒,用完就不再使用,然後執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統 效能的降低。可以的話,若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式 的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。
Thread Pool 模式
該如何重複使用執行緒?執行緒一旦離開 run() 方法,該執行緒任務就不能再重複使用,所以就是想辦法讓執行緒不離開 run() 方法,但不執行完 run () 方法,又如何能完成您交付給執行緒的任務?看似矛盾的需求,其實在 Worker Thread 模式 中就有示範過概念了,也就是在 run() 中設定無窮迴圈:
Thread pool 模式的概念就是,需要使用執行緒時,在一個執行緒池中尋找可用的執行緒,如果找不到再建立新的,執行緒使用完畢後,留在池中重複使用。以下是個簡單的示範程式,可以看出如何建立可重複使用的執行緒,與執行緒池的基本概念:
- ThreadPool.groovy (1)
範例中的
WorkerThreadPool 是個簡單的實現,您可以採用更完善的池化技術,另一個重點則在於 WorkerThread 如何重用,執行緒一旦啟 動,就進入無窮迴圈並進入等待,如果有設定請求,則被通知執行請求,請求執行完畢,回到迴圈開頭又進入等待,如此循環不斷。以下則是一個使用 WorkerThreadPool 的示範:
- ThreadPool.groovy (2)
執行結果:
使用 Python 來示範的話:
Preface
在 Thread-Per-Message 模式 中,每次請求來到,就建立一個新的執行緒,用完就不再使用,然後執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統 效能的降低。可以的話,若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式 的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。
Thread Pool 模式
該如何重複使用執行緒?執行緒一旦離開 run() 方法,該執行緒任務就不能再重複使用,所以就是想辦法讓執行緒不離開 run() 方法,但不執行完 run () 方法,又如何能完成您交付給執行緒的任務?看似矛盾的需求,其實在 Worker Thread 模式 中就有示範過概念了,也就是在 run() 中設定無窮迴圈:
- class Worker implements Runnable {
- private RequestQueue queue;
- Worker(RequestQueue queue) {
- this.queue = queue;
- }
- public void run() {
- while(true) {
- queue.get().execute();
- }
- }
- }
- ThreadPool.groovy (1)
- package dp.mt.threadPool
- import groovy.transform.Synchronized
- public class Request{
- String from
- public Request(){}
- public Request(String cn){from=cn}
- public void execute(String tn)
- {
- System.out.printf("${tn} handle request from ${from}...\n")
- sleep(2000)
- }
- }
- class WorkerThread extends Thread
- {
- private Request request
- private boolean isContinue = true
- private String name=""
- public WorkerThread(String n){this.name = n}
- boolean isIdle() {
- return request == null;
- }
- void setRequest(Request request) {
- if(isIdle()) {
- synchronized(this)
- {
- this.request = request;
- notify();
- }
- }
- }
- @Override
- public void run() {
- while(isContinue) {
- synchronized(this) {
- try
- {
- wait()
- }
- catch(InterruptedException e) {
- e.printStackTrace();
- }
- request.execute(name)
- request = null
- }
- }
- }
- void terminate()
- {
- isContinue = false;
- notify()
- }
- }
- class WorkerThreadPool {
- private List
workerThreads - private tid=0
- private tlimit=10 /*At most there will be 10 threads being running currently*/
- WorkerThreadPool() {
- workerThreads = new ArrayList
(); - }
- synchronized void service(Request request) {
- boolean idleNotFound = true;
- while(true)
- {
- for(WorkerThread workerThread : workerThreads) {
- if(workerThread.isIdle()) {
- workerThread.setRequest(request);
- idleNotFound = false;
- break;
- }
- }
- if(workerThreads.size()
break - }
- if(idleNotFound) {
- WorkerThread workerThread = createWorkerThread();
- workerThread.setRequest(request);
- }
- }
- synchronized void cleanIdle() {
- for(WorkerThread workerThread : workerThreads) {
- if(workerThread.isIdle()) {
- workerThreads.remove(workerThread);
- workerThread.terminate();
- }
- }
- }
- private WorkerThread createWorkerThread() {
- WorkerThread workerThread = new WorkerThread("T${tid++}");
- workerThread.start();
- workerThreads.add(workerThread);
- try {
- Thread.sleep(1000); // 給點時間進入 Runnable
- }
- catch(InterruptedException e) {
- e.printStackTrace();
- }
- return workerThread;
- }
- }
- ThreadPool.groovy (2)
- class Service {
- private WorkerThreadPool pool = new WorkerThreadPool();
- void accept(Request request) {
- pool.service(request);
- }
- }
- // 以下模擬客戶發出請求
- class Client implements Runnable {
- private Service service
- private String name
- Client(String name, Service service) {
- this.name = name
- this.service = service;
- }
- public void run() {
- int sn=0
- while(true) {
- sn++
- Request request = new Request("${name}-${sn}");
- service.accept(request);
- try {
- Thread.sleep((int) (Math.random() * 1000));
- }
- catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- static void main(args)
- {
- Service service = new Service();
- for(int i = 0; i < 5; i++) {
- (new Thread(new Client("C${i}", service))).start();
- }
- }
使用 Python 來示範的話:
- import threading
- import time
- import random
- class WorkerThread(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
- self.condition = threading.Condition()
- self.isContinue = True
- self.request = None
- def isIdle(self):
- return self.request == None
- def setRequest(self, request):
- self.condition.acquire()
- if self.isIdle():
- self.request = request
- self.condition.notify()
- self.condition.release()
- def run(self):
- while self.isContinue:
- self.condition.acquire()
- self.condition.wait()
- self.request()
- self.request = None
- self.condition.release()
- def terminate(self):
- self.isContinue = False
- self.setRequest(lambda: None) # do nothing
- class WorkerThreadPool:
- def __init__(self):
- self.workerThreads = []
- def service(self, request):
- idleNotFound = True
- for workerThread in self.workerThreads:
- if workerThread.isIdle():
- workerThread.setRequest(request)
- idleNotFound = False
- break
- if idleNotFound:
- workerThread = self.createWorkerThread()
- workerThread.setRequest(request)
- def cleanIdle(self):
- for workerThread in self.workerThreads:
- if workerThread.isIdle():
- self.workerThreads.remove(workerThread)
- workerThread.terminate()
- def createWorkerThread(self):
- workerThread = WorkerThread()
- workerThread.start()
- self.workerThreads.append(workerThread)
- time.sleep(1)
- return workerThread
- class Service:
- def __init__(self):
- self.pool = WorkerThreadPool()
- def accept(self, request):
- self.pool.service(request)
- class Client(threading.Thread):
- def __init__(self, service):
- threading.Thread.__init__(self)
- self.service = service
- def run(self):
- while True:
- second = int(random.random() * 3) # 隨機模擬請求的執行時間
- request = lambda: print("執行客戶請求...XD"); time.sleep(second)
- self.service.accept(request)
- time.sleep(int(random.random() * 3))
- service = Service()
- for i in range(5):
- Client(service).start()
- while True:
- try:
- time.sleep(1)
- except KeyboardInterrupt:
- exit()
沒有留言:
張貼留言