/root/doris/be/src/olap/memtable_writer.cpp
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "olap/memtable_writer.h" | 
| 19 |  |  | 
| 20 |  | #include <fmt/format.h> | 
| 21 |  |  | 
| 22 |  | #include <ostream> | 
| 23 |  | #include <string> | 
| 24 |  |  | 
| 25 |  | #include "common/compiler_util.h" // IWYU pragma: keep | 
| 26 |  | #include "common/config.h" | 
| 27 |  | #include "common/logging.h" | 
| 28 |  | #include "common/status.h" | 
| 29 |  | #include "exec/tablet_info.h" | 
| 30 |  | #include "gutil/strings/numbers.h" | 
| 31 |  | #include "io/fs/file_writer.h" // IWYU pragma: keep | 
| 32 |  | #include "olap/memtable.h" | 
| 33 |  | #include "olap/memtable_flush_executor.h" | 
| 34 |  | #include "olap/memtable_memory_limiter.h" | 
| 35 |  | #include "olap/rowset/beta_rowset_writer.h" | 
| 36 |  | #include "olap/rowset/rowset_writer.h" | 
| 37 |  | #include "olap/schema_change.h" | 
| 38 |  | #include "olap/storage_engine.h" | 
| 39 |  | #include "olap/tablet_schema.h" | 
| 40 |  | #include "runtime/exec_env.h" | 
| 41 |  | #include "util/mem_info.h" | 
| 42 |  | #include "util/stopwatch.hpp" | 
| 43 |  | #include "vec/core/block.h" | 
| 44 |  |  | 
| 45 |  | namespace doris { | 
| 46 |  | using namespace ErrorCode; | 
| 47 |  |  | 
| 48 | 17 | MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {} | 
| 49 |  |  | 
| 50 | 17 | MemTableWriter::~MemTableWriter() { | 
| 51 | 17 |     if (!_is_init) {  Branch (51:9): [True: 2, False: 15]
 | 
| 52 | 2 |         return; | 
| 53 | 2 |     } | 
| 54 | 15 |     if (_flush_token != nullptr) {  Branch (54:9): [True: 15, False: 0]
 | 
| 55 |  |         // cancel and wait all memtables in flush queue to be finished | 
| 56 | 15 |         _flush_token->cancel(); | 
| 57 | 15 |     } | 
| 58 | 15 |     _mem_table.reset(); | 
| 59 | 15 | } | 
| 60 |  |  | 
| 61 |  | Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, | 
| 62 |  |                             TabletSchemaSPtr tablet_schema, | 
| 63 |  |                             std::shared_ptr<PartialUpdateInfo> partial_update_info, | 
| 64 | 15 |                             std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) { | 
| 65 | 15 |     _rowset_writer = rowset_writer; | 
| 66 | 15 |     _tablet_schema = tablet_schema; | 
| 67 | 15 |     _unique_key_mow = unique_key_mow; | 
| 68 | 15 |     _partial_update_info = partial_update_info; | 
| 69 | 15 |     _query_thread_context.init_unlocked(); | 
| 70 |  |  | 
| 71 | 15 |     _reset_mem_table(); | 
| 72 |  |  | 
| 73 |  |     // create flush handler | 
| 74 |  |     // by assigning segment_id to memtable before submiting to flush executor, | 
| 75 |  |     // we can make sure same keys sort in the same order in all replicas. | 
| 76 | 15 |     RETURN_IF_ERROR( | Line | Count | Source |  | 637 | 15 |     do {                                \ |  | 638 | 15 |         Status _status_ = (stmt);       \ |  | 639 | 15 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 15 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 15]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 15 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 77 | 15 |             ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token( | 
| 78 | 15 |                     _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr)); | 
| 79 |  |  | 
| 80 | 15 |     _is_init = true; | 
| 81 | 15 |     return Status::OK(); | 
| 82 | 15 | } | 
| 83 |  |  | 
| 84 |  | Status MemTableWriter::write(const vectorized::Block* block, | 
| 85 | 20 |                              const std::vector<uint32_t>& row_idxs) { | 
| 86 | 20 |     if (UNLIKELY(row_idxs.empty())) {| Line | Count | Source |  | 36 | 20 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 20]
 | 
 | 
| 87 | 0 |         return Status::OK(); | 
| 88 | 0 |     } | 
| 89 | 20 |     _lock_watch.start(); | 
| 90 | 20 |     std::lock_guard<std::mutex> l(_lock); | 
| 91 | 20 |     _lock_watch.stop(); | 
| 92 | 20 |     if (_is_cancelled) {  Branch (92:9): [True: 0, False: 20]
 | 
| 93 | 0 |         return _cancel_status; | 
| 94 | 0 |     } | 
| 95 | 20 |     if (!_is_init) {  Branch (95:9): [True: 0, False: 20]
 | 
| 96 | 0 |         return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized"); | 
| 97 | 0 |     } | 
| 98 | 20 |     if (_is_closed) {  Branch (98:9): [True: 0, False: 20]
 | 
| 99 | 0 |         return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}", | 
| 100 | 0 |                                              _req.tablet_id, _req.load_id.hi(), _req.load_id.lo()); | 
| 101 | 0 |     } | 
| 102 |  |  | 
| 103 | 20 |     _total_received_rows += row_idxs.size(); | 
| 104 | 20 |     auto st = _mem_table->insert(block, row_idxs); | 
| 105 |  |  | 
| 106 |  |     // Reset memtable immediately after insert failure to prevent potential flush operations. | 
| 107 |  |     // This is a defensive measure because: | 
| 108 |  |     // 1. When insert fails (e.g., memory allocation failure during add_rows), | 
| 109 |  |     //    the memtable is in an inconsistent state and should not be flushed | 
| 110 |  |     // 2. However, memory pressure might trigger a flush operation on this failed memtable | 
| 111 |  |     // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush, | 
| 112 |  |     //    thus preventing potential crashes | 
| 113 | 20 |     DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {| Line | Count | Source |  | 37 | 20 |     if (UNLIKELY(config::enable_debug_points)) {                              \| Line | Count | Source |  | 36 | 20 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 20]
 | 
 |  | 38 | 0 |         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ |  | 39 | 0 |         if (dp) {                                                             \  Branch (39:13): [True: 0, False: 0]
 |  | 40 | 0 |             [[maybe_unused]] auto DP_NAME = debug_point_name;                 \ |  | 41 | 0 |             { code; }                                                         \  Branch (41:15): [True: 0, False: 0]
 |  | 42 | 0 |         }                                                                     \ |  | 43 | 0 |     } | 
 | 
| 114 | 20 |         if (rand() % 100 < (100 * dp->param("percent", 0.3))) { | 
| 115 | 20 |             st = Status::InternalError<false>("write memtable random failed for debug"); | 
| 116 | 20 |         } | 
| 117 | 20 |     }); | 
| 118 | 20 |     if (!st.ok()) [[unlikely]] {  Branch (118:9): [True: 0, False: 20]
 | 
| 119 | 0 |         _reset_mem_table(); | 
| 120 | 0 |         return st; | 
| 121 | 0 |     } | 
| 122 |  |  | 
| 123 | 20 |     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {| Line | Count | Source |  | 36 | 20 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 20]
  Branch (36:44): [True: 0, False: 20]
  Branch (36:44): [True: 0, False: 0]
 | 
 | 
| 124 | 0 |         _mem_table->shrink_memtable_by_agg(); | 
| 125 | 0 |     } | 
| 126 | 20 |     if (UNLIKELY(_mem_table->need_flush())) {| Line | Count | Source |  | 36 | 20 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 20]
 | 
 | 
| 127 | 0 |         auto s = _flush_memtable_async(); | 
| 128 | 0 |         _reset_mem_table(); | 
| 129 | 0 |         if (UNLIKELY(!s.ok())) {| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 | 
| 130 | 0 |             return s; | 
| 131 | 0 |         } | 
| 132 | 0 |     } | 
| 133 |  |  | 
| 134 | 20 |     return Status::OK(); | 
| 135 | 20 | } | 
| 136 |  |  | 
| 137 | 15 | Status MemTableWriter::_flush_memtable_async() { | 
| 138 | 15 |     DCHECK(_flush_token != nullptr); | 
| 139 | 15 |     std::shared_ptr<MemTable> memtable; | 
| 140 | 15 |     { | 
| 141 | 15 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 142 | 15 |         memtable = _mem_table; | 
| 143 | 15 |         _mem_table = nullptr; | 
| 144 | 15 |     } | 
| 145 | 15 |     { | 
| 146 | 15 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 147 | 15 |         memtable->update_mem_type(MemType::WRITE_FINISHED); | 
| 148 | 15 |         _freezed_mem_tables.push_back(memtable); | 
| 149 | 15 |     } | 
| 150 | 15 |     return _flush_token->submit(memtable); | 
| 151 | 15 | } | 
| 152 |  |  | 
| 153 | 0 | Status MemTableWriter::flush_async() { | 
| 154 | 0 |     std::lock_guard<std::mutex> l(_lock); | 
| 155 |  |     // In order to avoid repeated ATTACH, use SWITCH here. have two calling paths: | 
| 156 |  |     // 1. call by local, from `VTabletWriterV2::_write_memtable`, has been ATTACH Load memory tracker | 
| 157 |  |     // into thread context, ATTACH cannot be repeated here. | 
| 158 |  |     // 2. call by remote, from `LoadChannelMgr::_get_load_channel`, no ATTACH because LoadChannelMgr | 
| 159 |  |     // not know Load context. | 
| 160 | 0 |     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); | Line | Count | Source |  | 76 | 0 |     auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext() | 
 | 
| 161 | 0 |     if (!_is_init || _is_closed) {  Branch (161:9): [True: 0, False: 0]
  Branch (161:22): [True: 0, False: 0]
 | 
| 162 |  |         // This writer is uninitialized or closed before flushing, do nothing. | 
| 163 |  |         // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED. | 
| 164 |  |         // Because this method maybe called when trying to reduce mem consumption, | 
| 165 |  |         // and at that time, the writer may not be initialized yet and that is a normal case. | 
| 166 | 0 |         return Status::OK(); | 
| 167 | 0 |     } | 
| 168 |  |  | 
| 169 | 0 |     if (_is_cancelled) {  Branch (169:9): [True: 0, False: 0]
 | 
| 170 | 0 |         return _cancel_status; | 
| 171 | 0 |     } | 
| 172 |  |  | 
| 173 | 0 |     VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " | Line | Count | Source |  | 42 | 0 | #define VLOG_NOTICE VLOG(3) | 
 | 
| 174 | 0 |                 << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id | 
| 175 | 0 |                 << ", load id: " << print_id(_req.load_id); | 
| 176 | 0 |     auto s = _flush_memtable_async(); | 
| 177 | 0 |     _reset_mem_table(); | 
| 178 | 0 |     return s; | 
| 179 | 0 | } | 
| 180 |  |  | 
| 181 | 9 | Status MemTableWriter::wait_flush() { | 
| 182 | 9 |     { | 
| 183 | 9 |         std::lock_guard<std::mutex> l(_lock); | 
| 184 | 9 |         if (!_is_init || _is_closed) {  Branch (184:13): [True: 0, False: 9]
  Branch (184:26): [True: 9, False: 0]
 | 
| 185 |  |             // return OK instead of NOT_INITIALIZED or ALREADY_CLOSED for same reason | 
| 186 |  |             // as described in flush_async() | 
| 187 | 9 |             return Status::OK(); | 
| 188 | 9 |         } | 
| 189 | 0 |         if (_is_cancelled) {  Branch (189:13): [True: 0, False: 0]
 | 
| 190 | 0 |             return _cancel_status; | 
| 191 | 0 |         } | 
| 192 | 0 |     } | 
| 193 | 0 |     SCOPED_RAW_TIMER(&_wait_flush_time_ns); | Line | Count | Source |  | 77 | 0 |     doris::ScopedRawTimer<doris::MonotonicStopWatch, int64_t> MACRO_CONCAT(SCOPED_RAW_TIMER, \ | Line | Count | Source |  | 52 | 0 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 0 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 |  | 78 | 0 |                                                                            __COUNTER__)(c) | 
 | 
| 194 | 0 |     RETURN_IF_ERROR(_flush_token->wait()); | Line | Count | Source |  | 637 | 0 |     do {                                \ |  | 638 | 0 |         Status _status_ = (stmt);       \ |  | 639 | 0 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 0 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 195 | 0 |     return Status::OK(); | 
| 196 | 0 | } | 
| 197 |  |  | 
| 198 | 15 | void MemTableWriter::_reset_mem_table() { | 
| 199 | 15 |     { | 
| 200 | 15 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 201 | 15 |         _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc, | 
| 202 | 15 |                                       _unique_key_mow, _partial_update_info.get())); | 
| 203 | 15 |     } | 
| 204 |  |  | 
| 205 | 15 |     _segment_num++; | 
| 206 | 15 | } | 
| 207 |  |  | 
| 208 | 15 | Status MemTableWriter::close() { | 
| 209 | 15 |     _lock_watch.start(); | 
| 210 | 15 |     std::lock_guard<std::mutex> l(_lock); | 
| 211 | 15 |     _lock_watch.stop(); | 
| 212 | 15 |     if (_is_cancelled) {  Branch (212:9): [True: 0, False: 15]
 | 
| 213 | 0 |         return _cancel_status; | 
| 214 | 0 |     } | 
| 215 | 15 |     if (!_is_init) {  Branch (215:9): [True: 0, False: 15]
 | 
| 216 | 0 |         return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized"); | 
| 217 | 0 |     } | 
| 218 | 15 |     if (_is_closed) {  Branch (218:9): [True: 0, False: 15]
 | 
| 219 | 0 |         LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id | 
| 220 | 0 |                      << " load_id=" << _req.load_id; | 
| 221 | 0 |         return Status::OK(); | 
| 222 | 0 |     } | 
| 223 |  |  | 
| 224 | 15 |     auto s = _flush_memtable_async(); | 
| 225 | 15 |     { | 
| 226 | 15 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 227 | 15 |         _mem_table.reset(); | 
| 228 | 15 |     } | 
| 229 | 15 |     _is_closed = true; | 
| 230 | 15 |     if (UNLIKELY(!s.ok())) {| Line | Count | Source |  | 36 | 15 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 15]
 | 
 | 
| 231 | 0 |         return s; | 
| 232 | 15 |     } else { | 
| 233 | 15 |         return Status::OK(); | 
| 234 | 15 |     } | 
| 235 | 15 | } | 
| 236 |  |  | 
| 237 | 15 | Status MemTableWriter::_do_close_wait() { | 
| 238 | 15 |     SCOPED_RAW_TIMER(&_close_wait_time_ns); | Line | Count | Source |  | 77 | 15 |     doris::ScopedRawTimer<doris::MonotonicStopWatch, int64_t> MACRO_CONCAT(SCOPED_RAW_TIMER, \ | Line | Count | Source |  | 52 | 15 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 15 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 |  | 78 | 15 |                                                                            __COUNTER__)(c) | 
 | 
| 239 | 15 |     std::lock_guard<std::mutex> l(_lock); | 
| 240 | 15 |     DCHECK(_is_init) | 
| 241 | 0 |             << "delta writer is supposed be to initialized before close_wait() being called"; | 
| 242 |  |  | 
| 243 | 15 |     if (_is_cancelled) {  Branch (243:9): [True: 0, False: 15]
 | 
| 244 | 0 |         return _cancel_status; | 
| 245 | 0 |     } | 
| 246 |  |  | 
| 247 | 15 |     Status st; | 
| 248 |  |     // return error if previous flush failed | 
| 249 | 15 |     { | 
| 250 | 15 |         SCOPED_RAW_TIMER(&_wait_flush_time_ns); | Line | Count | Source |  | 77 | 15 |     doris::ScopedRawTimer<doris::MonotonicStopWatch, int64_t> MACRO_CONCAT(SCOPED_RAW_TIMER, \ | Line | Count | Source |  | 52 | 15 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 15 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 |  | 78 | 15 |                                                                            __COUNTER__)(c) | 
 | 
| 251 | 15 |         st = _flush_token->wait(); | 
| 252 | 15 |     } | 
| 253 | 15 |     if (UNLIKELY(!st.ok())) {| Line | Count | Source |  | 36 | 15 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 15]
 | 
 | 
| 254 | 0 |         LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id; | 
| 255 | 0 |         return st; | 
| 256 | 0 |     } | 
| 257 |  |  | 
| 258 | 15 |     if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows !=   Branch (258:9): [True: 0, False: 15]
 | 
| 259 | 15 |         _total_received_rows) { | 
| 260 | 0 |         LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " | 
| 261 | 0 |                      << _rowset_writer->num_rows() | 
| 262 | 0 |                      << ", merged_rows: " << _flush_token->memtable_stat().merged_rows | 
| 263 | 0 |                      << ", total received rows: " << _total_received_rows; | 
| 264 | 0 |         return Status::InternalError("rows number written by delta writer dosen't match"); | 
| 265 | 0 |     } | 
| 266 |  |  | 
| 267 |  |     // const FlushStatistic& stat = _flush_token->get_stats(); | 
| 268 |  |     // print slow log if wait more than 1s | 
| 269 |  |     /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) { | 
| 270 |  |         LOG(INFO) << "close delta writer for tablet: " << req.tablet_id | 
| 271 |  |                   << ", load id: " << print_id(_req.load_id) << ", wait close for " | 
| 272 |  |                   << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat; | 
| 273 |  |     }*/ | 
| 274 |  |  | 
| 275 | 15 |     return Status::OK(); | 
| 276 | 15 | } | 
| 277 |  |  | 
| 278 | 15 | void MemTableWriter::_update_profile(RuntimeProfile* profile) { | 
| 279 |  |     // NOTE: MemTableWriter may be accessed when profile is out of scope, in MemTableMemoryLimiter. | 
| 280 |  |     // To avoid accessing dangling pointers, we cannot make profile as a member of MemTableWriter. | 
| 281 | 15 |     auto child = | 
| 282 | 15 |             profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true); | 
| 283 | 15 |     auto lock_timer = ADD_TIMER(child, "LockTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 284 | 15 |     auto sort_timer = ADD_TIMER(child, "MemTableSortTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 285 | 15 |     auto agg_timer = ADD_TIMER(child, "MemTableAggTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 286 | 15 |     auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 287 | 15 |     auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 288 | 15 |     auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 289 | 15 |     auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 290 | 15 |     auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 291 | 15 |     auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime"); | Line | Count | Source |  | 60 | 15 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 292 | 15 |     auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT); | Line | Count | Source |  | 57 | 15 | #define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type) | 
 | 
| 293 | 15 |     auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT); | Line | Count | Source |  | 57 | 15 | #define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type) | 
 | 
| 294 | 15 |     auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT); | Line | Count | Source |  | 57 | 15 | #define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type) | 
 | 
| 295 | 15 |     auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT); | Line | Count | Source |  | 57 | 15 | #define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type) | 
 | 
| 296 | 15 |     auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT); | Line | Count | Source |  | 57 | 15 | #define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type) | 
 | 
| 297 |  |  | 
| 298 | 15 |     COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time()); | Line | Count | Source |  | 82 | 15 | #define COUNTER_UPDATE(c, v) (c)->update(v) | 
 | 
| 299 | 15 |     COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns()); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 300 | 15 |     COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns()); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 301 | 15 |     COUNTER_SET(wait_flush_timer, _wait_flush_time_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 302 | 15 |     COUNTER_SET(close_wait_timer, _close_wait_time_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 303 | 15 |     COUNTER_SET(segment_num, _segment_num); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 304 | 15 |     const auto& memtable_stat = _flush_token->memtable_stat(); | 
| 305 | 15 |     COUNTER_SET(sort_timer, memtable_stat.sort_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 306 | 15 |     COUNTER_SET(agg_timer, memtable_stat.agg_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 307 | 15 |     COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 308 | 15 |     COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 309 | 15 |     COUNTER_SET(sort_times, memtable_stat.sort_times); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 310 | 15 |     COUNTER_SET(agg_times, memtable_stat.agg_times); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 311 | 15 |     COUNTER_SET(raw_rows_num, memtable_stat.raw_rows); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 312 | 15 |     COUNTER_SET(merged_rows_num, memtable_stat.merged_rows); | Line | Count | Source |  | 83 | 15 | #define COUNTER_SET(c, v) (c)->set(v) | 
 | 
| 313 | 15 | } | 
| 314 |  |  | 
| 315 | 15 | Status MemTableWriter::cancel() { | 
| 316 | 15 |     return cancel_with_status(Status::Cancelled("already cancelled")); | 
| 317 | 15 | } | 
| 318 |  |  | 
| 319 | 15 | Status MemTableWriter::cancel_with_status(const Status& st) { | 
| 320 | 15 |     std::lock_guard<std::mutex> l(_lock); | 
| 321 | 15 |     if (_is_cancelled) {  Branch (321:9): [True: 0, False: 15]
 | 
| 322 | 0 |         return Status::OK(); | 
| 323 | 0 |     } | 
| 324 | 15 |     { | 
| 325 | 15 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 326 | 15 |         _mem_table.reset(); | 
| 327 | 15 |     } | 
| 328 | 15 |     if (_flush_token != nullptr) {  Branch (328:9): [True: 15, False: 0]
 | 
| 329 |  |         // cancel and wait all memtables in flush queue to be finished | 
| 330 | 15 |         _flush_token->cancel(); | 
| 331 | 15 |     } | 
| 332 | 15 |     _is_cancelled = true; | 
| 333 | 15 |     _cancel_status = st; | 
| 334 | 15 |     return Status::OK(); | 
| 335 | 15 | } | 
| 336 |  |  | 
| 337 | 15 | const FlushStatistic& MemTableWriter::get_flush_token_stats() { | 
| 338 | 15 |     return _flush_token->get_stats(); | 
| 339 | 15 | } | 
| 340 |  |  | 
| 341 | 20 | uint64_t MemTableWriter::flush_running_count() const { | 
| 342 | 20 |     return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();   Branch (342:12): [True: 0, False: 20]
 | 
| 343 | 20 | } | 
| 344 |  |  | 
| 345 | 0 | int64_t MemTableWriter::mem_consumption(MemType mem) { | 
| 346 | 0 |     if (!_is_init) {  Branch (346:9): [True: 0, False: 0]
 | 
| 347 |  |         // This method may be called before this writer is initialized. | 
| 348 |  |         // So _flush_token may be null. | 
| 349 | 0 |         return 0; | 
| 350 | 0 |     } | 
| 351 | 0 |     int64_t mem_usage = 0; | 
| 352 | 0 |     { | 
| 353 | 0 |         std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 354 | 0 |         for (const auto& mem_table : _freezed_mem_tables) {  Branch (354:36): [True: 0, False: 0]
 | 
| 355 | 0 |             auto mem_table_sptr = mem_table.lock(); | 
| 356 | 0 |             if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {  Branch (356:17): [True: 0, False: 0]
  Branch (356:46): [True: 0, False: 0]
 | 
| 357 | 0 |                 mem_usage += mem_table_sptr->memory_usage(); | 
| 358 | 0 |             } | 
| 359 | 0 |         } | 
| 360 | 0 |     } | 
| 361 | 0 |     return mem_usage; | 
| 362 | 0 | } | 
| 363 |  |  | 
| 364 | 0 | int64_t MemTableWriter::active_memtable_mem_consumption() { | 
| 365 | 0 |     std::lock_guard<std::mutex> l(_mem_table_ptr_lock); | 
| 366 | 0 |     return _mem_table != nullptr ? _mem_table->memory_usage() : 0;   Branch (366:12): [True: 0, False: 0]
 | 
| 367 | 0 | } | 
| 368 |  |  | 
| 369 |  | } // namespace doris |