日日碰狠狠躁久久躁96avv-97久久超碰国产精品最新-婷婷丁香五月天在线播放,狠狠色噜噜色狠狠狠综合久久 ,爱做久久久久久,高h喷水荡肉爽文np肉色学校

睿治

智能數據治理平臺

睿治作為國內功能最全的數據治理產品之一,入選IDC企業數據治理實施部署指南。同時,在IDC發布的《中國數據治理市場份額》報告中,連續四年蟬聯數據治理解決方案市場份額第一。

高效打通實時數據到特征工程

時間:2022-04-20來源:一壺溫酒瀏覽數:210

導讀:人工智能工程化落地的關鍵點之一,在于解決真實業務場景的實時批量預估和實時模型更新問題。更好更快地將線上實時數據轉化為AI可用的特征,將加速AI應用落地的效率及效果。為此,OpenMLDB 和 Apache Pulsar 合作推出OpenMLDB Pulsar Connector,實現穩定的流式集成,為高效打通實時數據到特征工程提供一條值得期待的清晰路徑。我是黃威,目前是第四范式研發架構師,也是OpenMLDB的核心研發。今天主要為大家介紹三個方面的內容:

Pulsar Connector簡介

OpenMLDB Connector on Pulsar介紹

OpenMLDB Connector on Pulsar演示

01Pulsar Connector簡介

Apache Pulsar 是一個云原生的,分布式消息流平臺。它可以作為 OpenMLDB 的在線數據源,將實時的數據流導入到 OpenMLDB 在線。Pulsar 提供了Connector 框架,在此基礎上可以與不同系統的對接。我們基于Connector框架,開發了 OpenMLDB JDBC Connector,通過它我們就可以無障礙地連接 Pulsar與OpenMLDB,Pulsar的消息將自動地寫入OpenMLDB。

02OpenMLDB-Pulsar Connector介紹

1. 定位

OpenMLDB Pulsar Connector,高效打通實時數據到特征工程,大幅提升數據使用效率、助力開發者構建實時數據管道、使企業更專注和更高效的探索數據的商業價值。

在OpenMLDB 的工作流中,Pulsar Connector(位置如下圖所示)幫助開發者輕松地將消息系統Pulsar與開源機器學習數據庫OpenMLDB連接起來,形成一條實時數據流。

2. 功能

Pulsar可以使用connector來連接其他系統。Source connector可以使其他系統的數據流入Pulsar,sink connector可以將消息流出至其他系統。

OpenMLDB Pulsar Connector支持了sink功能,使Pulsar消息可以寫入到OpenMLDB在線存儲中。

可以通過 Connector Admin CLI并結合 sinks 子命令來管理 Pulsar connector(例如,創建、更新、啟動、停止、重啟、重載、刪除以及其他操作)。

3. 優勢

想要使OpenMLDB與Pulsar擁有穩定的流式集成,我們推薦直接使用Pulsar OpenMLDB connector 。它具備諸多優勢,包括但不限于:

易上手。無需編寫任何代碼,只需進行簡單配置,便可通過OpenMLDB Pulsar Connector 將Pulsar的消息流入 OpenMLDB 。簡化的數據導入過程能大幅提升企業的數據使用效率。

易擴展。根據不同的業務需求,可以選擇在單機或集群上運行 OpenMLDB Pulsar Connector ,助力企業構建實時數據管道。

可持續。OpenMLDB Pulsar Connector 簡單的安裝和部署過程,使企業能更專注和更高效地探索數據的商業價值。

4. Connector下載地址

OpenMLDB Pulsar Connector:

https://github.com/4paradigm/OpenMLDB/releases/download/v0.4.4/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar

03Connector演示

1. 流程介紹

Pulsar OpenMLDB connector 用于 OpenMLDB 線上模式的實時數據流接入。使用connector的簡要流程,如下圖所示。我們接下來將詳細介紹每一步。

整體上,使用流程可以概括為三步:

創建connector前需要啟動OpenMLDB集群,并創建表。

創建Pulsar standalone,創建sink,sink配置中使用OpenMLDB集群的JDBC地址。并且,創建用于解析消息的schema。

向Pulsar發送消息,來測試消息是否能自動寫入到OpenMLDB。

2. 關鍵步驟

注意,為了使演示更簡單,本文中將使用Pulsar Standalone,OpenMLDB集群和一個簡單JSON消息生產者程序,來演示OpenMLDB JDBC Connector是如何工作的。該connector是完全可以在Pulsar Cluster中正常使用的。

步驟1 | 在 OpenMLDB 創建數據庫和數據表

啟動 OpenMLDB 集群

使用Docker可以快速啟動OpenMLDB,除此之外,我們還需要創建測試用的表。

提醒:目前只有OpenMLDB集群版可以作為sink的接收端,數據只會sink到集群的在線存儲中。

我們更推薦你使用host network模式運行docker,以及綁定文件目錄files,sql腳本在該目錄中。

