/root/doris/be/src/olap/rowset_builder.cpp
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "olap/rowset_builder.h" | 
| 19 |  |  | 
| 20 |  | #include <brpc/controller.h> | 
| 21 |  | #include <fmt/format.h> | 
| 22 |  |  | 
| 23 |  | #include <filesystem> | 
| 24 |  | #include <memory> | 
| 25 |  | #include <ostream> | 
| 26 |  | #include <string> | 
| 27 |  | #include <utility> | 
| 28 |  |  | 
| 29 |  | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> | 
| 30 |  | #include "common/compiler_util.h" // IWYU pragma: keep | 
| 31 |  | #include "common/config.h" | 
| 32 |  | #include "common/status.h" | 
| 33 |  | #include "exec/tablet_info.h" | 
| 34 |  | #include "gutil/strings/numbers.h" | 
| 35 |  | #include "io/fs/file_system.h" | 
| 36 |  | #include "io/fs/file_writer.h" // IWYU pragma: keep | 
| 37 |  | #include "olap/calc_delete_bitmap_executor.h" | 
| 38 |  | #include "olap/olap_define.h" | 
| 39 |  | #include "olap/partial_update_info.h" | 
| 40 |  | #include "olap/rowset/beta_rowset.h" | 
| 41 |  | #include "olap/rowset/beta_rowset_writer.h" | 
| 42 |  | #include "olap/rowset/pending_rowset_helper.h" | 
| 43 |  | #include "olap/rowset/rowset_meta.h" | 
| 44 |  | #include "olap/rowset/rowset_meta_manager.h" | 
| 45 |  | #include "olap/rowset/rowset_writer.h" | 
| 46 |  | #include "olap/rowset/rowset_writer_context.h" | 
| 47 |  | #include "olap/schema_change.h" | 
| 48 |  | #include "olap/storage_engine.h" | 
| 49 |  | #include "olap/tablet_manager.h" | 
| 50 |  | #include "olap/tablet_meta.h" | 
| 51 |  | #include "olap/tablet_schema.h" | 
| 52 |  | #include "olap/txn_manager.h" | 
| 53 |  | #include "runtime/memory/global_memory_arbitrator.h" | 
| 54 |  | #include "util/brpc_client_cache.h" | 
| 55 |  | #include "util/debug_points.h" | 
| 56 |  | #include "util/mem_info.h" | 
| 57 |  | #include "util/ref_count_closure.h" | 
| 58 |  | #include "util/stopwatch.hpp" | 
| 59 |  | #include "util/time.h" | 
| 60 |  | #include "util/trace.h" | 
| 61 |  | #include "vec/common/schema_util.h" | 
| 62 |  | #include "vec/core/block.h" | 
| 63 |  |  | 
| 64 |  | namespace doris { | 
| 65 |  | using namespace ErrorCode; | 
| 66 |  |  | 
| 67 |  | BaseRowsetBuilder::BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* profile) | 
| 68 | 29 |         : _req(req), _tablet_schema(std::make_shared<TabletSchema>()) { | 
| 69 | 29 |     _init_profile(profile); | 
| 70 | 29 | } | 
| 71 |  |  | 
| 72 |  | RowsetBuilder::RowsetBuilder(StorageEngine& engine, const WriteRequest& req, | 
| 73 |  |                              RuntimeProfile* profile) | 
| 74 | 29 |         : BaseRowsetBuilder(req, profile), _engine(engine) {} | 
| 75 |  |  | 
| 76 | 29 | void BaseRowsetBuilder::_init_profile(RuntimeProfile* profile) { | 
| 77 | 29 |     _profile = profile->create_child(fmt::format("RowsetBuilder {}", _req.tablet_id), true, true); | 
| 78 | 29 |     _build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime"); | Line | Count | Source |  | 60 | 29 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 79 | 29 |     _submit_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapSubmitTime"); | Line | Count | Source |  | 60 | 29 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 80 | 29 |     _wait_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapWaitTime"); | Line | Count | Source |  | 60 | 29 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 81 | 29 | } | 
| 82 |  |  | 
| 83 | 0 | void RowsetBuilder::_init_profile(RuntimeProfile* profile) { | 
| 84 | 0 |     BaseRowsetBuilder::_init_profile(profile); | 
| 85 | 0 |     _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime"); | Line | Count | Source |  | 60 | 0 | #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) | 
 | 
| 86 | 0 | } | 
| 87 |  |  | 
| 88 | 29 | BaseRowsetBuilder::~BaseRowsetBuilder() { | 
| 89 | 29 |     if (!_is_init) {  Branch (89:9): [True: 1, False: 28]
 | 
| 90 | 1 |         return; | 
| 91 | 1 |     } | 
| 92 |  |  | 
| 93 | 28 |     if (_calc_delete_bitmap_token != nullptr) {  Branch (93:9): [True: 28, False: 0]
 | 
| 94 | 28 |         _calc_delete_bitmap_token->cancel(); | 
| 95 | 28 |     } | 
| 96 | 28 | } | 
| 97 |  |  | 
| 98 | 29 | RowsetBuilder::~RowsetBuilder() { | 
| 99 | 29 |     if (_is_init && !_is_committed) {  Branch (99:9): [True: 28, False: 1]
  Branch (99:21): [True: 5, False: 23]
 | 
| 100 | 5 |         _garbage_collection(); | 
| 101 | 5 |     } | 
| 102 | 29 | } | 
| 103 |  |  | 
| 104 | 236 | Tablet* RowsetBuilder::tablet() { | 
| 105 | 236 |     return static_cast<Tablet*>(_tablet.get()); | 
| 106 | 236 | } | 
| 107 |  |  | 
| 108 | 0 | TabletSharedPtr RowsetBuilder::tablet_sptr() { | 
| 109 | 0 |     return std::static_pointer_cast<Tablet>(_tablet); | 
| 110 | 0 | } | 
| 111 |  |  | 
| 112 | 5 | void RowsetBuilder::_garbage_collection() { | 
| 113 | 5 |     Status rollback_status; | 
| 114 | 5 |     TxnManager* txn_mgr = _engine.txn_manager(); | 
| 115 | 5 |     if (tablet() != nullptr) {  Branch (115:9): [True: 5, False: 0]
 | 
| 116 | 5 |         rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet(), _req.txn_id); | 
| 117 | 5 |     } | 
| 118 |  |     // has to check rollback status, because the rowset maybe committed in this thread and | 
| 119 |  |     // published in another thread, then rollback will fail. | 
| 120 |  |     // when rollback failed should not delete rowset | 
| 121 | 5 |     if (rollback_status.ok()) {  Branch (121:9): [True: 5, False: 0]
 | 
| 122 | 5 |         _engine.add_unused_rowset(_rowset); | 
| 123 | 5 |     } | 
| 124 | 5 | } | 
| 125 |  |  | 
| 126 | 3 | Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) { | 
| 127 | 3 |     std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock()); | 
| 128 | 3 |     _max_version_in_flush_phase = tablet()->max_version_unlocked(); | 
| 129 | 3 |     std::vector<RowsetSharedPtr> rowset_ptrs; | 
| 130 |  |     // tablet is under alter process. The delete bitmap will be calculated after conversion. | 
| 131 | 3 |     if (tablet()->tablet_state() == TABLET_NOTREADY) {  Branch (131:9): [True: 0, False: 3]
 | 
| 132 |  |         // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' | 
| 133 | 0 |         if (_req.table_schema_param->is_partial_update()) {  Branch (133:13): [True: 0, False: 0]
 | 
| 134 | 0 |             return Status::InternalError( | 
| 135 | 0 |                     "Unable to do 'partial_update' when " | 
| 136 | 0 |                     "the tablet is undergoing a 'schema changing process'"); | 
| 137 | 0 |         } | 
| 138 | 0 |         _rowset_ids->clear(); | 
| 139 | 3 |     } else { | 
| 140 | 3 |         RETURN_IF_ERROR( | Line | Count | Source |  | 637 | 3 |     do {                                \ |  | 638 | 3 |         Status _status_ = (stmt);       \ |  | 639 | 3 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 3 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 3]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 3 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 141 | 3 |                 tablet()->get_all_rs_id_unlocked(_max_version_in_flush_phase, _rowset_ids.get())); | 
| 142 | 3 |         rowset_ptrs = tablet()->get_rowset_by_ids(_rowset_ids.get()); | 
| 143 | 3 |     } | 
| 144 | 3 |     _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id()); | 
| 145 | 3 |     mow_context = std::make_shared<MowContext>(_max_version_in_flush_phase, _req.txn_id, | 
| 146 | 3 |                                                _rowset_ids, rowset_ptrs, _delete_bitmap); | 
| 147 | 3 |     return Status::OK(); | 
| 148 | 3 | } | 
| 149 |  |  | 
| 150 | 28 | Status RowsetBuilder::check_tablet_version_count() { | 
| 151 | 28 |     bool injection = false; | 
| 152 | 28 |     DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",| Line | Count | Source |  | 37 | 28 |     if (UNLIKELY(config::enable_debug_points)) {                              \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 38 | 0 |         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ |  | 39 | 0 |         if (dp) {                                                             \  Branch (39:13): [True: 0, False: 0]
 |  | 40 | 0 |             [[maybe_unused]] auto DP_NAME = debug_point_name;                 \ |  | 41 | 0 |             { code; }                                                         \ |  | 42 | 0 |         }                                                                     \ |  | 43 | 0 |     } | 
 | 
| 153 | 28 |                     { injection = true; }); | 
| 154 | 28 |     int32_t max_version_config = _tablet->max_version_config(); | 
| 155 | 28 |     if (injection) {  Branch (155:9): [True: 0, False: 28]
 | 
| 156 |  |         // do not return if injection | 
| 157 | 28 |     } else if (!_tablet->exceed_version_limit(max_version_config - 100) ||   Branch (157:16): [True: 28, False: 0]
 | 
| 158 | 28 |                GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {  Branch (158:16): [True: 0, False: 0]
 | 
| 159 | 28 |         return Status::OK(); | 
| 160 | 28 |     } | 
| 161 |  |     //trigger compaction | 
| 162 | 0 |     auto st = _engine.submit_compaction_task(tablet_sptr(), CompactionType::CUMULATIVE_COMPACTION, | 
| 163 | 0 |                                              true); | 
| 164 | 0 |     if (!st.ok()) [[unlikely]] {  Branch (164:9): [True: 0, False: 0]
 | 
| 165 | 0 |         LOG(WARNING) << "failed to trigger compaction, tablet_id=" << _tablet->tablet_id() << " : " | 
| 166 | 0 |                      << st; | 
| 167 | 0 |     } | 
| 168 | 0 |     int version_count = tablet()->version_count(); | 
| 169 | 0 |     DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",| Line | Count | Source |  | 37 | 0 |     if (UNLIKELY(config::enable_debug_points)) {                              \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 38 | 0 |         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ |  | 39 | 0 |         if (dp) {                                                             \  Branch (39:13): [True: 0, False: 0]
 |  | 40 | 0 |             [[maybe_unused]] auto DP_NAME = debug_point_name;                 \ |  | 41 | 0 |             { code; }                                                         \ |  | 42 | 0 |         }                                                                     \ |  | 43 | 0 |     } | 
 | 
