Apache Kafka 基礎(chǔ)

2021-07-27 16:23 更新

對(duì)于大數(shù)據(jù),我們要考慮的問(wèn)題有很多,首先海量數(shù)據(jù)如何收集(如 Flume),然后對(duì)于收集到的數(shù)據(jù)如何存儲(chǔ)(典型的分布式文件系統(tǒng) HDFS、分布式數(shù)據(jù)庫(kù) HBase、NoSQL 數(shù)據(jù)庫(kù) Redis),其次存儲(chǔ)的數(shù)據(jù)不是存起來(lái)就沒(méi)事了,要通過(guò)計(jì)算從中獲取有用的信息,這就涉及到計(jì)算模型(典型的離線計(jì)算 MapReduce、流式實(shí)時(shí)計(jì)算Storm、Spark),或者要從數(shù)據(jù)中挖掘信息,還需要相應(yīng)的機(jī)器學(xué)習(xí)算法。在這些之上,還有一些各種各樣的查詢(xún)分析數(shù)據(jù)的工具(如 Hive、Pig 等)。除此之外,要構(gòu)建分布式應(yīng)用還需要一些工具,比如分布式協(xié)調(diào)服務(wù) Zookeeper 等等。

??這里,我們講到的是消息系統(tǒng),Kafka 專(zhuān)為分布式高吞吐量系統(tǒng)而設(shè)計(jì),其他消息傳遞系統(tǒng)相比,Kafka 具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和固有的容錯(cuò)能力,這使得它非常適合大規(guī)模消息處理應(yīng)用程序。

(一)消息系統(tǒng)

??首先,我們理解一下什么是消息系統(tǒng):消息系統(tǒng)負(fù)責(zé)將數(shù)據(jù)從一個(gè)應(yīng)用程序傳輸?shù)搅硗庖粋€(gè)應(yīng)用程序,使得應(yīng)用程序可以專(zhuān)注于處理邏輯,而不用過(guò)多的考慮如何將消息共享出去。

??分布式消息系統(tǒng)基于可靠消息隊(duì)列的方式,消息在應(yīng)用程序和消息系統(tǒng)之間異步排隊(duì)。實(shí)際上,消息系統(tǒng)有兩種消息傳遞模式:一種是點(diǎn)對(duì)點(diǎn),另外一種是基于發(fā)布-訂閱(publish-subscribe)的消息系統(tǒng)。

1、點(diǎn)對(duì)點(diǎn)的消息系統(tǒng)

??在點(diǎn)對(duì)點(diǎn)的消息系統(tǒng)中,消息保留在隊(duì)列中,一個(gè)或者多個(gè)消費(fèi)者可以消耗隊(duì)列中的消息,但是消息最多只能被一個(gè)消費(fèi)者消費(fèi),一旦有一個(gè)消費(fèi)者將其消費(fèi)掉,消息就從該隊(duì)列中消失。這里要注意:多個(gè)消費(fèi)者可以同時(shí)工作,但是最終能拿到該消息的只有其中一個(gè)。最典型的例子就是訂單處理系統(tǒng),多個(gè)訂單處理器可以同時(shí)工作,但是對(duì)于一個(gè)特定的訂單,只有其中一個(gè)訂單處理器可以拿到該訂單進(jìn)行處理。

2、發(fā)布-訂閱消息系統(tǒng)

??在發(fā)布 - 訂閱系統(tǒng)中,消息被保留在主題中。 與點(diǎn)對(duì)點(diǎn)系統(tǒng)不同,消費(fèi)者可以訂閱一個(gè)或多個(gè)主題并使用該主題中的所有消息。在發(fā)布 - 訂閱系統(tǒng)中,消息生產(chǎn)者稱(chēng)為發(fā)布者,消息使用者稱(chēng)為訂閱者。 一個(gè)現(xiàn)實(shí)生活的例子是Dish電視,它發(fā)布不同的渠道,如運(yùn)動(dòng),電影,音樂(lè)等,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時(shí)可用。

(二)Apache Kafka 簡(jiǎn)介

??Kafka is a distributed,partitioned,replicated commit logservice。

??Apache Kafka 是一個(gè)分布式發(fā)布 - 訂閱消息系統(tǒng)和一個(gè)強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使你能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn)。 Kafka 適合離線和在線消息消費(fèi)。 Kafka 消息保留在磁盤(pán)上,并在群集內(nèi)復(fù)制以防止數(shù)據(jù)丟失。 Kafka 構(gòu)建在 ZooKeeper 同步服務(wù)之上。 它與 Apache Storm 和 Spark 非常好地集成,用于實(shí)時(shí)流式數(shù)據(jù)分析。

??Kafka 是一個(gè)分布式消息隊(duì)列,具有高性能、持久化、多副本備份、橫向擴(kuò)展能力。生產(chǎn)者往隊(duì)列里寫(xiě)消息,消費(fèi)者從隊(duì)列里取消息進(jìn)行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計(jì)中起到解耦、削峰、異步處理的作用。

??關(guān)鍵術(shù)語(yǔ):

??(1)生產(chǎn)者和消費(fèi)者(producer和consumer):消息的發(fā)送者叫 Producer,消息的使用者和接受者是 Consumer,生產(chǎn)者將數(shù)據(jù)保存到 Kafka 集群中,消費(fèi)者從中獲取消息進(jìn)行業(yè)務(wù)的處理。

??(2)broker:Kafka 集群中有很多臺(tái) Server,其中每一臺(tái) Server 都可以存儲(chǔ)消息,將每一臺(tái) Server 稱(chēng)為一個(gè) kafka 實(shí)例,也叫做 broker。

??(3)主題(topic):一個(gè) topic 里保存的是同一類(lèi)消息,相當(dāng)于對(duì)消息的分類(lèi),每個(gè) producer 將消息發(fā)送到 kafka 中,都需要指明要存的 topic 是哪個(gè),也就是指明這個(gè)消息屬于哪一類(lèi)。

??(4)分區(qū)(partition):每個(gè) topic 都可以分成多個(gè) partition,每個(gè) partition 在存儲(chǔ)層面是 append log 文件。任何發(fā)布到此 partition 的消息都會(huì)被直接追加到 log 文件的尾部。為什么要進(jìn)行分區(qū)呢?最根本的原因就是:kafka基于文件進(jìn)行存儲(chǔ),當(dāng)文件內(nèi)容大到一定程度時(shí),很容易達(dá)到單個(gè)磁盤(pán)的上限,因此,采用分區(qū)的辦法,一個(gè)分區(qū)對(duì)應(yīng)一個(gè)文件,這樣就可以將數(shù)據(jù)分別存儲(chǔ)到不同的server上去,另外這樣做也可以負(fù)載均衡,容納更多的消費(fèi)者。

??(5)偏移量(Offset):一個(gè)分區(qū)對(duì)應(yīng)一個(gè)磁盤(pán)上的文件,而消息在文件中的位置就稱(chēng)為 offset(偏移量),offset 為一個(gè) long 型數(shù)字,它可以唯一標(biāo)記一條消息。由于kafka 并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ) offset,文件只能順序的讀寫(xiě),所以在kafka中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫(xiě)”。

??綜上,我們總結(jié)一下 Kafka 的幾個(gè)要點(diǎn):

  • kafka 是一個(gè)基于發(fā)布-訂閱的分布式消息系統(tǒng)(消息隊(duì)列)
  • Kafka 面向大數(shù)據(jù),消息保存在主題中,而每個(gè) topic 有分為多個(gè)分區(qū)
  • kafka 的消息數(shù)據(jù)保存在磁盤(pán),每個(gè) partition 對(duì)應(yīng)磁盤(pán)上的一個(gè)文件,消息寫(xiě)入就是簡(jiǎn)單的文件追加,文件可以在集群內(nèi)復(fù)制備份以防丟失
  • 即使消息被消費(fèi),kafka 也不會(huì)立即刪除該消息,可以通過(guò)配置使得過(guò)一段時(shí)間后自動(dòng)刪除以釋放磁盤(pán)空間
  • kafka依賴(lài)分布式協(xié)調(diào)服務(wù)Zookeeper,適合離線/在線信息的消費(fèi),與 storm 和 spark 等實(shí)時(shí)流式數(shù)據(jù)分析常常結(jié)合使用

(三)Apache Kafka基本原理

??通過(guò)之前的介紹,我們對(duì) kafka 有了一個(gè)簡(jiǎn)單的理解,它的設(shè)計(jì)初衷是建立一個(gè)統(tǒng)一的信息收集平臺(tái),使其可以做到對(duì)信息的實(shí)時(shí)反饋。Kafka is a distributed,partitioned,replicated commit logservice。接下來(lái)我們著重從幾個(gè)方面分析其基本原理。

1、分布式和分區(qū)(distributed、partitioned)

??我們說(shuō) kafka 是一個(gè)分布式消息系統(tǒng),所謂的分布式,實(shí)際上我們已經(jīng)大致了解。消息保存在 Topic 中,而為了能夠?qū)崿F(xiàn)大數(shù)據(jù)的存儲(chǔ),一個(gè) topic 劃分為多個(gè)分區(qū),每個(gè)分區(qū)對(duì)應(yīng)一個(gè)文件,可以分別存儲(chǔ)到不同的機(jī)器上,以實(shí)現(xiàn)分布式的集群存儲(chǔ)。另外,每個(gè) partition 可以有一定的副本,備份到多臺(tái)機(jī)器上,以提高可用性。

??總結(jié)起來(lái)就是:一個(gè) topic 對(duì)應(yīng)的多個(gè) partition 分散存儲(chǔ)到集群中的多個(gè) broker 上,存儲(chǔ)方式是一個(gè) partition 對(duì)應(yīng)一個(gè)文件,每個(gè) broker 負(fù)責(zé)存儲(chǔ)在自己機(jī)器上的 partition 中的消息讀寫(xiě)。

2、副本(replicated )

??kafka 還可以配置 partitions 需要備份的個(gè)數(shù)(replicas),每個(gè) partition 將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性,備份的數(shù)量可以通過(guò)配置文件指定。

??這種冗余備份的方式在分布式系統(tǒng)中是很常見(jiàn)的,那么既然有副本,就涉及到對(duì)同一個(gè)文件的多個(gè)備份如何進(jìn)行管理和調(diào)度。kafka 采取的方案是:每個(gè) partition 選舉一個(gè) server 作為“l(fā)eader”,由 leader 負(fù)責(zé)所有對(duì)該分區(qū)的讀寫(xiě),其他 server 作為 follower 只需要簡(jiǎn)單的與 leader 同步,保持跟進(jìn)即可。如果原來(lái)的 leader 失效,會(huì)重新選舉由其他的 follower 來(lái)成為新的 leader。

??至于如何選取 leader,實(shí)際上如果我們了解 ZooKeeper,就會(huì)發(fā)現(xiàn)其實(shí)這正是 Zookeeper 所擅長(zhǎng)的,Kafka 使用 ZK 在 Broker 中選出一個(gè) Controller,用于 Partition 分配和 Leader 選舉。

??另外,這里我們可以看到,實(shí)際上作為 leader 的 server 承擔(dān)了該分區(qū)所有的讀寫(xiě)請(qǐng)求,因此其壓力是比較大的,從整體考慮,有多少個(gè) partition 就意味著會(huì)有多少個(gè)leader,kafka 會(huì)將 leader 分散到不同的 broker 上,確保整體的負(fù)載均衡。

3、整體數(shù)據(jù)流程

??Kafka 的總體數(shù)據(jù)流滿足下圖,該圖可以說(shuō)是概括了整個(gè) kafka 的基本原理。


(1)數(shù)據(jù)生產(chǎn)過(guò)程(Produce)

??對(duì)于生產(chǎn)者要寫(xiě)入的一條記錄,可以指定四個(gè)參數(shù):分別是 topic、partition、key 和 value,其中 topic 和 value(要寫(xiě)入的數(shù)據(jù))是必須要指定的,而 key 和 partition 是可選的。

??對(duì)于一條記錄,先對(duì)其進(jìn)行序列化,然后按照 Topic 和 Partition,放進(jìn)對(duì)應(yīng)的發(fā)送隊(duì)列中。如果 Partition 沒(méi)填,那么情況會(huì)是這樣的:a、Key 有填。按照 Key 進(jìn)行哈希,相同 Key 去一個(gè) Partition。b、Key 沒(méi)填。Round-Robin 來(lái)選 Partition。

??producer 將會(huì)和Topic下所有 partition leader 保持 socket 連接,消息由 producer 直接通過(guò) socket 發(fā)送到 broker。其中 partition leader 的位置( host : port )注冊(cè)在 zookeeper 中,producer 作為 zookeeper client,已經(jīng)注冊(cè)了 watch 用來(lái)監(jiān)聽(tīng) partition leader 的變更事件,因此,可以準(zhǔn)確的知道誰(shuí)是當(dāng)前的 leader。

??producer 端采用異步發(fā)送:將多條消息暫且在客戶端 buffer 起來(lái),并將他們批量的發(fā)送到 broker,小數(shù)據(jù) IO 太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率。

(2)數(shù)據(jù)消費(fèi)過(guò)程(Consume)

??對(duì)于消費(fèi)者,不是以單獨(dú)的形式存在的,每一個(gè)消費(fèi)者屬于一個(gè) consumer group,一個(gè) group 包含多個(gè) consumer。特別需要注意的是:訂閱 Topic 是以一個(gè)消費(fèi)組來(lái)訂閱的,發(fā)送到 Topic 的消息,只會(huì)被訂閱此 Topic 的每個(gè) group 中的一個(gè) consumer 消費(fèi)。

??如果所有的 Consumer 都具有相同的 group,那么就像是一個(gè)點(diǎn)對(duì)點(diǎn)的消息系統(tǒng);如果每個(gè) consumer 都具有不同的 group,那么消息會(huì)廣播給所有的消費(fèi)者。

??具體說(shuō)來(lái),這實(shí)際上是根據(jù) partition 來(lái)分的,一個(gè) Partition,只能被消費(fèi)組里的一個(gè)消費(fèi)者消費(fèi),但是可以同時(shí)被多個(gè)消費(fèi)組消費(fèi),消費(fèi)組里的每個(gè)消費(fèi)者是關(guān)聯(lián)到一個(gè) partition 的,因此有這樣的說(shuō)法:對(duì)于一個(gè) topic,同一個(gè) group 中不能有多于 partitions 個(gè)數(shù)的 consumer 同時(shí)消費(fèi),否則將意味著某些 consumer 將無(wú)法得到消息。

??同一個(gè)消費(fèi)組的兩個(gè)消費(fèi)者不會(huì)同時(shí)消費(fèi)一個(gè) partition。

??在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立連接之后,主動(dòng)去 pull(或者說(shuō) fetch )消息,首先 consumer 端可以根據(jù)自己的消費(fèi)能力適時(shí)的去 fetch 消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset)。

??partition 中的消息只有一個(gè) consumer 在消費(fèi),且不存在消息狀態(tài)的控制,也沒(méi)有復(fù)雜的消息確認(rèn)機(jī)制,可見(jiàn) kafka broker 端是相當(dāng)輕量級(jí)的。當(dāng)消息被 consumer 接收之后,需要保存 Offset 記錄消費(fèi)到哪,以前保存在 ZK 中,由于 ZK 的寫(xiě)性能不好,以前的解決方法都是 Consumer 每隔一分鐘上報(bào)一次,在 0.10 版本后,Kafka 把這個(gè) Offset 的保存,從 ZK 中剝離,保存在一個(gè)名叫 consumeroffsets topic 的 Topic 中,由此可見(jiàn),consumer 客戶端也很輕量級(jí)。

4、消息傳送機(jī)制

??Kafka 支持 3 種消息投遞語(yǔ)義,在業(yè)務(wù)中,常常都是使用 At least once 的模型。

  • At most once:最多一次,消息可能會(huì)丟失,但不會(huì)重復(fù)。
  • At least once:最少一次,消息不會(huì)丟失,可能會(huì)重復(fù)。
  • Exactly once:只且一次,消息不丟失不重復(fù),只且消費(fèi)一次。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)