Wednesday, June 25, 2014

Apache Spark Study Notes (5): Programming Guide Part 4

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

  • MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
  • MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
  • MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
  • DISK_ONLY: Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
  • OFF_HEAP (experimental): Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications.
Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
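Putting the API above together, a minimal sketch of persisting an RDD (assuming the spark-shell, where sc is already defined; the input path and RDD names are illustrative):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")          // hypothetical input file
val words = lines.flatMap(_.split(" "))

words.cache()                                // same as persist(StorageLevel.MEMORY_ONLY)
// or pick an explicit level before the first action, e.g.:
// words.persist(StorageLevel.MEMORY_AND_DISK_SER)

words.count()                                // 1st action computes and caches the partitions
words.count()                                // later actions reuse the cached partitions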

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:
  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access (see the sketch after this list).
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:
    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.
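For the MEMORY_ONLY_SER option above, a minimal sketch of pairing it with a fast serializer (this assumes a standalone driver program rather than the spark-shell, and the application name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("memory-only-ser-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // a fast serializer

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)    // one byte array per partition, more compact than deserialized objects
rdd.count()                                  // the first action materializes the serialized in-memory copy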

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
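A minimal sketch of manual removal (assuming sc from the spark-shell; the RDD is arbitrary):

val rdd = sc.parallelize(1 to 1000).cache()
rdd.count()       // materializes the cached partitions
rdd.unpersist()   // drops its blocks from memory and disk now, instead of waiting for LRU eviction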

org.apache.spark.storage.StorageLevel

Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon, whether to keep the data in memory in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
The org.apache.spark.storage.StorageLevel$ singleton object contains some static constants for commonly useful storage levels. To create your own storage level object, use the factory method of the singleton object (StorageLevel(...)).

org.apache.spark.storage.StorageLevel$

val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val NONE: StorageLevel
val OFF_HEAP: StorageLevel
def apply(in: ObjectInput): StorageLevel
Read StorageLevel object from ObjectInput stream.
def apply(flags: Int, replication: Int): StorageLevel
Create a new StorageLevel object from its integer representation.
def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1): StorageLevel
Create a new StorageLevel object without setting useOffHeap.
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean, replication: Int): StorageLevel
Create a new StorageLevel object.
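A minimal sketch of the factory method, following the signatures listed above (the chosen flags are just an example, and rdd stands for some existing, not-yet-persisted RDD):

import org.apache.spark.storage.StorageLevel

// useDisk = true, useMemory = true, deserialized = false, replication = 2:
// memory plus disk, serialized in memory, each partition replicated on two nodes
val myLevel = StorageLevel(true, true, false, 2)
rdd.persist(myLevel)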

org.apache.spark.rdd.RDD

def cache(): RDD.this.type
Persist this RDD with the default storage level (MEMORY_ONLY).
@@shorthand for using MEMORY_ONLY
def getStorageLevel: StorageLevel
Get the RDD's current storage level, or StorageLevel.NONE if none is set.
getStorageLevel.toString: String = StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
def persist(): RDD.this.type
Persist this RDD with the default storage level (MEMORY_ONLY).
def persist(newLevel: StorageLevel): RDD.this.type
Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
def unpersist(blocking: Boolean = true): RDD.this.type
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
blocking : Whether to block until all blocks are deleted.
returns : This RDD.

@@A newly created RDD's default storage level is NO_DISK, NO_MEM, SERIALIZED, 1x replicated
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> rdd1.getStorageLevel.description
res10: String = Serialized 1x Replicated
scala> rdd1.getStorageLevel.toString
res11: String = StorageLevel(false, false, false, false, 1)
@@After RDD.persist is called, the storage level can no longer be changed; using MEMORY_ONLY
scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
res12: rdd1.type = ParallelCollectionRDD[2] at parallelize at <console>:15
scala> rdd1.getStorageLevel.description
res14: String = Memory Deserialized 1x Replicated
scala> rdd1.getStorageLevel.toString
res13: String = StorageLevel(false, true, false, true, 1)
@@Using MEMORY_ONLY_SER
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> rdd1.persist(StorageLevel.MEMORY_ONLY_SER)
scala> rdd1.getStorageLevel.description
res20: String = Memory Serialized 1x Replicated
scala> rdd1.getStorageLevel.toString
res19: String = StorageLevel(false, true, false, false, 1)
@@Using DISK_ONLY
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> rdd1.persist(StorageLevel.DISK_ONLY)
scala> rdd1.getStorageLevel.description
res22: String = Disk Serialized 1x Replicated
scala> rdd1.getStorageLevel.toString
res23: String = StorageLevel(true, false, false, false, 1)
@@Using OFF_HEAP
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> rdd1.persist(StorageLevel.OFF_HEAP)
scala> rdd1.getStorageLevel.description
res25: String = Tachyon Serialized 1x Replicated
scala> rdd1.getStorageLevel.toString
res26: String = StorageLevel(false, false, true, false, 1)
@@1st test (no persist), restart spark-shell
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> val rdd2 = rdd1.sample(true, 0.5)
scala> rdd2.getStorageLevel.toString
res27: String = StorageLevel(false, false, false, false, 1)
scala> val col1 = rdd2.max()
14/06/18 18:30:28 INFO SparkContext: Job finished: max at <console>:16, took 10.880428561 s
col1: Int = 1000000000
scala> val col2 = rdd2.min()
14/06/18 18:30:59 INFO SparkContext: Job finished: min at <console>:16, took 7.276179896 s
col2: Int = 1
scala> val col3 = rdd2.mean()
14/06/18 18:31:34 INFO SparkContext: Job finished: mean at <console>:16, took 6.172543001 s
col3: Double = 5.0000175265337527E8
scala> val col4 = rdd2.count
14/06/18 18:32:12 INFO SparkContext: Job finished: count at <console>:16, took 5.662854106 s
col4: Long = 500027117
scala> rdd2.getStorageLevel.toString
res1: String = StorageLevel(false, false, false, false, 1)
@@2nd test (no persist), restart spark-shell
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> val rdd2 = rdd1.sample(true, 0.5)
scala> rdd2.getStorageLevel.toString
res27: String = StorageLevel(false, false, false, false, 1)
scala> val col1 = rdd2.max()
...
14/06/19 18:02:02 INFO TaskSetManager: Finished TID 39 in 6143 ms on 0a90e21e.cht.local (progress: 4/72)
...
14/06/19 18:02:03 INFO TaskSetManager: Finished TID 26 in 7492 ms on 0a90e21f.cht.local (progress: 30/72)
...
14/06/19 18:02:04 INFO TaskSetManager: Finished TID 37 in 8044 ms on 0a90e21d.cht.local (progress: 51/72)
...
14/06/19 18:02:04 INFO SparkContext: Job finished: max at <console>:16, took 8.910027265 s
col1: Int = 1000000000
scala> val col2 = rdd2.min()
...
14/06/19 18:06:55 INFO TaskSetManager: Finished TID 104 in 5073 ms on 0a90e21e.cht.local (progress: 24/72)
...
14/06/19 18:06:58 INFO TaskSetManager: Finished TID 114 in 7353 ms on 0a90e21f.cht.local (progress: 62/72)
...
14/06/19 18:06:57 INFO TaskSetManager: Finished TID 79 in 6930 ms on 0a90e21d.cht.local (progress: 58/72)
...
14/06/19 18:06:59 INFO SparkContext: Job finished: min at <console>:16, took 8.314410462 s
col2: Int = 2
scala> val col3 = rdd2.mean()
...
14/06/19 18:14:43 INFO TaskSetManager: Finished TID 146 in 3730 ms on 0a90e21e.cht.local (progress: 17/72)
...
14/06/19 18:14:43 INFO TaskSetManager: Finished TID 196 in 3906 ms on 0a90e21f.cht.local (progress: 31/72)
...
14/06/19 18:14:44 INFO TaskSetManager: Finished TID 156 in 4703 ms on 0a90e21d.cht.local (progress: 70/72)
...
14/06/19 18:14:44 INFO SparkContext: Job finished: mean at <console>:16, took 4.755392182 s
col3: Double = 5.000138385235054E8
scala> val col4 = rdd2.count
...
14/06/19 18:16:49 INFO TaskSetManager: Finished TID 281 in 2354 ms on 0a90e21e.cht.local (progress: 18/72)
...
14/06/19 18:16:50 INFO TaskSetManager: Finished TID 246 in 3284 ms on 0a90e21d.cht.local (progress: 37/72)
...
14/06/19 18:16:51 INFO TaskSetManager: Finished TID 265 in 4340 ms on 0a90e21f.cht.local (progress: 65/72)
...
14/06/19 18:16:51 INFO SparkContext: Job finished: count at <console>:16, took 4.943686317 s
col4: Long = 500014049
scala> rdd2.getStorageLevel.toString
res1: String = StorageLevel(false, false, false, false, 1)

@@1st test (with cache()), restart spark-shell
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> val rdd2 = rdd1.sample(true, 0.5).cache
scala> rdd2.getStorageLevel.toString
res0: String = StorageLevel(false, true, false, true, 1)
scala> val col1 = rdd2.max()
14/06/18 18:35:04 INFO SparkContext: Job finished: max at <console>:16, took 15.940654738 s
col1: Int = 999999997
scala> val col2 = rdd2.min()
14/06/18 18:35:31 INFO SparkContext: Job finished: min at <console>:16, took 3.681960131 s
col2: Int = 1
scala> val col3 = rdd2.mean()
14/06/18 18:36:56 INFO SparkContext: Job finished: mean at <console>:16, took 30.61424862 s
col3: Double = 4.9998534056021273E8
scala> val col4 = rdd2.count
14/06/18 18:37:23 INFO SparkContext: Job finished: count at <console>:16, took 0.51403803 s
col4: Long = 499994557
scala> rdd2.getStorageLevel.toString
res1: String = StorageLevel(false, true, false, true, 1)
@@2nd test (with cache()), restart spark-shell
scala> val rdd1 = sc.parallelize(1 to 1000000000)
scala> val rdd2 = rdd1.sample(true, 0.5).cache
scala> rdd2.getStorageLevel.toString
res0: String = StorageLevel(false, true, false, true, 1)
scala> val col1 = rdd2.max()
...
14/06/19 14:18:29 INFO BlockManagerInfo: Added rdd_1_57 in memory on 0a90e21e.cht.local:38523 (size: 233.0 MB, free: 26.7 GB)
14/06/19 14:18:29 INFO BlockManagerInfo: Added rdd_1_69 in memory on 0a90e21e.cht.local:38523 (size: 233.0 MB, free: 26.5 GB)
…@@After calling cache(), task completion times for the 1st action are about 10x slower than without cache()
14/06/19 14:18:31 INFO DAGScheduler: Completed ResultTask(0, 57)
14/06/19 14:18:31 INFO TaskSetManager: Finished TID 57 in 53569 ms on 0a90e21e.cht.local (progress: 4/72)
...
14/06/19 14:18:33 INFO BlockManagerInfo: Added rdd_1_19 in memory on 0a90e21f.cht.local:58978 (size: 233.0 MB, free: 27.4 GB)
...
14/06/19 14:18:40 INFO DAGScheduler: Completed ResultTask(0, 19)
14/06/19 14:18:40 INFO TaskSetManager: Finished TID 19 in 62831 ms on 0a90e21f.cht.local (progress: 30/72)
...
14/06/19 14:18:41 INFO BlockManagerInfo: Added rdd_1_11 in memory on 0a90e21d.cht.local:50875 (size: 233.0 MB, free: 24.0 GB)
...
14/06/19 14:18:42 INFO DAGScheduler: Completed ResultTask(0, 11)
14/06/19 14:18:42 INFO TaskSetManager: Finished TID 11 in 64401 ms on 0a90e21d.cht.local (progress: 50/72)
...
14/06/19 14:18:44 INFO SparkContext: Job finished: max at <console>:16, took 66.77888306 s
col1: Int = 1000000000
col1: Int = 999999997
scala> val col2 = rdd2.min()
…@@After calling cache(), task completion times for the 2nd action are 5x~6x faster than without cache()
14/06/19 14:27:09 INFO TaskSetManager: Finished TID 135 in 872 ms on 0a90e21d.cht.local (progress: 24/72)
...
14/06/19 14:27:09 INFO TaskSetManager: Finished TID 137 in 1047 ms on 0a90e21f.cht.local (progress: 62/72)
...
14/06/19 14:27:09 INFO TaskSetManager: Finished TID 82 in 1100 ms on 0a90e21e.cht.local (progress: 67/72)
...
14/06/19 14:27:09 INFO SparkContext: Job finished: min at <console>:16, took 1.136201522 s
col2: Int = 2
…@@After calling cache(), task completion times for the 3rd action are 4x~6x slower than without cache()
scala> val col3 = rdd2.mean()
...
14/06/19 17:52:03 INFO TaskSetManager: Finished TID 176 in 10582 ms on 0a90e21e.cht.local (progress: 17/72)
...
14/06/19 17:52:11 INFO TaskSetManager: Finished TID 192 in 18987 ms on 0a90e21d.cht.local (progress: 31/72)
...
14/06/19 17:52:18 INFO TaskSetManager: Finished TID 163 in 25714 ms on 0a90e21f.cht.local (progress: 70/72)
...
14/06/19 17:52:18 INFO SparkContext: Job finished: mean at <console>:16, took 25.85574916 s
col3: Double = 5.000152471322728E8
…@@After calling cache(), task completion times for the 4th action are 5x~6x faster than without cache()
scala> val col4 = rdd2.count
...
14/06/19 17:55:37 INFO TaskSetManager: Finished TID 244 in 405 ms on 0a90e21d.cht.local (progress: 18/72)
...
14/06/19 17:55:37 INFO TaskSetManager: Finished TID 278 in 484 ms on 0a90e21e.cht.local (progress: 37/72)
...
14/06/19 17:55:37 INFO TaskSetManager: Finished TID 255 in 857 ms on 0a90e21f.cht.local (progress: 65/72)
...
14/06/19 17:55:38 INFO SparkContext: Job finished: count at <console>:16, took 1.020729327 s
col4: Long = 500020669
scala> rdd2.getStorageLevel.toString
res1: String = StorageLevel(false, true, false, true, 1)


Summary of the two runs (1st action = max, 2nd = min, 3rd = mean, 4th = count; three sampled task times and the job time for each action):

default mode, no persist (NO_DISK, NO_MEM, NO_OFFHEAP, SERIALIZED, 1 replication)  vs.  MEMORY_ONLY via cache() (NO_DISK, MEM, NO_OFFHEAP, DESERIALIZED, 1 replication)

1st action (max):   tasks 6143 / 7492 / 8044 ms,  job 8.910027265 s   |   tasks 53569 / 62831 / 64401 ms,  job 66.77888306 s
2nd action (min):   tasks 5073 / 7353 / 6930 ms,  job 8.314410462 s   |   tasks 872 / 1047 / 1100 ms,      job 1.136201522 s
3rd action (mean):  tasks 3730 / 3906 / 4703 ms,  job 4.755392182 s   |   tasks 10582 / 18987 / 25714 ms,  job 25.85574916 s
4th action (count): tasks 2354 / 3284 / 4340 ms,  job 4.943686317 s   |   tasks 405 / 484 / 857 ms,        job 1.020729327 s

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
@@The "outer" variables used by a closure passed to a Spark operation (map, reduce) are copied to all executors (their tasks), and changes to those variables are not propagated back to the driver program

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
@@A broadcast variable keeps only one copy on each worker, rather than one per task, which reduces the amount of data that has to be shipped
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
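A minimal sketch of reading a broadcast value inside a closure (assuming sc; the lookup table is illustrative):

val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
val rdd = sc.parallelize(Seq(1, 2, 3, 4))

// read through .value inside the task, rather than capturing the original Map directly
val named = rdd.map(x => lookup.value.getOrElse(x, "unknown")).collect()
// named: Array(one, two, three, unknown)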

org.apache.spark.SparkContext

def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]
Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.

org.apache.spark.broadcast.Broadcast

def toString(): String
Definition Classes
Broadcast → AnyRef → Any
def unpersist(blocking: Boolean): Unit
Delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor.
blocking
Whether to block until unpersisting has completed
def unpersist(): Unit
Asynchronously delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor.
def value: T
Get the broadcasted value.
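And a small sketch of releasing a broadcast once it is no longer needed (assuming sc):

val bc = sc.broadcast(1 to 100000)
// ... run jobs that read bc.value inside their closures ...
bc.unpersist()    // asynchronously drop the executor copies; the value is re-sent if bc is used again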

@@restart spark-shell, using outer-inner loop
scala> val var1 = 1 to 100000
var1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5,
scala> val rdd1 = sc.parallelize(for (i <- 1 to 1000000) yield ("k"+i, i))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val rdd2 = rdd1.mapPartitions(ite => {
  for {
    i <- ite
    j <- var1 if i._2 == j
  } yield (i._1, i._2, j) })
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:16
scala> val col1 = rdd2.collect.take(2)
...
23 23 SUCCESS PROCESS_LOCAL 0a90e21f.cht.local 2014/06/20 19:44:38 1.6 min 2 s 1 ms
48 48 SUCCESS PROCESS_LOCAL 0a90e21e.cht.local 2014/06/20 19:44:38 1.5 min 6 s
70 70 SUCCESS PROCESS_LOCAL 0a90e21d.cht.local 2014/06/20 19:44:40 1.4 min 2 s
...
14/06/20 19:46:29 INFO SparkContext: Job finished: collect at <console>:18, took 112.232739939 s
col1: Array[(String, Int, Int)] = Array((k1,1,1), (k2,2,2))
@@restart spark-shell, using filter method
scala> val var1 = 1 to 100000
scala> val rdd1 = sc.parallelize(for (i <- 1 to 1000000) yield ("k"+i, i))
scala> val rdd2 = rdd1.mapPartitions(ite => {
  for {
    i <- ite
    elems = var1.filter(_.equals(i._2))
    if elems.size > 0
  } yield (i._1, i._2, elems)
})
rdd2: org.apache.spark.rdd.RDD[(String, Int, scala.collection.immutable.IndexedSeq[Int])] = MapPartitionsRDD[1] at mapPartitions at <console>:16
scala> val col1 = rdd2.collect.take(2)
...
23 23 SUCCESS PROCESS_LOCAL 0a90e21e.cht.local 2014/06/24 10:29:55 3.0 min 2 s
48 48 SUCCESS PROCESS_LOCAL 0a90e21f.cht.local 2014/06/24 10:29:55 2.8 min 2 s
70 70 SUCCESS PROCESS_LOCAL 0a90e21d.cht.local 2014/06/24 10:29:56 2.8 min 1 s
...
14/06/24 10:32:57 INFO SparkContext: Job finished: collect at <console>:18, took 183.184140862 s
col1: Array[(String, Int, scala.collection.immutable.IndexedSeq[Int])] = Array((k1,1,Vector(1)), (k2,2,Vector(2)))
@@restart spark-shell, using contains & find method
scala> val var1 = 1 to 100000
scala> val rdd1 = sc.parallelize(for (i <- 1 to 1000000) yield ("k"+i, i))
scala> val rdd2 = rdd1.mapPartitions(ite => {
  for {
    i <- ite if var1.contains(i._2)
    elem = var1.find(_.equals(i._2))
  } yield (i._1, i._2, elem)
})
rdd2: org.apache.spark.rdd.RDD[(String, Int, Option[Int])] = MapPartitionsRDD[1] at mapPartitions at <console>:16
scala> val col1 = rdd2.collect.take(2)
...
23 23 SUCCESS PROCESS_LOCAL 0a90e21d.cht.local 2014/06/24 10:53:35 0.1 s 22 ms 1 ms
48 48 SUCCESS PROCESS_LOCAL 0a90e21f.cht.local 2014/06/24 10:53:36 3 ms   23 ms 4 ms
70 70 SUCCESS PROCESS_LOCAL 0a90e21e.cht.local 2014/06/24 10:53:36 69 ms
...
14/06/24 10:54:18 INFO SparkContext: Job finished: collect at <console>:18, took 43.63701334 s
col1: Array[(String, Int, Option[Int])] = Array((k1,1,Some(1)), (k2,2,Some(2)))
@@restart spark-shell, using contains & find method with bc
scala> val bc1 = sc.broadcast(1 to 100000)
14/06/23 18:01:10 INFO MemoryStore: ensureFreeSpace(160) called with curMem=0, maxMem=308713881
14/06/23 18:01:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 160.0 B, free 294.4 MB)
bc1: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Range.Inclusive] = Broadcast(0)
scala> val m = bc1.value
m: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9,
scala> val rdd1 = sc.parallelize(for (i <- 1 to 1000000) yield ("k"+i, i))
scala> val rdd2 = rdd1.mapPartitions(ite => {
  var m = bc1.value
  for {
    i <- ite if m.contains(i._2)
    elem = m.find(_.equals(i._2))
  } yield (i._1, i._2, elem)
})
rdd2: org.apache.spark.rdd.RDD[(String, Int, Option[Int])] = MapPartitionsRDD[2] at mapPartitions at <console>:16
scala> val col1 = rdd2.collect.take(2)
...
23 23 SUCCESS PROCESS_LOCAL 0a90e21e.cht.local 2014/06/24 13:17:14 0.1 s 23 ms
48 48 SUCCESS PROCESS_LOCAL 0a90e21d.cht.local 2014/06/24 13:17:14 0.2 s 22 ms 3 ms
70 70 SUCCESS PROCESS_LOCAL 0a90e21f.cht.local 2014/06/24 13:17:15 61 ms X      6 ms
...
14/06/24 13:17:54 INFO SparkContext: Job finished: collect at <console>:18, took 41.452483908 s
col1: Array[(String, Int, Option[Int])] = Array((k1,1,Some(1)), (k2,2,Some(2)))
@@restart spark-shell, using filter method with bc
scala> val bc1 = sc.broadcast(1 to 100000)
scala> val rdd1 = sc.parallelize(for (i <- 1 to 1000000) yield ("k"+i, i))
scala> val rdd2 = rdd1.mapPartitions(ite => {
  var m = bc1.value
  for {
    i <- ite
    elems = m.filter(_.equals(i._2))
    if elems.size > 0
  } yield (i._1, i._2, elems)
})
rdd2: org.apache.spark.rdd.RDD[(String, Int, scala.collection.immutable.IndexedSeq[Int])] = MapPartitionsRDD[1] at mapPartitions at <console>:16
scala> val col1 = rdd2.collect.take(2)
...
23 23 SUCCESS PROCESS_LOCAL 0a90e21d.cht.local 2014/06/24 13:28:15 2.1 min 0.5 s
48 48 SUCCESS PROCESS_LOCAL 0a90e21f.cht.local 2014/06/24 13:28:15 2.0 min 0.4 s
70 70 SUCCESS PROCESS_LOCAL 0a90e21e.cht.local 2014/06/24 13:28:16 2.0 min 1 s
...
14/06/24 13:30:25 INFO SparkContext: Job finished: collect at <console>:18, took 130.344458484 s
col1: Array[(String, Int, scala.collection.immutable.IndexedSeq[Int])] = Array((k1,1,Vector(1)), (k2,2,Vector(2)))

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
@@RDD provides a count method but not a sum method, so an accumulator can be used to implement a sum or a counter with a specific condition
@@Using an accumulator with foreach achieves much the same effect as the RDD fold/reduce methods, but an accumulator can only be added to, and each operation does not see the previous operation's result:


             initial value | op sees previous result                       | op       | use cases
fold         Yes           | Yes                                           | any      |
accumulator  Yes           | No                                            | add only | counter with a specific condition
reduce       No            | Yes (the 1st op is seeded by the 1st element) | any      | max, min, sum for RDDs of non-numeric types
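To make the comparison concrete, a small sketch (assuming sc; the results in the comments are for the range 1 to 100):

val nums = sc.parallelize(1 to 100)

// fold: has an initial value, and each op sees the running result
val total = nums.fold(0)(_ + _)                           // 5050

// accumulator: add-only; tasks add to it, only the driver reads it
val acc = sc.accumulator(0)
nums.foreach(x => if (x % 3 == 0) acc += 1)
println(acc.value)                                        // 33, the multiples of 3 in 1..100

// reduce: no initial value; the first element seeds the first op
val biggest = nums.reduce((a, b) => if (a > b) a else b)  // 100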

The code below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

org.apache.spark.SparkContext

def accumulableCollection[R, T](initialValue: R)(implicit arg0: (R) ⇒ Growable[T] with TraversableOnce[T] with Serializable, arg1: ClassTag[R]): Accumulable[R, T]
Create an accumulator from a "mutable collection" type.
Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by standard mutable collections. So you can use this with mutable Map, Set, etc.
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
Create an org.apache.spark.Accumulator variable of a given type, which tasks can "add" values to using the += method. Only the driver can access the accumulator's value.
@@restart spark-shell
scala> val accum = sc.accumulator(0)
accum: org.apache.spark.Accumulator[Int] = 0
scala> val rdd1 = sc.parallelize(for (i <- 1 to 2000000) yield ("k"+i, i))
scala> rdd1.foreach(tup => if (tup._2 % 3 == 0) accum += 1)
...
14/06/25 11:11:16 INFO SparkContext: Job finished: foreach at <console>:17, took 30.419204199 s
scala> accum.value
res1: Int = 666666
@@restart spark-shell
scala> val rdd1 = sc.parallelize(for (i <- 1 to 2000000) yield ("k"+i, i))
scala> val rdd3 = rdd1.reduce( (t1,t2) => {
 if (t1._2 > t2._2)
   t1
 else
   t2
})
...
14/06/25 13:17:59 INFO SparkContext: Job finished: reduce at <console>:14, took 32.298484018 s
rdd3: (String, Int) = (k2000000,2000000)
@@restart spark-shell
scala> val rdd1 = sc.parallelize(for (i <- 1 to 2000000) yield ("k"+i, i))
scala> val rdd3 = rdd1.map(_._2).max
14/06/25 14:15:08 INFO SparkContext: Job finished: max at <console>:14, took 29.415851174 s
rdd3: Int = 2000000

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a “zero value” for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
 def zero(initialValue: Vector): Vector = {
   Vector.zeros(initialValue.size)
 }
 def addInPlace(v1: Vector, v2: Vector): Vector = {
   v1 += v2
 }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
In Scala, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements), and the SparkContext.accumulableCollection method for accumulating common Scala collection types.
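A minimal sketch of accumulableCollection with a mutable collection (assuming sc; the "bad record" filter is only an example):

import scala.collection.mutable.ArrayBuffer

// collect non-numeric records into a driver-side buffer
val badRecords = sc.accumulableCollection(ArrayBuffer[String]())
sc.parallelize(Seq("1", "two", "3", "four")).foreach { s =>
  if (!s.forall(_.isDigit)) badRecords += s
}
println(badRecords.value)   // ArrayBuffer(two, four); element order depends on task scheduling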

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.
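A minimal sketch of that pattern, independent of any particular test framework (the assertion is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("unit-test"))
try {
  val result = sc.parallelize(1 to 10).reduce(_ + _)
  assert(result == 55)
} finally {
  sc.stop()   // always stop, since two SparkContexts cannot run in the same program
}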

Migrating from pre-1.0 Versions of Spark

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked “experimental” or “developer API” will be supported in future versions. The only change for Scala users is that the grouping operations, e.g. groupByKey, cogroup and join, have changed from returning (Key, Seq[Value]) pairs to (Key, Iterable[Value]).
Migration guides are also available for Spark Streaming, MLlib and GraphX.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:
./bin/run-example SparkPi
For Python examples, use spark-submit instead:
./bin/spark-submit examples/src/main/python/pi.py
For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.
Finally, full API documentation is available in Scala, Java and Python.