be/src/exec/sink/vrow_distribution.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vrow_distribution.h" |
19 | | |
20 | | #include <gen_cpp/FrontendService.h> |
21 | | #include <gen_cpp/FrontendService_types.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | |
28 | | #include "common/cast_set.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/metrics/doris_metrics.h" |
31 | | #include "common/status.h" |
32 | | #include "core/assert_cast.h" |
33 | | #include "core/column/column.h" |
34 | | #include "core/column/column_const.h" |
35 | | #include "core/column/column_nullable.h" |
36 | | #include "core/column/column_vector.h" |
37 | | #include "core/data_type/data_type.h" |
38 | | #include "exec/sink/writer/vtablet_writer.h" |
39 | | #include "runtime/exec_env.h" |
40 | | #include "runtime/query_context.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | #include "service/backend_options.h" |
43 | | #include "util/client_cache.h" |
44 | | #include "util/debug_points.h" |
45 | | #include "util/thrift_rpc_helper.h" |
46 | | |
47 | | namespace doris { |
48 | | #include "common/compile_check_begin.h" |
49 | | |
50 | 13 | std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() { |
51 | 13 | return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; |
52 | 13 | } |
53 | | |
54 | | Status VRowDistribution::_save_missing_values( |
55 | | const Block& input_block, |
56 | | std::vector<std::vector<std::string>>& col_strs, // non-const ref for move |
57 | | int col_size, Block* block, const std::vector<uint32_t>& filter, |
58 | 2 | const std::vector<const NullMap*>& col_null_maps) { |
59 | | // de-duplication for new partitions but save all rows. |
60 | 2 | RETURN_IF_ERROR( |
61 | 2 | _batching_block->add_rows(&input_block, filter.data(), filter.data() + filter.size())); |
62 | 2 | std::vector<TNullableStringLiteral> cur_row_values; |
63 | 6 | for (int row = 0; row < col_strs[0].size(); ++row) { |
64 | 4 | cur_row_values.clear(); |
65 | 8 | for (int col = 0; col < col_size; ++col) { |
66 | 4 | TNullableStringLiteral node; |
67 | 4 | const auto* null_map = col_null_maps[col]; // null map for this col |
68 | 4 | node.__set_is_null((null_map && (*null_map)[filter[row]]) |
69 | 4 | ? true |
70 | 4 | : node.is_null); // if not, dont change(default false) |
71 | 4 | if (!node.is_null) { |
72 | 4 | node.__set_value(col_strs[col][row]); |
73 | 4 | } |
74 | 4 | cur_row_values.push_back(node); |
75 | 4 | } |
76 | 4 | if (!_deduper.contains(cur_row_values)) { |
77 | 2 | _deduper.insert(cur_row_values); |
78 | 2 | _partitions_need_create.emplace_back(cur_row_values); |
79 | 2 | } |
80 | 4 | } |
81 | | |
82 | | // to avoid too large mem use |
83 | 2 | if (_batching_block->rows() > _batch_size) { |
84 | 0 | _deal_batched = true; |
85 | 0 | } |
86 | 2 | _batching_rows = _batching_block->rows(); |
87 | 2 | VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows; |
88 | | |
89 | 2 | return Status::OK(); |
90 | 2 | } |
91 | | |
92 | 2 | void VRowDistribution::clear_batching_stats() { |
93 | 2 | _partitions_need_create.clear(); |
94 | 2 | _batching_rows = 0; |
95 | 2 | _batching_bytes = 0; |
96 | 2 | } |
97 | | |
98 | 2 | Status VRowDistribution::automatic_create_partition() { |
99 | 2 | MonotonicStopWatch timer; |
100 | 2 | if (_state->enable_profile() && _state->profile_level() >= 2) { |
101 | 0 | timer.start(); |
102 | 0 | } |
103 | | |
104 | 2 | SCOPED_TIMER(_add_partition_request_timer); |
105 | 2 | TCreatePartitionRequest request; |
106 | 2 | TCreatePartitionResult result; |
107 | 2 | bool injected = false; |
108 | 2 | std::string be_endpoint = BackendOptions::get_be_endpoint(); |
109 | 2 | request.__set_txn_id(_txn_id); |
110 | 2 | request.__set_db_id(_vpartition->db_id()); |
111 | 2 | request.__set_table_id(_vpartition->table_id()); |
112 | 2 | request.__set_partitionValues(_partitions_need_create); |
113 | 2 | request.__set_be_endpoint(be_endpoint); |
114 | 2 | request.__set_write_single_replica(_write_single_replica); |
115 | 2 | if (_state && _state->get_query_ctx()) { |
116 | | // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator |
117 | 2 | request.__set_query_id(_state->get_query_ctx()->query_id()); |
118 | 2 | } |
119 | | |
120 | 2 | DBUG_EXECUTE_IF("VRowDistribution.automatic_create_partition.inject_result", { |
121 | 2 | DBUG_RUN_CALLBACK(&request, &result); |
122 | 2 | injected = true; |
123 | 2 | }); |
124 | | |
125 | 2 | VLOG_NOTICE << "automatic partition rpc begin request " << request; |
126 | 2 | if (!injected) { |
127 | 0 | std::shared_ptr<TNetworkAddress> master_addr; |
128 | 0 | if (_vpartition->get_master_address() == nullptr) { |
129 | 0 | auto* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
130 | 0 | if (cluster_info == nullptr) { |
131 | 0 | return Status::InternalError("cluster_info is null"); |
132 | 0 | } |
133 | 0 | master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr); |
134 | 0 | } else { |
135 | 0 | master_addr = _vpartition->get_master_address(); |
136 | 0 | } |
137 | 0 | int time_out = _state->execution_timeout() * 1000; |
138 | 0 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
139 | 0 | master_addr->hostname, master_addr->port, |
140 | 0 | [&request, &result](FrontendServiceConnection& client) { |
141 | 0 | client->createPartition(result, request); |
142 | 0 | }, |
143 | 0 | time_out)); |
144 | 0 | } |
145 | | |
146 | 2 | Status status(Status::create(result.status)); |
147 | 2 | VLOG_NOTICE << "automatic partition rpc end response " << result; |
148 | 2 | if (result.status.status_code == TStatusCode::OK) { |
149 | | // add new created partitions |
150 | 2 | RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); |
151 | 2 | for (const auto& part : result.partitions) { |
152 | 2 | _new_partition_ids.insert(part.id); |
153 | 2 | VLOG_TRACE << "record new id: " << part.id; |
154 | 2 | } |
155 | 2 | RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); |
156 | 2 | } |
157 | | |
158 | | // Record this request's elapsed time |
159 | 2 | if (_state->enable_profile() && _state->profile_level() >= 2) { |
160 | 0 | int64_t elapsed_ns = timer.elapsed_time(); |
161 | 0 | _add_partition_request_times.push_back(elapsed_ns); |
162 | 0 | } |
163 | 2 | return status; |
164 | 2 | } |
165 | | |
166 | | // for reuse the same create callback of create-partition |
167 | 1 | static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { |
168 | 1 | TCreatePartitionResult result; |
169 | 1 | result.status = arg.status; |
170 | 1 | result.nodes = std::move(arg.nodes); |
171 | 1 | result.partitions = std::move(arg.partitions); |
172 | 1 | result.tablets = std::move(arg.tablets); |
173 | 1 | result.slave_tablets = std::move(arg.slave_tablets); |
174 | 1 | return result; |
175 | 1 | } |
176 | | |
177 | | // use _partitions and replace them |
178 | 2 | Status VRowDistribution::_replace_overwriting_partition() { |
179 | 2 | SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition |
180 | 2 | TReplacePartitionRequest request; |
181 | 2 | TReplacePartitionResult result; |
182 | 2 | bool injected = false; |
183 | 2 | request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id()); |
184 | 2 | request.__set_db_id(_vpartition->db_id()); |
185 | 2 | request.__set_table_id(_vpartition->table_id()); |
186 | 2 | request.__set_write_single_replica(_write_single_replica); |
187 | | |
188 | | // only request for partitions not recorded for replacement |
189 | 2 | std::set<int64_t> id_deduper; |
190 | 3 | for (const auto* part : _partitions) { |
191 | 3 | if (part != nullptr) { |
192 | 3 | if (_new_partition_ids.contains(part->id)) { |
193 | | // this is a new partition. dont replace again. |
194 | 1 | VLOG_TRACE << "skip new partition: " << part->id; |
195 | 2 | } else { |
196 | | // request for replacement |
197 | 2 | id_deduper.insert(part->id); |
198 | 2 | } |
199 | 3 | } else if (_missing_map.empty()) { |
200 | | // no origin partition. and not allow to create. |
201 | 0 | return Status::InvalidArgument( |
202 | 0 | "Cannot found origin partitions in auto detect overwriting, stop " |
203 | 0 | "processing"); |
204 | 0 | } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here. |
205 | 3 | } |
206 | 2 | if (id_deduper.empty()) { |
207 | 1 | return Status::OK(); // no need to request |
208 | 1 | } |
209 | | // de-duplicate. there's no check in FE |
210 | 1 | std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end()); |
211 | | |
212 | 1 | request.__set_partition_ids(request_part_ids); |
213 | | |
214 | 1 | std::string be_endpoint = BackendOptions::get_be_endpoint(); |
215 | 1 | request.__set_be_endpoint(be_endpoint); |
216 | 1 | if (_state && _state->get_query_ctx()) { |
217 | | // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator |
218 | 1 | request.__set_query_id(_state->get_query_ctx()->query_id()); |
219 | 1 | } |
220 | | |
221 | 1 | DBUG_EXECUTE_IF("VRowDistribution.replace_overwriting_partition.inject_result", { |
222 | 1 | DBUG_RUN_CALLBACK(&request, &result); |
223 | 1 | injected = true; |
224 | 1 | }); |
225 | | |
226 | 1 | VLOG_NOTICE << "auto detect replace partition request: " << request; |
227 | 1 | if (!injected) { |
228 | 0 | std::shared_ptr<TNetworkAddress> master_addr; |
229 | 0 | if (_vpartition->get_master_address() == nullptr) { |
230 | 0 | auto* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
231 | 0 | if (cluster_info == nullptr) { |
232 | 0 | return Status::InternalError("cluster_info is null"); |
233 | 0 | } |
234 | 0 | master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr); |
235 | 0 | } else { |
236 | 0 | master_addr = _vpartition->get_master_address(); |
237 | 0 | } |
238 | 0 | int time_out = _state->execution_timeout() * 1000; |
239 | 0 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
240 | 0 | master_addr->hostname, master_addr->port, |
241 | 0 | [&request, &result](FrontendServiceConnection& client) { |
242 | 0 | client->replacePartition(result, request); |
243 | 0 | }, |
244 | 0 | time_out)); |
245 | 0 | } |
246 | | |
247 | 1 | Status status(Status::create(result.status)); |
248 | 1 | VLOG_NOTICE << "auto detect replace partition result: " << result; |
249 | 1 | if (result.status.status_code == TStatusCode::OK) { |
250 | | // record new partitions |
251 | 2 | for (const auto& part : result.partitions) { |
252 | 2 | _new_partition_ids.insert(part.id); |
253 | 2 | VLOG_TRACE << "record new id: " << part.id; |
254 | 2 | } |
255 | | // replace data in _partitions |
256 | 1 | RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); |
257 | | // reuse the function as the args' structure are same. it add nodes/locations and incremental_open |
258 | 1 | auto result_as_create = cast_as_create_result(result); |
259 | 1 | RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); |
260 | 1 | } |
261 | | |
262 | 1 | return status; |
263 | 1 | } |
264 | | |
265 | | void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx, |
266 | 9 | std::vector<int64_t>& tablet_ids) { |
267 | 9 | tablet_ids.resize(block->rows()); |
268 | 26 | for (int row_idx = 0; row_idx < block->rows(); row_idx++) { |
269 | 17 | if (_skip[row_idx]) { |
270 | 6 | continue; |
271 | 6 | } |
272 | 11 | auto& partition = _partitions[row_idx]; |
273 | 11 | auto& tablet_index = _tablet_indexes[row_idx]; |
274 | 11 | auto& index = partition->indexes[index_idx]; |
275 | | |
276 | 11 | auto tablet_id = index.tablets[tablet_index]; |
277 | 11 | tablet_ids[row_idx] = tablet_id; |
278 | 11 | } |
279 | 9 | } |
280 | | |
281 | 7 | void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) { |
282 | 7 | auto& row_ids = row_part_tablet_id.row_ids; |
283 | 7 | auto& partition_ids = row_part_tablet_id.partition_ids; |
284 | 7 | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
285 | | |
286 | 7 | auto rows = block->rows(); |
287 | | // row count of a block should not exceed UINT32_MAX |
288 | 7 | auto rows_uint32 = cast_set<uint32_t>(rows); |
289 | 19 | for (uint32_t i = 0; i < rows_uint32; i++) { |
290 | 12 | if (!_skip[i]) { |
291 | 6 | row_ids.emplace_back(i); |
292 | 6 | partition_ids.emplace_back(_partitions[i]->id); |
293 | 6 | tablet_ids.emplace_back(_tablet_ids[i]); |
294 | 6 | } |
295 | 12 | } |
296 | 7 | } |
297 | | |
298 | | Status VRowDistribution::_filter_block_by_skip_and_where_clause( |
299 | 2 | Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) { |
300 | | // TODO |
301 | | //SCOPED_RAW_TIMER(&_stat.where_clause_ns); |
302 | 2 | int result_index = -1; |
303 | 2 | size_t column_number = block->columns(); |
304 | 2 | RETURN_IF_ERROR(where_clause->execute(block, &result_index)); |
305 | | |
306 | 2 | auto filter_column = block->get_by_position(result_index).column; |
307 | | |
308 | 2 | auto& row_ids = row_part_tablet_id.row_ids; |
309 | 2 | auto& partition_ids = row_part_tablet_id.partition_ids; |
310 | 2 | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
311 | 2 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
312 | 0 | auto rows = block->rows(); |
313 | | // row count of a block should not exceed UINT32_MAX |
314 | 0 | auto rows_uint32 = cast_set<uint32_t>(rows); |
315 | 0 | for (uint32_t i = 0; i < rows_uint32; i++) { |
316 | 0 | if (nullable_column->get_bool_inline(i) && !_skip[i]) { |
317 | 0 | row_ids.emplace_back(i); |
318 | 0 | partition_ids.emplace_back(_partitions[i]->id); |
319 | 0 | tablet_ids.emplace_back(_tablet_ids[i]); |
320 | 0 | } |
321 | 0 | } |
322 | 2 | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
323 | 1 | bool ret = const_column->get_bool(0); |
324 | 1 | if (!ret) { |
325 | 1 | return Status::OK(); |
326 | 1 | } |
327 | | // should we optimize? |
328 | 0 | _filter_block_by_skip(block, row_part_tablet_id); |
329 | 1 | } else { |
330 | 1 | const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data(); |
331 | 1 | auto rows = block->rows(); |
332 | | // row count of a block should not exceed UINT32_MAX |
333 | 1 | auto rows_uint32 = cast_set<uint32_t>(rows); |
334 | 4 | for (uint32_t i = 0; i < rows_uint32; i++) { |
335 | 3 | if (filter[i] != 0 && !_skip[i]) { |
336 | 2 | row_ids.emplace_back(i); |
337 | 2 | partition_ids.emplace_back(_partitions[i]->id); |
338 | 2 | tablet_ids.emplace_back(_tablet_ids[i]); |
339 | 2 | } |
340 | 3 | } |
341 | 1 | } |
342 | | |
343 | 1 | for (size_t i = block->columns() - 1; i >= column_number; i--) { |
344 | 0 | block->erase(i); |
345 | 0 | } |
346 | 1 | return Status::OK(); |
347 | 2 | } |
348 | | |
349 | | Status VRowDistribution::_filter_block(Block* block, |
350 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
351 | 18 | for (int i = 0; i < _schema->indexes().size(); i++) { |
352 | 9 | _get_tablet_ids(block, i, _tablet_ids); |
353 | 9 | auto& where_clause = _schema->indexes()[i]->where_clause; |
354 | 9 | if (where_clause != nullptr) { |
355 | 2 | RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, |
356 | 2 | row_part_tablet_ids[i])); |
357 | 7 | } else { |
358 | 7 | _filter_block_by_skip(block, row_part_tablet_ids[i]); |
359 | 7 | } |
360 | 9 | } |
361 | 9 | return Status::OK(); |
362 | 9 | } |
363 | | |
364 | | Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( |
365 | 5 | Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
366 | 5 | int num_rows = cast_set<int>(block->rows()); |
367 | | |
368 | 5 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
369 | 5 | _tablet_indexes, _skip)); |
370 | 5 | if (has_filtered_rows) { |
371 | 0 | for (int i = 0; i < num_rows; i++) { |
372 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
373 | 0 | } |
374 | 0 | } |
375 | 5 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
376 | 5 | return Status::OK(); |
377 | 5 | } |
378 | | |
379 | | Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block, |
380 | | const std::vector<uint16_t>& partition_cols_idx, |
381 | 2 | int64_t& rows_stat_val) { |
382 | | // for missing partition keys, calc the missing partition and save in _partitions_need_create |
383 | 2 | auto [part_ctxs, part_exprs] = _get_partition_function(); |
384 | 2 | int part_col_num = cast_set<int>(part_exprs.size()); |
385 | | // the two vectors are in column-first-order |
386 | 2 | std::vector<std::vector<std::string>> col_strs; |
387 | 2 | std::vector<const NullMap*> col_null_maps; |
388 | 2 | col_strs.resize(part_col_num); |
389 | 2 | col_null_maps.reserve(part_col_num); |
390 | | |
391 | 2 | auto format_options = DataTypeSerDe::get_default_format_options(); |
392 | 2 | format_options.timezone = &_state->timezone_obj(); |
393 | | |
394 | 4 | for (int i = 0; i < part_col_num; ++i) { |
395 | 2 | auto return_type = part_exprs[i]->data_type(); |
396 | | // expose the data column. the return type would be nullable |
397 | 2 | const auto& [range_left_col, col_const] = |
398 | 2 | unpack_if_const(block->get_by_position(partition_cols_idx[i]).column); |
399 | 2 | if (range_left_col->is_nullable()) { |
400 | 0 | col_null_maps.push_back(&( |
401 | 0 | assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data())); |
402 | 2 | } else { |
403 | 2 | col_null_maps.push_back(nullptr); |
404 | 2 | } |
405 | 4 | for (auto row : _missing_map) { |
406 | 4 | col_strs[i].push_back(return_type->to_string( |
407 | 4 | *range_left_col, index_check_const(row, col_const), format_options)); |
408 | 4 | } |
409 | 2 | } |
410 | | |
411 | | // calc the end value and save them. in the end of sending, we will create partitions for them and deal them. |
412 | | // NOTE: must save old batching stats before calling _save_missing_values(), |
413 | | // because _save_missing_values() will update _batching_rows internally. |
414 | 2 | size_t old_bt_rows = _batching_rows; |
415 | 2 | size_t old_bt_bytes = _batching_bytes; |
416 | | |
417 | 2 | RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map, |
418 | 2 | col_null_maps)); |
419 | | |
420 | 2 | size_t new_bt_rows = _batching_block->rows(); |
421 | 2 | size_t new_bt_bytes = _batching_block->bytes(); |
422 | 2 | rows_stat_val -= new_bt_rows - old_bt_rows; |
423 | 2 | _state->update_num_rows_load_total(old_bt_rows - new_bt_rows); |
424 | 2 | _state->update_num_bytes_load_total(old_bt_bytes - new_bt_bytes); |
425 | 2 | DorisMetrics::instance()->load_rows->increment(old_bt_rows - new_bt_rows); |
426 | 2 | DorisMetrics::instance()->load_bytes->increment(old_bt_bytes - new_bt_bytes); |
427 | | |
428 | 2 | return Status::OK(); |
429 | 2 | } |
430 | | |
431 | | Status VRowDistribution::_generate_rows_distribution_for_auto_partition( |
432 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
433 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
434 | 2 | int64_t& rows_stat_val) { |
435 | 2 | int num_rows = cast_set<int>(block->rows()); |
436 | 2 | std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
437 | | |
438 | 2 | auto& partition_col = block->get_by_position(partition_keys[0]); |
439 | 2 | _missing_map.clear(); |
440 | 2 | _missing_map.reserve(partition_col.column->size()); |
441 | | |
442 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
443 | 2 | _tablet_indexes, _skip, &_missing_map)); |
444 | | |
445 | | // the missing vals for auto partition are also skipped. |
446 | 2 | if (has_filtered_rows) { |
447 | 0 | for (int i = 0; i < num_rows; i++) { |
448 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
449 | 0 | } |
450 | 0 | } |
451 | 2 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
452 | | |
453 | 2 | if (!_missing_map.empty()) { |
454 | 2 | RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, |
455 | 2 | rows_stat_val)); // send input block to save |
456 | 2 | } |
457 | 2 | return Status::OK(); |
458 | 2 | } |
459 | | |
460 | | Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( |
461 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
462 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
463 | 2 | int64_t& rows_stat_val) { |
464 | 2 | int num_rows = cast_set<int>(block->rows()); |
465 | | |
466 | | // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc, |
467 | | // and find the new partitions to use. |
468 | | // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto |
469 | | // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz |
470 | | // we already saved missing values. |
471 | 2 | if (_vpartition->is_auto_partition() && |
472 | 2 | _state->query_options().enable_auto_create_when_overwrite) { |
473 | | // allow auto create partition for missing rows. |
474 | 0 | std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
475 | 0 | auto partition_col = block->get_by_position(partition_keys[0]); |
476 | 0 | _missing_map.clear(); |
477 | 0 | _missing_map.reserve(partition_col.column->size()); |
478 | |
|
479 | 0 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
480 | 0 | _tablet_indexes, _skip, &_missing_map)); |
481 | | |
482 | | // allow and really need to create during auto-detect-overwriting. |
483 | 0 | if (!_missing_map.empty()) { |
484 | 0 | RETURN_IF_ERROR( |
485 | 0 | _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val)); |
486 | 0 | } |
487 | 2 | } else { |
488 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
489 | 2 | _tablet_indexes, _skip)); |
490 | 2 | } |
491 | 2 | RETURN_IF_ERROR(_replace_overwriting_partition()); |
492 | | |
493 | | // regenerate locations for new partitions & tablets |
494 | 2 | _reset_find_tablets(num_rows); |
495 | 2 | if (_vpartition->is_auto_partition() && |
496 | 2 | _state->query_options().enable_auto_create_when_overwrite) { |
497 | | // here _missing_map is just a placeholder |
498 | 0 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
499 | 0 | _tablet_indexes, _skip, &_missing_map)); |
500 | 0 | if (VLOG_TRACE_IS_ON) { |
501 | 0 | std::string tmp; |
502 | 0 | for (auto v : _missing_map) { |
503 | 0 | tmp += std::to_string(v).append(", "); |
504 | 0 | } |
505 | 0 | VLOG_TRACE << "Trace missing map of " << this << ':' << tmp; |
506 | 0 | } |
507 | 2 | } else { |
508 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
509 | 2 | _tablet_indexes, _skip)); |
510 | 2 | } |
511 | 2 | if (has_filtered_rows) { |
512 | 0 | for (int i = 0; i < num_rows; i++) { |
513 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
514 | 0 | } |
515 | 0 | } |
516 | 2 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
517 | 2 | return Status::OK(); |
518 | 2 | } |
519 | | |
520 | | void VRowDistribution::_reset_row_part_tablet_ids( |
521 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) { |
522 | 9 | row_part_tablet_ids.resize(_schema->indexes().size()); |
523 | 9 | for (auto& row_part_tablet_id : row_part_tablet_ids) { |
524 | 9 | auto& row_ids = row_part_tablet_id.row_ids; |
525 | 9 | auto& partition_ids = row_part_tablet_id.partition_ids; |
526 | 9 | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
527 | | |
528 | 9 | row_ids.clear(); |
529 | 9 | partition_ids.clear(); |
530 | 9 | tablet_ids.clear(); |
531 | | // This is important for performance. |
532 | 9 | row_ids.reserve(rows); |
533 | 9 | partition_ids.reserve(rows); |
534 | 9 | tablet_ids.reserve(rows); |
535 | 9 | } |
536 | 9 | } |
537 | | |
538 | | Status VRowDistribution::generate_rows_distribution( |
539 | | Block& input_block, std::shared_ptr<Block>& block, |
540 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) { |
541 | 9 | auto input_rows = input_block.rows(); |
542 | 9 | _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows); |
543 | | |
544 | | // we store the batching block with value of `input_block`. so just do all of these again. |
545 | 9 | bool has_filtered_rows = false; |
546 | 9 | RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( |
547 | 9 | _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); |
548 | | |
549 | | // batching block rows which need new partitions. deal together at finish. |
550 | 9 | if (!_batching_block) [[unlikely]] { |
551 | 8 | std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0); |
552 | 8 | _batching_block = MutableBlock::create_unique(std::move(*tmp_block)); |
553 | 8 | } |
554 | | |
555 | 9 | auto num_rows = block->rows(); |
556 | 9 | _reset_find_tablets(num_rows); |
557 | | |
558 | | // if there's projection of partition calc, we need to calc it first. |
559 | 9 | auto [part_ctxs, part_funcs] = _get_partition_function(); |
560 | 9 | std::vector<uint16_t> partition_cols_idx; |
561 | 9 | if (_vpartition->is_projection_partition()) { |
562 | | // calc the start value of missing partition ranges. |
563 | 2 | auto func_size = part_funcs.size(); |
564 | 4 | for (int i = 0; i < func_size; ++i) { |
565 | 2 | int result_idx = -1; |
566 | | // we just calc left range here. leave right to FE to avoid dup calc. |
567 | 2 | RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx)); |
568 | | |
569 | 2 | VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1); |
570 | 2 | DCHECK(result_idx != -1); |
571 | | |
572 | 2 | partition_cols_idx.push_back(cast_set<uint16_t>(result_idx)); |
573 | 2 | } |
574 | | |
575 | | // change the column to compare to transformed. |
576 | 2 | _vpartition->set_transformed_slots(partition_cols_idx); |
577 | 2 | } |
578 | | |
579 | 9 | Status st = Status::OK(); |
580 | 9 | if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) { |
581 | | // when overwrite, no auto create partition allowed. |
582 | 2 | st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(), |
583 | 2 | partition_cols_idx, has_filtered_rows, |
584 | 2 | row_part_tablet_ids, rows_stat_val); |
585 | 7 | } else if (_vpartition->is_auto_partition() && !_deal_batched) { |
586 | 2 | st = _generate_rows_distribution_for_auto_partition(input_block, block.get(), |
587 | 2 | partition_cols_idx, has_filtered_rows, |
588 | 2 | row_part_tablet_ids, rows_stat_val); |
589 | 5 | } else { // not auto partition |
590 | 5 | st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows, |
591 | 5 | row_part_tablet_ids); |
592 | 5 | } |
593 | | |
594 | 9 | return st; |
595 | 9 | } |
596 | | |
597 | | // reuse vars for find_tablets |
598 | 11 | void VRowDistribution::_reset_find_tablets(int64_t rows) { |
599 | 11 | _tablet_finder->filter_bitmap().Reset(rows); |
600 | 11 | _partitions.assign(rows, nullptr); |
601 | 11 | _skip.assign(rows, false); |
602 | 11 | _tablet_indexes.assign(rows, 0); |
603 | 11 | } |
604 | | |
605 | | } // namespace doris |