2018年2月25日 星期日

[ FP with Java ] Ch8 - Advanced list handling - Part2


Automatic parallel processing of lists 
Most computations that are applied to lists resort to folds. A fold involves applying an operation as many times as there are elements in the list. For very long lists and long-lasting operations, a fold can take a considerable amount of time. Because most computers are now equipped with multicore processors (if not multiple processors), you may be tempted to find a way to make the computer process a list in parallel. 

In order to parallelize a fold, you need only one thing (beside a multicore processor, of course): an additional operation allowing you to recompose the results of each parallel computation. 

Not all computations can be parallelized 
Take the example of a list of integers. Finding the mean of all integers isn’t something you can directly parallelize. You could break the list into four pieces (if you have a computer with four processors) and compute the mean of each sublist. But you wouldn’t be able to compute the mean of the list from the means of the sublists. On the other hand, computing the mean of a list implies computing the sum of all elements and then dividing it by the number of elements. And computing the sum is something that can be easily parallelized by computing the sums of the sublists, and then computing the sum of the sublist sums. 

This is a very particular example, where the operation used for the fold (the addition) is the same as the operation used to assemble the sublist results. This isn’t always the case. Take the example of a list of characters that’s folded by adding a character to a String. To assemble the intermediate results, you need a different operation: string concatenation. 

Breaking the list into sublists 
First, you must break the list into sublists, and you must do this automatically. One important question is how many sublists you should obtain. At first glance, you might think that one sublist for each available processor would be ideal, but this isn’t exactly the case. The number of processors (or, more precisely, the number of logical cores) isn’t the most important factor. There’s a more crucial question: will all sublist computations take the same amount of time? Probably not, but this depends on the type of computation. If you were to divide the list into four sublists because you decided to dedicate four threads to parallel processing, some threads might finish very quickly, while others might have to make a much longer computation. This would ruin the benefit of parallelization, because it might result in most of the computing task being handled by a single thread. 

A better solution is to divide the list into a large number of sublists, and then submit each sublist to a pool of threads. This way, as soon as a thread finishes processing a sublist, it’s handed a new one to process. So the first task is to create a method that will divide a list into sublists. 

Write a divide(int depth) method that will divide a list into a number of sublists. The list will be divided in two, and each sublist recursively divided in two, with the depth parameter representing the number of recursion steps. This method will be implemented in the List parent class with the following signature: 
  1. List<List<A>> divide(int depth)  
Let's first define a new version of the splitAt method that returns a list of lists instead of a Tuple<List, List>. Let’s call this method splitListAt and give it the following signature: 
  1. List<List<A>> splitListAt(int i)  
The splitListAt method is an explicitly recursive method made stack-safe through the use of the TailCall class: 
  1. public List<List<A>> splitListAt(int i) {  
  2.   return splitListAt(list(), this.reverse(), i).eval();  
  3. }  
  4.   
  5. private TailCall<List<List<A>>> splitListAt(List<A> acc,  
  6.                                             List<A> list, int i) {  
  7.   return i == 0 || list.isEmpty()  
  8.       ? ret(List.list(list.reverse(), acc))  
  9.       : sus(() -> splitListAt(acc.cons(list.head()), list.tail(), i - 1));  
  10. }  
This method will, of course, always return a list of two lists. Then you can define the divide method as follows: 
  1. public List<List<A>> divide(int depth) {  
  2.     return this.isEmpty() ? list(this) : divide(list(this), depth);  
  3. }  
  4.   
  5. private List<List<A>> divide(List<List<A>> list, int depth) {  
  6.     return list.head().length() < depth || depth < 2   
  7.             ? list  
  8.             : divide(list.flatMap(x -> x.splitListAt(x.length() / 2)), depth / 2);  
  9. }  
Note that you don’t need to make this method stack-safe because the number of recursion steps will only be log(length). In other words, you’ll never have enough heap memory to hold a list long enough to cause a stack overflow. 

Processing sublists in parallel 
To process the sublists in parallel, you’ll need a special version of the method to execute, which will take as an additional parameter an ExecutorService configured with the number of threads you want to use in parallel. 

Let's create a parFoldLeft method in List<A> that will take the same parameters as fold-Left plus an v and a function from B to B to B and that will return a Result<List<B>>. The additional function will be used to assemble the results from the sublists. Here’s the signature of the method (Exercise 8.23): 
  1. public<B> Result<B> parFoldLeft(ExecutorService es, B identity, Function<B, Function<A, B>> f, Function<B, Function<B, B>> m)  
First, you must define the number of sublists you want to use and divide the list accordingly: 
  1. final int chunks = 1024;  
  2. final List<List<A>> dList = divide(chunks);  
Then, you’ll map the list of sublists with a function that will submit a task to the ExecutorService. This task consists of folding each sublist and returning a Future instance. The list of Future instances is mapped to a function calling get on each Future to produce a list of results (one for each sublist). Note that you must catch the potential exceptions. 

Eventually, the list of results is folded with the second function, and the result is returned in a Result.Success. In the case of an exception, a Failure is returned. 
  1. try {  
  2.   List<B> result = dList.map(x -> es.submit(() -> x.foldLeft(identity,  
  3.                                                          f))).map(x -> {  
  4.     try {  
  5.       return x.get();  
  6.     } catch (InterruptedException | ExecutionException e) {  
  7.       throw new RuntimeException(e);  
  8.     }  
  9.   });  
  10.   return Result.success(result.foldLeft(identity, m));  
  11. catch (Exception e) {  
  12.   return Result.failure(e);  
  13. }  
You’ll find an example benchmark of this method in the accompanying code (https://github.com/fpinjava/fpinjava). The benchmark consists of computing 10 times the Fibonacci value of 35,000 random numbers between 1 and 30 with a very slow algorithm. On a four-core Macintosh, the parallel version executes in 22 seconds, whereas the serial version needs 83 seconds. 

Although mapping can be implemented through a fold (and thus can benefit from automatic parallelization), it can also be implemented in parallel without using a fold. This is probably the simplest automatic parallelization that can be implemented on a list. Create a parMap method that will automatically apply a given function to all elements of a list in parallel. Here’s the method signature (Exercise 8.24): 
  1. public <B> Result<List<B>> parMap(ExecutorService es, Function<A, B> g)  
Here’s the solution: 
  1. public <B> Result<List<B>> parMap(ExecutorService es, Function<A, B> g) {  
  2.   try {  
  3.     return Result.success(this.map(x -> es.submit(() -> g.apply(x)))  
  4.                                                              .map(x -> {  
  5.       try {  
  6.         return x.get();  
  7.       } catch (InterruptedException | ExecutionException e) {  
  8.         throw new RuntimeException(e);  
  9.       }  
  10.     }));  
  11.   } catch (Exception e) {  
  12.     return Result.failure(e);  
  13.   }  
  14. }  
The benchmark available in the code accompanying this book will allow you to measure the increase in performance. This increase may, of course, vary depending on the machine running the program. 

Supplement 
Ch8 - Advanced list handling - Part1 
Ch8 - Advanced list handling - Part2 
Java 文章收集 - ExecutorService usage tutorial

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...