Apache Kafka 快速指南

2022-02-09 14:18 更新

Apache Kafka - 簡介

在大數(shù)據(jù)中,使用了大量的數(shù)據(jù)。 關(guān)于數(shù)據(jù),我們有兩個主要挑戰(zhàn)。第一個挑戰(zhàn)是如何收集大量的數(shù)據(jù),第二個挑戰(zhàn)是分析收集的數(shù)據(jù)。 為了克服這些挑戰(zhàn),您必須需要一個消息系統(tǒng)。

Kafka專為分布式高吞吐量系統(tǒng)而設(shè)計(jì)。 Kafka往往工作得很好,作為一個更傳統(tǒng)的消息代理的替代品。 與其他消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和固有的容錯能力,這使得它非常適合大規(guī)模消息處理應(yīng)用程序。

什么是消息系統(tǒng)?

消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個應(yīng)用程序傳輸?shù)搅硪粋€應(yīng)用程序,因此應(yīng)用程序可以專注于數(shù)據(jù),但不擔(dān)心如何共享它。 分布式消息傳遞基于可靠消息隊(duì)列的概念。 消息在客戶端應(yīng)用程序和消息傳遞系統(tǒng)之間異步排隊(duì)。 有兩種類型的消息模式可用 - 一種是點(diǎn)對點(diǎn),另一種是發(fā)布 - 訂閱(pub-sub)消息系統(tǒng)。 大多數(shù)消息模式遵循 pub-sub 。

點(diǎn)對點(diǎn)消息系統(tǒng)

在點(diǎn)對點(diǎn)系統(tǒng)中,消息被保留在隊(duì)列中。 一個或多個消費(fèi)者可以消耗隊(duì)列中的消息,但是特定消息只能由最多一個消費(fèi)者消費(fèi)。 一旦消費(fèi)者讀取隊(duì)列中的消息,它就從該隊(duì)列中消失。 該系統(tǒng)的典型示例是訂單處理系統(tǒng),其中每個訂單將由一個訂單處理器處理,但多個訂單處理器也可以同時(shí)工作。 下圖描述了結(jié)構(gòu)。

point-to-point Messaging system

發(fā)布 - 訂閱消息系統(tǒng)

在發(fā)布 - 訂閱系統(tǒng)中,消息被保留在主題中。 與點(diǎn)對點(diǎn)系統(tǒng)不同,消費(fèi)者可以訂閱一個或多個主題并使用該主題中的所有消息。 在發(fā)布 - 訂閱系統(tǒng)中,消息生產(chǎn)者稱為發(fā)布者,消息使用者稱為訂閱者。 一個現(xiàn)實(shí)生活的例子是Dish電視,它發(fā)布不同的渠道,如運(yùn)動,電影,音樂等,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時(shí)可用。

Publish-Subscribe Messaging system

什么是Kafka?

Apache Kafka是一個分布式發(fā)布 - 訂閱消息系統(tǒng)和一個強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使您能夠?qū)⑾囊粋€端點(diǎn)傳遞到另一個端點(diǎn)。 Kafka適合離線和在線消息消費(fèi)。 Kafka消息保留在磁盤上,并在群集內(nèi)復(fù)制以防止數(shù)據(jù)丟失。 Kafka構(gòu)建在ZooKeeper同步服務(wù)之上。 它與Apache Storm和Spark非常好地集成,用于實(shí)時(shí)流式數(shù)據(jù)分析。

好處

以下是Kafka的幾個好處 -

  • 可靠性 - Kafka是分布式,分區(qū),復(fù)制和容錯的。

  • 可擴(kuò)展性 - Kafka消息傳遞系統(tǒng)輕松縮放,無需停機(jī)。

  • 耐用性 - Kafka使用分布式提交日志,這意味著消息會盡可能快地保留在磁盤上,因此它是持久的。

  • 性能 - Kafka對于發(fā)布和訂閱消息都具有高吞吐量。 即使存儲了許多TB的消息,它也保持穩(wěn)定的性能。

Kafka非??欤⒈WC零停機(jī)和零數(shù)據(jù)丟失。

用例

Kafka可以在許多用例中使用。 其中一些列出如下 -

  • 指標(biāo) - Kafka通常用于操作監(jiān)控?cái)?shù)據(jù)。 這涉及聚合來自分布式應(yīng)用程序的統(tǒng)計(jì)信息,以產(chǎn)生操作數(shù)據(jù)的集中饋送。

  • 日志聚合解決方案 - Kafka可用于跨組織從多個服務(wù)收集日志,并使它們以標(biāo)準(zhǔn)格式提供給多個服務(wù)器。

  • 流處理 - 流行的框架(如Storm和Spark Streaming)從主題中讀取數(shù)據(jù),對其進(jìn)行處理,并將處理后的數(shù)據(jù)寫入新主題,供用戶和應(yīng)用程序使用。 Kafka的強(qiáng)耐久性在流處理的上下文中也非常有用。

需要Kafka

Kafka是一個統(tǒng)一的平臺,用于處理所有實(shí)時(shí)數(shù)據(jù)Feed。 Kafka支持低延遲消息傳遞,并在出現(xiàn)機(jī)器故障時(shí)提供對容錯的保證。 它具有處理大量不同消費(fèi)者的能力。 Kafka非常快,執(zhí)行2百萬寫/秒。 Kafka將所有數(shù)據(jù)保存到磁盤,這實(shí)質(zhì)上意味著所有寫入都會進(jìn)入操作系統(tǒng)(RAM)的頁面緩存。 這使得將數(shù)據(jù)從頁面緩存?zhèn)鬏數(shù)骄W(wǎng)絡(luò)套接字非常有效。

Apache Kafka - 基礎(chǔ)

