日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數據治理平臺

睿治作為國內功能最全的數據治理產品之一,入選IDC企業數據治理實施部署指南。同時,在IDC發布的《中國數據治理市場份額》報告中,連續四年蟬聯數據治理解決方案市場份額第一。

數據血緣構建及應用

時間:2022-10-09來源:別惹我我是男神瀏覽數:1292

一、前言

數據血緣是元數據管理、數據治理、數據質量的重要一環,追蹤數據的來源、處理、出處,對數據價值評估提供依據,描述源數據流程、表、報表、即席查詢之間的流向關系,表與表的依賴關系、表與離線ETL任務,調度平臺,計算引擎之間的依賴關系。數據倉庫是構建在Hive之上,而Hive的原始數據往往來自于生產DB,也會把計算結果導出到外部存儲,異構數據源的表之間是有血緣關系的。

數據血緣用途:

追蹤數據溯源:當數據發生異常,幫助追蹤到異常發生的原因;影響面分析,追蹤數據的來源,追蹤數據處理過程。

評估數據價值:從數據受眾、更新量級、更新頻次等幾個方面給數據價值的評估提供依據。

生命周期:直觀地得到數據整個生命周期,為數據治理提供依據。

安全管控:對源頭打上敏感等級標簽后,傳遞敏感等級標簽到下游。

本文介紹攜程數據血緣如何構建及應用場景。第一版T+1構建Hive引擎的表級別的血緣關系,第二版近實時構建Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的字段級別血緣關系。?

二、構建血緣的方案

方案一:只收集SQL,事后分析。

當SQL執行結束,收集SQL到DB或者Kafka。

優點:當計算引擎和工具不多的時候,語法相對兼容的時候,用Hive自帶的LineageLogger重新解析SQL可以獲得表和字段級別的關系。

缺點:重放SQL的時候可能元數據發生改變,比如臨時表可能被Drop,沒有臨時自定義函數UDF,或者SQL解析失敗。

方案二:運行時分析SQL并收集。

當SQL執行結束后立即分析Lineage,異步發送到Kafka。

優點:運行時的狀態和信息是最準確的,不會有SQL解析語法錯誤。

缺點:需要針對各個引擎和工具開發解析模塊,解析速度需要足夠快。

Apache Atlas

Apache Atlas是Hadoop社區為解決Hadoop生態系統的元數據治理問題而產生的開源項目,它為Hadoop集群提供了包括數據分類、集中策略引擎、數據血緣、安全和生命周期管理在內的元數據治理核心能力。官方插件支持HBase、Hive、Sqoop、Storm、Storm、Kafka、Falcon組件。

Hook在運行時采集血緣數據,發送到Kafka。Atlas消費Kafka數據,將關系寫到圖數據庫JanusGraph,并提供REST API。

其中Hive Hook支持表和列級別血緣,Spark需要使用GitHub的hortonworks-spark/spark-atlas-connector,不支持列級別,Presto則不支持。

Linkedin DataHub

WhereHows項目已于2018年重新被LinkedIn公司設計為DataHub項目。它從不同的源系統中采集元數據,并進行標準化和建模,從而作為元數據倉庫完成血緣分析。

社區提供了一個Demo,演示地址:https://demo.datahubproject.io/

與Airflow集成較好,支持數據集級別血緣,字段級別在2021Q3的Roadmap。

三、攜程方案

攜程采用了方案二,運行時分析SQL并收集分析結果到Kafka。由于開源方案在現階段不滿足需求,則自行開發。

由于當時缺少血緣關系,對數據治理難度較大,表級別的血緣解析難度較低,表的數量遠小于字段的數量,早期先快速實現了表級別版本。

在16-17年實現和上線了第一個版本,收集常用的工具和引擎的表級別的血緣關系,T+1構建關系。

在19年迭代了第二個版本,支持解析Hive,Spark,Presto多個查詢引擎和DataX傳輸工具的字段級別血緣關系,近實時構建關系。

四、第一個版本-表級別血緣關系

針對Hive引擎開發了一個Hook,實現ExecuteWithHookContext接口,從HookContext可以獲得執行計劃,輸入表,輸出表等豐富信息,異步發送到Kafka,部署的時候在hive.exec.post.hooks添加插件即可。

在17年引入Spark2后,大部分Hive作業遷移到Spark引擎上,這時候針對Spark SQL CLI快速開發一個類似Hive Hook機制,收集表級別的血緣關系。

傳輸工具DataX作為一個異構數據源同步的工具,單獨對其開發了收集插件。

在經過解析處理后,將數據寫到圖數據庫Neo4j,提供元數據系統展示和REST API服務,落地成Hive關系表,供用戶查詢和治理使用。

