日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數據治理平臺

睿治作為國內功能最全的數據治理產品之一,入選IDC企業數據治理實施部署指南。同時,在IDC發布的《中國數據治理市場份額》報告中,連續四年蟬聯數據治理解決方案市場份額第一。

Hive到Spark離線計算實踐

時間:2022-06-01來源:生氣叻瀏覽數:382

為了能夠快速的定位到有問題的任務,我們在 Driver 中將 Job 相關的信息保存到 Hadoop CallerContext 中,在調用 HMS 接口的時候將 CallerContext 中的相關屬性設置到 EnvironmentContext 中透傳到 HMS 端,同時擴展了所有getPartitions 相關的接口支持傳遞 EnvironmentContext,EnvironmentContext 中的 properties 會在 HMS 的 audit log 中打印出來,方便問題任務的定位。

1. 背景介紹

2018年B站基于Hadoop開始搭建離線計算服務,計算集群規模從最初的兩百臺到發展到目前近萬臺,從單機房發展到多機房。我們先后在生產上大規模的使用了 Hive、Spark、Presto 作為離線計算引擎,其中 Hive 和 Spark 部署在 Yarn 上,具體的架構如下,目前每天有約20w的離線批作業運行在 Spark 和 Hive 上,下面介紹下我們做了哪些工作來確保這些作業的高效與穩定。

2. 從Hive到Spark

21年初的時候Hive還是B站主要的離線計算引擎,80%以上的離線作業使用 Hive 執行,Spark2.4作業占比接近20%,集群資源的使用率長期維持在80%以上。21年3月 Spark3.1 發布,相較于 Spark2.4 性能有了較大的提升,我們開始推動Spark3.1 在B站的落地,同時將 Hive-SQL 整體遷移至 Spark-SQL。

在B站,離線計算的調度已經完成了收口,80%以上的作業來自于自建的 BSK 調度平臺,其余的作業基本也都是 airflow 提交上來的,只有少量的任務來自散落的開發機。在推動 Hive 升級 Spark 時只要將調度平臺的作業完成遷移就可以覆蓋90%以上的作業。起步階段我們進行了少量的人工遷移,對用戶 SQL 進行了簡單改寫,修改了輸入輸出表后由兩個引擎執行,開發了一個結果對比的工具,通過對雙跑結果分析保障遷移效果?;谶@個操作鏈路我們自研了一個自動遷移工具,減少人工失誤和人力壓力。

2.1 語句轉換

我們重寫了 SparkSqlParser,將從調度系統中收集到的 SQL 進行輸入輸出表的替換,避免對生產環境的影響。調度平臺進行作業調度時以 DAG 為單位,一個調度任務里面可能存在多條 SQL,這些 SQL的輸入輸出表間存在依賴關系,為了保證雙跑盡可能的模擬生產表現,對一個 DAG 里面的多個調度作業進行輸入輸出表替換時進行整體替換,保證了相互間依賴。對于 Select語句因為本身沒有輸出表,需要將 Select 語句轉換為 CTAS 語句,這樣就能將執行結果落地進行對比,需要注意的是轉換過程中要將列名進行編碼防止中文列導致的建表失敗。當遷移工具識別出 SQL 語句為 DDL 語句,如果不是 CTAS 這種需要消耗計算資源的就直接跳過對比,同時對該語句進行標記,保證交由 Hive 執行,防止意外的元信息修改。

2.2 結果對比

雙跑輸出結果的對比是保證數據準確性的關鍵。首先對兩個結果表的 Schema 進行對比,這個通過調用 DESC 語法返回結果對照就可以完成。對于 Schema 一致的兩個表則進行下一步操作,兩表全量數據對比,我們設計了一個 SQL 對數據按行進行整體對比,具體的對比思路如圖:

第一步將兩表按所有列(這里是 name 和 num 字段)進行 GROUP BY,第二步 UNION ALL 兩表數據,第三步再按所有列(這里是 name, num 和 cnt 字段) GROUP BY 一次產生最終表,在最終表中 cnts 值為2的行表示這行數據在兩表中都有且重復值一致,對于值非2的數據就是差異行了。從上圖的例子來說差異行 Jack|1|2|1 表示 Jack|1 這行數據數據在一個表中存在兩行,結合差異行 Jack|1|1|1 來看其實就是 Jack|1 這行數據一個表有一行另一個表有兩行。通過這個方式就可以對雙跑產出的結果表進行一個全量的對比。通過這種結果對比方法可以完成大部分雙跑任務的結果對比,但是對于結果表中存在 LIST、SET、MAP 這種容器類型的,因為在 toString 時順序是無法保證的,所以會被識別為不一致,此外對于非穩定性的 SQL 如某列數據是 random 產生,因為每次執行產出的結果不一致,也會識別為對比失敗,這兩種情況下就需用人工的介入來分析了。

資源利用率的提升是做引擎升級的出發點,除了結果對比來保證數據準確性,我們還做了資源消耗對比來保證遷移的收益。對比系統收集了每個作業的執行時間以及消耗的資源,從執行時間、CPU 和內存的資源消耗進行兩個引擎執行性能的對比,在執行最終遷移前依據收集的數據為用戶提供了遷移的預期收益,提高了用戶遷移任務的積極性。從遷移中收集的數據來看 hive 切到 spark 可以減少40%以上的執行時間,同時整體資源消耗降低30%以上。

2.3 遷移&回滾

遷移系統對每個任務都執行了至少3次的雙跑對比,但依然不能完全消除執行遷移的風險,在實際遷移過程中的幾次問題都是遷移后穩定性不符合預期導致的,因此遷移系統對于遷移后的任務增加了監控,在一個任務遷移后,該任務的前3次調度執行消耗的時間、CPU 和內存資源將被用來和遷移前的七次平均執行數據對比,如果存在負優化的情況則會將這個任務執行引擎進行回滾并通知我們介入進行進一步分析。

3. Spark 在B站的實踐

3.1 穩定性改進

3.1.1 小文件問題

隨著B站業務高速發展,數據量和作業數增長越來越快,伴隨而來的小文件數也快速增長,小文件太多會增加 HDFS 元數據的壓力,在計算引擎讀取時也大大增加了讀請求的數量降低了讀取效率。為了解決小文件的問題,在寫表場景下對 Spark 做了如下兩種改造。

兜底小文件合并:我們修改了數據的寫出目錄,引擎計算先寫到一個中間目錄,在 FileFormatWriter.write 結束后 refreshUpdatedPartitions 前,插入了一個文件合并邏輯,從中間目錄中獲取分區下文件的平均大小,對于不存在小文件情況的目錄直接MV到最終目錄,對于存在小文件的目錄新增一個讀 RDD coalesce 到一個合適值寫出后 MV 到最終目錄。

基于 reparation 的小文件合并:可以看到兜底小文件合并方式需要先將數據落地到 HDFS,重新讀取后再寫出,這樣做放大了 HDFS寫操作(三副本),降低了計算引擎的執行性能。而 Spark3的 AQE 特性可以在有 shuffle 的場景下有效解決小文件的問題,很多情況下對于沒有 shuffle 的場景新增一個 reparation 操作就可以借助 AQE 的能力解決小文件的問題。社區 AQE 對于 reparation 這個 hint 是不會調整目標分區數的,我們新增了一個 rebalance hint,本質上和reparation 一樣只是將 AQE 的特性應用在了這個操作上,同時將 AQE 目標 size 相關的屬性和 rebalance 設置屬性做了隔離方便更好的設置文件大小而不影響計算的并行度。rebalance 操作會在最終寫出前增加一個 shuffle stage,有些情況下沒有這個 stage 上游輸出就已經沒有小文件了,為此作業是否增加 rebalance 操作依賴于我們對任務的畫像通過 HBO 系統開啟。

3.1.2 shuffle 穩定性問題

Shuffle 穩定性直接影響了 Spark 作業的 SLA,在B站推動 Spark 升級過程中成為用戶顧慮的點。

shuffle 磁盤分級:B站 Yarn 主集群采用 DataNode 和 NodeManage 混部模式,節點配置了多塊 HDD 盤和少量 SSD 盤,NM 以 HDD 盤作為計算盤,由于和 DN 沒有做到 IO 隔離,DN 和shuffle service 經?;ハ嘤绊?,因此我們對DiskBlockManager 進行了改造,優先使用 SSD 盤下的目錄作為工作目錄,當 SSD 盤存儲空間或者 inode 緊張時則降級到 Yarn 配置的計算目錄,借助 SSD 優異的隨機 IO 能力,有效的提高的了 shuffle 穩定性。

remote shuffle service:push based shuffle 方案可以大量降低磁盤隨機IO讀請求,如下圖:

通過中間服務將同屬一個分區的數據進行歸并,后續 reduce 操作就不需要從上游所有的 Map 節點拉取數據,在 shuffle 上下游 Task 數量多的情況下會對磁盤 IO 壓力指數放大,生產上 shuffle heavy 的任務表現很不穩定,經常出現FetchFailed Exception。B站在推動 RSS 落地時選擇了社區3.2 Push based shuffle 的方案,這個方案主要的優點是對 AQE 支持比較好,缺點是因為本地也要寫一份數據放大了寫。將數據先寫本地后異步的發送到 driver 維護的 executor 節點的 external shuffle 節點上,后續生產實踐中該方案有個問題,就是當作業啟動時通常 driver 維護的 executor 數不足以滿足遠程節點的選擇,而 SQL 作業參與計算的數據量通常是隨著過濾條件層層遞減的,通常 shuffle 數據量大的時候因為沒有足夠的節點會 fall back 到原先的 shuffle 方式,為了解決這個問題,我們新增了 shuffle ?service master 節點,具體調用流程如下圖,所有的 external shuffle 節點啟動時都會注冊到 shuffle master 節點上,后續節點本身也會周期性的上報心跳和節點繁忙程度,DAGScheduler 后續請求遠程節點都從 shuffle master 申請,這樣不僅解決了冷啟動節點不足的問題,在節點選擇上也考慮了節點的健康程度。因為是先落盤后發送,在 stage 執行結束后會有一個等待時間,這里面會有個性能回退的問題,對小任務不友好,所以在生產應用中我們基于任務畫像系統 HBO 自動決定任務是否啟用RSS服務,目前生產大約7%的大任務在使用RSS 服務,這些任務平均執行時間縮短了25%,穩定性有了顯著提升。

目前B站生產中使用該方案基本解決了 shuffle 穩定性的問題,不過這套方案依舊需要計算節點配置本地 shuffle 盤,在本地落 shuffle 數據,無法支持存算分離的架構。后續我們在 k8s 上會大規模上線混部集群,需要盡量不依賴本地磁盤,避免對在線應用的影響,我們也關注到騰訊和阿里相繼開源各自的 RSS 方案,我們也在嘗試在生產中使用純遠程 shuffle 方案來滿足 Spark on K8s 的技術需要。

3.1.3 大結果集溢寫到磁盤

在adhoc 場景中用戶通常會拉取大量結果到 driver 中,造成了大量的內存消耗,driver 穩定性又直接影響著用戶即席查詢的體驗,為此專門優化了 executor fetch result 的過程,在獲取結果時會監測 driver 內存使用情況,在高內存使用下將拉取到的結果直接寫出到文件中,返回給用戶時則直接分批從文件中獲取,增加 driver 的穩定性。

3.1.4 單 SQL task 并行度、task 數、執行時間限制

生產上我們按隊列隔離了用戶的 adhoc 查詢,在實踐過程中經常性的遇到單個大作業直接占用了全部并行度,有些短作業直接因為獲取不到資源導致長時間的 pending 的情況,為了解決這種問題首先對單個 SQL 執行時間和總 task 數進行了限制,此外考慮到在 task 調度時有資源就會全部調度出去,后續 SQL 過來就面臨著完全無資源可用的情況,我們修改了調度方法對單個 SQL 參與調度的 task 數進行了限制,具體的限制數隨著可用資源進行一個動態變化,在 current executor 數接近于 max executor 的情況下進行嚴格限制 ,在 current executor 數明顯少于 max executor 的情況下,提高單 SQL 并行的 task 總數限制。

3.1.5 危險 join condition 發現& join 膨脹率檢測

危險 join condition 發現

在選擇 join 方式的時候如果是等值 join 則按照 BHJ,SHJ,SMJ 的順序選擇,如果還沒有選擇出則判斷 Cartesian Join,如果 join 類型是 InnerType 的就使用 Cartesian Join,Cartesian Join 會產生笛卡爾積比較慢,如果不是 InnerType,則使用 BNLJ,在判斷 BHJ 時,表的大小就超過了 broadcast 閾值,因此將表 broadcast 出去可能會對 driver 內存造成壓力,性能比較差甚至可能會 OOM,因此將這兩種 join 類型定義為危險 join。

如果不是等值 join 則只能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 時選不出 build side 說明兩個表的大小都超過了 broadcast 閾值,則使用 Cartesian Join,如果 Join Type 不是 InnerType 則只能使用 BNLJ,因此 Join 策略選擇Cartesian Join 和第二次選擇 BNLJ 時為危險 join。

join 膨脹率檢測

ShareState 中的 statusScheduler 用于收集 Execution 的狀態和指標,這其中的指標就是按照 nodes 匯總了各個 task 匯報上來的 metrics,我們啟動了一個 join 檢測的線程定時的監控 Join 節點的 "number of output rows"及 Join 的2個父節點的 "number of output rows" 算出該 Join 節點的膨脹率。

傾斜 Key 發現

數據傾斜是 ETL 任務比較常見的問題,以 shuffle 過程中的傾斜為例,通常有以下幾個解決方法:增大 shuffle 的分區數量從而使數據分散到更多的分區中;修改邏輯,將 shuffle 時的 key 盡可能打散;單獨找出產生了極大傾斜的 key,在邏輯中單獨處理等等。但在進行這些處理之前,我們都需要先知道傾斜發生在 SQL 邏輯的哪個部分以及發生傾斜的是哪些 key。為了幫助用戶自助高效的解決數據傾斜問題,我們實現了傾斜 key 發現的功能。以 SortMergeJoin 為例,在 shuffle fetch 階段,首先根據 mapStatuses 計算出每個 partition size,并根據一定策略判斷該 task 所處理的 partition 是否傾斜。如果傾斜,則在 join 階段對數據進行采樣,找到發生傾斜的 key,通過 TaskMetric 發送到 driver 端,driver 端消費 metric后會記錄傾斜信息。

上面這些 bad case 在運行時發現后會自動將信息發送到我們內部作業診斷平臺,用戶可以查看并對語句做優化和改進。

3.2 性能優化

3.2.1 DPP 和 AQE 兼容

spark3.1 的 DPP 和 AQE 存在兼容問題,在使用 AQE 后 DPP 的策略就無法生效,這個問題在3.2得到了修復,我們將3.2的相關代碼 backport 回來,從 TPCDS 測試上看對3.1有很明顯的提升。

3.2.2 AQE 支持 ShuffledHashJoin

AQE 通過對 map 階段收集的指標數據來優化 Join 方式,對于存在小表的情況能將 SMJ 優化為 BHJ,這個操作可以顯著的優化性能。Spark的 shuffle 策略還有一個就是 ShuffledHashJoin,該策略性能相對較好,但內存壓力大,在默認情況下為了保證任務的穩定性我們將其關閉,基于 AQE 的思想,在 map 完成后收集 partition size,當最大的 partition size 小于定義的值后,通過新增 DynamicJoin 優化策略將 SMJ 優化為 SHJ。

3.2.3 Runtime filter

DPP 通過對大表直接進行 partition 級別的裁剪,可以大大提高查詢速度,但 DPP 的適用條件也相對嚴格,需要大表的分區列參與 join,但如果大表參與 join 的列為非分區列則無法應用。我們知道 shuffle 是比較耗時的操作,shuffle 的數據量越大,耗時越久,而且對網絡,機器 IO 都會產生比較大的壓力。如果能在大表 shuffle 前根據非分區列的 join 列對其進行過濾,即使無法像 DPP 一樣直接減少從存儲中讀取的數據量,但減小了其參與 shuffle 以及后續操作的數據量,也能獲得比較不錯的收益,這就是 runtime filter 的動機,即運行時預先掃描小表獲取 join 列的值,構造 bloom filter 對大表進行過濾。具體實現思路和 DPP 基本一致,首先在 SparkOptimizer 新增 DynamicBloomFilterPruning 規則,邏輯上類似PartitionPruning,符合一系列判斷條件后插入一個節點 DynamicBloomFilterPruningSubquery。與 DPP 不同的是,如果 join 可以被轉化為 BroadcastHashJoin,則不會應用該規則,因為在 BroadcastHashJoin 的情況下對大表進行預先的過濾其實是多余的(非 pushdown 的情況下)。判斷是否加入 filter 節點的主要邏輯如下,這里以裁剪左表(左右兩側都為 logicalPlan,為了方便表達,用左右表指代)為例進行說明,需要滿足以下條件:

右表 rowCount 需要小于左表

Join 類型支持裁剪左表

右表 rowCount > 0

右表 rowCount 小于?spark.sql.optimizer.dynamicBloomFilterJoinPruning.maxBloomFilterEntries,默認值為100000000,避免 bloom filter 占用內存過大

右表中沒有DynamicBloomFilterPruningSubquery

右表不是 stream 且存在?SelectivePredicate

左表(這里的左表是真正的左表或者包含左表的Filter節點)沒有?SelectivePredicate,因為如果存在?SelectivePredicate,那么下一步便無法根據統計信息去計算過濾收益

在 prepare 階段,PlanAdaptiveSubqueries 會把 DynamicBloomFilterPruningSubquery 節點替換為 DynamicPruningExpression(InBloomFilterSubqueryExec(_, _, _)),擴展了PlanAdaptiveDynamicPruningFilters,支持對以上節點進行處理。新增了 BuildBloomFilter 和 InBloomFilter 兩個 UDF。BuildBloomFilter 在 sparkPlan prepare 階段提交任務構造 BloomFilter 并 broadcast 出去,具體的 evaluate 邏輯還是交給 InBloomFilter。另外在 AQE 的reOptimize 階段也新增了規則 OptimizeBloomFilterJoin,這個規則主要是用來根據執行過程的 metric 信息更新BuildBloomFilter的expectedNumItems。

可以看到在開啟了runtime filter后數據量在join前從120億條降至3W條,收益還是相當明顯的。

3.2.4 Data skipping

目前B站離線表存儲主要使用 orc、parquet 格式,列式存儲都支持一定程度的 data skipping,比如 orc 有三個級別的統計信息,file/stripe/row group,統計信息中會包含count,對于原始類型的列,還會記錄 min/max 值,對于數值類型的列,也會記錄 sum 值。在查詢時,就可以根據不同粒度的統計信息以及 index 決定該 file/stripe/row 是否符合條件,不符合條件的直接跳過。對于統計信息及索引的細節見orc format ?(https://orc.apache.org/specification/ORCv1/)? 和 orc index (https://orc.apache.org/docs/indexes.html)? 。Parquet 與 orc 類似,也有相應的設計,具體見parquet format (https://github.com/apache/parquet-format)? 和 parquet pageIndex (https://github.com/apache/parquet-format/blob/master/PageIndex.md)? 。雖然 orc/parquet 都有 data skipping 的能力,但這種能力非常依賴數據的分布。前面提到統計信息中會包含每一列的 min/max 值,理論上如果查詢條件(比如> < =)不在這個范圍內,那么這個file/stripe/row group 就可以被跳過。但如果數據沒有按照 filter 列排序,那最壞的情況下,可能每個 file/stripe/row group的min/max 值都一樣,這樣就造成任何粒度的數據都不能被跳過。為了增加列式存儲 data skipping 效果,可以通過對數據增加額外的組織,如下:

select ? ? count(1) ? from ? ? tpcds.archive_spl_cluster ? where ? ? log_date = '20211124' ? ? and state = -16

表 archive_spl,不調整任何分布與排序

表 archive_spl_order,order by state,avid

通過對 state 進行 order 后 scan 階段數據量直接從億級別降至數十萬級別。在生產中我們通過對 SQL 進行血緣分析找到那些熱點表及高頻 filter 列,將這些熱列作為 table properties 存入 hms 中,在 Spark 執行時根據從 hms 中獲取的列信息,通過相應的優化規則,物理計劃自動增加 sort 算子,完成對數據組織。這個方案是基于列存優化數據組織來進行 data skipping,目前我們也在往索引方向上進一步探索。

3.3 功能性改進

3.3.1 對于ZSTD的支持

Spark 社區在3.2版本全面支持了 ZSTD 壓縮,為了更好的使用 ZSTD,我們在 Spark3.1 ?的基礎上引入了社區的相關 patch。其中也遇到了一些問題。在測試 ZSTD 的過程中偶然發現下推到 ORC 的過濾條件沒有生效,經調查發現是 ORC 代碼的 bug,在和社區討論之后,我們修復了該 bug并將 patch提交給了社區:https://issues.apache.org/jira/browse/ORC-1121 。

離線平臺的 Presto 也承接了很多 ETL 任務,由于 Presto 使用的是自己實現的 ORC reader/writer,所以在 Spark 升級 ORC 版本之后,對一些 Presto 寫出的表,出現了查詢結果錯誤的問題。正常情況下,Apache ORC writer 在寫文件時會記錄每個 stripe/rowGroup 中每列的統計信息,如 min/max 等。Apache ORC reader 在讀取文件時會根據這些統計信息結合下推的過濾條件進行 stripe/rowGroup 級別的過濾。但 Presto ORC writer 在寫文件時,如果 String 類型的列長度超過64 bytes,則這一列不會記錄 min/max 信息。雖然 Presto ORC reader 可以正常處理這類文件,但 Spark/Hive 使用的 Apache ORC reader 因為無法正常的反序列化 columnStatistics 得到正確的統計信息,導致做 stripe/rowGroup 級別的過濾時出現了錯誤的結果。我們也發現這個問題是由于 ORC 1.6 版本的一次代碼重構導致,1.5及之前是沒有該問題的。我們已在內部分支修復了該問題,也已將問題反饋給社區。

3.3.2 多格式混合讀兼容

歷史上很多表使用了 text 存儲,在資源上造成了很大的浪費,通過修改表的元信息可以保障新增分區切換到列存,這就造成了一個離線表可能存在多種 fileformat 的情況,為了兼容我們修改了 DataSourceScanExec 相關的邏輯,將reader 的實例化從基于table元信息粒度細化到分區元信息粒度。

3.3.3 轉表&小文件合并語法

為了方便用戶修改表的存儲格式和文件壓縮格式我們在引擎層提供了相關語法及具體實現。用戶可以通過指定分區條件對特定分區進行轉換。

CONVERT TABLE target=tableIdentifier ? ? ? ?(convertFormat | compressType) ?partitionClause? ? ? ? ? ? ? ? #convertTableMERGE?TABLE?target=tableIdentifier ? ? ? ?partitionClause? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? #mergeTable

3.3.4 字段血緣

作業間的依賴關系分析、數據地圖等業務都需要SQL血緣的支持,團隊后續工作(z-order , analyze , index)也需要依賴血緣,我們通過注冊一個 LineageQueryListener 繼承 QueryExecutionListener,在 onSuccess 方法拿到當前執行的QueryExecution,通過 analyzedLogicalPlan,利用 NamedExpression 的 exprId 映射關系,對其進行遍歷和解析,構建出字段級血緣(PROJECTION/PREDICATE)和 levelRelation(層級關系)。

3.4 基于歷史執行的自動參數優化(HBO)

Spark 提供了大量的參數設置,對于用戶而言了解這些參數并使用好需要花費很大的代價,在很多情況下不同的參數調優對于 spark 的作業執行和資源消耗會有很大差異。為了盡可能的適配任務執行,我們預設了一組參數,這種統一配置存在很多問題,以內存而言為了適配盡可能多的任務,該值設置偏大,通過對執行的分析發現大量的任務存在資源浪費的問題,整體的內存利用率僅20%左右。要求每個用戶成為專家對作業進行細致的調優顯然不可能,因此我們設計了 HBO 系統,具體的思路如下圖:

首先對任務執行的 SQL 進行了指紋計算,通過指紋來標識該任務每天執行情況,將每次執行中采集到的 metrics 收集后用策略進行分析給出相應的參數優化建議,在下次執行的時候根據指紋來獲取推薦的執行參數,對于使用默認參數的任務則進行覆蓋,對于那些用戶指定的參數則優先使用用戶參數。

內存優化策略:通過收集每個 executor 的峰值內存,如果峰值內存占配置內存比值低于30%,就推薦使用更少的內存來執行此次的計算,對于峰值內存占比過高的任務,則調大內存配置。通過這個策略生產上的內存使用率提升至50%左右。

并行度優化策略:生產上開啟了動態資源配置,在對數據分析時發現有些節點從分配后就沒有task執行過,完全浪費了節點的資源,對于這些任務會在下次執行的時候降低 spark.dynamicAllocation.executorAllocationRatio 值來降低執行并行度,此外默認提供的 spark.sql.shuffle.partitions 值對于大任務來說執行并行度不夠,后續也將進行自動的調整。

優化shuffle策略:如上文所講 RSS 對小任務存在性能下降的問題,通過對 block size、shuffle 數據量的分析,HBO 系統只會對那些 shuffle heavy 任務開啟使用 RSS 服務。

小文件合并策略:小文件合并會消耗額外的資源,對于不存在小文件情況的作業 HBO 系統會關閉小文件合并相關的配置。

此外平時工作中一些 feature 的上線也會依賴該系統進行一個灰度過程。

3.5 Smart Data Manager (SDM)

Smart Data Manager(SDM)是我們自研的一個對數據進行組織和分析的服務,通過對數據的額外處理將我們對 Spark 的一些技改真正落地。它的整體架構如圖,目前提供了如下的幾個數據組織和分析能力:

表存儲和壓縮方式的轉換:將表從 Text 存儲轉換為 ORC 或 Parquet 存儲,將壓縮類型從 None 或 Snappy 轉換為 ZSTD 可以帶來不錯的存儲和性能收益,SDM 提供了按分區對表異步進行轉換的能力。

數據重組織:在分區內部按列對數據進行 order/zorder 組織可以有效的提高 data skipping 的效果,新增分區通過查詢 table properties 中的排序列 meta 來改寫執行計劃應用,存量分區就可以通過 SDM 重刷。

Statistics 的統計:開啟 CBO 時需要依賴對表統計信息的收集,在對 hive 表的列進行索引時也依賴收集到的列基數和操作信息選擇合適的索引類型,通過 sdm 監聽 hms 的 partition 事件就可以在分區更新時異步完成信息采樣。

小文件合并:對有小文件較多的分區和表異步進行小文件合并,減少 namenode 的壓力

Hive 表索引:通過分析血緣信息得到熱表熱列上的高頻操作(點查,范圍查詢),基于此在分區文件層面異步的建立索引來加速查詢。

血緣解析:解析語句,分析字段血緣,吐出 UDF 血緣、算子(order by / sort by / group by...)影響關系等

對數據進行重組織時會涉及到對數據的讀寫,為了防止對生產作業的影響我們在進行操作時會修改相關表的 Table Properties 增加鎖表標記,各個計算引擎適配實現了類 Hive 的鎖管理機制,由 Hive metastore 統一作為 lock manager,在對表和分區并發操作場景下,做到對用戶完全透明。

4. Hive Meta Store 上的優化

B站使用 HMS(Hive MetaStore)管理所有的離線表元信息,整個的離線計算的可用性都依賴 HMS 的穩定性。業務方在使用分區表時存在不少4級及以上分區的情況,有多個表分區數超百萬。分區元信息龐大單次分區獲取代價高,原生 HMS 基于單個 MySQL 實例存在性能瓶頸。

4.1 MetaStore Federation

隨著多機房業務的推進,獨立業務的 HDFS 數據和計算資源已經遷移到新機房,但是 HIVE 元數據仍在原有機房的 Mysql 中,這時候如果發生機房間的網絡分區,就會影響新機房的任務。

為了解決上述問題,我們進行了方案調研,有兩種方案供我們選擇:

WaggleDance

HMS Federation

4.1.1 WaggleDance

WaggleDance是開源的一個項目(https://github.com/ExpediaGroup/waggle-dance),該項目主要是聯合多個 HMS 的數據查詢服務,實現了一個統一的路由接口解決多套 HMS 環境間的元數據共享問題。并且 WaggleDance 支持 HMS Client的接口調用。主要是通過 DB,把請求路由到對應的 HMS。

4.1.2 HMS Federation

HMS Federation 是解決多機房場景下的 HIVE 元數據存儲問題,HIVE 元數據和 HDFS 數據存儲在同一個機房,并且允許跨機房訪問 HIVE 元數據。比如主站業務的 HDFS 數據存放在 IDC1,那么主站業務 HDFS 數據對應的 HIVE 元數據就存在IDC1 的 Mysql,同樣直播業務的 HDFS 數據和 HIVE 元數據都存放在 IDC2。

同時 HMS Federation 也提供了 Mysql 的橫向擴容能力,允許一個機房可以有多個 Mysql 來存放 HIVE 元數據,如果單個 Mysql 的壓力過大,可以把單個 Mysql 的數據存放到多個 Mysql 里面,分擔 Mysql 的壓力。比如主站業務的 HIVE 庫,zhu_zhan 和 zhu_zhan_tmp,可以分別放在 idc1-mysql1 和 idc1-mysql2。

我們在 HMS Federation 中加入了一個 StateStore 的角色,該角色可以理解為一個路由器,HMS 在查詢 Hive 庫/表/分區之前,先問 StateStore 所要訪問的 HIVE 元信息存放在哪一個 Mysql 中,獲取到了對應的 Mysql 后,構建相應的ObjectStore,進行 SQL 拼接或者是利用 JDO 查詢后端 Mysql。

4.1.3 HMS Federation 與 WaggleDance 的對比

數據遷移

我們的主要目的是實現 HIVE 元數據按業務劃分到各自 IDC 的 Mysql

WaggleDance 并沒有提供相應元數據遷移工具,要遷移需要停止整個 HIVE 庫新建表/分區,才能夠開始遷移過去,對業務影響較大。

HMS Federation 可以按表的粒度遷移,對業務影響較小,并且可以指定某個 HIVE 庫下,新建表在新的 Mysql,舊的等待著鎖表遷移。

運維復雜度

WaggleDance 方案需要不同的 HMS,配置不同的 Mysql 地址,增加了 HMS 配置的復雜度。WaggleDance 是一個獨立的服務,為了保證可用性,運維復雜度會再一次提升。

HMS Fedration 是 HMS 的功能升級,在 HMS 代碼上開發,并且使用統一的配置。

綜合上述對比,我們最終選擇了 HMS Federation 的方案。通過修改 HMS 的代碼,實現元數據跨 Mysql 存儲。

4.2 MetaStore 請求追蹤和流量控制

HMS 在處理 getPartitions 相關請求的時候,如果拉取的分區數量非常多,會給 HMS 的堆內存,以及后端的 Mysql 帶來很大的壓力,導致 HMS 服務響應延遲。

為了能夠快速的定位到有問題的任務,我們在 Driver 中將 Job 相關的信息保存到 Hadoop CallerContext 中,在調用 HMS 接口的時候將 CallerContext 中的相關屬性設置到 EnvironmentContext 中透傳到 HMS 端,同時擴展了所有getPartitions 相關的接口支持傳遞 EnvironmentContext,EnvironmentContext 中的 properties 會在 HMS 的 audit log 中打印出來,方便問題任務的定位。

同時為了提高 HMS 服務的穩定性,我們在 HMS 端也做了接口的限流和主動關閉大查詢。對于限流,我們新增了一個 TrafficControlListener,當接口被調用的時候會以 function 和 user 為單位記錄 Counters 保存在該 Listener 中,同時在該Listener 中啟動采集 used memory 和 counters 的線程,當平均使用內存達到閾值時,檢查接口的QPS,如果qps達到閾值會讓調用接口的線程 sleep 一段時間,下一次檢查通過或者達到最大等待時間后放行。HMS 也有可能因為 getPartitions 方法返回的分區數量太大導致內存被打滿,一方面我們限制了 getPartitions 從 mysql 返回的分區數量,超過一定數量就直接拒絕該請求,另一方面我們在 TProcessor 中以 threadId 和 socket 為 key 和 value 保存當前的連接,在檢查 partition 數量時我們也按照 threadId 和 num partitions 為 key 和 value 保存 partition 的 cost,當 HMS 平均使用內存達到閾值超過一定時間后,會選擇 num partitions 最大的 threadId,再根據 threadId 獲取對應的連接,主動 close 該連接,來緩解內存壓力。

5. 未來的一些工作

調研不落地的 Remote Shuffle Service 來更好的適配 K8S 混部的場景

使用向量化技術加速 Spark 的執行引擎,提升計算性能

增強自動排錯診斷系統,提升平臺用戶體驗

我們會和業界同行和開源社區保持密切技術交流,在服務好內部用戶作業的同時,也會積極反饋社區,共建社區生態。

(部分內容來源網絡,如有侵權請聯系刪除)
立即申請數據分析/數據治理產品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