日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數據治理平臺

睿治作為國內功能最全的數據治理產品之一,入選IDC企業數據治理實施部署指南。同時,在IDC發布的《中國數據治理市場份額》報告中,連續四年蟬聯數據治理解決方案市場份額第一。

2w字詳解大廠實時數倉建設

時間:2022-01-21來源:空青瀏覽數:580

? ? ??一、實時數倉建設背景

? ? ??1. 實時需求日趨迫切

? ? ??目前各大公司的產品需求和內部決策對于數據實時性的要求越來越迫切,需要實時數倉的能力來賦能。傳統離線數倉的數據時效性是 T+1,調度頻率以天為單位,無法支撐實時場景的數據需求。即使能將調度頻率設置成小時,也只能解決部分時效性要求不高的場景,對于實效性要求很高的場景還是無法優雅的支撐。因此實時使用數據的問題必須得到有效解決。

? ? ??2. 實時技術日趨成熟

? ? ??實時計算框架已經經歷了三代發展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟。一方面,實時任務的開發已經能通過編寫 SQL 的方式來完成,在技術層面能很好地繼承離線數倉的架構設計思想;另一方面,在線數據開發平臺所提供的功能對實時任務開發、調試、運維的支持也日漸趨于成熟,開發成本逐步降低,有助于去做這件事。

? ? ??二、實時數倉建設目的?

? ? ??1. 解決傳統數倉的問題

? ? ??從目前數倉建設的現狀來看,實時數倉是一個容易讓人產生混淆的概念,根據傳統經驗分析,數倉有一個重要的功能,即能夠記錄歷史。通常,數倉都是希望從業務上線的第一天開始有數據,然后一直記錄到現在。但實時流處理技術,又是強調當前處理狀態的一個技術,結合當前一線大廠的建設經驗和滴滴在該領域的建設現狀,我們嘗試把公司內實時數倉建設的目的定位為,以數倉建設理論和實時技術,解決由于當前離線數倉數據時效性低解決不了的問題。

? ? ??現階段我們要建設實時數倉的主要原因是:

? ? ??公司業務對于數據的實時性越來越迫切,需要有實時數據來輔助完成決策; 實時數據建設沒有規范,數據可用性較差,無法形成數倉體系,資源大量浪費; 數據平臺工具對整體實時開發的支持也日漸趨于成熟,開發成本降低。?

? ? ??2. 實時數倉的應用場景?

? ? ??實時 OLAP 分析; 實時數據看板; 實時業務監控; 實時數據接口服務。 ? ? ? ? ??

? ? ??三、實時數倉建設方案

? ? ??接下來我們分析下目前實時數倉建設比較好的幾個案例,希望這些案例能夠給大家帶來一些啟發。

? ? ??1. 滴滴順風車實時數倉案例

? ? ??滴滴數據團隊建設的實時數倉,基本滿足了順風車業務方在實時側的各類業務需求,初步建立起順風車實時數倉,完成了整體數據分層,包含明細數據和匯總數據,統一了 DWD 層,降低了大數據資源消耗,提高了數據復用性,可對外輸出豐富的數據服務。

? ? ??數倉具體架構如下圖所示:

? ? ??從數據架構圖來看,順風車實時數倉和對應的離線數倉有很多類似的地方。例如分層結構;比如 ODS 層,明細層,匯總層,乃至應用層,他們命名的模式可能都是一樣的。但仔細比較不難發現,兩者有很多區別:

? ? ??與離線數倉相比,實時數倉的層次更少一些: 從目前建設離線數倉的經驗來看,數倉的數據明細層內容會非常豐富,處理明細數據外一般還會包含輕度匯總層的概念,另外離線數倉中應用層數據在數倉內部,但實時數倉中,app 應用層數據已經落入應用系統的存儲介質中,可以把該層與數倉的表分離; 應用層少建設的好處:實時處理數據的時候,每建一個層次,數據必然會產生一定的延遲; 匯總層少建的好處:在匯總統計的時候,往往為了容忍一部分數據的延遲,可能會人為的制造一些延遲來保證數據的準確。舉例,在統計跨天相關的訂單事件中的數據時,可能會等到 00:00:05 或者 00:00:10 再統計,確保 00:00 前的數據已經全部接受到位了,再進行統計。所以,匯總層的層次太多的話,就會更大的加重人為造成的數據延遲。?

? ? ??與離線數倉相比,實時數倉的數據源存儲不同: 在建設離線數倉的時候,目前滴滴內部整個離線數倉都是建立在 Hive 表之上。但是,在建設實時數倉的時候,同一份表,會使用不同的方式進行存儲。比如常見的情況下,明細數據或者匯總數據都會存在 Kafka 里面,但是像城市、渠道等維度信息需要借助 Hbase,mysql 或者其他 KV 存儲等數據庫來進行存儲。

? ? ??接下來,根據順風車實時數倉架構圖,對每一層建設做具體展開:

? ? ??1. ODS 貼源層建設

? ? ??根據順風車具體場景,目前順風車數據源主要包括訂單相關的 binlog 日志,冒泡和安全相關的 public 日志,流量相關的埋點日志等。這些數據部分已采集寫入 kafka 或 ddmq 等數據通道中,部分數據需要借助內部自研同步工具完成采集,最終基于順風車數倉 ods 層建設規范分主題統一寫入 kafka 存儲介質中。

命名規范:ODS 層實時數據源主要包括兩種。

? ? ??一種是在離線采集時已經自動生產的 DDMQ 或者是 Kafka topic,這類型的數據命名方式為采集系統自動生成規范為:cn-binlog-數據庫名-數據庫名 eg:cn-binlog-ihap_fangyuan-ihap_fangyuan 一種是需要自己進行采集同步到 kafka topic 中,生產的 topic 命名規范同離線類似:ODS 層采用:realtime_ods_binlog_{源系統庫/表名}/ods_log_{日志名} eg: realtime_ods_binlog_ihap_fangyuan

? ? ???2. DWD 明細層建設

? ? ??根據順風車業務過程作為建模驅動,基于每個具體的業務過程特點,構建最細粒度的明細層事實表;結合順風車分析師在離線側的數據使用特點,將明細事實表的某些重要維度屬性字段做適當冗余,完成寬表化處理,之后基于當前順風車業務方對實時數據的需求重點,重點建設交易、財務、體驗、安全、流量等幾大模塊;該層的數據來源于 ODS 層,通過大數據架構提供的 Stream SQL 完成 ETL 工作,對于 binlog 日志的處理主要進行簡單的數據清洗、處理數據漂移和數據亂序,以及可能對多個 ODS 表進行 Stream Join,對于流量日志主要是做通用的 ETL 處理和針對順風車場景的數據過濾,完成非結構化數據的結構化處理和數據的分流;該層的數據除了存儲在消息隊列 Kafka 中,通常也會把數據實時寫入 Druid 數據庫中,供查詢明細數據和作為簡單匯總數據的加工數據源。

? ? ??命名規范:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwd_{業務/pub}_{數據域縮寫}_[{業務過程縮寫}]_[{自定義表命名標簽縮寫}]

{業務/pub}:參考業務命名 {數據域縮寫}:參考數據域劃分部分 {自定義表命名標簽縮寫}:實體名稱可以根據數據倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義 樣例:realtime_dwd_trip_trd_order_base?

? ? ??3. DIM 層?

? ? ??公共維度層,基于維度建模理念思想,建立整個業務過程的一致性維度,降低數據計算口徑和算法不統一風險; DIM 層數據來源于兩部分:一部分是 Flink 程序實時處理 ODS 層數據得到,另外一部分是通過離線任務出倉得到; DIM 層維度數據主要使用 MySQL、Hbase、fusion(滴滴自研 KV 存儲) 三種存儲引擎,對于維表數據比較少的情況可以使用 MySQL,對于單條數據大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 存儲,降低機器內存資源占用,對于數據量比較大,對維表數據變化不是特別敏感的場景,可以使用 HBase 存儲。

? ? ??命名規范:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字符,并且應遵循下述規則:dim_{業務/pub}_{維度定義}[_{自定義命名標簽}]:

{業務/pub}:參考業務命名 {維度定義}:參考維度命名 {自定義表命名標簽縮寫}:實體名稱可以根據數據倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義 樣例:dim_trip_dri_base?

? ? ??4. DWM 匯總層建設

? ? ??在建設順風車實時數倉的匯總層的時候,跟順風車離線數倉有很多一樣的地方,但其具體技術實現會存在很大不同。

? ? ??第一:對于一些共性指標的加工,比如 pv,uv,訂單業務過程指標等,我們會在匯總層進行統一的運算,確保關于指標的口徑是統一在一個固定的模型中完成。對于一些個性指標,從指標復用性的角度出發,確定唯一的時間字段,同時該字段盡可能與其他指標在時間維度上完成拉齊,例如行中異常訂單數需要與交易域指標在事件時間上做到拉齊。

? ? ??第二:在順風車匯總層建設中,需要進行多維的主題匯總,因為實時數倉本身是面向主題的,可能每個主題會關心的維度都不一樣,所以需要在不同的主題下,按照這個主題關心的維度對數據進行匯總,最后來算業務方需要的匯總指標。在具體操作中,對于 pv 類指標使用 Stream SQL 實現 1 分鐘匯總指標作為最小匯總單位指標,在此基礎上進行時間維度上的指標累加;對于 uv 類指標直接使用 druid 數據庫作為指標匯總容器,根據業務方對匯總指標的及時性和準確性的要求,實現相應的精確去重和非精確去重。

? ? ??第三:匯總層建設過程中,還會涉及到衍生維度的加工。在順風車券相關的匯總指標加工中我們使用 Hbase 的版本機制來構建一個衍生維度的拉鏈表,通過事件流和 Hbase 維表關聯的方式得到實時數據當時的準確維度

? ? ??命名規范:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字符,并且應遵循下述規則:realtime_dwm_{業務/pub}_{數據域縮寫}_{數據主粒度縮寫}_[{自定義表命名標簽縮寫}]_{統計時間周期范圍縮寫}:

{業務/pub}:參考業務命名 {數據域縮寫}:參考數據域劃分部分 {數據主粒度縮寫}:指數據主要粒度或數據域的縮寫,也是聯合主鍵中的主要維度 {自定義表命名標簽縮寫}:實體名稱可以根據數據倉庫轉換整合后做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義 {統計時間周期范圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量) 樣例:realtime_dwm_trip_trd_pas_bus_accum_1min?

? ? ??APP 應用層

? ? ??該層主要的工作是把實時匯總數據寫入應用系統的數據庫中,包括用于大屏顯示和實時 OLAP 的 Druid 數據庫(該數據庫除了寫入應用數據,也可以寫入明細數據完成匯總指標的計算)中,用于實時數據接口服務的 Hbase 數據庫,用于實時數據產品的 mysql 或者 redis 數據庫中。

? ? ??命名規范:基于實時數倉的特殊性不做硬性要求。

? ? ??2. 快手實時數倉場景化案例

?? ? ??1) 目標及難點

? ? ??目標

? ? ??首先由于是做數倉,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的數據差異在 1% 以內,這是最低標準。

? ? ??其次是數據延遲,其 SLA 標準是活動期間所有核心報表場景的數據延遲不能超過 5 分鐘,這 5 分鐘包括作業掛掉之后和恢復時間,如果超過則意味著 SLA 不達標。

? ? ??最后是穩定性,針對一些場景,比如作業重啟后,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的異常。

? ? ??難點

? ? ??第一個難點是數據量大。每天整體的入口流量數據量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。

? ? ??第二個難點是組件依賴比較復雜。可能這條鏈路里有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。

? ? ??第三個難點是鏈路復雜。目前我們有 200+ 核心業務作業,50+ 核心數據源,整體作業超過 1000。

? ? ??2) 實時數倉 - 分層模型

? ? ??基于上面三個難點,來看一下數倉架構:

? ? ??如上所示:

? ? ??最下層有三個不同的數據源,分別是客戶端日志、服務端日志以及 Binlog 日志;在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細數據,另一個是 DWS 層,做公共聚合數據,DIM 是我們常說的維度。我們有一個基于離線數倉的主題預分層,這個主題預分層可能包括流量、用戶、設備、視頻的生產消費、風控、社交等。DWD 層的核心工作是標準化的清洗;DWS 層是把維度的數據和 DWD 層進行關聯,關聯之后生成一些通用粒度的聚合層次。再往上是應用層,包括一些大盤的數據,多維分析的模型以及業務專題數據;最上面是場景。整體過程可以分為三步:

? ? ??第一步是做業務數據化,相當于把業務的數據接進來;第二步是數據資產化,意思是對數據做很多的清洗,然后形成一些規則有序的數據;第三步是數據業務化,可以理解數據在實時數據層面可以反哺業務,為業務數據價值建設提供一些賦能。

? ? ??3) 實時數倉 - 保障措施

? ? ??基于上面的分層模型,來看一下整體的保障措施:

? ? ??保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障。

? ? ??我們先看藍色部分的質量保障。針對質量保障,可以看到在數據源階段,做了如數據源的亂序監控,這是我們基于自己的 SDK 的采集做的,以及數據源和離線的一致性校準。研發階段的計算過程有三個階段,分別是研發階段、上線階段和服務階段。研發階段可能會提供一個標準化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監控和指標監控;在服務階段,如果出現一些異常情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體數據修復。

? ? ??第二個是時效性保障。針對數據源,我們把數據源的延遲情況也納入監控。在研發階段其實還有兩個事情:首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之后,會有一些任務上線和重啟性能評估,相當于按照 CP 恢復之后,重啟的性能是什么樣子。

? ? ??最后一個是穩定保障,這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基于之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況。之后我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房。冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當于同樣一份邏輯在兩個機房各部署一次。以上就是我們整體的保障措施。

? ? ??3) 快手場景問題及解決方案

? ? ??1. PV/UV 標準化

? ? ??1.1 場景

? ? ??第一個問題是 PV/UV 標準化,這里有三個截圖:

? ? ??第一張圖是春晚活動的預熱場景,相當于是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖。

? ? ??在活動進行過程中,我們發現 60~70% 的需求是計算頁面里的信息,如:

? ? ??這個頁面來了多少人,或者有多少人點擊進入這個頁面; 活動一共來了多少人; 頁面里的某一個掛件,獲得了多少點擊、產生了多少曝光。

? ? ??1.2 方案

? ? ??抽象一下這個場景就是下面這種 SQL:

? ? ??簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接著產生一些 Count 或者 Sum 操作。

? ? ??基于這種場景,我們最開始的解決方案如上圖右邊所示。

? ? ??我們用到了 Flink SQL 的 Early Fire 機制,從 Source 數據源取數據,之后做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之后會有一個叫做 Local Window Agg 的東西,相當于數據分完桶之后把相同類型的數據相加。Local Window Agg 之后再按照維度進行 Global Window Agg 的合桶,合桶的概念相當于按照維度計算出最終的結果。Early Fire 機制相當于在 Local Window Agg 開一個天級的窗口,然后每分鐘去對外輸出一次。

? ? ??這個過程中我們遇到了一些問題,如上圖左下角所示。

? ? ??在代碼正常運行的情況下是沒有問題的,但如果整體數據存在延遲或者追溯歷史數據的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候數據量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的數據,而 14:01 的那個點就被丟掉了,丟掉了以后會發生什么?

? ? ??在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史數據的結果。橫坐標是分鐘,縱坐標是截止到當前時刻的頁面 UV,我們發現有些點是橫著的,意味著沒有數據結果,然后一個陡增,然后又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。

為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。

? ? ??數據開一個大的天級窗口,大窗口下又開了一個小的分鐘級窗口,數據按數據本身的 Row Time 落到分鐘級窗口。

? ? ??Watermark 推進過了窗口的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,數據本身落在真實的窗口, Watermark 推進,在窗口結束后觸發。此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序數據本身是一個不丟棄的狀態,會記錄到最新的累計數據。最后是語義一致性,它會基于事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的。以上是 PV/UV 一個標準化的解決方案。

? ? ??2. DAU 計算

? ? ??2.1 背景介紹

? ? ??下面介紹一下 DAU 計算:

? ? ??我們對于整個大盤的活躍設備、新增設備和回流設備有比較多的監控。

? ? ??活躍設備指的是當天來過的設備;新增設備指的是當天來過且歷史沒有來過的設備;回流設備指的是當天來過且 N 天內沒有來過的設備。但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標。

? ? ??我們看一下離線過程中,邏輯應該怎么算。

? ? ??首先我們先算活躍設備,把這些合并到一起,然后做一個維度下的天級別去重,接著再去關聯維度表,這個維度表包括設備的首末次時間,就是截止到昨天設備首次訪問和末次訪問的時間。

? ? ??得到這個信息之后,我們就可以進行邏輯計算,然后我們會發現新增和回流的設備其實是活躍設備里打的一個子標簽。新增設備就是做了一個邏輯處理,回流設備是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?

? ? ??其實我們最開始是這么做的,但遇到了一些問題:

? ? ??第一個問題是:數據源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的過程之中都要改,單作業的穩定性會非常差;第二個問題是:數據量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 存儲,任何一個這樣的 RPC 服務接口,都不可能在萬億級數據量的場景下保證服務穩定性;第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘。整個鏈路要避免批處理,如果出現了一些任務性能的單點問題,我們還要保證高性能和可擴容。

? ? ??2.2 技術方案

? ? ??針對以上問題,介紹一下我們是怎么做的:

? ? ??如上圖的例子,第一步是對 A B C 這三個數據源,先按照維度和 DID 做分鐘級別去重,分別去重之后得到三個分鐘級別去重的數據源,接著把它們 Union 到一起,然后再進行同樣的邏輯操作。

? ? ??這相當于我們數據源的入口從萬億變到了百億的級別,分鐘級別去重之后再進行一個天級別的去重,產生的數據源就可以從百億變成了幾十億的級別。

? ? ??在幾十億級別數據量的情況下,我們再去關聯數據服務化,這就是一種比較可行的狀態,相當于去關聯用戶畫像的 RPC 接口,得到 RPC 接口之后,最終寫入到了目標 Topic。這個目標 Topic 會導入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等。

? ? ??這個方案有三個方面的優勢,分別是穩定性、時效性和準確性。

? ? ??首先是穩定性。松耦合可以簡單理解為當數據源 A 的邏輯和數據源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務化后置和狀態可控。其次是時效性,我們做到毫秒延遲,并且維度豐富,整體上有 20+ 的維度做多維聚合。最后是準確性,我們支持數據驗證、實時監控、模型出口統一等。此時我們遇到了另外一個問題 - 亂序。對于上方三個不同的作業,每一個作業重啟至少會有兩分鐘左右的延遲,延遲會導致下游的數據源 Union 到一起就會有亂序。

? ? ??2.3 延遲計算方案

? ? ??遇到上面這種有亂序的情況下,我們要怎么處理?

? ? ??我們總共有三種處理方案:

? ? ??第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出。

這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態大小是存 10 分鐘的兩倍,到后面這個狀態大小有點不太可控,因此我們又換了解決方案 2。

? ? ??第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在數據源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。04:01 來了一條數據,進行結果輸出。04:02 來了一條數據,如果是同一個 did,那么它會更新時間戳,然后仍然做結果輸出。04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的數據,它發現時間戳已經更新到 04:04,它會丟棄這條數據。這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由于我們不好解決這個問題,因此我們又想出了解決方案 3。

? ? ??方案 3 是在方案 2 時間戳的基礎之上,加了一個類似于環形緩沖區,在緩沖區之內允許亂序。

比如 04:01 來了一條數據,進行結果輸出;04:02 來了一條數據,它會把時間戳更新到 04:02,并且會記錄同一個設備在 04:01 也來過。如果 04:04 再來了一條數據,就按照相應的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序。

? ? ??綜合來看這三個方案:

? ? ??方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀態大小在 480G 左右。這種情況雖然保證了準確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;

? ? ??方案 2 是 30G 左右的狀態大小,對于亂序 0 容忍,但是數據不準確,由于我們對準確性的要求非常高,因此也放棄了這個方案;

? ? ??方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。

? ? ??3. 運營場景

? ? ??3.1 背景介紹

? ? ??運營場景可分為四個部分:

? ? ??第一個是數據大屏支持,包括單直播間的分析數據和大盤的分析數據,需要做到分鐘級延遲,更新要求比較高;

? ? ??第二個是直播看板支持,直播看板的數據會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;

? ? ??第三個是數據策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的數據,更新要求比較低;

? ? ??第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定。

? ? ??下面進行分析這 4 種不同的狀態產生的一些不同的場景。

? ? ??前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景。

針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高。

? ? ??3.2 技術方案

? ? ??針對上方 4 種不同的場景,我們是如何去做的?

? ? ??首先看一下基礎明細層 (圖中左方),數據源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費信息,還有觀看 / 點贊 / 評論。經過一輪基礎清洗,然后做維度管理。上游的這些維度信息來源于 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 存儲里邊,包括一些用戶的維度。

? ? ??這些維度關聯了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級緩存的操作。

? ? ??如圖中上方,我們讀取 DWD 層的數據然后做基礎匯總,核心是窗口維度聚合生成 4 種不同粒度的數據,分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的數據。

? ? ??如圖中下方,基于這些通用維度數據,我們再去加工個性化維度的數據,也就是 ADS 層。拿到了這些數據之后會有維度擴展,包括內容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。

? ? ??分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發現任務經常出問題,并且分不清楚哪個任務的職責是什么,構建不出這樣的一個穩定層。

? ? ??如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景。

? ? ??3. 騰訊看點實時數倉案例

騰訊看點業務為什么要構建實時數倉,因為原始的上報數據量非常大,一天上報峰值就有上萬億條。而且上報格式混亂。缺乏內容維度信息、用戶畫像信息,下游沒辦法直接使用。而我們提供的實時數倉,是根據騰訊看點信息流的業務場景,進行了內容維度的關聯,用戶畫像的關聯,各種粒度的聚合,下游可以非常方便的使用實時數據。

? ? ??1) 方案選型

? ? ? 那就看下我們多維實時數據分析系統的方案選型,選型我們對比了行業內的領先方案,選擇了最符合我們業務場景的方案。

? ? ? 第一塊是實時數倉的選型,我們選擇的是業界比較成熟的 Lambda 架構,他的優點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線數據用兩套代碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做數據對賬的工作,如果有異常會進行告警。

? ? ? 第二塊是實時計算引擎選型,因為 Flink 設計之初就是為了流處理,SparkStreaming 嚴格來說還是微批處理,Strom 用的已經不多了。再看 Flink 具有 Exactly-once 的準確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎。

? ? ? 第三塊是實時存儲引擎,我們的要求就是需要有維度索引、支持高并發、預聚合、高性能實時多維 OLAP 查詢。可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內容,存放在同一個 Segment 上,計算全局 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 數據庫引擎 ClickHouse。

? ? ? 2) 設計目標與設計難點

? ? ? 我們多維實時數據分析系統分為三大模塊

? ? ? 實時計算引擎 實時存儲引擎 App 層

? ? ? 難點主要在前兩個模塊:實時計算引擎和實時存儲引擎。

? ? ? 千萬級/s 的海量數據如何實時接入,并且進行極低延遲維表關聯。 實時存儲引擎如何支持高并發寫入、高可用分布式和高性能索引查詢,是比較難的。

? ? ? ?這幾個模塊的具體實現,看一下我們系統的架構設計。

? ? ? 3) 架構設計

? ? ? 前端采用的是開源組件 Ant Design,利用了 Nginx 服務器,部署靜態頁面,并反向代理了瀏覽器的請求到后臺服務器上。

? ? ? 后臺服務是基于騰訊自研的 RPC 后臺服務框架寫的,并且會進行一些二級緩存。

? ? ? 實時數倉部分,分為了接入層、實時計算層和實時數倉存儲層。

? ? ? 接入層主要是從千萬級/s 的原始消息隊列中,拆分出不同行為數據的微隊列,拿看點的視頻來說,拆分過后,數據就只有百萬級/s 了;

? ? ? 實時計算層主要負責,多行行為流水數據進行行轉列,實時關聯用戶畫像數據和內容維度數據;

? ? ? ?實時數倉存儲層主要是設計出符合看點業務的,下游好用的實時消息隊列。我們暫時提供了兩個消息隊列,作為實時數倉的兩層。一層 DWM 層是內容 ID-用戶 ID 粒度聚合的,就是一條數據包含內容 ID-用戶 ID 還有 B 側內容數據、C 側用戶數據和用戶畫像數據;另一層是 DWS 層,是內容 ID 粒度聚合的,一條數據包含內容 ID,B 側數據和 C 側數據。可以看到內容 ID-用戶 ID 粒度的消息隊列流量進一步減小到十萬級/s,內容 ID 粒度的更是萬級/s,并且格式更加清晰,維度信息更加豐富。

? ? ? ?實時存儲部分分為實時寫入層、OLAP 存儲層和后臺接口層。

? ? ? ?實時寫入層主要是負責 Hash 路由將數據寫入; OLAP 存儲層利用 MPP 存儲引擎,設計符合業務的索引和物化視圖,高效存儲海量數據; 后臺接口層提供高效的多維實時查詢接口。

? ? ? ?4) 實時計算

? ? ??這個系統最復雜的兩塊,實時計算和實時存儲。

? ? ??先介紹實時計算部分:分為實時關聯和實時數倉。

? ? ??1. 實時高性能維表關聯

? ? ??實時維表關聯這一塊難度在于 百萬級/s 的實時數據流,如果直接去關聯 HBase,1 分鐘的數據,關聯完 HBase 耗時是小時級的,會導致數據延遲嚴重。

我們提出了幾個解決方案:

? ? ??第一個是,在 Flink 實時計算環節,先按照 1 分鐘進行了窗口聚合,將窗口內多行行為數據轉一行多列的數據格式,經過這一步操作,原本小時級的關聯耗時下降到了十幾分鐘,但是還是不夠的。

? ? ??第二個是,在訪問 HBase 內容之前設置一層 Redis 緩存,因為 1000 條數據訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍。為了防止過期的數據浪費緩存,緩存過期時間設置成 24 小時,同時通過監聽寫 HBase Proxy 來保證緩存的一致性。這樣將訪問時間從十幾分鐘變成了秒級。

? ? ??第三個是,上報過程中會上報不少非常規內容 ID,這些內容 ID 在內容 HBase 中是不存儲的,會造成緩存穿透的問題。所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止緩存穿透,又減少一些時間。

? ? ??第四個是,因為設置了定時緩存,會引入一個緩存雪崩的問題。為了防止雪崩,我們在實時計算中,進行了削峰填谷的操作,錯開設置緩存的時間。

可以看到,優化前后,數據量從百億級減少到了十億級,耗時從小時級減少到了數十秒,減少 99%。

? ? ??2. 下游提供服務

? ? ??實時數倉的難度在于:它處于比較新的領域,并且各個公司各個業務差距比較大,怎么能設計出方便,好用,符合看點業務場景的實時數倉是有難度的。

先看一下實時數倉做了什么,實時數倉對外就是幾個消息隊列,不同的消息隊列里面存放的就是不同聚合粒度的實時數據,包括內容 ID、用戶 ID、C 側行為數據、B 側內容維度數據和用戶畫像數據等。

? ? ??我們是怎么搭建實時數倉的,就是上面介紹的實時計算引擎的輸出,放到消息隊列中保存,可以提供給下游多用戶復用。

? ? ??我們可以看下,在我們建設實時數據倉庫前后,開發一個實時應用的區別。沒有數倉的時候,我們需要消費千萬級/s 的原始隊列,進行復雜的數據清洗,然后再進行用戶畫像關聯、內容維度關聯,才能拿到符合要求格式的實時數據,開發和擴展的成本都會比較高,如果想開發一個新的應用,又要走一遍這個流程。有了數倉之后,如果想開發內容 ID 粒度的實時應用,就直接申請 TPS 萬級/s 的 DWS 層的消息隊列。開發成本變低很多,資源消耗小很多,可擴展性也強很多。

看個實際例子,開發我們系統的實時數據大屏,原本需要進行如上所有操作,才能拿到數據。現在只需要消費 DWS 層消息隊列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 內存。

? ? ??可以看到,以 50 個消費者為例,建立實時數倉前后,下游開發一個實時應用,可以減少 98%的資源消耗。包括計算資源,存儲資源,人力成本和開發人員學習接入成本等等。并且消費者越多,節省越多。就拿 Redis 存儲這一部分來說,一個月就能省下上百萬人民幣。

? ? ??5) 實時存儲

? ? ??介紹完實時計算,再來介紹實時存儲。

? ? ??這塊分為三個部分來介紹

? ? ? 第一是 分布式-高可用 第二是 海量數據-寫入 第三是 高性能-查詢

? ? ? 1. 分布式-高可用

? ? ? 我們這里聽取的是 Clickhouse 官方的建議,借助 ZK 實現高可用的方案。數據寫入一個分片,僅寫入一個副本,然后再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取數據,保證數據一致性。

? ? ? 這里沒有選用消息隊列進行數據同步,是因為 ZK 更加輕量級。而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的數據。而且就算其它節點第一次來獲取數據失敗了,后面只要發現它跟 ZK 上記錄的數據不一致,就會再次嘗試獲取數據,保證一致性。

? ? ? ?2. 海量數據-寫入

? ? ? 數據寫入遇到的第一個問題是,海量數據直接寫入 Clickhouse 的話,會導致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入。Batch 設置多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游內存壓力太大,通過實驗,最終我們選用了大小幾十萬的 Batch。

? ? ? 第二個問題是,隨著數據量的增長,單 QQ 看點的視頻內容每天可能寫入百億級的數據,默認方案是寫一張分布式表,這就會造成單臺機器出現磁盤的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似于 HBase、RocketsDB 的底層 LSM-Tree。在合并的過程中會存在寫放大的問題,加重磁盤壓力。峰值每分鐘幾千萬條數據,寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢。我們做的兩個優化方案:一是對磁盤做 Raid,提升磁盤的 IO;二是在寫入之前進行分表,直接分開寫入到不同的分片上,磁盤壓力直接變為 1/N。

? ? ? ?第三個問題是,雖然我們寫入按照分片進行了劃分,但是這里引入了一個分布式系統常見的問題,就是局部的 Top 并非全局 Top 的問題。比如同一個內容 ID 的數據落在了不同的分片上,計算全局 Top100 閱讀的內容 ID,有一個內容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導致匯總的時候,會丟失一部分數據,影響最終結果。我們做的優化是在寫入之前加上一層路由,將同一個內容 ID 的記錄,全部路由到同一個分片上,解決了該問題。

? ? ? 介紹完寫入,下一步介紹 Clickhouse 的高性能存儲和查詢。

? ? ? ?3. 高性能-存儲-查詢

? ? ??Clickhouse 高性能查詢的一個關鍵點是稀疏索引。稀疏索引這個設計就很有講究,設計得好可以加速查詢,設計不好反而會影響查詢效率。我根據我們的業務場景,因為我們的查詢大部分都是時間和內容 ID 相關的,比如說,某個內容,過去 N 分鐘在各個人群表現如何?我按照日期,分鐘粒度時間和內容 ID 建立了稀疏索引。針對某個內容的查詢,建立稀疏索引之后,可以減少 99%的文件掃描。

? ? ??還有一個問題就是,我們現在數據量太大,維度太多。拿 QQ 看點的視頻內容來說,一天流水有上百億條,有些維度有幾百個類別。如果一次性把所有維度進行預聚合,數據量會指數膨脹,查詢反而變慢,并且會占用大量內存空間。我們的優化,針對不同的維度,建立對應的預聚合物化視圖,用空間換時間,這樣可以縮短查詢的時間。

? ? ??分布式表查詢還會有一個問題,查詢單個內容 ID 的信息,分布式表會將查詢下發到所有的分片上,然后再返回查詢結果進行匯總。實際上,因為做過路由,一個內容 ID 只存在于一個分片上,剩下的分片都在空跑。針對這類查詢,我們的優化是后臺按照同樣的規則先進行路由,直接查詢目標分片,這樣減少了 N-1/N 的負載,可以大量縮短查詢時間。而且由于我們是提供的 OLAP 查詢,數據滿足最終一致性即可,通過主從副本讀寫分離,可以進一步提升性能。

? ? ??我們在后臺還做了一個 1 分鐘的數據緩存,針對相同條件查詢,后臺就直接返回了。

? ? ??4. 擴容

? ? ??這里再介紹一下我們的擴容的方案,調研了業內的一些常見方案。

? ? ??比如 HBase,原始數據都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始數據的遷移。但是 Clickhouse 的每個分片數據都是在本地,是一個比較底層存儲引擎,不能像 HBase 那樣方便擴容。

? ? ??Redis 是哈希槽這種類似一致性哈希的方式,是比較經典分布式緩存的方案。Redis slot 在 Rehash 的過程中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最后再刪除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由于列式存儲,不支持刪除的特性,一致性哈希的方案不是很適合。

目前擴容的方案是,另外消費一份數據,寫入新 Clickhouse 集群,兩個集群一起跑一段時間,因為實時數據就保存 3 天,等 3 天之后,后臺服務直接訪問新集群。

? ? ? 4. 有贊實時數倉案例?

? ? ??1) 分層設計

? ? ? 傳統離線數倉的分層設計大家都很熟悉,為了規范的組織和管理數據,層級劃分會比較多,在一些復雜邏輯處理場景還會引入臨時層落地中間結果以方便下游加工處理。實時數倉考慮到時效性問題,分層設計需要盡量精簡,降低中間流程出錯的可能性,不過總體而言,實時數倉還是會參考離線數倉的分層思想來設計。

? ? ? 實時數倉分層架構如下圖所示 :

? ? ??- ODS(實時數據接入層)

? ? ??ODS 層,即實時數據接入層,通過數據采集工具收集各個業務系統的實時數據,對非結構化的數據進行結構化處理,保存原始數據,幾乎不過濾數據;該層數據的主要來源有三個部分:第一部分是業務方創建的 NSQ 消息,第二部分是業務數據庫的 Binlog 日志,第三部分是埋點日志和應用程序日志,以上三部分的實時數據最終統一寫入 Kafka 存儲介質中。

? ? ??ODS 層表命名規范:部門名稱.應用名稱.數倉層級主題域前綴數據庫名/消息名

? ? ??例如:接入業務庫的 Binlog

? ? ??實時數倉表命名:deptname.appname.ods_subjectname_tablename

? ? ??例如:接入業務方的 NSQ 消息

? ? ??實時數倉表命名:deptname.appname.ods_subjectname_msgname

? ? ??- DWS(實時明細中間層)

? ? ??DWS 層,即實時明細中間層,該層以業務過程作為建模驅動,基于每個具體的業務過程事件來構建最細粒度的明細層事實表;比如交易過程,有下單事件、支付事件、發貨事件等,我們會基于這些獨立的事件來進行明細層的構建。在這層,事實明細數據同樣是按照離線數倉的主題域來進行劃分,也會采用維度建模的方式組織數據,對于一些重要的維度字段,會做適當冗余。基于有贊實時需求的場景,重點建設交易、營銷、客戶、店鋪、商品等主題域的數據。該層的數據來源于 ODS 層,通過 FlinkSQL 進行 ETL 處理,主要工作有規范命名、數據清洗、維度補全、多流關聯,最終統一寫入 Kafka 存儲介質中。

? ? ??DWS 層表命名規范:部門名稱.應用名稱.數倉層級_主題域前綴_數倉表命名

? ? ??例如:實時事件 A 的中間層

? ? ??實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameA

? ? ??例如:實時事件 B 的中間層

? ? ??實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameB

? ? ??- DIM(實時維表層)

? ? ??DIM 層,即實時維表層,用來存放維度數據,主要用于實時明細中間層寬化處理時補全維度使用,目前該層的數據主要存儲于 HBase 中,后續會基于 QPS 和數據量大小提供更多合適類型的存儲介質。

? ? ??DIM 層表命名規范:應用名稱_數倉層級_主題域前綴_數倉表命名

? ? ??例如:HBase 存儲,實時維度表

? ? ??實時數倉表命名:appname_dim_tablename

? ? ??- DWA(實時匯總層)

? ? ??DWA 層,即實時匯總層,該層通過 DWS 層數據進行多維匯總,提供給下游業務方使用,在實際應用過程中,不同業務方使用維度匯總的方式不太一樣,根據不同的需求采用不同的技術方案去實現。第一種方式,采用 FlinkSQL 進行實時匯總,將結果指標存入 HBase、MySQL 等數據庫,該種方式是我們早期采用的方案,優點是實現業務邏輯比較靈活,缺點是聚合粒度固化,不易擴展;第二種方式,采用實時 OLAP 工具進行匯總,該種方式是我們目前常用的方案,優點是聚合粒度易擴展,缺點是業務邏輯需要在中間層預處理。

? ? ??DWA 層表命名規范:應用名稱_數倉層級_主題域前綴_聚合粒度_數據范圍

? ? ??例如:HBase 存儲,某域當日某粒度實時匯總表

? ? ??實時數倉表命名:appname_dwa_subjectname_aggname_daily

? ? ??- APP(實時應用層)

? ? ??APP 層,即實時應用層,該層數據已經寫入應用系統的存儲中,例如寫入 Druid 作為 BI 看板的實時數據集;寫入 HBase、MySQL 用于提供統一數據服務接口;寫入 ClickHouse 用于提供實時 OLAP 服務。因為該層非常貼近業務,在命名規范上實時數倉不做統一要求。

? ? ? 2) 實時 ETL

? ? ? 實時數倉 ETL 處理過程所涉及的組件比較多,接下來盤點構建實時數倉所需要的組件以及每個組件的應用場景。如下圖所示:

? ? ? 具體實時 ETL 處理流程如下圖所示:

? ? ??1. 維度補全

? ? ??創建調用 Duboo 接口的 UDF 函數在實時流里補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 接口壓力會過大。在實際應用場景補全維度首選還是關聯維度表,但關聯也存在一定概率的丟失問題,為了彌補這種丟失,可以采用 Duboo 接口調用兜底的方式來補全。偽代碼如下:

create?function?call_dubbo?as?'XXXXXXX';create?function?get_json_object?as?'XXXXXXX';case????when?cast(?b.column?as?bigint)?is?not?null????????then?cast(?b.column?as?bigint)????????????else?cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'??????????????????????????????????????????????????????????????,'serviceName'??????????????????????????????????????????????????????????????,'methodName'??????????????????????????????????????????????????????????????,cast(concat('[',cast(a.column?as?varchar),']')?as?varchar)??????????????????????????????????????????????????????????????,'key'??????????????????????????????????????????????????????????????)?????????????????????????????????????????????,'rootId')?????????????????????????????????????????as?bigint)???????????????????????????????????,a.column)????????????????????????????as?bigint)??end

? ? ??2. 冪等處理

? ? ??實時任務在運行過程中難免會遇到執行異常的情況,當任務異常重啟的時候會導致部分消息重新發送和消費,從而引發下游實時統計數據不準確,為了有效避免這種情況,可以選擇對實時消息流做冪等處理,當消費完一條消息,將這條消息的 Key 存入 KV,如果任務異常重啟導致消息重新發送的時候,先從 KV 判斷該消息是否已被消費,如果已消費就不再往下發送。偽代碼如下:

create?function?idempotenc?as?'XXXXXXX';insert?into?tableselect????order_nofrom????(????????select????????????a.orderNo????????????????????????????????????????as??order_no??????????,?idempotenc('XXXXXXX',?coalesce(?order_no,?'')?)??as??rid????????from????????????table1????)?twhere????t.rid?=?;

? ? ??3. 數據驗證

? ? ??由于實時數倉的數據是無邊界的流,相比于離線數倉固定不變的數據更難驗收。基于不同的場景,我們提供了 2 種驗證方式,分別是:抽樣驗證與全量驗證。如圖 3.3 所示

抽樣驗證方案

? ? ??該方案主要應用在數據準確性驗證上,實時匯總結果是基于存儲在 Kafka 的實時明細中間層計算而來,但 Kafka 本身不支持按照特定條件檢索,不支持寫查詢語句,再加上消息的無邊界性,統計結果是在不斷變化的,很難尋找參照物進行比對。鑒于此,我們采用了持久化消息的方法,將消息落盤到 TiDB 存儲,基于 TiDB 的能力對落盤的消息進行檢索、查詢、匯總。編寫固定時間邊界的測試用例與相同時間邊界的業務庫數據或者離線數倉數據進行比對。通過以上方式,抽樣核心店鋪的數據進行指標準確性驗證,確保測試用例全部通過。

全量驗證方案

? ? ??該方案主要應用在數據完整性和一致性驗證上,在實時維度表驗證的場景使用最多。大體思路:將存儲實時維度表的在線 HBase 集群中的數據同步到離線 HBase 集群中,再將離線 HBase 集群中的數據導入到 Hive 中,在限定實時維度表的時間邊界后,通過數據平臺提供的數據校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的數據完全一致。

? ? ??4. 數據恢復

? ? ??實時任務一旦上線就要求持續不斷的提供準確、穩定的服務。區別于離線任務按天調度,如果離線任務出現 Bug,會有充足的時間去修復。如果實時任務出現 Bug,必須按照提前制定好的流程,嚴格按照步驟執行,否則極易出現問題。造成 Bug 的情況有非常多,比如代碼 Bug、異常數據 Bug、實時集群 Bug,如下圖展示了修復實時任務 Bug 并恢復數據的流程。

? ? ? 5. 騰訊全場景實時數倉建設案例

? ? ? 在數倉體系中會有各種各樣的大數據組件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據不同的需求,用戶會構建大數據存儲和處理平臺,數據在平臺經過處理和分析,結果數據會保存到 MySQL、Elasticsearch 等支持快速查詢的關系型、非關系型數據庫中,接下來應用層就可以基于這些數據進行 BI 報表開發、用戶畫像,或基于 Presto 這種 OLAP 工具進行交互式查詢等。

? ? ? 1) Lambda 架構的痛點

? ? ? 在整個過程中我們常常會用一些離線的調度系統,定期的(T+1 或者每隔幾小時)去執行一些 Spark 分析任務,做一些數據的輸入、輸出或是 ETL 工作。離線數據處理的整個過程中必然存在數據延遲的現象,不管是數據接入還是中間的分析,數據的延遲都是比較大的,可能是小時級也有可能是天級別的。另外一些場景中我們也常常會為了一些實時性的需求去構建一個實時處理過程,比如借助 Flink+Kafka 去構建實時的流處理系統。

? ? ? 整體上,數倉架構中有非常多的組件,大大增加了整個架構的復雜性和運維的成本。

? ? ? 如下圖,這是很多公司之前或者現在正在采用的 Lambda 架構,Lambda 架構將數倉分為離線層和實時層,相應的就有批處理和流處理兩個相互獨立的數據處理流程,同一份數據會被處理兩次以上,同一套業務邏輯代碼需要適配性的開發兩次。Lambda 架構大家應該已經非常熟悉了,下面我就著重介紹一下我們采用 Lambda 架構在數倉建設過程中遇到的一些痛點問題。

? ? ? 例如在實時計算一些用戶相關指標的實時場景下,我們想看到當前 pv、uv 時,我們會將這些數據放到實時層去做一些計算,這些指標的值就會實時呈現出來,但同時想了解用戶的一個增長趨勢,需要把過去一天的數據計算出來。這樣就需要通過批處理的調度任務來實現,比如凌晨兩三點的時候在調度系統上起一個 Spark 調度任務把當天所有的數據重新跑一遍。

? ? ? 很顯然在這個過程中,由于兩個過程運行的時間是不一樣的,跑的數據卻相同,因此可能造成數據的不一致。因為某一條或幾條數據的更新,需要重新跑一遍整個離線分析的鏈路,數據更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發流程和運維成本其實都是非常高的。

? ? ? ?為了解決 Lambda 架構帶來的各種問題,就誕生了 Kappa 架構,這個架構大家應該也非常的熟悉。

? ? ? 2) Kappa 架構的痛點

? ? ? 我們來講一下 Kappa 架構,如下圖,它中間其實用的是消息隊列,通過用 Flink 將整個鏈路串聯起來。Kappa 架構解決了 Lambda 架構中離線處理層和實時處理層之間由于引擎不一樣,導致的運維成本和開發成本高昂的問題,但 Kappa 架構也有其痛點。

? ? ? 首先,在構建實時業務場景時,會用到 Kappa 去構建一個近實時的場景,但如果想對數倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進一步的數據處理時,如將數據寫到 DWD 層的 Kafka,則需要另外接入 Flink。同時,當需要從 DWD 層的 Kafka 把數據再導入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 里面做進一步的分析時,顯然就增加了整個架構的復雜性。

? ? ? 其次,Kappa 架構是強烈依賴消息隊列的,我們知道消息隊列本身在整個鏈路上數據計算的準確性是嚴格依賴它上游數據的順序,消息隊列接的越多,發生亂序的可能性就越大。ODS 層數據一般是絕對準確的,把 ODS 層的數據發送到下一個 kafka 的時候就有可能發生亂序,DWD 層再發到 DWS 的時候可能又亂序了,這樣數據不一致性就會變得很嚴重。

? ? ? 第三,Kafka 由于它是一個順序存儲的系統,順序存儲系統是沒有辦法直接在其上面利用 OLAP 分析的一些優化策略,例如謂詞下推這類的優化策略,在順序存儲的 Kafka 上來實現是比較困難的事情。

? ? ? 那么有沒有這樣一個架構,既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發的成本,解決通過消息隊列構建 Kappa 架構過程中遇到的一些痛點?答案是肯定的,后面的篇幅會詳細論述。

? ? ? 3) 痛點總結

? ? ? ?4) Flink+Iceberg 構建實時數倉

? ? ? 1. 近實時的數據接入

? ? ? 前面介紹了 Iceberg 既支持讀寫分離,又支持并發讀、增量讀、小文件合并,還可以支持秒級到分鐘級的延遲,基于這些優勢我們嘗試采用 Iceberg 這些功能來構建基于 Flink 的實時全鏈路批流一體化的實時數倉架構。

? ? ? 如下圖所示,Iceberg 每次的 commit 操作,都是對數據的可見性的改變,比如說讓數據從不可見變成可見,在這個過程中,就可以實現近實時的數據記錄。

? ? ? 2. 實時數倉 - 數據湖分析系統

? ? ? 此前需要先進行數據接入,比如用 Spark 的離線調度任務去跑一些數據,拉取,抽取最后再寫入到 Hive 表里面,這個過程的延時比較大。有了 Iceberg 的表結構,可以中間使用 Flink,或者 spark streaming,完成近實時的數據接入。

? ? ? 基于以上功能,我們再來回顧一下前面討論的 Kappa 架構,Kappa 架構的痛點上面已經描述過,Iceberg 既然能夠作為一個優秀的表格式,既支持 Streaming reader,又可以支持 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?

? ? ? Iceberg 底層依賴的存儲是像 HDFS 或 S3 這樣的廉價存儲,而且 Iceberg 是支持 parquet、orc、Avro 這樣的列式存儲。有列式存儲的支持,就可以對 OLAP 分析進行基本的優化,在中間層直接進行計算。例如謂詞下推最基本的 OLAP 優化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務天級別到小時級別的延遲大大的降低,改造成一個近實時的數據湖分析系統。

? ? ? 在中間處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支持 Streaming read,所以在系統的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務,把中間結果做進一步計算后輸出到下游。

? ? ? 替換 Kafka 的優劣勢:

? ? ? 總的來說,Iceberg 替換 Kafka 的優勢主要包括:

? ? ? 實現存儲層的流批統一

? ? ??中間層支持 OLAP 分析

? ? ??完美支持高效回溯

? ? ??存儲成本降低

? ? ??當然,也存在一定的缺陷,如:

? ? ??數據延遲從實時變成近實時

? ? ??對接其他數據系統需要額外開發工作

? ? ??秒級分析 - 數據湖加速

? ? ??由于 Iceberg 本身是將數據文件全部存儲在 HDFS 上的,HDFS 讀寫這塊對于秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支持 Alluxio 這樣一個緩存,借助于緩存的能力可以實現數據湖的加速。這塊的架構也在我們未來的一個規劃和建設中。


(部分內容來源網絡,如有侵權請聯系刪除)
立即申請數據分析/數據治理產品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