/root/doris/be/src/olap/schema_change.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/schema_change.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <glog/logging.h> |
22 | | #include <thrift/protocol/TDebugProtocol.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <exception> |
26 | | #include <map> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <roaring/roaring.hh> |
30 | | #include <tuple> |
31 | | #include <utility> |
32 | | |
33 | | #include "agent/be_exec_version_manager.h" |
34 | | #include "cloud/cloud_schema_change_job.h" |
35 | | #include "cloud/config.h" |
36 | | #include "common/consts.h" |
37 | | #include "common/logging.h" |
38 | | #include "common/signal_handler.h" |
39 | | #include "common/status.h" |
40 | | #include "exec/schema_scanner/schema_metadata_name_ids_scanner.h" |
41 | | #include "gutil/integral_types.h" |
42 | | #include "gutil/strings/numbers.h" |
43 | | #include "io/fs/file_system.h" |
44 | | #include "io/io_common.h" |
45 | | #include "olap/base_tablet.h" |
46 | | #include "olap/data_dir.h" |
47 | | #include "olap/delete_handler.h" |
48 | | #include "olap/field.h" |
49 | | #include "olap/iterators.h" |
50 | | #include "olap/merger.h" |
51 | | #include "olap/olap_common.h" |
52 | | #include "olap/olap_define.h" |
53 | | #include "olap/rowset/beta_rowset.h" |
54 | | #include "olap/rowset/pending_rowset_helper.h" |
55 | | #include "olap/rowset/rowset_meta.h" |
56 | | #include "olap/rowset/rowset_reader_context.h" |
57 | | #include "olap/rowset/rowset_writer_context.h" |
58 | | #include "olap/rowset/segment_v2/column_reader.h" |
59 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
60 | | #include "olap/rowset/segment_v2/inverted_index_writer.h" |
61 | | #include "olap/rowset/segment_v2/segment.h" |
62 | | #include "olap/schema.h" |
63 | | #include "olap/segment_loader.h" |
64 | | #include "olap/storage_engine.h" |
65 | | #include "olap/tablet.h" |
66 | | #include "olap/tablet_fwd.h" |
67 | | #include "olap/tablet_manager.h" |
68 | | #include "olap/tablet_meta.h" |
69 | | #include "olap/tablet_schema.h" |
70 | | #include "olap/types.h" |
71 | | #include "olap/utils.h" |
72 | | #include "olap/wrapper_field.h" |
73 | | #include "runtime/exec_env.h" |
74 | | #include "runtime/memory/mem_tracker.h" |
75 | | #include "runtime/runtime_state.h" |
76 | | #include "util/debug_points.h" |
77 | | #include "util/defer_op.h" |
78 | | #include "util/trace.h" |
79 | | #include "vec/aggregate_functions/aggregate_function.h" |
80 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
81 | | #include "vec/columns/column.h" |
82 | | #include "vec/columns/column_nullable.h" |
83 | | #include "vec/common/assert_cast.h" |
84 | | #include "vec/common/schema_util.h" |
85 | | #include "vec/core/block.h" |
86 | | #include "vec/core/column_with_type_and_name.h" |
87 | | #include "vec/exprs/vexpr.h" |
88 | | #include "vec/exprs/vexpr_context.h" |
89 | | #include "vec/olap/olap_data_convertor.h" |
90 | | |
91 | | namespace doris { |
92 | | class CollectionValue; |
93 | | |
94 | | using namespace ErrorCode; |
95 | | |
96 | | constexpr int ALTER_TABLE_BATCH_SIZE = 4064; |
97 | | |
98 | | class MultiBlockMerger { |
99 | | public: |
100 | 0 | MultiBlockMerger(BaseTabletSPtr tablet) : _tablet(tablet), _cmp(*tablet) {} |
101 | | |
102 | | Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks, |
103 | 0 | RowsetWriter* rowset_writer, uint64_t* merged_rows) { |
104 | 0 | int rows = 0; |
105 | 0 | for (const auto& block : blocks) { Branch (105:32): [True: 0, False: 0]
|
106 | 0 | rows += block->rows(); |
107 | 0 | } |
108 | 0 | if (!rows) { Branch (108:13): [True: 0, False: 0]
|
109 | 0 | return Status::OK(); |
110 | 0 | } |
111 | | |
112 | 0 | std::vector<RowRef> row_refs; |
113 | 0 | row_refs.reserve(rows); |
114 | 0 | for (const auto& block : blocks) { Branch (114:32): [True: 0, False: 0]
|
115 | 0 | for (uint16_t i = 0; i < block->rows(); i++) { Branch (115:34): [True: 0, False: 0]
|
116 | 0 | row_refs.emplace_back(block.get(), i); |
117 | 0 | } |
118 | 0 | } |
119 | | // TODO: try to use pdqsort to replace std::sort |
120 | | // The block version is incremental. |
121 | 0 | std::stable_sort(row_refs.begin(), row_refs.end(), _cmp); |
122 | |
|
123 | 0 | auto finalized_block = _tablet->tablet_schema()->create_block(); |
124 | 0 | int columns = finalized_block.columns(); |
125 | 0 | *merged_rows += rows; |
126 | |
|
127 | 0 | if (_tablet->keys_type() == KeysType::AGG_KEYS) { Branch (127:13): [True: 0, False: 0]
|
128 | 0 | auto tablet_schema = _tablet->tablet_schema(); |
129 | 0 | int key_number = _tablet->num_key_columns(); |
130 | |
|
131 | 0 | std::vector<vectorized::AggregateFunctionPtr> agg_functions; |
132 | 0 | std::vector<vectorized::AggregateDataPtr> agg_places; |
133 | |
|
134 | 0 | for (int i = key_number; i < columns; i++) { Branch (134:38): [True: 0, False: 0]
|
135 | 0 | try { |
136 | 0 | vectorized::AggregateFunctionPtr function = |
137 | 0 | tablet_schema->column(i).get_aggregate_function( |
138 | 0 | vectorized::AGG_LOAD_SUFFIX, |
139 | 0 | tablet_schema->column(i).get_be_exec_version()); |
140 | 0 | if (!function) { Branch (140:25): [True: 0, False: 0]
|
141 | 0 | return Status::InternalError( |
142 | 0 | "could not find aggregate function on column {}, aggregation={}", |
143 | 0 | tablet_schema->column(i).name(), |
144 | 0 | tablet_schema->column(i).aggregation()); |
145 | 0 | } |
146 | 0 | agg_functions.push_back(function); |
147 | | // create aggregate data |
148 | 0 | auto* place = new char[function->size_of_data()]; |
149 | 0 | function->create(place); |
150 | 0 | agg_places.push_back(place); |
151 | 0 | } catch (...) { |
152 | 0 | for (int j = 0; j < i - key_number; ++j) { Branch (152:37): [True: 0, False: 0]
|
153 | 0 | agg_functions[j]->destroy(agg_places[j]); |
154 | 0 | delete[] agg_places[j]; |
155 | 0 | } |
156 | 0 | throw; |
157 | 0 | } |
158 | 0 | } |
159 | | |
160 | 0 | DEFER({ Line | Count | Source | 46 | 0 | #define DEFER(...) DEFER_FWD(__LINE__, __VA_ARGS__) Line | Count | Source | 45 | 0 | #define DEFER_FWD(n, ...) DEFER_CONCAT(n, __VA_ARGS__) Line | Count | Source | 44 | 0 | #define DEFER_CONCAT(n, ...) const auto defer##n = doris::Defer([&]() { __VA_ARGS__; }) |
|
|
|
161 | 0 | for (int i = 0; i < columns - key_number; i++) { |
162 | 0 | agg_functions[i]->destroy(agg_places[i]); |
163 | 0 | delete[] agg_places[i]; |
164 | 0 | } |
165 | 0 | }); |
166 | |
|
167 | 0 | for (int i = 0; i < rows; i++) { Branch (167:29): [True: 0, False: 0]
|
168 | 0 | auto row_ref = row_refs[i]; |
169 | 0 | for (int j = key_number; j < columns; j++) { Branch (169:42): [True: 0, False: 0]
|
170 | 0 | const auto* column_ptr = row_ref.get_column(j).get(); |
171 | 0 | agg_functions[j - key_number]->add( |
172 | 0 | agg_places[j - key_number], |
173 | 0 | const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position, |
174 | 0 | &_arena); |
175 | 0 | } |
176 | |
|
177 | 0 | if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) { Branch (177:21): [True: 0, False: 0]
Branch (177:38): [True: 0, False: 0]
|
178 | 0 | for (int j = 0; j < key_number; j++) { Branch (178:37): [True: 0, False: 0]
|
179 | 0 | finalized_block.get_by_position(j).column->assume_mutable()->insert_from( |
180 | 0 | *row_ref.get_column(j), row_ref.position); |
181 | 0 | } |
182 | |
|
183 | 0 | for (int j = key_number; j < columns; j++) { Branch (183:46): [True: 0, False: 0]
|
184 | 0 | agg_functions[j - key_number]->insert_result_into( |
185 | 0 | agg_places[j - key_number], |
186 | 0 | finalized_block.get_by_position(j).column->assume_mutable_ref()); |
187 | 0 | agg_functions[j - key_number]->reset(agg_places[j - key_number]); |
188 | 0 | } |
189 | |
|
190 | 0 | if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) { Branch (190:25): [True: 0, False: 0]
Branch (190:42): [True: 0, False: 0]
|
191 | 0 | *merged_rows -= finalized_block.rows(); |
192 | 0 | RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
193 | 0 | finalized_block.clear_column_data(); |
194 | 0 | } |
195 | 0 | } |
196 | 0 | } |
197 | 0 | } else { |
198 | 0 | std::vector<RowRef> pushed_row_refs; |
199 | 0 | if (_tablet->keys_type() == KeysType::DUP_KEYS) { Branch (199:17): [True: 0, False: 0]
|
200 | 0 | std::swap(pushed_row_refs, row_refs); |
201 | 0 | } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) { Branch (201:24): [True: 0, False: 0]
|
202 | 0 | for (int i = 0; i < rows; i++) { Branch (202:33): [True: 0, False: 0]
|
203 | 0 | if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) { Branch (203:25): [True: 0, False: 0]
Branch (203:42): [True: 0, False: 0]
|
204 | 0 | pushed_row_refs.push_back(row_refs[i]); |
205 | 0 | } |
206 | 0 | } |
207 | 0 | } |
208 | | |
209 | | // update real inserted row number |
210 | 0 | rows = pushed_row_refs.size(); |
211 | 0 | *merged_rows -= rows; |
212 | |
|
213 | 0 | for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) { Branch (213:29): [True: 0, False: 0]
|
214 | 0 | int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i); |
215 | |
|
216 | 0 | for (int idx = 0; idx < columns; idx++) { Branch (216:35): [True: 0, False: 0]
|
217 | 0 | auto column = finalized_block.get_by_position(idx).column->assume_mutable(); |
218 | 0 | for (int j = 0; j < limit; j++) { Branch (218:37): [True: 0, False: 0]
|
219 | 0 | auto row_ref = pushed_row_refs[i + j]; |
220 | 0 | column->insert_from(*row_ref.get_column(idx), row_ref.position); |
221 | 0 | } |
222 | 0 | } |
223 | 0 | RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
224 | 0 | finalized_block.clear_column_data(); |
225 | 0 | } |
226 | 0 | } |
227 | | |
228 | 0 | RETURN_IF_ERROR(rowset_writer->flush()); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
229 | 0 | return Status::OK(); |
230 | 0 | } |
231 | | |
232 | | private: |
233 | | struct RowRef { |
234 | | RowRef(vectorized::Block* block_, uint16_t position_) |
235 | 0 | : block(block_), position(position_) {} |
236 | 0 | vectorized::ColumnPtr get_column(int index) const { |
237 | 0 | return block->get_by_position(index).column; |
238 | 0 | } |
239 | | const vectorized::Block* block; |
240 | | uint16_t position; |
241 | | }; |
242 | | |
243 | | struct RowRefComparator { |
244 | 0 | RowRefComparator(const BaseTablet& tablet) : _num_columns(tablet.num_key_columns()) {} |
245 | | |
246 | 0 | int compare(const RowRef& lhs, const RowRef& rhs) const { |
247 | 0 | return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1); |
248 | 0 | } |
249 | | |
250 | 0 | bool operator()(const RowRef& lhs, const RowRef& rhs) const { |
251 | 0 | return compare(lhs, rhs) < 0; |
252 | 0 | } |
253 | | |
254 | | const size_t _num_columns; |
255 | | }; |
256 | | |
257 | | BaseTabletSPtr _tablet; |
258 | | RowRefComparator _cmp; |
259 | | vectorized::Arena _arena; |
260 | | }; |
261 | | |
262 | | BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl) |
263 | 0 | : _desc_tbl(std::move(desc_tbl)) { |
264 | 0 | _schema_mapping.resize(tablet_schema->num_columns()); |
265 | 0 | } |
266 | | |
267 | 0 | BlockChanger::~BlockChanger() { |
268 | 0 | for (auto it = _schema_mapping.begin(); it != _schema_mapping.end(); ++it) { Branch (268:45): [True: 0, False: 0]
|
269 | 0 | SAFE_DELETE(it->default_value); Line | Count | Source | 168 | 0 | do { \ | 169 | 0 | if (nullptr != ptr) { \ Branch (169:13): [True: 0, False: 0]
| 170 | 0 | delete ptr; \ | 171 | 0 | ptr = nullptr; \ | 172 | 0 | } \ | 173 | 0 | } while (0) Branch (173:14): [Folded - Ignored]
|
|
270 | 0 | } |
271 | 0 | _schema_mapping.clear(); |
272 | 0 | } |
273 | | |
274 | 0 | ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) { |
275 | 0 | if (column_index >= _schema_mapping.size()) { Branch (275:9): [True: 0, False: 0]
|
276 | 0 | return nullptr; |
277 | 0 | } |
278 | | |
279 | 0 | return &(_schema_mapping[column_index]); |
280 | 0 | } |
281 | | |
282 | | Status BlockChanger::change_block(vectorized::Block* ref_block, |
283 | 0 | vectorized::Block* new_block) const { |
284 | 0 | std::unique_ptr<RuntimeState> state = RuntimeState::create_unique(); |
285 | 0 | state->set_desc_tbl(&_desc_tbl); |
286 | 0 | state->set_be_exec_version(_fe_compatible_version); |
287 | 0 | RowDescriptor row_desc = |
288 | 0 | RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false); |
289 | |
|
290 | 0 | if (_where_expr != nullptr) { Branch (290:9): [True: 0, False: 0]
|
291 | 0 | vectorized::VExprContextSPtr ctx = nullptr; |
292 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, ctx)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
293 | 0 | RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
294 | 0 | RETURN_IF_ERROR(ctx->open(state.get())); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
295 | | |
296 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
297 | 0 | vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns())); |
298 | 0 | } |
299 | | |
300 | 0 | const int row_num = ref_block->rows(); |
301 | 0 | const int new_schema_cols_num = new_block->columns(); |
302 | | |
303 | | // will be used for swaping ref_block[entry.first] and new_block[entry.second] |
304 | 0 | std::list<std::pair<int, int>> swap_idx_list; |
305 | 0 | for (int idx = 0; idx < new_schema_cols_num; idx++) { Branch (305:23): [True: 0, False: 0]
|
306 | 0 | auto expr = _schema_mapping[idx].expr; |
307 | 0 | if (expr != nullptr) { Branch (307:13): [True: 0, False: 0]
|
308 | 0 | vectorized::VExprContextSPtr ctx; |
309 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
310 | 0 | RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
311 | 0 | RETURN_IF_ERROR(ctx->open(state.get())); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
312 | | |
313 | 0 | int result_tmp_column_idx = -1; |
314 | 0 | RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
315 | 0 | auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx); |
316 | 0 | if (result_tmp_column_def.column == nullptr) { Branch (316:17): [True: 0, False: 0]
|
317 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
318 | 0 | "result column={} is nullptr, input expr={}", result_tmp_column_def.name, |
319 | 0 | apache::thrift::ThriftDebugString(*expr)); |
320 | 0 | } |
321 | 0 | ref_block->replace_by_position_if_const(result_tmp_column_idx); |
322 | |
|
323 | 0 | if (result_tmp_column_def.column->size() != row_num) { Branch (323:17): [True: 0, False: 0]
|
324 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
325 | 0 | "result size invalid, expect={}, real={}; input expr={}", row_num, |
326 | 0 | result_tmp_column_def.column->size(), |
327 | 0 | apache::thrift::ThriftDebugString(*expr)); |
328 | 0 | } |
329 | | |
330 | 0 | if (_type == SCHEMA_CHANGE) { Branch (330:17): [True: 0, False: 0]
|
331 | | // danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change |
332 | | // for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs |
333 | | |
334 | | // column_idx in base schema |
335 | 0 | int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx; |
336 | 0 | DCHECK_GE(ref_column_idx, 0); |
337 | 0 | auto& ref_column_def = ref_block->get_by_position(ref_column_idx); |
338 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
339 | 0 | _check_cast_valid(ref_column_def.column, result_tmp_column_def.column)); |
340 | 0 | } |
341 | 0 | swap_idx_list.emplace_back(result_tmp_column_idx, idx); |
342 | 0 | } else if (_schema_mapping[idx].ref_column_idx < 0) { Branch (342:20): [True: 0, False: 0]
|
343 | | // new column, write default value |
344 | 0 | auto* value = _schema_mapping[idx].default_value; |
345 | 0 | auto column = new_block->get_by_position(idx).column->assume_mutable(); |
346 | 0 | if (value->is_null()) { Branch (346:17): [True: 0, False: 0]
|
347 | 0 | DCHECK(column->is_nullable()); |
348 | 0 | column->insert_many_defaults(row_num); |
349 | 0 | } else { |
350 | 0 | auto type_info = get_type_info(_schema_mapping[idx].new_column); |
351 | 0 | DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(), |
352 | 0 | value->ptr(), column, row_num); |
353 | 0 | } |
354 | 0 | } else { |
355 | | // same type, just swap column |
356 | 0 | swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx); |
357 | 0 | } |
358 | 0 | } |
359 | | |
360 | 0 | for (auto it : swap_idx_list) { Branch (360:18): [True: 0, False: 0]
|
361 | 0 | auto& ref_col = ref_block->get_by_position(it.first).column; |
362 | 0 | auto& new_col = new_block->get_by_position(it.second).column; |
363 | |
|
364 | 0 | bool ref_col_nullable = ref_col->is_nullable(); |
365 | 0 | bool new_col_nullable = new_col->is_nullable(); |
366 | |
|
367 | 0 | if (ref_col_nullable != new_col_nullable) { Branch (367:13): [True: 0, False: 0]
|
368 | | // not nullable to nullable |
369 | 0 | if (new_col_nullable) { Branch (369:17): [True: 0, False: 0]
|
370 | 0 | auto* new_nullable_col = |
371 | 0 | assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get()); |
372 | |
|
373 | 0 | new_nullable_col->change_nested_column(ref_col); |
374 | 0 | new_nullable_col->get_null_map_data().resize_fill(ref_col->size()); |
375 | 0 | } else { |
376 | | // nullable to not nullable: |
377 | | // suppose column `c_phone` is originally varchar(16) NOT NULL, |
378 | | // then do schema change `alter table test modify column c_phone int not null`, |
379 | | // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`, |
380 | | // so need to handle nullable to not nullable here |
381 | 0 | auto* ref_nullable_col = |
382 | 0 | assert_cast<vectorized::ColumnNullable*>(ref_col->assume_mutable().get()); |
383 | |
|
384 | 0 | new_col = ref_nullable_col->get_nested_column_ptr(); |
385 | 0 | } |
386 | 0 | } else { |
387 | 0 | new_block->get_by_position(it.second).column = |
388 | 0 | ref_block->get_by_position(it.first).column; |
389 | 0 | } |
390 | 0 | } |
391 | 0 | return Status::OK(); |
392 | 0 | } |
393 | | |
394 | | // This check can prevent schema-change from causing data loss after type cast |
395 | | Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column, |
396 | 0 | vectorized::ColumnPtr output_column) { |
397 | 0 | if (input_column->size() != output_column->size()) { Branch (397:9): [True: 0, False: 0]
|
398 | 0 | return Status::InternalError( |
399 | 0 | "column size is changed, input_column_size={}, output_column_size={}; " |
400 | 0 | "input_column={}", |
401 | 0 | input_column->size(), output_column->size(), input_column->get_name()); |
402 | 0 | } |
403 | 0 | DCHECK_EQ(input_column->size(), output_column->size()) |
404 | 0 | << "length check should have done before calling this function!"; |
405 | |
|
406 | 0 | if (input_column->is_nullable() != output_column->is_nullable()) { Branch (406:9): [True: 0, False: 0]
|
407 | 0 | if (input_column->is_nullable()) { Branch (407:13): [True: 0, False: 0]
|
408 | 0 | const auto* ref_null_map = |
409 | 0 | vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column) |
410 | 0 | ->get_null_map_column() |
411 | 0 | .get_data() |
412 | 0 | .data(); |
413 | |
|
414 | 0 | bool is_changed = false; |
415 | 0 | for (size_t i = 0; i < input_column->size(); i++) { Branch (415:32): [True: 0, False: 0]
|
416 | 0 | is_changed |= ref_null_map[i]; |
417 | 0 | } |
418 | 0 | if (is_changed) { Branch (418:17): [True: 0, False: 0]
|
419 | 0 | return Status::DataQualityError( |
420 | 0 | "some null data is changed to not null, intput_column={}", |
421 | 0 | input_column->get_name()); |
422 | 0 | } |
423 | 0 | } else { |
424 | 0 | const auto& null_map_column = |
425 | 0 | vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) |
426 | 0 | ->get_null_map_column(); |
427 | 0 | const auto& nested_column = |
428 | 0 | vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) |
429 | 0 | ->get_nested_column(); |
430 | 0 | const auto* new_null_map = null_map_column.get_data().data(); |
431 | |
|
432 | 0 | if (null_map_column.size() != output_column->size()) { Branch (432:17): [True: 0, False: 0]
|
433 | 0 | return Status::InternalError( |
434 | 0 | "null_map_column size mismatch output_column_size, " |
435 | 0 | "null_map_column_size={}, output_column_size={}; input_column={}", |
436 | 0 | null_map_column.size(), output_column->size(), input_column->get_name()); |
437 | 0 | } |
438 | | |
439 | 0 | if (nested_column.size() != output_column->size()) { Branch (439:17): [True: 0, False: 0]
|
440 | 0 | return Status::InternalError( |
441 | 0 | "nested_column size is changed, nested_column_size={}, " |
442 | 0 | "ouput_column_size={}; input_column={}", |
443 | 0 | nested_column.size(), output_column->size(), input_column->get_name()); |
444 | 0 | } |
445 | | |
446 | 0 | bool is_changed = false; |
447 | 0 | for (size_t i = 0; i < input_column->size(); i++) { Branch (447:32): [True: 0, False: 0]
|
448 | 0 | is_changed |= new_null_map[i]; |
449 | 0 | } |
450 | 0 | if (is_changed) { Branch (450:17): [True: 0, False: 0]
|
451 | 0 | return Status::DataQualityError( |
452 | 0 | "some not null data is changed to null, intput_column={}", |
453 | 0 | input_column->get_name()); |
454 | 0 | } |
455 | 0 | } |
456 | 0 | } |
457 | | |
458 | 0 | if (input_column->is_nullable() && output_column->is_nullable()) { Branch (458:9): [True: 0, False: 0]
Branch (458:40): [True: 0, False: 0]
|
459 | 0 | const auto* ref_null_map = |
460 | 0 | vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column) |
461 | 0 | ->get_null_map_column() |
462 | 0 | .get_data() |
463 | 0 | .data(); |
464 | 0 | const auto* new_null_map = |
465 | 0 | vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) |
466 | 0 | ->get_null_map_column() |
467 | 0 | .get_data() |
468 | 0 | .data(); |
469 | |
|
470 | 0 | bool is_changed = false; |
471 | 0 | for (size_t i = 0; i < input_column->size(); i++) { Branch (471:28): [True: 0, False: 0]
|
472 | 0 | is_changed |= (ref_null_map[i] != new_null_map[i]); |
473 | 0 | } |
474 | 0 | if (is_changed) { Branch (474:13): [True: 0, False: 0]
|
475 | 0 | return Status::DataQualityError( |
476 | 0 | "null map is changed after calculation, input_column={}", |
477 | 0 | input_column->get_name()); |
478 | 0 | } |
479 | 0 | } |
480 | 0 | return Status::OK(); |
481 | 0 | } |
482 | | |
483 | | Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, |
484 | | BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet, |
485 | | TabletSchemaSPtr base_tablet_schema, |
486 | 0 | TabletSchemaSPtr new_tablet_schema) { |
487 | 0 | Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset()); |
488 | 0 | if (!status) { Branch (488:9): [True: 0, False: 0]
|
489 | 0 | LOG(WARNING) << "fail to convert rowset." |
490 | 0 | << ", new_tablet=" << new_tablet->tablet_id() |
491 | 0 | << ", version=" << rowset_writer->version().first << "-" |
492 | 0 | << rowset_writer->version().second << ", error status " << status; |
493 | 0 | return status; |
494 | 0 | } |
495 | | // copy delete bitmap to new tablet. |
496 | 0 | if (new_tablet->keys_type() == UNIQUE_KEYS && new_tablet->enable_unique_key_merge_on_write()) { Branch (496:9): [True: 0, False: 0]
Branch (496:51): [True: 0, False: 0]
|
497 | 0 | DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id()); |
498 | 0 | base_tablet->tablet_meta()->delete_bitmap()->subset( |
499 | 0 | {rowset_reader->rowset()->rowset_id(), 0, 0}, |
500 | 0 | {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX}, |
501 | 0 | &origin_delete_bitmap); |
502 | 0 | for (auto& iter : origin_delete_bitmap.delete_bitmap) { Branch (502:25): [True: 0, False: 0]
|
503 | 0 | int ret = new_tablet->tablet_meta()->delete_bitmap()->set( |
504 | 0 | {rowset_writer->rowset_id(), std::get<1>(iter.first), std::get<2>(iter.first)}, |
505 | 0 | iter.second); |
506 | 0 | DCHECK(ret == 1); |
507 | 0 | } |
508 | 0 | } |
509 | 0 | return Status::OK(); |
510 | 0 | } |
511 | | |
512 | | Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader, |
513 | | RowsetWriter* rowset_writer, BaseTabletSPtr new_tablet, |
514 | | TabletSchemaSPtr base_tablet_schema, |
515 | 0 | TabletSchemaSPtr new_tablet_schema) { |
516 | 0 | bool eof = false; |
517 | 0 | do { |
518 | 0 | auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block()); |
519 | 0 | auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block()); |
520 | |
|
521 | 0 | auto st = rowset_reader->next_block(ref_block.get()); |
522 | 0 | if (!st) { Branch (522:13): [True: 0, False: 0]
|
523 | 0 | if (st.is<ErrorCode::END_OF_FILE>()) { Branch (523:17): [True: 0, False: 0]
|
524 | 0 | if (ref_block->rows() == 0) { Branch (524:21): [True: 0, False: 0]
|
525 | 0 | break; |
526 | 0 | } else { |
527 | 0 | eof = true; |
528 | 0 | } |
529 | 0 | } else { |
530 | 0 | return st; |
531 | 0 | } |
532 | 0 | } |
533 | | |
534 | 0 | RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
535 | 0 | RETURN_IF_ERROR(rowset_writer->add_block(new_block.get())); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
536 | 0 | } while (!eof); Branch (536:14): [True: 0, False: 0]
|
537 | | |
538 | 0 | RETURN_IF_ERROR(rowset_writer->flush()); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
539 | 0 | return Status::OK(); |
540 | 0 | } |
541 | | |
542 | | VBaseSchemaChangeWithSorting::VBaseSchemaChangeWithSorting(const BlockChanger& changer, |
543 | | size_t memory_limitation) |
544 | | : _changer(changer), |
545 | | _memory_limitation(memory_limitation), |
546 | 0 | _temp_delta_versions(Version::mock()) { |
547 | 0 | _mem_tracker = std::make_unique<MemTracker>( |
548 | 0 | fmt::format("VSchemaChangeWithSorting:changer={}", std::to_string(int64(&changer)))); |
549 | 0 | } |
550 | | |
551 | | Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, |
552 | | RowsetWriter* rowset_writer, |
553 | | BaseTabletSPtr new_tablet, |
554 | | TabletSchemaSPtr base_tablet_schema, |
555 | 0 | TabletSchemaSPtr new_tablet_schema) { |
556 | | // for internal sorting |
557 | 0 | std::vector<std::unique_ptr<vectorized::Block>> blocks; |
558 | |
|
559 | 0 | RowsetSharedPtr rowset = rowset_reader->rowset(); |
560 | 0 | SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); |
561 | 0 | int64_t newest_write_timestamp = rowset->newest_write_timestamp(); |
562 | 0 | _temp_delta_versions.first = _temp_delta_versions.second; |
563 | 0 | _src_rowsets.clear(); // init _src_rowsets |
564 | 0 | auto create_rowset = [&]() -> Status { |
565 | 0 | if (blocks.empty()) { Branch (565:13): [True: 0, False: 0]
|
566 | 0 | return Status::OK(); |
567 | 0 | } |
568 | | |
569 | 0 | auto rowset = DORIS_TRY(_internal_sorting( Line | Count | Source | 716 | 0 | ({ \ | 717 | 0 | auto&& res = (stmt); \ | 718 | 0 | using T = std::decay_t<decltype(res)>; \ | 719 | 0 | if (!res.has_value()) [[unlikely]] { \ Branch (719:13): [True: 0, False: 0]
| 720 | 0 | return std::forward<T>(res).error(); \ | 721 | 0 | } \ | 722 | 0 | std::forward<T>(res).value(); \ | 723 | 0 | }); |
|
570 | 0 | blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second + 1), |
571 | 0 | newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap, |
572 | 0 | new_tablet_schema)); |
573 | 0 | _src_rowsets.push_back(std::move(rowset)); |
574 | 0 | for (auto& block : blocks) { Branch (574:26): [True: 0, False: 0]
|
575 | 0 | _mem_tracker->release(block->allocated_bytes()); |
576 | 0 | } |
577 | 0 | blocks.clear(); |
578 | | |
579 | | // increase temp version |
580 | 0 | _temp_delta_versions.second += 2; |
581 | 0 | return Status::OK(); |
582 | 0 | }; |
583 | |
|
584 | 0 | auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block()); |
585 | |
|
586 | 0 | bool eof = false; |
587 | 0 | do { |
588 | 0 | auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block()); |
589 | 0 | auto st = rowset_reader->next_block(ref_block.get()); |
590 | 0 | if (!st) { Branch (590:13): [True: 0, False: 0]
|
591 | 0 | if (st.is<ErrorCode::END_OF_FILE>()) { Branch (591:17): [True: 0, False: 0]
|
592 | 0 | if (ref_block->rows() == 0) { Branch (592:21): [True: 0, False: 0]
|
593 | 0 | break; |
594 | 0 | } else { |
595 | 0 | eof = true; |
596 | 0 | } |
597 | 0 | } else { |
598 | 0 | return st; |
599 | 0 | } |
600 | 0 | } |
601 | | |
602 | 0 | RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
603 | | |
604 | 0 | constexpr double HOLD_BLOCK_MEMORY_RATE = |
605 | 0 | 0.66; // Reserve some memory for use by other parts of this job |
606 | 0 | if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation || Branch (606:13): [True: 0, False: 0]
|
607 | 0 | _mem_tracker->consumption() > _memory_limitation * HOLD_BLOCK_MEMORY_RATE) { Branch (607:13): [True: 0, False: 0]
|
608 | 0 | RETURN_IF_ERROR(create_rowset()); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
609 | | |
610 | 0 | if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) { Branch (610:17): [True: 0, False: 0]
|
611 | 0 | return Status::Error<INVALID_ARGUMENT>( |
612 | 0 | "Memory limitation is too small for Schema Change. _memory_limitation={}, " |
613 | 0 | "new_block->allocated_bytes()={}, consumption={}", |
614 | 0 | _memory_limitation, new_block->allocated_bytes(), |
615 | 0 | _mem_tracker->consumption()); |
616 | 0 | } |
617 | 0 | } |
618 | 0 | _mem_tracker->consume(new_block->allocated_bytes()); |
619 | | |
620 | | // move unique ptr |
621 | 0 | blocks.push_back(vectorized::Block::create_unique(new_tablet_schema->create_block())); |
622 | 0 | swap(blocks.back(), new_block); |
623 | 0 | } while (!eof); Branch (623:14): [True: 0, False: 0]
|
624 | | |
625 | 0 | RETURN_IF_ERROR(create_rowset()); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
626 | | |
627 | 0 | if (_src_rowsets.empty()) { Branch (627:9): [True: 0, False: 0]
|
628 | 0 | RETURN_IF_ERROR(rowset_writer->flush()); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
629 | 0 | } else { |
630 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
631 | 0 | _external_sorting(_src_rowsets, rowset_writer, new_tablet, new_tablet_schema)); |
632 | 0 | } |
633 | | |
634 | 0 | return Status::OK(); |
635 | 0 | } |
636 | | |
637 | | Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting( |
638 | | const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version, |
639 | | int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, |
640 | 0 | SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) { |
641 | 0 | uint64_t merged_rows = 0; |
642 | 0 | MultiBlockMerger merger(new_tablet); |
643 | 0 | RowsetWriterContext context; |
644 | 0 | context.version = version; |
645 | 0 | context.rowset_state = VISIBLE; |
646 | 0 | context.segments_overlap = segments_overlap; |
647 | 0 | context.tablet_schema = new_tablet_schema; |
648 | 0 | context.newest_write_timestamp = newest_write_timestamp; |
649 | 0 | context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; |
650 | 0 | std::unique_ptr<RowsetWriter> rowset_writer; |
651 | | // TODO(plat1ko): Use monad op |
652 | 0 | if (auto result = new_tablet->create_rowset_writer(context, false); !result.has_value()) Branch (652:73): [True: 0, False: 0]
|
653 | 0 | [[unlikely]] { |
654 | 0 | return unexpected(std::move(result).error()); |
655 | 0 | } else { |
656 | 0 | rowset_writer = std::move(result).value(); |
657 | 0 | } |
658 | 0 | RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows)); Line | Count | Source | 708 | 0 | do { \ | 709 | 0 | Status _status_ = (stmt); \ | 710 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 711 | 0 | return unexpected(std::move(_status_)); \ | 712 | 0 | } \ | 713 | 0 | } while (false) Branch (713:14): [Folded - Ignored]
|
|
659 | 0 | _add_merged_rows(merged_rows); |
660 | 0 | RowsetSharedPtr rowset; |
661 | 0 | RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset)); Line | Count | Source | 708 | 0 | do { \ | 709 | 0 | Status _status_ = (stmt); \ | 710 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 711 | 0 | return unexpected(std::move(_status_)); \ | 712 | 0 | } \ | 713 | 0 | } while (false) Branch (713:14): [Folded - Ignored]
|
|
662 | 0 | return rowset; |
663 | 0 | } |
664 | | |
665 | | Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting( |
666 | | const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version, |
667 | | int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type, |
668 | 0 | SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) { |
669 | 0 | uint64_t merged_rows = 0; |
670 | 0 | MultiBlockMerger merger(new_tablet); |
671 | 0 | RowsetWriterContext context; |
672 | 0 | context.version = version; |
673 | 0 | context.rowset_state = VISIBLE; |
674 | 0 | context.segments_overlap = segments_overlap; |
675 | 0 | context.tablet_schema = new_tablet_schema; |
676 | 0 | context.newest_write_timestamp = newest_write_timestamp; |
677 | 0 | context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; |
678 | 0 | std::unique_ptr<RowsetWriter> rowset_writer; |
679 | | // TODO(plat1ko): Use monad op |
680 | 0 | if (auto result = new_tablet->create_rowset_writer(context, false); !result.has_value()) Branch (680:73): [True: 0, False: 0]
|
681 | 0 | [[unlikely]] { |
682 | 0 | return unexpected(std::move(result).error()); |
683 | 0 | } else { |
684 | 0 | rowset_writer = std::move(result).value(); |
685 | 0 | } |
686 | 0 | auto guard = _local_storage_engine.pending_local_rowsets().add(context.rowset_id); |
687 | 0 | _pending_rs_guards.push_back(std::move(guard)); |
688 | 0 | RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows)); Line | Count | Source | 708 | 0 | do { \ | 709 | 0 | Status _status_ = (stmt); \ | 710 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 711 | 0 | return unexpected(std::move(_status_)); \ | 712 | 0 | } \ | 713 | 0 | } while (false) Branch (713:14): [Folded - Ignored]
|
|
689 | 0 | _add_merged_rows(merged_rows); |
690 | 0 | RowsetSharedPtr rowset; |
691 | 0 | RETURN_IF_ERROR_RESULT(rowset_writer->build(rowset)); Line | Count | Source | 708 | 0 | do { \ | 709 | 0 | Status _status_ = (stmt); \ | 710 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 711 | 0 | return unexpected(std::move(_status_)); \ | 712 | 0 | } \ | 713 | 0 | } while (false) Branch (713:14): [Folded - Ignored]
|
|
692 | 0 | return rowset; |
693 | 0 | } |
694 | | |
695 | | Status VBaseSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets, |
696 | | RowsetWriter* rowset_writer, |
697 | | BaseTabletSPtr new_tablet, |
698 | 0 | TabletSchemaSPtr new_tablet_schema) { |
699 | 0 | std::vector<RowsetReaderSharedPtr> rs_readers; |
700 | 0 | for (auto& rowset : src_rowsets) { Branch (700:23): [True: 0, False: 0]
|
701 | 0 | RowsetReaderSharedPtr rs_reader; |
702 | 0 | RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
703 | 0 | rs_readers.push_back(rs_reader); |
704 | 0 | } |
705 | | |
706 | 0 | Merger::Statistics stats; |
707 | 0 | RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE, Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
708 | 0 | *new_tablet_schema, rs_readers, rowset_writer, &stats)); |
709 | 0 | _add_merged_rows(stats.merged_rows); |
710 | 0 | _add_filtered_rows(stats.filtered_rows); |
711 | 0 | return Status::OK(); |
712 | 0 | } |
713 | | |
714 | | Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, |
715 | | RowsetWriter* rowset_writer, |
716 | | BaseTabletSPtr new_tablet, |
717 | | TabletSchemaSPtr base_tablet_schema, |
718 | 0 | TabletSchemaSPtr new_tablet_schema) { |
719 | 0 | Defer defer {[&]() { |
720 | | // remove the intermediate rowsets generated by internal sorting |
721 | 0 | for (auto& row_set : _src_rowsets) { Branch (721:28): [True: 0, False: 0]
|
722 | 0 | _local_storage_engine.add_unused_rowset(row_set); |
723 | 0 | } |
724 | 0 | }}; |
725 | 0 | _pending_rs_guards.clear(); |
726 | 0 | return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet, |
727 | 0 | base_tablet_schema, new_tablet_schema); |
728 | 0 | } |
729 | | |
730 | 0 | Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { |
731 | 0 | if (!request.__isset.desc_tbl) { Branch (731:9): [True: 0, False: 0]
|
732 | 0 | return Status::Error<INVALID_ARGUMENT>( |
733 | 0 | "desc_tbl is not set. Maybe the FE version is not equal to the BE " |
734 | 0 | "version."); |
735 | 0 | } |
736 | 0 | if (_base_tablet == nullptr) { Branch (736:9): [True: 0, False: 0]
|
737 | 0 | return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}", |
738 | 0 | request.base_tablet_id); |
739 | 0 | } |
740 | 0 | if (_new_tablet == nullptr) { Branch (740:9): [True: 0, False: 0]
|
741 | 0 | return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}", |
742 | 0 | request.new_tablet_id); |
743 | 0 | } |
744 | | |
745 | 0 | LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id |
746 | 0 | << ", new_tablet_id=" << request.new_tablet_id |
747 | 0 | << ", alter_version=" << request.alter_version; |
748 | | |
749 | | // Lock schema_change_lock util schema change info is stored in tablet header |
750 | 0 | std::unique_lock<std::mutex> schema_change_lock(_base_tablet->get_schema_change_lock(), |
751 | 0 | std::try_to_lock); |
752 | 0 | if (!schema_change_lock.owns_lock()) { Branch (752:9): [True: 0, False: 0]
|
753 | 0 | return Status::Error<TRY_LOCK_FAILED>("failed to obtain schema change lock. base_tablet={}", |
754 | 0 | request.base_tablet_id); |
755 | 0 | } |
756 | | |
757 | 0 | Status res = _do_process_alter_tablet(request); |
758 | 0 | LOG(INFO) << "finished alter tablet process, res=" << res; |
759 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); }); Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ | 42 | 0 | } \ | 43 | 0 | } |
|
760 | 0 | return res; |
761 | 0 | } |
762 | | |
763 | | SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine, |
764 | | const TAlterTabletReqV2& request, const std::string& job_id) |
765 | 0 | : _local_storage_engine(local_storage_engine) { |
766 | 0 | _base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id); |
767 | 0 | _new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id); |
768 | 0 | if (_base_tablet && _new_tablet) { Branch (768:9): [True: 0, False: 0]
Branch (768:25): [True: 0, False: 0]
|
769 | 0 | _base_tablet_schema = std::make_shared<TabletSchema>(); |
770 | 0 | _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns); |
771 | | // The request only include column info, do not include bitmap or bloomfilter index info, |
772 | | // So we also need to copy index info from the real base tablet |
773 | 0 | _base_tablet_schema->update_index_info_from(*_base_tablet->tablet_schema()); |
774 | | // During a schema change, the extracted columns of a variant should not be included in the tablet schema. |
775 | | // This is because the schema change for a variant needs to ignore the extracted columns. |
776 | | // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change, |
777 | | // the complete variant is constructed by reading all the sub-columns of the variant. |
778 | 0 | _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_variant_extracted_columns(); |
779 | 0 | } |
780 | 0 | _job_id = job_id; |
781 | 0 | } |
782 | | |
783 | | // In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished |
784 | | // It will cost a lot of time to wait and the task is very difficult to understand. |
785 | | // In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data. |
786 | | // The admin should upgrade all BE and then upgrade FE. |
787 | | // Should delete the old code after upgrade finished. |
788 | 0 | Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) { |
789 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); }) Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ | 42 | 0 | } \ | 43 | 0 | } |
|
790 | 0 | Status res; |
791 | 0 | signal::tablet_id = _base_tablet->get_table_id(); |
792 | | |
793 | | // check if tablet's state is not_ready, if it is ready, it means the tablet already finished |
794 | | // check whether the tablet's max continuous version == request.version |
795 | 0 | if (_new_tablet->tablet_state() != TABLET_NOTREADY) { Branch (795:9): [True: 0, False: 0]
|
796 | 0 | res = _validate_alter_result(request); |
797 | 0 | LOG(INFO) << "tablet's state=" << _new_tablet->tablet_state() |
798 | 0 | << " the convert job already finished, check its version" |
799 | 0 | << " res=" << res; |
800 | 0 | return res; |
801 | 0 | } |
802 | 0 | _new_tablet->set_alter_failed(false); |
803 | 0 | Defer defer([this] { |
804 | | // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed. |
805 | 0 | if (_new_tablet->tablet_state() != TABLET_RUNNING) { Branch (805:13): [True: 0, False: 0]
|
806 | 0 | _new_tablet->set_alter_failed(true); |
807 | 0 | } |
808 | 0 | }); |
809 | |
|
810 | 0 | LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet " |
811 | 0 | "to new tablet" |
812 | 0 | << " base_tablet=" << _base_tablet->tablet_id() |
813 | 0 | << " new_tablet=" << _new_tablet->tablet_id(); |
814 | |
|
815 | 0 | std::shared_lock base_migration_rlock(_base_tablet->get_migration_lock(), std::try_to_lock); |
816 | 0 | if (!base_migration_rlock.owns_lock()) { Branch (816:9): [True: 0, False: 0]
|
817 | 0 | return Status::Error<TRY_LOCK_FAILED>( |
818 | 0 | "SchemaChangeJob::_do_process_alter_tablet get lock failed"); |
819 | 0 | } |
820 | 0 | std::shared_lock new_migration_rlock(_new_tablet->get_migration_lock(), std::try_to_lock); |
821 | 0 | if (!new_migration_rlock.owns_lock()) { Branch (821:9): [True: 0, False: 0]
|
822 | 0 | return Status::Error<TRY_LOCK_FAILED>( |
823 | 0 | "SchemaChangeJob::_do_process_alter_tablet get lock failed"); |
824 | 0 | } |
825 | | |
826 | 0 | std::vector<Version> versions_to_be_changed; |
827 | 0 | int64_t end_version = -1; |
828 | | // reader_context is stack variables, it's lifetime should keep the same |
829 | | // with rs_readers |
830 | 0 | RowsetReaderContext reader_context; |
831 | 0 | std::vector<RowSetSplits> rs_splits; |
832 | | // delete handlers for new tablet |
833 | 0 | DeleteHandler delete_handler; |
834 | 0 | std::vector<ColumnId> return_columns; |
835 | | |
836 | | // Use tablet schema directly from base tablet, they are the newest schema, not contain |
837 | | // dropped column during light weight schema change. |
838 | | // But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through |
839 | | // a tablet schema, then use request schema. |
840 | 0 | size_t num_cols = |
841 | 0 | request.columns.empty() ? _base_tablet_schema->num_columns() : request.columns.size(); Branch (841:13): [True: 0, False: 0]
|
842 | 0 | return_columns.resize(num_cols); |
843 | 0 | for (int i = 0; i < num_cols; ++i) { Branch (843:21): [True: 0, False: 0]
|
844 | 0 | return_columns[i] = i; |
845 | 0 | } |
846 | |
|
847 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK); Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ Branch (41:15): [True: 0, False: 0]
| 42 | 0 | } \ | 43 | 0 | } |
|
848 | | |
849 | | // begin to find deltas to convert from base tablet to new tablet so that |
850 | | // obtain base tablet and new tablet's push lock and header write lock to prevent loading data |
851 | 0 | { |
852 | 0 | std::lock_guard base_tablet_lock(_base_tablet->get_push_lock()); |
853 | 0 | std::lock_guard new_tablet_lock(_new_tablet->get_push_lock()); |
854 | 0 | std::lock_guard base_tablet_wlock(_base_tablet->get_header_lock()); |
855 | 0 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); Line | Count | Source | 31 | 0 | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 0 | using namespace std::chrono_literals; \ | 42 | 0 | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 0 | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 0 | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 0 | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 0 | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 0 | auto VARNAME_LINENUM(cost_us) = \ | 47 | 0 | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 0 | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 0 | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 0 | } \ | 51 | 0 | }) |
|
|
856 | 0 | std::lock_guard<std::shared_mutex> new_tablet_wlock(_new_tablet->get_header_lock()); |
857 | |
|
858 | 0 | do { |
859 | 0 | RowsetSharedPtr max_rowset; |
860 | | // get history data to be converted and it will check if there is hold in base tablet |
861 | 0 | res = _get_versions_to_be_changed(&versions_to_be_changed, &max_rowset); |
862 | 0 | if (!res) { Branch (862:17): [True: 0, False: 0]
|
863 | 0 | LOG(WARNING) << "fail to get version to be changed. res=" << res; |
864 | 0 | break; |
865 | 0 | } |
866 | | |
867 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", { Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ | 42 | 0 | } \ | 43 | 0 | } |
|
868 | 0 | res = Status::InternalError( |
869 | 0 | "inject alter tablet failed. base_tablet={}, new_tablet={}", |
870 | 0 | request.base_tablet_id, request.new_tablet_id); |
871 | 0 | LOG(WARNING) << "inject error. res=" << res; |
872 | 0 | break; |
873 | 0 | }); |
874 | | |
875 | | // should check the max_version >= request.alter_version, if not the convert is useless |
876 | 0 | if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { Branch (876:17): [True: 0, False: 0]
Branch (876:42): [True: 0, False: 0]
|
877 | 0 | res = Status::InternalError( |
878 | 0 | "base tablet's max version={} is less than request version={}", |
879 | 0 | (max_rowset == nullptr ? 0 : max_rowset->end_version()), Branch (879:26): [True: 0, False: 0]
|
880 | 0 | request.alter_version); |
881 | 0 | break; |
882 | 0 | } |
883 | | // before calculating version_to_be_changed, |
884 | | // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) |
885 | 0 | LOG(INFO) << "begin to remove all data before end version from new tablet to prevent " |
886 | 0 | "rewrite." |
887 | 0 | << " new_tablet=" << _new_tablet->tablet_id() |
888 | 0 | << ", end_version=" << max_rowset->end_version(); |
889 | 0 | std::vector<RowsetSharedPtr> rowsets_to_delete; |
890 | 0 | std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; |
891 | 0 | _new_tablet->acquire_version_and_rowsets(&version_rowsets); |
892 | 0 | std::sort(version_rowsets.begin(), version_rowsets.end(), |
893 | 0 | [](const std::pair<Version, RowsetSharedPtr>& l, |
894 | 0 | const std::pair<Version, RowsetSharedPtr>& r) { |
895 | 0 | return l.first.first < r.first.first; |
896 | 0 | }); |
897 | 0 | for (auto& pair : version_rowsets) { Branch (897:29): [True: 0, False: 0]
|
898 | 0 | if (pair.first.second <= max_rowset->end_version()) { Branch (898:21): [True: 0, False: 0]
|
899 | 0 | rowsets_to_delete.push_back(pair.second); |
900 | 0 | } else if (pair.first.first <= max_rowset->end_version()) { Branch (900:28): [True: 0, False: 0]
|
901 | | // If max version is [X-10] and new tablet has version [7-9][10-12], |
902 | | // we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version |
903 | | // cross: [X-10] [10-12]. |
904 | | // So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail. |
905 | 0 | return Status::Error<VERSION_ALREADY_MERGED>( |
906 | 0 | "New tablet has a version {} crossing base tablet's max_version={}", |
907 | 0 | pair.first.to_string(), max_rowset->end_version()); |
908 | 0 | } |
909 | 0 | } |
910 | 0 | std::vector<RowsetSharedPtr> empty_vec; |
911 | 0 | RETURN_IF_ERROR(_new_tablet->delete_rowsets(rowsets_to_delete, false)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
912 | | // inherit cumulative_layer_point from base_tablet |
913 | | // check if new_tablet.ce_point > base_tablet.ce_point? |
914 | 0 | _new_tablet->set_cumulative_layer_point(-1); |
915 | | // save tablet meta |
916 | 0 | _new_tablet->save_meta(); |
917 | 0 | for (auto& rowset : rowsets_to_delete) { Branch (917:31): [True: 0, False: 0]
|
918 | | // do not call rowset.remove directly, using gc thread to delete it |
919 | 0 | _local_storage_engine.add_unused_rowset(rowset); |
920 | 0 | } |
921 | | |
922 | | // init one delete handler |
923 | 0 | for (auto& version : versions_to_be_changed) { Branch (923:32): [True: 0, False: 0]
|
924 | 0 | end_version = std::max(end_version, version.second); |
925 | 0 | } |
926 | | |
927 | | // acquire data sources correspond to history versions |
928 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
929 | 0 | _base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits)); |
930 | 0 | if (rs_splits.empty()) { Branch (930:17): [True: 0, False: 0]
|
931 | 0 | res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>( |
932 | 0 | "fail to acquire all data sources. version_num={}, data_source_num={}", |
933 | 0 | versions_to_be_changed.size(), rs_splits.size()); |
934 | 0 | break; |
935 | 0 | } |
936 | 0 | std::vector<RowsetMetaSharedPtr> del_preds; |
937 | 0 | for (auto&& split : rs_splits) { Branch (937:31): [True: 0, False: 0]
|
938 | 0 | const auto& rs_meta = split.rs_reader->rowset()->rowset_meta(); |
939 | 0 | if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) { Branch (939:21): [True: 0, False: 0]
Branch (939:57): [True: 0, False: 0]
|
940 | 0 | continue; |
941 | 0 | } |
942 | 0 | _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema()); |
943 | 0 | del_preds.push_back(rs_meta); |
944 | 0 | } |
945 | 0 | res = delete_handler.init(_base_tablet_schema, del_preds, end_version); |
946 | 0 | if (!res) { Branch (946:17): [True: 0, False: 0]
|
947 | 0 | LOG(WARNING) << "init delete handler failed. base_tablet=" |
948 | 0 | << _base_tablet->tablet_id() << ", end_version=" << end_version; |
949 | 0 | break; |
950 | 0 | } |
951 | | |
952 | 0 | reader_context.reader_type = ReaderType::READER_ALTER_TABLE; |
953 | 0 | reader_context.tablet_schema = _base_tablet_schema; |
954 | 0 | reader_context.need_ordered_result = true; |
955 | 0 | reader_context.delete_handler = &delete_handler; |
956 | 0 | reader_context.return_columns = &return_columns; |
957 | 0 | reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); |
958 | 0 | reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; |
959 | 0 | reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; |
960 | 0 | reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap(); |
961 | 0 | reader_context.version = Version(0, end_version); |
962 | 0 | for (auto& rs_split : rs_splits) { Branch (962:33): [True: 0, False: 0]
|
963 | 0 | res = rs_split.rs_reader->init(&reader_context); |
964 | 0 | if (!res) { Branch (964:21): [True: 0, False: 0]
|
965 | 0 | LOG(WARNING) << "failed to init rowset reader: " << _base_tablet->tablet_id(); |
966 | 0 | break; |
967 | 0 | } |
968 | 0 | } |
969 | 0 | } while (false); Branch (969:18): [Folded - Ignored]
|
970 | 0 | } |
971 | | |
972 | 0 | do { |
973 | 0 | if (!res) { Branch (973:13): [True: 0, False: 0]
|
974 | 0 | break; |
975 | 0 | } |
976 | 0 | SchemaChangeParams sc_params; |
977 | |
|
978 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
979 | 0 | DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl)); |
980 | 0 | sc_params.ref_rowset_readers.reserve(rs_splits.size()); |
981 | 0 | for (RowSetSplits& split : rs_splits) { Branch (981:34): [True: 0, False: 0]
|
982 | 0 | sc_params.ref_rowset_readers.emplace_back(split.rs_reader); |
983 | 0 | } |
984 | 0 | sc_params.delete_handler = &delete_handler; |
985 | 0 | sc_params.be_exec_version = request.be_exec_version; |
986 | 0 | DCHECK(request.__isset.alter_tablet_type); |
987 | 0 | switch (request.alter_tablet_type) { Branch (987:17): [True: 0, False: 0]
|
988 | 0 | case TAlterTabletType::SCHEMA_CHANGE: Branch (988:9): [True: 0, False: 0]
|
989 | 0 | sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE; |
990 | 0 | break; |
991 | 0 | case TAlterTabletType::ROLLUP: Branch (991:9): [True: 0, False: 0]
|
992 | 0 | sc_params.alter_tablet_type = AlterTabletType::ROLLUP; |
993 | 0 | break; |
994 | 0 | case TAlterTabletType::MIGRATION: Branch (994:9): [True: 0, False: 0]
|
995 | 0 | sc_params.alter_tablet_type = AlterTabletType::MIGRATION; |
996 | 0 | break; |
997 | 0 | } |
998 | 0 | if (request.__isset.materialized_view_params) { Branch (998:13): [True: 0, False: 0]
|
999 | 0 | for (auto item : request.materialized_view_params) { Branch (999:28): [True: 0, False: 0]
|
1000 | 0 | AlterMaterializedViewParam mv_param; |
1001 | 0 | mv_param.column_name = item.column_name; |
1002 | |
|
1003 | 0 | if (item.__isset.mv_expr) { Branch (1003:21): [True: 0, False: 0]
|
1004 | 0 | mv_param.expr = std::make_shared<TExpr>(item.mv_expr); |
1005 | 0 | } |
1006 | 0 | sc_params.materialized_params_map.insert( |
1007 | 0 | std::make_pair(to_lower(item.column_name), mv_param)); |
1008 | 0 | } |
1009 | 0 | } |
1010 | 0 | { |
1011 | 0 | std::lock_guard<std::shared_mutex> wrlock(_mutex); |
1012 | 0 | _tablet_ids_in_converting.insert(_new_tablet->tablet_id()); |
1013 | 0 | } |
1014 | 0 | int64_t real_alter_version = 0; |
1015 | 0 | sc_params.enable_unique_key_merge_on_write = |
1016 | 0 | _new_tablet->enable_unique_key_merge_on_write(); |
1017 | 0 | res = _convert_historical_rowsets(sc_params, &real_alter_version); |
1018 | 0 | { |
1019 | 0 | std::lock_guard<std::shared_mutex> wrlock(_mutex); |
1020 | 0 | _tablet_ids_in_converting.erase(_new_tablet->tablet_id()); |
1021 | 0 | } |
1022 | 0 | if (!res) { Branch (1022:13): [True: 0, False: 0]
|
1023 | 0 | break; |
1024 | 0 | } |
1025 | | |
1026 | 0 | DCHECK_GE(real_alter_version, request.alter_version); |
1027 | |
|
1028 | 0 | if (_new_tablet->keys_type() == UNIQUE_KEYS && Branch (1028:13): [True: 0, False: 0]
|
1029 | 0 | _new_tablet->enable_unique_key_merge_on_write()) { Branch (1029:13): [True: 0, False: 0]
|
1030 | 0 | res = _calc_delete_bitmap_for_mow_table(real_alter_version); |
1031 | 0 | if (!res) { Branch (1031:17): [True: 0, False: 0]
|
1032 | 0 | break; |
1033 | 0 | } |
1034 | 0 | } else { |
1035 | | // set state to ready |
1036 | 0 | std::lock_guard<std::shared_mutex> new_wlock(_new_tablet->get_header_lock()); |
1037 | 0 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); Line | Count | Source | 31 | 0 | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 0 | using namespace std::chrono_literals; \ | 42 | 0 | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 0 | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 0 | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 0 | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 0 | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 0 | auto VARNAME_LINENUM(cost_us) = \ | 47 | 0 | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 0 | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 0 | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 0 | } \ | 51 | 0 | }) |
|
|
1038 | 0 | res = _new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); |
1039 | 0 | if (!res) { Branch (1039:17): [True: 0, False: 0]
|
1040 | 0 | break; |
1041 | 0 | } |
1042 | 0 | _new_tablet->save_meta(); |
1043 | 0 | } |
1044 | 0 | } while (false); Branch (1044:14): [Folded - Ignored]
|
1045 | | |
1046 | 0 | if (res) { Branch (1046:9): [True: 0, False: 0]
|
1047 | | // _validate_alter_result should be outside the above while loop. |
1048 | | // to avoid requiring the header lock twice. |
1049 | 0 | res = _validate_alter_result(request); |
1050 | 0 | } |
1051 | | |
1052 | | // if failed convert history data, then just remove the new tablet |
1053 | 0 | if (!res) { Branch (1053:9): [True: 0, False: 0]
|
1054 | 0 | LOG(WARNING) << "failed to alter tablet. base_tablet=" << _base_tablet->tablet_id() |
1055 | 0 | << ", drop new_tablet=" << _new_tablet->tablet_id(); |
1056 | | // do not drop the new tablet and its data. GC thread will |
1057 | 0 | } |
1058 | |
|
1059 | 0 | return res; |
1060 | 0 | } |
1061 | | |
1062 | 0 | bool SchemaChangeJob::tablet_in_converting(int64_t tablet_id) { |
1063 | 0 | std::shared_lock rdlock(_mutex); |
1064 | 0 | return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end(); |
1065 | 0 | } |
1066 | | |
1067 | | Status SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed, |
1068 | 0 | RowsetSharedPtr* max_rowset) { |
1069 | 0 | RowsetSharedPtr rowset = _base_tablet->get_rowset_with_max_version(); |
1070 | 0 | if (rowset == nullptr) { Branch (1070:9): [True: 0, False: 0]
|
1071 | 0 | return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. base_tablet={}", |
1072 | 0 | _base_tablet->tablet_id()); |
1073 | 0 | } |
1074 | 0 | *max_rowset = rowset; |
1075 | 0 | *versions_to_be_changed = DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked( Line | Count | Source | 716 | 0 | ({ \ | 717 | 0 | auto&& res = (stmt); \ | 718 | 0 | using T = std::decay_t<decltype(res)>; \ | 719 | 0 | if (!res.has_value()) [[unlikely]] { \ Branch (719:13): [True: 0, False: 0]
| 720 | 0 | return std::forward<T>(res).error(); \ | 721 | 0 | } \ | 722 | 0 | std::forward<T>(res).value(); \ | 723 | 0 | }); |
|
1076 | 0 | Version(0, rowset->version().second), {})); |
1077 | 0 | return Status::OK(); |
1078 | 0 | } |
1079 | | |
1080 | | // The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is |
1081 | | // converted from a base tablet, only used for the mow table now. |
1082 | | Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params, |
1083 | 0 | int64_t* real_alter_version) { |
1084 | 0 | LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet." |
1085 | 0 | << " base_tablet=" << _base_tablet->tablet_id() |
1086 | 0 | << ", new_tablet=" << _new_tablet->tablet_id() << ", job_id=" << _job_id; |
1087 | | |
1088 | | // find end version |
1089 | 0 | int32_t end_version = -1; |
1090 | 0 | for (const auto& ref_rowset_reader : sc_params.ref_rowset_readers) { Branch (1090:40): [True: 0, False: 0]
|
1091 | 0 | if (ref_rowset_reader->version().second > end_version) { Branch (1091:13): [True: 0, False: 0]
|
1092 | 0 | end_version = ref_rowset_reader->version().second; |
1093 | 0 | } |
1094 | 0 | } |
1095 | | |
1096 | | // Add filter information in change, and filter column information will be set in parse_request |
1097 | | // And filter some data every time the row block changes |
1098 | 0 | BlockChanger changer(_new_tablet_schema, *sc_params.desc_tbl); |
1099 | |
|
1100 | 0 | bool sc_sorting = false; |
1101 | 0 | bool sc_directly = false; |
1102 | | |
1103 | | // a.Parse the Alter request and convert it into an internal representation |
1104 | 0 | Status res = parse_request(sc_params, _base_tablet_schema.get(), _new_tablet_schema.get(), |
1105 | 0 | &changer, &sc_sorting, &sc_directly); |
1106 | 0 | LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting |
1107 | 0 | << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id() |
1108 | 0 | << ", new_tablet=" << _new_tablet->tablet_id(); |
1109 | |
|
1110 | 0 | auto process_alter_exit = [&]() -> Status { |
1111 | 0 | { |
1112 | | // save tablet meta here because rowset meta is not saved during add rowset |
1113 | 0 | std::lock_guard new_wlock(_new_tablet->get_header_lock()); |
1114 | 0 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); Line | Count | Source | 31 | 0 | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 0 | using namespace std::chrono_literals; \ | 42 | 0 | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 0 | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 0 | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 0 | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 0 | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 0 | auto VARNAME_LINENUM(cost_us) = \ | 47 | 0 | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 0 | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 0 | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 0 | } \ | 51 | 0 | }) |
|
|
1115 | 0 | _new_tablet->save_meta(); |
1116 | 0 | } |
1117 | 0 | if (res) { Branch (1117:13): [True: 0, False: 0]
|
1118 | 0 | Version test_version(0, end_version); |
1119 | 0 | res = _new_tablet->check_version_integrity(test_version); |
1120 | 0 | } |
1121 | |
|
1122 | 0 | LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. " |
1123 | 0 | << "base_tablet=" << _base_tablet->tablet_id() |
1124 | 0 | << ", new_tablet=" << _new_tablet->tablet_id(); |
1125 | 0 | return res; |
1126 | 0 | }; |
1127 | |
|
1128 | 0 | if (!res) { Branch (1128:9): [True: 0, False: 0]
|
1129 | 0 | LOG(WARNING) << "failed to parse the request. res=" << res; |
1130 | 0 | return process_alter_exit(); |
1131 | 0 | } |
1132 | | |
1133 | 0 | if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) { Branch (1133:9): [True: 0, False: 0]
Branch (1133:24): [True: 0, False: 0]
Branch (1133:40): [True: 0, False: 0]
|
1134 | 0 | res = Status::Error<SCHEMA_SCHEMA_INVALID>( |
1135 | 0 | "Don't support to add materialized view by linked schema change"); |
1136 | 0 | return process_alter_exit(); |
1137 | 0 | } |
1138 | | |
1139 | | // b. Generate historical data converter |
1140 | 0 | auto sc_procedure = _get_sc_procedure( |
1141 | 0 | changer, sc_sorting, sc_directly, |
1142 | 0 | _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change()); |
1143 | |
|
1144 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK); Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ Branch (41:15): [True: 0, False: 0]
| 42 | 0 | } \ | 43 | 0 | } |
|
1145 | | |
1146 | | // c.Convert historical data |
1147 | 0 | bool have_failure_rowset = false; |
1148 | 0 | for (const auto& rs_reader : sc_params.ref_rowset_readers) { Branch (1148:32): [True: 0, False: 0]
|
1149 | | // set status for monitor |
1150 | | // As long as there is a new_table as running, ref table is set as running |
1151 | | // NOTE If the first sub_table fails first, it will continue to go as normal here |
1152 | | // When tablet create new rowset writer, it may change rowset type, in this case |
1153 | | // linked schema change will not be used. |
1154 | 0 | RowsetWriterContext context; |
1155 | 0 | context.version = rs_reader->version(); |
1156 | 0 | context.rowset_state = VISIBLE; |
1157 | 0 | context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); |
1158 | 0 | context.tablet_schema = _new_tablet_schema; |
1159 | 0 | context.newest_write_timestamp = rs_reader->newest_write_timestamp(); |
1160 | |
|
1161 | 0 | if (!rs_reader->rowset()->is_local()) { Branch (1161:13): [True: 0, False: 0]
|
1162 | 0 | context.storage_resource = |
1163 | 0 | *DORIS_TRY(rs_reader->rowset()->rowset_meta()->remote_storage_resource()); Line | Count | Source | 716 | 0 | ({ \ | 717 | 0 | auto&& res = (stmt); \ | 718 | 0 | using T = std::decay_t<decltype(res)>; \ | 719 | 0 | if (!res.has_value()) [[unlikely]] { \ Branch (719:13): [True: 0, False: 0]
| 720 | 0 | return std::forward<T>(res).error(); \ | 721 | 0 | } \ | 722 | 0 | std::forward<T>(res).value(); \ | 723 | 0 | }); |
|
1164 | 0 | } |
1165 | | |
1166 | 0 | context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; |
1167 | 0 | auto result = _new_tablet->create_rowset_writer(context, false); |
1168 | 0 | if (!result.has_value()) { Branch (1168:13): [True: 0, False: 0]
|
1169 | 0 | res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}", |
1170 | 0 | result.error().to_string()); |
1171 | 0 | return process_alter_exit(); |
1172 | 0 | } |
1173 | 0 | auto rowset_writer = std::move(result).value(); |
1174 | 0 | auto pending_rs_guard = _local_storage_engine.add_pending_rowset(context); |
1175 | |
|
1176 | 0 | if (res = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet, |
1177 | 0 | _base_tablet_schema, _new_tablet_schema); |
1178 | 0 | !res) { Branch (1178:13): [True: 0, False: 0]
|
1179 | 0 | LOG(WARNING) << "failed to process the version." |
1180 | 0 | << " version=" << rs_reader->version().first << "-" |
1181 | 0 | << rs_reader->version().second << ", " << res.to_string(); |
1182 | 0 | return process_alter_exit(); |
1183 | 0 | } |
1184 | | // Add the new version of the data to the header |
1185 | | // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table |
1186 | 0 | std::lock_guard lock(_new_tablet->get_push_lock()); |
1187 | 0 | RowsetSharedPtr new_rowset; |
1188 | 0 | if (!(res = rowset_writer->build(new_rowset)).ok()) { Branch (1188:13): [True: 0, False: 0]
|
1189 | 0 | LOG(WARNING) << "failed to build rowset, exit alter process"; |
1190 | 0 | return process_alter_exit(); |
1191 | 0 | } |
1192 | 0 | res = _new_tablet->add_rowset(new_rowset); |
1193 | 0 | if (res.is<PUSH_VERSION_ALREADY_EXIST>()) { Branch (1193:13): [True: 0, False: 0]
|
1194 | 0 | LOG(WARNING) << "version already exist, version revert occurred. " |
1195 | 0 | << "tablet=" << _new_tablet->tablet_id() << ", version='" |
1196 | 0 | << rs_reader->version().first << "-" << rs_reader->version().second; |
1197 | 0 | _local_storage_engine.add_unused_rowset(new_rowset); |
1198 | 0 | have_failure_rowset = true; |
1199 | 0 | res = Status::OK(); |
1200 | 0 | } else if (!res) { Branch (1200:20): [True: 0, False: 0]
|
1201 | 0 | LOG(WARNING) << "failed to register new version. " |
1202 | 0 | << " tablet=" << _new_tablet->tablet_id() |
1203 | 0 | << ", version=" << rs_reader->version().first << "-" |
1204 | 0 | << rs_reader->version().second; |
1205 | 0 | _local_storage_engine.add_unused_rowset(new_rowset); |
1206 | 0 | return process_alter_exit(); |
1207 | 0 | } else { |
1208 | 0 | VLOG_NOTICE << "register new version. tablet=" << _new_tablet->tablet_id() Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
1209 | 0 | << ", version=" << rs_reader->version().first << "-" |
1210 | 0 | << rs_reader->version().second; |
1211 | 0 | } |
1212 | 0 | if (!have_failure_rowset) { Branch (1212:13): [True: 0, False: 0]
|
1213 | 0 | *real_alter_version = rs_reader->version().second; |
1214 | 0 | } |
1215 | |
|
1216 | 0 | VLOG_TRACE << "succeed to convert a history version." Line | Count | Source | 40 | 0 | #define VLOG_TRACE VLOG(10) |
|
1217 | 0 | << " version=" << rs_reader->version().first << "-" |
1218 | 0 | << rs_reader->version().second; |
1219 | 0 | } |
1220 | | |
1221 | | // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version |
1222 | 0 | return process_alter_exit(); |
1223 | 0 | } |
1224 | | |
1225 | | static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__"); |
1226 | | |
1227 | | // @static |
1228 | | // Analyze the mapping of the column and the mapping of the filter key |
1229 | | Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, |
1230 | | TabletSchema* base_tablet_schema, |
1231 | | TabletSchema* new_tablet_schema, BlockChanger* changer, |
1232 | 0 | bool* sc_sorting, bool* sc_directly) { |
1233 | 0 | changer->set_type(sc_params.alter_tablet_type); |
1234 | 0 | changer->set_compatible_version(sc_params.be_exec_version); |
1235 | |
|
1236 | 0 | const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map = |
1237 | 0 | sc_params.materialized_params_map; |
1238 | 0 | DescriptorTbl desc_tbl = *sc_params.desc_tbl; |
1239 | | |
1240 | | // set column mapping |
1241 | 0 | for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) { Branch (1241:73): [True: 0, False: 0]
|
1242 | 0 | const TabletColumn& new_column = new_tablet_schema->column(i); |
1243 | 0 | const std::string& column_name_lower = to_lower(new_column.name()); |
1244 | 0 | ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); |
1245 | 0 | column_mapping->new_column = &new_column; |
1246 | |
|
1247 | 0 | column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name()); |
1248 | |
|
1249 | 0 | if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) { Branch (1249:13): [True: 0, False: 0]
|
1250 | 0 | auto mv_param = materialized_function_map.find(column_name_lower)->second; |
1251 | 0 | column_mapping->expr = mv_param.expr; |
1252 | 0 | if (column_mapping->expr != nullptr) { Branch (1252:17): [True: 0, False: 0]
|
1253 | 0 | continue; |
1254 | 0 | } |
1255 | 0 | } |
1256 | | |
1257 | 0 | if (column_mapping->ref_column_idx >= 0) { Branch (1257:13): [True: 0, False: 0]
|
1258 | 0 | continue; |
1259 | 0 | } |
1260 | | |
1261 | 0 | if (sc_params.alter_tablet_type == ROLLUP) { Branch (1261:13): [True: 0, False: 0]
|
1262 | 0 | std::string materialized_function_map_str; |
1263 | 0 | for (auto str : materialized_function_map) { Branch (1263:27): [True: 0, False: 0]
|
1264 | 0 | if (!materialized_function_map_str.empty()) { Branch (1264:21): [True: 0, False: 0]
|
1265 | 0 | materialized_function_map_str += ','; |
1266 | 0 | } |
1267 | 0 | materialized_function_map_str += str.first; |
1268 | 0 | } |
1269 | 0 | return Status::InternalError( |
1270 | 0 | "referenced column was missing. [column={},materialized_function_map={}]", |
1271 | 0 | new_column.name(), materialized_function_map_str); |
1272 | 0 | } |
1273 | | |
1274 | 0 | if (new_column.name().find("__doris_shadow_") == 0) { Branch (1274:13): [True: 0, False: 0]
|
1275 | | // Should delete in the future, just a protection for bug. |
1276 | 0 | LOG(INFO) << "a shadow column is encountered " << new_column.name(); |
1277 | 0 | return Status::InternalError("failed due to operate on shadow column"); |
1278 | 0 | } |
1279 | | // Newly added column go here |
1280 | 0 | column_mapping->ref_column_idx = -1; |
1281 | |
|
1282 | 0 | if (i < base_tablet_schema->num_short_key_columns()) { Branch (1282:13): [True: 0, False: 0]
|
1283 | 0 | *sc_directly = true; |
1284 | 0 | } |
1285 | 0 | RETURN_IF_ERROR( Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
1286 | 0 | _init_column_mapping(column_mapping, new_column, new_column.default_value())); |
1287 | | |
1288 | 0 | LOG(INFO) << "A column with default value will be added after schema changing. " |
1289 | 0 | << "column=" << new_column.name() |
1290 | 0 | << ", default_value=" << new_column.default_value(); |
1291 | 0 | } |
1292 | | |
1293 | 0 | if (materialized_function_map.contains(WHERE_SIGN_LOWER)) { Branch (1293:9): [True: 0, False: 0]
|
1294 | 0 | changer->set_where_expr(materialized_function_map.find(WHERE_SIGN_LOWER)->second.expr); |
1295 | 0 | } |
1296 | | |
1297 | | // If the reference sequence of the Key column is out of order, it needs to be reordered |
1298 | 0 | int num_default_value = 0; |
1299 | |
|
1300 | 0 | for (int i = 0, new_schema_size = new_tablet_schema->num_key_columns(); i < new_schema_size; Branch (1300:77): [True: 0, False: 0]
|
1301 | 0 | ++i) { |
1302 | 0 | ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); |
1303 | |
|
1304 | 0 | if (!column_mapping->has_reference()) { Branch (1304:13): [True: 0, False: 0]
|
1305 | 0 | num_default_value++; |
1306 | 0 | continue; |
1307 | 0 | } |
1308 | | |
1309 | 0 | if (column_mapping->ref_column_idx != i - num_default_value) { Branch (1309:13): [True: 0, False: 0]
|
1310 | 0 | *sc_sorting = true; |
1311 | 0 | return Status::OK(); |
1312 | 0 | } |
1313 | 0 | } |
1314 | | |
1315 | 0 | if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) { Branch (1315:9): [True: 0, False: 0]
|
1316 | | // only when base table is dup and mv is agg |
1317 | | // the rollup job must be reagg. |
1318 | 0 | *sc_sorting = true; |
1319 | 0 | return Status::OK(); |
1320 | 0 | } |
1321 | | |
1322 | | // If the sort of key has not been changed but the new keys num is less then base's, |
1323 | | // the new table should be re agg. |
1324 | | // So we also need to set sc_sorting = true. |
1325 | | // A, B, C are keys(sort keys), D is value |
1326 | | // followings need resort: |
1327 | | // old keys: A B C D |
1328 | | // new keys: A B |
1329 | 0 | if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS && Branch (1329:9): [True: 0, False: 0]
|
1330 | 0 | new_tablet_schema->num_key_columns() < base_tablet_schema->num_key_columns()) { Branch (1330:9): [True: 0, False: 0]
|
1331 | | // this is a table with aggregate key type, and num of key columns in new schema |
1332 | | // is less, which means the data in new tablet should be more aggregated. |
1333 | | // so we use sorting schema change to sort and merge the data. |
1334 | 0 | *sc_sorting = true; |
1335 | 0 | return Status::OK(); |
1336 | 0 | } |
1337 | | |
1338 | 0 | if (sc_params.alter_tablet_type == ROLLUP) { Branch (1338:9): [True: 0, False: 0]
|
1339 | 0 | *sc_directly = true; |
1340 | 0 | return Status::OK(); |
1341 | 0 | } |
1342 | | |
1343 | 0 | if (sc_params.enable_unique_key_merge_on_write && Branch (1343:9): [True: 0, False: 0]
|
1344 | 0 | new_tablet_schema->num_key_columns() > base_tablet_schema->num_key_columns()) { Branch (1344:9): [True: 0, False: 0]
|
1345 | 0 | *sc_directly = true; |
1346 | 0 | return Status::OK(); |
1347 | 0 | } |
1348 | | |
1349 | 0 | if (base_tablet_schema->num_short_key_columns() != new_tablet_schema->num_short_key_columns()) { Branch (1349:9): [True: 0, False: 0]
|
1350 | | // the number of short_keys changed, can't do linked schema change |
1351 | 0 | *sc_directly = true; |
1352 | 0 | return Status::OK(); |
1353 | 0 | } |
1354 | | |
1355 | 0 | if (!sc_params.delete_handler->empty()) { Branch (1355:9): [True: 0, False: 0]
|
1356 | | // there exists delete condition in header, can't do linked schema change |
1357 | 0 | *sc_directly = true; |
1358 | 0 | return Status::OK(); |
1359 | 0 | } |
1360 | | |
1361 | | // if new tablet enable row store, or new tablet has different row store columns |
1362 | 0 | if ((!base_tablet_schema->exist_column(BeConsts::ROW_STORE_COL) && Branch (1362:10): [True: 0, False: 0]
|
1363 | 0 | new_tablet_schema->exist_column(BeConsts::ROW_STORE_COL)) || Branch (1363:10): [True: 0, False: 0]
|
1364 | 0 | !std::equal(new_tablet_schema->row_columns_uids().begin(), Branch (1364:9): [True: 0, False: 0]
|
1365 | 0 | new_tablet_schema->row_columns_uids().end(), |
1366 | 0 | base_tablet_schema->row_columns_uids().begin(), |
1367 | 0 | base_tablet_schema->row_columns_uids().end())) { |
1368 | 0 | *sc_directly = true; |
1369 | 0 | } |
1370 | |
|
1371 | 0 | for (size_t i = 0; i < new_tablet_schema->num_columns(); ++i) { Branch (1371:24): [True: 0, False: 0]
|
1372 | 0 | ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); |
1373 | 0 | if (column_mapping->expr != nullptr) { Branch (1373:13): [True: 0, False: 0]
|
1374 | 0 | *sc_directly = true; |
1375 | 0 | return Status::OK(); |
1376 | 0 | } else if (column_mapping->ref_column_idx >= 0) { Branch (1376:20): [True: 0, False: 0]
|
1377 | | // index changed |
1378 | 0 | if (vectorized::schema_util::has_schema_index_diff( Branch (1378:17): [True: 0, False: 0]
|
1379 | 0 | new_tablet_schema, base_tablet_schema, i, column_mapping->ref_column_idx)) { |
1380 | 0 | *sc_directly = true; |
1381 | 0 | return Status::OK(); |
1382 | 0 | } |
1383 | 0 | } |
1384 | 0 | } |
1385 | | |
1386 | | // if rs_reader has remote files, link schema change is not supported, |
1387 | | // use directly schema change instead. |
1388 | 0 | if (!(*sc_directly) && !(*sc_sorting)) { Branch (1388:9): [True: 0, False: 0]
Branch (1388:28): [True: 0, False: 0]
|
1389 | | // check has remote rowset |
1390 | | // work for cloud and cold storage |
1391 | 0 | for (const auto& rs_reader : sc_params.ref_rowset_readers) { Branch (1391:36): [True: 0, False: 0]
|
1392 | 0 | if (!rs_reader->rowset()->is_local()) { Branch (1392:17): [True: 0, False: 0]
|
1393 | 0 | *sc_directly = true; |
1394 | 0 | break; |
1395 | 0 | } |
1396 | 0 | } |
1397 | 0 | } |
1398 | |
|
1399 | 0 | return Status::OK(); |
1400 | 0 | } |
1401 | | |
1402 | | Status SchemaChangeJob::_init_column_mapping(ColumnMapping* column_mapping, |
1403 | | const TabletColumn& column_schema, |
1404 | 0 | const std::string& value) { |
1405 | 0 | if (auto field = WrapperField::create(column_schema); field.has_value()) { Branch (1405:59): [True: 0, False: 0]
|
1406 | 0 | column_mapping->default_value = field.value(); |
1407 | 0 | } else { |
1408 | 0 | return field.error(); |
1409 | 0 | } |
1410 | | |
1411 | 0 | if (column_schema.is_nullable() && value.length() == 0) { Branch (1411:9): [True: 0, False: 0]
Branch (1411:40): [True: 0, False: 0]
|
1412 | 0 | column_mapping->default_value->set_null(); |
1413 | 0 | } else { |
1414 | 0 | RETURN_IF_ERROR(column_mapping->default_value->from_string(value, column_schema.precision(), Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
1415 | 0 | column_schema.frac())); |
1416 | 0 | } |
1417 | | |
1418 | 0 | return Status::OK(); |
1419 | 0 | } |
1420 | | |
1421 | 0 | Status SchemaChangeJob::_validate_alter_result(const TAlterTabletReqV2& request) { |
1422 | 0 | Version max_continuous_version = {-1, 0}; |
1423 | 0 | _new_tablet->max_continuous_version_from_beginning(&max_continuous_version); |
1424 | 0 | LOG(INFO) << "find max continuous version of tablet=" << _new_tablet->tablet_id() |
1425 | 0 | << ", start_version=" << max_continuous_version.first |
1426 | 0 | << ", end_version=" << max_continuous_version.second; |
1427 | 0 | if (max_continuous_version.second < request.alter_version) { Branch (1427:9): [True: 0, False: 0]
|
1428 | 0 | return Status::InternalError("result version={} is less than request version={}", |
1429 | 0 | max_continuous_version.second, request.alter_version); |
1430 | 0 | } |
1431 | | |
1432 | 0 | std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; |
1433 | 0 | { |
1434 | 0 | std::shared_lock rdlock(_new_tablet->get_header_lock()); |
1435 | 0 | _new_tablet->acquire_version_and_rowsets(&version_rowsets); |
1436 | 0 | } |
1437 | 0 | for (auto& pair : version_rowsets) { Branch (1437:21): [True: 0, False: 0]
|
1438 | 0 | RowsetSharedPtr rowset = pair.second; |
1439 | 0 | if (!rowset->check_file_exist()) { Branch (1439:13): [True: 0, False: 0]
|
1440 | 0 | return Status::Error<NOT_FOUND>( |
1441 | 0 | "SchemaChangeJob::_validate_alter_result meet invalid rowset"); |
1442 | 0 | } |
1443 | 0 | } |
1444 | 0 | return Status::OK(); |
1445 | 0 | } |
1446 | | |
1447 | | // For unique with merge-on-write table, should process delete bitmap here. |
1448 | | // 1. During double write, the newly imported rowsets does not calculate |
1449 | | // delete bitmap and publish successfully. |
1450 | | // 2. After conversion, calculate delete bitmap for the rowsets imported |
1451 | | // during double write. During this period, new data can still be imported |
1452 | | // witout calculating delete bitmap and publish successfully. |
1453 | | // 3. Block the new publish, calculate the delete bitmap of the |
1454 | | // incremental rowsets. |
1455 | | // 4. Switch the tablet status to TABLET_RUNNING. The newly imported |
1456 | | // data will calculate delete bitmap. |
1457 | 0 | Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) { |
1458 | 0 | DBUG_EXECUTE_IF("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed", { Line | Count | Source | 37 | 0 | if (UNLIKELY(config::enable_debug_points)) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 38 | 0 | auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ | 39 | 0 | if (dp) { \ Branch (39:13): [True: 0, False: 0]
| 40 | 0 | [[maybe_unused]] auto DP_NAME = debug_point_name; \ | 41 | 0 | { code; } \ Branch (41:15): [True: 0, False: 0]
| 42 | 0 | } \ | 43 | 0 | } |
|
1459 | 0 | if (rand() % 100 < (100 * dp->param("percent", 0.1))) { |
1460 | 0 | LOG_WARNING("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed"); |
1461 | 0 | return Status::InternalError("debug schema change calc delete bitmap random failed"); |
1462 | 0 | } |
1463 | 0 | }); |
1464 | | |
1465 | | // can't do compaction when calc delete bitmap, if the rowset being calculated does |
1466 | | // a compaction, it may cause the delete bitmap to be missed. |
1467 | 0 | std::lock_guard base_compaction_lock(_new_tablet->get_base_compaction_lock()); |
1468 | 0 | std::lock_guard cumu_compaction_lock(_new_tablet->get_cumulative_compaction_lock()); |
1469 | | |
1470 | | // step 2 |
1471 | 0 | int64_t max_version = _new_tablet->max_version().second; |
1472 | 0 | std::vector<RowsetSharedPtr> rowsets; |
1473 | 0 | if (alter_version < max_version) { Branch (1473:9): [True: 0, False: 0]
|
1474 | 0 | LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " |
1475 | 0 | << "double write rowsets for version: " << alter_version + 1 << "-" << max_version |
1476 | 0 | << " new_tablet=" << _new_tablet->tablet_id(); |
1477 | 0 | std::shared_lock rlock(_new_tablet->get_header_lock()); |
1478 | 0 | auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( Line | Count | Source | 716 | 0 | ({ \ | 717 | 0 | auto&& res = (stmt); \ | 718 | 0 | using T = std::decay_t<decltype(res)>; \ | 719 | 0 | if (!res.has_value()) [[unlikely]] { \ Branch (719:13): [True: 0, False: 0]
| 720 | 0 | return std::forward<T>(res).error(); \ | 721 | 0 | } \ | 722 | 0 | std::forward<T>(res).value(); \ | 723 | 0 | }); |
|
1479 | 0 | {alter_version + 1, max_version}, CaptureRowsetOps {})); |
1480 | 0 | rowsets = std::move(ret.rowsets); |
1481 | 0 | } |
1482 | 0 | for (auto rowset_ptr : rowsets) { Branch (1482:26): [True: 0, False: 0]
|
1483 | 0 | std::lock_guard rwlock(_new_tablet->get_rowset_update_lock()); |
1484 | 0 | std::shared_lock rlock(_new_tablet->get_header_lock()); |
1485 | 0 | RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
1486 | 0 | } |
1487 | | |
1488 | | // step 3 |
1489 | 0 | std::lock_guard rwlock(_new_tablet->get_rowset_update_lock()); |
1490 | 0 | std::lock_guard new_wlock(_new_tablet->get_header_lock()); |
1491 | 0 | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); Line | Count | Source | 31 | 0 | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 0 | using namespace std::chrono_literals; \ | 42 | 0 | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 0 | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 0 | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 0 | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 0 | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 0 | auto VARNAME_LINENUM(cost_us) = \ | 47 | 0 | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 0 | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 0 | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 0 | } \ | 51 | 0 | }) |
|
|
1492 | 0 | int64_t new_max_version = _new_tablet->max_version_unlocked(); |
1493 | 0 | rowsets.clear(); |
1494 | 0 | if (max_version < new_max_version) { Branch (1494:9): [True: 0, False: 0]
|
1495 | 0 | LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " |
1496 | 0 | << "incremental rowsets for version: " << max_version + 1 << "-" |
1497 | 0 | << new_max_version << " new_tablet=" << _new_tablet->tablet_id(); |
1498 | 0 | auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked( Line | Count | Source | 716 | 0 | ({ \ | 717 | 0 | auto&& res = (stmt); \ | 718 | 0 | using T = std::decay_t<decltype(res)>; \ | 719 | 0 | if (!res.has_value()) [[unlikely]] { \ Branch (719:13): [True: 0, False: 0]
| 720 | 0 | return std::forward<T>(res).error(); \ | 721 | 0 | } \ | 722 | 0 | std::forward<T>(res).value(); \ | 723 | 0 | }); |
|
1499 | 0 | {max_version + 1, new_max_version}, CaptureRowsetOps {})); |
1500 | 0 | rowsets = std::move(ret.rowsets); |
1501 | 0 | } |
1502 | 0 | for (auto&& rowset_ptr : rowsets) { Branch (1502:28): [True: 0, False: 0]
|
1503 | 0 | RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
1504 | 0 | } |
1505 | | // step 4 |
1506 | 0 | RETURN_IF_ERROR(_new_tablet->set_tablet_state(TabletState::TABLET_RUNNING)); Line | Count | Source | 637 | 0 | do { \ | 638 | 0 | Status _status_ = (stmt); \ | 639 | 0 | if (UNLIKELY(!_status_.ok())) { \ Line | Count | Source | 36 | 0 | #define UNLIKELY(expr) __builtin_expect(!!(expr), 0) Branch (36:24): [True: 0, False: 0]
|
| 640 | 0 | return _status_; \ | 641 | 0 | } \ | 642 | 0 | } while (false) Branch (642:14): [Folded - Ignored]
|
|
1507 | 0 | _new_tablet->save_meta(); |
1508 | 0 | return Status::OK(); |
1509 | 0 | } |
1510 | | |
1511 | | } // namespace doris |