国产精品久久久aaaa,日日干夜夜操天天插,亚洲乱熟女香蕉一区二区三区少妇,99精品国产高清一区二区三区,国产成人精品一区二区色戒,久久久国产精品成人免费,亚洲精品毛片久久久久,99久久婷婷国产综合精品电影,国产一区二区三区任你鲁

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

跨機房ES同步實戰

OSC開源社區 ? 來源:OSCHINA 社區 ? 作者:京東云開發者-謝澤 ? 2022-12-13 15:10 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

背景眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基于同城或異地多活機制的高可用設計,在保障數據一致性的同時,能夠最大程度降低由于機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的用戶體驗,降低單點問題對業務造成的潛在損失顯得尤為重要。同城雙活,對于生產的高可用保障,重大的意義和價值是不可言喻的。表面上同城雙活只是簡單的部署了一套生產環境而已,但是在架構上,這個改變的影響是巨大的,無狀態應用的高可用管理、請求流量的管理、版本發布的管理、網絡架構的管理等,其提升的架構復雜度巨大。結合真實的協同辦公產品:京辦(為北京市政府提供協同辦公服務的綜合性平臺)生產環境面對的復雜的政務網絡以及京辦同城雙活架構演進的案例,給大家介紹下京辦持續改進、分階段演進過程中的一些思考和實踐經驗的總結。本文僅針對 ES 集群在跨機房同步過程中的方案和經驗進行介紹和總結。

架構

1.部署 Logstash 在金山云機房上,Logstash 啟動多個實例(按不同的類型分類,提高同步效率),并且和金山云機房的 ES 集群在相同的 VPC2.Logstash 需要配置大網訪問權限,保證 Logstash 和 ES 原集群和目標集群互通。3.數據遷移可以全量遷移和增量遷移,首次遷移都是全量遷移后續的增加數據選擇增量遷移。4.增量遷移需要改造增加識別的增量數據的標識,具體方法后續進行介紹。ab134d5c-7a83-11ed-8abf-dac502259ad0.png?原理

Logstash 工作原理

ab315e3c-7a83-11ed-8abf-dac502259ad0.png??Logstash 分為三個部分 input 、filter、ouput:1.input 處理接收數據,數據可以來源 ES,日志文件,kafka 等通道.2.filter 對數據進行過濾,清洗。3.ouput 輸出數據到目標設備,可以輸出到 ES,kafka,文件等。

增量同步原理

1. 對于 T 時刻的數據,先使用 Logstash 將 T 以前的所有數據遷移到有孚機房京東云 ES,假設用時?T2. 對于 T 到 T+?T 的增量數據,再次使用 logstash 將數據導入到有孚機房京東云的 ES 集群3. 重復上述步驟 2,直到?T 足夠小,此時將業務切換到華為云,最后完成新增數據的遷移適用范圍:ES 的數據中帶有時間戳或者其他能夠區分新舊數據的標簽

流程

?ab61eb2e-7a83-11ed-8abf-dac502259ad0.png?

準備工作

1.創建 ECS 和安裝 JDK 忽略,自行安裝即可2.下載對應版本的 Logstash,盡量選擇與 Elasticsearch 版本一致,或接近的版本安裝即可https://www.elastic.co/cn/downloads/logstash

1)源碼下載直接解壓安裝包,開箱即用

2)修改對內存使用,logstash 默認的堆內存是 1G,根據 ECS 集群選擇合適的內存,可以加快集群數據的遷移效率。

ab81d11e-7a83-11ed-8abf-dac502259ad0.png

?3. 遷移索引

Logstash 會幫助用戶自動創建索引,但是自動創建的索引和用戶本身的索引會有些許差異,導致最終數據的搜索格式不一致,一般索引需要手動創建,保證索引的數據完全一致。

以下提供創建索引的 python 腳本,用戶可以使用該腳本創建需要的索引。

create_mapping.py 文件是同步索引的 python 腳本,config.yaml 是集群地址配置文件。

注:使用該腳本需要安裝相關依賴

yum install -y PyYAML
yum install -y python-requests

拷貝以下代碼保存為 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

defhelp():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
defprocess_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias inlist(aliases.keys()):
        if alias == dest_index:
            print(
                "source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro ={"opendistro":{"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers ={'Content-Type':'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code !=200:
        print(
            "create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)
        print(create_resp.text)
        withopen(source_index +".json","w")as f:
            json.dump(mapping, f)
defmain():
    config_yaml ="config.yaml"
    opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])
    for opt_name, opt_value in opts:
        if opt_name in('-h','--help'):
            help()
            exit()
        if opt_name in('-c','--config'):
            config_yaml = opt_value

    config_file =open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth =None
    if source_user !="":
        source_auth =(source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth =None
    if dest_user !="":
        dest_auth =(dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index"+ source_index +", target index: "+ dest_index)
            source_url = source +"/"+ source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    response.status_code)+" response is "+ response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index "+ source_index +" to target index "+ dest_index +" successed.")
    else:
        # get all indices
        response = requests.get(source +"/_alias", auth=source_auth)
        if response.status_code !=200:
            print("*** get all index failed. resp statusCode:"+str(
                response.status_code)+" response is "+ response.text)
            exit()
        all_index = response.json()
        for index inlist(all_index.keys()):
            if"."in index:
                continue
            print("start to process source index"+ index)
            source_url = source +"/"+ index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    index_response.status_code)+" response is "+ index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index "+ index +" to target index "+ dest_index +" successed.")

if __name__ =='__main__':
    main()

配置文件保存為 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只處理這個文件中mapping地址的索引
# 如果設置成true,則只會將下面的mapping中的索引獲取到并在目的端創建
# 如果設置成false,則會取源端集群的所有索引,除去(.kibana)
# 并且將索引名稱與下面的mapping匹配,如果匹配到使用mapping的value作為目的端的索引名稱
# 如果匹配不到,則使用源端原始的索引名稱
only_mapping: true

# 要遷移的索引,key為源端的索引名字,value為目的端的索引名字
mapping:
    source_index: dest_index

以上代碼和配置文件準備完成,直接執行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目標集群的 kibana 上查看或者執行 curl 查看索引遷移情況:

GET _cat/indices?v

?ab95c520-7a83-11ed-8abf-dac502259ad0.png

??全量遷移Logstash 配置位于 config 目錄下。用戶可以參考配置修改 Logstash 配置文件,為了保證遷移數據的準確性,一般建議建立多組 Logstash,分批次遷移數據,每個 Logstash 遷移部分數據。配置集群間遷移配置參考:abc4a7aa-7a83-11ed-8abf-dac502259ad0.png

?

input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        # 需要遷移的索引列表,以逗號分隔,支持通配符
        index => "a_*,b_*"
        # 以下三項保持默認即可,包含線程數和遷移數據大小和logstash jvm配置相關
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
 # 目的端索引名稱,以下配置為和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置為和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目標端數據的_id,如果不需要保留原_id,可以刪除以下這行,刪除后性能會更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 調試信息,正式遷移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量遷移

預處理:

1.@timestamp在 elasticsearch2.0.0beta 版本后棄用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次對于京辦從金山云機房遷移到京東有孚機房,所涉及到的業務領域多,各個業務線中所代表新增記錄的時間戳字段不統一,所涉及到的兼容工作量大,于是考慮通過 elasticsearch 中預處理功能 pipeline 進行預處理添加統一增量標記字段:gmt_created_at,以減少遷移工作的復雜度(各自業務線可自行評估是否需要此步驟)。

PUT _ingest/pipeline/gmt_created_at
{
  "description":"Adds gmt_created_at timestamp to documents",
  "processors":[
    {
      "set":{
        "field":"_source.gmt_created_at",
        "value":"{{_ingest.timestamp}}"
      }
    }
  ]
}

3. 檢查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各個 index 設置對應 settings 增加 pipeline 為默認預處理

PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}

5. 檢查新增 settings 是否生效

GET index_xxxx/_settings

?ac0e3f46-7a83-11ed-8abf-dac502259ad0.png

??增量遷移腳本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,統一 gmt_create_at 為增量同步的特殊標記

schedule: 每分鐘同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>["ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"index_*"
        query =>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size =>5000
        scroll =>"5m"
        docinfo =>true
        schedule =>"* * * * *"
      }
}
filter {
     mutate {
      remove_field =>["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts =>["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"%{[@metadata][_index]}"
        document_type =>"%{[@metadata][_type]}"
        document_id =>"%{[@metadata][_id]}"
        ilm_enabled =>false
        manage_template =>false
    }

# 調試信息,正式遷移去掉
stdout { codec => rubydebug { metadata =>true}}
}

問題:

mapping 中存在 join 父子類型的字段,直接遷移報 400 異常?ac4a1f70-7a83-11ed-8abf-dac502259ad0.png ?
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解決方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140https://github.com/elastic/elasticsearch/issues/26183

結合業務特征,通過在 filter 中加入小量的 ruby 代碼,將_routing 的值取出來,放回 logstah event 中,由此問題得以解決。

示例:

ac725e7c-7a83-11ed-8abf-dac502259ad0.png


審核編輯 :李倩


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7335

    瀏覽量

    94774
  • 數據遷移
    +關注

    關注

    0

    文章

    89

    瀏覽量

    7258

原文標題:跨機房 ES 同步實戰

文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關推薦
    熱點推薦

    ES7243E+ES8311音頻錄制與播放電路資料

    本電路為一款低成本音頻錄制與播放參考電路,含sch原理圖和pcb板圖。ES7243E接2路模擬麥克風實現音頻采集,ES8311接1路功放實現音頻播放,適合用于語音對話類的電子玩具或在線語音通訊設備
    發表于 02-04 17:18 ?1次下載

    深度解析ES8389/ES8390/音頻芯片Linux驅動(Linux6.1內核)

    ? ? ? 在嵌入式音頻開發中,順芯( Everest ) ES8389/ES8390 是一款高集成度的音頻 Codec 芯片,廣泛應用于智能音箱、車載終端、便攜設備等場景。本文
    的頭像 發表于 02-02 11:37 ?1349次閱讀
    深度解析<b class='flag-5'>ES</b>8389/<b class='flag-5'>ES</b>8390/音頻芯片Linux驅動(Linux6.1內核)

    機房項目中的時間系統:從忽視到謹慎的十年體會

    機房項目中的時間系統:從忽視到謹慎的十年體會 做系統集成、機房項目這些年,我對“時間同步”這個基礎環節的看法,變化其實挺大的。 剛入行那會兒,時間同步在方案里幾乎沒什么存在感。常見做法
    的頭像 發表于 01-20 13:13 ?148次閱讀

    機房如何搬遷?有哪些步驟與規范?

    很多朋友提到機房搬遷,最近有不少的朋友做到這方面的項目。隨著企業、單位的不斷深入發展,現有的涉及辦公、生產場所已經不能滿足,新的廠房及辦公大樓逐漸籌建,那么機房搬遷或機房改建服務應運而生,而
    的頭像 發表于 12-19 10:42 ?390次閱讀
    <b class='flag-5'>機房</b>如何搬遷?有哪些步驟與規范?

    基于PXIe總線的多板卡通道同步機制研究

    PXIe背板資源,對多板卡同步的基本原理和常見實現方案進行論述。機箱、機柜的分布式時間同步問題將在后續文章中展開。
    的頭像 發表于 12-18 09:35 ?469次閱讀
    基于PXIe總線的多板卡通道<b class='flag-5'>同步</b>機制研究

    屏蔽機房建設圖解,與非屏蔽機房有何區別?

    有不少朋友問到關于屏蔽機房與一般的機房有何區別,本期,為了方便大家更詳細的了解關于屏蔽機房建設,我們可以通過這個實際圖紙來詳細了解。 一、普通機房安裝圖 普通
    的頭像 發表于 12-17 09:50 ?455次閱讀
    屏蔽<b class='flag-5'>機房</b>建設圖解,與非屏蔽<b class='flag-5'>機房</b>有何區別?

    在恩智浦i.MX RT1180界處理器上如何實現1588協議同步

    簡介 在前面的文章中介紹了i.MX RT1180界處理器的主要應用場景是在工業自動化與智能制造的高精度測量領域,而這其中時間同步是一個實時系統能夠穩定運行的關鍵。而主流的工業網絡協議基本上的時鐘
    的頭像 發表于 12-03 09:09 ?7850次閱讀
    在恩智浦i.MX RT1180<b class='flag-5'>跨</b>界處理器上如何實現1588協議<b class='flag-5'>同步</b>

    EMC電路怎么整改:如何縮短整改周期的實戰案例

    EMC電路怎么整改:如何縮短整改周期的實戰案例|南柯電子
    的頭像 發表于 10-20 10:17 ?664次閱讀

    蔚來全新ES8掉頭能有多敏捷

    隨著全新ES8媒體試駕及用戶試駕的持續進行,有許多朋友表示:“全新ES8作為中國最大的純電SUV,開起來卻沒有傳統大車的笨重感”。這正是智能敏捷掉頭功能的功勞,也是蔚來全棧自研能力的顯現。全新ES8掘金行動第三期,一起來看看全新
    的頭像 發表于 09-22 11:48 ?1165次閱讀

    手機板 layout 走線分割問題

    初學習layout時,都在說信號線不可分割,但是在工作中為了成本不能分割似乎也非絕對。 在后續工作中,分割的基礎都是相鄰層有一面完整的GND參考,分割發生在相鄰的另外一層。 但
    發表于 09-16 14:56

    BASiC_BSRD-2503-ES01數據手冊

    BASiC_BSRD-2503-ES01
    發表于 09-01 16:25 ?7次下載

    淘寶API平臺數據同步,多店管理一屏搞定!

    同步中樞 通過淘寶開放平臺API構建數據中臺,實現: # 示例:平臺庫存同步核心邏輯import requestsdef sync_inventory(item_id, platforms
    的頭像 發表于 07-30 14:41 ?650次閱讀
    淘寶API<b class='flag-5'>跨</b>平臺數據<b class='flag-5'>同步</b>,多店管理一屏搞定!

    黑芝麻智能域時間同步技術:消除多域計算單元的時鐘信任鴻溝

    上海2025年7月21日 /美通社/ -- 本文圍繞域時間同步技術展開,作為智能汽車 "感知-決策-執行 -交互" 全鏈路的時間基準,文章介紹了 PTP、gPTP、CAN 等主流同步技術及特點
    的頭像 發表于 07-22 09:17 ?589次閱讀
    黑芝麻智能<b class='flag-5'>跨</b>域時間<b class='flag-5'>同步</b>技術:消除多域計算單元的時鐘信任鴻溝

    鴻蒙5開發寶藏案例分享---線程性能優化指南

    發現鴻蒙寶藏:線程序列化性能優化實戰指南 大家好呀!今天在翻鴻蒙文檔時挖到一個超級實用的工具—— DevEco Profiler的序列化檢測功能 !平時用<span class
    發表于 06-12 17:13

    簡述通信機房電磁兼容要求

    通信機房作為信息傳輸的核心樞紐,其設備穩定性直接關系到網絡服務的質量。電磁干擾可能導致機房內設備異常、數據丟失甚至系統癱瘓。
    的頭像 發表于 05-28 15:48 ?903次閱讀