| 170 | 0 |                     { version_count = INT_MAX; }); | 
| 171 | 0 |     if (version_count > max_version_config) {  Branch (171:9): [True: 0, False: 0]
 | 
| 172 | 0 |         return Status::Error<TOO_MANY_VERSION>( | 
| 173 | 0 |                 "failed to init rowset builder. version count: {}, exceed limit: {}, " | 
| 174 | 0 |                 "tablet: {}. Please reduce the frequency of loading data or adjust the " | 
| 175 | 0 |                 "max_tablet_version_num or time_series_max_tablet_version_num in be.conf to a " | 
| 176 | 0 |                 "larger value.", | 
| 177 | 0 |                 version_count, max_version_config, _tablet->tablet_id()); | 
| 178 | 0 |     } | 
| 179 | 0 |     return Status::OK(); | 
| 180 | 0 | } | 
| 181 |  |  | 
| 182 | 28 | Status RowsetBuilder::prepare_txn() { | 
| 183 | 28 |     std::shared_lock base_migration_lock(tablet()->get_migration_lock(), std::defer_lock); | 
| 184 | 28 |     if (!base_migration_lock.try_lock_for(   Branch (184:9): [True: 0, False: 28]
 | 
| 185 | 28 |                 std::chrono::milliseconds(config::migration_lock_timeout_ms))) { | 
| 186 | 0 |         return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms", | 
| 187 | 0 |                                               config::migration_lock_timeout_ms); | 
| 188 | 0 |     } | 
| 189 | 28 |     std::lock_guard<std::mutex> push_lock(tablet()->get_push_lock()); | 
| 190 | 28 |     return _engine.txn_manager()->prepare_txn(_req.partition_id, *tablet(), _req.txn_id, | 
| 191 | 28 |                                               _req.load_id); | 
| 192 | 28 | } | 
| 193 |  |  | 
| 194 | 29 | Status RowsetBuilder::init() { | 
| 195 | 29 |     _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id)); | Line | Count | Source |  | 716 | 29 |     ({                                           \ |  | 717 | 29 |         auto&& res = (stmt);                     \ |  | 718 | 29 |         using T = std::decay_t<decltype(res)>;   \ |  | 719 | 29 |         if (!res.has_value()) [[unlikely]] {     \  Branch (719:13): [True: 1, False: 28]
 |  | 720 | 1 |             return std::forward<T>(res).error(); \ |  | 721 | 1 |         }                                        \ |  | 722 | 29 |         std::forward<T>(res).value();            \ |  | 723 | 28 |     }); | 
 | 
| 196 | 28 |     std::shared_ptr<MowContext> mow_context; | 
| 197 | 28 |     if (_tablet->enable_unique_key_merge_on_write()) {  Branch (197:9): [True: 3, False: 25]
 | 
| 198 | 3 |         RETURN_IF_ERROR(init_mow_context(mow_context)); | Line | Count | Source |  | 637 | 3 |     do {                                \ |  | 638 | 3 |         Status _status_ = (stmt);       \ |  | 639 | 3 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 3 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 3]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 3 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 199 | 3 |     } | 
| 200 |  |  | 
| 201 | 28 |     if (!config::disable_auto_compaction &&   Branch (201:9): [True: 28, False: 0]
 | 
| 202 | 28 |         !_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {  Branch (202:9): [True: 28, False: 0]
 | 
| 203 | 28 |         RETURN_IF_ERROR(check_tablet_version_count()); | Line | Count | Source |  | 637 | 28 |     do {                                \ |  | 638 | 28 |         Status _status_ = (stmt);       \ |  | 639 | 28 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 28 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 204 | 28 |     } | 
| 205 |  |  | 
| 206 | 28 |     int version_count = tablet()->version_count() + tablet()->stale_version_count(); | 
| 207 | 28 |     if (tablet()->avg_rs_meta_serialize_size() * version_count >   Branch (207:9): [True: 0, False: 28]
 | 
| 208 | 28 |         config::tablet_meta_serialize_size_limit) { | 
| 209 | 0 |         return Status::Error<TOO_MANY_VERSION>( | 
| 210 | 0 |                 "failed to init rowset builder. meta serialize size : {}, exceed limit: {}, " | 
| 211 | 0 |                 "tablet: {}. Please reduce the frequency of loading data or adjust the " | 
| 212 | 0 |                 "max_tablet_version_num in be.conf to a larger value.", | 
| 213 | 0 |                 tablet()->avg_rs_meta_serialize_size() * version_count, | 
| 214 | 0 |                 config::tablet_meta_serialize_size_limit, _tablet->tablet_id()); | 
| 215 | 0 |     } | 
| 216 |  |  | 
| 217 | 28 |     RETURN_IF_ERROR(prepare_txn()); | Line | Count | Source |  | 637 | 28 |     do {                                \ |  | 638 | 28 |         Status _status_ = (stmt);       \ |  | 639 | 28 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 28 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 218 |  |  | 
| 219 | 28 |     DBUG_EXECUTE_IF("BaseRowsetBuilder::init.check_partial_update_column_num", {| Line | Count | Source |  | 37 | 28 |     if (UNLIKELY(config::enable_debug_points)) {                              \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 38 | 0 |         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ |  | 39 | 0 |         if (dp) {                                                             \  Branch (39:13): [True: 0, False: 0]
 |  | 40 | 0 |             [[maybe_unused]] auto DP_NAME = debug_point_name;                 \ |  | 41 | 0 |             { code; }                                                         \  Branch (41:15): [True: 0, False: 0]
 |  | 42 | 0 |         }                                                                     \ |  | 43 | 0 |     } | 
 | 
| 220 | 28 |         if (_req.table_schema_param->partial_update_input_columns().size() != | 
| 221 | 28 |             dp->param<int>("column_num")) { | 
| 222 | 28 |             return Status::InternalError("partial update input column num wrong!"); | 
| 223 | 28 |         }; | 
| 224 | 28 |     }) | 
| 225 |  |     // build tablet schema in request level | 
| 226 | 28 |     RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), | Line | Count | Source |  | 637 | 28 |     do {                                \ |  | 638 | 28 |         Status _status_ = (stmt);       \ |  | 639 | 28 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 28 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 227 | 28 |                                                  *_tablet->tablet_schema())); | 
| 228 | 28 |     RowsetWriterContext context; | 
| 229 | 28 |     context.txn_id = _req.txn_id; | 
| 230 | 28 |     context.load_id = _req.load_id; | 
| 231 | 28 |     context.rowset_state = PREPARED; | 
| 232 | 28 |     context.segments_overlap = OVERLAPPING; | 
| 233 | 28 |     context.tablet_schema = _tablet_schema; | 
| 234 | 28 |     context.newest_write_timestamp = UnixSeconds(); | 
| 235 | 28 |     context.tablet_id = _req.tablet_id; | 
| 236 | 28 |     context.index_id = _req.index_id; | 
| 237 | 28 |     context.tablet = _tablet; | 
| 238 | 28 |     context.enable_segcompaction = true; | 
| 239 | 28 |     context.write_type = DataWriteType::TYPE_DIRECT; | 
| 240 | 28 |     context.mow_context = mow_context; | 
| 241 | 28 |     context.write_file_cache = _req.write_file_cache; | 
| 242 | 28 |     context.partial_update_info = _partial_update_info; | 
| 243 | 28 |     _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); | Line | Count | Source |  | 716 | 28 |     ({                                           \ |  | 717 | 28 |         auto&& res = (stmt);                     \ |  | 718 | 28 |         using T = std::decay_t<decltype(res)>;   \ |  | 719 | 28 |         if (!res.has_value()) [[unlikely]] {     \  Branch (719:13): [True: 0, False: 28]
 |  | 720 | 0 |             return std::forward<T>(res).error(); \ |  | 721 | 0 |         }                                        \ |  | 722 | 28 |         std::forward<T>(res).value();            \ |  | 723 | 28 |     }); | 
 | 
| 244 | 28 |     _pending_rs_guard = _engine.pending_local_rowsets().add(context.rowset_id); | 
| 245 |  |  | 
| 246 | 28 |     _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); | 
| 247 |  |  | 
| 248 | 28 |     _is_init = true; | 
| 249 | 28 |     return Status::OK(); | 
| 250 | 28 | } | 
| 251 |  |  | 
| 252 | 23 | Status BaseRowsetBuilder::build_rowset() { | 
| 253 | 23 |     std::lock_guard<std::mutex> l(_lock); | 
| 254 | 23 |     DCHECK(_is_init) << "rowset builder is supposed be to initialized before " | 
| 255 | 0 |                         "build_rowset() being called"; | 
| 256 |  |  | 
| 257 | 23 |     SCOPED_TIMER(_build_rowset_timer); | Line | Count | Source |  | 69 | 23 | #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) | Line | Count | Source |  | 52 | 23 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 23 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 | 
 | 
| 258 |  |     // use rowset meta manager to save meta | 
| 259 | 23 |     RETURN_NOT_OK_STATUS_WITH_WARN(_rowset_writer->build(_rowset), "fail to build rowset"); | Line | Count | Source |  | 694 | 23 |     do {                                                           \ |  | 695 | 23 |         Status _s = (stmt);                                        \ |  | 696 | 23 |         if (UNLIKELY(!_s.ok())) {                                  \| Line | Count | Source |  | 36 | 23 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 23]
 | 
 |  | 697 | 0 |             LOG(WARNING) << (warning_prefix) << ", error: " << _s; \ |  | 698 | 0 |             return _s;                                             \ |  | 699 | 0 |         }                                                          \ |  | 700 | 23 |     } while (false);   Branch (700:14): [Folded - Ignored]
 | 
 | 
| 260 | 23 |     return Status::OK(); | 
| 261 | 23 | } | 
| 262 |  |  | 
| 263 | 17 | Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() { | 
| 264 | 17 |     if (!_tablet->enable_unique_key_merge_on_write() || _rowset->num_segments() == 0) {  Branch (264:9): [True: 14, False: 3]
  Branch (264:57): [True: 0, False: 3]
 | 
| 265 | 14 |         return Status::OK(); | 
| 266 | 14 |     } | 
| 267 | 3 |     std::lock_guard<std::mutex> l(_lock); | 
| 268 | 3 |     SCOPED_TIMER(_submit_delete_bitmap_timer); | Line | Count | Source |  | 69 | 3 | #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) | Line | Count | Source |  | 52 | 3 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 3 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 | 
 | 
| 269 | 3 |     if (_partial_update_info && _partial_update_info->is_flexible_partial_update()) {  Branch (269:9): [True: 3, False: 0]
  Branch (269:33): [True: 0, False: 3]
 | 
| 270 | 0 |         if (_rowset->num_segments() > 1) {  Branch (270:13): [True: 0, False: 0]
 | 
| 271 |  |             // in flexible partial update, when there are more one segment in one load, | 
| 272 |  |             // we need to do alignment process for same keys between segments, we haven't | 
| 273 |  |             // implemented it yet and just report an error when encouter this situation | 
| 274 | 0 |             return Status::NotSupported( | 
| 275 | 0 |                     "too large input data in flexible partial update, Please " | 
| 276 | 0 |                     "reduce the amount of data imported in a single load."); | 
| 277 | 0 |         } | 
| 278 | 0 |     } | 
| 279 | 3 |     auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get()); | 
| 280 | 3 |     std::vector<segment_v2::SegmentSharedPtr> segments; | 
| 281 | 3 |     RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); | Line | Count | Source |  | 637 | 3 |     do {                                \ |  | 638 | 3 |         Status _status_ = (stmt);       \ |  | 639 | 3 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 3 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 3]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 3 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 282 | 3 |     if (segments.size() > 1) {  Branch (282:9): [True: 0, False: 3]
 | 
| 283 |  |         // calculate delete bitmap between segments | 
| 284 | 0 |         if (config::enable_calc_delete_bitmap_between_segments_concurrently) {  Branch (284:13): [True: 0, False: 0]
 | 
| 285 | 0 |             RETURN_IF_ERROR(_calc_delete_bitmap_token->submit( | Line | Count | Source |  | 637 | 0 |     do {                                \ |  | 638 | 0 |         Status _status_ = (stmt);       \ |  | 639 | 0 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 0 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 286 | 0 |                     _tablet, _tablet_schema, _rowset->rowset_id(), segments, _delete_bitmap)); | 
| 287 | 0 |         } else { | 
| 288 | 0 |             RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments( | Line | Count | Source |  | 637 | 0 |     do {                                \ |  | 638 | 0 |         Status _status_ = (stmt);       \ |  | 639 | 0 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 0 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 289 | 0 |                     _tablet_schema, _rowset->rowset_id(), segments, _delete_bitmap)); | 
| 290 | 0 |         } | 
| 291 | 0 |     } | 
| 292 |  |  | 
| 293 |  |     // For partial update, we need to fill in the entire row of data, during the calculation | 
| 294 |  |     // of the delete bitmap. This operation is resource-intensive, and we need to minimize | 
| 295 |  |     // the number of times it occurs. Therefore, we skip this operation here. | 
| 296 | 3 |     if (_partial_update_info->is_partial_update()) {  Branch (296:9): [True: 0, False: 3]
 | 
| 297 |  |         // for partial update, the delete bitmap calculation is done while append_block() | 
| 298 |  |         // we print it's summarize logs here before commit. | 
| 299 | 0 |         LOG(INFO) << fmt::format( | 
| 300 | 0 |                 "{} calc delete bitmap summary before commit: tablet({}), txn_id({}), " | 
| 301 | 0 |                 "rowset_ids({}), cur max_version({}), bitmap num({}), bitmap_cardinality({}), num " | 
| 302 | 0 |                 "rows updated({}), num rows new added({}), num rows deleted({}), total rows({})", | 
| 303 | 0 |                 _partial_update_info->partial_update_mode_str(), tablet()->tablet_id(), _req.txn_id, | 
| 304 | 0 |                 _rowset_ids->size(), rowset_writer()->context().mow_context->max_version, | 
| 305 | 0 |                 _delete_bitmap->get_delete_bitmap_count(), _delete_bitmap->cardinality(), | 
| 306 | 0 |                 rowset_writer()->num_rows_updated(), rowset_writer()->num_rows_new_added(), | 
| 307 | 0 |                 rowset_writer()->num_rows_deleted(), rowset_writer()->num_rows()); | 
| 308 | 0 |         return Status::OK(); | 
| 309 | 0 |     } | 
| 310 |  |  | 
| 311 | 3 |     LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << tablet()->tablet_id() | 
| 312 | 3 |               << ", txn_id: " << _req.txn_id; | 
| 313 | 3 |     return BaseTablet::commit_phase_update_delete_bitmap(_tablet, _rowset, *_rowset_ids, | 
| 314 | 3 |                                                          _delete_bitmap, segments, _req.txn_id, | 
| 315 | 3 |                                                          _calc_delete_bitmap_token.get(), nullptr); | 
| 316 | 3 | } | 
| 317 |  |  | 
| 318 | 17 | Status BaseRowsetBuilder::wait_calc_delete_bitmap() { | 
| 319 | 17 |     if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update()) {  Branch (319:9): [True: 14, False: 3]
  Branch (319:57): [True: 0, False: 3]
 | 
| 320 | 14 |         return Status::OK(); | 
| 321 | 14 |     } | 
| 322 | 3 |     std::lock_guard<std::mutex> l(_lock); | 
| 323 | 3 |     SCOPED_TIMER(_wait_delete_bitmap_timer); | Line | Count | Source |  | 69 | 3 | #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) | Line | Count | Source |  | 52 | 3 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 3 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 | 
 | 
| 324 | 3 |     RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); | Line | Count | Source |  | 637 | 3 |     do {                                \ |  | 638 | 3 |         Status _status_ = (stmt);       \ |  | 639 | 3 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 3 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 3]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 3 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 325 | 3 |     return Status::OK(); | 
| 326 | 3 | } | 
| 327 |  |  | 
| 328 | 23 | Status RowsetBuilder::commit_txn() { | 
| 329 | 23 |     if (tablet()->enable_unique_key_merge_on_write() &&   Branch (329:9): [True: 3, False: 20]
 | 
| 330 | 23 |         config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0 &&   Branch (330:9): [True: 3, False: 0]
  Branch (330:60): [True: 3, False: 0]
 | 
| 331 | 23 |         tablet()->tablet_state() != TABLET_NOTREADY) {  Branch (331:9): [True: 3, False: 0]
 | 
| 332 | 3 |         auto st = tablet()->check_delete_bitmap_correctness( | 
| 333 | 3 |                 _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, *_rowset_ids); | 
| 334 | 3 |         if (!st.ok()) {  Branch (334:13): [True: 0, False: 3]
 | 
| 335 | 0 |             LOG(WARNING) << fmt::format( | 
| 336 | 0 |                     "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " | 
| 337 | 0 |                     "delete bitmap correctness check failed in commit phase!", | 
| 338 | 0 |                     _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), | 
| 339 | 0 |                     _req.partition_id); | 
| 340 | 0 |             return st; | 
| 341 | 0 |         } | 
| 342 | 3 |     } | 
| 343 | 23 |     std::lock_guard<std::mutex> l(_lock); | 
| 344 | 23 |     SCOPED_TIMER(_commit_txn_timer); | Line | Count | Source |  | 69 | 23 | #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) | Line | Count | Source |  | 52 | 23 | #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) | Line | Count | Source |  | 51 | 23 | #define CONCAT_IMPL(x, y) x##y | 
 | 
 | 
 | 
| 345 |  |  | 
| 346 | 23 |     const RowsetWriterContext& rw_ctx = _rowset_writer->context(); | 
| 347 | 23 |     if (rw_ctx.tablet_schema->num_variant_columns() > 0 && _rowset->num_rows() > 0) {  Branch (347:9): [True: 0, False: 23]
  Branch (347:60): [True: 0, False: 0]
 | 
| 348 |  |         // Need to merge schema with `rw_ctx.merged_tablet_schema` in prior, | 
| 349 |  |         // merged schema keeps the newest merged schema for the rowset, which is updated and merged | 
| 350 |  |         // during flushing segments. | 
| 351 | 0 |         if (rw_ctx.merged_tablet_schema != nullptr) {  Branch (351:13): [True: 0, False: 0]
 | 
| 352 | 0 |             RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema)); | Line | Count | Source |  | 637 | 0 |     do {                                \ |  | 638 | 0 |         Status _status_ = (stmt);       \ |  | 639 | 0 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 0 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 353 | 0 |         } else { | 
| 354 |  |             // We should merge rowset schema further, in case that the merged_tablet_schema maybe null | 
| 355 |  |             // when enable_memtable_on_sink_node is true, the merged_tablet_schema will not be passed to | 
| 356 |  |             // the destination backend. | 
| 357 |  |             // update tablet schema when meet variant columns, before commit_txn | 
| 358 |  |             // Eg. rowset schema:       A(int),    B(float),  C(int), D(int) | 
| 359 |  |             // _tabelt->tablet_schema:  A(bigint), B(double) | 
| 360 |  |             //  => update_schema:       A(bigint), B(double), C(int), D(int) | 
| 361 | 0 |             RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema)); | Line | Count | Source |  | 637 | 0 |     do {                                \ |  | 638 | 0 |         Status _status_ = (stmt);       \ |  | 639 | 0 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 0]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 0 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 362 | 0 |         } | 
| 363 | 0 |     } | 
| 364 |  |  | 
| 365 |  |     // Transfer ownership of `PendingRowsetGuard` to `TxnManager` | 
| 366 | 23 |     Status res = _engine.txn_manager()->commit_txn( | 
| 367 | 23 |             _req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset, | 
| 368 | 23 |             std::move(_pending_rs_guard), false, _partial_update_info); | 
| 369 |  |  | 
| 370 | 23 |     if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {  Branch (370:9): [True: 0, False: 23]
  Branch (370:17): [True: 0, False: 0]
 | 
| 371 | 0 |         LOG(WARNING) << "Failed to commit txn: " << _req.txn_id | 
| 372 | 0 |                      << " for rowset: " << _rowset->rowset_id(); | 
| 373 | 0 |         return res; | 
| 374 | 0 |     } | 
| 375 | 23 |     if (_tablet->enable_unique_key_merge_on_write()) {  Branch (375:9): [True: 3, False: 20]
 | 
| 376 | 3 |         _engine.txn_manager()->set_txn_related_delete_bitmap( | 
| 377 | 3 |                 _req.partition_id, _req.txn_id, tablet()->tablet_id(), tablet()->tablet_uid(), true, | 
| 378 | 3 |                 _delete_bitmap, *_rowset_ids, _partial_update_info); | 
| 379 | 3 |     } | 
| 380 |  |  | 
| 381 | 23 |     _is_committed = true; | 
| 382 | 23 |     return Status::OK(); | 
| 383 | 23 | } | 
| 384 |  |  | 
| 385 | 0 | Status BaseRowsetBuilder::cancel() { | 
| 386 | 0 |     std::lock_guard<std::mutex> l(_lock); | 
| 387 | 0 |     if (_is_cancelled) {  Branch (387:9): [True: 0, False: 0]
 | 
| 388 | 0 |         return Status::OK(); | 
| 389 | 0 |     } | 
| 390 | 0 |     if (_calc_delete_bitmap_token != nullptr) {  Branch (390:9): [True: 0, False: 0]
 | 
| 391 | 0 |         _calc_delete_bitmap_token->cancel(); | 
| 392 | 0 |     } | 
| 393 | 0 |     _is_cancelled = true; | 
| 394 | 0 |     return Status::OK(); | 
| 395 | 0 | } | 
| 396 |  |  | 
| 397 |  | Status BaseRowsetBuilder::_build_current_tablet_schema( | 
| 398 |  |         int64_t index_id, const OlapTableSchemaParam* table_schema_param, | 
| 399 | 28 |         const TabletSchema& ori_tablet_schema) { | 
| 400 |  |     // find the right index id | 
| 401 | 28 |     int i = 0; | 
| 402 | 28 |     auto indexes = table_schema_param->indexes(); | 
| 403 | 28 |     for (; i < indexes.size(); i++) {  Branch (403:12): [True: 13, False: 15]
 | 
| 404 | 13 |         if (indexes[i]->index_id == index_id) {  Branch (404:13): [True: 13, False: 0]
 | 
| 405 | 13 |             break; | 
| 406 | 13 |         } | 
| 407 | 13 |     } | 
| 408 | 28 |     if (!indexes.empty() && !indexes[i]->columns.empty() &&   Branch (408:9): [True: 13, False: 15]
  Branch (408:29): [True: 0, False: 13]
 | 
| 409 | 28 |         indexes[i]->columns[0]->unique_id() >= 0) {  Branch (409:9): [True: 0, False: 0]
 | 
| 410 | 0 |         _tablet_schema->shawdow_copy_without_columns(ori_tablet_schema); | 
| 411 | 0 |         _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), | 
| 412 | 0 |                                                     indexes[i], ori_tablet_schema); | 
| 413 | 28 |     } else { | 
| 414 | 28 |         _tablet_schema->copy_from(ori_tablet_schema); | 
| 415 | 28 |     } | 
| 416 | 28 |     if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {  Branch (416:9): [True: 0, False: 28]
 | 
| 417 |  |         // After schema change, should include extracted column | 
| 418 |  |         // For example: a table has two columns, k and v | 
| 419 |  |         // After adding a column v2, the schema version increases, max_version_schema needs to be updated. | 
| 420 |  |         // _tablet_schema includes k, v, and v2 | 
| 421 |  |         // if v is a variant, need to add the columns decomposed from the v to the _tablet_schema. | 
| 422 | 0 |         if (_tablet_schema->num_variant_columns() > 0) {  Branch (422:13): [True: 0, False: 0]
 | 
| 423 | 0 |             TabletSchemaSPtr max_version_schema = std::make_shared<TabletSchema>(); | 
| 424 | 0 |             max_version_schema->copy_from(*_tablet_schema); | 
| 425 | 0 |             max_version_schema->copy_extracted_columns(ori_tablet_schema); | 
| 426 | 0 |             _tablet->update_max_version_schema(max_version_schema); | 
| 427 | 0 |         } else { | 
| 428 | 0 |             _tablet->update_max_version_schema(_tablet_schema); | 
| 429 | 0 |         } | 
| 430 | 0 |     } | 
| 431 |  |  | 
| 432 | 28 |     _tablet_schema->set_table_id(table_schema_param->table_id()); | 
| 433 | 28 |     _tablet_schema->set_db_id(table_schema_param->db_id()); | 
| 434 | 28 |     if (table_schema_param->is_partial_update()) {  Branch (434:9): [True: 0, False: 28]
 | 
| 435 | 0 |         _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn()); | 
| 436 | 0 |     } | 
| 437 |  |     // set partial update columns info | 
| 438 | 28 |     _partial_update_info = std::make_shared<PartialUpdateInfo>(); | 
| 439 | 28 |     RETURN_IF_ERROR(_partial_update_info->init( | Line | Count | Source |  | 637 | 28 |     do {                                \ |  | 638 | 28 |         Status _status_ = (stmt);       \ |  | 639 | 28 |         if (UNLIKELY(!_status_.ok())) { \| Line | Count | Source |  | 36 | 28 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0)   Branch (36:24): [True: 0, False: 28]
 | 
 |  | 640 | 0 |             return _status_;            \ |  | 641 | 0 |         }                               \ |  | 642 | 28 |     } while (false)   Branch (642:14): [Folded - Ignored]
 | 
 | 
| 440 | 28 |             tablet()->tablet_id(), _req.txn_id, *_tablet_schema, | 
| 441 | 28 |             table_schema_param->unique_key_update_mode(), | 
| 442 | 28 |             table_schema_param->partial_update_new_key_policy(), | 
| 443 | 28 |             table_schema_param->partial_update_input_columns(), | 
| 444 | 28 |             table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), | 
| 445 | 28 |             table_schema_param->nano_seconds(), table_schema_param->timezone(), | 
| 446 | 28 |             table_schema_param->auto_increment_coulumn(), | 
| 447 | 28 |             table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase)); | 
| 448 | 28 |     return Status::OK(); | 
| 449 | 28 | } | 
| 450 |  |  | 
| 451 |  | } // namespace doris |