国产精品久久久aaaa,日日干夜夜操天天插,亚洲乱熟女香蕉一区二区三区少妇,99精品国产高清一区二区三区,国产成人精品一区二区色戒,久久久国产精品成人免费,亚洲精品毛片久久久久,99久久婷婷国产综合精品电影,国产一区二区三区任你鲁

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

利用KoP如何將Pulsar數據快速且無縫接入Apache Doris

電子工程師 ? 來源:OSC開源社區 ? 作者:OSC開源社區 ? 2022-08-08 15:13 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

KoP 架構介紹

KoP 是 Kafka on Pulsar 的簡寫,顧名思義就是如何在 Pulsar 上實現對 Kafka 數據的讀寫。KoP 將 Kafka 協議處理插件引入 Pulsar Broker 來實現 Apache Pulsar 對 Apache Kafka 協議的支持。將 KoP 協議處理插件添加到現有 Pulsar 集群后,用戶不用修改代碼就可以將現有的 Kafka 應用程序和服務遷移到 Pulsar。

Apache Pulsar 主要特點如下:

利用企業級多租戶特性簡化運營。

避免數據搬遷,簡化操作。

利用 Apache BookKeeper 和分層存儲持久保留事件流。

利用 Pulsar Functions 進行無服務器化事件處理。

KoP 架構如下圖,通過圖可以看到 KoP 引入一個新的協議處理插件,該協議處理插件利用 Pulsar 的現有組件(例如 Topic 發現、分布式日志庫-ManagedLedger、cursor 等)來實現 Kafka 傳輸協議。

Routine Load 訂閱 Pulsar 數據思路

Apache Doris Routine Load 支持了將 Kafka 數據接入 Apache Doris,并保障了數據接入過程中的事務性操作。Apache Pulsar 定位為一個云原生時代企業級的消息發布和訂閱系統,已經在很多線上服務使用。那么 Apache Pulsar 用戶如何將數據接入 Apache Doris 呢,答案是通過 KoP 實現。

由于 KoP 直接在 Pulsar 側提供了對 Kafka 的兼容,那么對于 Apache Doris 來說可以像使用 Kafka 一樣使用 Plusar。整個過程對于 Apache Doris 來說無需任務改變,就能將 Pulsar 數據接入 Apache Doris,并且可以獲得 Routine Load 的事務性保障。

--------------------------

| Apache Doris |

| --------------- |

| | Routine Load | |

| --------------- |

--------------------------

|Kafka Protocol(librdkafka)

------------v--------------

| --------------- |

| | KoP | |

| --------------- |

| Apache Pulsar |

--------------------------

操作實踐

Pulsar Standalone 安裝環境準備:

JDK 安裝:略

下載 Pulsar 二進制包,并解壓:

#下載

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz

#解壓并進入安裝目錄

tar xvfz apache-pulsar-2.10.0-bin.tar.gz

cd apache-pulsar-2.10.0

組件編譯和安裝

1. 下載 KoP 源碼

git clone https://github.com/streamnative/kop.git

cd kop

2. 編譯 KoP 項目

mvn clean install -DskipTests

3. protocols 配置:在解壓后的 apache-pulsar 目錄下創建 protocols文 件夾,并把編譯好的 nar 包復制到 protocols 文件夾中。

mkdir apache-pulsar-2.10.0/protocols

# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols

cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

4. 添加后的結果查看

[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/

pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

KoP 配置添加

1. 在 standalone.conf 或者 broker.conf 添加如下配置

#kop適配的協議

messagingProtocols=kafka

#kop 的NAR文件路徑

protocolHandlerDirectory=。/protocols

#是否允許自動創建topic

allowAutoTopicCreationType=partitioned

2. 添加如下服務監聽配置

# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0

kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.

# If it’s not configured, it will be the same with `kafkaListeners` config by default

kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

brokerDeleteInactiveTopicsEnabled=false

當出現如下錯誤:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

添加如下配置,開啟 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true

transactionCoordinatorEnabled=true

Pulsar 啟動

#前臺啟動

#bin/pulsar standalone

#后臺啟動

pulsar-daemon start standalone

創建 Doris 數據庫和建表

#進入Doris

mysql -u root -h 127.0.0.1 -P 9030

# 創建數據庫

create database pulsar_doris;

#切換數據庫

use pulsar_doris;

#創建clicklog表

CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog

`clickTime` DATETIME NOT NULL COMMENT “點擊時間”,

`type` String NOT NULL COMMENT “點擊類型”,

`id` VARCHAR(100) COMMENT “唯一id”,

`user` VARCHAR(100) COMMENT “用戶名稱”,

`city` VARCHAR(50) COMMENT “所在城市”

DUPLICATE KEY(`clickTime`, `type`)

DISTRIBUTED BY HASH(`type`) BUCKETS 1

PROPERTIES (

“replication_allocation” = “tag.location.default: 1”

);

創建 Routine Load 任務

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog

COLUMNS(clickTime,id,type,user)

PROPERTIES

“desired_concurrent_number”=“3”,

“max_batch_interval” = “20”,

“max_batch_rows” = “300000”,

“max_batch_size” = “209715200”,

“strict_mode” = “false”,

“format” = “json”

FROM KAFKA

“kafka_broker_list” = “127.0.0.1:9092”,

“kafka_topic” = “test”,

“property.group.id” = “doris”

);

上述命令中的參數解釋如下:

pulsar_doris :Routine Load 任務所在的數據庫

load_from_pulsar_test:Routine Load 任務名稱

clicklog:Routine Load 任務的目標表,也就是配置 Routine Load 任務將數據導入到 Doris 哪個表中。

strict_mode:導入是否為嚴格模式,這里設置為 False。

format:導入數據的類型,這里配置為 Json。

kafka_broker_list:Kafka Broker 服務的地址

kafka_broker_list:Kafka Topic 名稱,也就是同步哪個 Topic 上的數據。

property.group.id:消費組 ID

數據導入和測試

1. 數據導入 構造一個 ClickLog 的數據結構,并調用 Kafka 的 Producer 發送 5000 萬條數據到 Pulsar。 ClickLog 數據結構如下:

public class ClickLog {

private String id;

private String user;

private String city;

private String clickTime;

private String type;

。.. //省略getter和setter

}

消息構造和發送的核心代碼邏輯如下:

String strDateFormat = “yyyy-MM-dd HHss”;

@Autowired

private Producer producer;

try {

for(int j =0 ; j《50000;j++){

int batchSize = 1000;

for(int i = 0 ; i《batchSize ;i++){

ClickLog clickLog = new ClickLog();

clickLog.setId(UUID.randomUUID().toString());

SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);

clickLog.setClickTime(simpleDateFormat.format(new Date()));

clickLog.setType(“webset”);

clickLog.setUser(“user”+ new Random().nextInt(1000) +i);

producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));

}

}

} catch (Exception e) {

e.printStackTrace();

}

2. ROUTINE LOAD 任務查看執行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;命令,查看導入任務的狀態。

mysql》 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;

*************************** 1. row ***************************

Id: 87873

Name: load_from_pulsar_test

CreateTime: 2022-05-31 1234

PauseTime: NULL

EndTime: NULL

DbName: default_cluster:pulsar_doris

TableName: clicklog1

State: RUNNING

DataSourceType: KAFKA

CurrentTaskNum: 1

JobProperties: {“partitions”:“*”,“columnToColumnExpr”:“clickTime,id,type,user”,“maxBatchIntervalS”:“20”,“whereExpr”:“*”,“dataFormat”:“json”,“timezone”:“Europe/London”,“send_batch_parallelism”:“1”,“precedingFilter”:“*”,“mergeType”:“APPEND”,“format”:“json”,“json_root”:“”,“maxBatchSizeBytes”:“209715200”,“exec_mem_limit”:“2147483648”,“strict_mode”:“false”,“jsonpaths”:“”,“deleteCondition”:“*”,“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“1”,“execMemLimit”:“2147483648”,“num_as_string”:“false”,“fuzzy_parse”:“false”,“maxBatchRows”:“300000”}

DataSourceProperties: {“topic”:“test”,“currentKafkaPartitions”:“0”,“brokerList”:“127.0.0.1:9092”}

CustomProperties: {“group.id”:“doris”,“kafka_default_offsets”:“OFFSET_END”,“client.id”:“doris.client”}

Statistic: {“receivedBytes”:5739001913,“runningTxns”:[],“errorRows”:0,“committedTaskNum”:168,“loadedRows”:50000000,“loadRowsRate”:23000,“abortedTaskNum”:1,“errorRowsAfterResumed”:0,“totalRows”:50000000,“unselectedRows”:0,“receivedBytesRate”:2675000,“taskExecuteTimeMs”:2144799}

Progress: {“0”:“51139566”}

Lag: {“0”:0}

ReasonOfStateChanged:

ErrorLogUrls:

OtherMsg:

1 row in set (0.00 sec)

ERROR:

No query specified

從上面結果可以看到 totalRows 為 50000000,errorRows 為 0。說明數據不丟不重的導入 Apache Doris 了。

3. 數據統計驗證執行如下命令統計表中的數據,發現統計的結果也是 50000000,符合預期。

mysql》 select count(*) from clicklog;

+----------+

| count(*) |

+----------+

| 50000000 |

+----------+

1 row in set (3.73 sec)

mysql》

通過 KoP 我們實現了將 Apache Pulsar 數據無縫接入 Apache Doris ,無需對 Routine Load 任務進行任何修改,并保障了數據導入過程中的事務性。與此同時,Apache Doris 社區已經啟動了 Apache Pulsar 原生導入支持的設計,相信在不久后就可以直接訂閱 Pulsar 中的消息數據,并保證數據導入過程中的 Exactly-Once 語義。

審核編輯:郭婷

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 服務器
    +關注

    關注

    14

    文章

    10253

    瀏覽量

    91482
  • 代碼
    +關注

    關注

    30

    文章

    4968

    瀏覽量

    73960

原文標題:如何將Pulsar數據快速且無縫接入Apache Doris

文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關推薦
    熱點推薦

    工業數據中臺支持接入MySQL數據庫嗎

    工業數據中臺完全支持接入MySQL數據庫 ,通過數據同步、集成與治理等技術手段,能夠充分發揮MySQL在
    的頭像 發表于 12-04 11:23 ?375次閱讀
    工業<b class='flag-5'>數據</b>中臺支持<b class='flag-5'>接入</b>MySQL<b class='flag-5'>數據</b>庫嗎

    如何將某個函數或變量放在固定的地址 ?

    如題,如何將某個函數或者變量放在固定的地址?類似于__attribute__( at(絕對地址) ),用的nucleistudio ide, __attribute__( at(絕對地址) )出現警告 at屬性被忽視,有類似的屬性嗎?
    發表于 11-07 06:07

    如何將GCC項目導入NuEclipse?

    如何將GCC項目導入NuEclipse?
    發表于 09-01 07:04

    請問編譯程序時如何將數據放入Flash固定地址?

    編譯程序時如何將數據放入Flash固定地址?
    發表于 08-29 06:40

    編譯程序時如何將數據放入Flash固定地址?

    編譯程序時如何將數據放入Flash固定地址?
    發表于 08-27 13:16

    請問如何將ISP寫入Nuvoton 8051 MCU系列?

    如何將ISP寫入Nuvoton 8051 MCU系列?
    發表于 08-18 07:34

    臺灣偉斯掃碼槍通過RS232轉Profinet網關接入西門子1200 PLC的完整指南

    在工業自動化領域,設備間的無縫通信是提升生產效率的關鍵。本文詳細解析如何將臺灣偉斯掃碼槍通過RS232轉Profinet網關接入西門子1200系列PLC,實現
    的頭像 發表于 08-05 14:29 ?871次閱讀
    臺灣偉斯掃碼槍通過RS232轉Profinet網關<b class='flag-5'>接入</b>西門子1200 PLC的完整指南

    工業自動化通信方案:臺灣偉斯掃碼槍通過RS232轉Profinet網關接入西門子S7-1200 PLC系統詳解

    在工業自動化領域,設備間的通信如同神經網絡的信號傳遞,需要精準高效的連接方式。本文聚焦如何將臺灣偉斯掃碼槍通過RS232轉Profinet網關接入西門子S7-1200 PLC系統,
    的頭像 發表于 08-04 18:25 ?993次閱讀
    工業自動化通信方案:臺灣偉斯掃碼槍通過RS232轉Profinet網關<b class='flag-5'>接入</b>西門子S7-1200 PLC系統詳解

    使用NVIDIA GPU加速Apache Spark中Parquet數據掃描

    隨著各行各業的企業數據規模不斷增長,Apache Parquet 已經成為了一種主流數據存儲格式。Apache Parquet 是一種列式存儲格式,專為高效的大規模
    的頭像 發表于 07-23 10:52 ?1038次閱讀
    使用NVIDIA GPU加速<b class='flag-5'>Apache</b> Spark中Parquet<b class='flag-5'>數據</b>掃描

    Modbus TCP轉Profibus網關如何快速把流量計接入到DCS?

    在工業自動化領域,設備間的協議互通往往如同語言不通的對話者,需要一位“翻譯官”才能實現高效協作。本文圍繞Modbus TCP轉Profibus網關的應用,解析如何通過這一技術橋梁,流量計數據
    的頭像 發表于 07-07 16:50 ?581次閱讀
    Modbus TCP轉Profibus網關如何<b class='flag-5'>快速</b>把流量計<b class='flag-5'>接入</b>到DCS?

    如何將K230 image轉成jpg?

    如何將K230image轉成jpg,我想獲取圖片,傳回我自己的系統 你好,可以使用img.to_jpeg()來對其進行壓縮。
    發表于 06-12 06:41

    485自由口轉profibus網關快速配置案例

    在當今工業自動化的浪潮中,如何將傳統的工業設備與現代化的自動化系統無縫對接成了眾多工程師和技術專家面臨的難題。 而今天,我們要介紹的485自由口轉Profibus網關,就是為解決這一問題而生的革新
    的頭像 發表于 05-07 17:53 ?795次閱讀
    485自由口轉profibus網關<b class='flag-5'>快速</b>配置案例

    用MCP百度地圖能力輕松接入DeepSeek

    如何將百度地圖的能力接入DeepSeek。本文詳細介紹通過MCP百度地圖的能力接入DeepSeek,為用戶提供精準的智能規劃服務。 一
    的頭像 發表于 03-31 11:05 ?1934次閱讀
    用MCP<b class='flag-5'>將</b>百度地圖能力輕松<b class='flag-5'>接入</b>DeepSeek

    NVIDIA加速的Apache Spark助力企業節省大量成本

    隨著 NVIDIA 推出 Aether 項目,通過采用 NVIDIA 加速的 Apache Spark 企業得以自動加速其數據中心規模的分析工作負載,從而節省數百萬美元。
    的頭像 發表于 03-25 15:09 ?1158次閱讀
    NVIDIA加速的<b class='flag-5'>Apache</b> Spark助力企業節省大量成本

    如何將Linux安裝包快速轉成玲瓏包

    本篇將以 motrix 為例為大家展示如何將 Linux 安裝包快速轉成玲瓏包。
    的頭像 發表于 03-12 16:01 ?1662次閱讀
    <b class='flag-5'>如何將</b>Linux安裝包<b class='flag-5'>快速</b>轉成玲瓏包