在深入學(xué)習(xí)Kafka之前,您必須了解主題,經(jīng)紀(jì)人,生產(chǎn)者和消費(fèi)者等主要術(shù)語。 下圖說明了主要術(shù)語,表格詳細(xì)描述了圖表組件。

Fundamentals

在上圖中,主題配置為三個分區(qū)。 分區(qū)1具有兩個偏移因子0和1.分區(qū)2具有四個偏移因子0,1,2和3.分區(qū)3具有一個偏移因子0.副本的id與承載它的服務(wù)器的id相同。

假設(shè),如果主題的復(fù)制因子設(shè)置為3,那么Kafka將創(chuàng)建每個分區(qū)的3個相同的副本,并將它們放在集群中以使其可用于其所有操作。 為了平衡集群中的負(fù)載,每個代理都存儲一個或多個這些分區(qū)。 多個生產(chǎn)者和消費(fèi)者可以同時(shí)發(fā)布和檢索消息。

S.No組件和說明
1

Topics(主題)

屬于特定類別的消息流稱為主題。 數(shù)據(jù)存儲在主題中。

2

Partition(分區(qū))

主題被拆分成分區(qū)。 對于每個主題,Kafka保存一個分區(qū)的數(shù)據(jù)。 每個這樣的分區(qū)包含不可變有序序列的消息。 分區(qū)被實(shí)現(xiàn)為具有相等大小的一組分段文件。

主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。

3

Partition offset(分區(qū)偏移)

每個分區(qū)消息具有稱為 offset 的唯一序列標(biāo)識。

4

Replicas of partition(分區(qū)備份)

副本只是一個分區(qū)的備份。 副本從不讀取或?qū)懭霐?shù)據(jù)。 它們用于防止數(shù)據(jù)丟失。

5

Brokers(經(jīng)紀(jì)人)

  • 代理是負(fù)責(zé)維護(hù)發(fā)布數(shù)據(jù)的簡單系統(tǒng)。 每個代理可以每個主題具有零個或多個分區(qū)。 假設(shè),如果在一個主題和N個代理中有N個分區(qū),每個代理將有一個分區(qū)。

  • 假設(shè)在一個主題中有N個分區(qū)并且多于N個代理(n + m),則第一個N代理將具有一個分區(qū),并且下一個M代理將不具有用于該特定主題的任何分區(qū)。

  • 假設(shè)在一個主題中有N個分區(qū)并且小于N個代理(n-m),每個代理將在它們之間具有一個或多個分區(qū)共享。 由于代理之間的負(fù)載分布不相等,不推薦使用此方案。

6

Kafka Cluster(Kafka集群

Kafka有多個代理被稱為Kafka集群。 可以擴(kuò)展Kafka集群,無需停機(jī)。 這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制。

7

Producers(生產(chǎn)者)

生產(chǎn)者是發(fā)送給一個或多個Kafka主題的消息的發(fā)布者。 生產(chǎn)者向Kafka經(jīng)紀(jì)人發(fā)送數(shù)據(jù)。 每當(dāng)生產(chǎn)者將消息發(fā)布給代理時(shí),代理只需將消息附加到最后一個段文件。實(shí)際上,該消息將被附加到分區(qū)。 生產(chǎn)者還可以向他們選擇的分區(qū)發(fā)送消息。

8

Consumers(消費(fèi)者)

Consumers從經(jīng)紀(jì)人處讀取數(shù)據(jù)。 消費(fèi)者訂閱一個或多個主題,并通過從代理中提取數(shù)據(jù)來使用已發(fā)布的消息。

9

Leader(領(lǐng)導(dǎo)者)

 Leader 是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。 每個分區(qū)都有一個服務(wù)器充當(dāng)Leader
。

10

Follower(追隨者)

跟隨領(lǐng)導(dǎo)者指令的節(jié)點(diǎn)被稱為Follower。 如果領(lǐng)導(dǎo)失敗,一個追隨者將自動成為新的領(lǐng)導(dǎo)者。 跟隨者作為正常消費(fèi)者,拉取消息并更新其自己的數(shù)據(jù)存儲。


Apache Kafka - 集群架構(gòu)

看看下面的插圖。 它顯示Kafka的集群圖。

Cluster Architecture

下表描述了上圖中顯示的每個組件。

S.No組件和說明
1

Broker代理

Kafka集群通常由多個代理組成以保持負(fù)載平衡。 Kafka代理是無狀態(tài)的,所以他們使用ZooKeeper來維護(hù)它們的集群狀態(tài)。 一個Kafka代理實(shí)例可以每秒處理數(shù)十萬次讀取和寫入,每個Broker可以處理TB的消息,而沒有性能影響。 Kafka經(jīng)紀(jì)人領(lǐng)導(dǎo)選舉可以由ZooKeeper完成。

2

ZooKeeper

ZooKeeper用于管理和協(xié)調(diào)Kafka代理。 ZooKeeper服務(wù)主要用于通知生產(chǎn)者和消費(fèi)者Kafka系統(tǒng)中存在任何新代理或Kafka系統(tǒng)中代理失敗。 根據(jù)Zookeeper接收到關(guān)于代理的存在或失敗的通知,然后產(chǎn)品和消費(fèi)者采取決定并開始與某些其他代理協(xié)調(diào)他們的任務(wù)。

3

Producers(制片人

生產(chǎn)者將數(shù)據(jù)推送給經(jīng)紀(jì)人。 當(dāng)新代理啟動時(shí),所有生產(chǎn)者搜索它并自動向該新代理發(fā)送消息。 Kafka生產(chǎn)者不等待來自代理的確認(rèn),并且發(fā)送消息的速度與代理可以處理的一樣快。

4

Consumers(消費(fèi)者

因?yàn)镵afka代理是無狀態(tài)的,這意味著消費(fèi)者必須通過使用分區(qū)偏移來維護(hù)已經(jīng)消耗了多少消息。 如果消費(fèi)者確認(rèn)特定的消息偏移,則意味著消費(fèi)者已經(jīng)消費(fèi)了所有先前的消息。 消費(fèi)者向代理發(fā)出異步拉取請求,以具有準(zhǔn)備好消耗的字節(jié)緩沖區(qū)。 消費(fèi)者可以簡單地通過提供偏移值來快退或跳到分區(qū)中的任何點(diǎn)。 消費(fèi)者偏移值由ZooKeeper通知。

Apache Kafka - WorkFlow

到目前為止,我們討論了Kafka的核心概念。 讓我們現(xiàn)在來看一下Kafka的工作流程。

Kafka只是分為一個或多個分區(qū)的主題的集合。 Kafka分區(qū)是消息的線性有序序列,其中每個消息由它們的索引(稱為偏移)來標(biāo)識。 Kafka集群中的所有數(shù)據(jù)都是不相連的分區(qū)聯(lián)合。 傳入消息寫在分區(qū)的末尾,消息由消費(fèi)者順序讀取。 通過將消息復(fù)制到不同的代理提供持久性。

Kafka以快速,可靠,持久,容錯和零停機(jī)的方式提供基于pub-sub和隊(duì)列的消息系統(tǒng)。 在這兩種情況下,生產(chǎn)者只需將消息發(fā)送到主題,消費(fèi)者可以根據(jù)自己的需要選擇任何一種類型的消息傳遞系統(tǒng)。 讓我們按照下一節(jié)中的步驟來了解消費(fèi)者如何選擇他們選擇的消息系統(tǒng)。

發(fā)布 - 訂閱消息的工作流程

以下是Pub-Sub消息的逐步工作流程 -

  • 生產(chǎn)者定期向主題發(fā)送消息。

  • Kafka代理存儲為該特定主題配置的分區(qū)中的所有消息。 它確保消息在分區(qū)之間平等共享。 如果生產(chǎn)者發(fā)送兩個消息并且有兩個分區(qū),Kafka將在第一分區(qū)中存儲一個消息,在第二分區(qū)中存儲第二消息。

  • 消費(fèi)者訂閱特定主題。

  • 一旦消費(fèi)者訂閱主題,Kafka將向消費(fèi)者提供主題的當(dāng)前偏移,并且還將偏移保存在Zookeeper系綜中。

  • 消費(fèi)者將定期請求Kafka(如100 Ms)新消息。

  • 一旦Kafka收到來自生產(chǎn)者的消息,它將這些消息轉(zhuǎn)發(fā)給消費(fèi)者。

  • 消費(fèi)者將收到消息并進(jìn)行處理。

  • 一旦消息被處理,消費(fèi)者將向Kafka代理發(fā)送確認(rèn)。

  • 一旦Kafka收到確認(rèn),它將偏移更改為新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中維護(hù),消費(fèi)者可以正確地讀取下一封郵件,即使在服務(wù)器暴力期間。

  • 以上流程將重復(fù),直到消費(fèi)者停止請求。

  • 消費(fèi)者可以隨時(shí)回退/跳到所需的主題偏移量,并閱讀所有后續(xù)消息。

隊(duì)列消息/用戶組的工作流

在隊(duì)列消息傳遞系統(tǒng)而不是單個消費(fèi)者中,具有相同組ID 的一組消費(fèi)者將訂閱主題。 簡單來說,訂閱具有相同 Group ID 的主題的消費(fèi)者被認(rèn)為是單個組,并且消息在它們之間共享。 讓我們檢查這個系統(tǒng)的實(shí)際工作流程。

  • 生產(chǎn)者以固定間隔向某個主題發(fā)送消息。

  • Kafka存儲在為該特定主題配置的分區(qū)中的所有消息,類似于前面的方案。

  • 單個消費(fèi)者訂閱特定主題,假設(shè) Topic-01 Group ID Group-1 。

  • Kafka以與發(fā)布 - 訂閱消息相同的方式與消費(fèi)者交互,直到新消費(fèi)者以相同的組ID 訂閱相同主題 Topic-01 1 。

  • 一旦新消費(fèi)者到達(dá),Kafka將其操作切換到共享模式,并在兩個消費(fèi)者之間共享數(shù)據(jù)。 此共享將繼續(xù),直到用戶數(shù)達(dá)到為該特定主題配置的分區(qū)數(shù)。

  • 一旦消費(fèi)者的數(shù)量超過分區(qū)的數(shù)量,新消費(fèi)者將不會接收任何進(jìn)一步的消息,直到現(xiàn)有消費(fèi)者取消訂閱任何一個消費(fèi)者。 出現(xiàn)這種情況是因?yàn)镵afka中的每個消費(fèi)者將被分配至少一個分區(qū),并且一旦所有分區(qū)被分配給現(xiàn)有消費(fèi)者,新消費(fèi)者將必須等待。

  • 此功能也稱為使用者組。 同樣,Kafka將以非常簡單和高效的方式提供兩個系統(tǒng)中最好的。

ZooKeeper的作用

Apache Kafka的一個關(guān)鍵依賴是Apache Zookeeper,它是一個分布式配置和同步服務(wù)。 Zookeeper是Kafka代理和消費(fèi)者之間的協(xié)調(diào)接口。 Kafka服務(wù)器通過Zookeeper集群共享信息。 Kafka在Zookeeper中存儲基本元數(shù)據(jù),例如關(guān)于主題,代理,消費(fèi)者偏移(隊(duì)列讀取器)等的信息。

由于所有關(guān)鍵信息存儲在Zookeeper中,并且它通常在其整體上復(fù)制此數(shù)據(jù),因此Kafka代理/ Zookeeper的故障不會影響Kafka集群的狀態(tài)。 Kafka將恢復(fù)狀態(tài),一旦Zookeeper重新啟動。 這為Kafka帶來了零停機(jī)時(shí)間。 Kafka代理之間的領(lǐng)導(dǎo)者選舉也通過使用Zookeeper在領(lǐng)導(dǎo)者失敗的情況下完成。

要了解有關(guān)Zookeeper的詳細(xì)信息,請參閱 zookeeper

讓我們繼續(xù)進(jìn)一步關(guān)于如何在您的機(jī)器上安裝Java,ZooKeeper和Kafka在下一章。

Apache Kafka - 安裝步驟

以下是在機(jī)器上安裝Java的步驟。

步驟1 - 驗(yàn)證Java安裝

希望你已經(jīng)在你的機(jī)器上安裝了java,所以你只需使用下面的命令驗(yàn)證它。

$ java -version

如果java在您的機(jī)器上成功安裝,您可以看到已安裝的Java的版本。

步驟1.1 - 下載JDK

如果沒有下載Java,請通過訪問以下鏈接并下載最新版本來下載最新版本的JDK。

http://www.oracle.com/technetwork/java/javase/downloads/index.html

現(xiàn)在最新的版本是JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz"。 請?jiān)谀臋C(jī)器上下載該文件。

步驟1.2 - 提取文件

通常,正在下載的文件存儲在下載文件夾中,驗(yàn)證它并使用以下命令提取tar設(shè)置。

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

步驟1.3 - 移動到選擇目錄

要將java提供給所有用戶,請將提取的java內(nèi)容移動到 usr / local / java / folder。

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

步驟1.4 - 設(shè)置路徑

要設(shè)置路徑和JAVA_HOME變量,請將以下命令添加到?/ .bashrc文件。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

現(xiàn)在將所有更改應(yīng)用到當(dāng)前運(yùn)行的系統(tǒng)。

$ source ~/.bashrc

步驟1.5 - Java替代

使用以下命令更改Java Alternatives。

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

步驟1.6 - 現(xiàn)在使用第1步中說明的驗(yàn)證命令(java -version)驗(yàn)證java。

步驟2 - ZooKeeper框架安裝

步驟2.1 - 下載ZooKeeper

要在您的計(jì)算機(jī)上安裝ZooKeeper框架,請?jiān)L問以下鏈接并下載最新版本的ZooKeeper。

