PulsarFunction例子-創新互聯

在單機環境下實現字符串追加函數(Pulsar 2.4.2版本)

創新互聯公司主營云南網站建設的網絡公司,主營網站建設方案,成都app開發,云南h5微信小程序開發搭建,云南網站營銷推廣歡迎云南等地區企業咨詢

1 啟動單機Pulsar

$ bin/pulsar-daemon start standalone

2 創建函數

1) 準備環境

項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 創建JAVA函數(此函數用于數據源來的topic schema是string,輸出的tiopic schema是string)

Pulsar Function 例子

導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下

3)使用命令行工具加載函數到Pulsar,? ? ? ? ? ? ? ? ? ? ?

bin/pulsar-admin functions create \

--classname test.AppStrFunction \

--jar /data/jar/pf.jar \

--inputs persistent://public/default/tlstest \

--output persistent://public/default/teststr \

--tenant public \

--namespace default \

--name appStrFunction

參數說明:

參數
說明
functions通知 pulsar broker,函數操作
create創建函數,默認創建成功后啟動
classname函數類名稱,需要加上包名
jar指定 jar 包的運行路徑
inputs指定 函數 數據的來源在哪里,支持多個 topics 作為輸入
output如果該 函數 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出
tenant指定該 函數 運行的租戶名
namespace指定該 函數 運行的命名空間
name指定該 函數 運行的名稱
以下是函數相關其他操作

停止函數

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

啟動函數

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

刪除函數

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函數的日志在 pulsar安裝目錄 /logs/functions下

3 測試函數

根據前邊函數已成功加載啟動

1)向tlstest主題發送消息? ?

import?java.util.concurrent.TimeUnit; import?org.apache.pulsar.client.api.Producer; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; public?class?SendMsgTest{ ??public?static?void?main(String[]?args){ ??????String?url="pulsar://192.168.1.48:6650"; ??try{ ?????PulsarClient?client?=PulsarClient.builder() ???????????.serviceUrl(url) ???????????.connectionTimeout(10,TimeUnit.SECONDS) ???????????.build(); ?????Producer<String>?producer=client.newProducer(Schema.STRING) ???????????.topic("tlstest") ???????????.sendTimeout(10,TimeUnit.SECONDS) ???????????.producerName("senduser") ???????????.create(); ???????????producer.send("this?is?a?book"); ???????????System.out.print("send?ok"); ???????????client.close(); ??????}catch(Exception?e){ ????????e.printStackTrace(); ??????} ??} }

2)讀取teststr主題消息

import?org.apache.pulsar.client.api.Consumer; import?org.apache.pulsar.client.api.Message; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; import?org.apache.pulsar.client.api.SubscriptionInitialPosition; import?org.apache.pulsar.client.api.SubscriptionType; import?org.apache.pulsar.client.impl.schema.JSONSchema; import?schema.OrderModel; import?com.alibaba.fastjson.JSON; public?class?RecFunTest?{ public?static?void?main(String[]?args)?{ String?url?=?"http://192.168.1.48:8080"; try{ ??PulsarClient?client?=PulsarClient.builder() ????.serviceUrl(url) ????.build(); ?Consumer<String>?consumer=client.newConsumer(Schema.STRING) ????.topic("teststr") ????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) ????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享) ????.subscriptionName("wbq")//訂閱者名稱 ????.subscribe(); ?while?(true)?{ ???Message<String>?mondmsg?=?consumer.receive(); ???String?msg=mondmsg.getValue(); ????????????????System.out.println("receive?message=:"+msg); ?????????????} ??}catch(Exception?e){ ?????e.printStackTrace(); ??} ?} }

另外有需要云服務器可以了解下創新互聯cdcxhl.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業上云的綜合解決方案,具有“安全穩定、簡單易用、服務可用性高、性價比高”等特點與優勢,專為企業上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。

網站欄目:PulsarFunction例子-創新互聯
本文來源:http://m.kartarina.com/article16/cddggg.html

成都網站建設公司_創新互聯,為您提供手機網站建設做網站網站營銷服務器托管品牌網站設計關鍵詞優化

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

網站托管運營
主站蜘蛛池模板: 午夜福利av无码一区二区| 国产莉萝无码AV在线播放| 午夜无码中文字幕在线播放| 免费人妻无码不卡中文字幕系| 亚洲中文无码av永久| 免费无码AV一区二区| 久久亚洲精品无码aⅴ大香| 成人免费无码大片A毛片抽搐 | 国产成年无码久久久久下载| 一本大道无码日韩精品影视_| 东京热av人妻无码专区| 亚洲欧洲无码一区二区三区| 中文字幕丰满乱子无码视频| 无码福利写真片视频在线播放| 久久精品无码一区二区app| 精品无码国产自产在线观看水浒传| 亚洲AV无码一区二区三区在线观看| 无码国产伦一区二区三区视频| 色欲香天天综合网无码| 无码中文在线二区免费| 亚洲精品色午夜无码专区日韩| 尤物永久免费AV无码网站| 亚洲色av性色在线观无码| 国产精品亚洲а∨无码播放| 无码的免费不卡毛片视频| 无码aⅴ精品一区二区三区| 亚洲另类无码专区首页| 91无码人妻精品一区二区三区L| 国产亚洲精品无码成人| 成人无码一区二区三区| 久久久精品人妻无码专区不卡| 成人免费一区二区无码视频| 无码日韩人妻av一区免费| 91精品久久久久久无码| 无码国产精品一区二区免费式直播 | 精品无码成人片一区二区98| 亚洲AV无码专区国产乱码4SE | 亚洲国产精品无码久久一区二区| 久久人妻无码一区二区| 免费无码AV片在线观看软件| 大胆日本无码裸体日本动漫|