Storm 起步

2018-08-27 09:53 更新

準備開始

準備開始

在本章,我們要創(chuàng)建一個 Storm 工程和我們的第一個 Storm 拓撲結(jié)構(gòu)。

NOTE: 下面假設(shè)你的 JRE 版本在 1.6 以上。我們推薦 Oracle 提供的 JRE。你可以到 http://www.java.com/downloads/ 下載。

操作模式

開始之前,有必要了解一下 Storm 的操作模式。有下面兩種方式。

本地模式

在本地模式下,Storm 拓撲結(jié)構(gòu)運行在本地計算機的單一 JVM 進程上。這個模式用于開發(fā)、測試以及調(diào)試,因為這是觀察所有組件如何協(xié)同工作的最簡單方法。在這種模式下,我們可以調(diào)整參數(shù),觀察我們的拓撲結(jié)構(gòu)如何在不同的 Storm 配置環(huán)境下運行。要在本地模式下運行,我們要下載 Storm 開發(fā)依賴,以便用來開發(fā)并測試我們的拓撲結(jié)構(gòu)。我們創(chuàng)建了第一個 Storm 工程以后,很快就會明白如何使用本地模式了。

NOTE: 在本地模式下,跟在集群環(huán)境運行很像。不過很有必要確認一下所有組件都是線程安全的,因為當把它們部署到遠程模式時它們可能會運行在不同的 JVM 進程甚至不同的物理機上,這個時候它們之間沒有直接的通訊或共享內(nèi)存。

我們要在本地模式運行本章的所有例子。

遠程模式

在遠程模式下,我們向 Storm 集群提交拓撲,它通常由許多運行在不同機器上的流程組成。遠程模式不會出現(xiàn)調(diào)試信息, 因此它也稱作生產(chǎn)模式。不過在單一開發(fā)機上建立一個 Storm 集群是一個好主意,可以在部署到生產(chǎn)環(huán)境之前,用來確認拓撲在集群環(huán)境下沒有任何問題。

你將在第六章學(xué)到更多關(guān)于遠程模式的內(nèi)容,并在附錄B學(xué)到如何安裝一個 Storm 集群。

Hello World

我們在這個工程里創(chuàng)建一個簡單的拓撲,數(shù)單詞數(shù)量。我們可以把這個看作 Storm 的 “Hello World”。不過,這是一個非常強大的拓撲,因為它能夠擴展到幾乎無限大的規(guī)模,而且只需要做一些小修改,就能用它構(gòu)建一個統(tǒng)計系統(tǒng)。舉個例子,我們可以修改一下工程用來找出 Twitter 上的熱點話題。

要創(chuàng)建這個拓撲,我們要用一個 spout 讀取文本,第一個 bolt 用來標準化單詞,第二個 bolt 為單詞計數(shù),如圖2-1所示。

你可以從這個網(wǎng)址下載源碼壓縮包, https://github.com/storm-book/examples-ch02-getting_started/zipball/master。

NOTE: 如果你使用 git(一個分布式版本控制與源碼管理工具),你可以執(zhí)行 git clone git@github.com:storm-book/examples-ch02-getting_started.git,把源碼檢出到你指定的目錄。

Java 安裝檢查

構(gòu)建 Storm 運行環(huán)境的第一步是檢查你安裝的 Java 版本。打開一個控制臺窗口并執(zhí)行命令:java -version??刂婆_應(yīng)該會顯示出類似如下的內(nèi)容:

    java -version

    java version "1.6.0_26"
    Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03)

    Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)  

