2016年4月5日 星期二

[ Learn Spark ] Ch4. Working with Key/Value Pairs - Part2

Grouping Data 
With keyed data a common use case is grouping our data by key - for example, viewing all of a customer's order together. If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]]

groupBy() works on unpaired data or data where we want to use a different condition besides equality on the current key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key. 
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect() // Group data into even/odd
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]

In addition to grouping data from a single RDD, we can group data sharing the same key from multiple RDDs using a function called cogroup(). This function over two RDDs sharing the same key type K, with the respective value types Vand W gives us back RDD[(K, (Iterable[V], Iterable[W]))]. If one of the RDDs doesn't have element for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup() gives us the power to group data from multiple RDDs. Below is a simple example: 
>>> x = sc.parallelize([("a", 1), ("b", 4), ("a", 3)])
>>> y = sc.parallelize([("a", "Apple"), ("c", "Cacao"), ("c", "Calamondin")])
>>> g = x.cogroup(y)
>>> g.collect()
[('a', (, )), ... )]
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(g.collect()))]
[('a', ([1, 3], ['Apple'])), ('b', ([4], [])), ('c', ([], ['Cacao', 'Calamondin']))]


cogroup() is used as a building block for the joins we discuss in next section. 

Joins 
Some of the most useful operations we get with keyed data comes from using it together with other keyed data. Joining data together is probably one of the most common operations on a pair RDD, and we have a full range of options including right and left outer joins, cross joins, and inner joins. The simple join operator is an inner join. Only keys that are present in both pair RDDs are output. When there are multiple values for the same key in one of the inputs the resulting pair RDD will have an entry for every possible pair of values with that key from the two input RDDs. A simple to understand this is by look at below example: 
- Example 4-17. inner join in Python 
>>> storeAddress = sc.parallelize([("Ritual", "1026 Valencia St"), ("Philz", "748 Van Ness Ave"), ("Philz", "3101 24th St"), ("Starbucks", "Seattle")])
>>> storeRating = sc.parallelize([("Ritual", 4.9), ("Philz", 4.8)])
>>> g = storeAddress.join(storeRating)
>>> g.collect()
[('Philz', ('748 Van Ness Ave', 4.8)), ('Philz', ('3101 24th St', 4.8)), ('Ritual', ('1026 Valencia St', 4.9))]

Sometimes we don't need the key to be present in both RDDs to want it in our result. For example, if we were joining customer information with recommendations we might not want to drop customers if there were not any recommendations yet. leftOuterJoin(other) and rightOuterJoin(other) both join pair RDDs together by key, where one of the pair RDDs can be missing the key. 

We can revisit Example 4-17 and do a leftOuterJoin(other) and a rightOuterJoin(other) between two pair RDDs we used to illustrate join() in below example: 
- Example 4-18. leftOuterJoin() and rightOuterJoin() 
>>> storeAddress.leftOuterJoin(storeRating).collect()
[('Philz', ('748 Van Ness Ave', 4.8)), ('Philz', ('3101 24th St', 4.8)), ('Ritual', ('1026 Valencia St', 4.9)), ('Starbucks', ('Seattle', None))]
>>> storeAddress.rightOuterJoin(storeRating).collect()
[('Philz', ('748 Van Ness Ave', 4.8)), ('Philz', ('3101 24th St', 4.8)), ('Ritual', ('1026 Valencia St', 4.9))]


Sorting Data 
Having sorted data is quite useful in many cases, especially when you're producing downstream output. We can sort an RDD with key/value pairs provided that there is an ordering defined on the key. Once we have sorted our data, any subsequent call on the sorted data to collect() or save() will result in ordered data. Since we often want our RDDs in the reverse order, the sortByKey() function takes a parameter called ascending indicating whether we want it in ascending order (in default to true). Sometimes we want a different sort order entirely, and to support this we can provide our own comparison function. In below example, we will sort our RDD by converting the integers to strings and using the string comparison functions: 
- Example 4-19. Custom sort order in Python, sorting integers as if strings 
>>> rdd = sc.parallelize([(1, 'One'), (2, 'Two'), (11, 'Eleven')])
>>> rdd.sortByKey(ascending=True).collect() // 1 < 2 < 11
[(1, 'One'), (2, 'Two'), (11, 'Eleven')]
>>> rdd.sortByKey(ascending=False).collect()
[(11, 'Eleven'), (2, 'Two'), (1, 'One')]
>>> rdd.sortByKey(ascending=True, keyfunc = lambda x: str(x)).collect() // '1' < '11' < '2'
[(1, 'One'), (11, 'Eleven'), (2, 'Two')]
>>> rdd.sortByKey(ascending=False, keyfunc = lambda x: str(x)).collect()
[(2, 'Two'), (11, 'Eleven'), (1, 'One')]


Actions Available on Pair RDDs 
As with the transformations, all of the traditional actions available on the base RDD are also available on pair RDDs. Some additional actions are available on pair RDDs to take advantage of the key/value nature of the data; these are listed below: 
countByKey(): Count the number of elements for each key 
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)] // We have 2 'a' and 1 'b'

collectAsMap(): Collect the result as a map to provide easy lookup. 
>>> m = sc.parallelize([(1, 'One'), (3, 'Three')]).collectAsMap()
>>> m.__class__

>>> m[1]
'One'
>>> m[3]
'Three'

lookup(key): Return all values associated with the provided key. 
>>> r = range(10)
>>> z = zip(r, r)
>>> z
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
>>> rdd = sc.parallelize(z, 10)
>>> rdd.collect()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
>>> rdd.lookup(2) // slow
[2]
>>> sorted = rdd.sortByKey()
>>> sorted.lookup(2) //quick
[2]
>>> sorted.lookup(11)
[]

There are also multiple other actions on pair RDDs that save the RDD, which we will describe in Chapter 5. 

Supplement 
Ch4. Working with Key/Value Pairs - Part1 
Ch4. Working with Key/Value Pairs - Part2 
Ch4. Working with Key/Value Pairs - Part3

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...