工程笔记:elasticsearch聚合分组操作语句和spring-data调用并解析结果数据

下面主要记录es如何实现将数据按字段分组(group by)的操作,以及spring data如何调用es分组语句和解析结果数据。

比如有如下的group by操作(以mysql语句为例):

1
select count(id), key1, key2, key3 from table group by key1, key2, key3;
  • 对table表分组查询
  • 分组字段为key1,key2,key3
  • 返回每组的条数,以及分组字段值

最终要解析到如下数据结构

1
2
3
4
5
6
7
8
9
10
11
[
{
key1=a, key2=b, key3=c, count=2
},
{
key1=a, key2=d, key3=e, count=2
},
{
key1=f, key2=g, key3=h, count=1
}
]

ES语句实现

假设现在es中有如下5条数据,按照key1,key2,key3字段做分组操作。

_index _type _id key1 key2 key3 value
group test 1 a b c val1
group test 2 a b c val2
group test 3 a d e val3
group test 4 a d e val4
group test 5 f g h val5

es中的查询语句为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
GET /group/test/_search
{
"aggs": {
"key1": {
"terms": {
"field": "key1.keyword"
},
"aggs": {
"key2": {
"terms": {
"field": "key2.keyword"
},
"aggs": {
"key3": {
"terms": {
"field": "key3.keyword"
}
}
}
}
}
}
}
}

语句执行后得到数据结果如下(数据中已经省略了部分无关的数据),可以看到结果层级非常复杂。
仔细观察可以发现,每一个分组字段下面都是一个桶(bucket),数据结构是一个数组,数据中的元素就是一个分组字段值对象,对象中指出了分组字段的值以及该组的数据数量,以及下一级分组,然后同理依次延生下去,这种数据结构最好的解析方法就是使用递归。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
{
"aggregations": {
"key1": {
"buckets": [
{
"key": "a",
"doc_count": 4,
"key2": {
"buckets": [
{
"key": "b",
"doc_count": 2,
"key3": {
"buckets": [
{
"key": "c",
"doc_count": 2
}
]
}
},
{
"key": "d",
"doc_count": 2,
"key3": {
"buckets": [
{
"key": "e",
"doc_count": 2
}
]
}
}
]
}
},
{
"key": "f",
"doc_count": 1,
"key2": {
"buckets": [
{
"key": "g",
"doc_count": 1,
"key3": {
"buckets": [
{
"key": "h",
"doc_count": 1
}
]
}
}
]
}
}
]
}
}
}

Template方式执行语句

springboot接入es后可以通过ElasticsearchTemplate执行查询语句。

  1. 引入ElasticsearchTemplate。

    1
    2
    @Autowired
    private ElasticsearchTemplate template;
  2. 定义聚合对象(分组字段),es中分组是使用的聚合操作的Terms Aggregation。

    当有多级分组时,后一个分组作为前一个分组的子聚合,在spring data中使用subAggregation方法。

    1
    2
    3
    4
    5
    6
    // 定义分组字段
    TermsAggregationBuilder key1Builder = AggregationBuilders.terms("key1").field("key1.keyword");
    TermsAggregationBuilder key2Builder = AggregationBuilders.terms("key2").field("key2.keyword");
    TermsAggregationBuilder key3Builder = AggregationBuilders.terms("key3").field("key3.keyword");
    // 组合分组字段
    TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder));
  3. 执行语句。

    1
    2
    3
    4
    5
    6
    NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder().withIndices("group").withTypes("test");
    NativeSearchQuery query = builder.addAggregation(aggBuilder).build();
    T res = template.query(query, resp -> {
    // 函数中传入SearchResponse对象,可以通过getAggregations获取聚合结果,然后解析后返回
    return T;
    });

    spring data解析数据

上述lambda表达式中的resp如果直接打印出来就是es分组查询后的结果的json字符串,上面也说到这个数据结构可以使用递归来处理。

  1. 先通过getAggregations方法获取到第一级的分组字段的aggregation数组,然后遍历数组元素,并获取元素的桶数组,再依次对桶元素执行getAggregations方法,进入递归。

  2. 分组操作在es中叫做terms aggregation,所以getAggregations方法获取到的数组的元素类型就叫做StringTerms,由于每一级分组只有一个字段,所以aggregation数组中只会有一个StringTerms对象,当递归到某一级分组字段后,发现没有StringTerms对象了,那么就说明递归的上一级已是最后一级分组字段了,就不必继续递归了。

  3. 每递归深入一层,就会多一个字段,这种字段不确定的数据结构使用Map保存。当每深入一层时,需要把上层得到的字段Map传入,然后再把本层的字段添加进去。由于每一层的字段值可能有多个,所以需要用上层传入的字段Map的拷贝来保存数据,我把上层传入的字段Map叫做原型(prototype)。

  4. 当递归结束后,把最后的数据Map放入到集合中,最后将集合返回,就是解析完成的数据集合了。

下面将展示递归代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 递归处理聚合结果,结束逻辑:当前的agg下没有stringterm聚合
*
* @param resList 结果集合
* @param aggregations 聚合结果,根据业务,每层只有一个stringterm聚合
* @param prototype 上层传来的原型,需要将上层的原型复制到本层,并将本层的map副本作为下层的原型
* @return
*/
private void recurseMap(List<Map<String, Object>> resList,
Aggregations aggregations,
Map<String, Object> prototype) {

// 如果没有StringTerms对象了,说明不需要继续递归了
StringTerms stringTerms = null;
for (Aggregation aggregation : aggregations) {
if (aggregation instanceof StringTerms) {
stringTerms = (StringTerms) aggregation;
}
}
if (stringTerms == null) {
return;
}

// 复制原型
Map<String, Object> thisTurnProptype = new HashMap<>(prototype);
List<StringTerms.Bucket> thisTurnBuckets = stringTerms.getBuckets();
// 没有数据就结束执行
if (thisTurnBuckets.isEmpty()) {
return;
}

// 结束条件判断
// 判断当前agg下的第一个bucket下是否有agg,如果有就增加原型后继续递归,否则不需要继续递归了
if (thisTurnBuckets.get(0).getAggregations().asList().size() > 0) {
// 增加原型数据,继续进入递归
String aggKey = stringTerms.getName();
for (StringTerms.Bucket thisTrunBucket : thisTurnBuckets) {
String aggValue = thisTrunBucket.getKeyAsString();
thisTurnProptype.put(aggKey, aggValue);
recurseMap(resList, thisTrunBucket.getAggregations(), thisTurnProptype);
}
return;
}

// 处理数据
List<Map<String, Object>> thisTurnList = new ArrayList<>();
// agg的term作为map的key
String mapKey = stringTerms.getName();
for (StringTerms.Bucket bucket : thisTurnBuckets) {
// 聚合之后得到的key作为map的value
String mapValue = bucket.getKeyAsString();
long count = bucket.getDocCount();
Map<String, Object> item = new HashMap<>(thisTurnProptype);
item.put(mapKey, mapValue);
item.put("count", count);
thisTurnList.add(item);
}
resList.addAll(thisTurnList);
}

递归调用代码和resp处理代码如下:

1
2
3
4
5
private List<Map<String, Object>> flatAggRes(Aggregations aggregations) {
List<Map<String, Object>> res = new ArrayList<>();
recurseMap(res, aggregations, new HashMap<>());
return res;
}
1
List<Map<String, Object>> res = template.query(query, resp -> flatAggRes(resp.getAggregations()));

获取其他字段

按照key1,key2,key3分组后,只查询了条数和分组字段,如果想要获取其他字段的话,可以使用tophits来获取最匹配的那条数据,从而获取的其他字段的数据。

对于分组语句,不建议获取除分组字段外的其他字段值的,因为其他字段在该分组情况下是不一定相同的,这样有可能获取到的字段不一致,就像MySQL5.7中就默认不建议这么做一样(否则需要开启ONLY_FULL_GROUP_BY)。

ES语句实现

只展示从key2开始聚合的部分语句,可以看到,other_col是top_hits语句,和key3语句并列,size表示只取1条。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
...
"aggs": {
"key3": {
"terms": {
"field": "key3.keyword"
}
},
"other_col": {
"top_hits": {
"size": 1
}
}
}
...
}

得到的结果如下,依然只展示部分结果,可以看到与key3并排的other_col下面的hits数组中有一条数据对象,这个就是最匹配的数据,可以从中拿到其他字段的值,如val。需要注意的是,前面也说过了,这些字段的值可能不一致,只能选取一致的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
...
"key2": {
"buckets": [
{
"key": "b",
"doc_count": 2,
"key3": {
"buckets": [
{
"key": "c",
"doc_count": 2
}
]
},
"other_col": {
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "group",
"_type": "test",
"_id": "2",
"_score": 1,
"_source": {
"key1": "a",
"key2": "b",
"key3": "c",
"val": "val2"
}
}
]
}
}
}
...
]
}
...
}

spring data解析数据

