日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數據治理平臺

睿治作為國內功能最全的數據治理產品之一,入選IDC企業數據治理實施部署指南。同時,在IDC發布的《中國數據治理市場份額》報告中,連續四年蟬聯數據治理解決方案市場份額第一。

京東數據傾斜內部解決方案

時間:2022-02-22來源:吃不飽的大可愛瀏覽數:221

? ? ??本文討論了京東Spark計算引擎研發團隊關于自主研發并落地數據傾斜解決方案,助力京東大規模離線計算場景的探索和實踐。近年來,大數據技術在各行各業的應用越來越廣泛,Spark自UCBerkeley的AMP實驗室誕生到如今3.0版本的發布,已過十年,儼然已經成為大數據計算領域名副其實的老項目。雖經過不斷的迭代和優化,Spark功能日趨成熟與完善,但在性能及穩定性方面,仍然還有很多可以提升的地方。

? ? ??數據傾斜問題作為大數據計算領域中的一個頑疾,就是其中重點之一。我們希望在京東超大規模數據體量及復雜業務場景的背景下,通過自研數據傾斜優化算法,解決現有問題,打造穩定高效的JDSpark計算引擎,助力京東大促過程中的一些應用實踐,能夠給大家提供一些思路和啟發,同時也歡迎大家多多交流,給我們提出寶貴建議。

? ? ??什么是數據傾斜

? ? ??數據傾斜是分布式計算領域中最為棘手的問題之一,它是指在分布式的數據并行處理過程中,由于數據分片不均勻,導致大量數據集中到一個或少數計算節點上,導致少數任務的處理速度遠低于平均速度,甚至直接導致OOM內存溢出,進而拖慢整個計算環節。

? ? ??數據傾斜的發生往往與數據Shuffle緊密相連,在Join(連接)、Aggregate(聚合)、Window(窗口)等計算中,必須從各個前序計算節點中將相同key相關的數據拉取到某個節點上進行計算,當數據分布不均存在hot key時,其對應計算節點上的數據量便很可能超過其他節點。

? ? ??社區版實現

? ? ??通過對線上任務的分析,我們發現約80%的數據傾斜是由Join計算導致,傾斜Stage嚴重影響了任務的時效性,解決SkewedJoin迫在眉睫。所幸的是,從3.0開始,Spark引入了Adaptive Query Execution(AQE)機制,可以通過分析已完成Stage的統計信息,來動態的調整后續Stage的執行,以達到更佳的優化效果。圖1展示了AQE的基本架構。而SkewedJoin的優化處理,正是AQE中最重要的應用之一。

圖1 Adaptive Query Execution

? ? ??圖2、3來自于Databricks介紹AQE的博客。其中圖2展示了兩表Join情況下的數據傾斜,其中左側Table A發生了嚴重的數據傾斜,其分區A0的數據量遠大于其他三個分區A1~A3;在計算過程中,A0-B0對應的任務Task0的計算壓力將大于其他三個Task,進而拖慢整個Stage。

圖2 數據傾斜

? ? ??基于AQE框架,社區版本是這樣處理2表Join場景下的數據傾斜:在前序Stage完成后,AQE可以獲取其各個分區大小;其中的OptimizeSkewedJoin優化規則,將首先分析JoinType(如Inner、Left、Right、LeftSemi等)來判斷左右兩側是否可以進行Split分割處理,判斷的依據是Split之后再執行Join是否會破壞數據正確性;若可處理,則進一步分析分區大小的分布,檢測是否存在傾斜分區(默認邏輯為分區大小超過中位數的5倍);分析結束后,若存在可處理的傾斜分區(圖2中的A0),則對其進行分割(分割為A0-0,A0-1),并將對應分區(B0)進行Duplicate復制處理,這樣Task0(A0 - B0)將被分割成為兩個Task(A0-0 - B0;A0-1 - B0)。處理后的Stage將由4個Task變為5個Task,但各個Task的計算壓力得到了平滑,整個Stage的執行效率得到了提升。

? ? ??在實際中,我們發現這里有三點值得注意的技術細節:

? ? ??1、 JoinType約束:目前的Split+Duplicate處理機制,會對JoinType產生約束;具體見表1,如果圖3中的JoinType為RightOuter,則不能對A0進行分割,否則會導致數據正確性問題。

? ? ??2、 處理開銷:對應非傾斜分區會被多次讀取,如圖3中的B0被讀取兩次。

? ? ? 3、 組合爆炸:以Table A Inner Join Table B為例,若左右兩側的分區A0和B0均發生了傾斜,且分別被分割成M份和N份,則Task0最終會被拆分成M X N個Task,當M X N數值過大時,可能會導致一定的性能回歸。

圖3 基于AQE處理數據傾斜

表1 JoinType約束

? ? ??業界的改進

? ? ??通過觀察線上任務,社區版數據傾斜處理能力可以明顯提升命中任務的時效性,但其依然存在一些不足:

? ? ??1、模式限制:目前社區版本僅支持2表Join的場景,且匹配模式非常嚴格(見圖4,左側為SortMergeJoin,右側為ShuffledHashJoin);當前支持的模式中,兩側的輸入不能存在聚合、映射等其他節點。

? ? ??2、語義限制:即上文所提的JoinType約束,目前在Split+Duplicate處理機制下,類似Table A leftJoin skewed Table B的情況是不能被處理的。

圖4 目前社區版所支持的數據傾斜模式

? ? ??目前業界主要的優化方向在于放寬模式限制,即在AQE框架下支持更多的模式,以如圖5為例,假設Shuffle2中發生數據傾斜,但由于Shuffle1端存在Aggregate算子,導致匹配不上圖4中的模式。目前業界主要有兩種實踐方向:

? ? ??方案1:主動引入額外的Shuffle將復雜Stage轉化為社區版可以處理的簡單模式。在Aggregate節點后,主動插入Shuffle,導致Shuffle3與Shuffle2之間的Join計算滿足圖4中的模式;。

? ? ??方案2:擴展并維護傾斜模式集,需要不斷將線上遇到的傾斜模式添加進集合,如圖5這種單側存在聚合的模式。

圖5 左側存在聚合節點的數據傾斜

? ? ??

? ? ??業界的改進均可以拓寬數據傾斜處理的覆蓋面,但我們在引入京東平臺時,發現了其中的一些缺陷:

? ? ??方案1:主動引入的Shuffle有時是可以避免的,如圖5中的Shuffle3可以通過調整數據傾斜算法來避免的。

? ? ??方案2:除了圖5中的單邊聚合模式,還需要支持多表Join模式、含Union的模式等;特別的,傾斜模式之間可能是關聯的,比如Union節點的每個子節點分屬不同模式,或多表Join中的某些節點又包含Aggregate節點;這導致很難枚舉所有可能的模式,且難以維護,很容易陷入頭痛醫頭腳痛醫腳的困境。

? ? ??通過仔細分析平臺任務的傾斜模式,并深入調研業界已有的改進工作,我們歸納出了一種較為通用的優化算法。不再使用模式匹配的方法,而是分析節點語義,搜索出可以分割的傾斜葉子節點,并進行分割和組合處理。其處理邏輯如下:

OptimizeSkewedJoinV2 :

Check operator types

{ Non-Skewed SMJ/SHJ/Aggregrate/Window, BHJ/Sort/Project/Filter/… }

Find Splittable Shuffles:

Split Splittable Skewed partitions & Duplicate other partitions

Check Combinatorial Explosion & Re-split if necessary

Mark all SMJ/SHJ/Aggregate/Window Skewed

? ? ??步驟一:出于保證數據正確性的考慮,利用白名單(包含Sort、Project、BHJ、Non-Skewed SMJ/SHJ/Aggregate/Window等)對全部子孫節點進行類型檢查,若滿足要求,則可以進行后續優化;

? ? ??步驟二:語義分析,搜索Splittable葉子節點:

? ? ??遇到SMJ/SHJ (Inner/Cross)節點,搜索左右子節點;

? ? ??遇到SMJ/SHJ (LeftOuter/LeftSemi/LeftAnti)節點,搜索左子節點;

? ? ??遇到SMJ/SHJ (RightOuter)節點,搜索右子節點;

? ? ??遇到SMJ/SHJ (FullOuter)節點,停止當前路徑搜索;

? ? ??遇到Aggregate/Window節點,停止當前路徑搜索;

? ? ??遇到其他節點,則搜索其子節點;

? ? ??所有最終到達的Shuffle節點均為Splittable;

? ? ??步驟三:利用分區信息,分析Splittable葉子節點是否發生傾斜;若發生傾斜,則對其進行分割,并復制其他分區數據;

? ? ??步驟四:對組合數進行檢測,當超過閾值(默認1000)時,會對分割數進行壓縮;

? ? ??步驟五:對分割后的數據進行組合遍歷,生成最終的Task。并將樹結構中的SMJ/SHJ/Aggregate/Window節點標記為Skewed狀態,以避免EnsureRequirements引入新的Shuffle;

圖6 在復雜Stage中搜索Splittable葉子結點

? ? ??以圖6為例,步驟二中篩選出Shuffle 0/2/4具備可分割性,當其中發生數據傾斜時,則可以被處理;步驟三中,對于每個分區,會通過遍歷所有的Split組合生成優化后的任務;

假設對于分區0,Shuffle 0/2/4均發生了數據傾斜,并均被分割成了100份,則通過組合遍歷生成一百萬(100X100X100)個新Task,但這會導致Shuffle 1/3/5中分區0的數據均被讀取了一百萬次;步驟四會對組合數進行壓縮,將分割方案由100X100X100壓縮至10X10X10。

? ? ??線上案例

? ? ??目前我們設計的新算法已經上線一段時間,應用于保障關鍵鏈路中的計算任務。

圖7 四表Join傾斜

? ? ??圖7為一個典型的線上案例,在計算過程中發生了4表Join傾斜(2SMJ+1BHJ),其中最左側的Shuffle數據發生了傾斜;

圖8 四表Join傾斜優化相關日志


? ? ??圖8中的日志記錄了整個處理過程,從Stage的Project節點開始,首先發現兩個SMJ(#2002,#1996)和三個Shuffle輸入(#12,#9,#11);分析中發現JoinType均為LeftOuter,故最左側的Shuffle(#12)滿足Splittable條件;接著對Shuffle(#12)進行分析,發現多個傾斜分區,于是進行分割處理;

圖9 四表Join傾斜優化時效性優化


? ? ??優化前Stage 20共包含3000個Task,耗時長達1~2小時;優化后共生成3051個Task,耗時優化至11分鐘,顯著提高了時效性。


? ? ??總結與展望

? ? ??京東Spark平臺承擔了海量的計算任務,常有幾千行的大SQL出沒,計算邏輯千差萬別。我們對線上傾斜模式進行分析,并對業界處理方法進行借鑒,歸納總結出一種較為通用的處理算法。不過目前仍然存在不少的待優化項需要繼續進行攻克:

? ? ??1、目前的算法需要所有葉子節點均為Shuffle/Broadcast類型,尚不支持Bucket Join。后續需要支持DataScan類型的葉子節點,以支持Bucket Join的推廣。

? ? ??2、突破JoinType限制:通過對線上任務的分析,目前仍然有一定量的類似A leftJoin skewed B、A leftJoin skewed B leftJoin C的傾斜模式。目前Split+Duplicate處理機制不能處理這種傾斜模式,需要突破當前限制,進一步拓展覆蓋度。

? ? ??3、目前我們的工作主要圍繞Join開展,未來需要進一步支持Window和Aggregate算子引發的數據傾斜。

? ? ??在不斷提升平臺整體性能的同時,我們會繼續深度鉆研數據傾斜、數據膨脹等異常場景,保障長尾任務的穩定性,為京東大數據平臺的湖倉一體升級工程打造堅實可依賴的高效引擎基礎。

? ? ??我們還會同開源社區和業界同行保持密切的技術交流,在支撐好內部計算任務的同時,也會把相關優化工作回饋給社區,共建Spark生態。

(部分內容來源網絡,如有侵權請聯系刪除)
立即申請數據分析/數據治理產品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