- 產品
- 產品解決方案
- 行業解決方案
- 案例
- 數據資產入表
- 賦能中心
- 伙伴
- 關于
時間:2022-05-05來源:邂逅瀏覽數:1259次
要實現數據中臺,一個最基本的要求就是同步交易系統接口數據。實現接口數據同步的方式主要有3種:全量同步、增量同步、流式數據同步,其中流式數據又分為業務流數據和日志流數據。
要實現數據中臺,一個最基本的要求就是同步交易系統接口數據。實現接口數據同步的方式主要有3種:全量同步、增量同步、流式數據同步,其中流式數據又分為業務流數據和日志流數據。
接口數據同步是數據中臺的一項重要工作,是在搭建數據中臺的過程中需要投入很多精力完成的。雖然單個表的數據同步任務難度不大,但是我們需要在數據中臺實現標準化配置,這樣才可以提高工作效率,為后續的數據中臺運維和持續擴充接口打下良好的基礎。
全量接口同步一般而言,全量接口同步是數據中臺必不可少的功能模塊。不管是增量數據同步還是流式數據同步,都是在全量接口同步的基礎上進行的。
全量接口同步一般針對T+1的業務進行,選擇晚上業務低峰和網絡空閑時期,全量抽取交易系統的某些業務數據。一般來說,雖然全量接口同步占用時間長,耗費網絡寬帶高,但是數據抽取過程簡單、準確度高,數據可靠性好,因此比較容易進行平臺標準化配置。
根據目前的開源生態,我們主要推薦了兩種數據同步工具,一個是Kettle,一個是DolphinScheduler集成的DataX。
1.Kettle
對于Kettle,我們一般按照系統+業務模塊來劃分Kettle數據抽取任務。
第一步,把對應數據庫的JDBC驅動都加入到data-integration\lib目錄下,然后重新打開Spoon.bat。
第二步,在新創建的轉換里面創建DB連接。

在彈出的頁面選擇對應的數據庫,填寫相關信息并保存。

針對DB連接設置“共享”,可以在多個Kettle中共享相同的數據庫鏈接信息。
第三步,在Kettle開發視圖中拖入一個表輸入組件和一個表輸出組件。
在表輸入組件和表輸出組件中分別選擇不同的數據庫連接,表輸入支持選擇一張表自動生成SQL語句,也支持手寫SQL語句。

表輸出組件則支持自動獲取表結構和自動生成目標表。通過點擊獲取字段,即可直接獲取表輸入查詢到的字段信息。

圖14-4 Kettle表輸出界面
點擊SQL,即可在彈出的窗口中看到工具自動生產的建表語句,再點擊“執行”,Kettle會自動完成目標表的創建。當然,這個建表語句是比較粗糙的,我們一般需要按照指定的規范來手工創建,需要指定分布鍵。

第四步,將輸入組件和輸出組件用線連起來,就組成了一個數據同步任務。

第五步,將上述組件一起復制多份,修改來源表、目標表、刷新字段,即可完成大量的數據同步任務。

第六步,直接點“開始”圖標運行數據同步任務或者通過Kettle的左右來調度數據同步任務。
2.DataX
由于DataX數據同步工具本身是沒有界面化配置的,因此我們一般會配套安裝DataX-web或者DolphinScheduler調度工具。DolphinScheduler集成DataX的配置也很簡單,只需要在DolphinScheduler的配置文件中指定DATAX_HOME即可。

在DolphinScheduler后臺配置datax任務,這里以MySQL數據源為例,數據流配置如下。
首先在數據源中心配置MySQL數據源。

然后在項目管理里面創建數據流任務,在畫布上拉去DataX類型配置第一個任務,選擇剛才配置的MySQL數據源。

保存以后,系統就會自動生成數據同步的工作量,將數據流上線,并配置定時調度策略,即可完成數據的定時同步。

一般來說,數據倉庫的接口都符合二八規律,即20%的表存儲了80%的數據,因此這20%的表數據抽取特別耗費時間。此時,對于批處理來說,最好的方法是,對于80%數據量較小的表,采用流水線作業的方式,快速生成接口表、接口程序、接口任務,通過全量接口快速抽取、先清空后插入目標表;針對20%數據量較大的表,則需要精耕細作,確定一個具體可行的增量方案。
我認為一般滿足以下條件之一就是較大的表:①抽取時間超過10分鐘;②單表記錄數超過或者接近100萬;③接口數據超過1GB。之所以如此定義,是從數據接口的實際情況出發。第一,抽取時間超過10分鐘,會影響整體調度任務的執行時間;第二,單表記錄數超過100萬,則插入數據占用數據庫大量的資源,會影響其他任務的插入,降低系統的并發能力;第三,數據傳輸超過1GB,則需要耗費大量的網絡寬帶,每天重復一次會增加網絡負擔。
對于需要做增量的接口表,主要推薦以下兩種批處理方案。
方案一:根據數據創建或者修改時間來實現增量
很多業務系統一般都會在表結構上增加創建和修改時間字段,并且存在主鍵或者唯一鍵(可以是一個字段,也可以是多個字段組合),同時確保數據不會被物理刪除,這種表適合方案一。實際情況是,各大OLTP系統的數據庫都可以滿足記錄創建和修改時間信息的,因此這種方式應用最廣泛。
對于創建或者修改時間,MySQL數據庫可以在建表時指定字段默認值的方式來生成。
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間'也可以在建表以后通過增加字段的方式來補充。
-- 修改create_time 設置默認時間 CURRENT_TIMESTAMP ALTER TABLE `tb_course`MODIFY COLUMN `create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間' ;-- 添加update_time 設置 默認時間 CURRENT_TIMESTAMP 設置更新時間為 ON UPDATE CURRENT_TIMESTAMP ALTER TABLE `tb_course`ADD COLUMN `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間' ;Oracle數據庫默認情況下只能記錄創建時間,不能記錄修改時間。 --先添加一個date類型的字段alter tabletb_courseaddcreate_timedate; --將該字段默認為系統時間alter tabletb_coursemodifycreate_timedefault sysdate;如果需要記錄修改時間,則需要通過觸發器或者修改更新語句來實現。觸發器的腳本如下。 CREATE OR REPLACE TRIGGERtrig_tb_course afterINSERT OR UPDATE ON tb_course--新增和修改執行后出發,對象目標:tb_course表,執行后觸發器對業務交易影響比較小FOR EACH ROW --行級觸發器,每影響一行觸發一次BEGIN IF INSERTING THEN --插入數據操作 :NEW.createtime := SYSDATE; ELSIF UPDATING then --修改數據操作 :NEW.createtime := SYSDATE; END IF; END;有了創建或者修改時間以后,每次抽取最近幾天(一般建議3天)的數據,則直接在where條件后面加上下面的過濾條件。
--取最近3天插入或者更新的記錄where create_time >= cast(date_add(CURRENT_DATE,interval -3 day) as datetime)or update_time >= cast(date_add(CURRENT_DATE,interval -3 day) as datetime)DataX或者Kettle在抽取數據時直接在SQL語句上加上上述條件即可,數據寫入臨時表,筆者一般以_incr作為臨時表后綴。
抽取到變化的數據以后,將前后數據進行合并即可完成增量數據更新。一般情況下我們可能會采用MERGE INTO的方式進行數據合并,這里推薦先刪除后插入的方式。首先,MERGE只有少數數據庫支持,雖然Greenplum也支持,但是功能不夠完善,語法比較復雜。其次對于大多數數據庫而言,刪除比更新更快,因此推薦先刪除后插入的方式。如果變化的數據不大,可以直接采用刪除再插入的方式;如果變化的數據太大,刪除的效率太低,則需要借助第三張表來完成數據的合并。先刪除后插入的語句示例如下,假設DRP系統的item_info表是一張商品主數據,數據量大,但是變化頻率不高,則我們可以通過下面的語句來合并增量數據。
--先刪除有過變化的數據delete from ods_drp.ods_drp_item_info twhere exists(select 1 from ods.ods_drp_item_info_incr bwhere t.item_id = b.item_id);--然后插入新抽取過來的數據insert into ods_drp.ods_drp_item_infoselect t.*,current_timestamp() insert_timefrom ods_drp.ods_drp_item_info_incr t;方案二:增加觸發器記錄創建或者修改時間來實現增量
對于業務系統,我們經常遇到有些表要么沒有創建、修改時間,要么存在記錄物理刪除的情況,因此無法通過方案一實現增量。結合HANA數據庫的特點,我們最后采用了創建觸發器來記錄業務數據創建、修改時間的方案。
這種方案下,我們需要針對每一張增量接口表,創建一張日志表,包括接口表的主鍵字段、操作標志、操作時間。每次抽取數據需要用日志表關聯業務數據,然后抽取一段時間內新增、修改、刪除的記錄到數據中臺數據庫,最后根據操作標志+操作時間對目標表數據進行更新。
本方案雖然看上去對交易系統的侵入性較高,很難被接受,但其實是一個非常好用的增量方案,適合任何場景。首先,觸發器是Oracle、DB2、HANA等數據庫系統標配的功能,在表上增加after觸發器對業務交易影響微乎其微。其次,抽取數據的時間一般都在業務空閑時間,業務表和日志表的關聯不會影響正常交易。第三,本方案可以捕捉數據的物理刪除操作,可以保證數據同步100%的準確性。
下面,我們以 S/4 HANA的EKPO表為例進行方案解析。首先創建EKPO變更日志表。
--創建EKPO變更日志表,需要包含主鍵字段和變更標志、變更時間字段CREATE TABLE HANABI.DI_EKPO_TRIG_LOG ( EBELN CHAR(10) , EBELP CHAR(10), FLAG CHAR(5) , INSERT_TIME SECONDDATE?);然后給EKPO表添加觸發器。
--INSERT觸發器CREATE TRIGGER DI_TRIGGER_EKPO_I AFTER INSERT ON HANADB.EKPOREFERENCING NEW ROW MYNEWROWFOR EACH ROWBEGININSERT INTO HANABI.DI_EKPO_TRIG_LOG VALUES(:MYNEWROW.EBELN, :MYNEWROW.EBELP , 'I' , CURRENT_TIMESTAMP );END;--UPDATE觸發器CREATE TRIGGER DI_TRIGGER_EKPO_U AFTER UPDATE ON HANADB.EKPOREFERENCING NEW ROW MYNEWROWFOR EACH ROWBEGININSERT INTOHANABI.DI_EKPO_TRIG_LOG VALUES (:MYNEWROW.EBELN, :MYNEWROW.EBELP , 'U' ,CURRENT_TIMESTAMP ) ;END;--DELETE觸發器CREATE TRIGGER DI_TRIGGER_EKPO_D AFTER DELETE ON HANADB.EKPOREFERENCING OLD ROW MYOLDROWFOR EACH ROWBEGININSERT INTOHANABI.DI_EKPO_TRIG_LOG VALUES (:MYOLDROW.EBELN, :MYOLDROW.EBELP , 'D' ,CURRENT_TIMESTAMP );END ;有了變更日志表以后,用變更日志表關聯源表,就可以得到源表新發生的所有增、刪、改記錄時間。
#查詢一段時間內EKPO表新增、修改、刪除的記錄信息select tr.flag op_flag,tr.insert_time op_time,tb.mandt,tr.ebeln,tr.ebelp,uniqueid,loekz,statu,aedat,matnr,--此處省略其余字段 from HANABI.DI_EKPO_TRIG_LOG tr left join HANADB.ekpo tb on tr.ebeln = tb.ebeln and tr.ebelp = tb.ebelpwhere?tr.insert_time?BETWEEN?to_TIMESTAMP('${start_time}','YYYY-MM-DD-HH24:MI:SS')?AND to_TIMESTAMP('${end_time}','YYYY-MM-DD HH24:MI:SS')記錄上次抽取時間的方案可以更加靈活地控制抽取數據的區間。為了抽取的數據不會遺漏,我們一般根據數據量預留10分鐘的重疊區間。
首先,我們需要創建增量數據抽取的控制參數表ctl_ods_sync_incr。
| 字段名 | 字段類型 | 字段長度 | 小數位 | 是否主鍵 | 字段描述 |
| schema_name | varchar | 40 | 否 | 模式名 | |
| table_name | varchar | 40 | 是 | 表名 | |
| last_sysn_time | timestamp | 6 | 否 | 上次同步時間 |
然后,我們在抽取腳本中讀取和更新抽取日志表。
#!bin/bash#GP的用戶名export gpuser="xxxx"#GP的密碼export gppass="xxxx"#目標數據庫模式名export gp_schema="ods_s4"# 目標數據庫表名export gp_table="ods_s4_ekpo_i"# 數據源地址export datasource="s4"#為了避免丟失數據,從上次抽取時間的十分鐘前開始抽取數據result=`psql -h gp-master -p 5432 -U ${gpuser} -d ${gppass} << EOF select to_char(last_sync_time + '-10 sec', 'yyyy-mm-dd-HH24:MI:SS') from cfg.ctl_ods_sync_incr where table_name ='ods_s4_ekpo'; EOF`start_time=`echo $result | awk -F' ' '{print $3}'`end_time=$(date "+%Y-%m-%d %H:%M:%S")#輸出抽取時間日期echo "now sqoop data from ${start_time} to ${end_time}"export querySql="select tr.flag op_flag,tr.insert_time op_time,tb.mandt,tr.ebeln,tr.ebelp,uniqueid ,loekz,statu,aedat,txz01,matnr,#此處省略其余字段 from HANABI.DI_EKPO_TRIG_LOG tr inner join HANADB.ekpo tb on tr.ebeln = tb.ebeln and tr.ebelp = tb.ebelpwhere tr.insert_time BETWEEN to_TIMESTAMP('${start_time}','YYYY-MM-DD-HH24:MI:SS') AND to_TIMESTAMP('${end_time}','YYYY-MM-DD HH24:MI:SS')"cat>dataxjob.json<{ "job": { "name": "in-$db-$table", "content": [{ "reader": { "name": "hanareader", "parameter": { "dsDatasource" : "$datasource", "jsonLine" : true, "connection": [ {"querySql": ["$querySql"]}] } }, "writer": { "name": "gpdbwriter", "parameter": { "username": $gpuser, "password": $gppass, "preSql": [ "truncate table $gp_schema.$gp_table" ], "column": [{"name": "body","type": "string"}] "segment_reject_limit": 0, "copy_queue_size": 2000, "num_copy_processor": 1, "num_copy_writer": 1, "connection": [ { "jdbcUrl": "jdbc:postgresql://gp-master:5432/dp", "table": [ "$gp_schema.$gp_table" ] } ] } } } ], "setting": { "speed": { "channel": 1 } } }}EOFpython $DATAX_HOME/bin/datax.py --jobid ${system.taskId} dataxjob.json if [ $? -ne 0 ]; then echo "DataX failed,next time try again!"else echo "DataX succeed ,now update cfg.ctl_ods_sync_incr表" psql -h gp-master -p 5432 -U ${gpuser} -d ${gppass} << EOF update cfg.ctl_ods_sync_incr set last_sync_time = to_timestamp('${end_time}', 'YYYY-MM-DD HH24:MI:SS') where table_name ='ods_s4_ekpo'EOFfi在保證數據抽取過程中不遺漏數據的前提下,我們需要對新抽取到的數據和歷史數據進行合并。由于數據可能存在刪除和多次修改的情況,我們的數據更新操作會比方案一更加復雜,需要在插入或者刪除數據的過程中做一些開窗函數排序取最新的記錄,操作語句如下。
--所有存在插入、刪除、更新的數據全部從目標表刪掉delete from ods_d4.ods_d4_ekpo t where exists (select 1 from ods_d4.ods_d4_ekpo_incr b where b.op_flag in ('I', 'D', 'U') and t.ebeln = b.ebeln and t.ebelp = b.ebelp);--插入最后一次操作不是刪除的數據insert into ods_d4.ods_d4_ekposelect mandt,t.ebeln,t.ebelp,uniqueid,loekz,statu,aedat,txz01,matnr,--此處省略其余字段 from (select row_number() over(partition by b.ebeln,b.ebelp order by b.op_time desc,b.op_flag desc) rank_num, b.* from ods_d4.ods_d4_ekpo_incr b where b.op_flag in ('I', 'U' ,'D')) t where t.rank_num = 1 and t.op_flag <>'D';為什么接口會有這么復雜的邏輯,這是我們多次實踐總結出來的經驗。這個增量接口方式我們核對和修復了不少于5次漏洞才最終實現準確快速又穩定的增量接口方案。
流式數據同步通過上節的增量數據接口可以看出,不管是方案一還是方案二,都需要業務系統數據庫做出一定的調整(當然,如果某些業務系統數據庫已經在設計的時候考慮到創建和更新時間,則不需要修改),一次性抽取大量的數據會對交易數據庫產生壓力。基于上述原因,對于類MySQL數據庫,我們推薦CDC日志的同步方式。其他數據庫如果可以滿足CDC日志的要求,也可以采用這種方式。
基于CDC日志同步的方案,也稱作流式數據同步方案,一個典型的數據采集流程如下圖。

第一步,需要對業務數據庫進行分析,分析數據庫是否支持CDC日志。一般來說,業務數據通常保存在關系型數據庫中,從數據庫的發展來看,MySQL對CDC日志的支持是最好的。
第二步,需要有DBA權限的管理員開啟數據庫的CDC日志功能。其中MySQL數據庫的CDC功能開啟過程如下。
|
使用命令行工具連接到MySQL數據庫所在服務器,執行以下命令以root用戶登錄數據庫。 mysql -uroot -ppassword 其中,password為數據庫root用戶的密碼,可向數據庫管理員獲取。 執行以下命令,查詢MySQL數據庫是否開啟了Binlog。 show variables like 'log_bin'; 若變量log_bin的值為“OFF”,則說明Binlog未開啟,繼續執行下一步。 若變量log_bin的值為“ON”,則說明Binlog已開啟,繼續執行以下SQL命令,檢查相關參數的配置是否符合要求。 show variables like '%binlog_format%'; show variables like '%binlog_row_image%'; 變量binlog_format的值應該為ROW,變量binlog_row_image的值應該為FULL。如果滿足要求,直接跳到2,否則繼續執行下一步。 執行以下命令編輯MySQL配置文件,然后按“i”進入輸入模式。在配置文件中增加如下配置,開啟Binlog。 vi /etc/my.cnf server-id = 123log_bin = mysql-binbinlog_format = rowbinlog_row_image = fullexpire_logs_days = 10gtid_mode = onenforce_gtid_consistency = on其中,server-id的值應為大于1的整數,請根據實際規劃設置,并且在創建數據集成任務時設置的Server Id值需要此處設置的值不同。 expire_logs_days為Binlog日志文件保留時間,超過保留時間的Binlog日志會被自動刪除,應保留至少2天的日志文件。 “gtid_mode = on”和“enforce_gtid_consistency = on”僅當MySQL的版本大于、等于5.6.5時才需要添加,否則刪除這兩行內容。 按“ESC”鍵退出輸入模式,然后輸入“:wq”并回車,保存后退出。 執行以下命令重啟MySQL數據庫。 service mysqld restart 以root用戶登錄數據庫,執行以下命令,查詢變量log_bin的值是否為ON,即是否已開啟Binlog。 show variables like 'log_bin'; 在數據庫中執行以下命令創建ROMA Connect連接數據庫的用戶并配置權限。 CREATE?USER?'roma'@'%'?IDENTIFIED?BY?'password';GRANT?SELECT,?RELOAD,?SHOW?DATABASES,?REPLICATION?SLAVE,?REPLICATION?CLIENT?ON?*.*?TO?'roma'@'%';其中,roma為ROMA Connect連接用戶名,請根據實際規劃設置。password為ROMA Connect連接用戶密碼,請根據實際規劃設置。 如果MySQL數據庫版本為8.0,則需要執行以下命令,修改數據庫連接用戶的密碼認證方式(可選)。 ALTER USER roma IDENTIFIED WITH mysql_native_password BY 'password'; 其中,roma為2中創建的數據庫連接用戶名。password為數據庫連接用戶的密碼。 執行以下命令退出數據庫連接。 exit; |
第三步,通過Canal、MaxWell、Debezium等開源組件讀取數據庫日志,將數據庫轉換成JSON對象寫入Kafka隊列。
第四步,通過Greenplum的GPSS讀取Kafka消息或者通過Flink程序(Spark也可以)讀取Kafka數據更新寫入Greenplum數據庫。
隨著各個開源組件功能的完善,其中有些步驟已經可以省略了。例如Flink社區開發了flink-cdc-connectors組件,這是一個可以直接從MySQL、PostgreSQL等數據庫直接讀取全量數據和增量變更數據的source組件。
日志流數據同步隨著大數據技術的發展,日志數據的分析也成為數據中臺必不可少的一個功能。一般來說,日志數據會以日志流的方式進入系統。
我們這里所說的日志數據,一般是指Web應用或者手機程序的埋點日志,一般包括用戶點擊、瀏覽、鼠標停留、收藏、加購、分享等操作信息。這些操作雖然未產生真正的業務價值,但是可以用來進行用戶行為分析和業務促銷活動。
典型的用戶行為數據采集流程:

第一步,需要前端工程師或者App開發者在Web或者APP應用頁面添加埋點信息,搜集用戶的操作,并通過異步Post的方式發生給Ngnix服務器。
第二步,Ngnix服務器接收到用戶的數據后,以文本格式記錄請求參數等信息至access.log。
第三步,Flume實時監控Nginx日志變化,收集并過濾有用日志,之后發送至Kafka。
第四步,埋點數據到達Kafka后,通過Spark程序或者Flink程序完成日志解析,解析完成的數據格式化保存到Greenplum數據庫的ODS層。
至此,日志數據采集和解析的工作完成了。
正如文章的章節分布一樣,在數據中臺實現過程中,最難做也是最費時間的就是增量數據接口。想要實現精準的增量數據快照,不僅有很多前置要求,也有很多數據合并的特殊處理。本文分析的內容適合絕大多數需要做增量接口的場景。而全量接口、實時接口、日志接口大多數情況下主要搭好平臺做好配置即可。
下一篇:搭建指標體系的底層邏輯...