Flink集成iceberg在生產環境中的使用方法是什么

這篇文章主要介紹“Flink集成iceberg在生產環境中的使用方法是什么”,在日常操作中,相信很多人在Flink集成iceberg在生產環境中的使用方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink集成iceberg在生產環境中的使用方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

創新互聯專注于企業營銷型網站建設、網站重做改版、坪山網站定制設計、自適應品牌網站建設、HTML5成都商城網站開發、集團公司官網建設、成都外貿網站制作、高端網站制作、響應式網頁設計等建站業務,價格優惠性價比高,為坪山等各大城市提供網站開發制作服務。

背景

在大數據處理領域,有一個非常常見但是很麻煩的問題,即hdfs小文件問題,我們也被這個問題困擾了很久。開始的時候我們是自己寫的一個小文件壓縮工具,定期的去合并,原理就是把待壓縮數據寫入一個新的臨時的文件夾,壓縮完,和原來的數據進行檢驗,數據一致之后,用壓縮的數據覆蓋原來的數據,但是由于無法保證事務,所以出現了很多的問題,比如壓縮的同時又有數據寫入了,檢驗就會失敗,導致合并小文件失敗,而且無法實時的合并,只能按照分區合并一天之前的。或者一個小時之前的,最新的數據仍然有小文件的問題,導致查詢性能提高不了。

所以基于以上的一些問題,我調研了數據湖技術,由于我們的流式數據主要是flink為主,查詢引擎是presto,而hudi強耦合了spark,對flink的支持還不太友好,所以綜合考慮了一下,決定引入iceberg。在對iceberg進行功能測試和簡單代碼review之后,發現iceberg在flink這塊還有一些需要優化和提升,不過我覺得應該能hold的住,不完善的地方和需要優化的地方我們自己來補全,所以最終引入了iceberg來解決小文件的問題。

除此之外,對于一些其他的問題,比如cdc數據的接入,以及根據查詢條件刪除數據等,后續也可以通過數據湖技術來解決。

flink流式數據寫入iceberg

我們的主要使用場景是使用flink將kafka的流式數據寫入到Iceberg,為了代碼的簡潔以及可維護性,我們盡量將程序使用sql來編寫,示例代碼如下:

// create catalog CREATE CATALOG iceberg WITH (  'type'='iceberg',  'catalog-type'='hive'," +   'warehouse'='hdfs://localhost/user/hive/warehouse',   'uri'='thrift://localhost:9083')// create table CREATE TABLE iceberg.tmp.iceberg_table (
  id BIGINT COMMENT 'unique id',
    data STRING,
   d int) 
PARTITIONED BY (d)WITH ('connector'='iceberg','write.format.default'='orc')// insert into insert into iceberg.tmp.iceberg_table select * from kafka_table

提示:記得開啟checkpoint

壓縮小文件

目前壓縮小文件是采用的一個額外批任務來進行的,Iceberg提供了一個spark版本的action,我在做功能測試的時候發現了一些問題,此外我對spark也不是非常熟悉,擔心出了問題不好排查,所以參照spark版本的自己實現了一個flink版本,并修復了一些bug,進行了一些功能的優化。

由于我們的iceberg的元數據都是存儲在hive中的,所以壓縮程序的邏輯是我把hive中所有的iceberg表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區表還是非分區表,都進行全表的壓縮。這樣做是為了處理某些使用eventtime的flink任務,如果有延遲的數據的到來。就會把數據寫入以前的分區,如果不是全表壓縮只壓縮當天分區的話,新寫入的其他天的數據就不會被壓縮。

代碼示例參考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table)
		.rewriteDataFiles()//.maxParallelism(parallelism)//.filter(Expressions.equal("day", day))//.targetSizeInBytes(targetSizeInBytes).execute();

具體的壓縮小文件相關的信息可以參考這篇文章[Flink集成iceberg數據湖之合并小文件]。

快照過期處理

我們的快照過期策略,我是和壓縮小文件的批處理任務寫在一起的,壓縮完小文件之后,進行表的快照過期處理,目前保留的時間是一個小時,這是因為對于有一些比較大的表,分區比較多,而且checkpoint比較短,如果保留的快照過長的話,還是會保留過多小文件,我們暫時沒有查詢歷史快照的需求,所以我將快照的保留時間設置了一個小時。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
   table.expireSnapshots()
   // .retainLast(20)
   .expireOlderThan(olderThanTimestamp)
   .commit();

數據管理

寫入了數據之后,有時候我想查看一下相應的快照下面有多少數據文件,直接查詢hdfs你不知道哪個是有用的,哪個是沒用的。所以需要有對應的管理工具。目前flink這塊還不太成熟,我們可以使用spark3提供的工具來查看。

ddl

目前create table 這些操作我們是通過flink sql client來做的。其他相關的ddl的操作可以使用spark來做:

https://iceberg.apache.org/spark/#ddl-commands

Dml

一些相關的數據的操作,比如刪除數據等可以通過spark來實現,presto目前只支持分區級別的刪除功能。

移除孤立的文件

定時任務刪除

在使用iceberg的過程中,有時候會有這樣的情況,我提交了一個flink任務,由于各種原因,我把它給停了,這個時候iceberg還沒提交相應的快照。還有由于一些異常導致程序失敗,就會產生一些不在iceberg元數據里面的孤立的數據文件,這些文件對iceberg來說是不可達的,也是沒用的。所以我們需要像jvm的垃圾回收一樣來清理這些文件。

目前iceberg提供了一個spark版本的action來進行處理這些沒用的文件,我們采取的策略和壓縮小文件一樣,獲取hive中的所有的iceberg表。每隔一個小時執行一次定時任務來刪除這些沒用的文件。

  SparkSession spark = ......  Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();

踩坑

在程序運行過程中出現了正常的數據文件被刪除的問題,經過調研,由于我的快照保留設置是一小時,這個清理程序清理時間也是設置一個小時,通過日志發現是這個清理程序刪除了正常的數據。查了查代碼,覺得應該是他們設置了一樣的時間,在清理孤立文件的時候,有其他程序正在讀寫表,由于這個清理程序是沒有事務的,導致刪除了正常的數據。最后把這個清理程序的清理時間改成默認的三天,沒有再出現刪除數據文件的問題。當然,為了保險起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個備份文件夾,檢查沒有問題之后,手工刪除。

使用presto進行查詢

目前我們使用的版本是prestosql 346,這個版本安裝的時候需要jdk11,presto查詢iceberg比較簡單。官方提供了相應的conncter,我們配置一下就行,

//iceberg.propertiesconnector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

批任務處理

手工執行sql批任務

目前查詢iceberg的批處理任務,使用的flink的客戶端,首先我們啟動一個基于yarn session 的flink集群,然后通過sql客戶端提交任務到集群。

主要的配置就是我們需要根據數據的大小設置sql任務執行的并行度,可以通過以下參數設置。

set table.exec.resource.default-parallelism = 100;

此外我在sql客戶端的配置文件里配置了hive和iceberg相應的catalog,這樣每次客戶端啟動的時候就不需要建catalog了。

catalogs:  # empty list  - name: iceberg    type: iceberg    warehouse: hdfs://localhost/user/hive2/warehouse    uri: thrift://localhost:9083    catalog-type: hive    cache-enabled: false - name: hive    type: hive    hive-conf-dir: /Users/user/work/hive/conf    default-database: default

定時任務

目前對于定時調度中的批處理任務,flink的sql客戶端還沒hive那樣做的很完善,比如執行hive -f來執行一個文件。而且不同的任務需要不同的資源,并行度等。所以我自己封裝了一個flinK程序,通過調用這個程序來進行處理,讀取一個指定文件里面的sql,來提交批任務。在命令行控制任務的資源和并行度等。

/home/flink/bin/flink run -p 10 -m yarn-cluster  /home/work/iceberg-scheduler.jar my.sql

優化

批任務的查詢這塊,做了一些優化,比如limit下推,filter下推,查詢并行度優化等,可以大大提高查詢的速度,這些優化都已經推回給社區。

數據遷移

目前我們的所有數據都是存儲在hive表的,在驗證完iceberg之后,我們決定將hive的數據遷移到iceberg,所以我寫了一個工具,可以使用hive的數據,然后新建一個iceberg表,為其建立相應的元數據,但是測試的時候發現,如果采用這種方式,就需要把寫入hive的程序停止,因為如果iceberg和hive使用同一個數據文件,而壓縮程序會不斷地壓縮iceberg表的小文件,壓縮完之后,不會馬上刪除舊數據,所以hive表就會查到雙份的數據。鑒于iceberg測試的時候還有一些不穩定,所以我們采用雙寫的策略,原來寫入hive的程序不動,新啟動一套程序寫入iceberg,這樣能對iceberg表觀察一段時間。還能和原來hive中的數據進行比對,來驗證程序的正確性。

經過一段時間觀察,每天將近20億數據的hive表和iceberg表,一條數據也不差。所以在最終對比數據沒有問題之后,把hive表停止寫入,使用新的iceberg表,然后把hive中的舊數據導入到iceberg。

到此,關于“Flink集成iceberg在生產環境中的使用方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注創新互聯網站,小編會繼續努力為大家帶來更多實用的文章!

文章題目:Flink集成iceberg在生產環境中的使用方法是什么
標題URL:http://m.kartarina.com/article28/jecdjp.html

成都網站建設公司_創新互聯,為您提供小程序開發自適應網站、App設計、、網站內鏈、標簽優化

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

微信小程序開發
主站蜘蛛池模板: 成人无码区免费A∨直播| 久久久久久久无码高潮 | 久久久久久精品无码人妻| 亚洲精品中文字幕无码A片老| 国产精品成人无码免费| 国产成人无码一二三区视频| 精品人妻系列无码一区二区三区| 亚洲国产精品成人AV无码久久综合影院| 国产精品va无码一区二区| 无码人妻丰满熟妇区毛片18| 无码精品人妻一区二区三区人妻斩| 丰满少妇人妻无码专区| 久久精品无码一区二区WWW| 亚洲av中文无码乱人伦在线r▽| 国产成人亚洲精品无码AV大片| 久久午夜夜伦鲁鲁片免费无码影视| 精品人妻无码专区中文字幕| 无码熟熟妇丰满人妻啪啪软件 | 中文无码不卡的岛国片| 最新国产精品无码| 亚洲av无码天堂一区二区三区| 成人免费午夜无码视频| 无码人妻精品一区二区在线视频| 精品无码国产污污污免费网站国产| 久久久久久久久免费看无码 | 无码人妻精品丰满熟妇区| 无码人妻丰满熟妇区96| 无码专区—VA亚洲V天堂| 亚洲AV无码久久寂寞少妇| 亚洲AV无码精品无码麻豆| 亚洲AV无码久久寂寞少妇| 无码孕妇孕交在线观看| 无码精品A∨在线观看中文| 亚洲爆乳无码一区二区三区| 中文字幕无码免费久久| 亚洲国产精品无码av| 亚洲精品中文字幕无码AV| 亚洲成A∨人片在线观看无码| AV无码久久久久不卡蜜桃| 激情射精爆插热吻无码视频| 午夜爽喷水无码成人18禁三级|