http://zookeeper.apache.org/releases.html

現(xiàn)在,最新版本的ZooKeeper是3.4.6(ZooKeeper-3.4.6.tar.gz)。

步驟2.2 - 提取tar文件

使用以下命令提取tar文件

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

步驟2.3 - 創(chuàng)建配置文件

使用命令vi“conf / zoo.cfg"打開名為 conf / zoo.cfg 的配置文件,并將所有以下參數(shù)設(shè)置為起點(diǎn)。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

一旦配置文件成功保存并再次返回終端,您可以啟動zookeeper服務(wù)器。

步驟2.4 - 啟動ZooKeeper服務(wù)器

$ bin/zkServer.sh start

執(zhí)行此命令后,您將得到如下所示的響應(yīng) -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

步驟2.5 - 啟動CLI

$ bin/zkCli.sh

輸入上面的命令后,您將被連接到zookeeper服務(wù)器,并將獲得以下響應(yīng)。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

步驟2.6 - 停止Zookeeper服務(wù)器

連接服務(wù)器并執(zhí)行所有操作后,可以使用以下命令停止zookeeper服務(wù)器 -

$ bin/zkServer.sh stop

現(xiàn)在你已經(jīng)在你的機(jī)器上成功安裝了Java和ZooKeeper。 讓我們看看安裝Apache Kafka的步驟。

步驟3 - Apache Kafka安裝

讓我們繼續(xù)以下步驟在您的機(jī)器上安裝Kafka。

步驟3.1 - 下載Kafka

要在您的機(jī)器上安裝Kafka,請點(diǎn)擊以下鏈接 -

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

現(xiàn)在最新版本,即 - kafka_2.11_0.9.0.0.tgz 將下載到您的計(jì)算機(jī)上。

步驟3.2 - 解壓tar文件

使用以下命令提取tar文件 -

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

現(xiàn)在您已經(jīng)在您的機(jī)器上下載了最新版本的Kafka。

步驟3.3 - 啟動服務(wù)器

您可以通過給出以下命令來啟動服務(wù)器 -

$ bin/kafka-server-start.sh config/server.properties

服務(wù)器啟動后,您會在屏幕上看到以下響應(yīng):

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

步驟4 - 停止服務(wù)器

執(zhí)行所有操作后,可以使用以下命令停止服務(wù)器 -

$ bin/kafka-server-stop.sh config/server.properties

現(xiàn)在我們已經(jīng)討論了Kafka安裝,我們可以在下一章中學(xué)習(xí)如何對Kafka執(zhí)行基本操作。

Apache Kafka - 基本操作

首先讓我們開始實(shí)現(xiàn)單節(jié)點(diǎn)單代理配置,然后我們將我們的設(shè)置遷移到單節(jié)點(diǎn)多代理配置。

希望你現(xiàn)在可以在你的機(jī)器上安裝Java,ZooKeeper和Kafka。 在遷移到Kafka Cluster Setup之前,首先需要啟動ZooKeeper,因?yàn)镵afka Cluster使用ZooKeeper。

啟動ZooKeeper

打開一個新終端并鍵入以下命令 -

bin/zookeeper-server-start.sh config/zookeeper.properties

要啟動Kafka Broker,請鍵入以下命令 -

