彭友們好,我是老彭啊。Hudi是現(xiàn)在非常熱門的數(shù)據(jù)湖開源方案,非常適合于搭建一個(gè)數(shù)據(jù)湖平臺(tái)。有些人認(rèn)為數(shù)據(jù)湖肯定與大數(shù)據(jù)技術(shù)體系完全不一樣,是兩個(gè)東西,甚至認(rèn)為他倆沒關(guān)系。但是,你知道Hudi的全稱叫啥么?就是“Hadoop Updates and Incrementals”

簡(jiǎn)單來說,就是基于Hadoop生態(tài),支持HDFS的數(shù)據(jù)刪除和增量更新的技術(shù)框架。所以,Apache Hudi其實(shí)本就是從Hadoop生態(tài)里來的,依賴 HDFS 做底層的存儲(chǔ),所以可以支撐非常大規(guī)模的數(shù)據(jù)存儲(chǔ)。同時(shí)基于update和Incrementals兩個(gè)原語(yǔ)解決流批一體的存儲(chǔ)問題:
Update/Delete 記錄:Hudi 支持更新/刪除記錄,使用文件/記錄級(jí)別索引,同時(shí)對(duì)寫操作提供事務(wù)保證。查詢可獲取最新提交的快照來產(chǎn)生結(jié)果。
變更流:支持增量獲取表中所有更新/插入/刪除的記錄,從指定時(shí)間點(diǎn)開始進(jìn)行增量查詢,可以實(shí)現(xiàn)類似 Kafka 的增量消費(fèi)機(jī)制。
Hudi設(shè)計(jì)原則 流式讀/寫:Hudi借鑒了數(shù)據(jù)庫(kù)設(shè)計(jì)的原理,從零設(shè)計(jì),應(yīng)用于大型數(shù)據(jù)集記錄流的輸入和輸出。為此,Hudi提供了索引實(shí)現(xiàn),可以將記錄的鍵快速映射到其所在的文件位置。同樣,對(duì)于流式輸出數(shù)據(jù),Hudi通過其特殊列添加并跟蹤記錄級(jí)的元數(shù)據(jù),從而可以提供所有發(fā)生變更的精確增量流。
自管理:Hudi注意到用戶可能對(duì)數(shù)據(jù)新鮮度(寫友好)與查詢性能(讀/查詢友好)有不同的期望,它支持了三種查詢類型,這些類型提供實(shí)時(shí)快照,增量流以及稍早的純列數(shù)據(jù)。在每一步,Hudi都努力做到自我管理(例如自動(dòng)優(yōu)化編寫程序的并行性,保持文件大小)和自我修復(fù)(例如:自動(dòng)回滾失敗的提交),即使這樣做會(huì)稍微增加運(yùn)行時(shí)成本(例如:在內(nèi)存中緩存輸入數(shù)據(jù)已分析工作負(fù)載)。如果沒有這些內(nèi)置的操作杠桿/自我管理功能,這些大型流水線的運(yùn)營(yíng)成本通常會(huì)翻倍。
萬(wàn)物皆日志:Hudi還具有 append only、云數(shù)據(jù)友好的設(shè)計(jì),該設(shè)計(jì)實(shí)現(xiàn)了日志結(jié)構(gòu)化存儲(chǔ)系統(tǒng)的原理,可以無縫管理所有云提供商的數(shù)據(jù)。
鍵-值數(shù)據(jù)模型:在寫方面,Hudi表被建模為鍵值對(duì)數(shù)據(jù)集,其中每條記錄都有一個(gè)唯一的記錄鍵。此外,一個(gè)記錄鍵還可以包括分區(qū)路徑,在該路徑下,可以對(duì)記錄進(jìn)行分區(qū)和存儲(chǔ)。這通常有助于減少索引查詢的搜索空間。
Hudi表設(shè)計(jì)
Hudi表的三個(gè)主要組件:
有序的時(shí)間軸元數(shù)據(jù):類似于數(shù)據(jù)庫(kù)事務(wù)日志。
分層布局的數(shù)據(jù)文件:實(shí)際寫入表中的數(shù)據(jù)。
索引(多種實(shí)現(xiàn)方式):映射包含指定記錄的數(shù)據(jù)集。
另外,針對(duì)數(shù)據(jù)的寫入和查詢,Hudi提供一些非常重要的功能例如upsert、mvvc等。
時(shí)間軸TimeLine
Timeline 是 HUDI 用來管理提交(commit)的抽象,每個(gè) commit 都綁定一個(gè)固定時(shí)間戳,分散到時(shí)間線上。在 Timeline 上,每個(gè) commit 被抽象為一個(gè) HoodieInstant,一個(gè) instant 記錄了一次提交 (commit) 的行為、時(shí)間戳、和狀態(tài)。HUDI 的讀寫 API 通過 Timeline 的接口可以方便的在 commits 上進(jìn)行條件篩選,對(duì) history 和 on-going 的 commits 應(yīng)用各種策略,快速篩選出需要操作的目標(biāo) commit。
如圖所示:

Hudi維護(hù)了一條包含在不同的即時(shí)時(shí)間(instant time)對(duì)數(shù)據(jù)集做的所有instant操作的timeline,從而提供表的即時(shí)視圖,同時(shí)還有效支持按到達(dá)順序進(jìn)行數(shù)據(jù)檢索。時(shí)間軸類似于數(shù)據(jù)庫(kù)的redo/transaction日志,由一組時(shí)間軸實(shí)例組成。Hudi保證在時(shí)間軸上執(zhí)行的操作的原子性和基于即時(shí)時(shí)間的時(shí)間軸一致性。時(shí)間軸被實(shí)現(xiàn)為表基礎(chǔ)路徑下.hoodie元數(shù)據(jù)文件夾下的一組文件。具體來說,最新的instant被保存為單個(gè)文件,而較舊的instant被存檔到時(shí)間軸歸檔文件夾中,以限制writers和queries列出的文件數(shù)量。
一個(gè)Hudi 時(shí)間軸instant由下面幾個(gè)組件構(gòu)成:
操作類型:對(duì)數(shù)據(jù)集執(zhí)行的操作類型;
即時(shí)時(shí)間:即時(shí)時(shí)間通常是一個(gè)時(shí)間戳(例如:20190117010349),該時(shí)間戳按操作開始時(shí)間的順序單調(diào)增加;
即時(shí)狀態(tài):instant的當(dāng)前狀態(tài);每個(gè)instant都有avro或者json格式的元數(shù)據(jù)信息,詳細(xì)的描述了該操作的狀態(tài)以及這個(gè)即時(shí)時(shí)刻instant的狀態(tài)。
關(guān)鍵的Instant操作類型有:
COMMIT:一次提交表示將一組記錄原子寫入到數(shù)據(jù)集中;
CLEAN: 刪除數(shù)據(jù)集中不再需要的舊文件版本的后臺(tái)活動(dòng);
DELTA_COMMIT:將一批記錄原子寫入到MergeOnRead存儲(chǔ)類型的數(shù)據(jù)集中,其中一些/所有數(shù)據(jù)都可以只寫到增量日志中;
COMPACTION: 協(xié)調(diào)Hudi中差異數(shù)據(jù)結(jié)構(gòu)的后臺(tái)活動(dòng),例如:將更新從基于行的日志文件變成列格式。在內(nèi)部,壓縮表現(xiàn)為時(shí)間軸上的特殊提交;
ROLLBACK: 表示提交/增量提交不成功且已回滾,刪除在寫入過程中產(chǎn)生的所有部分文件;
SAVEPOINT: 將某些文件組標(biāo)記為"已保存",以便清理程序不會(huì)將其刪除。在發(fā)生災(zāi)難/數(shù)據(jù)恢復(fù)的情況下,它有助于將數(shù)據(jù)集還原到時(shí)間軸上的某個(gè)點(diǎn);
任何給定的即時(shí)都會(huì)處于以下狀態(tài)之一:
REQUESTED:表示已調(diào)度但尚未初始化;
INFLIGHT: 表示當(dāng)前正在執(zhí)行該操作;
COMPLETED: 表示在時(shí)間軸上完成了該操作.
數(shù)據(jù)文件

Hudi將表組織成DFS上基本路徑下的文件夾結(jié)構(gòu)中。如果表是分區(qū)的,則在基本路徑下還會(huì)有其他的分區(qū),這些分區(qū)是包含該分區(qū)數(shù)據(jù)的文件夾,與Hive表非常類似。每個(gè)分區(qū)均由相對(duì)于基本路徑的分區(qū)路徑唯一標(biāo)識(shí)。在每個(gè)分區(qū)內(nèi),文件被組織成文件組,由文件ID唯一標(biāo)識(shí)。其中每個(gè)切片包含在某個(gè)提交/壓縮即時(shí)時(shí)間生成的基本列文件(.parquet)以及一組日志文件(.log*),該文件包含自生成基本文件以來對(duì)基本文件的插入/更新。Hudi采用了MVCC設(shè)計(jì),壓縮操作會(huì)將日志和基本文件合并以產(chǎn)生新的文件片,而清理操作則將未使用的/較舊的文件片刪除以回收HDFS上的空間。
下圖展示了一個(gè)分區(qū)內(nèi)的文件結(jié)構(gòu):

文件版本
一個(gè)新的 base commit time 對(duì)應(yīng)一個(gè)新的 FileSlice,實(shí)際就是一個(gè)新的數(shù)據(jù)版本。HUDI 通過 TableFileSystemView 抽象來管理 table 對(duì)應(yīng)的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 讀)或者 base + log files(Merge On Read 讀)。通過 Timeline 和 TableFileSystemView 抽象,HUDI 實(shí)現(xiàn)了非常便捷和高效的表文件查找。
文件格式
Hoodie 的每個(gè) FileSlice 中包含一個(gè) base file (merge on read 模式可能沒有)和多個(gè) log file (copy on write 模式?jīng)]有)。
每個(gè)文件的文件名都帶有其歸屬的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通過文件名的 group id 組織 FileGroup 的 logical 關(guān)系;通過文件名的 base commit time 組織 FileSlice 的邏輯關(guān)系。
HUDI 的 base file (parquet 文件) 在 footer 的 meta 去記錄了 record key 組成的 BloomFilter,用于在 file based index 的實(shí)現(xiàn)中實(shí)現(xiàn)高效率的 key contains 檢測(cè)。只有不在 BloomFilter 的 key 才需要掃描整個(gè)文件消滅假陽(yáng)。
HUDI 的 log (avro 文件)是自己編碼的,通過積攢數(shù)據(jù) buffer 以 LogBlock 為單位寫出,每個(gè) LogBlock 包含 magic number、size、content、footer 等信息,用于數(shù)據(jù)讀、校驗(yàn)和過濾。
索引設(shè)計(jì)
Hudi通過索引機(jī)制提供高效的upsert操作,該機(jī)制會(huì)將一個(gè)記錄鍵+分區(qū)路徑組合一致性的映射到一個(gè)文件ID.這個(gè)記錄鍵和文件組/文件ID之間的映射自記錄被寫入文件組開始就不會(huì)再改變。簡(jiǎn)而言之,這個(gè)映射文件組包含了一組文件的所有版本。Hudi當(dāng)前提供了3種索引實(shí)現(xiàn)(HBaseIndex、HoodieBloomIndex(HoodieGlobalBloomIndex)、InMemoryHashIndex)來映射一個(gè)記錄鍵到包含該記錄的文件ID。這將使我們無需掃描表中的每條記錄,就可顯著提高upsert速度。
Hudi索引可以根據(jù)其查詢分區(qū)記錄的能力進(jìn)行分類:
1. 全局索引:不需要分區(qū)信息即可查詢記錄鍵映射的文件ID。比如,寫程序可以傳入null或者任何字符串作為分區(qū)路徑(partitionPath),但索引仍然會(huì)查找到該記錄的位置。全局索引在記錄鍵在整張表中保證唯一的情況下非常有用,但是查詢的消耗隨著表的大小呈函數(shù)式增加。
2. 非全局索引:與全局索引不同,非全局索引依賴分區(qū)路徑(partitionPath),對(duì)于給定的記錄鍵,它只會(huì)在給定分區(qū)路徑下查找該記錄。這比較適合總是同時(shí)生成分區(qū)路徑和記錄鍵的場(chǎng)景,同時(shí)還能享受到更好的擴(kuò)展性,因?yàn)椴樵兯饕南闹慌c寫入到該分區(qū)下數(shù)據(jù)集大小有關(guān)系。
表類型
Copy On Write
COW表寫的時(shí)候數(shù)據(jù)直接寫入basefile,(parquet)不寫log文件。所以COW表的文件片只包含basefile(一個(gè)parquet文件構(gòu)成一個(gè)文件片)。這種的存儲(chǔ)方式的Spark DAG相對(duì)簡(jiǎn)單。關(guān)鍵目標(biāo)是是使用partitioner將tagged Hudi記錄RDD(所謂的tagged是指已經(jīng)通過索引查詢,標(biāo)記每條輸入記錄在表中的位置)分成一些列的updates和inserts.為了維護(hù)文件大小,我們先對(duì)輸入進(jìn)行采樣,獲得一個(gè)工作負(fù)載profile,這個(gè)profile記錄了輸入記錄的insert和update、以及在分區(qū)中的分布等信息。把數(shù)據(jù)從新打包,這樣:
對(duì)于updates,該文件ID的最新版本都將被重寫一次,并對(duì)所有已更改的記錄使用新值。
對(duì)于inserts,記錄首先打包到每個(gè)分區(qū)路徑中的最小文件中,直到達(dá)到配置的最大大小。之后的所有剩余記錄將再次打包到新的文件組,新的文件組也會(huì)滿足最大文件大小要求。

Copy On Write 類型表每次寫入都會(huì)生成一個(gè)新的持有base file(對(duì)應(yīng)寫入的 instant time)的 FileSlice。
用戶在snapshot讀取的時(shí)候會(huì)掃描所有最新的FileSlice下的base file。
Merge On Read
MOR表寫數(shù)據(jù)時(shí),記錄首先會(huì)被快速的寫進(jìn)日志文件,稍后會(huì)使用時(shí)間軸上的壓縮操作將其與基礎(chǔ)文件合并。根據(jù)查詢是讀取日志中的合并快照流還是變更流,還是僅讀取未合并的基礎(chǔ)文件,MOR表支持多種查詢類型。在高層次上,MOR writer在讀取數(shù)據(jù)時(shí)會(huì)經(jīng)歷與COW writer 相同的階段。這些更新將追加到最新文件篇的最新日志文件中,而不會(huì)合并。對(duì)于insert,Hudi支持兩種模式:
插入到日志文件:有可索引日志文件的表會(huì)執(zhí)行此操作(HBase索引);
插入parquet文件:沒有索引文件的表(例如布隆索引)
與寫時(shí)復(fù)制(COW)一樣,對(duì)已標(biāo)記位置的輸入記錄進(jìn)行分區(qū),將所有發(fā)往相同文件id的upsert分到一組。這批upsert會(huì)作為一個(gè)或多個(gè)日志塊寫入日志文件。Hudi允許客戶端控制日志文件大小。對(duì)于寫時(shí)復(fù)制(COW)和讀時(shí)合并(MOR)writer來說,Hudi的WriteClient是相同的。幾輪數(shù)據(jù)的寫入將會(huì)累積一個(gè)或多個(gè)日志文件。這些日志文件與基本的parquet文件(如果有)一起構(gòu)成一個(gè)文件片,而這個(gè)文件片代表該文件的一個(gè)完整版本。
這種表是用途最廣、最高級(jí)的表。為寫(可以指定不同的壓縮策略,吸收突發(fā)寫流量)和查詢(例如權(quán)衡數(shù)據(jù)的時(shí)效性和查詢性能)提供了很大的靈活性。
Merge On Read 表的寫入行為,依據(jù) index 的不同會(huì)有細(xì)微的差別:
對(duì)于 BloomFilter 這種無法對(duì) log file 生成 index 的索引方案,對(duì)于 INSERT 消息仍然會(huì)寫 base file (parquet format),只有 UPDATE 消息會(huì) append log 文件(因?yàn)?base file 已經(jīng)記錄了該 UPDATE 消息的 FileGroup ID)。
對(duì)于可以對(duì) log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次寫入都是 log format,并且會(huì)不斷追加和 roll over。
Merge On Read 表的讀在 READ OPTIMIZED 模式下,只會(huì)讀最近的經(jīng)過 compaction 的 commit。

數(shù)據(jù)讀寫流程 讀流程
Snapshot讀
讀取所有 partiiton 下每個(gè) FileGroup 最新的 FileSlice 中的文件,Copy On Write 表讀 parquet 文件,Merge On Read 表讀 parquet + log 文件
Incremantal讀
根據(jù)https://hudi.apache.org/docs/querying_data.html#spark-incr-query描述,當(dāng)前的 Spark data source 可以指定消費(fèi)的起始和結(jié)束 commit 時(shí)間,讀取 commit 增量的數(shù)據(jù)集。但是內(nèi)部的實(shí)現(xiàn)不夠高效:拉取每個(gè) commit 的全部目標(biāo)文件再按照系統(tǒng)字段 hoodie_commit_time apply 過濾條件。
Streaming 讀
HUDI Flink writer 支持實(shí)時(shí)的增量訂閱,可用于同步 CDC 數(shù)據(jù),日常的數(shù)據(jù)同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監(jiān)控新增的改動(dòng)文件,將讀取任務(wù)下派給讀 task。
寫流程
寫操作
UPSERT:默認(rèn)行為,數(shù)據(jù)先通過 index 打標(biāo)(INSERT/UPDATE),有一些啟發(fā)式算法決定消息的組織以優(yōu)化文件的大小 => CDC 導(dǎo)入
INSERT:跳過 index,寫入效率更高 => Log Deduplication
BULK_INSERT:寫排序,對(duì)大數(shù)據(jù)量的 Hudi 表初始化友好,對(duì)文件大小的限制 best effort(寫 HFile)
寫流程(UPSERT)
Copy On Write
先對(duì) records 按照 record key 去重
首先對(duì)這批數(shù)據(jù)創(chuàng)建索引 (HoodieKey => HoodieRecordLocation);通過索引區(qū)分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
對(duì)于 update 消息,會(huì)直接找到對(duì)應(yīng) key 所在的最新 FileSlice 的 base 文件,并做 merge 后寫新的 base file (新的 FileSlice)
對(duì)于 insert 消息,會(huì)掃描當(dāng)前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice
Merge On Read
先對(duì) records 按照 record key 去重(可選)
首先對(duì)這批數(shù)據(jù)創(chuàng)建索引 (HoodieKey => HoodieRecordLocation);通過索引區(qū)分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
如果是 insert 消息,如果 log file 不可建索引(默認(rèn)),會(huì)嘗試 merge 分區(qū)內(nèi)最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個(gè) FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個(gè) FileGroup + FileSlice + base file
如果是 update 消息,寫對(duì)應(yīng)的 file group + file slice,直接 append 最新的 log file(如果碰巧是當(dāng)前最小的小文件,會(huì) merge base file,生成新的 file slice)log file 大小達(dá)到閾值會(huì) roll over 一個(gè)新的
寫流程(INSERT)
Copy On Write
先對(duì) records 按照 record key 去重(可選)
不會(huì)創(chuàng)建 Index
如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file
Merge On Read
先對(duì) records 按照 record key 去重(可選)
不會(huì)創(chuàng)建 Index
如果 log file 可索引,并且有小的 FileSlice,嘗試追加或?qū)懽钚碌?log file;如果 log file 不可索引,寫一個(gè)新的 FileSlice + base file。
總結(jié)
主要是我個(gè)人收集和翻閱Hudi社區(qū)的一些資料過程中的總結(jié)。目前Hudi版本到了0.11版本。細(xì)節(jié)上可能有所差異,以社區(qū)為準(zhǔn)。
(部分內(nèi)容來源網(wǎng)絡(luò),如有侵權(quán)請(qǐng)聯(lián)系刪除)