Friday, June 17, 2016

[ Learn Spark ] Ch7. Running on a Cluster

Introduction 
Up to now, we've focused on learning Spark by using the Spark shell and examples that run in Spark's local mode. One benefit of writing applications on Spark is the ability to scale computation by adding more machines and running in cluster mode. The good news is that writing applications for parallel cluster execution uses the same API you've already learned in this book. The examples and applications you've written so far will run on a cluster "out of the box." This is one of the benefits of Spark's higher-level API: users can rapidly prototype applications on smaller datasets locally, then run unmodified code on even very large clusters. 

This chapter first explains the runtime architecture of a distributed Spark application, then discusses options for running Spark in distributed clusters. Spark can run on a wide variety of cluster managers (Hadoop YARN, Apache Mesos, and Spark's own built-in Standalone cluster manager) in both on-premise and cloud deployments. We'll discuss the trade-offs and configurations required for running in each case. Along the way we'll also cover the "nuts and bolts" of scheduling, deploying, and configuring a Spark application. After reading this chapter you'll have everything you need to run a distributed Spark program. The following chapter will cover tuning and debugging applications. 

Spark Runtime Architecture 
Before we dive into the specifics of running Spark on a cluster, it's helpful to understand the architecture of Spark in distributed mode (illustrated in Figure 7-1): 


Figure 7-1. The components of a distributed Spark application 

In distributed mode, Spark uses a master/slave architecture with one central coordinator and many distributed workers. The central coordinator is called the driver. The driver communicates with a potentially large number of distributed workers called executors. The driver runs in its own Java process and each executor is a separate Java process. A driver and its executors are together termed a Spark application. 

A Spark application is launched on a set of machines using an external service called a cluster manager. As noted, Spark is packaged with a built-in cluster manager called the Standalone cluster manager. Spark also works with Hadoop YARN and Apache Mesos, two popular open source cluster managers. 

The Driver 
The driver is the process where the main() method of your program runs. It is the process running the user code that creates a SparkContext, creates RDDs, and performs transformations and actions. When you launch a Spark shell, you've created a driver program (if you remember, the Spark shell comes preloaded with a SparkContext called sc). Once the driver terminates, the application is finished. 

When the driver runs, it performs two duties: 
- Converting a user program into tasks 

The Spark driver is responsible for converting a user program into units of physical execution called tasks. At a high level, all Spark programs follow the same structure: they create RDDs from some input, derive new RDDs from those using transformations, and perform actions to collect or save data. A Spark program implicitly creates a logical directed acyclic graph (DAG) of operations. When the driver runs, it converts this logical graph into a physical execution plan.

Spark performs several optimizations, such as "pipelining" map transformations together to merge them, and converts the execution graph into a set of stages. Each stage, in turn, consists of multiple tasks. The tasks are bundled up and prepared to be sent to the cluster. Tasks are the smallest unit of work in Spark; a typical user program can launch hundreds or thousands of individual tasks.
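To make the stage idea concrete, here is a toy sketch in plain Python (it does not use Spark). It assumes a linear chain of operations in which each one is flagged as needing a shuffle or not. The names `Op`, `split_into_stages`, and `count_tasks` are made up for illustration, and Spark's real scheduler is considerably more involved.

```python
from dataclasses import dataclass


@dataclass
class Op:
    """One operation in a (hypothetical) linear RDD lineage."""
    name: str
    needs_shuffle: bool  # True for operations that regroup data, e.g. reduceByKey


def split_into_stages(ops):
    """Pipeline consecutive operations together, starting a new stage at each shuffle."""
    stages = []
    current = []
    for op in ops:
        if op.needs_shuffle and current:
            # A shuffle boundary closes the stage built so far.
            stages.append(current)
            current = []
        current.append(op.name)
    if current:
        stages.append(current)
    return stages


def count_tasks(stages, num_partitions):
    """Simplifying assumption: every stage runs one task per partition."""
    return len(stages) * num_partitions


lineage = [
    Op("textFile", False),
    Op("flatMap", False),
    Op("map", False),
    Op("reduceByKey", True),
    Op("saveAsTextFile", False),
]

stages = split_into_stages(lineage)
for index, stage in enumerate(stages):
    print(f"stage {index}: {' -> '.join(stage)}")
print("tasks with 4 partitions:", count_tasks(stages, 4))
```

In this simplified model the three operations before `reduceByKey` are pipelined into one stage, and the shuffle starts a second stage.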

- Scheduling tasks on executors 
Given a physical execution plan, a Spark driver must coordinate the scheduling of individual tasks on executors. When executors are started they register themselves with the driver, so it has a complete view of the application's executors at all times. Each executor represents a process capable of running tasks and storing RDD data.

The Spark driver will look at the current set of executors and try to schedule each task in an appropriate location, based on data placement. When tasks execute, they may have a side effect of storing cached data. The driver also tracks the location of cached data and uses it to schedule future tasks that access that data. The driver exposes information about the running Spark application through a web interface, which by default is available at port 4040. For instance, in local mode, this UI is available at http://localhost:4040. We'll cover Spark's web UI and its scheduling mechanisms in more detail in Chapter 8.
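The idea of placing tasks near cached data can be sketched in a few lines of plain Python. This is only an illustration under simple assumptions: each task reads one partition, a partition is cached on at most one executor, and tasks without a cached copy go to the least-loaded executor. The function name `schedule` and the executor IDs are hypothetical, and this is not Spark's actual scheduling algorithm.

```python
def schedule(tasks, executors, cache_locations):
    """Assign each task to an executor, preferring the one that caches its partition.

    tasks: list of partition IDs, one task per partition
    executors: list of registered executor IDs
    cache_locations: dict mapping partition ID -> executor ID holding a cached copy
    """
    assignments = {}
    load = {executor: 0 for executor in executors}
    for partition in tasks:
        preferred = cache_locations.get(partition)
        if preferred in load:
            # Run the task alongside its cached data.
            chosen = preferred
        else:
            # No cached copy on a known executor: pick the least-loaded one.
            chosen = min(executors, key=lambda executor: load[executor])
        assignments[partition] = chosen
        load[chosen] += 1
    return assignments


executors = ["exec-1", "exec-2"]
cache_locations = {0: "exec-2", 1: "exec-2"}
plan = schedule([0, 1, 2], executors, cache_locations)
print(plan)
```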


Executors 
Spark executors are worker processes responsible for running the individual tasks in a given Spark job. Executors are launched once at the beginning of a Spark application and typically run for the entire lifetime of an application, though Spark applications can continue if executors fail. Executors have two roles. First, they run the tasks that make up the application and return results to the driver. Second, they provide in-memory storage for RDDs that are cached by user programs, through a service called the Block Manager that lives within each executor. Because RDDs are cached directly inside of executors, tasks can run alongside the cached data. 

Cluster Manager 
So far we've discussed drivers and executors in somewhat abstract terms. But how do driver and executor processes initially get launched? Spark depends on a cluster manager to launch executors and, in certain cases, to launch the driver. The cluster manager is a pluggable component in Spark. This allows Spark to run on top of different external managers, such as YARN and Mesos, as well as its built-in Standalone cluster manager. 

Launching a Program 
No matter which cluster manager you use, Spark provides a single script, called spark-submit, that you can use to submit your program to it. Through various options, spark-submit can connect to different cluster managers and control how many resources your application gets. For some cluster managers, spark-submit can run the driver within the cluster (e.g., on a YARN worker node), while for others, it can run it only on your local machine. We'll cover spark-submit in more detail later. 

Summary 
To summarize the concepts in this section, let's walk through the exact steps that occur when you run a Spark application on a cluster: 
1. The user submits an application using spark-submit.
2. spark-submit launches the driver program and invokes the main() method specified by the user.
3. The driver program contacts the cluster manager to ask for resources to launch executors.
4. The cluster manager launches executors on behalf of the driver program.
5. The driver process runs through the user application. Based on the RDD actions and transformations in the program, the driver sends work to executors in the form of tasks.
6. Tasks are run on executor processes to compute and save results.
7. If the driver's main() method exits or it calls SparkContext.stop(), it will terminate the executors and release resources from the cluster manager.

Deploying Applications with spark-submit 
As you have learned, Spark provides a single tool for submitting jobs across all cluster managers, called spark-submit. In Chapter 2 you saw a simple example of submitting a Python program with spark-submit, repeated here: 
- Example 7-1. Submitting a Python application 
# spark-submit my_script.py

When spark-submit is called with nothing but the name of a script or JAR, it simply runs the supplied Spark program locally. Let's say we wanted to submit this program to a Spark Standalone cluster. We can provide extra flags with the address of a Standalone cluster and a specific size for each executor process we'd like to launch, as shown in the example below: 
- Example 7-2. Submitting an application with extra arguments 
# spark-submit --master spark://host:7077 --executor-memory 10g my_script.py

The --master flag specifies a cluster URL to connect to; in this case, the spark:// URL means a cluster using Spark's Standalone mode (see Table 7-1). We will discuss other URL types later. 
 

Apart from a cluster URL, spark-submit provides a variety of options that let you control specific details about a particular run of your application. These options fall roughly into two categories. The first is scheduling information, such as the amount of resources you'd like to request for your job (as shown in Example 7-2). The second is information about the runtime dependencies of your application, such as libraries or files you want to deploy to all worker machines. The general format for spark-submit is shown in the example below: 
- Example 7-3. General format for spark-submit 
# spark-submit [options] <app jar | python file> [app options]

[options] are a list of flags for spark-submit. You can enumerate all possible flags by running spark-submit --help. A list of common flags is enumerated in Table 7-2. 
 

<app jar | python file> refers to the JAR or Python script containing the entry point into your application. [app options] are options that will be passed on to your application. If the main() method of your program parses its calling arguments, it will see only [app options] and not the flags specific to spark-submit. spark-submit also allows setting arbitrary SparkConf configuration options, either with the --conf prop=value flag or by providing a properties file through --properties-file that contains key/value pairs. Chapter 8 will discuss Spark's configuration system. 
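As a small illustration of the point about [app options], the sketch below parses application arguments with Python's standard argparse module. The option names (`input_path`, `--limit`) and the helper `parse_app_options` are hypothetical and chosen only for this example. The argument list is passed in directly so the snippet runs without Spark.

```python
import argparse


def parse_app_options(argv):
    """Parse only the application's own options (hypothetical ones, for illustration)."""
    parser = argparse.ArgumentParser(description="Hypothetical Spark application")
    parser.add_argument("input_path")
    parser.add_argument("--limit", type=int, default=10)
    return parser.parse_args(argv)


# If the script were launched as
#   spark-submit --master local my_script.py data.txt --limit 5
# then sys.argv[1:] inside my_script.py would hold only the application
# options, i.e. ["data.txt", "--limit", "5"]; --master is consumed by spark-submit.
options = parse_app_options(["data.txt", "--limit", "5"])
print(options.input_path, options.limit)
```

In a real script you would call `parse_app_options(sys.argv[1:])` at the top of your main code.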

The example below shows a few longer-form invocations of spark-submit using various options: 
- Example 7-4. Using spark-submit with various options 
// Submitting a Java application to Standalone cluster mode
# spark-submit \
--master spark://hostname:7077 \
--deploy-mode cluster \
--class com.databricks.examples.SparkExample \
--name "Example Program" \
--jars dep1.jar,dep2.jar,dep3.jar \
--total-executor-cores 300 \
--executor-memory 10g \
myApp.jar "options" "to your application" "go here"


// Submitting a Python application in YARN client mode
# export HADOOP_CONF_DIR=/opt/hadoop/conf
# spark-submit \
--master yarn \
--py-files somelib-1.2.egg,otherlib-4.4.zip,other-file.py \
--deploy-mode client \
--name "Example Program" \
--queue exampleQueue \
--num-executors 40 \
--executor-memory 10g \
my_script.py "options" "to your application" "go here"
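Invocations like those in Example 7-4 can also be assembled programmatically, which avoids mistakes such as putting spaces inside the comma-separated --jars or --py-files lists. The following is a minimal sketch, not part of Spark: the helper `build_spark_submit` is hypothetical, it only uses flags mentioned in this section, and it builds the argument list without running anything. `spark.ui.port` is used as a sample configuration property.

```python
import shlex


def build_spark_submit(app, master=None, deploy_mode=None, conf=None,
                       jars=None, py_files=None, app_args=None):
    """Build a spark-submit argument list: [options] <app> [app options]."""
    cmd = ["spark-submit"]
    if master:
        cmd += ["--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    if jars:
        # Comma-separated, with no spaces between entries.
        cmd += ["--jars", ",".join(jars)]
    if py_files:
        cmd += ["--py-files", ",".join(py_files)]
    # The application file comes after all spark-submit flags...
    cmd.append(app)
    # ...and everything after it is passed to the application itself.
    cmd += list(app_args or [])
    return cmd


command = build_spark_submit(
    "my_script.py",
    master="yarn",
    deploy_mode="client",
    conf={"spark.ui.port": "4050"},
    py_files=["somelib-1.2.egg", "otherlib-4.4.zip"],
    app_args=["options", "go here"],
)
# shlex.join quotes arguments containing spaces so the line can be pasted into a shell.
print(shlex.join(command))
```

The resulting list could be handed to `subprocess.run(command)` on a machine where spark-submit is on the PATH.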

Packaging Your Code and Dependencies 
Throughout most of this book we've provided example programs that are self-contained and had no library dependencies outside of Spark. More often, user programs depend on third-party libraries. If your program imports any libraries that are not in the org.apache.spark package or part of the language library, you need to ensure that all your dependencies are present at the runtime of your Spark application.

For Python users, there are a few ways to install third-party libraries. Since PySpark uses the existing Python installation on worker machines, you can install dependency libraries directly on the cluster machines using standard Python package managers (such as pip or easy_install), or via a manual installation into the site-packages/ directory of your Python installation. Alternatively, you can submit individual libraries using the --py-files argument to spark-submit and they will be added to the Python interpreter's path. Adding libraries manually is more convenient if you do not have access to install packages on the cluster, but do keep in mind potential conflicts with existing packages already installed on the machines.

For Java and Scala users, it is also possible to submit individual JAR files using the --jars flag to spark-submit. This can work well if you have a very simple dependency on one or two libraries and they themselves don't have any other dependencies. It is more common, however, for users to have Java or Scala projects that depend on several libraries. When you submit an application to Spark, it must ship with its entire transitive dependency graph to the cluster. This includes not only the libraries you directly depend on, but also their dependencies, their dependencies' dependencies, and so on. Manually tracking and submitting this set of JAR files would be extremely cumbersome. Instead, it's common practice to rely on a build tool to produce a single large JAR containing the entire transitive dependency graph of an application. This is often called an uber JAR or an assembly JAR, and most Java or Scala build tools can produce this type of artifact. 
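To see why tracking this by hand gets cumbersome, here is a small plain-Python sketch of what "transitive dependency graph" means. The library names and the `transitive_dependencies` helper are invented for illustration; real build tools also resolve versions and conflicts, which this sketch ignores.

```python
def transitive_dependencies(direct, graph):
    """Return every library reachable from the direct dependencies.

    direct: libraries the application depends on directly
    graph: dict mapping a library to the libraries it depends on
    """
    seen = set()
    stack = list(direct)
    while stack:
        library = stack.pop()
        if library in seen:
            continue  # Already handled; also guards against cycles.
        seen.add(library)
        stack.extend(graph.get(library, []))
    return seen


# Hypothetical dependency graph.
graph = {
    "my-app-lib": ["json-lib", "http-lib"],
    "http-lib": ["logging-lib"],
    "json-lib": [],
    "logging-lib": [],
}

needed = transitive_dependencies(["my-app-lib"], graph)
print(sorted(needed))
```

Even though the application names only one direct dependency here, four libraries have to reach the cluster, which is the set an uber JAR bundles for you.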

The most popular build tools for Java and Scala are Maven and sbt (Scala build tool). Either tool can be used with either language, but Maven is more often used for Java projects and sbt for Scala projects. Here, we'll give an example of a Spark application build using Maven. (For sbt, please buy the book for more reference :p). You can use it as a template for your own Spark projects. 

A Java Spark Application Built with Maven 
Let's look at an example Java project with multiple dependencies that produces an uber JAR. The example below provides a Maven pom.xml file containing a build definition. This example doesn't show the actual Java code or project directory structure, but Maven expects user code to be in a src/main/java directory relative to the project root (the root should contain the pom.xml file): 
- Example 7-5. pom.xml file for a Spark application built with Maven 

This project declares two transitive dependencies: jopt-simple, a Java library to perform option parsing, and joda-time, a library with utilities for time and date conversion. It also depends on Spark, but Spark is marked as provided to ensure that Spark is never packaged with the application artifacts. The build includes the maven-shade-plugin to create an uber JAR containing all of its dependencies. You enable this by asking Maven to execute the shade goal of the plug-in every time a package phase occurs. With this build configuration, an uber JAR is created automatically when mvn package is run: 

- Example 7-6. Packaging a Spark application built with Maven 
# mvn package

// In the target directory, we'll see an uber JAR and the original package JAR.
# ls target/
example-build-1.0.jar
original-example-build-1.0.jar


// Listing the uber JAR will reveal classes from dependency libraries
# jar tf target/example-build-1.0.jar
...

// An uber JAR can be passed directly to spark-submit
# spark-submit --master local ... target/example-build-1.0.jar


Supplement 
Spark Architecture 
Tutorialspoint - Maven Tutorial
