Preface
multiprocessing 套件是 Python 中的多進程管理包。它與 threading.Thread 類似,可以利用 multiprocessing.Process 對象來創建一個進程。該進程可以允許放在 Python 程序內部編寫的函數中。該 Process 對象與 Thread 對象的用法相同,擁有 is_alive()、join([timeout])、run()、start()、terminate() 等方法。屬性有:authkey、daemon(要通過 start() 設置)、exitcode (進程在運行時為 None、如果為 –N,表示被信號N結束)、name、pid。此外 multiprocessing 套件中也有 Lock/Event/Semaphore/Condition 類,用來同步進程,其用法也與 threading 包中的同名類一樣。multiprocessing 的很大一部份與 threading 使用同一套 API,只不過換到了多進程的情境。
這個模塊表示像線程一樣管理進程,這個是 multiprocessing 的核心,它與 threading 很相似,對多核 CPU 的利用率會比 threading 好的多。看一下 Process 類的構造方法:
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
創建進程的簡單實例:
- test.py
- #!/usr/bin/env python
- #coding=utf-8
- import multiprocessing
- def do(n) :
- # Obtain the name of current process
- name = multiprocessing.current_process().name
- print name,'starting'
- print "worker ", n
- return
- if __name__ == '__main__' :
- numList = []
- for i in xrange(5) :
- p = multiprocessing.Process(target=do, args=(i,))
- numList.append(p)
- p.start()
- p.join()
- print "Process end."
創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個 Process 實例,並用其 start() 方法啟動,這樣創建進程比 fork() 還要簡單。join() 方法表示等待子進程結束以後再繼續往下運行,通常用於進程間的同步。
注意:
Pool 類
在使用 Python 進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。如果操作的對像數目不大時,還可以直接使用 Process 類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。Pool 類可以提供指定數量的進程供用戶調用,當有新的請求提交到 Pool 中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。下面介紹一下 multiprocessing 模塊下的 Pool 類下的幾個方法.
apply()
- apply(func[, args[, kwds]])
apply_async()
- apply_async(func[, args[, kwds[, callback]]])
map()
- map(func, iterable[, chunksize])
map_async
- map_async(func, iterable[, chunksize[, callback]])
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
close()
關閉進程池(pool),使其不在接受新的任務。
terminate()
結束工作進程,不在處理未處理的任務。
join()
主進程阻塞等待子進程的退出,join 方法必須在 close 或 terminate 之後使用。
底下是 multiprocessing.Pool 類 的簡單範例:
- test2.py
- #!/usr/bin/env python
- import time
- from multiprocessing import Pool
- def run(fn):
- time.sleep(1)
- return fn*fn
- if __name__ == "__main__":
- testFL = [1,2,3,4,5,6]
- print 'shunxu:' # Single process to process each element one by one
- s = time.time()
- for fn in testFL:
- run(fn)
- e1 = time.time()
- print "Execution time in single process", int(e1 - s)
- print 'concurrent:' # Build multiple process and handle element concurrently
- pool = Pool(5) # Build up a Pool with 5 process
- #testFL: The list of element to handle ; run: Function to handle the element
- rl =pool.map(run, testFL)
- pool.close() # Close the pool to not create new process
- pool.join() # Make main process to wait for the pool
- e2 = time.time()
- print "Execution time in concurrent:", int(e2-e1)
- print rl
上例是一個創建多個進程並發處理與順序執行處理同一數據,所用時間的差別。從結果可以看出,並發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以平時工作中,進程數也不能開太大。 程序中的 r1 表示全部進程執行結束後全局的返回結果集,run 函數有返回值,所以一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有進程都執行完畢,就返回這個列表(列表的順序不定)。對 Pool 對象調用 join() 方法會等待所有子進程執行完畢,調用 join() 之前必須先調用 close(),讓其不再接受新的 Process 了。
再看一個實例:
- test3.py
- import time
- from multiprocessing import Pool
- def run(fn) :
- time.sleep(2)
- print fn
- if __name__ == "__main__" :
- startTime = time.time()
- testFL = [1,2,3,4,5]
- pool = Pool(10) # Create 10 process
- pool.map(run,testFL)
- pool.close()
- pool.join()
- endTime = time.time()
- print "time :", endTime - startTime
再次執行結果如下:
結果中為什麼還有空行和沒有折行的數據呢?其實這跟進程調度有關,當有多個進程並行執行時,每個進程得到的時間片時間不一樣,哪個進程接受哪個請求以及執行完成時間都是不定的,所以會出現輸出亂序的情況。那為什麼又會有沒這行和空行的情況呢?因為有可能在執行第一個進程時,剛要打印換行符時,切換到另一個進程,這樣就極有可能兩個數字打印到同一行,並且再次切換回第一個進程時會打印一個換行符,所以就會出現空行的情況。
進程實戰實例
並行處理某個目錄下文件中的字符個數和行數,存入 res.txt 文件中,每個文件一行,格式為:
- test3.py
- #!/usr/bin/env python
- import os
- import time
- from multiprocessing import Pool
- def getFile(path) :
- # Obtain the file list under given folder
- fileList = []
- for root, dirs, files in list(os.walk(path)) :
- for i in files :
- if i.endswith('.py'):
- fileList.append(root + "/" + i)
- return fileList
- def operFile(filePath) :
- # Calculate the line, word count of given file path
- filePath = filePath
- fp = open(filePath)
- content = fp.readlines()
- fp.close()
- lines = len(content)
- alphaNum = 0
- for i in content :
- alphaNum += len(i.strip('\n'))
- return lines,alphaNum,filePath
- def out(list1, writeFilePath) :
- # Output the statistic result into given file
- fileLines = 0
- charNum = 0
- fp = open(writeFilePath,'a')
- for i in list1 :
- fp.write(i[2] + " Line: "+ str(i[0]) + "Word: "+str(i[1]) + "\n")
- fileLines += i[0]
- charNum += i[1]
- fp.close()
- print fileLines, charNum
- if __name__ == "__main__":
- # Create multiple process to handle the request
- startTime = time.time()
- filePath = "/root/tmp/Udemy/machine_learning_examples/"
- fileList = getFile(filePath)
- pool = Pool(5)
- resultList =pool.map(operFile, fileList)
- pool.close()
- pool.join()
- writeFilePath = "res.txt"
- print('Total {} py file(s) being processed!'.format(len(resultList)))
- out(resultList, writeFilePath)
- endTime = time.time()
- print "used time is ", endTime - startTime
耗時不到1秒,可見多進程並發執行速度是很快的。
沒有留言:
張貼留言