deltalake的merge場景是怎么樣的

deltalake的merge場景是怎么樣的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

創新互聯公司是一家業務范圍包括IDC托管業務,虛擬空間、主機租用、主機托管,四川、重慶、廣東電信服務器租用,資陽主機托管,成都網通服務器托管,成都服務器租用,業務范圍遍及中國大陸、港澳臺以及歐美等多個國家及地區的互聯網數據服務公司。

下面主要是講merge操作的四個案例。

1.數據去重

實際上,線上業務很多時候數據源在上報數據的時候,由于各種原因可能會重復上報數據,這就會導致數據重復,使用merge函數可以避免插入重復的數據。具體操作方法如下:

sql

MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueIdWHEN NOT MATCHED  THEN INSERT *

scala

deltaTable  .as("logs")  .merge(    newDedupedLogs.as("newDedupedLogs"),    "logs.uniqueId = newDedupedLogs.uniqueId")  .whenNotMatched()  .insertAll()  .execute()

注意:需要寫入delta lake表的dataset自身要完成去重的 操作。我們可以通過merge語義區實現新數據和delta lake表中已有的數據之間去重,但是如果新的dataset內部有重復數據,重復數據依然會被插入。因此在寫入新數據之前一定要完成去重操作。

如果數據確定可能會在某些時間周期內重復,那么可以對目標表進行按照時間分區,這樣就可以在merge操作的時候指定時間范圍。

sql

MERGE INTO logsUSING newDedupedLogsON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYSWHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS  THEN INSERT *

scala

deltaTable.as("logs").merge(    newDedupedLogs.as("newDedupedLogs"),    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")  .insertAll()  .execute()

這種利用分區進行謂詞下推,可以大幅減少數據加載的量,進而提升速度。此外,對于Structured Streaming可以使用insert-only merge操作來實現連續不斷的去重操作。主要有以下場景:

a.對于一些streaming操作,可以在foreachBatch操作來實現連續不斷的將數據寫入delta lake表,同時具有去重的功能。

b.對于另一些流查詢,你可以連續不斷的從delta lake表中讀取去重的數據。可以這么做的原因是insert-only merge操作僅僅會追加新的數據到delta lake表中。

2.漸變緯度數據

另一個常見的操作是SCD Type 2,它維護對維表中每個key所做的所有變更的歷史記錄。此類操作需要更新現有行以將key的先前值標記為舊值,并插入新行作為最新值。給定具有更新的源表和具有維度數據的目標表,可以使用merge表達SCD type 2。

維護客戶地址歷史記錄以及每個地址的有效日期范圍,是本小節常見的示例操作。當需要更新客戶的地址時,必須將先前的地址標記為不是當前地址,更新其有效日期范圍,然后將新地址添加為當前地址。scala的表達方法如下:

val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ...          // DataFrame with schema (customerId, address, effectiveDate)
// Rows to INSERT new addresses of existing customersval newAddressesToInsert = updatesDF  .as("updates")  .join(customersTable.toDF.as("customers"), "customerid")  .where("customers.current = true AND updates.address <> customers.address")
// Stage the update by unioning two sets of rows// 1. Rows that will be inserted in the whenNotMatched clause// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customersval stagedUpdates = newAddressesToInsert  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.  .union(    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.  )
// Apply SCD Type 2 operation using mergecustomersTable  .as("customers")  .merge(    stagedUpdates.as("staged_updates"),    "customers.customerId = mergeKey")  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")  .updateExpr(Map(                                      // Set current to false and endDate to source's effective date.    "current" -> "false",    "endDate" -> "staged_updates.effectiveDate"))  .whenNotMatched()  .insertExpr(Map(    "customerid" -> "staged_updates.customerId",    "address" -> "staged_updates.address",    "current" -> "true",    "effectiveDate" -> "staged_updates.effectiveDate",  // Set current to true along with the new address and its effective date.    "endDate" -> "null"))  .execute()

3.cdc操作

和scd類似,另一個常見的案例是變化數據捕獲,也即是常說的CDC,簡單來說就是同步外部數據庫的變更數據到deta lake。換句話說,對于外部數據庫的 update,delete,insert操作,要同時作用于delta 表。這種情況,也可以使用merge操作來實現。

val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)// DataFrame with changes having following columns// - key: key of the change// - time: time of change for ordering between changes (can replaced by other ordering id)// - newValue: updated or inserted value if key was not deleted// - deleted: true if the key was deleted, false if the key was inserted or updatedval changesDF: DataFrame = ...// Find the latest change for each key based on the timestamp// Note: For nested structs, max on struct is computed as// max on first struct field, if equal fall back to second fields, and so on.val latestChangeForEachKey = changesDF  .selectExpr("key", "struct(time, newValue, deleted) as otherCols" )  .groupBy("key")  .agg(max("otherCols").as("latest"))  .selectExpr("key", "latest.*")deltaTable.as("t")  .merge(    latestChangeForEachKey.as("s"),    "s.key = t.key")  .whenMatched("s.deleted = true")  .delete()  .whenMatched()  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))  .whenNotMatched("s.deleted = false")  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))  .execute()

