- 產品
- 產品解決方案
- 行業(yè)解決方案
- 案例
- 數(shù)據(jù)資產入表
- 賦能中心
- 伙伴
- 關于
-
數(shù)據(jù)治理
-
醫(yī)療衛(wèi)生
制造
-
億信動態(tài)
時間:2022-11-08來源:九分帥氣瀏覽數(shù):751次
今天談下大數(shù)據(jù)平臺構建中的數(shù)據(jù)采集和集成。在最早談BI或MDM系統(tǒng)的時候,也涉及到數(shù)據(jù)集成交換的事情,但是一般通過ETL工具或技術就能夠完全解決。而在大數(shù)據(jù)平臺構建中,對于數(shù)據(jù)采集的實時性要求出現(xiàn)變化,對于數(shù)據(jù)采集集成的類型也出現(xiàn)多樣性,這是整個大數(shù)據(jù)平臺采集和集成出現(xiàn)變化的重要原因。
首先在這里表面一個觀點,即:
不用期望通過單一的一個工具或技術來完成大數(shù)據(jù)采集和集成工作,而是需要針對數(shù)據(jù)采集的實時性需求,數(shù)據(jù)采集的類型,數(shù)據(jù)量大小等采用不同的方法和技術。
因此今天主要針對不同的大數(shù)據(jù)采集和集成場景做下說明。

結構化數(shù)據(jù)庫的實時同步復制,最早出現(xiàn)在類似異地雙活,多中心基礎設施架構的搭建中。而當前在很多數(shù)據(jù)庫讀寫分離集群的場景中也經常用到。簡單來說就是通過數(shù)據(jù)庫同步復制,將讀寫分離,實現(xiàn)讀集群本身的水平彈性擴展能力。
對于數(shù)據(jù)庫實時同步和復制一定會談到的兩款商用產品就是Oracle GoldenGate和Quest SharePlex,具體的介紹網(wǎng)上也比較多,其核心特點就是支持異構數(shù)據(jù)庫之間的實時數(shù)據(jù)同步和負責,而且對源數(shù)據(jù)庫本身侵入性很小。
兩個商用產品基本都是對各種數(shù)據(jù)庫的Log日志文件進行分析,然后進行復制。
那對于這塊如果要自研來實現(xiàn)有無可能?
對于Mysql來說由于采用Binlog日志方式,類似淘寶的Otter已經可以完整地實現(xiàn)數(shù)據(jù)庫的實體同步復制。如果單純是Oracle-Oracle數(shù)據(jù)庫之間,我們也可以采用Oracle DataGuard或者Oracle Stream流復制技術進行復制,還有就是基于Oracle LogMiner進行redo log日志分析后進行兩個數(shù)據(jù)庫之間的同步。
因此關鍵問題還是在異構數(shù)據(jù)庫之間的同步復制上。對于數(shù)據(jù)庫復制,Oracle當前常用的解決方案主要有:
oracle日志文件,比如LogMiner,OGG,SharePlex
oracle CDC(Change Data Capture)
oracle trigger機制,比如DataBus , SymmetricDS
oracle 物化視圖(materialized view)比如淘寶的yugong開源
在這些解決方案里面可以看到有開源的SymmetricDS解決方案,但是是基于觸發(fā)器機制,侵入性還是相對較大。也有淘寶的yugong可以實現(xiàn)Oracle->mysql的全量或增量復制,但是基于增量物化視圖方式,本身會影響到源庫數(shù)據(jù)表的CUD操作。
而實際上最佳的解決方案仍然是基于log日志的實時同步復制,其核心思路包括三個步驟
在源庫設置為記錄日志或歸檔模式,源庫首先能夠記錄下日志信息。實時的能夠讀取到日志信息,并對日志信息進行解析或適當轉換映射,包括和目標庫的適配。在目標數(shù)據(jù)庫直接運行相應解析后的日志SQL語句,實現(xiàn)同步更新。由于Mysql本身提供可讀性很強的Binlog日志,因此可以看到Mysql->Mysql,Mysql->Oracle的實時同步日志問題是可以得到很好解決的。而對于Oracle->Oracle也可以解決,較難的就是Oracle->Mysql或其它異構數(shù)據(jù)庫,這里需要分析Oracle本身的redo log日志(當前Oracle提供有l(wèi)ogminer工具),如果我們自己寫一個解析包的話就需要對Oracle redo log結構有完整的了解。
而結合Oracle 流復制技術,我們可以考慮Oracle首先將變更信息寫入到自己的AQ,然后我們從AQ訂閱消息后直接處理或者寫入到我們自己的消息隊列或流處理軟件,然后在流處理軟件中完成相關的映射轉換后寫入到目標異構數(shù)據(jù)庫中。

