對(duì)Map的結(jié)果進(jìn)行排序并傳輸?shù)絉educe進(jìn)行處理 Map的結(jié)果并不是直接存放到硬盤,而是利用緩存做一些預(yù)排序處理 Map會(huì)調(diào)用Combiner,壓縮,按key進(jìn)行分區(qū)、排序等,盡量減少結(jié)果的大小 每個(gè)Map完成后都會(huì)通知Task,然后Reduce就可以進(jìn)行處理
當(dāng)Map程序開始產(chǎn)生結(jié)果的時(shí)候,并不是直接寫到文件的,而是利用緩存做一些排序方面的預(yù)處理操作
每個(gè)Map任務(wù)都有一個(gè)循環(huán)內(nèi)存緩沖區(qū)(默認(rèn)100MB),當(dāng)緩存的內(nèi)容達(dá)到80%時(shí),后臺(tái)線程開始將內(nèi)容寫到文件,此時(shí)Map任務(wù)可以繼續(xù)輸出結(jié)果,但如果緩沖區(qū)滿了,Map任務(wù)則需要等待
寫文件使用round-robin方式。在寫入文件之前,先將數(shù)據(jù)按照Reduce進(jìn)行分區(qū)。對(duì)于每一個(gè)分區(qū),都會(huì)在內(nèi)存中根據(jù)key進(jìn)行排序,如果配置了Combiner,則排序后執(zhí)行Combiner(Combine之后可以減少寫入文件和傳輸?shù)臄?shù)據(jù))
每次結(jié)果達(dá)到緩沖區(qū)的閥值時(shí),都會(huì)創(chuàng)建一個(gè)文件,在Map結(jié)束時(shí),可能會(huì)產(chǎn)生大量的文件。在Map完成前,會(huì)將這些文件進(jìn)行合并和排序。如果文件的數(shù)量超過(guò)3個(gè),則合并后會(huì)再次運(yùn)行Combiner(1、2個(gè)文件就沒(méi)有必要了)
如果配置了壓縮,則最終寫入的文件會(huì)先進(jìn)行壓縮,這樣可以減少寫入和傳輸?shù)臄?shù)據(jù)
一旦Map完成,則通知任務(wù)管理器,此時(shí)Reduce就可以開始復(fù)制結(jié)果數(shù)據(jù)
Map的結(jié)果文件都存放到運(yùn)行Map任務(wù)的機(jī)器的本地硬盤中
如果Map的結(jié)果很少,則直接放到內(nèi)存,否則寫入文件中
同時(shí)后臺(tái)線程將這些文件進(jìn)行合并和排序到一個(gè)更大的文件中(如果文件是壓縮的,則需要先解壓)
當(dāng)所有的Map結(jié)果都被復(fù)制和合并后,就會(huì)調(diào)用Reduce方法
Reduce結(jié)果會(huì)寫入到HDFS中
一般的原則是給shuffle分配盡可能多的內(nèi)存,但前提是要保證Map、Reduce任務(wù)有足夠的內(nèi)存
對(duì)于Map,主要就是避免把文件寫入磁盤,例如使用Combiner,增大io.sort.mb的值
對(duì)于Reduce,主要是把Map的結(jié)果盡可能地保存到內(nèi)存中,同樣也是要避免把中間結(jié)果寫入磁盤。默認(rèn)情況下,所有的內(nèi)存都是分配給Reduce方法的,如果Reduce方法不怎么消耗內(nèi)存,可以mapred.inmem.merge.threshold設(shè)成0,mapred.job.reduce.input.buffer.percent設(shè)成1.0
在任務(wù)監(jiān)控中可通過(guò)Spilled records counter來(lái)監(jiān)控寫入磁盤的數(shù),但這個(gè)值是包括map和reduce的
對(duì)于IO方面,可以Map的結(jié)果可以使用壓縮,同時(shí)增大buffer size(io.file.buffer.size,默認(rèn)4kb)
屬性 | 默認(rèn)值 | 描述 |
---|---|---|
io.sort.mb | 100 | 映射輸出分類時(shí)所使用緩沖區(qū)的大小. |
io.sort.record.percent | 0.05 | 剩余空間用于映射輸出自身記錄.在1.X發(fā)布后去除此屬性.隨機(jī)代碼用于使用映射所有內(nèi)存并記錄信息. |
io.sort.spill.percent | 0.80 | 針對(duì)映射輸出內(nèi)存緩沖和記錄索引的閾值使用比例. |
io.sort.factor | 10 | 文件分類時(shí)合并流的最大數(shù)量。此屬性也用于reduce。通常把數(shù)字設(shè)為100. |
min.num.spills.for.combine | 3 | 組合運(yùn)行所需最小溢出文件數(shù)目. |
mapred.compress.map.output | false | 壓縮映射輸出. |
mapred.map.output.compression.codec | DefaultCodec | 映射輸出所需的壓縮解編碼器. |
mapred.reduce.parallel.copies | 5 | 用于向reducer傳送映射輸出的線程數(shù)目. |
mapred.reduce.copy.backoff | 300 | 時(shí)間的最大數(shù)量,以秒為單位,這段時(shí)間內(nèi)若reducer失敗則會(huì)反復(fù)嘗試傳輸 |
io.sort.factor | 10 | 組合運(yùn)行所需最大溢出文件數(shù)目. |
mapred.job.shuffle.input.buffer.percent | 0.70 | 隨機(jī)復(fù)制階段映射輸出緩沖器的堆棧大小比例 |
mapred.job.shuffle.merge.percent | 0.66 | 用于啟動(dòng)合并輸出進(jìn)程和磁盤傳輸?shù)挠成漭敵鼍彌_器的閥值使用比例 |
mapred.inmem.merge.threshold | 1000 | 用于啟動(dòng)合并輸出和磁盤傳輸進(jìn)程的映射輸出的閥值數(shù)目。小于等于0意味著沒(méi)有門檻,而溢出行為由 mapred.job.shuffle.merge.percent單獨(dú)管理. |
mapred.job.reduce.input.buffer.percent | 0.0 | 用于減少內(nèi)存映射輸出的堆棧大小比例,內(nèi)存中映射大小不得超出此值。若reducer需要較少內(nèi)存則可以提高該值. |
更多建議: