Hadoop 流

2018-01-07 15:06 更新

Hadoop流是Hadoop發(fā)行版附帶的一個實用程序。此實用程序允許您使用任何可執(zhí)行文件或腳本作為映射程序和/或reducer創(chuàng)建和運行Map / Reduce作業(yè)。

使用Python的示例

對于Hadoop流,我們正在考慮字?jǐn)?shù)問題。 Hadoop中的任何作業(yè)必須有兩個階段:mapper和reducer。我們已經(jīng)為python腳本中的mapper和reducer編寫了代碼,以便在Hadoop下運行它。也可以在Perl和Ruby中寫同樣的內(nèi)容。

映射器階段代碼

!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin: 
# Remove whitespace either side myline = myline.strip() 
# Break the line into words words = myline.split() 
# Iterate the words list for myword in words: 
# Write the results to standard output print '%s	%s' % (myword, 1)

確保此文件具有執(zhí)行權(quán)限(chmod + x /home/expert /hadoop-1.2.1 / mapper.py)。

減速器階段代碼

#!/usr/bin/python
from operator import itemgetter 
import sys 
current_word = ""
current_count = 0 
word = "" 
# Input takes from standard input for myline in sys.stdin: 
# Remove whitespace either side myline = myline.strip() 
# Split the input we got from mapper.py word, count = myline.split('	', 1) 
# Convert count variable to integer 
   try: 
      count = int(count) 
except ValueError: 
   # Count was not a number, so silently ignore this line continue
if current_word == word: 
   current_count += count 
else: 
   if current_word: 
      # Write result to standard output print '%s	%s' % (current_word, current_count) 
   current_count = count
   current_word = word
# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s	%s' % (current_word, current_count)

將mapper和reducer代碼保存在Hadoop主目錄中的mapper.py和reducer.py中。確保這些文件具有執(zhí)行權(quán)限(chmod + x mapper.py和chmod + x reducer.py)。因為python是縮進敏感所以相同的代碼可以從下面的鏈接下載。

執(zhí)行WordCount程序

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar 
   -input input_dirs  
   -output output_dir  
   -mapper <path/mapper.py  
   -reducer <path/reducer.py

其中“\”用于行連續(xù)以便清楚可讀性。

例如:

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

流如何工作

在上面的示例中,mapper和reducer都是從標(biāo)準(zhǔn)輸入讀取輸入并將輸出發(fā)送到標(biāo)準(zhǔn)輸出的python腳本。該實用程序?qū)?chuàng)建一個Map / Reduce作業(yè),將作業(yè)提交到適當(dāng)?shù)娜杭?,并監(jiān)視作業(yè)的進度,直到作業(yè)完成。

當(dāng)為映射器指定腳本時,每個映射器任務(wù)將在映射器初始化時作為單獨的進程啟動腳本。當(dāng)映射器任務(wù)運行時,它將其輸入轉(zhuǎn)換為行,并將這些行饋送到進程的標(biāo)準(zhǔn)輸入(STDIN)。同時,映射器從進程的標(biāo)準(zhǔn)輸出(STDOUT)收集面向行的輸出,并將每行轉(zhuǎn)換為鍵/值對,作為映射器的輸出收集。默認(rèn)情況下,直到第一個制表符字符的行的前綴是鍵,行的其余部分(不包括制表符字符)將是值。如果行中沒有制表符,則整個行被視為鍵,值為null。但是,這可以根據(jù)一個需要定制。

當(dāng)為reducer指定腳本時,每個reducer任務(wù)將作為單獨的進程啟動腳本,然后初始化reducer。當(dāng)reducer任務(wù)運行時,它將其輸入鍵/值對轉(zhuǎn)換為行,并將行饋送到進程的標(biāo)準(zhǔn)輸入(STDIN)。同時,reducer從進程的標(biāo)準(zhǔn)輸出(STDOUT)收集面向行的輸出,將每行轉(zhuǎn)換為鍵/值對,將其作為reducer的輸出進行收集。默認(rèn)情況下,直到第一個制表符字符的行的前綴是鍵,行的其余部分(不包括制表符字符)是值。但是,這可以根據(jù)特定要求進行定制。

重要命令

參數(shù)描述
-input directory/file-name輸入mapper的位置。(需要)
-output directory-name減速器的輸出位置。(需要)
-mapper executable or script or JavaClassNameMapper可執(zhí)行文件。(需要)
-reducer executable or script or JavaClassNameReducer可執(zhí)行文件。(需要)
-file file-name使mapper,reducer或combiner可執(zhí)行文件在計算節(jié)點本地可用。
-inputformat JavaClassName你提供的類應(yīng)該返回Text類的鍵/值對。如果未指定,則使用TextInputFormat作為默認(rèn)值。
-outputformat JavaClassName您提供的類應(yīng)該采用Text類的鍵/值對。如果未指定,則使用TextOutputformat作為默認(rèn)值。
-partitioner JavaClassName確定將鍵發(fā)送到哪個reduce的類。
-combiner streamingCommand or JavaClassName組合器可執(zhí)行映射輸出。
-cmdenv name=value將環(huán)境變量傳遞到流式命令。
-inputreader對于向后兼容性:指定記錄讀取器類(而不是輸入格式類)。
-verbose詳細(xì)輸出。
-lazyOutput創(chuàng)建輸出延遲。例如,如果輸出格式基于FileOutputFormat,則輸出文件僅在首次調(diào)用output.collect(或Context.write)時創(chuàng)建。
-numReduceTasks指定Reducer的數(shù)量。
-mapdebug映射任務(wù)失敗時調(diào)用的腳本。
-reducedebug當(dāng)reduce任務(wù)失敗時調(diào)用的腳本。
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號