架構(gòu)師之路-如何建立高可用消息中間件kafka

2018-12-13 18:07 更新

Kafka

一、熟悉kafka

images/F8s4zKwhyKCfZxaiefDSaAmkG46bwYYB.png

images/w8J8Jm2Ri6WN52WP5SGMFjP6NnAxWQPR.png

l  Server-1 broker其實就是kafka的server,因為producer和consumer都要去連它。Broker主要還是做存儲用。

l  Server-2是zookeeper的server端,zookeeper的具體作用你可以去官網(wǎng)查,在這里你可以先想象,它維持了一張表,記錄了各個節(jié)點的IP、端口等信息(以后還會講到,它里面還存了kafka的相關(guān)信息)。

l  Server-3、4、5他們的共同之處就是都配置了zkClient,更明確的說,就是運行前必須配置zookeeper的地址,道理也很簡單,這之間的連接都是需要zookeeper來進(jìn)行分發(fā)的。

l  Server-1和Server-2的關(guān)系,他們可以放在一臺機(jī)器上,也可以分開放,zookeeper也可以配集群。目的是防止某一臺掛了。

簡單說下整個系統(tǒng)運行的順序:

1. 啟動zookeeper的server

2. 啟動kafka的server

3. Producer如果生產(chǎn)了數(shù)據(jù),會先通過zookeeper找到broker,然后將數(shù)據(jù)存放進(jìn)broker

4.  Consumer如果要消費數(shù)據(jù),會先通過zookeeper找對應(yīng)的broker,然后消費。

 

Kafka 分布式消息隊列 類似產(chǎn)品有JBoss、MQ

一、由Linkedln 開源,使用scala開發(fā),有如下幾個特點:

(1)高吞吐

(2)分布式

(3)支持多語言客戶端 (C++、Java)


二、組成: 客戶端是 producer 和 consumer,提供一些API,服務(wù)器端是Broker,客戶端提供可以向Broker內(nèi)發(fā)布消息、消費消息,服務(wù)器端提供消息的存儲等功能

Kafka 特點是支持分區(qū)、分布式、可拓展性強


三、Kafka 的消息分幾個層次

(1)Topic 一類主題

(2)Partition 默認(rèn)每個消息有2個分區(qū),創(chuàng)建Topic可以指定分區(qū)數(shù),1天有 1億行可以分8個分區(qū),如果每天幾十萬行就一個分區(qū)吧

(3)Message 是每個消息


四、數(shù)據(jù)處理流程

1.生產(chǎn)者 生產(chǎn)消息、將消息發(fā)布到指定的topic分區(qū)

2.kafka 集群接收到producer發(fā)過來的消息后,將其持久化到硬盤,可以指定時長,而不關(guān)注消息是否被消費

3.consumer從kafka集群pull或push方式,并控制獲取消息的offset偏移量,consumer重啟時需要根據(jù)offset開始再次消費數(shù)據(jù),consumer自己維護(hù)offset


五、kafka如何實現(xiàn)高吞吐量

1.充分利用磁盤的順序讀寫2.數(shù)據(jù)批量發(fā)送3.數(shù)據(jù)壓縮4.Topic劃分多個partition


六、kafka 如何實現(xiàn)load balance &HA

1)producer 根據(jù)用戶指定的算法,將消息發(fā)送到指定的partition2)存在多個partition,每個partition存在多個副本replica,每個replica分布在不同的broker節(jié)點上3)每個partition需要選取lead partition,leader partition負(fù)責(zé)讀寫,并由zookeeper負(fù)責(zé)fail over 快速失敗4)通過zookeeper管理broker與consumer的動態(tài)加入與離開


七、擴(kuò)容

當(dāng)需要增加broker節(jié)點時,新增的broker會向zookeeper注冊,而producer及consumer會根據(jù)zookeeper上的watcher感知這些變化,并及時作出調(diào)整

 

副本分配邏輯規(guī)則如下:

  • 在Kafka集群中,每個Broker都有均等分配Partition的Leader機(jī)會。
  • 上述圖Broker      Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。
  • 上述圖種每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環(huán)迭代分配,多副本都遵循此規(guī)則。

 

副本分配算法如下:

  • 將所有N      Broker和待分配的i個Partition排序.
  • 將第i個Partition分配到第(i mod      n)個Broker上.
  • 將第i個Partition的第j個副本分配到第((i +      j) mod n)個Broker上.

 

二、安裝zookeeper,并配置集群

準(zhǔn)備三臺機(jī)器做集群

服務(wù)器

IP地址

端口

服務(wù)器1

172.16.0.41

2181/2881/3881

服務(wù)器2

172.16.0.42

