- 產品
- 產品解決方案
- 行業解決方案
- 案例
- 數據資產入表
- 賦能中心
- 伙伴
- 關于
時間:2022-01-14來源:玳瑁瀏覽數:619次
目錄
一.數據湖的起源
1.1.提出背景
1.2.出現問題
1.3.解決問題
1.4.提出者說
二. 開源廠商解決方案
2.1.背景
2.2.Delta
2.2Apache Hudi
2.3.Apache Iceberg
2.4.痛點小結
三.開源廠商維度對比
3.1.ACID和隔離級別支持
3.2.Schema變更支持和設計
3.3.流批接口支持
3.4.接口抽象程度和插件化
3.5.查詢性能優化
3.6.其他功能
3.7.社區現狀
3.8.總結
3.9.開源組件差異
3.9.1.delta特性
3.9.2.Iceberg特性
3.9.3.Hudi特性
3.9.4.hive特性
3.10.數據湖選型
四.數據倉庫和數據湖
4.1.數據倉庫的痛點
4.1.1資源浪費
4.1.2字段變更
4.1.3數據結構
4.1.4字段限制
4.2數據湖的理念
4.2.1功能
4.2.2理念
4.3特性對比
五.實時數倉和數據湖
5.1.實時數倉痛點
5.1.1沒有加工邏輯
5.2.2.分層邏輯
5.2.3.合并成本高
5.2數據湖解決
六. 架構和數據湖
6.1數據倉庫架構
6.2實時數倉架構
七.大數據平臺與數據湖
八. Apache Hudi原理介紹
8.1使用場景
8.1.1近實時寫入
8.1.2近實時分析
8.1.3增量 pipeline
8.1.4增量導出
8.2概念/術語
8.2.1Timeline
8.2.2Time
8.2.3文件管理
8.2.4 File Format
8.2.5 Table 類型
8.2.6數據寫
8.2.7工具
8.2.8 Key 生成策略
8.2.9刪除策略
8.2.10數據讀
8.2.11 Compaction
8.3總結
? ? ? 當我們學一樣東西,不深究其源頭,可能永遠也掌握不了最本質的東西,那我們看看一看數據湖。數據湖最早是由Pentaho的創始人兼CTO, James Dixon于2010年10月紐約Hadoop World大會上提出來。在Pentaho剛剛發布了Hadoop的第一個版本這樣的一個大背景下,當時James Dixon提出數據湖的概念,是為了推廣自家Pentaho產品以及Hadoop生態相關組件。
? ? ? 了解Pentaho的小伙伴都知道,他是一家做BI產品的。當時的BI分析主要是基于數據市場(Data Mart)的。數據市場的建立需要事先識別出感興趣的字段、屬性,并對數據進行聚合處理。這樣BI分析面臨兩個問題:
? ? ? 1.只使用一部分屬性,這些數據只能回答預先定義好(pre-determined)的結構對應的數據分析問題。
? ? ? 2.數據集市的數據被聚合,低層級的明細數據丟失了,能回答和探索分析的問題被限制。
? ? ? ?這個時候,James Dixon提出來把所有原始數據都存在Hadoop中,后面根據需要再來取用,就可以解決這個問題,所以數據湖由此誕生。有人這樣說,如果說數據集市、數據倉庫里面是瓶裝的水,那么它就是清潔的、打包好的、擺放整齊方便取用的;如果數據湖里面就是原生態的水,那么它就是未經處理的,原汁原味的。數據湖中的水從源頭流入湖中,所有用戶都可以來湖里獲取、蒸餾提純這些水(數據),獲取想要的數據,探索數據價值。由此,并引起了國內外數據行業的小伙伴的普遍關注和研究。
? ? ? 如下原話:

? ? ? 一個數據湖不是建立在hadoop上的一個數據倉庫。它可以建立在很多存儲系統上,意味著有很多數據湖,這些數據間是可以相互關聯的。(英語好,可以訪問https://jamesdixon.wordpress.com/2014/09/25/data-lakes-revisited/)所以,這里提到,也是我們經常說的,企業有很多信息系統,將數據統一采集到數據倉庫解決數據孤島的問題。那么數據湖一提出來,我們來看看各個開源廠商的動作。
? ? ? 三大開源數據湖方案分別為:Delta、Apache Iceberg、Apache Hudi。Apache Spark背后商業公司Databricks在商業化上取得巨大成功,所以推出的Delta也顯得格外亮眼,分為企業版和開源版本。Apache Hudi是由Uber的工程師為滿足其內部數據分析的需求而設計的數據湖項目,它提供的fast upsert/delete以及compaction等功能可以說是精準命中廣大人民群眾的痛點,加上項目各成員積極地社區建設,包括技術細節分享、國內社區推廣等等,也在逐步地吸引潛在用戶的目光,開源。Apache Iceberg當前看則會顯得相對平庸一些,簡單說社區關注度暫時比不上Delta,功能也不如Hudi豐富,但卻是一個野心勃勃的項目,因為它具有高度抽象和非常優雅的設計,為成為一個通用的數據湖方案奠定了良好基礎,開源項目。其實我們更關心的是他們解決什么問題和痛點,怎么使用的,為什么會有出現這些開源的組件(特別強調:數據湖不是一個技術,而是一個理念,類似數據倉庫)。
? ? ? 以Databricks推出的delta為例,它要解決的核心問題基本上集中在下原如圖 :

? ? ? 在沒有delta數據湖之前,Databricks的客戶一般會采用經典的lambda架構來構建他們的流批處理場景。
? ? ? 以用戶點擊行為分析為例,點擊事件經Kafka被下游的Spark Streaming作業消費,分析處理(業務層面聚合等)后得到一個實時的分析結果,這個實時結果只是當前時間所看到的一個狀態,無法反應時間軸上的所有點擊事件。所以為了保存全量點擊行為,Kafka還會被另外一個Spark Batch作業分析處理,導入到文件系統上(一般就是parquet格式寫HDFS或者S3,可以認為這個文件系統是一個簡配版的數據湖),供下游的Batch作業做全量的數據分析以及AI處理等。
? ? ? 第一、批量導入到文件系統的數據一般都缺乏全局的嚴格schema規范,下游的Spark作業做分析時碰到格式混亂的數據會很麻煩,每一個分析作業都要過濾處理錯亂缺失的數據,成本較大。
? ? ? 第二、數據寫入文件系統這個過程沒有ACID保證,用戶可能讀到導入中間狀態的數據。所以上層的批處理作業為了躲開這個坑,只能調度避開數據導入時間段,可以想象這對業務方是多么不友好;同時也無法保證多次導入的快照版本,例如業務方想讀最近5次導入的數據版本,其實是做不到的。
? ? ? 第三、用戶無法高效upsert/delete歷史數據,parquet文件一旦寫入HDFS文件,要想改數據,就只能全量重新寫一份的數據,成本很高。事實上,這種需求是廣泛存在的,例如由于程序問題,導致錯誤地寫入一些數據到文件系統,現在業務方想要把這些數據糾正過來;線上的MySQL binlog不斷地導入update/delete增量更新到下游數據湖中;某些數據審查規范要求做強制數據刪除,例如歐洲出臺的GDPR隱私保護等等。
? ? ? 第四、頻繁地數據導入會在文件系統上產生大量的小文件,導致文件系統不堪重負,尤其是HDFS這種對文件數有限制的文件系統。
? ? ? 所以,在Databricks看來,以下四個點是數據湖必備的:

? ? ? 事實上, ?Databricks在設計delta時,希望做到流批作業在數據層面做到進一步的統一(如下圖)。業務數據經過Kafka導入到統一的數據湖中(無論批處理,還是流處理),上層業務可以借助各種分析引擎做進一步的商業報表分析、流式計算以及AI分析等等。
? ? ? 現在的基于數據湖的架構圖:

? ? ? 所以,總結起來,我認為databricks設計delta時主要考慮實現以下核心功能特性:

? ? ? Uber的業務場景主要為:將線上產生的行程訂單數據,同步到一個統一的數據中心,然后供上層各個城市運營同事用來做分析和處理。
? ? ? 在2014年的時候,Uber的數據湖架構相對比較簡單,業務日志經由Kafka同步到S3上,上層用EMR做數據分析;線上的關系型數據庫以及NoSQL則會通過ETL(ETL任務也會拉去一些Kakfa同步到S3的數據)任務同步到閉源的Vertica分析型數據庫,城市運營同學主要通過Vertica SQL實現數據聚合。當時也碰到數據格式混亂、系統擴展成本高(依賴收Vertica商業收費軟件)、數據回填麻煩等問題。后續遷移到開源的Hadoop生態,解決了擴展性問題等問題,但依然碰到Databricks上述的一些問題,其中最核心的問題是無法快速upsert存量數據。

? ? ? 如上圖所示,ETL任務每隔30分鐘定期地把增量更新數據同步到分析表中,全部改寫已存在的全量舊數據文件,導致數據延遲和資源消耗都很高。
? ? ? 此外,在數據湖的下游,還存在流式作業會增量地消費新寫入的數據,數據湖的流式消費對他們來說也是必備的功能。所以,他們就希望設計一種合適的數據湖方案,在解決通用數據湖需求的前提下,還能實現快速的upsert以及流式增量消費。

? ? ?Uber團隊在Hudi上同時實現了Copy On Write和Merge On Read的兩種數據格式,其中Merge On Read就是為了解決他們的fast upsert而設計的。簡單來說,就是每次把增量更新的數據都寫入到一批獨立的delta文件集,定期地通過compaction合并delta文件和存量的data文件。同時給上層分析引擎提供三種不同的讀取視角:僅讀取delta增量文件、僅讀取data文件、合并讀取delta和data文件。滿足各種業務方對數據湖的流批數據分析需求。
? ? ? 最終,我們可以提煉出Uber的數據湖需求為如下圖,這也正好是Hudi所側重的核心特性:

? ? ? Netflix的數據湖原先是借助Hive來構建,但發現Hive在設計上的諸多缺陷之后,開始轉為自研Iceberg,并最終演化成Apache下一個高度抽象通用的開源數據湖方案。Netflix用內部的一個時序數據業務的案例來說明Hive的這些問題,采用Hive時按照時間字段做partition,他們發現僅一個月會產生2688個partition和270萬個數據文件。他們執行一個簡單的select查詢,發現僅在分區裁剪階段就耗費數十分鐘。

? ? ? 他們發現Hive的元數據依賴一個外部的MySQL和HDFS文件系統,通過MySQL找到相關的parition之后,需要為每個partition去HDFS文件系統上按照分區做目錄的list操作。在文件量大的情況下,這是一個非常耗時的操作。
? ? ? 同時,由于元數據分屬MySQL和HDFS管理,寫入操作本身的原子性難以保證。即使在開啟Hive ACID情況下,仍有很多細小場景無法保證原子性。另外,Hive Metastore沒有文件級別的統計信息,這使得filter只能下推到partition級別,而無法下推到文件級別,對上層分析性能損耗無可避免。
? ? ? 最后,Hive對底層文件系統的復雜語義依賴,使得數據湖難以構建在成本更低的S3上。于是,Netflix為了解決這些痛點,設計了自己的輕量級數據湖Iceberg。在設計之初,作者們將其定位為一個通用的數據湖項目,所以在實現上做了高度的抽象。
雖然目前從功能上看不如前面兩者豐富,但由于它牢固堅實的底層設計,一旦功能補齊,將成為一個非常有潛力的開源數據湖方案。
? ? ? 總體來說,Netflix設計Iceberg的核心訴求可以歸納為如下


? ? ? 我們可以把上述三個項目針對的痛點,放到一張圖上來看。可以發現標紅的功能點,基本上是一個好的數據湖方案應該去做到的功能點。
? ? ? 在理解了上述三大方案各自設計的初衷和面向的痛點之后,接下來我們從7個維度來對比評估三大項目的差異。通常人們在考慮數據湖方案選型時,Hive ACID也是一個強有力的候選人,因為它提供了人們需要的較為完善功能集合,所以這里我們把Hive ACID納入到對比行列中。

? ? ? 這里主要解釋下,對數據湖來說三種隔離分別代表的含義:Serialization是說所有的reader和writer都必須串行執行;Write Serialization: 是說多個writer必須嚴格串行,reader和writer之間則可以同時跑;Snapshot Isolation: 是說如果多個writer寫的數據無交集,則可以并發執行;否則只能串行。Reader和writer可以同時跑。綜合起來看,Snapshot Isolation隔離級別的并發性是相對比較好的。

? ? ? ?這里有兩個對比項,一個是schema變更的支持情況,我的理解是hudi僅支持添加可選列和刪除列這種向后兼容的DDL操作,而其他方案則沒有這個限制。另外一個是數據湖是否自定義schema接口,以期跟計算引擎的schema解耦。這里iceberg是做的比較好的,抽象了自己的schema,不綁定任何計算引擎層面的schema。

? ? ? 目前Iceberg和Hive暫時不支持流式消費,不過Iceberg社區正在issue 179上開發支持。

? ? ? 這里主要從計算引擎的寫入和讀取路徑、底層存儲可插拔、文件格式四個方面來做對比。這里Iceberg是抽象程度做得最好的數據湖方案,四個方面都做了非常干凈的解耦。delta是databricks背后主推的,必須天然綁定spark;hudi的代碼跟delta類似,也是強綁定spark。
? ? ? 存儲可插拔的意思是說,是否方便遷移到其他分布式文件系統上(例如S3),這需要數據湖對文件系統API接口有最少的語義依賴,例如若數據湖的ACID強依賴文件系統rename接口原子性的話,就難以遷移到S3這樣廉價存儲上,目前來看只有Hive沒有太考慮這方面的設計;文件格式指的是在不依賴數據湖工具的情況下,是否能讀取和分析文件數據,這就要求數據湖不額外設計自己的文件格式,統一用開源的parquet和avro等格式。這里,有一個好處就是,遷移的成本很低,不會被某一個數據湖方案給綁死。

? ? ? 這里One line demo指的是,示例demo是否足夠簡單,體現了方案的易用性,Iceberg稍微復雜一點(我認為主要是Iceberg自己抽象出了schema,所以操作前需要定義好表的schema)。做得最好的其實是delta,因為它深度跟隨spark易用性的腳步。
Python支持其實是很多基于數據湖之上做機器學習的開發者會考慮的問題,可以看到Iceberg和Delta是做的很好的兩個方案。出于數據安全的考慮,Iceberg還提供了文件級別的加密解密功能,這是其他方案未曾考慮到的一個比較重要的點。

? ? ? 這里需要說明的是,Delta和Hudi兩個項目在開源社區的建設和推動方面,做的比較好。Delta的開源版和商業版本,提供了詳細的內部設計文檔,用戶非常容易理解這個方案的內部設計和核心功能,同時Databricks還提供了大量對外分享的技術視頻和演講, 甚至邀請了他們的企業用戶來分享Delta的線上經驗。
? ? ? Uber的工程師也分享了大量Hudi的技術細節和內部方案落地,研究官網的近10個PPT已經能較為輕松理解內部細節,此外國內的小伙伴們也在積極地推動社區建設,提供了官方的技術公眾號和郵件列表周報。Iceberg相對會平靜一些,社區的大部分討論都在Github的issues和pull request上,郵件列表的討論會少一點,很多有價值的技術文檔要仔細跟蹤issues和PR才能看到,這也許跟社區核心開發者的風格有關。
? ? ? 我們把三個產品(其中delta分為databricks的開源版和商業版)總結成如下圖:

? ? ? 如果用一個比喻來說明delta、iceberg、hudi、hive-acid四者差異的話,可以把四個項目比做建房子。
? ? ? 由于開源的delta是databricks閉源delta的一個簡化版本,它主要為用戶提供一個table format的技術標準,閉源版本的delta基于這個標準實現了諸多優化,這里我們主要用閉源的delta來做對比。

? ? ? Delta的房子底座相對結實,功能樓層也建得相對比較高,但這個房子其實可以說是databricks的,本質上是為了更好地壯大Spark生態,在delta上其他的計算引擎難以替換Spark的位置,尤其是寫入路徑層面。
? ? ? Iceberg的建筑基礎非常扎實,擴展到新的計算引擎或者文件系統都非常的方便,但是現在功能樓層相對低一點,目前最缺的功能就是upsert和compaction兩個,Iceberg社區正在以最高優先級推動這兩個功能的實現。
? ? ? Hudi的情況要相對不一樣,它的建筑基礎設計不如iceberg結實,舉個例子,如果要接入Flink作為Sink的話,需要把整個房子從底向上翻一遍,把接口抽象出來,同時還要考慮不影響其他功能,當然Hudi的功能樓層還是比較完善的,提供的upsert和compaction功能直接命中廣大群眾的痛點。
? ? ? Hive的房子,看起來是一棟豪宅,絕大部分功能都有,把它做為數據湖有點像靠著豪宅的一堵墻建房子,顯得相對重量級一點,另外正如Netflix上述的分析,細看這個豪宅的墻面是其實是有一些問題的。
? ? ? 根據以上對比,可能更愿意選擇Hudi,功能齊全,并且還有各大廠商巨頭使用,社區非常活躍,具體問題也可以及時的結局。長遠考慮,如果iceberg如果功能慢慢完善,會選擇這個。
? ? ? TB級數據T+1離線數據倉庫跑批失敗,重跑之后資源浪費

? ? ? 寫時模型,字段變更怎么辦,重跑耗時,要跑很長的依賴,一旦有一個表錯誤,后面所有表都會報錯。

? ? ? 數據倉庫適合,結構化存儲;

? ? ? 而非結構化的圖片,音頻,視頻,媒體等,工業物聯網等產生的數據,半結構化的數據存儲后處理非常復雜,要寫大量udf等做特殊處理。

? ? ? 數據倉庫開發經過以上流程,提需求,指標分析,模型設計,到etl開發。如果數據分析師做探索性業務分析,無法隨時獲取對應字段。


? ? ? 能夠存儲海量的原始數據, 能夠支持任意的數據格式, 有較好的分析和處理能力

? ? ? 數據湖的數據可以來自所有結構化,半結構化和非結構化數據;表結構也是在用的時候定義等等。

? ? ? 沒有加工邏輯,直接加工入庫。優點:簡單, 容易開發 缺點:沒有模型數據不能復用,浪費資源。

? ? ? 多分層,中間結果基于mq,深度加工入庫。

? ? ? 做過數據處理的都有做過這種實時數據和批處理合并,帶來的痛苦。

? ? ? ?借助數據湖的思想進行演進,多分層,所有數據落地到數據湖,進行深度加工,不再有這種kafka數據存儲量不足的問題,可以隨意獲取分析等。
? ? ? 提出數據湖和數據倉庫一體。標準主題的數據存放數據倉庫,原始結構化數據存放數據湖,有如下特點:
? ? ? 1.開放性 使用的存儲格式是開放式和標準化的(如parquet),并且為各類工具和引擎,包括機器學習和 Python/R庫,提供API,以便它們可以直接有效地訪問數據
? ? ? 2.支持從非結構化數據到結構化數據的多種數據類型
? ? ? 3.BI支持 Lakehouse可以直接在源數據上使用BI工具
? ? ? 4.支持多種工作負載 包括數據科學、機器學習以及SQL和分析
? ? ? 5.Schema enforcement and governance(模式實施和治理) 未來能更好的管理元數據,schema管理和治理,不讓數據湖變成沼澤地
? ? ? 6.事務支持 企業內部許多數據管道通常會并發讀寫數據。對ACID事務的支持確保了多方并發讀寫數據時的一致性問題
? ? ? 7.端到端流 為了構建Lakehouse,需要一個增量數據處理框架,例如Apache Hudi。
? ? ? 那么現在架構:


? ? ? 基于flink生態構建,可以進行增量消費,批流讀寫,還可以基于flinkCDC數據統計等。
? ? ? ?那么現在的大數據平臺是什么樣子呢?存儲和計算分離。各自評估各自的資源。

? ? ? 1.計算層可以有flink,spark,hive,persto等;
? ? ? 2.數據湖加速層是一個存儲和計算徹底分離的架構,增加加速層,自然的實現冷熱分離,提高讀取性能,節省遠程訪問帶寬,如開源組件alluxio;
? ? ? 3.表的格式化層,把數據文件封裝有業務含義的表,如快照,表結構,分區,等事物一致性;
? ? ? 4.存儲層,可以有很多種,如oss,hdfs,s3等等
? ? ? 減少碎片化工具的使用
? ? ? CDC 增量導入 RDBMS 數據
? ? ? 限制小文件的大小和數量
? ? ? 相對于秒級存儲 (Druid, OpenTSDB) ,節省資源
? ? ? 提供分鐘級別時效性,支撐更高效的查詢
? ? ? Hudi 作為 lib,非常輕量
? ? ? 區分 arrivetime 和 event time 處理延遲數據
? ? ? 更短的調度 interval 減少端到端延遲 (小時 -> 分鐘) => Incremental Processing
? ? ? 替代部分 Kafka 的場景,數據導出到在線服務存儲 e.g. ES

? ? ? Timeline 是 HUDI 用來管理提交(commit)的抽象,每個 commit 都綁定一個固定時間戳,分散到時間線上。在 Timeline 上,每個 commit 被抽象為一個 HoodieInstant,一個 instant 記錄了一次提交 (commit) 的行為、時間戳、和狀態。HUDI 的讀寫 API 通過 Timeline 的接口可以方便的在 commits 上進行條件篩選,對 history 和 on-going 的 commits 應用各種策略,快速篩選出需要操作的目標 commit。
? ? ? Arrival time: 數據到達 Hudi 的時間,commit time
? ? ? Event time: record 中記錄的時間

? ? ? 上圖中采用時間(小時)作為分區字段,從 10:00 開始陸續產生各種 commits,10:20 來了一條 9:00 的數據,該數據仍然可以落到 9:00 對應的分區,通過 timeline 直接消費 10:00 之后的增量更新(只消費有新 commits 的 group),那么這條延遲的數據仍然可以被消費到。

? ? ? 一個新的 base commit time 對應一個新的 FileSlice,實際就是一個新的數據版本。HUDI 通過 TableFileSystemView 抽象來管理 table 對應的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 讀)或者 base + log files(Merge On Read 讀)。通過 Timeline 和 TableFileSystemView 抽象,HUDI 實現了非常便捷和高效的表文件查找。
? ? ? Hoodie 的每個 FileSlice 中包含一個 base file (merge on read 模式可能沒有)和多個 log file (copy on write 模式沒有)。每個文件的文件名都帶有其歸屬的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通過文件名的 group id 組織 FileGroup 的 logical 關系;通過文件名的 base commit time 組織 FileSlice 的邏輯關系。HUDI 的 base file (parquet 文件) 在 footer 的 meta 去記錄了 record key 組成的 BloomFilter,用于在 file based index 的實現中實現高效率的 key contains 檢測。只有不在 BloomFilter 的 key 才需要掃描整個文件消滅假陽。HUDI 的 log (avro 文件)是自己編碼的,通過積攢數據 buffer 以 LogBlock 為單位寫出,每個 LogBlock 包含 magic number、size、content、footer 等信息,用于數據讀、校驗和過濾。