如果不是上述內(nèi)容,檢查你的 Java 安裝情況。(參考 http://www.java.com/download/

創(chuàng)建工程

開始之前,先為這個應(yīng)用建一個目錄(就像你平常為 Java 應(yīng)用做的那樣)。這個目錄用來存放工程源碼。

接下來我們要下載 Storm 依賴包,這是一些 jar 包,我們要把它們添加到應(yīng)用類路徑中。你可以采用如下兩種方式之一完成這一步:

  • 下載所有依賴,解壓縮它們,把它 們添加到類路徑
  • 使用 Apache Maven

NOTE: Maven 是一個軟件項目管理的綜合工具。它可以用來管理項目的開發(fā)周期的許多方面,從包依賴到版本發(fā)布過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經(jīng)安裝了maven,在命令行運行 mvn。如果沒有安裝你可以從 http://maven.apache.org/download.html下載。

沒有必要先成為一個 Maven 專家才能使用 Storm,不過了解一下關(guān)于 Maven 工作方式的基礎(chǔ)知識仍然會對你有所幫助。你可以在 Apache Maven 的網(wǎng)站上找到更多的信息(http://maven.apache.org/)。

NOTE: Storm 的 Maven 依賴引用了運行 Storm 本地模式的所有庫。

要運行我們的拓撲,我們可以編寫一個包含基本組件的 pom.xml 文件。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <modelVersion>4.0.0</modelVersion>
             <groupId>storm.book</groupId>
             <artifactId>Getting-Started</artifactId>
             <version>0.0.1-SNAPSHOT</version>
             <build>
                 <plugins>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-compiler-plugin</artifactId>
                         <version>2.3.2</version>
                         <configuration>
                             <source>1.6</source>
                             <target>1.6</target>
                             <compilerVersion>1.6</compilerVersion>
                         </configuration>
                     </plugin>
                 </plugins>
             </build>
             <repositories>
                 
                 <repository>
                     <id>clojars.org</id>
                     <url>http://clojars.org/repo</url>
                 </repository>
             </repositories>
             <dependencies>
                 
                 <dependency>
                     <groupId>storm</groupId>
                     <artifactId>storm</artifactId>
                     <version>0.6.0</version>
                 </dependency>
             </dependencies>
    </project>  

開頭幾行指定了工程名稱和版本號。然后我們添加了一個編譯器插件,告知 Maven 我們的代碼要用 Java1.6 編譯。接下來我們定義了 Maven 倉庫(Maven 支持為同一個工程指定多個倉庫)。clojars 是存放 Storm 依賴的倉庫。Maven 會為運行本地模式自動下載必要的所有子包依賴。

一個典型的 Maven Java 工程會擁有如下結(jié)構(gòu):

我們的應(yīng)用目錄/
         ├── pom.xml
         └── src
               └── main
                  └── java
               |  ├── spouts
               |  └── bolts
               └── resources  

java 目錄下的子目錄包含我們的代碼,我們把要統(tǒng)計單詞數(shù)的文件保存在 resource 目錄下。

NOTE:命令 mkdir -p 會創(chuàng)建所有需要的父目錄。

創(chuàng)建我們的第一個 Topology

我們將為運行單詞計數(shù)創(chuàng)建所有必要的類。可能這個例子中的某些部分,現(xiàn)在無法講的很清楚,不過我們會在隨后的章節(jié)做進一步的講解。

Spout

pout WordReader 類實現(xiàn)了 IRichSpout 接口。我們將在第四章看到更多細節(jié)。WordReader負責從文件按行讀取文本,并把文本行提供給第一個 bolt。

NOTE: 一個 spout 發(fā)布一個定義域列表。這個架構(gòu)允許你使用不同的 bolts 從同一個spout 流讀取數(shù)據(jù),它們的輸出也可作為其它 bolts 的定義域,以此類推。

例2-1包含 WordRead 類的完整代碼(我們將會分析下述代碼的每一部分)。


       /
           例2-1.src/main/java/spouts/WordReader.java
         /
        package spouts;

        import java.io.BufferedReader;
        import java.io.FileNotFoundException;
        import java.io.FileReader;
        import java.util.Map;
        import backtype.storm.spout.SpoutOutputCollector;
        import backtype.storm.task.TopologyContext;
        import backtype.storm.topology.IRichSpout;
        import backtype.storm.topology.OutputFieldsDeclarer;
        import backtype.storm.tuple.Fields;
        import backtype.storm.tuple.Values;

        public class WordReader implements IRichSpout {
            private SpoutOutputCollector collector;
            private FileReader fileReader;
            private boolean completed = false;
            private TopologyContext context;
            public boolean isDistributed() {return false;}
            public void ack(Object msgId) {
                    System.out.println("OK:"+msgId);
            }
            public void close() {}
            public void fail(Object msgId) {
                 System.out.println("FAIL:"+msgId);
            }
            /
              這個方法做的惟一一件事情就是分發(fā)文件中的文本行
             /
            public void nextTuple() {
            /
              這個方法會不斷的被調(diào)用,直到整個文件都讀完了,我們將等待并返回。
             /
                 if(completed){
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         //什么也不做
                     }
                    return;
                 }
                 String str;
                 //創(chuàng)建reader
                 BufferedReader reader = new BufferedReader(fileReader);
                 try{
                     //讀所有文本行
                    while((str = reader.readLine()) != null){
                     /
                       按行發(fā)布一個新值
                      /
                         this.collector.emit(new Values(str),str);
                     }
                 }catch(Exception e){
                     throw new RuntimeException("Error reading tuple",e);
                 }finally{
                     completed = true;
                 }
             }
             /
               我們將創(chuàng)建一個文件并維持一個collector對象
              /
             public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                     try {
                         this.context = context;
                         this.fileReader = new FileReader(conf.get("wordsFile").toString());
                     } catch (FileNotFoundException e) {
                         throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
                     }
                     this.collector = collector;
             }
             /
               聲明輸入域"word"
              /
             public void declareOutputFields(OutputFieldsDeclarer declarer) {
                 declarer.declare(new Fields("line"));
             }
        }  

