Downloading Spark
The first step to using Spark is to download and unpack it. Let's start by downloading a recent precompiled release of Spark. Visit http://spark.apache.org/downloads.html, select the package type of "Pre-built for Hadoop 2.4 and later," and click "Direct Download." This will download a compressed TAR file, or tarball.
You don't need to have Hadoop, but if you have an existing Hadoop cluster or HDFS installation, download the matching version. Now that we have downloaded Spark, let's unpack it and take a look at what comes with the default Spark distribution. To do that, open a terminal, change to the directory where you downloaded Spark, and untar the file. This will create a new directory with the same name but without the final .tgz suffix. Change into that directory and see what's inside. You can use the following commands to accomplish all of that.
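A sketch of those commands, assuming the tarball is named spark-1.3.0-bin-hadoop2.4.tgz (adjust the file name to match the version and package type you actually downloaded):
- cd ~/Downloads                           # or wherever you saved the tarball
- tar -xf spark-1.3.0-bin-hadoop2.4.tgz    # unpack the distribution
- cd spark-1.3.0-bin-hadoop2.4             # same name as the tarball, minus the .tgz suffix
- ls                                       # see what comes with the distribution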
Let's briefly consider the name and purpose of some of the more important files and directories you see here that come with Spark:
* README.md: contains short instructions for getting started with Spark.
* bin: contains executable files that can be used to interact with Spark in various ways (for example, the Spark shells we will use later in this chapter).
* core, streaming, python, ...: contains the source code of major components of the Spark project.
* examples: contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.
Don't worry about the large number of directories and files the Spark project comes with; we'll cover most of them in the following chapters. For now, let's dive right in and try out Spark's Python and Scala shells. We will start by running some of the examples that come with Spark. Then we will write, compile, and run a simple Spark job of our own.
All of the work we will do in this chapter will be with Spark running in local mode; that is, nondistributed mode, which uses only a single machine. Spark can run in a variety of different modes, or environments. Beyond local mode, Spark can also be run on Mesos, YARN, or the Standalone Scheduler included in the Spark distribution. We will cover the various deployment modes in detail in Chapter 7.
Introduction to Spark's Python and Scala Shells
Spark comes with interactive shells that enable ad-hoc data analysis. Spark's shells will feel familiar if you have used other shells such as those in R, Python, and Scala, or operating system shells like Bash or the Windows command prompt. Unlike most other shells, however, which let you manipulate data using the disk and memory on a single machine, Spark's shells allow you to interact with data that is distributed on disk or in memory across many machines, and Spark takes care of automatically distributing this processing.
Because Spark can load data into memory on the worker nodes, many distributed computations, even ones that process terabytes of data across dozens of machines, can run in a few seconds. This makes the sort of interactive, ad-hoc, and exploratory analysis commonly done in shells a good fit for Spark. Spark provides both Python and Scala shells that have been augmented to support connecting to a cluster.
The easiest way to demonstrate the power of Spark's shells is to start using one of them for some simple data analysis. Let's walk through the example from the Quick Start Guide (http://spark.apache.org/docs/latest/quick-start.html) in the official Spark documentation. The first step is to open one of Spark's shells. To open the Python version of the Spark shell, which we also refer to as the PySpark shell, go into your Spark directory and type bin/pyspark.
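For example, from the top of the unpacked Spark directory:
- bin/pyspark          # launches the Python (PySpark) shell
- bin/spark-shell      # launches the Scala shell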
The shell prompt should appear within a few seconds. You may find the logging statements that get printed in the shell distracting. You can control the verbosity of the logging by creating a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template. To make the logging less verbose, make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:
- log4j.rootCategory=INFO, console
Then lower the log level so that only WARN messages and above are shown by changing it to the following:
- log4j.rootCategory=WARN, console
When you reopen the shell, you should see less output.
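On a Unix-like system, the copy can be made with a single command from the Spark directory:
- cp conf/log4j.properties.template conf/log4j.properties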
In Spark, we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called resilient distributed datasets, or RDDs. RDDs are Spark's fundamental abstraction for distributed data and computation.
Before we say more about RDDs, let's create one in the shell from a local text file and do some very simple ad-hoc analysis by following Example 2-1 for Python.
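A sketch of that example; the count and the first line you see will depend on your Spark version:
- >>> lines = sc.textFile("README.md")   # create an RDD called lines from a local text file
- >>> lines.count()                      # count the number of items (lines) in this RDD
- >>> lines.first()                      # the first item in the RDD, i.e. the first line of README.md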
We can run various parallel operations on the RDD, such as counting the number of elements in the dataset (here, lines of text in the file) or printing the first one. We will discuss RDDs in great depth in later chapters, but before we go any further, let's take a moment to introduce basic Spark concepts.
Note. To exit either shell, press Ctrl-D.
Introduction to Core Spark Concepts
Now that you have run your first Spark code using the shell, it's time to learn about programming in it in more detail. At a high level, every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster, then applies operations to them. In the preceding examples, the driver program was the Spark shell itself, and you could just type in the operations you wanted to run.
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you as the variable called sc. Try printing out sc to see its type.
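In the PySpark shell this looks something like the following (the exact output will differ):
- >>> sc
- <pyspark.context.SparkContext object at 0x...>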
Once you have a SparkContext, you can use it to build RDDs. In Examples 2-1 and 2-2, we called sc.textFile() to create an RDD representing the lines of text in a file. We can then run various operations on these lines, such as count(). To run these operations, driver programs typically manage a number of nodes called executors. For example, if we were running the count() operation on a cluster, different machines might count lines in different ranges of the file. Because we just ran the Spark shell locally, it executed all its work on a single machine, but you can connect the same shell to a cluster to analyze data in parallel. Figure 2-3 shows how Spark executes on a cluster.
Figure 2-3. Components for distributed execution in Spark
Finally, a lot of Spark's API revolves around passing functions to its operations to run them on the cluster. For example, we could extend our README example by filtering the lines in the file that contain a word, such as Python.
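One way to write that in the PySpark shell:
- >>> lines = sc.textFile("README.md")
- >>> pythonLines = lines.filter(lambda line: "Python" in line)   # keep only lines mentioning Python
- >>> pythonLines.first()                                         # show one matching line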
While we will cover the Spark API in more detail later, a lot of its magic is that function-based operations like filter also parallelize across the cluster. That is, Spark automatically takes your function (e.g., line.contains("Python")) and ships it to executor nodes. Thus, you can write code in a single driver program and automatically have parts of it run on multiple nodes. Chapter 3 covers the RDD API in detail.
Standalone Applications
The final piece missing in this quick tour of Spark is how to use it in standalone programs. Apart from running interactively, Spark can be linked into standalone applications in Java, Scala, or Python. The main difference from using it in the shell is that you need to initialize your own SparkContext; after that, the API is the same. The process of linking to Spark varies by language. In Java and Scala, you give your application a Maven dependency on the spark-core artifact. As of the time of writing, the latest Spark version is 1.3.0, and the Maven coordinates for that are:
- groupId = org.apache.spark
- artifactId = spark-core_2.10
- version = 1.3.0
Initializing a SparkContext
Once you have linked an application to Spark, you need to import the Spark packages in your program and create a SparkContext. You do so by first creating a SparkConf object to configure your application, and then building a SparkContext for it. The examples below demonstrate this in each supported language.
- Example 2-7. Initializing Spark in Python
- from pyspark import SparkConf, SparkContext
- conf = SparkConf().setMaster("local").setAppName("My App")
- sc = SparkContext(conf = conf)
- Example 2-8. Initializing Spark in Scala
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- val conf = new SparkConf().setMaster("local").setAppName("My App")
- val sc = new SparkContext(conf)
- Example 2-9. Initializing Spark in Java
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
- JavaSparkContext sc = new JavaSparkContext(conf);
Additional parameters exist for configuring how your application executes or adding code to be shipped to the cluster, but we will cover these in later chapters. After you have initialized a SparkContext, you can use all the methods we showed before to create RDDs (e.g., from a text file) and manipulate them.
Finally, to shut down Spark, you can either call the stop() method on your SparkContext, or simply exit the application (e.g., with System.exit(0) or sys.exit()). This quick overview should be enough to let you run a standalone Spark application on your laptop. For more advanced configuration, Chapter 7 will cover how to connect your application to a cluster, including packaging your application so that its code is automatically shipped to worker nodes. For now, please refer to the Quick Start Guide in the official Spark documentation.
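For instance, in the Python initialization example above, a clean shutdown is simply:
- sc.stop()   # shuts down the SparkContext; the application can then exit normally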
Building Standalone Applications
This wouldn't be a complete introductory chapter of a Big Data book if we didn't have a word count example. On a single machine, implementing word count is simple, but in distributed frameworks it is a common example because it involves reading and combining data from many worker nodes. We will look at building and packaging a simple word count example with Maven. All of our examples can be built together, but to illustrate a stripped-down build with minimal dependencies we have a separate smaller project underneath the learning-spark-examples/mini-complete-example directory.
- Example 2-10. Word count Java application - don't worry about the details yet.
- package demo;
- import java.util.Arrays;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
- public class E2_10 {
- public static void main(String args[])
- {
- String master = args[0];
- String inputFile = args[1];
- String outputFile = args[2];
- // Create a Java Spark Context
- JavaSparkContext sc = new JavaSparkContext(master,
- "wordcount",
- System.getenv("SPARK_HOME"),
- System.getenv("JARS"));
- // Load our input data.
- JavaRDD<String> input = sc.textFile(inputFile);
- // Split up into words.
- JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
-   public Iterable<String> call(String x) {
-     return Arrays.asList(x.split(" "));
-   }
- });
- // Transform into pairs and count.
- JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
-   public Tuple2<String, Integer> call(String x) {
-     return new Tuple2<String, Integer>(x, 1);
-   }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-   public Integer call(Integer x, Integer y) { return x + y; }
- });
- // Save the word count back out to a text file, causing evaluation.
- counts.saveAsTextFile(outputFile);
- }
- }
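For comparison, here is a minimal PySpark sketch of the same word count; the file name wordcount.py and the argument handling are illustrative, not part of the original example:
- # wordcount.py: run with spark-submit, passing master, input path, and output path
- import sys
- from pyspark import SparkConf, SparkContext
- if __name__ == "__main__":
-     master, input_file, output_file = sys.argv[1], sys.argv[2], sys.argv[3]
-     conf = SparkConf().setMaster(master).setAppName("wordcount")
-     sc = SparkContext(conf=conf)
-     lines = sc.textFile(input_file)                                       # load our input data
-     words = lines.flatMap(lambda line: line.split(" "))                   # split up into words
-     counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)  # transform into pairs and count
-     counts.saveAsTextFile(output_file)                                    # save the counts, causing evaluation
-     sc.stop()
A Python application like this needs no build step: it can be passed straight to bin/spark-submit. The Java version, by contrast, has to be compiled and packaged first, which is what the Maven build below is for.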
- Example 2-13. Maven build file (Github - pom.xml)
Once we have our build defined, we can easily package and run our application using the bin/spark-submit script. This script sets up a number of environment variables used by Spark. From the mini-complete-example directory we can build and run the example as shown below (first, make sure you have Maven and Git installed on your local machine):
- Example 2-15. Maven build and run (WordCount.java)
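A sketch of those commands; the jar name is a placeholder for whatever artifact your pom.xml produces, and SPARK_HOME is assumed to point at your Spark directory:
- mvn clean package
- # args after the jar are master, input file, and output directory, matching Example 2-10's main()
- $SPARK_HOME/bin/spark-submit --class demo.E2_10 ./target/<your-artifact>.jar local ./README.md ./wordcounts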
For even more detailed examples of linking applications to Spark, refer to the Quick Start Guide in the official Spark documentation. Chapter 7 covers packaging Spark applications in more detail.
Supplement
* Apache Spark example with Java and Maven