全國(guó)咨詢(xún)/投訴熱線(xiàn):400-618-4000

首頁(yè)技術(shù)文章正文

Spark Shuffle的Write階段和Read階段

更新時(shí)間:2023-08-23 來(lái)源:黑馬程序員 瀏覽量:

Spark在DAG調(diào)度階段會(huì)將一個(gè)Job劃分為多個(gè)Stage,上游Stage做map工作,下游Stage做reduce工作,其本質(zhì)上還是MapReduce計(jì)算框架。Shuffle是連接map和reduce之間的橋梁,它將map的輸出對(duì)應(yīng)到reduce輸入中,涉及到序列化反序列化、跨節(jié)點(diǎn)網(wǎng)絡(luò)IO以及磁盤(pán)讀寫(xiě)IO等。

shuffle.png

Spark的Shuffle分為Write和Read兩個(gè)階段,分屬于兩個(gè)不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

執(zhí)行Shuffle的主體是Stage中的并發(fā)任務(wù),這些任務(wù)分ShuffleMapTask和ResultTask兩種,ShuffleMapTask要進(jìn)行Shuffle,ResultTask負(fù)責(zé)返回計(jì)算結(jié)果,一個(gè)Job中只有最后的Stage采用ResultTask,其他的均為ShuffleMapTask。如果要按照map端和reduce端來(lái)分析的話(huà),ShuffleMapTask可以即是map端任務(wù),又是reduce端任務(wù),因?yàn)镾park中的Shuffle是可以串行的;ResultTask則只能充當(dāng)reduce端任務(wù)的角色。

stage

Spark在1.1以前的版本一直是采用Hash Shuffle的實(shí)現(xiàn)的方式,到1.1版本時(shí)參考Hadoop MapReduce的實(shí)現(xiàn)開(kāi)始引入Sort Shuffle,在1.5版本時(shí)開(kāi)始Tungsten鎢絲計(jì)劃,引入U(xiǎn)nSafe Shuffle優(yōu)化內(nèi)存及CPU的使用,在1.6中將Tungsten統(tǒng)一到Sort Shuffle中,實(shí)現(xiàn)自我感知選擇最佳Shuffle方式,到的2.0版本,Hash Shuffle已被刪除,所有Shuffle方式全部統(tǒng)一到Sort Shuffle一個(gè)實(shí)現(xiàn)中。

1692771388826_2.png

在Spark的中,負(fù)責(zé)shuffle過(guò)程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發(fā)展有兩種實(shí)現(xiàn)的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。

在Spark 1.2以前,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個(gè)非常嚴(yán)重的弊端,就是會(huì)產(chǎn)生大量的中間磁盤(pán)文件,進(jìn)而由大量的磁盤(pán)IO操作影響了性能。

因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較于HashShuffleManager來(lái)說(shuō),有了一定的改進(jìn)。主要就在于,每個(gè)Task在進(jìn)行shuffle操作時(shí),雖然也會(huì)產(chǎn)生較多的臨時(shí)磁盤(pán)文件,但

是最后會(huì)將所有的臨時(shí)文件合并(merge)成一個(gè)磁盤(pán)文件,因此每個(gè)Task就只有一個(gè)磁盤(pán)文件。在下一個(gè)stage的shuffle read task拉取自己的數(shù)據(jù)時(shí),只要根據(jù)索引讀取每個(gè)磁盤(pán)文件中的部分?jǐn)?shù)據(jù)即可。

Shuffle前十今生

Shuffle階段劃分:

shuffle write:mapper階段,上一個(gè)stage得到最后的結(jié)果寫(xiě)出

shuffle read :reduce階段,下一個(gè)stage拉取上一個(gè)stage進(jìn)行合并

1)未經(jīng)優(yōu)化的hashShuffleManager:

HashShuffle是根據(jù)task的計(jì)算結(jié)果的key值的hashcode%ReduceTask來(lái)決定放入哪一個(gè)區(qū)分,這樣保證相同的數(shù)據(jù)一定放入一個(gè)分區(qū),Hash Shuffle過(guò)程如下:

Shuffle階段劃分

根據(jù)下游的task決定生成幾個(gè)文件,先生成緩沖區(qū)文件在寫(xiě)入磁盤(pán)文件,再將block文件進(jìn)行合并。

未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤(pán)文件的數(shù)量是極其驚人的。提出如下解決方案

2)經(jīng)過(guò)優(yōu)化的hashShuffleManager:

在shuffle write過(guò)程中,task就不是為下游stage的每個(gè)task創(chuàng)建一個(gè)磁盤(pán)文件了。此時(shí)會(huì)出現(xiàn)shuffleFileGroup的概念,每個(gè)shuffleFileGroup會(huì)對(duì)應(yīng)一批磁盤(pán)文件,每一個(gè)Group磁盤(pán)文件的數(shù)量與下游stage的task數(shù)量是相同的。


分享到:
在線(xiàn)咨詢(xún) 我要報(bào)名
和我們?cè)诰€(xiàn)交談!