Hadoop MapReduce

2021-06-30 16:43 更新

MapReduce是一個(gè)框架,我們可以使用它來編寫應(yīng)用程序,以可靠的方式并行地處理大量商品硬件群集上的大量數(shù)據(jù)。

什么是MapReduce?

MapReduce是一種基于java的分布式計(jì)算的處理技術(shù)和程序模型。 MapReduce算法包含兩個(gè)重要任務(wù),即Map和Reduce。Map采用一組數(shù)據(jù)并將其轉(zhuǎn)換為另一組數(shù)據(jù),其中各個(gè)元素被分解為元組(鍵/值對)。其次,reduce任務(wù),它將map的輸出作為輸入,并將這些數(shù)據(jù)元組合并成一組較小的元組。作為MapReduce名稱的順序,reduce任務(wù)總是在map作業(yè)之后執(zhí)行。

MapReduce的主要優(yōu)點(diǎn)是易于在多個(gè)計(jì)算節(jié)點(diǎn)上擴(kuò)展數(shù)據(jù)處理。在MapReduce模型下,數(shù)據(jù)處理原語稱為映射器和縮減器。將數(shù)據(jù)處理應(yīng)用程序分解為映射器和簡化器有時(shí)并不重要。但是,一旦我們以MapReduce形式編寫應(yīng)用程序,擴(kuò)展應(yīng)用程序以在集群中運(yùn)行數(shù)百,數(shù)千甚至數(shù)萬臺機(jī)器只是一種配置更改。這種簡單的可擴(kuò)展性是吸引許多程序員使用MapReduce模型的原因。

算法

  • 通常MapReduce范例是基于將計(jì)算機(jī)發(fā)送到數(shù)據(jù)所在的位置!

  • MapReduce程序在三個(gè)階段執(zhí)行,即map階段,shuffle階段和reduce階段。

    • Map 階段映射或映射器的作業(yè)是處理輸入數(shù)據(jù)。一般來說,輸入數(shù)據(jù)是以文件或目錄的形式存儲在Hadoop文件系統(tǒng)(HDFS)中。輸入文件逐行傳遞到映射器函數(shù)。映射器處理數(shù)據(jù)并創(chuàng)建幾個(gè)小塊的數(shù)據(jù)。

    • Reduce 階段這個(gè)階段是Shuffle階段和Reduce階段的組合。 Reducer的工作是處理來自映射器的數(shù)據(jù)。處理后,它產(chǎn)生一組新的輸出,將存儲在HDFS中。

  • 在MapReduce作業(yè)期間,Hadoop將Map和Reduce任務(wù)發(fā)送到集群中的相應(yīng)服務(wù)器。

  • 該框架管理數(shù)據(jù)傳遞的所有細(xì)節(jié),例如發(fā)出任務(wù),驗(yàn)證任務(wù)完成,以及在節(jié)點(diǎn)之間復(fù)制集群周圍的數(shù)據(jù)。

  • 大多數(shù)計(jì)算發(fā)生在節(jié)點(diǎn)上,本地磁盤上的數(shù)據(jù)減少了網(wǎng)絡(luò)流量。

  • 完成給定任務(wù)后,集群收集并減少數(shù)據(jù)以形成適當(dāng)?shù)慕Y(jié)果,并將其發(fā)送回Hadoop服務(wù)器。

MapReduce的算法

輸入和輸出(Java透視圖)

MapReduce框架對<key,value>對進(jìn)行操作,也就是說,框架將作業(yè)的輸入視為一組<key,value>對,并生成一組<key,value>對作為作業(yè)輸出,可能是不同類型。

鍵和值類應(yīng)該由框架以序列化的方式,因此,需要實(shí)現(xiàn)Writable接口。此外,鍵類必須實(shí)現(xiàn)Writable-Comparable接口,以方便框架進(jìn)行排序。MapReduce作業(yè)的輸入和輸出類型:(輸入)<k1,v1> - > map - > <k2,v2> - > reduce - > <k3,v3>(輸出)。

 輸入輸出
Map<k1, v1>list (<k2, v2>)
Reduce<k2, list(v2)>list (<k3, v3>)