? ? ? Hoodie key (record key + partition path) 和 file id (FileGroup) 之間的映射關系,數據第一次寫入文件后保持不變,所以,一個 FileGroup 包含了一批 record 的所有版本記錄。Index 用于區分消息是 INSERT 還是 UPDATE。
? ? ? BloomFilter Index
? ? ? 新增 records 找到映射關系:record key => target partition
? ? ? 當前最新的數據 找到映射關系:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)
? ? ? 新增 records 找到需要搜索的映射關系:fileID => HoodieKey(record key + partition path) LIST,key 是候選的 fileID
? ? ? 通過 HoodieKeyLookupHandle 查找目標文件(通過 BloomFilter 加速)
? ? ? Flink State-based Index? ? ? HUDI 在 0.8.0 版本中實現的 Flink witer,采用了 Flink 的 state 作為底層的 index 存儲,每個 records 在寫入之前都會先計算目標 bucket ID,不同于 BloomFilter Index,避免了每次重復的文件 index 查找。
|
Table Type |
Supported Query types |
|
Copy On Write |
Snapshot Queries + Incremental Queries |
|
Merge On Read |
Snapshot Queries + Incremental Queries + Read Optimized Queries |
? ? ? Copy On Write 類型表每次寫入都會生成一個新的持有 base file
? ? ? (對應寫入的 instant time ) 的 FileSlice。
? ? ? 用戶在 snapshot 讀取的時候會掃描所有最新的 FileSlice 下的 base file。