第一個被調(diào)用的 spout 方法都是 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下參數(shù):配置對象,在定義topology 對象是創(chuàng)建;TopologyContext 對象,包含所有拓撲數(shù)據(jù);還有SpoutOutputCollector 對象,它能讓我們發(fā)布交給 bolts 處理的數(shù)據(jù)。下面的代碼主是這個方法的實現(xiàn)。

    public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }  

我們在這個方法里創(chuàng)建了一個 FileReader 對象,用來讀取文件。接下來我們要實現(xiàn) public void nextTuple(),我們要通過它向 bolts 發(fā)布待處理的數(shù)據(jù)。在這個例子里,這個方法要讀取文件并逐行發(fā)布數(shù)據(jù)。

    public void nextTuple() {
        if(completed){
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                //什么也不做
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }  

NOTE: Values 是一個 ArrarList 實現(xiàn),它的元素就是傳入構(gòu)造器的參數(shù)。

nextTuple() 會在同一個循環(huán)內(nèi)被 ack()fail() 周期性的調(diào)用。沒有任務(wù)時它必須釋放對線程的控制,其它方法才有機會得以執(zhí)行。因此 nextTuple 的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負載,會在返回前休眠一毫秒。如果任務(wù)完成了,文件中的每一行都已被讀出并分發(fā)了。

NOTE:元組(tuple)是一個具名值列表,它可以是任意 java 對象(只要它是可序列化的)。默認情況,Storm 會序列化字符串、字節(jié)數(shù)組、ArrayList、HashMap 和 HashSet 等類型。

Bolts

現(xiàn)在我們有了一個 spout,用來按行讀取文件并每行發(fā)布一個元組,還要創(chuàng)建兩個 bolts,用來處理它們(看圖2-1)。bolts 實現(xiàn)了接口 backtype.storm.topology.IRichBolt。

bolt最重要的方法是void execute(Tuple input),每次接收到元組時都會被調(diào)用一次,還會再發(fā)布若干個元組。

NOTE: 只要必要,bolt 或 spout 會發(fā)布若干元組。當調(diào)用 nextTupleexecute 方法時,它們可能會發(fā)布0個、1個或許多個元組。你將在第五章學(xué)習(xí)更多這方面的內(nèi)容。

第一個 bolt,WordNormalizer,負責得到并標準化每行文本。它把文本行切分成單詞,大寫轉(zhuǎn)化成小寫,去掉頭尾空白符。

首先我們要聲明 bolt 的出參:

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }  

