elasticsearch search 模块分析
基于版本:5.5.3
本篇从整体上分析一下检索的基本流程。
增删改查操作只对单个文档进行处理,通常由 _index, _type, 和 _id三元组来确定唯一文档。但搜索需要一种更复杂的模型,因为不知道查询会命中哪些文档。一个搜索请求必须询问指定索引的所有分片中的某个副本来进行匹配。假设一个索引有5个主分片,1个副本分片,共10个分片,一次搜索请求会由5个分片来共同完成,他们有可能是主分片,也可能是副分片。也就是说,一次搜索请求只会命中所有副本中的一个。
找到匹配文档仅仅完成了一半,多分片中的结果必须组合成单个排序列表。集群的任意节点都可以接受搜索请求,接收客户端请求的节点成为协调节点,在协调节点,搜索任务被执行成一个两阶段过程,称之为 query then fetch 。 真正执行搜索任务的节点暂且称为数据节点。
在协调节点,相应地实现位于:
查询阶段(query):o.e.a.search.InitialSearchPhase
取回阶段(fetch):o.e.a.search.FetchSearchPhase
他们都继承自:SearchPhase
search type
在 es5中有两种搜索类型:
- DFS_QUERY_THEN_FETCH
- QUERY_THEN_FETCH (默认)
两种不同的搜索类型的区别在于查询阶段,DFS 查询阶段的流程要多一些,他使用全局信息来获取更准确的评分。
下面的流程分析以默认搜索类型为例.
Query 阶段
在初始 查询阶段 时, 查询会广播到索引中每一个分片副本(主分片或者副分片)。 每个分片在本地执行搜索并构建一个匹配文档的 优先队列。
优先队列 是一个存有 topn 匹配文档的有序列表。优先队列大小为分页参数 from + size。
分布式搜索的 Query 阶段
QUERY_THEN_FETCH 搜索类型的查询阶段有以下步骤:
- 客户端发送 search 请求到 Node 3。
- Node 3 将查询请求转发到索引的每个主分片或副分片中。
- 每个分片在本地执行查询,并使用本地的Term/Document Frequency信息进行打分,添加结果到大小为 from + size 的本地有序优先队列中
- 每个分片返回各自优先队列中所有文档的 ID 和排序值给协调节点,它合并这些值到自己的优先队列,产生一个全局排序后的列表。
协调节点广播查询请求到所有相关分片时,可以是主分片或副分片,协调节点将在之后的请求中轮询所有的分片副本来分摊负载。
小结:查询阶段并不会对搜索请求的内容解析理解,无论搜什么东西,只看本次搜索需要命中哪些shard,然后针对每个特定 shard 选择一个副本,转发搜索请求。
Query 阶段源码解析
本阶段运行于 http_server_work 线程
1.REST接口收到搜索请求
主要是将请求体解析为 SearchRequest 数据结构
1 2 3 4 5 |
SearchRequest searchRequest = new SearchRequest(); request.withContentOrSourceParamParserOrNull(parser -> parseSearchRequest(searchRequest, request, parser)); |
2.构造目的 shard 列表
将本集群shard列表和远程集群的shard列表(如果有的话)合并
1 2 3 4 5 6 |
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference()); GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); |
3.遍历所有 shard 发送请求,请求是基于 shard 遍历的,如果某个节点有多个 shard,并不会把请求合并为一个。某节点有N个 shard,就向他发N次请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public final void run() throws IOException { int shardIndex = -1; for (final SearchShardIterator shardIt : shardsIts) { shardIndex++; final ShardRouting shard = shardIt.nextOrNull(); //后续请求轮询所有的分片副本 if (shard != null) { performPhaseOnShard(shardIndex, shardIt, shard); } else { // really, no shards active in this group onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } } } |
shardsIts为本次搜索涉及到的所有分片,shardIt.nextOrNull()从某个分片的所有副本中选择一个,例如从[website][0]中选择主分片。
发送请求同时定义一个listener,用于处理Response:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(), shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { onShardResult(result, shardIt); } @Override public void onFailure(Exception t) { onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); } }); |
发送过程调用 transport 模块实现:
1 2 3 |
transportService.sendChildRequest |
以简单普通搜索为例,对应 action 为:
1 2 3 |
indices:data/read/search[phase/query] |
4.收集返回结果
1 2 3 4 5 6 7 8 9 10 11 12 |
private void onShardResult(FirstResult result, ShardIterator shardIt) { onShardSuccess(result); final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); if (xTotalOps == expectedTotalOps) { onPhaseDone(); } else if (xTotalOps > expectedTotalOps) { throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"); } } |
onShardSuccess 对收集到的结果进行合并。
onPhaseDone 会调用 executeNextPhase 从而开始执行取回阶段。
Fetch 阶段
Query 阶段知道了要取哪些数据,但是并没有取具体的数据,这就是 fetch 阶段要做的。
分布式搜索的 Fetch 阶段
Fetch 阶段由以下步骤构成:
1. 协调节点向相关 node 发送 GET 请求
2. 分片所在节点向协调节点返回数据
3. 协调节点等待所有文档被取得,然后返回给客户端
分片所在节点在返回文档数据时,处理有可能的_source 字段以及高亮参数。
协调节点首先决定哪些文档 确实 需要被取回,例如,如果查询指定了 { “from”: 90, “size”: 10 } ,只有从第91个开始的10个结果需要被取回。
深度分页
为了避免在协调节点创建 number_of_shards * (from + size)的优先队列,尽量控制分页深度。
Fetch 阶段源码解析
本阶段运行于 search 线程
1.发送 Fetch 请求
Query 阶段的 executeNextPhase 开始执行 Fetch 阶段,进入FetchSearchPhase#innerRun,处理 scroll(略),从查询阶段的 shard 列表中遍历,跳过查询结果为空的 shard,对特定目标 shard 执行 executeFetch 获取数据,其中包括分页信息。
executeFetch主要实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final CountedCollector<FetchSearchResult> counter, final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, final Transport.Connection connection) { context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(), new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) { @Override public void innerOnResponse(FetchSearchResult result) { counter.onResult(result); } }); } |
executeFetch 的参数 querySearchResult 中包含分页信息,最后定义一个 Listener,每获取一个 shard 数据成功后执行 counter.onResult,其中调用对结果的处理回调,把 result 保存的数组,然后执行countDown:
1 2 3 4 5 6 7 8 9 |
void onResult(R result) { try { resultConsumer.accept(result); } finally { countDown(); } } |
以简单普通搜索为例,本节点发送的 action 为:
1 2 3 |
indices:data/read/search[phase/fetch/id |
2.收集结果
收集器的定义在innerRun中,定义了包括了收到的 shard 数据存到哪,收集完了谁来处理:
1 2 3 4 5 |
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r), docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not finishPhase, context); |
fetchResults 用于存储从某个 shard 收集到的结果,每收到一个 shard 的数据执行一次 counter.countDown() 当所有 shard 数据收集完毕后,countDown 会出触发执行 finishPhase:
1 2 3 4 5 |
final Runnable finishPhase = () -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ? queryResults : fetchResults); |
下一阶段的定义在 FetchSearchPhase 构造函数:
1 2 3 4 5 |
this(resultConsumer, searchPhaseController, context, (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context))); |
3.ExpandSearchPhase
取回阶段完成之后执行ExpandSearchPhase#run,主要判断字段是否折叠,实现字段折叠,否则就直接返回给客户端。
4.回复客户端
ExpandSearchPhase 之后是恢复客户的 sendResponsePhase:
1 2 3 4 5 |
public void run() throws IOException { context.onResponse(context.buildSearchResponse(response, scrollId)); } |
执行搜索的数据节点
对各种 Query,Fetch 请求的处理入口注册于:
o.e.a.s.SearchTransportService#registerRequestHandler
响应 Query 请求
本阶段运行于 search 线程
以 active为:
1 2 3 |
indices:data/read/search[phase/query] |
为例的普通请求,执行查询,发送 Response
1 2 3 4 |
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); channel.sendResponse(result); |
查询实现入口在 searchService.executeQueryPhase ,完全封装在这个函数。查询时,先看是否允许 cache,由配置:
1 2 3 |
index.requests.cache.enable |
决定,默认为 true,会把查询结果放到 cache 中,查询时优先从 cache 中取。这个 cache 由节点的所有分片共享,基于 LRU 算法实现:空间满的时候踢出最最近最少使用的数据。cache 并不缓存全部检索结果,具体参考这里
核心的查询封装在:queryPhase.execute(context) 其中调用 lucene 实现检索,同时实现聚合:
1 2 3 4 5 6 7 8 9 10 |
aggregationPhase.preProcess(searchContext); boolean rescore = execute(searchContext, searchContext.searcher()); if (rescore) { // only if we do a regular search rescorePhase.execute(searchContext); } suggestPhase.execute(searchContext); aggregationPhase.execute(searchContext); |
其中包含几个核心功能:
- execute() 调用 lucene, searcher.search() 实现搜索
- rescorePhase 全文检索且需要打分
- suggestPhase 自动补全及纠错
- aggregationPhase 实现聚合
小结:
- 慢查询Query日志的统计时间在于本阶段的处理时间。
- 聚合操作在本阶段实现, lucene 检索后完成。
响应 Fetch 请求
与 Query 节点类似,本阶段运行于 search 线程
以 action 为:
1 2 3 |
indices:data/read/search[phase/fetch/id] |
为例的普通请求,执行Fetch,发送 Response
1 2 3 4 |
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); channel.sendResponse(result); |
对 Fetch 响应的实现封装在:searchService.executeFetchPhase,核心是调用 fetchPhase.execute(context); 按照命中的 doc 取得相关数据,填充到 SearchHits,最终封装到 FetchSearchResult。
小结:
- 慢查询Fetch日志的统计时间在于本阶段的处理时间。
总结
- 聚合是在 es 实现的,而非 lucene
- Query 和 Fetch 请求之间是无状态的,除非是scroll方式
- 分页搜索不会单独 cache,cache 和分页没有关系
最后补一张整体流程图:
参考
http://www.opscoder.info/es_search_client.html
http://www.opscoder.info/es_search_server.html
http://www.jianshu.com/p/c7529b98993e
(转载请注明作者和出处 easyice.cn ,请勿用于任何商业用途)