? ? ? Merge On Read 表的寫入行為,依據 index 的不同會有細微的差別:
? ? ? 對于 BloomFilter 這種無法對 log file 生成 index 的索引方案,對于 INSERT 消息仍然會寫 base file (parquet format),只有 UPDATE 消息會 append log 文件(因為 base file 總已經記錄了該 UPDATE 消息的 FileGroup ID)。
? ? ? 對于可以對 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次寫入都是 log format,并且會不斷追加和 roll over。
? ? ? Merge On Read 表的讀在 READ OPTIMIZED 模式下,只會讀最近的經過 compaction 的 commit。

? ? ? 8.2.6.1寫操作
? ? ? 1.UPSERT:默認行為,數據先通過 index 打標(INSERT/UPDATE),有一些啟發式算法決定消息的組織以優化文件的大小 => CDC 導入
? ? ? 2.INSERT:跳過 index,寫入效率更高 => Log Deduplication
? ? ? 1.BULK_INSERT:寫排序,對大數據量的 Hudi 表初始化友好,對文件大小的限制 best effort(寫 HFile)
? ? ? 8.2.6.2寫流程(UPSERT)?
? ? ??Copy On Write
? ? ? 先對 records 按照 record key 去重
? ? ? 首先對這批數據創建索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
? ? ? 對于 update 消息,會直接找到對應 key 所在的最新 FileSlice 的 base 文件,并做 merge 后寫新的 base file (新的 FileSlice)
? ? ? 對于 insert 消息,會掃描當前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice
? ? ?Merge On Read? ? ? 先對 records 按照 record key 去重(可選)
? ? ? 首先對這批數據創建索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
? ? ? 如果是 insert 消息,如果 log file 不可建索引(默認),會嘗試 merge 分區內最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個 FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個 FileGroup
? ? ?+ FileSlice + base file
? ? ? 如果是 update 消息,寫對應的 file group + file slice,直接 append 最新的 log file(如果碰巧是當前最小的小文件,會 merge base file,生成新的 file slice)
? ? ? ?log file 大小達到閾值會 roll over 一個新的
? ? ? 8.2.6.3寫流程(INSERT)
? ? ??Copy On Write
? ? ? ?先對 records 按照 record key 去重(可選)
? ? ? ?不會創建 Index
? ? ? ?如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file
? ? ? Merge On Read? ? ? 先對 records 按照 record key 去重(可選)
? ? ? 不會創建 Index
? ? ? 如果 log file 可索引,并且有小的 FileSlice,嘗試追加或寫最新的 log file;如果 log file 不可索引,寫一個新的 FileSlice + base file
? ? ? DeltaStreamer
? ? ? Datasource Writer
? ? ? Flink SQL API
? ? ? 用來生成 HoodieKey(record key + partition path),目前支持以下策略:
? ? ? 邏輯刪:將 value 字段全部標記為 null
? ? ? ?物理刪:
? ? ? 1.通過?OPERATION_OPT_KEY? 刪除所有的輸入記錄
? ? ? 2.配置?PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload?刪除所有的輸入記錄
? ? ? 3.在輸入記錄添加字段:_hoodie_is_deleted
? ? ? 8.2.10.1 Snapshot 讀
? ? ? 讀取所有 partiiton 下每個 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表讀 parquet 文件,Merge On Read 表讀 parquet + log 文件
? ? ? 8.2.10.2 Incremantal 讀:? ? ? 當前的 Spark data source 可以指定消費的起始和結束 commit 時間,讀取 commit 增量的數據集。但是內部的實現不夠高效:拉取每個 commit 的全部目標文件再按照系統字段 _hoodie_commit_time_ apply 過濾條件。
? ? ? 8.2.10.3Streaming 讀? ? ? 0.8.0 版本的 HUDI Flink writer 支持實時的增量訂閱,可用于同步 CDC 數據,日常的數據同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監控新增的改動文件,將讀取任務下派給讀 task。
? ? ? 沒有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并寫 base file
? ? ? 有 base file:走 copy on write upsert 流程,先讀 log file 建 index,再讀 base file,最后讀 log file 寫新的 base file
? ? ? Flink 和 Spark streaming 的 writer 都可以 apply 異步的 compaction 策略,按照間隔 commits 數或者時間來觸發 compaction 任務,在獨立的 pipeline 中執行。
? ? ? ?通過對寫流程的梳理我們了解到 Apache Hudi 相對于其他數據湖方案的核心
? ? ? 優勢:寫入過程充分優化了文件存儲的小文件問題,Copy On Write 寫會一直將一個 bucket (FileGroup)的 base 文件寫到設定的閾值大小才會劃分新的 bucket;Merge On Read 寫在同一個 bucket 中,log file 也是一直 append 直到大小超過設定的閾值 roll over。對 UPDATE 和 DELETE 的支持非常高效,一條 record 的整個生命周期操作都發生在同一個 bucket,不僅減少小文件數量,也提升了數據讀取的效率(不必要的 join 和 merge)。0.8.0 的 HUDI Flink 支持了 streaming 消費 HUDI 表,在后續版本還會支持 watermark 機制,讓 HUDI Flink 承擔 streaming ETL pipeline 的中間層,成為數據湖/倉建設中流批一體的中間計算層。