術(shù)語

  • PayLoad - 應(yīng)用程序?qū)崿F(xiàn)Map和Reduce功能,并形成作業(yè)的核心。

  • Mapper- 映射器將輸入鍵/值對映射到一組中間鍵/值對。

  • NamedNode - 管理Hadoop分布式文件系統(tǒng)(HDFS)的節(jié)點(diǎn)。

  • DataNode - 在任何處理發(fā)生之前提前呈現(xiàn)數(shù)據(jù)的節(jié)點(diǎn)。

  • MasterNode - JobTracker運(yùn)行并接受來自客戶端的作業(yè)請求的節(jié)??點(diǎn)。

  • SlaveNode - Map和Reduce程序運(yùn)行的節(jié)點(diǎn)。

  • JobTracker - 計(jì)劃作業(yè)并跟蹤將作業(yè)分配給任務(wù)跟蹤器。

  • Task Tracker - 跟蹤任務(wù)并向JobTracker報(bào)告狀態(tài)。

  • Job - 程序是跨數(shù)據(jù)集的Mapper和Reducer的執(zhí)行

  • Task - 在一個(gè)數(shù)據(jù)片段上執(zhí)行Mapper或Reducer。

  • Task Attempt - 嘗試在SlaveNode上執(zhí)行任務(wù)的特定實(shí)例。

示例場景

下面給出了關(guān)于組織的電力消耗的數(shù)據(jù)。它包含每月的電力消耗和各年的年平均值。

 一月二月三月四月五月六月七月八月九月十月十一月十二月平均
1979年2323243242526262626252625
1980年26272828283031313130303029
1981年31323232333435363634343434
1984年39383939394142434039383840
1985年38393939394141410040393945

如果上述數(shù)據(jù)作為輸入,我們必須編寫應(yīng)用程序來處理它,并產(chǎn)生結(jié)果,如找到最大使用年份,最小使用年份等。這是一個(gè)對于有限數(shù)量的記錄的程序員的walkover。它們將簡單地寫入邏輯以產(chǎn)生所需的輸出,并將數(shù)據(jù)傳遞給所寫的應(yīng)用程序。

但是,考慮一個(gè)特定國家的所有大型產(chǎn)業(yè)的電力消耗的數(shù)據(jù),因?yàn)樗男纬伞?/span>

當(dāng)我們編寫應(yīng)用程序來處理這樣的批量數(shù)據(jù)時(shí),

  • 他們將需要很多時(shí)間來執(zhí)行。
  • 當(dāng)我們將數(shù)據(jù)從源服務(wù)器移動到網(wǎng)絡(luò)服務(wù)器時(shí),會有很大的網(wǎng)絡(luò)流量,等等。

為了解決這些問題,我們有MapReduce框架。

輸入數(shù)據(jù)

上述數(shù)據(jù)保存為sample.txt并作為輸入。輸入文件如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

示例程序

下面給出了程序?qū)κ褂肕apReduce框架的示例數(shù)據(jù)。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"	"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            } 
            
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg=30; 
            int val=Integer.MIN_VALUE; 
            
            while (values.hasNext()) 
            { 
               if((val=values.next().get())>maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 
 
         } 
   }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

將上述程序保存為ProcessUnits.java。程序的編譯和執(zhí)行說明如下。

過程單元程序的編譯和執(zhí)行

讓我們假設(shè)在Hadoop用戶的主目錄(例如/home/hadoop)。。

按照以下步驟編譯并執(zhí)行上述程序。

第1步

以下命令是創(chuàng)建一個(gè)目錄來存儲編譯的java類。

$ mkdir units 

第2步

下載Hadoop-core-1.2.1.jar,用于編譯和執(zhí)行MapReduce程序。訪問以下鏈接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下載jar。讓我們假設(shè)下載的文件夾是/ home / hadoop /。

第3步

以下命令用于編譯ProcessUnits.java程序并為該程序創(chuàng)建一個(gè)jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

第4步

以下命令用于在HDFS中創(chuàng)建輸入目錄。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

第5步

下命令用于復(fù)制名為sample.txt的輸入文件,在HDFS的輸入目錄中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

第6步

以下命令用于驗(yàn)證輸入目錄中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

第7步

以下命令用于通過從輸入目錄獲取輸入文件來運(yùn)行Eleunit_max應(yīng)用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

等待一段時(shí)間,直到文件被執(zhí)行。執(zhí)行后,如下所示,輸出將包含輸入拆分的數(shù)量,Map任務(wù)的數(shù)量,reducer任務(wù)的數(shù)量等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
File System Counters 
 
FILE: Number of bytes read=61 
FILE: Number of bytes written=279400 
FILE: Number of read operations=0 
FILE: Number of large read operations=0   
FILE: Number of write operations=0 
HDFS: Number of bytes read=546 
HDFS: Number of bytes written=40 
HDFS: Number of read operations=9 
HDFS: Number of large read operations=0 
HDFS: Number of write operations=2 Job Counters 


   Launched map tasks=2  
   Launched reduce tasks=1 
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137 
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613 
   Total time spent by all reduce tasks (ms)=44120 
   Total vcore-seconds taken by all map tasks=146137 
   
   Total vcore-seconds taken by all reduce tasks=44120 
   Total megabyte-seconds taken by all map tasks=149644288 
   Total megabyte-seconds taken by all reduce tasks=45178880 
   
