Preface
By reading this piece, you will learn how to use the concurrent.futures module to run tasks asynchronously in Python. It is a better alternative to using the threading and multiprocessing modules directly, because it implements thread-based and process-based execution behind the same interface, which is defined by the abstract Executor class. The official documentation also points out one major limitation of threads: in CPython, because of the Global Interpreter Lock, only one thread can execute Python code at a time.
Besides, the threading.Thread class does not let you retrieve a return value from the target function; whatever it returns is discarded. The main concept of the concurrent.futures module lies with the Executor class, an abstract class that provides methods to execute calls asynchronously. Instead of using it directly, we will use the two subclasses that inherit from it: ThreadPoolExecutor and ProcessPoolExecutor.
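To make the difference concrete, here is a minimal sketch comparing the two approaches. The multiply helper is a hypothetical stand-in for any callable. The list used to carry the result out of the thread is just one common workaround and is not part of either API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def multiply(x, y):
    """Hypothetical stand-in for any callable that returns a value."""
    return x * y

# threading.Thread runs the target but discards its return value,
# so the result has to be passed out through a shared container.
results = []
thread = threading.Thread(target=lambda: results.append(multiply(3, 4)))
thread.start()
thread.join()
print(results[0])  # 12

# An Executor returns a Future, and future.result() gives the value back directly.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(multiply, 3, 4)
    print(future.result())  # 12
```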
Let’s proceed to the next section and start writing some Python code.
ThreadPoolExecutor
Import
Add the following import statements at the top of your Python file:
- from concurrent.futures import ThreadPoolExecutor
- import time
Let’s define a new function that serves as the callable for the asynchronous call. I will define a simple function that sleeps for two seconds and then returns the product of its two input parameters:
- def wait_function(x, y):
-     print('Task(', x, 'multiply', y, ') started')
-     time.sleep(2)
-     print('Task(', x, 'multiply', y, ') completed')
-     return x * y
The next step is to create a ThreadPoolExecutor object. It is highly recommended to create it in a with statement, because the context manager calls shutdown(wait=True) automatically and frees up the resources once all submitted tasks have finished. The constructor accepts the following input parameters: max_workers, thread_name_prefix, initializer, and initargs.
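The following sketch shows what the with statement does for you. The two blocks below should behave the same way, because the context manager is roughly a try/finally that calls shutdown(wait=True). The current_thread_name helper is hypothetical. It only reports which worker thread ran the task, which also shows the effect of thread_name_prefix.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def current_thread_name():
    """Hypothetical helper: report the name of the thread running this call."""
    return threading.current_thread().name

# Using the executor as a context manager...
with ThreadPoolExecutor(max_workers=1, thread_name_prefix='demo') as executor:
    name_from_with = executor.submit(current_thread_name).result()

# ...is roughly equivalent to calling shutdown(wait=True) yourself in a finally block.
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix='demo')
try:
    name_from_finally = executor.submit(current_thread_name).result()
finally:
    executor.shutdown(wait=True)  # waits for submitted work to finish, then frees the threads

# Both names start with the 'demo' prefix.
print(name_from_with, name_from_finally)
```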
In this tutorial, I will be using just the max_workers parameter. Let’s create a ThreadPoolExecutor and call its submit method with wait_function as the callable. Remember that wait_function accepts two input parameters; I am going to pass them as separate positional arguments instead of a tuple:
- with ThreadPoolExecutor(max_workers=1) as executor:
-     future = executor.submit(wait_function, 3, 4)
The submit method returns a Future object that encapsulates the asynchronous execution of the callable. The most commonly used methods of the Future object are running(), done(), result(), exception(), cancel(), and add_done_callback().
Append the following code right below the submit call, inside the with block. It is a simple loop that prints a message while the task is running and prints the result once the task has completed:
-     while True:
-         if future.running():
-             print("Task 1 running")
-         elif future.done():
-             print(future.result())
-             break
-         time.sleep(0.5)  # poll twice per second instead of flooding the console
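The polling loop above works, but the other Future methods are worth seeing in action. The following sketch uses two hypothetical tasks, slow_task and failing_task. It runs them with a single worker so that the second submission waits in the queue long enough to be cancelled.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

def slow_task(seconds):
    """Hypothetical task: sleep, then return the number of seconds slept."""
    time.sleep(seconds)
    return seconds

def failing_task():
    """Hypothetical task that always raises."""
    raise ValueError('something went wrong')

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(slow_task, 1)   # occupies the only worker
    second = executor.submit(slow_task, 1)  # waits in the queue, still pending
    # cancel() only succeeds for a future that has not started running yet
    print(second.cancel(), second.cancelled())  # True True

    # result(timeout=...) raises TimeoutError if the value is not ready in time
    try:
        first.result(timeout=0.1)
    except FutureTimeoutError:
        print('first task is still running')
    print(first.result())  # blocks until the task finishes, then prints 1

    broken = executor.submit(failing_task)
    # exception() returns the exception raised by the callable (None on success);
    # calling result() on this future would re-raise it instead.
    print(repr(broken.exception()))  # ValueError('something went wrong')
```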
Multiple tasks
Next, we are going to add another task so that both tasks can run in parallel. Change the code in your Python file to the following:
- from concurrent.futures import ThreadPoolExecutor
- import time
- def wait_function(x, y):
-     print('Task(', x, 'multiply', y, ') started')
-     time.sleep(2)
-     print('Task(', x, 'multiply', y, ') completed')
-     return x * y
- start = time.perf_counter()  # time the run (the %%time cell magic only works in Jupyter)
- with ThreadPoolExecutor(max_workers=1) as executor:  # change max_workers to 2 and see the results
-     future = executor.submit(wait_function, 3, 4)
-     future2 = executor.submit(wait_function, 8, 8)
-     while True:
-         if future.done() and future2.done():
-             print(future.result(), future2.result())
-             break
-         time.sleep(0.1)  # wait a little before checking again instead of busy-looping
- print(f'Elapsed: {time.perf_counter() - start:.1f} s')
Start with max_workers set to 1. When you run the script, you should notice that the tasks do not run in parallel: the first task completes before the second one starts, because there is only one worker in the pool, and the whole run takes about 4 seconds. Increase max_workers to 2, and both tasks run in parallel and finish in about 2 seconds.
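You can also measure the effect of max_workers directly. This sketch uses a hypothetical sleepy_multiply function with a 0.5-second delay, which is shorter than the 2 seconds above so the run stays quick. A hypothetical run_two_tasks helper returns the results together with the elapsed wall-clock time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def sleepy_multiply(x, y, delay=0.5):
    """Hypothetical task: the sleep stands in for I/O-bound work such as a network request."""
    time.sleep(delay)
    return x * y

def run_two_tasks(max_workers):
    """Submit two tasks and return (results, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(sleepy_multiply, 3, 4),
                   executor.submit(sleepy_multiply, 8, 8)]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start

for workers in (1, 2):
    results, elapsed = run_two_tasks(workers)
    print(f'max_workers={workers}: results={results}, elapsed={elapsed:.2f}s')
```

With one worker, the elapsed time should be roughly twice the delay. With two workers, it should be close to a single delay.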
Callback function
You can attach a callback function to the Future object. The callback is called with the future as its only argument once the future is cancelled or finishes running. This is extremely useful if you need to update the UI after a successful database connection or after URL requests complete. Let’s create a simple callback function for now:
- def callback_function(future):
-     print('Callback with the following result', future.result())
- start = time.perf_counter()  # time the run (the %%time cell magic only works in Jupyter)
- with ThreadPoolExecutor(max_workers=2) as executor:  # two workers, so both tasks run at once
-     future = executor.submit(wait_function, 3, 4)
-     future.add_done_callback(callback_function)  # set up the callback function here
-     future2 = executor.submit(wait_function, 8, 8)
-     while True:
-         if future.done() and future2.done():
-             print(future.result(), future2.result())
-             break
-         time.sleep(0.1)  # wait a little before checking again instead of busy-looping
- print(f'Elapsed: {time.perf_counter() - start:.1f} s')
Because max_workers is set to 2, the total execution time drops from about 4 seconds to about 2 seconds, which shows that the tasks are executed in parallel. You can also see in the log that callback_function is called.
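Two details of add_done_callback are easy to miss. If the future has already finished, the callback runs immediately in the thread that attaches it. A cancelled future also triggers the callback. The sketch below shows both cases with a hypothetical record callback. It creates a bare Future directly, which the documentation reserves for testing, only to have a future that can be cancelled on demand.

```python
from concurrent.futures import Future, ThreadPoolExecutor

log = []

def record(future):
    """Hypothetical callback that logs how the future ended."""
    # Check cancelled() first: calling result() on a cancelled future raises CancelledError.
    if future.cancelled():
        log.append('cancelled')
    else:
        log.append(f'result={future.result()}')

with ThreadPoolExecutor(max_workers=1) as executor:
    done_future = executor.submit(pow, 2, 10)
    done_future.result()                   # make sure the task has finished
    done_future.add_done_callback(record)  # already done, so record() runs right away here

pending = Future()  # a bare Future that no executor will ever run (testing use only)
pending.add_done_callback(record)
pending.cancel()    # cancelling also triggers the callback

print(log)  # ['result=1024', 'cancelled']
```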
ProcessPoolExecutor
The ProcessPoolExecutor class works much like ThreadPoolExecutor, with a few differences. It uses the multiprocessing module, which allows it to sidestep the Global Interpreter Lock. However, this also means that only picklable objects can be executed and returned.
It also does not work in the interactive interpreter, because the __main__ module must be importable by the worker subprocesses. If max_workers is omitted or None, it defaults to the number of processors on the machine. On Windows, max_workers must be less than or equal to 61.
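Everything sent to and returned from worker processes must be pickled. A quick way to check an argument before submitting it is to call pickle.dumps on it yourself. The is_picklable helper below is a hypothetical convenience, not part of concurrent.futures. The exception types it catches are the ones pickle commonly raises for unpicklable objects.

```python
import pickle
import threading

def wait_function(x, y):
    return x * y

def is_picklable(obj):
    """Hypothetical helper: True if obj can be pickled, as worker-process arguments must be."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PickleError, AttributeError, TypeError):
        return False

print(is_picklable(wait_function))       # True: module-level functions are pickled by name
print(is_picklable((3, 4)))              # True: plain data such as tuples of ints
print(is_picklable(lambda x, y: x * y))  # False: a lambda has no importable name
print(is_picklable(threading.Lock()))    # False: locks cannot be pickled
```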
You have to import the ProcessPoolExecutor to use it:
- from concurrent.futures import ProcessPoolExecutor
- import time
- def wait_function(x, y):
-     print('Task(', x, 'multiply', y, ') started')
-     time.sleep(2)
-     print('Task(', x, 'multiply', y, ') completed')
-     return x * y
- def callback_function(future):
-     print('Callback with the following result', future.result())
- def main():
-     with ProcessPoolExecutor(max_workers=2) as executor:
-         future = executor.submit(wait_function, 3, 4)
-         future.add_done_callback(callback_function)
-         future2 = executor.submit(wait_function, 8, 8)
-         while True:
-             if future.running():
-                 print("Task 1 running")
-             if future2.running():
-                 print("Task 2 running")
-             if future.done() and future2.done():
-                 print(future.result(), future2.result())
-                 break
-             time.sleep(0.5)  # poll twice per second instead of flooding the console
- # The guard keeps main() from running again when worker processes import this module
- if __name__ == '__main__':
-     main()
If you try to stop the execution with Ctrl+C while tasks are running in worker threads, the interpreter will most likely hang after the KeyboardInterrupt exception. This is mainly because Ctrl+C generates SIGINT, which raises KeyboardInterrupt only in the main thread. The worker threads keep running, and the program waits for them to finish before it can exit. On Windows, you can generate SIGBREAK instead to end the execution and return to the terminal. The key combination that generates SIGBREAK depends on the computer model; on a standard Windows keyboard it is Ctrl+Break.
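One way to make Ctrl+C less painful with threads is to let tasks check a shared threading.Event and to cancel queued work on KeyboardInterrupt. This sketch assumes you can change the task code. The names stop_event, interruptible_task, and run are hypothetical, and cancel_futures requires Python 3.9 or later. Whether a blocked result() call reacts to Ctrl+C promptly can still depend on the platform.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stop_event = threading.Event()  # hypothetical shared flag that every task checks

def interruptible_task(seconds):
    """Sleep in 0.1 s steps so the task can notice stop_event and return early."""
    for _ in range(int(seconds * 10)):
        if stop_event.is_set():
            return 'stopped'
        time.sleep(0.1)
    return 'finished'

def run(n_tasks=4, seconds=30):
    """Run the tasks; on Ctrl+C, stop running tasks and drop queued ones."""
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(interruptible_task, seconds) for _ in range(n_tasks)]
    try:
        return [f.result() for f in futures]
    except KeyboardInterrupt:
        stop_event.set()                        # ask running tasks to return early
        executor.shutdown(cancel_futures=True)  # cancel tasks that have not started yet
        raise
    finally:
        executor.shutdown()                     # harmless if already shut down
```

Calling run() and pressing Ctrl+C should then stop the running tasks within about 0.1 seconds, instead of waiting for them to finish.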
Conclusion
Let’s recap what we have learned today.
We started off with a simple explanation of the concurrent.futures module.
After that, we explored the ThreadPoolExecutor class and the Future class in depth. We ran multiple tasks in parallel with different values of max_workers. We also set up a callback function that executes upon completion of a task.
We moved on to the ProcessPoolExecutor, which is similar to the ThreadPoolExecutor with a few minor differences.