1. 實(shí)時(shí)需求日趨迫切
目前各大公司的產(chǎn)品需求和內(nèi)部決策對(duì)于數(shù)據(jù)實(shí)時(shí)性的要求越來(lái)越迫切, ?需要實(shí)時(shí)數(shù)倉(cāng)的能?來(lái)賦能 。傳統(tǒng)離 線數(shù)倉(cāng)的數(shù)據(jù)時(shí)效性是 T+1,調(diào)度頻率以天為單位,?法?撐實(shí)時(shí)場(chǎng)景的數(shù)據(jù)需求 。即使能將調(diào)度頻率設(shè)置成 ?時(shí),也只能解決部分時(shí)效性要求不高的場(chǎng)景,對(duì)于實(shí)效性要求很高的場(chǎng)景還是?法優(yōu)雅的?撐 。因此實(shí)時(shí)使 用數(shù)據(jù)的問(wèn)題必須得到有效解決。
2. 實(shí)時(shí)技術(shù)日趨成熟
實(shí)時(shí)計(jì)算框架已經(jīng)經(jīng)歷了三代發(fā)展,分別是:Storm 、SparkStreaming 、Flink,計(jì)算框架越來(lái)越成熟。
???, ?實(shí)時(shí)任務(wù)的開(kāi)發(fā)已經(jīng)能通過(guò)編寫(xiě) SQL 的?式來(lái)完成,在技術(shù)層?能很好地繼承離線數(shù)倉(cāng)的架構(gòu)設(shè)計(jì) 思想;
另??? ,在線數(shù)據(jù)開(kāi)發(fā)平臺(tái)所提供的功能對(duì)實(shí)時(shí)任務(wù)開(kāi)發(fā) 、調(diào)試 、運(yùn)維的?持也?漸趨于成熟, ?開(kāi)發(fā)成本逐 步降低,有助于去做這件事。
二、實(shí)時(shí)數(shù)倉(cāng)建設(shè)目的
1. 解決傳統(tǒng)數(shù)倉(cāng)的問(wèn)題
從目前數(shù)倉(cāng)建設(shè)的現(xiàn)狀來(lái)看, ?實(shí)時(shí)數(shù)倉(cāng)是?個(gè)容易讓?產(chǎn)生混淆的概念,根據(jù)傳統(tǒng)經(jīng)驗(yàn)分析,數(shù)倉(cāng)有?個(gè)重要 的功能, ?即能夠記錄歷史 。通常,數(shù)倉(cāng)都是希望從業(yè)務(wù)上線的第?天開(kāi)始有數(shù)據(jù),然后?直記錄到現(xiàn)在。
但實(shí)時(shí)流處理技術(shù), ??是強(qiáng)調(diào)當(dāng)前處理狀態(tài)的?個(gè)技術(shù),結(jié)合當(dāng)前?線大?的建設(shè)經(jīng)驗(yàn)和滴滴在該領(lǐng)域的建設(shè) 現(xiàn)狀,我們嘗試把公司內(nèi)實(shí)時(shí)數(shù)倉(cāng)建設(shè)的目的定位為, ?以數(shù)倉(cāng)建設(shè)理論和實(shí)時(shí)技術(shù),解決由于當(dāng)前離線數(shù)倉(cāng)數(shù) 據(jù)時(shí)效性低解決不了的問(wèn)題。
現(xiàn)階段我們要建設(shè)實(shí)時(shí)數(shù)倉(cāng)的主要原因是:
公司業(yè)務(wù)對(duì)于數(shù)據(jù)的實(shí)時(shí)性越來(lái)越迫切, ?需要有實(shí)時(shí)數(shù)據(jù)來(lái)輔助完成決策;
實(shí)時(shí)數(shù)據(jù)建設(shè)沒(méi)有規(guī)范,數(shù)據(jù)可用性較差,?法形成數(shù)倉(cāng)體系, ?資源大量浪費(fèi);
數(shù)據(jù)平臺(tái)?具對(duì)整體實(shí)時(shí)開(kāi)發(fā)的?持也?漸趨于成熟, ?開(kāi)發(fā)成本降低。
2. 實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用場(chǎng)景
實(shí)時(shí) OLAP 分析;
實(shí)時(shí)數(shù)據(jù)看板;
實(shí)時(shí)業(yè)務(wù)監(jiān)控;
實(shí)時(shí)數(shù)據(jù)接?服務(wù)。
三、實(shí)時(shí)數(shù)倉(cāng)建設(shè)方案
接下來(lái)我們分析下目前實(shí)時(shí)數(shù)倉(cāng)建設(shè)比較好的?個(gè)案例,希望這些案例能夠給?家?guī)?lái)?些啟發(fā)。
1. 滴滴順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)案例
滴滴數(shù)據(jù)團(tuán)隊(duì)建設(shè)的實(shí)時(shí)數(shù)倉(cāng),基本滿足了順風(fēng)車業(yè)務(wù)方在實(shí)時(shí)側(cè)的各類業(yè)務(wù)需求,初步建立起順風(fēng)車實(shí)時(shí)數(shù) 倉(cāng),完成了整體數(shù)據(jù)分層, ?包含明細(xì)數(shù)據(jù)和匯總數(shù)據(jù),統(tǒng)?了 DWD 層, ?降低了?數(shù)據(jù)資源消耗,提高了數(shù)據(jù)
復(fù)用性,可對(duì)外輸出豐富的數(shù)據(jù)服務(wù)。
數(shù)倉(cāng)具體架構(gòu)如下圖所示:

從數(shù)據(jù)架構(gòu)圖來(lái)看,順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)和對(duì)應(yīng)的離線數(shù)倉(cāng)有很多類似的地方 。例如分層結(jié)構(gòu);?比如 ODS 層, ?明 細(xì)層,匯總層, ?乃至應(yīng)用層,他們命名的模式可能都是?樣的 。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:
與離線數(shù)倉(cāng)相比, ?實(shí)時(shí)數(shù)倉(cāng)的層次更少?些
從目前建設(shè)離線數(shù)倉(cāng)的經(jīng)驗(yàn)來(lái)看,數(shù)倉(cāng)的數(shù)據(jù)明細(xì)層內(nèi)容會(huì)非常豐富,處理明細(xì)數(shù)據(jù)外?般還會(huì)包含輕度 匯總層的概念, ?另外離線數(shù)倉(cāng)中應(yīng)用層數(shù)據(jù)在數(shù)倉(cāng)內(nèi)部,但實(shí)時(shí)數(shù)倉(cāng)中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落?應(yīng)用系 統(tǒng)的存儲(chǔ)介質(zhì)中,可以把該層與數(shù)倉(cāng)的表分離;
應(yīng)用層少建設(shè)的好處:實(shí)時(shí)處理數(shù)據(jù)的時(shí)候,每建?個(gè)層次,數(shù)據(jù)必然會(huì)產(chǎn)生?定的延遲;
匯總層少建的好處:在匯總統(tǒng)計(jì)的時(shí)候,往往為了容忍?部分?jǐn)?shù)據(jù)的延遲,可能會(huì)?為的制造?些延遲來(lái) 保證數(shù)據(jù)的準(zhǔn)確 。舉例,在統(tǒng)計(jì)跨天相關(guān)的訂單事件中的數(shù)據(jù)時(shí),可能會(huì)等到 00?00?05 或者 00?00? 10
再統(tǒng)計(jì),確保 00?00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進(jìn)?統(tǒng)計(jì) 。所以,匯總層的層次太多的話,就會(huì)更 大的加重?為造成的數(shù)據(jù)延遲。
與離線數(shù)倉(cāng)相比, ?實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)源存儲(chǔ)不同
在建設(shè)離線數(shù)倉(cāng)的時(shí)候, ? 目前滴滴內(nèi)部整個(gè)離線數(shù)倉(cāng)都是建?在 Hive 表之上 。但是,在建設(shè)實(shí)時(shí)數(shù)倉(cāng)的 時(shí)候, ?同?份表,會(huì)使用不同的方式進(jìn)?存儲(chǔ) 。比如常?的情況下, ?明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會(huì)存在
Kafka 里? ,但是像城市 、渠道等維度信息需要借助 Hbase, ?mysql 或者其他 KV 存儲(chǔ)等數(shù)據(jù)庫(kù)來(lái)進(jìn)?存 儲(chǔ)。
接下來(lái),根據(jù)順?車實(shí)時(shí)數(shù)倉(cāng)架構(gòu)圖,對(duì)每?層建設(shè)做具體展開(kāi):
實(shí)時(shí)數(shù)倉(cāng)分層建設(shè)
1. ODS 貼源層建設(shè)
根據(jù)順?車具體場(chǎng)景, ? 目前順?車數(shù)據(jù)源主要包括訂單相關(guān)的 binlog 日志, ?冒泡和安全相關(guān)的 public 日志, 流量相關(guān)的埋點(diǎn)日志等。
這些數(shù)據(jù)部分已采集寫(xiě)? kafka 或 ddmq 等數(shù)據(jù)通道中,部分?jǐn)?shù)據(jù)需要借助內(nèi)部自研同步?具完成采集, ?最終 基于順?車數(shù)倉(cāng) ods 層建設(shè)規(guī)范分主題統(tǒng)?寫(xiě)? kafka 存儲(chǔ)介質(zhì)中。
命名規(guī)范:ODS 層實(shí)時(shí)數(shù)據(jù)源主要包括兩種。
?種是在離線采集時(shí)已經(jīng)自動(dòng)生產(chǎn)的 DDMQ 或者是 Kafkatopic, ?這類型的數(shù)據(jù)命名方式為采集系統(tǒng)自動(dòng)
生成規(guī)范為:cn-binlog-數(shù)據(jù)庫(kù)名-數(shù)據(jù)庫(kù)名 eg:? cn-binlog-ihap_fangyuan-ihap_fangyuan ??種是需要自?進(jìn)?采集同步到 kafkatopic 中,生產(chǎn)的 topic 命名規(guī)范同離線類似:ODS 層采 用:? realtime_ods_binlog_{源系統(tǒng)庫(kù)/表名}/ods_log_{日志名} eg:
realtime_ods_binlog_ihap_fangyuan
2. DWD 明細(xì)層建設(shè)
根據(jù)順?車業(yè)務(wù)過(guò)程作為建模驅(qū)動(dòng),基于每個(gè)具體的業(yè)務(wù)過(guò)程特點(diǎn),構(gòu)建最細(xì)粒度的明細(xì)層事實(shí)表;結(jié)合順? 車分析師在離線側(cè)的數(shù)據(jù)使用特點(diǎn),將明細(xì)事實(shí)表的某些重要維度屬性字段做適當(dāng)冗余,完成寬表化處理, ?之 后基于當(dāng)前順?車業(yè)務(wù)方對(duì)實(shí)時(shí)數(shù)據(jù)的需求重點(diǎn), ?重點(diǎn)建設(shè)交易 、財(cái)務(wù) 、體驗(yàn) 、安全 、流量等?大模塊;該層 的數(shù)據(jù)來(lái)源于 ODS 層,通過(guò)大數(shù)據(jù)架構(gòu)提供的 Stream SQL 完成 ETL ?作,對(duì)于 binlog 日志的處理主要進(jìn)? 簡(jiǎn)單的數(shù)據(jù)清洗 、處理數(shù)據(jù)漂移和數(shù)據(jù)亂序, ?以及可能對(duì)多個(gè) ODS 表進(jìn)? Stream Join,對(duì)于流量日志主要是 做通用的 ETL 處理和針對(duì)順?車場(chǎng)景的數(shù)據(jù)過(guò)濾,完成?結(jié)構(gòu)化數(shù)據(jù)的結(jié)構(gòu)化處理和數(shù)據(jù)的分流;該層的數(shù)據(jù) 除了存儲(chǔ)在消息隊(duì)列 Kafka 中,通常也會(huì)把數(shù)據(jù)實(shí)時(shí)寫(xiě)? Druid 數(shù)據(jù)庫(kù)中,供查詢明細(xì)數(shù)據(jù)和作為簡(jiǎn)單匯總數(shù) 據(jù)的加?數(shù)據(jù)源。
命名規(guī)范:DWD 層的表命名使用英文?寫(xiě)字母, ?單詞之間用下劃線分開(kāi),總?度不能超過(guò) 40 個(gè)字符,并且應(yīng) 遵循下述規(guī)則:? realtime_dwd_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫(xiě)}_ [{業(yè)務(wù)過(guò)程縮寫(xiě)}]_ [{自定義表命名標(biāo)簽縮寫(xiě)}]
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{數(shù)據(jù)域縮寫(xiě)}:參考數(shù)據(jù)域劃分部分
{自定義表命名標(biāo)簽縮寫(xiě)}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該
準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義
樣例:realtime_dwd_trip_trd_order_base
3. DIM 層
公共維度層,基于維度建模理念思想, ?建?整個(gè)業(yè)務(wù)過(guò)程的?致性維度, ?降低數(shù)據(jù)計(jì)算?徑和算法不統(tǒng)? ?險(xiǎn);
DIM 層數(shù)據(jù)來(lái)源于兩部分:?部分是 Flink 程序?qū)崟r(shí)處理 ODS 層數(shù)據(jù)得到, ?另外?部分是通過(guò)離線任務(wù) 出倉(cāng)得到;
DIM 層維度數(shù)據(jù)主要使用 MySQL 、Hbase 、fusion(滴滴自研 KV 存儲(chǔ)) 三種存儲(chǔ)引擎,對(duì)于維表數(shù)據(jù)比較 少的情況可以使用 MySQL,對(duì)于單條數(shù)據(jù)大?比較? ,查詢 QPS 比較高的情況,可以使用 fusion 存
儲(chǔ), ?降低機(jī)器內(nèi)存資源占用,對(duì)于數(shù)據(jù)量比較大,對(duì)維表數(shù)據(jù)變化不是特別敏感的場(chǎng)景,可以使用 HBase 存儲(chǔ)。
命名規(guī)范:DIM 層的表命名使用英??寫(xiě)字母, ?單詞之間用下劃線分開(kāi),總?度不能超過(guò) 30 個(gè)字符,并且應(yīng) 遵循下述規(guī)則:? dim_{業(yè)務(wù)/pub}_{維度定義}[_{自定義命名標(biāo)簽}] :
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{維度定義}:參考維度命名
{自定義表命名標(biāo)簽縮寫(xiě)}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該
準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義
樣例:dim_trip_dri_base
4. DWM 匯總層建設(shè)
在建設(shè)順?車實(shí)時(shí)數(shù)倉(cāng)的匯總層的時(shí)候,跟順?車離線數(shù)倉(cāng)有很多?樣的地? ,但其具體技術(shù)實(shí)現(xiàn)會(huì)存在很大 不同。
第?:?對(duì)于?些共性指標(biāo)的加?, ?比如 pv, ?uv,訂單業(yè)務(wù)過(guò)程指標(biāo)等,我們會(huì)在匯總層進(jìn)?統(tǒng)?的運(yùn)算,確 ?保關(guān)于指標(biāo)的?徑是統(tǒng)?在?個(gè)固定的模型中完成 。對(duì)于?些個(gè)性指標(biāo),從指標(biāo)復(fù)用性的?度出發(fā),確定唯? 的時(shí)間字段, ?同時(shí)該字段盡可能與其他指標(biāo)在時(shí)間維度上完成拉齊,例如?中異常訂單數(shù)需要與交易域指標(biāo)在 事件時(shí)間上做到拉齊。
第?:在順?車匯總層建設(shè)中, ?需要進(jìn)?多維的主題匯總, ?因?yàn)閷?shí)時(shí)數(shù)倉(cāng)本身是?向主題的,可能每個(gè)主題會(huì) 關(guān)?的維度都不?樣,所以需要在不同的主題下, ?按照這個(gè)主題關(guān)?的維度對(duì)數(shù)據(jù)進(jìn)?匯總, ?最后來(lái)算業(yè)務(wù)? 需要的匯總指標(biāo) 。在具體操作中,對(duì)于 pv 類指標(biāo)使用 Stream SQL 實(shí)現(xiàn) 1 分鐘匯總指標(biāo)作為最?匯總單位指 標(biāo),在此基礎(chǔ)上進(jìn)?時(shí)間維度上的指標(biāo)累加;對(duì)于 uv 類指標(biāo)直接使用druid 數(shù)據(jù)庫(kù)作為指標(biāo)匯總?cè)萜鳎鶕?jù) 業(yè)務(wù)?對(duì)匯總指標(biāo)的及時(shí)性和準(zhǔn)確性的要求, ?實(shí)現(xiàn)相應(yīng)的精確去重和?精確去重。
第三:?匯總層建設(shè)過(guò)程中, ?還會(huì)涉及到衍生維度的加? 。在順?車券相關(guān)的匯總指標(biāo)加?中我們使用 Hbase 的版本機(jī)制來(lái)構(gòu)建?個(gè)衍生維度的拉鏈表,通過(guò)事件流和 Hbase 維表關(guān)聯(lián)的?式得到實(shí)時(shí)數(shù)據(jù)當(dāng)時(shí)的準(zhǔn)確維 度
命名規(guī)范:DWM 層的表命名使用英??寫(xiě)字母, ?單詞之間用下劃線分開(kāi),總?度不能超過(guò) 40 個(gè)字符,并且 應(yīng)遵循下述規(guī)則:? realtime_dwm_{業(yè)務(wù)/pub}_{數(shù)據(jù)域縮寫(xiě)}_{數(shù)據(jù)主粒度縮寫(xiě)}_ [{自定義表命名標(biāo)簽縮寫(xiě)}]_{統(tǒng)計(jì)時(shí) 間周期范圍縮寫(xiě)} :
{業(yè)務(wù)/pub}:參考業(yè)務(wù)命名
{數(shù)據(jù)域縮寫(xiě)}:參考數(shù)據(jù)域劃分部分
{數(shù)據(jù)主粒度縮寫(xiě)}:指數(shù)據(jù)主要粒度或數(shù)據(jù)域的縮寫(xiě),也是聯(lián)合主鍵中的主要維度
{自定義表命名標(biāo)簽縮寫(xiě)}:實(shí)體名稱可以根據(jù)數(shù)據(jù)倉(cāng)庫(kù)轉(zhuǎn)換整合后做?定的業(yè)務(wù)抽象的名稱,該名稱應(yīng)該 準(zhǔn)確表述實(shí)體所代表的業(yè)務(wù)含義
{統(tǒng)計(jì)時(shí)間周期范圍縮寫(xiě)}:1d:天增量;td:天累計(jì)(全量);1h:小時(shí)增量;th:小時(shí)累計(jì)(全量);1min:分鐘增 量;tmin:分鐘累計(jì)(全量)
樣例:? realtime_dwm_trip_trd_pas_bus_accum_1min
5. APP 應(yīng)用層
該層主要的工作是把實(shí)時(shí)匯總數(shù)據(jù)寫(xiě)?應(yīng)用系統(tǒng)的數(shù)據(jù)庫(kù)中, ?包括用于大屏顯示和實(shí)時(shí) OLAP 的 Druid 數(shù)據(jù)庫(kù) (該數(shù)據(jù)庫(kù)除了寫(xiě)?應(yīng)用數(shù)據(jù),也可以寫(xiě)?明細(xì)數(shù)據(jù)完成匯總指標(biāo)的計(jì)算)中,用于實(shí)時(shí)數(shù)據(jù)接?服務(wù)的 Hbase 數(shù)據(jù)庫(kù),用于實(shí)時(shí)數(shù)據(jù)產(chǎn)品的 mysql 或者 redis 數(shù)據(jù)庫(kù)中。
命名規(guī)范:基于實(shí)時(shí)數(shù)倉(cāng)的特殊性不做硬性要求。
順風(fēng)車實(shí)時(shí)數(shù)倉(cāng)建設(shè)成果
截止目前,? 共為順風(fēng)車業(yè)務(wù)線建立了增長(zhǎng) 、交易 、體驗(yàn) 、安全 、財(cái)務(wù)五大模塊,涉及 40+ 的實(shí)時(shí)看板,涵蓋 順風(fēng)車全部核?業(yè)務(wù)過(guò)程, ?實(shí)時(shí)和離線數(shù)據(jù)誤差<0.5%, ?是順風(fēng)車業(yè)務(wù)線數(shù)據(jù)分析?面的有利補(bǔ)充,為順風(fēng)車 當(dāng)天發(fā)券動(dòng)態(tài)策略調(diào)整, ?司乘安全相關(guān)監(jiān)控, ?實(shí)時(shí)訂單趨勢(shì)分析等提供了實(shí)時(shí)數(shù)據(jù)?持,提高了決策的時(shí)效 ? ? 性。
同時(shí)建立在數(shù)倉(cāng)模型之上的實(shí)時(shí)指標(biāo)能根據(jù)用戶需求及時(shí)完成?徑變更和實(shí)時(shí)離線數(shù)據(jù)?致性校驗(yàn),大大提高 了實(shí)時(shí)指標(biāo)的開(kāi)發(fā)效率和實(shí)時(shí)數(shù)據(jù)的準(zhǔn)確性,也為公司內(nèi)部大范圍建設(shè)實(shí)時(shí)數(shù)倉(cāng)提供了有?的理論和實(shí)踐? ? 持。
2. 快手實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景化案例


1. 目標(biāo)
首先由于是做數(shù)倉(cāng), ?因此希望所有的實(shí)時(shí)指標(biāo)都有離線指標(biāo)去對(duì)應(yīng),要求實(shí)時(shí)指標(biāo)和離線指標(biāo)整體的數(shù)據(jù) 差異在 1% 以內(nèi), ?這是最低標(biāo)準(zhǔn)。
其次是數(shù)據(jù)延遲,其 SLA 標(biāo)準(zhǔn)是活動(dòng)期間所有核?報(bào)表場(chǎng)景的數(shù)據(jù)延遲不能超過(guò) 5 分鐘, ?這 5 分鐘包括 作業(yè)掛掉之后和恢復(fù)時(shí)間,如果超過(guò)則意味著 SLA 不達(dá)標(biāo)。
最后是穩(wěn)定性,針對(duì)?些場(chǎng)景, ?比如作業(yè)重啟后,我們的曲線是正常的,不會(huì)因?yàn)樽鳂I(yè)重啟導(dǎo)致指標(biāo)產(chǎn)出 ?些明顯的異常。
2. 難點(diǎn)
第?個(gè)難點(diǎn)是數(shù)據(jù)量大 。每天整體的?口流量數(shù)據(jù)量級(jí)大概在萬(wàn)億級(jí) 。在活動(dòng)如春晚的場(chǎng)景, ?QPS 峰值能 達(dá)到億 / 秒。
第?個(gè)難點(diǎn)是組件依賴比較復(fù)雜 。可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink, ?還有?些依賴 KV 存儲(chǔ) 、RPC 接口 、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
第三個(gè)難點(diǎn)是鏈路復(fù)雜 。目前我們有 200+ 核?業(yè)務(wù)作業(yè), ?50+ 核?數(shù)據(jù)源,整體作業(yè)超過(guò) 1000。
2) 實(shí)時(shí)數(shù)倉(cāng) - 分層模型
基于上面三個(gè)難點(diǎn),來(lái)看?下數(shù)倉(cāng)架構(gòu):

如上所示:
最下層有三個(gè)不同的數(shù)據(jù)源,分別是客戶端日志 、服務(wù)端日志以及 Binlog 日志;
在公共基礎(chǔ)層分為兩個(gè)不同的層次,?個(gè)是 DWD 層,做明細(xì)數(shù)據(jù), ?另?個(gè)是 DWS 層,做公共聚合數(shù)
據(jù), ?DIM 是我們常說(shuō)的維度 。我們有?個(gè)基于離線數(shù)倉(cāng)的主題預(yù)分層, ?這個(gè)主題預(yù)分層可能包括流量 、用 戶 、設(shè)備 、視頻的生產(chǎn)消費(fèi) 、風(fēng)控 、社交等。
DWD 層的核?工作是標(biāo)準(zhǔn)化的清洗;
DWS 層是把維度的數(shù)據(jù)和 DWD 層進(jìn)行關(guān)聯(lián),關(guān)聯(lián)之后生成?些通用粒度的聚合層次。
再往上是應(yīng)用層, ?包括?些大盤(pán)的數(shù)據(jù), ?多維分析的模型以及業(yè)務(wù)專題數(shù)據(jù);
最上面是場(chǎng)景。
整體過(guò)程可以分為三步:
第?步是做業(yè)務(wù)數(shù)據(jù)化,相當(dāng)于把業(yè)務(wù)的數(shù)據(jù)接進(jìn)來(lái);
第?步是數(shù)據(jù)資產(chǎn)化,意思是對(duì)數(shù)據(jù)做很多的清洗,然后形成?些規(guī)則有序的數(shù)據(jù);
第三步是數(shù)據(jù)業(yè)務(wù)化,可以理解數(shù)據(jù)在實(shí)時(shí)數(shù)據(jù)層面可以反哺業(yè)務(wù),為業(yè)務(wù)數(shù)據(jù)價(jià)值建設(shè)提供?些賦能。
3) 實(shí)時(shí)數(shù)倉(cāng) - 保障措施
基于上面的分層模型,來(lái)看?下整體的保障措施:

保障層面分為三個(gè)不同的部分,分別是質(zhì)量保障, ?時(shí)效保障以及穩(wěn)定保障。
我們先看藍(lán)色部分的質(zhì)量保障 。針對(duì)質(zhì)量保障,可以看到在數(shù)據(jù)源階段,做了如數(shù)據(jù)源的亂序監(jiān)控, ?這是
我們基于自?的 SDK 的采集做的, ?以及數(shù)據(jù)源和離線的?致性校準(zhǔn) 。研發(fā)階段的計(jì)算過(guò)程有三個(gè)階段, 分別是研發(fā)階段 、上線階段和服務(wù)階段。
研發(fā)階段可能會(huì)提供?個(gè)標(biāo)準(zhǔn)化的模型,基于這個(gè)模型會(huì)有?些 Benchmark,并且做離線的比對(duì)驗(yàn)
證,保證質(zhì)量是?致的;
上線階段更多的是服務(wù)監(jiān)控和指標(biāo)監(jiān)控;
在服務(wù)階段,如果出現(xiàn)?些異常情況,先做 Flink 狀態(tài)拉起,如果出現(xiàn)了?些不符合預(yù)期的場(chǎng)景,我
們會(huì)做離線的整體數(shù)據(jù)修復(fù)。
第?個(gè)是時(shí)效性保障 。針對(duì)數(shù)據(jù)源,我們把數(shù)據(jù)源的延遲情況也納?監(jiān)控 。在研發(fā)階段其實(shí)還有兩個(gè)事
情:
首先是壓測(cè), ?常規(guī)的任務(wù)會(huì)拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務(wù)延遲的情況; ?通過(guò)壓測(cè)之后,會(huì)有?些任務(wù)上線和重啟性能評(píng)估,相當(dāng)于按照 CP 恢復(fù)之后, ?重啟的性能是什么樣
? 。
最后?個(gè)是穩(wěn)定保障, ?這在大型活動(dòng)中會(huì)做得比較多, ?比如切換演練和分級(jí)保障 。我們會(huì)基于之前的壓測(cè)
結(jié)果做限流, ? 目的是保障作業(yè)在超過(guò)極限的情況下, ?仍然是穩(wěn)定的,不會(huì)出現(xiàn)很多的不穩(wěn)定或者 CP 失敗 的情況 。之后我們會(huì)有兩種不同的標(biāo)準(zhǔn),? 種是冷備雙機(jī)房, ?另外?種是熱備雙機(jī)房。
冷備雙機(jī)房是:?當(dāng)?個(gè)單機(jī)房掛掉,我們會(huì)從另?個(gè)機(jī)房去拉起;
熱備雙機(jī)房:相當(dāng)于同樣?份邏輯在兩個(gè)機(jī)房各部署?次。
以上就是我們整體的保障措施。
4) 快手場(chǎng)景問(wèn)題及解決方案
1. PV/UV 標(biāo)準(zhǔn)化
1.1 場(chǎng)景
第?個(gè)問(wèn)題是 PV/UV 標(biāo)準(zhǔn)化, ?這里有三個(gè)截圖:

第?張圖是春晚活動(dòng)的預(yù)熱場(chǎng)景,相當(dāng)于是?種玩法,第二和第三張圖是春晚當(dāng)天的發(fā)紅包活動(dòng)和直播間截 圖。
在活動(dòng)進(jìn)行過(guò)程中,我們發(fā)現(xiàn) 60~70% 的需求是計(jì)算頁(yè)面里的信息,如:
這個(gè)頁(yè)面來(lái)了多少人,或者有多少人點(diǎn)擊進(jìn)入這個(gè)頁(yè)面;
活動(dòng)?共來(lái)了多少人;
頁(yè)面里的某?個(gè)掛件,獲得了多少點(diǎn)擊 、產(chǎn)生了多少曝光。
1.2 方案
抽象?下這個(gè)場(chǎng)景就是下面這種 SQL:

簡(jiǎn)單來(lái)說(shuō),就是從?張表做篩選條件,然后按照維度層面做聚合,接著產(chǎn)生?些 Count 或者 Sum 操作。基于這種場(chǎng)景,我們最開(kāi)始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機(jī)制,從 Source 數(shù)據(jù)源取數(shù)據(jù), ?之后做了 DID 的分桶 。比如最開(kāi)始紫色 的部分按這個(gè)做分桶,先做分桶的原因是防止某?個(gè) DID 存在熱點(diǎn)的問(wèn)題 。分桶之后會(huì)有?個(gè)叫做 Local ? ? ?Window Agg 的東西,相當(dāng)于數(shù)據(jù)分完桶之后把相同類型的數(shù)據(jù)相加 。Local Window Agg 之后再按照維度進(jìn) 行 Global Window Agg 的合桶,合桶的概念相當(dāng)于按照維度計(jì)算出最終的結(jié)果 。Early Fire 機(jī)制相當(dāng)于在 ? ? ?Local Window Agg 開(kāi)?個(gè)天級(jí)的窗? ,然后每分鐘去對(duì)外輸出?次。
這個(gè)過(guò)程中我們遇到了?些問(wèn)題,如上圖左下角所示。
在代碼正常運(yùn)行的情況下是沒(méi)有問(wèn)題的,但如果整體數(shù)據(jù)存在延遲或者追溯歷史數(shù)據(jù)的情況, ?比如?分鐘 ? ? ?Early Fire ?次, ?因?yàn)樽匪輾v史的時(shí)候數(shù)據(jù)量會(huì)比較大,所以可能導(dǎo)致 14?00 追溯歷史, ?直接讀到了 14?02 的 數(shù)據(jù),而 14?01 的那個(gè)點(diǎn)就被丟掉了,丟掉了以后會(huì)發(fā)生什么?

在這種場(chǎng)景下, ?圖中上方的曲線為 Early Fire 回溯歷史數(shù)據(jù)的結(jié)果 。橫坐標(biāo)是分鐘,縱坐標(biāo)是截止到當(dāng)前時(shí)刻 的頁(yè)面 UV,我們發(fā)現(xiàn)有些點(diǎn)是橫著的,意味著沒(méi)有數(shù)據(jù)結(jié)果,然后?個(gè)陡增,然后?橫著的,接著??個(gè)陡 增,而這個(gè)曲線的預(yù)期結(jié)果其實(shí)是圖中下方那種平滑的曲線。
為了解決這個(gè)問(wèn)題,我們用到了 Cumulate Window 的解決方案, ?這個(gè)解決方案在 Flink 1.13 版本里也有涉 及,其原理是?樣的。

數(shù)據(jù)開(kāi)?個(gè)大的天級(jí)窗口,大窗口下又開(kāi)了?個(gè)小的分鐘級(jí)窗口,數(shù)據(jù)按數(shù)據(jù)本身的 Row Time 落到分鐘級(jí)窗 口。
Watermark 推進(jìn)過(guò)了窗口的 event_time, ?它會(huì)進(jìn)行?次下發(fā)的觸發(fā),通過(guò)這種方式可以解決回溯的問(wèn) 題,數(shù)據(jù)本身落在真實(shí)的窗口, ? Watermark 推進(jìn),在窗口結(jié)束后觸發(fā)。
此外, ?這種方式在?定程度上能夠解決亂序的問(wèn)題 。比如它的亂序數(shù)據(jù)本身是?個(gè)不丟棄的狀態(tài),會(huì)記錄 到最新的累計(jì)數(shù)據(jù)。
最后是語(yǔ)義?致性, ?它會(huì)基于事件時(shí)間,在亂序不嚴(yán)重的情況下, ?和離線計(jì)算出來(lái)的結(jié)果?致性是相當(dāng)高 的。
2. DAU 計(jì)算
2.1 背景介紹
下面介紹?下 DAU 計(jì)算:

我們對(duì)于整個(gè)大盤(pán)的活躍設(shè)備 、新增設(shè)備和回流設(shè)備有比較多的監(jiān)控。
活躍設(shè)備指的是當(dāng)天來(lái)過(guò)的設(shè)備;
新增設(shè)備指的是當(dāng)天來(lái)過(guò)且歷史沒(méi)有來(lái)過(guò)的設(shè)備;
回流設(shè)備指的是當(dāng)天來(lái)過(guò)且 N 天內(nèi)沒(méi)有來(lái)過(guò)的設(shè)備。
但是我們計(jì)算過(guò)程之中可能需要 5~8 個(gè)這樣不同的 Topic 去計(jì)算這?個(gè)指標(biāo)。
我們看?下離線過(guò)程中,邏輯應(yīng)該怎么算。
首先我們先算活躍設(shè)備,把這些合并到?起,然后做?個(gè)維度下的天級(jí)別去重,接著再去關(guān)聯(lián)維度表, ?這個(gè)維 度表包括設(shè)備的首末次時(shí)間,就是截止到昨天設(shè)備首次訪問(wèn)和末次訪問(wèn)的時(shí)間。
得到這個(gè)信息之后,我們就可以進(jìn)?邏輯計(jì)算,然后我們會(huì)發(fā)現(xiàn)新增和回流的設(shè)備其實(shí)是活躍設(shè)備里打的?個(gè) ?標(biāo)簽 。新增設(shè)備就是做了?個(gè)邏輯處理, ?回流設(shè)備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能 否簡(jiǎn)單地寫(xiě)?個(gè) SQL 去解決這個(gè)問(wèn)題?
其實(shí)我們最開(kāi)始是這么做的,但遇到了?些問(wèn)題:
第?個(gè)問(wèn)題是:數(shù)據(jù)源是 6~8 個(gè), ?而且我們大盤(pán)的?徑經(jīng)常會(huì)做微調(diào),如果是單作業(yè)的話,每次微調(diào)的過(guò) 程之中都要改, ?單作業(yè)的穩(wěn)定性會(huì)非常差;
第?個(gè)問(wèn)題是:數(shù)據(jù)量是萬(wàn)億級(jí), ?這會(huì)導(dǎo)致兩個(gè)情況, ?首先是這個(gè)量級(jí)的單作業(yè)穩(wěn)定性非常差,其次是實(shí) 時(shí)關(guān)聯(lián)維表的時(shí)候用的 KV 存儲(chǔ),任何?個(gè)這樣的 RPC 服務(wù)接? ,都不可能在萬(wàn)億級(jí)數(shù)據(jù)量的場(chǎng)景下保證 服務(wù)穩(wěn)定性;
第三個(gè)問(wèn)題是:我們對(duì)于時(shí)延要求比較高,要求時(shí)延?于?分鐘 。整個(gè)鏈路要避免批處理,如果出現(xiàn)了? 些任務(wù)性能的單點(diǎn)問(wèn)題,我們還要保證高性能和可擴(kuò)容。
2.2 技術(shù)方案
針對(duì)以上問(wèn)題,介紹?下我們是怎么做的:

如上圖的例? ,第?步是對(duì) A B C 這三個(gè)數(shù)據(jù)源,先按照維度和 DID 做分鐘級(jí)別去重,分別去重之后得到三個(gè) 分鐘級(jí)別去重的數(shù)據(jù)源,接著把它們 Union 到?起,然后再進(jìn)行同樣的邏輯操作。
這相當(dāng)于我們數(shù)據(jù)源的??從萬(wàn)億變到了百億的級(jí)別,分鐘級(jí)別去重之后再進(jìn)行?個(gè)天級(jí)別的去重,產(chǎn)生的數(shù) 據(jù)源就可以從百億變成了??億的級(jí)別。
在??億級(jí)別數(shù)據(jù)量的情況下, ?我們?cè)偃リP(guān)聯(lián)數(shù)據(jù)服務(wù)化, ?這就是?種比較可行的狀態(tài),相當(dāng)于去關(guān)聯(lián)用戶畫(huà) 像的 RPC 接? ,得到 RPC 接?之后, ?最終寫(xiě)?到了目標(biāo) Topic。這個(gè)目標(biāo) Topic 會(huì)導(dǎo)?到 OLAP 引擎,供給
多個(gè)不同的服務(wù), ?包括移動(dòng)版服務(wù),大屏服務(wù),指標(biāo)看板服務(wù)等。
這個(gè)?案有三個(gè)?面的優(yōu)勢(shì),分別是穩(wěn)定性 、時(shí)效性和準(zhǔn)確性。
首先是穩(wěn)定性 。松耦合可以簡(jiǎn)單理解為當(dāng)數(shù)據(jù)源 A 的邏輯和數(shù)據(jù)源 B 的邏輯需要修改時(shí),可以單獨(dú)修改。
第?是任務(wù)可擴(kuò)容, ?因?yàn)槲覀儼阉羞壿嫴鸱值梅浅<?xì)粒度, ?當(dāng)?些地?出現(xiàn)了如流量問(wèn)題,不會(huì)影響后 面的部分,所以它擴(kuò)容比較簡(jiǎn)單, ?除此之外還有服務(wù)化后置和狀態(tài)可控。
其次是時(shí)效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。最后是準(zhǔn)確性,我們?持?jǐn)?shù)據(jù)驗(yàn)證 、實(shí)時(shí)監(jiān)控 、模型出?統(tǒng)?等。
此時(shí)我們遇到了另外?個(gè)問(wèn)題 - 亂序 。對(duì)于上?三個(gè)不同的作業(yè), ?每?個(gè)作業(yè)重啟至少會(huì)有兩分鐘左右的延 遲,延遲會(huì)導(dǎo)致下游的數(shù)據(jù)源 Union 到?起就會(huì)有亂序。
2.3 延遲計(jì)算?案
遇到上面這種有亂序的情況下, ?我們要怎么處理?

我們總共有三種處理方案:
第?種解決方案是用“did + 維度 + 分鐘”進(jìn)行去重,Value 設(shè)為“是否來(lái)過(guò)”。比如同?個(gè) did, ?04:01 來(lái)
了?條, ?它會(huì)進(jìn)行結(jié)果輸出 。同樣的, ?04:02 和 04?04 也會(huì)進(jìn)行結(jié)果輸出 。但如果 04:01 再來(lái), ?它就會(huì) 丟棄,但如果 04?00 來(lái),依舊會(huì)進(jìn)行結(jié)果輸出。
這個(gè)解決方案存在?些問(wèn)題, ?因?yàn)槲覀儼捶昼姶妫?20 分鐘的狀態(tài)大?是存 10 分鐘的兩倍,到后面這個(gè) 狀態(tài)大?有點(diǎn)不太可控, ?因此我們?換了解決方案 2。
第?種解決方案,我們的做法會(huì)涉及到?個(gè)假設(shè)前提,就是假設(shè)不存在數(shù)據(jù)源亂序的情況 。在這種情況 下, ?key 存的是“did + 維度”,Value 為“時(shí)間戳”, ?它的更新方式如上圖所示。
04?01 來(lái)了?條數(shù)據(jù), ?進(jìn)行結(jié)果輸出 。04:02 來(lái)了?條數(shù)據(jù),如果是同?個(gè) did,那么它會(huì)更新時(shí)間戳, ?然后仍然做結(jié)果輸出 。04:04 也是同樣的邏輯,然后將時(shí)間戳更新到 04:04,如果后面來(lái)了?條 04:
01 的數(shù)據(jù), ?它發(fā)現(xiàn)時(shí)間戳已經(jīng)更新到 04?04, ?它會(huì)丟棄這條數(shù)據(jù)。
這樣的做法大幅度減少了本身所需要的?些狀態(tài),但是對(duì)亂序是零容忍,不允許發(fā)生任何亂序的情況, ?由 于我們不好解決這個(gè)問(wèn)題, ?因此我們?想出了解決方案 3。
方案 3 是在方案 2 時(shí)間戳的基礎(chǔ)之上, ?加了?個(gè)類似于環(huán)形緩沖區(qū),在緩沖區(qū)之內(nèi)允許亂序。
比如 04?01 來(lái)了?條數(shù)據(jù), ?進(jìn)行結(jié)果輸出;04?02 來(lái)了?條數(shù)據(jù), ?它會(huì)把時(shí)間戳更新到 04?02,并且會(huì)記 錄同?個(gè)設(shè)備在 04?01 也來(lái)過(guò) 。如果 04?04 再來(lái)了?條數(shù)據(jù),就按照相應(yīng)的時(shí)間差做?個(gè)位移, ?最后通 ?過(guò)這樣的邏輯去保障它能夠容忍?定的亂序。
綜合來(lái)看這三個(gè)方案:
方案 1 在容忍 16 分鐘亂序的情況下, ?單作業(yè)的狀態(tài)大?在 480G 左右 。這種情況雖然保證了準(zhǔn)確性,但 是作業(yè)的恢復(fù)和穩(wěn)定性是完全不可控的狀態(tài), ?因此我們還是放棄了這個(gè)方案;
方案 2 是 30G 左右的狀態(tài)大? ,對(duì)于亂序 0 容忍,但是數(shù)據(jù)不準(zhǔn)確, ?由于我們對(duì)準(zhǔn)確性的要求非常高, 因此也放棄了這個(gè)方案;
方案 3 的狀態(tài)跟方案 1 相比, ?它的狀態(tài)雖然變化了但是增加的不多,而且整體能達(dá)到跟方案 1 同樣的效
果 。方案 3 容忍亂序的時(shí)間是 16 分鐘,我們正常更新?個(gè)作業(yè)的話, ?10 分鐘完全足夠重啟, ?因此最終選 擇了方案 3。
3. 運(yùn)營(yíng)場(chǎng)景
3.1 背景介紹

運(yùn)營(yíng)場(chǎng)景可分為四個(gè)部分:
第?個(gè)是數(shù)據(jù)大屏支持, ?包括單直播間的分析數(shù)據(jù)和大盤(pán)的分析數(shù)據(jù), ?需要做到分鐘級(jí)延遲,更新要求比
較高;
第?個(gè)是直播看板支持, ?直播看板的數(shù)據(jù)會(huì)有特定維度的分析,特定?群支持,對(duì)維度豐富性要求比較
高;
第三個(gè)是數(shù)據(jù)策略榜單, ?這個(gè)榜單主要是預(yù)測(cè)熱門作品 、爆款,要求的是?時(shí)級(jí)別的數(shù)據(jù),更新要求比較
低;
第四個(gè)是 C 端實(shí)時(shí)指標(biāo)展示,查詢量比較大,但是查詢模式比較固定。
下面進(jìn)行分析這 4 種不同的狀態(tài)產(chǎn)生的?些不同的場(chǎng)景。

前 3 種基本沒(méi)有什么差別, ?只是在查詢模式上, ?有的是特定業(yè)務(wù)場(chǎng)景,有的是通用業(yè)務(wù)場(chǎng)景。
針對(duì)第 3 種和第 4 種, ?它對(duì)于更新的要求比較低,對(duì)于吞吐的要求比較高,過(guò)程之中的曲線也不要求有?致 性 。第 4 種查詢模式更多的是單實(shí)體的?些查詢, ?比如去查詢內(nèi)容,會(huì)有哪些指標(biāo),而且對(duì) QPS 要求比較 ?高。
3.2 技術(shù)方案
針對(duì)上方 4 種不同的場(chǎng)景,我們是如何去做的?

首先看?下基礎(chǔ)明細(xì)層 (圖中左方),數(shù)據(jù)源有兩條鏈路,其中?條鏈路是消費(fèi)的流, ?比如直播的消費(fèi)信 息, ?還有觀看 / 點(diǎn)贊 / 評(píng)論 。經(jīng)過(guò)?輪基礎(chǔ)清洗,然后做維度管理 。上游的這些維度信息來(lái)源于 Kafka,
Kafka 寫(xiě)?了?些內(nèi)容的維度,放到了 KV 存儲(chǔ)里邊, ?包括?些用戶的維度。
這些維度關(guān)聯(lián)了之后, ?最終寫(xiě)? Kafka 的 DWD 事實(shí)層, ?這里為了做性能的提升,我們做了?級(jí)緩存的操 作。
如圖中上? ,我們讀取 DWD 層的數(shù)據(jù)然后做基礎(chǔ)匯總,核?是窗?維度聚合生成 4 種不同粒度的數(shù)據(jù),
分別是大盤(pán)多維匯總 topic 、直播間多維匯總 topic 、作者多維匯總 topic 、用戶多維匯總 topic, ?這些都是 通用維度的數(shù)據(jù)。
如圖中下? ,基于這些通用維度數(shù)據(jù),我們?cè)偃ゼ?個(gè)性化維度的數(shù)據(jù),也就是 ADS 層 。拿到了這些數(shù)
據(jù)之后會(huì)有維度擴(kuò)展, ?包括內(nèi)容擴(kuò)展和運(yùn)營(yíng)維度的拓展,然后再去做聚合, ?比如會(huì)有電商實(shí)時(shí) topic,機(jī) 構(gòu)服務(wù)實(shí)時(shí) topic 和大 V 直播實(shí)時(shí) topic。
分成這樣的兩個(gè)鏈路會(huì)有?個(gè)好處:?個(gè)地?處理的是通用維度, ?另?個(gè)地?處理的是個(gè)性化的維度 。通 用維度保障的要求會(huì)比較高?些,個(gè)性化維度則會(huì)做很多個(gè)性化的邏輯 。如果這兩個(gè)耦合在?起的話,會(huì) 發(fā)現(xiàn)任務(wù)經(jīng)常出問(wèn)題,并且分不清楚哪個(gè)任務(wù)的職責(zé)是什么,構(gòu)建不出這樣的?個(gè)穩(wěn)定層。
如圖中右?, ?最終我們用到了三種不同的引擎 。簡(jiǎn)單來(lái)說(shuō)就是 Redis 查詢用到了 C 端的場(chǎng)景, ?OLAP 查詢 用到了大屏 、業(yè)務(wù)看板的場(chǎng)景。
5) 未來(lái)規(guī)劃
上??共講了三個(gè)場(chǎng)景,第?個(gè)場(chǎng)景是標(biāo)準(zhǔn)化 PU/UV 的計(jì)算,第?個(gè)場(chǎng)景是 DAU 整體的解決?案,第三個(gè)場(chǎng) 景是運(yùn)營(yíng)側(cè)如何解決 。基于這些內(nèi)容,我們有?些未來(lái)規(guī)劃,分為 4 個(gè)部分。

第?部分是實(shí)時(shí)保障體系完善:
??面做?些大型的活動(dòng), ?包括春晚活動(dòng)以及后續(xù)常態(tài)化的活動(dòng) 。針對(duì)這些活動(dòng)如何去保障,我們有
?套規(guī)范去做平臺(tái)化的建設(shè);
第?個(gè)是分級(jí)保障標(biāo)準(zhǔn)制定, ?哪些作業(yè)是什么樣的保障級(jí)別 / 標(biāo)準(zhǔn),會(huì)有?個(gè)標(biāo)準(zhǔn)化的說(shuō)明;
第三個(gè)是引擎平臺(tái)能?推動(dòng)解決, ?包括 Flink 任務(wù)的?些引擎,在這上面我們會(huì)有?個(gè)平臺(tái),基于這
個(gè)平臺(tái)去做規(guī)范 、標(biāo)準(zhǔn)化的推動(dòng)。
第?部分是實(shí)時(shí)數(shù)倉(cāng)內(nèi)容構(gòu)建:
??面是場(chǎng)景化?案的輸出, ?比如針對(duì)活動(dòng)會(huì)有?些通用化的?案,而不是每次活動(dòng)都開(kāi)發(fā)?套新的
解決?案;
另??面是內(nèi)容數(shù)據(jù)層次沉淀, ?比如現(xiàn)在的數(shù)據(jù)內(nèi)容建設(shè),在厚度?面有?些場(chǎng)景的缺失, ?包括內(nèi)容
如何更好地服務(wù)于上游的場(chǎng)景。
第三部分是 Flink SQL 場(chǎng)景化構(gòu)建, ?包括 SQL 持續(xù)推? 、SQL 任務(wù)穩(wěn)定性和 SQL 任務(wù)資源利用率 。我們 在預(yù)估資源的過(guò)程中,會(huì)考慮比如在什么樣 QPS 的場(chǎng)景下, ? SQL 用什么樣的解決?案,能?撐到什么情
況 。Flink SQL 可以?幅減少?效,但是在這個(gè)過(guò)程中,我們想讓業(yè)務(wù)操作更加簡(jiǎn)單。
第四部分是批流?體探索 。實(shí)時(shí)數(shù)倉(cāng)的場(chǎng)景其實(shí)就是做離線 ETL 計(jì)算加速,我們會(huì)有很多?時(shí)級(jí)別的任
務(wù),針對(duì)這些任務(wù),每次批處理的時(shí)候有?些邏輯可以放到流處理去解決, ?這對(duì)于離線數(shù)倉(cāng) SLA 體系的提 升?分巨? 。
3. 騰訊看點(diǎn)實(shí)時(shí)數(shù)倉(cāng)案例
騰訊看點(diǎn)業(yè)務(wù)為什么要構(gòu)建實(shí)時(shí)數(shù)倉(cāng)??因?yàn)樵嫉纳蠄?bào)數(shù)據(jù)量非常?,? 天上報(bào)峰值就有上萬(wàn)億條 。而且上報(bào) 格式混亂 。缺乏內(nèi)容維度信息 、用戶畫(huà)像信息,下游沒(méi)辦法直接使用。
而我們提供的實(shí)時(shí)數(shù)倉(cāng), ?是根據(jù)騰訊看點(diǎn)信息流的業(yè)務(wù)場(chǎng)景, ?進(jìn)行了內(nèi)容維度的關(guān)聯(lián),用戶畫(huà)像的關(guān)聯(lián),各種 粒度的聚合,下游可以非常?便的使用實(shí)時(shí)數(shù)據(jù),而且實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)可以提供給下游的用戶反復(fù)的消費(fèi)使用, 可以?量的減少重復(fù)的?作。
1) 方案選型

那就看下我們多維實(shí)時(shí)數(shù)據(jù)分析系統(tǒng)的?案選型,選型我們對(duì)比了行業(yè)內(nèi)的領(lǐng)先?案,選擇了最符合我們業(yè)務(wù) 場(chǎng)景的?案。
第?塊是實(shí)時(shí)數(shù)倉(cāng)的選型,我們選擇的是業(yè)界比較成熟的 Lambda 架構(gòu),他的優(yōu)點(diǎn)是靈活性高 、容錯(cuò)性
高 、成熟度高和遷移成本低;缺點(diǎn)是實(shí)時(shí) 、離線數(shù)據(jù)用兩套代碼,可能會(huì)存在?個(gè)?徑修改了, ?另?個(gè)沒(méi) 改的問(wèn)題,我們每天都有做數(shù)據(jù)對(duì)賬的?作,如果有異常會(huì)進(jìn)行告警。
第?塊是實(shí)時(shí)計(jì)算引擎選型, ?因?yàn)?Flink 設(shè)計(jì)之初就是為了流處理,SparkStreaming 嚴(yán)格來(lái)說(shuō)還是微批處 理,Strom 用的已經(jīng)不多了 。再看 Flink 具有 Exactly-once 的準(zhǔn)確性 、輕量級(jí) Checkpoint 容錯(cuò)機(jī)制 、低
延時(shí)高吞吐和易用性高的特點(diǎn),我們選擇了 Flink 作為實(shí)時(shí)計(jì)算引擎。
第三塊是實(shí)時(shí)存儲(chǔ)引擎,我們的要求就是需要有維度索引 、支持高并發(fā) 、預(yù)聚合 、高性能實(shí)時(shí)多維 OLAP
查詢 。可以看到, ?Hbase 、Tdsql 和 ES 都不能滿足要求, ?Druid 有?個(gè)缺陷, ?它是按照時(shí)序劃分 ? ? ? ? ? ?Segment,無(wú)法將同?個(gè)內(nèi)容,存放在同?個(gè) Segment 上, ?計(jì)算全局 TopN 只能是近似值,所以我們選 擇了最近兩年大火的 MPP 數(shù)據(jù)庫(kù)引擎 ClickHouse。
2) 設(shè)計(jì)目標(biāo)與設(shè)計(jì)難點(diǎn)

我們多維實(shí)時(shí)數(shù)據(jù)分析系統(tǒng)分為三大模塊
1. 實(shí)時(shí)計(jì)算引擎
2. 實(shí)時(shí)存儲(chǔ)引擎
3. App 層
難點(diǎn)主要在前兩個(gè)模塊:實(shí)時(shí)計(jì)算引擎和實(shí)時(shí)存儲(chǔ)引擎。
1. 千萬(wàn)級(jí)/s 的海量數(shù)據(jù)如何實(shí)時(shí)接? ,并且進(jìn)行極低延遲維表關(guān)聯(lián)。
2. 實(shí)時(shí)存儲(chǔ)引擎如何支持高并發(fā)寫(xiě)? 、高可用分布式和高性能索引查詢, ?是比較難的。這?個(gè)模塊的具體實(shí)現(xiàn),看?下我們系統(tǒng)的架構(gòu)設(shè)計(jì)。
3) 架構(gòu)設(shè)計(jì)

前端采用的是開(kāi)源組件 Ant Design,利用了 Nginx 服務(wù)器,部署靜態(tài)頁(yè)面,并反向代理了瀏覽器的請(qǐng)求到后 臺(tái)服務(wù)器上。
后臺(tái)服務(wù)是基于騰訊自研的 RPC 后臺(tái)服務(wù)框架寫(xiě)的,并且會(huì)進(jìn)??些?級(jí)緩存。
實(shí)時(shí)數(shù)倉(cāng)部分,分為了接?層 、實(shí)時(shí)計(jì)算層和實(shí)時(shí)數(shù)倉(cāng)存儲(chǔ)層。
接?層主要是從千萬(wàn)級(jí)/s 的原始消息隊(duì)列中,拆分出不同?為數(shù)據(jù)的微隊(duì)列,拿看點(diǎn)的視頻來(lái)說(shuō),拆分過(guò) 后,數(shù)據(jù)就只有百萬(wàn)級(jí)/s 了;
實(shí)時(shí)計(jì)算層主要負(fù)責(zé), ?多??為流水?dāng)?shù)據(jù)進(jìn)??轉(zhuǎn)列, ?實(shí)時(shí)關(guān)聯(lián)用戶畫(huà)像數(shù)據(jù)和內(nèi)容維度數(shù)據(jù);
實(shí)時(shí)數(shù)倉(cāng)存儲(chǔ)層主要是設(shè)計(jì)出符合看點(diǎn)業(yè)務(wù)的,下游好用的實(shí)時(shí)消息隊(duì)列 。我們暫時(shí)提供了兩個(gè)消息隊(duì)
列,作為實(shí)時(shí)數(shù)倉(cāng)的兩層 。?層 DWM 層是內(nèi)容 ID-用戶 ID 粒度聚合的,就是?條數(shù)據(jù)包含內(nèi)容 ID-用戶 ID 還有 B 側(cè)內(nèi)容數(shù)據(jù) 、C 側(cè)用戶數(shù)據(jù)和用戶畫(huà)像數(shù)據(jù);另?層是 DWS 層, ?是內(nèi)容 ID 粒度聚合的,? 條 數(shù)據(jù)包含內(nèi)容 ID, ?B 側(cè)數(shù)據(jù)和 C 側(cè)數(shù)據(jù) 。可以看到內(nèi)容 ID-用戶 ID 粒度的消息隊(duì)列流量進(jìn)?步減小到? 萬(wàn)級(jí)/s, ?內(nèi)容 ID 粒度的更是萬(wàn)級(jí)/s,并且格式更加清晰,維度信息更加豐富。
實(shí)時(shí)存儲(chǔ)部分分為實(shí)時(shí)寫(xiě)?層 、OLAP 存儲(chǔ)層和后臺(tái)接?層。
實(shí)時(shí)寫(xiě)?層主要是負(fù)責(zé) Hash 路由將數(shù)據(jù)寫(xiě)?;
OLAP 存儲(chǔ)層利用 MPP 存儲(chǔ)引擎,設(shè)計(jì)符合業(yè)務(wù)的索引和物化視圖, ?高效存儲(chǔ)海量數(shù)據(jù);后臺(tái)接?層提供高效的多維實(shí)時(shí)查詢接? 。
4) 實(shí)時(shí)計(jì)算
這個(gè)系統(tǒng)最復(fù)雜的兩塊, ?實(shí)時(shí)計(jì)算和實(shí)時(shí)存儲(chǔ)。
先介紹實(shí)時(shí)計(jì)算部分:分為實(shí)時(shí)關(guān)聯(lián)和實(shí)時(shí)數(shù)倉(cāng)。
1. 實(shí)時(shí)高性能維表關(guān)聯(lián)

實(shí)時(shí)維表關(guān)聯(lián)這?塊難度在于 百萬(wàn)級(jí)/s 的實(shí)時(shí)數(shù)據(jù)流,如果直接去關(guān)聯(lián) HBase, ?1 分鐘的數(shù)據(jù),關(guān)聯(lián)完 HBase 耗時(shí)是?時(shí)級(jí)的,會(huì)導(dǎo)致數(shù)據(jù)延遲嚴(yán)重。
我們提出了?個(gè)解決方案:
第?個(gè)是,在 Flink 實(shí)時(shí)計(jì)算環(huán)節(jié),先按照 1 分鐘進(jìn)行了窗?聚合,將窗?內(nèi)多行行為數(shù)據(jù)轉(zhuǎn)?行多列的 數(shù)據(jù)格式,經(jīng)過(guò)這?步操作,原本?時(shí)級(jí)的關(guān)聯(lián)耗時(shí)下降到了??分鐘,但是還是不夠的。
第二個(gè)是,在訪問(wèn) HBase 內(nèi)容之前設(shè)置?層 Redis 緩存, ?因?yàn)?1000 條數(shù)據(jù)訪問(wèn) HBase 是秒級(jí)的,而訪
問(wèn) Redis 是毫秒級(jí)的,訪問(wèn) Redis 的速度基本是訪問(wèn) HBase 的 1000 倍 。為了防止過(guò)期的數(shù)據(jù)浪費(fèi)緩 ? ? 存,緩存過(guò)期時(shí)間設(shè)置成 24 ?時(shí), ?同時(shí)通過(guò)監(jiān)聽(tīng)寫(xiě) HBase Proxy 來(lái)保證緩存的?致性 。這樣將訪問(wèn)時(shí)間
從??分鐘變成了秒級(jí)。
第三個(gè)是,上報(bào)過(guò)程中會(huì)上報(bào)不少非常規(guī)內(nèi)容 ID, ?這些內(nèi)容 ID 在內(nèi)容 HBase 中是不存儲(chǔ)的,會(huì)造成緩存
穿透的問(wèn)題 。所以在實(shí)時(shí)計(jì)算的時(shí)候,我們直接過(guò)濾掉這些內(nèi)容 ID, ?防止緩存穿透, ??減少?些時(shí)間 。? 第四個(gè)是, ?因?yàn)樵O(shè)置了定時(shí)緩存,會(huì)引??個(gè)緩存雪崩的問(wèn)題 。為了防止雪崩,我們?cè)趯?shí)時(shí)計(jì)算中, ?進(jìn)行
了削峰填谷的操作,錯(cuò)開(kāi)設(shè)置緩存的時(shí)間。
可以看到,優(yōu)化前后,數(shù)據(jù)量從百億級(jí)減少到了?億級(jí),耗時(shí)從?t時(shí)級(jí)減少到了數(shù)?秒,減少 99%。
2. 下游提供服務(wù)

實(shí)時(shí)數(shù)倉(cāng)的難度在于:它處于比較新的領(lǐng)域,并且各個(gè)公司各個(gè)業(yè)務(wù)差距比較大,怎么能設(shè)計(jì)出?便,好用, 符合看點(diǎn)業(yè)務(wù)場(chǎng)景的實(shí)時(shí)數(shù)倉(cāng)是有難度的。
先看?下實(shí)時(shí)數(shù)倉(cāng)做了什么, ?實(shí)時(shí)數(shù)倉(cāng)對(duì)外就是?個(gè)消息隊(duì)列,不同的消息隊(duì)列里面存放的就是不同聚合粒度 的實(shí)時(shí)數(shù)據(jù), ?包括內(nèi)容 ID 、用戶 ID 、C 側(cè)行為數(shù)據(jù) 、B 側(cè)內(nèi)容維度數(shù)據(jù)和用戶畫(huà)像數(shù)據(jù)等。
我們是怎么搭建實(shí)時(shí)數(shù)倉(cāng)的,就是上面介紹的實(shí)時(shí)計(jì)算引擎的輸出,放到消息隊(duì)列中保存,可以提供給下游多 用戶復(fù)用。
我們可以看下, ?在我們建設(shè)實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)前后, ?開(kāi)發(fā)?個(gè)實(shí)時(shí)應(yīng)用的區(qū)別 。沒(méi)有數(shù)倉(cāng)的時(shí)候,我們需要消費(fèi)千 萬(wàn)級(jí)/s 的原始隊(duì)列, ?進(jìn)行復(fù)雜的數(shù)據(jù)清洗,然后再進(jìn)行用戶畫(huà)像關(guān)聯(lián) 、內(nèi)容維度關(guān)聯(lián),才能拿到符合要求格式 的實(shí)時(shí)數(shù)據(jù), ?開(kāi)發(fā)和擴(kuò)展的成本都會(huì)比較高,如果想開(kāi)發(fā)?個(gè)新的應(yīng)用, ??要走?遍這個(gè)流程 。有了數(shù)倉(cāng)之 ? 后,如果想開(kāi)發(fā)內(nèi)容 ID 粒度的實(shí)時(shí)應(yīng)用,就直接申請(qǐng) TPS 萬(wàn)級(jí)/s 的 DWS 層的消息隊(duì)列 。開(kāi)發(fā)成本變低很 ? ?多, ?資源消耗?很多,可擴(kuò)展性也強(qiáng)很多。
看個(gè)實(shí)際例?, ?開(kāi)發(fā)我們系統(tǒng)的實(shí)時(shí)數(shù)據(jù)大屏,原本需要進(jìn)行如上所有操作,才能拿到數(shù)據(jù) 。現(xiàn)在只需要消費(fèi) DWS 層消息隊(duì)列, ?寫(xiě)?條 Flink SQL 即可,僅消耗 2 個(gè) CPU 核?, ?1G 內(nèi)存。
可以看到, ?以 50 個(gè)消費(fèi)者為例, ?建立實(shí)時(shí)數(shù)倉(cāng)前后,下游開(kāi)發(fā)?個(gè)實(shí)時(shí)應(yīng)用,可以減少 98%的資源消耗 。包 括計(jì)算資源,存儲(chǔ)資源,??成本和開(kāi)發(fā)?員學(xué)習(xí)接?成本等等 。并且消費(fèi)者越多, ?節(jié)省越多 。就拿 Redis 存 儲(chǔ)這?部分來(lái)說(shuō),?個(gè)?就能省下上百萬(wàn)?民幣。
5) 實(shí)時(shí)存儲(chǔ)
介紹完實(shí)時(shí)計(jì)算,再來(lái)介紹實(shí)時(shí)存儲(chǔ)。
這塊分為三個(gè)部分來(lái)介紹
第?是 分布式-高可用
第?是 海量數(shù)據(jù)-寫(xiě)?
第三是 高性能-查詢

我們這里聽(tīng)取的是 Clickhouse 官方的建議,借助 ZK 實(shí)現(xiàn)高可用的方案 。數(shù)據(jù)寫(xiě)??個(gè)分片,僅寫(xiě)??個(gè)副 本,然后再寫(xiě) ZK,通過(guò) ZK 告訴同?個(gè)分片的其他副本,其他副本再過(guò)來(lái)拉取數(shù)據(jù),保證數(shù)據(jù)?致性。
這里沒(méi)有選用消息隊(duì)列進(jìn)?數(shù)據(jù)同步, ?是因?yàn)?ZK 更加輕量級(jí) 。而且寫(xiě)的時(shí)候,任意寫(xiě)?個(gè)副本,其它副本都 能夠通過(guò) ZK 獲得?致的數(shù)據(jù) 。而且就算其它節(jié)點(diǎn)第?次來(lái)獲取數(shù)據(jù)失敗了,后?只要發(fā)現(xiàn)它跟 ZK 上記錄的 數(shù)據(jù)不?致,就會(huì)再次嘗試獲取數(shù)據(jù),保證?致性。
2. 海量數(shù)據(jù)-寫(xiě)入

數(shù)據(jù)寫(xiě)?遇到的第?個(gè)問(wèn)題是,海量數(shù)據(jù)直接寫(xiě)? Clickhouse 的話,會(huì)導(dǎo)致 ZK 的 QPS 太高,解決方案是改 用 Batch 方式寫(xiě)? 。Batch 設(shè)置多大呢, ?Batch 太?的話緩解不了 ZK 的壓? , ?Batch 也不能太大,不然上游 內(nèi)存壓?太大,通過(guò)實(shí)驗(yàn), ?最終我們選用了大???萬(wàn)的 Batch。
第?個(gè)問(wèn)題是, ?隨著數(shù)據(jù)量的增?, ?單 QQ 看點(diǎn)的視頻內(nèi)容每天可能寫(xiě)?百億級(jí)的數(shù)據(jù),默認(rèn)方案是寫(xiě)?張分 布式表, ?這就會(huì)造成單臺(tái)機(jī)器出現(xiàn)磁盤(pán)的瓶頸,尤其是 Clickhouse 底層運(yùn)用的是 Mergetree,原理類似于 ? ? HBase 、RocketsDB 的底層 LSM-Tree。在合并的過(guò)程中會(huì)存在寫(xiě)放大的問(wèn)題,加重磁盤(pán)壓? 。峰值每分鐘? 千萬(wàn)條數(shù)據(jù), ?寫(xiě)完耗時(shí)??秒,如果正在做 Merge,就會(huì)阻塞寫(xiě)?請(qǐng)求,查詢也會(huì)?常慢 。我們做的兩個(gè)優(yōu)化 方案:?是對(duì)磁盤(pán)做 Raid,提升磁盤(pán)的 IO;?是在寫(xiě)?之前進(jìn)?分表, ?直接分開(kāi)寫(xiě)?到不同的分片上, ?磁盤(pán) ?壓?直接變?yōu)?1/N。
第三個(gè)問(wèn)題是,雖然我們寫(xiě)?按照分片進(jìn)?了劃分,但是這里引?了?個(gè)分布式系統(tǒng)常?的問(wèn)題,就是局部的 Top 并非全局 Top 的問(wèn)題 。比如同?個(gè)內(nèi)容 ID 的數(shù)據(jù)落在了不同的分片上, ?計(jì)算全局 Top100 閱讀的內(nèi)容 ? ?ID,有?個(gè)內(nèi)容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導(dǎo)致匯總的時(shí)候,會(huì)丟失?部分 數(shù)據(jù),影響最終結(jié)果 。我們做的優(yōu)化是在寫(xiě)?之前加上?層路由,將同?個(gè)內(nèi)容 ID 的記錄,全部路由到同? ?個(gè)分片上, ?解決了該問(wèn)題。
介紹完寫(xiě)?,下?步介紹 Clickhouse 的高性能存儲(chǔ)和查詢。
3. 高性能-存儲(chǔ)-查詢
Clickhouse 高性能查詢的?個(gè)關(guān)鍵點(diǎn)是稀疏索引 。稀疏索引這個(gè)設(shè)計(jì)就很有講究,設(shè)計(jì)得好可以加速查詢,設(shè) 計(jì)不好反而會(huì)影響查詢效率 。我根據(jù)我們的業(yè)務(wù)場(chǎng)景, ?因?yàn)槲覀兊牟樵兇蟛糠侄际菚r(shí)間和內(nèi)容 ID 相關(guān)的, ?比 ? 如說(shuō),某個(gè)內(nèi)容,過(guò)去 N 分鐘在各個(gè)?群表現(xiàn)如何?我按照?期,分鐘粒度時(shí)間和內(nèi)容 ID 建立了稀疏索引 。?針對(duì)某個(gè)內(nèi)容的查詢, ?建立稀疏索引之后,可以減少 99%的?件掃描。
還有?個(gè)問(wèn)題就是,我們現(xiàn)在數(shù)據(jù)量太大,維度太多 。拿 QQ 看點(diǎn)的視頻內(nèi)容來(lái)說(shuō),? 天流水有上百億條,有 些維度有?百個(gè)類別 。如果?次性把所有維度進(jìn)?預(yù)聚合,數(shù)據(jù)量會(huì)指數(shù)膨脹,查詢反而變慢,并且會(huì)占用大 量?jī)?nèi)存空間 。我們的優(yōu)化,針對(duì)不同的維度, ?建立對(duì)應(yīng)的預(yù)聚合物化視圖,用空間換時(shí)間, ?這樣可以縮短查詢 的時(shí)間。
騰訊看點(diǎn)高性能存儲(chǔ)

分布式表查詢還會(huì)有?個(gè)問(wèn)題,查詢單個(gè)內(nèi)容 ID 的信息,分布式表會(huì)將查詢下發(fā)到所有的分片上, ?然后再返 ?回查詢結(jié)果進(jìn)?匯總 。實(shí)際上, ?因?yàn)樽鲞^(guò)路由,?個(gè)內(nèi)容 ID 只存在于?個(gè)分片上, ?剩下的分片都在空跑 。針 ?對(duì)這類查詢,我們的優(yōu)化是后臺(tái)按照同樣的規(guī)則先進(jìn)?路由, ?直接查詢目標(biāo)分片, ?這樣減少了 N-1/N 的負(fù)載, 可以大量縮短查詢時(shí)間 。而且由于我們是提供的 OLAP 查詢,數(shù)據(jù)滿足最終?致性即可,通過(guò)主從副本讀寫(xiě)分 離,可以進(jìn)?步提升性能。
我們?cè)诤笈_(tái)還做了?個(gè) 1 分鐘的數(shù)據(jù)緩存,針對(duì)相同條件查詢,后臺(tái)就直接返回了。
4. 擴(kuò)容
這里再介紹?下我們的擴(kuò)容的?案,調(diào)研了業(yè)內(nèi)的?些常??案。
比如 HBase,原始數(shù)據(jù)都存放在 HDFS 上, ?擴(kuò)容只是 Region Server 擴(kuò)容,不涉及原始數(shù)據(jù)的遷移 。但是 Clickhouse 的每個(gè)分片數(shù)據(jù)都是在本地, ?是?個(gè)比較底層存儲(chǔ)引擎,不能像 HBase 那樣?便擴(kuò)容。
Redis 是哈希槽這種類似?致性哈希的方式, ?是比較經(jīng)典分布式緩存的方案 。Redis slot 在 Rehash 的過(guò)程中 雖然存在短暫的 ask 讀不可用,但是總體來(lái)說(shuō)遷移是比較方便的,從原 h[0]遷移到 h[1], ?最后再刪除 h[0]。但是 Clickhouse ?部分都是 OLAP 批量查詢,不是點(diǎn)查,而且由于列式存儲(chǔ),不支持刪除的特性,? 致性哈 希的方案不是很適合。
目前擴(kuò)容的方案是, ?另外消費(fèi)?份數(shù)據(jù), ?寫(xiě)?新 Clickhouse 集群,兩個(gè)集群?起跑?段時(shí)間, ?因?yàn)閷?shí)時(shí)數(shù)據(jù) 就保存 3 天,等 3 天之后,后臺(tái)服務(wù)直接訪問(wèn)新集群。
4. 有贊實(shí)時(shí)數(shù)倉(cāng)案例
1) 分層設(shè)計(jì)
傳統(tǒng)離線數(shù)倉(cāng)的分層設(shè)計(jì)?家都很熟悉,為了規(guī)范的組織和管理數(shù)據(jù),層級(jí)劃分會(huì)比較多,在?些復(fù)雜邏輯處 理場(chǎng)景還會(huì)引?臨時(shí)層落地中間結(jié)果以方便下游加?處理 。實(shí)時(shí)數(shù)倉(cāng)考慮到時(shí)效性問(wèn)題,分層設(shè)計(jì)需要盡量精
簡(jiǎn), ?降低中間流程出錯(cuò)的可能性,不過(guò)總體而?, ?實(shí)時(shí)數(shù)倉(cāng)還是會(huì)參考離線數(shù)倉(cāng)的分層思想來(lái)設(shè)計(jì)。
實(shí)時(shí)數(shù)倉(cāng)分層架構(gòu)如下圖所示 :

ODS ?( 實(shí)時(shí)數(shù)據(jù)接入層)
ODS 層, ?即實(shí)時(shí)數(shù)據(jù)接?層,通過(guò)數(shù)據(jù)采集?具收集各個(gè)業(yè)務(wù)系統(tǒng)的實(shí)時(shí)數(shù)據(jù),對(duì)非結(jié)構(gòu)化的數(shù)據(jù)進(jìn)?結(jié)構(gòu)化 處理,保存原始數(shù)據(jù),?乎不過(guò)濾數(shù)據(jù);該層數(shù)據(jù)的主要來(lái)源有三個(gè)部分:第?部分是業(yè)務(wù)方創(chuàng)建的 NSQ 消 ?息,第?部分是業(yè)務(wù)數(shù)據(jù)庫(kù)的 Binlog 日志,第三部分是埋點(diǎn)日志和應(yīng)用程序日志, ?以上三部分的實(shí)時(shí)數(shù)據(jù)最終 統(tǒng)?寫(xiě)? Kafka 存儲(chǔ)介質(zhì)中。
ODS 層表命名規(guī)范:部門名稱.應(yīng)用名稱.數(shù)倉(cāng)層級(jí)主題域前綴數(shù)據(jù)庫(kù)名/消息名
例如:接?業(yè)務(wù)庫(kù)的 Binlog
實(shí)時(shí)數(shù)倉(cāng)表命名:? deptname.appname.ods_subjectname_tablename
例如:接?業(yè)務(wù)方的 NSQ 消息
實(shí)時(shí)數(shù)倉(cāng)表命名:? deptname.appname.ods_subjectname_msgname
DWS ?( 實(shí)時(shí)明細(xì)中間層)
DWS 層, ?即實(shí)時(shí)明細(xì)中間層,該層以業(yè)務(wù)過(guò)程作為建模驅(qū)動(dòng),基于每個(gè)具體的業(yè)務(wù)過(guò)程事件來(lái)構(gòu)建最細(xì)粒度 的明細(xì)層事實(shí)表;?比如交易過(guò)程,有下單事件 、支付事件 、發(fā)貨事件等,我們會(huì)基于這些獨(dú)立的事件來(lái)進(jìn)行明 細(xì)層的構(gòu)建 。在這層,事實(shí)明細(xì)數(shù)據(jù)同樣是按照離線數(shù)倉(cāng)的主題域來(lái)進(jìn)行劃分,也會(huì)采用維度建模的方式組織 數(shù)據(jù),對(duì)于?些重要的維度字段,會(huì)做適當(dāng)冗余 。基于有贊實(shí)時(shí)需求的場(chǎng)景, ?重點(diǎn)建設(shè)交易 、營(yíng)銷 、客戶 、店 鋪 、商品等主題域的數(shù)據(jù) 。該層的數(shù)據(jù)來(lái)源于 ODS 層,通過(guò) FlinkSQL 進(jìn)行 ETL 處理, ?主要?作有規(guī)范命 ? ? 名 、數(shù)據(jù)清洗 、維度補(bǔ)全 、多流關(guān)聯(lián), ?最終統(tǒng)?寫(xiě)? Kafka 存儲(chǔ)介質(zhì)中。
DWS 層表命名規(guī)范:? 部門名稱 .應(yīng)用名稱 .數(shù)倉(cāng)層級(jí)_主題域前綴_數(shù)倉(cāng)表命名
例如:實(shí)時(shí)事件 A 的中間層
實(shí)時(shí)數(shù)倉(cāng)表命名:? deptname.appname.dws_subjectname_tablename_eventnameA
例如:實(shí)時(shí)事件 B 的中間層
實(shí)時(shí)數(shù)倉(cāng)表命名:? deptname.appname.dws_subjectname_tablename_eventnameB
DIM ?( 實(shí)時(shí)維表層)
DIM 層, ?即實(shí)時(shí)維表層,用來(lái)存放維度數(shù)據(jù), ?主要用于實(shí)時(shí)明細(xì)中間層寬化處理時(shí)補(bǔ)全維度使用, ? 目前該層的 數(shù)據(jù)主要存儲(chǔ)于 HBase 中,后續(xù)會(huì)基于 QPS 和數(shù)據(jù)量大?提供更多合適類型的存儲(chǔ)介質(zhì)。
DIM 層表命名規(guī)范:? 應(yīng)用名稱_數(shù)倉(cāng)層級(jí)_主題域前綴_數(shù)倉(cāng)表命名
例如:HBase 存儲(chǔ), ?實(shí)時(shí)維度表
實(shí)時(shí)數(shù)倉(cāng)表命名:? appname_dim_tablename
DWA ?( 實(shí)時(shí)匯總層)
DWA 層, ?即實(shí)時(shí)匯總層,該層通過(guò) DWS 層數(shù)據(jù)進(jìn)行多維匯總,提供給下游業(yè)務(wù)方使用,在實(shí)際應(yīng)用過(guò)程中, 不同業(yè)務(wù)方使用維度匯總的方式不太?樣,根據(jù)不同的需求采用不同的技術(shù)方案去實(shí)現(xiàn) 。第?種方式,采用 ? ?FlinkSQL 進(jìn)行實(shí)時(shí)匯總,將結(jié)果指標(biāo)存? HBase 、MySQL 等數(shù)據(jù)庫(kù),該種方式是我們?cè)缙诓捎玫姆桨福瑑?yōu)點(diǎn) 是實(shí)現(xiàn)業(yè)務(wù)邏輯比較靈活,缺點(diǎn)是聚合粒度固化,不易擴(kuò)展;第?種方式,采用實(shí)時(shí) OLAP ?具進(jìn)行匯總,該 種方式是我們目前常用的方案,優(yōu)點(diǎn)是聚合粒度易擴(kuò)展,缺點(diǎn)是業(yè)務(wù)邏輯需要在中間層預(yù)處理。
DWA 層表命名規(guī)范:? 應(yīng)用名稱_數(shù)倉(cāng)層級(jí)_主題域前綴_聚合粒度_數(shù)據(jù)范圍
例如:HBase 存儲(chǔ),某域當(dāng)日某粒度實(shí)時(shí)匯總表
實(shí)時(shí)數(shù)倉(cāng)表命名:? appname_dwa_subjectname_aggname_daily
APP ?( 實(shí)時(shí)應(yīng)用層)
APP 層, ?即實(shí)時(shí)應(yīng)用層,該層數(shù)據(jù)已經(jīng)寫(xiě)入應(yīng)用系統(tǒng)的存儲(chǔ)中,例如寫(xiě)入 Druid 作為 BI 看板的實(shí)時(shí)數(shù)據(jù)集;寫(xiě)入 HBase 、MySQL 用于提供統(tǒng)?數(shù)據(jù)服務(wù)接口;寫(xiě)入 ClickHouse 用于提供實(shí)時(shí) OLAP 服務(wù) 。因?yàn)樵搶臃?常貼近業(yè)務(wù),在命名規(guī)范上實(shí)時(shí)數(shù)倉(cāng)不做統(tǒng)?要求。
2) 實(shí)時(shí) ETr
實(shí)時(shí)數(shù)倉(cāng) ETL 處理過(guò)程所涉及的組件比較多,接下來(lái)盤(pán)點(diǎn)構(gòu)建實(shí)時(shí)數(shù)倉(cāng)所需要的組件以及每個(gè)組件的應(yīng)用場(chǎng) 景 。如下圖所示:

具體實(shí)時(shí) ETL 處理流程如下圖所示:

1. 維度補(bǔ)全
創(chuàng)建調(diào)用 Duboo 接口的 UDF 函數(shù)在實(shí)時(shí)流里補(bǔ)全維度是最便捷的使用方式,但如果請(qǐng)求量過(guò)大,對(duì) Duboo ?接口壓力會(huì)過(guò)大 。在實(shí)際應(yīng)用場(chǎng)景補(bǔ)全維度首選還是關(guān)聯(lián)維度表,但關(guān)聯(lián)也存在?定概率的丟失問(wèn)題,為了彌 補(bǔ)這種丟失,可以采用 Duboo 接口調(diào)用兜底的方式來(lái)補(bǔ)全 。偽代碼如下:
create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';
case
when cast( b.column as bigint) is not null
then cast( b.column as bigint)
else cast(coalesce(cast(get_json_object(call_dubbo( 'clusterUrl '
, 'serviceName '
, 'methodName '
,cast(concat( ' [ ',cast(a.column as varchar), '] ') as varchar)
, 'key '
)
, 'rootId ')
as bigint)
,a.column)
as bigint) ?end
2. 幕等處理
實(shí)時(shí)任務(wù)在運(yùn)?過(guò)程中難免會(huì)遇到執(zhí)?異常的情況, ?當(dāng)任務(wù)異常重啟的時(shí)候會(huì)導(dǎo)致部分消息重新發(fā)送和消費(fèi), 從而引發(fā)下游實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù)不準(zhǔn)確,為了有效避免這種情況,可以選擇對(duì)實(shí)時(shí)消息流做冪等處理, ?當(dāng)消費(fèi)完? 條消息,將這條消息的 Key 存? KV,如果任務(wù)異常重啟導(dǎo)致消息重新發(fā)送的時(shí)候,先從 KV 判斷該消息是否 已被消費(fèi),如果已消費(fèi)就不再往下發(fā)送 。偽代碼如下:
create function idempotenc as 'XXXXXXX';
insert into table
select
order_no
from
(
select
a.orderNo ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?as ?order_no
, idempotenc( 'XXXXXXX', coalesce( order_no, ' ') ) ?as ?rid
from
table1
) t
where
t.rid = 0;
3. 數(shù)據(jù)驗(yàn)證
由于實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)是?邊界的流,相比于離線數(shù)倉(cāng)固定不變的數(shù)據(jù)更難驗(yàn)收 。基于不同的場(chǎng)景,我們提供了 2 種驗(yàn)證?式,分別是:抽樣驗(yàn)證與全量驗(yàn)證 。如下圖所示
抽樣驗(yàn)證?案
該?案主要應(yīng)用在數(shù)據(jù)準(zhǔn)確性驗(yàn)證上, ?實(shí)時(shí)匯總結(jié)果是基于存儲(chǔ)在 Kafka 的實(shí)時(shí)明細(xì)中間層計(jì)算而來(lái),但 ? ? ? Kafka 本身不?持按照特定條件檢索,不?持寫(xiě)查詢語(yǔ)句,再加上消息的?邊界性,統(tǒng)計(jì)結(jié)果是在不斷變化 ? ?的,很難尋找參照物進(jìn)?比對(duì) 。鑒于此,我們采用了持久化消息的?法,將消息落盤(pán)到 TiDB 存儲(chǔ),基于 TiDB 的能?對(duì)落盤(pán)的消息進(jìn)?檢索 、查詢 、匯總 。編寫(xiě)固定時(shí)間邊界的測(cè)試用例與相同時(shí)間邊界的業(yè)務(wù)庫(kù)數(shù)據(jù)或者 離線數(shù)倉(cāng)數(shù)據(jù)進(jìn)?比對(duì) 。通過(guò)以上?式,抽樣核?店鋪的數(shù)據(jù)進(jìn)?指標(biāo)準(zhǔn)確性驗(yàn)證,確保測(cè)試用例全部通過(guò)。
全量驗(yàn)證?案
該?案主要應(yīng)用在數(shù)據(jù)完整性和?致性驗(yàn)證上, ?在實(shí)時(shí)維度表驗(yàn)證的場(chǎng)景使用最多 。大體思路:將存儲(chǔ)實(shí)時(shí)維 度表的在線 HBase 集群中的數(shù)據(jù)同步到離線 HBase 集群中,再將離線 HBase 集群中的數(shù)據(jù)導(dǎo)?到 Hive 中, 在限定實(shí)時(shí)維度表的時(shí)間邊界后,通過(guò)數(shù)據(jù)平臺(tái)提供的數(shù)據(jù)校驗(yàn)功能, ?比對(duì)實(shí)時(shí)維度表與離線維度表是否存在 差異, ?最終確保兩張表的數(shù)據(jù)完全?致。

4. 數(shù)據(jù)恢復(fù)
實(shí)時(shí)任務(wù)?旦上線就要求持續(xù)不斷的提供準(zhǔn)確 、穩(wěn)定的服務(wù) 。區(qū)別于離線任務(wù)按天調(diào)度,如果離線任務(wù)出現(xiàn) ? Bug,會(huì)有充足的時(shí)間去修復(fù) 。如果實(shí)時(shí)任務(wù)出現(xiàn) Bug,必須按照提前制定好的流程,嚴(yán)格按照步驟執(zhí)行,否 則極易出現(xiàn)問(wèn)題 。造成 Bug 的情況有非常多, ?比如代碼 Bug 、異常數(shù)據(jù) Bug 、實(shí)時(shí)集群 Bug,如下圖展示了 修復(fù)實(shí)時(shí)任務(wù) Bug 并恢復(fù)數(shù)據(jù)的流程。

5. 騰訊全場(chǎng)景實(shí)時(shí)數(shù)倉(cāng)建設(shè)案例
在數(shù)倉(cāng)體系中會(huì)有各種各樣的大數(shù)據(jù)組件,譬如 Hive/HBase/HDFS/S3,計(jì)算引擎如 MapReduce 、Spark 、 ?Flink,根據(jù)不同的需求,用戶會(huì)構(gòu)建大數(shù)據(jù)存儲(chǔ)和處理平臺(tái),數(shù)據(jù)在平臺(tái)經(jīng)過(guò)處理和分析,結(jié)果數(shù)據(jù)會(huì)保存到 MySQL 、Elasticsearch 等支持快速查詢的關(guān)系型 、非關(guān)系型數(shù)據(jù)庫(kù)中,接下來(lái)應(yīng)用層就可以基于這些數(shù)據(jù)進(jìn) 行 BI 報(bào)表開(kāi)發(fā) 、用戶畫(huà)像,或基于 Presto 這種 OLAP 工具進(jìn)行交互式查詢等。

1) Lambda 架構(gòu)的痛點(diǎn)
在整個(gè)過(guò)程中我們常常會(huì)用?些離線的調(diào)度系統(tǒng),定期的 ?( T+1 或者每隔??時(shí)) ?去執(zhí)行?些 Spark 分析任 ? ?務(wù),做?些數(shù)據(jù)的輸? 、輸出或是 ETL ?作 。離線數(shù)據(jù)處理的整個(gè)過(guò)程中必然存在數(shù)據(jù)延遲的現(xiàn)象,不管是數(shù) 據(jù)接?還是中間的分析,數(shù)據(jù)的延遲都是比較大的,可能是?時(shí)級(jí)也有可能是天級(jí)別的 。另外?些場(chǎng)景中我們 也常常會(huì)為了?些實(shí)時(shí)性的需求去構(gòu)建?個(gè)實(shí)時(shí)處理過(guò)程, ?比如借助 Flink+Kafka 去構(gòu)建實(shí)時(shí)的流處理系統(tǒng)。
整體上, ?數(shù)倉(cāng)架構(gòu)中有非常多的組件,大大增加了整個(gè)架構(gòu)的復(fù)雜性和運(yùn)維的成本。
如下圖, ?這是很多公司之前或者現(xiàn)在正在采用的 Lambda 架構(gòu), ?Lambda 架構(gòu)將數(shù)倉(cāng)分為離線層和實(shí)時(shí)層,相 應(yīng)的就有批處理和流處理兩個(gè)相互獨(dú)立的數(shù)據(jù)處理流程, ?同?份數(shù)據(jù)會(huì)被處理兩次以上, ?同?套業(yè)務(wù)邏輯代碼 需要適配性的開(kāi)發(fā)兩次 。Lambda 架構(gòu)大家應(yīng)該已經(jīng)非常熟悉了,下面我就著重介紹?下我們采用 Lambda 架 構(gòu)在數(shù)倉(cāng)建設(shè)過(guò)程中遇到的?些痛點(diǎn)問(wèn)題。

例如在實(shí)時(shí)計(jì)算?些用戶相關(guān)指標(biāo)的實(shí)時(shí)場(chǎng)景下, ?我們想看到當(dāng)前 pv 、uv 時(shí),我們會(huì)將這些數(shù)據(jù)放到實(shí)時(shí)層 去做?些計(jì)算, ?這些指標(biāo)的值就會(huì)實(shí)時(shí)呈現(xiàn)出來(lái),但同時(shí)想了解用戶的?個(gè)增?趨勢(shì), ?需要把過(guò)去?天的數(shù)據(jù) 計(jì)算出來(lái) 。這樣就需要通過(guò)批處理的調(diào)度任務(wù)來(lái)實(shí)現(xiàn), ?比如凌晨?jī)扇c(diǎn)的時(shí)候在調(diào)度系統(tǒng)上起?個(gè) Spark 調(diào)度 任務(wù)把當(dāng)天所有的數(shù)據(jù)重新跑?遍。
很顯然在這個(gè)過(guò)程中, ?由于兩個(gè)過(guò)程運(yùn)?的時(shí)間是不?樣的,跑的數(shù)據(jù)卻相同, ?因此可能造成數(shù)據(jù)的不?致 。因?yàn)槟?條或?條數(shù)據(jù)的更新, ?需要重新跑?遍整個(gè)離線分析的鏈路,數(shù)據(jù)更新成本很?, ?同時(shí)需要維護(hù)離線 和實(shí)時(shí)分析兩套計(jì)算平臺(tái),整個(gè)上下兩層的開(kāi)發(fā)流程和運(yùn)維成本其實(shí)都是?常高的。
為了解決 Lambda 架構(gòu)帶來(lái)的各種問(wèn)題,就誕生了 Kappa 架構(gòu), ?這個(gè)架構(gòu)?家應(yīng)該也?常的熟悉。
2) Kappa 架構(gòu)的痛點(diǎn)
我們來(lái)講?下 Kappa 架構(gòu),如下圖, ?它中間其實(shí)用的是消息隊(duì)列,通過(guò)用 Flink 將整個(gè)鏈路串聯(lián)起來(lái) 。Kappa 架構(gòu)解決了 Lambda 架構(gòu)中離線處理層和實(shí)時(shí)處理層之間由于引擎不?樣,導(dǎo)致的運(yùn)維成本和開(kāi)發(fā)成本高昂的 問(wèn)題,但 Kappa 架構(gòu)也有其痛點(diǎn)。
首先,在構(gòu)建實(shí)時(shí)業(yè)務(wù)場(chǎng)景時(shí),會(huì)用到 Kappa 去構(gòu)建?個(gè)近實(shí)時(shí)的場(chǎng)景,但如果想對(duì)數(shù)倉(cāng)中間層例如
ODS 層做?些簡(jiǎn)單的 OLAP 分析或者進(jìn)?步的數(shù)據(jù)處理時(shí),如將數(shù)據(jù)寫(xiě)到 DWD 層的 Kafka,則需要另外 接? Flink。同時(shí), ?當(dāng)需要從 DWD 層的 Kafka 把數(shù)據(jù)再導(dǎo)?到 Clickhouse, ?Elasticsearch, ?MySQL 或 ?者是 Hive 里?做進(jìn)?步的分析時(shí), ?顯然就增加了整個(gè)架構(gòu)的復(fù)雜性。
其次, ?Kappa 架構(gòu)是強(qiáng)烈依賴消息隊(duì)列的,我們知道消息隊(duì)列本身在整個(gè)鏈路上數(shù)據(jù)計(jì)算的準(zhǔn)確性是嚴(yán)格
依賴它上游數(shù)據(jù)的順序,消息隊(duì)列接的越多,發(fā)生亂序的可能性就越? 。ODS 層數(shù)據(jù)?般是絕對(duì)準(zhǔn)確的, 把 ODS 層的數(shù)據(jù)發(fā)送到下?個(gè) kafka 的時(shí)候就有可能發(fā)生亂序, ?DWD 層再發(fā)到 DWS 的時(shí)候可能?亂序 了, ?這樣數(shù)據(jù)不?致性就會(huì)變得很嚴(yán)重。
第三, ?Kafka 由于它是?個(gè)順序存儲(chǔ)的系統(tǒng),順序存儲(chǔ)系統(tǒng)是沒(méi)有辦法直接在其上?利用 OLAP 分析的? 些優(yōu)化策略,例如謂詞下推這類的優(yōu)化策略,在順序存儲(chǔ)的 Kafka 上來(lái)實(shí)現(xiàn)是比較困難的事情。
那么有沒(méi)有這樣?個(gè)架構(gòu), ?既能夠滿足實(shí)時(shí)性的需求, ??能夠滿足離線計(jì)算的要求,而且還能夠減輕運(yùn)維開(kāi)發(fā) 的成本,解決通過(guò)消息隊(duì)列構(gòu)建 Kappa 架構(gòu)過(guò)程中遇到的?些痛點(diǎn)?答案是肯定的,后?的篇幅會(huì)詳細(xì)論 ? ? 述。

3) 痛點(diǎn)總結(jié)
傳統(tǒng) T+1 任務(wù)
海量的TB級(jí) T+ 1 任務(wù)延遲導(dǎo)致下游數(shù)據(jù)產(chǎn)出時(shí)間不穩(wěn)定。
任務(wù)遇到故障重試恢復(fù)代價(jià)昂貴
數(shù)據(jù)架構(gòu)在處理去重和 exactly-once語(yǔ)義能??面比較吃?
架構(gòu)復(fù)雜,涉及多個(gè)系統(tǒng)協(xié)調(diào), ?靠調(diào)度系統(tǒng)來(lái)構(gòu)建任務(wù)依賴關(guān)系
Lambda 架構(gòu)痛點(diǎn)
同時(shí)維護(hù)實(shí)時(shí)平臺(tái)和離線平臺(tái)兩套引擎, ?運(yùn)維成本高
實(shí)時(shí)離線兩個(gè)平臺(tái)需要維護(hù)兩套框架不同但業(yè)務(wù)邏輯相同代碼, ?開(kāi)發(fā)成本高
數(shù)據(jù)有兩條不同鏈路,容易造成數(shù)據(jù)的不?致性
數(shù)據(jù)更新成本大, ?需要重跑鏈路
Kappa 架構(gòu)痛點(diǎn)
對(duì)消息隊(duì)列存儲(chǔ)要求高,消息隊(duì)列的回溯能?不及離線存儲(chǔ)
消息隊(duì)列本身對(duì)數(shù)據(jù)存儲(chǔ)有時(shí)效性,且當(dāng)前?法使用 OLAP 引擎直接分析消息隊(duì)列中的數(shù)據(jù) ?全鏈路依賴消息隊(duì)列的實(shí)時(shí)計(jì)算可能因?yàn)閿?shù)據(jù)的時(shí)序性導(dǎo)致結(jié)果不正確
4)實(shí)時(shí)數(shù)倉(cāng)建設(shè)需求
是否存在?種存儲(chǔ)技術(shù), ?既能夠?持?jǐn)?shù)據(jù)高效的回溯能? ,?持?jǐn)?shù)據(jù)的更新, ??能夠?qū)崿F(xiàn)數(shù)據(jù)的批流讀寫(xiě),并 且還能夠?qū)崿F(xiàn)分鐘級(jí)到秒級(jí)的數(shù)據(jù)接??
這也是實(shí)時(shí)數(shù)倉(cāng)建設(shè)的迫切需求 。實(shí)際上是可以通過(guò)對(duì) Kappa 架構(gòu)進(jìn)行升級(jí), ?以解決 Kappa 架構(gòu)中遇到的? 些問(wèn)題,接下來(lái)主要分享當(dāng)前比較火的數(shù)據(jù)湖技術(shù)--Iceberg。

5) 數(shù)據(jù)湖 Apache Iceberg 的介紹
1.Iceberg 是什么
首先介紹?下什么是 Iceberg。官網(wǎng)描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
Iceberg 的官方定義是?種表格式,可以簡(jiǎn)單理解為是基于計(jì)算層 ?( Flink , Spark) ?和存儲(chǔ)層 ?( ORC, ? ? ? ? ?Parqurt,Avro) ?的?個(gè)中間層,用 Flink 或者 Spark 將數(shù)據(jù)寫(xiě)入 Iceberg,然后再通過(guò)其他方式來(lái)讀取這個(gè) 表, ?比如 Spark, ?Flink, ?Presto 等。

Iceberg 是為分析海量數(shù)據(jù)準(zhǔn)備的,被定義為 table format,table format 介于計(jì)算層和存儲(chǔ)層之間。
tableformat 主要用于向下管理在存儲(chǔ)系統(tǒng)上的?件, ?向上為計(jì)算層提供?些接? 。存儲(chǔ)系統(tǒng)上的?件存儲(chǔ)都 會(huì)采用?定的組織形式,譬如讀?張 Hive 表的時(shí)候, ?HDFS ?件系統(tǒng)會(huì)帶?些 partition,數(shù)據(jù)存儲(chǔ)格式 、數(shù) ?據(jù)壓縮格式 、數(shù)據(jù)存儲(chǔ) HDFS 目錄的信息等, ?這些信息都存在 Metastore 上, ?Metastore 就可以稱之為?種? 件組織格式。
?個(gè)優(yōu)秀的?件組織格式,如 Iceberg,可以更高效的?持上層的計(jì)算層訪問(wèn)磁盤(pán)上的?件,做?些 list、 rename 或者查找等操作。
3.Iceberg 的能力總結(jié)
Iceberg 目前?持三種?件格式 parquet,Avro, ?ORC,?論是 HDFS 或者 S3 上的?件,可以看到有?存也 有列存,后面會(huì)詳細(xì)的去介紹其作用 。Iceberg 本身具備的能?總結(jié)如下, ?這些能?對(duì)于后面我們利用 ? ? ? ? ?Iceberg 來(lái)構(gòu)建實(shí)時(shí)數(shù)倉(cāng)是非常重要的。

4.Iceberg 的文件組織格式介紹
下圖展示的是 Iceberg 的整個(gè)?件組織格式 。從上往下看:
首先最上層是 snapshot 模塊 。Iceberg 里面的 snapshot 是?個(gè)用戶可讀取的基本的數(shù)據(jù)單位,也就是說(shuō) 用戶每次讀取?張表里面的所有數(shù)據(jù),都是?個(gè)snapshot 下的數(shù)據(jù)。
其次, ?manifest。?個(gè) snapshot 下面會(huì)有多個(gè) manifest,如圖 snapshot-0 有兩個(gè) manifest,而 snapshot-1 有三個(gè) manifest,每個(gè) manifest 下面會(huì)管理?個(gè)至多個(gè) DataFiles ?件。
第三, ?DataFiles。manifest ?件里面存放的就是數(shù)據(jù)的元信息,我們可以打開(kāi) manifest ?件,可以看到 里面其實(shí)是???的 datafiles ?件路徑。
從圖上看到,snapshot-1 包含了 snapshop-0 的數(shù)據(jù),而 snapshot-1 這個(gè)時(shí)刻寫(xiě)?的數(shù)據(jù)只有 manifest2, 這個(gè)能?其實(shí)就為我們后面去做增量讀取提供了?個(gè)很好的?持。

5.Iceberg 讀寫(xiě)過(guò)程介紹
Apache Iceberg 讀寫(xiě)
首先,如果有?個(gè) write 操作,在寫(xiě) snapsho-1 的時(shí)候,snapshot-1 是虛線框,也就是說(shuō)此時(shí)還沒(méi)有發(fā)生 commit 操作 。這時(shí)候?qū)?snapshot-1 的讀其實(shí)是不可讀的, ?因?yàn)橛脩舻淖x只能讀到已經(jīng) commit 之后的 ? ?snapshot。發(fā)生 commit 之后才可以讀 。同理,會(huì)有 snapshot-2,snapshot-3。
Iceberg 提供的?個(gè)重要能力,就是讀寫(xiě)分離能力 。在對(duì) snapshot-4 進(jìn)行寫(xiě)的時(shí)候,其實(shí)是完全不影響對(duì) snapshot-2 和 snapshot-3 的讀 。Iceberg 的這個(gè)能力對(duì)于構(gòu)建實(shí)時(shí)數(shù)倉(cāng)是非常重要的能力之? 。

同理,讀也是可以并發(fā)的,可以同時(shí)讀 s1 、s2 、s3 的快照數(shù)據(jù), ?這就提供了回溯讀到 snapshot-2 或者 ? ? ? ? snapshot-3 數(shù)據(jù)的能力 。Snapshot-4 寫(xiě)完成之后,會(huì)發(fā)生?次 commit 操作, ?這個(gè)時(shí)候 snapshot-4 變成了
實(shí)心,此時(shí)就可以讀了 。另外,可以看到 current Snapshot 的指針移到 s4,也就是說(shuō)默認(rèn)情況下, ?用戶對(duì)? 張表的讀操作,都是讀 current Snapshot 指針?biāo)赶虻?Snapshot,但不會(huì)影響前面的 snapshot 的讀操作。
Apache Iceberg 增量讀
接下來(lái)講?下 Iceberg 的增量讀 。首先我們知道 Iceberg 的讀操作只能基于已經(jīng)提交完成的 snapshot-1,此 ?時(shí)會(huì)有?個(gè) snapshot-2,可以看到每個(gè) snapshot 都包含前面 snapshot 的所有數(shù)據(jù),如果每次都讀全量的數(shù) 據(jù),整個(gè)鏈路上對(duì)計(jì)算引擎來(lái)說(shuō),讀取的代價(jià)非常高。
如果只希望讀到當(dāng)前時(shí)刻新增的數(shù)據(jù), ?這個(gè)時(shí)候其實(shí)就可以根據(jù) Iceberg 的 snapshot 的回溯機(jī)制,僅讀取 snapshot1 到 snapshot2 的增量數(shù)據(jù),也就是紫色這塊的數(shù)據(jù)可以讀的。

同理 s3 也是可以只讀黃色的這塊區(qū)域的數(shù)據(jù), ?同時(shí)也可以讀 s3 到 s1 這塊的增量數(shù)據(jù),基于 Flink source 的 streaming reader 功能在內(nèi)部我們已經(jīng)實(shí)現(xiàn)這種增量讀取的功能,并且已經(jīng)在線上運(yùn)行了 。剛才講到了?個(gè)非 常重要的問(wèn)題, ?既然 Iceberg 已經(jīng)有了讀寫(xiě)分離,并發(fā)讀,增量讀的功能, ?Iceberg 要跟 Flink 實(shí)現(xiàn)對(duì)接,那 ?么就必須實(shí)現(xiàn) Iceberg 的 sink。
實(shí)時(shí)小文件問(wèn)題
社區(qū)現(xiàn)在已經(jīng)重構(gòu)了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我們的架構(gòu)其實(shí)跟社 區(qū)的架構(gòu)是保持?致的, ?曲線框中的這塊內(nèi)容是 FlinkIcebergSink。
在有多個(gè) IcebergStreamWriter 和?個(gè) IcebergFileCommitter 的情況下,上游的數(shù)據(jù)寫(xiě)到 IcebergStreamWriter 的時(shí)候,每個(gè) writer 里面做的事情都是去寫(xiě) datafiles ?件。

當(dāng)每個(gè) writer 寫(xiě)完自己當(dāng)前這?批 datafiles 小文件的時(shí)候,就會(huì)發(fā)送消息給 IcebergmileCommitter,告訴它 可以提交了 。而 IcebergmileCommitter 收到信息的時(shí),就?次性將 datafiles 的文件提交, ?進(jìn)行?次 commit 操作。
commit 操作本身只是對(duì)?些原始信息的修改, ?當(dāng)數(shù)據(jù)都已經(jīng)寫(xiě)到磁盤(pán)了, ?只是讓其從不可見(jiàn)變成可見(jiàn) 。在這 個(gè)情況下, ?Iceberg 只需要用?個(gè) commit 即可完成數(shù)據(jù)從不可見(jiàn)變成可見(jiàn)的過(guò)程。
實(shí)時(shí)小文件合并
mlink 實(shí)時(shí)作業(yè)?般會(huì)長(zhǎng)期在集群中運(yùn)行,為了要保證數(shù)據(jù)的時(shí)效性,? 般會(huì)把 Iceberg commit 操作的時(shí)間周 期設(shè)成 30 秒或者是?分鐘 。當(dāng) mlink 作業(yè)跑?天時(shí),如果是?分鐘?次 commit,? 天需要 1440 個(gè) ? ? ? ? ? ? ?commit,如果 mlink 作業(yè)跑?個(gè)月commit 操作會(huì)更多 。甚至 snapshot commit 的時(shí)間間隔越短,生成的 ? ? ?snapshot 的數(shù)量會(huì)越多 。當(dāng)流式作業(yè)運(yùn)行后,就會(huì)生成大量的小文件。
這個(gè)問(wèn)題如果不解決的話, ?Iceberg 在 mlink 處理引擎上的 sink 操作就不可用了 。我們?cè)趦?nèi)部實(shí)現(xiàn)了?個(gè)叫做 data compaction operator 的功能, ?這個(gè) operator 是跟著 mlink sink ?起走的 。當(dāng) Iceberg 的 ? ? ? ? ? ? ? ? ? ? mlinkIcebergSink 每完成?次 commit 操作的時(shí)候, ?它都會(huì)向下游 mileScanTaskGen 發(fā)送消息,告訴 ? ? ? ? ? ?mileScanTaskGen 已經(jīng)完成了?次 commit。

FileScanTaskGen 里面會(huì)有相關(guān)的邏輯,能夠根據(jù)用戶的配置或者當(dāng)前磁盤(pán)的特性來(lái)進(jìn)?文件合并任務(wù)的生成 操作 。FileScanTaskGen 發(fā)送到 DataFileRewitre 的內(nèi)容其實(shí)就是在 FileScanTaskGen 里面生成的需要合并的 文件的列表 。同理, ?因?yàn)楹喜⑽募切枰?定的耗時(shí)操作,所以需要將其進(jìn)?異步的操作分發(fā)到不同的task ? rewrite operator 中。
上面講過(guò)的 Iceberg 是有 commit 操作,對(duì)于 rewrite 之后的文件需要有?個(gè)新的 snapshot 。這里對(duì) Iceberg 來(lái)說(shuō),也是?個(gè) commit 操作,所以采用?個(gè)單并發(fā)的像 commit 操作?樣的事件。
整條鏈路下來(lái),小文件的合并目前采用的是 commit 操作,如果 commit 操作后面阻塞了,會(huì)影響前面的寫(xiě)? 操作, ?這塊我們后面會(huì)持續(xù)優(yōu)化。
6) Flink+Iceberg 構(gòu)建實(shí)時(shí)數(shù)倉(cāng)
1.近實(shí)時(shí)的數(shù)據(jù)接入
前面介紹了 Iceberg 既支持讀寫(xiě)分離, ?又支持并發(fā)讀 、增量讀 、小文件合并, ?還可以支持秒級(jí)到分鐘級(jí)的延 遲,基于這些優(yōu)勢(shì)我們嘗試采用 Iceberg 這些功能來(lái)構(gòu)建基于 Flink 的實(shí)時(shí)全鏈路批流?體化的實(shí)時(shí)數(shù)倉(cāng)架 構(gòu)。
如下圖所示, ?Iceberg 每次的 commit 操作,都是對(duì)數(shù)據(jù)的可?性的改變, ?比如說(shuō)讓數(shù)據(jù)從不可?變成可?, 在這個(gè)過(guò)程中,就可以實(shí)現(xiàn)近實(shí)時(shí)的數(shù)據(jù)記錄。

2.實(shí)時(shí)數(shù)倉(cāng) - 數(shù)據(jù)湖分析系統(tǒng)
此前需要先進(jìn)行數(shù)據(jù)接?, ?比如用 Spark 的離線調(diào)度任務(wù)去跑?些數(shù)據(jù),拉取,抽取最后再寫(xiě)?到 Hive 表里 面, ?這個(gè)過(guò)程的延時(shí)比較大 。有了 Iceberg 的表結(jié)構(gòu),可以中間使用 Flink,或者 spark streaming,完成近實(shí) 時(shí)的數(shù)據(jù)接? 。
基于以上功能,我們?cè)賮?lái)回顧?下前面討論的 Kappa 架構(gòu), ?Kappa 架構(gòu)的痛點(diǎn)上面已經(jīng)描述過(guò), ?Iceberg 既然 能夠作為?個(gè)優(yōu)秀的表格式, ?既支持 Streaming reader, ?又可以支持 Streaming sink, ?是否可以考慮將 Kafka 替換成 Iceberg?
Iceberg 底層依賴的存儲(chǔ)是像 HDFS 或 S3 這樣的廉價(jià)存儲(chǔ),而且 Iceberg 是支持 parquet 、orc 、Avro 這樣的 列式存儲(chǔ) 。有列式存儲(chǔ)的支持,就可以對(duì) OLAP 分析進(jìn)行基本的優(yōu)化,在中間層直接進(jìn)行計(jì)算 。例如謂詞下推 最基本的 OLAP 優(yōu)化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務(wù)天級(jí)別到小時(shí) 級(jí)別的延遲大大的降低, ?改造成?個(gè)近實(shí)時(shí)的數(shù)據(jù)湖分析系統(tǒng)。

在中間處理層,可以用 presto 進(jìn)行?些簡(jiǎn)單的查詢, ?因?yàn)?Iceberg 支持 Streaming read,所以在系統(tǒng)的中間 層也可以直接接? mlink, ?直接在中間層用 mlink 做?些批處理或者流式計(jì)算的任務(wù),把中間結(jié)果做進(jìn)?步計(jì)算 后輸出到下游。
替換 Kafka 的優(yōu)劣勢(shì):
總的來(lái)說(shuō),Iceberg 替換 Kafka 的優(yōu)勢(shì)主要包括:
實(shí)現(xiàn)存儲(chǔ)層的流批統(tǒng)?
中間層支持 OLAP 分析
完美支持高效回溯
存儲(chǔ)成本降低
當(dāng)然,也存在?定的缺陷,如:
數(shù)據(jù)延遲從實(shí)時(shí)變成近實(shí)時(shí)
對(duì)接其他數(shù)據(jù)系統(tǒng)需要額外開(kāi)發(fā)工作
秒級(jí)分析 - 數(shù)據(jù)湖加速:
由于 Iceberg 本身是將數(shù)據(jù)文件全部存儲(chǔ)在 工DmS 上的, ?工DmS 讀寫(xiě)這塊對(duì)于秒級(jí)分析的場(chǎng)景, ?還是不能夠完 全滿足我們的需求,所以接下去我們會(huì)在 Iceberg 底層支持 Alluxio 這樣?個(gè)緩存,借助于緩存的能?可以實(shí) 現(xiàn)數(shù)據(jù)湖的加速 。這塊的架構(gòu)也在我們未來(lái)的?個(gè)規(guī)劃和建設(shè)中。

(部分內(nèi)容來(lái)源網(wǎng)絡(luò),如有侵權(quán)請(qǐng)聯(lián)系刪除)