2012年7月8日 星期日

[ Python 文章收集 ] Python模塊學習 - threading 多線程控制和處理

來源自 這裡 
前言 : 
Thread 是 threading 模塊中最重要的類之一,可以使用它來創建線程。有兩種方式來創建線程:一種是通過繼承Thread 類,重寫它的 run 方法;另一種是創建一個 threading.Thread 對象,在它的初始化函數(__init__)中將可調用對像作為參數傳入。下面分別舉例說明。先來看看通過繼承 threading.Thread 類來創建線程的例子 : 
  1. import  threading, time, random    
  2. count =  0    
  3. class  Counter(threading.Thread):    
  4.     def  __init__( self , lock, threadName):    
  5.         '''@summary:初始化對象。   
  6.            
  7.         @param lock: 瑣對象。   
  8.         @param threadName: 線程名稱。   
  9.         '''    
  10.         super(Counter,  self ).__init__(name = threadName)   #注意:一定要顯式的調用父類的初始化函數.  
  11.         self.lock = lock    
  12.         
  13.     def  run( self ):    
  14.         '''@summary:重寫父類run方法,在線程啟動後執行該方法內的代碼. '''    
  15.         global  count    
  16.         self.lock.acquire()    
  17.         for  i  in  range(1000):    
  18.             count = count +  1    
  19.         self .lock.release()    
  20. lock = threading.Lock()    
  21. for  i  in  range(5):     
  22.     Counter(lock,  "thread-"  + str(i)).start()   # Open 5 隻線程  
  23. time.sleep( 2 )    #確保線程都執行完畢    
  24. print("Count={0}!".format(count))   
在代碼中,我們創建了一個Counter類,它繼承了 threading.Thread。初始化函數接收兩個參數,一個是瑣對象,另一個是線程的名稱。在Counter中,重寫了從父類繼承的run方法,run方法將一個全局變量逐一的增加1000。在接下來的代碼中,創建了五個Counter對象,分別調用其start方法。最後打印結果。這裡要說明一下 run方法 和 start方法: 它們都是從 Thread 繼承而來的,run() 方法將在線程開啟後執行,可以把相關的邏輯寫到run方法中 ; start() 方法用於啟動線程. 


Thread 類別的使用 : 
再看看另外一種創建線程的方法 : 
  1. import  threading, time, random    
  2. count =  0    
  3. lock = threading.Lock()    
  4. def  doAdd():    
  5.     '' '''@summary:將全局變量count逐一的增加10000。   
  6.     '''    
  7.     global  count, lock    
  8.     lock.acquire()    
  9.     for  i  in  range( 1000 ):    
  10.         count = count +  1    
  11.     lock.release()  
  12.       
  13. for  i  in  range( 5 ):    
  14.     threading.Thread(target = doAdd, args = (), name =  'thread-'  + str(i)).start()    
  15. time.sleep( 2 )    #確保線程都執行完畢    
  16. print("Count={0}!".format(count))   
在這段代碼中,我們定義了方法doAdd,它將全局變量count逐一的增加 1000。然後創建了5 個Thread對象,把函數對象doAdd作為參數傳給它的初始化函數,再調用 Thread 對象的start方法,線程啟動後將執行 doAdd 函數。這裡有必要介紹一下 threading.Thread 類的初始化函數原型 : 
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}) 
參數group是預留的,用於將來擴展;
參數target是一個可調用對象(也稱為活動[activity]),在線程啟動後執行;
參數name是線程的名字。默認值為“Thread-N“,N是一個數字。
參數args和kwargs分別表示調用target時的參數列表和關鍵字參數

Thread.join([timeout]) 的使用 : 
調用 Thread.join 將會使主調線程堵塞,直到被調用線程運行結束或超時。參數 timeout 是一個數值類型,表示超時時間,如果未提供該參數,那麼主調線程將一直堵塞到被調線程結束。下面舉個例子說明 join() 的使用 : 
  1. import  threading, time    
  2. def  doWaiting():    
  3.     print('\t[Info] start waiting...{0}' , time.strftime( '%H:%M:%S' ))    
  4.     time.sleep( 3 )    
  5.     print('\t[Info] stop waiting...{0}' , time.strftime( '%H:%M:%S' ))    
  6. thread1 = threading.Thread(target = doWaiting)    
  7. thread1.start()    
  8. time.sleep( 1 )   #確保線程thread1已經啟動    
  9. print('start join')    
  10. thread1.join()   #將一直堵塞,直到thread1運行結束。    
  11. print('end join')  