bin/kafka-server-start.sh config/server.properties

啟動Kafka Broker后,在ZooKeeper終端上鍵入命令 jps ,您將看到以下響應(yīng) -

821 QuorumPeerMain
928 Kafka
931 Jps

現(xiàn)在你可以看到兩個守護(hù)進(jìn)程運(yùn)行在終端上,QuorumPeerMain是ZooKeeper守護(hù)進(jìn)程,另一個是Kafka守護(hù)進(jìn)程。

單節(jié)點(diǎn) - 單代理配置

在此配置中,您有一個ZooKeeper和代理id實(shí)例。 以下是配置它的步驟 -

創(chuàng)建Kafka主題 - Kafka提供了一個名為 kafka-topics.sh 的命令行實(shí)用程序,用于在服務(wù)器上創(chuàng)建主題。 打開新終端并鍵入以下示例。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

我們剛剛創(chuàng)建了一個名為 Hello-Kafka 的主題,其中包含一個分區(qū)和一個副本因子。 上面創(chuàng)建的輸出將類似于以下輸出 -

輸出 - 創(chuàng)建主題 Hello-Kafka

創(chuàng)建主題后,您可以在Kafka代理終端窗口中獲取通知,并在config / server.properties文件中的“/ tmp / kafka-logs /"中指定的創(chuàng)建主題的日志。

主題列表

要獲取Kafka服務(wù)器中的主題列表,可以使用以下命令 -

語法

bin/kafka-topics.sh --list --zookeeper localhost:2181

輸出

Hello-Kafka

由于我們已經(jīng)創(chuàng)建了一個主題,它將僅列出 Hello-Kafka 假設(shè),如果創(chuàng)建多個主題,您將在輸出中獲取主題名稱。

啟動生產(chǎn)者以發(fā)送消息

語法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

從上面的語法,生產(chǎn)者命令行客戶端需要兩個主要參數(shù) -

代理列表 - 我們要發(fā)送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理端口ID,因?yàn)槲覀冎牢覀兊拇碚趥陕牰丝?092,因此您可以直接指定它。

主題名稱 - 以下是主題名稱的示例。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生產(chǎn)者將等待來自stdin的輸入并發(fā)布到Kafka集群。 默認(rèn)情況下,每個新行都作為新消息發(fā)布,然后在 config / producer.properties 文件中指定默認(rèn)生產(chǎn)者屬性。 現(xiàn)在,您可以在終端中鍵入幾行消息,如下所示。

輸出

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

啟動消費(fèi)者以接收消息

與生產(chǎn)者類似,在 config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個新終端并鍵入以下消息消息語法。

語法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

輸出

Hello
My first message
My second message

最后,您可以從制作商的終端輸入消息,并看到他們出現(xiàn)在消費(fèi)者的終端。 到目前為止,您對具有單個代理的單節(jié)點(diǎn)群集有非常好的了解。 現(xiàn)在讓我們繼續(xù)討論多個代理配置。

單節(jié)點(diǎn)多代理配置

在進(jìn)入多個代理集群設(shè)置之前,首先啟動ZooKeeper服務(wù)器。

創(chuàng)建多個Kafka Brokers - 我們在配置/ server.properties中已有一個Kafka代理實(shí)例。 現(xiàn)在我們需要多個代理實(shí)例,因此將現(xiàn)有的server.prop-erties文件復(fù)制到兩個新的配置文件中,并將其重命名為server-one.properties和server-two.properties。 然后編輯這兩個新文件并分配以下更改 -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

啟動多個代理 - 在三臺服務(wù)器上進(jìn)行所有更改后,打開三個新終端,逐個啟動每個代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

現(xiàn)在我們有三個不同的經(jīng)紀(jì)人在機(jī)器上運(yùn)行。 自己嘗試,通過在ZooKeeper終端上鍵入 jps 檢查所有守護(hù)程序,然后您將看到響應(yīng)。

創(chuàng)建主題

讓我們?yōu)榇酥黝}將復(fù)制因子值指定為三個,因?yàn)槲覀冇腥齻€不同的代理運(yùn)行。 如果您有兩個代理,那么分配的副本值將是兩個。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

輸出

created topic “Multibrokerapplication"

Describe 命令用于檢查哪個代理正在偵聽當(dāng)前創(chuàng)建的主題,如下所示 -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

輸出

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

從上面的輸出,我們可以得出結(jié)論,第一行給出所有分區(qū)的摘要,顯示主題名稱,分區(qū)數(shù)量和我們已經(jīng)選擇的復(fù)制因子。 在第二行中,每個節(jié)點(diǎn)將是分區(qū)的隨機(jī)選擇部分的領(lǐng)導(dǎo)者。

在我們的例子中,我們看到我們的第一個broker(with broker.id 0)是領(lǐng)導(dǎo)者。 然后Replicas:0,2,1意味著所有代理復(fù)制主題最后 Isr in-sync 副本的集合。 那么,這是副本的子集,當(dāng)前活著并被領(lǐng)導(dǎo)者趕上。

啟動生產(chǎn)者以發(fā)送消息

此過程保持與單代理設(shè)置中相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

輸出

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

啟動消費(fèi)者以接收消息

此過程保持與單代理設(shè)置中所示的相同。

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

輸出

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本主題操作

在本章中,我們將討論各種基本主題操作。

修改主題

您已經(jīng)了解如何在Kafka Cluster中創(chuàng)建主題。 現(xiàn)在讓我們使用以下命令修改已創(chuàng)建的主題

語法

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

示例

We have already created a topic “Hello-Kafka" with single partition count and one replica factor. 
Now using “alter" command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

輸出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

刪除主題

要刪除主題,可以使用以下語法。

語法

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

示例

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

輸出

