elasticsearch 写流程
基于版本:2.3.2
这里分析 es 写入单个 doc 和 bulk 请求的处理流程,为了先弄清楚 es 的流程,暂时不涉及 lucene 内部处理
在分析读流程时,将流程划分成功各个阶段,在写流程中,由于可能涉及到的节点较多,把流程按不同节点执行的操作划分更清楚一些.
通过分析写流程,我们得到结论:
1. 集群 red 时,写操作到达未损坏的分片是可以正常处理的
2. 对索引别名执行写操作,别名只关联了一个索引的情况下是可以正常处理的
3. 自动创建索引是阻塞的过程,创建完成才会继续后面的流程
4. 关于写一致性,如果 activeshard 不足,不会等待,而是直接失败
5. 只有内容路由阶段才有重试机制,磁盘写失败等不会重试,而是直接失败,之后进入 shard 迁移流程
6. Master 记录的元数据信息中不含某个 doc 的版本号,因此写完不会上报 Master
7. es2.0之前的版本无法保证写入流程不丢失数据,之后的版本可以调整配置保证
put 基本流程
新建、索引和删除 请求都是 写 操作, 必须在主分片上面完成之后才能被复制到相关的副本
写操作可能会发生三个节点:协调节点,主分片所在节点,副本分片所在节点
下面从这三个节点上发生的流程分别梳理.
put 详细流程
详细流程图:
1. 协调节点流程
协调节点负责创建索引,转发请求到主分片节点,等待响应,回复客户端
路径:action.index.TransportIndexAction#doExecute
检查索引是否存在,如果不存在,且允许自动创建索引,就创建他
创建索引请求被发送到 master, 直到收到其 Response 之后,进入写doc操作主逻辑.master 什么时候返回Response? 在 master 执行完创建索引流程,将新的 clusterState 发布完毕后才会返回.那什么才算发布完毕呢?默认情况下,master 发布 clusterState 的 Request 收到半数以上的节点 Response, 认为发布成功.负责写数据的节点会先走一遍内容路由的过程已处理没有收到最新 clusterState 的情况.
参考手册
以下是为内容路由的过程,目的是找到主分片所在节点,转发请求:
action.support.replication.TransportReplicationAction.ReroutePhase#doRun
其中几个关键环节:
加载映射:加载请求指的的 type, 或使用默认映射
检查别名:如果是索引别名,做一些检查工作,有下列之一的将做失败处理:
- 关联了一个以上的索引.
- 别名设置了 routing, 且与请求的不一致
- 请求参数中的 routing 指定了多个.(search_routing允许设置多个)
获取主分片路由:之后判断是否自动生成 docid,计算 shardid(参考读流程),获取到主分片路由信息,如果主分片处于不可用状态,将进行重试,重试的触发时机为收到新的 clusterState ,或者1分钟超时.
转发请求:主分片确定后,将请求发送到主分片所在节点,等待其 Response,协调过程至此完毕,主节点可能在本地也可能在其他节点,如果在本地,不会产生网络请求,通过函数调用到相应的处理模块.
2. 主分片所在节点处理流程
主分片所在节点负责在本地写主分片,转发写副本分片请求,等待响应,回复协调节点.
在上一个流程中,目标是发送到网络中其他节点的,那么节点收到这个请求后,首先,将内容路由的流程重新走一遍~~ 因为在索引创建的过程返回后,并非集群的所有节点都有了最新的 clusterState,写操作落到这个节点上就会写失败.
接下来,进入primary 阶段:
action.support.replication.TransportReplicationAction.PrimaryPhase#doRun
这一部分很关键,贴一段比较长的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
protected void doRun() throws Exception { setPhase(task, "primary"); // request shardID was set in ReroutePhase assert request.shardId() != null : "request shardID must be set prior to primary phase"; final ShardId shardId = request.shardId(); //写一致性检查,检查失败做整体流程失败处理 final String writeConsistencyFailure = checkWriteConsistency(shardId); if (writeConsistencyFailure != null) { finishBecauseUnavailable(shardId, writeConsistencyFailure); return; } final ReplicationPhase replicationPhase; try { indexShardReference = getIndexShardOperationsCounter(shardId); //写主分片 Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); }//准备要写的副本分片 replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); } catch (Throwable e) { request.setCanHaveDuplicates(); if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { if (logger.isTraceEnabled()) { logger.trace("failed to execute [{}] on [{}]", e, request, shardId); } } else { if (logger.isDebugEnabled()) { logger.debug("failed to execute [{}] on [{}]", e, request, shardId); } } finishAsFailed(e); return; } //写副本 finishAndMoveToReplication(replicationPhase); } |
无论是写单个 doc, 还是 bulk 写多个,都是这个处理逻辑, bulk 只是改写了shardOperationOnPrimary和shardOperationOnReplica,可以看出,写异常没有重试逻辑
检测写一致性
这里是很容易误解的地方,默认的一致性策略为半数以上,很多文章误认为写完半数以上的 shard 认为成功,而真正的含义是:写操作之前,涉及到要写的shard,可用 shard 数过半时,才执行写操作.参考手册
特别的,当数据分片设置为1个副本分片时(共两份数据),默认一致性行为是主分片可以写入成功即可.
写主分片
路径:
action.index.TransportIndexAction#shardOperationOnPrimary
index.shard.IndexShard#index||create
index 还是 create
如果putAPI 指定了 op_type=create,或者自动生成 ID ,会进入 create 过程,否则进入 index 过程.
create 路径:index.engine.InternalEngine#innerCreate
index 路径:index.engine.InternalEngine#innerIndex
如果是自动生成 id, 不检查数据版本号
写 doc流程:
加锁
获取版本号
从内存:versionMap 或者磁盘读取
检查数据版本
检查请求中的版本号与数据版本号是否一致(这里就是实现乐观并发控制),不一致按失败处理,否则更新版本号
写入lucene
通过版本号判断 doc 是否已存在,调用 lucene 的 add 或 update 接口写入数据
写 translog
写结束
至此 doc 写入完毕,释放锁.
处理可能 refresh
检查请求中是否有 refresh 设置,决定是否刷盘.
失败:如果主分片写失败,不会进入写副本流程
转发写副本请求
路径:TransportReplicationAction.ReplicationPhase#doRun
现在已经为要写的副本shard准备了一个列表,循环处理每个shard, 跳过unassigned 的,向每个目标节点发送请求,等待响应,这些都是异步的.
在等待 Response 的过程中,本节点发出了多少个 Request, 就要等待多少个 Response, 无论这些 Response 是成功的还是失败的,直到超时.收集到全部的 Response 后,执行doFinish(),给主分片节点返回消息,告知其哪些成功,哪些失败了.
3.副本分片节点流程
路径:TransportReplicationAction.AsyncReplicaAction#doRun
与主分片写 doc 一样的过程,完毕后回复主分片节点.
bulk 流程
客户端向 Node 1 发送 bulk 请求。
Node 1 为每个节点创建一个批量请求,并将这些请求并行转发到每个包含主分片的节点主机。
主分片一个接一个按顺序执行每个操作。当每个操作成功时,主分片并行转发新文档(或删除)到副本分片,然后执行下一个操作。 一旦所有的副本分片报告所有操作成功,该节点将向协调节点报告成功,协调节点将这些响应收集整理并返回给客户端。
官方的原图的第2步骤少了一条写本地主 shard 的过程,下图比较丑的那条线是我补充的.
主要处理类 TransportShardBulkAction 继承关自单个 doc 处理类:TransportReplicationAction,复用单个 doc 处理逻辑
入口路径:action.bulk.TransportBulkAction#doExecute
详细流程:
创建索引
BulkRequest是接口收到的原始 bulk 请求列表,遍历 BulkRequest,去除重复索引名称, 存到 indicesAndTypes,然后从中遍历,异步创建索引.等待所有索引创建完毕后进入下一步.在此阶段,如果某个 index 创建失败,将把位于此 index 的所有请求做失败处理.而其他创建成功的 index 上的请求会进入下一换环节
合并请求
遍历 BulkRequest 请求,为每个请求中计算 shardid,重新组织成以 shardid 为单位的结构:
Map<ShardId, List<BulkItemRequest>> requestsByShard
基于上述结构再把落到某个 shard 上的请求组织成下游能处理的 BulkShardRequest 结构
协调处理每个 shard 上的请求
作为协调节点,异步并行发出以 shard 为单位的请求,循环执行:
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>()
等待响应,每个响应也是以 shard 为单位的.如果某个 shard 的响应中部分 doc 写失败了,将异常信息填充到 Response 中,整体请求做成功处理.
主分片所在节点处理逻辑
作为主分片所在节点,顺序执行主分片的每个写操作,如果某个操作失败了,不会重试,也不会把位于此 shard 上的全部请求做失败处理,而是将失败的条目标记原因,进行下一条处理.
待主分片所有操作处理完毕,执行写副本.并行向其他节点发出写副本请求,等待响应,
主要逻辑 TransportShardBulkAction 由于是继承自 TransportReplicationAction,其执行入口为:
TransportReplicationAction.ReroutePhase#doRun
参考上面的大片代码.
派生类 TransportShardBulkAction 改写了父类的:
1 2 3 4 |
shardOperationOnPrimary shardOperationOnReplica |
两个函数来改写主分片和副本上的写逻辑,将原来的写一个改成写多个.其中,shardOperationOnPrimary 在主分片所在节点执行,shardOperationOnReplica 在副本分片所在节点执行.
异常流程
在一个 shard 上执行的一些操作可能会产生 IO 异常之类的情况,一个 shard 上的 CRUD 等操作在 ES 里由一个Engine对象封装,在Engine处理过程中,部分操作产生的部分异常 ES 会认为应当关闭此Engine,上报 master.例如,系统 io 层面的写入失败.
Engine类中的maybeFailEngine()负责检查是否应当关闭引擎:failEngine()
可能会触发maybeFailEngine()的有以下操作:
createSearcherManager
create: 创建文档
index: 索引文档
delete: 删除文档
delete_by_query
refresh
sync commit
flush
force merge
renew sync commit
注意,其中不包含 get 操作,也就是说读取 doc 失败不会触发 shard 迁移.
异常类型主要包括:OutOfMemoryError,IllegalStateException,IOException,并非每个操作都有这三种异常.典型的代码片段:
1 2 3 4 5 6 |
catch (OutOfMemoryError | IllegalStateException | IOException t) { maybeFailEngine("create", t); throw new CreateFailedEngineException(shardId, create.type(), create.id(), t); } |
关闭引擎的具体过程
首先将存储设置为异常:
Lucene.isCorruptionException(failure)
删除 shard:
indexService.removeShard(shardRouting.getId(), message);
向 master 节点发送 SHARD_FAILED_ACTION_NAME 请求:
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
master 对 SHARD_FAILED_ACTION_NAME的处理
提交集群状态更新消息:
1 2 3 4 5 6 7 8 |
clusterService.submitStateUpdateTask( "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", shardRoutingEntry, BasicClusterStateTaskConfig.create(Priority.HIGH), shardFailedClusterStateHandler, shardFailedClusterStateHandler); |
从代码来看,集群状态更新完毕后执行shardFailedClusterStateHandler任务,任务中的 clusterStatePublished() 会 检查所有的unassigned shard,并进行 reroute:
1 2 3 4 5 6 7 8 9 10 11 12 |
public void clusterStatePublished(ClusterState newClusterState) { int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size(); if (numberOfUnassignedShards > 0) { String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); if (logger.isTraceEnabled()) { logger.trace(reason + ", scheduling a reroute"); } routingService.reroute(reason); } } |
异常流程总结
- 路由阶段失败,会等待集群状态更新,拿到更新后,进行重试,再次失败,仍旧等集群状态更新,直到超时1m 为止.超时后仍失败的做整体请求失败处理.
- 路由阶段之后,当写一个 doc 失败,集群不会重试.而是从本地删除 shard, 然后向 master 汇报,删除是以 shard 为单位的.
- 写入过程中,主 shard写入是阻塞的过程,只有写入成功,才会发起写副本请求,如果主 shard 写失败,整个请求做失败处理.如果有部分副本写失败,整个请求做成功处理
手册中的错误
关于写入数据时的超时,中文手册中说:
如果没有足够的副本分片会发生什么? Elasticsearch会等待,希望更多的分片出现。默认情况下,它最多等待1分钟。 如果你需要,你可以使用 timeout 参数 使它更早终止: 100 100毫秒,30s 是30秒。
但是从代码来看,没有这种过程,副本数不够时,整个请求做失败处理.
数据安全性
回想整个写入流程, ES有没有数据丢失的风险,写入过程返回成功是否代表数据一定安全,能否作为可靠的中心存储?
ES 的设计倾向于更高的写入速度,所以默认设置下并没有保证数据写入过程绝对可靠:
- doc 写到 shard 时默认是被缓存的,不会立即刷到硬盘
- 事务日志也是有缓存的,不会立即刷到硬盘
关于这两个缓存的刷入,有两个对应的选项控制:
- refresh选项控制lucene 数据从内存转移到磁盘缓存,注意这里是到缓存并非硬盘
- translog.flush控制translog刷入磁盘.可以通过接口或配置来控制.
refresh
用于控制搜索的实时性.刚刚写入的数据多长时间可以被搜到
flush
用于控制索引数据的一致性.索引数据通过 translog 恢复
在默认配置下,translog的刷新策略为:每次写入都刷新
你需要在搜索实时性,写入速度和数据安全性直接综合考虑以调整这两方面的配置.
因此:
1.集群为 GREEN 状态:只要 translog 是可靠的,索引数据就是可靠的,而 translog 的flush 特性与 ES 版本有关,参考官方的说明可以得到以下结论:
1 2 3 |
2.0之前的版本无法保证可靠性,之后的版本可以调整配置达到完全可靠 |
2.当副本数量为1(数据存2份),其中一份产生异常,导致 activeShard 等于1,默认情况下,此时写操作可以成功.此时两份 shard 产生不一致,这时最后一份数据所在节点挂掉,然后最早产生异常的节点先启动,其老数据 shard 成为主分片,数据最新的节点后启动,那么recovery 过程会从主分片同步,删除新增的数据,导致数据丢失.因此,非 GREEN 状态下,写操作不是完全可靠的
5.5 版本差异
协调节点在开始处理时会先检测集群状态,集群异常则取消写入。例如,master 节点不存在,会阻塞等待 master 节点直至超时。
1 2 3 4 5 6 |
final ClusterState clusterState = observer.setAndGetObservedState(); if (handleBlockExceptions(clusterState)) { return; } |
因此索引为 red 时,如果 master节点存在,数据可以写到正常 shard,master 节点不存在,协调节点会阻塞等待或取消写入。
参考:
https://kibana.logstash.es/content/elasticsearch/principle/realtime.html?spm=5176.100239.0.0.sljkxE
下面这篇对不可靠原因的推理是错误的,供参考
http://www.hansight.com/blog-elasticsearch-data-loss-scenarios.html
几个疑问
- 为什么写失败会实施 shard 迁移机制,而读失败不会?经过试验之后基本认为:es 将读失败交给 recovery 流程修复数据,但我的测试并没有得出哪些条件会触发 recovery,需要进一步确认.
- 既然读失败交给 recovery 处理,shard 不迁移,如果磁盘坏了recovery 失败怎么办?
- 写失败, shard 进行迁移的时候,目标节点会排除故障 shard 所在节点吗?
- 通过索引别名进行搜索时,是从相关索引遍历的吗?别名关联太多索引有没有性能问题?
(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)
7 thoughts on “elasticsearch 写流程”
最近在基于ELK做快速迭代做AIOps产品原型,遇到很多坑,笔者这些文章写的质量很高,远比国内搜索引擎上大部分有深度,希望有机会可以多交流!
感谢,互相学习!可以加 es 交流群:475241551
博主很厉害,学习了
请问,ES写入的时候,写入了主分片,只有同步到所有副分片,才能查询吧?如果未同步,就能查询,岂不是会导致:两次查询分别走主副分片,结果数量会先大后小——这个用户体验很差了吧?
没错,是这样的,主分片写完毕,副分片写完毕之前,从主副分片会查到不同的结果。你可以指定从主分片查询。
您好,这里提到的写一致性 write_consistency 和wait_for_active_shards 这个有什么联系吗