docker run -dit --network host -v `pwd`/files:/work/taxi-trip/files --name openmldb 4pdosc/openmldb:0.4.4 bashdocker exec -it openmldb bash

在OpenMLDB容器中,啟動集群:

./init.sh

需要注意的是,在macOS平臺上,即使使用host網絡,也不支持從容器外部去連接容器內的 OpenMLDB 服務器。但從容器內,去連接別的容器內的OpenMLDB服務,是可行的。

創建表

我們使用一個腳本快速創建表,腳本內容如下:

create database pulsar_test;use pulsar_test;create table connector_test(id string, vendor_id int, pickup_datetime bigint, dropoff_datetime bigint, passenger_count int, pickup_longitude double, pickup_latitude double, dropoff_longitude double, dropoff_latitude double, store_and_fwd_flag string, trip_duration int);desc connector_test;

執行腳本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/create.sql

目前,Pulsar中JSONSchema和JDBC base connector都不支java.sql.Timestamp。所以我們使用long作為timestamp列的數據類型(在OpenMLDB可以使用long作為時間戳)。

步驟2 | 啟動Pulsar,創建sink和schem

①?啟動 Pulsar Standalone

使用docker,可以更簡單快速地啟動Pulsar。我們推薦你使用host network來運行docker,這樣可以避免諸多容器相關的網絡連接問題。而且,我們需要使用pulsar-admin來進行sink創建,這個程序在Pulsar鏡像內。所以,我們使用bash運行容器,在容器內部逐一執行命令。此處,也需要綁定files文件目錄。

docker run -dit --network host -v `pwd`/files:/pulsar/files --name pulsar apachepulsar/pulsar:2.9.1 bashdocker exec -it pulsar bash

在Pulsar容器中,啟動standalone服務端。

bin/pulsar-daemon start standalone --zookeeper-port 5181

OpenMLDB服務已經使用了端口2181,所以此處我們為Pulsar重新設置一個zk端口。我們將使用端口2181來連接OpenMLDB,但Pulsar standalone內的zk端口不會對外造成影響。

你可以檢查一下Pulsar是否正常運行,可以使用ps或者檢查日志。

ps axu|grep pulsar

當你啟動一個本地standalone集群,會自動創建pulic/default?namesapce。這個namespace用于開發,我們將在此namespace中創建sink。

如果你想要在本地直接啟動Pulsar,可以參考Set up a standalone Pulsar locally。

鏈接:https://pulsar.apache.org/docs/en/standalone/

Q&A

Q1: 碰到以下問題是什么原因

2022-04-07T03:15:59,289+0000 [main] INFO ?org.apache.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:5181

2022-04-07T03:15:59,289+0000 [main] ERROR org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Exception while instantiating ZooKeeper

java.net.BindException: Address already in use

A: Pulsar需要一個未被使用的端口來啟動zk,端口5181頁已經被使用,需要再更改一下–zookeeper-port的端口號。

Q2: 8080端口已被使用?

A: 8080是webServicePort默認配置端口,在conf/standalone.conf中,可以更換這個端口。但注意,pulsar-admin會使用conf/client.conf中的webServiceUrl進行連接,也需要同步更改。

Q3: 6650端口已被使用?

A: 需要同步更改conf/standalone.conf中的brokerServicePort和conf/client.conf中的brokerServiceUrl配置項。

Connector安裝(Optional)

前面的步驟中我們綁定了files目錄,里面已經提供了connector的nar包。我們可以使用“非內建connector”模式來設置connector(即在sink配置中指定archive配置項,將在下一個步驟中描述)。

但如果你希望將OpenMLDB connector作為內建的connector,你需要創建connectors目錄,并拷貝nar文件到connectors目錄。

mkdir connectorscp files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar connectors

如果在Pulsar運行時,你想改變或增加connector,你可以通知Pulsar更新信息:

bin/pulsar-admin sinks reload

當OpenMLDB connector成為內建connector時,它的sink類型名為jdbc-openmldb,你可以直接使用這個類型名來指定使用OpenMLDB connector。

③?創建sink

我們使用public/default這個namespace來創建sink, 我們需要一個sink的配置文件, 它在files/pulsar-openmldb-jdbc-sink.yaml,內容如下:

tenant: "public" namespace: "default" name: "openmldb-test-sink" archive: "files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar" inputs: ["test_openmldb"] configs: jdbcUrl: "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb" tableName: "connector_test"

其中:

name:sink名。

archive:我們使用archive來指定sink connector, 所以這里我們是將OpenMLDB connector當作非內建connector使用。

input:可以是多個topic的名字,本文只使用一個。

config:用于連接OpenMLDB集群的jdbc配置。

接下來,創建這個sink并檢查。注意,我們設置的輸入topic是‘test_openmldb’,后續步驟需要使用到。

./bin/pulsar-admin sinks create --sink-config-file files/pulsar-openmldb-jdbc-sink.yaml./bin/pulsar-admin sinks status --name openmldb-test-sink

④ 創建 Schema

上傳schema到topic test_openmldb,schema類型是JSON格式。后續步驟中,我們將生產一樣schema的JSON消息。schema文件是files/openmldb-table-schema,內容如下:

{ "type": "JSON", "schema":"{\"type\":\"record\",\"name\":\"OpenMLDBSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor_id\",\"type\":\"int\"},{\"name\":\"pickup_datetime\",\"type\":\"long\"},{\"name\":\"dropoff_datetime\",\"type\":\"long\"},{\"name\":\"passenger_count\",\"type\":\"int\"},{\"name\":\"pickup_longitude\",\"type\":\"double\"},{\"name\":\"pickup_latitude\",\"type\":\"double\"},{\"name\":\"dropoff_longitude\",\"type\":\"double\"},{\"name\":\"dropoff_latitude\",\"type\":\"double\"},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"trip_duration\",\"type\":\"int\"}]}", "properties": {} }

上傳并檢查schema的命令,如下所示:

./bin/pulsar-admin schemas upload test_openmldb -f ./files/openmldb-table-schema./bin/pulsar-admin schemas get test_openmldb

步驟3?| 測試

① 發送消息

我們使用兩條OpenMLDB鏡像中data/taxi_tour_table_train_simple.csv的樣本數據,作為測試用的消息。數據如下圖所示:

測試用Producer關鍵代碼如下:

可以看到,producer將發送兩條消息到topic test_openmldb。這之后,Pulsar將讀到消息,并將其寫入OpenMLDB集群的在線存儲中。

程序包在files中,你可以直接運行它:

java -cp files/pulsar-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.Client

② 檢查

我們可以檢查Pulsar中的sink狀態:

./bin/pulsar-admin sinks status --name openmldb-test-sink

“numReadFromPulsar”: pulsar發送了2條message到sink實例中。

“numWrittenToSink”: sink實例向OpenMLDB寫入2條message。

同樣,我們可以在OpenMLDB在線存儲中查詢到這些消息數據。查詢腳本select.sql內容如下:

set @@execute_mode='online';use pulsar_test;select *, string(timestamp(pickup_datetime)), string(timestamp(dropoff_datetime)) from connector_test;

在OpenMLDB容器中執行腳本:

../openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < files/select.sq

04寫在最后

1. OpenMLDB上下游生態體系

為更好降低開發者使用OpenMLDB的門檻,OpenMLDB社區將持續打造面向上下游技術組件的生態圈,為開發者提供更多簡單易用的生態Connector(如下圖所示):

面向線上數據生態,如Kafka, Flin, RabbitMQ, RocketMQ等

面向離線數據生態,如HDFS, HBase, Cassandra, S3等

面向模型構建的算法、框架,例如XGBoost, LightGBM, TensorFlow, PyTorch,?Scikit Learn等

面向機器學習建模全流程的調度框架、部署工具,例如Airflow,Kubeflow,DolphinScheduler,Prometheus,Grafana等

2. OpenMLDB Roadmap v0.5.0

OpenMLDB社區將于4月底發布v0.5.0版本(鏈接:https://github.com/4paradigm/OpenMLDB/issues/1506),屆時OpenMLDB將具備新特性如下:

窗口預聚合技術,指數級提升長窗口聚合性能

完善的監控,?trace 和?profiling?能力,在企業級應用環境中大幅提升穩定性、可觀測性、和可分析性

線上存儲引擎可插拔以適配不同業務需求,既可以支持基于內存的高性能存儲引擎,也可以支持基于外存的大容量低成本存儲引擎,還可以支持基于持久內存的存儲引擎以在性能和成本間保持平衡

用戶自定義函數(UDF)支持,大幅提升易用性和適用性

上下游數據源生態整合,提供線上數據源的?Kafka,?Pulsar?connectors

05相關閱讀

https://github.com/4paradigm/OpenMLDB/issues/1506

(OpenMLDB Pulsar Connector)

https://openmldb.ai/docs/zh/v0.4/about/index.html

(OpenMLDB文檔)

https://pulsar.apache.org/docs/en/next/io-connectors/

(Apache Pulsar connector文檔, OpenMLDB Pulsar Connector位置如圖所示)

希望這篇文章能夠幫助大家認識Pulsar Connector的開發流程,理解OpenMLDB Connector on Pulsar是什么樣的,了解Pulsar如何接入OpenMLDB。

最后,AI 的進步需要付出多方面的努力,而開放式協作是其中的關鍵環節,我們期待來自開發者的交流討論。歡迎大家加入OpenMLDB社區,掃描下方二維碼可加入社區技術交流微信群。


(部分內容來源網絡,如有侵權請聯系刪除)
立即申請數據分析/數據治理產品免費試用 我要試用
customer

在線咨詢

在線咨詢

點擊進入在線咨詢