elasticsearch 写流程

elasticsearch 写流程


基于版本:2.3.2

这里分析 es 写入单个 doc 和 bulk 请求的处理流程,为了先弄清楚 es 的流程,暂时不涉及 lucene 内部处理

在分析读流程时,将流程划分成功各个阶段,在写流程中,由于可能涉及到的节点较多,把流程按不同节点执行的操作划分更清楚一些.

通过分析写流程,我们得到结论:
1. 集群 red 时,写操作到达未损坏的分片是可以正常处理的
2. 对索引别名执行写操作,别名只关联了一个索引的情况下是可以正常处理的
3. 自动创建索引是阻塞的过程,创建完成才会继续后面的流程
4. 关于写一致性,如果 activeshard 不足,不会等待,而是直接失败
5. 只有内容路由阶段才有重试机制,磁盘写失败等不会重试,而是直接失败,之后进入 shard 迁移流程
6. Master 记录的元数据信息中不含某个 doc 的版本号,因此写完不会上报 Master
7. es2.0之前的版本无法保证写入流程不丢失数据,之后的版本可以调整配置保证

put 基本流程


新建、索引和删除 请求都是 写 操作, 必须在主分片上面完成之后才能被复制到相关的副本

写操作可能会发生三个节点:协调节点,主分片所在节点,副本分片所在节点
下面从这三个节点上发生的流程分别梳理.

put 详细流程


详细流程图:

1. 协调节点流程

协调节点负责创建索引,转发请求到主分片节点,等待响应,回复客户端

路径:action.index.TransportIndexAction#doExecute
检查索引是否存在,如果不存在,且允许自动创建索引,就创建他

创建索引请求被发送到 master, 直到收到其 Response 之后,进入写doc操作主逻辑.master 什么时候返回Response? 在 master 执行完创建索引流程,将新的 clusterState 发布完毕后才会返回.那什么才算发布完毕呢?默认情况下,master 发布 clusterState 的 Request 收到半数以上的节点 Response, 认为发布成功.负责写数据的节点会先走一遍内容路由的过程已处理没有收到最新 clusterState 的情况.
参考手册

以下是为内容路由的过程,目的是找到主分片所在节点,转发请求:
action.support.replication.TransportReplicationAction.ReroutePhase#doRun
其中几个关键环节:

加载映射:加载请求指的的 type, 或使用默认映射

检查别名:如果是索引别名,做一些检查工作,有下列之一的将做失败处理:

  • 关联了一个以上的索引.
  • 别名设置了 routing, 且与请求的不一致
  • 请求参数中的 routing 指定了多个.(search_routing允许设置多个)

获取主分片路由:之后判断是否自动生成 docid,计算 shardid(参考读流程),获取到主分片路由信息,如果主分片处于不可用状态,将进行重试,重试的触发时机为收到新的 clusterState ,或者1分钟超时.

转发请求:主分片确定后,将请求发送到主分片所在节点,等待其 Response,协调过程至此完毕,主节点可能在本地也可能在其他节点,如果在本地,不会产生网络请求,通过函数调用到相应的处理模块.

2. 主分片所在节点处理流程

主分片所在节点负责在本地写主分片,转发写副本分片请求,等待响应,回复协调节点.

在上一个流程中,目标是发送到网络中其他节点的,那么节点收到这个请求后,首先,将内容路由的流程重新走一遍~~ 因为在索引创建的过程返回后,并非集群的所有节点都有了最新的 clusterState,写操作落到这个节点上就会写失败.
接下来,进入primary 阶段:
action.support.replication.TransportReplicationAction.PrimaryPhase#doRun

这一部分很关键,贴一段比较长的代码:

无论是写单个 doc, 还是 bulk 写多个,都是这个处理逻辑, bulk 只是改写了shardOperationOnPrimary和shardOperationOnReplica,可以看出,写异常没有重试逻辑

检测写一致性
这里是很容易误解的地方,默认的一致性策略为半数以上,很多文章误认为写完半数以上的 shard 认为成功,而真正的含义是:写操作之前,涉及到要写的shard,可用 shard 数过半时,才执行写操作.参考手册

特别的,当数据分片设置为1个副本分片时(共两份数据),默认一致性行为是主分片可以写入成功即可.

写主分片
路径:
action.index.TransportIndexAction#shardOperationOnPrimary
index.shard.IndexShard#index||create

index 还是 create
如果putAPI 指定了 op_type=create,或者自动生成 ID ,会进入 create 过程,否则进入 index 过程.
create 路径:index.engine.InternalEngine#innerCreate
index 路径:index.engine.InternalEngine#innerIndex
如果是自动生成 id, 不检查数据版本号

写 doc流程:

加锁
获取版本号
从内存:versionMap 或者磁盘读取
检查数据版本
检查请求中的版本号与数据版本号是否一致(这里就是实现乐观并发控制),不一致按失败处理,否则更新版本号
写入lucene
通过版本号判断 doc 是否已存在,调用 lucene 的 add 或 update 接口写入数据
写 translog
写结束
至此 doc 写入完毕,释放锁.

处理可能 refresh
检查请求中是否有 refresh 设置,决定是否刷盘.
失败:如果主分片写失败,不会进入写副本流程

转发写副本请求
路径:TransportReplicationAction.ReplicationPhase#doRun
现在已经为要写的副本shard准备了一个列表,循环处理每个shard, 跳过unassigned 的,向每个目标节点发送请求,等待响应,这些都是异步的.
在等待 Response 的过程中,本节点发出了多少个 Request, 就要等待多少个 Response, 无论这些 Response 是成功的还是失败的,直到超时.收集到全部的 Response 后,执行doFinish(),给主分片节点返回消息,告知其哪些成功,哪些失败了.

3.副本分片节点流程

路径:TransportReplicationAction.AsyncReplicaAction#doRun
与主分片写 doc 一样的过程,完毕后回复主分片节点.

bulk 流程

客户端向 Node 1 发送 bulk 请求。
Node 1 为每个节点创建一个批量请求,并将这些请求并行转发到每个包含主分片的节点主机。
主分片一个接一个按顺序执行每个操作。当每个操作成功时,主分片并行转发新文档(或删除)到副本分片,然后执行下一个操作。 一旦所有的副本分片报告所有操作成功,该节点将向协调节点报告成功,协调节点将这些响应收集整理并返回给客户端。

官方的原图的第2步骤少了一条写本地主 shard 的过程,下图比较丑的那条线是我补充的.

主要处理类 TransportShardBulkAction 继承关自单个 doc 处理类:TransportReplicationAction,复用单个 doc 处理逻辑

入口路径:action.bulk.TransportBulkAction#doExecute

详细流程:

创建索引
BulkRequest是接口收到的原始 bulk 请求列表,遍历 BulkRequest,去除重复索引名称, 存到 indicesAndTypes,然后从中遍历,异步创建索引.等待所有索引创建完毕后进入下一步.在此阶段,如果某个 index 创建失败,将把位于此 index 的所有请求做失败处理.而其他创建成功的 index 上的请求会进入下一换环节

合并请求
遍历 BulkRequest 请求,为每个请求中计算 shardid,重新组织成以 shardid 为单位的结构:
Map<ShardId, List<BulkItemRequest>> requestsByShard
基于上述结构再把落到某个 shard 上的请求组织成下游能处理的 BulkShardRequest 结构

协调处理每个 shard 上的请求
作为协调节点,异步并行发出以 shard 为单位的请求,循环执行:
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>()
等待响应,每个响应也是以 shard 为单位的.如果某个 shard 的响应中部分 doc 写失败了,将异常信息填充到 Response 中,整体请求做成功处理.

主分片所在节点处理逻辑
作为主分片所在节点,顺序执行主分片的每个写操作,如果某个操作失败了,不会重试,也不会把位于此 shard 上的全部请求做失败处理,而是将失败的条目标记原因,进行下一条处理.
待主分片所有操作处理完毕,执行写副本.并行向其他节点发出写副本请求,等待响应,

主要逻辑 TransportShardBulkAction 由于是继承自 TransportReplicationAction,其执行入口为:
TransportReplicationAction.ReroutePhase#doRun

参考上面的大片代码.
派生类 TransportShardBulkAction 改写了父类的:

两个函数来改写主分片和副本上的写逻辑,将原来的写一个改成写多个.其中,shardOperationOnPrimary 在主分片所在节点执行,shardOperationOnReplica 在副本分片所在节点执行.

异常流程


shard 异常

在一个 shard 上执行的一些操作可能会产生 IO 异常之类的情况,一个 shard 上的 CRUD 等操作在 ES 里由一个Engine对象封装,在Engine处理过程中,部分操作产生的部分异常 ES 会认为应当关闭此Engine,上报 master.例如,系统 io 层面的写入失败.
Engine类中的maybeFailEngine()负责检查是否应当关闭引擎:failEngine()
可能会触发maybeFailEngine()的有以下操作:

createSearcherManager
create: 创建文档
index: 索引文档
delete: 删除文档
delete_by_query
refresh
sync commit
flush
force merge
renew sync commit

注意,其中不包含 get 操作,也就是说读取 doc 失败不会触发 shard 迁移.

异常类型主要包括:OutOfMemoryError,IllegalStateException,IOException,并非每个操作都有这三种异常.典型的代码片段:

关闭引擎的具体过程
首先将存储设置为异常:
Lucene.isCorruptionException(failure)

删除 shard:
indexService.removeShard(shardRouting.getId(), message);

向 master 节点发送 SHARD_FAILED_ACTION_NAME 请求:
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);

master 对 SHARD_FAILED_ACTION_NAME的处理
提交集群状态更新消息:

