在大數(shù)據(jù)中,使用了大量的數(shù)據(jù)。 關(guān)于數(shù)據(jù),我們有兩個主要挑戰(zhàn)。第一個挑戰(zhàn)是如何收集大量的數(shù)據(jù),第二個挑戰(zhàn)是分析收集的數(shù)據(jù)。 為了克服這些挑戰(zhàn),您必須需要一個消息系統(tǒng)。
Kafka專為分布式高吞吐量系統(tǒng)而設(shè)計(jì)。 Kafka往往工作得很好,作為一個更傳統(tǒng)的消息代理的替代品。 與其他消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和固有的容錯能力,這使得它非常適合大規(guī)模消息處理應(yīng)用程序。
消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個應(yīng)用程序傳輸?shù)搅硪粋€應(yīng)用程序,因此應(yīng)用程序可以專注于數(shù)據(jù),但不擔(dān)心如何共享它。 分布式消息傳遞基于可靠消息隊(duì)列的概念。 消息在客戶端應(yīng)用程序和消息傳遞系統(tǒng)之間異步排隊(duì)。 有兩種類型的消息模式可用 - 一種是點(diǎn)對點(diǎn),另一種是發(fā)布 - 訂閱(pub-sub)消息系統(tǒng)。 大多數(shù)消息模式遵循 pub-sub 。
在點(diǎn)對點(diǎn)系統(tǒng)中,消息被保留在隊(duì)列中。 一個或多個消費(fèi)者可以消耗隊(duì)列中的消息,但是特定消息只能由最多一個消費(fèi)者消費(fèi)。 一旦消費(fèi)者讀取隊(duì)列中的消息,它就從該隊(duì)列中消失。 該系統(tǒng)的典型示例是訂單處理系統(tǒng),其中每個訂單將由一個訂單處理器處理,但多個訂單處理器也可以同時(shí)工作。 下圖描述了結(jié)構(gòu)。
在發(fā)布 - 訂閱系統(tǒng)中,消息被保留在主題中。 與點(diǎn)對點(diǎn)系統(tǒng)不同,消費(fèi)者可以訂閱一個或多個主題并使用該主題中的所有消息。 在發(fā)布 - 訂閱系統(tǒng)中,消息生產(chǎn)者稱為發(fā)布者,消息使用者稱為訂閱者。 一個現(xiàn)實(shí)生活的例子是Dish電視,它發(fā)布不同的渠道,如運(yùn)動,電影,音樂等,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時(shí)可用。
Apache Kafka是一個分布式發(fā)布 - 訂閱消息系統(tǒng)和一個強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使您能夠?qū)⑾囊粋€端點(diǎn)傳遞到另一個端點(diǎn)。 Kafka適合離線和在線消息消費(fèi)。 Kafka消息保留在磁盤上,并在群集內(nèi)復(fù)制以防止數(shù)據(jù)丟失。 Kafka構(gòu)建在ZooKeeper同步服務(wù)之上。 它與Apache Storm和Spark非常好地集成,用于實(shí)時(shí)流式數(shù)據(jù)分析。
以下是Kafka的幾個好處 -
可靠性 - Kafka是分布式,分區(qū),復(fù)制和容錯的。
可擴(kuò)展性 - Kafka消息傳遞系統(tǒng)輕松縮放,無需停機(jī)。
耐用性 - Kafka使用分布式提交日志
,這意味著消息會盡可能快地保留在磁盤上,因此它是持久的。
性能 - Kafka對于發(fā)布和訂閱消息都具有高吞吐量。 即使存儲了許多TB的消息,它也保持穩(wěn)定的性能。
Kafka非??欤⒈WC零停機(jī)和零數(shù)據(jù)丟失。
Kafka可以在許多用例中使用。 其中一些列出如下 -
指標(biāo) - Kafka通常用于操作監(jiān)控?cái)?shù)據(jù)。 這涉及聚合來自分布式應(yīng)用程序的統(tǒng)計(jì)信息,以產(chǎn)生操作數(shù)據(jù)的集中饋送。
日志聚合解決方案 - Kafka可用于跨組織從多個服務(wù)收集日志,并使它們以標(biāo)準(zhǔn)格式提供給多個服務(wù)器。
流處理 - 流行的框架(如Storm和Spark Streaming)從主題中讀取數(shù)據(jù),對其進(jìn)行處理,并將處理后的數(shù)據(jù)寫入新主題,供用戶和應(yīng)用程序使用。 Kafka的強(qiáng)耐久性在流處理的上下文中也非常有用。
Kafka是一個統(tǒng)一的平臺,用于處理所有實(shí)時(shí)數(shù)據(jù)Feed。 Kafka支持低延遲消息傳遞,并在出現(xiàn)機(jī)器故障時(shí)提供對容錯的保證。 它具有處理大量不同消費(fèi)者的能力。 Kafka非常快,執(zhí)行2百萬寫/秒。 Kafka將所有數(shù)據(jù)保存到磁盤,這實(shí)質(zhì)上意味著所有寫入都會進(jìn)入操作系統(tǒng)(RAM)的頁面緩存。 這使得將數(shù)據(jù)從頁面緩存?zhèn)鬏數(shù)骄W(wǎng)絡(luò)套接字非常有效。
在深入學(xué)習(xí)Kafka之前,您必須了解主題,經(jīng)紀(jì)人,生產(chǎn)者和消費(fèi)者等主要術(shù)語。 下圖說明了主要術(shù)語,表格詳細(xì)描述了圖表組件。
在上圖中,主題配置為三個分區(qū)。 分區(qū)1具有兩個偏移因子0和1.分區(qū)2具有四個偏移因子0,1,2和3.分區(qū)3具有一個偏移因子0.副本的id與承載它的服務(wù)器的id相同。
假設(shè),如果主題的復(fù)制因子設(shè)置為3,那么Kafka將創(chuàng)建每個分區(qū)的3個相同的副本,并將它們放在集群中以使其可用于其所有操作。 為了平衡集群中的負(fù)載,每個代理都存儲一個或多個這些分區(qū)。 多個生產(chǎn)者和消費(fèi)者可以同時(shí)發(fā)布和檢索消息。
S.No | 組件和說明 |
---|---|
1 | Topics(主題) 屬于特定類別的消息流稱為主題。 數(shù)據(jù)存儲在主題中。 |
2 | Partition(分區(qū)) 主題被拆分成分區(qū)。 對于每個主題,Kafka保存一個分區(qū)的數(shù)據(jù)。 每個這樣的分區(qū)包含不可變有序序列的消息。 分區(qū)被實(shí)現(xiàn)為具有相等大小的一組分段文件。 主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。 |
3 | Partition offset(分區(qū)偏移) 每個分區(qū)消息具有稱為 |
4 | Replicas of partition(分區(qū)備份) 副本只是一個分區(qū)的 |
5 | Brokers(經(jīng)紀(jì)人)
|
6 | Kafka Cluster(Kafka集群) Kafka有多個代理被稱為Kafka集群。 可以擴(kuò)展Kafka集群,無需停機(jī)。 這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制。 |
7 | Producers(生產(chǎn)者) 生產(chǎn)者是發(fā)送給一個或多個Kafka主題的消息的發(fā)布者。 生產(chǎn)者向Kafka經(jīng)紀(jì)人發(fā)送數(shù)據(jù)。 每當(dāng)生產(chǎn)者將消息發(fā)布給代理時(shí),代理只需將消息附加到最后一個段文件。實(shí)際上,該消息將被附加到分區(qū)。 生產(chǎn)者還可以向他們選擇的分區(qū)發(fā)送消息。 |
8 | Consumers(消費(fèi)者) Consumers從經(jīng)紀(jì)人處讀取數(shù)據(jù)。 消費(fèi)者訂閱一個或多個主題,并通過從代理中提取數(shù)據(jù)來使用已發(fā)布的消息。 |
9 | Leader(領(lǐng)導(dǎo)者)
|
10 | Follower(追隨者) 跟隨領(lǐng)導(dǎo)者指令的節(jié)點(diǎn)被稱為Follower。 如果領(lǐng)導(dǎo)失敗,一個追隨者將自動成為新的領(lǐng)導(dǎo)者。 跟隨者作為正常消費(fèi)者,拉取消息并更新其自己的數(shù)據(jù)存儲。 |
看看下面的插圖。 它顯示Kafka的集群圖。
下表描述了上圖中顯示的每個組件。
S.No | 組件和說明 |
---|---|
1 | Broker(代理) Kafka集群通常由多個代理組成以保持負(fù)載平衡。 Kafka代理是無狀態(tài)的,所以他們使用ZooKeeper來維護(hù)它們的集群狀態(tài)。 一個Kafka代理實(shí)例可以每秒處理數(shù)十萬次讀取和寫入,每個Broker可以處理TB的消息,而沒有性能影響。 Kafka經(jīng)紀(jì)人領(lǐng)導(dǎo)選舉可以由ZooKeeper完成。 |
2 | ZooKeeper ZooKeeper用于管理和協(xié)調(diào)Kafka代理。 ZooKeeper服務(wù)主要用于通知生產(chǎn)者和消費(fèi)者Kafka系統(tǒng)中存在任何新代理或Kafka系統(tǒng)中代理失敗。 根據(jù)Zookeeper接收到關(guān)于代理的存在或失敗的通知,然后產(chǎn)品和消費(fèi)者采取決定并開始與某些其他代理協(xié)調(diào)他們的任務(wù)。 |
3 | Producers(制片人) 生產(chǎn)者將數(shù)據(jù)推送給經(jīng)紀(jì)人。 當(dāng)新代理啟動時(shí),所有生產(chǎn)者搜索它并自動向該新代理發(fā)送消息。 Kafka生產(chǎn)者不等待來自代理的確認(rèn),并且發(fā)送消息的速度與代理可以處理的一樣快。 |
4 | Consumers(消費(fèi)者) 因?yàn)镵afka代理是無狀態(tài)的,這意味著消費(fèi)者必須通過使用分區(qū)偏移來維護(hù)已經(jīng)消耗了多少消息。 如果消費(fèi)者確認(rèn)特定的消息偏移,則意味著消費(fèi)者已經(jīng)消費(fèi)了所有先前的消息。 消費(fèi)者向代理發(fā)出異步拉取請求,以具有準(zhǔn)備好消耗的字節(jié)緩沖區(qū)。 消費(fèi)者可以簡單地通過提供偏移值來快退或跳到分區(qū)中的任何點(diǎn)。 消費(fèi)者偏移值由ZooKeeper通知。 |
到目前為止,我們討論了Kafka的核心概念。 讓我們現(xiàn)在來看一下Kafka的工作流程。
Kafka只是分為一個或多個分區(qū)的主題的集合。 Kafka分區(qū)是消息的線性有序序列,其中每個消息由它們的索引(稱為偏移)來標(biāo)識。 Kafka集群中的所有數(shù)據(jù)都是不相連的分區(qū)聯(lián)合。 傳入消息寫在分區(qū)的末尾,消息由消費(fèi)者順序讀取。 通過將消息復(fù)制到不同的代理提供持久性。
Kafka以快速,可靠,持久,容錯和零停機(jī)的方式提供基于pub-sub和隊(duì)列的消息系統(tǒng)。 在這兩種情況下,生產(chǎn)者只需將消息發(fā)送到主題,消費(fèi)者可以根據(jù)自己的需要選擇任何一種類型的消息傳遞系統(tǒng)。 讓我們按照下一節(jié)中的步驟來了解消費(fèi)者如何選擇他們選擇的消息系統(tǒng)。
以下是Pub-Sub消息的逐步工作流程 -
生產(chǎn)者定期向主題發(fā)送消息。
Kafka代理存儲為該特定主題配置的分區(qū)中的所有消息。 它確保消息在分區(qū)之間平等共享。 如果生產(chǎn)者發(fā)送兩個消息并且有兩個分區(qū),Kafka將在第一分區(qū)中存儲一個消息,在第二分區(qū)中存儲第二消息。
消費(fèi)者訂閱特定主題。
一旦消費(fèi)者訂閱主題,Kafka將向消費(fèi)者提供主題的當(dāng)前偏移,并且還將偏移保存在Zookeeper系綜中。
消費(fèi)者將定期請求Kafka(如100 Ms)新消息。
一旦Kafka收到來自生產(chǎn)者的消息,它將這些消息轉(zhuǎn)發(fā)給消費(fèi)者。
消費(fèi)者將收到消息并進(jìn)行處理。
一旦消息被處理,消費(fèi)者將向Kafka代理發(fā)送確認(rèn)。
一旦Kafka收到確認(rèn),它將偏移更改為新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中維護(hù),消費(fèi)者可以正確地讀取下一封郵件,即使在服務(wù)器暴力期間。
以上流程將重復(fù),直到消費(fèi)者停止請求。
消費(fèi)者可以隨時(shí)回退/跳到所需的主題偏移量,并閱讀所有后續(xù)消息。
在隊(duì)列消息傳遞系統(tǒng)而不是單個消費(fèi)者中,具有相同組ID
的一組消費(fèi)者將訂閱主題。 簡單來說,訂閱具有相同 Group ID
的主題的消費(fèi)者被認(rèn)為是單個組,并且消息在它們之間共享。 讓我們檢查這個系統(tǒng)的實(shí)際工作流程。
生產(chǎn)者以固定間隔向某個主題發(fā)送消息。
Kafka存儲在為該特定主題配置的分區(qū)中的所有消息,類似于前面的方案。
單個消費(fèi)者訂閱特定主題,假設(shè) Topic-01
為 Group ID
為 Group-1
。
Kafka以與發(fā)布 - 訂閱消息相同的方式與消費(fèi)者交互,直到新消費(fèi)者以相同的組ID
訂閱相同主題 Topic-01
1 。
一旦新消費(fèi)者到達(dá),Kafka將其操作切換到共享模式,并在兩個消費(fèi)者之間共享數(shù)據(jù)。 此共享將繼續(xù),直到用戶數(shù)達(dá)到為該特定主題配置的分區(qū)數(shù)。
一旦消費(fèi)者的數(shù)量超過分區(qū)的數(shù)量,新消費(fèi)者將不會接收任何進(jìn)一步的消息,直到現(xiàn)有消費(fèi)者取消訂閱任何一個消費(fèi)者。 出現(xiàn)這種情況是因?yàn)镵afka中的每個消費(fèi)者將被分配至少一個分區(qū),并且一旦所有分區(qū)被分配給現(xiàn)有消費(fèi)者,新消費(fèi)者將必須等待。
此功能也稱為使用者組
。 同樣,Kafka將以非常簡單和高效的方式提供兩個系統(tǒng)中最好的。
Apache Kafka的一個關(guān)鍵依賴是Apache Zookeeper,它是一個分布式配置和同步服務(wù)。 Zookeeper是Kafka代理和消費(fèi)者之間的協(xié)調(diào)接口。 Kafka服務(wù)器通過Zookeeper集群共享信息。 Kafka在Zookeeper中存儲基本元數(shù)據(jù),例如關(guān)于主題,代理,消費(fèi)者偏移(隊(duì)列讀取器)等的信息。
由于所有關(guān)鍵信息存儲在Zookeeper中,并且它通常在其整體上復(fù)制此數(shù)據(jù),因此Kafka代理/ Zookeeper的故障不會影響Kafka集群的狀態(tài)。 Kafka將恢復(fù)狀態(tài),一旦Zookeeper重新啟動。 這為Kafka帶來了零停機(jī)時(shí)間。 Kafka代理之間的領(lǐng)導(dǎo)者選舉也通過使用Zookeeper在領(lǐng)導(dǎo)者失敗的情況下完成。
要了解有關(guān)Zookeeper的詳細(xì)信息,請參閱 zookeeper
讓我們繼續(xù)進(jìn)一步關(guān)于如何在您的機(jī)器上安裝Java,ZooKeeper和Kafka在下一章。
以下是在機(jī)器上安裝Java的步驟。
希望你已經(jīng)在你的機(jī)器上安裝了java,所以你只需使用下面的命令驗(yàn)證它。
$ java -version
如果java在您的機(jī)器上成功安裝,您可以看到已安裝的Java的版本。
如果沒有下載Java,請通過訪問以下鏈接并下載最新版本來下載最新版本的JDK。
http://www.oracle.com/technetwork/java/javase/downloads/index.html現(xiàn)在最新的版本是JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz"。 請?jiān)谀臋C(jī)器上下載該文件。
通常,正在下載的文件存儲在下載文件夾中,驗(yàn)證它并使用以下命令提取tar設(shè)置。
$ cd /go/to/download/path $ tar -zxf jdk-8u60-linux-x64.gz
要將java提供給所有用戶,請將提取的java內(nèi)容移動到 usr / local / java
/ folder。
$ su password: (type password of root user) $ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
要設(shè)置路徑和JAVA_HOME變量,請將以下命令添加到?/ .bashrc文件。
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin
現(xiàn)在將所有更改應(yīng)用到當(dāng)前運(yùn)行的系統(tǒng)。
$ source ~/.bashrc
使用以下命令更改Java Alternatives。
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
步驟1.6 - 現(xiàn)在使用第1步中說明的驗(yàn)證命令(java -version)驗(yàn)證java。
要在您的計(jì)算機(jī)上安裝ZooKeeper框架,請?jiān)L問以下鏈接并下載最新版本的ZooKeeper。
http://zookeeper.apache.org/releases.html現(xiàn)在,最新版本的ZooKeeper是3.4.6(ZooKeeper-3.4.6.tar.gz)。
使用以下命令提取tar文件
$ cd opt/ $ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6 $ mkdir data
使用命令vi“conf / zoo.cfg"打開名為 conf / zoo.cfg
的配置文件,并將所有以下參數(shù)設(shè)置為起點(diǎn)。
$ vi conf/zoo.cfg tickTime=2000 dataDir=/path/to/zookeeper/data clientPort=2181 initLimit=5 syncLimit=2
一旦配置文件成功保存并再次返回終端,您可以啟動zookeeper服務(wù)器。
$ bin/zkServer.sh start
執(zhí)行此命令后,您將得到如下所示的響應(yīng) -
$ JMX enabled by default $ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
$ bin/zkCli.sh
輸入上面的命令后,您將被連接到zookeeper服務(wù)器,并將獲得以下響應(yīng)。
Connecting to localhost:2181 ................ ................ ................ Welcome to ZooKeeper! ................ ................ WATCHER:: WatchedEvent state:SyncConnected type: None path:null [zk: localhost:2181(CONNECTED) 0]
連接服務(wù)器并執(zhí)行所有操作后,可以使用以下命令停止zookeeper服務(wù)器 -
$ bin/zkServer.sh stop
現(xiàn)在你已經(jīng)在你的機(jī)器上成功安裝了Java和ZooKeeper。 讓我們看看安裝Apache Kafka的步驟。
讓我們繼續(xù)以下步驟在您的機(jī)器上安裝Kafka。
要在您的機(jī)器上安裝Kafka,請點(diǎn)擊以下鏈接 -
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz現(xiàn)在最新版本,即 - kafka_2.11_0.9.0.0.tgz 將下載到您的計(jì)算機(jī)上。
使用以下命令提取tar文件 -
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz $ cd kafka_2.11.0.9.0.0
現(xiàn)在您已經(jīng)在您的機(jī)器上下載了最新版本的Kafka。
您可以通過給出以下命令來啟動服務(wù)器 -
$ bin/kafka-server-start.sh config/server.properties
服務(wù)器啟動后,您會在屏幕上看到以下響應(yīng):
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. …………………………………………….
執(zhí)行所有操作后,可以使用以下命令停止服務(wù)器 -
$ bin/kafka-server-stop.sh config/server.properties
現(xiàn)在我們已經(jīng)討論了Kafka安裝,我們可以在下一章中學(xué)習(xí)如何對Kafka執(zhí)行基本操作。
首先讓我們開始實(shí)現(xiàn)單節(jié)點(diǎn)單代理
配置,然后我們將我們的設(shè)置遷移到單節(jié)點(diǎn)多代理配置。
希望你現(xiàn)在可以在你的機(jī)器上安裝Java,ZooKeeper和Kafka。 在遷移到Kafka Cluster Setup之前,首先需要啟動ZooKeeper,因?yàn)镵afka Cluster使用ZooKeeper。
打開一個新終端并鍵入以下命令 -
bin/zookeeper-server-start.sh config/zookeeper.properties
要啟動Kafka Broker,請鍵入以下命令 -
bin/kafka-server-start.sh config/server.properties
啟動Kafka Broker后,在ZooKeeper終端上鍵入命令 jps
,您將看到以下響應(yīng) -
821 QuorumPeerMain 928 Kafka 931 Jps
現(xiàn)在你可以看到兩個守護(hù)進(jìn)程運(yùn)行在終端上,QuorumPeerMain是ZooKeeper守護(hù)進(jìn)程,另一個是Kafka守護(hù)進(jìn)程。
在此配置中,您有一個ZooKeeper和代理id實(shí)例。 以下是配置它的步驟 -
創(chuàng)建Kafka主題 - Kafka提供了一個名為 kafka-topics.sh
的命令行實(shí)用程序,用于在服務(wù)器上創(chuàng)建主題。 打開新終端并鍵入以下示例。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
我們剛剛創(chuàng)建了一個名為 Hello-Kafka
的主題,其中包含一個分區(qū)和一個副本因子。 上面創(chuàng)建的輸出將類似于以下輸出 -
輸出 - 創(chuàng)建主題 Hello-Kafka
創(chuàng)建主題后,您可以在Kafka代理終端窗口中獲取通知,并在config / server.properties文件中的“/ tmp / kafka-logs /"中指定的創(chuàng)建主題的日志。
要獲取Kafka服務(wù)器中的主題列表,可以使用以下命令 -
語法
bin/kafka-topics.sh --list --zookeeper localhost:2181
輸出
Hello-Kafka
由于我們已經(jīng)創(chuàng)建了一個主題,它將僅列出 Hello-Kafka
。 假設(shè),如果創(chuàng)建多個主題,您將在輸出中獲取主題名稱。
語法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
從上面的語法,生產(chǎn)者命令行客戶端需要兩個主要參數(shù) -
代理列表 - 我們要發(fā)送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理端口ID,因?yàn)槲覀冎牢覀兊拇碚趥陕牰丝?092,因此您可以直接指定它。
主題名稱 - 以下是主題名稱的示例。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生產(chǎn)者將等待來自stdin的輸入并發(fā)布到Kafka集群。 默認(rèn)情況下,每個新行都作為新消息發(fā)布,然后在 config / producer.properties
文件中指定默認(rèn)生產(chǎn)者屬性。 現(xiàn)在,您可以在終端中鍵入幾行消息,如下所示。
輸出
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message
My second message
與生產(chǎn)者類似,在 config / consumer.proper-ties
文件中指定了缺省使用者屬性。 打開一個新終端并鍵入以下消息消息語法。
語法
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning
輸出
Hello My first message My second message
最后,您可以從制作商的終端輸入消息,并看到他們出現(xiàn)在消費(fèi)者的終端。 到目前為止,您對具有單個代理的單節(jié)點(diǎn)群集有非常好的了解。 現(xiàn)在讓我們繼續(xù)討論多個代理配置。
在進(jìn)入多個代理集群設(shè)置之前,首先啟動ZooKeeper服務(wù)器。
創(chuàng)建多個Kafka Brokers - 我們在配置/ server.properties中已有一個Kafka代理實(shí)例。 現(xiàn)在我們需要多個代理實(shí)例,因此將現(xiàn)有的server.prop-erties文件復(fù)制到兩個新的配置文件中,并將其重命名為server-one.properties和server-two.properties。 然后編輯這兩個新文件并分配以下更改 -
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
啟動多個代理 - 在三臺服務(wù)器上進(jìn)行所有更改后,打開三個新終端,逐個啟動每個代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties
現(xiàn)在我們有三個不同的經(jīng)紀(jì)人在機(jī)器上運(yùn)行。 自己嘗試,通過在ZooKeeper終端上鍵入 jps 檢查所有守護(hù)程序,然后您將看到響應(yīng)。
讓我們?yōu)榇酥黝}將復(fù)制因子值指定為三個,因?yàn)槲覀冇腥齻€不同的代理運(yùn)行。 如果您有兩個代理,那么分配的副本值將是兩個。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
輸出
created topic “Multibrokerapplication"
Describe
命令用于檢查哪個代理正在偵聽當(dāng)前創(chuàng)建的主題,如下所示 -
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
輸出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
從上面的輸出,我們可以得出結(jié)論,第一行給出所有分區(qū)的摘要,顯示主題名稱,分區(qū)數(shù)量和我們已經(jīng)選擇的復(fù)制因子。 在第二行中,每個節(jié)點(diǎn)將是分區(qū)的隨機(jī)選擇部分的領(lǐng)導(dǎo)者。
在我們的例子中,我們看到我們的第一個broker(with broker.id 0)是領(lǐng)導(dǎo)者。 然后Replicas:0,2,1意味著所有代理復(fù)制主題最后 Isr
是 in-sync
副本的集合。 那么,這是副本的子集,當(dāng)前活著并被領(lǐng)導(dǎo)者趕上。
此過程保持與單代理設(shè)置中相同。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
輸出
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message
此過程保持與單代理設(shè)置中所示的相同。
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning
輸出
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message
在本章中,我們將討論各種基本主題操作。
您已經(jīng)了解如何在Kafka Cluster中創(chuàng)建主題。 現(xiàn)在讓我們使用以下命令修改已創(chuàng)建的主題
語法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
示例
We have already created a topic “Hello-Kafka" with single partition count and one replica factor. Now using “alter" command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2
輸出
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
要刪除主題,可以使用以下語法。
語法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
示例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
輸出
> Topic Hello-kafka marked for deletion
注意 - 如果 delete.topic.enable 未設(shè)置為true,則此操作不會產(chǎn)生任何影響
讓我們使用Java客戶端創(chuàng)建一個用于發(fā)布和使用消息的應(yīng)用程序。 Kafka生產(chǎn)者客戶端包括以下API。
讓我們了解本節(jié)中最重要的一組Kafka生產(chǎn)者API。 KafkaProducer API的中心部分是 KafkaProducer
類。 KafkaProducer類提供了一個選項(xiàng),用于將其構(gòu)造函數(shù)中的Kafka代理連接到以下方法。
KafkaProducer類提供send方法以異步方式將消息發(fā)送到主題。 send()的簽名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - 生產(chǎn)者管理等待發(fā)送的記錄的緩沖區(qū)。
回調(diào) - 當(dāng)服務(wù)器確認(rèn)記錄時(shí)執(zhí)行的用戶提供的回調(diào)(null表示無回調(diào))。
KafkaProducer類提供了一個flush方法,以確保所有先前發(fā)送的消息都已實(shí)際完成。 flush方法的語法如下 -
public void flush()
KafkaProducer類提供了partitionFor方法,這有助于獲取給定主題的分區(qū)元數(shù)據(jù)。 這可以用于自定義分區(qū)。 這種方法的簽名如下 -
public Map metrics()
它返回由生產(chǎn)者維護(hù)的內(nèi)部度量的映射。
public void close() - KafkaProducer類提供關(guān)閉方法塊,直到所有先前發(fā)送的請求完成。
生產(chǎn)者API的中心部分是生產(chǎn)者
類。 生產(chǎn)者類提供了一個選項(xiàng),通過以下方法在其構(gòu)造函數(shù)中連接Kafka代理。
生產(chǎn)者類提供send方法以使用以下簽名向單個或多個主題發(fā)送消息。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop);
有兩種類型的生產(chǎn)者 - 同步和異步。
相同的API配置也適用于同步
生產(chǎn)者。 它們之間的區(qū)別是同步生成器直接發(fā)送消息,但在后臺發(fā)送消息。 當(dāng)您想要更高的吞吐量時(shí),異步生產(chǎn)者是首選。 在以前的版本,如0.8,一個異步生產(chǎn)者沒有回調(diào)send()注冊錯誤處理程序。 這僅在當(dāng)前版本0.9中可用。
生產(chǎn)者類提供關(guān)閉方法以關(guān)閉與所有Kafka代理的生產(chǎn)者池連接。
下表列出了Producer API的主要配置設(shè)置,以便更好地理解 -
S.No | 配置設(shè)置和說明 |
---|---|
1 | client.id 標(biāo)識生產(chǎn)者應(yīng)用程序 |
2 | producer.type 同步或異步 |
3 | acks acks配置控制生產(chǎn)者請求下的標(biāo)準(zhǔn)是完全的。 |
4 | 重試 如果生產(chǎn)者請求失敗,則使用特定值自動重試。 |
5 | bootstrapping代理列表。 |
6 | linger.ms 如果你想減少請求的數(shù)量,你可以將linger.ms設(shè)置為大于某個值的東西。 |
7 | key.serializer 序列化器接口的鍵。 |
8 | value.serializer 值。 |
9 | batch.size 緩沖區(qū)大小。 |
10 | buffer.memory 控制生產(chǎn)者可用于緩沖的存儲器的總量。 |
ProducerRecord是發(fā)送到Kafka cluster.ProducerRecord類構(gòu)造函數(shù)的鍵/值對,用于使用以下簽名創(chuàng)建具有分區(qū),鍵和值對的記錄。
public ProducerRecord (string topic, int partition, k key, v value)
主題 - 將附加到記錄的用戶定義的主題名稱。
分區(qū) - 分區(qū)計(jì)數(shù)
鍵 - 將包含在記錄中的鍵。
public ProducerRecord (string topic, k key, v value)
ProducerRecord類構(gòu)造函數(shù)用于創(chuàng)建帶有鍵,值對和無分區(qū)的記錄。
主題 - 創(chuàng)建主題以分配記錄。
鍵 - 記錄的鍵。
值 - 記錄內(nèi)容。
public ProducerRecord (string topic, v value)
ProducerRecord類創(chuàng)建一個沒有分區(qū)和鍵的記錄。
主題 - 創(chuàng)建主題。
值 - 記錄內(nèi)容。
ProducerRecord類方法列在下表中 -
S.No | 類方法和描述 |
---|---|
1 | public string topic() 主題將附加到記錄。 |
2 | public K key() 將包括在記錄中的鍵。 如果沒有這樣的鍵,null將在這里重新打開。 |
3 | public V value() 記錄內(nèi)容。 |
4 | partition() 記錄的分區(qū)計(jì)數(shù) |
在創(chuàng)建應(yīng)用程序之前,首先啟動ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中創(chuàng)建自己的主題。 之后,創(chuàng)建一個名為 Sim-pleProducer.java
的java類,然后鍵入以下代碼。
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer" public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “l(fā)ocalhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully"); producer.close(); } }
編譯 - 可以使用以下命令編譯應(yīng)用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>
輸出
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
到目前為止,我們已經(jīng)創(chuàng)建了一個發(fā)送消息到Kafka集群的生產(chǎn)者。 現(xiàn)在讓我們創(chuàng)建一個消費(fèi)者來消費(fèi)Kafka集群的消息。 KafkaConsumer API用于消費(fèi)來自Kafka集群的消息。 KafkaConsumer類的構(gòu)造函數(shù)定義如下。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - 返回消費(fèi)者配置的地圖。
KafkaConsumer類具有下表中列出的以下重要方法。
S.No | 方法和說明 |
---|---|
1 | public java.util.Set< TopicPar- tition> assignment() 獲取由用戶當(dāng)前分配的分區(qū)集。 |
2 | public string subscription() 訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。 |
3 | public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener) 訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。 |
4 | public void unsubscribe() 從給定的分區(qū)列表中取消訂閱主題。 |
5 | public void sub-scribe(java.util.List< java.lang.String> topics) 訂閱給定的主題列表以獲取動態(tài)簽名的分區(qū)。 如果給定的主題列表為空,則將其視為與unsubscribe()相同。 |
6 | public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener) 參數(shù)模式以正則表達(dá)式的格式引用預(yù)訂模式,而偵聽器參數(shù)從預(yù)訂模式獲取通知。 |
7 | public void as-sign(java.util.List< TopicPartion> partitions) 向客戶手動分配分區(qū)列表。 |
8 | poll() 使用預(yù)訂/分配API之一獲取指定的主題或分區(qū)的數(shù)據(jù)。 如果在輪詢數(shù)據(jù)之前未預(yù)訂主題,這將返回錯誤。 |
9 | public void commitSync() 提交對主題和分區(qū)的所有子編制列表的最后一次poll()返回的提交偏移量。 相同的操作應(yīng)用于commitAsyn()。 |
10 | public void seek(TopicPartition partition,long offset) 獲取消費(fèi)者將在下一個poll()方法中使用的當(dāng)前偏移值。 |
11 | public void resume() 恢復(fù)暫停的分區(qū)。 |
12 | public void wakeup() 喚醒消費(fèi)者。 |
ConsumerRecord API用于從Kafka集群接收記錄。 此API由主題名稱,分區(qū)號(從中接收記錄)和指向Kafka分區(qū)中的記錄的偏移量組成。 ConsumerRecord類用于創(chuàng)建具有特定主題名稱,分區(qū)計(jì)數(shù)和< key,value>的消費(fèi)者記錄。 對。 它有以下簽名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
主題 - 從Kafka集群接收的使用者記錄的主題名稱。
分區(qū) - 主題的分區(qū)。
鍵 - 記錄的鍵,如果沒有鍵存在null將被返回。
值 - 記錄內(nèi)容。
ConsumerRecords API充當(dāng)ConsumerRecord的容器。 此API用于保存特定主題的每個分區(qū)的ConsumerRecord列表。 它的構(gòu)造器定義如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition - 返回特定主題的分區(qū)地圖。
記錄 - ConsumerRecord的返回列表。
ConsumerRecords類定義了以下方法。
S.No | 方法和描述 |
---|---|
1 | public int count() 所有主題的記錄數(shù)。 |
2 | public Set partitions() 在此記錄集中具有數(shù)據(jù)的分區(qū)集(如果沒有返回?cái)?shù)據(jù),則該集為空)。 |
3 | public Iterator iterator() 迭代器使您可以循環(huán)訪問集合,獲取或重新移動元素。 |
4 | public List records() 獲取給定分區(qū)的記錄列表。 |
Consumer客戶端API主配置設(shè)置的配置設(shè)置如下所示 -
S.No | 設(shè)置和說明 |
---|---|
1 | 引導(dǎo)代理列表。 |
2 | group.id 將單個消費(fèi)者分配給組。 |
3 | enable.auto.commit 如果值為true,則為偏移啟用自動落實(shí),否則不提交。 |
4 | auto.commit.interval.ms 返回更新的消耗偏移量寫入ZooKeeper的頻率。 |
5 | session.timeout.ms 表示Kafka在放棄和繼續(xù)消費(fèi)消息之前等待ZooKeeper響應(yīng)請求(讀取或?qū)懭?多少毫秒。 |
生產(chǎn)者應(yīng)用程序步驟在此保持不變。 首先,啟動你的ZooKeeper和Kafka代理。 然后使用名為 SimpleCon-sumer.java
的Java類創(chuàng)建一個 SimpleConsumer
應(yīng)用程序,并鍵入以下代碼。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
編譯 - 可以使用以下命令編譯應(yīng)用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
執(zhí)行 - 可以使用以下命令執(zhí)行應(yīng)用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>
輸入 - 打開生成器CLI并向主題發(fā)送一些消息。 你可以把smple輸入為\'Hello Consumer\'。
輸出 - 以下是輸出。
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer
消費(fèi)群是多線程或多機(jī)器的Kafka主題。
消費(fèi)者可以使用相同的 group.id
加入群組
一個組的最大并行度是組中的消費(fèi)者數(shù)量←不是分區(qū)。
Kafka將主題的分區(qū)分配給組中的使用者,以便每個分區(qū)僅由組中的一個使用者使用。
Kafka保證消息只能被組中的一個消費(fèi)者讀取。
消費(fèi)者可以按照消息存儲在日志中的順序查看消息。
添加更多進(jìn)程/線程將導(dǎo)致Kafka重新平衡。 如果任何消費(fèi)者或代理無法向ZooKeeper發(fā)送心跳,則可以通過Kafka集群重新配置。 在此重新平衡期間,Kafka將分配可用分區(qū)到可用線程,可能將分區(qū)移動到另一個進(jìn)程。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group >>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
在這里,我們?yōu)閮蓚€消費(fèi)者創(chuàng)建了一個示例組名稱為 my-group
。 同樣,您可以在組中創(chuàng)建您的組和消費(fèi)者數(shù)量。
打開生產(chǎn)者CLI并發(fā)送一些消息 -
Test consumer group 01 Test consumer group 02
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 01
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 02
現(xiàn)在希望你能通過使用Java客戶端演示了解SimpleConsumer和ConsumeGroup。 現(xiàn)在,您了解如何使用Java客戶端發(fā)送和接收消息。 讓我們在下一章繼續(xù)Kafka與大數(shù)據(jù)技術(shù)的集成。
在本章中,我們將學(xué)習(xí)如何將Kafka與Apache Storm集成。
Storm最初由Nathan Marz和BackType的團(tuán)隊(duì)創(chuàng)建。 在短時(shí)間內(nèi),Apache Storm成為分布式實(shí)時(shí)處理系統(tǒng)的標(biāo)準(zhǔn),允許您處理大量數(shù)據(jù)。 Storm是非??斓?,并且一個基準(zhǔn)時(shí)鐘為每個節(jié)點(diǎn)每秒處理超過一百萬個元組。 Apache Storm持續(xù)運(yùn)行,從配置的源(Spouts)消耗數(shù)據(jù),并將數(shù)據(jù)傳遞到處理管道(Bolts)。 聯(lián)合,spout和Bolt
做一個拓?fù)洹?/span>
Kafka和Storm自然互補(bǔ),它們強(qiáng)大的合作能夠?qū)崿F(xiàn)快速移動的大數(shù)據(jù)的實(shí)時(shí)流分析。 Kafka和Storm集成是為了使開發(fā)人員更容易地從Storm拓?fù)浍@取和發(fā)布數(shù)據(jù)流。
spout是流的源。 例如,一個spout可以從Kafka Topic讀取元組并將它們作為流發(fā)送。 Bolt消耗輸入流,處理并可能發(fā)射新的流。 Bolt可以從運(yùn)行函數(shù),過濾元組,執(zhí)行流聚合,流連接,與數(shù)據(jù)庫交談等等做任何事情。 Storm拓?fù)渲械拿總€節(jié)點(diǎn)并行執(zhí)行。 拓?fù)錈o限運(yùn)行,直到終止它。 Storm將自動重新分配任何失敗的任務(wù)。 此外,Storm保證沒有數(shù)據(jù)丟失,即使機(jī)器停機(jī)和消息被丟棄。
讓我們詳細(xì)了解Kafka-Storm集成API。 有三個主要類集成Kafka與Storm。 他們?nèi)缦?-
BrokerHosts是一個接口,ZkHosts和StaticHosts是它的兩個主要實(shí)現(xiàn)。 ZkHosts用于通過在ZooKeeper中維護(hù)細(xì)節(jié)來動態(tài)跟蹤Kafka代理,而StaticHosts用于手動/靜態(tài)設(shè)置Kafka代理及其詳細(xì)信息。 ZkHosts是訪問Kafka代理的簡單快捷的方式。
ZkHosts的簽名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
其中brokerZkStr是ZooKeeper主機(jī),brokerZkPath是ZooKeeper路徑以維護(hù)Kafka代理詳細(xì)信息。
此API用于定義Kafka集群的配置設(shè)置。 Kafka Con-fig的簽名定義如下
public KafkaConfig(BrokerHosts hosts, string topic)
主機(jī) - BrokerHosts可以是ZkHosts / StaticHosts。
主題 - 主題名稱。
Spoutconfig是KafkaConfig的擴(kuò)展,支持額外的ZooKeeper信息。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
主機(jī) - BrokerHosts可以是BrokerHosts接口的任何實(shí)現(xiàn)
主題 - 主題名稱。
zkRoot - ZooKeeper根路徑。
id - spouts存儲在Zookeeper中消耗的偏移量的狀態(tài)。 ID應(yīng)該唯一標(biāo)識您的spout。
SchemeAsMultiScheme是一個接口,用于指示如何將從Kafka中消耗的ByteBuffer轉(zhuǎn)換為Storm元組。 它源自MultiScheme并接受Scheme類的實(shí)現(xiàn)。 有很多Scheme類的實(shí)現(xiàn),一個這樣的實(shí)現(xiàn)是StringScheme,它將字節(jié)解析為一個簡單的字符串。 它還控制輸出字段的命名。 簽名定義如下。
public SchemeAsMultiScheme(Scheme scheme)
方案 - 從kafka消耗的字節(jié)緩沖區(qū)。
KafkaSpout是我們的spout實(shí)現(xiàn),它將與Storm集成。 它從kafka主題獲取消息,并將其作為元組發(fā)送到Storm生態(tài)系統(tǒng)。 KafkaSpout從SpoutConfig獲取其配置詳細(xì)信息。
下面是一個創(chuàng)建一個簡單的Kafka spout的示例代碼。
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString); //Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString()); //convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Bolt是一個使用元組作為輸入,處理元組,并產(chǎn)生新的元組作為輸出的組件。 Bolt將實(shí)現(xiàn)IRichBolt接口。 在此程序中,使用兩個Bolt類WordSplitter-Bolt和WordCounterBolt來執(zhí)行操作。
IRichBolt接口有以下方法 -
準(zhǔn)備 - 為Bolt提供要執(zhí)行的環(huán)境。 執(zhí)行器將運(yùn)行此方法來初始化spout。
執(zhí)行 - 處理單個元組的輸入。
清理 - 當(dāng)Bolt要關(guān)閉時(shí)調(diào)用。
declareOutputFields - 聲明元組的輸出模式。
讓我們創(chuàng)建SplitBolt.java,它實(shí)現(xiàn)邏輯分割一個句子到詞和CountBolt.java,它實(shí)現(xiàn)邏輯分離獨(dú)特的單詞和計(jì)數(shù)其出現(xiàn)。
import java.util.Map; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext; public class SplitBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word: words) { word = word.trim(); if(!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void cleanup() {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
import java.util.Map; import java.util.HashMap; import backtype.storm.tuple.Tuple; import backtype.storm.task.OutputCollector; import backtype.storm.topology.
更多建議: