程式扎記: [ In Action ] Ch10. Pig: Speaking Pig Latin (3)


Sunday, April 26, 2015


Preface 
You now know how to use Grunt to run Pig Latin statements and investigate their execution and results. We can come back and give a more formal treatment of the language. You should feel free to use Grunt to explore these language concepts as we present them. 

Data types and schemas 
Let’s first look at Pig data types from a bottom-up view. Pig has six simple atomic types (int, long, float, double, chararray, and bytearray) and three complex types (tuple, bag, and map), shown in tables 10.4 and 10.5 respectively. The atomic types include numeric scalars as well as string and binary objects. Type casting is supported and done in the usual manner. Fields default to bytearray unless specified otherwise.
 
 

A field in a tuple or a value in a map can be null or any atomic or complex type. This enables nesting and complex data structures. Whereas data structures can be arbitrarily complex, some are definitely more useful and occur more often than others, and nesting usually doesn’t go deeper than two levels. In the Excite log example earlier, the GROUP BY operator generated a relation grpd where each tuple has a field that is a bag. The schema for the relation seems more natural once you think of grpd as the query history of each user. Each tuple represents one user and has a field that is a bag of the user’s queries. 

We can also look at Pig’s data model from the top down. At the top, Pig Latin statements work with relations, each of which is a bag of tuples. If you force all the tuples in a bag to have a fixed number of fields and each field to have a fixed atomic type, then the bag behaves like a relational data model: the relation is a table, tuples are rows (records), and fields are columns. But Pig’s data model has more power and flexibility because it allows nested data types. Fields can themselves be tuples, bags, or maps. Maps are helpful in processing semistructured data such as JSON, XML, and sparse relational data. In addition, tuples in a bag don’t need to have the same number of fields. This allows tuples to represent unstructured data. 

Besides declaring types for fields, schemas can also assign names to fields to make them easier to reference. Users can define schemas for relations using the AS keyword with the LOAD, STREAM, and FOREACH operators. For example, in the LOAD statement for getting the Excite query log, we defined the data types for the fields in log and named the fields user, time, and query: 
grunt> log = LOAD 'tutorial/data/excite-small.log' AS (user:chararray, time:long, query:chararray);

In defining a schema, if you leave out the type, Pig will default to bytearray as the most generic type. You can also leave out the name, in which case a field would be unnamed and you can only reference it by position. 

Expressions and functions 
You can apply expressions and functions to data fields to compute various values. The simplest expression is a constant value. Next is a reference to the value of a field. You can reference a named field’s value directly by its name. You can reference an unnamed field by $n, where n is its position inside the tuple. (Positions are numbered starting at 0.) For example, this LOAD command provides named fields to log through the schema: 
grunt> log = LOAD 'tutorial/data/excite-small.log' AS (user:chararray, time:long, query:chararray);

The three named fields are user, time, and query. For example, we can refer to the time field as either time or $1, because the time field is the second field in log (position number 1). Let’s say we want to extract the time field into its own relation; we can use this statement: 
grunt> projection = FOREACH log GENERATE time;

We can also achieve the same with: 
grunt> projection = FOREACH log GENERATE $1;

Most of the time you should give names to fields. One use of referring to fields by position is when you’re working with unstructured data. 

When using complex types, you use the dot notation to reference fields nested inside tuples or bags. For example, recall earlier that we’d grouped the Excite log by user ID and arrived at relation grpd with a nested schema: 
grpd: {group: chararray,log: {(user: chararray,time: long,query: chararray)}}
 

The second field in grpd is named log, of type bag. Each bag has tuples with three named fields: user, time, and query. In this relation, log.query would refer to the two copies of “conan o’brien” when applied to the first tuple. You can get the same field with log.$2.

You reference fields inside maps through the pound operator instead of the dot operator. For a map named m, the value associated with key k is referenced through m#k.
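To make the three kinds of reference concrete, here is a rough Python model of them (Python standing in for Pig, not Pig code). The user ID, timestamps, and the map m with its value are made-up illustration data; only the field names come from the log schema above.

```python
# A rough Python model of how Pig reaches a value; this is not Pig itself.
# The user ID, timestamps and the map contents are hypothetical sample data.

schema = ("user", "time", "query")               # field names of `log`
row = ("A1", 970916105432, "conan o'brien")      # one tuple of `log`

# By name vs. by position: `time` and `$1` pick the same field.
by_name = row[schema.index("time")]
by_position = row[1]

# Dot notation on a bag: `log.query` projects one field out of every tuple
# in the bag, so the result is again a bag (here a list of 1-tuples).
log_bag = [
    ("A1", 970916105432, "conan o'brien"),
    ("A1", 970916001949, "conan o'brien"),
]
log_query = [(t[schema.index("query")],) for t in log_bag]   # log.query
log_dollar2 = [(t[2],) for t in log_bag]                      # log.$2

# Pound operator on a map: m#k looks up the value stored under key k.
m = {"k": 42}
m_k = m["k"]
```

In this model a bag is a plain list and a tuple is a Python tuple, which is enough to show that log.query and log.$2 name the same projection.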

Being able to refer to values is only a first step. Pig supports the standard arithmetic, comparison, conditional, type casting, and Boolean expressions that are common in most popular programming languages. See table 10.6. 
 

Furthermore, Pig also supports functions. Table 10.7 shows Pig’s built-in functions, most of which are self-explanatory. We’ll discuss user-defined functions (UDFs) in section 10.6.
 

You can’t use expressions and functions alone. You must use them within relational operators to transform data. 

Relational operators 
The most salient characteristic of Pig Latin as a language is its relational operators. These operators define Pig Latin as a data processing language. We’ll quickly go over the more straightforward operators first, to acclimate ourselves to their style and syntax. Afterward we’ll go into more detail on the more complex operators such as COGROUP and FOREACH.

UNION combines multiple relations together whereas SPLIT partitions a relation into multiple ones. An example will make it clear: 
grunt> a = LOAD 'A' using PigStorage(',') AS (a1:int, a2:int, a3:int);
grunt> DUMP a;
(0,1,2)
(1,3,4)

grunt> b = LOAD 'B' using PigStorage(',') AS (b1:int, b2:int, b3:int);
grunt> DUMP b;
(0,5,2)
(1,7,8)

grunt> c = UNION a, b;
grunt> DUMP c;
(0,1,2)
(1,3,4)
(0,5,2)
(1,7,8)

grunt> SPLIT c INTO d IF $0 == 0, e IF $0 == 1;
grunt> DUMP d;
(0,5,2)
(0,1,2)

grunt> DUMP e;
(1,7,8)
(1,3,4)

The UNION operator allows duplicates. You can use the DISTINCT operator to remove duplicates from a relation. Our SPLIT operation on c sends a tuple to d if its first field ($0) is 0, and to e if it’s 1. It’s possible to write conditions such that some rows will go to both d and e or to neither. You can simulate SPLIT with multiple FILTER operators. The FILTER operator alone trims a relation down to only tuples that pass a certain test: 
grunt> f = FILTER c BY $1 > 3;
grunt> DUMP f;
(0,5,2)
(1,7,8)
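The behavior of UNION, SPLIT, FILTER, and DISTINCT can be sketched in plain Python, treating a relation as a list of tuples. This is only a model of the semantics described above, not how Pig executes them. The overlapping conditions used for small and medium are invented for illustration.

```python
# Python model of UNION, SPLIT, FILTER and DISTINCT on the relations a and b.
a = [(0, 1, 2), (1, 3, 4)]
b = [(0, 5, 2), (1, 7, 8)]

# c = UNION a, b;  duplicates are kept and the order is not guaranteed.
c = a + b

# SPLIT c INTO d IF $0 == 0, e IF $0 == 1;
# Every condition is tested independently for every tuple.
d = [t for t in c if t[0] == 0]
e = [t for t in c if t[0] == 1]

# Hypothetical overlapping conditions: a tuple can land in both outputs
# ((1,3,4) here) or in neither ((1,7,8) here).
small = [t for t in c if t[1] <= 3]
medium = [t for t in c if 3 <= t[1] <= 5]

# f = FILTER c BY $1 > 3;
f = [t for t in c if t[1] > 3]

# DISTINCT removes duplicates, e.g. after a union of c with itself.
distinct = sorted(set(c + c))
```

Written this way, SPLIT is visibly just several FILTERs over the same input, which is why the conditions don’t have to be mutually exclusive.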

We’ve seen LIMIT being used to take a specified number of tuples from a relation. SAMPLE is an operator that randomly samples tuples in a relation according to a specified fraction between 0 and 1 (for example, SAMPLE c 0.1 keeps roughly 10% of the tuples in c). 

The operations so far are relatively simple in the sense that they operate on each tuple as an atomic unit. More complex data processing, on the other hand, requires working on groups of tuples together. We’ll next look at operators for grouping. Unlike the previous operators, these grouping operators create new schemas in their output that rely heavily on bags and nested data types. The generated schema may take a little time to get used to. Keep in mind that these grouping operators are almost always for generating intermediate data. Their complexity is only temporary on your way to computing the final results. 

The simpler of these operators is GROUP. Continuing with the same set of relations we used earlier: 
grunt> DUMP c;
(0,5,2)
(1,7,8)
(0,1,2)
(1,3,4)

grunt> g = GROUP c BY $2;
grunt> DUMP g;
(2,{(0,1,2),(0,5,2)})
(4,{(1,3,4)})
(8,{(1,7,8)})

grunt> DESCRIBE c;
c: {a1: int,a2: int,a3: int}
grunt> DESCRIBE g;
g: {group: int,c: {(a1: int,a2: int,a3: int)}}

We’ve created a new relation, g, from grouping tuples in c having the same value in the third column ($2, also named a3). The output of GROUP always has two fields. The first field is the group key, which is a3 in this case. The second field is a bag containing all the tuples with the same group key. Looking at g’s dump, we see that it has three tuples, corresponding to the three unique values in c’s third column. The first field of GROUP’s output relation is always named group, for the group key. In this case it may seem more natural to call the first field a3, but currently Pig doesn’t allow you to assign a name to replace group. You’ll have to get used to referring to it as group. The second field of GROUP’s output relation is always named after the relation it’s operating on, which is c in this case, and as we said earlier it’s always a bag. Because this bag holds tuples from c, the schema for this bag is exactly the schema for c: three fields of integers named a1, a2, and a3.
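As a sketch of what GROUP produces, the following Python models the operator as a function from a relation to a list of (group, bag) pairs. The helper name group_by is hypothetical and the code only mirrors the output shape described above, not Pig’s actual MapReduce execution.

```python
from collections import defaultdict

def group_by(relation, key):
    """Model of GROUP: returns one (group, bag) tuple per distinct key value.

    `key` can be any function of a tuple, just as Pig lets you group by
    an expression or a function call.
    """
    bags = defaultdict(list)
    for t in relation:
        bags[key(t)].append(t)
    # Sorting the keys is only for readable output; Pig promises no order.
    return [(k, bags[k]) for k in sorted(bags)]

c = [(0, 5, 2), (1, 7, 8), (0, 1, 2), (1, 3, 4)]

# g = GROUP c BY $2;
g = group_by(c, lambda t: t[2])
for group, bag in g:
    print(group, bag)
```

Every output tuple has exactly two fields: the key and a bag whose tuples keep the full schema of c. The order of tuples inside a bag is not significant, so it may differ from the DUMP shown above.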

Before moving on, we want to note that one can GROUP by any function or expression. For example, if time is a timestamp and there exists a function DayOfWeek, one can conceivably do this grouping, which would create a relation with seven tuples: 
GROUP log BY DayOfWeek(time);

Finally, one can put all tuples in a relation into one big bag. This is useful for aggregate analysis on relations, as functions work on bags but not relations. For example: 
grunt> h = GROUP c ALL;
grunt> DUMP h;
(all,{(0,1,2),(1,3,4),(0,5,2),(1,7,8)})
grunt> i = FOREACH h GENERATE COUNT($1);
grunt> DUMP i;
(4)

This is one way to count the number of tuples in c. The first field in GROUP ALL’s output is always the string all.

Now that you’re comfortable with GROUP, we can look at COGROUP, which groups together tuples from multiple relations. It functions much like a join. For example, let’s cogroup a and b on the third column. 
grunt> j = COGROUP a BY $2, b BY $2;
grunt> DUMP j;
(2,{(0,1,2)},{(0,5,2)})
(4,{(1,3,4)},{})
(8,{},{(1,7,8)})

grunt> DESCRIBE j;
j: {group: int,a: {(a1: int,a2: int,a3: int)},b: {(b1: int,b2: int,b3: int)}}

Whereas GROUP always generates two fields in its output, COGROUP always generates three (more if cogrouping more than two relations). The first field is the group key, whereas the second and third fields are bags. These bags hold tuples from the cogrouping relations that match the grouping key. If a grouping key matches only tuples from one relation but not the other, then the field corresponding to the nonmatching relation will have an empty bag. To ignore group keys that don’t exist for a relation, one can add the INNER keyword to the operation, like this: 
grunt> j = COGROUP a BY $2, b BY $2 INNER;
grunt> DUMP j;
(2,{(0,1,2)},{(0,5,2)})
(8,{},{(1,7,8)})

grunt> j = COGROUP a BY $2 INNER, b BY $2 INNER;
grunt> DUMP j;
(2,{(0,1,2)},{(0,5,2)})

Conceptually, you can think of the default behavior of COGROUP as an outer join, and the INNER keyword can modify it to be left outer join, right outer join, or inner join. Another way to do inner join in Pig is to use the JOIN operator. The main difference between JOIN and an inner COGROUP is that JOIN creates a flat set of output records, as indicated by looking at the schema: 
grunt> j = JOIN a BY $2, b BY $2;
grunt> DUMP j;
(0,1,2,0,5,2)
grunt> DESCRIBE j;
j: {a::a1: int,a::a2: int,a::a3: int,b::b1: int,b::b2: int,b::b3: int}
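The relationship between COGROUP, the INNER keyword, and JOIN can be modeled in a few lines of Python. The function names cogroup and join and the inner_a/inner_b flags are hypothetical helpers for this sketch, and null keys (which Pig treats specially) are ignored.

```python
def cogroup(a, b, key_a, key_b, inner_a=False, inner_b=False):
    """Model of COGROUP a BY ..., b BY ...: one (group, bag_a, bag_b) per key."""
    keys = sorted({key_a(t) for t in a} | {key_b(t) for t in b})
    out = []
    for k in keys:
        bag_a = [t for t in a if key_a(t) == k]
        bag_b = [t for t in b if key_b(t) == k]
        # INNER on a relation drops keys for which that relation's bag is empty.
        if (inner_a and not bag_a) or (inner_b and not bag_b):
            continue
        out.append((k, bag_a, bag_b))
    return out

def join(a, b, key_a, key_b):
    """Model of JOIN: an inner cogroup whose two bags are flattened together."""
    return [
        ta + tb
        for _, bag_a, bag_b in cogroup(a, b, key_a, key_b, True, True)
        for ta in bag_a
        for tb in bag_b
    ]

a = [(0, 1, 2), (1, 3, 4)]
b = [(0, 5, 2), (1, 7, 8)]

def third(t):
    return t[2]

outer = cogroup(a, b, third, third)                    # COGROUP a BY $2, b BY $2
b_inner = cogroup(a, b, third, third, inner_b=True)    # ... b BY $2 INNER
both_inner = cogroup(a, b, third, third, True, True)   # INNER on both
joined = join(a, b, third, third)                      # JOIN a BY $2, b BY $2
```

The only difference between both_inner and joined is the shape: the cogroup keeps nested bags under the key, while the join emits flat tuples.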

The last relational operator we look at is FOREACH. It goes through all tuples in a relation and generates new tuples in the output. Behind this seeming simplicity lies tremendous power though, particularly when it’s applied to complex data types output by the grouping operators. There’s even a nested form of FOREACH designed for handling complex types. First let’s familiarize ourselves with different FOREACH operations on simple relations. 
grunt> k = FOREACH c GENERATE a2, a2*a3;
grunt> DUMP k;
(5,10)
(7,56)
(1,2)
(3,12)

FOREACH is always followed by an alias (name given to a relation) followed by the keyword GENERATE. The expressions after GENERATE control the output. At its simplest, we use FOREACH to project specific columns of a relation into the output. We can also apply arbitrary expressions, such as multiplication in the preceding example. 

For relations with nested bags (e.g., ones generated by the grouping operations), FOREACH has special projection syntax, and a richer set of functions. For example, applying nested projection to have each bag retain only the first field: 
grunt> k = FOREACH g GENERATE group, c.a1;
grunt> DUMP k;
(2,{(0),(0)})
(4,{(1)})
(8,{(1)})

To get two fields in each bag: 
grunt> k = FOREACH g GENERATE group, c.(a1,a2);
grunt> DUMP k;
(2,{(0,1),(0,5)})
(4,{(1,3)})
(8,{(1,7)})

Most built-in Pig functions are geared toward working on bags. 
grunt> k = FOREACH g GENERATE group, COUNT(c);
grunt> DUMP k;
(2,2)
(4,1)
(8,1)
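The three FOREACH statements on g can be mirrored in Python to show that nested projection keeps the bag and only narrows its tuples, whereas an aggregate such as COUNT collapses the bag to a single value. This is a model only; using len for COUNT ignores Pig’s handling of nulls.

```python
# g as produced by GROUP c BY $2 (the order of tuples inside a bag is arbitrary).
g = [
    (2, [(0, 1, 2), (0, 5, 2)]),
    (4, [(1, 3, 4)]),
    (8, [(1, 7, 8)]),
]

# k = FOREACH g GENERATE group, c.a1;
k_a1 = [(group, [(t[0],) for t in bag]) for group, bag in g]

# k = FOREACH g GENERATE group, c.(a1,a2);
k_a1_a2 = [(group, [(t[0], t[1]) for t in bag]) for group, bag in g]

# k = FOREACH g GENERATE group, COUNT(c);
k_count = [(group, len(bag)) for group, bag in g]
```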

Recall that g is based on grouping c on the third column. This FOREACH statement therefore generates a frequency count of the values in c’s third column. As we said earlier, grouping operators are mainly for generating intermediate data that will be simplified by other operators such as FOREACH. The COUNT function is one of the aggregate functions. As we’ll see, you can create many other functions via UDFs.

The FLATTEN function is designed to flatten nested data types. Syntactically it looks like a function, such as COUNT and AVG, but it’s a special operator as it can change the structure of the output created by FOREACH...GENERATE. Its flattening behavior is also different depending on how it’s applied and what it’s applied to. For example, consider a relation with tuples of the form (a, (b, c)). The statement FOREACH...GENERATE $0, FLATTEN($1) will create one output tuple of the form (a, b, c) for each input tuple. 

When applied to bags, FLATTEN modifies the FOREACH...GENERATE statement to generate new tuples. It removes one layer of nesting and behaves almost the opposite of grouping operations. If a bag contains N tuples, flattening it will remove the bag and create N tuples in its place. 
grunt> k = FOREACH g GENERATE group, FLATTEN(c);
grunt> DUMP k;
(2,0,1,2)
(2,0,5,2)
(4,1,3,4)
(8,1,7,8)

grunt> DESCRIBE k;
k: {group: int,c::a1: int,c::a2: int,c::a3: int}

Another way to understand FLATTEN is to see that it produces a cross-product. This view is helpful when we use FLATTEN multiple times within a single FOREACH statement. For example, let’s say we’ve somehow created a relation l: 
grunt> DUMP l;
(1,{(1,2)},{(3)})
(4,{(4,2),(4,3)},{(6),(9)})
(8,{(8,3),(8,4)},{(9)})

grunt> DESCRIBE l;
l: {group: int,a: {a1: int,a2: int},b: {b1: int}}

The following statement that flattens two bags outputs all combinations of those two bags for each tuple: 
grunt> m = FOREACH l GENERATE group, FLATTEN(a), FLATTEN(b);
grunt> DUMP m;
(1,1,2,3)
(4,4,2,6)
(4,4,2,9)
(4,4,3,6)
(4,4,3,9)
(8,8,3,9)
(8,8,4,9)

We see that the tuple with group key 4 in relation l has a bag of size 2 in field a and also a bag of size 2 in field b. The corresponding output in m therefore has four rows representing the full cross-product. 
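The cross-product view of FLATTEN maps directly onto itertools.product in Python. The sketch below reproduces the relation m from l under that model. It illustrates the semantics and is not Pig’s implementation.

```python
from itertools import product

# Relation l: (group, bag a, bag b)
l = [
    (1, [(1, 2)], [(3,)]),
    (4, [(4, 2), (4, 3)], [(6,), (9,)]),
    (8, [(8, 3), (8, 4)], [(9,)]),
]

# m = FOREACH l GENERATE group, FLATTEN(a), FLATTEN(b);
# For each input tuple, emit one output row per (tuple of a, tuple of b) pair.
m = [
    (group,) + ta + tb
    for group, bag_a, bag_b in l
    for ta, tb in product(bag_a, bag_b)
]

for row in m:
    print(row)
```

A tuple whose bags have sizes p and q contributes p * q rows, which is where the four rows for group key 4 come from.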

Finally, there’s a nested form of FOREACH to allow for more complex processing of bags. Let’s assume you have a relation (say l) and one of its fields (say a) is a bag. A FOREACH with a nested block has this form: 
alias = FOREACH l {
    tmp1 = operation on a;
    [more operations...]
    GENERATE expr [, expr...]
}
The GENERATE statement must always be present at the end of the nested block. It will create some output for each tuple in l. The operations in the nested block can create new relations based on the bag a. For example, we can trim down the a bag in each of l’s tuples. 
 

You can have multiple statements in the nested block. Each one can even be operating on different bags. 
 

As of this writing, only five operators are allowed in the nested block: DISTINCT, FILTER, LIMIT, ORDER, and SAMPLE. It’s expected that more will be supported in the future. 
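One way to picture a nested FOREACH block is as the body of a loop that runs once per tuple of l. The Python below is a model under that assumption. The particular operations (a FILTER on bag a with the condition a2 >= 3, and a DISTINCT on bag b) are hypothetical choices for illustration and are not taken from the original examples.

```python
# Relation l: (group, bag a with fields (a1, a2), bag b with field (b1))
l = [
    (1, [(1, 2)], [(3,)]),
    (4, [(4, 2), (4, 3)], [(6,), (9,)]),
    (8, [(8, 3), (8, 4)], [(9,)]),
]

result = []
for group, a, b in l:                        # alias = FOREACH l {
    tmp1 = [t for t in a if t[1] >= 3]       #     tmp1 = FILTER a BY a2 >= 3;
    tmp2 = sorted(set(b))                    #     tmp2 = DISTINCT b;
    # GENERATE comes last and produces one output tuple per input tuple.
    result.append((group, tmp1, len(tmp2)))  #     GENERATE group, tmp1, COUNT(tmp2);
                                             # }

for row in result:
    print(row)
```

Each statement inside the block works on a bag belonging to the current tuple only, so trimming a in one tuple never affects another tuple.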
NOTE. 
Sometimes the output of FOREACH can have a completely different schema from its input. In those cases, users can specify the output schema using the AS option after each field. This syntax differs from the LOAD command, where the schema is specified as a list after the AS option, but in both cases we use AS to specify a schema.

For more information on how to use Pig Latin, please refer to the official documentation, Pig Latin Basics (r0.9.1). On many operators you’ll see an option for PARALLEL n (see “Use the parallel feature” in the Pig documentation for more). The number n is the degree of parallelism you want for executing that operator. In practice n is the number of reduce tasks in Hadoop that Pig will use. If you don’t set n, it falls back to the default setting of your Hadoop cluster. Pig documentation recommends setting the value of n according to the following guideline: 
n = (#nodes - 1) * 0.45 * RAM

where #nodes is the number of nodes and RAM is the amount of memory in GB on each node. 
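As a quick worked example of the guideline, the Python helper below plugs numbers into the formula. The function name suggested_parallel, the rounding down to a whole number, and the floor of 1 are assumptions of this sketch. The guideline itself only gives the product.

```python
def suggested_parallel(nodes, ram_gb):
    """Evaluate n = (#nodes - 1) * 0.45 * RAM from the guideline above.

    Rounding down and never returning less than 1 are assumptions made
    here, since PARALLEL needs a positive whole number of reduce tasks.
    """
    n = (nodes - 1) * 0.45 * ram_gb
    return max(1, int(n))

# A hypothetical cluster of 10 nodes with 8 GB of RAM each:
# (10 - 1) * 0.45 * 8 = 32.4, so this sketch suggests PARALLEL 32.
print(suggested_parallel(10, 8))
```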

At this point you’ve learned various aspects of the Pig Latin language—data types, expressions, functions, and relational operators. You can extend the language further with user-defined functions. But before discussing that we’ll end this section with a note on Pig Latin compilation and optimization. 

Execution optimization 
As with many modern compilers, the Pig compiler can reorder the execution sequence to optimize performance, as long as the execution plan remains logically equivalent to the original program. For example, imagine a program that applies an expensive function (say, encryption) to a certain field (say, social security number) of every record, followed by a filtering function to select records based on a different field (say, limiting the records to people within a certain geography). The compiler can reverse the execution order of those two operations without affecting the final result, yet performance is much improved. Having the filtering step first can dramatically reduce the amount of data and work the encryption step will have to do.
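The reordering described above can be demonstrated with a small Python experiment that counts how often the expensive function runs under each order. Everything here is illustrative: the records are made up, and reversing a string stands in for real encryption. It says nothing about how the Pig compiler is implemented.

```python
def run(rows, filter_first):
    """Apply an 'expensive' transform and a filter in the chosen order.

    Returns (result, number_of_expensive_calls).
    """
    calls = 0

    def expensive(value):
        nonlocal calls
        calls += 1
        return value[::-1]        # stand-in for encryption

    def in_region(row):
        return row[1] == "CA"     # filter on a different field

    if filter_first:
        kept = [r for r in rows if in_region(r)]
        result = [(expensive(ident), region) for ident, region in kept]
    else:
        transformed = [(expensive(ident), region) for ident, region in rows]
        result = [r for r in transformed if in_region(r)]
    return result, calls

# Hypothetical records: (identifier, region)
records = [("id-001", "CA"), ("id-002", "NY"), ("id-003", "CA"), ("id-004", "TX")]

original, original_calls = run(records, filter_first=False)
optimized, optimized_calls = run(records, filter_first=True)
print(original == optimized, original_calls, optimized_calls)
```

The two plans are interchangeable only because the filter reads a field that the expensive function doesn’t modify. That independence is what makes them logically equivalent.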

As Pig matures, more optimization will be added to the compiler. Therefore it’s important to try to always use the latest version. But there’s always a limit to a compiler’s ability to optimize arbitrary code. You can read Pig’s web documentation for techniques to improve performance. A list of tips for enhancing performance under Pig version r0.9.1 is at https://pig.apache.org/docs/r0.9.1/perf.html.
