日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數(shù)據(jù)治理平臺(tái)

睿治作為國(guó)內(nèi)功能最全的數(shù)據(jù)治理產(chǎn)品之一,入選IDC企業(yè)數(shù)據(jù)治理實(shí)施部署指南。同時(shí),在IDC發(fā)布的《中國(guó)數(shù)據(jù)治理市場(chǎng)份額》報(bào)告中,連續(xù)四年蟬聯(lián)數(shù)據(jù)治理解決方案市場(chǎng)份額第一。

B站增量數(shù)據(jù)湖探索與實(shí)踐

時(shí)間:2022-12-23來(lái)源:梔子瀏覽數(shù):277

1. 背景

眾所周知,越實(shí)時(shí)的數(shù)據(jù)越有價(jià)值。直播、推薦、審核等領(lǐng)域中有越來(lái)越多的場(chǎng)景需要近實(shí)時(shí)的數(shù)據(jù)來(lái)進(jìn)行數(shù)據(jù)分析。我們?cè)谔剿骱蛯?shí)踐增量數(shù)據(jù)湖的過程中遇到許多痛點(diǎn),如時(shí)效性、數(shù)據(jù)集成同步和批流一體的存儲(chǔ)介質(zhì)不統(tǒng)一的問題。本文將介紹我們針對(duì)這些痛點(diǎn)所進(jìn)行的思考與實(shí)踐方案。

1.1 時(shí)效性痛點(diǎn)

傳統(tǒng)數(shù)倉(cāng)以小時(shí)/天級(jí)分區(qū),數(shù)據(jù)完整才可查。然而,一些用戶并不需要數(shù)據(jù)完整,只需要最近的數(shù)據(jù)做一些趨勢(shì)分析。因此,現(xiàn)狀無(wú)法滿足用戶越來(lái)越強(qiáng)的數(shù)據(jù)時(shí)效性需求。傳統(tǒng)數(shù)倉(cāng)ETL上一個(gè)任務(wù)完成后,才能開始下一個(gè)任務(wù)。即使是小時(shí)分區(qū),層級(jí)處理越多,數(shù)據(jù)最終產(chǎn)出時(shí)效性越低。

1.2 數(shù)據(jù)集成同步痛點(diǎn)

業(yè)界成熟的方案是通過阿里巴巴 datax系統(tǒng)同步mysql 從庫(kù)數(shù)據(jù)到hive表。定期全量或者增量進(jìn)行同步。需要單獨(dú)設(shè)置從庫(kù)以應(yīng)對(duì)數(shù)據(jù)同步時(shí)對(duì)db請(qǐng)求的壓力。此外,db從庫(kù)成本高,不可忽視。

增量同步面臨如何解決歷史分區(qū)數(shù)據(jù)修改問題。如果一條數(shù)據(jù)被更新了,那么僅通過增量同步,可能會(huì)在兩個(gè)分區(qū)里分別存在更新前和更新后的數(shù)據(jù)。用戶需要自行合并更新數(shù)據(jù)后,才能使用。

1.3 批流一體的存儲(chǔ)介質(zhì)不統(tǒng)一

業(yè)務(wù)下游即有時(shí)效性要求場(chǎng)景,也有離線ETL場(chǎng)景。Flink sql可以統(tǒng)一流批計(jì)算過程,但無(wú)統(tǒng)一存儲(chǔ),仍需要將實(shí)時(shí)、離線數(shù)據(jù)分開存儲(chǔ)在kafka、hdfs。

2. 思考與方案

增量化數(shù)據(jù)湖建設(shè)選型采用Flink + Hudi。我們需要數(shù)據(jù)湖的ACID 事務(wù)保障、流批讀寫操作的支持。并且,相對(duì)于 Iceberg 的設(shè)計(jì),Hudi 對(duì) Upsert 的支持設(shè)計(jì)之初的主要支持方案在upsert能力、小文件合并能力上有明顯優(yōu)勢(shì)。Append性能在版本迭代中逐漸完善。活躍的社區(qū)在陸續(xù)迭代增量消費(fèi)、流式消費(fèi)能力。綜合對(duì)比,最終選擇基于Hudi搭建增量數(shù)據(jù)湖生態(tài)。

3. HUDI內(nèi)核優(yōu)化

Hudi cow模式每次寫入都需要進(jìn)行合并,有io放大問題。Hudi 0.8起支持了mor模式,僅更新部分需要更新的數(shù)據(jù)文件。

但這會(huì)帶來(lái)數(shù)據(jù)質(zhì)量問題,主要是一些極端情況下的數(shù)據(jù)丟失、數(shù)據(jù)重復(fù)、數(shù)據(jù)延遲問題。我們?cè)趯?shí)際測(cè)試生產(chǎn)過程中發(fā)現(xiàn)了一些問題,嘗試解決并反饋給社區(qū)。

3.1 底層數(shù)據(jù)可靠性優(yōu)化

Hudi compaction大概代碼結(jié)構(gòu)

StreamWriteOperatorCoordinator 算子中的notifyCheckpointComplete邏輯:

提交當(dāng)前事務(wù)并將元數(shù)據(jù)deltacommit寫入hdfs

調(diào)用調(diào)度生成compant 執(zhí)行計(jì)劃并將compact.requested元數(shù)據(jù)寫入hdfs

開啟下一次事務(wù)

CompactionPlanOperator 算子中的notifyCheckpointComplete邏輯:

在0.9版本中通過sheduleCompation方法獲取最后一個(gè)compact執(zhí)行計(jì)劃

將compact.requested執(zhí)行計(jì)劃轉(zhuǎn)化為CompactionPlanEvent下發(fā)下游

CompactFuncation

processElement 最后執(zhí)行doCompaction 完成合并

3.1.1 log跳過壞塊bug,造成數(shù)據(jù)丟失

hudi log文件由一個(gè)個(gè)block 塊組成,一個(gè)log中可能包含多個(gè)deltacommit 寫入的塊信息,每個(gè)塊通過MAGIC進(jìn)行分割,每個(gè)塊包含的內(nèi)容如上圖所示。其中Block Header 會(huì)記錄每個(gè)塊的deltacommit ?instantTime,在compact 的過程中會(huì)掃描讀取需要合并的塊。HoodieLogFileReader是用來(lái)讀取log文件,會(huì)將log 文件轉(zhuǎn)化為一個(gè)個(gè)塊對(duì)象。首先會(huì)讀取MAGIC分隔,如果存在會(huì)讀下一位置塊的總大小,判斷是否超過當(dāng)前文件總大小來(lái)界定是否為壞塊,檢測(cè)到壞塊會(huì)跳過壞塊內(nèi)容并創(chuàng)建壞塊對(duì)象HoodieCorruptBlock,跳過的壞塊是從當(dāng)前block total size 位置以后檢索MAGIC(#HUDI#),每次讀取1兆大小,一直讀取到下一個(gè)MAGIC分割為止,找到MAGIC后跳到當(dāng)前MAGIC位置,后面可以接著在讀完整的塊信息。

上述提到壞塊的位置是從block total size 位置以后檢索的,當(dāng)文件在極端情況下只寫入MAGIC內(nèi)容還沒有寫入內(nèi)容任務(wù)會(huì)出現(xiàn)異常,此時(shí)會(huì)寫入連續(xù)MAGIC,在讀取的過程中會(huì)把MAGIC當(dāng)做Block Total size 讀取,在檢索的過程中會(huì)將下一個(gè)正常的塊給跳過,當(dāng)成壞塊返回。在合并的過程中這部分塊數(shù)據(jù)就丟失了。這里跳過的壞塊位置應(yīng)該從MAGIC之后開始檢索而不是block total size之后,讀取時(shí)出現(xiàn)連續(xù)MAGIC就不會(huì)跳過正常塊。

相關(guān)pr:https://github.com/apache/hudi/pull/4015

3.1.2 compaction和回滾的log合并,導(dǎo)致數(shù)據(jù)重復(fù)

在HoodieMergedLogRecordScanner 會(huì)掃描有效的塊信息,極端情況下log 文件寫入的塊寫入是完整的,但是deltacommit元數(shù)據(jù)提交前任務(wù)失敗,并且log中的塊信息也比deltacommit元數(shù)據(jù)的instant Time 還小,此時(shí)會(huì)被認(rèn)為是有效的塊被合并。因?yàn)閐eltacommit元數(shù)據(jù)沒有寫入成功,check point 重新啟動(dòng)將之前的數(shù)據(jù)進(jìn)行回放。回放時(shí)被分配的flieId可能不同,當(dāng)這部分?jǐn)?shù)據(jù)被合并會(huì)發(fā)現(xiàn)數(shù)據(jù)重復(fù)并且fileId不一樣現(xiàn)象。

在掃描log是否為效塊時(shí),如果當(dāng)前時(shí)間線比現(xiàn)有沒歸檔的時(shí)間線的元數(shù)據(jù)小的時(shí)候,加入已經(jīng)歸檔的時(shí)間線,進(jìn)行協(xié)同判斷是否為有效塊,不再只將未歸檔的時(shí)間線作為篩選條件。

相關(guān)pr:https://github.com/apache/hudi/pull/5052

3.1.3 數(shù)據(jù)非連續(xù)場(chǎng)景,最后一次數(shù)據(jù)不會(huì)觸發(fā)compaction,導(dǎo)致RO表數(shù)據(jù)延遲