> Topic Hello-kafka marked for deletion

注意 - 如果 delete.topic.enable 未設(shè)置為true,則此操作不會產(chǎn)生任何影響

Apache Kafka - 簡單生產(chǎn)者示例

讓我們使用Java客戶端創(chuàng)建一個用于發(fā)布和使用消息的應(yīng)用程序。 Kafka生產(chǎn)者客戶端包括以下API。

KafkaProducer API

讓我們了解本節(jié)中最重要的一組Kafka生產(chǎn)者API。 KafkaProducer API的中心部分是 KafkaProducer 類。 KafkaProducer類提供了一個選項(xiàng),用于將其構(gòu)造函數(shù)中的Kafka代理連接到以下方法。

  • KafkaProducer類提供send方法以異步方式將消息發(fā)送到主題。 send()的簽名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - 生產(chǎn)者管理等待發(fā)送的記錄的緩沖區(qū)。

  • 回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時(shí)執(zhí)行的用戶提供的回調(diào)(null表示無回調(diào))。

  • KafkaProducer類提供了一個flush方法,以確保所有先前發(fā)送的消息都已實(shí)際完成。 flush方法的語法如下 -

public void flush()
  • KafkaProducer類提供了partitionFor方法,這有助于獲取給定主題的分區(qū)元數(shù)據(jù)。 這可以用于自定義分區(qū)。 這種方法的簽名如下 -

public Map metrics()

它返回由生產(chǎn)者維護(hù)的內(nèi)部度量的映射。

  • public void close() - KafkaProducer類提供關(guān)閉方法塊,直到所有先前發(fā)送的請求完成。

生產(chǎn)者API

生產(chǎn)者API的中心部分是生產(chǎn)者類。 生產(chǎn)者類提供了一個選項(xiàng),通過以下方法在其構(gòu)造函數(shù)中連接Kafka代理。

生產(chǎn)者類

生產(chǎn)者類提供send方法以使用以下簽名向單個或多個主題發(fā)送消息。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,"async")
ProducerConfig config = new ProducerConfig(prop);

有兩種類型的生產(chǎn)者 - 同步異步。

相同的API配置也適用于同步生產(chǎn)者。 它們之間的區(qū)別是同步生成器直接發(fā)送消息,但在后臺發(fā)送消息。 當(dāng)您想要更高的吞吐量時(shí),異步生產(chǎn)者是首選。 在以前的版本,如0.8,一個異步生產(chǎn)者沒有回調(diào)send()注冊錯誤處理程序。 這僅在當(dāng)前版本0.9中可用。

public void close()

生產(chǎn)者類提供關(guān)閉方法以關(guān)閉與所有Kafka代理的生產(chǎn)者池連接。

配置設(shè)置

下表列出了Producer API的主要配置設(shè)置,以便更好地理解 -

S.No配置設(shè)置和說明
1

client.id

標(biāo)識生產(chǎn)者應(yīng)用程序

2

producer.type

同步或異步

3

acks

acks配置控制生產(chǎn)者請求下的標(biāo)準(zhǔn)是完全的。

4

重試

如果生產(chǎn)者請求失敗,則使用特定值自動重試。

5

bootstrapping代理列表。

6

linger.ms

如果你想減少請求的數(shù)量,你可以將linger.ms設(shè)置為大于某個值的東西。

7

key.serializer

序列化器接口的鍵。

8

value.serializer

值。

9

batch.size

緩沖區(qū)大小。

10

buffer.memory

控制生產(chǎn)者可用于緩沖的存儲器的總量。

ProducerRecord API

ProducerRecord是發(fā)送到Kafka cluster.ProducerRecord類構(gòu)造函數(shù)的鍵/值對,用于使用以下簽名創(chuàng)建具有分區(qū),鍵和值對的記錄。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主題 - 將附加到記錄的用戶定義的主題名稱。

  • 分區(qū) - 分區(qū)計(jì)數(shù)

  • - 將包含在記錄中的鍵。

  • Value ? Record contents
public ProducerRecord (string topic, k key, v value)

ProducerRecord類構(gòu)造函數(shù)用于創(chuàng)建帶有鍵,值對和無分區(qū)的記錄。

  • 主題 - 創(chuàng)建主題以分配記錄。

  • - 記錄的鍵。

  • - 記錄內(nèi)容。

public ProducerRecord (string topic, v value)

ProducerRecord類創(chuàng)建一個沒有分區(qū)和鍵的記錄。

  • 主題 - 創(chuàng)建主題。

  • - 記錄內(nèi)容。

ProducerRecord類方法列在下表中 -

S.No類方法和描述
1

public string topic()

主題將附加到記錄。

2

public K key()

將包括在記錄中的鍵。 如果沒有這樣的鍵,null將在這里重新打開。

3

public V value()

記錄內(nèi)容。

4

partition()

記錄的分區(qū)計(jì)數(shù)

SimpleProducer應(yīng)用程序

在創(chuàng)建應(yīng)用程序之前,首先啟動ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。 之后,創(chuàng)建一個名為 Sim-pleProducer.java 的java類,然后鍵入以下代碼。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “l(fā)ocalhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully");
               producer.close();
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

輸出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

簡單消費(fèi)者示例

到目前為止,我們已經(jīng)創(chuàng)建了一個發(fā)送消息到Kafka集群的生產(chǎn)者。 現(xiàn)在讓我們創(chuàng)建一個消費(fèi)者來消費(fèi)Kafka集群的消息。 KafkaConsumer API用于消費(fèi)來自Kafka集群的消息。 KafkaConsumer類的構(gòu)造函數(shù)定義如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消費(fèi)者配置的地圖。

KafkaConsumer類具有下表中列出的以下重要方法。

S.No方法和說明
1

