前言 :
Thread 是 threading 模塊中最重要的類之一,可以使用它來創建線程。有兩種方式來創建線程:一種是通過繼承Thread 類,重寫它的 run 方法;另一種是創建一個 threading.Thread 對象,在它的初始化函數(__init__)中將可調用對像作為參數傳入。下面分別舉例說明。先來看看通過繼承 threading.Thread 類來創建線程的例子 :
- import threading, time, random
- count = 0
- class Counter(threading.Thread):
- def __init__( self , lock, threadName):
- '''@summary:初始化對象。
- @param lock: 瑣對象。
- @param threadName: 線程名稱。
- '''
- super(Counter, self ).__init__(name = threadName) #注意:一定要顯式的調用父類的初始化函數.
- self.lock = lock
- def run( self ):
- '''@summary:重寫父類run方法,在線程啟動後執行該方法內的代碼. '''
- global count
- self.lock.acquire()
- for i in range(1000):
- count = count + 1
- self .lock.release()
- lock = threading.Lock()
- for i in range(5):
- Counter(lock, "thread-" + str(i)).start() # Open 5 隻線程
- time.sleep( 2 ) #確保線程都執行完畢
- print("Count={0}!".format(count))
Thread 類別的使用 :
再看看另外一種創建線程的方法 :
- import threading, time, random
- count = 0
- lock = threading.Lock()
- def doAdd():
- '' '''@summary:將全局變量count逐一的增加10000。
- '''
- global count, lock
- lock.acquire()
- for i in range( 1000 ):
- count = count + 1
- lock.release()
- for i in range( 5 ):
- threading.Thread(target = doAdd, args = (), name = 'thread-' + str(i)).start()
- time.sleep( 2 ) #確保線程都執行完畢
- print("Count={0}!".format(count))
* def __init__(self, group=None, target=None, name=None, args=(), kwargs={})
Thread.join([timeout]) 的使用 :
調用 Thread.join 將會使主調線程堵塞,直到被調用線程運行結束或超時。參數 timeout 是一個數值類型,表示超時時間,如果未提供該參數,那麼主調線程將一直堵塞到被調線程結束。下面舉個例子說明 join() 的使用 :
- import threading, time
- def doWaiting():
- print('\t[Info] start waiting...{0}' , time.strftime( '%H:%M:%S' ))
- time.sleep( 3 )
- print('\t[Info] stop waiting...{0}' , time.strftime( '%H:%M:%S' ))
- thread1 = threading.Thread(target = doWaiting)
- thread1.start()
- time.sleep( 1 ) #確保線程thread1已經啟動
- print('start join')
- thread1.join() #將一直堵塞,直到thread1運行結束。
- print('end join')
threading.RLock 和 threading.Lock :
在threading模塊中,定義兩種類型的瑣:threading.Lock 和 threading.RLock。它們之間有一點細微的區別,通過比較下面兩段代碼來說明 :
- import threading
- lock = threading.Lock() #Lock對象
- lock.acquire()
- lock.acquire() #產生了死瑣。
- lock.release()
- lock.release()
- import threading
- rLock = threading.RLock() #RLock對象
- rLock.acquire()
- rLock.acquire() #在同一線程內,程序不會堵塞。
- rLock.release()
- rLock.release()
threading.Condition :
可以把Condiftion理解為一把高級的瑣,它提供了比Lock, RLock更高級的功能,允許我們能夠控制複雜的線程同步問題。threadiong.Condition在內部維護一個瑣對象(默認是RLock),可以在創建Condigtion對象的時候把瑣對像作為參數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的調用內部瑣對象的對應的方法而已。Condition還提供瞭如下方法(特別要注意:這些方法只有在佔用瑣(acquire)之後才能調用,否則將會報RuntimeError異常。) :
- Condition.wait([timeout]):
- Condition.notify():
- Condition.notifyAll()
現在寫個捉迷藏的遊戲來具體介紹 threading.Condition 的基本使用。假設這個遊戲由兩個人來玩,一個藏(Hider),一個找 (Seeker)。遊戲的規則如下 :
Hider 和 Seeker都是獨立的個體,在程序中用兩個獨立的線程來表示,在遊戲過程中,兩者之間的行為有一定的時序關係,我們通過 Condition 來控制這種時序關係 :
- #---- Condition
- #---- 捉迷藏的遊戲
- import threading, time
- class Hider(threading.Thread):
- def __init__( self , cond, name):
- super(Hider, self ).__init__()
- self.cond = cond
- self.name = name
- def run( self ):
- time.sleep( 1 ) #確保先運行Seeker中的方法
- self.cond.acquire() #b
- print('To {0}: 我已經把眼睛蒙上了'.format(self.name))
- print("\t[Info] {0} notify()...".format(self.name))
- self.cond.notify()
- print("\t[Info] {0} wait()...".format(self.name))
- self.cond.wait() #c
- #f
- print('To {0}: 我找到你了~_~'.format(self.name))
- print("\t[Info] {0} notify()...".format(self.name))
- self.cond.notify()
- print("\t[Info] {0} release()...".format(self.name))
- self.cond.release()
- #g
- print('To {0}: 我贏了'.format(self.name)) #h
- class Seeker(threading.Thread):
- def __init__( self , cond, name):
- super(Seeker, self ).__init__()
- self.cond = cond
- self.name = name
- def run( self ):
- self.cond.acquire()
- print("\t[Info] {0} wait()...".format(self.name))
- self.cond.wait() #a #釋放對瑣的佔用,同時線程掛起在這裡,直到被notify並重新佔有瑣。
- #d
- print('To {0}: 我已經藏好了,你快來找我吧'.format(self.name))
- print("\t[Info] {0} notify()...".format(self.name))
- self.cond.notify()
- print("\t[Info] {0} wait()...".format(self.name))
- self.cond.wait() #e
- #h
- print("\t[Info] {0} release()...".format(self.name))
- self.cond.release()
- print('To {0}: 被你找到了,哎~~~'.format(self.name) )
- cond = threading.Condition()
- seeker = Seeker(cond, 'seeker' )
- hider = Hider(cond, 'hider' )
- seeker.start()
- hider.start()
threading.Event & threading.Timer :
Event 實現 與 Condition 類似的功能,不過比 Condition 簡單一點。它通過維護內部的標識符來實現線程間的同步問題。(threading.Event 和 .NET 中 System.Threading.ManualResetEvent類實現同樣的功能。). 而 threading.Timer 是 threading.Thread 的子類,可以在指定時間間隔後執行某個操作。下面是Python手冊上提供的一個例子 :
- def hello():
- print "hello, world"
- t = Timer(30.0, hello)
- t.start() # after 30 seconds, "hello, world" will be printed
要使用 Semaphore 之前, 得先了解它是什麼樣的東西. Wiki 上的說明是:
有看沒有懂? 沒關係, 可以簡單想像 Semaphore 成進入與出去某個 Code block 的門鎖, 而這把門鎖會記錄多少個 Process 進入到控制的 Code block 以確保該 Code block 最多只能被 n 個 Process 同時執行, 如果 n=1, 則是常聽到的 Mutex (有進有出搂). 還是不清楚? 沒關係, 先看看看 Python 如何使用 Semaphore. 而 Python Standard Library 上對threading.Semaphore([value]) 的說明如下:
也就是說 Semaphore 物件上面只有兩個方法 acquire([blocking]) 與 release(), 另外在取得 Semaphore 物件的時候你可以透過參數 value 指定 Code block 最多只能有多少個 Process 同時進入 (該 Code block 即是所謂的 Critical Section). 接著來看 Semaphore 的 HelloWorld:
- # coding=UTF-8
- import threading, time, random
- count=0
- round=3
- lock=threading.Lock()
- sem=threading.Semaphore(5) # Code block can be executed by at most 5 threads concurrently
- def codeBlock(thd, i):
- global count
- lock.acquire()
- count+=1
- print "\t[Info] {0}/{1} entering({2})...".format(thd.name, i, count)
- lock.release()
- time.sleep(random.randrange(2,10))
- lock.acquire()
- count-=1
- print "\t[Info] {0}/{1} exit({2})...".format(thd.name, i, count)
- lock.release()
- class Guest(threading.Thread):
- def __init__( self , lock, threadName):
- super(Guest, self ).__init__(name = threadName) #注意:一定要顯式的調用父類的初始化函數.
- self.lock = lock
- def run( self ):
- '''@summary:重寫父類run方法,在線程啟動後執行該方法內的代碼. '''
- global count
- for i in range(round):
- self.lock.acquire()
- codeBlock(self, i)
- self.lock.release()
- print "\t[Info] {0} Bye!".format(self.name)
- for i in range(10):
- Guest(sem, "thread-"+ str(i)).start() # Open 10 隻線程
上面的範例代碼中的 codeBlock 函數便是 Critical Section (要保護避免 Race condition 的程式區塊), 透過 Semaphore 的應用, 我們保證了 codeBlock 函數同一時間最多只被 5 支線程執行 (在上面範例代碼我們開啟了 10 支線程). 而這樣的證明可以從 log 後面的數字 enter(?) 與 exit(4) 得知 (數字不會超過 5).
而 threading.BoundedSemaphore 使用上跟上面介紹一樣, 只是更為嚴謹. 例如 release() 卻沒有對應的 acquire(), 則會拋出 ValueError 異常.
Barrier 使用
考慮有一種多線程的使用狀況, 需要等到所有線程都到達同一行 code 才一起往下走 (也就是先到的要等後來的, 等全部到齊才繼續下一個動作). 有個名詞 Barrier 說明這樣的使用情境:
接著來看一個平行進行加法的範例:
上面的示意圖說明每次 Iteration 都是由奇數的線程 (1,3,5...) 執行加法, 並將結果寫到偶數 (0,2,4,...) 的線程中. 這樣子的平行設計需要在每個 Iteration 開始前, 確認前一個 Iteration 的線程都已經執行完畢. 因此 Barrier 會被設置在每個 Iteration 結束的位置.
目前 Python 2.7.8 中並沒有特別針對 Barrier 進行 API 的支持, 因此如果要完成上述的平行工作, 則需要利用既有的 threading API 進行 Barrier 的設置. 一種解法的 pseudo code 如下:
- ATOMIC(status[pid] = BUSY);
- BARRIER()
- for(int s = 1; s <= N; s *= 2)
- {
- if (pid % (2*s) == 0)
- while (ATOMIC(status[pid+s]!=DONE)); //wait for pid+s to finish
- temp_sum[pid] += temp_sum[pid+s];
- else {
- ATOMIC(status[pid] = DONE); //my work is done, exit
- break;
- }
- }
下面為完整的範例代碼:
- # coding=UTF-8
- import threading, time, random
- lock = threading.Lock()
- rlock = threading.Lock()
- data_size=10
- tmp_sum = []
- print "\t[Info] Prepare data..."
- for i in range(data_size):
- tmp_sum.append(i+1)
- class Status:
- def __init__( self , size, lock):
- self._init(size)
- self.lock = lock
- def _init(self, size):
- self.status = []
- for i in range(size+1):
- self.status.append(1)
- def atom_read(self, pid):
- self.lock.acquire()
- try:
- return self.status[pid]
- finally:
- self.lock.release()
- def atom_done(self, pid):
- self.lock.acquire()
- try:
- self.status[pid]=0
- finally:
- self.lock.release()
- def isDone(self):
- return self.status[0]==0
- print "\t[Info] Preparing status object..."
- status = Status(data_size, lock)
- class Reducer(threading.Thread):
- def __init__( self , lock, pid, tsize, status, tmp_sum):
- super(Reducer, self ).__init__(name = pid) #注意:一定要顯式的調用父類的初始化函數.
- self.lock = lock
- self.pid = pid
- self.tsize= tsize
- self.status = status
- self.tmp_sum = tmp_sum
- def run(self):
- for s in range(0,self.tsize):
- s=2**s
- if s>self.tsize: break
- if (self.pid % (2*s)==0) and (self.pid+s<self.tsize):
- while(self.status.atom_read(self.pid+s)!=0):
- print "\t[Info] PID={0} wait for PID={1}...".format(self.pid, self.pid+s)
- time.sleep(1)
- pass
- #print "sum(tmp_sum[{0}]={1}, tmp_sum[{2}]={3})...".format(self.pid, self.tmp_sum[self.pid], self.pid+s, self.tmp_sum[self.pid+s])
- self.tmp_sum[self.pid]=self.tmp_sum[self.pid+s]+self.tmp_sum[self.pid]
- self.lock.acquire()
- print "\t[Info] tmp_sum[{0}]={1}...(s={2})".format(self.pid, self.tmp_sum[self.pid], s)
- self.lock.release()
- else:
- break
- self.lock.acquire()
- print "\t[Info] PID={0} is done with value={1}...".format(self.pid, self.tmp_sum[self.pid])
- self.lock.release()
- self.status.atom_done(self.pid)
- print "\t[Info] Start paralleling actions..."
- for i in range(data_size):
- Reducer(rlock, i, data_size, status, tmp_sum).start()
- while(not status.isDone()):
- time.sleep(2)
- print "\t[Info] Sum={0}".format(tmp_sum[0])
Supplement
* Standard Library - threading — Higher-level threading interface
很好的文章,感謝分享,原Po有時間可以介紹一下 Semaphore 和 Barrier 嗎?
回覆刪除看來是高手喔 ^^. 因為我不是原 Po, 但是 Semaphore 與 Barrier 略懂.
刪除有空我再補這兩個, 十分感謝!
Semaphore 與 Barrier 補充上去搂, 有不周全的再請指教摟 ^^
刪除56行應為
回覆刪除if (self.pid % (2 * s) == 0)and (self.pid + s < self.tsize):
56行應為
回覆刪除if (self.pid % (2 * s) == 0)and (self.pid + s < self.tsize):
Thanks for reminding! The post had been updated.
刪除請問如何在 各自的 Thread 開出 console shell for debug . ex: print variables.
回覆刪除expect behavior same with the tcl example ==> thread::send $tid {source c:/tcl/bin/tkcon.tcl ; tkcon show }
nice post.
回覆刪除data science online free