2018年1月17日 星期三

[Python 文章收集] multiprocessing 模塊介紹

Source From Here 
Preface 
multiprocessing 套件是 Python 中的多進程管理包。它與 threading.Thread 類似,可以利用 multiprocessing.Process 對象來創建一個進程。該進程可以允許放在 Python 程序內部編寫的函數中。該 Process 對象與 Thread 對象的用法相同,擁有 is_alive()、join([timeout])、run()、start()、terminate() 等方法。屬性有:authkey、daemon(要通過 start() 設置)、exitcode (進程在運行時為 None、如果為 –N,表示被信號N結束)、name、pid。此外 multiprocessing 套件中也有 Lock/Event/Semaphore/Condition 類,用來同步進程,其用法也與 threading 包中的同名類一樣。multiprocessing 的很大一部份與 threading 使用同一套 API,只不過換到了多進程的情境。 

這個模塊表示像線程一樣管理進程,這個是 multiprocessing 的核心,它與 threading 很相似,對多核 CPU 的利用率會比 threading 好的多。看一下 Process 類的構造方法: 
  1. class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})  
參數說明: 
group:進程所屬組。基本不用
target:表示調用對象。
args:表示調用對象的位置參數元組。
name:別名
kwargs:表示調用對象的字典。

創建進程的簡單實例: 
- test.py 
  1. #!/usr/bin/env python  
  2. #coding=utf-8  
  3. import multiprocessing  
  4.   
  5. def do(n) :  
  6.   # Obtain the name of current process  
  7.   name = multiprocessing.current_process().name  
  8.   print name,'starting'  
  9.   print "worker ", n  
  10.   return  
  11.   
  12. if __name__ == '__main__' :  
  13.   numList = []  
  14.   for i in xrange(5) :  
  15.     p = multiprocessing.Process(target=do, args=(i,))  
  16.     numList.append(p)  
  17.     p.start()  
  18.     p.join()  
  19.     print "Process end."  
執行結果: 
# ./test.py
Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個 Process 實例,並用其 start() 方法啟動,這樣創建進程比 fork() 還要簡單。join() 方法表示等待子進程結束以後再繼續往下運行,通常用於進程間的同步。 
注意: 
在 Windows 上要想使用進程模塊,就必須把有關進程的代碼寫在當前 .py 文件的 if __name__ == '__main__' :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要。

Pool 類 
在使用 Python 進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。如果操作的對像數目不大時,還可以直接使用 Process 類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。Pool 類可以提供指定數量的進程供用戶調用,當有新的請求提交到 Pool 中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。下面介紹一下 multiprocessing 模塊下的 Pool 類下的幾個方法. 

apply() 
  1. apply(func[, args[, kwds]])  
該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,並且 3.x 以後不在出現)。 

apply_async() 
  1. apply_async(func[, args[, kwds[, callback]]])  
與 apply 用法一樣,但它是非阻塞且支持結果返回進行回調。 

map() 
  1. map(func, iterable[, chunksize])  
Pool 類中的 map 方法,與內置的 map 函數用法行為基本一致,它會使進程阻塞直到返回結果。注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序才會運行子進程。 

map_async 
  1. map_async(func, iterable[, chunksize[, callback]])  
A variant of the map() method which returns a result object. 

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked. 

close() 
關閉進程池(pool),使其不在接受新的任務。 

terminate() 
結束工作進程,不在處理未處理的任務。 

join() 
主進程阻塞等待子進程的退出,join 方法必須在 close 或 terminate 之後使用。 

底下是 multiprocessing.Pool 類 的簡單範例: 
- test2.py 
  1. #!/usr/bin/env python  
  2. import time  
  3. from multiprocessing import Pool  
  4. def run(fn):  
  5.   time.sleep(1)  
  6.   return fn*fn  
  7.   
  8. if __name__ == "__main__":  
  9.   testFL = [1,2,3,4,5,6]  
  10.   print 'shunxu:'  # Single process to process each element one by one  
  11.   s = time.time()  
  12.   for fn in testFL:  
  13.     run(fn)  
  14.   
  15.   e1 = time.time()  
  16.   print "Execution time in single process"int(e1 - s)  
  17.   
  18.   print 'concurrent:' # Build multiple process and handle element concurrently  
  19.   pool = Pool(5)  # Build up a Pool with 5 process  
  20.   #testFL: The list of element to handle ; run: Function to handle the element  
  21.   rl =pool.map(run, testFL)  
  22.   pool.close()  # Close the pool to not create new process  
  23.   pool.join()   # Make main process to wait for the pool  
  24.   e2 = time.time()  
  25.   print "Execution time in concurrent:"int(e2-e1)  
  26.   print rl  
執行結果: 
# ./test2.py
shunxu:
Execution time in single process 6
concurrent:
Execution time in concurrent: 2
[1, 4, 9, 16, 25, 36]

