Tuesday, May 5, 2020

[ Python Article Collection ] How can I Launch Parallel Tasks in Python

Source From Here
Preface
By reading this piece, you will learn how to use the concurrent.futures module to run tasks asynchronously in Python. It is a better alternative to the threading and multiprocessing modules because it implements both threads and processes behind the same interface, which is defined by the abstract Executor class. The official documentation also points out one major limitation of threads: in CPython, because of the Global Interpreter Lock (GIL), only one thread can execute Python code at a time.

In addition, threading.Thread gives you no direct way to get a return value back from the target function; whatever the target returns is simply discarded. The core of the concurrent.futures module is the Executor class, an abstract class that provides methods to execute calls asynchronously. Instead of using it directly, we will use the subclasses that inherit from it:
ThreadPoolExecutor: The ThreadPoolExecutor class is an Executor subclass that uses a pool of threads to execute calls asynchronously.
ProcessPoolExecutor: The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously.
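To make the return-value point concrete, here is a minimal sketch comparing the two approaches. The helper names multiply, with_thread and with_executor are hypothetical and exist only for this illustration; the shared-list trick is one common workaround for threading.Thread, not the only one.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def multiply(x, y):
    # Hypothetical helper used only for this comparison.
    return x * y


def with_thread(x, y):
    # threading.Thread throws away the target's return value,
    # so the result has to be passed out through a shared list.
    results = []
    worker = threading.Thread(target=lambda: results.append(multiply(x, y)))
    worker.start()
    worker.join()
    return results[0]


def with_executor(x, y):
    # submit() wraps the call in a Future; result() hands back the value.
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(multiply, x, y)
        return future.result()


print(with_thread(3, 4), with_executor(3, 4))  # 12 12
```

With the executor, no extra bookkeeping is needed: the value travels back through the Future.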

Let’s proceed to the next section and start writing some Python code.

ThreadPoolExecutor

Import
Add the following import declaration at the top of your Python file:
from concurrent.futures import ThreadPoolExecutor
import time
Callable function (target)
Let’s define a new function that serves as the callable for the asynchronous call. It simply sleeps for two seconds and then returns the product of its two input parameters:
def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y
Single task
The next step is to create a ThreadPoolExecutor object. It is highly recommended to create it in a with statement, which calls the executor's shutdown method automatically and frees up its resources once all submitted tasks have finished. The constructor accepts the following input parameters:
max_workers: The number of worker threads for this instance. From Python 3.5 onward, if it is None or not given, it defaults to the number of processors on the machine multiplied by five. From Python 3.8 onward, the default is min(32, os.cpu_count() + 4).
thread_name_prefix: Allows users to control the threading.Thread names of the worker threads created by the pool, for easier debugging.
initializer: An optional callable that is called at the start of each worker thread.
initargs: A tuple of arguments passed to the initializer.
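The tutorial only uses max_workers, but the other three parameters can be sketched as follows, assuming Python 3.7 or later (initializer and initargs were added in 3.7). The functions init_worker and whoami, the list initialised, and the prefix 'demo' are hypothetical names chosen for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

initialised = []  # records which worker threads ran the initializer


def init_worker(tag):
    # Runs once in each worker thread before it executes any task.
    initialised.append((threading.current_thread().name, tag))


def whoami(_):
    # Reports the name of the worker thread that executed this call.
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix='demo',
                        initializer=init_worker,
                        initargs=('example',)) as executor:
    names = set(executor.map(whoami, range(4)))

print(sorted(names))
print(initialised)
```

Every worker name starts with the prefix, which makes the threads easy to spot in logs and debuggers. Because worker threads are started on demand, fewer than max_workers of them may actually appear.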

In this tutorial, I will use just the max_workers parameter. Let’s create a ThreadPoolExecutor and call its submit method with wait_function as the callable. Remember that wait_function accepts two input parameters; I am going to pass them as separate arguments instead of as a tuple:
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)  # x=3 and y=4 as separate arguments
Output:
Task( 3 multiply 4 ) started
Task( 3 multiply 4 ) completed

The submit method returns a Future object that encapsulates the asynchronous execution of a callable. The most commonly used methods of a Future object are:
cancel: Attempts to cancel the call. Returns a boolean that indicates whether the call was successfully canceled; a call that is already running or has finished cannot be canceled.
running: Checks whether the call is currently being executed. Returns a boolean.
done: Checks whether the call was canceled or has finished running. Returns a boolean.
result: Returns the value returned by the call. If the call hasn’t completed yet, this method waits up to timeout seconds and raises TimeoutError if the call still hasn’t finished; with the default timeout=None it waits indefinitely. Because result blocks the current thread, check done first if you don’t want to wait.
add_done_callback: Attaches a callable to the Future object. The callable is called with the Future as its only argument when the Future is canceled or finishes running.
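The methods listed above can be exercised together in one short sketch. It assumes a hypothetical slow_multiply function (a shorter stand-in for wait_function) and a single worker, so that the second task is guaranteed to wait in the queue long enough to be canceled.

```python
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


def slow_multiply(x, y):
    # Hypothetical stand-in for wait_function with a shorter sleep.
    time.sleep(0.5)
    return x * y


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(slow_multiply, 3, 4)
    second = executor.submit(slow_multiply, 8, 8)  # queued behind `first`

    # result(timeout=...) raises TimeoutError instead of blocking indefinitely.
    try:
        first.result(timeout=0.1)
    except concurrent.futures.TimeoutError:
        print('first is not finished after 0.1 s')

    # Only a call that has not started yet can be canceled.
    print('cancel second:', second.cancel())

    print('first result:', first.result())  # blocks until the call completes
    print('first done:', first.done(), '| second cancelled:', second.cancelled())
```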

Append the following code right below the submit function. It is just a simple loop that prints a string while the thread is running. When it is completed, it will print out the result:
    # still inside the with block, right after executor.submit(...)
    while True:
        if future.running():
            print("Task 1 running")
        elif future.done():
            print(future.result())
            break
Check out the complete code on GitHub.
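The loop above re-checks the Future as fast as the CPU allows, which keeps one core busy (and floods the console) for the whole two seconds. A gentler variant, sketched below under the assumption that checking a few times per second is precise enough, sleeps between checks. It uses a shortened copy of wait_function and a hypothetical polls counter.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_function(x, y):
    # Shortened copy of the tutorial's function (1 s instead of 2 s).
    time.sleep(1)
    return x * y


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)
    polls = 0
    while not future.done():
        polls += 1
        print('Task 1 running')
        time.sleep(0.25)  # give the CPU back between checks
    print(future.result())  # 12
```

If there is nothing else to do while waiting, calling future.result() directly blocks until the value is ready and needs no loop at all.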

Multiple tasks
Next, we are going to add another task to it so that both of them will run in parallel. Change the code in your Python file to the following:
from concurrent.futures import ThreadPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

start = time.perf_counter()  # plain-Python replacement for the Jupyter %%time magic
with ThreadPoolExecutor(max_workers=1) as executor:  # change max_workers to 2 and see the results
    future = executor.submit(wait_function, 3, 4)
    future2 = executor.submit(wait_function, 8, 8)
    while True:
        if future.running() or future2.running():
            continue

        if future.done() and future2.done():
            print(future.result(), future2.result())
            break
print(f'Wall time: {time.perf_counter() - start:.2f} s')
Output:
Task( 3 multiply 4 ) started
Task( 3 multiply 4 ) completed
Task( 8 multiply 8 ) started
Task( 8 multiply 8 ) completed
12 64
Wall time: 4.03 s

For now, keep max_workers at 1. Run the code and you will notice that the tasks do not run in parallel: the first task runs to completion before the second one starts, because there is only one worker in the pool. Then increase max_workers to 2, and you should see both tasks running in parallel.
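The effect of max_workers can also be measured without the polling loop. The sketch below swaps in concurrent.futures.as_completed, which yields each Future as it finishes, and times one run per setting. The run helper is a hypothetical name, and wait_function is shortened to 1 second so the comparison runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def wait_function(x, y):
    # Shortened copy of the tutorial's function (1 s instead of 2 s).
    time.sleep(1)
    return x * y


def run(max_workers):
    # Submit the two tasks, collect results as they finish, and time it.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(wait_function, 3, 4),
                   executor.submit(wait_function, 8, 8)]
        results = sorted(f.result() for f in as_completed(futures))
    return results, time.perf_counter() - start


for workers in (1, 2):
    results, elapsed = run(workers)
    print(f'max_workers={workers}: {results} in {elapsed:.1f} s')
```

With one worker the two sleeps happen back to back (about 2 seconds); with two workers they overlap (about 1 second).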

Callback function
You can attach a callback function to a Future object; it is called once the call is canceled or completes. This is useful, for example, if you want to update the UI after a database connection succeeds or a URL request completes. Let’s create a simple callback function for now:
def callback_function(future):
    print('Callback with the following result', future.result())
Attach it to the first task with add_done_callback, right after the submit call (the complete code is on GitHub):
start = time.perf_counter()  # plain-Python replacement for the Jupyter %%time magic
with ThreadPoolExecutor(max_workers=2) as executor:  # two workers, so both tasks can run at once
    future = executor.submit(wait_function, 3, 4)
    future.add_done_callback(callback_function)  # set up the callback function here
    future2 = executor.submit(wait_function, 8, 8)
    while True:
        if future.running() or future2.running():
            continue

        if future.done() and future2.done():
            print(future.result(), future2.result())
            break
print(f'Wall time: {time.perf_counter() - start:.2f} s')
Output:
Task( 3 multiply 4 ) started
Task( 8 multiply 8 ) started
Task( Task(8 multiply 8 ) completed
3 multiply 4 ) completed
Callback with the following result 12
12 64
Wall time: 2.01 s

Because max_workers is set to 2, the wall time drops from about 4 seconds to about 2 seconds, which shows that the tasks ran in parallel. The log also shows that callback_function was called. (The jumbled "completed" lines appear because both threads printed at almost the same moment.)
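Note that callback_function calls future.result(), which raises an exception if the task failed or was canceled. A slightly more defensive callback, sketched below, checks cancelled() and exception() first. The names safe_callback and outcomes, and the ValueError raised for y == 0, are hypothetical additions for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

outcomes = []  # filled in by the callback


def safe_callback(future):
    # Runs once the future has finished, failed, or been canceled.
    if future.cancelled():
        outcomes.append('cancelled')
    elif future.exception() is not None:
        outcomes.append(f'failed: {future.exception()!r}')
    else:
        outcomes.append(future.result())


def wait_function(x, y):
    # Hypothetical variant that fails for y == 0 to exercise the error path.
    time.sleep(0.2)
    if y == 0:
        raise ValueError('y must not be zero')
    return x * y


with ThreadPoolExecutor(max_workers=1) as executor:
    ok = executor.submit(wait_function, 3, 4)
    bad = executor.submit(wait_function, 5, 0)
    queued = executor.submit(wait_function, 8, 8)
    queued.cancel()  # still waiting behind the other two, so this succeeds
    for f in (ok, bad, queued):
        # On a future that is already done, the callback fires immediately.
        f.add_done_callback(safe_callback)

print(outcomes)
```

The order of the entries depends on when each future finishes, so code that consumes outcomes should not rely on it.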

ProcessPoolExecutor
The ProcessPoolExecutor class exposes the same interface as ThreadPoolExecutor, with a few differences. It uses the multiprocessing module, which allows it to sidestep the Global Interpreter Lock. However, this also means that only picklable objects can be submitted and returned.

In addition, the __main__ module must be importable by the worker subprocesses, which means ProcessPoolExecutor does not work in the interactive interpreter. If max_workers is None or not given, it defaults to the number of processors on the machine. On Windows, max_workers must be less than or equal to 61.
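ProcessPoolExecutor sends the callable and its arguments to the worker processes with pickle, so a quick way to check the picklability requirement is to try pickling the objects yourself. The sketch below uses a hypothetical is_picklable helper. A function that can be looked up again by module and name (operator.mul here, or a module-level function such as wait_function in a script) passes, while a lambda does not, which is why a lambda cannot be submitted to a ProcessPoolExecutor.

```python
import operator
import pickle


def is_picklable(obj):
    # Hypothetical helper: True if pickle can serialise obj.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(operator.mul))        # True: found again by its module and name
print(is_picklable((3, 4)))              # True: plain arguments are fine
print(is_picklable(lambda x, y: x * y))  # False: a lambda has no importable name
```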

You have to import the ProcessPoolExecutor to use it:
from concurrent.futures import ProcessPoolExecutor
You can reuse the previous code and switch it from ThreadPoolExecutor to ProcessPoolExecutor. Wrap the code inside a main function and call it from the if __name__ == '__main__': guard. The complete code (also available on GitHub) is:
from concurrent.futures import ProcessPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

def callback_function(future):
    print('Callback with the following result', future.result())

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(wait_function, 3, 4)
        future.add_done_callback(callback_function)
        future2 = executor.submit(wait_function, 8, 8)
        while True:
            if future.running():
                print("Task 1 running")
            if future2.running():
                print("Task 2 running")

            if future.done() and future2.done():
                print(future.result(), future2.result())
                break

if __name__ == '__main__':
    main()
Keyboard Interrupt
If you press Ctrl+C to stop the program while a task is running in a worker thread, the interpreter will most likely hang after raising the KeyboardInterrupt exception. This is mainly because Ctrl+C generates SIGINT, which raises KeyboardInterrupt only in the main thread; it does not stop the worker threads, so the program keeps waiting for them to finish. On Windows, you need to generate SIGBREAK instead (typically with Ctrl+Break) to end the execution and return to the terminal; the exact key combination depends on the operating system and computer model.
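As a complementary approach, separate from sending SIGBREAK, you can catch KeyboardInterrupt in the main thread and shut the executor down without waiting, which on Python 3.9+ can also cancel tasks that have not started yet. This is a sketch under two assumptions: the interrupt actually reaches the main thread (on some platforms a blocking wait may delay it), and tasks that are already running are allowed to finish, since they are not interrupted and the interpreter still waits for them at exit. The Ctrl+C is faked with a raise so the sketch runs unattended; run and simulate_interrupt are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_function(x, y):
    # Shortened copy of the tutorial's function.
    time.sleep(0.5)
    return x * y


def run(pairs, simulate_interrupt=False):
    executor = ThreadPoolExecutor(max_workers=1)
    futures = [executor.submit(wait_function, x, y) for x, y in pairs]
    try:
        if simulate_interrupt:
            raise KeyboardInterrupt  # stand-in for pressing Ctrl+C
        results = [f.result() for f in futures]
    except KeyboardInterrupt:
        # Python 3.9+: return at once and cancel every task still queued.
        executor.shutdown(wait=False, cancel_futures=True)
        return [f.cancelled() for f in futures]
    executor.shutdown()
    return results


print(run([(3, 4), (8, 8)]))  # [12, 64]
print(run([(3, 4), (8, 8), (2, 2)], simulate_interrupt=True))
```

In the interrupted run, the queued tasks report cancelled() as True; the first task may or may not have started, depending on timing.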


Conclusion
Let’s recap what we have learned today.

We started off with a simple explanation of the concurrent.futures module.

After that, we explored the ThreadPoolExecutor and Future classes in depth. We tried running multiple tasks in parallel with different values of max_workers. We also set up a callback function that executes upon completion of the task.

We moved on to the ProcessPoolExecutor, which is similar to the ThreadPoolExecutor with a few minor differences.