4. 整合foreachBatch

實際上在使用delta lake的時候可以結合foreachBatch和merge,來實現復雜的流查詢到delta lake表的upsert功能。總共有以下幾個場景:

a.在update模式下寫流聚合結果到delta lake。這種情況,實際上比Complete模式更加高效。

import io.delta.tables.*
val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
// Function to upsert microBatchOutputDF into Delta table using mergedef upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {  deltaTable.as("t")    .merge(      microBatchOutputDF.as("s"),      "s.key = t.key")    .whenMatched().updateAll()    .whenNotMatched().insertAll()    .execute()}
// Write the output of a streaming aggregation query into Delta tablestreamingAggregatesDF.writeStream  .format("delta")  .foreachBatch(upsertToDelta _)  .outputMode("update")  .start()

b.將數據庫變更操作同步到delta lake。該場景就是寫變化數據到delta lake,也即是本問第三小節。

c.流數據以去重的方式寫入delta lake。這個就是本文第一小節。

注意:

確保foreachBatch中的merge語句是冪等的,因為重新啟動流查詢可以將對該操作對同一批數據重復執行。

當在foreachBatch中使用merge時,流查詢的輸入數據速率可能會上報為在源處生成數據的實際速率的若干倍數。這是因為merge多次讀取輸入數據,導致輸入指標倍增。如果這是瓶頸,則可以在合并之前緩存批處理DataFrame,然后在合并之后取消緩存。

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創新互聯行業資訊頻道,感謝您對創新互聯的支持。

文章名稱:deltalake的merge場景是怎么樣的
新聞來源:http://m.kartarina.com/article0/pipgoo.html

成都網站建設公司_創新互聯,為您提供品牌網站設計標簽優化建站公司域名注冊品牌網站建設移動網站建設

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

成都定制網站建設
主站蜘蛛池模板: 极品无码国模国产在线观看| 老司机亚洲精品影院无码| 亚洲中久无码永久在线观看同| 无码人妻精品一二三区免费 | 亚洲综合久久精品无码色欲| 亚洲成av人片天堂网无码】| 国产色无码精品视频国产| 无码国产午夜福利片在线观看| 亚洲av无码成人精品区一本二本 | 国产成人无码av| 人妻精品久久无码区洗澡| 国产在线无码精品电影网| 69成人免费视频无码专区| 久久人妻内射无码一区三区| 亚洲日韩VA无码中文字幕| 久久Av无码精品人妻系列| 国产午夜精品无码| 中文成人无码精品久久久不卡 | 无码丰满熟妇浪潮一区二区AV| 无码少妇一区二区三区浪潮AV | 久久久久久亚洲AV无码专区| 精品久久久久久无码国产| 亚洲国产精品无码久久九九大片| 高清无码视频直接看| 超清无码熟妇人妻AV在线电影| 国产精品爆乳奶水无码视频| 亚洲中文字幕无码爆乳app| 久久久无码精品国产一区| 日本无码色情三级播放| 中日精品无码一本二本三本| 久久中文字幕无码专区| 四虎成人精品国产永久免费无码| 亚洲AV无码成人网站在线观看| 亚洲日韩精品无码专区加勒比| 久久久久亚洲Av无码专| 18禁无遮拦无码国产在线播放| 免费无码成人AV在线播放不卡| 日韩av无码久久精品免费| 无码av免费一区二区三区| 久久久久久精品无码人妻| 亚洲另类无码专区丝袜|