elasticsearch allocation 分析
目录
本文主要分析allocation 模块的结构和原理,然后以集群启动过程为例分析 allocation 模块的工作过程
什么是 allocation
分片分配就是把一个分片指派到集群中某个节点的过程. 分配决策由主节点完成,分配决策包含两方面:
- 哪些分片应该分配给哪些节点
- 哪个分片作为主分片,哪些作为副本分片
对于新建索引和已有索引, 分片分配过程也不尽相同. 不过不管哪种场景, ElasticSearch都通过两个基础组件完成工作: allocators
和deciders
. Allocators尝试寻找最优的节点来分配分片, deciders则负责判断并决定是否要进行这次分配.
- 对于新建索引, allocators负责找出拥有分片数最少的节点列表, 并按分片数量增序排序, 因此分片较少的节点会被优先选择. 所以对于新建索引, allocators的目标就是以更为均衡的方式为把新索引的分片分配到集群的节点中. 然后deciders依次遍历allocators给出的节点, 并判断是否把分片分配到该节点. 例如, 如果分配过滤规则中禁止节点A持有索引idx中的任一分片, 那么过滤器也阻止把索引idx分配到节点A中, 即便A节点是allocators从集群负载均衡角度选出的最优节点. 需要注意的是allocators只关心每个节点上的分片数, 而不管每个分片的具体大小. 这恰好是deciders工作的一部分, 即阻止把分片分配到将超出节点磁盘容量阈值的节点上.
-
对于已有索引, 则要区分主分片还是副本分片. 对于主分片, allocators只允许把主分片指定在已经拥有该分片完整数据的节点上. 而对于副本分片, allocators则是先判断其他节点上是否已有该分片的数据的拷贝(即便数据不是最新的). 如果有这样的节点, allocators就优先把把分片分配到这其中一个节点. 因为副本分片一旦分配, 就需要从主分片中进行数据同步, 所以当一个节点只拥分片中的部分时, 也就意思着那些未拥有的数据必须从主节点中复制得到. 这样可以明显的提高副本分片的数据恢复过程.
触发时机
- index 增删
- node 增删
- 手工 reroute
- replica数量改变
- 集群重启
allocation 模块结构概述
这个复杂的分配过程在一个叫 reroute 的函数中实现: AllocationService.reroute ,其函数对外有两种重载,一种是通过接口调用的手工 reroute, 一种是内部模块调用的 reroute
allocators
es 中有以下几个类型的 allocator:
Allocator负责为某个特定的shard 分配目的 node。每个Allocator的主要工作是根据某种逻辑得到一个节点列表,然后调用 deciders 去决策,根据决策结果选择一个目的 node。一句话概述每个 Allocator的作用:
- primaryShardAllocator:找到那些拥有某 shard 最新数据的 node
- replicaShardAllocator:找到磁盘上拥有这个 shard 数据的 node
- BalancedShardsAllocator:找到拥有最少 shard 个数的 node
对于这两类 allocator,我的理解是 gatewayAllocator 是为了找到现有 shard,shardsAllocator 是为了分配全新 shard。
Deciders
es5中有下列类型的决策器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
List<Class<? extends AllocationDecider>> expectedDeciders = Arrays.asList( MaxRetryAllocationDecider.class, ReplicaAfterPrimaryActiveAllocationDecider.class, RebalanceOnlyWhenActiveAllocationDecider.class, ClusterRebalanceAllocationDecider.class, ConcurrentRebalanceAllocationDecider.class, EnableAllocationDecider.class, NodeVersionAllocationDecider.class, SnapshotInProgressAllocationDecider.class, FilterAllocationDecider.class, SameShardAllocationDecider.class, DiskThresholdDecider.class, ThrottlingAllocationDecider.class, ShardsLimitAllocationDecider.class, AwarenessAllocationDecider.class); |
他们继承自AllocationDecider,需要实现的接口有:
1 2 3 4 5 6 |
* canRebalance * canAllocate 指定 shard 是否可以分配到指定 node * canRemain * canForceAllocatePrimary |
这些 decider 在org.elasticsearch.cluster.ClusterModule#createAllocationDeciders
中全部添加进去,Decider 运行之后可能产生的结果:
1 2 3 |
ALWAYS、YES、NO、THROTTLE |
这些 decider 大致可以分为以下几类
负载均衡类
SameShardAllocationDecider
避免主副分片分配到同一个节点
AwarenessAllocationDecider
感知分配器,感知服务器、机架等,尽量分散存储shard
有两种参数用于调整:
cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.attributes: zone
ShardsLimitAllocationDecider
同一个节点上允许存在的同一个index的shard数目
并发控制类
ThrottlingAllocationDecider
recovery阶段的限速配置,包括:
cluster.routing.allocation.node_concurrent_recoveries
cluster.routing.allocation.node_initial_primaries_recoveries
cluster.routing.allocation.node_concurrent_incoming_recoveries
cluster.routing.allocation.node_concurrent_outgoing_recoveries
ConcurrentRebalanceAllocationDecider
rebalance并发控制
通过cluster.routing.allocation.cluster_concurrent_rebalance参数控制,可以动态生效
DiskThresholdDecider
根据磁盘空间进行决策的分配器
条件限制类
RebalanceOnlyWhenActiveAllocationDecider
所有shard都处在active状态下,才可以执行rebalance操作
FilterAllocationDecider
通过接口动态设置的过滤器,包括:
index.routing.allocation.require.【必须】
index.routing.allocation.include. 【允许】
index.routing.allocation.exclude.* 【排除】
cluster.routing.allocation.require.*
cluster.routing.allocation.include.*
cluster.routing.allocation.exclude.*
配置的目标为节点 ip 或节点名等。cluster级别设置会覆盖 index 级别设置。
ReplicaAfterPrimaryActiveAllocationDecider
保证只会在主分片分配完毕后才开始分配分片副本。
ClusterRebalanceAllocationDecider
通过集群中active的shard状态来决定是否可以执行rebalance,通过配置cluster.routing.allocation.allow_rebalance控制,可以动态生效:
indices_all_active
当集群所有的节点分配完毕,才认定集群rebalance完成 默认
indices_primaries_active
只要所有主分片分配完毕,就可以认定集群rebalance 完成
always
即使当主分片和分片副本都没有分配,也允许rebalance操作
内部模块的 reroute
这里以集群启动时 gateway 之后的 reroute 为背景.
reroute中主要实现是两种 Allocator: gateway 和真正数据的
1 2 3 4 |
gatewayAllocator.allocateUnassigned(allocation); shardsAllocator.allocate(allocation); |
集群启动时 reroute 的触发时机
第一次触发在gateway 元信息选举完毕后submitStateUpdateTask提交的任务中执行:
1 2 3 |
allocationService.reroute |
然后收集各个节点的 shard 元数据,待某个shard 的 Response 从所有节点 全部返回 后,执行finishHim(),然后对收集到的数据进行处理:
1 2 3 |
o.e.g.AsyncShardFetch#processAsyncFetch |
进而执行:
1 2 3 |
routingService.reroute("async_shard_fetch"); |
这个任务被 clusterService 放入队列,等待 gateway 流程结束才执行,当这个 reroute 才会开始执行
流程分析
Allocator分为两种:gateway 和真正数据的.其中gateway 又分主分片和副本的Allocator.
gateway的allocate结束之后,才是数据的allocate,主分片allocate结束后才是副本的allocate
gatewayAllocator
分为主分片和副本的
1 2 3 4 5 |
primaryShardAllocator.allocateUnassigned(allocation); replicaShardAllocator.processExistingRecoveries(allocation); replicaShardAllocator.allocateUnassigned(allocation); |
主分片分配器
primaryShardAllocator.allocateUnassigned
主分片分配器与副分片分配器都继承自 BaseGatewayShardAllocator,执行相同的allocateUnassigned过程,只是 makeAllocationDecision时,主副分片分配器各自执行自己的策略。
allocateUnassigned的流程是:遍历所有unassigned shard依次处理,通过 decider 决策分配,期间可能需要 fetchData 获取这个 shard 对应的元数据。如果决策结果为 YES,就将其 init
主副分片执行相同的 unassignedIterator.initialize:
创建一个ShardRouting,如果是副分片,会设置recoverySource为 PEER。然后将其添加到集群状态,设置状态已更新
更新的内容大约就是某个 shard 被分配到了某个节点,这个 shard 是主还是副,副的话会设置recoverySource为 PEER,但只是一个类型,并没有告诉节点 recovery的时候从哪个节点去恢复,节点恢复时自己从集群状态中的路由表中查找。
然后, master 把新的集群状态广播下去,当数据节点发现某个分片分配给自己,开始执行分片的 recovery。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public void allocateUnassigned(RoutingAllocation allocation) { while (unassignedIterator.hasNext()) { final ShardRouting shard = unassignedIterator.next(); final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shard, allocation, logger); if (allocateUnassignedDecision.isDecisionTaken() == false) { // no decision was taken by this allocator continue; } if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) { unassignedIterator.initialize(allocateUnassignedDecision.getTargetNode().getId(), allocateUnassignedDecision.getAllocationId(), shard.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE : allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes()); } else { unassignedIterator.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes()); } } } |
PrimaryShardAllocator.makeAllocationDecision
这个过程直接返回指定的分片是否可以被分配,如果还没有这个分片的信息,就向集群的其他节点去要,如果已经有了,就根据 decider 进行决策。
首次进入时,还没有任何分片的元信息,发起向集群所有数据节点获取某个 shard 元信息的 fetchData请求
之所以把请求发到所有节点,因为他不知道哪个节点有这个 shard 的数据。集群启动的时候,遍历所有 shard,再对每个 shard 向所有数据节点发 fetchData 虽然整体请求量比较多,但因为这个过程是异步的,执行时间一般还可以接受,主要的时间还是浪费在index的 recovery拖数据的过程,不过个人还是觉得有待改善,可以借鉴 HDFS 上报块状态流程。所以当 es 的集群规模变大,shard 数非常多的时候,这个请求的总量就会很大,
1.向各节点发起fetchData请求
在 AllocaService 的 reroute 中判断是否先进行 gateway 的 allocate:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
private void reroute(RoutingAllocation allocation) { assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes"; // now allocate all the unassigned to available nodes if (allocation.routingNodes().unassigned().size() > 0) { removeDelayMarkers(allocation); gatewayAllocator.allocateUnassigned(allocation); } shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); } |
然后会进入:gateway.PrimaryShardAllocator#allocateUnassigned
遍历所有 Unassignedshard, 通过 fetchData 挨个获取其元数据.
1 2 3 4 5 6 7 8 9 10 11 |
public void allocateUnassigned(RoutingAllocation allocation) { final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { final ShardRouting shard = unassignedIterator.next(); final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shard, allocation, logger); } //... } |
makeAllocationDecision函数是一个虚方法,其实现位于PrimaryShardAllocator.makeAllocationDecision
fetchData会向各个节点请求对应 shard 的元数据,拿到结果才返回,或者13秒超时.在 fetchData 的获取某个 shard 的过程中,不知道这个 shard 在哪个 node 上,因此向集群所有数据 node 发送请求获取这个 shard 的信息.
1 2 3 |
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation); |
在: o.e.g.AsyncShardFetch#asyncFetch ,从 所有数据 node 获取某个特定 shard 的信息
1 2 3 4 5 6 7 8 |
void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { action.list(shardId, nodes, new ActionListener<BaseNodesResponse<T>>() { @Override public void onResponse(BaseNodesResponse<T> response) { processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); } |
请求节点 shard 元信息的 action 为:
1 2 3 |
internal:gateway/local/started_shards |
遍历发送的过程主要实现位于:
1 2 3 |
o.e.action.s.n.TransportNodesAction.AsyncAction#start |
2.数据节点的响应模块
对端对此响应的模块为:
1 2 3 |
gateway.TransportNodesListGatewayStartedShards#nodeOperation |
其中读取本地 shard 元数据,返回请求方.
3.收集返回结果并处理
对于一个特定shard,当response达到期望数量(发出请求时的节点数)时执行finishHim,调用到处理模块:
1 2 3 |
gateway.AsyncShardFetch#processAsyncFetch |
在这里实现收到各节点返回的 shard 级别元数据的对应处理,将 response 信息放到this.cache,下次 reroute,的时候从 cache 里取.然后 再次执行 reroute
1 2 3 4 5 |
reroute(shardId, "post_response"); 接着调用到--> routingService.reroute("async_shard_fetch"); |
主要实现是对当前ClusterState 提交一个任务,再次执行 allocationService.reroute,当前ClusterState可能是一个,也可能是多个
1 2 3 4 5 6 7 8 9 10 11 12 |
protected void performReroute(final String reason) { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { rerouting.set(false); RoutingAllocation.Result routingResult = allocationService.reroute(currentState, reason); return ClusterState.builder(currentState).routingResult(routingResult).build(); } }); } |
shard 元数据有了,再次进入主分片分配过程
4.主分片选举实现
如果读者还没有 allocation id 的概念,先看一下这篇翻译的文章,然后就会了解到主分片决策的原理,es5和 es2中的机制是不一样的。
reroute 再次回到 makeAllocationDecision 函数,从inSyncAllocationIds集合找到活跃的 shard,得到一个列表,把这个列表中的节点依次执行 decider,决策结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
/** * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation, List<NodeGatewayStartedShards> nodeShardStates, ShardRouting shardRouting, boolean forceAllocate) { List<DecidedNode> yesNodeShards = new ArrayList<>(); List<DecidedNode> throttledNodeShards = new ArrayList<>(); List<DecidedNode> noNodeShards = new ArrayList<>(); for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; } Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) : allocation.deciders().canAllocate(shardRouting, node, allocation); DecidedNode decidedNode = new DecidedNode(nodeShardState, decision); if (decision.type() == Type.THROTTLE) { throttledNodeShards.add(decidedNode); } else if (decision.type() == Type.NO) { noNodeShards.add(decidedNode); } else { yesNodeShards.add(decidedNode); } } return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards), Collections.unmodifiableList(noNodeShards)); } |
具体的决策过程会依次遍历执行全部14个 decider:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { return Decision.NO; } Decision.Multi ret = new Decision.Multi(); for (AllocationDecider allocationDecider : allocations) { Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation); // short track if a NO is returned. if (decision == Decision.NO) { if (logger.isTraceEnabled()) { logger.trace("Can not allocate [{}] on node [{}] due to [{}]", shardRouting, node.node(), allocationDecider.getClass().getSimpleName()); } // short circuit only if debugging is not enabled if (!allocation.debugDecision()) { return decision; } else { ret.add(decision); } } else if (decision != Decision.ALWAYS && (allocation.getDebugMode() != EXCLUDE_YES_DECISIONS || decision.type() != Decision.Type.YES)) { // the assumption is that a decider that returns the static instance Decision#ALWAYS // does not really implements canAllocate ret.add(decision); } } return ret; } |
决策之后的结果可能会有多个节点,取第一个。至此,主分片选取完成。
cluster.routing.allocation.enable 对主分片分配的影响
集群完全重启的操作流程中,要求把这个先设置为 none,然后重启集群。对这个选项进行判断和实施在 EnableAllocationDecider 中,如你所见,主分片的分配会被拦截吗?答案是肯定的,主分片被这个 decider 所拦截,但是在主分片的分配过程中有另外一层逻辑:如果被 decider 拦截,返回 NO,则尝试强制分配。给buildNodesToAllocate的最后一个参数传入 true,接下来尝试强制分配的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { Decision decision = canAllocate(shardRouting, node, allocation); if (decision.type() == Type.NO) { // On a NO decision, by default, we allow force allocating the primary. return allocation.decision(Decision.YES, decision.label(), "primary shard [{}] allowed to force allocate on node [{}]", shardRouting.shardId(), node.nodeId()); } else { // On a THROTTLE/YES decision, we use the same decision instead of forcing allocation return decision; } } |
如果 decider 返回 NO,直接设置成 YES,这种情况只在分配一个磁盘上已经存在的unassigned primary shards时出现。
副分片分配器
replicaShardAllocator.allocateUnassigned
与primaryShardAllocator.allocateUnassigned执行的流程相同。
ReplicaShardAllocator.makeAllocationDecision
副分片决策过程中也需要 fetchData,只不过主分片分配节点已经 fetch过,可以直接从结果中取。但是在 fetchData之前先跑一遍 allocation.deciders().canAllocate,来判断是否至少可以在一个 node 上分配(canBeAllocatedToAtLeastOneNode),如果分配不了就省略后面的逻辑了,例如其主分片尚未就绪等
然后根据fetchData到的 shard 元信息,分配到已经拥有这个 shard 副本的节点,如果没有相关节点,就判断下是否需要 delay,否则返回 NOT_TAKEN。分配成功之后一样进入 init 过程。
是否delay由配置项:index.unassigned.node_left.delayed_timeout控制,可以动态调整
数据的分配由BalancedShardsAllocator完成,其中有三个配置项:
1 2 3 4 5 |
cluster.routing.allocation.balance.index cluster.routing.allocation.balance.shard cluster.routing.allocation.balance.threshold |
具体实现:
1 2 3 4 5 6 7 8 |
public void allocate(RoutingAllocation allocation) { final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); } |
allocateUnassigned
根据权重算法和decider决定把 shard 分配到哪个节点。同样将决策后的分配信息更新到集群状态,由 master 广播下去。。
moveShards
对状态 started 的分片根据 decider来判断是否需要 move,move 过程中此 shard 的状态被设置为 RELOCATING,在目标上创建这个 shard 时状态为 INITIALIZING,同时版本号会加1。
balance
根据权重函数平衡集群模型上的节点。
手工 reroute
后续补充
从 gateway 到 allocation 流程的转换
两者之间没有明显的界限,gateway 的最后一步,执行 reroute,等待这个函数返回,然后打印 gateway 选举结果的日志,此时 reroute 向各节点发起的询问 shard 级元数据的操作基本还没执行完,因此一般只有少数主分片被选举完了,gateway 流程的结束只是集群级和索引级的元数据已选举完毕。主分片的选举正在进行中。
从 allocation流程 到 recovery 流程的转换
makeAllocationDecision成功后,unassignedIterator.initialize初始化这个 shard,创建一个新的ShardRouting对象,把相关信息添加到集群状态,设置routingChangesObserver为已经发出变化,后面的流程就会把新的集群状态广播出去。到此,reroute 函数执行完毕。
节点收到广播下来的集群状态,进入 applyClusterState处理所有相关操作,其中,createOrUpdateShards 执行到 createShard 时,准备recovery 相关信息:
1 2 3 4 5 6 7 8 |
if (shardRouting.recoverySource().getType() == Type.PEER) { sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); } RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler); |
接下来到 IndicesService.createShard开始执行 recovery:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS: "mapping update consumer only required by local shards recovery"; try { client.admin().indices().preparePutMapping() .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid .setType(type) .setSource(mapping.source().string()) .get(); } catch (IOException ex) { throw new ElasticsearchException("failed to stringify mapping source", ex); } }, this); |
在:PeerRecoveryTargetService.doRecovery 发出 action=internal:index/shard/recovery/start_recovery 的请求:
1 2 3 4 5 6 7 8 9 10 |
cancellableThreads.execute(() -> responseHolder.set( transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() { @Override public RecoveryResponse newInstance() { return new RecoveryResponse(); } }).txGet())); |
对端收到请求进行处理:
1 2 3 |
PeerRecoverySourceService.recover |
小结
- 不需要等所有主分片都分配完毕才执行副分片的分配。每个分片有自己的分配流程。
- 不需要等所有分片都分配完才走 recovery 流程
- 主分片不需要等副分片分配成功才进入主分片的 recovery,主副分片有自己的 recovery 流程
参考
https://www.elastic.co/blog/tracking-in-sync-shard-copies
https://segmentfault.com/a/1190000008956708
https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_50_allocation.html
http://www.jianshu.com/p/7600e13e8b12
http://blog.csdn.net/thomas0yang/article/details/78485067
(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)
One thought on “elasticsearch allocation 分析”
您好,想问下新版的es为什么把if (allocation.routingNodes().unassigned().size() > 0) {
这个条件给去掉了,像是要解决无法取消副本的任务,但是我对这块不太熟悉,期待超哥的回复,能详细的讲解下这里,整个reroute各个节点是怎样发起的,怎样执行的;辛苦