Thursday, July 3, 2014

Apache Spark Study Notes (9): Monitoring and Instrumentation

There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation.

Web Interfaces

Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:
  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Environmental information
  • Information about the running executors
You can access this interface by simply opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).
Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.
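For example, event logging can be turned on in spark-defaults.conf before the application is submitted (a minimal sketch; the HDFS path is only an illustration, and spark.eventLog.dir must point at a directory that already exists and is writable by the application):
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://<namenode>:8020/spark-events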

Viewing After the Fact

Spark’s Standalone Mode cluster manager also has its own web UI. If an application has logged events over the course of its lifetime, then the Standalone master’s web UI will automatically re-render the application’s UI after the application has finished.
If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of a finished application through Spark’s history server, provided that the application’s event logs exist. You can start the history server by executing:
./sbin/start-history-server.sh <base-logging-directory>
The base logging directory must be supplied, and should contain sub-directories that each represent an application’s event logs. This creates a web interface at http://<server-url>:18080 by default. The history server can be configured with the following environment variables, set in spark-env.sh:
  • SPARK_DAEMON_MEMORY: Memory to allocate to the history server (default: 512m).
  • SPARK_DAEMON_JAVA_OPTS: JVM options for the history server (default: none).
  • SPARK_PUBLIC_DNS: The public address for the history server. If this is not set, links to application history may use the internal address of the server, resulting in broken links (default: none).
  • SPARK_HISTORY_OPTS: spark.history.* configuration options for the history server (default: none).
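A short spark-env.sh sketch (illustrative values only, not taken from the original session) might look like:
SPARK_DAEMON_MEMORY=1g
SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50"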
The history server also accepts the following spark.history.* properties. Per the SPARK_HISTORY_OPTS variable above, these are typically passed to the daemon as Java system properties (for example, -Dspark.history.ui.port=18080), as in the sketch just shown:
  • spark.history.updateInterval (default: 10): The period, in seconds, at which information displayed by this history server is updated. Each update checks for any changes made to the event logs in persisted storage.
  • spark.history.retainedApplications (default: 250): The number of application UIs to retain. If this cap is exceeded, the oldest applications will be removed.
  • spark.history.ui.port (default: 18080): The port to which the web interface of the history server binds.
  • spark.history.kerberos.enabled (default: false): Indicates whether the history server should use Kerberos to log in. This is useful if the history server is accessing HDFS files on a secure Hadoop cluster. If this is true, it uses the configs spark.history.kerberos.principal and spark.history.kerberos.keytab.
  • spark.history.kerberos.principal (default: none): Kerberos principal name for the history server.
  • spark.history.kerberos.keytab (default: none): Location of the Kerberos keytab file for the history server.
  • spark.history.ui.acls.enable (default: false): Specifies whether ACLs should be checked to authorize users viewing the applications. If enabled, access control checks are made regardless of what the individual application had set for spark.ui.acls.enable when the application was run. The application owner will always have authorization to view their own application, and any users specified via spark.ui.view.acls when the application was run will also have authorization to view that application. If disabled, no access control checks are made.
Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc.

Metrics

Spark has a configurable metrics system based on the Coda Hale Metrics Library. This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV files. The metrics system is configured via a configuration file that Spark expects to be present at $SPARK_HOME/conf/metrics.properties. A custom file location can be specified via the spark.metrics.conf configuration property (spark-defaults.conf). Spark’s metrics are decoupled into different instances corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported:
  • master: The Spark standalone master process.
  • applications: A component within the master which reports on various applications.
  • worker: A Spark standalone worker process.
  • executor: A Spark executor.
  • driver: The Spark driver process (the process in which your SparkContext is created).
Each instance can report to zero or more sinks. Sinks are contained in the org.apache.spark.metrics.sink package:
  • ConsoleSink: Logs metrics information to the console.
  • CSVSink: Exports metrics data to CSV files at regular intervals.
  • JmxSink: Registers metrics for viewing in a JMX console.
  • MetricsServlet: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
  • GraphiteSink: Sends metrics to a Graphite node.
Spark also supports a Ganglia sink which is not included in the default build due to licensing restrictions:
  • GangliaSink: Sends metrics to a Ganglia node or multicast group.
To install the GangliaSink you’ll need to perform a custom build of Spark. Note that by embedding this library you will include LGPL-licensed code in your Spark package. For sbt users, set the SPARK_GANGLIA_LGPL environment variable before building. For Maven users, enable the -Pspark-ganglia-lgpl profile. In addition to modifying the cluster’s Spark build, user applications will need to link to the spark-ganglia-lgpl artifact.
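As a rough illustration (hedged; the exact invocation depends on your Hadoop version and whether you build with sbt or Maven), a Ganglia-enabled build could be produced with something like:
# sbt: set the environment variable described above, then build the assembly
SPARK_GANGLIA_LGPL=true sbt/sbt assembly
# Maven: enable the spark-ganglia-lgpl profile
mvn -Pspark-ganglia-lgpl -DskipTests clean package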
The syntax of the metrics configuration file is defined in an example configuration file, $SPARK_HOME/conf/metrics.properties.template.
[root@0a90e21c spark-1.0.0-bin-cdh4]# vim conf/metrics.properties
#  syntax: [instance].sink|source.[name].[options]=[value]

#  This file configures Spark's internal metrics system. The metrics system is
#  divided into instances which correspond to internal components.
#  Each instance can be configured to report its metrics to one or more sinks.
#  Accepted values for [instance] are "master", "worker", "executor", "driver",
#  and "applications". A wild card "*" can be used as an instance name, in
#  which case all instances will inherit the supplied property.
#
#  Within an instance, a "source" specifies a particular set of grouped metrics.
#  There are two kinds of sources:
#    1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
#    collect a Spark component's internal state. Each instance is paired with a
#    Spark source that is added automatically.
#    2. Common sources, like JvmSource, which will collect low level state.
#    These can be added through configuration options and are then loaded
#    using reflection.
#
#  A "sink" specifies where metrics are delivered to. Each instance can be
#  assigned one or more sinks.
#
#  The sink|source field specifies whether the property relates to a sink or
#  source.
#
#  The [name] field specifies the name of source or sink.
#
#  The [options] field is the specific property of this source or sink. The
#  source or sink is responsible for parsing this property.
#
#  Notes:
#    1. To add a new sink, set the "class" option to a fully qualified class
#    name (see examples below).
#    2. Some sinks involve a polling period. The minimum allowed polling period
#    is 1 second.
#    3. Wild card properties can be overridden by more specific properties.
#    For example, master.sink.console.period takes precedence over
#    *.sink.console.period.
#    4. A metrics specific configuration
#    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
#    added to Java properties using -Dspark.metrics.conf=xxx if you want to
#    customize metrics system. You can also put the file in ${SPARK_HOME}/conf
#    and it will be loaded automatically.
#    5. MetricsServlet is added by default as a sink in master, worker and client
#    driver, you can send http request "/metrics/json" to get a snapshot of all the
#    registered metrics in json format. For master, requests "/metrics/master/json" and
#    "/metrics/applications/json" can be sent seperately to get metrics snapshot of
#    instance master and applications. MetricsServlet may not be configured by self.
#

## List of available sinks and their properties.

# org.apache.spark.metrics.sink.ConsoleSink
#   Name:   Default:   Description:
#   period  10         Poll period
#   unit    seconds    Units of poll period

# org.apache.spark.metrics.sink.CSVSink
#   Name:     Default:   Description:
#   period    10         Poll period
#   unit      seconds    Units of poll period
#   directory /tmp       Where to store CSV files

# org.apache.spark.metrics.sink.GangliaSink
#   Name:     Default:   Description:
#   host      NONE       Hostname or multicast group of Ganglia server
#   port      NONE       Port of Ganglia server(s)
#   period    10         Poll period
#   unit      seconds    Units of poll period
#   ttl       1          TTL of messages sent by Ganglia
#   mode      multicast  Ganglia network mode ('unicast' or 'multicast')

# org.apache.spark.metrics.sink.JmxSink

# org.apache.spark.metrics.sink.MetricsServlet
#   Name:     Default:   Description:
#   path      VARIES*    Path prefix from the web server root
#   sample    false      Whether to show entire set of samples for histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The master has two paths:
#     /metrics/applications/json # App information
#     /metrics/master/json      # Master information

# org.apache.spark.metrics.sink.GraphiteSink
#   Name:     Default:      Description:
#   host      NONE          Hostname of Graphite server
#   port      NONE          Port of Graphite server
#   period    10            Poll period
#   unit      seconds       Units of poll period
#   prefix    EMPTY STRING  Prefix to prepend to metric name

## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

# Polling period for ConsoleSink
#*.sink.console.period=10

#*.sink.console.unit=seconds

# Master instance overlap polling period
#master.sink.console.period=15

#master.sink.console.unit=seconds

# Enable CsvSink for all instances
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

# Polling period for CsvSink
#*.sink.csv.period=1

#*.sink.csv.unit=minutes

# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/

# Worker instance overlap polling period
#worker.sink.csv.period=10

#worker.sink.csv.unit=minutes

# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

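# The uncommented settings below are the ones actually used here: enable the
# JVM source on every daemon and write all metrics to CSV files under
# /tmp/spark.metrics/ every 10 seconds.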
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark.metrics/

[root@0a90e21c spark-1.0.0-bin-cdh4]# sh ~/util/scpfile.sh $SPARK_HOME/conf/metrics.properties
[root@0a90e21c spark-1.0.0-bin-cdh4]# mkdir -p /tmp/spark.metrics/;chmod 777 /tmp/spark.metrics
[root@0a90e21c spark-1.0.0-bin-cdh4]# sh ~/util/sshcmd.sh "mkdir -p /tmp/spark.metrics/;chmod 777 /tmp/spark.metrics"
[root@0a90e21c spark-1.0.0-bin-cdh4]# ./sbin/start-master.sh
[root@0a90e21c spark-1.0.0-bin-cdh4]# ls /tmp/spark.metrics/
jvm.heap.committed.csv             jvm.pools.PS-Perm-Gen.usage.csv
jvm.heap.init.csv                  jvm.pools.PS-Survivor-Space.usage.csv
jvm.heap.max.csv                   jvm.PS-MarkSweep.count.csv
jvm.heap.usage.csv                 jvm.PS-MarkSweep.time.csv
jvm.heap.used.csv                  jvm.PS-Scavenge.count.csv
jvm.non-heap.committed.csv         jvm.PS-Scavenge.time.csv
jvm.non-heap.init.csv              jvm.total.committed.csv
jvm.non-heap.max.csv               jvm.total.init.csv
jvm.non-heap.usage.csv             jvm.total.max.csv
jvm.non-heap.used.csv              jvm.total.used.csv
jvm.pools.Code-Cache.usage.csv     master.apps.csv
jvm.pools.PS-Eden-Space.usage.csv  master.waitingApps.csv
jvm.pools.PS-Old-Gen.usage.csv     master.workers.csv

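With the master running, the MetricsServlet described in the template notes can also be queried over HTTP (a hedged example; it assumes the master web UI is listening on its default port 8080 on this host):
curl http://localhost:8080/metrics/master/json
curl http://localhost:8080/metrics/applications/json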
[root@0a90e21c spark-1.0.0-bin-cdh4]# ./sbin/start-slaves.sh

[root@0a90e21f ~]# ls /tmp/spark.metrics/
jvm.heap.committed.csv             jvm.pools.PS-Survivor-Space.usage.csv
jvm.heap.init.csv                  jvm.PS-MarkSweep.count.csv
jvm.heap.max.csv                   jvm.PS-MarkSweep.time.csv
jvm.heap.usage.csv                 jvm.PS-Scavenge.count.csv
jvm.heap.used.csv                  jvm.PS-Scavenge.time.csv
jvm.non-heap.committed.csv         jvm.total.committed.csv
jvm.non-heap.init.csv              jvm.total.init.csv
jvm.non-heap.max.csv               jvm.total.max.csv
jvm.non-heap.usage.csv             jvm.total.used.csv
jvm.non-heap.used.csv              worker.coresFree.csv
jvm.pools.Code-Cache.usage.csv     worker.coresUsed.csv
jvm.pools.PS-Eden-Space.usage.csv  worker.executors.csv
jvm.pools.PS-Old-Gen.usage.csv     worker.memFree_MB.csv
jvm.pools.PS-Perm-Gen.usage.csv    worker.memUsed_MB.csv

Advanced Instrumentation

Several external tools can be used to help profile the performance of Spark jobs:
  • Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
  • OS profiling tools such as dstat, iostat, and iotop can provide fine-grained profiling on individual nodes.
  • JVM utilities such as jstack for providing stack traces, jmap for creating heap dumps, jstat for reporting time-series statistics, and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals (a small sketch follows below).
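For example, a thread dump of a misbehaving executor on a worker node could be taken roughly as follows (a hedged sketch; in standalone mode the executor JVM shows up under the CoarseGrainedExecutorBackend main class):
jps | grep CoarseGrainedExecutorBackend   # find the executor's PID
jstack <pid>                              # dump its thread stacks
jmap -heap <pid>                          # summarize its heap usage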