be/src/service/doris_main.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <arrow/flight/client.h> |
19 | | #include <arrow/flight/sql/client.h> |
20 | | #include <arrow/scalar.h> |
21 | | #include <arrow/status.h> |
22 | | #include <arrow/table.h> |
23 | | #include <butil/macros.h> |
24 | | // IWYU pragma: no_include <bthread/errno.h> |
25 | | #include <errno.h> // IWYU pragma: keep |
26 | | #include <fcntl.h> |
27 | | #include <fmt/core.h> |
28 | | #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ |
29 | | !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) |
30 | | #include <gperftools/malloc_extension.h> // IWYU pragma: keep |
31 | | #endif |
32 | | #include <libgen.h> |
33 | | #include <setjmp.h> |
34 | | #include <signal.h> |
35 | | #include <stdint.h> |
36 | | #include <stdio.h> |
37 | | #include <stdlib.h> |
38 | | #include <unistd.h> |
39 | | |
40 | | #include <cstring> |
41 | | #include <functional> |
42 | | #include <memory> |
43 | | #include <ostream> |
44 | | #include <string> |
45 | | #include <string_view> |
46 | | #include <tuple> |
47 | | #include <vector> |
48 | | |
49 | | #include "cloud/cloud_backend_service.h" |
50 | | #include "cloud/config.h" |
51 | | #include "common/stack_trace.h" |
52 | | #include "runtime/memory/mem_tracker_limiter.h" |
53 | | #include "storage/tablet/tablet_schema_cache.h" |
54 | | #include "storage/utils.h" |
55 | | #include "util/concurrency_stats.h" |
56 | | #include "util/jni-util.h" |
57 | | |
58 | | #if defined(LEAK_SANITIZER) |
59 | | #include <sanitizer/lsan_interface.h> |
60 | | #endif |
61 | | |
62 | | #include <curl/curl.h> |
63 | | #include <thrift/TOutput.h> |
64 | | |
65 | | #include "agent/heartbeat_server.h" |
66 | | #include "common/config.h" |
67 | | #include "common/daemon.h" |
68 | | #include "common/logging.h" |
69 | | #include "common/signal_handler.h" |
70 | | #include "common/status.h" |
71 | | #include "io/cache/block_file_cache_factory.h" |
72 | | #include "runtime/exec_env.h" |
73 | | #include "runtime/user_function_cache.h" |
74 | | #include "service/arrow_flight/flight_sql_service.h" |
75 | | #include "service/backend_options.h" |
76 | | #include "service/backend_service.h" |
77 | | #include "service/http_service.h" |
78 | | #include "service/server/be_server_starter_factory.h" |
79 | | #include "storage/options.h" |
80 | | #include "storage/storage_engine.h" |
81 | | #include "udf/python/python_env.h" |
82 | | #include "util/debug_util.h" |
83 | | #include "util/disk_info.h" |
84 | | #include "util/mem_info.h" |
85 | | #include "util/string_util.h" |
86 | | #include "util/thrift_rpc_helper.h" |
87 | | #include "util/thrift_server.h" |
88 | | #include "util/uid_util.h" |
89 | | |
90 | | namespace doris {} // namespace doris |
91 | | |
92 | | static void help(const char*); |
93 | | |
94 | | extern "C" { |
95 | | void __lsan_do_leak_check(); |
96 | | int __llvm_profile_write_file(); |
97 | | } |
98 | | |
99 | | namespace doris { |
100 | | |
101 | 7 | void signal_handler(int signal) { |
102 | 7 | if (signal == SIGINT || signal == SIGTERM) { |
103 | 7 | k_doris_exit = true; |
104 | 7 | } |
105 | 7 | } |
106 | | |
107 | 14 | int install_signal(int signo, void (*handler)(int)) { |
108 | 14 | struct sigaction sa; |
109 | 14 | memset(&sa, 0, sizeof(struct sigaction)); |
110 | 14 | sa.sa_handler = handler; |
111 | 14 | sigemptyset(&sa.sa_mask); |
112 | 14 | auto ret = sigaction(signo, &sa, nullptr); |
113 | 14 | if (ret != 0) { |
114 | 0 | char buf[64]; |
115 | 0 | LOG(ERROR) << "install signal failed, signo=" << signo << ", errno=" << errno |
116 | 0 | << ", errmsg=" << strerror_r(errno, buf, sizeof(buf)); |
117 | 0 | } |
118 | 14 | return ret; |
119 | 14 | } |
120 | | |
121 | 7 | void init_signals() { |
122 | 7 | auto ret = install_signal(SIGINT, signal_handler); |
123 | 7 | if (ret < 0) { |
124 | 0 | exit(-1); |
125 | 0 | } |
126 | 7 | ret = install_signal(SIGTERM, signal_handler); |
127 | 7 | if (ret < 0) { |
128 | 0 | exit(-1); |
129 | 0 | } |
130 | 7 | } |
131 | | |
132 | 4 | static void thrift_output(const char* x) { |
133 | 4 | LOG(WARNING) << "thrift internal message: " << x; |
134 | 4 | } |
135 | | |
136 | | } // namespace doris |
137 | | |
138 | | // This code is adapted from ClickHouse. |
139 | | // It is used to check that the required SIMD instructions are available |
/// Identifies the instruction-set probe that was running when the check failed.
/// NONE means no probe is in flight / every probe passed. Enumerators are numbered
/// consecutively from 0 in declaration order.
enum class InstructionFail {
    NONE = 0,
    SSE3,
    SSSE3,
    SSE4_1,
    SSE4_2,
    POPCNT,
    AVX,
    AVX2,
    AVX512,
    ARM_NEON,
};
152 | | |
153 | 0 | auto instruction_fail_to_string(InstructionFail fail) { |
154 | 0 | switch (fail) { |
155 | 0 | #define ret(x) return std::make_tuple(STDERR_FILENO, x, ARRAY_SIZE(x) - 1) |
156 | 0 | case InstructionFail::NONE: |
157 | 0 | ret("NONE"); |
158 | 0 | case InstructionFail::SSE3: |
159 | 0 | ret("SSE3"); |
160 | 0 | case InstructionFail::SSSE3: |
161 | 0 | ret("SSSE3"); |
162 | 0 | case InstructionFail::SSE4_1: |
163 | 0 | ret("SSE4.1"); |
164 | 0 | case InstructionFail::SSE4_2: |
165 | 0 | ret("SSE4.2"); |
166 | 0 | case InstructionFail::POPCNT: |
167 | 0 | ret("POPCNT"); |
168 | 0 | case InstructionFail::AVX: |
169 | 0 | ret("AVX"); |
170 | 0 | case InstructionFail::AVX2: |
171 | 0 | ret("AVX2"); |
172 | 0 | case InstructionFail::AVX512: |
173 | 0 | ret("AVX512"); |
174 | 0 | case InstructionFail::ARM_NEON: |
175 | 0 | ret("ARM_NEON"); |
176 | 0 | } |
177 | | |
178 | 0 | LOG(ERROR) << "Unrecognized instruction fail value." << std::endl; |
179 | 0 | exit(-1); |
180 | 0 | } |
181 | | |
/// Jump target armed by sigsetjmp() in the instruction check and consumed by the
/// SIGILL handler below.
sigjmp_buf jmpbuf;

/// SA_SIGINFO-style handler used while probing instructions: ignores all of its
/// arguments and jumps back to the sigsetjmp() point with value 1.
void sig_ill_check_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) {
    siglongjmp(jmpbuf, 1);
}
187 | | |
188 | | /// Check if necessary SSE extensions are available by trying to execute some sse instructions. |
189 | | /// If instruction is unavailable, SIGILL will be sent by kernel. |
190 | 8 | void check_required_instructions_impl(volatile InstructionFail& fail) { |
191 | 8 | #if defined(__SSE3__) |
192 | 8 | fail = InstructionFail::SSE3; |
193 | 8 | __asm__ volatile("addsubpd %%xmm0, %%xmm0" : : : "xmm0"); |
194 | 8 | #endif |
195 | | |
196 | 8 | #if defined(__SSSE3__) |
197 | 8 | fail = InstructionFail::SSSE3; |
198 | 8 | __asm__ volatile("pabsw %%xmm0, %%xmm0" : : : "xmm0"); |
199 | | |
200 | 8 | #endif |
201 | | |
202 | 8 | #if defined(__SSE4_1__) |
203 | 8 | fail = InstructionFail::SSE4_1; |
204 | 8 | __asm__ volatile("pmaxud %%xmm0, %%xmm0" : : : "xmm0"); |
205 | 8 | #endif |
206 | | |
207 | 8 | #if defined(__SSE4_2__) |
208 | 8 | fail = InstructionFail::SSE4_2; |
209 | 8 | __asm__ volatile("pcmpgtq %%xmm0, %%xmm0" : : : "xmm0"); |
210 | 8 | #endif |
211 | | |
212 | | /// Defined by -msse4.2 |
213 | 8 | #if defined(__POPCNT__) |
214 | 8 | fail = InstructionFail::POPCNT; |
215 | 8 | { |
216 | 8 | uint64_t a = 0; |
217 | 8 | uint64_t b = 0; |
218 | 8 | __asm__ volatile("popcnt %1, %0" : "=r"(a) : "r"(b) :); |
219 | 8 | } |
220 | 8 | #endif |
221 | | |
222 | 8 | #if defined(__AVX__) |
223 | 8 | fail = InstructionFail::AVX; |
224 | 8 | __asm__ volatile("vaddpd %%ymm0, %%ymm0, %%ymm0" : : : "ymm0"); |
225 | 8 | #endif |
226 | | |
227 | 8 | #if defined(__AVX2__) |
228 | 8 | fail = InstructionFail::AVX2; |
229 | 8 | __asm__ volatile("vpabsw %%ymm0, %%ymm0" : : : "ymm0"); |
230 | 8 | #endif |
231 | | |
232 | | #if defined(__AVX512__) |
233 | | fail = InstructionFail::AVX512; |
234 | | __asm__ volatile("vpabsw %%zmm0, %%zmm0" : : : "zmm0"); |
235 | | #endif |
236 | | |
237 | | #if defined(__ARM_NEON__) |
238 | | fail = InstructionFail::ARM_NEON; |
239 | | #ifndef __APPLE__ |
240 | | __asm__ volatile("vadd.i32 q8, q8, q8" : : : "q8"); |
241 | | #endif |
242 | | #endif |
243 | | |
244 | 8 | fail = InstructionFail::NONE; |
245 | 8 | } |
246 | | |
/// Writes `size` bytes starting at `data` to file descriptor `fd`, continuing after
/// short writes and retrying when write() is interrupted (EINTR).
///
/// @param fd    destination file descriptor.
/// @param data  bytes to write; must be NUL-terminated when `size` is 0.
/// @param size  number of bytes to write; 0 means "use strlen(data)".
/// @return true once every byte has been written; false on any write error other
///         than EINTR, or when write() reports that it wrote nothing.
bool write_retry(int fd, const char* data, size_t size) {
    if (size == 0) {
        size = strlen(data);
    }

    while (size != 0) {
        const ssize_t res = ::write(fd, data, size);

        if (res < 0) {
            if (errno == EINTR) {
                continue;
            }
            return false;
        }
        if (res == 0) {
            // write() only sets errno when it returns -1. Consulting errno here could see
            // a stale EINTR and spin forever, so a write without progress is a failure.
            return false;
        }

        data += res;
        size -= static_cast<size_t>(res);
    }

    return true;
}
263 | | |
/// Writes the string literal `data` to stderr and terminates with _Exit(1) if the
/// write fails. The length comes from the literal's array size rather than from
/// strlen(), since strlen() may fail if SSE is not supported.
#define WRITE_ERROR(data)                                               \
    do {                                                                \
        static_assert(__builtin_constant_p(data));                      \
        if (!write_retry(STDERR_FILENO, data, ARRAY_SIZE(data) - 1)) {  \
            _Exit(1);                                                   \
        }                                                               \
    } while (0)
