Apache Kafka 基本操作

2023-03-15 15:44 更新

首先讓我們開始實(shí)現(xiàn)單節(jié)點(diǎn)單代理配置,然后我們將我們的設(shè)置遷移到單節(jié)點(diǎn)多代理配置。

希望你現(xiàn)在可以在你的機(jī)器上安裝 Java,ZooKeeper 和 Kafka 。 在遷移到 Kafka Cluster Setup 之前,首先需要啟動(dòng) ZooKeeper,因?yàn)?Kafka Cluster 使用 ZooKeeper。

啟動(dòng)ZooKeeper

打開一個(gè)新終端并鍵入以下命令 -

bin/zookeeper-server-start.sh config/zookeeper.properties

要啟動(dòng) Kafka Broker,請(qǐng)鍵入以下命令 -

bin/kafka-server-start.sh config/server.properties

啟動(dòng) Kafka Broker后,在 ZooKeeper 終端上鍵入命令 jps ,您將看到以下響應(yīng) -

821 QuorumPeerMain
928 Kafka
931 Jps

現(xiàn)在你可以看到兩個(gè)守護(hù)進(jìn)程運(yùn)行在終端上,QuorumPeerMain 是 ZooKeeper 守護(hù)進(jìn)程,另一個(gè)是 Kafka 守護(hù)進(jìn)程。

單節(jié)點(diǎn) - 單代理配置

在此配置中,您有一個(gè) ZooKeeper 和代理 id 實(shí)例。 以下是配置它的步驟 -

創(chuàng)建 Kafka 主題 - Kafka 提供了一個(gè)名為 kafka-topics.sh 的命令行實(shí)用程序,用于在服務(wù)器上創(chuàng)建主題。 打開新終端并鍵入以下示例。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

我們剛剛創(chuàng)建了一個(gè)名為 Hello-Kafka 的主題,其中包含一個(gè)分區(qū)和一個(gè)副本因子。 上面創(chuàng)建的輸出將類似于以下輸出 -

輸出 - 創(chuàng)建主題 Hello-Kafka

創(chuàng)建主題后,您可以在 Kafka 代理終端窗口中獲取通知,并在 config / server.properties 文件中的“/ tmp / kafka-logs /"中指定的創(chuàng)建主題的日志。

主題列表

要獲取 Kafka 服務(wù)器中的主題列表,可以使用以下命令 -

語法

bin/kafka-topics.sh --list --zookeeper localhost:2181

輸出

Hello-Kafka

由于我們已經(jīng)創(chuàng)建了一個(gè)主題,它將僅列出 Hello-Kafka 。 假設(shè),如果創(chuàng)建多個(gè)主題,您將在輸出中獲取主題名稱。

啟動(dòng)生產(chǎn)者以發(fā)送消息

語法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

從上面的語法,生產(chǎn)者命令行客戶端需要兩個(gè)主要參數(shù) -

代理列表 - 我們要發(fā)送郵件的代理列表。 在這種情況下,我們只有一個(gè)代理。 Config / server.properties 文件包含代理端口 ID,因?yàn)槲覀冎牢覀兊拇碚趥陕牰丝?9092,因此您可以直接指定它。

主題名稱 - 以下是主題名稱的示例。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生產(chǎn)者將等待來自 stdin 的輸入并發(fā)布到 Kafka 集群。 默認(rèn)情況下,每個(gè)新行都作為新消息發(fā)布,然后在 config / producer.properties 文件中指定默認(rèn)生產(chǎn)者屬性。 現(xiàn)在,您可以在終端中鍵入幾行消息,如下所示。

輸出

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

啟動(dòng)消費(fèi)者以接收消息

與生產(chǎn)者類似,在config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個(gè)新終端并鍵入以下消息消息語法。

語法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic topic-name 
--from-beginning

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —-topic Hello-Kafka 
--from-beginning

輸出

Hello
My first message
My second message

最后,您可以從制作商的終端輸入消息,并看到他們出現(xiàn)在消費(fèi)者的終端。 到目前為止,您對(duì)具有單個(gè)代理的單節(jié)點(diǎn)群集有非常好的了解。 現(xiàn)在讓我們繼續(xù)討論多個(gè)代理配置。

單節(jié)點(diǎn)多代理配置

在進(jìn)入多個(gè)代理集群設(shè)置之前,首先啟動(dòng) ZooKeeper 服務(wù)器。