上述流程中調(diào)度compaction 執(zhí)行計(jì)劃必須在上一次事務(wù)提交成功后才會(huì)觸發(fā),如果一段數(shù)據(jù)都沒有數(shù)據(jù)寫入,compact 即使?jié)M足時(shí)間條件或者commit個(gè)數(shù)條件都不會(huì)形成compact 執(zhí)行計(jì)劃。上游沒有執(zhí)行計(jì)劃下游Compaction 算子不會(huì)觸發(fā)合并,用戶在查詢r(jià)o表,部分?jǐn)?shù)據(jù)會(huì)一直查詢不到。

去掉必須commitInstant 提交成功才調(diào)度生成compact執(zhí)行計(jì)劃的綁定,每次checkpoint 后都檢查是否滿足條件觸發(fā)并生成Compaction 執(zhí)行計(jì)劃,避免最新數(shù)據(jù)無(wú)法被合并。當(dāng)沒有新數(shù)據(jù)寫入的場(chǎng)景下元數(shù)據(jù)只有deltacommit.requested和deltacommit.inflight元數(shù)據(jù)不能直接用當(dāng)前時(shí)間為compact instantTime。上游可能隨時(shí)寫入數(shù)據(jù)避免合并沒有寫完的數(shù)據(jù),在生成compact執(zhí)行計(jì)劃也會(huì)檢查元數(shù)據(jù)的deltacommit 和compact 元數(shù)據(jù)避免出現(xiàn)合并沒寫完的數(shù)據(jù)。此時(shí)compact instantTime可以為最近沒完成的deltacommit instantTime 和最新完成compact 之間的時(shí)間。這樣生成compaction執(zhí)行計(jì)劃元數(shù)據(jù)下游完成compact 合并就不會(huì)延遲了。

相關(guān)pr:https://github.com/apache/hudi/pull/3928

3.1.4 CompactionPlanOperator每次只取最后一次compaction,導(dǎo)致數(shù)據(jù)延遲

在StreamWriteOperatorCoordinator算子notifyCheckpointComplete方法中生產(chǎn)compact執(zhí)行計(jì)劃,在CompactionPlanOperator 算子notifyCheckpointComplete方法中消費(fèi)執(zhí)行計(jì)劃下發(fā)操作合并數(shù)據(jù)。在極端情況下如果設(shè)置compact策略為一個(gè)commit就觸發(fā)compact合并操作,這樣在兩個(gè)算子notifyCheckpointComplete中會(huì)不斷的生產(chǎn)和消費(fèi)compact 執(zhí)行計(jì)劃,一旦消費(fèi)一端的compact出現(xiàn)異常任務(wù)失敗,這樣會(huì)堆積很多的compact.requested執(zhí)行計(jì)劃,而每次CompactionPlanOperator只會(huì)獲取一個(gè)執(zhí)行計(jì)劃元數(shù)據(jù),這樣數(shù)據(jù)會(huì)產(chǎn)生堆積和延遲,總有一部分執(zhí)行計(jì)劃無(wú)法執(zhí)行。在0.8版本取最后一次的compaction執(zhí)行計(jì)劃,這樣會(huì)新的commit一直在合并,老的數(shù)據(jù)一直無(wú)法合并造成丟失數(shù)據(jù)的假像。后續(xù)社區(qū)改為獲取最新的一次,如果下游出現(xiàn)某種原因的失敗導(dǎo)致compact執(zhí)行計(jì)劃擠壓,數(shù)據(jù)延遲會(huì)越來(lái)越大。

CompactionPlanOperator獲取所有的compact 執(zhí)行計(jì)劃轉(zhuǎn)化為CompactionPlanEvent下發(fā)下游,將CompactFunction 方法改為默認(rèn)同步模式。異步模式中底層使用newSingleThreadExecutor線程池避免在同步的過程隊(duì)列持有大量對(duì)象。

相關(guān)pr:https://issues.apache.org/jira/browse/HUDI-3618

3.1.5 log中沒有符合時(shí)間線塊,parquet 文件重新生成,之前記錄丟失

compact 執(zhí)行計(jì)劃包含多個(gè)HoodieCompactionOperation,每個(gè)HoodieCompactionOperation包含log文件和parquet 文件,但可能也只有l(wèi)ogFlies 說(shuō)明只有新增數(shù)據(jù)。在做compact 合并時(shí)會(huì)獲取HoodieCompactionOperation中的log文件和parquet 文件信息,構(gòu)建HoodieMergedLogRecordScanner將log文件中符合合并要求的塊數(shù)據(jù)刷入ExternalSpillableMap中,在merge 階段根據(jù)parquet 數(shù)據(jù)和ExternalSpillableMap的數(shù)據(jù)比較合并形成新的parquet文件,新文件的instantTime為compact 的instantTime。

在HoodieMergedLogRecordScanner中掃描log file文件中,需要符合時(shí)間線要求的log 塊信息,如果沒符合要求的塊被掃描到,后續(xù)的merge操作不會(huì)運(yùn)行,新的parquet 文件版本也沒有。下一次compact 的執(zhí)行計(jì)劃獲取的FileSlice只會(huì)有l(wèi)og 文件而沒parquet文件,在執(zhí)行compact runMerge 會(huì)當(dāng)新增操作寫入,沒有和之前合并parquet數(shù)據(jù)合并之前的數(shù)據(jù)全部丟失,新parquet 文件中只有下一次log 產(chǎn)生的數(shù)據(jù),導(dǎo)致數(shù)據(jù)丟失。

不論掃描是否符合要求,塊信息都強(qiáng)制寫入新的parquet文件,這樣下一次compact合并執(zhí)行計(jì)劃中獲取FileSlice都會(huì)有l(wèi)og 文件和parquet文件,可以正常進(jìn)行handleUpdate 合并,保證數(shù)據(jù)不丟失。

3.2 Table Service優(yōu)化

3.2.1 獨(dú)立的table service? -- compaction外掛

背景:

目前社區(qū)提供了多種表服務(wù)方案,但實(shí)際生產(chǎn)應(yīng)用中,尤其是平臺(tái)化過程中,會(huì)面臨多種問題。

原方案分析:

為了對(duì)比各方案的特點(diǎn),我們基于hudi v0.9提供的表服務(wù)進(jìn)行分析。

首先,我們將表服務(wù)拆解為調(diào)度+執(zhí)行2階段的過程。

根據(jù)調(diào)度后是否立刻執(zhí)行,可以將調(diào)度分為inline調(diào)度(即調(diào)度后立馬執(zhí)行)與async調(diào)度(即調(diào)度后只生成plan,不立馬執(zhí)行)兩種,

根據(jù)執(zhí)行調(diào)用的方式,可分為sync執(zhí)行(同步執(zhí)行)、async執(zhí)行(通過相應(yīng)service異步執(zhí)行,如AsyncCleanService)兩種,

此外,相對(duì)于寫入job,表服務(wù)作為一種數(shù)據(jù)編排job,本質(zhì)上是區(qū)別于寫入job的,根據(jù)這些服務(wù)是否內(nèi)嵌在寫入job中,我們將其稱為內(nèi)嵌模式和standalone模式兩種模式。

以下,對(duì)社區(qū)提供的Flink on Hudi的多種表服務(wù)方案進(jìn)行分析:

方案一 內(nèi)嵌同步模式:在ingestion作業(yè)中,inline schedule + sync compact/cluster & inline schedule clean + async clean

該模式的問題很明顯:每次寫入后,立即通過內(nèi)聯(lián)調(diào)度并執(zhí)行compact作業(yè),完成后才開始新的instant,在流程上即直接影響數(shù)據(jù)寫入的性能,在實(shí)際生產(chǎn)中不會(huì)采用。

方案二 ?內(nèi)嵌異步模式:在ingestion作業(yè)中, async schedule + sync/async compact/cluster && inline schedule + sync/async clean

不同于方案一,該模式將資源消耗較大的compact/cluster等操作異步化至專門算子處理,ingestion流程僅保留了輕量的調(diào)度操作,對(duì)clean操作增加了同步/異步選擇。

但依舊存在缺點(diǎn),ingestion作業(yè)的流式處理,疊加上表服務(wù)的間歇性批處理,對(duì)資源消耗曲線新增造成很多沖激毛刺,甚至是很多作業(yè)oom的元兇,使得作業(yè)配置時(shí)不得不預(yù)留足夠多資源,造成高優(yōu)先資源閑時(shí)浪費(fèi)。

方案三? standalone模式:ingestion作業(yè) + compact/cluster作業(yè)組合

目前社區(qū)該方案尚不完善,其中寫入作業(yè)流程參考方案一,對(duì)compact/cluster/clean等action提供單獨(dú)的編排作業(yè),以compact為例,HoodieFlinkCompactor的流程如下:

該方案通過抽象出單獨(dú)的數(shù)據(jù)編排作業(yè),從作業(yè)級(jí)別隔離使用,克服了方案二的弊端,從平臺(tái)化的角度看,符合我們的需求。

選定方向后,就需要面對(duì)該方案目前的諸多缺陷,包括如下幾點(diǎn):

1. 在單writer模型下,編排作業(yè)的schedule模式不可用,會(huì)有的timeline一致性問題,導(dǎo)致數(shù)據(jù)丟失。

相對(duì)于ingestion作業(yè),compact/cluster job本質(zhì)上是另一個(gè)writer,多writer處于并發(fā)下,在無(wú)鎖狀態(tài)下timeline一致性是無(wú)法保證的,極端情況下會(huì)出現(xiàn)丟數(shù)據(jù)的問題,如下圖所示:

2. 寫入作業(yè)和編排作業(yè)沒有standalone模式下的協(xié)同能力。

首先,是clean action的問題,雖然hudi內(nèi)核能力已經(jīng)健全,但目前表服務(wù)層面僅暴露出inline調(diào)度+執(zhí)行的方法,導(dǎo)致無(wú)論寫入還是編排作業(yè)都會(huì)包含clean,架構(gòu)上過于混亂與不可控;

其次,編排作業(yè)CompactPlanSource與內(nèi)嵌模式寫入作業(yè)的CompactPlan是兩個(gè)不同算子,dag未保持線性,不利于不同模式的切換;

此外,還存在作業(yè)編排調(diào)度不具備接收外部策略的能力,無(wú)法進(jìn)行平臺(tái)化,集成公司智能調(diào)度、專家診斷等系統(tǒng)等問題。

優(yōu)化方案:

首先,解決timeline一致性問題,目前hudi社區(qū)已經(jīng)有occ(樂觀并發(fā)控制)運(yùn)行模式的支持,引入了分布式鎖(hive metastore、zookeeper)。但flink模塊的相關(guān)支持尚在初始階段中,我們內(nèi)部也在進(jìn)行相應(yīng)應(yīng)用測(cè)試,但發(fā)現(xiàn)距生產(chǎn)應(yīng)用尚有諸多問題需要解決。

由于調(diào)度操作本身較為輕量,本期暫時(shí)把表服務(wù)scheduler保留在寫入作業(yè)中,仍舊保持以單writer模型運(yùn)行,以規(guī)避多writer問題。

其次,針對(duì)性優(yōu)化hudi底層的表服務(wù)調(diào)度機(jī)制,將clean action也拆解為調(diào)度+執(zhí)行的的使用范式,通過inlineScheduleEnabled配置,默認(rèn)為true進(jìn)行后向兼容,在standalone模式下,inlineScheduleEnabled為false。

然后,重構(gòu)寫入作業(yè)與編排作業(yè),完善對(duì)3種運(yùn)行模式的支持。具體包括:

對(duì)寫入作業(yè)的表服務(wù)scheduler優(yōu)化,提供DynamicConfigProvider支持外部策略集成;重寫Clean算子,支持多種運(yùn)行模式的切換;

對(duì)編排作業(yè),重構(gòu)作業(yè)使作業(yè)dag與內(nèi)嵌模式線性一致;支持單instant,全批以及service常駐模式;優(yōu)化寫入與編排作業(yè)間的配置傳遞,使其達(dá)到托管任務(wù)的要求任意啟停;

重構(gòu)后的寫入與編排作業(yè)(以compact為例)如下:

3.2.2 metaStore解決分區(qū)ready問題

增量化數(shù)倉(cāng),需要支持近實(shí)時(shí)業(yè)務(wù)場(chǎng)景分鐘級(jí)數(shù)據(jù)可見性,需要在寫入數(shù)據(jù)時(shí)就創(chuàng)建hiveMetaStore分區(qū)信息。

而離線依賴建hiveMetaStore分區(qū)即數(shù)據(jù)完整的語(yǔ)義。如何解決是一個(gè)問題。

任務(wù)調(diào)度,依賴調(diào)度系統(tǒng)。在數(shù)據(jù)ready后,通知調(diào)度系統(tǒng),可以進(jìn)行下游任務(wù)調(diào)度。與建分區(qū)討論關(guān)系不大。

離線依賴hiveMetaStore問題,我們通過改造 hiveMetaStore ,賦予分區(qū)一個(gè)新的commit屬性,若數(shù)據(jù)未ready 則commit為false,分區(qū)不可見。保持原有語(yǔ)義不變。

對(duì)于adhoc來(lái)說(shuō),帶hint includeUnCommit=true標(biāo)識(shí)查詢,可在數(shù)據(jù)未完成時(shí)查詢到數(shù)據(jù)。

對(duì)于離線來(lái)說(shuō),當(dāng)分區(qū)的commit屬性被置為true,才能查到分區(qū)。滿足分區(qū)可見即數(shù)據(jù)完整的語(yǔ)義。

對(duì)于flink job來(lái)說(shuō),在數(shù)據(jù)第一次寫入時(shí),創(chuàng)建分區(qū),并賦予分區(qū)commit=false標(biāo)簽,使得adhoc可以查到最新寫入的數(shù)據(jù)。

在處理完分區(qū)數(shù)據(jù),判斷分區(qū)數(shù)據(jù)ready后,更新分區(qū)commit=true。此時(shí),數(shù)據(jù)ready,分區(qū)對(duì)離線可見,滿足“分區(qū)可見即數(shù)據(jù)完整”的語(yǔ)義。