根据上面的解析数据的代码,只需要添加和修改topHits相关代码即可。

  1. 添加tophits的聚合对象,与key3Builder并列,都为key2Builder的子聚合。
    1
    2
    3
    TopHitsAggregationBuilder otherColBuilder = AggregationBuilders.topHits("other_col");
    TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder)
    .subAggregation(otherColBuilder));
  2. 修改aggregations遍历代码,判断StringTerms和TopHits,并将topHits数据放入原型Map中。这里的topHits每一层默认只有一个。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    StringTerms stringTerms = null;
    InternalTopHits topHits = null;
    for (Aggregation aggregation : aggregations) {
    if (aggregation instanceof StringTerms) {
    stringTerms = (StringTerms) aggregation;
    }
    // 判断是否为tophits
    if (aggregation instanceof InternalTopHits) {
    topHits = (InternalTopHits) aggregation;
    }
    }
    if (stringTerms == null) {
    return;
    }
    Map<String, Object> thisTurnProptype = new HashMap<>(prototype);
    if (topHits != null) {
    // 将tophits加入到原型中
    JSONObject topHitsJson = JSON.parseObject(topHits.getHits().iterator().next().getSourceAsString());
    thisTurnProptype.put(topHits.getName(), topHitsJson);
    }
    最终的代码见文末附件。

Client方式执行语句

Client方式和Template方式代码类似,不同的地方只是入口类不同,Client使用如下方法获得resp。

1
2
SearchRequestBuilder builder = client.prepareSearch("group").setTypes("test");
SearchResponse resp = builder.addAggregation(aggBuilder).get();

附件:完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package xxx;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

@Service
public class EsService {

@Autowired
private ElasticsearchTemplate template;

public List<Map<String, Object>> agg() {
TermsAggregationBuilder key1Builder = AggregationBuilders.terms("key1").field("key1.keyword");
TermsAggregationBuilder key2Builder = AggregationBuilders.terms("key2").field("key2.keyword");
TermsAggregationBuilder key3Builder = AggregationBuilders.terms("key3").field("key3.keyword");
TopHitsAggregationBuilder otherColBuilder = AggregationBuilders.topHits("other_col");
TermsAggregationBuilder aggBuilder = key1Builder.subAggregation(key2Builder.subAggregation(key3Builder)
.subAggregation(otherColBuilder));
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder().withIndices("group").withTypes("test");
NativeSearchQuery query = builder.addAggregation(aggBuilder).build();
List<Map<String, Object>> res = template.query(query, resp -> {
System.out.println(resp);
return flatAggRes(resp.getAggregations());
});
return res;
}

private List<Map<String, Object>> flatAggRes(Aggregations aggregations) {
List<Map<String, Object>> res = new ArrayList<>();
recurseMap(res, aggregations, new HashMap<>());
return res;
}

/**
* 递归处理聚合结果,结束逻辑:当前的agg下没有stringterm聚合
*
* @param resList 结果集合
* @param aggregations 聚合结果,根据业务,每层只有一个stringterm聚合,有可能并排一个tophits聚合
* @param prototype 上层传来的原型,需要将上层的原型复制到本层,并将本层的map副本作为下层的原型
* @return
*/
private void recurseMap(
List<Map<String, Object>> resList, Aggregations aggregations, Map<String, Object> prototype) {
StringTerms stringTerms = null;
InternalTopHits topHits = null;
for (Aggregation aggregation : aggregations) {
if (aggregation instanceof StringTerms) {
stringTerms = (StringTerms) aggregation;
}
if (aggregation instanceof InternalTopHits) {
topHits = (InternalTopHits) aggregation;
}
}
if (stringTerms == null) {
return;
}
Map<String, Object> thisTurnProptype = new HashMap<>(prototype);
if (topHits != null) {
JSONObject topHitsJson = JSON.parseObject(topHits.getHits().iterator().next().getSourceAsString());
thisTurnProptype.put(topHits.getName(), topHitsJson);
}

List<StringTerms.Bucket> thisTurnBuckets = stringTerms.getBuckets();
// 没有数据就结束执行
if (thisTurnBuckets.isEmpty()) {
return;
}
// 结束条件判断
// 判断当前agg下的第一个bucket下是否有agg,如果有就增加原型后继续递归,否则处理数据
if (thisTurnBuckets.get(0).getAggregations().asList().size() > 0) {
// 增加原型数据,继续进入递归
String aggKey = stringTerms.getName();
for (StringTerms.Bucket thisTrunBucket : thisTurnBuckets) {
String aggValue = thisTrunBucket.getKeyAsString();
thisTurnProptype.put(aggKey, aggValue);
recurseMap(resList, thisTrunBucket.getAggregations(), thisTurnProptype);
}
return;
}
// 处理数据
List<Map<String, Object>> thisTurnList = new ArrayList<>();
// agg的term作为map的key
String mapKey = stringTerms.getName();
for (StringTerms.Bucket bucket : thisTurnBuckets) {
// 聚合之后得到的key作为map的value
String mapValue = bucket.getKeyAsString();
long count = bucket.getDocCount();
Map<String, Object> item = new HashMap<>(thisTurnProptype);
item.put(mapKey, mapValue);
item.put("count", count);
thisTurnList.add(item);
}
resList.addAll(thisTurnList);
}
}