這里我們聲明 bolt 將發(fā)布一個名為 “word” 的域。

下一步我們實現(xiàn) public void execute(Tuple input),處理傳入的元組:

    public void execute(Tuple input){
        String sentence=input.getString(0);
        String[] words=sentence.split(" ");
        for(String word : words){
            word=word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                //發(fā)布這個單詞
                collector.emit(new Values(word));
            }
        }
        //對元組做出應(yīng)答
        collector.ack(input);
    }  

第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理并用collector對象發(fā)布。最后,每次都調(diào)用collector 對象的 ack() 方法確認已成功處理了一個元組。

例2-2是這個類的完整代碼。

    //例2-2 src/main/java/bolts/WordNormalizer.java
    package bolts;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    public class WordNormalizer implements IRichBolt{
        private OutputCollector collector;
        public void cleanup(){}
        /
           bolt從單詞文件接收到文本行,并標準化它。
           文本行會全部轉(zhuǎn)化成小寫,并切分它,從中得到所有單詞。
         /
        public void execute(Tuple input){
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for(String word : words){
                word = word.trim();
                if(!word.isEmpty()){
                    word=word.toLowerCase();
                    //發(fā)布這個單詞
                    List a = new ArrayList();
                    a.add(input);
                    collector.emit(a,new Values(word));
                }
            }
            //對元組做出應(yīng)答
            collector.ack(input);
        }
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
        }

        /
           這個bolt只會發(fā)布“word”域
          /
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }  

NOTE:通過這個例子,我們了解了在一次 execute 調(diào)用中發(fā)布多個元組。如果這個方法在一次調(diào)用中接收到句子 “This is the Storm book”,它將會發(fā)布五個元組。

下一個bolt,WordCounter,負責為單詞計數(shù)。這個拓撲結(jié)束時(cleanup() 方法被調(diào)用時),我們將顯示每個單詞的數(shù)量。

NOTE: 這個例子的 bolt 什么也沒發(fā)布,它把數(shù)據(jù)保存在 map 里,但是在真實的場景中可以把數(shù)據(jù)保存到數(shù)據(jù)庫。