上例是一個創建多個進程並發處理與順序執行處理同一數據,所用時間的差別。從結果可以看出,並發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以平時工作中,進程數也不能開太大。 程序中的 r1 表示全部進程執行結束後全局的返回結果集,run 函數有返回值,所以一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有進程都執行完畢,就返回這個列表(列表的順序不定)。對 Pool 對象調用 join() 方法會等待所有子進程執行完畢,調用 join() 之前必須先調用 close(),讓其不再接受新的 Process 了。 

再看一個實例: 
- test3.py 
  1. import time  
  2. from multiprocessing import Pool  
  3. def run(fn) :  
  4.   time.sleep(2)  
  5.   print fn  
  6. if __name__ == "__main__" :  
  7.   startTime = time.time()  
  8.   testFL = [1,2,3,4,5]  
  9.   pool = Pool(10)  # Create 10 process  
  10.   pool.map(run,testFL)  
  11.   pool.close()  
  12.   pool.join()     
  13.   endTime = time.time()  
  14.   print "time :", endTime - startTime  
執行結果: 
21

3
4
5
time : 2.51999998093

再次執行結果如下: 
1
34

2
5
time : 2.48600006104

結果中為什麼還有空行和沒有折行的數據呢?其實這跟進程調度有關,當有多個進程並行執行時,每個進程得到的時間片時間不一樣,哪個進程接受哪個請求以及執行完成時間都是不定的,所以會出現輸出亂序的情況。那為什麼又會有沒這行和空行的情況呢?因為有可能在執行第一個進程時,剛要打印換行符時,切換到另一個進程,這樣就極有可能兩個數字打印到同一行,並且再次切換回第一個進程時會打印一個換行符,所以就會出現空行的情況。 

進程實戰實例 
並行處理某個目錄下文件中的字符個數和行數,存入 res.txt 文件中,每個文件一行,格式為:
filename:lineNumber,charNumber

- test3.py 
  1. #!/usr/bin/env python  
  2. import os  
  3. import time  
  4. from multiprocessing import Pool  
  5.   
  6. def getFile(path) :  
  7.   # Obtain the file list under given folder  
  8.   fileList = []  
  9.   for root, dirs, files in list(os.walk(path)) :  
  10.     for i in files :  
  11.       if i.endswith('.py'):  
  12.         fileList.append(root + "/" + i)  
  13.   return fileList  
  14.   
  15. def operFile(filePath) :  
  16.   # Calculate the line, word count of given file path  
  17.   filePath = filePath  
  18.   fp = open(filePath)  
  19.   content = fp.readlines()  
  20.   fp.close()  
  21.   lines = len(content)  
  22.   alphaNum = 0  
  23.   for i in content :  
  24.     alphaNum += len(i.strip('\n'))  
  25.   return lines,alphaNum,filePath  
  26.   
  27. def out(list1, writeFilePath) :  
  28.   # Output the statistic result into given file  
  29.   fileLines = 0  
  30.   charNum = 0  
  31.   fp = open(writeFilePath,'a')  
  32.   for i in list1 :  
  33.     fp.write(i[2] + " Line: "+ str(i[0]) + "Word: "+str(i[1]) + "\n")  
  34.     fileLines += i[0]  
  35.     charNum += i[1]  
  36.   fp.close()  
  37.   print fileLines, charNum  
  38.   
  39. if __name__ == "__main__":  
  40.   # Create multiple process to handle the request  
  41.   startTime = time.time()  
  42.   filePath = "/root/tmp/Udemy/machine_learning_examples/"  
  43.   fileList = getFile(filePath)  
  44.   pool = Pool(5)  
  45.   resultList =pool.map(operFile, fileList)  
  46.   pool.close()  
  47.   pool.join()  
  48.   
  49.   writeFilePath = "res.txt"  
  50.   print('Total {} py file(s) being processed!'.format(len(resultList)))  
  51.   out(resultList, writeFilePath)  
  52.   endTime = time.time()  
  53.   print "used time is ", endTime - startTime  
執行結果: 
# ./test3.py
Total 263 py file(s) being processed!
31887 916489
used time is 0.124183893204


# head res.txt
/root/tmp/Udemy/machine_learning_examples//best_fit_line.py Line: 58Word: 1042
/root/tmp/Udemy/machine_learning_examples/ab_testing/bayesian_bandit.py Line: 74Word: 1747
/root/tmp/Udemy/machine_learning_examples/ab_testing/chisquare.py Line: 69Word: 1904
/root/tmp/Udemy/machine_learning_examples/ab_testing/ci_comparison.py Line: 43Word: 1158
/root/tmp/Udemy/machine_learning_examples/ab_testing/client.py Line: 51Word: 1261
/root/tmp/Udemy/machine_learning_examples/ab_testing/convergence.py Line: 40Word: 1017
/root/tmp/Udemy/machine_learning_examples/ab_testing/demo.py Line: 33Word: 949
/root/tmp/Udemy/machine_learning_examples/ab_testing/ex_chisq.py Line: 42Word: 1141
/root/tmp/Udemy/machine_learning_examples/ab_testing/ex_ttest.py Line: 26Word: 716
/root/tmp/Udemy/machine_learning_examples/ab_testing/server_solution.py Line: 76Word: 1653

耗時不到1秒,可見多進程並發執行速度是很快的。

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...