2182/2882/3882

服務(wù)器3

172.16.0.43

2183/2883/3883

2.1配置java環(huán)境

將jdk-7u79-linux-x64上傳到三臺服務(wù)器安裝配置。

給三臺服務(wù)器分別創(chuàng)建java文件夾。

將jdk 放到j(luò)ava文件夾下并解壓,然后刪掉壓縮文件。

配置jdk全局變量。

#vi /etc/profile

export JAVA_HOME=/usr/local/java/jdk1.7.0_79

export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH

export PATH=$JAVA_HOME/bin:$PATH


2.2 修改操作系統(tǒng)的/etc/hosts文件,添加IP與主機(jī)名映射:

   # zookeeper cluster servers

172.16.0.41 edu-zk-01

172.16.0.42 edu-zk-02

172.16.0.43 edu-zk-03


2.3下載zookeeper-3.4.7.tar.gz 到/home/zy/zookeeper目錄

# mkdir -p /usr/local/zookeeper

# cd / usr/local/zookeeper/

# wget http://apache.fayea.com/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz


2.4 解壓zookeeper安裝包,并對節(jié)點重民名

#tar -zxvf zookeeper-3.4.7.tar.gz

服務(wù)器1:

#mv zookeeper-3.4.7 node-01

服務(wù)器2:    #mv zookeeper-3.4.7 node-02

服務(wù)器3:

#mv zookeeper-3.4.7 node-03


2.5 在zookeeper的各個節(jié)點下 創(chuàng)建數(shù)據(jù)和日志目錄

#cd /usr/local/zookeeper

#mkdir data

#mkdir logs


2.6 重命名配置文件

    將zookeeper/node-0X/conf目錄下的zoo_sample.cfg文件拷貝一份,命名為zoo.cfg:

#cp zoo_sample.cfg zoo.cfg


2.7 修改zoo.cfg 配置文件

三臺服務(wù)器做同樣配置:zookeeper/node-01的配置(/usr/local/zookeeper/node-01/conf/zoo.cfg)如下:

images/PB6HXCyF82SxJfsQT6ThjMbKAD74ewGA.png

參數(shù)說明:

tickTime=2000

tickTime這個時間是作為Zookeeper服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時間間隔,也就是每個tickTime時間就會發(fā)送一個心跳。

initLimit=10

initLimit這個配置項是用來配置Zookeeper接受客戶端(這里所說的客戶端不是用戶連接Zookeeper服務(wù)器的客戶端,而是Zookeeper服務(wù)器集群中連接到Leader的Follower 服務(wù)器)初始化連接時最長能忍受多少個心跳時間間隔數(shù)。當(dāng)已經(jīng)超過10個心跳的時間(也就是tickTime)長度后Zookeeper 服務(wù)器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗??偟臅r間長度就是10*2000=20 秒。

syncLimit=5

syncLimit這個配置項標(biāo)識Leader與Follower之間發(fā)送消息,請求和應(yīng)答時間長度,最長不能超過多少個tickTime的時間長度,總的時間長度就是5*2000=10秒。

dataDir=/usr/local/zookeeper/node-01/data

dataDir顧名思義就是Zookeeper保存數(shù)據(jù)的目錄,默認(rèn)情況下Zookeeper將寫數(shù)據(jù)的日志文件也保存在這個目錄里。

clientPort=2181

clientPort這個端口就是客戶端(應(yīng)用程序)連接Zookeeper服務(wù)器的端口,Zookeeper會監(jiān)聽這個端口接受客戶端的訪問請求。

server.A=B:C:D

server.1=edu-zk-01:2881:3881

server.2=edu-zk-02:2882:3882

server.3=edu-zk-03:2883:3883

A是一個數(shù)字,表示這個是第幾號服務(wù)器;

B是這個服務(wù)器的IP地址(或者是與IP地址做了映射的主機(jī)名);

C第一個端口用來集群成員的信息交換,表示這個服務(wù)器與集群中的Leader服務(wù)器交換信息的端口;

D是在leader掛掉時專門用來進(jìn)行選舉leader所用的端口。

注意:如果是偽集群的配置方式,不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號。


2.8 創(chuàng)建myid文件

在dataDir=/usr/local/zookeeper/node-0X/data 下創(chuàng)建myid文件

編輯myid文件,并在對應(yīng)的IP的機(jī)器上輸入對應(yīng)的編號。如在node-01上,myid文件內(nèi)容就是1,node-02上就是2,node-03上就是3:

#vi /usr/local/zookeeper/node-01/data/myid## 值為1

#vi /usr/local/zookeeper/node-02/data/myid## 值為2

#vi /usr/local/zookeeper/node-03/data/myid## 值為3