package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map counters;
    private OutputCollector collector;

    /
       這個spout結(jié)束時(集群關(guān)閉的時候),我們會顯示單詞數(shù)量
      /
    @Override
    public void cleanup(){
        System.out.println("-- 單詞數(shù) 【"+name+"-"+id+"】 --");
        for(Map.Entry entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /
       為每個單詞計數(shù)
     /
@Override
public void execute(Tuple input) {
    String str=input.getString(0);
    /**
      如果單詞尚不存在于map,我們就創(chuàng)建一個,如果已在,我們就為它加1
     /
    if(!counters.containsKey(str)){
        counters.put(str,1);
    }else{
        Integer c = counters.get(str) + 1;
        counters.put(str,c);
    }
    //對元組作為應(yīng)答
    collector.ack(input);
}
/ 初始化 / @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){ this.counters = new HashMap(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}}

execute 方法使用一個 map 收集單詞并計數(shù)。拓撲結(jié)束時,將調(diào)用 clearup() 方法打印計數(shù)器 map。(雖然這只是一個例子,但是通常情況下,當拓撲關(guān)閉時,你應(yīng)當使用 cleanup() 方法關(guān)閉活動的連接和其它資源。)

主類

你可以在主類中創(chuàng)建拓撲和一個本地集群對象,以便于在本地測試和調(diào)試。LocalCluster 可以通過 Config 對象,讓你嘗試不同的集群配置。比如,當使用不同數(shù)量的工作進程測試你的拓撲時,如果不小心使用了某個全局變量或類變量,你就能夠發(fā)現(xiàn)錯誤。(更多內(nèi)容請見第三章)

NOTE:所有拓撲節(jié)點的各個進程必須能夠獨立運行,而不依賴共享數(shù)據(jù)(也就是沒有全局變量或類變量),因為當拓撲運行在真實的集群環(huán)境時,這些進程可能會運行在不同的機器上。

接下來,TopologyBuilder 將用來創(chuàng)建拓撲,它決定 Storm 如何安排各節(jié)點,以及它們交換數(shù)據(jù)的方式。

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader", new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
    builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");  

spoutbolts 之間通過 shuffleGrouping 方法連接。這種分組方式?jīng)Q定了 Storm 會以隨機分配方式從源節(jié)點向目標節(jié)點發(fā)送消息。

下一步,創(chuàng)建一個包含拓撲配置的 Config 對象,它會在運行時與集群配置合并,并通過prepare 方法發(fā)送給所有節(jié)點。

    Config conf = new Config();
    conf.put("wordsFile", args[0]);
    conf.setDebug(true);  

由 spout 讀取的文件的文件名,賦值給 wordFile 屬性。由于是在開發(fā)階段,設(shè)置 debug 屬性為 true,Strom 會打印節(jié)點間交換的所有消息,以及其它有助于理解拓撲運行方式的調(diào)試數(shù)據(jù)。

正如之前講過的,你要用一個 LocalCluster 對象運行這個拓撲。在生產(chǎn)環(huán)境中,拓撲會持續(xù)運行,不過對于這個例子而言,你只要運行它幾秒鐘就能看到結(jié)果。

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
    Thread.sleep(2000);
    cluster.shutdown();  

調(diào)用 createTopologysubmitTopology,運行拓撲,休眠兩秒鐘(拓撲在另外的線程運行),然后關(guān)閉集群。

例2-3是完整的代碼

    //例2-3 src/main/java/TopologyMain.java
    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
        //定義拓撲
            TopologyBuilder builder = new TopologyBuilder());
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));

        //配置
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);

        //運行拓撲
             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology();
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }  

觀察運行情況

你已經(jīng)為運行你的第一個拓撲準備好了。在這個目錄下面創(chuàng)建一個文件,/src/main/resources/words.txt,一個單詞一行,然后用下面的命令運行這個拓撲:mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt。舉個例子,如果你的 words.txt 文件有如下內(nèi)容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應(yīng)該會在日志中看到類似下面的內(nèi)容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節(jié)點只有一個實例。但是如果你有一個非常大的日志文件呢?你能夠很輕松的改變系統(tǒng)中的節(jié)點數(shù)量實現(xiàn)并行工作。這個時候,你就要創(chuàng)建兩個 WordCounter* 實例。

    builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");  

程序返回時,你將看到: — 單詞數(shù) 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數(shù) [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改并行度實在是太容易了(當然對于實際情況來說,每個實例都會運行在單獨的機器上)。不過似乎有一個問題:單詞 is 和 great 分別在每個 WordCounter 各計數(shù)一次。怎么會這樣?當你調(diào)用shuffleGrouping 時,就決定了 Storm 會以隨機分配的方式向你的 bolt 實例發(fā)送消息。在這個例子中,理想的做法是相同的單詞問題發(fā)送給同一個 WordCounter 實例。你把shuffleGrouping(“word-normalizer”) 換成 fieldsGrouping(“word-normalizer”, new Fields(“word”)) 就能達到目的。試一試,重新運行程序,確認結(jié)果。 你將在后續(xù)章節(jié)學(xué)習(xí)更多分組方式和消息流類型。

結(jié)論

我們已經(jīng)討論了 Storm 的本地和遠程操作模式之間的不同,以及 Storm 的強大和易于開發(fā)的特性。你也學(xué)習(xí)了一些 Storm 的基本概念,我們將在后續(xù)章節(jié)深入講解它們。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號