Coverage Report

Created: 2026-03-26 04:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/es/es_scan_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/es/es_scan_reader.h"
19
20
#include <stdlib.h>
21
22
#include <map>
23
#include <sstream>
24
#include <string>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/status.h"
29
#include "exec/es/es_scroll_parser.h"
30
#include "exec/es/es_scroll_query.h"
31
#include "service/http/http_method.h"
32
33
namespace doris {
34
35
// hits.hits._id used for obtain ES document `_id`
36
const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH =
37
        "filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id";
38
// hits.hits._score used for processing field not exists in one batch
39
const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH =
40
        "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields";
41
42
const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
43
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
44
const std::string REQUEST_SEPARATOR = "/";
45
46
ESScanReader::ESScanReader(const std::string& target,
47
                           const std::map<std::string, std::string>& props, bool doc_value_mode)
48
640
        : _scroll_keep_alive(config::es_scroll_keepalive),
49
640
          _http_timeout_ms(config::es_http_timeout_ms),
50
640
          _doc_value_mode(doc_value_mode) {
51
640
    _target = target;
52
640
    _index = props.at(KEY_INDEX);
53
640
    if (props.find(KEY_TYPE) != props.end()) {
54
0
        _type = props.at(KEY_TYPE);
55
0
    }
56
640
    if (props.find(KEY_USER_NAME) != props.end()) {
57
640
        _user_name = props.at(KEY_USER_NAME);
58
640
    }
59
640
    if (props.find(KEY_PASS_WORD) != props.end()) {
60
640
        _passwd = props.at(KEY_PASS_WORD);
61
640
    }
62
640
    if (props.find(KEY_SHARD) != props.end()) {
63
640
        _shards = props.at(KEY_SHARD);
64
640
    }
65
640
    if (props.find(KEY_QUERY) != props.end()) {
66
640
        _query = props.at(KEY_QUERY);
67
640
    }
68
640
    if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) {
69
640
        std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client;
70
640
    }
71
72
640
    std::string batch_size_str = props.at(KEY_BATCH_SIZE);
73
640
    _batch_size = atoi(batch_size_str.c_str());
74
640
    std::string filter_path =
75
640
            _doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;
76
77
    // When shard_id is negative(-1), the request will be sent to ES without shard preference.
78
640
    int32_t shard_id = std::stoi(_shards);
79
640
    if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
80
0
        _exactly_once = true;
81
0
        std::stringstream scratch;
82
        // just send a normal search  against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect
83
0
        if (_type.empty()) {
84
0
            if (shard_id < 0) {
85
0
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" << filter_path;
86
0
            } else {
87
                // `terminate_after` and `size` can not be used together in scroll request of ES 8.x
88
0
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
89
0
                        << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
90
0
            }
91
0
        } else {
92
0
            if (shard_id < 0) {
93
0
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
94
0
                        << "/_search?"
95
0
                        << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << "&"
96
0
                        << filter_path;
97
0
            } else {
98
0
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
99
0
                        << "/_search?"
100
0
                        << "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
101
0
                        << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
102
0
            }
103
0
        }
104
0
        _search_url = scratch.str();
105
640
    } else {
106
640
        _exactly_once = false;
107
640
        std::stringstream scratch;
108
        // scroll request for scanning
109
        // add terminate_after for the first scroll to avoid decompress all postings list
110
640
        if (_type.empty()) {
111
640
            if (shard_id < 0) {
112
274
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
113
274
                        << "scroll=" << _scroll_keep_alive << "&" << filter_path;
114
366
            } else {
115
                // `terminate_after` and `size` can not be used together in scroll request of ES 8.x
116
366
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
117
366
                        << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
118
366
                        << "&" << filter_path;
119
366
            }
120
640
        } else {
121
0
            if (shard_id < 0) {
122
0
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
123
0
                        << "/_search?"
124
0
                        << "scroll=" << _scroll_keep_alive << "&" << filter_path
125
0
                        << "&terminate_after=" << batch_size_str;
126
0
            } else {
127
0
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
128
0
                        << "/_search?"
129
0
                        << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
130
0
                        << "&" << filter_path << "&terminate_after=" << batch_size_str;
131
0
            }
132
0
        }
133
640
        _init_scroll_url = scratch.str();
134
640
        _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
135
640
    }
136
640
    _eos = false;
137
640
}
138
139
640
ESScanReader::~ESScanReader() {}
140
141
640
Status ESScanReader::open() {
142
640
    _is_first = true;
143
    // we do not enable set_fail_on_error for ES http request to get more detail error messages
144
640
    bool set_fail_on_error = false;
145
640
    if (_exactly_once) {
146
0
        RETURN_IF_ERROR(_network_client.init(_search_url, set_fail_on_error));
147
0
        LOG(INFO) << "search request URL: " << _search_url;
148
640
    } else {
149
640
        RETURN_IF_ERROR(_network_client.init(_init_scroll_url, set_fail_on_error));
150
640
        LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
151
640
    }
152
640
    _network_client.set_basic_auth(_user_name, _passwd);
153
640
    _network_client.set_content_type("application/json");
154
640
    _network_client.set_timeout_ms(_http_timeout_ms);
155
640
    if (_use_ssl_client) {
156
0
        _network_client.use_untrusted_ssl();
157
0
    }
158
    // phase open, we cached the first response for `get_next` phase
159
640
    Status status = _network_client.execute_post_request(_query, &_cached_response);
160
640
    if (!status.ok() || _network_client.get_http_status() != 200) {
161
0
        std::stringstream ss;
162
0
        ss << "Failed to connect to ES server, errmsg is: " << status
163
0
           << ", response: " << _cached_response;
164
0
        LOG(WARNING) << ss.str();
165
0
        return Status::InternalError(ss.str());
166
0
    }
167
640
    VLOG_CRITICAL << "open _cached response: " << _cached_response;
168
640
    return Status::OK();
169
640
}
170
171
1.27k
Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scroll_parser) {
172
1.27k
    std::string response;
173
    // if is first scroll request, should return the cached response
174
1.27k
    *scan_eos = true;
175
1.27k
    if (_eos) {
176
636
        return Status::OK();
177
636
    }
178
179
638
    if (_is_first) {
180
638
        response = _cached_response;
181
638
        _is_first = false;
182
638
    } else {
183
0
        if (_exactly_once) {
184
0
            return Status::OK();
185
0
        }
186
        // we do not enable set_fail_on_error for ES http request to get more detail error messages
187
0
        bool set_fail_on_error = false;
188
0
        RETURN_IF_ERROR(_network_client.init(_next_scroll_url, set_fail_on_error));
189
0
        _network_client.set_basic_auth(_user_name, _passwd);
190
0
        _network_client.set_content_type("application/json");
191
0
        _network_client.set_timeout_ms(_http_timeout_ms);
192
0
        if (_use_ssl_client) {
193
0
            _network_client.use_untrusted_ssl();
194
0
        }
195
0
        RETURN_IF_ERROR(_network_client.execute_post_request(
196
0
                ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
197
0
                &response));
198
0
        long status = _network_client.get_http_status();
199
0
        if (status == 404) {
200
0
            LOG(WARNING) << "request scroll search failure 404["
201
0
                         << ", response: " << (response.empty() ? "empty response" : response)
202
0
                         << "]";
203
0
            return Status::InternalError("No search context found for {}", _scroll_id);
204
0
        }
205
0
        if (status != 200) {
206
0
            LOG(WARNING) << "request scroll search failure["
207
0
                         << "http status: " << status
208
0
                         << ", response: " << (response.empty() ? "empty response" : response)
209
0
                         << "]";
210
0
            return Status::InternalError("request scroll search failure: {}",
211
0
                                         (response.empty() ? "empty response" : response));
212
0
        }
213
0
    }
214
215
638
    scroll_parser.reset(new ScrollParser(_doc_value_mode));
216
18.4E
    VLOG_CRITICAL << "get_next request ES, returned response: " << response;
217
638
    Status status = scroll_parser->parse(response, _exactly_once);
218
638
    if (!status.ok()) {
219
0
        _eos = true;
220
0
        LOG(WARNING) << status;
221
0
        return status;
222
0
    }
223
224
    // request ES just only once
225
638
    if (_exactly_once) {
226
0
        _eos = true;
227
638
    } else {
228
638
        _scroll_id = scroll_parser->get_scroll_id();
229
638
        if (scroll_parser->get_size() == 0) {
230
4
            _eos = true;
231
4
            return Status::OK();
232
4
        }
233
234
634
        _eos = scroll_parser->get_size() < _batch_size;
235
634
    }
236
634
    *scan_eos = false;
237
634
    return Status::OK();
238
638
}
239
240
640
Status ESScanReader::close() {
241
640
    if (_scroll_id.empty()) {
242
0
        return Status::OK();
243
0
    }
244
245
640
    std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
246
    // we do not enable set_fail_on_error for ES http request to get more detail error messages
247
640
    bool set_fail_on_error = false;
248
640
    RETURN_IF_ERROR(_network_client.init(scratch_target, set_fail_on_error));
249
640
    _network_client.set_basic_auth(_user_name, _passwd);
250
640
    _network_client.set_method(DELETE);
251
640
    _network_client.set_content_type("application/json");
252
640
    _network_client.set_timeout_ms(_http_timeout_ms);
253
640
    if (_use_ssl_client) {
254
0
        _network_client.use_untrusted_ssl();
255
0
    }
256
640
    std::string response;
257
640
    RETURN_IF_ERROR(_network_client.execute_delete_request(
258
640
            ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
259
640
    long status = _network_client.get_http_status();
260
640
    if (status == 200) {
261
640
        return Status::OK();
262
640
    } else {
263
0
        LOG(WARNING) << "es_scan_reader delete scroll context failure["
264
0
                     << "http status: " << status
265
0
                     << ", response: " << (response.empty() ? "empty response" : response) << "]";
266
0
        return Status::InternalError("es_scan_reader delete scroll context failure");
267
0
    }
268
640
}
269
} // namespace doris