Map-Reduce Framework 
 
Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208 
   Combine input records=5  
   Combine output records=5 
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224
     
File Output Format Counters 
 
   Bytes Written=40 

第8步

以下命令用于驗(yàn)證輸出文件夾中的結(jié)果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

第9步

以下命令用于查看Part-00000文件中的輸出。此文件由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

下面是MapReduce程序生成的輸出。

1981    34 
1984    40 
1985    45 

第10步

以下命令用于將輸出文件夾從HDFS復(fù)制到本地文件系統(tǒng)進(jìn)行分析

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有Hadoop命令都由$ HADOOP_HOME / bin / hadoop命令調(diào)用。運(yùn)行不帶任何參數(shù)的Hadoop腳本會打印所有命令的描述。

用法hadoop [--config confdir] COMMAND

下表列出了可用的選項(xiàng)及其說明。

選項(xiàng)描述
namenode -format格式化DFS文件系統(tǒng)。
secondarynamenode運(yùn)行DFS二次名稱節(jié)點(diǎn)。
namenode運(yùn)行DFS名稱節(jié)點(diǎn)。
datanode運(yùn)行DFS數(shù)據(jù)節(jié)點(diǎn)。
dfsadmin運(yùn)行DFS管理客戶端。
mradmin運(yùn)行Map-Reduce管理客戶端。
fsck運(yùn)行DFS文件系統(tǒng)檢查實(shí)用程序。
fs運(yùn)行通用文件系統(tǒng)用戶客戶端。
balancer運(yùn)行集群平衡實(shí)用程序。
oiv將離線fsimage查看器應(yīng)用于fsimage。
fetchdt從NameNode獲取委派令牌。
jobtracker運(yùn)行MapReduce作業(yè)跟蹤節(jié)點(diǎn)。
pipes運(yùn)行管道作業(yè)。
tasktracker運(yùn)行MapReduce任務(wù)跟蹤節(jié)點(diǎn)。
historyserver作為獨(dú)立的守護(hù)程序運(yùn)行作業(yè)歷史記錄服務(wù)器。
job操作MapReduce作業(yè)。
queue獲取有關(guān)JobQueues的信息。
version打印版本。
jar <jar>運(yùn)行jar文件。
distcp <srcurl> <desturl>遞歸復(fù)制文件或目錄。
distcp2 <srcurl> <desturl>DistCp版本2。
archive -archiveName NAME -p創(chuàng)建hadoop歸檔。
<parent path> <src>* <dest> 
classpath打印獲取Hadoop jar所需的類路徑和所需的庫。
daemonlog獲取/設(shè)置每個(gè)守護(hù)程序的日志級別

如何使用MapReduce任務(wù)交互

用法:Hadoop的工作[GENERIC_OPTIONS]

以下是在Hadoop作業(yè)的可用通用的選項(xiàng)。

通用選項(xiàng)描述
-submit <job-file>提交作業(yè)。
-status <job-id>打印映射并減少完成百分比和所有作業(yè)計(jì)數(shù)器。
-counter <job-id> <group-name> <countername>打印計(jì)數(shù)器值。
-kill <job-id>終止作業(yè)
-events <job-id> <fromevent-#> <#-of-events>打印jobtracker為給定范圍接收的事件詳細(xì)信息。
-history [all] <jobOutputDir> - history < jobOutputDir>打印作業(yè)詳細(xì)信息,失敗并停用提示詳細(xì)信息??梢酝ㄟ^指定[all]選項(xiàng)查看有關(guān)作業(yè)的更多詳細(xì)信息,如每個(gè)任務(wù)的成功任務(wù)和任務(wù)嘗試。
-list[all]顯示所有作業(yè)。 -list僅顯示尚未完成的作業(yè)。
-kill-task <task-id>終止任務(wù)。已終止的任務(wù)不會計(jì)入失敗的嘗試次數(shù)。
-fail-task <task-id>失敗的任務(wù)。失敗的任務(wù)將根據(jù)失敗的嘗試進(jìn)行計(jì)數(shù)。
-set-priority <job-id> <priority>更改作業(yè)的優(yōu)先級。允許的優(yōu)先級值為VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW

查看作業(yè)的狀態(tài)

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

查看job output-dir的歷史

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

終止作業(yè)

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號