在元數據系統上,可以查看一張表多層級的上下游血緣關系,在關系邊上會有任務ID等一些屬性。

隨著計算引擎的增加,業務的增長,表級別的血緣關系已經不滿足需求。

覆蓋面不足,缺少Spark ThriftServer , Presto引擎,缺少即席查詢平臺,報表平臺等。

關系不夠實時,期望寫入表后可以快速查詢到關系,用戶可以直觀查看輸入和輸出,數據質量系統,調度系統可以根據任務ID查詢到輸出表,對表執行質量校驗任務。

圖數據庫Neo4j社區版為單機版本,存儲數量有限,穩定性欠佳,當時使用的版本較低,對邊不能使用索引(3.5支持),這使得想從關系搜索到關聯的上下游較為麻煩。

五、第二版本-字段級別血緣關系

之前實現的第一個版本,對于細粒度的治理和追蹤還不夠,不僅缺少對字段級別的血緣關系,也不支持采集各個系統的埋點信息和自定義擴展屬性,難以追蹤完整鏈路來源,并且關系是T+1,不夠實時。

針對各個計算引擎和傳輸工具DataX開發不同的解析插件,將解析好的血緣數據發送到Kafka,實時消費Kafka,把關系數據寫到分布式圖數據JanusGraph。

阿里開源的Druid是一個 JDBC 組件庫,包含數據庫連接池、SQL Parser 等組件。通過重寫MySqlASTVisitor、SQLServerASTVisitor來解析MySQL / SQLServer的查詢SQL,獲得列級別的關系。

計算引擎統一格式,收集輸入表、輸出表,輸入字段、輸出字段,流轉的表達式等一些信息。

Hive

參考 org.apache.hadoop.hive.ql.hooks.LineageLogger 實現,異步發送血緣數據到 Kafka。

Atlas的HiveHook也是實現ExecuteWithHookContext接口,從HookContext獲得LineageInfo,也可以參考HIVE-19288 引入的org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook,采集更多引擎相關的信息。

其中遇到幾個問題:

通過HiveServer2執行獲取的start time不正確

HIVE-10957 QueryPlan's start time is incorrect in certain cases

獲取執行計劃空指針,導致收集失敗

HIVE-12709 further improve user level explain

獲取執行計劃有可能出現卡住,可以加個調用超時。

Spark 前置條件:引入 SPARK-19558 Add config key to register QueryExecutionListeners automatically,實現自動注冊QueryExecutionListener。

實現方式:通過實現QueryExecutionListener接口,在onSuccess回調函數拿到當前執行的QueryExecution,通過LogicalPlan的output方法,獲得所有Attribute,利用NamedExpression的exprId映射關系,對其進行遍歷和解析,構建列級別關系。

覆蓋范圍:Spark SQL CLI、Thrift Server、使用Dataset/DataFrame API(如spark-submit、spark-shell、pyspark)

遇到問題:

使用analyzedPlan而不是optimizedPlan,optimizer的執行計劃可能會丟失一些信息,可以在analyzedPlan的基礎上apply一些有助于分析的Rule,如CombineUnions。

傳遞的初始化用的hiveconf/hivevar變量被Thrift Server忽略,導致初始化Connection沒有辦法埋點。

打上Patch SPARK-13983 ,可以實現第一步,傳遞變量,但是這個變量在每次執行新的statement都重新初始化,導致用戶set的變量不可更新。后續給社區提交PR SPARK-26598,修復變量不可更新的問題。

SPARK-13983 Fix HiveThriftServer2 can not get "--hiveconf" and "--hivevar" variables since 2.0

SPARK-26598 Fix HiveThriftServer2 cannot be modified hiveconf/hivevar variables

Drop Table 的限制,DropTableCommand執行成功的時候,該表不一定在之前存在過,如果在Drop之前存在過,元數據也已經被刪除了,無從考證。

在DropTableCommand增加了一個標志位,真正在有執行Drop操作的話再置為True,保證收集的血緣數據是對的。

使用Transform用戶自定義腳本的限制


Transform不像java UDF,只輸入需要用到的字段即可,而是需要將所有后續用到的字段都輸入到自定義腳本,腳本再決定輸出哪些字段,這其中列與列之間的映射關系無法通過執行計劃獲得,只能簡單的記錄輸出列的表達式,如transform(c1,c2,c3) script xxx.py to c4。

Presto

開發Presto EventListener Plugin,實現EventListener接口,從queryCompleted回調函數的QueryCompletedEvent解析得到相應的信息。

上線的時候遇到一個無法加載Kafka加載StringSerializer的問題(StringSerializer could not be found)。

Kafka客戶端使用 Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) 來加載Class,優先從當前線程的ContextClassLoader加載,與Presto的ThreadContextClassLoader有沖突,需要初化始KafkaProducer的時候,將ContextClassLoader暫時置為NULL。https://stackoverflow.com/a/50981469/1673775

JanusGraph是一個開源的分布式圖數據庫。具有很好的擴展性,通過多機集群可支持存儲和查詢數百億的頂點和邊的圖數據。JanusGraph是一個事務數據庫,支持大量用戶高并發地執行復雜的實時圖遍歷。

生產上,存儲我們使用Cassandra,索引使用Elasticsearch,使用Gremlin查詢/遍歷語言來讀寫JanusGraph,有上手難度,熟悉Neo4j的Cypher語法可以使用cypher-for-gremlin plugin。

以下是數據血緣寫入圖數據庫的模型,Hive字段單獨為一個Lable,關系型DB字段為一個Label,關系分兩種,LABELWRITE,LABELWRITE_TTL。

只有輸入沒有輸出(Query查詢操作),只有輸出沒有輸入(建表等DDL操作)也會強制綁定一個來源系統的ID及擴展屬性。

在生產上使用JanusGraph,存儲億級的血緣關系,但是在開發過程中也遇到了一些性能問題。

寫入速度優化

以DB名+表名+字段名作為唯一key,實現getOrCreateVertex,并對vertex id緩存,加速頂點的加載速度。

關系批量刪除

關系LABELWRITETTL表示寫入的關系有存活時間(TTL-Time to live),這是因為在批量刪除關系的時候,JanusGraph速度相當慢,而且很容易OOM。比如要一次性刪除,Label為WRITE,x=y,寫入時間小于等于某個時間的邊,這時候Vertex和Edge load到內存中,容易OOM。

g.E().hasLabel("WRITE").has("x",eq("y")).has("publishedDate",P.lte(new Date(1610640000))).drop().iterate()

嘗試使用多線程+分批次的方式,即N個線程,每個線程刪除1000條,速度也不太可接受。

這時候采用了折中的方案,需要刪除關系用另外一種Label來表示,并在創建Label指定了TTL,由于Cassandra支持cell level TTL,所以邊的數據會自動被刪除。但是ES不支持TTL,實現一個定時刪除ES過期數據即可。

Zeus調度平臺 (ETL操作INSERT、CTAS,QUERY)

Ad-Hoc即席查詢平臺 (CTAS,QUERY)

報表平臺 (QUERY)

元數據平臺 (DDL操作)

GPU平臺 (PySpark)

通過ETL任務ID,查詢任務ID,報表ID,都可以獲取到輸入,輸出的表和字段的關系。

使用MapReduce、Spark RDD讀寫HDFS的血緣暫時沒有實現。

思路可以在JobClient.submitJob的時候采集輸入和輸出路徑,又或者通過HDFS的AuditLog、CallerContext來關聯。

在第一版使用圖的方式展示血緣關系,在上下游關系較多的時候,顯示較為混亂,第二版改成樹狀表格的方式展示。

字段operator在調度系統Zeus被轉換成hive_account,最后輸出是ArtNova報表系統的一張報表。

六、實際應用場景

通過血緣關系篩選,每天清理數千張未使用的臨時表,節約空間。

作為數據資產評估的依據,統計表、字段讀寫次數,生成的表無下游訪問,包括有沒有調度任務,報表任務,即席查詢。

統計一張表的生成時間,而不是統計整個任務的完成時間。

數據異常,或者下線一張表、一個字段的時候,可以找到相關的ETL任務或者報表任務,及時通知下游。

統計表的使用熱度,顯示趨勢。

得益于在圖數據庫JanusGraph可以使用關系邊的key作為索引,可以根據任務ID可以輕松獲得該任務輸入和輸出表。

當配置一個任務A的依賴任務列表的時候,可以使用推薦依賴,檢查依賴功能,獲得任務A的所有輸入表,再通過輸入的表獲得寫入任務ID列表,即為任務A所需依賴的任務列表。

在任務結束后,獲取該任務所有輸出的表,進行預配的規則進行數據質量校驗。

當源頭的數據來自生產DB時,生產DB有些列的標簽已打上了敏感等級,通過血緣關系,下游的表可以繼承敏感等級,自動打上敏感標簽。

七、總結

以上描述了攜程如何構建表和字段級別的血緣關系,及在實際應用的場景。

隨著業務需求和數據的增長,數據的加工流程越來越復雜,構建一套數據血緣,可以輕松查詢到數據之間的關系,進行表和字段級的血緣追溯,在元數據管理,數據治理,數據質量上承擔重要一環。

(部分內容來源網絡,如有侵權請聯系刪除)
立即申請數據分析/數據治理產品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