分享嘉賓:陳玉兆 阿里巴巴 技術(shù)專家編輯整理:許友昌 中控集團(tuán)出品平臺(tái):DataFunTalk導(dǎo)讀:大家好,我是來自阿里巴巴計(jì)算平臺(tái)事業(yè)部 SQL 引擎組的玉兆,我們團(tuán)隊(duì)之前主要負(fù)責(zé) Apache Flink sql 模塊的開發(fā),過去半年我的主要工作是 Flink 與 Hudi 的集成,借此機(jī)會(huì)跟大家分享一下 Flink 與 Hudi 的集成工作,Hudi在數(shù)據(jù)湖方面的發(fā)展方向。今天的介紹包括以下幾大方面內(nèi)容:
數(shù)倉到數(shù)據(jù)湖
數(shù)據(jù)庫入倉湖
HUDI 核心
Flink HUDI Inc ETL
01數(shù)倉到數(shù)據(jù)湖
圖一 從數(shù)倉到數(shù)據(jù)湖的發(fā)展
1. 發(fā)展歷史
近兩年數(shù)據(jù)湖是一個(gè)比較火的技術(shù),從傳統(tǒng)的數(shù)倉到數(shù)據(jù)湖,在過去 5 年里架構(gòu)演變得非常迅速。在 2015 年之前提到數(shù)倉我們想到的都是一些非常專業(yè)的數(shù)據(jù)公司,像 Teradata、Vertica 做的類似 MPP 架構(gòu)的數(shù)據(jù)庫,它的模型基本是存儲(chǔ)與計(jì)算耦合在一起,format 是封閉的,后期的維護(hù)也處于比較封閉的狀態(tài),暴露給外界的接口也沒有那么豐富。2015年到2018年隨著云廠商的興起,像 EMR、Amazon、Redshift 等云上數(shù)倉,特色是將傳統(tǒng)的垂直架構(gòu)改成了分層的存儲(chǔ)計(jì)算分離的水平架構(gòu),盡量利用云上廉價(jià)存儲(chǔ)的優(yōu)勢,利用對象存儲(chǔ) s3、oss來降低成本,同時(shí)支持海量數(shù)據(jù)的計(jì)算能力。但是它們的 format 仍然是封閉的,會(huì)定制自己的 format 來做一些深度的優(yōu)化,下游查詢引擎也比較單一。從 2018 年開始到目前,伴隨著云服務(wù)的逐漸流行,數(shù)據(jù)湖技術(shù)漸漸興起。數(shù)據(jù)湖技術(shù)目前主要有 Hudi、Iceberg、Dalta Lake。
2. 為什么需要數(shù)據(jù)湖?
數(shù)據(jù)湖相比原有的數(shù)倉更加靈活,它并不是一個(gè) server,而是一個(gè)類似 table format 的概念。它定義了 table 的一些規(guī)范以及 format 的操作規(guī)范,可以操作云服務(wù)上底層的對象存儲(chǔ),所以可以和云服務(wù)很好地結(jié)合起來,下游對接的查詢引擎也非常豐富,如 presto, sparkSQL, hive等。同時(shí)它的 format 本身也是非常開放的,像列式存儲(chǔ)有 orc, parquet,行式存儲(chǔ)有 avro 這些標(biāo)準(zhǔn)的數(shù)據(jù)格式,為下游生態(tài)的對接提供了豐富的可能性。這樣以一種 table format 的形式暴露給下游,不管是運(yùn)維還是開發(fā),基本都是透明的,所以對于自建集群與開源生態(tài)來說,數(shù)據(jù)湖是很受歡迎的一種形式。
在云服務(wù)上去解決傳統(tǒng)數(shù)倉處理的業(yè)務(wù)問題,那么在數(shù)據(jù)湖上也必須要具備事務(wù)、upsert 等能力。推動(dòng)架構(gòu)的演變的是我們希望把數(shù)倉上的操作原語能夠在數(shù)據(jù)湖上支持起來,這樣湖倉一體的架構(gòu)才能支持后續(xù)業(yè)務(wù)的發(fā)展。所以數(shù)據(jù)湖需要解決的核心問題,第一是事務(wù),第二是 upsert 能力。綜合這兩塊,目前 hudi 在目前的數(shù)據(jù)湖框架里是做的最成熟的,提供的事務(wù)模型是快照級別,初步實(shí)現(xiàn)了海量數(shù)據(jù) upsert 以及事務(wù)的管理能力。
02數(shù)據(jù)庫入倉/湖
大數(shù)據(jù)領(lǐng)域的數(shù)據(jù)類型主要分兩塊,第一塊是沒有 upsert 的 append 日志流,主要記錄業(yè)務(wù)上的一些 record 或者從 log server 上發(fā)過來的數(shù)據(jù)。第二塊是業(yè)務(wù)庫發(fā)過來的數(shù)據(jù),如交易訂單,這種數(shù)據(jù)有變更有狀態(tài)。在沒有流式框架入湖之前,我們通常會(huì)使用離線批量調(diào)度的架構(gòu),每天定時(shí)取數(shù)據(jù)。比如使用的是 mysql ,就會(huì)定時(shí)去 mysql 全量地拉一次數(shù)據(jù),做一個(gè)小時(shí)級別的隊(duì)列,每個(gè)小時(shí)通過 MySQL binlog 或其他的手段采集增量數(shù)據(jù),然后做一個(gè)天級別的 merge,這樣來實(shí)現(xiàn)數(shù)據(jù)庫到數(shù)倉的同步。

圖二 數(shù)據(jù)庫數(shù)據(jù)寫入倉湖
最近興起的流批一體的架構(gòu),像debezium、canal 通過訂閱 MySQL binlog 事件的方式將增量數(shù)據(jù)近實(shí)時(shí)地導(dǎo)入數(shù)倉之中,這就要求下游數(shù)據(jù)庫本身有 upsert 語義,而 hudi 提供了這樣的能力,并且是目前做得比較成熟的,因此 hudi 可以使用這兩種途徑至少在 ODS 層進(jìn)行近實(shí)時(shí)的數(shù)據(jù)庫數(shù)據(jù)入湖:先使用debezium 采集 binlog,在使用 flink cdc connector 直接對接,flink cdc connector 具有 snapshot 再加增量消費(fèi)的能力,可以直接向下游擁有 upsert 的數(shù)據(jù)湖(如hudi)進(jìn)行同步,不需要再去接一層 kafka 就可以做到分鐘級別的入倉入湖。
不過目前的社區(qū)版的 flink cdc connector 在 snapshot 階段,會(huì) load 大量數(shù)據(jù)到內(nèi)存,所以在數(shù)據(jù)量大的情況下會(huì)有一些瓶頸。如果是歷史數(shù)據(jù)特別多,比如上億級別,一次性的歷史加增量導(dǎo)入數(shù)據(jù),目前推薦圖二下面的一種架構(gòu)。先通過 debezium 等工具把數(shù)據(jù)庫里的數(shù)據(jù)同步到 kafka,統(tǒng)一維護(hù)中間的增量加全量消息,再去接入 flink 實(shí)時(shí)的導(dǎo)入數(shù)據(jù)湖中。
這樣的好處是留給 flink 一定的 buffer 能力,如果中間出現(xiàn)一下 failover 等狀況斷點(diǎn)續(xù)接起來比較方便。圖二第一種架構(gòu)在全量同步階段如果想要做到斷點(diǎn)續(xù)接,目前需要改一些代碼,所以實(shí)施起來不是那么方便。相比之下第二種的擴(kuò)展性會(huì)更好一些。當(dāng)然在后續(xù)的商業(yè)版本中,flink cdc connector 會(huì)得到改進(jìn),像目前的單并發(fā)問題、斷點(diǎn)續(xù)接問題,在商業(yè)版本都會(huì)解決。
通過這樣的兩條鏈路我們可以將數(shù)據(jù)庫的快照一分鐘級別同步到數(shù)倉,在下游 hudi 還可以向 hive 的 metastore 中同步一份 meta,這樣我們就可以在 hive 中查詢到比如 5 分鐘、10分鐘新鮮度的數(shù)據(jù)。當(dāng)然也可以接一些 OLAP 引擎如 presto。
03HUDI 核心
1. Timeline
Hudi 是一個(gè) table format,雖然它是一種沒有 server 的數(shù)據(jù)格式,但內(nèi)部是有狀態(tài)的。它的第一個(gè)狀態(tài)就是 Timeline,我們在 Hudi 數(shù)據(jù)湖上所做的所有的動(dòng)作,比如寫入compaction、clean、rollback等,都有一個(gè)唯一的事件時(shí)間戳,hudi 把這樣的時(shí)間戳抽象為 Instant 代表某個(gè)時(shí)間上的一個(gè)動(dòng)作,一次行為的記錄,一個(gè) instant 包括三個(gè)部分:
Action:動(dòng)作的類型 commit, delta_commit, clean, compaction, rollback, savepoint;
Time:instant的事件時(shí)間,一個(gè)事件時(shí)間唯一標(biāo)識(shí)了一個(gè)instant;
State:每個(gè)action的狀態(tài),包括requested, inflight, completed。

圖三 Hudi Timeline
Action 代表動(dòng)作的類型,包括:
commit (copy on write 模式下寫入產(chǎn)生的動(dòng)作);
delta_commit(merge on read 模式下寫入產(chǎn)生的動(dòng)作);
clean(對歷史 commit 數(shù)據(jù)的定期清理)。
每一個(gè)最新的快照都記住了當(dāng)前所有數(shù)據(jù),因此沒有必要記錄所有commit ,這樣一方面可以減少存儲(chǔ)上小文件的壓力,另一方面對 hudi 本身的視圖,比如對文件的 scan 也是一種優(yōu)化;compaction 是 merge on read 模式下面的一個(gè)優(yōu)化動(dòng)作,寫入的 log 文件在查詢時(shí)并不是那么高效,compaction 是一個(gè)定期觸發(fā)的策略,會(huì)把 avro 格式的 log 文件定期的壓縮成 parquet 格式;rollback 是對寫入動(dòng)作失敗產(chǎn)生的臟文件進(jìn)行清理,是對這些文件做一次回滾動(dòng)作,即在下一個(gè)寫入動(dòng)作執(zhí)行之前先執(zhí)行一次 rollback,把上一次產(chǎn)生的臟文件清理掉;savepoint 是做一些歷史快照,一旦 transaction 做了 savepoint 操作,它就會(huì)永遠(yuǎn)的記錄在數(shù)據(jù)湖當(dāng)中。clean 動(dòng)作會(huì)忽略掉這些 savepoint。
Time 代表 instant 的事件時(shí)間,一個(gè)事件時(shí)間唯一標(biāo)識(shí)了一個(gè)instant。
State 代表 action 的狀態(tài),每個(gè) action 有三種狀態(tài),requested 代表向數(shù)據(jù)庫發(fā)起 action 請求,action 真正執(zhí)行時(shí)的狀態(tài)是 inflight;action 已經(jīng)完成的狀態(tài)是 completed,標(biāo)志著這個(gè)事務(wù)已經(jīng)成功的結(jié)束,對于寫入動(dòng)作來說 completed 狀態(tài)就意味著這份數(shù)據(jù)擁有一個(gè)完整的事務(wù),就可以對下游的 reader 暴露了,即對下游 reader 可見。
通過唯一的 instant time 串聯(lián)起來的 Timeline 視圖,hudi 自己的 reader 可以暴露一個(gè)統(tǒng)一、連貫的視圖,對文件、meta 的一些管理以及對下游可見性的維護(hù)也比較便捷。比如可以靈活的選擇當(dāng)前時(shí)間線上應(yīng)該暴露哪些視圖,是最新的快照或者是歷史某個(gè)snapshot,或者 merge on read 模式下只暴露 parquet 等。
2. FileGrouping
通常我們的 writer 在寫的過程中可能會(huì)同時(shí)寫多個(gè)文件,這樣就可能產(chǎn)生很多小文件,如果使用 hdfs 的話會(huì)造成 NameNode 的壓力過大,下游 reader 的讀取效率也不高。hudi 于是在寫入就做了一些抽象,將一個(gè) partition 下的文件按照邏輯上的組織關(guān)系,組成了多個(gè) file group。file goup 在 hudi 類似于 hive 中 bucket 的概念,切換一個(gè) file group 的依據(jù)是希望這個(gè) file group 不會(huì)太小也不會(huì)太大,parquet 一般希望是 100 多兆,log 格式的可能會(huì)大一些,可能會(huì)配置 512MB或 1G。
hudi 劃分 file group 的方式是通過文件名中的一段 UUID,一個(gè) UUID 標(biāo)志一個(gè) file group,例如一個(gè) parquet 文件與 3 個(gè) log 文件有相同的一段 UUID 的話,則屬于同一個(gè) file group。file group 中不同時(shí)間的提交會(huì)生成不同版本的文件,為了減少小文件的問題,新版本的提交可能會(huì)對之前的文件進(jìn)行合并操作。
例如在merge on read 模式下,新版本的 parquet 文件是舊版本的 parquet 文件與新增 log 文件的merge,直到 parquet 文件的大小達(dá)到設(shè)置的閾值,才會(huì)切換到新的 parquet 文件去寫。log 的策略要靈活一些,因?yàn)橄?avro 這種存儲(chǔ)是支持 append 操作的,效率會(huì)高一些。hudi 在寫入的過程中會(huì)實(shí)時(shí)監(jiān)控當(dāng)前 partition 中有多少 file group,以及每個(gè) file group 的大小是多少,對于較小的 file group,insert 數(shù)據(jù)可以繼續(xù)向里面追加,這樣的話都會(huì)寫到我們期望的大小,這在一定程度上緩解了小文件的問題。
劃分 file group 的核心邏輯就是其中新版本的 file slice 的大小,超過了設(shè)定的閾值就會(huì)劃分新的 file group。一個(gè)新的 file slice 對應(yīng) file group 中的一個(gè)新的版本,如果你想要讀數(shù)據(jù)湖里最新的 snapshot 數(shù)據(jù),hudi 會(huì)去讀最新的 file slice。
3. Copy On Write
copy on write 模式會(huì)一直寫列式的 parquet 文件,新數(shù)據(jù)過來會(huì)被緩存在內(nèi)存中,構(gòu)建一個(gè)內(nèi)存的索引,新的數(shù)據(jù)會(huì)和老的 parquet 數(shù)據(jù)不斷的 merge,每次 merge 會(huì)形成一個(gè)新的 parquet 文件,一個(gè) parquet 文件對應(yīng)一個(gè) file slice,一個(gè) file slice 對應(yīng)一個(gè)新的數(shù)據(jù)版本。在hudi 中,你需要定義一個(gè) primary key 和一個(gè) precombine key,合并的邏輯就是根據(jù)primary key 和 precombine key(代表數(shù)據(jù)的版本)在不同的 record 之間做一個(gè)比較,默認(rèn)的策略是 replace策略,新版本的 record 會(huì)替換老版本的 record,我們也可以定義自己的 precombine 策略而不使用 replace 策略。

圖四 copy on write 模式
新增數(shù)據(jù)分為兩部分,一個(gè)是對老數(shù)據(jù)的更新,就是 upsert 數(shù)據(jù),另一部分是 insert 數(shù)據(jù)。對于upsert 數(shù)據(jù)寫到哪個(gè)文件,哪個(gè)位置是固定的,因?yàn)樵?hudi 中一個(gè) file group 維護(hù)了一個(gè) primary key 所有版本的記錄,這是為了做高效的 upsert 操作,也就是一個(gè)主鍵的所有版本都會(huì) merge 到一個(gè) file group 里最新的 file slice 中,例如 id = 1 的 record,它后續(xù)所有的版本都會(huì)寫入到這一個(gè) file group 中,因?yàn)檫@是一個(gè) merge 的操作,即使后續(xù)有多次的變更或刪除,也不會(huì)改變最新的 file slice 的大小,在最新的 file slice 中一個(gè) primary key 只會(huì)保留一條最新的 record。
對于 insert 數(shù)據(jù),寫到哪里其實(shí)無所謂,因?yàn)椴黄茐恼Z義。hudi 寫insert數(shù)據(jù)的依據(jù)是小文件策略,比如圖中的 file group 還沒寫到 100 兆,這時(shí)就會(huì)繼續(xù)往里寫,一直把它寫到我希望的大小。這就是 hudi 核心的 upsert 與 insert 數(shù)據(jù)寫入模型。
4. Merge On Read

圖五 merge on read 模式
merge on read 模式在 upsert 時(shí)首先會(huì)根據(jù) primary key 找 file group,把數(shù)據(jù)寫入 file group中,與 copy on write 不同的是,在 upsert 時(shí)不需要實(shí)時(shí) merge 的過程,它會(huì)把增量的數(shù)據(jù)追加到最新的 file slice 的 log 中,在 hdfs 上可以直接在原文件上追加,這樣它的寫入效率會(huì)比 Copy On Write 高效得多。rollover 的邏輯則與 copy on write 類似,比如 log 寫到了設(shè)定的大小,就會(huì)切換一個(gè)新的 file group,但在此之前的 upsert 和 insert 數(shù)據(jù)都會(huì) append 到 log 文件中。
因?yàn)?merge on read 寫入的是 avro 格式,在查詢上就不會(huì)像 parquet 這么高效,后續(xù)需要依賴自動(dòng)的 compaction 或離線定時(shí)的 compaction 任務(wù),把 log 文件進(jìn)一步壓縮成 parquet 來實(shí)現(xiàn)高效查詢。與 copy on write 相比,merge on read 的寫入吞吐較高,但是查詢效率較低。根據(jù)社區(qū)的反饋,在 flink 實(shí)時(shí)寫時(shí),由于會(huì)自動(dòng)啟動(dòng)一個(gè) compaction 的 pipeline,會(huì)比較吃內(nèi)存,所以 merge on read 需要配置更多的內(nèi)存。最近社區(qū)也在開發(fā)這種離線的 compaction,以獨(dú)立的 job 來運(yùn)行,這樣就不會(huì)干擾寫入流程。
5. Flink Write Pipeline( copy on write )

圖六 flink hudi copy on write 模式 pipeline
上圖是社區(qū)版 flink 寫操作的一個(gè) pipeline 模型,對應(yīng)于 copy on write 模式的模型。數(shù)據(jù)源對接的是 flink sql 的 raw data,使用 raw data 的好處是:上游我們可以選擇任意 table 支持的 format,比如 kafka 的 format。pipeline 會(huì)將上游的數(shù)據(jù)轉(zhuǎn)為 raw data,再轉(zhuǎn)成我們的 hudi record,record 會(huì)根據(jù)我們設(shè)置的 primary key 進(jìn)行 shuffle,shuffle 的目的是為了讓下游的一個(gè) task 只看到一個(gè) record 的所有版本,方便把一個(gè) record 的所有版本寫到一個(gè) file group 里面。bucket assign 為接收到的 record 分配 file group id,即 uuid,這樣 partition 加 uuid 的唯一位置,確定了一個(gè) record 在文件系統(tǒng)上的位置。分配完 file group id 之后,會(huì)根據(jù) file group id 進(jìn)行 shuffle,目的是讓一個(gè) write function 只寫一個(gè)文件,不會(huì)出現(xiàn)一個(gè)文件被多個(gè) task 去寫導(dǎo)致亂序的情況。write function 會(huì)有 buffer 的策略,數(shù)據(jù) flash 的策略有三種,第一種是單個(gè) buffer 的大小到達(dá)閾值(默認(rèn) 64MB),數(shù)據(jù)會(huì)被 flash 到磁盤;第二種是總 buffer 的大小到達(dá)閾值,當(dāng)前最大的 buffer 數(shù)據(jù)會(huì)被 flash 到磁盤;第三種策略是 checkpoint,一次 checkpoint 觸發(fā)一次 flash。
最后有一些運(yùn)維動(dòng)作,比如清理動(dòng)作,它們和 Stream Write 之間沒有數(shù)據(jù)的傳輸,是通過 event 來進(jìn)行通信的。
當(dāng)前版本的 Stream Write task 的事務(wù)提交是依賴于 coordinator 來實(shí)現(xiàn)的,它們之間主要通過 RPC 來完成一些 metadata 的傳輸,這些 metadata 主要是描述了 Stream Write task 寫入了哪個(gè)文件,寫了多少條記錄,coordinator 將 metadata 收集起來最終一起提交,提交成功則表示這次寫入成功,下游即可以看到最新的記錄。同時(shí) coordinator 還會(huì)有一些 meta 的同步動(dòng)作,比如向 hive 同步 partition、字段變更的一些信息。
04Flink HUDI Inc ETL