執行結果 : 
[Info] start waiting...{0} 19:03:25 # 線程開始執行
start join # 主線程等待其他線程執行完畢.
[Info] stop waiting...{0} 19:03:28 # 開啟線程結束
end join

threading.RLock 和 threading.Lock : 
在threading模塊中,定義兩種類型的瑣:threading.Lock 和 threading.RLock。它們之間有一點細微的區別,通過比較下面兩段代碼來說明 : 
  1. import  threading    
  2. lock = threading.Lock()  #Lock對象    
  3. lock.acquire()    
  4. lock.acquire()   #產生了死瑣。    
  5. lock.release()    
  6. lock.release()  
  1. import  threading    
  2. rLock = threading.RLock()   #RLock對象    
  3. rLock.acquire()    
  4. rLock.acquire()  #在同一線程內,程序不會堵塞。    
  5. rLock.release()    
  6. rLock.release()  
這兩種瑣的主要區別是:RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣! 

threading.Condition : 
可以把Condiftion理解為一把高級的瑣,它提供了比Lock, RLock更高級的功能,允許我們能夠控制複雜的線程同步問題。threadiong.Condition在內部維護一個瑣對象(默認是RLock),可以在創建Condigtion對象的時候把瑣對像作為參數傳入。Condition也提​​供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的調用內部瑣對象的對應的方法而已。Condition還提供瞭如下方法(特別要注意:這些方法只有在佔用瑣(acquire)之後才能調用,否則將會報RuntimeError異常。) : 
Condition.wait([timeout]): 
wait 方法釋放內部所佔用的瑣,同時線程被掛起,直至接收到通知被喚醒或超時(如果提供了timeout參數的話)。當線程被喚醒並重新佔有瑣的時候,程序才會繼續執行下去.

Condition.notify(): 
喚醒一個掛起的線程(如果存在掛起的線程)。注意: 此方法不會釋放所佔用的瑣!

Condition.notifyAll() 
喚醒所有掛起的線程(如果存在掛起的線程)。注意: 此方法不會釋放所佔用的瑣!

現在寫個捉迷藏的遊戲來具體介紹 threading.Condition 的基本使用。假設這個遊戲由兩個人來玩,一個藏(Hider),一個找 (Seeker)。遊戲的規則如下 : 
1. 遊戲開始之後,Seeker先把自己眼睛蒙上,蒙上眼睛後,就通知Hider;
2. Hider接收通知後開始找地方將自己藏起來,藏好之後,再通知Seeker可以找了;
3. Seeker接收到通知之後,就開始找Hider.

Hider 和 Seeker都是獨立的個體,在程序中用兩個獨立的線程來表示,在遊戲過程中,兩者之間的行為有一定的時序關係,我們通過 Condition 來控制這種時序關係 : 
  1. #---- Condition    
  2. #---- 捉迷藏的遊戲    
  3. import  threading, time    
  4. class  Hider(threading.Thread):    
  5.     def  __init__( self , cond, name):    
  6.         super(Hider,  self ).__init__()    
  7.         self.cond = cond    
  8.         self.name = name    
  9.         
  10.     def  run( self ):    
  11.         time.sleep( 1 )  #確保先運行Seeker中的方法       
  12.             
  13.         self.cond.acquire()  #b        
  14.         print('To {0}: 我已經把眼睛蒙上了'.format(self.name))  
  15.         print("\t[Info] {0} notify()...".format(self.name))  
  16.         self.cond.notify()  
  17.         print("\t[Info] {0} wait()...".format(self.name))  
  18.         self.cond.wait()  #c        
  19.                          #f     
  20.         print('To {0}: 我找到你了~_~'.format(self.name))  
  21.         print("\t[Info] {0} notify()...".format(self.name))  
  22.         self.cond.notify()  
  23.         print("\t[Info] {0} release()...".format(self.name))  
  24.         self.cond.release()    
  25.                             #g    
  26.         print('To {0}: 我贏了'.format(self.name))    #h    
  27.             
  28. class  Seeker(threading.Thread):    
  29.     def  __init__( self , cond, name):    
  30.         super(Seeker,  self ).__init__()    
  31.         self.cond = cond    
  32.         self.name = name    
  33.     def  run( self ):    
  34.         self.cond.acquire()  
  35.         print("\t[Info] {0} wait()...".format(self.name))  
  36.         self.cond.wait()     #a #釋放對瑣的佔用,同時線程掛起在這裡,直到被notify並重新佔有瑣。    
  37.                             #d    
  38.         print('To {0}: 我已經藏好了,你快來找我吧'.format(self.name))  
  39.         print("\t[Info] {0} notify()...".format(self.name))  
  40.         self.cond.notify()  
  41.         print("\t[Info] {0} wait()...".format(self.name))  
  42.         self.cond.wait()     #e    
  43.                             #h  
  44.         print("\t[Info] {0} release()...".format(self.name))  
  45.         self.cond.release()     
  46.         print('To {0}: 被你找到了,哎~~~'.format(self.name) )   
  47.             
  48. cond = threading.Condition()    
  49. seeker = Seeker(cond,  'seeker' )    
  50. hider = Hider(cond,  'hider' )    
  51. seeker.start()    
  52. hider.start()  
