Coverage Report

Created: 2026-03-16 01:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/es/es_scan_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/es/es_scan_reader.h"
19
20
#include <stdlib.h>

#include <map>
#include <memory>
#include <sstream>
#include <string>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/es/es_scroll_parser.h"
#include "exec/es/es_scroll_query.h"
#include "service/http/http_method.h"
32
33
namespace doris {
34
35
// Response filter for _source mode. hits.hits._id is kept so that the ES document `_id`
// can be obtained.
const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH{
        "filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id"};
// Response filter for doc-value mode. hits.hits._score is kept for handling fields that do
// not exist in a batch.
const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH{
        "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields"};

// URL fragments used when assembling search / scroll request URLs.
const std::string REQUEST_PREFERENCE_PREFIX{"&preference=_shards:"};
const std::string REQUEST_SEARCH_SCROLL_PATH{"/_search/scroll"};
const std::string REQUEST_SEPARATOR{"/"};
45
46
// Builds the ES request URLs for one scan range from the scan properties.
//
// props must contain KEY_INDEX and KEY_BATCH_SIZE (props.at() throws std::out_of_range
// otherwise). KEY_TYPE, KEY_USER_NAME, KEY_PASS_WORD, KEY_SHARD, KEY_QUERY and
// KEY_HTTP_SSL_ENABLED are optional. When KEY_TERMINATE_AFTER is present, the reader issues a
// single plain search (_search_url). Otherwise it drives a scroll: _init_scroll_url, then
// _next_scroll_url.
ESScanReader::ESScanReader(const std::string& target,
                           const std::map<std::string, std::string>& props, bool doc_value_mode)
        : _scroll_keep_alive(config::es_scroll_keepalive),
          _http_timeout_ms(config::es_http_timeout_ms),
          _doc_value_mode(doc_value_mode) {
    // Copies props[key] into *dest when the key exists; *dest is left untouched otherwise.
    // Uses one lookup per key instead of find() followed by at().
    auto assign_if_present = [&props](const std::string& key, std::string* dest) {
        const auto it = props.find(key);
        if (it != props.end()) {
            *dest = it->second;
        }
    };

    _target = target;
    _index = props.at(KEY_INDEX);
    assign_if_present(KEY_TYPE, &_type);
    assign_if_present(KEY_USER_NAME, &_user_name);
    assign_if_present(KEY_PASS_WORD, &_passwd);
    assign_if_present(KEY_SHARD, &_shards);
    assign_if_present(KEY_QUERY, &_query);
    const auto ssl_it = props.find(KEY_HTTP_SSL_ENABLED);
    if (ssl_it != props.end()) {
        // Parsed with std::boolalpha, i.e. "true"/"false"; other text yields false.
        std::istringstream(ssl_it->second) >> std::boolalpha >> _use_ssl_client;
    }

    const std::string batch_size_str = props.at(KEY_BATCH_SIZE);
    _batch_size = atoi(batch_size_str.c_str());
    const std::string filter_path =
            _doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;

    // A negative shard_id (-1) sends the request to ES without a shard preference.
    // KEY_SHARD is read above as optional. A missing shard is handled the same way rather than
    // calling std::stoi(""), which throws std::invalid_argument.
    const int32_t shard_id = _shards.empty() ? -1 : std::stoi(_shards);

    if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
        // A limit was pushed down: send one normal (non-scroll) search. The terminate_after
        // parameter lets ES stop early.
        _exactly_once = true;
        std::stringstream scratch;
        if (_type.empty()) {
            // Typeless index: `terminate_after` and `size` can not be used together on ES 8.x,
            // so terminate_after is not sent here.
            if (shard_id < 0) {
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" << filter_path;
            } else {
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
                        << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
            }
        } else {
            if (shard_id < 0) {
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
                        << "/_search?"
                        << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << "&"
                        << filter_path;
            } else {
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
                        << "/_search?"
                        << "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
                        << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path;
            }
        }
        _search_url = scratch.str();
    } else {
        _exactly_once = false;
        std::stringstream scratch;
        // Scroll request for scanning. For typed indices, the first scroll request also carries
        // terminate_after=<batch size> to avoid decompressing entire postings lists.
        if (_type.empty()) {
            // Typeless index: `terminate_after` and `size` can not be used together in a scroll
            // request on ES 8.x, so terminate_after is omitted.
            if (shard_id < 0) {
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
                        << "scroll=" << _scroll_keep_alive << "&" << filter_path;
            } else {
                scratch << _target << REQUEST_SEPARATOR << _index << "/_search?"
                        << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
                        << "&" << filter_path;
            }
        } else {
            if (shard_id < 0) {
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
                        << "/_search?"
                        << "scroll=" << _scroll_keep_alive << "&" << filter_path
                        << "&terminate_after=" << batch_size_str;
            } else {
                scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type
                        << "/_search?"
                        << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards
                        << "&" << filter_path << "&terminate_after=" << batch_size_str;
            }
        }
        _init_scroll_url = scratch.str();
        // Follow-up scroll requests go to the shared /_search/scroll endpoint.
        _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
    }
    _eos = false;
}
138
139
0
// No explicit cleanup is performed here. Note that the destructor does not call close(), so a
// server-side scroll context is only cleared when close() is invoked.
ESScanReader::~ESScanReader() = default;
140
141
0
// Sends the first request and caches its body in _cached_response for the first get_next()
// call. In exactly-once mode that request is the single search; otherwise it is the initial
// scroll request. Returns InternalError when the request fails or ES answers with a non-200
// status.
Status ESScanReader::open() {
    _is_first = true;
    // set_fail_on_error stays disabled so an ES error response still delivers its body,
    // which carries the more detailed error message.
    bool set_fail_on_error = false;
    if (_exactly_once) {
        RETURN_IF_ERROR(_network_client.init(_search_url, set_fail_on_error));
        LOG(INFO) << "search request URL: " << _search_url;
    } else {
        RETURN_IF_ERROR(_network_client.init(_init_scroll_url, set_fail_on_error));
        LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
    }
    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_content_type("application/json");
    _network_client.set_timeout_ms(_http_timeout_ms);
    if (_use_ssl_client) {
        _network_client.use_untrusted_ssl();
    }
    // Open phase: the first response is cached and handed out by the first get_next().
    Status status = _network_client.execute_post_request(_query, &_cached_response);
    if (!status.ok() || _network_client.get_http_status() != 200) {
        std::stringstream ss;
        ss << "Failed to connect to ES server, errmsg is: " << status;
        if (status.ok()) {
            // The request itself succeeded, so ES answered with an HTTP error status.
            // Report that status, as the scroll and clear-scroll paths do.
            ss << ", http status: " << _network_client.get_http_status();
        }
        ss << ", response: " << _cached_response;
        LOG(WARNING) << ss.str();
        return Status::InternalError(ss.str());
    }
    VLOG_CRITICAL << "open _cached response: " << _cached_response;
    return Status::OK();
}
170
171
0
// Produces the next batch of hits in scroll_parser.
//
// The first call hands out the response cached by open(). Later calls (scroll mode only)
// POST the current scroll id to _next_scroll_url. *scan_eos is false when scroll_parser holds
// a parsed, non-empty batch, and true otherwise. A 404 from ES maps to "No search context
// found"; any other non-200 status is an InternalError carrying the response body.
Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scroll_parser) {
    std::string response;
    // Assume end of scan until a batch has been parsed successfully.
    *scan_eos = true;
    if (_eos) {
        return Status::OK();
    }

    if (_is_first) {
        // First call: take over the response cached by open(). swap() avoids copying the
        // (possibly large) body. Nothing reads the cache again until open() refills it.
        response.swap(_cached_response);
        _is_first = false;
    } else {
        if (_exactly_once) {
            // Exactly-once mode sends a single request, already consumed by the first call.
            return Status::OK();
        }
        // set_fail_on_error stays disabled so ES error bodies remain available for diagnostics.
        bool set_fail_on_error = false;
        RETURN_IF_ERROR(_network_client.init(_next_scroll_url, set_fail_on_error));
        _network_client.set_basic_auth(_user_name, _passwd);
        _network_client.set_content_type("application/json");
        _network_client.set_timeout_ms(_http_timeout_ms);
        if (_use_ssl_client) {
            _network_client.use_untrusted_ssl();
        }
        RETURN_IF_ERROR(_network_client.execute_post_request(
                ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive),
                &response));
        long status = _network_client.get_http_status();
        if (status == 404) {
            LOG(WARNING) << "request scroll search failure 404["
                         << "response: " << (response.empty() ? "empty response" : response)
                         << "]";
            return Status::InternalError("No search context found for {}", _scroll_id);
        }
        if (status != 200) {
            LOG(WARNING) << "request scroll search failure["
                         << "http status: " << status
                         << ", response: " << (response.empty() ? "empty response" : response)
                         << "]";
            return Status::InternalError("request scroll search failure: {}",
                                         (response.empty() ? "empty response" : response));
        }
    }

    scroll_parser = std::make_unique<ScrollParser>(_doc_value_mode);
    VLOG_CRITICAL << "get_next request ES, returned response: " << response;
    Status status = scroll_parser->parse(response, _exactly_once);
    if (!status.ok()) {
        _eos = true;
        LOG(WARNING) << status;
        return status;
    }

    if (_exactly_once) {
        // Only one request is ever sent to ES in exactly-once mode.
        _eos = true;
    } else {
        _scroll_id = scroll_parser->get_scroll_id();
        if (scroll_parser->get_size() == 0) {
            _eos = true;
            return Status::OK();
        }
        // A batch smaller than the requested batch size is the last one.
        _eos = scroll_parser->get_size() < _batch_size;
    }
    *scan_eos = false;
    return Status::OK();
}
239
240
0
// Releases the server-side scroll context, if one was opened, by sending a DELETE to
// <target>/_search/scroll with the current scroll id. Returns OK when there is nothing to clear
// or ES answers 200, and InternalError otherwise. After a successful clear the scroll id is
// forgotten, so calling close() again is a no-op.
Status ESScanReader::close() {
    if (_scroll_id.empty()) {
        return Status::OK();
    }

    std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
    // set_fail_on_error stays disabled so an ES error response still delivers its body,
    // which carries the more detailed error message.
    bool set_fail_on_error = false;
    RETURN_IF_ERROR(_network_client.init(scratch_target, set_fail_on_error));
    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_method(DELETE);
    _network_client.set_content_type("application/json");
    _network_client.set_timeout_ms(_http_timeout_ms);
    if (_use_ssl_client) {
        _network_client.use_untrusted_ssl();
    }
    std::string response;
    RETURN_IF_ERROR(_network_client.execute_delete_request(
            ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response));
    long status = _network_client.get_http_status();
    if (status != 200) {
        LOG(WARNING) << "es_scan_reader delete scroll context failure["
                     << "http status: " << status
                     << ", response: " << (response.empty() ? "empty response" : response) << "]";
        return Status::InternalError("es_scan_reader delete scroll context failure");
    }
    // ES accepted the clear request, so drop the id. A repeated close() then returns OK
    // instead of sending another DELETE for a scroll context that has already been released.
    _scroll_id.clear();
    return Status::OK();
}
269
} // namespace doris