elasticsearch indices.recovery 流程分析与速度优化
目录
基于版本:5.5.3
recovery 是 es 数据恢复,保持数据一致性的过程,触发条件包括:从快照备份恢复,节点加入和离开,索引的_open操作等.
recovery 由clusterChanged触发,进入到:
1 2 3 4 |
applyNewOrUpdatedShards-> applyInitializingShard |
根据数据分片性质,分为主分片和副本分片恢复流程.
主分片从 translog 自我恢复,副本分片从主分片拉取数据进行恢复.
经历的阶段为:
init: Recovery has not started
index: Reading index meta-data and copying bytes from source to destination
start: Starting the engine; opening the index for use
translog: Replaying transaction log
finalize: Cleanup
done: Complete
从
1 2 3 |
cluster.IndicesClusterStateService.clusterChanged |
触发,进入
1 2 3 |
IndicesClusterStateService#applyInitializingShard |
每次处理一个 shard
1 2 3 4 5 6 7 8 9 10 11 12 |
if (isPeerRecovery(shardRouting)) {// 从远程主分片恢复 try { RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA; recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData)); } else {//Primary 进行自我恢复,不需要其他节点的支持 indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() { ... }); } } |
主分片恢复流程
实现的主要思路是:系统的每次 flush 操作会清理相关 translog, 因此 translog 中存在的数据就是 lucene 索引中可能尚未刷入的数据,主分片的 recovery 就是把 translog 中的内容转移到 lucene.
具体做法是:把当前 translog 做快照,重放每条记录,调用标准的index 操作创建或更新 doc来恢复,然后再处理recovery期间新写入的数据.
路径:org/elasticsearch/index/shard/StoreRecoveryService.java
在新的线程池任务中执行:
1 2 3 |
recoverFromStore(indexShard, indexShouldExists, recoveryState); |
然后会进入InternalEngine构造函数:
1 2 3 4 5 6 7 8 |
if (skipInitialTranslogRecovery) { // make sure we point at the latest translog from now on.. commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID)); } else {//具体的从 Translog 恢复的实现 recoverFromTranslog(engineConfig, translogGeneration); } |
skipInitialTranslogRecovery一定为 false, 进入recoverFromTranslog,从 translog 做个快照,挨个恢复:
1 2 3 4 5 6 7 8 |
while ((operation = snapshot.next()) != null) { try { performRecoveryOperation(engine, operation, true); opsRecovered++; } } |
重放完毕后,如果重放写入的数据大于0,则 flush, 否则写一个 synced flush id:syncId
1 2 3 4 5 6 7 |
if (opsRecovered > 0) { flush(true, true); } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); } |
副本分片恢复流程
从主分片恢复到副本分片主要有两个阶段(在主分片节点执行):
phase1
对比分段信息,如果 syncid 相同且 doc 数量相同,则跳过,否则复制整个分段
phase2
将当前 translog 做快照,发送所有的 translog operation 到对端节点,不限速
恢复过程中的数据传输方向,主分片节点为 Source,副本分片节点为 Target
主要处理逻辑:副本分片节点为 RecoveryTarget类,主分片节点为 RecoverySource 类.
首先,副本分片的恢复也会启动一个新的线程池任务:
1 2 3 4 |
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); threadPool.generic().execute(new RecoveryRunner(recoveryId)); |
任务处理模块:indices/recovery/RecoveryTarget.java
在doRecovery函数中,将本次要恢复的 shard 相关信息,如 shardid,metadataSnapshot 重要的是metadataSnapshot中包含 syncid等,封装成 StartRecoveryRequest ,RPC 发送出去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); recoveryStatus.indexShard().prepareForIndexRecovery(); recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() { @Override public void run() throws InterruptedException { responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() { @Override public RecoveryResponse newInstance() { return new RecoveryResponse(); } }).txGet()); } }); |
对端(主分片节点)处理模块:/indices/recovery/RecoverySource.java
入口:StartRecoveryTransportRequestHandler.messageReceived
主要处理逻辑:RecoverySourceHandler.recoverToTarget()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
try (Translog.View translogView = engine.getTranslog().newView()) { final SnapshotIndexCommit phase1Snapshot; try { phase1Snapshot = shard.snapshotIndex(false);//对当前索引做快照 } try {//phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方 phase1(phase1Snapshot, translogView); } //当前的translogView 进行一次snapshot,然后发送 try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) { phase2(phase2Snapshot); } finalizeRecovery(); } |
在第一阶段,值得注意的是关于 syncid 的处理,如果两个分片有一致的 syncid, 且 doc 数相同,则跳过第一阶段.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
String recoverySourceSyncId = recoverySourceMetadata.getSyncId(); String recoveryTargetSyncId = request.metadataSnapshot().getSyncId(); final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId); if (recoverWithSyncId) { final long numDocsTarget = request.metadataSnapshot().getNumDocs(); final long numDocsSource = recoverySourceMetadata.getNumDocs(); if (numDocsTarget != numDocsSource) { throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number of docs differ: " + numDocsTarget + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsSource + "(" + request.targetNode().getName() + ")"); } } else{ //计算 diff 并发送 } |
在第二阶段,从当前translogView进行快照后批量发送,
对端的处理模块:RecoveryTarget.TranslogOperationsRequestHandler
主要是调用 recoveryStatus.indexShard().performBatchRecovery
重放 translog
recovery 慢的原因分析
最慢的过程在于副本分片恢复的第一阶段,各节点单独执行分段合并逻辑,合并后的分段基本不会相同,所以拷贝 lucene 分段是最耗时的,其中有一些相关的限速配置:
1 2 3 4 5 |
cluster.routing.allocation.node_concurrent_recoveries 单个节点最大并发进/出 recovery 数,默认2 indices.recovery.max_bytes_per_sec 默认40m indices.recovery.concurrent_streams 单个节点恢复时可以打开的网络流数量,默认3 |
即使关闭限速,这个阶段仍然可能非常漫长,目前最好的方式就是先执行 synced flush, 但是 syncd flush 并且本身也可能比较慢,因为我们常常为了优化写入速度而加大 translog 刷盘周期,也会延长 translog 恢复阶段时间
在 es 6.0中再次优化这个问题,思路是给每次写入成功的操作都分配一个序号,通过对比序号就可以计算出差异范围,在实现方式上, 添加了global checkpoint 和 local checkpoint,checkpoint,主分片负责维护global checkpoint,代表所有分片都已写入到了这个序号的位置,local checkpoint代表当前分片已写入成功的最新位置,恢复时通过对比两个序列号,计算出缺失的数据范围,然后通过translog重放这部分数据,同时 translog 会为此保留更长的时间.
参考:
https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0
https://github.com/elastic/elasticsearch/issues/10708
synced flush 机制
es 为了解决副本分片恢复过程第一阶段的漫长过程引入synced flush,默认情况下5分钟没有写入操作的索引被标记为inactive,执行 synced flush,生成一个唯一的 syncid,写入到所有 shard, 这个 syncid是shard 级,拥有相同syncid的 shard具有相同的 lucene 索引.
synced flush的实现思路是先执行普通的 flush 操作,各分片 flush 成功后,他们理应有相同的 lucene 索引内容,无论分段是否一致.于是给大家分配一个 id, 表示数据一致.但是显然 synced flush 期间不能有新写入的内容,对于这种情况, es 的处理是:让 synced flush 失败,让写操作成功.在没有执行 flush 的情况下已有 syncid 不会失效.当某个 shard 上执行了普通 flush 操作会删除已有 syncid,因此,synced flush操作是一个不可靠操作,只适用于冷索引.
主要实现:
1 2 3 4 5 6 7 8 9 |
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) { indexShard.flush(flushRequest); } private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), } |
indexShard.syncFlush
只是写了一个 id 进去:
代码路径:
1 2 3 4 |
InternalEngine#syncFlush commitIndexWriter(indexWriter, translog, syncId); |
副分片如何做到和主分片一致的
index.recovery 的一个难题在于如何维护主副分片一致性。假设从副分片 recovery 之前到 recovery 完毕一致有写操作,他是如何实现一致的呢?
在2.0 版本之前,副本recovery 要经历三个阶段:
- phase1:将主分片的 lucene做快照,发送到 target。期间不阻塞索引操作,新增数据写到主分片的 translog
- phase2:将主分片 translog 做快照,发送到 target 重放,期间不阻塞索引操作。
- phase3:为主分片加写锁,将剩余的translog 发送到 target。此时数据量很小,写入过程的阻塞很短。
从第一阶段开始,就要阻止 lucene 执行commit 操作,避免 translog 被刷盘后清除。
本质上来说,只要流程上允许将写操作阻塞一段时间,实现主副一致是比较容易的。但是后来(从2.0开始)官方觉得不太好:
为了安全地完成 recoveries / relocations,我们必须在 recovery 开始后保持所有的operation全部 done,以便重放。目前我们实现这点是通过防止engine flush,从而确保操作operations都在 translog 中。这不是一个问题,因为我们确实需要这些operations。但是如果另一个 recovery 并发启动,可能会有不必要的长时间重试。另外如果我们在这个时候因为某种原因关闭了engine(比如一个节点重新启动),当我们回来的时候,我们需要恢复一个很大的 translog。
为了解决这个问题,translog被改为基于多个文件而不是一个文件。 这允许recovery保留所需的文件,同时允许engine执行flush,以及执行lucene的commit(这将创建一个新的translog文件)。
重构了 translog 文件管理模块,允许多个文件。
translog 维护一个引用文件的列表。包括未完成的recovery 以及那些包含尚未提交到 lucene 的operations的文件
引入了新的 translog.view概念,允许 recovery 获取一个引用,包括所有当前未提交的 translog 文件,以及所有未来新创建的 translog 文件,直到 view 关闭。他们可以使用这个 view 做operations的遍历操作
phase3被删除,这个阶段是重放operations,同时防止新的写入到engine。这是不必要的,因为自 recovery 开始,标准的 index 操作会发送所有的operations到正在recovery中的 shard。重放recovery 开始时获取的 view 中的所有operations足够保证不丢失任何operations。
从2.0开始,phase3被删除。对于如何做到主副一致的,描述的很模糊。分析完相关代码后,整理流程如下:
先创建一个 Translog.view,然后
- phase1:将主分片的 lucene 做快照,发送到 target。期间允许索引操作和 flush 操作。发送完毕后,告知 target 启动 engine,phase2开始之前,新的索引操作都会转发副分片正常执行。
- phase2:将主分片的 translog 做快照,发送到 target 去重放。
完整性:
phase2 对translog 的快照包含了从 phase1开始的新增操作,而 phase2开始之前,副分片已经可以正常处理写操作,只要把 phase2的 translog 重放,就可以保证副分片不丢数据
一致性:
由于没有了阻塞写操作的第三阶段,接下来的问题就是解决 phase1和 phase2之间的写操作,与 phase2重放操作之间的时序和冲突问题。在 phase1执行完毕后,副分片已经可以正常处理写请求,副分片的新增写操作和 translog 重放的写操作是并行执行的。如果 translog 重放慢,又把他写会老数据怎么办?
es 现在的机制是在写操作中做异常处理。
写操作有三种类型:新增、更新、删除,分别看一下处理机制:
新增:不存在冲突问题,不需要处理。
更新:判断本次操作的版本号是否小于 lucene 中 doc 的版本号,如果小于,则放弃本次操作。
Index,Delete,都继承自Operation,每个Operation都有一个版本号,这个版本号就是 doc 版本号。对于副分片的写流程来说,正常情况下是主分片写成功后,相应 doc 写入的版本号被放到转发写副分片的请求中。对于更新来说,就是主分片将原 doc 版本号+1后转发的副分片来的。在对比版本号的时候:
1 2 3 |
expectedVersion = 写副分片请求中的 version = 写主分片成功后的 version |
副分片在InternalEngine#index函数中通过plan判断是否写到 lucene:
1 2 3 4 |
// non-primary mode (i.e., replica or recovery) plan = planIndexingAsNonPrimary(index); |
在planIndexingAsNonPrimary函数中,通过
1 2 3 |
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation op) throws IOException { assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); final VersionValue versionValue = resolveDocVersion(op); if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ? OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER; } } |
判断当前操作的版本号是否低于 lucene 中的版本号。
对比部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
EXTERNAL((byte) 1) { @Override public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) { if (currentVersion == Versions.NOT_FOUND) { return false; } if (expectedVersion == Versions.MATCH_ANY) { return true; } if (currentVersion >= expectedVersion) { return true; } return false; } |
如果 translog 重放的操作在写一条老数据,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,
plan 的最终结果就是:plan = IndexingStrategy.skipAsStale,后面就会跳过写 lucene 和 translog 的逻辑。
删除:判断本次操作中的版本号是否小于 lucene 中 doc 的版本号,如果小于,放弃本次操作。
同样,在InternalEngine#delete函数中,
1 2 3 |
plan = planDeletionAsNonPrimary(delete); |
判断是否要从 lucene 删除:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(delete); final DeletionStrategy plan; if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = DeletionStrategy.processButSkipLucene(false, delete.version()); } else { plan = DeletionStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version()); } return plan; } |
通过compareOpToLuceneDocBasedOnVersions判断本次操作是否小于 lucenne 中 doc 的版本号,与 Index 操作时使用相同的函数。
如果 translog 重放的是一个老 的删除操作,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,
plan 的最终结果就是:plan = DeletionStrategy.processButSkipLucene,后面就会跳过从 lucene 删除的逻辑。
提升 recovery 速度的建议
使用 _forcemerge
由于 synced flush 不是可靠操作,以下操作都会将其打断:
1. 因写入过程被打断
2. 因普通 flush 被删除 syncdid
3. 因系统自行merge后 flush 删除syncdid
对于冷索引,可以考虑将 segment 强制合并为一个分段,这样各分片 segment 一致,可以跳过副本恢复的第一阶段.
执行:
1 2 3 |
forcemerge?max_num_segments=1 |
集群 FullRestart 的建议操作过程
- 停止写入
- 禁用 shard allocation
1 2 3 |
curl -XPUT localhost:9200/_cluster/settings { "persistent": { "cluster.routing.allocation.enable": "none" }} |
- 执行 synced flush
1 2 3 |
curl -XPOST localhost:9200/_flush/synced |
- 重启集群
- 等待到 yellow 状态后,启用 allocation
1 2 3 |
curl -XPUT localhost:9200/_cluster/settings { "persistent": { "cluster.routing.allocation.enable": "all" }} |
- 等待 recovery 完毕
- 开启写入程序
一些用于查看 recovery 状态的命令
1 2 3 4 5 |
curl localhost:9200/{index}/_stats?level=shards&pretty curl localhost:9200/{index}/_recovery?pretty&human&detailed=true curl localhost:9200/_cat/recovery |
参考:
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-optimize.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/indices-recovery.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/restart-upgrade.html
http://www.jianshu.com/p/0d0f3d2b9ecd
https://elasticsearch.cn/article/38
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/shards-allocation.html
https://github.com/elastic/elasticsearch/pull/10624
(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)