public java.util.Set< TopicPar- tition> assignment()

獲取由用戶當(dāng)前分配的分區(qū)集。

2

public string subscription()

訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。

3

public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener)

訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。

4

public void unsubscribe()

從給定的分區(qū)列表中取消訂閱主題。

5

public void sub-scribe(java.util.List< java.lang.String> topics)

訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。 如果給定的主題列表為空,則將其視為與unsubscribe()相同。

6

public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)

參數(shù)模式以正則表達(dá)式的格式引用預(yù)訂模式,而偵聽器參數(shù)從預(yù)訂模式獲取通知。

7

public void as-sign(java.util.List< TopicPartion> partitions)

向客戶手動分配分區(qū)列表。

8

poll()

使用預(yù)訂/分配API之一獲取指定的主題或分區(qū)的數(shù)據(jù)。 如果在輪詢數(shù)據(jù)之前未預(yù)訂主題,這將返回錯誤。

9

public void commitSync()

提交對主題和分區(qū)的所有子編制列表的最后一次poll()返回的提交偏移量。 相同的操作應(yīng)用于commitAsyn()。

10

public void seek(TopicPartition partition,long offset)

獲取消費(fèi)者將在下一個poll()方法中使用的當(dāng)前偏移值。

11

public void resume()

恢復(fù)暫停的分區(qū)。

12

public void wakeup()

喚醒消費(fèi)者。

ConsumerRecord API

ConsumerRecord API用于從Kafka集群接收記錄。 此API由主題名稱,分區(qū)號(從中接收記錄)和指向Kafka分區(qū)中的記錄的偏移量組成。 ConsumerRecord類用于創(chuàng)建具有特定主題名稱,分區(qū)計(jì)數(shù)和< key,value>的消費(fèi)者記錄。 對。 它有以下簽名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主題 - 從Kafka集群接收的使用者記錄的主題名稱。

  • 分區(qū) - 主題的分區(qū)。

  • - 記錄的鍵,如果沒有鍵存在null將被返回。

  • - 記錄內(nèi)容。

ConsumerRecords API

ConsumerRecords API充當(dāng)ConsumerRecord的容器。 此API用于保存特定主題的每個分區(qū)的ConsumerRecord列表。 它的構(gòu)造器定義如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - 返回特定主題的分區(qū)地圖。

  • 記錄 - ConsumerRecord的返回列表。

ConsumerRecords類定義了以下方法。

S.No方法和描述
1

public int count()

所有主題的記錄數(shù)。

2

public Set partitions()

在此記錄集中具有數(shù)據(jù)的分區(qū)集(如果沒有返回?cái)?shù)據(jù),則該集為空)。

3

public Iterator iterator()

迭代器使您可以循環(huán)訪問集合,獲取或重新移動元素。

4

public List records()

獲取給定分區(qū)的記錄列表。

配置設(shè)置

Consumer客戶端API主配置設(shè)置的配置設(shè)置如下所示 -

S.No設(shè)置和說明
1

引導(dǎo)代理列表。

2

group.id

將單個消費(fèi)者分配給組。

3

enable.auto.commit

如果值為true,則為偏移啟用自動落實(shí),否則不提交。

4

auto.commit.interval.ms

返回更新的消耗偏移量寫入ZooKeeper的頻率。

5

session.timeout.ms

表示Kafka在放棄和繼續(xù)消費(fèi)消息之前等待ZooKeeper響應(yīng)請求(讀取或?qū)懭?多少毫秒。

SimpleConsumer應(yīng)用程序

生產(chǎn)者應(yīng)用程序步驟在此保持不變。 首先,啟動你的ZooKeeper和Kafka代理。 然后使用名為 SimpleCon-sumer.java 的Java類創(chuàng)建一個 SimpleConsumer 應(yīng)用程序,并鍵入以下代碼。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " &plus; topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

編譯 - 可以使用以下命令編譯應(yīng)用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

輸入 - 打開生成器CLI并向主題發(fā)送一些消息。 你可以把smple輸入為\'Hello Consumer\'。

輸出 - 以下是輸出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka - 用戶組示例

消費(fèi)群是多線程或多機(jī)器的Kafka主題。

消費(fèi)者群體

  • 消費(fèi)者可以使用相同的 group.id 加入群組

  • 一個組的最大并行度是組中的消費(fèi)者數(shù)量←不是分區(qū)。

  • Kafka將主題的分區(qū)分配給組中的使用者,以便每個分區(qū)僅由組中的一個使用者使用。

  • Kafka保證消息只能被組中的一個消費(fèi)者讀取。

  • 消費(fèi)者可以按照消息存儲在日志中的順序查看消息。

重新平衡消費(fèi)者

添加更多進(jìn)程/線程將導(dǎo)致Kafka重新平衡。 如果任何消費(fèi)者或代理無法向ZooKeeper發(fā)送心跳,則可以通過Kafka集群重新配置。 在此重新平衡期間,Kafka將分配可用分區(qū)到可用線程,可能將分區(qū)移動到另一個進(jìn)程。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " &plus; topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

匯編

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

執(zhí)行

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

在這里,我們?yōu)閮蓚€消費(fèi)者創(chuàng)建了一個示例組名稱為 my-group 。 同樣,您可以在組中創(chuàng)建您的組和消費(fèi)者數(shù)量。

輸入

打開生產(chǎn)者CLI并發(fā)送一些消息 -

Test consumer group 01
Test consumer group 02

第一個過程的輸出

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

第二個過程的輸出

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

現(xiàn)在希望你能通過使用Java客戶端演示了解SimpleConsumer和ConsumeGroup。 現(xiàn)在,您了解如何使用Java客戶端發(fā)送和接收消息。 讓我們在下一章繼續(xù)Kafka與大數(shù)據(jù)技術(shù)的集成。