創(chuàng)建多個(gè)Kafka Brokers - 我們?cè)谂渲?code>/ server.properties 中已有一個(gè) Kafka 代理實(shí)例。 現(xiàn)在我們需要多個(gè)代理實(shí)例,因此將現(xiàn)有的 server.properties 文件復(fù)制到兩個(gè)新的配置文件中,并將其重命名為 server-one.propertiesserver-two.properties。 然后編輯這兩個(gè)新文件并分配以下更改 -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

啟動(dòng)多個(gè)代理 - 在三臺(tái)服務(wù)器上進(jìn)行所有更改后,打開三個(gè)新終端,逐個(gè)啟動(dòng)每個(gè)代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

現(xiàn)在我們有三個(gè)不同的經(jīng)紀(jì)人在機(jī)器上運(yùn)行。 自己嘗試,通過在 ZooKeeper 終端上鍵入 jps 檢查所有守護(hù)程序,然后您將看到響應(yīng)。

創(chuàng)建主題

讓我們?yōu)榇酥黝}將復(fù)制因子值指定為三個(gè),因?yàn)槲覀冇腥齻€(gè)不同的代理運(yùn)行。 如果您有兩個(gè)代理,那么分配的副本值將是兩個(gè)。

語法

bin/kafka-topics.sh 
--create 
--zookeeper localhost:2181 
--replication-factor 3 
-partitions 1 
--topic topic-name

示例

bin/kafka-topics.sh 
--create 
--zookeeper localhost:2181 
--replication-factor 3 
-partitions 1 
--topic Multibrokerapplication

輸出

created topic “Multibrokerapplication"

Describe 命令用于檢查哪個(gè)代理正在偵聽當(dāng)前創(chuàng)建的主題,如下所示 -

bin/kafka-topics.sh 
--describe 
--zookeeper localhost:2181 
--topic Multibrokerappli-cation

輸出

bin/kafka-topics.sh 
--describe 
--zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

從上面的輸出,我們可以得出結(jié)論,第一行給出所有分區(qū)的摘要,顯示主題名稱,分區(qū)數(shù)量和我們已經(jīng)選擇的復(fù)制因子。 在第二行中,每個(gè)節(jié)點(diǎn)將是分區(qū)的隨機(jī)選擇部分的領(lǐng)導(dǎo)者。

在我們的例子中,我們看到我們的第一個(gè) broker(with broker.id 0)是領(lǐng)導(dǎo)者。 然后 Replicas:0,2,1 意味著所有代理復(fù)制主題最后 Isr in-sync 副本的集合。 那么,這是副本的子集,當(dāng)前活著并被領(lǐng)導(dǎo)者趕上。

啟動(dòng)生產(chǎn)者以發(fā)送消息

此過程保持與單代理設(shè)置中相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

輸出

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

啟動(dòng)消費(fèi)者以接收消息

此過程保持與單代理設(shè)置中所示的相同。

示例

bin/kafka-console-consumer.sh 
--zookeeper localhost:2181 
—topic Multibrokerapplica-tion 
--from-beginning

輸出

bin/kafka-console-consumer.sh 
--zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本主題操作

在本章中,我們將討論各種基本主題操作。

修改主題

您已經(jīng)了解如何在 Kafka Cluster 中創(chuàng)建主題。 現(xiàn)在讓我們使用以下命令修改已創(chuàng)建的主題

語法

bin/kafka-topics.sh 
—zookeeper localhost:2181 
--alter 
--topic topic_name 
--parti-tions count

示例

We have already created a topic “Hello-Kafka" with single partition count and one replica factor. 
Now using “alter" command we have changed the partition count.
bin/kafka-topics.sh 
--zookeeper localhost:2181 
--alter 
--topic Hello-kafka 
--parti-tions 2

輸出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

刪除主題

要?jiǎng)h除主題,可以使用以下語法。

語法

bin/kafka-topics.sh 
--zookeeper localhost:2181 
--delete 
--topic topic_name

示例

bin/kafka-topics.sh 
--zookeeper localhost:2181 
--delete 
--topic Hello-kafka

輸出

> Topic Hello-kafka marked for deletion

注意 - 如果 delete.topic.enable 未設(shè)置為 true,則此操作不會(huì)產(chǎn)生任何影響



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)