如果從Hadoop提供的標準技術架構和開源工具集,對于數(shù)據(jù)采集和集成部分重點就是兩個工具,一個是Sqoop,一個是Flume。
Sqoop主要用于在Hadoop(Hive)與傳統(tǒng)的數(shù)據(jù)庫(mysql、postgresql...)間進行數(shù)據(jù)的傳遞,可以將一個關系型數(shù)據(jù)庫(例如 :MySQL ,Oracle ,Postgres等)中的數(shù)據(jù)導進到Hadoop的HDFS中,也可以將HDFS的數(shù)據(jù)導進到關系型數(shù)據(jù)庫中。
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),F(xiàn)lume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,F(xiàn)lume提供對數(shù)據(jù)進行簡單處理,并達到各種數(shù)據(jù)接受方(可定制)的能力。
對于兩者的區(qū)別簡單說明如下:
Sqoop只支持結構化數(shù)據(jù)和HDFS之間的數(shù)據(jù)集成,F(xiàn)lume支持文件和日志
Sqoop基于Mapreduce的批處理機制,F(xiàn)lume基于事件和流處理機制
Sqoop偏定時處理,F(xiàn)lume偏實時或準實時處理
當面對的是批量和大數(shù)據(jù)的時候,Sqoop性能好于Flume
在采用Sqoop方式進行數(shù)據(jù)采集和集成的時候,需要考慮的就是增量數(shù)據(jù)采集。增量數(shù)據(jù)導入分兩種,一是基于遞增列的增量數(shù)據(jù)導入(Append方式)。二是基于時間列的增量數(shù)據(jù)導入(LastModified方式)。當前這兩種方式Sqoop已經支持。

而對于Flume,最早僅用于日志文件的實時采集和處理,而當前的Agent已經能夠支持對結構化數(shù)據(jù)庫的適配,也就是說結構化數(shù)據(jù)庫的數(shù)據(jù)也可以類似流處理的方式采集到Hdfs庫。

DataX 是阿里開源的一個異構數(shù)據(jù)源離線同步工具,致力于實現(xiàn)包括關系型數(shù)據(jù)庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能。
DataX本身作為數(shù)據(jù)同步框架,將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件,以及向目標端寫入數(shù)據(jù)的Writer插件,理論上DataX框架可以支持任意數(shù)據(jù)源類型的數(shù)據(jù)同步工作。對于DataX整個架構設計最好的地方就是將Reader和Writer分離,你可以靈活地定義各種讀寫插件進行擴展。
Reader:Reader為數(shù)據(jù)采集模塊,負責采集數(shù)據(jù)源的數(shù)據(jù),將數(shù)據(jù)發(fā)送給Framework。Writer:Writer為數(shù)據(jù)寫入模塊,負責不斷向Framework取數(shù)據(jù),并將數(shù)據(jù)寫入到目的端。
我們自己當前研發(fā)和使用的DIP大數(shù)據(jù)集成平臺,也是在DataX底層引擎的基礎上擴展了數(shù)據(jù)源配置,數(shù)據(jù)對象定義和管理,數(shù)據(jù)調度和任務管理,日志監(jiān)控等功能。形成一個完善的大數(shù)據(jù)采集和集成工具平臺,如下:

對于DataX可以看到實際和Sqoop大部分功能都相同,但是兩者本身架構實現(xiàn)機制還是有差異。Sqoop本身是基于Hadoop的MapReduce機制進行分布式作業(yè),而對于DataX則是自己對Job進行切分,然后并行執(zhí)行。
對于DataX和Sqoop實際在單節(jié)點測試情況來看,兩者在性能上的差距并不明顯。但是數(shù)據(jù)源是Oracle,Msyql數(shù)據(jù)庫的時候,DataX的性能略好;而當數(shù)據(jù)源是Hdfs庫的時候,Sqoop性能略好。但是開源的DataX不支撐分布式集群,這個本身也對于大數(shù)據(jù)量下的架構擴展有影響。單節(jié)點的峰值傳輸能力在15M/S左右。
當前gitbub有對datax定制的管理平臺開源,可以參考:
https://github.com/WeiYe-Jing/datax-web
而對于常規(guī)的數(shù)據(jù)庫包括大數(shù)據(jù)存儲之間的采集和集成,在充分考慮性能的情況下,核心思路為:
1. 將源數(shù)據(jù)庫數(shù)據(jù)進行導出,使用Sql或DB原生的導出命令直接導出為txt文件,字段以分隔符進行分隔。1.1 可以部署多個代理端,對數(shù)據(jù)庫數(shù)據(jù)啟用多個線程進行導出1.2 支持基于key值或時間戳的增量數(shù)據(jù)導出2. 對導出的數(shù)據(jù)進行壓縮后進行傳輸(特別是在源和目標庫不在同一個數(shù)據(jù)中心時)3. 在目標庫端基于數(shù)據(jù)庫原生的load命令對數(shù)據(jù)進行bulk批量導入。在整個實現(xiàn)里面有兩個核心,一個就是可以啟用多個代理端和多線程機制并行導出數(shù)據(jù)庫,其次就是導出數(shù)據(jù)壓縮傳輸,然后在通過load data原生命令進行數(shù)據(jù)庫的bulk批量裝載提升性能。

如果基于以上思路我們可以看到數(shù)據(jù)采集的重點還是在性能上面,不會去實現(xiàn)ETL工具本身復雜的數(shù)據(jù)映射和轉化,數(shù)據(jù)聚合等操作。核心只是做異構數(shù)據(jù)庫和Hdfs文件存儲之間的數(shù)據(jù)搬移。而我們完全自己研發(fā)的DataPipe產品基本參考上述思路實現(xiàn),其測試性能對于結構化數(shù)據(jù)庫之間采集和集成是Sqoop或DataX的2-3倍左右,而對于hdfs之間的集成則在5-10倍左右的性能提升。
該思路在遠程數(shù)據(jù)傳輸和集成中,有明顯的性能優(yōu)勢。比如內蒙數(shù)據(jù)中心的批量數(shù)據(jù)要傳輸?shù)劫F州大數(shù)據(jù)中心。一個10G的數(shù)據(jù)在源端導出后并壓縮后只有100M左右的大小,整個處理機制則是將壓縮數(shù)據(jù)傳輸?shù)劫F州后再進行解壓和入庫。
但是整個方案涉及到需要在源端配置Agent代理,因此本身對源端具有一定的侵入性,導致整體應用效果并不太好。
對于這種采集存在的約束就是不要去處理數(shù)據(jù)變更的問題,僅僅是做數(shù)據(jù)的全量同步或者是數(shù)據(jù)庫表數(shù)據(jù)的簡單Append處理,否則性能本身會下降很多。如果有大量數(shù)據(jù)更新需要同步,最好的方式還是首先Truncate掉目標數(shù)據(jù)庫表,然后再進行全量同步。簡單驗證對于Mysql數(shù)據(jù)庫間100萬數(shù)據(jù),180M左右數(shù)據(jù)量的全量同步整體同步時間在14秒左右即全部完成。
雖然這個采集工具現(xiàn)在沒有大范圍使用,但是卻對整體大數(shù)據(jù)采集和集成實施,功能擴展方面積累了相應的技術經驗。
流處理模式在前面談Flume日志采集,當時對于日志采集和分析還有比較主流的ELK方案,其中對于日志采集部分重點通過Logstash來實現(xiàn)。

Logstash是一款開源的數(shù)據(jù)收集引擎,具備實時管道處理能力。簡單來說,logstash作為數(shù)據(jù)源與數(shù)據(jù)存儲分析工具之間的橋梁,結合ElasticSearch以及Kibana,能夠極大方便數(shù)據(jù)的處理與分析。
從上圖可以看到Logstash核心就是一個數(shù)據(jù)源和數(shù)據(jù)存儲間的連接通道,而在ELK方案里面ElasticSearch就是支持全文檢索的分布式存儲。如果采集的數(shù)據(jù)量和并發(fā)量很大,還可以在ElasticSearch前增加Kafka消息中間件來實現(xiàn)前端輸入的削峰處理。
實際上可以看到ELK方案本身和大數(shù)據(jù)平臺的采集和集成關系并不密切,可以看做是針對日志采集分析的一個補充。
如果上面的方式更多的是流式采集和存儲的話,還有一個就是流式計算。簡單來說就是采集過來的數(shù)據(jù)不是簡單的導入到目標存儲中,而是對采集到的數(shù)據(jù)進行實時的處理和加工,將處理完成后的中間結果存儲到目標庫中。
比如當前談得比較多的SparkStream流式計算框架。
舉個簡單例子,當前我們的ESB總線每天運行3000萬次,產生3000萬條的實例日志記錄,但是我們并不希望將所有數(shù)據(jù)寫入到目標庫,而是希望按分鐘為單位寫入一個統(tǒng)計數(shù)據(jù)到目標庫。傳統(tǒng)方式我們是定時進行處理,而采用流式計算框架后可以做到實時或準實時處理。
前面談采集,可以看到在源和目標之間增加了一個采集集成工具。
即:?源端 -- 采集集成工具 -- 目標端
而流式計算框架后整個過程增加了計算環(huán)節(jié)如下:
即:?源端 -- 采集集成工具 -- 計算 - 目標端

Spark Streaming 是一套優(yōu)秀的實時計算框架。根據(jù)其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。
所以當你的場景不是簡單的將原生數(shù)據(jù)無變化的采集到大數(shù)據(jù)平臺的貼源層,而是需要進行加工處理僅僅寫入中間態(tài)數(shù)據(jù)的話,就需要在傳統(tǒng)方案的基礎上增加類似SparkStream處理環(huán)境,或者進行二次采集集成處理。
在線咨詢
點擊進入在線咨詢