270 | | |
271 | | /// Check SSE and others instructions availability. Calls exit on fail. |
272 | | /// This function must be called as early as possible, even before main, because static initializers may use unavailable instructions. |
273 | 8 | void check_required_instructions() { |
274 | 8 | struct sigaction sa {}; |
275 | 8 | struct sigaction sa_old {}; |
276 | 8 | sa.sa_sigaction = sig_ill_check_handler; |
277 | 8 | sa.sa_flags = SA_SIGINFO; |
278 | 8 | auto signal = SIGILL; |
279 | 8 | if (sigemptyset(&sa.sa_mask) != 0 || sigaddset(&sa.sa_mask, signal) != 0 || |
280 | 8 | sigaction(signal, &sa, &sa_old) != 0) { |
281 | | /// You may wonder about strlen. |
282 | | /// Typical implementation of strlen is using SSE4.2 or AVX2. |
283 | | /// But this is not the case because it's compiler builtin and is executed at compile time. |
284 | |
|
285 | 0 | WRITE_ERROR("Can not set signal handler\n"); |
286 | 0 | _Exit(1); |
287 | 0 | } |
288 | | |
289 | 8 | volatile InstructionFail fail = InstructionFail::NONE; |
290 | | |
291 | 8 | if (sigsetjmp(jmpbuf, 1)) { |
292 | 0 | WRITE_ERROR("Instruction check fail. The CPU does not support "); |
293 | 0 | if (!std::apply(write_retry, instruction_fail_to_string(fail))) _Exit(1); |
294 | 0 | WRITE_ERROR(" instruction set.\n"); |
295 | 0 | WRITE_ERROR( |
296 | 0 | "For example, if your CPU does not support AVX2, you need to rebuild the Doris BE " |
297 | 0 | "with: USE_AVX2=0 sh build.sh --be"); |
298 | 0 | _Exit(1); |
299 | 0 | } |
300 | | |
301 | 8 | check_required_instructions_impl(fail); |
302 | | |
303 | 8 | if (sigaction(signal, &sa_old, nullptr)) { |
304 | 0 | WRITE_ERROR("Can not set signal handler\n"); |
305 | 0 | _Exit(1); |
306 | 0 | } |
307 | 8 | } |
308 | | |
309 | | struct Checker { |
310 | 8 | Checker() { check_required_instructions(); } |
311 | | } checker |
312 | | #ifndef __APPLE__ |
313 | | __attribute__((init_priority(101))) /// Run before other static initializers. |
314 | | #endif |
315 | | ; |
316 | | |
317 | | // A startup failure that happens after ExecEnv::init() has run must terminate the |
318 | | // process the same way normal shutdown does (see the _exit(0) at the end of main): |
319 | | // in the default mode we _exit() immediately, skipping global destructors and the |
320 | | // LeakSanitizer atexit check. Init-time singletons (e.g. the internal workload |
321 | | // group's task scheduler) intentionally live for the whole process lifetime, so |
322 | | // running the leak check on this abnormal-exit path reports them as false-positive |
323 | | // leaks. enable_graceful_exit_check is honored so memleak-check mode still runs LSAN. |
324 | 0 | [[noreturn]] static void exit_on_startup_failure() { |
325 | 0 | google::FlushLogFiles(google::GLOG_INFO); |
326 | 0 | if (!doris::config::enable_graceful_exit_check) { |
327 | 0 | _exit(1); |
328 | 0 | } |
329 | 0 | exit(1); |
330 | 0 | } |
331 | | |
332 | 8 | int main(int argc, char** argv) { |
333 | 8 | doris::signal::InstallFailureSignalHandler(); |
334 | | // create StackTraceCache Instance, at the beginning, other static destructors may use. |
335 | 8 | StackTrace::createCache(); |
336 | | // extern doris::ErrorCode::ErrorCodeInitializer error_code_init; |
337 | | // Some developers will modify status.h and we use a very ticky logic to init error_states |
338 | | // and it maybe not inited. So add a check here. |
339 | 8 | doris::ErrorCode::error_code_init.check_init(); |
340 | | // check if print version or help |
341 | 8 | if (argc > 1) { |
342 | 1 | if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-v") == 0) { |
343 | 1 | puts(doris::get_build_version(false).c_str()); |
344 | 1 | exit(0); |
345 | 1 | } else if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) { |
346 | 0 | help(basename(argv[0])); |
347 | 0 | exit(0); |
348 | 0 | } |
349 | 1 | } |
350 | | |
351 | 7 | if (getenv("DORIS_HOME") == nullptr) { |
352 | 0 | fprintf(stderr, "you need set DORIS_HOME environment variable.\n"); |
353 | 0 | exit(-1); |
354 | 0 | } |
355 | 7 | if (getenv("PID_DIR") == nullptr) { |
356 | 0 | fprintf(stderr, "you need set PID_DIR environment variable.\n"); |
357 | 0 | exit(-1); |
358 | 0 | } |
359 | | |
360 | 7 | SCOPED_INIT_THREAD_CONTEXT(); |
361 | | |
362 | 7 | using doris::Status; |
363 | 7 | using std::string; |
364 | | |
365 | | // open pid file, obtain file lock and save pid |
366 | 7 | string pid_file = string(getenv("PID_DIR")) + "/be.pid"; |
367 | 7 | int fd = open(pid_file.c_str(), O_RDWR | O_CREAT, |
368 | 7 | S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); |
369 | 7 | if (fd < 0) { |
370 | 0 | fprintf(stderr, "fail to create pid file."); |
371 | 0 | exit(-1); |
372 | 0 | } |
373 | | |
374 | 7 | string pid = std::to_string((long)getpid()); |
375 | 7 | pid += "\n"; |
376 | 7 | size_t length = write(fd, pid.c_str(), pid.size()); |
377 | 7 | if (length != pid.size()) { |
378 | 0 | fprintf(stderr, "fail to save pid into pid file."); |
379 | 0 | exit(-1); |
380 | 0 | } |
381 | | |
382 | | // descriptor will be leaked when failing to close fd |
383 | 7 | if (::close(fd) < 0) { |
384 | 0 | fprintf(stderr, "failed to close fd of pidfile."); |
385 | 0 | exit(-1); |
386 | 0 | } |
387 | | |
388 | | // init config. |
389 | | // the config in be_custom.conf will overwrite the config in be.conf |
390 | | // Must init custom config after init config, separately. |
391 | | // Because the path of custom config file is defined in be.conf |
392 | 7 | string conffile = string(getenv("DORIS_HOME")) + "/conf/be.conf"; |
393 | 7 | if (!doris::config::init(conffile.c_str(), true, true, true)) { |
394 | 0 | fprintf(stderr, "error read config file. \n"); |
395 | 0 | return -1; |
396 | 0 | } |
397 | | |
398 | 7 | string custom_conffile = doris::config::custom_config_dir + "/be_custom.conf"; |
399 | 7 | if (!doris::config::init(custom_conffile.c_str(), true, false, false)) { |
400 | 0 | fprintf(stderr, "error read custom config file. \n"); |
401 | 0 | return -1; |
402 | 0 | } |
403 | | |
404 | | // ATTN: Callers that want to override default gflags variables should do so before calling this method |
405 | 7 | google::ParseCommandLineFlags(&argc, &argv, true); |
406 | | // ATTN: MUST init before LOG |
407 | 7 | doris::init_glog("be"); |
408 | | |
409 | 7 | LOG(INFO) << doris::get_version_string(false); |
410 | | |
411 | 7 | doris::init_thrift_logging(); |
412 | | |
413 | 7 | if (doris::config::enable_fuzzy_mode) { |
414 | 7 | Status status = doris::config::set_fuzzy_configs(); |
415 | 7 | if (!status.ok()) { |
416 | 0 | LOG(WARNING) << "Failed to initialize fuzzy config: " << status; |
417 | 0 | exit(1); |
418 | 0 | } |
419 | 7 | } |
420 | | |
421 | | #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ |
422 | | !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) |
423 | | // Change the total TCMalloc thread cache size if necessary. |
424 | | const size_t kDefaultTotalThreadCacheBytes = 1024 * 1024 * 1024; |
425 | | if (!MallocExtension::instance()->SetNumericProperty("tcmalloc.max_total_thread_cache_bytes", |
426 | | kDefaultTotalThreadCacheBytes)) { |
427 | | fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n"); |
428 | | return -1; |
429 | | } |
430 | | #endif |
431 | | |
432 | 7 | std::vector<doris::StorePath> paths; |
433 | 7 | auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths); |
434 | 7 | if (!olap_res) { |
435 | 0 | LOG(ERROR) << "parse config storage path failed, path=" << doris::config::storage_root_path; |
436 | 0 | exit(-1); |
437 | 0 | } |
438 | | |
439 | 7 | std::vector<doris::StorePath> spill_paths; |
440 | 7 | if (doris::config::spill_storage_root_path.empty()) { |
441 | 7 | doris::config::spill_storage_root_path = doris::config::storage_root_path; |
442 | 7 | } |
443 | 7 | olap_res = doris::parse_conf_store_paths(doris::config::spill_storage_root_path, &spill_paths); |
444 | 7 | if (!olap_res) { |
445 | 0 | LOG(ERROR) << "parse config spill storage path failed, path=" |
446 | 0 | << doris::config::spill_storage_root_path; |
447 | 0 | exit(-1); |
448 | 0 | } |
449 | 7 | std::set<std::string> broken_paths; |
450 | 7 | doris::parse_conf_broken_store_paths(doris::config::broken_storage_path, &broken_paths); |
451 | | |
452 | 7 | auto it = paths.begin(); |
453 | 18 | for (; it != paths.end();) { |
454 | 11 | if (broken_paths.count(it->path) > 0) { |
455 | 0 | if (doris::config::ignore_broken_disk) { |
456 | 0 | LOG(WARNING) << "ignore broken disk, path = " << it->path; |
457 | 0 | it = paths.erase(it); |
458 | 0 | } else { |
459 | 0 | LOG(ERROR) << "a broken disk is found " << it->path; |
460 | 0 | exit(-1); |
461 | 0 | } |
462 | 11 | } else if (!doris::check_datapath_rw(it->path)) { |
463 | 0 | if (doris::config::ignore_broken_disk) { |
464 | 0 | LOG(WARNING) << "read write test file failed, path=" << it->path; |
465 | 0 | it = paths.erase(it); |
466 | 0 | } else { |
467 | 0 | LOG(ERROR) << "read write test file failed, path=" << it->path; |
468 | | // if only one disk and the disk is full, also need exit because rocksdb will open failed |
469 | 0 | exit(-1); |
470 | 0 | } |
471 | 11 | } else { |
472 | 11 | ++it; |
473 | 11 | } |
474 | 11 | } |
475 | | |
476 | 7 | if (paths.empty()) { |
477 | 0 | LOG(ERROR) << "All disks are broken, exit."; |
478 | 0 | exit(-1); |
479 | 0 | } |
480 | | |
481 | 7 | it = spill_paths.begin(); |
482 | 18 | for (; it != spill_paths.end();) { |
483 | 11 | if (!doris::check_datapath_rw(it->path)) { |
484 | 0 | if (doris::config::ignore_broken_disk) { |
485 | 0 | LOG(WARNING) << "read write test file failed, path=" << it->path; |
486 | 0 | it = spill_paths.erase(it); |
487 | 0 | } else { |
488 | 0 | LOG(ERROR) << "read write test file failed, path=" << it->path; |
489 | 0 | exit(-1); |
490 | 0 | } |
491 | 11 | } else { |
492 | 11 | ++it; |
493 | 11 | } |
494 | 11 | } |
495 | 7 | if (spill_paths.empty()) { |
496 | 0 | LOG(ERROR) << "All spill disks are broken, exit."; |
497 | 0 | exit(-1); |
498 | 0 | } |
499 | | |
500 | | // initialize libcurl here to avoid concurrent initialization |
501 | 7 | auto curl_ret = curl_global_init(CURL_GLOBAL_ALL); |
502 | 7 | if (curl_ret != 0) { |
503 | 0 | LOG(ERROR) << "fail to initialize libcurl, curl_ret=" << curl_ret; |
504 | 0 | exit(-1); |
505 | 0 | } |
506 | | // add logger for thrift internal |
507 | 7 | apache::thrift::GlobalOutput.setOutputFunction(doris::thrift_output); |
508 | | |
509 | 7 | Status status = Status::OK(); |
510 | 7 | if (doris::config::enable_java_support) { |
511 | | // Init jni |
512 | 7 | status = doris::Jni::Util::Init(); |
513 | 7 | if (!status.ok()) { |
514 | 0 | LOG(WARNING) << "Failed to initialize JNI: " << status; |
515 | 0 | exit(1); |
516 | 7 | } else { |
517 | 7 | LOG(INFO) << "Doris backend JNI is initialized."; |
518 | 7 | } |
519 | 7 | } |
520 | | |
521 | 7 | if (doris::config::enable_python_udf_support) { |
522 | 7 | if (std::string python_udf_root_path = |
523 | 7 | fmt::format("{}/lib/udf/python", std::getenv("DORIS_HOME")); |
524 | 7 | !std::filesystem::exists(python_udf_root_path)) { |
525 | 3 | std::filesystem::create_directories(python_udf_root_path); |
526 | 3 | } |
527 | | |
528 | | // Normalize and trim all Python-related config parameters |
529 | 7 | std::string python_env_mode = |
530 | 7 | std::string(doris::trim(doris::to_lower(doris::config::python_env_mode))); |
531 | 7 | std::string python_conda_root_path = |
532 | 7 | std::string(doris::trim(doris::config::python_conda_root_path)); |
533 | 7 | std::string python_venv_root_path = |
534 | 7 | std::string(doris::trim(doris::config::python_venv_root_path)); |
535 | 7 | std::string python_venv_interpreter_paths = |
536 | 7 | std::string(doris::trim(doris::config::python_venv_interpreter_paths)); |
537 | | |
538 | 7 | if (python_env_mode == "conda") { |
539 | 1 | if (python_conda_root_path.empty()) { |
540 | 0 | LOG(ERROR) |
541 | 0 | << "Python conda root path is empty, please set `python_conda_root_path` " |
542 | 0 | "or set `enable_python_udf_support` to `false`"; |
543 | 0 | exit(1); |
544 | 0 | } |
545 | 1 | LOG(INFO) << "Doris backend python version manager is initialized. Python conda " |
546 | 1 | "root path: " |
547 | 1 | << python_conda_root_path; |
548 | 1 | status = doris::PythonVersionManager::instance().init(doris::PythonEnvType::CONDA, |
549 | 1 | python_conda_root_path, ""); |
550 | 6 | } else if (python_env_mode == "venv") { |
551 | 6 | if (python_venv_root_path.empty()) { |
552 | 0 | LOG(ERROR) |
553 | 0 | << "Python venv root path is empty, please set `python_venv_root_path` or " |
554 | 0 | "set `enable_python_udf_support` to `false`"; |
555 | 0 | exit(1); |
556 | 0 | } |
557 | 6 | if (python_venv_interpreter_paths.empty()) { |
558 | 0 | LOG(ERROR) |
559 | 0 | << "Python interpreter paths is empty, please set " |
560 | 0 | "`python_venv_interpreter_paths` or set `enable_python_udf_support` to " |
561 | 0 | "`false`"; |
562 | 0 | exit(1); |
563 | 0 | } |
564 | 6 | LOG(INFO) << "Doris backend python version manager is initialized. Python venv " |
565 | 6 | "root path: " |
566 | 6 | << python_venv_root_path |
567 | 6 | << ", python interpreter paths: " << python_venv_interpreter_paths; |
568 | 6 | status = doris::PythonVersionManager::instance().init(doris::PythonEnvType::VENV, |
569 | 6 | python_venv_root_path, |
570 | 6 | python_venv_interpreter_paths); |
571 | 6 | } else { |
572 | 0 | status = Status::InvalidArgument( |
573 | 0 | "Python env mode is invalid, should be `conda` or `venv`. If you don't want to " |
574 | 0 | "enable the Python UDF function, please set `enable_python_udf_support` to " |
575 | 0 | "`false`"); |
576 | 0 | } |
577 | | |
578 | 7 | if (!status.ok()) { |
579 | 0 | LOG(ERROR) << "Failed to initialize python version manager: " << status; |
580 | 0 | exit(1); |
581 | 0 | } |
582 | 7 | LOG(INFO) << doris::PythonVersionManager::instance().to_string(); |
583 | 7 | } |
584 | | |
585 | | // Doris own signal handler must be register after jvm is init. |
586 | | // Or our own sig-handler for SIGINT & SIGTERM will not be chained ... |
587 | | // https://www.oracle.com/java/technologies/javase/signals.html |
588 | 7 | doris::init_signals(); |
589 | | // ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon services |
590 | | // |
591 | | // Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo |
592 | | // │ |
593 | | // │ |
594 | | // BackendService ─┘ |
595 | 7 | doris::CpuInfo::init(); |
596 | 7 | doris::DiskInfo::init(); |
597 | 7 | doris::MemInfo::init(); |
598 | | |
599 | 7 | LOG(INFO) << doris::CpuInfo::debug_string(); |
600 | 7 | LOG(INFO) << doris::DiskInfo::debug_string(); |
601 | 7 | LOG(INFO) << doris::MemInfo::debug_string(); |
602 | | |
603 | | // PHDR speed up exception handling, but exceptions from dynamically loaded libraries (dlopen) |
604 | | // will work only after additional call of this function. |
605 | | // rewrites dl_iterate_phdr will cause Jemalloc to fail to run after enable profile. see # |
606 | | // updatePHDRCache(); |
607 | 7 | if (!doris::BackendOptions::init()) { |
608 | 0 | exit(-1); |
609 | 0 | } |
610 | | |
611 | | // init exec env |
612 | 7 | auto* exec_env(doris::ExecEnv::GetInstance()); |
613 | 7 | status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, spill_paths, broken_paths); |
614 | 7 | if (status != Status::OK()) { |
615 | 0 | std::cerr << "failed to init doris storage engine, res=" << status; |
616 | 0 | exit_on_startup_failure(); |
617 | 0 | } |
618 | | |
619 | | // Start concurrency stats manager |
620 | 7 | doris::ConcurrencyStatsManager::instance().start(); |
621 | | |
622 | | // begin to start services |
623 | 7 | doris::ThriftRpcHelper::setup(exec_env); |
624 | | // 1. thrift server with be_port |
625 | 7 | std::shared_ptr<doris::BaseBackendService> service; |
626 | 7 | std::function<void(Status&, std::string_view)> stop_work_if_error = [&](Status& status, |
627 | 42 | std::string_view msg) { |
628 | 42 | if (!status.ok()) { |
629 | 0 | std::cerr << msg << '\n'; |
630 | 0 | service->stop_works(); |
631 | 0 | exit_on_startup_failure(); |
632 | 0 | } |
633 | 42 | }; |
634 | | |
635 | 7 | if (doris::config::is_cloud_mode()) { |
636 | 1 | service = std::make_shared<doris::CloudBackendService>( |
637 | 1 | exec_env->storage_engine().to_cloud(), exec_env); |
638 | 6 | } else { |
639 | 6 | service = std::make_shared<doris::BackendService>(exec_env->storage_engine().to_local(), |
640 | 6 | exec_env); |
641 | 6 | } |
642 | | |
643 | 7 | std::unique_ptr<doris::server::IServerStarter> backend_thrift_starter; |
644 | 7 | EXIT_IF_ERROR(doris::server::create_backend_thrift_starter(exec_env, doris::config::be_port, |
645 | 7 | service, &backend_thrift_starter)); |
646 | 7 | status = backend_thrift_starter->start(); |
647 | 7 | stop_work_if_error(status, "Doris BE server did not start correctly, exiting"); |
648 | | |
649 | | // 2. brpc service |
650 | 7 | std::unique_ptr<doris::server::IServerStarter> brpc_starter; |
651 | 7 | EXIT_IF_ERROR(doris::server::create_brpc_starter( |
652 | 7 | exec_env, doris::config::brpc_port, doris::config::brpc_num_threads, &brpc_starter)); |
653 | 7 | status = brpc_starter->start(); |
654 | 7 | stop_work_if_error(status, "BRPC service did not start correctly, exiting"); |
655 | | |
656 | | // 3. http service |
657 | 7 | std::unique_ptr<doris::server::IServerStarter> http_starter; |
658 | 7 | EXIT_IF_ERROR(doris::server::create_http_starter(exec_env, doris::config::webserver_port, |
659 | 7 | doris::config::webserver_num_workers, |
660 | 7 | &http_starter)); |
661 | 7 | status = http_starter->start(); |
662 | 7 | stop_work_if_error(status, "Doris Be http service did not start correctly, exiting"); |
663 | | |
664 | | // 4. heart beat server |
665 | 7 | doris::ClusterInfo* cluster_info = exec_env->cluster_info(); |
666 | 7 | std::unique_ptr<doris::server::IServerStarter> heartbeat_thrift_starter; |
667 | 7 | status = doris::server::create_heartbeat_thrift_starter( |
668 | 7 | exec_env, doris::config::heartbeat_service_port, |
669 | 7 | doris::config::heartbeat_service_thread_count, cluster_info, &heartbeat_thrift_starter); |
670 | 7 | stop_work_if_error(status, "Heartbeat services did not start correctly, exiting"); |
671 | | |
672 | 7 | status = heartbeat_thrift_starter->start(); |
673 | 7 | stop_work_if_error(status, "Doris BE HeartBeat Service did not start correctly, exiting: " + |
674 | 7 | status.to_string()); |
675 | | |
676 | | // 5. arrow flight service |
677 | 7 | std::unique_ptr<doris::server::IServerStarter> flight_starter; |
678 | 7 | EXIT_IF_ERROR(doris::server::create_flight_starter(doris::config::arrow_flight_sql_port, |
679 | 7 | &flight_starter)); |
680 | 7 | status = flight_starter->start(); |
681 | 7 | stop_work_if_error( |
682 | 7 | status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string()); |
683 | | |
684 | | // 6. start daemon thread to do clean or gc jobs |
685 | 7 | doris::Daemon daemon; |
686 | 7 | daemon.start(); |
687 | | |
688 | 7 | exec_env->storage_engine().notify_listeners(); |
689 | | |
690 | 7 | doris::k_is_server_ready = true; |
691 | | |
692 | 4.42k | while (!doris::k_doris_exit) { |
693 | | #if defined(LEAK_SANITIZER) |
694 | | __lsan_do_leak_check(); |
695 | | #endif |
696 | 4.42k | sleep(3); |
697 | 4.42k | } |
698 | 7 | doris::k_is_server_ready = false; |
699 | 7 | LOG(INFO) << "Doris main exiting."; |
700 | 7 | #if defined(LLVM_PROFILE) |
701 | 7 | __llvm_profile_write_file(); |
702 | 7 | LOG(INFO) << "Flush profile file."; |
703 | 7 | #endif |
704 | | // For graceful shutdown, need to wait for all running queries to stop |
705 | 7 | exec_env->wait_for_all_tasks_done(); |
706 | | |
707 | 7 | if (!doris::config::enable_graceful_exit_check) { |
708 | | // If not in memleak check mode, no need to wait all objects de-constructed normally, just exit. |
709 | | // It will make sure that graceful shutdown can be done definitely. |
710 | 0 | LOG(INFO) << "Doris main exited."; |
711 | 0 | google::FlushLogFiles(google::GLOG_INFO); |
712 | 0 | _exit(0); // Do not call exit(0), it will wait for all objects de-constructed normally |
713 | 0 | return 0; |
714 | 0 | } |
715 | 7 | daemon.stop(); |
716 | 7 | flight_starter->stop(); |
717 | 7 | flight_starter->join(); |
718 | 7 | LOG(INFO) << "Flight server stopped."; |
719 | 7 | heartbeat_thrift_starter->stop(); |
720 | 7 | heartbeat_thrift_starter->join(); |
721 | 7 | LOG(INFO) << "Heartbeat server stopped"; |
722 | | // TODO(zhiqiang): http_service |
723 | 7 | http_starter->stop(); |
724 | 7 | http_starter->join(); |
725 | 7 | LOG(INFO) << "Http service stopped"; |
726 | 7 | backend_thrift_starter->stop(); |
727 | 7 | backend_thrift_starter->join(); |
728 | 7 | LOG(INFO) << "Be server stopped"; |
729 | 7 | brpc_starter->stop(); |
730 | 7 | brpc_starter->join(); |
731 | 7 | LOG(INFO) << "Brpc service stopped"; |
732 | 7 | service.reset(); |
733 | 7 | LOG(INFO) << "Backend Service stopped"; |
734 | 7 | exec_env->destroy(); |
735 | 7 | LOG(INFO) << "All service stopped, doris main exited."; |
736 | 7 | return 0; |
737 | 7 | } |
738 | | |
/// Prints the command-line usage text to stdout.
/// @param progname  program name shown in the banner and in the usage line.
static void help(const char* progname) {
    printf("%s is the Doris backend server.\n\n"
           "Usage:\n %s [OPTION]...\n\n",
           progname, progname);
    fputs("Options:\n"
          " -v, --version output version information, then exit\n"
          " -?, --help show this help, then exit\n",
          stdout);
}