- 產品
- 產品解決方案
- 行業解決方案
- 案例
- 數據資產入表
- 賦能中心
- 伙伴
- 關于
時間:2022-03-15來源:小鎮姑娘瀏覽數:415次

導讀:B站千億級數據同步,每天100T+數據導入是如何實現的?本文將介紹Apache SeaTunnel在嗶哩嗶哩的實踐。包括以下幾方面內容:
工具選擇
日志
提速/限流
監控自理
01工具選擇數據集成和數據出倉的大體流程如下圖所示,主要以數倉為中心,從HTTP、MySQL等外部數據源抽數入倉,在數倉內做相應業務處理后,出倉到對應的Clickhouse、MySQL等存儲,供業務使用。

B站數據平臺在離線出入倉工具上目前主要有兩類。一類是基于DataX二次開發的Rider項目,另一類是基于Seatunnel 1.1.3二次開發的AlterEgo項目。

上圖展示了Rider的架構。
Rider在使用上支持T+1、H+1定時調度,在數據源上支持HTTP、MySQL、BOSS等作為數據源,此外Rider在使用上主要原生讀寫Hdfs文件。
DataX雖然在單個進程下已經足夠優秀,但是不支持分布式,另外在大數據量下表現不是很好,Seatunnel在分布式場景下表現優秀,一方面構建在spark之上,天然分布式,另外且自帶了很多插件,非常適合二次開發。我們調研后,在Seatunnel基礎上二次開發了AlterEgo項目。

AlterEgo的工作流程:Input[數據源輸入] -> Filter[數據處理] -> Output[結果輸出]。
集群規模方面,由于歷史原因,目前離線集群與出入倉工具集群是分開部署的。出入倉工具集群節點數20+,CPU核數750+,內存1.8T+;每日出入倉調度任務同步數據方面,日均記錄數上千億,日均數據量在100T以上。
在落地方面,主要提供了界面化操作,對任務做了抽象和封裝,平臺化后提供給用戶使用。平臺會根據用戶不同選擇,把任務封裝為Seatunnel的配置格式或DataX的Json格式配置。目前平臺的功能如圖,用戶可以選擇數倉數據源的庫名、表名和出倉后存儲的目標數據源的信息。這里為了管理接入血緣考慮,和安全規范使用流程,用戶在界面內不被允許填寫用戶名密碼,可以選擇對應創建的數據源。在存儲個性上,比如為兼容MySQL協議的數據源的導入上,支持了Insert Ignore、Insert Update方式導入數據。字段映射可以通過界面上拖拽完成配置。

在任務運維界面,通過DAG查看調度任務上下文,出倉和入倉的整個過程中,任務是互相依賴的,前面的任務出問題會導致后面的任務產出慢、數據延遲等。因此排查問題的過程中,往往需要在任務DAG中找到上游依賴最長的鏈路或是未完成鏈路并排查問題。
02日志平臺化落地其實難度不大,套個皮就可以了,但是很多時候我們要考慮的是面向運維開發,細節都是對用戶是封裝好的,需要為用戶提供足夠的運維工具支持。這里以日志為例,排錯是很常見的場景,當用戶排錯時,用戶并不希望看到密密麻麻且無用的Yarn日志,但如果使用spark日志,由于Spark環境配置繁瑣,直接暴露Spark UI給用戶也會讓用戶的使用體驗不佳。此外,在后期我們整合入離線大集群后,集群節點數目有四五千個節點,集群規模大就導致日志聚合會慢,日志響應時間長。

為解決日志查詢困難的問題,我們對日志層做了優化,在Spark上使用LogAgent把業務日志轉發到我們的日志服務上。為方便查詢,且讓日志歷史信息可追溯,LogAgent在日志中追加了jobId、jobHistoryId、priority等信息。這樣采集日志后,我們會根據日志內信息做各類告警,例如當任務出倉條數為0時發送告警等。此外,當日志有報錯打出,用戶可以直接在日志界面展示的日志里定位程序的問題,所有操作直接在平臺就可以完成,而不需要其他復雜的配置。
03提速/限流1. ClickHouse出倉
Clickhouse數據出倉方式有三種:
寫分布式表:寫入性能偏低,代碼比較簡單,不需要依賴RDD Repartition。
寫Local表:需要在本地做一次repartition,會有性能壓力。但寫入性能會更高,和寫分布式表一樣,主要用Jdbc協議。
BulkLoad:BulkLoad將寫壓力前置到Spark層,寫入速度快,降低了Clickhouse側壓力,寫入不影響讀性能,做到讀寫分離,更加安全。依賴的是文件復制。
Clickhouse出倉任務調度記錄達到60億以上,數據量達到13T以上;手動補數據數據量在70T,數據量和記錄數都在不斷增長。
2. 創作中心-出倉加速
簡單介紹下我們對創作中心的數據出倉做了一系列優化,加速了數據出倉過程。
創作中心使用大量使用TiDB,我們利用jdbc協議批量寫數據。當寫得快會導致TiDB-Server IO高、壓力大。另外在數據出倉過程中,可能有新建分區表的需求,當出現DDL操作時,寫入很容易出現Information schema is changed導致失敗。如果存在更新數據場景,也會由于Insert update時需要把數據全部讀出,當任務出現失敗后重試時任務耗時會增長,性能降低。此外,TIDB集群是有限的,多個業務同時寫入TiDB時候會出現多實例競爭寫入資源,導致寫入時間耗時增加。
以上問題,我們的應對方案主要是基于業務大都為KV查詢,用自研的分布式KV存儲TaiShan替代TiDB。創作中心業務主要集中在點查和Range查詢,比較適合KV類存儲。TaiShan是B站自研的分布式KV存儲,經過多次出倉實際壓測,TaiShan的Batch寫入方式和TiDB性能接近,實際使用并沒有多少性能提升,寫入多時也會影響到讀的業務。我們最終采用正在做的Bulk Load方式寫入TaiShan。Bulk Load優化和前面介紹的Clickhouse的優化類似,將寫入壓力前置,放到SeaTunnel層來生成數據文件;對于業務庫能實現簡單的讀寫分離,但可能會存在一些熱點問題,需要前置一次repartion。

我們對jdbc協議寫入TiDB和BulkLoad的方案分別做了壓測,TiDB寫入3、4億條數據多實例寫入的情況下,壓測任務要運行兩小時以上,TaiShan只需要十幾分鐘即可跑完壓測,從結果來看Bulkload簡直不要太好,但有個無法回避的問題是TiDB集群是多個業務同時寫入的,分散到單個任務看起來寫入時間長。
我們也在嘗試繞過TiDB直接將數據寫入TiKV,這個方案我們也在調研和實踐中,感興趣的小伙伴可以看下:https://github.com/tidb-incubator/TiBigData

3. 限速
在出倉場景,實際上還要考慮限流以及熔斷,沒有限速可能導致業務庫有一些問題,畢竟服務器能力有限,寫的太快將導致讀有影響。最開始我們使用的方法是在代碼內Sleep,簡單實現就是假設數據寫入很快,可以在一毫秒內完成,那么寫入的耗時就是代碼中sleep的時間,假如為例限速1w或5k,我們會通過sleep的時長就可以得出Spark需要的Executor數,達到間接的、不準確的限流。漏斗桶和令牌桶一個限制流入一個限制流出,我們用的不是很多。分布式的話我們小范圍使用了Sentinel,但分布式限流如果觸發熔斷可能由于寫入資源有限而寫入一直處于熔斷狀態,導致寫入時間長、數據任務破線。BBR算法是個很好的工具,有很多種實現,依賴的參數很多,甚至可以有對端的CPU及內存水位,不斷嘗試得到最佳寫入量,在使用上可以很好的改善峰值問題。
04監控自理1. 監控
在使用上,我們承接了幾乎所有的離線出倉和入倉任務,作為數倉的零層和尾層,在入倉和出倉時需要及時感知可能存在的問題,一方面任務打優先級,方便分級處理,在運行時,基于歷史指標預測當前任務的指標,當出現問題時及時告警接入檢查。
AlterEgo中,我們在Spark的Application Job內定時上報寫入速度和寫入的數據量。Rider是常駐的可以運行多個Job,在Job中以Job為單位上報監控數據。AlterEgo和Rider的數據全部接入到消息隊列里,消息最后被Aulick消費。Aulick內設計了多種指標監聽器,用于任務的監控,包括運行時間、起止時間、速度、數據量、失敗重試次數和TiDB和MySQL特有的插入/更新數據量?;谶@些采集的指標數據,可以做到任務實例頁,方便用戶查看,另外匯總信息可以通過Grafana工具以及其他BI工具功能展示,異常告警交由Sensor做告警觸發。

