Engineering notes: Elasticsearch group-by aggregation queries, executing them with Spring Data, and parsing the results
This post records how to group data by fields (a group by) in Elasticsearch, how to issue that aggregation query through Spring Data, and how to parse the result data.
For example, consider the following group-by query (written in MySQL syntax):
```sql
select count(id), key1, key2, key3 from table group by key1, key2, key3;
```
- Query the table with grouping.
- The grouping fields are key1, key2 and key3.
- Return the row count of each group together with the grouping field values.
The goal is to parse the response into the following data structure:

```
[
  { key1=a, key2=b, key3=c, count=2 },
  { key1=a, key2=d, key3=e, count=2 },
  { key1=f, key2=g, key3=h, count=1 }
]
```
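In the Java code later in this post the parsed result is simply a `List<Map<String, Object>>`. Purely as an illustration of that shape (not part of the actual parsing code), the first group above could be built by hand like this:

```java
// Illustration only: one Map per group, with "count" holding the group's doc count
List<Map<String, Object>> groups = new ArrayList<>();
Map<String, Object> firstGroup = new HashMap<>();
firstGroup.put("key1", "a");
firstGroup.put("key2", "b");
firstGroup.put("key3", "c");
firstGroup.put("count", 2L);
groups.add(firstGroup);
```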
ES query implementation
Suppose ES currently holds the following 5 documents, which we want to group by the key1, key2 and key3 fields.

| _index | _type | _id | key1 | key2 | key3 | value |
| ------ | ----- | --- | ---- | ---- | ---- | ----- |
| group  | test  | 1   | a    | b    | c    | val1  |
| group  | test  | 2   | a    | b    | c    | val2  |
| group  | test  | 3   | a    | d    | e    | val3  |
| group  | test  | 4   | a    | d    | e    | val4  |
| group  | test  | 5   | f    | g    | h    | val5  |
The ES query is:

```
GET /group/test/_search
{
  "aggs": {
    "key1": {
      "terms": { "field": "key1.keyword" },
      "aggs": {
        "key2": {
          "terms": { "field": "key2.keyword" },
          "aggs": {
            "key3": {
              "terms": { "field": "key3.keyword" }
            }
          }
        }
      }
    }
  }
}
```
Running the query returns the result below (irrelevant parts of the response have been omitted); the nesting is clearly quite deep.
Looking closely, every grouping field produces a set of buckets: an array whose elements each describe one value of that grouping field, the document count for that group, and the next level of grouping, and so on down the levels. The cleanest way to parse such a structure is recursion.
```
{
  "aggregations": {
    "key1": {
      "buckets": [
        {
          "key": "a",
          "doc_count": 4,
          "key2": {
            "buckets": [
              {
                "key": "b",
                "doc_count": 2,
                "key3": {
                  "buckets": [
                    { "key": "c", "doc_count": 2 }
                  ]
                }
              },
              {
                "key": "d",
                "doc_count": 2,
                "key3": {
                  "buckets": [
                    { "key": "e", "doc_count": 2 }
                  ]
                }
              }
            ]
          }
        },
        {
          "key": "f",
          "doc_count": 1,
          "key2": {
            "buckets": [
              {
                "key": "g",
                "doc_count": 1,
                "key3": {
                  "buckets": [
                    { "key": "h", "doc_count": 1 }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}
```
Executing the query with ElasticsearchTemplate
Once Spring Boot is connected to ES, queries can be executed through ElasticsearchTemplate.
Inject ElasticsearchTemplate:

```java
@Autowired
private ElasticsearchTemplate template;
```
Define the aggregation builders (the grouping fields); grouping in ES uses the terms aggregation. With multiple grouping levels, each later grouping becomes a sub-aggregation of the previous one, which Spring Data expresses with the subAggregation method.

```java
TermsAggregationBuilder key1Builder = AggregationBuilders.terms("key1").field("key1.keyword");
TermsAggregationBuilder key2Builder = AggregationBuilders.terms("key2").field("key2.keyword");
TermsAggregationBuilder key3Builder = AggregationBuilders.terms("key3").field("key3.keyword");

// key3 nests under key2, which in turn nests under key1
TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder));
```
Execute the query:

```java
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder().withIndices("group").withTypes("test");
NativeSearchQuery query = builder.addAggregation(aggBuilder).build();
// query() accepts a ResultsExtractor: the lambda receives the raw SearchResponse (resp)
// and maps it to an arbitrary result type; here the result is the flattened group list,
// produced by the parsing code shown in the next section
List<Map<String, Object>> res = template.query(query, resp -> {
    return flatAggRes(resp.getAggregations());
});
```
Parsing the result with Spring Data
If the resp inside the lambda above is printed directly, it is the JSON string of the grouped query result; as mentioned, this structure lends itself to recursion.
First call getAggregations to obtain the aggregation list for the first grouping level, iterate over it, take each element's buckets, call getAggregations on each bucket in turn, and so descend recursively.
Grouping in ES is a terms aggregation, so the elements returned by getAggregations are of type StringTerms. Because each grouping level has exactly one field, the aggregation list holds at most one StringTerms object; once a level contains no StringTerms object, the previous level was the last grouping field and the recursion can stop.
Each level of recursion contributes one more field. Since the set of fields is not fixed, the data is stored in a Map. When descending a level, the Map of fields collected so far is passed in and the current level's field is added to it. Because a level may have several field values, each of them works on a copy of the Map passed down from the level above; I call the Map passed in from the parent level the prototype.
When the recursion finishes, the completed Map is added to a list, and that list, returned at the end, is the parsed result set.
The recursion code is shown below:
```java
private void recurseMap(List<Map<String, Object>> resList, Aggregations aggregations, Map<String, Object> prototype) {
    // Each grouping level contains exactly one StringTerms aggregation; if none is found,
    // the previous level was the last grouping field and the recursion stops
    StringTerms stringTerms = null;
    for (Aggregation aggregation : aggregations) {
        if (aggregation instanceof StringTerms) {
            stringTerms = (StringTerms) aggregation;
        }
    }
    if (stringTerms == null) {
        return;
    }

    // Work on a copy of the prototype map passed down from the level above
    Map<String, Object> thisTurnPrototype = new HashMap<>(prototype);
    List<StringTerms.Bucket> thisTurnBuckets = stringTerms.getBuckets();
    if (thisTurnBuckets.isEmpty()) {
        return;
    }

    // Not the last level yet: record this level's field value and recurse into each bucket
    if (thisTurnBuckets.get(0).getAggregations().asList().size() > 0) {
        String aggKey = stringTerms.getName();
        for (StringTerms.Bucket thisTurnBucket : thisTurnBuckets) {
            String aggValue = thisTurnBucket.getKeyAsString();
            thisTurnPrototype.put(aggKey, aggValue);
            recurseMap(resList, thisTurnBucket.getAggregations(), thisTurnPrototype);
        }
        return;
    }

    // Last grouping level: build one result map per bucket, including the group's doc count
    List<Map<String, Object>> thisTurnList = new ArrayList<>();
    String mapKey = stringTerms.getName();
    for (StringTerms.Bucket bucket : thisTurnBuckets) {
        String mapValue = bucket.getKeyAsString();
        long count = bucket.getDocCount();
        Map<String, Object> item = new HashMap<>(thisTurnPrototype);
        item.put(mapKey, mapValue);
        item.put("count", count);
        thisTurnList.add(item);
    }
    resList.addAll(thisTurnList);
}
```
The recursion entry point and the resp-handling call look like this:

```java
private List<Map<String, Object>> flatAggRes(Aggregations aggregations) {
    List<Map<String, Object>> res = new ArrayList<>();
    recurseMap(res, aggregations, new HashMap<>());
    return res;
}
```

```java
List<Map<String, Object>> res = template.query(query, resp -> flatAggRes(resp.getAggregations()));
```
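As a quick sanity check (a sketch, not part of the original code), printing the parsed list for the five sample documents should produce the three groups listed at the start of the post:

```java
// Sketch: print each flattened group returned by flatAggRes above
for (Map<String, Object> group : res) {
    System.out.println(group);
}
// Expected output for the sample data (map entry order may vary):
// {key1=a, key2=b, key3=c, count=2}
// {key1=a, key2=d, key3=e, count=2}
// {key1=f, key2=g, key3=h, count=1}
```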
Fetching other fields
After grouping by key1, key2 and key3, only the count and the grouping fields are returned. To fetch other fields, a top_hits aggregation can be used to return the best-matching document of each group, and the other field values can be read from it.
For a grouping query, fetching fields other than the grouping fields is not really recommended, because their values are not necessarily the same within a group, so the value you get back may be inconsistent. MySQL 5.7 discourages this in the same way: its default ONLY_FULL_GROUP_BY mode rejects such queries unless that mode is removed.
ES query implementation
Only the part of the query starting from the key2 aggregation is shown. Note that other_col is a top_hits aggregation placed alongside the key3 aggregation, and size limits it to a single hit.

```
{
  ...
  "aggs": {
    "key3": {
      "terms": { "field": "key3.keyword" }
    },
    "other_col": {
      "top_hits": { "size": 1 }
    }
  }
  ...
}
```
The result is shown below, again only partially. The hits array under other_col (which sits alongside key3) contains one document object: the best match of the group, from which the other field values, such as val, can be read. As noted above, these values may differ within a group, so only take values that are known to be consistent.

```
{
  ...
  "key2": {
    "buckets": [
      {
        "key": "b",
        "doc_count": 2,
        "key3": {
          "buckets": [
            { "key": "c", "doc_count": 2 }
          ]
        },
        "other_col": {
          "hits": {
            "total": 2,
            "max_score": 1,
            "hits": [
              {
                "_index": "group",
                "_type": "test",
                "_id": "2",
                "_score": 1,
                "_source": {
                  "key1": "a",
                  "key2": "b",
                  "key3": "c",
                  "val": "val2"
                }
              }
            ]
          }
        }
      }
      ...
    ]
  }
  ...
}
```
Parsing the result with Spring Data
Starting from the parsing code above, only the topHits-related pieces need to be added or changed.
- Add a top_hits aggregation builder alongside key3Builder; both become sub-aggregations of key2Builder.

```java
// .size(1) can be chained on the top_hits builder to mirror the "size": 1 in the DSL above;
// the parsing code below only reads the first hit either way
TopHitsAggregationBuilder otherColBuilder = AggregationBuilders.topHits("other_col");
TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder)
        .subAggregation(otherColBuilder));
```
- Change the loop over the aggregations so that it checks for both StringTerms and InternalTopHits, and put the topHits data into the prototype Map; there is at most one topHits per level here.

```java
StringTerms stringTerms = null;
InternalTopHits topHits = null;
for (Aggregation aggregation : aggregations) {
    if (aggregation instanceof StringTerms) {
        stringTerms = (StringTerms) aggregation;
    }
    if (aggregation instanceof InternalTopHits) {
        topHits = (InternalTopHits) aggregation;
    }
}
if (stringTerms == null) {
    return;
}

Map<String, Object> thisTurnPrototype = new HashMap<>(prototype);
if (topHits != null) {
    // Take the first (best-matching) hit and store its _source under the aggregation name
    JSONObject topHitsJson = JSON.parseObject(topHits.getHits().iterator().next().getSourceAsString());
    thisTurnPrototype.put(topHits.getName(), topHitsJson);
}
```
The complete code is given in the appendix at the end of this post.
Executing the query with the Client
The Client approach is very similar to the Template approach; only the entry class differs. With the Client, resp is obtained as follows:

```java
SearchRequestBuilder builder = client.prepareSearch("group").setTypes("test");
SearchResponse resp = builder.addAggregation(aggBuilder).get();
```
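From here the response can be flattened with the same helper as before (a brief sketch; flatAggRes is the method defined earlier):

```java
// The client's SearchResponse exposes the same Aggregations object as the Template callback,
// so the recursive parsing code can be reused unchanged
List<Map<String, Object>> res = flatAggRes(resp.getAggregations());
```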
Appendix: complete code

```java
package xxx;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class EsService {

    @Autowired
    private ElasticsearchTemplate template;

    public List<Map<String, Object>> agg() {
        TermsAggregationBuilder key1Builder = AggregationBuilders.terms("key1").field("key1.keyword");
        TermsAggregationBuilder key2Builder = AggregationBuilders.terms("key2").field("key2.keyword");
        TermsAggregationBuilder key3Builder = AggregationBuilders.terms("key3").field("key3.keyword");
        TopHitsAggregationBuilder otherColBuilder = AggregationBuilders.topHits("other_col");
        TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder)
                .subAggregation(otherColBuilder));

        NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder().withIndices("group").withTypes("test");
        NativeSearchQuery query = builder.addAggregation(aggBuilder).build();
        List<Map<String, Object>> res = template.query(query, resp -> {
            System.out.println(resp);
            return flatAggRes(resp.getAggregations());
        });
        return res;
    }

    private List<Map<String, Object>> flatAggRes(Aggregations aggregations) {
        List<Map<String, Object>> res = new ArrayList<>();
        recurseMap(res, aggregations, new HashMap<>());
        return res;
    }

    private void recurseMap(
            List<Map<String, Object>> resList, Aggregations aggregations, Map<String, Object> prototype) {
        // Each level holds at most one StringTerms aggregation and at most one top_hits aggregation
        StringTerms stringTerms = null;
        InternalTopHits topHits = null;
        for (Aggregation aggregation : aggregations) {
            if (aggregation instanceof StringTerms) {
                stringTerms = (StringTerms) aggregation;
            }
            if (aggregation instanceof InternalTopHits) {
                topHits = (InternalTopHits) aggregation;
            }
        }
        if (stringTerms == null) {
            return;
        }

        // Copy of the prototype map passed down from the level above
        Map<String, Object> thisTurnPrototype = new HashMap<>(prototype);
        if (topHits != null) {
            // Store the _source of the best-matching hit under the top_hits aggregation name
            JSONObject topHitsJson = JSON.parseObject(topHits.getHits().iterator().next().getSourceAsString());
            thisTurnPrototype.put(topHits.getName(), topHitsJson);
        }

        List<StringTerms.Bucket> thisTurnBuckets = stringTerms.getBuckets();
        if (thisTurnBuckets.isEmpty()) {
            return;
        }

        // Not the last grouping level: record this level's field value and recurse into each bucket
        if (thisTurnBuckets.get(0).getAggregations().asList().size() > 0) {
            String aggKey = stringTerms.getName();
            for (StringTerms.Bucket thisTurnBucket : thisTurnBuckets) {
                String aggValue = thisTurnBucket.getKeyAsString();
                thisTurnPrototype.put(aggKey, aggValue);
                recurseMap(resList, thisTurnBucket.getAggregations(), thisTurnPrototype);
            }
            return;
        }

        // Last grouping level: build one result map per bucket, including the group's doc count
        List<Map<String, Object>> thisTurnList = new ArrayList<>();
        String mapKey = stringTerms.getName();
        for (StringTerms.Bucket bucket : thisTurnBuckets) {
            String mapValue = bucket.getKeyAsString();
            long count = bucket.getDocCount();
            Map<String, Object> item = new HashMap<>(thisTurnPrototype);
            item.put(mapKey, mapValue);
            item.put("count", count);
            thisTurnList.add(item);
        }
        resList.addAll(thisTurnList);
    }
}
```