W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
在本章,我們要創(chuàng)建一個 Storm 工程和我們的第一個 Storm 拓?fù)浣Y(jié)構(gòu)。
NOTE: 下面假設(shè)你的 JRE 版本在 1.6 以上。我們推薦 Oracle 提供的 JRE。你可以到 http://www.java.com/downloads/ 下載。
開始之前,有必要了解一下 Storm 的操作模式。有下面兩種方式。
在本地模式下,Storm 拓?fù)浣Y(jié)構(gòu)運(yùn)行在本地計算機(jī)的單一 JVM 進(jìn)程上。這個模式用于開發(fā)、測試以及調(diào)試,因為這是觀察所有組件如何協(xié)同工作的最簡單方法。在這種模式下,我們可以調(diào)整參數(shù),觀察我們的拓?fù)浣Y(jié)構(gòu)如何在不同的 Storm 配置環(huán)境下運(yùn)行。要在本地模式下運(yùn)行,我們要下載 Storm 開發(fā)依賴,以便用來開發(fā)并測試我們的拓?fù)浣Y(jié)構(gòu)。我們創(chuàng)建了第一個 Storm 工程以后,很快就會明白如何使用本地模式了。
NOTE: 在本地模式下,跟在集群環(huán)境運(yùn)行很像。不過很有必要確認(rèn)一下所有組件都是線程安全的,因為當(dāng)把它們部署到遠(yuǎn)程模式時它們可能會運(yùn)行在不同的 JVM 進(jìn)程甚至不同的物理機(jī)上,這個時候它們之間沒有直接的通訊或共享內(nèi)存。
我們要在本地模式運(yùn)行本章的所有例子。
在遠(yuǎn)程模式下,我們向 Storm 集群提交拓?fù)?,它通常由許多運(yùn)行在不同機(jī)器上的流程組成。遠(yuǎn)程模式不會出現(xiàn)調(diào)試信息, 因此它也稱作生產(chǎn)模式。不過在單一開發(fā)機(jī)上建立一個 Storm 集群是一個好主意,可以在部署到生產(chǎn)環(huán)境之前,用來確認(rèn)拓?fù)湓诩涵h(huán)境下沒有任何問題。
你將在第六章學(xué)到更多關(guān)于遠(yuǎn)程模式的內(nèi)容,并在附錄B學(xué)到如何安裝一個 Storm 集群。
我們在這個工程里創(chuàng)建一個簡單的拓?fù)?,?shù)單詞數(shù)量。我們可以把這個看作 Storm 的 “Hello World”。不過,這是一個非常強(qiáng)大的拓?fù)?,因為它能夠擴(kuò)展到幾乎無限大的規(guī)模,而且只需要做一些小修改,就能用它構(gòu)建一個統(tǒng)計系統(tǒng)。舉個例子,我們可以修改一下工程用來找出 Twitter 上的熱點話題。
要創(chuàng)建這個拓?fù)?,我們要用一個 spout 讀取文本,第一個 bolt 用來標(biāo)準(zhǔn)化單詞,第二個 bolt 為單詞計數(shù),如圖2-1所示。
你可以從這個網(wǎng)址下載源碼壓縮包, https://github.com/storm-book/examples-ch02-getting_started/zipball/master。
NOTE: 如果你使用 git(一個分布式版本控制與源碼管理工具),你可以執(zhí)行 git clone git@github.com:storm-book/examples-ch02-getting_started.git
,把源碼檢出到你指定的目錄。
構(gòu)建 Storm 運(yùn)行環(huán)境的第一步是檢查你安裝的 Java 版本。打開一個控制臺窗口并執(zhí)行命令:java -version。控制臺應(yīng)該會顯示出類似如下的內(nèi)容:
java -version
java version "1.6.0_26"
Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)
如果不是上述內(nèi)容,檢查你的 Java 安裝情況。(參考 http://www.java.com/download/)
開始之前,先為這個應(yīng)用建一個目錄(就像你平常為 Java 應(yīng)用做的那樣)。這個目錄用來存放工程源碼。
接下來我們要下載 Storm 依賴包,這是一些 jar 包,我們要把它們添加到應(yīng)用類路徑中。你可以采用如下兩種方式之一完成這一步:
NOTE: Maven 是一個軟件項目管理的綜合工具。它可以用來管理項目的開發(fā)周期的許多方面,從包依賴到版本發(fā)布過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經(jīng)安裝了maven,在命令行運(yùn)行 mvn。如果沒有安裝你可以從 http://maven.apache.org/download.html下載。
沒有必要先成為一個 Maven 專家才能使用 Storm,不過了解一下關(guān)于 Maven 工作方式的基礎(chǔ)知識仍然會對你有所幫助。你可以在 Apache Maven 的網(wǎng)站上找到更多的信息(http://maven.apache.org/)。
NOTE: Storm 的 Maven 依賴引用了運(yùn)行 Storm 本地模式的所有庫。
要運(yùn)行我們的拓?fù)?,我們可以編寫一個包含基本組件的 pom.xml 文件。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm.book</groupId>
<artifactId>Getting-Started</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<compilerVersion>1.6</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.6.0</version>
</dependency>
</dependencies>
</project>
開頭幾行指定了工程名稱和版本號。然后我們添加了一個編譯器插件,告知 Maven 我們的代碼要用 Java1.6 編譯。接下來我們定義了 Maven 倉庫(Maven 支持為同一個工程指定多個倉庫)。clojars 是存放 Storm 依賴的倉庫。Maven 會為運(yùn)行本地模式自動下載必要的所有子包依賴。
一個典型的 Maven Java 工程會擁有如下結(jié)構(gòu):
我們的應(yīng)用目錄/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
java 目錄下的子目錄包含我們的代碼,我們把要統(tǒng)計單詞數(shù)的文件保存在 resource 目錄下。
NOTE:命令 mkdir -p 會創(chuàng)建所有需要的父目錄。
我們將為運(yùn)行單詞計數(shù)創(chuàng)建所有必要的類??赡苓@個例子中的某些部分,現(xiàn)在無法講的很清楚,不過我們會在隨后的章節(jié)做進(jìn)一步的講解。
pout WordReader 類實現(xiàn)了 IRichSpout 接口。我們將在第四章看到更多細(xì)節(jié)。WordReader負(fù)責(zé)從文件按行讀取文本,并把文本行提供給第一個 bolt。
NOTE: 一個 spout 發(fā)布一個定義域列表。這個架構(gòu)允許你使用不同的 bolts 從同一個spout 流讀取數(shù)據(jù),它們的輸出也可作為其它 bolts 的定義域,以此類推。
例2-1包含 WordRead 類的完整代碼(我們將會分析下述代碼的每一部分)。
/
例2-1.src/main/java/spouts/WordReader.java
/
package spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public boolean isDistributed() {return false;}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/
這個方法做的惟一一件事情就是分發(fā)文件中的文本行
/
public void nextTuple() {
/
這個方法會不斷的被調(diào)用,直到整個文件都讀完了,我們將等待并返回。
/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//什么也不做
}
return;
}
String str;
//創(chuàng)建reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//讀所有文本行
while((str = reader.readLine()) != null){
/
按行發(fā)布一個新值
/
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
/
我們將創(chuàng)建一個文件并維持一個collector對象
/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
/
聲明輸入域"word"
/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
第一個被調(diào)用的 spout 方法都是 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下參數(shù):配置對象,在定義topology 對象是創(chuàng)建;TopologyContext 對象,包含所有拓?fù)鋽?shù)據(jù);還有SpoutOutputCollector 對象,它能讓我們發(fā)布交給 bolts 處理的數(shù)據(jù)。下面的代碼主是這個方法的實現(xiàn)。
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
我們在這個方法里創(chuàng)建了一個 FileReader 對象,用來讀取文件。接下來我們要實現(xiàn) public void nextTuple(),我們要通過它向 bolts 發(fā)布待處理的數(shù)據(jù)。在這個例子里,這個方法要讀取文件并逐行發(fā)布數(shù)據(jù)。
public void nextTuple() {
if(completed){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
//什么也不做
}
return;
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try{
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str));
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
NOTE: Values 是一個 ArrarList 實現(xiàn),它的元素就是傳入構(gòu)造器的參數(shù)。
nextTuple() 會在同一個循環(huán)內(nèi)被 ack() 和 fail() 周期性的調(diào)用。沒有任務(wù)時它必須釋放對線程的控制,其它方法才有機(jī)會得以執(zhí)行。因此 nextTuple 的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負(fù)載,會在返回前休眠一毫秒。如果任務(wù)完成了,文件中的每一行都已被讀出并分發(fā)了。
NOTE:元組(tuple)是一個具名值列表,它可以是任意 java 對象(只要它是可序列化的)。默認(rèn)情況,Storm 會序列化字符串、字節(jié)數(shù)組、ArrayList、HashMap 和 HashSet 等類型。
現(xiàn)在我們有了一個 spout,用來按行讀取文件并每行發(fā)布一個元組,還要創(chuàng)建兩個 bolts,用來處理它們(看圖2-1)。bolts 實現(xiàn)了接口 backtype.storm.topology.IRichBolt。
bolt最重要的方法是void execute(Tuple input),每次接收到元組時都會被調(diào)用一次,還會再發(fā)布若干個元組。
NOTE: 只要必要,bolt 或 spout 會發(fā)布若干元組。當(dāng)調(diào)用 nextTuple 或 execute 方法時,它們可能會發(fā)布0個、1個或許多個元組。你將在第五章學(xué)習(xí)更多這方面的內(nèi)容。
第一個 bolt,WordNormalizer,負(fù)責(zé)得到并標(biāo)準(zhǔn)化每行文本。它把文本行切分成單詞,大寫轉(zhuǎn)化成小寫,去掉頭尾空白符。
首先我們要聲明 bolt 的出參:
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("word"));
}
這里我們聲明 bolt 將發(fā)布一個名為 “word” 的域。
下一步我們實現(xiàn) public void execute(Tuple input),處理傳入的元組:
public void execute(Tuple input){
String sentence=input.getString(0);
String[] words=sentence.split(" ");
for(String word : words){
word=word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//發(fā)布這個單詞
collector.emit(new Values(word));
}
}
//對元組做出應(yīng)答
collector.ack(input);
}
第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理并用collector對象發(fā)布。最后,每次都調(diào)用collector 對象的 ack() 方法確認(rèn)已成功處理了一個元組。
例2-2是這個類的完整代碼。
//例2-2 src/main/java/bolts/WordNormalizer.java
package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt{
private OutputCollector collector;
public void cleanup(){}
/
bolt從單詞文件接收到文本行,并標(biāo)準(zhǔn)化它。
文本行會全部轉(zhuǎn)化成小寫,并切分它,從中得到所有單詞。
/
public void execute(Tuple input){
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//發(fā)布這個單詞
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
//對元組做出應(yīng)答
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
/
這個bolt只會發(fā)布“word”域
/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
NOTE:通過這個例子,我們了解了在一次 execute 調(diào)用中發(fā)布多個元組。如果這個方法在一次調(diào)用中接收到句子 “This is the Storm book”,它將會發(fā)布五個元組。
下一個bolt,WordCounter,負(fù)責(zé)為單詞計數(shù)。這個拓?fù)浣Y(jié)束時(cleanup() 方法被調(diào)用時),我們將顯示每個單詞的數(shù)量。
NOTE: 這個例子的 bolt 什么也沒發(fā)布,它把數(shù)據(jù)保存在 map 里,但是在真實的場景中可以把數(shù)據(jù)保存到數(shù)據(jù)庫。
package bolts; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{ Integer id; String name; Map counters; private OutputCollector collector; / 這個spout結(jié)束時(集群關(guān)閉的時候),我們會顯示單詞數(shù)量 / @Override public void cleanup(){ System.out.println("-- 單詞數(shù) 【"+name+"-"+id+"】 --"); for(Map.Entry entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } } / 為每個單詞計數(shù) /
@Override public void execute(Tuple input) { String str=input.getString(0); /** 如果單詞尚不存在于map,我們就創(chuàng)建一個,如果已在,我們就為它加1 / if(!counters.containsKey(str)){ counters.put(str,1); }else{ Integer c = counters.get(str) + 1; counters.put(str,c); } //對元組作為應(yīng)答 collector.ack(input); }/ 初始化 / @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){ this.counters = new HashMap(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
execute 方法使用一個 map 收集單詞并計數(shù)。拓?fù)浣Y(jié)束時,將調(diào)用 clearup() 方法打印計數(shù)器 map。(雖然這只是一個例子,但是通常情況下,當(dāng)拓?fù)潢P(guān)閉時,你應(yīng)當(dāng)使用 cleanup() 方法關(guān)閉活動的連接和其它資源。)
你可以在主類中創(chuàng)建拓?fù)浜鸵粋€本地集群對象,以便于在本地測試和調(diào)試。LocalCluster 可以通過 Config 對象,讓你嘗試不同的集群配置。比如,當(dāng)使用不同數(shù)量的工作進(jìn)程測試你的拓?fù)鋾r,如果不小心使用了某個全局變量或類變量,你就能夠發(fā)現(xiàn)錯誤。(更多內(nèi)容請見第三章)
NOTE:所有拓?fù)涔?jié)點的各個進(jìn)程必須能夠獨立運(yùn)行,而不依賴共享數(shù)據(jù)(也就是沒有全局變量或類變量),因為當(dāng)拓?fù)溥\(yùn)行在真實的集群環(huán)境時,這些進(jìn)程可能會運(yùn)行在不同的機(jī)器上。
接下來,TopologyBuilder 將用來創(chuàng)建拓?fù)?,它決定 Storm 如何安排各節(jié)點,以及它們交換數(shù)據(jù)的方式。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
在 spout 和 bolts 之間通過 shuffleGrouping 方法連接。這種分組方式?jīng)Q定了 Storm 會以隨機(jī)分配方式從源節(jié)點向目標(biāo)節(jié)點發(fā)送消息。
下一步,創(chuàng)建一個包含拓?fù)渑渲玫?Config 對象,它會在運(yùn)行時與集群配置合并,并通過prepare 方法發(fā)送給所有節(jié)點。
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
由 spout 讀取的文件的文件名,賦值給 wordFile 屬性。由于是在開發(fā)階段,設(shè)置 debug 屬性為 true,Strom 會打印節(jié)點間交換的所有消息,以及其它有助于理解拓?fù)溥\(yùn)行方式的調(diào)試數(shù)據(jù)。
正如之前講過的,你要用一個 LocalCluster 對象運(yùn)行這個拓?fù)洹T谏a(chǎn)環(huán)境中,拓?fù)鋾掷m(xù)運(yùn)行,不過對于這個例子而言,你只要運(yùn)行它幾秒鐘就能看到結(jié)果。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
調(diào)用 createTopology 和 submitTopology,運(yùn)行拓?fù)?,休眠兩秒鐘(拓?fù)湓诹硗獾木€程運(yùn)行),然后關(guān)閉集群。
例2-3是完整的代碼
//例2-3 src/main/java/TopologyMain.java
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//定義拓?fù)? TopologyBuilder builder = new TopologyBuilder());
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
//配置
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//運(yùn)行拓?fù)? conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology();
Thread.sleep(1000);
cluster.shutdown();
}
}
觀察運(yùn)行情況
你已經(jīng)為運(yùn)行你的第一個拓?fù)錅?zhǔn)備好了。在這個目錄下面創(chuàng)建一個文件,/src/main/resources/words.txt,一個單詞一行,然后用下面的命令運(yùn)行這個拓?fù)洌?strong>mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt。舉個例子,如果你的 words.txt 文件有如下內(nèi)容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應(yīng)該會在日志中看到類似下面的內(nèi)容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節(jié)點只有一個實例。但是如果你有一個非常大的日志文件呢?你能夠很輕松的改變系統(tǒng)中的節(jié)點數(shù)量實現(xiàn)并行工作。這個時候,你就要創(chuàng)建兩個 WordCounter* 實例。
builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");
程序返回時,你將看到: — 單詞數(shù) 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數(shù) [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改并行度實在是太容易了(當(dāng)然對于實際情況來說,每個實例都會運(yùn)行在單獨的機(jī)器上)。不過似乎有一個問題:單詞 is 和 great 分別在每個 WordCounter 各計數(shù)一次。怎么會這樣?當(dāng)你調(diào)用shuffleGrouping 時,就決定了 Storm 會以隨機(jī)分配的方式向你的 bolt 實例發(fā)送消息。在這個例子中,理想的做法是相同的單詞問題發(fā)送給同一個 WordCounter 實例。你把shuffleGrouping(“word-normalizer”) 換成 fieldsGrouping(“word-normalizer”, new Fields(“word”)) 就能達(dá)到目的。試一試,重新運(yùn)行程序,確認(rèn)結(jié)果。 你將在后續(xù)章節(jié)學(xué)習(xí)更多分組方式和消息流類型。
我們已經(jīng)討論了 Storm 的本地和遠(yuǎn)程操作模式之間的不同,以及 Storm 的強(qiáng)大和易于開發(fā)的特性。你也學(xué)習(xí)了一些 Storm 的基本概念,我們將在后續(xù)章節(jié)深入講解它們。
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: