/root/doris/be/src/util/async_io.h
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #pragma once | 
| 19 |  |  | 
| 20 |  | #include <bthread/bthread.h> | 
| 21 |  |  | 
| 22 |  | #include "io/fs/file_system.h" | 
| 23 |  | #include "olap/olap_define.h" | 
| 24 |  | #include "work_thread_pool.hpp" | 
| 25 |  |  | 
| 26 |  | namespace doris { | 
| 27 |  |  | 
| 28 |  | struct AsyncIOCtx { | 
| 29 |  |     int nice; | 
| 30 |  | }; | 
| 31 |  |  | 
| 32 |  | /** | 
| 33 |  |  * Separate task from bthread to pthread, specific for IO task. | 
| 34 |  |  */ | 
| 35 |  | class AsyncIO { | 
| 36 |  | public: | 
| 37 | 1 |     AsyncIO() { | 
| 38 | 1 |         _io_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, | 
| 39 | 1 |                                                  config::doris_scanner_thread_pool_queue_size, | 
| 40 | 1 |                                                  "async_io_thread_pool"); | 
| 41 | 1 |         _remote_thread_pool = new PriorityThreadPool( | 
| 42 | 1 |                 config::doris_remote_scanner_thread_pool_thread_num, | 
| 43 | 1 |                 config::doris_remote_scanner_thread_pool_queue_size, "async_remote_thread_pool"); | 
| 44 | 1 |     } | 
| 45 |  |  | 
| 46 | 1 |     ~AsyncIO() { | 
| 47 | 1 |         SAFE_DELETE(_io_thread_pool); | Line | Count | Source |  | 168 | 1 |     do {                      \ |  | 169 | 1 |         if (nullptr != ptr) { \  Branch (169:13): [True: 1, False: 0]
 |  | 170 | 1 |             delete ptr;       \ |  | 171 | 1 |             ptr = nullptr;    \ |  | 172 | 1 |         }                     \ |  | 173 | 1 |     } while (0)   Branch (173:14): [Folded - Ignored]
 | 
 | 
| 48 | 1 |         SAFE_DELETE(_remote_thread_pool); | Line | Count | Source |  | 168 | 1 |     do {                      \ |  | 169 | 1 |         if (nullptr != ptr) { \  Branch (169:13): [True: 1, False: 0]
 |  | 170 | 1 |             delete ptr;       \ |  | 171 | 1 |             ptr = nullptr;    \ |  | 172 | 1 |         }                     \ |  | 173 | 1 |     } while (0)   Branch (173:14): [Folded - Ignored]
 | 
 | 
| 49 | 1 |     } | 
| 50 |  |  | 
| 51 |  |     AsyncIO& operator=(const AsyncIO&) = delete; | 
| 52 |  |     AsyncIO(const AsyncIO&) = delete; | 
| 53 |  |  | 
| 54 | 3 |     static AsyncIO& instance() { | 
| 55 | 3 |         static AsyncIO instance; | 
| 56 | 3 |         return instance; | 
| 57 | 3 |     } | 
| 58 |  |  | 
| 59 |  |     // This function should run on the bthread, and it will put the task into | 
| 60 |  |     // thread_pool and release the bthread_worker at cv.wait. When the task is completed, | 
| 61 |  |     // the bthread will continue to execute. | 
| 62 | 3 |     static void run_task(std::function<void()> fn, io::FileSystemType file_type) { | 
| 63 | 3 |         DCHECK(bthread_self() != 0); | 
| 64 | 3 |         std::mutex mutex; | 
| 65 | 3 |         std::condition_variable cv; | 
| 66 | 3 |         std::unique_lock l(mutex); | 
| 67 |  |  | 
| 68 | 3 |         AsyncIOCtx* ctx = static_cast<AsyncIOCtx*>(bthread_getspecific(btls_io_ctx_key)); | 
| 69 | 3 |         int nice = -1; | 
| 70 | 3 |         if (ctx == nullptr) {  Branch (70:13): [True: 0, False: 3]
 | 
| 71 | 0 |             nice = 18; | 
| 72 | 3 |         } else { | 
| 73 | 3 |             nice = ctx->nice; | 
| 74 | 3 |         } | 
| 75 |  |  | 
| 76 | 3 |         PriorityThreadPool::Task task; | 
| 77 | 3 |         task.priority = nice; | 
| 78 | 3 |         task.work_function = [&] { | 
| 79 | 3 |             fn(); | 
| 80 | 3 |             std::unique_lock l(mutex); | 
| 81 | 3 |             cv.notify_one(); | 
| 82 | 3 |         }; | 
| 83 |  |  | 
| 84 | 3 |         if (file_type == io::FileSystemType::LOCAL) {  Branch (84:13): [True: 3, False: 0]
 | 
| 85 | 3 |             AsyncIO::instance().io_thread_pool()->offer(task); | 
| 86 | 3 |         } else { | 
| 87 | 0 |             AsyncIO::instance().remote_thread_pool()->offer(task); | 
| 88 | 0 |         } | 
| 89 | 3 |         cv.wait(l); | 
| 90 | 3 |     } | 
| 91 |  |  | 
| 92 |  |     inline static bthread_key_t btls_io_ctx_key; | 
| 93 |  |  | 
| 94 | 0 |     static void io_ctx_key_deleter(void* d) { delete static_cast<AsyncIOCtx*>(d); } | 
| 95 |  |  | 
| 96 |  | private: | 
| 97 |  |     PriorityThreadPool* _io_thread_pool = nullptr; | 
| 98 |  |     PriorityThreadPool* _remote_thread_pool = nullptr; | 
| 99 |  |  | 
| 100 |  | private: | 
| 101 | 3 |     PriorityThreadPool* io_thread_pool() { return _io_thread_pool; } | 
| 102 | 0 |     PriorityThreadPool* remote_thread_pool() { return _remote_thread_pool; } | 
| 103 |  | }; | 
| 104 |  |  | 
| 105 |  | } // end namespace doris |