第七周,大數(shù)據(jù)建模 scala 復(fù)習(xí) , KaFka 回顧 , 繼續(xù)講解
第七周,10-13 , scala 復(fù)習(xí) , KaFka 回顧
KAFAKA 消息中介,
比如說, 淘寶的訂單交易系統(tǒng), 產(chǎn)生訂單的信息, 對這些訂單要實時分析, 對他關(guān)心的, 進(jìn)行推送, 這時要用 KAFKA 進(jìn)行
推送, 再從KAFKA中讀取出來, 持久化, 7天可以存儲, 可以高存儲量, 百萬級別。
生產(chǎn)者發(fā)布一個消息, 或者就是客戶下一個訂單, 這個消息推送到 KAFKA的主題TOPIC中, 消費者要訂閱這個主題, 不同的
TOPIC 要進(jìn)行不同分析。 等于不同的消費者會訂閱不同的主題, 再從KAFKA的集群中獲得。
KAFKA 集群 由 ZOOKEPPER管理。 做一些搜索引擎的事情, KAFKA的數(shù)量, 可以和 瀏覽器對接, ELESHCHE , 輸入
關(guān)鍵詞, 進(jìn)行 TOPIC 主題的創(chuàng)建。
KAFKA就是一個 高吞吐量的一個集群。
淘寶數(shù)據(jù)要出現(xiàn)顯示大屏 , 把實時處理的數(shù)據(jù), 可以放在 Redis 里面,
序列號, 也就是偏移量, 這個就是由 ZOOKEPPER 管理, 消費者要從 KAFKA進(jìn)行 消費, 也需要進(jìn)行記錄。
在 不同的 TOPIC 也由 ZOOKEPPER 管理, 這2個集群都要建立。
分區(qū) partion , 可以設(shè)置在 TOPIC下 。
Broker 就是一個 緩存代理。
日志類、訂單類屬于不同的 分區(qū) PARTION , OFFSET 就是序號或者偏移量。
接下來講: KAFKA的 MASSAGE ,通訊的基本單位, 每個生產(chǎn)者可以向一個 TOPIC 發(fā)布一些消息。
KAFKA 中的MASSAGE是以 TOPIC 為基本單位組織的,
MASSAGE 是如何進(jìn)行存儲的, TOPIC 對應(yīng)一個 偏移量, 也就是 ID , 也就是指針,
總之, 幾十年來, 計算機還是進(jìn)行表管理。
放入 TOPIC ,變成一個字符串, 然后就是用 空格進(jìn)行確認(rèn), 總之, 把生產(chǎn)者的數(shù)據(jù), 存儲到 KAFKA
消費者再從這個 MASSAGE中取得數(shù)據(jù)。
11:10 開始上課, KAFKA的 消息處理機制。
1、發(fā)送到 PARTITION 中的消息, 自動追加到日志中, 順序是一至的,
2、對于消費者 , 消費消息的順序也是一至的。
3、如果 topic 的 replication factor 為 n , 那么允許n-1 個 kafka 的實例失效
4、kafka 對消息的重復(fù)、丟失、錯誤以及順序沒有嚴(yán)格的要求。
5、kafka 提供 at-least-once delivery , 當(dāng)消費者宕機后, 有些消息可能會被重復(fù) 發(fā)送 delivry
6、 因每個 partition只會被 消費者組內(nèi)部的一個消費者消費。 KAFKA是保證每個 PARTITION 內(nèi)的消息會被順序訂閱。
7、Kafka 為每條消息計算 CRC檢驗, 用于錯誤檢測, CRC檢驗不通過的消息會直接被丟棄掉
ACK校驗, 當(dāng)消費者消費成功, 返回ACK消息。
KAFKA數(shù)據(jù)傳輸?shù)臋C制又是什么?
1、at most once: 最多一次, 這個和 JMS 中的非持久化消息類似, 無論成敗, 將不會重發(fā)。
2、at least once : 消息至少發(fā)送一次, 如果消息美未能接受成功, 可能進(jìn)行重發(fā), 直到接受成功。
3、exactly once : 消息只會發(fā)送一次 。
對于 這 3點, 做詳細(xì)描述。
KafAKA的存儲策略,
生產(chǎn)者生產(chǎn)的消息, 然后在 kaFka 存儲 , 是順序產(chǎn)生的, offset 不一致
一、kafka的存儲機制
kafka通過topic來分主題存放數(shù)據(jù),主題內(nèi)有分區(qū),分區(qū)可以有多個副本,分區(qū)的內(nèi)部還細(xì)分為若干個segment。
所謂的分區(qū)其實就是在kafka對應(yīng)存儲目錄下創(chuàng)建的文件夾,文件夾的名字是主題名加上分區(qū)編號,編號從0開始。
1、segment
所謂的segment其實就是在分區(qū)對應(yīng)的文件夾下產(chǎn)生的文件。
一個分區(qū)會被劃分成大小相等的若干segment,這樣一方面保證了分區(qū)的數(shù)據(jù)被劃分到多個文件中保證不會產(chǎn)生體積過大的文件;
另一方面可以基于這些segment文件進(jìn)行歷史數(shù)據(jù)的刪除,提高效率。
一個segment又由一個.log和一個.index文件組成。
1..log
.log文件為數(shù)據(jù)文件用來存放數(shù)據(jù)分段數(shù)據(jù)。
2..index
.index為索引文件保存對對應(yīng)的.log文件的索引信息。
在.index文件中,保存了對對應(yīng).log文件的索引信息,通過查找.index文件可以獲知每個存儲在當(dāng)前segment中的offset在.log文件中的開始位置,
而每條日志有其固定格式,保存了包括offset編號、日志長度、key的長度等相關(guān)信息,通過這個固定格式中的數(shù)據(jù)可以確定出當(dāng)前offset的結(jié)束位置,
從而對數(shù)據(jù)進(jìn)行讀取。
3.命名規(guī)則
這兩個文件的命名規(guī)則為:
partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值,數(shù)值大小為64位,
20位數(shù)字字符長度,沒有數(shù)字用0填充。
2、讀取數(shù)據(jù)
開始讀取指定分區(qū)中某個offset對應(yīng)的數(shù)據(jù)時,先根據(jù)offset和當(dāng)前分區(qū)的所有segment的名稱做比較,確定出數(shù)據(jù)在哪個segment中,
再查找該segment的索引文件,確定當(dāng)前offset在數(shù)據(jù)文件中的開始位置,最后從該位置開始讀取數(shù)據(jù)文件,在根據(jù)數(shù)據(jù)格式判斷結(jié)果,
獲取完整數(shù)據(jù)。
二、可靠性保證
1、AR
在Kafka中維護(hù)了一個AR列表,包括所有的分區(qū)的副本。AR又分為ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW這些信息都被保存在Zookeeper中。
1.ISR
ISR中的副本都要同步leader中的數(shù)據(jù),只有都同步完成了數(shù)據(jù)才認(rèn)為是成功提交了,成功提交之后才能供外界訪問。
在這個同步的過程中,數(shù)據(jù)即使已經(jīng)寫入也不能被外界訪問,這個過程是通過LEO-HW機制來實現(xiàn)的。
2.OSR
OSR內(nèi)的副本是否同步了leader的數(shù)據(jù),不影響數(shù)據(jù)的提交,OSR內(nèi)的follower盡力的去同步leader,可能數(shù)據(jù)版本會落后。
最開始所有的副本都在ISR中,在kafka工作的過程中,如果某個副本同步速度慢于replica.lag.time.max.ms指定的閾值,
則被踢出ISR存入OSR,如果后續(xù)速度恢復(fù)可以回到ISR中。
3.LEO
LogEndOffset:分區(qū)的最新的數(shù)據(jù)的offset,當(dāng)數(shù)據(jù)寫入leader后,LEO就立即執(zhí)行該最新數(shù)據(jù)。相當(dāng)于最新數(shù)據(jù)標(biāo)識位。
4.HW
HighWatermark:只有寫入的數(shù)據(jù)被同步到所有的ISR中的副本后,數(shù)據(jù)才認(rèn)為已提交,HW更新到該位置,HW之前的數(shù)據(jù)才可以
被消費者訪問,保證沒有同步完成的數(shù)據(jù)不會被消費者訪問到。相當(dāng)于所有副本同步數(shù)據(jù)標(biāo)識位。
在leader宕機后,只能從ISR列表中選取新的leader,無論ISR中哪個副本被選為新的leader,它都知道HW之前的數(shù)據(jù),
可以保證在切換了leader后,消費者可以繼續(xù)看到HW之前已經(jīng)提交的數(shù)據(jù)。
所以LEO代表已經(jīng)寫入的最新數(shù)據(jù)位置,而HW表示已經(jīng)同步完成的數(shù)據(jù),只有HW之前的數(shù)據(jù)才能被外界訪問。
5.HW截斷機制
如果leader宕機,選出了新的leader,而新的leader并不能保證已經(jīng)完全同步了之前l(fā)eader的所有數(shù)據(jù),只能保證HW之前的數(shù)據(jù)是同步過的,此時所有的follower都要將數(shù)據(jù)截斷到HW的位置,再和新的leader同步數(shù)據(jù),來保證數(shù)據(jù)一致。
當(dāng)宕機的leader恢復(fù),發(fā)現(xiàn)新的leader中的數(shù)據(jù)和自己持有的數(shù)據(jù)不一致,此時宕機的leader會將自己的數(shù)據(jù)截斷到宕機之前的hw位置,然后同步新leader的數(shù)據(jù)。宕機的leader活過來也像follower一樣同步數(shù)據(jù),來保證數(shù)據(jù)的一致性。
2、生產(chǎn)者可靠性級別
通過以上的講解,已經(jīng)可以保證kafka集群內(nèi)部的可靠性,但是在生產(chǎn)者向kafka集群發(fā)送時,數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)傳輸,也是不可靠的,可能因為網(wǎng)絡(luò)延遲、閃斷等原因造成數(shù)據(jù)的丟失。
kafka為生產(chǎn)者提供了如下的三種可靠性級別,通過不同策略保證不同的可靠性保障。
其實此策略配置的就是leader將成功接收消息信息響應(yīng)給客戶端的時機。
通過request.required.acks參數(shù)配置:
1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader收到數(shù)據(jù)后發(fā)送成功信息,生產(chǎn)者收到后認(rèn)為發(fā)送數(shù)據(jù)成功,如果一直收不到成功消息,則生產(chǎn)者認(rèn)為發(fā)送數(shù)據(jù)失敗會自動重發(fā)數(shù)據(jù)。
當(dāng)leader宕機時,可能丟失數(shù)據(jù)。
0:生產(chǎn)者不停向leader發(fā)送數(shù)據(jù),而不需要leader反饋成功消息。
這種模式效率最高,可靠性最低??赡茉诎l(fā)送過程中丟失數(shù)據(jù),也可能在leader宕機時丟失數(shù)據(jù)。
-1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader收到數(shù)據(jù)后要等到ISR列表中的所有副本都同步數(shù)據(jù)完成后,才向生產(chǎn)者發(fā)送成功消息,如果一只收不到成功消息,則認(rèn)為發(fā)送數(shù)據(jù)失敗會自動重發(fā)數(shù)據(jù)。
這種模式下可靠性很高,但是當(dāng)ISR列表中只剩下leader時,當(dāng)leader宕機讓然有可能丟數(shù)據(jù)。
此時可以配置min.insync.replicas指定要求觀察ISR中至少要有指定數(shù)量的副本,默認(rèn)該值為1,需要改為大于等于2的值
這樣當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)給leader但是發(fā)現(xiàn)ISR中只有l(wèi)eader自己時,會收到異常表明數(shù)據(jù)寫入失敗,此時無法寫入數(shù)據(jù),保證了數(shù)據(jù)絕對不丟。
雖然不丟但是可能會產(chǎn)生冗余數(shù)據(jù),例如生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader同步數(shù)據(jù)給ISR中的follower,同步到一半leader宕機,此時選出新的leader,可能具有部分此次提交的數(shù)據(jù),而生產(chǎn)者收到失敗消息重發(fā)數(shù)據(jù),新的leader接受數(shù)據(jù)則數(shù)據(jù)重復(fù)了。
3、leader選舉
當(dāng)leader宕機時會選擇ISR中的一個follower成為新的leader,如果ISR中的所有副本都宕機,怎么辦?
有如下配置可以解決此問題:
unclean.leader.election.enable=false
策略1:必須等待ISR列表中的副本活過來才選擇其成為leader繼續(xù)工作。
unclean.leader.election.enable=true
策略2:選擇任何一個活過來的副本,成為leader繼續(xù)工作,此follower可能不在ISR中。
策略1,可靠性有保證,但是可用性低,只有最后掛了leader活過來kafka才能恢復(fù)。
策略2,可用性高,可靠性沒有保證,任何一個副本活過來就可以繼續(xù)工作,但是有可能存在數(shù)據(jù)不一致的情況。
4、kafka可靠性的保證
At most once:消息可能會丟,但絕不會重復(fù)傳輸。
At least once:消息絕不會丟,但可能會重復(fù)傳輸。
Exactly once:每條消息肯定會被傳輸一次且僅傳輸一次。
kafka最多保證At least once,可以保證不丟,但是可能會重復(fù),為了解決重復(fù)需要引入唯一標(biāo)識和去重機制,kafka提供了GUID實現(xiàn)了唯一標(biāo)識,但是并沒有提供自帶的去重機制,需要開發(fā)人員基于業(yè)務(wù)規(guī)則自己去重。
生產(chǎn)者產(chǎn)生第一個消息, 會在 segment 中記錄第一個偏移量, 一致追加, 如果打了閥值, 會存儲到磁盤上去。
KAFKA 的數(shù)據(jù)傳輸
KAFKA 消息發(fā)布流程
消息處理的優(yōu)勢:
KAFKA的設(shè)計原理 ,無論做不做大數(shù)據(jù), 還是僅僅是配合 JAVA, 都需要了解?。耍粒疲耍痢?/p>
KAFKA 的 通訊協(xié)議
通訊過程
應(yīng)用層
與其它計算機進(jìn)行通訊的一個應(yīng)用,它是對應(yīng)應(yīng)用程序的通信服務(wù)的。
例如,一個沒有通信功能的字處理程序就不能執(zhí)行通信的代碼,從事字處理工作的程序員也不關(guān)心OSI的第7層。
但是,如果添加了一個傳輸文件的選項,那么字處理器的程序就需要實現(xiàn)OSI的第7層。
表示層
這一層的主要功能是定義數(shù)據(jù)格式及加密。
例如,F(xiàn)TP允許你選擇以二進(jìn)制或ASCII格式傳輸。
如果選擇二進(jìn)制,那么發(fā)送方和接收方不改變文件的內(nèi)容。
如果選擇ASCII格式,發(fā)送方將把文本從發(fā)送方的字符集轉(zhuǎn)換成標(biāo)準(zhǔn)的ASCII后發(fā)送數(shù)據(jù)。
在接收方將標(biāo)準(zhǔn)的ASCII轉(zhuǎn)換成接收方計算機的字符集。示例:加密,ASCII等。
會話層
它定義了如何開始、控制和結(jié)束一個會話,包括對多個雙向消息的控制和管理,
以便在只完成連續(xù)消息的一部分時可以通知應(yīng)用,從而使表示層看到的數(shù)據(jù)是連續(xù)的,
在某些情況下,如果表示層收到了所有的數(shù)據(jù),則用數(shù)據(jù)代表表示層。
示例:RPC,SQL等。
傳輸層
這層的功能包括是否選擇差錯恢復(fù)協(xié)議還是無差錯恢復(fù)協(xié)議,及在同一主機上對不同應(yīng)用的數(shù)據(jù)流的輸入
進(jìn)行復(fù)用,還包括對收到的順序不對的數(shù)據(jù)包的重新排序功能。示例:TCP,UDP,SPX。
網(wǎng)絡(luò)層
這層對端到端的包傳輸進(jìn)行定義,它定義了能夠標(biāo)識所有結(jié)點的邏輯地址,還定義了路由實現(xiàn)的方式和學(xué)習(xí)的方式。
為了適應(yīng)最大傳輸單元長度小于包長度的傳輸介質(zhì),網(wǎng)絡(luò)層還定義了如何將一個包分解成更小的包的分段方法。
示例:IP,IPX等。
數(shù)據(jù)鏈路層
物理層
OSI的物理層規(guī)范是有關(guān)傳輸介質(zhì)的特性,這些規(guī)范通常也參考了其他組織制定的標(biāo)準(zhǔn)。
連接頭、幀、幀的使用、電流、編碼及光調(diào)制等都屬于各種物理層規(guī)范中的內(nèi)容。
物理層常用多個規(guī)范完成對所有細(xì)節(jié)的定義。示例:Rj45,802.3等。
KAFKA 集群的安裝部署:
1、下載kafka.tgz 架包
2、解壓
3、配置 zookepper,配置producer,配置consumer
4、啟動服務(wù)
a 啟動zooKEPPER ,
b 啟動kafka
./bin/zkserver.sh start
./bin/kafka-server-start.sh /config/server.properties
創(chuàng)建 topic
./bin/kafka-topics.sh --create--zookepper hadoop1:2181,hadoop2:2181,hadoop3:2181
--replication -factor 1 --partition 1 -topic mytopic
配置消費者信息
查看是不是有 kafka 的進(jìn)程, 使用 # ps 命令
------------ 日志文件, 實時的推送到 Kafka 里面, 做一個 topic - from -beginning
把消費者的端口啟動了, 消費 test 的數(shù)據(jù)。
剛才的日志文件, 用 for循環(huán) 做了一個 50000個日志文件, 灌入到kafka 里面去, 然后, 就可以消費了。
生產(chǎn)者 產(chǎn)生了數(shù)據(jù), 放入到 KafKA里面, 然后消費者就可以消費了。
生產(chǎn)者對應(yīng)的參數(shù)。 消費者對應(yīng)的參數(shù)。
做一下回顧: KAFKA Message 不斷記錄,offset 偏移量, 到達(dá)閥值, flash到硬盤
P
----------------------------------------------------------------------------以下是原始筆記
Kafka
1、Kafka是什么
2、Kafka體系結(jié)構(gòu)
3、Kafka設(shè)計理念
4、Kafka通信協(xié)議
5、Kafka集群
6、Kafka相關(guān)操作:kafka的shell操作及java操作
7、kafka的producer和consumer開發(fā)
Kafka產(chǎn)生的背景:
Kafka是分布式發(fā)布-訂閱消息系統(tǒng),它最初由LInkedin公司開發(fā),使用scala語言編寫之后成為Apache項目的一部分,kafka是一個分布式的,可劃分的,多訂閱者,冗余備份持久性的日志服務(wù),它主要用于處理活
躍的流式數(shù)據(jù)。
kafka可以起到兩個作用:
1、降低系統(tǒng)組網(wǎng)的復(fù)雜度
2、降低編程的復(fù)雜度,各個子系統(tǒng)不在是相互協(xié)商接口,各個子系統(tǒng)類似插口插在插座上,kafka承載高速數(shù)據(jù)總線的作用。
kafka簡介:
1、同時為發(fā)布和訂閱提供高吞吐量,kafka每秒可以生產(chǎn)為25萬消息(50MB),每秒可以處理55萬條數(shù)據(jù)(110MB)。
2、可以進(jìn)行持久化操作,將消息持久化到磁盤,因此可用于批量消費,如ETL,以及實時應(yīng)用程序。通過將數(shù)據(jù)持久化到磁盤以及replication防止數(shù)據(jù)丟失。
3、分布式系統(tǒng),易于向外擴(kuò)展,所有的producer、broker、consumer都會有多個,均為分布式的,無需停機即可擴(kuò)展機器。
4、消息被處理的狀態(tài)是在consumer端維護(hù),而不是在server端維護(hù),當(dāng)失敗時能自動平衡。
5、支持onlin和offline的場景。
性能測試:
虛擬機:CPU雙核、內(nèi)存:2GB、硬盤:60GB
測試指標(biāo):
消息推積壓力測試:
單個kafka broker節(jié)點測試,啟動一個kafka broker和producer,producer不斷向broker發(fā)送消息
直到broker堆積數(shù)據(jù)為18GB為停止producer,接著啟動consumer,不斷從broker獲取數(shù)據(jù)
直到全部數(shù)據(jù)讀取完停止,最后檢查producer==consumer數(shù)據(jù),沒有出現(xiàn)卡死不響應(yīng)現(xiàn)象。
結(jié)論:數(shù)據(jù)大量堆積不會出現(xiàn)broker卡死或不影響現(xiàn)象。
生產(chǎn)者速率:
1萬左右。
結(jié)論:性能上完全滿足要求,其性能主要由磁盤決定
消費者速率
1萬左右
結(jié)論:性能上完全滿足要求,其性能主要由磁盤決定。
Kafka的基本概念:
1、Topic:特指kafka的消息源的不同分類
2、Partion: Topic物理上的分組,一個topic可以分為多個partion,每個partion是一個有序的隊列,partion中的每條消息都會被分配一個有序的id,也叫offset。
3、Message: 消息,是通信的基本單位,每個producer可以向一個topic發(fā)布一些消息。
4、Producers:消息和數(shù)據(jù)的生產(chǎn)者,向kafka的一個topic發(fā)布消息的過程叫做producers
5、Consumers:消息和數(shù)據(jù)的消費者,訂閱Topic并處理其發(fā)布的消息的過程叫做consumers。
6、Broker:緩存代理,kafka集群中的一臺或多臺服務(wù)器統(tǒng)稱為Broker.
kafka設(shè)計關(guān)注的重點:
1、為生產(chǎn)者和消費者提供一個通用的API
2、消息的持久化
3、高吞吐量,可以滿足百萬級別的消息處理。
4、對分布式和高擴(kuò)展的支持。
kafka最基本的架構(gòu)是生產(chǎn)者發(fā)布一個消息到kafka的一個主題topic,這個主題topic即是由扮演kafkaServer角色的broker提供,消費者訂閱這個主題,然后從中獲取信息。
kafka的兩大法寶:
1、提供文件的分段
2、提供文件索引
索引優(yōu)化:稀疏存儲,每隔一定字節(jié)的數(shù)據(jù)建立一條索引
kafka消息隊列分類:
1、點對點
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費者從queue中取出消息,并且消費消息。
注意:
消息被消費后,queue中不再有存儲,所以消息消費者不可能消費到已經(jīng)被消費的消息。
queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。
2、發(fā)布訂閱
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時有多個消息消費者(訂閱)消費該消息,和點對點不同,發(fā)布到topic的消息會被所有訂閱者消費。
消息隊列MQ對比:
1、RabbitMQ:支持的協(xié)議多,非常重量級消息隊列,對路由(Routing),負(fù)載均衡(Load balance)或者數(shù)據(jù)持久化有很好的支持。
2、ZeroMQ:號稱最快的消息隊列系統(tǒng),尤其針對大吞吐量的需求場景,擅長的高級、復(fù)雜的隊列。但技術(shù)也復(fù)雜,并且只提供非持久性的隊列。
3、ActiveMQ:是Apache下的一個子項目,類似于ZeroMQ,能夠以代理人和點對點的技術(shù)實現(xiàn)隊列。
4、Redis:是一個Key-Value的Nosql數(shù)據(jù)庫,但也支持MQ功能,數(shù)據(jù)量小,性能優(yōu)于RabbitMQ,數(shù)據(jù)超過10k就慢得無法接受。
Kafka部署架構(gòu):
(Producer、Broker、Consumer、Zookeeper)
producer --(push)--> kafka(broker) <---(pull)---Consumer
|
|
|
|
Zookeeper
Kafka集群架構(gòu)
(Broker--Master、Slave <------Zookeeper)
Kafka的Producers
Producer將消息發(fā)布到指定的topic中,同時prodeucer也能決定將此消息歸屬于哪個partion,比如基于round-robin方式或者通過其它的一些算法等。
消息和數(shù)據(jù)的生產(chǎn)者,向kafka的一個topic發(fā)布消息的過程叫做producers
異步發(fā)送
批量發(fā)送可以很有效的提高發(fā)送效率,kafka producer的異步發(fā)送模式允許進(jìn)行批量發(fā)送,先將消息緩存在內(nèi)存中,然后一次請批量發(fā)送出去。
Kafka的Broker
Broker:緩存代理,為了減少磁盤寫入的次數(shù),Broker會將消息暫時buffer起來,當(dāng)消息的個數(shù)達(dá)到一定閥值時,再flush到磁盤,這樣就減少了磁盤io調(diào)用的次數(shù)。
kafka的Consumers
注意:kafka的設(shè)計原理決定對于一個topic同一個group不能有多于partions個數(shù)的consumer同時消費者,否則將意味著某些xonsumers將無法得到消息。
Kafka的broker無狀態(tài)機制
1、Broker沒有副本機制,一但broker宕機,該broker的消息將都不可用
2、Broker不保存訂閱者的狀態(tài),由訂閱者自己保存
3、無狀態(tài)導(dǎo)致消息的刪除成為難道,kafka采用基于時間的sla,消息保存一定時間后會被刪除。
4、消息訂閱者可以rewind back到任意位置重新進(jìn)行消費,當(dāng)訂閱者故障時,可以選擇最小的offset進(jìn)行重新讀取消費消息。
kafka的consumer group
1、允許consumer group對一個topic進(jìn)行消費,不同的consumer group之間獨立訂閱
2、為了對減小一個consumer group中不同的consumer之間的分布式協(xié)調(diào)開銷,指定partion為最小的并行消費單位,即一個group內(nèi)的consumer只能消費不同的partion
Kafka的Topic/Log
一個topic可以認(rèn)為是一類消息,每個topic將被分成多個partion分區(qū),每個partion在存儲層面是append log文件,任何發(fā)布到此partion的消息都會被追加到Log文件的尾部,每條消息在文件中的位置稱為offset,也叫做偏移量,partion是以文件的形式存儲在文件系統(tǒng)中。
logs文件根據(jù)broker中的配置來保存一定時間后刪除來釋放磁盤空間。
Kafka的partion
1、kafka基于文件存儲,通過分區(qū),可以將日志內(nèi)容分散到多個server上,來避免文件尺寸達(dá)到單機磁盤的上限,每個partion都會被當(dāng)前的server保存
2、可以將一個topic切分多任意多個partion,來消息保存消費的效率
3、越多的partion意味著可以容納更多的consumer,有效提升并發(fā)消費的能力。
kafka的Message
Message消息:是通信的基本單位,每個producer可以向一個topic發(fā)布一些消息。
Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的,每個topic以可以為每個partion存儲一部分message。
partiion中的每條message包含了三個屬性:
1、offset 對應(yīng)類型:long
2、MessageSize: 對應(yīng)類型:int32
3、data: 是Message的具體內(nèi)容。
Kafka的Offset
每條消息在文件中的位置稱為:offset,也叫偏移量,offset為一個long型數(shù)字,字是唯一標(biāo)記一條消息,kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中不允許對消息進(jìn)行隨即讀寫。
partition中的每條消息message由offset來表示它在這個partition中的偏移量,這個offset不是這個message在partition數(shù)據(jù)文件中的實際存儲的位置,而是邏輯上一個值,它唯一確定了partition中的一條message,因此可以認(rèn)為offset是partition中message的id.
kafka的消息處理機制
1、發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中
2、對于消費者,它們消費消息的順序和日志中消息順序一致
3、如果topic的replication factor為n,那么允許n-1個kafka實例失效
4、kafka對消息的重復(fù)、丟失、錯誤以及順序沒有嚴(yán)格的要求。
5、kafka提供at-least-once delivery,當(dāng)consumers宕機后,有些消息可能會被重復(fù)delivery
6、因每個partition只會被consumergroup內(nèi)的一個consumer消費,所以kafka保證每個partition內(nèi)的消息會被順序訂閱。
7、kafka為每條消息計算CRC檢驗,用于錯誤檢測,CRC檢驗不通過的消息會直接被丟棄掉
ACK校驗,當(dāng)消費者消費成功,返回ACK消息。
數(shù)據(jù)傳輸?shù)氖聞?wù)定義:
1、at most once: 最多一次,這個和jms中非持久化消息類似,無論成敗,將不會重發(fā)。
2、at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會進(jìn)行重發(fā),直到接受成功。
3、exactly once: 消息只會發(fā)送一次。
at most once: 消費者fetch消息,然后保存offset,然后處理消息,當(dāng)client保存offset之后,但是在消息處理過程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理,那么此后未處理的消息都不能被fetch到,這就是at most once。
at least once: 消費者fetch消息,然后處理消息,然后打開offset,如果消息處理成功之后,但是在保存offset階段zookeeper異常,導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時可能獲得上次已經(jīng)處理過的消息,這就是at least once,原因offset沒有即時的提交到zookeeper,zookeeper恢復(fù)正常還是之前的offset狀態(tài)。
exactly once: kafka中并沒有嚴(yán)格的去實現(xiàn)基于2階段提交事務(wù),我們認(rèn)為這種策略在kafka中沒有必要。
注意:
通常情況下:at least once是我們的首選,相比at most once,重復(fù)接受消息總比丟失數(shù)據(jù)要好。
kafka的儲存策略:
1、kafka以topic來進(jìn)行消息管理,每個topic包含多個partition,每個partition對應(yīng)一個邏輯log,有多個segment組成。
2、每個segment中存儲多條消息,消息id由其邏輯位置決定,從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
3、broker收到發(fā)布消息往對應(yīng)的partion的最后一個segment上添加消息。
4、每個partition在內(nèi)存中對應(yīng)一個index,記錄每個segment中的第一條消息偏移。
5、發(fā)布者發(fā)送到某個topic的消息會被 均勻的分布到多個partition上(隨機或者根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布),broker收到發(fā)布消息往對應(yīng)的partition的最后一個segment上進(jìn)行添加該消息,當(dāng)某個segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時間超過閥值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達(dá)到一定的大小后將不會再往該segment寫數(shù)據(jù),broker會創(chuàng)建新的segment。
kafka的數(shù)據(jù)傳輸:
1、發(fā)布者每次可發(fā)布多條消息(將消息加到一個消息集合中發(fā)布),sub每次迭代一條消息。
2、不創(chuàng)建單獨的cache,使用系統(tǒng)的page cache。發(fā)布者順序發(fā)布,訂閱者通常比發(fā)布者滯后一點點,直接使用Linux的page cache效果也比較后,同時減少了cache管理及垃圾收集和開銷。
3、使用sendfile優(yōu)化網(wǎng)絡(luò)傳輸,減少一次內(nèi)存拷貝。
kafka的消息發(fā)送的流程:
1、由于kafka broker會持久化數(shù)據(jù),broker沒有內(nèi)存壓力,因此,consumer非常適合采取pull的方式消費數(shù)據(jù)。
2、producer向kafka(push)推數(shù)據(jù)
3、consumer從kafka拉(pull)數(shù)據(jù)
消息處理的優(yōu)勢:
1、簡化kafka設(shè)計
2、consumer根據(jù)消費能力自主控制消息拉取速度。
3、consumer根據(jù)自身情況自主選擇消費模式,例如:批量、重復(fù)消費,從尾端開始消費等。
4、kafka集群接收到producer發(fā)過來的消息后,將其持久化到硬盤,并保留消息指定時長,而不關(guān)注消息是否被消費。
kafka設(shè)計原理實現(xiàn):
1、kafka以topic來進(jìn)行消息管理,發(fā)布者發(fā)到某個topic的消息會被均勻的分布到多個partition上
2、每個topic包含多個partition,每個partition對應(yīng)一個邏輯log,有多個segment組成
3、每個segment中存儲多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲 位置,避免id到位置的額外映射。
4、每個partition在內(nèi)存中有一個Index,記錄每個segment中的第一條消息的偏移量
5、當(dāng)某個segment上的消息數(shù)據(jù)達(dá)到一定閥值,會flush到磁盤,進(jìn)行訂閱,broker此時會重新創(chuàng)建新的segment。
kafka的通訊協(xié)議:
kafka通訊的基本單位是request/response
基本結(jié)構(gòu):messagesize、requestmessage、responsemessage
通訊過程:
客戶端打開與服務(wù)器的socket
往socket寫入一個Int32的數(shù)字
服務(wù)端先讀取出一個int32的整數(shù)從而獲取這次requests的大小
然后讀取對應(yīng)字節(jié)數(shù)的數(shù)據(jù)從而得到requests的具體內(nèi)容
服務(wù)器端處理了請求后,也用同樣的方式來發(fā)送響應(yīng)。
kafka的通訊協(xié)議組件關(guān)系:
Request/Response是通訊層的結(jié)構(gòu),和網(wǎng)絡(luò)的7層模型對比的話,它類似于TCP層
Message、MessageSet定義的是業(yè)務(wù)層的結(jié)構(gòu),類似于網(wǎng)絡(luò)7層模型中的HTTP層,Message、MessageSet只是Request、Response的payload中的一種數(shù)據(jù)結(jié)構(gòu)。
說明:
kafka的通訊協(xié)議中不包含schema,格式也比較簡單,這樣設(shè)計的好處是協(xié)議自身的overhead小,再加上把多條message放在一起做壓縮,提高壓縮比率,從而在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量會少一些。
kafka的分布式實現(xiàn):
1、一個topic的多個partition被分布在kafka集群中的多個server(kafka實例)上,每個server負(fù)責(zé)partition中消息的讀寫操作。
2、此外kafka還可以配置partition需要備份的個數(shù)replicas,每個partition將會被備份到多臺機器上,以提高可用性。
3、基于replicated方案,那么就意味著需要對多備份從進(jìn)行調(diào)整。
4、每個partition都有一個server為leader,leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會有其它的follower來接管,成為新的leader。
5、follower只是單調(diào)的和leader跟進(jìn),同步消息即可,由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個leader
6、kafka會將leader均衡的分散在每個實例上,來確保整體的性能穩(wěn)定。
kafka數(shù)據(jù)持久化:
1、發(fā)現(xiàn)線性的訪問磁盤,很多時候比隨機的內(nèi)存訪問快得多
2、傳統(tǒng)的使用內(nèi)存做為磁盤緩存
3、kafka直接將數(shù)據(jù)寫入到日志文件中
kafka安裝:
1、下載kafka.tgz包
2、解壓
3、配置zookeeper,配置producer,配置consumer
4、啟動服務(wù)
a、啟動zookeeper服務(wù),b、啟動kafka
./bin/zkServer.sh start /stop /status
./bin/kafka-server-start.sh config/server.properties
創(chuàng)建topic:
./bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic mytopic
查看topic:
./bin/kafka-topics.sh --list --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181
查看topic詳細(xì)信息:
./bin/kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic
刪除topic
./bin/kafka-topics.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --delete --topic mytopic
創(chuàng)建生產(chǎn)者producer
./bin/kafka-console-producer.sh --broker--list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic mytopic
創(chuàng)建消費者consumer
./bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic --from-beginning
生產(chǎn)者參數(shù)查看:
./bin/kafka-console-producer.sh
消費者參數(shù)查看:
./bin/kafka-console-consumer.sh