日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數(shù)據(jù)治理平臺

睿治作為國內(nèi)功能最全的數(shù)據(jù)治理產(chǎn)品之一,入選IDC企業(yè)數(shù)據(jù)治理實施部署指南。同時,在IDC發(fā)布的《中國數(shù)據(jù)治理市場份額》報告中,連續(xù)四年蟬聯(lián)數(shù)據(jù)治理解決方案市場份額第一。

Flink+Clickhouse實時數(shù)倉最佳實踐

時間:2023-05-16來源:癮蠻大瀏覽數(shù):1394

一、業(yè)務(wù)背景

由于歷史原因,大型集團企業(yè)往往多個帳套系統(tǒng)共存,包括國內(nèi)知名ERP廠商浪潮、金蝶、速達(dá)所提供的財務(wù)系統(tǒng),集團財務(wù)共享中心的財務(wù)人員在核對財務(wù)憑證數(shù)據(jù)時經(jīng)常需要跨多個系統(tǒng)查詢且每個系統(tǒng)使用方式不一,同時因為系統(tǒng)累計數(shù)據(jù)龐大,制單和查詢操作經(jīng)常出現(xiàn)卡頓,工作效率非常低。

數(shù)據(jù)中臺天然就是為了解決數(shù)據(jù)孤島和數(shù)據(jù)口徑不一致問題應(yīng)運而生的,總的來說就是要將原本存在各帳套系統(tǒng)的數(shù)據(jù)實時接入中臺,中臺再將不同系統(tǒng)的數(shù)據(jù)模型進行歸一化處理,并且在數(shù)據(jù)分析平臺上提供統(tǒng)一的查詢?nèi)肟凇南旅娴募軜?gòu)圖可以直觀地了解。

總體架構(gòu).png

一方面,數(shù)據(jù)中臺提供的數(shù)據(jù)查詢服務(wù)需要覆蓋原系統(tǒng)的帳套查詢功能,這意味著原系統(tǒng)做的任何業(yè)務(wù)操作(插入分錄、刪除憑證、廢棄憑證、保存等)在數(shù)據(jù)中臺都需要有同步的事務(wù)反應(yīng),確保中臺提供的數(shù)據(jù)結(jié)果與原系統(tǒng)客戶端保持嚴(yán)格一致;另一方面,數(shù)據(jù)分析平臺需要提供級聯(lián)、上卷下鉆、多維聚合等附加能力,滿足海量數(shù)據(jù)分析的需求。重點需要考慮以下幾個問題:

開源的CDC監(jiān)控產(chǎn)品大都是針對Mysql或Postgresql的,然而財務(wù)核算系統(tǒng)大多使用Oracel數(shù)據(jù)庫,如何選擇一套穩(wěn)定且滿足業(yè)務(wù)需求(可監(jiān)控增刪改事務(wù)操作)的CDC插件或lib庫;

如何保證端到端的數(shù)據(jù)一致性,包括維度一致性以及全流程數(shù)據(jù)一致性;

實時流處理過程中數(shù)據(jù)到達(dá)順序無法預(yù)知時,如何保證雙流join時數(shù)據(jù)能及時關(guān)聯(lián)同時不造成數(shù)據(jù)堵塞;

這個需求是典型的集多維分析和事務(wù)更新為一體的場景,并且對多維分析的響應(yīng)時間(毫秒級)以及事務(wù)更新效率有極高的要求,所以如何解決HTAP的問題成為一大難題,目前開源社區(qū)并沒有提供一個能較好處理此問題的解決方案,包括Tidb和Greenplum。

我們一起來看看廣投集團實時數(shù)倉是如何巧妙解決這些問題的。

二、常見的實時數(shù)倉方案

常見的實時數(shù)倉架構(gòu)有三種。第一種是Lambda架構(gòu),是目前主流的一套實時數(shù)倉架構(gòu),存在離線和實時兩條鏈路。實時部分以消息隊列的方式實時增量消費,一般以Flink+Kafka的組合實現(xiàn),維度表存在關(guān)系型數(shù)據(jù)庫或者HBase;離線部分一般采用T+1周期調(diào)度分析歷史存量數(shù)據(jù),每天凌晨產(chǎn)出,更新覆蓋前一天的結(jié)果數(shù)據(jù),計算引擎通常會選擇Hive或者Spark。優(yōu)點是數(shù)據(jù)準(zhǔn)確度高,不易出錯;缺點是架構(gòu)復(fù)雜,運維成本高。

第二種是Kappa架構(gòu),相較于Lambda架構(gòu),它移除了離線生產(chǎn)鏈路,思路是通過傳遞任意想要的offset(偏移量)來達(dá)到重新消費處理歷史數(shù)據(jù)的目的。優(yōu)點是架構(gòu)相對簡化,數(shù)據(jù)來源單一,共用一套代碼,開發(fā)效率高;缺點是必須要求消息隊列中保存了存量數(shù)據(jù),而且主要業(yè)務(wù)邏輯在計算層,比較消耗內(nèi)存資源。

第三種是實時OLAP變體架構(gòu),是Kappa架構(gòu)的進一步演化,它的思路是將聚合分析計算由OLAP引擎承擔(dān),減輕實時計算部分的聚合處理壓力。優(yōu)點是自由度高,可以滿足數(shù)據(jù)分析師的實時自助分析需求,減輕了計算引擎的處理壓力;缺點是必須要求消息隊列中保存存量數(shù)據(jù),且因為是將計算部分的壓力轉(zhuǎn)移到了查詢層,對查詢引擎的吞吐和實時攝入性能要求較高。

三、為什么選擇Flink+Clickhouse?

以上任何一種架構(gòu)都難以解決開篇提出的第四個問題,它是影響技術(shù)選型的關(guān)鍵制約因素。為什么這么說呢?Lambda架構(gòu)的數(shù)據(jù)服務(wù)層無法同時滿足批量數(shù)據(jù)查詢、單條數(shù)據(jù)檢索以及Merge合并,而Kappa架構(gòu)和實時OLAP變體架構(gòu)要求實時采集側(cè)要拿到全量的Oracle歸檔日志數(shù)據(jù),這在實際操作上沒有可行性,一方面Oracle是第三方廠商維護的,不允許對線上系統(tǒng)有過多的侵入,容易造成監(jiān)聽故障甚至系統(tǒng)癱瘓,另一方面歸檔日志是在開啟那一刻起才開始生成的,之前的存量數(shù)據(jù)難以進入kafka,但是后來實時數(shù)據(jù)又必須依賴前面的計算結(jié)果。

怎么走出這樣的窘境呢?首先需要達(dá)成一個共識,就是計算層必須是Lambda架構(gòu),并且計算層離線鏈路的數(shù)據(jù)歸檔不再來源于實時日志,而是直接從業(yè)務(wù)庫定期抽取或?qū)搿嶋H項目中由于產(chǎn)品體系技術(shù)兼容性的原因,離線鏈路這里選擇了Hive;實時鏈路上,F(xiàn)link依靠其狀態(tài)管理、容錯機制、低時延和Exactly Once語義的優(yōu)勢依然占據(jù)著流式計算領(lǐng)域難以撼動的地位,所以計算層我們確定了Hive+Flink的架構(gòu)選型。有了這個共識我們再進一步分析數(shù)據(jù)服務(wù)層,這一層的性能要求有哪些呢,不妨先從大數(shù)據(jù)領(lǐng)域的4類場景分析:

batch (B):離線計算

Analytical(A):交互式分析

Servering (S):高并發(fā)的在線服務(wù)

Transaction (T):事務(wù)隔離機制

離線計算通常在計算層,所以我們重點考慮A、S和T,這三個場景在廣投計財實時查詢業(yè)務(wù)中都有涉及,這也是區(qū)別一般互聯(lián)網(wǎng)場景的地方,A、S、T的統(tǒng)一服務(wù)成為了亟待解決的難題。A要求快速的響應(yīng)時間,S需要滿足高并發(fā),T支持實時事務(wù)更新(傳統(tǒng)數(shù)據(jù)庫,一般交易場景對事務(wù)要求高)。市面上很多號稱能做到HTAP的產(chǎn)品,例如Tidb和Greeplum,深入分析就會發(fā)現(xiàn),HTAP其實是一個偽命題,因為A和T的優(yōu)化方向不同,為了保證T必然要犧牲A的性能,相反,如果想做到極致的A,則T的寫入鏈路將非常復(fù)雜,事務(wù)機制和QPS無法滿足需求。所以,不可能在一個系統(tǒng)上性能同時滿足高效的A、S、T。

大數(shù)據(jù)技術(shù)高速發(fā)展的時期,涌現(xiàn)出了一批A性能非常好的OLAP引擎,比如基于cube預(yù)聚合的kylin、Impala、阿里AnalyticsDB,但是適合實時攝入又能夠做離線分析的數(shù)據(jù)分析系統(tǒng)選擇性并不多,當(dāng)前流行的有Druid或Clickhouse,它們是典型的列存架構(gòu),能構(gòu)建index、或者通過向量化計算加速列式計算的分析。在Clickhouse還未被廣泛接受之前,Druid作為實時OLAP被一些互聯(lián)網(wǎng)大廠極力推崇使用,但是一直被詬病的是它復(fù)雜的技術(shù)架構(gòu),組件非常多,包括4個節(jié)點3個依賴,四個節(jié)點分別是實時節(jié)點(Realtime Node)、歷史節(jié)點(Histrical Node)、查詢節(jié)點(Broker Node)、協(xié)調(diào)節(jié)點(Coodinator Node),三個依賴分別是Mysql、Deep storage(如本地磁盤、Hdfs、S3)、Zookeeper,相當(dāng)于內(nèi)部實現(xiàn)了一個Lambda+OLAP的架構(gòu),學(xué)習(xí)成本和使用成本都非常高。下面從TPC-DH性能測試來看看幾大OLAP引擎對比。

Clickhouse與其他分析性系統(tǒng)的性能對比

Clickhouse與其他分析性系統(tǒng)的性能對比

TiDB性能測試

從性能對比數(shù)據(jù)可以看出,Clickhouse在億量級數(shù)據(jù)集上平均響應(yīng)時間為毫秒級,是其他分析性系統(tǒng)的幾十倍甚至上百倍。

為什么Clickhouse在A方向表現(xiàn)如此優(yōu)異?它的S性能又如何呢?我們從存儲和查詢兩個維度來論證。

Clickhouse存儲中的最小單位是DataPart,寫入鏈路為了提升吞吐,放棄了部分寫入實時可見性,即數(shù)據(jù)攢批寫入,一次批量寫入的數(shù)據(jù)會落盤成一個DataPart,它不像Druid那樣一條一條實時攝入。但ClickHouse把數(shù)據(jù)延遲攢批寫入的工作交給來客戶端實現(xiàn),比如達(dá)到10條記錄或每過5s間隔寫入,換句話說就是可以在用戶側(cè)平衡吞吐量和時延,如果在業(yè)務(wù)高峰期流量不是太大,可以結(jié)合實際場景將參數(shù)調(diào)小,以達(dá)到極致的實時效果。

(1)計算能力方面,Clickhouse采用向量化函數(shù)和aggregator算子極大地提升了聚合計算性能,配合完備的SQL能力使得數(shù)據(jù)分析變得更加簡單、靈活。