Apache Kafka - 與Storm集成

在本章中,我們將學(xué)習(xí)如何將Kafka與Apache Storm集成。

關(guān)于Storm

Storm最初由Nathan Marz和BackType的團(tuán)隊(duì)創(chuàng)建。 在短時(shí)間內(nèi),Apache Storm成為分布式實(shí)時(shí)處理系統(tǒng)的標(biāo)準(zhǔn),允許您處理大量數(shù)據(jù)。 Storm是非??斓?,并且一個基準(zhǔn)時(shí)鐘為每個節(jié)點(diǎn)每秒處理超過一百萬個元組。 Apache Storm持續(xù)運(yùn)行,從配置的源(Spouts)消耗數(shù)據(jù),并將數(shù)據(jù)傳遞到處理管道(Bolts)。 聯(lián)合,spout和Bolt
做一個拓?fù)洹?/span>

與Storm集成

Kafka和Storm自然互補(bǔ),它們強(qiáng)大的合作能夠?qū)崿F(xiàn)快速移動的大數(shù)據(jù)的實(shí)時(shí)流分析。 Kafka和Storm集成是為了使開發(fā)人員更容易地從Storm拓?fù)浍@取和發(fā)布數(shù)據(jù)流。

概念流

spout是流的源。 例如,一個spout可以從Kafka Topic讀取元組并將它們作為流發(fā)送。 Bolt消耗輸入流,處理并可能發(fā)射新的流。 Bolt可以從運(yùn)行函數(shù),過濾元組,執(zhí)行流聚合,流連接,與數(shù)據(jù)庫交談等等做任何事情。 Storm拓?fù)渲械拿總€節(jié)點(diǎn)并行執(zhí)行。 拓?fù)錈o限運(yùn)行,直到終止它。 Storm將自動重新分配任何失敗的任務(wù)。 此外,Storm保證沒有數(shù)據(jù)丟失,即使機(jī)器停機(jī)和消息被丟棄。

讓我們詳細(xì)了解Kafka-Storm集成API。 有三個主要類集成Kafka與Storm。 他們?nèi)缦?-

經(jīng)紀(jì)人 - ZkHosts&amp; 靜態(tài)主機(jī)

BrokerHosts是一個接口,ZkHosts和StaticHosts是它的兩個主要實(shí)現(xiàn)。 ZkHosts用于通過在ZooKeeper中維護(hù)細(xì)節(jié)來動態(tài)跟蹤Kafka代理,而StaticHosts用于手動/靜態(tài)設(shè)置Kafka代理及其詳細(xì)信息。 ZkHosts是訪問Kafka代理的簡單快捷的方式。

ZkHosts的簽名如下 -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

其中brokerZkStr是ZooKeeper主機(jī),brokerZkPath是ZooKeeper路徑以維護(hù)Kafka代理詳細(xì)信息。

KafkaConfig API

此API用于定義Kafka集群的配置設(shè)置。 Kafka Con-fig的簽名定義如下

public KafkaConfig(BrokerHosts hosts, string topic)

    主機(jī) - BrokerHosts可以是ZkHosts / StaticHosts。

    主題 - 主題名稱。

SpoutConfig API

Spoutconfig是KafkaConfig的擴(kuò)展,支持額外的ZooKeeper信息。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • 主機(jī) - BrokerHosts可以是BrokerHosts接口的任何實(shí)現(xiàn)

  • 主題 - 主題名稱。

  • zkRoot - ZooKeeper根路徑。

  • id - spouts存儲在Zookeeper中消耗的偏移量的狀態(tài)。 ID應(yīng)該唯一標(biāo)識您的spout。

SchemeAsMultiScheme

SchemeAsMultiScheme是一個接口,用于指示如何將從Kafka中消耗的ByteBuffer轉(zhuǎn)換為Storm元組。 它源自MultiScheme并接受Scheme類的實(shí)現(xiàn)。 有很多Scheme類的實(shí)現(xiàn),一個這樣的實(shí)現(xiàn)是StringScheme,它將字節(jié)解析為一個簡單的字符串。 它還控制輸出字段的命名。 簽名定義如下。

public SchemeAsMultiScheme(Scheme scheme)
  • 方案 - 從kafka消耗的字節(jié)緩沖區(qū)。

KafkaSpout API

KafkaSpout是我們的spout實(shí)現(xiàn),它將與Storm集成。 它從kafka主題獲取消息,并將其作為元組發(fā)送到Storm生態(tài)系統(tǒng)。 KafkaSpout從SpoutConfig獲取其配置詳細(xì)信息。

下面是一個創(chuàng)建一個簡單的Kafka spout的示例代碼。

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

創(chuàng)建Bolt

Bolt是一個使用元組作為輸入,處理元組,并產(chǎn)生新的元組作為輸出的組件。 Bolt將實(shí)現(xiàn)IRichBolt接口。 在此程序中,使用兩個Bolt類WordSplitter-Bolt和WordCounterBolt來執(zhí)行操作。

IRichBolt接口有以下方法 -

  • 準(zhǔn)備 - 為Bolt提供要執(zhí)行的環(huán)境。 執(zhí)行器將運(yùn)行此方法來初始化spout。

  • 執(zhí)行 - 處理單個元組的輸入。

  • 清理 - 當(dāng)Bolt要關(guān)閉時(shí)調(diào)用。

  • declareOutputFields - 聲明元組的輸出模式。

讓我們創(chuàng)建SplitBolt.java,它實(shí)現(xiàn)邏輯分割一個句子到詞和CountBolt.java,它實(shí)現(xiàn)邏輯分離獨(dú)特的單詞和計(jì)數(shù)其出現(xiàn)。

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號