2.9 啟動測試zookeeper

(1)進(jìn)入/usr/local/zookeeper/node-0X/bin目錄下執(zhí)行:

#/usr/local/zookeeper/node-01/bin/zkServer.sh start

#/usr/local/zookeeper/node-02/bin/zkServer.sh start

#/usr/local/zookeeper/node-03/bin/zkServer.sh start

images/RJBD8tnKFe8EQ6nNJX7wrTWKF3FeBbTK.png

(2)輸入jps命令查看進(jìn)程:

  images/BwddsBjXh2tnwNGCC6Xw8H8mEMDjBzht.png

其中,QuorumPeerMain是zookeeper進(jìn)程,說明啟動正常

(3)查看狀態(tài):

   # /usr/local/zookeeper/node-01/bin/zkServer.sh status

images/JTMDFQ5Sa26YaEpQ8N7XXTaQsd2f3fWN.png

 (4)查看zookeeper服務(wù)輸出信息:

 由于服務(wù)信息輸出文件在/usr/local/zookeeper/node-0X/bin/zookeeper.out

$ tail -500f zookeeper.out

images/C8CXnXpXnxnBFNmaZhSzjFtcKSfHwZ3T.png


三、KAFKA集群配置

利用安裝zookeeper的三臺服務(wù)器做KAFKA集群,也可以新建三個虛擬機(jī)去操作。

服務(wù)器

IP地址

端口

服務(wù)器1

172.16.0.41

9092

服務(wù)器2

172.16.0.42

9092

服務(wù)器3

172.16.0.43

9092


4.1 下載 kafka_2.9.2-0.8.1

分別在三臺服務(wù)器創(chuàng)建kafka目錄并且下載kafka壓縮包

#mkdir /usr/local/kafka

#tar –zxvf kafka_2.9.2-0.8.1.tar.gz


4.2 創(chuàng)建log文件夾

#mkdir /usr/local/kafka/kafkalogs


4.3 配置kafka

#cd /usr/local/kafka/kafka_2.9.2-0.8.1/config

#vi server.properties  修改項如下:

broker.id=0      //當(dāng)前機(jī)器在集群中的唯一標(biāo)識

port=9092       //kafka對外提供服務(wù)的tcp端口

host.name=172.16.0.41    //主機(jī)IP地址

log.dirs=/usr/local/kafka/kafkalogs    //log存放目錄

message.max.byte=5048576     //kafka一條消息容納的消息最大為多少

default.replication.factor=2   //每個分區(qū)默認(rèn)副本數(shù)量

replica.fetch.max.bytes=5048576  

zookeeper.connect=172.16.0.41:2181,172.16.0.42:2182,172.16.0.43:2183


4.4 啟動kafka

# ./kafka-server-start.sh  -daemon ../config/server.properties   //后臺啟動運行


4.5 問題解決

[root@master ~]#  /export/kafka/bin/kafka-console-producer.sh  --broker-list 10.14.2.201:9092,10.14.2.202:9092,10.14.2.203:9092,10.14.2.204:9092    --topic test

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

 

 # /export/kafka/bin/kafka-console-consumer.sh  --zookeeper   10.14.2.201:2181,10.14.2.202:2181,10.14.2.203:2181,10.14.2.204:2181  --topic test --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解決方法:

 下載slf4j-1.7.6.zip

 http://www.slf4j.org/dist/slf4j-1.7.6.zip

 解壓

 unzip slf4j-1.7.6.zip

 把slf4j-nop-1.7.6.jar 包復(fù)制到kafka libs目錄下面

 cd  slf4j-1.7.6

 cp slf4j-nop-1.7.6.jar  /export/kafka/libs/


四、KAFKA集群驗證

5.1 創(chuàng)建topic

#./kafka-topics.sh --create --zookeeper 172.16.0.42:2182 --replication-factor 1 --partitions 1 --topic test


5.2 查看topic

# ./kafka-topics.sh --list --zookeeper 172.16.0.42:2182


5.3 開啟發(fā)送者并發(fā)送消息

#./kafka-console-producer.sh --broker-list 172.16.0.41:9092 --topic test

images/58wEcGsxTb2RNb7ynYhnwtksGApXPRa6.png


5.4 開啟消費者并接收消息

#./kafka-console-consumer.sh --zookeeper 172.16.0.42:2182 --topic test --from-beginning

images/d5dS3yndAsHswEFwctMdMHxmFH6wYFYz.png


更參考內(nèi)容:http://www.roncoo.com/article/index.html?title=%E6%9E%B6%E6%9E%84%E5%B8%88%E4%B9%8B%E8%B7%AF


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號