(2)數(shù)據(jù)掃描方面,ClickHouse是完全列式的存儲計算引擎,而且是以有序存儲為核心,在查詢掃描數(shù)據(jù)的過程中,首先會根據(jù)存儲的有序性、列存塊統(tǒng)計信息、分區(qū)鍵等信息推斷出需要掃描的列存塊,然后進行并行的數(shù)據(jù)掃描,像表達(dá)式計算、聚合算子都是在正規(guī)的計算引擎中處理。從計算引擎到數(shù)據(jù)掃描,數(shù)據(jù)流轉(zhuǎn)都是以列存塊為單位,高度向量化的。

(3)高并發(fā)服務(wù)方面,Clickhouse的并發(fā)能力其實是與并行計算量和機器資源決定的。如果查詢需要掃描的數(shù)據(jù)量和計算復(fù)雜度很大,并發(fā)度就會降低,但是如果保證單個query的latency足夠低(增加內(nèi)存和cpu資源),部分場景下用戶可以通過設(shè)置合適的系統(tǒng)參數(shù)來提升并發(fā)能力,比如max_threads等。其他分析型系統(tǒng)(例如Elasticsearch)的并發(fā)能力為什么很好,從Cache設(shè)計層面來看,ES的Cache包括Query Cache, Request Cache,Data Cache,Index Cache,從查詢結(jié)果到索引掃描結(jié)果層層的Cache加速,因為Elasticsearch認(rèn)為它的場景下存在熱點數(shù)據(jù),可能被反復(fù)查詢。反觀ClickHouse,只有一個面向IO的UnCompressedBlockCache和系統(tǒng)的PageCache,為了實現(xiàn)更優(yōu)秀的并發(fā),我們很容易想到在Clickhouse外面加一層Cache,比如redis,但是分析場景下的數(shù)據(jù)和查詢都是多變的,查詢結(jié)果等Cache都不容易命中,而且在廣投業(yè)務(wù)中實時查詢的數(shù)據(jù)是基于T之后不斷更新的數(shù)據(jù),如果外掛緩存將降低數(shù)據(jù)查詢的時效性。

事實上,Clickhouse在億數(shù)量級數(shù)據(jù)集基礎(chǔ)上聚合分析查詢響應(yīng)時間、吞吐和并發(fā)能力不亞于ES,并且隨著數(shù)據(jù)量的增大而擴大。下圖是分別在2億和5億數(shù)據(jù)集上的測試結(jié)果,Q1、Q2、Q3、Q4表示數(shù)據(jù)量依次增大的sql query。

2億數(shù)據(jù)級的性能對比

5億數(shù)據(jù)級的性能對比

我們已經(jīng)分析Clickhouse在A和S方面的優(yōu)勢,那么它又該如何承載T的業(yè)務(wù)呢?

前面我們已經(jīng)講了,在一個系統(tǒng)中不可能同時實現(xiàn)A和T,為什么不讓他們做自己擅長的事情呢?經(jīng)過深入業(yè)務(wù)洞察發(fā)現(xiàn),廣投計財實時查詢業(yè)務(wù)中T的作用范圍是最近一年的數(shù)據(jù),經(jīng)過估算,頻繁發(fā)生刪改操作的數(shù)據(jù)有500萬左右,不到總數(shù)據(jù)量2.5億的1/50。我們完全可以用兩個不同系統(tǒng)協(xié)作實現(xiàn),其中一個系統(tǒng)實現(xiàn)S和T,一般的關(guān)系型數(shù)據(jù)庫就可以滿足,例如Mysql、Postgresql;另外一個系統(tǒng)實現(xiàn)A,這里我們選擇Clickhouse。我們很容易想到聯(lián)邦查詢,例如Presto和Drill的解決方案,其實Clickhouse內(nèi)部已經(jīng)集成了多個數(shù)據(jù)庫引擎來替代聯(lián)邦查詢,沒必要引入第三方框架,于是適合廣投計財實時查詢業(yè)務(wù)的實時數(shù)倉架構(gòu)如下,筆者暫且稱它為LSTAP架構(gòu)(Lambda+HSTP+OLAP)。

LSTAP架構(gòu)

在這個架構(gòu)中,最外層以一個Clickhouse視圖連接Mysql引擎和Distributed引擎對應(yīng)的表數(shù)據(jù),Mysql只儲存需要實時更新的那部分?jǐn)?shù)據(jù),實時鏈路每天從Mysql中取離線定期刷新的狀態(tài)數(shù)據(jù),確保不會因為實時鏈路網(wǎng)絡(luò)原因、系統(tǒng)故障、應(yīng)用邏輯錯誤等造成數(shù)據(jù)質(zhì)量問題;Distributed引擎對應(yīng)的Clickhouse表存儲歷史數(shù)據(jù),類似于Druid里面的Histrical Node,滿足統(tǒng)計分析和歷史賬單數(shù)據(jù)的查詢需求。

--映射Mysql表提取最新一年的數(shù)據(jù)CREATE TABLE jc_bi.ads_journal_recent_1year( `pk_detail` String, ... `datasource` String, `synctime` String)ENGINE = MySQL('10.100.x.xx:3306', 'jc_bi', 'ads_journal', 'xxx', 'xxx');--副本表CREATE TABLE jc_bi.ads_journal_replica( `pk_detail` String, ... `datasource` String, `synctime` String)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ads_journal_replica', '{replica}')PARTITION BY datasourceORDER BY (unitcode,...)SETTINGS index_granularity = 8192;--Clickhouse分布式表,與副本表ads_journal_replica對應(yīng)CREATE TABLE jc_bi.ads_journal_dist( `pk_detail` String, ... `datasource` String, `synctime` String)ENGINE = Distributed('cluster_3shards_1replicas', 'jc_bi', 'ads_journal_replica', rand() % 3);--以視圖合并Mysql引擎和distributed引擎的兩張表CREATE VIEW jc_bi.v_ads_journal( `pk_detail` String, ... `datasource` String, `synctime` String) ASSELECT *FROM jc_bi.jc_bi.ads_journal_recent_1yearUNION ALLSELECT *FROM jc_bi.ads_journal_dist;四、實時數(shù)倉1.0

經(jīng)過前期的技術(shù)調(diào)研和性能分析,基本確定了以Flink+Clickhouse為核心構(gòu)建實時數(shù)倉。當(dāng)然,還需要依賴一些其他技術(shù)組件來支起整個實時數(shù)倉,比如消息隊列Kafka、維度存儲、CDC組件等。廣投數(shù)據(jù)中臺項目的基礎(chǔ)設(shè)施除了部署了開源的CDH存儲與計算平臺之外,還采購了“Dataphin+QuickBI”分別提供數(shù)據(jù)治理能力和可視化能力,在計財實時查詢系統(tǒng)中,Dataphin主要用來承擔(dān)離線任務(wù)調(diào)度以及起到HQL ide集成環(huán)境的作用,QuickBI作為數(shù)據(jù)分析門戶提供數(shù)據(jù)查詢窗口。

數(shù)據(jù)流

這里提幾個關(guān)鍵的設(shè)計點:1、實時維度存儲采用Hbase,對于快變維度可以實現(xiàn)實時更新,rowkey采用“md5(主鍵)取前8位+datasource”,保證唯一性和散列性;2、為了保證離線和實時維度數(shù)據(jù)一致性,將hive dwd層中維表數(shù)據(jù)映射到Hbase中,同時為了保障實時查詢系統(tǒng)的穩(wěn)定性,規(guī)避實時鏈路中由于網(wǎng)絡(luò)延遲、數(shù)據(jù)丟失、維度未及時更新造成數(shù)據(jù)項缺失或其他不可預(yù)知的問題等導(dǎo)致的查詢結(jié)果不可信以及例如kafka集群某節(jié)點掉線,代碼bug導(dǎo)致任務(wù)中斷等造成計算結(jié)果無法回滾,將離線計算結(jié)果每日定期供給到實時應(yīng)用checkpoint,以此來解決開篇的問題2,即端到端的數(shù)據(jù)一致性;

create external table cdmd.dim_bd_accsubj_mapping_hbase(rowkey String,pk_accsubj String,...modifytime String,datasource String)stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'with serdeproperties("hbase.columns.mapping"=":key,f:pk_accsubj,...,f:modifytime,f:datasource")tblproperties("hbase.table.name"="dim:DIM_BD_ACCSUBJ");insert overwrite table cdmd.dim_bd_accsubj_mapping_hbaseselect concat(substring(md5(pk_accsubj),,8),datasource) as rowkey,pk_accsubj,...modifytime,datasourcefrom cdmd.dim_bd_accsubj;

3、使用Mysql結(jié)合Clickhouse的組合方式提供實時數(shù)據(jù)寫入、事實數(shù)據(jù)更新、批量分析、實時響應(yīng)、高并發(fā)查詢?yōu)橐惑w的數(shù)據(jù)服務(wù)能力,解決了開篇問題4,這一點在第三章已經(jīng)詳細(xì)論述;4、采用“多流join+實時維度讀取”的獨創(chuàng)雙保險模式,解決了多流關(guān)聯(lián)場景下的數(shù)據(jù)項丟失和數(shù)據(jù)堵塞問題,即開篇的問題3,這部分將在第五章中詳細(xì)介紹。

五、踩過的“坑”

在整個實時數(shù)倉構(gòu)建的過程中,遇到了不少麻煩,尤其是實時Flink應(yīng)用開發(fā),現(xiàn)將關(guān)鍵問題列舉如下:(1) cdc插件選型網(wǎng)上關(guān)于實時采集Oracle數(shù)據(jù)的資料并不多,通常的做法有以下幾種:

購買Oracel原生提供的OGG,debizum的本質(zhì)也是基于OGG,這種方式雖然省事但是價格昂貴;

Kafka提供了連接各種關(guān)系型數(shù)據(jù)庫的Connect,但是它是基于時間戳或整型增量主鍵的觸發(fā)式拉取,對源系統(tǒng)壓力大且時延較高,最主要的是無法感知刪除操作;

自研kafka的connect,基于Logminer實現(xiàn)重做日志監(jiān)控和解析,并實現(xiàn)Kafka的connect接口將解析后的數(shù)據(jù)推送到Kafka topic;前兩種方案很快就被pass掉了,只剩下第三種方案,在死磕Logminer實現(xiàn)機制和歷經(jīng)艱辛的研發(fā)后,終于實現(xiàn)了Oracle數(shù)據(jù)增刪改的實時監(jiān)控并推送到Kafka。但是在使用一段時間以后會出現(xiàn)數(shù)據(jù)丟失和無響應(yīng)的情況,主要原因是對Logminer查詢的優(yōu)化不夠,鑒于項目緊急程度和時間成本的考量,項目組評估決定暫時放棄自研,于是尋求另外的解決方案,最終使用了Streamsets。Streamsets圖形化的配置非常友好,監(jiān)控也比較穩(wěn)定。但是很快就發(fā)現(xiàn)了它的弊端:online模式(streamsets分為redo模式和online模式,redo模式生成的是原生sql,online模式產(chǎn)生的是解析后的json)下更新操作時無法拿到舊數(shù)據(jù)。由于后續(xù)的架構(gòu)優(yōu)化需要利用這個特性,為了解決這個問題,下一步我們將在這個基礎(chǔ)上進行二次開發(fā),具體方案在“實時數(shù)倉2.0”章節(jié)中介紹。

(2) Idea調(diào)試Flink代碼在開啟checkpoint的情況下,觸發(fā)報錯時不輸出異常信息且不斷重啟Flink默認(rèn)開啟任務(wù)重啟策略,當(dāng)開啟checkpoint時,如果代碼有bug會導(dǎo)致整個任務(wù)不斷重啟,而不會拋出異常,很難排查問題。解決思路:在開發(fā)環(huán)境注釋checkpoint調(diào)試運行,如果業(yè)務(wù)邏輯代碼沒有問題再開啟恢復(fù)checkpoint代碼后重新發(fā)布到線上。(3) hive映射Hbase后數(shù)據(jù)類型不對應(yīng)問題將維度表從hive映射到hbase時,假如在hive中的數(shù)據(jù)類型是smallint類型,如果映射到hbase中然后將get到的字節(jié)數(shù)組轉(zhuǎn)int會報錯。解決辦法:先將get到的字節(jié)數(shù)據(jù)轉(zhuǎn)String類型然后使用包裝類轉(zhuǎn)int。(4) hdfs配置高可用后namenode節(jié)點切換導(dǎo)致hdfs地址失效在使用RocksDb狀態(tài)后端時需要設(shè)置一個hdfs路徑用來保存checkpoint文件。如果hdfs路徑寫死為active的節(jié)點,當(dāng)集群出現(xiàn)問題namenode切換時,原來的active狀態(tài)的namenode變?yōu)閟tandby狀態(tài),代碼會拋出異常:

錯誤日志

解決辦法:Hadoop配置nameservice,然后hdfs路徑使用nameservice路徑。(5) fastjson的坑用fastjson將一個pojo對象轉(zhuǎn)換為json字符串時,如果pojo的屬性名同時有大小寫,那么直接使用JSONObject.toJSONString方法轉(zhuǎn)換json會造成屬性的大小寫改變。另外使用fastjson將json字符串轉(zhuǎn)換為JSONObject時可能丟失一些屬性。為了穩(wěn)定選擇Gson。(6)實時數(shù)據(jù)亂序?qū)е掠嬎憬Y(jié)果錯誤財務(wù)人員在核算系統(tǒng)上的每個操作動作在數(shù)據(jù)庫中都有對應(yīng)的事務(wù)變化,但是這個變化可能不是一一對應(yīng)的,一個動作可能產(chǎn)生多個事務(wù),而且每個系統(tǒng)的規(guī)則可能都不一樣:刪除操作可能是物理刪除也可能標(biāo)記刪除,更新操作可能在原紀(jì)錄直接更新,也可能標(biāo)記刪除后再插入新的數(shù)據(jù)。比如其中一個核算系統(tǒng)的更新憑證表現(xiàn)為:insert-update-delete-insert,刪除憑證操作表現(xiàn)為:update標(biāo)志位。在分布式場景下,數(shù)據(jù)流從kafka(多個partition分區(qū))到Flink的過程中,數(shù)據(jù)的先后順序會發(fā)生改變導(dǎo)致計算結(jié)果錯誤,解決數(shù)據(jù)亂序問題有兩種方案:第一種是kafka設(shè)置單分區(qū),第二種是在Flink中分組處理。第一種方案在生產(chǎn)環(huán)境業(yè)務(wù)高峰期顯然是不太合適的,對kafka單節(jié)點壓力較大且無法發(fā)揮分布式系統(tǒng)并行處理能力的優(yōu)勢。于是我們選擇了第二種方案,以主鍵或聯(lián)合主鍵作為分區(qū)鍵,保證同一主鍵或聯(lián)合主鍵對應(yīng)的數(shù)據(jù)有序。(7) 關(guān)聯(lián)維度等待數(shù)據(jù)導(dǎo)致數(shù)據(jù)阻塞問題在對事實表數(shù)據(jù)進行拉寬操作時,需要從hbase關(guān)聯(lián)維度數(shù)據(jù),對于實時更新的維度,并不能保證在從hbase取維度數(shù)據(jù)之前,維度數(shù)據(jù)已經(jīng)更新到hbase。

最初的方案是如果從hbase拿不到維度數(shù)據(jù)則繼續(xù)查,直到拿到維度數(shù)據(jù)或者超過500毫秒。Demon如下:

public static byte[] getCellBytes(Connection connection, String tableName, String rowkey, String columnFamily, String column) { //獲取表的連接 Table table = null; try { table = connection.getTable(TableName.valueOf("dim", tableName)); } catch (IOException e) { e.printStackTrace(); log.error("查詢hbase中維表過程中創(chuàng)建table出錯!!!"); } //創(chuàng)建get對象 Get get = new Get(Bytes.toBytes(rowkey)); Result result = null; if ("T_GL_VOUCHERENTRY_Data".equals(tableName)) { long time = System.currentTimeMillis(); byte[] value; do { try { result = table.get(get); } catch (IOException e) { e.printStackTrace(); log.error("查詢hbase中維表過程中g(shù)et result出錯!!!"); } value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); } while ( value == null && System.currentTimeMillis() <= time + 500); //未拿到維度數(shù)據(jù)則等待500ms return value; } else { try { result = table.get(get); } catch (IOException e) { e.printStackTrace(); log.error("查詢hbase中維表過程中g(shù)et result出錯!!!"); } byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); return value; } }

這個方案帶來了致命性問題:數(shù)據(jù)處理時延非常高,達(dá)不到理想的實時效果。每條憑證數(shù)據(jù)要關(guān)聯(lián)多個字段,假如每關(guān)聯(lián)一個字段都要等待500ms,每條數(shù)據(jù)都要等好幾秒才能拿到全部維度,在數(shù)據(jù)量大的情況下數(shù)據(jù)堵塞非常嚴(yán)重,時延超過一小時。

為了解決這個問題,決定采用“多流join”的方案:使用開窗函數(shù),三流join關(guān)聯(lián)維度。拿浪潮核算系統(tǒng)為例,F(xiàn)link消費Kafka中事實數(shù)據(jù)為一條事實流,消費憑證維度數(shù)據(jù)為一條維度流,消費輔助項維度數(shù)據(jù)為另一條維度流,采用處理時間(process time)語義,對三條流進行開窗,窗口長度時間為5秒,使用coGroup算子左連接進行三流join。

多join雖然能解決時延問題,但是假如事實數(shù)據(jù)和維度數(shù)據(jù)所在的窗口不對齊,那么會導(dǎo)致拿不到相應(yīng)的維度數(shù)據(jù)。為了解決這一問題,同時另外運行一個維度更新的Flink任務(wù)將更新的維度數(shù)據(jù)寫入hbase。在事實流與維度流進行左連接join的時候,若維度流中拿不到該維度數(shù)據(jù)則往hbase查詢,即“多流join+Hbase維度讀取”的雙重保險方案。Demon如下:

DataStream<DWD_GL_DETAIL> joinDs1 = LangChaoFctDs.coGroup(LangChaoVoucherDim) .where(new KeySelector<FCT_GL_DETAIL, String>() { @Override public String getKey(FCT_GL_DETAIL value) throws Exception { String pk_voucher = value.getPk_voucher(); return pk_voucher; } }) .equalTo(new KeySelector<DIM_BD_VOUCHER, String>() { @Override public String getKey(DIM_BD_VOUCHER value) throws Exception { String pk_voucher = value.getPk_voucher(); return pk_voucher; } }) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .apply(new RichCoGroupFunction<FCT_GL_DETAIL, DIM_BD_VOUCHER, DWD_GL_DETAIL>() { Connection connection; Gson gson; @Override public void open(Configuration parameters) throws Exception { connection = HbaseConnection.createConnection(); gson = new Gson(); } @Override public void coGroup(Iterable<FCT_GL_DETAIL> first, Iterable<DIM_BD_VOUCHER> second, Collector<DWD_GL_DETAIL> out) throws Exception { for (FCT_GL_DETAIL fct_gl_detail : first) { String pk_voucher = fct_gl_detail.getPk_voucher(); String datasource = fct_gl_detail.getDatasource(); DWD_GL_DETAIL dwdpojo = new DWD_GL_DETAIL(); DWD_GL_DETAIL dwd_gl_detail = handleDwdLc(connection, gson, fct_gl_detail, dwdpojo); Boolean ishave = false; for (DIM_BD_VOUCHER dim_bd_voucher : second) { //省略中間處理過程 ishave = true; } //如果維度流中拿不到維度數(shù)據(jù)則從hbase中查詢 if (!ishave && pk_voucher != null) { String rowkey1 = Md5Util.getMD5Str(pk_voucher).substring(, 8) + datasource; //省略往hbase查詢憑證維度代碼 } out.collect(dwd_gl_detail); } } @Override public void close() throws Exception { if (connection != null) { connection.close(); } } });//雙join關(guān)聯(lián)輔助項維度與關(guān)聯(lián)憑證維度的邏輯類似,這里省略。

新方案解決了數(shù)據(jù)處理不過來導(dǎo)致的背壓問題,將處理時延降低到5秒以內(nèi)。

六、實踐展望-實時數(shù)倉2.0

實時數(shù)倉1.0在廣投集團已經(jīng)穩(wěn)定上線運行近一個月,但是回顧技術(shù)架構(gòu),盡管它解決了實時計算領(lǐng)域的AST共存問題,其實還有一些優(yōu)化改進的地方,比如存儲冗余、實時和離線獨立開發(fā)、查詢系統(tǒng)依賴組件較多等,接下來我們逐一分析。

仔細(xì)分析實時數(shù)倉1.0的架構(gòu)我們會發(fā)現(xiàn):為了解決離線計算問題,往Hive里面存儲一份數(shù)據(jù);為了解決流式數(shù)據(jù)緩沖問題,在Kafka存儲了一份;為了解決維度更新和點查詢的問題,又往HBase里面存儲一份;為了解決列存的快速分析,數(shù)據(jù)需要Clickhouse里面存一份,甚至為了提供S和T的服務(wù)能力,我們又加入了Mysql。這樣帶來的問題就是:(1)存儲成本高同一份數(shù)據(jù)在多個系統(tǒng)中做了冗余存儲,增加了存儲成本。(2)維護難度大每個系統(tǒng)的存儲格式不一致,導(dǎo)入導(dǎo)出需要做類型轉(zhuǎn)換,而且隨著業(yè)務(wù)量增加,系統(tǒng)本身的維護變得困難,甚至需要配置專業(yè)的集群運維人員,對系統(tǒng)異常和安全管控做日常巡檢。(3)學(xué)習(xí)成本高每一套存儲系統(tǒng)運行原理和開發(fā)方式都不一樣,對于新人來講很難快速上手,增加了培訓(xùn)成本。

有沒有一個統(tǒng)一的存儲產(chǎn)品呢?有,數(shù)據(jù)湖,目前關(guān)注度比較高的有Databricks推出的Delta Lake、Uber的Hudi以及Netflix的Iceberg,詳細(xì)的參數(shù)對比可參考https://mp.weixin.qq.com/s/m8-iFg-ekykWGrG3gXlLew。Delta Lake和Hudi都和Spark結(jié)合的比較好,不得不說,在數(shù)據(jù)湖的實踐方面,Spark生態(tài)構(gòu)建走在了Flink前面,但是也已經(jīng)有一些互聯(lián)網(wǎng)大廠開始實踐Hudi、Iceberg與Flink結(jié)合的實時數(shù)倉,期待數(shù)據(jù)湖開源社區(qū)能夠兼容Flink。

目前離線和實時采用兩套代碼開發(fā),離線采用HQL,實時使用Flink DataStream API 進行開發(fā),會導(dǎo)致開發(fā)時間長,可閱讀性差,代碼交接和運維難度大。

基于數(shù)據(jù)湖的流批一體、版本管理、自管理schema的特性,以及Flink 1.12以后具備批流API的統(tǒng)一以及與Hive的集成,我們很方便地將Hive中的數(shù)據(jù)遷移到數(shù)據(jù)湖,使用Flink SQL進行離線和實時程序開發(fā),并且可以共用一套代碼。但是目前數(shù)據(jù)湖在寫多讀少的場景性能還有待提升,我們不妨拭目以待。

查詢層為了實現(xiàn)AST服務(wù)能力,我們引入了Clickhouse+Mysql的架構(gòu)。查詢層的架構(gòu)能否再簡化呢?答案是肯定的。

首先,通過 jdbc連接器的方式 create table 創(chuàng)建Clickhouse 表

CREATE TABLE gl_detail_ck ( `PK_DETAIL` STRING, ... `TABLENAME` STRING, `OPERATION` STRING, `DATASOURCE` STRING, `SCN` BIGINT ) WITH ('connector' = 'jdbc', 'url' = 'jdbc:clickhouse://:/', 'table-name' = 'gl_detail', 'username' = '', 'password' = '' );

Flink Sql jdbc的方式默認(rèn)是不能集成Clickhouse的,為什么呢?

因為在源碼中已經(jīng)寫死了只支持Derby,Mysql,Postgres方言。如果非要集成,有以下兩種途徑:(1)修改Flink源代碼,重新打包,添加修改后的jar包(2)使用反射修改DIALECTS靜態(tài)final屬性第一種方式太麻煩了,在這里我們采用第二種方案:

CREATE TABLE jc_bi.gl_detail ( `pk_detail` String, ...`sign` Int8, `version` UInt32 )ENGINE = VersionedCollapsingMergeTree(sign,version) PARTITION BY datasource ORDER BY pk_detail SETTINGS index_granularity = 8192;

image.png

(1)為了解決數(shù)據(jù)爆發(fā)式增長問題,需要定期執(zhí)行

optimize table 表名

(2)如果要實現(xiàn)數(shù)據(jù)增量折疊,必須拿到修改之前的舊數(shù)據(jù)來以此來抵消上個版本對應(yīng)的數(shù)據(jù),這里采用“Streamsets+原生SQL解析”的方式實現(xiàn)。

通過 Streamsets data controller(sdc)消費歸檔日志獲取執(zhí)行語句后寫入 Kafka 中

{"sql":"update 'GL_DETAIL' set 'ERRMESSAGE' = NULL where PK_DETAIL=********,、、、","TABLENAME":"GL_DETAIL", "OPERATION":"UPDATE", "DATASOURCE":"NC","SCN":"91685778"}

通過Flink消費 Kafka 并且解析 Sql 獲取更新前的字段Sql解析邏輯簡化如下:

public void parse(String sqlRedo) throws JSQLParserException { //通過jsqlparser開源sql解析框架對Sql進行解析獲取Satement Statement stmt = CCJSqlParserUtil.parse(sqlRedo); LinkedHashMap<String,String> afterDataMap = new LinkedHashMap<>(); LinkedHashMap<String,String> beforeDataMap = new LinkedHashMap<>(); parseUpdateStmt((Update) stmt, beforeDataMap, afterDataMap, sqlRedo);}private static void parseUpdateStmt(Update update, LinkedHashMap<String,String> beforeDataMap, LinkedHashMap<String,String> afterDataMap, String sqlRedo){ Iterator<Expression> iterator = update.getExpressions().iterator(); //通過獲取更新字段的迭代器填充afterDataMap for (Column c : update.getColumns()){ afterDataMap.put(cleanString(c.getColumnName()), cleanString(iterator.next().toString())); } //通過where語句來獲取未修改的字段的值 if(update.getWhere() != null){ update.getWhere().accept(new ExpressionVisitorAdapter() { @Override public void visit(final EqualsTo expr){ String col = cleanString(expr.getLeftExpression().toString()); if(afterDataMap.containsKey(col)){ String value = cleanString(expr.getRightExpression().toString()); beforeDataMap.put(col, value); } else { String value = cleanString(expr.getRightExpression().toString()); beforeDataMap.put(col, value); afterDataMap.put(col, value); } } }); }else{ LOG.error("where is null when LogParser parse sqlRedo, sqlRedo = {}, update = {}", sqlRedo, update.toString()); } }

最終解析后將數(shù)據(jù)再寫入到kafka,數(shù)據(jù)如下:

{"scn":91685778,"type":"UPDATE","schema":"nc", "table":"GL_DETAIL","ts":6797472127529390080,"opTime":91945745,"after_TS":"2021-05-10 18:50:53"、、、、,"before_TS":"2021-05-10 18:50:23"、、、、}

Flink Sql進行數(shù)據(jù)版本標(biāo)記,再寫入到Clickhouse中,至此實現(xiàn)了與關(guān)系型數(shù)據(jù)庫一致的增刪改查, 主要邏輯如下。

Table sqlParsertest =tEnv.sqlQuery("select "+ "*"+"from (\n" +"select "+ before_column + " , -1 AS sign ,abs(hashcode("+before_column_string+")) AS version from sqlParsertest where type = 'UPDATE'\n" +"union all\n" +"select "+ after_column + ", 1 AS sign , abs(hashcode("+after_column_string+")) AS version from sqlParsertest where type = 'UPDATE' " +"union all\n" +"select " +after_column +" ,1 AS sign, abs(hashcode(" +after_column_string +")) AS version from sqlParsertest where type = 'INSERT'\n" +"union all\n" +"select "+ before_column+",-1 AS sign, abs(hashcode( "+before_column_string +")) AS version from sqlParsertest where type = 'DELETE' "+") \n" );tEnv.executeSql( " insert into gl_detail_ck select * from " + sqlParsertest);

通過abs(hashcode("+before_column_string+"))來得到版本號,當(dāng)刪除和更新時都會生成與舊數(shù)據(jù)相同的版本號,同時通過-1的標(biāo)志位來實現(xiàn)折疊的效果,從而實現(xiàn)與關(guān)系型數(shù)據(jù)一樣的增刪改查操作。

實現(xiàn)效果如下:

相同版本號且標(biāo)志位相反的數(shù)據(jù)被折疊抵消了,實現(xiàn)了增刪改的操作。

如果更新操作僅修改了度量,可以通過變體sql的查詢方式實現(xiàn)折疊的效果,獲得最新數(shù)據(jù)。m(sign) > 0;

因為在定義version字段之后,VersionedCollapsingMergeTree會自動將version作為排序條件并增加到ORDER BY的末端,就上述的例子而言,最終的排序字段為ORDER BY pk_detail,version desc。

但是如果更新操作修改了度量之外的屬性信息,需要執(zhí)行:

SELECT pk_detail, sum(度量 * sign) FROM gl_detail GROUP BY pk_detail HAVING sum(sign) > ;

在實時數(shù)倉1.0架構(gòu)優(yōu)化后,架構(gòu)進行了極度簡化,實時數(shù)倉2.0架構(gòu)如下:

實時數(shù)倉2.0架構(gòu)

實時數(shù)倉2.0架構(gòu)統(tǒng)一了存儲、計算和查詢,分別由三個獨立產(chǎn)品負(fù)責(zé),分別是數(shù)據(jù)湖、Flink和Clickhouse。數(shù)倉分層存儲和維度表管理均由數(shù)據(jù)湖承擔(dān),F(xiàn)link SQL負(fù)責(zé)批流任務(wù)的SQL化協(xié)同開發(fā),ClickHouse實現(xiàn)變體的事務(wù)機制,為用戶提供離線分析和交互查詢。CDC到消息隊列這一鏈路將來是完全可以去掉的,只需要Flink CDC家族中再添加Oracle CDC一員。未來,實時數(shù)倉架構(gòu)將得到極致的簡化并且性能有質(zhì)的提升。


億信華辰助力數(shù)字中國新征程
億信華辰17年來,致力于為客戶提供一站式數(shù)據(jù)管理服務(wù),對政務(wù)信息化也有著深刻的理解和積累,豐富的技術(shù)和業(yè)務(wù)能力,為數(shù)字時代下信息化智慧創(chuàng)新發(fā)展增添助力。在2021年被IDC認(rèn)可為數(shù)據(jù)治理行業(yè)認(rèn)知引領(lǐng)型廠商,在政務(wù)數(shù)據(jù)治理領(lǐng)域取得了不俗的成績,在2022年被IDC認(rèn)可中國數(shù)據(jù)治理解決方案市場份額第一

在數(shù)據(jù)治理領(lǐng)域,億信華辰基于數(shù)據(jù)資產(chǎn)管理理念,打造的數(shù)據(jù)治理全功能產(chǎn)品睿治數(shù)據(jù)治理平臺,滿足用戶在數(shù)據(jù)治理領(lǐng)域的全場景需求。包含元數(shù)據(jù)管理數(shù)據(jù)標(biāo)準(zhǔn)管理、數(shù)據(jù)質(zhì)量管理、主數(shù)據(jù)管理、數(shù)據(jù)集成管理、數(shù)據(jù)交換管理、實時大數(shù)據(jù)管理、數(shù)據(jù)資產(chǎn)管理、數(shù)據(jù)安全管理、數(shù)據(jù)生命周期管理等十大產(chǎn)品模塊,可獨立或靈活組合使用,打通數(shù)據(jù)治理各個環(huán)節(jié),可快速滿足用戶各類不同的數(shù)據(jù)治理場景。

在數(shù)據(jù)分析領(lǐng)域,十多年技術(shù)沉淀,億信華辰首創(chuàng)一站式數(shù)據(jù)分析平臺ABI,融合了ETL數(shù)據(jù)處理、數(shù)據(jù)建模、數(shù)據(jù)可視化、數(shù)據(jù)分析、數(shù)據(jù)填報、移動應(yīng)用等核心功能,打通數(shù)據(jù)分析應(yīng)用全鏈路,實現(xiàn)數(shù)據(jù)填報、處理、分析一體化,以挖掘預(yù)測分析、大數(shù)據(jù)分析,人工智能分析為引領(lǐng),讓決策層直觀洞悉數(shù)據(jù)聯(lián)系,輔助智能決策。

在眾多實踐中,億信華辰有幸與廣州荔灣區(qū)政數(shù)局、佛山禪城區(qū)政數(shù)局、江蘇江陰大數(shù)據(jù)中心、四川涼山州政府、甘肅稅務(wù)局等各大政務(wù)部門緊緊圍繞政務(wù)服務(wù)“一網(wǎng)通辦”和辦事“只進一扇門”、“最多跑一次”等要求,加強政務(wù)信息系統(tǒng)整合共享,建設(shè)數(shù)據(jù)治理體系,推動智慧城市建設(shè),打造政務(wù)服務(wù)高地。

未來,億信華辰將以數(shù)字政府建設(shè)為契機,以我們豐富的數(shù)據(jù)治理經(jīng)驗為政府機構(gòu)提供專業(yè)的技術(shù)服務(wù),更好地挖掘數(shù)據(jù)的潛在價值,推動科技創(chuàng)新和數(shù)字經(jīng)濟發(fā)展。

(部分內(nèi)容來源網(wǎng)絡(luò),如有侵權(quán)請聯(lián)系刪除)
立即申請數(shù)據(jù)分析/數(shù)據(jù)治理產(chǎn)品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