forked from JerryLead/SparkInternals
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request JerryLead#20 from invkrh/en-ch5
[EN] translated 5-Architecture.md
- Loading branch information
Showing
1 changed file
with
275 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,275 @@ | ||
# Architecture | ||
|
||
We talked about spark jobs in chapter 3. In this chapter, we will talk about the architecture and how master, worker, driver and executors are coordinated to finish a job. | ||
|
||
> Feel free to skip code if you prefer to the pictures on the principle. | ||
|
||
## Deployment diagram | ||
|
||
We have seen the following diagram in `overview` chapter. | ||
|
||
![deploy](../PNGfigures/deploy.png) | ||
|
||
Next, we will talk about some details about it. | ||
|
||
## Job submission | ||
|
||
The picture below illustrates how driver program (on master node) produces job, and then submits them to worker nodes. | ||
|
||
![JobSubmission](../PNGfigures/JobSubmission.png) | ||
|
||
Driver side behavior is equivalent to the code below: | ||
|
||
```scala | ||
finalRDD.action() | ||
=> sc.runJob() | ||
|
||
// generate job, stages and tasks | ||
=> dagScheduler.runJob() | ||
=> dagScheduler.submitJob() | ||
=> dagSchedulerEventProcessActor ! JobSubmitted | ||
=> dagSchedulerEventProcessActor.JobSubmitted() | ||
=> dagScheduler.handleJobSubmitted() | ||
=> finalStage = newStage() | ||
=> mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size) | ||
=> dagScheduler.submitStage() | ||
=> missingStages = dagScheduler.getMissingParentStages() | ||
=> dagScheduler.subMissingTasks(readyStage) | ||
|
||
// add tasks to the taskScheduler | ||
=> taskScheduler.submitTasks(new TaskSet(tasks)) | ||
=> fifoSchedulableBuilder.addTaskSetManager(taskSet) | ||
|
||
// send tasks | ||
=> sparkDeploySchedulerBackend.reviveOffers() | ||
=> driverActor ! ReviveOffers | ||
=> sparkDeploySchedulerBackend.makeOffers() | ||
=> sparkDeploySchedulerBackend.launchTasks() | ||
=> foreach task | ||
CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask) | ||
``` | ||
|
||
Explanation: | ||
|
||
When the following code gets evaluated, the program will launch a lot of driver communication, e.g. job's executors, threads, actors, etc. | ||
|
||
```scala | ||
val sc = new SparkContext(sparkConf) | ||
``` | ||
|
||
**This line defines the role of driver** | ||
|
||
### Job logical plan | ||
|
||
`transformation()` in driver program builds a computing chain (a series of `RDD`). In each `RDD`: | ||
- `compute()` function defines the computation of records for its partitions | ||
- `getDependencies()` function defines the dependency relationship across RDD partitions. | ||
|
||
### Job physical plan | ||
|
||
Each `action()` triggers a job: | ||
- During `dagScheduler.runJob()`, different stages are defined | ||
- During `submitStage()`, `ResultTasks` and `ShuffleMapTasks` needed by the stage are produced, then they are packaged in `TaskSet` and sent to `TaskScheduler`. If `TaskSet` can be executed, tasks will be submitted to `sparkDeploySchedulerBackend` which will distribute tasks. | ||
|
||
|
||
|
||
### Task distribution | ||
|
||
After `sparkDeploySchedulerBackend` gets `TaskSet`, the `Driver Actor` sends the serialized tasks to `CoarseGrainedExecutorBackend Actor` on worker node. | ||
|
||
## Job reception | ||
|
||
After receiving tasks, worker will do the following things: | ||
|
||
```scala | ||
coarseGrainedExecutorBackend ! LaunchTask(serializedTask) | ||
=> executor.launchTask() | ||
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask)) | ||
``` | ||
**Executor packages each task into `taskRunner`, and picks a free thread to run the task. A `CoarseGrainedExecutorBackend` process has only one executor** | ||
|
||
## Task execution | ||
|
||
The picture below shows the execution of a task distributed to worker node and how to process task results. | ||
|
||
![TaskExecution](../PNGfigures/taskexecution.png) | ||
|
||
After receiving a serialized task, the executor deserializes it to normal task, and then runs the task to get `directResult` which will be sent back to driver. But data package sent from `Actor` can not be too big: | ||
|
||
- If the result is too big (e.g. the one of `groupByKey`), it will be persisted to memory + hard disk managed by `blockManager`. Driver will only get `indirectResult` containing the storage location. When result is needed, driver will fetch them via http. | ||
- If the result is not big (less than `spark.akka.frameSize = 10MB`), then it will be directly sent to driver. | ||
|
||
**Some more details about `blockManager`:** | ||
|
||
When `directResult > akka.frameSize`, the `memoryStore` of `BlockManager` creates a `LinkedHashMap` to hold the data stored in memory whose size should be less than `Runtime.getRuntime.maxMemory * spark.storage.memoryFraction(default 0.6)`. If `LinkedHashMap` has no space to save the incoming data, these data will be sent to `diskStore` which persists data to hard disk if the data `storageLevel` contains "disk" | ||
|
||
```scala | ||
In TaskRunner.run() | ||
// deserialize task, run it and then send the result to | ||
=> coarseGrainedExecutorBackend.statusUpdate() | ||
=> task = ser.deserialize(serializedTask) | ||
=> value = task.run(taskId) | ||
=> directResult = new DirectTaskResult(ser.serialize(value)) | ||
=> if( directResult.size() > akkaFrameSize() ) | ||
indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER) | ||
else | ||
return directResult | ||
=> coarseGrainedExecutorBackend.statusUpdate(result) | ||
=> driver ! StatusUpdate(executorId, taskId, result) | ||
``` | ||
The result produced by `ShuffleMapTask` and `ResultTask` are different. | ||
|
||
- `ShuffleMapTask` produces `MapStatus` containing 2 things: | ||
- the `BlockManagerId` of the task's `BlockManager`: (executorId + host, port, nettyPort) | ||
- the size of each output `FileSegment` of a task | ||
|
||
- `ResultTask` produces the execution result of the specified `function` on one partition | ||
e.g. the `function` of `count()` is simply for counting the number of records in partition. Since `ShuffleMapTask` needs `FileSegment` for writing on disk, `OutputStream` writers are needed. These writers are produced and managed by `blockManger` of `shuffleBlockManager` | ||
|
||
```scala | ||
In task.run(taskId) | ||
// if the task is ShuffleMapTask | ||
=> shuffleMapTask.runTask(context) | ||
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits) | ||
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context)) | ||
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)]) | ||
|
||
//If the task is ResultTask | ||
=> return func(context, rdd.iterator(split, context)) | ||
``` | ||
|
||
A series of operations will be done after driver gets a task's result: | ||
|
||
`TaskScheduler` will be notified that the task is finished, and its result will be analyzed. | ||
- If it's a `indirectResult`, `BlockManager.getRemotedBytes()` will be invoked to fetch actual results. | ||
- If it is `ResultTask`, `ResultHandler()` invokes driver side computation on result (e.g. `count()` take `sum` operation on all ResultTask) | ||
- If it is `MapStatus` of `ShuffleMapTask`, then `MapStatus` will be put into `mapStatuses` of `mapOutputTrackerMaster`, which makes it more easy to be requested during reduce shuffle | ||
- If the received task on driver is the last task in the stage, then next stage will be submitted. If the stage is already the last one, `dagScheduler` will be told that the job is finished | ||
|
||
```scala | ||
After driver receives StatusUpdate(result) | ||
=> taskScheduler.statusUpdate(taskId, state, result.value) | ||
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result) | ||
=> if result is IndirectResult | ||
serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId) | ||
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result) | ||
=> taskSetManager.handleSuccessfulTask(tid, taskResult) | ||
=> dagScheduler.taskEnded(result.value, result.accumUpdates) | ||
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates) | ||
=> dagScheduler.handleTaskCompletion(completion) | ||
=> Accumulators.add(event.accumUpdates) | ||
|
||
// If the finished task is ResultTask | ||
=> if (job.numFinished == job.numPartitions) | ||
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) | ||
=> job.listener.taskSucceeded(outputId, result) | ||
=> jobWaiter.taskSucceeded(index, result) | ||
=> resultHandler(index, result) | ||
|
||
// if the finished task is ShuffleMapTask | ||
=> stage.addOutputLoc(smt.partitionId, status) | ||
=> if (all tasks in current stage have finished) | ||
mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus]) | ||
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses) | ||
=> submitStage(stage) | ||
``` | ||
|
||
## Shuffle read | ||
|
||
In the last paragraph, we talked about task execution and result processing, now we will talk about how reducer (tasks needs shuffle) gets the input data. The shuffle read part in last chapter has already talked about how reducer processes input data. | ||
|
||
**How does reducer know where to fetch data ?** | ||
|
||
![readMapStatus](../PNGfigures/readMapStatus.png) | ||
|
||
Reducer needs to know on which node the `FileSegments` produced by `ShuffleMapTask` of parent stage are. **This kind of information is sent to driver’s `mapOutputTrackerMaster` when `ShuffleMapTask` is finished. The information is also stored in `mapStatuses: HashMap[stageId, Array[MapStatus]]`**. Given `stageId`, we can get`Array[MapStatus]` which contains information about `FileSegments` produced by `ShuffleMapTasks`. `Array(taskId)` tells the location(`blockManagerId`) and the size of each `FileSegment`. | ||
|
||
When reducer need fetch input data, it will first invoke `blockStoreShuffleFetcher` to get input data’s location (`FileSegments`). `blockStoreShuffleFetcher` calls local `MapOutputTrackerWorker` to do the work. `MapOutputTrackerWorker` uses `mapOutputTrackerMasterActorRef` to communicate with `mapOutputTrackerMasterActor` in order to get `MapStatus`. `blockStoreShuffleFetcher` processes `MapStatus` and finds out where reducer should fetch `FileSegment` information, and then it stores this information in `blocksByAddress`. `blockStoreShuffleFetcher` tells `basicBlockFetcherIterator` to fetch `FileSegment` data. | ||
|
||
```scala | ||
rdd.iterator() | ||
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute() | ||
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser) | ||
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer) | ||
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId) | ||
|
||
=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses) | ||
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer) | ||
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock) | ||
``` | ||
|
||
![blocksByAddress](../PNGfigures/blocksByAddress.png) | ||
|
||
After `basicBlockFetcherIterator` received the task for data retrieving, it produces several `fetchRequest`s. **Each of them contains the tasks to fetch `FileSegment`s from several nodes. **According to the picture above, we know that `reducer-2` needs to fetch `FileSegment`(FS)(in white) from 3 worker nodes. The global data fetching task can be represented by `blockByAddress`: 4 blocks from node 1, 3 blocks from node 2, and 4 blocks from node 3 | ||
|
||
In order to accelerate data fetching process, it makes sense to divide the global tasks into sub tasks(`fetchRequest`), then every task takes a thread to fetch data. Spark launches 5 parallel threads for each reducer (same as Hadoop). Since the fetched data will be buffered into memory, one fetch is not able to take too much data (no more than `spark.reducer.maxMbInFlight=48MB`). **Note that `48MB` is shared by the 5 fetch threads,** so each sub task should take no more than `48MB / 5 = 9.6MB`. In the picture, on node 1, `size(FS0-2) + size(FS1-2) < 9.6MB, but size(FS0-2) + size(FS1-2) + size(FS2-2) > 9.6MB`, then we should break between `t1-r2` and `t2-r2`. As a result, we can see 2 `fetchRequest`s fetch data from node 1. **Will there be `fetchRequest` larger than 9.6MB?** The answer is yes. If one `FileSegment` is too large, it still needs to be fetched in one shot. Besides, if reducer needs some `FileSegment`s already exist on the local, it will do local read. At the end of shuffle read, it will deserialize fetched `FileSegment` and offer record iterators to `RDD.compute()` | ||
|
||
```scala | ||
In basicBlockFetcherIterator: | ||
|
||
// generate the fetch requests | ||
=> basicBlockFetcherIterator.initialize() | ||
=> remoteRequests = splitLocalRemoteBlocks() | ||
=> fetchRequests ++= Utils.randomize(remoteRequests) | ||
|
||
// fetch remote blocks | ||
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight | ||
=> blockManager.connectionManager.sendMessageReliably(cmId, | ||
blockMessageArray.toBufferMessage) | ||
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId))) | ||
=> dataDeserialize(blockId, blockMessage.getData, serializer) | ||
|
||
// fetch local blocks | ||
=> getLocalBlocks() | ||
=> fetchResults.put(new FetchResult(id, 0, () => iter)) | ||
``` | ||
|
||
Some more details: | ||
|
||
**How does the reducer send `fetchRequest` to the target node? How does the target node process `fetchRequest`, read and send back `FileSegment` to reducer?** | ||
|
||
![fetchrequest](../PNGfigures/fetchrequest.png) | ||
|
||
When `RDD.iterator()` meets `ShuffleDependency`, `BasicBlockFetcherIterator` will be called to fetch `FileSegment`s. `BasicBlockFetcherIterator` uses `connectionManager` of `blockManger` to send `fetchRequest` to `connectionManager`s on the other nodes. NIO is used for communication between `connectionManager`s. On the other nodes, for example, after `connectionManager` on worker node 2 receives a message, it will forward to `blockManager`. The latter uses `diskStore` to read `FileSegment`s requested by `fetchRequest` locally, they will still be sent back by `connectionManager`. If `FileConsolidation` is activated, `diskStore` needs the location of `blockId` given by `shuffleBolockManager`. If `FileSegment` is no more than `spark.storage.memoryMapThreshold = 8KB`, then diskStore will put `FileSegment` into memory when reading it, otherwise, The memory mapping method in `FileChannel` of `RandomAccessFile` will be used to read `FileSegment`, thus large `FileSegment` can be loaded into memory. | ||
|
||
When `BasicBlockFetcherIterator` receives serialized `FileSegments` from the other nodes, it will deserialize and put them in `fetchResults.Queue`. You may notice that **`fetchResults.Queue` is similar to `softBuffer` in `Shuffle detials` chapter.** If the `FileSegment`s needed by `BasicBlockFetcherIterator` are local, they will be found locally by `diskStore`, and put in `fetchResults`, finally, reducer reads the records from `FileSegment` and processes them. | ||
|
||
```scala | ||
After the blockManager receives the fetch request | ||
|
||
=> connectionManager.receiveMessage(bufferMessage) | ||
=> handleMessage(connectionManagerId, message, connection) | ||
|
||
// invoke blockManagerWorker to read the block (FileSegment) | ||
=> blockManagerWorker.onBlockMessageReceive() | ||
=> blockManagerWorker.processBlockMessage(blockMessage) | ||
=> buffer = blockManager.getLocalBytes(blockId) | ||
=> buffer = diskStore.getBytes(blockId) | ||
=> fileSegment = diskManager.getBlockLocation(blockId) | ||
=> shuffleManager.getBlockLocation() | ||
=> if(fileSegment < minMemoryMapBytes) | ||
buffer = ByteBuffer.allocate(fileSegment) | ||
else | ||
channel.map(MapMode.READ_ONLY, segment.offset, segment.length) | ||
``` | ||
|
||
Every reducer has a `BasicBlockFetcherIterator`, and one `BasicBlockFetcherIterator` could, in theory, hold 48MB `fetchResults`. As soon as one `FileSegment` in `fetchResults` is read off, some `FileSegment`s will be fetched to fill the 48MB. | ||
|
||
```scala | ||
BasicBlockFetcherIterator.next() | ||
=> result = results.task() | ||
=> while (!fetchRequests.isEmpty && | ||
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { | ||
sendRequest(fetchRequests.dequeue()) | ||
} | ||
=> result.deserialize() | ||
``` | ||
|
||
## Discussion | ||
|
||
In terms of architecture design, functionalities and modules are pretty independent. `BlockManager` is well designed, but it seems to manage too many things (data block, memory, disk, network communication) | ||
|
||
This chapter discussed about how the modules of spark system are coordinated to finish a job (production, submission, execution, results collection, results computation and shuffle). A lot of code is pasted, many pictures are drawn. More details can be found in source code, if you want. | ||
|
||
If you also want to know more about `blockManager`, please refer to Jerry Shao's [blog](http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/) (in Chinese). |