Coverage Report

Created: 2026-03-13 05:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/es/es_scroll_query.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/es/es_scroll_query.h"
19
20
#include <glog/logging.h>
21
#include <rapidjson/encodings.h>
22
#include <rapidjson/rapidjson.h>
23
#include <stdlib.h>
24
25
#include <sstream>
26
27
#include "exec/es/es_scan_reader.h"
28
#include "rapidjson/document.h"
29
#include "rapidjson/stringbuffer.h"
30
#include "rapidjson/writer.h"
31
32
namespace doris {
33
34
0
// The builder performs no work on construction.
ESScrollQueryBuilder::ESScrollQueryBuilder() = default;
35
36
0
// The destructor has nothing to release in this translation unit.
ESScrollQueryBuilder::~ESScrollQueryBuilder() = default;
37
38
std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id,
39
0
                                                         const std::string& scroll) {
40
0
    rapidjson::Document scroll_dsl;
41
0
    rapidjson::Document::AllocatorType& allocator = scroll_dsl.GetAllocator();
42
0
    scroll_dsl.SetObject();
43
0
    rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator);
44
0
    scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator);
45
0
    rapidjson::Value scroll_value(scroll.c_str(), allocator);
46
0
    scroll_dsl.AddMember("scroll", scroll_value, allocator);
47
0
    rapidjson::StringBuffer buffer;
48
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
49
0
    scroll_dsl.Accept(writer);
50
0
    return buffer.GetString();
51
0
}
52
704
std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scroll_id) {
53
704
    rapidjson::Document delete_scroll_dsl;
54
704
    rapidjson::Document::AllocatorType& allocator = delete_scroll_dsl.GetAllocator();
55
704
    delete_scroll_dsl.SetObject();
56
704
    rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator);
57
704
    delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator);
58
704
    rapidjson::StringBuffer buffer;
59
704
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
60
704
    delete_scroll_dsl.Accept(writer);
61
704
    return buffer.GetString();
62
704
}
63
64
std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
65
                                        const std::vector<std::string>& fields,
66
                                        const std::map<std::string, std::string>& docvalue_context,
67
704
                                        bool* doc_value_mode) {
68
704
    rapidjson::Document es_query_dsl;
69
704
    rapidjson::Document::AllocatorType& allocator = es_query_dsl.GetAllocator();
70
704
    es_query_dsl.SetObject();
71
    // generate the filter clause
72
704
    rapidjson::Document scratch_document;
73
704
    rapidjson::Value query_node(rapidjson::kObjectType);
74
    // use fe generate dsl, it must be placed outside the if, otherwise it will cause problems in AddMember
75
704
    rapidjson::Document fe_query_dsl;
76
704
    DCHECK(properties.find(ESScanReader::KEY_QUERY_DSL) != properties.end());
77
704
    auto query_dsl = properties.at(ESScanReader::KEY_QUERY_DSL);
78
704
    es_query_dsl.AddMember("query", fe_query_dsl.Parse(query_dsl.c_str(), query_dsl.length()),
79
704
                           allocator);
80
81
    // Doris FE already has checked docvalue-scan optimization
82
704
    bool pure_docvalue = true;
83
704
    if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) {
84
700
        pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str());
85
700
    } else {
86
        // check docvalue scan optimization, used for compatibility
87
4
        if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
88
4
            pure_docvalue = false;
89
4
        } else {
90
0
            for (auto& select_field : fields) {
91
0
                if (docvalue_context.find(select_field) == docvalue_context.end()) {
92
0
                    pure_docvalue = false;
93
0
                    break;
94
0
                }
95
0
            }
96
0
        }
97
4
    }
98
99
704
    *doc_value_mode = pure_docvalue;
100
101
704
    rapidjson::Value source_node(rapidjson::kArrayType);
102
704
    if (pure_docvalue) {
103
228
        for (auto& select_field : fields) {
104
228
            rapidjson::Value field(docvalue_context.at(select_field).c_str(), allocator);
105
228
            source_node.PushBack(field, allocator);
106
228
        }
107
520
    } else {
108
6.86k
        for (auto& select_field : fields) {
109
6.86k
            rapidjson::Value field(select_field.c_str(), allocator);
110
6.86k
            source_node.PushBack(field, allocator);
111
6.86k
        }
112
520
    }
113
114
    // just filter the selected fields for reducing the network cost
115
704
    if (pure_docvalue) {
116
184
        es_query_dsl.AddMember("stored_fields", "_none_", allocator);
117
184
        es_query_dsl.AddMember("docvalue_fields", source_node, allocator);
118
520
    } else {
119
520
        es_query_dsl.AddMember("_source", source_node, allocator);
120
520
    }
121
122
704
    int size;
123
704
    if (properties.find(ESScanReader::KEY_TERMINATE_AFTER) != properties.end()) {
124
0
        size = atoi(properties.at(ESScanReader::KEY_TERMINATE_AFTER).c_str());
125
704
    } else {
126
704
        size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
127
704
    }
128
129
704
    std::string shard_id;
130
704
    if (properties.find(ESScanReader::KEY_SHARD) != properties.end()) {
131
704
        shard_id = properties.at(ESScanReader::KEY_SHARD);
132
704
    }
133
    // To maintain consistency with the query, when shard_id is negative, do not add sort node in scroll request body.
134
704
    if (!shard_id.empty() && std::stoi(shard_id) >= 0) {
135
390
        rapidjson::Value sort_node(rapidjson::kArrayType);
136
        // use the scroll-scan mode for scan index documents
137
390
        rapidjson::Value field("_doc", allocator);
138
390
        sort_node.PushBack(field, allocator);
139
390
        es_query_dsl.AddMember("sort", sort_node, allocator);
140
390
    }
141
    // number of documents returned
142
704
    es_query_dsl.AddMember("size", size, allocator);
143
704
    rapidjson::StringBuffer buffer;
144
704
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
145
704
    es_query_dsl.Accept(writer);
146
704
    std::string es_query_dsl_json = buffer.GetString();
147
    LOG(INFO) << "Generated ES queryDSL [ " << es_query_dsl_json << " ]";
148
704
    return es_query_dsl_json;
149
704
}
150
} // namespace doris