執行結果 : 
 

threading.Event & threading.Timer : 
Event 實現 與 Condition 類似的功能,不過比 Condition 簡單一點。它通過維護內部的標識符來實現線程間的同步問題。(threading.Event 和 .NET 中 System.Threading.ManualResetEvent類實現同樣的功能。). 而 threading.Timer 是 threading.Thread 的子類,可以在指定時間間隔後執行某個操作。下面是Python手冊上提供的一個例子 : 
  1. def hello():  
  2.     print "hello, world"  
  3.   
  4. t = Timer(30.0, hello)  
  5. t.start() # after 30 seconds, "hello, world" will be printed  
threading.Semaphore & threading.BoundedSemaphore: 
要使用 Semaphore 之前, 得先了解它是什麼樣的東西. Wiki 上的說明是: 
semaphore is a variable or abstract data type that is used for controlling access, by multiple processes, to a common resource in a parallel programming or a multi user environment. A useful way to think of a semaphore is as a record of how many units of a particular resource are available, coupled with operations to safely (i.e., without race conditions) adjust that record as units are required or become free, and, if necessary, wait until a unit of the resource becomes available.


有看沒有懂? 沒關係, 可以簡單想像 Semaphore 成進入與出去某個 Code block 的門鎖, 而這把門鎖會記錄多少個 Process 進入到控制的 Code block 以確保該 Code block 最多只能被 n 個 Process 同時執行, 如果 n=1, 則是常聽到的 Mutex (有進有出搂). 還是不清楚? 沒關係, 先看看看 Python 如何使用 Semaphore. 而 Python Standard Library 上對threading.Semaphore([value]) 的說明如下: 
A factory function that returns a new semaphore objectA semaphore manages a counter representing the number of release() calls minus the number of acquire()calls, plus an initial value. The acquire() method blocks if necessary until it can return without making the counter negative. If not given, value defaults to 1.

也就是說 Semaphore 物件上面只有兩個方法 acquire([blocking]) 與 release(), 另外在取得 Semaphore 物件的時候你可以透過參數 value 指定 Code block 最多只能有多少個 Process 同時進入 (該 Code block 即是所謂的 Critical Section). 接著來看 Semaphore 的 HelloWorld: 
  1. # coding=UTF-8  
  2. import threading, time, random   
  3.   
  4. count=0  
  5. round=3  
  6. lock=threading.Lock()  
  7. sem=threading.Semaphore(5)  # Code block can be executed by at most 5 threads concurrently  
  8. def codeBlock(thd, i):  
  9.     global count  
  10.     lock.acquire()  
  11.     count+=1  
  12.     print "\t[Info] {0}/{1} entering({2})...".format(thd.name, i, count)      
  13.     lock.release()      
  14.     time.sleep(random.randrange(2,10))  
  15.     lock.acquire()  
  16.     count-=1  
  17.     print "\t[Info] {0}/{1} exit({2})...".format(thd.name, i, count)      
  18.     lock.release()  
  19.       
  20. class Guest(threading.Thread):  
  21.     def  __init__( self , lock, threadName):  
  22.         super(Guest,  self ).__init__(name = threadName)   #注意:一定要顯式的調用父類的初始化函數.    
  23.         self.lock = lock  
  24.           
  25.     def  run( self ):      
  26.         '''@summary:重寫父類run方法,在線程啟動後執行該方法內的代碼. '''      
  27.         global  count                  
  28.         for  i  in  range(round):  
  29.             self.lock.acquire()      
  30.             codeBlock(self, i)      
  31.             self.lock.release()  
  32.         print "\t[Info] {0} Bye!".format(self.name)  
  33.           
  34. for  i  in  range(10):       
  35.     Guest(sem,  "thread-"+ str(i)).start()   # Open 10 隻線程  