4. 場(chǎng)景落地實(shí)踐

4.1 增量化數(shù)倉(cāng)

傳統(tǒng)數(shù)倉(cāng)TL上一個(gè)任務(wù)完成后,才能開始下一個(gè)任務(wù)。即使是小時(shí)分區(qū),層級(jí)處理越多,數(shù)據(jù)最終產(chǎn)出時(shí)效性越低。

采用增量計(jì)算方式,每次計(jì)算讀取上一次增量。這樣當(dāng)上游數(shù)據(jù)完整后,只需要額外計(jì)算最后一次增量的時(shí)間即可完成,可以提升數(shù)據(jù)完成時(shí)效性。

同時(shí)在第一批數(shù)據(jù)寫入到ods層后就可增量計(jì)算至下一層直至產(chǎn)出,數(shù)據(jù)即可見,大大提升數(shù)據(jù)可見時(shí)效性。

具體實(shí)現(xiàn)方式是:通過hudi source,flink增量消費(fèi)hudi數(shù)據(jù)。支持?jǐn)?shù)倉(cāng)跨層增量計(jì)算,如ods → dwd → ads 都使用hudi串聯(lián)。支持同其他數(shù)據(jù)源做join、groupby,最終產(chǎn)出繼續(xù)落hudi。

對(duì)于審核數(shù)據(jù)等有較高時(shí)效性訴求,可以采用此方案加速數(shù)據(jù)產(chǎn)出,提升數(shù)據(jù)可見時(shí)效性。

4.2 CDC到HUDI

4.2.1 面臨的問題

1. 原有datax同步方式 簡(jiǎn)單來(lái)說(shuō)就是 select * ,對(duì)mysql來(lái)說(shuō)是慢查詢,有阻塞業(yè)務(wù)庫(kù)風(fēng)險(xiǎn),所以需要單獨(dú)開辟mysql從庫(kù)滿足入倉(cāng)需求,有較高的mysql從庫(kù)機(jī)器成本,是降本增效的對(duì)象之一。

2. 原有同步方式不能滿足日益增加的時(shí)效性需求,僅能支持天/小時(shí)同步,無(wú)法支持到分鐘級(jí)數(shù)據(jù)可見粒度。

3. 原有同步方式落hive表,不具備update能力,如果一條記錄經(jīng)過update,則可能在兩個(gè)以mtime為時(shí)間分區(qū)都存在此數(shù)據(jù),業(yè)務(wù)使用還需要做去重,使用成本較高。

4.2.2 解決思路

通過flink cdc消費(fèi)mysql增量+全量數(shù)據(jù),分chunk進(jìn)行select,無(wú)需單獨(dú)為入倉(cāng)開辟mysql從庫(kù)。

落hudi支持update、delete,相當(dāng)于hudi表是mysql表的鏡像。

同時(shí)支持分鐘級(jí)可見,滿足業(yè)務(wù)時(shí)效性訴求。

4.2.3 整體架構(gòu)

一個(gè)db庫(kù)用一個(gè)flink cdc job進(jìn)行mysql數(shù)據(jù)同步,一張表的數(shù)據(jù)分流到一個(gè)kafka topic中,由一個(gè)flink job消費(fèi)一個(gè)kafka topic,落到hudi表中。

4.2.4 數(shù)據(jù)質(zhì)量保障 - 不丟不重

flink cdc source

簡(jiǎn)單來(lái)說(shuō)就是:全量 + 增量 通過changlog stream 方式將數(shù)據(jù)變更傳遞給下游。

全量階段:分chunk讀取, select ?* + binlog修正,無(wú)鎖的將全量數(shù)據(jù)讀出并傳遞給下游。

增量階段:偽裝成為一個(gè)從庫(kù),讀取binlog數(shù)據(jù)傳遞給下游。

flink

通過flink checkpoint機(jī)制,將處理完成的數(shù)據(jù)位點(diǎn)記錄到checkpoint中,如果后續(xù)發(fā)生異常則從checkpoint可保證數(shù)據(jù)不丟不重。

kafka

kafka client開啟ack = all ,當(dāng)所有副本都接收到數(shù)據(jù)后,才ack,保證數(shù)據(jù)不丟。

kafka server 保證replicas大于1,避免臟選舉。

這里不會(huì)開啟kafka事務(wù)(成本較高),保證at least once 即可。由下游hudi做去重。

hudi sink

hudi sink同樣基于flink checkpoint實(shí)現(xiàn)類似二階段提交方式,實(shí)現(xiàn)數(shù)據(jù)寫hudi表不丟失。

通過增加由flink cdc生成的單調(diào)遞增的“版本字段”進(jìn)行比較,單條記錄版本高的寫入,低的舍棄。同時(shí)解決去重和亂序消費(fèi)問題。

