go語言kafak,go語言kafka

聊聊golang的zap的ZapKafkaWriter

本文主要研究一下golang的zap的ZapKafkaWriter

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名與空間、虛擬空間、營銷軟件、網站建設、黃岡網站維護、網站推廣。

WriteSyncer內嵌了io.Writer接口,定義了Sync方法;Sink接口內嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter實現Sink接口及zapcore.WriteSyncer接口,其Write方法直接將data通過kafka發送出去。

golang的回調和接口

最近寫了個kafka的接收消息的功能,需要使用回調處理收到的消息。

一個是基本的回調,一個是使用接口功能實現回調,對接口是個很好的學習。

1.正常回調

kafka的接收消息處。收到消息后,使用傳入的Onmessage進行處理。

調用kafka接收消息的單元,并在調用方寫好回調

在調用方實現回調需要執行的方法

感覺還是使用基本回調相對簡單點,接口就當學習了。

另外跨包的接口的方法要大寫!定位了好久發現個入門的問題。

Golang kafka簡述和操作(sarama同步異步和消費組)

一、Kafka簡述

1. 為什么需要用到消息隊列

異步:對比以前的串行同步方式來說,可以在同一時間做更多的事情,提高效率;

解耦:在耦合太高的場景,多個任務要對同一個數據進行操作消費的時候,會導致一個任務的處理因為另一個任務對數據的操作變得及其復雜。

緩沖:當遇到突發大流量的時候,消息隊列可以先把所有消息有序保存起來,避免直接作用于系統主體,系統主題始終以一個平穩的速率去消費這些消息。

2.為什么選擇kafka呢?

這沒有絕對的好壞,看個人需求來選擇,我這里就抄了一段他人總結的的優缺點,可見原文

kafka的優點:

1.支持多個生產者和消費者2.支持broker的橫向拓展3.副本集機制,實現數據冗余,保證數據不丟失4.通過topic將數據進行分類5.通過分批發送壓縮數據的方式,減少數據傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實現數據的持久化8.高性能的處理信息,在大數據的情況下,可以保證亞秒級的消息延遲9.一個消費者可以支持多種topic的消息10.對CPU和內存的消耗比較小11.對網絡開銷也比較小12.支持跨數據中心的數據復制13.支持鏡像集群

kafka的缺點:

1.由于是批量發送,所以數據達不到真正的實時2.對于mqtt協議不支持3.不支持物聯網傳感數據直接接入4.只能支持統一分區內消息有序,無法實現全局消息有序5.監控不完善,需要安裝插件6.需要配合zookeeper進行元數據管理7.會丟失數據,并且不支持事務8.可能會重復消費數據,消息會亂序,可用保證一個固定的partition內部的消息是有序的,但是一個topic有多個partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創建,部署和維護一般都比mq高

3. Golang 操作kafka

3.1. kafka的環境

網上有很多搭建kafka環境教程,這里就不再搭建,就展示一下kafka的環境,在kubernetes上進行的搭建,有需要的私我,可以發yaml文件

3.2. 第三方庫

github.com/Shopify/sarama // kafka主要的庫*github.com/bsm/sarama-cluster // kafka消費組

3.3. 消費者

單個消費者

funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個分區開一個go協程去取值formsg :=rangepc.Messages() {//阻塞直到有值發送過來,然后再繼續等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}

消費組

funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實時寫入kafka,有可能在程序crash時丟掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生產者

同步生產者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認接收到數據后才算一次發送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機分區中,默認設置8個分區config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區ID:%v, offset:%v \n", pid, offset)}

異步生產者

funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個選項config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個部分一定要寫,不然通道會被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}

3.5. 結果展示-

同步生產打印:

分區ID:0,offset:90

消費打印:

Partition:0,Offset:90,key:,value:Hello World!

異步生產打印:

async:7272async:7616async:998

消費打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

文章標題:go語言kafak,go語言kafka
文章分享:http://m.kartarina.com/article26/hddijg.html

成都網站建設公司_創新互聯,為您提供網站內鏈App設計電子商務網站收錄用戶體驗營銷型網站建設

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

h5響應式網站建設
主站蜘蛛池模板: yy111111电影院少妇影院无码| 久久国产加勒比精品无码| 国产成人精品无码免费看| 久久老子午夜精品无码怎么打| 91精品久久久久久无码| 一区二区三区无码高清视频| 久久亚洲精品成人无码网站| 久久久精品人妻无码专区不卡| 久久精品亚洲中文字幕无码麻豆| av色欲无码人妻中文字幕| 亚洲AV无码一区二三区| 中文字幕无码日韩欧毛 | 中国无码人妻丰满熟妇啪啪软件 | 高h纯肉无码视频在线观看| 成人免费一区二区无码视频| 日韩欧精品无码视频无删节| 国产成人无码午夜福利软件| 精品久久久久久中文字幕无码 | 无码国产福利av私拍| 亚洲无码日韩精品第一页| 亚洲a∨无码精品色午夜| 91无码人妻精品一区二区三区L| 东京无码熟妇人妻AV在线网址| 亚洲AV蜜桃永久无码精品| 无码国产精成人午夜视频不卡| 免费无码成人AV在线播放不卡| 亚洲成A人片在线观看无码不卡| 秋霞鲁丝片无码av| 免费人妻无码不卡中文字幕18禁| 亚洲乱亚洲乱妇无码| 亚洲Aⅴ无码专区在线观看q| 久久午夜无码免费| 亚洲AV无码久久寂寞少妇| 亚洲精品无码成人AAA片| 国99精品无码一区二区三区| 亚洲AV中文无码字幕色三| 无码精品一区二区三区在线| 蜜桃无码一区二区三区| 蜜桃臀AV高潮无码| 亚洲国产成人精品无码区花野真一| 亚洲av中文无码乱人伦在线观看|