从代码来看,集群状态更新完毕后执行shardFailedClusterStateHandler任务,任务中的 clusterStatePublished() 会 检查所有的unassigned shard,并进行 reroute:

异常流程总结

  1. 路由阶段失败,会等待集群状态更新,拿到更新后,进行重试,再次失败,仍旧等集群状态更新,直到超时1m 为止.超时后仍失败的做整体请求失败处理.
  2. 路由阶段之后,当写一个 doc 失败,集群不会重试.而是从本地删除 shard, 然后向 master 汇报,删除是以 shard 为单位的.
  3. 写入过程中,主 shard写入是阻塞的过程,只有写入成功,才会发起写副本请求,如果主 shard 写失败,整个请求做失败处理.如果有部分副本写失败,整个请求做成功处理

手册中的错误

关于写入数据时的超时,中文手册中说:

如果没有足够的副本分片会发生什么? Elasticsearch会等待,希望更多的分片出现。默认情况下,它最多等待1分钟。 如果你需要,你可以使用 timeout 参数 使它更早终止: 100 100毫秒,30s 是30秒。

但是从代码来看,没有这种过程,副本数不够时,整个请求做失败处理.

数据安全性

回想整个写入流程, ES有没有数据丢失的风险,写入过程返回成功是否代表数据一定安全,能否作为可靠的中心存储?
ES 的设计倾向于更高的写入速度,所以默认设置下并没有保证数据写入过程绝对可靠:

  1. doc 写到 shard 时默认是被缓存的,不会立即刷到硬盘
  2. 事务日志也是有缓存的,不会立即刷到硬盘

关于这两个缓存的刷入,有两个对应的选项控制:

  1. refresh选项控制lucene 数据从内存转移到磁盘缓存,注意这里是到缓存并非硬盘
  2. translog.flush控制translog刷入磁盘.可以通过接口或配置来控制.

refresh 用于控制搜索的实时性.刚刚写入的数据多长时间可以被搜到
flush 用于控制索引数据的一致性.索引数据通过 translog 恢复

在默认配置下,translog的刷新策略为:每次写入都刷新

你需要在搜索实时性,写入速度和数据安全性直接综合考虑以调整这两方面的配置.

因此:
1.集群为 GREEN 状态:只要 translog 是可靠的,索引数据就是可靠的,而 translog 的flush 特性与 ES 版本有关,参考官方的说明可以得到以下结论:

2.当副本数量为1(数据存2份),其中一份产生异常,导致 activeShard 等于1,默认情况下,此时写操作可以成功.此时两份 shard 产生不一致,这时最后一份数据所在节点挂掉,然后最早产生异常的节点先启动,其老数据 shard 成为主分片,数据最新的节点后启动,那么recovery 过程会从主分片同步,删除新增的数据,导致数据丢失.因此,非 GREEN 状态下,写操作不是完全可靠的

5.5 版本差异

协调节点在开始处理时会先检测集群状态,集群异常则取消写入。例如,master 节点不存在,会阻塞等待 master 节点直至超时。

因此索引为 red 时,如果 master节点存在,数据可以写到正常 shard,master 节点不存在,协调节点会阻塞等待或取消写入。

参考:
https://kibana.logstash.es/content/elasticsearch/principle/realtime.html?spm=5176.100239.0.0.sljkxE
下面这篇对不可靠原因的推理是错误的,供参考
http://www.hansight.com/blog-elasticsearch-data-loss-scenarios.html

几个疑问

  1. 为什么写失败会实施 shard 迁移机制,而读失败不会?经过试验之后基本认为:es 将读失败交给 recovery 流程修复数据,但我的测试并没有得出哪些条件会触发 recovery,需要进一步确认.
  2. 既然读失败交给 recovery 处理,shard 不迁移,如果磁盘坏了recovery 失败怎么办?
  3. 写失败, shard 进行迁移的时候,目标节点会排除故障 shard 所在节点吗?
  4. 通过索引别名进行搜索时,是从相关索引遍历的吗?别名关联太多索引有没有性能问题?

(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)

1 Star2 Stars3 Stars4 Stars5 Stars (1 人打了分, 平均分: 5.00 )
Loading...

7 thoughts on “elasticsearch 写流程

  1. 最近在基于ELK做快速迭代做AIOps产品原型,遇到很多坑,笔者这些文章写的质量很高,远比国内搜索引擎上大部分有深度,希望有机会可以多交流!

  2. 请问,ES写入的时候,写入了主分片,只有同步到所有副分片,才能查询吧?如果未同步,就能查询,岂不是会导致:两次查询分别走主副分片,结果数量会先大后小——这个用户体验很差了吧?

    1. 没错,是这样的,主分片写完毕,副分片写完毕之前,从主副分片会查到不同的结果。你可以指定从主分片查询。

发表评论

邮箱地址不会被公开。 必填项已用*标注