elasticsearch cluster模块分析
目录
概述
管理集群状态,对其他内部模块提供提交集群状态更新任务的 submitStateUpdateTask 接口。管理,并串行地运行这些任务,来不及处理的放入队列,运行完毕后产生新集群状态,将集群状态发布到集群所有节点,发布成功后,负责通知各个模块去应用新的集群状态。
主要对外接口
- clusterService.start() 只是初始化内部数据
- clusterService.addListener(this);
- clusterService.removeListener(this);
- clusterService.getClusterSettings()
- clusterService.getClusterName()
- ClusterState newState = clusterService.state(); 获取集群状态
- clusterService.addTimeoutListener
- clusterService.removeTimeoutListener(this);
- clusterService.addInitialStateBlock
- clusterService.removeInitialStateBlock
- clusterService.pendingTasks(); 任务队列中等待处理的任务
- clusterService.localNode()
- clusterService.operationRouting()
- clusterService.submitStateUpdateTask
重要数据成员
clusterStateAppliers
当新的集群状态产生时,要通知一些内部模块进行处理,例如,索引 recovery 就是根据这个通知去触发的。
目前包括三类优先级的监听器,:highPriorityStateAppliers,normalPriorityStateAppliers,lowPriorityStateAppliers
有下列模块会被通知:
IndicesClusterStateService
PipelineStore
PipelineExecutionService
RepositoriesService
RestoreService
IngestActionForwarder
GatewayAllocator
TaskManager
SnapshotShardsService
Gateway
state
用于记录当前集群状态
clusterStateListeners,timeoutClusterStateListeners
集群状态更细完毕后通知Listener
taskBatcher
封装一个优先级线程池,用于管理运行提交的任务。任务队列就在此类内部定义的线程池中,REST API: _cluster/health
中的 number_of_pending_tasks 就是这个线程池中的workQueue:
1 2 3 |
private final PrioritizedEsThreadPoolExecutor threadExecutor; |
submitStateUpdateTask 提交任务时就是通过调用 taskBatcher 来提交
clusterStatePublisher
通过他来发布集群状态,广播到其他节点
submitStateUpdateTask
当节点需要把某种信息通知到整个集群时,调用本函数执行相关任务,触发时机大约有:
- 集群拓扑变化
- 模板,索引 map, 别名的变化
- 索引操作:创建,删除, open/close
- 快照, reroute api 等触发
submitStateUpdateTask 执行过程
提交一个任务,将任务交给线程池执行,这个线程池支持优先级,线程数量为1,因此任务将顺序执行。如同的线程池一般原理,首次运行任务会创建这个线程,后续的添加的任务放入阻塞队列。
submitStateUpdateTask 有多个重载,对外接口的函数原型为:
1 2 3 4 |
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener> void submitStateUpdateTask( final String source, final T updateTask) |
第一个参数是事件源,第二个参数为要提交的具体的任务,每个任务都应该继承自定义中描述的三个类,以 ClusterStateUpdateTask 为例:
该函数的主要实现先把提交的任务封装成一个列表:
1 2 3 |
List<ClusterServiceTaskBatcher.UpdateTask> |
ClusterServiceTaskBatcher.UpdateTask 继承自 BatchedTask,而 BatchedTask 继承自 Runnable,接下来线程池会执行这个 Runnable。
然后执行 taskBatcher.submitTasks ,在 taskBatcher.submitTasks内部,将所有 task放入 tasksPerBatchingKey(并非作为任务队列用途):
1 2 3 4 5 6 7 |
synchronized (tasksPerBatchingKey) { LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey, k -> new LinkedHashSet<>(tasks.size())); existingTasks.addAll(tasks); } |
然后执行 task 中的第一个:
1 2 3 4 |
final BatchedTask firstTask = tasks.get(0); threadExecutor.execute(firstTask); |
交给 PrioritizedEsThreadPoolExecutor 线程池执行,这个线程池支持优先级,有一个阻塞队列 BlockingQueue,线程数:maximumPoolSize=1 首次执行创建一个新的线程,线程名字就是大家熟悉的“clusterService#updateTask[T#1]”,后续来不及立即执行的任务放入线程池队列。
到这里,submitStateUpdateTask函数本身就执行完毕了,剩余的任务工作都在新的 updateTask 线程中执行
任务执行线程:updateTask 流程
新线程的执行入口:o.e.c.s.TaskBatcher.BatchedTask#run,期间做一些检查工作,主要逻辑实现进入到 ClusterService#runTasks,核心就做了两件事:执行任务,生成新的集群状态,把集群状态发布到集群所有节点
1 2 3 4 5 6 |
void runTasks(TaskInputs taskInputs) { TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS); publishAndApplyChanges(taskInputs, taskOutputs); } |
整体流程图
每个节点都会运行这个流程,蓝色部分是只有 Master 节点才执行的。总体来看,Master 节点特有的工作是将集群状态发布到其他节点,除了发布之外,非 Master 节点会执行剩余所有的逻辑。但是不会产生需要发布到其他节点新集群状态的尴尬情况。es 里很多地方有类似的设计:让功能点走统一的代码逻辑,在逻辑中区分不同情况去处理。例如 副分片 recovery过程就是在写流程中处理冲突实现主副分片一致。
calculateTaskOutputs
调用 submitStateUpdateTask 时传入的任务,返回结果可能会包含新的集群状态
publishAndApplyChanges
发布,并应用新的集群状态,跟随日志输出,有几个重要的阶段:
准备阶段
根据新的集群状态,确认节点连接
1 2 3 |
nodeConnectionsService.connectToNodes(newClusterState.nodes()); |
发布集群状态
如果本节点是 Master,就把集群状态发布到其他节点
1 2 3 4 5 |
if (newClusterState.nodes().isLocalNodeElectedMaster()) { clusterStatePublisher.accept(clusterChangedEvent, ackListener); } |
应用集群状态
这里是应用集群状态中的配置信息
1 2 3 4 5 6 |
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); clusterSettings.applySettings(incomingSettings); } |
将新集群状态通知到各模块
1 2 3 |
callClusterStateAppliers(newClusterState, clusterChangedEvent); |
更新集群状态
用新的集群状态替换旧集群状态
1 2 3 4 |
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); pdateState(css -> newClusterState); |
通知所有 listener
- 通知 clusterStateListeners 和 timeoutClusterStateListeners
- 如果本节点是 Master,还要通知 ackListener.onNodeAck
- 通知 task 的 listener 集群状态已处理:task.listener.clusterStateProcessed
- 如果本节点是 Master,通知 executor 集群状态已发布:taskInputs.executor.clusterStatePublished
发布集群状态
集群在状态是通过clusterStatePublisher发布的,节点start 的时候,clusterStatePublisher被设置:
1 2 3 |
clusterService.setClusterStatePublisher(discovery::publish); |
因此具体的集群状态发布在 ZenDiscovery#publish 中执行。
分布式事务一致性:二段提交
发布集群状态是一个分布式事务操作,分布式事务需要实现原子性:要么所有参与者都提交事务,要么都取消事务。二段提交可以避免失败回滚:把信息发下去,但不应用,如果得到多数节点的确认,再发一个请求出去要求节点应用。
es 实现二段提交与标准二段提交有些区别,发布集群状态到参与者的数量并非定义为全部,而是多数节点成功就算作成功。多数的定义取决于配置项:
1 2 3 |
discovery.zen.minimum_master_nodes |
整体过程如下:
- 发布集群状态,等待响应
- 收到的响应数量大于minimum_master_nodes数量,发送 commit 请求。
二段提交不能保证第二阶段,节点收到 commit 请求后正确应用事务。他只能保证参与者都提交了事务,不能保证事务在单个节点上提交成功还是失败,因此事务的执行可能在部分节点失败。
发布过程
从 es2.0版本开始,发布集群信息支持在相邻的两个版本号之间只发生增量内容。在准备阶段,为集群状态准备全量和增量两份序列化的内容:
1 2 3 4 |
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(), nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs); |
然后将序列化后的数据发生到全部节点,然后等待事务变为已提交状态。
1 2 3 4 5 6 7 8 9 10 11 |
for (final DiscoveryNode node : nodesToPublishTo) { if (sendFullVersion || !previousState.nodes().nodeExists(node)) { sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController); } else { sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController); } } sendingController.waitForCommit(discoverySettings.getCommitTimeout()); |
无论发全量还是增量,最终调用到 sendClusterStateToNode进行发送。这里是二段提交的执行过程。
第一阶段,发送集群状态数据,每收到一个 Response调用一次 onNodeSendAck:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.getVersion()), options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { if (sendingController.getPublishingTimedOut()) { logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout); } sendingController.onNodeSendAck(node); } } |
第二阶段:收到足够的Response消息后,标记为已提交: markAsCommitted() 执行第二阶段,sendCommitToNode 提交:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) { neededMastersToCommit--; if (neededMastersToCommit == 0) { if (markAsCommitted()) { for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) { sendCommitToNode(nodeToCommit, clusterState, this); } sendAckedBeforeCommit.clear(); } } decrementPendingMasterAcksAndChangeForFailure(); } |
只要第一阶段收到足够的 Response,无论后面发送的 Commit 请求成功多少,整个事务都算做成功了。因此有可能最终正确应用事务的是少数。
(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)