关键词排名优化网站建设公司,信融营销型网站建设,双鸭山网站建设,wordpress勋章文章目录**一、前期准备****1. 集群健康检查****2. 备份数据****3. 监控系统准备****二、创建新索引并配置****1. 设计新索引映射****2. 创建读写别名****三、全量数据迁移****1. 执行初始 Reindex****2. 监控 Reindex 进度****四、增量数据同步****1. 方案选择****五、双写切换…
文章目录**一、前期准备****1. 集群健康检查****2. 备份数据****3. 监控系统准备****二、创建新索引并配置****1. 设计新索引映射****2. 创建读写别名****三、全量数据迁移****1. 执行初始 Reindex****2. 监控 Reindex 进度****四、增量数据同步****1. 方案选择****五、双写切换****1. 修改应用代码实现双写****2. 验证双写一致性****六、流量切换****1. 只读流量切换****2. 验证查询结果一致性****3. 写入流量切换****七、收尾工作****1. 恢复新索引配置****2. 验证性能和稳定性****3. 删除旧索引可选****八、回滚策略****九、优化建议****十、风险控制**一、前期准备
1. 集群健康检查
GET /_cluster/health确保
status 为 greennumber_of_nodes 符合预期unassigned_shards 为 0
2. 备份数据
# 注册快照仓库
PUT /_snapshot/my_backup
{type: fs,settings: {location: /path/to/snapshots}
}# 创建全量快照
PUT /_snapshot/my_backup/snapshot_1?wait_for_completiontrue3. 监控系统准备
开启 ES 性能监控如使用 Elastic APM、Prometheus Grafana设置关键指标告警如集群负载、JVM 内存、磁盘使用率
二、创建新索引并配置
1. 设计新索引映射
PUT /new_products
{settings: {index.number_of_shards: 5, // 与旧索引保持一致index.number_of_replicas: 1,index.refresh_interval: 30s, // 临时调大提升写入性能index.translog.durability: async, // 临时使用异步持久化index.translog.sync_interval: 30s},mappings: {properties: {id: {type: keyword},name: {type: text},price: {type: double}, // 假设原字段为 integercreate_time: {type: date, format: yyyy-MM-dd HH:mm:ss||epoch_millis},tags: {type: keyword}}}
}2. 创建读写别名
POST /_aliases
{actions: [{ add: { alias: products_read, index: old_products } },{ add: { alias: products_write, index: old_products } }]
}三、全量数据迁移
1. 执行初始 Reindex
POST /_reindex?wait_for_completionfalse
{source: {index: old_products,size: 5000, // 每次查询 5000 条sort: [_doc] // 按文档顺序处理避免遗漏},dest: {index: new_products,op_type: create},script: {source: // 类型转换逻辑if (ctx._source.containsKey(price)) {ctx._source.price Double.parseDouble(ctx._source.price.toString());}// 日期格式转换if (ctx._source.containsKey(create_time)) {try {ctx._source.create_time new Date(ctx._source.create_time).getTime();} catch (Exception e) {// 处理异常日期格式ctx._source.create_time System.currentTimeMillis();}}}
}2. 监控 Reindex 进度
GET /_tasks?detailedtrueactions*reindex四、增量数据同步
1. 方案选择
方案 A基于时间戳的定时同步适合有 update_time 字段的场景
POST /_reindex?wait_for_completionfalse
{source: {index: old_products,query: {range: {update_time: {gte: {{last_sync_time}}, // 上次同步时间lt: now}}}},dest: {index: new_products}
}方案 B基于 Canal 的 binlog 订阅适合 ES 作为 MySQL 从库的场景
# 部署 Canal 客户端
canal.deployer-1.1.5/bin/startup.sh# 配置 canal.instance.filter.regex.*\\..* 监听全量变更五、双写切换
1. 修改应用代码实现双写
在应用层同时写入新旧索引
// 伪代码示例
public void indexProduct(Product product) {// 写入旧索引esClient.index(products_write, product);// 写入新索引带类型转换Product newProduct convertProduct(product);esClient.index(new_products, newProduct);
}2. 验证双写一致性
// 对比同一文档在新旧索引中的差异
GET old_products/_doc/1
GET new_products/_doc/1六、流量切换
1. 只读流量切换
POST /_aliases
{actions: [{ remove: { alias: products_read, index: old_products } },{ add: { alias: products_read, index: new_products } }]
}2. 验证查询结果一致性
// 对比相同查询在新旧索引中的结果
GET products_read/_search?qname:iphone
GET old_products/_search?qname:iphone3. 写入流量切换
POST /_aliases
{actions: [{ remove: { alias: products_write, index: old_products } },{ add: { alias: products_write, index: new_products } }]
}七、收尾工作
1. 恢复新索引配置
PUT /new_products/_settings
{index.refresh_interval: 1s,index.translog.durability: request,index.translog.sync_interval: 5s
}2. 验证性能和稳定性
监控集群负载验证业务查询性能验证写入吞吐量
3. 删除旧索引可选
DELETE /old_products八、回滚策略
若出现问题可快速回滚
POST /_aliases
{actions: [{ remove: { alias: products_read, index: new_products } },{ add: { alias: products_read, index: old_products } },{ remove: { alias: products_write, index: new_products } },{ add: { alias: products_write, index: old_products } }]
}九、优化建议
分批次迁移对百万级数据按时间或 ID 范围分批 Reindex避免单次任务过大限流控制
POST /_reindex?wait_for_completionfalse
{source: { index: old_products },dest: { index: new_products },requests_per_second: 100 // 每秒处理 100 个请求
}临时扩容迁移期间增加专用协调节点减轻数据节点压力预热缓存迁移后对热点数据执行预热查询自动化脚本使用 Python 脚本编排整个流程
import requests
import timedef reindex_with_progress():# 启动 reindexresponse requests.post(http://localhost:9200/_reindex?wait_for_completionfalse,json{source: {index: old_products},dest: {index: new_products}})task_id response.json()[task]# 监控进度while True:status requests.get(fhttp://localhost:9200/_tasks/{task_id}).json()completed status[task][status][completed]total status[task][status][total]print(f进度: {completed}/{total} ({completed/total*100:.2f}%))if status[completed]:breaktime.sleep(5)reindex_with_progress()十、风险控制
灰度发布先迁移部分数据进行验证熔断机制设置错误率阈值超过则自动停止迁移预留资源确保集群有 30% 以上的空闲资源夜间执行选择业务低峰期执行核心操作