圖七 hudi flink etl
為了構(gòu)建經(jīng)典的數(shù)倉模型,傳統(tǒng)的方式是通過調(diào)度系統(tǒng)按照某種時(shí)間策略構(gòu)建一個(gè)定期的 pipeline 任務(wù),依據(jù) pipeline 之間的依賴關(guān)系規(guī)定觸發(fā)機(jī)制,整體的維護(hù)十分復(fù)雜。
hudi 因?yàn)榫哂?upsert 的能力,因此我們可以利用 debezium 等工具,通過 flink CDC 加 kafka 將數(shù)據(jù)庫數(shù)據(jù)近實(shí)時(shí)的同步到 ODS 層。如果hudi 可以繼續(xù)將上游數(shù)據(jù)的變更數(shù)據(jù)流傳到下游,借助 flink CDC 的能力下游可以繼續(xù)消費(fèi)這種增量數(shù)據(jù),然后在原有狀態(tài)的基礎(chǔ)上繼續(xù)做增量計(jì)算,這就構(gòu)建了一個(gè)分鐘級別近實(shí)時(shí)的增量數(shù)倉模型,因此我們對 hudi table format 進(jìn)行了改動(dòng)。
對于 copy on write 模式,由于每次更新都是 merge 操作,所以不需要修改,而 merge on read 模式由于 log 是 append 寫入的,我們會(huì)在 log format 下面增加一個(gè) change flags,包括每次操作的 update before、update after、delete 等這些 change flags,上游數(shù)據(jù)中只要攜帶了 change flags,在下游會(huì)一直傳播下去。flink 利用 change flags 可以將上游的操作在下游全部還原出來,成為當(dāng)前最新視圖。這樣在 ADS 層就可以直接對接如 presto、es、mysql 等。該實(shí)現(xiàn)在 jira 的 id為 HUDI-1771,預(yù)計(jì)在 hudi 0.9 版本發(fā)布,為了應(yīng)對更大的數(shù)據(jù)量和更復(fù)雜的模型,還會(huì)對 hudi 進(jìn)行進(jìn)一步的優(yōu)化。
05答疑
Q:可以對比一下 hudi 與 iceberg、delta lake 的區(qū)別嗎?
A:我的理解是,從社區(qū)的角度來看,是希望把 hudi 作為一個(gè)數(shù)據(jù)湖平臺(tái),而不僅僅是 table format,為此構(gòu)建了數(shù)據(jù)湖周邊的很多工具,比如數(shù)據(jù)的自動(dòng)清理、數(shù)據(jù)的二次聚合、與其他生態(tài)的對接,比如 hive meta 的同步。從 table format 上來看,目前 hudi upsert 能力是最成熟的,更新是最高效的,并且對小文件友好。但是它的寫入 pipeline 操作相對其它數(shù)據(jù)湖方案會(huì)比較重一些,在吞吐上有一些劣勢。
Q:flink hudi 是比較推薦 copy on write 模式嗎?
A:目前從很多公司的反饋來看,是比較推薦 copy on write 模式的,因?yàn)樗膬?nèi)存管理比較直觀,只在寫入那一步 buffer 的內(nèi)存以及 merge 時(shí)的 merge map 使用的內(nèi)存,因此我們在啟動(dòng)作業(yè)時(shí)配置內(nèi)存是十分明確的,job 也更加穩(wěn)定,如果對吞吐要求不高,推薦使用 copy on write 模式。merge on read 模式我們后面也會(huì)做一個(gè)離線的 compaction 方案,也會(huì)提升它的穩(wěn)定性。
Q:flink hudi 0.9 版本穩(wěn)定了嗎?
A:copy on write 現(xiàn)在已經(jīng)很穩(wěn)定了,merge on read 模式我們把 compaction 剝離出來后也會(huì)很穩(wěn)定。
Q:PrestoDB 支持 MOR 的快照讀了嗎?
A:最新版的 prestoDB 對兩種模式都支持的。
(部分內(nèi)容來源網(wǎng)絡(luò),如有侵權(quán)請聯(lián)系刪除)