數據采集上,Rider方式實現有的內存Channel可以拿到同步的數據量等指標信息。AlterEgo方式由于是分布式,會有多個Executor進程同時上報,目前我們主要是通過自定義了累加器完成指標的上報,Executor端在使用累加器時實例化定時采集線程,由于各個Executor進程啟動時間不同,所以在上報時的時間點是不準確的,在使用上我們把時間按照10秒一個窗口進行規整,如在0-10秒上報的數據會全部規整到0秒上進行匯總。

2. 自理
在數據同步過程中,在數據維度上,需要發現異常讀寫速度、異常讀寫流量和異常走勢,出現問題及時監控和報警,報警會有電話報警到對應負責人,對數據異常進行處理,防止由于上游數據導入任務異常導致下游數據產出問題。這類其實也可以通過DQC去做,但側重點不同,這里更關注事中觸發。
在時間維度上,基于任務歷史預測,數據同步任務到時間未啟動,或者任務已提交到Yarn但是由于資源不足沒有啟動,數據同步任務執行時間過長,也是需要及時處理。
在診斷方面,在任務失敗后需要解析Error日志進行失敗歸因以及跟蹤,方便用戶自理,有一定量以后,還可以做任務的統計,以及資源優化。
05精彩問答Q1:3-4億的BulkLoad壓測性能提升是如何實現的?
A:BulkLoad是先寫數據到本地磁盤,然后推到存儲系統,由存儲系統加載到內存,首先是Seatunnel分布式的,在執行時是分布式多進程生成各自的數據文件,最后再把數據文件推到存儲系統, 實際的性能就看開多少并發度了,并且整個過程不會太消耗存儲系統的CPU及IO壓力,對讀也是非常友好。
Q2:限速階段如何感知下游壓力?
A:感知下游壓力,目前簡單做法可以通過后端返回的RT時間或者失敗來感知到存儲端壓力,目前熔斷可以簡單這么做,但這種不精細,無法處理好峰值問題,高級玩法可以參考BBR算法,依賴的參數很多,可以有存儲段的CPU及內存水位,可以很好的改善峰值問題。
Q3:能否監控精細到Byte字節數?
A:大部分細節指標需要自己試下,B站這邊是通過自定義d累加器實的現,在寫入數據時記錄數據字節數、條數等,簡單點可以自己getBytes拿到,我們這邊累加器會定時上報到消息隊列,然后由Aulick消費數據后再進行相應地做報警動作。比如如果傳輸的字節量,字節速率存在異常,就可以及時的發下,找相應的同學協同排查問題。
Q4:DataX和Seatunnel是否可以互相替代?
A:工具平臺在落地上,互相替換是很有必要的,出去性能差別外,在執行上也只是配置文件的區別。在集群規模很大后會收到很多環境問題、異常及各類問題影響,在工具使用上有個降級方案還是很有必要的。比如我們最近遇到了JDK的Bug,在kerberos認證時認證隊列可能拉長,引起寫入速度慢,事故當時,DataX在寫數據時需要經過認證,無法及時定位問題,大部分集成任務運行出現速度慢、以及超時。為了防止事故再次出現,我們已經實現了大部分任務的可互相替換運行數據同步任務,實現任務的高可用。
下一篇:企業數據治理體系...