4.2.5 字段變更

mysql業(yè)務(wù)庫(kù)進(jìn)行了字段變更、新增字段怎么辦?

面臨的問題

1. 數(shù)據(jù)平臺(tái)新增字段有安全準(zhǔn)入問題,需要用戶確認(rèn),是否需要加密入倉(cāng)。

2. 字段類型變更,需要用戶確認(rèn)下游任務(wù)是否兼容。

3. hudi的column evalution能力有限,比如int轉(zhuǎn)string類型就無(wú)法支持。

解決思路

與dba約定,部分變更支持自動(dòng)審批(如新增字段、int類型轉(zhuǎn)long等)通過。并且異步通知berserker(b站大數(shù)據(jù)平臺(tái)系統(tǒng)),由berserker變更①hudi表信息,以及②更新flink job信息。

超出約定變更部分(如int轉(zhuǎn)varchar等),走人工審批,需要berserker確認(rèn)① hudi表變更完成、②寫入hudi的flink job變更完成后,再放行mysql ddl變更工單。

我們改造了 Flink cdc job,可以感知mysql字段變更,向下游kafka發(fā)送變更后數(shù)據(jù),不受審批約束,將變更后的數(shù)據(jù)暫存在kafka topic中,此kafka對(duì)用戶不可見。下游寫入hudi任務(wù)不變更照常消費(fèi),不寫入新增字段,用戶確認(rèn)數(shù)據(jù)可入倉(cāng)后,再?gòu)膋afka回放數(shù)據(jù),補(bǔ)充寫入新增字段。

方案

mysql字段類型和hudi類型存在不對(duì)應(yīng)情況。flink job 消費(fèi)kafka 定義字段類型和 hudi 表定義字段不對(duì)應(yīng),需要berserker在拼flink sql時(shí)候,額外拼入轉(zhuǎn)換的邏輯。

Flink cdc sql 自動(dòng)感知字段變更改造

flink cdc原生sql是需要定義mysql表的字段信息的,那么當(dāng)mysql出現(xiàn)字段變更時(shí),是必然無(wú)法做到自動(dòng)感知,并傳遞變更后數(shù)據(jù)給下游的。

原生flink cdc source會(huì)對(duì)所有監(jiān)聽到的數(shù)據(jù)在反序列化時(shí)根據(jù)sql ddl定義做column轉(zhuǎn)換和解析,以row的形式傳給下游。

我們?cè)赾dc-source中新增了一種的format方式:changelog bytes序列化方式。該format在將數(shù)據(jù)反序列化時(shí)在不再進(jìn)行column轉(zhuǎn)換和解析,而是將所有column直接轉(zhuǎn)換為changelog-json二進(jìn)制傳輸,外層將該二進(jìn)制數(shù)據(jù)直接封裝成row再傳給下游。對(duì)下游透明,下游hudi在消費(fèi)kafka數(shù)據(jù)的時(shí)候可以直接通過changelog-json反序列化進(jìn)行數(shù)據(jù)解析。

并且由于該改動(dòng)減少了一次column的轉(zhuǎn)換和解析工作,通過實(shí)際測(cè)試下來(lái)發(fā)現(xiàn)除自動(dòng)感知schema變更外還能提升1倍的吞吐。

4.3 實(shí)時(shí)DQC

dqc kafka監(jiān)控存在幾個(gè)痛點(diǎn):

基于kafka的實(shí)時(shí)dqc很難做同比環(huán)比的指標(biāo)判斷。

dqc實(shí)時(shí)鏈路是單獨(dú)的開發(fā)的,和離線dqc不通用。維護(hù)成本高。

針對(duì)同一條流多個(gè)監(jiān)控規(guī)則,是需要設(shè)立多個(gè)flink job,每個(gè)job計(jì)算一個(gè)指標(biāo)。不能復(fù)用,資源開銷大。

架構(gòu)

將kafka數(shù)據(jù)dump到hudi表后,提供dqc數(shù)據(jù)校驗(yàn)。不影響生產(chǎn)秒級(jí)/亞秒級(jí)數(shù)據(jù)時(shí)效,又可以解決以上痛點(diǎn)。

hudi提供分鐘級(jí)的監(jiān)控,可以滿足實(shí)時(shí)dqc監(jiān)控訴求。時(shí)間過短,可能反而會(huì)因?yàn)閿?shù)據(jù)波動(dòng)產(chǎn)生誤告警。

hudi以hive表的形式呈現(xiàn),使得實(shí)時(shí)dqc可以和離線dqc邏輯一致,可以很容易的進(jìn)行同環(huán)比告警,易于開發(fā)維護(hù)。

實(shí)時(shí)DQC on Hudi 使得實(shí)時(shí)鏈路數(shù)據(jù)變得更易觀測(cè)。

4.4 實(shí)時(shí)物化

背景

有些業(yè)務(wù)方需要對(duì)實(shí)時(shí)產(chǎn)出的數(shù)據(jù)進(jìn)行一個(gè)秒級(jí)的聚合查詢。

如實(shí)時(shí)看板需求,需要一分鐘一個(gè)數(shù)據(jù)點(diǎn)來(lái)展示DAU曲線,等多指標(biāo)聚合查詢場(chǎng)景。

同時(shí)結(jié)果數(shù)據(jù)要寫入update存儲(chǔ),實(shí)時(shí)更新。

難點(diǎn)

在較大數(shù)據(jù)規(guī)模下,基于明細(xì)產(chǎn)出幾十上百個(gè)聚合計(jì)算結(jié)果,要求秒級(jí)返回,幾乎不可能。

目前公司內(nèi)支持update類型的存儲(chǔ)主要是redis/mysql,計(jì)算結(jié)果導(dǎo)入意味著數(shù)據(jù)出倉(cāng),脫離了hdfs存儲(chǔ)體系,同時(shí)也要使用對(duì)應(yīng)的client進(jìn)行查詢,開發(fā)成本較高。

現(xiàn)有的hdfs體系內(nèi)計(jì)算加速方案如物化、預(yù)計(jì)算大都是基于離線場(chǎng)景,對(duì)實(shí)時(shí)數(shù)據(jù)提供物化查詢能力較弱。

目標(biāo)

支持hdfs體系內(nèi)的update存儲(chǔ)。讓數(shù)據(jù)無(wú)需出倉(cāng)導(dǎo)入外部存儲(chǔ),可以直接使用olap引擎高效查詢。

通過sql就可以簡(jiǎn)單定義實(shí)時(shí)物化表。查詢時(shí)通過sql解析,命中物化表查詢則可秒級(jí)返回多個(gè)聚合查詢結(jié)果。

方案

基于flink + hudi提供實(shí)時(shí)物化的能力。

通過sql自定義物化邏輯到基于hudi的物化表。將明細(xì)數(shù)據(jù)寫入明細(xì)hudi表中,并拉起一個(gè)flink job 進(jìn)行實(shí)時(shí)聚合計(jì)算,將計(jì)算結(jié)果upsert到物化的hudi表中。

在查詢時(shí)通過sql解析,如果規(guī)則命中物化表,則查詢物化表中的數(shù)據(jù),從而達(dá)到加速查詢的效果。

5. 未來(lái)展望

5.1 HUDI內(nèi)核能力增強(qiáng)及穩(wěn)定性優(yōu)化

Hudi timeline支持 樂觀鎖解決并發(fā)沖突,支持多流同時(shí)寫一張表。從底層支持新增數(shù)據(jù)和回補(bǔ)數(shù)據(jù)同時(shí)寫入hudi。

支持更豐富的schema evalution。避免重新建表、重新導(dǎo)入數(shù)據(jù)的繁重操作。

Hudi meta server,統(tǒng)一實(shí)時(shí)表離線表,支持instance版本等信息。支持flink sql上使用time travel,滿足取數(shù)據(jù)快照等訴求

Hudi manager 根據(jù)不同的表按需調(diào)配compaction、clustering、clean。用于離線ETL的表,低峰期進(jìn)行compaction,資源上削峰填谷。對(duì)于近線分析的表,積極compaction以及clustering,減少查詢攝取文件數(shù),提升查詢速度。

5.2 切換弱實(shí)時(shí)場(chǎng)景從Kafka到HUDI

在弱實(shí)時(shí)場(chǎng)景上實(shí)現(xiàn)流批統(tǒng)一存儲(chǔ)。Kafka對(duì)于突發(fā)流量以及拉取歷史數(shù)據(jù)達(dá)到性能瓶頸時(shí),難易緊急擴(kuò)容分?jǐn)傋x寫負(fù)載。可以將分鐘級(jí)的弱實(shí)時(shí)使用場(chǎng)景,從Kafka切換到HUDI,利用HUDI可讀取增量數(shù)據(jù)的能力,滿足業(yè)務(wù)需求,并且HUDI基于分布式文件系統(tǒng)可快速擴(kuò)容副本的能力,滿足緊急擴(kuò)容的需求。

(部分內(nèi)容來(lái)源網(wǎng)絡(luò),如有侵權(quán)請(qǐng)聯(lián)系刪除)
立即申請(qǐng)數(shù)據(jù)分析/數(shù)據(jù)治理產(chǎn)品免費(fèi)試用 我要試用
  • 相關(guān)主題
  • 相關(guān)大數(shù)據(jù)問答
  • 相關(guān)大數(shù)據(jù)知識(shí)
customer

在線咨詢

在線咨詢

點(diǎn)擊進(jìn)入在線咨詢