執行結果部分 log 如下: 
...
[Info] thread-3/0 entering(4)...
[Info] thread-4/0 entering(5)...
[Info] thread-4/0 exit(4)...
[Info] thread-4/1 entering(5)...
[Info] thread-0/0 exit(4)...
[Info] thread-0/1 entering(5)...
[Info] thread-1/0 exit(4)...
[Info] thread-7/0 entering(5)...
[Info] thread-4/1 exit(4)...
[Info] thread-8/0 entering(5)...
...

上面的範例代碼中的 codeBlock 函數便是 Critical Section (要保護避免 Race condition 的程式區塊), 透過 Semaphore 的應用, 我們保證了 codeBlock 函數同一時間最多只被 5 支線程執行 (在上面範例代碼我們開啟了 10 支線程). 而這樣的證明可以從 log 後面的數字 enter(?) 與 exit(4) 得知 (數字不會超過 5). 

而 threading.BoundedSemaphore 使用上跟上面介紹一樣, 只是更為嚴謹. 例如 release() 卻沒有對應的 acquire(), 則會拋出 ValueError 異常. 

Barrier 使用 
考慮有一種多線程的使用狀況, 需要等到所有線程都到達同一行 code 才一起往下走 (也就是先到的要等後來的, 等全部到齊才繼續下一個動作). 有個名詞 Barrier 說明這樣的使用情境: 
In parallel computing, a barrier is a type of synchronization method. A barrier for a group of threads or processes in the source code means any thread/process must stop at this point and cannot proceed until all other threads/processes reach this barrier.

接著來看一個平行進行加法的範例: 
 

上面的示意圖說明每次 Iteration 都是由奇數的線程 (1,3,5...) 執行加法, 並將結果寫到偶數 (0,2,4,...) 的線程中. 這樣子的平行設計需要在每個 Iteration 開始前, 確認前一個 Iteration 的線程都已經執行完畢. 因此 Barrier 會被設置在每個 Iteration 結束的位置. 

目前 Python 2.7.8 中並沒有特別針對 Barrier 進行 API 的支持, 因此如果要完成上述的平行工作, 則需要利用既有的 threading API 進行 Barrier 的設置. 一種解法的 pseudo code 如下: 
  1. ATOMIC(status[pid] = BUSY);   
  2. BARRIER()  
  3. for(int s = 1; s <= N; s *= 2)   
  4. {   
  5.   if (pid % (2*s) == 0)   
  6.      while (ATOMIC(status[pid+s]!=DONE)); //wait for pid+s to finish  
  7.      temp_sum[pid] += temp_sum[pid+s];  
  8.   else {  
  9.      ATOMIC(status[pid] = DONE);           //my work is done, exit  
  10.      break;  
  11.   }  
  12. }  
這邊透過一個 status 的設置來間接完成 Barrier 的工作. for loop 的每一 round 都是一個 Iteration; 每一個 PID 為偶數的線程會等每一 round 對應的 PID 為奇數的線程狀態由 BUSY 轉為 DONE時, 才會進行加法的動作. 每完成一個 round, s 的值便會乘與 2 如同上面的示意圖, 指示每次加法元素的位置. 而有 ATOMIC 的地方說明需要 lock 保護確保一次只有一個線程執行. 

