跨机房ES同步实战

背景众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对 es 集群在跨机房同步过程中的方案和经验进行介绍和总结。 架构 1. 部署 logstash 在金山云机房上,logstash 启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的 es 集群在相同的 vpc2. logstash 需要配置大网访问权限,保证 logstash 和 es 原集群和目标集群互通。3. 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。4. 增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。原理 logstash 工作原理  logstash 分为三个部分 input 、filter、ouput:1. input 处理接收数据,数据可以来源 es,日志文件,kafka 等通道.2. filter 对数据进行过滤,清洗。3. ouput 输出数据到目标设备,可以输出到 es,kafka,文件等。 增量同步原理 1. 对于 t 时刻的数据,先使用 logstash 将 t 以前的所有数据迁移到有孚机房京东云 es,假设用时∆t2. 对于 t 到 t+∆t 的增量数据,再次使用 logstash 将数据导入到有孚机房京东云的 es 集群3. 重复上述步骤 2,直到∆t 足够小,此时将业务切换到华为云,最后完成新增数据的迁移适用范围:es 的数据中带有时间戳或者其他能够区分新旧数据的标签 流程  准备工作 1. 创建 ecs 和安装 jdk 忽略,自行安装即可2. 下载对应版本的 logstash,尽量选择与 elasticsearch 版本一致,或接近的版本安装即可https://www.elastic.co/cn/downloads/logstash 1) 源码下载直接解压安装包,开箱即用
2)修改对内存使用,logstash 默认的堆内存是 1g,根据 ecs 集群选择合适的内存,可以加快集群数据的迁移效率。
 3. 迁移索引
logstash 会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。
以下提供创建索引的 python 脚本,用户可以使用该脚本创建需要的索引。
create_mapping.py 文件是同步索引的 python 脚本,config.yaml 是集群地址配置文件。
注:使用该脚本需要安装相关依赖
yum install -y pyyamlyum install -y python-requests 拷贝以下代码保存为 create_mapping.py:
import yamlimport requestsimport jsonimport getoptimport sysdefhelp(): print usage: -h/--help print this help. -c/--config config file path, default is config.yaml example: python create_mapping.py -c config.yaml defprocess_mapping(index_mapping, dest_index): print(index_mapping) # remove unnecessary keys del index_mapping[settings][index][provided_name] del index_mapping[settings][index][uuid] del index_mapping[settings][index][creation_date] del index_mapping[settings][index][version] # check alias aliases = index_mapping[aliases] for alias inlist(aliases.keys()): if alias == dest_index: print( source index + dest_index + alias + alias + is the same as dest_index name, will remove this alias.) del index_mapping[aliases][alias] if index_mapping[settings][index].has_key(lifecycle): lifecycle = index_mapping[settings][index][lifecycle] opendistro ={opendistro:{index_state_management: {policy_id: lifecycle[name], rollover_alias: lifecycle[rollover_alias]}}} index_mapping[settings].update(opendistro) # index_mapping[settings][opendistro][index_state_management][rollover_alias] = lifecycle[rollover_alias] del index_mapping[settings][index][lifecycle] print(index_mapping) return index_mappingdefput_mapping_to_target(url, mapping, source_index, dest_auth=none): headers ={'content-type':'application/json'} create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth) if create_resp.status_code !=200: print( create index + url + failed with response: +str(create_resp)+, source index is + source_index) print(create_resp.text) withopen(source_index +.json,w)as f: json.dump(mapping, f)defmain(): config_yaml =config.yaml opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config=']) for opt_name, opt_value in opts: if opt_name in('-h','--help'): help() exit() if opt_name in('-c','--config'): config_yaml = opt_value config_file =open(config_yaml) config = yaml.load(config_file) source = config[source] source_user = config[source_user] source_passwd = config[source_passwd] source_auth =none if source_user !=: source_auth =(source_user, source_passwd) dest = config[destination] dest_user = config[destination_user] dest_passwd = config[destination_passwd] dest_auth =none if dest_user !=: dest_auth =(dest_user, dest_passwd) print(source_auth) print(dest_auth) # only deal with mapping list if config[only_mapping]: for source_index, dest_index in config[mapping].iteritems(): print(start to process source index+ source_index +, target index: + dest_index) source_url = source +/+ source_index response = requests.get(source_url, auth=source_auth) if response.status_code !=200: print(*** get elasticsearch message failed. resp statuscode:+str( response.status_code)+ response is + response.text) continue mapping = response.json() index_mapping = process_mapping(mapping[source_index], dest_index) dest_url = dest +/+ dest_index put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth) print(process source index + source_index + to target index + dest_index + successed.) else: # get all indices response = requests.get(source +/_alias, auth=source_auth) if response.status_code !=200: print(*** get all index failed. resp statuscode:+str( response.status_code)+ response is + response.text) exit() all_index = response.json() for index inlist(all_index.keys()): if.in index: continue print(start to process source index+ index) source_url = source +/+ index index_response = requests.get(source_url, auth=source_auth) if index_response.status_code !=200: print(*** get elasticsearch message failed. resp statuscode:+str( index_response.status_code)+ response is + index_response.text) continue mapping = index_response.json() dest_index = index if index in config[mapping].keys(): dest_index = config[mapping][index] index_mapping = process_mapping(mapping[index], dest_index) dest_url = dest +/+ dest_index put_mapping_to_target(dest_url, index_mapping, index, dest_auth) print(process source index + index + to target index + dest_index + successed.)if __name__ =='__main__': main() 配置文件保存为 config.yaml:
# 源端es集群地址,加上http://source: http://ip:portsource_user: usernamesource_passwd: password# 目的端es集群地址,加上http://destination: http://ip:portdestination_user: usernamedestination_passwd: password# 是否只处理这个文件中mapping地址的索引# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称# 如果匹配不到,则使用源端原始的索引名称only_mapping: true# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字mapping: source_index: dest_index 以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。
索引同步完成可以取目标集群的 kibana 上查看或者执行 curl 查看索引迁移情况:
get _cat/indices?v 
全量迁移logstash 配置位于 config 目录下。用户可以参考配置修改 logstash 配置文件,为了保证迁移数据的准确性,一般建议建立多组 logstash,分批次迁移数据,每个 logstash 迁移部分数据。配置集群间迁移配置参考: 
input{ elasticsearch{ # 源端地址 hosts => [ip1:port1,ip2:port2] # 安全集群配置登录用户名密码 user => username password => password # 需要迁移的索引列表,以逗号分隔,支持通配符 index => a_*,b_* # 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关 docinfo=>true slices => 10 size => 2000 scroll => 60m }}filter { # 去掉一些logstash自己加的字段 mutate { remove_field => [@timestamp, @version] }}output{ elasticsearch{ # 目的端es地址 hosts => [http://ip:port] # 安全集群配置登录用户名密码 user => username password => password # 目的端索引名称,以下配置为和源端保持一致 index => %{[@metadata][_index]} # 目的端索引type,以下配置为和源端保持一致 document_type => %{[@metadata][_type]} # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好 document_id => %{[@metadata][_id]} ilm_enabled => false manage_template => false } # 调试信息,正式迁移去掉 stdout { codec => rubydebug { metadata => true }}} 增量迁移 预处理: 1. @timestamp 在 elasticsearch2.0.0beta 版本后弃用
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html
2. 本次对于京办从金山云机房迁移到京东有孚机房,所涉及到的业务领域多,各个业务线中所代表新增记录的时间戳字段不统一,所涉及到的兼容工作量大,于是考虑通过 elasticsearch 中预处理功能 pipeline 进行预处理添加统一增量标记字段:gmt_created_at,以减少迁移工作的复杂度(各自业务线可自行评估是否需要此步骤)。
put _ingest/pipeline/gmt_created_at{ description:adds gmt_created_at timestamp to documents, processors:[ { set:{ field:_source.gmt_created_at, value:{{_ingest.timestamp}} } } ]} 3. 检查 pipeline 是否生效
get _ingest/pipeline/* 4. 各个 index 设置对应 settings 增加 pipeline 为默认预处理
put index_xxxx/_settings{ settings: { index.default_pipeline: gmt_created_at }} 5. 检查新增 settings 是否生效
get index_xxxx/_settings 
增量迁移脚本
schedule-migrate.conf
index:可以使用通配符的方式
query: 增量同步的 dsl,统一 gmt_create_at 为增量同步的特殊标记
schedule: 每分钟同步一把,* * * * *
input {elasticsearch { hosts =>[ip:port] # 安全集群配置登录用户名密码 user =>username password =>password index =>index_* query =>'{query:{range:{gmt_create_at:{gte:now-1m,lte:now/m}}}}' size =>5000 scroll =>5m docinfo =>true schedule =>* * * * * }}filter { mutate { remove_field =>[source, @version] }}output { elasticsearch { # 目的端es地址 hosts =>[http://ip:port] # 安全集群配置登录用户名密码 user =>username password =>password index =>%{[@metadata][_index]} document_type =>%{[@metadata][_type]} document_id =>%{[@metadata][_id]} ilm_enabled =>false manage_template =>false }# 调试信息,正式迁移去掉stdout { codec => rubydebug { metadata =>true}}} 问题: mapping 中存在 join 父子类型的字段,直接迁移报 400 异常  [2022-09-20t20:02:16,404][warn ][logstash.outputs.elasticsearch] could not index event to elasticsearch. {:status=>400, :action=>[index, {:_id=>xxx, :_index=>xxx, :_type=>joywork_t_work, :routing=>nil}, #], :response=>{index=>{_index=>xxx, _type=>xxx, _id=>xxx, status=>400, error=>{type=>mapper_parsing_exception, reason=>failed to parse, caused_by=>{type=>illegal_argument_exception, reason=>[routing] is missing for join field [task_user]}}}}} 解决方法: https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183
结合业务特征,通过在 filter 中加入小量的 ruby 代码,将_routing 的值取出来,放回 logstah event 中,由此问题得以解决。
示例:


Cadence Tensilica Xtensa 处理器满足最严格的汽车功能安全要求,完全达到ISO 26262 ASIL-D等级
我国实现制造强国目标至少30年
运放保护电路设计
白墙是否可以代替投影屏幕?
智慧城市怎样变得更加的智能化
跨机房ES同步实战
不惧严寒 超威电池质量受消费者盛赞
助力方案商海内外应用落地,参考方案分享
EPR6-S工业机器人通过EtherCAT转profinet网关接入西门子系统
深度解析大语言模型内部运行原理
如何确认USB接口是不是“真”高速
免洗焊接工艺、材料与设备的分析
采购60个面试问题和参考答案
工业4.0下的现代的驱动系统介绍
人工智能会不会成为程序员的终结者?
谷歌宣布将全面推行移动优先索引
日媒:华为投资50亿日元在日本建立工厂
AVR单片机的特性、缺点和应用
未来所有事物都将会装上微型传感器
BERT模型的PyTorch实现