be/src/service/backend_service.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/backend_service.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <arrow/record_batch.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/BackendService.h> |
24 | | #include <gen_cpp/BackendService_types.h> |
25 | | #include <gen_cpp/Data_types.h> |
26 | | #include <gen_cpp/DorisExternalService_types.h> |
27 | | #include <gen_cpp/FrontendService_types.h> |
28 | | #include <gen_cpp/Metrics_types.h> |
29 | | #include <gen_cpp/PaloInternalService_types.h> |
30 | | #include <gen_cpp/Planner_types.h> |
31 | | #include <gen_cpp/Status_types.h> |
32 | | #include <gen_cpp/Types_types.h> |
33 | | #include <sys/types.h> |
34 | | #include <thrift/concurrency/ThreadFactory.h> |
35 | | #include <thrift/protocol/TDebugProtocol.h> |
36 | | #include <time.h> |
37 | | |
38 | | #include <cstdint> |
39 | | #include <map> |
40 | | #include <memory> |
41 | | #include <ostream> |
42 | | #include <ranges> |
43 | | #include <string> |
44 | | #include <thread> |
45 | | #include <utility> |
46 | | #include <vector> |
47 | | |
48 | | #include "absl/strings/substitute.h" |
49 | | #include "cloud/config.h" |
50 | | #include "common/config.h" |
51 | | #include "common/logging.h" |
52 | | #include "common/status.h" |
53 | | #include "exprs/function/dictionary_factory.h" |
54 | | #include "format/arrow/arrow_row_batch.h" |
55 | | #include "io/fs/connectivity/storage_connectivity_tester.h" |
56 | | #include "io/fs/local_file_system.h" |
57 | | #include "load/routine_load/routine_load_task_executor.h" |
58 | | #include "load/stream_load/stream_load_context.h" |
59 | | #include "load/stream_load/stream_load_recorder.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "runtime/external_scan_context_mgr.h" |
62 | | #include "runtime/fragment_mgr.h" |
63 | | #include "runtime/result_queue_mgr.h" |
64 | | #include "runtime/runtime_profile.h" |
65 | | #include "service/http/http_client.h" |
66 | | #include "storage/olap_common.h" |
67 | | #include "storage/olap_define.h" |
68 | | #include "storage/rowset/beta_rowset.h" |
69 | | #include "storage/rowset/pending_rowset_helper.h" |
70 | | #include "storage/rowset/rowset_factory.h" |
71 | | #include "storage/rowset/rowset_meta.h" |
72 | | #include "storage/snapshot/snapshot_manager.h" |
73 | | #include "storage/storage_engine.h" |
74 | | #include "storage/tablet/tablet_manager.h" |
75 | | #include "storage/tablet/tablet_meta.h" |
76 | | #include "storage/txn/txn_manager.h" |
77 | | #include "udf/python/python_env.h" |
78 | | #include "util/defer_op.h" |
79 | | #include "util/threadpool.h" |
80 | | #include "util/thrift_server.h" |
81 | | #include "util/uid_util.h" |
82 | | #include "util/url_coding.h" |
83 | | |
// Forward declarations of the Thrift types this file only refers to by name.
namespace apache::thrift {
class TException;
class TMultiplexedProcessor;
class TProcessor;
namespace transport {
class TTransportException;
} // namespace transport
} // namespace apache::thrift
94 | | |
95 | | namespace doris { |
96 | | #include "common/compile_check_begin.h" |
97 | | namespace { |
98 | | |
// Latency recorder for _ingest_binlog(): every call feeds its total elapsed time,
// in microseconds, into this recorder when it finishes.
bvar::LatencyRecorder g_ingest_binlog_latency("doris_backend_service", "ingest_binlog");
100 | | |
101 | | struct IngestBinlogArg { |
102 | | int64_t txn_id; |
103 | | int64_t partition_id; |
104 | | int64_t local_tablet_id; |
105 | | TabletSharedPtr local_tablet; |
106 | | TIngestBinlogRequest request; |
107 | | TStatus* tstatus; |
108 | | }; |
109 | | |
110 | | Status _exec_http_req(std::optional<HttpClient>& client, int retry_times, int sleep_time, |
111 | 0 | const std::function<Status(HttpClient*)>& callback) { |
112 | 0 | if (client.has_value()) { |
113 | 0 | return client->execute(retry_times, sleep_time, callback); |
114 | 0 | } else { |
115 | 0 | return HttpClient::execute_with_retry(retry_times, sleep_time, callback); |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | Status _download_binlog_segment_file(HttpClient* client, const std::string& get_segment_file_url, |
120 | | const std::string& segment_path, uint64_t segment_file_size, |
121 | | uint64_t estimate_timeout, |
122 | 0 | std::vector<std::string>& download_success_files) { |
123 | 0 | RETURN_IF_ERROR(client->init(get_segment_file_url)); |
124 | 0 | client->set_timeout_ms(estimate_timeout * 1000); |
125 | 0 | RETURN_IF_ERROR(client->download(segment_path)); |
126 | 0 | download_success_files.push_back(segment_path); |
127 | |
|
128 | 0 | std::string remote_file_md5; |
129 | 0 | RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); |
130 | 0 | LOG(INFO) << "download segment file to " << segment_path << ", remote md5: " << remote_file_md5 |
131 | 0 | << ", remote size: " << segment_file_size; |
132 | |
|
133 | 0 | std::error_code ec; |
134 | | // Check file length |
135 | 0 | uint64_t local_file_size = std::filesystem::file_size(segment_path, ec); |
136 | 0 | if (ec) { |
137 | 0 | LOG(WARNING) << "download file error" << ec.message(); |
138 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", segment_path, |
139 | 0 | ec.message()); |
140 | 0 | } |
141 | | |
142 | 0 | if (local_file_size != segment_file_size) { |
143 | 0 | LOG(WARNING) << "download file length error" |
144 | 0 | << ", get_segment_file_url=" << get_segment_file_url |
145 | 0 | << ", file_size=" << segment_file_size |
146 | 0 | << ", local_file_size=" << local_file_size; |
147 | 0 | return Status::RuntimeError("downloaded file size is not equal, local={}, remote={}", |
148 | 0 | local_file_size, segment_file_size); |
149 | 0 | } |
150 | | |
151 | 0 | if (!remote_file_md5.empty()) { // keep compatibility |
152 | 0 | std::string local_file_md5; |
153 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(segment_path, &local_file_md5)); |
154 | 0 | if (local_file_md5 != remote_file_md5) { |
155 | 0 | LOG(WARNING) << "download file md5 error" |
156 | 0 | << ", get_segment_file_url=" << get_segment_file_url |
157 | 0 | << ", remote_file_md5=" << remote_file_md5 |
158 | 0 | << ", local_file_md5=" << local_file_md5; |
159 | 0 | return Status::RuntimeError("download file md5 is not equal, local={}, remote={}", |
160 | 0 | local_file_md5, remote_file_md5); |
161 | 0 | } |
162 | 0 | } |
163 | | |
164 | 0 | return io::global_local_filesystem()->permission(segment_path, |
165 | 0 | io::LocalFileSystem::PERMS_OWNER_RW); |
166 | 0 | } |
167 | | |
168 | | Status _download_binlog_index_file(HttpClient* client, |
169 | | const std::string& get_segment_index_file_url, |
170 | | const std::string& local_segment_index_path, |
171 | | uint64_t segment_index_file_size, uint64_t estimate_timeout, |
172 | 0 | std::vector<std::string>& download_success_files) { |
173 | 0 | RETURN_IF_ERROR(client->init(get_segment_index_file_url)); |
174 | 0 | client->set_timeout_ms(estimate_timeout * 1000); |
175 | 0 | RETURN_IF_ERROR(client->download(local_segment_index_path)); |
176 | 0 | download_success_files.push_back(local_segment_index_path); |
177 | |
|
178 | 0 | std::string remote_file_md5; |
179 | 0 | RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); |
180 | | |
181 | 0 | LOG(INFO) << "download segment index file to " << local_segment_index_path |
182 | 0 | << ", remote md5: " << remote_file_md5 |
183 | 0 | << ", remote size: " << segment_index_file_size; |
184 | |
|
185 | 0 | std::error_code ec; |
186 | | // Check file length |
187 | 0 | uint64_t local_index_file_size = std::filesystem::file_size(local_segment_index_path, ec); |
188 | 0 | if (ec) { |
189 | 0 | LOG(WARNING) << "download index file error" << ec.message(); |
190 | 0 | return Status::IOError("can't retrive file_size of {}, due to {}", local_segment_index_path, |
191 | 0 | ec.message()); |
192 | 0 | } |
193 | 0 | if (local_index_file_size != segment_index_file_size) { |
194 | 0 | LOG(WARNING) << "download index file length error" |
195 | 0 | << ", get_segment_index_file_url=" << get_segment_index_file_url |
196 | 0 | << ", index_file_size=" << segment_index_file_size |
197 | 0 | << ", local_index_file_size=" << local_index_file_size; |
198 | 0 | return Status::RuntimeError("downloaded index file size is not equal, local={}, remote={}", |
199 | 0 | local_index_file_size, segment_index_file_size); |
200 | 0 | } |
201 | | |
202 | 0 | if (!remote_file_md5.empty()) { // keep compatibility |
203 | 0 | std::string local_file_md5; |
204 | 0 | RETURN_IF_ERROR( |
205 | 0 | io::global_local_filesystem()->md5sum(local_segment_index_path, &local_file_md5)); |
206 | 0 | if (local_file_md5 != remote_file_md5) { |
207 | 0 | LOG(WARNING) << "download file md5 error" |
208 | 0 | << ", get_segment_index_file_url=" << get_segment_index_file_url |
209 | 0 | << ", remote_file_md5=" << remote_file_md5 |
210 | 0 | << ", local_file_md5=" << local_file_md5; |
211 | 0 | return Status::RuntimeError("download file md5 is not equal, local={}, remote={}", |
212 | 0 | local_file_md5, remote_file_md5); |
213 | 0 | } |
214 | 0 | } |
215 | | |
216 | 0 | return io::global_local_filesystem()->permission(local_segment_index_path, |
217 | 0 | io::LocalFileSystem::PERMS_OWNER_RW); |
218 | 0 | } |
219 | | |
220 | 0 | void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { |
221 | 0 | std::optional<HttpClient> client; |
222 | 0 | if (config::enable_ingest_binlog_with_persistent_connection) { |
223 | | // Save the http client instance for persistent connection |
224 | 0 | client = std::make_optional<HttpClient>(); |
225 | 0 | } |
226 | |
|
227 | 0 | auto txn_id = arg->txn_id; |
228 | 0 | auto partition_id = arg->partition_id; |
229 | 0 | auto local_tablet_id = arg->local_tablet_id; |
230 | 0 | const auto& local_tablet = arg->local_tablet; |
231 | 0 | const auto& local_tablet_uid = local_tablet->tablet_uid(); |
232 | |
|
233 | 0 | std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared( |
234 | 0 | MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id)); |
235 | 0 | SCOPED_ATTACH_TASK(mem_tracker); |
236 | |
|
237 | 0 | auto& request = arg->request; |
238 | |
|
239 | 0 | MonotonicStopWatch watch(true); |
240 | 0 | int64_t total_download_bytes = 0; |
241 | 0 | int64_t total_download_files = 0; |
242 | 0 | TStatus tstatus; |
243 | 0 | std::vector<std::string> download_success_files; |
244 | 0 | std::unordered_map<std::string_view, uint64_t> elapsed_time_map; |
245 | 0 | Defer defer {[=, &engine, &tstatus, ingest_binlog_tstatus = arg->tstatus, &watch, |
246 | 0 | &total_download_bytes, &total_download_files, &elapsed_time_map]() { |
247 | 0 | g_ingest_binlog_latency << watch.elapsed_time_microseconds(); |
248 | 0 | auto elapsed_time_ms = static_cast<int64_t>(watch.elapsed_time_milliseconds()); |
249 | 0 | double copy_rate = 0.0; |
250 | 0 | if (elapsed_time_ms > 0) { |
251 | 0 | copy_rate = (double)total_download_bytes / ((double)elapsed_time_ms) / 1000; |
252 | 0 | } |
253 | 0 | LOG(INFO) << "ingest binlog elapsed " << elapsed_time_ms << " ms, download " |
254 | 0 | << total_download_files << " files, total " << total_download_bytes |
255 | 0 | << " bytes, avg rate " << copy_rate |
256 | 0 | << " MB/s. result: " << apache::thrift::ThriftDebugString(tstatus); |
257 | 0 | if (config::ingest_binlog_elapsed_threshold_ms >= 0 && |
258 | 0 | elapsed_time_ms > config::ingest_binlog_elapsed_threshold_ms) { |
259 | 0 | auto elapsed_details_view = |
260 | 0 | elapsed_time_map | std::views::transform([](const auto& pair) { |
261 | 0 | return fmt::format("{}:{}", pair.first, pair.second); |
262 | 0 | }); |
263 | 0 | std::string elapsed_details = fmt::format("{}", fmt::join(elapsed_details_view, ", ")); |
264 | 0 | LOG(WARNING) << "ingest binlog elapsed " << elapsed_time_ms << " ms, " |
265 | 0 | << elapsed_details; |
266 | 0 | } |
267 | 0 | if (tstatus.status_code != TStatusCode::OK) { |
268 | | // abort txn |
269 | 0 | engine.txn_manager()->abort_txn(partition_id, txn_id, local_tablet_id, |
270 | 0 | local_tablet_uid); |
271 | | // delete all successfully downloaded files |
272 | 0 | LOG(WARNING) << "will delete downloaded success files due to error " << tstatus; |
273 | 0 | std::vector<io::Path> paths; |
274 | 0 | for (const auto& file : download_success_files) { |
275 | 0 | paths.emplace_back(file); |
276 | 0 | LOG(WARNING) << "will delete downloaded success file " << file << " due to error"; |
277 | 0 | } |
278 | 0 | static_cast<void>(io::global_local_filesystem()->batch_delete(paths)); |
279 | 0 | LOG(WARNING) << "done delete downloaded success files due to error " << tstatus; |
280 | 0 | } |
281 | |
|
282 | 0 | if (ingest_binlog_tstatus) { |
283 | 0 | *ingest_binlog_tstatus = std::move(tstatus); |
284 | 0 | } |
285 | 0 | }}; |
286 | |
|
287 | 0 | auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) { |
288 | 0 | tstatus.__set_status_code(code); |
289 | 0 | tstatus.__isset.error_msgs = true; |
290 | 0 | tstatus.error_msgs.push_back(std::move(error_msg)); |
291 | 0 | }; |
292 | |
|
293 | 0 | auto estimate_download_timeout = [](int64_t file_size) { |
294 | 0 | uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; |
295 | 0 | if (estimate_timeout < config::download_low_speed_time) { |
296 | 0 | estimate_timeout = config::download_low_speed_time; |
297 | 0 | } |
298 | 0 | return estimate_timeout; |
299 | 0 | }; |
300 | | |
301 | | // Step 3: get binlog info |
302 | 0 | auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, |
303 | 0 | request.remote_port); |
304 | 0 | constexpr int max_retry = 3; |
305 | |
|
306 | 0 | auto get_binlog_info_url = |
307 | 0 | fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url, |
308 | 0 | "get_binlog_info", request.remote_tablet_id, request.binlog_version); |
309 | 0 | std::string binlog_info; |
310 | 0 | auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { |
311 | 0 | RETURN_IF_ERROR(client->init(get_binlog_info_url)); |
312 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
313 | 0 | return client->execute(&binlog_info); |
314 | 0 | }; |
315 | 0 | auto status = _exec_http_req(client, max_retry, 1, get_binlog_info_cb); |
316 | 0 | if (!status.ok()) { |
317 | 0 | LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url |
318 | 0 | << ", status=" << status.to_string(); |
319 | 0 | status.to_thrift(&tstatus); |
320 | 0 | return; |
321 | 0 | } |
322 | 0 | elapsed_time_map.emplace("get_binlog_info", watch.elapsed_time_microseconds()); |
323 | |
|
324 | 0 | std::vector<std::string> binlog_info_parts = absl::StrSplit(binlog_info, ":"); |
325 | 0 | if (binlog_info_parts.size() != 2) { |
326 | 0 | status = Status::RuntimeError("failed to parse binlog info into 2 parts: {}", binlog_info); |
327 | 0 | LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url |
328 | 0 | << ", status=" << status.to_string(); |
329 | 0 | status.to_thrift(&tstatus); |
330 | 0 | return; |
331 | 0 | } |
332 | 0 | std::string remote_rowset_id = std::move(binlog_info_parts[0]); |
333 | 0 | int64_t num_segments = -1; |
334 | 0 | try { |
335 | 0 | num_segments = std::stoll(binlog_info_parts[1]); |
336 | 0 | } catch (std::exception& e) { |
337 | 0 | status = Status::RuntimeError("failed to parse num segments from binlog info {}: {}", |
338 | 0 | binlog_info, e.what()); |
339 | 0 | LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url |
340 | 0 | << ", status=" << status; |
341 | 0 | status.to_thrift(&tstatus); |
342 | 0 | return; |
343 | 0 | } |
344 | | |
345 | | // Step 4: get rowset meta |
346 | 0 | auto get_rowset_meta_url = fmt::format( |
347 | 0 | "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url, |
348 | 0 | "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version); |
349 | 0 | std::string rowset_meta_str; |
350 | 0 | auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { |
351 | 0 | RETURN_IF_ERROR(client->init(get_rowset_meta_url)); |
352 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
353 | 0 | return client->execute(&rowset_meta_str); |
354 | 0 | }; |
355 | 0 | status = _exec_http_req(client, max_retry, 1, get_rowset_meta_cb); |
356 | 0 | if (!status.ok()) { |
357 | 0 | LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url |
358 | 0 | << ", status=" << status.to_string(); |
359 | 0 | status.to_thrift(&tstatus); |
360 | 0 | return; |
361 | 0 | } |
362 | 0 | elapsed_time_map.emplace("get_rowset_meta", watch.elapsed_time_microseconds()); |
363 | |
|
364 | 0 | RowsetMetaPB rowset_meta_pb; |
365 | 0 | if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) { |
366 | 0 | LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url; |
367 | 0 | status = Status::InternalError("failed to parse rowset meta"); |
368 | 0 | status.to_thrift(&tstatus); |
369 | 0 | return; |
370 | 0 | } |
371 | | // save source rowset id and tablet id |
372 | 0 | rowset_meta_pb.set_source_rowset_id(remote_rowset_id); |
373 | 0 | rowset_meta_pb.set_source_tablet_id(request.remote_tablet_id); |
374 | | // rewrite rowset meta |
375 | 0 | rowset_meta_pb.set_tablet_id(local_tablet_id); |
376 | 0 | rowset_meta_pb.set_partition_id(partition_id); |
377 | 0 | rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash()); |
378 | 0 | rowset_meta_pb.set_txn_id(txn_id); |
379 | 0 | rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED); |
380 | 0 | auto rowset_meta = std::make_shared<RowsetMeta>(); |
381 | 0 | if (!rowset_meta->init_from_pb(rowset_meta_pb)) { |
382 | 0 | LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url; |
383 | 0 | status = Status::InternalError("failed to init rowset meta"); |
384 | 0 | status.to_thrift(&tstatus); |
385 | 0 | return; |
386 | 0 | } |
387 | 0 | RowsetId new_rowset_id = engine.next_rowset_id(); |
388 | 0 | auto pending_rs_guard = engine.pending_local_rowsets().add(new_rowset_id); |
389 | 0 | rowset_meta->set_rowset_id(new_rowset_id); |
390 | 0 | rowset_meta->set_tablet_uid(local_tablet->tablet_uid()); |
391 | | |
392 | | // Step 5: get all segment files |
393 | | // Step 5.1: get all segment files size |
394 | 0 | std::vector<std::string> segment_file_urls; |
395 | 0 | segment_file_urls.reserve(num_segments); |
396 | 0 | std::vector<uint64_t> segment_file_sizes; |
397 | 0 | segment_file_sizes.reserve(num_segments); |
398 | 0 | for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { |
399 | 0 | auto get_segment_file_size_url = fmt::format( |
400 | 0 | "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url, |
401 | 0 | "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index); |
402 | 0 | uint64_t segment_file_size; |
403 | 0 | auto get_segment_file_size_cb = [&get_segment_file_size_url, |
404 | 0 | &segment_file_size](HttpClient* client) { |
405 | 0 | RETURN_IF_ERROR(client->init(get_segment_file_size_url)); |
406 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
407 | 0 | RETURN_IF_ERROR(client->head()); |
408 | 0 | return client->get_content_length(&segment_file_size); |
409 | 0 | }; |
410 | |
|
411 | 0 | status = _exec_http_req(client, max_retry, 1, get_segment_file_size_cb); |
412 | 0 | if (!status.ok()) { |
413 | 0 | LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url |
414 | 0 | << ", status=" << status.to_string(); |
415 | 0 | status.to_thrift(&tstatus); |
416 | 0 | return; |
417 | 0 | } |
418 | | |
419 | 0 | segment_file_sizes.push_back(segment_file_size); |
420 | 0 | segment_file_urls.push_back(std::move(get_segment_file_size_url)); |
421 | 0 | } |
422 | 0 | elapsed_time_map.emplace("get_segment_file_size", watch.elapsed_time_microseconds()); |
423 | | |
424 | | // Step 5.2: check data capacity |
425 | 0 | uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), |
426 | 0 | 0ULL); // NOLINT(bugprone-fold-init-type) |
427 | 0 | if (!local_tablet->can_add_binlog(total_size)) { |
428 | 0 | LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size |
429 | 0 | << ", tablet=" << local_tablet->tablet_id(); |
430 | 0 | status = Status::InternalError("no enough space"); |
431 | 0 | status.to_thrift(&tstatus); |
432 | 0 | return; |
433 | 0 | } |
434 | 0 | total_download_bytes = total_size; |
435 | 0 | total_download_files = num_segments; |
436 | | |
437 | | // Step 5.3: get all segment files |
438 | 0 | for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { |
439 | 0 | auto segment_file_size = segment_file_sizes[segment_index]; |
440 | 0 | auto get_segment_file_url = segment_file_urls[segment_index]; |
441 | 0 | if (config::enable_download_md5sum_check) { |
442 | 0 | get_segment_file_url = fmt::format("{}&acquire_md5=true", get_segment_file_url); |
443 | 0 | } |
444 | |
|
445 | 0 | auto segment_path = local_segment_path(local_tablet->tablet_path(), |
446 | 0 | rowset_meta->rowset_id().to_string(), segment_index); |
447 | 0 | LOG(INFO) << "download segment file from " << get_segment_file_url << " to " |
448 | 0 | << segment_path; |
449 | 0 | uint64_t estimate_timeout = estimate_download_timeout(segment_file_size); |
450 | 0 | auto get_segment_file_cb = [&get_segment_file_url, &segment_path, segment_file_size, |
451 | 0 | estimate_timeout, &download_success_files](HttpClient* client) { |
452 | 0 | return _download_binlog_segment_file(client, get_segment_file_url, segment_path, |
453 | 0 | segment_file_size, estimate_timeout, |
454 | 0 | download_success_files); |
455 | 0 | }; |
456 | |
|
457 | 0 | status = _exec_http_req(client, max_retry, 1, get_segment_file_cb); |
458 | 0 | if (!status.ok()) { |
459 | 0 | LOG(WARNING) << "failed to get segment file from " << get_segment_file_url |
460 | 0 | << ", status=" << status.to_string(); |
461 | 0 | status.to_thrift(&tstatus); |
462 | 0 | return; |
463 | 0 | } |
464 | 0 | } |
465 | 0 | elapsed_time_map.emplace("get_segment_files", watch.elapsed_time_microseconds()); |
466 | | |
467 | | // Step 6: get all segment index files |
468 | | // Step 6.1: get all segment index files size |
469 | 0 | std::vector<std::string> segment_index_file_urls; |
470 | 0 | std::vector<uint64_t> segment_index_file_sizes; |
471 | 0 | std::vector<std::string> segment_index_file_names; |
472 | 0 | auto tablet_schema = rowset_meta->tablet_schema(); |
473 | 0 | if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
474 | 0 | for (const auto& index : tablet_schema->inverted_indexes()) { |
475 | 0 | auto index_id = index->index_id(); |
476 | 0 | for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { |
477 | 0 | auto get_segment_index_file_size_url = fmt::format( |
478 | 0 | "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={" |
479 | 0 | "}", |
480 | 0 | binlog_api_url, "get_segment_index_file", request.remote_tablet_id, |
481 | 0 | remote_rowset_id, segment_index, index_id); |
482 | 0 | uint64_t segment_index_file_size; |
483 | 0 | auto get_segment_index_file_size_cb = |
484 | 0 | [&get_segment_index_file_size_url, |
485 | 0 | &segment_index_file_size](HttpClient* client) { |
486 | 0 | RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); |
487 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
488 | 0 | RETURN_IF_ERROR(client->head()); |
489 | 0 | return client->get_content_length(&segment_index_file_size); |
490 | 0 | }; |
491 | |
|
492 | 0 | auto segment_path = |
493 | 0 | local_segment_path(local_tablet->tablet_path(), |
494 | 0 | rowset_meta->rowset_id().to_string(), segment_index); |
495 | 0 | segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v1( |
496 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), index_id, |
497 | 0 | index->get_index_suffix())); |
498 | |
|
499 | 0 | status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb); |
500 | 0 | if (!status.ok()) { |
501 | 0 | LOG(WARNING) << "failed to get segment file size from " |
502 | 0 | << get_segment_index_file_size_url |
503 | 0 | << ", status=" << status.to_string(); |
504 | 0 | status.to_thrift(&tstatus); |
505 | 0 | return; |
506 | 0 | } |
507 | | |
508 | 0 | segment_index_file_sizes.push_back(segment_index_file_size); |
509 | 0 | segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url)); |
510 | 0 | } |
511 | 0 | } |
512 | 0 | } else { |
513 | 0 | for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { |
514 | 0 | if (tablet_schema->has_inverted_index() || tablet_schema->has_ann_index()) { |
515 | 0 | auto get_segment_index_file_size_url = fmt::format( |
516 | 0 | "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={" |
517 | 0 | "}", |
518 | 0 | binlog_api_url, "get_segment_index_file", request.remote_tablet_id, |
519 | 0 | remote_rowset_id, segment_index, -1); |
520 | 0 | uint64_t segment_index_file_size; |
521 | 0 | auto get_segment_index_file_size_cb = |
522 | 0 | [&get_segment_index_file_size_url, |
523 | 0 | &segment_index_file_size](HttpClient* client) { |
524 | 0 | RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); |
525 | 0 | client->set_timeout_ms(config::download_binlog_meta_timeout_ms); |
526 | 0 | RETURN_IF_ERROR(client->head()); |
527 | 0 | return client->get_content_length(&segment_index_file_size); |
528 | 0 | }; |
529 | 0 | auto segment_path = |
530 | 0 | local_segment_path(local_tablet->tablet_path(), |
531 | 0 | rowset_meta->rowset_id().to_string(), segment_index); |
532 | 0 | segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v2( |
533 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path))); |
534 | |
|
535 | 0 | status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb); |
536 | 0 | if (!status.ok()) { |
537 | 0 | LOG(WARNING) << "failed to get segment file size from " |
538 | 0 | << get_segment_index_file_size_url |
539 | 0 | << ", status=" << status.to_string(); |
540 | 0 | status.to_thrift(&tstatus); |
541 | 0 | return; |
542 | 0 | } |
543 | | |
544 | 0 | segment_index_file_sizes.push_back(segment_index_file_size); |
545 | 0 | segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url)); |
546 | 0 | } |
547 | 0 | } |
548 | 0 | } |
549 | 0 | elapsed_time_map.emplace("get_segment_index_file_size", watch.elapsed_time_microseconds()); |
550 | | |
551 | | // Step 6.2: check data capacity |
552 | 0 | uint64_t total_index_size = |
553 | 0 | std::accumulate(segment_index_file_sizes.begin(), segment_index_file_sizes.end(), |
554 | 0 | 0ULL); // NOLINT(bugprone-fold-init-type) |
555 | 0 | if (!local_tablet->can_add_binlog(total_index_size)) { |
556 | 0 | LOG(WARNING) << "failed to add binlog, no enough space, total_index_size=" |
557 | 0 | << total_index_size << ", tablet=" << local_tablet->tablet_id(); |
558 | 0 | status = Status::InternalError("no enough space"); |
559 | 0 | status.to_thrift(&tstatus); |
560 | 0 | return; |
561 | 0 | } |
562 | 0 | total_download_bytes += total_index_size; |
563 | 0 | total_download_files += segment_index_file_urls.size(); |
564 | | |
565 | | // Step 6.3: get all segment index files |
566 | 0 | DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size()); |
567 | 0 | DCHECK(segment_index_file_names.size() == segment_index_file_urls.size()); |
568 | 0 | for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) { |
569 | 0 | auto segment_index_file_size = segment_index_file_sizes[i]; |
570 | 0 | auto get_segment_index_file_url = segment_index_file_urls[i]; |
571 | 0 | if (config::enable_download_md5sum_check) { |
572 | 0 | get_segment_index_file_url = |
573 | 0 | fmt::format("{}&acquire_md5=true", get_segment_index_file_url); |
574 | 0 | } |
575 | |
|
576 | 0 | uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size); |
577 | 0 | auto local_segment_index_path = segment_index_file_names[i]; |
578 | 0 | LOG(INFO) << fmt::format("download segment index file from {} to {}", |
579 | 0 | get_segment_index_file_url, local_segment_index_path); |
580 | 0 | auto get_segment_index_file_cb = [&get_segment_index_file_url, &local_segment_index_path, |
581 | 0 | segment_index_file_size, estimate_timeout, |
582 | 0 | &download_success_files](HttpClient* client) { |
583 | 0 | return _download_binlog_index_file(client, get_segment_index_file_url, |
584 | 0 | local_segment_index_path, segment_index_file_size, |
585 | 0 | estimate_timeout, download_success_files); |
586 | 0 | }; |
587 | |
|
588 | 0 | status = _exec_http_req(client, max_retry, 1, get_segment_index_file_cb); |
589 | 0 | if (!status.ok()) { |
590 | 0 | LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url |
591 | 0 | << ", status=" << status.to_string(); |
592 | 0 | status.to_thrift(&tstatus); |
593 | 0 | return; |
594 | 0 | } |
595 | 0 | } |
596 | 0 | elapsed_time_map.emplace("get_segment_index_files", watch.elapsed_time_microseconds()); |
597 | | |
598 | | // Step 7: create rowset && calculate delete bitmap && commit |
599 | | // Step 7.1: create rowset |
600 | 0 | RowsetSharedPtr rowset; |
601 | 0 | status = RowsetFactory::create_rowset(local_tablet->tablet_schema(), |
602 | 0 | local_tablet->tablet_path(), rowset_meta, &rowset); |
603 | 0 | if (!status) { |
604 | 0 | LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet" |
605 | 0 | << ". rowset_id: " << rowset_meta_pb.rowset_id() |
606 | 0 | << ", rowset_type: " << rowset_meta_pb.rowset_type() |
607 | 0 | << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id |
608 | 0 | << ", status=" << status.to_string(); |
609 | 0 | status.to_thrift(&tstatus); |
610 | 0 | return; |
611 | 0 | } |
612 | | |
613 | | // Step 7.2 calculate delete bitmap before commit |
614 | 0 | auto calc_delete_bitmap_token = engine.calc_delete_bitmap_executor()->create_token(); |
615 | 0 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id); |
616 | 0 | RowsetIdUnorderedSet pre_rowset_ids; |
617 | 0 | if (local_tablet->enable_unique_key_merge_on_write()) { |
618 | 0 | auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get()); |
619 | 0 | std::vector<segment_v2::SegmentSharedPtr> segments; |
620 | 0 | status = beta_rowset->load_segments(&segments); |
621 | 0 | if (!status) { |
622 | 0 | LOG(WARNING) << "failed to load segments from rowset" |
623 | 0 | << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id |
624 | 0 | << ", status=" << status.to_string(); |
625 | 0 | status.to_thrift(&tstatus); |
626 | 0 | return; |
627 | 0 | } |
628 | 0 | elapsed_time_map.emplace("load_segments", watch.elapsed_time_microseconds()); |
629 | 0 | if (segments.size() > 1) { |
630 | | // calculate delete bitmap between segments |
631 | 0 | status = local_tablet->calc_delete_bitmap_between_segments( |
632 | 0 | rowset->tablet_schema(), rowset->rowset_id(), segments, delete_bitmap); |
633 | 0 | if (!status) { |
634 | 0 | LOG(WARNING) << "failed to calculate delete bitmap" |
635 | 0 | << ". tablet_id: " << local_tablet->tablet_id() |
636 | 0 | << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id |
637 | 0 | << ", status=" << status.to_string(); |
638 | 0 | status.to_thrift(&tstatus); |
639 | 0 | return; |
640 | 0 | } |
641 | 0 | elapsed_time_map.emplace("calc_delete_bitmap", watch.elapsed_time_microseconds()); |
642 | 0 | } |
643 | | |
644 | 0 | static_cast<void>(BaseTablet::commit_phase_update_delete_bitmap( |
645 | 0 | local_tablet, rowset, pre_rowset_ids, delete_bitmap, segments, txn_id, |
646 | 0 | calc_delete_bitmap_token.get(), nullptr)); |
647 | 0 | elapsed_time_map.emplace("commit_phase_update_delete_bitmap", |
648 | 0 | watch.elapsed_time_microseconds()); |
649 | 0 | static_cast<void>(calc_delete_bitmap_token->wait()); |
650 | 0 | elapsed_time_map.emplace("wait_delete_bitmap", watch.elapsed_time_microseconds()); |
651 | 0 | } |
652 | | |
653 | | // Step 7.3: commit txn |
654 | 0 | Status commit_txn_status = engine.txn_manager()->commit_txn( |
655 | 0 | local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), |
656 | 0 | rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(), |
657 | 0 | rowset_meta->load_id(), rowset, std::move(pending_rs_guard), false); |
658 | 0 | if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) { |
659 | 0 | auto err_msg = fmt::format( |
660 | 0 | "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, " |
661 | 0 | "txn_id={}, status={}", |
662 | 0 | rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(), |
663 | 0 | rowset_meta->txn_id(), commit_txn_status.to_string()); |
664 | 0 | LOG(WARNING) << err_msg; |
665 | 0 | set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg)); |
666 | 0 | return; |
667 | 0 | } |
668 | 0 | elapsed_time_map.emplace("commit_txn", watch.elapsed_time_microseconds()); |
669 | |
|
670 | 0 | if (local_tablet->enable_unique_key_merge_on_write()) { |
671 | 0 | engine.txn_manager()->set_txn_related_delete_bitmap(partition_id, txn_id, local_tablet_id, |
672 | 0 | local_tablet->tablet_uid(), true, |
673 | 0 | delete_bitmap, pre_rowset_ids, nullptr); |
674 | 0 | elapsed_time_map.emplace("set_txn_related_delete_bitmap", |
675 | 0 | watch.elapsed_time_microseconds()); |
676 | 0 | } |
677 | |
|
678 | 0 | tstatus.__set_status_code(TStatusCode::OK); |
679 | 0 | } |
680 | | } // namespace |
681 | | |
682 | | BaseBackendService::BaseBackendService(ExecEnv* exec_env) |
683 | 7 | : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, exec_env->cluster_info())) {} |
684 | | |
685 | 3 | BaseBackendService::~BaseBackendService() = default; |
686 | | |
687 | | BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env) |
688 | 6 | : BaseBackendService(exec_env), _engine(engine) {} |
689 | | |
690 | | BackendService::~BackendService() = default; |
691 | | |
692 | | Status BackendService::create_service(StorageEngine& engine, ExecEnv* exec_env, int port, |
693 | | std::unique_ptr<ThriftServer>* server, |
694 | 6 | std::shared_ptr<doris::BackendService> service) { |
695 | 6 | service->_agent_server->start_workers(engine, exec_env); |
696 | | // TODO: do we want a BoostThreadFactory? |
697 | | // TODO: we want separate thread factories here, so that fe requests can't starve |
698 | | // be requests |
699 | | // std::shared_ptr<TProcessor> be_processor = std::make_shared<BackendServiceProcessor>(service); |
700 | 6 | auto be_processor = std::make_shared<BackendServiceProcessor>(service); |
701 | | |
702 | 6 | *server = std::make_unique<ThriftServer>("backend", be_processor, port, |
703 | 6 | config::be_service_threads); |
704 | | |
705 | 6 | LOG(INFO) << "Doris BackendService listening on " << port; |
706 | | |
707 | 6 | auto thread_num = config::ingest_binlog_work_pool_size; |
708 | 6 | if (thread_num < 0) { |
709 | 6 | LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we will in sync mode", |
710 | 6 | thread_num); |
711 | 6 | return Status::OK(); |
712 | 6 | } |
713 | | |
714 | 0 | if (thread_num == 0) { |
715 | 0 | thread_num = std::thread::hardware_concurrency(); |
716 | 0 | } |
717 | 0 | static_cast<void>(doris::ThreadPoolBuilder("IngestBinlog") |
718 | 0 | .set_min_threads(thread_num) |
719 | 0 | .set_max_threads(thread_num * 2) |
720 | 0 | .build(&(service->_ingest_binlog_workers))); |
721 | 0 | LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async mode", thread_num); |
722 | 0 | return Status::OK(); |
723 | 6 | } |
724 | | |
725 | 25 | void BackendService::get_tablet_stat(TTabletStatResult& result) { |
726 | 25 | _engine.tablet_manager()->get_tablet_stat(&result); |
727 | 25 | } |
728 | | |
729 | 0 | int64_t BackendService::get_trash_used_capacity() { |
730 | 0 | int64_t result = 0; |
731 | |
|
732 | 0 | std::vector<DataDirInfo> data_dir_infos; |
733 | 0 | static_cast<void>(_engine.get_all_data_dir_info(&data_dir_infos, false /*do not update */)); |
734 | | |
735 | | // When a user executes the SQL `show trash`, also trigger an update of the backend trash capacity. |
736 | 0 | _engine.notify_listener("REPORT_DISK_STATE"); |
737 | |
|
738 | 0 | for (const auto& root_path_info : data_dir_infos) { |
739 | 0 | result += root_path_info.trash_used_capacity; |
740 | 0 | } |
741 | |
|
742 | 0 | return result; |
743 | 0 | } |
744 | | |
745 | 0 | void BackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) { |
746 | 0 | std::vector<DataDirInfo> data_dir_infos; |
747 | 0 | static_cast<void>(_engine.get_all_data_dir_info(&data_dir_infos, false /*do not update */)); |
748 | | |
749 | | // When a user executes the SQL `show trash on <be>`, also trigger an update of the backend trash capacity. |
750 | 0 | _engine.notify_listener("REPORT_DISK_STATE"); |
751 | |
|
752 | 0 | for (const auto& root_path_info : data_dir_infos) { |
753 | 0 | TDiskTrashInfo diskTrashInfo; |
754 | 0 | diskTrashInfo.__set_root_path(root_path_info.path); |
755 | 0 | diskTrashInfo.__set_state(root_path_info.is_used ? "ONLINE" : "OFFLINE"); |
756 | 0 | diskTrashInfo.__set_trash_used_capacity(root_path_info.trash_used_capacity); |
757 | 0 | diskTrashInfos.push_back(diskTrashInfo); |
758 | 0 | } |
759 | 0 | } |
760 | | |
761 | | void BaseBackendService::submit_routine_load_task(TStatus& t_status, |
762 | 70 | const std::vector<TRoutineLoadTask>& tasks) { |
763 | 70 | for (auto& task : tasks) { |
764 | 70 | Status st = _exec_env->routine_load_task_executor()->submit_task(task); |
765 | 70 | if (!st.ok()) { |
766 | 0 | LOG(WARNING) << "failed to submit routine load task. job id: " << task.job_id |
767 | 0 | << " task id: " << task.id; |
768 | 0 | return st.to_thrift(&t_status); |
769 | 0 | } |
770 | 70 | } |
771 | | |
772 | 70 | return Status::OK().to_thrift(&t_status); |
773 | 70 | } |
774 | | |
775 | | /* |
776 | | * 1. validate user privilege (todo) |
777 | | * 2. FragmentMgr#exec_external_plan_fragment |
778 | | */ |
779 | 3 | void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) { |
780 | 3 | TStatus t_status; |
781 | 3 | TUniqueId fragment_instance_id = generate_uuid(); |
782 | | // A query_id is randomly generated to replace t_query_plan_info.query_id. |
783 | | // An external query does not need to report anything to FE, so the query_id can be changed. |
784 | | // Otherwise, multiple independent, concurrently opened tablet scanners would share the same query_id, |
785 | | // and when one of the scanners ends, the other scanners would be canceled through FragmentMgr.cancel(query_id). |
786 | 3 | TUniqueId query_id = generate_uuid(); |
787 | 3 | std::shared_ptr<ScanContext> p_context; |
788 | 3 | static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context)); |
789 | 3 | p_context->fragment_instance_id = fragment_instance_id; |
790 | 3 | p_context->offset = 0; |
791 | 3 | p_context->last_access_time = time(nullptr); |
792 | 3 | if (params.__isset.keep_alive_min) { |
793 | 0 | p_context->keep_alive_min = params.keep_alive_min; |
794 | 3 | } else { |
795 | 3 | p_context->keep_alive_min = 5; |
796 | 3 | } |
797 | | |
798 | 3 | Status exec_st; |
799 | 3 | TQueryPlanInfo t_query_plan_info; |
800 | 3 | { |
801 | 3 | const std::string& opaqued_query_plan = params.opaqued_query_plan; |
802 | 3 | std::string query_plan_info; |
803 | | // base64 decode query plan |
804 | 3 | if (!base64_decode(opaqued_query_plan, &query_plan_info)) { |
805 | 0 | LOG(WARNING) << "open context error: base64_decode decode opaqued_query_plan failure"; |
806 | 0 | std::stringstream msg; |
807 | 0 | msg << "query_plan_info: " << query_plan_info |
808 | 0 | << " validate error, should not be modified after returned Doris FE processed"; |
809 | 0 | exec_st = Status::InvalidArgument(msg.str()); |
810 | 0 | } |
811 | | |
812 | 3 | const uint8_t* buf = (const uint8_t*)query_plan_info.data(); |
813 | 3 | uint32_t len = (uint32_t)query_plan_info.size(); |
814 | | // deserialize TQueryPlanInfo |
815 | 3 | auto st = deserialize_thrift_msg(buf, &len, false, &t_query_plan_info); |
816 | 3 | if (!st.ok()) { |
817 | 0 | LOG(WARNING) << "open context error: deserialize TQueryPlanInfo failure"; |
818 | 0 | std::stringstream msg; |
819 | 0 | msg << "query_plan_info: " << query_plan_info |
820 | 0 | << " deserialize error, should not be modified after returned Doris FE processed"; |
821 | 0 | exec_st = Status::InvalidArgument(msg.str()); |
822 | 0 | } |
823 | 3 | p_context->query_id = query_id; |
824 | 3 | } |
825 | 3 | std::vector<TScanColumnDesc> selected_columns; |
826 | 3 | if (exec_st.ok()) { |
827 | | // start the scan procedure |
828 | 3 | LOG(INFO) << fmt::format( |
829 | 3 | "exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id " |
830 | 3 | "= {}", |
831 | 3 | print_id(t_query_plan_info.query_id), print_id(query_id), |
832 | 3 | print_id(fragment_instance_id)); |
833 | 3 | exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment( |
834 | 3 | params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns); |
835 | 3 | } |
836 | 3 | exec_st.to_thrift(&t_status); |
837 | | //return status |
838 | | // t_status.status_code = TStatusCode::OK; |
839 | 3 | result_.status = t_status; |
840 | 3 | result_.__set_context_id(p_context->context_id); |
841 | 3 | result_.__set_selected_columns(selected_columns); |
842 | 3 | } |
843 | | |
844 | | // Fetch the next result batch by polling the queue. The context offset must always be maintained; otherwise the results become inconsistent. |
845 | 5 | void BaseBackendService::get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) { |
846 | 5 | std::string context_id = params.context_id; |
847 | 5 | u_int64_t offset = params.offset; |
848 | 5 | TStatus t_status; |
849 | 5 | std::shared_ptr<ScanContext> context; |
850 | 5 | Status st = _exec_env->external_scan_context_mgr()->get_scan_context(context_id, &context); |
851 | 5 | if (!st.ok()) { |
852 | 0 | st.to_thrift(&t_status); |
853 | 0 | result_.status = t_status; |
854 | 0 | return; |
855 | 0 | } |
856 | 5 | if (offset != context->offset) { |
857 | 0 | LOG(ERROR) << "getNext error: context offset [" << context->offset << " ]" |
858 | 0 | << " ,client offset [ " << offset << " ]"; |
859 | | // invalid offset |
860 | 0 | t_status.status_code = TStatusCode::NOT_FOUND; |
861 | 0 | t_status.error_msgs.push_back( |
862 | 0 | absl::Substitute("context_id=$0, send_offset=$1, context_offset=$2", context_id, |
863 | 0 | offset, context->offset)); |
864 | 0 | result_.status = t_status; |
865 | 5 | } else { |
866 | | // during accessing, should disabled last_access_time |
867 | 5 | context->last_access_time = -1; |
868 | 5 | TUniqueId fragment_instance_id = context->fragment_instance_id; |
869 | 5 | std::shared_ptr<arrow::RecordBatch> record_batch; |
870 | 5 | bool eos; |
871 | | |
872 | 5 | st = _exec_env->result_queue_mgr()->fetch_result(fragment_instance_id, &record_batch, &eos); |
873 | 5 | if (st.ok()) { |
874 | 5 | result_.__set_eos(eos); |
875 | 5 | if (!eos) { |
876 | 3 | std::string record_batch_str; |
877 | 3 | st = serialize_record_batch(*record_batch, &record_batch_str); |
878 | 3 | st.to_thrift(&t_status); |
879 | 3 | if (st.ok()) { |
880 | | // avoid copy large string |
881 | 3 | result_.rows = std::move(record_batch_str); |
882 | | // set __isset |
883 | 3 | result_.__isset.rows = true; |
884 | 3 | context->offset += record_batch->num_rows(); |
885 | 3 | } |
886 | 3 | } |
887 | 5 | } else { |
888 | 0 | LOG(WARNING) << "fragment_instance_id [" << print_id(fragment_instance_id) |
889 | 0 | << "] fetch result status [" << st.to_string() + "]"; |
890 | 0 | st.to_thrift(&t_status); |
891 | 0 | result_.status = t_status; |
892 | 0 | } |
893 | 5 | } |
894 | 5 | context->last_access_time = time(nullptr); |
895 | 5 | } |
896 | | |
897 | 2 | void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) { |
898 | 2 | std::string context_id = params.context_id; |
899 | 2 | TStatus t_status; |
900 | 2 | Status st = _exec_env->external_scan_context_mgr()->clear_scan_context(context_id); |
901 | 2 | st.to_thrift(&t_status); |
902 | 2 | result_.status = t_status; |
903 | 2 | } |
904 | | |
905 | | void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, |
906 | 12 | int64_t last_stream_record_time) { |
907 | 12 | BaseBackendService::get_stream_load_record(result, last_stream_record_time, |
908 | 12 | _engine.get_stream_load_recorder()); |
909 | 12 | } |
910 | | |
911 | 0 | void BackendService::check_storage_format(TCheckStorageFormatResult& result) { |
912 | 0 | _engine.tablet_manager()->get_all_tablets_storage_format(&result); |
913 | 0 | } |
914 | | |
915 | | void BackendService::make_snapshot(TAgentResult& return_value, |
916 | 0 | const TSnapshotRequest& snapshot_request) { |
917 | 0 | std::string snapshot_path; |
918 | 0 | bool allow_incremental_clone = false; |
919 | 0 | Status status = _engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path, |
920 | 0 | &allow_incremental_clone); |
921 | 0 | if (!status) { |
922 | 0 | LOG_WARNING("failed to make snapshot") |
923 | 0 | .tag("tablet_id", snapshot_request.tablet_id) |
924 | 0 | .tag("schema_hash", snapshot_request.schema_hash) |
925 | 0 | .error(status); |
926 | 0 | } else { |
927 | 0 | LOG_INFO("successfully make snapshot") |
928 | 0 | .tag("tablet_id", snapshot_request.tablet_id) |
929 | 0 | .tag("schema_hash", snapshot_request.schema_hash) |
930 | 0 | .tag("snapshot_path", snapshot_path); |
931 | 0 | return_value.__set_snapshot_path(snapshot_path); |
932 | 0 | return_value.__set_allow_incremental_clone(allow_incremental_clone); |
933 | 0 | } |
934 | |
|
935 | 0 | status.to_thrift(&return_value.status); |
936 | 0 | return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version); |
937 | 0 | } |
938 | | |
939 | | void BackendService::release_snapshot(TAgentResult& return_value, |
940 | 0 | const std::string& snapshot_path) { |
941 | 0 | Status status = _engine.snapshot_mgr()->release_snapshot(snapshot_path); |
942 | 0 | if (!status) { |
943 | 0 | LOG_WARNING("failed to release snapshot").tag("snapshot_path", snapshot_path).error(status); |
944 | 0 | } else { |
945 | 0 | LOG_INFO("successfully release snapshot").tag("snapshot_path", snapshot_path); |
946 | 0 | } |
947 | 0 | status.to_thrift(&return_value.status); |
948 | 0 | } |
949 | | |
950 | | void BackendService::ingest_binlog(TIngestBinlogResult& result, |
951 | 0 | const TIngestBinlogRequest& request) { |
952 | 0 | LOG(INFO) << "ingest binlog. request: " << apache::thrift::ThriftDebugString(request); |
953 | |
|
954 | 0 | TStatus tstatus; |
955 | 0 | Defer defer {[&result, &tstatus]() { |
956 | 0 | result.__set_status(tstatus); |
957 | 0 | LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(result); |
958 | 0 | }}; |
959 | |
|
960 | 0 | auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) { |
961 | 0 | tstatus.__set_status_code(code); |
962 | 0 | tstatus.__isset.error_msgs = true; |
963 | 0 | tstatus.error_msgs.push_back(std::move(error_msg)); |
964 | 0 | }; |
965 | |
|
966 | 0 | if (!config::enable_feature_binlog) { |
967 | 0 | set_tstatus(TStatusCode::RUNTIME_ERROR, "enable feature binlog is false"); |
968 | 0 | return; |
969 | 0 | } |
970 | | |
971 | | /// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, local_tablet_id, load_id |
972 | 0 | if (!request.__isset.txn_id) { |
973 | 0 | auto error_msg = "txn_id is empty"; |
974 | 0 | LOG(WARNING) << error_msg; |
975 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
976 | 0 | return; |
977 | 0 | } |
978 | 0 | if (!request.__isset.remote_tablet_id) { |
979 | 0 | auto error_msg = "remote_tablet_id is empty"; |
980 | 0 | LOG(WARNING) << error_msg; |
981 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
982 | 0 | return; |
983 | 0 | } |
984 | 0 | if (!request.__isset.binlog_version) { |
985 | 0 | auto error_msg = "binlog_version is empty"; |
986 | 0 | LOG(WARNING) << error_msg; |
987 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
988 | 0 | return; |
989 | 0 | } |
990 | 0 | if (!request.__isset.remote_host) { |
991 | 0 | auto error_msg = "remote_host is empty"; |
992 | 0 | LOG(WARNING) << error_msg; |
993 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
994 | 0 | return; |
995 | 0 | } |
996 | 0 | if (!request.__isset.remote_port) { |
997 | 0 | auto error_msg = "remote_port is empty"; |
998 | 0 | LOG(WARNING) << error_msg; |
999 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
1000 | 0 | return; |
1001 | 0 | } |
1002 | 0 | if (!request.__isset.partition_id) { |
1003 | 0 | auto error_msg = "partition_id is empty"; |
1004 | 0 | LOG(WARNING) << error_msg; |
1005 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
1006 | 0 | return; |
1007 | 0 | } |
1008 | 0 | if (!request.__isset.local_tablet_id) { |
1009 | 0 | auto error_msg = "local_tablet_id is empty"; |
1010 | 0 | LOG(WARNING) << error_msg; |
1011 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
1012 | 0 | return; |
1013 | 0 | } |
1014 | 0 | if (!request.__isset.load_id) { |
1015 | 0 | auto error_msg = "load_id is empty"; |
1016 | 0 | LOG(WARNING) << error_msg; |
1017 | 0 | set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg); |
1018 | 0 | return; |
1019 | 0 | } |
1020 | | |
1021 | 0 | auto txn_id = request.txn_id; |
1022 | | // Step 1: get local tablet |
1023 | 0 | auto const& local_tablet_id = request.local_tablet_id; |
1024 | 0 | auto local_tablet = _engine.tablet_manager()->get_tablet(local_tablet_id); |
1025 | 0 | if (local_tablet == nullptr) { |
1026 | 0 | auto error_msg = fmt::format("tablet {} not found", local_tablet_id); |
1027 | 0 | LOG(WARNING) << error_msg; |
1028 | 0 | set_tstatus(TStatusCode::TABLET_MISSING, std::move(error_msg)); |
1029 | 0 | return; |
1030 | 0 | } |
1031 | | |
1032 | | // Step 2: check txn, create txn, prepare_txn will check it |
1033 | 0 | auto partition_id = request.partition_id; |
1034 | 0 | auto& load_id = request.load_id; |
1035 | 0 | auto is_ingrest = true; |
1036 | 0 | PUniqueId p_load_id; |
1037 | 0 | p_load_id.set_hi(load_id.hi); |
1038 | 0 | p_load_id.set_lo(load_id.lo); |
1039 | |
|
1040 | 0 | { |
1041 | | // TODO: push_lock has not been held here so far, but I think it should be held. |
1042 | 0 | auto status = local_tablet->prepare_txn(partition_id, txn_id, p_load_id, is_ingrest); |
1043 | 0 | if (!status.ok()) { |
1044 | 0 | LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id |
1045 | 0 | << ", status=" << status.to_string(); |
1046 | 0 | status.to_thrift(&tstatus); |
1047 | 0 | return; |
1048 | 0 | } |
1049 | 0 | } |
1050 | | |
1051 | 0 | bool is_async = (_ingest_binlog_workers != nullptr); |
1052 | 0 | result.__set_is_async(is_async); |
1053 | |
|
1054 | 0 | auto ingest_binlog_func = [=, this, tstatus = &tstatus]() { |
1055 | 0 | IngestBinlogArg ingest_binlog_arg = { |
1056 | 0 | .txn_id = txn_id, |
1057 | 0 | .partition_id = partition_id, |
1058 | 0 | .local_tablet_id = local_tablet_id, |
1059 | 0 | .local_tablet = local_tablet, |
1060 | |
|
1061 | 0 | .request = request, |
1062 | 0 | .tstatus = is_async ? nullptr : tstatus, |
1063 | 0 | }; |
1064 | |
|
1065 | 0 | _ingest_binlog(_engine, &ingest_binlog_arg); |
1066 | 0 | }; |
1067 | |
|
1068 | 0 | if (is_async) { |
1069 | 0 | auto status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); |
1070 | 0 | if (!status.ok()) { |
1071 | 0 | status.to_thrift(&tstatus); |
1072 | 0 | return; |
1073 | 0 | } |
1074 | 0 | } else { |
1075 | 0 | ingest_binlog_func(); |
1076 | 0 | } |
1077 | 0 | } |
1078 | | |
1079 | | void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result, |
1080 | 0 | const TQueryIngestBinlogRequest& request) { |
1081 | 0 | LOG(INFO) << "query ingest binlog. request: " << apache::thrift::ThriftDebugString(request); |
1082 | |
|
1083 | 0 | auto set_result = [&](TIngestBinlogStatus::type status, std::string error_msg) { |
1084 | 0 | result.__set_status(status); |
1085 | 0 | result.__set_err_msg(std::move(error_msg)); |
1086 | 0 | }; |
1087 | | |
1088 | | /// Check args: txn_id, partition_id, tablet_id, load_id |
1089 | 0 | if (!request.__isset.txn_id) { |
1090 | 0 | auto error_msg = "txn_id is empty"; |
1091 | 0 | LOG(WARNING) << error_msg; |
1092 | 0 | set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); |
1093 | 0 | return; |
1094 | 0 | } |
1095 | 0 | if (!request.__isset.partition_id) { |
1096 | 0 | auto error_msg = "partition_id is empty"; |
1097 | 0 | LOG(WARNING) << error_msg; |
1098 | 0 | set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); |
1099 | 0 | return; |
1100 | 0 | } |
1101 | 0 | if (!request.__isset.tablet_id) { |
1102 | 0 | auto error_msg = "tablet_id is empty"; |
1103 | 0 | LOG(WARNING) << error_msg; |
1104 | 0 | set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); |
1105 | 0 | return; |
1106 | 0 | } |
1107 | 0 | if (!request.__isset.load_id) { |
1108 | 0 | auto error_msg = "load_id is empty"; |
1109 | 0 | LOG(WARNING) << error_msg; |
1110 | 0 | set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg); |
1111 | 0 | return; |
1112 | 0 | } |
1113 | | |
1114 | 0 | auto partition_id = request.partition_id; |
1115 | 0 | auto txn_id = request.txn_id; |
1116 | 0 | auto tablet_id = request.tablet_id; |
1117 | | |
1118 | | // Step 1: get local tablet |
1119 | 0 | auto local_tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
1120 | 0 | if (local_tablet == nullptr) { |
1121 | 0 | auto error_msg = fmt::format("tablet {} not found", tablet_id); |
1122 | 0 | LOG(WARNING) << error_msg; |
1123 | 0 | set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg)); |
1124 | 0 | return; |
1125 | 0 | } |
1126 | | |
1127 | | // Step 2: get txn state |
1128 | 0 | auto tablet_uid = local_tablet->tablet_uid(); |
1129 | 0 | auto txn_state = |
1130 | 0 | _engine.txn_manager()->get_txn_state(partition_id, txn_id, tablet_id, tablet_uid); |
1131 | 0 | switch (txn_state) { |
1132 | 0 | case TxnState::NOT_FOUND: |
1133 | 0 | result.__set_status(TIngestBinlogStatus::NOT_FOUND); |
1134 | 0 | break; |
1135 | 0 | case TxnState::PREPARED: |
1136 | 0 | result.__set_status(TIngestBinlogStatus::DOING); |
1137 | 0 | break; |
1138 | 0 | case TxnState::COMMITTED: |
1139 | 0 | result.__set_status(TIngestBinlogStatus::OK); |
1140 | 0 | break; |
1141 | 0 | case TxnState::ROLLEDBACK: |
1142 | 0 | result.__set_status(TIngestBinlogStatus::FAILED); |
1143 | 0 | break; |
1144 | 0 | case TxnState::ABORTED: |
1145 | 0 | result.__set_status(TIngestBinlogStatus::FAILED); |
1146 | 0 | break; |
1147 | 0 | case TxnState::DELETED: |
1148 | 0 | result.__set_status(TIngestBinlogStatus::FAILED); |
1149 | 0 | break; |
1150 | 0 | } |
1151 | 0 | } |
1152 | | |
1153 | 0 | void BaseBackendService::get_tablet_stat(TTabletStatResult& result) { |
1154 | 0 | LOG(ERROR) << "get_tablet_stat is not implemented"; |
1155 | 0 | } |
1156 | | |
1157 | 3 | int64_t BaseBackendService::get_trash_used_capacity() { |
1158 | 3 | LOG(ERROR) << "get_trash_used_capacity is not implemented"; |
1159 | 3 | return 0; |
1160 | 3 | } |
1161 | | |
1162 | | void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result, |
1163 | 0 | int64_t last_stream_record_time) { |
1164 | 0 | LOG(ERROR) << "get_stream_load_record is not implemented"; |
1165 | 0 | } |
1166 | | |
1167 | | void BaseBackendService::get_stream_load_record( |
1168 | | TStreamLoadRecordResult& result, int64_t last_stream_record_time, |
1169 | 48 | std::shared_ptr<StreamLoadRecorder> stream_load_recorder) { |
1170 | 48 | if (stream_load_recorder != nullptr) { |
1171 | 48 | std::map<std::string, std::string> records; |
1172 | 48 | auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), |
1173 | 48 | config::stream_load_record_batch_size, &records); |
1174 | 48 | if (st.ok()) { |
1175 | 48 | LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " |
1176 | 48 | << records.size() |
1177 | 48 | << ", last_stream_load_timestamp: " << last_stream_record_time; |
1178 | 48 | std::map<std::string, TStreamLoadRecord> stream_load_record_batch; |
1179 | 48 | auto it = records.begin(); |
1180 | 2.18k | for (; it != records.end(); ++it) { |
1181 | 2.13k | TStreamLoadRecord stream_load_item; |
1182 | 2.13k | StreamLoadContext::parse_stream_load_record(it->second, stream_load_item); |
1183 | 2.13k | stream_load_record_batch.emplace(it->first.c_str(), stream_load_item); |
1184 | 2.13k | } |
1185 | 48 | result.__set_stream_load_record(stream_load_record_batch); |
1186 | 48 | } |
1187 | 48 | } else { |
1188 | 0 | LOG(WARNING) << "stream_load_recorder is null."; |
1189 | 0 | } |
1190 | 48 | } |
1191 | | |
1192 | 0 | void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) { |
1193 | 0 | LOG(ERROR) << "get_disk_trash_used_capacity is not implemented"; |
1194 | 0 | } |
1195 | | |
1196 | | void BaseBackendService::make_snapshot(TAgentResult& return_value, |
1197 | 0 | const TSnapshotRequest& snapshot_request) { |
1198 | 0 | LOG(ERROR) << "make_snapshot is not implemented"; |
1199 | 0 | return_value.__set_status(Status::NotSupported("make_snapshot is not implemented").to_thrift()); |
1200 | 0 | } |
1201 | | |
1202 | | void BaseBackendService::release_snapshot(TAgentResult& return_value, |
1203 | 0 | const std::string& snapshot_path) { |
1204 | 0 | LOG(ERROR) << "release_snapshot is not implemented"; |
1205 | 0 | return_value.__set_status( |
1206 | 0 | Status::NotSupported("release_snapshot is not implemented").to_thrift()); |
1207 | 0 | } |
1208 | | |
1209 | 0 | void BaseBackendService::check_storage_format(TCheckStorageFormatResult& result) { |
1210 | 0 | LOG(ERROR) << "check_storage_format is not implemented"; |
1211 | 0 | } |
1212 | | |
1213 | | void BaseBackendService::ingest_binlog(TIngestBinlogResult& result, |
1214 | 0 | const TIngestBinlogRequest& request) { |
1215 | 0 | LOG(ERROR) << "ingest_binlog is not implemented"; |
1216 | 0 | result.__set_status(Status::NotSupported("ingest_binlog is not implemented").to_thrift()); |
1217 | 0 | } |
1218 | | |
1219 | | void BaseBackendService::query_ingest_binlog(TQueryIngestBinlogResult& result, |
1220 | 0 | const TQueryIngestBinlogRequest& request) { |
1221 | 0 | LOG(ERROR) << "query_ingest_binlog is not implemented"; |
1222 | 0 | result.__set_status(TIngestBinlogStatus::UNKNOWN); |
1223 | 0 | result.__set_err_msg("query_ingest_binlog is not implemented"); |
1224 | 0 | } |
1225 | | |
1226 | | void BaseBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response, |
1227 | 0 | const TWarmUpCacheAsyncRequest& request) { |
1228 | 0 | LOG(ERROR) << "warm_up_cache_async is not implemented"; |
1229 | 0 | response.__set_status( |
1230 | 0 | Status::NotSupported("warm_up_cache_async is not implemented").to_thrift()); |
1231 | 0 | } |
1232 | | |
1233 | | void BaseBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response, |
1234 | 0 | const TCheckWarmUpCacheAsyncRequest& request) { |
1235 | 0 | LOG(ERROR) << "check_warm_up_cache_async is not implemented"; |
1236 | 0 | response.__set_status( |
1237 | 0 | Status::NotSupported("check_warm_up_cache_async is not implemented").to_thrift()); |
1238 | 0 | } |
1239 | | |
1240 | | void BaseBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse& response, |
1241 | 0 | const TSyncLoadForTabletsRequest& request) { |
1242 | 0 | LOG(ERROR) << "sync_load_for_tablets is not implemented"; |
1243 | 0 | } |
1244 | | |
1245 | | void BaseBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, |
1246 | 0 | const TGetTopNHotPartitionsRequest& request) { |
1247 | 0 | LOG(ERROR) << "get_top_n_hot_partitions is not implemented"; |
1248 | 0 | } |
1249 | | |
1250 | | void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, |
1251 | 0 | const TWarmUpTabletsRequest& request) { |
1252 | 0 | LOG(ERROR) << "warm_up_tablets is not implemented"; |
1253 | 0 | response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift()); |
1254 | 0 | } |
1255 | | |
1256 | | void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response, |
1257 | 1 | const TGetRealtimeExecStatusRequest& request) { |
1258 | 1 | if (!request.__isset.id) { |
1259 | 0 | LOG_WARNING("Invalidate argument, id is empty"); |
1260 | 0 | response.__set_status(Status::InvalidArgument("id is empty").to_thrift()); |
1261 | 0 | return; |
1262 | 0 | } |
1263 | | |
1264 | 1 | RuntimeProfile::Counter get_realtime_timer {TUnit::TIME_NS}; |
1265 | | |
1266 | 1 | Defer _print_log([&]() { |
1267 | 1 | LOG_INFO("Getting realtime exec status of query {} , cost time {}", print_id(request.id), |
1268 | 1 | PrettyPrinter::print(get_realtime_timer.value(), get_realtime_timer.type())); |
1269 | 1 | }); |
1270 | | |
1271 | 1 | SCOPED_TIMER(&get_realtime_timer); |
1272 | | |
1273 | 1 | std::unique_ptr<TReportExecStatusParams> report_exec_status_params = |
1274 | 1 | std::make_unique<TReportExecStatusParams>(); |
1275 | 1 | std::unique_ptr<TQueryStatistics> query_stats = std::make_unique<TQueryStatistics>(); |
1276 | | |
1277 | 1 | std::string req_type = request.__isset.req_type ? request.req_type : "profile"; |
1278 | 1 | Status st; |
1279 | 1 | if (req_type == "stats") { |
1280 | 1 | st = ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id, |
1281 | 1 | query_stats.get()); |
1282 | 1 | if (st.ok()) { |
1283 | 1 | response.__set_query_stats(*query_stats); |
1284 | 1 | } |
1285 | 1 | } else { |
1286 | | // default is "profile" |
1287 | 0 | st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status( |
1288 | 0 | request.id, report_exec_status_params.get()); |
1289 | 0 | if (st.ok()) { |
1290 | 0 | response.__set_report_exec_status_params(*report_exec_status_params); |
1291 | 0 | } |
1292 | 0 | } |
1293 | | |
1294 | 1 | report_exec_status_params->__set_query_id(TUniqueId()); |
1295 | 1 | report_exec_status_params->__set_done(false); |
1296 | 1 | response.__set_status(st.to_thrift()); |
1297 | 1 | } |
1298 | | |
1299 | | void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result, |
1300 | 1.22k | const std::vector<int64_t>& dictionary_ids) { |
1301 | 1.22k | std::vector<TDictionaryStatus> dictionary_status; |
1302 | 1.22k | ExecEnv::GetInstance()->dict_factory()->get_dictionary_status(dictionary_status, |
1303 | 1.22k | dictionary_ids); |
1304 | 1.22k | result.__set_dictionary_status_list(dictionary_status); |
1305 | 1.22k | LOG(INFO) << "query for dictionary status, return " << result.dictionary_status_list.size() |
1306 | 1.22k | << " rows"; |
1307 | 1.22k | } |
1308 | | |
1309 | | void BaseBackendService::test_storage_connectivity(TTestStorageConnectivityResponse& response, |
1310 | 0 | const TTestStorageConnectivityRequest& request) { |
1311 | 0 | Status status = io::StorageConnectivityTester::test(request.type, request.properties); |
1312 | 0 | response.__set_status(status.to_thrift()); |
1313 | 0 | } |
1314 | | |
1315 | 2 | void BaseBackendService::get_python_envs(std::vector<TPythonEnvInfo>& result) { |
1316 | 2 | result = PythonVersionManager::instance().env_infos_to_thrift(); |
1317 | 2 | } |
1318 | | |
1319 | | void BaseBackendService::get_python_packages(std::vector<TPythonPackageInfo>& result, |
1320 | 1 | const std::string& python_version) { |
1321 | 1 | PythonVersion version; |
1322 | 1 | auto& manager = PythonVersionManager::instance(); |
1323 | 1 | THROW_IF_ERROR(manager.get_version(python_version, &version)); |
1324 | | |
1325 | 1 | std::vector<std::pair<std::string, std::string>> packages; |
1326 | 1 | THROW_IF_ERROR(list_installed_packages(version, &packages)); |
1327 | 1 | result = manager.package_infos_to_thrift(packages); |
1328 | 1 | } |
1329 | | |
1330 | | #include "common/compile_check_end.h" |
1331 | | } // namespace doris |