首先讓我們開始實(shí)現(xiàn)單節(jié)點(diǎn)單代理
配置,然后我們將我們的設(shè)置遷移到單節(jié)點(diǎn)多代理配置。
希望你現(xiàn)在可以在你的機(jī)器上安裝 Java,ZooKeeper 和 Kafka 。 在遷移到 Kafka Cluster Setup
之前,首先需要啟動(dòng) ZooKeeper,因?yàn)?Kafka Cluster
使用 ZooKeeper。
打開一個(gè)新終端并鍵入以下命令 -
bin/zookeeper-server-start.sh config/zookeeper.properties
要啟動(dòng) Kafka Broker
,請(qǐng)鍵入以下命令 -
bin/kafka-server-start.sh config/server.properties
啟動(dòng) Kafka Broker
后,在 ZooKeeper 終端上鍵入命令
,您將看到以下響應(yīng) -
jps
821 QuorumPeerMain 928 Kafka 931 Jps
現(xiàn)在你可以看到兩個(gè)守護(hù)進(jìn)程運(yùn)行在終端上,QuorumPeerMain
是 ZooKeeper 守護(hù)進(jìn)程,另一個(gè)是 Kafka 守護(hù)進(jìn)程。
單節(jié)點(diǎn) - 單代理配置
在此配置中,您有一個(gè) ZooKeeper 和代理 id 實(shí)例。 以下是配置它的步驟 -
創(chuàng)建 Kafka
主題 - Kafka
提供了一個(gè)名為 kafka-topics.sh
的命令行實(shí)用程序,用于在服務(wù)器上創(chuàng)建主題。 打開新終端并鍵入以下示例。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
我們剛剛創(chuàng)建了一個(gè)名為 Hello-Kafka
的主題,其中包含一個(gè)分區(qū)和一個(gè)副本因子。 上面創(chuàng)建的輸出將類似于以下輸出 -
輸出 - 創(chuàng)建主題 Hello-Kafka
創(chuàng)建主題后,您可以在 Kafka
代理終端窗口中獲取通知,并在 config / server.properties
文件中的“/ tmp / kafka-logs /"
中指定的創(chuàng)建主題的日志。
主題列表
要獲取 Kafka
服務(wù)器中的主題列表,可以使用以下命令 -
語法
bin/kafka-topics.sh --list --zookeeper localhost:2181
輸出
Hello-Kafka
由于我們已經(jīng)創(chuàng)建了一個(gè)主題,它將僅列出 Hello-Kafka
。 假設(shè),如果創(chuàng)建多個(gè)主題,您將在輸出中獲取主題名稱。
啟動(dòng)生產(chǎn)者以發(fā)送消息
語法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
從上面的語法,生產(chǎn)者命令行客戶端需要兩個(gè)主要參數(shù) -
代理列表 - 我們要發(fā)送郵件的代理列表。 在這種情況下,我們只有一個(gè)代理。 Config / server.properties
文件包含代理端口 ID,因?yàn)槲覀冎牢覀兊拇碚趥陕牰丝?9092,因此您可以直接指定它。
主題名稱 - 以下是主題名稱的示例。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生產(chǎn)者將等待來自 stdin
的輸入并發(fā)布到 Kafka
集群。 默認(rèn)情況下,每個(gè)新行都作為新消息發(fā)布,然后在 config / producer.properties
文件中指定默認(rèn)生產(chǎn)者屬性。 現(xiàn)在,您可以在終端中鍵入幾行消息,如下所示。
輸出
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
啟動(dòng)消費(fèi)者以接收消息
與生產(chǎn)者類似,在config / consumer.proper-ties
文件中指定了缺省使用者屬性。 打開一個(gè)新終端并鍵入以下消息消息語法。
語法
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic topic-name
--from-beginning
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic Hello-Kafka
--from-beginning
輸出
Hello
My first message
My second message
最后,您可以從制作商的終端輸入消息,并看到他們出現(xiàn)在消費(fèi)者的終端。 到目前為止,您對(duì)具有單個(gè)代理的單節(jié)點(diǎn)群集有非常好的了解。 現(xiàn)在讓我們繼續(xù)討論多個(gè)代理配置。
單節(jié)點(diǎn)多代理配置
在進(jìn)入多個(gè)代理集群設(shè)置之前,首先啟動(dòng) ZooKeeper 服務(wù)器。
創(chuàng)建多個(gè)
中已有一個(gè) Kafka Brokers
- 我們?cè)谂渲?code>/ server.propertiesKafka
代理實(shí)例。 現(xiàn)在我們需要多個(gè)代理實(shí)例,因此將現(xiàn)有的 server.properties
文件復(fù)制到兩個(gè)新的配置文件中,并將其重命名為 server-one.properties
和 server-two.properties
。 然后編輯這兩個(gè)新文件并分配以下更改 -
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
啟動(dòng)多個(gè)代理 - 在三臺(tái)服務(wù)器上進(jìn)行所有更改后,打開三個(gè)新終端,逐個(gè)啟動(dòng)每個(gè)代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties
現(xiàn)在我們有三個(gè)不同的經(jīng)紀(jì)人在機(jī)器上運(yùn)行。 自己嘗試,通過在 ZooKeeper 終端上鍵入 jps
檢查所有守護(hù)程序,然后您將看到響應(yīng)。
讓我們?yōu)榇酥黝}將復(fù)制因子值指定為三個(gè),因?yàn)槲覀冇腥齻€(gè)不同的代理運(yùn)行。 如果您有兩個(gè)代理,那么分配的副本值將是兩個(gè)。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
輸出
created topic “Multibrokerapplication"
Describe
命令用于檢查哪個(gè)代理正在偵聽當(dāng)前創(chuàng)建的主題,如下所示 -
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
輸出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
從上面的輸出,我們可以得出結(jié)論,第一行給出所有分區(qū)的摘要,顯示主題名稱,分區(qū)數(shù)量和我們已經(jīng)選擇的復(fù)制因子。 在第二行中,每個(gè)節(jié)點(diǎn)將是分區(qū)的隨機(jī)選擇部分的領(lǐng)導(dǎo)者。
在我們的例子中,我們看到我們的第一個(gè) broker(with broker.id 0)
是領(lǐng)導(dǎo)者。 然后 Replicas:0,2,1
意味著所有代理復(fù)制主題最后
是Isr
副本的集合。 那么,這是副本的子集,當(dāng)前活著并被領(lǐng)導(dǎo)者趕上。
in-sync
此過程保持與單代理設(shè)置中相同。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
輸出
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message
此過程保持與單代理設(shè)置中所示的相同。
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning
輸出
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message
在本章中,我們將討論各種基本主題操作。
您已經(jīng)了解如何在 Kafka Cluster
中創(chuàng)建主題。 現(xiàn)在讓我們使用以下命令修改已創(chuàng)建的主題
語法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
示例
We have already created a topic “Hello-Kafka" with single partition count and one replica factor. Now using “alter" command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2
輸出
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
要?jiǎng)h除主題,可以使用以下語法。
語法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
示例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
輸出
> Topic Hello-kafka marked for deletion
注意 - 如果 delete.topic.enable
未設(shè)置為 true
,則此操作不會(huì)產(chǎn)生任何影響
更多建議: