Mapper 主要是讀取 InputSplit 中的每一個 <Key, Value> 對並進行處理
/**
 * Reads each key/value pair exposed by the supplied context and processes it.
 *
 * <p>{@code run} is the driver: it calls {@code setup} once, then {@code map} once per
 * key/value pair, and finally {@code cleanup}. Subclasses override the three hooks; the
 * defaults for {@code setup} and {@code cleanup} do nothing and the default {@code map}
 * is the identity function.
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * Pre-processing hook, invoked once by {@link #run} before any pair is mapped.
   * The default implementation does nothing.
   *
   * @param context the task context
   * @throws IOException declared for overriding implementations
   * @throws InterruptedException declared for overriding implementations
   */
  protected void setup(Context context) throws IOException, InterruptedException {
  }

  /**
   * Invoked once for every key/value pair of the input. The default implementation
   * writes the pair back to the context unchanged.
   *
   * @param key the current input key
   * @param value the current input value
   * @param context the task context the output pair is written to
   * @throws IOException if {@code context.write} fails
   * @throws InterruptedException if {@code context.write} is interrupted
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, Context context)
      throws IOException, InterruptedException {
    // Identity mapping: the casts are unchecked because the type parameters are erased,
    // so a mismatch between input and output types is not detected here.
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Wrap-up hook (for example, closing streams), invoked once by {@link #run} after the
   * last pair has been mapped. The default implementation does nothing.
   *
   * @param context the task context
   * @throws IOException declared for overriding implementations
   * @throws InterruptedException declared for overriding implementations
   */
  protected void cleanup(Context context) throws IOException, InterruptedException {
  }

  /**
   * Drives the map task: {@code setup}, then {@code map} for every pair the context
   * yields, then {@code cleanup}.
   *
   * <p>{@code cleanup} runs in a {@code finally} block, so it is still executed when
   * {@code nextKeyValue} or {@code map} throws; otherwise anything acquired in
   * {@code setup} or {@code map} would never be released on failure. It is not called
   * when {@code setup} itself throws. If {@code cleanup} throws while another exception
   * is propagating, the exception from {@code cleanup} is the one the caller sees.
   *
   * @param context the task context supplying the input pairs
   * @throws IOException propagated from the hooks or from the context
   * @throws InterruptedException propagated from the hooks or from the context
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
/**
 * Context for a map task: exposes the input split and forwards all key/value iteration
 * to a {@code RecordReader}.
 *
 * <p>NOTE(review): neither field is assigned anywhere in the visible code; presumably
 * they are initialised by code not shown here — confirm.
 */
public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /** Split returned by {@link #getInputSplit()}. */
  private InputSplit split;

  /** Reader that every key/value accessor below delegates to. */
  private RecordReader<KEYIN, VALUEIN> reader;

  /**
   * Advances the underlying reader to the next key/value pair.
   *
   * @return whatever the reader's {@code nextKeyValue()} returns
   * @throws IOException propagated from the reader
   * @throws InterruptedException propagated from the reader
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    final boolean advanced = this.reader.nextKeyValue();
    return advanced;
  }

  /**
   * Fetches the current key from the underlying reader.
   *
   * @return the reader's current key
   * @throws IOException propagated from the reader
   * @throws InterruptedException propagated from the reader
   */
  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    final KEYIN currentKey = this.reader.getCurrentKey();
    return currentKey;
  }

  /**
   * Fetches the current value from the underlying reader.
   *
   * @return the reader's current value
   * @throws IOException propagated from the reader
   * @throws InterruptedException propagated from the reader
   */
  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    final VALUEIN currentValue = this.reader.getCurrentValue();
    return currentValue;
  }

  /**
   * Gets the input split for this map.
   *
   * @return the stored input split
   */
  public InputSplit getInputSplit() {
    return this.split;
  }
}
更多建議: