- 產品
- 產品解決方案
- 行業解決方案
- 案例
- 數據資產入表
- 賦能中心
- 伙伴
- 關于
時間:2022-11-09來源:生氣叻瀏覽數:466次
數據服務是數據中臺體系中的關鍵組成部分。
廣告人群 USP、DMP 系統每天需要通過 HiveServer 以流的方式從數倉導出數據到本地,每個人群的數據量從幾十萬到幾個億,人群數量 2w+,每個人群運行時間在 30min +,部分大人群的運行直接超過 1h,在資源緊張的情況下,人群延遲情況嚴重。
數倉的數據在被數據產品使用時,需要為每個表新生成一個單獨的接口,應用端需要為每一種訪問方式(如 Presto、ClickHouse)區分使用不同的接口,導致數據產品接口暴漲,不方便維護,影響開發及維護效率。數據在不同的存儲時,需要包含 clickhouse-client,presto-client 等等第三方 jar 包。
不同數據產品中都需要使用一些常用的數據指標,如銷售額、訂單數、PV、UV 等,而這些數據在不同數據產品的實現口徑、實現方式都不一樣,無法形成數據共享,每個數據產品都重復進行相同的指標建設。因此,在不同數據產品查看相同指標卻發現數值不同的情況下,難以判斷哪個數據產品提供的數據是準確的。
圖 1.數據流入流出方式
為解決以上問題,數據服務應運而生。目前數據服務的主要優勢有:屏蔽底層的存儲引擎、計算引擎,使用同一個 API(one service),數倉數據分層存儲,不同Engine 的 SQL 生成能力,自適應 SQL 執行以及統一緩存架構保障業務 SLA,支持數據注冊并授權給任何調用方進行使用,提高數據交付效率。通過唯一的 ID 標識,數據產品可通過 ID 查閱數據,而非直接訪問對應的數倉表。一方面,指標服務統一了指標的口徑,同時也支持快速構建新的數據產品。數據服務能給業務帶來運營和商業價值,核心在于給用戶提供自助分析數據能力。Hera整體架構基于典型的 Master/slave 模型,數據流與控制流單獨鏈路,從而保障系統的高可用性。數據服務系統主要分為三層:
應用接入層:業務申請接入時,可以根據業務要求選擇數據服務 API(TCP Client),HTTP 以及 OSP 服務接口(公司內部 RPC 框架)。
數據服務層:主要執行業務提交的任務,并返回結果。主要功能點包括:路由策略,多引擎支持,引擎資源配置,引擎參數動態組裝,SQL Lispengine 生成,SQL 自適應執行,統一數據查詢緩存,FreeMaker SQL 動態生成等功能。
數據層:業務查詢的數據無論在數倉、Clickhouse、MySQL 還是 Redis 中,都可以很好地得到支持,用戶都使用同一套 API。
圖 2. 數據服務整體架構圖
調度系統的整體流程大致包含以下模塊:
Master:負責管理所有的 Worker、TransferServer、AdhocWorker 節點,同時負責調度分發作業;
Worker:負責執行 ETL 和數據文件導出類型的作業,拉起 AdhocWorker 進程(Adhoc 任務在 AdhocWorker 進程中的線程池中執行),ETL 類型的作業通過子進程的方式完成;
Client:客戶端,用于編程式地提交 SQL 作業;
ConfigCenter:負責向集群推送統一配置信息及其它運行時相關的配置和 SQLParser (根據給定的規則解析、替換、生成改寫 SQL 語句,以支持不同計算引擎的執行);
TransferServer:文件傳輸服務。
圖 3. 數據服務調度流程圖
Hera 數據服務的主要功能有:多隊列調度策略、多引擎查詢、多任務類型、文件導出、資源隔離、引擎參數動態組裝、自適應 Engine 執行和 SQL 構建。多隊列調度策略 數據服務支持按照不同用戶、不同任務類型并根據權重劃分不同調度隊列,以滿足不同任務類型的 SLA。多引擎查詢 數據服務支持目前公司內部所有 OLAP 和數據庫類型,包括 Spark、Presto、Clickhouse、Hive 、MySQL、Redis。會根據業務具體場景和要求,選擇當前最佳的查詢引擎。多任務類型 數據服務支持的任務類型有:ETL、Adhoc、文件導出、數據導入。加上多引擎功能,實現多種功能組合,如 Spark adhoc 和 Presto adhoc。文件導出 主要是支持大量的數據從數據倉庫中導出,便于業務分析和處理,比如供應商發券和信息推送等。具體執行過程如下:用戶提交需要導出數據的 SQL,通過分布式 engine 執行完成后,落地文件到 hdfs/alluxio. 客戶端通過 TCP 拉取文件到本地。千萬億級的數據導出耗時最多 10min。數據導出在人群數據導出上性能由原來的 30min+ ,提升到最多不超過 3min,性能提升 10~30 倍。具體流程如下:
圖 4. 數據服務文件下載流程圖
資源隔離(Worker 資源和計算資源) 業務一般分為核心和非核心,在資源分配和調度上也不同。主要是從執行任務 Worker 和引擎資源,都可以實現物理級別的隔離,最大化減少不同業務之間相互影響。引擎參數動態組裝 線上業務執行需要根據業務情況進行調優,動態限制用戶資源使用,集群整體切換等操作,這個時候就需要對用戶作業參數動態修改,如 OLAP 引擎執行任務時,經常都要根據任務調優,設置不同參數。針對這類問題,數據服務提供了根據引擎類型自動組裝引擎參數,并且引擎參數支持動態調整,也可以針對特定任務、執行賬號、業務類型來設定 OLAP 引擎執行參數。自適應 Engine 執行 業務方在查詢時,有可能因為引擎資源不足或者查詢條件數據類型不匹配從而導致執行失敗。為了提高查詢成功率和服務 SLA 保障,設計了 Ad Hoc 自適應引擎執行,當一個引擎執行報錯后,會切換到另外一個引擎繼續執行。具體自適應執行邏輯如下圖所示:
圖 5. 自適應 Engine
執行SQL構建 數據服務 SQL 構建基于維度事實建模,支持單表模型、星型模型和雪花模型。
單表模型:一張事實表,一般為 DWS 或者 ADS 的匯總事實表。
星型模型:1 張事實表(如 DWD 明細事實表)+ N 張維表,例如訂單明細表 (事實表 FK=商品 ID) +? 商品維表 (維度表 PK=商品 ID)?。
雪花模型:1 張事實表(如 DWD 明細事實表)+ N 張維表+M 張沒有直接連接到事實表的維表,例如訂單明細表 (事實表 FK=商品 ID) +? 商品維表 (維度表 PK=商品 ID,FK=品類 ID)? +? 品類維表(維度表 PK=品類 ID)。 
圖 6.SQL 維度模型
任務調度 基于 Netty 庫收發集群消息,系統僅僅使用同一個線程池對象 EventLoopGroup 來收發消息,而用戶的業務邏輯,則交由一個單獨的線程池。選用 Netty 的另外一個原因是“零拷貝”的能力,在大量數據返回時,通過文件的形式直接將結果送給調用者。業務需求通常包含時間敏感與不敏感作業,為了提高作業的穩定性和系統的可配置性,Hera 提供了多隊列作業調度的功能。用戶在提交作業時可以顯式地指定一個作業隊列名,當這個作業在提交到集群時,如果相應的隊列有空閑,則就會被添加進相應的隊列中,否則返回具體的錯誤給客戶端,如任務隊列滿、隊列名不存在、隊列已經關閉等,客戶端可以選擇“是否重試提交”。當一個作業被添加進隊列之后,Master 就會立即嘗試調度這個隊列中的作業,基于以下條件選擇合適的作業運行:
每個隊列都有自己的權重,同時會設置占用整個集群的資源總量,如最多使用多少內存、最多運行的任務數量等。
隊列中的任務也有自己的權重,同時會記錄這個作業入隊的時間,在排序當前隊列的作業時,利用入隊的時間偏移量和總的超時時間,計算得到一個最終的評分。
除了調度系統本身的調度策略外,還需要考慮外部計算集群的負載,在從某個隊列中拿出一個作業后,再進行一次過濾,或者是先過濾,再進行作業的評分計算。
一個可用的計算作業評分模型如下:
隊列動態因子 = 隊列大小 / 隊列容量 * (1 - 作業運行數 / 隊列并行度)
這個等式表示的意義是:如果某個隊列正在等待的作業的占比比較大,同時并行運行的作業數占比也比較大時,這個隊列的作業就擁有一個更大的因子,也就意味著在隊列權重相同時,這個隊列中的作業應該被優先調度。
作業權重 = 1 - (當前時間-入隊時間) / 超時時間
這個等式表示的意義是:在同一個隊列中,如果一個作業的剩余超時時間越少,則意味著此作業將更快達到超時,因此它應該獲得更大的選擇機會。
Score = 作業權重 + 隊列動態因子 + 隊列權重
這個等式表示的意義是:對于所有的隊列中的所有任務,首先決定一個作業是否優先被調度的因子是設置的隊列權重,例如權重為 10 的隊列的作業,應該比權重為 1 的隊列中的作業被優先調度,而不管作業本身的權重(是否會有很大的機率超時);其次影響作業調度優先級的因子是隊列動態因子,例如有兩個相同權重的隊列時,如果一個隊列的動態因子為 0.5,另外一個隊列的動態因子是 0.3,那么應該優先選擇動態因子為 0.5 的隊列作業進行調度,而不管作業本身的權重;最后影響作業調度優先級的因子是作業權重,例如在同一個隊列中,有兩個權重分別為 0.2 和 0.5 的作業,那么為了避免更多的作業超時,權重為 0.2 的作業應該被優先調度。
簡單描述作業的排序過程就是,首先按隊列權重排序所有的隊列;對于有重復的隊列,則會計算每個隊列的動態因子,并按此因子排序;對于每一個隊列,作業的排序規則按作業的超時比率來排序;最終依次按序遍歷每一個隊列,嘗試從中選擇足夠多的作業運行,直到作業都被運行或是達到集群限制條件。這里說足夠多,是指每一個隊列都會有一個最大的并行度和最大資源占比,這兩個限制隊列的參數組合,是為了避免因某一個隊列的容量和并行度被設置的過大,可能超過了整個集群,導致其它隊列被“餓死”的情況。
用戶通過 Client 提交原始 SQL,這里以 Presto SQL 為例,Client 在提交作業時,指定了 SQL 路由,則會首先通過訪問 SQLParser 服務,在發送給 Master 之前,會首先提交 SQL 語句到 SQLParser 服務器,將 SQL 解析成后端計算集群可以支持的 SQL 語句,如 Spark、Presto、ClickHouse 等,為了能夠減少 RPC 交互次數,SQLParser 會一次返回所有可能被改寫的 SQL 語句。在接收到 SQLParser 服務返回的多個可能 SQL 語句后,就會填充當前的作業對象,真正開始向 Master 提交運行。Master 在收到用戶提交的作業后,會根據一定的調度策略,最終將任務分發到合適的 Worker 上,開始執行。Worker 會首先采用 SQL 作業默認的執行引擎,比如 Presto,提交到對應的計算集群運行,但如果因為某種原因不能得到結果,則會嘗試使用其它的計算引擎進行計算。
當然這里也可以同時向多個計算集群提交作業,一旦某個集群首先返回結果時,就取消所有其它的作業,不過這需要其它計算集群的入口能夠支持取消操作。當 SQL 作業完成后,將結果返回到 Worker 端,為了能夠更加高效地將查詢結果返回給 Client 端,Worker 會從 Master 發送的任務對象中提取 Client 側信息,并將結果直接發送給 Client,直到收到確認信息,至此整個任務才算執行完畢。在整個作業的流轉過程中,會以任務的概念在調度系統中進行傳播,并經歷幾個狀態的更新,分別標識 new、waiting、running、succeed、failed 階段。
圖 9. SQL 作業處理流程
數據服務搜集兩類 metrics,一類靜態的,用于描述 master/worker/client 的基本信息;一類是動態的,描述 master/worker 的運行時信息。這里主要說明一下有關集群動態信息的采集過程及作用。以 worker 為例,當 worker 成功注冊到 master 時,就會開啟定時心跳匯報動作,并借道心跳請求,將自己的運行時信息匯報給 master。這里主要是內存使用情況,例如當前 worker 通過估算方法,統計目前運行的任務占據了多少內存,以便 master 能夠在后續的任務分發過程中,能夠根據內存信息進行決策。master 會統計它所管理的集群整個情況,例如每個任務隊列的快照信息、worker 的快照信息、集群的運行時配置信息等,并通過參數控制是否打印這些信息,以便調試。數據服務主要解決 SLA 方面的問題。如人群計算、數據無縫遷移、數據產品 SLA 等,這里用人群舉例說明如下:
人群計算遇到的問題:
數據服務改造新的架構方案:
計算與存儲同置,這樣數據就不需通過網絡反復讀取,造成網絡流量浪費。
減少 HDFS 讀寫長尾對人群計算造成的額外影響,同時減少人群計算對于 HDFS 穩定性的影響。
廣告人群計算介于線上生產任務跟離線任務之間的任務類型。這里我們希望能保證這類應用的可靠性和穩定性,從而更好地為公司業務賦能
通過數據服務執行人群計算。

圖 10. Alluxio 和 Spark 集群混部
將 Hive 表的 location 從 HDFS 路徑替換為 Alluxio 路徑,即表示該表的數據存儲于 Alluxio 中。我們使用的方案不是直接寫通過 ETL 任務寫 Alluxio 表的數據,而是由 Alluxio 主動去拉取同樣 Hive 表結構的 HDFS 中的數據,即我們創建了一個 HDFS 表的 Alluxio 緩存表。由于 Alluxio 不能感知到分區表的變化,我們開發了一個定時任務去自動感知源表的分區變化,使得 Hive 表的數據能夠同步到 Alluxio 中。
具體步驟如下:
定時任務發起輪詢,檢測源表是否有新增分區。
發起一個 SYN2ALLUXIO 的任務由數據服務執行。
任務執行腳本為將 Alluxio 表添加與 HDFS 表相同的分區。
分區添加完成之后,Alluxio 會自動從 mount 的 HDFS 路徑完成數據同步。

圖 11. Alluxio 緩存表同步
上小節介紹了如何讓 Alluxio 和 HDFS 的 Hive 表保持數據同步,接下來需要做的就是讓任務計算的 Spark 任務跑在 Spark 與 Alluxio 混部的集群上,充分利用數據的本地性以及計算資源的隔離性,提高人群計算效率。人群服務通過調用數據服務執行。數據服務根據底表分區是否同步到 Alluxio 決定是否需要下推是用 Alluxio 表來完成計算。如果底表數據已經同步到 Alluxio,則使用 Alluxio 表來做為底表計算人群。依靠數據服務調度系統,通過用戶 SQL 改寫以及 Alluxio 和 Spark 計算結點混部模式,人群計算任務提速了 10%~30%。雖然截至今天,Hera 數據服務已經支持了很多生產業務,但目前仍有很多需要完善的地方:
不同 engine 存在同一個含義函數寫法不一致的情況。這種情況在 Presto 跟 ClickHouse 的函數比較時尤為突出,如 Presto 的 strpos(string, substring)函數,在 Clickhouse 中為 position(haystack, needle[, start_pos]),且這些函數的參數順序存在不一致的情況,如何更優雅地支持不同 engine 的差異情況還需要進一步思考。
人群計算采用業界通用的 ClickHouse BitMap 解決方案落地,提升人群的計算效率同時擴展數據服務的業務邊界。