通過InputFormat決定讀取的數(shù)據(jù)的類型,然后拆分成一個個InputSplit,每個InputSplit對應一個Map處理,RecordReader讀取InputSplit的內(nèi)容給Map
決定讀取數(shù)據(jù)的格式,可以是文件或數(shù)據(jù)庫等
List getSplits(): 獲取由輸入文件計算出輸入分片(InputSplit),解決數(shù)據(jù)或文件分割成片問題
RecordReader createRecordReader(): 創(chuàng)建RecordReader,從InputSplit中讀取數(shù)據(jù),解決讀取分片中數(shù)據(jù)問題
TextInputFormat: 輸入文件中的每一行就是一個記錄,Key是這一行的byte offset,而value是這一行的內(nèi)容
KeyValueTextInputFormat: 輸入文件中每一行就是一個記錄,第一個分隔符字符切分每行。在分隔符字符之前的內(nèi)容為Key,在之后的為Value。分隔符變量通過key.value.separator.in.input.line變量設置,默認為(\t)字符。
NLineInputFormat: 與TextInputFormat一樣,但每個數(shù)據(jù)塊必須保證有且只有N行,mapred.line.input.format.linespermap屬性,默認為1
SequenceFileInputFormat: 一個用來讀取字符流數(shù)據(jù)的InputFormat,<key,value>為用戶自定義的。字符流數(shù)據(jù)是Hadoop自定義的壓縮的二進制數(shù)據(jù)格式。它用來優(yōu)化從一個MapReduce任務的輸出到另一個MapReduce任務的輸入之間的數(shù)據(jù)傳輸過程。</key,value>
代表一個個邏輯分片,并沒有真正存儲數(shù)據(jù),只是提供了一個如何將數(shù)據(jù)分片的方法
Split內(nèi)有Location信息,利于數(shù)據(jù)局部化
一個InputSplit給一個單獨的Map處理
public abstract class InputSplit {
/**
* 獲取Split的大小,支持根據(jù)size對InputSplit排序.
*/
public abstract long getLength() throws IOException, InterruptedException;
/**
* 獲取存儲該分片的數(shù)據(jù)所在的節(jié)點位置.
*/
public abstract String[] getLocations() throws IOException, InterruptedException;
}
將InputSplit拆分成一個個<key,value>對給Map處理,也是實際的文件讀取分隔對象</key,value>
CombineFileInputFormat可以將若干個Split打包成一個,目的是避免過多的Map任務(因為Split的數(shù)目決定了Map的數(shù)目,大量的Mapper Task創(chuàng)建銷毀開銷將是巨大的)
通常一個split就是一個block(FileInputFormat僅僅拆分比block大的文件),這樣做的好處是使得Map可以在存儲有當前數(shù)據(jù)的節(jié)點上運行本地的任務,而不需要通過網(wǎng)絡進行跨節(jié)點的任務調(diào)度
通過mapred.min.split.size, mapred.max.split.size, block.size來控制拆分的大小
如果mapred.min.split.size大于block size,則會將兩個block合成到一個split,這樣有部分block數(shù)據(jù)需要通過網(wǎng)絡讀取
如果mapred.max.split.size小于block size,則會將一個block拆成多個split,增加了Map任務數(shù)(Map對split進行計算并且上報結果,關閉當前計算打開新的split均需要耗費資源)
先獲取文件在HDFS上的路徑和Block信息,然后根據(jù)splitSize對文件進行切分( splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默認splitSize 就等于blockSize的默認值(64m)
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job.getConfiguration());
// 獲取該文件所有的block信息列表[hostname, offset, length]
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
// 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// 計算分片大小
// 即 Math.max(minSize, Math.min(maxSize, blockSize));
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// 循環(huán)分片。
// 當剩余數(shù)據(jù)與分片大小比值大于Split_Slop時,繼續(xù)分片, 小于等于時,停止分片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
// 處理余下的數(shù)據(jù)
if (bytesRemaining != 0) {
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
}
} else {
// 不可split,整塊返回
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
}
} else {
// 對于長度為0的文件,創(chuàng)建空Hosts列表,返回
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// 設置輸入文件數(shù)量
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
split是根據(jù)文件大小分割的,而一般處理是根據(jù)分隔符進行分割的,這樣勢必存在一條記錄橫跨兩個split
解決辦法是只要不是第一個split,都會遠程讀取一條記錄。不是第一個split的都忽略到第一條記錄
public class LineRecordReader extends RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
// initialize函數(shù)即對LineRecordReader的一個初始化
// 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經(jīng)過壓縮的情況等
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// 打開文件,并定位到分片讀取的起始位置
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
// 文件是壓縮文件的話,直接打開文件
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
// 只要不是第一個split,則忽略本split的第一行數(shù)據(jù)
if (start != 0) {
skipFirstLine = true;
--start;
// 定位到偏移位置,下次讀取就會從偏移位置開始
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) {
// 忽略第一行數(shù)據(jù),重新定位start
start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);// key即為偏移量
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
// 讀取的數(shù)據(jù)長度為0,則說明已讀完
if (newSize == 0) {
break;
}
pos += newSize;
// 讀取的數(shù)據(jù)長度小于最大行長度,也說明已讀取完畢
if (newSize < maxLineLength) {
break;
}
// 執(zhí)行到此處,說明該行數(shù)據(jù)沒讀完,繼續(xù)讀入
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
}
更多建議: