Skip to content

Commit

Permalink
fix typo by zzl0
Browse files Browse the repository at this point in the history
  • Loading branch information
JerryLead committed Aug 25, 2014
1 parent e2e9418 commit 9224d7c
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 16 deletions.
8 changes: 1 addition & 7 deletions markdown/1-Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object GroupByTest {
- 执行 RDD 上的 transformation 操作(这里是 flatMap)以后,生成 FlatMappedRDD,其中每个 partition 包含一个 Array[(Int, Array[Byte])]
- 第一个 count() 执行时,先在每个 partition 上执行 count,然后执行结果被发送到 driver,最后在 driver 端进行 sum。
- 由于 FlatMappedRDD 被 cache 到内存,因此这里将里面的 partition 都换了一种颜色表示。
- groupByKey 产生了后面三个 RDD,为什么产生这三个在后面章节讨论
- groupByKey 产生了后面两个 RDD,为什么产生这两个在后面章节讨论
- 如果 job 需要 shuffle,一般会产生 ShuffledRDD。该 RDD 与前面的 RDD 的关系类似于 Hadoop 中 mapper 输出数据与 reducer 输入数据之间的关系。
- MapPartitionsRDD 里包含 groupByKey() 的结果。
- 最后将 MapPartitionsRDD 中的 每个value(也就是Array[Byte])都转换成 Iterable 类型。
Expand Down Expand Up @@ -168,9 +168,3 @@ object GroupByTest {
6. cache机制
7. broadcast 机制


Hi,文章写得很赞~关于OverView中如何配置多个Backend进程的问题:在Worker Actor中,每次LaunchExecutor会创建一个Backend进程,它们是1对1的关系。也就是说集群里启动多少Executor实例就有多少Backend进程。

Backend个数发生变化情况:1、启动一个新的Application(每个APP都会launceExecutor,此时会生成此进程)2、还可以通过设置SPARK\_WORKER\_INSTANCES参数来增加Backend个数。图可以依此稍做改动。

Backend进程是SparkContext初始化taskcScheduler,taskcScheduler初始化SparkDeploySchedulerBackend里appDesc里的command...顺藤摸瓜即可。。CoarseGrainedExecutorBackend
2 changes: 1 addition & 1 deletion markdown/2-JobLogicalPlan.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)

![reduceyByKey](PNGfigures/reduceByKey.png)

reduceyByKey() 相当于传统的 MapReduce,整个也数据流与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。
reduceyByKey() 相当于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。

**3) distinct(numPartitions)**

Expand Down
14 changes: 7 additions & 7 deletions markdown/3-JobPhysicalPlan.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

所有的粗箭头组合成第一个 task,该 task 计算结束后顺便将 CoGroupedRDD 中已经计算得到的第二个和第三个 partition 存起来。之后第二个 task(细实线)只需计算两步,第三个 task(细虚线)也只需要计算两步,最后得到结果。

这个想法由两个不靠谱的地方
这个想法有两个不靠谱的地方

- 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的所有 partitions,而且都在这一个 task 里面计算。
- 需要设计巧妙的算法来判断哪个 RDD 中的哪些 partition 需要 cache。而且 cache 会占用存储空间。
Expand Down Expand Up @@ -113,7 +113,7 @@
## 生成 job
前面介绍了逻辑和物理执行图的生成原理,那么,**怎么触发 job 的生成?已经介绍了 task,那么 job 是什么?**

下表列出了可以触发生成执行图生成典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。
下表列出了可以触发执行图生成的典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。


| Action | finalRDD(records) => result | compute(results) |
Expand All @@ -122,9 +122,9 @@
| collect() |Array[records] => result | Array[result] |
| count() | count(records) => result | sum(result) |
| foreach(f) | f(records) => result | Array[result] |
| take(n) | record (i<=n) => result | Array{result] |
| first() | record 1 => result | Array{result] |
| takeSample() | selected records => result | Array{result] |
| take(n) | record (i<=n) => result | Array[result] |
| first() | record 1 => result | Array[result] |
| takeSample() | selected records => result | Array[result] |
| takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) |
| saveAsHadoopFile(path) | records => write(records) | null |
| countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) |
Expand Down Expand Up @@ -155,7 +155,7 @@
1. 先确定该 stage 的 missingParentStages,使用`getMissingParentStages(stage)`。如果 parentStages 都可能已经执行过了,那么就为空了。
2. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
3. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用`submitMissingTasks(stage, jobId)`来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用`taskScheduler.submitTasks(taskSet)`来提交一整个 taskSet。
4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步时通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,`backend.reviveOffers()`其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用`launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))` 来 launch tasks。scheduler 就是 TaskSchedulerImpl。`scheduler.resourceOffers()`从 FIFO 或者 Fair 调度器那里获得排序后的 TaskSetManager,并经过`TaskSchedulerImpl.resourceOffer()`,考虑 locality 等因素来确定 task 的全部信息 TaskDescription。调度细节这里暂不讨论。
6. DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行`executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))`

Expand All @@ -176,6 +176,7 @@
从逻辑执行图的建立,到将其转换成物理执行图的过程很经典,过程中的 dependency 划分,pipeline,stage 分割,task 生成 都是有条不紊,有理有据的。

## ComplexJob 的源代码

```scala
package internals

Expand All @@ -189,7 +190,6 @@ object complexJob {

val sc = new SparkContext("local", "ComplexJob test")


val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
Expand Down
2 changes: 1 addition & 1 deletion markdown/4-shuffleDetails.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,6 @@ ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V
## Discussion
通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。

这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle write,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。
这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。

另外,Jerry Shao 写的 [详细探究Spark的shuffle实现](http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/) 很赞,里面还介绍了 shuffle 过程在 Spark 中的进化史。目前 sort-based 的 shuffle 也在实现当中,stay tuned。
Expand Down
Binary file added pdf/.DS_Store
Binary file not shown.

0 comments on commit 9224d7c

Please sign in to comment.