配置zookeeper 使用kafka/bin/下自帶的zk
創(chuàng)新互聯(lián)網(wǎng)絡(luò)公司擁有10余年的成都網(wǎng)站開發(fā)建設(shè)經(jīng)驗(yàn),上千客戶的共同信賴。提供網(wǎng)站制作、成都網(wǎng)站建設(shè)、網(wǎng)站開發(fā)、網(wǎng)站定制、外鏈、建網(wǎng)站、網(wǎng)站搭建、響應(yīng)式網(wǎng)站、網(wǎng)頁設(shè)計(jì)師打造企業(yè)風(fēng)格,提供周到的售前咨詢和貼心的售后服務(wù)
運(yùn)行 報錯 卒。配置低了
docker-compose.yml
報錯
換云搬瓦工的機(jī)器試一下
但是docker ps -a 發(fā)現(xiàn)只有zookeeper啟動了,kafka失敗, 檢查日志 發(fā)現(xiàn)kafka運(yùn)行需要java環(huán)境,而且對內(nèi)存有要求,搬瓦工的vps不足夠
因此修改docker-compose.yml 加入以下
stop 再啟動
完美
測試
進(jìn)入容器
查看已經(jīng)建好的topic (docker-compose.yml)
發(fā)送消息
接收消息
接下來是golang接入kafka了
運(yùn)行
需要注意的是,
1. borker / server 默認(rèn)允許的最大消息大小是 1M,過大的消息會被拒
2. 1M 是包括壓縮之后的大小,因此 producer/client 如果開啟壓縮,將大于 1M 的數(shù)據(jù)壓縮至小于 1M 發(fā)送即可
3. 如果修改 broker 端的 message.max.bytes 大小,需要修改消費(fèi)者、follower fetch 的大小與之匹配,并且允許較大的消息對性能有較大影響
1. 允許發(fā)的數(shù)據(jù) 1M
2. 開啟壓縮
環(huán)境:
現(xiàn)象:golang微服務(wù)內(nèi)存占用超過1G,查看日志發(fā)現(xiàn)大量kafka相關(guān)錯誤日志,繼而查看kafka集群,其中一個kafka節(jié)點(diǎn)容器掛掉了。
疑問 為什么kafka集群只有一個broker掛了,客戶端就大量報錯呢
通過beego admin頁面獲取 mem-1.memprof
可以看到調(diào)用棧為 withRecover backgroundMetadataUpdataer refreshMeaatdata RefreshMetada tryRefreshMetadata ...
sarama-cluster: NewClient
為什么kafka集群只有一個broker,但是NewClient確失敗了?
在kafka容器里查看topic, 發(fā)現(xiàn)Replicas和Isr只有一個,找到kafka官方配置說明,自動生成的topic需要配置default.replication.factor這個參數(shù),才會生成3副本。
1. 介紹
最近在研究一些消息中間件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一個基于Go語言的分布式實(shí)時消息平臺,它基于MIT開源協(xié)議發(fā)布,由bitly公司開源出來的一款簡單易用的消息中間件。
官方和第三方還為NSQ開發(fā)了眾多客戶端功能庫,如官方提供的基于HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基于Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基于各種語言的眾多第三方客戶端功能庫。
1.1 Features
1). Distributed
NSQ提供了分布式的,去中心化,且沒有單點(diǎn)故障的拓?fù)浣Y(jié)構(gòu),穩(wěn)定的消息傳輸發(fā)布保障,能夠具有高容錯和HA(高可用)特性。
2). Scalable易于擴(kuò)展
NSQ支持水平擴(kuò)展,沒有中心化的brokers。內(nèi)置的發(fā)現(xiàn)服務(wù)簡化了在集群中增加節(jié)點(diǎn)。同時支持pub-sub和load-balanced 的消息分發(fā)。
3). Ops Friendly
NSQ非常容易配置和部署,生來就綁定了一個管理界面。二進(jìn)制包沒有運(yùn)行時依賴。官方有Docker image。
4.Integrated高度集成
官方的 Go 和 Python庫都有提供。而且為大多數(shù)語言提供了庫。
1.2 組件
1.3 拓?fù)浣Y(jié)構(gòu)
NSQ推薦通過他們相應(yīng)的nsqd實(shí)例使用協(xié)同定位發(fā)布者,這意味著即使面對網(wǎng)絡(luò)分區(qū),消息也會被保存在本地,直到它們被一個消費(fèi)者讀取。更重要的是,發(fā)布者不必去發(fā)現(xiàn)其他的nsqd節(jié)點(diǎn),他們總是可以向本地實(shí)例發(fā)布消息。
NSQ
首先,一個發(fā)布者向它的本地nsqd發(fā)送消息,要做到這點(diǎn),首先要先打開一個連接,然后發(fā)送一個包含topic和消息主體的發(fā)布命令,在這種情況下,我們將消息發(fā)布到事件topic上以分散到我們不同的worker中。
事件topic會復(fù)制這些消息并且在每一個連接topic的channel上進(jìn)行排隊(duì),在我們的案例中,有三個channel,它們其中之一作為檔案channel。消費(fèi)者會獲取這些消息并且上傳到S3。
nsqd
每個channel的消息都會進(jìn)行排隊(duì),直到一個worker把他們消費(fèi),如果此隊(duì)列超出了內(nèi)存限制,消息將會被寫入到磁盤中。Nsqd節(jié)點(diǎn)首先會向nsqlookup廣播他們的位置信息,一旦它們注冊成功,worker將會從nsqlookup服務(wù)器節(jié)點(diǎn)上發(fā)現(xiàn)所有包含事件topic的nsqd節(jié)點(diǎn)。
nsqlookupd
2. Internals
2.1 消息傳遞擔(dān)保
1)客戶表示已經(jīng)準(zhǔn)備好接收消息
2)NSQ 發(fā)送一條消息,并暫時將數(shù)據(jù)存儲在本地(在 re-queue 或 timeout)
3)客戶端回復(fù) FIN(結(jié)束)或 REQ(重新排隊(duì))分別指示成功或失敗。如果客戶端沒有回復(fù), NSQ 會在設(shè)定的時間超時,自動重新排隊(duì)消息
這確保了消息丟失唯一可能的情況是不正常結(jié)束 nsqd 進(jìn)程。在這種情況下,這是在內(nèi)存中的任何信息(或任何緩沖未刷新到磁盤)都將丟失。
如何防止消息丟失是最重要的,即使是這個意外情況可以得到緩解。一種解決方案是構(gòu)成冗余 nsqd對(在不同的主機(jī)上)接收消息的相同部分的副本。因?yàn)槟銓?shí)現(xiàn)的消費(fèi)者是冪等的,以兩倍時間處理這些消息不會對下游造成影響,并使得系統(tǒng)能夠承受任何單一節(jié)點(diǎn)故障而不會丟失信息。
2.2 簡化配置和管理
單個 nsqd 實(shí)例被設(shè)計(jì)成可以同時處理多個數(shù)據(jù)流。流被稱為“話題”和話題有 1 個或多個“通道”。每個通道都接收到一個話題中所有消息的拷貝。在實(shí)踐中,一個通道映射到下行服務(wù)消費(fèi)一個話題。
在更底的層面,每個 nsqd 有一個與 nsqlookupd 的長期 TCP 連接,定期推動其狀態(tài)。這個數(shù)據(jù)被 nsqlookupd 用于給消費(fèi)者通知 nsqd 地址。對于消費(fèi)者來說,一個暴露的 HTTP /lookup 接口用于輪詢。為話題引入一個新的消費(fèi)者,只需啟動一個配置了 nsqlookup 實(shí)例地址的 NSQ 客戶端。無需為添加任何新的消費(fèi)者或生產(chǎn)者更改配置,大大降低了開銷和復(fù)雜性。
2.3 消除單點(diǎn)故障
NSQ被設(shè)計(jì)以分布的方式被使用。nsqd 客戶端(通過 TCP )連接到指定話題的所有生產(chǎn)者實(shí)例。沒有中間人,沒有消息代理,也沒有單點(diǎn)故障。
這種拓?fù)浣Y(jié)構(gòu)消除單鏈,聚合,反饋。相反,你的消費(fèi)者直接訪問所有生產(chǎn)者。從技術(shù)上講,哪個客戶端連接到哪個 NSQ 不重要,只要有足夠的消費(fèi)者連接到所有生產(chǎn)者,以滿足大量的消息,保證所有東西最終將被處理。對于 nsqlookupd,高可用性是通過運(yùn)行多個實(shí)例來實(shí)現(xiàn)。他們不直接相互通信和數(shù)據(jù)被認(rèn)為是最終一致。消費(fèi)者輪詢所有的配置的 nsqlookupd 實(shí)例和合并 response。失敗的,無法訪問的,或以其他方式故障的節(jié)點(diǎn)不會讓系統(tǒng)陷于停頓。
2.4 效率
對于數(shù)據(jù)的協(xié)議,通過推送數(shù)據(jù)到客戶端最大限度地提高性能和吞吐量的,而不是等待客戶端拉數(shù)據(jù)。這個概念,稱之為 RDY 狀態(tài),基本上是客戶端流量控制的一種形式。
efficiency
2.5 心跳和超時
組合應(yīng)用級別的心跳和 RDY 狀態(tài),避免頭阻塞現(xiàn)象,也可能使心跳無用(即,如果消費(fèi)者是在后面的處理消息流的接收緩沖區(qū)中,操作系統(tǒng)將被填滿,堵心跳)為了保證進(jìn)度,所有的網(wǎng)絡(luò) IO 時間上限勢必與配置的心跳間隔相關(guān)聯(lián)。這意味著,你可以從字面上拔掉之間的網(wǎng)絡(luò)連接 nsqd 和消費(fèi)者,它會檢測并正確處理錯誤。當(dāng)檢測到一個致命錯誤,客戶端連接被強(qiáng)制關(guān)閉。在傳輸中的消息會超時而重新排隊(duì)等待傳遞到另一個消費(fèi)者。最后,錯誤會被記錄并累計(jì)到各種內(nèi)部指標(biāo)。
2.6 分布式
因?yàn)镹SQ沒有在守護(hù)程序之間共享信息,所以它從一開始就是為了分布式操作而生。個別的機(jī)器可以隨便宕機(jī)隨便啟動而不會影響到系統(tǒng)的其余部分,消息發(fā)布者可以在本地發(fā)布,即使面對網(wǎng)絡(luò)分區(qū)。
這種“分布式優(yōu)先”的設(shè)計(jì)理念意味著NSQ基本上可以永遠(yuǎn)不斷地?cái)U(kuò)展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享狀態(tài)就是保存在lookup節(jié)點(diǎn)上,甚至它們不需要全局視圖,配置某些nsqd注冊到某些lookup節(jié)點(diǎn)上這是很簡單的配置,唯一關(guān)鍵的地方就是消費(fèi)者可以通過lookup節(jié)點(diǎn)獲取所有完整的節(jié)點(diǎn)集。清晰的故障事件——NSQ在組件內(nèi)建立了一套明確關(guān)于可能導(dǎo)致故障的的故障權(quán)衡機(jī)制,這對消息傳遞和恢復(fù)都有意義。雖然它們可能不像Kafka系統(tǒng)那樣提供嚴(yán)格的保證級別,但NSQ簡單的操作使故障情況非常明顯。
2.7 no replication
不像其他的隊(duì)列組件,NSQ并沒有提供任何形式的復(fù)制和集群,也正是這點(diǎn)讓它能夠如此簡單地運(yùn)行,但它確實(shí)對于一些高保證性高可靠性的消息發(fā)布沒有足夠的保證。我們可以通過降低文件同步的時間來部分避免,只需通過一個標(biāo)志配置,通過EBS支持我們的隊(duì)列。但是這樣仍然存在一個消息被發(fā)布后馬上死亡,丟失了有效的寫入的情況。
2.8 沒有嚴(yán)格的順序
雖然Kafka由一個有序的日志構(gòu)成,但NSQ不是。消息可以在任何時間以任何順序進(jìn)入隊(duì)列。在我們使用的案例中,這通常沒有關(guān)系,因?yàn)樗械臄?shù)據(jù)都被加上了時間戳,但它并不適合需要嚴(yán)格順序的情況。
2.9 無數(shù)據(jù)重復(fù)刪除功能
NSQ對于超時系統(tǒng),它使用了心跳檢測機(jī)制去測試消費(fèi)者是否存活還是死亡。很多原因會導(dǎo)致我們的consumer無法完成心跳檢測,所以在consumer中必須有一個單獨(dú)的步驟確保冪等性。
3. 實(shí)踐安裝過程
本文將nsq集群具體的安裝過程略去,大家可以自行參考官網(wǎng),比較簡單。這部分介紹下筆者實(shí)驗(yàn)的拓?fù)洌约皀sqadmin的相關(guān)信息。
3.1 拓?fù)浣Y(jié)構(gòu)
topology
實(shí)驗(yàn)采用3臺NSQD服務(wù),2臺LOOKUPD服務(wù)。
采用官方推薦的拓?fù)?,消息發(fā)布的服務(wù)和NSQD在一臺主機(jī)。一共5臺機(jī)器。
NSQ基本沒有配置文件,配置通過命令行指定參數(shù)。
主要命令如下:
LOOKUPD命令
NSQD命令
工具類,消費(fèi)后存儲到本地文件。
發(fā)布一條消息
3.2 nsqadmin
對Streams的詳細(xì)信息進(jìn)行查看,包括NSQD節(jié)點(diǎn),具體的channel,隊(duì)列中的消息數(shù),連接數(shù)等信息。
nsqadmin
channel
列出所有的NSQD節(jié)點(diǎn):
nodes
消息的統(tǒng)計(jì):
msgs
lookup主機(jī)的列表:
hosts
4. 總結(jié)
NSQ基本核心就是簡單性,是一個簡單的隊(duì)列,這意味著它很容易進(jìn)行故障推理和很容易發(fā)現(xiàn)bug。消費(fèi)者可以自行處理故障事件而不會影響系統(tǒng)剩下的其余部分。
事實(shí)上,簡單性是我們決定使用NSQ的首要因素,這方便與我們的許多其他軟件一起維護(hù),通過引入隊(duì)列使我們得到了堪稱完美的表現(xiàn),通過隊(duì)列甚至讓我們增加了幾個數(shù)量級的吞吐量。越來越多的consumer需要一套嚴(yán)格可靠性和順序性保障,這已經(jīng)超過了NSQ提供的簡單功能。
結(jié)合我們的業(yè)務(wù)系統(tǒng)來看,對于我們所需要傳輸?shù)陌l(fā)票消息,相對比較敏感,無法容忍某個nsqd宕機(jī),或者磁盤無法使用的情況,該節(jié)點(diǎn)堆積的消息無法找回。這是我們沒有選擇該消息中間件的主要原因。簡單性和可靠性似乎并不能完全滿足。相比Kafka,ops肩負(fù)起更多負(fù)責(zé)的運(yùn)營。另一方面,它擁有一個可復(fù)制的、有序的日志可以提供給我們更好的服務(wù)。但對于其他適合NSQ的consumer,它為我們服務(wù)的相當(dāng)好,我們期待著繼續(xù)鞏固它的堅(jiān)實(shí)的基礎(chǔ)。
一、Kafka簡述
1. 為什么需要用到消息隊(duì)列
異步:對比以前的串行同步方式來說,可以在同一時間做更多的事情,提高效率;
解耦:在耦合太高的場景,多個任務(wù)要對同一個數(shù)據(jù)進(jìn)行操作消費(fèi)的時候,會導(dǎo)致一個任務(wù)的處理因?yàn)榱硪粋€任務(wù)對數(shù)據(jù)的操作變得及其復(fù)雜。
緩沖:當(dāng)遇到突發(fā)大流量的時候,消息隊(duì)列可以先把所有消息有序保存起來,避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個平穩(wěn)的速率去消費(fèi)這些消息。
2.為什么選擇kafka呢?
這沒有絕對的好壞,看個人需求來選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點(diǎn),可見原文
kafka的優(yōu)點(diǎn):
1.支持多個生產(chǎn)者和消費(fèi)者2.支持broker的橫向拓展3.副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過topic將數(shù)據(jù)進(jìn)行分類5.通過分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實(shí)現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級的消息延遲9.一個消費(fèi)者可以支持多種topic的消息10.對CPU和內(nèi)存的消耗比較小11.對網(wǎng)絡(luò)開銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復(fù)制13.支持鏡像集群
kafka的缺點(diǎn):
1.由于是批量發(fā)送,所以數(shù)據(jù)達(dá)不到真正的實(shí)時2.對于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進(jìn)行元數(shù)據(jù)管理7.會丟失數(shù)據(jù),并且不支持事務(wù)8.可能會重復(fù)消費(fèi)數(shù)據(jù),消息會亂序,可用保證一個固定的partition內(nèi)部的消息是有序的,但是一個topic有多個partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護(hù)一般都比mq高
3. Golang 操作kafka
3.1. kafka的環(huán)境
網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進(jìn)行的搭建,有需要的私我,可以發(fā)yaml文件
3.2. 第三方庫
github.com/Shopify/sarama // kafka主要的庫*github.com/bsm/sarama-cluster // kafka消費(fèi)組
3.3. 消費(fèi)者
單個消費(fèi)者
funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個分區(qū)開一個go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過來,然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}
消費(fèi)組
funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實(shí)時寫入kafka,有可能在程序crash時丟掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生產(chǎn)者
同步生產(chǎn)者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}
異步生產(chǎn)者
funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個選項(xiàng)config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個部分一定要寫,不然通道會被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}
3.5. 結(jié)果展示-
同步生產(chǎn)打?。?/p>
分區(qū)ID:0,offset:90
消費(fèi)打印:
Partition:0,Offset:90,key:,value:Hello World!
異步生產(chǎn)打印:
async:7272async:7616async:998
消費(fèi)打?。?/p>
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
新聞名稱:go語言kafka客戶端 kafka支持的客戶端語言
本文URL:http://m.kartarina.com/article42/dodsgec.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)公司、動態(tài)網(wǎng)站、域名注冊、網(wǎng)站改版、搜索引擎優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)