下面為完整的範例代碼: 
  1. # coding=UTF-8  
  2. import threading, time, random   
  3.   
  4. lock = threading.Lock()  
  5. rlock = threading.Lock()  
  6. data_size=10  
  7. tmp_sum = []  
  8.   
  9. print "\t[Info] Prepare data..."  
  10. for i in range(data_size):  
  11.     tmp_sum.append(i+1)  
  12.       
  13. class Status:          
  14.     def  __init__( self , size, lock):  
  15.         self._init(size)  
  16.         self.lock = lock  
  17.           
  18.     def _init(self, size):  
  19.         self.status = []  
  20.         for i in range(size+1):              
  21.             self.status.append(1)  
  22.               
  23.     def atom_read(self, pid):  
  24.         self.lock.acquire()  
  25.         try:  
  26.             return self.status[pid]  
  27.         finally:  
  28.             self.lock.release()  
  29.               
  30.     def atom_done(self, pid):  
  31.         self.lock.acquire()  
  32.         try:  
  33.             self.status[pid]=0  
  34.         finally:  
  35.             self.lock.release()  
  36.               
  37.     def isDone(self):  
  38.         return self.status[0]==0  
  39.           
  40. print "\t[Info] Preparing status object..."              
  41. status = Status(data_size, lock)  
  42.   
  43. class Reducer(threading.Thread):  
  44.     def  __init__( self , lock, pid, tsize, status, tmp_sum):  
  45.         super(Reducer,  self ).__init__(name = pid)   #注意:一定要顯式的調用父類的初始化函數.  
  46.         self.lock = lock            
  47.         self.pid = pid  
  48.         self.tsize= tsize  
  49.         self.status = status  
  50.         self.tmp_sum = tmp_sum  
  51.           
  52.     def  run(self):  
  53.         for s in range(0,self.tsize):  
  54.             s=2**s  
  55.             if s>self.tsize: break  
  56.             if (self.pid % (2*s)==0) and (self.pid+s<self.tsize):  
  57.                 while(self.status.atom_read(self.pid+s)!=0):  
  58.                     print "\t[Info] PID={0} wait for PID={1}...".format(self.pid, self.pid+s)  
  59.                     time.sleep(1)  
  60.                     pass  
  61.                 #print "sum(tmp_sum[{0}]={1}, tmp_sum[{2}]={3})...".format(self.pid, self.tmp_sum[self.pid], self.pid+s, self.tmp_sum[self.pid+s])  
  62.                 self.tmp_sum[self.pid]=self.tmp_sum[self.pid+s]+self.tmp_sum[self.pid]  
  63.                 self.lock.acquire()  
  64.                 print "\t[Info] tmp_sum[{0}]={1}...(s={2})".format(self.pid, self.tmp_sum[self.pid], s)  
  65.                 self.lock.release()  
  66.             else:                  
  67.                 break  
  68.         self.lock.acquire()  
  69.         print "\t[Info] PID={0} is done with value={1}...".format(self.pid, self.tmp_sum[self.pid])  
  70.         self.lock.release()  
  71.         self.status.atom_done(self.pid)  
  72.               
  73. print "\t[Info] Start paralleling actions..."  
  74. for i in range(data_size):  
  75.     Reducer(rlock, i, data_size, status, tmp_sum).start()  
  76.                    
  77. while(not status.isDone()):  
  78.     time.sleep(2)  
  79.       
  80. print "\t[Info] Sum={0}".format(tmp_sum[0])    
執行結果為 1+2+3+...+10=55


Supplement 
Standard Library - threading — Higher-level threading interface

8 則留言:

  1. 很好的文章,感謝分享,原Po有時間可以介紹一下 Semaphore 和 Barrier 嗎?

    回覆刪除
    回覆
    1. 看來是高手喔 ^^. 因為我不是原 Po, 但是 Semaphore 與 Barrier 略懂.
      有空我再補這兩個, 十分感謝!

      刪除
    2. Semaphore 與 Barrier 補充上去搂, 有不周全的再請指教摟 ^^

      刪除
  2. 56行應為
    if (self.pid % (2 * s) == 0)and (self.pid + s < self.tsize):

    回覆刪除
  3. 56行應為
    if (self.pid % (2 * s) == 0)and (self.pid + s < self.tsize):

    回覆刪除
    回覆
    1. Thanks for reminding! The post had been updated.

      刪除
  4. 請問如何在 各自的 Thread 開出 console shell for debug . ex: print variables.

    expect behavior same with the tcl example ==> thread::send $tid {source c:/tcl/bin/tkcon.tcl ; tkcon show }

    回覆刪除

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...